| from __future__ import annotations |
|
|
| import json |
| from dataclasses import dataclass |
| from typing import Any |
|
|
| from huggingface_hub import HfFileSystem, bucket_info, create_bucket |
|
|
| from .config import bucket_uri_from_source, normalize_bucket_name, settings, user_bucket_source |
| from .security import redact |
|
|
|
|
@dataclass(frozen=True)
class RunPaths:
    """Immutable locator for the files that belong to one run in a bucket.

    All paths are plain strings derived on every access from ``bucket_source``
    and ``run_id``; nothing is cached and no I/O happens here.
    """

    run_id: str  # used as the directory name below ``runs/``
    bucket_source: str  # handed to ``bucket_uri_from_source`` to obtain the URI

    def _artifact(self, filename: str) -> str:
        """Return the path of *filename* directly under this run's root."""
        return f"{self.root}/{filename}"

    @property
    def bucket_uri(self) -> str:
        """Bucket URI as produced by ``bucket_uri_from_source``."""
        return bucket_uri_from_source(self.bucket_source)

    @property
    def root(self) -> str:
        """Directory that holds every artifact of this run."""
        bucket = self.bucket_uri
        return f"{bucket}/runs/{self.run_id}"

    @property
    def state(self) -> str:
        """Path of the run's ``state.json`` file."""
        return self._artifact("state.json")

    @property
    def events(self) -> str:
        """Path of the run's ``events.jsonl`` file."""
        return self._artifact("events.jsonl")

    @property
    def report(self) -> str:
        """Path of the run's ``report.md`` file."""
        return self._artifact("report.md")
|
|
|
|
def _fs(token: str | None = None) -> HfFileSystem:
    """Build a new ``HfFileSystem`` using *token* (may be ``None``)."""
    filesystem = HfFileSystem(token=token)
    return filesystem
|
|
|
|
def check_user_bucket(*, username: str, bucket_name: str | None = None, token: str | None = None) -> dict[str, Any]:
    """Probe the signed-in user's run bucket without creating anything.

    The returned dict always carries ``ok``, ``exists``, ``bucket_source`` and
    ``bucket_uri``. When the lookup succeeds it also carries ``name`` and
    ``private``; when it fails it carries ``error`` instead, and ``exists`` is
    ``False`` if the error text looks like a "not found" response, otherwise
    ``None`` (unknown).
    """
    source = user_bucket_source(username=username, bucket_name=bucket_name)
    uri = bucket_uri_from_source(source)

    try:
        info = bucket_info(source, token=token)
        # The fallback name is computed eagerly, exactly as a getattr default would be.
        fallback_name = normalize_bucket_name(bucket_name)
        name = getattr(info, "name", fallback_name)
        private = getattr(info, "private", None)
    except Exception as exc:
        message = str(exc)
        lowered = message.lower()
        # Heuristic: classify the failure from the text of the error.
        looks_missing = False
        for marker in ("404", "not found", "repository not found", "bucket not found"):
            if marker in lowered:
                looks_missing = True
                break
        return {
            "ok": False,
            "exists": False if looks_missing else None,
            "bucket_source": source,
            "bucket_uri": uri,
            "error": message,
        }

    return {
        "ok": True,
        "exists": True,
        "bucket_source": source,
        "bucket_uri": uri,
        "name": name,
        "private": private,
    }
|
|
|
|
def create_user_bucket(*, username: str, bucket_name: str | None = None, token: str | None = None) -> dict[str, Any]:
    """Create (or reuse) the user's private run bucket and report readiness.

    If ``create_bucket`` raises, a failure dict with ``ok=False`` and the error
    text is returned. Otherwise the result of ``check_user_bucket`` is returned
    with ``created_or_existing`` and ``create_url`` added.
    """
    source = user_bucket_source(username=username, bucket_name=bucket_name)

    try:
        created = create_bucket(source, private=True, exist_ok=True, token=token)
    except Exception as exc:
        failure: dict[str, Any] = {"ok": False, "exists": None, "bucket_source": source}
        failure["bucket_uri"] = bucket_uri_from_source(source)
        failure["error"] = str(exc)
        return failure

    readiness = check_user_bucket(username=username, bucket_name=bucket_name, token=token)
    readiness.update(created_or_existing=True, create_url=str(created))
    return readiness
|
|
|
|
def assert_user_bucket_ready(*, username: str, bucket_name: str | None = None, token: str | None = None) -> dict[str, Any]:
    """Return the bucket status, or raise if the bucket is not usable.

    Raises:
        ValueError: when ``check_user_bucket`` reports a falsy ``ok``; the
            message names the bucket and includes the reported error details.
    """
    status = check_user_bucket(username=username, bucket_name=bucket_name, token=token)
    if status.get("ok"):
        return status

    # Fall back to recomputing the source / a generic reason if the status lacks them.
    source = status.get("bucket_source") or user_bucket_source(username=username, bucket_name=bucket_name)
    error = status.get("error") or "Bucket does not exist or is not accessible."
    message = (
        f"Run bucket `{source}` is not ready. Click 'Create private run bucket' first, "
        f"or create it manually in Hugging Face Storage Buckets. Details: {error}"
    )
    raise ValueError(message)
|
|
|
|
def read_text(path: str, token: str | None = None) -> str | None:
    """Read a text file through the Hugging Face filesystem.

    Args:
        path: Location of the file, in whatever form ``HfFileSystem`` accepts.
        token: Optional access token forwarded to the filesystem.

    Returns:
        The file contents as a string; ``None`` when the file does not exist;
        or, for any other failure, a placeholder string of the form
        ``"[Could not read <path>: <error>]"``. Other code in this module
        recognises that ``"[Could not read"`` prefix, so it must stay stable.
    """
    fs = _fs(token)
    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # host's locale encoding (the files read here are JSON/JSONL/Markdown).
        with fs.open(path, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        return None
    except Exception as exc:
        # Best effort by design: report the failure in-band instead of raising.
        return f"[Could not read {path}: {exc}]"
|
|
|
|
def read_json(path: str, token: str | None = None) -> dict[str, Any] | None:
    """Read *path* via ``read_text`` and decode it as JSON.

    Returns ``None`` when there is no content or when ``read_text`` produced
    its ``"[Could not read"`` placeholder. If the content is not valid JSON, a
    dict with ``_error`` and the redacted raw text under ``raw`` is returned.
    """
    text = read_text(path, token=token)
    if not text:
        return None
    if text.startswith("[Could not read"):
        return None

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return {"_error": "Invalid JSON", "raw": redact(text)}
    return parsed
|
|
|
|
def read_events(run_id: str, *, bucket_source: str, token: str | None = None) -> list[dict[str, Any]]:
    """Load the run's ``events.jsonl`` as a list of decoded entries.

    Blank lines are skipped. A line that is not valid JSON is kept as a
    ``parse_events`` warning entry whose ``message`` is the redacted line.
    Returns an empty list when there is no content.
    """
    events_path = RunPaths(run_id, bucket_source=bucket_source).events
    raw = read_text(events_path, token=token)
    if not raw:
        return []

    def _decode(entry: str) -> dict[str, Any]:
        """Decode one non-empty line, or wrap it in a warning entry."""
        try:
            return json.loads(entry)
        except json.JSONDecodeError:
            return {"step": "parse_events", "status": "warning", "message": redact(entry)}

    stripped_rows = (row.strip() for row in raw.splitlines())
    return [_decode(row) for row in stripped_rows if row]
|
|
|
|
def read_run_bundle(run_id: str, *, bucket_source: str, token: str | None = None) -> dict[str, Any]:
    """Collect everything stored for a run into a single dict.

    The result has four keys: ``paths`` (root/state/events/report locations),
    ``state`` (from ``read_json``), ``events`` (from ``read_events``) and
    ``report`` (the redacted report text, or redacted ``""`` when absent).
    """
    paths = RunPaths(run_id, bucket_source=bucket_source)

    bundle: dict[str, Any] = {
        "paths": {
            "root": paths.root,
            "state": paths.state,
            "events": paths.events,
            "report": paths.report,
        },
    }
    # Reads happen in the same order as the keys appear: state, events, report.
    bundle["state"] = read_json(paths.state, token=token)
    bundle["events"] = read_events(run_id, bucket_source=bucket_source, token=token)
    report_text = read_text(paths.report, token=token)
    bundle["report"] = redact(report_text or "")
    return bundle
|
|