# @Author : Rocky
# @File : AppToApp.py
# @Time : 2023/12/28 11:13
import hashlib
import logging
import secrets
import time
import uuid

from django.http import JsonResponse
from django.views import View

from model.models import UserModel
from object.ResObject import ResObject
from service.CommonService import CommonService
class Oa2View(View):
    """Token endpoints selected by the ``operation`` URL keyword argument.

    Supported operations:

    * ``getAuthCodeAndToken``  -- issue a new authorization code, access
      token and refresh token for a user (creating the user row if needed).
    * ``getTokenWithAuthCode`` -- look up the stored tokens by authorization
      code and return them in a bearer-token JSON payload.

    Any other operation is logged and answered with an empty JSON object.
    """

    def get(self, request, *args, **kwargs):
        """Dispatch a GET request using the query-string parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, operation)

    def post(self, request, *args, **kwargs):
        """Dispatch a POST request using the form-encoded body parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, operation)

    def validation(self, request_dict, operation):
        """Route ``operation`` to its handler.

        :param request_dict: request parameters (``request.GET`` or ``request.POST``).
        :param operation: operation name taken from the URL kwargs; may be ``None``.
        :return: whatever the selected handler returns.
        """
        response = ResObject()
        if operation == 'getAuthCodeAndToken':  # issue authorization code and tokens
            return self.get_auth_code_and_token(request_dict, response)
        elif operation == 'getTokenWithAuthCode':  # exchange authorization code for tokens
            return self.get_token_with_auth_code(request_dict, response)
        else:
            return self.print_url(operation, request_dict, response)

    @staticmethod
    def get_auth_code_and_token(request_dict, response):
        """Issue a fresh authorization code and token pair for a user.

        Reads ``user_id`` and ``region_code`` from ``request_dict``. An
        existing ``UserModel`` row for that ``userID`` has its tokens,
        region and authorization code overwritten; otherwise a new row is
        created.

        :param request_dict: request parameters.
        :param response: ``ResObject`` used to build the reply.
        :return: ``response.json(0, {...})`` carrying the new
            ``user_authorization_code``, ``access_token`` and
            ``refresh_token``; ``response.json(10, 'error params')`` when a
            parameter is missing or empty; ``response.json(500, ...)`` when
            any exception is raised while issuing or storing the tokens.
        """
        user_id = request_dict.get('user_id', None)
        region_code = request_dict.get('region_code', None)
        if not (user_id and region_code):
            return response.json(10, 'error params')
        try:
            now_time = int(time.time())
            # 32 hex characters from the OS CSPRNG. The previous
            # md5(uuid1 + timestamp) value was built from guessable inputs
            # (host address and clock), which is unsuitable for a credential.
            user_authorization_code = secrets.token_hex(16)
            access_token = CommonService.encrypt_data(randomlength=32)
            refresh_token = CommonService.encrypt_data(randomlength=32)

            # Update first and use the matched-row count to decide whether the
            # user exists: one query fewer than exists() + update(), and a
            # smaller window for two concurrent requests to both create.
            matched_rows = UserModel.objects.filter(userID=user_id).update(
                access_token=access_token,
                refresh_token=refresh_token,
                region_code=region_code,
                user_authorization_code=user_authorization_code,
                updTime=now_time,
            )
            if not matched_rows:
                # User does not exist yet: create it.
                code = CommonService.encrypt_data(32)
                UserModel.objects.create(
                    userID=user_id,
                    access_token=access_token,
                    refresh_token=refresh_token,
                    code=code,
                    user_authorization_code=user_authorization_code,
                    region_code=region_code,
                    addTime=now_time,
                    updTime=now_time,
                )

            res = {
                'user_authorization_code': user_authorization_code,
                'access_token': access_token,
                'refresh_token': refresh_token,
            }
            return response.json(0, res)
        except Exception as e:
            # Keep the existing error payload for callers, but also record the
            # full traceback server-side so failures are not lost.
            logging.getLogger('django').exception(
                'get_auth_code_and_token failed for user_id=%s', user_id)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def get_token_with_auth_code(request_dict, response):
        """Return the stored tokens that belong to an authorization code.

        Reads ``code`` from ``request_dict`` and looks up the ``UserModel``
        row whose ``user_authorization_code`` matches it.

        :param request_dict: request parameters.
        :param response: ``ResObject`` used to build error replies.
        :return: a ``JsonResponse`` with ``access_token``, ``token_type``
            (``"bearer"``), ``expires_in`` (``3600``) and ``refresh_token``
            on success; ``response.json(10, ...)`` when ``code`` is missing
            or matches no user.
        """
        logger = logging.getLogger('django')
        # NOTE(review): this writes the authorization code to the log in
        # plaintext -- consider redacting it.
        logger.info('根据用户验证码获取访问令牌参数%s', request_dict)
        user_authorization_code = request_dict.get('code', None)
        if not user_authorization_code:
            return response.json(10, 'error params')

        # Single query; first() yields None when nothing matches, so there is
        # no gap between an existence check and the row fetch.
        token_row = UserModel.objects.filter(
            user_authorization_code=user_authorization_code,
        ).values('access_token', 'refresh_token').first()
        if token_row is None:
            return response.json(10, 'user not exists')

        # NOTE(review): the authorization code is not invalidated here, so it
        # can be exchanged repeatedly; expires_in is a fixed value and no
        # expiry check is visible in this view -- confirm this is intended.
        res_json = {
            "access_token": token_row['access_token'],
            "token_type": "bearer",
            "expires_in": 3600,
            "refresh_token": token_row['refresh_token'],
        }
        return JsonResponse(res_json)

    @staticmethod
    def print_url(operation, request_dict, response):
        """Log an unrecognised operation with its parameters and reply ``{}``.

        :param operation: operation name from the URL (may be ``None``).
        :param request_dict: request parameters.
        :param response: unused; accepted for signature parity with the other handlers.
        :return: an empty ``JsonResponse``.
        """
        logger = logging.getLogger('django')
        logger.info('打印请求:%s %s', operation, request_dict)
        return JsonResponse({})
|