from __future__ import annotations import argparse import csv import json from collections import Counter from datetime import UTC, datetime from pathlib import Path try: from eval.runners.common_memory_client import get_memory_test_client except ModuleNotFoundError: from common_memory_client import get_memory_test_client DEFAULT_INPUT = "output/governance_policy_cases.csv" DEFAULT_JSON_OUT = "eval/dashboards/governance_policy_eval_summary.json" DEFAULT_MD_OUT = "eval/dashboards/governance_policy_eval_summary.md" ROLE_TO_AUDIENCE = { "Sales_Rep": "HCP", "Medical_Science_Liaison": "HCP", "Patient_Support": "Patient", "Internal_Medical_Reviewer": "Internal", "Compliance_Lead": "Internal", "Medical_Information_Specialist": "HCP", "Pharmacovigilance_User": "Internal", "Regional_Medical_Manager": "Internal", } def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Run governance policy evaluation against the local memory API.") parser.add_argument("--input", default=DEFAULT_INPUT) parser.add_argument("--json-out", default=DEFAULT_JSON_OUT) parser.add_argument("--md-out", default=DEFAULT_MD_OUT) parser.add_argument("--limit", type=int, default=0) return parser.parse_args() def load_rows(path: Path, limit: int) -> list[dict]: rows = [] with path.open(newline="", encoding="utf-8") as handle: reader = csv.DictReader(handle) for index, row in enumerate(reader, start=1): rows.append(row) if limit and index >= limit: break return rows def normalize_therapy(value: str) -> str: return "NSCLC" if "nsclc" in value.lower() else value def normalize_geography(value: str) -> str: country = value.lower() if country in {"germany", "france", "italy", "spain", "netherlands", "sweden", "belgium", "portugal"}: return "EU / EMA" return "EU / EMA" if "eu" in country else value def expected_decision(row: dict) -> str: access_allowed = row["access_allowed"].lower() == "true" if not access_allowed: return "deny_no_sources" if row["expected_routing_path"] == "fast_path": return "allow" return "route_sme_review" def evaluate_rows(rows: list[dict]) -> dict: client = get_memory_test_client() decisions = Counter() matches = 0 failures = [] for row in rows: audience = ROLE_TO_AUDIENCE.get(row["user_role"], row["audience"]) response = client.post( "/memory/search", json={ "question": row["question_text"], "user_role": row["user_role"], "audience": audience, "geography": normalize_geography(row["user_geography"]), "therapy_area": normalize_therapy(row["therapy_area"]), "max_sources": 5, }, ) response.raise_for_status() body = response.json() decisions[body["decision"]] += 1 expected = expected_decision(row) if body["decision"] == expected: matches += 1 else: failures.append( { "id": row["id"], "expected": expected, "actual": body["decision"], "role": row["user_role"], "risk_category": row["risk_category"], } ) total = len(rows) or 1 return { "generated_at": datetime.now(UTC).isoformat(), "dataset": "governance_policy_cases.csv", "rows_evaluated": len(rows), "decision_counts": dict(decisions), "overall": { "routing_accuracy": round(matches / total, 4), }, "sample_failures": failures[:25], } def write_markdown(summary: dict, path: Path) -> None: lines = [ "# Governance Policy Eval Summary", "", f"- Generated at: `{summary['generated_at']}`", f"- Rows evaluated: `{summary['rows_evaluated']}`", "", "## Overall", "", f"- Routing accuracy: `{summary['overall']['routing_accuracy']}`", "", "## Decision Counts", "", ] for key, value in summary["decision_counts"].items(): lines.append(f"- `{key}`: `{value}`") if summary["sample_failures"]: lines.extend(["", "## Sample Failures", ""]) for failure in summary["sample_failures"][:10]: lines.append(f"- `{failure['id']}` expected=`{failure['expected']}` actual=`{failure['actual']}` role=`{failure['role']}`") path.parent.mkdir(parents=True, exist_ok=True) path.write_text("\n".join(lines) + "\n", encoding="utf-8") def main() -> None: args = parse_args() rows = load_rows(Path(args.input), args.limit) summary = evaluate_rows(rows) json_out = Path(args.json_out) md_out = Path(args.md_out) json_out.parent.mkdir(parents=True, exist_ok=True) json_out.write_text(json.dumps(summary, indent=2), encoding="utf-8") write_markdown(summary, md_out) print(f"Wrote JSON summary to {json_out}") print(f"Wrote Markdown summary to {md_out}") if __name__ == "__main__": main()