# pharmaspine-backend/scripts/setup_eval_corpus.py
# Hosting-page metadata (not part of the script): ashish1265659565 ·
# "Upload folder using huggingface_hub" · 08fd094 (verified) · 19.7 kB
"""Register eval-pack sources, generate corpus text, and ingest into Postgres.
Aligns the knowledge base with ``generate_nsclc_eval_pack.py`` SOURCE_SET (14 sources).
Does **not** modify integration-test fixtures (*-RET-*, *-TEST-*).
Steps (default: all):
1. Write ``data/eval_corpus/*.txt`` + ``manifest.json``
2. Register ``sources`` / ``source_versions`` in Postgres
3. Run chunk + embed ingest via ``run_ingest_pipeline.py``
Usage:
python3 scripts/setup_eval_corpus.py
python3 scripts/setup_eval_corpus.py --no-embed
python3 scripts/setup_eval_corpus.py --write-only
"""
from __future__ import annotations
import argparse
import json
import os
import subprocess
import sys
from dataclasses import dataclass
from datetime import UTC, date, datetime
from pathlib import Path
def _load_env_file(path: Path) -> None:
    """Copy ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    Variables that are already set in the process environment are left
    untouched, so real environment values always win over the file.
    A missing file is not an error.

    Args:
        path: Location of the dotenv file.
    """
    if not path.exists():
        return
    # Explicit encoding: the platform default is not UTF-8 everywhere.
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        value = value.strip()
        # Drop one pair of matching surrounding quotes (KEY="value" / KEY='value'),
        # otherwise the quotes would become part of the value (e.g. a broken DSN).
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        os.environ.setdefault(key.strip(), value)


# Repository root is two levels above this script (<root>/scripts/<this file>).
REPO_ROOT = Path(__file__).resolve().parent.parent
CORPUS_DIR = REPO_ROOT / "data" / "eval_corpus"
MANIFEST_PATH = CORPUS_DIR / "manifest.json"

# Load <root>/.env before anything reads configuration from the environment.
_env_file = REPO_ROOT / ".env"
_load_env_file(_env_file)
import psycopg # noqa: E402
DSN = os.getenv(
"AKS_DATABASE_URL",
"postgresql+psycopg://mobcoderid-296@localhost/ai_knowledge_spine",
).replace("postgresql+psycopg://", "postgresql://", 1)
NOW = datetime.now(UTC)
TODAY = date.today()
THERAPY = "NSCLC"
GEO = "EU / EMA"
@dataclass(frozen=True)
class SourceSpec:
    """Immutable description of one eval-pack source and its single version.

    Instances are built by ``catalog()`` and consumed when writing the corpus
    files, building the manifest, and registering rows in Postgres.
    """

    source_id: str  # identifier of the source; also the stem of its corpus text file
    version_id: str  # identifier of the version row registered for this source
    source_class: str  # DB enum name: LBL, DOC_CSR, SOP_MED, ...
    title: str  # human-readable title stored with the source
    molecule: str | None  # molecule label such as "DRUG-A", or None when the id names no drug
    sensitivity: str  # EXTERNAL | INTERNAL_ONLY
    audience: list[str]  # audience labels, e.g. ["HCP", "Internal"]; serialised as JSON
def _molecule_from_id(source_id: str) -> str | None:
    """Return the molecule label encoded in *source_id*, or ``None``.

    The id is searched for the markers ``DRUGA``, ``DRUGB`` and ``DRUGC``
    (case-sensitive, in that order); the first marker found decides the
    result.
    """
    marker_to_molecule = (
        ("DRUGA", "DRUG-A"),
        ("DRUGB", "DRUG-B"),
        ("DRUGC", "DRUG-C"),
    )
    for marker, molecule in marker_to_molecule:
        if marker in source_id:
            return molecule
    return None
def _class_from_prefix(source_id: str) -> str:
    """Map a source id to its DB source-class name by its leading prefix.

    Ids that match none of the known prefixes fall back to ``"LBL"``.
    """
    prefix_to_class = (
        ("LBL-", "LBL"),
        ("DOC-CSR-", "DOC_CSR"),
        ("SOP-MED-", "SOP_MED"),
        ("GDL-", "GDL"),
        ("RMP-", "RMP"),
        ("PK-SUMMARY-", "PK_SUMMARY"),
        ("MI-FAQ-", "MI_FAQ"),
        ("MED-AFF-", "MED_AFF"),
        ("TREATMENT-ALGO-", "TREATMENT_ALGO"),
        ("SME-NOTE-", "SME_NOTE"),
    )
    for prefix, class_name in prefix_to_class:
        if source_id.startswith(prefix):
            return class_name
    # Unknown prefixes are treated as label documents.
    return "LBL"
def catalog() -> list[SourceSpec]:
    """Return the specs for the 14 eval-pack sources, in a fixed order.

    Class and molecule are derived from each source id; sensitivity and
    audience follow from the class.
    """
    source_ids = (
        "DOC-CSR-NSCLC-001",
        "DOC-CSR-NSCLC-014",
        "SOP-MED-NSCLC-010",
        "SOP-MED-NSCLC-022",
        "GDL-NSCLC-2025-03",
        "LBL-NSCLC-DRUGA-EMA-2024",
        "LBL-NSCLC-DRUGB-EMA-2023",
        "LBL-NSCLC-DRUGC-EMA-2024",
        "MI-FAQ-NSCLC-021",
        "MED-AFF-NSCLC-PLAYBOOK-008",
        "RMP-NSCLC-DRUGA-2024",
        "SME-NOTE-NSCLC-017",
        "PK-SUMMARY-NSCLC-005",
        "TREATMENT-ALGO-NSCLC-2025-02",
    )
    internal_classes = {"SOP_MED", "MED_AFF", "TREATMENT_ALGO", "SME_NOTE"}

    def build(source_id: str) -> SourceSpec:
        """Assemble the spec for a single source id."""
        molecule = _molecule_from_id(source_id)
        source_class = _class_from_prefix(source_id)
        is_internal = source_class in internal_classes

        # SOP_MED is INTERNAL_ONLY by sensitivity but keeps the HCP audience.
        if is_internal and source_class != "SOP_MED":
            audience = ["Internal"]
        else:
            audience = ["HCP", "Internal"]

        # Version ids embed a lower-cased id, capped at 40 characters.
        slug = source_id.lower().replace("/", "-")[:40]
        return SourceSpec(
            source_id=source_id,
            version_id=f"ver-{slug}-1",
            source_class=source_class,
            title=f"Synthetic eval corpus — {source_id}",
            molecule=molecule,
            sensitivity="INTERNAL_ONLY" if is_internal else "EXTERNAL",
            audience=audience,
        )

    return [build(source_id) for source_id in source_ids]
def _drug_label(mol: str | None) -> str:
    """Return *mol* for use in prose, or a generic phrase when it is empty/None."""
    if mol:
        return mol
    return "the authorised product"
def generate_document(spec: SourceSpec) -> str:
    """Build the synthetic corpus text for one source.

    The section template is selected by ``spec.source_class``; an unknown
    class yields a single generic "CONTENT" section. Every section becomes
    its own page: a ``[[PAGE:n]]`` marker line, the heading, then the
    paragraphs. All pieces are joined with blank lines.

    Args:
        spec: Source whose class, molecule and id drive the generated text.

    Returns:
        The full document text, terminated by a newline.
    """
    # Name used in prose; falls back to a generic phrase when no molecule is set.
    drug = _drug_label(spec.molecule)
    pages: list[str] = []
    p = 1  # page counter, starts at 1 and advances once per section

    def section(heading: str, paragraphs: list[str]) -> None:
        """Append one page (marker, heading, paragraphs) and advance the counter."""
        nonlocal p, pages
        pages.append(f"[[PAGE:{p}]]")
        pages.append(heading)
        pages.extend(paragraphs)
        p += 1

    # Product label: indication, posology, contraindications, warnings, adverse effects.
    if spec.source_class == "LBL":
        section(
            "1 INDICATIONS AND USAGE",
            [
                f"{drug} is indicated as monotherapy for adults with locally advanced or metastatic "
                "non-small cell lung cancer (NSCLC) harbouring activating EGFR mutations in the "
                "first-line setting under the approved EU label.",
                "Use outside EGFR-positive first-line metastatic NSCLC is not authorised. "
                "Adjuvant or post-resection use must not be presented as approved.",
            ],
        )
        section(
            "2 POSOLOGY AND METHOD OF ADMINISTRATION",
            [
                f"The recommended dose of {drug} is 80 mg once daily, orally, with or without food. "
                "Treatment continues until disease progression or unacceptable toxicity.",
                "Dose reduction to 40 mg once daily is permitted only within approved EU label "
                "boundaries for documented toxicity. Missed doses must not be doubled.",
            ],
        )
        section(
            "4 CONTRAINDICATIONS",
            [
                f"{drug} is contraindicated in patients with hypersensitivity to the active substance "
                "or excipients.",
            ],
        )
        section(
            "4.4 SPECIAL WARNINGS AND PRECAUTIONS FOR USE",
            [
                "Monitor for interstitial lung disease (ILD): new dyspnoea, cough, or fever require "
                "urgent assessment. Grade 3 or higher ILD requires permanent discontinuation.",
                "Baseline and periodic hepatic function and QT interval assessment is recommended. "
                "Use caution with QT-prolonging co-medications.",
            ],
        )
        section(
            "4.8 UNDESIRABLE EFFECTS",
            [
                "Common adverse reactions include rash, diarrhoea, paronychia, stomatitis, and "
                "decreased appetite. Serious reactions include ILD and severe cutaneous adverse events.",
            ],
        )
    # Clinical study report: objective, endpoints, results, limitations.
    elif spec.source_class == "DOC_CSR":
        section(
            "OBJECTIVE",
            [
                f"This clinical study report evaluates efficacy and safety of {drug} versus "
                "standard-of-care chemotherapy in treatment-naïve EGFR-positive metastatic NSCLC.",
            ],
        )
        section(
            "ENDPOINTS",
            [
                "Primary endpoint: progression-free survival by blinded independent central review. "
                "Secondary: overall survival, objective response rate (RECIST 1.1), duration of response, "
                "and treatment-emergent adverse events.",
            ],
        )
        section(
            "RESULTS",
            [
                f"{drug} improved progression-free survival in EGFR-positive NSCLC versus chemotherapy "
                "with a clinically meaningful hazard ratio favouring study treatment.",
                f"Overall response rate and duration of response were higher in the {drug} arm. "
                "Safety was consistent with EGFR-targeted therapy including ILD and QT prolongation.",
            ],
        )
        section(
            "LIMITATIONS",
            [
                "Population restricted to confirmed EGFR activating mutations. "
                "Findings must not be extrapolated beyond approved EU label scope.",
            ],
        )
    # Medical-information SOP: purpose, dosing guidance, response rules.
    elif spec.source_class == "SOP_MED":
        section(
            "PURPOSE",
            [
                f"Govern medical information responses for {drug} in EU NSCLC, defining on-label "
                "versus medical affairs review boundaries.",
            ],
        )
        section(
            "DOSING GUIDANCE",
            [
                f"On-label dosing inquiries use approved EU label content: 80 mg once daily first-line "
                f"metastatic NSCLC for {drug}. Dose reductions must remain within approved EU label boundaries.",
                "Inquiries probing off-label dosing or regimens route to SME review.",
            ],
        )
        section(
            "MEDICAL RESPONSE RULES",
            [
                "Label is primary for indication, dose, and contraindications. "
                "Conflicts resolve in favour of the label. Low-confidence or policy-sensitive items route to SME.",
            ],
        )
    # Guideline: recommendations, biomarker testing, first-line therapy.
    elif spec.source_class == "GDL":
        section(
            "RECOMMENDATIONS",
            [
                f"For EGFR-positive metastatic NSCLC, {drug} may be considered in first-line per "
                "current EU practice when aligned with the approved label.",
            ],
        )
        section(
            "BIOMARKER TESTING",
            [
                "Validated EGFR mutation testing should be completed before treatment selection. "
                "Later-line mutation-specific decisions require label alignment.",
            ],
        )
        section(
            "FIRST-LINE THERAPY",
            [
                "Separate labeled first-line metastatic use from adjuvant or post-resection settings. "
                "Do not imply non-labeled lines are approved.",
            ],
        )
    # Risk management plan: identified risks and pharmacovigilance measures.
    elif spec.source_class == "RMP":
        section(
            "IMPORTANT IDENTIFIED RISKS",
            [
                f"For {drug}, important risks include interstitial lung disease, QT prolongation, "
                "hepatotoxicity, and severe cutaneous adverse reactions.",
            ],
        )
        section(
            "PHARMACOVIGILANCE MEASURES",
            [
                "Healthcare professionals should report suspected adverse reactions per local requirements. "
                "ILD symptoms require prompt evaluation and label-concordant management.",
            ],
        )
    # Pharmacokinetic summary: dose-exposure relationship and administration notes.
    elif spec.source_class == "PK_SUMMARY":
        section(
            "DOSE-EXPOSURE RELATIONSHIP",
            [
                f"{drug} 80 mg once daily achieves target exposure in the approved population. "
                "Renal impairment requires cautious clinical judgement; avoid unsupported fixed-dose rules.",
            ],
        )
        section(
            "ADMINISTRATION NOTES",
            [
                "Oral administration with or without food. Dose modifications follow approved label steps only.",
            ],
        )
    # Medical-information FAQ: starting dose and missed-dose guidance.
    elif spec.source_class == "MI_FAQ":
        section(
            "FREQUENTLY ASKED QUESTIONS",
            [
                f"What is the approved starting dose for {drug}? 80 mg once daily in first-line metastatic "
                "EGFR-positive NSCLC within EU label boundaries.",
            ],
        )
        section(
            "MISSED DOSE",
            [
                "Patient-facing answers must use only approved missed-dose guidance and avoid improvised "
                "rescue instructions; advise clinician follow-up when uncertain.",
            ],
        )
    # Medical affairs playbook: overview and boundary cases.
    elif spec.source_class == "MED_AFF":
        section(
            "PLAYBOOK OVERVIEW",
            [
                f"Medical affairs rollout for {drug} in EU NSCLC: align field medical with label-first messaging.",
            ],
        )
        section(
            "BOUNDARY CASES",
            [
                "Adjuvant and post-resection discussions remain outside approved scope unless label updates. "
                "Keep DRUG-B and DRUG-C narratives separate from DRUG-A.",
            ],
        )
    # Treatment algorithm: decision logic and exclusions.
    elif spec.source_class == "TREATMENT_ALGO":
        section(
            "DECISION LOGIC",
            [
                "Step 1: confirm EGFR activating mutation. Step 2: if first-line metastatic NSCLC, "
                f"consider {drug} when within approved EU label criteria.",
            ],
        )
        section(
            "EXCLUSIONS",
            [
                "Do not route adjuvant-only pathways into first-line metastatic approval logic.",
            ],
        )
    # SME note: expert review and comparison discipline.
    elif spec.source_class == "SME_NOTE":
        section(
            "EXPERT REVIEW",
            [
                f"SME interpretation: {drug} PFS benefit in EGFR-positive NSCLC is clinically relevant "
                "but must be communicated within approved boundaries without superiority overclaim.",
            ],
        )
        section(
            "COMPARISON DISCIPLINE",
            [
                "Comparative statements require explicit label or CSR grounding. Avoid cure-adjacent language.",
            ],
        )
    # Any other class: one generic page so the source still has content.
    else:
        section("CONTENT", [f"Controlled content for {spec.source_id} regarding {drug} in NSCLC."])
    # Blank line between every marker, heading and paragraph; trailing newline at EOF.
    return "\n\n".join(pages) + "\n"
def write_corpus_files(specs: list[SourceSpec]) -> None:
    """Write one ``<source_id>.txt`` per spec plus ``manifest.json``.

    The corpus directory is created if needed. The manifest lists every
    source with its version, hyphenated class label, therapy area,
    geography, audience and the name of its text file.

    Raises:
        KeyError: If a spec carries a source class without a manifest label.
    """
    # DB enum name -> hyphenated class label written to the manifest.
    manifest_class_labels = {
        "LBL": "LBL",
        "DOC_CSR": "DOC-CSR",
        "SOP_MED": "SOP-MED",
        "GDL": "GDL",
        "RMP": "RMP",
        "PK_SUMMARY": "PK-SUMMARY",
        "MI_FAQ": "MI-FAQ",
        "MED_AFF": "MED-AFF",
        "TREATMENT_ALGO": "TREATMENT-ALGO",
        "SME_NOTE": "SME-NOTE",
    }

    CORPUS_DIR.mkdir(parents=True, exist_ok=True)

    entries = []
    for spec in specs:
        file_name = f"{spec.source_id}.txt"
        document = generate_document(spec)
        (CORPUS_DIR / file_name).write_text(document, encoding="utf-8")
        entry = {
            "source_id": spec.source_id,
            "version_id": spec.version_id,
            "source_class": manifest_class_labels[spec.source_class],
            "therapy_area": THERAPY,
            "geography": GEO,
            "audience": spec.audience,
            "text_file": file_name,
        }
        entries.append(entry)

    manifest_text = json.dumps({"sources": entries}, indent=2)
    MANIFEST_PATH.write_text(manifest_text, encoding="utf-8")
    print(f"Wrote {len(specs)} text files and {MANIFEST_PATH}")
def register_sources(specs: list[SourceSpec]) -> None:
    """Ensure every spec has a ``sources`` row and a ``source_versions`` row.

    For each spec a ``sources`` row is inserted only when none exists for
    its ``source_id``, and a ``source_versions`` row only when none exists
    for its ``version_id``. Existing rows are not modified, except that
    ``current_version_id`` / ``updated_at`` on the source are always set.

    Args:
        specs: Sources to register.
    """
    conn = psycopg.connect(DSN)
    try:
        # NOTE(review): the connection context presumably commits on success
        # and rolls back on error — confirm against the psycopg version in use.
        with conn:
            with conn.cursor() as cur:
                for spec in specs:
                    # Insert the source only if it is not registered yet.
                    cur.execute("SELECT 1 FROM sources WHERE source_id = %s", (spec.source_id,))
                    if cur.fetchone() is None:
                        # New sources start APPROVED / active with no current
                        # version; the version link is filled in further down.
                        cur.execute(
                            """
INSERT INTO sources (
source_id, source_class, title, therapy_area, molecule,
geography, audience_scope, sensitivity_class, approval_state,
current_version_id, hygiene_status, created_at, updated_at
) VALUES (
%s, %s, %s, %s, %s, %s, %s::json, %s, 'APPROVED',
NULL, 'active', %s, %s
)
""",
                            (
                                spec.source_id,
                                spec.source_class,
                                spec.title,
                                THERAPY,
                                spec.molecule,
                                GEO,
                                json.dumps(spec.audience),  # audience list is stored as JSON
                                spec.sensitivity,
                                NOW,
                                NOW,
                            ),
                        )
                    # Insert version 'v1' only if this version id is unknown.
                    cur.execute(
                        "SELECT 1 FROM source_versions WHERE version_id = %s",
                        (spec.version_id,),
                    )
                    if cur.fetchone() is None:
                        cur.execute(
                            """
INSERT INTO source_versions (
version_id, source_id, version_label, approval_state,
approval_date, is_latest_approved, is_superseded, created_at
) VALUES (%s, %s, 'v1', 'APPROVED', %s, TRUE, FALSE, %s)
""",
                            (spec.version_id, spec.source_id, TODAY, NOW),
                        )
                    # Point the source at its version, whether or not rows were just created.
                    cur.execute(
                        "UPDATE sources SET current_version_id = %s, updated_at = %s WHERE source_id = %s",
                        (spec.version_id, NOW, spec.source_id),
                    )
        print(f"Registered {len(specs)} eval-pack sources in Postgres.")
    finally:
        conn.close()
def run_ingest(*, embed: bool) -> None:
    """Run ``scripts/run_ingest_pipeline.py`` on the generated manifest.

    Args:
        embed: When false, ``--no-embed`` is passed to the pipeline.

    Raises:
        subprocess.CalledProcessError: If the pipeline exits non-zero.
    """
    pipeline_script = REPO_ROOT / "scripts" / "run_ingest_pipeline.py"
    extra_flags = [] if embed else ["--no-embed"]
    argv = [
        sys.executable,
        str(pipeline_script),
        "--manifest",
        str(MANIFEST_PATH),
        *extra_flags,
    ]
    # Run from the repository root with the current interpreter.
    subprocess.run(argv, check=True, cwd=str(REPO_ROOT))
def verify() -> None:
    """Print row counts for a quick post-ingest sanity check.

    Reports totals for sources, chunks and chunk embeddings, the number of
    chunks whose source id contains RET or TEST, and a per-source chunk
    count for all remaining sources.
    """
    # (label, query) pairs; each query returns a single count.
    count_queries = (
        ("sources", "SELECT COUNT(*) FROM sources"),
        ("chunks", "SELECT COUNT(*) FROM chunks"),
        ("chunk_embeddings", "SELECT COUNT(*) FROM chunk_embeddings"),
        (
            "fixture chunks (RET/TEST)",
            "SELECT COUNT(*) FROM chunks WHERE source_id LIKE '%RET%' OR source_id LIKE '%TEST%'",
        ),
    )
    per_source_query = """
SELECT source_id, COUNT(*) FROM chunks
WHERE source_id NOT LIKE '%RET%' AND source_id NOT LIKE '%TEST%'
GROUP BY source_id ORDER BY source_id
"""

    conn = psycopg.connect(DSN)
    try:
        with conn.cursor() as cur:
            for label, query in count_queries:
                cur.execute(query)
                print(f"{label}: {cur.fetchone()[0]}")

            cur.execute(per_source_query)
            print("eval corpus chunks per source:")
            for source_id, chunk_count in cur.fetchall():
                print(f" {source_id}: {chunk_count}")
    finally:
        conn.close()
def main() -> int:
    """Parse CLI flags and run the requested setup steps.

    Corpus files are always written. Registration, ingest and seeding can
    each be skipped by flag; seeding runs only when registration and ingest
    both ran. A verification summary is printed at the end.

    Returns:
        Process exit code (always 0; failures surface as exceptions).
    """
    parser = argparse.ArgumentParser(description=__doc__)
    switches = (
        ("--write-only", "Only generate text + manifest"),
        ("--no-register", "Skip Postgres registration"),
        ("--no-ingest", "Skip ingest pipeline"),
        ("--no-embed", "Chunk without Ollama embeddings"),
        ("--no-seed-claims", "Skip claim + assessment seeding"),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action="store_true", help=help_text)
    args = parser.parse_args()

    specs = catalog()
    write_corpus_files(specs)
    if args.write_only:
        return 0

    do_register = not args.no_register
    do_ingest = not args.no_ingest

    if do_register:
        register_sources(specs)
    if do_ingest:
        run_ingest(embed=not args.no_embed)

    # Seeding needs both registered sources and ingested chunks.
    if do_register and do_ingest and not args.no_seed_claims:
        for script_name in ("seed_eval_claims.py", "seed_eval_graph_entities.py"):
            subprocess.run(
                [sys.executable, str(REPO_ROOT / "scripts" / script_name)],
                check=True,
                cwd=str(REPO_ROOT),
            )

    verify()
    return 0
# Script entry point: exit with the status returned by main().
if __name__ == "__main__":
    sys.exit(main())