# @Author : Rocky
# @File : Snapshot.py
# @Time : 2025/7/29 8:51
import uuid

import requests
from django.http import JsonResponse
from django.views import View

from azoauth.config import LOGGER, SERVER_API, ALEXA_EVENT_API
from model.models import UserModel, UidRtspModel, AlexaAuthModel
from object.ResObject import ResObject
from object.RedisObject import RedisObject
from service.CommonService import CommonService


class SnapshotView(View):
    """Endpoints for the Alexa snapshot flow.

    The ``operation`` URL keyword argument selects the handler:

    * ``sendMqtt`` -- store the correlation token and push the snapshot
      command to the device.
    * ``asynEventResponse`` -- post the asynchronous ``Snapshot`` event to the
      Alexa event API once an image URI is available.
    """

    def get(self, request, *args, **kwargs):
        """Dispatch a GET request using the query-string parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, operation)

    def post(self, request, *args, **kwargs):
        """Dispatch a POST request using the form parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, operation)

    def validation(self, request_dict, operation):
        """Route ``operation`` to its handler.

        :param request_dict: request parameters (``request.GET`` / ``request.POST``).
        :param operation: value of the ``operation`` URL keyword argument.
        :return: the handler's response, or ``ResObject().json(10, 'error url')``
            for an unknown operation.
        """
        if operation == 'sendMqtt':  # send the snapshot command
            return self.send_mqtt(request_dict)
        if operation == 'asynEventResponse':  # asynchronous event response
            return self.asyn_event_response(request_dict)
        # Only the error path needs a ResObject, so build it here.
        return ResObject().json(10, 'error url')

    @classmethod
    def send_mqtt(cls, request_dict):
        """Store the correlation token and send the snapshot command.

        Reads ``uid``, ``access_token`` and ``correlation_token`` from
        ``request_dict``. The correlation token is saved in Redis under
        ``correlation_token_<uid>`` for five minutes so that
        ``asyn_event_response`` can read it back later.

        :return: ``JsonResponse`` with ``result_code`` ``'0'`` on success,
            ``'444'`` when a parameter is missing, ``'500'`` otherwise.
        """
        uid = request_dict.get("uid", None)
        access_token = request_dict.get("access_token", None)
        correlation_token = request_dict.get("correlation_token", None)
        if not all([uid, access_token, correlation_token]):
            return JsonResponse({'result_code': '444', '错误': '参数错误'})

        try:
            # One LIMIT 1 query replaces the exists()/[0]/values()[0] round trips.
            user_rows = list(
                UserModel.objects.filter(access_token=access_token)
                .values('region_code')[:1]
            )
            if not user_rows:
                return JsonResponse({'result_code': '500', '错误': '用户数据不存在'})
            region = user_rows[0]['region_code']

            # Refresh the correlation token for this device.
            redis_obj = RedisObject()
            key = 'correlation_token_{}'.format(uid)
            redis_obj.set_data(key=key, val=correlation_token, expire=60 * 5)

            if cls._send_mqtt_snapshot_command(uid, region):
                return JsonResponse({'result_code': '0'})
            return JsonResponse({'result_code': '500', 'msg': 'send mqtt failed'})
        except Exception as e:
            return JsonResponse({
                'result_code': '500',
                'error_msg': 'error_line:{}, error_msg:{}'.format(
                    e.__traceback__.tb_lineno, repr(e)),
            })

    @staticmethod
    def _send_mqtt_snapshot_command(uid, region):
        """Publish the ``alexaSnapshot`` command for ``uid``.

        Publishes through ``CommonService.req_publish_mqtt_msg`` first. If that
        returns a falsy result, falls back to the ``/iot/requestPublishMessage``
        HTTP endpoint of ``SERVER_API[region]``.

        :param uid: device UID.
        :param region: key into ``SERVER_API`` selecting the fallback server.
        :return: ``True`` when either path reports success, ``False`` otherwise
            (including when any exception is raised).
        """
        # The serial returned for the UID is used as the MQTT thing name.
        thing_name = CommonService.query_serial_with_uid(uid)
        topic_name = 'ansjer/generic/{}'.format(thing_name)
        msg = {
            "commandType": "alexaSnapshot",
        }
        try:
            result = CommonService.req_publish_mqtt_msg(thing_name, topic_name, msg)
            LOGGER.info('快照指令下发结果 {}: {}'.format(uid, result))
            if result:
                return True

            # Fallback: ask the regional server to publish the message.
            domain_name = SERVER_API[region]
            url = '{}/iot/requestPublishMessage'.format(domain_name)
            request_data = {'UID': uid, 'commandType': 'alexaSnapshot'}
            response = requests.post(url, data=request_data, timeout=10)
            if response.status_code != 200:
                # Format explicitly: logging uses %-style args, so a '{}'
                # placeholder with a positional argument is never filled in.
                LOGGER.info('快照远程MQTT接口失败, 状态码: {}'.format(response.status_code))
                return False

            result = response.json()
            if result.get('result_code') == 0:
                LOGGER.info('快照远程MQTT接口成功')
                return True
            LOGGER.info('快照远程MQTT接口失败, res:{}'.format(result))
            return False
        except Exception as e:
            LOGGER.info('快照MQTT函数异常: error_line:{}, error_msg:{}'.format(
                e.__traceback__.tb_lineno, repr(e)))
            return False

    @staticmethod
    def asyn_event_response(request_dict):
        """Send the asynchronous ``Snapshot`` event to the Alexa event API.

        Reads ``uid``, ``image_uri`` and ``time_sample`` from ``request_dict``.
        ``time_sample`` is converted with ``int()`` and the image URI is
        declared valid for ten minutes after it.

        :return: ``JsonResponse`` ``{'res': 'success'}`` when the event API
            answers with a 2xx status; otherwise a ``result_code`` of ``'444'``
            (missing parameter), ``'173'`` (unknown uid) or ``'500'``.
        """
        uid = request_dict.get("uid", None)
        image_uri = request_dict.get("image_uri", None)
        time_sample = request_dict.get("time_sample", None)
        if not all([uid, image_uri, time_sample]):
            return JsonResponse({'result_code': '444'})

        try:
            uid_rows = list(UidRtspModel.objects.filter(uid=uid).values('user_id')[:1])
            if not uid_rows:
                return JsonResponse({'result_code': '173'})
            user_id = uid_rows[0]['user_id']

            # Fetch the access token for this user.
            access_token = CommonService.get_access_token(user_id)
            if not access_token:
                return JsonResponse({'result_code': '500', 'msg': 'get access_token error'})

            redis_obj = RedisObject()
            key = 'correlation_token_{}'.format(uid)
            correlation_token = redis_obj.get_data(key)
            if not correlation_token:
                return JsonResponse({'result_code': '500', 'msg': 'correlation_token not exit'})

            auth_rows = list(
                AlexaAuthModel.objects.filter(userID=user_id).values('alexa_region')[:1]
            )
            if not auth_rows:
                # Explicit guard instead of an opaque IndexError from [0].
                return JsonResponse({'result_code': '500', 'msg': 'alexa auth not exist'})
            alexa_region = auth_rows[0]['alexa_region']
            api_uri = ALEXA_EVENT_API[alexa_region]
            message_id = str(uuid.uuid4())

            # Convert the timestamps.
            time_sample = int(time_sample)
            expiration_time = time_sample + 60 * 10  # image link is valid for ten minutes
            time_of_sample = CommonService.timestamp_to_iso8601(time_sample)
            uri_expiration_time = CommonService.timestamp_to_iso8601(expiration_time)

            bearer_access_token = "Bearer {}".format(access_token)
            headers = {"content-type": "application/json", "Authorization": bearer_access_token}
            payload_json = {
                "event": {
                    "header": {
                        "namespace": "Alexa.SmartVision.SnapshotProvider",
                        "name": "Snapshot",
                        "messageId": message_id,
                        "correlationToken": correlation_token,
                        "payloadVersion": "1.1"
                    },
                    "endpoint": {
                        "scope": {
                            "type": "BearerToken",
                            "token": access_token
                        },
                        "endpointId": uid
                    },
                    "payload": {
                        "value": {
                            "uri": image_uri,
                            "uriExpirationTime": uri_expiration_time,
                            "authenticationType": "ACCESS_TOKEN"
                        },
                        "timeOfSample": time_of_sample,
                        "uncertaintyInMilliseconds": 0
                    }
                }
            }
            # NOTE(review): this log line contains the bearer token (in both
            # headers and payload) -- consider redacting it.
            LOGGER.info('快照异步事件请求: url:{},data:{}, headers:{}, time_sample:{}'.format(
                api_uri, payload_json, headers, time_sample))
            response = requests.post(api_uri, json=payload_json, headers=headers, timeout=30)
            # Log status and raw text rather than response.json(): an accepted
            # event may come back with an empty body, which json() cannot
            # decode and which previously turned a success into a 500.
            LOGGER.info('快照异步事件请求响应: status:{}, body:{}'.format(
                response.status_code, response.text))
            if not response.ok:
                return JsonResponse({
                    'result_code': '500',
                    'msg': 'alexa event request failed',
                    'status_code': response.status_code,
                })
            return JsonResponse({'res': 'success'})
        except Exception as e:
            return JsonResponse({
                'result_code': '500',
                'error_msg': 'error_line:{}, error_msg:{}'.format(
                    e.__traceback__.tb_lineno, repr(e)),
            })