瀏覽代碼

推送数据不保存到旧表,旧推送使用异步推送,优化代码

locky 1 年之前
父節點
當前提交
418a36d958
共有 3 個文件被更改,包括 76 次插入150 次删除
  1. 38 90
      Controller/DetectController.py
  2. 7 17
      Controller/DetectControllerV2.py
  3. 31 43
      Service/DevicePushService.py

+ 38 - 90
Controller/DetectController.py

@@ -1,5 +1,6 @@
 import json
 import logging
+import threading
 
 import oss2
 from django.http import JsonResponse
@@ -7,13 +8,10 @@ from django.views.generic.base import View
 
 from AnsjerPush.config import CONFIG_INFO, CONFIG_CN
 from AnsjerPush.config import OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET
-from Model.models import SysMsgModel
 from Object.RedisObject import RedisObject
-from Object.utils import LocalDateTimeUtil
 from Service.DevicePushService import DevicePushService
-from Service.EquipmentInfoService import EquipmentInfoService
 
-logger = logging.getLogger('v1_push')
+V1_PUSH_LOGGER = logging.getLogger('v1_push')
 
 
 # 旧移动侦测接口
@@ -27,7 +25,8 @@ class NotificationView(View):
         request.encoding = 'utf-8'
         return self.validation(request.POST)
 
-    def validation(self, request_dict):
+    @staticmethod
+    def validation(request_dict):
         """
         设备触发报警消息推送
         @param request_dict:uidToken 加密uid
@@ -50,12 +49,14 @@ class NotificationView(View):
             uid = DevicePushService.decode_uid(etk, uidToken)  # 解密uid
             if len(uid) != 20 and len(uid) != 14:
                 return JsonResponse(status=200, data={'code': 404, 'msg': 'wrong uid'})
-            logger.info("旧移动侦测接口的uid:{},时间戳:{},事件类型:{}".format(uid, n_time, event_type))
+
+            V1_PUSH_LOGGER.info('旧移动侦测接口uid:{},时间戳:{},事件类型:{}'.format(uid, n_time, event_type))
+
             event_type = int(event_type)
             pkey = '{uid}_{channel}_{event_type}_ptl'.format(uid=uid, event_type=event_type, channel=channel)
             ykey = '{uid}_redis_qs'.format(uid=uid)
-            is_sys_msg = self.is_sys_msg(event_type)
-            if is_sys_msg is True:
+            is_sys_msg = DevicePushService.judge_sys_msg(event_type)
+            if is_sys_msg:
                 dkey = '{uid}_{channel}_{event_type}_flag'.format(uid=uid, event_type=event_type, channel=channel)
             else:
                 dkey = '{uid}_{channel}_flag'.format(uid=uid, channel=channel)
@@ -66,8 +67,7 @@ class NotificationView(View):
 
             # 一分钟外,推送开启状态
             detect_med_type = 0  # 0推送旧机制 1存库不推送,2推送存库
-            # 暂时注销
-            if event_type != 606:
+            if event_type not in [606, 607]:
                 if have_pkey:
                     res_data = {'code': 0, 'msg': 'Push it once a minute'}
                     return JsonResponse(status=200, data=res_data)
@@ -79,7 +79,7 @@ class NotificationView(View):
                 # 从数据库查询出来
                 uid_push_qs = DevicePushService.query_uid_push(uid, event_type)
                 if not uid_push_qs.exists():
-                    logger.info('消息推送-uid_push 数据不存在')
+                    V1_PUSH_LOGGER.info('消息推送-uid_push 数据不存在')
                     return JsonResponse(status=200, data={'code': 176, 'msg': 'no uid_push data'})
                 # 修改redis数据,并设置过期时间为10分钟
                 uid_push_list = DevicePushService.qs_to_list(uid_push_qs)
@@ -125,100 +125,48 @@ class NotificationView(View):
                       'electricity': '', 'bucket': bucket, 'app_push': have_dkey, 'storage_location': 1, 'ai_type': 0,
                       'dealings_type': 0, 'detection': 0, 'device_type': 1, 'app_push_config': '',
                       'uid_set_push_list': uid_push_list}
-            #  推送以及报警消息存库
-            result = DevicePushService.save_msg_push(**params)
-            if result['code_date'] is None:
-                result['code_date'] = {'do_apns_code': '', 'do_fcm_code': '', 'do_jpush_code': ''}
-            if detect_med_type == 1:
-                result['code_date']['do_apns_code'] = '只存库不推送'
-                result['code_date']['do_fcm_code'] = '只存库不推送'
-                result['code_date']['do_jpush_code'] = '只存库不推送'
-            if is_sys_msg:
-                SysMsgModel.objects.bulk_create(result['sys_msg_list'])
-            else:
-                if result['new_device_info_list'] and len(result['new_device_info_list']) > 0:
-                    # 根据日期获得星期几
-                    week = LocalDateTimeUtil.date_to_week(result['local_date_time'])
-                    EquipmentInfoService.equipment_info_bulk_create(week, result['new_device_info_list'])
+
+            # 异步推送消息和保存数据
+            push_thread = threading.Thread(
+                target=push_and_save_data,
+                kwargs=params)
+            push_thread.start()
+
+            res_data = {}
             if is_st == '0' or is_st == '2':
-                print("is_st=0or2")
-                for up in uid_push_list:
-                    if up['push_type'] == 0:  # ios apns
-                        up['do_apns_code'] = result['code_date']['do_apns_code']
-                    elif up['push_type'] == 1:  # android gcm
-                        up['do_fcm_code'] = result['code_date']['do_fcm_code']
-                    elif up['push_type'] == 2:  # android jpush
-                        up['do_jpush_code'] = result['code_date']['do_jpush_code']
-                    del up['push_type']
-                    del up['userID_id']
-                    del up['userID__NickName']
-                    del up['lang']
-                    del up['tz']
-                    del up['uid_set__nickname']
-                    del up['uid_set__detect_interval']
-                    del up['uid_set__detect_group']
-                return JsonResponse(status=200, data={'code': 0, 'msg': 'success 0 or 2'})
+                res_data = {'code': 0, 'msg': 'success 0 or 2'}
+                return JsonResponse(status=200, data=res_data)
 
             elif is_st == '1':
-                print("is_st=1")
-                # Endpoint以杭州为例,其它Region请按实际情况填写。
                 obj = '{uid}/{channel}/{filename}.jpeg'.format(uid=uid, channel=channel, filename=n_time)
-                # 设置此签名URL在60秒内有效。
-                url = bucket.sign_url('PUT', obj, 7200)
-                for up in uid_push_list:
-                    up['do_apns_code'] = result['code_date']['do_apns_code']
-                    up['do_fcm_code'] = result['code_date']['do_fcm_code']
-                    up['do_jpush_code'] = result['code_date']['do_jpush_code']
-                    del up['push_type']
-                    del up['userID_id']
-                    del up['userID__NickName']
-                    del up['lang']
-                    del up['tz']
-                    del up['uid_set__nickname']
-                    del up['uid_set__detect_interval']
-                    del up['uid_set__detect_group']
-                res_data = {'code': 0, 'img_push': url, 'msg': 'success'}
-                logger.info('推送响应,uid:{},n_time:{},事件类型:{},响应:{}'.format(uid, n_time, event_type, json.dumps(res_data)))
-                return JsonResponse(status=200, data=res_data)
+                url = bucket.sign_url('PUT', obj, 3600)
+                res_data = {'code': 0, 'img_push': url, 'msg': 'success 1'}
 
             elif is_st == '3':
-                print("is_st=3")
-                # 人形检测带动图
-                # Endpoint以杭州为例,其它Region请按实际情况填写。
                 img_url_list = []
                 for i in range(int(is_st)):
                     obj = '{uid}/{channel}/{filename}_{st}.jpeg'. \
                         format(uid=uid, channel=channel, filename=n_time, st=i)
-                    # 设置此签名URL在60秒内有效。
-                    url = bucket.sign_url('PUT', obj, 7200)
+                    url = bucket.sign_url('PUT', obj, 3600)
                     img_url_list.append(url)
 
-                for up in uid_push_list:
-                    up['do_apns_code'] = result['code_date']['do_apns_code']
-                    up['do_fcm_code'] = result['code_date']['do_fcm_code']
-                    up['do_jpush_code'] = result['code_date']['do_jpush_code']
-                    del up['push_type']
-                    del up['userID_id']
-                    del up['userID__NickName']
-                    del up['lang']
-                    del up['tz']
-                    del up['uid_set__nickname']
-                    del up['uid_set__detect_interval']
-                    del up['uid_set__detect_group']
-
                 res_data = {'code': 0, 'img_url_list': img_url_list, 'msg': 'success 3'}
-                logger.info('推送响应,uid:{},n_time:{},事件类型:{},响应:{}'.format(uid, n_time, event_type, json.dumps(res_data)))
-                return JsonResponse(status=200, data=res_data)
+
+            V1_PUSH_LOGGER.info('旧推送接口响应,uid:{},n_time:{},事件类型:{},响应:{}'.
+                                format(uid, n_time, event_type, json.dumps(res_data)))
+            return JsonResponse(status=200, data=res_data)
+
         except Exception as e:
-            logger.info('旧推送接口异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+            V1_PUSH_LOGGER.info('旧推送接口异常,error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
             data = {
-                'errLine': e.__traceback__.tb_lineno,
-                'errMsg': repr(e),
+                'error_line': e.__traceback__.tb_lineno,
+                'error_msg': repr(e)
             }
             return JsonResponse(status=200, data=json.dumps(data), safe=False)
 
-    def is_sys_msg(self, event_type):
-        event_type_list = [702, 703, 704]
-        if event_type in event_type_list:
-            return True
-        return False
+
+def push_and_save_data(**params):
+    V1_PUSH_LOGGER.info('{}开始异步存表和推送'.format(params['uid']))
+    # 保存推送数据和推送消息
+    result = DevicePushService.save_msg_push(**params)
+    V1_PUSH_LOGGER.info('存表和推送结果:{}'.format(result))

+ 7 - 17
Controller/DetectControllerV2.py

@@ -67,7 +67,7 @@ class NotificationV2View(View):
                 return JsonResponse(status=200, data={'code': 404, 'msg': 'wrong uid'})
             TIME_LOGGER.info('开始推送,uid:{},n_time:{},事件类型:{}'.format(uid, n_time, event_type))
             # 判断是否为系统消息
-            is_sys_msg = DevicePushService.judge_sys_msg(int(event_type))
+            is_sys_msg = DevicePushService.judge_sys_msg(event_type)
             if is_sys_msg:
                 push_interval = '{uid}_{channel}_{event_type}_flag'.format(uid=uid, channel=channel,
                                                                            event_type=event_type)
@@ -152,26 +152,16 @@ class NotificationV2View(View):
                 uid, n_time, event_type, json.dumps(result_dict)))
             return JsonResponse(status=200, data=result_dict)
         except Exception as e:
-            logger.info('V2推送接口异常, errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+            logger.info('V2推送接口异常,error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
             data = {
-                'errLine': e.__traceback__.tb_lineno,
-                'errMsg': repr(e),
+                'error_line': e.__traceback__.tb_lineno,
+                'error_msg': repr(e)
             }
             return JsonResponse(status=200, data=json.dumps(data), safe=False)
 
 
 def push_and_save_data(**params):
-    TIME_LOGGER.info('{}开始异步推送'.format(params['uid']))
-    # 推送消息,生成推送数据列表
+    TIME_LOGGER.info('{}开始异步存表和推送'.format(params['uid']))
+    # 保存推送数据和推送消息
     result = DevicePushService.save_msg_push(**params)
-    # 保存推送数据
-    TIME_LOGGER.info('{}开始异步存表'.format(params['uid']))
-    save_success = DevicePushService.save_sys_msg(
-        params['is_sys_msg'],
-        result['local_date_time'],
-        result['sys_msg_list'],
-        result['new_device_info_list'])
-    if save_success:
-        TIME_LOGGER.info('{}异步存表成功'.format(params['uid']))
-    else:
-        TIME_LOGGER.info('{}异步存表失败'.format(params['uid']))
+    TIME_LOGGER.info('存表和推送结果:{}'.format(result))

+ 31 - 43
Service/DevicePushService.py

@@ -140,22 +140,20 @@ class DevicePushService:
     @classmethod
     def save_msg_push(cls, **params):
         """
-        推送消息,返回推送数据列表
+        保存推送数据和推送消息
         @param params: 推送参数
-        @return: dict
+        @return: bool
         """
-        new_device_info_list = []
         sys_msg_list = []
-        userID_ids = []
+        saved_user_id_list = []
         kwag_args = params['kwag_args']
-        code_data = {'do_apns_code': '', 'do_fcm_code': '', 'do_jpush_code': ''}
-        local_date_time = ''
-        # push_permission = True 多通道权限限制接收
+        now_time = int(time.time())
         try:
             params['event_tag'] = cls.get_event_tag(params['ai_type'], params['event_type'], params['detection'])
             is_app_push = True if params['event_type'] in [606, 607] else \
                 cls.is_send_app_push(params['event_type'], params['event_tag'], params['app_push_config'],
                                      params['app_push'], params['uid'])
+
             for up in params['uid_set_push_list']:
                 appBundleId = up['appBundleId']
                 token_val = up['token_val']
@@ -180,38 +178,43 @@ class DevicePushService:
                 else:
                     if 'jg_token_val' in kwag_args:
                         kwag_args.pop('jg_token_val')
-                local_date_time = CommonService.get_now_time_str(n_time=params['n_time'], tz=tz, lang='cn')
-                local_date_time = local_date_time[0:10]
-                # 以下是存库
-                userID_id = up["userID_id"]
-                if userID_id not in userID_ids:
-                    now_time = int(time.time())
+
+                # 保存系统消息或推送消息数据
+                user_id = up['userID_id']
+                if user_id not in saved_user_id_list:     # 防止同一用户重复写入数据
+                    # 系统消息
                     if params['is_sys_msg']:
                         sys_msg_text = cls.get_msg_text(channel=params['channel'], n_time=params['n_time'], lang=lang,
                                                         tz=tz, is_sys=1, device_type=params['device_type'],
                                                         event_type=params['event_type'],
                                                         electricity=params['electricity'],
                                                         )
-                        sys_msg_list.append(SysMsgModel(userID_id=userID_id, msg=sys_msg_text, addTime=now_time,
+                        sys_msg_list.append(SysMsgModel(userID_id=user_id, msg=sys_msg_text, addTime=now_time,
                                                         updTime=now_time, uid=params['uid'],
                                                         eventType=params['event_type']))
+                    # 推送消息
                     else:
-                        params['userID_id'] = userID_id
-                        # push_permission = DevicePushService.check_share_permission(userID_id,
-                        # params['channel'],params['uid'])
-                        new_device_info_list.append(cls.created_device_vo(local_date_time, **params))
+                        params['userID_id'] = user_id
                         cls.save_equipment_info(**params)
-                    userID_ids.append(userID_id)
-                params['appBundleId'] = appBundleId
-                params['token_val'] = token_val
-                params['lang'] = lang
-                params['tz'] = tz
-                params['kwag_args'] = kwag_args
-                code_data = cls.send_app_msg_push(up['push_type'], **params) if is_app_push else code_data
-            return {'code_date': code_data, 'new_device_info_list': new_device_info_list, 'sys_msg_list': sys_msg_list,
-                    'local_date_time': local_date_time}
+                    saved_user_id_list.append(user_id)
+
+                # 推送
+                if is_app_push:
+                    params['appBundleId'] = appBundleId
+                    params['token_val'] = token_val
+                    params['lang'] = lang
+                    params['tz'] = tz
+                    params['kwag_args'] = kwag_args
+                    cls.send_app_msg_push(up['push_type'], **params)
+
+            # 写入系统消息
+            if sys_msg_list:
+                SysMsgModel.objects.bulk_create(sys_msg_list)
+
+            return True
         except Exception as e:
-            LOGGING.info('推送消息或存表异常: errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+            LOGGING.info('推送消息或存表异常: error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+            return False
 
     @classmethod
     def get_event_tag(cls, ai_type, event_type, detection=0):
@@ -337,21 +340,6 @@ class DevicePushService:
             LOGGING.info('异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
             return None
 
-    @classmethod
-    def save_sys_msg(cls, is_sys_msg, local_date_time, sys_msg_list, new_device_info_list):
-        """
-        保存系统消息&设备推送消息存库
-        """
-        if is_sys_msg:
-            SysMsgModel.objects.bulk_create(sys_msg_list)
-        else:
-            # new 分表批量存储 设备信息
-            if new_device_info_list and len(new_device_info_list) > 0:
-                # 根据日期获得星期几
-                week = LocalDateTimeUtil.date_to_week(local_date_time)
-                EquipmentInfoService.equipment_info_bulk_create(week, new_device_info_list)
-        return True
-
     @classmethod
     def created_device_vo(cls, local_date_time, **params):
         """