File size: 19,829 Bytes
5d0bada
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0e89ded
6d8fb5f
5d0bada
 
 
 
 
 
 
 
0e89ded
5d0bada
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
140a849
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c86ac4c
 
 
 
 
 
 
 
 
140a849
c86ac4c
 
 
 
140a849
 
 
5d0bada
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
af6d26d
 
 
 
 
 
 
 
5d0bada
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84f9e8e
 
 
5d0bada
 
 
 
 
 
84f9e8e
5d0bada
 
 
84f9e8e
5d0bada
 
 
 
 
 
84f9e8e
5d0bada
 
84f9e8e
5d0bada
 
 
 
 
 
 
 
 
 
 
 
f8b8261
5d0bada
f8b8261
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5d0bada
 
 
 
 
84f9e8e
c4a367a
 
84f9e8e
c4a367a
5cc4c4c
 
 
 
 
 
5d0bada
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0e89ded
 
 
 
 
 
 
 
 
 
 
6d8fb5f
 
 
 
 
 
 
0e89ded
 
 
 
 
 
 
 
 
 
 
 
884da37
 
 
 
 
 
 
 
 
 
 
 
5d0bada
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
884da37
5d0bada
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0e89ded
 
 
 
 
 
 
6d8fb5f
 
 
 
 
 
0e89ded
 
 
884da37
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
"""
pipeline_manager.py
-------------------
Loads diffusion pipelines from an editable registry (models.json) and runs
generation across multiple base families (SD1.5 / SDXL / FLUX) and multiple
input modes (txt2img / img2img / IP-Adapter / Face identity / ControlNet
OpenPose pose lock).

Designed for Hugging Face ZeroGPU: pipelines are built/cached on CPU and moved
to CUDA inside the @spaces.GPU-decorated caller (see app.py). Nothing here calls
.cuda() at import time.
"""

import os
import json
import gc
import hashlib
import urllib.request
from pathlib import Path

import torch

# ---------------------------------------------------------------------------
# Constants / paths
# ---------------------------------------------------------------------------
# Directory containing this module; the registry file is resolved next to it.
HERE = Path(__file__).parent
REGISTRY_PATH = HERE / "models.json"
# Local cache directory for downloaded weights. Override with the CS_CACHE_DIR
# environment variable. NOTE: the directory is created at import time.
DOWNLOAD_DIR = Path(os.environ.get("CS_CACHE_DIR", "/tmp/cs_models"))
DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)

# Optional credentials read from the environment. An unset or blank
# CIVITAI_TOKEN is the empty string; an unset or blank HF_TOKEN becomes None.
CIVITAI_TOKEN = os.environ.get("CIVITAI_TOKEN", "").strip()
HF_TOKEN = os.environ.get("HF_TOKEN", "").strip() or None

# Dtype used when loading FLUX pipelines: bfloat16 when CUDA is available,
# float32 otherwise.
DTYPE = torch.bfloat16 if torch.cuda.is_available() else torch.float32
# SD1.5 / SDXL are most stable in float16; FLUX prefers bfloat16.
# DTYPE_SD is also used for the translators, ControlNet and FaceID embeddings.
DTYPE_SD = torch.float16

# Device that pipelines and translators are moved to at generation time.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Modes supported per base family. Used by the UI to gate options.
# run_generation() also rejects any mode that is not listed for the model's base.
SUPPORTED_MODES = {
    "sd15": ["txt2img", "img2img", "ip_adapter", "face_id", "pose"],
    "sdxl": ["txt2img", "img2img", "ip_adapter", "face_id", "pose"],
    "flux": ["txt2img", "img2img"],
}

# Human-readable label for each mode key; run_generation() uses these in its
# "unsupported mode" error message.
MODE_LABELS = {
    "txt2img": "Text → Image",
    "img2img": "Image → Image (denoise)",
    "ip_adapter": "IP-Adapter (style / subject)",
    "face_id": "Face identity (FaceID)",
    "pose": "Pose lock (ControlNet OpenPose)",
}

# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
def load_registry():
    """Parse models.json and return only the model entries that are enabled.

    An entry counts as enabled unless it carries a falsy "enabled" value. A
    registry without a "models" key yields an empty list.
    """
    with open(REGISTRY_PATH, "r", encoding="utf-8") as handle:
        registry = json.load(handle)
    enabled = []
    for entry in registry.get("models", []):
        if entry.get("enabled", True):
            enabled.append(entry)
    return enabled


def get_model(models, model_id):
    """Return the first config in `models` whose "id" equals `model_id`, or None."""
    return next((entry for entry in models if entry["id"] == model_id), None)


# ---------------------------------------------------------------------------
# Thai → English prompt translation (the SD/SDXL/FLUX text encoders are English;
# Thai prompts otherwise produce unrelated images). Runs on the Space, no API key.
# ---------------------------------------------------------------------------
# Translation engine key -> Hugging Face model name handed to from_pretrained()
# by _load_translator().
TRANSLATORS = {
    "nllb": "facebook/nllb-200-distilled-600M",
    "typhoon": "scb10x/llama3.2-typhoon2-3b-instruct",
}
# Engine key -> (tokenizer, model); filled lazily by _load_translator().
_TRANSLATOR_CACHE = {}


def has_thai(text):
    """Return True when `text` holds at least one character inside the range
    checked below (the Thai script range); None or empty input gives False."""
    for ch in text or "":
        if "฀" <= ch <= "๿":
            return True
    return False


def _load_translator(engine):
    """Return the (tokenizer, model) pair for `engine`, loading it on first use.

    "nllb" is loaded as a seq2seq model; any other key present in TRANSLATORS
    is loaded as a causal LM. The model is loaded with DTYPE_SD, switched to
    eval mode, and memoised in _TRANSLATOR_CACHE. An engine missing from
    TRANSLATORS raises KeyError.
    """
    if engine not in _TRANSLATOR_CACHE:
        model_name = TRANSLATORS[engine]
        from transformers import AutoTokenizer
        if engine == "nllb":
            from transformers import AutoModelForSeq2SeqLM as model_cls
        else:  # typhoon (causal LM)
            from transformers import AutoModelForCausalLM as model_cls
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        translator = model_cls.from_pretrained(model_name, torch_dtype=DTYPE_SD)
        translator.eval()
        _TRANSLATOR_CACHE[engine] = (tokenizer, translator)
    return _TRANSLATOR_CACHE[engine]


def translate_prompt(text, engine):
    """Translate a Thai prompt to English, or hand the input back unchanged.

    The text is returned as-is when it is empty, when `engine` is None/"off",
    or when it contains no Thai characters. Any failure while loading or
    running the translator is logged (with traceback) and the original text
    is returned instead of raising.
    MUST be called inside the @spaces.GPU context (uses CUDA when available).
    """
    if not text:
        return text
    if engine in (None, "off"):
        return text
    if not has_thai(text):
        return text

    try:
        tokenizer, translator = _load_translator(engine)
        translator = translator.to(DEVICE)

        if engine == "nllb":
            # NLLB is steered by language codes: Thai source, English target.
            tokenizer.src_lang = "tha_Thai"
            encoded = tokenizer(
                text, return_tensors="pt", truncation=True, max_length=400
            ).to(DEVICE)
            target_lang_id = tokenizer.convert_tokens_to_ids("eng_Latn")
            generated = translator.generate(
                **encoded,
                forced_bos_token_id=target_lang_id,
                max_new_tokens=256,
                num_beams=4,
            )
            decoded = tokenizer.batch_decode(generated, skip_special_tokens=True)
            return decoded[0].strip()

        # typhoon: ask the LLM to rewrite as a clean English image prompt
        system_msg = {
            "role": "system",
            "content": "You convert Thai text-to-image prompts "
            "into a single concise, vivid English prompt for Stable Diffusion. "
            "Keep the described subject, clothing, pose, and scene. Output ONLY the "
            "English prompt as a comma-separated phrase — no quotes, no explanation.",
        }
        user_msg = {"role": "user", "content": text}
        chat_text = tokenizer.apply_chat_template(
            [system_msg, user_msg], add_generation_prompt=True, tokenize=False
        )
        encoded = tokenizer(chat_text, return_tensors="pt").to(DEVICE)

        # eos_token_id may be a single id or a list of ids; pad with the first.
        eos_id = tokenizer.eos_token_id
        if isinstance(eos_id, (list, tuple)):
            pad_id = eos_id[0]
        else:
            pad_id = eos_id

        with torch.no_grad():
            generated = translator.generate(
                **encoded, max_new_tokens=256, do_sample=False, pad_token_id=pad_id
            )
        # Keep only the tokens produced after the prompt.
        prompt_len = encoded["input_ids"].shape[1]
        completion = generated[0][prompt_len:]
        english = tokenizer.decode(completion, skip_special_tokens=True)
        return english.strip().strip('"')
    except Exception as err:  # noqa
        import traceback as _tb
        print(f"[translate] {engine} failed, using original text: "
              f"{type(err).__name__}: {err}")
        _tb.print_exc()
        return text


# ---------------------------------------------------------------------------
# Download helpers (Civitai / arbitrary URL → local cache)
# ---------------------------------------------------------------------------
def _download_url(url):
    """Download a (Civitai or other) URL to the local cache and return the path.

    The cache file name is derived from the SHA-1 of `url`, so the same URL
    always maps to the same file inside DOWNLOAD_DIR. A cached file of at
    least 1,000,000 bytes is reused without touching the network.

    The payload is streamed into a sibling ``.part`` file and only renamed to
    its final name once it is complete and large enough. An interrupted
    download therefore can never be mistaken for a valid cached model on the
    next call.

    Args:
        url: Source URL. A falsy value returns None. For civitai.com URLs the
            CIVITAI_TOKEN is appended as a query parameter when it is set and
            the URL does not already carry ``token=``.

    Returns:
        The local file path as a string, or None when `url` is falsy.

    Raises:
        ValueError: the downloaded payload is smaller than 1,000,000 bytes
            (Civitai returned a login/redirect page instead of a model).
        Any exception raised by urllib / file I/O propagates unchanged.
    """
    if not url:
        return None
    # A real model is many MB; anything smaller is treated as a failed download.
    min_bytes = 1_000_000
    fname = hashlib.sha1(url.encode()).hexdigest()[:16] + ".safetensors"
    dest = DOWNLOAD_DIR / fname
    if dest.exists() and dest.stat().st_size >= min_bytes:
        return str(dest)

    dl_url = url
    if "civitai.com" in url and CIVITAI_TOKEN and "token=" not in url:
        sep = "&" if "?" in url else "?"
        dl_url = f"{url}{sep}token={CIVITAI_TOKEN}"

    req = urllib.request.Request(dl_url, headers={"User-Agent": "Mozilla/5.0"})
    # Log the original URL, not dl_url, so the token never reaches the logs.
    print(f"[download] {url} -> {dest}")
    part = dest.with_name(dest.name + ".part")
    try:
        with urllib.request.urlopen(req) as resp, open(part, "wb") as out:
            while chunk := resp.read(1 << 20):
                out.write(chunk)
        # A tiny file means Civitai returned a login/redirect page.
        if part.stat().st_size < min_bytes:
            raise ValueError(
                "ดาวน์โหลดโมเดลจาก Civitai ไม่สำเร็จ — โมเดลนี้ต้องตั้งค่า CIVITAI_TOKEN "
                "ใน Space Settings → Variables and secrets ก่อน / Civitai download failed: "
                "set CIVITAI_TOKEN in the Space secrets to use this model."
            )
        # Publish the finished file under its final name in one step.
        os.replace(part, dest)
    finally:
        # Removes a partial or rejected download; no-op after a successful rename.
        part.unlink(missing_ok=True)
    return str(dest)


# ---------------------------------------------------------------------------
# Pipeline cache
# ---------------------------------------------------------------------------
# Keyed by model id. Stores the base txt2img pipeline as built by
# _build_base_pipeline(). Adapters are loaded on demand and tracked via the
# `_cs_adapter` attribute on the pipe.
# get_pipeline() empties this cache before building a new entry, so it holds at
# most one pipeline at a time.
_PIPE_CACHE = {}
_FACE_APP = None  # lazy insightface FaceAnalysis (created by _get_face_app())


def _free_cache(keep_id=None):
    """Drop every cached pipeline except the one stored under `keep_id`.

    With the default keep_id=None the whole cache is emptied. Afterwards a
    garbage-collection pass runs and, when CUDA is available, the CUDA
    allocator cache is released as well.
    """
    stale_ids = [model_id for model_id in _PIPE_CACHE if model_id != keep_id]
    for model_id in stale_ids:
        del _PIPE_CACHE[model_id]
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()


def _build_base_pipeline(cfg):
    """Construct the txt2img pipeline for a model config.

    No device move happens here; the caller is responsible for `.to(DEVICE)`.

    Config keys read from `cfg`:
        base: required; "sd15", "sdxl" or "flux".
        dtype: "fp32" loads SD1.5 / SDXL weights in float32 instead of DTYPE_SD
            (FLUX always uses DTYPE).
        single_file_url: SD1.5 / SDXL only; checkpoint URL fetched through
            _download_url() and loaded with from_single_file().
        repo_id: Hugging Face repo used when no single_file_url is given
            (always used for FLUX).
        type: "lora" enables the LoRA section below.
        lora_repo_id / lora_weight_name / lora_url: where the LoRA comes from.
        lora_scale: fuse strength, default 0.8.
        vae: optional repo of a replacement AutoencoderKL.

    Returns:
        The pipeline, with its progress bar disabled and `_cs_adapter` set to None.

    Raises:
        ValueError: `base` is not one of the known families.
    """
    base = cfg["base"]
    common = dict(token=HF_TOKEN)
    # Some checkpoint merges overflow to NaN in fp16 (rainbow-noise output);
    # such models set "dtype": "fp32" in the registry.
    dt = torch.float32 if cfg.get("dtype") == "fp32" else DTYPE_SD

    if base == "sd15":
        from diffusers import StableDiffusionPipeline
        if cfg.get("single_file_url"):
            local = _download_url(cfg["single_file_url"])
            pipe = StableDiffusionPipeline.from_single_file(
                local, torch_dtype=dt, safety_checker=None
            )
        else:
            pipe = StableDiffusionPipeline.from_pretrained(
                cfg["repo_id"], torch_dtype=dt, safety_checker=None, **common
            )

    elif base == "sdxl":
        from diffusers import StableDiffusionXLPipeline
        if cfg.get("single_file_url"):
            local = _download_url(cfg["single_file_url"])
            pipe = StableDiffusionXLPipeline.from_single_file(local, torch_dtype=dt)
        else:
            pipe = StableDiffusionXLPipeline.from_pretrained(
                cfg["repo_id"], torch_dtype=dt, **common
            )

    elif base == "flux":
        from diffusers import FluxPipeline
        # FLUX ignores the per-model "dtype" override and always loads in DTYPE.
        pipe = FluxPipeline.from_pretrained(cfg["repo_id"], torch_dtype=DTYPE, **common)

    else:
        raise ValueError(f"Unknown base family: {base}")

    # Apply LoRA if this entry is a LoRA model.
    if cfg.get("type") == "lora":
        scale = float(cfg.get("lora_scale", 0.8))
        # Resolve to a local .safetensors path (HF repo or direct/Civitai URL).
        if cfg.get("lora_repo_id"):
            from huggingface_hub import hf_hub_download
            local = hf_hub_download(cfg["lora_repo_id"], cfg["lora_weight_name"]) \
                if cfg.get("lora_weight_name") else None
            if local is None:
                # No explicit weight file: load straight from the repo id.
                # "__loaded__" is a sentinel telling the next step to skip loading.
                pipe.load_lora_weights(cfg["lora_repo_id"])
                local = "__loaded__"
        else:
            # _download_url() returns None when "lora_url" is missing or empty.
            local = _download_url(cfg.get("lora_url"))

        if local and local != "__loaded__":
            try:
                pipe.load_lora_weights(local)
            except Exception as e:  # noqa
                # Some Civitai/kohya LoRAs carry text-encoder keys diffusers can't
                # convert ("list index out of range"). Retry with UNet-only keys —
                # the UNet holds most of the character/style effect.
                print(f"[lora] full load failed ({e}); retrying UNet-only")
                from safetensors.torch import load_file
                sd = load_file(local)
                sd = {k: v for k, v in sd.items() if not k.startswith("lora_te")}
                pipe.load_lora_weights(sd)
        # Fusing is attempted for every LoRA entry; a failure is logged and the
        # pipeline is returned unfused.
        try:
            pipe.fuse_lora(lora_scale=scale)
        except Exception as e:  # noqa
            print(f"[lora] fuse skipped: {e}")

    # Optional VAE override (known-good VAE for models with a broken one).
    if cfg.get("vae"):
        from diffusers import AutoencoderKL
        pipe.vae = AutoencoderKL.from_pretrained(cfg["vae"], torch_dtype=dt)

    # SD1.5 / SDXL community checkpoints are tuned for the Euler Ancestral sampler;
    # it matches the look people get in A1111 / ComfyUI far better than the default.
    if base in ("sd15", "sdxl"):
        from diffusers import EulerAncestralDiscreteScheduler
        pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config)

    pipe.set_progress_bar_config(disable=True)
    pipe._cs_adapter = None  # track loaded IP-Adapter / FaceID state
    return pipe


def get_pipeline(cfg):
    """Return the base pipeline for `cfg`, building and caching it on a miss.

    On a cache miss every other cached pipeline is evicted first, so only one
    pipeline is held at a time.
    """
    model_id = cfg["id"]
    if model_id in _PIPE_CACHE:
        return _PIPE_CACHE[model_id]

    _free_cache(keep_id=None)  # one big model at a time on ZeroGPU
    print(f"[pipeline] building {model_id} ({cfg['base']})")
    built = _build_base_pipeline(cfg)
    _PIPE_CACHE[model_id] = built
    return built


# ---------------------------------------------------------------------------
# Adapter management (IP-Adapter / FaceID)
# ---------------------------------------------------------------------------
# base family -> adapter mode -> arguments for pipe.load_ip_adapter().
# "repo" is passed positionally; every other key is forwarded as a keyword.
_IP_ADAPTER_SPECS = {
    "sd15": {
        "ip_adapter": {
            "repo": "h94/IP-Adapter",
            "subfolder": "models",
            "weight_name": "ip-adapter-plus_sd15.bin",
        },
        "face_id": {
            "repo": "h94/IP-Adapter-FaceID",
            "subfolder": None,
            "weight_name": "ip-adapter-faceid_sd15.bin",
            "image_encoder_folder": None,
        },
    },
    "sdxl": {
        "ip_adapter": {
            "repo": "h94/IP-Adapter",
            "subfolder": "sdxl_models",
            "weight_name": "ip-adapter-plus_sdxl_vit-h.bin",
        },
        "face_id": {
            "repo": "h94/IP-Adapter-FaceID",
            "subfolder": None,
            "weight_name": "ip-adapter-faceid_sdxl.bin",
            "image_encoder_folder": None,
        },
    },
}


def _ensure_adapter(pipe, base, mode):
    """Load the right IP-Adapter for `mode`, unloading any previous one.

    The adapter currently attached to the pipeline is tracked in
    `pipe._cs_adapter` ("ip_adapter", "face_id" or None). Nothing happens when
    that already matches what `mode` needs.

    Args:
        pipe: pipeline exposing `_cs_adapter`, `unload_ip_adapter()` and
            `load_ip_adapter()`.
        base: base family key used to look up _IP_ADAPTER_SPECS.
        mode: generation mode; anything other than "ip_adapter" / "face_id"
            means "no adapter wanted".

    Raises:
        KeyError: an adapter is wanted but `base` has no entry in
            _IP_ADAPTER_SPECS.
        Any exception raised by `pipe.load_ip_adapter()` propagates; in that
        case `_cs_adapter` stays None.
    """
    want = mode if mode in ("ip_adapter", "face_id") else None
    if pipe._cs_adapter == want:
        return
    try:
        pipe.unload_ip_adapter()
    except Exception as e:  # noqa
        # Unloading is best-effort and must not block the run, but the reason
        # is reported instead of being silently discarded.
        print(f"[adapter] unload skipped: {type(e).__name__}: {e}")
    # Mark as "nothing loaded" before loading, so a failed load below does not
    # leave a stale state behind.
    pipe._cs_adapter = None
    if want is None:
        return
    spec = _IP_ADAPTER_SPECS[base][want]
    kwargs = {k: v for k, v in spec.items() if k != "repo"}
    pipe.load_ip_adapter(spec["repo"], **kwargs)
    pipe._cs_adapter = want


def _get_face_app():
    """Return the shared insightface FaceAnalysis instance, creating it on first use.

    The instance is built with the "buffalo_l" model pack, CUDA then CPU as
    execution providers, and a 640x640 detection size, then stored in the
    module-level _FACE_APP.
    """
    global _FACE_APP
    if _FACE_APP is not None:
        return _FACE_APP

    from insightface.app import FaceAnalysis
    analyzer = FaceAnalysis(
        name="buffalo_l",
        providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
    )
    analyzer.prepare(ctx_id=0, det_size=(640, 640))
    _FACE_APP = analyzer
    return _FACE_APP


def _face_embeds(image):
    """Build the FaceID embedding tensor for the largest face found in `image`.

    The image is converted to RGB, then to a BGR array for the face analyser.
    The detected face with the largest bounding-box area is selected and its
    normalised embedding is stacked behind an all-zero copy. The result is
    cast to DTYPE_SD.

    Raises:
        ValueError: no face was detected.
    """
    import numpy as np
    import cv2

    def _bbox_area(face):
        box = face.bbox
        return (box[2] - box[0]) * (box[3] - box[1])

    analyzer = _get_face_app()
    rgb = np.array(image.convert("RGB"))
    bgr = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
    detected = analyzer.get(bgr)
    if not detected:
        raise ValueError("ไม่พบใบหน้าในรูปต้นแบบ / No face detected in the reference image.")

    largest = sorted(detected, key=_bbox_area)[-1]
    # diffusers IP-Adapter-FaceID expects [2, 1, 1, 512]: [neg, pos] for CFG.
    positive = torch.from_numpy(largest.normed_embedding)[None, None, None]
    negative = torch.zeros_like(positive)
    stacked = torch.cat([negative, positive], dim=0)
    return stacked.to(DTYPE_SD)


# ---------------------------------------------------------------------------
# Generation
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# ControlNet (OpenPose) — locks the generated subject to an uploaded pose.
# ---------------------------------------------------------------------------
# Base family -> cached ControlNetModel; filled lazily by _get_controlnet().
_CONTROLNET = {}
# Lazily created OpenposeDetector; see _get_openpose().
_OPENPOSE = None


def _get_controlnet(base):
    """Return the OpenPose ControlNet for `base`, loading and caching it on first use.

    Only "sd15" and "sdxl" have a ControlNet repo; any other base raises
    ValueError. The model is loaded with DTYPE_SD and stored in _CONTROLNET.
    """
    if base not in _CONTROLNET:
        from diffusers import ControlNetModel
        repo = {
            "sd15": "lllyasviel/control_v11p_sd15_openpose",
            "sdxl": "xinsir/controlnet-openpose-sdxl-1.0",
        }.get(base)
        if repo is None:
            raise ValueError("Pose (ControlNet) รองรับ SD1.5 / SDXL เท่านั้น.")
        _CONTROLNET[base] = ControlNetModel.from_pretrained(repo, torch_dtype=DTYPE_SD)
    return _CONTROLNET[base]


def _get_openpose():
    """Return the shared OpenposeDetector, creating it from
    "lllyasviel/Annotators" on first use and caching it in _OPENPOSE."""
    global _OPENPOSE
    if _OPENPOSE is not None:
        return _OPENPOSE

    from controlnet_aux import OpenposeDetector
    _OPENPOSE = OpenposeDetector.from_pretrained("lllyasviel/Annotators")
    return _OPENPOSE


def _safe_call(pipe_obj, call):
    """Invoke `pipe_obj(**call)` and return the first image of the result.

    If the call (or reading `.images[0]`) raises AttributeError or TypeError
    while `call` contains "clip_skip", that key is removed from `call` in
    place and the pipeline is run once more. Without a "clip_skip" key the
    error is re-raised.
    """
    try:
        output = pipe_obj(**call)
        return output.images[0]
    except (AttributeError, TypeError) as err:
        if "clip_skip" not in call:
            raise
        print(f"[clip_skip] disabled for this run due to: {err}")
        del call["clip_skip"]
        retry_output = pipe_obj(**call)
        return retry_output.images[0]


def run_generation(cfg, mode, prompt, negative_prompt, ref_image,
                   steps, guidance, denoise, ip_scale, width, height, seed):
    """Run one generation and return the first image of the pipeline output.

    MUST be called inside a @spaces.GPU context: the pipeline (and any derived
    img2img / ControlNet pipeline) is moved to DEVICE here.

    Args:
        cfg: model config; reads "base" and the optional "trigger" (prepended
            to the prompt). get_pipeline() additionally reads "id".
        mode: one of the keys listed in SUPPORTED_MODES for the model's base.
        prompt: positive prompt. None is treated as empty when a trigger is
            configured, so the text "None" never reaches the prompt.
        negative_prompt: negative prompt; not passed to FLUX.
        ref_image: reference image, required by every mode except txt2img.
            It must offer `.convert("RGB")` (presumably a PIL image — confirm
            against the caller).
        steps: number of inference steps.
        guidance: guidance scale.
        denoise: img2img strength.
        ip_scale: IP-Adapter / FaceID scale.
        width, height: output size; not passed in img2img mode, and used to
            resize the detected pose map in pose mode.
        seed: None or a negative value leaves the generator unseeded.

    Returns:
        `.images[0]` of whichever pipeline ran.

    Raises:
        ValueError: the mode is unsupported for this base, a required
            reference image is missing, or no face is found (face_id mode).
    """
    base = cfg["base"]
    if mode not in SUPPORTED_MODES[base]:
        raise ValueError(
            f"โหมด '{MODE_LABELS.get(mode, mode)}' ใช้กับ base {base.upper()} ไม่ได้ "
            f"(รองรับ: {', '.join(MODE_LABELS[m] for m in SUPPORTED_MODES[base])})"
        )

    pipe = get_pipeline(cfg)
    pipe = pipe.to(DEVICE)

    generator = None
    if seed is not None and int(seed) >= 0:
        generator = torch.Generator(device=DEVICE).manual_seed(int(seed))

    full_prompt = prompt
    if cfg.get("trigger"):
        # `prompt or ''` keeps a missing prompt from being rendered as "None";
        # strip() then removes the dangling ", " separator.
        full_prompt = f"{cfg['trigger']}, {prompt or ''}".strip(", ")

    call = dict(
        prompt=full_prompt,
        num_inference_steps=int(steps),
        guidance_scale=float(guidance),
        generator=generator,
        width=int(width),
        height=int(height),
    )

    # FLUX uses `guidance_scale` differently and has no negative prompt.
    if base != "flux":
        call["negative_prompt"] = negative_prompt or None

    # ----- mode wiring -----
    if mode == "txt2img":
        _ensure_adapter(pipe, base, None)

    elif mode == "img2img":
        if ref_image is None:
            raise ValueError("img2img ต้องอัปโหลดรูปต้นแบบก่อน / Upload a reference image first.")
        # Safe for FLUX as well: its `_cs_adapter` is always None, so this
        # returns immediately without touching the pipeline.
        _ensure_adapter(pipe, base, None)
        from diffusers import AutoPipelineForImage2Image
        i2i = AutoPipelineForImage2Image.from_pipe(pipe).to(DEVICE)
        # img2img takes its size from the input image.
        del call["width"]
        del call["height"]
        call["image"] = ref_image.convert("RGB")
        call["strength"] = float(denoise)
        return _safe_call(i2i, call)

    elif mode == "ip_adapter":
        if ref_image is None:
            raise ValueError("IP-Adapter ต้องอัปโหลดรูปต้นแบบก่อน / Upload a reference image first.")
        _ensure_adapter(pipe, base, "ip_adapter")
        pipe.set_ip_adapter_scale(float(ip_scale))
        call["ip_adapter_image"] = ref_image.convert("RGB")

    elif mode == "face_id":
        if ref_image is None:
            raise ValueError("Face identity ต้องอัปโหลดรูปใบหน้าก่อน / Upload a face image first.")
        _ensure_adapter(pipe, base, "face_id")
        pipe.set_ip_adapter_scale(float(ip_scale))
        # _face_embeds() always returns DTYPE_SD (float16). Cast to the
        # pipeline's own dtype so models registered with "dtype": "fp32" do not
        # receive half-precision embeddings.
        embeds = _face_embeds(ref_image).to(device=DEVICE, dtype=pipe.dtype)
        call["ip_adapter_image_embeds"] = [embeds]

    elif mode == "pose":
        if ref_image is None:
            raise ValueError("Pose ต้องอัปโหลดรูปท่าทางก่อน / Upload a pose reference image first.")
        _ensure_adapter(pipe, base, None)
        detector = _get_openpose()
        pose_img = detector(ref_image.convert("RGB")).resize((int(width), int(height)))
        cn = _get_controlnet(base).to(DEVICE)
        if base == "sdxl":
            from diffusers import StableDiffusionXLControlNetPipeline
            cn_pipe = StableDiffusionXLControlNetPipeline.from_pipe(pipe, controlnet=cn).to(DEVICE)
        else:
            from diffusers import StableDiffusionControlNetPipeline
            cn_pipe = StableDiffusionControlNetPipeline.from_pipe(pipe, controlnet=cn).to(DEVICE)
        call["image"] = pose_img
        return _safe_call(cn_pipe, call)

    return _safe_call(pipe, call)