fffiloni commited on
Commit
99a2759
Β·
verified Β·
1 Parent(s): 7ff46f1

Upload worker_payload.py

Browse files
Files changed (1) hide show
  1. src/worker_payload.py +1 -1
src/worker_payload.py CHANGED
@@ -566,7 +566,7 @@ def _encode(script: str) -> str:
566
 
567
 
568
 
569
- PI_MODEL_CARD_WORKER_SCRIPT = 'import json\nimport os\nimport re\nimport shutil\nimport subprocess\nimport sys\nimport time\nfrom datetime import datetime, timezone\nfrom pathlib import Path\nfrom textwrap import dedent\n\n\nTARGET_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{1,95}/[A-Za-z0-9][A-Za-z0-9._-]{1,95}$")\nSUPPORTED_TASKS = {"text-generation", "text2text-generation", "fill-mask", "text-classification", "sentiment-analysis"}\n\n\ndef now():\n return datetime.now(timezone.utc).isoformat()\n\n\ndef write_json(path: Path, payload: dict):\n path.parent.mkdir(parents=True, exist_ok=True)\n path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\\n", encoding="utf-8")\n\n\ndef append_event(path: Path, step: str, status: str, message: str, data: dict | None = None):\n path.parent.mkdir(parents=True, exist_ok=True)\n event = {"ts": now(), "step": step, "status": status, "message": message, "data": data or {}}\n line = json.dumps(event, ensure_ascii=False)\n with path.open("a", encoding="utf-8") as f:\n f.write(line + "\\n")\n print(line, flush=True)\n\n\ndef redact_text(text: str | None) -> str:\n if not text:\n return ""\n value = text\n for secret_name in ["HF_TOKEN", "HUGGING_FACE_HUB_TOKEN"]:\n secret = os.environ.get(secret_name)\n if secret:\n value = value.replace(secret, "[REDACTED]")\n value = re.sub(r"Bearer\\s+[A-Za-z0-9_\\-.=]+", "Bearer [REDACTED]", value)\n value = re.sub(r"hf_[A-Za-z0-9_\\-]{10,}", "hf_[REDACTED]", value)\n return value\n\n\ndef safe_details(details: dict | None) -> dict:\n if not details:\n return {}\n try:\n return json.loads(redact_text(json.dumps(details, ensure_ascii=False)))\n except Exception:\n return {"redacted_details": redact_text(str(details))[-4000:]}\n\n\ndef fail(run_dir: Path, events_path: Path, message: str, details: dict | None = None, status: str = "failed"):\n safe = safe_details(details)\n append_event(events_path, "failure", "failed", message, safe)\n write_json(run_dir / "state.json", {\n "run_id": os.environ.get("RUN_ID"),\n "kind": "pi_model_card",\n "status": status,\n "message": message,\n "updated_at": now(),\n "details": safe,\n })\n report = f"""# Agentic Space Factory β€” Model Card Space Report\n\nStatus: **{status}**\n\n{message}\n\n```json\n{json.dumps(safe, indent=2, ensure_ascii=False)}\n```\n"""\n (run_dir / "report.md").write_text(report, encoding="utf-8")\n raise SystemExit(1)\n\n\ndef run_cmd(cmd: list[str], *, cwd: Path | None = None, env: dict | None = None, timeout: int = 600):\n result = subprocess.run(cmd, cwd=str(cwd) if cwd else None, env=env, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=timeout)\n return result.returncode, redact_text(result.stdout)\n\n\ndef install_python_deps(events_path: Path):\n append_event(events_path, "dependencies", "started", "Installing Python worker dependencies")\n code, out = run_cmd([sys.executable, "-m", "pip", "install", "-q", "--upgrade", "huggingface_hub>=1.0.0", "gradio_client>=2.0.0"], timeout=600)\n if code != 0:\n append_event(events_path, "dependencies", "failed", "Python dependency installation failed", {"output_tail": out[-4000:]})\n raise RuntimeError(out)\n append_event(events_path, "dependencies", "success", "Python worker dependencies installed")\n\n\ndef ensure_node(events_path: Path):\n node = shutil.which("node")\n npm = shutil.which("npm")\n if node and npm:\n _, node_v = run_cmd([node, "--version"], timeout=30)\n _, npm_v = run_cmd([npm, "--version"], timeout=30)\n append_event(events_path, "node", "success", "Node/npm already available", {"node": node_v.strip(), "npm": npm_v.strip()})\n return\n append_event(events_path, "node", "started", "Installing nodejs/npm through apt-get")\n code, out = run_cmd(["bash", "-lc", "apt-get update -qq && apt-get install -y -qq nodejs npm"], timeout=600)\n if code != 0:\n append_event(events_path, "node", "failed", "Could not install nodejs/npm", {"output_tail": out[-4000:]})\n raise RuntimeError(out)\n append_event(events_path, "node", "success", "Installed nodejs/npm")\n\n\ndef install_pi(events_path: Path):\n ensure_node(events_path)\n append_event(events_path, "pi_install", "started", "Installing Pi coding agent from npm")\n code, out = run_cmd(["npm", "install", "-g", "@mariozechner/pi-coding-agent"], timeout=900)\n if code != 0:\n append_event(events_path, "pi_install", "failed", "Pi npm installation failed", {"output_tail": out[-4000:]})\n raise RuntimeError(out)\n code, version = run_cmd(["pi", "--version"], timeout=60)\n append_event(events_path, "pi_install", "success", "Pi installed", {"version_output": version.strip()[-300:]})\n\n\ndef configure_pi(events_path: Path, model: str):\n pi_dir = Path.home() / ".pi" / "agent"\n pi_dir.mkdir(parents=True, exist_ok=True)\n (pi_dir / "auth.json").write_text(json.dumps({"huggingface": {"type": "api_key", "key": os.environ.get("HF_TOKEN", "")}}, indent=2), encoding="utf-8")\n (pi_dir / "settings.json").write_text(json.dumps({"model": model, "provider": "huggingface", "autoRun": True, "autoApply": True}, indent=2), encoding="utf-8")\n append_event(events_path, "pi_config", "success", "Configured Pi", {"model": model})\n\n\ndef sanitize_model_id(model_id: str) -> str:\n model_id = (model_id or "").strip().replace("https://huggingface.co/", "")\n model_id = model_id.split("?", 1)[0].strip("/")\n if not re.match(r"^[A-Za-z0-9_.-]+/[A-Za-z0-9_.-]+$", model_id):\n raise ValueError("MODEL_ID must look like owner/model-name")\n return model_id\n\n\ndef analyze_model(model_id: str, token: str, run_dir: Path, events_path: Path) -> dict:\n from huggingface_hub import HfApi, hf_hub_download\n append_event(events_path, "model_analysis", "started", "Fetching model metadata", {"model_id": model_id})\n api = HfApi(token=token)\n info = api.model_info(model_id, token=token, files_metadata=False)\n siblings = [getattr(s, "rfilename", "") for s in (info.siblings or [])]\n pipeline_tag = getattr(info, "pipeline_tag", None)\n library_name = getattr(info, "library_name", None)\n tags = list(getattr(info, "tags", []) or [])\n readme_excerpt = ""\n try:\n readme_path = hf_hub_download(repo_id=model_id, filename="README.md", token=token)\n readme_text = Path(readme_path).read_text(encoding="utf-8", errors="ignore")\n readme_excerpt = readme_text[:6000]\n except Exception as exc:\n readme_excerpt = f"Could not download README.md: {exc}"\n task = pipeline_tag or "text-generation"\n if task == "sentiment-analysis":\n task = "text-classification"\n supported = task in SUPPORTED_TASKS\n analysis = {\n "model_id": model_id,\n "pipeline_tag": pipeline_tag,\n "library_name": library_name,\n "tags": tags[:80],\n "siblings": siblings[:120],\n "selected_task": task,\n "template": "transformers_text_pipeline" if supported else "unsupported",\n "supported": supported,\n "confidence": 0.8 if supported else 0.25,\n "risks": [],\n "readme_excerpt": readme_excerpt,\n "evidence": [f"pipeline_tag={pipeline_tag}", f"library_name={library_name}", f"files={\', \'.join(siblings[:12])}"],\n }\n if not supported:\n analysis["risks"].append("Phase 5 only supports simple Transformers text pipeline tasks.")\n if "gated" in tags:\n analysis["risks"].append("Model appears gated; generated Space will not receive OAuth token as a secret in Phase 5.")\n write_json(run_dir / "model_analysis.json", analysis)\n append_event(events_path, "model_analysis", "success" if supported else "unsupported", "Model metadata analyzed", {"selected_task": task, "supported": supported, "confidence": analysis["confidence"]})\n return analysis\n\n\ndef render_app(model_id: str, task: str) -> str:\n return dedent(f\'\'\'\n import gradio as gr\n from transformers import pipeline\n\n MODEL_ID = {model_id!r}\n TASK = {task!r}\n\n pipe = pipeline(TASK, model=MODEL_ID)\n\n def run_model(text: str) -> str:\n text = (text or "Hello from Agentic Space Factory").strip() or "Hello from Agentic Space Factory"\n if TASK == "text-generation":\n result = pipe(text, max_new_tokens=32, do_sample=False)\n return result[0].get("generated_text", str(result))\n if TASK == "text2text-generation":\n result = pipe(text, max_new_tokens=64)\n return result[0].get("generated_text", str(result))\n if TASK == "fill-mask":\n mask = getattr(getattr(pipe, "tokenizer", None), "mask_token", None) or "<mask>"\n if mask not in text:\n text = f"Hugging Face is {{mask}}."\n result = pipe(text)\n return str(result[:3] if isinstance(result, list) else result)\n if TASK in {"text-classification", "sentiment-analysis"}:\n return str(pipe(text))\n return str(pipe(text))\n\n demo = gr.Interface(\n fn=run_model,\n inputs=gr.Textbox(label="Input", value="Hello from Agentic Space Factory"),\n outputs=gr.Textbox(label="Model output"),\n title=f"Model demo: {{MODEL_ID}}",\n description="Generated by Agentic Space Factory from model metadata. Pi adapted this model app.",\n examples=[["Hello from Agentic Space Factory"], ["Hugging Face is awesome"]],\n )\n\n if __name__ == "__main__":\n demo.launch()\n \'\'\').strip() + "\\n"\n\n\ndef render_readme(model_id: str, task: str, target_space_id: str) -> str:\n return dedent(f\'\'\'\n ---\n title: Model Card Generated Space\n emoji: πŸ€–\n colorFrom: green\n colorTo: blue\n sdk: gradio\n app_file: app.py\n python_version: "3.11"\n pinned: false\n ---\n\n # Model Card Generated Space\n\n This private Space was generated by Agentic Space Factory from `{model_id}`.\n\n - Target Space: `{target_space_id}`\n - Selected task: `{task}`\n - Template: `transformers_text_pipeline`\n\n Phase 5 is intentionally limited to simple Transformers text pipelines.\n \'\'\').strip() + "\\n"\n\n\ndef prepare_workspace(workspace: Path, run_dir: Path, model_id: str, task: str, target_space_id: str, analysis: dict, events_path: Path):\n workspace.mkdir(parents=True, exist_ok=True)\n (workspace / "app.py").write_text(render_app(model_id, task), encoding="utf-8")\n (workspace / "README.md").write_text(render_readme(model_id, task, target_space_id), encoding="utf-8")\n (workspace / "requirements.txt").write_text("gradio>=5.0.0\\ntransformers>=4.45.0\\ntorch\\nsafetensors\\n", encoding="utf-8")\n goal = f"""You are running inside a Hugging Face Job as a coding agent.\n\nGoal: adapt the provided minimal Gradio app for the model `{model_id}` and task `{task}`.\n\nFirst, read the HF Spaces Agent Quickstart gist:\nhttps://gist.github.com/gary149/2aba2962375fa9ca56bb9ef53f00b73d\n\nRules for this Phase 5 smoke test:\n- Work only in the current workspace.\n- Do not create, delete, publish, or modify Hugging Face repos. The wrapper will create/upload the private Space.\n- Preserve `app.py`, `README.md`, and `requirements.txt`.\n- Preserve the `run_model` function and a Gradio Interface or Blocks app.\n- Preserve the exact marker phrase: Pi adapted this model app.\n- Keep the app simple and CPU-friendly.\n- Do not print secrets.\n- Write a short summary to `PI_SUMMARY.md`.\n\nModel analysis:\n```json\n{json.dumps({k: v for k, v in analysis.items() if k != \'readme_excerpt\'}, indent=2, ensure_ascii=False)}\n```\n\nREADME excerpt:\n{analysis.get(\'readme_excerpt\', \'\')[:3000]}\n"""\n (workspace / "GOAL.md").write_text(goal, encoding="utf-8")\n save_generated_files(run_dir, workspace)\n append_event(events_path, "workspace", "success", "Prepared model app workspace", {"files": ["app.py", "README.md", "requirements.txt", "GOAL.md"]})\n\n\ndef save_generated_files(run_dir: Path, workspace: Path):\n generated_dir = run_dir / "generated"\n generated_dir.mkdir(parents=True, exist_ok=True)\n for filename in ["app.py", "README.md", "requirements.txt", "GOAL.md", "PI_SUMMARY.md"]:\n path = workspace / filename\n if path.exists():\n (generated_dir / filename).write_text(path.read_text(encoding="utf-8", errors="ignore"), encoding="utf-8")\n\n\ndef run_pi(workspace: Path, run_dir: Path, events_path: Path, model: str):\n append_event(events_path, "pi_run", "started", "Running Pi on model-card workspace", {"model": model})\n env = os.environ.copy()\n env["HF_TOKEN"] = os.environ.get("HF_TOKEN", "")\n code, out = run_cmd(["pi", "-p", (workspace / "GOAL.md").read_text(encoding="utf-8")], cwd=workspace, env=env, timeout=1800)\n logs_dir = run_dir / "logs"\n logs_dir.mkdir(parents=True, exist_ok=True)\n (logs_dir / "pi_output.txt").write_text(out, encoding="utf-8")\n save_generated_files(run_dir, workspace)\n if code != 0:\n append_event(events_path, "pi_run", "failed", "Pi returned a non-zero exit code", {"returncode": code, "output_tail": out[-4000:]})\n raise RuntimeError("Pi failed. See logs/pi_output.txt")\n app_text = (workspace / "app.py").read_text(encoding="utf-8", errors="ignore")\n if "Pi adapted this model app" not in app_text:\n raise RuntimeError("Pi/app verification failed: expected marker phrase missing from app.py")\n append_event(events_path, "pi_run", "success", "Pi completed and preserved required marker")\n\n\ndef collect_pi_traces(run_dir: Path, events_path: Path):\n src = Path.home() / ".pi" / "agent" / "sessions"\n raw_dir = run_dir / "traces" / "raw"\n redacted_dir = run_dir / "traces" / "redacted"\n raw_dir.mkdir(parents=True, exist_ok=True)\n redacted_dir.mkdir(parents=True, exist_ok=True)\n count = 0\n if src.exists():\n for path in src.rglob("*.jsonl"):\n count += 1\n text = path.read_text(encoding="utf-8", errors="ignore")\n (raw_dir / path.name).write_text(redact_text(text), encoding="utf-8")\n (redacted_dir / path.name).write_text(redact_text(text), encoding="utf-8")\n append_event(events_path, "traces", "success", "Collected Pi traces", {"count": count})\n\n\ndef make_gradio_client(target_space_id: str, token: str):\n import inspect\n from gradio_client import Client\n params = inspect.signature(Client).parameters\n if "token" in params:\n return Client(target_space_id, token=token)\n if "hf_token" in params:\n return Client(target_space_id, hf_token=token)\n if "api_key" in params:\n return Client(target_space_id, api_key=token)\n if "headers" in params:\n return Client(target_space_id, headers={"Authorization": f"Bearer {token}"})\n return Client(target_space_id)\n\n\ndef get_api_schema(client):\n try:\n return client.view_api(return_format="dict")\n except TypeError:\n return client.view_api()\n\n\ndef extract_api_names(api_schema) -> list[str]:\n names = []\n def add(value):\n if not value or not isinstance(value, str):\n return\n name = value if value.startswith("/") else f"/{value}"\n if name not in names:\n names.append(name)\n def walk(obj):\n if isinstance(obj, dict):\n for key, value in obj.items():\n if key in {"api_name", "apiName"}:\n add(value)\n if isinstance(key, str) and key.startswith("/"):\n add(key)\n walk(value)\n elif isinstance(obj, list):\n for item in obj:\n walk(item)\n walk(api_schema)\n return names\n\n\ndef predict_with_available_endpoint(client, api_schema, value: str):\n candidates = extract_api_names(api_schema)\n for fallback in ["/run_model", "/predict", "/greet"]:\n if fallback not in candidates:\n candidates.append(fallback)\n errors = []\n for api_name in candidates:\n try:\n return api_name, client.predict(value, api_name=api_name)\n except Exception as exc:\n errors.append({"api_name": api_name, "error": str(exc)[-500:]})\n try:\n return None, client.predict(value)\n except Exception as exc:\n errors.append({"api_name": None, "error": str(exc)[-500:]})\n raise RuntimeError(f"No candidate Gradio endpoint worked: {json.dumps(errors, ensure_ascii=False)}")\n\n\ndef validate_live_api(target_space_id: str, token: str, events_path: Path, tests_dir: Path, timeout_seconds: int = 900):\n tests_dir.mkdir(parents=True, exist_ok=True)\n deadline = time.time() + timeout_seconds\n last_error = None\n attempt = 0\n append_event(events_path, "api_validation", "started", "Waiting for live model Gradio API to become available")\n while time.time() < deadline:\n attempt += 1\n try:\n client = make_gradio_client(target_space_id, token)\n api_schema = get_api_schema(client)\n api_names = extract_api_names(api_schema)\n write_json(tests_dir / "api_schema.json", {"schema": api_schema, "api_names": api_names})\n used_api_name, result = predict_with_available_endpoint(client, api_schema, "Hello from Agentic Space Factory")\n result_text = str(result)\n ok = bool(result_text and len(result_text.strip()) >= 2)\n payload = {"attempt": attempt, "target_space": target_space_id, "api_test_passed": ok, "api_name": used_api_name, "discovered_api_names": api_names, "result": result_text[:4000], "validated_at": now()}\n write_json(tests_dir / "test_result.json", payload)\n if ok:\n append_event(events_path, "api_validation", "success", "Live model API test passed", {"attempt": attempt, "api_name": used_api_name, "discovered_api_names": api_names})\n return payload\n last_error = f"Unexpected empty API result from {used_api_name}: {result_text}"\n except Exception as exc:\n last_error = str(exc)\n append_event(events_path, "api_validation", "waiting", "Live API not ready yet", {"attempt": attempt, "error": last_error[-1000:]})\n time.sleep(20)\n payload = {"target_space": target_space_id, "api_test_passed": False, "error": last_error, "validated_at": now()}\n write_json(tests_dir / "test_result.json", payload)\n raise RuntimeError(f"Live API validation did not pass before timeout: {last_error}")\n\n\ndef create_and_upload_space(api, token: str, target_space_id: str, workspace: Path, events_path: Path):\n append_event(events_path, "create_space", "started", f"Creating private target Space {target_space_id}")\n api.create_repo(repo_id=target_space_id, repo_type="space", space_sdk="gradio", private=True, exist_ok=False, token=token)\n append_event(events_path, "create_space", "success", "Private target Space created", {"target_space": target_space_id})\n append_event(events_path, "upload_files", "started", "Uploading model app files to target Space")\n for path_in_repo in ["app.py", "README.md", "requirements.txt"]:\n api.upload_file(path_or_fileobj=(workspace / path_in_repo).read_bytes(), path_in_repo=path_in_repo, repo_id=target_space_id, repo_type="space", token=token)\n append_event(events_path, "upload_files", "success", f"Uploaded {path_in_repo}")\n\n\ndef main():\n run_id = os.environ["RUN_ID"]\n hf_username = os.environ.get("HF_USERNAME", "unknown")\n bucket_source = os.environ.get("BUCKET_SOURCE", "unknown")\n output_root = Path(os.environ.get("OUTPUT_ROOT", "/output"))\n target_space_id = os.environ["TARGET_SPACE_ID"]\n model_id = sanitize_model_id(os.environ.get("MODEL_ID", ""))\n pi_model = os.environ.get("PI_MODEL") or "moonshotai/Kimi-K2.5"\n token = os.environ.get("HF_TOKEN")\n run_dir = output_root / "runs" / run_id\n events_path = run_dir / "events.jsonl"\n state_path = run_dir / "state.json"\n workspace = Path("/tmp") / f"space-factory-model-{run_id}"\n append_event(events_path, "bootstrap", "started", "Pi model-card worker started", {"model_id": model_id})\n write_json(state_path, {"run_id": run_id, "kind": "pi_model_card", "status": "running", "message": "Analyzing model card and generating a private model demo Space", "model_id": model_id, "target_space": target_space_id, "created_by": hf_username, "bucket_source": bucket_source, "created_at": now(), "updated_at": now()})\n if not token:\n fail(run_dir, events_path, "HF_TOKEN is missing from Job secrets")\n if not TARGET_RE.match(target_space_id):\n fail(run_dir, events_path, "Invalid TARGET_SPACE_ID", {"target_space": target_space_id})\n if not target_space_id.startswith(f"{hf_username}/"):\n fail(run_dir, events_path, "Target Space must be in the signed-in user\'s namespace", {"target_space": target_space_id, "username": hf_username})\n try:\n install_python_deps(events_path)\n from huggingface_hub import HfApi\n api = HfApi(token=token)\n whoami = api.whoami(token=token)\n append_event(events_path, "auth", "success", "Authenticated inside Job", {"whoami_name": whoami.get("name")})\n analysis = analyze_model(model_id, token, run_dir, events_path)\n if not analysis.get("supported"):\n fail(run_dir, events_path, "Model task is unsupported by Phase 5", {"model_analysis": {k: v for k, v in analysis.items() if k != "readme_excerpt"}}, status="unsupported")\n prepare_workspace(workspace, run_dir, model_id, analysis["selected_task"], target_space_id, analysis, events_path)\n install_pi(events_path)\n configure_pi(events_path, pi_model)\n run_pi(workspace, run_dir, events_path, pi_model)\n collect_pi_traces(run_dir, events_path)\n create_and_upload_space(api, token, target_space_id, workspace, events_path)\n write_json(run_dir / "target_space.json", {"target_space": target_space_id, "url": f"https://huggingface.co/spaces/{target_space_id}", "private": True, "sdk": "gradio", "created_by": hf_username, "model_id": model_id})\n validation = validate_live_api(target_space_id, token, events_path, run_dir / "tests")\n final_state = {"run_id": run_id, "kind": "pi_model_card", "status": "success", "message": "Model-card generated private Space created and validated through the live API.", "model_id": model_id, "target_space": target_space_id, "target_space_url": f"https://huggingface.co/spaces/{target_space_id}", "created_by": hf_username, "bucket_source": bucket_source, "model_analysis": {k: v for k, v in analysis.items() if k != "readme_excerpt"}, "validation": validation, "updated_at": now(), "security_notes": ["The target Space was created as private.", "The HF token was not printed or intentionally persisted.", "Phase 5 supports only simple public text pipeline models.", "Success was declared only after the wrapper live API test passed."]}\n write_json(state_path, final_state)\n report = f"""# Agentic Space Factory β€” Model Card Space Report\n\nRun ID: `{run_id}`\n\nStatus: **success**\n\nCreated private model demo Space: [`{target_space_id}`](https://huggingface.co/spaces/{target_space_id})\n\n## Model\n\n- Model ID: `{model_id}`\n- Selected task: `{analysis[\'selected_task\']}`\n- Template: `{analysis[\'template\']}`\n- Pi model: `{pi_model}`\n\n## What happened\n\n```text\nOAuth user β†’ HF Job β†’ model metadata analysis β†’ Pi adapts app.py β†’ private Space creation β†’ live API validation β†’ Bucket report\n```\n\n## Live API validation\n\n```json\n{json.dumps(validation, indent=2, ensure_ascii=False)}\n```\n\n## Security posture\n\n- The target Space was created as private.\n- No token was printed or intentionally persisted.\n- Pi was instructed not to create/delete/publish repos; the wrapper performed Hub operations.\n- Success was declared only after the live API returned a non-empty result.\n\n## Next step\n\nPhase 6 should add a ZeroGPU Diffusers template and stricter model compatibility gating.\n"""\n (run_dir / "report.md").write_text(report, encoding="utf-8")\n append_event(events_path, "report_write", "success", "Wrote report.md")\n append_event(events_path, "done", "success", "Pi model-card worker completed")\n except SystemExit:\n raise\n except Exception as exc:\n collect_pi_traces(run_dir, events_path)\n fail(run_dir, events_path, "Pi model-card worker failed", {"error": str(exc)})\n\n\nif __name__ == "__main__":\n main()\n'
570
 
571
  def encoded_worker_script() -> str:
572
  """Return the base64-encoded Phase 1 hello worker script."""
 
566
 
567
 
568
 
569
+ PI_MODEL_CARD_WORKER_SCRIPT = 'import json\nimport os\nimport re\nimport shutil\nimport subprocess\nimport sys\nimport time\nfrom datetime import datetime, timezone\nfrom pathlib import Path\nfrom textwrap import dedent\n\n\nTARGET_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{1,95}/[A-Za-z0-9][A-Za-z0-9._-]{1,95}$")\nSUPPORTED_TASKS = {"text-generation", "text2text-generation", "fill-mask", "text-classification", "sentiment-analysis"}\n\n\ndef now():\n return datetime.now(timezone.utc).isoformat()\n\n\ndef write_json(path: Path, payload: dict):\n path.parent.mkdir(parents=True, exist_ok=True)\n path.write_text(json.dumps(payload, indent=2, ensure_ascii=False) + "\\n", encoding="utf-8")\n\n\ndef append_event(path: Path, step: str, status: str, message: str, data: dict | None = None):\n path.parent.mkdir(parents=True, exist_ok=True)\n event = {"ts": now(), "step": step, "status": status, "message": message, "data": data or {}}\n line = json.dumps(event, ensure_ascii=False)\n with path.open("a", encoding="utf-8") as f:\n f.write(line + "\\n")\n print(line, flush=True)\n\n\ndef redact_text(text: str | None) -> str:\n if not text:\n return ""\n value = text\n for secret_name in ["HF_TOKEN", "HUGGING_FACE_HUB_TOKEN"]:\n secret = os.environ.get(secret_name)\n if secret:\n value = value.replace(secret, "[REDACTED]")\n value = re.sub(r"Bearer\\s+[A-Za-z0-9_\\-.=]+", "Bearer [REDACTED]", value)\n value = re.sub(r"hf_[A-Za-z0-9_\\-]{10,}", "hf_[REDACTED]", value)\n return value\n\n\ndef safe_details(details: dict | None) -> dict:\n if not details:\n return {}\n try:\n return json.loads(redact_text(json.dumps(details, ensure_ascii=False)))\n except Exception:\n return {"redacted_details": redact_text(str(details))[-4000:]}\n\n\ndef fail(run_dir: Path, events_path: Path, message: str, details: dict | None = None, status: str = "failed"):\n safe = safe_details(details)\n append_event(events_path, "failure", "failed", message, safe)\n write_json(run_dir / "state.json", {\n "run_id": os.environ.get("RUN_ID"),\n "kind": "pi_model_card",\n "status": status,\n "message": message,\n "updated_at": now(),\n "details": safe,\n })\n report = f"""# Agentic Space Factory β€” Model Card Space Report\n\nStatus: **{status}**\n\n{message}\n\n```json\n{json.dumps(safe, indent=2, ensure_ascii=False)}\n```\n"""\n (run_dir / "report.md").write_text(report, encoding="utf-8")\n raise SystemExit(1)\n\n\ndef run_cmd(cmd: list[str], *, cwd: Path | None = None, env: dict | None = None, timeout: int = 600):\n result = subprocess.run(cmd, cwd=str(cwd) if cwd else None, env=env, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=timeout)\n return result.returncode, redact_text(result.stdout)\n\n\ndef install_python_deps(events_path: Path):\n append_event(events_path, "dependencies", "started", "Installing Python worker dependencies")\n code, out = run_cmd([sys.executable, "-m", "pip", "install", "-q", "--upgrade", "huggingface_hub>=1.0.0", "gradio_client>=2.0.0"], timeout=600)\n if code != 0:\n append_event(events_path, "dependencies", "failed", "Python dependency installation failed", {"output_tail": out[-4000:]})\n raise RuntimeError(out)\n append_event(events_path, "dependencies", "success", "Python worker dependencies installed")\n\n\ndef ensure_node(events_path: Path):\n node = shutil.which("node")\n npm = shutil.which("npm")\n if node and npm:\n _, node_v = run_cmd([node, "--version"], timeout=30)\n _, npm_v = run_cmd([npm, "--version"], timeout=30)\n append_event(events_path, "node", "success", "Node/npm already available", {"node": node_v.strip(), "npm": npm_v.strip()})\n return\n append_event(events_path, "node", "started", "Installing nodejs/npm through apt-get")\n code, out = run_cmd(["bash", "-lc", "apt-get update -qq && apt-get install -y -qq nodejs npm"], timeout=600)\n if code != 0:\n append_event(events_path, "node", "failed", "Could not install nodejs/npm", {"output_tail": out[-4000:]})\n raise RuntimeError(out)\n append_event(events_path, "node", "success", "Installed nodejs/npm")\n\n\ndef install_pi(events_path: Path):\n ensure_node(events_path)\n append_event(events_path, "pi_install", "started", "Installing Pi coding agent from npm")\n code, out = run_cmd(["npm", "install", "-g", "@mariozechner/pi-coding-agent"], timeout=900)\n if code != 0:\n append_event(events_path, "pi_install", "failed", "Pi npm installation failed", {"output_tail": out[-4000:]})\n raise RuntimeError(out)\n code, version = run_cmd(["pi", "--version"], timeout=60)\n append_event(events_path, "pi_install", "success", "Pi installed", {"version_output": version.strip()[-300:]})\n\n\ndef configure_pi(events_path: Path, model: str):\n pi_dir = Path.home() / ".pi" / "agent"\n pi_dir.mkdir(parents=True, exist_ok=True)\n (pi_dir / "auth.json").write_text(json.dumps({"huggingface": {"type": "api_key", "key": os.environ.get("HF_TOKEN", "")}}, indent=2), encoding="utf-8")\n (pi_dir / "settings.json").write_text(json.dumps({"model": model, "provider": "huggingface", "autoRun": True, "autoApply": True}, indent=2), encoding="utf-8")\n append_event(events_path, "pi_config", "success", "Configured Pi", {"model": model})\n\n\ndef sanitize_model_id(model_id: str) -> str:\n model_id = (model_id or "").strip().replace("https://huggingface.co/", "")\n model_id = model_id.split("?", 1)[0].strip("/")\n if not re.match(r"^[A-Za-z0-9_.-]+/[A-Za-z0-9_.-]+$", model_id):\n raise ValueError("MODEL_ID must look like owner/model-name")\n return model_id\n\n\ndef analyze_model(model_id: str, token: str, run_dir: Path, events_path: Path) -> dict:\n from huggingface_hub import HfApi, hf_hub_download\n append_event(events_path, "model_analysis", "started", "Fetching model metadata", {"model_id": model_id})\n api = HfApi(token=token)\n info = api.model_info(model_id, token=token, files_metadata=False)\n siblings = [getattr(s, "rfilename", "") for s in (info.siblings or [])]\n pipeline_tag = getattr(info, "pipeline_tag", None)\n library_name = getattr(info, "library_name", None)\n tags = list(getattr(info, "tags", []) or [])\n readme_excerpt = ""\n try:\n readme_path = hf_hub_download(repo_id=model_id, filename="README.md", token=token)\n readme_text = Path(readme_path).read_text(encoding="utf-8", errors="ignore")\n readme_excerpt = readme_text[:6000]\n except Exception as exc:\n readme_excerpt = f"Could not download README.md: {exc}"\n task = pipeline_tag or "text-generation"\n if task == "sentiment-analysis":\n task = "text-classification"\n supported = task in SUPPORTED_TASKS\n analysis = {\n "model_id": model_id,\n "pipeline_tag": pipeline_tag,\n "library_name": library_name,\n "tags": tags[:80],\n "siblings": siblings[:120],\n "selected_task": task,\n "template": "transformers_text_pipeline" if supported else "unsupported",\n "supported": supported,\n "confidence": 0.8 if supported else 0.25,\n "risks": [],\n "readme_excerpt": readme_excerpt,\n "evidence": [f"pipeline_tag={pipeline_tag}", f"library_name={library_name}", f"files={\', \'.join(siblings[:12])}"],\n }\n if not supported:\n analysis["risks"].append("Phase 5 only supports simple Transformers text pipeline tasks.")\n if "gated" in tags:\n analysis["risks"].append("Model appears gated; generated Space will not receive OAuth token as a secret in Phase 5.")\n write_json(run_dir / "model_analysis.json", analysis)\n append_event(events_path, "model_analysis", "success" if supported else "unsupported", "Model metadata analyzed", {"selected_task": task, "supported": supported, "confidence": analysis["confidence"]})\n return analysis\n\n\ndef render_app(model_id: str, task: str) -> str:\n return dedent(f\'\'\'\n import gradio as gr\n from transformers import pipeline\n\n MODEL_ID = {model_id!r}\n TASK = {task!r}\n\n pipe = pipeline(TASK, model=MODEL_ID)\n\n def run_model(text: str) -> str:\n text = (text or "Hello from Agentic Space Factory").strip() or "Hello from Agentic Space Factory"\n if TASK == "text-generation":\n result = pipe(text, max_new_tokens=32, do_sample=False)\n return result[0].get("generated_text", str(result))\n if TASK == "text2text-generation":\n result = pipe(text, max_new_tokens=64)\n return result[0].get("generated_text", str(result))\n if TASK == "fill-mask":\n mask = getattr(getattr(pipe, "tokenizer", None), "mask_token", None) or "<mask>"\n if mask not in text:\n text = f"Hugging Face is {{mask}}."\n result = pipe(text)\n return str(result[:3] if isinstance(result, list) else result)\n if TASK in {"text-classification", "sentiment-analysis"}:\n return str(pipe(text))\n return str(pipe(text))\n\n demo = gr.Interface(\n fn=run_model,\n inputs=gr.Textbox(label="Input", value="Hello from Agentic Space Factory"),\n outputs=gr.Textbox(label="Model output"),\n title=f"Model demo: {{MODEL_ID}}",\n description="Generated by Agentic Space Factory from model metadata. Pi adapted this model app.",\n examples=[["Hello from Agentic Space Factory"], ["Hugging Face is awesome"]],\n )\n\n if __name__ == "__main__":\n demo.launch()\n \'\'\').strip() + "\\n"\n\n\ndef render_readme(model_id: str, task: str, target_space_id: str) -> str:\n return dedent(f\'\'\'\n ---\n title: Model Card Generated Space\n emoji: πŸ€–\n colorFrom: green\n colorTo: blue\n sdk: gradio\n app_file: app.py\n python_version: "3.11"\n pinned: false\n ---\n\n # Model Card Generated Space\n\n This private Space was generated by Agentic Space Factory from `{model_id}`.\n\n - Target Space: `{target_space_id}`\n - Selected task: `{task}`\n - Template: `transformers_text_pipeline`\n\n Phase 5 is intentionally limited to simple Transformers text pipelines.\n \'\'\').strip() + "\\n"\n\n\ndef prepare_workspace(workspace: Path, run_dir: Path, model_id: str, task: str, target_space_id: str, analysis: dict, events_path: Path):\n workspace.mkdir(parents=True, exist_ok=True)\n (workspace / "app.py").write_text(render_app(model_id, task), encoding="utf-8")\n (workspace / "README.md").write_text(render_readme(model_id, task, target_space_id), encoding="utf-8")\n (workspace / "requirements.txt").write_text("gradio>=5.0.0\\nhuggingface_hub>=0.34.0,<1.0.0\\ntransformers>=4.45.0\\ntorch\\nsafetensors\\n", encoding="utf-8")\n goal = f"""You are running inside a Hugging Face Job as a coding agent.\n\nGoal: adapt the provided minimal Gradio app for the model `{model_id}` and task `{task}`.\n\nFirst, read the HF Spaces Agent Quickstart gist:\nhttps://gist.github.com/gary149/2aba2962375fa9ca56bb9ef53f00b73d\n\nRules for this Phase 5 smoke test:\n- Work only in the current workspace.\n- Do not create, delete, publish, or modify Hugging Face repos. The wrapper will create/upload the private Space.\n- Preserve `app.py`, `README.md`, and `requirements.txt`.\n- Do not remove the `huggingface_hub>=0.34.0,<1.0.0` compatibility pin from requirements.txt.\n- Preserve the `run_model` function and a Gradio Interface or Blocks app.\n- Preserve the exact marker phrase: Pi adapted this model app.\n- Keep the app simple and CPU-friendly.\n- Do not print secrets.\n- Write a short summary to `PI_SUMMARY.md`.\n\nModel analysis:\n```json\n{json.dumps({k: v for k, v in analysis.items() if k != \'readme_excerpt\'}, indent=2, ensure_ascii=False)}\n```\n\nREADME excerpt:\n{analysis.get(\'readme_excerpt\', \'\')[:3000]}\n"""\n (workspace / "GOAL.md").write_text(goal, encoding="utf-8")\n save_generated_files(run_dir, workspace)\n append_event(events_path, "workspace", "success", "Prepared model app workspace", {"files": ["app.py", "README.md", "requirements.txt", "GOAL.md"]})\n\n\ndef save_generated_files(run_dir: Path, workspace: Path):\n generated_dir = run_dir / "generated"\n generated_dir.mkdir(parents=True, exist_ok=True)\n for filename in ["app.py", "README.md", "requirements.txt", "GOAL.md", "PI_SUMMARY.md"]:\n path = workspace / filename\n if path.exists():\n (generated_dir / filename).write_text(path.read_text(encoding="utf-8", errors="ignore"), encoding="utf-8")\n\n\ndef run_pi(workspace: Path, run_dir: Path, events_path: Path, model: str):\n append_event(events_path, "pi_run", "started", "Running Pi on model-card workspace", {"model": model})\n env = os.environ.copy()\n env["HF_TOKEN"] = os.environ.get("HF_TOKEN", "")\n code, out = run_cmd(["pi", "-p", (workspace / "GOAL.md").read_text(encoding="utf-8")], cwd=workspace, env=env, timeout=1800)\n logs_dir = run_dir / "logs"\n logs_dir.mkdir(parents=True, exist_ok=True)\n (logs_dir / "pi_output.txt").write_text(out, encoding="utf-8")\n save_generated_files(run_dir, workspace)\n if code != 0:\n append_event(events_path, "pi_run", "failed", "Pi returned a non-zero exit code", {"returncode": code, "output_tail": out[-4000:]})\n raise RuntimeError("Pi failed. See logs/pi_output.txt")\n app_text = (workspace / "app.py").read_text(encoding="utf-8", errors="ignore")\n if "Pi adapted this model app" not in app_text:\n raise RuntimeError("Pi/app verification failed: expected marker phrase missing from app.py")\n append_event(events_path, "pi_run", "success", "Pi completed and preserved required marker")\n\n\ndef collect_pi_traces(run_dir: Path, events_path: Path):\n src = Path.home() / ".pi" / "agent" / "sessions"\n raw_dir = run_dir / "traces" / "raw"\n redacted_dir = run_dir / "traces" / "redacted"\n raw_dir.mkdir(parents=True, exist_ok=True)\n redacted_dir.mkdir(parents=True, exist_ok=True)\n count = 0\n if src.exists():\n for path in src.rglob("*.jsonl"):\n count += 1\n text = path.read_text(encoding="utf-8", errors="ignore")\n (raw_dir / path.name).write_text(redact_text(text), encoding="utf-8")\n (redacted_dir / path.name).write_text(redact_text(text), encoding="utf-8")\n append_event(events_path, "traces", "success", "Collected Pi traces", {"count": count})\n\n\ndef make_gradio_client(target_space_id: str, token: str):\n import inspect\n from gradio_client import Client\n params = inspect.signature(Client).parameters\n if "token" in params:\n return Client(target_space_id, token=token)\n if "hf_token" in params:\n return Client(target_space_id, hf_token=token)\n if "api_key" in params:\n return Client(target_space_id, api_key=token)\n if "headers" in params:\n return Client(target_space_id, headers={"Authorization": f"Bearer {token}"})\n return Client(target_space_id)\n\n\ndef get_api_schema(client):\n try:\n return client.view_api(return_format="dict")\n except TypeError:\n return client.view_api()\n\n\ndef extract_api_names(api_schema) -> list[str]:\n names = []\n def add(value):\n if not value or not isinstance(value, str):\n return\n name = value if value.startswith("/") else f"/{value}"\n if name not in names:\n names.append(name)\n def walk(obj):\n if isinstance(obj, dict):\n for key, value in obj.items():\n if key in {"api_name", "apiName"}:\n add(value)\n if isinstance(key, str) and key.startswith("/"):\n add(key)\n walk(value)\n elif isinstance(obj, list):\n for item in obj:\n walk(item)\n walk(api_schema)\n return names\n\n\ndef predict_with_available_endpoint(client, api_schema, value: str):\n candidates = extract_api_names(api_schema)\n for fallback in ["/run_model", "/predict", "/greet"]:\n if fallback not in candidates:\n candidates.append(fallback)\n errors = []\n for api_name in candidates:\n try:\n return api_name, client.predict(value, api_name=api_name)\n except Exception as exc:\n errors.append({"api_name": api_name, "error": str(exc)[-500:]})\n try:\n return None, client.predict(value)\n except Exception as exc:\n errors.append({"api_name": None, "error": str(exc)[-500:]})\n raise RuntimeError(f"No candidate Gradio endpoint worked: {json.dumps(errors, ensure_ascii=False)}")\n\n\ndef validate_live_api(target_space_id: str, token: str, events_path: Path, tests_dir: Path, timeout_seconds: int = 900):\n tests_dir.mkdir(parents=True, exist_ok=True)\n deadline = time.time() + timeout_seconds\n last_error = None\n attempt = 0\n append_event(events_path, "api_validation", "started", "Waiting for live model Gradio API to become available")\n while time.time() < deadline:\n attempt += 1\n try:\n client = make_gradio_client(target_space_id, token)\n api_schema = get_api_schema(client)\n api_names = extract_api_names(api_schema)\n write_json(tests_dir / "api_schema.json", {"schema": api_schema, "api_names": api_names})\n used_api_name, result = predict_with_available_endpoint(client, api_schema, "Hello from Agentic Space Factory")\n result_text = str(result)\n ok = bool(result_text and len(result_text.strip()) >= 2)\n payload = {"attempt": attempt, "target_space": target_space_id, "api_test_passed": ok, "api_name": used_api_name, "discovered_api_names": api_names, "result": result_text[:4000], "validated_at": now()}\n write_json(tests_dir / "test_result.json", payload)\n if ok:\n append_event(events_path, "api_validation", "success", "Live model API test passed", {"attempt": attempt, "api_name": used_api_name, "discovered_api_names": api_names})\n return payload\n last_error = f"Unexpected empty API result from {used_api_name}: {result_text}"\n except Exception as exc:\n last_error = str(exc)\n append_event(events_path, "api_validation", "waiting", "Live API not ready yet", {"attempt": attempt, "error": last_error[-1000:]})\n time.sleep(20)\n payload = {"target_space": target_space_id, "api_test_passed": False, "error": last_error, "validated_at": now()}\n write_json(tests_dir / "test_result.json", payload)\n raise RuntimeError(f"Live API validation did not pass before timeout: {last_error}")\n\n\ndef create_and_upload_space(api, token: str, target_space_id: str, workspace: Path, events_path: Path):\n append_event(events_path, "create_space", "started", f"Creating private target Space {target_space_id}")\n api.create_repo(repo_id=target_space_id, repo_type="space", space_sdk="gradio", private=True, exist_ok=False, token=token)\n append_event(events_path, "create_space", "success", "Private target Space created", {"target_space": target_space_id})\n append_event(events_path, "upload_files", "started", "Uploading model app files to target Space")\n for path_in_repo in ["app.py", "README.md", "requirements.txt"]:\n api.upload_file(path_or_fileobj=(workspace / path_in_repo).read_bytes(), path_in_repo=path_in_repo, repo_id=target_space_id, repo_type="space", token=token)\n append_event(events_path, "upload_files", "success", f"Uploaded {path_in_repo}")\n\n\ndef main():\n run_id = os.environ["RUN_ID"]\n hf_username = os.environ.get("HF_USERNAME", "unknown")\n bucket_source = os.environ.get("BUCKET_SOURCE", "unknown")\n output_root = Path(os.environ.get("OUTPUT_ROOT", "/output"))\n target_space_id = os.environ["TARGET_SPACE_ID"]\n model_id = sanitize_model_id(os.environ.get("MODEL_ID", ""))\n pi_model = os.environ.get("PI_MODEL") or "moonshotai/Kimi-K2.5"\n token = os.environ.get("HF_TOKEN")\n run_dir = output_root / "runs" / run_id\n events_path = run_dir / "events.jsonl"\n state_path = run_dir / "state.json"\n workspace = Path("/tmp") / f"space-factory-model-{run_id}"\n append_event(events_path, "bootstrap", "started", "Pi model-card worker started", {"model_id": model_id})\n write_json(state_path, {"run_id": run_id, "kind": "pi_model_card", "status": "running", "message": "Analyzing model card and generating a private model demo Space", "model_id": model_id, "target_space": target_space_id, "created_by": hf_username, "bucket_source": bucket_source, "created_at": now(), "updated_at": now()})\n if not token:\n fail(run_dir, events_path, "HF_TOKEN is missing from Job secrets")\n if not TARGET_RE.match(target_space_id):\n fail(run_dir, events_path, "Invalid TARGET_SPACE_ID", {"target_space": target_space_id})\n if not target_space_id.startswith(f"{hf_username}/"):\n fail(run_dir, events_path, "Target Space must be in the signed-in user\'s namespace", {"target_space": target_space_id, "username": hf_username})\n try:\n install_python_deps(events_path)\n from huggingface_hub import HfApi\n api = HfApi(token=token)\n whoami = api.whoami(token=token)\n append_event(events_path, "auth", "success", "Authenticated inside Job", {"whoami_name": whoami.get("name")})\n analysis = analyze_model(model_id, token, run_dir, events_path)\n if not analysis.get("supported"):\n fail(run_dir, events_path, "Model task is unsupported by Phase 5", {"model_analysis": {k: v for k, v in analysis.items() if k != "readme_excerpt"}}, status="unsupported")\n prepare_workspace(workspace, run_dir, model_id, analysis["selected_task"], target_space_id, analysis, events_path)\n install_pi(events_path)\n configure_pi(events_path, pi_model)\n run_pi(workspace, run_dir, events_path, pi_model)\n collect_pi_traces(run_dir, events_path)\n create_and_upload_space(api, token, target_space_id, workspace, events_path)\n write_json(run_dir / "target_space.json", {"target_space": target_space_id, "url": f"https://huggingface.co/spaces/{target_space_id}", "private": True, "sdk": "gradio", "created_by": hf_username, "model_id": model_id})\n validation = validate_live_api(target_space_id, token, events_path, run_dir / "tests")\n final_state = {"run_id": run_id, "kind": "pi_model_card", "status": "success", "message": "Model-card generated private Space created and validated through the live API.", "model_id": model_id, "target_space": target_space_id, "target_space_url": f"https://huggingface.co/spaces/{target_space_id}", "created_by": hf_username, "bucket_source": bucket_source, "model_analysis": {k: v for k, v in analysis.items() if k != "readme_excerpt"}, "validation": validation, "updated_at": now(), "security_notes": ["The target Space was created as private.", "The HF token was not printed or intentionally persisted.", "Phase 5 supports only simple public text pipeline models.", "Success was declared only after the wrapper live API test passed."]}\n write_json(state_path, final_state)\n report = f"""# Agentic Space Factory β€” Model Card Space Report\n\nRun ID: `{run_id}`\n\nStatus: **success**\n\nCreated private model demo Space: [`{target_space_id}`](https://huggingface.co/spaces/{target_space_id})\n\n## Model\n\n- Model ID: `{model_id}`\n- Selected task: `{analysis[\'selected_task\']}`\n- Template: `{analysis[\'template\']}`\n- Pi model: `{pi_model}`\n\n## What happened\n\n```text\nOAuth user β†’ HF Job β†’ model metadata analysis β†’ Pi adapts app.py β†’ private Space creation β†’ live API validation β†’ Bucket report\n```\n\n## Live API validation\n\n```json\n{json.dumps(validation, indent=2, ensure_ascii=False)}\n```\n\n## Security posture\n\n- The target Space was created as private.\n- No token was printed or intentionally persisted.\n- Pi was instructed not to create/delete/publish repos; the wrapper performed Hub operations.\n- Success was declared only after the live API returned a non-empty result.\n\n## Next step\n\nPhase 6 should add a ZeroGPU Diffusers template and stricter model compatibility gating.\n"""\n (run_dir / "report.md").write_text(report, encoding="utf-8")\n append_event(events_path, "report_write", "success", "Wrote report.md")\n append_event(events_path, "done", "success", "Pi model-card worker completed")\n except SystemExit:\n raise\n except Exception as exc:\n collect_pi_traces(run_dir, events_path)\n fail(run_dir, events_path, "Pi model-card worker failed", {"error": str(exc)})\n\n\nif __name__ == "__main__":\n main()\n'
570
 
571
  def encoded_worker_script() -> str:
572
  """Return the base64-encoded Phase 1 hello worker script."""