GatewayService.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. # -*- coding: utf-8 -*-
  2. """
  3. @Time : 2022/5/19 11:43
  4. @Auth : Locky
  5. @File :GatewayService.py
  6. @IDE :PyCharm
  7. """
  8. import os
  9. import apns2
  10. import jpush
  11. from pyfcm import FCMNotification
  12. from AnsjerPush.config import APP_BUNDLE_DICT, APNS_MODE, BASE_DIR, APNS_CONFIG, FCM_CONFIG, JPUSH_CONFIG
  13. # 网关推送类
  14. from Service.CommonService import CommonService
  15. class GatewayPushService:
  16. # 获取推送消息标题
  17. @staticmethod
  18. def get_msg_title(app_bundle_id, nickname):
  19. if app_bundle_id in APP_BUNDLE_DICT.keys():
  20. return APP_BUNDLE_DICT[app_bundle_id] + '(' + nickname + ')'
  21. else:
  22. return nickname
  23. # 获取推送消息内容
  24. @staticmethod
  25. def get_msg_text(n_time, tz, lang, alarm):
  26. n_date = CommonService.get_now_time_str(n_time=n_time, tz=tz, lang=lang)
  27. if lang == 'cn':
  28. msg_text = '{} 日期:{}'.format(alarm, n_date)
  29. else:
  30. msg_text = '{} date:{}'.format(alarm, n_date)
  31. return msg_text
  32. # ios apns 推送
  33. @staticmethod
  34. def ios_apns_push(nickname, app_bundle_id, token_val, n_time, event_type, msg_title, msg_text):
  35. try:
  36. cli = apns2.APNSClient(mode=APNS_MODE,
  37. client_cert=os.path.join(BASE_DIR, APNS_CONFIG[app_bundle_id]['pem_path']))
  38. alert = apns2.PayloadAlert(title=msg_title, body=msg_text)
  39. push_data = {'alert': 'Motion', 'msg': '', 'sound': '', 'zpush': '1',
  40. 'received_at': n_time, 'event_time': n_time, 'event_type': event_type, 'nickname': nickname
  41. }
  42. payload = apns2.Payload(alert=alert, custom=push_data, sound='default')
  43. n = apns2.Notification(payload=payload, priority=apns2.PRIORITY_LOW)
  44. res = cli.push(n=n, device_token=token_val, topic=app_bundle_id)
  45. assert res.status_code == 200
  46. except Exception as e:
  47. return repr(e)
  48. # android fcm 推送
  49. @staticmethod
  50. def android_fcm_push(nickname, app_bundle_id, token_val, n_time, event_type, msg_title, msg_text):
  51. try:
  52. serverKey = FCM_CONFIG[app_bundle_id]
  53. push_service = FCMNotification(api_key=serverKey)
  54. push_data = {'alert': 'Motion', 'msg': '', 'sound': 'sound.aif', 'zpush': '1',
  55. 'received_at': n_time, 'event_time': n_time, 'event_type': event_type, 'nickname': nickname
  56. }
  57. result = push_service.notify_single_device(registration_id=token_val, message_title=msg_title,
  58. message_body=msg_text, data_message=push_data,
  59. extra_kwargs={'default_sound': True,
  60. 'default_vibrate_timings': True,
  61. 'default_light_settings': True
  62. }
  63. )
  64. return result
  65. except Exception as e:
  66. return repr(e)
  67. # android 极光 推送
  68. @staticmethod
  69. def android_jpush(nickname, app_bundle_id, token_val, n_time, event_type, msg_title, msg_text):
  70. try:
  71. app_key = JPUSH_CONFIG[app_bundle_id]['Key']
  72. master_secret = JPUSH_CONFIG[app_bundle_id]['Secret']
  73. # 换成各自的app_key和master_secret
  74. _jpush = jpush.JPush(app_key, master_secret)
  75. push = _jpush.create_push()
  76. push.audience = jpush.registration_id(token_val)
  77. push_data = {'alert': 'Motion', 'msg': '', 'sound': 'sound.aif', 'zpush': '1',
  78. 'received_at': n_time, 'event_time': n_time, 'event_type': event_type, 'nickname': nickname
  79. }
  80. android = jpush.android(title=msg_title, big_text=msg_text, alert=msg_text, extras=push_data,
  81. priority=1, style=1, alert_type=7
  82. )
  83. push.notification = jpush.notification(android=android)
  84. push.platform = jpush.all_
  85. res = push.send()
  86. assert res.status_code == 200
  87. except Exception as e:
  88. return repr(e)