123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334 |
- import base64
- import json
- import logging
- import os
- import threading
- import time
- import numpy as np
- from PIL import Image
- from django.views.generic.base import View
- from AnsjerPush.config import BASE_DIR
- from Model.models import UidPushModel, AiService, Device_Info, VodHlsTag, VodHlsTagType
- from Object.AiEngineObject import AiEngine
- from Object.ETkObject import ETkObject
- from Object.ResponseObject import ResponseObject
- from Object.enums.MessageTypeEnum import MessageTypeEnum
- from Object.utils import LocalDateTimeUtil
- from Service.CommonService import CommonService
- from Service.EquipmentInfoService import EquipmentInfoService
- from Service.PushService import PushObject
# Shared logger for this module; every handler below writes through the 'info' logger.
LOGGING = logging.getLogger('info')

# Address (host:port) handed to AiEngine when opening the connection to the inference server.
CLOUD_BASED_AI_URL = '34.192.147.108:8001'

# Name of the detection model selected on the inference server via AiEngine.set_model().
MODEL_NAME = 'AI_5obj_pdcpv_detect_yolov5_pipeline'
class AiView(View):
    """View that runs cloud AI recognition on device snapshots and pushes the result.

    The only supported ``operation`` is ``identification``: three base64
    images sent by a device are written to disk, sent to the inference server,
    and, when something is detected, a push notification is sent to every
    registered token of the device and the event is stored.
    """

    def get(self, request, *args, **kwargs):
        """Handle GET by dispatching on the ``operation`` URL keyword."""
        request.encoding = 'utf-8'
        return self.validation(request.GET, kwargs.get('operation'))

    def post(self, request, *args, **kwargs):
        """Handle POST by dispatching on the ``operation`` URL keyword."""
        request.encoding = 'utf-8'
        return self.validation(request.POST, kwargs.get('operation'))

    def validation(self, request_dict, operation):
        """Route the request to the handler for ``operation``.

        :param request_dict: request parameters (``request.GET`` / ``request.POST``)
        :param operation: operation name taken from the URL
        :return: the handler's response, or ``response.json(414)`` for an
            operation this view does not handle
        """
        response = ResponseObject()
        if operation == 'identification':  # AI recognition push
            return self.identification(request_dict, response)
        return response.json(414)

    @staticmethod
    def identification(request_dict, response):
        """Recognise objects in three device snapshots and push the result.

        Request parameters read from ``request_dict``:

        * ``etk``       - uid token, decoded with ``ETkObject``
        * ``n_time``    - the device's current time (must parse as an integer)
        * ``channel``   - channel, defaults to ``'1'``
        * ``fileOne``, ``fileTwo``, ``fileThree`` - base64 encoded images

        :param request_dict: request data
        :param response: ``ResponseObject`` used to build the reply
        :return: ``response.json(444)`` when a required parameter is missing or
            ``n_time`` is not an integer, ``response.json(173)`` when the
            device has no active AI service or no push registration,
            ``response.json(48, data)`` on an unexpected error, otherwise
            ``response.json(0)``
        """
        etk = request_dict.get('etk', None)
        n_time = request_dict.get('n_time', None)
        channel = request_dict.get('channel', '1')
        file_list = [request_dict.get(key, None) for key in ('fileOne', 'fileTwo', 'fileThree')]
        # All three images are decoded below, so a missing one is rejected here
        # instead of failing later with an AttributeError on None.
        if not all([etk, n_time] + file_list):
            return response.json(444)
        # n_time is concatenated into a filesystem path and later converted with
        # int(); validating it first prevents path traversal through this
        # request parameter and avoids failing only after pushes were sent.
        try:
            event_time = int(n_time)
        except (TypeError, ValueError):
            return response.json(444)

        # Decode the etk to obtain the device uid.
        eto = ETkObject(etk)
        uid = eto.uid
        LOGGING.info('---进入ai识别推送接口--- etk:{}, uid:{}'.format(etk, uid))
        receive_time = int(time.time())

        # The device must have an AI service that is currently in use.
        ai_service_qs = AiService.objects.filter(
            uid=uid, detect_status=1, use_status=1, endTime__gt=receive_time
        ).values('detect_group')
        if not ai_service_qs.exists():
            return response.json(173)

        # Push registrations (token, platform, language, time zone) for the device.
        uid_push_qs = UidPushModel.objects.filter(uid_set__uid=uid).values(
            'push_type', 'appBundleId', 'token_val', 'lang', 'tz', 'userID_id'
        )
        if not uid_push_qs.exists():
            return response.json(173)

        # Fall back to the uid when the device has no stored record.
        device_info = Device_Info.objects.filter(UID=uid).first()
        nickname = uid if device_info is None else device_info.NickName
        now_time = int(time.time())
        try:
            dir_path = os.path.join(BASE_DIR, 'static/ai/' + uid + '/' + str(n_time))
            os.makedirs(dir_path, exist_ok=True)

            # Write the three decoded images to disk for the inference step.
            file_path_list = []
            for i, val in enumerate(file_list):
                # Spaces are turned back into '+' before decoding (presumably
                # '+' in the base64 payload arrives as a space - TODO confirm).
                image_bytes = base64.b64decode(val.replace(' ', '+'))
                file_path = "{dir_path}/{n_time}_{i}.jpg".format(dir_path=dir_path, n_time=n_time, i=i)
                file_path_list.append(file_path)
                with open(file_path, 'wb') as f:
                    f.write(image_bytes)

            # nms_threshold=0.45, confidence=0.45, client_timeout=1 second.
            ai_results = AiView.image_aI_recognition(file_path_list, 0.45, 0.45, 1)
            if not ai_results:
                CommonService.del_path(dir_path)
                return response.json(0)
            event_type = AiView.get_cloud_recognition_tag(ai_results)
            if event_type == 0:
                # Nothing recognised: drop the images and finish quietly.
                CommonService.del_path(dir_path)
                return response.json(0)
            new_bounding_box_dict = AiView.get_pic_coordinates(ai_results)

            # Upload the images in a background thread; maps local path -> object key.
            file_dict = {
                path: '{}/{}/{}_{}.jpeg'.format(uid, channel, n_time, i)
                for i, path in enumerate(file_path_list)
            }
            upload_images_thread = threading.Thread(
                target=CommonService.upload_images, args=(file_dict, dir_path)
            )
            upload_images_thread.start()

            # Store one message per user and push to every registered token.
            uid_push_list = list(uid_push_qs)
            eq_list = []
            seen_user_ids = set()
            local_date_time = ''
            # NOTE(review): the label is built once from the first row's
            # language and reused for every recipient - confirm this is intended.
            label_str = AiView.get_tag_message(uid_push_list[0]['lang'], event_type)
            for up in uid_push_list:
                tz = up['tz']
                if tz is None or tz == '':
                    tz = 0
                local_date_time = CommonService.get_now_time_str(n_time=n_time, tz=tz, lang='cn')[:10]
                user_id = up['userID_id']
                # A user with several tokens gets a single stored message.
                if user_id not in seen_user_ids:
                    eq_list.append(EquipmentInfoService.get_equipment_info_obj(
                        local_date_time,
                        device_user_id=user_id,
                        event_time=n_time,
                        event_type=event_type,
                        device_uid=uid,
                        device_nick_name=nickname,
                        channel=channel,
                        alarm=label_str,
                        is_st=3,
                        receive_time=receive_time,
                        add_time=now_time,
                        storage_location=2,
                        border_coords=new_bounding_box_dict
                    ))
                    seen_user_ids.add(user_id)

                # Push title and body.
                msg_title = PushObject.get_msg_title(nickname=nickname)
                msg_text = PushObject.get_ai_msg_text(
                    channel=channel, n_time=n_time, lang=up['lang'], tz=tz, label=label_str
                )
                push_kwargs = {
                    'nickname': nickname,
                    'app_bundle_id': up['appBundleId'],
                    'token_val': up['token_val'],
                    'n_time': n_time,
                    'event_type': event_type,
                    'msg_title': msg_title,
                    'msg_text': msg_text,
                    'uid': uid,
                    'channel': channel,
                }
                try:
                    AiView._send_push(up['push_type'], push_kwargs)
                except Exception as e:
                    # One failing token must not stop the remaining pushes.
                    LOGGING.info('ai推送消息异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

            AiView.save_cloud_ai_tag(uid, event_time, event_type)
            # NOTE(review): the week is derived from the date of the last push
            # row's time zone - confirm this is intended when users differ.
            week = LocalDateTimeUtil.date_to_week(local_date_time)
            EquipmentInfoService.equipment_info_bulk_create(week, eq_list)
            return response.json(0)
        except Exception as e:
            LOGGING.info('---ai识别推送异常---:{}'.format(repr(e)))
            data = {
                'errLine': e.__traceback__.tb_lineno,
                'errMsg': repr(e)
            }
            return response.json(48, data)

    @staticmethod
    def _send_push(push_type, push_kwargs):
        """Send one notification through the push channel selected by ``push_type``.

        :param push_type: 0 iOS APNs, 1 Android FCM, 2 Android JPush,
            4 Xiaomi, 5 vivo, 6 OPPO, 7 Meizu; any other value sends nothing
        :param push_kwargs: keyword arguments for the ``PushObject`` sender
        """
        sender_names = {
            0: 'ios_apns_push',
            1: 'android_fcm_push',
            2: 'android_jpush',
            4: 'android_xmpush',
            5: 'android_vivopush',
            6: 'android_oppopush',
            7: 'android_meizupush',
        }
        sender_name = sender_names.get(push_type)
        if sender_name is None:
            return
        if push_type == 2:
            # The JPush sender is called without 'uid' and 'channel'.
            push_kwargs = {k: v for k, v in push_kwargs.items() if k not in ('uid', 'channel')}
        getattr(PushObject, sender_name)(**push_kwargs)

    @classmethod
    def image_aI_recognition(cls, input_name_arr, nms_threshold, confidence, client_timeout):
        """Run the in-house cloud model on a batch of image files.

        :param input_name_arr: paths of the images to run inference on
        :param nms_threshold: NMS threshold
        :param confidence: object confidence threshold (normally the only one to tune)
        :param client_timeout: timeout in seconds
        :return: the inference results, or ``{}`` when anything fails
        """
        try:
            started = time.time()
            # Open the long-lived connection to the inference server.
            ai = AiEngine(url=CLOUD_BASED_AI_URL)
            try:
                # Connectivity / server state check; only logged, not enforced.
                if ai.health:
                    LOGGING.info('健康状况通过')
                # Select the model.
                if ai.set_model(MODEL_NAME):
                    LOGGING.info('设置模型通过')
                # Original author's notes: at most 128 images per inference call;
                # the input size is fixed at 640w x 360h.
                images = []
                for path in input_name_arr:
                    # np.array() reads the pixel data, so the file handle can
                    # be released as soon as the conversion is done.
                    with Image.open(path) as img:
                        images.append(np.array(img))
                batch = np.array(images)
                results = ai.yolo_infer(batch, nms_threshold, confidence, client_timeout)
            finally:
                # Always close the connection, including when inference raises.
                ai.close()
            LOGGING.info(f'coast:{time.time() - started:.4f}s')
            # Error markers returned by the engine in place of results.
            if results == 'e_timeout':
                raise Exception('推理超时')
            elif results == 'e_no_model':
                raise Exception('没有设置模型')
            LOGGING.info('云上模型推理结果:{}'.format(results))
            return results
        except Exception as e:
            LOGGING.info('云模型AI识别失败,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return {}

    @classmethod
    def get_cloud_recognition_tag(cls, results):
        """Collapse inference results into a combined event-type code.

        Every detection's ``classID`` is mapped to an event digit
        (1 -> 1, 0 -> 2, 3 -> 2, 2 -> 3, 4 -> 4). The distinct digits are
        concatenated in ascending order, so digits 1 and 3 give ``13``. The
        labels used for the digits are those in ``get_tag_message``.

        NOTE(review): the previous comment here listed 2 as vehicle and 3 as
        pet, which disagrees with ``get_tag_message`` - confirm which is right.

        :param results: mapping of picture key -> list of detections
        :return: the combined code, or ``0`` when nothing was recognised
        """
        if not results:
            return 0
        event_dict = {1: 1, 0: 2, 3: 2, 2: 3, 4: 4}
        tags = set()
        for detections in results.values():
            for item in detections:
                tag = event_dict.get(item['classID'])
                # Unknown class IDs contribute nothing to the code.
                if tag is not None:
                    tags.add(tag)
        if not tags:
            return 0
        # Sort explicitly so the code never depends on set iteration order.
        return int(''.join(str(tag) for tag in sorted(tags)))

    @classmethod
    def get_pic_coordinates(cls, results):
        """Serialise the detections of the three pictures to a JSON string.

        :param results: inference results keyed ``pic_0`` .. ``pic_2``
        :return: JSON text keyed ``file_0`` .. ``file_2``, or ``''`` when a key
            is missing or the data cannot be serialised
        """
        try:
            ai_dict = {'file_' + str(i): results['pic_' + str(i)] for i in range(3)}
            return json.dumps(ai_dict)
        except Exception as e:
            LOGGING.info('AI推理结果解析异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return ''

    @staticmethod
    def get_tag_message(lang, event_type):
        """Build the human-readable label for a combined event-type code.

        :param lang: ``'cn'`` selects Chinese labels, anything else English
        :param event_type: combined code such as ``1`` or ``13``
        :return: the labels of each digit, each followed by one space
            (for example ``'person vehicle '``); digits without a label are skipped
        """
        msg_cn = {1: '人', 2: '动物', 3: '车', 4: '包裹'}
        msg_en = {1: 'person', 2: 'animal', 3: 'vehicle', 4: 'package'}
        labels = msg_cn if lang == 'cn' else msg_en
        msg_text = ''
        for digit in str(event_type):
            # Always look up by int so single- and multi-digit codes behave alike.
            label = labels.get(int(digit))
            if label is not None:
                msg_text += label + ' '
        return msg_text

    @classmethod
    def save_cloud_ai_tag(cls, uid, event_time, types):
        """Store the cloud-storage AI tag for an event.

        Creates one ``VodHlsTag`` row and one ``VodHlsTagType`` row per digit
        of ``types``.

        :param uid: device uid
        :param event_time: event time stored as ``ai_event_time``
        :param types: combined event-type code such as ``1`` or ``13``
        :return: ``True`` when stored, ``False`` when ``types`` is empty or
            storing failed
        """
        try:
            types = str(types)
            if not types:
                return False
            # Validate every digit before inserting anything, so an invalid
            # digit does not leave a VodHlsTag row without type rows.
            ai_types = [MessageTypeEnum(int(digit)) for digit in types]
            n_time = int(time.time())
            vod_tag_vo = VodHlsTag.objects.create(uid=uid, ai_event_time=event_time, created_time=n_time)
            if len(ai_types) == 1:
                VodHlsTagType.objects.create(
                    tag_id=vod_tag_vo.id, created_time=n_time, type=ai_types[0].value
                )
            else:
                VodHlsTagType.objects.bulk_create([
                    VodHlsTagType(tag_id=vod_tag_vo.id, created_time=n_time, type=ai_type.value)
                    for ai_type in ai_types
                ])
            return True
        except Exception as e:
            LOGGING.info('AI标签存储异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return False
|