"""
database_loader.py

Importing this module emits a DeprecationWarning: vector ingestion is now
handled via the Kaggle pipeline into Qdrant Cloud.

What it does:
Loads extracted text chunks, embeddings, entities, and relationships into the
PostgreSQL database.

How it works:
It uses `psycopg2` to connect to PostgreSQL. It defines helper functions to
insert data into the respective tables: `sources`, `chunks`, `embeddings`,
`entities`, and `relationships`. It handles foreign key relationships by
returning the generated IDs.

Example input:
- chunks: [{"text": "...", "index": 0, "token_count": 50}]
- embeddings: [[0.1, 0.2, ...], ...]
- entities: [{"name": "Pemetrexed", "type": "drug"}, ...]

Example output:
- Returns the database ID of the inserted source document.
"""

import json
import warnings
from typing import Any, Dict, List, Optional, Tuple

import psycopg2
from psycopg2.extras import Json

from src.config import DB_HOST, DB_USER, DB_PASSWORD, DB_NAME

warnings.warn(
    "database_loader.py is deprecated. Vector ingestion is now handled via the Kaggle pipeline into Qdrant Cloud. Do not use this script for vector uploads.",
    DeprecationWarning,
    stacklevel=2,
)


def get_connection():
    """Open and return a new psycopg2 connection built from `src.config` settings.

    The caller owns the connection and is responsible for committing,
    rolling back, and closing it.
    """
    return psycopg2.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        dbname=DB_NAME,
    )


def insert_source(conn, name: str, doc_type: str, disease: str, version: str, content_raw: str) -> int:
    """Insert one row into `sources` and return its generated ID.

    Args:
        conn: Open database connection. This function does not commit.
        name: Value for the `name` column.
        doc_type: Value for the `type` column.
        disease: Value for the `disease` column.
        version: Value for the `version` column.
        content_raw: Value for the `content_raw` column.

    Returns:
        The `id` produced by the INSERT's RETURNING clause.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO sources (name, type, disease, version, content_raw)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING id;
            """,
            (name, doc_type, disease, version, content_raw),
        )
        source_id = cur.fetchone()[0]
    return source_id


def insert_chunk(conn, source_id: int, chunk_text: str, chunk_index: int, token_count: int) -> int:
    """Insert one row into `chunks` and return its generated ID.

    Args:
        conn: Open database connection. This function does not commit.
        source_id: ID of the owning `sources` row.
        chunk_text: Text of the chunk.
        chunk_index: Value for the `chunk_index` column.
        token_count: Value for the `token_count` column.

    Returns:
        The `id` produced by the INSERT's RETURNING clause.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO chunks (source_id, chunk_text, chunk_index, token_count)
            VALUES (%s, %s, %s, %s)
            RETURNING id;
            """,
            (source_id, chunk_text, chunk_index, token_count),
        )
        chunk_id = cur.fetchone()[0]
    return chunk_id


def insert_embedding(conn, chunk_id: int, embedding: List[float]) -> None:
    """Insert a vector embedding for a chunk into `embeddings`.

    Args:
        conn: Open database connection. This function does not commit.
        chunk_id: ID of the `chunks` row the embedding belongs to.
        embedding: Vector components; serialized to a bracketed,
            comma-separated string before being sent to the database.
    """
    # pgvector accepts vector format as a string '[0.1, 0.2, ...]'
    vector_str = '[' + ','.join(map(str, embedding)) + ']'
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO embeddings (chunk_id, embedding)
            VALUES (%s, %s);
            """,
            (chunk_id, vector_str),
        )


def insert_entity(conn, name: str, entity_type: str, source_id: int, properties: Optional[Dict] = None) -> int:
    """Insert one row into `entities` and return its generated ID.

    This always performs an INSERT; it does not look for an existing entity
    with the same name and type (simplified for V1). Callers that need
    de-duplication must track inserted entities themselves.

    Args:
        conn: Open database connection. This function does not commit.
        name: Entity name.
        entity_type: Value for the `entity_type` column.
        source_id: ID of the `sources` row the entity was extracted from.
        properties: Optional extra attributes, stored via psycopg2's `Json`
            adapter. `None` is stored as an empty object.

    Returns:
        The `id` produced by the INSERT's RETURNING clause.
    """
    if properties is None:
        properties = {}

    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO entities (name, entity_type, source_id, properties)
            VALUES (%s, %s, %s, %s)
            RETURNING id;
            """,
            (name, entity_type, source_id, Json(properties)),
        )
        entity_id = cur.fetchone()[0]
    return entity_id


def insert_relationship(conn, source_entity_id: int, target_entity_id: int, rel_type: str, confidence: float, source_id: int) -> None:
    """Insert a relationship between two entities into `relationships`.

    Args:
        conn: Open database connection. This function does not commit.
        source_entity_id: ID of the entity the relationship starts from.
        target_entity_id: ID of the entity the relationship points to.
        rel_type: Value for the `relationship_type` column.
        confidence: Value for the `confidence` column.
        source_id: ID of the `sources` row the relationship was extracted from.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO relationships (source_entity_id, target_entity_id, relationship_type, confidence, source_id)
            VALUES (%s, %s, %s, %s, %s);
            """,
            (source_entity_id, target_entity_id, rel_type, confidence, source_id),
        )


def _relationship_endpoint_types(rel_type: str) -> Tuple[str, str]:
    """Return the (source, target) entity types assumed for a relationship type."""
    source_type = "dosage" if rel_type == "applies_to" else "drug"
    if rel_type == "treats":
        target_type = "disease"
    elif rel_type == "has_side_effect":
        target_type = "symptom"
    else:
        target_type = "drug"
    return source_type, target_type


def load_document_data(source_metadata: Dict, chunks: List[Dict], embeddings: List[List[float]], chunk_entities: List[Tuple[List, List]]):
    """
    Main orchestrator function to load all parsed data for a document into the database.

    Everything is written on a single connection and committed once at the
    end; on any exception the transaction is rolled back and the exception is
    re-raised. The connection is always closed.

    Args:
        source_metadata: Source attributes. Missing keys fall back to
            defaults: "name" -> "Unknown", "type" -> "document",
            "disease" -> "unknown", "version" -> "1.0", "content_raw" -> "".
        chunks: One dict per chunk with required keys "text" and "index" and
            optional "token_count" (default 0).
        embeddings: Embedding for the chunk at the same position. Chunks
            beyond the end of this list get no embedding row.
        chunk_entities: `(entities, relationships)` for the chunk at the same
            position. Entities need "name" and "type"; relationships need
            "source_name", "target_name" and "type", with optional
            "confidence" (default 0.5). Chunks beyond the end of this list
            contribute no entities or relationships.

    Returns:
        The database ID of the inserted source document.
    """
    conn = get_connection()
    try:
        # 1. Insert Source
        source_id = insert_source(
            conn,
            name=source_metadata.get("name", "Unknown"),
            doc_type=source_metadata.get("type", "document"),
            disease=source_metadata.get("disease", "unknown"),
            version=source_metadata.get("version", "1.0"),
            content_raw=source_metadata.get("content_raw", ""),
        )

        # Dictionary to keep track of inserted entities to avoid duplicates in this run
        entity_id_map = {}

        # 2. Insert Chunks & Embeddings
        for i, chunk in enumerate(chunks):
            chunk_id = insert_chunk(
                conn,
                source_id=source_id,
                chunk_text=chunk["text"],
                chunk_index=chunk["index"],
                token_count=chunk.get("token_count", 0),
            )

            # Insert corresponding embedding
            if i < len(embeddings):
                insert_embedding(conn, chunk_id, embeddings[i])

            # 3. Insert Entities and Relationships for this chunk
            if i >= len(chunk_entities):
                continue
            entities, relationships = chunk_entities[i]

            # Insert Entities (each (name, type) pair only once per document)
            for ent in entities:
                key = (ent["name"], ent["type"])
                if key not in entity_id_map:
                    entity_id_map[key] = insert_entity(conn, ent["name"], ent["type"], source_id)

            # Insert Relationships. Endpoints are resolved by exact
            # (name, inferred type) lookup against the entities inserted so
            # far for this document; a relationship whose endpoint is not
            # found is skipped.
            for rel in relationships:
                src_type, tgt_type = _relationship_endpoint_types(rel["type"])
                src_id = entity_id_map.get((rel["source_name"], src_type))
                tgt_id = entity_id_map.get((rel["target_name"], tgt_type))

                # Compare against None so only a failed lookup skips the row.
                if src_id is not None and tgt_id is not None:
                    insert_relationship(
                        conn,
                        src_id,
                        tgt_id,
                        rel["type"],
                        rel.get("confidence", 0.5),
                        source_id,
                    )

        conn.commit()
        return source_id
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


if __name__ == "__main__":
    print("Database loader module ready.")