12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- # @Author : Rocky
- # @File : AlexaController.py
- # @Time : 2025/7/22 17:20
- import threading
- import time
- from typing import Dict, Union
- import requests
- from django.views import View
- from Model.models import AlexaPush
- from Object.RedisObject import RedisObject
- from Object.ResponseObject import ResponseObject
- from AnsjerPush.config import CONFIG_INFO, CONFIG_EUR, ALEXA_DOMAIN, ERROR_INFO_LOGGER
- from Service.DevicePushService import DevicePushService
- class AlexaView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, request, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, request, operation)
- def validation(self, request_dict, request, operation):
- response = ResponseObject()
- if operation == 'ObjectDetectionOrSnapshot':
- return self.object_detection_or_snapshot(request_dict, response)
- else:
- return response.json(414)
- @classmethod
- def object_detection_or_snapshot(cls, request_dict, response):
- uid = request_dict.get('uid', None)
- channel = request_dict.get('channel', '1')
- event_type = request_dict.get('event_type', None)
- event_time = request_dict.get('event_time', None)
- if not all([uid, event_type, event_time]):
- return response.json(444)
- # 欧洲: OCI伦敦, 其他: OCI凤凰城
- storage_location: int = 4 if CONFIG_INFO == CONFIG_EUR else 3
- kwargs = {
- 'uid': uid,
- 'channel': channel,
- 'event_type': int(event_type),
- 'event_time': event_time,
- 'storage_location': storage_location,
- }
- # 保存数据和发送Alexa事件
- thread = threading.Thread(
- target=cls.save_data_and_send_alexa_event,
- kwargs=kwargs)
- thread.start()
- kwargs.pop('event_type')
- res_data: str = DevicePushService.get_oci_req_url(**kwargs)
- return response.json(0, res_data)
- @staticmethod
- def save_data_and_send_alexa_event(**kwargs):
- uid: str = kwargs['uid']
- try:
- now_time: int = int(time.time())
- kwargs['add_time']: int = now_time
- AlexaPush.objects.create(**kwargs)
- # 请求Alexa服务器发送 ObjectDetection 或 Snapshot 事件
- url: str = ALEXA_DOMAIN + 'snapshot/asynEventResponse'
- storage_location: str = kwargs['storage_location']
- bucket: str = 'foreignpush'
- redis_obj: RedisObject = RedisObject()
- obj_name: str = '{}/{}/{}.jpeg'.format(uid, kwargs['channel'], kwargs['event_time'])
- image_uri: str = DevicePushService.oci_object_url(
- uid=uid, redis_obj=redis_obj, storage_location=storage_location, bucket=bucket, obj_name=obj_name)
- print(image_uri)
- # 快照
- if kwargs['event_type'] == 0:
- data: Dict[str, Union[str, int]] = {
- 'uid': uid,
- 'image_uri': image_uri,
- 'time_sample': now_time
- }
- res = requests.post(url=url, data=data, timeout=30)
- assert res.status_code == 200
- print(res.json())
- except Exception as e:
- ERROR_INFO_LOGGER.info(
- '发送Alexa事件线程异常,uid:{},error_line:{},error_msg:{}'.format(uid, e.__traceback__.tb_lineno, repr(e)))
|