# @Author : Rocky # @File : AppToApp.py # @Time : 2023/12/28 11:13 import hashlib import logging import time import uuid from django.http import JsonResponse from django.views import View from model.models import UserModel, AlexaAuthModel from object.ResObject import ResObject from service.CommonService import CommonService class Oa2View(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, operation) def validation(self, request_dict, operation): response = ResObject() if operation == 'getAuthCodeAndToken': # 获取用户验证码和令牌 return self.get_auth_code_and_token(request_dict, response) elif operation == 'getTokenWithAuthCode': # 根据用户验证码获取访问令牌 return self.get_token_with_auth_code(request_dict, response) elif operation == 'updateAmazonToken': # 更新亚马逊令牌 return self.update_amazon_token(request_dict) else: return self.print_url(operation, request_dict, response) @staticmethod def get_auth_code_and_token(request_dict, response): user_id = request_dict.get('user_id', None) region_code = request_dict.get('region_code', None) if not all([user_id, region_code]): return response.json(10, 'error params') try: now_time = int(time.time()) user_authorization_code = hashlib.md5((str(uuid.uuid1()) + str(now_time)).encode('utf-8')).hexdigest() access_token = CommonService.encrypt_data(randomlength=32) refresh_token = CommonService.encrypt_data(randomlength=32) user_qs = UserModel.objects.filter(userID=user_id) # 用户不存在则创建 if not user_qs.exists(): code = CommonService.encrypt_data(32) UserModel.objects.create(userID=user_id, access_token=access_token, refresh_token=refresh_token, code=code, user_authorization_code=user_authorization_code, region_code=region_code, addTime=now_time, updTime=now_time) else: user_qs.update(access_token=access_token, refresh_token=refresh_token, region_code=region_code, user_authorization_code=user_authorization_code, updTime=now_time) res = { 'user_authorization_code': user_authorization_code, 'access_token': access_token, 'refresh_token': refresh_token } return response.json(0, res) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def get_token_with_auth_code(request_dict, response): logger = logging.getLogger('django') logger.info('根据用户验证码获取访问令牌参数{}'.format(request_dict)) user_authorization_code = request_dict.get('code', None) if not user_authorization_code: return response.json(10, 'error params') user_qs = UserModel.objects.filter(user_authorization_code=user_authorization_code).values('userID', 'access_token', 'refresh_token') if not user_qs.exists(): return response.json(10, 'user not exists') access_token = user_qs[0]['access_token'] refresh_token = user_qs[0]['refresh_token'] res_json = { "access_token": access_token, "token_type": "bearer", "expires_in": 3600, "refresh_token": refresh_token, } return JsonResponse(res_json) @staticmethod def update_amazon_token(request_dict): user_id = request_dict.get('user_id') access_token = request_dict.get('access_token') refresh_token = request_dict.get('refresh_token') if not all([user_id, access_token, refresh_token]): return JsonResponse({}, status=500) try: now_time = int(time.time()) AlexaAuthModel.objects.filter(userID=user_id).update( access_token=access_token, refresh_token=refresh_token, expiresTime=now_time + 3000, updTime=now_time) return JsonResponse({}) except Exception as e: return JsonResponse({}, status=500) @staticmethod def print_url(operation, request_dict, response): logger = logging.getLogger('django') logger.info('打印请求:{} {}'.format(operation, request_dict)) return JsonResponse({})