NovaImageTagObject.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. # -*- encoding: utf-8 -*-
  2. """
  3. @File : NovaImageTagObject.py
  4. @Time : 2025/8/29 09:03
  5. @Author : stephen
  6. @Email : zhangdongming@asj6.wecom.work
  7. @Software: PyCharm
  8. """
  9. import base64
  10. import io
  11. import json
  12. import logging
  13. import re
  14. import boto3
  15. from PIL import Image
  16. LOGGER = logging.getLogger('time')
  17. # --- 配置信息 ---
  18. MODEL_ID = "us.amazon.nova-lite-v1:0"
  19. class NovaImageTagObject(object):
  20. def __init__(self, aws_access_key_id, secret_access_key, region_name):
  21. self.bedrock = boto3.client(
  22. 'bedrock-runtime',
  23. aws_access_key_id=aws_access_key_id,
  24. aws_secret_access_key=secret_access_key,
  25. region_name=region_name
  26. )
  27. @staticmethod
  28. def safe_json_load(json_string):
  29. """
  30. 一个更健壮的JSON解析函数,尝试修复常见的模型输出格式问题。
  31. """
  32. try:
  33. # 寻找被代码块包围的JSON
  34. json_match = re.search(r'```json\s*([\s\S]*?)\s*```', json_string)
  35. if json_match:
  36. json_string = json_match.group(1)
  37. # 寻找常规的JSON对象或数组
  38. json_match = re.search(r'\{.*\}|\[.*\]', json_string, re.DOTALL)
  39. if json_match:
  40. json_string = json_match.group(0)
  41. return json.loads(json_string)
  42. except json.JSONDecodeError:
  43. LOGGER.error("JSON解析失败,尝试修复...")
  44. try:
  45. json_string = re.sub(r"(\w+):", r'"\1":', json_string)
  46. json_string = json_string.replace("'", '"')
  47. return json.loads(json_string)
  48. except Exception as e:
  49. LOGGER.error(f"无法解析模型返回的JSON: {e}")
  50. return None
  51. except Exception as e:
  52. LOGGER.error(f"发生未知解析错误: {e}")
  53. return None
  54. @staticmethod
  55. def format_and_convert_detections(nova_detections: list) -> list:
  56. """
  57. 将Nova模型返回的坐标转换为您指定的详细格式,包含原始坐标和Rekognition比例。
  58. """
  59. formatted_results = []
  60. if not isinstance(nova_detections, list):
  61. return []
  62. for item in nova_detections:
  63. if not isinstance(item, dict): continue
  64. label = list(item.keys())[0]
  65. nx1, ny1, nx2, ny2 = item[label]
  66. left = nx1 / 1000.0
  67. top = ny1 / 1000.0
  68. width = (nx2 - nx1) / 1000.0
  69. height = (ny2 - ny1) / 1000.0
  70. formatted_results.append({
  71. "x1": nx1, "x2": nx2, "y1": ny1, "y2": ny2,
  72. "Width": f"{width:.5f}", "Height": f"{height:.5f}",
  73. "Top": f"{top:.5f}", "Left": f"{left:.5f}",
  74. "class": label
  75. })
  76. return formatted_results
  77. @staticmethod
  78. def normalize_b64(b64_str: str) -> str:
  79. """清理并补齐base64字符串"""
  80. if not b64_str:
  81. return ""
  82. b64_str = re.sub(r"^data:image/[^;]+;base64,", "", b64_str)
  83. b64_str = b64_str.strip().replace("\n", "").replace(" ", "")
  84. # 补齐Base64填充
  85. padding = 4 - (len(b64_str) % 4)
  86. if padding and padding != 4:
  87. b64_str += "=" * padding
  88. return b64_str
  89. def process_image_batch(self, base64_images: list, categories: list, uid=''):
  90. if not base64_images:
  91. LOGGER.error(f"{uid}错误: 未提供图片数据。")
  92. return {}
  93. image_contents = []
  94. for idx, b64_image in enumerate(base64_images, start=1):
  95. try:
  96. # 规范化base64
  97. b64_image = self.normalize_b64(b64_image)
  98. if not b64_image:
  99. raise ValueError("空的base64字符串")
  100. # 解码为二进制
  101. img_bytes = base64.b64decode(b64_image)
  102. # 使用PIL处理图像
  103. image = Image.open(io.BytesIO(img_bytes))
  104. # 转换为RGB模式(如果需要)
  105. if image.mode != 'RGB':
  106. image = image.convert('RGB')
  107. # 转换为WebP格式
  108. buffer = io.BytesIO()
  109. image.save(buffer, format="webp", quality=90)
  110. img_bytes = buffer.getvalue()
  111. # 直接传递二进制数据给Bedrock
  112. image_contents.append({
  113. "image": {"format": "webp", "source": {"bytes": img_bytes}}
  114. })
  115. LOGGER.info(f"{uid} 第{idx}张图处理成功, 格式=webp, 大小={len(img_bytes)}B")
  116. except Exception as e:
  117. LOGGER.error(f"{uid} 第{idx}张图处理失败,已跳过: {repr(e)}")
  118. image_contents.append(None) # 添加占位符
  119. if not image_contents:
  120. LOGGER.error(f"{uid}错误: 所有图片均无法处理。")
  121. return {}
  122. category_str = ", ".join([f'"{cat.lower()}"' for cat in categories])
  123. num_images = len(image_contents)
  124. # --- 关键改动:为多图片设计的全新Prompt ---
  125. prompt = f"""
  126. You have been provided with {num_images} images. Analyze each image sequentially.
  127. For each image, detect bounding boxes of objects from the following categories: {category_str}.
  128. Your output MUST be a single, valid JSON object.
  129. The keys of this object should be "image_0", "image_1", ..., "image_{num_images - 1}", corresponding to the first, second, and subsequent images provided.
  130. The value for each key must be a list of detected objects for that specific image. If no objects are detected in an image, the value should be an empty list [].
  131. Use a 1000x1000 coordinate system for the bounding boxes.
  132. Example output format for {num_images} images:
  133. {{
  134. "image_0": [{{"person": [100, 150, 200, 350]}}, {{"car": [400, 500, 600, 700]}}],
  135. "image_1": [],
  136. "image_2": [{{"package": [300, 300, 400, 400]}}]
  137. }}
  138. """
  139. messages = [{"role": "user", "content": image_contents + [{"text": prompt}]}]
  140. try:
  141. response = self.bedrock.converse(
  142. modelId=MODEL_ID,
  143. messages=messages,
  144. inferenceConfig={"temperature": 0.0, "maxTokens": 4096, "topP": 1.0},
  145. )
  146. model_output = response["output"]["message"]["content"][0]["text"]
  147. LOGGER.info(f"\n--- {uid}模型对整个批次的原始输出 ---\n{model_output}")
  148. # 解析模型返回的包含所有图片结果的JSON对象
  149. batch_results = self.safe_json_load(model_output)
  150. if not batch_results or not isinstance(batch_results, dict):
  151. LOGGER.error(f"{uid}模型未返回预期的字典格式结果。")
  152. return {}
  153. # --- 核心逻辑:将批处理结果映射回您的格式 ---
  154. final_output_dict = {}
  155. for i in range(len(base64_images)):
  156. # 从批处理结果中获取当前图片的数据,如果不存在则默认为空列表
  157. nova_detections = batch_results.get(f"image_{i}", [])
  158. # 转换为您最终需要的格式
  159. detailed_results = self.format_and_convert_detections(nova_detections)
  160. final_output_dict[f"file_{i}"] = detailed_results
  161. return final_output_dict
  162. except Exception as e:
  163. LOGGER.error(f"{uid}调用Bedrock模型或处理过程中发生错误: {repr(e)}")
  164. return {}