File size: 5,910 Bytes
627e5d7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/env python3
"""Audit a verified omni public package before publication updates."""

from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any

from backbone_registry import load_registry


# File suffixes that are reported as "forbidden file in package" by the audit.
# Entries are lowercase and include the leading dot because they are matched
# against ``Path.suffix.lower()``.
FORBIDDEN_SUFFIXES = {
    # Presumably raw data, video, and recording captures.
    ".hdf5", ".mov", ".mp4", ".rrd",
    # Presumably model weights / checkpoints.
    ".bin", ".ckpt", ".pt", ".pth", ".safetensors",
    # Archives.
    ".gz", ".tar", ".zip",
}


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the audit script.

    Options:
        --workspace: workspace root (``Path``). Defaults to ``parents[2]`` of
            this file's resolved path.
        --package-dir: package directory to audit (``Path``, required).
        --backbone: expected backbone id (optional string).
        --output: optional ``Path`` for a copy of the JSON report.

    Returns:
        The populated ``argparse.Namespace``.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument(
        "--workspace",
        type=Path,
        default=Path(__file__).resolve().parents[2],
    )
    cli.add_argument("--package-dir", type=Path, required=True)
    cli.add_argument(
        "--backbone",
        help="Expected backbone id. Defaults to verified_result_summary.json.",
    )
    cli.add_argument("--output", type=Path)
    return cli.parse_args()


def read_json(path: Path) -> dict[str, Any]:
    """Parse the UTF-8 encoded JSON document stored at *path*.

    The top-level value is returned as parsed; it is annotated as a dict but
    not validated here.
    """
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)


def add_issue(issues: list[dict[str, str]], stage: str, message: str, severity: str = "error") -> None:
    """Append one issue record to *issues* in place.

    Each record is a dict with the keys ``stage``, ``severity`` and
    ``message`` (in that order); *severity* defaults to ``"error"``.
    """
    record = dict(stage=stage, severity=severity, message=message)
    issues.append(record)


def forbidden_files(package_dir: Path) -> list[str]:
    """List files under *package_dir* whose suffix is forbidden.

    The tree is walked recursively. Only regular files are considered, and the
    suffix comparison is case-insensitive against ``FORBIDDEN_SUFFIXES``.

    Args:
        package_dir: Root directory of the package to scan.

    Returns:
        Paths relative to *package_dir*, POSIX-style (forward slashes) and
        sorted, so the resulting report is identical across runs and
        platforms regardless of directory traversal order.
    """
    return sorted(
        path.relative_to(package_dir).as_posix()
        for path in package_dir.rglob("*")
        if path.is_file() and path.suffix.lower() in FORBIDDEN_SUFFIXES
    )


def count_jsonl(path: Path) -> int:
    """Count the non-blank lines of the text file at *path*.

    The file is decoded as UTF-8 with undecodable bytes replaced, so invalid
    byte sequences do not raise. Lines that are empty or whitespace-only are
    not counted.
    """
    rows = 0
    with path.open("r", encoding="utf-8", errors="replace") as stream:
        for raw in stream:
            if raw.strip():
                rows += 1
    return rows


def _load_json_object(
    path: Path,
    label: str,
    issues: list[dict[str, str]],
    stage: str,
) -> dict[str, Any] | None:
    """Read *path* as a JSON object, recording an issue instead of raising.

    Returns the parsed dict, or ``None`` after appending an error to *issues*
    when the file cannot be read, is not valid JSON, or its top-level value is
    not an object.
    """
    try:
        payload = read_json(path)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        add_issue(issues, stage, f"cannot read {label} as JSON: {exc}")
        return None
    if not isinstance(payload, dict):
        add_issue(issues, stage, f"{label} must contain a JSON object, found {type(payload).__name__}")
        return None
    return payload


def _as_mapping(value: Any) -> dict[str, Any]:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_count(value: Any) -> int:
    """Coerce a summary count to ``int``; missing or malformed values become 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        return 0


def audit(args: argparse.Namespace) -> dict[str, Any]:
    """Audit one packaged result directory and return a JSON-serialisable report.

    Checks run in this order, each appending to the issue list on failure:

    1. ``verified_result_summary.json`` exists and parses to a JSON object
       (otherwise the audit stops early with a short failing report).
    2. The backbone id (``args.backbone`` or the summary's ``backbone``) is
       present in the registry loaded from ``<workspace>/configs/omni_backbones``.
    3. The summary ``status`` is ``"verified"`` and its ``backbone`` matches.
    4. Every required eval file from the backbone's ``artifact_contract``
       exists under ``eval/`` and is listed in the summary's ``included_files``.
    5. The summary's prediction file (if named) exists and, for ``.jsonl``,
       has at least one non-blank row; ``prediction_rows`` and the held-out
       episode count are positive.
    6. Every primary metric of the backbone is present and non-null.
    7. ``validation/eval.json`` exists, parses, and has ``status == "pass"``.
    8. No file with a forbidden suffix is present in the package.

    Malformed package content (invalid JSON, wrong value types, non-numeric
    counts) is reported as an issue rather than raised, so the caller always
    receives a report.

    Args:
        args: Namespace providing ``workspace``, ``package_dir`` (both
            ``Path``) and ``backbone`` (``str`` or ``None``).

    Returns:
        A dict with ``status`` (``"pass"`` when no issue has severity
        ``"error"``, else ``"fail"``), ``package_dir``, ``backbone``,
        ``required_eval_files``, ``primary_metrics`` (sorted metric names
        found in the summary) and ``issues``. The early-exit report contains
        only ``status``, ``package_dir`` and ``issues``.
    """
    workspace = args.workspace.expanduser().resolve()
    package_dir = args.package_dir.expanduser().resolve()
    try:
        # Prefer a workspace-relative label; fall back to the bare directory
        # name when the package lives outside the workspace.
        package_label = package_dir.relative_to(workspace).as_posix()
    except ValueError:
        package_label = package_dir.name
    summary_path = package_dir / "verified_result_summary.json"
    issues: list[dict[str, str]] = []

    if not summary_path.exists():
        add_issue(issues, "summary", f"missing verified_result_summary.json: {summary_path}")
        return {"status": "fail", "package_dir": package_label, "issues": issues}

    summary = _load_json_object(summary_path, "verified_result_summary.json", issues, "summary")
    if summary is None:
        # Nothing else can be checked without a usable summary.
        return {"status": "fail", "package_dir": package_label, "issues": issues}

    backbone_id = args.backbone or summary.get("backbone")
    # NOTE(review): load_registry is assumed to return a mapping of backbone id
    # to a config dict; confirm against backbone_registry.
    registry = load_registry(workspace / "configs" / "omni_backbones")
    if backbone_id not in registry:
        add_issue(issues, "backbone", f"unknown backbone: {backbone_id}")
        backbone = {}
    else:
        backbone = registry[backbone_id]

    if summary.get("status") != "verified":
        add_issue(issues, "summary", f"package status is {summary.get('status')}, expected verified")
    if summary.get("backbone") != backbone_id:
        add_issue(issues, "summary", f"summary backbone is {summary.get('backbone')}, expected {backbone_id}")

    eval_dir = package_dir / "eval"
    required_eval_files = list((backbone.get("artifact_contract") or {}).get("required_eval_files", []))
    # Tolerate a null / non-list value and non-string entries: anything that is
    # not a listed string simply counts as "not included".
    listed = summary.get("included_files")
    included_files = {item for item in listed if isinstance(item, str)} if isinstance(listed, list) else set()
    for filename in required_eval_files:
        rel = f"eval/{filename}"
        path = eval_dir / filename
        if not path.exists():
            add_issue(issues, "eval", f"missing required packaged eval file: {rel}")
        if rel not in included_files:
            add_issue(issues, "summary", f"required eval file missing from included_files: {rel}")

    eval_summary = _as_mapping(summary.get("eval"))
    prediction_file = eval_summary.get("prediction_file")
    if prediction_file:
        prediction_path = eval_dir / str(prediction_file)
        if not prediction_path.exists():
            add_issue(issues, "eval", f"prediction file missing: eval/{prediction_file}")
        elif prediction_path.suffix == ".jsonl" and count_jsonl(prediction_path) <= 0:
            add_issue(issues, "eval", f"prediction file has no rows: eval/{prediction_file}")
    if _as_count(eval_summary.get("prediction_rows")) <= 0:
        add_issue(issues, "summary", "prediction_rows is empty")
    episode_count = eval_summary.get("held_out_episode_count") or eval_summary.get("num_eval_episodes")
    if _as_count(episode_count) <= 0:
        add_issue(issues, "summary", "held-out episode count is empty")

    primary_metrics = _as_mapping(eval_summary.get("primary_metrics"))
    for metric in backbone.get("primary_metrics", []):
        if metric not in primary_metrics:
            add_issue(issues, "metrics", f"missing primary metric in summary: {metric}")
        elif primary_metrics.get(metric) is None:
            add_issue(issues, "metrics", f"primary metric is null: {metric}")

    validation_path = package_dir / "validation" / "eval.json"
    if not validation_path.exists():
        add_issue(issues, "validation", "missing validation/eval.json")
    else:
        validation = _load_json_object(validation_path, "validation/eval.json", issues, "validation")
        if validation is not None and validation.get("status") != "pass":
            add_issue(issues, "validation", f"validation status is {validation.get('status')}, expected pass")

    bad_files = forbidden_files(package_dir)
    for rel in bad_files:
        add_issue(issues, "public_safety", f"forbidden file in package: {rel}")

    # Only "error" severity issues fail the audit; other severities are
    # reported but do not change the status.
    errors = [issue for issue in issues if issue["severity"] == "error"]
    return {
        "status": "pass" if not errors else "fail",
        "package_dir": package_label,
        "backbone": backbone_id,
        "required_eval_files": required_eval_files,
        "primary_metrics": sorted(primary_metrics),
        "issues": issues,
    }


def main() -> int:
    """Run the audit from the command line.

    The report is printed to stdout as indented JSON and, when ``--output``
    was given, also written to that path (parent directories are created as
    needed).

    Returns:
        ``0`` when the report status is ``"pass"``, otherwise ``1``.
    """
    args = parse_args()
    payload = audit(args)
    rendered = json.dumps(payload, indent=2)
    destination = args.output
    if destination:
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(f"{rendered}\n", encoding="utf-8")
    print(rendered)
    if payload["status"] == "pass":
        return 0
    return 1


# Script entry point: exit the process with main()'s return code
# (0 when the audit status is "pass", 1 otherwise).
if __name__ == "__main__":
    raise SystemExit(main())