import logging from typing import AsyncGenerator from sqlalchemy import text from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker, AsyncEngine from backend.core.config import settings from backend.db.base import Base logger = logging.getLogger(__name__) def create_async_engine_from_url(url: str) -> AsyncEngine: if url.startswith("sqlite"): return create_async_engine( url, connect_args={"check_same_thread": False}, echo=settings.DEBUG, pool_pre_ping=True ) else: return create_async_engine( url, pool_size=5, max_overflow=10, pool_timeout=30, pool_recycle=1800, pool_pre_ping=True, echo=False ) engine = create_async_engine_from_url(settings.DATABASE_URL) AsyncSessionLocal = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False, autocommit=False, autoflush=False ) async def get_db() -> AsyncGenerator[AsyncSession, None]: async with AsyncSessionLocal() as session: try: yield session await session.commit() except Exception: await session.rollback() raise finally: await session.close() async def init_db(): """Create all tables if they don't exist (for development)""" async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) logger.info("Database initialized") async def check_db_connection() -> bool: """Used in health checks""" try: async with engine.connect() as conn: await conn.execute(text("SELECT 1")) return True except Exception as e: logger.error(f"DB connection failed: {e}") return False