# Source: agentic-space-factory-etheroi / src/worker_payload.py
# Retrieved from the Hugging Face file viewer (uploader fffiloni, commit 47bf6fa
# "Upload 6 files", 20.3 kB). The viewer's page chrome is not part of the module.
from __future__ import annotations
import base64
import textwrap
HELLO_WORKER_SCRIPT = r'''
import json
import os
from datetime import datetime, timezone
from pathlib import Path
def now():
    """Return the current UTC time as an ISO 8601 string carrying its UTC offset."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def write_json(path: Path, payload: dict):
    """Write *payload* to *path* as indented UTF-8 JSON followed by a newline.

    Missing parent directories are created first; an existing file is overwritten.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    document = json.dumps(payload, indent=2, ensure_ascii=False)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(document + "\n")
def append_event(path: Path, step: str, status: str, message: str, data: dict | None = None):
    """Append one JSON event line to *path* and echo the same line to stdout.

    The event records a UTC timestamp, the step, status and message, and *data*
    (an empty dict when *data* is None or empty). Parent directories are created.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    record = dict(
        ts=now(),
        step=step,
        status=status,
        message=message,
        data=data if data else {},
    )
    serialized = json.dumps(record, ensure_ascii=False)
    with path.open("a", encoding="utf-8") as log:
        log.write(f"{serialized}\n")
    # Echo to stdout so the HF Job logs mirror the Bucket event stream.
    print(serialized, flush=True)
def main():
    """Run the Phase 1 hello worker: log job metadata, then write state, events and report.

    Reads RUN_ID (required) plus optional HF_USERNAME, BUCKET_SOURCE, OUTPUT_ROOT,
    JOB_ID, ACCELERATOR, CPU_CORES, MEMORY and HF_TOKEN from the environment.
    Only whether HF_TOKEN is set is recorded, never its value. All outputs go
    under `<OUTPUT_ROOT>/runs/<RUN_ID>/`.
    """
    env = os.environ
    run_id = env["RUN_ID"]
    hf_username = env.get("HF_USERNAME", "unknown")
    bucket_source = env.get("BUCKET_SOURCE", "unknown")
    output_root = Path(env.get("OUTPUT_ROOT", "/output"))
    job_id = env.get("JOB_ID")
    accelerator = env.get("ACCELERATOR") or "none"
    cpu_cores = env.get("CPU_CORES")
    memory = env.get("MEMORY")
    has_hf_token = bool(env.get("HF_TOKEN"))

    # Shared by the environment event and state.json.
    job_metadata = {
        "job_id": job_id,
        "accelerator": accelerator,
        "cpu_cores": cpu_cores,
        "memory": memory,
        "has_hf_token": has_hf_token,
    }

    run_dir = output_root / "runs" / run_id
    events_path = run_dir / "events.jsonl"

    append_event(events_path, "bootstrap", "started", "HF Job started")
    append_event(
        events_path,
        "environment",
        "success",
        "Collected non-sensitive job environment metadata",
        job_metadata,
    )

    write_json(
        run_dir / "state.json",
        {
            "run_id": run_id,
            "status": "success",
            "kind": "hello_job",
            "message": "Hello from HF Job. OAuth → Job → Bucket write succeeded.",
            "created_at": now(),
            "updated_at": now(),
            "created_by": hf_username,
            "bucket_source": bucket_source,
            **job_metadata,
            "security_notes": [
                "HF_TOKEN was not printed.",
                "This run does not create or publish any repository.",
                "The bucket should remain private.",
            ],
        },
    )
    append_event(events_path, "state_write", "success", "Wrote state.json")

    report = f"""# Agentic Space Factory — Hello Job Report
Run ID: `{run_id}`
Status: **success**
This first worker validated the critical foundation:
```text
OAuth user → HF Job → mounted Storage Bucket → state/events/report write
```
## Non-sensitive job metadata
- Job ID: `{job_id}`
- User: `{hf_username}`
- Bucket: `{bucket_source}`
- Accelerator: `{accelerator}`
- CPU cores: `{cpu_cores}`
- Memory: `{memory}`
- HF token present in job env: `{has_hf_token}`
## Security posture
- The token was passed as a secret and was not printed.
- This run did not create or modify any Hugging Face repository.
- This run did not publish anything publicly.
## Next implementation step
The next increment should create a private target Gradio Space and validate it with `gradio_client` before reporting success.
"""
    (run_dir / "report.md").write_text(report, encoding="utf-8")
    append_event(events_path, "report_write", "success", "Wrote report.md")
    append_event(events_path, "done", "success", "Hello Job completed")


if __name__ == "__main__":
    main()
'''
CREATE_SPACE_WORKER_SCRIPT = r'''
import json
import os
import re
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent
# Accepts `<owner>/<name>` repo ids. Each part must start with an ASCII letter or
# digit and continue with 1-95 characters drawn from letters, digits, ".", "_"
# and "-" (so each part is 2-96 characters long). Anchored at both ends.
TARGET_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{1,95}/[A-Za-z0-9][A-Za-z0-9._-]{1,95}$")
def now():
    """Return an ISO 8601 timestamp for the current instant in UTC."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat()
def write_json(path: Path, payload: dict):
    """Dump *payload* to *path* as indented UTF-8 JSON terminated by a newline.

    Parent directories are created as needed; an existing file is replaced.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    rendered = json.dumps(payload, indent=2, ensure_ascii=False)
    with path.open("w", encoding="utf-8") as out:
        out.write(f"{rendered}\n")
def append_event(path: Path, step: str, status: str, message: str, data: dict | None = None):
    """Append one JSON event line to *path* and print the same line to stdout.

    Each event holds a UTC timestamp, the step, status, message and *data*
    (an empty dict when *data* is None or empty). Parent directories are created.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = data if data else {}
    record = {"ts": now(), "step": step, "status": status, "message": message, "data": payload}
    serialized = json.dumps(record, ensure_ascii=False)
    with path.open("a", encoding="utf-8") as log:
        log.write(f"{serialized}\n")
    # Printing as well keeps the HF Job logs in step with the Bucket events file.
    print(serialized, flush=True)
def fail(run_dir: Path, events_path: Path, message: str, details: dict | None = None, status: str = "failed"):
    """Record a terminal failure and exit the process with code 1.

    Writes a "failure" event, overwrites `state.json` and `report.md` in
    *run_dir*, then raises SystemExit(1). The event is always logged with status
    "failed"; *status* is what appears in state.json and the report.
    """
    info = details or {}
    append_event(events_path, "failure", "failed", message, info)
    write_json(
        run_dir / "state.json",
        {
            "run_id": os.environ.get("RUN_ID"),
            "kind": "create_private_space",
            "status": status,
            "message": message,
            "updated_at": now(),
            "details": info,
        },
    )
    report_lines = [
        "# Agentic Space Factory — Private Space Creation Report",
        f"Status: **{status}**",
        f"{message}",
        "```json",
        json.dumps(info, indent=2, ensure_ascii=False),
        "```",
    ]
    (run_dir / "report.md").write_text("\n".join(report_lines) + "\n", encoding="utf-8")
    sys.exit(1)
def pip_install(events_path: Path):
    """Install or upgrade the worker's third-party dependencies with pip.

    Runs `python -m pip install -q --upgrade` for huggingface_hub and
    gradio_client using the current interpreter, logging start and outcome as
    "dependencies" events.

    Raises:
        RuntimeError: if pip exits non-zero. The message carries the exit code
            and only the last 4000 characters of pip's combined output, so
            whatever a caller persists from the exception stays bounded.
    """
    append_event(events_path, "dependencies", "started", "Installing worker dependencies")
    requirements = ["huggingface_hub>=1.0.0", "gradio_client>=2.0.0"]
    cmd = [sys.executable, "-m", "pip", "install", "-q", "--upgrade", *requirements]
    # stderr is folded into stdout so a single tail shows the failure in order.
    result = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if result.returncode != 0:
        output_tail = (result.stdout or "")[-4000:]
        append_event(
            events_path,
            "dependencies",
            "failed",
            "Dependency installation failed",
            {"output_tail": output_tail, "returncode": result.returncode},
        )
        raise RuntimeError(f"pip install failed with exit code {result.returncode}: {output_tail}")
    append_event(events_path, "dependencies", "success", "Worker dependencies installed")
def target_files(target_space_id: str) -> dict[str, str]:
    """Build the files of the minimal hello-world Gradio Space.

    Args:
        target_space_id: `<owner>/<name>` id of the Space; quoted in the README.

    Returns:
        Mapping of repo-relative filename to text content for `app.py`,
        `README.md` and `requirements.txt`. Each value ends with one newline.
    """
    # Plain (non-f) template: nothing is interpolated, so the braces of the
    # generated f-string need no escaping. Lines keep their indentation relative
    # to the template margin, which dedent() removes, so the result is valid Python.
    app_py = dedent("""
        import gradio as gr


        def greet(name: str) -> str:
            name = (name or "friend").strip() or "friend"
            return f"Hello {name} — this private Space was generated by Agentic Space Factory."


        demo = gr.Interface(
            fn=greet,
            inputs=gr.Textbox(label="Name", value="Hugging Face"),
            outputs=gr.Textbox(label="Result"),
            title="Generated private Space",
            description="A minimal Gradio Space created by an HF Job, then validated through the live Gradio API.",
            examples=[["Hugging Face"], ["Agentic Space Factory"]],
        )

        if __name__ == "__main__":
            demo.launch()
    """).strip() + "\n"

    # strip() makes the front-matter fence the very first line of the README.
    # Blank lines separate the Markdown paragraphs so they do not run together.
    readme = dedent(f"""
        ---
        title: Generated Private Space
        emoji: 🧪
        colorFrom: blue
        colorTo: purple
        sdk: gradio
        app_file: app.py
        python_version: "3.11"
        pinned: false
        ---

        # Generated Private Space

        This private Space was generated by **Agentic Space Factory**.

        Target repo: `{target_space_id}`

        This Phase 2 version intentionally creates only a safe hello-world Gradio app.
        Later phases will add Pi, model-card analysis, ZeroGPU templates, and automatic repair.
    """).strip() + "\n"

    requirements = "gradio>=5.0.0\n"
    return {"app.py": app_py, "README.md": readme, "requirements.txt": requirements}
def save_generated_files(run_dir: Path, files: dict[str, str]):
    """Write a copy of every generated file under `<run_dir>/generated/`.

    Args:
        run_dir: Directory of the current run; `generated/` is created inside it.
        files: Mapping of relative filename to text content. Names may contain
            sub-directories (for example `pkg/util.py`); their parents are created.
    """
    generated_dir = run_dir / "generated"
    generated_dir.mkdir(parents=True, exist_ok=True)
    for filename, content in files.items():
        destination = generated_dir / filename
        # A no-op for flat names; needed as soon as a name has a directory part.
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(content, encoding="utf-8")
def create_and_upload_space(api, token: str, target_space_id: str, files: dict[str, str], events_path: Path):
    """Create the private target Space, then upload each generated file to it.

    The repo is created with `exist_ok=False`, so a creation error (including an
    already existing Space) is logged as a "create_space" failure event and
    re-raised instead of overwriting anything. Each uploaded file gets its own
    "upload_files" success event.
    """
    # Keyword arguments common to the create and upload calls.
    repo_ref = {"repo_id": target_space_id, "repo_type": "space", "token": token}

    append_event(events_path, "create_space", "started", f"Creating private target Space {target_space_id}")
    try:
        api.create_repo(space_sdk="gradio", private=True, exist_ok=False, **repo_ref)
        append_event(
            events_path,
            "create_space",
            "success",
            "Private target Space created",
            {"target_space": target_space_id},
        )
    except Exception as exc:
        # Fail safely rather than touch a Space that may already belong to the user.
        append_event(events_path, "create_space", "failed", "Could not create target Space", {"error": str(exc)})
        raise

    append_event(events_path, "upload_files", "started", "Uploading generated files to target Space")
    for path_in_repo, content in files.items():
        api.upload_file(
            path_or_fileobj=content.encode("utf-8"),
            path_in_repo=path_in_repo,
            **repo_ref,
        )
        append_event(events_path, "upload_files", "success", f"Uploaded {path_in_repo}")
def make_gradio_client(target_space_id: str, token: str):
    """Build a gradio_client `Client` for the Space, whatever the installed version.

    The keyword used to pass credentials differs between gradio_client releases
    (`token`, `hf_token`, `api_key`, or a `headers` mapping). The constructor
    signature is inspected and the first supported keyword is used, so a
    mismatched keyword cannot raise TypeError on every polling attempt.
    """
    import inspect

    from gradio_client import Client

    accepted = inspect.signature(Client).parameters
    # Checked in priority order; the first keyword the installed Client accepts wins.
    for keyword in ("token", "hf_token", "api_key"):
        if keyword in accepted:
            return Client(target_space_id, **{keyword: token})
    if "headers" in accepted:
        return Client(target_space_id, headers={"Authorization": f"Bearer {token}"})
    # Last resort: no credential keyword is available, so rely on whatever the
    # client can pick up on its own from the environment or local cache.
    return Client(target_space_id)
def get_api_schema(client):
    """Return the client's API description, preferring the dict format.

    Calls `view_api(return_format="dict")` first. If that raises TypeError, it
    falls back to a plain `view_api()` call and returns that result.
    """
    describe = client.view_api
    try:
        schema = describe(return_format="dict")
    except TypeError:
        schema = describe()
    return schema
def extract_api_names(api_schema) -> list[str]:
    """Collect endpoint names from an API schema of unknown shape.

    Nested dicts and lists are walked depth-first. A name is taken from any
    `api_name` / `apiName` value and from any dict key that starts with "/".
    Only non-empty strings count. Names are normalised to start with "/" and
    returned in first-seen order without duplicates. Endpoints are discovered
    rather than hardcoded because an Interface may expose `/predict`, `/greet`
    or another name.
    """

    def candidates(node):
        # Yield raw candidate values in traversal order, unfiltered.
        if isinstance(node, dict):
            for key, child in node.items():
                if key in ("api_name", "apiName"):
                    yield child
                # Some schemas use the endpoint path itself as the key.
                if isinstance(key, str) and key.startswith("/"):
                    yield key
                yield from candidates(child)
        elif isinstance(node, list):
            for child in node:
                yield from candidates(child)

    # A dict doubles as an insertion-ordered set for de-duplication.
    ordered: dict[str, None] = {}
    for raw in candidates(api_schema):
        if isinstance(raw, str) and raw:
            ordered.setdefault(raw if raw.startswith("/") else f"/{raw}", None)
    return list(ordered)
def predict_with_available_endpoint(client, api_schema, value: str):
    """Call the first endpoint that accepts *value* and return `(api_name, result)`.

    Endpoints discovered in *api_schema* are tried first, then `/greet` and
    `/predict` if not already listed, and finally a call without `api_name`
    (reported as `None`).

    Raises:
        RuntimeError: if every attempt raised. The message lists each endpoint
            with the last 500 characters of its error.
    """
    candidates = extract_api_names(api_schema)
    candidates += [name for name in ("/greet", "/predict") if name not in candidates]

    failures = []
    # The trailing None stands for the call that omits api_name entirely.
    for api_name in [*candidates, None]:
        call_kwargs = {} if api_name is None else {"api_name": api_name}
        try:
            return api_name, client.predict(value, **call_kwargs)
        except Exception as exc:
            failures.append({"api_name": api_name, "error": str(exc)[-500:]})
    raise RuntimeError(f"No candidate Gradio endpoint worked: {json.dumps(failures, ensure_ascii=False)}")
def validate_live_api(target_space_id: str, token: str, events_path: Path, tests_dir: Path, timeout_seconds: int = 360, poll_interval_seconds: float = 20):
    """Poll the target Space until its Gradio API returns the expected greeting.

    Each attempt builds a client, stores the discovered schema in
    `api_schema.json`, calls an available endpoint with "Agentic Space Factory"
    and stores the outcome in `test_result.json`. An attempt passes when the
    result text contains both "Hello" and "Agentic Space Factory".

    Args:
        target_space_id: Space to validate.
        token: Credential passed to the Gradio client.
        events_path: Events file that receives the "api_validation" events.
        tests_dir: Directory for `api_schema.json` and `test_result.json`.
        timeout_seconds: Overall budget for all attempts.
        poll_interval_seconds: Pause between attempts, clamped so the last
            pause never runs past the deadline.

    Returns:
        The payload written to `test_result.json` for the passing attempt.

    Raises:
        RuntimeError: if no attempt passes before the deadline.
    """
    tests_dir.mkdir(parents=True, exist_ok=True)
    # Monotonic clock: the deadline must not move when the wall clock is adjusted.
    deadline = time.monotonic() + timeout_seconds
    last_error = None
    attempt = 0
    append_event(events_path, "api_validation", "started", "Waiting for live Gradio API to become available")
    while time.monotonic() < deadline:
        attempt += 1
        try:
            client = make_gradio_client(target_space_id, token)
            api_schema = get_api_schema(client)
            api_names = extract_api_names(api_schema)
            write_json(tests_dir / "api_schema.json", {"schema": api_schema, "api_names": api_names})
            used_api_name, result = predict_with_available_endpoint(client, api_schema, "Agentic Space Factory")
            result_text = str(result)
            ok = "Agentic Space Factory" in result_text and "Hello" in result_text
            payload = {
                "attempt": attempt,
                "target_space": target_space_id,
                "api_test_passed": ok,
                "api_name": used_api_name,
                "discovered_api_names": api_names,
                "result": result_text,
                "validated_at": now(),
            }
            write_json(tests_dir / "test_result.json", payload)
            if ok:
                append_event(
                    events_path,
                    "api_validation",
                    "success",
                    "Live Gradio API test passed",
                    {"attempt": attempt, "api_name": used_api_name, "discovered_api_names": api_names},
                )
                return payload
            last_error = f"Unexpected API result from {used_api_name}: {result_text}"
        except Exception as exc:
            # Any error is treated as "not ready yet" and retried until the deadline.
            last_error = str(exc)
        append_event(
            events_path,
            "api_validation",
            "waiting",
            "Live API not ready yet",
            {"attempt": attempt, "error": last_error[-1000:]},
        )
        # Sleep no longer than the time left; a negative remainder becomes zero.
        time.sleep(max(0.0, min(poll_interval_seconds, deadline - time.monotonic())))
    payload = {
        "target_space": target_space_id,
        "api_test_passed": False,
        "error": last_error,
        "validated_at": now(),
    }
    write_json(tests_dir / "test_result.json", payload)
    raise RuntimeError(f"Live API validation did not pass before timeout: {last_error}")
def main():
    """Run the Phase 2 worker: create a private Gradio Space and validate its live API.

    Reads RUN_ID and TARGET_SPACE_ID (required) plus optional HF_USERNAME,
    BUCKET_SOURCE, OUTPUT_ROOT and HF_TOKEN from the environment. Input
    validation failures and any exception raised during the run are routed
    through fail(), which records the error and exits with code 1. Success is
    written only after validate_live_api() returns.
    """
    env = os.environ
    run_id = env["RUN_ID"]
    hf_username = env.get("HF_USERNAME", "unknown")
    bucket_source = env.get("BUCKET_SOURCE", "unknown")
    output_root = Path(env.get("OUTPUT_ROOT", "/output"))
    target_space_id = env["TARGET_SPACE_ID"]
    token = env.get("HF_TOKEN")

    run_dir = output_root / "runs" / run_id
    events_path = run_dir / "events.jsonl"
    state_path = run_dir / "state.json"
    space_url = f"https://huggingface.co/spaces/{target_space_id}"

    append_event(events_path, "bootstrap", "started", "Private Space creation worker started")
    write_json(
        state_path,
        {
            "run_id": run_id,
            "kind": "create_private_space",
            "status": "running",
            "message": "Creating private target Space",
            "target_space": target_space_id,
            "created_by": hf_username,
            "bucket_source": bucket_source,
            "created_at": now(),
            "updated_at": now(),
        },
    )

    # Guard clauses: each fail() call exits the process, so later checks only
    # run when the earlier ones passed.
    if not token:
        fail(run_dir, events_path, "HF_TOKEN is missing from Job secrets")
    if TARGET_RE.match(target_space_id) is None:
        fail(run_dir, events_path, "Invalid TARGET_SPACE_ID", {"target_space": target_space_id})
    owner_prefix = f"{hf_username}/"
    if not target_space_id.startswith(owner_prefix):
        fail(
            run_dir,
            events_path,
            "For Phase 2, target Space must be in the signed-in user's namespace",
            {"target_space": target_space_id, "username": hf_username},
        )

    try:
        pip_install(events_path)
        # Imported only after pip_install() has run.
        from huggingface_hub import HfApi

        api = HfApi(token=token)
        whoami = api.whoami(token=token)
        append_event(events_path, "auth", "success", "Authenticated inside Job", {"whoami_name": whoami.get("name")})

        files = target_files(target_space_id)
        save_generated_files(run_dir, files)
        append_event(
            events_path,
            "generate_files",
            "success",
            "Generated minimal Gradio Space files",
            {"files": list(files)},
        )

        create_and_upload_space(api, token, target_space_id, files, events_path)
        write_json(
            run_dir / "target_space.json",
            {
                "target_space": target_space_id,
                "url": space_url,
                "private": True,
                "sdk": "gradio",
                "created_by": hf_username,
            },
        )

        validation = validate_live_api(target_space_id, token, events_path, run_dir / "tests")
        write_json(
            state_path,
            {
                "run_id": run_id,
                "kind": "create_private_space",
                "status": "success",
                "message": "Private Gradio Space created and validated through the live API.",
                "target_space": target_space_id,
                "target_space_url": space_url,
                "created_by": hf_username,
                "bucket_source": bucket_source,
                "validation": validation,
                "updated_at": now(),
                "security_notes": [
                    "The target Space was created as private.",
                    "The HF token was not printed or written to report files.",
                    "Success was declared only after a live Gradio API test passed.",
                ],
            },
        )

        report = f"""# Agentic Space Factory — Private Space Creation Report
Run ID: `{run_id}`
Status: **success**
Created private Space: [`{target_space_id}`]({space_url})
## What happened
```text
OAuth user → HF Job → private Space creation → file upload → live Gradio API validation → Bucket report
```
## Generated files
- `app.py`
- `requirements.txt`
- `README.md`
Copies are stored in:
```text
runs/{run_id}/generated/
```
## Live API validation
```json
{json.dumps(validation, indent=2, ensure_ascii=False)}
```
## Security posture
- The target Space was created as private.
- No token was printed or intentionally persisted.
- Success was declared only after the live Gradio API returned the expected output.
## Next step
Phase 3 should introduce Pi inside the Job and ask it to modify/repair this simple Space while preserving the live API validation gate.
"""
        (run_dir / "report.md").write_text(report, encoding="utf-8")
        append_event(events_path, "report_write", "success", "Wrote report.md")
        append_event(events_path, "done", "success", "Private Space creation worker completed")
    except Exception as exc:
        # SystemExit raised by fail() is not an Exception, so it passes through.
        fail(run_dir, events_path, "Private Space creation worker failed", {"error": str(exc)})


if __name__ == "__main__":
    main()
'''
def _encode(script: str) -> str:
    """Return *script* encoded as UTF-8 and then base64, as ASCII text."""
    raw = script.encode("utf-8")
    return base64.b64encode(raw).decode("ascii")
def encoded_worker_script() -> str:
    """Return the Phase 1 hello worker script as base64 text."""
    payload = _encode(HELLO_WORKER_SCRIPT)
    return payload
def encoded_create_space_worker_script() -> str:
    """Return the Phase 2 private Space creation worker script as base64 text."""
    payload = _encode(CREATE_SPACE_WORKER_SCRIPT)
    return payload
def python_decode_and_run_command() -> list[str]:
    """Build the argv list handed to `run_job`.

    The Job image only needs Python. The inline runner reads the base64 worker
    script from the WORKER_SCRIPT_B64 environment variable, writes it to
    /tmp/space_factory_worker.py, runs it with the same interpreter and exits
    with the worker's return code.
    """
    runner_lines = (
        "import base64, os, pathlib, subprocess, sys",
        "script = base64.b64decode(os.environ['WORKER_SCRIPT_B64']).decode('utf-8')",
        "path = pathlib.Path('/tmp/space_factory_worker.py')",
        "path.write_text(script, encoding='utf-8')",
        "raise SystemExit(subprocess.call([sys.executable, str(path)]))",
    )
    return ["python", "-c", "\n".join(runner_lines)]