minhvtt commited on
Commit
de1a145
·
verified ·
1 Parent(s): ea06065

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +447 -447
app.py CHANGED
@@ -1,447 +1,447 @@
1
- """
2
- FastAPI Application for Event-Centric Audience Segmentation AI
3
- Author: AI Generated
4
- Created: 2025-11-24 (Refactored)
5
- Purpose: REST API with event-based endpoints
6
- """
7
-
8
- from fastapi import FastAPI, HTTPException, BackgroundTasks, status, Query
9
- from fastapi.middleware.cors import CORSMiddleware
10
- from pydantic import BaseModel
11
- from typing import List, Dict, Optional, Any
12
- from datetime import datetime
13
- from bson import ObjectId
14
-
15
- # Import services
16
- from services.segmentation_service import SegmentationService
17
- from services.sentiment_service import SentimentAnalysisService
18
- from services.genai_service import GenerativeAIService
19
- from database import db
20
- from config import settings
21
-
22
-
23
- # FastAPI app
24
- app = FastAPI(
25
- title="Audience Segmentation AI - Event-Centric",
26
- description="REST API for per-event audience analysis",
27
- version="2.0.0",
28
- docs_url="/api/docs",
29
- redoc_url="/api/redoc"
30
- )
31
-
32
- # CORS
33
- app.add_middleware(
34
- CORSMiddleware,
35
- allow_origins=["*"],
36
- allow_credentials=True,
37
- allow_methods=["*"],
38
- allow_headers=["*"],
39
- )
40
-
41
-
42
- # Helper
43
- def serialize_doc(doc: Dict) -> Optional[Dict]:
44
- """Convert MongoDB document to JSON-serializable dict"""
45
- if doc is None:
46
- return None
47
- if '_id' in doc:
48
- doc['id'] = str(doc.pop('_id'))
49
-
50
- # Handle nested ObjectIds and lists
51
- for key, value in list(doc.items()):
52
- if isinstance(value, ObjectId):
53
- doc[key] = str(value)
54
- elif isinstance(value, list):
55
- doc[key] = [str(v) if isinstance(v, ObjectId) else v for v in value]
56
- elif isinstance(value, dict):
57
- doc[key] = serialize_doc(value)
58
-
59
- return doc
60
-
61
-
62
- # ===== HEALTH =====
63
- @app.get("/health", tags=["System"])
64
- async def health_check():
65
- """Health check"""
66
- try:
67
- db.client.server_info()
68
- return {
69
- "status": "healthy",
70
- "timestamp": datetime.utcnow(),
71
- "database": "connected"
72
- }
73
- except Exception as e:
74
- raise HTTPException(
75
- status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
76
- detail=f"Unhealthy: {str(e)}"
77
- )
78
-
79
-
80
- # ===== EVENT ANALYSIS =====
81
- @app.post("/api/events/{event_code}/analyze", tags=["Event Analysis"])
82
- async def analyze_event(event_code: str, background_tasks: BackgroundTasks):
83
- """Run full AI pipeline for an event"""
84
-
85
- def run_pipeline():
86
- # Step 1: Segmentation
87
- seg_service = SegmentationService(event_code)
88
- seg_service.run_segmentation()
89
-
90
- # Step 2: Sentiment
91
- sent_service = SentimentAnalysisService(event_code)
92
- sent_service.analyze_event_comments()
93
-
94
- # Step 3: Email generation
95
- genai_service = GenerativeAIService(event_code)
96
- genai_service.generate_emails_for_all_segments()
97
-
98
- # Step 4: Insights
99
- genai_service.update_sentiment_summary_with_insights()
100
-
101
- background_tasks.add_task(run_pipeline)
102
-
103
- return {
104
- "status": "started",
105
- "message": f"Analysis pipeline started for event {event_code}"
106
- }
107
-
108
-
109
- @app.get("/api/events/{event_code}/dashboard", tags=["Event Analysis"])
110
- async def get_event_dashboard(event_code: str):
111
- """Get complete dashboard for Event Owner"""
112
-
113
- # Get segments
114
- segments = list(db.event_audience_segments.find({"event_code": event_code}))
115
-
116
- # Get sentiment summary
117
- sentiment_summary = db.event_sentiment_summary.find_one({"event_code": event_code})
118
-
119
- return {
120
- "event_code": event_code,
121
- "segments": [serialize_doc(s) for s in segments],
122
- "sentiment_summary": serialize_doc(sentiment_summary) if sentiment_summary else None
123
- }
124
-
125
-
126
- # ===== SEGMENTATION =====
127
- @app.post("/api/events/{event_code}/segmentation/run", tags=["Segmentation"])
128
- async def run_event_segmentation(
129
- event_code: str,
130
- background_tasks: BackgroundTasks,
131
- n_clusters: int = Query(default=5, ge=2, le=10)
132
- ):
133
- """Run segmentation for an event"""
134
-
135
- def run_task():
136
- service = SegmentationService(event_code, n_clusters=n_clusters)
137
- service.run_segmentation()
138
-
139
- background_tasks.add_task(run_task)
140
-
141
- return {
142
- "status": "started",
143
- "message": f"Segmentation started for event {event_code}",
144
- "event_code": event_code
145
- }
146
-
147
-
148
- @app.get("/api/events/{event_code}/segments", tags=["Segmentation"])
149
- async def get_event_segments(
150
- event_code: str,
151
- status_filter: Optional[str] = Query(default=None, description="Filter by Draft, Approved, Sent")
152
- ):
153
- """Get all segments for an event"""
154
-
155
- query = {"event_code": event_code}
156
- if status_filter:
157
- query["marketing_content.status"] = status_filter
158
-
159
- segments = list(db.event_audience_segments.find(query))
160
-
161
- return [serialize_doc(s) for s in segments]
162
-
163
-
164
- @app.get("/api/events/{event_code}/segments/{segment_id}", tags=["Segmentation"])
165
- async def get_segment_detail(event_code: str, segment_id: str):
166
- """Get specific segment details"""
167
-
168
- segment = db.event_audience_segments.find_one({
169
- "_id": ObjectId(segment_id),
170
- "event_code": event_code
171
- })
172
-
173
- if not segment:
174
- raise HTTPException(status_code=404, detail="Segment not found")
175
-
176
- return serialize_doc(segment)
177
-
178
-
179
- @app.get("/api/events/{event_code}/segments/{segment_id}/users", tags=["Segmentation"])
180
- async def get_segment_users(
181
- event_code: str,
182
- segment_id: str,
183
- skip: int = 0,
184
- limit: int = 100
185
- ):
186
- """Get users in a segment with details"""
187
-
188
- segment = db.event_audience_segments.find_one({
189
- "_id": ObjectId(segment_id),
190
- "event_code": event_code
191
- })
192
-
193
- if not segment:
194
- raise HTTPException(status_code=404, detail="Segment not found")
195
-
196
- user_ids = segment.get('user_ids', [])
197
- total_users = len(user_ids)
198
-
199
- # Paginate
200
- paginated_ids = user_ids[skip:skip + limit]
201
-
202
- # Get user details
203
- users = list(db.users.find({
204
- "_id": {"$in": paginated_ids}
205
- }))
206
-
207
- # Enrich with stats (optional)
208
- enriched_users = []
209
- for user in users:
210
- enriched_users.append({
211
- "user_id": str(user['_id']),
212
- "email": user.get('email'),
213
- "full_name": f"{user.get('FirstName', '')} {user.get('LastName', '')}".strip()
214
- })
215
-
216
- return {
217
- "segment_id": segment_id,
218
- "total_users": total_users,
219
- "users": enriched_users
220
- }
221
-
222
-
223
- # ===== APPROVAL WORKFLOW =====
224
- @app.post("/api/events/{event_code}/segments/{segment_id}/approve", tags=["Approval"])
225
- async def approve_segment(
226
- event_code: str,
227
- segment_id: str,
228
- approved_by: Optional[str] = None,
229
- modified_subject: Optional[str] = None,
230
- modified_body: Optional[str] = None
231
- ):
232
- """Event Owner approves marketing content"""
233
-
234
- segment = db.event_audience_segments.find_one({
235
- "_id": ObjectId(segment_id),
236
- "event_code": event_code
237
- })
238
-
239
- if not segment:
240
- raise HTTPException(status_code=404, detail="Segment not found")
241
-
242
- # Update fields
243
- update = {
244
- "marketing_content.status": "Approved",
245
- "marketing_content.approved_at": datetime.utcnow(),
246
- "marketing_content.approved_by": approved_by,
247
- "last_updated": datetime.utcnow()
248
- }
249
-
250
- if modified_subject:
251
- update["marketing_content.email_subject"] = modified_subject
252
- if modified_body:
253
- update["marketing_content.email_body"] = modified_body
254
-
255
- db.event_audience_segments.update_one(
256
- {"_id": ObjectId(segment_id)},
257
- {"$set": update}
258
- )
259
-
260
- updated_segment = db.event_audience_segments.find_one({"_id": ObjectId(segment_id)})
261
-
262
- return {
263
- "status": "success",
264
- "message": "Segment approved",
265
- "segment_id": segment_id,
266
- "marketing_content": updated_segment.get('marketing_content')
267
- }
268
-
269
-
270
- @app.post("/api/events/{event_code}/segments/{segment_id}/send-email", tags=["Approval"])
271
- async def send_segment_email(
272
- event_code: str,
273
- segment_id: str,
274
- send_immediately: bool = True
275
- ):
276
- """Send approved marketing email"""
277
-
278
- segment = db.event_audience_segments.find_one({
279
- "_id": ObjectId(segment_id),
280
- "event_code": event_code
281
- })
282
-
283
- if not segment:
284
- raise HTTPException(status_code=404, detail="Segment not found")
285
-
286
- marketing_content = segment.get('marketing_content', {})
287
- if marketing_content.get('status') != "Approved":
288
- raise HTTPException(status_code=400, detail="Segment not approved yet")
289
-
290
- # TODO: Integrate with email service (SendGrid, AWS SES, etc.)
291
- # For now, just mark as sent
292
-
293
- db.event_audience_segments.update_one(
294
- {"_id": ObjectId(segment_id)},
295
- {"$set": {
296
- "marketing_content.status": "Sent",
297
- "last_updated": datetime.utcnow()
298
- }}
299
- )
300
-
301
- return {
302
- "status": "success",
303
- "message": f"Email sent to {segment.get('user_count', 0)} users",
304
- "segment_id": segment_id,
305
- "emails_sent": segment.get('user_count', 0),
306
- "emails_failed": 0
307
- }
308
-
309
-
310
- # ===== SENTIMENT =====
311
- @app.post("/api/events/{event_code}/sentiment/analyze", tags=["Sentiment"])
312
- async def analyze_event_sentiment(event_code: str, background_tasks: BackgroundTasks):
313
- """Analyze sentiment for event comments"""
314
-
315
- def run_task():
316
- service = SentimentAnalysisService(event_code)
317
- service.analyze_event_comments()
318
-
319
- background_tasks.add_task(run_task)
320
-
321
- return {
322
- "status": "started",
323
- "message": f"Sentiment analysis started for event {event_code}"
324
- }
325
-
326
-
327
- @app.get("/api/events/{event_code}/sentiment/summary", tags=["Sentiment"])
328
- async def get_sentiment_summary(event_code: str):
329
- """Get sentiment summary for an event"""
330
-
331
- summary = db.event_sentiment_summary.find_one({"event_code": event_code})
332
-
333
- if not summary:
334
- raise HTTPException(status_code=404, detail="No sentiment data for this event")
335
-
336
- return serialize_doc(summary)
337
-
338
-
339
- @app.get("/api/events/{event_code}/sentiment/results", tags=["Sentiment"])
340
- async def get_sentiment_results(
341
- event_code: str,
342
- sentiment_label: Optional[str] = None,
343
- skip: int = 0,
344
- limit: int = 100
345
- ):
346
- """Get detailed sentiment results"""
347
-
348
- query = {"event_code": event_code}
349
- if sentiment_label:
350
- query["sentiment_label"] = sentiment_label
351
-
352
- total = db.sentiment_results.count_documents(query)
353
- results = list(
354
- db.sentiment_results.find(query)
355
- .sort("analyzed_at", -1)
356
- .skip(skip)
357
- .limit(limit)
358
- )
359
-
360
- return {
361
- "total": total,
362
- "results": [serialize_doc(r) for r in results]
363
- }
364
-
365
-
366
- # ===== GENAI =====
367
- @app.post("/api/events/{event_code}/genai/generate-emails", tags=["GenAI"])
368
- async def generate_event_emails(event_code: str, background_tasks: BackgroundTasks):
369
- """Generate marketing emails for all segments"""
370
-
371
- def run_task():
372
- service = GenerativeAIService(event_code)
373
- service.generate_emails_for_all_segments()
374
-
375
- background_tasks.add_task(run_task)
376
-
377
- return {
378
- "status": "started",
379
- "message": "Email generation started"
380
- }
381
-
382
-
383
- @app.post("/api/events/{event_code}/genai/generate-insights", tags=["GenAI"])
384
- async def generate_event_insights(event_code: str, background_tasks: BackgroundTasks):
385
- """Generate AI insights from negative feedback"""
386
-
387
- def run_task():
388
- service = GenerativeAIService(event_code)
389
- service.update_sentiment_summary_with_insights()
390
-
391
- background_tasks.add_task(run_task)
392
-
393
- return {
394
- "status": "started",
395
- "message": "Insight generation started"
396
- }
397
-
398
-
399
- # ===== MONITORING =====
400
- @app.get("/api/monitoring/pipelines/{pipeline}/metrics", tags=["Monitoring"])
401
- async def get_pipeline_metrics(
402
- pipeline: str,
403
- event_code: Optional[str] = None,
404
- days: int = 7
405
- ):
406
- """Get performance metrics"""
407
- # TODO: Implement based on monitoring.py
408
- return {
409
- "pipeline": pipeline,
410
- "event_code": event_code,
411
- "message": "Metrics endpoint - implement as needed"
412
- }
413
-
414
-
415
- # ===== ADMIN =====
416
- @app.post("/api/admin/indexes/create", tags=["Admin"])
417
- async def create_indexes():
418
- """Create MongoDB indexes"""
419
- from scripts.create_indexes import create_all_indexes
420
-
421
- try:
422
- create_all_indexes()
423
- return {"status": "success", "message": "Indexes created"}
424
- except Exception as e:
425
- raise HTTPException(status_code=500, detail=str(e))
426
-
427
-
428
- # ===== ROOT =====
429
- @app.get("/")
430
- async def root():
431
- """API root"""
432
- return {
433
- "name": "Audience Segmentation AI - Event-Centric",
434
- "version": "2.0.0",
435
- "docs": "/api/docs",
436
- "health": "/health"
437
- }
438
-
439
-
440
- if __name__ == "__main__":
441
- import uvicorn
442
- uvicorn.run(
443
- "app:app",
444
- host="0.0.0.0",
445
- port=7860,
446
- reload=True
447
- )
 
1
+ """
2
+ FastAPI Application for Event-Centric Audience Segmentation AI
3
+ Author: AI Generated
4
+ Created: 2025-11-24 (Refactored)
5
+ Purpose: REST API with event-based endpoints
6
+ """
7
+
8
+ from fastapi import FastAPI, HTTPException, BackgroundTasks, status, Query
9
+ from fastapi.middleware.cors import CORSMiddleware
10
+ from pydantic import BaseModel
11
+ from typing import List, Dict, Optional, Any
12
+ from datetime import datetime
13
+ from bson import ObjectId
14
+
15
+ # Import services
16
+ from services.segmentation_service import SegmentationService
17
+ from services.sentiment_service import SentimentAnalysisService
18
+ from services.genai_service import GenerativeAIService
19
+ from database import db
20
+ from config import settings
21
+
22
+
23
+ # FastAPI app
24
+ app = FastAPI(
25
+ title="Audience Segmentation AI - Event-Centric",
26
+ description="REST API for per-event audience analysis",
27
+ version="2.0.0",
28
+ docs_url="/api/docs",
29
+ redoc_url="/api/redoc"
30
+ )
31
+
32
+ # CORS
33
+ app.add_middleware(
34
+ CORSMiddleware,
35
+ allow_origins=["*"],
36
+ allow_credentials=True,
37
+ allow_methods=["*"],
38
+ allow_headers=["*"],
39
+ )
40
+
41
+
42
+ # Helper
43
+ def serialize_doc(doc: Dict) -> Optional[Dict]:
44
+ """Convert MongoDB document to JSON-serializable dict"""
45
+ if doc is None:
46
+ return None
47
+ if '_id' in doc:
48
+ doc['id'] = str(doc.pop('_id'))
49
+
50
+ # Handle nested ObjectIds and lists
51
+ for key, value in list(doc.items()):
52
+ if isinstance(value, ObjectId):
53
+ doc[key] = str(value)
54
+ elif isinstance(value, list):
55
+ doc[key] = [str(v) if isinstance(v, ObjectId) else v for v in value]
56
+ elif isinstance(value, dict):
57
+ doc[key] = serialize_doc(value)
58
+
59
+ return doc
60
+
61
+
62
+ # ===== HEALTH =====
63
+ @app.get("/health", tags=["System"])
64
+ async def health_check():
65
+ """Health check"""
66
+ try:
67
+ db.client.server_info()
68
+ return {
69
+ "status": "healthy",
70
+ "timestamp": datetime.utcnow(),
71
+ "database": "connected"
72
+ }
73
+ except Exception as e:
74
+ raise HTTPException(
75
+ status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
76
+ detail=f"Unhealthy: {str(e)}"
77
+ )
78
+
79
+
80
+ # ===== EVENT ANALYSIS =====
81
+ @app.post("/api/events/{event_code}/analyze", tags=["Event Analysis"])
82
+ async def analyze_event(event_code: str, background_tasks: BackgroundTasks):
83
+ """Run full AI pipeline for an event"""
84
+
85
+ def run_pipeline():
86
+ # Step 1: Segmentation
87
+ seg_service = SegmentationService(event_code)
88
+ seg_service.run_segmentation()
89
+
90
+ # Step 2: Sentiment
91
+ sent_service = SentimentAnalysisService(event_code)
92
+ sent_service.analyze_event_comments()
93
+
94
+ # Step 3: Email generation
95
+ genai_service = GenerativeAIService(event_code)
96
+ genai_service.generate_emails_for_all_segments()
97
+
98
+ # Step 4: Insights
99
+ genai_service.update_sentiment_summary_with_insights()
100
+
101
+ background_tasks.add_task(run_pipeline)
102
+
103
+ return {
104
+ "status": "started",
105
+ "message": f"Analysis pipeline started for event {event_code}"
106
+ }
107
+
108
+
109
+ @app.get("/api/events/{event_code}/dashboard", tags=["Event Analysis"])
110
+ async def get_event_dashboard(event_code: str):
111
+ """Get complete dashboard for Event Owner"""
112
+
113
+ # Get segments
114
+ segments = list(db.event_audience_segments.find({"event_code": event_code}))
115
+
116
+ # Get sentiment summary
117
+ sentiment_summary = db.event_sentiment_summary.find_one({"event_code": event_code})
118
+
119
+ return {
120
+ "event_code": event_code,
121
+ "segments": [serialize_doc(s) for s in segments],
122
+ "sentiment_summary": serialize_doc(sentiment_summary) if sentiment_summary else None
123
+ }
124
+
125
+
126
+ # ===== SEGMENTATION =====
127
+ @app.post("/api/events/{event_code}/segmentation/run", tags=["Segmentation"])
128
+ async def run_event_segmentation(
129
+ event_code: str,
130
+ background_tasks: BackgroundTasks,
131
+ n_clusters: int = Query(default=5, ge=2, le=10)
132
+ ):
133
+ """Run segmentation for an event"""
134
+
135
+ def run_task():
136
+ service = SegmentationService(event_code, n_clusters=n_clusters)
137
+ service.run_segmentation()
138
+
139
+ background_tasks.add_task(run_task)
140
+
141
+ return {
142
+ "status": "started",
143
+ "message": f"Segmentation started for event {event_code}",
144
+ "event_code": event_code
145
+ }
146
+
147
+
148
+ @app.get("/api/events/{event_code}/segments", tags=["Segmentation"])
149
+ async def get_event_segments(
150
+ event_code: str,
151
+ status_filter: Optional[str] = Query(default=None, description="Filter by Draft, Approved, Sent")
152
+ ):
153
+ """Get all segments for an event"""
154
+
155
+ query = {"event_code": event_code}
156
+ if status_filter:
157
+ query["marketing_content.status"] = status_filter
158
+
159
+ segments = list(db.event_audience_segments.find(query))
160
+
161
+ return [serialize_doc(s) for s in segments]
162
+
163
+
164
+ @app.get("/api/events/{event_code}/segments/{segment_id}", tags=["Segmentation"])
165
+ async def get_segment_detail(event_code: str, segment_id: str):
166
+ """Get specific segment details"""
167
+
168
+ segment = db.event_audience_segments.find_one({
169
+ "_id": ObjectId(segment_id),
170
+ "event_code": event_code
171
+ })
172
+
173
+ if not segment:
174
+ raise HTTPException(status_code=404, detail="Segment not found")
175
+
176
+ return serialize_doc(segment)
177
+
178
+
179
+ @app.get("/api/events/{event_code}/segments/{segment_id}/users", tags=["Segmentation"])
180
+ async def get_segment_users(
181
+ event_code: str,
182
+ segment_id: str,
183
+ skip: int = 0,
184
+ limit: int = 100
185
+ ):
186
+ """Get users in a segment with details"""
187
+
188
+ segment = db.event_audience_segments.find_one({
189
+ "_id": ObjectId(segment_id),
190
+ "event_code": event_code
191
+ })
192
+
193
+ if not segment:
194
+ raise HTTPException(status_code=404, detail="Segment not found")
195
+
196
+ user_ids = segment.get('user_ids', [])
197
+ total_users = len(user_ids)
198
+
199
+ # Paginate
200
+ paginated_ids = user_ids[skip:skip + limit]
201
+
202
+ # Get user details
203
+ users = list(db.users.find({
204
+ "_id": {"$in": paginated_ids}
205
+ }))
206
+
207
+ # Enrich with stats (optional)
208
+ enriched_users = []
209
+ for user in users:
210
+ enriched_users.append({
211
+ "user_id": str(user['_id']),
212
+ "email": user.get('email'),
213
+ "full_name": f"{user.get('FirstName', '')} {user.get('LastName', '')}".strip()
214
+ })
215
+
216
+ return {
217
+ "segment_id": segment_id,
218
+ "total_users": total_users,
219
+ "users": enriched_users
220
+ }
221
+
222
+
223
+ # ===== APPROVAL WORKFLOW =====
224
+ @app.post("/api/events/{event_code}/segments/{segment_id}/approve", tags=["Approval"])
225
+ async def approve_segment(
226
+ event_code: str,
227
+ segment_id: str,
228
+ approved_by: Optional[str] = None,
229
+ modified_subject: Optional[str] = None,
230
+ modified_body: Optional[str] = None
231
+ ):
232
+ """Event Owner approves marketing content"""
233
+
234
+ segment = db.event_audience_segments.find_one({
235
+ "_id": ObjectId(segment_id),
236
+ "event_code": event_code
237
+ })
238
+
239
+ if not segment:
240
+ raise HTTPException(status_code=404, detail="Segment not found")
241
+
242
+ # Update fields
243
+ update = {
244
+ "marketing_content.status": "Approved",
245
+ "marketing_content.approved_at": datetime.utcnow(),
246
+ "marketing_content.approved_by": approved_by,
247
+ "last_updated": datetime.utcnow()
248
+ }
249
+
250
+ if modified_subject:
251
+ update["marketing_content.email_subject"] = modified_subject
252
+ if modified_body:
253
+ update["marketing_content.email_body"] = modified_body
254
+
255
+ db.event_audience_segments.update_one(
256
+ {"_id": ObjectId(segment_id)},
257
+ {"$set": update}
258
+ )
259
+
260
+ updated_segment = db.event_audience_segments.find_one({"_id": ObjectId(segment_id)})
261
+
262
+ return {
263
+ "status": "success",
264
+ "message": "Segment approved",
265
+ "segment_id": segment_id,
266
+ "marketing_content": updated_segment.get('marketing_content')
267
+ }
268
+
269
+
270
+ @app.post("/api/events/{event_code}/segments/{segment_id}/send-email", tags=["Approval"])
271
+ async def send_segment_email(
272
+ event_code: str,
273
+ segment_id: str,
274
+ send_immediately: bool = True
275
+ ):
276
+ """Send approved marketing email"""
277
+
278
+ segment = db.event_audience_segments.find_one({
279
+ "_id": ObjectId(segment_id),
280
+ "event_code": event_code
281
+ })
282
+
283
+ if not segment:
284
+ raise HTTPException(status_code=404, detail="Segment not found")
285
+
286
+ marketing_content = segment.get('marketing_content', {})
287
+ if marketing_content.get('status') != "Approved":
288
+ raise HTTPException(status_code=400, detail="Segment not approved yet")
289
+
290
+ # TODO: Integrate with email service (SendGrid, AWS SES, etc.)
291
+ # For now, just mark as sent
292
+
293
+ db.event_audience_segments.update_one(
294
+ {"_id": ObjectId(segment_id)},
295
+ {"$set": {
296
+ "marketing_content.status": "Sent",
297
+ "last_updated": datetime.utcnow()
298
+ }}
299
+ )
300
+
301
+ return {
302
+ "status": "success",
303
+ "message": f"Email sent to {segment.get('user_count', 0)} users",
304
+ "segment_id": segment_id,
305
+ "emails_sent": segment.get('user_count', 0),
306
+ "emails_failed": 0
307
+ }
308
+
309
+
310
+ # ===== SENTIMENT =====
311
+ @app.post("/api/events/{event_code}/sentiment/analyze", tags=["Sentiment"])
312
+ async def analyze_event_sentiment(event_code: str, background_tasks: BackgroundTasks):
313
+ """Analyze sentiment for event comments"""
314
+
315
+ def run_task():
316
+ service = SentimentAnalysisService(event_code)
317
+ service.analyze_event_comments()
318
+
319
+ background_tasks.add_task(run_task)
320
+
321
+ return {
322
+ "status": "started",
323
+ "message": f"Sentiment analysis started for event {event_code}"
324
+ }
325
+
326
+
327
+ @app.get("/api/events/{event_code}/sentiment/summary", tags=["Sentiment"])
328
+ async def get_sentiment_summary(event_code: str):
329
+ """Get sentiment summary for an event"""
330
+
331
+ summary = db.event_sentiment_summary.find_one({"event_code": event_code})
332
+
333
+ if not summary:
334
+ raise HTTPException(status_code=404, detail="No sentiment data for this event")
335
+
336
+ return serialize_doc(summary)
337
+
338
+
339
+ @app.get("/api/events/{event_code}/sentiment/results", tags=["Sentiment"])
340
+ async def get_sentiment_results(
341
+ event_code: str,
342
+ sentiment_label: Optional[str] = None,
343
+ skip: int = 0,
344
+ limit: int = 100
345
+ ):
346
+ """Get detailed sentiment results"""
347
+
348
+ query = {"event_code": event_code}
349
+ if sentiment_label:
350
+ query["sentiment_label"] = sentiment_label
351
+
352
+ total = db.sentiment_results.count_documents(query)
353
+ results = list(
354
+ db.sentiment_results.find(query)
355
+ .sort("analyzed_at", -1)
356
+ .skip(skip)
357
+ .limit(limit)
358
+ )
359
+
360
+ return {
361
+ "total": total,
362
+ "results": [serialize_doc(r) for r in results]
363
+ }
364
+
365
+
366
+ # ===== GENAI =====
367
+ @app.post("/api/events/{event_code}/genai/generate-emails", tags=["GenAI"])
368
+ async def generate_event_emails(event_code: str, background_tasks: BackgroundTasks):
369
+ """Generate marketing emails for all segments"""
370
+
371
+ def run_task():
372
+ service = GenerativeAIService(event_code)
373
+ service.generate_emails_for_all_segments()
374
+
375
+ background_tasks.add_task(run_task)
376
+
377
+ return {
378
+ "status": "started",
379
+ "message": "Email generation started"
380
+ }
381
+
382
+
383
+ @app.post("/api/events/{event_code}/genai/generate-insights", tags=["GenAI"])
384
+ async def generate_event_insights(event_code: str, background_tasks: BackgroundTasks):
385
+ """Generate AI insights from negative feedback"""
386
+
387
+ def run_task():
388
+ service = GenerativeAIService(event_code)
389
+ service.update_sentiment_summary_with_insights()
390
+
391
+ background_tasks.add_task(run_task)
392
+
393
+ return {
394
+ "status": "started",
395
+ "message": "Insight generation started"
396
+ }
397
+
398
+
399
+ # ===== MONITORING =====
400
+ @app.get("/api/monitoring/pipelines/{pipeline}/metrics", tags=["Monitoring"])
401
+ async def get_pipeline_metrics(
402
+ pipeline: str,
403
+ event_code: Optional[str] = None,
404
+ days: int = 7
405
+ ):
406
+ """Get performance metrics"""
407
+ # TODO: Implement based on monitoring.py
408
+ return {
409
+ "pipeline": pipeline,
410
+ "event_code": event_code,
411
+ "message": "Metrics endpoint - implement as needed"
412
+ }
413
+
414
+
415
+ # ===== ADMIN =====
416
+ @app.post("/api/admin/indexes/create", tags=["Admin"])
417
+ async def create_indexes():
418
+ """Create MongoDB indexes"""
419
+ from scripts.create_indexes import create_all_indexes
420
+
421
+ try:
422
+ create_all_indexes()
423
+ return {"status": "success", "message": "Indexes created"}
424
+ except Exception as e:
425
+ raise HTTPException(status_code=500, detail=str(e))
426
+
427
+
428
+ # ===== ROOT =====
429
+ @app.get("/")
430
+ async def root():
431
+ """API root"""
432
+ return {
433
+ "name": "Audience Segmentation AI - Event-Centric",
434
+ "version": "2.0.0",
435
+ "docs": "/api/docs",
436
+ "health": "/health"
437
+ }
438
+
439
+
440
+ if __name__ == "__main__":
441
+ import uvicorn
442
+ uvicorn.run(
443
+ "app:app",
444
+ host="0.0.0.0",
445
+ port=7860,
446
+ reload=False
447
+ )