Browse Source

消息推送验证线程池

zhangdongming 3 weeks ago
parent
commit
5c7689db38
2 changed files with 30 additions and 11 deletions
  1. 11 4
      Object/GlobalThreadPoolObject.py
  2. 19 7
      Service/DevicePushService.py

+ 11 - 4
Object/GlobalThreadPoolObject.py

@@ -6,6 +6,7 @@
 @Email   : zhangdongming@asj6.wecom.work
 @Software: PyCharm
 """
+# GlobalThreadPoolObject.py
 import threading
 from concurrent.futures import ThreadPoolExecutor
 
@@ -23,19 +24,25 @@ class GlobalThreadPool:
                     cls._instance = super(GlobalThreadPool, cls).__new__(cls)
 
                     max_workers = 20
-                    if CONFIG_INFO == CONFIG_US or CONFIG_INFO == CONFIG_EUR:
-                        max_workers = 300
-                    elif CONFIG_INFO == CONFIG_CN:
+                    if CONFIG_INFO in (CONFIG_US, CONFIG_EUR):
                         max_workers = 100
+                    elif CONFIG_INFO == CONFIG_CN:
+                        max_workers = 80
 
                     cls._instance.executor = ThreadPoolExecutor(
+                        max_workers=max_workers,
                         thread_name_prefix="global-thread-pool"
                     )
 
         return cls._instance
 
     def submit(self, fn, *args, **kwargs):
-        return self.executor.submit(fn, *args, **kwargs)
+        try:
+            return self.executor.submit(fn, *args, **kwargs)
+        except RuntimeError as e:
+            # 线程池满了等异常,抛给外面去兜底(回退机制)
+            raise e
 
     def shutdown(self, wait=True):
         self.executor.shutdown(wait)
+

+ 19 - 7
Service/DevicePushService.py

@@ -6,6 +6,7 @@
 @Email   : zhangdongming@asj6.wecom.work
 @Software: PyCharm
 """
+import copy
 import datetime
 import hashlib
 import json
@@ -29,6 +30,7 @@ from AnsjerPush.config import XMPUSH_CONFIG, OPPOPUSH_CONFIG, XM_PUSH_CHANNEL_ID
 from Model.models import UidPushModel, SysMsgModel, DeviceSharePermission, DeviceChannelUserSet, \
     DeviceChannelUserPermission, UidSetModel, Device_Info, UserAudioVideoPush, PushLog
 from Object.ETkObject import ETkObject
+from Object.GlobalThreadPoolObject import GlobalThreadPool
 from Object.OCIObjectStorage import OCIObjectStorage
 from Object.RedisObject import RedisObject
 from Object.UidTokenObject import UidTokenObject
@@ -248,13 +250,23 @@ class DevicePushService:
                     params['push_type'] = push_type
                     params['redis_obj'] = redis_obj
 
-                    # GlobalThreadPool().submit(cls.send_app_msg_push, **params)
-                    push_thread = threading.Thread(
-                        target=cls.send_app_msg_push,
-                        kwargs=params
-                    )
-                    push_thread.start()
-
+                    # 使用 copy.deepcopy 保证每个任务的 params 是独立的
+                    push_params = copy.copy(params)  # 浅拷贝字典结构
+                    push_params['push_kwargs'] = copy.deepcopy(params['push_kwargs'])  # 独立修改的字段再深拷贝
+
+                    try:
+                        GlobalThreadPool().submit(cls.send_app_msg_push, **push_params)
+                    except RuntimeError as e:
+                        ERROR_INFO_LOGGER.error(
+                            '推送消息线程池异常uid:{} 线程池已满,触发降级,使用原始线程启动. error_line:{} error_msg:{}'.format(
+                                uid, e.__traceback__.tb_lineno, repr(e)
+                            )
+                        )
+                        push_thread = threading.Thread(
+                            target=cls.send_app_msg_push,
+                            kwargs=push_params
+                        )
+                        push_thread.start()
                     # 过滤相同的token_val
                     push_token_list.append(process_token)
         except Exception as e: