from __future__ import annotations import re from typing import Any from huggingface_hub import Volume, fetch_job_logs, inspect_job, run_job from .config import settings from .runs import make_run_id, utc_now_iso, validate_run_id from .worker_payload import ( encoded_create_space_worker_script, encoded_pi_gist_worker_script, encoded_pi_model_card_worker_script, encoded_runtime_recommender_worker_script, encoded_longcat_article_worker_script, encoded_universal_model_card_worker_script, encoded_pi_space_worker_script, encoded_worker_script, python_decode_and_run_command, ) SPACE_SLUG_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{1,95}$") def _base_env(*, run_id: str, username: str, worker_script_b64: str) -> dict[str, str]: return { "RUN_ID": run_id, "HF_USERNAME": username or "unknown", "BUCKET_SOURCE": settings.bucket_source, "OUTPUT_ROOT": settings.bucket_mount, "WORKER_SCRIPT_B64": worker_script_b64, "LAUNCHED_AT": utc_now_iso(), } def _launch_job(*, token: str, env: dict[str, str], flavor: str | None = None, timeout: str | None = None) -> Any: return run_job( image=settings.job_image, command=python_decode_and_run_command(), flavor=flavor or settings.job_flavor, timeout=timeout or settings.job_timeout, env=env, secrets={"HF_TOKEN": token}, volumes=[Volume(type="bucket", source=settings.bucket_source, mount_path=settings.bucket_mount)], token=token, ) def _job_result(job: Any, *, run_id: str, kind: str, extra: dict[str, Any] | None = None) -> dict[str, Any]: payload: dict[str, Any] = { "run_id": run_id, "kind": kind, "job_id": job.id, "job_url": getattr(job, "url", None), "status": getattr(getattr(job, "status", None), "stage", None), "bucket_source": settings.bucket_source, "bucket_uri": settings.bucket_uri, } if extra: payload.update(extra) return payload def launch_hello_job(*, token: str, username: str, run_id: str | None = None) -> dict[str, Any]: """Launch the Phase 1 HF Job that only writes state/events/report to the bucket.""" if not token: raise ValueError("Missing OAuth token. Please sign in with Hugging Face first.") safe_run_id = validate_run_id(run_id) if run_id else make_run_id("hello") env = _base_env(run_id=safe_run_id, username=username, worker_script_b64=encoded_worker_script()) job = _launch_job(token=token, env=env) return _job_result(job, run_id=safe_run_id, kind="hello_job") def normalize_target_space(*, username: str, target_slug: str | None, run_id: str) -> str: """Return `username/slug`, constrained to the signed-in user's namespace for V2.""" slug = (target_slug or "").strip() if not slug: slug = f"space-factory-{run_id}".lower()[:80] # If user pasted a full repo id, only allow their own namespace in Phase 2. if "/" in slug: namespace, repo = slug.split("/", 1) if namespace != username: raise ValueError("For Phase 2, the target Space must be created in your own namespace.") slug = repo if not SPACE_SLUG_RE.match(slug): raise ValueError("Invalid target Space name. Use letters, numbers, dots, underscores, or dashes.") return f"{username}/{slug}" def launch_create_private_space_job( *, token: str, username: str, target_slug: str | None = None, run_id: str | None = None, ) -> dict[str, Any]: """Launch the Phase 2 Job: create a private target Gradio Space and validate it live.""" if not token: raise ValueError("Missing OAuth token. Please sign in with Hugging Face first.") safe_run_id = validate_run_id(run_id) if run_id else make_run_id("space") target_space_id = normalize_target_space(username=username, target_slug=target_slug, run_id=safe_run_id) env = _base_env( run_id=safe_run_id, username=username, worker_script_b64=encoded_create_space_worker_script(), ) env["TARGET_SPACE_ID"] = target_space_id job = _launch_job(token=token, env=env) return _job_result( job, run_id=safe_run_id, kind="create_private_space", extra={"target_space": target_space_id, "target_space_url": f"https://huggingface.co/spaces/{target_space_id}"}, ) def launch_pi_space_smoke_job( *, token: str, username: str, target_slug: str | None = None, pi_model: str | None = None, run_id: str | None = None, ) -> dict[str, Any]: """Launch the Phase 3 Job: install Pi, let Pi modify a minimal app, then create/validate a private Space.""" if not token: raise ValueError("Missing OAuth token. Please sign in with Hugging Face first.") safe_run_id = validate_run_id(run_id) if run_id else make_run_id("pi") target_space_id = normalize_target_space(username=username, target_slug=target_slug, run_id=safe_run_id) env = _base_env( run_id=safe_run_id, username=username, worker_script_b64=encoded_pi_space_worker_script(), ) env["TARGET_SPACE_ID"] = target_space_id env["PI_MODEL"] = (pi_model or "Qwen/Qwen3-Coder-Next").strip() job = _launch_job(token=token, env=env) return _job_result( job, run_id=safe_run_id, kind="pi_space_smoke", extra={ "target_space": target_space_id, "target_space_url": f"https://huggingface.co/spaces/{target_space_id}", "pi_model": env["PI_MODEL"], }, ) def launch_pi_gist_recipe_job( *, token: str, username: str, target_slug: str | None = None, pi_model: str | None = None, run_id: str | None = None, ) -> dict[str, Any]: """Launch the Phase 4 Job: Pi applies the HF Spaces gist recipe and uses hf CLI to create/validate a private Space.""" if not token: raise ValueError("Missing OAuth token. Please sign in with Hugging Face first.") safe_run_id = validate_run_id(run_id) if run_id else make_run_id("gist") target_space_id = normalize_target_space(username=username, target_slug=target_slug, run_id=safe_run_id) env = _base_env( run_id=safe_run_id, username=username, worker_script_b64=encoded_pi_gist_worker_script(), ) env["TARGET_SPACE_ID"] = target_space_id env["PI_MODEL"] = (pi_model or "Qwen/Qwen3-Coder-Next").strip() job = _launch_job(token=token, env=env) return _job_result( job, run_id=safe_run_id, kind="pi_gist_recipe", extra={ "target_space": target_space_id, "target_space_url": f"https://huggingface.co/spaces/{target_space_id}", "pi_model": env["PI_MODEL"], }, ) def launch_pi_model_card_job( *, token: str, username: str, target_slug: str | None = None, model_id: str | None = None, pi_model: str | None = None, run_id: str | None = None, ) -> dict[str, Any]: """Launch the Phase 5 Job: analyze a model card, let Pi adapt a template, create/validate a private Space.""" if not token: raise ValueError("Missing OAuth token. Please sign in with Hugging Face first.") safe_run_id = validate_run_id(run_id) if run_id else make_run_id("model") target_space_id = normalize_target_space(username=username, target_slug=target_slug, run_id=safe_run_id) clean_model_id = (model_id or "").strip().replace("https://huggingface.co/", "").strip("/") if "/" not in clean_model_id: raise ValueError("Model ID must look like owner/model-name.") env = _base_env( run_id=safe_run_id, username=username, worker_script_b64=encoded_pi_model_card_worker_script(), ) env["TARGET_SPACE_ID"] = target_space_id env["MODEL_ID"] = clean_model_id env["PI_MODEL"] = (pi_model or "Qwen/Qwen3-Coder-Next").strip() job = _launch_job(token=token, env=env) return _job_result( job, run_id=safe_run_id, kind="pi_model_card", extra={ "target_space": target_space_id, "target_space_url": f"https://huggingface.co/spaces/{target_space_id}", "model_id": clean_model_id, "pi_model": env["PI_MODEL"], }, ) def launch_runtime_recommender_job( *, token: str, username: str, model_id: str | None = None, run_id: str | None = None, ) -> dict[str, Any]: """Launch the Phase 6 Job: analyze model metadata and recommend safe runtime/hardware before building.""" if not token: raise ValueError("Missing OAuth token. Please sign in with Hugging Face first.") safe_run_id = validate_run_id(run_id) if run_id else make_run_id("runtime") clean_model_id = (model_id or "").strip().replace("https://huggingface.co/", "").strip("/") if "/" not in clean_model_id: raise ValueError("Model ID must look like owner/model-name.") env = _base_env( run_id=safe_run_id, username=username, worker_script_b64=encoded_runtime_recommender_worker_script(), ) env["MODEL_ID"] = clean_model_id job = _launch_job(token=token, env=env) return _job_result( job, run_id=safe_run_id, kind="runtime_recommender", extra={"model_id": clean_model_id}, ) def launch_longcat_article_job( *, token: str, username: str, target_slug: str | None = None, model_id: str | None = None, pi_model: str | None = None, preferred_space_hardware: str | None = None, fallback_space_hardware: str | None = None, allow_fixed_gpu_fallback: bool = True, implementation_mode: str | None = None, run_id: str | None = None, ) -> dict[str, Any]: """Launch Phase 9: strict LongCat full-inference gate with blocker reporting.""" if not token: raise ValueError("Missing OAuth token. Please sign in with Hugging Face first.") safe_run_id = validate_run_id(run_id) if run_id else make_run_id("longcat") target_space_id = normalize_target_space(username=username, target_slug=target_slug, run_id=safe_run_id) clean_model_id = (model_id or "meituan-longcat/LongCat-Video-Avatar-1.5").strip().replace("https://huggingface.co/", "").strip("/") if "/" not in clean_model_id: raise ValueError("Model ID must look like owner/model-name.") env = _base_env( run_id=safe_run_id, username=username, worker_script_b64=encoded_longcat_article_worker_script(), ) env["TARGET_SPACE_ID"] = target_space_id env["MODEL_ID"] = clean_model_id env["PI_MODEL"] = (pi_model or "Qwen/Qwen3-Coder-Next").strip() env["PREFERRED_SPACE_HARDWARE"] = (preferred_space_hardware or "zero-a10g").strip() env["FALLBACK_SPACE_HARDWARE"] = (fallback_space_hardware or "l40sx1").strip() env["ALLOW_FIXED_GPU_FALLBACK"] = "true" if allow_fixed_gpu_fallback else "false" env["IMPLEMENTATION_MODE"] = (implementation_mode or "full-inference-gated").strip() job = _launch_job(token=token, env=env, timeout="60m") return _job_result( job, run_id=safe_run_id, kind="longcat_full_inference_gate", extra={ "target_space": target_space_id, "target_space_url": f"https://huggingface.co/spaces/{target_space_id}", "model_id": clean_model_id, "pi_model": env["PI_MODEL"], "preferred_space_hardware": env["PREFERRED_SPACE_HARDWARE"], "fallback_space_hardware": env["FALLBACK_SPACE_HARDWARE"], "allow_fixed_gpu_fallback": allow_fixed_gpu_fallback, "implementation_mode": env["IMPLEMENTATION_MODE"], }, ) def launch_universal_model_card_job( *, token: str, username: str, target_slug: str | None = None, model_id: str | None = None, pi_model: str | None = None, preferred_space_hardware: str | None = None, fallback_space_hardware: str | None = None, allow_fixed_gpu_fallback: bool = True, implementation_mode: str | None = None, run_id: str | None = None, ) -> dict[str, Any]: """Launch Phase 10: universal model-card → private Space builder with Pi and gated validation.""" if not token: raise ValueError("Missing OAuth token. Please sign in with Hugging Face first.") safe_run_id = validate_run_id(run_id) if run_id else make_run_id("universal") target_space_id = normalize_target_space(username=username, target_slug=target_slug, run_id=safe_run_id) clean_model_id = (model_id or "").strip().replace("https://huggingface.co/", "").strip("/") if "/" not in clean_model_id: raise ValueError("Model ID must look like owner/model-name or a Hugging Face model URL.") env = _base_env( run_id=safe_run_id, username=username, worker_script_b64=encoded_universal_model_card_worker_script(), ) env["TARGET_SPACE_ID"] = target_space_id env["MODEL_ID"] = clean_model_id env["PI_MODEL"] = (pi_model or "Qwen/Qwen3-Coder-Next").strip() env["PREFERRED_SPACE_HARDWARE"] = (preferred_space_hardware or "cpu-basic").strip() env["FALLBACK_SPACE_HARDWARE"] = (fallback_space_hardware or "l40sx1").strip() env["ALLOW_FIXED_GPU_FALLBACK"] = "true" if allow_fixed_gpu_fallback else "false" env["IMPLEMENTATION_MODE"] = (implementation_mode or "full-inference-gated").strip() job = _launch_job(token=token, env=env, timeout="60m") return _job_result( job, run_id=safe_run_id, kind="universal_model_card_builder", extra={ "target_space": target_space_id, "target_space_url": f"https://huggingface.co/spaces/{target_space_id}", "model_id": clean_model_id, "pi_model": env["PI_MODEL"], "preferred_space_hardware": env["PREFERRED_SPACE_HARDWARE"], "fallback_space_hardware": env["FALLBACK_SPACE_HARDWARE"], "allow_fixed_gpu_fallback": allow_fixed_gpu_fallback, "implementation_mode": env["IMPLEMENTATION_MODE"], }, ) def inspect_job_safe(job_id: str, token: str | None = None) -> dict[str, Any]: if not job_id: return {"error": "Missing job_id"} try: info = inspect_job(job_id=job_id, token=token) status = getattr(info, "status", None) return { "id": info.id, "url": getattr(info, "url", None), "stage": getattr(status, "stage", None), "message": getattr(status, "message", None), "flavor": getattr(info, "flavor", None), "created_at": str(getattr(info, "created_at", "")), "started_at": str(getattr(info, "started_at", "")), "finished_at": str(getattr(info, "finished_at", "")), } except Exception as exc: # noqa: BLE001 return {"error": str(exc)} def fetch_recent_logs_safe(job_id: str, token: str | None = None, max_lines: int = 120) -> str: if not job_id: return "" try: logs = list(fetch_job_logs(job_id=job_id, token=token)) return "\n".join(str(line).rstrip("\n") for line in logs[-max_lines:]) except Exception as exc: # noqa: BLE001 return f"Could not fetch job logs: {exc}"