#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved. @AUTHOR: ASJRD018 @NAME: azoauth @software: PyCharm @DATE: 2020/1/13 17:01 @Version: python3.6 @MODIFY DECORD:ansjer dev @file: index.py @Contact: chanjunkai@163.com """ import json import threading import time import requests import logging from django.views.generic import TemplateView from django.shortcuts import render_to_response from django.http import JsonResponse from object.ResObject import ResObject from urllib.parse import urlencode import subprocess import uuid # from gevent.pool import Pool from model.models import UserModel, UidRtspModel, AlexaAuthModel, UserCountModel from object.ResponseObject import ResponseObject from object.tkObject import tkObject from service.CommonService import CommonService from object.RedisObject import RedisObject from azoauth.config import * class authView(TemplateView): def post(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = json.loads(request.body.decode('utf-8')) return self.validate(request_dict) def get(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.GET return self.validate(request_dict) def validate(self, request_dict): logger = logging.getLogger('django') logger.info('------oa2/auth请求参数: {}------'.format(request_dict)) state = request_dict.get("state", '') client_id = request_dict.get("client_id", '') response_type = request_dict.get("response_type", '') scope = request_dict.get("scope", '') redirect_uri = request_dict.get("redirect_uri", '') context = { 'state': state, 'client_id': client_id, 'response_type': response_type, 'scope': scope, 'redirect_uri': redirect_uri, 'skill_name': 'zosi smart' } return render_to_response("login.html", context) # Anlapus登录 class authAnlapusView(TemplateView): def post(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = json.loads(request.body.decode('utf-8')) return self.validate(request_dict) def get(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.GET return self.validate(request_dict) def validate(self, request_dict): state = request_dict.get("state", '') client_id = request_dict.get("client_id", '') response_type = request_dict.get("response_type", '') scope = request_dict.get("scope", '') redirect_uri = request_dict.get("redirect_uri", '') context = { 'state': state, 'client_id': client_id, 'response_type': response_type, 'scope': scope, 'redirect_uri': redirect_uri, 'skill_name': 'Anlapus' } return render_to_response("login_anlapus.html", context) class loginHandleView(TemplateView): def post(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = json.loads(request.body.decode('utf-8')) return self.validate(request_dict) def get(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.GET return self.validate(request_dict) def validate(self, request_dict): response = ResObject() user = request_dict.get("user", '') pwd = request_dict.get("pwd", '') state = request_dict.get("state", '') client_id = request_dict.get("client_id", '') response_type = request_dict.get("response_type", '') scope = request_dict.get("scope", '') redirect_uri = request_dict.get("redirect_uri", '') skill_name = request_dict.get("skill_name", '') # 返回code logger = logging.getLogger('django') logger.info('------开始认证登录------') logger.info('redirect_uri: {}'.format(redirect_uri)) if client_id != 'azalexaclient' or response_type != 'code' or scope != 'profile': return response.json(10, res={'msg': 'error'}, extra={'msg': 'message wrong'}) auth_request_url = '{SERVER_PREFIX}/oalexa/auth'.format(SERVER_PREFIX=SERVER_PREFIX) requests_data = {'userName': user, 'userPwd': pwd} res = requests.post(url=auth_request_url, data=requests_data) if res.status_code != 200: return response.json(10, res={'错误': '请求响应异常'}) res_json = res.json() # 添加测试服务器测试 if res_json['result_code'] != 0: auth_request_url = '{SERVER_PREFIX}/oalexa/auth'.format(SERVER_PREFIX=SERVER_PREFIX_TEST) res = requests.post(url=auth_request_url, data=requests_data) if res.status_code != 200: return response.json(10, res={'错误': '请求响应异常'}) res_json = res.json() logger.info('请求服务器url: {}'.format(auth_request_url)) logger.info('服务器响应: {}'.format(res_json)) if res_json['result_code'] != 0: return response.json(10, res={'msg': 'error'}, extra={'msg': res_json['reason']}) nowTime = int(time.time()) code = CommonService.encrypt_data(32) userID = res_json['result']['userID'] user_qs = UserModel.objects.filter(userID=userID) if user_qs.exists(): user_qs.update(code=code, updTime=nowTime) else: UserModel.objects.create(userID=userID, code=code, addTime=nowTime, updTime=nowTime) year_month = str(time.strftime('%Y%m', time.localtime(nowTime))) # 获取当前年月 user_count_qs = UserCountModel.objects.filter(skill_name=skill_name, year_month=year_month).values('amount') if not user_count_qs.exists(): UserCountModel.objects.create(skill_name=skill_name, year_month=year_month, amount=1) else: # 用户数量+1 amount = user_count_qs[0]['amount'] + 1 user_count_qs.update(amount=amount) redirect_uri += '?code=' + code + '&state=' + state return response.json(0, res=redirect_uri) class oa2TokenView(TemplateView): def post(self, request, *args, **kwargs): request.encoding = 'utf-8' # request_dict = json.loads(request.body.decode('utf-8')) request_dict = request.POST return self.validate(request_dict) def get(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.GET return self.validate(request_dict) def validate(self, request_dict): # 增加对code和client_id的校验代码,返回access_token和refresh_token code = request_dict.get("code", None) client_id = request_dict.get("client_id", None) refresh_token = request_dict.get("refresh_token", None) logger = logging.getLogger('django') logger.info('token-------------begin--------') logger.info(code) logger.info(client_id) logger.info(refresh_token) logger.info(request_dict) print('client_id:') print(client_id) user_qs = UserModel.objects.filter(code=code) if not user_qs.exists(): user_qs = UserModel.objects.filter(refresh_token=refresh_token) if user_qs.exists(): access_token = CommonService.encrypt_data(randomlength=32) refresh_token = CommonService.encrypt_data(randomlength=32) is_update = user_qs.update(access_token=access_token, refresh_token=refresh_token) print(is_update) if is_update: res_json = { "access_token": access_token, "token_type": "bearer", "expires_in": 3600, "refresh_token": refresh_token, # 'test': 'joker' } logger.info(res_json) return JsonResponse(res_json) else: logger.info({'msg': 'error'}) return JsonResponse({'msg': 'error'}) else: res_json = {'msg': 'code not exists'} return JsonResponse(res_json) def runSendRtspMsg_thread(UID, PWD, MSG): command = "./pushtool {UID} {PWD} {MSG} 1".format(UID=UID, PWD=PWD, MSG=MSG) print('command=>{command}'.format(command=command)) try: back = subprocess. \ Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE). \ communicate(timeout=10) except Exception as e: return False else: print("back0----", back[0].decode()) # 注意需要进行解码操作,默认输出的是字节 print("back1----", back[1].decode()) # back是一个元祖,可以通过元祖取值的方式获取结果 return True class oa2RtspStartView(TemplateView): def post(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.POST return self.validate(request_dict) def get(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.GET return self.validate(request_dict) def validate(self, request_dict): st = request_dict.get("st", 0) uid = request_dict.get("id", '') access_token = request_dict.get("access_token", '') skill_name = request_dict.get("skill_name", 'zosi smart') user_qs = UserModel.objects.filter(access_token=access_token) if not user_qs.exists(): return JsonResponse({'错误': '用户数据不存在'}) ur_qs = UidRtspModel.objects.filter(uid=uid).values('uid', 'nick', 'rtsp_url', 'password', 'region') if not ur_qs.exists(): return JsonResponse({'错误': 'uid数据不存在'}) UID = ur_qs[0]['uid'] nick = ur_qs[0]['nick'] PWD = ur_qs[0]['password'] region = ur_qs[0]['region'] stream_name = ur_qs[0]['rtsp_url'] channel = '0' if '_' in UID: # 多通道设备 channel = UID[-1:] UID = UID[:-2] RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA['EN'] MSG = '{}://{}:8554/{}'.format(RTSP_PREFIX, RESP_SERVER_DOMAIN, stream_name) logger = logging.getLogger('django') logger.info('------{} 开始向设备下发推流指令------'.format(skill_name)) # 此处后续应该用异步去发送指令 if int(st) == 1: send_flag = self.runSendStop(UID, PWD, MSG) logger.info('----------send_flag---st=1-----------------') if send_flag: return JsonResponse({'msg': 'stop yes', 'code': 0}) else: return JsonResponse({'msg': 'stop no', 'code': 0}) # 请求MQTT发布消息 url = '{}/iot/requestPublishMessage'.format(SERVER_PREFIX_TEST) # 测试服务器 requests_data = {'UID': UID, 'MSG': MSG+' 1'+channel} # 1: 开始推流,0: 停止推流; channel: 推流通道 r = requests.post(url, requests_data) if r.status_code != 200: return JsonResponse({'错误': '请求响应异常'}) res = r.json() logger.info('请求MQTT发布消息返回状态: {}'.format(res['result_code'])) if res['result_code'] == 0: logger.info('请求MQTT下发指令成功') elif res['result_code'] == 10043: command = "./pushtool {UID} {PWD} {MSG} 1 {channel}".format(UID=UID, PWD=PWD, MSG=MSG, channel=channel) logger.info('------------推流指令: {}---------------'.format(command)) self.runSendRtspMsg(logger, region, command) else: return JsonResponse({'错误': '请求MQTT发布消息异常'}) # 拉流地址 rtsp_uri = '{}://{}:443/{}'.format(RTSP_PREFIX, RESP_SERVER_DOMAIN, stream_name) stop_time = int(time.time()) + 2*60 expirationTime = time.strftime('%Y-%m-%dT%H:%MZ',time.localtime(stop_time)) res_json = { 'uid': UID, 'pwd': PWD, 'msg': MSG, 'uri': rtsp_uri, 'endpointId': uid, 'friendlyName': nick, 'manufacturerName': skill_name, 'expirationTime': expirationTime, 'description': 'Camera connected via {}'.format(skill_name), 'audioCodecs': 'ACC', 'videoCodecs': 'H264', 'protocols': ['RTSP'], 'idleTimeoutSeconds': 5, 'modelName': 'P1425-LE', 'authorizationTypes': ['NONE'], 'manufacturerId': 'zosi-ACCC8E5E7513', 'resolutions': {'width': 640, 'height': 360}, } logger.info('------------返回控制摄像头的信息---------------: {}'.format(res_json)) return JsonResponse(res_json, safe=False) def runReqRtspMsg(self, UID, PWD, MSG): request_url = 'http://localhost:5000/?UID={UID}&MSG={MSG}&CMD=1&PWD={PWD}'. \ format(UID=UID, PWD=PWD, MSG=MSG) res = requests.get(url=request_url) print(res) return True # 触发此方法,让摄像头推流到MSG流地址 def runSendRtspMsg(self, logger, region, command): if region == 'CN': logger.info('------------国内发送推流指令---------------') url = "http://52.83.252.41:7880/alexa/command?command={command}".format(command=command) # 请求国内服务器调用pushtool try: requests.get(url=url, timeout=2) except Exception as e: logger.info('请求国内服务器调用pushtool异常: {}'.format(repr(e))) else: logger.info('------------国外发送推流指令---------------') try: back = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE). \ communicate(timeout=2) logger.info('back: {}'.format(str(back[0].decode()) + str(back[1].decode()))) except Exception as e: logger.info('调用pushtool异常: {}'.format(repr(e))) def runSendStop(self, UID, PWD, MSG): command = "./pushtool {UID} {PWD} {MSG} 0".format(UID=UID, PWD=PWD, MSG=MSG) print('command=>{command}'.format(command=command)) try: back = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE). \ communicate(timeout=10) except Exception as e: return False else: print("back0----", back[0].decode()) # 注意需要进行解码操作,默认输出的是字节 print("back1----", back[1].decode()) # back是一个元祖,可以通过元祖取值的方式获取结果 return True class oa2DiscoveryDevice(TemplateView): def post(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.POST return self.validate(request_dict) def get(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.GET return self.validate(request_dict) def validate(self, request_dict): # 增加对code和client_id的校验代码,返回access_token和refresh_token skill_name = request_dict.get("skill_name", 'zosi smart') access_token = request_dict.get("access_token", None) logger = logging.getLogger('django') logger.info('--------{} 开始搜索设备--------'.format(skill_name)) user_qs = UserModel.objects.filter(access_token=access_token) if not user_qs.exists(): return JsonResponse({'错误': '用户数据不存在'}) response = ResObject() user = user_qs[0] userID = user.userID logger.info('userID: {}'.format(userID)) #更新事件网关接口 alexAuth = AlexaAuthModel.objects.filter(token=access_token).order_by('-addTime') if alexAuth.exists(): auth_res = alexAuth.values() event_access_token = auth_res[0]['access_token'] event_refresh_token = auth_res[0]['refresh_token'] event_token = auth_res[0]['token'] event_expiresTime = auth_res[0]['expiresTime'] event_addTime = auth_res[0]['addTime'] event_updTime = auth_res[0]['updTime'] event_alexa_region = auth_res[0]['alexa_region'] AlexaAuthModel.objects.filter(userID=userID).delete() alexAuth.delete() logger.info('update_event_access_token') logger.info(event_token) AlexaAuthModel.objects.create( userID=userID, access_token=event_access_token, refresh_token=event_refresh_token, token=event_token, expiresTime=event_expiresTime, addTime=event_addTime, updTime=event_updTime, alexa_region=event_alexa_region, skill_name=skill_name, ) auth_request_url = '{}/oalexa/discoveryuid'.format(SERVER_PREFIX) requests_data = {'sid': 'admin', 'sst': 'admin', 'alexa_user_id': userID} res = requests.post(url=auth_request_url, data=requests_data) res_json = res.json() logger.info('正式服务器响应: {}'.format(res_json)) test_flag = False # 用来区分测试服务器,后面删掉 # 添加测试服务器测试 if res_json['result_code'] != 0: auth_request_url = '{}/oalexa/discoveryuid'.format(SERVER_PREFIX_TEST) res = requests.post(url=auth_request_url, data=requests_data) res_json = res.json() test_flag = True logger.info('请求服务器url: {}'.format(auth_request_url)) logger.info('服务器响应: {}'.format(res_json)) if res_json['result_code'] != 0: return response.json(0, res={'msg': 'error'}) uid_arr = res_json['result']['uid_arr'] rtko = tkObject(rank=1) now_time = int(time.time()) user.uid_rtsp.clear() res_json = [] uid_rtsp_id_list = [] for uid_a in uid_arr: uid = uid_a['uid'] nick = uid_a['nick'] rtsp_url = rtko.encrypt(data=uid) region = 'CN' if uid_a['region'] == 'CN' else 'EN' if test_flag: multi_channel = uid_a['multi_channel'] if multi_channel: # 多通道设备: uid_通道号 uid += '_' + str(uid_a['channel']) try: uid_rtsp_qs = UidRtspModel.objects.get(uid=uid) except UidRtspModel.DoesNotExist: uid_rtsp_qs = UidRtspModel.objects.create(uid=uid, nick=nick, region=region, password=uid_a['password'], rtsp_url=rtsp_url, addTime=now_time, updTime=now_time) else: uid_rtsp_qs.nick = nick uid_rtsp_qs.region = region uid_rtsp_qs.password = uid_a['password'] uid_rtsp_qs.save() uid_rtsp_id_list.append(uid_rtsp_qs.id) RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA[region] rtsp_uri = '{}://{}:443/{}'.format(RTSP_PREFIX, RESP_SERVER_DOMAIN, rtsp_url) ur_data = { 'uri': rtsp_uri, 'endpointId': uid, 'friendlyName': nick, 'manufacturerName': skill_name, 'description': 'Camera connected via {}'.format(skill_name), 'protocols': ['RTSP'], 'audioCodecs': ['ACC'], 'videoCodecs': ['H264'], 'modelName': 'P1425-LE', 'authorizationTypes': ['NONE'], 'manufacturerId': 'zosi-ACCC8E5E7513', 'resolutions': [{'width': 1280, 'height': 720}], } res_json.append(ur_data) user.uid_rtsp.add(*uid_rtsp_id_list) logger.info('搜索设备返回值: {}'.format(res_json)) return JsonResponse(res_json, safe=False) #新增 def testRunSendStop(request): request.encoding = 'utf-8' if request.method == 'GET': request_dict = request.GET UID=request_dict.get('UID', None) PWD=request_dict.get('PWD', None) MSG=request_dict.get('MSG', None) command = "./pushtool {UID} {PWD} {MSG} 0".format(UID=UID, PWD=PWD, MSG=MSG) print('command=>{command}'.format(command=command)) try: back = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE). \ communicate(timeout=10) except Exception as e: return False else: print("back0----", back[0].decode()) # 注意需要进行解码操作,默认输出的是字节 print("back1----", back[1].decode()) # back是一个元祖,可以通过元祖取值的方式获取结果 return JsonResponse({'msg': 'stop Stream', 'code': 0}) from datetime import datetime def testRunStream(request): request.encoding = 'utf-8' if request.method == 'GET': request_dict = request.GET UID=request_dict.get('UID', None) PWD=request_dict.get('PWD', None) MSG=request_dict.get('MSG', None) time1=datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] command = "./pushtool {UID} {PWD} {MSG} 1".format(UID=UID, PWD=PWD, MSG=MSG) print('command=>{command}'.format(command=command)) try: back = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE). \ communicate(timeout=10) time2=datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] # print("时间:%s"%datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]) except Exception as e: return repr(e) else: print("back0----", back[0].decode()) # 注意需要进行解码操作,默认输出的是字节 print("back1----", back[1].decode()) # back是一个元祖,可以通过元祖取值的方式获取结果 #return str(back[0].decode()) + str(back[1].decode()) return JsonResponse({'msg': "star is %s"%time1+",end is %s"%time2, 'code': 0}) # return JsonResponse({'msg': "run stream", 'code': 0}) #测试是否正常接口 def test(request): return JsonResponse({'msg': 'Server running normal', 'code': 0}) #test接口 def loadBalancingServer(request): try: res1 = requests.get('http://rtsp.zositech.com:10008/api/v1/players', timeout=5) except Exception as e: res1 = -1 time1=datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] print("时间1:%s"%time1) # ---------压测接口demo------------ redisObj = RedisObject(db=1) key1 = 'rtsp.zositech.com' key2 = '18.222.107.129' redis_data1 = redisObj.get_data(key1) redis_data2 = redisObj.get_data(key2) if redis_data1 and redis_data2: print("进缓存") mm=min(redis_data1,redis_data2) if redis_data1 == mm and redis_data1 != -1: RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA['EN'] elif redis_data2 == mm and redis_data2 != -1: RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA['EN1'] # ---------/压测demo------------ # else: # print("第一次") # res1 = requests.get('http://rtsp.zositech.com:10008/api/v1/players', timeout=0.001) # res2 = requests.get('http://18.222.107.129:10008/api/v1/players', timeout=0.001) # res_data1 = res1.json() # res_data2 = res1.json() # total1 = res_data1['total'] # total2 = res_data2['total'] # print(total1) # print(total2) # redisObj8.set_data(key=key1, val=total1, expire=30) # redisObj8.set_data(key=key2, val=total2, expire=30) # mm=min(total1,total2) # print(mm) # if total1 == mm: # RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA['EN'] # else: # RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA['EN1'] return JsonResponse({'msg': 'The number of people online is :%s'%res1, 'code': 0}) class powerController(TemplateView): def post(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.POST return self.power_controller(request_dict) def get(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.GET return self.power_controller(request_dict) def power_controller(self, request_dict): uid = request_dict.get('uid', '') access_token = request_dict.get('access_token', '') skill_name = request_dict.get('skill_name', 'zosi smart') power_controller = request_dict.get('power_controller', '') user_qs = UserModel.objects.filter(access_token=access_token) if not user_qs.exists(): return JsonResponse({'result_code': '500' , '错误': '用户数据不存在'}) ur_qs = UidRtspModel.objects.filter(uid=uid).values('uid', 'nick', 'rtsp_url', 'password', 'region') if not ur_qs.exists(): return JsonResponse({'result_code': '500', '错误': 'uid数据不存在'}) logger = logging.getLogger('django') logger.info('{} 唤醒设备 {}'.format(skill_name, uid)) # 请求MQTT发布消息 url = '{}/iot/requestPublishMessage'.format(SERVER_PREFIX_TEST) # 测试服务器 requests_data = {'UID': uid, 'MSG': power_controller} # TurnOn, TurnOff r = requests.post(url, requests_data) if r.status_code != 200: return JsonResponse({'result_code': '500', '错误': '请求响应异常'}) res = r.json() logger.info('请求MQTT发布消息返回状态: {}'.format(res['result_code'])) if res['result_code'] != 0: logger.info('请求MQTT下发指令失败') return JsonResponse({'result_code': '500', '错误': '请求MQTT下发指令失败'}) else: logger.info('请求MQTT下发指令成功') return JsonResponse({'result_code': '0'})