|
- # -*- coding: utf-8 -*-
- """
- @Author : Rocky
- @Time : 2022/5/9 10:51
- @File :gatewayController.py
- """
- import datetime
- import threading
- import time
- import pytz
- import requests
- from django.views.generic.base import View
- from AnsjerPush.Config.gatewaySensorConfig import SENSOR_TYPE, EVENT_TYPE, DEVICE_TYPE, SMART_SOCKET_TOPIC, \
- ANSJER_GENERIC_TOPIC
- from AnsjerPush.config import LOGGER, XM_PUSH_CHANNEL_ID
- from Model.models import SensorRecord, GatewaySubDevice, GatewayPush, Device_Info, SceneLog, SmartScene, CountryModel, \
- EffectiveTime
- from Object.ResponseObject import ResponseObject
- from Service.CommonService import CommonService
- from AnsjerPush.config import DETECT_PUSH_DOMAIN
- from Service.EquipmentInfoService import EquipmentInfoService
- from Service.HuaweiPushService.HuaweiPushService import HuaweiPushObject
- from Service.PushService import PushObject
- class GatewayView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, operation)
- def validation(self, request_dict, operation):
- response = ResponseObject()
- if operation == 'gatewayPush': # 网关推送
- return self.gateway_push(request_dict, response)
- elif operation == 'sceneLogPush': # 场景日志推送
- return self.scene_log_push(request_dict, response)
- elif operation == 'socketPush': # 插座推送
- return self.socket_msg_push(request_dict, response)
- else:
- return response.json(414)
- @classmethod
- def gateway_push(cls, request_dict, response):
- """
- 网关推送
- @param request_dict: 请求参数
- @request_dict serial_number: 序列号
- @request_dict ieee_addr: 长地址
- @request_dict sensor_type: 传感器类型
- @request_dict event_type: 事件类型
- @request_dict defense: 防御状态,0:撤防,1:防御
- @request_dict sensor_status: 拆动状态,拆动时传参
- @param response: 响应对象
- @return: response
- """
- serial_number = request_dict.get('serial_number', None)
- ieee_addr = request_dict.get('ieee_addr', None)
- sensor_type = int(request_dict.get('sensor_type', None))
- event_type = int(request_dict.get('event_type', None))
- defense = int(request_dict.get('defense', None))
- LOGGER.info('---调用网关推送接口--- request_dict:{}'.format(request_dict))
- if not all([serial_number, ieee_addr, sensor_type, event_type]):
- return response.json(444)
- num = None
- n_time = int(time.time())
- try:
- # 获取温湿度
- if sensor_type == SENSOR_TYPE['tem_hum_sensor'] and (
- event_type == EVENT_TYPE['temperature'] or event_type == EVENT_TYPE['humidity']):
- num = request_dict.get('num')
- if num is None:
- return response.json(444)
- num = int(num) / 100
- # 查询子设备表id
- gateway_sub_device_qs = GatewaySubDevice.objects.filter(device__serial_number=serial_number,
- device_type=sensor_type, ieee_addr=ieee_addr). \
- values('id', 'nickname', 'device__userID__region_country')
- if not gateway_sub_device_qs.exists():
- return response.json(173)
- gateway_sub_device_id = gateway_sub_device_qs[0]['id']
- # 多线程执行场景任务
- task_kwargs = {
- 'num': num,
- 'n_time': n_time,
- 'event_type': event_type,
- 'gateway_sub_device_id': gateway_sub_device_id
- }
- execute_task_thread = threading.Thread(target=cls.execute_scene_tasks, kwargs=task_kwargs)
- execute_task_thread.start()
- country_id = gateway_sub_device_qs[0]['device__userID__region_country']
- lang = cls.confirm_lang(country_id)
- alarm = cls.get_alarm(lang, event_type)
- nickname = gateway_sub_device_qs[0]['nickname']
- sensor_record_dict = {
- 'gateway_sub_device_id': gateway_sub_device_id,
- 'alarm': alarm,
- 'event_type': event_type,
- 'created_time': n_time,
- }
- # 温湿度上报不推送
- if num is not None:
- sensor_record_dict['alarm'] = str(num)
- SensorRecord.objects.create(**sensor_record_dict)
- return response.json(0)
- SensorRecord.objects.create(**sensor_record_dict)
- # 门磁被拆动/拆动恢复,修改拆动状态
- if event_type == 2156:
- gateway_sub_device_qs.update(is_tampered=1)
- elif event_type == 2152:
- gateway_sub_device_qs.update(is_tampered=0)
- # 撤防状态不推送
- if defense == 0:
- return response.json(0)
- device_info_qs = Device_Info.objects.filter(serial_number=serial_number).values('userID_id')
- if not device_info_qs.exists():
- return response.json(173)
- equipment_info_list = []
- equipment_info_model = EquipmentInfoService.randoms_choice_equipment_info()
- # 推送表存储数据
- equipment_info_kwargs = {
- 'device_uid': serial_number,
- 'device_nick_name': nickname,
- 'event_type': event_type,
- 'event_time': n_time,
- 'add_time': n_time,
- 'alarm': alarm
- }
- for device_info in device_info_qs:
- user_id = device_info['userID_id']
- equipment_info_kwargs['device_user_id'] = user_id
- equipment_info_list.append(equipment_info_model(**equipment_info_kwargs))
- # 开启异步推送
- push_kwargs = {
- 'user_id': user_id,
- 'n_time': n_time,
- 'event_type': event_type,
- 'nickname': nickname,
- 'alarm': alarm,
- }
- push_thread = threading.Thread(
- target=cls.gateway_push_msg,
- kwargs=push_kwargs)
- push_thread.start()
- if equipment_info_list:
- equipment_info_model.objects.bulk_create(equipment_info_list)
- return response.json(0)
- except Exception as e:
- LOGGER.info('---网关推送接口异常--- {}'.format(repr(e)))
- return response.json(500, 'error_ine:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def confirm_lang(country_id):
- """
- 根据country_id确定语言
- @param country_id: 国家id
- @return lang: 语言
- """
- country_qs = CountryModel.objects.filter(id=country_id).values('country_code')
- if not country_qs.exists():
- lang = 'NA'
- else:
- lang = country_qs[0]['country_code']
- return lang
- @staticmethod
- def get_alarm(lang, event_type):
- """
- 根据语言和事件类型确定警报内容
- @param lang: 语言
- @param event_type: 事件类型
- @return alarm: 警报内容
- """
- alarm = ''
- if lang == 'CN':
- # 门磁
- if event_type == 2150:
- alarm = '门磁开'
- elif event_type == 2151:
- alarm = '门磁关'
- elif event_type == 2156:
- alarm = '被拆动'
- elif event_type == 2152:
- alarm = '拆动恢复'
- # 智能按钮
- elif event_type == 2160:
- alarm = '紧急按钮按下'
- elif event_type == 2161:
- alarm = '单击'
- elif event_type == 2162:
- alarm = '双击'
- elif event_type == 2163:
- alarm = '长按'
- # 水浸
- elif event_type == 2170:
- alarm = '水浸触发'
- elif event_type == 2171:
- alarm = '水浸恢复'
- # 烟雾
- elif event_type == 2180:
- alarm = '烟雾触发'
- elif event_type == 2181:
- alarm = '烟雾恢复'
- # 人体红外
- elif event_type == 2190:
- alarm = '有人移动'
- elif event_type == 2191:
- alarm = '无人移动'
- # 低电量
- elif event_type in (2153, 2164, 2172, 2182, 2193):
- alarm = '低电量'
- else:
- # 门磁
- if event_type == 2150:
- alarm = 'Door magnetic opening'
- elif event_type == 2151:
- alarm = 'Door magnetic closing'
- elif event_type == 2156:
- alarm = 'Be dismantled'
- elif event_type == 2152:
- alarm = 'Dismantling recovery'
- # 智能按钮
- elif event_type == 2160:
- alarm = 'Emergency button pressed'
- elif event_type == 2161:
- alarm = 'Single click'
- elif event_type == 2162:
- alarm = 'Double click'
- elif event_type == 2163:
- alarm = 'Long press'
- # 水浸
- elif event_type == 2170:
- alarm = 'Water immersion trigger'
- elif event_type == 2171:
- alarm = 'Water immersion recovery'
- # 烟雾
- elif event_type == 2180:
- alarm = 'Smoke triggering'
- elif event_type == 2181:
- alarm = 'Smoke recovery'
- # 人体红外
- elif event_type == 2190:
- alarm = 'Someone moving'
- elif event_type == 2191:
- alarm = 'Unmanned movement'
- # 低电量
- elif event_type in (2153, 2164, 2172, 2182, 2193):
- alarm = 'LOW BATTERY'
- return alarm
- @classmethod
- def scene_log_push(cls, request_dict, response):
- """
- 网关智能场景日志推送
- @param request_dict: 请求参数
- @request_dict sceneId: 场景id
- @request_dict status: 状态
- @param response: 响应对象
- @return: response
- """
- scene_id = request_dict.get('sceneId', None)
- status = request_dict.get('status', None)
- LOGGER.info('---场景日志推送接口--- request_dict:{}'.format(request_dict))
- if not all([scene_id, status]):
- return response.json(444)
- smart_scene_qs = SmartScene.objects.filter(id=scene_id).values('scene_name', 'conditions', 'tasks', 'device_id',
- 'sub_device_id', 'user_id', 'scene_data')
- if not smart_scene_qs.exists():
- return response.json(173)
- scene_name = smart_scene_qs[0]['scene_name']
- tasks = smart_scene_qs[0]['tasks']
- device_id = smart_scene_qs[0]['device_id']
- sub_device_id = smart_scene_qs[0]['sub_device_id']
- n_time = int(time.time())
- user_id = smart_scene_qs[0]['user_id']
- scene_data = smart_scene_qs[0]['scene_data']
- if sub_device_id:
- gateway_sub_device_qs = GatewaySubDevice.objects.filter(id=sub_device_id).values('nickname')
- nickname = gateway_sub_device_qs[0]['nickname'] if gateway_sub_device_qs.exists() else ''
- else:
- device_qs = Device_Info.objects.filter(id=device_id).values('NickName')
- nickname = device_qs[0]['NickName'] if device_qs.exists() else ''
- log_dict = {
- 'scene_id': scene_id,
- 'scene_name': scene_name,
- 'tasks': tasks,
- 'status': status,
- 'device_id': device_id,
- 'sub_device_id': sub_device_id,
- 'created_time': n_time,
- }
- tasks = eval(tasks)
- try:
- SceneLog.objects.create(**log_dict)
- # 如果是一次性场景,关闭场景
- if scene_data:
- scene_data_dict = eval(scene_data)
- condition = scene_data_dict.get('condition')
- if condition:
- time_type = condition.get('time')
- if time_type == 'date':
- smart_scene_qs.update(is_enable=False)
- # 推送日志
- gateway_push_qs = GatewayPush.objects.filter(user_id=user_id, logout=False). \
- values('user_id', 'app_bundle_id', 'app_type', 'push_type', 'token_val', 'm_code', 'lang', 'm_code',
- 'tz')
- if not gateway_push_qs.exists():
- return response.json(174)
- # 开启异步推送
- push_kwargs = {
- 'tasks': tasks,
- 'n_time': n_time,
- 'nickname': nickname,
- 'gateway_push_qs': gateway_push_qs
- }
- push_thread = threading.Thread(
- target=cls.scene_msg_push,
- kwargs=push_kwargs)
- push_thread.start()
- return response.json(0)
- except Exception as e:
- LOGGER.info('---场景日志推送接口异常--- {}'.format(repr(e)))
- return response.json(500, 'error_ine:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @classmethod
- def gateway_push_msg(cls, **push_kwargs):
- user_id = push_kwargs['user_id']
- # 查询推送配置数据
- gateway_push_qs = GatewayPush.objects.filter(user_id=user_id, logout=False). \
- values('user_id', 'app_bundle_id', 'app_type', 'push_type', 'token_val', 'm_code', 'lang', 'm_code',
- 'tz')
- if gateway_push_qs.exists():
- n_time = push_kwargs['n_time']
- event_type = push_kwargs['event_type']
- nickname = push_kwargs['nickname']
- alarm = push_kwargs['alarm']
- kwargs = {
- 'n_time': n_time,
- 'event_type': event_type,
- 'nickname': nickname,
- }
- # 推送到每台登录账号的手机
- for gateway_push in gateway_push_qs:
- app_bundle_id = gateway_push['app_bundle_id']
- push_type = gateway_push['push_type']
- token_val = gateway_push['token_val']
- lang = gateway_push['lang']
- tz = gateway_push['tz'] if gateway_push['tz'] else 0
- # 获取推送所需数据
- msg_title = PushObject.get_msg_title(nickname)
- msg_text = PushObject.get_gateway_msg_text(n_time, tz, lang, alarm)
- kwargs['msg_title'] = msg_title
- kwargs['msg_text'] = msg_text
- kwargs['app_bundle_id'] = app_bundle_id
- kwargs['token_val'] = token_val
- try:
- # 推送消息
- cls.push_msg(push_type, **kwargs)
- except Exception as e:
- LOGGER.info('网关推送消息异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- continue
- @classmethod
- def scene_msg_push(cls, **push_kwargs):
- tasks = push_kwargs['tasks']
- n_time = push_kwargs['n_time']
- nickname = push_kwargs['nickname']
- gateway_push_qs = push_kwargs['gateway_push_qs']
- for task in tasks:
- event_type = task['event_type']
- if event_type == '1001':
- kwargs = {
- 'n_time': n_time,
- 'event_type': event_type,
- 'nickname': nickname,
- }
- event_info = task['value']
- # 推送到每台登录账号的手机
- for gateway_push in gateway_push_qs:
- app_bundle_id = gateway_push['app_bundle_id']
- push_type = gateway_push['push_type']
- token_val = gateway_push['token_val']
- kwargs['msg_title'] = PushObject.get_msg_title(nickname)
- kwargs['msg_text'] = event_info
- kwargs['app_bundle_id'] = app_bundle_id
- kwargs['token_val'] = token_val
- try:
- # 推送消息
- cls.push_msg(push_type, **kwargs)
- except Exception as e:
- LOGGER.info(
- '场景日志推送消息异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- continue
- @staticmethod
- def push_msg(push_type, **kwargs):
- """
- 发送推送消息
- @param push_type: 推送类型
- @param kwargs: 推送参数
- @return: None
- """
- if push_type == 0: # ios apns
- PushObject.ios_apns_push(**kwargs)
- elif push_type == 1: # android gcm
- PushObject.android_fcm_push_v1(**kwargs)
- elif push_type == 2: # android 极光推送
- PushObject.android_jpush(**kwargs)
- elif push_type == 3:
- huawei_push_object = HuaweiPushObject()
- huawei_push_object.send_push_notify_message(**kwargs)
- elif push_type == 4: # android 小米推送
- channel_id = XM_PUSH_CHANNEL_ID['device_reminder']
- PushObject.android_xmpush(channel_id=channel_id, **kwargs)
- elif push_type == 5: # android vivo推送
- PushObject.android_vivopush(**kwargs)
- elif push_type == 6: # android oppo推送
- channel_id = 'DEVICE_REMINDER'
- PushObject.android_oppopush(channel_id=channel_id, **kwargs)
- elif push_type == 7: # android 魅族推送
- PushObject.android_meizupush(**kwargs)
- @classmethod
- def socket_msg_push(cls, request_dict, response):
- """
- 智能插座开关状态推送
- """
- try:
- serial_number = request_dict.get('serialNumber', None)
- device_time = request_dict.get('deviceTime', None)
- status = request_dict.get('status', None)
- if not all([serial_number, status, device_time]):
- return response.json(444)
- status = int(status)
- now_time = int(device_time) if device_time else int(time.time())
- # 获取主用户设备id
- log_dict = {
- 'status': status,
- 'device_id': serial_number,
- 'created_time': now_time,
- }
- SceneLog.objects.create(**log_dict)
- LOGGER.info('成功接收并保存,插座序列号{},状态:{}'.format(serial_number, status))
- return response.json(0)
- except Exception as e:
- LOGGER.info('插座开关日志推送接口异常, error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @classmethod
- def execute_scene_tasks(cls, gateway_sub_device_id, event_type, num, n_time):
- """
- 执行场景任务
- @param gateway_sub_device_id: 子设备id
- @param event_type: 事件类型
- @param num: 温湿度值
- @param n_time: 当前时间
- @return:
- """
- smart_scene_qs = SmartScene.objects.filter(sub_device_id=gateway_sub_device_id, is_enable=True). \
- values('id', 'scene_data', 'tz', 'is_all_day', 'effective_time_id')
- if smart_scene_qs.exists():
- for smart_scene in smart_scene_qs:
- if smart_scene['scene_data']:
- scene_data_dict = eval(smart_scene['scene_data'])
- condition = scene_data_dict.get('condition')
- if condition:
- # None 或 event_type
- scene_data_event_type = condition.get('event_type')
- # 触发事件类型跟条件事件类型一致,且任务列表不为空
- task_list = scene_data_dict.get('task_list')
- if event_type == scene_data_event_type and task_list:
- # 判断温湿度是否在条件设置的范围
- if num is not None:
- condition_value = condition.get('value')
- space_index = condition_value.index(' ')
- symbol, value = condition_value[:space_index], float(condition_value[space_index + 1:])
- # 温湿度不在设定范围,不执行
- if not ((symbol == '≥' and num >= value) or (symbol == '≤' and num <= value)):
- continue
- execute_task = cls.judge_execute_task(smart_scene, n_time)
- # 执行任务
- if execute_task:
- scene_id = 0
- task_list_len = len(task_list)
- for index, task in enumerate(task_list):
- # 最后一个任务上报日志
- if index == task_list_len - 1:
- scene_id = smart_scene['id']
- kwargs = {
- 'sensor_event_type': event_type,
- 'device_type': task['device_type'],
- 'event_type': task['event_type'],
- 'serial_number': task['serial_number'],
- 'delay_time': task['delay_time'],
- 'scene_id': scene_id
- }
- pub_mqtt_thread = threading.Thread(target=cls.pub_mqtt, kwargs=kwargs)
- pub_mqtt_thread.start()
- @classmethod
- def judge_execute_task(cls, smart_scene, n_time):
- """
- 判断是否执行任务
- @param smart_scene: 场景数据
- @param n_time: 当前时间
- @return execute_task: bool
- """
- execute_task = False
- # 判断时间是否在执行时间范围内
- if smart_scene['is_all_day'] == 1:
- # 全天必执行
- execute_task = True
- elif smart_scene['is_all_day'] == 2:
- # 非全天,判断当前时间是否在设置的时间范围
- tz = smart_scene['tz']
- time_minute, week = cls.get_time_info(n_time, tz)
- effective_time_id = smart_scene['effective_time_id']
- effective_time_qs = EffectiveTime.objects.filter(id=effective_time_id). \
- values('start_time', 'end_time', 'repeat')
- if effective_time_qs.exists():
- start_time = effective_time_qs[0]['start_time']
- end_time = effective_time_qs[0]['end_time']
- repeat = effective_time_qs[0]['repeat']
- time_frame_dict, time_frame_next_day_dict = cls.get_time_frame_dict(
- start_time, end_time, repeat)
- # 判断当前时间是否在设置的时间范围
- if time_frame_dict.get(week):
- start_time = time_frame_dict[week]['start_time']
- end_time = time_frame_dict[week]['end_time']
- if start_time <= time_minute <= end_time:
- execute_task = True
- if time_frame_next_day_dict.get(week):
- start_time = time_frame_next_day_dict[week]['start_time']
- end_time = time_frame_next_day_dict[week]['end_time']
- if start_time <= time_minute <= end_time:
- execute_task = True
- return execute_task
- @staticmethod
- def get_time_info(timestamp, timezone_offset):
- """
- 根据时间戳和时区获取时间(时分转为分钟数)和星期
- @param timestamp: 时间戳
- @param timezone_offset: 时区
- @return: time_minute, week
- """
- # 计算时区偏移量(以分钟为单位)
- timezone_offset_minutes = int(timezone_offset * 60)
- timezone = pytz.FixedOffset(timezone_offset_minutes)
- # 将时间戳转换为datetime对象,并应用时区
- dt_object = datetime.datetime.fromtimestamp(timestamp, tz=timezone)
- # 获取时分,并转为分钟数
- hour = dt_object.hour
- minute = dt_object.minute
- time_minute = hour * 60 + minute
- # 获取星期(0表示星期一,6表示星期日)
- week = str(dt_object.weekday())
- return time_minute, week
- @staticmethod
- def get_time_frame_dict(start_time, end_time, repeat):
- """
- 获取时间范围字典
- @param start_time: 开始时间
- @param end_time: 结束时间
- @param repeat: 星期周期的十进制数,如127 -> 0,1,2,3,4,5,6
- @return: time_frame_dict, time_frame_next_day_dict
- """
- # 十进制转为7位的二进制并倒序
- bin_str = bin(repeat)[2:].zfill(7)[::-1]
- # 生成星期周期列表
- week_list = []
- for i, bit in enumerate(bin_str):
- if bit == '1':
- week_list.append(i)
- # 生成时间范围字典
- time_frame_dict = {}
- time_frame_next_day_dict = {}
- # 非隔天
- if end_time > start_time:
- for week in week_list:
- time_frame_dict[str(week)] = {
- 'start_time': start_time,
- 'end_time': end_time
- }
- # 隔天
- else:
- # time_frame_dict记录当天时间范围,time_frame_next_day_dict记录溢出到第二天的时间范围
- for week in week_list:
- time_frame_dict[str(week)] = {
- 'start_time': start_time,
- 'end_time': 1439 # 23:59
- }
- week += 1
- if week == 7:
- week = 0
- time_frame_next_day_dict[str(week)] = {
- 'start_time': 0, # 00:00
- 'end_time': end_time
- }
- return time_frame_dict, time_frame_next_day_dict
- @staticmethod
- def pub_mqtt(sensor_event_type, device_type, event_type, serial_number, delay_time, scene_id=0):
- """
- 发布mqtt消息
- @param sensor_event_type: 传感器事件类型
- @param device_type: 设备类型
- @param event_type: 事件类型
- @param serial_number: 序列号
- @param delay_time: 延迟时间
- @param scene_id: 场景id
- @return:
- """
- msg = {}
- # 插座
- if device_type == DEVICE_TYPE['socket']:
- topic_name = SMART_SOCKET_TOPIC.format(serial_number)
- status = 1 if event_type == EVENT_TYPE['socket_power_on'] else 0
- msg['type'] = 1
- msg['data'] = {
- 'deviceSwitch': status
- }
- # 摄像头
- elif device_type == DEVICE_TYPE['C516']:
- topic_name = ANSJER_GENERIC_TOPIC.format(serial_number)
- if event_type == EVENT_TYPE['detection_reminder_on']:
- msg['commandType'] = 'detection_reminder'
- msg['enable'] = 1
- elif event_type == EVENT_TYPE['detection_reminder_off']:
- msg['commandType'] = 'detection_reminder'
- msg['enable'] = 0
- elif event_type == EVENT_TYPE['snapshot']:
- msg['commandType'] = 'snapshot'
- msg['eventType'] = sensor_event_type
- elif event_type == EVENT_TYPE['record_video']:
- msg['commandType'] = 'record_video'
- elif event_type == EVENT_TYPE['human_tracking_on']:
- msg['commandType'] = 'human_tracking'
- msg['enable'] = 1
- elif event_type == EVENT_TYPE['human_tracking_off']:
- msg['commandType'] = 'human_tracking'
- msg['enable'] = 0
- else:
- return
- else:
- return
- if delay_time:
- time.sleep(delay_time)
- CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
- # 没有设备任务时,最后一个任务上报场景日志
- if scene_id:
- data = {
- 'sceneId': scene_id,
- 'status': 1
- }
- url = DETECT_PUSH_DOMAIN + 'gatewayService/sceneLogPush'
- requests.post(url=url, data=data, timeout=8)
|