123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- import datetime
- import time
- from pathlib import Path
- from random import Random
- import base64
- import ipdb
- import requests
- import OpenSSL.crypto as ct
- from base64 import encodebytes
- import simplejson as json
- from django.core import serializers
- from django.utils import timezone
- from pyipip import IPIPDatabase
- from model.models import iotdeviceInfoModel, UidRtspModel
- # 复用性且公用较高封装代码在这
- class CommonService:
- # 生成随机数
- @staticmethod
- def encrypt_data(randomlength=8, number=False):
- str = ''
- if number == False:
- characterSet = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsT' \
- 'tUuVvWwXxYyZz0123456789'
- else:
- characterSet = '0123456789'
- length = len(characterSet) - 1
- random = Random()
- for index in range(randomlength):
- str += characterSet[random.randint(0, length)]
- return str
- # 加密
- # @staticmethod
- def encrypt_pwd(self, userPwd):
- for i in range(1, 4):
- if i == 1:
- userPwd = self.RandomStr(3, False)+userPwd+self.RandomStr(3, False)
- userPwd = base64.b64encode(str(userPwd).encode("utf-8")).decode('utf8')
- if i == 2:
- userPwd = self.RandomStr(2, False)+str(userPwd)+self.RandomStr(2, False)
- userPwd = base64.b64encode(str(userPwd).encode("utf-8")).decode('utf8')
- if i == 3:
- userPwd = self.RandomStr(1, False)+str(userPwd)+self.RandomStr(1, False)
- userPwd = base64.b64encode(str(userPwd).encode("utf-8")).decode('utf8')
- return userPwd
- # 解密
- @staticmethod
- def decode_pwd(password):
- for i in range(1, 4):
- if i == 1:
- # 第一次先解密
- password = base64.b64decode(password)
- password = password.decode('utf-8')
- # 截去第一位,最后一位
- password = password[1:-1]
- if i == 2:
- # 第2次先解密
- password = base64.b64decode(password)
- password = password.decode('utf-8')
- # 去前2位,后2位
- password = password[2:-2]
- if i == 3:
- # 第3次先解密
- password = base64.b64decode(password)
- password = password.decode('utf-8')
- # 去前3位,后3位
- password = password[3:-3]
- return password
- # 生成随机字符串
- @staticmethod
- def RandomStr(randomlength=8, number=False):
- str = ''
- if number == False:
- characterSet = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsT' \
- 'tUuVvWwXxYyZz0123456789'
- else:
- characterSet = '0123456789'
- length = len(characterSet) - 1
- random = Random()
- for index in range(randomlength):
- str += characterSet[random.randint(0, length)]
- return str
- @staticmethod
- def query_serial_with_uid(uid):
- # 根据uid查询序列号,存在则返回序列号,否则返uid
- uid_qs = UidRtspModel.objects.filter(uid=uid).values('serial_number')
- if uid_qs.exists():
- serial_number = uid_qs[0]['serial_number']
- if serial_number:
- return serial_number
- return uid
- @staticmethod
- def req_publish_mqtt_msg(identification_code, topic_name, msg, qos=1):
- """
- 通用发布MQTT消息函数
- @param identification_code: 标识码
- @param topic_name: 主题名
- @param msg: 消息内容
- @param qos: mqtt qos等级
- @return: boolean
- """
- if not all([identification_code, topic_name]):
- return False
- if identification_code.endswith('11L'):
- thing_name = 'LC_' + identification_code
- else:
- thing_name = 'Ansjer_Device_' + identification_code
- try:
- # 获取数据组织将要请求的url
- iot = iotdeviceInfoModel.objects.filter(
- thing_name=thing_name).values(
- 'endpoint', 'token_iot_number')
- if not iot.exists():
- return False
- endpoint = iot[0]['endpoint']
- Token = iot[0]['token_iot_number']
- # api doc: https://docs.aws.amazon.com/zh_cn/iot/latest/developerguide/http.html
- # url: https://IoT_data_endpoint/topics/url_encoded_topic_name?qos=1
- # post请求url发布MQTT消息
- url = 'https://{}/topics/{}?qos={}'.format(endpoint, topic_name, qos)
- authorizer_name = 'Ansjer_Iot_Auth'
- signature = CommonService.rsa_sign(Token) # Token签名
- headers = {
- 'x-amz-customauthorizer-name': authorizer_name,
- 'Token': Token,
- 'x-amz-customauthorizer-signature': signature}
- r = requests.post(url=url, headers=headers, json=msg, timeout=2)
- if r.status_code == 200:
- res = r.json()
- if res['message'] == 'OK':
- return True
- return False
- else:
- return False
- except Exception as e:
- return False
- @staticmethod
- def rsa_sign(Token):
- # 私钥签名Token
- if not Token:
- return ''
- private_key_file = '''-----BEGIN RSA PRIVATE KEY-----
- MIIEpQIBAAKCAQEA5iJzEDPqtGmFMggekVro6C0lrjuC2BjunGkrFNJWpDYzxCzE
- X5jf4/Fq7hcIaQd5sqHugDxPVollSLPe9zNilbrd0sZfU+Ed8gRVuKW9KwfE9XFr
- L0pt6bKRQ0IIRfiZ9TuR0tsQysvcO1GZSXcYfPue3tGM1zOnWFThWDqZ06+sOxzt
- RMRl4yNfbpCG4MfxG3itNXOfrjZv2OMLSXrxmzubSvRpUYSvQPs4fm9302SAnySY
- 0MKzx6H6528ZQm/IDDSZy6EmNBIyTRDfxC56vnYcXvqedAQh7jJnjdvt6Q4MhASH
- eIYi1FBSdu2NT6wgpnrqXzx5pq9kR/lnsLID0wIDAQABAoIBAQCiF4GT1/1oNSpr
- ouxk1PNXFPWFUsVGD8mAwVJmx//eiY7MjfuCmdqYYmI+cFqsH2fIOeYSzGfVO9Dq
- 9EYHN1oovAWhf7eFDPpajFMUSyiCNmazub8VAAeKowtNpCTPo9pMsDh1m3aoYA4u
- ebrN0+Sbo16y8kWRDgDAZoiR7DSMs8lczk16hwfv5mw8XpNDbaL3Coi4Koe2S1Yh
- 2SX3vWFlpd7qF1ZYXuZIp+b8JPrV7n9eUKoFgzj0gqgwQK80CoexIjiOrNMPvkQa
- q+8kCvFjAzKxOK7e8gjM8lMRiGodb61kmYZkkJzFwWO4EaGbl34lfVECd1Ixp3tF
- be0OWAGBAoGBAPSteXDzzToD8ovM7LL11x0jWwI6HOiHu89kZtW566rIezjWBuA2
- TxrcYKM3h9jQRXS3CsMdoIv6XGk5lqM8ADtjn23FBWe/THYLh8bm8JOgh5RRWQDg
- SvkLfi9Ih2mM4NJfmuuDOh3Nze2efLM7+kOZWUQwF2Zx9mL5jvRBk351AoGBAPDI
- sYmT2Li+i5+0vykA2m5uPF8ZOW8BGtAfCZv0suW7BNzSgin78g9WapRd/4p0NNiL
- /nVMqPPCpd1akCUpV+GDWQt0hV+HZjxANE0KWhciQRyo2qvo51j8SWILJSgh0tXC
- aTF8qt6oGw3VN3m57vKhbrlDaz0J/NDJFci6msAnAoGBAOuG6bXPGijUj+//DYKf
- n7jOxdZ49kboEePrtAncdHzri6IEdI3z+WXT6bpzw/LzWUimwldb96WHFNm9s8Hi
- Ch8hIODbnP5naUTgiIzw1XhmONyPCewL/F+LrqX5XVA/alNX8JrwsUrrR2WLAGLQ
- Q3I69XDsEjptTU2tCO0bCs3ZAoGBAJ2lCHfm0JHET230zONvp5N9oREyVqQSuRdh
- +syc3TQDyh85w/bw+X6JOaaCFHj1tFPC9Iqf8k4GNspCLPXnp54CfR4+38O3xnvU
- HWoDSRC0YKT++IxtJGriYrlKSr2Hx54kdvLriIPW1D+uRW/xCDza7L9nIKMKEvgv
- b4/IfOEpAoGAeKM9Te7T1VzlAkS0CJOwanzwYV/zrex84WuXxlsGgPQ871lTs5AP
- H1QLfLfFXH+UVrCEC2yv4eml/cqFkpB3gE5i4MQ8GPVIOSs5tsIyl8YUA03vdNdB
- GCqvlyw5dfxNA+EtxNE2wCW/LW7ENJlACgcfgPlBZtpLheWoZB/maw4=
- -----END RSA PRIVATE KEY-----'''
- # 使用密钥文件方式
- # private_key_file_path = os.path.join(BASE_DIR, 'static/iotCore/private.pem')#.replace('\\', '/')
- # private_key_file = open(private_key_file_path, 'r')
- private_key = ct.load_privatekey(ct.FILETYPE_PEM, private_key_file)
- signature = ct.sign(private_key, Token.encode('utf8'), 'sha256')
- signature = encodebytes(signature).decode('utf8').replace('\n', '')
- # print('signature:', signature)
- return signature
|