AppToApp.py 3.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. # @Author : Rocky
  2. # @File : AppToApp.py
  3. # @Time : 2023/12/28 11:13
  4. import hashlib
  5. import logging
  6. import time
  7. import uuid
  8. from django.http import JsonResponse
  9. from django.views import View
  10. from model.models import UserModel
  11. from object.ResObject import ResObject
  12. from service.CommonService import CommonService
  class Oa2View(View):
      """App-to-app authorization endpoints, dispatched on the ``operation`` URL kwarg.

      Supported operations:

      * ``getAuthCode`` -- issue a fresh authorization code (and token pair)
        for the given ``user_id``.
      * ``getTokenWithAuthCode`` -- exchange an authorization code for the
        stored access/refresh tokens.

      Any other operation is logged and answered with an empty JSON object.
      """

      def get(self, request, *args, **kwargs):
          """Handle GET: read parameters from the query string and dispatch."""
          request.encoding = 'utf-8'
          operation = kwargs.get('operation')
          return self.validation(request.GET, operation)

      def post(self, request, *args, **kwargs):
          """Handle POST: read parameters from the form body and dispatch."""
          request.encoding = 'utf-8'
          operation = kwargs.get('operation')
          return self.validation(request.POST, operation)

      def validation(self, request_dict, operation):
          """Route the request to the handler matching ``operation``.

          Args:
              request_dict: Request parameters (``request.GET`` or ``request.POST``).
              operation: Operation name taken from the URL kwargs; may be ``None``.

          Returns:
              The response object produced by the selected handler.
          """
          response = ResObject()
          if operation == 'getAuthCode':  # issue a user authorization code
              return self.get_auth_code(request_dict, response)
          if operation == 'getTokenWithAuthCode':  # exchange the code for tokens
              return self.get_token_with_auth_code(request_dict, response)
          return self.print_url(operation, request_dict, response)

      @staticmethod
      def get_auth_code(request_dict, response):
          """Issue a new authorization code and token pair for ``user_id``.

          The user's row is updated with the new credentials, or created when
          no row with that ``userID`` exists yet.

          Args:
              request_dict: Request parameters; must contain a non-empty ``user_id``.
              response: ``ResObject`` used to build the reply.

          Returns:
              ``response.json(10, 'error params')`` when ``user_id`` is missing,
              otherwise ``response.json(0, {'user_authorization_code': <code>})``.
          """
          # Local import so this block does not depend on the file-level import list.
          import secrets

          user_id = request_dict.get('user_id', None)
          if not user_id:
              return response.json(10, 'error params')

          now_time = int(time.time())
          # 16 random bytes -> 32 lowercase hex characters, the same shape as the
          # previous md5 hexdigest, but drawn from the OS CSPRNG instead of being
          # derived from uuid1 (host MAC + clock) and the current timestamp.
          user_authorization_code = secrets.token_hex(16)
          access_token = CommonService.encrypt_data(randomlength=32)
          refresh_token = CommonService.encrypt_data(randomlength=32)

          # update() reports how many rows it matched, so a separate exists()
          # query is unnecessary: zero matched rows means the user is new.
          matched_rows = UserModel.objects.filter(userID=user_id).update(
              access_token=access_token,
              refresh_token=refresh_token,
              user_authorization_code=user_authorization_code,
              updTime=now_time,
          )
          if not matched_rows:
              # User does not exist yet: create it.
              UserModel.objects.create(
                  userID=user_id,
                  access_token=access_token,
                  refresh_token=refresh_token,
                  user_authorization_code=user_authorization_code,
                  addTime=now_time,
                  updTime=now_time,
              )

          return response.json(0, {'user_authorization_code': user_authorization_code})

      @staticmethod
      def get_token_with_auth_code(request_dict, response):
          """Exchange an authorization code for the stored access/refresh tokens.

          Args:
              request_dict: Request parameters; must contain a non-empty ``code``.
              response: ``ResObject`` used to build error replies.

          Returns:
              ``response.json(10, ...)`` when ``code`` is missing or matches no
              user; otherwise a ``JsonResponse`` with ``access_token``,
              ``token_type``, ``expires_in`` and ``refresh_token``.
          """
          logger = logging.getLogger('django')
          # NOTE(review): this writes the authorization code to the log in clear
          # text, and nothing in this method invalidates or expires the code
          # after it has been exchanged -- confirm whether that is intended or
          # handled elsewhere.
          logger.info('根据用户验证码获取访问令牌参数%s', request_dict)

          user_authorization_code = request_dict.get('code', None)
          if not user_authorization_code:
              return response.json(10, 'error params')

          # One query instead of exists() followed by two indexed lookups.
          token_row = (
              UserModel.objects
              .filter(user_authorization_code=user_authorization_code)
              .values('access_token', 'refresh_token')
              .first()
          )
          if token_row is None:
              return response.json(10, 'user not exists')

          res_json = {
              "access_token": token_row['access_token'],
              "token_type": "bearer",
              "expires_in": 3600,
              "refresh_token": token_row['refresh_token'],
          }
          return JsonResponse(res_json)

      @staticmethod
      def print_url(operation, request_dict, response):
          """Log an unrecognised operation and reply with an empty JSON object.

          Args:
              operation: Operation name that matched no handler.
              request_dict: Request parameters, logged as-is.
              response: Unused; kept so all handlers share one signature.

          Returns:
              An empty ``JsonResponse``.
          """
          logger = logging.getLogger('django')
          logger.info('打印请求:%s %s', operation, request_dict)
          return JsonResponse({})