# -*- encoding: utf-8 -*-
"""
@File    : NovaImageTagObject.py
@Time    : 2025/8/29 09:03
@Author  : stephen
@Email   : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import base64
import io
import json
import logging
import re

import boto3
from PIL import Image

LOGGER = logging.getLogger('time')

# --- Configuration ---
MODEL_ID = "us.amazon.nova-lite-v1:0"


class NovaImageTagObject(object):
    """Detect objects in a batch of images through the Bedrock Converse API.

    Images are supplied as base64 strings, re-encoded as WebP, sent to the
    model identified by ``MODEL_ID`` in a single request, and the model's JSON
    answer is converted into per-image lists of bounding boxes.
    """

    def __init__(self, aws_access_key_id, secret_access_key, region_name):
        """Create the ``bedrock-runtime`` client used for all requests.

        Args:
            aws_access_key_id: AWS access key id passed to ``boto3.client``.
            secret_access_key: AWS secret access key passed to ``boto3.client``.
            region_name: AWS region name passed to ``boto3.client``.
        """
        self.bedrock = boto3.client(
            'bedrock-runtime',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=secret_access_key,
            region_name=region_name
        )

    @staticmethod
    def safe_json_load(json_string):
        """Parse JSON from model output, repairing common formatting problems.

        The text inside a fenced json code block is preferred when present;
        then the outermost ``{...}`` or ``[...]`` span is extracted and parsed.
        If strict parsing fails, one repair pass quotes bare object keys and
        converts single quotes to double quotes before parsing again.

        Args:
            json_string: Raw text returned by the model.

        Returns:
            The decoded JSON value, or ``None`` when the text cannot be parsed
            (including when ``json_string`` is not a string).
        """
        try:
            # Prefer JSON wrapped in a fenced code block.
            json_match = re.search(r'```json\s*([\s\S]*?)\s*```', json_string)
            if json_match:
                json_string = json_match.group(1)

            # Then narrow down to a plain JSON object or array.
            json_match = re.search(r'\{.*\}|\[.*\]', json_string, re.DOTALL)
            if json_match:
                json_string = json_match.group(0)

            return json.loads(json_string)
        except json.JSONDecodeError:
            LOGGER.error("JSON解析失败,尝试修复...")
            try:
                # Quote only bare identifiers that sit in key position (right
                # after "{" or ","). A blanket r"(\w+):" rewrite would also hit
                # colons inside values such as '12:30' and corrupt them.
                repaired = re.sub(
                    r'([{,]\s*)([A-Za-z_]\w*)(\s*:)', r'\1"\2"\3', json_string
                )
                repaired = repaired.replace("'", '"')
                return json.loads(repaired)
            except Exception as e:
                LOGGER.error(f"无法解析模型返回的JSON: {e}")
                return None
        except Exception as e:
            LOGGER.error(f"发生未知解析错误: {e}")
            return None

    @staticmethod
    def format_and_convert_detections(nova_detections: list) -> list:
        """Convert model detections into the detailed output format.

        Each input item is expected to be a dict whose first key is the label
        and whose value is ``[x1, y1, x2, y2]`` in the 1000x1000 coordinate
        system requested in the prompt. The output keeps the raw coordinates
        and adds ``Left``/``Top``/``Width``/``Height`` ratios (coordinate
        divided by 1000) formatted as strings with five decimals.

        Malformed items (non-dict, empty dict, box that is not four numbers)
        are skipped so that one bad detection does not discard the whole batch.

        Args:
            nova_detections: List of ``{label: [x1, y1, x2, y2]}`` dicts.

        Returns:
            A list of dicts with keys ``x1``, ``x2``, ``y1``, ``y2``, ``Width``,
            ``Height``, ``Top``, ``Left`` and ``class``; an empty list when
            the input is not a list.
        """
        if not isinstance(nova_detections, list):
            return []

        formatted_results = []
        for item in nova_detections:
            if not isinstance(item, dict) or not item:
                continue

            # Only the first key/value pair of each item is used.
            label, box = next(iter(item.items()))
            try:
                nx1, ny1, nx2, ny2 = box
                left = nx1 / 1000.0
                top = ny1 / 1000.0
                width = (nx2 - nx1) / 1000.0
                height = (ny2 - ny1) / 1000.0
            except (TypeError, ValueError):
                LOGGER.warning(f"跳过格式异常的检测结果: {item!r}")
                continue

            formatted_results.append({
                "x1": nx1,
                "x2": nx2,
                "y1": ny1,
                "y2": ny2,
                "Width": f"{width:.5f}",
                "Height": f"{height:.5f}",
                "Top": f"{top:.5f}",
                "Left": f"{left:.5f}",
                "class": label
            })
        return formatted_results

    @staticmethod
    def normalize_b64(b64_str: str) -> str:
        """Clean a base64 string and restore its ``=`` padding.

        All whitespace (including ``\\r`` and ``\\t``) is removed, a leading
        ``data:image/...;base64,`` prefix is stripped, and the string is padded
        with ``=`` up to a multiple of four characters.

        Args:
            b64_str: Base64 text, optionally in data-URI form.

        Returns:
            The cleaned base64 string, or ``""`` for empty input.
        """
        if not b64_str:
            return ""

        # Drop whitespace first so a prefix preceded by blanks still matches.
        b64_str = re.sub(r"\s+", "", b64_str)
        b64_str = re.sub(r"^data:image/[^;]+;base64,", "", b64_str)

        # Restore the base64 padding.
        padding = -len(b64_str) % 4
        if padding:
            b64_str += "=" * padding
        return b64_str

    def process_image_batch(self, base64_images: list, categories: list, uid=''):
        """Detect objects of the given categories in a batch of images.

        Every image is decoded, converted to RGB, re-encoded as WebP and sent
        to the model in a single Converse request. Images that cannot be
        decoded are left out of the request and reported with an empty
        detection list, so result keys always line up with input positions.

        Args:
            base64_images: Base64-encoded images (raw or data-URI form).
            categories: Category names to detect; they are lower-cased in the
                prompt.
            uid: Optional identifier prepended to log messages.

        Returns:
            A dict mapping ``"file_<i>"`` (``i`` is the 0-based position in
            ``base64_images``) to the list produced by
            ``format_and_convert_detections``. An empty dict is returned when
            no images are given, none can be processed, the model call fails,
            or the model output is not a JSON object.
        """
        if not base64_images:
            LOGGER.error(f"{uid}错误: 未提供图片数据。")
            return {}

        image_contents = []
        # Original positions of the images that were actually sent, in order.
        valid_indices = []
        for position, b64_image in enumerate(base64_images):
            idx = position + 1  # 1-based index used in log messages
            try:
                # Normalize the base64 text.
                b64_image = self.normalize_b64(b64_image)
                if not b64_image:
                    raise ValueError("空的base64字符串")

                # Decode to raw bytes.
                img_bytes = base64.b64decode(b64_image)

                # Re-encode through PIL as RGB WebP; close the source image
                # once the converted copy has been written.
                with Image.open(io.BytesIO(img_bytes)) as image:
                    rgb_image = image if image.mode == 'RGB' else image.convert('RGB')
                    buffer = io.BytesIO()
                    rgb_image.save(buffer, format="webp", quality=90)
                img_bytes = buffer.getvalue()

                # Bedrock receives the binary payload directly.
                image_contents.append({
                    "image": {"format": "webp", "source": {"bytes": img_bytes}}
                })
                valid_indices.append(position)
                LOGGER.info(f"{uid} 第{idx}张图处理成功, 格式=webp, 大小={len(img_bytes)}B")
            except Exception as e:
                # Skip the image instead of inserting a placeholder: a None
                # entry in the message content would invalidate the request.
                LOGGER.error(f"{uid} 第{idx}张图处理失败,已跳过: {repr(e)}")

        if not image_contents:
            LOGGER.error(f"{uid}错误: 所有图片均无法处理。")
            return {}

        category_str = ", ".join(f'"{str(cat).lower()}"' for cat in (categories or []))
        num_images = len(image_contents)

        # Prompt designed for multi-image batches.
        prompt = f"""
        You have been provided with {num_images} images. Analyze each image sequentially.
        For each image, detect bounding boxes of objects from the following categories: {category_str}.

        Your output MUST be a single, valid JSON object.
        The keys of this object should be "image_0", "image_1", ..., "image_{num_images - 1}", corresponding to the first, second, and subsequent images provided.
        The value for each key must be a list of detected objects for that specific image.
        If no objects are detected in an image, the value should be an empty list [].
        Use a 1000x1000 coordinate system for the bounding boxes.

        Example output format for {num_images} images:
        {{
          "image_0": [{{"person": [100, 150, 200, 350]}}, {{"car": [400, 500, 600, 700]}}],
          "image_1": [],
          "image_2": [{{"package": [300, 300, 400, 400]}}]
        }}
        """

        messages = [{"role": "user", "content": image_contents + [{"text": prompt}]}]

        try:
            response = self.bedrock.converse(
                modelId=MODEL_ID,
                messages=messages,
                inferenceConfig={"temperature": 0.0, "maxTokens": 4096, "topP": 1.0},
            )
            model_output = response["output"]["message"]["content"][0]["text"]
            LOGGER.info(f"\n--- {uid}模型对整个批次的原始输出 ---\n{model_output}")

            # Parse the JSON object holding the results for every image.
            batch_results = self.safe_json_load(model_output)
            if not batch_results or not isinstance(batch_results, dict):
                LOGGER.error(f"{uid}模型未返回预期的字典格式结果。")
                return {}

            # Map batch results back to input positions. Images that were
            # skipped keep an empty list; "image_<k>" refers to the k-th image
            # actually sent, which is input number valid_indices[k].
            final_output_dict = {
                f"file_{i}": [] for i in range(len(base64_images))
            }
            for sent_position, original_index in enumerate(valid_indices):
                nova_detections = batch_results.get(f"image_{sent_position}", [])
                final_output_dict[f"file_{original_index}"] = (
                    self.format_and_convert_detections(nova_detections)
                )

            return final_output_dict

        except Exception as e:
            LOGGER.error(f"{uid}调用Bedrock模型或处理过程中发生错误: {repr(e)}")
            return {}