1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- # @Author : Rocky
- # @File : AppToApp.py
- # @Time : 2023/12/28 11:13
- import hashlib
- import logging
- import time
- import uuid
- from django.http import JsonResponse
- from django.views import View
- from model.models import UserModel
- from object.ResObject import ResObject
- from service.CommonService import CommonService
- class Oa2View(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, operation)
- def validation(self, request_dict, operation):
- response = ResObject()
- if operation == 'getAuthCode': # 获取用户验证码
- return self.get_auth_code(request_dict, response)
- elif operation == 'getTokenWithAuthCode': # 根据用户验证码获取访问令牌
- return self.get_token_with_auth_code(request_dict, response)
- else:
- return self.print_url(operation, request_dict, response)
- @staticmethod
- def get_auth_code(request_dict, response):
- user_id = request_dict.get('user_id', None)
- if not user_id:
- return response.json(10, 'error params')
- now_time = int(time.time())
- user_authorization_code = hashlib.md5((str(uuid.uuid1()) + str(now_time)).encode('utf-8')).hexdigest()
- access_token = CommonService.encrypt_data(randomlength=32)
- refresh_token = CommonService.encrypt_data(randomlength=32)
- user_qs = UserModel.objects.filter(userID=user_id)
- # 用户不存在则创建
- if not user_qs.exists():
- UserModel.objects.create(userID=user_id, access_token=access_token, refresh_token=refresh_token,
- user_authorization_code=user_authorization_code, addTime=now_time, updTime=now_time)
- else:
- user_qs.update(access_token=access_token, refresh_token=refresh_token,
- user_authorization_code=user_authorization_code, updTime=now_time)
- res = {
- 'user_authorization_code': user_authorization_code
- }
- return response.json(0, res)
- @staticmethod
- def get_token_with_auth_code(request_dict, response):
- logger = logging.getLogger('django')
- logger.info('根据用户验证码获取访问令牌参数{}'.format(request_dict))
- user_authorization_code = request_dict.get('code', None)
- if not user_authorization_code:
- return response.json(10, 'error params')
- user_qs = UserModel.objects.filter(user_authorization_code=user_authorization_code).values('userID',
- 'access_token',
- 'refresh_token')
- if not user_qs.exists():
- return response.json(10, 'user not exists')
- access_token = user_qs[0]['access_token']
- refresh_token = user_qs[0]['refresh_token']
- res_json = {
- "access_token": access_token,
- "token_type": "bearer",
- "expires_in": 3600,
- "refresh_token": refresh_token,
- }
- return JsonResponse(res_json)
- @staticmethod
- def print_url(operation, request_dict, response):
- logger = logging.getLogger('django')
- logger.info('打印请求:{} {}'.format(operation, request_dict))
- return JsonResponse({})
|