File size: 5,489 Bytes
47bf6fa
 
 
 
 
 
e520877
47bf6fa
e520877
47bf6fa
 
 
 
 
 
e520877
 
 
 
 
47bf6fa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e520877
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47bf6fa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e520877
 
47bf6fa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e520877
 
47bf6fa
 
 
 
 
 
 
 
e520877
47bf6fa
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any

from huggingface_hub import HfFileSystem, bucket_info, create_bucket

from .config import bucket_uri_from_source, normalize_bucket_name, settings, user_bucket_source
from .security import redact


@dataclass(frozen=True)
class RunPaths:
    """Immutable bundle of bucket locations for a single run.

    Every artifact lives under ``<bucket_uri>/runs/<run_id>``; the properties
    below build those strings on demand from the two fields.
    """

    # Run identifier; interpolated verbatim as the directory name under ``runs/``.
    run_id: str
    # Bucket reference, converted to a URI by ``bucket_uri_from_source``.
    bucket_source: str

    def _artifact(self, filename: str) -> str:
        """Return the location of *filename* inside this run's directory."""
        return f"{self.root}/{filename}"

    @property
    def bucket_uri(self) -> str:
        """URI form of ``bucket_source``."""
        return bucket_uri_from_source(self.bucket_source)

    @property
    def root(self) -> str:
        """Directory holding all of this run's files."""
        base = self.bucket_uri
        return f"{base}/runs/{self.run_id}"

    @property
    def state(self) -> str:
        """Location of the run's ``state.json``."""
        return self._artifact("state.json")

    @property
    def events(self) -> str:
        """Location of the run's ``events.jsonl`` log."""
        return self._artifact("events.jsonl")

    @property
    def report(self) -> str:
        """Location of the run's ``report.md``."""
        return self._artifact("report.md")


def _fs(token: str | None = None) -> HfFileSystem:
    """Build an ``HfFileSystem`` client, passing *token* through unchanged."""
    filesystem = HfFileSystem(token=token)
    return filesystem


def _is_not_found(exc: Exception) -> bool:
    """Return True when *exc* looks like a "bucket does not exist" failure.

    Prefers the HTTP status of ``exc.response`` when the exception carries one,
    then falls back to matching the message text.
    """
    response = getattr(exc, "response", None)
    if getattr(response, "status_code", None) == 404:
        return True
    message = str(exc).lower()
    # "not found" also covers "repository not found" / "bucket not found".
    return "404" in message or "not found" in message


def check_user_bucket(*, username: str, bucket_name: str | None = None, token: str | None = None) -> dict[str, Any]:
    """Return bucket readiness for the signed-in user without creating resources.

    Calls ``bucket_info`` only; nothing is created or modified.

    Returns:
        A dict that always has ``ok``, ``exists``, ``bucket_source`` and
        ``bucket_uri``. On success it also has ``name`` and ``private``.
        On failure it has ``error``, and ``exists`` is ``False`` for a
        not-found error or ``None`` when the cause is anything else.
    """
    bucket_source = user_bucket_source(username=username, bucket_name=bucket_name)
    bucket_uri = bucket_uri_from_source(bucket_source)
    try:
        info = bucket_info(bucket_source, token=token)
        # Fall back to the normalized requested name only when the response has
        # none. A ``getattr`` default is evaluated eagerly, so computing it up
        # front would let a failing normalization turn a healthy bucket into an
        # error report.
        name = getattr(info, "name", None)
        if name is None:
            name = normalize_bucket_name(bucket_name)
        return {
            "ok": True,
            "exists": True,
            "bucket_source": bucket_source,
            "bucket_uri": bucket_uri,
            "name": name,
            "private": getattr(info, "private", None),
        }
    except Exception as exc:  # noqa: BLE001 - report readable status in UI
        return {
            "ok": False,
            "exists": False if _is_not_found(exc) else None,
            "bucket_source": bucket_source,
            "bucket_uri": bucket_uri,
            "error": str(exc),
        }


def create_user_bucket(*, username: str, bucket_name: str | None = None, token: str | None = None) -> dict[str, Any]:
    """Create (or reuse) the signed-in user's private run bucket and report its status.

    ``create_bucket`` is called with ``private=True`` and ``exist_ok=True``.
    If that call raises, an ``ok=False`` status dict carrying the error text is
    returned instead of propagating. Otherwise the ``check_user_bucket`` result
    is returned with ``created_or_existing`` and ``create_url`` added.
    """
    source = user_bucket_source(username=username, bucket_name=bucket_name)
    try:
        created = create_bucket(source, private=True, exist_ok=True, token=token)
    except Exception as exc:  # noqa: BLE001
        return {
            "ok": False,
            "exists": None,
            "bucket_source": source,
            "bucket_uri": bucket_uri_from_source(source),
            "error": str(exc),
        }
    readiness = check_user_bucket(username=username, bucket_name=bucket_name, token=token)
    readiness.update(created_or_existing=True, create_url=str(created))
    return readiness


def assert_user_bucket_ready(*, username: str, bucket_name: str | None = None, token: str | None = None) -> dict[str, Any]:
    """Return the ``check_user_bucket`` status, or raise if the bucket is unusable.

    Raises:
        ValueError: when the status is not ``ok``. The message names the bucket,
            points the user at how to create it, and includes the error details.
    """
    status = check_user_bucket(username=username, bucket_name=bucket_name, token=token)
    if status.get("ok"):
        return status

    bucket = status.get("bucket_source") or user_bucket_source(username=username, bucket_name=bucket_name)
    details = status.get("error") or "Bucket does not exist or is not accessible."
    message = (
        f"Run bucket `{bucket}` is not ready. Click 'Create private run bucket' first, "
        f"or create it manually in Hugging Face Storage Buckets. Details: {details}"
    )
    raise ValueError(message)


def read_text(path: str, token: str | None = None) -> str | None:
    """Read a UTF-8 text file through ``HfFileSystem``.

    Returns:
        The file contents; ``None`` when the file does not exist; or, for any
        other failure, a marker string of the form
        ``"[Could not read <path>: <error>]"`` so the UI can display the problem
        instead of crashing.
    """
    fs = _fs(token)
    try:
        # Decode explicitly as UTF-8. Text mode without ``encoding`` falls back
        # to the platform locale encoding, which mis-decodes non-ASCII JSON or
        # Markdown on hosts whose default encoding is not UTF-8.
        with fs.open(path, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        return None
    except Exception as exc:  # noqa: BLE001 - surface readable error in UI
        return f"[Could not read {path}: {exc}]"


def read_json(path: str, token: str | None = None) -> dict[str, Any] | None:
    """Load a JSON object from *path*.

    Returns:
        ``None`` when the file is missing, empty, or ``read_text`` reported a
        read failure. The parsed dict for a valid JSON object. For malformed
        JSON, or valid JSON whose top level is not an object, an
        ``{"_error": ..., "raw": <redacted content>}`` dict instead of raising.
    """
    content = read_text(path, token=token)
    if not content or content.startswith("[Could not read"):
        return None
    try:
        data = json.loads(content)
    except json.JSONDecodeError:
        return {"_error": "Invalid JSON", "raw": redact(content)}
    if not isinstance(data, dict):
        # ``json.loads`` accepts any top-level JSON value (list, number, null, ...);
        # report those instead of returning a value that breaks the declared
        # ``dict`` return type.
        return {"_error": "Expected a JSON object", "raw": redact(content)}
    return data


def read_events(run_id: str, *, bucket_source: str, token: str | None = None) -> list[dict[str, Any]]:
    """Return the entries of the run's ``events.jsonl`` as a list of dicts.

    A missing or empty file yields ``[]``, and blank lines are skipped. A line
    that is not valid JSON, or that decodes to something other than a JSON
    object, is replaced by a ``{"step": "parse_events", "status": "warning",
    "message": <redacted line>}`` entry. A read-failure marker from
    ``read_text`` is not valid JSON, so it also surfaces as such a warning.
    """
    paths = RunPaths(run_id, bucket_source=bucket_source)
    content = read_text(paths.events, token=token)
    if not content:
        return []
    events: list[dict[str, Any]] = []
    for raw in content.splitlines():
        line = raw.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            event = None
        if not isinstance(event, dict):
            # Keep the declared list-of-dicts contract: non-object lines (and
            # unparsable ones) become warnings rather than raw scalars/lists.
            event = {"step": "parse_events", "status": "warning", "message": redact(line)}
        events.append(event)
    return events


def read_run_bundle(run_id: str, *, bucket_source: str, token: str | None = None) -> dict[str, Any]:
    """Gather everything needed to display one run.

    Returns:
        A dict with ``paths`` (root/state/events/report locations), ``state``
        (from ``read_json``), ``events`` (from ``read_events``), and ``report``
        (the report text passed through ``redact``, using ``""`` when nothing
        was read).
    """
    paths = RunPaths(run_id, bucket_source=bucket_source)
    locations = {key: getattr(paths, key) for key in ("root", "state", "events", "report")}
    state = read_json(paths.state, token=token)
    events = read_events(run_id, bucket_source=bucket_source, token=token)
    report_text = read_text(paths.report, token=token) or ""
    return {
        "paths": locations,
        "state": state,
        "events": events,
        "report": redact(report_text),
    }