123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- """
- import json
- import time, datetime
- import requests
- import uuid
- import logging
- from django.views.generic import TemplateView
- from django.shortcuts import render_to_response
- from django.http import JsonResponse
- import http.client
- from urllib.parse import urlencode
- from object.ResObject import ResObject
- import subprocess
- # from gevent.pool import Pool
- from model.models import UserModel,UidRtspModel
- from object.ResponseObject import ResponseObject
- from object.tkObject import tkObject
- from service.CommonService import CommonService
- from model.models import AlexaAuthModel
- from object.RedisObject import RedisObject
- from django.utils.decorators import method_decorator
- from django.views.decorators.csrf import csrf_exempt
- from azoauth.config import *
- rtspServer = "rtsp.zositech.com,3.16.66.144"
- class deviceStatus(TemplateView):
@method_decorator(csrf_exempt)
def dispatch(self, *args, **kwargs):
    """Forward every request to the parent ``dispatch``; CSRF checks are disabled by the decorator."""
    parent_view = super(deviceStatus, self)
    return parent_view.dispatch(*args, **kwargs)
def get(self, request, *args, **kwargs):
    """Handle GET: force UTF-8 decoding and route the query parameters by ``operation``."""
    request.encoding = 'utf-8'
    return self.validation(request.GET, request, kwargs.get('operation'))
def post(self, request, *args, **kwargs):
    """Handle POST: force UTF-8 decoding and route the form parameters by ``operation``."""
    request.encoding = 'utf-8'
    return self.validation(request.POST, request, kwargs.get('operation'))
def validation(self, request_dict, request, operation):
    """Dispatch a request to the handler registered for ``operation``.

    Args:
        request_dict: Parsed request parameters.
        request: The originating request object (not used by the routing itself).
        operation: Operation name; ``None`` when it was not supplied.

    Returns:
        The selected handler's response, or a ``{'code': 404, 'msg': 'not found'}``
        JSON response when ``operation`` is missing or not recognised.
    """
    response = ResponseObject()
    # Lambdas defer the call so only the selected handler runs; some handlers
    # also take the shared ResponseObject.
    handlers = {
        'saveAccessToken': lambda: self.saveAccessToken(request_dict),
        'getAccessToken': lambda: self.getAccessToken(request_dict, response),
        'addOrUpdate': lambda: self.addOrUpdate(request_dict, response),
        'addOrUpdateV2': lambda: self.addOrUpdateV2(request_dict),
        'delete': lambda: self.delete(request_dict),
        'stopPush': lambda: self.notifiesDeviceStopPush(request_dict, response),
    }
    handler = handlers.get(operation)
    if handler is None:
        # An unknown operation used to fall off the end and return None,
        # which is not a valid view response.
        return JsonResponse({'code': 404, 'msg': 'not found'})
    return handler()
def saveAccessToken(self, request_dict):
    """Create or overwrite the stored Alexa authorization for ``token``.

    Reads ``token``, ``region``, ``skill_name``, ``access_token`` and
    ``refresh_token`` from ``request_dict`` (each defaulting to ``''``). Rows
    matching ``token`` are updated in place; when none exist a new row is
    created. The expiry is set to the current time plus 3200 seconds.

    Returns:
        JsonResponse: always ``{'code': 200, 'msg': 'success'}``.
    """
    token = request_dict.get("token", '')
    region_code = request_dict.get("region", '')
    skill = request_dict.get("skill_name", '')
    access = request_dict.get("access_token", '')
    refresh = request_dict.get("refresh_token", '')

    logging.getLogger('django').info(
        '--------认证登录,地区: {}, skill: {}--------'.format(region_code, skill))

    matching = AlexaAuthModel.objects.filter(token=token)
    stamp = int(time.time())
    # Fields written on both the create and the update path.
    fields = dict(
        token=token,
        updTime=stamp,
        skill_name=skill,
        alexa_region=region_code,
        access_token=access,
        refresh_token=refresh,
        expiresTime=stamp + 3200,
    )
    if matching.exists():
        matching.update(**fields)
    else:
        AlexaAuthModel.objects.create(addTime=stamp, **fields)
    return JsonResponse({'code': 200, 'msg': 'success'})
def getAccessToken(self, request_dict, response):
    """Log the call and return a fixed success payload; neither argument is used."""
    logging.getLogger('django').info('-------------getAccessToken')
    return JsonResponse({'code': 200, 'msg': 'success'})
# Send an AddOrUpdateReport for one device to the Alexa event gateway.
# Superseded by the V2 interface; may be removed later.
def addOrUpdate(self, request_dict, response):
    """Store the RTSP record of a single device and report it to Alexa.

    Args:
        request_dict: Request parameters. ``UID``, ``userID``, ``uid_nick`` and
            ``password`` (encrypted) are required; ``region`` defaults to ``'EN'``.
        response: Unused; accepted for interface compatibility.

    Returns:
        JsonResponse: ``{'res': 'success'}`` once the report has been posted,
        otherwise a ``{'code': ..., 'msg': ...}`` payload (101 missing
        parameter, 102 authorization problem, 104 gateway request failed).
    """
    logger = logging.getLogger('django')
    logger.info('in__________________________first')
    UID = request_dict.get("UID", '')
    userID = request_dict.get("userID", '')
    uid_nick = request_dict.get("uid_nick", '')
    encrypt_pwd = request_dict.get("password", '')
    region = request_dict.get("region", 'EN')
    if not all([UID, userID, uid_nick, encrypt_pwd]):
        return JsonResponse({'code': 101, 'msg': 'fail'})
    password = CommonService().decode_pwd(encrypt_pwd)

    alexAuth = AlexaAuthModel.objects.filter(userID=userID)
    if not alexAuth.exists():
        logger.info('not found user')
        logger.info(UID)
        return JsonResponse({'code': 102, 'msg': 'not found user'})
    info = alexAuth.values(
        'expiresTime', 'access_token', 'refresh_token', 'alexa_region', 'skill_name')[0]
    access_token = info['access_token']
    alexa_region = info['alexa_region']
    # Reject an unknown region up front instead of raising KeyError later.
    if alexa_region not in ALEXA_EVENT_API:
        logger.info('alexa区域信息错误,alexa_region: {}'.format(alexa_region))
        return JsonResponse({'code': 102, 'msg': 'alexa_region error'})

    now_time = int(time.time())
    if now_time > info['expiresTime']:
        # getRefreshToken requires the skill name; omitting it raised TypeError.
        res = self.getRefreshToken(info['refresh_token'], info['skill_name'])
        if not isinstance(res, dict) or 'error' in res or 'access_token' not in res:
            logger.info('get refresh_token fail')
            return JsonResponse({'code': 102, 'msg': 'get refresh_token fail'})
        alexAuth.update(
            access_token=res['access_token'],
            refresh_token=res['refresh_token'],
            expiresTime=now_time + 3000,
            updTime=now_time,
        )
        access_token = res['access_token']

    # Create or refresh the RTSP record for this UID.
    rtsp_url = tkObject(rank=1).encrypt(data=UID)
    try:
        uid_rtsp = UidRtspModel.objects.get(uid=UID)
    except UidRtspModel.DoesNotExist:
        UidRtspModel.objects.create(uid=UID, password=password,
                                    nick=uid_nick, addTime=now_time,
                                    updTime=now_time, rtsp_url=rtsp_url,
                                    region=region)
    else:
        uid_rtsp.password = password
        uid_rtsp.nick = uid_nick
        # Use the requested region, as the create branch does (was hard-coded 'EN').
        uid_rtsp.region = region
        uid_rtsp.save()

    headers = {
        "content-type": "application/json",
        "Authorization": "Bearer {access_token}".format(access_token=access_token),
    }
    payload_json = {
        "event": {
            "header": {
                "namespace": "Alexa.Discovery",
                "name": "AddOrUpdateReport",
                "payloadVersion": "3",
                "messageId": str(uuid.uuid4()).strip(),
            },
            "payload": {
                # Single-channel device: reuse the shared endpoint builder.
                "endpoints": self.append_endpoint(
                    [{'UID': UID, 'uid_nick': uid_nick}], 1),
                "scope": {
                    "type": "BearerToken",
                    # The scope carries the customer's access token (was the placeholder 'sdf').
                    "token": access_token,
                },
            },
        }
    }
    try:
        alexa_resp = requests.post(ALEXA_EVENT_API[alexa_region], json=payload_json,
                                   headers=headers, timeout=10)
    except requests.RequestException as e:
        logger.info('--------addOrUpdate_request_fail: {}'.format(repr(e)))
        return JsonResponse({'code': 104, 'msg': 'alexa request fail'})
    logger.info('--------addOrUpdate_response')
    logger.info(alexa_resp)
    return JsonResponse({'res': 'success'})
# Send an AddOrUpdateReport to the Alexa event gateway (V2 interface).
def addOrUpdateV2(self, request_dict):
    """Store RTSP records for a device and report its endpoints to Alexa.

    Args:
        request_dict: Request parameters. ``data_list`` is a JSON array with one
            entry per channel. The first entry supplies ``UID``, ``userID`` and
            the encrypted ``password``; every entry supplies ``uid_nick``, and
            entries of multi-channel devices also supply ``channel``.
            ``region`` defaults to ``'EN'``.

    Returns:
        JsonResponse: ``{'res': 'success'}`` after the report was posted,
        otherwise a ``{'code': ..., 'msg': ...}`` payload (101 bad parameters,
        102 authorization problem, 103 channel count mismatch, 500 unexpected
        error).
    """
    logger = logging.getLogger('django')
    logger.info('--------添加/更新设备信息V2--------')
    region = request_dict.get("region", 'EN')
    raw_data_list = request_dict.get("data_list", '')
    logger.info('data_list: {}'.format(raw_data_list))

    # Malformed input is a client error, not a server crash.
    try:
        data_list = json.loads(raw_data_list)  # only multi-channel devices send "channel"
    except (TypeError, ValueError):
        return JsonResponse({'code': 101, 'msg': 'Parameter error'})
    if not data_list or not isinstance(data_list, list):
        return JsonResponse({'code': 101, 'msg': 'Parameter error'})
    try:
        UID = data_list[0]['UID']
        userID = data_list[0]['userID']
        encrypted_password = data_list[0]['password']
    except (KeyError, TypeError):
        return JsonResponse({'code': 101, 'msg': 'Parameter error'})

    try:
        password = CommonService().decode_pwd(encrypted_password)

        # Load the Alexa authorization stored for this user.
        alexAuth = AlexaAuthModel.objects.filter(userID=userID).\
            values('expiresTime', 'access_token', 'refresh_token', 'alexa_region', 'skill_name')
        if not alexAuth.exists():
            logger.info('UID为 {} 的用户不存在'.format(UID))
            return JsonResponse({'code': 102, 'msg': 'not found user'})
        auth = alexAuth[0]
        access_token = auth['access_token']
        alexa_region = auth['alexa_region']
        if alexa_region not in ALEXA_EVENT_API:
            logger.info('alexa区域信息错误,alexa_region: {}'.format(alexa_region))
            return JsonResponse({'code': 102, 'msg': 'alexa_region error'})

        # Refresh the Alexa access token when it has expired.
        now_time = int(time.time())
        if now_time > auth['expiresTime']:
            res = self.getRefreshToken(auth['refresh_token'], auth['skill_name'])
            if not isinstance(res, dict) or 'error' in res or 'access_token' not in res:
                logger.info('get refresh_token fail')
                return JsonResponse({'code': 102, 'msg': 'get refresh_token fail'})
            alexAuth.update(
                updTime=now_time,
                expiresTime=now_time + 3000,
                access_token=res['access_token'],
                refresh_token=res['refresh_token'],
            )
            access_token = res['access_token']

        # Create or update the RTSP records.
        channel = len(data_list)  # number of entries == number of channels
        rtsp_url = tkObject(rank=1).encrypt(data=UID)
        uid_rtsp_qs = UidRtspModel.objects.filter(uid__contains=UID)
        if not uid_rtsp_qs.exists():
            if channel == 1:
                # Single-channel device: the record is keyed by the bare UID.
                UidRtspModel.objects.create(
                    uid=UID, nick=data_list[0]['uid_nick'], region=region,
                    rtsp_url=rtsp_url, password=password,
                    addTime=now_time, updTime=now_time)
            else:
                # Multi-channel device: one record per channel, keyed "<UID>_<channel>".
                UidRtspModel.objects.bulk_create([
                    UidRtspModel(
                        uid=UID + '_' + str(data['channel']), nick=data['uid_nick'],
                        region=region, rtsp_url=rtsp_url, password=password,
                        addTime=now_time, updTime=now_time)
                    for data in data_list
                ])
        else:
            count = len(uid_rtsp_qs)
            if count != channel:
                return JsonResponse({'code': 103, 'msg': '通道数不匹配'})
            if count == 1:
                uid_rtsp_qs.update(nick=data_list[0]['uid_nick'], region=region,
                                   password=password)
            else:
                for data in data_list:
                    uid = UID + '_' + str(data['channel'])
                    UidRtspModel.objects.filter(uid=uid).update(
                        nick=data['uid_nick'], region=region, password=password)

        headers = {
            "content-type": "application/json",
            "Authorization": "Bearer {access_token}".format(access_token=access_token),
        }
        payload_json = {
            "event": {
                "header": {
                    "namespace": "Alexa.Discovery",
                    "name": "AddOrUpdateReport",
                    "payloadVersion": "3",
                    "messageId": str(uuid.uuid4()).strip(),
                },
                "payload": {
                    "endpoints": self.append_endpoint(data_list, channel),
                    "scope": {
                        "type": "BearerToken",
                        # The scope carries the customer's access token (was the placeholder 'sdf').
                        "token": access_token,
                    },
                },
            }
        }
        alexa_resp = requests.post(ALEXA_EVENT_API[alexa_region], json=payload_json,
                                   headers=headers, timeout=10)
        logger.info('--------Alexa AddOrUpdateReport响应: {}--------'.format(alexa_resp))
        return JsonResponse({'res': 'success'})
    except Exception as e:
        # Always answer with a response object; returning None from a view is an error.
        logger.exception('--------添加/更新设备信息V2异常--------: {}'.format(repr(e)))
        return JsonResponse({'code': 500, 'msg': 'server error'})
def append_endpoint(self, data_list, channel):
    """Build the ``endpoints`` list of an Alexa AddOrUpdateReport payload.

    Args:
        data_list: Device entries. Each needs ``UID`` and ``uid_nick``; when
            ``channel`` is not 1 each also needs ``channel``.
        channel: Channel count of the device. With 1 the endpoint id is the
            bare UID, otherwise it is ``"<UID>_<channel>"``.

    Returns:
        list: One endpoint dict per entry, in input order (empty for an empty
        ``data_list``).

    Raises:
        KeyError: If an entry lacks a required key.
    """
    endpoints = []
    for data in data_list:
        if channel == 1:
            endpoint_id = data['UID']
        else:
            endpoint_id = data['UID'] + '_' + str(data['channel'])
        endpoints.append({
            "endpointId": endpoint_id,
            "manufacturerName": "zosi smart",
            "modelName": "P1425-LE",
            "friendlyName": data['uid_nick'],
            "description": "Camera connected via zosi smart",
            "displayCategories": ["CAMERA"],
            "capabilities": [
                {
                    "type": "AlexaInterface",
                    "interface": "Alexa.CameraStreamController",
                    "version": "3",
                    "cameraStreamConfigurations": [
                        {
                            "protocols": ["RTSP"],
                            "resolutions": [{"width": 1280, "height": 720}],
                            "authorizationTypes": ["NONE"],
                            "videoCodecs": ["H264"],
                            # Was "ACC", which is not a codec name Alexa accepts.
                            "audioCodecs": ["AAC"],
                        }
                    ],
                }
            ],
        })
    return endpoints
# Send a DeleteReport to the Alexa event gateway.
def delete(self, request_dict):
    """Report every stored endpoint of a device to Alexa as deleted.

    Args:
        request_dict: Request parameters; ``UID`` and ``userID`` are required.

    Returns:
        JsonResponse: ``{'res': 'success'}`` after the report was posted,
        otherwise a ``{'code': ..., 'msg': ...}`` payload (111 missing
        parameter, 102 authorization problem, 103 no RTSP record for the UID,
        500 unexpected error).
    """
    UID = request_dict.get("UID", '')
    userID = request_dict.get("userID", '')
    logger = logging.getLogger('django')
    logger.info('--------删除设备--------')
    logger.info('userID: {}, UID: {}'.format(userID, UID))
    if not all([UID, userID]):
        return JsonResponse({'code': 111, 'msg': 'fail'})
    try:
        alexAuth = AlexaAuthModel.objects.filter(userID=userID).\
            values('expiresTime', 'refresh_token', 'access_token', 'alexa_region', 'skill_name')
        if not alexAuth.exists():
            logger.info('UID为 {} 的用户不存在'.format(UID))
            return JsonResponse({'code': 102, 'msg': 'not found user'})
        auth = alexAuth[0]
        access_token = auth['access_token']
        alexa_region = auth['alexa_region']
        if alexa_region not in ALEXA_EVENT_API:
            logger.info('alexa区域信息错误,alexa_region: {}'.format(alexa_region))
            return JsonResponse({'code': 102, 'msg': 'alexa_region error'})

        now_time = int(time.time())
        if now_time > auth['expiresTime']:
            res = self.getRefreshToken(auth['refresh_token'], auth['skill_name'])
            if not isinstance(res, dict) or 'error' in res or 'access_token' not in res:
                return JsonResponse({'code': 102, 'msg': 'get refresh_token fail'})
            alexAuth.update(
                access_token=res['access_token'],
                refresh_token=res['refresh_token'],
                # NOTE(review): the add/update handlers use +3000 here; 300 may be a typo — confirm.
                expiresTime=now_time + 300,
                updTime=now_time,
            )
            access_token = res['access_token']

        # Every stored record whose uid contains UID becomes one endpoint to delete.
        uidRtsp_qs = UidRtspModel.objects.filter(uid__contains=UID).values('uid')
        if not uidRtsp_qs.exists():
            return JsonResponse({'code': 103, 'msg': '不存在uidRtsp数据'})
        endpoints = [{"endpointId": uidRtsp['uid']} for uidRtsp in uidRtsp_qs]

        headers = {
            "Authorization": "Bearer " + access_token,
            "Content-Type": "application/json;charset=UTF-8",
            "Cache-Control": "no-cache"
        }
        payload = {
            "event": {
                "header": {
                    "namespace": "Alexa.Discovery",
                    "name": "DeleteReport",
                    "messageId": str(uuid.uuid4()),
                    "payloadVersion": "3"
                },
                "payload": {
                    "endpoints": endpoints,
                    "scope": {
                        "type": "BearerToken",
                        "token": access_token
                    }
                }
            }
        }
        alexa_resp = requests.post(ALEXA_EVENT_API[alexa_region], json=payload,
                                   headers=headers, timeout=10)
        logger.info('--------Alexa DeleteReport响应: {}--------'.format(alexa_resp))
        return JsonResponse({'res': 'success'})
    except Exception as e:
        # Always answer with a response object; returning None from a view is an error.
        logger.exception('--------删除设备异常--------: {}'.format(repr(e)))
        return JsonResponse({'code': 500, 'msg': 'server error'})
def getRefreshToken(self, refresh_token, skill_name):
    """Exchange a refresh token for a new access token at the Amazon token endpoint.

    Args:
        refresh_token: Refresh token previously stored for the user.
        skill_name: Key into ``CLIENT_CONFIG`` selecting the client id/secret.

    Returns:
        dict: The decoded JSON body of the token response. On any failure
        (unknown skill, network error, non-JSON body) the dict contains an
        ``'error'`` key, which is what callers test for.
    """
    logger = logging.getLogger('django')
    logger.info('--------{}请求更新token--------'.format(skill_name))
    if skill_name not in CLIENT_CONFIG:
        logger.info('--------技能名称错误,skill_name: {}--------'.format(skill_name))
        # Callers check `'error' in res`, so report failure as a dict rather
        # than an HTTP response object.
        return {'error': 'skill_name error'}

    client = CLIENT_CONFIG[skill_name]
    payload = {
        'grant_type': 'refresh_token',
        'refresh_token': refresh_token,
        'client_id': client['client_id'],
        'client_secret': client['client_secret'],
    }
    auth_request_url = 'https://api.amazon.com/auth/o2/token'
    headers = {
        'content-type': "application/x-www-form-urlencoded",
        'cache-control': "no-cache"
    }
    try:
        # The third positional parameter of requests.post is `json`, so the
        # headers must be passed by keyword.
        res = requests.post(auth_request_url, data=payload, headers=headers, timeout=10)
        token_info = res.json()
    except (requests.RequestException, ValueError) as e:
        logger.info('--------refresh token request failed: {}--------'.format(repr(e)))
        return {'error': 'request_failed', 'error_description': repr(e)}
    if not isinstance(token_info, dict):
        return {'error': 'invalid_response'}
    return token_info
def notifiesDeviceStopPush(self, request_dict, response):
    """Tell devices whose push stream is older than ten minutes to stop pushing.

    Queries the RTSP server's pusher list, keeps the paths whose ``startAt``
    is more than 600 seconds in the past, looks up the device owning each
    path by its last path segment and calls ``runSendStop`` for it.

    Args:
        request_dict: Unused.
        response: Response helper used to report a failed pusher query.

    Returns:
        ``response.json(103, res='query fail')`` when the pusher list cannot be
        fetched or decoded, otherwise a JsonResponse
        ``{'successStopNum': <number of successful stop requests>}``.
    """
    push_url = "http://rtsp.zositech.com:10008/api/v1/pushers"
    try:
        pushers = requests.get(url=push_url, timeout=5).json()
    except Exception:
        return response.json(103, res='query fail')

    # Collect the paths of streams that have been pushing for over ten minutes.
    stale_paths = []
    for pusher in pushers['rows']:
        started_at = self.str_to_timestamp(pusher['startAt'])
        elapsed = int(time.time()) - int(started_at)
        if elapsed > 10 * 60:
            stale_paths.append(pusher['path'])

    stopped = 0
    for stream_path in stale_paths:
        stream_key = stream_path.split('/')[-1]
        owners = UidRtspModel.objects.filter(rtsp_url=stream_key).values('uid', 'password')
        if not owners.exists():
            continue
        if self.runSendStop(owners[0]['uid'], owners[0]['password'], stream_path):
            stopped += 1
    return JsonResponse({'successStopNum': stopped})
# Convert a formatted time string into a timestamp.
def str_to_timestamp(self, str_time=None, format='%Y-%m-%d %H:%M:%S'):
    """Return ``str_time`` parsed with ``format`` as integer epoch seconds.

    The parsed time is converted with ``time.mktime``, i.e. interpreted as
    local time. When ``str_time`` is empty or ``None`` the current time is
    returned instead.
    """
    if not str_time:
        return int(time.time())
    parsed = time.strptime(str_time, format)  # struct_time
    return int(time.mktime(parsed))
- # 本地时间转换为UTC 传入的本地时间戳 1531411200
- # def local_to_utc(self, local_ts, utc_format='%Y-%m-%dT%H:%MZ'):
- # import pytz
- # local_tz = pytz.timezone('Asia/Chongqing') #定义本地时区
- # local_format = "%Y-%m-%d %H:%M:%S" #定义本地时间format
- #
- # time_str = time.strftime(local_format, time.localtime(local_ts)) #首先将本地时间戳转化为时间元组,用strftime格式化成字符串
- # dt = datetime.datetime.strptime(time_str, local_format) #将字符串用strptime 转为为datetime中 datetime格式
- # local_dt = local_tz.localize(dt, is_dst=None) #给时间添加时区,等价于 dt.replace(tzinfo=pytz.timezone('Asia/Chongqing'))
- # utc_dt = local_dt.astimezone(pytz.utc) #astimezone切换时区
- # return utc_dt.strftime(utc_format) #返回世界时间格式
# Original note: calling this makes the camera push its stream to the MSG stream address.
# NOTE(review): the method name and the trailing "0" argument suggest this is the
# stop request — confirm against the pushtool usage.
def runSendStop(self, UID, PWD, MSG):
    """Send a ``pushtool`` command for a device through the HTTP command relay.

    Args:
        UID: Device UID.
        PWD: Device password.
        MSG: Stream path passed to ``pushtool``.

    Returns:
        bool: ``True`` only when the relay answers with JSON whose ``code`` is
        200; ``False`` on any other answer, on timeout or on a malformed reply.
    """
    from urllib.parse import quote

    command = "./pushtool {UID} {PWD} {MSG} 0".format(UID=UID, PWD=PWD, MSG=MSG)
    # NOTE(security): the relay receives a command line assembled from stored
    # values; it must validate the arguments on its side.
    # Percent-encode the whole command so characters such as '&', '#' or '+'
    # in a password or path cannot truncate or alter the query string.
    command_url = "http://47.115.134.251/index.php?command={command}".format(
        command=quote(command, safe=''))
    try:
        exec_res = requests.get(url=command_url, timeout=1)
        return exec_res.json()['code'] == 200
    except (requests.RequestException, ValueError, KeyError, TypeError):
        return False
- #请求alexa事件网关接口失败错误码
- '''
- 400 Bad Request INVALID_REQUEST_EXCEPTION 消息无效,可能是因为缺少字段、不正确的值或格式错误的 JSON。根据文档检查邮件以验证邮件是否包含所有必需的字段。
- 401 Unauthorized INVALID_ACCESS_TOKEN_EXCEPTION 访问令牌无效,因为它已过期或格式错误。刷新令牌并重试请求。如果用户禁用您的技能,这也会使访问令牌失效。这意味着用户已吊销授权,您可以停止为他们发送更改报告。
- 403 Forbidden SKILL_NEVER_ENABLED_EXCEPTION 请确保将事件发送到正确的区域终结点。例如,北美中的事件应发送到北美终结点。
- 403 Forbidden INSUFFICIENT_PERMISSION_EXCEPTION 令牌没有所需的权限。确保该技能具有发送 Alexa 事件的权限。请参阅异步消息身份验证的步骤。
- 404 Not Found SKILL_NOT_FOUND_EXCEPTION 找不到与此令牌关联的技能 ID。当技能处于不同阶段(如认证)时生成用户的访问令牌时,将发生此错误。尝试禁用并重新启用此用户的技能。
- 413 Payload Too Large REQUEST_ENTITY_TOO_LARGE_EXCEPTION 事件有效负载的大小太大。请求中允许的最大终结点数为 300。以较小的有效负载发送邮件。
- 429 Too Many Requests THROTTLING_EXCEPTION 请求数过高。重新发送消息最多三次,每次发送尝试之间至少有一秒的间隔。
- 500 Internal Server Error INTERNAL_SERVICE_EXCEPTION Alexa 发生错误,无法处理该消息。重新发送消息最多三次,每次发送尝试之间至少有一秒的间隔。如果问题仍然存在,请联系亚马逊支持。
- 503 Service Unavailable SERVICE_UNAVAILABLE_EXCEPTION 亚历克萨不能接受这条信息。重新发送消息最多三次,每次尝试之间至少有一秒的间隔。如果问题仍然存在,请联系亚马逊支持。
- '''
|