Robotics
PyTorch
Cosmos
xperience10m_task_baseline_suite
embodied-ai
multimodal
xperience-10m
baseline
evaluation
qwen3-omni
Instructions to use cy0307/ropedia-xperience-10m-task-baselines with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Cosmos
How to use cy0307/ropedia-xperience-10m-task-baselines with Cosmos:
# No code snippets are available yet for this library.
# To use this model, check the repository files and the library's documentation.
# Want to help? PRs adding snippets are welcome at:
# https://github.com/huggingface/huggingface.js
- Notebooks
- Google Colab
- Kaggle
| #!/usr/bin/env python3 | |
| """Validate that scored matrix rows agree with their JSON metric sources.""" | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import math | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any | |
| ROOT = Path(__file__).resolve().parents[1] | |
| DEFAULT_MATRIX = ROOT / "docs/data/task_method_20_result_matrix.json" | |
| DEFAULT_OUTPUT_JSON = ROOT / "docs/data/task_method_20_source_audit.json" | |
| DEFAULT_OUTPUT_MD = ROOT / "TASK_METHOD_20_SOURCE_AUDIT.md" | |
def parse_args() -> argparse.Namespace:
    """Parse the command-line options for the source audit.

    Returns:
        Namespace carrying ``matrix_json``, ``output`` and ``markdown_output``
        (all ``Path``) plus ``relative_tolerance`` and ``absolute_tolerance``
        (both ``float``).
    """
    cli = argparse.ArgumentParser(description=__doc__)

    # Path-valued options, each defaulting to a module-level location.
    path_options = (
        ("--matrix-json", DEFAULT_MATRIX),
        ("--output", DEFAULT_OUTPUT_JSON),
        ("--markdown-output", DEFAULT_OUTPUT_MD),
    )
    for flag, fallback in path_options:
        cli.add_argument(flag, type=Path, default=fallback)

    # Tolerances handed to math.isclose when comparing matrix and source values.
    tolerance_options = (
        ("--relative-tolerance", 1e-9),
        ("--absolute-tolerance", 1e-12),
    )
    for flag, fallback in tolerance_options:
        cli.add_argument(flag, type=float, default=fallback)

    return cli.parse_args()
def read_json(path: Path) -> Any:
    """Decode and return the UTF-8 JSON document stored at *path*."""
    text = path.read_text(encoding="utf-8")
    return json.loads(text)
def rel(path: Path) -> str:
    """Return *path* as a POSIX-style string, relative to ROOT when possible.

    A path that does not lie under ROOT is rendered unchanged.
    """
    try:
        shown = path.relative_to(ROOT)
    except ValueError:
        # relative_to raises ValueError when path is not located under ROOT.
        shown = path
    return shown.as_posix()
def resolve_source(source: str) -> Path:
    """Turn a record's source string into a path.

    Absolute sources are returned as-is; relative ones are joined onto ROOT.
    """
    candidate = Path(source)
    if candidate.is_absolute():
        return candidate
    return ROOT / candidate
def numeric(value: Any) -> float | None:
    """Return *value* as a float when it is an int or float, otherwise None.

    Booleans are rejected explicitly: bool is a subclass of int, so without
    the extra check True/False would be accepted as 1.0/0.0.
    """
    is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
    return float(value) if is_number else None
def check_record(record: dict[str, Any], args: argparse.Namespace) -> tuple[str, dict[str, Any] | None]:
    """Compare one matrix record's ``raw`` value with its JSON metric source.

    Args:
        record: One matrix row. Reads ``scored``, ``source``, ``metric_key``
            and ``raw``, and copies ``task_id``, ``task_number``,
            ``series_id`` and ``method`` into the returned detail.
        args: Parsed CLI options; only ``relative_tolerance`` and
            ``absolute_tolerance`` are read.

    Returns:
        A ``(status, detail)`` pair. ``status`` is one of ``"unscored"``,
        ``"skipped_non_numeric_or_missing_source"``, ``"missing_source"``,
        ``"skipped_non_json_source"``, ``"invalid_json_source"``,
        ``"missing_metric_key"``, ``"value_mismatch"`` or ``"checked"``.
        ``detail`` is ``None`` only for ``"unscored"``; otherwise it is a dict
        describing the record and, where known, the resolved source, the key
        that was compared and the source value.
    """
    # Unscored rows carry nothing to verify, so bail out before any other work.
    if not record.get("scored"):
        return "unscored", None

    source = record.get("source")
    metric_key = record.get("metric_key")
    raw = numeric(record.get("raw"))
    base = {
        "task_id": record.get("task_id"),
        "task_number": record.get("task_number"),
        "series_id": record.get("series_id"),
        "method": record.get("method"),
        "metric_key": metric_key,
        "source": source,
        "raw": record.get("raw"),
    }
    if raw is None or not metric_key or not source:
        return "skipped_non_numeric_or_missing_source", base

    source_path = resolve_source(str(source))
    located = {**base, "resolved_source": rel(source_path)}
    if not source_path.exists():
        return "missing_source", located
    if source_path.suffix.lower() != ".json":
        return "skipped_non_json_source", base

    try:
        payload = read_json(source_path)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # A file that is not valid UTF-8 is just as unusable as malformed
        # JSON; report it per record instead of aborting the whole audit.
        return "invalid_json_source", {**located, "error": str(exc)}

    source_key = str(metric_key)
    source_value = None
    if isinstance(payload, dict):
        source_value = numeric(payload.get(source_key))
        if source_value is None:
            # Fall back to the file's generic primary_score field.
            # NOTE(review): the fallback is taken whenever primary_score is
            # numeric; the file's primary_metric is not required to match
            # metric_key. Tighten here if a name match should be enforced.
            primary_score = numeric(payload.get("primary_score"))
            if primary_score is not None:
                source_key = "primary_score"
                source_value = primary_score

    if source_value is None:
        if isinstance(payload, dict):
            available = sorted(key for key, value in payload.items() if numeric(value) is not None)
        else:
            available = []
        return "missing_metric_key", {**located, "available_numeric_keys": available}

    # source_key is reported for mismatches too, so a failure found through
    # the primary_score fallback shows which field was actually compared.
    compared = {**located, "source_key": source_key, "source_value": source_value}
    if not math.isclose(raw, source_value, rel_tol=args.relative_tolerance, abs_tol=args.absolute_tolerance):
        return "value_mismatch", {**compared, "delta": raw - source_value}
    return "checked", compared
def build_report(args: argparse.Namespace) -> dict[str, Any]:
    """Audit every record of the matrix file and assemble the report dict.

    Args:
        args: Parsed CLI options; ``matrix_json`` names the matrix to read and
            the whole namespace is forwarded to ``check_record``.

    Returns:
        Report with the overall ``status`` (``"pass"`` when no failures were
        collected, else ``"fail"``), per-status counts, the full failure list
        and at most the first 100 skipped records.
    """
    matrix = read_json(args.matrix_json)

    checked: list[dict[str, Any]] = []
    skipped: list[dict[str, Any]] = []
    failures: list[dict[str, Any]] = []
    status_counts: dict[str, int] = {}

    for record in matrix.get("records", []):
        status, detail = check_record(record, args)
        status_counts.setdefault(status, 0)
        status_counts[status] += 1
        if detail is None:
            # Counted above, but there is nothing to list for this record.
            continue

        # Route the annotated detail into the list matching its status family.
        if status == "checked":
            bucket = checked
        elif status.startswith("skipped"):
            bucket = skipped
        else:
            bucket = failures
        bucket.append({"status": status, **detail})

    overall = "fail" if failures else "pass"
    rule = (
        "Every scored row that declares a JSON metric source must have the same "
        "numeric value under that row's metric_key."
    )
    return {
        "title": "Task Method 20 Matrix Source Audit",
        "status": overall,
        "generated_at_utc": datetime.now(timezone.utc).isoformat(timespec="seconds"),
        "source_matrix": rel(args.matrix_json),
        "method_task_record_count": matrix.get("method_task_record_count"),
        "scored_method_task_count": matrix.get("scored_method_task_count"),
        "checked_json_metric_count": len(checked),
        "skipped_record_count": len(skipped),
        "failure_count": len(failures),
        "status_counts": dict(sorted(status_counts.items())),
        "failures": failures,
        "skipped_records": skipped[:100],
        "rule": rule,
    }
def write_markdown(path: Path, report: dict[str, Any]) -> None:
    """Render *report* as a Markdown summary and write it to *path* (UTF-8).

    The document holds a heading, generation time, status, the audit rule and
    a summary list. It ends with a table of failures when there are any, or a
    single "no mismatches" line otherwise.
    """
    failures = report["failures"]
    doc = [
        "# Task Method 20 Matrix Source Audit",
        "",
        f"Generated: `{report['generated_at_utc']}`",
        "",
        f"Status: **{report['status']}**",
        "",
        report["rule"],
        "",
        "## Summary",
        "",
        f"- Source matrix: `{report['source_matrix']}`",
        f"- Scored rows: `{report['scored_method_task_count']}/{report['method_task_record_count']}`",
        f"- JSON metric rows checked: `{report['checked_json_metric_count']}`",
        f"- Skipped non-JSON/non-numeric rows: `{report['skipped_record_count']}`",
        f"- Failures: `{report['failure_count']}`",
        "",
    ]

    if not failures:
        doc.append("No JSON source/value mismatches were found.")
    else:
        # Record fields shown per failure, in table-column order.
        columns = ("series_id", "task_id", "metric_key", "raw", "source_value", "source")
        doc += [
            "## Failures",
            "",
            "| Method | Task | Metric | Matrix value | Source value | Source |",
            "| --- | --- | --- | ---: | ---: | --- |",
        ]
        for row in failures:
            cells = " | ".join(str(row.get(column)) for column in columns)
            doc.append("| " + cells + " |")

    path.write_text("\n".join(doc) + "\n", encoding="utf-8")
def main() -> int:
    """Run the audit, write the JSON and Markdown reports, and return the exit code.

    Returns:
        0 when the report status is ``"pass"``, otherwise 1.
    """
    args = parse_args()
    report = build_report(args)

    # Create the parent directory of both outputs up front. Preparing only
    # the JSON directory would let the Markdown write fail with
    # FileNotFoundError after the JSON report had already been written.
    for target in (args.output, args.markdown_output):
        target.parent.mkdir(parents=True, exist_ok=True)

    args.output.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    write_markdown(args.markdown_output, report)

    print(f"{report['status'].upper()}: wrote {args.output}")
    print(f"{report['status'].upper()}: wrote {args.markdown_output}")
    return 0 if report["status"] == "pass" else 1


if __name__ == "__main__":
    raise SystemExit(main())