"""
neo4j_ingestion.py
Scrapes chunks from Qdrant, extracts medical entities, and pushes structured
nodes and relationships to the Neo4j Knowledge Graph.
"""
import os
import re

from dotenv import load_dotenv
from neo4j import GraphDatabase
from qdrant_client import QdrantClient

from src.config import (
    QDRANT_URL,
    QDRANT_API_KEY,
    COLLECTION_NAME,
    NEO4J_URI,
    NEO4J_USER,
    NEO4J_PASSWORD,
)
from src.entity_extraction import process_chunk_for_graph

# Node labels and relationship types cannot be passed as Cypher parameters, so
# they are interpolated into the query text. Only plain ASCII identifiers are
# allowed through; anything else is rejected instead of being spliced in.
_CYPHER_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def _is_safe_identifier(value):
    """Return True if *value* is safe to interpolate as a label or rel type."""
    return isinstance(value, str) and _CYPHER_IDENTIFIER.fullmatch(value) is not None


def get_qdrant_chunks():
    """Scroll through the Qdrant collection and yield ``(point_id, text)`` pairs.

    Points are fetched in pages of 100 with payloads and without vectors.
    ``text`` is the payload's ``"text"`` value, or ``""`` when the point has
    no payload or no ``"text"`` key.
    """
    client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)
    offset = None
    while True:
        points, next_offset = client.scroll(
            collection_name=COLLECTION_NAME,
            limit=100,
            offset=offset,
            with_payload=True,
            with_vectors=False,
        )
        for point in points:
            # A point stored without a payload comes back with payload=None.
            payload = point.payload or {}
            yield point.id, payload.get("text", "")

        if next_offset is None:
            break
        offset = next_offset


def ingest_to_neo4j():
    """Extract entities from every Qdrant chunk and merge them into Neo4j.

    For each non-empty chunk, ``process_chunk_for_graph`` supplies entities
    (dicts with ``"name"`` and ``"type"``) and relationships (dicts with
    ``"source_name"``, ``"target_name"`` and ``"type"``). Entities are merged
    as nodes labelled with the capitalized type; relationships are merged
    between existing nodes matched by name, using the upper-cased type.

    Entities or relationships whose type is not a plain identifier are
    skipped with a warning, because the type is interpolated into the query
    text. The driver is closed even if ingestion fails part-way. A summary is
    printed at the end; the relationship figure counts merge statements
    issued, not relationships newly created.
    """
    driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

    total_processed = 0
    entities_added = set()
    rels_added = 0
    skipped = 0

    try:
        with driver.session() as session:
            for _chunk_id, text in get_qdrant_chunks():
                if not text:
                    continue

                entities, relationships = process_chunk_for_graph(text)

                # 1. Merge Nodes
                for ent in entities:
                    ent_name = ent["name"]
                    ent_type = ent["type"].capitalize()  # Drug, Disease, Symptom, Dosage

                    if not _is_safe_identifier(ent_type):
                        print(f"Skipping entity {ent_name!r}: unsafe label {ent_type!r}")
                        skipped += 1
                        continue

                    # The label is validated above; the name goes in as a parameter.
                    c_query = f"MERGE (n:{ent_type} {{name: $name}})"
                    session.run(c_query, {"name": ent_name})
                    entities_added.add((ent_name, ent_type))

                # 2. Merge Relationships
                for rel in relationships:
                    src_name = rel["source_name"]
                    tgt_name = rel["target_name"]
                    rel_type = rel["type"].upper()  # TREATS, HAS_SIDE_EFFECT, APPLIES_TO

                    if not _is_safe_identifier(rel_type):
                        print(
                            f"Skipping relationship {src_name!r} -> {tgt_name!r}: "
                            f"unsafe type {rel_type!r}"
                        )
                        skipped += 1
                        continue

                    # Match by name to avoid creating duplicate loose nodes
                    c_query = f"""
                    MATCH (s {{name: $src_name}})
                    MATCH (t {{name: $tgt_name}})
                    MERGE (s)-[r:{rel_type}]->(t)
                    """
                    session.run(c_query, {"src_name": src_name, "tgt_name": tgt_name})
                    # Counts statements issued; the MATCH may have found no nodes.
                    rels_added += 1

                total_processed += 1
                if total_processed % 50 == 0:
                    print(f"Processed {total_processed} chunks...")
    finally:
        # Release the connection pool even when extraction or a query raises.
        driver.close()

    print("\nIngestion Complete!")
    print(f"Total chunks processed: {total_processed}")
    print(f"Unique Entities Merged: {len(entities_added)}")
    print(f"Relationships Merged: {rels_added}")
    if skipped:
        print(f"Items skipped (unsafe type): {skipped}")


if __name__ == "__main__":
    # NOTE(review): src.config is imported above, before this call. If it reads
    # the environment at import time, values loaded here arrive too late to
    # affect the imported constants - confirm against src/config.py.
    load_dotenv(override=True)
    print("Starting Neo4j Ingestion Pipeline...")
    ingest_to_neo4j()