Explorar el Código

代码回滚,欧洲服使用oci redis主节点

locky hace 1 año
padre
commit
d9bf2dde09

+ 0 - 2
AnsjerPush/cn_config/cn_formal_config.py

@@ -22,8 +22,6 @@ PUSH_BUCKET = 'push'                                # 推送存储桶
 
 # redis节点
 REDIS_ADDRESS = 'pushredis.3xavzq.0001.cnw1.cache.amazonaws.com.cn'
-OCI_REDIS_NODE_PRIMARY = ''
-OCI_REDIS_NODE_READ = ''
 
 APNS_MODE = 'prod'
 

+ 0 - 2
AnsjerPush/dev_config/local_config.py

@@ -180,5 +180,3 @@ MEIZUPUSH_CONFIG = {
 }
 APNS_MODE = 'dev'
 REDIS_ADDRESS = '127.0.0.1'
-OCI_REDIS_NODE_PRIMARY = '127.0.0.1'
-OCI_REDIS_NODE_READ = '127.0.0.1'

+ 0 - 2
AnsjerPush/eur_config/eur_formal_config.py

@@ -22,8 +22,6 @@ PUSH_BUCKET = 'foreignpush'                                # 推送存储桶
 
 # redis节点
 REDIS_ADDRESS = 'amaaaaaayszequiazwohcuwymldcfemd57hutngjgf5ngw6rzzhcw6pqsw2a-p.redis.uk-london-1.oci.oraclecloud.com'
-OCI_REDIS_NODE_PRIMARY = ''
-OCI_REDIS_NODE_READ = ''
 
 APNS_MODE = 'prod'
 

+ 0 - 2
AnsjerPush/test_config/test_config.py

@@ -22,8 +22,6 @@ PUSH_BUCKET = 'foreignpush'                                # 推送存储桶
 
 # redis节点
 REDIS_ADDRESS = '127.0.0.1'
-OCI_REDIS_NODE_PRIMARY = '127.0.0.1'
-OCI_REDIS_NODE_READ = '127.0.0.1'
 
 APNS_MODE = 'dev'
 

+ 0 - 3
AnsjerPush/us_config/formal_config.py

@@ -21,9 +21,6 @@ PUSH_BUCKET = 'foreignpush'                                # 推送存储桶
 
 # redis节点
 REDIS_ADDRESS = 'pushredis.5tgle2.0001.usw1.cache.amazonaws.com'
-OCI_REDIS_NODE_PRIMARY = 'amaaaaaayszequiamxr7cdpparig3ptmytvde5vvnz6n7gceo4232sbhhlsa-p.redis.us-phoenix-1.oci.oraclecloud.com'
-OCI_REDIS_NODE_READ = 'amaaaaaayszequiamxr7cdpparig3ptmytvde5vvnz6n7gceo4232sbhhlsa-r.redis.us-phoenix-1.oci.oraclecloud.com'
-
 
 APNS_MODE = 'prod'
 

+ 25 - 22
Controller/Cron/CronTaskController.py

@@ -38,30 +38,33 @@ class CronTaskView(View):
 
     @classmethod
     def equipment_info(cls, response):
-        # 异步保存数据
-        push_thread = threading.Thread(target=cls.save_equipment_info)
-        push_thread.start()
+        for equipment_info_key in EQUIPMENT_INFO_KEY_LIST:
+            # 异步保存数据
+            kwargs = {
+                'equipment_info_key': equipment_info_key
+            }
+            push_thread = threading.Thread(
+                target=cls.save_equipment_info,
+                kwargs=kwargs)
+            push_thread.start()
         return response.json(0)
 
     @staticmethod
-    def save_equipment_info():
-        redis_obj_pipe = RedisObject().pipe
-        for equipment_info_key in EQUIPMENT_INFO_KEY_LIST:
-            equipment_info_model = EQUIPMENT_INFO_DICT[equipment_info_key]
-
-            # 读取缓存的前n条数据批量写入
-            redis_r_obj = RedisObject(mode='r')
-            equipment_info_redis_list = redis_r_obj.lrange(equipment_info_key, 0, size - 1)
+    def save_equipment_info(**kwargs):
+        equipment_info_key = kwargs['equipment_info_key']
+        equipment_info_model = EQUIPMENT_INFO_DICT[equipment_info_key]
 
-            redis_obj_pipe.ltrim(equipment_info_key, size, -1)
+        # 读取缓存的前n条数据批量写入
+        redis_obj = RedisObject()
+        equipment_info_redis_list = redis_obj.lrange(equipment_info_key, 0, size - 1)
+        redis_obj.ltrim(equipment_info_key, size, -1)
 
-            equipment_info_list = []
-            for equipment_info in equipment_info_redis_list:
-                equipment_info_data = eval(equipment_info)
-                # 设备昵称存在表情,解码utf-8
-                if equipment_info_data.get('device_nick_name') is not None:
-                    equipment_info_data['device_nick_name'] = equipment_info_data['device_nick_name']. \
-                        encode('UTF-8', 'ignore').decode('UTF-8')
-                equipment_info_list.append(equipment_info_model(**equipment_info_data))
-            equipment_info_model.objects.bulk_create(equipment_info_list)
-        redis_obj_pipe.execute()
+        equipment_info_list = []
+        for equipment_info in equipment_info_redis_list:
+            equipment_info_data = eval(equipment_info)
+            # 设备昵称存在表情,解码utf-8
+            if equipment_info_data.get('device_nick_name') is not None:
+                equipment_info_data['device_nick_name'] = equipment_info_data['device_nick_name']. \
+                    encode('UTF-8', 'ignore').decode('UTF-8')
+            equipment_info_list.append(equipment_info_model(**equipment_info_data))
+        equipment_info_model.objects.bulk_create(equipment_info_list)

+ 9 - 12
Controller/DetectController.py

@@ -44,8 +44,7 @@ class NotificationView(View):
         is_st = request_dict.get('is_st', None)
         if not all([channel, n_time]):
             return JsonResponse(status=200, data={'code': 444, 'msg': 'error channel or n_time'})
-        redis_obj_pipe = RedisObject(db=6).pipe
-        redis_r_obj = RedisObject(mode='r', db=6)
+        redisObj = RedisObject(db=6)
         try:
             uid = DevicePushService.decode_uid(etk, uidToken)  # 解密uid
             if len(uid) != 20 and len(uid) != 14:
@@ -62,9 +61,9 @@ class NotificationView(View):
             else:
                 dkey = '{}_{}_flag'.format(uid, channel)
 
-            have_ykey = redis_r_obj.get_data(key=ykey)  # uid_set 数据库缓存
-            have_pkey = redis_r_obj.get_data(key=pkey)  # 一分钟限制key
-            have_dkey = redis_r_obj.get_data(key=dkey)  # 推送类型限制
+            have_ykey = redisObj.get_data(key=ykey)  # uid_set 数据库缓存
+            have_pkey = redisObj.get_data(key=pkey)  # 一分钟限制key
+            have_dkey = redisObj.get_data(key=dkey)  # 推送类型限制
 
             # 一分钟外,推送开启状态
             detect_med_type = 0  # 0推送旧机制 1存库不推送,2推送存库
@@ -75,7 +74,7 @@ class NotificationView(View):
 
             # 数据库读取数据
             if have_ykey:
-                uid_push_list = eval(redis_r_obj.get_data(key=ykey))
+                uid_push_list = eval(redisObj.get_data(key=ykey))
             else:
                 # 从数据库查询出来
                 uid_push_qs = DevicePushService.query_uid_push(uid, event_type)
@@ -84,7 +83,7 @@ class NotificationView(View):
                     return JsonResponse(status=200, data={'code': 176, 'msg': 'no uid_push data'})
                 # 修改redis数据,并设置过期时间为10分钟
                 uid_push_list = DevicePushService.qs_to_list(uid_push_qs)
-                redis_obj_pipe.set(ykey, str(uid_push_list), 600)
+                redisObj.set_data(key=ykey, val=str(uid_push_list), expire=600)
                 if not uid_push_list:
                     res_data = {'code': 404, 'msg': 'error !'}
                     return JsonResponse(status=200, data=res_data)
@@ -108,13 +107,11 @@ class NotificationView(View):
                         new_detect_interval = uid_push_list[0]['uid_set__new_detect_interval']
                         detect_interval = new_detect_interval if new_detect_interval > 0 else detect_interval
                         detect_interval = 60 if detect_interval < 60 else detect_interval
-                    redis_obj_pipe.set(dkey, 1, detect_interval - 5)
-                    redis_obj_pipe.set(pkey, 1, 60)
+                    redisObj.set_data(key=dkey, val=1, expire=detect_interval - 5)
+                    redisObj.set_data(key=pkey, val=1, expire=60)
             # 旧模式并且没有pkey,重新创建一个
             if not detect_group and not have_pkey:
-                redis_obj_pipe.set(pkey, 1, 60)
-            redis_obj_pipe.execute()
-
+                redisObj.set_data(key=pkey, val=1, expire=60)
             auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET)
             bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'apg')
 

+ 5 - 9
Controller/DetectControllerV2.py

@@ -60,8 +60,7 @@ class NotificationV2View(View):
         is_st = int(is_st)
         region = int(region)
         event_type = int(event_type)
-        redis_obj_pipe = RedisObject().pipe
-        redis_r_obj = RedisObject(mode='r')
+        redis_obj = RedisObject()
         try:
             uid = DevicePushService.decode_uid(etk, uidToken)
             if len(uid) != 20 and len(uid) != 14:
@@ -75,13 +74,12 @@ class NotificationV2View(View):
                 push_interval = '{}_{}_flag'.format(uid, channel)
 
             req_limiting = '{}_{}_{}_ptl'.format(uid, channel, event_type)
-            cache_req_limiting = redis_r_obj.get_data(key=req_limiting)  # 获取请求限流缓存数据
-            cache_app_push = redis_r_obj.get_data(key=push_interval)  # 获取APP推送消息时间间隔缓存数据
-
+            cache_req_limiting = redis_obj.get_data(key=req_limiting)  # 获取请求限流缓存数据
+            cache_app_push = redis_obj.get_data(key=push_interval)  # 获取APP推送消息时间间隔缓存数据
             if event_type not in [606, 607]:
                 if cache_req_limiting:  # 限流存在则直接返回
                     return JsonResponse(status=200, data={'code': 0, 'msg': 'Push again in one minute'})
-            redis_obj_pipe.set(req_limiting, 1, 60)  # 当缓存不存在限流数据 重新设置一分钟请求一次
+            redis_obj.set_data(key=req_limiting, val=1, expire=60)  # 当缓存不存在限流数据 重新设置一分钟请求一次
 
             # 查询uid_push和uid_set数据
             uid_push_qs = DevicePushService.query_uid_push(uid, event_type)
@@ -102,13 +100,11 @@ class NotificationV2View(View):
             if event_type not in [606, 607]:
                 if not cache_app_push:
                     # 缓存APP提醒推送间隔 默认1分钟提醒一次
-                    DevicePushService.cache_push_detect_interval(redis_obj_pipe, push_interval, detect_interval,
+                    DevicePushService.cache_push_detect_interval(redis_obj, push_interval, detect_interval,
                                                                  uid_set_push_list[0]['uid_set__new_detect_interval'])
             else:
                 cache_app_push = ''
 
-            redis_obj_pipe.execute()
-
             bucket = ''
             aws_s3_client = ''
             # 推图,初始化s3 client

+ 13 - 31
Object/RedisObject.py

@@ -1,7 +1,5 @@
 import redis
-from redis.connection import SSLConnection
-from AnsjerPush.config import REDIS_ADDRESS, OCI_REDIS_NODE_PRIMARY, OCI_REDIS_NODE_READ, \
-    CONFIG_INFO, CONFIG_US, CONFIG_EUR
+from AnsjerPush.config import REDIS_ADDRESS, CONFIG_INFO, CONFIG_US
 
 # 本地调试把注释打开
 # REDIS_ADDRESS = '127.0.0.1'
@@ -9,34 +7,18 @@ from AnsjerPush.config import REDIS_ADDRESS, OCI_REDIS_NODE_PRIMARY, OCI_REDIS_N
 
 class RedisObject:
 
-    def __init__(self, mode='w', db=0):
-        if CONFIG_INFO == CONFIG_EUR:
-            pool = redis.ConnectionPool(connection_class=SSLConnection, host=REDIS_ADDRESS, port=6379, db=db)
-            self.CONN = redis.StrictRedis(
-                connection_pool=pool,
-                host=REDIS_ADDRESS,
-                ssl=True,
-                ssl_cert_reqs=None,
-            )
-            self.pipe = self.CONN.pipeline()
-        else:
-            if CONFIG_INFO != CONFIG_US:
-                pool = redis.ConnectionPool(host=REDIS_ADDRESS, port=6379, db=db)
-                self.CONN = redis.Redis(connection_pool=pool)
-                self.pipe = self.CONN.pipeline()
-            else:
-                if mode == 'w':
-                    host = OCI_REDIS_NODE_PRIMARY
-                else:
-                    host = OCI_REDIS_NODE_READ
-                pool = redis.ConnectionPool(connection_class=SSLConnection, host=host, port=6379, db=db)
-                self.CONN = redis.StrictRedis(
-                    connection_pool=pool,
-                    host=host,
-                    ssl=True,
-                    ssl_cert_reqs=None,
-                )
-                self.pipe = self.CONN.pipeline()
+    def __init__(self, db=0):
+        self.POOL = redis.ConnectionPool(host=REDIS_ADDRESS, port=6379, db=db)
+        self.CONN = redis.Redis(connection_pool=self.POOL)
+        # if CONFIG_INFO != CONFIG_US:
+        #     self.POOL = redis.ConnectionPool(host=REDIS_ADDRESS, port=6379, db=db)
+        #     self.CONN = redis.Redis(connection_pool=self.POOL)
+        # else:
+        #     self.CONN = redis.StrictRedis(
+        #         host=REDIS_ADDRESS,
+        #         ssl=True,
+        #         ssl_cert_reqs=None,
+        #     )
 
     def set_data(self, key, val, expire=0):
         try:

+ 8 - 11
Service/DevicePushService.py

@@ -125,10 +125,10 @@ class DevicePushService:
         return qs_list
 
     @staticmethod
-    def cache_push_detect_interval(redis_obj_pipe, name, detect_interval, new_detect_interval):
+    def cache_push_detect_interval(redis_obj, name, detect_interval, new_detect_interval):
         """
         缓存设置推送消息的时间间隔
-        @param redis_obj_pipe: redis pipeline
+        @param redis_obj: redis对象
         @param name: redis key
         @param detect_interval: 原推送时间间隔
         @param new_detect_interval: 新推送时间间隔
@@ -138,7 +138,7 @@ class DevicePushService:
             detect_interval = 60 if detect_interval < 60 else detect_interval
         else:  # 国内推送兼容问题,有值并且大于旧消息间隔则使用new_detect_interval
             detect_interval = new_detect_interval if new_detect_interval > detect_interval else detect_interval
-        redis_obj_pipe.set(name, 1, detect_interval - 5)
+        redis_obj.set_data(key=name, val=1, expire=detect_interval - 5)
 
     @classmethod
     def save_msg_push(cls, **params):
@@ -152,8 +152,7 @@ class DevicePushService:
         uid = params['uid']
         push_kwargs = params['push_kwargs']
         now_time = int(time.time())
-        redis_r_obj = RedisObject(mode='r')
-        redis_obj_pipe = RedisObject().pipe
+        redis_obj = RedisObject()
         try:
             params['event_tag'] = cls.get_event_tag(params['ai_type'], params['event_type'], params['detection'])
             is_app_push = True if params['event_type'] in [606, 607] else \
@@ -201,7 +200,7 @@ class DevicePushService:
                         }
                         # 保存到redis列表
                         equipment_info_value = json.dumps(equipment_info_kwargs)
-                        redis_obj_pipe.rpush(equipment_info_key, equipment_info_value)
+                        redis_obj.rpush(equipment_info_key, equipment_info_value)
                     saved_user_id_list.append(user_id)
 
                 # 推送
@@ -250,12 +249,12 @@ class DevicePushService:
                 if params['event_type'] in [606, 607] or CONFIG_INFO != CONFIG_US:
                     end = 0
                     # 缓存数据多于100条,批量保存前100条,否则保存全部
-                    equipment_info_len = redis_r_obj.llen(equipment_info_key)
+                    equipment_info_len = redis_obj.llen(equipment_info_key)
                     end = 99 if equipment_info_len > 100 else equipment_info_len - 1
 
                     if end != 0:
-                        equipment_info_redis_list = redis_r_obj.lrange(equipment_info_key, 0, end)
-                        redis_obj_pipe.ltrim(equipment_info_key, end+1, -1)
+                        equipment_info_redis_list = redis_obj.lrange(equipment_info_key, 0, end)
+                        redis_obj.ltrim(equipment_info_key, end+1, -1)
 
                         for equipment_info in equipment_info_redis_list:
                             equipment_info_data = eval(equipment_info)
@@ -266,8 +265,6 @@ class DevicePushService:
                             equipment_info_list.append(equipment_info_model(**equipment_info_data))
                         equipment_info_model.objects.bulk_create(equipment_info_list)
 
-            redis_obj_pipe.execute()
-
             return True
         except Exception as e:
             LOGGING.info('推送消息或存表异常: error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))