Kaynağa Gözat

网关推送和场景日志推送使用异步推送

locky 1 yıl önce
ebeveyn
işleme
bab60cba2b
1 değiştirilmiş dosya ile 99 ekleme ve 59 silme
  1. 99 59
      Controller/gatewayController.py

+ 99 - 59
Controller/gatewayController.py

@@ -4,6 +4,7 @@
 @Time : 2022/5/9 10:51
 @File :gatewayController.py
 """
+import threading
 import time
 
 from django.views.generic.base import View
@@ -125,42 +126,18 @@ class GatewayView(View):
                 equipment_info_kwargs['device_user_id'] = user_id
                 equipment_info_list.append(equipment_info_model(**equipment_info_kwargs))
 
-                # 查询推送配置数据
-                gateway_push_qs = GatewayPush.objects.filter(user_id=user_id, logout=False). \
-                    values('user_id', 'app_bundle_id', 'app_type', 'push_type', 'token_val', 'm_code', 'lang', 'm_code',
-                           'tz')
-                if not gateway_push_qs.exists():
-                    continue
-
-                kwargs = {
+                # 开启异步推送
+                push_kwargs = {
+                    'user_id': user_id,
                     'n_time': n_time,
                     'event_type': event_type,
                     'nickname': nickname,
+                    'alarm': alarm,
                 }
-
-                # 推送到每台登录账号的手机
-                for gateway_push in gateway_push_qs:
-                    app_bundle_id = gateway_push['app_bundle_id']
-                    push_type = gateway_push['push_type']
-                    token_val = gateway_push['token_val']
-                    lang = gateway_push['lang']
-                    tz = gateway_push['tz'] if gateway_push['tz'] else 0
-
-                    # 获取推送所需数据
-                    msg_title = PushObject.get_msg_title(nickname)
-                    msg_text = PushObject.get_gateway_msg_text(n_time, tz, lang, alarm)
-
-                    kwargs['msg_title'] = msg_title
-                    kwargs['msg_text'] = msg_text
-                    kwargs['app_bundle_id'] = app_bundle_id
-                    kwargs['token_val'] = token_val
-
-                    try:
-                        # 推送消息
-                        cls.push_msg(push_type, **kwargs)
-                    except Exception as e:
-                        LOGGER.info('网关推送消息异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
-                        continue
+                push_thread = threading.Thread(
+                    target=cls.gateway_push_msg,
+                    kwargs=push_kwargs)
+                push_thread.start()
 
             if equipment_info_list:
                 equipment_info_model.objects.bulk_create(equipment_info_list)
@@ -331,38 +308,101 @@ class GatewayView(View):
                        'tz')
             if not gateway_push_qs.exists():
                 return response.json(174)
-            for task in tasks:
-                event_type = task['event_type']
-                if event_type == '1001':
-                    kwargs = {
-                        'n_time': n_time,
-                        'event_type': event_type,
-                        'nickname': nickname,
-                    }
-                    event_info = task['value']
-                    # 推送到每台登录账号的手机
-                    for gateway_push in gateway_push_qs:
-                        app_bundle_id = gateway_push['app_bundle_id']
-                        push_type = gateway_push['push_type']
-                        token_val = gateway_push['token_val']
-
-                        kwargs['msg_title'] = PushObject.get_msg_title(nickname)
-                        kwargs['msg_text'] = event_info
-                        kwargs['app_bundle_id'] = app_bundle_id
-                        kwargs['token_val'] = token_val
-
-                        try:
-                            # 推送消息
-                            cls.push_msg(push_type, **kwargs)
-                        except Exception as e:
-                            LOGGER.info(
-                                '场景日志推送消息异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
-                            continue
+
+            # 开启异步推送
+            push_kwargs = {
+                'tasks': tasks,
+                'n_time': n_time,
+                'nickname': nickname,
+                'gateway_push_qs': gateway_push_qs
+            }
+            push_thread = threading.Thread(
+                target=cls.scene_msg_push,
+                kwargs=push_kwargs)
+            push_thread.start()
+
             return response.json(0)
         except Exception as e:
             LOGGER.info('---场景日志推送接口异常--- {}'.format(repr(e)))
             return response.json(500, 'error_ine:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
+    @classmethod
+    def gateway_push_msg(cls, **push_kwargs):
+        user_id = push_kwargs['user_id']
+        # 查询推送配置数据
+        gateway_push_qs = GatewayPush.objects.filter(user_id=user_id, logout=False). \
+            values('user_id', 'app_bundle_id', 'app_type', 'push_type', 'token_val', 'm_code', 'lang', 'm_code',
+                   'tz')
+        if gateway_push_qs.exists():
+            n_time = push_kwargs['n_time']
+            event_type = push_kwargs['event_type']
+            nickname = push_kwargs['nickname']
+            alarm = push_kwargs['alarm']
+
+            kwargs = {
+                'n_time': n_time,
+                'event_type': event_type,
+                'nickname': nickname,
+            }
+
+            # 推送到每台登录账号的手机
+            for gateway_push in gateway_push_qs:
+                app_bundle_id = gateway_push['app_bundle_id']
+                push_type = gateway_push['push_type']
+                token_val = gateway_push['token_val']
+                lang = gateway_push['lang']
+                tz = gateway_push['tz'] if gateway_push['tz'] else 0
+
+                # 获取推送所需数据
+                msg_title = PushObject.get_msg_title(nickname)
+                msg_text = PushObject.get_gateway_msg_text(n_time, tz, lang, alarm)
+
+                kwargs['msg_title'] = msg_title
+                kwargs['msg_text'] = msg_text
+                kwargs['app_bundle_id'] = app_bundle_id
+                kwargs['token_val'] = token_val
+
+                try:
+                    # 推送消息
+                    cls.push_msg(push_type, **kwargs)
+                except Exception as e:
+                    LOGGER.info('网关推送消息异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+                    continue
+
+    @classmethod
+    def scene_msg_push(cls, **push_kwargs):
+        tasks = push_kwargs['tasks']
+        n_time = push_kwargs['n_time']
+        nickname = push_kwargs['nickname']
+        gateway_push_qs = push_kwargs['gateway_push_qs']
+        for task in tasks:
+            event_type = task['event_type']
+            if event_type == '1001':
+                kwargs = {
+                    'n_time': n_time,
+                    'event_type': event_type,
+                    'nickname': nickname,
+                }
+                event_info = task['value']
+                # 推送到每台登录账号的手机
+                for gateway_push in gateway_push_qs:
+                    app_bundle_id = gateway_push['app_bundle_id']
+                    push_type = gateway_push['push_type']
+                    token_val = gateway_push['token_val']
+
+                    kwargs['msg_title'] = PushObject.get_msg_title(nickname)
+                    kwargs['msg_text'] = event_info
+                    kwargs['app_bundle_id'] = app_bundle_id
+                    kwargs['token_val'] = token_val
+
+                    try:
+                        # 推送消息
+                        cls.push_msg(push_type, **kwargs)
+                    except Exception as e:
+                        LOGGER.info(
+                            '场景日志推送消息异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+                        continue
+
     @staticmethod
     def push_msg(push_type, **kwargs):
         """