123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- # -*- coding: utf-8 -*-
- """
- @Time : 2022/5/19 11:43
- @Auth : Locky
- @File :GatewayService.py
- @IDE :PyCharm
- """
- import logging
- import os
- import apns2
- import jpush
- from pyfcm import FCMNotification
- from AnsjerPush.config import APP_BUNDLE_DICT, APNS_MODE, BASE_DIR, APNS_CONFIG, FCM_CONFIG, JPUSH_CONFIG
- # 网关推送类
- from Service.CommonService import CommonService
class GatewayPushService:
    """Build and send gateway alarm push notifications.

    Every method is static. The three senders (``ios_apns_push``,
    ``android_fcm_push``, ``android_jpush``) share one error convention: any
    exception raised while building or sending the notification is caught,
    logged on the ``'info'`` logger and handed back to the caller as
    ``repr(exception)`` instead of propagating.
    """

    @staticmethod
    def get_msg_title(app_bundle_id, nickname):
        """Return the notification title for a device.

        Args:
            app_bundle_id: Key looked up in ``APP_BUNDLE_DICT``.
            nickname: Device nickname.

        Returns:
            ``APP_BUNDLE_DICT[app_bundle_id] + '(' + nickname + ')'`` when the
            bundle id is configured (the configured value is presumably the
            app's display name -- confirm against the config), otherwise
            ``nickname`` unchanged.
        """
        if app_bundle_id in APP_BUNDLE_DICT:
            return APP_BUNDLE_DICT[app_bundle_id] + '(' + nickname + ')'
        return nickname

    @staticmethod
    def get_msg_text(n_time, tz, lang, alarm):
        """Return the notification body: the alarm text followed by a date.

        Args:
            n_time: Event time, passed through to
                ``CommonService.get_now_time_str``.
            tz: Time zone, passed through to the same helper.
            lang: Language code. ``'cn'`` selects the Chinese date label; any
                other value selects the English one.
            alarm: Alarm description placed at the start of the text.

        Returns:
            The formatted message string.
        """
        n_date = CommonService.get_now_time_str(n_time=n_time, tz=tz, lang=lang)
        if lang == 'cn':
            return '{} 日期:{}'.format(alarm, n_date)
        return '{} date:{}'.format(alarm, n_date)

    @staticmethod
    def ios_apns_push(nickname, app_bundle_id, token_val, n_time, event_type, msg_title, msg_text,
                      uid='', channel='1', launch_image=None):
        """Send one notification to an iOS device through APNs.

        Args:
            nickname: Device nickname, included in the custom payload.
            app_bundle_id: Key into ``APNS_CONFIG`` (for the certificate path)
                and the APNs topic.
            token_val: Device token of the receiver.
            n_time: Event time; sent as both ``received_at`` and ``event_time``.
            event_type: Event type value included in the custom payload.
            msg_title: Alert title.
            msg_text: Alert body.
            uid: Device uid included in the custom payload.
            channel: Channel included in the custom payload.
            launch_image: Used as the alert launch image and as ``image_url``
                in the custom payload.

        Returns:
            ``None`` when APNs answers with status code 200, otherwise
            ``repr()`` of the exception that occurred.
        """
        logger = logging.getLogger('info')
        try:
            pem_path = os.path.join(BASE_DIR, APNS_CONFIG[app_bundle_id]['pem_path'])
            logger.info('apns推送app_bundle_id:{}, pem_path:{}'.format(app_bundle_id, pem_path))
            cli = apns2.APNSClient(mode=APNS_MODE, client_cert=pem_path)
            alert = apns2.PayloadAlert(title=msg_title, body=msg_text, launch_image=launch_image)
            push_data = {
                'alert': 'Motion', 'msg': '', 'sound': '', 'zpush': '1', 'uid': uid, 'channel': channel,
                'received_at': n_time, 'event_time': n_time, 'event_type': event_type, 'nickname': nickname,
                'image_url': launch_image,
            }
            payload = apns2.Payload(alert=alert, custom=push_data, sound='default', category='myCategory',
                                    mutable_content=True)
            n = apns2.Notification(payload=payload, priority=apns2.PRIORITY_LOW)
            res = cli.push(n=n, device_token=token_val, topic=app_bundle_id)
            # Explicit check instead of ``assert``: asserts are stripped under
            # ``python -O`` and carry no detail about the failure.
            if res.status_code != 200:
                raise RuntimeError('apns push failed, status_code: {}'.format(res.status_code))
        except Exception as e:
            logger.info('--->IOS推送异常{}'.format(repr(e)))
            return repr(e)
        return None

    @staticmethod
    def android_fcm_push(nickname, app_bundle_id, token_val, n_time, event_type, msg_title, msg_text,
                         uid='', channel='1', image=''):
        """Send one notification to an Android device through FCM.

        Args:
            nickname: Device nickname, included in the data message.
            app_bundle_id: Key into ``FCM_CONFIG`` for the server key.
            token_val: FCM registration id of the receiver.
            n_time: Event time; sent as both ``received_at`` and ``event_time``.
            event_type: Event type value included in the data message.
            msg_title: Notification title.
            msg_text: Notification body.
            uid: Device uid included in the data message.
            channel: Channel included in the data message.
            image: Image value included in the data message.

        Returns:
            The result of ``notify_single_device`` on success, otherwise
            ``repr()`` of the exception that occurred.
        """
        logger = logging.getLogger('info')
        try:
            server_key = FCM_CONFIG[app_bundle_id]
            push_service = FCMNotification(api_key=server_key)
            push_data = {
                'alert': 'Motion', 'msg': '', 'sound': 'sound.aif', 'zpush': '1', 'image': image,
                'received_at': n_time, 'event_time': n_time, 'event_type': event_type, 'nickname': nickname,
                'uid': uid, 'channel': channel,
            }
            result = push_service.notify_single_device(
                registration_id=token_val,
                message_title=msg_title,
                message_body=msg_text,
                data_message=push_data,
                extra_kwargs={
                    'default_sound': True,
                    'default_vibrate_timings': True,
                    'default_light_settings': True,
                },
            )
            logger.info('fcm推送结果:{}'.format(result))
            return result
        except Exception as e:
            # Log the failure like the iOS sender does, so it is not only
            # visible to callers that inspect the return value.
            logger.info('--->FCM推送异常{}'.format(repr(e)))
            return repr(e)

    @staticmethod
    def android_jpush(nickname, app_bundle_id, token_val, n_time, event_type, msg_title, msg_text):
        """Send one notification to an Android device through JPush.

        Args:
            nickname: Device nickname, included in the extras.
            app_bundle_id: Key into ``JPUSH_CONFIG`` for the app key/secret.
            token_val: JPush registration id of the receiver.
            n_time: Event time; sent as both ``received_at`` and ``event_time``.
            event_type: Event type value included in the extras.
            msg_title: Notification title.
            msg_text: Used as both the alert and the big text.

        Returns:
            ``None`` when JPush answers with status code 200, otherwise
            ``repr()`` of the exception that occurred.
        """
        logger = logging.getLogger('info')
        try:
            app_key = JPUSH_CONFIG[app_bundle_id]['Key']
            master_secret = JPUSH_CONFIG[app_bundle_id]['Secret']
            # Each app bundle has its own app_key / master_secret pair.
            _jpush = jpush.JPush(app_key, master_secret)
            push = _jpush.create_push()
            push.audience = jpush.registration_id(token_val)
            push_data = {
                'alert': 'Motion', 'msg': '', 'sound': 'sound.aif', 'zpush': '1',
                'received_at': n_time, 'event_time': n_time, 'event_type': event_type, 'nickname': nickname,
            }
            android = jpush.android(title=msg_title, big_text=msg_text, alert=msg_text, extras=push_data,
                                    priority=1, style=1, alert_type=7)
            push.notification = jpush.notification(android=android)
            push.platform = jpush.all_
            res = push.send()
            # Explicit check instead of ``assert``: asserts are stripped under
            # ``python -O`` and carry no detail about the failure.
            if res.status_code != 200:
                raise RuntimeError('jpush push failed, status_code: {}'.format(res.status_code))
        except Exception as e:
            # Log the failure like the iOS sender does, so it is not only
            # visible to callers that inspect the return value.
            logger.info('--->极光推送异常{}'.format(repr(e)))
            return repr(e)
        return None
|