# -*- encoding: utf-8 -*- """ @File : GlobalThreadPoolObject.py @Time : 2024/8/20 19:49 @Author : stephen @Email : zhangdongming@asj6.wecom.work @Software: PyCharm """ import threading from concurrent.futures import ThreadPoolExecutor from queue import Queue from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_CN, CONFIG_US class GlobalThreadPool: _instance = None _lock = threading.Lock() def __new__(cls): if not cls._instance: with cls._lock: if not cls._instance: cls._instance = super(GlobalThreadPool, cls).__new__(cls) # 设置默认值 max_workers = 20 queue_size = 200000 # 根据配置调整线程池参数 目前*2倍 # 创建一个全局的线程池实例 # 线程池最大数量 # 1、CPU密集型:操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2。核数为4的话一般设置5或8 # 2、IO密集型:文件操作,网络操作,数据库操作,一般线程设置为:CPU核数 / (1-0.9),核数为4的话,一般设置 40 if CONFIG_INFO == CONFIG_US or CONFIG_INFO == CONFIG_EUR: max_workers = 200 elif CONFIG_INFO == CONFIG_CN: max_workers = 100 # 创建线程池 cls._instance.executor = ThreadPoolExecutor( max_workers=max_workers, thread_name_prefix="global-thread-pool" ) # 设置阻塞队列大小 cls._instance.queue = Queue(maxsize=queue_size) return cls._instance def submit(self, fn, *args, **kwargs): if self.queue.full(): raise Exception("Task queue is full, rejecting new tasks") else: self.queue.put(None) # 占位符,表示提交了一个任务 return self.executor.submit(fn, *args, **kwargs) def shutdown(self, wait=True): self.executor.shutdown(wait) # 用法示例: class SomeService: def some_method(self, **params): thread_pool = GlobalThreadPool() thread_pool.submit(self.some_task, **params) def some_task(self, **params): pass