#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Django view that stores Alexa OAuth tokens and talks to the Alexa event API.

The ``deviceStatus`` view receives an ``operation`` keyword argument from the
URL configuration and dispatches on it. Only ``saveAccessToken`` is currently
routed; every other operation is answered with a JSON "not found" payload.
"""
import json
import time
import requests
import uuid
import logging
from django.views.generic import TemplateView
from django.shortcuts import render_to_response
from django.http import JsonResponse
import http.client
from urllib.parse import urlencode
from object.ResObject import ResObject
import subprocess
# from gevent.pool import Pool
from model.models import UserModel,UidRtspModel
from object.ResponseObject import ResponseObject
from object.tkObject import tkObject
from service.CommonService import CommonService
from model.models import AlexaAuthModel
from object.RedisObject import RedisObject
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt

rtspServer = "rtsp.zositech.org,3.16.66.144"


class deviceStatus(TemplateView):
    """CSRF-exempt view that dispatches GET/POST requests by ``operation``."""

    @method_decorator(csrf_exempt)
    def dispatch(self, *args, **kwargs):
        """Delegate to the parent dispatcher with CSRF checks disabled."""
        return super().dispatch(*args, **kwargs)

    def get(self, request, *args, **kwargs):
        """Handle GET: read parameters from the query string."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST: read parameters from the form body."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Route the request to the handler named by ``operation``.

        Args:
            request_dict: ``request.GET`` or ``request.POST``.
            request: The incoming request (currently unused here).
            operation: Operation name taken from the URL kwargs, or ``None``.

        Returns:
            The handler's ``JsonResponse``, or a ``{'code': 404}`` JSON
            payload when the operation is missing, disabled or unknown.
        """
        response = ResponseObject()
        if operation == 'saveAccessToken':
            return self.saveAccessToken(request_dict, response)
        # 'test' is intentionally NOT routed to self.test(): the original code
        # answered it with "not found" before ever reaching the call. Missing
        # and unrecognised operations get the same answer instead of falling
        # through and returning None, which Django rejects as a view result.
        return JsonResponse({'code': 404, 'msg': 'not found'})

    def saveAccessToken(self, request_dict, response):
        """Create or update the ``AlexaAuthModel`` row matching ``token``.

        Reads ``access_token``, ``refresh_token`` and ``token`` from
        ``request_dict`` (each defaults to an empty string). If no row with
        that ``token`` exists one is created with ``addTime`` and ``updTime``
        set to the current Unix time; otherwise the matching rows are updated
        and only ``updTime`` is refreshed.

        Args:
            request_dict: Mapping holding the request parameters.
            response: Unused; kept for signature compatibility.

        Returns:
            ``JsonResponse`` with ``{'code': 200, 'msg': 'success'}``.
        """
        access_token = request_dict.get("access_token", '')
        refresh_token = request_dict.get("refresh_token", '')
        token = request_dict.get("token", '')

        logger = logging.getLogger('django')
        logger.info('login-------------begin---token')
        # NOTE(review): the two lines below write credentials to the log in
        # clear text -- consider masking or removing them.
        logger.info(access_token)
        logger.info(refresh_token)

        alexAuth = AlexaAuthModel.objects.filter(token=token)
        nowTime = int(time.time())
        if not alexAuth.exists():
            AlexaAuthModel.objects.create(
                access_token=access_token,
                refresh_token=refresh_token,
                token=token,
                addTime=nowTime,
                updTime=nowTime,
            )
        else:
            alexAuth.update(
                access_token=access_token,
                refresh_token=refresh_token,
                token=token,
                updTime=nowTime,
            )
        return JsonResponse({'code': 200, 'msg': 'success'})

    def discover(self, request_dict, response):
        """Unfinished stub: assigns a fixed user id and returns ``None``.

        Not routed by ``validation``.
        """
        userID = 158943604783713800138000

    def test(self, request_dict, response):
        """Send a sample ``AddOrUpdateReport`` event to the Alexa event gateway.

        Not routed by ``validation``. Builds a discovery report for a single
        hard-coded camera endpoint and POSTs it as JSON.

        Args:
            request_dict: Unused.
            response: Unused.

        Returns:
            ``JsonResponse`` wrapping the decoded JSON reply under ``'res'``.
        """
        api_uri = 'https://api.amazonalexa.com/v3/events'
        messageId = str(uuid.uuid4())
        # NOTE(review): hard-coded bearer token committed to source; it should
        # come from AlexaAuthModel or configuration rather than live here.
        access_token = 'Atza|IwEBIGZVV6GPFAXf1_ySVAZkOfGc2j0rNhAnGW8dQ71KVnBY9XmKoi5xd1742t-64FLiM27ErgW_D9sqfAd2A69F1o63_HyPeTfP1_CpgYj6ohGGoMTAN9id58PBOyXtqoJr698luykXb8cT30LBxTqU8zYcmxwJfMzyr2Mxz6KkNtusSB4mHBEkfmQ-xkZ_87G0sUscbVokLnkd676kQAuPzmuYzY_nLLbkR28x8ohh0F9lGZYyEJjyg8-VXdoqWBMfEYxgCnQo76FOXXDFyVEPg6ZC'
        bearer_access_token = 'Bearer {access_token}'.format(access_token=access_token)
        headers = {
            'content-type': "application/json",
            'Authorization': bearer_access_token,
        }
        payload = {
            "event": {
                "header": {
                    "namespace": "Alexa.Discovery",
                    "name": "AddOrUpdateReport",
                    "payloadVersion": "3",
                    "messageId": messageId
                },
                "payload": {
                    "endpoints": [
                        {
                            "endpointId": "HVTLKFJM6KDTAF9J111A",
                            "manufacturerName": "Sample Manufacturer",
                            "description": "Smart Light by Sample Manufacturer",
                            "friendlyName": "Kitchen Light",
                            "displayCategories": [
                                "CAMERA"
                            ],
                            "capabilities": [
                                {
                                    "type": "AlexaInterface",
                                    "interface": "Alexa.CameraStreamController",
                                    "version": "3",
                                    "cameraStreamConfigurations": [
                                        {
                                            "protocols": ["RTSP"],
                                            "resolutions": [{"width": 1280, "height": 720}],
                                            "authorizationTypes": ["NONE"],
                                            "videoCodecs": ["H264"],
                                            # Was "ACC", a typo for the AAC codec.
                                            "audioCodecs": ["AAC"]
                                        }
                                    ]
                                },
                                {
                                    "type": "AlexaInterface",
                                    "interface": "Alexa.MediaMetadata",
                                    "version": "3",
                                    "proactivelyReported": True,
                                    # "retrievable": True
                                }
                            ]
                        }
                    ],
                    "scope": {
                        "type": "BearerToken",
                        "token": access_token
                    }
                }
            }
        }
        # json= serialises the nested dict as a JSON body; data= would
        # form-encode it, contradicting the declared JSON content type.
        api_response = requests.post(api_uri, json=payload, headers=headers)
        response_json = api_response.json()
        return JsonResponse({'res': response_json})

    def post_to_api(self, payload):
        """POST ``payload`` form-encoded to the Amazon OAuth2 token endpoint.

        Args:
            payload: Mapping (or sequence of pairs) accepted by ``urlencode``.

        Returns:
            ``JsonResponse`` wrapping the decoded JSON reply under ``'res'``.
        """
        headers = {
            'content-type': "application/x-www-form-urlencoded",
            'cache-control': "no-cache"
        }
        api_response = requests.post(
            'https://api.amazon.com/auth/o2/token',
            urlencode(payload),
            headers=headers,
            allow_redirects=True,
        )
        response_json = api_response.json()
        return JsonResponse({'res': response_json})