Procházet zdrojové kódy

通过pipeline写数据

locky před 1 rokem
rodič
revize
27439364b6

+ 21 - 28
Controller/Cron/CronTaskController.py

@@ -38,37 +38,30 @@ class CronTaskView(View):
 
     @classmethod
     def equipment_info(cls, response):
-        for equipment_info_key in EQUIPMENT_INFO_KEY_LIST:
-            # 异步保存数据
-            kwargs = {
-                'equipment_info_key': equipment_info_key
-            }
-            push_thread = threading.Thread(
-                target=cls.save_equipment_info,
-                kwargs=kwargs)
-            push_thread.start()
+        # 异步保存数据
+        push_thread = threading.Thread(target=cls.save_equipment_info)
+        push_thread.start()
         return response.json(0)
 
     @staticmethod
-    def save_equipment_info(**kwargs):
-        equipment_info_key = kwargs['equipment_info_key']
-        equipment_info_model = EQUIPMENT_INFO_DICT[equipment_info_key]
+    def save_equipment_info():
+        redis_obj_pipe = RedisObject().pipe
+        for equipment_info_key in EQUIPMENT_INFO_KEY_LIST:
+            equipment_info_model = EQUIPMENT_INFO_DICT[equipment_info_key]
 
-        # 读取缓存的前n条数据批量写入
-        redis_r_obj = RedisObject(mode='r')
-        equipment_info_redis_list = redis_r_obj.lrange(equipment_info_key, 0, size - 1)
-        redis_r_obj.close()
+            # 读取缓存的前n条数据批量写入
+            redis_r_obj = RedisObject(mode='r')
+            equipment_info_redis_list = redis_r_obj.lrange(equipment_info_key, 0, size - 1)
 
-        redis_obj = RedisObject()
-        redis_obj.ltrim(equipment_info_key, size, -1)
-        redis_obj.close()
+            redis_obj_pipe.ltrim(equipment_info_key, size, -1)
 
-        equipment_info_list = []
-        for equipment_info in equipment_info_redis_list:
-            equipment_info_data = eval(equipment_info)
-            # 设备昵称存在表情,解码utf-8
-            if equipment_info_data.get('device_nick_name') is not None:
-                equipment_info_data['device_nick_name'] = equipment_info_data['device_nick_name']. \
-                    encode('UTF-8', 'ignore').decode('UTF-8')
-            equipment_info_list.append(equipment_info_model(**equipment_info_data))
-        equipment_info_model.objects.bulk_create(equipment_info_list)
+            equipment_info_list = []
+            for equipment_info in equipment_info_redis_list:
+                equipment_info_data = eval(equipment_info)
+                # 设备昵称存在表情,解码utf-8
+                if equipment_info_data.get('device_nick_name') is not None:
+                    equipment_info_data['device_nick_name'] = equipment_info_data['device_nick_name']. \
+                        encode('UTF-8', 'ignore').decode('UTF-8')
+                equipment_info_list.append(equipment_info_model(**equipment_info_data))
+            equipment_info_model.objects.bulk_create(equipment_info_list)
+        redis_obj_pipe.execute()

+ 6 - 7
Controller/DetectController.py

@@ -44,7 +44,7 @@ class NotificationView(View):
         is_st = request_dict.get('is_st', None)
         if not all([channel, n_time]):
             return JsonResponse(status=200, data={'code': 444, 'msg': 'error channel or n_time'})
-        redisObj = RedisObject(db=6)
+        redis_obj_pipe = RedisObject(db=6).pipe
         redis_r_obj = RedisObject(mode='r', db=6)
         try:
             uid = DevicePushService.decode_uid(etk, uidToken)  # 解密uid
@@ -84,11 +84,10 @@ class NotificationView(View):
                     return JsonResponse(status=200, data={'code': 176, 'msg': 'no uid_push data'})
                 # 修改redis数据,并设置过期时间为10分钟
                 uid_push_list = DevicePushService.qs_to_list(uid_push_qs)
-                redisObj.set_data(key=ykey, val=str(uid_push_list), expire=600)
+                redis_obj_pipe.set(ykey, str(uid_push_list), 600)
                 if not uid_push_list:
                     res_data = {'code': 404, 'msg': 'error !'}
                     return JsonResponse(status=200, data=res_data)
-            redis_r_obj.close()
 
             if not uid_push_list:
                 res_data = {'code': 0, 'msg': 'uid_push_list not exist'}
@@ -109,12 +108,12 @@ class NotificationView(View):
                         new_detect_interval = uid_push_list[0]['uid_set__new_detect_interval']
                         detect_interval = new_detect_interval if new_detect_interval > 0 else detect_interval
                         detect_interval = 60 if detect_interval < 60 else detect_interval
-                    redisObj.set_data(key=dkey, val=1, expire=detect_interval - 5)
-                    redisObj.set_data(key=pkey, val=1, expire=60)
+                    redis_obj_pipe.set(dkey, 1, detect_interval - 5)
+                    redis_obj_pipe.set(pkey, 1, 60)
             # 旧模式并且没有pkey,重新创建一个
             if not detect_group and not have_pkey:
-                redisObj.set_data(key=pkey, val=1, expire=60)
-            redisObj.close()
+                redis_obj_pipe.set(pkey, 1, 60)
+            redis_obj_pipe.execute()
 
             auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET)
             bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'apg')

+ 6 - 5
Controller/DetectControllerV2.py

@@ -60,7 +60,7 @@ class NotificationV2View(View):
         is_st = int(is_st)
         region = int(region)
         event_type = int(event_type)
-        redis_obj = RedisObject()
+        redis_obj_pipe = RedisObject().pipe
         redis_r_obj = RedisObject(mode='r')
         try:
             uid = DevicePushService.decode_uid(etk, uidToken)
@@ -77,12 +77,11 @@ class NotificationV2View(View):
             req_limiting = '{}_{}_{}_ptl'.format(uid, channel, event_type)
             cache_req_limiting = redis_r_obj.get_data(key=req_limiting)  # 获取请求限流缓存数据
             cache_app_push = redis_r_obj.get_data(key=push_interval)  # 获取APP推送消息时间间隔缓存数据
-            redis_r_obj.close()
+
             if event_type not in [606, 607]:
                 if cache_req_limiting:  # 限流存在则直接返回
                     return JsonResponse(status=200, data={'code': 0, 'msg': 'Push again in one minute'})
-            redis_obj.set_data(key=req_limiting, val=1, expire=60)  # 当缓存不存在限流数据 重新设置一分钟请求一次
-            redis_obj.close()
+            redis_obj_pipe.set(req_limiting, 1, 60)  # 当缓存不存在限流数据 重新设置一分钟请求一次
 
             # 查询uid_push和uid_set数据
             uid_push_qs = DevicePushService.query_uid_push(uid, event_type)
@@ -103,11 +102,13 @@ class NotificationV2View(View):
             if event_type not in [606, 607]:
                 if not cache_app_push:
                     # 缓存APP提醒推送间隔 默认1分钟提醒一次
-                    DevicePushService.cache_push_detect_interval(push_interval, detect_interval,
+                    DevicePushService.cache_push_detect_interval(redis_obj_pipe, push_interval, detect_interval,
                                                                  uid_set_push_list[0]['uid_set__new_detect_interval'])
             else:
                 cache_app_push = ''
 
+            redis_obj_pipe.execute()
+
             bucket = ''
             aws_s3_client = ''
             # 推图,初始化s3 client

+ 6 - 5
Object/RedisObject.py

@@ -10,21 +10,22 @@ class RedisObject:
 
     def __init__(self, mode='w', db=0):
         if CONFIG_INFO != CONFIG_US:
-            self.POOL = redis.ConnectionPool(host=REDIS_ADDRESS, port=6379, db=db)
-            self.CONN = redis.Redis(connection_pool=self.POOL)
+            pool = redis.ConnectionPool(host=REDIS_ADDRESS, port=6379, db=db)
+            self.CONN = redis.Redis(connection_pool=pool)
+            self.pipe = self.CONN.pipeline()
         else:
             if mode == 'w':
                 host = OCI_REDIS_NODE_PRIMARY
             else:
                 host = OCI_REDIS_NODE_READ
-            self.POOL = redis.ConnectionPool(connection_class=SSLConnection, host=host, port=6379, db=db,
-                                             max_connections=100)
+            pool = redis.ConnectionPool(connection_class=SSLConnection, host=host, port=6379, db=db)
             self.CONN = redis.StrictRedis(
-                connection_pool=self.POOL,
+                connection_pool=pool,
                 host=host,
                 ssl=True,
                 ssl_cert_reqs=None,
             )
+            self.pipe = self.CONN.pipeline()
 
     def set_data(self, key, val, expire=0):
         try:

+ 10 - 10
Service/DevicePushService.py

@@ -125,9 +125,10 @@ class DevicePushService:
         return qs_list
 
     @staticmethod
-    def cache_push_detect_interval(name, detect_interval, new_detect_interval):
+    def cache_push_detect_interval(redis_obj_pipe, name, detect_interval, new_detect_interval):
         """
         缓存设置推送消息的时间间隔
+        @param redis_obj_pipe: redis pipeline
         @param name: redis key
         @param detect_interval: 原推送时间间隔
         @param new_detect_interval: 新推送时间间隔
@@ -137,9 +138,7 @@ class DevicePushService:
             detect_interval = 60 if detect_interval < 60 else detect_interval
         else:  # 国内推送兼容问题,有值并且大于旧消息间隔则使用new_detect_interval
             detect_interval = new_detect_interval if new_detect_interval > detect_interval else detect_interval
-        redis_obj = RedisObject()
-        redis_obj.set_data(key=name, val=1, expire=detect_interval - 5)
-        redis_obj.close()
+        redis_obj_pipe.set(name, 1, detect_interval - 5)
 
     @classmethod
     def save_msg_push(cls, **params):
@@ -153,7 +152,8 @@ class DevicePushService:
         uid = params['uid']
         push_kwargs = params['push_kwargs']
         now_time = int(time.time())
-        redis_obj = RedisObject()
+        redis_r_obj = RedisObject(mode='r')
+        redis_obj_pipe = RedisObject().pipe
         try:
             params['event_tag'] = cls.get_event_tag(params['ai_type'], params['event_type'], params['detection'])
             is_app_push = True if params['event_type'] in [606, 607] else \
@@ -201,7 +201,7 @@ class DevicePushService:
                         }
                         # 保存到redis列表
                         equipment_info_value = json.dumps(equipment_info_kwargs)
-                        redis_obj.rpush(equipment_info_key, equipment_info_value)
+                        redis_obj_pipe.rpush(equipment_info_key, equipment_info_value)
                     saved_user_id_list.append(user_id)
 
                 # 推送
@@ -250,12 +250,12 @@ class DevicePushService:
                 if params['event_type'] in [606, 607] or CONFIG_INFO != CONFIG_US:
                     end = 0
                     # 缓存数据多于100条,批量保存前100条,否则保存全部
-                    equipment_info_len = redis_obj.llen(equipment_info_key)
+                    equipment_info_len = redis_r_obj.llen(equipment_info_key)
                     end = 99 if equipment_info_len > 100 else equipment_info_len - 1
 
                     if end != 0:
-                        equipment_info_redis_list = redis_obj.lrange(equipment_info_key, 0, end)
-                        redis_obj.ltrim(equipment_info_key, end+1, -1)
+                        equipment_info_redis_list = redis_r_obj.lrange(equipment_info_key, 0, end)
+                        redis_obj_pipe.ltrim(equipment_info_key, end+1, -1)
 
                         for equipment_info in equipment_info_redis_list:
                             equipment_info_data = eval(equipment_info)
@@ -266,7 +266,7 @@ class DevicePushService:
                             equipment_info_list.append(equipment_info_model(**equipment_info_data))
                         equipment_info_model.objects.bulk_create(equipment_info_list)
 
-            redis_obj.close()
+            redis_obj_pipe.execute()
 
             return True
         except Exception as e: