#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
@AUTHOR: ASJRD018
@NAME: AnsjerOA
@software: PyCharm
@DATE: 2018/8/13 15:36
@Version: python3.6
@MODIFY DECORD:ansjer dev
@file: TokenObject.py
@Contact: chanjunkai@163.com
"""
import time

import jwt

from AnsjerPush.config import (
    OAUTH_ACCESS_TOKEN_SECRET,
    OAUTH_REFRESH_TOKEN_SECRET,
    OAUTH_ACCESS_TOKEN_TIME,
    OAUTH_REFRESH_TOKEN_TIME,
)
from Object.RedisObject import RedisObject


class TokenObject:
    """Validate, issue and refresh HS256-signed JWT access/refresh tokens.

    The outcome of the most recent operation is stored in ``self.code``:
    ``0`` means success, ``309`` means the token is missing, invalid or
    expired (or that issuing a token failed).

    Attributes set by a successful :meth:`valid` call:
        userID: value of the ``userID`` claim.
        lang:   value of the ``lang`` claim.
        user:   value of the ``user`` claim (``''`` when absent).
    """

    def __init__(self, token=None):
        """Store *token* and immediately validate it as an access token.

        :param token: encoded JWT, or one of the aliases ``'local'`` /
            ``'test'`` which are replaced by the fixed tokens below.
        """
        # NOTE(review): hard-coded debug tokens. Their ``exp`` claims appear
        # to lie in the past, so they should be rejected by valid(); consider
        # removing these aliases from production code - confirm with owners.
        if token == 'local':
            token = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VySUQiOiIxNTg0MzUxODk2MjgyMTM4MDAxMzgwMDAiLCJsYW5nIjoiZW4iLCJ1c2VyIjoiMTM2ODAzMTc1OTYiLCJtX2NvZGUiOiIxMjM0MTMyNDMyMTQiLCJleHAiOjE1ODcyNzcwNjB9.c0LV_XyxwbzUlYqMJqx7vw9f19Jv-0kGnUHuu_go-mo'
        elif token == 'test':
            token = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyIjoiMTM4MDAxMzgwMDEiLCJleHAiOjE1Njk5OTg4OTYsInVzZXJJRCI6IjE1MTU2NDI2MjMzNzkzOTUxMzgwMDEzODAwMSIsImxhbmciOiJlbiIsIm1fY29kZSI6IjEyMzQxMzI0MzIxNCJ9.VAQtT9AbCCfXcrNj9DL5cvVasMDoI7AP8ptgU1GoMu8'
        self.token = token
        self.lang = None
        self.userID = None
        self.user = ''
        self.code = 0
        # Validate the token right away; the result lands in self.code.
        self.valid()

    @staticmethod
    def _as_text(encoded):
        """Return an encoded JWT as ``str``.

        ``jwt.encode`` returns ``bytes`` on PyJWT 1.x and ``str`` on 2.x;
        accept both so token generation works with either version.
        """
        if isinstance(encoded, bytes):
            return encoded.decode('utf-8')
        return encoded

    def valid(self):
        """Decode ``self.token`` with the access-token secret.

        On success the ``userID``, ``lang`` and ``user`` claims are copied
        onto the instance, a Redis key (db 3) named after the user ID is
        written with a 300-second expiry, and ``self.code`` is set to ``0``.

        :return: the decoded claims dict on success, otherwise ``None``
            (with ``self.code`` set to ``309``).
        """
        if not self.token:
            self.code = 309
            return None
        try:
            # ``algorithms`` is documented as a list of accepted algorithms.
            res = jwt.decode(self.token, OAUTH_ACCESS_TOKEN_SECRET, algorithms=['HS256'])
            self.userID = res.get('userID', None)
            self.lang = res.get('lang', None)
            self.user = res.get('user', '')
            # Refresh the login time. This stays inside the ``try`` so that a
            # failure here is reported as code 309, as it always has been.
            if self.userID:
                print(self.user)
                redisObj = RedisObject(db=3)
                redisObj.set_data(key=self.userID, val=self.user, expire=300)
        except jwt.ExpiredSignatureError as e:
            print('过期')
            print(repr(e))
            self.code = 309
            return None
        except Exception:
            # Deliberately quiet: __init__ calls valid() for every token,
            # including ones later passed to refresh(), which are signed with
            # the refresh secret and may legitimately fail to decode here.
            self.code = 309
            return None
        if not self.userID:
            # A correctly signed token without a user ID is still rejected.
            self.code = 309
            return None
        self.code = 0
        return res

    # Token encoding
    def generate(self, data=None):
        """Issue a new access/refresh token pair carrying the claims in *data*.

        *data* is not modified; each token gets its own copy of the claims
        plus an ``exp`` claim computed from its own configured lifetime.

        :param data: dict of claims to embed (defaults to no extra claims).
        :return: dict with ``access_token``, ``access_expire``,
            ``refresh_expire`` and ``refresh_token`` (the ``*_expire`` values
            are lifetimes in seconds) on success; ``None`` on failure, with
            ``self.code`` set to ``309``.
        """
        try:
            access_expire = int(OAUTH_ACCESS_TOKEN_TIME.total_seconds())
            refresh_expire = int(OAUTH_REFRESH_TOKEN_TIME.total_seconds())
            now_stamp = int(time.time())
            claims = dict(data) if data else {}
            # Independent payloads: sharing one dict made the access token
            # inherit the refresh token's (later) expiry.
            access_data = dict(claims, exp=access_expire + now_stamp)
            refresh_data = dict(claims, exp=refresh_expire + now_stamp)
            access_token = jwt.encode(access_data, OAUTH_ACCESS_TOKEN_SECRET, algorithm='HS256')
            refresh_token = jwt.encode(refresh_data, OAUTH_REFRESH_TOKEN_SECRET, algorithm='HS256')
            res = {
                'access_token': self._as_text(access_token),
                'access_expire': access_expire,
                'refresh_expire': refresh_expire,
                'refresh_token': self._as_text(refresh_token),
            }
        except Exception as e:
            self.code = 309
            print(repr(e))
            return None
        self.code = 0
        return res

    def refresh(self):
        """Exchange ``self.token`` (a refresh token) for a new token pair.

        The token is decoded with the refresh-token secret; its ``userID``
        and ``user`` claims are carried into the new pair. ``lang`` comes
        from ``self.lang`` when set, otherwise from the refresh token itself.

        :return: the dict produced by :meth:`generate` on success, otherwise
            ``None`` (with ``self.code`` set to ``309``).
        """
        if not self.token:
            self.code = 309
            return None
        try:
            res = jwt.decode(self.token, OAUTH_REFRESH_TOKEN_SECRET, algorithms=['HS256'])
        except jwt.ExpiredSignatureError as e:
            print('过期')
            print(repr(e))
            self.code = 309
            return None
        except Exception as e:
            self.code = 309
            print(repr(e))
            return None
        self.code = 0
        userID = res.get('userID', '')
        user = res.get('user', '')
        # self.lang is only populated when valid() managed to decode the
        # token with the access secret; fall back to the refresh token's own
        # claim so the language is not silently dropped.
        lang = self.lang if self.lang is not None else res.get('lang')
        return self.generate(data={'userID': userID, 'lang': lang, 'user': user})


# NOTE(review): the commented-out snippet below embeds what looks like a
# signing secret; it should be removed from the source tree and the secret
# rotated if it is (or was) in use - confirm with the owners.
# import jwt
#
#
# token = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySUQiOiIxNTMzODg0NDE4NTE5MTM4MDAxMzgwMDAiLCJleHAiOjE1NTU1NTEwNjUsInVzZXIiOiIxMTFAcXEuY29tIiwibGFuZyI6ImVuIn0.waPlfIBucSA7rFfnsxOKIVJ_cL6xiP33cAiz1IDoteY'
# res = jwt.decode(token, 'a+jbgnw%@1%zy^=@dn62%', algorithms='HS256')
# print(res)