# -*- encoding: utf-8 -*-
"""
@File : NovaImageTagObject.py
@Time : 2025/8/29 09:03
@Author : stephen
@Email : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import base64
import io
import json
import logging
import re

import boto3
from PIL import Image
  16. LOGGER = logging.getLogger('time')
  17. # --- 配置信息 ---
  18. MODEL_ID = "us.amazon.nova-lite-v1:0"
  19. class NovaImageTagObject(object):
  20. def __init__(self, aws_access_key_id, secret_access_key, region_name):
  21. self.bedrock = boto3.client(
  22. 'bedrock-runtime',
  23. aws_access_key_id=aws_access_key_id,
  24. aws_secret_access_key=secret_access_key,
  25. region_name=region_name
  26. )
  27. @staticmethod
  28. def safe_json_load(json_string):
  29. """
  30. 一个更健壮的JSON解析函数,尝试修复常见的模型输出格式问题。
  31. """
  32. try:
  33. # 寻找被代码块包围的JSON
  34. json_match = re.search(r'```json\s*([\s\S]*?)\s*```', json_string)
  35. if json_match:
  36. json_string = json_match.group(1)
  37. # 寻找常规的JSON对象或数组
  38. json_match = re.search(r'\{.*\}|\[.*\]', json_string, re.DOTALL)
  39. if json_match:
  40. json_string = json_match.group(0)
  41. return json.loads(json_string)
  42. except json.JSONDecodeError:
  43. LOGGER.error("JSON解析失败,尝试修复...")
  44. try:
  45. json_string = re.sub(r"(\w+):", r'"\1":', json_string)
  46. json_string = json_string.replace("'", '"')
  47. return json.loads(json_string)
  48. except Exception as e:
  49. LOGGER.error(f"无法解析模型返回的JSON: {e}")
  50. return None
  51. except Exception as e:
  52. LOGGER.error(f"发生未知解析错误: {e}")
  53. return None
  54. @staticmethod
  55. def format_and_convert_detections(nova_detections: list) -> list:
  56. """
  57. 将Nova模型返回的坐标转换为您指定的详细格式,包含原始坐标和Rekognition比例。
  58. """
  59. formatted_results = []
  60. if not isinstance(nova_detections, list):
  61. return []
  62. for item in nova_detections:
  63. if not isinstance(item, dict): continue
  64. label = list(item.keys())[0]
  65. nx1, ny1, nx2, ny2 = item[label]
  66. left = nx1 / 1000.0
  67. top = ny1 / 1000.0
  68. width = (nx2 - nx1) / 1000.0
  69. height = (ny2 - ny1) / 1000.0
  70. formatted_results.append({
  71. "x1": nx1, "x2": nx2, "y1": ny1, "y2": ny2,
  72. "Width": f"{width:.5f}", "Height": f"{height:.5f}",
  73. "Top": f"{top:.5f}", "Left": f"{left:.5f}",
  74. "class": label
  75. })
  76. return formatted_results
  77. @staticmethod
  78. def normalize_b64(b64_str: str) -> str:
  79. """清理并验证base64字符串"""
  80. if not b64_str:
  81. return ""
  82. # 移除可能的数据URL前缀
  83. b64_str = re.sub(r"^data:image/[^;]+;base64,", "", b64_str)
  84. # 移除所有非Base64字符(包括空格、换行等)
  85. b64_str = re.sub(r"[^A-Za-z0-9+/=]", "", b64_str)
  86. # 检查Base64有效性
  87. if len(b64_str) % 4 != 0:
  88. # 自动补全填充位
  89. b64_str += "=" * (4 - len(b64_str) % 4)
  90. return b64_str
  91. def process_image_batch(self, base64_images: list, categories: list, uid=''):
  92. if not base64_images:
  93. LOGGER.error(f"{uid}错误: 未提供图片数据。")
  94. return {}
  95. image_contents = []
  96. for idx, b64_image in enumerate(base64_images, start=1):
  97. try:
  98. # 规范化base64
  99. original_b64 = b64_image # 保存原始值用于调试
  100. b64_image = self.normalize_b64(b64_image)
  101. if not b64_image:
  102. raise ValueError("空的base64字符串")
  103. # 调试输出
  104. LOGGER.debug(f"{uid} 第{idx}张图处理前: {original_b64[:50]}...")
  105. LOGGER.debug(f"{uid} 第{idx}张图处理后: {b64_image[:50]}...")
  106. # 解码为二进制
  107. img_bytes = base64.b64decode(b64_image)
  108. # 验证解码后的数据
  109. if len(img_bytes) == 0:
  110. raise ValueError("解码后得到空字节数据")
  111. # 使用PIL处理图像
  112. image = Image.open(io.BytesIO(img_bytes))
  113. # 转换为RGB模式(如果需要)
  114. if image.mode != 'RGB':
  115. image = image.convert('RGB')
  116. # 转换为WebP格式
  117. buffer = io.BytesIO()
  118. image.save(buffer, format="webp", quality=90)
  119. webp_bytes = buffer.getvalue()
  120. image_contents.append({
  121. "image": {"format": "webp", "source": {"bytes": webp_bytes}}
  122. })
  123. LOGGER.info(f"{uid} 第{idx}张图处理成功, 格式=webp, 大小={len(webp_bytes)}B")
  124. except Exception as e:
  125. LOGGER.error(f"{uid} 第{idx}张图处理失败: {repr(e)}")
  126. LOGGER.debug(f"{uid}失败图像的Base64前100字符: {b64_image[:100]}")
  127. # 不要添加None,而是跳过或使用占位符图像
  128. continue # 直接跳过这张图
  129. if not image_contents:
  130. LOGGER.error(f"{uid}错误: 所有图片均无法处理。")
  131. return {}
  132. category_str = ", ".join([f'"{cat.lower()}"' for cat in categories])
  133. num_images = len(image_contents)
  134. # --- 关键改动:为多图片设计的全新Prompt ---
  135. prompt = f"""
  136. You have been provided with {num_images} images. Analyze each image sequentially.
  137. For each image, detect bounding boxes of objects from the following categories: {category_str}.
  138. Your output MUST be a single, valid JSON object.
  139. The keys of this object should be "image_0", "image_1", ..., "image_{num_images - 1}", corresponding to the first, second, and subsequent images provided.
  140. The value for each key must be a list of detected objects for that specific image. If no objects are detected in an image, the value should be an empty list [].
  141. Use a 1000x1000 coordinate system for the bounding boxes.
  142. Example output format for {num_images} images:
  143. {{
  144. "image_0": [{{"person": [100, 150, 200, 350]}}, {{"car": [400, 500, 600, 700]}}],
  145. "image_1": [],
  146. "image_2": [{{"package": [300, 300, 400, 400]}}]
  147. }}
  148. """
  149. messages = [{"role": "user", "content": image_contents + [{"text": prompt}]}]
  150. try:
  151. response = self.bedrock.converse(
  152. modelId=MODEL_ID,
  153. messages=messages,
  154. inferenceConfig={"temperature": 0.0, "maxTokens": 4096, "topP": 1.0},
  155. )
  156. model_output = response["output"]["message"]["content"][0]["text"]
  157. LOGGER.info(f"\n--- {uid}模型对整个批次的原始输出 ---\n{model_output}")
  158. # 解析模型返回的包含所有图片结果的JSON对象
  159. batch_results = self.safe_json_load(model_output)
  160. if not batch_results or not isinstance(batch_results, dict):
  161. LOGGER.error(f"{uid}模型未返回预期的字典格式结果。")
  162. return {}
  163. # --- 核心逻辑:将批处理结果映射回您的格式 ---
  164. final_output_dict = {}
  165. for i in range(len(base64_images)):
  166. # 从批处理结果中获取当前图片的数据,如果不存在则默认为空列表
  167. nova_detections = batch_results.get(f"image_{i}", [])
  168. # 转换为您最终需要的格式
  169. detailed_results = self.format_and_convert_detections(nova_detections)
  170. final_output_dict[f"file_{i}"] = detailed_results
  171. return final_output_dict
  172. except Exception as e:
  173. LOGGER.error(f"{uid}调用Bedrock模型或处理过程中发生错误: {repr(e)}")
  174. return {}