# @Author    : Rocky
# @File      : CustomizedPushService.py
# @Time      : 2023/10/19 15:49
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor

from AnsjerPush.config import CONFIG_INFO, CONFIG_TEST, CONFIG_CN, XM_PUSH_CHANNEL_ID
from Model.models import DeviceTypeModel, Device_Info, GatewayPush, CountryModel, SysMsgModel
from Service.CommonService import CommonService
from Service.HuaweiPushService.HuaweiPushService import HuaweiPushObject
from Service.PushService import PushObject

CUSTOMIZED_PUSH_LOGGER = logging.getLogger('customized_push')

# A "year" in the register-period arithmetic is a fixed 365 days (leap days are ignored).
_SECONDS_PER_YEAR = 365 * 24 * 60 * 60


class CustomizedPushObject:
    """Select target users for a customized push, store the system message and send the pushes."""

    @staticmethod
    def query_push_user(device_name, country, register_period):
        """
        Query the ids of the users that should receive the push.

        @param device_name: comma-separated device model names
        @param country: comma-separated country names (not used when CONFIG_INFO is the
                        test or CN configuration)
        @param register_period: registration age in years, formatted as 'n-m' (between n and m
                                years ago), 'n-' (n years ago or earlier) or '0-' (any time)
        @return: list of user ids, one entry per matching device row (may contain duplicates)
        @raise ValueError: if register_period has no '-' separator or a bound is not an integer
        """
        # Resolve the device model names to their device type codes.
        device_name_list = device_name.split(',')
        device_type_list = DeviceTypeModel.objects.filter(name__in=device_name_list).values_list('type', flat=True)

        # The test and CN servers push to every user regardless of country.
        if CONFIG_INFO in [CONFIG_TEST, CONFIG_CN]:
            device_info_qs = Device_Info.objects.filter(Type__in=device_type_list)
        else:
            country_name_list = country.split(',')
            country_id_list = CountryModel.objects.filter(country_name__in=country_name_list). \
                values_list('id', flat=True)
            device_info_qs = Device_Info.objects.filter(Type__in=device_type_list,
                                                        userID__region_country__in=country_id_list)

        now_time = int(time.time())

        # Split on the first '-'. A missing separator is rejected explicitly: slicing with the
        # -1 returned by str.find() would silently turn e.g. '12' into the range 1-12.
        n, separator, m = register_period.partition('-')
        if not separator:
            raise ValueError("register_period must look like 'n-m' or 'n-', got {!r}".format(register_period))

        if m == '':
            # '0-' means no restriction on the registration time; 'n-' means n years or more.
            if n != '0':
                n_year_ago_timestamp = now_time - int(n) * _SECONDS_PER_YEAR
                n_year_ago = CommonService.timestamp_to_str(n_year_ago_timestamp)
                # A smaller registration time means an earlier registration.
                device_info_qs = device_info_qs.filter(userID__data_joined__lte=n_year_ago)
        else:
            # 'n-m' years, e.g. '2-3': registered between m years ago and n years ago.
            n_years_seconds, m_years_seconds = int(n) * _SECONDS_PER_YEAR, int(m) * _SECONDS_PER_YEAR
            n_year_ago = CommonService.timestamp_to_str(now_time - n_years_seconds)  # later bound
            m_year_ago = CommonService.timestamp_to_str(now_time - m_years_seconds)  # earlier bound
            # m years ago <= registration time <= n years ago
            device_info_qs = device_info_qs.filter(userID__data_joined__gte=m_year_ago,
                                                   userID__data_joined__lte=n_year_ago)

        return list(device_info_qs.values_list('userID_id', flat=True))

    @classmethod
    def push_and_save_sys_msg(cls, **kwargs):
        """
        Save one system message per user and push the notification in a background thread.

        @param kwargs: required keys are 'id' (customized push id, used for logging),
                       'user_id_list', 'title', 'msg', 'link', 'icon_link' ('' is treated as
                       no icon) and 'push_app' ('ZosiSmart' selects the first bundle-id pair,
                       anything else the second)
        @return: None
        @raise KeyError: if a required key is missing (raised before the guarded section)
        """
        customized_push_id = kwargs['id']
        user_id_list = kwargs['user_id_list']
        title = kwargs['title']
        msg = kwargs['msg']
        link = kwargs['link']
        icon_link = kwargs['icon_link'] if kwargs['icon_link'] != '' else None
        n_time = int(time.time())
        push_kwargs = {
            'n_time': n_time,
            'title': title,
            'msg': msg,
            'icon_link': icon_link
        }

        # Choose the app bundle ids whose push tokens are targeted.
        if kwargs['push_app'] == 'ZosiSmart':
            app_bundle_id_list = ['com.ansjer.zccloud_a', 'com.ansjer.zccloud']
        else:
            app_bundle_id_list = ['com.ansjer.zccloud_ab', 'com.ansjer.customizede']

        try:
            gateway_push_qs = GatewayPush.objects.filter(
                user_id__in=user_id_list, app_bundle_id__in=app_bundle_id_list). \
                values('user_id', 'app_bundle_id', 'push_type', 'token_val')
            # Fetch the rows once instead of running exists() and then iterating the queryset.
            gateway_push_list = list(gateway_push_qs)
            if gateway_push_list:
                sys_msg_list = []
                # A set keeps the per-row duplicate check O(1); a user may own several tokens
                # but must receive only one stored system message.
                saved_user_ids = set()
                for gateway_push in gateway_push_list:
                    user_id = gateway_push['user_id']
                    if user_id not in saved_user_ids:
                        saved_user_ids.add(user_id)
                        sys_msg_list.append(SysMsgModel(
                            userID_id=user_id, title=title, msg=msg, jumpLink=link,
                            addTime=n_time, updTime=n_time))

                # Save the system messages, then push asynchronously to every token.
                SysMsgModel.objects.bulk_create(sys_msg_list)
                pre_push_kwargs = {
                    'push_kwargs': push_kwargs,
                    'gateway_push_list': gateway_push_list
                }
                pre_push_thread = threading.Thread(
                    target=cls.thr_pool_push,
                    kwargs=pre_push_kwargs)
                pre_push_thread.start()
            # Logged once the work has been handed off (or nothing matched); the pushes
            # themselves may still be running in the background thread.
            CUSTOMIZED_PUSH_LOGGER.info('customized_push_id:{}推送完成'.format(customized_push_id))
        except Exception as e:
            CUSTOMIZED_PUSH_LOGGER.info('定制化推送或保存数据异常,'
                                        'error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @classmethod
    def thr_pool_push(cls, **kwargs):
        """
        Push to every gateway-push row through a thread pool.

        @param kwargs: 'push_kwargs' (shared message fields) and 'gateway_push_list'
                       (list of dicts with 'user_id', 'app_bundle_id', 'push_type', 'token_val')
        @return: None
        """
        CUSTOMIZED_PUSH_LOGGER.info('线程池推送开始')
        push_kwargs = kwargs['push_kwargs']
        gateway_push_list = kwargs['gateway_push_list']
        # Leaving the with-block waits for all submitted pushes to finish.
        with ThreadPoolExecutor() as executor:
            executor.map(
                lambda gateway_push_kwargs: cls.start_push(push_kwargs, gateway_push_kwargs),
                gateway_push_list)

    @classmethod
    def start_push(cls, push_kwargs, gateway_push_kwargs):
        """
        Push one message to one token and log whether it succeeded.

        @param push_kwargs: dict with 'title', 'n_time', 'msg' and 'icon_link'
        @param gateway_push_kwargs: dict with 'push_type', 'user_id', 'app_bundle_id', 'token_val'
        @return: None
        """
        title = push_kwargs['title']
        n_time = push_kwargs['n_time']
        msg = push_kwargs['msg']
        icon_link = push_kwargs['icon_link']
        push_type = gateway_push_kwargs['push_type']
        user_id = gateway_push_kwargs['user_id']
        app_bundle_id = gateway_push_kwargs['app_bundle_id']
        token_val = gateway_push_kwargs['token_val']

        push_succeed = cls.push_msg(push_type, app_bundle_id, token_val, n_time, title, msg, icon_link)
        push_status = '成功' if push_succeed else '失败'
        CUSTOMIZED_PUSH_LOGGER.info('{}推送{},push_type:{}'.format(user_id, push_status, push_type))

    @staticmethod
    def push_msg(push_type, app_bundle_id, token_val, n_time, title, msg, icon_link):
        """
        Dispatch a push to the vendor channel selected by push_type.

        @param push_type: 0 iOS APNs, 1 FCM, 2 JPush, 3 Huawei, 4 Xiaomi, 5 vivo, 6 OPPO, 7 Meizu
        @param app_bundle_id: bundle id of the target app
        @param token_val: push token of the target device
        @param n_time: timestamp passed through to the vendor push call
        @param title: notification title
        @param msg: notification text
        @param icon_link: image link or None; only forwarded for iOS, FCM and Huawei
        @return: the vendor call's result, or False for an unknown push_type or on any exception
        """
        push_kwargs = {
            'nickname': '',
            'event_type': 0,
            'app_bundle_id': app_bundle_id,
            'token_val': token_val,
            'msg_title': title,
            'msg_text': msg,
            'n_time': n_time,
        }

        try:
            # iOS
            if push_type == 0:
                push_kwargs['launch_image'] = icon_link
                return PushObject.ios_apns_push(**push_kwargs)
            # FCM (labelled "gcm" in the original comment)
            elif push_type == 1:
                # The FCM call receives an empty string rather than None when there is no icon.
                if icon_link is None:
                    icon_link = ''
                push_kwargs['image'] = icon_link
                return PushObject.android_fcm_push_v1(**push_kwargs)
            # JPush
            elif push_type == 2:
                return PushObject.android_jpush(**push_kwargs)
            # Huawei
            elif push_type == 3:
                push_kwargs['image_url'] = icon_link
                huawei_push_object = HuaweiPushObject()
                return huawei_push_object.send_push_notify_message(**push_kwargs)
            # Xiaomi
            elif push_type == 4:
                push_kwargs['channel_id'] = XM_PUSH_CHANNEL_ID['service_reminder']
                return PushObject.android_xmpush(**push_kwargs)
            # vivo
            elif push_type == 5:
                return PushObject.android_vivopush(**push_kwargs)
            # OPPO
            elif push_type == 6:
                push_kwargs['channel_id'] = 'VALUE_ADDED'
                return PushObject.android_oppopush(**push_kwargs)
            # Meizu
            elif push_type == 7:
                return PushObject.android_meizupush(**push_kwargs)
            else:
                return False
        except Exception as e:
            CUSTOMIZED_PUSH_LOGGER.info('定制化推送异常,'
                                        'error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return False