from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware import os import json import sqlite3 import re from dotenv import load_dotenv # LangChain / Pinecone imports from langchain_core.output_parsers import StrOutputParser from langchain_google_genai import ChatGoogleGenerativeAI from langchain_core.messages import HumanMessage, AIMessage from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.sqlite import SqliteSaver from langchain_core.documents import Document from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder, SystemMessagePromptTemplate from typing import Optional from langchain_pinecone import PineconeVectorStore from langchain_huggingface import HuggingFaceEmbeddings # --- Load environment variables --- print("Step 1: Loading environment variables...") load_dotenv() PINECONE_API_KEY = os.getenv("PINECONE_API_KEY") GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") os.environ["PINECONE_API_KEY"] = PINECONE_API_KEY os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY print("Step 1: Environment variables loaded.") # --- Initialize Pinecone client --- print("Step 2: Initializing Pinecone client...") from pinecone import Pinecone, ServerlessSpec pc = Pinecone(api_key=PINECONE_API_KEY) print("Step 2: Pinecone client initialized.") # --- Initialize Pinecone indexes --- print("Step 3: Initializing Pinecone indexes...") def init_index(index_name): dimension = 1024 metric = "cosine" if index_name not in pc.list_indexes().names(): pc.create_index(name=index_name, dimension=dimension, metric=metric, spec=ServerlessSpec(cloud='aws', region='us-east-1')) return pc.Index(index_name) # Initialize Pinecone indexes hadith_index = init_index("ahadith-index") quran_index = init_index("quran-rag-index") print("Step 3: Pinecone indexes initialized.") # Connect vector stores print("Step 4: Initializing embedding model and vector stores...") embedding_model = HuggingFaceEmbeddings(model_name="omarelshehy/arabic-english-sts-matryoshka-v2.0") hadith_vectorstore = PineconeVectorStore.from_existing_index("ahadith-index", embedding=embedding_model) Quran_vectorstore = PineconeVectorStore.from_existing_index("quran-rag-index", embedding=embedding_model) print("Step 4: Vector stores connected.") # --- RAG Graph & Prompt Setup --- print("Step 5: Setting up RAG graph and prompt...") # --- LLM --- llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0.7) parser = StrOutputParser() #--- Prompt --- prompt = ChatPromptTemplate( messages=[ SystemMessagePromptTemplate.from_template( """"You are **ISLAMIC KNOWLEDGE BOT**. Your terms: - Solely reply using the given **context: {context}**. - Use the **filter: {Filter}** → just employ sources permitted by the filter. - If you cannot reply from the context or messages' history → respond with an apology. - Deliver metadata: - If **Quran** → show Arabic + English + surah/ayah reference. - If **Hadith** → show English text + Arabic + book + chapter in arabic and english + grade. - Replies must be **short, blunt, and to the question**. - Never use your own expertise or external data. - Use past **message's history** for consistency. - Never provide Quran or Hadith outside the context. - In cases where answers in hadith or quran or both aren't found , mark all the fields of it as none and in summary apologize i couldn't find the answer. - Summary should be a direct answer to the question based on the ayah and hadith in your response, always give **verified scholarly analysis**. - BOOK NAME AS BOOK GIVE COMPLETE BOOK NAME LIKE SAHIH AL MUSLIM - chapEng is english chapter name -Muhammad s.a.w is the last of all prophets(universal truth) - chapAr is arabic chapter name - you must return book,chapter i.e chapEng ,chapter_ar i.e chapArabic -Always strictly format output in the following JSON format: -"Always return strictly valid JSON with double quotes for keys and string values. No extra explanation." {{ "summary": "direct answering the user's question", "references": [ {{ "type": "Quran", "surah": "Al-Ikhlas", "ayahNumber": "1-4", "ayahArabic": "قُلْ هُوَ ٱللَّهُ أَحَدٌ", "ayahEnglish": "Say: He is Allah, One.", "reference": "Quran 112:1" }}, {{ "type": "Hadith", "book": "Sahih al-Bukhari", "chapter": "the chapter of governance", "chapter_ar": " كتاب أحاديث الأنبياء", "grade": "Sahih", "textArabic": "إِنَّمَا الأَعْمَالُ بِالنِّيَّاتِ", "textEnglish": "Actions are judged by intentions." }} ] }}""" ), MessagesPlaceholder(variable_name="messages"), ] ) from typing import TypedDict, Sequence, Annotated, Optional as TypingOptional # --- State schema --- class RAGState(TypedDict): messages: Annotated[list, lambda x, y: x + y] # auto-accumulates documents: Sequence[Document] # keep as list[Document] filters: Annotated[Optional[list[str]], lambda x, y: y or x] # selected sources # --- Retriever --- def retrieveDocuments(state: RAGState): question = state["messages"][-1].content retrieved_docs: list[Document] = [] filter_map = { "bukhari": ("vectorStore", "bukhari", 4), "muslim": ("vectorStore", "muslim", 4), "tirmidhi": ("vectorStore", "tirmidhi", 4), "sunan_nasai": ("vectorStore", "sunan_nasai", 4), "malik": ("vectorStore", "malik", 4), "ahmed": ("vectorStore", "ahmed", 4), "ibnmajah": ("vectorStore", "ibnmajah", 4), "Quran": ("Quran_vectorStore", None, 5), } filters = state.get("filters") or list(filter_map.keys()) if "All" in filters: filters = list(filter_map.keys()) for f in filters: store, namespace, k = filter_map.get(f, (None, None, None)) if store == "vectorStore": retrieved_docs += hadith_vectorstore.similarity_search(question, k=k, namespace=namespace) elif store == "Quran_vectorStore": retrieved_docs += Quran_vectorstore.similarity_search(question, k=k) # The fix is here: if state.get("documents") is None: state["documents"] = [] state["documents"].append(retrieved_docs) if len(state["documents"]) > 2: state["documents"] = state["documents"][-2:] return {"documents": state["documents"]} def flatten(docs): for d in docs: if d is None: continue # skip None values if isinstance(d, list): yield from flatten(d) # recurse into nested lists else: yield d # --- Answer generator --- def generate_answer(state: RAGState): # Keep last 20 messages if len(state["messages"]) > 8: state["messages"] = state["messages"][-8:] # Flatten nested documents flat_documents = list(flatten(state["documents"])) documents_str = "" if flat_documents: # First (oldest) document(s) documents_str += "=== History of Documents (Older Context) ===\n\n" documents_str += ( f"Book: {flat_documents[0].metadata.get('book','Unknown')}\n" f"Chapter (Arabic): {flat_documents[0].metadata.get('chapArabic','Unknown')}\n" f"Chapter (English): {flat_documents[0].metadata.get('chapEng','Unknown')}\n" f"Narrator: {flat_documents[0].metadata.get('narrator','Unknown')}\n" f"{flat_documents[0].page_content}\n\n" ) if len(flat_documents) > 1: # Remaining (newer/current) documents documents_str += "=== Current Document(s) Retrieved for the Question Context ===\n\n" for doc in flat_documents[1:]: # <- start from index 1 to avoid repeating the oldest doc md = doc.metadata or {} # ✅ Detect Quran docs by their fields instead of relying on "type" if "surah_no" in md and "ayah_no_surah" in md: surah_en = md.get("surah_name_en", "Unknown") surah_ar = md.get("surah_name_ar", "") surah_no = int(md.get("surah_no", 0)) if md.get("surah_no") else None ayah_no = int(md.get("ayah_no_surah", 0)) if md.get("ayah_no_surah") else None ayah_ar = md.get("ayah_ar") or md.get("AyahArabic") or "Unknown" ayah_en = md.get("ayah_en") or md.get("AyahEnglish") or "Unknown" ref = f"Quran {surah_no}:{ayah_no}" if surah_no and ayah_no else "Quran" documents_str += ( f"Type: Quran\n" f"Surah: {surah_en} ({surah_ar})\n" f"Surah Number: {surah_no}\n" f"Ayah Number: {ayah_no}\n" f"Ayah Arabic: {ayah_ar}\n" f"Ayah English: {ayah_en}\n" f"Reference: {ref}\n\n" ) elif md.get("type") == "Hadith": documents_str += ( f"Type: Hadith\n" f"Book: {md.get('book','Unknown')}\n" f"Chapter (English): {md.get('chapter','Unknown')}\n" f"Chapter (Arabic): {md.get('chapter_ar','Unknown')}\n" f"Grade: {md.get('grade','Unknown')}\n" f"Text Arabic: {md.get('textArabic','Unknown')}\n" f"Text English: {md.get('textEnglish','Unknown')}\n\n" ) else: # Fallback (old structure) documents_str += ( f"Book: {md.get('book','Unknown')}\n" f"Chapter (Arabic): {md.get('chapArabic','Unknown')}\n" f"Chapter (English): {md.get('chapEng','Unknown')}\n" f"Narrator: {md.get('narrator','Unknown')}\n" f"{doc.page_content}\n\n" ) print("string doc created") full_prompt = prompt.partial(context=documents_str, Filter=state["filters"]) new_message_content = parser.invoke( llm.invoke(full_prompt.invoke({"messages": state["messages"]})) ) state["messages"] = [AIMessage(content=new_message_content)] return state # --- Initialize Graph & Memory --- print("Step 6: Initializing LangGraph workflow and memory...") workflow = StateGraph(state_schema=RAGState) workflow.add_node("retriever", retrieveDocuments) workflow.add_node("generate", generate_answer) workflow.add_edge(START, "retriever") workflow.add_edge("retriever", "generate") workflow.add_edge("generate", END) conn = sqlite3.connect("chatbot_memory.sqlite", check_same_thread=False) memory = SqliteSaver(conn) app_graph = workflow.compile(checkpointer=memory) print("Step 6: Workflow compiled successfully.") # --- FastAPI App --- print("Step 7: Creating FastAPI app instance...") app = FastAPI() @app.get("/") def read_root(): return {"message": "Server is running"} app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) print("Step 7: FastAPI app instance created.") @app.post("/chat") def hadith_bot(payload: dict): question = payload.get("Question") filter_set = payload.get("filters") userID = payload.get("userID") # Invoke the LangGraph workflow bot_response = app_graph.invoke( {"messages": [HumanMessage(content=question)], "filters": filter_set}, config={"configurable": {"thread_id": userID}} ) # Extract the content cleaned = bot_response['messages'][-1].content.strip() # Remove code fences if present cleaned = re.sub(r"^```json|```$", "", cleaned, flags=re.MULTILINE).strip() # Escape problematic backslashes (if any) cleaned = re.sub(r'\\(?!["\\/bfnrtu])', r'\\\\', cleaned) # Attempt to load JSON safely try: parsed = json.loads(cleaned) except json.JSONDecodeError as e: # If JSON parsing fails, fallback to minimal structured response print("JSON parse error:", e) parsed = { "summary": cleaned, "references": [] } return parsed if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860) print("Application startup complete.")