123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- import json
- from AnsjerPush.config import LOGGER
- from Object.enums.EventTypeEnum import EventTypeEnumObj
- from Service.VSeesHuaweiPushService import push_admin
- from Service.VSeesHuaweiPushService.push_admin import messaging
class VseesHuaweiPushObject:
    """Huawei push service wrapper for the Vsees (微瞳) app.

    Creating an instance initializes the Huawei ``push_admin`` SDK with the
    configured credentials. The public methods send either a visible
    notification or a pass-through (data-only) message to a single device
    push token.
    """

    # NOTE(security): these credentials are committed in source. They are kept
    # as defaults so existing callers keep working, but they should be moved to
    # configuration / a secret store and rotated.
    DEFAULT_APP_ID = '108703647'
    DEFAULT_APP_SECRET = '6bc5b0b0ab4588fcd667b3e6c1ab4fd5d857de21ad69ee9d46e077b9f5f24e8f'

    # Response code that the send methods treat as a successful push.
    SUCCESS_CODE = '80000000'

    def __init__(self, app_id=DEFAULT_APP_ID, app_secret=DEFAULT_APP_SECRET):
        """Store the credentials and initialize the push SDK.

        @param app_id: Huawei app id; defaults to the built-in Vsees app id
        @param app_secret: Huawei app secret; defaults to the built-in secret
        """
        self.app_id = app_id
        self.app_secret = app_secret
        self.init_app()

    def init_app(self):
        """Initialize the push SDK app with this instance's credentials."""
        push_admin.initialize_app(self.app_id, self.app_secret)

    def send_push_notify_message(self, token_val, msg_title, msg_text, image_url=None, uid='', nickname='', n_time='',
                                 event_type='0', channel='', app_bundle_id='', appBundleId=''):
        """
        Send a push notification and, for data-push event types, a pass-through message too.

        @param token_val: device push token
        @param msg_title: title
        @param msg_text: body text
        @param image_url: image link
        @param uid: device uid
        @param nickname: device nickname
        @param n_time: current time
        @param event_type: event type; must be convertible to int for the data-push check
        @param channel: channel
        @param app_bundle_id: app bundle id (unused here, kept for interface compatibility)
        @param appBundleId: app bundle id (unused here, kept for interface compatibility)
        @return: bool, True when the notification push was accepted
        """
        LOGGER.info(
            '华为推送参数(微瞳): '
            'uid:{}, token_val:{}, msg_title:{}, msg_text:{}, image_url:{}, event_type:{}, n_time:{}, channel:{}'.
            format(uid, token_val, msg_title, msg_text, image_url, event_type, n_time, channel))
        send_succeed = self.send_notify_message(msg_title, msg_text, image_url, uid, nickname,
                                                event_type, n_time, token_val, channel)
        # The notification has already been sent at this point, so a malformed
        # event_type must not raise and discard its result.
        try:
            needs_data_push = int(event_type) in EventTypeEnumObj.DATA_PUSH_EVENT_TYPE_LIST.value
        except (TypeError, ValueError):
            LOGGER.info('{}华为推送事件类型无效(微瞳): {}'.format(uid, repr(event_type)))
            needs_data_push = False
        if needs_data_push:
            self.send_data_message(uid, event_type, n_time, token_val, channel)
        return send_succeed

    def send_notify_message(
            self, msg_title, msg_text, image_url, uid, nickname, event_type, n_time, token_val, channel):
        """
        Send a visible notification push.

        @param msg_title: title; it is embedded into the '设备昵称: {}' title template
        @param msg_text: body text
        @param image_url: image link
        @param uid: device uid
        @param nickname: device nickname
        @param event_type: event type
        @param n_time: used as both 'received_at' and 'event_time' in the payload
        @param token_val: device push token
        @param channel: channel
        @return: bool, True when the response code equals SUCCESS_CODE, otherwise False
        """
        LOGGER.info('{}进入发送通知推送函数(微瞳)'.format(uid))
        msg_title = '设备昵称: {}'.format(msg_title)
        notification = messaging.Notification(
            title=msg_title,
            body=msg_text,
            image=image_url
        )
        # Custom key/value pairs delivered with the notification.
        data = json.dumps({
            'alert': msg_text, 'msg': '', 'sound': 'sound.aif', 'zpush': '1', 'uid': uid, 'nickname': nickname,
            'event_type': event_type, 'received_at': n_time, 'event_time': n_time, 'channel': channel
        })
        # Intent URI used as the click action of the notification.
        intent = 'intent://com.vivo.pushvideo/detail?#Intent;scheme=vpushscheme;launchFlags=0x10000000;S.uid={};S.event_type={};S.event_time={};end'.format(
            uid, event_type, n_time)
        android_notification = self.android_notification(msg_title, msg_text, intent)
        # The Vsees message category is temporarily set to audio/video call (VOIP).
        category = 'VOIP' if self.app_id == self.DEFAULT_APP_ID else 'DEVICE_REMINDER'
        # Android-specific delivery configuration.
        android = messaging.AndroidConfig(
            data=data,
            collapse_key=-1,
            urgency=messaging.AndroidConfig.NORMAL_PRIORITY,
            ttl='10000s',
            bi_tag='the_sample_bi_tag_for_receipt_service',
            notification=android_notification,
            category=category
        )
        message = messaging.Message(
            notification=notification,
            android=android,
            token=[token_val]
        )
        return self._dispatch(
            message,
            '{}华为通知推送响应(微瞳)'.format(uid),
            '华为通知推送异常(微瞳)'
        )

    @staticmethod
    def send_data_message(uid, event_type, n_time, token_val, channel):
        """
        Send a pass-through (data-only) push carrying the event details.

        @param uid: device uid
        @param event_type: event type
        @param n_time: event time
        @param token_val: device push token
        @param channel: channel
        @return: None
        """
        LOGGER.info('{}进入发送透传推送函数(微瞳)'.format(uid))
        payload = {
            'uid': uid, 'event_type': event_type, 'event_time': n_time, 'channel': channel
        }
        VseesHuaweiPushObject._send_data_payload(
            payload,
            token_val,
            '{}华为透传推送响应(微瞳)'.format(uid),
            '华为透传推送异常(微瞳)'
        )

    @staticmethod
    def android_notification(msg_title, msg_text, intent):
        """
        Build the Android notification configuration used by the notification push.

        @param msg_title: used as the expanded (big) title
        @param msg_text: used as the expanded (big) body
        @param intent: intent URI opened when the notification is clicked
        @return: messaging.AndroidNotification
        """
        # NOTE(review): the *_loc_key / *_loc_args / multi_lang_key values look
        # like SDK sample placeholders -- confirm they are intended.
        return messaging.AndroidNotification(
            icon='/raw/ic_launcher2',
            color='#AACCDD',
            sound='/raw/shake',
            default_sound=True,
            click_action=messaging.AndroidClickAction(
                action_type=1,
                intent=intent
            ),
            body_loc_key='M.String.body',
            body_loc_args=('boy', 'dog'),
            title_loc_key='M.String.title',
            title_loc_args=['Girl', 'Cat'],
            channel_id='1',
            notify_summary='',
            multi_lang_key={'title_key': {'en': 'value1'}, 'body_key': {'en': 'value2'}},
            style=1,
            big_title=msg_title,
            big_body=msg_text,
            auto_clear=86400000,
            importance=messaging.AndroidNotification.PRIORITY_HIGH,
            light_settings=messaging.AndroidLightSettings(color=messaging.AndroidLightSettingsColor(
                alpha=0, red=0, green=1, blue=1), light_on_duration='3.5', light_off_duration='5S'),
            badge=messaging.AndroidBadgeNotification(
                add_num=1, clazz='Classic'),
            visibility=messaging.AndroidNotification.PUBLIC,
            foreground_show=False,
            # Call-style action buttons (answer: action_type 1, reject:
            # action_type 3) are currently disabled.
        )

    @staticmethod
    def huawei_transparent_transmission(nickname, app_bundle_id, token_val, n_time, event_type, msg_title,
                                        msg_text, user_id):
        """
        Send a pass-through (data-only) push; its log lines label it as a logout push.

        @param nickname: device nickname
        @param app_bundle_id: app bundle id (unused here, kept for interface compatibility)
        @param token_val: device push token
        @param n_time: event time
        @param event_type: event type
        @param msg_title: title carried in the payload
        @param msg_text: text carried in the payload
        @param user_id: user id, used only in the log messages
        @return: None
        """
        payload = {
            'nickname': nickname, 'event_type': event_type, 'event_time': n_time, 'msg_title': msg_title,
            'msg_text': msg_text
        }
        VseesHuaweiPushObject._send_data_payload(
            payload,
            token_val,
            '{}退出登录,华为透传推送响应(微瞳)'.format(user_id),
            '{}退出登录,华为透传推送异常(微瞳)'.format(user_id)
        )

    @staticmethod
    def _send_data_payload(payload, token_val, response_label, error_label):
        """Wrap ``payload`` in a high-priority data-only message and send it; return bool."""
        android = messaging.AndroidConfig(
            collapse_key=-1,
            urgency=messaging.AndroidConfig.HIGH_PRIORITY,
            ttl='10000s',
            bi_tag='the_sample_bi_tag_for_receipt_service'
        )
        message = messaging.Message(
            data=json.dumps(payload),
            android=android,
            token=[token_val]
        )
        return VseesHuaweiPushObject._dispatch(message, response_label, error_label)

    @staticmethod
    def _response_ok(response):
        """Return True when ``response`` carries the success code, otherwise False."""
        return getattr(response, 'code', None) == VseesHuaweiPushObject.SUCCESS_CODE

    @staticmethod
    def _dispatch(message, response_label, error_label):
        """Send ``message``, log the outcome, and return True only on the success code.

        Sending is best-effort: any exception is logged and reported as False
        instead of propagating to the caller.
        """
        try:
            import certifi
            response = messaging.send_message(message, verify_peer=certifi.where())
            LOGGER.info('{}: {}'.format(response_label, json.dumps(vars(response))))
        except Exception as e:
            LOGGER.info('{}: {}'.format(error_label, repr(e)))
            return False
        # Explicit comparison instead of `assert`, which is removed under
        # `python -O` and would make every push look successful.
        if not VseesHuaweiPushObject._response_ok(response):
            LOGGER.info('{}: {}'.format(
                error_label, 'unexpected response code: {!r}'.format(getattr(response, 'code', None))))
            return False
        return True
|