"""
retrieval.py

What it does:
Performs hybrid (dense + sparse) search against the Qdrant Cloud database and
Knowledge Graph queries against Neo4j.

How it works:
1. Qdrant: Embeds the query as a dense vector and as a sparse vector, runs one
   search per representation and merges the scores per chunk text.
2. Neo4j: Extracts entities from the query, finds structured relationships in
   the graph.
3. retrieve(): Combines both sources, giving high confidence to Neo4j factual
   statements.
"""

from typing import List, Tuple

from neo4j import GraphDatabase
from qdrant_client import QdrantClient
from qdrant_client.models import SparseVector

from src.config import (
    QDRANT_URL,
    QDRANT_API_KEY,
    COLLECTION_NAME,
    TOP_K,
    NEO4J_URI,
    NEO4J_USER,
    NEO4J_PASSWORD,
)
from src.embedding import embed_query, embed_sparse_query
from src.entity_extraction import extract_entities_from_text

# Lazy initialization: clients are created on first use, not at import time.
_qdrant_client = None
_neo4j_driver = None

# Weights of the heuristic ranking formula applied in retrieve().
_LEXICAL_WEIGHT = 0.45
_VECTOR_WEIGHT = 0.20
_EVIDENCE_WEIGHT = 0.25
_GRAPH_WEIGHT = 0.10
# Standard baseline evidence confidence assigned to every Qdrant chunk.
_BASELINE_EVIDENCE = 0.8
# Fixed score given to graph statements that are returned on their own.
_PURE_GRAPH_SCORE = 0.85

# Relationships (in either direction) of nodes whose name contains $name,
# compared case-insensitively. LIMIT applies per entity name.
_RELATION_QUERY = """
MATCH (n)-[r]-(m)
WHERE toLower(n.name) CONTAINS toLower($name)
RETURN n.name as source, type(r) as rel, m.name as target
LIMIT $limit
"""


def get_qdrant_client() -> QdrantClient:
    """Return the shared Qdrant client, creating it on first call."""
    global _qdrant_client
    if _qdrant_client is None:
        _qdrant_client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)
    return _qdrant_client


def get_neo4j_driver():
    """
    Return the shared Neo4j driver, creating it on first call.

    Returns None when NEO4J_URI is not configured, which callers treat as
    "graph search disabled".
    """
    global _neo4j_driver
    if _neo4j_driver is None and NEO4J_URI:
        _neo4j_driver = GraphDatabase.driver(
            NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD)
        )
    return _neo4j_driver


def _has_items(values) -> bool:
    """Return True for a non-empty sequence; safe for lists and array-likes."""
    # Plain truthiness raises on multi-element numpy arrays, so test the
    # length explicitly.
    return values is not None and len(values) > 0


def _record_scores(combined: dict, points, channel: str) -> None:
    """
    Fold the scores of `points` into `combined` under `channel`.

    `combined` maps chunk text -> {"vector": score, "lexical": score}.
    Points without a payload or without text are skipped. When the same text
    occurs more than once within one result list, the best score is kept.
    """
    seen = set()
    for point in points:
        text = (point.payload or {}).get("text", "")
        if not text:
            continue
        score = float(point.score)
        entry = combined.setdefault(text, {"vector": 0.0, "lexical": 0.0})
        if text in seen:
            # Duplicate chunk text: do not let a lower-ranked hit overwrite
            # the better score recorded earlier.
            entry[channel] = max(entry[channel], score)
        else:
            entry[channel] = score
            seen.add(text)


def qdrant_search(query: str, top_k: int = TOP_K) -> dict[str, dict]:
    """
    Executes a Hybrid Search against Qdrant using named vectors.

    Runs one dense query and, when the sparse embedding is non-empty, one
    sparse query against the "text-sparse" vector. Each query asks for
    top_k * 2 points.

    Returns a dictionary of text -> {'vector': score, 'lexical': score}.
    A channel that did not return the text keeps the score 0.0. Returns an
    empty dict when top_k is not positive or the dense embedding is empty.
    """
    if top_k <= 0:
        return {}

    # Generate Dense Vector
    query_vector = embed_query(query)
    if not _has_items(query_vector):
        return {}

    client = get_qdrant_client()
    limit = top_k * 2

    # 1. Query Dense
    dense_results = client.query_points(
        collection_name=COLLECTION_NAME,
        query=query_vector,
        limit=limit,
        with_payload=True,
    ).points

    # 2. Query Sparse (only computed once the dense path is known to run)
    sparse_results = []
    sparse_indices, sparse_values = embed_sparse_query(query)
    if _has_items(sparse_indices):
        sparse_results = client.query_points(
            collection_name=COLLECTION_NAME,
            query=SparseVector(indices=sparse_indices, values=sparse_values),
            using="text-sparse",
            limit=limit,
            with_payload=True,
        ).points

    # Combine Scores
    combined_docs: dict[str, dict] = {}
    _record_scores(combined_docs, dense_results, "vector")
    _record_scores(combined_docs, sparse_results, "lexical")
    return combined_docs


def _graph_facts(query: str, top_k: int) -> List[Tuple[str, str, str]]:
    """
    Look up graph relationships for the entities mentioned in `query`.

    Returns de-duplicated (statement, source_name, target_name) tuples in
    discovery order, where statement is the human-readable
    "KNOWLEDGE GRAPH: ..." sentence. Returns an empty list when no driver is
    configured or top_k is not positive. Query errors are reported on stdout
    and whatever was collected before the error is returned (best effort).
    """
    driver = get_neo4j_driver()
    if not driver or top_k <= 0:
        return []

    entities = extract_entities_from_text(query) or []
    # Drop blank names: `CONTAINS ''` matches every node in the graph.
    # dict.fromkeys de-duplicates while preserving order, so the same entity
    # is not queried twice.
    entity_names = list(
        dict.fromkeys(
            name
            for name in (e["name"] for e in entities)
            if isinstance(name, str) and name.strip()
        )
    )

    facts: List[Tuple[str, str, str]] = []
    seen = set()
    try:
        with driver.session() as session:
            for name in entity_names:
                records = session.run(
                    _RELATION_QUERY, {"name": name, "limit": top_k}
                )
                for record in records:
                    source = record["source"]
                    target = record["target"]
                    # Nodes without a name would render as "None".
                    if source is None or target is None:
                        continue
                    rel_type = record["rel"].replace("_", " ").lower()
                    statement = (
                        f"KNOWLEDGE GRAPH: {source} {rel_type} {target}."
                    )
                    if statement not in seen:
                        seen.add(statement)
                        facts.append((statement, str(source), str(target)))
    except Exception as e:
        # Deliberate best-effort boundary: graph evidence is optional, so a
        # failing graph query must not break retrieval.
        print(f"Neo4j Query Error: {e}")

    return facts


def neo4j_search(query: str, top_k: int = TOP_K) -> List[Tuple[str, float]]:
    """
    Return knowledge-graph statements related to the entities in `query`.

    Each result is (statement, 1.0); 1.0 is the confidence used for explicit
    knowledge. Statements are de-duplicated. Returns an empty list when Neo4j
    is not configured.
    """
    return [(statement, 1.0) for statement, _, _ in _graph_facts(query, top_k)]


def retrieve(query: str, top_k: int = TOP_K) -> List[Tuple[str, float]]:
    """
    Main entry point: Evaluates the Heuristic Formula.

    Scores every Qdrant chunk as
        0.45 * lexical + 0.20 * vector + 0.25 * evidence + 0.10 * graph_bonus
    adds graph statements that are not already present as chunks with a fixed
    score of 0.85, and returns the top_k (text, score) pairs sorted by score,
    highest first. Returns an empty list for a blank query or a non-positive
    top_k.
    """
    if top_k <= 0 or not query or not query.strip():
        return []

    # 1. Get raw scores from Qdrant Hybrid Search
    qdrant_docs = qdrant_search(query, top_k)

    # 2. Get Graph Evidence
    facts = _graph_facts(query, top_k)
    # The synthesized "KNOWLEDGE GRAPH: ..." sentence never occurs verbatim in
    # a chunk, so the bonus is decided on the fact's two endpoint names.
    endpoint_pairs = {
        (source.lower(), target.lower())
        for _, source, target in facts
        if source and target
    }

    final_scores: List[Tuple[str, float]] = []

    # 3. Apply Heuristic Formula to Qdrant Chunks
    for text, scores in qdrant_docs.items():
        lowered = text.lower()
        # Graph bonus: the chunk mentions both ends of some graph fact.
        graph_bonus = (
            1.0
            if any(s in lowered and t in lowered for s, t in endpoint_pairs)
            else 0.0
        )
        final_score = (
            _LEXICAL_WEIGHT * scores["lexical"]
            + _VECTOR_WEIGHT * scores["vector"]
            + _EVIDENCE_WEIGHT * _BASELINE_EVIDENCE
            + _GRAPH_WEIGHT * graph_bonus
        )
        final_scores.append((text, final_score))

    # Add pure graph results if they exist independently
    for statement, _, _ in facts:
        if statement not in qdrant_docs:
            final_scores.append((statement, _PURE_GRAPH_SCORE))

    final_scores.sort(key=lambda item: item[1], reverse=True)
    return final_scores[:top_k]


# Deprecated: the old search entry points are kept as aliases of retrieve()
# so existing callers keep working.
def hybrid_search(query: str, top_k: int = TOP_K) -> List[Tuple[str, float]]:
    """Deprecated alias of retrieve()."""
    return retrieve(query, top_k)


def vector_search(query: str, top_k: int = TOP_K) -> List[Tuple[str, float]]:
    """Deprecated alias of retrieve()."""
    return retrieve(query, top_k)


def keyword_search(query: str, top_k: int = TOP_K) -> List[Tuple[str, float]]:
    """Deprecated alias of retrieve()."""
    return retrieve(query, top_k)


if __name__ == "__main__":
    print("Testing Qdrant + Neo4j Retrieval...")
    results = retrieve("What does Pemetrexed treat?", top_k=5)
    for rank, (text, score) in enumerate(results, start=1):
        print(f"{rank}. Score: {score:.4f} | Text: {text[:80]}...")