import json import logging import threading import oss2 from django.http import JsonResponse from django.views.generic.base import View from AnsjerPush.config import CONFIG_INFO, CONFIG_CN from AnsjerPush.config import OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET from Object.RedisObject import RedisObject from Service.DevicePushService import DevicePushService V1_PUSH_LOGGER = logging.getLogger('v1_push') # 旧移动侦测接口 class NotificationView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' return self.validation(request.GET) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' return self.validation(request.POST) @staticmethod def validation(request_dict): """ 设备触发报警消息推送 @param request_dict:uidToken 加密uid @param request_dict:etk 加密uid @param request_dict:channel 设备通道号 @param request_dict:n_time 设备触发报警时间 @param request_dict:event_type 设备事件类型 @param request_dict:is_st 文件类型(0:无,1:图片,2:视频) """ uidToken = request_dict.get('uidToken', None) etk = request_dict.get('etk', None) channel = request_dict.get('channel', '1') n_time = request_dict.get('n_time', None) event_type = request_dict.get('event_type', None) is_st = request_dict.get('is_st', None) if not all([channel, n_time]): return JsonResponse(status=200, data={'code': 444, 'msg': 'error channel or n_time'}) redis_obj = RedisObject() try: uid = DevicePushService.decode_uid(etk, uidToken) # 解密uid if len(uid) != 20 and len(uid) != 14: return JsonResponse(status=200, data={'code': 404, 'msg': 'wrong uid'}) V1_PUSH_LOGGER.info('旧移动侦测接口uid:{},时间戳:{},事件类型:{}'.format(uid, n_time, event_type)) event_type = int(event_type) pkey = '{}_{}_{}_ptl'.format(uid, event_type, channel) ykey = '{}_redis_qs'.format(uid) is_sys_msg = DevicePushService.judge_sys_msg(event_type) if is_sys_msg: dkey = '{}_{}_{}_flag'.format(uid, event_type, channel) else: dkey = '{}_{}_flag'.format(uid, channel) have_ykey = redis_obj.get_data(key=ykey) # uid_set 数据库缓存 have_pkey = redis_obj.get_data(key=pkey) # 一分钟限制key have_dkey = redis_obj.get_data(key=dkey) # 推送类型限制 # 一分钟外,推送开启状态 detect_med_type = 0 # 0推送旧机制 1存库不推送,2推送存库 if event_type not in [606, 607]: if have_pkey: res_data = {'code': 0, 'msg': 'Push it once a minute'} return JsonResponse(status=200, data=res_data) # 数据库读取数据 if have_ykey: uid_push_list = eval(redis_obj.get_data(key=ykey)) else: # 从数据库查询出来 uid_push_qs = DevicePushService.query_uid_push(uid, event_type) if not uid_push_qs.exists(): V1_PUSH_LOGGER.info('{}uid_push数据不存在'.format(uid)) return JsonResponse(status=200, data={'code': 176, 'msg': 'no uid_push data'}) # 修改redis数据,并设置过期时间为10分钟 uid_push_list = DevicePushService.qs_to_list(uid_push_qs) redis_obj.set_data(key=ykey, val=str(uid_push_list), expire=600) if not uid_push_list: res_data = {'code': 404, 'msg': 'error !'} return JsonResponse(status=200, data=res_data) if not uid_push_list: res_data = {'code': 0, 'msg': 'uid_push_list not exist'} return JsonResponse(status=200, data=res_data) nickname = uid_push_list[0]['uid_set__nickname'] detect_interval = uid_push_list[0]['uid_set__detect_interval'] detect_group = uid_push_list[0]['uid_set__detect_group'] if not nickname: nickname = uid if detect_group is not None: if have_dkey: detect_med_type = 1 # 1为存库不推送 else: detect_med_type = 2 # 为2的话,既推送,又存库 if CONFIG_INFO != CONFIG_CN: new_detect_interval = uid_push_list[0]['uid_set__new_detect_interval'] detect_interval = new_detect_interval if new_detect_interval > 0 else detect_interval detect_interval = 60 if detect_interval < 60 else detect_interval redis_obj.set_data(key=dkey, val=1, expire=detect_interval - 5) redis_obj.set_data(key=pkey, val=1, expire=60) # 旧模式并且没有pkey,重新创建一个 if not detect_group and not have_pkey: redis_obj.set_data(key=pkey, val=1, expire=60) auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET) bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'apg') # 推送相关参数 push_kwargs = { 'uid': uid, 'channel': channel, 'event_type': event_type, 'n_time': n_time, } params = {'nickname': nickname, 'uid': uid, 'push_kwargs': push_kwargs, 'is_st': is_st, 'is_sys_msg': is_sys_msg, 'channel': channel, 'event_type': event_type, 'n_time': n_time, 'electricity': '', 'bucket': bucket, 'app_push': have_dkey, 'storage_location': 1, 'ai_type': 0, 'dealings_type': 0, 'detection': 0, 'device_type': 1, 'app_push_config': '', 'uid_set_push_list': uid_push_list, 'redis_obj': redis_obj} # 异步推送消息和保存数据 push_thread = threading.Thread( target=push_and_save_data, kwargs=params) push_thread.start() res_data = {} if is_st == '0' or is_st == '2': res_data = {'code': 0, 'msg': 'success 0 or 2'} return JsonResponse(status=200, data=res_data) elif is_st == '1': obj = '{}/{}/{}.jpeg'.format(uid, channel, n_time) url = bucket.sign_url('PUT', obj, 3600) res_data = {'code': 0, 'img_push': url, 'msg': 'success 1'} elif is_st == '3': img_url_list = [] for i in range(int(is_st)): obj = '{}/{}/{}_{}.jpeg'.format(uid, channel, n_time, i) url = bucket.sign_url('PUT', obj, 3600) img_url_list.append(url) res_data = {'code': 0, 'img_url_list': img_url_list, 'msg': 'success 3'} V1_PUSH_LOGGER.info('旧推送接口响应,uid:{},n_time:{},事件类型:{},响应:{}'. format(uid, n_time, event_type, json.dumps(res_data))) return JsonResponse(status=200, data=res_data) except Exception as e: V1_PUSH_LOGGER.info('旧推送接口异常,error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) data = { 'error_line': e.__traceback__.tb_lineno, 'error_msg': repr(e) } return JsonResponse(status=200, data=json.dumps(data), safe=False) def push_and_save_data(**params): uid = params['uid'] V1_PUSH_LOGGER.info('{}开始异步存表和推送'.format(uid)) # 异步推送消息 push_thread = threading.Thread( target=DevicePushService.push_msg, kwargs=params) push_thread.start() # 保存推送数据 result = DevicePushService.save_msg_push(**params) V1_PUSH_LOGGER.info('{}存表结果:{}'.format(uid, result))