ropedia-xperience-10m-task-baselines / scripts /omni /audit_verified_omni_package.py
cy0307's picture
Update final Qwen model scripts
627e5d7 verified
Raw
History Blame
5.91 kB
#!/usr/bin/env python3
"""Audit a verified omni public package before publication updates."""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
from backbone_registry import load_registry
FORBIDDEN_SUFFIXES = {
".hdf5",
".mp4",
".mov",
".rrd",
".safetensors",
".pt",
".pth",
".ckpt",
".bin",
".tar",
".gz",
".zip",
}
def parse_args() -> argparse.Namespace:
workspace_default = Path(__file__).resolve().parents[2]
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--workspace", type=Path, default=workspace_default)
parser.add_argument("--package-dir", type=Path, required=True)
parser.add_argument("--backbone", help="Expected backbone id. Defaults to verified_result_summary.json.")
parser.add_argument("--output", type=Path)
return parser.parse_args()
def read_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8"))
def add_issue(issues: list[dict[str, str]], stage: str, message: str, severity: str = "error") -> None:
issues.append({"stage": stage, "severity": severity, "message": message})
def forbidden_files(package_dir: Path) -> list[str]:
return [
str(path.relative_to(package_dir))
for path in package_dir.rglob("*")
if path.is_file() and path.suffix.lower() in FORBIDDEN_SUFFIXES
]
def count_jsonl(path: Path) -> int:
with path.open("r", encoding="utf-8", errors="replace") as handle:
return sum(1 for line in handle if line.strip())
def audit(args: argparse.Namespace) -> dict[str, Any]:
workspace = args.workspace.expanduser().resolve()
package_dir = args.package_dir.expanduser().resolve()
try:
package_label = package_dir.relative_to(workspace).as_posix()
except ValueError:
package_label = package_dir.name
summary_path = package_dir / "verified_result_summary.json"
issues: list[dict[str, str]] = []
if not summary_path.exists():
add_issue(issues, "summary", f"missing verified_result_summary.json: {summary_path}")
return {"status": "fail", "package_dir": package_label, "issues": issues}
summary = read_json(summary_path)
backbone_id = args.backbone or summary.get("backbone")
registry = load_registry(workspace / "configs" / "omni_backbones")
if backbone_id not in registry:
add_issue(issues, "backbone", f"unknown backbone: {backbone_id}")
backbone = {}
else:
backbone = registry[backbone_id]
if summary.get("status") != "verified":
add_issue(issues, "summary", f"package status is {summary.get('status')}, expected verified")
if summary.get("backbone") != backbone_id:
add_issue(issues, "summary", f"summary backbone is {summary.get('backbone')}, expected {backbone_id}")
eval_dir = package_dir / "eval"
required_eval_files = list((backbone.get("artifact_contract") or {}).get("required_eval_files", []))
included_files = set(summary.get("included_files", []))
for filename in required_eval_files:
rel = f"eval/{filename}"
path = eval_dir / filename
if not path.exists():
add_issue(issues, "eval", f"missing required packaged eval file: {rel}")
if rel not in included_files:
add_issue(issues, "summary", f"required eval file missing from included_files: {rel}")
eval_summary = summary.get("eval") or {}
prediction_file = eval_summary.get("prediction_file")
if prediction_file:
prediction_path = eval_dir / str(prediction_file)
if not prediction_path.exists():
add_issue(issues, "eval", f"prediction file missing: eval/{prediction_file}")
elif prediction_path.suffix == ".jsonl" and count_jsonl(prediction_path) <= 0:
add_issue(issues, "eval", f"prediction file has no rows: eval/{prediction_file}")
if int(eval_summary.get("prediction_rows") or 0) <= 0:
add_issue(issues, "summary", "prediction_rows is empty")
if int(eval_summary.get("held_out_episode_count") or eval_summary.get("num_eval_episodes") or 0) <= 0:
add_issue(issues, "summary", "held-out episode count is empty")
primary_metrics = eval_summary.get("primary_metrics") or {}
for metric in backbone.get("primary_metrics", []):
if metric not in primary_metrics:
add_issue(issues, "metrics", f"missing primary metric in summary: {metric}")
elif primary_metrics.get(metric) is None:
add_issue(issues, "metrics", f"primary metric is null: {metric}")
validation_path = package_dir / "validation" / "eval.json"
if not validation_path.exists():
add_issue(issues, "validation", "missing validation/eval.json")
else:
validation = read_json(validation_path)
if validation.get("status") != "pass":
add_issue(issues, "validation", f"validation status is {validation.get('status')}, expected pass")
bad_files = forbidden_files(package_dir)
for rel in bad_files:
add_issue(issues, "public_safety", f"forbidden file in package: {rel}")
errors = [issue for issue in issues if issue["severity"] == "error"]
return {
"status": "pass" if not errors else "fail",
"package_dir": package_label,
"backbone": backbone_id,
"required_eval_files": required_eval_files,
"primary_metrics": sorted(primary_metrics),
"issues": issues,
}
def main() -> int:
args = parse_args()
payload = audit(args)
if args.output:
args.output.parent.mkdir(parents=True, exist_ok=True)
args.output.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
print(json.dumps(payload, indent=2))
return 0 if payload["status"] == "pass" else 1
if __name__ == "__main__":
raise SystemExit(main())