""" Sulphur — Image to Video (HF Spaces). Clones Wan2GP and downloads models on first run. WanGPSession is created once on the first GPU call and kept alive so the model stays in memory — subsequent generations skip the full reload. """ import io import os import sys import queue import subprocess import shutil import tempfile import threading import json from pathlib import Path import gradio as gr import spaces _HF_TOKEN = os.environ.get("HF_TOKEN") _PERSISTENT = Path("/data") if Path("/data").exists() else Path(tempfile.gettempdir()) WAN2GP_ROOT = _PERSISTENT / "Wan2GP" CKPTS_DIR = WAN2GP_ROOT / "ckpts" LORAS_DIR = WAN2GP_ROOT / "loras" / "ltx2" FINETUNES_DIR = WAN2GP_ROOT / "finetunes" SULPHUR_ASSETS = [ ("SulphurAI/Sulphur-2-base", "sulphur_distil_bf16.safetensors", CKPTS_DIR), ] LTX_ASSETS = [ ("SulphurAI/Sulphur-2-base", "experimental/sulphur_experimental_lora_v1.safetensors", LORAS_DIR), ("DeepBeepMeep/LTX-2", "ltx-2.3-22b-distilled-lora-384.safetensors", LORAS_DIR), ("DeepBeepMeep/LTX-2", "ltx-2.3-22b_vae.safetensors", CKPTS_DIR), ("DeepBeepMeep/LTX-2", "ltx-2.3-22b_text_embedding_projection.safetensors", CKPTS_DIR), ("DeepBeepMeep/LTX-2", "ltx-2.3-22b_embeddings_connector.safetensors", CKPTS_DIR), ] SULPHUR_FINETUNE = { "model": { "name": "Sulphur 2 Base", "visible": True, "architecture": "ltx2_22B", "parent_model_type": "ltx2_22B", "description": "LTX-2.3 fine-tuned i2v. Distilled checkpoint.", # Full distilled model — do NOT also preload the rank-768 LoRA (README: use one or the other) "URLs": [str(CKPTS_DIR / "sulphur_distil_bf16.safetensors")], "preload_URLs": [], }, "num_inference_steps": 8, "video_length": 81, "resolution": "832x480", "guidance_scale": 3.5, "alt_guidance_scale": 3.5, } _setup_lock = threading.Lock() _setup_done = False _session = None _session_lock = threading.Lock() def _download(repo_id, filename, dest_dir): from huggingface_hub import hf_hub_download dest_dir.mkdir(parents=True, exist_ok=True) dest = dest_dir / Path(filename).name # flat — strip any subfolder if dest.exists(): print(f"[download] cached: {dest.name}") return print(f"[download] {repo_id}/{filename}") hf_hub_download(repo_id=repo_id, filename=filename, local_dir=str(dest_dir), token=_HF_TOKEN) # hf_hub_download preserves subfolder structure; flatten to dest_dir root downloaded = dest_dir / filename if downloaded.exists() and not dest.exists(): shutil.move(str(downloaded), str(dest)) def setup(): global _setup_done with _setup_lock: if _setup_done: return _setup_done = True if not (WAN2GP_ROOT / "shared" / "api.py").exists(): WAN2GP_ROOT.mkdir(parents=True, exist_ok=True) print("[setup] Cloning Wan2GP...") subprocess.run( ["git", "clone", "--depth=1", "https://github.com/deepbeepmeep/Wan2GP.git", str(WAN2GP_ROOT)], check=True, ) for repo, fname, dest in SULPHUR_ASSETS + LTX_ASSETS: _download(repo, fname, dest) # Gemma text encoder — must stay in its subfolder (Wan2GP looks there by name) _gemma_folder = "gemma-3-12b-it-qat-q4_0-unquantized" _gemma_file = f"{_gemma_folder}_quanto_bf16_int8.safetensors" gemma_dest = CKPTS_DIR / _gemma_folder / _gemma_file if not gemma_dest.exists(): from huggingface_hub import hf_hub_download print("[download] Gemma text encoder...") hf_hub_download( repo_id="DeepBeepMeep/LTX-2", filename=f"{_gemma_folder}/{_gemma_file}", local_dir=str(CKPTS_DIR), token=_HF_TOKEN, ) else: print("[download] cached: Gemma text encoder") FINETUNES_DIR.mkdir(parents=True, exist_ok=True) (FINETUNES_DIR / "sulphur_2_base.json").write_text(json.dumps(SULPHUR_FINETUNE, indent=2)) print("[setup] Done.") setup() RESOLUTIONS = ["832x480", "480x832", "640x640", "1024x576", "576x1024"] def _ensure_session(): """Create WanGPSession on first call; reuse on all subsequent calls.""" global _session if _session is not None: return _session with _session_lock: if _session is not None: return _session sys.path.insert(0, str(WAN2GP_ROOT)) os.chdir(WAN2GP_ROOT) from shared.api import WanGPSession out_dir = Path("/tmp/wan2gp_output") out_dir.mkdir(parents=True, exist_ok=True) _session = WanGPSession(root=WAN2GP_ROOT, output_dir=out_dir, console_output=True) return _session class _QueueWriter(io.TextIOBase): """Forward print() output to a queue while also passing through to real stdout.""" def __init__(self, q, real_out): self._q = q self._real = real_out self._buf = "" def write(self, s): self._real.write(s) self._buf += s # flush complete lines (Wan2GP uses both \n and \r for progress) while True: for sep in ("\r\n", "\n", "\r"): idx = self._buf.find(sep) if idx >= 0: line = self._buf[:idx] self._buf = self._buf[idx + len(sep):] if line.strip(): self._q.put(line) break else: break return len(s) def flush(self): self._real.flush() @spaces.GPU(duration=120) def generate_video(image, prompt, resolution, steps, guidance_scale, seconds, seed): if image is None: raise gr.Error("Please upload an image.") if not prompt.strip(): raise gr.Error("Please enter a prompt.") session = _ensure_session() frames = int(seconds * 25 // 8) * 8 + 1 image_path = str(Path(image.strip()).resolve()) out_dir = Path(tempfile.mkdtemp()) out_file = out_dir / "output.mp4" task = { "model_type": "sulphur_2_base", "base_model_type": "sulphur_2_base", "prompt": prompt, "image_start": image_path, "num_inference_steps": int(steps), "guidance_scale": float(guidance_scale), "resolution": resolution, "video_length": frames, "seed": int(seed), "image_prompt_type": "S", "input_video_strength": 1.0, "activated_loras": ["sulphur_experimental_lora_v1.safetensors"], "loras_multipliers": ["0.5"], } log_q = queue.Queue() result_box = [None] error_box = [None] real_out = sys.stdout real_err = sys.stderr def _run(): writer = _QueueWriter(log_q, real_out) sys.stdout = writer sys.stderr = writer # tqdm and progress bars write to stderr try: result_box[0] = session.run_task(task) except Exception as exc: error_box[0] = exc finally: sys.stdout = real_out sys.stderr = real_err log_q.put(None) # sentinel t = threading.Thread(target=_run, daemon=True) t.start() log_lines = [] while True: try: line = log_q.get(timeout=180) except queue.Empty: yield None, "\n".join(log_lines) + "\n\n[TIMEOUT]" return if line is None: break stripped = line.strip() if not stripped: continue if log_lines and ("%" in stripped or "it/s" in stripped or "step" in stripped.lower()): log_lines[-1] = stripped else: log_lines.append(stripped) yield None, "\n".join(log_lines[-30:]) t.join(timeout=10) if error_box[0]: yield None, "\n".join(log_lines) + f"\n\n[ERROR] {error_box[0]}" return result = result_box[0] output_file = None if result and result.artifacts: src = result.artifacts[0].path if src and Path(src).exists(): output_file = str(src) if output_file is None: for search_dir in [out_dir, Path("/tmp/wan2gp_output")]: candidates = sorted(search_dir.glob("**/*.mp4"), key=lambda f: f.stat().st_mtime, reverse=True) if candidates: output_file = str(candidates[0]) break if output_file: final = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) shutil.copy2(output_file, final.name) yield final.name, "\n".join(log_lines) + "\n\n[DONE]" else: errors = getattr(result, "errors", None) yield None, "\n".join(log_lines) + f"\n\n[ERROR] No output found. Errors: {errors}" with gr.Blocks(title="Sulphur — Image to Video") as demo: gr.Markdown("# Sulphur — Image to Video\nUsing New 'Experimental' Lora v1") with gr.Row(): with gr.Column(scale=1): image_in = gr.Image(type="filepath", label="Input Image") prompt_in = gr.Textbox(label="Prompt", placeholder="Describe the motion…", lines=3) with gr.Accordion("Advanced", open=False): resolution_dd = gr.Dropdown(RESOLUTIONS, value="832x480", label="Resolution") steps_sl = gr.Slider(1, 50, value=8, step=1, label="Steps") guidance_sl = gr.Slider(1.0, 10.0, value=5.0, step=0.5, label="Guidance Scale") seconds_sl = gr.Slider(1, 5, value=3, step=1, label="Seconds") seed_num = gr.Number(value=-1, label="Seed (-1 = random)", precision=0) run_btn = gr.Button("Generate", variant="primary") with gr.Column(scale=1): video_out = gr.Video(label="Output Video") log_out = gr.Textbox(label="Log", lines=10, interactive=False) run_btn.click( fn=generate_video, inputs=[image_in, prompt_in, resolution_dd, steps_sl, guidance_sl, seconds_sl, seed_num], outputs=[video_out, log_out], ) if __name__ == "__main__": demo.launch(theme=gr.themes.Soft())