from __future__ import annotations import json from dataclasses import dataclass from typing import Any from huggingface_hub import HfFileSystem, bucket_info, create_bucket from .config import bucket_uri_from_source, normalize_bucket_name, settings, user_bucket_source from .security import redact @dataclass(frozen=True) class RunPaths: run_id: str bucket_source: str @property def bucket_uri(self) -> str: return bucket_uri_from_source(self.bucket_source) @property def root(self) -> str: return f"{self.bucket_uri}/runs/{self.run_id}" @property def state(self) -> str: return f"{self.root}/state.json" @property def events(self) -> str: return f"{self.root}/events.jsonl" @property def report(self) -> str: return f"{self.root}/report.md" def _fs(token: str | None = None) -> HfFileSystem: return HfFileSystem(token=token) def check_user_bucket(*, username: str, bucket_name: str | None = None, token: str | None = None) -> dict[str, Any]: """Return bucket readiness for the signed-in user without creating resources.""" bucket_source = user_bucket_source(username=username, bucket_name=bucket_name) bucket_uri = bucket_uri_from_source(bucket_source) try: info = bucket_info(bucket_source, token=token) return { "ok": True, "exists": True, "bucket_source": bucket_source, "bucket_uri": bucket_uri, "name": getattr(info, "name", normalize_bucket_name(bucket_name)), "private": getattr(info, "private", None), } except Exception as exc: # noqa: BLE001 - report readable status in UI error = str(exc) not_found = any(marker in error.lower() for marker in ["404", "not found", "repository not found", "bucket not found"]) return { "ok": False, "exists": False if not_found else None, "bucket_source": bucket_source, "bucket_uri": bucket_uri, "error": error, } def create_user_bucket(*, username: str, bucket_name: str | None = None, token: str | None = None) -> dict[str, Any]: """Create the signed-in user's private run bucket, then return readiness.""" bucket_source = user_bucket_source(username=username, bucket_name=bucket_name) try: url = create_bucket(bucket_source, private=True, exist_ok=True, token=token) except Exception as exc: # noqa: BLE001 return { "ok": False, "exists": None, "bucket_source": bucket_source, "bucket_uri": bucket_uri_from_source(bucket_source), "error": str(exc), } status = check_user_bucket(username=username, bucket_name=bucket_name, token=token) status["created_or_existing"] = True status["create_url"] = str(url) return status def assert_user_bucket_ready(*, username: str, bucket_name: str | None = None, token: str | None = None) -> dict[str, Any]: """Raise a clear error if the user's run bucket cannot be used.""" status = check_user_bucket(username=username, bucket_name=bucket_name, token=token) if not status.get("ok"): source = status.get("bucket_source") or user_bucket_source(username=username, bucket_name=bucket_name) error = status.get("error") or "Bucket does not exist or is not accessible." raise ValueError( f"Run bucket `{source}` is not ready. Click 'Create private run bucket' first, " f"or create it manually in Hugging Face Storage Buckets. Details: {error}" ) return status def read_text(path: str, token: str | None = None) -> str | None: fs = _fs(token) try: with fs.open(path, "r") as f: return f.read() except FileNotFoundError: return None except Exception as exc: # noqa: BLE001 - surface readable error in UI return f"[Could not read {path}: {exc}]" def read_json(path: str, token: str | None = None) -> dict[str, Any] | None: content = read_text(path, token=token) if not content or content.startswith("[Could not read"): return None try: return json.loads(content) except json.JSONDecodeError: return {"_error": "Invalid JSON", "raw": redact(content)} def read_events(run_id: str, *, bucket_source: str, token: str | None = None) -> list[dict[str, Any]]: paths = RunPaths(run_id, bucket_source=bucket_source) content = read_text(paths.events, token=token) if not content: return [] events: list[dict[str, Any]] = [] for line in content.splitlines(): line = line.strip() if not line: continue try: events.append(json.loads(line)) except json.JSONDecodeError: events.append({"step": "parse_events", "status": "warning", "message": redact(line)}) return events def read_run_bundle(run_id: str, *, bucket_source: str, token: str | None = None) -> dict[str, Any]: paths = RunPaths(run_id, bucket_source=bucket_source) return { "paths": { "root": paths.root, "state": paths.state, "events": paths.events, "report": paths.report, }, "state": read_json(paths.state, token=token), "events": read_events(run_id, bucket_source=bucket_source, token=token), "report": redact(read_text(paths.report, token=token) or ""), }