#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ """ import json import time, datetime import requests import uuid import logging from django.views.generic import TemplateView from django.shortcuts import render_to_response from django.http import JsonResponse import http.client from urllib.parse import urlencode from object.ResObject import ResObject import subprocess # from gevent.pool import Pool from model.models import UserModel,UidRtspModel from object.ResponseObject import ResponseObject from object.tkObject import tkObject from service.CommonService import CommonService from model.models import AlexaAuthModel from object.RedisObject import RedisObject from django.utils.decorators import method_decorator from django.views.decorators.csrf import csrf_exempt from azoauth.config import * rtspServer = "rtsp.zositech.com,3.16.66.144" class deviceStatus(TemplateView): @method_decorator(csrf_exempt) def dispatch(self, *args, **kwargs): return super(deviceStatus, self).dispatch(*args, **kwargs) def get(self, request, *args, **kwargs): logger = logging.getLogger('django') logger.info('get----------------------------------------') request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): logger = logging.getLogger('django') request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() if operation is None: return JsonResponse({'code':404,'msg':'not found'}) if operation == 'saveAccessToken': return self.saveAccessToken(request_dict, response) if operation == 'getAccessToken': return self.getAccessToken(request_dict, response) if operation == 'addOrUpdate': return self.addOrUpdate(request_dict, response) if operation == 'delete': return self.delete(request_dict, response) if operation == 'stopPush': return self.notifiesDeviceStopPush(request_dict, response) def saveAccessToken(self,request_dict, response): access_token = request_dict.get("access_token", '') refresh_token = request_dict.get("refresh_token", '') token = request_dict.get("token", '') alexa_region = request_dict.get("region", '') logger = logging.getLogger('django') logger.info('login-------------begin---token') logger.info(access_token) logger.info(refresh_token) alexAuth = AlexaAuthModel.objects.filter(token=token) nowTime = int(time.time()) if not alexAuth.exists(): AlexaAuthModel.objects.create( access_token = access_token, refresh_token = refresh_token, token = token, expiresTime = nowTime + 3200, addTime=nowTime, updTime=nowTime, alexa_region=alexa_region, ) else: alexAuth.update( access_token = access_token, refresh_token = refresh_token, expiresTime = nowTime + 3200, token = token, updTime=nowTime, alexa_region=alexa_region, ) return JsonResponse({'code':200,'msg':'success'}) def getAccessToken(self,request_dict, response): logger = logging.getLogger('django') logger.info('getAccessToken') logger.info(request_dict) return JsonResponse({'code':200,'msg':'success'}) #向alex事件网关发送更新设备操作 def addOrUpdate(self,request_dict, response): logger = logging.getLogger('django') logger.info('in__________________________first') UID = request_dict.get("UID", '') userID = request_dict.get("userID", '') uid_nick = request_dict.get("uid_nick", '') encrypt_pwd = request_dict.get("password", '') region = request_dict.get("region", 'EN') if UID == '' or userID == '' or uid_nick == '' or encrypt_pwd == '': return JsonResponse({'code':101,'msg':'fail'}) commonService = CommonService() password = commonService.decode_pwd(encrypt_pwd) alexAuth = AlexaAuthModel.objects.filter(userID=userID) if not alexAuth.exists(): logger.info('not found user') logger.info(UID) return JsonResponse({'code':102,'msg':'not found user'}) info = alexAuth.values() expiresTime = info[0]['expiresTime'] now_time = int(time.time()) access_token = info[0]['access_token'] refresh_token = info[0]['refresh_token'] alexa_region = info[0]['alexa_region'] if now_time > expiresTime: logger.info(refresh_token) res = self.getRefreshToken(refresh_token) logger.info(res) if('error' not in res): alexAuth.update( access_token = res['access_token'], refresh_token = res['refresh_token'], expiresTime = now_time + 3000, updTime = now_time, ) access_token = res['access_token'] else: logger.info('get refresh_token fail') return JsonResponse({'code':102,'msg':'get refresh_token fail'}) #添加rtsp记录 rtko = tkObject(rank=1) rtsp_url = rtko.encrypt(data=UID) try: uid_rtsp_qs = UidRtspModel.objects.get(uid=UID) except UidRtspModel.DoesNotExist: uid_rtsp_qs = UidRtspModel.objects.create(uid=UID, password=password, nick=uid_nick, addTime=now_time, updTime=now_time, rtsp_url=rtsp_url, region=region) else: # if uid_rtsp_qs.password != uid_a['password']: uid_rtsp_qs.password = password uid_rtsp_qs.nick = uid_nick uid_rtsp_qs.region = 'EN' uid_rtsp_qs.save() api_uri = ALEXA_EVENT_API[alexa_region] messageId = str(uuid.uuid4()).strip() bearer_access_token = "Bearer {access_token}".format(access_token=access_token) headers = {"content-type": "application/json", "Authorization": bearer_access_token} payload_json = { "event": { "header": { "namespace": "Alexa.Discovery", "name": "AddOrUpdateReport", "payloadVersion": "3", "messageId": messageId, }, "payload": { "endpoints": [ { "endpointId": UID, "manufacturerName": "zosi smart", "modelName": "P1425-LE", "friendlyName": uid_nick, "description": "Camera connected via zosi smart", "displayCategories": ["CAMERA"], "capabilities": [ { "type": "AlexaInterface", "interface": "Alexa.CameraStreamController", "version": "3", "cameraStreamConfigurations": [ { "protocols": ["RTSP"], "resolutions": [{"width": 1280, "height": 720}], "authorizationTypes": ["NONE"], "videoCodecs": ["H264"], "audioCodecs": ["ACC"], } ], } ], } ], "scope": { "type": "BearerToken", "token": 'sdf', }, }, } } response = requests.post(api_uri, json=payload_json, headers=headers) logger.info('--------addOrUpdate_response') logger.info(response) return JsonResponse({'res':'success'}) # return json.dumps('test',indent=4,sort_keys=True,ensure_ascii=False) #格式化返回内容 #向alex事件网关发送删除设备操作 def delete(self,request_dict, response): UID = request_dict.get("UID", '') userID = request_dict.get("userID", '') logger = logging.getLogger('django') logger.info('class:deviceStatus-------function:delete------------------') logger.info(UID) logger.info(userID) if UID == '': return JsonResponse({'code':111,'msg':'fail'}) alexAuth = AlexaAuthModel.objects.filter(userID=userID) if not alexAuth.exists(): return JsonResponse({'code':102,'msg':'not found user'}) info = alexAuth.values() expiresTime = info[0]['expiresTime'] refresh_token = info[0]['refresh_token'] now_time = int(time.time()) access_token = info[0]['access_token'] alexa_region = info[0]['alexa_region'] if now_time > expiresTime: res = self.getRefreshToken(refresh_token) if('error' not in res): alexAuth.update( access_token = res['access_token'], refresh_token = res['refresh_token'], expiresTime = now_time + 300, updTime = now_time, ) access_token = res['access_token'] else: return JsonResponse({'code':102,'msg':'get refresh_token fail'}) messageId = str(uuid.uuid4()) headers = { "Authorization": "Bearer " + access_token, "Content-Type": "application/json;charset=UTF-8", "Cache-Control": "no-cache" } payload = { "event": { "header": { "namespace": "Alexa.Discovery", "name": "DeleteReport", "messageId": messageId, "payloadVersion": "3" }, "payload": { "endpoints": [ { "endpointId": UID } ], "scope": { "type": "BearerToken", "token": access_token } } } } # return JsonResponse({"res": headers}) api_uri = ALEXA_EVENT_API[alexa_region] response = requests.post(api_uri, json=payload, headers=headers) logger.info('--------delete_response') logger.info(response) return JsonResponse({'res':'success'}) def getRefreshToken(self,refresh_token): payload = { 'grant_type': 'refresh_token', 'refresh_token': refresh_token, 'client_id': 'amzn1.application-oa2-client.efb07b51dd444f848b6f0598635da3cc', 'client_secret': '8a49390ebe362bfee153be87587f5673d0c1d8aeb6bc1ef736fda6c9d5d81c8f', } auth_request_url = 'https://api.amazon.com/auth/o2/token' headers = { 'content-type': "application/x-www-form-urlencoded", 'cache-control': "no-cache" } res = requests.post(auth_request_url, payload,headers) request_json = res.json() return request_json def notifiesDeviceStopPush(self,request_dict, response): play_url = "http://rtsp.zositech.com:10008/api/v1/players" push_url = "http://rtsp.zositech.com:10008/api/v1/pushers" try: # queryPlay = requests.get(url=play_url, timeout=5) # qplay = queryPlay.json() queryPush = requests.get(url=push_url, timeout=5) qpush = queryPush.json() except Exception as e: return response.json(103, res='query fail') else: # response.json(103, res=list(qplay)) #正在拉流的ID # play_rows = qplay['rows'] # play_paths = [] # for val in play_rows: # play_paths.append(val['path']) #正在推流的ID push_rows = qpush['rows'] push_paths = [] for val in push_rows: startAt = self.str_to_timestamp(val['startAt']) run_time = int(time.time()) - int(startAt) if run_time > 10*60: push_paths.append(val['path']) send_flag = False #正在推流但是没有拉流的推流通知设备断掉 # for push_path in push_paths: # if push_path not in play_paths: # path_list = push_path.split('/') # has_rtsp_url = UidRtspModel.objects.filter(rtsp_url=path_list[-1]).values('uid','password') # # has_rtsp_url = UidRtspModel.objects.filter(rtsp_url='RFJrWk9OVVJPUVRnM1IxaFNXazFFUXpFeE1VRT1U').values('uid') # if has_rtsp_url.exists(): # send_flag = self.runSendStop(has_rtsp_url[0]['uid'], has_rtsp_url[0]['password'], push_path) # if send_flag: # return JsonResponse({'res':'success'}) # else: # return JsonResponse({'res':'fail'}) success = 0 for push_path in push_paths: path_list = push_path.split('/') has_rtsp_url = UidRtspModel.objects.filter(rtsp_url=path_list[-1]).values('uid','password') # has_rtsp_url = UidRtspModel.objects.filter(rtsp_url='RFJrWk9OVVJPUVRnM1IxaFNXazFFUXpFeE1VRT1U').values('uid') if has_rtsp_url.exists(): send_flag = self.runSendStop(has_rtsp_url[0]['uid'], has_rtsp_url[0]['password'], push_path) if send_flag: success += 1 return JsonResponse({'successStopNum': success}) #把格式化时间转换成时间戳 def str_to_timestamp(self, str_time=None, format='%Y-%m-%d %H:%M:%S'): if str_time: time_tuple = time.strptime(str_time, format) # 把格式化好的时间转换成元祖 result = time.mktime(time_tuple) # 把时间元祖转换成时间戳 return int(result) return int(time.time()) # 本地时间转换为UTC 传入的本地时间戳 1531411200 # def local_to_utc(self, local_ts, utc_format='%Y-%m-%dT%H:%MZ'): # import pytz # local_tz = pytz.timezone('Asia/Chongqing') #定义本地时区 # local_format = "%Y-%m-%d %H:%M:%S" #定义本地时间format # # time_str = time.strftime(local_format, time.localtime(local_ts)) #首先将本地时间戳转化为时间元组,用strftime格式化成字符串 # dt = datetime.datetime.strptime(time_str, local_format) #将字符串用strptime 转为为datetime中 datetime格式 # local_dt = local_tz.localize(dt, is_dst=None) #给时间添加时区,等价于 dt.replace(tzinfo=pytz.timezone('Asia/Chongqing')) # utc_dt = local_dt.astimezone(pytz.utc) #astimezone切换时区 # return utc_dt.strftime(utc_format) #返回世界时间格式 #触发此方法,让摄像头推流到MSG流地址 def runSendStop(self, UID, PWD, MSG): command = "./pushtool {UID} {PWD} {MSG} 0".format(UID=UID, PWD=PWD, MSG=MSG) # print('command=>{command}'.format(command=command)) command_url = "http://47.115.134.251/index.php?command={command}".format(command=command) try: exec_res = requests.get(url=command_url, timeout=1) res = exec_res.json() if res['code'] == 200: return True except Exception as e: return False #请求alexa事件网关接口失败错误码 ''' 400 Bad Request INVALID_REQUEST_EXCEPTION 消息无效,可能是因为缺少字段、不正确的值或格式错误的 JSON。根据文档检查邮件以验证邮件是否包含所有必需的字段。 401 Unauthorized INVALID_ACCESS_TOKEN_EXCEPTION 访问令牌无效,因为它已过期或格式错误。刷新令牌并重试请求。如果用户禁用您的技能,这也会使访问令牌失效。这意味着用户已吊销授权,您可以停止为他们发送更改报告。 403 Forbidden SKILL_NEVER_ENABLED_EXCEPTION 请确保将事件发送到正确的区域终结点。例如,北美中的事件应发送到北美终结点。 403 Forbidden INSUFFICIENT_PERMISSION_EXCEPTION 令牌没有所需的权限。确保该技能具有发送 Alexa 事件的权限。请参阅异步消息身份验证的步骤。 404 Not Found SKILL_NOT_FOUND_EXCEPTION 找不到与此令牌关联的技能 ID。当技能处于不同阶段(如认证)时生成用户的访问令牌时,将发生此错误。尝试禁用并重新启用此用户的技能。 413 Payload Too Large REQUEST_ENTITY_TOO_LARGE_EXCEPTION 事件有效负载的大小太大。请求中允许的最大终结点数为 300。以较小的有效负载发送邮件。 429 Too Many Requests THROTTLING_EXCEPTION 请求数过高。重新发送消息最多三次,每次发送尝试之间至少有一秒的间隔。 500 Internal Server Error INTERNAL_SERVICE_EXCEPTION Alexa 发生错误,无法处理该消息。重新发送消息最多三次,每次发送尝试之间至少有一秒的间隔。如果问题仍然存在,请联系亚马逊支持。 503 Service Unavailable SERVICE_UNAVAILABLE_EXCEPTION 亚历克萨不能接受这条信息。重新发送消息最多三次,每次尝试之间至少有一秒的间隔。如果问题仍然存在,请联系亚马逊支持。 '''