# database/mongodb.py from pymongo import MongoClient from utils.config import get_mongodb_uri def get_mongodb_client(): """ Get a MongoDB client instance with robust error handling for cloud connections Returns: MongoClient: MongoDB client """ uri = get_mongodb_uri() # For troubleshooting - hide password in log debug_uri = uri if uri and '@' in uri: parts = uri.split('@') auth_part = parts[0].split('://') if len(auth_part) > 1: debug_uri = f"{auth_part[0]}://****:****@{parts[1]}" print(f"Connecting to MongoDB with URI: {debug_uri}") try: # The simplest, most reliable way to connect to MongoDB Atlas client = MongoClient(uri) # Test the connection client.admin.command('ping') print("MongoDB connection successful") return client except Exception as e: print(f"MongoDB connection error: {e}") # Create a mock client for testing when cloud connection fails print("Creating mock MongoDB client for testing") class MockMongoClient: def __getitem__(self, key): return MockMongoCollection() class MockMongoCollection: def __getitem__(self, key): return self def find(self, *args, **kwargs): return [] def find_one(self, *args, **kwargs): return None def insert_one(self, document): class MockResult: inserted_id = "mock_id_12345" return MockResult() return MockMongoClient() def get_podcast_collection(): """ Get the MongoDB collection for Meeting summaries Returns: Collection: MongoDB collection """ client = get_mongodb_client() db = client["podcast_analytics"] return db["summaries"] def store_podcast_data(podcast_data): """ Store Meeting data in MongoDB Args: podcast_data: Dictionary containing Meeting data and analysis Returns: str: ID of the inserted document """ collection = get_podcast_collection() result = collection.insert_one(podcast_data) return str(result.inserted_id) def get_podcast_by_id(podcast_id): """ Retrieve Meeting data by ID Args: podcast_id: ID of the Meeting document Returns: dict: Meeting data or None if not found """ from bson.objectid import ObjectId try: collection = get_podcast_collection() # Check if podcast_id is a valid ObjectId if ObjectId.is_valid(podcast_id): return collection.find_one({"_id": ObjectId(podcast_id)}) else: # If not a valid ObjectId, try to find by title return collection.find_one({"title": podcast_id}) except Exception as e: print(f"Error retrieving podcast by ID: {e}") return None def get_podcast_by_title(title): """ Retrieve Meeting data by title Args: title: Title of the Meeting Returns: dict: Meeting data or None if not found """ try: collection = get_podcast_collection() result = collection.find_one({"title": title}) # If no exact match, try case-insensitive search if not result: import re regex = re.compile(f"^{re.escape(title)}$", re.IGNORECASE) result = collection.find_one({"title": {"$regex": regex}}) # If still no match, try partial match if not result: import re regex = re.compile(f".*{re.escape(title)}.*", re.IGNORECASE) result = collection.find_one({"title": {"$regex": regex}}) return result except Exception as e: print(f"Error retrieving Meeting by title: {e}") return None def get_all_podcasts(): """ Retrieve all Meeting data Returns: list: List of all Meeting documents """ try: collection = get_podcast_collection() return list(collection.find()) except Exception as e: print(f"Error retrieving all Meetings: {e}") return [] def get_all_podcast_titles(): """ Retrieve all Meeting titles Returns: list: List of all Meeting titles """ try: collection = get_podcast_collection() results = collection.find({}, {"title": 1}) titles = [doc.get("title", f"Meeting {str(doc.get('_id', 'unknown'))}") for doc in results if doc.get("title")] # If no titles found, return mock data for testing if not titles: print("No Meeting titles found, returning mock data for testing") return ["Sample Meeting 1", "Sample Meeting 2", "Sample Meeting 3"] return titles except Exception as e: print(f"Error retrieving Meeting titles: {e}") # Return mock data for testing return ["Sample Meeting 1", "Sample Meeting 2", "Sample Meeting 3"] def update_podcast_data(podcast_id, update_data): """ Update Meeting data Args: podcast_id: ID of the Meeting document update_data: Dictionary containing fields to update Returns: bool: True if update was successful """ from bson.objectid import ObjectId collection = get_podcast_collection() result = collection.update_one( {"_id": ObjectId(podcast_id)}, {"$set": update_data} ) return result.modified_count > 0 def delete_podcast(podcast_id): """ Delete a Meeting document Args: Meeting_id: ID of the Meeting document Returns: bool: True if deletion was successful """ from bson.objectid import ObjectId collection = get_podcast_collection() result = collection.delete_one({"_id": ObjectId(podcast_id)}) return result.deleted_count > 0