from __future__ import annotations import re from typing import Any from huggingface_hub import Volume, fetch_job_logs, inspect_job, run_job from .config import settings from .runs import make_run_id, utc_now_iso, validate_run_id from .worker_payload import ( encoded_create_space_worker_script, encoded_worker_script, python_decode_and_run_command, ) SPACE_SLUG_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{1,95}$") def _base_env(*, run_id: str, username: str, worker_script_b64: str) -> dict[str, str]: return { "RUN_ID": run_id, "HF_USERNAME": username or "unknown", "BUCKET_SOURCE": settings.bucket_source, "OUTPUT_ROOT": settings.bucket_mount, "WORKER_SCRIPT_B64": worker_script_b64, "LAUNCHED_AT": utc_now_iso(), } def _launch_job(*, token: str, env: dict[str, str]) -> Any: return run_job( image=settings.job_image, command=python_decode_and_run_command(), flavor=settings.job_flavor, timeout=settings.job_timeout, env=env, secrets={"HF_TOKEN": token}, volumes=[Volume(type="bucket", source=settings.bucket_source, mount_path=settings.bucket_mount)], token=token, ) def _job_result(job: Any, *, run_id: str, kind: str, extra: dict[str, Any] | None = None) -> dict[str, Any]: payload: dict[str, Any] = { "run_id": run_id, "kind": kind, "job_id": job.id, "job_url": getattr(job, "url", None), "status": getattr(getattr(job, "status", None), "stage", None), "bucket_source": settings.bucket_source, "bucket_uri": settings.bucket_uri, } if extra: payload.update(extra) return payload def launch_hello_job(*, token: str, username: str, run_id: str | None = None) -> dict[str, Any]: """Launch the Phase 1 HF Job that only writes state/events/report to the bucket.""" if not token: raise ValueError("Missing OAuth token. Please sign in with Hugging Face first.") safe_run_id = validate_run_id(run_id) if run_id else make_run_id("hello") env = _base_env(run_id=safe_run_id, username=username, worker_script_b64=encoded_worker_script()) job = _launch_job(token=token, env=env) return _job_result(job, run_id=safe_run_id, kind="hello_job") def normalize_target_space(*, username: str, target_slug: str | None, run_id: str) -> str: """Return `username/slug`, constrained to the signed-in user's namespace for V2.""" slug = (target_slug or "").strip() if not slug: slug = f"space-factory-{run_id}".lower()[:80] # If user pasted a full repo id, only allow their own namespace in Phase 2. if "/" in slug: namespace, repo = slug.split("/", 1) if namespace != username: raise ValueError("For Phase 2, the target Space must be created in your own namespace.") slug = repo if not SPACE_SLUG_RE.match(slug): raise ValueError("Invalid target Space name. Use letters, numbers, dots, underscores, or dashes.") return f"{username}/{slug}" def launch_create_private_space_job( *, token: str, username: str, target_slug: str | None = None, run_id: str | None = None, ) -> dict[str, Any]: """Launch the Phase 2 Job: create a private target Gradio Space and validate it live.""" if not token: raise ValueError("Missing OAuth token. Please sign in with Hugging Face first.") safe_run_id = validate_run_id(run_id) if run_id else make_run_id("space") target_space_id = normalize_target_space(username=username, target_slug=target_slug, run_id=safe_run_id) env = _base_env( run_id=safe_run_id, username=username, worker_script_b64=encoded_create_space_worker_script(), ) env["TARGET_SPACE_ID"] = target_space_id job = _launch_job(token=token, env=env) return _job_result( job, run_id=safe_run_id, kind="create_private_space", extra={"target_space": target_space_id, "target_space_url": f"https://huggingface.co/spaces/{target_space_id}"}, ) def inspect_job_safe(job_id: str, token: str | None = None) -> dict[str, Any]: if not job_id: return {"error": "Missing job_id"} try: info = inspect_job(job_id=job_id, token=token) status = getattr(info, "status", None) return { "id": info.id, "url": getattr(info, "url", None), "stage": getattr(status, "stage", None), "message": getattr(status, "message", None), "flavor": getattr(info, "flavor", None), "created_at": str(getattr(info, "created_at", "")), "started_at": str(getattr(info, "started_at", "")), "finished_at": str(getattr(info, "finished_at", "")), } except Exception as exc: # noqa: BLE001 return {"error": str(exc)} def fetch_recent_logs_safe(job_id: str, token: str | None = None, max_lines: int = 120) -> str: if not job_id: return "" try: logs = list(fetch_job_logs(job_id=job_id, token=token)) return "\n".join(str(line).rstrip("\n") for line in logs[-max_lines:]) except Exception as exc: # noqa: BLE001 return f"Could not fetch job logs: {exc}"