# @Author : Rocky # @File : AlexaController.py # @Time : 2025/7/22 17:20 import threading import time from django.views import View from Model.models import AlexaPush from Object.ResponseObject import ResponseObject from AnsjerPush.config import CONFIG_INFO, CONFIG_EUR from Service.DevicePushService import DevicePushService class AlexaView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() if operation == 'ObjectDetectionOrSnapshot': return self.object_detection_or_snapshot(request_dict, response) else: return response.json(414) @classmethod def object_detection_or_snapshot(cls, request_dict, response): uid = request_dict.get('uid', None) channel = request_dict.get('channel', '1') event_type = request_dict.get('event_type', None) event_time = request_dict.get('event_time', None) if not all([uid, event_type, event_time]): return response.json(444) # 欧洲: OCI伦敦, 其他: OCI凤凰城 storage_location: int = 4 if CONFIG_INFO == CONFIG_EUR else 3 kwargs = { 'uid': uid, 'channel': channel, 'event_type': int(event_type), 'event_time': event_time, 'storage_location': storage_location, } # 异步推送消息和保存数据 threading.Thread( target=cls.save_data_and_send_alexa_event, kwargs=kwargs).start() kwargs.pop('event_type') res_data: str = DevicePushService.get_oci_req_url(**kwargs) return response.json(0, res_data) @staticmethod def save_data_and_send_alexa_event(**kwargs): kwargs['add_time'] = int(time.time()) AlexaPush.objects.create(**kwargs) # 请求Alexa服务器发送 ObjectDetection 或 Snapshot 事件