import json import logging import threading from django.http import JsonResponse from django.views.generic.base import View from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_US, CONFIG_CN from Object.GlobalThreadPoolObject import GlobalThreadPool from Object.RedisObject import RedisObject from Service.DevicePushService import DevicePushService TIME_LOGGER = logging.getLogger('time') ERROR_INFO_LOGGER = logging.getLogger('error_info') # 移动侦测V2接口 class NotificationV2View(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' return self.validation(request.GET) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' return self.validation(request.POST) @staticmethod def validation(request_dict): """ 设备触发报警消息推送 @param request_dict:uidToken 加密uid @param request_dict:etk 加密uid @param request_dict:channel 设备通道号 @param request_dict:n_time 设备触发报警时间 @param request_dict:event_type 设备事件类型 @param request_dict:is_st 文件类型(0,2:无图, 1:单张图片, 3:三张图片) @param request_dict:region 文件存储区域(1:国外, 2:国内) @param request_dict:electricity 电量值 @param request_dict:time_token 时间戳token @param request_dict:uid uid @param request_dict:dealings_type 往来检测 1:来,2:离开 @param request_dict:detection 检测类型 0:普通,1:算法 """ uidToken = request_dict.get('uidToken', None) etk = request_dict.get('etk', None) channel = request_dict.get('channel', '1') n_time = request_dict.get('n_time', None) event_type = request_dict.get('event_type', None) is_st = request_dict.get('is_st', None) region = request_dict.get('region', None) electricity = request_dict.get('electricity', '') dealings_type = int(request_dict.get('dealingsType', 0)) detection = int(request_dict.get('detection', 0)) button = request_dict.get('button', '1') uid = "" # 参数校验 if not all([channel, n_time]): return JsonResponse(status=200, data={'code': 444, 'msg': 'param is wrong'}) if not region or not is_st: return JsonResponse(status=200, data={'code': 404, 'msg': 'no region or is_st'}) is_st = int(is_st) region = int(region) event_type = int(event_type) redis_obj = RedisObject() try: uid = DevicePushService.decode_uid(etk, uidToken) if len(uid) != 20 and len(uid) != 14: return JsonResponse(status=200, data={'code': 404, 'msg': 'wrong uid'}) TIME_LOGGER.info('开始推送,uid:{},参数:{}'.format(uid, request_dict)) # 判断是否为系统消息 is_sys_msg = DevicePushService.judge_sys_msg(event_type) if is_sys_msg: push_interval = '{}_{}_{}_flag'.format(uid, channel, event_type) else: push_interval = '{}_{}_flag'.format(uid, channel) req_limiting = '{}_{}_{}_ptl'.format(uid, channel, event_type) cache_req_limiting = redis_obj.get_data(key=req_limiting) # 获取请求限流缓存数据 cache_app_push = redis_obj.get_data(key=push_interval) # 获取APP推送消息时间间隔缓存数据 if event_type not in [606, 607]: if cache_req_limiting: # 限流存在则直接返回 return JsonResponse(status=200, data={'code': 0, 'msg': 'Push again in one minute'}) redis_obj.set_data(key=req_limiting, val=1, expire=60) # 当缓存不存在限流数据 重新设置一分钟请求一次 # 查询uid_push和uid_set数据 uid_push_qs = DevicePushService.query_uid_push(uid, event_type, button) if not uid_push_qs.exists(): TIME_LOGGER.info('推送响应,uid:{},uid_push数据不存在!'.format(uid)) return JsonResponse(status=200, data={'code': 176, 'msg': 'no uid_push data'}) ai_type = uid_push_qs.first()['uid_set__ai_type'] device_type = uid_push_qs.first()['uid_set__device_type'] # uid_push_qs转存列表 uid_set_push_list = DevicePushService.qs_to_list(uid_push_qs) nickname = uid_set_push_list[0]['uid_set__nickname'] nickname = uid if not nickname else nickname # APP消息提醒推送间隔 detect_interval = uid_set_push_list[0]['uid_set__detect_interval'] if event_type not in [606, 607]: if not cache_app_push: # 缓存APP提醒推送间隔 默认1分钟提醒一次 DevicePushService.cache_push_detect_interval(redis_obj, push_interval, detect_interval, uid_set_push_list[0]['uid_set__new_detect_interval']) else: cache_app_push = '' bucket = '' aws_s3_client = '' # 推图,初始化s3 client if is_st == 1 or is_st == 3: aws_s3_client = DevicePushService.get_s3_client(region=region) bucket = 'foreignpush' if region == 1 else 'push' # 推送相关参数 push_kwargs = { 'uid': uid, 'channel': channel, 'event_type': event_type, 'n_time': n_time, } # 对象存储区域 2:AWS,3:oci美国凤凰城,4:oci英国伦敦 storage_location = 2 if CONFIG_INFO == CONFIG_CN else 4 if CONFIG_INFO == CONFIG_EUR else 3 params = {'nickname': nickname, 'uid': uid, 'push_kwargs': push_kwargs, 'is_st': is_st, 'region': region, 'is_sys_msg': is_sys_msg, 'channel': channel, 'event_type': event_type, 'n_time': n_time, 'electricity': electricity, 'bucket': bucket, 'aws_s3_client': aws_s3_client, 'app_push': cache_app_push, 'storage_location': storage_location, 'ai_type': ai_type, 'device_type': device_type, 'dealings_type': dealings_type, 'detection': detection, 'app_push_config': uid_set_push_list[0]['uid_set__msg_notify'], 'uid_set_push_list': uid_set_push_list, 'redis_obj': redis_obj} # 使用全局的线程池提交推送任务 # thread_pool = GlobalThreadPool() # thread_pool.submit(push_and_save_data, **params) # 异步推送消息和保存数据 push_thread = threading.Thread( target=push_and_save_data, kwargs=params) push_thread.start() # 视频通话不返回图片链接 if event_type == 607: TIME_LOGGER.info('推送响应,uid:{},n_time:{},事件类型:{}'.format(uid, n_time, event_type)) return JsonResponse(status=200, data={'code': 0, 'msg': 'success'}) # 获取S3对象上传链接 kwargs = { 'is_st': is_st, 'uid': uid, 'channel': channel, 'n_time': n_time, 'region': region, 'aws_s3_client': aws_s3_client, 'storage_location': storage_location } res_data = DevicePushService.get_res_data(**kwargs) TIME_LOGGER.info('推送响应,uid:{},n_time:{},事件类型:{},响应:{}'.format( uid, n_time, event_type, json.dumps(res_data))) return JsonResponse(status=200, data=res_data) except Exception as e: ERROR_INFO_LOGGER.info('V2推送接口异常,uid:{},etk:{},error_line:{},error_msg:{}'. format(uid, etk, e.__traceback__.tb_lineno, repr(e))) data = { 'error_line': e.__traceback__.tb_lineno, 'error_msg': repr(e) } return JsonResponse(status=200, data=json.dumps(data), safe=False) def push_and_save_data(**params): uid = params['uid'] TIME_LOGGER.info('{}开始异步存表和推送'.format(uid)) # 线程池推送消息 # thread_pool = GlobalThreadPool() # thread_pool.submit(DevicePushService.push_msg, **params) # 异步推送消息 push_thread = threading.Thread( target=DevicePushService.push_msg, kwargs=params) push_thread.start() # 保存推送数据 result = DevicePushService.save_msg_push(**params) TIME_LOGGER.info('{}存表结果:{}'.format(uid, result))