123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- # @Author : Rocky
- # @File : Snapshot.py
- # @Time : 2025/7/29 8:51
- import uuid
- import requests
- from django.http import JsonResponse
- from django.views import View
- from azoauth.config import LOGGER, SERVER_API, ALEXA_EVENT_API
- from model.models import UserModel, UidRtspModel, AlexaAuthModel
- from object.ResObject import ResObject
- from object.RedisObject import RedisObject
- from service.CommonService import CommonService
- class SnapshotView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, operation)
- def validation(self, request_dict, operation):
- response = ResObject()
- if operation == 'sendMqtt': # 下发快照指令
- return self.send_mqtt(request_dict)
- elif operation == 'asynEventResponse': # 异步事件响应
- return self.asyn_event_response(request_dict)
- else:
- return response.json(10, 'error url')
- @classmethod
- def send_mqtt(cls, request_dict):
- uid = request_dict.get("uid", None)
- access_token = request_dict.get("access_token", None)
- correlation_token = request_dict.get("correlation_token", None)
- if not all([uid, access_token, correlation_token]):
- return JsonResponse({'result_code': '444', '错误': '参数错误'})
- try:
- user_qs = UserModel.objects.filter(access_token=access_token).values('userID')
- if not user_qs.exists():
- return JsonResponse({'result_code': '500', '错误': '用户数据不存在'})
- # 更新correlation_token
- user_id = user_qs[0]['userID']
- redis_obj = RedisObject()
- key = 'correlation_token_{}'.format(user_id)
- redis_obj.set_data(key=key, val=correlation_token, expire=60*5)
- region = user_qs.values('region_code')[0]['region_code']
- send_res = cls._send_mqtt_snapshot_command(uid, region)
- if send_res:
- return JsonResponse({'result_code': '0'})
- else:
- return JsonResponse({
- 'result_code': '500',
- 'msg': 'send mqtt failed'}
- )
- except Exception as e:
- return JsonResponse({'result_code': '500', 'error_msg': 'error_line:{}, error_msg:{}'.
- format(e.__traceback__.tb_lineno, repr(e))})
- @staticmethod
- def _send_mqtt_snapshot_command(uid, region):
- thing_name = CommonService.query_serial_with_uid(uid)
- topic_name = 'ansjer/generic/{}'.format(thing_name)
- msg = {
- "commandType": "alexaSnapshot",
- }
- try:
- result = CommonService.req_publish_mqtt_msg(thing_name, topic_name, msg)
- LOGGER.info('快照指令下发结果 {}: {}'.format(uid, result))
- if result:
- return True
- domain_name = SERVER_API[region]
- url = '{}/iot/requestPublishMessage'.format(domain_name)
- request_data = {'UID': uid, 'commandType': 'alexaSnapshot'}
- response = requests.post(url, data=request_data, timeout=10)
- if response.status_code == 200:
- result = response.json()
- if result.get('result_code') == 0:
- LOGGER.info('快照远程MQTT接口成功')
- return True
- else:
- LOGGER.info('快照远程MQTT接口失败, res:{}'.format(result))
- return False
- else:
- LOGGER.info('快照远程MQTT接口失败, 状态码: {}', response.status_code)
- return False
- except Exception as e:
- LOGGER.info('快照MQTT函数异常: error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- return False
- @staticmethod
- def asyn_event_response(request_dict):
- uid = request_dict.get("uid", None)
- image_uri = request_dict.get("image_uri", None)
- time_sample = request_dict.get("time_sample", None)
- if not all([uid, image_uri, time_sample]):
- return JsonResponse({'result_code': '444'})
- try:
- uid_qs = UidRtspModel.objects.filter(uid=uid).values('user_id')
- if not uid_qs.exists():
- return JsonResponse({'result_code': '173'})
- user_id = uid_qs[0]['user_id']
- # 获取token
- access_token = CommonService.get_access_token(user_id)
- if not access_token:
- return JsonResponse({'result_code': '500', 'msg': 'get access_token error'})
- redis_obj = RedisObject()
- key = 'correlation_token_{}'.format(user_id)
- correlation_token = redis_obj.get_data(key)
- if not correlation_token:
- return JsonResponse({'result_code': '500', 'msg': 'correlation_token not exit'})
- auth_qs = AlexaAuthModel.objects.filter(userID=user_id).values('alexa_region')
- alexa_region = auth_qs[0]['alexa_region']
- api_uri = ALEXA_EVENT_API[alexa_region]
- message_id = str(uuid.uuid4()).strip()
- # 转换时间格式
- time_sample = int(time_sample)
- expiration_time = time_sample + 60 * 10 # 图片链接有效时间: 十分钟
- time_of_sample = CommonService.timestamp_to_iso8601(time_sample)
- uri_expiration_time = CommonService.timestamp_to_iso8601(expiration_time)
- bearer_access_token = "Bearer {}".format(access_token)
- headers = {"content-type": "application/json", "Authorization": bearer_access_token}
- payload_json = {
- "event": {
- "header": {
- "namespace": "Alexa.SmartVision.SnapshotProvider",
- "name": "Snapshot",
- "messageId": message_id,
- "correlationToken": correlation_token,
- "payloadVersion": "1.1"
- },
- "endpoint": {
- "scope": {
- "type": "BearerToken",
- "token": access_token
- },
- "endpointId": uid
- },
- "payload": {
- "value": {
- "uri": image_uri,
- "uriExpirationTime": uri_expiration_time,
- "authenticationType": "ACCESS_TOKEN"
- },
- "timeOfSample": time_of_sample,
- "uncertaintyInMilliseconds": 0
- }
- }
- }
- LOGGER.info('快照异步事件请求: url:{},data:{}'.format(api_uri, payload_json))
- response = requests.post(api_uri, json=payload_json, headers=headers)
- LOGGER.info('快照异步事件请求响应: {}'.format(response.json()))
- return JsonResponse({'res': 'success'})
- except Exception as e:
- return JsonResponse({'result_code': '500', 'error_msg': 'error_line:{}, error_msg:{}'.
- format(e.__traceback__.tb_lineno, repr(e))})
|