Bläddra i källkod

异步下发推流指令

locky 3 månader sedan
förälder
incheckning
55268657fd
2 ändrade filer med 73 tillägg och 48 borttagningar
  1. 1 1
      azoauth/urls.py
  2. 72 47
      controller/index.py

+ 1 - 1
azoauth/urls.py

@@ -34,7 +34,7 @@ urlpatterns = [
 
     path('oa2/rtspStart', index.oa2RtspStartView.as_view()),            # 通知摄像头设备推流
     path('oa2/powerController', index.powerController.as_view()),       # 控制智能插座开关
-    path('oa2/rtc', index.RtcController.as_view()),                     # rtc
+    re_path('oa2/(?P<operation>.*)', index.RtcController.as_view()),                     # rtc
 
     url(r'^deviceStatus/(?P<operation>.*)$', deviceStatus.deviceStatus.as_view()),  # 更新设备信息等接口
     url(r'^vseesTest/(?P<operation>.*)', index.VesseTest.as_view()),  # test

+ 72 - 47
controller/index.py

@@ -1,6 +1,7 @@
 import json
 import logging
 import subprocess
+import threading
 import time
 from datetime import datetime
 
@@ -334,7 +335,7 @@ class oa2RtspStartView(TemplateView):
         logger = logging.getLogger('django')
         logger.info('------{} 开始向设备下发推流指令------'.format(skill_name))
 
-        # 此处后续应该用异步发送指令
+        # 此处用线程异步发送指令
         if int(st) == 1:
             send_flag = self.runSendStop(UID, PWD, MSG)
             logger.info('----------send_flag---st=1-----------------')
@@ -343,32 +344,12 @@ class oa2RtspStartView(TemplateView):
             else:
                 return JsonResponse({'msg': 'stop no', 'code': 0})
 
-        # pushtool指令
-        command = "./pushtool {UID} {PWD} {MSG} 1 {channel}".format(UID=UID, PWD=PWD, MSG=MSG, channel=channel)
-        # 请求MQTT发布消息
-        url = '{}/iot/requestPublishMessage'.format(domain_name)
-        requests_data = {'UID': UID, 'rtsp': MSG, 'enable': '1'}  # 1: 开始推流,0: 停止推流; channel: 推流通道
-        r = requests.post(url, requests_data)
-        if r.status_code == 200:
-            res = r.json()
-            logger.info('请求MQTT发布消息参数:{},result_code: {}'.format(requests_data, res['result_code']))
-            if res['result_code'] == 0:
-                logger.info('请求MQTT下发指令成功---正式服')
-            elif res['result_code'] == 10044:
-                url = '{}/iot/requestPublishMessage'.format(SERVER_PREFIX_TEST)  # 测试服务器
-                r = requests.post(url, requests_data)
-                if r.status_code == 200:
-                    res = r.json()
-                    if res['result_code'] == 0:
-                        logger.info('请求MQTT下发指令成功---测试服')
-                    else:
-                        self.runSendRtspMsg(logger, region, command)
-                else:
-                    self.runSendRtspMsg(logger, region, command)
-            else:
-                self.runSendRtspMsg(logger, region, command)
-        else:  # 使用pushtool通知设备推流
-            self.runSendRtspMsg(logger, region, command)
+        # 创建并启动线程
+        thread = threading.Thread(
+            target=self.send_rtsp_command_thread,
+            args=(domain_name, UID, PWD, MSG, channel, logger, region)
+        )
+        thread.start()
 
         # 拉流地址
         rtsp_uri = '{}://{}:443/{}'.format(RTSP_PREFIX, RESP_SERVER_DOMAIN, stream_name)
@@ -398,6 +379,41 @@ class oa2RtspStartView(TemplateView):
         logger.info('------------返回控制摄像头的信息---------------: {}'.format(res_json))
         return JsonResponse(res_json, safe=False)
 
+    def send_rtsp_command_thread(self, domain_name, UID, PWD, MSG, channel, logger, region):
+        """异步发送RTSP命令的线程函数"""
+        command = "./pushtool {UID} {PWD} {MSG} 1 {channel}".format(
+            UID=UID, PWD=PWD, MSG=MSG, channel=channel)
+        
+        # 请求MQTT发布消息
+        url = '{}/iot/requestPublishMessage'.format(domain_name)
+        requests_data = {'UID': UID, 'rtsp': MSG, 'enable': '1'}
+        
+        try:
+            r = requests.post(url, requests_data)
+            if r.status_code == 200:
+                res = r.json()
+                logger.info('请求MQTT发布消息参数:{},result_code: {}'.format(requests_data, res['result_code']))
+                if res['result_code'] == 0:
+                    logger.info('请求MQTT下发指令成功---正式服')
+                elif res['result_code'] == 10044:
+                    url = '{}/iot/requestPublishMessage'.format(SERVER_PREFIX_TEST)
+                    r = requests.post(url, requests_data)
+                    if r.status_code == 200:
+                        res = r.json()
+                        if res['result_code'] == 0:
+                            logger.info('请求MQTT下发指令成功---测试服')
+                        else:
+                            self.runSendRtspMsg(logger, region, command)
+                    else:
+                        self.runSendRtspMsg(logger, region, command)
+                else:
+                    self.runSendRtspMsg(logger, region, command)
+            else:
+                self.runSendRtspMsg(logger, region, command)
+        except Exception as e:
+            logger.error('发送RTSP命令异常: {}'.format(str(e)))
+            self.runSendRtspMsg(logger, region, command)
+
     def runReqRtspMsg(self, UID, PWD, MSG):
         request_url = 'http://localhost:5000/?UID={UID}&MSG={MSG}&CMD=1&PWD={PWD}'. \
             format(UID=UID, PWD=PWD, MSG=MSG)
@@ -708,22 +724,28 @@ class powerController(TemplateView):
 
 
 class RtcController(TemplateView):
-    def post(self, request, *args, **kwargs):
+    def get(self, request, *args, **kwargs):
         request.encoding = 'utf-8'
-        request_dict = request.POST
-        return self.validate(request_dict)
+        operation = kwargs.get('operation')
+        return self.validation(request.GET, operation)
 
-    def get(self, request, *args, **kwargs):
+    def post(self, request, *args, **kwargs):
         request.encoding = 'utf-8'
-        request_dict = request.GET
-        return self.validate(request_dict)
+        operation = kwargs.get('operation')
+        return self.validation(request.POST, operation)
 
-    def validate(self, request_dict):
+    def validation(self, request_dict, operation):
+        if operation == 'rtc':
+            return self.rtc(request_dict)
+
+    @classmethod
+    def rtc(cls, request_dict):
         uid = request_dict.get("uid")
         access_token = request_dict.get("access_token")
         skill_name = request_dict.get("skill_name")
+        offer_sdp = request_dict.get("offer_sdp")
 
-        if not all([uid, access_token, skill_name]):
+        if not all([uid, access_token, skill_name, offer_sdp]):
             return JsonResponse({'错误': '缺少参数'})
 
         user_qs = UserModel.objects.filter(access_token=access_token)
@@ -777,18 +799,21 @@ class RtcController(TemplateView):
                     logger.info('请求MQTT下发指令成功---测试服')
 
             # 获取SDP
-            params = {
-                'src': uid,
-            }
-            r = requests.get(url=rtc_url, params=params, timeout=30)
-            assert r.status_code == 200
-            res = r.json()
-            # 遍历producers数组,查找包含sdp字段的对象
-            sdp = ''
-            for producer in res['producers']:
-                if 'sdp' in producer:
-                    sdp = producer['sdp']
-                    break
+            sdp = cls.handle_alexa_offer(offer_sdp, rtsp)
+
+            # params = {
+            #     'src': uid,
+            # }
+            # r = requests.get(url=rtc_url, params=params, timeout=30)
+            # assert r.status_code == 200
+            # res = r.json()
+            # # 遍历producers数组,查找包含sdp字段的对象
+            # sdp = ''
+            # for producer in res['producers']:
+            #     if 'sdp' in producer:
+            #         sdp = producer['sdp']
+            #         break
+
             res_json = {
                 'SDP': sdp
             }