|
@@ -10,6 +10,7 @@ import datetime
|
|
|
import hashlib
|
|
|
import json
|
|
|
import logging
|
|
|
+import random
|
|
|
import threading
|
|
|
import time
|
|
|
|
|
@@ -18,16 +19,18 @@ import botocore
|
|
|
import oss2
|
|
|
import requests
|
|
|
from obs import ObsClient
|
|
|
+from oci.object_storage.models import CopyObjectDetails
|
|
|
|
|
|
from AnsjerPush.Config.aiConfig import DEVICE_EVENT_TYPE, ALGORITHM_COMBO_TYPES
|
|
|
from AnsjerPush.MessageConfig import EVENT_CONFIGS, DEFAULT_TEXTS, MSG_CONFIG
|
|
|
from AnsjerPush.config import CONFIG_INFO, CONFIG_CN, MULTI_CHANNEL_TYPE_LIST, SYS_EVENT_TYPE_LIST, AWS_ACCESS_KEY_ID, \
|
|
|
AWS_SECRET_ACCESS_KEY, EVENT_DICT, EVENT_DICT_CN, CONFIG_TEST, HUAWEICLOUD_AK, HUAWEICLOUD_SK, \
|
|
|
HUAWEICLOUD_OBS_SERVER, HUAWEICLOUD_PUSH_BUKET, OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET, JPUSH_UID_LIST, \
|
|
|
- DATA_PUSH_EVENT_TYPE_LIST, PRIMARY_USERS_PUSH_EVENT_TYPE_LIST
|
|
|
+ DATA_PUSH_EVENT_TYPE_LIST, PRIMARY_USERS_PUSH_EVENT_TYPE_LIST, OCI_NAMESPACE_NAME
|
|
|
from AnsjerPush.config import XMPUSH_CONFIG, OPPOPUSH_CONFIG, XM_PUSH_CHANNEL_ID, XM_PUSH_CHANNEL_DICT
|
|
|
from Model.models import UidPushModel, SysMsgModel, DeviceSharePermission, DeviceChannelUserSet, \
|
|
|
- DeviceChannelUserPermission, UidSetModel, Device_Info, UserAudioVideoPush, PushLog
|
|
|
+ DeviceChannelUserPermission, UidSetModel, Device_Info, UserAudioVideoPush, PushLog, TimeAlbum, AlbumMedia, \
|
|
|
+ AlbumTitle
|
|
|
from Object.ETkObject import ETkObject
|
|
|
from Object.OCIObjectStorage import OCIObjectStorage
|
|
|
from Object.RedisObject import RedisObject
|
|
@@ -337,6 +340,21 @@ class DevicePushService:
|
|
|
# 保存到redis列表
|
|
|
equipment_info_value = json.dumps(equipment_info_kwargs)
|
|
|
redis_obj.rpush(equipment_info_key, equipment_info_value)
|
|
|
+
|
|
|
+ # 保存到时光相册
|
|
|
+ cls.try_save_time_album(
|
|
|
+ redis_obj=redis_obj,
|
|
|
+ uid=params['uid'],
|
|
|
+ channel=params['channel'],
|
|
|
+ event_time=params['n_time'],
|
|
|
+ storage_location=params['storage_location'],
|
|
|
+ region=params['region'],
|
|
|
+ is_st=params['is_st'],
|
|
|
+ bucket=params['bucket'],
|
|
|
+ app_bundle_id=up['appBundleId'],
|
|
|
+ event_tag=params['event_tag']
|
|
|
+ )
|
|
|
+
|
|
|
saved_user_id_list.append(user_id)
|
|
|
close_old_connections()
|
|
|
# 写入系统消息
|
|
@@ -1341,3 +1359,233 @@ class DevicePushService:
|
|
|
('ar', 51): "تم اكتشاف تغيير في الصورة",
|
|
|
('ar', 57): "وجود شخص",
|
|
|
}
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def save_time_album(cls, uid, channel, event_time, storage_location, region, is_st, bucket):
|
|
|
+ """
|
|
|
+ 保存推送数据到时光相册
|
|
|
+ @param params: 推送参数
|
|
|
+ @return: bool
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ redis_obj = RedisObject()
|
|
|
+ # 转移存储桶图片
|
|
|
+ copy_res = cls.transfer_obs_image(uid, channel, event_time, storage_location, region, is_st, bucket)
|
|
|
+ if copy_res:
|
|
|
+ # 保存到时光相册表
|
|
|
+ dt = datetime.datetime.fromtimestamp(int(event_time))
|
|
|
+ dt_zero = datetime.datetime(dt.year, dt.month, dt.day)
|
|
|
+ album_date = int(dt_zero.timestamp())
|
|
|
+ time_album_qs = TimeAlbum.objects.filter(album_date=album_date, device_id=uid).values("id")
|
|
|
+ if time_album_qs.exists():
|
|
|
+ time_album_id = time_album_qs[0]['id']
|
|
|
+ AlbumMedia.objects.create(time_album_id=time_album_id, device_id=uid, event_time=event_time, channel=channel, storage_location=storage_location, created_time=int(time.time()), updated_time=int(time.time()))
|
|
|
+ else:
|
|
|
+ random_integer = random.randint(0, 1000)
|
|
|
+ album_title_qs = AlbumTitle.objects.filter(id=random_integer).values('album_title')
|
|
|
+ album_title = "生命里不可复制的每一瞬间"
|
|
|
+ if album_title_qs.exists():
|
|
|
+ album_title = album_title_qs[0]['album_title']
|
|
|
+ time_album = TimeAlbum.objects.create(album_title=album_title, album_date=album_date, device_id=uid, created_time=int(time.time()), updated_time=int(time.time()))
|
|
|
+ AlbumMedia.objects.create(time_album_id=time_album.id, device_id=uid, event_time=event_time, channel=channel, storage_location=storage_location, created_time=int(time.time()), updated_time=int(time.time()))
|
|
|
+ else:
|
|
|
+ # 清除redis
|
|
|
+ time_album_key = 'time_album_{}'.format(uid)
|
|
|
+ redis_obj.del_data(time_album_key)
|
|
|
+ LOGGING.info('设备{}:转存存储桶图片失败'.format(uid))
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ ERROR_INFO_LOGGER.error(
|
|
|
+ '{}保存推送数据到时光相册异常,errLine:{}, errMsg:{}'.format(uid, e.__traceback__.tb_lineno, repr(e)))
|
|
|
+ return False
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def check_and_update_time_album(redis_obj, uid, n_time, expire=3 * 60 * 60):
|
|
|
+ """
|
|
|
+ 使用缓存判断是否需要保存到时光相册
|
|
|
+ :param redis_obj: Redis对象
|
|
|
+ :param uid: uid
|
|
|
+ :param n_time: 新时间戳
|
|
|
+ :param expire: 过期时间(秒)
|
|
|
+ :return: 是否需要复制图片(is_copy)
|
|
|
+ """
|
|
|
+ is_copy = False
|
|
|
+ time_album_key = 'time_album_{}'.format(uid)
|
|
|
+ time_album_data = redis_obj.get_data(time_album_key)
|
|
|
+
|
|
|
+ if not time_album_data:
|
|
|
+ is_copy = True
|
|
|
+ redis_obj.set_data(time_album_key, n_time, expire)
|
|
|
+ else:
|
|
|
+ # 判断是否同一天,并更新值
|
|
|
+ now_time, time_album_data = int(n_time), int(time_album_data)
|
|
|
+ # 转换为北京时间(UTC+8)并判断是否为同一天
|
|
|
+ from datetime import datetime, timedelta
|
|
|
+ beijing_tz = timedelta(hours=8)
|
|
|
+ now_date = datetime.utcfromtimestamp(now_time) + beijing_tz
|
|
|
+ old_date = datetime.utcfromtimestamp(time_album_data) + beijing_tz
|
|
|
+
|
|
|
+ # 比较年月日是否相同
|
|
|
+ if now_date.date() != old_date.date():
|
|
|
+ is_copy = True
|
|
|
+ redis_obj.set_data(time_album_key, n_time, expire)
|
|
|
+
|
|
|
+ return is_copy
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def transfer_obs_image(cls, uid, channel, event_time, storage_location, region, is_st, bucket, max_retries=3, retry_delay=1):
|
|
|
+ """
|
|
|
+ 复制OBS存储桶中的图片到目标位置(带重试机制)
|
|
|
+
|
|
|
+ Args:
|
|
|
+ uid (str): 用户ID
|
|
|
+ channel (str): 频道名称
|
|
|
+ event_time (str): 事件时间戳
|
|
|
+ max_retries (int): 最大重试次数,默认3次
|
|
|
+ retry_delay (int): 重试间隔(秒),默认1秒
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ bool: True表示转移成功,False表示失败
|
|
|
+ """
|
|
|
+ if is_st == 1:
|
|
|
+ source_key = f'{uid}/{channel}/{event_time}.jpeg'
|
|
|
+ else:
|
|
|
+ source_key = f'{uid}/{channel}/{event_time}_1.jpg'
|
|
|
+
|
|
|
+ target_key = f'roomumy/albumMedia/{uid}/{channel}/{event_time}.jpeg'
|
|
|
+
|
|
|
+ if storage_location == 1:
|
|
|
+ return False
|
|
|
+
|
|
|
+ elif storage_location == 2:
|
|
|
+ # AWS S3
|
|
|
+ retry_count = 0
|
|
|
+ while retry_count < max_retries:
|
|
|
+ try:
|
|
|
+ aws_s3_client = cls.get_s3_client(region)
|
|
|
+ bucket = 'foreignpush' if region == 1 else 'push'
|
|
|
+ target_bucket = 'ansjerfilemanager'
|
|
|
+ response = aws_s3_client.copy_object(
|
|
|
+ CopySource={'Bucket': bucket, 'Key': source_key},
|
|
|
+ Bucket=target_bucket,
|
|
|
+ Key=target_key
|
|
|
+ )
|
|
|
+
|
|
|
+ status_code = response.get("ResponseMetadata", {}).get("HTTPStatusCode", 0)
|
|
|
+ if status_code < 300:
|
|
|
+ LOGGING.info(f'设备{uid}:AWS S3 图片转移成功')
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ LOGGING.warning(f'设备{uid}:AWS S3 图片转移失败,状态码:{status_code}')
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ LOGGING.error(f'设备{uid}:AWS S3 图片转移异常:{str(e)}')
|
|
|
+
|
|
|
+ retry_count += 1
|
|
|
+ time.sleep(retry_delay)
|
|
|
+
|
|
|
+ LOGGING.error(f'设备{uid}:AWS S3 图片转移失败,已达最大重试次数{max_retries}次')
|
|
|
+ return False
|
|
|
+
|
|
|
+ elif storage_location in [3, 4]:
|
|
|
+ # Oracle OCI
|
|
|
+ region_str = 'eur' if storage_location == 4 else 'us'
|
|
|
+ retry_count = 0
|
|
|
+ while retry_count < max_retries:
|
|
|
+ try:
|
|
|
+ oci = OCIObjectStorage(region_str)
|
|
|
+ oci_client = oci.object_storage_client
|
|
|
+ copy_details = CopyObjectDetails(
|
|
|
+ source_object_name=source_key,
|
|
|
+ destination_bucket="app",
|
|
|
+ destination_object_name=target_key
|
|
|
+ )
|
|
|
+
|
|
|
+ response = oci_client.copy_object(
|
|
|
+ namespace_name=OCI_NAMESPACE_NAME,
|
|
|
+ bucket_name=bucket,
|
|
|
+ copy_object_details=copy_details
|
|
|
+ )
|
|
|
+
|
|
|
+ if response.status < 300:
|
|
|
+ LOGGING.info(f'设备{uid}:OCI 图片转移成功')
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ LOGGING.warning(f'设备{uid}:OCI 图片转移失败,状态码:{response.status}')
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ LOGGING.error(f'设备{uid}:OCI 图片转移异常:{str(e)}')
|
|
|
+
|
|
|
+ retry_count += 1
|
|
|
+ time.sleep(retry_delay)
|
|
|
+
|
|
|
+ LOGGING.error(f'设备{uid}:OCI 图片转移失败,已达最大重试次数{max_retries}次')
|
|
|
+ return False
|
|
|
+
|
|
|
+ elif storage_location == 5:
|
|
|
+ # 华为云 OBS
|
|
|
+ obs_client = ObsClient(
|
|
|
+ access_key_id=HUAWEICLOUD_AK,
|
|
|
+ secret_access_key=HUAWEICLOUD_SK,
|
|
|
+ server=HUAWEICLOUD_OBS_SERVER
|
|
|
+ )
|
|
|
+
|
|
|
+ target_bucket = 'asj-app'
|
|
|
+ retry_count = 0
|
|
|
+ while retry_count < max_retries:
|
|
|
+ try:
|
|
|
+ head_res = obs_client.headObject(HUAWEICLOUD_PUSH_BUKET, source_key)
|
|
|
+ if head_res.status >= 300:
|
|
|
+ LOGGING.debug(f'设备{uid}:源图片不存在,第{retry_count + 1}次重试...')
|
|
|
+ retry_count += 1
|
|
|
+ time.sleep(retry_delay)
|
|
|
+ continue
|
|
|
+
|
|
|
+ copy_res = obs_client.copyObject(
|
|
|
+ sourceBucketName=HUAWEICLOUD_PUSH_BUKET,
|
|
|
+ sourceObjectKey=source_key,
|
|
|
+ destBucketName=target_bucket,
|
|
|
+ destObjectKey=target_key
|
|
|
+ )
|
|
|
+
|
|
|
+ if copy_res.status < 300:
|
|
|
+ LOGGING.info(f'设备{uid}:图片转移成功')
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ LOGGING.warning(f'设备{uid}:图片转移失败,状态码:{copy_res.status}')
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ LOGGING.error(f'设备{uid}:转移图片异常:{str(e)}')
|
|
|
+
|
|
|
+ retry_count += 1
|
|
|
+ time.sleep(retry_delay)
|
|
|
+
|
|
|
+ LOGGING.error(f'设备{uid}:图片转移失败,已达最大重试次数{max_retries}次')
|
|
|
+ return False
|
|
|
+
|
|
|
+ else:
|
|
|
+ return False
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def try_save_time_album(cls, redis_obj, uid, channel, event_time, storage_location, region, is_st, bucket,
|
|
|
+ app_bundle_id, event_tag):
|
|
|
+ """
|
|
|
+ 判断是否需要保存到时光相册,并启动子线程执行
|
|
|
+ """
|
|
|
+ if app_bundle_id not in ["com.ansjer.customizede", "com.ansjer.zccloud_ab"]:
|
|
|
+ return
|
|
|
+
|
|
|
+ # 检查是否满足保存条件
|
|
|
+ event_tag_list = event_tag.split(",")
|
|
|
+ if is_st not in [1, 3] or "57" not in event_tag_list:
|
|
|
+ return
|
|
|
+
|
|
|
+ is_copy = cls.check_and_update_time_album(redis_obj, uid, event_time, expire=3 * 60 * 60)
|
|
|
+ if not is_copy:
|
|
|
+ return
|
|
|
+
|
|
|
+ # 启动线程保存
|
|
|
+ threading.Thread(
|
|
|
+ target=cls.save_time_album,
|
|
|
+ args=(uid, channel, event_time, storage_location, region, is_st, bucket)
|
|
|
+ ).start()
|