# -*- encoding: utf-8 -*-
"""
@File : OCIObjectStorage.py
@Time : 2024/4/10 15:06
@Author : stephen
@Email : zhangdongming@asj6.wecom.work
@Software: PyCharm

Helpers for talking to Oracle Cloud Infrastructure (OCI) Object Storage.
"""
import logging

import oci

from AnsjerPush.config import OCI_CONFIG, OCI_NAMESPACE_NAME

LOGGER = logging.getLogger('time')


class OCIObjectStorage:
    """Wrapper around ``oci.object_storage.ObjectStorageClient`` for one region.

    The client is built from ``OCI_CONFIG[region]`` and every request is sent
    to the namespace ``OCI_NAMESPACE_NAME``.  All public methods catch any
    exception raised while talking to the service, log it through ``LOGGER``
    and return ``None`` instead of propagating the error.

    For the layout of an OCI config mapping see
    https://docs.cloud.oracle.com/en-us/iaas/Content/API/Concepts/sdkconfig.htm#SDK_and_CLI_Configuration_File
    """

    def __init__(self, region):
        """Create the Object Storage client for ``region``.

        @param region: key into ``OCI_CONFIG`` selecting the SDK configuration
            to use.  An unknown key raises ``KeyError``.
        """
        self.object_storage_client = oci.object_storage.ObjectStorageClient(OCI_CONFIG[region])

    @staticmethod
    def _check_status(response, expected=200):
        """Raise ``RuntimeError`` unless ``response.status`` equals ``expected``.

        This replaces ``assert`` statements, which are removed when Python runs
        with ``-O`` and therefore must not be used for runtime validation.
        """
        status = response.status
        if status != expected:
            raise RuntimeError('unexpected OCI response status: {}'.format(status))

    def create_ereauthenticated_request(self, bucket_name, name, object_name, time_expires):
        """Create a write ("AnyObjectWrite") pre-authenticated request on a bucket.

        The method name contains a typo ("ereauthenticated"); it is kept
        unchanged so existing callers continue to work.

        API: https://docs.oracle.com/en-us/iaas/api/#/en/objectstorage/20160918/PreauthenticatedRequest/CreatePreauthenticatedRequest

        @param bucket_name: name of the bucket.
        @param name: name of the pre-authenticated request.  It is a label for
            management purposes only (e.g. to tell which bucket a link belongs
            to when listing or deleting) and does not affect behaviour.
        @param object_name: object name passed to the request details.
        @param time_expires: expiry time as a ``datetime``, for example
            ``datetime.utcnow() + timedelta(minutes=30)``.
        @return: ``response.data`` of the SDK call on success (per the OCI API
            reference this is the created pre-authenticated request, which
            carries the access URI), or ``None`` if anything failed.
        """
        try:
            # Listing the bucket through this link is explicitly denied.
            details = oci.object_storage.models.CreatePreauthenticatedRequestDetails(
                name=name,
                access_type="AnyObjectWrite",
                time_expires=time_expires,
                bucket_listing_action="Deny",
                object_name=object_name)
            response = self.object_storage_client.create_preauthenticated_request(
                namespace_name=OCI_NAMESPACE_NAME,
                bucket_name=bucket_name,
                create_preauthenticated_request_details=details)
            self._check_status(response)
            return response.data
        except Exception as e:
            LOGGER.error('OCI创建预认证URL异常errLine={errLine}, errMsg={errMsg}'
                         .format(errLine=e.__traceback__.tb_lineno, errMsg=repr(e)))
            return None

    def get_preauthenticated_request_url(self, bucket_name, name, object_name, time_expires):
        """Create a read-only ("ObjectRead") pre-authenticated request for an object.

        @param bucket_name: name of the bucket.
        @param name: name of the pre-authenticated request.  It is a label for
            management purposes only and does not affect behaviour.
        @param object_name: name of the object the request grants access to.
        @param time_expires: expiry time as a ``datetime``, for example
            ``datetime.utcnow() + timedelta(minutes=30)``.
        @return: ``response.data`` of the SDK call on success (per the OCI API
            reference this is the created pre-authenticated request, which
            carries the access URI), or ``None`` if anything failed.
        """
        try:
            details = oci.object_storage.models.CreatePreauthenticatedRequestDetails(
                name=name,
                object_name=object_name,
                access_type="ObjectRead",
                time_expires=time_expires)
            response = self.object_storage_client.create_preauthenticated_request(
                namespace_name=OCI_NAMESPACE_NAME,
                bucket_name=bucket_name,
                create_preauthenticated_request_details=details)
            self._check_status(response)
            return response.data
        except Exception as e:
            LOGGER.error('OCI获取预认证URL异常errLine={errLine}, errMsg={errMsg}'
                         .format(errLine=e.__traceback__.tb_lineno, errMsg=repr(e)))
            return None

    def put_object(self, bucket_name, object_name, obj, content_type=None):
        """Upload an object to a bucket.

        @param bucket_name: name of the bucket.
        @param object_name: name to store the object under.
        @param obj: the content to upload (passed as ``put_object_body``).
        @param content_type: optional content type of the object.
        @return: always ``None``.  Failures (including a non-200 response) are
            logged rather than raised, so the return value does not tell
            success from failure.
        """
        try:
            response = self.object_storage_client.put_object(
                namespace_name=OCI_NAMESPACE_NAME,
                bucket_name=bucket_name,
                object_name=object_name,
                put_object_body=obj,
                content_type=content_type)
            self._check_status(response)
        except Exception as e:
            LOGGER.error('OCI上传对象异常errLine={errLine}, errMsg={errMsg}'
                         .format(errLine=e.__traceback__.tb_lineno, errMsg=repr(e)))
        return None

    def delete_object(self, bucket_name, object_name, content_type=None):
        """Delete an object from a bucket.

        @param bucket_name: name of the bucket.
        @param object_name: name of the object to delete.
        @param content_type: unused; kept so existing callers that pass it
            keep working.
        @return: always ``None``.  Failures are logged rather than raised.
        """
        try:
            self.object_storage_client.delete_object(
                namespace_name=OCI_NAMESPACE_NAME,
                bucket_name=bucket_name,
                object_name=object_name)
        except Exception as e:
            LOGGER.error('OCI删除对象异常errLine={errLine}, errMsg={errMsg}'
                         .format(errLine=e.__traceback__.tb_lineno, errMsg=repr(e)))
        return None