GlobalThreadPoolObject.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. # -*- encoding: utf-8 -*-
  2. """
  3. @File : GlobalThreadPoolObject.py
  4. @Time : 2024/8/20 19:49
  5. @Author : stephen
  6. @Email : zhangdongming@asj6.wecom.work
  7. @Software: PyCharm
  8. """
  9. import threading
  10. from concurrent.futures import ThreadPoolExecutor
  11. from queue import Queue
  12. from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_CN, CONFIG_US
  13. class GlobalThreadPool:
  14. _instance = None
  15. _lock = threading.Lock()
  16. def __new__(cls):
  17. if not cls._instance:
  18. with cls._lock:
  19. if not cls._instance:
  20. cls._instance = super(GlobalThreadPool, cls).__new__(cls)
  21. # 设置默认值
  22. max_workers = 20
  23. queue_size = 200000
  24. # 根据配置调整线程池参数 目前*2倍
  25. # 创建一个全局的线程池实例
  26. # 线程池最大数量
  27. # 1、CPU密集型:操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2。核数为4的话一般设置5或8
  28. # 2、IO密集型:文件操作,网络操作,数据库操作,一般线程设置为:CPU核数 / (1-0.9),核数为4的话,一般设置 40
  29. if CONFIG_INFO == CONFIG_US or CONFIG_INFO == CONFIG_EUR:
  30. max_workers = 200
  31. elif CONFIG_INFO == CONFIG_CN:
  32. max_workers = 100
  33. # 创建线程池
  34. cls._instance.executor = ThreadPoolExecutor(
  35. max_workers=max_workers,
  36. thread_name_prefix="global-thread-pool"
  37. )
  38. # 设置阻塞队列大小
  39. cls._instance.queue = Queue(maxsize=queue_size)
  40. return cls._instance
  41. def submit(self, fn, *args, **kwargs):
  42. if self.queue.full():
  43. raise Exception("Task queue is full, rejecting new tasks")
  44. else:
  45. self.queue.put(None) # 占位符,表示提交了一个任务
  46. return self.executor.submit(fn, *args, **kwargs)
  47. def shutdown(self, wait=True):
  48. self.executor.shutdown(wait)
  49. # 用法示例:
  50. class SomeService:
  51. def some_method(self, **params):
  52. thread_pool = GlobalThreadPool()
  53. thread_pool.submit(self.some_task, **params)
  54. def some_task(self, **params):
  55. pass