Prechádzať zdrojové kódy

error_info日志文件记录推送异常,线程池推送消息

locky 1 rok pred
rodič
commit
848cc0e0bf

+ 15 - 1
AnsjerPush/cn_config/cn_formal_settings.py

@@ -234,6 +234,15 @@ LOGGING = {
             'formatter': 'standard',
             'encoding': 'utf-8',
         },
+        'error_info': {
+            'level': 'INFO',
+            'class': 'logging.handlers.RotatingFileHandler',
+            'filename': BASE_DIR + '/static/log/error_info/info.log',
+            'backupCount': 10,
+            'maxBytes': 1024 * 1024 * 2 * 100,  # 100M
+            'formatter': 'standard',
+            'encoding': 'utf-8',
+        },
     },
     'loggers': {
         'django': {
@@ -261,6 +270,11 @@ LOGGING = {
             'handlers': ['customized_push'],
             'level': 'INFO',
             'propagate': False
-        }
+        },
+        'error_info': {
+            'handlers': ['error_info'],
+            'level': 'INFO',
+            'propagate': False
+        },
     }
 }

+ 79 - 15
AnsjerPush/dev_config/local_settings.py

@@ -176,9 +176,11 @@ LOGGING = {
     'disable_existing_loggers': True,
     'formatters': {
         'error_format': {
-            # 'format': '{"asctime":"%(asctime)s","thread":"%(threadName)s:%(thread)d","errorline":"%(lineno)d","errorlevel":"%(levelname)s","errorcontent":"%(message)s"}'
             'format': '%(asctime)s %(threadName)s %(thread)d %(lineno)d %(levelname)s %(message)s'
         },
+        'standard': {
+            'format': '[%(asctime)s] [%(filename)s:%(lineno)d] [%(module)s:%(funcName)s] '
+                      '[%(levelname)s]- %(message)s'},
     },
     'filters': {
     },
@@ -191,31 +193,93 @@ LOGGING = {
         'default': {
             'level': 'ERROR',
             'class': 'logging.handlers.RotatingFileHandler',
-            'filename': BASE_DIR + '/static/log/error.log',
+            'filename': BASE_DIR + '/static/log/error/error.log',
             'maxBytes': 1024 * 1024 * 5,  # 5 MB
             'backupCount': 5,
             'formatter': 'error_format',
         },
         'console': {
-            # 'level': 'ERROR',
-            'level': 'DEBUG',
+            'level': 'ERROR',
             'class': 'logging.StreamHandler',
             'formatter': 'error_format'
         },
+        'info': {
+            'level': 'INFO',
+            'class': 'logging.handlers.RotatingFileHandler',
+            'filename': BASE_DIR + '/static/log/info/info.log',
+            'backupCount': 10,
+            'maxBytes': 1024 * 1024 * 2 * 1024,  # 2G
+            'formatter': 'standard',
+            'encoding': 'utf-8',
+        },
+        'time': {
+            'level': 'INFO',
+            'class': 'logging.handlers.RotatingFileHandler',
+            'filename': BASE_DIR + '/static/log/time/info.log',
+            'backupCount': 10,
+            'maxBytes': 1024 * 1024 * 2 * 1024,  # 2G
+            'formatter': 'standard',
+            'encoding': 'utf-8',
+        },
+        'v1_push': {
+            'level': 'INFO',
+            'class': 'logging.handlers.RotatingFileHandler',
+            'filename': BASE_DIR + '/static/log/v1log/info.log',
+            'backupCount': 10,
+            'maxBytes': 1024 * 1024 * 2 * 1024,  # 2G
+            'formatter': 'standard',
+            'encoding': 'utf-8',
+        },
+        'customized_push': {
+            'level': 'INFO',
+            'class': 'logging.handlers.RotatingFileHandler',
+            'filename': BASE_DIR + '/static/log/customized_push/info.log',
+            'backupCount': 10,
+            'maxBytes': 1024 * 1024 * 2 * 100,  # 100M
+            'formatter': 'standard',
+            'encoding': 'utf-8',
+        },
+        'error_info': {
+            'level': 'INFO',
+            'class': 'logging.handlers.RotatingFileHandler',
+            'filename': BASE_DIR + '/static/log/error_info/info.log',
+            'backupCount': 10,
+            'maxBytes': 1024 * 1024 * 2 * 100,  # 100M
+            'formatter': 'standard',
+            'encoding': 'utf-8',
+        },
     },
     'loggers': {
         'django': {
             'handlers': ['default', 'console'],
-            # 'handlers': ['mail_admins','default','console'],
-            # 'level': 'ERROR',
-            'level': 'DEBUG',
-            'propagate': True
-        },
-        # 'django.db.backends': {
-        #     'handlers': ['console'],
-        #     'propagate': True,
-        #     'level': 'DEBUG',
-        # },
+            'level': 'ERROR',
+            'propagate': False
+        },
+        # log 调用时需要当作参数传入
+        'info': {
+            'handlers': ['info'],
+            'level': 'INFO',
+            'propagate': False
+        },
+        'time': {
+            'handlers': ['time'],
+            'level': 'INFO',
+            'propagate': False
+        },
+        'v1_push': {
+            'handlers': ['v1_push'],
+            'level': 'INFO',
+            'propagate': False
+        },
+        'customized_push': {
+            'handlers': ['customized_push'],
+            'level': 'INFO',
+            'propagate': False
+        },
+        'error_info': {
+            'handlers': ['error_info'],
+            'level': 'INFO',
+            'propagate': False
+        },
     }
 }
-

+ 15 - 1
AnsjerPush/eur_config/eur_formal_settings.py

@@ -228,6 +228,15 @@ LOGGING = {
             'formatter': 'standard',
             'encoding': 'utf-8',
         },
+        'error_info': {
+            'level': 'INFO',
+            'class': 'logging.handlers.RotatingFileHandler',
+            'filename': BASE_DIR + '/static/log/error_info/info.log',
+            'backupCount': 10,
+            'maxBytes': 1024 * 1024 * 2 * 100,  # 100M
+            'formatter': 'standard',
+            'encoding': 'utf-8',
+        },
     },
     'loggers': {
         'django': {
@@ -255,6 +264,11 @@ LOGGING = {
             'handlers': ['customized_push'],
             'level': 'INFO',
             'propagate': False
-        }
+        },
+        'error_info': {
+            'handlers': ['error_info'],
+            'level': 'INFO',
+            'propagate': False
+        },
     }
 }

+ 21 - 9
AnsjerPush/test_config/test_settings.py

@@ -53,15 +53,15 @@ WSGI_APPLICATION = 'AnsjerPush.wsgi.application'
 
 # 业务数据库
 DATABASE_DATA = 'ansjer_server_test'
-SERVER_HOST = 'server-cn.cvp7gfpnmziz.rds.cn-northwest-1.amazonaws.com.cn'
-DATABASES_USER = 'aws_rds'
-DATABASES_PASS = 'H84NQ8NARr9e39tn6aW5'
+SERVER_HOST = '124.70.222.33'
+DATABASES_USER = 'root'
+DATABASES_PASS = 'Ansjer123'
 
 # 推送数据库
 DATABASE_DATA2 = 'ansjer_push_test'
-SERVER_HOST2 = 'push-cn.cvp7gfpnmziz.rds.cn-northwest-1.amazonaws.com.cn'
-DATABASES_USER2 = 'aws_rds'
-DATABASES_PASS2 = 'Dil02uKDyd5Mxv7fhhHJ'
+SERVER_HOST2 = '124.70.222.33'
+DATABASES_USER2 = 'root'
+DATABASES_PASS2 = 'Ansjer123'
 
 DATABASES = {
     'default': {
@@ -72,7 +72,6 @@ DATABASES = {
         'HOST': SERVER_HOST,
         'PORT': '3306',
         'AUTOCOMMIT': True,
-        'CONN_MAX_AGE': 60,
         'OPTIONS': {'charset': 'utf8mb4',
                     'use_unicode': True,
                     'init_command': "SET sql_mode='STRICT_TRANS_TABLES'"
@@ -86,7 +85,6 @@ DATABASES = {
         'HOST': SERVER_HOST2,
         'PORT': '3306',
         'AUTOCOMMIT': True,
-        'CONN_MAX_AGE': 60,
         'OPTIONS': {'charset': 'utf8mb4',
                     'use_unicode': True,
                     'init_command': "SET sql_mode='STRICT_TRANS_TABLES'"
@@ -233,6 +231,15 @@ LOGGING = {
             'formatter': 'standard',
             'encoding': 'utf-8',
         },
+        'error_info': {
+            'level': 'INFO',
+            'class': 'logging.handlers.RotatingFileHandler',
+            'filename': BASE_DIR + '/static/log/error_info/info.log',
+            'backupCount': 10,
+            'maxBytes': 1024 * 1024 * 2 * 100,  # 100M
+            'formatter': 'standard',
+            'encoding': 'utf-8',
+        },
     },
     'loggers': {
         'django': {
@@ -260,6 +267,11 @@ LOGGING = {
             'handlers': ['customized_push'],
             'level': 'INFO',
             'propagate': False
-        }
+        },
+        'error_info': {
+            'handlers': ['error_info'],
+            'level': 'INFO',
+            'propagate': False
+        },
     }
 }

+ 15 - 1
AnsjerPush/us_config/formal_settings.py

@@ -235,6 +235,15 @@ LOGGING = {
             'formatter': 'standard',
             'encoding': 'utf-8',
         },
+        'error_info': {
+            'level': 'INFO',
+            'class': 'logging.handlers.RotatingFileHandler',
+            'filename': BASE_DIR + '/static/log/error_info/info.log',
+            'backupCount': 10,
+            'maxBytes': 1024 * 1024 * 2 * 100,  # 100M
+            'formatter': 'standard',
+            'encoding': 'utf-8',
+        },
     },
     'loggers': {
         'django': {
@@ -262,6 +271,11 @@ LOGGING = {
             'handlers': ['customized_push'],
             'level': 'INFO',
             'propagate': False
-        }
+        },
+        'error_info': {
+            'handlers': ['error_info'],
+            'level': 'INFO',
+            'propagate': False
+        },
     }
 }

+ 16 - 18
Controller/DetectControllerV2.py

@@ -4,22 +4,14 @@ import threading
 
 from django.http import JsonResponse
 from django.views.generic.base import View
-from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_CN, CONFIG_US
 
+from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_US, CONFIG_CN
+from Object.GlobalThreadPoolObject import GlobalThreadPool
 from Object.RedisObject import RedisObject
 from Service.DevicePushService import DevicePushService
-from concurrent.futures import ThreadPoolExecutor
 
 TIME_LOGGER = logging.getLogger('time')
-# 创建一个全局的线程池实例
-# 线程池最大数量
-# 1、CPU密集型:操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2。核数为4的话一般设置5或8
-# 2、IO密集型:文件操作,网络操作,数据库操作,一般线程设置为:CPU核数 / (1-0.9),核数为4的话,一般设置 40
-executor = ThreadPoolExecutor(max_workers=20)
-if CONFIG_INFO == CONFIG_US or CONFIG_INFO == CONFIG_EUR:
-    ThreadPoolExecutor(max_workers=80)
-elif CONFIG_INFO == CONFIG_CN:
-    ThreadPoolExecutor(max_workers=40)
+ERROR_INFO_LOGGER = logging.getLogger('error_info')
 
 
 # 移动侦测V2接口
@@ -141,7 +133,8 @@ class NotificationV2View(View):
                       'uid_set_push_list': uid_set_push_list}
 
             # 使用全局的线程池提交推送任务
-            executor.submit(push_and_save_data, **params)
+            thread_pool = GlobalThreadPool()
+            thread_pool.submit(push_and_save_data, **params)
 
             # 异步推送消息和保存数据
             # push_thread = threading.Thread(
@@ -170,8 +163,8 @@ class NotificationV2View(View):
                 uid, n_time, event_type, json.dumps(res_data)))
             return JsonResponse(status=200, data=res_data)
         except Exception as e:
-            TIME_LOGGER.info('V2推送接口异常uid:{},etk:{},error_line:{},error_msg:{}'.
-                             format(uid, etk, e.__traceback__.tb_lineno, repr(e)))
+            ERROR_INFO_LOGGER.info('V2推送接口异常,uid:{},etk:{},error_line:{},error_msg:{}'.
+                                   format(uid, etk, e.__traceback__.tb_lineno, repr(e)))
             data = {
                 'error_line': e.__traceback__.tb_lineno,
                 'error_msg': repr(e)
@@ -182,11 +175,16 @@ class NotificationV2View(View):
 def push_and_save_data(**params):
     uid = params['uid']
     TIME_LOGGER.info('{}开始异步存表和推送'.format(uid))
+
+    # 线程池推送消息
+    thread_pool = GlobalThreadPool()
+    thread_pool.submit(DevicePushService.push_msg, **params)
+
     # 异步推送消息
-    push_thread = threading.Thread(
-        target=DevicePushService.push_msg,
-        kwargs=params)
-    push_thread.start()
+    # push_thread = threading.Thread(
+    #     target=DevicePushService.push_msg,
+    #     kwargs=params)
+    # push_thread.start()
     # 保存推送数据
     result = DevicePushService.save_msg_push(**params)
     TIME_LOGGER.info('{}存表结果:{}'.format(uid, result))

+ 69 - 0
Object/GlobalThreadPoolObject.py

@@ -0,0 +1,69 @@
+# -*- encoding: utf-8 -*-
+"""
+@File    : GlobalThreadPoolObject.py
+@Time    : 2024/8/20 19:49
+@Author  : stephen
+@Email   : zhangdongming@asj6.wecom.work
+@Software: PyCharm
+"""
+import threading
+from concurrent.futures import ThreadPoolExecutor
+from queue import Queue
+
+from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_CN, CONFIG_US
+
+
+class GlobalThreadPool:
+    _instance = None
+    _lock = threading.Lock()
+
+    def __new__(cls):
+        if not cls._instance:
+            with cls._lock:
+                if not cls._instance:
+                    cls._instance = super(GlobalThreadPool, cls).__new__(cls)
+
+                    # 设置默认值
+                    max_workers = 20
+                    queue_size = 200000
+
+                    # 根据配置调整线程池参数 目前*2倍
+                    # 创建一个全局的线程池实例
+                    # 线程池最大数量
+                    # 1、CPU密集型:操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2。核数为4的话一般设置5或8
+                    # 2、IO密集型:文件操作,网络操作,数据库操作,一般线程设置为:CPU核数 / (1-0.9),核数为4的话,一般设置 40
+                    if CONFIG_INFO == CONFIG_US or CONFIG_INFO == CONFIG_EUR:
+                        max_workers = 200
+                    elif CONFIG_INFO == CONFIG_CN:
+                        max_workers = 100
+
+                    # 创建线程池
+                    cls._instance.executor = ThreadPoolExecutor(
+                        max_workers=max_workers,
+                        thread_name_prefix="global-thread-pool"
+                    )
+
+                    # 设置阻塞队列大小
+                    cls._instance.queue = Queue(maxsize=queue_size)
+
+        return cls._instance
+
+    def submit(self, fn, *args, **kwargs):
+        if self.queue.full():
+            raise Exception("Task queue is full, rejecting new tasks")
+        else:
+            self.queue.put(None)  # 占位符,表示提交了一个任务
+            return self.executor.submit(fn, *args, **kwargs)
+
+    def shutdown(self, wait=True):
+        self.executor.shutdown(wait)
+
+
+# 用法示例:
+class SomeService:
+    def some_method(self, **params):
+        thread_pool = GlobalThreadPool()
+        thread_pool.submit(self.some_task, **params)
+
+    def some_task(self, **params):
+        pass

+ 17 - 10
Service/DevicePushService.py

@@ -33,9 +33,11 @@ from Service.EquipmentInfoService import EquipmentInfoService, EQUIPMENT_INFO_DI
 from Service.HuaweiPushService.HuaweiPushService import HuaweiPushObject
 from Service.PushService import PushObject
 from django.db import close_old_connections
+from Object.GlobalThreadPoolObject import GlobalThreadPool
 
 LOGGING = logging.getLogger('info')
 TIME_LOGGER = logging.getLogger('time')
+ERROR_INFO_LOGGER = logging.getLogger('error_info')
 
 
 class DevicePushService:
@@ -208,16 +210,17 @@ class DevicePushService:
                     params['lang'] = lang
                     params['tz'] = tz
                     params['push_type'] = push_type
-                    params['redis_obj'] = redis_obj
 
-                    push_thread = threading.Thread(
-                        target=cls.send_app_msg_push,
-                        kwargs=params
-                    )
-                    push_thread.start()
+                    GlobalThreadPool().submit(cls.send_app_msg_push, **params)
+                    # push_thread = threading.Thread(
+                    #     target=cls.send_app_msg_push,
+                    #     kwargs=params
+                    # )
+                    # push_thread.start()
         except Exception as e:
-            TIME_LOGGER.info('APP通知V2推送接口异常uid:{},error_line:{},error_msg:{}'
-                             .format(params['uid'], e.__traceback__.tb_lineno, repr(e)))
+            ERROR_INFO_LOGGER.info(
+                '推送消息线程异常,uid:{},error_line:{},error_msg:{}'
+                .format(params['uid'], e.__traceback__.tb_lineno, repr(e)))
 
     @classmethod
     def save_msg_push(cls, **params):
@@ -310,7 +313,9 @@ class DevicePushService:
 
             return True
         except Exception as e:
-            LOGGING.info('推送消息或存表异常uid:{}, error_line:{}, error_msg:{}'.format(uid, e.__traceback__.tb_lineno, repr(e)))
+            ERROR_INFO_LOGGER.info(
+                '保存推送数据和推送消息线程异常,uid:{}, error_line:{}, error_msg:{}'.
+                format(uid, e.__traceback__.tb_lineno, repr(e)))
             return False
 
     @classmethod
@@ -441,12 +446,14 @@ class DevicePushService:
                     push_result = PushObject.android_honorpush(**push_kwargs)
 
             if kwargs['event_type'] in [606, 607]:
+                close_old_connections()
                 # 写入日志表
                 PushLog.objects.create(uid=uid, event_type=kwargs['event_type'], created_time=int(time.time()),
                                        content=push_kwargs, push_result=push_result, push_type=push_type)
             return push_result
         except Exception as e:
-            LOGGING.error('发送推送异常,error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+            ERROR_INFO_LOGGER.info(
+                '发送推送线程异常,error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
             return False
 
     @staticmethod