# aduc_framework/orchestrator.py # # Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos # # Versão 7.0.0 (Strategic Maestro with Planner-Composer Delegation) # # O Orquestrador atua como o Maestro (Γ). Ele delega a criação do plano # de pré-produção ao Planner2D e a execução ao Composer. Uma vez que o DNA # da produção é gerado, ele o utiliza para dirigir as fases de produção # (Deformes4DEngine) e pós-produção. import logging from typing import List, Dict, Any, Tuple, Callable, Optional, Generator from PIL import Image, ImageOps import os import shutil import time import gc import torch import subprocess from pathlib import Path from .director import AducDirector from .types import GenerationState, PreProductionParams, ProductionParams # Importa a nova hierarquia de planejamento e execução from .engineers.planner_2d import planner_2d_singleton from .engineers.composer import composer_singleton # Importa os engenheiros e especialistas das fases seguintes from .engineers.deformes4D import Deformes4DEngine # Será o Composer4D no futuro from .managers import ( latent_enhancer_specialist_singleton, seedvr_manager_singleton, mmaudio_manager_singleton, vae_manager_singleton ) from .tools.video_encode_tool import video_encode_tool_singleton logger = logging.getLogger(__name__) ProgressCallback = Optional[Callable[[float, str], None]] class AducOrchestrator: """ Implementa o Maestro (Γ), a camada de orquestração central. Coordena os especialistas, gerencia o estado da produção através do Diretor, e delega as fases de planejamento e execução. """ def __init__(self, workspace_dir: str): self.director = AducDirector(workspace_dir) self.editor = Deformes4DEngine() self.editor.initialize(workspace_dir) self.device = 'cuda' if torch.cuda.is_available() else 'cpu' logger.info("ADUC Maestro (Framework Core) pronto para reger a orquestra de especialistas.") def get_current_state(self) -> GenerationState: """Retorna o estado de geração atual, gerenciado pelo AducDirector.""" return self.director.get_full_state() def process_image_for_story(self, image_path: str, size: int, filename: str) -> str: """Processa e padroniza uma imagem de referência para o formato quadrado.""" img = Image.open(image_path).convert("RGB") img_square = ImageOps.fit(img, (size, size), Image.Resampling.LANCZOS) processed_path = os.path.join(self.director.workspace_dir, filename) img_square.save(processed_path) logger.info(f"Imagem de referência processada e salva em: {processed_path}") return processed_path def task_pre_production(self, params: PreProductionParams, progress_callback: Optional[Callable] = None) -> Generator[Dict[str, Any], None, None]: """ Orquestra a pré-produção: Planner2D cria o plano, Composer o executa. Retransmite as atualizações de progresso para a camada superior. """ logger.info("Maestro: Iniciando Fase de Pré-Produção.") self.director.update_parameters("pre_producao", params) # ETAPA 1.1: PLANEJAMENTO ESTRATÉGICO execution_plan = [] try: plan_generator = planner_2d_singleton.generate_execution_plan( global_prompt=params.prompt, num_scenes=params.num_scenes, max_duration_per_act=params.duration_per_fragment, callback=progress_callback ) for update in plan_generator: yield update # Repassa as atualizações do Planner para a UI if update.get("status") == "planning_complete": execution_plan = update.get("plan", []) except Exception as e: logger.error(f"Maestro: Falha crítica na fase de planejamento. Erro: {e}", exc_info=True) yield {"status": "error", "message": f"Erro no Planner2D: {e}"} return if not execution_plan: error_message = "Maestro: Plano de execução retornado pelo Planner2D está vazio." logger.error(error_message) yield {"status": "error", "message": error_message} return # ETAPA 1.2: EXECUÇÃO DO PLANO initial_data = { "global_prompt": params.prompt, "user_media_paths": params.ref_paths, } final_dna = {} try: execution_generator = composer_singleton.execute_plan(execution_plan, initial_data, callback=progress_callback) for update in execution_generator: yield update # Repassa as atualizações do Composer para a UI if update.get('status') == 'complete': final_dna = update.get('dna') except Exception as e: logger.error(f"Maestro: Falha crítica na fase de execução do Composer. Erro: {e}", exc_info=True) yield {"status": "error", "message": f"Erro no Composer: {e}"} return # ETAPA 1.3: FINALIZAÇÃO E ATUALIZAÇÃO DO ESTADO GLOBAL # A lógica para atualizar o AducDirector com o DNA final iria aqui. # self.director.update_state_from_dna(final_dna) logger.info("Maestro: Pré-Produção concluída. DNA Bruto gerado.") # O Composer3D usaria o 'final_dna' para gerar os keyframes. # Por enquanto, retornamos um placeholder para a UI. yield { "status": "pre_production_complete", "progress": 1.0, "message": "Pré-produção e planejamento concluídos!", "dna_snapshot": final_dna, "keyframe_gallery": [path for path in params.ref_paths], } def task_produce_original_movie(self, params: ProductionParams, progress_callback: Optional[Callable] = None) -> Tuple[str, List[str], GenerationState]: """Orquestra a geração do vídeo principal a partir dos keyframes via Deformes4DEngine.""" logger.info("Maestro: Iniciando tarefa de Produção do Filme Original.") self.director.update_parameters("producao", params) result_data = self.editor.generate_original_movie( full_generation_state=self.director.get_full_state_as_dict(), progress_callback=progress_callback ) self.director.update_video_state(result_data["video_data"]) final_video_path = result_data["final_path"] latent_paths = result_data["latent_paths"] final_state = self.director.get_full_state() logger.info("Maestro: Tarefa de Produção do Filme Original concluída.") return final_video_path, latent_paths, final_state def task_run_latent_upscaler(self, latent_paths: List[str], chunk_size: int, progress_callback: Optional[Callable] = None) -> Generator[Dict[str, Any], None, None]: if not latent_paths: raise ValueError("Nenhum caminho de latente fornecido para o upscale.") logger.info("--- ORQUESTRADOR: Tarefa de Upscaling de Latentes ---") run_timestamp = int(time.time()) temp_dir = os.path.join(self.director.workspace_dir, f"temp_upscaled_clips_{run_timestamp}") os.makedirs(temp_dir, exist_ok=True) final_upscaled_clip_paths = [] num_chunks = -(-len(latent_paths) // chunk_size) for i in range(num_chunks): chunk_paths = latent_paths[i * chunk_size : (i + 1) * chunk_size] if progress_callback: progress_callback(i / num_chunks, f"Upscalando & Decodificando Lote {i+1}/{num_chunks}") tensors_in_chunk = [torch.load(p, map_location=self.device) for p in chunk_paths] tensors_to_concat = [t[:, :, :-1, :, :] if j < len(tensors_in_chunk) - 1 else t for j, t in enumerate(tensors_in_chunk)] sub_group_latent = torch.cat(tensors_to_concat, dim=2) del tensors_in_chunk, tensors_to_concat gc.collect() torch.cuda.empty_cache() upscaled_latent_chunk = latent_enhancer_specialist_singleton.upscale(sub_group_latent) pixel_tensor = vae_manager_singleton.decode(upscaled_latent_chunk) current_clip_path = os.path.join(temp_dir, f"upscaled_clip_{i:04d}.mp4") self.editor._save_video_from_tensor(pixel_tensor, current_clip_path, fps=24) final_upscaled_clip_paths.append(current_clip_path) del sub_group_latent, upscaled_latent_chunk, pixel_tensor gc.collect() torch.cuda.empty_cache() yield {"progress": (i + 1) / num_chunks} final_video_path = os.path.join(self.director.workspace_dir, f"upscaled_movie_{run_timestamp}.mp4") video_encode_tool_singleton.concatenate_videos(final_upscaled_clip_paths, final_video_path, self.director.workspace_dir) shutil.rmtree(temp_dir) logger.info(f"Upscaling de latentes completo! Vídeo final em: {final_video_path}") yield {"final_path": final_video_path} def task_run_hd_mastering(self, source_video_path: str, steps: int, prompt: str, progress_callback: Optional[Callable] = None) -> Generator[Dict[str, Any], None, None]: logger.info(f"--- ORQUESTRADOR: Tarefa de Masterização HD com SeedVR ---") run_timestamp = int(time.time()) output_path = os.path.join(self.director.workspace_dir, f"hd_mastered_movie_{run_timestamp}.mp4") # O yield from delega a geração de updates para o manager yield from seedvr_manager_singleton.process_video( input_video_path=source_video_path, output_video_path=output_path, prompt=prompt, steps=steps, progress_callback=progress_callback ) def task_run_audio_generation(self, source_video_path: str, audio_prompt: str, progress_callback: Optional[Callable] = None) -> Generator[Dict[str, Any], None, None]: logger.info(f"--- ORQUESTRADOR: Tarefa de Geração de Áudio ---") if progress_callback: progress_callback(0.1, "Preparando para geração de áudio...") run_timestamp = int(time.time()) source_name = Path(source_video_path).stem output_path = os.path.join(self.director.workspace_dir, f"{source_name}_with_audio_{run_timestamp}.mp4") try: result = subprocess.run( ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", source_video_path], capture_output=True, text=True, check=True ) duration = float(result.stdout.strip()) except Exception as e: logger.error(f"Não foi possível obter a duração do vídeo '{source_video_path}': {e}", exc_info=True) yield {"error": "Falha ao obter duração do vídeo."} return if progress_callback: progress_callback(0.5, "Gerando trilha de áudio...") final_path = mmaudio_manager_singleton.generate_audio_for_video( video_path=source_video_path, prompt=audio_prompt, duration_seconds=duration, output_path_override=output_path ) logger.info(f"Geração de áudio completa! Vídeo com áudio em: {final_path}") if progress_callback: progress_callback(1.0, "Geração de áudio completa!") yield {"final_path": final_path}