|
@@ -0,0 +1,100 @@
|
|
|
+# -*- encoding: utf-8 -*-
|
|
|
+"""
|
|
|
+@File : DynamoDBObject.py
|
|
|
+@Time : 2023/8/29 10:05
|
|
|
+@Author : stephen
|
|
|
+@Email : zhangdongming@asj6.wecom.work
|
|
|
+@Software: PyCharm
|
|
|
+"""
|
|
|
+
|
|
|
+import boto3
|
|
|
+
|
|
|
+
|
|
|
+class DynamodbObject(object):
|
|
|
+ def __init__(self, aws_access_key_id, secret_access_key, region_name):
|
|
|
+ self.dynamo = boto3.client(
|
|
|
+ 'dynamodb',
|
|
|
+ aws_access_key_id=aws_access_key_id,
|
|
|
+ aws_secret_access_key=secret_access_key,
|
|
|
+ region_name=region_name
|
|
|
+ )
|
|
|
+
|
|
|
+ def get_item(self, table_name, key_params):
|
|
|
+ """
|
|
|
+ 查询单个项目
|
|
|
+ @param table_name: 表名称
|
|
|
+ @param key_params: 分区键值、排序键值
|
|
|
+ @return: 查询结果
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ response = self.dynamo.get_item(TableName=table_name,
|
|
|
+ Key=key_params)
|
|
|
+ except Exception as err:
|
|
|
+ print(repr(err))
|
|
|
+ raise
|
|
|
+ else:
|
|
|
+ return response['Item']
|
|
|
+
|
|
|
+ def put_item(self, table_name, item):
|
|
|
+ """
|
|
|
+ 写入项目
|
|
|
+ @param table_name: 表名称
|
|
|
+ @param item: 项目键值
|
|
|
+ @return: 创建结果
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ response = self.dynamo.put_item(TableName=table_name,
|
|
|
+ Item=item)
|
|
|
+ except Exception as err:
|
|
|
+ print(repr(err))
|
|
|
+ raise
|
|
|
+ else:
|
|
|
+ return response
|
|
|
+
|
|
|
+ def query_page(self, table_name, key_condition_expression, expression_attribute_values, exclusive_start_key=None,
|
|
|
+ page_size=10, scan_index_forward=True):
|
|
|
+ """
|
|
|
+ 分页查询
|
|
|
+ @param table_name: 表名称
|
|
|
+ @param key_condition_expression: 定义键条件表达式以筛选查询结果
|
|
|
+ @param expression_attribute_values: 定义表达式中的占位符属性值
|
|
|
+ @param exclusive_start_key: 用以分页查询时指定起始键
|
|
|
+ @param page_size: 要返回的最大项目数
|
|
|
+ @return: 查询结果
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ query_params = {
|
|
|
+ 'TableName': table_name,
|
|
|
+ 'KeyConditionExpression': key_condition_expression,
|
|
|
+ 'ExpressionAttributeValues': expression_attribute_values,
|
|
|
+ 'Limit': page_size,
|
|
|
+ 'ScanIndexForward': scan_index_forward
|
|
|
+ }
|
|
|
+ if exclusive_start_key:
|
|
|
+ query_params['ExclusiveStartKey'] = exclusive_start_key
|
|
|
+
|
|
|
+ return self.dynamo.query(**query_params)
|
|
|
+ except Exception as err:
|
|
|
+ print(repr(err))
|
|
|
+ raise
|
|
|
+
|
|
|
+ def query(self, table_name, key_condition_expression, expression_attribute_values, scan_index_forward=True):
|
|
|
+ """
|
|
|
+ 查询项目
|
|
|
+ @param table_name: 表名称
|
|
|
+ @param key_condition_expression: 定义键条件表达式以筛选查询结果
|
|
|
+ @param expression_attribute_values: 定义表达式中的占位符属性值
|
|
|
+ @return: 查询结果
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ query_params = {
|
|
|
+ 'TableName': table_name,
|
|
|
+ 'KeyConditionExpression': key_condition_expression,
|
|
|
+ 'ExpressionAttributeValues': expression_attribute_values,
|
|
|
+ 'ScanIndexForward': scan_index_forward
|
|
|
+ }
|
|
|
+
|
|
|
+ return self.dynamo.query(**query_params)
|
|
|
+ except Exception as err:
|
|
|
+ print(repr(err))
|
|
|
+ raise
|