123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319 |
- # -*- coding: utf-8 -*-
- import base64
- import datetime
- import os
- import time
- import hashlib
- from pathlib import Path
- from random import Random
- import ipdb
- import simplejson as json
- from boto3 import Session
- from django.core import serializers
- from django.utils import timezone
- from pyipip import IPIPDatabase
- from AnsjerPush.config import BASE_DIR, ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION_NAME, PUSH_BUCKET
- # 复用性且公用较高封装代码在这
- class CommonService:
- # 添加模糊搜索
- @staticmethod
- def get_kwargs(data={}):
- kwargs = {}
- for (k, v) in data.items():
- if v is not None and v != u'':
- kwargs[k + '__icontains'] = v
- return kwargs
- # 定义静态方法
- # 格式化query_set转dict
- @staticmethod
- def qs_to_dict(query_set):
- sqlJSON = serializers.serialize('json', query_set)
- sqlList = json.loads(sqlJSON)
- sqlDict = dict(zip(["datas"], [sqlList]))
- return sqlDict
- # 获取文件大小
- @staticmethod
- def get_file_size(file_path='', suffix_type='', decimal_point=0):
- # for x in ['bytes', 'KB', 'MB', 'GB', 'TB']:
- # path = Path() / 'D:/TestServer/123444.mp4'
- path = Path() / file_path
- size = path.stat().st_size
- mb_size = 0.0
- if suffix_type == 'MB':
- mb_size = size / 1024.0 / 1024.0
- if decimal_point != 0:
- mb_size = round(mb_size, decimal_point)
- return mb_size
- @staticmethod
- def get_param_flag(data=[]):
- # print(data)
- flag = True
- for v in data:
- if v is None:
- flag = False
- break
- return flag
- @staticmethod
- def get_ip_address(request):
- """
- 获取ip地址
- :param request:
- :return:
- """
- try:
- real_ip = request.META['HTTP_X_FORWARDED_FOR']
- clientIP = real_ip.split(",")[0]
- except:
- try:
- clientIP = request.META['REMOTE_ADDR']
- except Exception as e:
- clientIP = ''
- return clientIP
- # @获取一天每个小时的datetime.datetime
- @staticmethod
- def getTimeDict(times):
- time_dict = {}
- t = 0
- for x in range(24):
- if x < 10:
- x = '0' + str(x)
- else:
- x = str(x)
- a = times.strftime("%Y-%m-%d") + " " + x + ":00:00"
- time_dict[t] = timezone.datetime.strptime(a, '%Y-%m-%d %H:%M:%S')
- t += 1
- return time_dict
- # 根据ip获取地址
- @staticmethod
- def getAddr(ip):
- base_dir = BASE_DIR
- # ip数据库
- db = IPIPDatabase(base_dir + '/DB/17monipdb.dat')
- addr = db.lookup(ip)
- ts = addr.split('\t')[0]
- return ts
- # 通过ip检索ipip指定信息 lang为CN或EN
- @staticmethod
- def getIpIpInfo(ip, lang, update=False):
- ipbd_dir = BASE_DIR + "/DB/mydata4vipday2.ipdb"
- db = ipdb.City(ipbd_dir)
- if update:
- from var_dump import var_dump
- var_dump('is_update')
- rr = db.reload(ipbd_dir)
- var_dump(rr)
- info = db.find_map(ip, lang)
- return info
- @staticmethod
- def getUserID(userPhone='13800138000', getUser=True, setOTAID=False, μs=True):
- if μs == True:
- if getUser == True:
- timeID = str(round(time.time() * 1000000))
- userID = timeID + userPhone
- return userID
- else:
- if setOTAID == False:
- timeID = str(round(time.time() * 1000000))
- ID = userPhone + timeID
- return ID
- else:
- timeID = str(round(time.time() * 1000000))
- eID = '13800' + timeID + '138000'
- return eID
- else:
- if getUser == True:
- timeID = str(round(time.time() * 1000))
- userID = timeID + userPhone
- return userID
- else:
- if setOTAID == False:
- timeID = str(round(time.time() * 1000))
- ID = userPhone + timeID
- return ID
- else:
- timeID = str(round(time.time() * 1000))
- eID = '13800' + timeID + '138000'
- return eID
- # 生成随机数
- @staticmethod
- def RandomStr(randomlength=8, number=True):
- str = ''
- if number == False:
- characterSet = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsT' \
- 'tUuVvWwXxYyZz0123456789'
- else:
- characterSet = '0123456789'
- length = len(characterSet) - 1
- random = Random()
- for index in range(randomlength):
- str += characterSet[random.randint(0, length)]
- return str
- # 生成订单好
- @staticmethod
- def createOrderID():
- random_id = CommonService.RandomStr(6, True)
- order_id = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + str(random_id)
- print('orderID:')
- print(order_id)
- return order_id
- # qs转换list datetime处理
- @staticmethod
- def qs_to_list(qs):
- res = []
- # print(qs)
- for ps in qs:
- if 'add_time' in ps:
- ps['add_time'] = ps['add_time'].strftime("%Y-%m-%d %H:%M:%S")
- if 'update_time' in ps:
- ps['update_time'] = ps['update_time'].strftime("%Y-%m-%d %H:%M:%S")
- if 'end_time' in ps:
- ps['end_time'] = ps['end_time'].strftime("%Y-%m-%d %H:%M:%S")
- if 'data_joined' in ps:
- if ps['data_joined']:
- ps['data_joined'] = ps['data_joined'].strftime("%Y-%m-%d %H:%M:%S")
- else:
- ps['data_joined'] = ''
- res.append(ps)
- return res
- # 获取当前时间
- @staticmethod
- def get_now_time_str(n_time, tz, lang):
- print(n_time)
- print(tz)
- print(lang)
- try:
- tz = tz.replace(':', '.')
- n_time = int(n_time) + 3600 * float(tz)
- except:
- n_time = int(n_time)
- if lang == 'cn':
- return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(int(n_time)))
- else:
- return time.strftime('%m-%d-%Y %H:%M:%S', time.gmtime(int(n_time)))
- @staticmethod
- def app_log_log(uid='None', tz='0'):
- file_path = '/'.join((BASE_DIR, 'static/app_log.log'))
- file = open(file_path, 'a+')
- file.write("uid:" + uid + "; " + "; tz:" + tz)
- file.write('\n')
- file.flush()
- file.close()
- @classmethod
- def upload_images(cls, file_dict, dir_path):
- """
- 上传图片到S3,并删除本地图片
- @param file_dict: S3图片路径
- @param dir_path: 本地图片路径
- @return: boolean
- """
- try:
- s3 = Session(
- aws_access_key_id=ACCESS_KEY_ID,
- aws_secret_access_key=SECRET_ACCESS_KEY,
- region_name=REGION_NAME
- ).resource('s3')
- for file_path, upload_path in file_dict.items():
- upload_data = open(file_path, 'rb')
- s3.Bucket(PUSH_BUCKET).put_object(Key=upload_path, Body=upload_data)
- # 删除图片
- cls.del_path(dir_path)
- cls.del_path(dir_path + '.jpg')
- return True
- except Exception as e:
- print(repr(e))
- return False
- @classmethod
- def del_path(cls, path):
- """
- 删除目录文件
- @param path: 文件路径
- @return: None
- """
- if not os.path.exists(path):
- return
- if os.path.isfile(path):
- os.remove(path)
- else:
- items = os.listdir(path)
- for f in items:
- c_path = os.path.join(path, f)
- if os.path.isdir(c_path):
- cls.del_path(c_path)
- else:
- os.remove(c_path)
- os.rmdir(path)
- @staticmethod
- def getMD5Sign(data, key):
- '''
- 魅族MD5签名
- '''
- dataList = []
- for k in sorted(data):
- dataList.append("%s=%s" % (k, data[k]))
- data = (''.join(dataList))
- data = data + key
- sign = hashlib.md5(data.encode(encoding="utf-8")).hexdigest()
- return sign
- @staticmethod
- def check_time_stamp_token(token, time_stamp):
- # 时间戳token校验
- if not all([token, time_stamp]):
- return False
- try:
- token = int(CommonService.decode_data(token))
- time_stamp = int(time_stamp)
- now_time = int(time.time())
- distance = now_time - time_stamp
- if token != time_stamp or distance > 60000 or distance < -60000: # 为了全球化时间控制在一天内
- return False
- return True
- except Exception as e:
- print(e)
- return False
- @staticmethod
- def decode_data(content, start=1, end=4):
- """
- 数据解密
- @param content: 数据内容
- @param start: 起始长度
- @param end: 结束长度
- @return content: 解密的数据
- """
- if not content:
- return ''
- for i in range(start, end):
- content = base64.b64decode(content)
- content = content.decode('utf-8')
- content = content[i:-i]
- return content
|