# streamlit_app.py
import streamlit as st
import os
import tempfile
from datetime import datetime
import json
import base64
# Import custom modules
from utils.config import load_environment
from crews import get_crew, list_available_crews
from api.assemblyai import transcribe_podcast
from api.composio import send_email_summary
from database.mongodb import store_podcast_data, get_all_podcast_titles, get_podcast_by_title
from database.qdrant import store_vectors
from app.chatbot import generate_answer
from api.tts import text_to_speech
# Load environment variables
load_environment()
def main():
st.title("Meeting Analyzer & Chatbot")
# Add app version and sidebar config
st.sidebar.title("Configuration")
st.sidebar.caption("Version 2.0 - Enhanced Agent Architecture")
# Crew selection
available_crews = list_available_crews()
crew_display_names = {
"standard": "Standard",
"enhanced": "Enhanced (with fact-checking)",
"multilingual": "Multilingual",
"research": "Research-focused",
"fact_checking": "Fact Checking",
"advanced_multilingual": "Advanced Multilingual",
"localization": "Localization"
}
crew_options = [crew_display_names.get(crew, crew.replace("_", " ").title()) for crew in available_crews[:4]] # Limit to first 5 for simplicity
crew_type = st.sidebar.radio(
"Crew Type",
crew_options
)
# Convert display name back to crew type
reverse_mapping = {v: k for k, v in crew_display_names.items()}
selected_crew_type = reverse_mapping.get(crew_type, "standard")
# Model selection
model_options = ["gpt-4o"]
selected_model = st.sidebar.selectbox("AI Model", options=model_options, index=0)
# Multilingual options
languages = ["English", "Spanish", "French", "German", "Chinese", "Japanese", "Arabic"]
if "Multilingual" in crew_type or "Localization" in crew_type:
target_languages = st.sidebar.multiselect(
"Target Languages",
options=languages,
default=["English", "Spanish"]
)
else:
target_languages = ["English"]
# Add database status indicator
try:
from database.mongodb import get_mongodb_client
client = get_mongodb_client()
client.admin.command('ping')
st.sidebar.success("✅ Database connection successful")
except Exception as e:
st.sidebar.warning(f"⚠️ Database connection unavailable (using mock data): {str(e)}")
# Create tabs for different functions
tab1, tab2, tab3, tab4 = st.tabs(["Analyze Meeting", "Chat about Meeting", "Listen to Summaries", "Fact Check"])
with tab1:
st.header("Upload and Analyze Meeting")
uploaded_file = st.file_uploader("Choose a Meeting audio file", type=["mp3", "wav", "m4a"])
podcast_title = st.text_input("Meeting Title")
board_emails = st.text_area("Board Member Emails (one per line)")
if st.button("Analyze Meeting", key="analyze_podcast_btn"):
if not podcast_title:
st.error("Please enter a Meeting title")
return
if uploaded_file is None:
st.error("Please upload a Meeting audio file")
return
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix=f".{uploaded_file.name.split('.')[-1]}") as tmp_file:
tmp_file.write(uploaded_file.getvalue())
audio_path = tmp_file.name
try:
with st.spinner("Analyzing Meeting..."):
# Use AssemblyAI for transcription
st.info("Transcribing Meeting...")
try:
transcript = transcribe_podcast(audio_path)
st.success("Transcription complete!")
except Exception as e:
st.error(f"Transcription error: {str(e)}")
# For testing, generate a mock transcript
transcript = f"This is a mock transcript for '{podcast_title}'. The real transcription failed."
# Create the appropriate crew with the selected model
st.info(f"Running {crew_type} crew analysis with {selected_model}...")
# Create crew with options
crew_kwargs = {"model": selected_model}
# Add language options for multilingual crews
if "Multilingual" in crew_type or "Localization" in crew_type:
crew_kwargs["target_languages"] = target_languages
podcast_crew = get_crew(selected_crew_type, **crew_kwargs)
# Run the analysis
result_json = podcast_crew.run_analysis(transcript)
analysis_result = json.loads(result_json)
# Prepare data for storage
podcast_data = {
"title": podcast_title,
"date_analyzed": datetime.now().isoformat(),
"transcript": transcript,
"summary": analysis_result.get("summary", "Summary not available"),
"key_topics": analysis_result.get("key_topics", ["Topic information not available"]),
"sentiment": analysis_result.get("sentiment_analysis", "Sentiment analysis not available"),
"action_items": analysis_result.get("action_items", ["Action items not available"])
}
# Add translations if available
if "translations" in analysis_result:
podcast_data["translations"] = analysis_result["translations"]
# Store in MongoDB
st.info("Storing results in database...")
try:
summary_id = store_podcast_data(podcast_data)
st.success("Data stored successfully!")
except Exception as e:
st.error(f"Error storing data: {str(e)}")
summary_id = "mock_id_12345"
# Store in Qdrant for vector search
st.info("Storing vectors for semantic search...")
try:
store_vectors(podcast_data, summary_id)
st.success("Vectors stored successfully!")
except Exception as e:
st.error(f"Error storing vectors: {str(e)}")
# Send email to board members
if board_emails:
recipient_list = [email.strip() for email in board_emails.split("\n") if email.strip()]
st.info(f"Sending summary email to {len(recipient_list)} recipients...")
try:
send_email_summary(podcast_data, recipient_list)
st.success("Email sent successfully!")
except Exception as e:
st.error(f"Error sending email: {str(e)}")
# Display results
st.success("Meeting analysis complete!")
# Executive Summary
st.subheader("Executive Summary")
st.write(analysis_result.get("summary", "The executive summary could not be generated completely."))
# Key Topics
st.subheader("Key Topics")
for topic in analysis_result.get("key_topics", ["Topic information not available"]):
st.write(f"- {topic}")
# Sentiment Analysis
st.subheader("Sentiment Analysis")
st.write(analysis_result.get("sentiment_analysis", "Sentiment analysis could not be generated completely."))
# Action Items
st.subheader("Action Items")
for item in analysis_result.get("action_items", ["Action items not available"]):
st.write(f"- {item}")
# Display translations if available (Multilingual crew)
if "translations" in analysis_result:
st.subheader("Translations")
for language, content in analysis_result["translations"].items():
with st.expander(f"{language.capitalize()} Translation"):
if isinstance(content, dict):
if "summary" in content:
st.write("**Summary:**")
st.write(content["summary"])
if "action_items" in content:
st.write("**Action Items:**")
if isinstance(content["action_items"], list):
for item in content["action_items"]:
st.write(f"- {item}")
else:
st.write(content["action_items"])
else:
st.write(content)
# Enhanced features (Research, Fact Check)
if "research" in analysis_result:
st.subheader("Research Insights")
st.write(analysis_result["research"])
if "fact_check" in analysis_result:
st.subheader("Fact Check Results")
fact_check = analysis_result["fact_check"]
if isinstance(fact_check, dict):
if "results" in fact_check:
for result in fact_check["results"]:
status_color = "green" if result["status"] == "Verified" else "red" if result["status"] == "Refuted" else "orange"
st.markdown(f"- **Claim:** {result['claim']} \n **Status:** {result['status']}", unsafe_allow_html=True)
else:
st.write(fact_check)
else:
st.write(fact_check)
# Add TTS option
if st.button("Generate Audio Summary", key="generate_audio_summary_btn1"):
with st.spinner("Generating audio..."):
summary_text = analysis_result.get("summary", "No summary available.")
voice = st.selectbox("Select voice:", ["alloy", "echo", "fable", "onyx", "nova", "shimmer"], key="voice_select1")
audio_file = text_to_speech(summary_text, voice=voice)
if audio_file:
st.success("Audio generated successfully!")
st.audio(audio_file)
else:
st.error("Failed to generate audio summary")
# Clean up temporary files
os.unlink(audio_path)
except Exception as main_error:
st.error(f"An error occurred during analysis: {str(main_error)}")
st.info("Please try again with a different Meeting or check the logs for more details.")
with tab2:
st.header("Chat about Analyzed Meetings")
# Podcast selection
podcast_titles = get_all_podcast_titles()
if not podcast_titles:
st.warning("No analyzed Meetings found. Please analyze some Meetings first.")
else:
selected_podcast = st.selectbox(
"Select a Meeting to chat about:",
options=podcast_titles,
index=0,
key="chat_podcast_select"
)
st.write(f"Selected Meeting: **{selected_podcast}**")
user_question = st.text_input("Ask a question about this Meeting")
if st.button("Ask", key="ask_question_btn"):
if not user_question:
st.error("Please enter a question")
return
with st.spinner("Searching for answer..."):
try:
# Get podcast data
podcast_data = get_podcast_by_title(selected_podcast)
if not podcast_data:
st.error(f"Could not find Meeting data for '{selected_podcast}'")
else:
# Generate answer
answer = generate_answer(podcast_data, user_question)
st.subheader("Answer")
st.write(answer)
# Add TTS option for the answer
if st.button("Listen to Answer", key="listen_answer_btn"):
with st.spinner("Generating audio..."):
voice = st.selectbox("Select voice:", ["alloy", "echo", "fable", "onyx", "nova", "shimmer"], key="voice_select2")
audio_file = text_to_speech(answer, voice=voice)
if audio_file:
st.success("Audio generated successfully!")
st.audio(audio_file)
else:
st.error("Failed to generate audio for answer")
st.subheader("Source")
st.write(f"Based on Meeting: {podcast_data.get('title', 'Unknown Meeting')}")
except Exception as e:
st.error(f"An error occurred: {str(e)}")
with tab3:
st.header("Listen to Meeting Summaries")
# Podcast selection
podcast_titles = get_all_podcast_titles()
if not podcast_titles:
st.warning("No analyzed Meetings found. Please analyze some Meetings first.")
else:
selected_podcast = st.selectbox(
"Select a Meeting to listen to:",
options=podcast_titles,
index=0,
key="listen_podcast_select"
)
# Voice selection
voice_options = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]
selected_voice = st.selectbox("Select a voice:", options=voice_options, index=0, key="voice_select3")
# Language selection (for multilingual podcasts)
podcast_data = get_podcast_by_title(selected_podcast)
available_languages = ["English"]
if podcast_data and "translations" in podcast_data:
for language in podcast_data["translations"]:
if language.lower() != "english":
available_languages.append(language.capitalize())
selected_language = st.selectbox("Select language:", options=available_languages, index=0, key="language_select")
if st.button("Generate Audio Summary", key="generate_audio_summary_btn2"):
with st.spinner("Generating audio summary..."):
try:
if not podcast_data:
st.error(f"Could not find Meeting data for '{selected_podcast}'")
else:
# Get the appropriate summary based on language
if selected_language.lower() == "english":
summary_text = podcast_data.get("summary", "Summary not available.")
else:
language_key = selected_language.lower()
if "translations" in podcast_data and language_key in podcast_data["translations"]:
if isinstance(podcast_data["translations"][language_key], dict):
summary_text = podcast_data["translations"][language_key].get("summary", "Translation not available.")
else:
summary_text = podcast_data["translations"][language_key]
else:
summary_text = "Translation not available for this language."
# Generate audio
audio_file = text_to_speech(summary_text, voice=selected_voice)
if audio_file:
st.success(f"Audio summary for '{selected_podcast}' generated successfully!")
st.audio(audio_file)
# Add download option
with open(audio_file, "rb") as file:
audio_bytes = file.read()
file_name = f"{selected_podcast.replace(' ', '_')}_{selected_language}_summary.mp3"
st.download_button(
label="Download Audio Summary",
data=audio_bytes,
file_name=file_name,
mime="audio/mp3",
key="download_summary_btn"
)
else:
st.error("Failed to generate audio summary")
except Exception as e:
st.error(f"An error occurred: {str(e)}")
with tab4:
st.header("Fact Check Meeting")
# Podcast selection
podcast_titles = get_all_podcast_titles()
if not podcast_titles:
st.warning("No analyzed Meetings found. Please analyze some Meetings first.")
else:
selected_podcast = st.selectbox(
"Select a Meeting to fact check:",
options=podcast_titles,
index=0,
key="fact_check_podcast_select"
)
if st.button("Extract Claims", key="extract_claims_btn"):
with st.spinner("Extracting claims from Meeting..."):
try:
# Get podcast data
podcast_data = get_podcast_by_title(selected_podcast)
if not podcast_data:
st.error(f"Could not find Meeting data for '{selected_podcast}'")
else:
# Get the transcript
transcript = podcast_data.get("transcript", "")
if not transcript:
st.error("No transcript available for fact checking")
else:
# Use the fact checker agent
from agents.registry import get_agent
fact_checker = get_agent("fact_checker")
st.info("Extracting claims...")
claims = fact_checker.extract_claims(transcript)
if not claims:
st.warning("No factual claims were identified in this Meeting")
else:
st.success(f"Found {len(claims)} factual claims")
# Store claims in session state for later use
st.session_state.claims = claims
# Display claims
st.subheader("Identified Claims")
for i, claim in enumerate(claims):
st.write(f"{i+1}. {claim}")
# Allow user to select claims to verify
st.session_state.selected_claims = st.multiselect(
"Select claims to verify:",
options=claims,
default=claims[:min(3, len(claims))]
)
except Exception as e:
st.error(f"An error occurred during claim extraction: {str(e)}")
# Check if we have selected claims in session state
if hasattr(st.session_state, 'selected_claims') and st.session_state.selected_claims:
if st.button("Verify Selected Claims", key="verify_claims_btn"):
st.subheader("Verification Results")
# Get fact checker agent
from agents.registry import get_agent
fact_checker = get_agent("fact_checker")
progress_bar = st.progress(0)
results = []
# Verify each selected claim
for i, claim in enumerate(st.session_state.selected_claims):
with st.spinner(f"Verifying claim {i+1} of {len(st.session_state.selected_claims)}..."):
result = fact_checker.verify_claim(claim, use_wikipedia=True)
results.append(result)
# Update progress
progress = (i + 1) / len(st.session_state.selected_claims)
progress_bar.progress(progress)
# Display results
st.success("Verification complete!")
for result in results:
status_color = "green" if result["status"] == "Verified" else "red" if result["status"] == "Refuted" else "orange"
with st.expander(f"{result['claim']} - {result['status']}"):
st.markdown(f"**Status:** {result['status']}", unsafe_allow_html=True)
st.write("**Details:**")
st.write(result["details"])
if __name__ == "__main__":
main()