"""ZZZ Image Studio — in-process ComfyUI backend (ZeroGPU compatible). Key idea: run ComfyUI as Python modules inside the same process, so the GPU that ZeroGPU allocates inside @spaces.GPU is actually usable by the model loaders. No subprocess, no HTTP server. """ from __future__ import annotations import os import json import random import re import shutil import subprocess import sys import time from pathlib import Path from typing import Optional import gradio as gr import requests import spaces from huggingface_hub import hf_hub_download from PIL import Image # --------------------------------------------------------------------------- # Paths & config # --------------------------------------------------------------------------- HOME = Path(os.environ.get("HOME", "/home/user")) COMFY_DIR = HOME / "ComfyUI" M_CHECKPOINTS = COMFY_DIR / "models" / "checkpoints" M_DIFFUSION = COMFY_DIR / "models" / "diffusion_models" M_LORAS = COMFY_DIR / "models" / "loras" M_VAE = COMFY_DIR / "models" / "vae" M_TEXT_ENCODERS = COMFY_DIR / "models" / "text_encoders" CIVITAI_TOKEN = os.environ.get("CIVITAI_TOKEN", "") HF_TOKEN = os.environ.get("HF_TOKEN") LUCID_VID = "2722644" LORA1_VID = "2882877" LORA2_VID = "2871082" ELORA1_VID = "2896326" ELORA2_VID = "2882635" ZIMAGE_REPO = "Comfy-Org/z_image_turbo" def _run(cmd, **kw): print(">", " ".join(map(str, cmd))) return subprocess.run(cmd, check=True, **kw) def install_comfyui(): if (COMFY_DIR / "main.py").exists(): print(f"ComfyUI present at {COMFY_DIR}") else: _run(["git", "clone", "--depth", "1", "https://github.com/comfyanonymous/ComfyUI", str(COMFY_DIR)]) req = COMFY_DIR / "requirements.txt" if req.exists(): try: _run([sys.executable, "-m", "pip", "install", "-q", "-r", str(req)]) except Exception as e: print(f"ComfyUI requirements install warning: {e}") for d in (M_CHECKPOINTS, M_DIFFUSION, M_LORAS, M_VAE, M_TEXT_ENCODERS): d.mkdir(parents=True, exist_ok=True) def civitai_headers(): h = {"User-Agent": "Mozilla/5.0"} if 
CIVITAI_TOKEN: h["Authorization"] = f"Bearer {CIVITAI_TOKEN}" return h def download_civitai(version_id: str, dest_dir: Path, fp8: bool = False) -> Optional[Path]: for p in dest_dir.glob(f"*{version_id}*"): return p # Try multiple URL variations — CivitAI sometimes serves FP8 only as the default. urls = [f"https://civitai.com/api/download/models/{version_id}"] if fp8: urls.insert(0, f"https://civitai.com/api/download/models/{version_id}?type=Model&format=SafeTensor&size=pruned&fp=fp8") last_err = None for url in urls: try: with requests.get(url, headers=civitai_headers(), stream=True, timeout=600, allow_redirects=True) as r: if r.status_code != 200: last_err = f"HTTP {r.status_code}: {r.text[:120]}" continue cd = r.headers.get("Content-Disposition", "") m = re.search(r'filename="?([^";\n]+)"?', cd) fname = m.group(1).strip() if m else f"civitai_{version_id}.safetensors" stem, ext = os.path.splitext(fname) fname = f"{stem}_v{version_id}{ext}" path = dest_dir / fname with open(path, "wb") as f: for chunk in r.iter_content(chunk_size=1 << 20): f.write(chunk) print(f"CivitAI v{version_id} → {path} ({path.stat().st_size/1e9:.2f}GB)") return path except Exception as e: last_err = e print(f"CivitAI v{version_id} download FAILED: {last_err}") return None def _hf_download(repo_id: str, filename: str, dest_dir: Path, repo_type: str = "model") -> Optional[Path]: """Download file from HF, redownload if local size mismatches remote.""" target = dest_dir / Path(filename).name try: cached = hf_hub_download(repo_id, filename, token=HF_TOKEN, force_download=False, repo_type=repo_type) cached_sz = os.path.getsize(cached) if target.exists() and target.stat().st_size == cached_sz: return target if target.exists(): print(f"Replacing stale {target} ({target.stat().st_size}B → {cached_sz}B)") target.unlink() shutil.copy(cached, target) print(f"Copied {filename}: {cached_sz/1e6:.1f}MB → {target}") return target except Exception as e: print(f"HF download {repo_id}/{filename} failed: {e}") 
return None def download_assets(): print("Downloading Z Image Turbo split files...") for repo, fn, dest, label in [ (ZIMAGE_REPO, "split_files/diffusion_models/z_image_turbo_bf16.safetensors", M_DIFFUSION, "Z UNet bf16"), (ZIMAGE_REPO, "split_files/text_encoders/qwen_3_4b_fp8_mixed.safetensors", M_TEXT_ENCODERS, "Z CLIP (Qwen3 fp8)"), (ZIMAGE_REPO, "split_files/vae/ae.safetensors", M_VAE, "Z VAE"), # ERNIE Image Turbo split files (~24GB total) ("Comfy-Org/ERNIE-Image", "diffusion_models/ernie-image-turbo.safetensors", M_DIFFUSION, "ERNIE UNet"), ("Comfy-Org/ERNIE-Image", "text_encoders/ministral-3-3b.safetensors", M_TEXT_ENCODERS, "ERNIE CLIP (Ministral-3B)"), ("Comfy-Org/ERNIE-Image", "vae/flux2-vae.safetensors", M_VAE, "ERNIE VAE (flux2)"), ]: ok = _hf_download(repo, fn, dest) print(f" {label}: {'OK' if ok else 'FAIL'}") download_civitai(LUCID_VID, M_DIFFUSION, fp8=True) # store with UNet so UNETLoader sees it # ── Dynamic LoRA discovery ────────────────────────────────────────── # 所有 LoRA 从 jing96963/loras dataset 拉,按 lora_meta.json 注册表加载。 # 加新 LoRA 只需在 dataset 上传文件 + 在 lora_meta.json 加一行(slot 指定 6 槽位之一)。 LORAS_DATASET = "jing96963/loras" global _lora_registry _lora_registry = [] try: meta_path = hf_hub_download(LORAS_DATASET, "lora_meta.json", repo_type="dataset", token=HF_TOKEN, force_download=True) with open(meta_path, "r", encoding="utf-8") as f: meta = json.load(f) _lora_registry = meta.get("entries", []) print(f"LoRA registry loaded: {len(_lora_registry)} entries") except Exception as e: print(f"lora_meta.json missing/invalid ({e}); falling back to filename auto-discovery") for entry in _lora_registry: fn = entry.get("filename") if not fn: continue try: _hf_download(LORAS_DATASET, fn, M_LORAS, repo_type="dataset") except Exception as e: print(f"LoRA {fn} download failed: {e}") # --------------------------------------------------------------------------- # Discover what filenames we have so we can address them in workflow nodes # 
# ---------------------------------------------------------------------------
# Registry entries from lora_meta.json; download_assets() replaces this list.
_lora_registry: list = []

# Logical asset name -> filename found on disk (None until discovered).
_files = {
    "z_unet": None, "z_clip": None, "z_vae": None,
    "ernie_unet": None, "ernie_clip": None, "ernie_vae": None,
    "lucid": None,
    "lora1": None, "lora2": None,    # Z family: CivitAI
    "lora3": None, "lora4": None,    # Z family: user-defined (Google Drive)
    "elora1": None, "elora2": None,  # ERNIE family
}

# Registry slot codes -> keys of ``_files``.
_SLOT_TO_KEY = {
    "zl1": "lora1", "zl2": "lora2", "zl3": "lora3", "zl4": "lora4",
    "el1": "elora1", "el2": "elora2",
}


def discover_files():
    """Fill ``_files`` with the model and LoRA filenames present on disk.

    Filename patterns are matched first; afterwards every valid entry of
    ``_lora_registry`` whose file exists in ``M_LORAS`` overrides the LoRA
    slot it names. Malformed registry entries are skipped one by one, so a
    single bad entry does not disable the remaining overrides.
    """

    def first(d: Path, glob="*.safetensors"):
        # Alphabetically first match, as a bare filename (or None).
        files = sorted(d.glob(glob))
        return files[0].name if files else None

    _files["z_unet"] = first(M_DIFFUSION, "*z_image*turbo*.safetensors")
    _files["z_clip"] = first(M_TEXT_ENCODERS, "qwen_3_4b*.safetensors")
    _files["z_vae"] = first(M_VAE, "ae.safetensors")
    _files["ernie_unet"] = first(M_DIFFUSION, "ernie-image-turbo.safetensors")
    _files["ernie_clip"] = first(M_TEXT_ENCODERS, "ministral-3-3b.safetensors")
    _files["ernie_vae"] = first(M_VAE, "flux2-vae.safetensors")
    _files["lucid"] = first(M_DIFFUSION, f"*{LUCID_VID}*.safetensors")
    _files["lora1"] = first(M_LORAS, f"*{LORA1_VID}*.safetensors")
    _files["lora2"] = first(M_LORAS, f"*{LORA2_VID}*.safetensors")
    _files["lora3"] = first(M_LORAS, "z_lora_1.safetensors")
    _files["lora4"] = first(M_LORAS, "z_lora_2.safetensors")
    _files["elora1"] = first(M_LORAS, "ernie_lora_1.safetensors")
    _files["elora2"] = first(M_LORAS, "ernie_lora_2.safetensors")

    # Override above mapping with lora_meta.json registry (dynamic)
    for entry in _lora_registry or []:
        if not isinstance(entry, dict):
            print(f"registry override skipped malformed entry: {entry!r}")
            continue
        slot = entry.get("slot")
        fn = entry.get("filename")
        if not isinstance(slot, str) or not isinstance(fn, str) or not fn:
            continue
        key = _SLOT_TO_KEY.get(slot)
        if not key:
            continue
        # Downloads are stored flattened under their base name, so a registry
        # filename with a sub-folder must be looked up by base name as well.
        local_name = Path(fn).name
        if local_name and (M_LORAS / local_name).is_file():
            _files[key] = local_name

    print("Files discovered:", _files)
# ---------------------------------------------------------------------------
# Environment setup. The heavy work (clone ComfyUI, downloads, import nodes)
# is attempted eagerly at import time, further down in this module. If that
# attempt fails, _ensure_environment() retries it inside the first
# @spaces.GPU call.
# ---------------------------------------------------------------------------
print("=== ZZZ Image Studio (in-process ComfyUI) startup ===")

_node_classes = {}  # filled by _setup_environment_eagerly()
_environment_ready = False


def _force_stub(name: str):
    """Replace the named module with a permissive stub.

    The stub's __getattr__ creates child stubs on demand, and any callable
    usage is a no-op returning None. This makes it safe for ComfyUI's various
    imports of unused video/audio backends (e.g. av.logging.set_level).
    """
    import types
    from importlib.machinery import ModuleSpec

    class _Stub(types.ModuleType):
        def __getattr__(self, attr):
            # Dunder lookups must fail normally so import machinery and
            # introspection do not mistake the stub for something else.
            if attr.startswith("__"):
                raise AttributeError(attr)
            child = _Stub(f"{self.__name__}.{attr}")
            child.__spec__ = ModuleSpec(child.__name__, None)
            sys.modules[child.__name__] = child
            setattr(self, attr, child)
            return child

        def __call__(self, *a, **kw):  # in case someone calls a stub directly
            return None

    stub = _Stub(name)
    stub.__spec__ = ModuleSpec(name, None)
    stub.__path__ = []  # type: ignore
    sys.modules[name] = stub


def _setup_environment_eagerly():
    """Do all CPU-only heavy setup so @spaces.GPU only has to load models.

    Installs ComfyUI, downloads assets (failures are logged, not raised),
    discovers the local filenames, imports ComfyUI's ``nodes`` module and
    caches the node classes used by ``generate``. Sets ``_environment_ready``
    on success. Also changes the working directory to ``COMFY_DIR``.
    """
    global _environment_ready
    install_comfyui()
    try:
        download_assets()
    except Exception as e:
        print(f"download_assets warning: {e}")
    discover_files()

    comfy_path = str(COMFY_DIR)
    # This function may run again after a failed first attempt; do not stack
    # duplicate sys.path entries.
    if comfy_path not in sys.path:
        sys.path.insert(0, comfy_path)
    os.chdir(comfy_path)

    # Stub out modules whose CUDA libs don't match HF Space's torch/CUDA.
    # `av` is real (installed via pip), only torchaudio needs stubbing.
    _force_stub("torchaudio")

    import nodes as comfy_nodes  # type: ignore
    for k in ("UNETLoader", "CLIPLoader", "VAELoader", "LoraLoader",
              "CLIPTextEncode", "EmptyLatentImage", "KSampler",
              "VAEDecode", "VAEEncode", "LoadImage"):
        _node_classes[k] = comfy_nodes.NODE_CLASS_MAPPINGS[k]
    _environment_ready = True
    print("ComfyUI environment ready (eager).")


def _ensure_environment():
    """Run the environment setup if the eager attempt did not complete."""
    if not _environment_ready:
        _setup_environment_eagerly()


# Cache loader objects so we keep loaded weights between calls.
_state = {
    "unet_official": None,
    "unet_lucid": None,
    "unet_ernie": None,
    "clip_z": None,       # Qwen3 (Z family)
    "clip_ernie": None,   # Ministral-3B (ERNIE family)
    "vae_z": None,
    "vae_ernie": None,
    "active_loras": ((), ()),
    "model_loras": None,
    "active_unet_key": None,
}


def _is_ernie(model_choice: str) -> bool:
    """Return True when the UI selection is the ERNIE base model."""
    return model_choice == "ERNIE Image Turbo (百度)"


def _require_file(key: str) -> str:
    """Return the discovered filename for *key* or raise a readable error.

    Raises:
        RuntimeError: if no file was discovered for *key* (for example
            because its download failed during startup).
    """
    name = _files.get(key)
    if not name:
        raise RuntimeError(
            f"model file for '{key}' was not found; check the startup download log")
    return name


def _ensure_clip_vae(model_choice: str):
    """Return the cached (text-encoder, VAE) loader results for *model_choice*.

    Each loader runs at most once per family; its raw result is stored in
    ``_state`` and returned as-is (callers index ``[0]``).
    """
    if _is_ernie(model_choice):
        if _state["clip_ernie"] is None:
            print("Loading ERNIE text encoder (Ministral-3B)...")
            _state["clip_ernie"] = _node_classes["CLIPLoader"]().load_clip(
                _require_file("ernie_clip"), "ernie_image")
        if _state["vae_ernie"] is None:
            print("Loading ERNIE VAE (flux2)...")
            _state["vae_ernie"] = _node_classes["VAELoader"]().load_vae(
                _require_file("ernie_vae"))
        return _state["clip_ernie"], _state["vae_ernie"]

    # Z family
    if _state["clip_z"] is None:
        print("Loading Z text encoder (Qwen3)...")
        _state["clip_z"] = _node_classes["CLIPLoader"]().load_clip(
            _require_file("z_clip"), "z_image")
    if _state["vae_z"] is None:
        print("Loading Z VAE...")
        _state["vae_z"] = _node_classes["VAELoader"]().load_vae(_require_file("z_vae"))
    return _state["clip_z"], _state["vae_z"]


def _ensure_unet(model_choice: str):
    """Return the cached UNet loader result for *model_choice*.

    LucidDreamer falls back to the official Z Image Turbo UNet when its file
    was not discovered.
    """
    if _is_ernie(model_choice):
        if _state["unet_ernie"] is None:
            print("Loading ERNIE UNet (bf16)...")
            _state["unet_ernie"] = _node_classes["UNETLoader"]().load_unet(
                _require_file("ernie_unet"), "default")
        return _state["unet_ernie"]

    if model_choice == "LucidDreamer Z v0.6112" and _files["lucid"]:
        if _state["unet_lucid"] is None:
            print("Loading LucidDreamer Z UNet (fp8)...")
            _state["unet_lucid"] = _node_classes["UNETLoader"]().load_unet(
                _files["lucid"], "fp8_e4m3fn")
        return _state["unet_lucid"]

    if _state["unet_official"] is None:
        print("Loading Z Image Turbo UNet (bf16)...")
        _state["unet_official"] = _node_classes["UNETLoader"]().load_unet(
            _require_file("z_unet"), "default")
    return _state["unet_official"]


def _apply_loras(model, clip, slots):
    """Apply LoRAs to *model* / *clip* and cache the patched pair.

    Args:
        model: Base diffusion model.
        clip: Base text encoder.
        slots: list of (filename, weight) tuples to apply in order.

    Returns:
        The patched ``(model, clip)`` pair. The cache is keyed on *slots*
        only; ``generate`` clears it whenever the base model changes.
    """
    key = tuple((fn, round(float(w), 3)) for fn, w in slots)
    if key == _state["active_loras"] and _state["model_loras"] is not None:
        return _state["model_loras"]

    LoraLoader = _node_classes["LoraLoader"]
    cur_model, cur_clip = model, clip
    for fn, w in slots:
        if fn:
            cur_model, cur_clip = LoraLoader().load_lora(
                cur_model, cur_clip, fn, float(w), float(w))
    _state["active_loras"] = key
    _state["model_loras"] = (cur_model, cur_clip)
    return cur_model, cur_clip


# ---------------------------------------------------------------------------
# Resolution helpers
# ---------------------------------------------------------------------------
RES_CHOICES = {
    "1024": [
        "1024x1024 ( 1:1 )", "1152x896 ( 9:7 )", "896x1152 ( 7:9 )",
        "1152x864 ( 4:3 )", "864x1152 ( 3:4 )", "1248x832 ( 3:2 )",
        "832x1248 ( 2:3 )", "1280x720 ( 16:9 )", "720x1280 ( 9:16 )",
        "1344x576 ( 21:9 )", "576x1344 ( 9:21 )",
    ],
    "1280": [
        "1280x1280 ( 1:1 )", "1440x1120 ( 9:7 )", "1120x1440 ( 7:9 )",
        "1472x1104 ( 4:3 )", "1104x1472 ( 3:4 )", "1536x1024 ( 3:2 )",
        "1024x1536 ( 2:3 )", "1536x864 ( 16:9 )", "864x1536 ( 9:16 )",
        "1680x720 ( 21:9 )", "720x1680 ( 9:21 )",
    ],
    "1536": [
        "1536x1536 ( 1:1 )", "1728x1344 ( 9:7 )", "1344x1728 ( 7:9 )",
        "1728x1296 ( 4:3 )", "1296x1728 ( 3:4 )", "1872x1248 ( 3:2 )",
        "1248x1872 ( 2:3 )", "2048x1152 ( 16:9 )", "1152x2048 ( 9:16 )",
        "2016x864 ( 21:9 )", "864x2016 ( 9:21 )",
    ],
}
RESOLUTION_SET = [r for rs in RES_CHOICES.values() for r in rs]


def get_wh(resolution: str):
    """Parse ``"<W>x<H> ..."`` into ``(W, H)``; fall back to ``(1024, 1024)``."""
    m = re.search(r"(\d+)\s*[×x]\s*(\d+)", resolution)
    return (int(m.group(1)), int(m.group(2))) if m else (1024, 1024)


# Run heavy setup BEFORE the Gradio Blocks definition. HF gives ~30 min for
# APP_STARTING; install + 27GB download + ComfyUI import fits in that.
try:
    _setup_environment_eagerly()
except Exception:
    import traceback
    print("EAGER SETUP FAILED:\n" + traceback.format_exc())


# ---------------------------------------------------------------------------
# Generate
# ---------------------------------------------------------------------------
def _resolve_seed(seed, random_seed) -> int:
    """Return the sampling seed.

    A fresh random seed is drawn when *random_seed* is set, when *seed* is
    ``-1``, or when the seed field was left empty (``None``).
    """
    if not random_seed and seed is not None and int(seed) != -1:
        return int(seed)
    return random.randint(1, 1_000_000)


def _load_init_image(init_image):
    """Return *init_image* as an RGB PIL image with sides floored to multiples of 16.

    Objects exposing ``save`` are treated as PIL images; anything else is
    opened as a file path.
    """
    pil = init_image if hasattr(init_image, "save") else Image.open(str(init_image))
    pil = pil.convert("RGB")
    # NOTE(review): the multiple-of-16 snap presumably matches the VAE's
    # latent grid — confirm against the models in use.
    iw = max(16, (pil.width // 16) * 16)
    ih = max(16, (pil.height // 16) * 16)
    if (iw, ih) != (pil.width, pil.height):
        pil = pil.resize((iw, ih), Image.LANCZOS)
    return pil


@spaces.GPU(duration=120)
def generate(
    prompt,
    resolution="1024x1024 ( 1:1 )",
    seed=42,
    steps=8,
    shift=3.0,
    guidance_scale=1.0,
    random_seed=True,
    model_choice="Z Image Turbo (Official)",
    enable_zl1=False, zl1_scale=0.8,
    enable_zl2=False, zl2_scale=0.8,
    enable_el1=False, el1_scale=0.8,
    enable_el2=False, el2_scale=0.8,
    enable_zl3=False, zl3_scale=0.8,
    enable_zl4=False, zl4_scale=0.8,
    init_image=None,   # img2img reference image (PIL Image / file path / None)
    i2i_denoise=1.0,   # img2img denoise strength (0 = keep the input, 1 = pure text-to-image)
    gallery_images=None,
    progress=gr.Progress(track_tqdm=True),
):
    """Generate one image and prepend it to the gallery.

    Args:
        prompt: Positive prompt; ``None`` is treated as an empty string.
        resolution: ``"<W>x<H> ..."`` choice; ignored for img2img, where the
            init image determines the size.
        seed, random_seed: See ``_resolve_seed``.
        steps, guidance_scale: Passed to the KSampler node (euler / beta).
        shift: Accepted for UI compatibility; not used.
        model_choice: Base model label from the UI radio.
        enable_*/..._scale: LoRA toggles and weights; only the slots of the
            selected model family are applied.
        init_image, i2i_denoise: Optional img2img input and its denoise value.
        gallery_images: Current gallery contents (may be ``None``).
        progress: Gradio progress tracker.

    Returns:
        ``(gallery, seed_as_str, seed_as_int)``.

    Raises:
        gr.Error: wraps any failure; the traceback is printed to the log.
    """
    import torch
    try:
        import numpy as np

        _ensure_environment()
        new_seed = _resolve_seed(seed, random_seed)
        w, h = get_wh(resolution)
        torch.set_grad_enabled(False)

        is_ernie = _is_ernie(model_choice)
        clip_tup, vae_tup = _ensure_clip_vae(model_choice)
        unet = _ensure_unet(model_choice)
        model = unet[0]
        clip = clip_tup[0]
        vae = vae_tup[0]

        # Reset lora cache when base model identity changes
        active_unet_key = (
            "ernie" if is_ernie
            else "lucid" if model_choice == "LucidDreamer Z v0.6112"
            else "official"
        )
        if _state.get("active_unet_key") != active_unet_key:
            _state["active_loras"] = ((), ())
            _state["model_loras"] = None
            _state["active_unet_key"] = active_unet_key

        # Build active LoRA list filtered by base model family
        z_lora_slots = [
            (enable_zl1, zl1_scale, "lora1"),
            (enable_zl2, zl2_scale, "lora2"),
            (enable_zl3, zl3_scale, "lora3"),
            (enable_zl4, zl4_scale, "lora4"),
        ]
        ernie_lora_slots = [
            (enable_el1, el1_scale, "elora1"),
            (enable_el2, el2_scale, "elora2"),
        ]
        active_slots = [
            (_files[key], float(scale))
            for en, scale, key in (ernie_lora_slots if is_ernie else z_lora_slots)
            if en and _files[key]
        ]
        model, clip = _apply_loras(model, clip, active_slots)

        CLIPTextEncode = _node_classes["CLIPTextEncode"]
        EmptyLatentImage = _node_classes["EmptyLatentImage"]
        KSampler = _node_classes["KSampler"]
        VAEDecode = _node_classes["VAEDecode"]
        VAEEncode = _node_classes["VAEEncode"]

        # Fixed negative prompt
        FIXED_NEG = ("worst quality, low quality, bad anatomy, bad hands, text, error, "
                     "missing fingers, extra digit, fewer digits, cropped, jpeg artifacts, "
                     "signature, watermark, username, blurry")
        pos = CLIPTextEncode().encode(clip, prompt or "")[0]
        neg = CLIPTextEncode().encode(clip, FIXED_NEG)[0]

        # img2img: VAE-encode the init image as the starting latent
        ks_denoise = 1.0
        if init_image is not None:
            pil = _load_init_image(init_image)
            iw, ih = pil.width, pil.height
            arr = np.asarray(pil).astype(np.float32) / 255.0
            tensor = torch.from_numpy(arr)[None, ...]  # [1, H, W, 3]
            latent = VAEEncode().encode(vae, tensor)[0]
            ks_denoise = float(i2i_denoise)
            print(f"img2img: init {iw}x{ih}, denoise={ks_denoise}")
        else:
            latent = EmptyLatentImage().generate(int(w), int(h), 1)[0]

        samples = KSampler().sample(
            model, int(new_seed), int(steps), float(guidance_scale),
            "euler", "beta", pos, neg, latent, denoise=ks_denoise,
        )[0]
        images = VAEDecode().decode(vae, samples)[0]

        arr = (images[0].detach().clamp(0, 1).cpu().float().numpy() * 255).astype(np.uint8)
        img = Image.fromarray(arr)
        gallery_images = [img] + (gallery_images or [])
        return gallery_images, str(new_seed), int(new_seed)
    except Exception as e:
        import traceback
        print("GENERATE ERROR:\n" + traceback.format_exc())
        raise gr.Error(f"生成失败: {type(e).__name__}: {e}") from e


# ---------------------------------------------------------------------------
# UI
# ---------------------------------------------------------------------------
with gr.Blocks(title="ZZZ Image Studio (ComfyUI in-process)") as demo:
    gr.Markdown("# ZZZ Image Studio · ComfyUI in-process backend")
    with gr.Row():
        with gr.Column(scale=1):
            prompt_input = gr.Textbox(label="Prompt", lines=3,
                                      placeholder="描述你想生成的画面(中英文均可)…")
            model_choice = gr.Radio(
                choices=["Z Image Turbo (Official)", "LucidDreamer Z v0.6112",
                         "ERNIE Image Turbo (百度)"],
                value="Z Image Turbo (Official)", label="Base Model")
            with gr.Accordion("Z Image LoRAs", open=True):
                with gr.Row():
                    enable_zl1 = gr.Checkbox(label="Niji-listic [ZIT]", value=False)
                    zl1_scale = gr.Slider(0.0, 1.5, value=0.8, step=0.05, label="Weight")
                with gr.Row():
                    enable_zl2 = gr.Checkbox(label="Cyaniji [ZIT]", value=False)
                    zl2_scale = gr.Slider(0.0, 1.5, value=0.8, step=0.05, label="Weight")
            # Hidden controls: they only exist so generate() receives a value
            # for every LoRA slot in its signature.
            enable_el1 = gr.Checkbox(visible=False, value=False)
            el1_scale = gr.Slider(visible=False, value=0.8)
            enable_el2 = gr.Checkbox(visible=False, value=False)
            el2_scale = gr.Slider(visible=False, value=0.8)
            enable_zl3 = gr.Checkbox(visible=False, value=False)
            zl3_scale = gr.Slider(visible=False, value=0.8)
            enable_zl4 = gr.Checkbox(visible=False, value=False)
            zl4_scale = gr.Slider(visible=False, value=0.8)
            with gr.Accordion("图生图(可选)", open=False):
                init_image = gr.Image(label="参考图(不传则做文生图)", type="pil")
                i2i_denoise = gr.Slider(0.0, 1.0, value=0.7, step=0.05,
                                        label="Denoise(越低越保留原图)")
            with gr.Accordion("Resolution", open=False):
                with gr.Row():
                    res_cat = gr.Dropdown(value=1024, choices=[1024, 1280, 1536],
                                          label="Category")
                    resolution = gr.Dropdown(
                        value=RES_CHOICES["1024"][0], choices=RESOLUTION_SET,
                        label="Width × Height")
            with gr.Accordion("Advanced", open=False):
                with gr.Row():
                    seed = gr.Number(label="Seed", value=42, precision=0)
                    random_seed = gr.Checkbox(label="Random Seed", value=True)
                with gr.Row():
                    steps = gr.Slider(1, 50, value=8, step=1, label="Steps")
                    guidance_scale = gr.Slider(0.0, 10.0, value=1.0, step=0.1,
                                               label="CFG Scale")
                shift = gr.Slider(1.0, 10.0, value=3.0, step=0.1, label="(unused)")
            generate_btn = gr.Button("Generate", variant="primary", size="lg")
        with gr.Column(scale=1):
            output_gallery = gr.Gallery(
                label="Generated Images", columns=2, rows=2, height=620,
                object_fit="contain", format="png", interactive=False)
            used_seed = gr.Textbox(label="Seed Used", interactive=False)

    def update_res_choices(cat):
        """Swap the resolution dropdown to the presets of category *cat*."""
        choices = RES_CHOICES.get(str(cat), RES_CHOICES["1024"])
        return gr.update(value=choices[0], choices=choices)

    res_cat.change(update_res_choices, inputs=res_cat, outputs=resolution,
                   api_visibility="private")

    generate_btn.click(
        generate,
        inputs=[
            prompt_input, resolution, seed, steps, shift, guidance_scale,
            random_seed, model_choice,
            enable_zl1, zl1_scale, enable_zl2, zl2_scale,
            enable_el1, el1_scale, enable_el2, el2_scale,
            enable_zl3, zl3_scale, enable_zl4, zl4_scale,
            init_image, i2i_denoise,
            output_gallery,
        ],
        outputs=[output_gallery, used_seed, seed],
        api_name="generate",
    )

if __name__ == "__main__":
    demo.launch()