Browse Source

快照异步事件响应接口

locky 2 weeks ago
parent
commit
769e22ea3b
3 changed files with 152 additions and 16 deletions
  1. 85 6
      controller/Snapshot.py
  2. 0 1
      model/models.py
  3. 67 9
      service/CommonService.py

+ 85 - 6
controller/Snapshot.py

@@ -1,15 +1,17 @@
 # @Author    : Rocky
 # @File      : Snapshot.py
 # @Time      : 2025/7/29 8:51
-import threading
+
+import uuid
 
 import requests
 from django.http import JsonResponse
 from django.views import View
 
-from azoauth.config import LOGGER, SERVER_API
-from model.models import UserModel
+from azoauth.config import LOGGER, SERVER_API, ALEXA_EVENT_API
+from model.models import UserModel, UidRtspModel, AlexaAuthModel
 from object.ResObject import ResObject
+from object.RedisObject import RedisObject
 from service.CommonService import CommonService
 
 
@@ -29,7 +31,7 @@ class SnapshotView(View):
         if operation == 'sendMqtt':             # 下发快照指令
             return self.send_mqtt(request_dict)
         elif operation == 'asynEventResponse':  # 异步事件响应
-            return self.asyn_event_response(request_dict, response)
+            return self.asyn_event_response(request_dict)
         else:
             return response.json(10, 'error url')
 
@@ -43,12 +45,15 @@ class SnapshotView(View):
             return JsonResponse({'result_code': '444', '错误': '参数错误'})
 
         try:
-            user_qs = UserModel.objects.filter(access_token=access_token)
+            user_qs = UserModel.objects.filter(access_token=access_token).values('userID')
             if not user_qs.exists():
                 return JsonResponse({'result_code': '500', '错误': '用户数据不存在'})
 
             # 更新correlation_token
-            user_qs.update(correlation_token=correlation_token)
+            user_id = user_qs[0]['userID']
+            redis_obj = RedisObject()
+            key = 'correlation_token_{}'.format(user_id)
+            redis_obj.set_data(key=key, val=correlation_token, expire=60*5)
 
             region = user_qs.values('region_code')[0]['region_code']
             send_res = cls._send_mqtt_snapshot_command(uid, region)
@@ -100,3 +105,77 @@ class SnapshotView(View):
         except Exception as e:
             LOGGER.info('快照MQTT函数异常: error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
             return False
+
+    @staticmethod
+    def asyn_event_response(request_dict):
+        uid = request_dict.get("uid", None)
+        image_uri = request_dict.get("image_uri", None)
+        time_sample = request_dict.get("time_sample", None)
+
+        if not all([uid, image_uri, time_sample]):
+            return JsonResponse({'result_code': '444'})
+
+        try:
+            uid_qs = UidRtspModel.objects.filter(uid=uid).values('user_id')
+            if not uid_qs.exists():
+                return JsonResponse({'result_code': '173'})
+            user_id = uid_qs[0]['user_id']
+            # 获取token
+            access_token = CommonService.get_access_token(user_id)
+            if not access_token:
+                return JsonResponse({'result_code': '500', 'msg': 'get access_token error'})
+            redis_obj = RedisObject()
+            key = 'correlation_token_{}'.format(user_id)
+            correlation_token = redis_obj.get_data(key)
+            if not correlation_token:
+                return JsonResponse({'result_code': '500', 'msg': 'correlation_token not exit'})
+
+            auth_qs = AlexaAuthModel.objects.filter(userID=user_id).values('alexa_region')
+            alexa_region = auth_qs[0]['alexa_region']
+            api_uri = ALEXA_EVENT_API[alexa_region]
+            message_id = str(uuid.uuid4()).strip()
+
+            # 转换时间格式
+            time_sample = int(time_sample)
+            expiration_time = time_sample + 60 * 10     # 图片链接有效时间: 十分钟
+            time_of_sample = CommonService.timestamp_to_iso8601(time_sample)
+            uri_expiration_time = CommonService.timestamp_to_iso8601(expiration_time)
+
+            bearer_access_token = "Bearer {}".format(access_token)
+            headers = {"content-type": "application/json", "Authorization": bearer_access_token}
+            payload_json = {
+                "event": {
+                    "header": {
+                        "namespace": "Alexa.SmartVision.SnapshotProvider",
+                        "name": "Snapshot",
+                        "messageId": message_id,
+                        "correlationToken": correlation_token,
+                        "payloadVersion": "1.1"
+                    },
+                    "endpoint": {
+                        "scope": {
+                            "type": "BearerToken",
+                            "token": "OAuth2.0 bearer token"
+                        },
+                        "endpointId": uid
+                    },
+                    "payload": {
+                        "value": {
+                            "uri": image_uri,
+                            "uriExpirationTime": uri_expiration_time,
+                            "authenticationType": "ACCESS_TOKEN"
+                        },
+                        "timeOfSample": time_of_sample,
+                        "uncertaintyInMilliseconds": 0
+                    }
+                }
+            }
+
+            LOGGER.info('快照异步事件请求: url:{},data:{}'.format(api_uri, payload_json))
+            response = requests.post(api_uri, json=payload_json, headers=headers)
+            LOGGER.info('快照异步事件请求响应: {}'.format(response))
+
+            return JsonResponse({'res': 'success'})
+        except Exception as e:
+            return JsonResponse({'result_code': '500', 'error_msg': 'error_line:{}, error_msg:{}'.
+                                format(e.__traceback__.tb_lineno, repr(e))})

+ 0 - 1
model/models.py

@@ -11,7 +11,6 @@ class UserModel(models.Model):
     user_authorization_code = models.CharField(max_length=32, default='', verbose_name='用户授权码')
     access_token = models.CharField(max_length=64, unique=False, default='', verbose_name='访问令牌')
     refresh_token = models.CharField(max_length=64, unique=False, default='', verbose_name='刷新令牌')
-    correlation_token = models.TextField(default='', verbose_name='当前令牌')
     uid_rtsp = models.ManyToManyField(to='UidRtspModel', blank=True, verbose_name=u'用户关联uid_rtsp表',
                                       db_table='user_uid_rtsp')
     addTime = models.IntegerField(verbose_name='添加时间', default=0)

+ 67 - 9
service/CommonService.py

@@ -1,19 +1,13 @@
-
 import datetime
 import time
-from pathlib import Path
 from random import Random
 import base64
-import ipdb
+import pytz
 import requests
 import OpenSSL.crypto as ct
 from base64 import encodebytes
-import simplejson as json
-from django.core import serializers
-from django.utils import timezone
-from pyipip import IPIPDatabase
-
-from model.models import iotdeviceInfoModel, UidRtspModel
+from azoauth.config import ALEXA_EVENT_API, CLIENT_CONFIG, LOGGER
+from model.models import iotdeviceInfoModel, UidRtspModel, AlexaAuthModel
 
 
 # 复用性且公用较高封装代码在这
@@ -191,3 +185,67 @@ GCqvlyw5dfxNA+EtxNE2wCW/LW7ENJlACgcfgPlBZtpLheWoZB/maw4=
         signature = encodebytes(signature).decode('utf8').replace('\n', '')
         # print('signature:', signature)
         return signature
+
+    @staticmethod
+    def timestamp_to_iso8601(timestamp):
+        # 将十位时间戳(秒)转换为UTC时区的datetime对象
+        utc_dt = datetime.datetime.fromtimestamp(timestamp, pytz.UTC)
+        # 格式化为YYYY-MM-DDThh:mm:ssZ格式
+        return utc_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
+
+    @classmethod
+    def get_access_token(cls, user_id):
+        auth_qs = AlexaAuthModel.objects.filter(userID=user_id).values(
+            'expiresTime', 'refresh_token', 'access_token', 'alexa_region', 'skill_name')
+        if not auth_qs.exists():
+            return False
+
+        expires_time = auth_qs[0]['expiresTime']
+        alexa_region = auth_qs[0]['alexa_region']
+
+        now_time = int(time.time())
+        if now_time < expires_time:
+            access_token = auth_qs[0]['access_token']
+        else:
+            if alexa_region not in ALEXA_EVENT_API.keys():
+                return False
+
+            skill_name = auth_qs[0]['skill_name']
+            refresh_token = auth_qs[0]['refresh_token']
+            res = cls.get_refresh_token(refresh_token, skill_name)
+            if 'error' not in res:
+                auth_qs.update(
+                    access_token=res['access_token'],
+                    refresh_token=res['refresh_token'],
+                    expiresTime=now_time + 3000,
+                    updTime=now_time,
+                )
+                access_token = res['access_token']
+            else:
+                return False
+        return access_token
+
+    @staticmethod
+    def get_refresh_token(refresh_token, skill_name):
+        # 请求更新token
+        if skill_name not in CLIENT_CONFIG.keys():
+            return {'error': 'error skill_name'}
+
+        payload = {
+            'grant_type': 'refresh_token',
+            'refresh_token': refresh_token,
+            'client_id': CLIENT_CONFIG[skill_name]['client_id'],
+            'client_secret': CLIENT_CONFIG[skill_name]['client_secret'],
+        }
+        auth_request_url = 'https://api.amazon.com/auth/o2/token'
+        headers = {
+            'content-type': "application/x-www-form-urlencoded",
+            'cache-control': "no-cache"
+        }
+        try:
+            res = requests.post(auth_request_url, data=payload, headers=headers, timeout=5)
+            res.raise_for_status()
+            request_json = res.json()
+            return request_json
+        except requests.exceptions.RequestException as e:
+            return {'error': str(e)}