Эх сурвалжийг харах

线程池异步推送图片,移除线程阻塞判断

locky 1 жил өмнө
parent
commit
f293480331

+ 1 - 28
Object/GlobalThreadPoolObject.py

@@ -8,7 +8,6 @@
 """
 import threading
 from concurrent.futures import ThreadPoolExecutor
-from queue import Queue
 
 from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_CN, CONFIG_US
 
@@ -23,47 +22,21 @@ class GlobalThreadPool:
                 if not cls._instance:
                     cls._instance = super(GlobalThreadPool, cls).__new__(cls)
 
-                    # 设置默认值
                     max_workers = 20
-                    queue_size = 200000
-
-                    # 根据配置调整线程池参数 目前*2倍
-                    # 创建一个全局的线程池实例
-                    # 线程池最大数量
-                    # 1、CPU密集型:操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2。核数为4的话一般设置5或8
-                    # 2、IO密集型:文件操作,网络操作,数据库操作,一般线程设置为:CPU核数 / (1-0.9),核数为4的话,一般设置 40
                     if CONFIG_INFO == CONFIG_US or CONFIG_INFO == CONFIG_EUR:
                         max_workers = 200
                     elif CONFIG_INFO == CONFIG_CN:
                         max_workers = 100
 
-                    # 创建线程池
                     cls._instance.executor = ThreadPoolExecutor(
                         max_workers=max_workers,
                         thread_name_prefix="global-thread-pool"
                     )
 
-                    # 设置阻塞队列大小
-                    cls._instance.queue = Queue(maxsize=queue_size)
-
         return cls._instance
 
     def submit(self, fn, *args, **kwargs):
-        if self.queue.full():
-            raise Exception("Task queue is full, rejecting new tasks")
-        else:
-            self.queue.put(None)  # 占位符,表示提交了一个任务
-            return self.executor.submit(fn, *args, **kwargs)
+        return self.executor.submit(fn, *args, **kwargs)
 
     def shutdown(self, wait=True):
         self.executor.shutdown(wait)
-
-
-# 用法示例:
-class SomeService:
-    def some_method(self, **params):
-        thread_pool = GlobalThreadPool()
-        thread_pool.submit(self.some_task, **params)
-
-    def some_task(self, **params):
-        pass

+ 10 - 4
Service/DevicePushService.py

@@ -394,12 +394,18 @@ class DevicePushService:
                 else:
                     key = '{}/{}/{}_0.jpeg'.format(kwargs['uid'], kwargs['channel'], kwargs['n_time'])
                 # 开始异步推送图片
-                push_thread = threading.Thread(target=cls.async_send_picture_push, args=(
+                thread_pool = GlobalThreadPool()
+                thread_pool.submit(cls.async_send_picture_push, (
                     push_type, kwargs['aws_s3_client'], kwargs['bucket'], key,
                     kwargs['uid'], kwargs['appBundleId'], kwargs['token_val'], kwargs['event_type'], kwargs['n_time'],
-                    push_kwargs['msg_title'], push_kwargs['msg_text'], kwargs['channel'], kwargs['storage_location'],
-                    kwargs['redis_obj']))
-                push_thread.start()
+                    push_kwargs['msg_title'], push_kwargs['msg_text'], kwargs['channel'], kwargs['storage_location']))
+
+                # push_thread = threading.Thread(target=cls.async_send_picture_push, args=(
+                #     push_type, kwargs['aws_s3_client'], kwargs['bucket'], key,
+                #     kwargs['uid'], kwargs['appBundleId'], kwargs['token_val'], kwargs['event_type'], kwargs['n_time'],
+                #     push_kwargs['msg_title'], push_kwargs['msg_text'], kwargs['channel'], kwargs['storage_location']))
+                # push_thread.start()
+
                 push_result = True
 
             # 不推图