| from __future__ import annotations |
|
|
| import base64 |
| import textwrap |
|
|
|
|
def _encode(script: str) -> str:
    """Return *script* base64-encoded as an ASCII string.

    The text is encoded to UTF-8 bytes before the base64 step.
    """
    utf8_bytes = script.encode("utf-8")
    b64_bytes = base64.b64encode(utf8_bytes)
    return b64_bytes.decode("ascii")
|
|
|
|
| UNIVERSAL_MODEL_CARD_WORKER_SCRIPT = r''' |
| |
| import json |
| import os |
| import re |
| import shutil |
| import subprocess |
| import sys |
| import time |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from textwrap import dedent |
| |
# Matches "owner/name" identifiers: each side starts with an ASCII letter or
# digit and continues with 1-95 more characters from letters, digits, ".", "_", "-".
TARGET_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{1,95}/[A-Za-z0-9][A-Za-z0-9._-]{1,95}$")
# Gist that the generated GOAL.md instructs Pi to read for operational rules.
GIST_URL = "https://gist.github.com/gary149/2aba2962375fa9ca56bb9ef53f00b73d"
# Model id used by sanitize_model_id() when it is given an empty value.
DEFAULT_MODEL_ID = "sshleifer/tiny-gpt2"
| |
| |
def now():
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
| |
| |
def write_json(path: Path, payload: dict):
    """Write *payload* to *path* as indented UTF-8 JSON followed by a newline.

    Missing parent directories are created first. Non-ASCII characters are
    written as-is rather than escaped.
    """
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, indent=2, ensure_ascii=False)
    path.write_text(f"{serialized}\n", encoding="utf-8")
| |
| |
def append_event(path: Path, step: str, status: str, message: str, data: dict | None = None):
    """Append one event as a JSON line to *path* and echo it on stdout.

    The event carries a UTC timestamp, the step name, a status, a message and
    a data mapping (an empty dict when *data* is falsy). Parent directories of
    *path* are created when missing.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    record = {
        "ts": now(),
        "step": step,
        "status": status,
        "message": message,
        "data": data if data else {},
    }
    serialized = json.dumps(record, ensure_ascii=False)
    with path.open("a", encoding="utf-8") as handle:
        handle.write(f"{serialized}\n")
    # Flushed so the line reaches the job log immediately.
    print(serialized, flush=True)
| |
| |
| def redact_text(text: str | None) -> str: |
| if not text: |
| return "" |
| value = text |
| for secret_name in ["HF_TOKEN", "HUGGING_FACE_HUB_TOKEN"]: |
| secret = os.environ.get(secret_name) |
| if secret: |
| value = value.replace(secret, "[REDACTED]") |
| value = re.sub(r"Bearer\s+[A-Za-z0-9_\-.=]+", "Bearer [REDACTED]", value) |
| value = re.sub(r"hf_[A-Za-z0-9_\-]{10,}", "hf_[REDACTED]", value) |
| return value |
| |
| |
def safe_details(details: dict | None) -> dict:
    """Return a redacted copy of *details* that is safe to log.

    The mapping is serialized to JSON, scrubbed with ``redact_text`` and
    parsed back. If any of those steps fails, the result is a single-key dict
    holding the last 4000 characters of the redacted ``str(details)``.
    """
    if not details:
        return {}
    try:
        serialized = json.dumps(details, ensure_ascii=False)
        scrubbed = redact_text(serialized)
        return json.loads(scrubbed)
    except Exception:
        # Best-effort fallback: keep a redacted text tail instead of failing.
        fallback_text = redact_text(str(details))
        return {"redacted_details": fallback_text[-4000:]}
| |
| |
def fail(run_dir: Path, events_path: Path, message: str, details: dict | None = None, status: str = "failed"):
    """Record a terminal failure and stop the worker with exit code 1.

    The redacted details are appended to the event log (the event status is
    always "failed"), stored in ``state.json`` together with *status*, and
    rendered into ``report.md`` inside *run_dir*.

    Raises:
        SystemExit: always, with code 1.
    """
    safe = safe_details(details)
    append_event(events_path, "failure", "failed", message, safe)
    state = {
        "run_id": os.environ.get("RUN_ID"),
        "kind": "universal_model_card_builder",
        "status": status,
        "message": message,
        "updated_at": now(),
        "details": safe,
    }
    write_json(run_dir / "state.json", state)
    details_json = json.dumps(safe, indent=2, ensure_ascii=False)
    report_lines = [
        "# Agentic Space Factory — model Article Reproduction Report",
        "",
        f"Status: **{status}**",
        "",
        f"{message}",
        "",
        "```json",
        details_json,
        "```",
        "",
    ]
    (run_dir / "report.md").write_text("\n".join(report_lines), encoding="utf-8")
    raise SystemExit(1)
| |
| |
def run_cmd(cmd: list[str], *, cwd: Path | None = None, env: dict | None = None, timeout: int = 600):
    """Run *cmd* and return ``(returncode, redacted_output)``.

    stderr is merged into stdout, the output is captured as text and passed
    through ``redact_text`` before being returned. ``subprocess.run`` is given
    *timeout*, so an overrun surfaces as its timeout exception.
    """
    working_dir = None if not cwd else str(cwd)
    completed = subprocess.run(
        cmd,
        cwd=working_dir,
        env=env,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        timeout=timeout,
    )
    return completed.returncode, redact_text(completed.stdout)
| |
| |
def install_python_deps(events_path: Path):
    """Install the worker's Python dependencies with pip, logging progress events.

    Raises:
        RuntimeError: if pip exits non-zero; the message is pip's output.
    """
    step = "dependencies"
    packages = ["huggingface_hub>=1.0.0", "gradio_client>=2.0.0", "requests>=2.31.0"]
    append_event(events_path, step, "started", "Installing Python worker dependencies")
    pip_cmd = [sys.executable, "-m", "pip", "install", "-q", "--upgrade", *packages]
    returncode, output = run_cmd(pip_cmd, timeout=600)
    if returncode == 0:
        append_event(events_path, step, "success", "Python worker dependencies installed")
        return
    append_event(events_path, step, "failed", "Python dependency installation failed", {"output_tail": output[-4000:]})
    raise RuntimeError(output)
| |
| |
def ensure_node(events_path: Path):
    """Make sure ``node`` and ``npm`` are available, installing via apt-get if not.

    When both executables are already on PATH, their reported versions are
    logged and nothing is installed.

    Raises:
        RuntimeError: if the apt-get installation exits non-zero; the message
            is the command output.
    """
    step = "node"
    node_path = shutil.which("node")
    npm_path = shutil.which("npm")
    if node_path and npm_path:
        versions = {}
        for label, binary in (("node", node_path), ("npm", npm_path)):
            _, reported = run_cmd([binary, "--version"], timeout=30)
            versions[label] = reported.strip()
        append_event(events_path, step, "success", "Node/npm already available", versions)
        return
    append_event(events_path, step, "started", "Installing nodejs/npm through apt-get")
    apt_script = "apt-get update -qq && apt-get install -y -qq nodejs npm"
    returncode, output = run_cmd(["bash", "-lc", apt_script], timeout=600)
    if returncode == 0:
        append_event(events_path, step, "success", "Installed nodejs/npm")
        return
    append_event(events_path, step, "failed", "Could not install nodejs/npm", {"output_tail": output[-4000:]})
    raise RuntimeError(output)
| |
| |
def install_pi(events_path: Path):
    """Install the Pi coding agent globally through npm and log its version.

    Node/npm availability is ensured first.

    Raises:
        RuntimeError: if ``npm install`` exits non-zero; the message is the
            npm output.
    """
    ensure_node(events_path)
    step = "pi_install"
    append_event(events_path, step, "started", "Installing Pi coding agent from npm")
    install_code, install_output = run_cmd(["npm", "install", "-g", "@mariozechner/pi-coding-agent"], timeout=900)
    if install_code != 0:
        append_event(events_path, step, "failed", "Pi npm installation failed", {"output_tail": install_output[-4000:]})
        raise RuntimeError(install_output)
    # The exit status of the version probe is not inspected; only its output is logged.
    _, version_output = run_cmd(["pi", "--version"], timeout=60)
    version_tail = version_output.strip()[-300:]
    append_event(events_path, step, "success", "Pi installed", {"version_output": version_tail})
| |
| |
def configure_pi(events_path: Path, model: str):
    """Write Pi's auth and settings files under ``~/.pi/agent``.

    ``auth.json`` stores the ``HF_TOKEN`` environment value (an empty string
    when unset) as a Hugging Face API key. ``settings.json`` selects *model*
    with the huggingface provider and enables autoRun/autoApply.

    Args:
        events_path: Event log that receives the "pi_config" success event.
        model: Model identifier written to ``settings.json``.
    """
    pi_dir = Path.home() / ".pi" / "agent"
    pi_dir.mkdir(parents=True, exist_ok=True)

    auth_path = pi_dir / "auth.json"
    auth_payload = {"huggingface": {"type": "api_key", "key": os.environ.get("HF_TOKEN", "")}}
    # The file holds a secret: create it owner-only before any content is
    # written, and tighten permissions if it already existed.
    auth_path.touch(mode=0o600, exist_ok=True)
    auth_path.chmod(0o600)
    auth_path.write_text(json.dumps(auth_payload, indent=2), encoding="utf-8")

    settings_payload = {"model": model, "provider": "huggingface", "autoRun": True, "autoApply": True}
    (pi_dir / "settings.json").write_text(json.dumps(settings_payload, indent=2), encoding="utf-8")
    append_event(events_path, "pi_config", "success", "Configured Pi", {"model": model})
| |
| |
def collect_pi_traces(run_dir: Path, events_path: Path):
    """Copy Pi session traces into *run_dir* as raw and redacted variants.

    Every ``*.jsonl`` file under ``~/.pi/agent/sessions`` is written twice,
    keeping its relative path: unchanged under ``traces/raw`` and passed
    through ``redact_text`` under ``traces/redacted``.

    Returns:
        The number of trace files copied.
    """
    sessions_root = Path.home() / ".pi" / "agent" / "sessions"
    raw_root = run_dir / "traces" / "raw"
    redacted_root = run_dir / "traces" / "redacted"
    for directory in (raw_root, redacted_root):
        directory.mkdir(parents=True, exist_ok=True)

    copied = 0
    trace_files = sessions_root.rglob("*.jsonl") if sessions_root.exists() else ()
    for trace_file in trace_files:
        relative = trace_file.relative_to(sessions_root)
        raw_copy = raw_root / relative
        raw_copy.parent.mkdir(parents=True, exist_ok=True)
        # Undecodable bytes are dropped rather than aborting the collection.
        content = trace_file.read_text(encoding="utf-8", errors="ignore")
        raw_copy.write_text(content, encoding="utf-8")
        redacted_copy = redacted_root / relative
        redacted_copy.parent.mkdir(parents=True, exist_ok=True)
        redacted_copy.write_text(redact_text(content), encoding="utf-8")
        copied += 1

    append_event(events_path, "traces", "success", "Collected Pi traces", {"count": copied})
    return copied
| |
| |
def sanitize_model_id(model_id: str) -> str:
    """Normalize a model id or Hub URL to the ``owner/model-name`` form.

    Empty or whitespace-only input falls back to ``DEFAULT_MODEL_ID``. The
    ``https://huggingface.co/`` prefix, any ``?query`` or ``#fragment`` part,
    surrounding whitespace and surrounding slashes are removed.

    Args:
        model_id: Bare ``owner/name`` id or a huggingface.co model URL.

    Returns:
        The cleaned ``owner/name`` identifier.

    Raises:
        ValueError: if the cleaned value is not exactly ``owner/name`` built
            from letters, digits, ``_``, ``.`` and ``-``.
    """
    candidate = (model_id or "").strip() or DEFAULT_MODEL_ID
    candidate = candidate.replace("https://huggingface.co/", "")
    # Drop URL query strings and fragments such as "?library=..." or "#usage".
    for separator in ("?", "#"):
        candidate = candidate.split(separator, 1)[0]
    candidate = candidate.strip().strip("/").strip()
    # fullmatch: "$" in re.match would also accept a trailing newline.
    if not re.fullmatch(r"[A-Za-z0-9_.-]+/[A-Za-z0-9_.-]+", candidate):
        raise ValueError("MODEL_ID must look like owner/model-name")
    return candidate
| |
| |
def make_gradio_client(target_space_id: str, token: str):
    """Build a ``gradio_client.Client`` for the Space, passing *token* however the installed version allows.

    The ``Client`` signature is inspected and the first supported keyword among
    ``token``, ``hf_token`` and ``api_key`` is used. Failing those, the token
    is sent as an ``Authorization`` header when ``headers`` is accepted.
    Otherwise the client is created without credentials.
    """
    import inspect
    from gradio_client import Client

    accepted = inspect.signature(Client).parameters
    for keyword in ("token", "hf_token", "api_key"):
        if keyword in accepted:
            return Client(target_space_id, **{keyword: token})
    if "headers" in accepted:
        return Client(target_space_id, headers={"Authorization": f"Bearer {token}"})
    return Client(target_space_id)
| |
| |
def api_names_from_schema(schema) -> list[str]:
    """Collect API endpoint names from a Gradio API schema dict.

    Names come from the keys and ``api_name`` values of ``named_endpoints``
    (or ``endpoints``) that start with ``/``, and from string ``api_name``
    entries in ``dependencies`` (prefixed with ``/`` when missing). Duplicates
    are removed while keeping first-seen order. A non-dict schema yields an
    empty list.
    """
    if not isinstance(schema, dict):
        return []

    collected: list[str] = []
    endpoint_map = schema.get("named_endpoints") or schema.get("endpoints") or {}
    if isinstance(endpoint_map, dict):
        for route, spec in endpoint_map.items():
            if isinstance(route, str) and route.startswith("/"):
                collected.append(route)
            declared = spec.get("api_name") if isinstance(spec, dict) else None
            if isinstance(declared, str) and declared.startswith("/"):
                collected.append(declared)

    dependencies = schema.get("dependencies")
    if isinstance(dependencies, list):
        for dependency in dependencies:
            if not isinstance(dependency, dict):
                continue
            declared = dependency.get("api_name")
            if isinstance(declared, str):
                collected.append(declared if declared.startswith("/") else f"/{declared}")

    # dict.fromkeys drops duplicates while preserving insertion order.
    return list(dict.fromkeys(collected))
| |
| |
def space_subdomain_url(target_space_id: str) -> str:
    """Return the ``https://<owner>-<name>.hf.space`` URL for an ``owner/name`` Space id.

    Underscores become hyphens and the whole URL is lower-cased.
    """
    owner, name = target_space_id.split("/", 1)
    # This matches the common Spaces app URL pattern. Keep conservative: our
    # generated slugs are ASCII and hyphen-friendly.
    url = "https://" + owner + "-" + name + ".hf.space"
    return url.replace("_", "-").lower()
| |
| |
def runtime_to_dict(runtime) -> dict:
    """Flatten selected attributes of a Space runtime object into a dict of strings.

    Each attribute is read with ``getattr`` (missing ones become ``None``),
    unwrapped through its ``.value`` when it has one, and converted to ``str``
    unless it is ``None``.
    """
    fields = ("stage", "hardware", "requested_hardware", "sleep_time", "storage", "gc_timeout")
    snapshot = {}
    for field in fields:
        raw = getattr(runtime, field, None)
        unwrapped = getattr(raw, "value", raw)
        snapshot[field] = None if unwrapped is None else str(unwrapped)
    return snapshot
| |
| |
def write_space_runtime(api, target_space_id: str, token: str, run_dir: Path, events_path: Path, attempt: int | None = None) -> dict:
    """Fetch the Space runtime and persist it to ``space_runtime.json``.

    On success the flattened runtime plus *attempt* is written and returned.
    On any exception an error payload (message truncated to 2000 characters)
    is written instead, a "warning" event is logged, and that payload is
    returned. The function never raises for a failed fetch.
    """
    try:
        runtime = api.get_space_runtime(repo_id=target_space_id, token=token)
        snapshot = runtime_to_dict(runtime)
        snapshot["attempt"] = attempt
        write_json(run_dir / "space_runtime.json", snapshot)
        return snapshot
    except Exception as exc:
        failure = {"error": str(exc)[:2000], "attempt": attempt}
        write_json(run_dir / "space_runtime.json", failure)
        append_event(events_path, "space_runtime", "warning", "Could not fetch Space runtime", failure)
        return failure
| |
| |
def collect_space_logs(target_space_id: str, token: str, run_dir: Path, events_path: Path):
    """Save runtime and build logs of the Space under ``<run_dir>/logs`` (best effort).

    Both logs are fetched with the ``hf spaces logs`` CLI, with ``HF_TOKEN``
    set to *token* in the child environment. A failing command is recorded in
    the result list instead of raising.

    Returns:
        One dict per log file, with either ``returncode``/``tail`` or ``error``.
    """
    logs_dir = run_dir / "logs"
    logs_dir.mkdir(parents=True, exist_ok=True)
    child_env = dict(os.environ, HF_TOKEN=token)
    base_cmd = ["hf", "spaces", "logs", target_space_id]
    log_commands = [
        ("space_logs_runtime.txt", base_cmd),
        ("space_logs_build.txt", [*base_cmd, "--build"]),
    ]
    results = []
    for filename, cmd in log_commands:
        try:
            returncode, output = run_cmd(cmd, env=child_env, timeout=75)
            (logs_dir / filename).write_text(output, encoding="utf-8")
            results.append({"file": filename, "returncode": returncode, "tail": output[-1000:]})
        except Exception as exc:
            results.append({"file": filename, "error": str(exc)[:1000]})
    append_event(events_path, "space_logs", "success", "Collected best-effort Space logs", {"files": results})
    return results
| |
| |
def validate_http_health(target_space_id: str, token: str, run_dir: Path, events_path: Path, attempt: int):
    """GET ``/health`` on the Space's hf.space URL and record a passing result.

    On an OK response the payload is written to ``tests/http_health.json`` and
    ``tests/test_result.json``, a success event is logged, and the payload
    tagged with the validator name is returned.

    Raises:
        RuntimeError: if the response status is not OK.
    """
    import requests

    url = space_subdomain_url(target_space_id).rstrip("/") + "/health"
    request_headers = {"Authorization": f"Bearer {token}", "Accept": "application/json,text/plain,*/*"}
    response = requests.get(url, headers=request_headers, timeout=20)
    payload = {
        "status": "success" if response.ok else "failed",
        "attempt": attempt,
        "url": url,
        "status_code": response.status_code,
        "content_type": response.headers.get("content-type"),
        "text": response.text[:2000],
    }
    if not response.ok:
        raise RuntimeError(f"HTTP /health returned {response.status_code}: {response.text[:500]}")

    try:
        payload["json"] = response.json()
    except Exception:
        # A non-JSON body is acceptable; the text excerpt is already recorded.
        pass
    tests_dir = run_dir / "tests"
    write_json(tests_dir / "http_health.json", payload)
    write_json(tests_dir / "test_result.json", payload | {"validator": "http_get_health"})
    append_event(events_path, "api_validation", "success", "HTTP /health validation passed", {"attempt": attempt, "url": url, "status_code": response.status_code})
    return payload | {"validator": "http_get_health"}
| |
| |
def validate_gradio_api(target_space_id: str, token: str, run_dir: Path, events_path: Path, attempt: int):
    """Call the Space through ``gradio_client`` until one endpoint responds.

    The API schema is saved to ``tests/api_schema.json``. ``/health``,
    ``/predict`` and ``/greet`` are tried first, then any endpoint names
    discovered in the schema. The first successful call is written to
    ``tests/test_result.json``, logged and returned.

    Raises:
        RuntimeError: if every candidate fails; the message joins up to five
            per-endpoint errors.
    """
    client = make_gradio_client(target_space_id, token)
    schema = client.view_api(return_format="dict")
    tests_dir = run_dir / "tests"
    schema_payload = schema if isinstance(schema, dict) else {"schema": str(schema)}
    write_json(tests_dir / "api_schema.json", schema_payload)

    discovered = api_names_from_schema(schema)
    # Well-known names first, then discovered ones, without duplicates.
    candidates = list(dict.fromkeys(["/health", "/predict", "/greet", *discovered]))

    failures = []
    for api_name in candidates:
        # Only /greet is called with an argument; the others are called bare.
        call_args = ("Agentic Space Factory",) if api_name == "/greet" else ()
        try:
            result = client.predict(*call_args, api_name=api_name)
            payload = {
                "status": "success",
                "attempt": attempt,
                "api_name": api_name,
                "discovered_api_names": discovered,
                "result_repr": repr(result)[:2000],
                "validator": "gradio_client",
            }
            write_json(tests_dir / "test_result.json", payload)
            append_event(events_path, "api_validation", "success", "Gradio API validation passed", {"attempt": attempt, "api_name": api_name, "discovered_api_names": discovered})
            return payload
        except Exception as exc:
            failures.append({"api_name": api_name, "error": str(exc)[:1000]})

    summary = "; ".join(f"{item['api_name']}: {item['error']}" for item in failures[:5])
    raise RuntimeError(summary or "No callable API endpoints found")
| |
| |
def validate_live_api(api, target_space_id: str, token: str, run_dir: Path, events_path: Path, timeout_s: int = 900):
    """Poll the live Space until HTTP ``/health`` or the Gradio API validates.

    Each attempt records the Space runtime, then tries the HTTP validator and
    the Gradio validator in that order, returning the first success. Failed
    attempts are logged and followed by a 30 second sleep.

    Raises:
        RuntimeError: when the runtime stage reports RUNTIME_ERROR on two
            attempts, or when no validator passes before *timeout_s* elapses.
    """
    append_event(events_path, "api_validation", "started", "Waiting for live HTTP /health or Gradio API to become available")
    deadline = time.time() + timeout_s
    attempt = 0
    last_error = None
    runtime_error_count = 0
    validators = (
        ("HTTP /health", validate_http_health),
        ("Gradio API", validate_gradio_api),
    )
    while time.time() < deadline:
        attempt += 1
        runtime_payload = write_space_runtime(api, target_space_id, token, run_dir, events_path, attempt)
        stage = str(runtime_payload.get("stage") or "").upper()
        if "RUNTIME_ERROR" in stage:
            runtime_error_count += 1
            collect_space_logs(target_space_id, token, run_dir, events_path)
            last_error = f"Space runtime stage is {stage}"
            # Tolerate one sighting; a second one is treated as fatal.
            if runtime_error_count >= 2:
                raise RuntimeError(f"Space is in RUNTIME_ERROR. See logs/space_logs_runtime.txt and logs/space_logs_build.txt. Last runtime: {runtime_payload}")

        attempt_errors = []
        for label, validator in validators:
            try:
                return validator(target_space_id, token, run_dir, events_path, attempt)
            except Exception as exc:
                attempt_errors.append(f"{label} failed: {exc}")
        last_error = "; ".join(attempt_errors)

        append_event(events_path, "api_validation", "waiting", "Live health/API not ready yet", {"attempt": attempt, "runtime": runtime_payload, "error": last_error[-1500:] if last_error else None})
        time.sleep(30)

    collect_space_logs(target_space_id, token, run_dir, events_path)
    raise RuntimeError(f"Live health/API validation did not pass before timeout: {last_error}")
| |
| |
| def is_auth_or_billing_like_error(error: str | None) -> bool: |
| value = error or "" |
| markers = [ |
| "401", |
| "402", |
| "403", |
| "Invalid username or password", |
| "Unauthorized", |
| "Repository Not Found", |
| "payment", |
| "billing", |
| "quota", |
| "grant", |
| ] |
| return any(marker.lower() in value.lower() for marker in markers) |
| |
| |
def request_hardware(api, target_space_id: str, hardware: str, token: str, events_path: Path, step: str, retries: int = 2):
    """Best-effort hardware request for an already created Space.

    Hardware is tried at create_repo time first. This function is the fallback
    for a Space that was created on CPU and may later accept a hardware
    switch. Errors that look like auth/billing/quota problems are not retried.
    Other errors are retried up to *retries* times with a growing pause.

    Returns:
        A result dict with ``ok`` and, on failure, ``error`` and
        ``manual_action_required``.
    """
    phase = "post_create_request"
    if not hardware:
        return {"phase": phase, "requested": False, "hardware": hardware, "ok": False, "error": "empty hardware"}

    def failure_result(attempts: int, error, manual: bool) -> dict:
        # Shape shared by both failing return paths.
        return {"phase": phase, "requested": True, "hardware": hardware, "ok": False, "attempts": attempts, "error": error, "manual_action_required": manual}

    last_error = None
    for attempt in range(1, retries + 1):
        try:
            runtime = api.request_space_hardware(repo_id=target_space_id, hardware=hardware, token=token)
            stage = getattr(runtime, "stage", None)
            payload = {
                "phase": phase,
                "requested": True,
                "hardware": hardware,
                "ok": True,
                "attempt": attempt,
                "runtime_stage": getattr(stage, "value", str(stage)),
                "requested_hardware": getattr(runtime, "requested_hardware", None),
                "hardware_current": getattr(runtime, "hardware", None),
            }
            append_event(events_path, step, "success", f"Requested Space hardware {hardware}", payload)
            return payload
        except Exception as exc:
            last_error = str(exc)[:2000]
            auth_like = is_auth_or_billing_like_error(last_error)
            is_final = auth_like or attempt == retries
            event_payload = {"phase": phase, "attempt": attempt, "hardware": hardware, "error": last_error, "manual_action_required": auth_like}
            append_event(events_path, step, "failed" if is_final else "waiting", f"Could not request Space hardware {hardware}", event_payload)
            if auth_like:
                return failure_result(attempt, last_error, True)
            if attempt < retries:
                # Linear backoff: 8s, 16s, ...
                time.sleep(8 * attempt)
    return failure_result(retries, last_error, False)
| |
| |
def build_hardware_sequence(preferred_hardware: str, fallback_hardware: str, allow_fixed_gpu_fallback: bool) -> list[str]:
    """Return the ordered, de-duplicated list of hardware flavors to try.

    ``zero-a10g`` always comes first, then *preferred_hardware*, then
    *fallback_hardware* when *allow_fixed_gpu_fallback* is true. Values are
    stripped, and empty or ``None`` entries are dropped.
    """
    candidates = ["zero-a10g", preferred_hardware]
    if allow_fixed_gpu_fallback:
        candidates.append(fallback_hardware)
    cleaned = ((hw or "").strip() for hw in candidates)
    return list(dict.fromkeys(name for name in cleaned if name))
| |
| |
def create_space_with_hardware_strategy(api, target_space_id: str, token: str, preferred_hardware: str, fallback_hardware: str, allow_fixed_gpu_fallback: bool, events_path: Path):
    """Create a private Space, requesting hardware as early as possible.

    ``create_repo`` is called with ``space_hardware`` for each flavor in the
    hardware sequence until one succeeds. That is the cleanest moment to
    request hardware because the Space does not need a second restart. If
    every attempt fails, a plain private Space is created on default CPU and
    manual hardware selection is flagged as required.

    Returns:
        A dict describing the selected hardware, the requested sequence, all
        attempts and whether manual action is required.
    """
    sequence = build_hardware_sequence(preferred_hardware, fallback_hardware, allow_fixed_gpu_fallback)
    attempts = []
    create_kwargs = {
        "repo_id": target_space_id,
        "repo_type": "space",
        "space_sdk": "gradio",
        "private": True,
        "exist_ok": False,
        "token": token,
    }

    for hardware in sequence:
        try:
            append_event(events_path, "create_space_hardware", "started", f"Creating private Space with requested hardware {hardware}", {"target_space": target_space_id, "hardware": hardware})
            api.create_repo(space_hardware=hardware, **create_kwargs)
            created_payload = {"phase": "create_repo_space_hardware", "hardware": hardware, "ok": True, "target_space": target_space_id}
            append_event(events_path, "create_space", "success", f"Private target Space created with requested hardware {hardware}", created_payload)
            return {"created": True, "selected_hardware": hardware, "requested_sequence": sequence, "attempts": [*attempts, created_payload], "manual_action_required": False}
        except Exception as exc:
            error = str(exc)[:2500]
            failed_payload = {"phase": "create_repo_space_hardware", "hardware": hardware, "ok": False, "error": error, "manual_action_required": is_auth_or_billing_like_error(error)}
            attempts.append(failed_payload)
            append_event(events_path, "create_space_hardware", "failed", f"Could not create Space with requested hardware {hardware}", failed_payload)
            # Continue through the sequence: ZeroGPU quota/auth can fail while a fixed GPU
            # may still be worth trying. If fixed GPU also fails, we'll create CPU below.

    append_event(events_path, "create_space", "started", "Creating private target Space on default CPU after hardware-at-creation attempts failed", {"target_space": target_space_id})
    api.create_repo(**create_kwargs)
    cpu_payload = {"phase": "create_repo_default_cpu", "hardware": "cpu-basic", "ok": True, "target_space": target_space_id, "manual_action_required": True}
    append_event(events_path, "create_space", "success", "Private target Space created on default CPU; manual hardware selection may be required", cpu_payload)
    return {"created": True, "selected_hardware": "default-cpu-or-existing", "requested_sequence": sequence, "attempts": [*attempts, cpu_payload], "manual_action_required": True}
| |
| |
def create_initial_workspace(workspace: Path, model_id: str, target_space_id: str, preferred_hardware: str, fallback_hardware: str, allow_fallback: bool, implementation_mode: str, model_analysis: dict | None = None):
    """Write the initial scaffold files that Pi starts from.

    Four files are created in *workspace*: a placeholder Gradio ``app.py``
    exposing ``health`` and ``predict`` endpoints, ``requirements.txt``,
    a ``README.md`` with Space frontmatter, and ``GOAL.md`` holding Pi's
    instructions together with the model metadata.

    Args:
        workspace: Directory to populate; created when missing.
        model_id: Model the Space is built for.
        target_space_id: Space id embedded in the scaffold and the goal.
        preferred_hardware: First choice for ``suggested_hardware``.
        fallback_hardware: Used for ``suggested_hardware`` when the preferred
            value is empty; ``cpu-basic`` is the last resort.
        allow_fallback: Accepted for interface compatibility; not read here.
        implementation_mode: Copied verbatim into ``GOAL.md``.
        model_analysis: Optional metadata; ``pipeline_tag``, ``library_name``,
            up to 40 ``tags`` and up to 60 ``siblings`` are passed on to Pi.

    Returns:
        The list of file names written.
    """
    workspace.mkdir(parents=True, exist_ok=True)
    model_analysis = model_analysis or {}
    pipeline_tag = model_analysis.get("pipeline_tag")
    library_name = model_analysis.get("library_name")
    # `.get(key, [])` still returns None when the key is present with a None
    # value, which would make the slice raise TypeError; normalize with `or`.
    tags = (model_analysis.get("tags") or [])[:40]
    siblings = (model_analysis.get("siblings") or [])[:60]
    app_py = f"""import gradio as gr
from huggingface_hub import model_info, list_repo_files

MODEL_ID = {model_id!r}
TARGET_SPACE_ID = {target_space_id!r}


def health():
    return {{
        "status": "booted",
        "model_id": MODEL_ID,
        "target_space_id": TARGET_SPACE_ID,
        "stage": "initial-scaffold",
        "note": "Pi should replace this scaffold with a model-specific demo while preserving a cheap health endpoint.",
    }}


def placeholder(*args):
    return "Initial scaffold. Pi should replace this with a model-specific inference path, or write TECHNICAL_BLOCKERS.json."

with gr.Blocks(title="Generated Model Space — Agentic Space Factory") as demo:
    gr.Markdown("# Generated Model Space — Agentic Space Factory")
    gr.Markdown(f"Private generated Space for `{{MODEL_ID}}`.")
    gr.JSON(label="Health", value=health(), every=None)
    gr.Button("Health check").click(fn=health, inputs=None, outputs=gr.JSON(), api_name="health")
    gr.Textbox(label="Input", value="Hello from Agentic Space Factory").submit(fn=placeholder, inputs=None, outputs=gr.Textbox(), api_name="predict")
    gr.Button("Run placeholder").click(fn=placeholder, inputs=None, outputs=gr.Textbox(), api_name="predict")

if __name__ == "__main__":
    demo.launch()
"""
    (workspace / "app.py").write_text(app_py, encoding="utf-8")
    req = """gradio>=6.0.0
huggingface_hub>=0.34.0,<2.0.0
spaces
transformers>=4.45.0,<6.0.0
diffusers
accelerate
safetensors
torch
kernels
pillow
numpy
requests
"""
    (workspace / "requirements.txt").write_text(req, encoding="utf-8")
    readme = f"""---
title: Generated Model Space
sdk: gradio
app_file: app.py
python_version: "3.10"
suggested_hardware: {preferred_hardware or fallback_hardware or "cpu-basic"}
short_description: "Agent-built model demo"
---

# Generated Model Space — Agentic Space Factory

Private generated Space for `{model_id}`.

This Space is created by Agentic Space Factory. It should remain private until manually reviewed.
"""
    (workspace / "README.md").write_text(readme, encoding="utf-8")
    analysis_json = json.dumps({"pipeline_tag": pipeline_tag, "library_name": library_name, "tags": tags, "siblings": siblings}, indent=2, ensure_ascii=False)
    goal = f"""You are Pi running inside a Hugging Face Job for Agentic Space Factory.

Goal: build the best possible private Hugging Face Space demo for an arbitrary model card.

MODEL_ID: {model_id}
TARGET_SPACE_ID: {target_space_id}
IMPLEMENTATION_MODE: {implementation_mode}
MODEL_METADATA:
```json
{analysis_json}
```

First read and follow the operational rules from this gist:
{GIST_URL}

Non-negotiable safety and product constraints:
- The target Space must remain private.
- Do not delete any user resources.
- Do not print secrets or tokens.
- Work only inside the current workspace.
- The wrapper will create the private Space, request hardware best-effort, upload files, and validate the live app. Do not create/delete repos yourself in this builder worker.
- Preserve a cheap health endpoint named `health` with `api_name="health"`. It must not load weights, run GPU work, or download large files.
- Do not pin huggingface_hub below 1.0. Use huggingface_hub>=0.34.0,<2.0.0 unless the model card requires a narrower compatible range. If transformers>=5 is used, keep huggingface_hub compatible with it, for example huggingface_hub>=1.5.0,<2.0.0.
- README.md frontmatter must remain valid; if it uses short_description, it must be 60 characters or fewer.

Implementation contract:
- If IMPLEMENTATION_MODE is `full-inference-gated`, you are not allowed to silently replace generation with a placeholder and call it success.
- Try to implement the closest real inference path for the model card using evidence from README, model metadata, config files, and repo files.
- You may choose an appropriate Gradio UI for the task: text, image, audio, video, multimodal, embeddings, classification, etc.
- If the model is standard and feasible, implement a real generate/predict function and expose it as a Gradio endpoint.
- If the model requires GPU, add ZeroGPU-compatible `@spaces.GPU(...)` only around the inference function. Do not decorate health.
- If the model requires special dependencies, include them only when needed and document risks.
- Investigate compatibility fallbacks before declaring a blocker: PyTorch SDPA, xformers, HF Kernels where relevant, CPU/offload/lazy loading, smaller resolution/steps, safe smoke-test inputs.
- If real inference is impossible or unsafe in a Space, write TECHNICAL_BLOCKERS.json with concrete evidence for every blocker.

Deliverables:
- app.py must boot on Hugging Face Spaces.
- app.py must expose health/api_name="health".
- If real generation is implemented, generate/predict must attempt a real model call, not only return a textual diagnostic.
- If real generation is not implemented, write TECHNICAL_BLOCKERS.json with: full_inference_implemented=false, blockers[], evidence[], minimum_runtime, and suggested_next_step.
- Write INFERENCE_CONTRACT.json with: full_inference_implemented, health_endpoint, primary_api_name, expected_output_type, validation_level, requires_gpu, estimated_vram, and blockers_count.
- README.md must explain the runtime strategy, task, limitations, and how to test.
- Write a concise PI_SUMMARY.md with what you changed and whether full inference is implemented.
"""
    (workspace / "GOAL.md").write_text(goal, encoding="utf-8")
    return ["app.py", "requirements.txt", "README.md", "GOAL.md"]
| |
| |
def sanitize_readme_metadata(workspace: Path, events_path: Path):
    """Force the README frontmatter ``short_description`` to a known-safe value.

    Nothing happens when ``README.md`` is missing, does not start with
    ``---``, or has no closing frontmatter delimiter. Otherwise every
    ``short_description:`` line in the frontmatter is replaced with
    "Generated model demo". When at least one line was replaced, the file is
    rewritten and an event is logged.
    """
    readme_path = workspace / "README.md"
    if not readme_path.exists():
        return
    text = readme_path.read_text(encoding="utf-8", errors="ignore")
    if not text.startswith("---"):
        return
    pieces = text.split("---", 2)
    if len(pieces) < 3:
        return
    frontmatter, body = pieces[1], pieces[2]

    safe_value = "Generated model demo"
    original_lines = frontmatter.splitlines()
    is_description = [entry.strip().startswith("short_description:") for entry in original_lines]
    # If Pi added other unexpectedly long one-line metadata values, leave them alone:
    # the known Hub validation blocker for this run was short_description > 60 chars.
    if not any(is_description):
        return
    rewritten = [
        f"short_description: {safe_value}" if flagged else entry
        for entry, flagged in zip(original_lines, is_description)
    ]
    new_text = "---\n" + "\n".join(rewritten).strip() + "\n---" + body
    readme_path.write_text(new_text, encoding="utf-8")
    append_event(events_path, "metadata_sanitize", "success", "Sanitized README metadata", {"short_description": "Generated model demo"})
| |
| |
| |
def normalize_requirements_for_modern_hub(workspace: Path, events_path: Path):
    """Prevent a known resolver conflict in generated Spaces.

    Older builder versions forced `huggingface_hub<1.0.0` to avoid old Gradio
    import issues. Modern Spaces can use Gradio 6 and recent Transformers;
    Transformers 5.x requires huggingface-hub >=1.5.0, so the old pin breaks
    builds. Keep the constraint broad and modern unless Pi intentionally uses a
    different compatible stack.

    Every existing ``huggingface_hub`` requirement is removed and replaced by
    a single policy line, placed after any leading ``--option`` lines. Blank
    lines are dropped. The file is rewritten, and an event logged, only when
    the resulting text differs from what was on disk.

    Args:
        workspace: Directory containing ``requirements.txt``; nothing happens
            when the file is missing.
        events_path: Event log that receives the "requirements_sanitize" event.
    """
    req_path = workspace / "requirements.txt"
    if not req_path.exists():
        return
    raw = req_path.read_text(encoding="utf-8", errors="ignore")

    kept = []
    transformers_needs_hub_15 = False
    for line in (entry.rstrip() for entry in raw.splitlines()):
        # Compare on a whitespace-free, lower-cased form so that
        # "transformers >= 5" is treated like "transformers>=5".
        compact = re.sub(r"\s+", "", line).lower()
        # Match on the exact distribution name (with -, _ and . treated as
        # equivalent) instead of a prefix, so packages that merely start with
        # "transformers" or "huggingface-hub" are left untouched.
        name_match = re.match(r"[a-z0-9][a-z0-9._-]*", compact)
        name = re.sub(r"[-_.]+", "-", name_match.group(0)).strip("-") if name_match else ""
        if name == "huggingface-hub":
            # Always replace with the policy line to avoid duplicate/conflicting pins.
            continue
        if name == "transformers" and any(spec in compact for spec in (">=5", "==5", "~=5")):
            transformers_needs_hub_15 = True
        kept.append(line)

    hub_line = "huggingface_hub>=1.5.0,<2.0.0" if transformers_needs_hub_15 else "huggingface_hub>=0.34.0,<2.0.0"
    # Put hub near the top, after any --extra-index-url lines.
    insert_at = 0
    while insert_at < len(kept) and kept[insert_at].strip().startswith("--"):
        insert_at += 1
    kept.insert(insert_at, hub_line)

    new = "\n".join(line for line in kept if line.strip()) + "\n"
    if new != raw:
        req_path.write_text(new, encoding="utf-8")
        append_event(events_path, "requirements_sanitize", "success", "Normalized huggingface_hub requirement for modern dependency resolution", {"huggingface_hub": hub_line})
| |
| |
def repair_workspace_with_pi(workspace: Path, run_dir: Path, events_path: Path, pi_model: str, target_space_id: str, model_id: str, failure_reason: str):
    """Ask Pi for one minimal build/runtime repair pass based on collected logs.

    The workspace is snapshotted to ``repair/before``, a repair goal embedding
    the failure summary and the build/runtime log tails is written to
    ``REPAIR_GOAL.md`` and passed to ``pi -p``. Pi's output is saved to
    ``logs/pi_repair_output.txt``. On success the requirements are
    re-normalized and the workspace is snapshotted to ``repair/after``.

    Returns:
        True when Pi exits with code 0, False otherwise.
    """
    logs_dir = run_dir / "logs"

    def read_log(filename: str) -> str:
        # A missing log file is treated as an empty log.
        log_path = logs_dir / filename
        if not log_path.exists():
            return ""
        return log_path.read_text(encoding="utf-8", errors="ignore")

    def snapshot(destination: Path):
        # Replace any earlier snapshot, skipping VCS and cache directories.
        if destination.exists():
            shutil.rmtree(destination)
        shutil.copytree(workspace, destination, ignore=shutil.ignore_patterns(".git", "node_modules", "__pycache__", "*.pyc"))

    build_log = read_log("space_logs_build.txt")
    runtime_log = read_log("space_logs_runtime.txt")
    repair_dir = run_dir / "repair"
    snapshot(repair_dir / "before")
    goal = f"""You are Pi repairing a Hugging Face Space generated by Agentic Space Factory.

MODEL_ID: {model_id}
TARGET_SPACE_ID: {target_space_id}

The first build/runtime validation failed.

Failure summary:
{failure_reason[:4000]}

Build log tail:
```text
{build_log[-12000:]}
```

Runtime log tail:
```text
{runtime_log[-12000:]}
```

Repair contract:
- Make the smallest patch possible.
- Prefer fixing dependency resolver conflicts, missing imports, invalid metadata, Gradio endpoint bugs, and import-order issues.
- Do not replace real inference with a placeholder unless TECHNICAL_BLOCKERS.json clearly explains why full inference is impossible.
- Preserve a cheap health endpoint with api_name="health".
- Keep README frontmatter valid, short_description <= 60 chars.
- Do not pin huggingface_hub below 1.0. For modern generated Spaces use huggingface_hub>=0.34.0,<2.0.0. If transformers>=5 is present, use huggingface_hub>=1.5.0,<2.0.0.
- Do not delete the app. Do not publish anything. Work only in the current workspace.

Deliverables:
- patched app.py / requirements.txt / README.md as needed
- REPAIR_SUMMARY.md explaining the patch
- keep or update INFERENCE_CONTRACT.json if the inference contract changed
"""
    (workspace / "REPAIR_GOAL.md").write_text(goal, encoding="utf-8")
    append_event(events_path, "repair", "started", "Running Pi repair pass using build/runtime logs", {"model": pi_model})
    returncode, output = run_cmd(["pi", "-p", goal], cwd=workspace, timeout=1500)
    logs_dir.mkdir(parents=True, exist_ok=True)
    (logs_dir / "pi_repair_output.txt").write_text(output, encoding="utf-8")
    if returncode != 0:
        append_event(events_path, "repair", "failed", "Pi repair returned a non-zero exit code", {"returncode": returncode, "output_tail": output[-3000:]})
        return False
    normalize_requirements_for_modern_hub(workspace, events_path)
    snapshot(repair_dir / "after")
    append_event(events_path, "repair", "success", "Pi repair pass completed", {"output_tail": output[-3000:]})
    return True
| |
def upload_workspace(api, workspace: Path, target_space_id: str, token: str, run_dir: Path, events_path: Path):
    """Upload the generated workspace to the target Space and log what was sent.

    Steps, in order: sanitize README metadata, normalize requirements, mirror
    the workspace into ``run_dir/generated``, verify the required files exist,
    then upload the folder with ``api.upload_folder``.

    Raises:
        RuntimeError: if ``app.py``, ``README.md`` or ``requirements.txt`` is
            missing from the workspace.
    """
    sanitize_readme_metadata(workspace, events_path)
    normalize_requirements_for_modern_hub(workspace, events_path)
    append_event(events_path, "upload_files", "started", "Uploading generated universal model-card workspace recursively")

    # Keep a local mirror of what was generated, replacing any previous mirror.
    gen_dir = run_dir / "generated"
    if gen_dir.exists():
        shutil.rmtree(gen_dir)
    shutil.copytree(workspace, gen_dir, ignore=shutil.ignore_patterns(".git", "node_modules", "__pycache__", "*.pyc"))

    for filename in ["app.py", "README.md", "requirements.txt"]:
        if not (workspace / filename).exists():
            raise RuntimeError(f"Missing required generated file: {filename}")

    api.upload_folder(
        folder_path=str(workspace),
        repo_id=target_space_id,
        repo_type="space",
        token=token,
        ignore_patterns=[".git/*", "node_modules/*", "__pycache__/*", "*.pyc", "GOAL.md"],
    )

    def counts_as_uploaded(relative: Path) -> bool:
        # Mirror the upload's ignore rules so the event does not report files
        # (.git internals, bytecode, the GOAL.md prompt) that were never sent.
        parts = relative.parts
        if ".git" in parts or "node_modules" in parts or "__pycache__" in parts:
            return False
        if relative.name.endswith(".pyc"):
            return False
        return relative != Path("GOAL.md")

    uploaded_files = sorted(
        str(relative)
        for relative in (p.relative_to(workspace) for p in workspace.rglob("*") if p.is_file())
        if counts_as_uploaded(relative)
    )
    append_event(events_path, "upload_files", "success", "Uploaded generated workspace folder", {"file_count": len(uploaded_files), "files_sample": uploaded_files[:50]})
| |
| |
def load_json_if_exists(path: Path) -> dict:
    """Load JSON from *path*, tolerating a missing, unreadable, or malformed file.

    Returns:
        ``{}`` when the path does not exist; the decoded JSON document when it
        parses; otherwise a diagnostic dict with ``parse_error`` (the exception
        text) and ``raw_tail`` (the last 2000 characters that could be read,
        empty when the file could not be read at all).
    """
    if not path.exists():
        return {}
    raw = ""
    try:
        # Read once: the error report then shows the exact text that failed to
        # parse, and a read failure cannot re-raise from inside the handler.
        raw = path.read_text(encoding="utf-8", errors="replace")
        return json.loads(raw)
    except Exception as exc:
        # Deliberately broad: this loader is best-effort and reports instead of raising.
        return {"parse_error": str(exc), "raw_tail": raw[-2000:]}
| |
| |
def infer_generation_gate(workspace: Path, implementation_mode: str, validation: dict, run_dir: Path, events_path: Path) -> dict:
    """Classify the run separately from process success.

    /health passing means the Space boots. It does not mean the generated Space
    performs model inference. The gate status is decided as follows:

    - ``technical_blocker`` when TECHNICAL_BLOCKERS.json has content or when
      app.py / PI_SUMMARY.md contain one of the known "blocked" phrases;
    - ``full_inference_candidate_health_passed`` when no blocker was detected
      and the mode is ``full-inference-gated`` or ``full-inference-attempt``;
    - ``health_only`` otherwise.

    When a blocker is detected only through the text heuristic, a blocker
    report is written to the workspace and to ``run_dir/generated``. The gate
    is written to ``run_dir/inference_gate.json``, emitted as an event, and
    returned.
    """

    def read_optional(name: str) -> str:
        # Generated files are optional inputs here; a missing file reads as empty text.
        candidate = workspace / name
        return candidate.read_text(encoding="utf-8", errors="ignore") if candidate.exists() else ""

    app_text = read_optional("app.py")
    summary_text = read_optional("PI_SUMMARY.md")
    req_text = read_optional("requirements.txt")
    blockers_path = workspace / "TECHNICAL_BLOCKERS.json"
    blockers = load_json_if_exists(blockers_path)

    app_lower = app_text.lower()
    combined = (app_text + "\n" + summary_text).lower()
    blocked_markers = [
        "full generation is not implemented",
        "full generation is intentionally not wired",
        "full inference is blocked",
        "returns a detailed diagnostic",
        "diagnostic report instead",
        "placeholder generator",
        "placeholder generation",
        "info-only",
        "not implemented",
        "cannot run in this environment",
        "out of scope",
    ]
    blocker_detected = bool(blockers) or any(m in combined for m in blocked_markers)
    implementation_signals = {
        "has_spaces_gpu": "@spaces.GPU" in app_text,
        "has_torch": "torch" in req_text or "import torch" in app_text,
        "has_diffusers": "diffusers" in req_text or "diffusers" in app_text,
        "has_video_output_hint": any(x in app_lower for x in ["gr.video", "video", ".mp4", "ffmpeg"]),
        "health_passed": validation.get("method") in {"http_health", "gradio"},
    }

    if blocker_detected:
        status = "technical_blocker"
        message = "Space boots, but full model inference was not implemented. See TECHNICAL_BLOCKERS.json / PI_SUMMARY.md."
    elif implementation_mode in {"full-inference-gated", "full-inference-attempt"}:
        # Without a video smoke test, do not claim real inference success.
        status = "full_inference_candidate_health_passed"
        message = "Space boots and contains inference signals, but no generation smoke test has validated a real video output."
    else:
        status = "health_only"
        message = "Safe scaffold health validation passed. Full inference was not requested."

    if blocker_detected and not blockers:
        # The blocker came from the text heuristic only: synthesize a machine-readable report.
        blockers = {
            "full_inference_implemented": False,
            "source": "worker_heuristic_from_PI_SUMMARY_or_app.py",
            "blockers": [
                {
                    "type": "agent_declared_or_detected_blocker",
                    "claim": "Pi-generated artifacts state that full inference is blocked/not implemented or generation returns diagnostics/placeholders.",
                    "evidence": "See PI_SUMMARY.md and app.py in generated artifacts.",
                    "severity": "blocking",
                }
            ],
            "required_investigations_for_next_run": [
                "Check whether PyTorch SDPA can replace flash-attn calls.",
                "Check whether HF Kernels flash-attn2/3/4 can replace required flash-attn APIs.",
                "Verify whether 2-GPU context parallelism is strictly required or can be reduced to a single-GPU smoke test.",
            ],
        }
        # write_json emits the same indented UTF-8 JSON and also creates missing
        # parent directories, so a missing run_dir/generated no longer crashes here.
        write_json(blockers_path, blockers)
        write_json(run_dir / "generated" / "TECHNICAL_BLOCKERS.json", blockers)

    gate = {
        "status": status,
        "message": message,
        "implementation_mode": implementation_mode,
        "blocker_detected": blocker_detected,
        "implementation_signals": implementation_signals,
        "validation_method": validation.get("method"),
        "blockers": blockers,
    }
    write_json(run_dir / "inference_gate.json", gate)
    append_event(events_path, "inference_gate", status, message, gate)
    return gate
| |
| |
def main():
    """Run the universal model-card builder worker end to end.

    Configuration is read from environment variables (RUN_ID is required; the
    others have defaults). Artifacts are written under
    ``OUTPUT_ROOT/runs/RUN_ID``: events.jsonl, state.json, model_analysis.json,
    hardware_attempts.json, hardware_strategy.json, logs/pi_output.txt and
    report.md.

    Pipeline: install dependencies, authenticate, fetch model metadata, prepare
    the workspace, run Pi, check that a health endpoint is present, create the
    Space, upload the workspace, validate the live Space (with one Pi repair
    pass if the first validation raises), classify the inference gate, then
    write the final state and report.
    """
    run_id = os.environ["RUN_ID"]
    hf_username = os.environ.get("HF_USERNAME", "unknown")
    bucket_source = os.environ.get("BUCKET_SOURCE", "unknown")
    output_root = Path(os.environ.get("OUTPUT_ROOT", "/output"))
    target_space_id = os.environ.get("TARGET_SPACE_ID", "")
    model_id = sanitize_model_id(os.environ.get("MODEL_ID", DEFAULT_MODEL_ID))
    pi_model = os.environ.get("PI_MODEL", "Qwen/Qwen3-Coder-Next")
    preferred_hardware = os.environ.get("PREFERRED_SPACE_HARDWARE", "zero-a10g")
    fallback_hardware = os.environ.get("FALLBACK_SPACE_HARDWARE", "l40sx1")
    allow_fixed_gpu_fallback = os.environ.get("ALLOW_FIXED_GPU_FALLBACK", "true").lower() in {"1", "true", "yes", "on"}
    implementation_mode = os.environ.get("IMPLEMENTATION_MODE", "full-inference-attempt")
    token = os.environ.get("HF_TOKEN")

    run_dir = output_root / "runs" / run_id
    events_path = run_dir / "events.jsonl"
    state_path = run_dir / "state.json"
    workspace = Path("/tmp/universal_workspace")

    append_event(events_path, "bootstrap", "started", "Universal model-card builder worker started", {"model_id": model_id, "target_space_id": target_space_id})
    write_json(state_path, {"run_id": run_id, "kind": "universal_model_card_builder", "status": "running", "message": "Attempting Universal model-card builderd Space creation", "model_id": model_id, "target_space": target_space_id, "created_by": hf_username, "bucket_source": bucket_source, "created_at": now(), "updated_at": now()})
    # NOTE(review): fail() is defined elsewhere in this script; it presumably
    # records the failure and exits (SystemExit is re-raised below) - confirm.
    if not token:
        fail(run_dir, events_path, "HF_TOKEN is missing from Job secrets")
    if not TARGET_RE.match(target_space_id):
        fail(run_dir, events_path, "Invalid TARGET_SPACE_ID", {"target_space_id": target_space_id})

    try:
        install_python_deps(events_path)
        # Imported only after install_python_deps has run.
        from huggingface_hub import HfApi
        api = HfApi(token=token)
        whoami = api.whoami(token=token)
        append_event(events_path, "auth", "success", "Authenticated inside Job", {"whoami_name": whoami.get("name")})

        append_event(events_path, "model_analysis", "started", "Fetching model metadata", {"model_id": model_id})
        info = api.model_info(model_id, token=token, files_metadata=True)
        siblings = [getattr(s, "rfilename", "") for s in (info.siblings or [])]
        # Tags and sibling filenames are capped (100 / 160 entries) in the stored analysis.
        analysis = {"model_id": model_id, "pipeline_tag": getattr(info, "pipeline_tag", None), "library_name": getattr(info, "library_name", None), "tags": list(getattr(info, "tags", []) or [])[:100], "siblings": siblings[:160], "default_model_target": model_id == DEFAULT_MODEL_ID, "preferred_hardware": preferred_hardware, "fallback_hardware": fallback_hardware, "allow_fixed_gpu_fallback": allow_fixed_gpu_fallback, "implementation_mode": implementation_mode}
        write_json(run_dir / "model_analysis.json", analysis)
        append_event(events_path, "model_analysis", "success", "Model metadata fetched", {"pipeline_tag": analysis["pipeline_tag"], "library_name": analysis["library_name"]})

        create_initial_workspace(workspace, model_id, target_space_id, preferred_hardware, fallback_hardware, allow_fixed_gpu_fallback, implementation_mode, analysis)
        append_event(events_path, "workspace", "success", "Prepared universal model-card workspace", {"files": sorted(p.name for p in workspace.iterdir())})

        install_pi(events_path)
        configure_pi(events_path, pi_model)
        append_event(events_path, "pi_run", "started", "Running Pi on universal model-card workspace", {"model": pi_model})
        # The Pi prompt is the GOAL.md file found in the workspace.
        code, pi_out = run_cmd(["pi", "-p", (workspace / "GOAL.md").read_text(encoding="utf-8")], cwd=workspace, timeout=2400)
        (run_dir / "logs").mkdir(parents=True, exist_ok=True)
        (run_dir / "logs" / "pi_output.txt").write_text(pi_out, encoding="utf-8")
        if code != 0:
            append_event(events_path, "pi_run", "failed", "Pi returned a non-zero exit code", {"returncode": code, "output_tail": pi_out[-4000:]})
            collect_pi_traces(run_dir, events_path)
            fail(run_dir, events_path, "Pi failed before Space upload", {"returncode": code, "output_tail": pi_out[-4000:]})
        append_event(events_path, "pi_run", "success", "Pi completed universal model-card workspace pass", {"output_tail": pi_out[-2000:]})
        # Guarantee a PI_SUMMARY.md exists for the gate heuristic and the report below.
        if not (workspace / "PI_SUMMARY.md").exists():
            (workspace / "PI_SUMMARY.md").write_text("# Pi Summary\n\nPi did not create a PI_SUMMARY.md. See logs/pi_output.txt.\n", encoding="utf-8")

        # Textual check only: app.py must mention "/health" or declare api_name "health".
        app_text = (workspace / "app.py").read_text(encoding="utf-8", errors="ignore")
        if "/health" not in app_text and "api_name=\"health\"" not in app_text and "api_name='health'" not in app_text:
            append_event(events_path, "pi_verification", "failed", "app.py does not appear to expose /health; injecting safe health endpoint is not implemented")
            fail(run_dir, events_path, "Pi output did not preserve a /health endpoint")
        append_event(events_path, "pi_verification", "success", "Pi output preserved health validation endpoint")

        append_event(events_path, "hardware_strategy", "started", "Creating Space with hardware-at-creation strategy", {"preferred_hardware": preferred_hardware, "fallback_hardware": fallback_hardware, "allow_fixed_gpu_fallback": allow_fixed_gpu_fallback})
        hardware_strategy = create_space_with_hardware_strategy(
            api,
            target_space_id,
            token,
            preferred_hardware,
            fallback_hardware,
            allow_fixed_gpu_fallback,
            events_path,
        )
        # A missing or empty selected_hardware is recorded as the "default-cpu-or-existing" sentinel.
        selected_hardware = hardware_strategy.get("selected_hardware") or "default-cpu-or-existing"
        hardware_attempts = list(hardware_strategy.get("attempts") or [])
        requested_hardware_sequence = list(hardware_strategy.get("requested_sequence") or [])

        # Upload after create. If create_repo(space_hardware=...) succeeded, the build
        # starts directly on the requested hardware. If it fell back to CPU, the run
        # remains valid but will be marked manual_hardware_required when inference
        # signals indicate GPU is needed.
        upload_workspace(api, workspace, target_space_id, token, run_dir, events_path)

        if selected_hardware == "default-cpu-or-existing":
            append_event(events_path, "hardware", "warning", "Automatic hardware-at-creation failed; Space is on default CPU unless user changes it manually", {"attempts": hardware_attempts})

        write_json(run_dir / "hardware_attempts.json", {"selected_hardware": selected_hardware, "requested_sequence": requested_hardware_sequence, "attempts": hardware_attempts, "strategy": "create_repo_space_hardware_first"})
        write_json(run_dir / "hardware_strategy.json", {"selected_hardware": selected_hardware, "requested_sequence": requested_hardware_sequence, "attempts": hardware_attempts, "manual_action_required": selected_hardware == "default-cpu-or-existing", "strategy": "create_repo_space_hardware_first"})

        try:
            validation = validate_live_api(api, target_space_id, token, run_dir, events_path, timeout_s=1200)
        except Exception as validation_error:
            # Exactly one repair attempt: collect logs, let Pi patch the workspace,
            # re-upload and validate again. A second failure propagates to the
            # outer handler.
            append_event(events_path, "repair", "started", "Initial live validation failed; attempting one repair pass", {"error": str(validation_error)[:2000]})
            collect_space_logs(target_space_id, token, run_dir, events_path)
            repaired = repair_workspace_with_pi(workspace, run_dir, events_path, pi_model, target_space_id, model_id, str(validation_error))
            if not repaired:
                # Re-raise the original validation error when Pi could not repair.
                raise
            upload_workspace(api, workspace, target_space_id, token, run_dir, events_path)
            validation = validate_live_api(api, target_space_id, token, run_dir, events_path, timeout_s=1200)
        inference_gate = infer_generation_gate(workspace, implementation_mode, validation, run_dir, events_path)

        # If the generated app looks like real GPU inference but automatic
        # hardware requests failed, classify the run honestly as needing manual
        # hardware instead of pretending CPU/default hardware is enough. The
        # existing-Space validation workflow can then smoke-test generation
        # after the user sets a GPU manually.
        manual_hw_required = selected_hardware == "default-cpu-or-existing" and inference_gate.get("status") not in {"technical_blocker", "health_only"} and (
            inference_gate.get("implementation_signals", {}).get("has_spaces_gpu")
            or inference_gate.get("implementation_signals", {}).get("has_torch")
            or any((a.get("manual_action_required") for a in hardware_attempts if isinstance(a, dict)))
        )
        if manual_hw_required:
            # Override the gate on a copy and persist the overridden version.
            inference_gate = dict(inference_gate)
            inference_gate["status"] = "manual_hardware_required"
            inference_gate["message"] = "Space was generated and boots, but automatic ZeroGPU/fixed-GPU assignment failed. Set hardware manually, then run the existing-Space validation workflow."
            inference_gate["manual_hardware_required"] = True
            inference_gate["hardware_attempts"] = hardware_attempts
            write_json(run_dir / "inference_gate.json", inference_gate)
            append_event(events_path, "inference_gate", "manual_hardware_required", inference_gate["message"], inference_gate)

        collect_pi_traces(run_dir, events_path)

        # The final run status is the gate status, not a plain success/failure flag.
        final_state = {
            "run_id": run_id,
            "kind": "universal_model_card_builder",
            "status": inference_gate["status"],
            "message": inference_gate["message"],
            "model_id": model_id,
            "target_space": target_space_id,
            "target_space_url": f"https://huggingface.co/spaces/{target_space_id}",
            "selected_hardware": selected_hardware,
            "hardware_attempts": hardware_attempts,
            "validation": validation,
            "inference_gate": inference_gate,
            "updated_at": now(),
            "created_by": hf_username,
            "bucket_source": bucket_source,
        }
        write_json(state_path, final_state)
        report = f"""# Agentic Space Factory — Universal Model-Card Builder Report

Run ID: `{run_id}`

Status: **{inference_gate['status']}**

{inference_gate['message']}

Target Space: https://huggingface.co/spaces/{target_space_id}

Model: `{model_id}`

## Hardware

Selected/requested hardware: `{selected_hardware}`

Hardware changes are best-effort with OAuth. If requests fail with 401/auth/billing errors, set the Space hardware manually and rerun validation.

```json
{json.dumps(hardware_attempts, indent=2, ensure_ascii=False)}
```

## Health validation

The wrapper validated the live Space using HTTP `/health` first, with Gradio Client as fallback. This only proves bootability.

```json
{json.dumps(validation, indent=2, ensure_ascii=False)}
```

## Full-inference gate

```json
{json.dumps(inference_gate, indent=2, ensure_ascii=False)}
```

## Pi summary

{(workspace / 'PI_SUMMARY.md').read_text(encoding='utf-8', errors='ignore') if (workspace / 'PI_SUMMARY.md').exists() else 'No PI_SUMMARY.md was produced.'}

## Safety

- The target Space was created private.
- No public publication was attempted.
- Raw traces should remain private; redacted traces are stored separately.
- If fallback fixed GPU was used or selected manually, review billing/hardware settings manually after the run.
"""
        (run_dir / "report.md").write_text(report, encoding="utf-8")
        append_event(events_path, "report_write", "success", "Wrote report.md")
        append_event(events_path, "done", inference_gate["status"], "Universal model-card builder completed", {"target_space": target_space_id, "selected_hardware": selected_hardware, "gate_status": inference_gate["status"]})
    except SystemExit:
        # Let an exit raised inside the pipeline (e.g. by fail()) pass through untouched.
        raise
    except Exception as exc:
        # Trace collection is best-effort here: it must not mask the original failure.
        try:
            collect_pi_traces(run_dir, events_path)
        except Exception:
            pass
        fail(run_dir, events_path, "Universal model-card builder worker failed", {"error": str(exc)})
| |
| |
# Script entry point: run the worker only when this file is executed directly.
if __name__ == "__main__":
    main()
| |
| ''' |
|
|
|
|
| VALIDATE_EXISTING_SPACE_WORKER_SCRIPT = r''' |
| import json |
| import os |
| import re |
| import shutil |
| import subprocess |
| import sys |
| import time |
| from datetime import datetime, timezone |
| from pathlib import Path |
| |
# Accepts "owner/name" ids. Each part starts with an ASCII letter or digit and is
# followed by 1-95 characters from letters, digits, ".", "_" and "-".
TARGET_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{1,95}/[A-Za-z0-9][A-Za-z0-9._-]{1,95}$")
| |
| |
def now():
    """Return the current UTC time as an ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
| |
| |
def write_json(path: Path, payload: dict):
    """Write *payload* to *path* as indented UTF-8 JSON with a trailing newline.

    Missing parent directories are created; an existing file is overwritten.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, indent=2, ensure_ascii=False)
    path.write_text(f"{serialized}\n", encoding="utf-8")
| |
| |
def append_event(path: Path, step: str, status: str, message: str, data: dict | None = None):
    """Append one event as a JSON line to *path* and echo the same line to stdout.

    The event carries a timestamp from ``now()``, the step, status and message,
    and *data* (an empty dict when *data* is falsy). Parent directories of
    *path* are created when missing.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    record = {
        "ts": now(),
        "step": step,
        "status": status,
        "message": message,
        "data": data or {},
    }
    encoded = json.dumps(record, ensure_ascii=False)
    with path.open("a", encoding="utf-8") as sink:
        sink.write(encoded + "\n")
    # Flush immediately so the line shows up in the captured output without buffering delay.
    print(encoded, flush=True)
| |
| |
| def redact_text(text: str | None) -> str: |
| if not text: |
| return "" |
| value = text |
| for secret_name in ["HF_TOKEN", "HUGGING_FACE_HUB_TOKEN"]: |
| secret = os.environ.get(secret_name) |
| if secret: |
| value = value.replace(secret, "[REDACTED]") |
| value = re.sub(r"Bearer\s+[A-Za-z0-9_\-.=]+", "Bearer [REDACTED]", value) |
| value = re.sub(r"hf_[A-Za-z0-9_\-]{10,}", "hf_[REDACTED]", value) |
| return value |
| |
| |
def run_cmd(cmd: list[str], *, env: dict | None = None, timeout: int = 120):
    """Run *cmd* and return ``(returncode, output)``.

    stderr is merged into stdout and the captured text is passed through
    ``redact_text`` before it is returned. ``subprocess.run`` is called with
    *timeout*, so an overrunning command raises ``subprocess.TimeoutExpired``.
    """
    completed = subprocess.run(
        cmd,
        env=env,
        timeout=timeout,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    exit_code = completed.returncode
    return exit_code, redact_text(completed.stdout)
| |
| |
def install_deps(events_path: Path):
    """Install the validation dependencies with pip, logging start and outcome.

    Raises:
        RuntimeError: carrying the pip output when pip exits non-zero.
    """
    append_event(events_path, "dependencies", "started", "Installing validation dependencies")
    requirements = ["huggingface_hub>=1.0.0", "gradio_client>=2.0.0", "requests>=2.31.0"]
    pip_command = [sys.executable, "-m", "pip", "install", "-q", "--upgrade", *requirements]
    exit_code, pip_output = run_cmd(pip_command, timeout=600)
    if exit_code == 0:
        append_event(events_path, "dependencies", "success", "Validation dependencies installed")
        return
    append_event(events_path, "dependencies", "failed", "Dependency installation failed", {"output_tail": pip_output[-4000:]})
    raise RuntimeError(pip_output)
| |
| |
def make_gradio_client(target_space_id: str, token: str):
    """Build a ``gradio_client.Client`` for the Space, passing the token however the installed version accepts it.

    The ``Client`` signature is inspected and the first supported option is
    used, in this order: ``token``, ``hf_token``, ``api_key``, then an
    ``Authorization`` header via ``headers``. If none is accepted the client is
    created without credentials.
    """
    import inspect
    from gradio_client import Client

    accepted = inspect.signature(Client).parameters
    for keyword in ("token", "hf_token", "api_key"):
        if keyword in accepted:
            return Client(target_space_id, **{keyword: token})
    if "headers" in accepted:
        return Client(target_space_id, headers={"Authorization": f"Bearer {token}"})
    return Client(target_space_id)
| |
| |
def api_names_from_schema(schema) -> list[str]:
    """Collect endpoint names from a nested API schema, in first-seen order.

    Dicts and lists are walked depth-first. A name is taken from the value of
    any ``api_name`` / ``apiName`` key and from any dict key that starts with
    ``/``. Non-string and empty values are skipped, a leading ``/`` is added
    when missing, and duplicates are dropped.
    """
    name_keys = {"api_name", "apiName"}

    def candidates(node):
        # Yield raw candidates in traversal order; filtering happens below.
        if isinstance(node, dict):
            for key, value in node.items():
                if key in name_keys:
                    yield value
                if isinstance(key, str) and key.startswith("/"):
                    yield key
                yield from candidates(value)
        elif isinstance(node, list):
            for element in node:
                yield from candidates(element)

    ordered: list[str] = []
    for candidate in candidates(schema):
        if not isinstance(candidate, str) or not candidate:
            continue
        normalized = candidate if candidate.startswith("/") else "/" + candidate
        if normalized not in ordered:
            ordered.append(normalized)
    return ordered
| |
| |
def runtime_to_dict(runtime) -> dict:
    """Summarize selected attributes of *runtime* as a dict of strings.

    For each tracked attribute a missing attribute reads as ``None``; a value
    that has a ``.value`` attribute is replaced by it. Every non-``None``
    result is converted with ``str``.
    """
    tracked = ("stage", "hardware", "requested_hardware", "sleep_time", "storage", "gc_timeout")
    summary = {}
    for field in tracked:
        raw = getattr(runtime, field, None)
        unwrapped = getattr(raw, "value", raw)
        summary[field] = None if unwrapped is None else str(unwrapped)
    return summary
| |
| |
def write_space_runtime(api, target_space_id: str, token: str, run_dir: Path, events_path: Path, attempt: int | None = None) -> dict:
    """Fetch the Space runtime and persist a summary to ``run_dir/space_runtime.json``.

    Returns the summary, tagged with *attempt*. When fetching or writing fails,
    an error payload is written instead, a warning event is logged, and the
    error payload is returned. The function does not raise for a failed fetch.
    """
    snapshot_path = run_dir / "space_runtime.json"
    try:
        snapshot = runtime_to_dict(api.get_space_runtime(repo_id=target_space_id, token=token))
        snapshot["attempt"] = attempt
        write_json(snapshot_path, snapshot)
    except Exception as exc:
        failure = {"error": str(exc)[:2000], "attempt": attempt}
        write_json(snapshot_path, failure)
        append_event(events_path, "space_runtime", "warning", "Could not fetch Space runtime", failure)
        return failure
    return snapshot
| |
| |
def collect_space_logs(target_space_id: str, token: str, run_dir: Path, events_path: Path):
    """Fetch runtime and build logs with the ``hf`` CLI into ``run_dir/logs``.

    Collection is best-effort: a failure for one log is recorded in the
    returned list and does not stop the other. A single ``space_logs`` event
    with status ``success`` is emitted regardless of individual outcomes.

    Returns:
        One dict per log file, holding either ``returncode`` and ``tail`` or
        an ``error`` string.
    """
    logs_dir = run_dir / "logs"
    logs_dir.mkdir(parents=True, exist_ok=True)
    cli_env = os.environ.copy()
    cli_env["HF_TOKEN"] = token
    base_command = ["hf", "spaces", "logs", target_space_id]
    plan = [
        ("space_logs_runtime.txt", base_command),
        ("space_logs_build.txt", [*base_command, "--build"]),
    ]
    outcomes = []
    for filename, command in plan:
        try:
            exit_code, log_text = run_cmd(command, env=cli_env, timeout=75)
            (logs_dir / filename).write_text(log_text, encoding="utf-8")
            outcomes.append({"file": filename, "returncode": exit_code, "tail": log_text[-1000:]})
        except Exception as exc:
            outcomes.append({"file": filename, "error": str(exc)[:1000]})
    append_event(events_path, "space_logs", "success", "Collected best-effort Space logs", {"files": outcomes})
    return outcomes
| |
| |
def space_subdomain_url(target_space_id: str) -> str:
    """Return the direct ``https://<owner>-<name>.hf.space`` URL for a Space id.

    The host label is the lower-cased ``owner-name`` with ``_`` and ``.``
    replaced by ``-``. Dots must be replaced as well: left in place they split
    the name into extra DNS labels, so the host is no longer a single
    ``*.hf.space`` label.

    NOTE(review): other normalization the Hub may apply to long or unusual
    names is not reproduced here - confirm against the Space's reported
    subdomain if such names are expected.
    """
    owner, name = target_space_id.split("/", 1)
    label = re.sub(r"[._]", "-", f"{owner}-{name}").lower()
    return f"https://{label}.hf.space"
| |
| |
def validate_http_health(target_space_id: str, token: str, run_dir: Path, attempt: int):
    """GET the Space's ``/health`` URL and record a successful response.

    On a successful (``response.ok``) reply the payload, including the parsed
    JSON body when there is one, is written to ``run_dir/tests/http_health.json``
    and returned.

    Raises:
        RuntimeError: when the response status is not OK. Nothing is written
            in that case.
    """
    import requests

    health_url = space_subdomain_url(target_space_id).rstrip("/") + "/health"
    response = requests.get(
        health_url,
        headers={"Authorization": f"Bearer {token}", "Accept": "application/json,text/plain,*/*"},
        timeout=20,
    )
    succeeded = response.ok
    payload = {
        "status": "success" if succeeded else "failed",
        "attempt": attempt,
        "url": health_url,
        "status_code": response.status_code,
        "content_type": response.headers.get("content-type"),
        "text": response.text[:2000],
    }
    if not succeeded:
        raise RuntimeError(f"HTTP /health returned {response.status_code}: {response.text[:500]}")
    try:
        payload["json"] = response.json()
    except Exception:
        # A non-JSON body is acceptable; the raw text is already in the payload.
        pass
    write_json(run_dir / "tests" / "http_health.json", payload)
    return payload
| |
| |
def wait_until_live(api, target_space_id: str, token: str, run_dir: Path, events_path: Path, timeout_s: int = 1800):
    """Poll the Space until it answers, or raise after *timeout_s* seconds.

    Each attempt records the Space runtime, then tries HTTP ``/health`` and,
    if that fails, the Gradio API schema. Attempts are 30 seconds apart.

    Returns:
        A dict with ``validator`` set to ``"http_health"`` (plus ``health``)
        or ``"gradio_schema"`` (plus ``api_names``), and the last ``runtime``
        payload.

    Raises:
        RuntimeError: when neither probe succeeds before the deadline; Space
            logs are collected first.
    """
    append_event(events_path, "live_wait", "started", "Waiting for existing Space to become live")
    deadline = time.time() + timeout_s
    attempt = 0
    last_error = None
    while time.time() < deadline:
        attempt += 1
        runtime_payload = write_space_runtime(api, target_space_id, token, run_dir, events_path, attempt)
        stage = str(runtime_payload.get("stage") or "").upper()
        if "RUNTIME_ERROR" in stage:
            # A runtime error is not treated as fatal: logs are collected and polling
            # continues, since the hardware may be changed manually in the meantime.
            collect_space_logs(target_space_id, token, run_dir, events_path)
            last_error = f"Space is in RUNTIME_ERROR: {runtime_payload}"
            append_event(events_path, "live_wait", "waiting", "Space is in runtime error; still waiting in case hardware was changed manually", {"attempt": attempt, "runtime": runtime_payload})
            time.sleep(30)
            continue
        # First probe: plain HTTP /health.
        try:
            health = validate_http_health(target_space_id, token, run_dir, attempt)
            append_event(events_path, "live_wait", "success", "HTTP /health is live", {"attempt": attempt})
            return {"validator": "http_health", "health": health, "runtime": runtime_payload}
        except Exception as http_exc:
            last_error = f"HTTP health failed: {http_exc}"
        # Second probe: the Gradio API schema. It only counts as live when at
        # least one endpoint name was found.
        try:
            client = make_gradio_client(target_space_id, token)
            schema = client.view_api(return_format="dict")
            names = api_names_from_schema(schema)
            write_json(run_dir / "tests" / "api_schema.json", {"schema": schema, "api_names": names})
            if names:
                append_event(events_path, "live_wait", "success", "Gradio API schema is live", {"attempt": attempt, "api_names": names})
                return {"validator": "gradio_schema", "api_names": names, "runtime": runtime_payload}
        except Exception as gr_exc:
            last_error = (last_error or "") + f"; Gradio schema failed: {gr_exc}"
        append_event(events_path, "live_wait", "waiting", "Space not live yet", {"attempt": attempt, "runtime": runtime_payload, "error": last_error[-1500:] if last_error else None})
        time.sleep(30)
    # Deadline passed: capture logs for diagnosis before reporting the timeout.
    collect_space_logs(target_space_id, token, run_dir, events_path)
    raise RuntimeError(f"Space did not become live before timeout: {last_error}")
| |
| |
def parse_json_env(name: str, default):
    """Parse the environment variable *name* as JSON.

    Returns:
        *default* when the variable is unset or empty, otherwise the decoded
        JSON value.

    Raises:
        ValueError: when the value is not valid JSON. The original decoding
            error is attached as ``__cause__``.
    """
    value = os.environ.get(name)
    if not value:
        return default
    try:
        return json.loads(value)
    except Exception as exc:
        # Chain explicitly so the decoder's position details stay in the traceback.
        raise ValueError(f"Invalid JSON for {name}: {exc}") from exc
| |
| |
def result_contains_expected_output(result, expected_output_type: str) -> tuple[bool, dict]:
    """Check whether *result* looks like the expected kind of output.

    File-like strings are collected from *result* (strings, ``Path`` objects,
    the ``path`` / ``url`` / ``name`` entries of dicts, and nested dicts,
    lists and tuples) when they end with a known media or ``.txt`` extension.

    Args:
        result: the value returned by the endpoint call.
        expected_output_type: ``"text"``, ``"image"``, ``"video"``, ``"audio"``
            or ``"any"``; a falsy value means ``"any"``. Unknown values are
            treated like ``"any"``.

    Returns:
        ``(matched, info)`` where *info* holds the normalized expected type,
        the result's type name, a truncated repr, and up to 20 detected paths.
    """
    image_ext = (".png", ".jpg", ".jpeg", ".webp", ".gif")
    video_ext = (".mp4", ".mov", ".webm")
    audio_ext = (".wav", ".mp3", ".flac", ".ogg")
    # Detection is derived from the same tables as classification, so every
    # extension accepted below (.mov, .webm, .flac, .ogg included) is collected.
    detectable_ext = image_ext + video_ext + audio_ext + (".txt",)

    expected = (expected_output_type or "any").lower().strip()
    info = {"expected_output_type": expected, "result_type": type(result).__name__, "result_repr": repr(result)[:2000]}
    paths = []

    def visit(obj):
        if isinstance(obj, (str, Path)):
            text = str(obj)
            if text.lower().endswith(detectable_ext):
                paths.append(text)
        elif isinstance(obj, dict):
            for key in ("path", "url", "name"):
                if key in obj:
                    visit(obj[key])
            for value in obj.values():
                if isinstance(value, (dict, list, tuple)):
                    visit(value)
        elif isinstance(obj, (list, tuple)):
            for item in obj:
                visit(item)

    visit(result)
    info["detected_paths"] = paths[:20]

    if expected == "text":
        return isinstance(result, str) and len(result.strip()) > 0, info
    media_ext = {"image": image_ext, "video": video_ext, "audio": audio_ext}
    if expected in media_ext:
        return any(p.lower().endswith(media_ext[expected]) for p in paths), info
    # "any" and unrecognized types only require that something came back.
    return result is not None, info
| |
| |
def copy_result_artifacts(result, run_dir: Path):
    """Copy local files referenced by *result* into ``run_dir/artifacts``.

    Strings and ``Path`` objects that name an existing regular file are
    copied; dicts are searched through their ``path`` / ``name`` entries and
    nested containers; lists and tuples are searched item by item. Copying is
    best-effort: anything that cannot be inspected or copied is skipped.

    Returns:
        The destination paths (as strings) of the files that were copied.
    """
    artifacts = run_dir / "artifacts"
    artifacts.mkdir(parents=True, exist_ok=True)
    copied = []

    def maybe_copy(obj):
        if isinstance(obj, (str, Path)):
            path = Path(str(obj))
            try:
                # Arbitrary result strings (e.g. long generated text) are probed as
                # paths; the filesystem check itself can raise OSError for them
                # (name too long), so it must sit inside the guard.
                if not path.is_file():
                    return
                target = artifacts / path.name
                shutil.copy2(path, target)
            except (OSError, ValueError):
                # Best-effort: an unreadable or invalid candidate is not an artifact.
                return
            copied.append(str(target))
        elif isinstance(obj, dict):
            for key in ("path", "name"):
                if key in obj:
                    maybe_copy(obj[key])
            for value in obj.values():
                if isinstance(value, (dict, list, tuple)):
                    maybe_copy(value)
        elif isinstance(obj, (list, tuple)):
            for item in obj:
                maybe_copy(item)

    maybe_copy(result)
    return copied
| |
| |
def smoke_generate(target_space_id: str, token: str, run_dir: Path, events_path: Path):
    """Call the Space's generation endpoint once and check the output type.

    Settings come from the environment: API_NAME (default ``/generate``),
    EXPECTED_OUTPUT_TYPE (default ``any``), TEST_ARGS_JSON (a JSON list) and
    TEST_KWARGS_JSON (a JSON object). The API schema and the test result are
    written under ``run_dir/tests``, and result files are copied to the run's
    artifacts.

    Returns:
        The result payload when the output matched the expected type.

    Raises:
        ValueError: when TEST_ARGS_JSON is not a list or TEST_KWARGS_JSON is
            not an object.
        RuntimeError: when the output does not match the expected type.
    """
    api_name = (os.environ.get("API_NAME") or "/generate").strip()
    expected_output_type = (os.environ.get("EXPECTED_OUTPUT_TYPE") or "any").strip()
    test_args = parse_json_env("TEST_ARGS_JSON", ["a cinematic robot cat astronaut, detailed, studio lighting"])
    test_kwargs = parse_json_env("TEST_KWARGS_JSON", {})
    if not isinstance(test_args, list):
        raise ValueError("TEST_ARGS_JSON must be a JSON list")
    if not isinstance(test_kwargs, dict):
        raise ValueError("TEST_KWARGS_JSON must be a JSON object")

    append_event(events_path, "generation_smoke", "started", "Calling live generation endpoint", {"api_name": api_name, "expected_output_type": expected_output_type})
    tests_dir = run_dir / "tests"
    client = make_gradio_client(target_space_id, token)
    schema = client.view_api(return_format="dict")
    endpoint_names = api_names_from_schema(schema)
    write_json(tests_dir / "api_schema.json", {"schema": schema, "api_names": endpoint_names})

    # Time only the predict call itself.
    call_started = time.time()
    result = client.predict(*test_args, api_name=api_name, **test_kwargs)
    elapsed = time.time() - call_started

    matched, result_info = result_contains_expected_output(result, expected_output_type)
    copied = copy_result_artifacts(result, run_dir)
    payload = {
        "status": "success" if matched else "failed",
        "target_space": target_space_id,
        "api_name": api_name,
        "discovered_api_names": endpoint_names,
        "test_args": test_args,
        "test_kwargs": test_kwargs,
        "expected_output_type": expected_output_type,
        "latency_seconds": round(elapsed, 3),
        "result_info": result_info,
        "copied_artifacts": copied,
        # Twice the observed latency plus 15 s, clamped to the 30-300 s range.
        "recommended_zero_gpu_duration_seconds": int(max(30, min(300, elapsed * 2 + 15))),
        "validated_at": now(),
    }
    # The same payload is stored under both file names.
    for result_file in ("generation_smoke.json", "test_result.json"):
        write_json(tests_dir / result_file, payload)

    if not matched:
        append_event(events_path, "generation_smoke", "failed", "Live generation returned an unexpected output type", payload)
        raise RuntimeError("Generation smoke test failed: unexpected output type")
    append_event(events_path, "generation_smoke", "success", "Live generation smoke test passed", {"latency_seconds": payload["latency_seconds"], "copied_artifacts": copied[:5]})
    return payload
| |
| |
def main():
    """Validate an existing Space end to end and record the outcome.

    Configuration is read from the environment:

    - ``RUN_ID`` (required): names the run directory
      ``<OUTPUT_ROOT>/runs/<RUN_ID>``.
    - ``TARGET_SPACE_ID``: the ``owner/space-name`` to validate.
    - ``HF_TOKEN``: token passed to ``HfApi`` and to the smoke test.
    - ``HF_USERNAME`` (default ``"unknown"``), ``OUTPUT_ROOT`` (default
      ``/output``) and ``LIVE_TIMEOUT_SECONDS`` (default ``1800``).

    The run is marked ``running`` in ``state.json`` first. On success the state
    becomes ``full_inference_success`` and ``report.md`` is written. On any
    failure, including a missing token or a malformed Space id, the state
    becomes ``failed``, a failure report and event are written, and the
    process exits with status 1.

    Raises:
        KeyError: if ``RUN_ID`` is unset; without it there is no run directory
            in which a failure could be recorded.
        SystemExit: with code 1 after a failure has been recorded.
    """
    run_id = os.environ["RUN_ID"]
    username = os.environ.get("HF_USERNAME", "unknown")
    output_root = Path(os.environ.get("OUTPUT_ROOT", "/output"))
    # An unset variable becomes "" so the format check below reports it as a
    # recorded failure instead of a bare KeyError before anything is written.
    target_space_id = os.environ.get("TARGET_SPACE_ID", "").strip()
    token = os.environ.get("HF_TOKEN")
    run_dir = output_root / "runs" / run_id
    events_path = run_dir / "events.jsonl"
    state_path = run_dir / "state.json"
    append_event(events_path, "bootstrap", "started", "Existing Space validation worker started", {"target_space_id": target_space_id})
    write_json(state_path, {"run_id": run_id, "kind": "validate_existing_space", "status": "running", "target_space": target_space_id, "created_by": username, "updated_at": now()})
    # Becomes True once the token and Space id are known to be usable; the
    # failure handler uses it to decide whether Space logs can be fetched.
    preflight_ok = False
    try:
        # These checks live inside the try so that a bad configuration still
        # flips state.json from "running" to "failed".
        if not token:
            raise RuntimeError("HF_TOKEN is missing")
        if not TARGET_RE.match(target_space_id):
            raise ValueError("TARGET_SPACE_ID must look like owner/space-name")
        preflight_ok = True
        install_deps(events_path)
        # Imported late: the package may only exist after install_deps() ran.
        from huggingface_hub import HfApi
        api = HfApi(token=token)
        whoami = api.whoami(token=token)
        append_event(events_path, "auth", "success", "Authenticated inside validation Job", {"whoami_name": whoami.get("name")})
        live = wait_until_live(api, target_space_id, token, run_dir, events_path, timeout_s=int(os.environ.get("LIVE_TIMEOUT_SECONDS", "1800")))
        smoke = smoke_generate(target_space_id, token, run_dir, events_path)
        final_state = {
            "run_id": run_id,
            "kind": "validate_existing_space",
            "status": "full_inference_success",
            "message": "Existing Space passed live health/schema validation and generation smoke test.",
            "target_space": target_space_id,
            "target_space_url": f"https://huggingface.co/spaces/{target_space_id}",
            "live_validation": live,
            "generation_smoke": smoke,
            "updated_at": now(),
        }
        write_json(state_path, final_state)
        # The Markdown body is kept at column 0 so headings and code fences
        # are not rendered as an indented code block.
        report = f"""# Agentic Space Factory — Existing Space Validation Report

Status: **full_inference_success**

Target Space: [`{target_space_id}`](https://huggingface.co/spaces/{target_space_id})

## Generation smoke test

```json
{json.dumps(smoke, indent=2, ensure_ascii=False)}
```

## Notes

- This validation is intended for Spaces whose hardware was set manually after generation.
- Latency is measured from the live Gradio endpoint call.
- The recommended ZeroGPU duration is a rough estimate from this live run, not a guarantee.
"""
        (run_dir / "report.md").write_text(report, encoding="utf-8")
        append_event(events_path, "report_write", "success", "Wrote report.md")
        append_event(events_path, "done", "full_inference_success", "Existing Space validation completed", {"latency_seconds": smoke.get("latency_seconds")})
    except Exception as exc:
        # Space logs need a token and a well-formed id, so skip collection
        # when the pre-flight checks are what failed.
        if preflight_ok:
            collect_space_logs(target_space_id, token, run_dir, events_path)
        # Cap the message so one huge error cannot bloat the state file.
        details = {"error": str(exc)[:4000]}
        write_json(state_path, {"run_id": run_id, "kind": "validate_existing_space", "status": "failed", "target_space": target_space_id, "details": details, "updated_at": now()})
        (run_dir / "report.md").write_text(f"# Existing Space Validation Failed\n\n```json\n{json.dumps(details, indent=2, ensure_ascii=False)}\n```\n", encoding="utf-8")
        append_event(events_path, "failure", "failed", "Existing Space validation failed", details)
        raise SystemExit(1)
| |
| |
# Script entry point: run the validation worker only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    main()
| ''' |
|
|
|
|
def encoded_universal_model_card_worker_script() -> str:
    """Base64-encode the universal model-card builder worker script.

    Returns:
        The source held in ``UNIVERSAL_MODEL_CARD_WORKER_SCRIPT``, UTF-8
        encoded and rendered as ASCII base64 text by ``_encode``.
    """
    worker_source = UNIVERSAL_MODEL_CARD_WORKER_SCRIPT
    return _encode(worker_source)
|
|
|
|
def encoded_validate_existing_space_worker_script() -> str:
    """Base64-encode the existing-Space validation worker script.

    Returns:
        The source held in ``VALIDATE_EXISTING_SPACE_WORKER_SCRIPT``, UTF-8
        encoded and rendered as ASCII base64 text by ``_encode``.
    """
    worker_source = VALIDATE_EXISTING_SPACE_WORKER_SCRIPT
    return _encode(worker_source)
|
|
|
|
def python_decode_and_run_command() -> list[str]:
    """Build the argument list that bootstraps a worker script for `run_job`.

    The command is ``python -c <runner>``, so the Job image only needs a
    Python interpreter. The inline runner reads the base64 payload from the
    ``WORKER_SCRIPT_B64`` environment variable, decodes it as UTF-8, writes it
    to ``/tmp/space_factory_worker.py`` and executes that file with the same
    interpreter, exiting with the child's return code.

    Returns:
        A three-element list: the interpreter name, ``-c``, and the runner
        source.
    """
    # One runner statement per entry; joined with newlines below.
    bootstrap_lines = (
        "import base64, os, pathlib, subprocess, sys",
        "script = base64.b64decode(os.environ['WORKER_SCRIPT_B64']).decode('utf-8')",
        "path = pathlib.Path('/tmp/space_factory_worker.py')",
        "path.write_text(script, encoding='utf-8')",
        "raise SystemExit(subprocess.call([sys.executable, str(path)]))",
    )
    return ["python", "-c", "\n".join(bootstrap_lines)]
|
|