# -*- coding: utf-8 -*- """ @Time : 2022/5/19 11:43 @Auth : Locky @File :GatewayService.py @IDE :PyCharm """ import logging import os import apns2 import jpush from pyfcm import FCMNotification from AnsjerPush.config import APP_BUNDLE_DICT, APNS_MODE, BASE_DIR, APNS_CONFIG, FCM_CONFIG, JPUSH_CONFIG # 网关推送类 from Service.CommonService import CommonService class GatewayPushService: # 获取推送消息标题 @staticmethod def get_msg_title(app_bundle_id, nickname): if app_bundle_id in APP_BUNDLE_DICT.keys(): return APP_BUNDLE_DICT[app_bundle_id] + '(' + nickname + ')' else: return nickname # 获取推送消息内容 @staticmethod def get_msg_text(n_time, tz, lang, alarm): n_date = CommonService.get_now_time_str(n_time=n_time, tz=tz, lang=lang) if lang == 'cn': msg_text = '{} 日期:{}'.format(alarm, n_date) else: msg_text = '{} date:{}'.format(alarm, n_date) return msg_text # ios apns 推送 @staticmethod def ios_apns_push(nickname, app_bundle_id, token_val, n_time, event_type, msg_title, msg_text, launch_image=None): logger = logging.getLogger('info') try: pem_path = os.path.join(BASE_DIR, APNS_CONFIG[app_bundle_id]['pem_path']) cli = apns2.APNSClient(mode=APNS_MODE, client_cert=pem_path) alert = apns2.PayloadAlert(title=msg_title, body=msg_text, launch_image=launch_image) push_data = {'alert': 'Motion', 'msg': '', 'sound': '', 'zpush': '1', 'received_at': n_time, 'event_time': n_time, 'event_type': event_type, 'nickname': nickname } payload = apns2.Payload(alert=alert, custom=push_data, sound='default', mutable_content=True) n = apns2.Notification(payload=payload, priority=apns2.PRIORITY_LOW) res = cli.push(n=n, device_token=token_val, topic=app_bundle_id) assert res.status_code == 200 except Exception as e: logger.info('--->IOS推送异常{}'.format(repr(e))) return repr(e) # android fcm 推送 @staticmethod def android_fcm_push(nickname, app_bundle_id, token_val, n_time, event_type, msg_title, msg_text, image=''): logger = logging.getLogger('info') try: serverKey = FCM_CONFIG[app_bundle_id] push_service = FCMNotification(api_key=serverKey) push_data = {'alert': 'Motion', 'msg': '', 'sound': 'sound.aif', 'zpush': '1', 'received_at': n_time, 'event_time': n_time, 'event_type': event_type, 'nickname': nickname } result = push_service.notify_single_device(registration_id=token_val, message_title=msg_title, message_body=msg_text, data_message=push_data, extra_notification_kwargs={ 'image': image }, extra_kwargs={'default_sound': True, 'default_vibrate_timings': True, 'default_light_settings': True, } ) logger.info('fcm推送结果:{}'.format(result)) return result except Exception as e: return repr(e) # android 极光 推送 @staticmethod def android_jpush(nickname, app_bundle_id, token_val, n_time, event_type, msg_title, msg_text): try: app_key = JPUSH_CONFIG[app_bundle_id]['Key'] master_secret = JPUSH_CONFIG[app_bundle_id]['Secret'] # 换成各自的app_key和master_secret _jpush = jpush.JPush(app_key, master_secret) push = _jpush.create_push() push.audience = jpush.registration_id(token_val) push_data = {'alert': 'Motion', 'msg': '', 'sound': 'sound.aif', 'zpush': '1', 'received_at': n_time, 'event_time': n_time, 'event_type': event_type, 'nickname': nickname } android = jpush.android(title=msg_title, big_text=msg_text, alert=msg_text, extras=push_data, priority=1, style=1, alert_type=7 ) push.notification = jpush.notification(android=android) push.platform = jpush.all_ res = push.send() assert res.status_code == 200 except Exception as e: return repr(e)