fffiloni's picture
Upload 6 files
e520877 verified
Raw
History Blame
5.49 kB
from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Any
from huggingface_hub import HfFileSystem, bucket_info, create_bucket
from .config import bucket_uri_from_source, normalize_bucket_name, settings, user_bucket_source
from .security import redact
@dataclass(frozen=True)
class RunPaths:
    """Derive the storage locations of a single run's artifacts.

    All paths are computed from ``bucket_source`` and ``run_id``; the class
    performs no I/O by itself.
    """

    run_id: str
    bucket_source: str

    @property
    def bucket_uri(self) -> str:
        """Bucket URI obtained by passing ``bucket_source`` to ``bucket_uri_from_source``."""
        uri = bucket_uri_from_source(self.bucket_source)
        return uri

    @property
    def root(self) -> str:
        """Folder of this run: ``<bucket_uri>/runs/<run_id>``."""
        return "{}/runs/{}".format(self.bucket_uri, self.run_id)

    @property
    def state(self) -> str:
        """Path of the run's ``state.json`` file."""
        return self.root + "/state.json"

    @property
    def events(self) -> str:
        """Path of the run's ``events.jsonl`` file."""
        return self.root + "/events.jsonl"

    @property
    def report(self) -> str:
        """Path of the run's ``report.md`` file."""
        return self.root + "/report.md"
def _fs(token: str | None = None) -> HfFileSystem:
    """Build an ``HfFileSystem`` configured with ``token`` (which may be ``None``)."""
    filesystem = HfFileSystem(token=token)
    return filesystem
def check_user_bucket(
    *,
    username: str,
    bucket_name: str | None = None,
    token: str | None = None,
) -> dict[str, Any]:
    """Return bucket readiness for the signed-in user without creating resources.

    The result always carries ``ok``, ``exists``, ``bucket_source`` and
    ``bucket_uri``. On success it also has ``name`` and ``private``; on
    failure it has ``error`` (the exception text). ``exists`` is ``False``
    when the error text looks like a "not found" response and ``None`` when
    the cause cannot be told from the message.
    """
    source = user_bucket_source(username=username, bucket_name=bucket_name)
    uri = bucket_uri_from_source(source)

    try:
        info = bucket_info(source, token=token)
        ready: dict[str, Any] = {
            "ok": True,
            "exists": True,
            "bucket_source": source,
            "bucket_uri": uri,
        }
        # Fall back to the normalized requested name when the info object has none.
        ready["name"] = getattr(info, "name", normalize_bucket_name(bucket_name))
        ready["private"] = getattr(info, "private", None)
        return ready
    except Exception as exc:  # noqa: BLE001 - report readable status in UI
        message = str(exc)
        lowered = message.lower()
        # Substring markers that identify a missing bucket in the error text.
        missing = False
        for marker in ("404", "not found", "repository not found", "bucket not found"):
            if marker in lowered:
                missing = True
                break
        if missing:
            exists = False
        else:
            exists = None
        return {
            "ok": False,
            "exists": exists,
            "bucket_source": source,
            "bucket_uri": uri,
            "error": message,
        }
def create_user_bucket(
    *,
    username: str,
    bucket_name: str | None = None,
    token: str | None = None,
) -> dict[str, Any]:
    """Create the signed-in user's private run bucket, then return readiness.

    ``create_bucket`` is called with ``private=True`` and ``exist_ok=True``.
    If it raises, a failure dict (``ok=False``, ``exists=None``, plus the
    error text) is returned. Otherwise the result of ``check_user_bucket``
    is returned, extended with ``created_or_existing`` and ``create_url``.
    """
    source = user_bucket_source(username=username, bucket_name=bucket_name)

    try:
        url = create_bucket(source, private=True, exist_ok=True, token=token)
    except Exception as exc:  # noqa: BLE001
        failure: dict[str, Any] = {
            "ok": False,
            "exists": None,
            "bucket_source": source,
            "bucket_uri": bucket_uri_from_source(source),
            "error": str(exc),
        }
        return failure

    readiness = check_user_bucket(username=username, bucket_name=bucket_name, token=token)
    readiness.update({"created_or_existing": True, "create_url": str(url)})
    return readiness
def assert_user_bucket_ready(
    *,
    username: str,
    bucket_name: str | None = None,
    token: str | None = None,
) -> dict[str, Any]:
    """Raise a clear error if the user's run bucket cannot be used.

    Returns the status dict from ``check_user_bucket`` when its ``ok`` entry
    is truthy.

    Raises:
        ValueError: when the status is not ok; the message names the bucket
            and includes the reported error details.
    """
    status = check_user_bucket(username=username, bucket_name=bucket_name, token=token)
    if status.get("ok"):
        return status

    where = status.get("bucket_source") or user_bucket_source(username=username, bucket_name=bucket_name)
    details = status.get("error") or "Bucket does not exist or is not accessible."
    raise ValueError(
        f"Run bucket `{where}` is not ready. Click 'Create private run bucket' first, "
        f"or create it manually in Hugging Face Storage Buckets. Details: {details}"
    )
def read_text(path: str, token: str | None = None) -> str | None:
    """Read a text file through the Hugging Face filesystem.

    Args:
        path: Location of the file to read.
        token: Optional access token forwarded to the filesystem.

    Returns:
        The file content, ``None`` when the file does not exist, or a
        bracketed ``"[Could not read ...]"`` message when any other error
        occurs (so the UI can show a readable reason instead of crashing).
    """
    fs = _fs(token)
    try:
        # Decode explicitly as UTF-8 so the result does not depend on the
        # host's locale default encoding.
        with fs.open(path, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        return None
    except Exception as exc:  # noqa: BLE001 - surface readable error in UI
        return f"[Could not read {path}: {exc}]"
def read_json(path: str, token: str | None = None) -> dict[str, Any] | None:
    """Read a JSON object from ``path``.

    Args:
        path: Location of the JSON file.
        token: Optional access token forwarded to ``read_text``.

    Returns:
        ``None`` when the file is missing, empty, unreadable, or contains
        JSON ``null``; the decoded dict when the file holds a JSON object;
        otherwise a dict with an ``_error`` message and the redacted ``raw``
        content.
    """
    content = read_text(path, token=token)
    # read_text reports failures in-band with a "[Could not read" prefix.
    if not content or content.startswith("[Could not read"):
        return None
    try:
        parsed = json.loads(content)
    except json.JSONDecodeError:
        return {"_error": "Invalid JSON", "raw": redact(content)}
    if parsed is None or isinstance(parsed, dict):
        return parsed
    # Valid JSON that is not an object (list, string, number, bool) would
    # break callers relying on the declared dict return type.
    return {"_error": "Expected a JSON object", "raw": redact(content)}
def read_events(run_id: str, *, bucket_source: str, token: str | None = None) -> list[dict[str, Any]]:
    """Load the events of a run from its ``events.jsonl`` file.

    Args:
        run_id: Identifier of the run.
        bucket_source: Bucket holding the run.
        token: Optional access token forwarded to ``read_text``.

    Returns:
        One dict per non-blank line. A line that is not valid JSON, or that
        decodes to something other than a JSON object, is replaced by a
        ``parse_events`` warning event carrying the redacted line, so every
        element of the result is a dict.
    """
    paths = RunPaths(run_id, bucket_source=bucket_source)
    content = read_text(paths.events, token=token)
    if not content:
        return []

    events: list[dict[str, Any]] = []
    for raw_line in content.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            event = None
        if isinstance(event, dict):
            events.append(event)
        else:
            # Unparseable or non-object line: keep it visible as a warning
            # instead of dropping it or returning a non-dict entry.
            events.append({"step": "parse_events", "status": "warning", "message": redact(line)})
    return events
def read_run_bundle(run_id: str, *, bucket_source: str, token: str | None = None) -> dict[str, Any]:
    """Collect the paths, state, events and report of one run.

    Returns a dict with four entries: ``paths`` (the root/state/events/report
    locations), ``state`` (result of ``read_json``), ``events`` (result of
    ``read_events``) and ``report`` (the redacted report text, or the
    redaction of an empty string when ``read_text`` returns nothing).
    """
    run_paths = RunPaths(run_id, bucket_source=bucket_source)
    locations = {key: getattr(run_paths, key) for key in ("root", "state", "events", "report")}

    state = read_json(run_paths.state, token=token)
    events = read_events(run_id, bucket_source=bucket_source, token=token)
    report_text = read_text(run_paths.report, token=token) or ""

    bundle: dict[str, Any] = {
        "paths": locations,
        "state": state,
        "events": events,
        "report": redact(report_text),
    }
    return bundle