"""Launch Hugging Face Jobs for the Space builder/validator and inspect them."""

from __future__ import annotations

import re
from collections import deque
from typing import Any

from huggingface_hub import Volume, fetch_job_logs, inspect_job, run_job

from .config import bucket_uri_from_source, user_bucket_source, settings
from .bucket import assert_user_bucket_ready
from .runs import make_run_id, utc_now_iso, validate_run_id
from .worker_payload import (
    encoded_universal_model_card_worker_script,
    encoded_validate_existing_space_worker_script,
    python_decode_and_run_command,
)

# A bare Space name (no namespace): starts with an alphanumeric character and
# is 2-96 characters long overall.
SPACE_SLUG_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]{1,95}$")


def _base_env(*, run_id: str, username: str, bucket_source: str, worker_script_b64: str) -> dict[str, str]:
    """Build the environment variables shared by every launched job.

    Args:
        run_id: Identifier of this run, exported as ``RUN_ID``.
        username: Hugging Face username; an empty value is exported as
            ``"unknown"``.
        bucket_source: Bucket source string, exported as ``BUCKET_SOURCE``.
        worker_script_b64: Encoded worker script, exported as
            ``WORKER_SCRIPT_B64``.

    Returns:
        A fresh dict that callers may extend with job-specific variables.
    """
    return {
        "RUN_ID": run_id,
        "HF_USERNAME": username or "unknown",
        "BUCKET_SOURCE": bucket_source,
        "OUTPUT_ROOT": settings.bucket_mount,
        "WORKER_SCRIPT_B64": worker_script_b64,
        "LAUNCHED_AT": utc_now_iso(),
    }


def _launch_job(
    *,
    token: str,
    env: dict[str, str],
    bucket_source: str,
    flavor: str | None = None,
    timeout: str | None = None,
) -> Any:
    """Start a job via ``run_job`` with the user's bucket attached as a volume.

    Args:
        token: Token used both to authenticate the ``run_job`` call and,
            as the ``HF_TOKEN`` secret, inside the job.
        env: Environment variables for the job.
        bucket_source: Bucket to mount at ``settings.bucket_mount``.
        flavor: Hardware flavor; falls back to ``settings.job_flavor``.
        timeout: Job timeout; falls back to ``settings.job_timeout``.

    Returns:
        Whatever ``run_job`` returns.
    """
    return run_job(
        image=settings.job_image,
        command=python_decode_and_run_command(),
        flavor=flavor or settings.job_flavor,
        timeout=timeout or settings.job_timeout,
        env=env,
        secrets={"HF_TOKEN": token},
        volumes=[Volume(type="bucket", source=bucket_source, mount_path=settings.bucket_mount)],
        token=token,
    )


def _job_result(
    job: Any,
    *,
    run_id: str,
    kind: str,
    bucket_source: str,
    extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Summarize a launched job as a plain dict.

    Args:
        job: Object returned by the launch call; must expose ``id``.
            ``url`` and ``status.stage`` are read if present, else ``None``.
        run_id: Identifier of this run.
        kind: Label describing which kind of job was launched.
        bucket_source: Bucket source the job was given.
        extra: Optional additional keys; they override the base keys on
            collision.

    Returns:
        The summary dict.
    """
    payload: dict[str, Any] = {
        "run_id": run_id,
        "kind": kind,
        "job_id": job.id,
        "job_url": getattr(job, "url", None),
        "status": getattr(getattr(job, "status", None), "stage", None),
        "bucket_source": bucket_source,
        "bucket_uri": bucket_uri_from_source(bucket_source),
    }
    if extra:
        payload.update(extra)
    return payload


def normalize_target_space(*, username: str, target_slug: str | None, run_id: str) -> str:
    """Return `username/slug`, constrained to the signed-in user's namespace.

    Args:
        username: Namespace the Space must live in.
        target_slug: Either a bare name, ``namespace/name``, or empty/``None``
            to derive a default name from ``run_id``.
        run_id: Used only to build the default name.

    Returns:
        The repo id ``f"{username}/{slug}"``.

    Raises:
        ValueError: If ``username`` is empty, the slug names another
            namespace, or the name does not match ``SPACE_SLUG_RE``.
    """
    if not username:
        # Without a namespace the result would be "/slug", which is not a
        # usable repo id; fail before a job is launched with it.
        raise ValueError("Missing Hugging Face username. Please sign in with Hugging Face first.")

    slug = (target_slug or "").strip()
    if not slug:
        slug = f"space-factory-{run_id}".lower()[:80]
    if "/" in slug:
        namespace, repo = slug.split("/", 1)
        if namespace != username:
            raise ValueError("The target Space must be created in your own namespace.")
        slug = repo
    if not SPACE_SLUG_RE.match(slug):
        raise ValueError("Invalid target Space name. Use letters, numbers, dots, underscores, or dashes.")
    return f"{username}/{slug}"


def _clean_repo_id(value: str | None, *, repo_kind: str) -> str:
    """Reduce a repo id or Hugging Face URL to its ``owner/name`` form.

    Args:
        value: Raw user input; ``None`` is treated as empty.
        repo_kind: Human-readable label used in the error message.

    Returns:
        The input with surrounding whitespace, the Hugging Face URL prefixes,
        and leading/trailing slashes removed.

    Raises:
        ValueError: If the cleaned value contains no ``/``.
    """
    cleaned = (value or "").strip()
    # The more specific "spaces/" prefix must be removed before the generic one.
    cleaned = cleaned.replace("https://huggingface.co/spaces/", "")
    cleaned = cleaned.replace("https://huggingface.co/", "")
    cleaned = cleaned.strip("/")
    # NOTE(review): URLs with extra path segments (e.g. ".../tree/main") pass
    # through with those segments intact — confirm whether callers need them cut.
    if "/" not in cleaned:
        raise ValueError(f"{repo_kind} must look like owner/name or a Hugging Face URL.")
    return cleaned


def launch_universal_model_card_job(
    *,
    token: str,
    username: str,
    target_slug: str | None = None,
    model_id: str | None = None,
    pi_model: str | None = None,
    preferred_space_hardware: str | None = None,
    fallback_space_hardware: str | None = None,
    allow_fixed_gpu_fallback: bool = True,
    implementation_mode: str | None = None,
    run_id: str | None = None,
    bucket_name: str | None = None,
) -> dict[str, Any]:
    """Launch the public product builder: model card → private Space attempt.

    Args:
        token: OAuth token of the signed-in user.
        username: Hugging Face username; the target Space is created under it.
        target_slug: Desired Space name; a default is derived from the run id
            when omitted.
        model_id: Model repo id or Hugging Face URL.
        pi_model: Exported as ``PI_MODEL``; defaults to
            ``"Qwen/Qwen3-Coder-Next"``.
        preferred_space_hardware: Defaults to ``"zero-a10g"``.
        fallback_space_hardware: Defaults to ``"l40sx1"``.
        allow_fixed_gpu_fallback: Exported as ``"true"``/``"false"``.
        implementation_mode: Defaults to ``"full-inference-gated"``.
        run_id: Existing run id to validate and reuse; a new one is generated
            when omitted.
        bucket_name: Optional bucket name passed to the bucket helpers.

    Returns:
        The job summary dict from ``_job_result`` plus the resolved options.

    Raises:
        ValueError: If the token is missing or an input fails validation.
    """
    if not token:
        raise ValueError("Missing OAuth token. Please sign in with Hugging Face first.")

    safe_run_id = validate_run_id(run_id) if run_id else make_run_id("universal")
    target_space_id = normalize_target_space(username=username, target_slug=target_slug, run_id=safe_run_id)
    clean_model_id = _clean_repo_id(model_id, repo_kind="Model ID")
    bucket_source = user_bucket_source(username=username, bucket_name=bucket_name)
    assert_user_bucket_ready(username=username, bucket_name=bucket_name, token=token)

    env = _base_env(
        run_id=safe_run_id,
        username=username,
        bucket_source=bucket_source,
        worker_script_b64=encoded_universal_model_card_worker_script(),
    )
    env["TARGET_SPACE_ID"] = target_space_id
    env["MODEL_ID"] = clean_model_id
    env["PI_MODEL"] = (pi_model or "Qwen/Qwen3-Coder-Next").strip()
    env["PREFERRED_SPACE_HARDWARE"] = (preferred_space_hardware or "zero-a10g").strip()
    env["FALLBACK_SPACE_HARDWARE"] = (fallback_space_hardware or "l40sx1").strip()
    env["ALLOW_FIXED_GPU_FALLBACK"] = "true" if allow_fixed_gpu_fallback else "false"
    env["IMPLEMENTATION_MODE"] = (implementation_mode or "full-inference-gated").strip()

    job = _launch_job(token=token, env=env, bucket_source=bucket_source, timeout="60m")
    return _job_result(
        job,
        run_id=safe_run_id,
        kind="universal_model_card_builder",
        bucket_source=bucket_source,
        extra={
            "target_space": target_space_id,
            "target_space_url": f"https://huggingface.co/spaces/{target_space_id}",
            "model_id": clean_model_id,
            "pi_model": env["PI_MODEL"],
            "preferred_space_hardware": env["PREFERRED_SPACE_HARDWARE"],
            "fallback_space_hardware": env["FALLBACK_SPACE_HARDWARE"],
            "allow_fixed_gpu_fallback": allow_fixed_gpu_fallback,
            "implementation_mode": env["IMPLEMENTATION_MODE"],
        },
    )


def launch_validate_existing_space_job(
    *,
    token: str,
    username: str,
    target_space_id: str,
    api_name: str | None = None,
    test_args_json: str | None = None,
    test_kwargs_json: str | None = None,
    expected_output_type: str | None = None,
    live_timeout_seconds: int = 1800,
    run_id: str | None = None,
    bucket_name: str | None = None,
) -> dict[str, Any]:
    """Launch the public product validator for an existing generated Space.

    Args:
        token: OAuth token of the signed-in user.
        username: Hugging Face username; the target Space must be in this
            namespace.
        target_space_id: Space repo id or Hugging Face URL.
        api_name: Exported as ``API_NAME``; defaults to ``"/generate"``.
        test_args_json: JSON text exported as ``TEST_ARGS_JSON``; defaults to
            a single sample prompt.
        test_kwargs_json: JSON text exported as ``TEST_KWARGS_JSON``; defaults
            to ``"{}"``.
        expected_output_type: Defaults to ``"image"``.
        live_timeout_seconds: Exported as ``LIVE_TIMEOUT_SECONDS``; a falsy
            value falls back to 1800.
        run_id: Existing run id to validate and reuse; a new one is generated
            when omitted.
        bucket_name: Optional bucket name passed to the bucket helpers.

    Returns:
        The job summary dict from ``_job_result`` plus the resolved options.

    Raises:
        ValueError: If the token is missing, the target is malformed, or the
            target is outside the user's namespace.
    """
    if not token:
        raise ValueError("Missing OAuth token. Please sign in with Hugging Face first.")

    safe_run_id = validate_run_id(run_id) if run_id else make_run_id("validate")
    target = _clean_repo_id(target_space_id, repo_kind="Target Space")
    namespace, _ = target.split("/", 1)
    if namespace != username:
        raise ValueError("For this version, target Space validation is limited to your own namespace.")
    bucket_source = user_bucket_source(username=username, bucket_name=bucket_name)
    assert_user_bucket_ready(username=username, bucket_name=bucket_name, token=token)

    env = _base_env(
        run_id=safe_run_id,
        username=username,
        bucket_source=bucket_source,
        worker_script_b64=encoded_validate_existing_space_worker_script(),
    )
    env["TARGET_SPACE_ID"] = target
    env["API_NAME"] = (api_name or "/generate").strip()
    env["TEST_ARGS_JSON"] = (test_args_json or '["a cinematic robot cat astronaut, detailed, studio lighting"]').strip()
    env["TEST_KWARGS_JSON"] = (test_kwargs_json or "{}").strip()
    env["EXPECTED_OUTPUT_TYPE"] = (expected_output_type or "image").strip()
    env["LIVE_TIMEOUT_SECONDS"] = str(int(live_timeout_seconds or 1800))

    job = _launch_job(token=token, env=env, bucket_source=bucket_source, timeout="60m")
    return _job_result(
        job,
        run_id=safe_run_id,
        kind="validate_existing_space",
        bucket_source=bucket_source,
        extra={
            "target_space": target,
            "target_space_url": f"https://huggingface.co/spaces/{target}",
            "api_name": env["API_NAME"],
            "expected_output_type": env["EXPECTED_OUTPUT_TYPE"],
            "test_args_json": env["TEST_ARGS_JSON"],
            "test_kwargs_json": env["TEST_KWARGS_JSON"],
        },
    )


def _text_or_empty(value: Any) -> str:
    """Return ``str(value)``, or ``""`` when the value is ``None``."""
    return "" if value is None else str(value)


def inspect_job_safe(job_id: str, token: str | None = None) -> dict[str, Any]:
    """Look up a job and return a flat summary, never raising.

    Args:
        job_id: Identifier of the job to inspect.
        token: Optional token forwarded to ``inspect_job``.

    Returns:
        A dict with ``id``, ``url``, ``stage``, ``message``, ``flavor`` and
        the three timestamp fields as strings (empty when absent or ``None``).
        On any failure, or when ``job_id`` is empty, a dict with a single
        ``"error"`` key.
    """
    if not job_id:
        return {"error": "Missing job_id"}
    try:
        info = inspect_job(job_id=job_id, token=token)
        status = getattr(info, "status", None)
        return {
            "id": info.id,
            "url": getattr(info, "url", None),
            "stage": getattr(status, "stage", None),
            "message": getattr(status, "message", None),
            "flavor": getattr(info, "flavor", None),
            # A present-but-None timestamp must not surface as the text "None".
            "created_at": _text_or_empty(getattr(info, "created_at", None)),
            "started_at": _text_or_empty(getattr(info, "started_at", None)),
            "finished_at": _text_or_empty(getattr(info, "finished_at", None)),
        }
    except Exception as exc:  # noqa: BLE001
        return {"error": str(exc)}


def fetch_recent_logs_safe(job_id: str, token: str | None = None, max_lines: int = 120) -> str:
    """Return the last ``max_lines`` log lines of a job, never raising.

    Args:
        job_id: Identifier of the job whose logs are fetched.
        token: Optional token forwarded to ``fetch_job_logs``.
        max_lines: Maximum number of trailing lines to keep. Zero or a
            negative value yields an empty string.

    Returns:
        The retained lines joined with newlines; ``""`` when ``job_id`` is
        empty; or a ``"Could not fetch job logs: ..."`` message on failure.
    """
    if not job_id:
        return ""
    try:
        # Guard explicitly: slicing with ``[-0:]`` would return every line.
        if max_lines <= 0:
            return ""
        # A bounded deque keeps only the tail instead of the full log.
        tail = deque(fetch_job_logs(job_id=job_id, token=token), maxlen=max_lines)
        return "\n".join(str(line).rstrip("\n") for line in tail)
    except Exception as exc:  # noqa: BLE001
        return f"Could not fetch job logs: {exc}"