# @Author : Rocky # @File : AppToApp.py # @Time : 2023/12/28 11:13 import hashlib import logging import time import uuid from django.http import JsonResponse from django.views import View from model.models import UserModel from object.ResObject import ResObject from service.CommonService import CommonService class Oa2View(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, operation) def validation(self, request_dict, operation): response = ResObject() if operation == 'getAuthCode': # 获取用户验证码 return self.get_auth_code(request_dict, response) elif operation == 'getTokenWithAuthCode': # 根据用户验证码获取访问令牌 return self.get_token_with_auth_code(request_dict, response) else: return self.print_url(operation, request_dict, response) @staticmethod def get_auth_code(request_dict, response): user_id = request_dict.get('user_id', None) if not user_id: return response.json(10, 'error params') now_time = int(time.time()) user_authorization_code = hashlib.md5((str(uuid.uuid1()) + str(now_time)).encode('utf-8')).hexdigest() access_token = CommonService.encrypt_data(randomlength=32) refresh_token = CommonService.encrypt_data(randomlength=32) user_qs = UserModel.objects.filter(userID=user_id) # 用户不存在则创建 if not user_qs.exists(): UserModel.objects.create(userID=user_id, access_token=access_token, refresh_token=refresh_token, user_authorization_code=user_authorization_code, addTime=now_time, updTime=now_time) else: user_qs.update(access_token=access_token, refresh_token=refresh_token, user_authorization_code=user_authorization_code, updTime=now_time) res = { 'user_authorization_code': user_authorization_code } return response.json(0, res) @staticmethod def get_token_with_auth_code(request_dict, response): logger = logging.getLogger('django') logger.info('根据用户验证码获取访问令牌参数{}'.format(request_dict)) user_authorization_code = request_dict.get('code', None) if not user_authorization_code: return response.json(10, 'error params') user_qs = UserModel.objects.filter(user_authorization_code=user_authorization_code).values('userID', 'access_token', 'refresh_token') if not user_qs.exists(): return response.json(10, 'user not exists') access_token = user_qs[0]['access_token'] refresh_token = user_qs[0]['refresh_token'] res_json = { "access_token": access_token, "token_type": "bearer", "expires_in": 3600, "refresh_token": refresh_token, } return JsonResponse(res_json) @staticmethod def print_url(operation, request_dict, response): logger = logging.getLogger('django') logger.info('打印请求:{} {}'.format(operation, request_dict)) return JsonResponse({})