#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
@AUTHOR: ASJRD018
@NAME: azoauth
@software: PyCharm
@DATE: 2020/1/13 17:01
@Version: python3.6
@MODIFY DECORD:ansjer dev
@file: index.py
@Contact: chanjunkai@163.com
"""
# Django views for the Alexa skill back end: login pages, authorization-code
# and token exchange, device/switch discovery, RTSP stream start/stop and a
# few diagnostic endpoints.

# NOTE: the original import order is kept on purpose. `from azoauth.config
# import *` may shadow names imported before it, so reordering could change
# which object a name refers to.
import json
import threading
import time
import requests
import logging
from django.views.generic import TemplateView
from django.shortcuts import render_to_response
from django.http import JsonResponse
from object.ResObject import ResObject
from urllib.parse import urlencode
import subprocess
import uuid
# from gevent.pool import Pool
from model.models import UserModel, UidRtspModel, AlexaAuthModel, UserCountModel, SwitchModel
from object.ResponseObject import ResponseObject
from object.tkObject import tkObject
from service.CommonService import CommonService
from object.RedisObject import RedisObject
from azoauth.config import *
from datetime import datetime

# Added after the star import so these names cannot be shadowed by it.
import shlex
from urllib.parse import quote
from django.db.models import F

logger = logging.getLogger('django')

# Parameters of the OAuth authorization request echoed into the login pages.
_OAUTH_PAGE_FIELDS = ('state', 'client_id', 'response_type', 'scope', 'redirect_uri')

# Failures that mean "pushtool could not be run" (missing binary, timeout,
# invalid argument such as an embedded NUL byte).
_PUSHTOOL_ERRORS = (OSError, subprocess.SubprocessError, ValueError)


def _login_page_context(request_dict, skill_name):
    """Build the template context for a login page.

    Every OAuth field defaults to an empty string when absent from
    ``request_dict``; ``skill_name`` is added last.
    """
    context = {field: request_dict.get(field, '') for field in _OAUTH_PAGE_FIELDS}
    context['skill_name'] = skill_name
    return context


def _run_pushtool(arguments, timeout=10):
    """Run ``./pushtool`` with ``arguments`` and return ``(stdout, stderr)`` as text.

    The command is executed from an argument list without a shell, so request
    supplied values cannot inject shell syntax. ``subprocess.run`` kills the
    child when ``timeout`` expires. The exit status is not checked, matching
    the previous behaviour.

    Raises:
        OSError, subprocess.SubprocessError, ValueError: when the tool cannot
            be started, times out, or an argument is invalid.
    """
    argv = ['./pushtool'] + [str(argument) for argument in arguments]
    finished = subprocess.run(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout)
    # Decode defensively: tool output is only used for logging.
    return finished.stdout.decode(errors='replace'), finished.stderr.decode(errors='replace')


def _log_pushtool_output(stdout_text, stderr_text):
    """Write the captured pushtool output to the log."""
    logger.info('back0---- {}'.format(stdout_text))
    logger.info('back1---- {}'.format(stderr_text))


def _rebind_alexa_auth(userID, access_token, skill_name):
    """Re-create the newest AlexaAuthModel row for ``access_token`` under ``userID``.

    Looks up rows whose ``token`` equals ``access_token`` (newest ``addTime``
    first). When one exists, all rows of ``userID`` and all rows for that
    token are deleted and a single row carrying the newest row's values plus
    ``userID`` and ``skill_name`` is created. Does nothing when no row matches.
    """
    matching = AlexaAuthModel.objects.filter(token=access_token).order_by('-addTime')
    latest = matching.values().first()
    if latest is None:
        return
    AlexaAuthModel.objects.filter(userID=userID).delete()
    matching.delete()
    # The token value itself is deliberately not logged.
    logger.info('update_event_access_token')
    AlexaAuthModel.objects.create(
        userID=userID,
        access_token=latest['access_token'],
        refresh_token=latest['refresh_token'],
        token=latest['token'],
        expiresTime=latest['expiresTime'],
        addTime=latest['addTime'],
        updTime=latest['updTime'],
        alexa_region=latest['alexa_region'],
        skill_name=skill_name,
    )


class _JsonBodyView(TemplateView):
    """Base view: POST carries a JSON body, GET a query string; both call ``validate``."""

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        return self.validate(json.loads(request.body.decode('utf-8')))

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        return self.validate(request.GET)


class _FormParamsView(TemplateView):
    """Base view: POST carries form data, GET a query string; both call ``validate``."""

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        # request_dict = json.loads(request.body.decode('utf-8'))
        return self.validate(request.POST)

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        return self.validate(request.GET)


class authView(_JsonBodyView):
    """Login page for the 'zosi smart' skill."""

    def validate(self, request_dict):
        """Render ``login.html`` with the OAuth parameters of the request."""
        logger.info('------oa2/auth请求参数: {}------'.format(request_dict))
        return render_to_response("login.html", _login_page_context(request_dict, 'zosi smart'))


# Anlapus login
class authAnlapusView(_JsonBodyView):
    """Login page for the 'Anlapus' skill."""

    def validate(self, request_dict):
        """Render ``login_anlapus.html`` with the OAuth parameters of the request."""
        return render_to_response("login_anlapus.html", _login_page_context(request_dict, 'Anlapus'))


# loocam login
class authLoocamView(_JsonBodyView):
    """Login page for the 'loocam' skill."""

    def validate(self, request_dict):
        """Render ``login_loocam.html`` with the OAuth parameters of the request."""
        context = _login_page_context(request_dict, 'loocam')
        logger.info('loocam请求登录网页,参数为{}'.format(context))
        return render_to_response("login_loocam.html", context)


class loginHandleView(_JsonBodyView):
    """Checks user credentials and hands out an authorization code."""

    def validate(self, request_dict):
        """Authenticate the user and return the redirect URL carrying the code.

        The credentials are verified against ``SERVER_PREFIX`` and, when that
        server answers with a non-zero ``result_code``, against
        ``SERVER_PREFIX_TEST``. On success a fresh code is stored for the user,
        the per-skill monthly counter is incremented and the redirect URI with
        ``code`` and ``state`` appended is returned through ``ResObject``.
        """
        response = ResObject()
        user = request_dict.get("user", '')
        pwd = request_dict.get("pwd", '')
        state = request_dict.get("state", '')
        client_id = request_dict.get("client_id", '')
        response_type = request_dict.get("response_type", '')
        scope = request_dict.get("scope", '')
        redirect_uri = request_dict.get("redirect_uri", '')
        skill_name = request_dict.get("skill_name", '')

        logger.info('------开始认证登录------')
        # The password must never reach the log.
        logger.info('userName: {}, redirect_uri: {}'.format(user, redirect_uri))
        if client_id != 'azalexaclient' or response_type != 'code' or scope != 'profile':
            return response.json(10, res={'msg': 'error'}, extra={'msg': 'message wrong'})

        requests_data = {'userName': user, 'userPwd': pwd}
        auth_request_url = '{SERVER_PREFIX}/oalexa/auth'.format(SERVER_PREFIX=SERVER_PREFIX)
        res = requests.post(url=auth_request_url, data=requests_data)
        if res.status_code != 200:
            return response.json(10, res={'错误': '请求响应异常'})
        res_json = res.json()

        # Fall back to the test server when the production server rejects.
        if res_json['result_code'] != 0:
            auth_request_url = '{SERVER_PREFIX}/oalexa/auth'.format(SERVER_PREFIX=SERVER_PREFIX_TEST)
            res = requests.post(url=auth_request_url, data=requests_data)
            if res.status_code != 200:
                return response.json(10, res={'错误': '请求响应异常'})
            res_json = res.json()

        logger.info('请求服务器url: {}'.format(auth_request_url))
        logger.info('服务器响应: {}'.format(res_json))
        if res_json['result_code'] != 0:
            return response.json(10, res={'msg': 'error'}, extra={'msg': res_json['reason']})

        nowTime = int(time.time())
        code = CommonService.encrypt_data(32)
        userID = res_json['result']['userID']
        user_qs = UserModel.objects.filter(userID=userID)
        if user_qs.exists():
            user_qs.update(code=code, updTime=nowTime)
        else:
            UserModel.objects.create(userID=userID, code=code, addTime=nowTime, updTime=nowTime)

        # Current year and month, e.g. '202001'.
        year_month = time.strftime('%Y%m', time.localtime(nowTime))
        count_qs = UserCountModel.objects.filter(skill_name=skill_name, year_month=year_month)
        # Increment in the database so concurrent logins do not lose updates;
        # update() reports 0 rows when the month has no counter yet.
        if not count_qs.update(amount=F('amount') + 1):
            UserCountModel.objects.create(skill_name=skill_name, year_month=year_month, amount=1)

        # NOTE(review): redirect_uri comes from the request and is not checked
        # against an allow-list here - confirm it is validated elsewhere.
        separator = '&' if '?' in redirect_uri else '?'
        redirect_uri += separator + urlencode({'code': code, 'state': state})
        return response.json(0, res=redirect_uri)


class oa2TokenView(_FormParamsView):
    """Exchanges an authorization code or refresh token for new tokens."""

    def validate(self, request_dict):
        """Issue a new access/refresh token pair.

        The user is looked up by ``code`` first and by ``refresh_token``
        second. Empty or missing credentials are never used for a lookup,
        because filtering on ``None`` would match rows whose column is NULL.
        """
        code = request_dict.get("code", None)
        client_id = request_dict.get("client_id", None)
        refresh_token = request_dict.get("refresh_token", None)

        logger.info('token-------------begin--------')
        # Codes and tokens are credentials; only the client id is logged.
        logger.info(client_id)

        user_qs = UserModel.objects.none()
        if code:
            user_qs = UserModel.objects.filter(code=code)
        if refresh_token and not user_qs.exists():
            user_qs = UserModel.objects.filter(refresh_token=refresh_token)
        if not user_qs.exists():
            return JsonResponse({'msg': 'code not exists'})

        access_token = CommonService.encrypt_data(randomlength=32)
        new_refresh_token = CommonService.encrypt_data(randomlength=32)
        is_update = user_qs.update(access_token=access_token, refresh_token=new_refresh_token)
        if not is_update:
            logger.info({'msg': 'error'})
            return JsonResponse({'msg': 'error'})

        return JsonResponse({
            "access_token": access_token,
            "token_type": "bearer",
            "expires_in": 3600,
            "refresh_token": new_refresh_token,
        })


def runSendRtspMsg_thread(UID, PWD, MSG):
    """Ask pushtool to start pushing the stream (``./pushtool UID PWD MSG 1``).

    Returns:
        bool: ``True`` when the tool ran, ``False`` when it could not be run
        or did not finish within 10 seconds.
    """
    try:
        stdout_text, stderr_text = _run_pushtool([UID, PWD, MSG, 1], timeout=10)
    except _PUSHTOOL_ERRORS as e:
        logger.info('调用pushtool异常: {}'.format(repr(e)))
        return False
    _log_pushtool_output(stdout_text, stderr_text)
    return True


class oa2RtspStartView(_FormParamsView):
    """Starts or stops the RTSP push of a camera and returns its stream info."""

    def validate(self, request_dict):
        """Handle a start (``st`` != 1) or stop (``st`` == 1) request for device ``id``."""
        st = request_dict.get("st", 0)
        uid = request_dict.get("id", '')
        access_token = request_dict.get("access_token", '')
        skill_name = request_dict.get("skill_name", 'zosi smart')

        # An empty token must not be looked up: it could match users whose
        # access_token column is empty.
        if not access_token or not UserModel.objects.filter(access_token=access_token).exists():
            return JsonResponse({'错误': '用户数据不存在'})
        # NOTE(review): the device is not checked against the user's own
        # uid_rtsp links here - confirm whether that is intended.
        device = UidRtspModel.objects.filter(uid=uid).values(
            'uid', 'nick', 'rtsp_url', 'password', 'region').first()
        if device is None:
            return JsonResponse({'错误': 'uid数据不存在'})

        UID = device['uid']
        nick = device['nick']
        PWD = device['password']
        region = device['region']
        stream_name = device['rtsp_url']

        channel = '0'
        if '_' in UID:
            # Multi-channel device stored as '<uid>_<channel>'; split on the
            # last underscore so channels above 9 are handled as well.
            UID, _, channel = UID.rpartition('_')

        RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA[region]
        MSG = '{}://{}:8554/{}'.format(RTSP_PREFIX, RESP_SERVER_DOMAIN, stream_name)
        logger.info('------{} 开始向设备下发推流指令------'.format(skill_name))

        # TODO: the command should eventually be sent asynchronously.
        if int(st) == 1:
            send_flag = self.runSendStop(UID, PWD, MSG)
            logger.info('----------send_flag---st=1-----------------')
            if send_flag:
                return JsonResponse({'msg': 'stop yes', 'code': 0})
            return JsonResponse({'msg': 'stop no', 'code': 0})

        # pushtool command used as fallback; quoted so unusual characters in
        # the stored values stay a single argument (plain values are unchanged).
        command = "./pushtool {UID} {PWD} {MSG} 1 {channel}".format(
            UID=shlex.quote(str(UID)), PWD=shlex.quote(str(PWD)),
            MSG=shlex.quote(MSG), channel=shlex.quote(str(channel)))

        # 'enable': '1' starts pushing, '0' stops it.
        requests_data = {'UID': UID, 'rtsp': MSG, 'enable': '1'}
        if not self._request_mqtt_publish(requests_data):
            # Notify the device through pushtool instead.
            self.runSendRtspMsg(logger, region, command)

        # Address the stream is pulled from.
        rtsp_uri = '{}://{}:443/{}'.format(RTSP_PREFIX, RESP_SERVER_DOMAIN, stream_name)
        stop_time = int(time.time()) + 2 * 60
        # The trailing 'Z' denotes UTC, so the timestamp must be built from UTC.
        expirationTime = time.strftime('%Y-%m-%dT%H:%MZ', time.gmtime(stop_time))
        res_json = {
            'uid': UID,
            'pwd': PWD,
            'msg': MSG,
            'uri': rtsp_uri,
            'endpointId': uid,
            'friendlyName': nick,
            'manufacturerName': skill_name,
            'expirationTime': expirationTime,
            'description': 'Camera connected via {}'.format(skill_name),
            # NOTE(review): 'ACC' looks like a typo for 'AAC' - confirm with the consumer.
            'audioCodecs': 'ACC',
            'videoCodecs': 'H264',
            'protocols': ['RTSP'],
            'idleTimeoutSeconds': 5,
            'modelName': 'P1425-LE',
            'authorizationTypes': ['NONE'],
            'manufacturerId': 'zosi-ACCC8E5E7513',
            'resolutions': {'width': 640, 'height': 360},
        }
        # NOTE(review): this log line contains the device password.
        logger.info('------------返回控制摄像头的信息---------------: {}'.format(res_json))
        return JsonResponse(res_json, safe=False)

    def _request_mqtt_publish(self, requests_data):
        """Ask the server to publish the push command over MQTT.

        The production server is tried first; the test server is tried only
        when production answers ``result_code`` 10044.

        Returns:
            bool: ``True`` when one of the servers answered ``result_code`` 0.
        """
        r = requests.post('{}/iot/requestPublishMessage'.format(SERVER_PREFIX), requests_data)
        if r.status_code != 200:
            return False
        res = r.json()
        logger.info('请求MQTT发布消息参数:{},result_code: {}'.format(requests_data, res['result_code']))
        if res['result_code'] == 0:
            logger.info('请求MQTT下发指令成功---正式服')
            return True
        if res['result_code'] != 10044:
            return False

        # Test server
        r = requests.post('{}/iot/requestPublishMessage'.format(SERVER_PREFIX_TEST), requests_data)
        if r.status_code != 200 or r.json()['result_code'] != 0:
            return False
        logger.info('请求MQTT下发指令成功---测试服')
        return True

    def runReqRtspMsg(self, UID, PWD, MSG):
        """Send the push request to the local helper service on port 5000.

        Always returns ``True``; the response is only logged.
        """
        res = requests.get(url='http://localhost:5000/',
                           params=[('UID', UID), ('MSG', MSG), ('CMD', 1), ('PWD', PWD)])
        logger.info(res)
        return True

    # Makes the camera push its stream to the address contained in the command.
    def runSendRtspMsg(self, logger, region, command):
        """Deliver ``command`` to the device.

        For region ``'CN'`` the command is forwarded to the domestic server,
        which is expected to run pushtool itself. Otherwise the command is run
        locally (without a shell); when that fails the domestic server is
        tried as a fallback.
        """
        logger.info('------------推流指令: {}---------------'.format(command))
        if region == 'CN':
            logger.info('------------国内发送推流指令---------------')
            # Percent-encode the command so characters such as '&' or '#'
            # cannot truncate it inside the query string.
            url = "http://52.83.252.41:7880/alexa/command?command={command}".format(
                command=quote(command, safe=''))
            try:
                requests.get(url=url, timeout=2)
            except requests.RequestException as e:
                logger.info('请求国内服务器调用pushtool异常: {}'.format(repr(e)))
            return

        logger.info('------------国外发送推流指令---------------')
        try:
            finished = subprocess.run(shlex.split(command), stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE, timeout=2)
            logger.info('back: {}'.format(
                finished.stdout.decode(errors='replace') + finished.stderr.decode(errors='replace')))
        except Exception as e:
            # Best effort: any local failure falls back to the domestic server.
            self.runSendRtspMsg(logger, 'CN', command)
            logger.info('调用pushtool异常: {}'.format(repr(e)))

    def runSendStop(self, UID, PWD, MSG):
        """Ask pushtool to stop pushing the stream (``./pushtool UID PWD MSG 0``).

        Returns:
            bool: ``True`` when the tool ran, ``False`` when it could not be
            run or did not finish within 10 seconds.
        """
        try:
            stdout_text, stderr_text = _run_pushtool([UID, PWD, MSG, 0], timeout=10)
        except _PUSHTOOL_ERRORS as e:
            logger.info('调用pushtool异常: {}'.format(repr(e)))
            return False
        _log_pushtool_output(stdout_text, stderr_text)
        return True


class oa2DiscoveryDevice(_FormParamsView):
    """Lists the cameras of the user owning the access token."""

    def validate(self, request_dict):
        """Fetch the user's devices, sync them into UidRtspModel and return them."""
        skill_name = request_dict.get("skill_name", 'zosi smart')
        access_token = request_dict.get("access_token", None)
        logger.info('--------{} 开始搜索设备--------'.format(skill_name))

        # A missing token must not be looked up: filtering on None would
        # match users whose access_token is NULL.
        user = UserModel.objects.filter(access_token=access_token).first() if access_token else None
        if user is None:
            return JsonResponse({'错误': '用户数据不存在'})
        response = ResObject()
        userID = user.userID
        logger.info('userID: {}'.format(userID))

        # Refresh the event gateway binding.
        _rebind_alexa_auth(userID, access_token, skill_name)

        auth_request_url = '{}/oalexa/discoveryuid'.format(SERVER_PREFIX)
        requests_data = {'sid': 'admin', 'sst': 'admin', 'alexa_user_id': userID}
        res = requests.post(url=auth_request_url, data=requests_data)
        res_json = res.json()
        logger.info('正式服务器响应: {}'.format(res_json))

        test_flag = False  # marks answers of the test server; to be removed later
        # Fall back to the test server.
        if res_json['result_code'] != 0:
            auth_request_url = '{}/oalexa/discoveryuid'.format(SERVER_PREFIX_TEST)
            res = requests.post(url=auth_request_url, data=requests_data)
            res_json = res.json()
            test_flag = True
        logger.info('请求服务器url: {}'.format(auth_request_url))
        logger.info('服务器响应: {}'.format(res_json))
        if res_json['result_code'] != 0:
            return response.json(0, res={'msg': 'error'})

        uid_arr = res_json['result']['uid_arr']
        rtko = tkObject(rank=1)
        now_time = int(time.time())
        res_json = []
        uid_rtsp_id_list = []
        for uid_a in uid_arr:
            uid = uid_a['uid']
            nick = uid_a['nick']
            rtsp_url = rtko.encrypt(data=uid)
            region = 'EU' if uid_a['region'] == 'EU' else 'EN'
            if test_flag and uid_a['multi_channel']:
                # Multi-channel device: '<uid>_<channel>'
                uid += '_' + str(uid_a['channel'])

            try:
                uid_rtsp = UidRtspModel.objects.get(uid=uid)
            except UidRtspModel.DoesNotExist:
                uid_rtsp = UidRtspModel.objects.create(
                    uid=uid, nick=nick, region=region, password=uid_a['password'],
                    rtsp_url=rtsp_url, addTime=now_time, updTime=now_time)
            else:
                uid_rtsp.nick = nick
                uid_rtsp.region = region
                uid_rtsp.password = uid_a['password']
                uid_rtsp.updTime = now_time
                uid_rtsp.save()
            uid_rtsp_id_list.append(uid_rtsp.id)

            RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA[region]
            rtsp_uri = '{}://{}:443/{}'.format(RTSP_PREFIX, RESP_SERVER_DOMAIN, rtsp_url)
            res_json.append({
                'uri': rtsp_uri,
                'endpointId': uid,
                'friendlyName': nick,
                'manufacturerName': skill_name,
                'description': 'Camera connected via {}'.format(skill_name),
                'protocols': ['RTSP'],
                # NOTE(review): 'ACC' looks like a typo for 'AAC' - confirm with the consumer.
                'audioCodecs': ['ACC'],
                'videoCodecs': ['H264'],
                'modelName': 'P1425-LE',
                'authorizationTypes': ['NONE'],
                'manufacturerId': 'zosi-ACCC8E5E7513',
                'resolutions': [{'width': 1280, 'height': 720}],
            })

        # Replace the links only after every device was processed, so a
        # failure inside the loop does not leave the user without devices.
        user.uid_rtsp.clear()
        user.uid_rtsp.add(*uid_rtsp_id_list)
        logger.info('搜索设备返回值: {}'.format(res_json))
        return JsonResponse(res_json, safe=False)


class oa2DiscoverySwitch(_FormParamsView):
    """Lists the switches of the user owning the access token."""

    def validate(self, request_dict):
        """Fetch the user's switches, sync them into SwitchModel and return them."""
        skill_name = request_dict.get("skill_name", 'loocam')
        access_token = request_dict.get("access_token", None)
        logger.info('--------{} 开始搜索设备--------'.format(skill_name))

        # A missing token must not be looked up: filtering on None would
        # match users whose access_token is NULL.
        user = UserModel.objects.filter(access_token=access_token).first() if access_token else None
        if user is None:
            return JsonResponse({'错误': '用户数据不存在'})
        response = ResObject()
        userID = user.userID
        logger.info('userID: {}'.format(userID))

        # Refresh the event gateway binding.
        _rebind_alexa_auth(userID, access_token, skill_name)

        auth_request_url = '{}/oalexa/discoveryswitch'.format(SERVER_PREFIX)
        requests_data = {'sid': 'admin', 'sst': 'admin', 'alexa_user_id': userID}
        res = requests.post(url=auth_request_url, data=requests_data)
        res_json = res.json()
        logger.info('正式服务器响应: {}'.format(res_json))

        # Fall back to the test server.
        if res_json['result_code'] != 0:
            auth_request_url = '{}/oalexa/discoveryswitch'.format(SERVER_PREFIX_TEST)
            res = requests.post(url=auth_request_url, data=requests_data)
            res_json = res.json()
        logger.info('请求服务器url: {}'.format(auth_request_url))
        logger.info('服务器响应: {}'.format(res_json))
        if res_json['result_code'] != 0:
            return response.json(0, res={'msg': 'error'})

        switch_list = res_json['result']['switch_list']
        now_time = int(time.time())
        res_json = []
        for switch in switch_list:
            serial_number = switch['uid']
            nick = switch['nick']
            region = 'EU' if switch['region'] == 'EU' else 'EN'

            switch_info_qs = SwitchModel.objects.filter(serial_number=serial_number, userID=userID)
            if switch_info_qs.exists():
                switch_info_qs.update(nick=nick, region=region, updTime=now_time)
            else:
                SwitchModel.objects.create(serial_number=serial_number, nick=nick, region=region,
                                           addTime=now_time, updTime=now_time, userID=userID)

            res_json.append({
                'endpointId': serial_number,
                'friendlyName': nick,
                'manufacturerName': skill_name,
                'description': 'Camera connected via {}'.format(skill_name),
                'modelName': 'P1425-LE',
                'manufacturerId': 'zosi-ACCC8E5E7513',
            })

        logger.info('搜索设备返回值: {}'.format(res_json))
        return JsonResponse(res_json, safe=False)


# Added later
def testRunSendStop(request):
    """Diagnostic endpoint: run ``./pushtool UID PWD MSG 0`` with GET parameters.

    Always answers with a ``JsonResponse``; a Django view must not return
    ``False`` or ``None``.
    """
    # NOTE(review): this endpoint performs no authentication - confirm it is
    # not reachable in production.
    request.encoding = 'utf-8'
    if request.method != 'GET':
        return JsonResponse({'msg': 'method not allowed', 'code': 405}, status=405)

    request_dict = request.GET
    UID = request_dict.get('UID', None)
    PWD = request_dict.get('PWD', None)
    MSG = request_dict.get('MSG', None)
    try:
        stdout_text, stderr_text = _run_pushtool([UID, PWD, MSG, 0], timeout=10)
    except _PUSHTOOL_ERRORS as e:
        logger.info('调用pushtool异常: {}'.format(repr(e)))
        return JsonResponse({'msg': 'stop Stream failed', 'code': 500})
    _log_pushtool_output(stdout_text, stderr_text)
    return JsonResponse({'msg': 'stop Stream', 'code': 0})


def testRunStream(request):
    """Diagnostic endpoint: run ``./pushtool UID PWD MSG 1`` and report its timing.

    Always answers with a ``JsonResponse``; the failure text is carried in
    ``msg`` instead of being returned as a bare string.
    """
    # NOTE(review): this endpoint performs no authentication - confirm it is
    # not reachable in production.
    request.encoding = 'utf-8'
    if request.method != 'GET':
        return JsonResponse({'msg': 'method not allowed', 'code': 405}, status=405)

    request_dict = request.GET
    UID = request_dict.get('UID', None)
    PWD = request_dict.get('PWD', None)
    MSG = request_dict.get('MSG', None)
    time1 = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    try:
        stdout_text, stderr_text = _run_pushtool([UID, PWD, MSG, 1], timeout=10)
    except _PUSHTOOL_ERRORS as e:
        return JsonResponse({'msg': repr(e), 'code': 500})
    time2 = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    _log_pushtool_output(stdout_text, stderr_text)
    return JsonResponse({'msg': "star is %s" % time1 + ",end is %s" % time2, 'code': 0})


# Health check endpoint
def test(request):
    """Report that the server is running."""
    return JsonResponse({'msg': 'Server running normal', 'code': 0})


# Test endpoint
def loadBalancingServer(request):
    """Load-balancing demo: query the player API and compare two cached values.

    The selected domain is computed but not used yet; the response only echoes
    the result of the player request (``-1`` when the request failed).
    """
    try:
        res1 = requests.get('http://rtsp.zositech.com:10008/api/v1/players', timeout=5)
    except Exception:
        res1 = -1
    time1 = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    logger.info("时间1:%s" % time1)

    # --------- stress-test demo ------------
    redisObj = RedisObject(db=1)
    key1 = 'rtsp.zositech.com'
    key2 = '18.222.107.129'
    redis_data1 = redisObj.get_data(key1)
    redis_data2 = redisObj.get_data(key2)
    if redis_data1 and redis_data2:
        logger.info("进缓存")
        mm = min(redis_data1, redis_data2)
        # NOTE(review): RESP_SERVER_DOMAIN is assigned but never used below.
        if redis_data1 == mm and redis_data1 != -1:
            RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA['EN']
        elif redis_data2 == mm and redis_data2 != -1:
            RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA['EN1']
    # --------- /stress-test demo ------------

    # NOTE(review): res1 is a Response object on success, so the message shows
    # its repr rather than a player count - confirm the intended value.
    return JsonResponse({'msg': 'The number of people online is :%s' % res1, 'code': 0})


class powerController(TemplateView):
    """Turns a switch on or off on behalf of the user owning the access token."""

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        return self.power_controller(request.POST)

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        return self.power_controller(request.GET)

    def power_controller(self, request_dict):
        """Forward ``power_controller`` for ``serial_number`` to the socket API.

        Answers ``{'result_code': '0'}`` on success and ``result_code`` '500'
        with an error text otherwise.
        """
        serial_number = request_dict.get('serial_number', '')
        access_token = request_dict.get('access_token', '')
        skill_name = request_dict.get('skill_name', 'loocam')
        power_controller = request_dict.get('power_controller', '')

        if not all([serial_number, access_token, skill_name]):
            return JsonResponse({'result_code': '500', '错误': '缺少参数'})

        user = UserModel.objects.filter(access_token=access_token).first()
        if user is None:
            return JsonResponse({'result_code': '500', '错误': '用户数据不存在'})
        userID = user.userID

        switch_qs = SwitchModel.objects.filter(serial_number=serial_number, userID=userID).values('nick', 'region')
        if not switch_qs.exists():
            return JsonResponse({'result_code': '500', '错误': 'serial_number数据不存在'})

        logger.info('{} 唤醒设备 {}'.format(skill_name, serial_number))

        # Ask the server to publish the MQTT message (test server only).
        url = '{}/api/loocam/open/socket/alexa-socket-switch'.format(SERVER_PREFIX_TEST)
        # power_controller is expected to be 'TurnOn' or 'TurnOff'.
        requests_data = {'serial_number': serial_number, 'power_controller': power_controller}
        r = requests.post(url, requests_data)
        if r.status_code != 200:
            return JsonResponse({'result_code': '500', '错误': '请求响应异常'})

        res = r.json()
        logger.info('请求MQTT发布消息返回状态: {}'.format(res['result_code']))
        if res['result_code'] != 0:
            logger.info('请求MQTT下发指令失败')
            return JsonResponse({'result_code': '500', '错误': '请求MQTT下发指令失败'})
        logger.info('请求MQTT下发指令成功')
        return JsonResponse({'result_code': '0'})