#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved. @AUTHOR: ASJRD018 @NAME: azoauth @software: PyCharm @DATE: 2020/1/13 17:01 @Version: python3.6 @MODIFY DECORD:ansjer dev @file: index.py @Contact: chanjunkai@163.com """ import json import time import requests import logging from django.views.generic import TemplateView from django.shortcuts import render_to_response from django.http import JsonResponse from object.ResObject import ResObject from urllib.parse import urlencode import subprocess import uuid # from gevent.pool import Pool from model.models import UserModel,UidRtspModel,AlexaAuthModel from object.tkObject import tkObject from service.CommonService import CommonService from object.RedisObject import RedisObject from azoauth.config import * class authView(TemplateView): def post(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = json.loads(request.body.decode('utf-8')) return self.validate(request_dict) def get(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.GET return self.validate(request_dict) def validate(self, request_dict): state = request_dict.get("state", '') client_id = request_dict.get("client_id", '') response_type = request_dict.get("response_type", '') scope = request_dict.get("scope", '') redirect_uri = request_dict.get("redirect_uri", '') context = { 'state': state, 'client_id': client_id, 'response_type': response_type, 'scope': scope, 'redirect_uri': redirect_uri } return render_to_response("login.html", context) # return render_template('./login.html', **context) class loginHandleView(TemplateView): def post(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = json.loads(request.body.decode('utf-8')) return self.validate(request_dict) def get(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.GET return self.validate(request_dict) def validate(self, request_dict): response = ResObject() user = request_dict.get("user", '') pwd = request_dict.get("pwd", '') state = request_dict.get("state", '') client_id = request_dict.get("client_id", '') response_type = request_dict.get("response_type", '') scope = request_dict.get("scope", '') redirect_uri = request_dict.get("redirect_uri", '') # 返回code print('userPwd---------------') logger = logging.getLogger('django') logger.info('login-------------begin') logger.info(client_id) logger.info(redirect_uri) logger.info(request_dict) print(user) print(pwd) auth_request_url = '{SERVER_PREFIX}/oalexa/auth'. \ format(SERVER_PREFIX=SERVER_PREFIX) print(auth_request_url) # auth_request_url = 'http://192.168.136.40:8077/oalexa/auth' requests_data = {'userName': user, 'userPwd': pwd} res = requests.post(url=auth_request_url, data=requests_data) print(res.json()) print(res.json()['result_code']) res_json = res.json() if res_json['result_code'] == 0: userID = res_json['result']['userID'] # uid_arr = res_json['result']['uid_arr'] nowTime = int(time.time()) user_qs = UserModel.objects.filter(userID=userID) if not user_qs.exists(): UserModel.objects.create( userID=userID, addTime=nowTime, updTime=nowTime) user_qs = UserModel.objects.filter(userID=userID) code = CommonService.encrypt_data(32) user_qs.update(code=code) redirect_uri = redirect_uri + '?code=' + code + '&state=' + state return response.json(0, res=redirect_uri) else: return response.json(10, res={'msg': 'error'}, extra={'msg': res_json['reason']}) class oa2TokenView(TemplateView): def post(self, request, *args, **kwargs): request.encoding = 'utf-8' # request_dict = json.loads(request.body.decode('utf-8')) request_dict = request.POST return self.validate(request_dict) def get(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.GET return self.validate(request_dict) def validate(self, request_dict): # 增加对code和client_id的校验代码,返回access_token和refresh_token code = request_dict.get("code", None) client_id = request_dict.get("client_id", None) refresh_token = request_dict.get("refresh_token", None) from var_dump import var_dump logger = logging.getLogger('django') logger.info('token-------------begin') logger.info(code) logger.info(client_id) logger.info(refresh_token) logger.info(request_dict) print('tokenView---------------') print('refresh_token:') print(refresh_token) print('code:') print(code) print('client_id:') print(client_id) user_qs = UserModel.objects.filter(code=code) if not user_qs.exists(): user_qs = UserModel.objects.filter(refresh_token=refresh_token) if user_qs.exists(): access_token = CommonService.encrypt_data(randomlength=32) refresh_token = CommonService.encrypt_data(randomlength=32) is_update = user_qs.update(access_token=access_token, refresh_token=refresh_token) print(is_update) if is_update: res_json = { "access_token": access_token, "token_type": "bearer", "expires_in": 3600, "refresh_token": refresh_token } print(res_json) return JsonResponse(res_json) else: return JsonResponse({'msg': 'error'}) else: res_json = {'msg': 'code not exists'} print(res_json) return JsonResponse(res_json) import threading def runSendRtspMsg_thread(UID, PWD, MSG): command = "./pushtool {UID} {PWD} {MSG} 1".format(UID=UID, PWD=PWD, MSG=MSG) print('command=>{command}'.format(command=command)) try: back = subprocess. \ Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE). \ communicate(timeout=10) except Exception as e: return False else: print("back0----", back[0].decode()) # 注意需要进行解码操作,默认输出的是字节 print("back1----", back[1].decode()) # back是一个元祖,可以通过元祖取值的方式获取结果 return True class oa2RtspStartView(TemplateView): def post(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.POST # request_dict = json.loads(request.body.decode('utf-8')) return self.validate(request_dict) def get(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.GET return self.validate(request_dict) def validate(self, request_dict): ''' VVDHCVBYDKFMJRWA111A ''' access_token = request_dict.get("access_token", '') st = request_dict.get("st", 0) id = request_dict.get("id", '') # redisObj = RedisObject(db=9) # key = '{UID}_rtsp_key'.format(UID=id) # redis_data = redisObj.get_data(key=key) # #redis_data = redisObj.del_data(key=key) # print(st) # if redis_data and st == 0: # #if redis_data: # print('select by redis data') # # time.sleep(3) # res_json = eval(redis_data) # return JsonResponse(res_json, safe=False) # user_qs = UserModel.objects.filter(access_token=access_token) user_qs = UserModel.objects.filter(access_token=access_token) if user_qs.exists(): print('select by mysql data') ur_qs = UidRtspModel.objects.filter(uid=id).values('uid', 'nick', 'rtsp_url', 'password', 'region') # ur_qs = UidRtspModel.objects.filter(usermodel__userID=userID,uid=) UID = ur_qs[0]['uid'] region = ur_qs[0]['region'] PWD = ur_qs[0]['password'] stream_name = ur_qs[0]['rtsp_url'] nick = ur_qs[0]['nick'] #RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA[region] # RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA['CN'] RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA['EN'] #如果是美国,则进行负载均衡 # if RESP_SERVER_DOMAIN == RESP_SERVER_DOMAIN_DATA['EN']: # redisObj1 = RedisObject(db=1) # key1 = RESP_SERVER_DOMAIN_DATA['EN'] # key2 = RESP_SERVER_DOMAIN_DATA['EN1'] # redis_data1 = redisObj1.get_data(key1) # redis_data2 = redisObj1.get_data(key2) # mm=min(redis_data1,redis_data2) # if redis_data1 == mm and redis_data1 != -1: # RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA['EN'] # elif redis_data2 == mm and redis_data2 != -1: # RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA['EN1'] MSG = '{RTSP_PREFIX}://{RESP_SERVER_DOMAIN}:{PORT_PREFIX}/{stream_name}'. \ format(RESP_SERVER_DOMAIN=RESP_SERVER_DOMAIN, stream_name=stream_name, PORT_PREFIX=PORT_PREFIX, RTSP_PREFIX=RTSP_PREFIX) #zlm # MSG = '{RTSP_PREFIX}://{RESP_SERVER_DOMAIN}:{PORT_PREFIX}/live/{stream_name}'. \ # format(RESP_SERVER_DOMAIN=RESP_SERVER_DOMAIN, # stream_name=stream_name, # PORT_PREFIX=PORT_PREFIX, # RTSP_PREFIX=RTSP_PREFIX) # po = Pool(10) # po.apply_async(self.runSendRtspMsg, (UID, PWD, MSG)) print(UID) print(PWD) print(MSG) logger = logging.getLogger('django') # logger.info('开始打印---------------------------') # logger.info(MSG) # logger.info(st) # send_flag = self.runSendStop(UID, PWD, MSG) # logger.info(send_flag) # if send_flag: # return JsonResponse({'msg': 'stop yes', 'code': 0}) # 此处后续应该用异步去发送指令 if int(st) == 1: send_flag = self.runSendStop(UID, PWD, MSG) logger.info('----------send_flag---st=1-----------------') if send_flag: return JsonResponse({'msg': 'stop yes', 'code': 0}) else: return JsonResponse({'msg': 'stop no', 'code': 0}) # threading.Thread(target=runSendRtspMsg_thread, args=(UID, PWD, MSG)).start() # send_flag = self.runReqRtspMsg(UID, PWD, MSG) send_flag = self.runSendRtspMsg(UID, PWD, MSG) logger.info('开始打印----------send_flag3.0-----------------') logger.info(send_flag) #E #拉流地址 rtsp_uri = '{RTSP_PREFIX}://{RESP_SERVER_DOMAIN}:443/{stream_name}'. \ format(RESP_SERVER_DOMAIN=RESP_SERVER_DOMAIN, stream_name=stream_name, RTSP_PREFIX=RTSP_PREFIX) #zlm443 # rtsp_uri = '{RTSP_PREFIX}://{RESP_SERVER_DOMAIN}:443/live/{stream_name}'. \ # format(RESP_SERVER_DOMAIN=RESP_SERVER_DOMAIN, # stream_name=stream_name, # RTSP_PREFIX=RTSP_PREFIX) #----------新增关闭流的线程 # stream_name = threading.Thread(target=self.runSendStop,args=(UID, PWD, MSG)) # stream_name.start() #----------/新增关闭流的线程 stop_time = int(time.time()) + 2*60 expirationTime = time.strftime('%Y-%m-%dT%H:%MZ',time.localtime(stop_time)) res_json = { 'endpointId': id, 'manufacturerName': 'zosi smart', 'manufacturerId': 'zosi-ACCC8E5E7513', 'modelName': 'P1425-LE', 'friendlyName': nick, 'description': 'Camera connected via zosi smart', "expirationTime": expirationTime, "idleTimeoutSeconds": 5, #'resolutions': [{'width': 1280, 'height': 720}], 'resolutions': {'width': 640, 'height': 360}, 'videoCodecs': 'H264', 'audioCodecs': 'ACC', # 'audioCodecs': ['G711'], 'protocols': ['RTSP'], 'authorizationTypes': ['NONE'], 'uri': rtsp_uri, # 'msg': send_flag, 'uid':UID, 'pwd':PWD, 'msg':MSG } logger.info('开始打印------------返回控制摄像头的结果2.0---------------') logger.info(res_json) # redisObj.set_data(key=key, val=str(res_json), expire=15) return JsonResponse(res_json, safe=False) else: return JsonResponse({'msg': 'wrong'}) def runReqRtspMsg(self, UID, PWD, MSG): request_url = 'http://localhost:5000/?UID={UID}&MSG={MSG}&CMD=1&PWD={PWD}'. \ format(UID=UID, PWD=PWD, MSG=MSG) res = requests.get(url=request_url) print(res) return True #触发此方法,让摄像头推流到MSG流地址 def runSendRtspMsg(self, UID, PWD, MSG): # return True logger = logging.getLogger('django') logger.info('开始打印------------摄像头推流到MSG流地址的结果---------------') command = "./pushtool {UID} {PWD} {MSG} 1".format(UID=UID, PWD=PWD, MSG=MSG) # print('command=>{command}'.format(command=command)) command_url = "http://47.115.134.251/index.php?command={command}".format(command=command) logger.info("-----------command_url={command_url}".format(command_url=command_url)) try: logger.info("-----------------开始调用接口") exec_res = requests.get(url=command_url, timeout=2) res = exec_res.json() logger.info('------------调用接口执行发送命令---------------') logger.info(res) if res['code'] == 200: return True except Exception as e: logger.info('------------except__e-------------') logger.info(e) return False # try: # back = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE). \ # communicate(timeout=2) # except Exception as e: # logger.info('开始打印------------reprrrrrrrrrr--------except-------') # logger.info(repr(e)) # return repr(e) # else: # print("back0----", back[0].decode()) # 注意需要进行解码操作,默认输出的是字节 # print("back1----", back[1].decode()) # back是一个元祖,可以通过元祖取值的方式获取结果 # # logger.info(UID) # logger.info(back[0].decode()) # logger.info(back[1].decode()) # # return str(back[0].decode()) + str(back[1].decode()) # return True def runSendStop(self, UID, PWD, MSG): command = "./pushtool {UID} {PWD} {MSG} 0".format(UID=UID, PWD=PWD, MSG=MSG) print('command=>{command}'.format(command=command)) try: back = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE). \ communicate(timeout=10) except Exception as e: return False else: print("back0----", back[0].decode()) # 注意需要进行解码操作,默认输出的是字节 print("back1----", back[1].decode()) # back是一个元祖,可以通过元祖取值的方式获取结果 return True class oa2DiscoveryDevice(TemplateView): def post(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.POST return self.validate(request_dict) def get(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.GET return self.validate(request_dict) def validate(self, request_dict): # 增加对code和client_id的校验代码,返回access_token和refresh_token access_token = request_dict.get("access_token", None) region = request_dict.get("region", 'cn') logger = logging.getLogger('django') try: user_qs = UserModel.objects.get(access_token=access_token) except Exception as e: return JsonResponse({'res': '11'}) else: response = ResObject() userID = user_qs.userID #更新事件网关接口 alexAuth = AlexaAuthModel.objects.filter(token=access_token) if alexAuth.exists(): auth_res = alexAuth.values() event_access_token = auth_res[0]['access_token'] event_refresh_token = auth_res[0]['refresh_token'] event_token = auth_res[0]['token'] event_expiresTime = auth_res[0]['expiresTime'] event_addTime = auth_res[0]['addTime'] event_updTime = auth_res[0]['updTime'] AlexaAuthModel.objects.filter(userID = userID).delete() alexAuth.delete() logger.info('update_event_access_token') logger.info(event_token) AlexaAuthModel.objects.create( userID = userID, access_token = event_access_token, refresh_token = event_refresh_token, token = event_token, expiresTime = event_expiresTime, addTime = event_addTime, updTime = event_updTime, ) auth_request_url = '{SERVER_PREFIX}/oalexa/discoveryuid'.format(SERVER_PREFIX=SERVER_PREFIX) requests_data = {'sid': 'admin', 'sst': 'admin', 'alexa_user_id': userID} res = requests.post(url=auth_request_url, data=requests_data) res_json = res.json() print(res_json) logger.info('开始打印---------------------------') logger.info(userID) logger.info(res_json) if res_json['result_code'] == 0: uid_arr = res_json['result']['uid_arr'] rtko = tkObject(rank=1) now_time = int(time.time()) user_qs.uid_rtsp.clear() res_json = [] uid_rtsp_id_list = [] for uid_a in uid_arr: uid = uid_a['uid'] if uid_a['region'] == 'CN': region = 'CN' else: region = 'EN' rtsp_url = rtko.encrypt(data=uid) try: uid_rtsp_qs = UidRtspModel.objects.get(uid=uid) except UidRtspModel.DoesNotExist: uid_rtsp_qs = UidRtspModel.objects.create(uid=uid_a['uid'], password=uid_a['password'], nick=uid_a['nick'], addTime=now_time, updTime=now_time, rtsp_url=rtsp_url, region=region) else: # if uid_rtsp_qs.password != uid_a['password']: uid_rtsp_qs.password = uid_a['password'] uid_rtsp_qs.nick = uid_a['nick'] uid_rtsp_qs.region = region uid_rtsp_qs.save() uid_rtsp_id_list.append(uid_rtsp_qs.id) RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA[region] rtsp_uri = '{RTSP_PREFIX}://{RESP_SERVER_DOMAIN}:443/{stream_name}'. \ format(RESP_SERVER_DOMAIN=RESP_SERVER_DOMAIN, stream_name=rtsp_url, RTSP_PREFIX=RTSP_PREFIX) ur_data = { 'endpointId': uid_a['uid'], 'manufacturerName': 'zosi smart', 'manufacturerId': 'zosi-ACCC8E5E7513', 'modelName': 'P1425-LE', 'friendlyName': uid_a['nick'], 'description': 'Camera connected via zosi smart', 'resolutions': [{'width': 1280, 'height': 720}], 'videoCodecs': ['H264'], 'audioCodecs': ['ACC'], 'protocols': ['RTSP'], 'authorizationTypes': ['NONE'], 'uri': rtsp_uri } res_json.append(ur_data) print(uid_rtsp_id_list) user_qs.uid_rtsp.add(*uid_rtsp_id_list) logger.info('________________471') logger.info(res_json) return JsonResponse(res_json, safe=False) else: return response.json(0, res={'msg': 'error'}) #新增 def testRunSendStop(request): request.encoding = 'utf-8' if request.method == 'GET': request_dict = request.GET UID=request_dict.get('UID', None) PWD=request_dict.get('PWD', None) MSG=request_dict.get('MSG', None) command = "./pushtool {UID} {PWD} {MSG} 0".format(UID=UID, PWD=PWD, MSG=MSG) print('command=>{command}'.format(command=command)) try: back = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE). \ communicate(timeout=10) except Exception as e: return False else: print("back0----", back[0].decode()) # 注意需要进行解码操作,默认输出的是字节 print("back1----", back[1].decode()) # back是一个元祖,可以通过元祖取值的方式获取结果 return JsonResponse({'msg': 'stop Stream', 'code': 0}) from datetime import datetime def testRunStream(request): request.encoding = 'utf-8' if request.method == 'GET': request_dict = request.GET UID=request_dict.get('UID', None) PWD=request_dict.get('PWD', None) MSG=request_dict.get('MSG', None) time1=datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] command = "./pushtool {UID} {PWD} {MSG} 1".format(UID=UID, PWD=PWD, MSG=MSG) print('command=>{command}'.format(command=command)) try: back = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE). \ communicate(timeout=10) time2=datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] # print("时间:%s"%datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]) except Exception as e: return repr(e) else: print("back0----", back[0].decode()) # 注意需要进行解码操作,默认输出的是字节 print("back1----", back[1].decode()) # back是一个元祖,可以通过元祖取值的方式获取结果 #return str(back[0].decode()) + str(back[1].decode()) return JsonResponse({'msg': "star is %s"%time1+",end is %s"%time2, 'code': 0}) # return JsonResponse({'msg': "run stream", 'code': 0}) #测试是否正常接口 def test(request): return JsonResponse({'msg': 'Server running normal', 'code': 0}) #test接口 def loadBalancingServer(request): try: res1 = requests.get('http://rtsp.zositech.org:10008/api/v1/players', timeout=5) except Exception as e: res1 = -1 time1=datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] print("时间1:%s"%time1) # ---------压测接口demo------------ redisObj = RedisObject(db=1) key1 = 'rtsp.zositech.org' key2 = '18.222.107.129' redis_data1 = redisObj.get_data(key1) redis_data2 = redisObj.get_data(key2) if redis_data1 and redis_data2: print("进缓存") mm=min(redis_data1,redis_data2) if redis_data1 == mm and redis_data1 != -1: RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA['EN'] elif redis_data2 == mm and redis_data2 != -1: RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA['EN1'] # ---------/压测demo------------ # else: # print("第一次") # res1 = requests.get('http://rtsp.zositech.org:10008/api/v1/players', timeout=0.001) # res2 = requests.get('http://18.222.107.129:10008/api/v1/players', timeout=0.001) # res_data1 = res1.json() # res_data2 = res1.json() # total1 = res_data1['total'] # total2 = res_data2['total'] # print(total1) # print(total2) # redisObj8.set_data(key=key1, val=total1, expire=30) # redisObj8.set_data(key=key2, val=total2, expire=30) # mm=min(total1,total2) # print(mm) # if total1 == mm: # RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA['EN'] # else: # RESP_SERVER_DOMAIN = RESP_SERVER_DOMAIN_DATA['EN1'] return JsonResponse({'msg': 'The number of people online is :%s'%res1, 'code': 0}) ''' [{"endpointId":1,"manufacturerName":"zosi smart","manufacturerId":"zosi-ACCC8E5E7513","modelName":"P1425-LE","friendlyName":"Camera VVDHCV","description":"Camera VVDHCV","resolutions":[{"width":1280,"height":720}],"videoCodecs":["H264"],"audioCodecs":["ACC"],"protocols":["RTSP"],"authorizationTypes":["NONE"],"uri":"rtsp://rtsp.zositech.xyz:8554/WVZsWkVTRU5XUWxsRVMwWk5TbEpYUVRFeE1VRT1p"},{"endpointId":2,"manufacturerName":"zosi smart","manufacturerId":"zosi-ACCC8E5E7513","modelName":"P1425-LE","friendlyName":"Camera 5N6ZW8","description":"Camera 5N6ZW8","resolutions":[{"width":1280,"height":720}],"videoCodecs":["H264"],"audioCodecs":["ACC"],"protocols":["RTSP"],"authorizationTypes":["NONE"],"uri":"rtsp://rtsp.zositech.xyz:8554/Rk5VNDJXbGM0TnpSSE5rRXpOMWMwTmpFeE1VRT1D"},{"endpointId":3,"manufacturerName":"zosi smart","manufacturerId":"zosi-ACCC8E5E7513","modelName":"P1425-LE","friendlyName":"Camera T3SLCA","description":"Camera T3SLCA","resolutions":[{"width":1280,"height":720}],"videoCodecs":["H264"],"audioCodecs":["ACC"],"protocols":["RTSP"],"authorizationTypes":["NONE"],"uri":"rtsp://rtsp.zositech.xyz:8554/b1ZETlRURU5CU0VvNVVERktTRmxDTkRFeE1VRT1X"}] '''