123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- import datetime
- import time
- from random import Random
- import base64
- import pytz
- import requests
- import OpenSSL.crypto as ct
- from base64 import encodebytes
- from azoauth.config import ALEXA_EVENT_API, CLIENT_CONFIG, LOGGER
- from model.models import iotdeviceInfoModel, UidRtspModel, AlexaAuthModel
- # 复用性且公用较高封装代码在这
- class CommonService:
- # 生成随机数
- @staticmethod
- def encrypt_data(randomlength=8, number=False):
- str = ''
- if number == False:
- characterSet = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsT' \
- 'tUuVvWwXxYyZz0123456789'
- else:
- characterSet = '0123456789'
- length = len(characterSet) - 1
- random = Random()
- for index in range(randomlength):
- str += characterSet[random.randint(0, length)]
- return str
- # 加密
- # @staticmethod
- def encrypt_pwd(self, userPwd):
- for i in range(1, 4):
- if i == 1:
- userPwd = self.RandomStr(3, False)+userPwd+self.RandomStr(3, False)
- userPwd = base64.b64encode(str(userPwd).encode("utf-8")).decode('utf8')
- if i == 2:
- userPwd = self.RandomStr(2, False)+str(userPwd)+self.RandomStr(2, False)
- userPwd = base64.b64encode(str(userPwd).encode("utf-8")).decode('utf8')
- if i == 3:
- userPwd = self.RandomStr(1, False)+str(userPwd)+self.RandomStr(1, False)
- userPwd = base64.b64encode(str(userPwd).encode("utf-8")).decode('utf8')
- return userPwd
- # 解密
- @staticmethod
- def decode_pwd(password):
- for i in range(1, 4):
- if i == 1:
- # 第一次先解密
- password = base64.b64decode(password)
- password = password.decode('utf-8')
- # 截去第一位,最后一位
- password = password[1:-1]
- if i == 2:
- # 第2次先解密
- password = base64.b64decode(password)
- password = password.decode('utf-8')
- # 去前2位,后2位
- password = password[2:-2]
- if i == 3:
- # 第3次先解密
- password = base64.b64decode(password)
- password = password.decode('utf-8')
- # 去前3位,后3位
- password = password[3:-3]
- return password
- # 生成随机字符串
- @staticmethod
- def RandomStr(randomlength=8, number=False):
- str = ''
- if number == False:
- characterSet = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsT' \
- 'tUuVvWwXxYyZz0123456789'
- else:
- characterSet = '0123456789'
- length = len(characterSet) - 1
- random = Random()
- for index in range(randomlength):
- str += characterSet[random.randint(0, length)]
- return str
- @staticmethod
- def query_serial_with_uid(uid):
- # 根据uid查询序列号,存在则返回序列号,否则返uid
- uid_qs = UidRtspModel.objects.filter(uid=uid).values('serial_number')
- if uid_qs.exists():
- serial_number = uid_qs[0]['serial_number']
- if serial_number:
- return serial_number
- return uid
- @staticmethod
- def req_publish_mqtt_msg(identification_code, topic_name, msg, qos=1):
- """
- 通用发布MQTT消息函数
- @param identification_code: 标识码
- @param topic_name: 主题名
- @param msg: 消息内容
- @param qos: mqtt qos等级
- @return: boolean
- """
- if not all([identification_code, topic_name]):
- return False
- if identification_code.endswith('11L'):
- thing_name = 'LC_' + identification_code
- else:
- thing_name = 'Ansjer_Device_' + identification_code
- try:
- # 获取数据组织将要请求的url
- iot = iotdeviceInfoModel.objects.filter(
- thing_name=thing_name).values(
- 'endpoint', 'token_iot_number')
- if not iot.exists():
- return False
- endpoint = iot[0]['endpoint']
- Token = iot[0]['token_iot_number']
- # api doc: https://docs.aws.amazon.com/zh_cn/iot/latest/developerguide/http.html
- # url: https://IoT_data_endpoint/topics/url_encoded_topic_name?qos=1
- # post请求url发布MQTT消息
- url = 'https://{}/topics/{}?qos={}'.format(endpoint, topic_name, qos)
- authorizer_name = 'Ansjer_Iot_Auth'
- signature = CommonService.rsa_sign(Token) # Token签名
- headers = {
- 'x-amz-customauthorizer-name': authorizer_name,
- 'Token': Token,
- 'x-amz-customauthorizer-signature': signature}
- r = requests.post(url=url, headers=headers, json=msg, timeout=2)
- if r.status_code == 200:
- res = r.json()
- if res['message'] == 'OK':
- return True
- return False
- else:
- return False
- except Exception as e:
- return False
- @staticmethod
- def rsa_sign(Token):
- # 私钥签名Token
- if not Token:
- return ''
- private_key_file = '''-----BEGIN RSA PRIVATE KEY-----
- MIIEpQIBAAKCAQEA5iJzEDPqtGmFMggekVro6C0lrjuC2BjunGkrFNJWpDYzxCzE
- X5jf4/Fq7hcIaQd5sqHugDxPVollSLPe9zNilbrd0sZfU+Ed8gRVuKW9KwfE9XFr
- L0pt6bKRQ0IIRfiZ9TuR0tsQysvcO1GZSXcYfPue3tGM1zOnWFThWDqZ06+sOxzt
- RMRl4yNfbpCG4MfxG3itNXOfrjZv2OMLSXrxmzubSvRpUYSvQPs4fm9302SAnySY
- 0MKzx6H6528ZQm/IDDSZy6EmNBIyTRDfxC56vnYcXvqedAQh7jJnjdvt6Q4MhASH
- eIYi1FBSdu2NT6wgpnrqXzx5pq9kR/lnsLID0wIDAQABAoIBAQCiF4GT1/1oNSpr
- ouxk1PNXFPWFUsVGD8mAwVJmx//eiY7MjfuCmdqYYmI+cFqsH2fIOeYSzGfVO9Dq
- 9EYHN1oovAWhf7eFDPpajFMUSyiCNmazub8VAAeKowtNpCTPo9pMsDh1m3aoYA4u
- ebrN0+Sbo16y8kWRDgDAZoiR7DSMs8lczk16hwfv5mw8XpNDbaL3Coi4Koe2S1Yh
- 2SX3vWFlpd7qF1ZYXuZIp+b8JPrV7n9eUKoFgzj0gqgwQK80CoexIjiOrNMPvkQa
- q+8kCvFjAzKxOK7e8gjM8lMRiGodb61kmYZkkJzFwWO4EaGbl34lfVECd1Ixp3tF
- be0OWAGBAoGBAPSteXDzzToD8ovM7LL11x0jWwI6HOiHu89kZtW566rIezjWBuA2
- TxrcYKM3h9jQRXS3CsMdoIv6XGk5lqM8ADtjn23FBWe/THYLh8bm8JOgh5RRWQDg
- SvkLfi9Ih2mM4NJfmuuDOh3Nze2efLM7+kOZWUQwF2Zx9mL5jvRBk351AoGBAPDI
- sYmT2Li+i5+0vykA2m5uPF8ZOW8BGtAfCZv0suW7BNzSgin78g9WapRd/4p0NNiL
- /nVMqPPCpd1akCUpV+GDWQt0hV+HZjxANE0KWhciQRyo2qvo51j8SWILJSgh0tXC
- aTF8qt6oGw3VN3m57vKhbrlDaz0J/NDJFci6msAnAoGBAOuG6bXPGijUj+//DYKf
- n7jOxdZ49kboEePrtAncdHzri6IEdI3z+WXT6bpzw/LzWUimwldb96WHFNm9s8Hi
- Ch8hIODbnP5naUTgiIzw1XhmONyPCewL/F+LrqX5XVA/alNX8JrwsUrrR2WLAGLQ
- Q3I69XDsEjptTU2tCO0bCs3ZAoGBAJ2lCHfm0JHET230zONvp5N9oREyVqQSuRdh
- +syc3TQDyh85w/bw+X6JOaaCFHj1tFPC9Iqf8k4GNspCLPXnp54CfR4+38O3xnvU
- HWoDSRC0YKT++IxtJGriYrlKSr2Hx54kdvLriIPW1D+uRW/xCDza7L9nIKMKEvgv
- b4/IfOEpAoGAeKM9Te7T1VzlAkS0CJOwanzwYV/zrex84WuXxlsGgPQ871lTs5AP
- H1QLfLfFXH+UVrCEC2yv4eml/cqFkpB3gE5i4MQ8GPVIOSs5tsIyl8YUA03vdNdB
- GCqvlyw5dfxNA+EtxNE2wCW/LW7ENJlACgcfgPlBZtpLheWoZB/maw4=
- -----END RSA PRIVATE KEY-----'''
- # 使用密钥文件方式
- # private_key_file_path = os.path.join(BASE_DIR, 'static/iotCore/private.pem')#.replace('\\', '/')
- # private_key_file = open(private_key_file_path, 'r')
- private_key = ct.load_privatekey(ct.FILETYPE_PEM, private_key_file)
- signature = ct.sign(private_key, Token.encode('utf8'), 'sha256')
- signature = encodebytes(signature).decode('utf8').replace('\n', '')
- # print('signature:', signature)
- return signature
- @staticmethod
- def timestamp_to_iso8601(timestamp):
- # 将十位时间戳(秒)转换为UTC时区的datetime对象
- utc_dt = datetime.datetime.fromtimestamp(timestamp, pytz.UTC)
- # 格式化为YYYY-MM-DDThh:mm:ssZ格式
- return utc_dt.strftime("%Y-%m-%dT%H:%M:%SZ")
- @classmethod
- def get_access_token(cls, user_id):
- auth_qs = AlexaAuthModel.objects.filter(userID=user_id).values(
- 'expiresTime', 'refresh_token', 'access_token', 'alexa_region', 'skill_name')
- if not auth_qs.exists():
- return False
- expires_time = auth_qs[0]['expiresTime']
- alexa_region = auth_qs[0]['alexa_region']
- now_time = int(time.time())
- if now_time < expires_time:
- access_token = auth_qs[0]['access_token']
- else:
- if alexa_region not in ALEXA_EVENT_API.keys():
- return False
- skill_name = auth_qs[0]['skill_name']
- refresh_token = auth_qs[0]['refresh_token']
- res = cls.get_refresh_token(refresh_token, skill_name)
- if 'error' not in res:
- auth_qs.update(
- access_token=res['access_token'],
- refresh_token=res['refresh_token'],
- expiresTime=now_time + 3000,
- updTime=now_time,
- )
- access_token = res['access_token']
- else:
- return False
- return access_token
- @staticmethod
- def get_refresh_token(refresh_token, skill_name):
- # 请求更新token
- if skill_name not in CLIENT_CONFIG.keys():
- return {'error': 'error skill_name'}
- payload = {
- 'grant_type': 'refresh_token',
- 'refresh_token': refresh_token,
- 'client_id': CLIENT_CONFIG[skill_name]['client_id'],
- 'client_secret': CLIENT_CONFIG[skill_name]['client_secret'],
- }
- auth_request_url = 'https://api.amazon.com/auth/o2/token'
- headers = {
- 'content-type': "application/x-www-form-urlencoded",
- 'cache-control': "no-cache"
- }
- try:
- res = requests.post(auth_request_url, data=payload, headers=headers, timeout=5)
- res.raise_for_status()
- request_json = res.json()
- return request_json
- except requests.exceptions.RequestException as e:
- return {'error': str(e)}
|