# @Author : Rocky
# @File : InitController.py
# @Time : 2023/4/11 17:26
import json
import logging
import os
import time

import apns2
from _ssl import SSLError
from django.http import HttpResponse
from django.views import View

from AnsjerPush.config import CONFIG_INFO
from Model.models import Device_Info, SceneLog, EquipmentInfo1
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from AnsjerPush.config import APP_BUNDLE_DICT, APNS_MODE, BASE_DIR, APNS_CONFIG, DATA_PUSH_EVENT_TYPE_LIST
from Object.S3Email import S3Email
from Service.CommonService import CommonService

ERROR_INFO_LOGGER = logging.getLogger('error_info')


class InitView(View):
    """Operational endpoints: health check, APNs certificate check, Redis tests.

    The URL keyword argument ``operation`` selects which handler runs; GET and
    POST are treated the same way and differ only in where parameters are read
    from.
    """

    def get(self, request, *args, **kwargs):
        """Dispatch a GET request using the ``operation`` URL keyword argument."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, operation)

    def post(self, request, *args, **kwargs):
        """Dispatch a POST request using the ``operation`` URL keyword argument."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, operation)

    def validation(self, request_dict, operation):
        """Route ``operation`` to its handler and return the handler's response.

        :param request_dict: request parameters (``request.GET`` or ``request.POST``).
        :param operation: operation name taken from the URL.
        :return: the handler's response, or a 404 ``HttpResponse`` when the
            operation is not recognised.
        """
        if operation == 'health-check':  # load balancer health-check endpoint
            return self.health_check(request_dict)
        elif operation == 'checkApnsPem':  # check the Apple push certificates
            return self.check_apns_pem()
        elif operation == 'oci_redis_test':
            return self.oci_redis_test(request_dict)
        elif operation == 'oci_redis_test_2':
            return self.oci_redis_test_2(request_dict)
        # Previously this fell through and returned None, which Django turns
        # into a server error. The operation name is deliberately not echoed
        # back, since HttpResponse defaults to an HTML content type.
        return HttpResponse('unsupported operation', status=404)

    @staticmethod
    def health_check(request_dict):
        """Touch Redis and two database tables to prove the service is alive.

        :param request_dict: request parameters (unused).
        :return: ``response.json(0)`` when every probe succeeds, otherwise an
            ``HttpResponse`` with status 500 carrying ``repr`` of the exception.
        """
        try:
            redis_obj = RedisObject()
            redis_obj.set_data('health_check', 1)
            response = ResponseObject()
            # One cheap read per model; only a raised exception matters here.
            Device_Info.objects.filter().values('id').first()
            SceneLog.objects.filter().values('id').first()
            return response.json(0)
        except Exception as e:
            ERROR_INFO_LOGGER.info(
                '健康检查接口异常,error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return HttpResponse(repr(e), status=500)

    @staticmethod
    def check_apns_pem():
        """Probe each configured APNs certificate by pushing a dummy notification.

        For every bundle id in the built-in list, an APNs client is created
        from the configured pem file and a placeholder notification is pushed
        with an empty device token. An ``SSLError`` during the push triggers an
        e-mail to ``servers@ansjer.com`` naming the pem file.

        :return: ``response.json(0)`` after all bundle ids were tried, or
            ``response.json(0, <error text>)`` if any other exception occurred.
        """
        response = ResponseObject()
        try:
            server_id = os.environ.get('ServerID', 0)
            n_time = int(time.time())
            # Placeholder payload values; only the TLS handshake is of interest.
            uid, msg_title, msg_text, launch_image, event_type, channel, nickname, token_val = \
                '', '', '', '', 1, '', '', ''
            app_bundle_id_list = [
                'com.ansjer.zccloud',
                'com.ansjer.loocamccloud',
                'com.cloudlife.commissionf',
                'com.ansjer.customizeda',
                'com.ansjer.customizedb',
                'com.ansjer.customizedc',
                'com.ansjer.customizedd',
                'com.ansjer.customizede'
            ]
            for app_bundle_id in app_bundle_id_list:
                pem_path = os.path.join(BASE_DIR, APNS_CONFIG[app_bundle_id]['pem_path'])
                cli = apns2.APNSClient(mode=APNS_MODE, client_cert=pem_path)
                alert = apns2.PayloadAlert(title=msg_title, body=msg_text, launch_image=launch_image)
                # Jump type: 1 = push message, 2 = system message,
                # 3 = audio/video call message
                jump_type = CommonService.get_jump_type(event_type)
                push_data = {
                    'alert': msg_text,
                    'msg': '',
                    'sound': '',
                    'zpush': '1',
                    'uid': uid,
                    'channel': channel,
                    'received_at': n_time,
                    'event_time': n_time,
                    'event_type': event_type,
                    'nickname': nickname,
                    'image_url': launch_image,
                    'jump_type': jump_type
                }
                sound = 'call_phone.mp3' if event_type in DATA_PUSH_EVENT_TYPE_LIST else 'default'
                payload = apns2.Payload(alert=alert, custom=push_data, sound=sound, category='myCategory',
                                        mutable_content=True)
                n = apns2.Notification(payload=payload, priority=apns2.PRIORITY_LOW)
                try:
                    cli.push(n=n, device_token=token_val, topic=app_bundle_id)
                except SSLError:
                    # The e-mail text reports the certificate as expired.
                    email_content = '{}服第{}台服务器苹果推送证书过期: {}'.format(CONFIG_INFO, server_id, pem_path)
                    S3Email().faEmail(email_content, 'servers@ansjer.com')
            return response.json(0)
        except Exception as e:
            return response.json(0, 'error_line:{},error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def oci_redis_test(request_dict):
        """Write a key to Redis with a 60 second expiry and read it back.

        :param request_dict: request parameters; reads ``key`` and ``value``.
        :return: ``response.json(0, {'redis_value': ...})`` on success, or an
            ``HttpResponse`` with status 500 carrying ``repr`` of the exception.
        """
        try:
            key = request_dict.get('key', None)
            value = request_dict.get('value', None)
            response = ResponseObject()
            redis_obj = RedisObject()
            redis_obj.set_data(key, value, 60)
            redis_value = redis_obj.get_data(key)
            res = {
                'redis_value': redis_value
            }
            return response.json(0, res)
        except Exception as e:
            return HttpResponse(repr(e), status=500)

    @staticmethod
    def oci_redis_test_2(request_dict):
        """Push two sample records through a Redis list into ``EquipmentInfo1``.

        Two identical JSON records are appended to the ``equipment_info`` list
        via a pipeline; up to the first 100 entries of the list are then read,
        trimmed off the list and bulk-inserted as ``EquipmentInfo1`` rows.

        :param request_dict: request parameters (unused).
        :return: ``response.json(0)`` on success, or an ``HttpResponse`` with
            status 500 carrying ``repr`` of the exception.
        """
        try:
            redis_obj = RedisObject()
            pipe = redis_obj.CONN.pipeline()
            response = ResponseObject()
            now = int(time.time())
            equipment_info_kwargs = {
                'device_user_id': '163417566733313800138000',
                'event_time': now,
                'event_type': 0,
                'device_uid': 'H47UZJ7PHY2NXKNW111A',
                'device_nick_name': 'redis_test',
                'channel': 1,
                'alarm': 'Motion',
                'is_st': 0,
                'add_time': now,
                'storage_location': 1,
                'event_tag': '',
                'answer_status': 0
            }
            # Save to a Redis list
            equipment_info_value = json.dumps(equipment_info_kwargs)
            equipment_info_key = 'equipment_info'
            pipe.rpush(equipment_info_key, equipment_info_value)
            pipe.rpush(equipment_info_key, equipment_info_value)
            pipe.execute()

            # Read the first 100 entries, then drop them from the list.
            equipment_info_redis_list = redis_obj.lrange(equipment_info_key, 0, 99)
            redis_obj.ltrim(equipment_info_key, 100, -1)

            equipment_info_list = []
            for equipment_info in equipment_info_redis_list:
                # Entries are written with json.dumps above, so decode them with
                # json.loads rather than eval(): eval would execute arbitrary
                # expressions found in the list and cannot parse JSON
                # null/true/false.
                # NOTE(review): assumes every writer of this key stores JSON -
                # confirm against other producers.
                equipment_info_data = json.loads(equipment_info)
                # Device nicknames may contain emoji; re-encode as UTF-8 and
                # drop anything that cannot be encoded.
                nick_name = equipment_info_data.get('device_nick_name')
                if nick_name is not None:
                    equipment_info_data['device_nick_name'] = \
                        nick_name.encode('UTF-8', 'ignore').decode('UTF-8')
                equipment_info_list.append(EquipmentInfo1(**equipment_info_data))
            EquipmentInfo1.objects.bulk_create(equipment_info_list)
            return response.json(0)
        except Exception as e:
            return HttpResponse(repr(e), status=500)