فهرست منبع

修改消息推送使用线程池、APP通知加异常捕获

zhangdongming 1 سال پیش
والد
کامیت
e854ff459b
2فایلهای تغییر یافته به همراه82 افزوده شده و 64 حذف شده
  1. 20 7
      Controller/DetectControllerV2.py
  2. 62 57
      Service/DevicePushService.py

+ 20 - 7
Controller/DetectControllerV2.py

@@ -8,7 +8,18 @@ from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_CN, CONFIG_US
 
 from Object.RedisObject import RedisObject
 from Service.DevicePushService import DevicePushService
+from concurrent.futures import ThreadPoolExecutor
+
 TIME_LOGGER = logging.getLogger('time')
+# 创建一个全局的线程池实例
+# 线程池最大数量
+# 1、CPU密集型:操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2。核数为4的话一般设置5或8
+# 2、IO密集型:文件操作,网络操作,数据库操作,一般线程设置为:CPU核数 / (1-0.9),核数为4的话,一般设置 40
+executor = ThreadPoolExecutor(max_workers=20)
+if CONFIG_INFO == CONFIG_US or CONFIG_INFO == CONFIG_EUR:
+    ThreadPoolExecutor(max_workers=80)
+elif CONFIG_INFO == CONFIG_CN:
+    ThreadPoolExecutor(max_workers=40)
 
 
 # 移动侦测V2接口
@@ -134,11 +145,14 @@ class NotificationV2View(View):
                       'app_push_config': uid_set_push_list[0]['uid_set__msg_notify'],
                       'uid_set_push_list': uid_set_push_list}
 
+            # 使用全局的线程池提交推送任务
+            executor.submit(push_and_save_data, **params)
+
             # 异步推送消息和保存数据
-            push_thread = threading.Thread(
-                target=push_and_save_data,
-                kwargs=params)
-            push_thread.start()
+            # push_thread = threading.Thread(
+            #     target=push_and_save_data,
+            #     kwargs=params)
+            # push_thread.start()
 
             # 视频通话不返回图片链接
             if event_type == 607:
@@ -161,9 +175,8 @@ class NotificationV2View(View):
                 uid, n_time, event_type, json.dumps(res_data)))
             return JsonResponse(status=200, data=res_data)
         except Exception as e:
-            TIME_LOGGER.info(
-                'V2推送接口异常, etk:{}, uid:{},error_line:{},error_msg:{}'.format(etk, uid, e.__traceback__.tb_lineno,
-                                                                                   repr(e)))
+            TIME_LOGGER.info('V2推送接口异常uid:{},etk:{},error_line:{},error_msg:{}'.
+                             format(uid, etk, e.__traceback__.tb_lineno, repr(e)))
             data = {
                 'error_line': e.__traceback__.tb_lineno,
                 'error_msg': repr(e)

+ 62 - 57
Service/DevicePushService.py

@@ -37,6 +37,7 @@ from Service.HuaweiPushService.HuaweiPushService import HuaweiPushObject
 from Service.PushService import PushObject
 
 LOGGING = logging.getLogger('info')
+TIME_LOGGER = logging.getLogger('time')
 
 
 class DevicePushService:
@@ -157,63 +158,67 @@ class DevicePushService:
         @param params: 推送参数
         @return: bool
         """
-        uid = params['uid']
-        params['event_tag'] = cls.get_event_tag(params['ai_type'], params['event_type'], params['detection'])
-        is_app_push = True if params['event_type'] in [606, 607] else \
-            cls.is_send_app_push(
-                params['event_type'], params['event_tag'], params['app_push_config'], params['app_push'], uid)
-
-        # 低功耗产品推送,休眠702、低电量704提醒,并且detection=0,0标识单事件类型,1标识多事件类型
-        is_app_push = True if params['event_type'] in [702, 704] and params['detection'] == 0 else is_app_push
-
-        # 推送
-        if is_app_push:
-            msg_key = 'PUSH:MSG:IMAGE:{}:{}:{}'.format(params['uid'], params['channel'], params['n_time'])
-            d_params = {'is_st': params['is_st'], 'storage_location': params['storage_location'],
-                        'event_tag': params['event_tag'], 'event_type': params['event_type']}
-            RedisObject(3).set_data(msg_key, json.dumps(d_params), 60)
-
-            push_kwargs = params['push_kwargs']
-            for up in params['uid_set_push_list']:
-                push_type = up['push_type']
-                lang = up['lang']
-                tz = up['tz']
-                if tz is None or tz == '':
-                    tz = 0
-                if params['event_type'] in [606, 607] and push_type in [5, 6]:
-                    push_kwargs['jg_token_val'] = up['jg_token_val']
-                else:
-                    if 'jg_token_val' in push_kwargs:
-                        push_kwargs.pop('jg_token_val')
-
-                appBundleId = up['appBundleId']
-                token_val = up['token_val']
-                # 发送标题
-                msg_title = cls.get_msg_title(nickname=params['nickname'])
-                # 发送内容
-                msg_text = cls.get_msg_text(channel=params['channel'], n_time=params['n_time'], lang=lang, tz=tz,
-                                            event_type=params['event_type'], ai_type=params['ai_type'],
-                                            device_type=params['device_type'], electricity=params['electricity'],
-                                            dealings_type=params['dealings_type'], event_tag=params['event_tag']
-                                            )
-
-                # 补齐推送参数
-                push_kwargs['appBundleId'] = appBundleId
-                push_kwargs['token_val'] = token_val
-                push_kwargs['msg_title'] = msg_title
-                push_kwargs['msg_text'] = msg_text
-                params['push_kwargs'] = push_kwargs
-                params['appBundleId'] = appBundleId
-                params['token_val'] = token_val
-                params['lang'] = lang
-                params['tz'] = tz
-                params['push_type'] = push_type
-
-                push_thread = threading.Thread(
-                    target=cls.send_app_msg_push,
-                    kwargs=params
-                )
-                push_thread.start()
+        try:
+            uid = params['uid']
+            params['event_tag'] = cls.get_event_tag(params['ai_type'], params['event_type'], params['detection'])
+            is_app_push = True if params['event_type'] in [606, 607] else \
+                cls.is_send_app_push(
+                    params['event_type'], params['event_tag'], params['app_push_config'], params['app_push'], uid)
+
+            # 低功耗产品推送,休眠702、低电量704提醒,并且detection=0,0标识单事件类型,1标识多事件类型
+            is_app_push = True if params['event_type'] in [702, 704] and params['detection'] == 0 else is_app_push
+
+            # 推送
+            if is_app_push:
+                msg_key = 'PUSH:MSG:IMAGE:{}:{}:{}'.format(params['uid'], params['channel'], params['n_time'])
+                d_params = {'is_st': params['is_st'], 'storage_location': params['storage_location'],
+                            'event_tag': params['event_tag'], 'event_type': params['event_type']}
+                RedisObject(3).set_data(msg_key, json.dumps(d_params), 60)
+
+                push_kwargs = params['push_kwargs']
+                for up in params['uid_set_push_list']:
+                    push_type = up['push_type']
+                    lang = up['lang']
+                    tz = up['tz']
+                    if tz is None or tz == '':
+                        tz = 0
+                    if params['event_type'] in [606, 607] and push_type in [5, 6]:
+                        push_kwargs['jg_token_val'] = up['jg_token_val']
+                    else:
+                        if 'jg_token_val' in push_kwargs:
+                            push_kwargs.pop('jg_token_val')
+
+                    appBundleId = up['appBundleId']
+                    token_val = up['token_val']
+                    # 发送标题
+                    msg_title = cls.get_msg_title(nickname=params['nickname'])
+                    # 发送内容
+                    msg_text = cls.get_msg_text(channel=params['channel'], n_time=params['n_time'], lang=lang, tz=tz,
+                                                event_type=params['event_type'], ai_type=params['ai_type'],
+                                                device_type=params['device_type'], electricity=params['electricity'],
+                                                dealings_type=params['dealings_type'], event_tag=params['event_tag']
+                                                )
+
+                    # 补齐推送参数
+                    push_kwargs['appBundleId'] = appBundleId
+                    push_kwargs['token_val'] = token_val
+                    push_kwargs['msg_title'] = msg_title
+                    push_kwargs['msg_text'] = msg_text
+                    params['push_kwargs'] = push_kwargs
+                    params['appBundleId'] = appBundleId
+                    params['token_val'] = token_val
+                    params['lang'] = lang
+                    params['tz'] = tz
+                    params['push_type'] = push_type
+
+                    push_thread = threading.Thread(
+                        target=cls.send_app_msg_push,
+                        kwargs=params
+                    )
+                    push_thread.start()
+        except Exception as e:
+            TIME_LOGGER.info('APP通知V2推送接口异常uid:{},error_line:{},error_msg:{}'
+                             .format(params['uid'], e.__traceback__.tb_lineno, repr(e)))
 
     @classmethod
     def save_msg_push(cls, **params):