import json from AnsjerPush.config import LOGGER, DATA_PUSH_EVENT_TYPE_LIST from Object.enums.EventTypeEnum import EventTypeEnumObj from Service.CommonService import CommonService from Service.HuaweiPushService import push_admin from Service.HuaweiPushService.push_admin import messaging class HuaweiPushObject: # 华为推送服务类 def __init__(self): self.app_id = '101064781' self.app_secret = '29d5c5367208e35079f14779597b8f6bcc28ee39091546ed577862231fdd0fdd' self.init_app() def init_app(self): """init sdk app""" push_admin.initialize_app(self.app_id, self.app_secret) def send_push_notify_message(self, token_val, msg_title, msg_text, image_url=None, uid='', nickname='', n_time='', event_type='0', channel='', app_bundle_id='', appBundleId=''): """ 发送推送消息 @param token_val: 手机推送token @param msg_title: 标题 @param msg_text: 内容 @param image_url: 图片链接 @param uid: uid @param nickname: 设备昵称 @param n_time: 当前时间 @param event_type: 事件类型 @param channel: 通道 @param app_bundle_id: APP包id @param appBundleId: APP包id @return: bool """ LOGGER.info( '华为推送参数: ' 'uid:{}, token_val:{}, msg_title:{}, msg_text:{}, image_url:{}, event_type:{}, n_time:{}, channel:{}'. format(uid, token_val, msg_title, msg_text, image_url, event_type, n_time, channel)) send_succeed = self.send_notify_message(msg_title, msg_text, image_url, uid, nickname, event_type, n_time, token_val, channel) if int(event_type) in DATA_PUSH_EVENT_TYPE_LIST: self.send_data_message(uid, event_type, n_time, token_val, channel) return send_succeed def send_notify_message( self, msg_title, msg_text, image_url, uid, nickname, event_type, n_time, token_val, channel): """ 发送通知推送 @param msg_title: @param msg_text: @param image_url: @param uid: @param nickname: @param event_type: @param n_time: @param token_val: @param channel: @return: bool """ LOGGER.info('{}进入发送通知推送函数'.format(uid)) msg_title = '设备昵称: {}'.format(msg_title) notification = messaging.Notification( title=msg_title, body=msg_text, image=image_url ) # 跳转类型 jump_type = CommonService.get_jump_type(event_type) # 自定义键值对 data = { 'alert': msg_text, 'msg': '', 'sound': 'sound.aif', 'zpush': '1', 'uid': uid, 'nickname': nickname, 'event_type': event_type, 'received_at': n_time, 'event_time': n_time, 'channel': channel, 'jump_type': jump_type } data = json.dumps(data) # 推送通知内容配置 intent = 'intent://com.vivo.pushvideo/detail?#Intent;scheme=vpushscheme;launchFlags=0x10000000;S.uid={};S.event_type={};S.event_time={};end'.format( uid, event_type, n_time) android_notification = self.android_notification(msg_title, msg_text, intent) # 安卓配置 android = messaging.AndroidConfig( data=data, collapse_key=-1, urgency=messaging.AndroidConfig.NORMAL_PRIORITY, ttl='10000s', bi_tag='the_sample_bi_tag_for_receipt_service', notification=android_notification, category='DEVICE_REMINDER' ) message = messaging.Message( notification=notification, android=android, token=[token_val] ) try: import certifi response = messaging.send_message(message, verify_peer=certifi.where()) LOGGER.info('{}华为通知推送响应: {}'.format(uid, json.dumps(vars(response)))) assert (response.code == '80000000') return True except Exception as e: LOGGER.error('uid:{}, 华为通知推送异常: {}'.format(uid ,repr(e))) return False @staticmethod def send_data_message(uid, event_type, n_time, token_val, channel): """ 发送透传推送 @param uid: @param event_type: @param n_time: @param token_val: @param channel: @return: None """ LOGGER.info('{}进入发送透传推送函数'.format(uid)) data = { 'uid': uid, 'event_type': event_type, 'event_time': n_time, 'channel': channel } data = json.dumps(data) android = messaging.AndroidConfig( collapse_key=-1, urgency=messaging.AndroidConfig.HIGH_PRIORITY, ttl='10000s', bi_tag='the_sample_bi_tag_for_receipt_service' ) message = messaging.Message( data=data, android=android, token=[token_val] ) try: import certifi response = messaging.send_message(message, verify_peer=certifi.where()) LOGGER.info('{}华为透传推送响应: {}'.format(uid, json.dumps(vars(response)))) assert (response.code == '80000000') except Exception as e: LOGGER.info('华为透传推送异常: {}'.format(repr(e))) @staticmethod def android_notification(msg_title, msg_text, intent): return messaging.AndroidNotification( icon='/raw/ic_launcher2', color='#AACCDD', sound='/raw/shake', default_sound=True, click_action=messaging.AndroidClickAction( action_type=1, intent=intent ), body_loc_key='M.String.body', body_loc_args=('boy', 'dog'), title_loc_key='M.String.title', title_loc_args=['Girl', 'Cat'], channel_id='1', notify_summary='', multi_lang_key={'title_key': {'en': 'value1'}, 'body_key': {'en': 'value2'}}, style=1, big_title=msg_title, big_body=msg_text, auto_clear=86400000, importance=messaging.AndroidNotification.PRIORITY_HIGH, light_settings=messaging.AndroidLightSettings(color=messaging.AndroidLightSettingsColor( alpha=0, red=0, green=1, blue=1), light_on_duration='3.5', light_off_duration='5S'), badge=messaging.AndroidBadgeNotification( add_num=1, clazz='Classic'), visibility=messaging.AndroidNotification.PUBLIC, foreground_show=False )