File size: 5,489 Bytes
47bf6fa e520877 47bf6fa e520877 47bf6fa e520877 47bf6fa e520877 47bf6fa e520877 47bf6fa e520877 47bf6fa e520877 47bf6fa | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 | from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Any
from huggingface_hub import HfFileSystem, bucket_info, create_bucket
from .config import bucket_uri_from_source, normalize_bucket_name, settings, user_bucket_source
from .security import redact
@dataclass(frozen=True)
class RunPaths:
    """Locations of one run's files inside a storage bucket.

    All locations are derived on every access from ``bucket_source``
    (converted with ``bucket_uri_from_source``) and ``run_id``; nothing is
    cached on the instance.
    """

    run_id: str  # identifier of the run; becomes the directory name under runs/
    bucket_source: str  # bucket identifier passed to bucket_uri_from_source

    def _file(self, filename: str) -> str:
        # Location of *filename* directly inside this run's root directory.
        return f"{self.root}/{filename}"

    @property
    def bucket_uri(self) -> str:
        """URI form of ``bucket_source``."""
        return bucket_uri_from_source(self.bucket_source)

    @property
    def root(self) -> str:
        """Directory ``<bucket_uri>/runs/<run_id>`` holding this run's files."""
        return f"{self.bucket_uri}/runs/{self.run_id}"

    @property
    def state(self) -> str:
        """Location of the run's ``state.json``."""
        return self._file("state.json")

    @property
    def events(self) -> str:
        """Location of the run's ``events.jsonl``."""
        return self._file("events.jsonl")

    @property
    def report(self) -> str:
        """Location of the run's ``report.md``."""
        return self._file("report.md")
def _fs(token: str | None = None) -> HfFileSystem:
    """Return a new ``HfFileSystem``, forwarding *token* to its constructor unchanged."""
    filesystem = HfFileSystem(token=token)
    return filesystem
def _is_not_found_error(exc: Exception) -> bool:
    """Return True when *exc* looks like a "bucket does not exist" failure.

    An HTTP status attached to the exception (``exc.response.status_code``)
    is trusted first. Otherwise the message is scanned for "not found". A bare
    ``404`` in the text is only trusted when no status code is attached,
    because error messages can embed the bucket URL/name, which may itself
    contain those digits.
    """
    import re  # local: only needed on this error path

    status_code = getattr(getattr(exc, "response", None), "status_code", None)
    if status_code == 404:
        return True
    message = str(exc).lower()
    # Covers "not found", "repository not found" and "bucket not found".
    if "not found" in message:
        return True
    return status_code is None and re.search(r"\b404\b", message) is not None


def check_user_bucket(*, username: str, bucket_name: str | None = None, token: str | None = None) -> dict[str, Any]:
    """Return bucket readiness for the signed-in user without creating resources.

    Exceptions from the ``bucket_info`` lookup are not raised. They are
    reported in the returned dict so the UI can display them.

    Returns:
        On success: ``ok=True``, ``exists=True``, ``bucket_source``,
        ``bucket_uri``, ``name`` (from the bucket info, falling back to
        ``normalize_bucket_name(bucket_name)`` if the info object has no
        ``name``), and ``private`` (``None`` if the info object lacks it).

        On failure: ``ok=False``, ``exists`` (``False`` when the error looks
        like "not found", ``None`` for any other error, i.e. existence is
        unknown), ``bucket_source``, ``bucket_uri`` and ``error`` (the
        exception text).
    """
    bucket_source = user_bucket_source(username=username, bucket_name=bucket_name)
    bucket_uri = bucket_uri_from_source(bucket_source)
    try:
        info = bucket_info(bucket_source, token=token)
        # Compute the fallback name only when the info object has no name.
        name = info.name if hasattr(info, "name") else normalize_bucket_name(bucket_name)
    except Exception as exc:  # noqa: BLE001 - report readable status in UI
        return {
            "ok": False,
            "exists": False if _is_not_found_error(exc) else None,
            "bucket_source": bucket_source,
            "bucket_uri": bucket_uri,
            "error": str(exc),
        }
    return {
        "ok": True,
        "exists": True,
        "bucket_source": bucket_source,
        "bucket_uri": bucket_uri,
        "name": name,
        "private": getattr(info, "private", None),
    }
def create_user_bucket(*, username: str, bucket_name: str | None = None, token: str | None = None) -> dict[str, Any]:
    """Create the signed-in user's private run bucket, then return readiness.

    ``create_bucket`` is called with ``private=True`` and ``exist_ok=True``.

    If that call raises, a failure dict is returned: ``ok=False``,
    ``exists=None``, ``bucket_source``, ``bucket_uri`` and ``error``.

    Otherwise the ``check_user_bucket`` status is returned with two extra
    keys:
        created_or_existing: always ``True``.
        create_url: the stringified return value of ``create_bucket``.
    """
    bucket_source = user_bucket_source(username=username, bucket_name=bucket_name)
    try:
        created = create_bucket(bucket_source, private=True, exist_ok=True, token=token)
    except Exception as exc:  # noqa: BLE001
        failure: dict[str, Any] = {
            "ok": False,
            "exists": None,
            "bucket_source": bucket_source,
            "bucket_uri": bucket_uri_from_source(bucket_source),
            "error": str(exc),
        }
        return failure
    readiness = check_user_bucket(username=username, bucket_name=bucket_name, token=token)
    readiness.update(created_or_existing=True, create_url=str(created))
    return readiness
def assert_user_bucket_ready(*, username: str, bucket_name: str | None = None, token: str | None = None) -> dict[str, Any]:
    """Raise a clear error if the user's run bucket cannot be used.

    Returns:
        The ``check_user_bucket`` status dict, unchanged, when its ``ok``
        entry is truthy.

    Raises:
        ValueError: when the status is not ``ok``. The message names the
            bucket and includes the reported error, or a generic "does not
            exist or is not accessible" note if no error text was given.
    """
    readiness = check_user_bucket(username=username, bucket_name=bucket_name, token=token)
    if readiness.get("ok"):
        return readiness
    source = readiness.get("bucket_source") or user_bucket_source(username=username, bucket_name=bucket_name)
    detail = readiness.get("error") or "Bucket does not exist or is not accessible."
    message = (
        f"Run bucket `{source}` is not ready. Click 'Create private run bucket' first, "
        f"or create it manually in Hugging Face Storage Buckets. Details: {detail}"
    )
    raise ValueError(message)
def read_text(path: str, token: str | None = None) -> str | None:
    """Read *path* from the Hub filesystem as UTF-8 text.

    Returns:
        The file contents, or ``None`` if the file does not exist.

        For any other failure, returns a readable
        ``"[Could not read <path>: <error>]"`` string so the UI can show the
        problem instead of crashing. ``read_json`` detects this value by its
        ``"[Could not read"`` prefix, so keep the two in sync.
    """
    fs = _fs(token)
    try:
        # Decode explicitly as UTF-8. In text mode fsspec otherwise wraps the
        # file in io.TextIOWrapper with the platform's locale encoding, which
        # is not guaranteed to be UTF-8.
        with fs.open(path, "r", encoding="utf-8") as handle:
            return handle.read()
    except FileNotFoundError:
        return None
    except Exception as exc:  # noqa: BLE001 - surface readable error in UI
        return f"[Could not read {path}: {exc}]"
def read_json(path: str, token: str | None = None) -> dict[str, Any] | None:
    """Load and parse the JSON document stored at *path*.

    Returns ``None`` when the file is missing or empty, or when ``read_text``
    returned its ``"[Could not read"`` message.

    If the text is not valid JSON, returns
    ``{"_error": "Invalid JSON", "raw": <redacted text>}``.

    Otherwise returns whatever ``json.loads`` produced.
    """
    text = read_text(path, token=token)
    unreadable = not text or text.startswith("[Could not read")
    if unreadable:
        return None
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return {"_error": "Invalid JSON", "raw": redact(text)}
    # NOTE(review): annotated as a dict, but a non-object document (e.g. a
    # JSON list) is returned as-is.
    return parsed
def read_events(run_id: str, *, bucket_source: str, token: str | None = None) -> list[dict[str, Any]]:
    """Parse the run's ``events.jsonl`` file into a list of events.

    Returns ``[]`` when the file is missing or empty.

    Each line is stripped and blank lines are skipped. A line that is not
    valid JSON does not abort the parse. It is kept as
    ``{"step": "parse_events", "status": "warning", "message": <redacted line>}``.
    This also applies to the ``"[Could not read ...]"`` text that
    ``read_text`` returns on read errors.
    """

    def parse(raw: str) -> dict[str, Any]:
        # Decode one JSONL record, downgrading malformed lines to a warning event.
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            return {"step": "parse_events", "status": "warning", "message": redact(raw)}

    text = read_text(RunPaths(run_id, bucket_source=bucket_source).events, token=token)
    if not text:
        return []
    stripped = (raw.strip() for raw in text.splitlines())
    return [parse(raw) for raw in stripped if raw]
def read_run_bundle(run_id: str, *, bucket_source: str, token: str | None = None) -> dict[str, Any]:
    """Collect a run's locations, state, events and report in one dict.

    Returns a dict with:
        paths: the ``root``, ``state``, ``events`` and ``report`` locations.
        state: result of ``read_json`` on ``state.json`` (may be ``None``).
        events: result of ``read_events`` for the run.
        report: ``report.md`` text passed through ``redact``. If the file is
            missing or empty, this is ``redact("")``. If ``read_text`` failed,
            it is its redacted "[Could not read ...]" message.
    """
    paths = RunPaths(run_id, bucket_source=bucket_source)
    locations = {name: getattr(paths, name) for name in ("root", "state", "events", "report")}
    state = read_json(paths.state, token=token)
    events = read_events(run_id, bucket_source=bucket_source, token=token)
    report_text = read_text(paths.report, token=token) or ""
    return {
        "paths": locations,
        "state": state,
        "events": events,
        "report": redact(report_text),
    }
|