# @Author : Rocky
# @File : AppToApp.py
# @Time : 2023/12/28 11:13
import hashlib
import logging
import secrets
import time
import uuid

from django.http import JsonResponse
from django.views import View

from model.models import UserModel, AlexaAuthModel
from object.ResObject import ResObject
from service.CommonService import CommonService
class Oa2View(View):
    """Token endpoints dispatched by an ``operation`` URL keyword argument.

    ``get`` and ``post`` read ``operation`` from the URL kwargs and hand the
    request parameters to ``validation``, which routes to one of the static
    handlers below. Unknown operations are only logged.
    """

    # Value reported as ``expires_in`` when an authorization code is exchanged.
    ACCESS_TOKEN_EXPIRES_IN = 3600
    # Seconds added to the current time when storing ``expiresTime`` for an
    # Amazon token.
    # NOTE(review): presumably kept below 3600 so the token is treated as
    # expired slightly early -- confirm against the consumer of expiresTime.
    AMAZON_TOKEN_TTL = 3000

    def get(self, request, *args, **kwargs):
        """Handle GET by dispatching on ``operation`` with the query parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST by dispatching on ``operation`` with the form parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, operation)

    def validation(self, request_dict, operation):
        """Route ``operation`` to its handler and return the handler's response.

        :param request_dict: request parameters (``request.GET`` or ``request.POST``).
        :param operation: operation name taken from the URL kwargs.
        :return: whatever the selected handler returns; an empty ``JsonResponse``
            for an unrecognised operation.
        """
        response = ResObject()
        if operation == 'getAuthCodeAndToken':
            # Issue a user authorization code together with fresh tokens.
            return self.get_auth_code_and_token(request_dict, response)
        if operation == 'getTokenWithAuthCode':
            # Exchange a user authorization code for the stored tokens.
            return self.get_token_with_auth_code(request_dict, response)
        if operation == 'updateAmazonToken':
            # Store updated Amazon tokens for a user.
            return self.update_amazon_token(request_dict)
        return self.print_url(operation, request_dict, response)

    @staticmethod
    def get_auth_code_and_token(request_dict, response):
        """Generate an authorization code and token pair and persist them.

        Reads ``user_id`` and ``region_code`` from ``request_dict``. A
        ``UserModel`` row is created when none exists for ``user_id``;
        otherwise the existing row(s) are updated with the new values.

        :param request_dict: request parameters.
        :param response: ``ResObject`` used to build the reply.
        :return: ``response.json(0, {...})`` carrying the authorization code
            and both tokens, ``response.json(10, 'error params')`` when a
            parameter is missing, or ``response.json(500, ...)`` on failure.
        """
        user_id = request_dict.get('user_id', None)
        region_code = request_dict.get('region_code', None)
        if not all([user_id, region_code]):
            return response.json(10, 'error params')
        try:
            now_time = int(time.time())
            # The authorization code is a bearer credential, so it must be
            # unguessable. token_hex(16) keeps the previous 32-hex-character
            # shape but draws from the OS CSPRNG instead of hashing a
            # time/MAC-based uuid1 with the current timestamp.
            user_authorization_code = secrets.token_hex(16)
            access_token = CommonService.encrypt_data(randomlength=32)
            refresh_token = CommonService.encrypt_data(randomlength=32)
            user_qs = UserModel.objects.filter(userID=user_id)
            if not user_qs.exists():
                # Create the user when it does not exist yet.
                code = CommonService.encrypt_data(32)
                UserModel.objects.create(
                    userID=user_id,
                    access_token=access_token,
                    refresh_token=refresh_token,
                    code=code,
                    user_authorization_code=user_authorization_code,
                    region_code=region_code,
                    addTime=now_time,
                    updTime=now_time,
                )
            else:
                user_qs.update(
                    access_token=access_token,
                    refresh_token=refresh_token,
                    region_code=region_code,
                    user_authorization_code=user_authorization_code,
                    updTime=now_time,
                )
            res = {
                'user_authorization_code': user_authorization_code,
                'access_token': access_token,
                'refresh_token': refresh_token,
            }
            return response.json(0, res)
        except Exception as e:
            # Record the full traceback server-side; the response body is kept
            # unchanged for existing clients.
            # NOTE(review): the reply echoes repr(e) and a line number to the
            # caller -- consider returning a generic message instead.
            logging.getLogger('django').exception('get_auth_code_and_token failed')
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def get_token_with_auth_code(request_dict, response):
        """Return the tokens stored for the authorization code in ``code``.

        :param request_dict: request parameters; ``code`` is the user
            authorization code.
        :param response: ``ResObject`` used for error replies.
        :return: a ``JsonResponse`` with ``access_token``, ``token_type``,
            ``expires_in`` and ``refresh_token``; ``response.json(10, ...)``
            when ``code`` is missing or matches no user.
        """
        logger = logging.getLogger('django')
        logger.info('根据用户验证码获取访问令牌参数{}'.format(request_dict))
        user_authorization_code = request_dict.get('code', None)
        if not user_authorization_code:
            return response.json(10, 'error params')
        # Fetch the row once so both tokens are guaranteed to come from the
        # same record (and one query replaces exists() plus two index lookups).
        user = (
            UserModel.objects
            .filter(user_authorization_code=user_authorization_code)
            .values('access_token', 'refresh_token')
            .first()
        )
        if user is None:
            return response.json(10, 'user not exists')
        # NOTE(review): the authorization code is not cleared after use, so it
        # can be exchanged repeatedly -- confirm whether that is intended.
        res_json = {
            "access_token": user['access_token'],
            "token_type": "bearer",
            "expires_in": Oa2View.ACCESS_TOKEN_EXPIRES_IN,
            "refresh_token": user['refresh_token'],
        }
        return JsonResponse(res_json)

    @staticmethod
    def update_amazon_token(request_dict):
        """Store new Amazon tokens on the ``AlexaAuthModel`` rows of a user.

        :param request_dict: request parameters; requires ``user_id``,
            ``access_token`` and ``refresh_token``.
        :return: an empty ``JsonResponse`` on success; an empty
            ``JsonResponse`` with status 500 when a parameter is missing or
            the update raises.
        """
        user_id = request_dict.get('user_id')
        access_token = request_dict.get('access_token')
        refresh_token = request_dict.get('refresh_token')
        if not all([user_id, access_token, refresh_token]):
            return JsonResponse({}, status=500)
        try:
            now_time = int(time.time())
            AlexaAuthModel.objects.filter(userID=user_id).update(
                access_token=access_token,
                refresh_token=refresh_token,
                expiresTime=now_time + Oa2View.AMAZON_TOKEN_TTL,
                updTime=now_time,
            )
            return JsonResponse({})
        except Exception:
            # Keep the 500 reply, but leave a traceback in the log instead of
            # discarding the error silently.
            logging.getLogger('django').exception('update_amazon_token failed')
            return JsonResponse({}, status=500)

    @staticmethod
    def print_url(operation, request_dict, response):
        """Log an unrecognised operation with its parameters.

        :param operation: operation name from the URL.
        :param request_dict: request parameters.
        :param response: unused; accepted for signature parity with the other
            handlers.
        :return: an empty ``JsonResponse``.
        """
        logger = logging.getLogger('django')
        logger.info('打印请求:{} {}'.format(operation, request_dict))
        return JsonResponse({})
|