# aduc_framework/engineers/composer.py
#
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
#
# Version 2.4.2 (Corrected Translator Call)
"""Composer: runs an ordered plan of pre-production tasks through the Llama manager.

Each task is rendered from a text template found in ``prompts/task_templates``,
translated by the prompt engine, sent to the multimodal manager, and its answer
is folded into a shared "dna" dictionary that later tasks can read.
"""

import json
import logging
import re
from pathlib import Path
from typing import Any, Callable, Dict, Generator, List, Optional

from PIL import Image

from .prompt_engine import prompt_engine_singleton
from ..managers.llama_multimodal_manager import llama_multimodal_manager_singleton

logger = logging.getLogger(__name__)

# Matches a fenced ```json { ... } ``` block and captures the object text.
_FENCED_JSON_RE = re.compile(r'```json\s*(\{.*?\})\s*```', re.DOTALL)

# Placeholders that task templates may contain, written as "{name}".
_PLACEHOLDER_KEYS = (
    "global_prompt",
    "num_scenes",
    "max_duration_per_act",
    "asset_catalog_str",
    "scenes_str",
    "story_summary",
    "output_json_schema",
)
_PLACEHOLDER_RE = re.compile(r"\{(" + "|".join(_PLACEHOLDER_KEYS) + r")\}")


def robust_json_parser(raw_text: str) -> dict:
    """Extract and parse a JSON document from raw model output.

    Three strategies are tried in order, each one only if the previous one
    did not yield valid JSON:

    1. the content of an explicit fenced ``json`` code block;
    2. the span from the first ``{`` to the last ``}``;
    3. the whole text as-is.

    Args:
        raw_text: The unprocessed text returned by the model.

    Returns:
        The parsed JSON value. Strategies 1 and 2 always produce an object
        (``dict``); strategy 3 returns whatever JSON value the text encodes.

    Raises:
        json.JSONDecodeError: If none of the strategies produce valid JSON.
    """
    logger.debug(f"COMPOSER(JSON_PARSER): Tentando parsear JSON do texto bruto (primeiros 500 chars):\n---\n{raw_text[:500]}\n---")

    fenced = _FENCED_JSON_RE.search(raw_text)
    if fenced:
        logger.debug("COMPOSER(JSON_PARSER): Bloco de código JSON explícito encontrado, parseando...")
        try:
            return json.loads(fenced.group(1))
        except json.JSONDecodeError:
            # A malformed fenced block should not prevent the looser
            # strategies below from being attempted.
            pass

    start_index = raw_text.find('{')
    end_index = raw_text.rfind('}')
    if start_index != -1 and end_index > start_index:
        logger.debug("COMPOSER(JSON_PARSER): JSON encontrado por delimitadores '{...}', parseando...")
        try:
            return json.loads(raw_text[start_index:end_index + 1])
        except json.JSONDecodeError:
            pass

    logger.debug("COMPOSER(JSON_PARSER): Nenhum delimitador ou bloco de código encontrado, tentando parsear o texto inteiro...")
    return json.loads(raw_text)


def _render_template(template: str, values: Dict[str, Any]) -> str:
    """Fill every known ``{placeholder}`` in *template* in a single pass.

    A single pass guarantees that text inserted for one placeholder (for
    example a user prompt that happens to contain ``{scenes_str}``) is never
    itself scanned for further placeholders. Missing keys render as "".
    """
    return _PLACEHOLDER_RE.sub(
        lambda found: str(values.get(found.group(1), "")),
        template,
    )


class Composer:
    """Executes pre-production task plans and accumulates their results."""

    def __init__(self, model_map_name: str = "llama_3_2_vision"):
        """Load the task templates and define the expected JSON output shapes.

        Args:
            model_map_name: Accepted for interface compatibility; it is not
                read anywhere in this class.

        Raises:
            FileNotFoundError: If the task template directory does not exist.
        """
        self.task_templates = self._load_task_templates()
        # Example-shaped schemas, keyed by task id. They are serialized into
        # the prompt, and a task is treated as a JSON task when it has an
        # entry here.
        self.json_schemas = {
            "PREPROD_01_CATALOG_ASSETS": {
                "scenarios": [{"id": "string (ex: scenario_01)", "description": "string", "tags": ["exemplo_tag"], "best_photo_id": 0}],
                "characters": [{"name": "string", "description": "string", "tags": ["exemplo_tag"], "best_photo_id": 0}],
                "objects": [{"id": "string (ex: object_01)", "description": "string", "tags": ["exemplo_tag"], "best_photo_id": 0}],
            },
            "PREPROD_02_SCORE_ASSETS": {
                "scenarios": [{"id": "string", "description": "string", "tags": [], "best_photo_id": 0, "relevance_score": 5}],
                "characters": [{"name": "string", "description": "string", "tags": [], "best_photo_id": 0, "relevance_score": 10}],
                "objects": [{"id": "string", "description": "string", "tags": [], "best_photo_id": 0, "relevance_score": 3}],
            },
            "PREPROD_04_FRAGMENT_SCENES": {
                "scenes": [
                    {
                        "scene_id": 1,
                        "scene_summary": "string",
                        "required_media": {
                            "scenario": [],
                            "characters": [],
                            "objects": [],
                            "generate_new_photo": [],
                        },
                    }
                ]
            },
            "PREPROD_05_REVIEW_PLAN": {
                "changes_needed": False,
                "justification": "string",
                "updated_scenes": [],
            },
            "PREPROD_06_FRAGMENT_ACTS": {
                "scenes_with_acts": [
                    {
                        "scene_id": 1,
                        "scene_summary": "string",
                        "required_media": {},
                        "acts": [{"act_id": 1, "act_summary": "string"}],
                    }
                ]
            },
        }
        logger.info(f"Composer inicializado com {len(self.task_templates)} templates de tarefa e {len(self.json_schemas)} schemas JSON.")

    def _load_task_templates(self) -> Dict[str, str]:
        """Read every ``*.txt`` file in ``prompts/task_templates``.

        Returns:
            A mapping from file stem (used as the task id) to file content.

        Raises:
            FileNotFoundError: If the template directory does not exist.
        """
        template_dir = Path(__file__).resolve().parent.parent / "prompts" / "task_templates"
        if not template_dir.is_dir():
            raise FileNotFoundError(f"Diretório de templates de tarefa não encontrado: {template_dir}")
        return {
            task_file.stem: task_file.read_text(encoding='utf-8')
            for task_file in template_dir.glob("*.txt")
        }

    def _talk_to_llama(self, generic_prompt: str, images: Optional[List[Image.Image]] = None, expected_format="text") -> Any:
        """Translate a generic prompt, send it to the model and decode the reply.

        Args:
            generic_prompt: The model-agnostic prompt text.
            images: Optional images to send along with the prompt.
            expected_format: ``"json"`` to parse the reply with
                ``robust_json_parser``; any other value returns the raw reply.

        Returns:
            The parsed JSON value when ``expected_format == "json"``,
            otherwise the raw response from the manager.

        Raises:
            ValueError: If JSON was expected and the reply could not be
                parsed; the original parsing error is attached as the cause.
        """
        # Pass the 'has_image' flag to the translator.
        final_model_prompt = prompt_engine_singleton.translate(
            generic_prompt_content=generic_prompt,
            has_image=bool(images),
        )

        logger.info(f"COMPOSER: Enviando tarefa para o Llama (Esperando {expected_format}).")
        response_raw = llama_multimodal_manager_singleton.process_turn(
            prompt_text=final_model_prompt,
            image_list=images,
        )

        if expected_format != "json":
            return response_raw

        try:
            return robust_json_parser(response_raw)
        except (json.JSONDecodeError, ValueError) as e:
            logger.error(f"COMPOSER: Falha ao parsear JSON. Resposta bruta: {response_raw}", exc_info=True)
            raise ValueError(f"O LLM retornou um formato JSON inválido. Erro: {e}") from e

    @staticmethod
    def _apply_task_result(dna: Dict[str, Any], task_id: str, response: Any) -> None:
        """Store a task's response in the slot of *dna* that the task owns.

        Task ids without a known slot leave *dna* untouched.
        """
        if task_id in ("PREPROD_01_CATALOG_ASSETS", "PREPROD_02_SCORE_ASSETS"):
            dna["asset_catalog"] = response
        elif task_id == "PREPROD_03_CREATE_SUMMARY":
            dna["story_summary"] = response
        elif task_id == "PREPROD_04_FRAGMENT_SCENES":
            dna["scenes"] = response.get("scenes", [])
        elif task_id == "PREPROD_05_REVIEW_PLAN":
            # The reviewed scenes replace the plan only when the model asks for changes.
            if response.get("changes_needed", False):
                dna["scenes"] = response.get("updated_scenes", dna["scenes"])
        elif task_id == "PREPROD_06_FRAGMENT_ACTS":
            dna["scenes"] = response.get("scenes_with_acts", [])

    def execute_plan(
        self,
        execution_plan: List[Dict[str, Any]],
        initial_data: Dict[str, Any],
        callback: Optional[Callable] = None
    ) -> Generator[Dict[str, Any], None, Dict[str, Any]]:
        """Run every task of *execution_plan* in order, yielding progress events.

        Args:
            execution_plan: Task dicts; each is read for ``task_id``,
                ``description`` and (optionally) ``inputs``.
            initial_data: Must provide ``global_prompt`` and
                ``user_media_paths``.
            callback: Optional progress hook, called as
                ``callback(fraction, desc=...)`` before and after each task.

        Yields:
            ``{"status": "progress", "message", "dna_snapshot"}`` before and
            after each task, then a final ``{"status": "complete",
            "message", "dna"}``. ``dna_snapshot`` is the live ``dna`` dict,
            not a copy.

        Returns:
            The final ``dna`` dict (available as ``StopIteration.value``).

        Raises:
            KeyError: If a required key is missing from *initial_data* or a task.
            ValueError: If a task has no template, or the model returns
                invalid JSON for a JSON task.
        """
        dna = {
            "global_prompt": initial_data["global_prompt"],
            "initial_media_paths": initial_data["user_media_paths"],
            "asset_catalog": {},
            "story_summary": "",
            "scenes": []
        }
        total_tasks = len(execution_plan)

        user_media: List[Image.Image] = []
        try:
            # Opened one by one so that a failure half-way still lets the
            # ``finally`` clause close the images that were already opened.
            for media_path in initial_data["user_media_paths"]:
                user_media.append(Image.open(media_path))

            for i, task in enumerate(execution_plan):
                task_id = task['task_id']

                if callback:
                    progress = (i + 0.5) / total_tasks
                    callback(progress, desc=f"({i+1}/{total_tasks}) {task['description']}")
                yield {"status": "progress", "message": task['description'], "dna_snapshot": dna}

                logger.info(f"--- COMPOSER: INICIANDO TAREFA '{task_id}' ---")

                generic_template = self.task_templates.get(task_id)
                if not generic_template:
                    raise ValueError(f"Template para a tarefa '{task_id}' não foi encontrado.")

                # dna values take precedence over task inputs of the same name;
                # a task without inputs is treated as having none.
                template_data = {**(task.get('inputs') or {}), **dna}
                template_data.setdefault('asset_catalog_str', json.dumps(dna.get("asset_catalog"), indent=2))
                template_data.setdefault('scenes_str', json.dumps(dna.get("scenes"), indent=2))
                template_data['output_json_schema'] = json.dumps(self.json_schemas.get(task_id, {}), indent=2)

                try:
                    prompt_content = _render_template(generic_template, template_data)
                    logger.debug(f"Prompt genérico final (após substituições):\n---\n{prompt_content}\n---")

                    # Only the cataloguing task receives the user's images.
                    needs_images = task_id == "PREPROD_01_CATALOG_ASSETS"
                    expected_format = "json" if self.json_schemas.get(task_id) else "text"

                    response = self._talk_to_llama(
                        prompt_content,
                        user_media if needs_images else None,
                        expected_format,
                    )
                    self._apply_task_result(dna, task_id, response)

                    logger.info(f"--- COMPOSER: TAREFA '{task_id}' CONCLUÍDA ---")
                except Exception:
                    logger.error(f"COMPOSER: ERRO CRÍTICO NA TAREFA '{task_id}'!", exc_info=True)
                    raise

                if callback:
                    progress = (i + 1) / total_tasks
                    callback(progress, desc=f"({i+1}/{total_tasks}) Tarefa Concluída!")
                yield {"status": "progress", "message": f"Tarefa '{task_id}' concluída.", "dna_snapshot": dna}

            final_message = "Execução do Composer concluída."
            logger.info(final_message)
            yield {"status": "complete", "message": final_message, "dna": dna}
            return dna
        finally:
            # Runs on normal completion, on error, and when the generator is
            # closed early, so image file handles are always released.
            for image in user_media:
                image.close()


composer_singleton = Composer()