Jelajahi Sumber

传感器触发事件联动插座场景

locky 1 tahun lalu
induk
melakukan
4af39d3022

+ 11 - 0
AnsjerPush/Config/gatewaySensorConfig.py

@@ -8,6 +8,14 @@
 # MQTT主题名
 SMART_SCENE_TOPIC = 'loocam/gateway_sensor/smart_scene/{}'
 GET_SCENE_TOPIC = 'loocam/gateway_sensor/get_scene/{}'
+SMART_SOCKET_TOPIC = 'loocam/smart-socket/{}'
+
+# 设备类型
+DEVICE_TYPE = {
+    'gateway': 200,
+    'socket': 201,
+    'switch': 202
+}
 
 # 传感器类型
 SENSOR_TYPE = {
@@ -54,4 +62,7 @@ EVENT_TYPE = {
     'temperature': 2200,
     'humidity': 2201,
     'tem_hum_sensor_low_power': 2202,
+    # 插座电源
+    'socket_power_on': 2010,
+    'socket_power_off': 2011,
 }

+ 1 - 1
AnsjerPush/cn_config/cn_formal_config.py

@@ -73,7 +73,7 @@ PAYPAL_CRD = {
     "client_id": "AdSRd6WBn-qLl9OiQHQuNYTDFSx0ZX0RUttqa58au8bPzoGYQUrt8bc6591RmH8_pEAIPijdvVYSVXyI",
     "client_secret": "ENT-J08N3Fw0B0uAokg4RukljAwO9hFHPf8whE6-Dwd8oBWJO8AWMgpdTKpfB1pOy89t4bsFEzMWDowm"
 }
-DETECT_PUSH_DOMAIN = 'http://push.dvema.com/'
+DETECT_PUSH_DOMAIN = 'http://push.zositechc.cn/'
 
 
 JPUSH_CONFIG = {

+ 1 - 1
AnsjerPush/eur_config/eur_formal_config.py

@@ -73,7 +73,7 @@ PAYPAL_CRD = {
     "client_id": "AdSRd6WBn-qLl9OiQHQuNYTDFSx0ZX0RUttqa58au8bPzoGYQUrt8bc6591RmH8_pEAIPijdvVYSVXyI",
     "client_secret": "ENT-J08N3Fw0B0uAokg4RukljAwO9hFHPf8whE6-Dwd8oBWJO8AWMgpdTKpfB1pOy89t4bsFEzMWDowm"
 }
-DETECT_PUSH_DOMAIN = 'http://push.dvema.com/'
+DETECT_PUSH_DOMAIN = 'http://push.zositeche.com/'
 
 
 JPUSH_CONFIG = {

+ 1 - 1
AnsjerPush/test_config/test_config.py

@@ -59,7 +59,7 @@ SERVER_DOMAIN = 'http://www.dvema.com/'
 SERVER_DOMAIN_SSL = 'https://www.dvema.com/'
 DOMAIN_HOST = 'www.dvema.com'
 SERVER_HOST = 'localhost'
-DETECT_PUSH_DOMAIN = 'http://push.dvema.com/'
+DETECT_PUSH_DOMAIN = 'http://test.push.zositechc.cn/'
 
 
 JPUSH_CONFIG = {

+ 221 - 12
Controller/gatewayController.py

@@ -4,15 +4,22 @@
 @Time : 2022/5/9 10:51
 @File :gatewayController.py
 """
+import datetime
+import threading
 import time
 
+import pytz
+import requests
 from django.views.generic.base import View
 
-from AnsjerPush.Config.gatewaySensorConfig import SENSOR_TYPE, EVENT_TYPE
+from AnsjerPush.Config.gatewaySensorConfig import SENSOR_TYPE, EVENT_TYPE, DEVICE_TYPE, SMART_SOCKET_TOPIC
 from AnsjerPush.config import LOGGER, XM_PUSH_CHANNEL_ID
-from Model.models import SensorRecord, GatewaySubDevice, GatewayPush, Device_Info, SceneLog, SmartScene, CountryModel
+from Model.models import SensorRecord, GatewaySubDevice, GatewayPush, Device_Info, SceneLog, SmartScene, CountryModel, \
+    EffectiveTime
 from Object.RedisObject import RedisObject
 from Object.ResponseObject import ResponseObject
+from Service.CommonService import CommonService
+from AnsjerPush.config import DETECT_PUSH_DOMAIN
 from Service.EquipmentInfoService import EquipmentInfoService
 from Service.HuaweiPushService.HuaweiPushService import HuaweiPushObject
 from Service.PushService import PushObject
@@ -63,9 +70,18 @@ class GatewayView(View):
         if not all([serial_number, ieee_addr, sensor_type, event_type]):
             return response.json(444)
 
+        num = None
         n_time = int(time.time())
 
         try:
+            # 获取温湿度
+            if sensor_type == SENSOR_TYPE['tem_hum_sensor'] and (
+                    event_type == EVENT_TYPE['temperature'] or event_type == EVENT_TYPE['humidity']):
+                num = request_dict.get('num')
+                if num is None:
+                    return response.json(444)
+                num = int(num) / 100
+
             # 查询子设备表id
             gateway_sub_device_qs = GatewaySubDevice.objects.filter(device__serial_number=serial_number,
                                                                     device_type=sensor_type, ieee_addr=ieee_addr). \
@@ -73,11 +89,21 @@ class GatewayView(View):
             if not gateway_sub_device_qs.exists():
                 return response.json(173)
 
+            gateway_sub_device_id = gateway_sub_device_qs[0]['id']
+            # 多线程执行场景任务
+            task_kwargs = {
+                'num': num,
+                'n_time': n_time,
+                'event_type': event_type,
+                'gateway_sub_device_id': gateway_sub_device_id
+            }
+            execute_task_thread = threading.Thread(target=cls.execute_scene_tasks, kwargs=task_kwargs)
+            execute_task_thread.start()
+
             country_id = gateway_sub_device_qs[0]['device__userID__region_country']
             lang = cls.confirm_lang(country_id)
             alarm = cls.get_alarm(lang, event_type)
 
-            gateway_sub_device_id = gateway_sub_device_qs[0]['id']
             nickname = gateway_sub_device_qs[0]['nickname']
             sensor_record_dict = {
                 'gateway_sub_device_id': gateway_sub_device_id,
@@ -85,12 +111,9 @@ class GatewayView(View):
                 'event_type': event_type,
                 'created_time': n_time,
             }
-            # 处理温湿度,不推送
-            if sensor_type == SENSOR_TYPE['tem_hum_sensor'] and (
-                    event_type == EVENT_TYPE['temperature'] or event_type == EVENT_TYPE['humidity']):
-                num = request_dict.get('num', None)
-                num = str(int(num) / 100)
-                sensor_record_dict['alarm'] = num
+            # 温湿度上报不推送
+            if num is not None:
+                sensor_record_dict['alarm'] = str(num)
                 SensorRecord.objects.create(**sensor_record_dict)
                 return response.json(0)
 
@@ -434,6 +457,192 @@ class GatewayView(View):
             LOGGER.info('成功接收并保存,插座序列号{},状态:{}'.format(serial_number, status))
             return response.json(0)
         except Exception as e:
-            print(repr(e))
-            LOGGER.info('---插座开关日志推送接口异常--- {}'.format(repr(e)))
-            return response.json(500, repr(e))
+            LOGGER.info('插座开关日志推送接口异常, error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
+    @classmethod
+    def execute_scene_tasks(cls, gateway_sub_device_id, event_type, num, n_time):
+        """
+        执行场景任务
+        @param gateway_sub_device_id: 子设备id
+        @param event_type: 事件类型
+        @param num: 温湿度值
+        @param n_time: 当前时间
+        @return:
+        """
+        smart_scene_qs = SmartScene.objects.filter(sub_device_id=gateway_sub_device_id, is_enable=True). \
+            values('id', 'scene_data', 'tz', 'is_all_day', 'effective_time_id')
+        if smart_scene_qs.exists():
+            for smart_scene in smart_scene_qs:
+                if smart_scene['scene_data']:
+                    scene_data_dict = eval(smart_scene['scene_data'])
+                    condition = scene_data_dict.get('condition')
+                    if condition:
+                        # None 或 event_type
+                        scene_data_event_type = condition.get('event_type')
+                        # 触发事件类型跟条件事件类型一致,且任务列表不为空
+                        task_list = scene_data_dict.get('task_list')
+                        if event_type == scene_data_event_type and task_list:
+                            # 判断温湿度是否在条件设置的范围
+                            if num is not None:
+                                condition_value = condition.get('value')
+                                space_index = condition_value.index(' ')
+                                symbol, value = condition_value[:space_index], float(condition_value[space_index + 1:])
+                                # 温湿度不在设定范围,不执行
+                                if not ((symbol == '≥' and num >= value) or (symbol == '≤' and num <= value)):
+                                    continue
+                            execute_task = cls.judge_execute_task(smart_scene, n_time)
+
+                            # 执行任务
+                            if execute_task:
+                                scene_id = 0
+                                task_list_len = len(task_list)
+                                for index, task in enumerate(task_list):
+                                    # 最后一个任务上报日志
+                                    if index == task_list_len - 1:
+                                        scene_id = smart_scene['id']
+                                    kwargs = {
+                                        'device_type': task['device_type'],
+                                        'event_type': task['event_type'],
+                                        'serial_number': task['serial_number'],
+                                        'delay_time': task['delay_time'],
+                                        'scene_id': scene_id
+                                    }
+                                    pub_mqtt_thread = threading.Thread(target=cls.pub_mqtt, kwargs=kwargs)
+                                    pub_mqtt_thread.start()
+
+    @classmethod
+    def judge_execute_task(cls, smart_scene, n_time):
+        """
+        判断是否执行任务
+        @param smart_scene: 场景数据
+        @param n_time: 当前时间
+        @return execute_task: bool
+        """
+        execute_task = False
+        # 判断时间是否在执行时间范围内
+        if smart_scene['is_all_day'] == 1:
+            # 全天必执行
+            execute_task = True
+        elif smart_scene['is_all_day'] == 2:
+            # 非全天,判断当前时间是否在设置的时间范围
+            tz = smart_scene['tz']
+            time_minute, week = cls.get_time_info(n_time, tz)
+
+            effective_time_id = smart_scene['effective_time_id']
+            effective_time_qs = EffectiveTime.objects.filter(id=effective_time_id). \
+                values('start_time', 'end_time', 'repeat')
+            if effective_time_qs.exists():
+                start_time = effective_time_qs[0]['start_time']
+                end_time = effective_time_qs[0]['end_time']
+                repeat = effective_time_qs[0]['repeat']
+                time_frame_dict, time_frame_next_day_dict = cls.get_time_frame_dict(
+                    start_time, end_time, repeat)
+                # 判断当前时间是否在设置的时间范围
+                if time_frame_dict.get(week):
+                    start_time = time_frame_dict[week]['start_time']
+                    end_time = time_frame_dict[week]['end_time']
+                    if start_time <= time_minute <= end_time:
+                        execute_task = True
+                if time_frame_next_day_dict.get(week):
+                    start_time = time_frame_next_day_dict[week]['start_time']
+                    end_time = time_frame_next_day_dict[week]['end_time']
+                    if start_time <= time_minute <= end_time:
+                        execute_task = True
+        return execute_task
+
+    @staticmethod
+    def get_time_info(timestamp, timezone_offset):
+        """
+        根据时间戳和时区获取时间(时分转为分钟数)和星期
+        @param timestamp: 时间戳
+        @param timezone_offset: 时区
+        @return: time_minute, week
+        """
+        # 计算时区偏移量(以分钟为单位)
+        timezone_offset_minutes = int(timezone_offset * 60)
+        timezone = pytz.FixedOffset(timezone_offset_minutes)
+        # 将时间戳转换为datetime对象,并应用时区
+        dt_object = datetime.datetime.fromtimestamp(timestamp, tz=timezone)
+        # 获取时分,并转为分钟数
+        hour = dt_object.hour
+        minute = dt_object.minute
+        time_minute = hour * 60 + minute
+        # 获取星期(0表示星期一,6表示星期日)
+        week = str(dt_object.weekday())
+        return time_minute, week
+
+    @staticmethod
+    def get_time_frame_dict(start_time, end_time, repeat):
+        """
+        获取时间范围字典
+        @param start_time: 开始时间
+        @param end_time: 结束时间
+        @param repeat: 星期周期的十进制数,如127 -> 0,1,2,3,4,5,6
+        @return: time_frame_dict, time_frame_next_day_dict
+        """
+        # 十进制转为7位的二进制并倒序
+        bin_str = bin(repeat)[2:].zfill(7)[::-1]
+        # 生成星期周期列表
+        week_list = []
+        for i, bit in enumerate(bin_str):
+            if bit == '1':
+                week_list.append(i)
+
+        # 生成时间范围字典
+        time_frame_dict = {}
+        time_frame_next_day_dict = {}
+        # 非隔天
+        if end_time > start_time:
+            for week in week_list:
+                time_frame_dict[str(week)] = {
+                    'start_time': start_time,
+                    'end_time': end_time
+                }
+        # 隔天
+        else:
+            # time_frame_dict记录当天时间范围,time_frame_next_day_dict记录溢出到第二天的时间范围
+            for week in week_list:
+                time_frame_dict[str(week)] = {
+                    'start_time': start_time,
+                    'end_time': 1439    # 23:59
+                }
+                week += 1
+                if week == 7:
+                    week = 0
+                time_frame_next_day_dict[str(week)] = {
+                    'start_time': 0,    # 00:00
+                    'end_time': end_time
+                }
+        return time_frame_dict, time_frame_next_day_dict
+
+    @staticmethod
+    def pub_mqtt(device_type, event_type, serial_number, delay_time, scene_id=0):
+        """
+        发布mqtt消息
+        @param device_type: 设备类型
+        @param event_type: 事件类型
+        @param serial_number: 序列号
+        @param delay_time: 延迟时间
+        @param scene_id: 场景id
+        @return:
+        """
+        if device_type == DEVICE_TYPE['socket']:
+            topic_name = SMART_SOCKET_TOPIC.format(serial_number)
+            status = 1 if event_type == EVENT_TYPE['socket_power_on'] else 0
+            msg = {
+                'type': 1,
+                'data': {'deviceSwitch': status}
+            }
+            if delay_time:
+                time.sleep(delay_time)
+            CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
+
+        # 没有设备任务时,最后一个任务上报场景日志
+        if scene_id:
+            data = {
+                'sceneId': scene_id,
+                'status': 1
+            }
+            url = DETECT_PUSH_DOMAIN + 'gatewayService/sceneLogPush'
+            requests.post(url=url, data=data, timeout=8)

+ 12 - 0
Model/models.py

@@ -2701,6 +2701,18 @@ class UnicomFlowPush(models.Model):
         app_label = 'db2'
 
 
+class EffectiveTime(models.Model):
+    id = models.AutoField(primary_key=True, verbose_name=u'自增标记id')
+    start_time = models.SmallIntegerField(default=0, verbose_name=u'开始时间')
+    end_time = models.SmallIntegerField(default=0, verbose_name=u'结束时间')
+    repeat = models.SmallIntegerField(default=0, verbose_name=u'重复周期')
+
+    class Meta:
+        db_table = 'effective_time'
+        verbose_name = '场景执行时间'
+        verbose_name_plural = verbose_name
+
+
 class SceneLog(models.Model):
     id = models.AutoField(primary_key=True, verbose_name='自增标记ID')
     scene_name = models.CharField(default='', max_length=100, verbose_name='场景名称')

+ 94 - 0
Service/CommonService.py

@@ -7,13 +7,17 @@ import hashlib
 from pathlib import Path
 from random import Random
 import ipdb
+import requests
 import simplejson as json
 from boto3 import Session
 from django.core import serializers
 from django.utils import timezone
 from pyipip import IPIPDatabase
+import OpenSSL.crypto as ct
+from base64 import encodebytes
 
 from AnsjerPush.config import BASE_DIR, ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION_NAME, PUSH_BUCKET
+from Model.models import iotdeviceInfoModel
 
 
 # 复用性且公用较高封装代码在这
@@ -328,3 +332,93 @@ class CommonService:
         struct_time = time.localtime(timestamp)
         time_str = time.strftime("%Y-%m-%d %H:%M:%S", struct_time)
         return time_str
+
+    @staticmethod
+    def req_publish_mqtt_msg(identification_code, topic_name, msg, qos=1):
+        """
+        通用发布MQTT消息函数
+        @param identification_code: 标识码
+        @param topic_name: 主题名
+        @param msg: 消息内容
+        @param qos: mqtt qos等级
+        @return: boolean
+        """
+        if not all([identification_code, topic_name]):
+            return False
+
+        if identification_code.endswith('11L'):
+            thing_name = 'LC_' + identification_code
+        else:
+            thing_name = 'Ansjer_Device_' + identification_code
+
+        try:
+            # 获取数据组织将要请求的url
+            iot = iotdeviceInfoModel.objects.filter(
+                thing_name=thing_name).values(
+                'endpoint', 'token_iot_number')
+            if not iot.exists():
+                return False
+            endpoint = iot[0]['endpoint']
+            Token = iot[0]['token_iot_number']
+
+            # api doc: https://docs.aws.amazon.com/zh_cn/iot/latest/developerguide/http.html
+            # url: https://IoT_data_endpoint/topics/url_encoded_topic_name?qos=1
+            # post请求url发布MQTT消息
+            url = 'https://{}/topics/{}?qos={}'.format(endpoint, topic_name, qos)
+            authorizer_name = 'Ansjer_Iot_Auth'
+            signature = CommonService.rsa_sign(Token)  # Token签名
+            headers = {
+                'x-amz-customauthorizer-name': authorizer_name,
+                'Token': Token,
+                'x-amz-customauthorizer-signature': signature}
+            r = requests.post(url=url, headers=headers, json=msg, timeout=2)
+            if r.status_code == 200:
+                res = r.json()
+                if res['message'] == 'OK':
+                    return True
+                return False
+            else:
+                return False
+        except Exception as e:
+            return False
+
+    @staticmethod
+    def rsa_sign(Token):
+        # 私钥签名Token
+        if not Token:
+            return ''
+        private_key_file = '''-----BEGIN RSA PRIVATE KEY-----
+MIIEpQIBAAKCAQEA5iJzEDPqtGmFMggekVro6C0lrjuC2BjunGkrFNJWpDYzxCzE
+X5jf4/Fq7hcIaQd5sqHugDxPVollSLPe9zNilbrd0sZfU+Ed8gRVuKW9KwfE9XFr
+L0pt6bKRQ0IIRfiZ9TuR0tsQysvcO1GZSXcYfPue3tGM1zOnWFThWDqZ06+sOxzt
+RMRl4yNfbpCG4MfxG3itNXOfrjZv2OMLSXrxmzubSvRpUYSvQPs4fm9302SAnySY
+0MKzx6H6528ZQm/IDDSZy6EmNBIyTRDfxC56vnYcXvqedAQh7jJnjdvt6Q4MhASH
+eIYi1FBSdu2NT6wgpnrqXzx5pq9kR/lnsLID0wIDAQABAoIBAQCiF4GT1/1oNSpr
+ouxk1PNXFPWFUsVGD8mAwVJmx//eiY7MjfuCmdqYYmI+cFqsH2fIOeYSzGfVO9Dq
+9EYHN1oovAWhf7eFDPpajFMUSyiCNmazub8VAAeKowtNpCTPo9pMsDh1m3aoYA4u
+ebrN0+Sbo16y8kWRDgDAZoiR7DSMs8lczk16hwfv5mw8XpNDbaL3Coi4Koe2S1Yh
+2SX3vWFlpd7qF1ZYXuZIp+b8JPrV7n9eUKoFgzj0gqgwQK80CoexIjiOrNMPvkQa
+q+8kCvFjAzKxOK7e8gjM8lMRiGodb61kmYZkkJzFwWO4EaGbl34lfVECd1Ixp3tF
+be0OWAGBAoGBAPSteXDzzToD8ovM7LL11x0jWwI6HOiHu89kZtW566rIezjWBuA2
+TxrcYKM3h9jQRXS3CsMdoIv6XGk5lqM8ADtjn23FBWe/THYLh8bm8JOgh5RRWQDg
+SvkLfi9Ih2mM4NJfmuuDOh3Nze2efLM7+kOZWUQwF2Zx9mL5jvRBk351AoGBAPDI
+sYmT2Li+i5+0vykA2m5uPF8ZOW8BGtAfCZv0suW7BNzSgin78g9WapRd/4p0NNiL
+/nVMqPPCpd1akCUpV+GDWQt0hV+HZjxANE0KWhciQRyo2qvo51j8SWILJSgh0tXC
+aTF8qt6oGw3VN3m57vKhbrlDaz0J/NDJFci6msAnAoGBAOuG6bXPGijUj+//DYKf
+n7jOxdZ49kboEePrtAncdHzri6IEdI3z+WXT6bpzw/LzWUimwldb96WHFNm9s8Hi
+Ch8hIODbnP5naUTgiIzw1XhmONyPCewL/F+LrqX5XVA/alNX8JrwsUrrR2WLAGLQ
+Q3I69XDsEjptTU2tCO0bCs3ZAoGBAJ2lCHfm0JHET230zONvp5N9oREyVqQSuRdh
++syc3TQDyh85w/bw+X6JOaaCFHj1tFPC9Iqf8k4GNspCLPXnp54CfR4+38O3xnvU
+HWoDSRC0YKT++IxtJGriYrlKSr2Hx54kdvLriIPW1D+uRW/xCDza7L9nIKMKEvgv
+b4/IfOEpAoGAeKM9Te7T1VzlAkS0CJOwanzwYV/zrex84WuXxlsGgPQ871lTs5AP
+H1QLfLfFXH+UVrCEC2yv4eml/cqFkpB3gE5i4MQ8GPVIOSs5tsIyl8YUA03vdNdB
+GCqvlyw5dfxNA+EtxNE2wCW/LW7ENJlACgcfgPlBZtpLheWoZB/maw4=
+-----END RSA PRIVATE KEY-----'''
+        # 使用密钥文件方式
+        # private_key_file_path = os.path.join(BASE_DIR, 'static/iotCore/private.pem')#.replace('\\', '/')
+        # private_key_file = open(private_key_file_path, 'r')
+        private_key = ct.load_privatekey(ct.FILETYPE_PEM, private_key_file)
+        signature = ct.sign(private_key, Token.encode('utf8'), 'sha256')
+        signature = encodebytes(signature).decode('utf8').replace('\n', '')
+        # print('signature:', signature)
+        return signature