# -*- coding: utf-8 -*-
import base64
import datetime
import hashlib
import os
import time
from base64 import encodebytes
from pathlib import Path
from random import Random

import ipdb
import OpenSSL.crypto as ct
import requests
import simplejson as json
from boto3 import Session
from django.core import serializers
from django.utils import timezone
from pyipip import IPIPDatabase

from AnsjerPush.config import BASE_DIR, ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION_NAME, PUSH_BUCKET, \
    SYS_EVENT_TYPE_LIST, DATA_PUSH_EVENT_TYPE_LIST
from Model.models import iotdeviceInfoModel
from Object.enums.EventTypeEnum import EventTypeEnumObj


# Highly reusable, widely shared helper code lives here.
class CommonService:
    """Namespace of stateless helper functions shared across the project."""

    @staticmethod
    def get_kwargs(data=None):
        """
        Build fuzzy-search filter kwargs.
        Every key whose value is neither None nor the empty string is mapped
        to ``<key>__icontains``.
        @param data: mapping of field name -> search value (None means empty)
        @return: dict of ``<field>__icontains`` -> value
        """
        if data is None:
            data = {}
        return {
            k + '__icontains': v
            for k, v in data.items()
            if v is not None and v != u''
        }

    @staticmethod
    def qs_to_dict(query_set):
        """
        Serialize a query set to ``{"datas": [...]}``.
        @param query_set: object accepted by django's ``serializers.serialize``
        @return: dict with the single key ``datas`` holding the parsed JSON list
        """
        serialized = serializers.serialize('json', query_set)
        return {"datas": json.loads(serialized)}

    @staticmethod
    def get_file_size(file_path='', suffix_type='', decimal_point=0):
        """
        Get the size of a file.
        Only ``suffix_type == 'MB'`` is implemented; any other value yields 0.0.
        The file is stat'ed in every case, so a missing file raises.
        @param file_path: path of the file to inspect
        @param suffix_type: unit of the result; only 'MB' is supported
        @param decimal_point: digits to round to; 0 means no rounding
        @return: size in megabytes, or 0.0 for an unsupported unit
        """
        size = Path(file_path).stat().st_size
        mb_size = 0.0
        if suffix_type == 'MB':
            mb_size = size / 1024.0 / 1024.0
        if decimal_point != 0:
            mb_size = round(mb_size, decimal_point)
        return mb_size

    @staticmethod
    def get_param_flag(data=None):
        """
        Check that no item of ``data`` is None.
        @param data: iterable of values to check (None means empty)
        @return: True if every item is not None (also for empty input)
        """
        if data is None:
            data = []
        return all(v is not None for v in data)

    @staticmethod
    def get_ip_address(request):
        """
        Get the client ip address.
        The first entry of ``HTTP_X_FORWARDED_FOR`` wins; ``REMOTE_ADDR`` is
        the fallback.
        @param request: object exposing a ``META`` mapping
        @return: ip address string, '' when neither header can be read
        """
        try:
            return request.META['HTTP_X_FORWARDED_FOR'].split(",")[0]
        except Exception:
            # Header missing or unusable: fall back to REMOTE_ADDR below.
            pass
        try:
            return request.META['REMOTE_ADDR']
        except Exception:
            return ''

    @staticmethod
    def getTimeDict(times):
        """
        Get a datetime for every full hour of the day of ``times``.
        @param times: object with ``strftime`` giving the day
        @return: dict mapping 0..23 to the naive datetime of that hour
        """
        day = times.strftime("%Y-%m-%d")
        return {
            hour: datetime.datetime.strptime(
                '%s %02d:00:00' % (day, hour), '%Y-%m-%d %H:%M:%S')
            for hour in range(24)
        }

    @staticmethod
    def getAddr(ip):
        """
        Look up the address of an ip in the 17monipdb database.
        @param ip: ip address
        @return: first tab-separated field of the lookup result
        """
        # ip database
        db = IPIPDatabase(BASE_DIR + '/DB/17monipdb.dat')
        return db.lookup(ip).split('\t')[0]

    @staticmethod
    def getIpIpInfo(ip, lang, update=False):
        """
        Look up ipip information for an ip.
        @param ip: ip address
        @param lang: result language, 'CN' or 'EN'
        @param update: reload the database file before the lookup
        @return: result of ``find_map``
        """
        ipdb_path = BASE_DIR + "/DB/mydata4vipday2.ipdb"
        db = ipdb.City(ipdb_path)
        if update:
            db.reload(ipdb_path)
        return db.find_map(ip, lang)

    @staticmethod
    def getUserID(userPhone='13800138000', getUser=True, setOTAID=False, μs=True):
        """
        Generate an id from the current time and a phone number.
        @param userPhone: phone number string mixed into the id
        @param getUser: when equal to True return ``<time><userPhone>``
        @param setOTAID: when equal to False (and getUser is not True) return
                         ``<userPhone><time>``, otherwise ``13800<time>138000``
        @param μs: when equal to True use microseconds, otherwise milliseconds
        @return: generated id string
        """
        # The flags are compared by equality on purpose (legacy behaviour):
        # a non-bool value such as None takes the else branch of each test.
        scale = 1000000 if μs == True else 1000  # noqa: E712
        time_id = str(round(time.time() * scale))
        if getUser == True:  # noqa: E712
            return time_id + userPhone
        if setOTAID == False:  # noqa: E712
            return userPhone + time_id
        return '13800' + time_id + '138000'

    @staticmethod
    def RandomStr(randomlength=8, number=True):
        """
        Generate a random string.
        Not suitable for security-sensitive tokens (uses ``random.Random``).
        @param randomlength: length of the result
        @param number: when equal to False use letters and digits,
                       otherwise digits only
        @return: random string
        """
        # Equality comparison kept on purpose: None selects the digit set.
        if number == False:  # noqa: E712
            character_set = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsT' \
                            'tUuVvWwXxYyZz0123456789'
        else:
            character_set = '0123456789'
        rng = Random()
        return ''.join(rng.choice(character_set) for _ in range(randomlength))

    @staticmethod
    def createOrderID():
        """
        Generate an order id: current local time plus 6 random digits.
        @return: order id string
        """
        random_id = CommonService.RandomStr(6, True)
        order_id = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + str(random_id)
        print('orderID:')
        print(order_id)
        return order_id

    @staticmethod
    def qs_to_list(qs):
        """
        Convert query-set rows to a list, formatting datetime fields.
        The rows are modified in place.
        @param qs: iterable of dict-like rows
        @return: list of the same rows with time fields as strings
        """
        time_format = "%Y-%m-%d %H:%M:%S"
        res = []
        for ps in qs:
            for field in ('add_time', 'update_time', 'end_time'):
                if field in ps:
                    ps[field] = ps[field].strftime(time_format)
            if 'data_joined' in ps:
                # Only this field tolerates an empty value.
                joined = ps['data_joined']
                ps['data_joined'] = joined.strftime(time_format) if joined else ''
            res.append(ps)
        return res

    @staticmethod
    def get_now_time_str(n_time, tz, lang):
        """
        Format a timestamp shifted by a timezone offset.
        @param n_time: unix timestamp (anything ``int`` accepts)
        @param tz: offset in hours as a string; ignored if it cannot be parsed
        @param lang: 'cn' gives year-first format, anything else month-first
        @return: formatted time string
        """
        print(n_time)
        print(tz)
        print(lang)
        try:
            # NOTE(review): "5:30" becomes 5.30 hours, not 5.5 -- confirm the
            # tz format sent by callers before changing this.
            tz = tz.replace(':', '.')
            n_time = int(n_time) + 3600 * float(tz)
        except (AttributeError, TypeError, ValueError):
            n_time = int(n_time)
        if lang == 'cn':
            time_format = '%Y-%m-%d %H:%M:%S'
        else:
            time_format = '%m-%d-%Y %H:%M:%S'
        return time.strftime(time_format, time.gmtime(int(n_time)))

    @staticmethod
    def app_log_log(uid='None', tz='0'):
        """
        Append one line with uid and tz to ``static/app_log.log``.
        @param uid: uid string
        @param tz: timezone string
        @return: None
        """
        file_path = '/'.join((BASE_DIR, 'static/app_log.log'))
        # The context manager closes the file even if the write fails.
        with open(file_path, 'a+') as log_file:
            log_file.write("uid:" + uid + "; " + "; tz:" + tz)
            log_file.write('\n')

    @classmethod
    def upload_images(cls, file_dict, dir_path):
        """
        Upload images to S3, then delete the local images.
        @param file_dict: mapping of local file path -> S3 key
        @param dir_path: local image path; it and ``dir_path + '.jpg'`` are deleted
        @return: boolean, False if any step raised
        """
        try:
            s3 = Session(
                aws_access_key_id=ACCESS_KEY_ID,
                aws_secret_access_key=SECRET_ACCESS_KEY,
                region_name=REGION_NAME
            ).resource('s3')
            bucket = s3.Bucket(PUSH_BUCKET)
            for file_path, upload_path in file_dict.items():
                # Close every file once uploaded so no handle outlives the
                # call and the local delete below is not blocked.
                with open(file_path, 'rb') as upload_data:
                    bucket.put_object(Key=upload_path, Body=upload_data)
            # delete the local images
            cls.del_path(dir_path)
            cls.del_path(dir_path + '.jpg')
            return True
        except Exception as e:
            print(repr(e))
            return False

    @classmethod
    def del_path(cls, path):
        """
        Delete a file, or a directory with everything in it.
        @param path: file or directory path; a missing path is ignored
        @return: None
        """
        if not os.path.exists(path):
            return
        if os.path.isfile(path):
            os.remove(path)
            return
        for name in os.listdir(path):
            child = os.path.join(path, name)
            if os.path.isdir(child):
                cls.del_path(child)
            else:
                os.remove(child)
        os.rmdir(path)

    @staticmethod
    def getMD5Sign(data, key):
        """
        Meizu MD5 signature.
        Concatenates ``k=v`` for the keys in sorted order (no separator),
        appends ``key`` and returns the hex MD5 of the utf-8 bytes.
        @param data: mapping of parameters to sign
        @param key: secret appended to the parameter string
        @return: hex digest string
        """
        payload = ''.join("%s=%s" % (k, data[k]) for k in sorted(data)) + key
        return hashlib.md5(payload.encode(encoding="utf-8")).hexdigest()

    @staticmethod
    def check_time_stamp_token(token, time_stamp):
        """
        Validate a timestamp token.
        @param token: encoded token, decoded with ``decode_data``
        @param time_stamp: timestamp the token must decode to
        @return: boolean
        """
        if not all([token, time_stamp]):
            return False
        try:
            token = int(CommonService.decode_data(token))
            time_stamp = int(time_stamp)
            distance = int(time.time()) - time_stamp
            # Wide tolerance (60000 s, about 16.7 hours) for global use.
            if token != time_stamp or abs(distance) > 60000:
                return False
            return True
        except Exception as e:
            print(e)
            return False

    @staticmethod
    def decode_data(content, start=1, end=4):
        """
        Decode data.
        For each i in ``range(start, end)``: base64-decode, utf-8 decode, then
        strip i characters from both ends.
        @param content: data content
        @param start: first strip length
        @param end: stop value of the strip length (exclusive)
        @return content: decoded data, '' for empty input
        """
        if not content:
            return ''
        for i in range(start, end):
            content = base64.b64decode(content).decode('utf-8')
            content = content[i:-i]
        return content

    @staticmethod
    def timestamp_to_str(timestamp):
        """
        Convert a timestamp to a local-time string.
        @param timestamp: unix timestamp
        @return time_str: time string, ``%Y-%m-%d %H:%M:%S``
        """
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))

    @staticmethod
    def req_publish_mqtt_msg(identification_code, topic_name, msg, qos=1):
        """
        Generic function publishing an MQTT message over HTTPS.
        @param identification_code: identification code
        @param topic_name: topic name
        @param msg: message content, sent as JSON body
        @param qos: mqtt qos level
        @return: boolean, True only for HTTP 200 with message 'OK'
        """
        if not all([identification_code, topic_name]):
            return False

        if identification_code.endswith('11L'):
            thing_name = 'LC_' + identification_code
        else:
            thing_name = 'Ansjer_Device_' + identification_code

        try:
            # Fetch the data needed to build the request url.
            iot = iotdeviceInfoModel.objects.filter(
                thing_name=thing_name).values(
                'endpoint', 'token_iot_number')
            if not iot.exists():
                return False
            row = iot[0]
            endpoint = row['endpoint']
            token = row['token_iot_number']

            # api doc: https://docs.aws.amazon.com/zh_cn/iot/latest/developerguide/http.html
            # url: https://IoT_data_endpoint/topics/url_encoded_topic_name?qos=1
            # POST to the url to publish the MQTT message.
            url = 'https://{}/topics/{}?qos={}'.format(endpoint, topic_name, qos)
            headers = {
                'x-amz-customauthorizer-name': 'Ansjer_Iot_Auth',
                'Token': token,
                # signature of the token
                'x-amz-customauthorizer-signature': CommonService.rsa_sign(token),
            }
            r = requests.post(url=url, headers=headers, json=msg, timeout=2)
            if r.status_code != 200:
                return False
            return r.json()['message'] == 'OK'
        except Exception as e:
            # Best effort: report the failure instead of hiding it.
            print(repr(e))
            return False

    @staticmethod
    def rsa_sign(Token):
        """
        Sign the token with the private key (sha256, base64 encoded).
        @param Token: token string
        @return: base64 signature without newlines, '' for an empty token
        """
        if not Token:
            return ''
        # SECURITY NOTE(review): a private key is embedded in source code.
        # It should be loaded from a protected file or secret store (see the
        # key-file variant below) and this key rotated.
        private_key_file = '''-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEA5iJzEDPqtGmFMggekVro6C0lrjuC2BjunGkrFNJWpDYzxCzE
X5jf4/Fq7hcIaQd5sqHugDxPVollSLPe9zNilbrd0sZfU+Ed8gRVuKW9KwfE9XFr
L0pt6bKRQ0IIRfiZ9TuR0tsQysvcO1GZSXcYfPue3tGM1zOnWFThWDqZ06+sOxzt
RMRl4yNfbpCG4MfxG3itNXOfrjZv2OMLSXrxmzubSvRpUYSvQPs4fm9302SAnySY
0MKzx6H6528ZQm/IDDSZy6EmNBIyTRDfxC56vnYcXvqedAQh7jJnjdvt6Q4MhASH
eIYi1FBSdu2NT6wgpnrqXzx5pq9kR/lnsLID0wIDAQABAoIBAQCiF4GT1/1oNSpr
ouxk1PNXFPWFUsVGD8mAwVJmx//eiY7MjfuCmdqYYmI+cFqsH2fIOeYSzGfVO9Dq
9EYHN1oovAWhf7eFDPpajFMUSyiCNmazub8VAAeKowtNpCTPo9pMsDh1m3aoYA4u
ebrN0+Sbo16y8kWRDgDAZoiR7DSMs8lczk16hwfv5mw8XpNDbaL3Coi4Koe2S1Yh
2SX3vWFlpd7qF1ZYXuZIp+b8JPrV7n9eUKoFgzj0gqgwQK80CoexIjiOrNMPvkQa
q+8kCvFjAzKxOK7e8gjM8lMRiGodb61kmYZkkJzFwWO4EaGbl34lfVECd1Ixp3tF
be0OWAGBAoGBAPSteXDzzToD8ovM7LL11x0jWwI6HOiHu89kZtW566rIezjWBuA2
TxrcYKM3h9jQRXS3CsMdoIv6XGk5lqM8ADtjn23FBWe/THYLh8bm8JOgh5RRWQDg
SvkLfi9Ih2mM4NJfmuuDOh3Nze2efLM7+kOZWUQwF2Zx9mL5jvRBk351AoGBAPDI
sYmT2Li+i5+0vykA2m5uPF8ZOW8BGtAfCZv0suW7BNzSgin78g9WapRd/4p0NNiL
/nVMqPPCpd1akCUpV+GDWQt0hV+HZjxANE0KWhciQRyo2qvo51j8SWILJSgh0tXC
aTF8qt6oGw3VN3m57vKhbrlDaz0J/NDJFci6msAnAoGBAOuG6bXPGijUj+//DYKf
n7jOxdZ49kboEePrtAncdHzri6IEdI3z+WXT6bpzw/LzWUimwldb96WHFNm9s8Hi
Ch8hIODbnP5naUTgiIzw1XhmONyPCewL/F+LrqX5XVA/alNX8JrwsUrrR2WLAGLQ
Q3I69XDsEjptTU2tCO0bCs3ZAoGBAJ2lCHfm0JHET230zONvp5N9oREyVqQSuRdh
+syc3TQDyh85w/bw+X6JOaaCFHj1tFPC9Iqf8k4GNspCLPXnp54CfR4+38O3xnvU
HWoDSRC0YKT++IxtJGriYrlKSr2Hx54kdvLriIPW1D+uRW/xCDza7L9nIKMKEvgv
b4/IfOEpAoGAeKM9Te7T1VzlAkS0CJOwanzwYV/zrex84WuXxlsGgPQ871lTs5AP
H1QLfLfFXH+UVrCEC2yv4eml/cqFkpB3gE5i4MQ8GPVIOSs5tsIyl8YUA03vdNdB
GCqvlyw5dfxNA+EtxNE2wCW/LW7ENJlACgcfgPlBZtpLheWoZB/maw4=
-----END RSA PRIVATE KEY-----'''
        # Key-file variant:
        # private_key_file_path = os.path.join(BASE_DIR, 'static/iotCore/private.pem')#.replace('\\', '/')
        # private_key_file = open(private_key_file_path, 'r')

        private_key = ct.load_privatekey(ct.FILETYPE_PEM, private_key_file)
        # NOTE(review): OpenSSL.crypto.sign appears to be deprecated in recent
        # pyOpenSSL releases -- confirm against the pinned version.
        raw_signature = ct.sign(private_key, Token.encode('utf8'), 'sha256')
        # encodebytes inserts newlines; the header value must be one line.
        return encodebytes(raw_signature).decode('utf8').replace('\n', '')

    @staticmethod
    def get_jump_type(event_type):
        """
        Get the jump type of an event type.
        @param event_type: event type (anything ``int`` accepts)
        @return jump_type: 1: push message, 2: system message,
                           3: audio/video call message
        """
        event_type = int(event_type)
        if event_type in SYS_EVENT_TYPE_LIST:
            return 2
        if event_type in DATA_PUSH_EVENT_TYPE_LIST:
            return 3
        return 1