| |
| """Validate that scored matrix rows agree with their JSON metric sources.""" |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import math |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any |
|
|
|
|
| ROOT = Path(__file__).resolve().parents[1] |
| DEFAULT_MATRIX = ROOT / "docs/data/task_method_20_result_matrix.json" |
| DEFAULT_OUTPUT_JSON = ROOT / "docs/data/task_method_20_source_audit.json" |
| DEFAULT_OUTPUT_MD = ROOT / "TASK_METHOD_20_SOURCE_AUDIT.md" |
|
|
|
|
| def parse_args() -> argparse.Namespace: |
| parser = argparse.ArgumentParser(description=__doc__) |
| parser.add_argument("--matrix-json", type=Path, default=DEFAULT_MATRIX) |
| parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT_JSON) |
| parser.add_argument("--markdown-output", type=Path, default=DEFAULT_OUTPUT_MD) |
| parser.add_argument("--relative-tolerance", type=float, default=1e-9) |
| parser.add_argument("--absolute-tolerance", type=float, default=1e-12) |
| return parser.parse_args() |
|
|
|
|
| def read_json(path: Path) -> Any: |
| return json.loads(path.read_text(encoding="utf-8")) |
|
|
|
|
| def rel(path: Path) -> str: |
| try: |
| return path.relative_to(ROOT).as_posix() |
| except ValueError: |
| return path.as_posix() |
|
|
|
|
| def resolve_source(source: str) -> Path: |
| path = Path(source) |
| return path if path.is_absolute() else ROOT / path |
|
|
|
|
| def numeric(value: Any) -> float | None: |
| if isinstance(value, bool) or not isinstance(value, (int, float)): |
| return None |
| return float(value) |
|
|
|
|
| def check_record(record: dict[str, Any], args: argparse.Namespace) -> tuple[str, dict[str, Any] | None]: |
| source = record.get("source") |
| metric_key = record.get("metric_key") |
| raw = numeric(record.get("raw")) |
| base = { |
| "task_id": record.get("task_id"), |
| "task_number": record.get("task_number"), |
| "series_id": record.get("series_id"), |
| "method": record.get("method"), |
| "metric_key": metric_key, |
| "source": source, |
| "raw": record.get("raw"), |
| } |
| if not record.get("scored"): |
| return "unscored", None |
| if raw is None or not metric_key or not source: |
| return "skipped_non_numeric_or_missing_source", base |
|
|
| source_path = resolve_source(str(source)) |
| if not source_path.exists(): |
| return "missing_source", {**base, "resolved_source": rel(source_path)} |
| if source_path.suffix.lower() != ".json": |
| return "skipped_non_json_source", base |
|
|
| try: |
| payload = read_json(source_path) |
| except json.JSONDecodeError as exc: |
| return "invalid_json_source", {**base, "resolved_source": rel(source_path), "error": str(exc)} |
| source_key = str(metric_key) |
| source_value = numeric(payload.get(source_key)) if isinstance(payload, dict) else None |
| if source_value is None and isinstance(payload, dict): |
| primary_metric = payload.get("primary_metric") |
| primary_score = numeric(payload.get("primary_score")) |
| if primary_score is not None and (primary_metric in {metric_key, None} or str(primary_metric or "") == str(metric_key)): |
| source_key = "primary_score" |
| source_value = primary_score |
| elif primary_score is not None and "primary_score" in payload: |
| source_key = "primary_score" |
| source_value = primary_score |
| if source_value is None: |
| return "missing_metric_key", { |
| **base, |
| "resolved_source": rel(source_path), |
| "available_numeric_keys": sorted( |
| key for key, value in payload.items() if numeric(value) is not None |
| ) |
| if isinstance(payload, dict) |
| else [], |
| } |
| if not math.isclose(raw, source_value, rel_tol=args.relative_tolerance, abs_tol=args.absolute_tolerance): |
| return "value_mismatch", { |
| **base, |
| "resolved_source": rel(source_path), |
| "source_value": source_value, |
| "delta": raw - source_value, |
| } |
| return "checked", {**base, "resolved_source": rel(source_path), "source_key": source_key, "source_value": source_value} |
|
|
|
|
| def build_report(args: argparse.Namespace) -> dict[str, Any]: |
| matrix = read_json(args.matrix_json) |
| records = matrix.get("records", []) |
| checked: list[dict[str, Any]] = [] |
| skipped: list[dict[str, Any]] = [] |
| failures: list[dict[str, Any]] = [] |
| status_counts: dict[str, int] = {} |
|
|
| for record in records: |
| status, detail = check_record(record, args) |
| status_counts[status] = status_counts.get(status, 0) + 1 |
| if detail is None: |
| continue |
| detail = {"status": status, **detail} |
| if status == "checked": |
| checked.append(detail) |
| elif status.startswith("skipped"): |
| skipped.append(detail) |
| else: |
| failures.append(detail) |
|
|
| return { |
| "title": "Task Method 20 Matrix Source Audit", |
| "status": "pass" if not failures else "fail", |
| "generated_at_utc": datetime.now(timezone.utc).isoformat(timespec="seconds"), |
| "source_matrix": rel(args.matrix_json), |
| "method_task_record_count": matrix.get("method_task_record_count"), |
| "scored_method_task_count": matrix.get("scored_method_task_count"), |
| "checked_json_metric_count": len(checked), |
| "skipped_record_count": len(skipped), |
| "failure_count": len(failures), |
| "status_counts": dict(sorted(status_counts.items())), |
| "failures": failures, |
| "skipped_records": skipped[:100], |
| "rule": ( |
| "Every scored row that declares a JSON metric source must have the same " |
| "numeric value under that row's metric_key." |
| ), |
| } |
|
|
|
|
| def write_markdown(path: Path, report: dict[str, Any]) -> None: |
| failures = report["failures"] |
| lines = [ |
| "# Task Method 20 Matrix Source Audit", |
| "", |
| f"Generated: `{report['generated_at_utc']}`", |
| "", |
| f"Status: **{report['status']}**", |
| "", |
| report["rule"], |
| "", |
| "## Summary", |
| "", |
| f"- Source matrix: `{report['source_matrix']}`", |
| f"- Scored rows: `{report['scored_method_task_count']}/{report['method_task_record_count']}`", |
| f"- JSON metric rows checked: `{report['checked_json_metric_count']}`", |
| f"- Skipped non-JSON/non-numeric rows: `{report['skipped_record_count']}`", |
| f"- Failures: `{report['failure_count']}`", |
| "", |
| ] |
| if failures: |
| lines.extend([ |
| "## Failures", |
| "", |
| "| Method | Task | Metric | Matrix value | Source value | Source |", |
| "| --- | --- | --- | ---: | ---: | --- |", |
| ]) |
| for row in failures: |
| lines.append( |
| "| " |
| + " | ".join( |
| [ |
| str(row.get("series_id")), |
| str(row.get("task_id")), |
| str(row.get("metric_key")), |
| str(row.get("raw")), |
| str(row.get("source_value")), |
| str(row.get("source")), |
| ] |
| ) |
| + " |" |
| ) |
| else: |
| lines.append("No JSON source/value mismatches were found.") |
| path.write_text("\n".join(lines) + "\n", encoding="utf-8") |
|
|
|
|
| def main() -> int: |
| args = parse_args() |
| report = build_report(args) |
| args.output.parent.mkdir(parents=True, exist_ok=True) |
| args.output.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8") |
| write_markdown(args.markdown_output, report) |
| print(f"{report['status'].upper()}: wrote {args.output}") |
| print(f"{report['status'].upper()}: wrote {args.markdown_output}") |
| return 0 if report["status"] == "pass" else 1 |
|
|
|
|
| if __name__ == "__main__": |
| raise SystemExit(main()) |
|
|