"""Build a local RAG index from the approved source registry.""" from __future__ import annotations import argparse import hashlib import json import sys from datetime import datetime, timezone from pathlib import Path from urllib.parse import urljoin, urlparse from urllib.robotparser import RobotFileParser import numpy as np import requests from .config import get_settings from .embeddings import create_embedding_backend from .index_store import save_index from .sources import SourceRecord, load_source_registry from .text import TextChunk, chunk_text, html_to_text, normalize_text def _robots_allowed(source: SourceRecord) -> bool: if not source.url_text or not source.crawl.respect_robots: return True parsed = urlparse(source.url_text) robots_url = urljoin(f"{parsed.scheme}://{parsed.netloc}", "/robots.txt") parser = RobotFileParser(robots_url) parser.set_url(robots_url) try: parser.read() except Exception: return False return parser.can_fetch(source.crawl.user_agent, source.url_text) def fetch_source_text(source: SourceRecord, registry_dir: Path) -> tuple[str, list[str]]: """Fetch and normalize one approved source.""" warnings: list[str] = [] if source.path: path = Path(source.path) if not path.is_absolute(): path = registry_dir / path if not path.exists(): raise FileNotFoundError(f"source path not found for {source.id}: {path}") raw = path.read_text(encoding="utf-8") if path.suffix.lower() in {".html", ".htm"}: return html_to_text(raw), warnings return normalize_text(raw), warnings if not source.url_text: raise ValueError(f"source {source.id} has no url or path") if not source.crawl.enabled: raise ValueError(f"source {source.id} crawl.enabled is false") if not _robots_allowed(source): raise PermissionError(f"robots.txt disallows fetching {source.url_text}") headers = {"User-Agent": source.crawl.user_agent} response = requests.get(source.url_text, headers=headers, timeout=source.crawl.timeout_seconds) response.raise_for_status() content_type = response.headers.get("content-type", "").lower() if "text/html" in content_type or source.url_text.endswith((".html", ".htm", "/")): text = html_to_text(response.text) else: text = normalize_text(response.text) if not text: warnings.append(f"source {source.id} produced no text") return text, warnings def build_index( *, sources_path: str | Path, index_dir: str | Path, embedding_backend: str, embedding_model: str, embedding_dimensions: int, chunk_words: int = 260, overlap_words: int = 50, ) -> dict: """Build and persist the local vector index.""" sources_path = Path(sources_path) registry = load_source_registry(sources_path) registry_dir = sources_path.parent if sources_path.parent else Path.cwd() embedder = create_embedding_backend( embedding_backend, model_name=embedding_model, dimensions=embedding_dimensions, ) chunks: list[TextChunk] = [] warnings: list[str] = [] ingested_sources: list[str] = [] seen_hashes: set[str] = set() for source in registry.approved_ingestable_sources(): try: text, source_warnings = fetch_source_text(source, registry_dir) except Exception as exc: warnings.append(f"skipped {source.id}: {exc}") continue warnings.extend(source_warnings) source_chunks = chunk_text( source_id=source.id, title=source.title, text=text, url=source.url_text, license=source.license, attribution=source.attribution, tags=source.tags, chunk_words=chunk_words, overlap_words=overlap_words, ) for chunk in source_chunks: digest = hashlib.sha256(chunk.text.encode("utf-8")).hexdigest() if digest in seen_hashes: continue seen_hashes.add(digest) chunks.append(chunk) if source_chunks: ingested_sources.append(source.id) if chunks: vectors = embedder.encode([chunk.text for chunk in chunks]) else: vectors = np.zeros((0, embedder.dimensions), dtype=np.float32) try: registry_label = str(sources_path.resolve().relative_to(Path.cwd().resolve())) except ValueError: registry_label = str(sources_path) manifest = { "created_at": datetime.now(timezone.utc).isoformat(), "source_registry": registry_label, "registered_source_count": len(registry.sources), "ingested_source_count": len(ingested_sources), "chunk_count": len(chunks), "embedding_backend": embedder.name, "embedding_dimensions": int(vectors.shape[1]) if vectors.ndim == 2 else embedder.dimensions, "ingested_source_ids": ingested_sources, "warnings": warnings, } save_index(index_dir, chunks=chunks, vectors=vectors, manifest=manifest) return manifest def main(argv: list[str] | None = None) -> int: settings = get_settings() parser = argparse.ArgumentParser(description="Build the GameMaster Copilot RAG index.") parser.add_argument("--sources", default=str(settings.sources_path), help="Path to sources.yaml") parser.add_argument("--index-dir", default=str(settings.index_dir), help="Output index directory") parser.add_argument("--embedding-backend", default=settings.embedding_backend) parser.add_argument("--embedding-model", default=settings.embedding_model) parser.add_argument("--embedding-dimensions", type=int, default=settings.embedding_dimensions) parser.add_argument("--chunk-words", type=int, default=260) parser.add_argument("--overlap-words", type=int, default=50) args = parser.parse_args(argv) manifest = build_index( sources_path=args.sources, index_dir=args.index_dir, embedding_backend=args.embedding_backend, embedding_model=args.embedding_model, embedding_dimensions=args.embedding_dimensions, chunk_words=args.chunk_words, overlap_words=args.overlap_words, ) sys.stdout.write(json.dumps(manifest, indent=2) + "\n") return 0 if __name__ == "__main__": raise SystemExit(main())