Sam
Update aduc_framework/orchestrator.py
0f27c29 verified
Raw
History Blame
11.8 kB
# aduc_framework/orchestrator.py
#
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
#
# Versão 8.0.0 (Strategic Maestro with Full Callback Chaining)
#
# O Orquestrador atua como o Maestro (Γ). Ele delega a criação do plano
# de pré-produção ao Planner2D e a execução ao Composer. Uma vez que o DNA
# da produção é gerado, ele o utiliza para dirigir as fases de produção
# e pós-produção, retransmitindo todos os eventos de progresso para a UI.
import logging
import os
import shutil
import time
import gc
import subprocess
from pathlib import Path
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple
import torch
from PIL import Image, ImageOps
from .director import AducDirector
from .engineers.composer import composer_singleton
from .engineers.deformes4D import Deformes4DEngine
from .engineers.planner_2d import planner_2d_singleton
from .managers import (
latent_enhancer_specialist_singleton,
mmaudio_manager_singleton,
seedvr_manager_singleton,
vae_manager_singleton,
)
from .tools.video_encode_tool import video_encode_tool_singleton
from .types import GenerationState, PreProductionParams, ProductionParams
logger = logging.getLogger(__name__)
ProgressCallback = Optional[Callable[[float, str], None]]
class AducOrchestrator:
"""
Implementa o Maestro (Γ), a camada de orquestração central.
Coordena os especialistas, gerencia o estado da produção através do Diretor,
e delega as fases de planejamento e execução.
"""
def __init__(self, workspace_dir: str):
self.director = AducDirector(workspace_dir)
self.editor = Deformes4DEngine()
self.editor.initialize(workspace_dir)
self.device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(
"ADUC Maestro (Framework Core) pronto para reger a orquestra de especialistas."
)
def get_current_state(self) -> GenerationState:
"""Retorna o estado de geração atual, gerenciado pelo AducDirector."""
return self.director.get_full_state()
def process_image_for_story(
self, image_path: str, size: int, filename: str
) -> str:
"""Processa e padroniza uma imagem de referência para o formato quadrado."""
img = Image.open(image_path).convert("RGB")
img_square = ImageOps.fit(img, (size, size), Image.Resampling.LANCZOS)
processed_path = os.path.join(self.director.workspace_dir, filename)
img_square.save(processed_path)
logger.info(f"Imagem de referência processada e salva em: {processed_path}")
return processed_path
def task_pre_production(
self, params: PreProductionParams, progress_callback: Optional[Callable] = None
) -> Generator[Dict[str, Any], None, None]:
"""
Orquestra a pré-produção: Planner2D cria o plano, Composer o executa.
Retransmite as atualizações de progresso para a camada superior.
"""
logger.info("Maestro: Iniciando Fase de Pré-Produção.")
self.director.update_parameters("pre_producao", params)
# ETAPA 1.1: PLANEJAMENTO ESTRATÉGICO
execution_plan = []
try:
plan_generator = planner_2d_singleton.generate_execution_plan(
global_prompt=params.prompt,
num_scenes=params.num_scenes,
max_duration_per_act=params.duration_per_fragment,
callback=progress_callback,
)
for update in plan_generator:
yield update # Repassa as atualizações do Planner para a UI
if update.get("status") == "planning_complete":
execution_plan = update.get("plan", [])
except Exception as e:
logger.error(
f"Maestro: Falha crítica na fase de planejamento. Erro: {e}",
exc_info=True,
)
yield {"status": "error", "message": f"Erro no Planner2D: {e}"}
return
if not execution_plan:
error_message = "Maestro: Plano de execução retornado pelo Planner2D está vazio."
logger.error(error_message)
yield {"status": "error", "message": error_message}
return
# ETAPA 1.2: EXECUÇÃO DO PLANO
initial_data = {
"global_prompt": params.prompt,
"user_media_paths": params.ref_paths,
}
final_dna = {}
try:
execution_generator = composer_singleton.execute_plan(
execution_plan, initial_data, callback=progress_callback
)
for update in execution_generator:
yield update # Repassa as atualizações do Composer para a UI
if update.get("status") == "complete":
final_dna = update.get("dna")
except Exception as e:
logger.error(
f"Maestro: Falha crítica na fase de execução do Composer. Erro: {e}",
exc_info=True,
)
yield {"status": "error", "message": f"Erro no Composer: {e}"}
return
# ETAPA 1.3: FINALIZAÇÃO E ATUALIZAÇÃO DO ESTADO GLOBAL
# A lógica para atualizar o AducDirector com o DNA final iria aqui.
# self.director.update_state_from_dna(final_dna)
logger.info("Maestro: Pré-Produção concluída. DNA Bruto gerado.")
yield {
"status": "pre_production_complete",
"progress": 1.0,
"message": "Pré-produção e planejamento concluídos!",
"dna_snapshot": final_dna,
"keyframe_gallery": [path for path in params.ref_paths],
}
def task_produce_original_movie(
self, params: ProductionParams, progress_callback: Optional[Callable] = None
) -> Tuple[str, List[str], GenerationState]:
"""Orquestra a geração do vídeo principal a partir do DNA e keyframes."""
logger.info("Maestro: Iniciando tarefa de Produção do Filme Original.")
self.director.update_parameters("producao", params)
# A implementação futura do Composer3D e 4D lerá o DNA do self.director
result_data = self.editor.generate_original_movie(
full_generation_state=self.director.get_full_state_as_dict(),
progress_callback=progress_callback,
)
self.director.update_video_state(result_data["video_data"])
final_video_path = result_data["final_path"]
latent_paths = result_data["latent_paths"]
final_state = self.director.get_full_state()
logger.info("Maestro: Tarefa de Produção do Filme Original concluída.")
return final_video_path, latent_paths, final_state
def task_run_latent_upscaler(
self,
latent_paths: List[str],
chunk_size: int,
progress_callback: Optional[Callable] = None,
) -> Generator[Dict[str, Any], None, None]:
if not latent_paths:
raise ValueError("Nenhum caminho de latente fornecido para o upscale.")
logger.info("--- ORQUESTRADOR: Tarefa de Upscaling de Latentes ---")
run_timestamp = int(time.time())
temp_dir = os.path.join(
self.director.workspace_dir, f"temp_upscaled_clips_{run_timestamp}"
)
os.makedirs(temp_dir, exist_ok=True)
final_upscaled_clip_paths = []
num_chunks = -(-len(latent_paths) // chunk_size)
for i in range(num_chunks):
chunk_paths = latent_paths[i * chunk_size : (i + 1) * chunk_size]
if progress_callback:
progress_callback(
i / num_chunks, f"Upscalando & Decodificando Lote {i+1}/{num_chunks}"
)
tensors_in_chunk = [torch.load(p, map_location=self.device) for p in chunk_paths]
tensors_to_concat = [t[:, :, :-1, :, :] if j < len(tensors_in_chunk) - 1 else t for j, t in enumerate(tensors_in_chunk)]
sub_group_latent = torch.cat(tensors_to_concat, dim=2)
del tensors_in_chunk, tensors_to_concat
gc.collect()
torch.cuda.empty_cache()
upscaled_latent_chunk = latent_enhancer_specialist_singleton.upscale(sub_group_latent)
pixel_tensor = vae_manager_singleton.decode(upscaled_latent_chunk)
current_clip_path = os.path.join(temp_dir, f"upscaled_clip_{i:04d}.mp4")
self.editor._save_video_from_tensor(pixel_tensor, current_clip_path, fps=24)
final_upscaled_clip_paths.append(current_clip_path)
del sub_group_latent, upscaled_latent_chunk, pixel_tensor
gc.collect()
torch.cuda.empty_cache()
yield {"progress": (i + 1) / num_chunks}
final_video_path = os.path.join(
self.director.workspace_dir, f"upscaled_movie_{run_timestamp}.mp4"
)
video_encode_tool_singleton.concatenate_videos(
final_upscaled_clip_paths, final_video_path, self.director.workspace_dir
)
shutil.rmtree(temp_dir)
logger.info(f"Upscaling de latentes completo! Vídeo final em: {final_video_path}")
yield {"final_path": final_video_path}
def task_run_hd_mastering(
self,
source_video_path: str,
steps: int,
prompt: str,
progress_callback: Optional[Callable] = None,
) -> Generator[Dict[str, Any], None, None]:
logger.info(f"--- ORQUESTRADOR: Tarefa de Masterização HD com SeedVR ---")
run_timestamp = int(time.time())
output_path = os.path.join(
self.director.workspace_dir, f"hd_mastered_movie_{run_timestamp}.mp4"
)
yield from seedvr_manager_singleton.process_video(
input_video_path=source_video_path,
output_video_path=output_path,
prompt=prompt,
steps=steps,
progress_callback=progress_callback,
)
def task_run_audio_generation(
self,
source_video_path: str,
audio_prompt: str,
progress_callback: Optional[Callable] = None,
) -> Generator[Dict[str, Any], None, None]:
logger.info(f"--- ORQUESTRADOR: Tarefa de Geração de Áudio ---")
if progress_callback:
progress_callback(0.1, "Preparando para geração de áudio...")
run_timestamp = int(time.time())
source_name = Path(source_video_path).stem
output_path = os.path.join(
self.director.workspace_dir, f"{source_name}_with_audio_{run_timestamp}.mp4"
)
try:
result = subprocess.run(
[
"ffprobe", "-v", "error", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", source_video_path,
],
capture_output=True, text=True, check=True,
)
duration = float(result.stdout.strip())
except Exception as e:
logger.error(
f"Não foi possível obter a duração do vídeo '{source_video_path}': {e}",
exc_info=True,
)
yield {"error": "Falha ao obter duração do vídeo."}
return
if progress_callback:
progress_callback(0.5, "Gerando trilha de áudio...")
final_path = mmaudio_manager_singleton.generate_audio_for_video(
video_path=source_video_path,
prompt=audio_prompt,
duration_seconds=duration,
output_path_override=output_path,
)
logger.info(f"Geração de áudio completa! Vídeo com áudio em: {final_path}")
if progress_callback:
progress_callback(1.0, "Geração de áudio completa!")
yield {"final_path": final_path}