import os
import subprocess
import sys

# Disable torch.compile / dynamo before any torch import
os.environ["TORCH_COMPILE_DISABLE"] = "1"
os.environ["TORCHDYNAMO_DISABLE"] = "1"

# Install xformers for memory-efficient attention
subprocess.run(
    [sys.executable, "-m", "pip", "install", "xformers==0.0.32.post2", "--no-build-isolation"],
    check=False,
)

# Install video preprocessing dependencies
subprocess.run(
    [
        sys.executable, "-m", "pip", "install",
        "imageio[ffmpeg]", "scikit-image", "opencv-python-headless", "decord", "num2words",
    ],
    check=False,
)

# Ensure num2words is installed (required by SmolVLMProcessor)
subprocess.run([sys.executable, "-m", "pip", "install", "num2words"], check=True)

# Reinstall torchaudio to match the torch CUDA version on this space.
# torch is queried in a child process so it is not imported here yet.
_tv = subprocess.run(
    [sys.executable, "-c", "import torch; print(torch.__version__)"],
    capture_output=True,
    text=True,
)
if _tv.returncode == 0:
    _full_ver = _tv.stdout.strip()
    # "2.x.y+cu124" -> local suffix "cu124"; fall back to cu124 when there is no suffix
    _cuda_suffix = _full_ver.split("+")[-1] if "+" in _full_ver else "cu124"
    _base_ver = _full_ver.split("+")[0]
    print(f"Detected torch {_full_ver}, reinstalling matching torchaudio...")
    subprocess.run(
        [
            sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps",
            f"torchaudio=={_base_ver}",
            "--index-url", f"https://download.pytorch.org/whl/{_cuda_suffix}",
        ],
        check=False,
    )

# Clone LTX-2 repo at a pinned commit and install packages
LTX_REPO_URL = "https://github.com/Lightricks/LTX-2.git"
LTX_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LTX-2")
LTX_COMMIT = "ae855f8538843825f9015a419cf4ba5edaf5eec2"

# Always start from a fresh clone so the checkout below is deterministic.
if os.path.exists(LTX_REPO_DIR):
    print(f"Removing existing repo at {LTX_REPO_DIR}...")
    subprocess.run(["rm", "-rf", LTX_REPO_DIR], check=True)

print(f"Cloning {LTX_REPO_URL}...")
subprocess.run(["git", "clone", LTX_REPO_URL, LTX_REPO_DIR], check=True)

print(f"Checking out commit {LTX_COMMIT}...")
subprocess.run(["git", "-C", LTX_REPO_DIR, "checkout", LTX_COMMIT], check=True)

print("Installing ltx-core and ltx-pipelines from pinned repo commit...")
subprocess.run(
    [
        sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps",
        "-e", os.path.join(LTX_REPO_DIR, "packages", "ltx-core"),
        "-e", os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines"),
    ],
    check=True,
)

# Make the freshly installed sources importable in this already-running process.
sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines", "src"))
sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-core", "src"))

import logging
import random
import tempfile
from pathlib import Path

import torch

torch._dynamo.config.suppress_errors = True
torch._dynamo.config.disable = True

import spaces
import gradio as gr
import numpy as np
from huggingface_hub import hf_hub_download, snapshot_download
from safetensors import safe_open

from ltx_core.components.diffusion_steps import EulerDiffusionStep
from ltx_core.components.noisers import GaussianNoiser
from ltx_core.conditioning import (
    ConditioningItem,
    ConditioningItemAttentionStrengthWrapper,
    VideoConditionByReferenceLatent,
)
from ltx_core.loader import LoraPathStrengthAndSDOps
from ltx_core.model.audio_vae import decode_audio as vae_decode_audio
from ltx_core.model.audio_vae import encode_audio as vae_encode_audio
from ltx_core.model.upsampler import upsample_video
from ltx_core.model.video_vae import TilingConfig, VideoEncoder, get_video_chunks_number
from ltx_core.model.video_vae import decode_video as vae_decode_video
from ltx_core.quantization import QuantizationPolicy
from ltx_core.types import Audio, AudioLatentShape, LatentState, VideoLatentShape, VideoPixelShape
from ltx_pipelines.utils import ModelLedger, euler_denoising_loop
from ltx_pipelines.utils.args import ImageConditioningInput
from ltx_pipelines.utils.constants import DISTILLED_SIGMA_VALUES, STAGE_2_DISTILLED_SIGMA_VALUES
from ltx_pipelines.utils.helpers import (
    assert_resolution,
    cleanup_memory,
    combined_image_conditionings,
    denoise_audio_video,
    denoise_video_only,
    encode_prompts,
    get_device,
    simple_denoising_func,
)
from ltx_pipelines.utils.media_io import (
    decode_audio_from_file,
    encode_video,
    load_video_conditioning,
)
from ltx_pipelines.utils.types import PipelineComponents

# Force-patch xformers attention into the LTX attention module.
from ltx_core.model.transformer import attention as _attn_mod

print(f"[ATTN] Before patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}")
try:
    from xformers.ops import memory_efficient_attention as _mea

    _attn_mod.memory_efficient_attention = _mea
    print(f"[ATTN] After patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}")
except Exception as e:
    # Best effort: the app keeps running with whatever attention the module already had.
    print(f"[ATTN] xformers patch FAILED: {type(e).__name__}: {e}")

logging.getLogger().setLevel(logging.INFO)

# ─────────────────────────────────────────────────────────────────────────────
# Video Preprocessing: Letterboxing / Outpainting preparation
# ─────────────────────────────────────────────────────────────────────────────
import imageio
import cv2
from PIL import Image


def load_video_frames(video_path: str) -> list[np.ndarray]:
    """Decode every frame of ``video_path`` with imageio and return them as a list.

    The whole video is held in memory; frames are returned exactly as the
    imageio reader yields them.
    """
    frames = []
    with imageio.get_reader(video_path) as reader:
        for frame in reader:
            frames.append(frame)
    return frames


def write_video_mp4(frames: list[np.ndarray], fps: float, out_path: str) -> str:
    """Write ``frames`` to ``out_path`` at ``fps`` via imageio and return ``out_path``.

    ``macro_block_size=1`` is passed so imageio does not resize frames to a
    multiple of its default macro block size.
    """
    with imageio.get_writer(out_path, fps=fps, macro_block_size=1) as writer:
        for fr in frames:
            writer.append_data(fr)
    return out_path


def get_video_fps(video_path: str) -> float:
    """Return the frame rate of the first video stream, probed with ffprobe.

    ffprobe reports ``r_frame_rate`` as a ``num/den`` fraction.  Any failure
    (ffprobe missing, unparsable output, zero denominator) falls back to 24.0.
    """
    try:
        result = subprocess.run(
            [
                "ffprobe", "-v", "error", "-select_streams", "v:0",
                "-show_entries", "stream=r_frame_rate",
                "-of", "default=nw=1:nk=1", str(video_path),
            ],
            capture_output=True,
            text=True,
        )
        num, den = result.stdout.strip().split("/")
        return float(num) / float(den)
    except Exception:
        return 24.0


def get_video_dimensions(video_path: str) -> tuple[int, int]:
    """Return ``(width, height)`` of the first video stream, probed with ffprobe.

    Any failure falls back to ``(768, 512)``.
    """
    try:
        result = subprocess.run(
            [
                "ffprobe", "-v", "error", "-select_streams", "v:0",
                "-show_entries", "stream=width,height",
                "-of", "csv=s=x:p=0", str(video_path),
            ],
            capture_output=True,
            text=True,
        )
        parts = result.stdout.strip().split("x")
        return int(parts[0]), int(parts[1])
    except Exception:
        return 768, 512


def apply_gamma(frame: np.ndarray, gamma: float) -> np.ndarray:
    """Apply gamma correction to a uint8 frame. Returns uint8."""
    # Normalize to [0,1], apply gamma, back to uint8
    f = frame.astype(np.float32) / 255.0
    f = np.power(f, 1.0 / gamma)  # gamma 2.0 => exponent 0.5 => brightens
    return (np.clip(f, 0.0, 1.0) * 255).astype(np.uint8)


def apply_inverse_gamma(frame: np.ndarray, gamma: float) -> np.ndarray:
    """Undo :func:`apply_gamma` for the same ``gamma`` (darkens for gamma > 1). Returns uint8."""
    f = frame.astype(np.float32) / 255.0
    f = np.power(f, gamma)  # gamma 2.0 => exponent 2.0 => darkens
    return (np.clip(f, 0.0, 1.0) * 255).astype(np.uint8)


def compute_letterbox_params(
    src_w: int, src_h: int, target_w: int, target_h: int
) -> tuple[int, int, int, int, int, int]:
    """
    Compute the scaled size and padding that center a source inside a target canvas.

    The source is scaled to fit inside the target while maintaining its aspect
    ratio; the leftover space is split between the two sides (any odd pixel
    goes to the bottom / right).

    Returns:
        (pad_top, pad_bottom, pad_left, pad_right, new_w, new_h)
    """
    src_aspect = src_w / src_h
    target_aspect = target_w / target_h

    if src_aspect > target_aspect:
        # Source is wider — fit to width, pad top/bottom
        new_w = target_w
        new_h = int(round(target_w / src_aspect))
    else:
        # Source is taller — fit to height, pad left/right
        new_h = target_h
        new_w = int(round(target_h * src_aspect))

    pad_top = (target_h - new_h) // 2
    pad_bottom = target_h - new_h - pad_top
    pad_left = (target_w - new_w) // 2
    pad_right = target_w - new_w - pad_left

    return pad_top, pad_bottom, pad_left, pad_right, new_w, new_h


def letterbox_frame(frame: np.ndarray, target_w: int, target_h: int) -> np.ndarray:
    """Resize frame to fit inside target dimensions, pad with black (0,0,0).

    The canvas is allocated with 3 channels, so ``frame`` is expected to be a
    3-channel HWC array.
    """
    src_h, src_w = frame.shape[:2]
    pad_top, pad_bottom, pad_left, pad_right, new_w, new_h = compute_letterbox_params(
        src_w, src_h, target_w, target_h
    )

    # Resize source to fit
    resized = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA)

    # Create black canvas and paste
    canvas = np.zeros((target_h, target_w, 3), dtype=np.uint8)
    canvas[pad_top:pad_top + new_h, pad_left:pad_left + new_w] = resized
    return canvas


def letterbox_video(
    video_path: str,
    target_w: int,
    target_h: int,
    use_gamma: bool = False,
    num_frames: int | None = None,
    burnin_frames: int = 0,
) -> tuple[str, str]:
    """
    Letterbox a video to target dimensions with black bars.

    Optionally applies gamma 2.0 brightening for dark scenes.

    Args:
        video_path: Source video to read.
        target_w: Canvas width in pixels.
        target_h: Canvas height in pixels.
        use_gamma: When true, every letterboxed frame is brightened with gamma 2.0.
        num_frames: If given, only the first ``num_frames`` source frames are kept.
        burnin_frames: Extra copies of the first frame prepended to give the
            model time to fill the black regions before actual content starts.

    Returns:
        (letterboxed_video_path, first_frame_preview_path)

    Raises:
        ValueError: If no frames could be decoded from ``video_path``.
    """
    frames = load_video_frames(video_path)
    if not frames:
        raise ValueError("No frames decoded from video")

    fps = get_video_fps(video_path)

    if num_frames is not None:
        # Reserve space: we need num_frames of actual content + burn-in
        frames = frames[:num_frames]

    # Prepend burn-in copies of the first frame
    if burnin_frames > 0:
        frames = [frames[0]] * burnin_frames + frames

    processed = []
    for frame in frames:
        lb = letterbox_frame(frame, target_w, target_h)
        if use_gamma:
            lb = apply_gamma(lb, gamma=2.0)
        processed.append(lb)

    # Save letterboxed video.  mkstemp creates the file atomically;
    # tempfile.mktemp is deprecated because the returned name can be raced.
    fd, out_path = tempfile.mkstemp(suffix=".mp4")
    os.close(fd)
    write_video_mp4(processed, fps=fps, out_path=out_path)

    # Preview is the first real content frame (after burn-in)
    fd, preview_path = tempfile.mkstemp(suffix=".png")
    os.close(fd)
    Image.fromarray(processed[min(burnin_frames, len(processed) - 1)]).save(preview_path)

    return out_path, preview_path


def apply_inverse_gamma_to_video(video_path: str) -> str:
    """Undo the gamma 2.0 brightening on every frame and return the new file path.

    The video is re-encoded through ``load_video_frames`` / ``write_video_mp4``,
    which handle image frames only, so the returned file carries no audio track.
    """
    frames = load_video_frames(video_path)
    fps = get_video_fps(video_path)

    corrected = [apply_inverse_gamma(frame, gamma=2.0) for frame in frames]

    fd, out_path = tempfile.mkstemp(suffix=".mp4")
    os.close(fd)
    write_video_mp4(corrected, fps=fps, out_path=out_path)
    return out_path


def trim_video_start(video_path: str, trim_frames: int, frame_rate: float) -> str:
    """
    Drop the first ``trim_frames`` frames' worth of time from a video.

    The offset ``trim_frames / frame_rate`` seconds is passed to ffmpeg as an
    input seek (``-ss`` before ``-i``); video is re-encoded with libx264 and
    audio with AAC, so both are cut by the same amount.

    Returns ``video_path`` unchanged when ``trim_frames <= 0``; otherwise the
    path of a new temporary mp4.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    if trim_frames <= 0:
        return video_path

    trim_seconds = trim_frames / frame_rate
    fd, out_path = tempfile.mkstemp(suffix=".mp4")
    os.close(fd)
    subprocess.run(
        [
            "ffmpeg", "-y", "-v", "error",
            "-ss", f"{trim_seconds:.4f}",
            "-i", video_path,
            "-c:v", "libx264", "-crf", "18", "-preset", "fast",
            "-c:a", "aac",
            out_path,
        ],
        check=True,
    )
    return out_path


# ─────────────────────────────────────────────────────────────────────────────
# Helper: read reference downscale factor from IC-LoRA metadata
# ─────────────────────────────────────────────────────────────────────────────
def _read_lora_reference_downscale_factor(lora_path: str) -> int:
    """Read ``reference_downscale_factor`` from a safetensors file's metadata.

    Returns 1 when the key is absent or the file cannot be read (the failure
    is logged as a warning).
    """
    try:
        with safe_open(lora_path, framework="pt") as f:
            metadata = f.metadata() or {}
            return int(metadata.get("reference_downscale_factor", 1))
    except Exception as e:
        logging.warning(f"Failed to read metadata from LoRA file '{lora_path}': {e}")
        return 1


# ─────────────────────────────────────────────────────────────────────────────
# Unified Pipeline: Distilled + Audio + IC-LoRA Video-to-Video
# ─────────────────────────────────────────────────────────────────────────────
class LTX23OutpaintPipeline:
    """
    LTX-2.3 pipeline for outpainting using IC-LoRA.

    The outpaint LoRA is loaded separately (not fused), so:
    - stage_1_model_ledger: base transformer + outpaint LoRA (Stage 1)
    - stage_2_model_ledger: base transformer WITHOUT LoRA (Stage 2 upsampling)

    When no ``ic_loras`` are passed, both attributes refer to the same ledger.
    """

    def __init__(
        self,
        distilled_checkpoint_path: str,
        spatial_upsampler_path: str,
        gemma_root: str,
        ic_loras: list[LoraPathStrengthAndSDOps] | None = None,
        device: torch.device | None = None,
        quantization: QuantizationPolicy | None = None,
        stage_1_quantization: QuantizationPolicy | None = None,
        reference_downscale_factor: int | None = None,
    ):
        """Build the model ledgers and resolve the reference downscale factor.

        Args:
            distilled_checkpoint_path: Transformer checkpoint used by both stages.
            spatial_upsampler_path: Checkpoint of the spatial upsampler.
            gemma_root: Root directory of the Gemma text encoder.
            ic_loras: IC-LoRAs applied to the Stage 1 ledger only.
            device: Target device; defaults to ``get_device()``.
            quantization: Policy for Stage 2, and for Stage 1 unless
                ``stage_1_quantization`` is given.
            stage_1_quantization: Optional override for the Stage 1 ledger.
            reference_downscale_factor: Explicit factor; when ``None`` it is
                read from the LoRA files' metadata.

        Raises:
            ValueError: If the LoRA files declare conflicting factors.
        """
        self.device = device or get_device()
        self.dtype = torch.bfloat16

        ic_loras = ic_loras or []
        self.has_ic_lora = len(ic_loras) > 0

        # Stage 1 quantization: use stage_1_quantization if provided,
        # otherwise fall back to the shared quantization policy.
        # On ZeroGPU, fp8_cast LoRA fusion requires CUDA at init time,
        # so we typically pass None for Stage 1 (with LoRA) to avoid the issue.
        s1_quant = stage_1_quantization if stage_1_quantization is not None else quantization

        # Stage 1: transformer with IC-LoRA (outpaint) — no fp8 quant to
        # avoid Triton CUDA kernel during LoRA fusion at startup
        self.stage_1_model_ledger = ModelLedger(
            dtype=self.dtype,
            device=self.device,
            checkpoint_path=distilled_checkpoint_path,
            spatial_upsampler_path=spatial_upsampler_path,
            gemma_root_path=gemma_root,
            loras=ic_loras,
            quantization=s1_quant,
        )

        if self.has_ic_lora:
            # Stage 2 needs a separate transformer WITHOUT IC-LoRA
            # Can safely use fp8_cast here since no LoRA fusion is involved
            self.stage_2_model_ledger = ModelLedger(
                dtype=self.dtype,
                device=self.device,
                checkpoint_path=distilled_checkpoint_path,
                spatial_upsampler_path=spatial_upsampler_path,
                gemma_root_path=gemma_root,
                loras=[],
                quantization=quantization,
            )
        else:
            self.stage_2_model_ledger = self.stage_1_model_ledger

        self.pipeline_components = PipelineComponents(
            dtype=self.dtype,
            device=self.device,
        )

        # Reference downscale factor
        if reference_downscale_factor is not None:
            self.reference_downscale_factor = reference_downscale_factor
        else:
            self.reference_downscale_factor = 1
            for lora in ic_loras:
                scale = _read_lora_reference_downscale_factor(lora.path)
                if scale != 1:
                    # All LoRAs that declare a non-default factor must agree.
                    if self.reference_downscale_factor not in (1, scale):
                        raise ValueError(
                            f"Conflicting reference_downscale_factor: "
                            f"already {self.reference_downscale_factor}, got {scale}"
                        )
                    self.reference_downscale_factor = scale

        logging.info(f"[Pipeline] reference_downscale_factor={self.reference_downscale_factor}")

    # ── Video reference conditioning (IC-LoRA) ─────────────────────────────
    def _create_ic_conditionings(
        self,
        video_conditioning: list[tuple[str, float]],
        height: int,
        width: int,
        num_frames: int,
        video_encoder: VideoEncoder,
        conditioning_strength: float = 1.0,
    ) -> list[ConditioningItem]:
        """Create IC-LoRA video reference conditioning items.

        Each ``(video_path, strength)`` pair is loaded at the reference
        resolution (``height`` / ``width`` integer-divided by
        ``self.reference_downscale_factor``), capped at ``num_frames`` frames,
        encoded with ``video_encoder`` and wrapped in a
        ``VideoConditionByReferenceLatent``.  When ``conditioning_strength`` is
        below 1.0 the item is additionally wrapped in a
        ``ConditioningItemAttentionStrengthWrapper`` with that value.
        """
        conditionings: list[ConditioningItem] = []
        scale = self.reference_downscale_factor
        ref_height = height // scale
        ref_width = width // scale

        for video_path, strength in video_conditioning:
            video = load_video_conditioning(
                video_path=video_path,
                height=ref_height,
                width=ref_width,
                frame_cap=num_frames,
                dtype=self.dtype,
                device=self.device,
            )
            encoded_video = video_encoder(video)

            cond = VideoConditionByReferenceLatent(
                latent=encoded_video,
                downscale_factor=scale,
                strength=strength,
            )
            if conditioning_strength < 1.0:
                cond = ConditioningItemAttentionStrengthWrapper(
                    cond, attention_mask=conditioning_strength
                )
            conditionings.append(cond)

        if conditionings:
            logging.info(f"[IC-LoRA] Added {len(conditionings)} video conditioning(s)")
        return conditionings

    # ── Main generation entry point ──────────────────────────────────────
    def __call__(
        self,
        prompt: str,
        seed: int,
        height: int,
        width: int,
        num_frames: int,
        frame_rate: float,
        images: list[ImageConditioningInput],
        audio_path: str | None = None,
        video_conditioning: list[tuple[str, float]] | None = None,
        tiling_config: TilingConfig | None = None,
        enhance_prompt: bool = False,
        conditioning_strength: float = 1.0,
    ):
        """
        Generate outpainted video.

        The video_conditioning should contain the letterboxed video (with black bars).

        Stage 1 denoises at half of ``width`` / ``height`` with the Stage 1
        ledger; Stage 2 upsamples that latent and refines it at full size with
        the Stage 2 ledger.  When ``audio_path`` is given, its audio is encoded
        and passed as the initial audio latent while only video is denoised,
        and the decoded input audio is returned; otherwise audio is denoised
        jointly with the video and decoded at the end.

        Returns:
            (decoded_video, output_audio)

        Raises:
            ValueError: If ``audio_path`` is given but no audio could be decoded from it.
        """
        assert_resolution(height=height, width=width, is_two_stage=True)

        has_audio = audio_path is not None
        has_video_cond = bool(video_conditioning)

        generator = torch.Generator(device=self.device).manual_seed(seed)
        noiser = GaussianNoiser(generator=generator)
        stepper = EulerDiffusionStep()
        # Same dtype the ledgers were built with (kept in one place on the instance).
        dtype = self.dtype

        # ── Encode text prompt ───────────────────────────────────────────
        (ctx_p,) = encode_prompts(
            [prompt],
            self.stage_1_model_ledger,
            enhance_first_prompt=enhance_prompt,
            enhance_prompt_image=images[0].path if len(images) > 0 else None,
        )
        video_context, audio_context = ctx_p.video_encoding, ctx_p.audio_encoding

        # ── Encode external audio (if provided) ─────────────────────────
        encoded_audio_latent = None
        decoded_audio_for_output = None
        if has_audio:
            video_duration = num_frames / frame_rate
            decoded_audio = decode_audio_from_file(audio_path, self.device, 0.0, video_duration)
            if decoded_audio is None:
                raise ValueError(f"Could not extract audio stream from {audio_path}")

            encoded_audio_latent = vae_encode_audio(
                decoded_audio, self.stage_1_model_ledger.audio_encoder()
            )
            audio_shape = AudioLatentShape.from_duration(
                batch=1, duration=video_duration, channels=8, mel_bins=16
            )
            # Crop or zero-pad along dim 2 so the latent matches the expected frame count.
            expected_frames = audio_shape.frames
            actual_frames = encoded_audio_latent.shape[2]

            if actual_frames > expected_frames:
                encoded_audio_latent = encoded_audio_latent[:, :, :expected_frames, :]
            elif actual_frames < expected_frames:
                pad = torch.zeros(
                    encoded_audio_latent.shape[0],
                    encoded_audio_latent.shape[1],
                    expected_frames - actual_frames,
                    encoded_audio_latent.shape[3],
                    device=encoded_audio_latent.device,
                    dtype=encoded_audio_latent.dtype,
                )
                encoded_audio_latent = torch.cat([encoded_audio_latent, pad], dim=2)

            decoded_audio_for_output = Audio(
                waveform=decoded_audio.waveform.squeeze(0),
                sampling_rate=decoded_audio.sampling_rate,
            )

        # ── Build conditionings for Stage 1 ──────────────────────────────
        video_encoder = self.stage_1_model_ledger.video_encoder()

        stage_1_output_shape = VideoPixelShape(
            batch=1,
            frames=num_frames,
            width=width // 2,
            height=height // 2,
            fps=frame_rate,
        )

        # Image conditionings (first frame of letterboxed video)
        stage_1_conditionings = combined_image_conditionings(
            images=images,
            height=stage_1_output_shape.height,
            width=stage_1_output_shape.width,
            video_encoder=video_encoder,
            dtype=dtype,
            device=self.device,
        )

        # IC-LoRA video reference conditionings (the letterboxed video)
        if has_video_cond:
            ic_conds = self._create_ic_conditionings(
                video_conditioning=video_conditioning,
                height=stage_1_output_shape.height,
                width=stage_1_output_shape.width,
                num_frames=num_frames,
                video_encoder=video_encoder,
                conditioning_strength=conditioning_strength,
            )
            stage_1_conditionings.extend(ic_conds)

        # ── Stage 1: Low-res generation ──────────────────────────────────
        transformer = self.stage_1_model_ledger.transformer()
        stage_1_sigmas = torch.Tensor(DISTILLED_SIGMA_VALUES).to(self.device)

        def denoising_loop(sigmas, video_state, audio_state, stepper):
            return euler_denoising_loop(
                sigmas=sigmas,
                video_state=video_state,
                audio_state=audio_state,
                stepper=stepper,
                denoise_fn=simple_denoising_func(
                    video_context=video_context,
                    audio_context=audio_context,
                    transformer=transformer,
                ),
            )

        if has_audio:
            video_state = denoise_video_only(
                output_shape=stage_1_output_shape,
                conditionings=stage_1_conditionings,
                noiser=noiser,
                sigmas=stage_1_sigmas,
                stepper=stepper,
                denoising_loop_fn=denoising_loop,
                components=self.pipeline_components,
                dtype=dtype,
                device=self.device,
                initial_audio_latent=encoded_audio_latent,
            )
            audio_state = None
        else:
            video_state, audio_state = denoise_audio_video(
                output_shape=stage_1_output_shape,
                conditionings=stage_1_conditionings,
                noiser=noiser,
                sigmas=stage_1_sigmas,
                stepper=stepper,
                denoising_loop_fn=denoising_loop,
                components=self.pipeline_components,
                dtype=dtype,
                device=self.device,
            )

        torch.cuda.synchronize()
        cleanup_memory()

        # ── Stage 2: Upsample + Refine ──────────────────────────────────
        upscaled_video_latent = upsample_video(
            latent=video_state.latent[:1],
            video_encoder=video_encoder,
            upsampler=self.stage_2_model_ledger.spatial_upsampler(),
        )

        torch.cuda.synchronize()
        cleanup_memory()

        # Stage 2 uses the transformer WITHOUT IC-LoRA
        transformer_s2 = self.stage_2_model_ledger.transformer()
        stage_2_sigmas = torch.Tensor(STAGE_2_DISTILLED_SIGMA_VALUES).to(self.device)

        def denoising_loop_s2(sigmas, video_state, audio_state, stepper):
            return euler_denoising_loop(
                sigmas=sigmas,
                video_state=video_state,
                audio_state=audio_state,
                stepper=stepper,
                denoise_fn=simple_denoising_func(
                    video_context=video_context,
                    audio_context=audio_context,
                    transformer=transformer_s2,
                ),
            )

        stage_2_output_shape = VideoPixelShape(
            batch=1,
            frames=num_frames,
            width=width,
            height=height,
            fps=frame_rate,
        )
        # Only the image conditionings are rebuilt at full resolution; the
        # IC-LoRA reference conditionings are not passed to Stage 2.
        stage_2_conditionings = combined_image_conditionings(
            images=images,
            height=stage_2_output_shape.height,
            width=stage_2_output_shape.width,
            video_encoder=video_encoder,
            dtype=dtype,
            device=self.device,
        )

        if has_audio:
            video_state = denoise_video_only(
                output_shape=stage_2_output_shape,
                conditionings=stage_2_conditionings,
                noiser=noiser,
                sigmas=stage_2_sigmas,
                stepper=stepper,
                denoising_loop_fn=denoising_loop_s2,
                components=self.pipeline_components,
                dtype=dtype,
                device=self.device,
                noise_scale=stage_2_sigmas[0],
                initial_video_latent=upscaled_video_latent,
                initial_audio_latent=encoded_audio_latent,
            )
            audio_state = None
        else:
            video_state, audio_state = denoise_audio_video(
                output_shape=stage_2_output_shape,
                conditionings=stage_2_conditionings,
                noiser=noiser,
                sigmas=stage_2_sigmas,
                stepper=stepper,
                denoising_loop_fn=denoising_loop_s2,
                components=self.pipeline_components,
                dtype=dtype,
                device=self.device,
                noise_scale=stage_2_sigmas[0],
                initial_video_latent=upscaled_video_latent,
                initial_audio_latent=audio_state.latent,
            )

        torch.cuda.synchronize()
        # Drop local references before decoding so cleanup_memory can reclaim them.
        del transformer, transformer_s2, video_encoder
        cleanup_memory()

        # ── Decode ───────────────────────────────────────────────────────
        decoded_video = vae_decode_video(
            video_state.latent,
            self.stage_2_model_ledger.video_decoder(),
            tiling_config,
            generator,
        )

        if has_audio:
            output_audio = decoded_audio_for_output
        else:
            output_audio = vae_decode_audio(
                audio_state.latent,
                self.stage_2_model_ledger.audio_decoder(),
                self.stage_2_model_ledger.vocoder(),
            )

        return decoded_video, output_audio


# ─────────────────────────────────────────────────────────────────────────────
# Constants
# ─────────────────────────────────────────────────────────────────────────────
MAX_SEED = np.iinfo(np.int32).max
DEFAULT_FRAME_RATE = 24.0

# Output resolutions for outpainting (the expanded canvas)
RESOLUTIONS = {
    "high": {
        "16:9": (1536, 1024),
        "9:16": (1024, 1536),
        "1:1": (1024, 1024),
        "4:3": (1536, 1152),
        "3:4": (1152, 1536),
        "21:9": (1536, 768),
    },
    "low": {
        "16:9": (768, 512),
        "9:16": (512, 768),
        "1:1": (768, 768),
        "4:3": (768, 576),
        "3:4": (576, 768),
        "21:9": (768, 384),
    },
}

# Outpaint fused checkpoint (base + LoRA pre-merged)
FUSED_CHECKPOINT_REPO = "linoyts/ltx-2.3-22b-fused-outpaint"
FUSED_CHECKPOINT_FILENAME = "ltx-2.3-22b-fused-outpaint.safetensors"

# ─────────────────────────────────────────────────────────────────────────────
# Download Models
# ─────────────────────────────────────────────────────────────────────────────
LTX_MODEL_REPO = "Lightricks/LTX-2.3"
GEMMA_REPO = "google/gemma-3-12b-it-qat-q4_0-unquantized"

print("=" * 80)
print("Downloading LTX-2.3 fused outpaint model + Gemma...")
print("=" * 80)

# Fused checkpoint: base distilled + outpaint LoRA already merged
checkpoint_path = hf_hub_download(
    repo_id=FUSED_CHECKPOINT_REPO, filename=FUSED_CHECKPOINT_FILENAME
)
spatial_upsampler_path = hf_hub_download(
    repo_id=LTX_MODEL_REPO, filename="ltx-2.3-spatial-upscaler-x2-1.0.safetensors"
)
gemma_root = snapshot_download(repo_id=GEMMA_REPO)

print(f"Checkpoint (fused): {checkpoint_path}")
print(f"Spatial upsampler: {spatial_upsampler_path}")
print(f"Gemma root: {gemma_root}")

# ─────────────────────────────────────────────────────────────────────────────
# Initialize
Pipeline # ───────────────────────────────────────────────────────────────────────────── pipeline = LTX23OutpaintPipeline( distilled_checkpoint_path=checkpoint_path, spatial_upsampler_path=spatial_upsampler_path, gemma_root=gemma_root, # ic_loras=[] — LoRA already fused into checkpoint quantization=QuantizationPolicy.fp8_cast(), # Outpaint IC-LoRA reference_downscale_factor: read from the LoRA metadata # it was 1 for outpaint, but set explicitly in case reference_downscale_factor=1, ) # Preload all models for ZeroGPU tensor packing. print("Preloading all models...") _ledger_1 = pipeline.stage_1_model_ledger _ledger_2 = pipeline.stage_2_model_ledger _shared = _ledger_1 is _ledger_2 # Stage 1 models (with outpaint LoRA) _s1_transformer = _ledger_1.transformer() _s1_video_encoder = _ledger_1.video_encoder() _s1_text_encoder = _ledger_1.text_encoder() _s1_embeddings = _ledger_1.gemma_embeddings_processor() _s1_audio_encoder = _ledger_1.audio_encoder() _ledger_1.transformer = lambda: _s1_transformer _ledger_1.video_encoder = lambda: _s1_video_encoder _ledger_1.text_encoder = lambda: _s1_text_encoder _ledger_1.gemma_embeddings_processor = lambda: _s1_embeddings _ledger_1.audio_encoder = lambda: _s1_audio_encoder if _shared: _video_decoder = _ledger_1.video_decoder() _audio_decoder = _ledger_1.audio_decoder() _vocoder = _ledger_1.vocoder() _spatial_upsampler = _ledger_1.spatial_upsampler() _ledger_1.video_decoder = lambda: _video_decoder _ledger_1.audio_decoder = lambda: _audio_decoder _ledger_1.vocoder = lambda: _vocoder _ledger_1.spatial_upsampler = lambda: _spatial_upsampler print(" (single shared ledger — no IC-LoRA)") else: # Stage 2 models (separate transformer without IC-LoRA) _s2_transformer = _ledger_2.transformer() _s2_video_encoder = _ledger_2.video_encoder() _s2_video_decoder = _ledger_2.video_decoder() _s2_audio_decoder = _ledger_2.audio_decoder() _s2_vocoder = _ledger_2.vocoder() _s2_spatial_upsampler = _ledger_2.spatial_upsampler() _s2_text_encoder = 
_ledger_2.text_encoder() _s2_embeddings = _ledger_2.gemma_embeddings_processor() _s2_audio_encoder = _ledger_2.audio_encoder() _ledger_2.transformer = lambda: _s2_transformer _ledger_2.video_encoder = lambda: _s2_video_encoder _ledger_2.video_decoder = lambda: _s2_video_decoder _ledger_2.audio_decoder = lambda: _s2_audio_decoder _ledger_2.vocoder = lambda: _s2_vocoder _ledger_2.spatial_upsampler = lambda: _s2_spatial_upsampler _ledger_2.text_encoder = lambda: _s2_text_encoder _ledger_2.gemma_embeddings_processor = lambda: _s2_embeddings _ledger_2.audio_encoder = lambda: _s2_audio_encoder print(" (two separate ledgers — IC-LoRA active)") print("All models preloaded!") print("=" * 80) # ───────────────────────────────────────────────────────────────────────────── # UI Helpers # ───────────────────────────────────────────────────────────────────────────── def detect_aspect_ratio(media_path) -> str: """Detect the closest aspect ratio from a video.""" if media_path is None: return "16:9" try: w, h = get_video_dimensions(str(media_path)) except Exception: return "16:9" ratio = w / h candidates = { "16:9": 16 / 9, "9:16": 9 / 16, "1:1": 1.0, "4:3": 4 / 3, "3:4": 3 / 4, "21:9": 21 / 9, } return min(candidates, key=lambda k: abs(ratio - candidates[k])) def _get_video_duration(video_path) -> float | None: """Get video duration in seconds via ffprobe.""" if video_path is None: return None try: result = subprocess.run( ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "format=duration", "-of", "default=nw=1:nk=1", str(video_path)], capture_output=True, text=True, ) return float(result.stdout.strip()) except Exception: return None def on_video_upload(video, high_res): """Auto-set duration when video is uploaded.""" vid_dur = _get_video_duration(video) if vid_dur is not None: dur = round(min(vid_dur, 15.0), 1) else: dur = 3.0 return gr.update(value=dur) def get_target_resolution(target_aspect: str, high_res: bool) -> tuple[int, int]: """Get the target output 
resolution for the selected aspect ratio.""" tier = "high" if high_res else "low" return RESOLUTIONS[tier].get(target_aspect, RESOLUTIONS[tier]["16:9"]) def preview_letterbox(video, target_aspect, high_res, use_gamma): """Generate a preview of the letterboxed first frame.""" if video is None: return None, gr.update(), gr.update() target_w, target_h = get_target_resolution(target_aspect, high_res) # Load first frame only for preview frames = load_video_frames(str(video)) if not frames: return None, gr.update(value=target_w), gr.update(value=target_h) frame = letterbox_frame(frames[0], target_w, target_h) if use_gamma: frame = apply_gamma(frame, gamma=2.0) preview_path = tempfile.mktemp(suffix=".png") Image.fromarray(frame).save(preview_path) return preview_path, gr.update(value=target_w), gr.update(value=target_h) # ───────────────────────────────────────────────────────────────────────────── # Audio extraction # ───────────────────────────────────────────────────────────────────────────── def _extract_audio_from_video(video_path: str) -> str | None: """Extract audio from video as a temp WAV file. Returns None if no audio.""" out_path = tempfile.mktemp(suffix=".wav") try: probe = subprocess.run( ["ffprobe", "-v", "error", "-select_streams", "a:0", "-show_entries", "stream=codec_type", "-of", "default=nw=1:nk=1", video_path], capture_output=True, text=True, ) if not probe.stdout.strip(): return None subprocess.run( ["ffmpeg", "-y", "-v", "error", "-i", video_path, "-vn", "-ac", "2", "-ar", "48000", "-c:a", "pcm_s16le", out_path], check=True, ) return out_path except (subprocess.CalledProcessError, FileNotFoundError): return None def _prepend_silence_to_audio(audio_path: str, silence_duration: float) -> str: """Prepend silence to an audio file so it starts later in the timeline. 
This aligns audio with the real content when burn-in frames are prepended to video.""" if silence_duration <= 0: return audio_path out_path = tempfile.mktemp(suffix=".wav") # Generate silence then concatenate with original audio subprocess.run( ["ffmpeg", "-y", "-v", "error", "-f", "lavfi", "-i", f"anullsrc=r=48000:cl=stereo:d={silence_duration:.4f}", "-i", audio_path, "-filter_complex", "[0:a][1:a]concat=n=2:v=0:a=1[out]", "-map", "[out]", "-ac", "2", "-ar", "48000", "-c:a", "pcm_s16le", out_path], check=True, ) return out_path def _mux_audio_to_video(video_path: str, audio_path: str) -> str: """Mux an external audio track into a video, trimming to the shorter of the two.""" out_path = tempfile.mktemp(suffix=".mp4") subprocess.run( ["ffmpeg", "-y", "-v", "error", "-i", video_path, "-i", audio_path, "-c:v", "copy", "-c:a", "aac", "-map", "0:v:0", "-map", "1:a:0", "-shortest", out_path], check=True, ) return out_path # ───────────────────────────────────────────────────────────────────────────── # Generation # ───────────────────────────────────────────────────────────────────────────── @spaces.GPU(duration=120) @torch.inference_mode() def generate_video( input_video, prompt: str = "", duration: float = 3, target_aspect: str = "16:9", conditioning_strength: float = 1.0, enhance_prompt: bool = True, use_gamma: bool = False, use_video_audio: bool = True, seed: int = 42, randomize_seed: bool = True, high_res: bool = False, input_audio=None, progress=gr.Progress(track_tqdm=True), ): try: torch.cuda.reset_peak_memory_stats() current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed) if input_video is None: raise ValueError("Please upload a source video to outpaint.") video_path = str(input_video) frame_rate = DEFAULT_FRAME_RATE # Burn-in: prepend extra frames of the first frame so the model # has time to fill the black regions before actual content starts. # These will be trimmed from the final output. 
BURNIN_FRAMES = 24 # ~1 second at 24fps # Total frames to generate includes burn-in content_frames = int(duration * frame_rate) + 1 content_frames = ((content_frames - 1 + 7) // 8) * 8 + 1 total_frames = content_frames + BURNIN_FRAMES # Re-align to multiple of 8 + 1 total_frames = ((total_frames - 1 + 7) // 8) * 8 + 1 # Actual burn-in count after alignment (may differ slightly) actual_burnin = total_frames - content_frames # Get target resolution target_w, target_h = get_target_resolution(target_aspect, high_res) print(f"[Outpaint] Generating: {target_h}x{target_w}, {total_frames} frames " f"(content={content_frames}, burnin={actual_burnin}), " f"seed={current_seed}, gamma={use_gamma}, " f"target_aspect={target_aspect}") # Step 1: Letterbox the input video with black bars + burn-in frames letterboxed_path, first_frame_path = letterbox_video( video_path=video_path, target_w=target_w, target_h=target_h, use_gamma=use_gamma, num_frames=content_frames, burnin_frames=actual_burnin, ) print(f"[Outpaint] Letterboxed video saved to {letterboxed_path}") # Build image conditioning from letterboxed first frame images = [ImageConditioningInput(path=first_frame_path, frame_idx=0, strength=1.0)] # Build video conditioning — the letterboxed video IS the conditioning video_conditioning = [(letterboxed_path, 1.0)] # Extract original audio — we'll mux it back at the end untouched, # NOT through the pipeline's audio VAE which would introduce artifacts. 
original_audio_path = None if input_audio is not None: original_audio_path = str(input_audio) elif use_video_audio: original_audio_path = _extract_audio_from_video(video_path) if original_audio_path: print(f"[Outpaint] Extracted audio from input video (will mux at end)") tiling_config = TilingConfig.default() video_chunks_number = get_video_chunks_number(total_frames, tiling_config) # Generate video WITHOUT audio — audio will be muxed in post video, audio = pipeline( prompt=prompt, seed=current_seed, height=int(target_h), width=int(target_w), num_frames=total_frames, frame_rate=frame_rate, images=images, audio_path=None, # no audio through pipeline video_conditioning=video_conditioning, tiling_config=tiling_config, enhance_prompt=enhance_prompt, conditioning_strength=conditioning_strength, ) output_path = tempfile.mktemp(suffix=".mp4") encode_video( video=video, fps=frame_rate, audio=audio, output_path=output_path, video_chunks_number=video_chunks_number, ) # Step 2: If gamma was used, apply inverse gamma to the final output if use_gamma: print("[Outpaint] Applying inverse gamma correction to output...") output_path = apply_inverse_gamma_to_video(output_path) # Step 3: Trim burn-in frames from the start (video-only at this point) if actual_burnin > 0: print(f"[Outpaint] Trimming {actual_burnin} burn-in frames from output...") output_path = trim_video_start(output_path, actual_burnin, frame_rate) # Step 4: Mux the original untouched audio back in if original_audio_path is not None: print("[Outpaint] Muxing original audio into output...") output_path = _mux_audio_to_video(output_path, original_audio_path) return str(output_path), current_seed except Exception as e: import traceback print(f"Error: {str(e)}\n{traceback.format_exc()}") return None, current_seed # ───────────────────────────────────────────────────────────────────────────── # Gradio UI — LTX 2.3 Outpaint # ───────────────────────────────────────────────────────────────────────────── css = """ .main-title 
{ text-align: center; margin-bottom: 0.5em; } .generate-btn { min-height: 52px !important; font-size: 1.1em !important; } footer { display: none !important; } video { object-fit: contain !important; } .preview-frame img { max-height: 300px !important; object-fit: contain !important; } """ purple_citrus = gr.themes.Citrus( primary_hue=gr.themes.colors.purple, secondary_hue=gr.themes.colors.purple, neutral_hue=gr.themes.colors.gray, ) with gr.Blocks(title="LTX 2.3 Outpaint", css=css, theme=purple_citrus) as demo: gr.Markdown(""" # LTX 2.3 Outpaint: Extend Your Video to Any Aspect Ratio 🖼️ Expand video beyond its original frame with visually and temporally consistent content using [LTX-2.3](https://huggingface.co/Lightricks/LTX-2.3) + [Outpaint IC-LoRA](https://huggingface.co/oumoumad/LTX-2.3-22b-IC-LoRA-Outpaint) by [@oumoumad](https://huggingface.co/oumoumad) ✨ **Tip:** For dark/night scenes, enable **Gamma Correction** (Advanced Settings) so the model can distinguish dark content from the black sentinel bars. 
""") with gr.Row(): # ── Left column: inputs ────────────────────────────────────── with gr.Column(scale=1): input_video = gr.Video(label="Source Video") with gr.Row(): target_aspect = gr.Dropdown( label="Expand to Aspect Ratio", choices=["16:9", "9:16", "1:1", "4:3", "3:4", "21:9"], value="16:9", info="The target canvas shape — black bars will fill the new area", ) duration = gr.Slider( label="Duration (s)", minimum=1.0, maximum=15.0, value=3.0, step=0.5, ) prompt = gr.Textbox( label="Prompt (optional)", info="Describe the video + what should appear in the expanded regions", lines=2, placeholder="a wide landscape with mountains and a clear sky", ) with gr.Row(): preview_btn = gr.Button("Preview Letterbox", variant="secondary") generate_btn = gr.Button( "Generate Outpaint", variant="primary", size="lg", elem_classes=["generate-btn"], ) with gr.Accordion("Letterbox Preview", open=True): preview_image = gr.Image( label="Letterboxed first frame (black = regions to generate)", type="filepath", elem_classes=["preview-frame"], interactive=False, ) with gr.Accordion("Advanced Settings", open=False): enhance_prompt = gr.Checkbox(label="Enhance Prompt", value=True) conditioning_strength = gr.Slider( label="Conditioning Strength", info="How strongly the original video content influences generation", minimum=0.0, maximum=1.0, value=1.0, step=0.05, ) use_gamma = gr.Checkbox( label="Gamma Correction (for dark scenes)", value=False, info="Apply gamma 2.0 brightening before generation and inverse after — " "recommended for dark/night footage where black bars may be confused " "with dark scene content", ) high_res = gr.Checkbox(label="High Resolution (2×)", value=False) use_video_audio = gr.Checkbox( label="Preserve Audio from Source Video", value=True, info="Extract and keep the audio track from the source video", ) input_audio = gr.Audio( label="Override Audio (optional — replaces video audio)", type="filepath", ) seed = gr.Slider( label="Seed", minimum=0, maximum=MAX_SEED, 
value=42, step=1, ) randomize_seed = gr.Checkbox(label="Randomize Seed", value=True) with gr.Row(): width_display = gr.Number(label="Output Width", interactive=False) height_display = gr.Number(label="Output Height", interactive=False) # ── Right column: output ───────────────────────────────────── with gr.Column(scale=1): output_video = gr.Video(label="Outpainted Result", autoplay=True, height=480) # ── Event handlers ─────────────────────────────────────────────────── input_video.change( fn=on_video_upload, inputs=[input_video, high_res], outputs=[duration], ) # Auto-preview when video or settings change preview_btn.click( fn=preview_letterbox, inputs=[input_video, target_aspect, high_res, use_gamma], outputs=[preview_image, width_display, height_display], ) # Also auto-preview when aspect ratio or gamma changes target_aspect.change( fn=preview_letterbox, inputs=[input_video, target_aspect, high_res, use_gamma], outputs=[preview_image, width_display, height_display], ) use_gamma.change( fn=preview_letterbox, inputs=[input_video, target_aspect, high_res, use_gamma], outputs=[preview_image, width_display, height_display], ) high_res.change( fn=preview_letterbox, inputs=[input_video, target_aspect, high_res, use_gamma], outputs=[preview_image, width_display, height_display], ) # Auto-preview on video upload too input_video.change( fn=preview_letterbox, inputs=[input_video, target_aspect, high_res, use_gamma], outputs=[preview_image, width_display, height_display], ) generate_btn.click( fn=generate_video, inputs=[ input_video, prompt, duration, target_aspect, conditioning_strength, enhance_prompt, use_gamma, use_video_audio, seed, randomize_seed, high_res, input_audio, ], outputs=[output_video, seed], ) if __name__ == "__main__": demo.launch()