# @Author    : Rocky
# @File      : AlexaController.py
# @Time      : 2025/7/22 17:20
import threading
import time
from typing import Dict, Union

import requests
from django.views import View

from Model.models import AlexaPush
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from AnsjerPush.config import CONFIG_INFO, CONFIG_EUR, ALEXA_DOMAIN, ERROR_INFO_LOGGER
from Service.DevicePushService import DevicePushService


class AlexaView(View):
    """Django view that dispatches Alexa-related operations by URL keyword."""

    def get(self, request, *args, **kwargs):
        """Handle GET: dispatch on the ``operation`` URL kwarg using ``request.GET``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST: dispatch on the ``operation`` URL kwarg using ``request.POST``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Route the request to the handler matching ``operation``.

        Args:
            request_dict: The request parameters (``request.GET`` or ``request.POST``).
            request: The original request object (not used by the routing itself).
            operation: Operation name taken from the URL.

        Returns:
            The handler's response, or ``response.json(414)`` when the
            operation is not recognised.
        """
        response = ResponseObject()
        if operation == 'ObjectDetectionOrSnapshot':
            return self.object_detection_or_snapshot(request_dict, response)
        return response.json(414)

    @classmethod
    def object_detection_or_snapshot(cls, request_dict, response):
        """Record an event asynchronously and return the OCI request URL.

        Reads ``uid``, ``channel`` (default ``'1'``), ``event_type`` and
        ``event_time`` from ``request_dict``. A background thread runs
        ``save_data_and_send_alexa_event`` while the request thread returns
        the result of ``DevicePushService.get_oci_req_url``.

        Args:
            request_dict: The request parameters.
            response: ``ResponseObject`` used to build the reply.

        Returns:
            ``response.json(444)`` when a required parameter is missing or
            ``event_type`` is not an integer; otherwise
            ``response.json(0, <url data>)``.
        """
        uid = request_dict.get('uid', None)
        channel = request_dict.get('channel', '1')
        event_type = request_dict.get('event_type', None)
        event_time = request_dict.get('event_time', None)
        if not all([uid, event_type, event_time]):
            return response.json(444)

        # event_type comes straight from the client; a non-numeric value must
        # yield a parameter error instead of an unhandled ValueError.
        # NOTE(review): 444 is reused here because this view already returns it
        # for missing parameters - confirm it is the right code for bad values.
        try:
            event_type_value: int = int(event_type)
        except (TypeError, ValueError):
            return response.json(444)

        # Europe: OCI London; elsewhere: OCI Phoenix
        storage_location: int = 4 if CONFIG_INFO == CONFIG_EUR else 3
        url_kwargs = {
            'uid': uid,
            'channel': channel,
            'event_time': event_time,
            'storage_location': storage_location,
        }

        # Save the record and send the Alexa event in the background.
        # The thread gets its OWN dict: Thread.run() unpacks the kwargs only
        # once the worker actually runs, so sharing one dict and mutating it
        # afterwards in this thread could drop 'event_type' from under it.
        thread = threading.Thread(
            target=cls.save_data_and_send_alexa_event,
            kwargs=dict(url_kwargs, event_type=event_type_value),
        )
        thread.start()

        res_data: str = DevicePushService.get_oci_req_url(**url_kwargs)
        return response.json(0, res_data)

    @staticmethod
    def save_data_and_send_alexa_event(**kwargs):
        """Persist an ``AlexaPush`` row and, for snapshots, notify the Alexa server.

        Intended to run in a worker thread. Any exception raised inside the
        body is logged through ``ERROR_INFO_LOGGER`` and not re-raised.

        Keyword Args:
            uid: Device UID.
            channel: Device channel.
            event_type: Integer event type; ``0`` triggers the snapshot request.
            event_time: Event time, also used as the image object name.
            storage_location: Storage location id passed to
                ``DevicePushService.oci_object_url``.
        """
        uid: str = kwargs['uid']
        try:
            now_time: int = int(time.time())
            kwargs['add_time'] = now_time
            AlexaPush.objects.create(**kwargs)

            # Ask the Alexa server to send an ObjectDetection or Snapshot event
            url: str = ALEXA_DOMAIN + 'snapshot/asynEventResponse'
            storage_location: int = kwargs['storage_location']
            bucket: str = 'foreignpush'
            redis_obj: RedisObject = RedisObject()
            obj_name: str = '{}/{}/{}.jpeg'.format(uid, kwargs['channel'], kwargs['event_time'])
            image_uri: str = DevicePushService.oci_object_url(
                uid=uid, redis_obj=redis_obj, storage_location=storage_location, bucket=bucket, obj_name=obj_name)
            print(image_uri)

            # Snapshot
            if kwargs['event_type'] == 0:
                data: Dict[str, Union[str, int]] = {
                    'uid': uid,
                    'image_uri': image_uri,
                    'time_sample': now_time
                }
                res = requests.post(url=url, data=data, timeout=30)
                # Explicit check instead of `assert`, which is stripped when
                # Python runs with -O; the failure is logged by the handler below.
                if res.status_code != 200:
                    raise RuntimeError(
                        'Alexa event request failed, status_code:{}'.format(res.status_code))
                print(res.json())

        except Exception as e:
            ERROR_INFO_LOGGER.info(
                '发送Alexa事件线程异常,uid:{},error_line:{},error_msg:{}'.format(uid, e.__traceback__.tb_lineno, repr(e)))