import base64 import json import logging import os import threading import time import numpy as np from PIL import Image from django.views.generic.base import View from AnsjerPush.config import BASE_DIR from Model.models import UidPushModel, AiService, Device_Info, VodHlsTag, VodHlsTagType from Object.AiEngineObject import AiEngine from Object.ETkObject import ETkObject from Object.ResponseObject import ResponseObject from Object.enums.MessageTypeEnum import MessageTypeEnum from Object.utils import LocalDateTimeUtil from Service.CommonService import CommonService from Service.EquipmentInfoService import EquipmentInfoService from Service.HuaweiPushService.HuaweiPushService import HuaweiPushObject from Service.PushService import PushObject LOGGING = logging.getLogger('info') CLOUD_BASED_AI_URL = '34.192.147.108:8001' MODEL_NAME = 'AI_5obj_pdcpv_detect_yolov5_pipeline' # 建立长连接 ai_connect = ''# AiEngine(url=CLOUD_BASED_AI_URL) HEALTH = False # 检查连通性、推理服务器状态 # if ai_connect.health: # HEALTH = True # print('健康状况通过') # # 设定模型 # if ai_connect.set_model(MODEL_NAME): # print('设置模型通过') class AiView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, operation) def validation(self, request_dict, operation): response = ResponseObject() if operation == 'identification': # ai识别推送 return self.identification(request_dict, response) else: return response.json(414) @staticmethod def identification(request_dict, response): """ ai识别推送 @param request_dict: 请求数据 @request_dict etk: uid token @request_dict n_time: 设备的当前时间 @request_dict channel: 通道 @request_dict fileOne: 图片一 @request_dict fileTwo: 图片二 @request_dict fileThree: 图片三 @param response: 响应 @return: response """ etk = request_dict.get('etk', None) n_time = request_dict.get('n_time', None) channel = request_dict.get('channel', '1') file_one = request_dict.get('fileOne', None) file_two = request_dict.get('fileTwo', None) file_three = request_dict.get('fileThree', None) if not all([etk, n_time]): return response.json(444) # 解密etk并判断uid长度 eto = ETkObject(etk) uid = eto.uid LOGGING.info('---进入ai识别推送接口--- etk:{}, uid:{}'.format(etk, uid)) receive_time = int(time.time()) file_list = [file_one, file_two, file_three] # 查询设备是否有使用中的ai服务 ai_service_qs = AiService.objects.filter(uid=uid, detect_status=1, use_status=1, endTime__gt=receive_time). \ values('detect_group') if not ai_service_qs.exists(): return response.json(173) # 查询推送相关数据 uid_push_qs = UidPushModel.objects.filter(uid_set__uid=uid). \ values('push_type', 'appBundleId', 'token_val', 'lang', 'tz', 'userID_id') if not uid_push_qs.exists(): return response.json(173) # 查询设备数据 device_info_qs = Device_Info.objects.filter(UID=uid).first() nickname = uid if device_info_qs is None else device_info_qs.NickName now_time = int(time.time()) try: dir_path = os.path.join(BASE_DIR, 'static/ai/' + uid + '/' + str(n_time)) if not os.path.exists(dir_path): os.makedirs(dir_path) file_path_list = [] for i, val in enumerate(file_list): val = val.replace(' ', '+') val = base64.b64decode(val) file_path = "{dir_path}/{n_time}_{i}.jpg".format(dir_path=dir_path, n_time=n_time, i=i) file_path_list.append(file_path) with open(file_path, 'wb') as f: f.write(val) f.close() ai_view = AiView() ai_results = ai_view.image_aI_recognition(file_path_list, 0.45, 0.55, 1) if not ai_results: CommonService.del_path(dir_path) return response.json(0) event_type = ai_view.get_cloud_recognition_tag(ai_results) if event_type == 0: CommonService.del_path(dir_path) return response.json(0) new_bounding_box_dict = ai_view.get_pic_coordinates(ai_results) # 上传缩略图到s3 file_dict = {} for i, val in enumerate(file_path_list): # 封面图 file_dict[val] = '{}/{}/{}_{}.jpeg'.format(uid, channel, n_time, i) upload_images_thread = threading.Thread(target=CommonService.upload_images, args=(file_dict, dir_path)) upload_images_thread.start() # 存储消息以及推送 uid_push_list = [uid_push for uid_push in uid_push_qs] eq_list = [] user_id_list = [] local_date_time = '' lang = uid_push_list[0]['lang'] label_str = ai_view.get_tag_message(lang, event_type) for up in uid_push_list: # 保存推送数据 tz = up['tz'] if tz is None or tz == '': tz = 0 local_date_time = CommonService.get_now_time_str(n_time=n_time, tz=tz, lang='cn')[:10] user_id = up['userID_id'] if user_id not in user_id_list: eq_list.append(EquipmentInfoService.get_equipment_info_obj( local_date_time, device_user_id=user_id, event_time=n_time, event_type=event_type, device_uid=uid, device_nick_name=nickname, channel=channel, alarm=label_str, is_st=3, receive_time=receive_time, add_time=now_time, storage_location=2, border_coords=new_bounding_box_dict )) user_id_list.append(user_id) # 推送 push_type = up['push_type'] app_bundle_id = up['appBundleId'] token_val = up['token_val'] lang = up['lang'] # 推送标题和推送内容 msg_title = PushObject.get_msg_title(nickname=nickname) msg_text = PushObject.get_ai_msg_text(channel=channel, n_time=n_time, lang=lang, tz=tz, label=label_str) kwargs = { 'nickname': nickname, 'app_bundle_id': app_bundle_id, 'token_val': token_val, 'n_time': n_time, 'event_type': event_type, 'msg_title': msg_title, 'msg_text': msg_text, 'uid': uid, 'channel': channel, } try: # 推送消息 if push_type == 0: # ios apns PushObject.ios_apns_push(**kwargs) elif push_type == 1: # android gcm PushObject.android_fcm_push(**kwargs) elif push_type == 2: # android jpush kwargs.pop('uid') kwargs.pop('channel') PushObject.android_jpush(**kwargs) elif push_type == 3: huawei_push_object = HuaweiPushObject() huawei_push_object.send_push_notify_message(**kwargs) elif push_type == 4: # android 小米推送 PushObject.android_xmpush(**kwargs) elif push_type == 5: # android vivo推送 PushObject.android_vivopush(**kwargs) elif push_type == 6: # android oppo推送 PushObject.android_oppopush(**kwargs) elif push_type == 7: # android 魅族推送 PushObject.android_meizupush(**kwargs) except Exception as e: LOGGING.info('ai推送消息异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) continue AiView.save_cloud_ai_tag(uid, int(n_time), event_type) week = LocalDateTimeUtil.date_to_week(local_date_time) EquipmentInfoService.equipment_info_bulk_create(week, eq_list) return response.json(0) except Exception as e: LOGGING.info('---ai识别推送异常---:{}'.format(repr(e))) data = { 'errLine': e.__traceback__.tb_lineno, 'errMsg': repr(e) } return response.json(48, data) @classmethod def image_aI_recognition(cls, input_name_arr, nms_threshold, confidence, client_timeout): """ 自有图片云模型识别 :param input_name_arr: 推理图片地址名 :param nms_threshold: nms置信度 :param confidence: 目标置信度(一般只用调整这个) :param client_timeout: 超时时间(秒为单位) :return: results 推理结果 """ try: if not HEALTH: LOGGING.info('AI health:{}'.format(HEALTH)) return {} # 推理张数(一次最多推理128张!) # 图片名称(这里可以改成内存)注意改完之后要检查input_tmp的【类型(type)、尺寸(shape)】是否和之前的一致 # 输入尺寸固定640wx360h,如需变动可以联系我们,我们这边做resize会快 input_name_arr = np.array(list(map(np.array, map(Image.open, input_name_arr)))) # 推理 results = ai_connect.yolo_infer(input_name_arr, nms_threshold, confidence, client_timeout) # 报错返回 if results == 'e_timeout': raise Exception('推理超时') elif results == 'e_no_model': raise Exception('没有设置模型') LOGGING.info('云上模型推理结果:{}'.format(results)) return results except Exception as e: LOGGING.info('云模型AI识别失败,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return {} @classmethod def get_cloud_recognition_tag(cls, results): """ 根据推理结果 返回组合类型,0则推理失败,1:人,2:车,3:宠物,4:包裹 """ if not results: return 0 # 返回字典,可以复制到绘制python脚本中查看结果 tag_list = [] event_dict = {1: 1, 0: 2, 3: 2, 2: 3, 4: 4} for k, v in results.items(): if len(v) > 0: for item in v: tag_list.append(event_dict.get(item['classID'], 0)) if tag_list: tag_list = set(tag_list) event_type = '' for val in tag_list: event_type += str(val) return int(event_type) else: return 0 @classmethod def get_pic_coordinates(cls, results): """ 获取识别图片坐标 """ try: ai_dict = {} for i in range(3): ai_dict['file_' + str(i)] = results['pic_' + str(i)] return json.dumps(ai_dict) except Exception as e: LOGGING.info('AI推理结果解析异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return '' @staticmethod def get_tag_message(lang, event_type): event_type = str(event_type) types = [] if len(event_type) > 1: for i in range(1, len(event_type) + 1): types.append(MessageTypeEnum(int(event_type[i - 1:i]))) else: types.append(int(event_type)) msg_cn = {1: '人', 2: '动物', 3: '车', 4: '包裹'} msg_en = {1: 'person', 2: 'animal', 3: 'vehicle', 4: 'package'} msg_text = '' for item in types: if lang == 'cn': msg_text += msg_cn.get(item) + ' ' else: msg_text += msg_en.get(item) + ' ' return msg_text @classmethod def save_cloud_ai_tag(cls, uid, event_time, types): """ 保存云存AI标签 """ try: types = str(types) if not types: return False n_time = int(time.time()) vod_hls_tag = {"uid": uid, "ai_event_time": event_time, "created_time": n_time} vod_tag_vo = VodHlsTag.objects.create(**vod_hls_tag) tag_list = [] if len(types) > 1: for i in range(1, len(types) + 1): ai_type = MessageTypeEnum(int(types[i - 1:i])) vod_tag_type_vo = VodHlsTagType(tag_id=vod_tag_vo.id, created_time=n_time, type=ai_type.value) tag_list.append(vod_tag_type_vo) else: ai_type = MessageTypeEnum(int(types)) vod_tag_type_vo = {"tag_id": vod_tag_vo.id, "created_time": n_time, "type": ai_type.value} VodHlsTagType.objects.create(**vod_tag_type_vo) if tag_list: VodHlsTagType.objects.bulk_create(tag_list) return True except Exception as e: print('AI标签存储异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return False