"""Ingestion script: vectorizes the book chapters (ESL, ISLP, ...) into ChromaDB.

Usage:
    python -m rag_books_mcp.ingest --books-dir ../ebook

This creates/updates the vector database in ./chroma_db/
"""

import os
import re
import argparse
from pathlib import Path

import chromadb
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction

# --- Configuration ---
EMBEDDING_MODEL = "all-MiniLM-L6-v2"
CHUNK_SIZE = 600  # approximate tokens (characters / 4)
CHUNK_OVERLAP = 100
CHROMA_DIR = Path(__file__).parent.parent / "chroma_db"

BOOKS_CONFIG = {
    "esl": {
        "dir_name": "capitulos_TheElementsOfStatisticalLearning",
        "collection": "esl_chapters",
        "full_name": "The Elements of Statistical Learning (Hastie, Tibshirani, Friedman)",
    },
    "islp": {
        "dir_name": "capitulos_islp",
        "collection": "islp_chapters",
        "full_name": "An Introduction to Statistical Learning with Python (James, Witten, Hastie, Tibshirani)",
    },
    "fes": {
        "dir_name": "capitulos_fes",
        "collection": "fes_chapters",
        "full_name": "Feature Engineering and Selection (Kuhn, Johnson)",
    },
    "pdsh": {
        "dir_name": "capitulos_pdsh",
        "collection": "pdsh_chapters",
        "full_name": "Python Data Science Handbook (VanderPlas)",
    },
    "r4ds": {
        "dir_name": "capitulos_r4ds",
        "collection": "r4ds_chapters",
        "full_name": (
            "R for Data Science, 2nd Edition "
            "(Wickham, Çetinkaya-Rundel, Grolemund) — examples in R; "
            "principles transfer to pandas/Python"
        ),
        # WARNING: R4DS is licensed under CC BY-NC-ND 3.0 US (NoDerivatives).
        # Originally local_only=True to avoid redistribution; it was changed to
        # False for explicit academic use. The decision and the takedown
        # mechanism are documented in DATA_CARD.md.
        "local_only": False,
    },
}

# Markdown headers of level 1-3. The gap between the hashes and the title is
# restricted to same-line whitespace so a lone "#" line cannot capture the
# following line as its title.
_HEADER_PATTERN = re.compile(r"^(#{1,3})[^\S\r\n]+(.+)$", re.MULTILINE)
# Opening/closing marker of a fenced code block (``` or ~~~).
_FENCE_PATTERN = re.compile(r"^[ \t]{0,3}(`{3,}|~{3,})", re.MULTILINE)
# Characters that are replaced by "_" in chunk IDs.
_ID_UNSAFE_PATTERN = re.compile(r"[^a-zA-Z0-9_-]")
# Number of chunks sent to ChromaDB per `collection.add` call.
_ADD_BATCH_SIZE = 256


def extract_chapter_info(filename: str) -> dict[str, str]:
    """Extract the file-order prefix and the chapter title from a filename.

    Expected format: ``04_3_Linear_Methods_for_Regression.md``. Everything
    before the first underscore is the file order; the rest (underscores
    replaced by spaces) is the chapter title. Without an underscore the whole
    stem is used for both values.

    Returns:
        A dict with the keys ``"file_order"`` and ``"chapter_title"``.
    """
    stem = Path(filename).stem
    file_order, separator, remainder = stem.partition("_")
    chapter_title = remainder.replace("_", " ") if separator else stem
    return {"file_order": file_order, "chapter_title": chapter_title}


def _fenced_code_spans(text: str) -> list[tuple[int, int]]:
    """Return the (start, end) offsets of fenced code blocks in *text*."""
    spans = []
    open_at = None
    open_marker = ""
    for match in _FENCE_PATTERN.finditer(text):
        marker = match.group(1)
        if open_at is None:
            open_at, open_marker = match.start(), marker
        elif marker[0] == open_marker[0] and len(marker) >= len(open_marker):
            # A fence closes only with the same character and at least the
            # same length as the one that opened it.
            spans.append((open_at, match.end()))
            open_at = None
    if open_at is not None:
        # An unclosed fence runs to the end of the document.
        spans.append((open_at, len(text)))
    return spans


def split_by_sections(text: str, chapter_title: str) -> list[dict]:
    """Split *text* into sections using markdown headers of level 1-3.

    Lines that look like headers but sit inside fenced code blocks (for
    example ``# comment`` in a Python or R snippet) are not treated as
    headers; they stay part of the enclosing section's content.

    Args:
        text: Cleaned chapter text.
        chapter_title: Title used for header-less text and for the text that
            precedes the first header.

    Returns:
        A list of dicts with the keys ``"title"``, ``"level"`` and
        ``"content"``. Text before the first header is kept only when longer
        than 50 characters; a header's section only when its content is
        longer than 30 characters.
    """
    code_spans = _fenced_code_spans(text)
    matches = [
        match
        for match in _HEADER_PATTERN.finditer(text)
        if not any(start <= match.start() < end for start, end in code_spans)
    ]

    if not matches:
        # No headers: treat everything as a single section.
        return [{"title": chapter_title, "level": 1, "content": text.strip()}]

    sections = []

    # Text before the first header.
    pre_text = text[: matches[0].start()].strip()
    if len(pre_text) > 50:
        sections.append({"title": chapter_title, "level": 1, "content": pre_text})

    for i, match in enumerate(matches):
        # A section's content runs up to the next header (or the end of text).
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        content = text[match.end():end].strip()
        if len(content) > 30:
            sections.append(
                {
                    "title": match.group(2).strip(),
                    "level": len(match.group(1)),
                    "content": content,
                }
            )

    return sections


def _find_cut_point(text: str, lo: int, hi: int) -> int:
    """Pick the best place in ``text[lo:hi]`` to end a chunk, or *hi* if none."""
    # Prefer a paragraph break.
    cut_point = text.rfind("\n\n", lo, hi)
    if cut_point != -1:
        return cut_point
    # Then the end of a sentence; +1 keeps the period in the chunk.
    cut_point = text.rfind(". ", lo, hi)
    if cut_point != -1:
        return cut_point + 1
    # Then a single newline.
    cut_point = text.rfind("\n", lo, hi)
    if cut_point != -1:
        return cut_point
    return hi


def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]:
    """Split *text* into overlapping chunks by character count.

    Chunks are cut, when possible, at a paragraph break, a sentence end or a
    newline located in the second half of the window, to avoid breaking
    sentences.

    Args:
        text: Text to split.
        chunk_size: Approximate chunk size in tokens (1 token ≈ 4 characters).
        overlap: Approximate overlap between consecutive chunks, in tokens.

    Returns:
        ``[text]`` unchanged when it fits in one chunk; otherwise the list of
        stripped, non-empty chunks.

    Raises:
        ValueError: If *chunk_size* is not positive or *overlap* is negative.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if overlap < 0:
        raise ValueError("overlap must be non-negative")

    # Convert approximate tokens to characters (1 token ≈ 4 chars).
    char_size = chunk_size * 4
    char_overlap = overlap * 4
    text_len = len(text)

    if text_len <= char_size:
        return [text]

    chunks = []
    start = 0
    while start < text_len:
        end = start + char_size
        if end >= text_len:
            # Last window: take the remainder and stop. Rewinding by the
            # overlap here would only emit a tail already contained in this
            # chunk.
            tail = text[start:].strip()
            if tail:
                chunks.append(tail)
            break

        end = _find_cut_point(text, start + char_size // 2, end)
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)

        # Step back by the overlap, but always move forward: a large overlap
        # relative to the chunk size must not stall the loop.
        next_start = end - char_overlap
        start = next_start if next_start > start else end

    return chunks


def clean_text(text: str) -> str:
    """Remove artifacts left by PDF extraction / HTML scraping."""
    # Remove YAML frontmatter at the very start (---\nkey: value\n---).
    text = re.sub(r"^---\s*\n.*?\n---\s*\n", "", text, count=1, flags=re.DOTALL)
    # Remove page markers.
    text = re.sub(r"---\s*Página\s*\d+\s*---", "", text)
    # Remove lines containing only digits (stray page numbers).
    text = re.sub(r"^\d+\s*$", "", text, flags=re.MULTILINE)
    # Collapse runs of four or more newlines to three.
    text = re.sub(r"\n{4,}", "\n\n\n", text)
    # Remove copyright notices (from the © sign to the end of the line).
    text = re.sub(r"©.*?(?:\n|$)", "", text)
    return text.strip()


def _unique_id(base_id: str, seen_ids: set[str]) -> str:
    """Return *base_id*, suffixed with ``_dupN`` if already used, and record it."""
    candidate = base_id
    suffix = 1
    while candidate in seen_ids:
        candidate = f"{base_id}_dup{suffix}"
        suffix += 1
    seen_ids.add(candidate)
    return candidate


def _add_in_batches(collection, ids: list[str], documents: list[str], metadatas: list[dict]) -> None:
    """Add the parallel lists to *collection* in slices of ``_ADD_BATCH_SIZE``."""
    for offset in range(0, len(ids), _ADD_BATCH_SIZE):
        batch = slice(offset, offset + _ADD_BATCH_SIZE)
        collection.add(
            ids=ids[batch],
            documents=documents[batch],
            metadatas=metadatas[batch],
        )


def ingest_book(books_dir: Path, book_key: str, client: chromadb.ClientAPI, embedding_fn) -> int:
    """Ingest one complete book into ChromaDB.

    The book's collection is dropped and rebuilt from the ``*.md`` files in
    its chapter directory (files whose name starts with README are skipped).

    Args:
        books_dir: Root directory containing the per-book chapter folders.
        book_key: Key of the book in ``BOOKS_CONFIG``.
        client: ChromaDB client used to (re)create the collection.
        embedding_fn: Embedding function attached to the collection.

    Returns:
        The number of chunks added, or 0 when the chapter directory is missing.
    """
    config = BOOKS_CONFIG[book_key]
    chapters_dir = books_dir / config["dir_name"]

    if not chapters_dir.exists():
        print(f" ⚠️ Directorio no encontrado: {chapters_dir}")
        return 0

    # Reset the collection if it already exists. Deliberately best effort:
    # deleting a collection that does not exist raises, and the exception
    # type is not the same across chromadb versions.
    try:
        client.delete_collection(config["collection"])
    except Exception:
        pass

    collection = client.get_or_create_collection(
        name=config["collection"],
        embedding_function=embedding_fn,
        metadata={"hnsw:space": "cosine"},
    )

    total_chunks = 0
    # IDs already used in this collection; duplicates would otherwise be
    # rejected or dropped by ChromaDB.
    seen_ids: set[str] = set()

    # Exclude READMEs. NOTE(review): the original comment said only NN_*.md
    # chapters are kept, but the filter only drops README-prefixed files.
    md_files = [
        path
        for path in sorted(chapters_dir.glob("*.md"))
        if not path.name.upper().startswith("README")
    ]

    print(f"\n 📚 {config['full_name']}")
    print(f" Archivos encontrados: {len(md_files)}")

    for md_file in md_files:
        chapter_info = extract_chapter_info(md_file.name)
        text = clean_text(md_file.read_text(encoding="utf-8"))

        if len(text) < 100:
            continue

        sections = split_by_sections(text, chapter_info["chapter_title"])

        ids: list[str] = []
        documents: list[str] = []
        metadatas: list[dict] = []

        for section in sections:
            # Long sections are subdivided into chunks.
            chunks = chunk_text(section["content"])
            for i, chunk in enumerate(chunks):
                raw_id = f"{book_key}_{chapter_info['file_order']}_{section['title'][:30]}_{i}"
                # Sanitize, then disambiguate sections whose truncated,
                # sanitized titles coincide.
                chunk_id = _unique_id(_ID_UNSAFE_PATTERN.sub("_", raw_id), seen_ids)

                ids.append(chunk_id)
                documents.append(chunk)
                metadatas.append(
                    {
                        "book": book_key,
                        "book_full_name": config["full_name"],
                        "chapter": chapter_info["chapter_title"],
                        "section": section["title"],
                        "section_level": section["level"],
                        "chunk_index": i,
                        "total_chunks_in_section": len(chunks),
                        "file": md_file.name,
                    }
                )

        # One batched add per chapter file instead of one call per chunk.
        if ids:
            _add_in_batches(collection, ids, documents, metadatas)
            total_chunks += len(ids)

        print(f" ✓ {md_file.name} → {len(sections)} secciones")

    print(f" Total chunks: {total_chunks}")
    return total_chunks


def main() -> None:
    """Parse the command line and ingest every book in ``BOOKS_CONFIG``."""
    parser = argparse.ArgumentParser(description="Ingesta de libros ESL/ISLP en ChromaDB")
    parser.add_argument(
        "--books-dir",
        type=Path,
        default=Path(__file__).parent.parent.parent / "ebook",
        help="Directorio raíz con las carpetas de capítulos",
    )
    parser.add_argument(
        "--chroma-dir",
        type=Path,
        default=CHROMA_DIR,
        help="Directorio para la base de datos ChromaDB",
    )
    args = parser.parse_args()

    print("🔧 Inicializando embedding model...")
    embedding_fn = SentenceTransformerEmbeddingFunction(model_name=EMBEDDING_MODEL)

    print(f"🗄️ ChromaDB persistente en: {args.chroma_dir}")
    client = chromadb.PersistentClient(path=str(args.chroma_dir))

    print("\n📖 Iniciando ingesta de libros...")
    total = 0
    for book_key in BOOKS_CONFIG:
        total += ingest_book(args.books_dir, book_key, client, embedding_fn)

    print(f"\n✅ Ingesta completada. Total de chunks vectorizados: {total}")
    print(f" Base de datos en: {args.chroma_dir}")


if __name__ == "__main__":
    main()