""" streamlit_app.py — Streamlit UI for the Gītā Advisor. Features: - Live pipeline transparency via st.status() — each of the 5 stages expands in real time so the user sees felt emotion, queries, passage scores, and selection rationale before the response begins. - Real token streaming via st.write_stream() — no waiting for the full response before text appears. - Verse explorer — each cited source expands to show Sanskrit, IAST, translation, Śaṅkara's bhāṣya, teaching, and themes. - Explain-in-context — streaming explanation of any cited verse in the light of the current conversation. - Warm spiritual aesthetic matching the original Gradio palette. """ from __future__ import annotations import json import re from dataclasses import dataclass, field from typing import Generator import dspy import streamlit as st from openai import OpenAI from st_copy import copy_button import config from advisor import load_optimized from corpus import EnrichedVerse, Verse from knowledge_base import AdvaitaRetriever, format_passages_for_llm # ── page config (MUST be the first Streamlit call) ──────────────────────────── st.set_page_config( page_title="Gītā Advisor", page_icon="🕉️", layout="centered", initial_sidebar_state="collapsed", menu_items={"Get help": None, "Report a bug": None, "About": None}, ) # ── CSS ─────────────────────────────────────────────────────────────────────── _CSS = """ """ # ── chat avatars ────────────────────────────────────────────────────────────── # 🙏 namaste — the seeker approaching a teacher with folded hands # 🪔 diya — jñāna-dīpa, the lamp of knowledge that dispels avidyā; # Śaṅkara uses this metaphor throughout the Vivekacūḍāmaṇi _AVATAR_USER = "🙏" _AVATAR_ADVISOR = "🪔" # ── streaming field markers ─────────────────────────────────────────────────── # Gemma 4 outputs [[ ## field ]] (no closing ##); standard DSPy uses # [[ ## field ## ]]. config.py patches the DSPy parser to accept both, so # we must do the same here when scanning the raw stream. 
_RESPONSE_MARKER_RE = re.compile(r"\[\[ ## response(?:\s*##)? \]\]")
_SOURCES_MARKER_RE = re.compile(r"\[\[ ## sources_cited(?:\s*##)? \]\]")
_REASONING_MARKER_RE = re.compile(r"\[\[ ## reasoning(?:\s*##)? \]\]")
# Size of the stream look-behind buffer. The longest marker in its canonical
# spelling, "[[ ## sources_cited ## ]]", is 25 characters; 30 leaves headroom.
_MARKER_MAX_LEN = 30

# ── examples ──────────────────────────────────────────────────────────────────
_EXAMPLES = [
    "I just got laid off and feel like nothing makes sense.",
    "I'm terrified of dying. Is that irrational?",
    "I keep hurting the people I love without meaning to.",
    "I've been meditating for years but still feel empty.",
    "My ambition feels hollow but I can't stop chasing it.",
]

# Separator and prefix of the footer that main() appends to assistant messages.
_FOOTER_SEP = "\n\n---\n"
_FOOTER_PREFIX = "*Passages: "


# ── startup helpers ───────────────────────────────────────────────────────────
@st.cache_resource(show_spinner="Loading advisor…")
def _load_resources():
    """Load all heavy objects once per Space instance.

    Returns:
        A ``(advisor, retriever, synthesis_client)`` tuple: the optimised
        advisor from ``load_optimized()``, an ``AdvaitaRetriever`` on which
        ``_ensure()`` has already been called, and an ``OpenAI`` client
        pointed at ``config.HF_ROUTER_BASE``.
    """
    config.configure_dspy(backend="hf")
    advisor = load_optimized()
    retriever = AdvaitaRetriever()
    retriever._ensure()
    synthesis_client = OpenAI(
        base_url=config.HF_ROUTER_BASE,
        api_key=config.HF_TOKEN,
    )
    return advisor, retriever, synthesis_client


def _init_state():
    """Seed ``st.session_state`` with any conversation keys not yet present."""
    defaults: dict = {
        "messages": [],
        "turn_data": [],
        "dspy_history": dspy.History(messages=[]),
    }
    for k, v in defaults.items():
        if k not in st.session_state:
            st.session_state[k] = v


# ── dspy / history helpers ────────────────────────────────────────────────────
def _strip_sources_footer(content: str) -> str:
    """Return *content* without the trailing "Passages" footer, if it has one.

    Only the last separator is considered, and only when what follows it is
    the footer, so a horizontal rule inside the response body is preserved.
    """
    head, sep, tail = content.rpartition(_FOOTER_SEP)
    if sep and tail.startswith(_FOOTER_PREFIX):
        return head
    return content


def _to_dspy_history(messages: list[dict]) -> dspy.History:
    """Convert Streamlit messages list to dspy.History, stripping source footers.

    Args:
        messages: Chat messages as dicts with ``"role"`` and ``"content"``.

    Returns:
        A ``dspy.History`` with one entry per adjacent user→assistant pair.
        Messages that are not part of such a pair are skipped one at a time,
        so a single unanswered user message does not misalign later pairs.
    """
    msgs = []
    i = 0
    while i + 1 < len(messages):
        u, a = messages[i], messages[i + 1]
        if u.get("role") == "user" and a.get("role") == "assistant":
            msgs.append({
                "user_question": u["content"],
                "response": _strip_sources_footer(a["content"]),
                "sources_cited": [],
            })
            i += 2
        else:
            # Not a user/assistant pair — resynchronise on the next message.
            i += 1
    return dspy.History(messages=msgs)


# ── synthesis streaming ───────────────────────────────────────────────────────
@dataclass class _StreamCtx: """Mutable container shared between generator and caller.""" full_text: str = field(default="") def _build_synthesis_messages( advisor, dspy_hist: dspy.History, message: str, felt_emotion: str, deeper_concern: str, selected_text: str, ) -> list[dict]: """Build the exact prompt DSPy would send, preserving GEPA-optimised instructions.""" adapter = dspy.settings.adapter predict = advisor.synthesize.predict sig = predict.signature demos = getattr(predict, "demos", []) return adapter.format(sig, demos, dict( history=dspy_hist, user_question=message, felt_emotion=felt_emotion, deeper_concern=deeper_concern, selected_passages=selected_text, )) def _synthesis_gen( messages: list[dict], client: OpenAI, model: str, ctx: _StreamCtx, ) -> Generator[str, None, None]: """Yield only the response-field tokens; populate ctx.full_text throughout. We keep the last GUARD characters buffered at all times so that a marker arriving split across multiple chunks never gets partially yielded to the UI. 
""" stream = client.chat.completions.create( model=model, messages=messages, stream=True, temperature=0.6, max_tokens=4096, ) GUARD = _MARKER_MAX_LEN in_response = False buf = "" # response text buffered but not yet yielded for chunk in stream: delta = (chunk.choices[0].delta.content or "") if chunk.choices else "" ctx.full_text += delta if not in_response: m = _RESPONSE_MARKER_RE.search(ctx.full_text) if m is None: continue in_response = True buf = ctx.full_text[m.end():].lstrip("\n") else: buf += delta # Sources marker has arrived — yield safe prefix and stop sm = _SOURCES_MARKER_RE.search(buf) if sm: safe = buf[:sm.start()].rstrip() if safe: yield safe return # Yield only the confirmed-safe portion; keep last GUARD chars buffered # to guard against the marker arriving split across chunks if len(buf) > GUARD: yield buf[:-GUARD] buf = buf[-GUARD:] # Stream ended without a sources marker — flush remaining buffer if in_response and buf: yield buf.rstrip() def _explain_gen( verse: Verse, conversation_context: str, client: OpenAI, model: str, ) -> Generator[str, None, None]: """Stream a contextual explanation of a verse, grounded in Advaita.""" bits: list[str] = [] if verse.translation: bits.append(f"Translation: {verse.translation}") if verse.sanskrit: bits.append(f"Sanskrit: {verse.sanskrit}") if verse.bhashya: bits.append(f"Śaṅkara's commentary: {verse.bhashya[:600]}") ev = verse if isinstance(verse, EnrichedVerse) else None if ev and ev.paraphrase: bits.append(f"Teaching: {ev.paraphrase}") system = ( "You are the Gītā Advisor — a teacher in the Advaita Vedānta lineage of " "Śaṅkarācārya. The user has asked you to explain a verse you cited. Go deeper " "than the translation: show precisely why this verse speaks to their situation " "using their own words. Use Sanskrit terms with brief glosses. Honour the " "two-truths distinction. Close with one concrete practice or perceptual shift " "the person can work with this week. 150–250 words." 
) user_msg = ( f"Verse: {verse.verse_ref}\n" + "\n".join(bits) + f"\n\nConversation context:\n{conversation_context}" ) stream = client.chat.completions.create( model=model, messages=[ {"role": "system", "content": system}, {"role": "user", "content": user_msg}, ], stream=True, temperature=0.7, max_tokens=600, ) for chunk in stream: if chunk.choices and chunk.choices[0].delta.content: yield chunk.choices[0].delta.content def _parse_sources_cited(full_text: str) -> list[str]: sm = _SOURCES_MARKER_RE.search(full_text) if not sm: return [] raw = full_text[sm.end():].strip() raw = re.split(r"\[\[", raw)[0].strip() try: result = json.loads(raw) return result if isinstance(result, list) else [] except Exception: m = re.search(r"\[.*?\]", raw, re.DOTALL) if m: try: return json.loads(m.group()) except Exception: pass return [] def _parse_reasoning(full_text: str) -> str: rm = _REASONING_MARKER_RE.search(full_text) resp_m = _RESPONSE_MARKER_RE.search(full_text) if not rm or not resp_m: return "" return full_text[rm.end():resp_m.start()].strip() # ── verse rendering ──────────────────────────────────────────────────────────── def _render_verse_body(verse: Verse): ev = verse if isinstance(verse, EnrichedVerse) else None if verse.sanskrit: st.markdown( f"
" f"{verse.sanskrit}
", unsafe_allow_html=True, ) if verse.transliteration: st.markdown( f"" f"{verse.transliteration}
", unsafe_allow_html=True, ) if verse.translation: label = f"Translation ({verse.translator})" if getattr(verse, "translator", None) else "Translation" st.caption(label) st.write(verse.translation) if verse.bhashya: btr = getattr(verse, "bhashya_translator", None) label = f"Śaṅkara's Bhāṣya ({btr})" if btr else "Śaṅkara's Bhāṣya" st.caption(label) preview = verse.bhashya[:900] + ("…" if len(verse.bhashya) > 900 else "") st.markdown( f"{preview}
", unsafe_allow_html=True, ) if ev: if ev.paraphrase: st.caption("Teaching") st.write(ev.paraphrase) if getattr(ev, "themes", None): st.caption("Themes") st.markdown( " · ".join( f"{t}" for t in ev.themes ), unsafe_allow_html=True, ) if getattr(ev, "practical_teaching", None): st.caption("Practical Shift") st.markdown( f"{ev.practical_teaching}
", unsafe_allow_html=True, ) def _render_verse_cards(turn: dict, turn_idx: int, synthesis_client: OpenAI): """Render expandable verse cards for passages selected during Stage 4. Uses turn["selected_verses"] (list[Verse] stored at pipeline time) so there is no dependency on stream parsing or verse lookup. """ verses: list[Verse] = turn.get("selected_verses", []) if not verses: return st.divider() st.markdown( "" "Selected Passages
", unsafe_allow_html=True, ) for n, verse in enumerate(verses, 1): ref = verse.verse_ref work = getattr(verse, "work_display", None) or getattr(verse, "work", "") section = getattr(verse, "section_display", None) or getattr(verse, "section", "") or "" title = f"[{n}] {ref}" if work: title += f" — {work}" + (f" · {section}" if section else "") explain_key = f"explain_btn_{turn_idx}_{ref}" result_key = f"explain_result_{turn_idx}_{ref}" has_result = result_key in st.session_state with st.expander(title, expanded=has_result): _render_verse_body(verse) st.write("") if st.button( "Explain in context of my situation →", key=explain_key, type="primary", ): ctx_text = turn.get("conversation_context", "No prior conversation.") with st.spinner("Drawing the thread…"): explanation = st.write_stream( _explain_gen(verse, ctx_text, synthesis_client, config.HF_MODEL) ) st.session_state[result_key] = explanation st.rerun() # re-render so copy button appears below the text elif has_result: st.markdown( f"{preview}",
unsafe_allow_html=True,
)
# ── main ──────────────────────────────────────────────────────────────────────
# NOTE(review): several strings passed with unsafe_allow_html=True below carry
# no markup in this copy of the file — the wrapper tags may have been stripped
# in transit. Compare with version control before shipping.
def main():
    """Render the page and, when there is new input, run one advisor turn.

    Each run draws the header, the example prompts (only while the
    conversation is empty) and the stored history. If the user submitted a
    prompt or clicked an example, the five pipeline stages run inside an
    assistant chat bubble: understand, plan, retrieve, select, synthesise.
    A failure in stages 1–4 shows an error, removes the just-added user
    message and ends the run. Stage 5 streams tokens and falls back to a
    non-streaming ``advisor.synthesize`` call if streaming raises or yields
    no response text. The finished turn is appended to
    ``st.session_state.messages`` and ``st.session_state.turn_data``.
    """
    import html  # local import: escape model-generated text shown as raw HTML

    def _esc(value) -> str:
        """Escape ``&``, ``<`` and ``>`` in model-generated text."""
        return html.escape(str(value), quote=False)

    def _abort(stage: str, exc: Exception) -> None:
        """Report a failed stage and drop the user message added for this turn."""
        st.error(f"{stage} stage failed: {exc}")
        st.session_state.messages.pop()

    st.markdown(_CSS, unsafe_allow_html=True)
    _init_state()
    advisor, retriever, synthesis_client = _load_resources()

    # ── header ────────────────────────────────────────────────────────────────
    st.markdown(
        ""
        "Grounded in Advaita Vedānta as taught by Śaṅkarācārya\n"
        "✦ ✦ ✦\n",
        unsafe_allow_html=True,
    )

    # ── empty-state: welcome + examples ───────────────────────────────────────
    if not st.session_state.messages:
        st.markdown(
            "Speak from where you actually are.\n"
            ""
            "The teacher will meet you there.\n",
            unsafe_allow_html=True,
        )
        st.caption("Opening moves")
        cols = st.columns(2)
        for i, ex in enumerate(_EXAMPLES):
            if cols[i % 2].button(ex, key=f"ex_{i}", use_container_width=True):
                st.session_state["_pending"] = ex
                st.rerun()

    # ── render conversation history ───────────────────────────────────────────
    total_assistant = sum(
        1 for m in st.session_state.messages if m["role"] == "assistant"
    )
    assistant_count = 0
    for msg in st.session_state.messages:
        avatar = _AVATAR_USER if msg["role"] == "user" else _AVATAR_ADVISOR
        with st.chat_message(msg["role"], avatar=avatar):
            st.markdown(msg["content"])
            if msg["role"] == "assistant":
                if assistant_count < len(st.session_state.turn_data):
                    turn = st.session_state.turn_data[assistant_count]
                    _render_reasoning_expander(turn)
                    _render_verse_cards(turn, assistant_count, synthesis_client)
                # Start over button — only below the last assistant response
                if assistant_count == total_assistant - 1:
                    st.write("")
                    _, btn_col, _ = st.columns([3, 2, 3])
                    with btn_col:
                        if st.button(
                            "↺ Start over",
                            key="start_over_inline",
                            use_container_width=True,
                            help="Clear this conversation and begin fresh",
                        ):
                            st.session_state.clear()
                            st.rerun()
                assistant_count += 1

    # ── capture new input ─────────────────────────────────────────────────────
    prompt = st.chat_input("Speak from where you actually are…")
    if not prompt:
        prompt = st.session_state.pop("_pending", None)
    if not prompt:
        return

    # Add user message to history and display it immediately
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user", avatar=_AVATAR_USER):
        st.markdown(prompt)

    # Build DSPy history from all prior turns (before this one)
    dspy_hist = _to_dspy_history(st.session_state.messages[:-1])

    # Previously selected verse refs (for diversity pressure in SelectPassages)
    previously_cited = [
        ref
        for turn in st.session_state.turn_data
        for ref in turn.get("selected_refs", [])
    ]

    # ── pipeline execution inside assistant bubble ────────────────────────────
    with st.chat_message("assistant", avatar=_AVATAR_ADVISOR):
        with st.status("Working through your question…", expanded=True) as pipeline_status:
            # ── Stage 1: Understand ───────────────────────────────────────────
            st.write("**Understanding your situation**")
            try:
                u = advisor.understand(history=dspy_hist, user_question=prompt)
            except Exception as exc:
                _abort("Understanding", exc)
                return

            cols = st.columns(2)
            with cols[0]:
                st.caption("Felt Emotion")
                st.write(u.felt_emotion or "—")
            with cols[1]:
                st.caption("Deeper Concern")
                st.write(u.deeper_concern or "—")
            if u.vedantic_themes:
                st.caption("Vedāntic Themes")
                st.markdown(
                    " · ".join(f"{_esc(t)}" for t in u.vedantic_themes),
                    unsafe_allow_html=True,
                )

            # ── Stage 2: Plan retrieval queries ───────────────────────────────
            st.write("**Planning search queries**")
            try:
                p = advisor.plan(
                    surface_concern=u.surface_concern,
                    deeper_concern=u.deeper_concern,
                    vedantic_themes=u.vedantic_themes,
                )
            except Exception as exc:
                _abort("Planning", exc)
                return

            queries = (
                p.queries[:config.N_RETRIEVAL_QUERIES]
                if p.queries
                else [u.deeper_concern]
            )
            for q in queries:
                st.markdown(
                    f"› {_esc(q)}",
                    unsafe_allow_html=True,
                )

            # ── Stage 3: Retrieve ─────────────────────────────────────────────
            st.write("**Searching the corpus**")
            try:
                hits = retriever.search_many(queries, k_per=config.TOP_K_RETRIEVE)
            except Exception as exc:
                # Handled like the other stages so a retrieval error does not
                # leave an unanswered user message in the stored history.
                _abort("Retrieval", exc)
                return
            candidates = hits[:max(8, config.TOP_K_RETRIEVE)]
            candidates_text = format_passages_for_llm(candidates)
            st.write(
                f"Retrieved **{len(candidates)}** passages across {len(queries)} queries"
            )

            # ── Stage 4: Select passages ──────────────────────────────────────
            st.write("**Selecting passages**")
            try:
                s = advisor.select(
                    deeper_concern=u.deeper_concern,
                    candidate_passages=candidates_text,
                    previously_cited=previously_cited,
                )
            except Exception as exc:
                _abort("Selection", exc)
                return

            # Keep in-range 1-based integers, first occurrence only: a repeated
            # index would otherwise render the same verse card twice.
            valid_idx = list(dict.fromkeys(
                i
                for i in (s.selected_indices or [])
                if isinstance(i, int)
                and not isinstance(i, bool)
                and 1 <= i <= len(candidates)
            ))
            if not valid_idx:
                # Nothing usable from the selector — fall back to the first
                # three candidates (fewer if fewer were retrieved).
                valid_idx = list(range(1, min(4, len(candidates) + 1)))

            selected = [candidates[i - 1] for i in valid_idx]
            selected_text = format_passages_for_llm(selected)
            selected_refs = [hit.verse.verse_ref for hit in selected]
            selected_scores = [hit.combined_score for hit in selected]

            for ref, score in zip(selected_refs, selected_scores):
                st.markdown(
                    f"✓ **{ref}** "
                    f"({score:.2f})",
                    unsafe_allow_html=True,
                )
            if s.selection_rationale:
                st.markdown(
                    f""
                    f"{_esc(s.selection_rationale)}",
                    unsafe_allow_html=True,
                )

            st.write("**Composing response…**")
            pipeline_status.update(label="✍️ Composing…", state="running")

        # ── Stage 5: Synthesise with real token streaming ─────────────────────
        ctx = _StreamCtx()
        response_text = ""
        streamed_ok = False
        try:
            messages = _build_synthesis_messages(
                advisor, dspy_hist, prompt,
                u.felt_emotion, u.deeper_concern, selected_text,
            )
            response_text = st.write_stream(
                _synthesis_gen(messages, synthesis_client, config.HF_MODEL, ctx)
            )
            # No text means the response marker never appeared in the stream;
            # treat that like a failure rather than storing a blank reply.
            streamed_ok = bool(response_text)
        except Exception:
            streamed_ok = False

        if not streamed_ok:
            # Graceful fallback to non-streaming DSPy call
            try:
                a = advisor.synthesize(
                    history=dspy_hist,
                    user_question=prompt,
                    felt_emotion=u.felt_emotion,
                    deeper_concern=u.deeper_concern,
                    selected_passages=selected_text,
                )
                response_text = a.response
                # Reconstruct full_text so _parse_reasoning still works
                ctx.full_text = (
                    "[[ ## reasoning ## ]]\n"
                    + (getattr(a, "reasoning", "") or "")
                    + "\n"
                    + "[[ ## response ## ]]\n"
                    + a.response
                )
                st.write(response_text)
            except Exception as exc2:
                response_text = f"*Error composing response: {exc2}*"
                st.error(response_text)

        # Collapse the pipeline status to a re-expandable trace
        pipeline_status.update(
            label="✓ Response complete — click to see reasoning",
            state="complete",
            expanded=False,
        )

    # ── store turn data ───────────────────────────────────────────────────────
    # Verse cards come from selected_verses (set during Stage 4) — no stream
    # parsing needed. _parse_reasoning still reads the raw stream for the
    # collapsible pipeline trace; that's the only thing we extract from it.
    reasoning = _parse_reasoning(ctx.full_text)

    footer = ""
    if selected_refs:  # no footer when nothing was selected
        footer = (
            "\n\n---\n*Passages: "
            + " · ".join(f"`{r}`" for r in selected_refs)
            + "*"
        )
    st.session_state.messages.append(
        {"role": "assistant", "content": response_text + footer}
    )

    turn_idx = len(st.session_state.turn_data)
    st.session_state.turn_data.append({
        "felt_emotion": u.felt_emotion,
        "surface_concern": u.surface_concern,
        "deeper_concern": u.deeper_concern,
        "vedantic_themes": u.vedantic_themes,
        "queries": queries,
        "retrieved_count": len(candidates),
        "selected_refs": selected_refs,
        "selected_scores": selected_scores,
        "selected_verses": [h.verse for h in selected],
        "selection_rationale": s.selection_rationale,
        "synthesis_reasoning": reasoning,
        "conversation_context": f"User: {prompt}\n\nAdvisor: {response_text}",
    })

    # Render verse cards immediately (before any next rerun)
    _render_verse_cards(st.session_state.turn_data[-1], turn_idx, synthesis_client)


main()