# hello_worker.py -- uploaded by fffiloni (commit 8b7a455, verified; 1.77 kB).
"""Standalone version of the first worker.
The orchestrator currently embeds an equivalent script into the Job environment as
base64 to keep the first deployment self-contained. This file exists so the
worker logic is easy to inspect, test, and evolve in the next increments.
"""
from __future__ import annotations
import json
import os
from datetime import datetime, timezone
from pathlib import Path
def now() -> str:
    """Return the current UTC time as an ISO 8601 string.

    The underlying datetime is timezone-aware (UTC), so the returned text
    carries an explicit ``+00:00`` offset.
    """
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def write_json(path: Path, payload: dict) -> None:
    """Serialize *payload* as pretty-printed JSON and write it to *path*.

    Missing parent directories are created first. The file is written as
    UTF-8 with two-space indentation, non-ASCII characters kept as-is, and a
    trailing newline. Existing content at *path* is overwritten.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, indent=2, ensure_ascii=False)
    path.write_text(f"{serialized}\n", encoding="utf-8")
def append_event(path: Path, step: str, status: str, message: str, data: dict | None = None) -> None:
    """Append one event record to the JSON Lines log at *path*.

    Each record is a single-line JSON object with the keys ``ts`` (the
    timestamp returned by ``now()``), ``step``, ``status``, ``message`` and
    ``data``. Parent directories are created on demand and the file is opened
    in append mode, so earlier events are preserved.

    Args:
        path: Location of the ``.jsonl`` event log.
        step: Name of the step the event belongs to.
        status: Status label recorded for the step.
        message: Human-readable description of the event.
        data: Optional extra payload; ``None`` or an empty value is stored
            as ``{}``.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    record = {
        "ts": now(),
        "step": step,
        "status": status,
        "message": message,
        # Normalize None (or any other falsy value) to an empty object.
        "data": data if data else {},
    }
    with path.open("a", encoding="utf-8") as log:
        line = json.dumps(record, ensure_ascii=False)
        log.write(f"{line}\n")
def main() -> None:
    """Run the hello worker once and record its outputs on disk.

    Reads ``RUN_ID`` (required) and ``OUTPUT_ROOT`` (default ``/output``)
    from the environment, then writes under ``<OUTPUT_ROOT>/runs/<RUN_ID>/``:

    * ``events.jsonl`` -- a ``bootstrap``/``started`` event followed by a
      ``done``/``success`` event,
    * ``state.json`` -- the final run state,
    * ``report.md`` -- a short Markdown report.

    Raises:
        KeyError: if ``RUN_ID`` is not set in the environment.
        ValueError: if ``RUN_ID`` is set but empty.
    """
    run_id = os.environ["RUN_ID"]
    # An empty segment is dropped when joining paths, so an empty RUN_ID would
    # collapse the run directory onto "<root>/runs" itself and let separate
    # runs overwrite each other's files.
    if not run_id:
        raise ValueError("RUN_ID must be a non-empty string")

    output_root = Path(os.environ.get("OUTPUT_ROOT", "/output"))
    run_dir = output_root / "runs" / run_id
    # Create the run directory explicitly so the report write below does not
    # depend on a helper having created it as a side effect.
    run_dir.mkdir(parents=True, exist_ok=True)
    events_path = run_dir / "events.jsonl"

    append_event(events_path, "bootstrap", "started", "Standalone worker started")
    write_json(
        run_dir / "state.json",
        {
            "run_id": run_id,
            "status": "success",
            "message": "Standalone hello worker completed.",
            "updated_at": now(),
        },
    )
    (run_dir / "report.md").write_text("# Hello Worker\n\nSuccess.\n", encoding="utf-8")
    append_event(events_path, "done", "success", "Standalone worker completed")
# Run the worker only when this file is executed as a script, so importing
# the module (e.g. from tests) has no side effects.
if __name__ == "__main__":
    main()