"""FastAPI application module: validates settings, configures logging, and builds ``app``."""

import logging
import threading
import time
from contextlib import asynccontextmanager

from fastapi import Depends, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse

# NOTE: `strategy` and `stats` come ahead of the alphabetical group; keep this
# order unless import-time side effects of the route modules have been ruled out.
from backend.app.api.routes import (
    strategy,
    stats,
    admin,
    analytics,
    graph,
    math_engine,
    mohp,
    persona,
    pipeline,
    simulation,
    warroom,
)
from backend.app.core.config import settings
from backend.app.core.dlq import DeadLetterQueue
from backend.app.core.idempotency import IdempotencyStore
from backend.app.core.logging import (
    configure_logging,
    install_application_logging,
    log_startup_summary,
)
from backend.app.core.outbox import TransactionalOutbox
from backend.app.core.rate_limiter import (
    RateLimitExceeded,
    rate_limit_exception_handler,
)
from backend.app.core.security import require_admin_mutation
from backend.app.db.redis_client import get_redis_client

# ── Production validation (fail-fast before FastAPI app is created) ───────────
settings.validate_production_secrets()

configure_logging(settings.log_level)
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(application: FastAPI):
    """Drive application startup and shutdown.

    Runs the startup helpers one after another, hands control to the server
    while the application is live, and logs a final message on the way out.
    The ``application`` argument is accepted to satisfy the lifespan
    signature and is not used.
    """
    logger.info("Application startup started")
    # The helpers are module-level functions resolved when this coroutine runs.
    for startup_task in (preload_models, start_outbox_worker, start_event_consumer):
        startup_task()
    logger.info("Application startup completed")

    try:
        yield
    finally:
        # Executed even when the serving phase ends with an exception.
        logger.info("Application shutdown completed")


app = FastAPI(title=settings.app_name, lifespan=lifespan)

install_application_logging(
    app,
    capture_request_bodies=settings.log_request_bodies,
    body_max_chars=settings.log_request_body_max_chars,
)
app.add_exception_handler(RateLimitExceeded, rate_limit_exception_handler)

# ── CORS: use allow-listed origins from config ───────────────────────────────
_allowed_origins = settings.resolved_cors_allowed_origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=_allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

log_startup_summary(
    app_name=settings.app_name,
    environment=settings.environment,
    port=settings.port,
    cors_origins=_allowed_origins,
    ollama_model=settings.ollama_model,
    embedding_model=settings.embedding_model,
    neo4j_uri=settings.neo4j_uri,
    redis_url=settings.safe_redis_url,
    chroma_host=settings.chroma_host,
    chroma_port=settings.chroma_port,
    infrastructure=settings.infrastructure_diagnostics(),
)

# ── Route registration ────────────────────────────────────────────────────────
app.include_router(pipeline.router)
app.include_router(math_engine.router)
app.include_router(strategy.router)
app.include_router(simulation.router)
app.include_router(graph.router)
app.include_router(analytics.router)
app.include_router(persona.router)
app.include_router(mohp.router)
app.include_router(stats.router)
app.include_router(admin.router)
app.include_router(admin.seed_router)
app.include_router(warroom.router)


@app.get("/healthz")
def healthcheck() -> dict:
    """Return a static ``{"status": "ok"}`` payload."""
    return {"status": "ok"}


@app.get("/", include_in_schema=False)
def root():
    """Redirect the bare root URL to the interactive docs at ``/docs``."""
    return RedirectResponse(url="/docs")


# ── Startup helpers ───────────────────────────────────────────────────────────
def preload_models() -> None:
    """Pre-load models and caches used by the API, one stage at a time.

    Four independent stages run in order: embedding model plus projection
    bridge weights, Kokoro TTS, faster-whisper STT, and the strategy model
    cache. Every stage is best-effort: a failure is logged together with its
    traceback and the remaining stages still run. Nothing is raised.
    """
    try:
        # Imported lazily so a broken optional module cannot stop app import.
        from backend.app.core.embedder import EmbeddingRegistry
        from backend.app.services.projection_bridge import load_weights

        logger.info("Pre-loading embedding model: %s", settings.embedding_model)
        EmbeddingRegistry.get().load_model(settings.embedding_model)
        logger.info("Pre-loading projection bridge weights")
        load_weights()
        logger.info("All models pre-loaded into memory.")
    except Exception:
        logger.exception("Failed to pre-load models — some features may be unavailable")

    # Kokoro TTS — warm up both English pipelines (American + British)
    try:
        from backend.app.services.tts_service import preload as tts_preload

        logger.info("Pre-loading Kokoro TTS pipelines")
        tts_preload()
        logger.info("Kokoro TTS pipelines ready")
    except Exception:
        # exc_info keeps the real cause visible: the install hint is only
        # accurate when the failure is a missing package.
        logger.warning(
            "Kokoro TTS unavailable — run 'pip install kokoro soundfile' to enable",
            exc_info=True,
        )

    # faster-whisper STT — load Whisper base model into memory
    try:
        from backend.app.services.stt_service import preload as stt_preload

        logger.info("Pre-loading faster-whisper STT model")
        stt_preload()
        logger.info("faster-whisper STT model ready")
    except Exception:
        # Same reasoning as the TTS stage: do not hide the underlying error.
        logger.warning(
            "faster-whisper STT unavailable — run 'pip install faster-whisper' to enable",
            exc_info=True,
        )

    try:
        from backend.app.api.routes.strategy import warm_strategy_model_cache

        warm_strategy_model_cache()
        logger.info("Strategy GMM/PCA artifacts warmed")
    except Exception:
        logger.warning(
            "Strategy GMM/PCA warm-up failed; first request will fit lazily",
            exc_info=True,
        )


def _outbox_worker() -> None:
    """Thread target that dispatches outbox events until the process exits.

    Builds a ``TransactionalOutbox`` on top of the shared Redis client and
    calls ``dispatch_once()`` in an endless loop. The loop sleeps 1 s when a
    pass processed nothing and 2 s after a failed pass. If the outbox cannot
    be constructed, the failure is logged and the worker returns.
    """
    logger.info("Outbox worker loop started | transport=%s", settings.outbox_transport)

    try:
        redis = get_redis_client()
        outbox = TransactionalOutbox(
            redis,
            IdempotencyStore(redis),
            DeadLetterQueue(redis),
            transport=settings.outbox_transport,
            amqp_url=settings.rabbitmq_url,
        )
    except Exception:
        # Without this guard a setup failure would end the thread without
        # ever reaching the application logger.
        logger.exception("Outbox worker failed to initialise — event dispatching disabled")
        return

    while True:
        try:
            processed = outbox.dispatch_once()
            if not processed:
                # Nothing was dispatched on this pass; avoid a busy loop.
                time.sleep(1.0)
        except Exception:
            logger.exception("Outbox dispatch failed; retrying after backoff")
            time.sleep(2.0)


def start_outbox_worker() -> None:
    """Start ``_outbox_worker`` on a daemon thread named ``outbox-worker``.

    A failure to create or start the thread is logged and not re-raised.
    """
    try:
        worker = threading.Thread(target=_outbox_worker, daemon=True, name="outbox-worker")
        worker.start()
        logger.info("Outbox worker thread started | thread=%s", worker.name)
    except Exception:
        logger.exception("Outbox worker failed to start — event dispatching disabled")


def start_event_consumer() -> None:
    """Start ``run_consumer_loop`` on a daemon thread named ``event-consumer``.

    The consumer module is imported inside the ``try`` so that an import
    failure is logged like any other start-up failure instead of propagating.
    """
    try:
        from backend.app.services.event_consumer import run_consumer_loop

        consumer = threading.Thread(target=run_consumer_loop, daemon=True, name="event-consumer")
        consumer.start()
        logger.info("Event consumer thread started | thread=%s", consumer.name)
    except Exception:
        logger.exception("Event consumer failed to start — async event processing disabled")


# NOTE(review): this endpoint declares no auth dependency, unlike
# POST /admin/dlq/replay, and `limit` is passed through unvalidated.
# Confirm both are intentional (e.g. access is restricted upstream).
@app.get("/admin/dlq")
def get_dlq(limit: int = 100) -> dict:
    """List dead-letter queue items.

    Args:
        limit: Value forwarded to ``DeadLetterQueue.list_items``; defaults to 100.

    Returns:
        A dict with a single ``"items"`` key holding the listed entries.
    """
    redis = get_redis_client()
    dlq = DeadLetterQueue(redis)
    return {"items": dlq.list_items(limit)}


@app.post("/admin/dlq/replay")
def replay_dlq(_admin: bool = Depends(require_admin_mutation)) -> dict:
    """Replay one dead-letter queue item.

    The ``require_admin_mutation`` dependency runs before the handler body.

    Returns:
        A dict with ``"replayed"`` (``True`` when ``replay_one()`` returned
        something other than ``None``) and ``"item"`` (that return value).
    """
    redis = get_redis_client()
    dlq = DeadLetterQueue(redis)
    item = dlq.replay_one()
    return {"replayed": item is not None, "item": item}