| |
| """Audit a verified omni public package before publication updates.""" |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
| from backbone_registry import load_registry |
|
|
|
|
# File suffixes (lower-case, with leading dot) that must not appear anywhere in
# a public package; compared against ``Path.suffix.lower()``.
FORBIDDEN_SUFFIXES = {
    # Presumably raw data and recordings.
    ".hdf5",
    ".mp4",
    ".mov",
    ".rrd",
    # Presumably model weights / checkpoints.
    ".safetensors",
    ".pt",
    ".pth",
    ".ckpt",
    ".bin",
    # Archives (".tar.gz" is caught through its final ".gz" suffix).
    ".tar",
    ".gz",
    ".zip",
}
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse the process arguments.

    Returns:
        Namespace with ``workspace``, ``package_dir``, ``backbone`` and
        ``output`` attributes.
    """
    # Default workspace: two levels above the directory containing this file.
    default_workspace = Path(__file__).resolve().parents[2]

    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--workspace", default=default_workspace, type=Path)
    cli.add_argument("--package-dir", required=True, type=Path)
    cli.add_argument(
        "--backbone",
        help="Expected backbone id. Defaults to verified_result_summary.json.",
    )
    cli.add_argument("--output", type=Path)

    namespace = cli.parse_args()
    return namespace
|
|
|
|
def read_json(path: Path) -> dict[str, Any]:
    """Load the UTF-8 encoded JSON document stored at *path*.

    Args:
        path: File to read.

    Returns:
        The decoded JSON value. It is annotated as a dict, but the decoded
        type is not checked here.

    Raises:
        OSError: If the file cannot be read.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    raw_text = path.read_text(encoding="utf-8")
    document = json.loads(raw_text)
    return document
|
|
|
|
def add_issue(issues: list[dict[str, str]], stage: str, message: str, severity: str = "error") -> None:
    """Append one finding to *issues* in place.

    Args:
        issues: List collecting the findings; mutated by this call.
        stage: Name of the audit stage that produced the finding.
        message: Human-readable description of the finding.
        severity: Severity label; defaults to ``"error"``.
    """
    record = {"stage": stage, "severity": severity, "message": message}
    issues.append(record)
|
|
|
|
def forbidden_files(package_dir: Path) -> list[str]:
    """List every file under *package_dir* whose suffix is forbidden.

    Args:
        package_dir: Root directory of the package; searched recursively.

    Returns:
        Sorted POSIX-style paths, relative to *package_dir*, of all regular
        files whose lower-cased final suffix is in ``FORBIDDEN_SUFFIXES``.
    """
    # rglob yields entries in filesystem order, so sort to keep the audit
    # report reproducible; as_posix() keeps separators consistent with the
    # other "eval/..." labels in the report regardless of platform.
    return sorted(
        path.relative_to(package_dir).as_posix()
        for path in package_dir.rglob("*")
        if path.is_file() and path.suffix.lower() in FORBIDDEN_SUFFIXES
    )
|
|
|
|
def count_jsonl(path: Path) -> int:
    """Count the non-blank lines of the text file at *path*.

    The file is read as UTF-8 with undecodable bytes replaced, so malformed
    encoding does not raise. Lines containing only whitespace are not counted.

    Args:
        path: File to scan.

    Returns:
        Number of lines that contain at least one non-whitespace character.
    """
    non_blank = 0
    with path.open("r", encoding="utf-8", errors="replace") as stream:
        for raw_line in stream:
            if raw_line.strip():
                non_blank += 1
    return non_blank
|
|
|
|
def _as_count(value: Any) -> int:
    """Coerce a summary count field to ``int``; missing or malformed values become 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        # e.g. "many", a list, or Infinity: treat as "no count" so the caller
        # records an issue instead of the whole audit crashing.
        return 0


def audit(args: argparse.Namespace) -> dict[str, Any]:
    """Audit a packaged result directory and collect every problem found.

    Checks, in order: the summary file exists and is a JSON object, the
    backbone is known to the registry, the summary status/backbone match, the
    required eval files are present and listed, the prediction file and row /
    episode counts are non-empty, the backbone's primary metrics are present
    and non-null, ``validation/eval.json`` reports ``pass``, and no file with
    a forbidden suffix is packaged.

    Args:
        args: Parsed CLI namespace; ``workspace``, ``package_dir`` and
            ``backbone`` are read.

    Returns:
        A JSON-serialisable report. ``status`` is ``"pass"`` when no issue of
        severity ``"error"`` was recorded, otherwise ``"fail"``. When the
        summary file is missing or unreadable the report only contains
        ``status``, ``package_dir`` and ``issues``.
    """
    workspace = args.workspace.expanduser().resolve()
    package_dir = args.package_dir.expanduser().resolve()
    try:
        package_label = package_dir.relative_to(workspace).as_posix()
    except ValueError:
        # Package lives outside the workspace: fall back to its directory name.
        package_label = package_dir.name
    summary_path = package_dir / "verified_result_summary.json"
    issues: list[dict[str, str]] = []

    if not summary_path.exists():
        add_issue(issues, "summary", f"missing verified_result_summary.json: {summary_path}")
        return {"status": "fail", "package_dir": package_label, "issues": issues}

    # A corrupt or non-object summary is an audit failure, not a crash.
    # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    try:
        summary = read_json(summary_path)
    except (OSError, ValueError) as exc:
        add_issue(issues, "summary", f"unreadable verified_result_summary.json: {exc}")
        return {"status": "fail", "package_dir": package_label, "issues": issues}
    if not isinstance(summary, dict):
        add_issue(issues, "summary", "verified_result_summary.json is not a JSON object")
        return {"status": "fail", "package_dir": package_label, "issues": issues}

    backbone_id = args.backbone or summary.get("backbone")
    # NOTE(review): load_registry is assumed to return a mapping of backbone id
    # to its config dict -- confirm against backbone_registry.
    registry = load_registry(workspace / "configs" / "omni_backbones")
    if backbone_id not in registry:
        add_issue(issues, "backbone", f"unknown backbone: {backbone_id}")
        backbone = {}
    else:
        backbone = registry[backbone_id]

    if summary.get("status") != "verified":
        add_issue(issues, "summary", f"package status is {summary.get('status')}, expected verified")
    if summary.get("backbone") != backbone_id:
        add_issue(issues, "summary", f"summary backbone is {summary.get('backbone')}, expected {backbone_id}")

    eval_dir = package_dir / "eval"
    # "or" guards tolerate keys that are present but explicitly null.
    artifact_contract = backbone.get("artifact_contract") or {}
    required_eval_files = list(artifact_contract.get("required_eval_files") or [])
    included_files = set(summary.get("included_files") or [])
    for filename in required_eval_files:
        rel = f"eval/{filename}"
        path = eval_dir / filename
        if not path.exists():
            add_issue(issues, "eval", f"missing required packaged eval file: {rel}")
        if rel not in included_files:
            add_issue(issues, "summary", f"required eval file missing from included_files: {rel}")

    eval_summary = summary.get("eval") or {}
    if not isinstance(eval_summary, dict):
        # Every lookup below needs a mapping; report and continue with an
        # empty one so the remaining checks still run.
        add_issue(issues, "summary", "eval section is not a JSON object")
        eval_summary = {}
    prediction_file = eval_summary.get("prediction_file")
    if prediction_file:
        prediction_path = eval_dir / str(prediction_file)
        if not prediction_path.exists():
            add_issue(issues, "eval", f"prediction file missing: eval/{prediction_file}")
        elif prediction_path.suffix == ".jsonl" and count_jsonl(prediction_path) <= 0:
            add_issue(issues, "eval", f"prediction file has no rows: eval/{prediction_file}")
    if _as_count(eval_summary.get("prediction_rows")) <= 0:
        add_issue(issues, "summary", "prediction_rows is empty")
    episode_count = eval_summary.get("held_out_episode_count") or eval_summary.get("num_eval_episodes")
    if _as_count(episode_count) <= 0:
        add_issue(issues, "summary", "held-out episode count is empty")

    primary_metrics = eval_summary.get("primary_metrics") or {}
    for metric in backbone.get("primary_metrics") or []:
        if metric not in primary_metrics:
            add_issue(issues, "metrics", f"missing primary metric in summary: {metric}")
        elif primary_metrics.get(metric) is None:
            add_issue(issues, "metrics", f"primary metric is null: {metric}")

    validation_path = package_dir / "validation" / "eval.json"
    if not validation_path.exists():
        add_issue(issues, "validation", "missing validation/eval.json")
    else:
        try:
            validation = read_json(validation_path)
        except (OSError, ValueError) as exc:
            add_issue(issues, "validation", f"unreadable validation/eval.json: {exc}")
        else:
            validation_status = validation.get("status") if isinstance(validation, dict) else None
            if validation_status != "pass":
                add_issue(issues, "validation", f"validation status is {validation_status}, expected pass")

    bad_files = forbidden_files(package_dir)
    for rel in bad_files:
        add_issue(issues, "public_safety", f"forbidden file in package: {rel}")

    # Only "error" severity fails the audit; other severities are informational.
    errors = [issue for issue in issues if issue["severity"] == "error"]
    return {
        "status": "pass" if not errors else "fail",
        "package_dir": package_label,
        "backbone": backbone_id,
        "required_eval_files": required_eval_files,
        "primary_metrics": sorted(primary_metrics),
        "issues": issues,
    }
|
|
|
|
def main() -> int:
    """Run the audit from the command line.

    Prints the JSON report to stdout and, when ``--output`` was given, also
    writes it (with a trailing newline) to that file, creating parent
    directories as needed.

    Returns:
        ``0`` when the report status is ``"pass"``, otherwise ``1``.
    """
    cli_args = parse_args()
    report = audit(cli_args)

    out_path = cli_args.output
    if out_path:
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(f"{json.dumps(report, indent=2)}\n", encoding="utf-8")

    print(json.dumps(report, indent=2))

    if report["status"] == "pass":
        return 0
    return 1
|
|
|
|
if __name__ == "__main__":
    # Propagate the audit result as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|
|