# @Author : Rocky
# @File : CronTaskController.py
# @Time : 2023/12/11 15:47
import threading

from django.db import close_old_connections
from django.views import View

from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Service.EquipmentInfoService import EQUIPMENT_INFO_DICT, EQUIPMENT_INFO_KEY_LIST

# Maximum number of records moved from Redis to the database per run and per key.
size = 10000


class CronTaskView(View):
    """Entry point for scheduled tasks; the task is chosen by the ``operation`` URL kwarg."""

    def get(self, request, *args, **kwargs):
        """Dispatch a GET request to the task named by ``operation``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Dispatch a POST request to the task named by ``operation``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Route ``operation`` to its handler.

        ``request_dict`` and ``request`` are accepted for interface
        compatibility but are not used by the only handler defined here.
        Any operation other than ``'equipment_info'`` yields
        ``response.json(414)``.
        """
        response = ResponseObject()
        if operation == 'equipment_info':
            return self.equipment_info(response)
        return response.json(414)

    @classmethod
    def equipment_info(cls, response):
        """Start one worker thread per equipment-info key, then respond immediately.

        The threads are not joined, so ``response.json(0)`` is returned
        without waiting for (or knowing the outcome of) the saves.
        """
        for equipment_info_key in EQUIPMENT_INFO_KEY_LIST:
            # Save the data asynchronously.
            push_thread = threading.Thread(
                target=cls.save_equipment_info,
                kwargs={'equipment_info_key': equipment_info_key},
            )
            push_thread.start()
        return response.json(0)

    @staticmethod
    def save_equipment_info(**kwargs):
        """Move up to ``size`` cached records of one key from Redis into the database.

        Keyword Args:
            equipment_info_key: Redis list key; also the key into
                ``EQUIPMENT_INFO_DICT`` that selects the model to write to.

        Raises:
            KeyError: if ``equipment_info_key`` is missing from ``kwargs`` or
                from ``EQUIPMENT_INFO_DICT``.
        """
        equipment_info_key = kwargs['equipment_info_key']
        try:
            equipment_info_model = EQUIPMENT_INFO_DICT[equipment_info_key]

            # Read the first n cached records for a bulk insert.
            redis_obj = RedisObject()
            cached_records = redis_obj.lrange(equipment_info_key, 0, size - 1)
            if not cached_records:
                # Nothing was read, so nothing may be trimmed: trimming here
                # could delete records pushed after the read.
                return

            # Trim exactly as many entries as were read. Trimming a fixed
            # `size` entries would also drop records appended between the
            # read and the trim whenever fewer than `size` were read.
            # NOTE(review): the batch is removed from Redis before the
            # database write, so a failing bulk_create loses this batch.
            redis_obj.ltrim(equipment_info_key, len(cached_records), -1)

            equipment_info_list = []
            for cached_record in cached_records:
                # SECURITY(review): eval() executes whatever expression is
                # stored in Redis. Kept to preserve behaviour; it should be
                # replaced with a safe parser (ast.literal_eval / json.loads)
                # once the stored payload format is confirmed.
                equipment_info_data = eval(cached_record)
                # Device nicknames may contain emoji; round-trip through UTF-8
                # and drop anything that cannot be encoded.
                nick_name = equipment_info_data.get('device_nick_name')
                if nick_name is not None:
                    equipment_info_data['device_nick_name'] = nick_name.encode(
                        'UTF-8', 'ignore').decode('UTF-8')
                equipment_info_list.append(equipment_info_model(**equipment_info_data))

            equipment_info_model.objects.bulk_create(equipment_info_list)
        finally:
            # Runs on failure too, so this worker thread always performs its
            # database connection cleanup.
            close_old_connections()