123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- # -*- encoding: utf-8 -*-
- """
- @File : GlobalThreadPoolObject.py
- @Time : 2024/8/20 19:49
- @Author : stephen
- @Email : zhangdongming@asj6.wecom.work
- @Software: PyCharm
- """
- import threading
- from concurrent.futures import ThreadPoolExecutor
- from queue import Queue
- from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_CN, CONFIG_US
- class GlobalThreadPool:
- _instance = None
- _lock = threading.Lock()
- def __new__(cls):
- if not cls._instance:
- with cls._lock:
- if not cls._instance:
- cls._instance = super(GlobalThreadPool, cls).__new__(cls)
- # 设置默认值
- max_workers = 20
- queue_size = 200000
- # 根据配置调整线程池参数 目前*2倍
- # 创建一个全局的线程池实例
- # 线程池最大数量
- # 1、CPU密集型:操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2。核数为4的话一般设置5或8
- # 2、IO密集型:文件操作,网络操作,数据库操作,一般线程设置为:CPU核数 / (1-0.9),核数为4的话,一般设置 40
- if CONFIG_INFO == CONFIG_US or CONFIG_INFO == CONFIG_EUR:
- max_workers = 200
- elif CONFIG_INFO == CONFIG_CN:
- max_workers = 100
- # 创建线程池
- cls._instance.executor = ThreadPoolExecutor(
- max_workers=max_workers,
- thread_name_prefix="global-thread-pool"
- )
- # 设置阻塞队列大小
- cls._instance.queue = Queue(maxsize=queue_size)
- return cls._instance
- def submit(self, fn, *args, **kwargs):
- if self.queue.full():
- raise Exception("Task queue is full, rejecting new tasks")
- else:
- self.queue.put(None) # 占位符,表示提交了一个任务
- return self.executor.submit(fn, *args, **kwargs)
- def shutdown(self, wait=True):
- self.executor.shutdown(wait)
- # 用法示例:
- class SomeService:
- def some_method(self, **params):
- thread_pool = GlobalThreadPool()
- thread_pool.submit(self.some_task, **params)
- def some_task(self, **params):
- pass
|