| """End-to-end ingest pipeline: parse text -> chunk -> embed -> persist to Postgres. |
| |
| The script reads ``data/seed_sources/manifest.json`` (or a custom path) and, for each |
| listed source, requires a pre-registered row in ``sources``/``source_versions`` (so |
| governance state remains the source of truth). It then: |
| |
| 1. Chunks the source text via the hardened ScientificChunker (sentence-aware |
| sub-splitting, configurable overlap, tiny-fragment merging). |
| 2. Optionally embeds each chunk via the Ollama embedding model from .env. |
| 3. Replaces existing rows for ``(source_id, version_id)`` in ``chunks`` and |
| ``chunk_embeddings`` so the script is idempotent. |
| |
| Usage: |
| python3 scripts/run_ingest_pipeline.py \ |
| --manifest data/seed_sources/manifest.json \ |
| --database-url postgresql+psycopg://mobcoderid-296@localhost/ai_knowledge_spine |
| |
| Skip embeddings (chunks only, much faster): |
| python3 scripts/run_ingest_pipeline.py --no-embed |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import os |
| import sys |
| import uuid |
| from dataclasses import dataclass |
| from datetime import UTC, datetime |
| from pathlib import Path |
| from typing import Any |
|
|
# Repository layout anchors, derived from this script's own location
# (the script sits one directory below the repository root).
REPO_ROOT = Path(__file__).resolve().parent.parent
CHUNKING_ROOT = REPO_ROOT / "services" / "chunking-service"
SHARED_ROOT = REPO_ROOT / "shared"

# Source IDs that ``ingest`` refuses to process unless ``force_fixtures`` is set.
PROTECTED_SOURCE_IDS = frozenset(
    (
        "LBL-NSCLC-RET-EMA-2026",
        "DOC-CSR-NSCLC-RET-2026",
        "SOP-MED-NSCLC-RET-2026",
        "LBL-NSCLC-TEST-EMA-2026",
        "DOC-CSR-NSCLC-TEST-2026",
    )
)

# Put the chunking-service and shared directories at the front of the import
# path (skipping any already present) — presumably so the project-local
# imports further down this file resolve; confirm against the repo layout.
for path in map(str, (CHUNKING_ROOT, SHARED_ROOT)):
    if path in sys.path:
        continue
    sys.path.insert(0, path)
|
|
| |
# Minimal .env loader: copy KEY=VALUE pairs from the repository's .env file into
# the process environment. Variables that are already set are never overridden.
_env_file = REPO_ROOT / ".env"
if _env_file.exists():
    # Decode explicitly as UTF-8 so parsing does not depend on the platform locale.
    for raw_line in _env_file.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        # Skip blanks, comments, and anything that is not an assignment.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        value = value.strip()
        # Drop one pair of matching surrounding quotes (KEY="value" / KEY='value');
        # otherwise the quote characters would become part of the variable's value.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        os.environ.setdefault(key.strip(), value)
|
|
| from app.schemas.chunking import ChunkPreviewRequest |
| from app.services.heuristics import ScientificChunker |
|
|
| import psycopg |
|
|
|
|
@dataclass
class SourceManifestEntry:
    """A single manifest source together with its already-loaded text."""

    # Identity of the source and of the specific version being ingested.
    source_id: str
    version_id: str

    # Descriptive metadata copied from the manifest entry.
    source_class: str
    therapy_area: str
    geography: str
    audience: list[str]

    # Full text of the source (read from ``text_file`` or given inline).
    text: str
|
|
|
|
| def _load_manifest(manifest_path: Path) -> list[SourceManifestEntry]: |
| data = json.loads(manifest_path.read_text()) |
| base_dir = manifest_path.parent |
| entries: list[SourceManifestEntry] = [] |
| for raw in data.get("sources", []): |
| text_file = raw.get("text_file") |
| if text_file: |
| text = (base_dir / text_file).read_text() |
| elif "text" in raw: |
| text = raw["text"] |
| else: |
| raise ValueError(f"Manifest entry {raw.get('source_id')!r} has no text_file or text") |
| entries.append( |
| SourceManifestEntry( |
| source_id=raw["source_id"], |
| version_id=raw["version_id"], |
| source_class=raw["source_class"], |
| therapy_area=raw["therapy_area"], |
| geography=raw["geography"], |
| audience=list(raw.get("audience", ["HCP", "Internal"])), |
| text=text, |
| ) |
| ) |
| return entries |
|
|
|
|
| def _dsn_from_sqlalchemy_url(url: str) -> str: |
| |
| if url.startswith("postgresql+psycopg://"): |
| return url.replace("postgresql+psycopg://", "postgresql://", 1) |
| return url |
|
|
|
|
| def _ensure_source_registered(cursor: "psycopg.Cursor", entry: SourceManifestEntry) -> None: |
| cursor.execute( |
| "SELECT 1 FROM sources WHERE source_id = %s", |
| (entry.source_id,), |
| ) |
| if cursor.fetchone() is None: |
| raise RuntimeError( |
| f"Source {entry.source_id!r} is not registered in `sources`. " |
| f"Register the source and version via ingestion-service (or seed scripts) before running ingest." |
| ) |
| cursor.execute( |
| "SELECT 1 FROM source_versions WHERE source_id = %s AND version_id = %s", |
| (entry.source_id, entry.version_id), |
| ) |
| if cursor.fetchone() is None: |
| raise RuntimeError( |
| f"Source version {entry.version_id!r} for {entry.source_id!r} is not registered in `source_versions`." |
| ) |
|
|
|
|
| def _delete_existing_chunks(cursor: "psycopg.Cursor", source_id: str, version_id: str) -> int: |
| cursor.execute( |
| "DELETE FROM chunks WHERE source_id = %s AND version_id = %s", |
| (source_id, version_id), |
| ) |
| return cursor.rowcount or 0 |
|
|
|
|
| def _insert_chunk(cursor: "psycopg.Cursor", row: dict[str, Any]) -> None: |
| cursor.execute( |
| """ |
| INSERT INTO chunks ( |
| chunk_id, source_id, version_id, text, claim_type, |
| section_path, page_start, page_end, token_count, |
| audience_fit, geography_fit, therapy_area, embedding_id, created_at |
| ) VALUES ( |
| %(chunk_id)s, %(source_id)s, %(version_id)s, %(text)s, %(claim_type)s, |
| %(section_path)s, %(page_start)s, %(page_end)s, %(token_count)s, |
| %(audience_fit)s::json, %(geography_fit)s, %(therapy_area)s, %(embedding_id)s, %(created_at)s |
| ) |
| """, |
| row, |
| ) |
|
|
|
|
| def _relink_claims_for_source(cursor: "psycopg.Cursor", source_id: str) -> int: |
| """Restore claim_evidence_links for any claim whose ``primary_source_id`` is this source. |
| |
| Replacing chunks cascades-deletes prior evidence links, so we re-anchor each |
| claim to the chunk in the same source whose ``claim_type`` matches and whose |
| text shares the most distinctive tokens with the claim's canonical text. |
| """ |
| cursor.execute( |
| "SELECT claim_id, claim_type, canonical_text FROM claims WHERE primary_source_id = %s", |
| (source_id,), |
| ) |
| claims = cursor.fetchall() |
| if not claims: |
| return 0 |
|
|
| cursor.execute( |
| "SELECT chunk_id, claim_type, text FROM chunks WHERE source_id = %s", |
| (source_id,), |
| ) |
| chunks = cursor.fetchall() |
| if not chunks: |
| return 0 |
|
|
| links_created = 0 |
| for claim_id, claim_type, canonical_text in claims: |
| claim_tokens = { |
| token |
| for token in (word.strip(".,;:()").lower() for word in str(canonical_text).split()) |
| if len(token) > 3 |
| } |
| best_chunk_id = None |
| best_score = -1 |
| claim_type_value = str(claim_type) |
| |
| normalized_claim_type = claim_type_value.lower().split(".")[-1] |
| for chunk_id, chunk_claim_type, chunk_text in chunks: |
| chunk_tokens = { |
| token |
| for token in (word.strip(".,;:()").lower() for word in str(chunk_text).split()) |
| if len(token) > 3 |
| } |
| overlap = len(claim_tokens & chunk_tokens) |
| if str(chunk_claim_type).lower() == normalized_claim_type: |
| overlap += 5 |
| if overlap > best_score: |
| best_score = overlap |
| best_chunk_id = chunk_id |
| if best_chunk_id is None: |
| continue |
| cursor.execute( |
| """ |
| INSERT INTO claim_evidence_links ( |
| claim_id, chunk_id, source_id, support_type, extraction_confidence, is_primary_support |
| ) VALUES (%s, %s, %s, %s, %s, %s) |
| ON CONFLICT DO NOTHING |
| """, |
| (claim_id, best_chunk_id, source_id, "PRIMARY", 0.7, True), |
| ) |
| links_created += cursor.rowcount or 0 |
| return links_created |
|
|
|
|
| def _insert_embedding(cursor: "psycopg.Cursor", row: dict[str, Any]) -> None: |
| cursor.execute( |
| """ |
| INSERT INTO chunk_embeddings ( |
| chunk_id, embedding_id, embedding_vector, embedding_model, created_at |
| ) VALUES ( |
| %(chunk_id)s, %(embedding_id)s, %(embedding_vector)s::json, %(embedding_model)s, %(created_at)s |
| ) |
| """, |
| row, |
| ) |
|
|
|
|
def _build_ollama_client():
    """Build an ``OllamaClient`` configured from ``OLLAMA_*`` environment variables.

    Each setting falls back to the default written below when its variable is
    unset. ``OLLAMA_ENABLED`` counts as true for "1", "true" or "yes" in any
    letter case.

    Raises:
        ValueError: If ``OLLAMA_TIMEOUT_SECONDS`` is not a valid float.
    """
    # Imported here, not at module import time, so the client package is only
    # needed when this function is called.
    from ollama_client.client import OllamaClient, OllamaSettings

    enabled_flag = os.getenv("OLLAMA_ENABLED", "true").lower()
    settings = OllamaSettings(
        enabled=enabled_flag in {"1", "true", "yes"},
        base_url=os.getenv("OLLAMA_BASE_URL", "http://127.0.0.1:11434"),
        embedding_model=os.getenv("OLLAMA_EMBEDDING_MODEL", "qwen3-embedding:8b"),
        generation_model=os.getenv("OLLAMA_GENERATION_MODEL", "qwen3.5:9b"),
        request_timeout_seconds=float(os.getenv("OLLAMA_TIMEOUT_SECONDS", "120")),
    )
    return OllamaClient(settings)
|
|
|
|
def _ingest_entry(
    cursor: "psycopg.Cursor",
    entry: SourceManifestEntry,
    *,
    chunker: Any,
    ollama: Any,
    embedding_model: str | None,
    max_tokens: int,
    overlap_tokens: int,
    min_tokens: int,
) -> dict[str, Any]:
    """Replace the stored chunks (and optional embeddings) of one manifest entry and return its counters."""
    _ensure_source_registered(cursor, entry)
    deleted = _delete_existing_chunks(cursor, entry.source_id, entry.version_id)

    chunks = chunker.preview(
        ChunkPreviewRequest(
            source_id=entry.source_id,
            version_id=entry.version_id,
            source_class=entry.source_class,
            therapy_area=entry.therapy_area,
            geography_scope=entry.geography,
            audience_suitability=entry.audience,
            text=entry.text,
            max_tokens=max_tokens,
            overlap_tokens=overlap_tokens,
            min_tokens=min_tokens,
        )
    )

    # One timestamp per source so all of its rows share the same created_at.
    now = datetime.now(UTC)
    inserted_chunks = 0
    inserted_embeddings = 0
    for chunk in chunks:
        embedding_id = None
        embedding_vector: list[float] | None = None
        if ollama is not None:
            embedding_vector = ollama.embed(chunk.text)
            # Only mint an embedding id when a vector exists; otherwise the
            # chunk row would reference an embedding that is never inserted.
            if embedding_vector is not None:
                embedding_id = f"emb-{uuid.uuid4()}"

        _insert_chunk(
            cursor,
            {
                "chunk_id": chunk.chunk_id,
                "source_id": chunk.source_id,
                "version_id": chunk.version_id,
                "text": chunk.text,
                # Store claim types that expose ``.name`` by that name; stringify anything else.
                "claim_type": chunk.claim_type.name if hasattr(chunk.claim_type, "name") else str(chunk.claim_type),
                "section_path": chunk.section_path,
                "page_start": chunk.page_start,
                "page_end": chunk.page_end,
                "token_count": chunk.token_count,
                "audience_fit": json.dumps(chunk.audience_fit),
                "geography_fit": chunk.geography_fit,
                "therapy_area": chunk.therapy_area,
                "embedding_id": embedding_id,
                "created_at": now,
            },
        )
        inserted_chunks += 1

        if embedding_vector is not None:
            _insert_embedding(
                cursor,
                {
                    "chunk_id": chunk.chunk_id,
                    "embedding_id": embedding_id,
                    "embedding_vector": json.dumps(embedding_vector),
                    "embedding_model": embedding_model,
                    "created_at": now,
                },
            )
            inserted_embeddings += 1

    relinked = _relink_claims_for_source(cursor, entry.source_id)

    return {
        "source_id": entry.source_id,
        "version_id": entry.version_id,
        "chunks_deleted": deleted,
        "chunks_inserted": inserted_chunks,
        "embeddings_inserted": inserted_embeddings,
        "claim_evidence_links_created": relinked,
    }


def ingest(
    manifest_path: Path,
    dsn: str,
    *,
    embed: bool,
    max_tokens: int,
    overlap_tokens: int,
    min_tokens: int,
    force_fixtures: bool,
) -> dict[str, Any]:
    """Chunk, optionally embed, and persist every source listed in the manifest.

    For each entry the source/version registration is verified, the existing
    chunks of that source version are deleted, the text is re-chunked, the new
    chunks (and embeddings when ``embed`` is true) are inserted, and claim
    evidence links are re-created. All sources share one connection context,
    which psycopg commits on normal exit and rolls back if an exception escapes.

    Args:
        manifest_path: Path of the manifest JSON describing the sources.
        dsn: Connection string passed to ``psycopg.connect``.
        embed: Whether to embed each chunk through the Ollama client.
        max_tokens: Forwarded to the chunker as ``max_tokens``.
        overlap_tokens: Forwarded to the chunker as ``overlap_tokens``.
        min_tokens: Forwarded to the chunker as ``min_tokens``.
        force_fixtures: Allow sources listed in ``PROTECTED_SOURCE_IDS``.

    Returns:
        A summary dict with the manifest path, the resolved embedding model
        (``None`` when not embedding) and one counters dict per source.

    Raises:
        SystemExit: If the manifest contains protected source IDs and
            ``force_fixtures`` is false.
        RuntimeError: If embedding is requested but Ollama is disabled or the
            embedding model cannot be resolved, or if a source or source
            version is not registered in the database.
    """
    entries = _load_manifest(manifest_path)
    if not force_fixtures:
        blocked = [e for e in entries if e.source_id in PROTECTED_SOURCE_IDS]
        if blocked:
            names = ", ".join(e.source_id for e in blocked)
            raise SystemExit(
                f"Refusing to ingest protected integration-test sources: {names}\n"
                "Use a separate manifest for new corpus work, or pass --force-fixtures "
                "(then run: python3 scripts/restore_postgres_fixtures.py to undo)."
            )

    chunker = ScientificChunker()

    # Resolve the embedding backend before touching the database so that a
    # misconfiguration fails before any rows are deleted.
    ollama = None
    embedding_model = None
    if embed:
        ollama = _build_ollama_client()
        if not ollama.settings.enabled:
            raise RuntimeError("Embedding requested but OLLAMA_ENABLED is false in .env.")
        embedding_model = ollama.resolved_embedding_model()
        if not embedding_model:
            raise RuntimeError(
                f"Required Ollama embedding model {ollama.settings.embedding_model!r} is not "
                f"registered on {ollama.settings.base_url}. Pull it with: "
                f"ollama pull {ollama.settings.embedding_model}"
            )

    summary: dict[str, Any] = {
        "manifest": str(manifest_path),
        "sources": [],
        "embedding_model": embedding_model,
    }

    # The psycopg connection context commits or rolls back and then closes the
    # connection on exit, so no separate close() call is needed.
    with psycopg.connect(dsn) as conn, conn.cursor() as cursor:
        for entry in entries:
            summary["sources"].append(
                _ingest_entry(
                    cursor,
                    entry,
                    chunker=chunker,
                    ollama=ollama,
                    embedding_model=embedding_model,
                    max_tokens=max_tokens,
                    overlap_tokens=overlap_tokens,
                    min_tokens=min_tokens,
                )
            )

    return summary
|
|
|
|
def main() -> int:
    """Parse command-line arguments, run the ingest, and print its summary as JSON.

    Returns:
        0 once the summary has been printed; failures surface as exceptions
        raised by ``ingest``.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's line breaks so the multi-line usage
        # examples stay readable in --help; the default formatter reflows them.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--manifest",
        type=Path,
        default=REPO_ROOT / "data" / "seed_sources" / "manifest.json",
        help="Path to manifest JSON (default: data/seed_sources/manifest.json)",
    )
    parser.add_argument(
        "--database-url",
        # The AKS_DATABASE_URL environment variable overrides the built-in default.
        default=os.getenv(
            "AKS_DATABASE_URL",
            "postgresql+psycopg://mobcoderid-296@localhost/ai_knowledge_spine",
        ),
        help="SQLAlchemy or libpq URL for Postgres",
    )
    parser.add_argument("--no-embed", action="store_true", help="Skip Ollama embeddings (chunks only)")
    parser.add_argument("--max-tokens", type=int, default=220)
    parser.add_argument("--overlap-tokens", type=int, default=30)
    parser.add_argument("--min-tokens", type=int, default=20)
    parser.add_argument(
        "--force-fixtures",
        action="store_true",
        help="Allow ingest on integration-test source IDs (destructive; restore fixtures after).",
    )
    args = parser.parse_args()

    dsn = _dsn_from_sqlalchemy_url(args.database_url)
    summary = ingest(
        args.manifest,
        dsn,
        embed=not args.no_embed,
        max_tokens=args.max_tokens,
        overlap_tokens=args.overlap_tokens,
        min_tokens=args.min_tokens,
        force_fixtures=args.force_fixtures,
    )
    print(json.dumps(summary, indent=2, default=str))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
|
|