瀏覽代碼

测试/国内服推送图片AWS转阿里云

locky 3 周之前
父節點
當前提交
f1c70886f8
共有 4 個文件被更改,包括 118 次插入4 次删除
  1. 4 0
      AnsjerPush/config.py
  2. 16 2
      Controller/DetectControllerV2.py
  3. 5 1
      Object/enums/RedisKeyConstant.py
  4. 93 1
      Service/DevicePushService.py

+ 4 - 0
AnsjerPush/config.py

@@ -30,6 +30,10 @@ JPUSH_UID_LIST = 'jpush_uid_list'
 HUAWEICLOUD_OBS_SERVER = 'https://obs.cn-east-3.myhuaweicloud.com'
 HUAWEICLOUD_PUSH_BUKET = 'asj-push'
 
+# 阿里云存储每日配额限制(测试/国内服)
+# 优先从 Redis 的 'push:alicloud:daily:quota:limit' 读取,如果 Redis 中未设置则使用此默认值
+ALICLOUD_OSS_DAILY_QUOTA = 10000  # 默认每天1万条
+
 # 阿里云发邮箱
 ALY_SES_ACCESS_NAME = 'message@dvema.com'
 ALY_SES_ACCESS_PAW = 'SMtp123456'

+ 16 - 2
Controller/DetectControllerV2.py

@@ -5,7 +5,8 @@ import threading
 from django.http import JsonResponse
 from django.views.generic.base import View
 
-from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_US, UNRESTRICTED_FREQUENCY_PUSH_EVENT_TYPE_LIST
+from AnsjerPush.config import CONFIG_EUR, CONFIG_INFO, CONFIG_US, UNRESTRICTED_FREQUENCY_PUSH_EVENT_TYPE_LIST, \
+    CONFIG_TEST, CONFIG_CN
 from Object.RedisObject import RedisObject
 from Object.enums.RedisKeyConstant import RedisKeyConstant
 from Service.DevicePushService import DevicePushService
@@ -131,7 +132,7 @@ class NotificationV2View(View):
                 'event_type': event_type,
                 'n_time': n_time,
             }
-            # 对象存储区域, 测试/国内: 旧:aws(2),新:华为云(5),设备传参, 美洲: oci凤凰城(3), 欧洲: oci伦敦(4)
+            # 对象存储区域, 测试/国内: 旧:aws(2),新:阿里云(1),华为云(5),设备传参, 美洲: oci凤凰城(3), 欧洲: oci伦敦(4)
             if CONFIG_INFO == CONFIG_US:
                 storage_location = 3
             elif CONFIG_INFO == CONFIG_EUR:
@@ -143,6 +144,19 @@ class NotificationV2View(View):
                         storage_location = int(config_value)
                 except Exception as e:
                     TIME_LOGGER.error('获取缓存推送存储位置异常uid:{},error:{}'.format(uid, repr(e)))
+            elif CONFIG_INFO in [CONFIG_TEST, CONFIG_CN]:
+                # 测试/国内服: 优先使用设备传参,否则默认阿里云
+                if storage_location is not None:
+                    storage_location = int(storage_location)
+                else:
+                    storage_location = 1  # 默认阿里云
+
+                # 阿里云存储配额控制:如果是阿里云(storage_location=1)且配额已满,则降级到AWS
+                if storage_location == 1:
+                    if not DevicePushService.check_and_use_alicloud_quota(redis_obj, uid, event_type):
+                        # 配额已满,降级到AWS
+                        storage_location = 2
+                        TIME_LOGGER.info('阿里云配额已满,uid:{},降级到AWS'.format(uid))
             else:
                 if storage_location is not None:
                     storage_location = int(storage_location)

+ 5 - 1
Object/enums/RedisKeyConstant.py

@@ -3,4 +3,8 @@ from enum import Enum
 
 class RedisKeyConstant(Enum):
     # 设备版本信息
-    PUSH_STORAGE_CONFIG_UID = 'push:storage:config:uid:'
+    PUSH_STORAGE_CONFIG_UID = 'push:storage:config:uid:'
+    # 阿里云存储每日配额计数器
+    ALICLOUD_DAILY_QUOTA_COUNTER = 'push:alicloud:daily:quota:counter:'
+    # 阿里云存储每日配额限制值
+    ALICLOUD_DAILY_QUOTA_LIMIT = 'push:alicloud:daily:quota:limit'

+ 93 - 1
Service/DevicePushService.py

@@ -24,7 +24,7 @@ from AnsjerPush.MessageConfig import EVENT_CONFIGS, DEFAULT_TEXTS, MSG_CONFIG
 from AnsjerPush.config import CONFIG_INFO, CONFIG_CN, MULTI_CHANNEL_TYPE_LIST, SYS_EVENT_TYPE_LIST, \
     EVENT_DICT, EVENT_DICT_CN, CONFIG_TEST, \
     HUAWEICLOUD_OBS_SERVER, HUAWEICLOUD_PUSH_BUKET, JPUSH_UID_LIST, \
-    DATA_PUSH_EVENT_TYPE_LIST, PRIMARY_USERS_PUSH_EVENT_TYPE_LIST, BASE_STATION_TYPE_LIST
+    DATA_PUSH_EVENT_TYPE_LIST, PRIMARY_USERS_PUSH_EVENT_TYPE_LIST, BASE_STATION_TYPE_LIST, ALICLOUD_OSS_DAILY_QUOTA
 from AnsjerPush.config import XMPUSH_CONFIG, OPPOPUSH_CONFIG, XM_PUSH_CHANNEL_ID, XM_PUSH_CHANNEL_DICT
 from Model.models import UidPushModel, SysMsgModel, DeviceSharePermission, DeviceChannelUserSet, \
     DeviceChannelUserPermission, UidSetModel, Device_Info, UserAudioVideoPush, PushLog, UidChannelSetModel
@@ -34,6 +34,7 @@ from Object.RedisObject import RedisObject
 from Object.UidTokenObject import UidTokenObject
 from Object.enums.ConstantEnum import ConstantEnum
 from Object.enums.EventTypeEnum import EventTypeEnumObj
+from Object.enums.RedisKeyConstant import RedisKeyConstant
 from Object.utils import LocalDateTimeUtil
 from Service.CommonService import CommonService
 from Service.EquipmentInfoService import EquipmentInfoService, EQUIPMENT_INFO_DICT
@@ -1102,6 +1103,74 @@ class DevicePushService:
             return ''
         return result.full_path + result.object_name
 
+    @staticmethod
+    def check_and_use_alicloud_quota(redis_obj, uid, event_type):
+        """
+        检查并使用阿里云存储配额
+        @param redis_obj: Redis对象
+        @param uid: uid
+        @param event_type: 事件类型
+        @return: True 可以使用阿里云, False 配额已满需要降级到AWS
+        """
+        try:
+            # 从 Redis 获取配额限制值,如果不存在则使用默认值
+            quota_limit_key = RedisKeyConstant.ALICLOUD_DAILY_QUOTA_LIMIT.value
+            quota_limit = redis_obj.get_data(key=quota_limit_key)
+
+            if quota_limit:
+                quota_limit = int(quota_limit)
+            else:
+                # 默认配额限制,如果 Redis 中没有设置则使用这个值
+                quota_limit = ALICLOUD_OSS_DAILY_QUOTA
+
+            # 获取当前日期作为key的一部分
+            current_date = datetime.datetime.now().strftime('%Y%m%d')
+            quota_key = RedisKeyConstant.ALICLOUD_DAILY_QUOTA_COUNTER.value + current_date
+
+            # 获取当前计数
+            current_count = redis_obj.get_data(key=quota_key)
+
+            if current_count is None:
+                # 首次使用,设置计数为1,过期时间为当天结束(86400秒)
+                redis_obj.set_data(key=quota_key, val=1, expire=86400)
+                return True
+
+            current_count = int(current_count)
+
+            # 检查是否超过配额
+            if current_count >= quota_limit:
+                LOGGING.info('阿里云存储配额已满,当前:{},限制:{},降级到AWS'.format(
+                    current_count, quota_limit))
+                return False
+
+            # 增加计数
+            redis_obj.incr(quota_key)
+            LOGGING.info('阿里云存储uid:{}, event_type{}'.format(uid, event_type))
+            return True
+
+        except Exception as e:
+            ERROR_INFO_LOGGER.info('检查阿里云配额异常,error_line:{},error_msg:{}'.format(
+                e.__traceback__.tb_lineno, repr(e)))
+            # 异常时默认允许使用阿里云
+            return True
+
+    @staticmethod
+    def create_alicloud_oss_signed_url(key_name):
+        """
+        生成阿里云OSS预签名URL
+        @param key_name: 对象名称
+        @return: 预签名URL
+        """
+        try:
+            auth = oss2.Auth(ALICLOUD_AK, ALICLOUD_SK)
+            bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'apg')
+            url = bucket.sign_url('PUT', key_name, 3600)
+            return url
+        except Exception as e:
+            ERROR_INFO_LOGGER.info('生成阿里云OSS预签名URL异常,key:{},error_line:{},error_msg:{}'.format(
+                key_name, e.__traceback__.tb_lineno, repr(e)))
+            return ''
+
     @staticmethod
     def get_res_data(**kwargs):
         """
@@ -1111,6 +1180,7 @@ class DevicePushService:
         res_data = {'code': 0, 'msg': 'success'}
         is_st = kwargs['is_st']
         storage_location = kwargs['storage_location']
+
         if is_st == 0 or is_st == 2:
             res_data['msg'] = 'success 0 or 2'
         elif is_st == 1:
@@ -1120,6 +1190,7 @@ class DevicePushService:
                 params['Bucket'] = 'push'
             else:  # 1:国外
                 params['Bucket'] = 'foreignpush'
+
             # 根据存储区域返回链接
             if storage_location in [3, 4]:
                 # OCI
@@ -1130,6 +1201,11 @@ class DevicePushService:
                 # AWS
                 img_url = DevicePushService.generate_s3_url(kwargs['aws_s3_client'], params)
                 res_data['img_push'] = img_url
+            elif storage_location == 1:
+                # 阿里云OSS
+                img_url = DevicePushService.create_alicloud_oss_signed_url(key_name)
+                res_data['img_push'] = img_url
+                res_data['msg'] = 'success 1'
             else:
                 # 华为云
                 img_url = DevicePushService.create_obs_signed_url(key_name, 'PUT')
@@ -1141,6 +1217,7 @@ class DevicePushService:
                 params = {'Bucket': 'push'}
             else:  # 1:国外
                 params = {'Bucket': 'foreignpush'}
+
             oci_client = None
             if storage_location in [3, 4]:  # 三张图的时候提前获取实例化OCI
                 region = 'eur' if storage_location == 4 else 'us'
@@ -1155,6 +1232,9 @@ class DevicePushService:
                 elif storage_location == 2:
                     # AWS
                     img_url = DevicePushService.generate_s3_url(kwargs['aws_s3_client'], params)
+                elif storage_location == 1:
+                    # 阿里云OSS
+                    img_url = DevicePushService.create_alicloud_oss_signed_url(key_name)
                 else:
                     # 华为云
                     img_url = DevicePushService.create_obs_signed_url(key_name, 'PUT')
@@ -1163,6 +1243,18 @@ class DevicePushService:
             res_data['msg'] = 'success 3'
         return res_data
 
+    @classmethod
+    def get_oci_req_url(cls, uid: str, channel: str, event_time: int, storage_location: int) -> str:
+        """
+        获取oci预认证请求url
+        """
+        region: str = 'eur' if storage_location == 4 else 'us'
+        oci_client: OCIObjectStorage = OCIObjectStorage(region)
+        key_name: str = '{}/{}/{}.jpeg'.format(uid, channel, event_time)
+        img_url: str = cls.create_oci_req_url(
+            storage_location=storage_location, bucket='foreignpush', obj_name=key_name, oci=oci_client)
+        return img_url
+
     @staticmethod
     def generate_s3_url(aws_s3_client, params):
         """