فهرست منبع

合并主分支代码

locky 11 ماه پیش
والد
کامیت
7cc1f5a19b
4فایلهای تغییر یافته به همراه85 افزوده شده و 65 حذف شده
  1. 10 10
      Controller/DetectController.py
  2. 25 22
      Controller/DetectControllerV2.py
  3. 3 2
      Object/OCIObjectStorage.py
  4. 47 31
      Service/DevicePushService.py

+ 10 - 10
Controller/DetectController.py

@@ -44,7 +44,7 @@ class NotificationView(View):
         is_st = request_dict.get('is_st', None)
         if not all([channel, n_time]):
             return JsonResponse(status=200, data={'code': 444, 'msg': 'error channel or n_time'})
-        redisObj = RedisObject(db=6)
+        redis_obj = RedisObject()
         try:
             uid = DevicePushService.decode_uid(etk, uidToken)  # 解密uid
             if len(uid) != 20 and len(uid) != 14:
@@ -61,9 +61,9 @@ class NotificationView(View):
             else:
                 dkey = '{}_{}_flag'.format(uid, channel)
 
-            have_ykey = redisObj.get_data(key=ykey)  # uid_set 数据库缓存
-            have_pkey = redisObj.get_data(key=pkey)  # 一分钟限制key
-            have_dkey = redisObj.get_data(key=dkey)  # 推送类型限制
+            have_ykey = redis_obj.get_data(key=ykey)  # uid_set 数据库缓存
+            have_pkey = redis_obj.get_data(key=pkey)  # 一分钟限制key
+            have_dkey = redis_obj.get_data(key=dkey)  # 推送类型限制
 
             # 一分钟外,推送开启状态
             detect_med_type = 0  # 0推送旧机制 1存库不推送,2推送存库
@@ -74,7 +74,7 @@ class NotificationView(View):
 
             # 数据库读取数据
             if have_ykey:
-                uid_push_list = eval(redisObj.get_data(key=ykey))
+                uid_push_list = eval(redis_obj.get_data(key=ykey))
             else:
                 # 从数据库查询出来
                 uid_push_qs = DevicePushService.query_uid_push(uid, event_type)
@@ -83,7 +83,7 @@ class NotificationView(View):
                     return JsonResponse(status=200, data={'code': 176, 'msg': 'no uid_push data'})
                 # 修改redis数据,并设置过期时间为10分钟
                 uid_push_list = DevicePushService.qs_to_list(uid_push_qs)
-                redisObj.set_data(key=ykey, val=str(uid_push_list), expire=600)
+                redis_obj.set_data(key=ykey, val=str(uid_push_list), expire=600)
                 if not uid_push_list:
                     res_data = {'code': 404, 'msg': 'error !'}
                     return JsonResponse(status=200, data=res_data)
@@ -107,11 +107,11 @@ class NotificationView(View):
                         new_detect_interval = uid_push_list[0]['uid_set__new_detect_interval']
                         detect_interval = new_detect_interval if new_detect_interval > 0 else detect_interval
                         detect_interval = 60 if detect_interval < 60 else detect_interval
-                    redisObj.set_data(key=dkey, val=1, expire=detect_interval - 5)
-                    redisObj.set_data(key=pkey, val=1, expire=60)
+                    redis_obj.set_data(key=dkey, val=1, expire=detect_interval - 5)
+                    redis_obj.set_data(key=pkey, val=1, expire=60)
             # 旧模式并且没有pkey,重新创建一个
             if not detect_group and not have_pkey:
-                redisObj.set_data(key=pkey, val=1, expire=60)
+                redis_obj.set_data(key=pkey, val=1, expire=60)
             auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET)
             bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'apg')
 
@@ -126,7 +126,7 @@ class NotificationView(View):
                       'is_sys_msg': is_sys_msg, 'channel': channel, 'event_type': event_type, 'n_time': n_time,
                       'electricity': '', 'bucket': bucket, 'app_push': have_dkey, 'storage_location': 1, 'ai_type': 0,
                       'dealings_type': 0, 'detection': 0, 'device_type': 1, 'app_push_config': '',
-                      'uid_set_push_list': uid_push_list}
+                      'uid_set_push_list': uid_push_list, 'redis_obj': redis_obj}
 
             # 异步推送消息和保存数据
             push_thread = threading.Thread(

+ 25 - 22
Controller/DetectControllerV2.py

@@ -6,7 +6,6 @@ from django.http import JsonResponse
 from django.views.generic.base import View
 
 from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_US
-from Object.GlobalThreadPoolObject import GlobalThreadPool
 from Object.RedisObject import RedisObject
 from Service.DevicePushService import DevicePushService
 
@@ -141,17 +140,17 @@ class NotificationV2View(View):
                       'device_type': device_type,
                       'dealings_type': dealings_type, 'detection': detection,
                       'app_push_config': uid_set_push_list[0]['uid_set__msg_notify'],
-                      'uid_set_push_list': uid_set_push_list}
+                      'uid_set_push_list': uid_set_push_list, 'redis_obj': redis_obj}
 
             # 使用全局的线程池提交推送任务
-            thread_pool = GlobalThreadPool()
-            thread_pool.submit(push_and_save_data, **params)
+            # thread_pool = GlobalThreadPool()
+            # thread_pool.submit(push_and_save_data, **params)
 
             # 异步推送消息和保存数据
-            # push_thread = threading.Thread(
-            #     target=push_and_save_data,
-            #     kwargs=params)
-            # push_thread.start()
+            push_thread = threading.Thread(
+                target=push_and_save_data,
+                kwargs=params)
+            push_thread.start()
 
             # 视频通话不返回图片链接
             if event_type == 607:
@@ -185,17 +184,21 @@ class NotificationV2View(View):
 
 def push_and_save_data(**params):
     uid = params['uid']
-    TIME_LOGGER.info('{}开始异步存表和推送'.format(uid))
-
-    # 线程池推送消息
-    thread_pool = GlobalThreadPool()
-    thread_pool.submit(DevicePushService.push_msg, **params)
-
-    # 异步推送消息
-    # push_thread = threading.Thread(
-    #     target=DevicePushService.push_msg,
-    #     kwargs=params)
-    # push_thread.start()
-    # 保存推送数据
-    result = DevicePushService.save_msg_push(**params)
-    TIME_LOGGER.info('{}存表结果:{}'.format(uid, result))
+    try:
+        TIME_LOGGER.info('{}开始异步存表和推送'.format(uid))
+
+        # 线程池推送消息
+        # thread_pool = GlobalThreadPool()
+        # thread_pool.submit(DevicePushService.push_msg, **params)
+
+        # 异步推送消息
+        push_thread = threading.Thread(
+            target=DevicePushService.push_msg,
+            kwargs=params)
+        push_thread.start()
+        # 保存推送数据
+        result = DevicePushService.save_msg_push(**params)
+        TIME_LOGGER.info('{}存表结果:{}'.format(uid, result))
+    except Exception as e:
+        ERROR_INFO_LOGGER.info(
+            'V2推送第一个线程异常{},error_line:{},error_msg:{}'.format(uid, e.__traceback__.tb_lineno, repr(e)))

+ 3 - 2
Object/OCIObjectStorage.py

@@ -59,12 +59,13 @@ class OCIObjectStorage:
                          .format(errLine=e.__traceback__.tb_lineno, errMsg=repr(e)))
             return None
 
-    def get_preauthenticated_request_url(self, bucket_name, name, object_name, time_expires):
+    def get_preauthenticated_request_url(self, bucket_name, name, object_name, time_expires, access_type='ObjectRead'):
         """
         获取指定对象预认证请求URL。
         @param bucket_name: 存储桶名称
         @param name: 请求名称 是创建的预授权链接的名称,是方便管理用的,不会影响功能。比如对每个桶分别创建链接,如果要删除或者查看,可以根据name看出来是对哪个桶的链接。
         @param object_name: 对象名
+        @param access_type: 类型
         @param time_expires: 失效时间 需要datetime类型格式 例如:datetime.utcnow() + timedelta(minutes=30)
         @return: 预认证请求URL
         """
@@ -77,7 +78,7 @@ class OCIObjectStorage:
                 create_preauthenticated_request_details=oci.object_storage.models.CreatePreauthenticatedRequestDetails(
                     name=name,
                     object_name=object_name,
-                    access_type="ObjectRead",
+                    access_type=access_type,
                     time_expires=time_expires
                 )
             )

+ 47 - 31
Service/DevicePushService.py

@@ -28,7 +28,6 @@ from Model.models import UidPushModel, SysMsgModel, DeviceSharePermission, Devic
     DeviceChannelUserPermission, UidSetModel, Device_Info, UserAudioVideoPush, PushLog
 from Object.ETkObject import ETkObject
 from Object.OCIObjectStorage import OCIObjectStorage
-from Object.RedisObject import RedisObject
 from Object.UidTokenObject import UidTokenObject
 from Object.utils import LocalDateTimeUtil
 from Service.CommonService import CommonService
@@ -36,7 +35,7 @@ from Service.EquipmentInfoService import EquipmentInfoService, EQUIPMENT_INFO_DI
 from Service.HuaweiPushService.HuaweiPushService import HuaweiPushObject
 from Service.PushService import PushObject
 from django.db import close_old_connections
-from Object.GlobalThreadPoolObject import GlobalThreadPool
+
 
 LOGGING = logging.getLogger('info')
 TIME_LOGGER = logging.getLogger('time')
@@ -172,13 +171,13 @@ class DevicePushService:
 
             # 低功耗产品推送,休眠702、低电量704提醒,并且detection=0,0标识单事件类型,1标识多事件类型
             is_app_push = True if params['event_type'] in [702, 704] and params['detection'] == 0 else is_app_push
-
+            redis_obj = params['redis_obj']
             # 推送
             if is_app_push:
                 msg_key = 'PUSH:MSG:IMAGE:{}:{}:{}'.format(params['uid'], params['channel'], params['n_time'])
                 d_params = {'is_st': params['is_st'], 'storage_location': params['storage_location'],
                             'event_tag': params['event_tag'], 'event_type': params['event_type']}
-                RedisObject(3).set_data(msg_key, json.dumps(d_params), 60)
+                redis_obj.set_data(msg_key, json.dumps(d_params), 60)
 
                 push_kwargs = params['push_kwargs']
                 for up in params['uid_set_push_list']:
@@ -215,13 +214,14 @@ class DevicePushService:
                     params['lang'] = lang
                     params['tz'] = tz
                     params['push_type'] = push_type
-
-                    GlobalThreadPool().submit(cls.send_app_msg_push, **params)
-                    # push_thread = threading.Thread(
-                    #     target=cls.send_app_msg_push,
-                    #     kwargs=params
-                    # )
-                    # push_thread.start()
+                    params['redis_obj'] = redis_obj
+
+                    # GlobalThreadPool().submit(cls.send_app_msg_push, **params)
+                    push_thread = threading.Thread(
+                        target=cls.send_app_msg_push,
+                        kwargs=params
+                    )
+                    push_thread.start()
         except Exception as e:
             ERROR_INFO_LOGGER.info(
                 '推送消息线程异常,uid:{},error_line:{},error_msg:{}'
@@ -238,7 +238,7 @@ class DevicePushService:
         saved_user_id_list = []
         uid = params['uid']
         now_time = int(time.time())
-        redis_obj = RedisObject()
+        redis_obj = params['redis_obj']
         try:
             params['event_tag'] = cls.get_event_tag(params['ai_type'], params['event_type'], params['detection'])
 
@@ -320,6 +320,7 @@ class DevicePushService:
                                     encode('UTF-8', 'ignore').decode('UTF-8')
                             equipment_info_list.append(equipment_info_model(**equipment_info_data))
                         equipment_info_model.objects.bulk_create(equipment_info_list)
+
             return True
         except Exception as e:
             ERROR_INFO_LOGGER.info(
@@ -367,7 +368,7 @@ class DevicePushService:
             LOGGING.info('算法对照打印:{}'.format(ALGORITHM_COMBO_TYPES))
             return types
         except Exception as e:
-            print('推送错误异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+            LOGGING.info('推送错误异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
             return event_type
 
     @staticmethod
@@ -402,17 +403,18 @@ class DevicePushService:
                 else:
                     key = '{}/{}/{}_0.jpeg'.format(kwargs['uid'], kwargs['channel'], kwargs['n_time'])
                 # 开始异步推送图片
-                thread_pool = GlobalThreadPool()
-                thread_pool.submit(cls.async_send_picture_push, (
-                    push_type, kwargs['aws_s3_client'], kwargs['bucket'], key,
-                    kwargs['uid'], kwargs['appBundleId'], kwargs['token_val'], kwargs['event_type'], kwargs['n_time'],
-                    push_kwargs['msg_title'], push_kwargs['msg_text'], kwargs['channel'], kwargs['storage_location']))
-
-                # push_thread = threading.Thread(target=cls.async_send_picture_push, args=(
+                # thread_pool = GlobalThreadPool()
+                # thread_pool.submit(cls.async_send_picture_push, (
                 #     push_type, kwargs['aws_s3_client'], kwargs['bucket'], key,
                 #     kwargs['uid'], kwargs['appBundleId'], kwargs['token_val'], kwargs['event_type'], kwargs['n_time'],
                 #     push_kwargs['msg_title'], push_kwargs['msg_text'], kwargs['channel'], kwargs['storage_location']))
-                # push_thread.start()
+
+                push_thread = threading.Thread(target=cls.async_send_picture_push, args=(
+                    push_type, kwargs['aws_s3_client'], kwargs['bucket'], key,
+                    kwargs['uid'], kwargs['appBundleId'], kwargs['token_val'], kwargs['event_type'], kwargs['n_time'],
+                    push_kwargs['msg_title'], push_kwargs['msg_text'], kwargs['channel'], kwargs['storage_location'],
+                    kwargs['redis_obj']))
+                push_thread.start()
 
                 push_result = True
 
@@ -736,8 +738,8 @@ class DevicePushService:
             return response.json()
 
     @classmethod
-    def async_send_picture_push(cls, push_type, aws_s3_client, bucket, key, uid, appBundleId,
-                                token_val, event_type, n_time, msg_title, msg_text, channel, storage_reg):
+    def async_send_picture_push(cls, push_type, aws_s3_client, bucket, key, uid, appBundleId, token_val,
+                                event_type, n_time, msg_title, msg_text, channel, storage_reg, redis_obj):
         """
         异步推送图片
         """
@@ -748,7 +750,7 @@ class DevicePushService:
                 oss_img_bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'apg')
                 image_url = oss_img_bucket.sign_url('GET', key, 300)
             elif storage_reg in [3, 4]:
-                image_url = DevicePushService.oci_object_url(storage_reg, bucket, key)
+                image_url = DevicePushService.oci_object_url(uid, redis_obj, storage_reg, bucket, key)
             elif storage_reg == 5:
                 image_url = DevicePushService.create_obs_signed_url(key, 'GET')
             else:
@@ -767,23 +769,37 @@ class DevicePushService:
                     token_val=token_val, msg_title=msg_title, msg_text=msg_text, uid=uid, event_type=event_type,
                     n_time=n_time, image_url=image_url, channel=channel)
 
-            LOGGING.info('{}推送图片,push_type:{},推送结果:{}'.format(uid, push_type, push_result))
+            TIME_LOGGER.info('{}推送图片,push_type:{},推送结果:{}'.format(uid, push_type, push_result))
         except Exception as e:
-            LOGGING.error('异步推送图片异常,error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+            ERROR_INFO_LOGGER.error('异步推送图片异常,error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
     @staticmethod
-    def oci_object_url(storage_location, bucket, obj_name):
+    def oci_object_url(uid, redis_obj, storage_location, bucket, obj_name):
         """
         获取OCI对象存储URL 有效期5分钟
+        @param uid: uid
+        @param redis_obj: 缓存客户端
         @param storage_location: 存储区域
         @param bucket: 存储桶
         @param obj_name: 对象名称
         @return: url
         """
-        oci = OCIObjectStorage('eur' if storage_location == 4 else 'us')
-        time_expires = datetime.datetime.utcnow() + datetime.timedelta(minutes=60)
-        result = oci.get_preauthenticated_request_url(bucket, 'ociPush', obj_name, time_expires)
-        return result.full_path if result else ''
+        try:
+            uid_key = f'PUSH:PICTURE:OCI:URL:{uid}'
+            oci_url = redis_obj.get_data(uid_key)
+            if oci_url:
+                return oci_url + obj_name
+            oci = OCIObjectStorage('eur' if storage_location == 4 else 'us')
+            prefix_name = f'{uid}/'
+            time_expires = datetime.datetime.utcnow() + datetime.timedelta(minutes=60)
+            result = oci.get_preauthenticated_request_url(bucket, 'ociPush', prefix_name, time_expires,
+                                                          'AnyObjectRead')  # 授权到指定uid文件夹
+            full_url = result.full_path if result else ''
+            redis_obj.set_data(uid_key, full_url, 3580)
+            return full_url + obj_name
+        except Exception as e:
+            LOGGING.error('oci查询消息列表异常error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+            return ''
 
     @staticmethod
     def create_oci_req_url(storage_location, bucket, obj_name, oci=None):