CustomizedPushService.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. # @Author : Rocky
  2. # @File : CustomizedPushService.py
  3. # @Time : 2023/10/19 15:49
  4. import logging
  5. import threading
  6. import time
  7. from concurrent.futures import ThreadPoolExecutor
  8. from django.db.models.functions import Substr, Length
  9. from AnsjerPush.config import CONFIG_INFO, CONFIG_TEST, CONFIG_CN, XM_PUSH_CHANNEL_ID
  10. from Model.models import DeviceTypeModel, Device_Info, GatewayPush, CountryModel, SysMsgModel, UserEmailSubscriptions, \
  11. AppDeviceType, UidSetModel
  12. from Service.CommonService import CommonService
  13. from Service.HuaweiPushService.HuaweiPushService import HuaweiPushObject
  14. from Service.PushService import PushObject
  # Dedicated logger for the customized-push workflow.
  CUSTOMIZED_PUSH_LOGGER = logging.getLogger('customized_push')


  class CustomizedPushObject:
      """Customized push workflow.

      Selects the target users, stores one system message per user and sends
      the notification through the push channel recorded for each token.
      """

      # A "year" is approximated as 365 days when a registration period is
      # converted into a timestamp offset.
      _SECONDS_PER_YEAR = 365 * 24 * 60 * 60

      @staticmethod
      def _parse_register_period(register_period):
          """Split a registration period string into whole years.

          @param register_period: 'n-m' (between n and m years) or 'n-' (n years or more)
          @return: (n, m) as ints; m is None for the open-ended 'n-' form
          @raise ValueError: if the '-' separator is missing or a bound is not an integer
          """
          lower, separator, upper = register_period.partition('-')
          if not separator:
              # str.find() would return -1 here and slice the string into
              # meaningless bounds, so reject the value explicitly instead.
              raise ValueError(
                  "register_period must look like 'n-' or 'n-m', got {!r}".format(register_period))
          return int(lower), (int(upper) if upper != '' else None)

      @staticmethod
      def _filter_by_country(device_info_qs, country):
          """Restrict a Device_Info queryset to users of the given countries.

          Test and CN deployments push to every user, so the queryset is
          returned unchanged there.

          @param device_info_qs: Device_Info queryset to narrow down
          @param country: comma-separated country names
          @return: the (possibly filtered) queryset
          """
          if CONFIG_INFO in [CONFIG_TEST, CONFIG_CN]:
              return device_info_qs
          country_name_list = country.split(',')
          country_id_list = CountryModel.objects.filter(
              country_name__in=country_name_list).values_list('id', flat=True)
          return device_info_qs.filter(userID__region_country__in=country_id_list)

      @staticmethod
      def query_push_user(device_name, country, register_period):
          """
          Query the ids of the users that should receive the push.
          @param device_name: comma-separated device model names, or 'cloud_storage'
          @param country: comma-separated country names (ignored on test/CN servers)
          @param register_period: registration age in years, 'n-m' or 'n-' ('0-' means everyone)
          @return: list of distinct user ids
          @raise ValueError: if register_period is malformed
          """
          # Validate the period first so a malformed value fails before any query runs.
          min_years, max_years = CustomizedPushObject._parse_register_period(register_period)

          if device_name == 'cloud_storage':
              # Cloud-storage devices: device types registered with model=2
              # (presumably IPC devices -- confirm against AppDeviceType) whose
              # ucode has '4' or '5' as its fourth-from-last character
              # (Substr positions are 1-based).
              ipc_types = list(
                  AppDeviceType.objects.filter(model=2).values_list('type', flat=True).distinct())
              uid_list = (
                  UidSetModel.objects
                  .annotate(ucode_char=Substr('ucode', Length('ucode') - 3, 1))
                  .filter(device_type__in=ipc_types, ucode_char__in=['4', '5'])
                  .values_list('uid', flat=True))
              device_info_qs = Device_Info.objects.filter(UID__in=list(uid_list))
          else:
              # Select devices by model name.
              device_name_list = device_name.split(',')
              device_type_list = DeviceTypeModel.objects.filter(
                  name__in=device_name_list).values_list('type', flat=True)
              device_info_qs = Device_Info.objects.filter(Type__in=device_type_list)

          # Overseas servers only push to users of the requested countries.
          device_info_qs = CustomizedPushObject._filter_by_country(device_info_qs, country)

          # Translate the registration period into join-date bounds.
          now_time = int(time.time())
          seconds_per_year = CustomizedPushObject._SECONDS_PER_YEAR
          if max_years is None:
              # 'n-': registered at least n years ago; '0-' keeps every user.
              if min_years != 0:
                  n_year_ago = CommonService.timestamp_to_str(
                      now_time - min_years * seconds_per_year)
                  # An earlier join date means a smaller value.
                  device_info_qs = device_info_qs.filter(userID__data_joined__lte=n_year_ago)
          else:
              # 'n-m' (e.g. 2-3 years): m years ago <= join date <= n years ago.
              n_year_ago = CommonService.timestamp_to_str(now_time - min_years * seconds_per_year)
              m_year_ago = CommonService.timestamp_to_str(now_time - max_years * seconds_per_year)
              device_info_qs = device_info_qs.filter(
                  userID__data_joined__gte=m_year_ago, userID__data_joined__lte=n_year_ago)

          # One user may own several devices, so de-duplicate the ids.
          return list(set(device_info_qs.values_list('userID_id', flat=True)))

      @classmethod
      def push_and_save_sys_msg(cls, **kwargs):
          """
          Save one system message per user and push the notification.

          Push tokens are streamed from the database and system messages are
          inserted in batches. Users with push_sub_status=0 still get the
          system message but are not pushed to. Any exception is logged and
          swallowed; the method never raises.

          @param kwargs: id, user_id_list, title, msg, link, icon_link, push_app
          @return: None
          """
          customized_push_id = kwargs['id']
          user_id_list = kwargs['user_id_list']
          title = kwargs['title']
          msg = kwargs['msg']
          link = kwargs['link']
          # An empty icon link is normalised to None.
          icon_link = kwargs['icon_link'] if kwargs['icon_link'] != '' else None
          n_time = int(time.time())
          push_kwargs = {
              'n_time': n_time,
              'title': title,
              'msg': msg,
              'icon_link': icon_link
          }

          # Bundle ids that belong to the selected app.
          if kwargs['push_app'] == 'ZosiSmart':
              app_bundle_id_list = ['com.ansjer.zccloud_a', 'com.ansjer.zccloud']
          else:
              app_bundle_id_list = ['com.ansjer.zccloud_ab', 'com.ansjer.customizede']

          try:
              # NOTE(review): user_id_list is still sent as a single IN (...) list;
              # confirm the database copes with the expected list size.
              gateway_push_qs = GatewayPush.objects.filter(
                  user_id__in=user_id_list, app_bundle_id__in=app_bundle_id_list
              ).values('user_id', 'app_bundle_id', 'push_type', 'token_val')
              if not gateway_push_qs.exists():
                  CUSTOMIZED_PUSH_LOGGER.info('customized_push_id:{}没有需要推送的用户'.format(customized_push_id))
                  return

              # Load all unsubscribed users in one query instead of one query per user.
              unsubscribed_user_ids = set(
                  UserEmailSubscriptions.objects.filter(
                      user_id__in=user_id_list, push_sub_status=0
                  ).values_list('user_id', flat=True)
              )

              sys_msg_list = []
              saved_user_id_set = set()  # users that already have a system message queued
              gateway_push_list = []
              batch_size = 1000  # rows fetched per iterator chunk
              sys_msg_batch_size = 5000  # system messages per bulk insert

              # Stream the tokens so the whole result set is never held in memory.
              for gateway_push in gateway_push_qs.iterator(chunk_size=batch_size):
                  user_id = gateway_push['user_id']
                  # A user can own several tokens but gets only one system message.
                  if user_id not in saved_user_id_set:
                      saved_user_id_set.add(user_id)
                      sys_msg_list.append(SysMsgModel(
                          userID_id=user_id, title=title, msg=msg, jumpLink=link,
                          addTime=n_time, updTime=n_time))
                      # Flush in batches to keep each insert bounded.
                      if len(sys_msg_list) >= sys_msg_batch_size:
                          SysMsgModel.objects.bulk_create(sys_msg_list, batch_size=sys_msg_batch_size)
                          sys_msg_list = []
                          CUSTOMIZED_PUSH_LOGGER.info(
                              'customized_push_id:{}已保存{}条系统消息'.format(
                                  customized_push_id, len(saved_user_id_set)))
                  # Only tokens of users who have not unsubscribed are pushed to.
                  if user_id not in unsubscribed_user_ids:
                      gateway_push_list.append(gateway_push)

              # Insert whatever is left from the last partial batch.
              if sys_msg_list:
                  SysMsgModel.objects.bulk_create(sys_msg_list, batch_size=sys_msg_batch_size)
              CUSTOMIZED_PUSH_LOGGER.info(
                  'customized_push_id:{}共保存{}条系统消息,准备推送{}条'.format(
                      customized_push_id, len(saved_user_id_set), len(gateway_push_list)))

              # Send the pushes from a background thread so the caller is not blocked.
              if gateway_push_list:
                  pre_push_kwargs = {
                      'push_kwargs': push_kwargs,
                      'gateway_push_list': gateway_push_list,
                      'customized_push_id': customized_push_id
                  }
                  pre_push_thread = threading.Thread(
                      target=cls.thr_pool_push,
                      kwargs=pre_push_kwargs)
                  pre_push_thread.start()
                  CUSTOMIZED_PUSH_LOGGER.info('customized_push_id:{}推送任务已启动'.format(customized_push_id))
          except Exception as e:
              # Top-level boundary: log with the full traceback and swallow.
              CUSTOMIZED_PUSH_LOGGER.exception(
                  '定制化推送或保存数据异常,'
                  'error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

      @classmethod
      def thr_pool_push(cls, **kwargs):
          """
          Push to every token through a bounded thread pool.

          @param kwargs: push_kwargs (message payload), gateway_push_list (token rows),
                         customized_push_id (optional, only used in log messages)
          @return: None, after every push task has finished
          """
          push_kwargs = kwargs['push_kwargs']
          gateway_push_list = kwargs['gateway_push_list']
          customized_push_id = kwargs.get('customized_push_id', 'unknown')
          CUSTOMIZED_PUSH_LOGGER.info(
              'customized_push_id:{}线程池推送开始,共{}条'.format(customized_push_id, len(gateway_push_list)))

          def push_one(gateway_push_kwargs):
              # The results of executor.map() are never read, so an exception
              # raised in a worker would vanish silently; log it here instead.
              try:
                  return cls.start_push(push_kwargs, gateway_push_kwargs)
              except Exception as e:
                  CUSTOMIZED_PUSH_LOGGER.exception(
                      '定制化推送异常,'
                      'error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
                  return False

          # Cap the worker count so a large push cannot exhaust system resources;
          # tune to the server's capacity.
          max_workers = 50
          with ThreadPoolExecutor(max_workers=max_workers) as executor:
              # Leaving the with-block waits for all submitted tasks to finish.
              executor.map(push_one, gateway_push_list)
          CUSTOMIZED_PUSH_LOGGER.info(
              'customized_push_id:{}线程池推送完成'.format(customized_push_id))

      @classmethod
      def start_push(cls, push_kwargs, gateway_push_kwargs):
          """
          Push one message to one token and log the outcome.

          @param push_kwargs: dict with title, n_time, msg and icon_link
          @param gateway_push_kwargs: dict with push_type, user_id, app_bundle_id and token_val
          @return: the result of push_msg (truthy on success)
          """
          title = push_kwargs['title']
          n_time = push_kwargs['n_time']
          msg = push_kwargs['msg']
          icon_link = push_kwargs['icon_link']
          push_type = gateway_push_kwargs['push_type']
          user_id = gateway_push_kwargs['user_id']
          app_bundle_id = gateway_push_kwargs['app_bundle_id']
          token_val = gateway_push_kwargs['token_val']
          push_succeed = cls.push_msg(push_type, app_bundle_id, token_val, n_time, title, msg, icon_link)
          push_status = '成功' if push_succeed else '失败'
          CUSTOMIZED_PUSH_LOGGER.info('{}推送{},push_type:{}'.format(user_id, push_status, push_type))
          return push_succeed

      @staticmethod
      def push_msg(push_type, app_bundle_id, token_val, n_time, title, msg, icon_link):
          """
          Send one notification through the channel selected by push_type.

          @param push_type: 0 iOS, 1 FCM, 2 JPush, 3 Huawei, 4 Xiaomi, 5 vivo, 6 OPPO, 7 Meizu
          @param app_bundle_id: bundle id of the target app
          @param token_val: push token of the target device
          @param n_time: push timestamp
          @param title: notification title
          @param msg: notification text
          @param icon_link: image link or None; only passed to the iOS, FCM and Huawei channels
          @return: the channel's result; False for an unknown push_type or on any exception
          """
          push_kwargs = {
              'nickname': '',
              'event_type': 0,
              'app_bundle_id': app_bundle_id,
              'token_val': token_val,
              'msg_title': title,
              'msg_text': msg,
              'n_time': n_time,
          }
          try:
              # iOS
              if push_type == 0:
                  push_kwargs['launch_image'] = icon_link
                  return PushObject.ios_p8_push(**push_kwargs)
              # FCM
              elif push_type == 1:
                  # The FCM call receives an empty string rather than None.
                  if icon_link is None:
                      icon_link = ''
                  push_kwargs['image'] = icon_link
                  return PushObject.android_fcm_push_v1(**push_kwargs)
              # JPush
              elif push_type == 2:
                  return PushObject.android_jpush(**push_kwargs)
              # Huawei
              elif push_type == 3:
                  push_kwargs['image_url'] = icon_link
                  huawei_push_object = HuaweiPushObject()
                  return huawei_push_object.send_push_notify_message(**push_kwargs)
              # Xiaomi
              elif push_type == 4:
                  push_kwargs['channel_id'] = XM_PUSH_CHANNEL_ID['service_reminder']
                  return PushObject.android_xmpush(**push_kwargs)
              # vivo
              elif push_type == 5:
                  return PushObject.android_vivopush(**push_kwargs)
              # OPPO
              elif push_type == 6:
                  push_kwargs['channel_id'] = 'VALUE_ADDED'
                  return PushObject.android_oppopush(**push_kwargs)
              # Meizu
              elif push_type == 7:
                  return PushObject.android_meizupush(**push_kwargs)
              else:
                  return False
          except Exception as e:
              # Best effort: a failing channel must not abort the remaining pushes.
              CUSTOMIZED_PUSH_LOGGER.exception(
                  '定制化推送异常,'
                  'error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
              return False