Snapshot.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. # @Author : Rocky
  2. # @File : Snapshot.py
  3. # @Time : 2025/7/29 8:51
  4. import uuid
  5. import requests
  6. from django.http import JsonResponse
  7. from django.views import View
  8. from azoauth.config import LOGGER, SERVER_API, ALEXA_EVENT_API
  9. from model.models import UserModel, UidRtspModel, AlexaAuthModel
  10. from object.ResObject import ResObject
  11. from object.RedisObject import RedisObject
  12. from service.CommonService import CommonService
  13. class SnapshotView(View):
  14. def get(self, request, *args, **kwargs):
  15. request.encoding = 'utf-8'
  16. operation = kwargs.get('operation')
  17. return self.validation(request.GET, operation)
  18. def post(self, request, *args, **kwargs):
  19. request.encoding = 'utf-8'
  20. operation = kwargs.get('operation')
  21. return self.validation(request.POST, operation)
  22. def validation(self, request_dict, operation):
  23. response = ResObject()
  24. if operation == 'sendMqtt': # 下发快照指令
  25. return self.send_mqtt(request_dict)
  26. elif operation == 'asynEventResponse': # 异步事件响应
  27. return self.asyn_event_response(request_dict)
  28. else:
  29. return response.json(10, 'error url')
  30. @classmethod
  31. def send_mqtt(cls, request_dict):
  32. uid = request_dict.get("uid", None)
  33. access_token = request_dict.get("access_token", None)
  34. correlation_token = request_dict.get("correlation_token", None)
  35. if not all([uid, access_token, correlation_token]):
  36. return JsonResponse({'result_code': '444', '错误': '参数错误'})
  37. try:
  38. user_qs = UserModel.objects.filter(access_token=access_token).values('userID')
  39. if not user_qs.exists():
  40. return JsonResponse({'result_code': '500', '错误': '用户数据不存在'})
  41. # 更新correlation_token
  42. user_id = user_qs[0]['userID']
  43. redis_obj = RedisObject()
  44. key = 'correlation_token_{}'.format(user_id)
  45. redis_obj.set_data(key=key, val=correlation_token, expire=60*5)
  46. region = user_qs.values('region_code')[0]['region_code']
  47. send_res = cls._send_mqtt_snapshot_command(uid, region)
  48. if send_res:
  49. return JsonResponse({'result_code': '0'})
  50. else:
  51. return JsonResponse({
  52. 'result_code': '500',
  53. 'msg': 'send mqtt failed'}
  54. )
  55. except Exception as e:
  56. return JsonResponse({'result_code': '500', 'error_msg': 'error_line:{}, error_msg:{}'.
  57. format(e.__traceback__.tb_lineno, repr(e))})
  58. @staticmethod
  59. def _send_mqtt_snapshot_command(uid, region):
  60. thing_name = CommonService.query_serial_with_uid(uid)
  61. topic_name = 'ansjer/generic/{}'.format(thing_name)
  62. msg = {
  63. "commandType": "alexaSnapshot",
  64. }
  65. try:
  66. result = CommonService.req_publish_mqtt_msg(thing_name, topic_name, msg)
  67. LOGGER.info('快照指令下发结果 {}: {}'.format(uid, result))
  68. if result:
  69. return True
  70. domain_name = SERVER_API[region]
  71. url = '{}/iot/requestPublishMessage'.format(domain_name)
  72. request_data = {'UID': uid, 'commandType': 'alexaSnapshot'}
  73. response = requests.post(url, data=request_data, timeout=10)
  74. if response.status_code == 200:
  75. result = response.json()
  76. if result.get('result_code') == 0:
  77. LOGGER.info('快照远程MQTT接口成功')
  78. return True
  79. else:
  80. LOGGER.info('快照远程MQTT接口失败, res:{}'.format(result))
  81. return False
  82. else:
  83. LOGGER.info('快照远程MQTT接口失败, 状态码: {}', response.status_code)
  84. return False
  85. except Exception as e:
  86. LOGGER.info('快照MQTT函数异常: error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  87. return False
  88. @staticmethod
  89. def asyn_event_response(request_dict):
  90. uid = request_dict.get("uid", None)
  91. image_uri = request_dict.get("image_uri", None)
  92. time_sample = request_dict.get("time_sample", None)
  93. if not all([uid, image_uri, time_sample]):
  94. return JsonResponse({'result_code': '444'})
  95. try:
  96. uid_qs = UidRtspModel.objects.filter(uid=uid).values('user_id')
  97. if not uid_qs.exists():
  98. return JsonResponse({'result_code': '173'})
  99. user_id = uid_qs[0]['user_id']
  100. # 获取token
  101. access_token = CommonService.get_access_token(user_id)
  102. if not access_token:
  103. return JsonResponse({'result_code': '500', 'msg': 'get access_token error'})
  104. redis_obj = RedisObject()
  105. key = 'correlation_token_{}'.format(user_id)
  106. correlation_token = redis_obj.get_data(key)
  107. if not correlation_token:
  108. return JsonResponse({'result_code': '500', 'msg': 'correlation_token not exit'})
  109. auth_qs = AlexaAuthModel.objects.filter(userID=user_id).values('alexa_region')
  110. alexa_region = auth_qs[0]['alexa_region']
  111. api_uri = ALEXA_EVENT_API[alexa_region]
  112. message_id = str(uuid.uuid4()).strip()
  113. # 转换时间格式
  114. time_sample = int(time_sample)
  115. expiration_time = time_sample + 60 * 10 # 图片链接有效时间: 十分钟
  116. time_of_sample = CommonService.timestamp_to_iso8601(time_sample)
  117. uri_expiration_time = CommonService.timestamp_to_iso8601(expiration_time)
  118. bearer_access_token = "Bearer {}".format(access_token)
  119. headers = {"content-type": "application/json", "Authorization": bearer_access_token}
  120. payload_json = {
  121. "event": {
  122. "header": {
  123. "namespace": "Alexa.SmartVision.SnapshotProvider",
  124. "name": "Snapshot",
  125. "messageId": message_id,
  126. "correlationToken": correlation_token,
  127. "payloadVersion": "1.1"
  128. },
  129. "endpoint": {
  130. "scope": {
  131. "type": "BearerToken",
  132. "token": access_token
  133. },
  134. "endpointId": uid
  135. },
  136. "payload": {
  137. "value": {
  138. "uri": image_uri,
  139. "uriExpirationTime": uri_expiration_time,
  140. "authenticationType": "ACCESS_TOKEN"
  141. },
  142. "timeOfSample": time_of_sample,
  143. "uncertaintyInMilliseconds": 0
  144. }
  145. }
  146. }
  147. LOGGER.info('快照异步事件请求: url:{},data:{}, headers:{}, time_sample:{}'.format(
  148. api_uri, payload_json, headers, time_sample))
  149. response = requests.post(api_uri, json=payload_json, headers=headers, timeout=30)
  150. LOGGER.info('快照异步事件请求响应: {}'.format(response.json()))
  151. return JsonResponse({'res': 'success'})
  152. except Exception as e:
  153. return JsonResponse({'result_code': '500', 'error_msg': 'error_line:{}, error_msg:{}'.
  154. format(e.__traceback__.tb_lineno, repr(e))})