DynamodbObject.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. # -*- encoding: utf-8 -*-
  2. """
  3. @File : DynamodbObject.py
  4. @Time : 2023/8/29 10:05
  5. @Author : stephen
  6. @Email : zhangdongming@asj6.wecom.work
  7. @Software: PyCharm
  8. """
  9. import boto3
  10. class DynamodbObject(object):
  11. def __init__(self, aws_access_key_id, secret_access_key, region_name):
  12. self.dynamo = boto3.client(
  13. 'dynamodb',
  14. aws_access_key_id=aws_access_key_id,
  15. aws_secret_access_key=secret_access_key,
  16. region_name=region_name
  17. )
  18. def get_item(self, table_name, key_params):
  19. """
  20. 查询单个项目
  21. @param table_name: 表名称
  22. @param key_params: 分区键值、排序键值
  23. @return: 查询结果
  24. """
  25. try:
  26. response = self.dynamo.get_item(TableName=table_name,
  27. Key=key_params)
  28. except Exception as err:
  29. print(repr(err))
  30. raise
  31. else:
  32. return response['Item']
  33. def put_item(self, table_name, item):
  34. """
  35. 写入项目
  36. @param table_name: 表名称
  37. @param item: 项目键值
  38. @return: 创建结果
  39. """
  40. try:
  41. response = self.dynamo.put_item(TableName=table_name,
  42. Item=item)
  43. except Exception as err:
  44. print(repr(err))
  45. raise
  46. else:
  47. return response
  48. def query_page(self, table_name, key_condition_expression, expression_attribute_values, exclusive_start_key=None,
  49. page_size=10, scan_index_forward=True):
  50. """
  51. 分页查询
  52. @param table_name: 表名称
  53. @param key_condition_expression: 定义键条件表达式以筛选查询结果
  54. @param expression_attribute_values: 定义表达式中的占位符属性值
  55. @param exclusive_start_key: 用以分页查询时指定起始键
  56. @param page_size: 要返回的最大项目数
  57. @return: 查询结果
  58. """
  59. try:
  60. query_params = {
  61. 'TableName': table_name,
  62. 'KeyConditionExpression': key_condition_expression,
  63. 'ExpressionAttributeValues': expression_attribute_values,
  64. 'Limit': page_size,
  65. 'ScanIndexForward': scan_index_forward
  66. }
  67. if exclusive_start_key:
  68. query_params['ExclusiveStartKey'] = exclusive_start_key
  69. return self.dynamo.query(**query_params)
  70. except Exception as err:
  71. print(repr(err))
  72. raise
  73. def query(self, table_name, key_condition_expression, expression_attribute_values, scan_index_forward=True):
  74. """
  75. 查询项目
  76. @param table_name: 表名称
  77. @param key_condition_expression: 定义键条件表达式以筛选查询结果
  78. @param expression_attribute_values: 定义表达式中的占位符属性值
  79. @return: 查询结果
  80. """
  81. try:
  82. query_params = {
  83. 'TableName': table_name,
  84. 'KeyConditionExpression': key_condition_expression,
  85. 'ExpressionAttributeValues': expression_attribute_values,
  86. 'ScanIndexForward': scan_index_forward
  87. }
  88. return self.dynamo.query(**query_params)
  89. except Exception as err:
  90. print(repr(err))
  91. raise