Pārlūkot izejas kodu

消息推送异步线程替换线程池

zhangdongming 1 gadu atpakaļ
vecāks
revīzija
1d73539278
2 mainītis faili ar 89 papildinājumiem un 68 dzēšanām
  1. 22 7
      Controller/DetectControllerV2.py
  2. 67 61
      Service/DevicePushService.py

+ 22 - 7
Controller/DetectControllerV2.py

@@ -4,11 +4,22 @@ import threading
 
 from django.http import JsonResponse
 from django.views.generic.base import View
-from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_CN
+from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_CN, CONFIG_US
 
 from Object.RedisObject import RedisObject
 from Service.DevicePushService import DevicePushService
+from concurrent.futures import ThreadPoolExecutor
+
 TIME_LOGGER = logging.getLogger('time')
+# 创建一个全局的线程池实例
+# 线程池最大数量
+# 1、CPU密集型:操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2。核数为4的话一般设置5或8
+# 2、IO密集型:文件操作,网络操作,数据库操作,一般线程设置为:CPU核数 / (1-0.9),核数为4的话,一般设置 40
+executor = ThreadPoolExecutor(max_workers=20)
+if CONFIG_INFO == CONFIG_US or CONFIG_INFO == CONFIG_EUR:
+    ThreadPoolExecutor(max_workers=80)
+elif CONFIG_INFO == CONFIG_CN:
+    ThreadPoolExecutor(max_workers=40)
 
 
 # 移动侦测V2接口
@@ -50,7 +61,7 @@ class NotificationV2View(View):
         dealings_type = int(request_dict.get('dealingsType', 0))
         detection = int(request_dict.get('detection', 0))
         button = request_dict.get('button', '1')
-
+        uid = ""
         # 参数校验
         if not all([channel, n_time]):
             return JsonResponse(status=200, data={'code': 444, 'msg': 'param is wrong'})
@@ -129,11 +140,14 @@ class NotificationV2View(View):
                       'app_push_config': uid_set_push_list[0]['uid_set__msg_notify'],
                       'uid_set_push_list': uid_set_push_list}
 
+            # 使用全局的线程池提交推送任务
+            executor.submit(push_and_save_data, **params)
+
             # 异步推送消息和保存数据
-            push_thread = threading.Thread(
-                target=push_and_save_data,
-                kwargs=params)
-            push_thread.start()
+            # push_thread = threading.Thread(
+            #     target=push_and_save_data,
+            #     kwargs=params)
+            # push_thread.start()
 
             # 视频通话不返回图片链接
             if event_type == 607:
@@ -156,7 +170,8 @@ class NotificationV2View(View):
                 uid, n_time, event_type, json.dumps(res_data)))
             return JsonResponse(status=200, data=res_data)
         except Exception as e:
-            TIME_LOGGER.info('V2推送接口异常,error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+            TIME_LOGGER.info('V2推送接口异常uid:{},etk:{},error_line:{},error_msg:{}'.
+                             format(uid, etk, e.__traceback__.tb_lineno, repr(e)))
             data = {
                 'error_line': e.__traceback__.tb_lineno,
                 'error_msg': repr(e)

+ 67 - 61
Service/DevicePushService.py

@@ -32,8 +32,10 @@ from Service.CommonService import CommonService
 from Service.EquipmentInfoService import EquipmentInfoService, EQUIPMENT_INFO_DICT
 from Service.HuaweiPushService.HuaweiPushService import HuaweiPushObject
 from Service.PushService import PushObject
+from django.db import close_old_connections
 
-LOGGING = logging.getLogger('time')
+LOGGING = logging.getLogger('info')
+TIME_LOGGER = logging.getLogger('time')
 
 
 class DevicePushService:
@@ -154,64 +156,68 @@ class DevicePushService:
         @param params: 推送参数
         @return: bool
         """
-        uid = params['uid']
-        params['event_tag'] = cls.get_event_tag(params['ai_type'], params['event_type'], params['detection'])
-        is_app_push = True if params['event_type'] in [606, 607] else \
-            cls.is_send_app_push(
-                params['event_type'], params['event_tag'], params['app_push_config'], params['app_push'], uid)
-
-        # 低功耗产品推送,休眠702、低电量704提醒,并且detection=0,0标识单事件类型,1标识多事件类型
-        is_app_push = True if params['event_type'] in [702, 704] and params['detection'] == 0 else is_app_push
-        redis_obj = RedisObject(3)
-        # 推送
-        if is_app_push:
-            msg_key = 'PUSH:MSG:IMAGE:{}:{}:{}'.format(params['uid'], params['channel'], params['n_time'])
-            d_params = {'is_st': params['is_st'], 'storage_location': params['storage_location'],
-                        'event_tag': params['event_tag'], 'event_type': params['event_type']}
-            redis_obj.set_data(msg_key, json.dumps(d_params), 60)
-
-            push_kwargs = params['push_kwargs']
-            for up in params['uid_set_push_list']:
-                push_type = up['push_type']
-                lang = up['lang']
-                tz = up['tz']
-                if tz is None or tz == '':
-                    tz = 0
-                if params['event_type'] in [606, 607] and push_type in [5, 6]:
-                    push_kwargs['jg_token_val'] = up['jg_token_val']
-                else:
-                    if 'jg_token_val' in push_kwargs:
-                        push_kwargs.pop('jg_token_val')
-
-                appBundleId = up['appBundleId']
-                token_val = up['token_val']
-                # 发送标题
-                msg_title = cls.get_msg_title(nickname=params['nickname'])
-                # 发送内容
-                msg_text = cls.get_msg_text(channel=params['channel'], n_time=params['n_time'], lang=lang, tz=tz,
-                                            event_type=params['event_type'], ai_type=params['ai_type'],
-                                            device_type=params['device_type'], electricity=params['electricity'],
-                                            dealings_type=params['dealings_type'], event_tag=params['event_tag']
-                                            )
-
-                # 补齐推送参数
-                push_kwargs['appBundleId'] = appBundleId
-                push_kwargs['token_val'] = token_val
-                push_kwargs['msg_title'] = msg_title
-                push_kwargs['msg_text'] = msg_text
-                params['push_kwargs'] = push_kwargs
-                params['appBundleId'] = appBundleId
-                params['token_val'] = token_val
-                params['lang'] = lang
-                params['tz'] = tz
-                params['push_type'] = push_type
-                params['redis_obj'] = redis_obj
-
-                push_thread = threading.Thread(
-                    target=cls.send_app_msg_push,
-                    kwargs=params
-                )
-                push_thread.start()
+        try:
+            uid = params['uid']
+            params['event_tag'] = cls.get_event_tag(params['ai_type'], params['event_type'], params['detection'])
+            is_app_push = True if params['event_type'] in [606, 607] else \
+                cls.is_send_app_push(
+                    params['event_type'], params['event_tag'], params['app_push_config'], params['app_push'], uid)
+
+            # 低功耗产品推送,休眠702、低电量704提醒,并且detection=0,0标识单事件类型,1标识多事件类型
+            is_app_push = True if params['event_type'] in [702, 704] and params['detection'] == 0 else is_app_push
+            redis_obj = RedisObject(3)
+            # 推送
+            if is_app_push:
+                msg_key = 'PUSH:MSG:IMAGE:{}:{}:{}'.format(params['uid'], params['channel'], params['n_time'])
+                d_params = {'is_st': params['is_st'], 'storage_location': params['storage_location'],
+                            'event_tag': params['event_tag'], 'event_type': params['event_type']}
+                redis_obj.set_data(msg_key, json.dumps(d_params), 60)
+
+                push_kwargs = params['push_kwargs']
+                for up in params['uid_set_push_list']:
+                    push_type = up['push_type']
+                    lang = up['lang']
+                    tz = up['tz']
+                    if tz is None or tz == '':
+                        tz = 0
+                    if params['event_type'] in [606, 607] and push_type in [5, 6]:
+                        push_kwargs['jg_token_val'] = up['jg_token_val']
+                    else:
+                        if 'jg_token_val' in push_kwargs:
+                            push_kwargs.pop('jg_token_val')
+
+                    appBundleId = up['appBundleId']
+                    token_val = up['token_val']
+                    # 发送标题
+                    msg_title = cls.get_msg_title(nickname=params['nickname'])
+                    # 发送内容
+                    msg_text = cls.get_msg_text(channel=params['channel'], n_time=params['n_time'], lang=lang, tz=tz,
+                                                event_type=params['event_type'], ai_type=params['ai_type'],
+                                                device_type=params['device_type'], electricity=params['electricity'],
+                                                dealings_type=params['dealings_type'], event_tag=params['event_tag']
+                                                )
+
+                    # 补齐推送参数
+                    push_kwargs['appBundleId'] = appBundleId
+                    push_kwargs['token_val'] = token_val
+                    push_kwargs['msg_title'] = msg_title
+                    push_kwargs['msg_text'] = msg_text
+                    params['push_kwargs'] = push_kwargs
+                    params['appBundleId'] = appBundleId
+                    params['token_val'] = token_val
+                    params['lang'] = lang
+                    params['tz'] = tz
+                    params['push_type'] = push_type
+                    params['redis_obj'] = redis_obj
+
+                    push_thread = threading.Thread(
+                        target=cls.send_app_msg_push,
+                        kwargs=params
+                    )
+                    push_thread.start()
+        except Exception as e:
+            TIME_LOGGER.info('APP通知V2推送接口异常uid:{},error_line:{},error_msg:{}'
+                             .format(params['uid'], e.__traceback__.tb_lineno, repr(e)))
 
     @classmethod
     def save_msg_push(cls, **params):
@@ -273,7 +279,7 @@ class DevicePushService:
                         redis_obj.rpush(equipment_info_key, equipment_info_value)
                         LOGGING.info('***保存推送消息uid:{},time:{},user_id:{}'.format(uid, params['n_time'], user_id))
                     saved_user_id_list.append(user_id)
-
+            close_old_connections()
             # 写入系统消息
             if sys_msg_list:
                 SysMsgModel.objects.bulk_create(sys_msg_list)
@@ -304,7 +310,7 @@ class DevicePushService:
 
             return True
         except Exception as e:
-            LOGGING.info('推送消息或存表异常uid{}: error_line:{}, error_msg:{}'.format(uid, e.__traceback__.tb_lineno, repr(e)))
+            LOGGING.info('推送消息或存表异常uid:{}, error_line:{}, error_msg:{}'.format(uid, e.__traceback__.tb_lineno, repr(e)))
             return False
 
     @classmethod