# @Author : Rocky
# @File : InitController.py
# @Time : 2023/4/11 17:26
import ast
import json
import ssl
import time

from django.http import HttpResponse
from django.views import View
import redis
from redis.connection import SSLConnection

from Model.models import Device_Info, SceneLog, EquipmentInfo1
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject


def _parse_equipment_info(raw):
    """Decode one queued equipment-info entry into a dict.

    Entries are written with ``json.dumps``, so JSON is tried first. Entries
    that are not valid JSON fall back to ``ast.literal_eval`` so a Python
    ``repr``-style dict still parses, without executing arbitrary code the
    way ``eval`` would.

    :param raw: the entry as returned by Redis (``str`` or ``bytes``).
    :return: the decoded object (expected to be a dict).
    :raises ValueError / SyntaxError: when the entry is neither valid JSON
        nor a Python literal.
    """
    if isinstance(raw, (bytes, bytearray)):
        # literal_eval only parses str input, so normalise bytes up front.
        raw = bytes(raw).decode('utf-8')
    try:
        return json.loads(raw)
    except ValueError:
        return ast.literal_eval(raw)


class InitView(View):
    """Diagnostic endpoints: health check and Redis connectivity tests.

    The URL route is expected to supply an ``operation`` keyword argument
    that selects which check to run.
    """

    def get(self, request, *args, **kwargs):
        """Dispatch a GET request using the query-string parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, operation)

    def post(self, request, *args, **kwargs):
        """Dispatch a POST request using the form parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, operation)

    def validation(self, request_dict, operation):
        """Route ``operation`` to its handler.

        :param request_dict: request parameters (``request.GET`` / ``request.POST``).
        :param operation: operation name taken from the URL.
        :return: the handler's response, or a 404 response for an unknown
            operation (a Django view must always return a response object).
        """
        if operation == 'health-check':  # load balancer health check endpoint
            return self.health_check(request_dict)
        elif operation == 'oci_redis_test':
            return self.oci_redis_test(request_dict)
        elif operation == 'oci_redis_test_2':
            return self.oci_redis_test_2(request_dict)
        # Previously fell through and returned None, which Django rejects.
        return HttpResponse('unsupported operation', status=404)

    @staticmethod
    def health_check(request_dict):
        """Check that Redis and the database respond.

        Writes a marker key through ``RedisObject`` and runs one trivial
        query against ``Device_Info`` and ``SceneLog``.

        :param request_dict: request parameters (unused).
        :return: ``response.json(0)`` on success, otherwise an HTTP 500
            response whose body is the ``repr`` of the exception.
        """
        try:
            redis_obj = RedisObject()
            redis_obj.set_data('health_check', 1)
            response = ResponseObject()
            Device_Info.objects.filter().values('id').first()
            SceneLog.objects.filter().values('id').first()
            return response.json(0)
        except Exception as e:
            return HttpResponse(repr(e), status=500)

    @staticmethod
    def oci_redis_test(request_dict):
        """Write a key to the OCI Redis endpoint, then read it back.

        The write goes through a dedicated SSL connection pool to the
        hard-coded OCI host; the read goes through ``RedisObject(mode='r')``.

        :param request_dict: request parameters; reads ``key`` and ``value``.
        :return: ``response.json(0, {'redis_value': ...})`` on success,
            otherwise an HTTP 500 response whose body is the ``repr`` of the
            exception.
        """
        try:
            key = request_dict.get('key', None)
            value = request_dict.get('value', None)
            response = ResponseObject()
            redis_host = 'amaaaaaayszequiamxr7cdpparig3ptmytvde5vvnz6n7gceo4232sbhhlsa-p.redis.us-phoenix-1.oci.oraclecloud.com'
            # Create the Redis connection.
            pool = redis.ConnectionPool(connection_class=SSLConnection, host=redis_host, port=6379, db=0)
            try:
                # NOTE(review): redis-py appears to ignore host/ssl/ssl_cert_reqs
                # when connection_pool is supplied, so the pool's settings are the
                # ones in effect -- confirm before relying on ssl_cert_reqs=None.
                redis_client = redis.StrictRedis(
                    connection_pool=pool,
                    host=redis_host,
                    ssl=True,
                    ssl_cert_reqs=None
                )
                pipe = redis_client.pipeline()
                # Third positional argument is the expiry (60) for the key.
                pipe.set(key, value, 60)
                pipe.execute()
            finally:
                # The pool is created per request; release its sockets.
                pool.disconnect()

            redis_r_obj = RedisObject(mode='r')
            try:
                redis_value = redis_r_obj.get_data(key)
            finally:
                # Close the read connection even if the read fails.
                redis_r_obj.CONN.close()
            res = {
                'redis_value': redis_value
            }
            return response.json(0, res)
        except Exception as e:
            return HttpResponse(repr(e), status=500)

    @staticmethod
    def oci_redis_test_2(request_dict):
        """Round-trip sample equipment-info records through a Redis list.

        Pushes two copies of a fixed sample record onto the
        ``equipment_info`` list, reads up to the first 100 entries via
        ``RedisObject(mode='r')``, trims those 100 positions off the list,
        and bulk-inserts the decoded entries as ``EquipmentInfo1`` rows.

        :param request_dict: request parameters (unused).
        :return: ``response.json(0)`` on success, otherwise an HTTP 500
            response whose body is the ``repr`` of the exception.
        """
        try:
            redis_obj = RedisObject()
            pipe = redis_obj.CONN.pipeline()
            response = ResponseObject()
            equipment_info_kwargs = {
                'device_user_id': '163417566733313800138000',
                'event_time': int(time.time()),
                'event_type': 0,
                'device_uid': 'H47UZJ7PHY2NXKNW111A',
                'device_nick_name': 'redis_test',
                'channel': 1,
                'alarm': 'Motion',
                'is_st': 0,
                'add_time': int(time.time()),
                'storage_location': 1,
                'event_tag': '',
                'answer_status': 0
            }
            # Save to the Redis list.
            equipment_info_value = json.dumps(equipment_info_kwargs)
            equipment_info_key = 'equipment_info'
            pipe.rpush(equipment_info_key, equipment_info_value)
            pipe.rpush(equipment_info_key, equipment_info_value)
            pipe.execute()

            redis_r_obj = RedisObject(mode='r')
            equipment_info_redis_list = redis_r_obj.lrange(equipment_info_key, 0, 99)
            # Drop the first 100 positions (the range just read) from the list.
            redis_obj.ltrim(equipment_info_key, 100, -1)

            equipment_info_list = []
            for equipment_info in equipment_info_redis_list:
                # Entries were stored as JSON; parse them instead of eval()-ing
                # data that comes from a shared Redis list.
                equipment_info_data = _parse_equipment_info(equipment_info)
                # Device nicknames may contain emoji; re-encode as UTF-8 and
                # drop anything that cannot be encoded.
                if equipment_info_data.get('device_nick_name') is not None:
                    nick_name = equipment_info_data['device_nick_name']
                    equipment_info_data['device_nick_name'] = nick_name.encode('UTF-8', 'ignore').decode('UTF-8')
                equipment_info_list.append(EquipmentInfo1(**equipment_info_data))
            EquipmentInfo1.objects.bulk_create(equipment_info_list)
            return response.json(0)
        except Exception as e:
            return HttpResponse(repr(e), status=500)