| from __future__ import annotations |
|
|
| import base64 |
| import textwrap |
|
|
|
|
| HELLO_WORKER_SCRIPT = r''' |
| import json |
| import os |
| from datetime import datetime, timezone |
| from pathlib import Path |
| |
| |
| def now(): |
| return datetime.now(timezone.utc).isoformat() |
| |
| |
| def write_json(path: Path, payload: dict): |
| path.parent.mkdir(parents=True, exist_ok=True) |
| path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") |
| |
| |
| def append_event(path: Path, step: str, status: str, message: str, data: dict | None = None): |
| path.parent.mkdir(parents=True, exist_ok=True) |
| event = { |
| "ts": now(), |
| "step": step, |
| "status": status, |
| "message": message, |
| "data": data or {}, |
| } |
| line = json.dumps(event, ensure_ascii=False) |
| with path.open("a", encoding="utf-8") as f: |
| f.write(line + "\n") |
| # Keep HF Job logs useful as well as Bucket events. |
| print(line, flush=True) |
| |
| |
| def main(): |
| run_id = os.environ["RUN_ID"] |
| hf_username = os.environ.get("HF_USERNAME", "unknown") |
| bucket_source = os.environ.get("BUCKET_SOURCE", "unknown") |
| output_root = Path(os.environ.get("OUTPUT_ROOT", "/output")) |
| job_id = os.environ.get("JOB_ID") |
| accelerator = os.environ.get("ACCELERATOR") or "none" |
| cpu_cores = os.environ.get("CPU_CORES") |
| memory = os.environ.get("MEMORY") |
| has_hf_token = bool(os.environ.get("HF_TOKEN")) |
| |
| run_dir = output_root / "runs" / run_id |
| state_path = run_dir / "state.json" |
| events_path = run_dir / "events.jsonl" |
| report_path = run_dir / "report.md" |
| |
| append_event(events_path, "bootstrap", "started", "HF Job started") |
| append_event( |
| events_path, |
| "environment", |
| "success", |
| "Collected non-sensitive job environment metadata", |
| { |
| "job_id": job_id, |
| "accelerator": accelerator, |
| "cpu_cores": cpu_cores, |
| "memory": memory, |
| "has_hf_token": has_hf_token, |
| }, |
| ) |
| |
| state = { |
| "run_id": run_id, |
| "status": "success", |
| "kind": "hello_job", |
| "message": "Hello from HF Job. OAuth → Job → Bucket write succeeded.", |
| "created_at": now(), |
| "updated_at": now(), |
| "created_by": hf_username, |
| "bucket_source": bucket_source, |
| "job_id": job_id, |
| "accelerator": accelerator, |
| "cpu_cores": cpu_cores, |
| "memory": memory, |
| "has_hf_token": has_hf_token, |
| "security_notes": [ |
| "HF_TOKEN was not printed.", |
| "This run does not create or publish any repository.", |
| "The bucket should remain private.", |
| ], |
| } |
| write_json(state_path, state) |
| append_event(events_path, "state_write", "success", "Wrote state.json") |
| |
| report = f"""# Agentic Space Factory — Hello Job Report |
| |
| Run ID: `{run_id}` |
| |
| Status: **success** |
| |
| This first worker validated the critical foundation: |
| |
| ```text |
| OAuth user → HF Job → mounted Storage Bucket → state/events/report write |
| ``` |
| |
| ## Non-sensitive job metadata |
| |
| - Job ID: `{job_id}` |
| - User: `{hf_username}` |
| - Bucket: `{bucket_source}` |
| - Accelerator: `{accelerator}` |
| - CPU cores: `{cpu_cores}` |
| - Memory: `{memory}` |
| - HF token present in job env: `{has_hf_token}` |
| |
| ## Security posture |
| |
| - The token was passed as a secret and was not printed. |
| - This run did not create or modify any Hugging Face repository. |
| - This run did not publish anything publicly. |
| |
| ## Next implementation step |
| |
| The next increment should create a private target Gradio Space and validate it with `gradio_client` before reporting success. |
| """ |
| report_path.write_text(report, encoding="utf-8") |
| append_event(events_path, "report_write", "success", "Wrote report.md") |
| append_event(events_path, "done", "success", "Hello Job completed") |
| |
| |
| if __name__ == "__main__": |
| main() |
| ''' |
|
|
|
|
| CREATE_SPACE_WORKER_SCRIPT = r''' |
| import json |
| import os |
| import re |
| import subprocess |
| import sys |
| import time |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from textwrap import dedent |
| |
| |
| TARGET_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{1,95}/[A-Za-z0-9][A-Za-z0-9._-]{1,95}$") |
| |
| |
| def now(): |
| return datetime.now(timezone.utc).isoformat() |
| |
| |
| def write_json(path: Path, payload: dict): |
| path.parent.mkdir(parents=True, exist_ok=True) |
| path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") |
| |
| |
| def append_event(path: Path, step: str, status: str, message: str, data: dict | None = None): |
| path.parent.mkdir(parents=True, exist_ok=True) |
| event = {"ts": now(), "step": step, "status": status, "message": message, "data": data or {}} |
| line = json.dumps(event, ensure_ascii=False) |
| with path.open("a", encoding="utf-8") as f: |
| f.write(line + "\n") |
| # Keep HF Job logs useful as well as Bucket events. |
| print(line, flush=True) |
| |
| |
| def fail(run_dir: Path, events_path: Path, message: str, details: dict | None = None, status: str = "failed"): |
| state_path = run_dir / "state.json" |
| append_event(events_path, "failure", "failed", message, details or {}) |
| write_json(state_path, { |
| "run_id": os.environ.get("RUN_ID"), |
| "kind": "create_private_space", |
| "status": status, |
| "message": message, |
| "updated_at": now(), |
| "details": details or {}, |
| }) |
| report = f"""# Agentic Space Factory — Private Space Creation Report |
| |
| Status: **{status}** |
| |
| {message} |
| |
| ```json |
| {json.dumps(details or {}, indent=2, ensure_ascii=False)} |
| ``` |
| """ |
| (run_dir / "report.md").write_text(report, encoding="utf-8") |
| raise SystemExit(1) |
| |
| |
| def pip_install(events_path: Path): |
| append_event(events_path, "dependencies", "started", "Installing worker dependencies") |
| cmd = [sys.executable, "-m", "pip", "install", "-q", "--upgrade", "huggingface_hub>=1.0.0", "gradio_client>=2.0.0", "requests>=2.31.0"] |
| result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) |
| if result.returncode != 0: |
| append_event(events_path, "dependencies", "failed", "Dependency installation failed", {"output_tail": result.stdout[-4000:]}) |
| raise RuntimeError(result.stdout) |
| append_event(events_path, "dependencies", "success", "Worker dependencies installed") |
| |
| |
| def target_files(target_space_id: str) -> dict[str, str]: |
| app_py = dedent(f""" |
| import gradio as gr |
| |
| |
| def greet(name: str) -> str: |
| name = (name or "friend").strip() or "friend" |
| return f"Hello {{name}} — this private Space was generated by Agentic Space Factory." |
| |
| |
| demo = gr.Interface( |
| fn=greet, |
| inputs=gr.Textbox(label="Name", value="Hugging Face"), |
| outputs=gr.Textbox(label="Result"), |
| title="Generated private Space", |
| description="A minimal Gradio Space created by an HF Job, then validated through the live Gradio API.", |
| examples=[["Hugging Face"], ["Agentic Space Factory"]], |
| ) |
| |
| |
| if __name__ == "__main__": |
| demo.launch() |
| """).strip() + "\n" |
| |
| readme = dedent(f""" |
| --- |
| title: Generated Private Space |
| emoji: 🧪 |
| colorFrom: blue |
| colorTo: purple |
| sdk: gradio |
| app_file: app.py |
| python_version: "3.11" |
| pinned: false |
| --- |
| |
| # Generated Private Space |
| |
| This private Space was generated by **Agentic Space Factory**. |
| |
| Target repo: `{target_space_id}` |
| |
| This Phase 2 version intentionally creates only a safe hello-world Gradio app. |
| Later phases will add Pi, model-card analysis, ZeroGPU templates, and automatic repair. |
| """).strip() + "\n" |
| |
| requirements = "gradio>=5.0.0\n" |
| return {"app.py": app_py, "README.md": readme, "requirements.txt": requirements} |
| |
| |
| def save_generated_files(run_dir: Path, files: dict[str, str]): |
| generated_dir = run_dir / "generated" |
| generated_dir.mkdir(parents=True, exist_ok=True) |
| for filename, content in files.items(): |
| (generated_dir / filename).write_text(content, encoding="utf-8") |
| |
| |
| def create_and_upload_space(api, token: str, target_space_id: str, files: dict[str, str], events_path: Path): |
| append_event(events_path, "create_space", "started", f"Creating private target Space {target_space_id}") |
| try: |
| api.create_repo( |
| repo_id=target_space_id, |
| repo_type="space", |
| space_sdk="gradio", |
| private=True, |
| exist_ok=False, |
| token=token, |
| ) |
| append_event(events_path, "create_space", "success", "Private target Space created", {"target_space": target_space_id}) |
| except Exception as exc: |
| # If it already exists, fail safely instead of overwriting user resources unexpectedly. |
| append_event(events_path, "create_space", "failed", "Could not create target Space", {"error": str(exc)}) |
| raise |
| |
| append_event(events_path, "upload_files", "started", "Uploading generated files to target Space") |
| for path_in_repo, content in files.items(): |
| api.upload_file( |
| path_or_fileobj=content.encode("utf-8"), |
| path_in_repo=path_in_repo, |
| repo_id=target_space_id, |
| repo_type="space", |
| token=token, |
| ) |
| append_event(events_path, "upload_files", "success", f"Uploaded {path_in_repo}") |
| |
| |
| def make_gradio_client(target_space_id: str, token: str): |
| """Create a Gradio Client across gradio_client versions. |
| |
| gradio_client 2.x uses `token=...`; older/newer docs often mention |
| `hf_token=...`; some versions expose `api_key` or `headers`. Using |
| signature introspection prevents a permanent wait loop on a TypeError. |
| """ |
| import inspect |
| from gradio_client import Client |
| |
| params = inspect.signature(Client).parameters |
| if "token" in params: |
| return Client(target_space_id, token=token) |
| if "hf_token" in params: |
| return Client(target_space_id, hf_token=token) |
| if "api_key" in params: |
| return Client(target_space_id, api_key=token) |
| if "headers" in params: |
| return Client(target_space_id, headers={"Authorization": f"Bearer {token}"}) |
| # Last-resort fallback: if the process is logged in via HF_TOKEN/HF CLI, |
| # some client versions can pick credentials from the environment/cache. |
| return Client(target_space_id) |
| |
| |
| def get_api_schema(client): |
| try: |
| return client.view_api(return_format="dict") |
| except TypeError: |
| return client.view_api() |
| |
| |
| def extract_api_names(api_schema) -> list[str]: |
| """Best-effort extraction across gradio_client schema formats. |
| |
| Gradio/Gradio Client versions differ: an Interface can expose `/predict`, |
| `/greet`, or another named endpoint. For the generated hello app the live |
| Job logs show `/greet`, so validation must discover endpoints instead of |
| hardcoding `/predict`. |
| """ |
| names: list[str] = [] |
| |
| def add(value): |
| if not value or not isinstance(value, str): |
| return |
| name = value if value.startswith("/") else f"/{value}" |
| if name not in names: |
| names.append(name) |
| |
| def walk(obj): |
| if isinstance(obj, dict): |
| for key, value in obj.items(): |
| if key in {"api_name", "apiName"}: |
| add(value) |
| # Some schemas use endpoint paths as keys, for example `/greet`. |
| if isinstance(key, str) and key.startswith("/"): |
| add(key) |
| walk(value) |
| elif isinstance(obj, list): |
| for item in obj: |
| walk(item) |
| |
| walk(api_schema) |
| return names |
| |
| |
| def predict_with_available_endpoint(client, api_schema, value: str): |
| candidates = extract_api_names(api_schema) |
| for fallback in ["/greet", "/predict"]: |
| if fallback not in candidates: |
| candidates.append(fallback) |
| |
| errors = [] |
| for api_name in candidates: |
| try: |
| return api_name, client.predict(value, api_name=api_name) |
| except Exception as exc: |
| errors.append({"api_name": api_name, "error": str(exc)[-500:]}) |
| |
| # Last fallback for old/simple gradio_client versions where api_name may be optional. |
| try: |
| return None, client.predict(value) |
| except Exception as exc: |
| errors.append({"api_name": None, "error": str(exc)[-500:]}) |
| raise RuntimeError(f"No candidate Gradio endpoint worked: {json.dumps(errors, ensure_ascii=False)}") |
| |
| |
| def validate_live_api(target_space_id: str, token: str, events_path: Path, tests_dir: Path, timeout_seconds: int = 360): |
| tests_dir.mkdir(parents=True, exist_ok=True) |
| deadline = time.time() + timeout_seconds |
| last_error = None |
| attempt = 0 |
| append_event(events_path, "api_validation", "started", "Waiting for live Gradio API to become available") |
| |
| while time.time() < deadline: |
| attempt += 1 |
| try: |
| client = make_gradio_client(target_space_id, token) |
| api_schema = get_api_schema(client) |
| api_names = extract_api_names(api_schema) |
| write_json(tests_dir / "api_schema.json", {"schema": api_schema, "api_names": api_names}) |
| used_api_name, result = predict_with_available_endpoint(client, api_schema, "Agentic Space Factory") |
| result_text = str(result) |
| ok = "Agentic Space Factory" in result_text and "Hello" in result_text |
| payload = { |
| "attempt": attempt, |
| "target_space": target_space_id, |
| "api_test_passed": ok, |
| "api_name": used_api_name, |
| "discovered_api_names": api_names, |
| "result": result_text, |
| "validated_at": now(), |
| } |
| write_json(tests_dir / "test_result.json", payload) |
| if ok: |
| append_event( |
| events_path, |
| "api_validation", |
| "success", |
| "Live Gradio API test passed", |
| {"attempt": attempt, "api_name": used_api_name, "discovered_api_names": api_names}, |
| ) |
| return payload |
| last_error = f"Unexpected API result from {used_api_name}: {result_text}" |
| except Exception as exc: |
| last_error = str(exc) |
| append_event(events_path, "api_validation", "waiting", "Live API not ready yet", {"attempt": attempt, "error": last_error[-1000:]}) |
| time.sleep(20) |
| |
| payload = { |
| "target_space": target_space_id, |
| "api_test_passed": False, |
| "error": last_error, |
| "validated_at": now(), |
| } |
| write_json(tests_dir / "test_result.json", payload) |
| raise RuntimeError(f"Live API validation did not pass before timeout: {last_error}") |
| |
| |
| def main(): |
| run_id = os.environ["RUN_ID"] |
| hf_username = os.environ.get("HF_USERNAME", "unknown") |
| bucket_source = os.environ.get("BUCKET_SOURCE", "unknown") |
| output_root = Path(os.environ.get("OUTPUT_ROOT", "/output")) |
| target_space_id = os.environ["TARGET_SPACE_ID"] |
| token = os.environ.get("HF_TOKEN") |
| |
| run_dir = output_root / "runs" / run_id |
| events_path = run_dir / "events.jsonl" |
| state_path = run_dir / "state.json" |
| report_path = run_dir / "report.md" |
| target_json_path = run_dir / "target_space.json" |
| |
| append_event(events_path, "bootstrap", "started", "Private Space creation worker started") |
| write_json(state_path, { |
| "run_id": run_id, |
| "kind": "create_private_space", |
| "status": "running", |
| "message": "Creating private target Space", |
| "target_space": target_space_id, |
| "created_by": hf_username, |
| "bucket_source": bucket_source, |
| "created_at": now(), |
| "updated_at": now(), |
| }) |
| |
| if not token: |
| fail(run_dir, events_path, "HF_TOKEN is missing from Job secrets") |
| if not TARGET_RE.match(target_space_id): |
| fail(run_dir, events_path, "Invalid TARGET_SPACE_ID", {"target_space": target_space_id}) |
| if not target_space_id.startswith(f"{hf_username}/"): |
| fail(run_dir, events_path, "For Phase 2, target Space must be in the signed-in user's namespace", {"target_space": target_space_id, "username": hf_username}) |
| |
| try: |
| pip_install(events_path) |
| from huggingface_hub import HfApi |
| |
| api = HfApi(token=token) |
| whoami = api.whoami(token=token) |
| append_event(events_path, "auth", "success", "Authenticated inside Job", {"whoami_name": whoami.get("name")}) |
| |
| files = target_files(target_space_id) |
| save_generated_files(run_dir, files) |
| append_event(events_path, "generate_files", "success", "Generated minimal Gradio Space files", {"files": list(files)}) |
| |
| create_and_upload_space(api, token, target_space_id, files, events_path) |
| write_json(target_json_path, { |
| "target_space": target_space_id, |
| "url": f"https://huggingface.co/spaces/{target_space_id}", |
| "private": True, |
| "sdk": "gradio", |
| "created_by": hf_username, |
| }) |
| |
| validation = validate_live_api(target_space_id, token, events_path, run_dir / "tests") |
| |
| final_state = { |
| "run_id": run_id, |
| "kind": "create_private_space", |
| "status": "success", |
| "message": "Private Gradio Space created and validated through the live API.", |
| "target_space": target_space_id, |
| "target_space_url": f"https://huggingface.co/spaces/{target_space_id}", |
| "created_by": hf_username, |
| "bucket_source": bucket_source, |
| "validation": validation, |
| "updated_at": now(), |
| "security_notes": [ |
| "The target Space was created as private.", |
| "The HF token was not printed or written to report files.", |
| "Success was declared only after a live Gradio API test passed.", |
| ], |
| } |
| write_json(state_path, final_state) |
| |
| report = f"""# Agentic Space Factory — Private Space Creation Report |
| |
| Run ID: `{run_id}` |
| |
| Status: **success** |
| |
| Created private Space: [`{target_space_id}`](https://huggingface.co/spaces/{target_space_id}) |
| |
| ## What happened |
| |
| ```text |
| OAuth user → HF Job → private Space creation → file upload → live Gradio API validation → Bucket report |
| ``` |
| |
| ## Generated files |
| |
| - `app.py` |
| - `requirements.txt` |
| - `README.md` |
| |
| Copies are stored in: |
| |
| ```text |
| runs/{run_id}/generated/ |
| ``` |
| |
| ## Live API validation |
| |
| ```json |
| {json.dumps(validation, indent=2, ensure_ascii=False)} |
| ``` |
| |
| ## Security posture |
| |
| - The target Space was created as private. |
| - No token was printed or intentionally persisted. |
| - Success was declared only after the live Gradio API returned the expected output. |
| |
| ## Next step |
| |
| Phase 3 should introduce Pi inside the Job and ask it to modify/repair this simple Space while preserving the live API validation gate. |
| """ |
| report_path.write_text(report, encoding="utf-8") |
| append_event(events_path, "report_write", "success", "Wrote report.md") |
| append_event(events_path, "done", "success", "Private Space creation worker completed") |
| except Exception as exc: |
| fail(run_dir, events_path, "Private Space creation worker failed", {"error": str(exc)}) |
| |
| |
| if __name__ == "__main__": |
| main() |
| ''' |
|
|
|
|
|
|
| PI_SPACE_WORKER_SCRIPT = 'import json\nimport os\nimport re\nimport shutil\nimport subprocess\nimport sys\nimport time\nfrom datetime import datetime, timezone\nfrom pathlib import Path\nfrom textwrap import dedent\n\n\nTARGET_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{1,95}/[A-Za-z0-9][A-Za-z0-9._-]{1,95}$")\n\n\ndef now():\n return datetime.now(timezone.utc).isoformat()\n\n\ndef write_json(path: Path, payload: dict):\n path.parent.mkdir(parents=True, exist_ok=True)\n path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\\n", encoding="utf-8")\n\n\ndef append_event(path: Path, step: str, status: str, message: str, data: dict | None = None):\n path.parent.mkdir(parents=True, exist_ok=True)\n event = {"ts": now(), "step": step, "status": status, "message": message, "data": data or {}}\n line = json.dumps(event, ensure_ascii=False)\n with path.open("a", encoding="utf-8") as f:\n f.write(line + "\\n")\n print(line, flush=True)\n\n\ndef redact_text(text: str | None) -> str:\n if not text:\n return ""\n value = text\n for secret_name in ["HF_TOKEN", "HUGGING_FACE_HUB_TOKEN"]:\n secret = os.environ.get(secret_name)\n if secret:\n value = value.replace(secret, "[REDACTED]")\n value = re.sub(r"Bearer\\s+[A-Za-z0-9_\\-.=]+", "Bearer [REDACTED]", value)\n value = re.sub(r"hf_[A-Za-z0-9_\\-]{10,}", "hf_[REDACTED]", value)\n return value\n\n\ndef safe_details(details: dict | None) -> dict:\n if not details:\n return {}\n try:\n return json.loads(redact_text(json.dumps(details, ensure_ascii=False)))\n except Exception:\n return {"redacted_details": redact_text(str(details))[-4000:]}\n\n\ndef fail(run_dir: Path, events_path: Path, message: str, details: dict | None = None, status: str = "failed"):\n safe = safe_details(details)\n append_event(events_path, "failure", "failed", message, safe)\n write_json(run_dir / "state.json", {\n "run_id": os.environ.get("RUN_ID"),\n "kind": "pi_space_smoke",\n "status": status,\n "message": message,\n "updated_at": now(),\n "details": safe,\n })\n report = f"""# Agentic Space Factory — Pi Smoke Test Report\n\nStatus: **{status}**\n\n{message}\n\n```json\n{json.dumps(safe, indent=2, ensure_ascii=False)}\n```\n"""\n (run_dir / "report.md").write_text(report, encoding="utf-8")\n raise SystemExit(1)\n\n\ndef run_cmd(cmd: list[str], *, cwd: Path | None = None, env: dict | None = None, timeout: int = 600):\n result = subprocess.run(\n cmd,\n cwd=str(cwd) if cwd else None,\n env=env,\n text=True,\n stdout=subprocess.PIPE,\n stderr=subprocess.STDOUT,\n timeout=timeout,\n )\n return result.returncode, redact_text(result.stdout)\n\n\ndef install_python_deps(events_path: Path):\n append_event(events_path, "dependencies", "started", "Installing Python worker dependencies")\n code, out = run_cmd([sys.executable, "-m", "pip", "install", "-q", "--upgrade", "huggingface_hub>=1.0.0", "gradio_client>=2.0.0", "requests>=2.31.0"], timeout=600)\n if code != 0:\n append_event(events_path, "dependencies", "failed", "Python dependency installation failed", {"output_tail": out[-4000:]})\n raise RuntimeError(out)\n append_event(events_path, "dependencies", "success", "Python worker dependencies installed")\n\n\ndef ensure_node(events_path: Path):\n node = shutil.which("node")\n npm = shutil.which("npm")\n if node and npm:\n _, node_v = run_cmd([node, "--version"], timeout=30)\n _, npm_v = run_cmd([npm, "--version"], timeout=30)\n append_event(events_path, "node", "success", "Node/npm already available", {"node": node_v.strip(), "npm": npm_v.strip()})\n return\n append_event(events_path, "node", "started", "Installing nodejs/npm through apt-get")\n code, out = run_cmd(["bash", "-lc", "apt-get update -qq && apt-get install -y -qq nodejs npm"], timeout=600)\n if code != 0:\n append_event(events_path, "node", "failed", "Could not install nodejs/npm", {"output_tail": out[-4000:]})\n raise RuntimeError(out)\n append_event(events_path, "node", "success", "Installed nodejs/npm")\n\n\ndef install_pi(events_path: Path):\n ensure_node(events_path)\n append_event(events_path, "pi_install", "started", "Installing Pi coding agent from npm")\n code, out = run_cmd(["npm", "install", "-g", "@mariozechner/pi-coding-agent"], timeout=900)\n if code != 0:\n append_event(events_path, "pi_install", "failed", "Pi npm installation failed", {"output_tail": out[-4000:]})\n raise RuntimeError(out)\n code, version = run_cmd(["pi", "--version"], timeout=60)\n append_event(events_path, "pi_install", "success", "Pi installed", {"version_output": version.strip()[-300:]})\n\n\ndef configure_pi(events_path: Path, model: str):\n pi_dir = Path.home() / ".pi" / "agent"\n pi_dir.mkdir(parents=True, exist_ok=True)\n (pi_dir / "auth.json").write_text(json.dumps({"huggingface": {"type": "api_key", "key": os.environ.get("HF_TOKEN", "")}}, indent=2), encoding="utf-8")\n (pi_dir / "settings.json").write_text(json.dumps({"defaultProvider": "huggingface", "defaultModel": model}, indent=2), encoding="utf-8")\n append_event(events_path, "pi_config", "success", "Configured Pi for Hugging Face Inference Providers", {"model": model})\n\n\ndef initial_files(target_space_id: str) -> dict[str, str]:\n app_py = dedent(\'\'\'\n import gradio as gr\n\n\n def greet(name: str) -> str:\n name = (name or "friend").strip() or "friend"\n return f"Hello {name} — this private Space was generated by Agentic Space Factory."\n\n\n demo = gr.Interface(\n fn=greet,\n inputs=gr.Textbox(label="Name", value="Hugging Face"),\n outputs=gr.Textbox(label="Result"),\n title="Generated private Space",\n description="A minimal Gradio Space created by an HF Job. Phase 3 asks Pi to modify it safely before upload.",\n examples=[["Hugging Face"], ["Agentic Space Factory"]],\n )\n\n\n if __name__ == "__main__":\n demo.launch()\n \'\'\').strip() + "\\n"\n readme = dedent(f\'\'\'\n ---\n title: Pi Modified Private Space\n emoji: 🧪\n colorFrom: green\n colorTo: blue\n sdk: gradio\n app_file: app.py\n python_version: "3.11"\n pinned: false\n ---\n\n # Pi Modified Private Space\n\n This private Space was generated by **Agentic Space Factory**.\n\n Target repo: `{target_space_id}`\n\n Phase 3 asks Pi to make a small, safe modification before the Space is uploaded and validated through the live Gradio API.\n \'\'\').strip() + "\\n"\n return {"app.py": app_py, "README.md": readme, "requirements.txt": "gradio>=5.0.0\\n"}\n\n\ndef write_workspace(workspace: Path, files: dict[str, str]):\n workspace.mkdir(parents=True, exist_ok=True)\n for name, content in files.items():\n (workspace / name).write_text(content, encoding="utf-8")\n\n\ndef run_pi(events_path: Path, workspace: Path, target_space_id: str, model: str, run_dir: Path):\n goal = f"""\nYou are running inside an ephemeral Hugging Face Job.\n\nTask:\nModify the minimal Gradio app in this directory to prove that Pi can safely edit project files before they are uploaded to a private Hugging Face Space.\n\nHard constraints:\n- Only edit app.py and README.md if needed.\n- Do not read, print, or store secrets or environment variables.\n- Preserve the Gradio interface and the greet(name: str) function.\n- The live API validation expects the output for input "Agentic Space Factory" to contain "Hello" and "Agentic Space Factory".\n- Add the exact phrase "Pi modified this app" to the returned greeting string.\n- Keep the app simple and reliable.\n- Do not create, modify, or delete Hugging Face resources.\n\nWhen done, stop. No extra explanation is required.\n\nTarget Space later: {target_space_id}\nModel configured for Pi: {model}\n""".strip()\n (workspace / "GOAL.md").write_text(goal + "\\n", encoding="utf-8")\n append_event(events_path, "pi_run", "started", "Running Pi in non-interactive print mode", {"model": model})\n env = os.environ.copy()\n env["HF_TOKEN"] = os.environ.get("HF_TOKEN", "")\n code, out = run_cmd(["pi", "-p", goal], cwd=workspace, env=env, timeout=600)\n (run_dir / "logs").mkdir(parents=True, exist_ok=True)\n (run_dir / "logs" / "pi_output.txt").write_text(out, encoding="utf-8")\n if code != 0:\n append_event(events_path, "pi_run", "failed", "Pi exited with a non-zero status", {"exit_code": code, "output_tail": out[-4000:]})\n raise RuntimeError(f"Pi failed with exit code {code}: {out[-2000:]}")\n append_event(events_path, "pi_run", "success", "Pi completed", {"output_tail": out[-1000:]})\n\n\ndef collect_pi_traces(events_path: Path, run_dir: Path):\n trace_root = Path.home() / ".pi" / "agent" / "sessions"\n raw_dir = run_dir / "traces" / "raw"\n redacted_dir = run_dir / "traces" / "redacted"\n raw_dir.mkdir(parents=True, exist_ok=True)\n redacted_dir.mkdir(parents=True, exist_ok=True)\n copied = []\n if trace_root.exists():\n for path in trace_root.rglob("*.jsonl"):\n rel = path.relative_to(trace_root)\n raw_target = raw_dir / rel\n red_target = redacted_dir / rel\n raw_target.parent.mkdir(parents=True, exist_ok=True)\n red_target.parent.mkdir(parents=True, exist_ok=True)\n text = path.read_text(encoding="utf-8", errors="replace")\n redacted = redact_text(text)\n raw_target.write_text(redacted, encoding="utf-8")\n red_target.write_text(redacted, encoding="utf-8")\n copied.append(str(rel))\n append_event(events_path, "pi_traces", "success", "Collected Pi traces", {"count": len(copied), "files": copied[:10]})\n return copied\n\n\ndef assert_pi_modified(workspace: Path):\n app_text = (workspace / "app.py").read_text(encoding="utf-8")\n if "Pi modified this app" not in app_text:\n raise RuntimeError("Pi did not add the required marker phrase to app.py")\n if "def greet" not in app_text or "gr.Interface" not in app_text:\n raise RuntimeError("Pi modification appears to have broken the minimal Gradio app")\n\n\ndef save_generated_files(run_dir: Path, workspace: Path):\n generated_dir = run_dir / "generated"\n generated_dir.mkdir(parents=True, exist_ok=True)\n for filename in ["app.py", "README.md", "requirements.txt", "GOAL.md"]:\n src = workspace / filename\n if src.exists():\n (generated_dir / filename).write_text(src.read_text(encoding="utf-8"), encoding="utf-8")\n\n\ndef create_and_upload_space(api, token: str, target_space_id: str, workspace: Path, events_path: Path):\n append_event(events_path, "create_space", "started", f"Creating private target Space {target_space_id}")\n api.create_repo(repo_id=target_space_id, repo_type="space", space_sdk="gradio", private=True, exist_ok=False, token=token)\n append_event(events_path, "create_space", "success", "Private target Space created", {"target_space": target_space_id})\n append_event(events_path, "upload_files", "started", "Uploading Pi-modified files to target Space")\n for path_in_repo in ["app.py", "README.md", "requirements.txt"]:\n content = (workspace / path_in_repo).read_bytes()\n api.upload_file(path_or_fileobj=content, path_in_repo=path_in_repo, repo_id=target_space_id, repo_type="space", token=token)\n append_event(events_path, "upload_files", "success", f"Uploaded {path_in_repo}")\n\n\ndef make_gradio_client(target_space_id: str, token: str):\n import inspect\n from gradio_client import Client\n params = inspect.signature(Client).parameters\n if "token" in params:\n return Client(target_space_id, token=token)\n if "hf_token" in params:\n return Client(target_space_id, hf_token=token)\n if "api_key" in params:\n return Client(target_space_id, api_key=token)\n if "headers" in params:\n return Client(target_space_id, headers={"Authorization": f"Bearer {token}"})\n return Client(target_space_id)\n\n\ndef get_api_schema(client):\n try:\n return client.view_api(return_format="dict")\n except TypeError:\n return client.view_api()\n\n\ndef extract_api_names(api_schema) -> list[str]:\n names = []\n def add(value):\n if not value or not isinstance(value, str):\n return\n name = value if value.startswith("/") else f"/{value}"\n if name not in names:\n names.append(name)\n def walk(obj):\n if isinstance(obj, dict):\n for key, value in obj.items():\n if key in {"api_name", "apiName"}:\n add(value)\n if isinstance(key, str) and key.startswith("/"):\n add(key)\n walk(value)\n elif isinstance(obj, list):\n for item in obj:\n walk(item)\n walk(api_schema)\n return names\n\n\ndef predict_with_available_endpoint(client, api_schema, value: str):\n candidates = extract_api_names(api_schema)\n for fallback in ["/greet", "/predict"]:\n if fallback not in candidates:\n candidates.append(fallback)\n errors = []\n for api_name in candidates:\n try:\n return api_name, client.predict(value, api_name=api_name)\n except Exception as exc:\n errors.append({"api_name": api_name, "error": str(exc)[-500:]})\n try:\n return None, client.predict(value)\n except Exception as exc:\n errors.append({"api_name": None, "error": str(exc)[-500:]})\n raise RuntimeError(f"No candidate Gradio endpoint worked: {json.dumps(errors, ensure_ascii=False)}")\n\n\ndef validate_live_api(target_space_id: str, token: str, events_path: Path, tests_dir: Path, timeout_seconds: int = 420):\n tests_dir.mkdir(parents=True, exist_ok=True)\n deadline = time.time() + timeout_seconds\n last_error = None\n attempt = 0\n append_event(events_path, "api_validation", "started", "Waiting for live Gradio API to become available")\n while time.time() < deadline:\n attempt += 1\n try:\n client = make_gradio_client(target_space_id, token)\n api_schema = get_api_schema(client)\n api_names = extract_api_names(api_schema)\n write_json(tests_dir / "api_schema.json", {"schema": api_schema, "api_names": api_names})\n used_api_name, result = predict_with_available_endpoint(client, api_schema, "Agentic Space Factory")\n result_text = str(result)\n ok = "Hello" in result_text and "Agentic Space Factory" in result_text and "Pi modified this app" in result_text\n payload = {\n "attempt": attempt,\n "target_space": target_space_id,\n "api_test_passed": ok,\n "api_name": used_api_name,\n "discovered_api_names": api_names,\n "result": result_text,\n "validated_at": now(),\n }\n write_json(tests_dir / "test_result.json", payload)\n if ok:\n append_event(events_path, "api_validation", "success", "Live Gradio API test passed", {"attempt": attempt, "api_name": used_api_name, "discovered_api_names": api_names})\n return payload\n last_error = f"Unexpected API result from {used_api_name}: {result_text}"\n except Exception as exc:\n last_error = str(exc)\n append_event(events_path, "api_validation", "waiting", "Live API not ready yet", {"attempt": attempt, "error": last_error[-1000:]})\n time.sleep(20)\n payload = {"target_space": target_space_id, "api_test_passed": False, "error": last_error, "validated_at": now()}\n write_json(tests_dir / "test_result.json", payload)\n raise RuntimeError(f"Live API validation did not pass before timeout: {last_error}")\n\n\ndef main():\n run_id = os.environ["RUN_ID"]\n hf_username = os.environ.get("HF_USERNAME", "unknown")\n bucket_source = os.environ.get("BUCKET_SOURCE", "unknown")\n output_root = Path(os.environ.get("OUTPUT_ROOT", "/output"))\n target_space_id = os.environ["TARGET_SPACE_ID"]\n token = os.environ.get("HF_TOKEN")\n pi_model = os.environ.get("PI_MODEL") or "moonshotai/Kimi-K2.5"\n run_dir = output_root / "runs" / run_id\n events_path = run_dir / "events.jsonl"\n state_path = run_dir / "state.json"\n report_path = run_dir / "report.md"\n target_json_path = run_dir / "target_space.json"\n workspace = Path("/tmp/space_factory_pi_workspace")\n\n append_event(events_path, "bootstrap", "started", "Pi Space smoke worker started")\n write_json(state_path, {\n "run_id": run_id,\n "kind": "pi_space_smoke",\n "status": "running",\n "message": "Running Pi smoke test before creating private target Space",\n "target_space": target_space_id,\n "pi_model": pi_model,\n "created_by": hf_username,\n "bucket_source": bucket_source,\n "created_at": now(),\n "updated_at": now(),\n })\n if not token:\n fail(run_dir, events_path, "HF_TOKEN is missing from Job secrets")\n if not TARGET_RE.match(target_space_id):\n fail(run_dir, events_path, "Invalid TARGET_SPACE_ID", {"target_space": target_space_id})\n if not target_space_id.startswith(f"{hf_username}/"):\n fail(run_dir, events_path, "For Phase 3, target Space must be in the signed-in user\'s namespace", {"target_space": target_space_id, "username": hf_username})\n\n try:\n install_python_deps(events_path)\n install_pi(events_path)\n configure_pi(events_path, pi_model)\n files = initial_files(target_space_id)\n write_workspace(workspace, files)\n append_event(events_path, "workspace", "success", "Prepared local workspace for Pi", {"files": list(files)})\n run_pi(events_path, workspace, target_space_id, pi_model, run_dir)\n trace_files = collect_pi_traces(events_path, run_dir)\n assert_pi_modified(workspace)\n append_event(events_path, "pi_validation", "success", "Pi made the required safe modification")\n save_generated_files(run_dir, workspace)\n\n from huggingface_hub import HfApi\n api = HfApi(token=token)\n whoami = api.whoami(token=token)\n append_event(events_path, "auth", "success", "Authenticated inside Job", {"whoami_name": whoami.get("name")})\n create_and_upload_space(api, token, target_space_id, workspace, events_path)\n write_json(target_json_path, {"target_space": target_space_id, "url": f"https://huggingface.co/spaces/{target_space_id}", "private": True, "sdk": "gradio", "created_by": hf_username})\n validation = validate_live_api(target_space_id, token, events_path, run_dir / "tests")\n trace_files = collect_pi_traces(events_path, run_dir)\n final_state = {\n "run_id": run_id,\n "kind": "pi_space_smoke",\n "status": "success",\n "message": "Pi modified a Gradio app, the private Space was created, and the live API validation passed.",\n "target_space": target_space_id,\n "target_space_url": f"https://huggingface.co/spaces/{target_space_id}",\n "pi_model": pi_model,\n "created_by": hf_username,\n "bucket_source": bucket_source,\n "validation": validation,\n "trace_files": trace_files,\n "updated_at": now(),\n "security_notes": [\n "The target Space was created as private.",\n "The HF token was redacted from saved logs/traces.",\n "Success was declared only after a live Gradio API test passed.",\n ],\n }\n write_json(state_path, final_state)\n report = f"""# Agentic Space Factory — Pi Smoke Test Report\n\nRun ID: `{run_id}`\n\nStatus: **success**\n\nCreated private Space: [`{target_space_id}`](https://huggingface.co/spaces/{target_space_id})\n\n## What happened\n\n```text\nOAuth user → HF Job → install Pi → Pi edits app.py → private Space creation → file upload → live Gradio API validation → Bucket report\n```\n\n## Pi\n\n- Model: `{pi_model}`\n- Pi traces copied: `{len(trace_files)}`\n- Trace paths: `runs/{run_id}/traces/`\n\n## Live API validation\n\n```json\n{json.dumps(validation, indent=2, ensure_ascii=False)}\n```\n\n## Security posture\n\n- The target Space was created as private.\n- The token was not intentionally printed or persisted.\n- Saved Pi outputs/traces are redacted on a best-effort basis.\n- Success was declared only after the live Gradio API returned the expected Pi-modified output.\n\n## Next step\n\nPhase 4 should ask Pi to apply the HF Spaces Agent Quickstart gist against a generated Space, still with strict private-by-default and live API validation gates.\n"""\n report_path.write_text(report, encoding="utf-8")\n append_event(events_path, "report_write", "success", "Wrote report.md")\n append_event(events_path, "done", "success", "Pi Space smoke worker completed")\n except Exception as exc:\n try:\n collect_pi_traces(events_path, run_dir)\n except Exception:\n pass\n fail(run_dir, events_path, "Pi Space smoke worker failed", {"error": str(exc)})\n\n\nif __name__ == "__main__":\n main()\n' |
|
|
|
|
| PI_GIST_WORKER_SCRIPT = 'import json\nimport os\nimport re\nimport shutil\nimport subprocess\nimport sys\nimport time\nfrom datetime import datetime, timezone\nfrom pathlib import Path\nfrom textwrap import dedent\n\nTARGET_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{1,95}/[A-Za-z0-9][A-Za-z0-9._-]{1,95}$")\nMARKER = "Pi applied the HF Spaces gist recipe"\nGIST_URL = "https://gist.github.com/gary149/2aba2962375fa9ca56bb9ef53f00b73d"\n\n\ndef now():\n return datetime.now(timezone.utc).isoformat()\n\n\ndef write_json(path: Path, payload: dict):\n path.parent.mkdir(parents=True, exist_ok=True)\n path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\\n", encoding="utf-8")\n\n\ndef append_event(path: Path, step: str, status: str, message: str, data: dict | None = None):\n path.parent.mkdir(parents=True, exist_ok=True)\n event = {"ts": now(), "step": step, "status": status, "message": message, "data": data or {}}\n line = json.dumps(event, ensure_ascii=False)\n with path.open("a", encoding="utf-8") as f:\n f.write(line + "\\n")\n print(line, flush=True)\n\n\ndef redact_text(text: str | None) -> str:\n if not text:\n return ""\n value = text\n for secret_name in ["HF_TOKEN", "HUGGING_FACE_HUB_TOKEN"]:\n secret = os.environ.get(secret_name)\n if secret:\n value = value.replace(secret, "[REDACTED]")\n value = re.sub(r"Bearer\\s+[A-Za-z0-9_\\-.=]+", "Bearer [REDACTED]", value)\n value = re.sub(r"hf_[A-Za-z0-9_\\-]{10,}", "hf_[REDACTED]", value)\n return value\n\n\ndef safe_details(details: dict | None) -> dict:\n if not details:\n return {}\n try:\n return json.loads(redact_text(json.dumps(details, ensure_ascii=False)))\n except Exception:\n return {"redacted_details": redact_text(str(details))[-4000:]}\n\n\ndef fail(run_dir: Path, events_path: Path, message: str, details: dict | None = None, status: str = "failed"):\n safe = safe_details(details)\n append_event(events_path, "failure", "failed", message, safe)\n write_json(run_dir / "state.json", {\n "run_id": os.environ.get("RUN_ID"),\n "kind": "pi_gist_recipe",\n "status": status,\n "message": message,\n "updated_at": now(),\n "details": safe,\n })\n report = f"""# Agentic Space Factory — Pi Gist Recipe Report\n\nStatus: **{status}**\n\n{message}\n\n```json\n{json.dumps(safe, indent=2, ensure_ascii=False)}\n```\n"""\n (run_dir / "report.md").write_text(report, encoding="utf-8")\n raise SystemExit(1)\n\n\ndef run_cmd(cmd: list[str], *, cwd: Path | None = None, env: dict | None = None, timeout: int = 600):\n result = subprocess.run(\n cmd,\n cwd=str(cwd) if cwd else None,\n env=env,\n text=True,\n stdout=subprocess.PIPE,\n stderr=subprocess.STDOUT,\n timeout=timeout,\n )\n return result.returncode, redact_text(result.stdout)\n\n\ndef install_python_deps(events_path: Path):\n append_event(events_path, "dependencies", "started", "Installing Python worker dependencies")\n cmd = [sys.executable, "-m", "pip", "install", "-q", "--upgrade", "huggingface_hub[cli]>=1.0.0", "gradio_client>=2.0.0"]\n code, out = run_cmd(cmd, timeout=600)\n if code != 0:\n append_event(events_path, "dependencies", "failed", "Python dependency installation failed", {"output_tail": out[-4000:]})\n raise RuntimeError(out)\n append_event(events_path, "dependencies", "success", "Python worker dependencies installed")\n\n\ndef ensure_node(events_path: Path):\n node = shutil.which("node")\n npm = shutil.which("npm")\n if node and npm:\n _, node_v = run_cmd([node, "--version"], timeout=30)\n _, npm_v = run_cmd([npm, "--version"], timeout=30)\n append_event(events_path, "node", "success", "Node/npm already available", {"node": node_v.strip(), "npm": npm_v.strip()})\n return\n append_event(events_path, "node", "started", "Installing nodejs/npm through apt-get")\n code, out = run_cmd(["bash", "-lc", "apt-get update -qq && apt-get install -y -qq nodejs npm"], timeout=600)\n if code != 0:\n append_event(events_path, "node", "failed", "Could not install nodejs/npm", {"output_tail": out[-4000:]})\n raise RuntimeError(out)\n append_event(events_path, "node", "success", "Installed nodejs/npm")\n\n\ndef install_pi(events_path: Path):\n ensure_node(events_path)\n append_event(events_path, "pi_install", "started", "Installing Pi coding agent from npm")\n code, out = run_cmd(["npm", "install", "-g", "@mariozechner/pi-coding-agent"], timeout=900)\n if code != 0:\n append_event(events_path, "pi_install", "failed", "Pi npm installation failed", {"output_tail": out[-4000:]})\n raise RuntimeError(out)\n code, version = run_cmd(["pi", "--version"], timeout=60)\n append_event(events_path, "pi_install", "success", "Pi installed", {"version_output": version.strip()[-300:]})\n\n\ndef configure_pi(events_path: Path, model: str):\n pi_dir = Path.home() / ".pi" / "agent"\n pi_dir.mkdir(parents=True, exist_ok=True)\n (pi_dir / "auth.json").write_text(json.dumps({"huggingface": {"type": "api_key", "key": os.environ.get("HF_TOKEN", "")}}, indent=2), encoding="utf-8")\n (pi_dir / "settings.json").write_text(json.dumps({"defaultProvider": "huggingface", "defaultModel": model}, indent=2), encoding="utf-8")\n append_event(events_path, "pi_config", "success", "Configured Pi for Hugging Face Inference Providers", {"model": model})\n\n\ndef seed_files(target_space_id: str) -> dict[str, str]:\n app_py = dedent(f\'\'\'\n import gradio as gr\n\n\n def greet(name: str) -> str:\n name = (name or "friend").strip() or "friend"\n return f"Hello {{name}} — {MARKER}."\n\n\n demo = gr.Interface(\n fn=greet,\n inputs=gr.Textbox(label="Name", value="Hugging Face"),\n outputs=gr.Textbox(label="Result"),\n title="Pi Gist Recipe Space",\n description="This app was created by Pi inside a Hugging Face Job while following the HF Spaces Agent Quickstart recipe.",\n examples=[["Hugging Face"], ["Agentic Space Factory"]],\n )\n\n\n if __name__ == "__main__":\n demo.launch()\n \'\'\').strip() + "\\n"\n readme = dedent(f\'\'\'\n ---\n title: Pi Gist Recipe Space\n emoji: 🧪\n colorFrom: purple\n colorTo: blue\n sdk: gradio\n app_file: app.py\n python_version: "3.11"\n pinned: false\n ---\n\n # Pi Gist Recipe Space\n\n This private Space is part of Agentic Space Factory Phase 4.\n\n Target repo: `{target_space_id}`\n\n Pi is expected to use the HF CLI, create this private Space, upload these files, inspect logs/API information, and only finish after it has performed a live validation attempt.\n \'\'\').strip() + "\\n"\n requirements = "gradio>=5.0.0\\n"\n return {"app.py": app_py, "README.md": readme, "requirements.txt": requirements}\n\n\ndef write_workspace(workspace: Path, files: dict[str, str]):\n if workspace.exists():\n shutil.rmtree(workspace)\n workspace.mkdir(parents=True, exist_ok=True)\n for name, content in files.items():\n (workspace / name).write_text(content, encoding="utf-8")\n\n\ndef save_workspace(run_dir: Path, workspace: Path):\n generated_dir = run_dir / "generated"\n generated_dir.mkdir(parents=True, exist_ok=True)\n for name in ["app.py", "README.md", "requirements.txt", "GOAL.md", "PI_SUMMARY.md"]:\n src = workspace / name\n if src.exists():\n (generated_dir / name).write_text(src.read_text(encoding="utf-8", errors="replace"), encoding="utf-8")\n\n\ndef collect_pi_traces(events_path: Path, run_dir: Path):\n trace_root = Path.home() / ".pi" / "agent" / "sessions"\n raw_dir = run_dir / "traces" / "raw"\n redacted_dir = run_dir / "traces" / "redacted"\n raw_dir.mkdir(parents=True, exist_ok=True)\n redacted_dir.mkdir(parents=True, exist_ok=True)\n copied = []\n if trace_root.exists():\n for path in trace_root.rglob("*.jsonl"):\n rel = path.relative_to(trace_root)\n raw_target = raw_dir / rel\n red_target = redacted_dir / rel\n raw_target.parent.mkdir(parents=True, exist_ok=True)\n red_target.parent.mkdir(parents=True, exist_ok=True)\n text = path.read_text(encoding="utf-8", errors="replace")\n redacted = redact_text(text)\n raw_target.write_text(redacted, encoding="utf-8")\n red_target.write_text(redacted, encoding="utf-8")\n copied.append(str(rel))\n append_event(events_path, "pi_traces", "success", "Collected Pi traces", {"count": len(copied), "files": copied[:10]})\n return copied\n\n\ndef run_pi_recipe(events_path: Path, workspace: Path, target_space_id: str, model: str, run_dir: Path):\n goal = f"""\nYou are running inside an ephemeral Hugging Face Job as a terminal coding agent.\n\nFirst, read and follow this operational guide:\n{GIST_URL}\n\nYour task:\nCreate and validate a private Hugging Face Gradio Space using the files in the current directory.\n\nTarget Space:\n{target_space_id}\n\nRequired process, matching the article/gist as closely as possible:\n1. Use the `hf` CLI. Do not use browser steps.\n2. Authenticate with the HF_TOKEN already available in the environment.\n3. Create the target Space as PRIVATE, SDK gradio.\n4. Upload app.py, README.md, and requirements.txt from this directory.\n5. Use `hf spaces logs` and/or Space runtime information to observe the real deployed Space.\n6. Use `gradio_client` or the Gradio API to inspect/call the live Space.\n7. Do not declare done until you have made a live validation attempt.\n\nSafety constraints:\n- Only operate on this exact target Space: {target_space_id}\n- Do not delete any resources.\n- Do not publish the Space publicly.\n- Do not read, print, or store secrets or environment variables.\n- Do not modify resources outside {target_space_id}.\n- Keep the app simple and reliable.\n- The live output for input "Agentic Space Factory" must contain "Hello", "Agentic Space Factory", and "{MARKER}".\n\nUseful exact commands, adapt only if needed:\n- hf auth whoami\n- hf repo create {target_space_id} --type space --space-sdk gradio --private --yes\n- hf upload {target_space_id} app.py app.py --repo-type space\n- hf upload {target_space_id} README.md README.md --repo-type space\n- hf upload {target_space_id} requirements.txt requirements.txt --repo-type space\n- hf spaces logs {target_space_id} --build\n\nWhen finished, write a short `PI_SUMMARY.md` in this directory with:\n- commands you attempted, excluding secrets\n- whether creation/upload/log/API validation succeeded\n- any errors encountered\nThen stop.\n""".strip()\n (workspace / "GOAL.md").write_text(goal + "\\n", encoding="utf-8")\n append_event(events_path, "pi_recipe", "started", "Running Pi with the HF Spaces Agent Quickstart recipe", {"model": model, "gist": GIST_URL})\n env = os.environ.copy()\n env["HF_TOKEN"] = os.environ.get("HF_TOKEN", "")\n env["HUGGING_FACE_HUB_TOKEN"] = os.environ.get("HF_TOKEN", "")\n code, out = run_cmd(["pi", "-p", goal], cwd=workspace, env=env, timeout=1200)\n (run_dir / "logs").mkdir(parents=True, exist_ok=True)\n (run_dir / "logs" / "pi_output.txt").write_text(out, encoding="utf-8")\n if code != 0:\n append_event(events_path, "pi_recipe", "failed", "Pi exited with non-zero status", {"exit_code": code, "output_tail": out[-4000:]})\n raise RuntimeError(f"Pi failed with exit code {code}: {out[-2000:]}")\n append_event(events_path, "pi_recipe", "success", "Pi completed recipe run", {"output_tail": out[-1500:]})\n\n\ndef make_gradio_client(target_space_id: str, token: str):\n import inspect\n from gradio_client import Client\n params = inspect.signature(Client).parameters\n if "token" in params:\n return Client(target_space_id, token=token)\n if "hf_token" in params:\n return Client(target_space_id, hf_token=token)\n if "api_key" in params:\n return Client(target_space_id, api_key=token)\n if "headers" in params:\n return Client(target_space_id, headers={"Authorization": f"Bearer {token}"})\n return Client(target_space_id)\n\n\ndef get_api_schema(client):\n for attr in ["view_api", "view_api_info"]:\n if hasattr(client, attr):\n value = getattr(client, attr)()\n if value is not None:\n return value\n return {}\n\n\ndef extract_api_names(api_schema):\n names = []\n seen = set()\n def add(name):\n if isinstance(name, str) and name.startswith("/") and name not in seen:\n names.append(name); seen.add(name)\n def walk(obj):\n if isinstance(obj, dict):\n for key, value in obj.items():\n if key in {"api_name", "endpoint", "name"}:\n add(value)\n if isinstance(key, str) and key.startswith("/"):\n add(key)\n walk(value)\n elif isinstance(obj, list):\n for item in obj:\n walk(item)\n walk(api_schema)\n return names\n\n\ndef predict_with_available_endpoint(client, api_schema, value: str):\n candidates = extract_api_names(api_schema)\n for fallback in ["/greet", "/predict"]:\n if fallback not in candidates:\n candidates.append(fallback)\n errors = []\n for api_name in candidates:\n try:\n return api_name, client.predict(value, api_name=api_name)\n except Exception as exc:\n errors.append({"api_name": api_name, "error": str(exc)[-500:]})\n try:\n return None, client.predict(value)\n except Exception as exc:\n errors.append({"api_name": None, "error": str(exc)[-500:]})\n raise RuntimeError(f"No candidate Gradio endpoint worked: {json.dumps(errors, ensure_ascii=False)}")\n\n\ndef validate_live_api(target_space_id: str, token: str, events_path: Path, tests_dir: Path, timeout_seconds: int = 420):\n tests_dir.mkdir(parents=True, exist_ok=True)\n deadline = time.time() + timeout_seconds\n last_error = None\n attempt = 0\n append_event(events_path, "api_validation", "started", "Wrapper waiting for live Gradio API")\n while time.time() < deadline:\n attempt += 1\n try:\n client = make_gradio_client(target_space_id, token)\n api_schema = get_api_schema(client)\n api_names = extract_api_names(api_schema)\n write_json(tests_dir / "api_schema.json", {"schema": api_schema, "api_names": api_names})\n used_api_name, result = predict_with_available_endpoint(client, api_schema, "Agentic Space Factory")\n result_text = str(result)\n ok = "Hello" in result_text and "Agentic Space Factory" in result_text and MARKER in result_text\n payload = {\n "attempt": attempt,\n "target_space": target_space_id,\n "api_test_passed": ok,\n "api_name": used_api_name,\n "discovered_api_names": api_names,\n "result": result_text,\n "validated_at": now(),\n }\n write_json(tests_dir / "test_result.json", payload)\n if ok:\n append_event(events_path, "api_validation", "success", "Wrapper live Gradio API test passed", {"attempt": attempt, "api_name": used_api_name})\n return payload\n last_error = f"Unexpected result from {used_api_name}: {result_text}"\n except Exception as exc:\n last_error = str(exc)\n append_event(events_path, "api_validation", "waiting", "Live API not ready or not valid yet", {"attempt": attempt, "error": last_error[-1000:]})\n time.sleep(20)\n payload = {"target_space": target_space_id, "api_test_passed": False, "error": last_error, "validated_at": now()}\n write_json(tests_dir / "test_result.json", payload)\n raise RuntimeError(f"Live API validation did not pass before timeout: {last_error}")\n\n\ndef main():\n run_id = os.environ["RUN_ID"]\n hf_username = os.environ.get("HF_USERNAME", "unknown")\n bucket_source = os.environ.get("BUCKET_SOURCE", "unknown")\n output_root = Path(os.environ.get("OUTPUT_ROOT", "/output"))\n target_space_id = os.environ["TARGET_SPACE_ID"]\n token = os.environ.get("HF_TOKEN")\n pi_model = os.environ.get("PI_MODEL") or "moonshotai/Kimi-K2.5"\n run_dir = output_root / "runs" / run_id\n events_path = run_dir / "events.jsonl"\n state_path = run_dir / "state.json"\n report_path = run_dir / "report.md"\n target_json_path = run_dir / "target_space.json"\n workspace = Path("/tmp/space_factory_pi_gist_workspace")\n\n append_event(events_path, "bootstrap", "started", "Pi gist recipe worker started")\n write_json(state_path, {\n "run_id": run_id,\n "kind": "pi_gist_recipe",\n "status": "running",\n "message": "Running Pi with the HF Spaces Agent Quickstart recipe",\n "target_space": target_space_id,\n "pi_model": pi_model,\n "created_by": hf_username,\n "bucket_source": bucket_source,\n "created_at": now(),\n "updated_at": now(),\n })\n if not token:\n fail(run_dir, events_path, "HF_TOKEN is missing from Job secrets")\n if not TARGET_RE.match(target_space_id):\n fail(run_dir, events_path, "Invalid TARGET_SPACE_ID", {"target_space": target_space_id})\n if not target_space_id.startswith(f"{hf_username}/"):\n fail(run_dir, events_path, "For Phase 4, target Space must be in the signed-in user\'s namespace", {"target_space": target_space_id, "username": hf_username})\n\n try:\n install_python_deps(events_path)\n from huggingface_hub import HfApi\n api = HfApi(token=token)\n whoami = api.whoami(token=token)\n append_event(events_path, "auth", "success", "Authenticated inside Job", {"whoami_name": whoami.get("name")})\n try:\n api.repo_info(repo_id=target_space_id, repo_type="space", token=token)\n fail(run_dir, events_path, "Target Space already exists. Choose a new Space name for Phase 4.", {"target_space": target_space_id})\n except Exception as exc:\n msg = str(exc)\n if "404" not in msg and "Repository Not Found" not in msg and "not found" not in msg.lower():\n append_event(events_path, "preflight", "warning", "Could not conclusively verify target Space absence; continuing cautiously", {"error": msg[-500:]})\n else:\n append_event(events_path, "preflight", "success", "Target Space does not exist yet", {"target_space": target_space_id})\n\n install_pi(events_path)\n configure_pi(events_path, pi_model)\n files = seed_files(target_space_id)\n write_workspace(workspace, files)\n append_event(events_path, "workspace", "success", "Prepared workspace for Pi gist recipe", {"files": list(files)})\n run_pi_recipe(events_path, workspace, target_space_id, pi_model, run_dir)\n save_workspace(run_dir, workspace)\n trace_files = collect_pi_traces(events_path, run_dir)\n\n info = api.repo_info(repo_id=target_space_id, repo_type="space", token=token)\n append_event(events_path, "target_space_check", "success", "Target Space exists after Pi run", {"sha": getattr(info, "sha", None)})\n write_json(target_json_path, {"target_space": target_space_id, "url": f"https://huggingface.co/spaces/{target_space_id}", "private": True, "sdk": "gradio", "created_by": hf_username})\n validation = validate_live_api(target_space_id, token, events_path, run_dir / "tests")\n trace_files = collect_pi_traces(events_path, run_dir)\n final_state = {\n "run_id": run_id,\n "kind": "pi_gist_recipe",\n "status": "success",\n "message": "Pi applied the gist-style workflow and the wrapper validated the live private Space API.",\n "target_space": target_space_id,\n "target_space_url": f"https://huggingface.co/spaces/{target_space_id}",\n "pi_model": pi_model,\n "created_by": hf_username,\n "bucket_source": bucket_source,\n "validation": validation,\n "trace_files": trace_files,\n "updated_at": now(),\n "security_notes": [\n "The target Space was required to be private.",\n "The HF token was redacted from saved logs/traces on a best-effort basis.",\n "Success was declared only after the wrapper performed its own live Gradio API test.",\n ],\n }\n write_json(state_path, final_state)\n pi_summary = (workspace / "PI_SUMMARY.md").read_text(encoding="utf-8", errors="replace") if (workspace / "PI_SUMMARY.md").exists() else "PI_SUMMARY.md was not produced."\n report = f"""# Agentic Space Factory — Pi Gist Recipe Report\n\nRun ID: `{run_id}`\n\nStatus: **success**\n\nCreated private Space: [`{target_space_id}`](https://huggingface.co/spaces/{target_space_id})\n\n## What happened\n\n```text\nOAuth user → HF Job → Pi reads gist instructions → Pi uses hf CLI → private Space creation/upload/log/API attempt → wrapper API validation → Bucket report/traces\n```\n\n## Pi\n\n- Model: `{pi_model}`\n- Gist: {GIST_URL}\n- Pi traces copied: `{len(trace_files)}`\n- Trace paths: `runs/{run_id}/traces/`\n\n## Pi summary\n\n```md\n{redact_text(pi_summary)[-6000:]}\n```\n\n## Wrapper live API validation\n\n```json\n{json.dumps(validation, indent=2, ensure_ascii=False)}\n```\n\n## Security posture\n\n- The target Space was created under the signed-in user\'s namespace.\n- The target Space remains private unless the user changes it manually.\n- The token was not intentionally printed or persisted.\n- Saved Pi outputs/traces are redacted on a best-effort basis.\n- The wrapper, not Pi alone, declared final success after a live API test.\n\n## Next step\n\nPhase 5 can introduce model-card analysis and a small set of templates, while preserving this same wrapper validation gate.\n"""\n report_path.write_text(report, encoding="utf-8")\n append_event(events_path, "report_write", "success", "Wrote report.md")\n append_event(events_path, "done", "success", "Pi gist recipe worker completed")\n except Exception as exc:\n try:\n collect_pi_traces(events_path, run_dir)\n save_workspace(run_dir, workspace)\n except Exception:\n pass\n fail(run_dir, events_path, "Pi gist recipe worker failed", {"error": str(exc)})\n\n\nif __name__ == "__main__":\n main()\n' |
|
|
| def _encode(script: str) -> str: |
| return base64.b64encode(script.encode("utf-8")).decode("ascii") |
|
|
|
|
|
|
| PI_MODEL_CARD_WORKER_SCRIPT = 'import json\nimport os\nimport re\nimport shutil\nimport subprocess\nimport sys\nimport time\nfrom datetime import datetime, timezone\nfrom pathlib import Path\nfrom textwrap import dedent\n\n\nTARGET_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{1,95}/[A-Za-z0-9][A-Za-z0-9._-]{1,95}$")\nSUPPORTED_TASKS = {"text-generation", "text2text-generation", "fill-mask", "text-classification", "sentiment-analysis"}\n\n\ndef now():\n return datetime.now(timezone.utc).isoformat()\n\n\ndef write_json(path: Path, payload: dict):\n path.parent.mkdir(parents=True, exist_ok=True)\n path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\\n", encoding="utf-8")\n\n\ndef append_event(path: Path, step: str, status: str, message: str, data: dict | None = None):\n path.parent.mkdir(parents=True, exist_ok=True)\n event = {"ts": now(), "step": step, "status": status, "message": message, "data": data or {}}\n line = json.dumps(event, ensure_ascii=False)\n with path.open("a", encoding="utf-8") as f:\n f.write(line + "\\n")\n print(line, flush=True)\n\n\ndef redact_text(text: str | None) -> str:\n if not text:\n return ""\n value = text\n for secret_name in ["HF_TOKEN", "HUGGING_FACE_HUB_TOKEN"]:\n secret = os.environ.get(secret_name)\n if secret:\n value = value.replace(secret, "[REDACTED]")\n value = re.sub(r"Bearer\\s+[A-Za-z0-9_\\-.=]+", "Bearer [REDACTED]", value)\n value = re.sub(r"hf_[A-Za-z0-9_\\-]{10,}", "hf_[REDACTED]", value)\n return value\n\n\ndef safe_details(details: dict | None) -> dict:\n if not details:\n return {}\n try:\n return json.loads(redact_text(json.dumps(details, ensure_ascii=False)))\n except Exception:\n return {"redacted_details": redact_text(str(details))[-4000:]}\n\n\ndef fail(run_dir: Path, events_path: Path, message: str, details: dict | None = None, status: str = "failed"):\n safe = safe_details(details)\n append_event(events_path, "failure", "failed", message, safe)\n write_json(run_dir / "state.json", {\n "run_id": os.environ.get("RUN_ID"),\n "kind": "pi_model_card",\n "status": status,\n "message": message,\n "updated_at": now(),\n "details": safe,\n })\n report = f"""# Agentic Space Factory — Model Card Space Report\n\nStatus: **{status}**\n\n{message}\n\n```json\n{json.dumps(safe, indent=2, ensure_ascii=False)}\n```\n"""\n (run_dir / "report.md").write_text(report, encoding="utf-8")\n raise SystemExit(1)\n\n\ndef run_cmd(cmd: list[str], *, cwd: Path | None = None, env: dict | None = None, timeout: int = 600):\n result = subprocess.run(cmd, cwd=str(cwd) if cwd else None, env=env, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=timeout)\n return result.returncode, redact_text(result.stdout)\n\n\ndef install_python_deps(events_path: Path):\n append_event(events_path, "dependencies", "started", "Installing Python worker dependencies")\n code, out = run_cmd([sys.executable, "-m", "pip", "install", "-q", "--upgrade", "huggingface_hub>=1.0.0", "gradio_client>=2.0.0", "requests>=2.31.0"], timeout=600)\n if code != 0:\n append_event(events_path, "dependencies", "failed", "Python dependency installation failed", {"output_tail": out[-4000:]})\n raise RuntimeError(out)\n append_event(events_path, "dependencies", "success", "Python worker dependencies installed")\n\n\ndef ensure_node(events_path: Path):\n node = shutil.which("node")\n npm = shutil.which("npm")\n if node and npm:\n _, node_v = run_cmd([node, "--version"], timeout=30)\n _, npm_v = run_cmd([npm, "--version"], timeout=30)\n append_event(events_path, "node", "success", "Node/npm already available", {"node": node_v.strip(), "npm": npm_v.strip()})\n return\n append_event(events_path, "node", "started", "Installing nodejs/npm through apt-get")\n code, out = run_cmd(["bash", "-lc", "apt-get update -qq && apt-get install -y -qq nodejs npm"], timeout=600)\n if code != 0:\n append_event(events_path, "node", "failed", "Could not install nodejs/npm", {"output_tail": out[-4000:]})\n raise RuntimeError(out)\n append_event(events_path, "node", "success", "Installed nodejs/npm")\n\n\ndef install_pi(events_path: Path):\n ensure_node(events_path)\n append_event(events_path, "pi_install", "started", "Installing Pi coding agent from npm")\n code, out = run_cmd(["npm", "install", "-g", "@mariozechner/pi-coding-agent"], timeout=900)\n if code != 0:\n append_event(events_path, "pi_install", "failed", "Pi npm installation failed", {"output_tail": out[-4000:]})\n raise RuntimeError(out)\n code, version = run_cmd(["pi", "--version"], timeout=60)\n append_event(events_path, "pi_install", "success", "Pi installed", {"version_output": version.strip()[-300:]})\n\n\ndef configure_pi(events_path: Path, model: str):\n pi_dir = Path.home() / ".pi" / "agent"\n pi_dir.mkdir(parents=True, exist_ok=True)\n (pi_dir / "auth.json").write_text(json.dumps({"huggingface": {"type": "api_key", "key": os.environ.get("HF_TOKEN", "")}}, indent=2), encoding="utf-8")\n (pi_dir / "settings.json").write_text(json.dumps({"model": model, "provider": "huggingface", "autoRun": True, "autoApply": True}, indent=2), encoding="utf-8")\n append_event(events_path, "pi_config", "success", "Configured Pi", {"model": model})\n\n\ndef sanitize_model_id(model_id: str) -> str:\n model_id = (model_id or "").strip().replace("https://huggingface.co/", "")\n model_id = model_id.split("?", 1)[0].strip("/")\n if not re.match(r"^[A-Za-z0-9_.-]+/[A-Za-z0-9_.-]+$", model_id):\n raise ValueError("MODEL_ID must look like owner/model-name")\n return model_id\n\n\ndef analyze_model(model_id: str, token: str, run_dir: Path, events_path: Path) -> dict:\n from huggingface_hub import HfApi, hf_hub_download\n append_event(events_path, "model_analysis", "started", "Fetching model metadata", {"model_id": model_id})\n api = HfApi(token=token)\n info = api.model_info(model_id, token=token, files_metadata=False)\n siblings = [getattr(s, "rfilename", "") for s in (info.siblings or [])]\n pipeline_tag = getattr(info, "pipeline_tag", None)\n library_name = getattr(info, "library_name", None)\n tags = list(getattr(info, "tags", []) or [])\n readme_excerpt = ""\n try:\n readme_path = hf_hub_download(repo_id=model_id, filename="README.md", token=token)\n readme_text = Path(readme_path).read_text(encoding="utf-8", errors="ignore")\n readme_excerpt = readme_text[:6000]\n except Exception as exc:\n readme_excerpt = f"Could not download README.md: {exc}"\n task = pipeline_tag or "text-generation"\n if task == "sentiment-analysis":\n task = "text-classification"\n supported = task in SUPPORTED_TASKS\n analysis = {\n "model_id": model_id,\n "pipeline_tag": pipeline_tag,\n "library_name": library_name,\n "tags": tags[:80],\n "siblings": siblings[:120],\n "selected_task": task,\n "template": "transformers_text_pipeline" if supported else "unsupported",\n "supported": supported,\n "confidence": 0.8 if supported else 0.25,\n "risks": [],\n "readme_excerpt": readme_excerpt,\n "evidence": [f"pipeline_tag={pipeline_tag}", f"library_name={library_name}", f"files={\', \'.join(siblings[:12])}"],\n }\n if not supported:\n analysis["risks"].append("Phase 5 only supports simple Transformers text pipeline tasks.")\n if "gated" in tags:\n analysis["risks"].append("Model appears gated; generated Space will not receive OAuth token as a secret in Phase 5.")\n write_json(run_dir / "model_analysis.json", analysis)\n append_event(events_path, "model_analysis", "success" if supported else "unsupported", "Model metadata analyzed", {"selected_task": task, "supported": supported, "confidence": analysis["confidence"]})\n return analysis\n\n\ndef render_app(model_id: str, task: str) -> str:\n return dedent(f\'\'\'\n import gradio as gr\n from transformers import pipeline\n\n MODEL_ID = {model_id!r}\n TASK = {task!r}\n\n pipe = pipeline(TASK, model=MODEL_ID)\n\n def run_model(text: str) -> str:\n text = (text or "Hello from Agentic Space Factory").strip() or "Hello from Agentic Space Factory"\n if TASK == "text-generation":\n result = pipe(text, max_new_tokens=32, do_sample=False)\n return result[0].get("generated_text", str(result))\n if TASK == "text2text-generation":\n result = pipe(text, max_new_tokens=64)\n return result[0].get("generated_text", str(result))\n if TASK == "fill-mask":\n mask = getattr(getattr(pipe, "tokenizer", None), "mask_token", None) or "<mask>"\n if mask not in text:\n text = f"Hugging Face is {{mask}}."\n result = pipe(text)\n return str(result[:3] if isinstance(result, list) else result)\n if TASK in {"text-classification", "sentiment-analysis"}:\n return str(pipe(text))\n return str(pipe(text))\n\n demo = gr.Interface(\n fn=run_model,\n inputs=gr.Textbox(label="Input", value="Hello from Agentic Space Factory"),\n outputs=gr.Textbox(label="Model output"),\n title=f"Model demo: {{MODEL_ID}}",\n description="Generated by Agentic Space Factory from model metadata. Pi adapted this model app.",\n examples=[["Hello from Agentic Space Factory"], ["Hugging Face is awesome"]],\n )\n\n if __name__ == "__main__":\n demo.launch()\n \'\'\').strip() + "\\n"\n\n\ndef render_readme(model_id: str, task: str, target_space_id: str) -> str:\n return dedent(f\'\'\'\n ---\n title: Model Card Generated Space\n emoji: 🤖\n colorFrom: green\n colorTo: blue\n sdk: gradio\n app_file: app.py\n python_version: "3.11"\n pinned: false\n ---\n\n # Model Card Generated Space\n\n This private Space was generated by Agentic Space Factory from `{model_id}`.\n\n - Target Space: `{target_space_id}`\n - Selected task: `{task}`\n - Template: `transformers_text_pipeline`\n\n Phase 5 is intentionally limited to simple Transformers text pipelines.\n \'\'\').strip() + "\\n"\n\n\ndef prepare_workspace(workspace: Path, run_dir: Path, model_id: str, task: str, target_space_id: str, analysis: dict, events_path: Path):\n workspace.mkdir(parents=True, exist_ok=True)\n (workspace / "app.py").write_text(render_app(model_id, task), encoding="utf-8")\n (workspace / "README.md").write_text(render_readme(model_id, task, target_space_id), encoding="utf-8")\n (workspace / "requirements.txt").write_text("gradio>=5.0.0\\nhuggingface_hub>=0.34.0,<1.0.0\\ntransformers>=4.45.0\\ntorch\\nsafetensors\\n", encoding="utf-8")\n goal = f"""You are running inside a Hugging Face Job as a coding agent.\n\nGoal: adapt the provided minimal Gradio app for the model `{model_id}` and task `{task}`.\n\nFirst, read the HF Spaces Agent Quickstart gist:\nhttps://gist.github.com/gary149/2aba2962375fa9ca56bb9ef53f00b73d\n\nRules for this Phase 5 smoke test:\n- Work only in the current workspace.\n- Do not create, delete, publish, or modify Hugging Face repos. The wrapper will create/upload the private Space.\n- Preserve `app.py`, `README.md`, and `requirements.txt`.\n- Do not remove the `huggingface_hub>=0.34.0,<1.0.0` compatibility pin from requirements.txt.\n- Preserve the `run_model` function and a Gradio Interface or Blocks app.\n- Preserve the exact marker phrase: Pi adapted this model app.\n- Keep the app simple and CPU-friendly.\n- Do not print secrets.\n- Write a short summary to `PI_SUMMARY.md`.\n\nModel analysis:\n```json\n{json.dumps({k: v for k, v in analysis.items() if k != \'readme_excerpt\'}, indent=2, ensure_ascii=False)}\n```\n\nREADME excerpt:\n{analysis.get(\'readme_excerpt\', \'\')[:3000]}\n"""\n (workspace / "GOAL.md").write_text(goal, encoding="utf-8")\n save_generated_files(run_dir, workspace)\n append_event(events_path, "workspace", "success", "Prepared model app workspace", {"files": ["app.py", "README.md", "requirements.txt", "GOAL.md"]})\n\n\ndef save_generated_files(run_dir: Path, workspace: Path):\n generated_dir = run_dir / "generated"\n generated_dir.mkdir(parents=True, exist_ok=True)\n for filename in ["app.py", "README.md", "requirements.txt", "GOAL.md", "PI_SUMMARY.md"]:\n path = workspace / filename\n if path.exists():\n (generated_dir / filename).write_text(path.read_text(encoding="utf-8", errors="ignore"), encoding="utf-8")\n\n\ndef run_pi(workspace: Path, run_dir: Path, events_path: Path, model: str):\n append_event(events_path, "pi_run", "started", "Running Pi on model-card workspace", {"model": model})\n env = os.environ.copy()\n env["HF_TOKEN"] = os.environ.get("HF_TOKEN", "")\n code, out = run_cmd(["pi", "-p", (workspace / "GOAL.md").read_text(encoding="utf-8")], cwd=workspace, env=env, timeout=1800)\n logs_dir = run_dir / "logs"\n logs_dir.mkdir(parents=True, exist_ok=True)\n (logs_dir / "pi_output.txt").write_text(out, encoding="utf-8")\n save_generated_files(run_dir, workspace)\n if code != 0:\n append_event(events_path, "pi_run", "failed", "Pi returned a non-zero exit code", {"returncode": code, "output_tail": out[-4000:]})\n raise RuntimeError("Pi failed. See logs/pi_output.txt")\n app_text = (workspace / "app.py").read_text(encoding="utf-8", errors="ignore")\n if "Pi adapted this model app" not in app_text:\n raise RuntimeError("Pi/app verification failed: expected marker phrase missing from app.py")\n append_event(events_path, "pi_run", "success", "Pi completed and preserved required marker")\n\n\ndef collect_pi_traces(run_dir: Path, events_path: Path):\n src = Path.home() / ".pi" / "agent" / "sessions"\n raw_dir = run_dir / "traces" / "raw"\n redacted_dir = run_dir / "traces" / "redacted"\n raw_dir.mkdir(parents=True, exist_ok=True)\n redacted_dir.mkdir(parents=True, exist_ok=True)\n count = 0\n if src.exists():\n for path in src.rglob("*.jsonl"):\n count += 1\n text = path.read_text(encoding="utf-8", errors="ignore")\n (raw_dir / path.name).write_text(redact_text(text), encoding="utf-8")\n (redacted_dir / path.name).write_text(redact_text(text), encoding="utf-8")\n append_event(events_path, "traces", "success", "Collected Pi traces", {"count": count})\n\n\ndef make_gradio_client(target_space_id: str, token: str):\n import inspect\n from gradio_client import Client\n params = inspect.signature(Client).parameters\n if "token" in params:\n return Client(target_space_id, token=token)\n if "hf_token" in params:\n return Client(target_space_id, hf_token=token)\n if "api_key" in params:\n return Client(target_space_id, api_key=token)\n if "headers" in params:\n return Client(target_space_id, headers={"Authorization": f"Bearer {token}"})\n return Client(target_space_id)\n\n\ndef get_api_schema(client):\n try:\n return client.view_api(return_format="dict")\n except TypeError:\n return client.view_api()\n\n\ndef extract_api_names(api_schema) -> list[str]:\n names = []\n def add(value):\n if not value or not isinstance(value, str):\n return\n name = value if value.startswith("/") else f"/{value}"\n if name not in names:\n names.append(name)\n def walk(obj):\n if isinstance(obj, dict):\n for key, value in obj.items():\n if key in {"api_name", "apiName"}:\n add(value)\n if isinstance(key, str) and key.startswith("/"):\n add(key)\n walk(value)\n elif isinstance(obj, list):\n for item in obj:\n walk(item)\n walk(api_schema)\n return names\n\n\ndef predict_with_available_endpoint(client, api_schema, value: str):\n candidates = extract_api_names(api_schema)\n for fallback in ["/run_model", "/predict", "/greet"]:\n if fallback not in candidates:\n candidates.append(fallback)\n errors = []\n for api_name in candidates:\n try:\n return api_name, client.predict(value, api_name=api_name)\n except Exception as exc:\n errors.append({"api_name": api_name, "error": str(exc)[-500:]})\n try:\n return None, client.predict(value)\n except Exception as exc:\n errors.append({"api_name": None, "error": str(exc)[-500:]})\n raise RuntimeError(f"No candidate Gradio endpoint worked: {json.dumps(errors, ensure_ascii=False)}")\n\n\ndef validate_live_api(target_space_id: str, token: str, events_path: Path, tests_dir: Path, timeout_seconds: int = 900):\n tests_dir.mkdir(parents=True, exist_ok=True)\n deadline = time.time() + timeout_seconds\n last_error = None\n attempt = 0\n append_event(events_path, "api_validation", "started", "Waiting for live model Gradio API to become available")\n while time.time() < deadline:\n attempt += 1\n try:\n client = make_gradio_client(target_space_id, token)\n api_schema = get_api_schema(client)\n api_names = extract_api_names(api_schema)\n write_json(tests_dir / "api_schema.json", {"schema": api_schema, "api_names": api_names})\n used_api_name, result = predict_with_available_endpoint(client, api_schema, "Hello from Agentic Space Factory")\n result_text = str(result)\n ok = bool(result_text and len(result_text.strip()) >= 2)\n payload = {"attempt": attempt, "target_space": target_space_id, "api_test_passed": ok, "api_name": used_api_name, "discovered_api_names": api_names, "result": result_text[:4000], "validated_at": now()}\n write_json(tests_dir / "test_result.json", payload)\n if ok:\n append_event(events_path, "api_validation", "success", "Live model API test passed", {"attempt": attempt, "api_name": used_api_name, "discovered_api_names": api_names})\n return payload\n last_error = f"Unexpected empty API result from {used_api_name}: {result_text}"\n except Exception as exc:\n last_error = str(exc)\n append_event(events_path, "api_validation", "waiting", "Live API not ready yet", {"attempt": attempt, "error": last_error[-1000:]})\n time.sleep(20)\n payload = {"target_space": target_space_id, "api_test_passed": False, "error": last_error, "validated_at": now()}\n write_json(tests_dir / "test_result.json", payload)\n raise RuntimeError(f"Live API validation did not pass before timeout: {last_error}")\n\n\ndef create_and_upload_space(api, token: str, target_space_id: str, workspace: Path, events_path: Path):\n append_event(events_path, "create_space", "started", f"Creating private target Space {target_space_id}")\n api.create_repo(repo_id=target_space_id, repo_type="space", space_sdk="gradio", private=True, exist_ok=False, token=token)\n append_event(events_path, "create_space", "success", "Private target Space created", {"target_space": target_space_id})\n append_event(events_path, "upload_files", "started", "Uploading model app files to target Space")\n for path_in_repo in ["app.py", "README.md", "requirements.txt"]:\n api.upload_file(path_or_fileobj=(workspace / path_in_repo).read_bytes(), path_in_repo=path_in_repo, repo_id=target_space_id, repo_type="space", token=token)\n append_event(events_path, "upload_files", "success", f"Uploaded {path_in_repo}")\n\n\ndef main():\n run_id = os.environ["RUN_ID"]\n hf_username = os.environ.get("HF_USERNAME", "unknown")\n bucket_source = os.environ.get("BUCKET_SOURCE", "unknown")\n output_root = Path(os.environ.get("OUTPUT_ROOT", "/output"))\n target_space_id = os.environ["TARGET_SPACE_ID"]\n model_id = sanitize_model_id(os.environ.get("MODEL_ID", ""))\n pi_model = os.environ.get("PI_MODEL") or "moonshotai/Kimi-K2.5"\n token = os.environ.get("HF_TOKEN")\n run_dir = output_root / "runs" / run_id\n events_path = run_dir / "events.jsonl"\n state_path = run_dir / "state.json"\n workspace = Path("/tmp") / f"space-factory-model-{run_id}"\n append_event(events_path, "bootstrap", "started", "Pi model-card worker started", {"model_id": model_id})\n write_json(state_path, {"run_id": run_id, "kind": "pi_model_card", "status": "running", "message": "Analyzing model card and generating a private model demo Space", "model_id": model_id, "target_space": target_space_id, "created_by": hf_username, "bucket_source": bucket_source, "created_at": now(), "updated_at": now()})\n if not token:\n fail(run_dir, events_path, "HF_TOKEN is missing from Job secrets")\n if not TARGET_RE.match(target_space_id):\n fail(run_dir, events_path, "Invalid TARGET_SPACE_ID", {"target_space": target_space_id})\n if not target_space_id.startswith(f"{hf_username}/"):\n fail(run_dir, events_path, "Target Space must be in the signed-in user\'s namespace", {"target_space": target_space_id, "username": hf_username})\n try:\n install_python_deps(events_path)\n from huggingface_hub import HfApi\n api = HfApi(token=token)\n whoami = api.whoami(token=token)\n append_event(events_path, "auth", "success", "Authenticated inside Job", {"whoami_name": whoami.get("name")})\n analysis = analyze_model(model_id, token, run_dir, events_path)\n if not analysis.get("supported"):\n fail(run_dir, events_path, "Model task is unsupported by Phase 5", {"model_analysis": {k: v for k, v in analysis.items() if k != "readme_excerpt"}}, status="unsupported")\n prepare_workspace(workspace, run_dir, model_id, analysis["selected_task"], target_space_id, analysis, events_path)\n install_pi(events_path)\n configure_pi(events_path, pi_model)\n run_pi(workspace, run_dir, events_path, pi_model)\n collect_pi_traces(run_dir, events_path)\n create_and_upload_space(api, token, target_space_id, workspace, events_path)\n write_json(run_dir / "target_space.json", {"target_space": target_space_id, "url": f"https://huggingface.co/spaces/{target_space_id}", "private": True, "sdk": "gradio", "created_by": hf_username, "model_id": model_id})\n validation = validate_live_api(target_space_id, token, events_path, run_dir / "tests")\n final_state = {"run_id": run_id, "kind": "pi_model_card", "status": "success", "message": "Model-card generated private Space created and validated through the live API.", "model_id": model_id, "target_space": target_space_id, "target_space_url": f"https://huggingface.co/spaces/{target_space_id}", "created_by": hf_username, "bucket_source": bucket_source, "model_analysis": {k: v for k, v in analysis.items() if k != "readme_excerpt"}, "validation": validation, "updated_at": now(), "security_notes": ["The target Space was created as private.", "The HF token was not printed or intentionally persisted.", "Phase 5 supports only simple public text pipeline models.", "Success was declared only after the wrapper live API test passed."]}\n write_json(state_path, final_state)\n report = f"""# Agentic Space Factory — Model Card Space Report\n\nRun ID: `{run_id}`\n\nStatus: **success**\n\nCreated private model demo Space: [`{target_space_id}`](https://huggingface.co/spaces/{target_space_id})\n\n## Model\n\n- Model ID: `{model_id}`\n- Selected task: `{analysis[\'selected_task\']}`\n- Template: `{analysis[\'template\']}`\n- Pi model: `{pi_model}`\n\n## What happened\n\n```text\nOAuth user → HF Job → model metadata analysis → Pi adapts app.py → private Space creation → live API validation → Bucket report\n```\n\n## Live API validation\n\n```json\n{json.dumps(validation, indent=2, ensure_ascii=False)}\n```\n\n## Security posture\n\n- The target Space was created as private.\n- No token was printed or intentionally persisted.\n- Pi was instructed not to create/delete/publish repos; the wrapper performed Hub operations.\n- Success was declared only after the live API returned a non-empty result.\n\n## Next step\n\nPhase 6 should add a ZeroGPU Diffusers template and stricter model compatibility gating.\n"""\n (run_dir / "report.md").write_text(report, encoding="utf-8")\n append_event(events_path, "report_write", "success", "Wrote report.md")\n append_event(events_path, "done", "success", "Pi model-card worker completed")\n except SystemExit:\n raise\n except Exception as exc:\n collect_pi_traces(run_dir, events_path)\n fail(run_dir, events_path, "Pi model-card worker failed", {"error": str(exc)})\n\n\nif __name__ == "__main__":\n main()\n' |
|
|
|
|
| RUNTIME_RECOMMENDER_WORKER_SCRIPT = 'import json\nimport os\nimport re\nimport sys\nfrom datetime import datetime, timezone\nfrom pathlib import Path\nimport subprocess\n\nSUPPORTED_TEXT_TASKS = {"text-generation", "text2text-generation", "fill-mask", "text-classification", "sentiment-analysis"}\nDIFFUSION_TASKS = {"text-to-image", "image-to-image", "image-to-video", "text-to-video"}\n\n\ndef now():\n return datetime.now(timezone.utc).isoformat()\n\n\ndef write_json(path: Path, payload: dict):\n path.parent.mkdir(parents=True, exist_ok=True)\n path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\\n", encoding="utf-8")\n\n\ndef append_event(path: Path, step: str, status: str, message: str, data: dict | None = None):\n path.parent.mkdir(parents=True, exist_ok=True)\n event = {"ts": now(), "step": step, "status": status, "message": message, "data": data or {}}\n line = json.dumps(event, ensure_ascii=False)\n with path.open("a", encoding="utf-8") as f:\n f.write(line + "\\n")\n print(line, flush=True)\n\n\ndef redact_text(text: str | None) -> str:\n if not text:\n return ""\n value = text\n for secret_name in ["HF_TOKEN", "HUGGING_FACE_HUB_TOKEN"]:\n secret = os.environ.get(secret_name)\n if secret:\n value = value.replace(secret, "[REDACTED]")\n value = re.sub(r"Bearer\\s+[A-Za-z0-9_\\-.=]+", "Bearer [REDACTED]", value)\n value = re.sub(r"hf_[A-Za-z0-9_\\-]{10,}", "hf_[REDACTED]", value)\n return value\n\n\ndef run_cmd(cmd: list[str], timeout: int = 600):\n result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=timeout)\n return result.returncode, redact_text(result.stdout)\n\n\ndef fail(run_dir: Path, events_path: Path, message: str, details: dict | None = None, status: str = "failed"):\n append_event(events_path, "failure", "failed", message, details or {})\n write_json(run_dir / "state.json", {\n "run_id": os.environ.get("RUN_ID"),\n "kind": "runtime_recommender",\n "status": status,\n "message": message,\n "updated_at": now(),\n "details": details or {},\n })\n (run_dir / "report.md").write_text(f"# Runtime Recommendation Report\\n\\nStatus: **{status}**\\n\\n{message}\\n\\n```json\\n{json.dumps(details or {}, indent=2, ensure_ascii=False)}\\n```\\n", encoding="utf-8")\n raise SystemExit(1)\n\n\ndef sanitize_model_id(model_id: str) -> str:\n model_id = (model_id or "").strip().replace("https://huggingface.co/", "")\n model_id = model_id.split("?", 1)[0].strip("/")\n if not re.match(r"^[A-Za-z0-9_.-]+/[A-Za-z0-9_.-]+$", model_id):\n raise ValueError("MODEL_ID must look like owner/model-name")\n return model_id\n\n\ndef human_bytes(value: int | None) -> str:\n if value is None:\n return "unknown"\n size = float(value)\n for unit in ["B", "KB", "MB", "GB", "TB"]:\n if size < 1024 or unit == "TB":\n return f"{size:.1f} {unit}"\n size /= 1024\n return str(value)\n\n\ndef infer_task(pipeline_tag, library_name, tags, siblings):\n if pipeline_tag:\n return "text-classification" if pipeline_tag == "sentiment-analysis" else pipeline_tag\n if library_name == "diffusers" or "model_index.json" in siblings:\n return "text-to-image"\n if "config.json" in siblings:\n return "text-generation"\n return "unknown"\n\n\ndef recommend_runtime(task: str, library_name: str | None, tags: list[str], siblings: list[str], total_size: int | None):\n risks = []\n evidence = []\n supported_for_phase5 = task in SUPPORTED_TEXT_TASKS\n is_diffusion = task in DIFFUSION_TASKS or library_name == "diffusers" or "model_index.json" in siblings\n gated = any("gated" == t or "gated" in t for t in tags)\n trust_remote = any("trust_remote_code" in t or "custom_code" in t for t in tags)\n\n if gated:\n risks.append("Model appears gated; generated Spaces will need explicit secret/token handling before runtime validation.")\n if trust_remote:\n risks.append("Model may require custom code/trust_remote_code; keep out of V1 auto-build unless manually approved.")\n if total_size is not None:\n evidence.append(f"Estimated repo file size: {human_bytes(total_size)}")\n evidence.append(f"task={task}")\n evidence.append(f"library_name={library_name}")\n\n if is_diffusion:\n if total_size and total_size > 15 * 1024**3:\n return {\n "target_runtime": "manual-review",\n "target_space_hardware": "manual-review",\n "job_flavor": "cpu-basic",\n "confidence": 0.35,\n "supported_by_current_builder": False,\n "reason": "Diffusion/video-like model appears too large for the next ZeroGPU MVP without manual review.",\n "risks": risks + ["Large diffusion/video models may exceed ZeroGPU duration/memory assumptions."],\n "evidence": evidence,\n }\n return {\n "target_runtime": "zerogpu-candidate",\n "target_space_hardware": "zero-a10g / ZeroGPU if available for the user",\n "job_flavor": "cpu-basic",\n "confidence": 0.72,\n "supported_by_current_builder": False,\n "reason": "Diffusers-style model; good candidate for Phase 7 ZeroGPU template, but not for Phase 5 CPU text template.",\n "risks": risks + ["ZeroGPU template is not implemented yet in this version."],\n "evidence": evidence,\n }\n\n if task in SUPPORTED_TEXT_TASKS:\n if total_size is None:\n runtime = "cpu-basic"\n confidence = 0.62\n reason = "Simple text pipeline task; file sizes unavailable, starting with CPU is acceptable for a guarded MVP."\n elif total_size <= 500 * 1024**2:\n runtime = "cpu-basic"\n confidence = 0.82\n reason = "Small text model; CPU Basic should be acceptable for validation demos."\n elif total_size <= 2 * 1024**3:\n runtime = "cpu-upgrade recommended"\n confidence = 0.68\n reason = "Medium text model; CPU Basic may be slow, CPU Upgrade is safer for runtime."\n risks.append("CPU Basic may cold-start or infer slowly.")\n else:\n runtime = "manual-review or GPU/Endpoint"\n confidence = 0.38\n reason = "Large text model; not suitable for automatic CPU Space generation in this MVP."\n risks.append("Model appears too large for the current CPU-only template.")\n supported_for_phase5 = False\n return {\n "target_runtime": runtime,\n "target_space_hardware": runtime,\n "job_flavor": "cpu-basic",\n "confidence": confidence,\n "supported_by_current_builder": supported_for_phase5,\n "reason": reason,\n "risks": risks,\n "evidence": evidence,\n }\n\n return {\n "target_runtime": "unsupported",\n "target_space_hardware": "manual-review",\n "job_flavor": "cpu-basic",\n "confidence": 0.25,\n "supported_by_current_builder": False,\n "reason": "Task/library combination is outside the current safe templates.",\n "risks": risks + ["Unsupported by current builder templates."],\n "evidence": evidence,\n }\n\n\ndef main():\n run_id = os.environ["RUN_ID"]\n hf_username = os.environ.get("HF_USERNAME", "unknown")\n bucket_source = os.environ.get("BUCKET_SOURCE", "unknown")\n output_root = Path(os.environ.get("OUTPUT_ROOT", "/output"))\n model_id = sanitize_model_id(os.environ.get("MODEL_ID", ""))\n token = os.environ.get("HF_TOKEN")\n run_dir = output_root / "runs" / run_id\n events_path = run_dir / "events.jsonl"\n state_path = run_dir / "state.json"\n\n append_event(events_path, "bootstrap", "started", "Runtime recommender worker started", {"model_id": model_id})\n write_json(state_path, {"run_id": run_id, "kind": "runtime_recommender", "status": "running", "message": "Analyzing model compatibility and runtime recommendation", "model_id": model_id, "created_by": hf_username, "bucket_source": bucket_source, "created_at": now(), "updated_at": now()})\n if not token:\n fail(run_dir, events_path, "HF_TOKEN is missing from Job secrets")\n\n try:\n append_event(events_path, "dependencies", "started", "Installing Python worker dependencies")\n code, out = run_cmd([sys.executable, "-m", "pip", "install", "-q", "--upgrade", "huggingface_hub>=1.0.0"], timeout=600)\n if code != 0:\n fail(run_dir, events_path, "Python dependency installation failed", {"output_tail": out[-4000:]})\n append_event(events_path, "dependencies", "success", "Python worker dependencies installed")\n\n from huggingface_hub import HfApi, hf_hub_download\n api = HfApi(token=token)\n whoami = api.whoami(token=token)\n append_event(events_path, "auth", "success", "Authenticated inside Job", {"whoami_name": whoami.get("name")})\n\n append_event(events_path, "model_analysis", "started", "Fetching model metadata", {"model_id": model_id})\n info = api.model_info(model_id, token=token, files_metadata=True)\n siblings_objs = list(info.siblings or [])\n siblings = [getattr(s, "rfilename", "") for s in siblings_objs]\n sizes = [getattr(s, "size", None) for s in siblings_objs]\n total_size = sum(x for x in sizes if isinstance(x, int)) if any(isinstance(x, int) for x in sizes) else None\n pipeline_tag = getattr(info, "pipeline_tag", None)\n library_name = getattr(info, "library_name", None)\n tags = list(getattr(info, "tags", []) or [])\n task = infer_task(pipeline_tag, library_name, tags, siblings)\n readme_excerpt = ""\n try:\n readme_path = hf_hub_download(repo_id=model_id, filename="README.md", token=token)\n readme_excerpt = Path(readme_path).read_text(encoding="utf-8", errors="ignore")[:5000]\n except Exception as exc:\n readme_excerpt = f"README unavailable: {exc}"\n\n recommendation = recommend_runtime(task, library_name, tags, siblings, total_size)\n analysis = {\n "model_id": model_id,\n "pipeline_tag": pipeline_tag,\n "library_name": library_name,\n "tags": tags[:100],\n "siblings": siblings[:160],\n "estimated_total_file_size_bytes": total_size,\n "estimated_total_file_size_human": human_bytes(total_size),\n "selected_task": task,\n "readme_excerpt": readme_excerpt,\n "runtime_recommendation": recommendation,\n }\n write_json(run_dir / "model_analysis.json", analysis)\n write_json(run_dir / "runtime_recommendation.json", recommendation)\n append_event(events_path, "model_analysis", "success", "Model metadata analyzed", {"selected_task": task, "target_runtime": recommendation["target_runtime"], "confidence": recommendation["confidence"], "supported_by_current_builder": recommendation["supported_by_current_builder"]})\n\n status = "success" if recommendation["confidence"] >= 0.35 else "needs_review"\n final_state = {"run_id": run_id, "kind": "runtime_recommender", "status": status, "message": "Runtime recommendation completed", "model_id": model_id, "created_by": hf_username, "bucket_source": bucket_source, "model_analysis": {k: v for k, v in analysis.items() if k != "readme_excerpt"}, "updated_at": now()}\n write_json(state_path, final_state)\n report = f"""# Agentic Space Factory — Runtime Recommendation Report\n\nRun ID: `{run_id}`\n\nStatus: **{status}**\n\nModel: `{model_id}`\n\n## Recommendation\n\n```json\n{json.dumps(recommendation, indent=2, ensure_ascii=False)}\n```\n\n## Model metadata\n\n- Pipeline tag: `{pipeline_tag}`\n- Library: `{library_name}`\n- Selected task: `{task}`\n- Estimated total file size: `{human_bytes(total_size)}`\n\n## Interpretation\n\n- `supported_by_current_builder=true` means Phase 5 can attempt the current simple text-pipeline builder.\n- `zerogpu-candidate` means the model looks like a candidate for the upcoming ZeroGPU/Diffusers template, but should not be sent through the CPU text builder.\n- `manual-review` means the model should not be auto-built without an explicit user decision.\n\n## Next step\n\nUse this recommendation as a gate before launching Phase 5/Phase 7 builders.\n"""\n (run_dir / "report.md").write_text(report, encoding="utf-8")\n append_event(events_path, "report_write", "success", "Wrote report.md")\n append_event(events_path, "done", "success", "Runtime recommendation completed")\n except SystemExit:\n raise\n except Exception as exc:\n fail(run_dir, events_path, "Runtime recommender worker failed", {"error": str(exc)})\n\n\nif __name__ == "__main__":\n main()\n' |
|
|
|
|
| LONGCAT_ARTICLE_WORKER_SCRIPT = r''' |
| import json |
| import os |
| import re |
| import shutil |
| import subprocess |
| import sys |
| import time |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from textwrap import dedent |
| |
| TARGET_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{1,95}/[A-Za-z0-9][A-Za-z0-9._-]{1,95}$") |
| GIST_URL = "https://gist.github.com/gary149/2aba2962375fa9ca56bb9ef53f00b73d" |
| DEFAULT_MODEL_ID = "meituan-longcat/LongCat-Video-Avatar-1.5" |
| |
| |
| def now(): |
| return datetime.now(timezone.utc).isoformat() |
| |
| |
| def write_json(path: Path, payload: dict): |
| path.parent.mkdir(parents=True, exist_ok=True) |
| path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") |
| |
| |
| def append_event(path: Path, step: str, status: str, message: str, data: dict | None = None): |
| path.parent.mkdir(parents=True, exist_ok=True) |
| event = {"ts": now(), "step": step, "status": status, "message": message, "data": data or {}} |
| line = json.dumps(event, ensure_ascii=False) |
| with path.open("a", encoding="utf-8") as f: |
| f.write(line + "\n") |
| print(line, flush=True) |
| |
| |
| def redact_text(text: str | None) -> str: |
| if not text: |
| return "" |
| value = text |
| for secret_name in ["HF_TOKEN", "HUGGING_FACE_HUB_TOKEN"]: |
| secret = os.environ.get(secret_name) |
| if secret: |
| value = value.replace(secret, "[REDACTED]") |
| value = re.sub(r"Bearer\s+[A-Za-z0-9_\-.=]+", "Bearer [REDACTED]", value) |
| value = re.sub(r"hf_[A-Za-z0-9_\-]{10,}", "hf_[REDACTED]", value) |
| return value |
| |
| |
| def safe_details(details: dict | None) -> dict: |
| if not details: |
| return {} |
| try: |
| return json.loads(redact_text(json.dumps(details, ensure_ascii=False))) |
| except Exception: |
| return {"redacted_details": redact_text(str(details))[-4000:]} |
| |
| |
| def fail(run_dir: Path, events_path: Path, message: str, details: dict | None = None, status: str = "failed"): |
| safe = safe_details(details) |
| append_event(events_path, "failure", "failed", message, safe) |
| write_json(run_dir / "state.json", { |
| "run_id": os.environ.get("RUN_ID"), |
| "kind": "longcat_full_inference_gate", |
| "status": status, |
| "message": message, |
| "updated_at": now(), |
| "details": safe, |
| }) |
| report = f"""# Agentic Space Factory — LongCat Article Reproduction Report |
| |
| Status: **{status}** |
| |
| {message} |
| |
| ```json |
| {json.dumps(safe, indent=2, ensure_ascii=False)} |
| ``` |
| """ |
| (run_dir / "report.md").write_text(report, encoding="utf-8") |
| raise SystemExit(1) |
| |
| |
| def run_cmd(cmd: list[str], *, cwd: Path | None = None, env: dict | None = None, timeout: int = 600): |
| result = subprocess.run(cmd, cwd=str(cwd) if cwd else None, env=env, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=timeout) |
| return result.returncode, redact_text(result.stdout) |
| |
| |
| def install_python_deps(events_path: Path): |
| append_event(events_path, "dependencies", "started", "Installing Python worker dependencies") |
| code, out = run_cmd([sys.executable, "-m", "pip", "install", "-q", "--upgrade", "huggingface_hub>=1.0.0", "gradio_client>=2.0.0", "requests>=2.31.0"], timeout=600) |
| if code != 0: |
| append_event(events_path, "dependencies", "failed", "Python dependency installation failed", {"output_tail": out[-4000:]}) |
| raise RuntimeError(out) |
| append_event(events_path, "dependencies", "success", "Python worker dependencies installed") |
| |
| |
| def ensure_node(events_path: Path): |
| node = shutil.which("node") |
| npm = shutil.which("npm") |
| if node and npm: |
| _, node_v = run_cmd([node, "--version"], timeout=30) |
| _, npm_v = run_cmd([npm, "--version"], timeout=30) |
| append_event(events_path, "node", "success", "Node/npm already available", {"node": node_v.strip(), "npm": npm_v.strip()}) |
| return |
| append_event(events_path, "node", "started", "Installing nodejs/npm through apt-get") |
| code, out = run_cmd(["bash", "-lc", "apt-get update -qq && apt-get install -y -qq nodejs npm"], timeout=600) |
| if code != 0: |
| append_event(events_path, "node", "failed", "Could not install nodejs/npm", {"output_tail": out[-4000:]}) |
| raise RuntimeError(out) |
| append_event(events_path, "node", "success", "Installed nodejs/npm") |
| |
| |
| def install_pi(events_path: Path): |
| ensure_node(events_path) |
| append_event(events_path, "pi_install", "started", "Installing Pi coding agent from npm") |
| code, out = run_cmd(["npm", "install", "-g", "@mariozechner/pi-coding-agent"], timeout=900) |
| if code != 0: |
| append_event(events_path, "pi_install", "failed", "Pi npm installation failed", {"output_tail": out[-4000:]}) |
| raise RuntimeError(out) |
| code, version = run_cmd(["pi", "--version"], timeout=60) |
| append_event(events_path, "pi_install", "success", "Pi installed", {"version_output": version.strip()[-300:]}) |
| |
| |
| def configure_pi(events_path: Path, model: str): |
| pi_dir = Path.home() / ".pi" / "agent" |
| pi_dir.mkdir(parents=True, exist_ok=True) |
| (pi_dir / "auth.json").write_text(json.dumps({"huggingface": {"type": "api_key", "key": os.environ.get("HF_TOKEN", "")}}, indent=2), encoding="utf-8") |
| (pi_dir / "settings.json").write_text(json.dumps({"model": model, "provider": "huggingface", "autoRun": True, "autoApply": True}, indent=2), encoding="utf-8") |
| append_event(events_path, "pi_config", "success", "Configured Pi", {"model": model}) |
| |
| |
| def collect_pi_traces(run_dir: Path, events_path: Path): |
| traces_dir = Path.home() / ".pi" / "agent" / "sessions" |
| raw_dir = run_dir / "traces" / "raw" |
| redacted_dir = run_dir / "traces" / "redacted" |
| raw_dir.mkdir(parents=True, exist_ok=True) |
| redacted_dir.mkdir(parents=True, exist_ok=True) |
| count = 0 |
| if traces_dir.exists(): |
| for path in traces_dir.rglob("*.jsonl"): |
| rel = path.relative_to(traces_dir) |
| target_raw = raw_dir / rel |
| target_raw.parent.mkdir(parents=True, exist_ok=True) |
| text = path.read_text(encoding="utf-8", errors="ignore") |
| target_raw.write_text(text, encoding="utf-8") |
| target_redacted = redacted_dir / rel |
| target_redacted.parent.mkdir(parents=True, exist_ok=True) |
| target_redacted.write_text(redact_text(text), encoding="utf-8") |
| count += 1 |
| append_event(events_path, "traces", "success", "Collected Pi traces", {"count": count}) |
| return count |
| |
| |
| def sanitize_model_id(model_id: str) -> str: |
| model_id = (model_id or DEFAULT_MODEL_ID).strip().replace("https://huggingface.co/", "") |
| model_id = model_id.split("?", 1)[0].strip("/") |
| if not re.match(r"^[A-Za-z0-9_.-]+/[A-Za-z0-9_.-]+$", model_id): |
| raise ValueError("MODEL_ID must look like owner/model-name") |
| return model_id |
| |
| |
| def make_gradio_client(target_space_id: str, token: str): |
| import inspect |
| from gradio_client import Client |
| params = inspect.signature(Client).parameters |
| if "token" in params: |
| return Client(target_space_id, token=token) |
| if "hf_token" in params: |
| return Client(target_space_id, hf_token=token) |
| if "api_key" in params: |
| return Client(target_space_id, api_key=token) |
| if "headers" in params: |
| return Client(target_space_id, headers={"Authorization": f"Bearer {token}"}) |
| return Client(target_space_id) |
| |
| |
| def api_names_from_schema(schema) -> list[str]: |
| names: list[str] = [] |
| if isinstance(schema, dict): |
| endpoints = schema.get("named_endpoints") or schema.get("endpoints") or {} |
| if isinstance(endpoints, dict): |
| for key, value in endpoints.items(): |
| if isinstance(key, str) and key.startswith("/"): |
| names.append(key) |
| if isinstance(value, dict): |
| api_name = value.get("api_name") |
| if isinstance(api_name, str) and api_name.startswith("/"): |
| names.append(api_name) |
| if isinstance(schema.get("dependencies"), list): |
| for dep in schema["dependencies"]: |
| if isinstance(dep, dict): |
| api_name = dep.get("api_name") |
| if isinstance(api_name, str): |
| names.append(api_name if api_name.startswith("/") else f"/{api_name}") |
| return list(dict.fromkeys(names)) |
| |
| |
| def space_subdomain_url(target_space_id: str) -> str: |
| owner, name = target_space_id.split("/", 1) |
| # This matches the common Spaces app URL pattern. Keep conservative: our |
| # generated slugs are ASCII and hyphen-friendly. |
| return f"https://{owner}-{name}.hf.space".replace("_", "-").lower() |
| |
| |
| def runtime_to_dict(runtime) -> dict: |
| payload = {} |
| for attr in ["stage", "hardware", "requested_hardware", "sleep_time", "storage", "gc_timeout"]: |
| value = getattr(runtime, attr, None) |
| payload[attr] = getattr(value, "value", value) |
| return {k: str(v) if v is not None else None for k, v in payload.items()} |
| |
| |
| def write_space_runtime(api, target_space_id: str, token: str, run_dir: Path, events_path: Path, attempt: int | None = None) -> dict: |
| try: |
| runtime = api.get_space_runtime(repo_id=target_space_id, token=token) |
| payload = runtime_to_dict(runtime) |
| payload["attempt"] = attempt |
| write_json(run_dir / "space_runtime.json", payload) |
| return payload |
| except Exception as exc: |
| payload = {"error": str(exc)[:2000], "attempt": attempt} |
| write_json(run_dir / "space_runtime.json", payload) |
| append_event(events_path, "space_runtime", "warning", "Could not fetch Space runtime", payload) |
| return payload |
| |
| |
| def collect_space_logs(target_space_id: str, token: str, run_dir: Path, events_path: Path): |
| logs_dir = run_dir / "logs" |
| logs_dir.mkdir(parents=True, exist_ok=True) |
| env = os.environ.copy() |
| env["HF_TOKEN"] = token |
| commands = { |
| "space_logs_runtime.txt": ["hf", "spaces", "logs", target_space_id], |
| "space_logs_build.txt": ["hf", "spaces", "logs", target_space_id, "--build"], |
| } |
| written = [] |
| for filename, cmd in commands.items(): |
| try: |
| code, out = run_cmd(cmd, env=env, timeout=75) |
| (logs_dir / filename).write_text(out, encoding="utf-8") |
| written.append({"file": filename, "returncode": code, "tail": out[-1000:]}) |
| except Exception as exc: |
| written.append({"file": filename, "error": str(exc)[:1000]}) |
| append_event(events_path, "space_logs", "success", "Collected best-effort Space logs", {"files": written}) |
| return written |
| |
| |
| def validate_http_health(target_space_id: str, token: str, run_dir: Path, events_path: Path, attempt: int): |
| import requests |
| base_url = space_subdomain_url(target_space_id) |
| url = base_url.rstrip("/") + "/health" |
| headers = {"Authorization": f"Bearer {token}", "Accept": "application/json,text/plain,*/*"} |
| response = requests.get(url, headers=headers, timeout=20) |
| payload = { |
| "status": "success" if response.ok else "failed", |
| "attempt": attempt, |
| "url": url, |
| "status_code": response.status_code, |
| "content_type": response.headers.get("content-type"), |
| "text": response.text[:2000], |
| } |
| if response.ok: |
| try: |
| payload["json"] = response.json() |
| except Exception: |
| pass |
| write_json(run_dir / "tests" / "http_health.json", payload) |
| write_json(run_dir / "tests" / "test_result.json", payload | {"validator": "http_get_health"}) |
| append_event(events_path, "api_validation", "success", "HTTP /health validation passed", {"attempt": attempt, "url": url, "status_code": response.status_code}) |
| return payload | {"validator": "http_get_health"} |
| raise RuntimeError(f"HTTP /health returned {response.status_code}: {response.text[:500]}") |
| |
| |
| def validate_gradio_api(target_space_id: str, token: str, run_dir: Path, events_path: Path, attempt: int): |
| client = make_gradio_client(target_space_id, token) |
| schema = client.view_api(return_format="dict") |
| write_json(run_dir / "tests" / "api_schema.json", schema if isinstance(schema, dict) else {"schema": str(schema)}) |
| discovered = api_names_from_schema(schema) |
| candidates = [] |
| for name in ["/health", "/predict", "/greet"] + discovered: |
| if name not in candidates: |
| candidates.append(name) |
| errors = [] |
| for api_name in candidates: |
| try: |
| if api_name == "/greet": |
| result = client.predict("Agentic Space Factory", api_name=api_name) |
| else: |
| result = client.predict(api_name=api_name) |
| payload = {"status": "success", "attempt": attempt, "api_name": api_name, "discovered_api_names": discovered, "result_repr": repr(result)[:2000], "validator": "gradio_client"} |
| write_json(run_dir / "tests" / "test_result.json", payload) |
| append_event(events_path, "api_validation", "success", "Gradio API validation passed", {"attempt": attempt, "api_name": api_name, "discovered_api_names": discovered}) |
| return payload |
| except Exception as exc: |
| errors.append({"api_name": api_name, "error": str(exc)[:1000]}) |
| raise RuntimeError("; ".join(f"{e['api_name']}: {e['error']}" for e in errors[:5]) or "No callable API endpoints found") |
| |
| |
| def validate_live_api(api, target_space_id: str, token: str, run_dir: Path, events_path: Path, timeout_s: int = 900): |
| append_event(events_path, "api_validation", "started", "Waiting for live HTTP /health or Gradio API to become available") |
| deadline = time.time() + timeout_s |
| attempt = 0 |
| last_error = None |
| runtime_error_count = 0 |
| while time.time() < deadline: |
| attempt += 1 |
| runtime_payload = write_space_runtime(api, target_space_id, token, run_dir, events_path, attempt) |
| stage = str(runtime_payload.get("stage") or "").upper() |
| if "RUNTIME_ERROR" in stage: |
| runtime_error_count += 1 |
| collect_space_logs(target_space_id, token, run_dir, events_path) |
| last_error = f"Space runtime stage is {stage}" |
| if runtime_error_count >= 2: |
| raise RuntimeError(f"Space is in RUNTIME_ERROR. See logs/space_logs_runtime.txt and logs/space_logs_build.txt. Last runtime: {runtime_payload}") |
| try: |
| return validate_http_health(target_space_id, token, run_dir, events_path, attempt) |
| except Exception as exc: |
| last_error = f"HTTP /health failed: {exc}" |
| try: |
| return validate_gradio_api(target_space_id, token, run_dir, events_path, attempt) |
| except Exception as exc: |
| last_error = (last_error or "") + f"; Gradio API failed: {exc}" |
| append_event(events_path, "api_validation", "waiting", "Live health/API not ready yet", {"attempt": attempt, "runtime": runtime_payload, "error": last_error[-1500:] if last_error else None}) |
| time.sleep(30) |
| collect_space_logs(target_space_id, token, run_dir, events_path) |
| raise RuntimeError(f"Live health/API validation did not pass before timeout: {last_error}") |
| |
| |
| def request_hardware(api, target_space_id: str, hardware: str, token: str, events_path: Path, step: str, retries: int = 4): |
| """Best-effort hardware request. |
| |
| OAuth tokens can create/write Spaces but may still be unable to trigger |
| hardware changes, especially paid GPU upgrades. Treat 401/auth/billing |
| failures as manual-action-required instead of burning retries. |
| """ |
| if not hardware: |
| return {"requested": False, "hardware": hardware, "ok": False, "error": "empty hardware"} |
| last_error = None |
| for attempt in range(1, retries + 1): |
| try: |
| runtime = api.request_space_hardware(repo_id=target_space_id, hardware=hardware, token=token) |
| payload = { |
| "requested": True, |
| "hardware": hardware, |
| "ok": True, |
| "attempt": attempt, |
| "runtime_stage": getattr(getattr(runtime, "stage", None), "value", str(getattr(runtime, "stage", None))), |
| "requested_hardware": getattr(runtime, "requested_hardware", None), |
| "hardware_current": getattr(runtime, "hardware", None), |
| } |
| append_event(events_path, step, "success", f"Requested Space hardware {hardware}", payload) |
| return payload |
| except Exception as exc: |
| last_error = str(exc)[:2000] |
| auth_like = any(marker in last_error for marker in ["401", "Invalid username or password", "Unauthorized", "Repository Not Found"]) |
| payload = {"attempt": attempt, "hardware": hardware, "error": last_error, "manual_action_required": auth_like} |
| append_event(events_path, step, "failed" if auth_like or attempt == retries else "waiting", f"Could not request Space hardware {hardware}", payload) |
| if auth_like: |
| return {"requested": True, "hardware": hardware, "ok": False, "attempts": attempt, "error": last_error, "manual_action_required": True} |
| if attempt < retries: |
| time.sleep(8 * attempt) |
| return {"requested": True, "hardware": hardware, "ok": False, "attempts": retries, "error": last_error, "manual_action_required": False} |
| |
| |
| def create_initial_workspace(workspace: Path, model_id: str, target_space_id: str, preferred_hardware: str, fallback_hardware: str, allow_fallback: bool, implementation_mode: str): |
| workspace.mkdir(parents=True, exist_ok=True) |
| app_py = f"""import gradio as gr |
| |
| MODEL_ID = "{model_id}" |
| TARGET_SPACE_ID = "{target_space_id}" |
| |
| |
| def health(): |
| return {{ |
| "status": "booted", |
| "model_id": MODEL_ID, |
| "note": "Pi should replace this scaffold with a LongCat demo while preserving /health." |
| }} |
| |
| |
| def placeholder(): |
| return "LongCat demo scaffold booted. The full model pipeline was not wired yet." |
| |
| with gr.Blocks(title="LongCat Video Avatar — Agentic Space Factory") as demo: |
| gr.Markdown("# LongCat Video Avatar — Agentic Space Factory") |
| gr.Markdown("This private Space was generated by the Phase 7 article reproduction worker.") |
| gr.JSON(label="Health", value=health(), every=None) |
| gr.Button("Health check").click(fn=health, inputs=None, outputs=gr.JSON(), api_name="health") |
| gr.Button("Placeholder").click(fn=placeholder, inputs=None, outputs=gr.Textbox(), api_name="predict") |
| |
| if __name__ == "__main__": |
| demo.launch() |
| """ |
| (workspace / "app.py").write_text(app_py, encoding="utf-8") |
| req = """gradio>=5.0.0 |
| huggingface_hub>=0.34.0,<1.0.0 |
| spaces |
| transformers>=4.45.0 |
| diffusers |
| accelerate |
| safetensors |
| torch |
| kernels |
| opencv-python-headless |
| pillow |
| numpy |
| """ |
| (workspace / "requirements.txt").write_text(req, encoding="utf-8") |
| readme = f"""--- |
| title: LongCat Video Avatar Agentic Factory |
| sdk: gradio |
| app_file: app.py |
| python_version: "3.10" |
| suggested_hardware: {preferred_hardware or fallback_hardware or "zero-a10g"} |
| --- |
| |
| # LongCat Video Avatar — Agentic Space Factory |
| |
| Private generated Space for `{model_id}`. |
| |
| This Space is created by Phase 9. It should remain private until manually reviewed. |
| """ |
| (workspace / "README.md").write_text(readme, encoding="utf-8") |
| goal = f"""You are Pi running inside a Hugging Face Job for Agentic Space Factory Phase 9. |
| |
| Goal: attempt to reproduce the workflow from the Hugging Face article by building a private Space demo for: |
| |
| MODEL_ID: {model_id} |
| TARGET_SPACE_ID: {target_space_id} |
| IMPLEMENTATION_MODE: {implementation_mode} |
| |
| First read and follow the operational rules from this gist: |
| {GIST_URL} |
| |
| Important safety and product constraints: |
| - The target Space must remain private. |
| - Do not delete any user resources. |
| - Do not print secrets or tokens. |
| - Work only inside the current workspace. |
| - The wrapper will create the private Space, request hardware, upload files, and validate the live API. Do not create/delete repos yourself in this Phase 9 worker. |
| - You should edit app.py, requirements.txt, and README.md to make the closest possible real LongCat demo for the model card. |
| - Preserve a cheap /health endpoint that does not run full video generation. The wrapper uses it for boot validation. |
| - If IMPLEMENTATION_MODE is full-inference-gated or full-inference-attempt, you are NOT allowed to silently replace generation with a docs-only placeholder and call it success. |
| - In full-inference mode, either wire a real inference path as far as possible, or produce TECHNICAL_BLOCKERS.json with concrete evidence for every blocker. |
| - Investigate flash-attn alternatives before declaring it blocking: PyTorch SDPA, xformers, and HF Kernels (kernels-community/flash-attn2, kernels-community/flash-attn3, kernels-community/flash-attn4, kernels-community/vllm-flash-attn3). If they cannot replace the required API, cite the exact missing function/import. |
| - Verify whether 2-GPU context parallelism is strictly required or merely recommended. If a single-GPU low-res/low-step smoke path is possible, implement it. If not, cite the model-card command or source file that proves multi-GPU is mandatory. |
| - Prefer ZeroGPU-compatible code when possible, but allow fixed GPU runtime. Preferred hardware: {preferred_hardware}. Fallback hardware enabled: {allow_fallback}. Fallback hardware: {fallback_hardware}. |
| - If you use @spaces.GPU, decorate only the inference/generation function. Do not decorate /health. |
| - Keep the huggingface_hub pin in requirements.txt: huggingface_hub>=0.34.0,<1.0.0. |
| |
| Deliverables: |
| - app.py must boot on Hugging Face Spaces. |
| - app.py must expose /health. |
| - If real generation is implemented, generate() must attempt to return a video/file output, not only a textual diagnostic. |
| - If real generation is not implemented, write TECHNICAL_BLOCKERS.json with: full_inference_implemented=false, blockers[], evidence[], minimum_runtime, and suggested_next_step. |
| - README.md must explain the runtime strategy and limitations. |
| - If README.md frontmatter uses short_description, it must be 60 characters or fewer. |
| - Write a concise PI_SUMMARY.md with what you changed and whether full generation is implemented. |
| """ |
| (workspace / "GOAL.md").write_text(goal, encoding="utf-8") |
| return ["app.py", "requirements.txt", "README.md", "GOAL.md"] |
| |
| |
| def sanitize_readme_metadata(workspace: Path, events_path: Path): |
| readme_path = workspace / "README.md" |
| if not readme_path.exists(): |
| return |
| text = readme_path.read_text(encoding="utf-8", errors="ignore") |
| if not text.startswith("---"): |
| return |
| parts = text.split("---", 2) |
| if len(parts) < 3: |
| return |
| _, frontmatter, body = parts |
| changed = False |
| sanitized_lines = [] |
| for line in frontmatter.splitlines(): |
| if line.strip().startswith("short_description:"): |
| value = "LongCat video avatar demo" |
| sanitized_lines.append(f"short_description: {value}") |
| changed = True |
| else: |
| sanitized_lines.append(line) |
| # If Pi added other unexpectedly long one-line metadata values, leave them alone: |
| # the known Hub validation blocker for this run was short_description > 60 chars. |
| if changed: |
| new_text = "---\n" + "\n".join(sanitized_lines).strip() + "\n---" + body |
| readme_path.write_text(new_text, encoding="utf-8") |
| append_event(events_path, "metadata_sanitize", "success", "Sanitized README metadata", {"short_description": "LongCat video avatar demo"}) |
| |
| |
| def upload_workspace(api, workspace: Path, target_space_id: str, token: str, run_dir: Path, events_path: Path): |
| sanitize_readme_metadata(workspace, events_path) |
| append_event(events_path, "upload_files", "started", "Uploading generated LongCat workspace recursively") |
| gen_dir = run_dir / "generated" |
| if gen_dir.exists(): |
| shutil.rmtree(gen_dir) |
| shutil.copytree(workspace, gen_dir, ignore=shutil.ignore_patterns(".git", "node_modules", "__pycache__", "*.pyc")) |
| for filename in ["app.py", "README.md", "requirements.txt"]: |
| if not (workspace / filename).exists(): |
| raise RuntimeError(f"Missing required generated file: {filename}") |
| api.upload_folder( |
| folder_path=str(workspace), |
| repo_id=target_space_id, |
| repo_type="space", |
| token=token, |
| ignore_patterns=[".git/*", "node_modules/*", "__pycache__/*", "*.pyc", "GOAL.md"], |
| ) |
| uploaded_files = sorted(str(p.relative_to(workspace)) for p in workspace.rglob("*") if p.is_file() and "node_modules" not in p.parts and "__pycache__" not in p.parts) |
| append_event(events_path, "upload_files", "success", "Uploaded generated workspace folder", {"file_count": len(uploaded_files), "files_sample": uploaded_files[:50]}) |
| |
| |
| def load_json_if_exists(path: Path) -> dict: |
| if not path.exists(): |
| return {} |
| try: |
| return json.loads(path.read_text(encoding="utf-8", errors="replace")) |
| except Exception as exc: |
| return {"parse_error": str(exc), "raw_tail": path.read_text(encoding="utf-8", errors="replace")[-2000:]} |
| |
| |
| def infer_generation_gate(workspace: Path, implementation_mode: str, validation: dict, run_dir: Path, events_path: Path) -> dict: |
| """Classify the run separately from process success. |
| |
| /health passing means the Space boots. It does not mean the generated Space |
| performs LongCat inference. In full-inference-gated mode we require either |
| an actual implementation signal or a machine-readable blocker report. |
| """ |
| app_text = (workspace / "app.py").read_text(encoding="utf-8", errors="ignore") if (workspace / "app.py").exists() else "" |
| summary_text = (workspace / "PI_SUMMARY.md").read_text(encoding="utf-8", errors="ignore") if (workspace / "PI_SUMMARY.md").exists() else "" |
| req_text = (workspace / "requirements.txt").read_text(encoding="utf-8", errors="ignore") if (workspace / "requirements.txt").exists() else "" |
| blockers_path = workspace / "TECHNICAL_BLOCKERS.json" |
| blockers = load_json_if_exists(blockers_path) |
| |
| combined = (app_text + "\n" + summary_text).lower() |
| blocked_markers = [ |
| "full generation is not implemented", |
| "full generation is intentionally not wired", |
| "full inference is blocked", |
| "returns a detailed diagnostic", |
| "diagnostic report instead", |
| "placeholder generator", |
| "placeholder generation", |
| "info-only", |
| "not implemented", |
| "cannot run in this environment", |
| "out of scope", |
| ] |
| blocker_detected = bool(blockers) or any(m in combined for m in blocked_markers) |
| implementation_signals = { |
| "has_spaces_gpu": "@spaces.GPU" in app_text, |
| "has_torch": "torch" in req_text or "import torch" in app_text, |
| "has_diffusers": "diffusers" in req_text or "diffusers" in app_text, |
| "has_video_output_hint": any(x in app_text.lower() for x in ["gr.video", "video", ".mp4", "ffmpeg"]), |
| "health_passed": validation.get("method") in {"http_health", "gradio"}, |
| } |
| |
| if blocker_detected: |
| status = "technical_blocker" |
| message = "Space boots, but full LongCat inference was not implemented. See TECHNICAL_BLOCKERS.json / PI_SUMMARY.md." |
| elif implementation_mode in {"full-inference-gated", "full-inference-attempt"}: |
| # Without a video smoke test, do not claim real inference success. |
| status = "full_inference_candidate_health_passed" |
| message = "Space boots and contains inference signals, but no generation smoke test has validated a real video output." |
| else: |
| status = "health_only" |
| message = "Safe scaffold health validation passed. Full inference was not requested." |
| |
| if blocker_detected and not blockers: |
| blockers = { |
| "full_inference_implemented": False, |
| "source": "worker_heuristic_from_PI_SUMMARY_or_app.py", |
| "blockers": [ |
| { |
| "type": "agent_declared_or_detected_blocker", |
| "claim": "Pi-generated artifacts state that full inference is blocked/not implemented or generation returns diagnostics/placeholders.", |
| "evidence": "See PI_SUMMARY.md and app.py in generated artifacts.", |
| "severity": "blocking", |
| } |
| ], |
| "required_investigations_for_next_run": [ |
| "Check whether PyTorch SDPA can replace flash-attn calls.", |
| "Check whether HF Kernels flash-attn2/3/4 can replace required flash-attn APIs.", |
| "Verify whether 2-GPU context parallelism is strictly required or can be reduced to a single-GPU smoke test.", |
| ], |
| } |
| (workspace / "TECHNICAL_BLOCKERS.json").write_text(json.dumps(blockers, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") |
| (run_dir / "generated" / "TECHNICAL_BLOCKERS.json").write_text(json.dumps(blockers, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") |
| |
| gate = { |
| "status": status, |
| "message": message, |
| "implementation_mode": implementation_mode, |
| "blocker_detected": blocker_detected, |
| "implementation_signals": implementation_signals, |
| "validation_method": validation.get("method"), |
| "blockers": blockers, |
| } |
| write_json(run_dir / "inference_gate.json", gate) |
| append_event(events_path, "inference_gate", status, message, gate) |
| return gate |
| |
| |
| def main(): |
| run_id = os.environ["RUN_ID"] |
| hf_username = os.environ.get("HF_USERNAME", "unknown") |
| bucket_source = os.environ.get("BUCKET_SOURCE", "unknown") |
| output_root = Path(os.environ.get("OUTPUT_ROOT", "/output")) |
| target_space_id = os.environ.get("TARGET_SPACE_ID", "") |
| model_id = sanitize_model_id(os.environ.get("MODEL_ID", DEFAULT_MODEL_ID)) |
| pi_model = os.environ.get("PI_MODEL", "Qwen/Qwen3-Coder-Next") |
| preferred_hardware = os.environ.get("PREFERRED_SPACE_HARDWARE", "zero-a10g") |
| fallback_hardware = os.environ.get("FALLBACK_SPACE_HARDWARE", "l40sx1") |
| allow_fixed_gpu_fallback = os.environ.get("ALLOW_FIXED_GPU_FALLBACK", "true").lower() in {"1", "true", "yes", "on"} |
| implementation_mode = os.environ.get("IMPLEMENTATION_MODE", "full-inference-attempt") |
| token = os.environ.get("HF_TOKEN") |
| |
| run_dir = output_root / "runs" / run_id |
| events_path = run_dir / "events.jsonl" |
| state_path = run_dir / "state.json" |
| workspace = Path("/tmp/longcat_workspace") |
| |
| append_event(events_path, "bootstrap", "started", "LongCat full-inference gate worker started", {"model_id": model_id, "target_space_id": target_space_id}) |
| write_json(state_path, {"run_id": run_id, "kind": "longcat_full_inference_gate", "status": "running", "message": "Attempting LongCat full-inference gated Space creation", "model_id": model_id, "target_space": target_space_id, "created_by": hf_username, "bucket_source": bucket_source, "created_at": now(), "updated_at": now()}) |
| if not token: |
| fail(run_dir, events_path, "HF_TOKEN is missing from Job secrets") |
| if not TARGET_RE.match(target_space_id): |
| fail(run_dir, events_path, "Invalid TARGET_SPACE_ID", {"target_space_id": target_space_id}) |
| |
| try: |
| install_python_deps(events_path) |
| from huggingface_hub import HfApi |
| api = HfApi(token=token) |
| whoami = api.whoami(token=token) |
| append_event(events_path, "auth", "success", "Authenticated inside Job", {"whoami_name": whoami.get("name")}) |
| |
| append_event(events_path, "model_analysis", "started", "Fetching LongCat model metadata", {"model_id": model_id}) |
| info = api.model_info(model_id, token=token, files_metadata=True) |
| siblings = [getattr(s, "rfilename", "") for s in (info.siblings or [])] |
| analysis = {"model_id": model_id, "pipeline_tag": getattr(info, "pipeline_tag", None), "library_name": getattr(info, "library_name", None), "tags": list(getattr(info, "tags", []) or [])[:100], "siblings": siblings[:160], "article_target": model_id == DEFAULT_MODEL_ID, "preferred_hardware": preferred_hardware, "fallback_hardware": fallback_hardware, "allow_fixed_gpu_fallback": allow_fixed_gpu_fallback, "implementation_mode": implementation_mode} |
| write_json(run_dir / "model_analysis.json", analysis) |
| append_event(events_path, "model_analysis", "success", "Model metadata fetched", {"pipeline_tag": analysis["pipeline_tag"], "library_name": analysis["library_name"]}) |
| |
| create_initial_workspace(workspace, model_id, target_space_id, preferred_hardware, fallback_hardware, allow_fixed_gpu_fallback, implementation_mode) |
| append_event(events_path, "workspace", "success", "Prepared LongCat workspace", {"files": sorted(p.name for p in workspace.iterdir())}) |
| |
| install_pi(events_path) |
| configure_pi(events_path, pi_model) |
| append_event(events_path, "pi_run", "started", "Running Pi on LongCat workspace", {"model": pi_model}) |
| code, pi_out = run_cmd(["pi", "-p", (workspace / "GOAL.md").read_text(encoding="utf-8")], cwd=workspace, timeout=2400) |
| (run_dir / "logs").mkdir(parents=True, exist_ok=True) |
| (run_dir / "logs" / "pi_output.txt").write_text(pi_out, encoding="utf-8") |
| if code != 0: |
| append_event(events_path, "pi_run", "failed", "Pi returned a non-zero exit code", {"returncode": code, "output_tail": pi_out[-4000:]}) |
| collect_pi_traces(run_dir, events_path) |
| fail(run_dir, events_path, "Pi failed before Space upload", {"returncode": code, "output_tail": pi_out[-4000:]}) |
| append_event(events_path, "pi_run", "success", "Pi completed LongCat workspace pass", {"output_tail": pi_out[-2000:]}) |
| if not (workspace / "PI_SUMMARY.md").exists(): |
| (workspace / "PI_SUMMARY.md").write_text("# Pi Summary\n\nPi did not create a PI_SUMMARY.md. See logs/pi_output.txt.\n", encoding="utf-8") |
| |
| app_text = (workspace / "app.py").read_text(encoding="utf-8", errors="ignore") |
| if "/health" not in app_text and "api_name=\"health\"" not in app_text and "api_name='health'" not in app_text: |
| append_event(events_path, "pi_verification", "failed", "app.py does not appear to expose /health; injecting safe health endpoint is not implemented") |
| fail(run_dir, events_path, "Pi output did not preserve a /health endpoint") |
| append_event(events_path, "pi_verification", "success", "Pi output preserved health validation endpoint") |
| |
| append_event(events_path, "create_space", "started", "Creating private LongCat target Space", {"target_space": target_space_id}) |
| api.create_repo(repo_id=target_space_id, repo_type="space", space_sdk="gradio", private=True, exist_ok=False, token=token) |
| append_event(events_path, "create_space", "success", "Private target Space created", {"target_space": target_space_id}) |
| |
| # Upload before requesting hardware. Newly created private Spaces may not be |
| # immediately available on the hardware endpoint; uploading first also ensures |
| # the repo has valid Space metadata before any restart is triggered. |
| upload_workspace(api, workspace, target_space_id, token, run_dir, events_path) |
| |
| hardware_attempts = [] |
| preferred = request_hardware(api, target_space_id, preferred_hardware, token, events_path, "hardware_preferred") |
| hardware_attempts.append(preferred) |
| selected_hardware = preferred_hardware if preferred.get("ok") else None |
| if not selected_hardware and allow_fixed_gpu_fallback and fallback_hardware: |
| fallback = request_hardware(api, target_space_id, fallback_hardware, token, events_path, "hardware_fallback") |
| hardware_attempts.append(fallback) |
| selected_hardware = fallback_hardware if fallback.get("ok") else None |
| if not selected_hardware: |
| append_event(events_path, "hardware", "warning", "Could not request preferred/fallback hardware; Space may remain on default CPU", {"attempts": hardware_attempts}) |
| selected_hardware = "default-cpu-or-existing" |
| write_json(run_dir / "hardware_attempts.json", {"selected_hardware": selected_hardware, "attempts": hardware_attempts}) |
| |
| validation = validate_live_api(api, target_space_id, token, run_dir, events_path, timeout_s=1200) |
| inference_gate = infer_generation_gate(workspace, implementation_mode, validation, run_dir, events_path) |
| collect_pi_traces(run_dir, events_path) |
| |
| final_state = { |
| "run_id": run_id, |
| "kind": "longcat_full_inference_gate", |
| "status": inference_gate["status"], |
| "message": inference_gate["message"], |
| "model_id": model_id, |
| "target_space": target_space_id, |
| "target_space_url": f"https://huggingface.co/spaces/{target_space_id}", |
| "selected_hardware": selected_hardware, |
| "hardware_attempts": hardware_attempts, |
| "validation": validation, |
| "inference_gate": inference_gate, |
| "updated_at": now(), |
| "created_by": hf_username, |
| "bucket_source": bucket_source, |
| } |
| write_json(state_path, final_state) |
| report = f"""# Agentic Space Factory — LongCat Full-Inference Gate Report |
| |
| Run ID: `{run_id}` |
| |
| Status: **{inference_gate['status']}** |
| |
| {inference_gate['message']} |
| |
| Target Space: https://huggingface.co/spaces/{target_space_id} |
| |
| Model: `{model_id}` |
| |
| ## Hardware |
| |
| Selected/requested hardware: `{selected_hardware}` |
| |
| Hardware changes are best-effort with OAuth. If requests fail with 401/auth/billing errors, set the Space hardware manually and rerun validation. |
| |
| ```json |
| {json.dumps(hardware_attempts, indent=2, ensure_ascii=False)} |
| ``` |
| |
| ## Health validation |
| |
| The wrapper validated the live Space using HTTP `/health` first, with Gradio Client as fallback. This only proves bootability. |
| |
| ```json |
| {json.dumps(validation, indent=2, ensure_ascii=False)} |
| ``` |
| |
| ## Full-inference gate |
| |
| ```json |
| {json.dumps(inference_gate, indent=2, ensure_ascii=False)} |
| ``` |
| |
| ## Pi summary |
| |
| {(workspace / 'PI_SUMMARY.md').read_text(encoding='utf-8', errors='ignore') if (workspace / 'PI_SUMMARY.md').exists() else 'No PI_SUMMARY.md was produced.'} |
| |
| ## Safety |
| |
| - The target Space was created private. |
| - No public publication was attempted. |
| - Raw traces should remain private; redacted traces are stored separately. |
| - If fallback fixed GPU was used or selected manually, review billing/hardware settings manually after the run. |
| """ |
| (run_dir / "report.md").write_text(report, encoding="utf-8") |
| append_event(events_path, "report_write", "success", "Wrote report.md") |
| append_event(events_path, "done", inference_gate["status"], "LongCat full-inference gate completed", {"target_space": target_space_id, "selected_hardware": selected_hardware, "gate_status": inference_gate["status"]}) |
| except SystemExit: |
| raise |
| except Exception as exc: |
| try: |
| collect_pi_traces(run_dir, events_path) |
| except Exception: |
| pass |
| fail(run_dir, events_path, "LongCat full-inference gate worker failed", {"error": str(exc)}) |
| |
| |
| if __name__ == "__main__": |
| main() |
| ''' |
|
|
|
|
| UNIVERSAL_MODEL_CARD_WORKER_SCRIPT = r''' |
| |
| import json |
| import os |
| import re |
| import shutil |
| import subprocess |
| import sys |
| import time |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from textwrap import dedent |
| |
| TARGET_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{1,95}/[A-Za-z0-9][A-Za-z0-9._-]{1,95}$") |
| GIST_URL = "https://gist.github.com/gary149/2aba2962375fa9ca56bb9ef53f00b73d" |
| DEFAULT_MODEL_ID = "sshleifer/tiny-gpt2" |
| |
| |
| def now(): |
| return datetime.now(timezone.utc).isoformat() |
| |
| |
| def write_json(path: Path, payload: dict): |
| path.parent.mkdir(parents=True, exist_ok=True) |
| path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") |
| |
| |
| def append_event(path: Path, step: str, status: str, message: str, data: dict | None = None): |
| path.parent.mkdir(parents=True, exist_ok=True) |
| event = {"ts": now(), "step": step, "status": status, "message": message, "data": data or {}} |
| line = json.dumps(event, ensure_ascii=False) |
| with path.open("a", encoding="utf-8") as f: |
| f.write(line + "\n") |
| print(line, flush=True) |
| |
| |
| def redact_text(text: str | None) -> str: |
| if not text: |
| return "" |
| value = text |
| for secret_name in ["HF_TOKEN", "HUGGING_FACE_HUB_TOKEN"]: |
| secret = os.environ.get(secret_name) |
| if secret: |
| value = value.replace(secret, "[REDACTED]") |
| value = re.sub(r"Bearer\s+[A-Za-z0-9_\-.=]+", "Bearer [REDACTED]", value) |
| value = re.sub(r"hf_[A-Za-z0-9_\-]{10,}", "hf_[REDACTED]", value) |
| return value |
| |
| |
| def safe_details(details: dict | None) -> dict: |
| if not details: |
| return {} |
| try: |
| return json.loads(redact_text(json.dumps(details, ensure_ascii=False))) |
| except Exception: |
| return {"redacted_details": redact_text(str(details))[-4000:]} |
| |
| |
| def fail(run_dir: Path, events_path: Path, message: str, details: dict | None = None, status: str = "failed"): |
| safe = safe_details(details) |
| append_event(events_path, "failure", "failed", message, safe) |
| write_json(run_dir / "state.json", { |
| "run_id": os.environ.get("RUN_ID"), |
| "kind": "universal_model_card_builder", |
| "status": status, |
| "message": message, |
| "updated_at": now(), |
| "details": safe, |
| }) |
| report = f"""# Agentic Space Factory — model Article Reproduction Report |
| |
| Status: **{status}** |
| |
| {message} |
| |
| ```json |
| {json.dumps(safe, indent=2, ensure_ascii=False)} |
| ``` |
| """ |
| (run_dir / "report.md").write_text(report, encoding="utf-8") |
| raise SystemExit(1) |
| |
| |
| def run_cmd(cmd: list[str], *, cwd: Path | None = None, env: dict | None = None, timeout: int = 600): |
| result = subprocess.run(cmd, cwd=str(cwd) if cwd else None, env=env, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=timeout) |
| return result.returncode, redact_text(result.stdout) |
| |
| |
| def install_python_deps(events_path: Path): |
| append_event(events_path, "dependencies", "started", "Installing Python worker dependencies") |
| code, out = run_cmd([sys.executable, "-m", "pip", "install", "-q", "--upgrade", "huggingface_hub>=1.0.0", "gradio_client>=2.0.0", "requests>=2.31.0"], timeout=600) |
| if code != 0: |
| append_event(events_path, "dependencies", "failed", "Python dependency installation failed", {"output_tail": out[-4000:]}) |
| raise RuntimeError(out) |
| append_event(events_path, "dependencies", "success", "Python worker dependencies installed") |
| |
| |
| def ensure_node(events_path: Path): |
| node = shutil.which("node") |
| npm = shutil.which("npm") |
| if node and npm: |
| _, node_v = run_cmd([node, "--version"], timeout=30) |
| _, npm_v = run_cmd([npm, "--version"], timeout=30) |
| append_event(events_path, "node", "success", "Node/npm already available", {"node": node_v.strip(), "npm": npm_v.strip()}) |
| return |
| append_event(events_path, "node", "started", "Installing nodejs/npm through apt-get") |
| code, out = run_cmd(["bash", "-lc", "apt-get update -qq && apt-get install -y -qq nodejs npm"], timeout=600) |
| if code != 0: |
| append_event(events_path, "node", "failed", "Could not install nodejs/npm", {"output_tail": out[-4000:]}) |
| raise RuntimeError(out) |
| append_event(events_path, "node", "success", "Installed nodejs/npm") |
| |
| |
| def install_pi(events_path: Path): |
| ensure_node(events_path) |
| append_event(events_path, "pi_install", "started", "Installing Pi coding agent from npm") |
| code, out = run_cmd(["npm", "install", "-g", "@mariozechner/pi-coding-agent"], timeout=900) |
| if code != 0: |
| append_event(events_path, "pi_install", "failed", "Pi npm installation failed", {"output_tail": out[-4000:]}) |
| raise RuntimeError(out) |
| code, version = run_cmd(["pi", "--version"], timeout=60) |
| append_event(events_path, "pi_install", "success", "Pi installed", {"version_output": version.strip()[-300:]}) |
| |
| |
| def configure_pi(events_path: Path, model: str): |
| pi_dir = Path.home() / ".pi" / "agent" |
| pi_dir.mkdir(parents=True, exist_ok=True) |
| (pi_dir / "auth.json").write_text(json.dumps({"huggingface": {"type": "api_key", "key": os.environ.get("HF_TOKEN", "")}}, indent=2), encoding="utf-8") |
| (pi_dir / "settings.json").write_text(json.dumps({"model": model, "provider": "huggingface", "autoRun": True, "autoApply": True}, indent=2), encoding="utf-8") |
| append_event(events_path, "pi_config", "success", "Configured Pi", {"model": model}) |
| |
| |
| def collect_pi_traces(run_dir: Path, events_path: Path): |
| traces_dir = Path.home() / ".pi" / "agent" / "sessions" |
| raw_dir = run_dir / "traces" / "raw" |
| redacted_dir = run_dir / "traces" / "redacted" |
| raw_dir.mkdir(parents=True, exist_ok=True) |
| redacted_dir.mkdir(parents=True, exist_ok=True) |
| count = 0 |
| if traces_dir.exists(): |
| for path in traces_dir.rglob("*.jsonl"): |
| rel = path.relative_to(traces_dir) |
| target_raw = raw_dir / rel |
| target_raw.parent.mkdir(parents=True, exist_ok=True) |
| text = path.read_text(encoding="utf-8", errors="ignore") |
| target_raw.write_text(text, encoding="utf-8") |
| target_redacted = redacted_dir / rel |
| target_redacted.parent.mkdir(parents=True, exist_ok=True) |
| target_redacted.write_text(redact_text(text), encoding="utf-8") |
| count += 1 |
| append_event(events_path, "traces", "success", "Collected Pi traces", {"count": count}) |
| return count |
| |
| |
| def sanitize_model_id(model_id: str) -> str: |
| model_id = (model_id or DEFAULT_MODEL_ID).strip().replace("https://huggingface.co/", "") |
| model_id = model_id.split("?", 1)[0].strip("/") |
| if not re.match(r"^[A-Za-z0-9_.-]+/[A-Za-z0-9_.-]+$", model_id): |
| raise ValueError("MODEL_ID must look like owner/model-name") |
| return model_id |
| |
| |
| def make_gradio_client(target_space_id: str, token: str): |
| import inspect |
| from gradio_client import Client |
| params = inspect.signature(Client).parameters |
| if "token" in params: |
| return Client(target_space_id, token=token) |
| if "hf_token" in params: |
| return Client(target_space_id, hf_token=token) |
| if "api_key" in params: |
| return Client(target_space_id, api_key=token) |
| if "headers" in params: |
| return Client(target_space_id, headers={"Authorization": f"Bearer {token}"}) |
| return Client(target_space_id) |
| |
| |
| def api_names_from_schema(schema) -> list[str]: |
| names: list[str] = [] |
| if isinstance(schema, dict): |
| endpoints = schema.get("named_endpoints") or schema.get("endpoints") or {} |
| if isinstance(endpoints, dict): |
| for key, value in endpoints.items(): |
| if isinstance(key, str) and key.startswith("/"): |
| names.append(key) |
| if isinstance(value, dict): |
| api_name = value.get("api_name") |
| if isinstance(api_name, str) and api_name.startswith("/"): |
| names.append(api_name) |
| if isinstance(schema.get("dependencies"), list): |
| for dep in schema["dependencies"]: |
| if isinstance(dep, dict): |
| api_name = dep.get("api_name") |
| if isinstance(api_name, str): |
| names.append(api_name if api_name.startswith("/") else f"/{api_name}") |
| return list(dict.fromkeys(names)) |
| |
| |
| def space_subdomain_url(target_space_id: str) -> str: |
| owner, name = target_space_id.split("/", 1) |
| # This matches the common Spaces app URL pattern. Keep conservative: our |
| # generated slugs are ASCII and hyphen-friendly. |
| return f"https://{owner}-{name}.hf.space".replace("_", "-").lower() |
| |
| |
| def runtime_to_dict(runtime) -> dict: |
| payload = {} |
| for attr in ["stage", "hardware", "requested_hardware", "sleep_time", "storage", "gc_timeout"]: |
| value = getattr(runtime, attr, None) |
| payload[attr] = getattr(value, "value", value) |
| return {k: str(v) if v is not None else None for k, v in payload.items()} |
| |
| |
| def write_space_runtime(api, target_space_id: str, token: str, run_dir: Path, events_path: Path, attempt: int | None = None) -> dict: |
| try: |
| runtime = api.get_space_runtime(repo_id=target_space_id, token=token) |
| payload = runtime_to_dict(runtime) |
| payload["attempt"] = attempt |
| write_json(run_dir / "space_runtime.json", payload) |
| return payload |
| except Exception as exc: |
| payload = {"error": str(exc)[:2000], "attempt": attempt} |
| write_json(run_dir / "space_runtime.json", payload) |
| append_event(events_path, "space_runtime", "warning", "Could not fetch Space runtime", payload) |
| return payload |
| |
| |
| def collect_space_logs(target_space_id: str, token: str, run_dir: Path, events_path: Path): |
| logs_dir = run_dir / "logs" |
| logs_dir.mkdir(parents=True, exist_ok=True) |
| env = os.environ.copy() |
| env["HF_TOKEN"] = token |
| commands = { |
| "space_logs_runtime.txt": ["hf", "spaces", "logs", target_space_id], |
| "space_logs_build.txt": ["hf", "spaces", "logs", target_space_id, "--build"], |
| } |
| written = [] |
| for filename, cmd in commands.items(): |
| try: |
| code, out = run_cmd(cmd, env=env, timeout=75) |
| (logs_dir / filename).write_text(out, encoding="utf-8") |
| written.append({"file": filename, "returncode": code, "tail": out[-1000:]}) |
| except Exception as exc: |
| written.append({"file": filename, "error": str(exc)[:1000]}) |
| append_event(events_path, "space_logs", "success", "Collected best-effort Space logs", {"files": written}) |
| return written |
| |
| |
| def validate_http_health(target_space_id: str, token: str, run_dir: Path, events_path: Path, attempt: int): |
| import requests |
| base_url = space_subdomain_url(target_space_id) |
| url = base_url.rstrip("/") + "/health" |
| headers = {"Authorization": f"Bearer {token}", "Accept": "application/json,text/plain,*/*"} |
| response = requests.get(url, headers=headers, timeout=20) |
| payload = { |
| "status": "success" if response.ok else "failed", |
| "attempt": attempt, |
| "url": url, |
| "status_code": response.status_code, |
| "content_type": response.headers.get("content-type"), |
| "text": response.text[:2000], |
| } |
| if response.ok: |
| try: |
| payload["json"] = response.json() |
| except Exception: |
| pass |
| write_json(run_dir / "tests" / "http_health.json", payload) |
| write_json(run_dir / "tests" / "test_result.json", payload | {"validator": "http_get_health"}) |
| append_event(events_path, "api_validation", "success", "HTTP /health validation passed", {"attempt": attempt, "url": url, "status_code": response.status_code}) |
| return payload | {"validator": "http_get_health"} |
| raise RuntimeError(f"HTTP /health returned {response.status_code}: {response.text[:500]}") |
| |
| |
| def validate_gradio_api(target_space_id: str, token: str, run_dir: Path, events_path: Path, attempt: int): |
| client = make_gradio_client(target_space_id, token) |
| schema = client.view_api(return_format="dict") |
| write_json(run_dir / "tests" / "api_schema.json", schema if isinstance(schema, dict) else {"schema": str(schema)}) |
| discovered = api_names_from_schema(schema) |
| candidates = [] |
| for name in ["/health", "/predict", "/greet"] + discovered: |
| if name not in candidates: |
| candidates.append(name) |
| errors = [] |
| for api_name in candidates: |
| try: |
| if api_name == "/greet": |
| result = client.predict("Agentic Space Factory", api_name=api_name) |
| else: |
| result = client.predict(api_name=api_name) |
| payload = {"status": "success", "attempt": attempt, "api_name": api_name, "discovered_api_names": discovered, "result_repr": repr(result)[:2000], "validator": "gradio_client"} |
| write_json(run_dir / "tests" / "test_result.json", payload) |
| append_event(events_path, "api_validation", "success", "Gradio API validation passed", {"attempt": attempt, "api_name": api_name, "discovered_api_names": discovered}) |
| return payload |
| except Exception as exc: |
| errors.append({"api_name": api_name, "error": str(exc)[:1000]}) |
| raise RuntimeError("; ".join(f"{e['api_name']}: {e['error']}" for e in errors[:5]) or "No callable API endpoints found") |
| |
| |
| def validate_live_api(api, target_space_id: str, token: str, run_dir: Path, events_path: Path, timeout_s: int = 900): |
| append_event(events_path, "api_validation", "started", "Waiting for live HTTP /health or Gradio API to become available") |
| deadline = time.time() + timeout_s |
| attempt = 0 |
| last_error = None |
| runtime_error_count = 0 |
| while time.time() < deadline: |
| attempt += 1 |
| runtime_payload = write_space_runtime(api, target_space_id, token, run_dir, events_path, attempt) |
| stage = str(runtime_payload.get("stage") or "").upper() |
| if "RUNTIME_ERROR" in stage: |
| runtime_error_count += 1 |
| collect_space_logs(target_space_id, token, run_dir, events_path) |
| last_error = f"Space runtime stage is {stage}" |
| if runtime_error_count >= 2: |
| raise RuntimeError(f"Space is in RUNTIME_ERROR. See logs/space_logs_runtime.txt and logs/space_logs_build.txt. Last runtime: {runtime_payload}") |
| try: |
| return validate_http_health(target_space_id, token, run_dir, events_path, attempt) |
| except Exception as exc: |
| last_error = f"HTTP /health failed: {exc}" |
| try: |
| return validate_gradio_api(target_space_id, token, run_dir, events_path, attempt) |
| except Exception as exc: |
| last_error = (last_error or "") + f"; Gradio API failed: {exc}" |
| append_event(events_path, "api_validation", "waiting", "Live health/API not ready yet", {"attempt": attempt, "runtime": runtime_payload, "error": last_error[-1500:] if last_error else None}) |
| time.sleep(30) |
| collect_space_logs(target_space_id, token, run_dir, events_path) |
| raise RuntimeError(f"Live health/API validation did not pass before timeout: {last_error}") |
| |
| |
| def request_hardware(api, target_space_id: str, hardware: str, token: str, events_path: Path, step: str, retries: int = 4): |
| """Best-effort hardware request. |
| |
| OAuth tokens can create/write Spaces but may still be unable to trigger |
| hardware changes, especially paid GPU upgrades. Treat 401/auth/billing |
| failures as manual-action-required instead of burning retries. |
| """ |
| if not hardware: |
| return {"requested": False, "hardware": hardware, "ok": False, "error": "empty hardware"} |
| last_error = None |
| for attempt in range(1, retries + 1): |
| try: |
| runtime = api.request_space_hardware(repo_id=target_space_id, hardware=hardware, token=token) |
| payload = { |
| "requested": True, |
| "hardware": hardware, |
| "ok": True, |
| "attempt": attempt, |
| "runtime_stage": getattr(getattr(runtime, "stage", None), "value", str(getattr(runtime, "stage", None))), |
| "requested_hardware": getattr(runtime, "requested_hardware", None), |
| "hardware_current": getattr(runtime, "hardware", None), |
| } |
| append_event(events_path, step, "success", f"Requested Space hardware {hardware}", payload) |
| return payload |
| except Exception as exc: |
| last_error = str(exc)[:2000] |
| auth_like = any(marker in last_error for marker in ["401", "Invalid username or password", "Unauthorized", "Repository Not Found"]) |
| payload = {"attempt": attempt, "hardware": hardware, "error": last_error, "manual_action_required": auth_like} |
| append_event(events_path, step, "failed" if auth_like or attempt == retries else "waiting", f"Could not request Space hardware {hardware}", payload) |
| if auth_like: |
| return {"requested": True, "hardware": hardware, "ok": False, "attempts": attempt, "error": last_error, "manual_action_required": True} |
| if attempt < retries: |
| time.sleep(8 * attempt) |
| return {"requested": True, "hardware": hardware, "ok": False, "attempts": retries, "error": last_error, "manual_action_required": False} |
| |
| |
| def create_initial_workspace(workspace: Path, model_id: str, target_space_id: str, preferred_hardware: str, fallback_hardware: str, allow_fallback: bool, implementation_mode: str, model_analysis: dict | None = None): |
| workspace.mkdir(parents=True, exist_ok=True) |
| model_analysis = model_analysis or {} |
| pipeline_tag = model_analysis.get("pipeline_tag") |
| library_name = model_analysis.get("library_name") |
| tags = model_analysis.get("tags", [])[:40] |
| siblings = model_analysis.get("siblings", [])[:60] |
| app_py = f"""import gradio as gr |
| from huggingface_hub import model_info, list_repo_files |
| |
| MODEL_ID = {model_id!r} |
| TARGET_SPACE_ID = {target_space_id!r} |
| |
| |
| def health(): |
| return {{ |
| "status": "booted", |
| "model_id": MODEL_ID, |
| "target_space_id": TARGET_SPACE_ID, |
| "stage": "initial-scaffold", |
| "note": "Pi should replace this scaffold with a model-specific demo while preserving a cheap health endpoint.", |
| }} |
| |
| |
| def placeholder(*args): |
| return "Initial scaffold. Pi should replace this with a model-specific inference path, or write TECHNICAL_BLOCKERS.json." |
| |
| with gr.Blocks(title="Generated Model Space — Agentic Space Factory") as demo: |
| gr.Markdown("# Generated Model Space — Agentic Space Factory") |
| gr.Markdown(f"Private generated Space for `{{MODEL_ID}}`.") |
| gr.JSON(label="Health", value=health(), every=None) |
| gr.Button("Health check").click(fn=health, inputs=None, outputs=gr.JSON(), api_name="health") |
| gr.Textbox(label="Input", value="Hello from Agentic Space Factory").submit(fn=placeholder, inputs=None, outputs=gr.Textbox(), api_name="predict") |
| gr.Button("Run placeholder").click(fn=placeholder, inputs=None, outputs=gr.Textbox(), api_name="predict") |
| |
| if __name__ == "__main__": |
| demo.launch() |
| """ |
| (workspace / "app.py").write_text(app_py, encoding="utf-8") |
| req = """gradio>=5.0.0 |
| huggingface_hub>=0.34.0,<1.0.0 |
| spaces |
| transformers>=4.45.0 |
| diffusers |
| accelerate |
| safetensors |
| torch |
| kernels |
| pillow |
| numpy |
| requests |
| """ |
| (workspace / "requirements.txt").write_text(req, encoding="utf-8") |
| readme = f"""--- |
| title: Generated Model Space |
| sdk: gradio |
| app_file: app.py |
| python_version: "3.10" |
| suggested_hardware: {preferred_hardware or fallback_hardware or "cpu-basic"} |
| short_description: "Agent-built model demo" |
| --- |
| |
| # Generated Model Space — Agentic Space Factory |
| |
| Private generated Space for `{model_id}`. |
| |
| This Space is created by Phase 10. It should remain private until manually reviewed. |
| """ |
| (workspace / "README.md").write_text(readme, encoding="utf-8") |
| analysis_json = json.dumps({"pipeline_tag": pipeline_tag, "library_name": library_name, "tags": tags, "siblings": siblings}, indent=2, ensure_ascii=False) |
| goal = f"""You are Pi running inside a Hugging Face Job for Agentic Space Factory Phase 10. |
| |
| Goal: build the best possible private Hugging Face Space demo for an arbitrary model card. |
| |
| MODEL_ID: {model_id} |
| TARGET_SPACE_ID: {target_space_id} |
| IMPLEMENTATION_MODE: {implementation_mode} |
| MODEL_METADATA: |
| ```json |
| {analysis_json} |
| ``` |
| |
| First read and follow the operational rules from this gist: |
| {GIST_URL} |
| |
| Non-negotiable safety and product constraints: |
| - The target Space must remain private. |
| - Do not delete any user resources. |
| - Do not print secrets or tokens. |
| - Work only inside the current workspace. |
| - The wrapper will create the private Space, request hardware best-effort, upload files, and validate the live app. Do not create/delete repos yourself in this Phase 10 worker. |
| - Preserve a cheap health endpoint named `health` with `api_name="health"`. It must not load weights, run GPU work, or download large files. |
| - Keep the huggingface_hub pin in requirements.txt: huggingface_hub>=0.34.0,<1.0.0. |
| - README.md frontmatter must remain valid; if it uses short_description, it must be 60 characters or fewer. |
| |
| Implementation contract: |
| - If IMPLEMENTATION_MODE is `full-inference-gated`, you are not allowed to silently replace generation with a placeholder and call it success. |
| - Try to implement the closest real inference path for the model card using evidence from README, model metadata, config files, and repo files. |
| - You may choose an appropriate Gradio UI for the task: text, image, audio, video, multimodal, embeddings, classification, etc. |
| - If the model is standard and feasible, implement a real generate/predict function and expose it as a Gradio endpoint. |
| - If the model requires GPU, add ZeroGPU-compatible `@spaces.GPU(...)` only around the inference function. Do not decorate health. |
| - If the model requires special dependencies, include them only when needed and document risks. |
| - Investigate compatibility fallbacks before declaring a blocker: PyTorch SDPA, xformers, HF Kernels where relevant, CPU/offload/lazy loading, smaller resolution/steps, safe smoke-test inputs. |
| - If real inference is impossible or unsafe in a Space, write TECHNICAL_BLOCKERS.json with concrete evidence for every blocker. |
| |
| Deliverables: |
| - app.py must boot on Hugging Face Spaces. |
| - app.py must expose health/api_name="health". |
| - If real generation is implemented, generate/predict must attempt a real model call, not only return a textual diagnostic. |
| - If real generation is not implemented, write TECHNICAL_BLOCKERS.json with: full_inference_implemented=false, blockers[], evidence[], minimum_runtime, and suggested_next_step. |
| - Write INFERENCE_CONTRACT.json with: full_inference_implemented, health_endpoint, primary_api_name, expected_output_type, validation_level, requires_gpu, estimated_vram, and blockers_count. |
| - README.md must explain the runtime strategy, task, limitations, and how to test. |
| - Write a concise PI_SUMMARY.md with what you changed and whether full inference is implemented. |
| """ |
| (workspace / "GOAL.md").write_text(goal, encoding="utf-8") |
| return ["app.py", "requirements.txt", "README.md", "GOAL.md"] |
| |
| |
| def sanitize_readme_metadata(workspace: Path, events_path: Path): |
| readme_path = workspace / "README.md" |
| if not readme_path.exists(): |
| return |
| text = readme_path.read_text(encoding="utf-8", errors="ignore") |
| if not text.startswith("---"): |
| return |
| parts = text.split("---", 2) |
| if len(parts) < 3: |
| return |
| _, frontmatter, body = parts |
| changed = False |
| sanitized_lines = [] |
| for line in frontmatter.splitlines(): |
| if line.strip().startswith("short_description:"): |
| value = "model video avatar demo" |
| sanitized_lines.append(f"short_description: {value}") |
| changed = True |
| else: |
| sanitized_lines.append(line) |
| # If Pi added other unexpectedly long one-line metadata values, leave them alone: |
| # the known Hub validation blocker for this run was short_description > 60 chars. |
| if changed: |
| new_text = "---\n" + "\n".join(sanitized_lines).strip() + "\n---" + body |
| readme_path.write_text(new_text, encoding="utf-8") |
| append_event(events_path, "metadata_sanitize", "success", "Sanitized README metadata", {"short_description": "model video avatar demo"}) |
| |
| |
| def upload_workspace(api, workspace: Path, target_space_id: str, token: str, run_dir: Path, events_path: Path): |
| sanitize_readme_metadata(workspace, events_path) |
| append_event(events_path, "upload_files", "started", "Uploading generated universal model-card workspace recursively") |
| gen_dir = run_dir / "generated" |
| if gen_dir.exists(): |
| shutil.rmtree(gen_dir) |
| shutil.copytree(workspace, gen_dir, ignore=shutil.ignore_patterns(".git", "node_modules", "__pycache__", "*.pyc")) |
| for filename in ["app.py", "README.md", "requirements.txt"]: |
| if not (workspace / filename).exists(): |
| raise RuntimeError(f"Missing required generated file: {filename}") |
| api.upload_folder( |
| folder_path=str(workspace), |
| repo_id=target_space_id, |
| repo_type="space", |
| token=token, |
| ignore_patterns=[".git/*", "node_modules/*", "__pycache__/*", "*.pyc", "GOAL.md"], |
| ) |
| uploaded_files = sorted(str(p.relative_to(workspace)) for p in workspace.rglob("*") if p.is_file() and "node_modules" not in p.parts and "__pycache__" not in p.parts) |
| append_event(events_path, "upload_files", "success", "Uploaded generated workspace folder", {"file_count": len(uploaded_files), "files_sample": uploaded_files[:50]}) |
| |
| |
| def load_json_if_exists(path: Path) -> dict: |
| if not path.exists(): |
| return {} |
| try: |
| return json.loads(path.read_text(encoding="utf-8", errors="replace")) |
| except Exception as exc: |
| return {"parse_error": str(exc), "raw_tail": path.read_text(encoding="utf-8", errors="replace")[-2000:]} |
| |
| |
| def infer_generation_gate(workspace: Path, implementation_mode: str, validation: dict, run_dir: Path, events_path: Path) -> dict: |
| """Classify the run separately from process success. |
| |
| /health passing means the Space boots. It does not mean the generated Space |
| performs model inference. In full-inference-gated mode we require either |
| an actual implementation signal or a machine-readable blocker report. |
| """ |
| app_text = (workspace / "app.py").read_text(encoding="utf-8", errors="ignore") if (workspace / "app.py").exists() else "" |
| summary_text = (workspace / "PI_SUMMARY.md").read_text(encoding="utf-8", errors="ignore") if (workspace / "PI_SUMMARY.md").exists() else "" |
| req_text = (workspace / "requirements.txt").read_text(encoding="utf-8", errors="ignore") if (workspace / "requirements.txt").exists() else "" |
| blockers_path = workspace / "TECHNICAL_BLOCKERS.json" |
| blockers = load_json_if_exists(blockers_path) |
| |
| combined = (app_text + "\n" + summary_text).lower() |
| blocked_markers = [ |
| "full generation is not implemented", |
| "full generation is intentionally not wired", |
| "full inference is blocked", |
| "returns a detailed diagnostic", |
| "diagnostic report instead", |
| "placeholder generator", |
| "placeholder generation", |
| "info-only", |
| "not implemented", |
| "cannot run in this environment", |
| "out of scope", |
| ] |
| blocker_detected = bool(blockers) or any(m in combined for m in blocked_markers) |
| implementation_signals = { |
| "has_spaces_gpu": "@spaces.GPU" in app_text, |
| "has_torch": "torch" in req_text or "import torch" in app_text, |
| "has_diffusers": "diffusers" in req_text or "diffusers" in app_text, |
| "has_video_output_hint": any(x in app_text.lower() for x in ["gr.video", "video", ".mp4", "ffmpeg"]), |
| "health_passed": validation.get("method") in {"http_health", "gradio"}, |
| } |
| |
| if blocker_detected: |
| status = "technical_blocker" |
| message = "Space boots, but full model inference was not implemented. See TECHNICAL_BLOCKERS.json / PI_SUMMARY.md." |
| elif implementation_mode in {"full-inference-gated", "full-inference-attempt"}: |
| # Without a video smoke test, do not claim real inference success. |
| status = "full_inference_candidate_health_passed" |
| message = "Space boots and contains inference signals, but no generation smoke test has validated a real video output." |
| else: |
| status = "health_only" |
| message = "Safe scaffold health validation passed. Full inference was not requested." |
| |
| if blocker_detected and not blockers: |
| blockers = { |
| "full_inference_implemented": False, |
| "source": "worker_heuristic_from_PI_SUMMARY_or_app.py", |
| "blockers": [ |
| { |
| "type": "agent_declared_or_detected_blocker", |
| "claim": "Pi-generated artifacts state that full inference is blocked/not implemented or generation returns diagnostics/placeholders.", |
| "evidence": "See PI_SUMMARY.md and app.py in generated artifacts.", |
| "severity": "blocking", |
| } |
| ], |
| "required_investigations_for_next_run": [ |
| "Check whether PyTorch SDPA can replace flash-attn calls.", |
| "Check whether HF Kernels flash-attn2/3/4 can replace required flash-attn APIs.", |
| "Verify whether 2-GPU context parallelism is strictly required or can be reduced to a single-GPU smoke test.", |
| ], |
| } |
| (workspace / "TECHNICAL_BLOCKERS.json").write_text(json.dumps(blockers, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") |
| (run_dir / "generated" / "TECHNICAL_BLOCKERS.json").write_text(json.dumps(blockers, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") |
| |
| gate = { |
| "status": status, |
| "message": message, |
| "implementation_mode": implementation_mode, |
| "blocker_detected": blocker_detected, |
| "implementation_signals": implementation_signals, |
| "validation_method": validation.get("method"), |
| "blockers": blockers, |
| } |
| write_json(run_dir / "inference_gate.json", gate) |
| append_event(events_path, "inference_gate", status, message, gate) |
| return gate |
| |
| |
| def main(): |
| run_id = os.environ["RUN_ID"] |
| hf_username = os.environ.get("HF_USERNAME", "unknown") |
| bucket_source = os.environ.get("BUCKET_SOURCE", "unknown") |
| output_root = Path(os.environ.get("OUTPUT_ROOT", "/output")) |
| target_space_id = os.environ.get("TARGET_SPACE_ID", "") |
| model_id = sanitize_model_id(os.environ.get("MODEL_ID", DEFAULT_MODEL_ID)) |
| pi_model = os.environ.get("PI_MODEL", "Qwen/Qwen3-Coder-Next") |
| preferred_hardware = os.environ.get("PREFERRED_SPACE_HARDWARE", "zero-a10g") |
| fallback_hardware = os.environ.get("FALLBACK_SPACE_HARDWARE", "l40sx1") |
| allow_fixed_gpu_fallback = os.environ.get("ALLOW_FIXED_GPU_FALLBACK", "true").lower() in {"1", "true", "yes", "on"} |
| implementation_mode = os.environ.get("IMPLEMENTATION_MODE", "full-inference-attempt") |
| token = os.environ.get("HF_TOKEN") |
| |
| run_dir = output_root / "runs" / run_id |
| events_path = run_dir / "events.jsonl" |
| state_path = run_dir / "state.json" |
| workspace = Path("/tmp/universal_workspace") |
| |
| append_event(events_path, "bootstrap", "started", "Universal model-card builder worker started", {"model_id": model_id, "target_space_id": target_space_id}) |
| write_json(state_path, {"run_id": run_id, "kind": "universal_model_card_builder", "status": "running", "message": "Attempting Universal model-card builderd Space creation", "model_id": model_id, "target_space": target_space_id, "created_by": hf_username, "bucket_source": bucket_source, "created_at": now(), "updated_at": now()}) |
| if not token: |
| fail(run_dir, events_path, "HF_TOKEN is missing from Job secrets") |
| if not TARGET_RE.match(target_space_id): |
| fail(run_dir, events_path, "Invalid TARGET_SPACE_ID", {"target_space_id": target_space_id}) |
| |
| try: |
| install_python_deps(events_path) |
| from huggingface_hub import HfApi |
| api = HfApi(token=token) |
| whoami = api.whoami(token=token) |
| append_event(events_path, "auth", "success", "Authenticated inside Job", {"whoami_name": whoami.get("name")}) |
| |
| append_event(events_path, "model_analysis", "started", "Fetching model metadata", {"model_id": model_id}) |
| info = api.model_info(model_id, token=token, files_metadata=True) |
| siblings = [getattr(s, "rfilename", "") for s in (info.siblings or [])] |
| analysis = {"model_id": model_id, "pipeline_tag": getattr(info, "pipeline_tag", None), "library_name": getattr(info, "library_name", None), "tags": list(getattr(info, "tags", []) or [])[:100], "siblings": siblings[:160], "default_model_target": model_id == DEFAULT_MODEL_ID, "preferred_hardware": preferred_hardware, "fallback_hardware": fallback_hardware, "allow_fixed_gpu_fallback": allow_fixed_gpu_fallback, "implementation_mode": implementation_mode} |
| write_json(run_dir / "model_analysis.json", analysis) |
| append_event(events_path, "model_analysis", "success", "Model metadata fetched", {"pipeline_tag": analysis["pipeline_tag"], "library_name": analysis["library_name"]}) |
| |
| create_initial_workspace(workspace, model_id, target_space_id, preferred_hardware, fallback_hardware, allow_fixed_gpu_fallback, implementation_mode, analysis) |
| append_event(events_path, "workspace", "success", "Prepared universal model-card workspace", {"files": sorted(p.name for p in workspace.iterdir())}) |
| |
| install_pi(events_path) |
| configure_pi(events_path, pi_model) |
| append_event(events_path, "pi_run", "started", "Running Pi on universal model-card workspace", {"model": pi_model}) |
| code, pi_out = run_cmd(["pi", "-p", (workspace / "GOAL.md").read_text(encoding="utf-8")], cwd=workspace, timeout=2400) |
| (run_dir / "logs").mkdir(parents=True, exist_ok=True) |
| (run_dir / "logs" / "pi_output.txt").write_text(pi_out, encoding="utf-8") |
| if code != 0: |
| append_event(events_path, "pi_run", "failed", "Pi returned a non-zero exit code", {"returncode": code, "output_tail": pi_out[-4000:]}) |
| collect_pi_traces(run_dir, events_path) |
| fail(run_dir, events_path, "Pi failed before Space upload", {"returncode": code, "output_tail": pi_out[-4000:]}) |
| append_event(events_path, "pi_run", "success", "Pi completed universal model-card workspace pass", {"output_tail": pi_out[-2000:]}) |
| if not (workspace / "PI_SUMMARY.md").exists(): |
| (workspace / "PI_SUMMARY.md").write_text("# Pi Summary\n\nPi did not create a PI_SUMMARY.md. See logs/pi_output.txt.\n", encoding="utf-8") |
| |
| app_text = (workspace / "app.py").read_text(encoding="utf-8", errors="ignore") |
| if "/health" not in app_text and "api_name=\"health\"" not in app_text and "api_name='health'" not in app_text: |
| append_event(events_path, "pi_verification", "failed", "app.py does not appear to expose /health; injecting safe health endpoint is not implemented") |
| fail(run_dir, events_path, "Pi output did not preserve a /health endpoint") |
| append_event(events_path, "pi_verification", "success", "Pi output preserved health validation endpoint") |
| |
| append_event(events_path, "create_space", "started", "Creating private target Space", {"target_space": target_space_id}) |
| api.create_repo(repo_id=target_space_id, repo_type="space", space_sdk="gradio", private=True, exist_ok=False, token=token) |
| append_event(events_path, "create_space", "success", "Private target Space created", {"target_space": target_space_id}) |
| |
| # Upload before requesting hardware. Newly created private Spaces may not be |
| # immediately available on the hardware endpoint; uploading first also ensures |
| # the repo has valid Space metadata before any restart is triggered. |
| upload_workspace(api, workspace, target_space_id, token, run_dir, events_path) |
| |
| hardware_attempts = [] |
| preferred = request_hardware(api, target_space_id, preferred_hardware, token, events_path, "hardware_preferred") |
| hardware_attempts.append(preferred) |
| selected_hardware = preferred_hardware if preferred.get("ok") else None |
| if not selected_hardware and allow_fixed_gpu_fallback and fallback_hardware: |
| fallback = request_hardware(api, target_space_id, fallback_hardware, token, events_path, "hardware_fallback") |
| hardware_attempts.append(fallback) |
| selected_hardware = fallback_hardware if fallback.get("ok") else None |
| if not selected_hardware: |
| append_event(events_path, "hardware", "warning", "Could not request preferred/fallback hardware; Space may remain on default CPU", {"attempts": hardware_attempts}) |
| selected_hardware = "default-cpu-or-existing" |
| write_json(run_dir / "hardware_attempts.json", {"selected_hardware": selected_hardware, "attempts": hardware_attempts}) |
| |
| validation = validate_live_api(api, target_space_id, token, run_dir, events_path, timeout_s=1200) |
| inference_gate = infer_generation_gate(workspace, implementation_mode, validation, run_dir, events_path) |
| collect_pi_traces(run_dir, events_path) |
| |
| final_state = { |
| "run_id": run_id, |
| "kind": "universal_model_card_builder", |
| "status": inference_gate["status"], |
| "message": inference_gate["message"], |
| "model_id": model_id, |
| "target_space": target_space_id, |
| "target_space_url": f"https://huggingface.co/spaces/{target_space_id}", |
| "selected_hardware": selected_hardware, |
| "hardware_attempts": hardware_attempts, |
| "validation": validation, |
| "inference_gate": inference_gate, |
| "updated_at": now(), |
| "created_by": hf_username, |
| "bucket_source": bucket_source, |
| } |
| write_json(state_path, final_state) |
| report = f"""# Agentic Space Factory — Universal Model-Card Builder Report |
| |
| Run ID: `{run_id}` |
| |
| Status: **{inference_gate['status']}** |
| |
| {inference_gate['message']} |
| |
| Target Space: https://huggingface.co/spaces/{target_space_id} |
| |
| Model: `{model_id}` |
| |
| ## Hardware |
| |
| Selected/requested hardware: `{selected_hardware}` |
| |
| Hardware changes are best-effort with OAuth. If requests fail with 401/auth/billing errors, set the Space hardware manually and rerun validation. |
| |
| ```json |
| {json.dumps(hardware_attempts, indent=2, ensure_ascii=False)} |
| ``` |
| |
| ## Health validation |
| |
| The wrapper validated the live Space using HTTP `/health` first, with Gradio Client as fallback. This only proves bootability. |
| |
| ```json |
| {json.dumps(validation, indent=2, ensure_ascii=False)} |
| ``` |
| |
| ## Full-inference gate |
| |
| ```json |
| {json.dumps(inference_gate, indent=2, ensure_ascii=False)} |
| ``` |
| |
| ## Pi summary |
| |
| {(workspace / 'PI_SUMMARY.md').read_text(encoding='utf-8', errors='ignore') if (workspace / 'PI_SUMMARY.md').exists() else 'No PI_SUMMARY.md was produced.'} |
| |
| ## Safety |
| |
| - The target Space was created private. |
| - No public publication was attempted. |
| - Raw traces should remain private; redacted traces are stored separately. |
| - If fallback fixed GPU was used or selected manually, review billing/hardware settings manually after the run. |
| """ |
| (run_dir / "report.md").write_text(report, encoding="utf-8") |
| append_event(events_path, "report_write", "success", "Wrote report.md") |
| append_event(events_path, "done", inference_gate["status"], "Universal model-card builder completed", {"target_space": target_space_id, "selected_hardware": selected_hardware, "gate_status": inference_gate["status"]}) |
| except SystemExit: |
| raise |
| except Exception as exc: |
| try: |
| collect_pi_traces(run_dir, events_path) |
| except Exception: |
| pass |
| fail(run_dir, events_path, "Universal model-card builder worker failed", {"error": str(exc)}) |
| |
| |
| if __name__ == "__main__": |
| main() |
| |
| ''' |
|
|
|
|
| def encoded_worker_script() -> str: |
| """Return the base64-encoded Phase 1 hello worker script.""" |
| return _encode(HELLO_WORKER_SCRIPT) |
|
|
|
|
| def encoded_create_space_worker_script() -> str: |
| """Return the base64-encoded Phase 2 private Space creation worker script.""" |
| return _encode(CREATE_SPACE_WORKER_SCRIPT) |
|
|
|
|
| def encoded_pi_space_worker_script() -> str: |
| """Return the base64-encoded Phase 3 Pi smoke worker script.""" |
| return _encode(PI_SPACE_WORKER_SCRIPT) |
|
|
|
|
| def encoded_pi_gist_worker_script() -> str: |
| """Return the base64-encoded Phase 4 Pi gist recipe worker script.""" |
| return _encode(PI_GIST_WORKER_SCRIPT) |
|
|
|
|
| def encoded_pi_model_card_worker_script() -> str: |
| """Return the base64-encoded Phase 5 Pi model-card worker script.""" |
| return _encode(PI_MODEL_CARD_WORKER_SCRIPT) |
|
|
|
|
|
|
| def encoded_runtime_recommender_worker_script() -> str: |
| """Return the base64-encoded Phase 6 runtime recommender worker script.""" |
| return _encode(RUNTIME_RECOMMENDER_WORKER_SCRIPT) |
|
|
|
|
| def encoded_longcat_article_worker_script() -> str: |
| """Return the base64-encoded Phase 9 LongCat article reproduction worker script.""" |
| return _encode(LONGCAT_ARTICLE_WORKER_SCRIPT) |
|
|
|
|
| def encoded_universal_model_card_worker_script() -> str: |
| """Return the base64-encoded Phase 10 universal model-card builder worker script.""" |
| return _encode(UNIVERSAL_MODEL_CARD_WORKER_SCRIPT) |
|
|
| def python_decode_and_run_command() -> list[str]: |
| """Command list for `run_job`. |
| |
| The Job image only needs Python. The script is passed via env as base64 and |
| executed from /tmp, which avoids persisting code or exposing secrets. |
| """ |
| runner = textwrap.dedent( |
| """ |
| import base64, os, pathlib, subprocess, sys |
| script = base64.b64decode(os.environ['WORKER_SCRIPT_B64']).decode('utf-8') |
| path = pathlib.Path('/tmp/space_factory_worker.py') |
| path.write_text(script, encoding='utf-8') |
| raise SystemExit(subprocess.call([sys.executable, str(path)])) |
| """ |
| ).strip() |
| return ["python", "-c", runner] |
|
|