| """Register eval-pack sources, generate corpus text, and ingest into Postgres. |
| |
| Aligns the knowledge base with ``generate_nsclc_eval_pack.py`` SOURCE_SET (14 sources). |
| Does **not** modify integration-test fixtures (*-RET-*, *-TEST-*). |
| |
| Steps (default: all): |
| 1. Write ``data/eval_corpus/*.txt`` + ``manifest.json`` |
| 2. Register ``sources`` / ``source_versions`` in Postgres |
| 3. Run chunk + embed ingest via ``run_ingest_pipeline.py`` |
| |
| Usage: |
| python3 scripts/setup_eval_corpus.py |
| python3 scripts/setup_eval_corpus.py --no-embed |
| python3 scripts/setup_eval_corpus.py --write-only |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import os |
| import subprocess |
| import sys |
| from dataclasses import dataclass |
| from datetime import UTC, date, datetime |
| from pathlib import Path |
|
|
# Repository root. The script is invoked as ``scripts/setup_eval_corpus.py``,
# so the root is two levels above this file.
REPO_ROOT = Path(__file__).resolve().parent.parent
# Directory receiving the generated ``*.txt`` documents, and the manifest
# that is handed to the ingest pipeline.
CORPUS_DIR = REPO_ROOT / "data" / "eval_corpus"
MANIFEST_PATH = CORPUS_DIR / "manifest.json"

# Minimal ``.env`` loader: every ``KEY=VALUE`` line of ``<repo>/.env`` is
# exported unless the variable is already set in the real environment
# (``setdefault``), so explicit environment settings always win.
_env_file = REPO_ROOT / ".env"
if _env_file.exists():
    # Decode explicitly as UTF-8 so parsing does not depend on the locale.
    for raw_line in _env_file.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        # Skip blank lines, comments, and anything that is not an assignment.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        value = value.strip()
        # Drop one pair of matching surrounding quotes (KEY="value"), which
        # would otherwise become part of the exported value.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        os.environ.setdefault(key.strip(), value)
|
|
| import psycopg |
|
|
| DSN = os.getenv( |
| "AKS_DATABASE_URL", |
| "postgresql+psycopg://mobcoderid-296@localhost/ai_knowledge_spine", |
| ).replace("postgresql+psycopg://", "postgresql://", 1) |
|
|
| NOW = datetime.now(UTC) |
| TODAY = date.today() |
| THERAPY = "NSCLC" |
| GEO = "EU / EMA" |
|
|
|
|
@dataclass(frozen=True)
class SourceSpec:
    """Immutable description of one eval-corpus source document.

    Attributes:
        source_id: Source identifier, e.g. ``"LBL-NSCLC-DRUGA-EMA-2024"``.
        version_id: Identifier of the version row belonging to the source.
        source_class: Source-class code such as ``"LBL"`` or ``"DOC_CSR"``.
        title: Human-readable title of the source.
        molecule: Molecule name (e.g. ``"DRUG-A"``), or ``None`` when the
            source is not tied to a single molecule.
        sensitivity: Sensitivity class string (e.g. ``"EXTERNAL"``).
        audience: Audience labels, e.g. ``["HCP", "Internal"]``.

    ``frozen=True`` blocks attribute reassignment only; the ``audience`` list
    itself stays mutable, which also makes instances unhashable.
    """

    source_id: str
    version_id: str
    source_class: str
    title: str
    molecule: str | None
    sensitivity: str
    audience: list[str]
|
|
|
|
def _molecule_from_id(source_id: str) -> str | None:
    """Return the molecule name encoded in *source_id*, or ``None``.

    The identifier is searched for the markers ``DRUGA``, ``DRUGB`` and
    ``DRUGC`` in that order; the first marker found decides the result.
    """
    markers = (
        ("DRUGA", "DRUG-A"),
        ("DRUGB", "DRUG-B"),
        ("DRUGC", "DRUG-C"),
    )
    return next((name for token, name in markers if token in source_id), None)
|
|
|
|
def _class_from_prefix(source_id: str) -> str:
    """Derive the source-class code from the prefix of *source_id*.

    Prefixes are tried in the order listed below. An identifier matching none
    of them is classified as ``"LBL"``.
    """
    prefix_table = (
        ("LBL-", "LBL"),
        ("DOC-CSR-", "DOC_CSR"),
        ("SOP-MED-", "SOP_MED"),
        ("GDL-", "GDL"),
        ("RMP-", "RMP"),
        ("PK-SUMMARY-", "PK_SUMMARY"),
        ("MI-FAQ-", "MI_FAQ"),
        ("MED-AFF-", "MED_AFF"),
        ("TREATMENT-ALGO-", "TREATMENT_ALGO"),
        ("SME-NOTE-", "SME_NOTE"),
    )
    for prefix, source_class in prefix_table:
        if source_id.startswith(prefix):
            return source_class
    # Fallback for unrecognised prefixes.
    return "LBL"
|
|
|
|
def catalog() -> list[SourceSpec]:
    """Build the ``SourceSpec`` list for the fixed set of eval-pack sources.

    Class and molecule are derived from each identifier. Sources of the
    classes in ``internal_classes`` are marked ``INTERNAL_ONLY``; all others
    are ``EXTERNAL``.

    Returns:
        One ``SourceSpec`` per source id, in the order listed below.
    """
    source_ids = (
        "DOC-CSR-NSCLC-001",
        "DOC-CSR-NSCLC-014",
        "SOP-MED-NSCLC-010",
        "SOP-MED-NSCLC-022",
        "GDL-NSCLC-2025-03",
        "LBL-NSCLC-DRUGA-EMA-2024",
        "LBL-NSCLC-DRUGB-EMA-2023",
        "LBL-NSCLC-DRUGC-EMA-2024",
        "MI-FAQ-NSCLC-021",
        "MED-AFF-NSCLC-PLAYBOOK-008",
        "RMP-NSCLC-DRUGA-2024",
        "SME-NOTE-NSCLC-017",
        "PK-SUMMARY-NSCLC-005",
        "TREATMENT-ALGO-NSCLC-2025-02",
    )
    internal_classes = {"SOP_MED", "MED_AFF", "TREATMENT_ALGO", "SME_NOTE"}

    def build(source_id: str) -> SourceSpec:
        source_class = _class_from_prefix(source_id)
        is_internal = source_class in internal_classes
        # SOP_MED is internal by sensitivity yet keeps the HCP audience.
        if is_internal and source_class != "SOP_MED":
            audience = ["Internal"]
        else:
            audience = ["HCP", "Internal"]
        # Version ids are built from a lower-cased slug capped at 40 chars.
        slug = source_id.lower().replace("/", "-")[:40]
        return SourceSpec(
            source_id=source_id,
            version_id=f"ver-{slug}-1",
            source_class=source_class,
            title=f"Synthetic eval corpus — {source_id}",
            molecule=_molecule_from_id(source_id),
            sensitivity="INTERNAL_ONLY" if is_internal else "EXTERNAL",
            audience=audience,
        )

    return [build(source_id) for source_id in source_ids]
|
|
|
|
def _drug_label(mol: str | None) -> str:
    """Return *mol* for use in generated text, or a generic phrase.

    The generic phrase is used whenever *mol* is ``None`` or empty.
    """
    if mol:
        return mol
    return "the authorised product"
|
|
|
|
| def generate_document(spec: SourceSpec) -> str: |
| drug = _drug_label(spec.molecule) |
| pages: list[str] = [] |
| p = 1 |
|
|
| def section(heading: str, paragraphs: list[str]) -> None: |
| nonlocal p, pages |
| pages.append(f"[[PAGE:{p}]]") |
| pages.append(heading) |
| pages.extend(paragraphs) |
| p += 1 |
|
|
| if spec.source_class == "LBL": |
| section( |
| "1 INDICATIONS AND USAGE", |
| [ |
| f"{drug} is indicated as monotherapy for adults with locally advanced or metastatic " |
| "non-small cell lung cancer (NSCLC) harbouring activating EGFR mutations in the " |
| "first-line setting under the approved EU label.", |
| "Use outside EGFR-positive first-line metastatic NSCLC is not authorised. " |
| "Adjuvant or post-resection use must not be presented as approved.", |
| ], |
| ) |
| section( |
| "2 POSOLOGY AND METHOD OF ADMINISTRATION", |
| [ |
| f"The recommended dose of {drug} is 80 mg once daily, orally, with or without food. " |
| "Treatment continues until disease progression or unacceptable toxicity.", |
| "Dose reduction to 40 mg once daily is permitted only within approved EU label " |
| "boundaries for documented toxicity. Missed doses must not be doubled.", |
| ], |
| ) |
| section( |
| "4 CONTRAINDICATIONS", |
| [ |
| f"{drug} is contraindicated in patients with hypersensitivity to the active substance " |
| "or excipients.", |
| ], |
| ) |
| section( |
| "4.4 SPECIAL WARNINGS AND PRECAUTIONS FOR USE", |
| [ |
| "Monitor for interstitial lung disease (ILD): new dyspnoea, cough, or fever require " |
| "urgent assessment. Grade 3 or higher ILD requires permanent discontinuation.", |
| "Baseline and periodic hepatic function and QT interval assessment is recommended. " |
| "Use caution with QT-prolonging co-medications.", |
| ], |
| ) |
| section( |
| "4.8 UNDESIRABLE EFFECTS", |
| [ |
| "Common adverse reactions include rash, diarrhoea, paronychia, stomatitis, and " |
| "decreased appetite. Serious reactions include ILD and severe cutaneous adverse events.", |
| ], |
| ) |
| elif spec.source_class == "DOC_CSR": |
| section( |
| "OBJECTIVE", |
| [ |
| f"This clinical study report evaluates efficacy and safety of {drug} versus " |
| "standard-of-care chemotherapy in treatment-naïve EGFR-positive metastatic NSCLC.", |
| ], |
| ) |
| section( |
| "ENDPOINTS", |
| [ |
| "Primary endpoint: progression-free survival by blinded independent central review. " |
| "Secondary: overall survival, objective response rate (RECIST 1.1), duration of response, " |
| "and treatment-emergent adverse events.", |
| ], |
| ) |
| section( |
| "RESULTS", |
| [ |
| f"{drug} improved progression-free survival in EGFR-positive NSCLC versus chemotherapy " |
| "with a clinically meaningful hazard ratio favouring study treatment.", |
| f"Overall response rate and duration of response were higher in the {drug} arm. " |
| "Safety was consistent with EGFR-targeted therapy including ILD and QT prolongation.", |
| ], |
| ) |
| section( |
| "LIMITATIONS", |
| [ |
| "Population restricted to confirmed EGFR activating mutations. " |
| "Findings must not be extrapolated beyond approved EU label scope.", |
| ], |
| ) |
| elif spec.source_class == "SOP_MED": |
| section( |
| "PURPOSE", |
| [ |
| f"Govern medical information responses for {drug} in EU NSCLC, defining on-label " |
| "versus medical affairs review boundaries.", |
| ], |
| ) |
| section( |
| "DOSING GUIDANCE", |
| [ |
| f"On-label dosing inquiries use approved EU label content: 80 mg once daily first-line " |
| f"metastatic NSCLC for {drug}. Dose reductions must remain within approved EU label boundaries.", |
| "Inquiries probing off-label dosing or regimens route to SME review.", |
| ], |
| ) |
| section( |
| "MEDICAL RESPONSE RULES", |
| [ |
| "Label is primary for indication, dose, and contraindications. " |
| "Conflicts resolve in favour of the label. Low-confidence or policy-sensitive items route to SME.", |
| ], |
| ) |
| elif spec.source_class == "GDL": |
| section( |
| "RECOMMENDATIONS", |
| [ |
| f"For EGFR-positive metastatic NSCLC, {drug} may be considered in first-line per " |
| "current EU practice when aligned with the approved label.", |
| ], |
| ) |
| section( |
| "BIOMARKER TESTING", |
| [ |
| "Validated EGFR mutation testing should be completed before treatment selection. " |
| "Later-line mutation-specific decisions require label alignment.", |
| ], |
| ) |
| section( |
| "FIRST-LINE THERAPY", |
| [ |
| "Separate labeled first-line metastatic use from adjuvant or post-resection settings. " |
| "Do not imply non-labeled lines are approved.", |
| ], |
| ) |
| elif spec.source_class == "RMP": |
| section( |
| "IMPORTANT IDENTIFIED RISKS", |
| [ |
| f"For {drug}, important risks include interstitial lung disease, QT prolongation, " |
| "hepatotoxicity, and severe cutaneous adverse reactions.", |
| ], |
| ) |
| section( |
| "PHARMACOVIGILANCE MEASURES", |
| [ |
| "Healthcare professionals should report suspected adverse reactions per local requirements. " |
| "ILD symptoms require prompt evaluation and label-concordant management.", |
| ], |
| ) |
| elif spec.source_class == "PK_SUMMARY": |
| section( |
| "DOSE-EXPOSURE RELATIONSHIP", |
| [ |
| f"{drug} 80 mg once daily achieves target exposure in the approved population. " |
| "Renal impairment requires cautious clinical judgement; avoid unsupported fixed-dose rules.", |
| ], |
| ) |
| section( |
| "ADMINISTRATION NOTES", |
| [ |
| "Oral administration with or without food. Dose modifications follow approved label steps only.", |
| ], |
| ) |
| elif spec.source_class == "MI_FAQ": |
| section( |
| "FREQUENTLY ASKED QUESTIONS", |
| [ |
| f"What is the approved starting dose for {drug}? 80 mg once daily in first-line metastatic " |
| "EGFR-positive NSCLC within EU label boundaries.", |
| ], |
| ) |
| section( |
| "MISSED DOSE", |
| [ |
| "Patient-facing answers must use only approved missed-dose guidance and avoid improvised " |
| "rescue instructions; advise clinician follow-up when uncertain.", |
| ], |
| ) |
| elif spec.source_class == "MED_AFF": |
| section( |
| "PLAYBOOK OVERVIEW", |
| [ |
| f"Medical affairs rollout for {drug} in EU NSCLC: align field medical with label-first messaging.", |
| ], |
| ) |
| section( |
| "BOUNDARY CASES", |
| [ |
| "Adjuvant and post-resection discussions remain outside approved scope unless label updates. " |
| "Keep DRUG-B and DRUG-C narratives separate from DRUG-A.", |
| ], |
| ) |
| elif spec.source_class == "TREATMENT_ALGO": |
| section( |
| "DECISION LOGIC", |
| [ |
| "Step 1: confirm EGFR activating mutation. Step 2: if first-line metastatic NSCLC, " |
| f"consider {drug} when within approved EU label criteria.", |
| ], |
| ) |
| section( |
| "EXCLUSIONS", |
| [ |
| "Do not route adjuvant-only pathways into first-line metastatic approval logic.", |
| ], |
| ) |
| elif spec.source_class == "SME_NOTE": |
| section( |
| "EXPERT REVIEW", |
| [ |
| f"SME interpretation: {drug} PFS benefit in EGFR-positive NSCLC is clinically relevant " |
| "but must be communicated within approved boundaries without superiority overclaim.", |
| ], |
| ) |
| section( |
| "COMPARISON DISCIPLINE", |
| [ |
| "Comparative statements require explicit label or CSR grounding. Avoid cure-adjacent language.", |
| ], |
| ) |
| else: |
| section("CONTENT", [f"Controlled content for {spec.source_id} regarding {drug} in NSCLC."]) |
|
|
| return "\n\n".join(pages) + "\n" |
|
|
|
|
def write_corpus_files(specs: list[SourceSpec]) -> None:
    """Write one text file per spec plus the manifest.

    Every document is written to ``CORPUS_DIR/<source_id>.txt`` as UTF-8.
    ``MANIFEST_PATH`` then receives a JSON object whose ``"sources"`` list
    has one entry per spec, in input order.

    Args:
        specs: Sources to materialise.

    Raises:
        KeyError: If a spec's ``source_class`` has no manifest spelling.
    """
    # Manifest spelling of each source class (hyphens, not underscores).
    manifest_class_names = {
        "LBL": "LBL",
        "DOC_CSR": "DOC-CSR",
        "SOP_MED": "SOP-MED",
        "GDL": "GDL",
        "RMP": "RMP",
        "PK_SUMMARY": "PK-SUMMARY",
        "MI_FAQ": "MI-FAQ",
        "MED_AFF": "MED-AFF",
        "TREATMENT_ALGO": "TREATMENT-ALGO",
        "SME_NOTE": "SME-NOTE",
    }

    CORPUS_DIR.mkdir(parents=True, exist_ok=True)

    entries = []
    for spec in specs:
        file_name = f"{spec.source_id}.txt"
        document = generate_document(spec)
        (CORPUS_DIR / file_name).write_text(document, encoding="utf-8")
        entries.append(
            {
                "source_id": spec.source_id,
                "version_id": spec.version_id,
                "source_class": manifest_class_names[spec.source_class],
                "therapy_area": THERAPY,
                "geography": GEO,
                "audience": spec.audience,
                "text_file": file_name,
            }
        )

    manifest_json = json.dumps({"sources": entries}, indent=2)
    MANIFEST_PATH.write_text(manifest_json, encoding="utf-8")
    print(f"Wrote {len(specs)} text files and {MANIFEST_PATH}")
|
|
|
|
def register_sources(specs: list[SourceSpec]) -> None:
    """Ensure each spec has a ``sources`` row and a ``source_versions`` row.

    For every spec, each row is inserted only when a lookup by its id finds
    nothing; existing rows are left as they are. The source's
    ``current_version_id`` and ``updated_at`` are then updated in every case.

    All statements run inside one ``with connection`` block, so they share a
    transaction per psycopg's connection context-manager semantics. The
    connection is closed in all cases.

    Args:
        specs: Sources to register.
    """
    find_source_sql = "SELECT 1 FROM sources WHERE source_id = %s"
    insert_source_sql = """
        INSERT INTO sources (
            source_id, source_class, title, therapy_area, molecule,
            geography, audience_scope, sensitivity_class, approval_state,
            current_version_id, hygiene_status, created_at, updated_at
        ) VALUES (
            %s, %s, %s, %s, %s, %s, %s::json, %s, 'APPROVED',
            NULL, 'active', %s, %s
        )
        """
    find_version_sql = "SELECT 1 FROM source_versions WHERE version_id = %s"
    insert_version_sql = """
        INSERT INTO source_versions (
            version_id, source_id, version_label, approval_state,
            approval_date, is_latest_approved, is_superseded, created_at
        ) VALUES (%s, %s, 'v1', 'APPROVED', %s, TRUE, FALSE, %s)
        """
    point_to_version_sql = (
        "UPDATE sources SET current_version_id = %s, updated_at = %s WHERE source_id = %s"
    )

    connection = psycopg.connect(DSN)
    try:
        with connection, connection.cursor() as cursor:
            for spec in specs:
                cursor.execute(find_source_sql, (spec.source_id,))
                source_missing = cursor.fetchone() is None
                if source_missing:
                    source_row = (
                        spec.source_id,
                        spec.source_class,
                        spec.title,
                        THERAPY,
                        spec.molecule,
                        GEO,
                        # The audience list is stored as JSON (``%s::json``).
                        json.dumps(spec.audience),
                        spec.sensitivity,
                        NOW,
                        NOW,
                    )
                    cursor.execute(insert_source_sql, source_row)

                cursor.execute(find_version_sql, (spec.version_id,))
                version_missing = cursor.fetchone() is None
                if version_missing:
                    version_row = (spec.version_id, spec.source_id, TODAY, NOW)
                    cursor.execute(insert_version_sql, version_row)

                # Always point the source at this version.
                cursor.execute(
                    point_to_version_sql,
                    (spec.version_id, NOW, spec.source_id),
                )
        print(f"Registered {len(specs)} eval-pack sources in Postgres.")
    finally:
        connection.close()
|
|
|
|
def run_ingest(*, embed: bool) -> None:
    """Run ``scripts/run_ingest_pipeline.py`` on the generated manifest.

    The pipeline runs as a child process under the current interpreter, with
    the repository root as working directory.

    Args:
        embed: When false, ``--no-embed`` is passed to the pipeline.

    Raises:
        subprocess.CalledProcessError: If the pipeline exits non-zero.
    """
    pipeline_script = REPO_ROOT / "scripts" / "run_ingest_pipeline.py"
    optional_flags = [] if embed else ["--no-embed"]
    command = [
        sys.executable,
        str(pipeline_script),
        "--manifest",
        str(MANIFEST_PATH),
        *optional_flags,
    ]
    subprocess.run(command, check=True, cwd=str(REPO_ROOT))
|
|
|
|
def verify() -> None:
    """Print row counts describing the current state of the knowledge base.

    Reports the total rows in ``sources``, ``chunks`` and
    ``chunk_embeddings``, and the number of chunks whose ``source_id``
    contains ``RET`` or ``TEST``. A per-source chunk count follows for all
    remaining sources. The connection is closed in all cases.
    """
    labelled_counts = (
        ("sources", "SELECT COUNT(*) FROM sources"),
        ("chunks", "SELECT COUNT(*) FROM chunks"),
        ("chunk_embeddings", "SELECT COUNT(*) FROM chunk_embeddings"),
        (
            "fixture chunks (RET/TEST)",
            "SELECT COUNT(*) FROM chunks WHERE source_id LIKE '%RET%' OR source_id LIKE '%TEST%'",
        ),
    )
    per_source_sql = """
        SELECT source_id, COUNT(*) FROM chunks
        WHERE source_id NOT LIKE '%RET%' AND source_id NOT LIKE '%TEST%'
        GROUP BY source_id ORDER BY source_id
        """

    connection = psycopg.connect(DSN)
    try:
        with connection.cursor() as cursor:
            # No parameters are passed to execute(), so the LIKE patterns'
            # ``%`` characters are sent to the server as written.
            for label, sql in labelled_counts:
                cursor.execute(sql)
                print(f"{label}: {cursor.fetchone()[0]}")

            cursor.execute(per_source_sql)
            print("eval corpus chunks per source:")
            for source_id, chunk_count in cursor.fetchall():
                print(f" {source_id}: {chunk_count}")
    finally:
        connection.close()
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: build the corpus and load it into Postgres.

    The corpus files and manifest are always written. Unless disabled by a
    flag, the sources are then registered and the ingest pipeline is run.
    The two seed scripts run only when registration, ingest and seeding are
    all enabled. Verification counts are printed last.

    Args:
        argv: Arguments to parse. ``None`` (the default) makes argparse read
            ``sys.argv[1:]``, so existing ``main()`` callers are unaffected.

    Returns:
        ``0`` on success. Failures surface as exceptions, e.g.
        ``subprocess.CalledProcessError`` from a failing child script.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's Steps/Usage layout in ``--help``
        # instead of letting argparse re-wrap it into one paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--write-only", action="store_true", help="Only generate text + manifest")
    parser.add_argument("--no-register", action="store_true", help="Skip Postgres registration")
    parser.add_argument("--no-ingest", action="store_true", help="Skip ingest pipeline")
    parser.add_argument("--no-embed", action="store_true", help="Chunk without Ollama embeddings")
    parser.add_argument("--no-seed-claims", action="store_true", help="Skip claim + assessment seeding")
    args = parser.parse_args(argv)

    specs = catalog()
    write_corpus_files(specs)
    if args.write_only:
        return 0

    if not args.no_register:
        register_sources(specs)
    if not args.no_ingest:
        run_ingest(embed=not args.no_embed)

    # Seeding is skipped whenever registration or ingest was skipped.
    if not args.no_seed_claims and not args.no_register and not args.no_ingest:
        for script_name in ("seed_eval_claims.py", "seed_eval_graph_entities.py"):
            subprocess.run(
                [sys.executable, str(REPO_ROOT / "scripts" / script_name)],
                check=True,
                cwd=str(REPO_ROOT),
            )

    verify()
    return 0
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())
|
|