"""
FastAPI Application for Event-Centric Audience Segmentation AI
Author: AI Generated
Created: 2025-11-24 (Refactored)
Purpose: REST API with event-based endpoints
"""

from datetime import datetime
from typing import List, Dict, Optional, Any

from bson import ObjectId
from bson.errors import InvalidId
from fastapi import FastAPI, HTTPException, BackgroundTasks, status, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

# Import services
from services.segmentation_service import SegmentationService
from services.sentiment_service import SentimentAnalysisService
from services.genai_service import GenerativeAIService
from database import db
from config import settings

# FastAPI app
app = FastAPI(
    title="Audience Segmentation AI - Event-Centric",
    description="REST API for per-event audience analysis",
    version="2.0.0",
    docs_url="/api/docs",
    redoc_url="/api/redoc"
)

# CORS
# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive policy; confirm it is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ===== HELPERS =====

def _serialize_value(value: Any) -> Any:
    """Return *value* with ObjectIds stringified, recursing into dicts and lists."""
    if isinstance(value, ObjectId):
        return str(value)
    if isinstance(value, dict):
        return serialize_doc(value)
    if isinstance(value, list):
        # Recurse so that lists of sub-documents (or nested lists) are
        # converted too, not only lists of bare ObjectIds.
        return [_serialize_value(item) for item in value]
    return value


def serialize_doc(doc: Optional[Dict]) -> Optional[Dict]:
    """Convert MongoDB document to JSON-serializable dict.

    The document is modified in place and also returned. A top-level or
    nested ``_id`` key is renamed to ``id`` and stringified; every ObjectId
    found in nested dicts and lists is converted to ``str``.

    Args:
        doc: Document as returned by the driver, or ``None``.

    Returns:
        The same dict with ObjectIds replaced by strings, or ``None`` when
        *doc* is ``None``.
    """
    if doc is None:
        return None

    if '_id' in doc:
        doc['id'] = str(doc.pop('_id'))

    # Iterate over a snapshot because values are reassigned while looping.
    for key, value in list(doc.items()):
        doc[key] = _serialize_value(value)

    return doc


def _parse_object_id(raw_id: str, label: str = "segment") -> ObjectId:
    """Convert a path parameter to an ObjectId, raising HTTP 400 if malformed."""
    try:
        return ObjectId(raw_id)
    except (InvalidId, TypeError) as exc:
        # Without this guard a malformed id propagates as an unhandled
        # exception and the client receives a 500.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid {label} id"
        ) from exc


def _get_segment_or_404(event_code: str, segment_id: str) -> Dict:
    """Fetch the segment with *segment_id* belonging to *event_code*.

    Raises:
        HTTPException: 400 if *segment_id* is not a valid ObjectId,
            404 if no matching segment exists.
    """
    segment = db.event_audience_segments.find_one({
        "_id": _parse_object_id(segment_id),
        "event_code": event_code
    })
    if not segment:
        raise HTTPException(status_code=404, detail="Segment not found")
    return segment


def _validate_pagination(skip: int, limit: int) -> None:
    """Reject negative pagination values with HTTP 400."""
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="skip and limit must be non-negative"
        )


# ===== HEALTH =====

@app.get("/health", tags=["System"])
async def health_check():
    """Health check.

    Calls ``server_info()`` on the database client; any exception is
    reported as HTTP 503.
    """
    try:
        db.client.server_info()
        return {
            "status": "healthy",
            "timestamp": datetime.utcnow(),
            "database": "connected"
        }
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Unhealthy: {str(e)}"
        )


# ===== EVENT ANALYSIS =====

@app.post("/api/events/{event_code}/analyze", tags=["Event Analysis"])
async def analyze_event(event_code: str, background_tasks: BackgroundTasks):
    """Run full AI pipeline for an event.

    The pipeline is queued as a background task; the response is returned
    immediately with status ``started``.
    """

    def run_pipeline():
        # Step 1: Segmentation
        seg_service = SegmentationService(event_code)
        seg_service.run_segmentation()

        # Step 2: Sentiment
        sent_service = SentimentAnalysisService(event_code)
        sent_service.analyze_event_comments()

        # Step 3: Email generation
        genai_service = GenerativeAIService(event_code)
        genai_service.generate_emails_for_all_segments()

        # Step 4: Insights
        genai_service.update_sentiment_summary_with_insights()

    background_tasks.add_task(run_pipeline)

    return {
        "status": "started",
        "message": f"Analysis pipeline started for event {event_code}"
    }


@app.get("/api/events/{event_code}/dashboard", tags=["Event Analysis"])
async def get_event_dashboard(event_code: str):
    """Get complete dashboard for Event Owner.

    Returns the event's segments and its sentiment summary (``None`` when
    no summary document exists).
    """
    # Get segments
    segments = db.event_audience_segments.find({"event_code": event_code})

    # Get sentiment summary
    sentiment_summary = db.event_sentiment_summary.find_one({"event_code": event_code})

    return {
        "event_code": event_code,
        "segments": [serialize_doc(s) for s in segments],
        # serialize_doc maps None to None, covering the "no summary" case.
        "sentiment_summary": serialize_doc(sentiment_summary) if sentiment_summary else None
    }


# ===== SEGMENTATION =====

@app.post("/api/events/{event_code}/segmentation/run", tags=["Segmentation"])
async def run_event_segmentation(
    event_code: str,
    background_tasks: BackgroundTasks,
    n_clusters: int = Query(default=5, ge=2, le=10)
):
    """Run segmentation for an event.

    Segmentation is queued as a background task using *n_clusters*
    (validated to the range 2-10).
    """

    def run_task():
        service = SegmentationService(event_code, n_clusters=n_clusters)
        service.run_segmentation()

    background_tasks.add_task(run_task)

    return {
        "status": "started",
        "message": f"Segmentation started for event {event_code}",
        "event_code": event_code
    }


@app.get("/api/events/{event_code}/segments", tags=["Segmentation"])
async def get_event_segments(
    event_code: str,
    status_filter: Optional[str] = Query(default=None, description="Filter by Draft, Approved, Sent")
):
    """Get all segments for an event.

    When *status_filter* is given, only segments whose
    ``marketing_content.status`` equals it are returned.
    """
    query = {"event_code": event_code}
    if status_filter:
        query["marketing_content.status"] = status_filter

    return [serialize_doc(s) for s in db.event_audience_segments.find(query)]


@app.get("/api/events/{event_code}/segments/{segment_id}", tags=["Segmentation"])
async def get_segment_detail(event_code: str, segment_id: str):
    """Get specific segment details.

    Raises:
        HTTPException: 400 for a malformed id, 404 when not found.
    """
    return serialize_doc(_get_segment_or_404(event_code, segment_id))


@app.get("/api/events/{event_code}/segments/{segment_id}/users", tags=["Segmentation"])
async def get_segment_users(
    event_code: str,
    segment_id: str,
    skip: int = 0,
    limit: int = 100
):
    """Get users in a segment with details.

    Pagination is applied to the segment's ``user_ids`` list before the
    user documents are fetched.

    Raises:
        HTTPException: 400 for a malformed id or negative ``skip``/``limit``,
            404 when the segment is not found.
    """
    # Negative values would silently produce a wrong slice below.
    _validate_pagination(skip, limit)

    segment = _get_segment_or_404(event_code, segment_id)

    user_ids = segment.get('user_ids', [])
    total_users = len(user_ids)

    # Paginate
    paginated_ids = user_ids[skip:skip + limit]

    # Get user details
    users = db.users.find({"_id": {"$in": paginated_ids}})

    # Enrich with stats (optional)
    enriched_users = [
        {
            "user_id": str(user['_id']),
            "email": user.get('email'),
            "full_name": f"{user.get('FirstName', '')} {user.get('LastName', '')}".strip()
        }
        for user in users
    ]

    return {
        "segment_id": segment_id,
        "total_users": total_users,
        "users": enriched_users
    }


# ===== APPROVAL WORKFLOW =====

@app.post("/api/events/{event_code}/segments/{segment_id}/approve", tags=["Approval"])
async def approve_segment(
    event_code: str,
    segment_id: str,
    approved_by: Optional[str] = None,
    modified_subject: Optional[str] = None,
    modified_body: Optional[str] = None
):
    """Event Owner approves marketing content.

    Sets ``marketing_content.status`` to ``Approved`` and records who
    approved it and when. Non-empty *modified_subject* / *modified_body*
    overwrite the stored email subject / body.

    Raises:
        HTTPException: 400 for a malformed id, 404 when not found.
    """
    segment = _get_segment_or_404(event_code, segment_id)
    object_id = segment["_id"]

    # Use one timestamp so approved_at and last_updated agree exactly.
    now = datetime.utcnow()

    # Update fields
    update = {
        "marketing_content.status": "Approved",
        "marketing_content.approved_at": now,
        "marketing_content.approved_by": approved_by,
        "last_updated": now
    }

    if modified_subject:
        update["marketing_content.email_subject"] = modified_subject
    if modified_body:
        update["marketing_content.email_body"] = modified_body

    db.event_audience_segments.update_one(
        {"_id": object_id},
        {"$set": update}
    )

    updated_segment = db.event_audience_segments.find_one({"_id": object_id})
    if updated_segment is None:
        # The segment disappeared between the update and the re-read.
        raise HTTPException(status_code=404, detail="Segment not found")

    return {
        "status": "success",
        "message": "Segment approved",
        "segment_id": segment_id,
        "marketing_content": updated_segment.get('marketing_content')
    }


@app.post("/api/events/{event_code}/segments/{segment_id}/send-email", tags=["Approval"])
async def send_segment_email(
    event_code: str,
    segment_id: str,
    send_immediately: bool = True
):
    """Send approved marketing email.

    No email is actually dispatched yet (see TODO); the segment is only
    marked as ``Sent``. *send_immediately* is currently unused.

    Raises:
        HTTPException: 400 for a malformed id or when the segment is not in
            status ``Approved``, 404 when not found.
    """
    segment = _get_segment_or_404(event_code, segment_id)

    # "or {}" also covers a document where the field is stored as null.
    marketing_content = segment.get('marketing_content') or {}

    if marketing_content.get('status') != "Approved":
        raise HTTPException(status_code=400, detail="Segment not approved yet")

    # TODO: Integrate with email service (SendGrid, AWS SES, etc.)
    # For now, just mark as sent
    db.event_audience_segments.update_one(
        {"_id": segment["_id"]},
        {"$set": {
            "marketing_content.status": "Sent",
            "last_updated": datetime.utcnow()
        }}
    )

    user_count = segment.get('user_count', 0)

    return {
        "status": "success",
        "message": f"Email sent to {user_count} users",
        "segment_id": segment_id,
        "emails_sent": user_count,
        "emails_failed": 0
    }


# ===== SENTIMENT =====

@app.post("/api/events/{event_code}/sentiment/analyze", tags=["Sentiment"])
async def analyze_event_sentiment(event_code: str, background_tasks: BackgroundTasks):
    """Analyze sentiment for event comments (queued as a background task)."""

    def run_task():
        service = SentimentAnalysisService(event_code)
        service.analyze_event_comments()

    background_tasks.add_task(run_task)

    return {
        "status": "started",
        "message": f"Sentiment analysis started for event {event_code}"
    }


@app.get("/api/events/{event_code}/sentiment/summary", tags=["Sentiment"])
async def get_sentiment_summary(event_code: str):
    """Get sentiment summary for an event.

    Raises:
        HTTPException: 404 when no summary document exists for the event.
    """
    summary = db.event_sentiment_summary.find_one({"event_code": event_code})

    if not summary:
        raise HTTPException(status_code=404, detail="No sentiment data for this event")

    return serialize_doc(summary)


@app.get("/api/events/{event_code}/sentiment/results", tags=["Sentiment"])
async def get_sentiment_results(
    event_code: str,
    sentiment_label: Optional[str] = None,
    skip: int = 0,
    limit: int = 100
):
    """Get detailed sentiment results.

    Results are sorted by ``analyzed_at`` descending and paginated with
    *skip* / *limit*; ``total`` is the count of all matching documents.

    Raises:
        HTTPException: 400 when ``skip`` or ``limit`` is negative.
    """
    # Reject negative values up front instead of failing inside the cursor.
    _validate_pagination(skip, limit)

    query = {"event_code": event_code}
    if sentiment_label:
        query["sentiment_label"] = sentiment_label

    total = db.sentiment_results.count_documents(query)

    results = (
        db.sentiment_results.find(query)
        .sort("analyzed_at", -1)
        .skip(skip)
        .limit(limit)
    )

    return {
        "total": total,
        "results": [serialize_doc(r) for r in results]
    }


# ===== GENAI =====

@app.post("/api/events/{event_code}/genai/generate-emails", tags=["GenAI"])
async def generate_event_emails(event_code: str, background_tasks: BackgroundTasks):
    """Generate marketing emails for all segments (queued as a background task)."""

    def run_task():
        service = GenerativeAIService(event_code)
        service.generate_emails_for_all_segments()

    background_tasks.add_task(run_task)

    return {
        "status": "started",
        "message": "Email generation started"
    }


@app.post("/api/events/{event_code}/genai/generate-insights", tags=["GenAI"])
async def generate_event_insights(event_code: str, background_tasks: BackgroundTasks):
    """Generate AI insights from negative feedback (queued as a background task)."""

    def run_task():
        service = GenerativeAIService(event_code)
        service.update_sentiment_summary_with_insights()

    background_tasks.add_task(run_task)

    return {
        "status": "started",
        "message": "Insight generation started"
    }


# ===== MONITORING =====

@app.get("/api/monitoring/pipelines/{pipeline}/metrics", tags=["Monitoring"])
async def get_pipeline_metrics(
    pipeline: str,
    event_code: Optional[str] = None,
    days: int = 7
):
    """Get performance metrics.

    Placeholder: echoes *pipeline* and *event_code*; *days* is not used yet.
    """
    # TODO: Implement based on monitoring.py
    return {
        "pipeline": pipeline,
        "event_code": event_code,
        "message": "Metrics endpoint - implement as needed"
    }


# ===== ADMIN =====

@app.post("/api/admin/indexes/create", tags=["Admin"])
async def create_indexes():
    """Create MongoDB indexes.

    Raises:
        HTTPException: 500 with the error text if index creation fails.
    """
    # Imported lazily, as in the original, so the script module is only
    # loaded when this endpoint is called.
    from scripts.create_indexes import create_all_indexes

    try:
        create_all_indexes()
        return {"status": "success", "message": "Indexes created"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# ===== ROOT =====

@app.get("/")
async def root():
    """API root: name, version and links to docs and health endpoints."""
    return {
        "name": "Audience Segmentation AI - Event-Centric",
        "version": "2.0.0",
        "docs": "/api/docs",
        "health": "/health"
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=7860,
        reload=True
    )