Ver Fonte

app to app oauth2验证连接接口

locky há 1 ano atrás
pai
commit
0934647909
3 ficheiros alterados com 85 adições e 1 exclusões
  1. 4 1
      azoauth/urls.py
  2. 80 0
      controller/AppToApp.py
  3. 1 0
      model/models.py

+ 4 - 1
azoauth/urls.py

@@ -16,7 +16,7 @@ Including another URLconf
 from django.conf.urls import url
 from django.contrib import admin
 from django.urls import path, re_path
-from controller import index, beian
+from controller import index, beian, AppToApp
 from controller import deviceStatus
 
 urlpatterns = [
@@ -37,6 +37,9 @@ urlpatterns = [
     url(r'^deviceStatus/(?P<operation>.*)$', deviceStatus.deviceStatus.as_view()),  # 更新设备信息等接口
     url(r'^vseesTest/(?P<operation>.*)', index.VesseTest.as_view()),  # test
 
+    # app to app oauth2验证登录连接
+    path('appToApp/oa2/(?P<operation>.*)', AppToApp.Oa2View.as_view()),
+
     # 域名备案网站
     re_path('(?P<path>.*)', beian.beianPath),
 

+ 80 - 0
controller/AppToApp.py

@@ -0,0 +1,80 @@
+# @Author    : Rocky
+# @File      : AppToApp.py
+# @Time      : 2023/12/28 11:13
+import hashlib
+import logging
+import time
+import uuid
+
+from django.http import JsonResponse
+from django.views import View
+
+from model.models import UserModel
+from object.ResObject import ResObject
+from service.CommonService import CommonService
+
+
+class Oa2View(View):
+    def get(self, request, *args, **kwargs):
+        request.encoding = 'utf-8'
+        operation = kwargs.get('operation')
+        return self.validation(request.GET, operation)
+
+    def post(self, request, *args, **kwargs):
+        request.encoding = 'utf-8'
+        operation = kwargs.get('operation')
+        return self.validation(request.POST, operation)
+
+    def validation(self, request_dict, operation):
+        response = ResObject()
+        if operation == 'getAuthCode':  # 获取用户验证码
+            return self.get_auth_code(request_dict, response)
+        elif operation == 'getTokenWithAuthCode':  # 根据用户验证码获取访问令牌
+            return self.get_token_with_auth_code(request_dict, response)
+        else:
+            return response.json(10, 'invalid url')
+
+    @staticmethod
+    def get_auth_code(request_dict, response):
+        user_id = request_dict.get('user_id', None)
+        if not user_id:
+            return response.json(10, 'error params')
+
+        now_time = int(time.time())
+        user_authorization_code = hashlib.md5((str(uuid.uuid1()) + str(now_time)).encode('utf-8')).hexdigest()
+        access_token = CommonService.encrypt_data(randomlength=32)
+        refresh_token = CommonService.encrypt_data(randomlength=32)
+        user_qs = UserModel.objects.filter(userID=user_id)
+        # 用户不存在则创建
+        if not user_qs.exists():
+            UserModel.objects.create(userID=user_id, access_token=access_token, refresh_token=refresh_token,
+                                     user_authorization_code=user_authorization_code, addTime=now_time, updTime=now_time)
+        else:
+            user_qs.update(access_token=access_token, refresh_token=refresh_token,
+                           user_authorization_code=user_authorization_code, updTime=now_time)
+        res = {
+            'user_authorization_code': user_authorization_code
+        }
+        return response.json(0, res)
+
+    @staticmethod
+    def get_token_with_auth_code(request_dict, response):
+        logger = logging.getLogger('django')
+        logger.info('根据用户验证码获取访问令牌参数{}'.format(request_dict))
+        user_authorization_code = request_dict.get('code', None)
+        if not user_authorization_code:
+            return response.json(10, 'error params')
+        user_qs = UserModel.objects.filter(user_authorization_code=user_authorization_code).values('userID',
+                                                                                                   'access_token',
+                                                                                                   'refresh_token')
+        if not user_qs.exists():
+            return response.json(10, 'user not exists')
+        access_token = user_qs[0]['access_token']
+        refresh_token = user_qs[0]['refresh_token']
+        res_json = {
+            "access_token": access_token,
+            "token_type": "bearer",
+            "expires_in": 3600,
+            "refresh_token": refresh_token,
+        }
+        return JsonResponse(res_json)

+ 1 - 0
model/models.py

@@ -8,6 +8,7 @@ class UserModel(models.Model):
     userID = models.CharField(blank=True, max_length=32, primary_key=True, unique=True, verbose_name='用户ID')
     region_code = models.CharField(default='US', max_length=8, verbose_name='用户地区')  # US, EU
     code = models.CharField(max_length=32, unique=True, default='', verbose_name='授权码')
+    user_authorization_code = models.CharField(max_length=32, unique=True, default='', verbose_name='用户授权码')
     access_token = models.CharField(max_length=64, unique=False, default='', verbose_name='访问令牌')
     refresh_token = models.CharField(max_length=64, unique=False, default='', verbose_name='刷新令牌')
     uid_rtsp = models.ManyToManyField(to='UidRtspModel', blank=True, verbose_name=u'用户关联uid_rtsp表',