import datetime import time from random import Random import base64 import pytz import requests import OpenSSL.crypto as ct from base64 import encodebytes from azoauth.config import ALEXA_EVENT_API, CLIENT_CONFIG, LOGGER from model.models import iotdeviceInfoModel, UidRtspModel, AlexaAuthModel # 复用性且公用较高封装代码在这 class CommonService: # 生成随机数 @staticmethod def encrypt_data(randomlength=8, number=False): str = '' if number == False: characterSet = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsT' \ 'tUuVvWwXxYyZz0123456789' else: characterSet = '0123456789' length = len(characterSet) - 1 random = Random() for index in range(randomlength): str += characterSet[random.randint(0, length)] return str # 加密 # @staticmethod def encrypt_pwd(self, userPwd): for i in range(1, 4): if i == 1: userPwd = self.RandomStr(3, False)+userPwd+self.RandomStr(3, False) userPwd = base64.b64encode(str(userPwd).encode("utf-8")).decode('utf8') if i == 2: userPwd = self.RandomStr(2, False)+str(userPwd)+self.RandomStr(2, False) userPwd = base64.b64encode(str(userPwd).encode("utf-8")).decode('utf8') if i == 3: userPwd = self.RandomStr(1, False)+str(userPwd)+self.RandomStr(1, False) userPwd = base64.b64encode(str(userPwd).encode("utf-8")).decode('utf8') return userPwd # 解密 @staticmethod def decode_pwd(password): for i in range(1, 4): if i == 1: # 第一次先解密 password = base64.b64decode(password) password = password.decode('utf-8') # 截去第一位,最后一位 password = password[1:-1] if i == 2: # 第2次先解密 password = base64.b64decode(password) password = password.decode('utf-8') # 去前2位,后2位 password = password[2:-2] if i == 3: # 第3次先解密 password = base64.b64decode(password) password = password.decode('utf-8') # 去前3位,后3位 password = password[3:-3] return password # 生成随机字符串 @staticmethod def RandomStr(randomlength=8, number=False): str = '' if number == False: characterSet = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsT' \ 'tUuVvWwXxYyZz0123456789' else: characterSet = '0123456789' length = len(characterSet) - 1 random = Random() for index in range(randomlength): str += characterSet[random.randint(0, length)] return str @staticmethod def query_serial_with_uid(uid): # 根据uid查询序列号,存在则返回序列号,否则返uid uid_qs = UidRtspModel.objects.filter(uid=uid).values('serial_number') if uid_qs.exists(): serial_number = uid_qs[0]['serial_number'] if serial_number: return serial_number return uid @staticmethod def req_publish_mqtt_msg(identification_code, topic_name, msg, qos=1): """ 通用发布MQTT消息函数 @param identification_code: 标识码 @param topic_name: 主题名 @param msg: 消息内容 @param qos: mqtt qos等级 @return: boolean """ if not all([identification_code, topic_name]): return False if identification_code.endswith('11L'): thing_name = 'LC_' + identification_code else: thing_name = 'Ansjer_Device_' + identification_code try: # 获取数据组织将要请求的url iot = iotdeviceInfoModel.objects.filter( thing_name=thing_name).values( 'endpoint', 'token_iot_number') if not iot.exists(): return False endpoint = iot[0]['endpoint'] Token = iot[0]['token_iot_number'] # api doc: https://docs.aws.amazon.com/zh_cn/iot/latest/developerguide/http.html # url: https://IoT_data_endpoint/topics/url_encoded_topic_name?qos=1 # post请求url发布MQTT消息 url = 'https://{}/topics/{}?qos={}'.format(endpoint, topic_name, qos) authorizer_name = 'Ansjer_Iot_Auth' signature = CommonService.rsa_sign(Token) # Token签名 headers = { 'x-amz-customauthorizer-name': authorizer_name, 'Token': Token, 'x-amz-customauthorizer-signature': signature} r = requests.post(url=url, headers=headers, json=msg, timeout=2) if r.status_code == 200: res = r.json() if res['message'] == 'OK': return True return False else: return False except Exception as e: return False @staticmethod def rsa_sign(Token): # 私钥签名Token if not Token: return '' private_key_file = '''-----BEGIN RSA PRIVATE KEY----- MIIEpQIBAAKCAQEA5iJzEDPqtGmFMggekVro6C0lrjuC2BjunGkrFNJWpDYzxCzE X5jf4/Fq7hcIaQd5sqHugDxPVollSLPe9zNilbrd0sZfU+Ed8gRVuKW9KwfE9XFr L0pt6bKRQ0IIRfiZ9TuR0tsQysvcO1GZSXcYfPue3tGM1zOnWFThWDqZ06+sOxzt RMRl4yNfbpCG4MfxG3itNXOfrjZv2OMLSXrxmzubSvRpUYSvQPs4fm9302SAnySY 0MKzx6H6528ZQm/IDDSZy6EmNBIyTRDfxC56vnYcXvqedAQh7jJnjdvt6Q4MhASH eIYi1FBSdu2NT6wgpnrqXzx5pq9kR/lnsLID0wIDAQABAoIBAQCiF4GT1/1oNSpr ouxk1PNXFPWFUsVGD8mAwVJmx//eiY7MjfuCmdqYYmI+cFqsH2fIOeYSzGfVO9Dq 9EYHN1oovAWhf7eFDPpajFMUSyiCNmazub8VAAeKowtNpCTPo9pMsDh1m3aoYA4u ebrN0+Sbo16y8kWRDgDAZoiR7DSMs8lczk16hwfv5mw8XpNDbaL3Coi4Koe2S1Yh 2SX3vWFlpd7qF1ZYXuZIp+b8JPrV7n9eUKoFgzj0gqgwQK80CoexIjiOrNMPvkQa q+8kCvFjAzKxOK7e8gjM8lMRiGodb61kmYZkkJzFwWO4EaGbl34lfVECd1Ixp3tF be0OWAGBAoGBAPSteXDzzToD8ovM7LL11x0jWwI6HOiHu89kZtW566rIezjWBuA2 TxrcYKM3h9jQRXS3CsMdoIv6XGk5lqM8ADtjn23FBWe/THYLh8bm8JOgh5RRWQDg SvkLfi9Ih2mM4NJfmuuDOh3Nze2efLM7+kOZWUQwF2Zx9mL5jvRBk351AoGBAPDI sYmT2Li+i5+0vykA2m5uPF8ZOW8BGtAfCZv0suW7BNzSgin78g9WapRd/4p0NNiL /nVMqPPCpd1akCUpV+GDWQt0hV+HZjxANE0KWhciQRyo2qvo51j8SWILJSgh0tXC aTF8qt6oGw3VN3m57vKhbrlDaz0J/NDJFci6msAnAoGBAOuG6bXPGijUj+//DYKf n7jOxdZ49kboEePrtAncdHzri6IEdI3z+WXT6bpzw/LzWUimwldb96WHFNm9s8Hi Ch8hIODbnP5naUTgiIzw1XhmONyPCewL/F+LrqX5XVA/alNX8JrwsUrrR2WLAGLQ Q3I69XDsEjptTU2tCO0bCs3ZAoGBAJ2lCHfm0JHET230zONvp5N9oREyVqQSuRdh +syc3TQDyh85w/bw+X6JOaaCFHj1tFPC9Iqf8k4GNspCLPXnp54CfR4+38O3xnvU HWoDSRC0YKT++IxtJGriYrlKSr2Hx54kdvLriIPW1D+uRW/xCDza7L9nIKMKEvgv b4/IfOEpAoGAeKM9Te7T1VzlAkS0CJOwanzwYV/zrex84WuXxlsGgPQ871lTs5AP H1QLfLfFXH+UVrCEC2yv4eml/cqFkpB3gE5i4MQ8GPVIOSs5tsIyl8YUA03vdNdB GCqvlyw5dfxNA+EtxNE2wCW/LW7ENJlACgcfgPlBZtpLheWoZB/maw4= -----END RSA PRIVATE KEY-----''' # 使用密钥文件方式 # private_key_file_path = os.path.join(BASE_DIR, 'static/iotCore/private.pem')#.replace('\\', '/') # private_key_file = open(private_key_file_path, 'r') private_key = ct.load_privatekey(ct.FILETYPE_PEM, private_key_file) signature = ct.sign(private_key, Token.encode('utf8'), 'sha256') signature = encodebytes(signature).decode('utf8').replace('\n', '') # print('signature:', signature) return signature @staticmethod def timestamp_to_iso8601(timestamp): # 将十位时间戳(秒)转换为UTC时区的datetime对象 utc_dt = datetime.datetime.fromtimestamp(timestamp, pytz.UTC) # 格式化为YYYY-MM-DDThh:mm:ssZ格式 return utc_dt.strftime("%Y-%m-%dT%H:%M:%SZ") @classmethod def get_access_token(cls, user_id): auth_qs = AlexaAuthModel.objects.filter(userID=user_id).values( 'expiresTime', 'refresh_token', 'access_token', 'alexa_region', 'skill_name') if not auth_qs.exists(): return False expires_time = auth_qs[0]['expiresTime'] alexa_region = auth_qs[0]['alexa_region'] now_time = int(time.time()) if now_time < expires_time: access_token = auth_qs[0]['access_token'] else: if alexa_region not in ALEXA_EVENT_API.keys(): return False skill_name = auth_qs[0]['skill_name'] refresh_token = auth_qs[0]['refresh_token'] res = cls.get_refresh_token(refresh_token, skill_name) if 'error' not in res: auth_qs.update( access_token=res['access_token'], refresh_token=res['refresh_token'], expiresTime=now_time + 3000, updTime=now_time, ) access_token = res['access_token'] else: return False return access_token @staticmethod def get_refresh_token(refresh_token, skill_name): # 请求更新token if skill_name not in CLIENT_CONFIG.keys(): return {'error': 'error skill_name'} payload = { 'grant_type': 'refresh_token', 'refresh_token': refresh_token, 'client_id': CLIENT_CONFIG[skill_name]['client_id'], 'client_secret': CLIENT_CONFIG[skill_name]['client_secret'], } auth_request_url = 'https://api.amazon.com/auth/o2/token' headers = { 'content-type': "application/x-www-form-urlencoded", 'cache-control': "no-cache" } try: res = requests.post(auth_request_url, data=payload, headers=headers, timeout=5) res.raise_for_status() request_json = res.json() return request_json except requests.exceptions.RequestException as e: return {'error': str(e)}