#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
@AUTHOR: ASJRD018
@NAME: AnsjerFormal
@software: PyCharm
@DATE: 2019/1/14 15:57
@Version: python3.6
@MODIFY DECORD:ansjer dev
@file: DetectController.py
@Contact: chanjunkai@163.com
"""
import os
import time

import apns2
import jpush as jpush
import oss2
from django.http import JsonResponse
from django.views.generic.base import View
from pyfcm import FCMNotification

from AnsjerPush.config import SERVER_TYPE
from AnsjerPush.config import OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET, DETECT_PUSH_DOMAIN, JPUSH_CONFIG, FCM_CONFIG, \
    APNS_CONFIG, BASE_DIR, APNS_MODE
from Model.models import Equipment_Info, UidPushModel, SysMsgModel
from Object.ETkObject import ETkObject
from Object.LogUtil import LogUtil
from Object.RedisObject import RedisObject
from Object.UidTokenObject import UidTokenObject
from Service.CommonService import CommonService

# Example requests:
# http://push.dvema.com/notify/push?etk=Y2lTRXhMTjBWS01sWlpURTVJU0ZWTlJ6RXhNVUU9T3o=&n_time=1526845794&channel=1&event_type=704&is_st=0
# http://push.dvema.com/deviceShadow/generateUTK?username=debug_user&password=debug_password&uid=VVDHCVBYDKFMJRWA111A

# NOTE(review): this module was reconstructed from a whitespace-collapsed copy of the
# file; block nesting was inferred from syntax and control flow. Confirm against the
# canonical source before deploying.

# SERVER_TYPE value that identifies the production deployment.
_FORMAL_SERVER_TYPE = "Ansjer.formal_settings"

# Event types that additionally produce a SysMsgModel row.
_SYS_MSG_EVENT_TYPES = (702, 703, 704)

# Lifetime, in seconds, of the signed OSS upload URLs handed back to the device.
_UPLOAD_URL_EXPIRE = 7200

# Columns read for every push target. Both views cache the rows under the same
# '{uid}_redis_qs' key, so they must select the same columns.
_PUSH_ROW_FIELDS = (
    'token_val', 'app_type', 'appBundleId', 'm_code', 'push_type', 'userID_id',
    'userID__NickName', 'lang', 'tz', 'uid_set__nickname',
    'uid_set__detect_interval', 'uid_set__detect_group', 'uid_set__channel',
)

# Row keys that are stripped before a row is echoed back in 're_list'.
_RESPONSE_HIDDEN_KEYS = (
    'push_type', 'userID_id', 'userID__NickName', 'lang', 'tz',
    'uid_set__nickname', 'uid_set__detect_interval', 'uid_set__detect_group',
)

# push_type value -> key under which that provider's result is reported.
_PUSH_CODE_KEYS = {0: 'do_apns_code', 1: 'do_fcm_code', 2: 'do_jpush_code'}

# Human-readable labels for the system-message event types.
_EVENT_LABELS_CN = {704: '电量过低', 702: '摄像头休眠', 703: '摄像头唤醒'}
_EVENT_LABELS_EN = {704: 'Low battery', 702: 'Camera sleep', 703: 'Camera wake'}

# App bundle id -> brand name used as the notification title prefix.
_PACKAGE_TITLES = {
    'com.ansjer.customizedd_a': 'DVS',
    'com.ansjer.zccloud_a': 'ZosiSmart',
    'com.ansjer.zccloud_ab': '周视',
    'com.ansjer.adcloud_a': 'ADCloud',
    'com.ansjer.adcloud_ab': 'ADCloud',
    'com.ansjer.accloud_a': 'ACCloud',
    'com.ansjer.loocamccloud_a': 'Loocam',
    'com.ansjer.loocamdcloud_a': 'Anlapus',
    'com.ansjer.customizedb_a': 'COCOONHD',
    'com.ansjer.customizeda_a': 'Guardian365',
    'com.ansjer.customizedc_a': 'PatrolSecure',
}


def _param_error():
    """Return the JSON response used for missing or malformed request parameters."""
    return JsonResponse(status=200, data={'code': 444, 'msg': 'param is wrong'})


def _is_formal_server():
    """Return True when SERVER_TYPE names the production settings module."""
    return SERVER_TYPE == _FORMAL_SERVER_TYPE


def _throttle_seconds():
    """Return the per-key push throttle: 60 s on production, 10 s elsewhere."""
    return 60 if _is_formal_server() else 10


def _query_push_rows(uid):
    """Load the push targets of *uid* whose detection status is 1 as a list of dicts."""
    return list(
        UidPushModel.objects.filter(uid_set__uid=uid, uid_set__detect_status=1).values(*_PUSH_ROW_FIELDS)
    )


def _format_msg_text(channel, n_date, lang, event_type, is_sys):
    """Build the notification body.

    :param channel: channel identifier inserted into the text.
    :param n_date: already formatted event time (omitted when *is_sys* is truthy).
    :param lang: 'cn' selects the Chinese template, anything else the English one.
    :param event_type: value convertible with int(); 702/703/704 get a label.
    :param is_sys: truthy for the shorter system-message form without the date.
    """
    etype = int(event_type)
    if lang == 'cn':
        msg_type = _EVENT_LABELS_CN.get(etype, '')
        if is_sys:
            return '{msg_type} 通道:{channel}'.format(msg_type=msg_type, channel=channel)
        return '{msg_type} 通道:{channel} 日期:{date}'.format(msg_type=msg_type, channel=channel, date=n_date)
    msg_type = _EVENT_LABELS_EN.get(etype, '')
    if is_sys:
        return '{msg_type} channel:{channel}'.format(msg_type=msg_type, channel=channel)
    return '{msg_type} channel:{channel} date:{date}'.format(msg_type=msg_type, channel=channel, date=n_date)


def _signed_upload_field(uid, channel, n_time, is_st):
    """Return ``(response_key, value)`` with signed OSS PUT URL(s), or None.

    ``is_st == '1'`` yields a single URL under 'img_push'; ``is_st == '3'`` yields
    three URLs under 'img_url_list'. Any other value yields None.
    """
    if is_st not in ('1', '3'):
        return None
    auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET)
    bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'apg')
    if is_st == '1':
        obj = '{uid}/{channel}/{filename}.jpeg'.format(uid=uid, channel=channel, filename=n_time)
        return 'img_push', bucket.sign_url('PUT', obj, _UPLOAD_URL_EXPIRE)
    # is_st == '3': one upload slot per frame.
    img_url_list = []
    for i in range(int(is_st)):
        obj = '{uid}/{channel}/{filename}_{st}.jpeg'.format(uid=uid, channel=channel, filename=n_time, st=i)
        img_url_list.append(bucket.sign_url('PUT', obj, _UPLOAD_URL_EXPIRE))
    return 'img_url_list', img_url_list


def _rows_for_response(rows, push_codes, per_type_only):
    """Annotate *rows* in place with push results and strip internal columns.

    :param rows: list of push-target dicts (mutated and returned).
    :param push_codes: dict holding 'do_apns_code', 'do_fcm_code', 'do_jpush_code'.
    :param per_type_only: when True each row only receives the code that matches
        its own push_type; otherwise every row receives all three codes.
    """
    for up in rows:
        if per_type_only:
            code_key = _PUSH_CODE_KEYS.get(up['push_type'])
            if code_key is not None:
                up[code_key] = push_codes[code_key]
        else:
            up['do_apns_code'] = push_codes['do_apns_code']
            up['do_fcm_code'] = push_codes['do_fcm_code']
            up['do_jpush_code'] = push_codes['do_jpush_code']
        up['test_or_www'] = SERVER_TYPE
        for key in _RESPONSE_HIDDEN_KEYS:
            # pop() instead of del: a cached row may lack some of the columns.
            up.pop(key, None)
    return rows


# Motion detection interface
class NotificationView(View):
    """Receive a device event, push it to the bound apps and store it."""

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        return self.validation(request.GET)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        return self.validation(request.POST)

    def validation(self, request_dict):
        """Handle one event notification.

        Reads 'uidToken', 'etk', 'channel', 'n_time', 'event_type' and 'is_st' from
        *request_dict*. Always answers with HTTP 200; the outcome is carried in the
        JSON 'code' field (0 success, 404 unknown device/no targets, 444 bad
        parameters).
        """
        uidToken = request_dict.get('uidToken', None)
        etk = request_dict.get('etk', None)
        channel = request_dict.get('channel', '1')
        n_time = request_dict.get('n_time', None)
        event_type = request_dict.get('event_type', None)
        is_st = request_dict.get('is_st', None)
        if not all([channel, n_time]):
            return _param_error()

        if etk:
            uid = ETkObject(etk).uid
            if len(uid) != 20:
                return JsonResponse(status=200, data={'code': 404, 'msg': 'data is not exist'})
        else:
            uid = UidTokenObject(uidToken).UID

        # A missing or non-numeric event_type used to raise out of int(); report it
        # as a parameter error instead.
        try:
            event_type_code = int(event_type)
        except (TypeError, ValueError):
            return _param_error()
        is_sys_msg = self.is_sys_msg(event_type_code)

        pkey = '{uid}_{channel}_{event_type}_ptl'.format(uid=uid, event_type=event_type, channel=channel)
        ykey = '{uid}_redis_qs'.format(uid=uid)
        if is_sys_msg is True:
            dkey = '{uid}_{channel}_{event_type}_flag'.format(uid=uid, event_type=event_type, channel=channel)
        else:
            dkey = '{uid}_{channel}_flag'.format(uid=uid, channel=channel)

        redisObj = RedisObject(db=6)
        have_ykey = redisObj.get_data(key=ykey)  # cached push-target rows
        have_pkey = redisObj.get_data(key=pkey)  # short-term throttle flag
        have_dkey = redisObj.get_data(key=dkey)  # detect-interval flag

        if have_pkey:
            if not _is_formal_server():
                res_data = {'code': 0, 'msg': 'Push once every 10 seconds'}
            else:
                res_data = {'code': 0, 'msg': 'Push it once a minute'}
            return JsonResponse(status=200, data=res_data)

        if have_ykey:
            # SECURITY NOTE(review): eval() executes whatever is stored under ykey.
            # The value is written by this module as str(list_of_dicts); consider a
            # JSON round-trip so a tampered cache cannot run code.
            redis_list = eval(have_ykey)
        else:
            redis_list = _query_push_rows(uid)
            # Cache the rows (even an empty list) for 10 minutes.
            redisObj.set_data(key=ykey, val=str(redis_list), expire=600)
            if not redis_list:
                res_data = {'code': 404, 'msg': 'error !'}
                return JsonResponse(status=200, data=res_data)
        if not redis_list:
            # Reached when an empty list was served from the cache.
            print("没有redi_list")
            res_data = {'code': 0, 'msg': 'no redi_list success!'}
            return JsonResponse(status=200, data=res_data)

        first_row = redis_list[0]
        nickname = first_row['uid_set__nickname'] or uid
        # .get(): rows cached by an older release may lack these columns.
        detect_interval = first_row.get('uid_set__detect_interval')
        detect_group = first_row.get('uid_set__detect_group')
        now_time = int(time.time())

        # 0: legacy behaviour (push + store), 1: store only, 2: push + store.
        detect_med_type = 0
        if detect_group is not None:
            if have_dkey:
                detect_med_type = 1
            else:
                detect_med_type = 2
                # NOTE(review): the flag updates below are placed inside this
                # "not suppressed" branch; the nesting was ambiguous in the
                # collapsed source - confirm.
                if detect_group == '0' or detect_group == '':
                    # '0' or '' means every event type is allowed to push.
                    redisObj.set_data(key=dkey, val=1, expire=detect_interval)
                elif event_type in detect_group.split(','):
                    detect_interval = max(detect_interval or 0, 60)
                    redisObj.set_data(key=dkey, val=1, expire=detect_interval)
                redisObj.set_data(key=pkey, val=1, expire=_throttle_seconds())
        if not detect_group:
            # Legacy mode. have_pkey is known to be falsy here because a set
            # throttle flag already returned above.
            redisObj.set_data(key=pkey, val=1, expire=_throttle_seconds())

        # int(is_st) used to raise inside the loop below on a missing or
        # non-numeric value; report it as a parameter error instead.
        try:
            int_is_st = int(is_st)
        except (TypeError, ValueError):
            return _param_error()

        push_codes = {'do_apns_code': '', 'do_fcm_code': '', 'do_jpush_code': ''}
        if detect_med_type == 1:
            for code_key in push_codes:
                push_codes[code_key] = '只存库不推送'
        senders = {0: self.do_apns, 1: self.do_fcm, 2: self.do_jpush}
        sender_names = {0: 'do_apns', 1: 'do_fcm', 2: 'do_jpush'}

        eq_list = []
        sys_msg_list = []
        seen_user_ids = set()
        for up in redis_list:
            push_type = up['push_type']
            appBundleId = up['appBundleId']
            lang = up['lang']
            tz = up['tz']
            if tz is None or tz == '':
                tz = 0
            msg_title = self.get_msg_title(appBundleId=appBundleId, nickname=nickname)
            CommonService.app_log_log(uid=uid, tz=tz)
            msg_text = self.get_msg_text(channel=channel, n_time=n_time, lang=lang, tz=tz, event_type=event_type)

            # Push (skipped in store-only mode).
            if detect_med_type != 1 and push_type in senders:
                print(sender_names[push_type])
                push_codes[_PUSH_CODE_KEYS[push_type]] = senders[push_type](
                    uid=uid, channel=channel, event_type=event_type, n_time=n_time,
                    appBundleId=appBundleId, token_val=up['token_val'],
                    msg_title=msg_title, msg_text=msg_text)

            # Store: one record per user, however many tokens that user has.
            userID_id = up["userID_id"]
            if userID_id in seen_user_ids:
                continue
            seen_user_ids.add(userID_id)
            eq_list.append(Equipment_Info(
                userID_id=userID_id,
                eventTime=n_time,
                eventType=event_type,
                devUid=uid,
                devNickName=nickname,
                Channel=channel,
                alarm='Motion \tChannel:{channel}'.format(channel=channel),
                is_st=int_is_st,
                receiveTime=n_time,
                addTime=now_time
            ))
            if is_sys_msg:
                sys_msg_text = self.get_msg_text(channel=channel, n_time=n_time, lang=lang, tz=tz,
                                                 event_type=event_type, is_sys=1)
                sys_msg_list.append(SysMsgModel(
                    userID_id=userID_id,
                    msg=sys_msg_text,
                    addTime=now_time,
                    updTime=now_time,
                    uid=uid,
                    eventType=event_type))
        if is_sys_msg:
            SysMsgModel.objects.bulk_create(sys_msg_list)
        Equipment_Info.objects.bulk_create(eq_list)

        if is_st == '0' or is_st == '2':
            print("is_st=0or2")
            re_list = _rows_for_response(redis_list, push_codes, per_type_only=True)
            return JsonResponse(status=200, data={'code': 0, 'msg': 'success 0 or 2', 're_list': re_list})

        upload = _signed_upload_field(uid, channel, n_time, is_st)
        if upload is None:
            # Numeric but unsupported is_st: the event has been pushed and stored,
            # but there is no upload contract to answer with. The original code
            # fell off the end here and returned None.
            return _param_error()
        print("is_st=" + is_st)
        field, value = upload
        res_data = {'code': 0, field: value, 'msg': 'success' if is_st == '1' else 'success 3'}
        if not _is_formal_server():
            # Only non-production servers echo the per-target details.
            res_data['re_list'] = _rows_for_response(redis_list, push_codes, per_type_only=False)
        return JsonResponse(status=200, data=res_data)

    def get_msg_title(self, appBundleId, nickname):
        """Return 'Brand(nickname)' for a known bundle id, else just *nickname*."""
        brand = _PACKAGE_TITLES.get(appBundleId)
        if brand is None:
            return nickname
        return brand + '(' + nickname + ')'

    def is_sys_msg(self, event_type):
        """Return True when integer *event_type* is 702, 703 or 704."""
        return event_type in _SYS_MSG_EVENT_TYPES

    def get_msg_text(self, channel, n_time, lang, tz, event_type, is_sys=0):
        """Return the localized notification body for the event."""
        n_date = CommonService.get_now_time_str(n_time=n_time, tz=tz, lang=lang)
        return _format_msg_text(channel, n_date, lang, event_type, is_sys)

    def do_jpush(self, uid, channel, appBundleId, token_val, event_type, n_time, msg_title, msg_text):
        """Send the notification through JPush.

        :return: the HTTP status code of the JPush response, or ``repr(exc)`` when
            the configuration lookup or the send raises (same convention as
            ``do_apns``), so one failing target cannot abort the whole request.
        """
        try:
            app_key = JPUSH_CONFIG[appBundleId]['Key']
            master_secret = JPUSH_CONFIG[appBundleId]['Secret']
            _jpush = jpush.JPush(app_key, master_secret)
            push = _jpush.create_push()
            push.audience = jpush.registration_id(token_val)
            push_data = {"alert": "Motion ", "event_time": n_time, "event_type": event_type, "msg": "",
                         "received_at": n_time, "sound": "sound.aif", "uid": uid, "zpush": "1", "channel": channel}
            android = jpush.android(alert=msg_text, priority=1, style=1, alert_type=7,
                                    big_text=msg_text, title=msg_title,
                                    extras=push_data)
            push.notification = jpush.notification(android=android)
            push.platform = jpush.all_
            res = push.send()
            print(res)
            return res.status_code
        except Exception as e:
            print(repr(e))
            return repr(e)

    def do_fcm(self, uid, channel, appBundleId, token_val, event_type, n_time, msg_title, msg_text):
        """Send the notification through FCM.

        :return: the pyfcm result, 'serverKey abnormal' when no server key is
            configured for *appBundleId*, or ``repr(exc)`` when sending raises.
        """
        try:
            serverKey = FCM_CONFIG[appBundleId]
        except Exception:
            return 'serverKey abnormal'
        data = {"alert": "Motion ", "event_time": n_time, "event_type": event_type, "msg": "",
                "received_at": n_time, "sound": "sound.aif", "uid": uid, "zpush": "1", "channel": channel}
        try:
            push_service = FCMNotification(api_key=serverKey)
            result = push_service.notify_single_device(registration_id=token_val, message_title=msg_title,
                                                       message_body=msg_text, data_message=data,
                                                       extra_kwargs={
                                                           'default_vibrate_timings': True,
                                                           'default_sound': True,
                                                           'default_light_settings': True
                                                       })
        except Exception as e:
            print(repr(e))
            return repr(e)
        print('fcm push ing')
        print(result)
        return result

    def do_apns(self, uid, channel, appBundleId, token_val, event_type, n_time, msg_title, msg_text):
        """Send the notification through APNs.

        :return: the APNs HTTP status code, or a string describing the exception
            that was raised while sending.
        """
        try:
            cli = apns2.APNSClient(mode=APNS_MODE,
                                   client_cert=os.path.join(BASE_DIR, APNS_CONFIG[appBundleId]['pem_path']))
            push_data = {"alert": "Motion ", "event_time": n_time, "event_type": event_type, "msg": "",
                         "received_at": n_time, "sound": "sound.aif", "uid": uid, "zpush": "1", "channel": channel}
            alert = apns2.PayloadAlert(body=msg_text, title=msg_title)
            payload = apns2.Payload(alert=alert, custom=push_data)
            n = apns2.Notification(payload=payload, priority=apns2.PRIORITY_LOW)
            res = cli.push(n=n, device_token=token_val, topic=appBundleId)
            print(res.status_code)
            # 200: delivered; 400: bad request; 403: certificate or token problem;
            # 405: wrong HTTP method (POST only); 410: token does not match the certificate.
            if res.status_code != 200:
                print('apns push fail')
                print(res.reason)
            return res.status_code
        except (ValueError, ArithmeticError):
            return 'The program has a numeric format exception, one of the arithmetic exceptions'
        except Exception as e:
            print(repr(e))
            return repr(e)

    def do_update_detect_interval(self, uid, channel, redisObject, detect_interval):
        """Shorten existing detect flags whose TTL exceeds *detect_interval*.

        Channels 1..*channel* are visited (1..16 when *channel* is 0) and, for event
        types 51 and 54, the '{uid}_{channel}_{event_type}_flag' key is rewritten
        with the new expiry when its current TTL is larger.
        """
        if channel == 0:
            channel = 17
        else:
            channel += 1
        for i in range(1, channel):
            for flag_event_type in (51, 54):
                tmpDKey = '{uid}_{channel}_{event_type}_flag'.format(uid=uid, event_type=flag_event_type, channel=i)
                llt = redisObject.get_ttl(tmpDKey)
                if llt > detect_interval:
                    redisObject.set_data(key=tmpDKey, val=1, expire=detect_interval)


# http://test.dvema.com/detect/add?uidToken=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1aWQiOiJQMldOR0pSRDJFSEE1RVU5MTExQSJ9.xOCI5lerk8JOs5OcAzunrKCfCrtuPIZ3AnkMmnd-bPY&n_time=1526845794&channel=1&event_type=51&is_st=0
# Motion detection interface
class PushNotificationView(View):
    """Receive a device event and store it (no push is sent by this view)."""

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        return self.validation(request.GET)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        return self.validation(request.POST)

    def validation(self, request_dict):
        """Handle one event notification identified by 'etk'.

        Always answers with HTTP 200; the outcome is carried in the JSON 'code'
        field (0 success, 404 unknown device/no targets, 444 bad parameters).
        """
        etk = request_dict.get('etk', None)
        channel = request_dict.get('channel', '1')
        n_time = request_dict.get('n_time', None)
        event_type = request_dict.get('event_type', None)
        is_st = request_dict.get('is_st', None)
        uid = ETkObject(etk).uid
        if len(uid) != 20:
            return JsonResponse(status=200, data={'code': 404, 'msg': 'wrong etk'})

        redisObj = RedisObject(db=6)
        pkey = '{uid}_ptl'.format(uid=uid)
        ykey = '{uid}_redis_qs'.format(uid=uid)
        if redisObj.get_data(key=pkey):
            res_data = {'code': 0, 'msg': 'success,!33333333333'}
            return JsonResponse(status=200, data=res_data)

        # do_bulk_create_info() converts both values with int(); reject requests
        # it could not process before consuming the throttle slot.
        try:
            int(event_type)
            int(is_st)
        except (TypeError, ValueError):
            return _param_error()

        # At most one stored event per uid every 60 seconds.
        redisObj.set_data(key=pkey, val=1, expire=60)

        redis_data = redisObj.get_data(key=ykey)
        if redis_data:
            # SECURITY NOTE(review): eval() executes whatever is stored under ykey.
            # The value is written by this module as str(list_of_dicts); consider a
            # JSON round-trip so a tampered cache cannot run code.
            redis_list = eval(redis_data)
        else:
            print("从数据库查到数据")
            redis_list = _query_push_rows(uid)
        if not redis_list:
            return JsonResponse(status=200, data={'code': 404, 'msg': 'data is not exist'})

        # (Re)write the cache so it stays alive for another 10 minutes.
        redisObj.set_data(key=ykey, val=str(redis_list), expire=600)
        self.do_bulk_create_info(redis_list, n_time, channel, event_type, is_st, uid)

        if is_st == '0' or is_st == '2':
            return JsonResponse(status=200, data={'code': 0, 'msg': 'success44444444444444444'})
        upload = _signed_upload_field(uid, channel, n_time, is_st)
        if upload is None:
            # Numeric but unsupported is_st: the event has been stored, but there
            # is no upload contract to answer with. The original code fell off the
            # end here and returned None.
            return _param_error()
        field, value = upload
        res_data = {'code': 0, field: value, 'msg': 'success'}
        return JsonResponse(status=200, data=res_data)

    def do_bulk_create_info(self, uaqs, n_time, channel, event_type, is_st, uid):
        """Store one Equipment_Info (and, for system events, one SysMsgModel) per user.

        :param uaqs: list of push-target dicts; needs 'uid_set__nickname', 'lang',
            'tz' and 'userID_id'.
        :param event_type: value convertible with int().
        :param is_st: value convertible with int().
        :return: always True.
        """
        now_time = int(time.time())
        is_sys_msg = self.is_sys_msg(int(event_type))
        is_st = int(is_st)
        if not uaqs:
            return True
        # Fall back to the uid when the device has no nickname.
        nickname = uaqs[0]['uid_set__nickname'] or uid

        eq_list = []
        sys_msg_list = []
        seen_user_ids = set()
        for ua in uaqs:
            userID_id = ua["userID_id"]
            if userID_id in seen_user_ids:
                continue
            # Remember the user so several tokens of one user yield a single
            # record (the original never recorded seen ids, so the check was a no-op).
            seen_user_ids.add(userID_id)
            eq_list.append(Equipment_Info(
                userID_id=userID_id,
                eventTime=n_time,
                eventType=event_type,
                devUid=uid,
                devNickName=nickname,
                Channel=channel,
                alarm='Motion \tChannel:{channel}'.format(channel=channel),
                is_st=is_st,
                receiveTime=n_time,
                addTime=now_time
            ))
            if is_sys_msg:
                sys_msg_text = self.get_msg_text(channel=channel, n_time=n_time, lang=ua['lang'], tz=ua['tz'],
                                                 event_type=event_type, is_sys=1)
                sys_msg_list.append(SysMsgModel(
                    userID_id=userID_id,
                    msg=sys_msg_text,
                    addTime=now_time,
                    updTime=now_time,
                    uid=uid,
                    eventType=event_type))
        if eq_list:
            print('eq_list')
            Equipment_Info.objects.bulk_create(eq_list)
        if is_sys_msg:
            print('sys_msg')
            SysMsgModel.objects.bulk_create(sys_msg_list)
        return True

    def is_sys_msg(self, event_type):
        """Return True when integer *event_type* is 702, 703 or 704."""
        return event_type in _SYS_MSG_EVENT_TYPES

    def get_msg_text(self, channel, n_time, lang, tz, event_type, is_sys=0):
        """Return the localized notification body for the event."""
        n_date = CommonService.get_now_time_str(n_time=n_time, tz=tz)
        return _format_msg_text(channel, n_date, lang, event_type, is_sys)