locky 4 týždňov pred
rodič
commit
13d9300bb8
1 zmenil súbory, kde vykonal 78 pridanie a 24 odobranie
  1. 78 24
      Service/CustomizedPushService.py

+ 78 - 24
Service/CustomizedPushService.py

@@ -87,14 +87,14 @@ class CustomizedPushObject:
                 filter(userID__data_joined__gte=m_year_ago, userID__data_joined__lte=n_year_ago). \
                 values_list('userID_id', flat=True)
 
-        user_id_list = list(device_info_qs)
+        user_id_list = list(set(device_info_qs))
 
         return user_id_list
 
     @classmethod
     def push_and_save_sys_msg(cls, **kwargs):
         """
-        推送和保存系统消息
+        推送和保存系统消息(优化版:支持百万级用户)
         @param kwargs: 参数
         @return:
         """
@@ -120,47 +120,101 @@ class CustomizedPushObject:
             app_bundle_id_list = ['com.ansjer.zccloud_ab', 'com.ansjer.customizede']
 
         try:
+            # 使用流式查询,避免一次性加载所有数据到内存
             gateway_push_qs = GatewayPush.objects.filter(
                 user_id__in=user_id_list, app_bundle_id__in=app_bundle_id_list). \
                 values('user_id', 'app_bundle_id', 'push_type', 'token_val')
-            if gateway_push_qs.exists():
-                sys_msg_list = []
-                saved_user_id_list = []
-                gateway_push_list = []
-                for gateway_push in gateway_push_qs:
-                    # user_id保存列表,避免重复写入数据
-                    user_id = gateway_push['user_id']
-                    user_email_sub_qs = UserEmailSubscriptions.objects.filter(
-                        user_id=user_id, push_sub_status=0)
-                    if user_id not in saved_user_id_list:
-                        saved_user_id_list.append(user_id)
-                        sys_msg_list.append(SysMsgModel(
-                            userID_id=user_id, title=title, msg=msg, jumpLink=link, addTime=n_time, updTime=n_time))
-                    if not user_email_sub_qs.exists():
-                        gateway_push_list.append(gateway_push)
-                # 保存系统消息和异步推送消息
-                SysMsgModel.objects.bulk_create(sys_msg_list)
+            
+            if not gateway_push_qs.exists():
+                CUSTOMIZED_PUSH_LOGGER.info('customized_push_id:{}没有需要推送的用户'.format(customized_push_id))
+                return
+
+            # 批量查询所有相关用户的邮件订阅状态,减少数据库查询
+            unsubscribed_user_ids = set(
+                UserEmailSubscriptions.objects.filter(
+                    user_id__in=user_id_list, push_sub_status=0
+                ).values_list('user_id', flat=True)
+            )
+
+            sys_msg_list = []
+            saved_user_id_set = set()  # 使用 set 提高查找效率
+            gateway_push_list = []
+            
+            batch_size = 1000  # 每批处理1000条数据
+            sys_msg_batch_size = 5000  # 系统消息批量插入大小
+            
+            # 使用 iterator() 流式查询,分批处理
+            for gateway_push in gateway_push_qs.iterator(chunk_size=batch_size):
+                user_id = gateway_push['user_id']
+                
+                # 使用 set 的 O(1) 查找,而不是 list 的 O(n) 查找
+                if user_id not in saved_user_id_set:
+                    saved_user_id_set.add(user_id)
+                    sys_msg_list.append(SysMsgModel(
+                        userID_id=user_id, title=title, msg=msg, jumpLink=link, 
+                        addTime=n_time, updTime=n_time))
+                    
+                    # 分批插入系统消息,避免单次插入过多
+                    if len(sys_msg_list) >= sys_msg_batch_size:
+                        SysMsgModel.objects.bulk_create(sys_msg_list, batch_size=sys_msg_batch_size)
+                        sys_msg_list = []
+                        CUSTOMIZED_PUSH_LOGGER.info(
+                            'customized_push_id:{}已保存{}条系统消息'.format(
+                                customized_push_id, len(saved_user_id_set)))
+                
+                # 检查用户是否取消订阅
+                if user_id not in unsubscribed_user_ids:
+                    gateway_push_list.append(gateway_push)
+            
+            # 保存剩余的系统消息
+            if sys_msg_list:
+                SysMsgModel.objects.bulk_create(sys_msg_list, batch_size=sys_msg_batch_size)
+            
+            CUSTOMIZED_PUSH_LOGGER.info(
+                'customized_push_id:{}共保存{}条系统消息,准备推送{}条'.format(
+                    customized_push_id, len(saved_user_id_set), len(gateway_push_list)))
+            
+            # 异步推送消息
+            if gateway_push_list:
                 pre_push_kwargs = {
                     'push_kwargs': push_kwargs,
-                    'gateway_push_list': gateway_push_list
+                    'gateway_push_list': gateway_push_list,
+                    'customized_push_id': customized_push_id
                 }
                 pre_push_thread = threading.Thread(
                     target=cls.thr_pool_push,
                     kwargs=pre_push_kwargs)
                 pre_push_thread.start()
-                CUSTOMIZED_PUSH_LOGGER.info('customized_push_id:{}推送完成'.format(customized_push_id))
+            
+            CUSTOMIZED_PUSH_LOGGER.info('customized_push_id:{}推送任务已启动'.format(customized_push_id))
         except Exception as e:
             CUSTOMIZED_PUSH_LOGGER.info('定制化推送或保存数据异常,'
                                         'error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
     @classmethod
     def thr_pool_push(cls, **kwargs):
-        CUSTOMIZED_PUSH_LOGGER.info('线程池推送开始')
+        """
+        线程池推送(优化版:限制并发数)
+        """
         push_kwargs = kwargs['push_kwargs']
         gateway_push_list = kwargs['gateway_push_list']
-        with ThreadPoolExecutor() as executor:
+        customized_push_id = kwargs.get('customized_push_id', 'unknown')
+        
+        CUSTOMIZED_PUSH_LOGGER.info(
+            'customized_push_id:{}线程池推送开始,共{}条'.format(customized_push_id, len(gateway_push_list)))
+        
+        # 限制最大线程数,避免创建过多线程导致系统资源耗尽
+        max_workers = 50  # 根据服务器性能调整
+        batch_count = 0
+        
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
             executor.map(
-                lambda gateway_push_kwargs: cls.start_push(push_kwargs, gateway_push_kwargs), gateway_push_list)
+                lambda gateway_push_kwargs: cls.start_push(push_kwargs, gateway_push_kwargs), 
+                gateway_push_list)
+            batch_count += 1
+        
+        CUSTOMIZED_PUSH_LOGGER.info(
+            'customized_push_id:{}线程池推送完成'.format(customized_push_id))
 
     @classmethod
     def start_push(cls, push_kwargs, gateway_push_kwargs):