"""Register eval-pack sources, generate corpus text, and ingest into Postgres. Aligns the knowledge base with ``generate_nsclc_eval_pack.py`` SOURCE_SET (14 sources). Does **not** modify integration-test fixtures (*-RET-*, *-TEST-*). Steps (default: all): 1. Write ``data/eval_corpus/*.txt`` + ``manifest.json`` 2. Register ``sources`` / ``source_versions`` in Postgres 3. Run chunk + embed ingest via ``run_ingest_pipeline.py`` Usage: python3 scripts/setup_eval_corpus.py python3 scripts/setup_eval_corpus.py --no-embed python3 scripts/setup_eval_corpus.py --write-only """ from __future__ import annotations import argparse import json import os import subprocess import sys from dataclasses import dataclass from datetime import UTC, date, datetime from pathlib import Path REPO_ROOT = Path(__file__).resolve().parent.parent CORPUS_DIR = REPO_ROOT / "data" / "eval_corpus" MANIFEST_PATH = CORPUS_DIR / "manifest.json" _env_file = REPO_ROOT / ".env" if _env_file.exists(): for raw_line in _env_file.read_text().splitlines(): line = raw_line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) os.environ.setdefault(key.strip(), value.strip()) import psycopg # noqa: E402 DSN = os.getenv( "AKS_DATABASE_URL", "postgresql+psycopg://mobcoderid-296@localhost/ai_knowledge_spine", ).replace("postgresql+psycopg://", "postgresql://", 1) NOW = datetime.now(UTC) TODAY = date.today() THERAPY = "NSCLC" GEO = "EU / EMA" @dataclass(frozen=True) class SourceSpec: source_id: str version_id: str source_class: str # DB enum name: LBL, DOC_CSR, SOP_MED, ... title: str molecule: str | None sensitivity: str # EXTERNAL | INTERNAL_ONLY audience: list[str] def _molecule_from_id(source_id: str) -> str | None: if "DRUGA" in source_id or source_id.endswith("DRUGA-2024"): return "DRUG-A" if "DRUGB" in source_id: return "DRUG-B" if "DRUGC" in source_id: return "DRUG-C" return None def _class_from_prefix(source_id: str) -> str: if source_id.startswith("LBL-"): return "LBL" if source_id.startswith("DOC-CSR-"): return "DOC_CSR" if source_id.startswith("SOP-MED-"): return "SOP_MED" if source_id.startswith("GDL-"): return "GDL" if source_id.startswith("RMP-"): return "RMP" if source_id.startswith("PK-SUMMARY-"): return "PK_SUMMARY" if source_id.startswith("MI-FAQ-"): return "MI_FAQ" if source_id.startswith("MED-AFF-"): return "MED_AFF" if source_id.startswith("TREATMENT-ALGO-"): return "TREATMENT_ALGO" if source_id.startswith("SME-NOTE-"): return "SME_NOTE" return "LBL" def catalog() -> list[SourceSpec]: specs: list[SourceSpec] = [] for source_id in [ "DOC-CSR-NSCLC-001", "DOC-CSR-NSCLC-014", "SOP-MED-NSCLC-010", "SOP-MED-NSCLC-022", "GDL-NSCLC-2025-03", "LBL-NSCLC-DRUGA-EMA-2024", "LBL-NSCLC-DRUGB-EMA-2023", "LBL-NSCLC-DRUGC-EMA-2024", "MI-FAQ-NSCLC-021", "MED-AFF-NSCLC-PLAYBOOK-008", "RMP-NSCLC-DRUGA-2024", "SME-NOTE-NSCLC-017", "PK-SUMMARY-NSCLC-005", "TREATMENT-ALGO-NSCLC-2025-02", ]: slug = source_id.lower().replace("/", "-")[:40] version_id = f"ver-{slug}-1" mol = _molecule_from_id(source_id) cls = _class_from_prefix(source_id) internal = cls in {"SOP_MED", "MED_AFF", "TREATMENT_ALGO", "SME_NOTE"} specs.append( SourceSpec( source_id=source_id, version_id=version_id, source_class=cls, title=f"Synthetic eval corpus — {source_id}", molecule=mol, sensitivity="INTERNAL_ONLY" if internal else "EXTERNAL", audience=["Internal"] if internal and cls != "SOP_MED" else ["HCP", "Internal"], ) ) return specs def _drug_label(mol: str | None) -> str: return mol or "the authorised product" def generate_document(spec: SourceSpec) -> str: drug = _drug_label(spec.molecule) pages: list[str] = [] p = 1 def section(heading: str, paragraphs: list[str]) -> None: nonlocal p, pages pages.append(f"[[PAGE:{p}]]") pages.append(heading) pages.extend(paragraphs) p += 1 if spec.source_class == "LBL": section( "1 INDICATIONS AND USAGE", [ f"{drug} is indicated as monotherapy for adults with locally advanced or metastatic " "non-small cell lung cancer (NSCLC) harbouring activating EGFR mutations in the " "first-line setting under the approved EU label.", "Use outside EGFR-positive first-line metastatic NSCLC is not authorised. " "Adjuvant or post-resection use must not be presented as approved.", ], ) section( "2 POSOLOGY AND METHOD OF ADMINISTRATION", [ f"The recommended dose of {drug} is 80 mg once daily, orally, with or without food. " "Treatment continues until disease progression or unacceptable toxicity.", "Dose reduction to 40 mg once daily is permitted only within approved EU label " "boundaries for documented toxicity. Missed doses must not be doubled.", ], ) section( "4 CONTRAINDICATIONS", [ f"{drug} is contraindicated in patients with hypersensitivity to the active substance " "or excipients.", ], ) section( "4.4 SPECIAL WARNINGS AND PRECAUTIONS FOR USE", [ "Monitor for interstitial lung disease (ILD): new dyspnoea, cough, or fever require " "urgent assessment. Grade 3 or higher ILD requires permanent discontinuation.", "Baseline and periodic hepatic function and QT interval assessment is recommended. " "Use caution with QT-prolonging co-medications.", ], ) section( "4.8 UNDESIRABLE EFFECTS", [ "Common adverse reactions include rash, diarrhoea, paronychia, stomatitis, and " "decreased appetite. Serious reactions include ILD and severe cutaneous adverse events.", ], ) elif spec.source_class == "DOC_CSR": section( "OBJECTIVE", [ f"This clinical study report evaluates efficacy and safety of {drug} versus " "standard-of-care chemotherapy in treatment-naïve EGFR-positive metastatic NSCLC.", ], ) section( "ENDPOINTS", [ "Primary endpoint: progression-free survival by blinded independent central review. " "Secondary: overall survival, objective response rate (RECIST 1.1), duration of response, " "and treatment-emergent adverse events.", ], ) section( "RESULTS", [ f"{drug} improved progression-free survival in EGFR-positive NSCLC versus chemotherapy " "with a clinically meaningful hazard ratio favouring study treatment.", f"Overall response rate and duration of response were higher in the {drug} arm. " "Safety was consistent with EGFR-targeted therapy including ILD and QT prolongation.", ], ) section( "LIMITATIONS", [ "Population restricted to confirmed EGFR activating mutations. " "Findings must not be extrapolated beyond approved EU label scope.", ], ) elif spec.source_class == "SOP_MED": section( "PURPOSE", [ f"Govern medical information responses for {drug} in EU NSCLC, defining on-label " "versus medical affairs review boundaries.", ], ) section( "DOSING GUIDANCE", [ f"On-label dosing inquiries use approved EU label content: 80 mg once daily first-line " f"metastatic NSCLC for {drug}. Dose reductions must remain within approved EU label boundaries.", "Inquiries probing off-label dosing or regimens route to SME review.", ], ) section( "MEDICAL RESPONSE RULES", [ "Label is primary for indication, dose, and contraindications. " "Conflicts resolve in favour of the label. Low-confidence or policy-sensitive items route to SME.", ], ) elif spec.source_class == "GDL": section( "RECOMMENDATIONS", [ f"For EGFR-positive metastatic NSCLC, {drug} may be considered in first-line per " "current EU practice when aligned with the approved label.", ], ) section( "BIOMARKER TESTING", [ "Validated EGFR mutation testing should be completed before treatment selection. " "Later-line mutation-specific decisions require label alignment.", ], ) section( "FIRST-LINE THERAPY", [ "Separate labeled first-line metastatic use from adjuvant or post-resection settings. " "Do not imply non-labeled lines are approved.", ], ) elif spec.source_class == "RMP": section( "IMPORTANT IDENTIFIED RISKS", [ f"For {drug}, important risks include interstitial lung disease, QT prolongation, " "hepatotoxicity, and severe cutaneous adverse reactions.", ], ) section( "PHARMACOVIGILANCE MEASURES", [ "Healthcare professionals should report suspected adverse reactions per local requirements. " "ILD symptoms require prompt evaluation and label-concordant management.", ], ) elif spec.source_class == "PK_SUMMARY": section( "DOSE-EXPOSURE RELATIONSHIP", [ f"{drug} 80 mg once daily achieves target exposure in the approved population. " "Renal impairment requires cautious clinical judgement; avoid unsupported fixed-dose rules.", ], ) section( "ADMINISTRATION NOTES", [ "Oral administration with or without food. Dose modifications follow approved label steps only.", ], ) elif spec.source_class == "MI_FAQ": section( "FREQUENTLY ASKED QUESTIONS", [ f"What is the approved starting dose for {drug}? 80 mg once daily in first-line metastatic " "EGFR-positive NSCLC within EU label boundaries.", ], ) section( "MISSED DOSE", [ "Patient-facing answers must use only approved missed-dose guidance and avoid improvised " "rescue instructions; advise clinician follow-up when uncertain.", ], ) elif spec.source_class == "MED_AFF": section( "PLAYBOOK OVERVIEW", [ f"Medical affairs rollout for {drug} in EU NSCLC: align field medical with label-first messaging.", ], ) section( "BOUNDARY CASES", [ "Adjuvant and post-resection discussions remain outside approved scope unless label updates. " "Keep DRUG-B and DRUG-C narratives separate from DRUG-A.", ], ) elif spec.source_class == "TREATMENT_ALGO": section( "DECISION LOGIC", [ "Step 1: confirm EGFR activating mutation. Step 2: if first-line metastatic NSCLC, " f"consider {drug} when within approved EU label criteria.", ], ) section( "EXCLUSIONS", [ "Do not route adjuvant-only pathways into first-line metastatic approval logic.", ], ) elif spec.source_class == "SME_NOTE": section( "EXPERT REVIEW", [ f"SME interpretation: {drug} PFS benefit in EGFR-positive NSCLC is clinically relevant " "but must be communicated within approved boundaries without superiority overclaim.", ], ) section( "COMPARISON DISCIPLINE", [ "Comparative statements require explicit label or CSR grounding. Avoid cure-adjacent language.", ], ) else: section("CONTENT", [f"Controlled content for {spec.source_id} regarding {drug} in NSCLC."]) return "\n\n".join(pages) + "\n" def write_corpus_files(specs: list[SourceSpec]) -> None: CORPUS_DIR.mkdir(parents=True, exist_ok=True) manifest_sources = [] for spec in specs: text_file = f"{spec.source_id}.txt" path = CORPUS_DIR / text_file path.write_text(generate_document(spec), encoding="utf-8") chunker_class = { "LBL": "LBL", "DOC_CSR": "DOC-CSR", "SOP_MED": "SOP-MED", "GDL": "GDL", "RMP": "RMP", "PK_SUMMARY": "PK-SUMMARY", "MI_FAQ": "MI-FAQ", "MED_AFF": "MED-AFF", "TREATMENT_ALGO": "TREATMENT-ALGO", "SME_NOTE": "SME-NOTE", }[spec.source_class] manifest_sources.append( { "source_id": spec.source_id, "version_id": spec.version_id, "source_class": chunker_class, "therapy_area": THERAPY, "geography": GEO, "audience": spec.audience, "text_file": text_file, } ) MANIFEST_PATH.write_text(json.dumps({"sources": manifest_sources}, indent=2), encoding="utf-8") print(f"Wrote {len(specs)} text files and {MANIFEST_PATH}") def register_sources(specs: list[SourceSpec]) -> None: conn = psycopg.connect(DSN) try: with conn: with conn.cursor() as cur: for spec in specs: cur.execute("SELECT 1 FROM sources WHERE source_id = %s", (spec.source_id,)) if cur.fetchone() is None: cur.execute( """ INSERT INTO sources ( source_id, source_class, title, therapy_area, molecule, geography, audience_scope, sensitivity_class, approval_state, current_version_id, hygiene_status, created_at, updated_at ) VALUES ( %s, %s, %s, %s, %s, %s, %s::json, %s, 'APPROVED', NULL, 'active', %s, %s ) """, ( spec.source_id, spec.source_class, spec.title, THERAPY, spec.molecule, GEO, json.dumps(spec.audience), spec.sensitivity, NOW, NOW, ), ) cur.execute( "SELECT 1 FROM source_versions WHERE version_id = %s", (spec.version_id,), ) if cur.fetchone() is None: cur.execute( """ INSERT INTO source_versions ( version_id, source_id, version_label, approval_state, approval_date, is_latest_approved, is_superseded, created_at ) VALUES (%s, %s, 'v1', 'APPROVED', %s, TRUE, FALSE, %s) """, (spec.version_id, spec.source_id, TODAY, NOW), ) cur.execute( "UPDATE sources SET current_version_id = %s, updated_at = %s WHERE source_id = %s", (spec.version_id, NOW, spec.source_id), ) print(f"Registered {len(specs)} eval-pack sources in Postgres.") finally: conn.close() def run_ingest(*, embed: bool) -> None: cmd = [ sys.executable, str(REPO_ROOT / "scripts" / "run_ingest_pipeline.py"), "--manifest", str(MANIFEST_PATH), ] if not embed: cmd.append("--no-embed") subprocess.run(cmd, check=True, cwd=str(REPO_ROOT)) def verify() -> None: conn = psycopg.connect(DSN) try: with conn.cursor() as cur: cur.execute("SELECT COUNT(*) FROM sources") print(f"sources: {cur.fetchone()[0]}") cur.execute("SELECT COUNT(*) FROM chunks") print(f"chunks: {cur.fetchone()[0]}") cur.execute("SELECT COUNT(*) FROM chunk_embeddings") print(f"chunk_embeddings: {cur.fetchone()[0]}") cur.execute( "SELECT COUNT(*) FROM chunks WHERE source_id LIKE '%RET%' OR source_id LIKE '%TEST%'" ) print(f"fixture chunks (RET/TEST): {cur.fetchone()[0]}") cur.execute( """ SELECT source_id, COUNT(*) FROM chunks WHERE source_id NOT LIKE '%RET%' AND source_id NOT LIKE '%TEST%' GROUP BY source_id ORDER BY source_id """ ) print("eval corpus chunks per source:") for row in cur.fetchall(): print(f" {row[0]}: {row[1]}") finally: conn.close() def main() -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--write-only", action="store_true", help="Only generate text + manifest") parser.add_argument("--no-register", action="store_true", help="Skip Postgres registration") parser.add_argument("--no-ingest", action="store_true", help="Skip ingest pipeline") parser.add_argument("--no-embed", action="store_true", help="Chunk without Ollama embeddings") parser.add_argument("--no-seed-claims", action="store_true", help="Skip claim + assessment seeding") args = parser.parse_args() specs = catalog() write_corpus_files(specs) if args.write_only: return 0 if not args.no_register: register_sources(specs) if not args.no_ingest: run_ingest(embed=not args.no_embed) if not args.no_seed_claims and not args.no_register and not args.no_ingest: subprocess.run( [sys.executable, str(REPO_ROOT / "scripts" / "seed_eval_claims.py")], check=True, cwd=str(REPO_ROOT), ) subprocess.run( [sys.executable, str(REPO_ROOT / "scripts" / "seed_eval_graph_entities.py")], check=True, cwd=str(REPO_ROOT), ) verify() return 0 if __name__ == "__main__": raise SystemExit(main())