# -*- coding: utf-8 -*-
"""
@Author : Rocky
@Time : 2022/5/9 10:51
@File :gatewayController.py
"""
import datetime
import threading
import time

import pytz
import requests
from django.views.generic.base import View

from AnsjerPush.Config.gatewaySensorConfig import SENSOR_TYPE, EVENT_TYPE, DEVICE_TYPE, SMART_SOCKET_TOPIC, \
    ANSJER_GENERIC_TOPIC
from AnsjerPush.config import LOGGER, XM_PUSH_CHANNEL_ID
from Model.models import SensorRecord, GatewaySubDevice, GatewayPush, Device_Info, SceneLog, SmartScene, CountryModel, \
    EffectiveTime
from Object.ResponseObject import ResponseObject
from Service.CommonService import CommonService
from AnsjerPush.config import DETECT_PUSH_DOMAIN
from Service.EquipmentInfoService import EquipmentInfoService
from Service.HuaweiPushService.HuaweiPushService import HuaweiPushObject
from Service.PushService import PushObject

# Alarm text shown for each sensor event code, used when the language is 'CN'.
_ALARM_TEXT_CN = {
    # door magnet
    2150: '门磁开',
    2151: '门磁关',
    2156: '被拆动',
    2152: '拆动恢复',
    # smart button
    2160: '紧急按钮按下',
    2161: '单击',
    2162: '双击',
    2163: '长按',
    # water immersion
    2170: '水浸触发',
    2171: '水浸恢复',
    # smoke
    2180: '烟雾触发',
    2181: '烟雾恢复',
    # human infrared
    2190: '有人移动',
    2191: '无人移动',
    # low battery
    2153: '低电量',
    2164: '低电量',
    2172: '低电量',
    2182: '低电量',
    2193: '低电量',
}

# Alarm text shown for each sensor event code, used for every other language.
_ALARM_TEXT_DEFAULT = {
    # door magnet
    2150: 'Door magnetic opening',
    2151: 'Door magnetic closing',
    2156: 'Be dismantled',
    2152: 'Dismantling recovery',
    # smart button
    2160: 'Emergency button pressed',
    2161: 'Single click',
    2162: 'Double click',
    2163: 'Long press',
    # water immersion
    2170: 'Water immersion trigger',
    2171: 'Water immersion recovery',
    # smoke
    2180: 'Smoke triggering',
    2181: 'Smoke recovery',
    # human infrared
    2190: 'Someone moving',
    2191: 'Unmanned movement',
    # low battery
    2153: 'LOW BATTERY',
    2164: 'LOW BATTERY',
    2172: 'LOW BATTERY',
    2182: 'LOW BATTERY',
    2193: 'LOW BATTERY',
}


class GatewayView(View):
    """HTTP entry point for gateway sensor, smart-scene log and smart-socket push requests."""

    def get(self, request, *args, **kwargs):
        """Dispatch a GET request to the operation named in the URL kwargs."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, operation)

    def post(self, request, *args, **kwargs):
        """Dispatch a POST request to the operation named in the URL kwargs."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, operation)

    def validation(self, request_dict, operation):
        """
        Route the request to the handler matching ``operation``.
        @param request_dict: request parameters
        @param operation: operation name taken from the URL
        @return: handler response, or a 414 response for an unknown operation
        """
        response = ResponseObject()
        if operation == 'gatewayPush':  # gateway sensor push
            return self.gateway_push(request_dict, response)
        elif operation == 'sceneLogPush':  # smart-scene log push
            return self.scene_log_push(request_dict, response)
        elif operation == 'socketPush':  # smart-socket push
            return self.socket_msg_push(request_dict, response)
        else:
            return response.json(414)

    @classmethod
    def gateway_push(cls, request_dict, response):
        """
        Gateway sensor push.

        Stores a sensor record, starts the matching smart-scene tasks in a
        background thread and, unless the reading is a temperature/humidity
        report or the sensor is disarmed, stores a push record per bound user
        and pushes the alarm to their devices in background threads.
        @param request_dict: request parameters
        @request_dict serial_number: gateway serial number
        @request_dict ieee_addr: sub-device long address
        @request_dict sensor_type: sensor type
        @request_dict event_type: event type
        @request_dict defense: defense state, 0: disarmed, 1: armed
        @request_dict num: temperature/humidity value multiplied by 100
                           (required for temperature/humidity events)
        @param response: response object
        @return: response
        """
        LOGGER.info('---调用网关推送接口--- request_dict:{}'.format(request_dict))
        serial_number = request_dict.get('serial_number', None)
        ieee_addr = request_dict.get('ieee_addr', None)
        try:
            # A missing or non-numeric value is a parameter error, not a server error.
            sensor_type = int(request_dict.get('sensor_type', None))
            event_type = int(request_dict.get('event_type', None))
            defense = int(request_dict.get('defense', None))
        except (TypeError, ValueError):
            return response.json(444)

        if not all([serial_number, ieee_addr, sensor_type, event_type]):
            return response.json(444)

        num = None
        n_time = int(time.time())

        try:
            # Temperature/humidity readings carry their value in 'num'.
            if sensor_type == SENSOR_TYPE['tem_hum_sensor'] and (
                    event_type == EVENT_TYPE['temperature'] or event_type == EVENT_TYPE['humidity']):
                num = request_dict.get('num')
                if num is None:
                    return response.json(444)
                num = int(num) / 100

            # Look up the sub-device once instead of re-querying for every field.
            gateway_sub_device_qs = GatewaySubDevice.objects.filter(
                device__serial_number=serial_number, device_type=sensor_type, ieee_addr=ieee_addr
            ).values('id', 'nickname', 'device__userID__region_country')
            gateway_sub_device = gateway_sub_device_qs.first()
            if gateway_sub_device is None:
                return response.json(173)
            gateway_sub_device_id = gateway_sub_device['id']

            # Run the smart-scene tasks in a separate thread.
            task_kwargs = {
                'num': num,
                'n_time': n_time,
                'event_type': event_type,
                'gateway_sub_device_id': gateway_sub_device_id
            }
            execute_task_thread = threading.Thread(target=cls.execute_scene_tasks, kwargs=task_kwargs)
            execute_task_thread.start()

            country_id = gateway_sub_device['device__userID__region_country']
            lang = cls.confirm_lang(country_id)
            alarm = cls.get_alarm(lang, event_type)
            nickname = gateway_sub_device['nickname']

            sensor_record_dict = {
                'gateway_sub_device_id': gateway_sub_device_id,
                'alarm': alarm,
                'event_type': event_type,
                'created_time': n_time,
            }
            # Temperature/humidity reports are recorded but never pushed.
            if num is not None:
                sensor_record_dict['alarm'] = str(num)
                SensorRecord.objects.create(**sensor_record_dict)
                return response.json(0)

            SensorRecord.objects.create(**sensor_record_dict)

            # Door magnet tampered / tamper recovered: update the tamper flag.
            if event_type == 2156:
                gateway_sub_device_qs.update(is_tampered=1)
            elif event_type == 2152:
                gateway_sub_device_qs.update(is_tampered=0)

            # Nothing is pushed while disarmed.
            if defense == 0:
                return response.json(0)

            device_info_qs = Device_Info.objects.filter(serial_number=serial_number).values('userID_id')
            if not device_info_qs.exists():
                return response.json(173)

            equipment_info_list = []
            equipment_info_model = EquipmentInfoService.randoms_choice_equipment_info()

            # Data stored in the push table.
            equipment_info_kwargs = {
                'device_uid': serial_number,
                'device_nick_name': nickname,
                'event_type': event_type,
                'event_time': n_time,
                'add_time': n_time,
                'alarm': alarm
            }

            for device_info in device_info_qs:
                user_id = device_info['userID_id']
                equipment_info_kwargs['device_user_id'] = user_id
                equipment_info_list.append(equipment_info_model(**equipment_info_kwargs))

                # Push asynchronously to this user's devices.
                push_kwargs = {
                    'user_id': user_id,
                    'n_time': n_time,
                    'event_type': event_type,
                    'nickname': nickname,
                    'alarm': alarm,
                }
                push_thread = threading.Thread(
                    target=cls.gateway_push_msg,
                    kwargs=push_kwargs)
                push_thread.start()

            if equipment_info_list:
                equipment_info_model.objects.bulk_create(equipment_info_list)

            return response.json(0)
        except Exception as e:
            LOGGER.info('---网关推送接口异常--- {}'.format(repr(e)))
            return response.json(500, 'error_ine:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def confirm_lang(country_id):
        """
        Determine the language from a country id.
        @param country_id: country id
        @return lang: the country's country_code, or 'NA' when the country is not found
        """
        country = CountryModel.objects.filter(id=country_id).values('country_code').first()
        if country is None:
            return 'NA'
        return country['country_code']

    @staticmethod
    def get_alarm(lang, event_type):
        """
        Determine the alarm text from the language and event type.
        @param lang: language; 'CN' selects the Chinese text, anything else the English text
        @param event_type: event type
        @return alarm: alarm text, or '' for an event type with no text
        """
        alarm_text = _ALARM_TEXT_CN if lang == 'CN' else _ALARM_TEXT_DEFAULT
        return alarm_text.get(event_type, '')

    @classmethod
    def scene_log_push(cls, request_dict, response):
        """
        Gateway smart-scene log push.

        Stores a scene log, disables the scene when its condition time type is
        'date', and pushes the scene tasks to the owner's devices in a
        background thread.
        @param request_dict: request parameters
        @request_dict sceneId: scene id
        @request_dict status: status
        @param response: response object
        @return: response
        """
        scene_id = request_dict.get('sceneId', None)
        status = request_dict.get('status', None)
        LOGGER.info('---场景日志推送接口--- request_dict:{}'.format(request_dict))

        if not all([scene_id, status]):
            return response.json(444)

        smart_scene_qs = SmartScene.objects.filter(id=scene_id).values(
            'scene_name', 'conditions', 'tasks', 'device_id', 'sub_device_id', 'user_id', 'scene_data')
        # Fetch the row once instead of re-querying for every field.
        smart_scene = smart_scene_qs.first()
        if smart_scene is None:
            return response.json(173)

        scene_name = smart_scene['scene_name']
        tasks = smart_scene['tasks']
        device_id = smart_scene['device_id']
        sub_device_id = smart_scene['sub_device_id']
        n_time = int(time.time())
        user_id = smart_scene['user_id']
        scene_data = smart_scene['scene_data']

        # The push title uses the sub-device nickname when the scene is bound
        # to a sub-device, otherwise the device nickname.
        if sub_device_id:
            sub_device = GatewaySubDevice.objects.filter(id=sub_device_id).values('nickname').first()
            nickname = sub_device['nickname'] if sub_device else ''
        else:
            device = Device_Info.objects.filter(id=device_id).values('NickName').first()
            nickname = device['NickName'] if device else ''

        log_dict = {
            'scene_id': scene_id,
            'scene_name': scene_name,
            'tasks': tasks,
            'status': status,
            'device_id': device_id,
            'sub_device_id': sub_device_id,
            'created_time': n_time,
        }

        try:
            # NOTE(review): eval() executes whatever expression is stored in the
            # tasks / scene_data columns. If those values can be influenced by a
            # client, switch to ast.literal_eval or json.loads after confirming
            # the stored format.
            tasks = eval(tasks)
            SceneLog.objects.create(**log_dict)

            # A one-off scene is disabled once it has run.
            if scene_data:
                scene_data_dict = eval(scene_data)
                condition = scene_data_dict.get('condition')
                if condition:
                    time_type = condition.get('time')
                    if time_type == 'date':
                        smart_scene_qs.update(is_enable=False)

            # Push the log to every logged-in device of the scene owner.
            gateway_push_qs = GatewayPush.objects.filter(user_id=user_id, logout=False). \
                values('user_id', 'app_bundle_id', 'app_type', 'push_type', 'token_val', 'm_code', 'lang', 'tz')
            if not gateway_push_qs.exists():
                return response.json(174)

            # Push asynchronously.
            push_kwargs = {
                'tasks': tasks,
                'n_time': n_time,
                'nickname': nickname,
                'gateway_push_qs': gateway_push_qs
            }
            push_thread = threading.Thread(
                target=cls.scene_msg_push,
                kwargs=push_kwargs)
            push_thread.start()

            return response.json(0)
        except Exception as e:
            LOGGER.info('---场景日志推送接口异常--- {}'.format(repr(e)))
            return response.json(500, 'error_ine:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @classmethod
    def gateway_push_msg(cls, **push_kwargs):
        """
        Push a sensor alarm to every logged-in device of one user.
        @param push_kwargs: user_id, n_time, event_type, nickname, alarm
        @return: None
        """
        user_id = push_kwargs['user_id']
        # Query the push configuration of the user's logged-in devices.
        gateway_push_qs = GatewayPush.objects.filter(user_id=user_id, logout=False). \
            values('user_id', 'app_bundle_id', 'app_type', 'push_type', 'token_val', 'm_code', 'lang', 'tz')
        if not gateway_push_qs.exists():
            return

        n_time = push_kwargs['n_time']
        event_type = push_kwargs['event_type']
        nickname = push_kwargs['nickname']
        alarm = push_kwargs['alarm']

        # Push to every phone the account is logged in on.
        for gateway_push in gateway_push_qs:
            push_type = gateway_push['push_type']
            lang = gateway_push['lang']
            tz = gateway_push['tz'] if gateway_push['tz'] else 0

            kwargs = {
                'n_time': n_time,
                'event_type': event_type,
                'nickname': nickname,
                'msg_title': PushObject.get_msg_title(nickname),
                'msg_text': PushObject.get_gateway_msg_text(n_time, tz, lang, alarm),
                'app_bundle_id': gateway_push['app_bundle_id'],
                'token_val': gateway_push['token_val'],
            }

            try:
                cls.push_msg(push_type, **kwargs)
            except Exception as e:
                # One failing device must not stop the remaining pushes.
                LOGGER.info('网关推送消息异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
                continue

    @classmethod
    def scene_msg_push(cls, **push_kwargs):
        """
        Push the message of every '1001' scene task to the given devices.
        @param push_kwargs: tasks, n_time, nickname, gateway_push_qs
        @return: None
        """
        tasks = push_kwargs['tasks']
        n_time = push_kwargs['n_time']
        nickname = push_kwargs['nickname']
        gateway_push_qs = push_kwargs['gateway_push_qs']

        for task in tasks:
            event_type = task['event_type']
            # Only tasks with event type '1001' produce a push; the task value is the message text.
            if event_type != '1001':
                continue

            kwargs = {
                'n_time': n_time,
                'event_type': event_type,
                'nickname': nickname,
            }
            event_info = task['value']

            # Push to every phone the account is logged in on.
            for gateway_push in gateway_push_qs:
                push_type = gateway_push['push_type']

                kwargs['msg_title'] = PushObject.get_msg_title(nickname)
                kwargs['msg_text'] = event_info
                kwargs['app_bundle_id'] = gateway_push['app_bundle_id']
                kwargs['token_val'] = gateway_push['token_val']

                try:
                    cls.push_msg(push_type, **kwargs)
                except Exception as e:
                    # One failing device must not stop the remaining pushes.
                    LOGGER.info(
                        '场景日志推送消息异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
                    continue

    @staticmethod
    def push_msg(push_type, **kwargs):
        """
        Send a push message through the channel selected by ``push_type``.
        An unrecognised push type sends nothing.
        @param push_type: push type
        @param kwargs: push parameters
        @return: None
        """
        if push_type == 0:  # ios apns
            PushObject.ios_apns_push(**kwargs)
        elif push_type == 1:  # android fcm
            PushObject.android_fcm_push_v1(**kwargs)
        elif push_type == 2:  # android jpush
            PushObject.android_jpush(**kwargs)
        elif push_type == 3:  # huawei push
            huawei_push_object = HuaweiPushObject()
            huawei_push_object.send_push_notify_message(**kwargs)
        elif push_type == 4:  # android xiaomi push
            channel_id = XM_PUSH_CHANNEL_ID['device_reminder']
            PushObject.android_xmpush(channel_id=channel_id, **kwargs)
        elif push_type == 5:  # android vivo push
            PushObject.android_vivopush(**kwargs)
        elif push_type == 6:  # android oppo push
            channel_id = 'DEVICE_REMINDER'
            PushObject.android_oppopush(channel_id=channel_id, **kwargs)
        elif push_type == 7:  # android meizu push
            PushObject.android_meizupush(**kwargs)

    @classmethod
    def socket_msg_push(cls, request_dict, response):
        """
        Smart-socket switch state push: stores the reported state as a scene log.
        @param request_dict: request parameters
        @request_dict serialNumber: socket serial number
        @request_dict deviceTime: timestamp reported by the device
        @request_dict status: switch status
        @param response: response object
        @return: response
        """
        try:
            serial_number = request_dict.get('serialNumber', None)
            device_time = request_dict.get('deviceTime', None)
            status = request_dict.get('status', None)
            if not all([serial_number, status, device_time]):
                return response.json(444)

            status = int(status)
            # device_time is guaranteed non-empty by the check above.
            now_time = int(device_time)
            log_dict = {
                'status': status,
                'device_id': serial_number,
                'created_time': now_time,
            }
            SceneLog.objects.create(**log_dict)
            LOGGER.info('成功接收并保存,插座序列号{},状态:{}'.format(serial_number, status))
            return response.json(0)
        except Exception as e:
            LOGGER.info('插座开关日志推送接口异常, error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @classmethod
    def execute_scene_tasks(cls, gateway_sub_device_id, event_type, num, n_time):
        """
        Execute the tasks of every enabled scene bound to a sub-device.

        Intended to run in a worker thread: a failure in one scene is logged
        and does not prevent the remaining scenes from being processed.
        @param gateway_sub_device_id: sub-device id
        @param event_type: event type
        @param num: temperature/humidity value, or None for other events
        @param n_time: current timestamp
        @return: None
        """
        smart_scene_qs = SmartScene.objects.filter(sub_device_id=gateway_sub_device_id, is_enable=True). \
            values('id', 'scene_data', 'tz', 'is_all_day', 'effective_time_id')
        for smart_scene in smart_scene_qs:
            try:
                cls._execute_scene(smart_scene, event_type, num, n_time)
            except Exception as e:
                LOGGER.info('execute scene task error, scene_id:{}, errLine:{}, errMsg:{}'.format(
                    smart_scene['id'], e.__traceback__.tb_lineno, repr(e)))

    @classmethod
    def _execute_scene(cls, smart_scene, event_type, num, n_time):
        """Publish the tasks of one scene when its condition, threshold and time window all match."""
        if not smart_scene['scene_data']:
            return

        # NOTE(review): eval() executes whatever expression is stored in
        # scene_data. If that value can be influenced by a client, switch to
        # ast.literal_eval or json.loads after confirming the stored format.
        scene_data_dict = eval(smart_scene['scene_data'])
        condition = scene_data_dict.get('condition')
        if not condition:
            return

        # The triggering event must match the condition's event type (which may
        # be None) and the task list must not be empty.
        task_list = scene_data_dict.get('task_list')
        if event_type != condition.get('event_type') or not task_list:
            return

        # For temperature/humidity events the reading must satisfy the threshold.
        if num is not None and not cls._threshold_met(condition.get('value'), num):
            return

        if not cls.judge_execute_task(smart_scene, n_time):
            return

        last_index = len(task_list) - 1
        for index, task in enumerate(task_list):
            kwargs = {
                'sensor_event_type': event_type,
                'device_type': task['device_type'],
                'event_type': task['event_type'],
                'serial_number': task['serial_number'],
                'delay_time': task['delay_time'],
                # Only the last task carries the scene id, so only it reports the scene log.
                'scene_id': smart_scene['id'] if index == last_index else 0
            }
            pub_mqtt_thread = threading.Thread(target=cls.pub_mqtt, kwargs=kwargs)
            pub_mqtt_thread.start()

    @staticmethod
    def _threshold_met(condition_value, num):
        """Return True when ``num`` satisfies a '<symbol> <number>' condition such as '≥ 30'."""
        symbol, _, raw_value = condition_value.partition(' ')
        value = float(raw_value)
        return (symbol == '≥' and num >= value) or (symbol == '≤' and num <= value)

    @classmethod
    def judge_execute_task(cls, smart_scene, n_time):
        """
        Decide whether a scene's tasks should be executed at ``n_time``.
        @param smart_scene: scene data (is_all_day, tz, effective_time_id)
        @param n_time: current timestamp
        @return execute_task: bool
        """
        # is_all_day == 1: effective all day, always execute.
        if smart_scene['is_all_day'] == 1:
            return True
        # Any value other than 2 (not all day) never executes.
        if smart_scene['is_all_day'] != 2:
            return False

        # Not all day: check the current time against the configured window.
        time_minute, week = cls.get_time_info(n_time, smart_scene['tz'])
        effective_time = EffectiveTime.objects.filter(id=smart_scene['effective_time_id']). \
            values('start_time', 'end_time', 'repeat').first()
        if effective_time is None:
            return False

        time_frame_dict, time_frame_next_day_dict = cls.get_time_frame_dict(
            effective_time['start_time'], effective_time['end_time'], effective_time['repeat'])

        # Either today's window or the part of yesterday's window that spills into today may match.
        for frames in (time_frame_dict, time_frame_next_day_dict):
            frame = frames.get(week)
            if frame and frame['start_time'] <= time_minute <= frame['end_time']:
                return True
        return False

    @staticmethod
    def get_time_info(timestamp, timezone_offset):
        """
        Get the time of day (in minutes) and the weekday for a timestamp in a time zone.
        @param timestamp: unix timestamp
        @param timezone_offset: time zone offset in hours; a missing value is treated as 0
        @return: time_minute, week (week is a str, '0' is Monday, '6' is Sunday)
        """
        # Time zone offset in minutes.
        timezone_offset_minutes = int(float(timezone_offset or 0) * 60)
        timezone = pytz.FixedOffset(timezone_offset_minutes)
        # Convert the timestamp to a datetime in that time zone.
        dt_object = datetime.datetime.fromtimestamp(timestamp, tz=timezone)
        # Hours and minutes expressed as minutes since midnight.
        time_minute = dt_object.hour * 60 + dt_object.minute
        week = str(dt_object.weekday())
        return time_minute, week

    @staticmethod
    def get_time_frame_dict(start_time, end_time, repeat):
        """
        Build the per-weekday time windows of a schedule.
        @param start_time: start time, in minutes since midnight
        @param end_time: end time, in minutes since midnight
        @param repeat: weekday bit mask as a decimal number, bit 0 is weekday 0,
                       e.g. 127 -> 0,1,2,3,4,5,6
        @return: time_frame_dict, time_frame_next_day_dict, both keyed by the weekday as a str
        """
        # Weekdays whose bit is set in the mask.
        week_list = [day for day in range(7) if repeat >> day & 1]

        time_frame_dict = {}
        time_frame_next_day_dict = {}
        if end_time > start_time:
            # The window starts and ends on the same day.
            for week in week_list:
                time_frame_dict[str(week)] = {
                    'start_time': start_time,
                    'end_time': end_time
                }
        else:
            # The window crosses midnight: time_frame_dict holds the part on the
            # starting day, time_frame_next_day_dict the part on the following day.
            for week in week_list:
                time_frame_dict[str(week)] = {
                    'start_time': start_time,
                    'end_time': 1439  # 23:59
                }
                time_frame_next_day_dict[str((week + 1) % 7)] = {
                    'start_time': 0,  # 00:00
                    'end_time': end_time
                }
        return time_frame_dict, time_frame_next_day_dict

    @staticmethod
    def pub_mqtt(sensor_event_type, device_type, event_type, serial_number, delay_time, scene_id=0):
        """
        Publish the mqtt message of one scene task, then report the scene log when requested.
        Nothing is published for an unsupported device type or event type.
        @param sensor_event_type: event type of the sensor that triggered the scene
        @param device_type: type of the device the task targets
        @param event_type: event type of the task
        @param serial_number: serial number of the target device
        @param delay_time: delay before publishing, passed to time.sleep
        @param scene_id: scene id; when non-zero the scene log is reported after publishing
        @return: None
        """
        msg = {}
        # smart socket
        if device_type == DEVICE_TYPE['socket']:
            topic_name = SMART_SOCKET_TOPIC.format(serial_number)
            status = 1 if event_type == EVENT_TYPE['socket_power_on'] else 0
            msg['type'] = 1
            msg['data'] = {
                'deviceSwitch': status
            }
        # camera
        elif device_type == DEVICE_TYPE['C516']:
            topic_name = ANSJER_GENERIC_TOPIC.format(serial_number)
            if event_type == EVENT_TYPE['detection_reminder_on']:
                msg['commandType'] = 'detection_reminder'
                msg['enable'] = 1
            elif event_type == EVENT_TYPE['detection_reminder_off']:
                msg['commandType'] = 'detection_reminder'
                msg['enable'] = 0
            elif event_type == EVENT_TYPE['snapshot']:
                msg['commandType'] = 'snapshot'
                msg['eventType'] = sensor_event_type
            elif event_type == EVENT_TYPE['record_video']:
                msg['commandType'] = 'record_video'
            elif event_type == EVENT_TYPE['human_tracking_on']:
                msg['commandType'] = 'human_tracking'
                msg['enable'] = 1
            elif event_type == EVENT_TYPE['human_tracking_off']:
                msg['commandType'] = 'human_tracking'
                msg['enable'] = 0
            else:
                return
        else:
            return

        if delay_time:
            time.sleep(delay_time)
        CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)

        # The last task of a scene reports the scene log.
        if scene_id:
            data = {
                'sceneId': scene_id,
                'status': 1
            }
            url = DETECT_PUSH_DOMAIN + 'gatewayService/sceneLogPush'
            try:
                requests.post(url=url, data=data, timeout=8)
            except requests.RequestException as e:
                # This runs in a worker thread; log the failure instead of losing it.
                LOGGER.info('scene log report error, scene_id:{}, errMsg:{}'.format(scene_id, repr(e)))