Browse Source

增加请求MQTT下发快照指令

zhuo 3 weeks ago
parent
commit
6b652e5b88
1 changed files with 89 additions and 183 deletions
  1. 89 183
      controller/index.py

+ 89 - 183
controller/index.py

@@ -5,6 +5,8 @@ import threading
 import time
 from collections import OrderedDict
 from datetime import datetime
+from gc import enable
+
 import requests
 from django.http import JsonResponse
 from django.shortcuts import render
@@ -20,6 +22,7 @@ from object.tkObject import tkObject
 from service.CommonService import CommonService
 import uuid
 
+
 class authView(TemplateView):
     def post(self, request, *args, **kwargs):
         request.encoding = 'utf-8'
@@ -980,208 +983,111 @@ class VesseTest(TemplateView):
             print(e)
             return response.json(500, repr(e))
 
-def get_domain_by_region(region):
-    if region == 'EU':
-        return SERVER_PREFIX_EU
-    if region == 'CN':
-        return SERVER_PREFIX_TEST
-    return SERVER_PREFIX
+
+
+logger = logging.getLogger('django')
+
 
 def get_uuid():
     return str(uuid.uuid4())
 
+
 class GetSnapshotView(View):
-    def post(self, request):
-        logger = logging.getLogger('django')
-        try:
-            data = json.loads(request.body)
-            endpoint_id = data['endpoint_id']
-            access_token = data['access_token']
-            correlation_token = data.get('correlation_token', '')
-            region = data.get('region', 'US')
-            if not endpoint_id or not access_token:
-                raise ValueError("endpoint_id 或 access_token 为空")
+    def post(self, request, *args, **kwargs):
+        request.encoding = 'utf-8'
+        request_dict = request.POST
+        return self.validate(request_dict)
 
-        except Exception as e:
-            logger.error(f"参数解析/校验失败: {e}")
-            return JsonResponse({'error': 'BAD_REQUEST'}, status=400)
+    def get(self, request, *args, **kwargs):
+        request.encoding = 'utf-8'
+        request_dict = request.GET
+        return self.validate(request_dict)
 
+    def validate(self, request_dict):
         try:
-            user = UserModel.objects.get(access_token=access_token)
-        except UserModel.DoesNotExist:
-            return JsonResponse({'error': 'INVALID_TOKEN'}, status=401)
-
-        # 验证设备归属
-        if not user.uid_rtsp.filter(uid=endpoint_id).exists():
-            return JsonResponse({'error': 'NO_SUCH_DEVICE'}, status=403)
-
-        # 获取设备信息
-        device = user.uid_rtsp.get(uid=endpoint_id)
-        password = device.password or ''
-        region = getattr(device, 'region', region)
-        rtsp_path = device.rtsp_url
+            uid = request_dict.get("uid", None)
+            access_token = request_dict.get("access_token", None)
+            correlation_token = request_dict.get('correlation_token', '')
 
-        rtsp_domain = get_domain_by_region(region).split('//')[-1]
-        rtsp_url = f'rtsp://{rtsp_domain}:8554/{rtsp_path}'
+            if not uid or not access_token:
+                raise ValueError("endpoint_id or access_token is empty")
 
-        if '_' in endpoint_id:
-            uid_base, channel = endpoint_id.rsplit('_', 1)
-        else:
-            uid_base = endpoint_id
-            channel  = '0'
-
-        logger.info(f"[GetSnapshot] uid={uid_base}, channel={channel}, region={region}")
-
-        domain_name = get_domain_by_region(region)
-
-        # 启动线程异步下发快照指令
-        threading.Thread(
-            target=self.send_snapshot_command,
-            args=(domain_name, uid_base, password, rtsp_url, channel, logger),
-            daemon=True
-        ).start()
-        logger.info(f"[GetSnapshot] 已启动快照指令线程 → domain={domain_name}, UID={uid_base}")
-
-        #  返回 DeferredResponse
-        return JsonResponse({
-            "event": {
-                "header": {
-                    "namespace": "Alexa",
-                    "name": "DeferredResponse",
-                    "messageId": get_uuid(),
-                    "correlationToken": correlation_token,
-                    "payloadVersion": "3"
-                },
-                "endpoint": {
-                    "endpointId": endpoint_id
-                },
-                "payload": {
-                    "estimatedDeferralInSeconds": 7
+            # 2. 验证用户和设备
+            user = UserModel.objects.get(access_token=access_token)
+            if not user.uid_rtsp.filter(uid=uid).exists():
+                raise PermissionError("Device not belong to user")
+
+            device = user.uid_rtsp.get(uid=uid)
+            region = getattr(device, 'region', 'US')
+            threading.Thread(
+                target=self._send_mqtt_snapshot_command,
+                args=(uid, region),
+                daemon=True
+            ).start()
+
+            # 5. 返回Alexa延迟响应
+            return JsonResponse({
+                "event": {
+                    "header": {
+                        "namespace": "Alexa",
+                        "name": "DeferredResponse",
+                        "messageId": get_uuid(),
+                        "correlationToken": correlation_token,
+                        "payloadVersion": "3"
+                    },
+                    "endpoint": {
+                        "endpointId": uid
+                    },
+                    "payload": {
+                        "estimatedDeferralInSeconds": 7
+                    }
                 }
-            }
-        })
+            })
 
-    def send_snapshot_command(self, domain_name, uid, password, rtsp_url, channel, logger):
+        except Exception as e:
+            logger.error(f"Error: {str(e)}")
+            return JsonResponse({'error': 'INTERNAL_ERROR'}, status=500)
 
-        url = f'{domain_name}/iot/requestPublishMessage'
-        payload = {
-            'UID': uid,
-            'rtsp': rtsp_url,
-            'enable': '1',
-            'channel': channel
-        }
-        try:
-            resp = requests.post(url, data=payload, timeout=10)
-            if resp.status_code != 200:
-                logger.error(f"[send_snapshot] {url} 返回 HTTP {resp.status_code}")
-                return
-            data = resp.json()
-            logger.info(f"[send_snapshot] 响应({url}): {data}")
-            if data.get('result_code') != 0:
-                logger.error(f"[send_snapshot] 下发失败, result_code={data.get('result_code')}")
-        except Exception as ex:
-            logger.error(f"[send_snapshot] 请求异常: {ex}")
-
-def get_domain_by_region(region):
-    if region == 'EU':
-        return SERVER_PREFIX_EU
-    if region == 'CN':
-        return SERVER_PREFIX_TEST
-    return SERVER_PREFIX
+    @staticmethod
+    def _send_mqtt_snapshot_command(uid,region):
+        thing_name = CommonService.query_serial_with_uid(uid)
+        topic_name='ansjer/generic/{}'.format(thing_name)
 
-def get_uuid():
-    return str(uuid.uuid4())
+        msg = {
+            "commandType": "alexaSnapshot",
+        }
 
+        result = CommonService.req_publish_mqtt_msg(thing_name, topic_name, msg,qos=1)
+        logger.info('快照指令下发结果 {}: {}'.format(uid, result))
 
-def send_snapshot_command(domain_name, uid, password, rtsp_url, channel, logger):
+        if result:
+            return True
 
-    url = f'{domain_name}/iot/requestPublishMessage'
-    payload = {
-        'UID': uid,
-        'rtsp': rtsp_url,
-        'enable': '1',
-        'channel': channel
-    }
-    try:
-        resp = requests.post(url, data=payload, timeout=10)
-        if resp.status_code != 200:
-            logger.error(f"[send_snapshot] {url} 返回 HTTP {resp.status_code}")
-            return
-        data = resp.json()
-        logger.info(f"[send_snapshot] 响应({url}): {data}")
-        if data.get('result_code') != 0:
-            logger.error(f"[send_snapshot] 下发失败, result_code={data.get('result_code')}")
-    except Exception as ex:
-        logger.error(f"[send_snapshot] 请求异常: {ex}")
+        if region == 'EU':
+            domain_name = SERVER_PREFIX_EU
+        elif region == 'CN':
+            domain_name = SERVER_PREFIX_TEST
+        else:
+            domain_name = SERVER_PREFIX
 
+        url = '{}/iot/requestPublishMessage'.format(domain_name)
+        request_data = {'UID': uid, 'commandType': 'alexaSnapshot'}
 
-class GetSnapshotView(View):
-    def post(self, request):
-        logger = logging.getLogger('django')
         try:
-            data = json.loads(request.body)
-            endpoint_id = data['endpoint_id']
-            access_token = data['access_token']
-            correlation_token = data.get('correlation_token', '')
-            region = data.get('region', 'US')
-            if not endpoint_id or not access_token:
-                raise ValueError("endpoint_id 或 access_token 为空")
+            response = requests.post(url, data=request_data, timeout=10)
+            if response.status_code == 200:
+                result = response.json()
+                logger.info('远程MQTT接口返回: %s', result)
+                if result.get('result_code') == 0:
+                    return True
+                else:
+                    logger.error('远程MQTT接口业务失败, result_code=%s, reason=%s',
+                                 result.get('result_code'), result.get('reason'))
+                    return False
+            else:
+                logger.error('远程MQTT接口调用失败, HTTP status=%s', response.status_code)
+                return False
 
         except Exception as e:
-            logger.error(f"参数解析/校验失败: {e}")
-            return JsonResponse({'error': 'BAD_REQUEST'}, status=400)
-
-        try:
-            user = UserModel.objects.get(access_token=access_token)
-        except UserModel.DoesNotExist:
-            return JsonResponse({'error': 'INVALID_TOKEN'}, status=401)
-
-        # 验证设备归属
-        if not user.uid_rtsp.filter(uid=endpoint_id).exists():
-            return JsonResponse({'error': 'NO_SUCH_DEVICE'}, status=403)
-
-        # 获取设备信息
-        device = user.uid_rtsp.get(uid=endpoint_id)
-        password = device.password or ''
-        region = getattr(device, 'region', region)
-        rtsp_path = device.rtsp_url
-
-        rtsp_domain = get_domain_by_region(region).split('//')[-1]
-        rtsp_url = f'rtsp://{rtsp_domain}:8554/{rtsp_path}'
-
-        if '_' in endpoint_id:
-            uid_base, channel = endpoint_id.rsplit('_', 1)
-        else:
-            uid_base = endpoint_id
-            channel  = '0'
-
-        logger.info(f"[GetSnapshot] uid={uid_base}, channel={channel}, region={region}")
-
-        domain_name = get_domain_by_region(region)
-
-        # 启动线程异步下发快照指令
-        threading.Thread(
-            target=send_snapshot_command,
-            args=(domain_name, uid_base, password, rtsp_url, channel, logger),
-            daemon=True
-        ).start()
-        logger.info(f"[GetSnapshot] 已启动快照指令线程 → domain={domain_name}, UID={uid_base}")
-
-        #  返回 DeferredResponse
-        return JsonResponse({
-            "event": {
-                "header": {
-                    "namespace": "Alexa",
-                    "name": "DeferredResponse",
-                    "messageId": get_uuid(),
-                    "correlationToken": correlation_token,
-                    "payloadVersion": "3"
-                },
-                "endpoint": {
-                    "endpointId": endpoint_id
-                },
-                "payload": {
-                    "estimatedDeferralInSeconds": 7
-                }
-            }
-        })
+            logger.error('调用远程MQTT接口异常: %s', e)
+            return False