Spaces:
Build error
Build error
Update engineers/deformes4D.py
Browse files- engineers/deformes4D.py +46 -16
engineers/deformes4D.py
CHANGED
|
@@ -1,13 +1,13 @@
|
|
| 1 |
-
# engineers/
|
| 2 |
#
|
| 3 |
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
|
| 4 |
#
|
| 5 |
-
# Version: 2.
|
| 6 |
#
|
| 7 |
# This file contains the Deformes4D Engine, which acts as the primary "Editor" or
|
| 8 |
-
# "Film Crew" specialist within the ADUC-SDR architecture. It
|
| 9 |
-
#
|
| 10 |
-
#
|
| 11 |
|
| 12 |
import os
|
| 13 |
import time
|
|
@@ -22,9 +22,9 @@ import subprocess
|
|
| 22 |
import gc
|
| 23 |
import shutil
|
| 24 |
from pathlib import Path
|
| 25 |
-
from typing import List, Tuple, Generator, Dict, Any
|
| 26 |
-
from aduc_types import LatentConditioningItem
|
| 27 |
|
|
|
|
| 28 |
from managers.ltx_manager import ltx_manager_singleton
|
| 29 |
from managers.latent_enhancer_manager import latent_enhancer_specialist_singleton
|
| 30 |
from managers.vae_manager import vae_manager_singleton
|
|
@@ -35,8 +35,6 @@ from tools.video_encode_tool import video_encode_tool_singleton
|
|
| 35 |
|
| 36 |
logger = logging.getLogger(__name__)
|
| 37 |
|
| 38 |
-
|
| 39 |
-
|
| 40 |
class Deformes4DEngine:
|
| 41 |
"""
|
| 42 |
Implements the Camera (Ψ) and Distiller (Δ) of the ADUC-SDR architecture.
|
|
@@ -59,6 +57,18 @@ class Deformes4DEngine:
|
|
| 59 |
with imageio.get_writer(path, fps=fps, codec='libx264', quality=8, output_params=['-pix_fmt', 'yuv420p']) as writer:
|
| 60 |
for frame in video_np: writer.append_data(frame)
|
| 61 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 62 |
def _preprocess_image_for_latent_conversion(self, image: Image.Image, target_resolution: tuple) -> Image.Image:
|
| 63 |
"""Resizes and fits an image to the target resolution for VAE encoding."""
|
| 64 |
if image.size != target_resolution:
|
|
@@ -80,9 +90,6 @@ class Deformes4DEngine:
|
|
| 80 |
video_resolution: int, use_continuity_director: bool,
|
| 81 |
guidance_scale: float, stg_scale: float, num_inference_steps: int,
|
| 82 |
progress: gr.Progress = gr.Progress()):
|
| 83 |
-
"""
|
| 84 |
-
Step 3: Production. Generates the original master video from keyframes.
|
| 85 |
-
"""
|
| 86 |
FPS = 24
|
| 87 |
FRAMES_PER_LATENT_CHUNK = 8
|
| 88 |
LATENT_PROCESSING_CHUNK_SIZE = 4
|
|
@@ -96,6 +103,7 @@ class Deformes4DEngine:
|
|
| 96 |
total_frames_brutos = self._quantize_to_multiple(int(seconds_per_fragment * FPS), FRAMES_PER_LATENT_CHUNK)
|
| 97 |
frames_a_podar = self._quantize_to_multiple(int(total_frames_brutos * (trim_percent / 100)), FRAMES_PER_LATENT_CHUNK)
|
| 98 |
latents_a_podar = frames_a_podar // FRAMES_PER_LATENT_CHUNK
|
|
|
|
| 99 |
|
| 100 |
DEJAVU_FRAME_TARGET = frames_a_podar - 1 if frames_a_podar > 0 else 0
|
| 101 |
DESTINATION_FRAME_TARGET = total_frames_brutos - 1
|
|
@@ -122,6 +130,7 @@ class Deformes4DEngine:
|
|
| 122 |
decision = gemini_manager_singleton.get_cinematic_decision(global_prompt, story_history, past_keyframe_path, start_keyframe_path, destination_keyframe_path, storyboard[i - 1] if i > 0 else "The beginning.", storyboard[i], future_story_prompt)
|
| 123 |
transition_type, motion_prompt = decision["transition_type"], decision["motion_prompt"]
|
| 124 |
story_history += f"\n- Act {fragment_index}: {motion_prompt}"
|
|
|
|
| 125 |
conditioning_items = []
|
| 126 |
if eco_latent_for_next_loop is None:
|
| 127 |
img_start = self._preprocess_image_for_latent_conversion(Image.open(start_keyframe_path).convert("RGB"), target_resolution_tuple)
|
|
@@ -129,21 +138,43 @@ class Deformes4DEngine:
|
|
| 129 |
else:
|
| 130 |
conditioning_items.append(LatentConditioningItem(eco_latent_for_next_loop, 0, 1.0))
|
| 131 |
conditioning_items.append(LatentConditioningItem(dejavu_latent_for_next_loop, DEJAVU_FRAME_TARGET, handler_strength))
|
| 132 |
-
|
| 133 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 134 |
current_ltx_params = {**base_ltx_params, "motion_prompt": motion_prompt}
|
| 135 |
logger.info(f"Calling LTX to generate video latents for fragment {fragment_index} ({total_frames_brutos} frames)...")
|
| 136 |
latents_brutos, _ = self._generate_latent_tensor_internal(conditioning_items, current_ltx_params, target_resolution_tuple, total_frames_brutos)
|
| 137 |
num_latent_frames = latents_brutos.shape[2]
|
| 138 |
logger.info(f"LTX responded with a latent tensor of shape {latents_brutos.shape}, representing ~{num_latent_frames * 8 + 1} video frames at {FPS} FPS.")
|
|
|
|
| 139 |
last_trim = latents_brutos[:, :, -(latents_a_podar+1):, :, :].clone()
|
| 140 |
eco_latent_for_next_loop = last_trim[:, :, :2, :, :].clone()
|
| 141 |
dejavu_latent_for_next_loop = last_trim[:, :, -1:, :, :].clone()
|
| 142 |
latents_video = latents_brutos[:, :, :-(latents_a_podar-1), :, :].clone()
|
| 143 |
latents_video = latents_video[:, :, 1:, :, :]
|
| 144 |
del last_trim, latents_brutos; gc.collect(); torch.cuda.empty_cache()
|
|
|
|
| 145 |
if transition_type == "cut":
|
| 146 |
eco_latent_for_next_loop, dejavu_latent_for_next_loop = None, None
|
|
|
|
| 147 |
cpu_latent = latents_video.cpu()
|
| 148 |
latent_path = os.path.join(temp_latent_dir, f"latent_fragment_{i:04d}.pt")
|
| 149 |
torch.save(cpu_latent, latent_path)
|
|
@@ -166,7 +197,6 @@ class Deformes4DEngine:
|
|
| 166 |
logger.info(f"Batch {i+1} concatenated. Latent shape: {sub_group_latent.shape}")
|
| 167 |
base_name = f"clip_{i:04d}_{run_timestamp}"
|
| 168 |
current_clip_path = os.path.join(temp_video_clips_dir, f"{base_name}.mp4")
|
| 169 |
-
|
| 170 |
pixel_tensor = vae_manager_singleton.decode(sub_group_latent)
|
| 171 |
self.save_video_from_tensor(pixel_tensor, current_clip_path, fps=FPS)
|
| 172 |
del pixel_tensor, sub_group_latent; gc.collect(); torch.cuda.empty_cache()
|
|
@@ -273,7 +303,7 @@ class Deformes4DEngine:
|
|
| 273 |
def _generate_latent_tensor_internal(self, conditioning_items, ltx_params, target_resolution, total_frames_to_generate):
    """
    Internal helper to call the LTX manager.

    Copies `ltx_params` and adds (or overrides) the per-fragment settings:
    'width' and 'height' from `target_resolution[0]` / `target_resolution[1]`,
    'video_total_frames', a fixed 'video_fps' of 24, a 'current_fragment_index'
    derived from the current wall-clock time, and 'conditioning_items_data'.

    Returns:
        Whatever `ltx_manager_singleton.generate_latent_fragment` returns.
        The caller in this file unpacks it into two values, so a bare
        `return` (i.e. `None`) here would raise a TypeError at the call site.
    """
    final_ltx_params = {**ltx_params, 'width': target_resolution[0], 'height': target_resolution[1], 'video_total_frames': total_frames_to_generate, 'video_fps': 24, 'current_fragment_index': int(time.time()), 'conditioning_items_data': conditioning_items}
    # Forward the assembled request; previously the result was never produced.
    return ltx_manager_singleton.generate_latent_fragment(**final_ltx_params)
|
| 277 |
|
| 278 |
def _quantize_to_multiple(self, n, m):
|
| 279 |
"""Helper to round n to the nearest multiple of m."""
|
|
|
|
| 1 |
+
# engineers/deformes4D.py
|
| 2 |
#
|
| 3 |
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
|
| 4 |
#
|
| 5 |
+
# Version: 2.2.0
|
| 6 |
#
|
| 7 |
# This file contains the Deformes4D Engine, which acts as the primary "Editor" or
|
| 8 |
+
# "Film Crew" specialist within the ADUC-SDR architecture. It implements the Camera (Ψ)
|
| 9 |
+
# and Distiller (Δ) concepts. Its core responsibilities include the low-level orchestration
|
| 10 |
+
# of video fragment generation, latent manipulation, and final rendering/post-production tasks.
|
| 11 |
|
| 12 |
import os
|
| 13 |
import time
|
|
|
|
| 22 |
import gc
|
| 23 |
import shutil
|
| 24 |
from pathlib import Path
|
| 25 |
+
from typing import List, Tuple, Generator, Dict, Any
|
|
|
|
| 26 |
|
| 27 |
+
from aduc_types import LatentConditioningItem
|
| 28 |
from managers.ltx_manager import ltx_manager_singleton
|
| 29 |
from managers.latent_enhancer_manager import latent_enhancer_specialist_singleton
|
| 30 |
from managers.vae_manager import vae_manager_singleton
|
|
|
|
| 35 |
|
| 36 |
logger = logging.getLogger(__name__)
|
| 37 |
|
|
|
|
|
|
|
| 38 |
class Deformes4DEngine:
|
| 39 |
"""
|
| 40 |
Implements the Camera (Ψ) and Distiller (Δ) of the ADUC-SDR architecture.
|
|
|
|
| 57 |
with imageio.get_writer(path, fps=fps, codec='libx264', quality=8, output_params=['-pix_fmt', 'yuv420p']) as writer:
|
| 58 |
for frame in video_np: writer.append_data(frame)
|
| 59 |
|
| 60 |
+
def read_video_to_tensor(self, video_path: str) -> torch.Tensor:
    """
    Load a video file from disk as a normalized pixel-space tensor.

    Every frame is decoded with imageio's ffmpeg reader, the frames are
    stacked, divided by 255 and remapped to [-1, 1] (assumes 8-bit frames),
    and the result is laid out as (B, C, F, H, W) with a batch size of one
    before being moved to `self.device`.
    """
    decoded_frames = []
    with imageio.get_reader(video_path, 'ffmpeg') as video_reader:
        for decoded in video_reader:
            decoded_frames.append(decoded)

    # Stack into a single (F, H, W, C) float array scaled by 1/255.
    pixels = np.stack(decoded_frames, axis=0).astype(np.float32) / 255.0
    # (F, H, W, C) -> (C, F, H, W) -> (1, C, F, H, W)
    video = torch.from_numpy(pixels).permute(3, 0, 1, 2).unsqueeze(0)
    # Shift the [0, 1] range to [-1, 1].
    video = (video * 2.0) - 1.0
    return video.to(self.device)
|
| 71 |
+
|
| 72 |
def _preprocess_image_for_latent_conversion(self, image: Image.Image, target_resolution: tuple) -> Image.Image:
|
| 73 |
"""Resizes and fits an image to the target resolution for VAE encoding."""
|
| 74 |
if image.size != target_resolution:
|
|
|
|
| 90 |
video_resolution: int, use_continuity_director: bool,
|
| 91 |
guidance_scale: float, stg_scale: float, num_inference_steps: int,
|
| 92 |
progress: gr.Progress = gr.Progress()):
|
|
|
|
|
|
|
|
|
|
| 93 |
FPS = 24
|
| 94 |
FRAMES_PER_LATENT_CHUNK = 8
|
| 95 |
LATENT_PROCESSING_CHUNK_SIZE = 4
|
|
|
|
| 103 |
total_frames_brutos = self._quantize_to_multiple(int(seconds_per_fragment * FPS), FRAMES_PER_LATENT_CHUNK)
|
| 104 |
frames_a_podar = self._quantize_to_multiple(int(total_frames_brutos * (trim_percent / 100)), FRAMES_PER_LATENT_CHUNK)
|
| 105 |
latents_a_podar = frames_a_podar // FRAMES_PER_LATENT_CHUNK
|
| 106 |
+
total_latent_frames = total_frames_brutos // FRAMES_PER_LATENT_CHUNK
|
| 107 |
|
| 108 |
DEJAVU_FRAME_TARGET = frames_a_podar - 1 if frames_a_podar > 0 else 0
|
| 109 |
DESTINATION_FRAME_TARGET = total_frames_brutos - 1
|
|
|
|
| 130 |
decision = gemini_manager_singleton.get_cinematic_decision(global_prompt, story_history, past_keyframe_path, start_keyframe_path, destination_keyframe_path, storyboard[i - 1] if i > 0 else "The beginning.", storyboard[i], future_story_prompt)
|
| 131 |
transition_type, motion_prompt = decision["transition_type"], decision["motion_prompt"]
|
| 132 |
story_history += f"\n- Act {fragment_index}: {motion_prompt}"
|
| 133 |
+
|
| 134 |
conditioning_items = []
|
| 135 |
if eco_latent_for_next_loop is None:
|
| 136 |
img_start = self._preprocess_image_for_latent_conversion(Image.open(start_keyframe_path).convert("RGB"), target_resolution_tuple)
|
|
|
|
| 138 |
else:
|
| 139 |
conditioning_items.append(LatentConditioningItem(eco_latent_for_next_loop, 0, 1.0))
|
| 140 |
conditioning_items.append(LatentConditioningItem(dejavu_latent_for_next_loop, DEJAVU_FRAME_TARGET, handler_strength))
|
| 141 |
+
|
| 142 |
+
if transition_type == "cut":
|
| 143 |
+
logger.info(f"Cinematic Director chose a 'cut'. Creating FFmpeg transition bridge...")
|
| 144 |
+
bridge_duration_seconds = FRAMES_PER_LATENT_CHUNK / FPS
|
| 145 |
+
bridge_video_path = video_encode_tool_singleton.create_transition_bridge(
|
| 146 |
+
start_image_path=start_keyframe_path, end_image_path=destination_keyframe_path,
|
| 147 |
+
duration=bridge_duration_seconds, fps=FPS, target_resolution=target_resolution_tuple,
|
| 148 |
+
workspace_dir=self.workspace_dir
|
| 149 |
+
)
|
| 150 |
+
bridge_pixel_tensor = self.read_video_to_tensor(bridge_video_path)
|
| 151 |
+
bridge_latent_tensor = vae_manager_singleton.encode(bridge_pixel_tensor)
|
| 152 |
+
final_fade_latent = bridge_latent_tensor[:, :, -1:, :, :]
|
| 153 |
+
conditioning_items.append(LatentConditioningItem(final_fade_latent, total_latent_frames - 1, 0.95))
|
| 154 |
+
img_dest = self._preprocess_image_for_latent_conversion(Image.open(destination_keyframe_path).convert("RGB"), target_resolution_tuple)
|
| 155 |
+
conditioning_items.append(LatentConditioningItem(self.pil_to_latent(img_dest), DESTINATION_FRAME_TARGET, destination_convergence_strength * 0.5))
|
| 156 |
+
del bridge_pixel_tensor, bridge_latent_tensor, final_fade_latent
|
| 157 |
+
if os.path.exists(bridge_video_path): os.remove(bridge_video_path)
|
| 158 |
+
else:
|
| 159 |
+
img_dest = self._preprocess_image_for_latent_conversion(Image.open(destination_keyframe_path).convert("RGB"), target_resolution_tuple)
|
| 160 |
+
conditioning_items.append(LatentConditioningItem(self.pil_to_latent(img_dest), DESTINATION_FRAME_TARGET, destination_convergence_strength))
|
| 161 |
+
|
| 162 |
current_ltx_params = {**base_ltx_params, "motion_prompt": motion_prompt}
|
| 163 |
logger.info(f"Calling LTX to generate video latents for fragment {fragment_index} ({total_frames_brutos} frames)...")
|
| 164 |
latents_brutos, _ = self._generate_latent_tensor_internal(conditioning_items, current_ltx_params, target_resolution_tuple, total_frames_brutos)
|
| 165 |
num_latent_frames = latents_brutos.shape[2]
|
| 166 |
logger.info(f"LTX responded with a latent tensor of shape {latents_brutos.shape}, representing ~{num_latent_frames * 8 + 1} video frames at {FPS} FPS.")
|
| 167 |
+
|
| 168 |
last_trim = latents_brutos[:, :, -(latents_a_podar+1):, :, :].clone()
|
| 169 |
eco_latent_for_next_loop = last_trim[:, :, :2, :, :].clone()
|
| 170 |
dejavu_latent_for_next_loop = last_trim[:, :, -1:, :, :].clone()
|
| 171 |
latents_video = latents_brutos[:, :, :-(latents_a_podar-1), :, :].clone()
|
| 172 |
latents_video = latents_video[:, :, 1:, :, :]
|
| 173 |
del last_trim, latents_brutos; gc.collect(); torch.cuda.empty_cache()
|
| 174 |
+
|
| 175 |
if transition_type == "cut":
|
| 176 |
eco_latent_for_next_loop, dejavu_latent_for_next_loop = None, None
|
| 177 |
+
|
| 178 |
cpu_latent = latents_video.cpu()
|
| 179 |
latent_path = os.path.join(temp_latent_dir, f"latent_fragment_{i:04d}.pt")
|
| 180 |
torch.save(cpu_latent, latent_path)
|
|
|
|
| 197 |
logger.info(f"Batch {i+1} concatenated. Latent shape: {sub_group_latent.shape}")
|
| 198 |
base_name = f"clip_{i:04d}_{run_timestamp}"
|
| 199 |
current_clip_path = os.path.join(temp_video_clips_dir, f"{base_name}.mp4")
|
|
|
|
| 200 |
pixel_tensor = vae_manager_singleton.decode(sub_group_latent)
|
| 201 |
self.save_video_from_tensor(pixel_tensor, current_clip_path, fps=FPS)
|
| 202 |
del pixel_tensor, sub_group_latent; gc.collect(); torch.cuda.empty_cache()
|
|
|
|
| 303 |
def _generate_latent_tensor_internal(self, conditioning_items, ltx_params, target_resolution, total_frames_to_generate):
    """
    Forward a fragment-generation request to the LTX manager.

    A copy of `ltx_params` is extended (overriding any clashing keys) with
    the output width/height taken from `target_resolution`, the number of
    frames to generate, a fixed frame rate of 24, a fragment index derived
    from the current wall-clock time, and the conditioning items. The
    manager's return value is passed back unchanged.
    """
    request = dict(ltx_params)
    request.update({
        'width': target_resolution[0],
        'height': target_resolution[1],
        'video_total_frames': total_frames_to_generate,
        'video_fps': 24,
        'current_fragment_index': int(time.time()),
        'conditioning_items_data': conditioning_items,
    })
    return ltx_manager_singleton.generate_latent_fragment(**request)
|
| 307 |
|
| 308 |
def _quantize_to_multiple(self, n, m):
|
| 309 |
"""Helper to round n to the nearest multiple of m."""
|