""" SYSTMS ACTION — gradio.Server backend with HuggingFace OAuth. Architecture: - gradio.Server (FastAPI-compatible) serves both the @app.api inference endpoint and the static React+Babel frontend in static/. - README.md sets hf_oauth: true, which gives us a /login/huggingface OAuth flow. - After the user signs in, the HF session cookie is set on the .hf.space domain. Same-origin fetches from our frontend automatically forward it, and the spaces package's ZeroGPU scheduler then recognises the request as a logged-in user and bills against their personal quota instead of the tiny anonymous pool. Local dev: set MOCK_MODE=true to skip heavy ML imports. """ import os import re import threading import time import uuid from pathlib import Path from PIL import Image from huggingface_hub import HfApi from gradio import Server from gradio.data_classes import FileData from fastapi import HTTPException, Request from fastapi.responses import HTMLResponse, Response, FileResponse, RedirectResponse # --- Spaces runtime: imported at module top-level so ZeroGPU detects @spaces.GPU. try: import spaces except ImportError: class _SpacesStub: @staticmethod def GPU(*args, **kwargs): def decorator(fn): return fn return decorator spaces = _SpacesStub() app = Server() # OAuth routes (/login/huggingface, /login/callback, /logout) are auto-attached # by gradio.Blocks().launch() but not by gradio.Server. Attach them manually so # the SIGN IN button has somewhere to send the user. try: from gradio.oauth import attach_oauth attach_oauth(app) except Exception as e: print(f"[oauth] attach_oauth failed: {e}", flush=True) # Capture each /gradio_api/* request's headers in a contextvar so the @app.api # handler can introspect them (gradio.Server's @app.api treats every parameter # as an input field, so we can't take `request: Request` as a function arg). 
from contextvars import ContextVar

# Holds the most recent /gradio_api/* request seen by the middleware below;
# None when no such request has been captured in the current context.
_current_request: ContextVar = ContextVar("current_request", default=None)


@app.middleware("http")
async def _capture_request(request, call_next):
    """Stash /gradio_api/* requests in `_current_request`, then continue.

    Requests for any other path are passed through without being recorded.
    """
    if request.url.path.startswith("/gradio_api/"):
        _current_request.set(request)
    return await call_next(request)


STATIC = Path(__file__).parent / "static"

# Session-keyed result store — lets the frontend recover a finished generation
# after a tab-away/screen-lock dropped the websocket. Files are GC'd after TTL.
RESULTS_DIR = Path("/tmp/action_results")
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
RESULT_TTL_SECONDS = 3600
# Anchored with \A...\Z rather than ^...$: `$` also matches just before a
# trailing newline, which would let "abcdefgh\n" through and into a filename.
_SESSION_ID_RE = re.compile(r"\A[A-Za-z0-9_-]{8,64}\Z")


def _cleanup_old_results() -> None:
    """Delete stored result PNGs older than RESULT_TTL_SECONDS.

    Best-effort: a file that cannot be stat'ed or removed (for example
    because another request deleted it first) is skipped.
    """
    cutoff = time.time() - RESULT_TTL_SECONDS
    for p in RESULTS_DIR.glob("*.png"):
        try:
            if p.stat().st_mtime < cutoff:
                p.unlink(missing_ok=True)
        except OSError:
            # Only filesystem errors are expected here; anything else is a
            # real bug and should surface.
            continue


# --- Config ---
MOCK_MODE = os.environ.get("MOCK_MODE", "false").lower() == "true"
MAX_INPUT_SIZE = 4096

FIXED_PROMPT = "action the scene"
NEGATIVE_PROMPT = (
    "worst quality, low quality, bad anatomy, bad hands, text, error, "
    "missing fingers, extra digit, fewer digits, cropped, jpeg artifacts, "
    "signature, watermark, username, blurry"
)
DEFAULT_STEPS = 8
DEFAULT_GUIDANCE = 1.0

ACTION_LORA_REPO = "systms/SYSTMS-ACTION-LoRA-Qwen-Image-Edit-2511"
ACTION_LORA_FILE = "QWEN_EDIT_ACTION_V1.safetensors"
LIGHTNING_LORA_REPO = "lightx2v/Qwen-Image-Edit-2511-Lightning"
LIGHTNING_LORA_FILE = "Qwen-Image-Edit-2511-Lightning-8steps-V1.0-fp32.safetensors"

# Same gallery datasets as INFL8 — every input + output gets pushed here
# asynchronously so the team can browse what users are making.
# HF_WRITE_TOKEN is a separate write-scoped token for the dataset pushes; the
# existing HF_TOKEN (read-only, used to pull the LoRA) doesn't have write access.
INPUT_DATASET_ID = "systms/image-edit-inputs"
OUTPUT_DATASET_ID = "systms/image-edit-outputs"
HF_WRITE_TOKEN = os.environ.get("HF_WRITE_TOKEN") or os.environ.get("HF_TOKEN")
_hf_api = HfApi(token=HF_WRITE_TOKEN) if HF_WRITE_TOKEN else None


def _upload_to_dataset(local_path: Path, dataset_id: str, remote_name: str) -> None:
    """Upload one file to a HF dataset repo; log and swallow any failure.

    Does nothing when no token is configured (`_hf_api` is None). Gallery
    uploads are best-effort and must never affect the user's request.
    """
    if _hf_api is None:
        return
    try:
        _hf_api.upload_file(
            path_or_fileobj=str(local_path),
            path_in_repo=remote_name,
            repo_id=dataset_id,
            repo_type="dataset",
        )
    except Exception as e:
        print(f"[gallery] upload to {dataset_id} failed: {e}", flush=True)


def _async_upload(local_path: Path, dataset_id: str, remote_name: str) -> None:
    """Run `_upload_to_dataset` on a daemon thread so the caller isn't blocked."""
    threading.Thread(
        target=_upload_to_dataset,
        args=(local_path, dataset_id, remote_name),
        daemon=True,
    ).start()


# --- Heavy ML imports & pipeline loading (skipped in mock mode) ---
pipe = None

if not MOCK_MODE:
    import torch
    from diffusers import QwenImageEditPlusPipeline

    dtype = torch.bfloat16
    device = "cuda" if torch.cuda.is_available() else "cpu"

    pipe = QwenImageEditPlusPipeline.from_pretrained(
        "Qwen/Qwen-Image-Edit-2511",
        torch_dtype=dtype,
    ).to(device)

    pipe.load_lora_weights(
        ACTION_LORA_REPO,
        weight_name=ACTION_LORA_FILE,
        adapter_name="action",
    )
    pipe.load_lora_weights(
        LIGHTNING_LORA_REPO,
        weight_name=LIGHTNING_LORA_FILE,
        adapter_name="lightning",
    )
    pipe.set_adapters(["action", "lightning"], adapter_weights=[1.0, 1.0])


@spaces.GPU(duration=90)
def _run_inference(image: Image.Image) -> Image.Image:
    """Run the ACTION edit pipeline on `image` and return the first output.

    In MOCK_MODE this sleeps for two seconds and returns the input unchanged.
    Otherwise it calls the module-level `pipe` with the fixed prompt and a
    generator seeded with 0.
    """
    if MOCK_MODE:
        time.sleep(2)
        return image

    import torch

    # Pick the generator device the same way the pipeline device was chosen at
    # load time; a hard-coded "cuda" generator fails on CPU-only hosts.
    gen_device = "cuda" if torch.cuda.is_available() else "cpu"
    generator = torch.Generator(device=gen_device).manual_seed(0)

    # Don't pass width/height and don't pre-resize. The pipeline's VAE preprocessing
    # uses calculate_dimensions(1024², ratio) on the raw input for both the latent
    # encoding and (when width/height are unset) the output canvas. Forcing different
    # dims for either side causes a latent/canvas mismatch that surfaces as cropping.
    result = pipe(
        image=image,
        prompt=FIXED_PROMPT,
        negative_prompt=NEGATIVE_PROMPT,
        num_inference_steps=DEFAULT_STEPS,
        generator=generator,
        true_cfg_scale=DEFAULT_GUIDANCE,
    ).images[0]
    return result


def _session_userinfo(request):
    """Return the OAuth `userinfo` dict stored in the request session, or None.

    Reads the session from the ASGI scope instead of `request.session`, whose
    property asserts that a session middleware is installed. Never raises for
    a missing or malformed session, so it is safe in logging and in `/me`.
    """
    scope = getattr(request, "scope", None)
    if not isinstance(scope, dict):
        return None
    session = scope.get("session")
    if not isinstance(session, dict):
        return None
    oauth_info = session.get("oauth_info")
    if not isinstance(oauth_info, dict):
        return None
    user = oauth_info.get("userinfo")
    if isinstance(user, dict) and user:
        return user
    return None


# --- Gradio API endpoint (called from the browser via direct fetch) ---
@app.api(name="action")
def action(image_path: FileData, session_id: str = "") -> FileData:
    """Accept an image, run the ACTION pipeline, return the result.

    Args:
        image_path: Uploaded file descriptor; its "path" entry is opened as
            the input image.
        session_id: Optional client-chosen id used to name the stored input
            and output files. A fresh uuid is used when it is missing or does
            not match `_SESSION_ID_RE`.

    Returns:
        FileData pointing at the saved output PNG in RESULTS_DIR.

    Raises:
        Any exception from decoding, inference or saving is logged with a
        traceback and re-raised.
    """
    import traceback

    try:
        _cleanup_old_results()

        # Debug: which auth signals reached us?
        req = _current_request.get()
        if req is not None:
            ip_token = req.headers.get("x-ip-token")
            oauth = _session_userinfo(req)
            print(
                f"[action] auth: x-ip-token={'yes' if ip_token else 'NO'} "
                f"oauth_user={oauth.get('preferred_username') if oauth else 'NO'}",
                flush=True,
            )

        print(f"[action] received: path={image_path['path']} size={image_path.get('size')}", flush=True)
        # convert() returns a new, fully loaded image, so the source file
        # handle can be closed straight away.
        with Image.open(image_path["path"]) as src:
            im = src.convert("RGB")
        print(f"[action] decoded: {im.size}", flush=True)

        w, h = im.size
        if max(w, h) > MAX_INPUT_SIZE:
            scale = MAX_INPUT_SIZE / max(w, h)
            # Clamp to 1px so extreme aspect ratios can't round a side to 0.
            new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
            im = im.resize(new_size, Image.LANCZOS)
            print(f"[action] resized to: {im.size}", flush=True)

        # Session id from frontend lets us serve the result via
        # /result/{session_id} if the user tabs away and comes back. Fall back
        # to a fresh uuid if the client didn't supply a valid one.
        # fullmatch: the whole string must match, with no trailing newline.
        if _SESSION_ID_RE.fullmatch(session_id or ""):
            sid = session_id
        else:
            sid = uuid.uuid4().hex
        input_path = RESULTS_DIR / f"{sid}_input.png"
        output_path = RESULTS_DIR / f"{sid}_output.png"
        im.save(input_path, "PNG")

        print("[action] calling _run_inference", flush=True)
        result = _run_inference(im)
        print(f"[action] inference returned: {result.size}", flush=True)

        # Keep result at the pipeline's native ~1024² resolution. Resizing down
        # to match small inputs throws away detail; resizing up doesn't add any.
        result.save(output_path, "PNG")
        print(f"[action] saved to: {output_path} (sid={sid})", flush=True)

        # Push to gallery datasets in background — doesn't block the response.
        ts = time.strftime("%Y%m%d_%H%M%S")
        remote = f"action/{ts}_{sid}.png"
        _async_upload(input_path, INPUT_DATASET_ID, remote)
        _async_upload(output_path, OUTPUT_DATASET_ID, remote)

        return FileData(path=str(output_path))
    except Exception as e:
        print(f"[action] FAILED: {type(e).__name__}: {e}", flush=True)
        traceback.print_exc()
        raise


@app.get("/result/{session_id}")
async def get_result(session_id: str, type: str = "output"):
    """Serve a stored input or output PNG for `session_id`.

    `type` shadows the builtin on purpose: it is the public query parameter
    name (`?type=input|output`). Responds 400 for a malformed session id or
    unknown type, and 404 when the file does not exist.
    """
    if not _SESSION_ID_RE.fullmatch(session_id):
        raise HTTPException(status_code=400)
    if type not in ("input", "output"):
        raise HTTPException(status_code=400)
    target = RESULTS_DIR / f"{session_id}_{type}.png"
    if not target.is_file():
        raise HTTPException(status_code=404)
    return FileResponse(target, media_type="image/png")


# --- Static file serving ---
def _text_response(filename: str, media_type: str):
    """Return the UTF-8 text of `static/<filename>` with the given media type."""
    return Response(
        content=(STATIC / filename).read_text(encoding="utf-8"),
        media_type=media_type,
    )


@app.get("/")
async def homepage():
    """Serve the frontend entry page."""
    return HTMLResponse(content=(STATIC / "index.html").read_text(encoding="utf-8"))


@app.get("/styles.css")
async def css_styles():
    return _text_response("styles.css", "text/css")


@app.get("/action.css")
async def css_action():
    return _text_response("action.css", "text/css")


@app.get("/sections.css")
async def css_sections():
    return _text_response("sections.css", "text/css")


@app.get("/action-variants.jsx")
async def jsx_variants():
    return _text_response("action-variants.jsx", "application/javascript")


@app.get("/action-sections.jsx")
async def jsx_sections():
    return _text_response("action-sections.jsx", "application/javascript")


@app.get("/flow.jsx")
async def jsx_flow():
    return _text_response("flow.jsx", "application/javascript")


# Content types for files under static/assets, keyed by lowercase suffix.
_ASSET_MIME_TYPES = {
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".webp": "image/webp",
    ".svg": "image/svg+xml",
}


@app.get("/assets/{filename}")
async def asset(filename: str):
    """Serve a file from static/assets, responding 404 for anything else.

    The resolved target must sit inside the assets directory. Containment is
    checked on path components, not with a string prefix test, which would
    also accept sibling directories such as `assets_other/`.
    """
    assets_dir = (STATIC / "assets").resolve()
    target = (assets_dir / filename).resolve()
    if assets_dir not in target.parents or not target.is_file():
        raise HTTPException(status_code=404)
    mime = _ASSET_MIME_TYPES.get(target.suffix.lower(), "application/octet-stream")
    return FileResponse(target, media_type=mime)


# --- HF OAuth helpers ---
# /login  → start OAuth flow (redirect to /login/huggingface)
# /me     → return JSON with the logged-in user's name (or logged_in: false)
# (No /logout handler is defined in this section.)
@app.get("/login")
async def login_redirect():
    """Redirect to the HuggingFace OAuth login route."""
    return RedirectResponse(url="/login/huggingface")


@app.get("/me")
async def me(request: Request):
    """Report whether the caller is signed in, plus display name and avatar."""
    user = _session_userinfo(request)
    if not user:
        return {"logged_in": False}
    return {
        "logged_in": True,
        "name": user.get("preferred_username") or user.get("name"),
        "avatar": user.get("picture"),
    }


app.launch(show_error=True)