from __future__ import annotations import base64 import textwrap HELLO_WORKER_SCRIPT = r''' import json import os from datetime import datetime, timezone from pathlib import Path def now(): return datetime.now(timezone.utc).isoformat() def write_json(path: Path, payload: dict): path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") def append_event(path: Path, step: str, status: str, message: str, data: dict | None = None): path.parent.mkdir(parents=True, exist_ok=True) event = { "ts": now(), "step": step, "status": status, "message": message, "data": data or {}, } line = json.dumps(event, ensure_ascii=False) with path.open("a", encoding="utf-8") as f: f.write(line + "\n") # Keep HF Job logs useful as well as Bucket events. print(line, flush=True) def main(): run_id = os.environ["RUN_ID"] hf_username = os.environ.get("HF_USERNAME", "unknown") bucket_source = os.environ.get("BUCKET_SOURCE", "unknown") output_root = Path(os.environ.get("OUTPUT_ROOT", "/output")) job_id = os.environ.get("JOB_ID") accelerator = os.environ.get("ACCELERATOR") or "none" cpu_cores = os.environ.get("CPU_CORES") memory = os.environ.get("MEMORY") has_hf_token = bool(os.environ.get("HF_TOKEN")) run_dir = output_root / "runs" / run_id state_path = run_dir / "state.json" events_path = run_dir / "events.jsonl" report_path = run_dir / "report.md" append_event(events_path, "bootstrap", "started", "HF Job started") append_event( events_path, "environment", "success", "Collected non-sensitive job environment metadata", { "job_id": job_id, "accelerator": accelerator, "cpu_cores": cpu_cores, "memory": memory, "has_hf_token": has_hf_token, }, ) state = { "run_id": run_id, "status": "success", "kind": "hello_job", "message": "Hello from HF Job. OAuth → Job → Bucket write succeeded.", "created_at": now(), "updated_at": now(), "created_by": hf_username, "bucket_source": bucket_source, "job_id": job_id, "accelerator": accelerator, "cpu_cores": cpu_cores, "memory": memory, "has_hf_token": has_hf_token, "security_notes": [ "HF_TOKEN was not printed.", "This run does not create or publish any repository.", "The bucket should remain private.", ], } write_json(state_path, state) append_event(events_path, "state_write", "success", "Wrote state.json") report = f"""# Agentic Space Factory — Hello Job Report Run ID: `{run_id}` Status: **success** This first worker validated the critical foundation: ```text OAuth user → HF Job → mounted Storage Bucket → state/events/report write ``` ## Non-sensitive job metadata - Job ID: `{job_id}` - User: `{hf_username}` - Bucket: `{bucket_source}` - Accelerator: `{accelerator}` - CPU cores: `{cpu_cores}` - Memory: `{memory}` - HF token present in job env: `{has_hf_token}` ## Security posture - The token was passed as a secret and was not printed. - This run did not create or modify any Hugging Face repository. - This run did not publish anything publicly. ## Next implementation step The next increment should create a private target Gradio Space and validate it with `gradio_client` before reporting success. """ report_path.write_text(report, encoding="utf-8") append_event(events_path, "report_write", "success", "Wrote report.md") append_event(events_path, "done", "success", "Hello Job completed") if __name__ == "__main__": main() ''' CREATE_SPACE_WORKER_SCRIPT = r''' import json import os import re import subprocess import sys import time from datetime import datetime, timezone from pathlib import Path from textwrap import dedent TARGET_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{1,95}/[A-Za-z0-9][A-Za-z0-9._-]{1,95}$") def now(): return datetime.now(timezone.utc).isoformat() def write_json(path: Path, payload: dict): path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8") def append_event(path: Path, step: str, status: str, message: str, data: dict | None = None): path.parent.mkdir(parents=True, exist_ok=True) event = {"ts": now(), "step": step, "status": status, "message": message, "data": data or {}} line = json.dumps(event, ensure_ascii=False) with path.open("a", encoding="utf-8") as f: f.write(line + "\n") # Keep HF Job logs useful as well as Bucket events. print(line, flush=True) def fail(run_dir: Path, events_path: Path, message: str, details: dict | None = None, status: str = "failed"): state_path = run_dir / "state.json" append_event(events_path, "failure", "failed", message, details or {}) write_json(state_path, { "run_id": os.environ.get("RUN_ID"), "kind": "create_private_space", "status": status, "message": message, "updated_at": now(), "details": details or {}, }) report = f"""# Agentic Space Factory — Private Space Creation Report Status: **{status}** {message} ```json {json.dumps(details or {}, indent=2, ensure_ascii=False)} ``` """ (run_dir / "report.md").write_text(report, encoding="utf-8") raise SystemExit(1) def pip_install(events_path: Path): append_event(events_path, "dependencies", "started", "Installing worker dependencies") cmd = [sys.executable, "-m", "pip", "install", "-q", "--upgrade", "huggingface_hub>=1.0.0", "gradio_client>=2.0.0"] result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) if result.returncode != 0: append_event(events_path, "dependencies", "failed", "Dependency installation failed", {"output_tail": result.stdout[-4000:]}) raise RuntimeError(result.stdout) append_event(events_path, "dependencies", "success", "Worker dependencies installed") def target_files(target_space_id: str) -> dict[str, str]: app_py = dedent(f""" import gradio as gr def greet(name: str) -> str: name = (name or "friend").strip() or "friend" return f"Hello {{name}} — this private Space was generated by Agentic Space Factory." demo = gr.Interface( fn=greet, inputs=gr.Textbox(label="Name", value="Hugging Face"), outputs=gr.Textbox(label="Result"), title="Generated private Space", description="A minimal Gradio Space created by an HF Job, then validated through the live Gradio API.", examples=[["Hugging Face"], ["Agentic Space Factory"]], ) if __name__ == "__main__": demo.launch() """).strip() + "\n" readme = dedent(f""" --- title: Generated Private Space emoji: 🧪 colorFrom: blue colorTo: purple sdk: gradio app_file: app.py python_version: "3.11" pinned: false --- # Generated Private Space This private Space was generated by **Agentic Space Factory**. Target repo: `{target_space_id}` This Phase 2 version intentionally creates only a safe hello-world Gradio app. Later phases will add Pi, model-card analysis, ZeroGPU templates, and automatic repair. """).strip() + "\n" requirements = "gradio>=5.0.0\n" return {"app.py": app_py, "README.md": readme, "requirements.txt": requirements} def save_generated_files(run_dir: Path, files: dict[str, str]): generated_dir = run_dir / "generated" generated_dir.mkdir(parents=True, exist_ok=True) for filename, content in files.items(): (generated_dir / filename).write_text(content, encoding="utf-8") def create_and_upload_space(api, token: str, target_space_id: str, files: dict[str, str], events_path: Path): append_event(events_path, "create_space", "started", f"Creating private target Space {target_space_id}") try: api.create_repo( repo_id=target_space_id, repo_type="space", space_sdk="gradio", private=True, exist_ok=False, token=token, ) append_event(events_path, "create_space", "success", "Private target Space created", {"target_space": target_space_id}) except Exception as exc: # If it already exists, fail safely instead of overwriting user resources unexpectedly. append_event(events_path, "create_space", "failed", "Could not create target Space", {"error": str(exc)}) raise append_event(events_path, "upload_files", "started", "Uploading generated files to target Space") for path_in_repo, content in files.items(): api.upload_file( path_or_fileobj=content.encode("utf-8"), path_in_repo=path_in_repo, repo_id=target_space_id, repo_type="space", token=token, ) append_event(events_path, "upload_files", "success", f"Uploaded {path_in_repo}") def make_gradio_client(target_space_id: str, token: str): """Create a Gradio Client across gradio_client versions. gradio_client 2.x uses `token=...`; older/newer docs often mention `hf_token=...`; some versions expose `api_key` or `headers`. Using signature introspection prevents a permanent wait loop on a TypeError. """ import inspect from gradio_client import Client params = inspect.signature(Client).parameters if "token" in params: return Client(target_space_id, token=token) if "hf_token" in params: return Client(target_space_id, hf_token=token) if "api_key" in params: return Client(target_space_id, api_key=token) if "headers" in params: return Client(target_space_id, headers={"Authorization": f"Bearer {token}"}) # Last-resort fallback: if the process is logged in via HF_TOKEN/HF CLI, # some client versions can pick credentials from the environment/cache. return Client(target_space_id) def get_api_schema(client): try: return client.view_api(return_format="dict") except TypeError: return client.view_api() def extract_api_names(api_schema) -> list[str]: """Best-effort extraction across gradio_client schema formats. Gradio/Gradio Client versions differ: an Interface can expose `/predict`, `/greet`, or another named endpoint. For the generated hello app the live Job logs show `/greet`, so validation must discover endpoints instead of hardcoding `/predict`. """ names: list[str] = [] def add(value): if not value or not isinstance(value, str): return name = value if value.startswith("/") else f"/{value}" if name not in names: names.append(name) def walk(obj): if isinstance(obj, dict): for key, value in obj.items(): if key in {"api_name", "apiName"}: add(value) # Some schemas use endpoint paths as keys, for example `/greet`. if isinstance(key, str) and key.startswith("/"): add(key) walk(value) elif isinstance(obj, list): for item in obj: walk(item) walk(api_schema) return names def predict_with_available_endpoint(client, api_schema, value: str): candidates = extract_api_names(api_schema) for fallback in ["/greet", "/predict"]: if fallback not in candidates: candidates.append(fallback) errors = [] for api_name in candidates: try: return api_name, client.predict(value, api_name=api_name) except Exception as exc: errors.append({"api_name": api_name, "error": str(exc)[-500:]}) # Last fallback for old/simple gradio_client versions where api_name may be optional. try: return None, client.predict(value) except Exception as exc: errors.append({"api_name": None, "error": str(exc)[-500:]}) raise RuntimeError(f"No candidate Gradio endpoint worked: {json.dumps(errors, ensure_ascii=False)}") def validate_live_api(target_space_id: str, token: str, events_path: Path, tests_dir: Path, timeout_seconds: int = 360): tests_dir.mkdir(parents=True, exist_ok=True) deadline = time.time() + timeout_seconds last_error = None attempt = 0 append_event(events_path, "api_validation", "started", "Waiting for live Gradio API to become available") while time.time() < deadline: attempt += 1 try: client = make_gradio_client(target_space_id, token) api_schema = get_api_schema(client) api_names = extract_api_names(api_schema) write_json(tests_dir / "api_schema.json", {"schema": api_schema, "api_names": api_names}) used_api_name, result = predict_with_available_endpoint(client, api_schema, "Agentic Space Factory") result_text = str(result) ok = "Agentic Space Factory" in result_text and "Hello" in result_text payload = { "attempt": attempt, "target_space": target_space_id, "api_test_passed": ok, "api_name": used_api_name, "discovered_api_names": api_names, "result": result_text, "validated_at": now(), } write_json(tests_dir / "test_result.json", payload) if ok: append_event( events_path, "api_validation", "success", "Live Gradio API test passed", {"attempt": attempt, "api_name": used_api_name, "discovered_api_names": api_names}, ) return payload last_error = f"Unexpected API result from {used_api_name}: {result_text}" except Exception as exc: last_error = str(exc) append_event(events_path, "api_validation", "waiting", "Live API not ready yet", {"attempt": attempt, "error": last_error[-1000:]}) time.sleep(20) payload = { "target_space": target_space_id, "api_test_passed": False, "error": last_error, "validated_at": now(), } write_json(tests_dir / "test_result.json", payload) raise RuntimeError(f"Live API validation did not pass before timeout: {last_error}") def main(): run_id = os.environ["RUN_ID"] hf_username = os.environ.get("HF_USERNAME", "unknown") bucket_source = os.environ.get("BUCKET_SOURCE", "unknown") output_root = Path(os.environ.get("OUTPUT_ROOT", "/output")) target_space_id = os.environ["TARGET_SPACE_ID"] token = os.environ.get("HF_TOKEN") run_dir = output_root / "runs" / run_id events_path = run_dir / "events.jsonl" state_path = run_dir / "state.json" report_path = run_dir / "report.md" target_json_path = run_dir / "target_space.json" append_event(events_path, "bootstrap", "started", "Private Space creation worker started") write_json(state_path, { "run_id": run_id, "kind": "create_private_space", "status": "running", "message": "Creating private target Space", "target_space": target_space_id, "created_by": hf_username, "bucket_source": bucket_source, "created_at": now(), "updated_at": now(), }) if not token: fail(run_dir, events_path, "HF_TOKEN is missing from Job secrets") if not TARGET_RE.match(target_space_id): fail(run_dir, events_path, "Invalid TARGET_SPACE_ID", {"target_space": target_space_id}) if not target_space_id.startswith(f"{hf_username}/"): fail(run_dir, events_path, "For Phase 2, target Space must be in the signed-in user's namespace", {"target_space": target_space_id, "username": hf_username}) try: pip_install(events_path) from huggingface_hub import HfApi api = HfApi(token=token) whoami = api.whoami(token=token) append_event(events_path, "auth", "success", "Authenticated inside Job", {"whoami_name": whoami.get("name")}) files = target_files(target_space_id) save_generated_files(run_dir, files) append_event(events_path, "generate_files", "success", "Generated minimal Gradio Space files", {"files": list(files)}) create_and_upload_space(api, token, target_space_id, files, events_path) write_json(target_json_path, { "target_space": target_space_id, "url": f"https://huggingface.co/spaces/{target_space_id}", "private": True, "sdk": "gradio", "created_by": hf_username, }) validation = validate_live_api(target_space_id, token, events_path, run_dir / "tests") final_state = { "run_id": run_id, "kind": "create_private_space", "status": "success", "message": "Private Gradio Space created and validated through the live API.", "target_space": target_space_id, "target_space_url": f"https://huggingface.co/spaces/{target_space_id}", "created_by": hf_username, "bucket_source": bucket_source, "validation": validation, "updated_at": now(), "security_notes": [ "The target Space was created as private.", "The HF token was not printed or written to report files.", "Success was declared only after a live Gradio API test passed.", ], } write_json(state_path, final_state) report = f"""# Agentic Space Factory — Private Space Creation Report Run ID: `{run_id}` Status: **success** Created private Space: [`{target_space_id}`](https://huggingface.co/spaces/{target_space_id}) ## What happened ```text OAuth user → HF Job → private Space creation → file upload → live Gradio API validation → Bucket report ``` ## Generated files - `app.py` - `requirements.txt` - `README.md` Copies are stored in: ```text runs/{run_id}/generated/ ``` ## Live API validation ```json {json.dumps(validation, indent=2, ensure_ascii=False)} ``` ## Security posture - The target Space was created as private. - No token was printed or intentionally persisted. - Success was declared only after the live Gradio API returned the expected output. ## Next step Phase 3 should introduce Pi inside the Job and ask it to modify/repair this simple Space while preserving the live API validation gate. """ report_path.write_text(report, encoding="utf-8") append_event(events_path, "report_write", "success", "Wrote report.md") append_event(events_path, "done", "success", "Private Space creation worker completed") except Exception as exc: fail(run_dir, events_path, "Private Space creation worker failed", {"error": str(exc)}) if __name__ == "__main__": main() ''' def _encode(script: str) -> str: return base64.b64encode(script.encode("utf-8")).decode("ascii") def encoded_worker_script() -> str: """Return the base64-encoded Phase 1 hello worker script.""" return _encode(HELLO_WORKER_SCRIPT) def encoded_create_space_worker_script() -> str: """Return the base64-encoded Phase 2 private Space creation worker script.""" return _encode(CREATE_SPACE_WORKER_SCRIPT) def python_decode_and_run_command() -> list[str]: """Command list for `run_job`. The Job image only needs Python. The script is passed via env as base64 and executed from /tmp, which avoids persisting code or exposing secrets. """ runner = textwrap.dedent( """ import base64, os, pathlib, subprocess, sys script = base64.b64decode(os.environ['WORKER_SCRIPT_B64']).decode('utf-8') path = pathlib.Path('/tmp/space_factory_worker.py') path.write_text(script, encoding='utf-8') raise SystemExit(subprocess.call([sys.executable, str(path)])) """ ).strip() return ["python", "-c", runner]