Sam
Update aduc_framework/orchestrator.py
bddbfac verified
Raw
History Blame
10.5 kB
# aduc_framework/orchestrator.py
#
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
#
# Versão 6.0.0 (Streaming Conductor)
#
# Esta versão final do orquestrador está alinhada com a arquitetura do
# Diretor Autônomo e suporta a emissão de atualizações em tempo real (yield)
# para interfaces de usuário interativas.
import logging
from typing import List, Dict, Any, Tuple, Callable, Optional, Generator
from PIL import Image, ImageOps
import os
import subprocess
import shutil
from pathlib import Path
import time
import gc
import torch
# --- Componentes Internos do Framework ---
from .director import AducDirector
from .types import GenerationState, PreProductionParams, ProductionParams
# Engenheiros de alto nível que definem a lógica do fluxo
from .engineers import deformes2d_thinker_singleton, deformes3d_engine_singleton, Deformes4DEngine
# Managers (Pools) de especialistas que executam tarefas de pós-produção
from .managers.latent_enhancer_manager import latent_enhancer_specialist_singleton
from .managers.seedvr_manager import seedvr_manager_singleton
from .managers.mmaudio_manager import mmaudio_manager_singleton
from .managers.vae_manager import vae_manager_singleton
# Ferramentas de utilidade
from .tools.video_encode_tool import video_encode_tool_singleton
logger = logging.getLogger(__name__)
ProgressCallback = Optional[Callable[[float, str], None]]
class AducOrchestrator:
"""
Implementa o Maestro (Γ), a camada de orquestração central do Aduc Framework.
Ele recebe solicitações, atualiza o estado de geração, delega tarefas para os
engenheiros e seus pools de especialistas, e retorna o estado atualizado.
"""
def __init__(self, workspace_dir: str):
self.director = AducDirector(workspace_dir)
self.editor = Deformes4DEngine()
self.editor.initialize(workspace_dir)
self.painter = deformes3d_engine_singleton
self.painter.initialize(workspace_dir)
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
logger.info("ADUC Maestro (Framework Core) pronto para reger a orquestra de especialistas.")
def get_current_state(self) -> GenerationState:
"""Retorna o estado de geração atual, útil para APIs."""
return self.director.get_full_state()
def process_image_for_story(self, image_path: str, size: int, filename: str) -> str:
"""Processa e padroniza uma imagem de referência para o formato quadrado."""
img = Image.open(image_path).convert("RGB")
img_square = ImageOps.fit(img, (size, size), Image.Resampling.LANCZOS)
processed_path = os.path.join(self.director.workspace_dir, filename)
img_square.save(processed_path)
logger.info(f"Imagem de referência processada e salva em: {processed_path}")
return processed_path
# --- ETAPA 1: PRÉ-PRODUÇÃO (STREAMING) ---
def task_pre_production(self, params: PreProductionParams, progress_callback: ProgressCallback = None) -> Generator[Dict[str, Any], None, None]:
"""
Orquestra a pré-produção, agora como um gerador que emite
atualizações de estado para a UI em tempo real.
"""
logger.info("Maestro: Iniciando tarefa de Pré-Produção.")
self.director.update_parameters("pre_producao", params)
if progress_callback: progress_callback(0.1, "Gerando roteiro inicial...")
storyboard_list = deformes2d_thinker_singleton.generate_storyboard(
prompt=params.prompt, num_keyframes=params.num_keyframes, ref_image_paths=params.ref_paths
)
self.director.update_pre_production_state(params.prompt, params.ref_paths, storyboard_list)
yield {
"storyboard": storyboard_list,
"updated_state": self.director.get_full_state().model_dump()
}
if progress_callback: progress_callback(0.2, "Entregando produção ao Diretor Autônomo...")
final_keyframes_data = []
for keyframes_update in self.painter.generate_keyframes(
generation_state=self.director.get_full_state_as_dict(),
progress_callback=progress_callback
):
self.director.update_keyframes_state(keyframes_update)
final_keyframes_data = keyframes_update
yield {
"final_keyframes": [kf["caminho_pixel"] for kf in final_keyframes_data],
"updated_state": self.director.get_full_state().model_dump()
}
logger.info("Maestro: Tarefa de Pré-Produção concluída.")
# --- ETAPA 2: PRODUÇÃO ---
def task_produce_original_movie(self, params: ProductionParams, progress_callback: ProgressCallback = None) -> Tuple[str, List[str], GenerationState]:
"""Orquestra a geração do vídeo principal a partir dos keyframes."""
logger.info("Maestro: Iniciando tarefa de Produção do Filme Original.")
self.director.update_parameters("producao", params)
result_data = self.editor.generate_original_movie(
full_generation_state=self.director.get_full_state_as_dict(),
progress_callback=progress_callback
)
self.director.update_video_state(result_data["video_data"])
final_video_path = result_data["final_path"]
latent_paths = result_data["latent_paths"]
final_state = self.director.get_full_state()
logger.info("Maestro: Tarefa de Produção do Filme Original concluída.")
return final_video_path, latent_paths, final_state
# --- ETAPA 3: PÓS-PRODUÇÃO (CADEIA DE EFEITOS) ---
def task_run_latent_upscaler(self, latent_paths: List[str], chunk_size: int, progress_callback: ProgressCallback = None) -> Generator[Dict[str, Any], None, None]:
"""Aplica upscale 2x nos latentes e os decodifica para um novo vídeo."""
if not latent_paths: raise ValueError("Nenhum caminho de latente fornecido para o upscale.")
logger.info("--- ORQUESTRADOR: Tarefa de Upscaling de Latentes ---")
run_timestamp = int(time.time())
temp_dir = os.path.join(self.director.workspace_dir, f"temp_upscaled_clips_{run_timestamp}")
os.makedirs(temp_dir, exist_ok=True)
final_upscaled_clip_paths = []
num_chunks = -(-len(latent_paths) // chunk_size)
for i in range(num_chunks):
chunk_paths = latent_paths[i * chunk_size:(i + 1) * chunk_size]
if progress_callback: progress_callback(i / num_chunks, f"Upscalando & Decodificando Lote {i+1}/{num_chunks}")
tensors_in_chunk = [torch.load(p, map_location=self.device) for p in chunk_paths]
sub_group_latent = torch.cat(tensors_in_chunk, dim=2)
upscaled_latent_chunk = latent_enhancer_specialist_singleton.upscale(sub_group_latent)
pixel_tensor = vae_manager_singleton.decode(upscaled_latent_chunk)
current_clip_path = os.path.join(temp_dir, f"upscaled_clip_{i:04d}.mp4")
# Usando o save_video do Deformes4D para consistência
self.editor._save_video_from_tensor(pixel_tensor, current_clip_path, fps=24)
final_upscaled_clip_paths.append(current_clip_path)
del tensors_in_chunk, sub_group_latent, upscaled_latent_chunk, pixel_tensor
gc.collect(); torch.cuda.empty_cache()
yield {"progress": (i + 1) / num_chunks}
final_video_path = os.path.join(self.director.workspace_dir, f"upscaled_movie_{run_timestamp}.mp4")
video_encode_tool_singleton.concatenate_videos(final_upscaled_clip_paths, final_video_path, self.director.workspace_dir)
shutil.rmtree(temp_dir)
logger.info(f"Upscaling de latentes completo! Vídeo final em: {final_video_path}")
yield {"final_path": final_video_path}
def task_run_hd_mastering(self, source_video_path: str, steps: int, prompt: str, progress_callback: ProgressCallback = None) -> Generator[Dict[str, Any], None, None]:
"""Aplica masterização em HD usando o pool de GPUs do SeedVR."""
logger.info(f"--- ORQUESTRADOR: Tarefa de Masterização HD com SeedVR ---")
run_timestamp = int(time.time())
output_path = os.path.join(self.director.workspace_dir, f"hd_mastered_movie_{run_timestamp}.mp4")
final_path = seedvr_manager_singleton.process_video(
input_video_path=source_video_path,
output_video_path=output_path,
prompt=prompt,
steps=steps
)
logger.info(f"Masterização HD completa! Vídeo final em: {final_path}")
yield {"final_path": final_path}
def task_run_audio_generation(self, source_video_path: str, audio_prompt: str, progress_callback: ProgressCallback = None) -> Generator[Dict[str, Any], None, None]:
"""Gera e adiciona áudio ao vídeo usando o pool de GPUs do MMAudio."""
logger.info(f"--- ORQUESTRADOR: Tarefa de Geração de Áudio ---")
if progress_callback: progress_callback(0.1, "Preparando para geração de áudio...")
run_timestamp = int(time.time())
source_name = Path(source_video_path).stem
output_path = os.path.join(self.director.workspace_dir, f"{source_name}_with_audio_{run_timestamp}.mp4")
try:
result = subprocess.run(
["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", source_video_path],
capture_output=True, text=True, check=True
)
duration = float(result.stdout.strip())
except Exception as e:
logger.error(f"Não foi possível obter a duração do vídeo '{source_video_path}': {e}", exc_info=True)
yield {"error": "Falha ao obter duração do vídeo."}
return
if progress_callback: progress_callback(0.5, "Gerando trilha de áudio...")
final_path = mmaudio_manager_singleton.generate_audio_for_video(
video_path=source_video_path,
prompt=audio_prompt,
duration_seconds=duration,
output_path_override=output_path
)
logger.info(f"Geração de áudio completa! Vídeo com áudio em: {final_path}")
if progress_callback: progress_callback(1.0, "Geração de áudio completa!")
yield {"final_path": final_path}