Pārlūkot izejas kodu

优化AI识别消息推送、增加APP提醒间隔、取消dynamoDB数据存储

zhangdongming 1 gadu atpakaļ
vecāks
revīzija
1d81b84745
2 mainītis faili ar 185 papildinājumiem un 92 dzēšanām
  1. 178 85
      Controller/AiController.py
  2. 7 7
      Object/SageMakerAiObject.py

+ 178 - 85
Controller/AiController.py

@@ -33,7 +33,10 @@ from Object.SageMakerAiObject import SageMakerAiObject
 from Object.TokenObject import TokenObject
 from Object.enums.MessageTypeEnum import MessageTypeEnum
 from Service.CommonService import CommonService
+from Service.DevicePushService import DevicePushService
 from Service.EquipmentInfoService import EquipmentInfoService
+from Object.RedisObject import RedisObject
+import threading
 
 TIME_LOGGER = logging.getLogger('time')
 
@@ -112,12 +115,16 @@ class AiView(View):
                 values('token_val', 'app_type', 'appBundleId', 'm_code', 'push_type', 'userID_id',
                        'userID__NickName',
                        'lang', 'm_code', 'tz', 'uid_set__nickname', 'uid_set__detect_interval',
-                       'uid_set__detect_group',
-                       'uid_set__channel')
+                       'uid_set__detect_group', 'uid_set__new_detect_interval',
+                       'uid_set__channel', 'uid_set__msg_notify')
             if not uid_push_qs.exists():
                 TIME_LOGGER.info(f'uid={uid},用户没有开启AI推送')
                 return response.json(173)
 
+            redis_obj = RedisObject(db=6)
+            APP_NOTIFY_KEY = f'ASJ:NOTIFY:PUSH:{uid}'  # 推送间隔缓存KEY
+            push_cache_data = redis_obj.get_data(APP_NOTIFY_KEY)
+            is_push = False if push_cache_data else True
             ai_server = 'sageMaker'
             if AiServiceQuery[0]['orders__payType'] == 10:  # AI首次体验前半个月调Rekognition
                 now_time = int(time.time())
@@ -125,17 +132,72 @@ class AiView(View):
                 if (now_time - add_time) <= (3600 * 24 * 3):
                     ai_server = 'rekognition'
 
+            self.add_push_cache(APP_NOTIFY_KEY, redis_obj, push_cache_data,
+                                uid_push_qs[0]['uid_set__new_detect_interval'])
+
+            push_thread = threading.Thread(target=self.image_label_detection,
+                                           kwargs={'ai_server': ai_server, 'uid': uid, 'file_list': file_list,
+                                                   'detect_group': detect_group, 'n_time': n_time,
+                                                   'uid_push_qs': uid_push_qs,
+                                                   'channel': channel, 'is_push': is_push})
+            push_thread.start()  # AI识别异步存表&推送
+
+            return response.json(0)
+        except Exception as e:
+            print(e)
+            data = {
+                'errLine': e.__traceback__.tb_lineno,
+                'errMsg': repr(e)
+            }
+            TIME_LOGGER.info(f'rekognition识别errMsg={data}')
+            return response.json(48, data)
+
+    def add_push_cache(self, key, redis_obj, cache_push_data, push_interval):
+        """
+        推送间隔缓存设置
+        """
+        if push_interval > 0:
+            if cache_push_data:  # 缓存存在
+                interval = json.loads(cache_push_data)['interval']
+                if interval != push_interval:
+                    push_data = {'interval': push_interval}
+                    redis_obj.set_data(key=key, val=json.dumps(push_data), expire=push_interval)
+            else:  # 缓存不存在
+                push_data = {'interval': push_interval}
+                redis_obj.set_data(key=key, val=json.dumps(push_data), expire=push_interval)
+
+    def image_label_detection(self, ai_server, uid, file_list, detect_group,
+                              n_time, uid_push_qs, channel, is_push):
+        """
+        :param ai_server: AI服务类型
+        :param uid: 用户uid
+        :param file_list: 图片base64列表
+        :param detect_group: 识别组
+        :param n_time: 时间戳
+        :param uid_push_qs: 推送数据
+        :param channel: 推送通道
+        :param is_push: 是否APP提醒推送
+        :return:
+        """
+        try:
+            start_time = time.time()
+            notify_data = uid_push_qs[0]['uid_set__msg_notify']
+
+            # APP推送提醒状态
+            notify = self.is_ai_push(uid, notify_data) if is_push else is_push
+
             if ai_server == 'sageMaker':  # 自建模型sageMaker AI
                 sage_maker = SageMakerAiObject()
                 ai_result = sage_maker.sage_maker_ai_server(uid, file_list)  # 图片base64识别AI标签
                 if ai_result:
-                    res = sage_maker.get_table_name(uid, ai_result, AiServiceQuery[0]['detect_group'])
-                    if not res:
-                        return response.json(0)
-                    sage_maker.save_push_message(uid, n_time, uid_push_qs, channel, res, file_list)
-                    return response.json(0)
-
-            TIME_LOGGER.info(f'*****执行Reko,uid={uid} run {ai_server}')
+                    res = sage_maker.get_table_name(uid, ai_result, detect_group)
+                    if not res:  # 当前识别结果未匹配
+                        return False
+                    # 保存推送消息
+                    sage_maker.save_push_message(uid, n_time, uid_push_qs, channel, res, file_list, notify)
+                    return True
+
+            TIME_LOGGER.info(f'*****现执行Reko,uid={uid}识别类型={ai_server}')
             dir_path = os.path.join(BASE_DIR, 'static/ai/' + uid + '/' + str(n_time))
             if not os.path.exists(dir_path):
                 os.makedirs(dir_path)
@@ -153,7 +215,7 @@ class AiView(View):
             image_size = MergePic.merge_images(dir_path, image_size, image_colnum)
             photo = open(dir_path + '.jpg', 'rb')  # 打开合成图
 
-            # 识别合成图片
+            # rekognition识别合成图片
             maxLabels = 50  # 最大标签
             minConfidence = 80  # 置信度
 
@@ -169,18 +231,17 @@ class AiView(View):
                 MinConfidence=minConfidence)
             photo.close()
             if rekognition_res['ResponseMetadata']['HTTPStatusCode'] != 200:
-                return response.json(173)
+                return False
+            end_time = time.time()
 
             labels = self.labelsCoords(detect_group, rekognition_res, image_size)  # 检查标签是否符合用户选择的识别类型
-            TIME_LOGGER.info('*****执行完Rekognition得到labels')
+            TIME_LOGGER.info(f'uid={uid},{(end_time - start_time)}s,rekognition Result={labels}')
 
             # 将识别结果存到S3以及DynamoDB
-            AiView.store_image_results_to_dynamo_and_s3(file_path_list, uid, channel, n_time, labels, rekognition_res)
+            # AiView.store_image_results_to_dynamo_and_s3(file_path_list, uid, channel, n_time, labels, rekognition_res)
             eventType = labels['eventType']
             label_str = ','.join(labels['label_list'])
             new_bounding_box_dict = labels['new_bounding_box_dict']
-            TIME_LOGGER.info(eventType)
-            TIME_LOGGER.info(label_str)
 
             # 上传缩略图到s3
             file_dict = {}
@@ -188,84 +249,116 @@ class AiView(View):
                 file_dict[val] = "{uid}/{channel}/{n_time}_{i}.jpeg".format(uid=uid, channel=channel,  # 封面图
                                                                             n_time=n_time, i=i)
             self.upload_s3(file_dict, dir_path)
-            # 存储消息以及推送
-
-            uid_push_list = []
-            for qs in uid_push_qs:
-                uid_push_list.append(qs)
-
-            nickname = uid_push_list[0]['uid_set__nickname']
-            if not nickname:
-                nickname = uid
-
-            userID_ids = []
-            for up in uid_push_list:
-                push_type = up['push_type']
-                appBundleId = up['appBundleId']
-                token_val = up['token_val']
-                lang = up['lang']
-                tz = up['tz']
-                if tz is None or tz == '':
-                    tz = 0
-                local_date_time = CommonService.get_now_time_str(n_time=n_time, tz=tz, lang='cn')
-                TIME_LOGGER.info('*****AI消息存库{},{},{}'.format(uid, local_date_time, tz))
-                # 以下是存库
-                userID_id = up["userID_id"]
-                if userID_id not in userID_ids:
-                    now_time = int(time.time())
-                    EquipmentInfoService.randoms_insert_equipment_info(
-                        device_user_id=userID_id,
-                        event_time=n_time,
-                        event_type=eventType,
-                        device_uid=uid,
-                        device_nick_name=nickname,
-                        channel=channel,
-                        alarm=label_str,
-                        is_st=3,
-                        add_time=now_time,
-                        storage_location=2,
-                        border_coords=json.dumps(new_bounding_box_dict)
-                    )
-                    userID_ids.append(userID_id)
-
-                # 推送标题
-                msg_title = self.get_msg_title(appBundleId=appBundleId, nickname=nickname)
-                # 推送内容
-                msg_text = self.get_msg_text(channel=channel, n_time=n_time, lang=lang, tz=tz, label_list=label_str)
-                kwargs = {
-                    'uid': uid,
-                    'channel': channel,
-                    'event_type': eventType,
-                    'n_time': n_time,
-                    'appBundleId': appBundleId,
-                    'token_val': token_val,
-                    'msg_title': msg_title,
-                    'msg_text': msg_text,
-                }
-                try:
-                    # 推送消息
-                    if push_type == 0:  # ios apns
-                        self.do_apns(**kwargs)
-                    elif push_type == 1:  # android gcm
-                        self.do_fcm(**kwargs)
-                    elif push_type == 2:  # android jpush
-                        self.do_jpush(**kwargs)
-                except Exception as e:
-                    TIME_LOGGER.info(
-                        '*****error,uid={uid},errLine={errLine}, errMsg={errMsg}'
-                            .format(uid=uid, errLine=e.__traceback__.tb_lineno, errMsg=repr(e)))
-                    continue
+
+            self.save_message_and_push(eventType, uid, n_time, uid_push_qs, channel,
+                                       label_str, new_bounding_box_dict, notify)
 
             AiView.save_cloud_ai_tag(uid, int(n_time), eventType, 0)
-            return response.json(0)
         except Exception as e:
-            print(e)
             data = {
                 'errLine': e.__traceback__.tb_lineno,
                 'errMsg': repr(e)
             }
             TIME_LOGGER.info(f'rekognition识别errMsg={data}')
-            return response.json(48, data)
+
+    def save_message_and_push(self, eventType, uid, n_time, uid_push_qs, channel, label_str, new_bounding_box_dict,
+                              notify):
+        """
+        保存消息以及推送
+        """
+        uid_push_list = []
+        for qs in uid_push_qs:
+            uid_push_list.append(qs)
+
+        nickname = uid_push_list[0]['uid_set__nickname']
+        if not nickname:
+            nickname = uid
+
+        userID_ids = []
+        for up in uid_push_list:
+            push_type = up['push_type']
+            appBundleId = up['appBundleId']
+            token_val = up['token_val']
+            lang = up['lang']
+            tz = up['tz']
+            if tz is None or tz == '':
+                tz = 0
+            local_date_time = CommonService.get_now_time_str(n_time=n_time, tz=tz, lang='cn')
+            TIME_LOGGER.info('*****AI消息存库{},{},{}'.format(uid, local_date_time, tz))
+            # 以下是存库
+            userID_id = up["userID_id"]
+            if userID_id not in userID_ids:
+                now_time = int(time.time())
+                EquipmentInfoService.randoms_insert_equipment_info(
+                    device_user_id=userID_id,
+                    event_time=n_time,
+                    event_type=eventType,
+                    device_uid=uid,
+                    device_nick_name=nickname,
+                    channel=channel,
+                    alarm=label_str,
+                    is_st=3,
+                    add_time=now_time,
+                    storage_location=2,
+                    border_coords=json.dumps(new_bounding_box_dict)
+                )
+                userID_ids.append(userID_id)
+
+            if not notify:  # 不推送
+                continue
+            # 推送标题
+            msg_title = self.get_msg_title(appBundleId=appBundleId, nickname=nickname)
+            # 推送内容
+            msg_text = self.get_msg_text(channel=channel, n_time=n_time, lang=lang, tz=tz, label_list=label_str)
+            kwargs = {
+                'uid': uid,
+                'channel': channel,
+                'event_type': eventType,
+                'n_time': n_time,
+                'appBundleId': appBundleId,
+                'token_val': token_val,
+                'msg_title': msg_title,
+                'msg_text': msg_text,
+            }
+            try:
+                # 推送消息
+                if push_type == 0:  # ios apns
+                    self.do_apns(**kwargs)
+                elif push_type == 1:  # android gcm
+                    self.do_fcm(**kwargs)
+                elif push_type == 2:  # android jpush
+                    self.do_jpush(**kwargs)
+            except Exception as e:
+                TIME_LOGGER.info('*****error,uid={uid},errLine={errLine}, errMsg={errMsg}'
+                                 .format(uid=uid, errLine=e.__traceback__.tb_lineno, errMsg=repr(e)))
+                continue
+
+    def is_ai_push(self, uid, app_push_config):
+        """
+        是否进行APP消息提醒
+        @return: True|False
+        """
+        try:
+            if not app_push_config:
+                return True
+
+            is_push = app_push_config['appPush']
+            if is_push != 1:  # 1:进行APP提醒,其它则不执行APP提醒
+                return False
+
+            all_day = app_push_config['pushTime']['allDay']
+
+            if all_day == 0:  # 1:全天提醒,0:自定义时间提醒
+                push_time_config = app_push_config['pushTime']
+                # 计算当前时间是否在自定义消息提醒范围内
+                if not DevicePushService.is_push_notify_allowed_now(push_time_config):
+                    return False
+            # 在开启接收APP消息提醒时,判断是否勾选云端AI消息提醒
+            return app_push_config['eventTypes']['aiCloud'] == 1
+        except Exception as e:
+            TIME_LOGGER.info('*****error,uid={uid},errLine={errLine}, errMsg={errMsg}'
+                             .format(uid=uid, errLine=e.__traceback__.tb_lineno, errMsg=repr(e)))
+            return True
 
     def del_path(self, path):
         try:

+ 7 - 7
Object/SageMakerAiObject.py

@@ -37,6 +37,7 @@ class SageMakerAiObject:
     def sage_maker_ai_server(uid, base64_list):
 
         try:
+            start = time.time()
             url = 'ec2-34-192-147-108.compute-1.amazonaws.com:8001'
             model_name = 'pdchv'
 
@@ -62,11 +63,10 @@ class SageMakerAiObject:
             confidence = 0.5
             client_timeout = 3
 
-            start = time.time()
             results = ai.yolo_infer(img_list, nms_threshold, confidence, client_timeout)
 
             end = time.time()
-            LOGGER.info(f'uid={uid},{(end - start) * 1000}ms,sageMaker Result={results}')
+            LOGGER.info(f'uid={uid},{(end - start)}s,sageMaker Result={results}')
 
             ai.close()
 
@@ -143,7 +143,7 @@ class SageMakerAiObject:
             return False
 
     @staticmethod
-    def save_push_message(uid, d_push_time, uid_push_qs, channel, res, file_list):
+    def save_push_message(uid, d_push_time, uid_push_qs, channel, res, file_list, notify):
         """
         保存推送消息
         """
@@ -216,10 +216,10 @@ class SageMakerAiObject:
                 equipment_info_model.objects.bulk_create(equipment_info_list)
 
             SageMakerAiObject().upload_image_to_s3(uid, channel, d_push_time, file_list)  # 上传到云端
-
-            push_thread = threading.Thread(target=SageMakerAiObject.async_app_msg_push,
-                                           kwargs={'uid': uid, 'push_msg_list': push_msg_list})
-            push_thread.start()  # APP消息提醒异步推送
+            if notify:  # 异步APP消息通知
+                push_thread = threading.Thread(target=SageMakerAiObject.async_app_msg_push,
+                                               kwargs={'uid': uid, 'push_msg_list': push_msg_list})
+                push_thread.start()  # APP消息提醒异步推送
 
             AiController.AiView().save_cloud_ai_tag(uid, d_push_time, event_type, 0)  # 关联AI标签
             return True