Bladeren bron

修改异步推送消息替换线程池

zhangdongming 1 jaar geleden
bovenliggende
commit
76ef4d0fc2
3 gewijzigde bestanden met toevoegingen van 94 en 26 verwijderingen
  1. 17 21
      Controller/DetectControllerV2.py
  2. 69 0
      Object/GlobalThreadPoolObject.py
  3. 8 5
      Service/DevicePushService.py

+ 17 - 21
Controller/DetectControllerV2.py

@@ -4,25 +4,15 @@ import threading
 
 from django.http import JsonResponse
 from django.views.generic.base import View
-from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_CN, CONFIG_US
 
+from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_US
+from Object.GlobalThreadPoolObject import GlobalThreadPool
 from Object.RedisObject import RedisObject
 from Service.DevicePushService import DevicePushService
-from concurrent.futures import ThreadPoolExecutor
 
 TIME_LOGGER = logging.getLogger('time')
 ERROR_INFO_LOGGER = logging.getLogger('error_info')
 
-# 创建一个全局的线程池实例
-# 线程池最大数量
-# 1、CPU密集型:操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2。核数为4的话一般设置5或8
-# 2、IO密集型:文件操作,网络操作,数据库操作,一般线程设置为:CPU核数 / (1-0.9),核数为4的话,一般设置 40
-executor = ThreadPoolExecutor(max_workers=20)
-if CONFIG_INFO == CONFIG_US or CONFIG_INFO == CONFIG_EUR:
-    ThreadPoolExecutor(max_workers=80)
-elif CONFIG_INFO == CONFIG_CN:
-    ThreadPoolExecutor(max_workers=40)
-
 
 # 移动侦测V2接口
 class NotificationV2View(View):
@@ -147,13 +137,15 @@ class NotificationV2View(View):
             params = {'nickname': nickname, 'uid': uid, 'push_kwargs': push_kwargs, 'is_st': is_st, 'region': region,
                       'is_sys_msg': is_sys_msg, 'channel': channel, 'event_type': event_type, 'n_time': n_time,
                       'electricity': electricity, 'bucket': bucket, 'aws_s3_client': aws_s3_client,
-                      'app_push': cache_app_push, 'storage_location': storage_location, 'ai_type': ai_type, 'device_type': device_type,
+                      'app_push': cache_app_push, 'storage_location': storage_location, 'ai_type': ai_type,
+                      'device_type': device_type,
                       'dealings_type': dealings_type, 'detection': detection,
                       'app_push_config': uid_set_push_list[0]['uid_set__msg_notify'],
                       'uid_set_push_list': uid_set_push_list}
 
             # 使用全局的线程池提交推送任务
-            executor.submit(push_and_save_data, **params)
+            thread_pool = GlobalThreadPool()
+            thread_pool.submit(push_and_save_data, **params)
 
             # 异步推送消息和保存数据
             # push_thread = threading.Thread(
@@ -182,9 +174,8 @@ class NotificationV2View(View):
                 uid, n_time, event_type, json.dumps(res_data)))
             return JsonResponse(status=200, data=res_data)
         except Exception as e:
-            ERROR_INFO_LOGGER.info(
-                'V2推送接口异常,uid:{},etk:{},error_line:{},error_msg:{}'.
-                format(uid, etk, e.__traceback__.tb_lineno, repr(e)))
+            ERROR_INFO_LOGGER.info('V2推送接口异常,uid:{},etk:{},error_line:{},error_msg:{}'.
+                                   format(uid, etk, e.__traceback__.tb_lineno, repr(e)))
             data = {
                 'error_line': e.__traceback__.tb_lineno,
                 'error_msg': repr(e)
@@ -195,11 +186,16 @@ class NotificationV2View(View):
 def push_and_save_data(**params):
     uid = params['uid']
     TIME_LOGGER.info('{}开始异步存表和推送'.format(uid))
+
+    # 线程池推送消息
+    thread_pool = GlobalThreadPool()
+    thread_pool.submit(DevicePushService.push_msg, **params)
+
     # 异步推送消息
-    push_thread = threading.Thread(
-        target=DevicePushService.push_msg,
-        kwargs=params)
-    push_thread.start()
+    # push_thread = threading.Thread(
+    #     target=DevicePushService.push_msg,
+    #     kwargs=params)
+    # push_thread.start()
     # 保存推送数据
     result = DevicePushService.save_msg_push(**params)
     TIME_LOGGER.info('{}存表结果:{}'.format(uid, result))

+ 69 - 0
Object/GlobalThreadPoolObject.py

@@ -0,0 +1,69 @@
+# -*- encoding: utf-8 -*-
+"""
+@File    : GlobalThreadPoolObject.py
+@Time    : 2024/8/20 19:49
+@Author  : stephen
+@Email   : zhangdongming@asj6.wecom.work
+@Software: PyCharm
+"""
+import threading
+from concurrent.futures import ThreadPoolExecutor
+from queue import Queue
+
+from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_CN, CONFIG_US
+
+
+class GlobalThreadPool:
+    _instance = None
+    _lock = threading.Lock()
+
+    def __new__(cls):
+        if not cls._instance:
+            with cls._lock:
+                if not cls._instance:
+                    cls._instance = super(GlobalThreadPool, cls).__new__(cls)
+
+                    # 设置默认值
+                    max_workers = 20
+                    queue_size = 200000
+
+                    # 根据配置调整线程池参数 目前*2倍
+                    # 创建一个全局的线程池实例
+                    # 线程池最大数量
+                    # 1、CPU密集型:操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2。核数为4的话一般设置5或8
+                    # 2、IO密集型:文件操作,网络操作,数据库操作,一般线程设置为:CPU核数 / (1-0.9),核数为4的话,一般设置 40
+                    if CONFIG_INFO == CONFIG_US or CONFIG_INFO == CONFIG_EUR:
+                        max_workers = 200
+                    elif CONFIG_INFO == CONFIG_CN:
+                        max_workers = 100
+
+                    # 创建线程池
+                    cls._instance.executor = ThreadPoolExecutor(
+                        max_workers=max_workers,
+                        thread_name_prefix="global-thread-pool"
+                    )
+
+                    # 设置阻塞队列大小
+                    cls._instance.queue = Queue(maxsize=queue_size)
+
+        return cls._instance
+
+    def submit(self, fn, *args, **kwargs):
+        if self.queue.full():
+            raise Exception("Task queue is full, rejecting new tasks")
+        else:
+            self.queue.put(None)  # 占位符,表示提交了一个任务
+            return self.executor.submit(fn, *args, **kwargs)
+
+    def shutdown(self, wait=True):
+        self.executor.shutdown(wait)
+
+
+# 用法示例:
+class SomeService:
+    def some_method(self, **params):
+        thread_pool = GlobalThreadPool()
+        thread_pool.submit(self.some_task, **params)
+
+    def some_task(self, **params):
+        pass

+ 8 - 5
Service/DevicePushService.py

@@ -36,6 +36,7 @@ from Service.EquipmentInfoService import EquipmentInfoService, EQUIPMENT_INFO_DI
 from Service.HuaweiPushService.HuaweiPushService import HuaweiPushObject
 from Service.PushService import PushObject
 from django.db import close_old_connections
+from Object.GlobalThreadPoolObject import GlobalThreadPool
 
 LOGGING = logging.getLogger('info')
 TIME_LOGGER = logging.getLogger('time')
@@ -213,11 +214,12 @@ class DevicePushService:
                     params['tz'] = tz
                     params['push_type'] = push_type
 
-                    push_thread = threading.Thread(
-                        target=cls.send_app_msg_push,
-                        kwargs=params
-                    )
-                    push_thread.start()
+                    GlobalThreadPool().submit(cls.send_app_msg_push, **params)
+                    # push_thread = threading.Thread(
+                    #     target=cls.send_app_msg_push,
+                    #     kwargs=params
+                    # )
+                    # push_thread.start()
         except Exception as e:
             ERROR_INFO_LOGGER.info(
                 '推送消息线程异常,uid:{},error_line:{},error_msg:{}'
@@ -451,6 +453,7 @@ class DevicePushService:
                     push_result = PushObject.android_honorpush(**push_kwargs)
 
             if kwargs['event_type'] in [606, 607]:
+                close_old_connections()
                 # 写入日志表
                 PushLog.objects.create(uid=uid, event_type=kwargs['event_type'], created_time=int(time.time()),
                                        content=push_kwargs, push_result=push_result, push_type=push_type)