# -*- encoding: utf-8 -*-
"""
@File    : DevicePushService.py
@Time    : 2022/11/23 11:40
@Author  : stephen
@Email   : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import hashlib
import json
import logging
import os
import threading
import time

import apns2
import jpush as jpush
import requests
from pyfcm import FCMNotification

from AnsjerPush.config import CONFIG_INFO, CONFIG_CN
from AnsjerPush.config import JPUSH_CONFIG, FCM_CONFIG, APNS_CONFIG, BASE_DIR, APNS_MODE, XMPUSH_CONFIG, OPPOPUSH_CONFIG
from Model.models import UidPushModel, SysMsgModel, DeviceSharePermission, DeviceChannelUserSet, \
    DeviceChannelUserPermission
from Object.ETkObject import ETkObject
from Object.UidTokenObject import UidTokenObject
from Object.utils import LocalDateTimeUtil
from Service.CommonService import CommonService
from Service.EquipmentInfoService import EquipmentInfoService
from Service.HuaweiPushService.HuaweiPushService import HuaweiPushObject
from Service.PushService import PushObject

LOGGING = logging.getLogger('info')


class DevicePushService:
    """Device alarm push: builds messages, stores them and dispatches APP pushes."""

    # Seconds to wait on vendor HTTP endpoints. Without a timeout a stalled
    # vendor server would block the calling worker indefinitely.
    _HTTP_TIMEOUT = 10

    # push_type -> name of the result-code key reported for that vendor.
    # push_type 3 (Huawei) has no result code in this module.
    _PUSH_CODE_KEYS = {
        0: 'do_apns_code',
        1: 'do_fcm_code',
        2: 'do_jpush_code',
        4: 'do_xmpush_code',
        5: 'do_vivopush_code',
        6: 'do_oppopush_code',
        7: 'do_meizupush_code',
    }

    @classmethod
    def _empty_push_codes(cls):
        """Return a fresh result dict holding an empty code for every vendor key."""
        return dict.fromkeys(cls._PUSH_CODE_KEYS.values(), '')

    @staticmethod
    def decode_uid(etk, uidToken):
        """
        Decode the device UID, preferring ``etk`` and falling back to ``uidToken``.

        @param etk: encrypted token; used when truthy
        @param uidToken: UID token; used only when ``etk`` is falsy
        @return: the UID exposed by the decoding object
        """
        if etk:
            uid = ETkObject(etk).uid
        else:
            uid = UidTokenObject(uidToken).UID
        LOGGING.info('消息推送-当前UID:{}'.format(uid))
        return uid

    @classmethod
    def query_uid_push(cls, uid):
        """
        Query the push rows joined with their uid_set for a device.

        Only rows whose uid_set has ``detect_status=1`` are selected.

        @param uid: device UID
        @return: a ``values()`` queryset of dict rows
        """
        uid_push_qs = UidPushModel.objects.filter(uid_set__uid=uid, uid_set__detect_status=1). \
            values('token_val', 'app_type', 'appBundleId', 'm_code', 'push_type', 'userID_id',
                   'userID__NickName', 'lang', 'tz', 'uid_set__nickname', 'uid_set__detect_interval',
                   'uid_set__detect_group', 'uid_set__channel', 'uid_set__ai_type',
                   'uid_set__new_detect_interval')
        return uid_push_qs

    @staticmethod
    def cache_uid_push(uid_push_qs):
        """
        Materialise the uid_set/uid_push rows into a list.

        @param uid_push_qs: iterable of uid_set & uid_push rows
        @return: uid_set_list, a list holding every row of ``uid_push_qs``
        """
        return list(uid_push_qs)

    @staticmethod
    def cache_push_detect_interval(redis_obj, name, detect_interval, new_detect_interval):
        """
        Cache a marker key that expires shortly before the push interval ends.

        When the active config is not ``CONFIG_CN`` a positive
        ``new_detect_interval`` replaces ``detect_interval``. The interval is
        then raised to at least 60 seconds and the key is stored with an
        expiry of ``interval - 5`` seconds.

        @param redis_obj: redis helper exposing ``set_data``
        @param name: redis key
        @param detect_interval: original push interval in seconds
        @param new_detect_interval: new push interval in seconds (may be None)
        """
        # Guard against a null new interval: ``None > 0`` raises TypeError.
        if CONFIG_INFO != CONFIG_CN and new_detect_interval and new_detect_interval > 0:
            detect_interval = new_detect_interval
        detect_interval = max(detect_interval, 60)
        redis_obj.set_data(key=name, val=1, expire=detect_interval - 5)
        LOGGING.info('消息推送-缓存设置APP推送间隔:{}s'.format(detect_interval))

    @classmethod
    def save_msg_push(cls, uid_set_push_list, **params):
        """
        Send the APP push for every token and collect the rows to be stored.

        Keys read from ``params``: ``kwag_args``, ``nickname``, ``channel``,
        ``n_time``, ``event_type``, ``electricity``, ``is_sys_msg``, ``uid``,
        plus whatever ``send_app_msg_push`` and ``created_device_vo`` read.

        @param uid_set_push_list: rows as produced by ``query_uid_push``
        @return: dict with ``code_date`` (result codes of the last push that
                 returned a result), ``new_device_info_list``,
                 ``sys_msg_list`` and ``local_date_time``; ``None`` when any
                 step raised
        """
        new_device_info_list = []
        sys_msg_list = []
        kwag_args = params['kwag_args']
        # Start with every vendor key so consumers can index any push type.
        code_data = cls._empty_push_codes()
        local_date_time = ''
        # user id -> may receive the push. Remembered per user so a user with
        # several tokens is stored once and never inherits the permission
        # computed for a different user.
        permission_by_user = {}
        try:
            for up in uid_set_push_list:
                appBundleId = up['appBundleId']
                token_val = up['token_val']
                lang = up['lang']
                tz = up['tz']
                if tz is None or tz == '':
                    tz = 0

                # Push title and body.
                msg_title = cls.get_msg_title(nickname=params['nickname'])
                msg_text = cls.get_msg_text(channel=params['channel'], n_time=params['n_time'], lang=lang,
                                            tz=tz, event_type=params['event_type'],
                                            electricity=params['electricity'])
                kwag_args['appBundleId'] = appBundleId
                kwag_args['token_val'] = token_val
                kwag_args['msg_title'] = msg_title
                kwag_args['msg_text'] = msg_text
                LOGGING.info('推送要的数据: {}'.format(kwag_args))

                local_date_time = CommonService.get_now_time_str(n_time=params['n_time'], tz=tz, lang='cn')
                LOGGING.info('<<<<<根据时区计算后日期={},时区={}'.format(local_date_time, tz))
                # Keep only the first 10 characters (the date part).
                local_date_time = local_date_time[0:10]
                LOGGING.info('<<<<<切片后的日期={}'.format(local_date_time))

                # Storage: one record per user.
                userID_id = up["userID_id"]
                if userID_id not in permission_by_user:
                    allowed = True
                    now_time = int(time.time())
                    if params['is_sys_msg']:
                        sys_msg_text = cls.get_msg_text(channel=params['channel'], n_time=params['n_time'],
                                                        lang=lang, tz=tz, event_type=params['event_type'],
                                                        electricity=params['electricity'], is_sys=1)
                        sys_msg_list.append(SysMsgModel(userID_id=userID_id, msg=sys_msg_text,
                                                        addTime=now_time, updTime=now_time,
                                                        uid=params['uid'], eventType=params['event_type']))
                    else:
                        LOGGING.info('分表存数据start------')
                        params['userID_id'] = userID_id
                        allowed = cls.check_share_permission(userID_id, params['channel'], params['uid'])
                        if allowed:
                            new_device_info_list.append(cls.created_device_vo(local_date_time, **params))
                    permission_by_user[userID_id] = allowed

                if not permission_by_user[userID_id]:
                    continue

                params['appBundleId'] = appBundleId
                params['token_val'] = token_val
                params['lang'] = lang
                params['tz'] = tz
                params['kwag_args'] = kwag_args
                push_result = cls.send_app_msg_push(up['push_type'], **params)
                # send_app_msg_push returns None on failure; keep the last
                # good result instead of replacing it with None.
                if push_result is not None:
                    code_data = push_result
            # NOTE: 'code_date' (sic) is the key consumers read; do not rename.
            return {'code_date': code_data, 'new_device_info_list': new_device_info_list,
                    'sys_msg_list': sys_msg_list, 'local_date_time': local_date_time}
        except Exception as e:
            LOGGING.info('推送消息或存表异常: errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return None

    @classmethod
    def send_app_msg_push(cls, push_type, **param):
        """
        Dispatch one APP push through the vendor selected by ``push_type``.

        Nothing is sent when ``param['app_push']`` is truthy. For ``is_st``
        1 or 3 and push types 0/1/3 a picture push is started on a
        background thread and no result code is recorded.

        @param push_type: 0 apns, 1 fcm, 2 jpush, 3 huawei, 4 xiaomi,
                          5 vivo, 6 oppo, 7 meizu
        @return: dict of result codes keyed ``do_<vendor>_code``; ``None``
                 when an exception was raised
        """
        try:
            kwargs = param['kwag_args']
            result = cls._empty_push_codes()
            # Push only when app_push is empty.
            if not param['app_push']:
                LOGGING.info('APP准备推送:{}, {}'.format(param['uid'], param))
                # Pushes that display a picture.
                if param['is_st'] in (1, 3) and push_type in (0, 1, 3):
                    if param['is_st'] == 1:
                        key = '{}/{}/{}.jpeg'.format(param['uid'], param['channel'], param['n_time'])
                    else:
                        key = '{}/{}/{}_0.jpeg'.format(param['uid'], param['channel'], param['n_time'])
                    push_thread = threading.Thread(target=cls.async_send_picture_push, args=(
                        push_type, param['aws_s3_client'], param['bucket'], key, param['uid'],
                        param['appBundleId'], param['token_val'], param['event_type'], param['n_time'],
                        param['kwag_args']['msg_title'], param['kwag_args']['msg_text'], param['channel']))
                    push_thread.start()
                elif push_type == 0:  # ios apns
                    result['do_apns_code'] = cls.do_apns(**kwargs)
                elif push_type == 1:  # android fcm
                    result['do_fcm_code'] = cls.do_fcm(**kwargs)
                elif push_type == 2:  # android jpush
                    result['do_jpush_code'] = cls.do_jpush(**kwargs)
                elif push_type == 3:  # android huawei
                    huawei_push_object = HuaweiPushObject()
                    huawei_push_object.send_push_notify_message(**kwargs)
                elif push_type == 4:  # android xiaomi
                    channel_id = 104551
                    result['do_xmpush_code'] = cls.do_xmpush(channel_id=channel_id, **kwargs)
                elif push_type == 5:  # android vivo
                    result['do_vivopush_code'] = PushObject.android_vivopush(**kwargs)
                elif push_type == 6:  # android oppo
                    channel_id = 'DEVICE_REMINDER'
                    result['do_oppopush_code'] = cls.do_oppopush(channel_id=channel_id, **kwargs)
                elif push_type == 7:  # android meizu
                    result['do_meizupush_code'] = PushObject.android_meizupush(**kwargs)
            return result
        except Exception as e:
            LOGGING.info('异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return None

    @classmethod
    def save_sys_msg(cls, is_sys_msg, local_date_time, sys_msg_list, new_device_info_list):
        """
        Persist either the system messages or the device push records.

        @param is_sys_msg: truthy -> bulk-create ``sys_msg_list``; otherwise
                           store ``new_device_info_list``
        @param local_date_time: date string used to derive the week value
                                passed to the bulk-create helper
        @param sys_msg_list: SysMsgModel instances
        @param new_device_info_list: device push records; skipped when empty
        @return: always True
        """
        if is_sys_msg:
            SysMsgModel.objects.bulk_create(sys_msg_list)
        elif new_device_info_list:
            # Derive the week value from the date for the split-table insert.
            week = LocalDateTimeUtil.date_to_week(local_date_time)
            EquipmentInfoService.equipment_info_bulk_create(week, new_device_info_list)
            LOGGING.info('设备信息分表批量保存end------')
        return True

    @classmethod
    def created_device_vo(cls, local_date_time, **params):
        """
        Build the device push record object for one user.

        Keys read from ``params``: ``userID_id``, ``n_time``, ``event_type``,
        ``uid``, ``nickname``, ``channel``, ``is_st``, ``storage_location``.

        @param local_date_time: date string forwarded to the record factory
        @return: whatever ``EquipmentInfoService.get_equipment_info_obj`` returns
        """
        return EquipmentInfoService.get_equipment_info_obj(
            local_date_time,
            device_user_id=params['userID_id'],
            event_time=params['n_time'],
            event_type=params['event_type'],
            device_uid=params['uid'],
            device_nick_name=params['nickname'],
            channel=params['channel'],
            alarm='Motion \tChannel:{channel}'.format(channel=params['channel']),
            is_st=params['is_st'],
            receive_time=params['n_time'],
            add_time=int(time.time()),
            storage_location=params['storage_location'],
            border_coords='',
        )

    @staticmethod
    def get_msg_title(nickname):
        """
        Return the push title, which is the device nickname unchanged.
        """
        return nickname

    @staticmethod
    def get_msg_text(channel, n_time, lang, tz, event_type, electricity='', is_sys=0):
        """
        Build the push body text.

        @param channel: device channel shown in the text
        @param n_time: event time, formatted through ``CommonService.get_now_time_str``
        @param lang: 'cn' selects the Chinese wording, anything else English
        @param tz: timezone forwarded to the time formatter
        @param event_type: event code (converted with ``int``); 704 battery,
                           702 sleep, 703 wake, 606 call, others no label
        @param electricity: remaining battery text appended for event 704
        @param is_sys: truthy -> omit the date (system message wording)
        @return: formatted message text
        """
        n_date = CommonService.get_now_time_str(n_time=n_time, tz=tz, lang=lang)
        etype = int(event_type)
        if lang == 'cn':
            battery_prefix = '剩余电量 '
            labels = {702: '摄像头休眠', 703: '摄像头唤醒', 606: '有人呼叫,请点击查看'}
            sys_template = '{} 通道:{}'
            full_template = '{} 通道:{} 日期:{}'
        else:
            battery_prefix = 'Battery remaining '
            labels = {702: 'Camera sleep', 703: 'Camera wake',
                      606: 'Someone is calling, please click to view'}
            sys_template = '{} channel:{}'
            full_template = '{} channel:{} date:{}'

        if etype == 704:
            msg_type = '{}{}'.format(battery_prefix, electricity)
        else:
            msg_type = labels.get(etype, '')

        if is_sys:
            return sys_template.format(msg_type, channel)
        return full_template.format(msg_type, channel, n_date)

    @staticmethod
    def do_jpush(uid, channel, appBundleId, token_val, event_type, n_time, msg_title, msg_text):
        """
        Send an Android push through JPush.

        Credentials are read from ``JPUSH_CONFIG[appBundleId]``.

        @return: ``status_code`` of the JPush response
        """
        app_key = JPUSH_CONFIG[appBundleId]['Key']
        master_secret = JPUSH_CONFIG[appBundleId]['Secret']
        _jpush = jpush.JPush(app_key, master_secret)
        push = _jpush.create_push()
        push.audience = jpush.registration_id(token_val)
        push_data = {"alert": "Motion ", "event_time": n_time, "event_type": event_type, "msg": "",
                     "received_at": n_time, "sound": "sound.aif", "uid": uid, "zpush": "1", "channel": channel}
        android = jpush.android(alert=msg_text, priority=1, style=1, alert_type=7,
                                big_text=msg_text, title=msg_title, extras=push_data)
        push.notification = jpush.notification(android=android)
        push.platform = jpush.all_
        res = push.send()
        LOGGING.info('jpush response: {}'.format(res))
        return res.status_code

    @staticmethod
    def do_fcm(uid, channel, appBundleId, token_val, event_type, n_time, msg_title, msg_text):
        """
        Send an Android push through Firebase Cloud Messaging.

        @return: the pyfcm result, or the string 'serverKey abnormal' when no
                 server key can be read from ``FCM_CONFIG`` for ``appBundleId``
        """
        try:
            serverKey = FCM_CONFIG[appBundleId]
        except Exception as e:
            LOGGING.info('异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return 'serverKey abnormal'
        push_service = FCMNotification(api_key=serverKey)
        data = {"alert": "Motion ", "event_time": n_time, "event_type": event_type, "msg": "",
                "received_at": n_time, "sound": "sound.aif", "uid": uid, "zpush": "1", "channel": channel}
        result = push_service.notify_single_device(registration_id=token_val, message_title=msg_title,
                                                   message_body=msg_text, data_message=data,
                                                   extra_kwargs={
                                                       'default_vibrate_timings': True,
                                                       'default_sound': True,
                                                       'default_light_settings': True
                                                   })
        return result

    @staticmethod
    def do_apns(uid, channel, appBundleId, token_val, event_type, n_time, msg_title, msg_text):
        """
        Send an iOS push through APNs.

        The client certificate path comes from ``APNS_CONFIG[appBundleId]``,
        resolved relative to ``BASE_DIR``.

        @return: the APNs response status code; a fixed message string on
                 ValueError/ArithmeticError; ``repr(e)`` on any other exception
        """
        LOGGING.info("进来do_apns函数了")
        LOGGING.info(token_val)
        LOGGING.info(APNS_MODE)
        try:
            # Resolved inside the try so an unknown bundle id is reported
            # through the same error path as any other failure.
            pem_path = os.path.join(BASE_DIR, APNS_CONFIG[appBundleId]['pem_path'])
            LOGGING.info(pem_path)
            cli = apns2.APNSClient(mode=APNS_MODE, client_cert=pem_path)
            push_data = {"alert": "Motion ", "event_time": n_time, "event_type": event_type, "msg": "",
                         "received_at": n_time, "sound": "", "uid": uid, "zpush": "1", "channel": channel}
            alert = apns2.PayloadAlert(body=msg_text, title=msg_title)
            payload = apns2.Payload(alert=alert, custom=push_data, sound="default")
            n = apns2.Notification(payload=payload, priority=apns2.PRIORITY_LOW)
            res = cli.push(n=n, device_token=token_val, topic=appBundleId)
            LOGGING.info("apns_推送状态:")
            LOGGING.info(res.status_code)
            if res.status_code != 200:
                LOGGING.info('apns push fail')
                LOGGING.info(res.reason)
            return res.status_code
        except (ValueError, ArithmeticError):
            return 'The program has a numeric format exception, one of the arithmetic exceptions'
        except Exception as e:
            LOGGING.info('异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return repr(e)

    @staticmethod
    def do_xmpush(channel_id, uid, channel, appBundleId, token_val, event_type, n_time, msg_title, msg_text):
        """
        Send an Android push through Xiaomi push (regid endpoint).

        The app secret is read from ``XMPUSH_CONFIG[appBundleId]``.

        @return: parsed JSON response when the HTTP status is 200, else None
        """
        url = 'https://api.xmpush.xiaomi.com/v3/message/regid'
        app_secret = XMPUSH_CONFIG[appBundleId]
        payload = {'alert': 'Motion', 'msg': '', 'sound': 'sound.aif', 'zpush': '1',
                   'received_at': n_time, 'event_time': n_time, 'event_type': event_type,
                   'uid': uid, 'channel': channel}
        data = {
            'title': msg_title,
            'description': msg_text,
            # Previously the literal string 'payload' was sent and the dict
            # above was never used; send the serialised payload instead.
            'payload': json.dumps(payload),
            'restricted_package_name': appBundleId,
            'registration_id': token_val,
            'extra.channel_id': channel_id
        }
        headers = {'Authorization': 'key={}'.format(app_secret)}
        response = requests.post(url, data=data, headers=headers, timeout=DevicePushService._HTTP_TIMEOUT)
        if response.status_code == 200:
            return response.json()
        LOGGING.info('xmpush http status: {}'.format(response.status_code))
        return None

    @staticmethod
    def do_oppopush(channel_id, uid, channel, appBundleId, token_val, event_type, n_time, msg_title, msg_text):
        """
        Send an Android push through OPPO push.

        First requests an auth token signed with
        sha256(app_key + timestamp_ms + master_secret), then posts a unicast
        notification. Credentials are read from ``OPPOPUSH_CONFIG[appBundleId]``.

        @return: parsed JSON response when the push HTTP status is 200; None
                 when authentication yields no token or the status is not 200
        """
        app_key = OPPOPUSH_CONFIG[appBundleId]['Key']
        master_secret = OPPOPUSH_CONFIG[appBundleId]['Secret']
        url = 'https://api.push.oppomobile.com/'
        now_time = str(round(time.time() * 1000))
        sign = hashlib.sha256((app_key + now_time + master_secret).encode('utf-8')).hexdigest()

        # Step 1: obtain the auth token.
        get_token_url = url + 'server/v1/auth'
        post_data = {'app_key': app_key, 'sign': sign, 'timestamp': now_time}
        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        response = requests.post(get_token_url, data=post_data, headers=headers,
                                 timeout=DevicePushService._HTTP_TIMEOUT)
        result = response.json()
        token_data = result.get('data') if isinstance(result, dict) else None
        auth_token = token_data.get('auth_token') if isinstance(token_data, dict) else None
        if not auth_token:
            LOGGING.info('oppopush auth failed: {}'.format(result))
            return None

        # Step 2: send the unicast notification.
        # NOTE(review): the custom extras (uid, channel, event_type, ...)
        # that other vendors attach were built here but never sent; confirm
        # whether they should be added to the message.
        push_url = url + 'server/v1/message/notification/unicast'
        message = {
            "target_type": 2,
            "target_value": token_val,
            "notification": {
                "title": msg_title,
                "content": msg_text,
                'channel_id': channel_id
            }
        }
        push_data = {'auth_token': auth_token, 'message': json.dumps(message)}
        response = requests.post(push_url, data=push_data, headers=headers,
                                 timeout=DevicePushService._HTTP_TIMEOUT)
        if response.status_code == 200:
            return response.json()
        LOGGING.info('oppopush http status: {}'.format(response.status_code))
        return None

    @classmethod
    def async_send_picture_push(cls, push_type, aws_s3_client, bucket, key, uid, appBundleId, token_val,
                                event_type, n_time, msg_title, msg_text, channel):
        """
        Send a push carrying a picture URL; intended to run on a worker thread.

        A pre-signed GET URL (valid 3600 s) is generated for ``bucket``/``key``
        and handed to the vendor selected by ``push_type`` (0 apns, 1 fcm,
        3 huawei). Other push types do nothing. Exceptions are logged, not raised.
        """
        try:
            image_url = aws_s3_client.generate_presigned_url('get_object',
                                                             Params={'Bucket': bucket, 'Key': key},
                                                             ExpiresIn=3600)
            LOGGING.info('推送图片url:{}'.format(image_url))
            # NOTE(review): uid is passed twice to the PushObject helpers
            # below; confirm against their signatures.
            if push_type == 0:
                PushObject.ios_apns_push(uid, appBundleId, token_val, n_time, event_type, msg_title,
                                         msg_text, uid, channel, image_url)
            elif push_type == 1:
                PushObject.android_fcm_push(uid, appBundleId, token_val, n_time, event_type, msg_title,
                                            msg_text, uid, channel, image_url)
            elif push_type == 3:
                huawei_push_object = HuaweiPushObject()
                huawei_push_object.send_push_notify_message(token_val=token_val, msg_title=msg_title,
                                                            msg_text=msg_text, image_url=image_url)
        except Exception as e:
            LOGGING.info('异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def get_push_url(**params):
        """
        Build the response returned to the device after a push.

        Behaviour by ``params['is_st']``:
          * 0 or 2: annotate every row of ``uid_set_push_list`` with its
            vendor result code, strip internal fields and return the rows
            as ``re_list``.
          * 1: return one pre-signed upload URL as ``img_push``.
          * 3: return three pre-signed upload URLs as ``img_url_list``.

        @return: re_data dict, always containing ``code`` and ``msg``
        """
        re_data = {'code': 0, 'msg': 'success'}
        is_st = params['is_st']
        if is_st == 0 or is_st == 2:
            re_data['msg'] = 'success 0 or 2'
            # save_msg_push returns None on failure and older results may
            # lack some vendor keys, so read the codes defensively.
            code_data = (params.get('code_dict') or {}).get('code_date') or {}
            internal_fields = ('push_type', 'userID_id', 'userID__NickName', 'lang', 'tz',
                               'uid_set__nickname', 'uid_set__detect_interval', 'uid_set__detect_group')
            for up in params['uid_set_push_list']:
                code_key = DevicePushService._PUSH_CODE_KEYS.get(up['push_type'])
                if code_key is not None:
                    up[code_key] = code_data.get(code_key, '')
                for field in internal_fields:
                    up.pop(field, None)
            re_data['re_list'] = params['uid_set_push_list']
        elif is_st == 1:
            key_name = '{uid}/{channel}/{filename}.jpeg' \
                .format(uid=params['uid'], channel=params['channel'], filename=params['n_time'])
            # region 2: domestic bucket; otherwise the foreign bucket.
            bucket = 'push' if params['region'] == 2 else 'foreignpush'
            re_args = {'Key': key_name, 'Bucket': bucket}
            re_data['img_push'] = DevicePushService.generate_s3_url(params['aws_s3_client'], re_args)
        elif is_st == 3:
            bucket = 'push' if params['region'] == 2 else 'foreignpush'
            img_url_list = []
            # is_st is 3 here, so this yields the picture indexes 0, 1, 2.
            for i in range(is_st):
                key_name = '{uid}/{channel}/{filename}_{st}.jpeg'. \
                    format(uid=params['uid'], channel=params['channel'], filename=params['n_time'], st=i)
                re_args = {'Bucket': bucket, 'Key': key_name}
                img_url_list.append(DevicePushService.generate_s3_url(params['aws_s3_client'], re_args))
            re_data['img_url_list'] = img_url_list
            re_data['msg'] = 'success 3'
        return re_data

    @staticmethod
    def generate_s3_url(aws_s3_client, params):
        """
        Generate a pre-signed ``put_object`` URL valid for 3600 seconds.

        @param aws_s3_client: client exposing ``generate_presigned_url``
        @param params: request parameters, e.g. ``{'Bucket': ..., 'Key': ...}``
        @return: the pre-signed URL
        """
        return aws_s3_client.generate_presigned_url(
            ClientMethod='put_object',
            Params=params,
            ExpiresIn=3600
        )

    @staticmethod
    def check_share_permission(user_id, channel, uid):
        """
        Check whether a user may receive alarm pushes for a device channel.

        A user without a DeviceChannelUserSet row for the uid is always
        allowed. Otherwise the channel must be in the row's comma-separated
        ``channels`` and the row must hold the 'AlarmMessages' permission.

        @param user_id: user id
        @param channel: device channel (converted with ``int``)
        @param uid: device UID
        @return: True when the push is allowed, else False
        """
        user_sets = list(DeviceChannelUserSet.objects.filter(user_id=user_id, uid=uid)
                         .values('id', 'channels')[:1])
        # No channel settings for this user/uid: nothing restricts the push.
        if not user_sets:
            return True
        up_id = user_sets[0]['id']
        channels = user_sets[0]['channels'] or ''
        # Skip blank fragments so '' or a trailing comma does not raise.
        allowed_channels = {int(val) for val in channels.split(',') if val.strip()}
        # The channel must be one of the channels configured for this user.
        if int(channel) not in allowed_channels:
            return False
        permissions = list(DeviceSharePermission.objects.filter(code='AlarmMessages').values('id')[:1])
        # Without the permission definition nobody can hold it.
        if not permissions:
            return False
        p_id = permissions[0]['id']
        # Finally the user's channel set must hold the alarm-message permission.
        return DeviceChannelUserPermission.objects \
            .filter(channel_user_id=up_id, permission_id=p_id) \
            .values('permission_id', 'channel_user_id') \
            .exists()