minhvtt committed on
Commit
34b2632
·
verified ·
1 Parent(s): d97be90

Upload 15 files

Browse files
models/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ # MongoDB Models for Audience Segmentation AI Features
models/event_models.py ADDED
@@ -0,0 +1,112 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Event-Centric Pydantic Models for MongoDB
3
+ Author: AI Generated
4
+ Created: 2025-11-24
5
+ Purpose: Define schemas for event-specific analysis results
6
+ """
7
+
8
+ from pydantic import BaseModel, Field
9
+ from typing import List, Dict, Optional, Any
10
+ from datetime import datetime
11
+ from bson import ObjectId
12
+
13
+
14
class PyObjectId(ObjectId):
    """BSON ``ObjectId`` usable as a Pydantic field type.

    Validation accepts any value ``ObjectId.is_valid`` accepts and returns a
    plain ``ObjectId``; anything else raises ``ValueError``.

    Both generations of Pydantic hooks are provided so the type works no
    matter which major version is installed: ``__get_validators__`` /
    ``__modify_schema__`` are the v1 hooks, ``__get_pydantic_core_schema__`` /
    ``__get_pydantic_json_schema__`` the v2 ones.
    """

    @classmethod
    def __get_validators__(cls):
        """Pydantic v1 hook: yield the validators applied to this type."""
        yield cls.validate

    @classmethod
    def validate(cls, v):
        """Return ``ObjectId(v)``.

        Raises:
            ValueError: if ``v`` is not a valid ObjectId representation.
        """
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId")
        return ObjectId(v)

    @classmethod
    def __modify_schema__(cls, field_schema):
        """Pydantic v1 hook: advertise the field as a string in JSON schema."""
        field_schema.update(type="string")

    @classmethod
    def __get_pydantic_core_schema__(cls, source_type, handler):
        """Pydantic v2 hook: validate with ``validate``; stringify in JSON mode only."""
        # Imported lazily because pydantic_core only exists alongside Pydantic v2.
        from pydantic_core import core_schema

        return core_schema.no_info_plain_validator_function(
            cls.validate,
            # Python-mode dumps keep the real ObjectId so documents can still
            # be written to MongoDB unchanged.
            serialization=core_schema.to_string_ser_schema(when_used="json"),
        )

    @classmethod
    def __get_pydantic_json_schema__(cls, schema, handler):
        """Pydantic v2 hook: advertise the field as a string in JSON schema."""
        # Defining this hook is also what lets v2 tolerate the legacy
        # ``__modify_schema__`` method above.
        return {"type": "string"}
29
+
30
+
31
class MarketingContent(BaseModel):
    """AI-generated marketing e-mail together with its review state."""

    email_subject: str
    email_body: str
    # Workflow state: Draft, Approved, Sent
    status: str = Field(default="Draft")
    generated_at: datetime = Field(default_factory=datetime.utcnow)
    # Filled in once somebody approves the draft.
    approved_at: Optional[datetime] = Field(default=None)
    approved_by: Optional[str] = Field(default=None)
39
+
40
+
41
class EventAudienceSegment(BaseModel):
    """Audience segment computed for a single event.

    Holds the clustering outcome (member ids and per-segment statistics) and
    the optional AI-written marketing e-mail awaiting Event Owner review.
    """

    id: Optional[PyObjectId] = Field(default=None, alias="_id")
    event_code: str = Field(..., description="Event identifier")
    segment_name: str = Field(..., description="Human-readable segment name in Vietnamese")
    segment_type: str = Field(..., description="Segment category (e.g., VIP, Potential, Dormant)")
    user_count: int = Field(..., description="Number of users in this segment")
    user_ids: List[PyObjectId] = Field(
        default_factory=list,
        description="List of user ObjectIds in this segment",
    )

    criteria: Dict[str, Any] = Field(
        default_factory=dict,
        description="Average statistics for this segment (e.g., avg_spend, avg_tickets, avg_recency)",
    )

    marketing_content: Optional[MarketingContent] = Field(
        default=None,
        description="AI-generated marketing email (Draft, pending approval)",
    )

    created_at: datetime = Field(default_factory=datetime.utcnow)
    last_updated: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        # Pydantic v2 reads ``populate_by_name``; v1 reads
        # ``allow_population_by_field_name``. The models in this package mix
        # the two spellings, so both are set to let ``id=...`` work next to
        # the ``_id`` alias under either major version (v2 only warns about
        # the renamed v1 key).
        populate_by_name = True
        allow_population_by_field_name = True
        arbitrary_types_allowed = True
        json_encoders = {ObjectId: str}
70
+
71
+
72
class AIInsights(BaseModel):
    """Insights produced by the AI layer on top of sentiment analysis."""

    summary: str = Field(
        ...,
        description="Overall sentiment summary in Vietnamese",
    )
    top_issues: List[str] = Field(
        default_factory=list,
        description="Top 5 recurring issues",
    )
    improvement_suggestions: List[str] = Field(
        default_factory=list,
        description="Actionable suggestions",
    )
    predicted_nps: Optional[float] = Field(
        default=None,
        description="Predicted Net Promoter Score (0-100)",
    )
78
+
79
+
80
class EventSentimentSummary(BaseModel):
    """Aggregated sentiment figures for one event.

    Gives the Event Owner a compact view of attendee feedback: comment
    counts per sentiment, average model confidence, frequent keywords and
    optional AI-generated insights.
    """

    id: Optional[PyObjectId] = Field(default=None, alias="_id")
    event_code: str = Field(..., description="Event identifier")

    total_comments: int = Field(default=0, description="Total number of comments analyzed")

    sentiment_distribution: Dict[str, int] = Field(
        default_factory=dict,
        description="Count of Positive, Negative, Neutral comments",
    )

    avg_confidence: float = Field(
        default=0.0,
        description="Average confidence score of sentiment predictions",
    )

    top_keywords: List[str] = Field(
        default_factory=list,
        description="Most frequently mentioned keywords/phrases",
    )

    ai_insights: Optional[AIInsights] = Field(
        default=None,
        description="AI-generated insights and recommendations",
    )

    last_updated: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        # Pydantic v2 reads ``populate_by_name``; v1 reads
        # ``allow_population_by_field_name``. The models in this package mix
        # the two spellings, so both are set to let ``id=...`` work next to
        # the ``_id`` alias under either major version (v2 only warns about
        # the renamed v1 key).
        populate_by_name = True
        allow_population_by_field_name = True
        arbitrary_types_allowed = True
        json_encoders = {ObjectId: str}
models/segmentation_models.py ADDED
@@ -0,0 +1,69 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ MongoDB Models for Audience Segmentation
3
+ Author: AI Generated
4
+ Created: 2025-11-24
5
+ Purpose: Define data models for storing audience segmentation results
6
+ """
7
+
8
+ from datetime import datetime
9
+ from typing import Optional, List, Dict
10
+ from pydantic import BaseModel, Field
11
+ from bson import ObjectId
12
+
13
+
14
class PyObjectId(ObjectId):
    """BSON ``ObjectId`` usable as a Pydantic field type.

    Validation accepts any value ``ObjectId.is_valid`` accepts and returns a
    plain ``ObjectId``; anything else raises ``ValueError``.

    Both generations of Pydantic hooks are provided so the type works no
    matter which major version is installed: ``__get_validators__`` /
    ``__modify_schema__`` are the v1 hooks, ``__get_pydantic_core_schema__`` /
    ``__get_pydantic_json_schema__`` the v2 ones.
    """

    @classmethod
    def __get_validators__(cls):
        """Pydantic v1 hook: yield the validators applied to this type."""
        yield cls.validate

    @classmethod
    def validate(cls, v):
        """Return ``ObjectId(v)``.

        Raises:
            ValueError: if ``v`` is not a valid ObjectId representation.
        """
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId")
        return ObjectId(v)

    @classmethod
    def __modify_schema__(cls, field_schema):
        """Pydantic v1 hook: advertise the field as a string in JSON schema."""
        field_schema.update(type="string")

    @classmethod
    def __get_pydantic_core_schema__(cls, source_type, handler):
        """Pydantic v2 hook: validate with ``validate``; stringify in JSON mode only."""
        # Imported lazily because pydantic_core only exists alongside Pydantic v2.
        from pydantic_core import core_schema

        return core_schema.no_info_plain_validator_function(
            cls.validate,
            # Python-mode dumps keep the real ObjectId so documents can still
            # be written to MongoDB unchanged.
            serialization=core_schema.to_string_ser_schema(when_used="json"),
        )

    @classmethod
    def __get_pydantic_json_schema__(cls, schema, handler):
        """Pydantic v2 hook: advertise the field as a string in JSON schema."""
        # Defining this hook is also what lets v2 tolerate the legacy
        # ``__modify_schema__`` method above.
        return {"type": "string"}
29
+
30
+
31
class AudienceSegment(BaseModel):
    """Characteristics of one audience segment.

    Per the original module notes, segments are the outcome of K-Means
    clustering over user behaviour data.
    """

    # A fresh ObjectId is generated when no ``_id`` is supplied.
    id: Optional[PyObjectId] = Field(default_factory=PyObjectId, alias="_id")
    segment_name: str = Field(
        ...,
        description="Human-readable segment name, e.g., 'Big Spenders', 'Music Lovers'",
    )
    description: Optional[str] = Field(None, description="Detailed description of this segment")
    criteria: Dict = Field(
        default_factory=dict,
        description="Statistical criteria: min_spend, max_spend, top_categories, etc.",
    )
    user_count: int = Field(0, description="Number of users in this segment")
    last_updated: datetime = Field(default_factory=datetime.utcnow)

    # Generative AI output
    marketing_content: Optional[Dict] = Field(
        None,
        description="AI-generated marketing content: { 'email_subject': str, 'email_body': str }",
    )

    class Config:
        # ``allow_population_by_field_name`` is the Pydantic v1 key; the
        # sibling model modules use the v2 key ``populate_by_name``. Both are
        # set so ``id=...`` works next to the ``_id`` alias under either
        # major version (v2 only warns about the renamed v1 key).
        allow_population_by_field_name = True
        populate_by_name = True
        arbitrary_types_allowed = True
        json_encoders = {ObjectId: str}
53
+
54
+
55
class UserSegmentAssignment(BaseModel):
    """Link between a user and the segment they were assigned to.

    Many-to-one: many users may reference the same segment.
    """

    # A fresh ObjectId is generated when no ``_id`` is supplied.
    id: Optional[PyObjectId] = Field(default_factory=PyObjectId, alias="_id")
    user_id: PyObjectId = Field(..., description="Reference to User._id")
    segment_id: PyObjectId = Field(..., description="Reference to AudienceSegment._id")
    confidence_score: float = Field(
        ...,
        description="Distance to cluster center (lower is better)",
    )
    assigned_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        # ``allow_population_by_field_name`` is the Pydantic v1 key; the
        # sibling model modules use the v2 key ``populate_by_name``. Both are
        # set so ``id=...`` works next to the ``_id`` alias under either
        # major version (v2 only warns about the renamed v1 key).
        allow_population_by_field_name = True
        populate_by_name = True
        arbitrary_types_allowed = True
        json_encoders = {ObjectId: str}
models/sentiment_models.py ADDED
@@ -0,0 +1,82 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Sentiment Analysis Pydantic Models for MongoDB
3
+ Author: AI Generated
4
+ Created: 2025-11-24
5
+ Purpose: Define schemas for sentiment analysis results
6
+ """
7
+
8
+ from pydantic import BaseModel, Field
9
+ from typing import List, Optional, Dict
10
+ from datetime import datetime
11
+ from bson import ObjectId
12
+
13
+
14
class PyObjectId(ObjectId):
    """BSON ``ObjectId`` usable as a Pydantic field type.

    Validation accepts any value ``ObjectId.is_valid`` accepts and returns a
    plain ``ObjectId``; anything else raises ``ValueError``.

    Both generations of Pydantic hooks are provided so the type works no
    matter which major version is installed: ``__get_validators__`` /
    ``__modify_schema__`` are the v1 hooks, ``__get_pydantic_core_schema__`` /
    ``__get_pydantic_json_schema__`` the v2 ones.
    """

    @classmethod
    def __get_validators__(cls):
        """Pydantic v1 hook: yield the validators applied to this type."""
        yield cls.validate

    @classmethod
    def validate(cls, v):
        """Return ``ObjectId(v)``.

        Raises:
            ValueError: if ``v`` is not a valid ObjectId representation.
        """
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid ObjectId")
        return ObjectId(v)

    @classmethod
    def __modify_schema__(cls, field_schema):
        """Pydantic v1 hook: advertise the field as a string in JSON schema."""
        field_schema.update(type="string")

    @classmethod
    def __get_pydantic_core_schema__(cls, source_type, handler):
        """Pydantic v2 hook: validate with ``validate``; stringify in JSON mode only."""
        # Imported lazily because pydantic_core only exists alongside Pydantic v2.
        from pydantic_core import core_schema

        return core_schema.no_info_plain_validator_function(
            cls.validate,
            # Python-mode dumps keep the real ObjectId so documents can still
            # be written to MongoDB unchanged.
            serialization=core_schema.to_string_ser_schema(when_used="json"),
        )

    @classmethod
    def __get_pydantic_json_schema__(cls, schema, handler):
        """Pydantic v2 hook: advertise the field as a string in JSON schema."""
        # Defining this hook is also what lets v2 tolerate the legacy
        # ``__modify_schema__`` method above.
        return {"type": "string"}
29
+
30
+
31
class SentimentAnalysisResult(BaseModel):
    """Sentiment verdict for a single comment/feedback item."""

    id: Optional[PyObjectId] = Field(default=None, alias="_id")
    source_id: PyObjectId = Field(..., description="ID of the original comment/post")
    source_type: str = Field(default="UserCommentPost", description="Type of source")

    # Event context
    event_code: str = Field(..., description="Event identifier this comment belongs to")

    sentiment_label: str = Field(..., description="Positive, Negative, or Neutral")
    # Pydantic enforces the 0.0-1.0 bounds declared here.
    confidence_score: float = Field(..., ge=0.0, le=1.0, description="Model confidence (0-1)")

    key_phrases: List[str] = Field(
        default_factory=list,
        description="Extracted keywords/phrases from the text",
    )

    analyzed_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        # Pydantic v2 reads ``populate_by_name``; v1 reads
        # ``allow_population_by_field_name``. The models in this package mix
        # the two spellings, so both are set to let ``id=...`` work next to
        # the ``_id`` alias under either major version (v2 only warns about
        # the renamed v1 key).
        populate_by_name = True
        allow_population_by_field_name = True
        arbitrary_types_allowed = True
        json_encoders = {ObjectId: str}
54
+
55
+
56
class EventInsightReport(BaseModel):
    """High-level, LLM-generated insight report for one event.

    Carries the sentiment breakdown, the top recurring issues, a predicted
    NPS and improvement suggestions.
    """

    id: Optional[PyObjectId] = Field(default=None, alias="_id")
    event_code: str = Field(..., description="Reference to EventVersion.EventCode")
    report_date: datetime = Field(default_factory=datetime.utcnow)
    total_comments: int = Field(0, description="Total number of comments analyzed")
    sentiment_breakdown: Dict[str, int] = Field(
        default_factory=dict,
        description="Count by sentiment: { 'Positive': 50, 'Negative': 10, 'Neutral': 20 }",
    )
    predicted_nps: Optional[float] = Field(None, description="Predicted NPS score (0-100)")
    top_issues: List[str] = Field(
        default_factory=list,
        description="Top 5 recurring issues, e.g., ['Check-in slow', 'Sound quality poor']",
    )
    improvement_suggestions: List[str] = Field(
        default_factory=list,
        description="AI-generated suggestions for improvement",
    )

    class Config:
        # Pydantic v2 reads ``populate_by_name``; v1 reads
        # ``allow_population_by_field_name``. The models in this package mix
        # the two spellings, so both are set to let ``id=...`` work next to
        # the ``_id`` alias under either major version (v2 only warns about
        # the renamed v1 key).
        populate_by_name = True
        allow_population_by_field_name = True
        arbitrary_types_allowed = True
        json_encoders = {ObjectId: str}
scripts/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ # scripts/__init__.py - marker file
scripts/create_indexes.py ADDED
@@ -0,0 +1,111 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ MongoDB Index Creation Script
3
+ Author: AI Generated
4
+ Created: 2025-11-24
5
+ Purpose: Create performance indexes for event-centric queries
6
+ """
7
+
8
+ from database import db
9
+ from config import settings
10
+
11
+
12
def create_all_indexes():
    """Create every index used by the event-centric queries and print a summary.

    Intended to be run once at deployment time or when a new environment is
    set up. Progress is reported on stdout; after all indexes are created the
    number of indexes per collection is listed for verification.
    """
    rule = "=" * 60

    # One entry per collection:
    # (summary label, section heading, attribute on ``db``, ((index name, keys), ...))
    plan = (
        ("Payment", "\n📊 Payment Collection:", "payments", (
            # Event-specific ticket purchases
            ("idx_payment_event_status_user",
             [("EventCode", 1), ("Status", 1), ("UserId", 1)]),
            # User RFM calculation
            ("idx_payment_user_date",
             [("UserId", 1), ("TransactionDate", -1)]),
        )),
        ("UserFollow", "\n👥 UserFollow Collection:", "user_follows", (
            # Event followers
            ("idx_follow_event_user",
             [("EventCode", 1), ("userId", 1)]),
        )),
        ("UserCommentPost", "\n💬 UserCommentPost Collection:", "user_comment_post", (
            # Event comments
            ("idx_comment_event_date",
             [("EventCode", 1), ("CreatedDate", -1)]),
        )),
        ("EventAudienceSegment", "\n🎯 EventAudienceSegment Collection:", "event_audience_segments", (
            # Event owner dashboard
            ("idx_segment_event",
             [("event_code", 1)]),
            # Status filtering
            ("idx_segment_event_status",
             [("event_code", 1), ("marketing_content.status", 1)]),
        )),
        ("EventSentimentSummary", "\n📊 EventSentimentSummary Collection:", "event_sentiment_summary", (
            # Event sentiment lookup
            ("idx_sentiment_event_date",
             [("event_code", 1), ("last_updated", -1)]),
        )),
        ("SentimentAnalysisResult", "\n💭 SentimentAnalysisResult Collection:", "sentiment_results", (
            # Event-specific sentiment results
            ("idx_sentiment_result_event_date",
             [("event_code", 1), ("analyzed_at", -1)]),
        )),
    )

    print(rule)
    print("🔧 Creating MongoDB Indexes")
    print(rule)

    for _label, heading, attr, index_specs in plan:
        print(heading)
        for index_name, keys in index_specs:
            getattr(db, attr).create_index(keys, name=index_name)
            print(f" ✓ Created: {index_name}")

    print("\n" + rule)
    print("✅ All Indexes Created Successfully!")
    print(rule)

    # List index counts per collection for verification
    print("\n📋 Index Summary:")
    for label, _heading, attr, _index_specs in plan:
        index_count = len(list(getattr(db, attr).list_indexes()))
        print(f" {label}: {index_count} indexes")
108
+
109
+
110
+ if __name__ == "__main__":
111
+ create_all_indexes()
services/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ # Services Package
services/data_aggregation.py ADDED
@@ -0,0 +1,188 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Data Aggregation Pipeline for Event-Centric User Segmentation
3
+ Author: AI Generated
4
+ Created: 2025-11-24 (Refactored for event-centric analysis)
5
+ Purpose: Aggregate user features for a specific event using MongoDB pipelines
6
+ """
7
+
8
+ from typing import List, Dict
9
+ from datetime import datetime
10
+ from database import db
11
+ from config import settings
12
+
13
+
14
class UserDataAggregator:
    """Aggregate per-user behavioural features for one event.

    The heavy lifting is done by a single MongoDB aggregation pipeline run on
    the ``users`` collection, so only the final feature vectors are
    transferred to the client.
    """

    def __init__(self, event_code: str):
        """Bind the aggregator to an event.

        Args:
            event_code: Event identifier used to filter payments and follows.
        """
        self.event_code = event_code
        self.db = db

    def _build_pipeline(self) -> List[Dict]:
        """Return the aggregation pipeline for ``self.event_code``."""
        # ``$literal`` keeps an event code that happens to start with "$" from
        # being interpreted as a field path inside ``$expr``.
        event_code = {"$literal": self.event_code}

        return [
            # Stage 1: only active users
            {"$match": {"Status": "Active"}},

            # Stage 2: completed payments of the user for THIS event
            {
                "$lookup": {
                    "from": settings.COLLECTION_PAYMENTS,
                    "let": {"user_id": "$_id"},
                    "pipeline": [
                        {
                            "$match": {
                                "$expr": {
                                    "$and": [
                                        {"$eq": ["$UserId", "$$user_id"]},
                                        {"$eq": ["$EventCode", event_code]},
                                        {"$eq": ["$Status", "Completed"]},
                                    ]
                                }
                            }
                        }
                    ],
                    "as": "event_tickets",
                }
            },

            # Stage 3: follows of the user for THIS event
            # (the follow collection spells the field ``userId``)
            {
                "$lookup": {
                    "from": settings.COLLECTION_USER_FOLLOWS,
                    "let": {"user_id": "$_id"},
                    "pipeline": [
                        {
                            "$match": {
                                "$expr": {
                                    "$and": [
                                        {"$eq": ["$userId", "$$user_id"]},
                                        {"$eq": ["$EventCode", event_code]},
                                    ]
                                }
                            }
                        }
                    ],
                    "as": "event_follows",
                }
            },

            # Stage 4: keep only users who bought a ticket for, or follow,
            # this event. Done BEFORE the lifetime-payments lookup so that
            # lookup is not executed for users that are discarded anyway.
            {
                "$match": {
                    "$or": [
                        {"event_tickets": {"$ne": []}},
                        {"event_follows": {"$ne": []}},
                    ]
                }
            },

            # Stage 5: all completed payments of the user (global RFM)
            {
                "$lookup": {
                    "from": settings.COLLECTION_PAYMENTS,
                    "localField": "_id",
                    "foreignField": "UserId",
                    "as": "all_payments",
                    "pipeline": [
                        {"$match": {"Status": "Completed"}}
                    ],
                }
            },

            # Stage 6: event-specific and global metrics
            {
                "$addFields": {
                    "event_ticket_count": {"$size": "$event_tickets"},
                    "event_total_spend": {"$sum": "$event_tickets.Amount"},
                    "is_follower": {
                        "$cond": [
                            {"$gt": [{"$size": "$event_follows"}, 0]},
                            1,
                            0,
                        ]
                    },
                    "global_total_spend": {"$sum": "$all_payments.Amount"},
                    "global_transaction_count": {"$size": "$all_payments"},
                    "global_last_transaction": {"$max": "$all_payments.TransactionDate"},
                }
            },

            # Stage 7: days since the last completed payment; users without
            # any completed payment get the sentinel 999999.
            {
                "$addFields": {
                    "global_recency_days": {
                        "$cond": {
                            "if": {"$gt": ["$global_last_transaction", None]},
                            "then": {
                                "$dateDiff": {
                                    "startDate": "$global_last_transaction",
                                    "endDate": "$$NOW",
                                    "unit": "day",
                                }
                            },
                            "else": 999999,
                        }
                    }
                }
            },

            # Stage 8: final feature vector
            {
                "$project": {
                    "_id": 1,
                    "user_id": "$_id",
                    # NOTE(review): lower-case ``email`` while the name fields
                    # are PascalCase -- confirm against the users schema.
                    "email": 1,
                    "firstName": "$FirstName",
                    "lastName": "$LastName",

                    # Event-specific features
                    "event_ticket_count": 1,
                    "event_total_spend": 1,
                    "is_follower": 1,

                    # Global features (user power)
                    "global_recency": "$global_recency_days",
                    "global_frequency": "$global_transaction_count",
                    "global_monetary": "$global_total_spend",
                }
            },
        ]

    def aggregate_user_features(self) -> List[Dict]:
        """Aggregate user features for the bound event.

        Included users are active users who either

        1. have at least one completed payment for this event, or
        2. follow this event.

        Comments are not consulted by the pipeline, so users who only
        commented on the event are not included.

        NOTE(review): the pipeline uses ``$dateDiff`` and a ``$lookup`` that
        combines ``localField``/``foreignField`` with ``pipeline``; both are
        understood to need MongoDB 5.0 or newer -- confirm the server version.

        Returns:
            One dict per user with event-specific features
            (``event_ticket_count``, ``event_total_spend``, ``is_follower``)
            and global RFM features (``global_recency``, ``global_frequency``,
            ``global_monetary``).
        """
        pipeline = self._build_pipeline()

        print(f"🔄 Running aggregation for event: {self.event_code}")
        results = list(self.db.users.aggregate(pipeline, allowDiskUse=True))
        print(f"✓ Found {len(results)} users who interacted with this event")

        return results
services/feedback.py ADDED
@@ -0,0 +1,224 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Feedback Loop System
3
+ Author: AI Generated
4
+ Created: 2025-11-24
5
+ Purpose: Collect feedback metrics to improve AI models over time
6
+ """
7
+
8
+ from datetime import datetime
9
+ from typing import Dict, Optional
10
+ from bson import ObjectId
11
+
12
+ from database import db
13
+
14
+
15
class FeedbackCollector:
    """Collect feedback on AI outputs for continuous improvement.

    All feedback documents are stored in one collection and distinguished by
    their ``feedback_type`` field.
    """

    def __init__(self):
        self.collection = "AIFeedback"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _feedback_collection(self):
        """Return the collection that stores feedback documents."""
        return db.get_collection(self.collection)

    def _record(self, feedback_type: str, **fields) -> None:
        """Insert one feedback document stamped with the current UTC time."""
        self._feedback_collection().insert_one(
            {"feedback_type": feedback_type, **fields, "timestamp": datetime.utcnow()}
        )

    @staticmethod
    def _as_object_id(value):
        """Return ``ObjectId(value)`` when possible, else ``value`` unchanged."""
        return ObjectId(value) if ObjectId.is_valid(value) else value

    @staticmethod
    def _rates(data: Dict) -> Dict:
        """Turn grouped engagement counters into rates over ``total_sent``."""
        total = data["total_sent"]
        return {
            "total_sent": total,
            "open_rate": data["opened"] / total if total > 0 else 0,
            "click_rate": data["clicked"] / total if total > 0 else 0,
            "conversion_rate": data["converted"] / total if total > 0 else 0,
            "unsubscribe_rate": data["unsubscribed"] / total if total > 0 else 0,
        }

    @staticmethod
    def _summarize_corrections(corrections) -> Dict:
        """Compute overall and per-label accuracy from a non-empty list of corrections.

        A correction counts as "correct" when the reviewer kept the original
        label (``original_label == corrected_label``).
        """
        by_label: Dict = {}
        correct = 0
        for entry in corrections:
            stats = by_label.setdefault(entry["original_label"], {"total": 0, "correct": 0})
            stats["total"] += 1
            if entry["original_label"] == entry["corrected_label"]:
                stats["correct"] += 1
                correct += 1

        for stats in by_label.values():
            stats["accuracy"] = stats["correct"] / stats["total"]

        total = len(corrections)
        return {
            "overall_accuracy": correct / total,
            "total_corrections": total,
            "by_label": by_label,
        }

    # ------------------------------------------------------------------
    # Recording
    # ------------------------------------------------------------------

    def record_email_engagement(self,
                                segment_id: str,
                                user_id: str,
                                opened: bool = False,
                                clicked: bool = False,
                                converted: bool = False,
                                unsubscribed: bool = False):
        """Record e-mail engagement flags for one user of a segment.

        ``segment_id`` and ``user_id`` are converted with ``ObjectId(...)``,
        which raises for values that are not valid ObjectIds.
        """
        self._record(
            "email_engagement",
            segment_id=ObjectId(segment_id),
            user_id=ObjectId(user_id),
            opened=opened,
            clicked=clicked,
            converted=converted,
            unsubscribed=unsubscribed,
        )

    def record_sentiment_correction(self,
                                    analysis_id: str,
                                    original_label: str,
                                    corrected_label: str,
                                    corrected_by: str):
        """Record a manual correction of a sentiment analysis result.

        The stored corrections feed ``get_sentiment_accuracy`` and the
        retraining dataset.
        """
        self._record(
            "sentiment_correction",
            analysis_id=ObjectId(analysis_id),
            original_label=original_label,
            corrected_label=corrected_label,
            corrected_by=corrected_by,
        )

    def record_segment_feedback(self,
                                segment_id: str,
                                user_id: str,
                                interaction_type: str,
                                value: Optional[float] = None):
        """Record a user interaction with a segment-targeted campaign.

        Args:
            interaction_type: 'purchase', 'view', 'ignore', etc.
            value: revenue/engagement metric, if any.
        """
        self._record(
            "segment_interaction",
            segment_id=ObjectId(segment_id),
            user_id=ObjectId(user_id),
            interaction_type=interaction_type,
            value=value,
        )

    def record_insight_usefulness(self,
                                  insight_report_id: str,
                                  user_id: str,
                                  rating: int,
                                  implemented: bool = False):
        """Record how useful an insight report was.

        Args:
            rating: 1-5 stars (not validated here).
        """
        self._record(
            "insight_rating",
            insight_report_id=ObjectId(insight_report_id),
            # Stored as ObjectId like the other feedback types so that
            # ``user_id`` has one type across the collection; values that are
            # not valid ObjectIds are kept as given instead of raising.
            user_id=self._as_object_id(user_id),
            rating=rating,
            implemented=implemented,
        )

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def get_email_performance(self, segment_id: str) -> Dict:
        """Return aggregated e-mail engagement rates for a segment.

        Returns ``{"error": "No data"}`` when no engagement was recorded.
        """
        pipeline = [
            {
                "$match": {
                    "feedback_type": "email_engagement",
                    "segment_id": ObjectId(segment_id),
                }
            },
            {
                "$group": {
                    "_id": None,
                    "total_sent": {"$sum": 1},
                    "opened": {"$sum": {"$cond": ["$opened", 1, 0]}},
                    "clicked": {"$sum": {"$cond": ["$clicked", 1, 0]}},
                    "converted": {"$sum": {"$cond": ["$converted", 1, 0]}},
                    "unsubscribed": {"$sum": {"$cond": ["$unsubscribed", 1, 0]}},
                }
            },
        ]

        results = list(self._feedback_collection().aggregate(pipeline))
        if not results:
            return {"error": "No data"}

        return self._rates(results[0])

    def get_sentiment_accuracy(self) -> Dict:
        """Return sentiment accuracy derived from recorded corrections.

        Returns ``{"error": "No corrections recorded"}`` when there are none.
        """
        corrections = list(self._feedback_collection().find({
            "feedback_type": "sentiment_correction"
        }))

        if not corrections:
            return {"error": "No corrections recorded"}

        return self._summarize_corrections(corrections)

    def get_retraining_dataset(self) -> tuple:
        """Build a sentiment retraining dataset from recorded corrections.

        Each correction is resolved to its analysis result and then to the
        original comment text; corrections whose analysis or comment cannot
        be found are skipped.

        Returns:
            ``(texts, labels)`` -- comment texts and their corrected labels.
        """
        corrections = list(self._feedback_collection().find({
            "feedback_type": "sentiment_correction"
        }))

        # Analysis results referenced by the corrections, keyed by str(_id)
        analysis_ids = [c["analysis_id"] for c in corrections]
        analyses = {
            str(a["_id"]): a
            for a in db.sentiment_results.find({"_id": {"$in": analysis_ids}})
        }

        # Original comment texts, keyed by str(_id)
        source_ids = [
            analyses[str(c["analysis_id"])]["source_id"]
            for c in corrections
            if str(c["analysis_id"]) in analyses
        ]
        comments = {
            str(c["_id"]): c.get("CommentText", "")
            for c in db.user_comment_post.find({"_id": {"$in": source_ids}})
        }

        texts = []
        labels = []
        for c in corrections:
            analysis = analyses.get(str(c["analysis_id"]))
            if analysis is None:
                continue
            source_key = str(analysis["source_id"])
            if source_key in comments:
                texts.append(comments[source_key])
                labels.append(c["corrected_label"])

        print(f"✓ Built retraining dataset: {len(texts)} samples")
        return texts, labels

    # Backward-compatible alias: the method was originally published under
    # this misspelled name.
    get_retaining_dataset = get_retraining_dataset
221
+
222
+
223
+ # Global feedback collector
224
+ feedback = FeedbackCollector()
services/genai_service.py ADDED
@@ -0,0 +1,332 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Event-Centric Generative AI Service
3
+ Author: AI Generated
4
+ Created: 2025-11-24 (Refactored)
5
+ Purpose: Generate marketing content and insights with event context
6
+ """
7
+
8
+ from llama_cpp import Llama
9
+ from typing import Dict, List
10
+ from datetime import datetime
11
+ from bson import ObjectId
12
+
13
+ from database import db
14
+ from config import settings
15
+ from models.event_models import EventSentimentSummary, AIInsights, MarketingContent
16
+ from services.monitoring import monitor
17
+ from services.model_registry import registry
18
+
19
+
20
+ class GenerativeAIService:
21
+ """
22
+ Event-centric GenAI using Vistral-7B-Chat.
23
+ """
24
+
25
+ def __init__(self, event_code: str):
26
+ """
27
+ Initialize for a specific event.
28
+
29
+ Args:
30
+ event_code: Event identifier
31
+ """
32
+ self.event_code = event_code
33
+ self.model_path = settings.LLM_LOCAL_PATH
34
+ self.llm = None
35
+
36
+ def load_model(self):
37
+ """Load Vistral-7B-Chat model"""
38
+ print(f"🔄 Loading Vistral-7B-Chat from {self.model_path}")
39
+ self.llm = Llama(
40
+ model_path=self.model_path,
41
+ n_ctx=2048,
42
+ n_threads=4,
43
+ n_gpu_layers=0 # CPU only
44
+ )
45
+ print("✓ Model loaded")
46
+
47
+ def generate_email_for_segment(self, segment: Dict) -> MarketingContent:
48
+ """
49
+ Generate personalized email for a segment.
50
+
51
+ Task: NLG for Personalization with Event Context
52
+ """
53
+ if not self.llm:
54
+ self.load_model()
55
+
56
+ # Get event info
57
+ event = db.event_versions.find_one({"_id": self.event_code})
58
+ event_name = event.get("EventName", "Sự kiện") if event else "Sự kiện"
59
+
60
+ # Build prompt with event context
61
+ prompt = f"""Bạn là chuyên gia marketing sự kiện.
62
+
63
+ Sự kiện: {event_name}
64
+ Mã sự kiện: {self.event_code}
65
+
66
+ Phân khúc khách hàng: {segment['segment_name']}
67
+ Đặc điểm:
68
+ - Số vé trung bình: {segment['criteria'].get('event_tickets', 0):.1f}
69
+ - Chi tiêu trung bình: {segment['criteria'].get('event_spend', 0):,.0f} VNĐ
70
+ - Giá trị khách hàng toàn cục: {segment['criteria'].get('global_monetary', 0):,.0f} VNĐ
71
+
72
+ Nhiệm vụ: Tạo email marketing cá nhân hóa cho phân khúc này.
73
+
74
+ Định dạng:
75
+ SUBJECT: [tiêu đề email hấp dẫn]
76
+ BODY:
77
+ [nội dung email bằng tiếng Việt, 3-4 đoạn văn, tập trung vào giá trị cho khách hàng]
78
+ """
79
+
80
+ response = self.llm(
81
+ prompt,
82
+ max_tokens=512,
83
+ temperature=0.7,
84
+ stop=["</s>", "###"]
85
+ )
86
+
87
+ generated_text = response['choices'][0]['text']
88
+
89
+ # Parse response
90
+ lines = generated_text.split('\n')
91
+ subject = ""
92
+ body_lines = []
93
+
94
+ for line in lines:
95
+ if line.startswith("SUBJECT:"):
96
+ subject = line.replace("SUBJECT:", "").strip()
97
+ elif line.startswith("BODY:"):
98
+ continue
99
+ elif line.strip():
100
+ body_lines.append(line.strip())
101
+
102
+ if not subject:
103
+ subject = f"Ưu đãi đặc biệt cho {segment['segment_name']}"
104
+
105
+ body = "\n\n".join(body_lines) if body_lines else generated_text
106
+
107
+ return MarketingContent(
108
+ email_subject=subject,
109
+ email_body=body,
110
+ status="Draft",
111
+ generated_at=datetime.utcnow()
112
+ )
113
+
114
def generate_emails_for_all_segments(self):
    """
    Generate emails for all segments of this event.

    Only segments whose ``marketing_content`` is null or absent are
    processed. Each email is written back to its segment document as soon
    as it is generated, so emails produced before a failure are kept.

    Raises:
        Exception: any failure is recorded through ``monitor.log_error``
            and then re-raised unchanged.
    """
    import time
    # perf_counter() is monotonic, so durations are not skewed by clock changes.
    start_time = time.perf_counter()

    print("=" * 60)
    print(f"🚀 Generating Emails for Event: {self.event_code}")
    print("=" * 60)

    try:
        if not self.llm:
            self.load_model()

        # Find segments without marketing content
        segments = list(db.event_audience_segments.find({
            "event_code": self.event_code,
            "marketing_content": None
        }))

        if not segments:
            print("✓ All segments already have marketing content")
            return

        print(f"✓ Generating for {len(segments)} segments")

        # Timed separately from model loading so the per-item average
        # reflects generation only.
        generation_started = time.perf_counter()

        for segment in segments:
            print(f"\n🔄 {segment['segment_name']}...")

            email_content = self.generate_email_for_segment(segment)

            # Update segment
            db.event_audience_segments.update_one(
                {"_id": segment['_id']},
                {"$set": {
                    "marketing_content": email_content.dict(),
                    "last_updated": datetime.utcnow()
                }}
            )

            print(f" ✓ Subject: {email_content.email_subject[:50]}...")

        generation_time = time.perf_counter() - generation_started

        # Save prompt template
        # NOTE(review): the second argument is stored as the template text,
        # but a description is passed here rather than the prompt — confirm.
        registry.save_prompt_template(
            f"email_gen_{self.event_code}",
            "Email generation prompt for event segments",
            {"event_code": self.event_code, "version": "1.0"}
        )

        # Monitoring
        execution_time = time.perf_counter() - start_time
        metrics = {
            "event_code": self.event_code,
            "n_generated": len(segments),
            # segments is non-empty here (early return above)
            "avg_generation_time": generation_time / len(segments),
            "total_time": execution_time
        }
        monitor.log_genai_run("email_generation", metrics)

        print("=" * 60)
        print("✅ Email Generation Complete!")
        print(f"⏱️ Time: {execution_time:.2f}s")
        print("=" * 60)

    except Exception as e:
        monitor.log_error("genai_email", e, {
            "event_code": self.event_code
        })
        raise
183
+
184
def generate_insights_from_sentiment(self) -> AIInsights:
    """
    Generate AI insights from negative comments.

    Task: Prompted Generative Layer for Insight Extraction

    Up to 50 sentiment results labelled "Negative" for this event are read,
    their comment texts are looked up, and the first 20 texts (truncated to
    100 characters each) are sent to the LLM. The completion is parsed for
    the TOP_ISSUES / SUGGESTIONS / NPS sections requested in the prompt.

    Returns:
        AIInsights with a summary, at most 5 issues, at most 5 suggestions
        and a predicted NPS. The NPS is 70.0 when the event has no negative
        feedback and 60.0 when the completion contains no parsable score.
    """
    import re

    if not self.llm:
        self.load_model()

    negative_filter = {
        "event_code": self.event_code,
        "sentiment_label": "Negative"
    }

    # Get negative comments (sample capped at 50 to bound prompt work)
    negative_results = list(db.sentiment_results.find(negative_filter).limit(50))

    if not negative_results:
        return AIInsights(
            summary="Không có phản hồi tiêu cực.",
            top_issues=[],
            improvement_suggestions=[],
            predicted_nps=70.0
        )

    # Get comment texts
    comment_ids = [r['source_id'] for r in negative_results]
    comments = list(db.user_comment_post.find({
        "_id": {"$in": comment_ids}
    }))

    negative_texts = [c.get('CommentText', '') for c in comments if c.get('CommentText')]

    # Build prompt
    comments_text = "\n".join([f"- {text[:100]}" for text in negative_texts[:20]])

    prompt = f"""Bạn là chuyên gia phân tích feedback sự kiện.

Sự kiện: {self.event_code}
Số feedback tiêu cực: {len(negative_texts)}

Feedback tiêu cực:
{comments_text}

Nhiệm vụ: Phân tích và đưa ra:
1. TOP 5 vấn đề chính (mỗi vấn đề 1 dòng)
2. ĐỀ XUẤT cải thiện (3-5 đề xuất cụ thể)
3. DỰ ĐOÁN NPS (điểm từ 0-100)

Định dạng:
TOP_ISSUES:
1. [vấn đề 1]
2. [vấn đề 2]
...

SUGGESTIONS:
- [đề xuất 1]
- [đề xuất 2]
...

NPS: [số]
"""

    response = self.llm(
        prompt,
        max_tokens=512,
        temperature=0.7
    )

    generated = response['choices'][0]['text']

    # Parse
    top_issues = []
    suggestions = []
    predicted_nps = 60.0

    # Leading list marker: "-", "*", "•", "1.", "2)", "3:" or a bare number.
    # Only the marker is removed, so digits inside the item text survive.
    item_marker = re.compile(r"^(?:[-*•]+|\d+\s*[.):]?)\s*")
    number = re.compile(r"-?\d+(?:[.,]\d+)?")
    current_section = None

    for raw_line in generated.split('\n'):
        line = raw_line.strip()
        if not line:
            # Blank lines separate the sections in the requested format;
            # they carry no item and must not be indexed.
            continue

        if line.startswith("TOP_ISSUES:"):
            current_section = "issues"
        elif line.startswith("SUGGESTIONS:"):
            current_section = "suggestions"
        elif line.startswith("NPS:"):
            # Accept answers such as "NPS: 45", "NPS: 45/100" or "NPS: 45,5".
            found = number.search(line.split(":", 1)[1])
            if found:
                value = float(found.group().replace(",", "."))
                # The prompt asks for a 0-100 score; clamp stray values.
                predicted_nps = min(100.0, max(0.0, value))
        elif current_section in ("issues", "suggestions"):
            marker = item_marker.match(line)
            if not marker:
                continue
            item = line[marker.end():].strip()
            if not item:
                continue
            if current_section == "issues":
                top_issues.append(item)
            else:
                suggestions.append(item)

    # Create summary
    total_comments = db.sentiment_results.count_documents({"event_code": self.event_code})
    # Count every negative result; negative_results is only the capped sample.
    negative_count = db.sentiment_results.count_documents(negative_filter)
    negative_pct = (negative_count / total_comments * 100) if total_comments > 0 else 0

    summary = f"Sự kiện nhận được {total_comments} phản hồi, trong đó {negative_count} ({negative_pct:.1f}%) phản hồi tiêu cực. "
    if top_issues:
        summary += f"Vấn đề chính: {top_issues[0]}."

    return AIInsights(
        summary=summary,
        top_issues=top_issues[:5],
        improvement_suggestions=suggestions[:5],
        predicted_nps=predicted_nps
    )
295
+
296
def update_sentiment_summary_with_insights(self):
    """
    Build AI insights for this event and store them on its sentiment summary.

    The summary document is matched by ``event_code`` and created when it
    does not exist yet (upsert). Returns the generated insights.

    Raises:
        Exception: any failure is reported via ``monitor.log_error`` and
            re-raised unchanged.
    """
    banner = "=" * 60

    print(banner)
    print(f"🚀 Generating Insights for Event: {self.event_code}")
    print(banner)

    try:
        insights = self.generate_insights_from_sentiment()

        # Update summary
        summary_filter = {"event_code": self.event_code}
        summary_changes = {
            "$set": {
                "ai_insights": insights.dict(),
                "last_updated": datetime.utcnow(),
            }
        }
        db.event_sentiment_summary.update_one(
            summary_filter,
            summary_changes,
            upsert=True,
        )

        report = (
            "✓ Insights generated:",
            f" Top Issues: {len(insights.top_issues)}",
            f" Suggestions: {len(insights.improvement_suggestions)}",
            f" Predicted NPS: {insights.predicted_nps}",
            banner,
            "✅ Insights Complete!",
            banner,
        )
        for report_line in report:
            print(report_line)

        return insights

    except Exception as e:
        error_context = {"event_code": self.event_code}
        monitor.log_error("genai_insights", e, error_context)
        raise
services/model_registry.py ADDED
@@ -0,0 +1,191 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Model Registry & Versioning Module
3
+ Author: AI Generated
4
+ Created: 2025-11-24
5
+ Purpose: Track and version AI models and configurations
6
+ """
7
+
8
+ import pickle
9
+ import json
10
+ from datetime import datetime
11
+ from pathlib import Path
12
+ from typing import Any, Dict, Optional
13
+ import hashlib
14
+
15
+ from database import db
16
+
17
+
18
class ModelRegistry:
    """
    Manage model versions and configurations.
    Stores models locally and metadata in MongoDB.
    """

    def __init__(self, storage_dir: str = "./model_storage"):
        """
        Args:
            storage_dir: Directory for model and prompt files; created
                (including missing parents) when it does not exist.
        """
        self.storage_dir = Path(storage_dir)
        # parents=True so a nested storage path works on first use.
        self.storage_dir.mkdir(parents=True, exist_ok=True)
        self.collection = "ModelRegistry"

    def _generate_version_id(self, model_name: str) -> str:
        """Generate a version ID from the current UTC timestamp."""
        # Microseconds are included because the ID is also the file name:
        # with second resolution, two saves in the same second would
        # overwrite each other's file.
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
        return f"{model_name}_v{timestamp}"

    def _calculate_hash(self, file_path: Path) -> str:
        """Calculate MD5 hash of model file for integrity check."""
        md5_hash = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(65536), b""):
                md5_hash.update(chunk)
        return md5_hash.hexdigest()

    def save_model(self, model: Any, model_name: str, metadata: Optional[Dict] = None) -> str:
        """
        Save a model with versioning.

        Args:
            model: The model object (sklearn, torch, etc.)
            model_name: Name identifier (e.g., "kmeans_segmentation")
            metadata: Additional info (params, metrics, etc.)

        Returns:
            version_id: Unique version identifier
        """
        version_id = self._generate_version_id(model_name)
        model_path = self.storage_dir / f"{version_id}.pkl"

        # Save model file
        with open(model_path, 'wb') as f:
            pickle.dump(model, f)

        # Save metadata to MongoDB
        doc = {
            "version_id": version_id,
            "model_name": model_name,
            "file_path": str(model_path),
            "file_hash": self._calculate_hash(model_path),
            "file_size": model_path.stat().st_size,
            "created_at": datetime.utcnow(),
            "metadata": metadata or {},
            "status": "active"
        }

        db.get_collection(self.collection).insert_one(doc)

        print(f"✓ Saved model: {version_id}")
        return version_id

    def load_model(self, version_id: str = None, model_name: str = None) -> tuple:
        """
        Load a model by version_id or latest version of model_name.

        ``version_id`` takes precedence when both are given. Lookup by
        ``model_name`` only considers versions whose status is "active".

        Returns: (model, metadata)

        Raises:
            ValueError: neither identifier given, no matching record, or the
                file hash differs from the recorded one.
            FileNotFoundError: the recorded model file no longer exists.
        """
        if version_id:
            doc = db.get_collection(self.collection).find_one({"version_id": version_id})
        elif model_name:
            # Get latest version
            doc = db.get_collection(self.collection).find_one(
                {"model_name": model_name, "status": "active"},
                sort=[("created_at", -1)]
            )
        else:
            raise ValueError("Must provide version_id or model_name")

        if not doc:
            raise ValueError("Model not found")

        # Load model file
        model_path = Path(doc["file_path"])
        if not model_path.is_file():
            raise FileNotFoundError(f"Model file missing: {model_path}")

        # Verify integrity
        current_hash = self._calculate_hash(model_path)
        if current_hash != doc["file_hash"]:
            raise ValueError("Model file corrupted (hash mismatch)")

        # SECURITY: unpickling executes code embedded in the file. The MD5
        # comparison above only detects accidental corruption; it is not
        # tamper protection. Load only from trusted storage.
        with open(model_path, 'rb') as f:
            model = pickle.load(f)

        print(f"✓ Loaded model: {doc['version_id']}")
        return model, doc["metadata"]

    def save_prompt_template(self, template_name: str, prompt: str, metadata: Optional[Dict] = None) -> str:
        """
        Save a prompt template with versioning.

        The prompt text is written to a UTF-8 file in the storage directory
        and a metadata record is inserted into "PromptTemplates".

        Returns:
            version_id of the stored template.
        """
        version_id = self._generate_version_id(template_name)

        # Save to file
        template_path = self.storage_dir / f"{version_id}.txt"
        with open(template_path, 'w', encoding='utf-8') as f:
            f.write(prompt)

        # Save metadata
        doc = {
            "version_id": version_id,
            "template_name": template_name,
            "file_path": str(template_path),
            "created_at": datetime.utcnow(),
            "metadata": metadata or {},
            "status": "active"
        }

        db.get_collection("PromptTemplates").insert_one(doc)

        print(f"✓ Saved prompt template: {version_id}")
        return version_id

    def load_prompt_template(self, version_id: str = None, template_name: str = None) -> str:
        """
        Load a prompt template by version_id, or the latest active version
        of template_name.

        Raises:
            ValueError: neither identifier given, or no matching record.
        """
        if version_id:
            doc = db.get_collection("PromptTemplates").find_one({"version_id": version_id})
        elif template_name:
            doc = db.get_collection("PromptTemplates").find_one(
                {"template_name": template_name, "status": "active"},
                sort=[("created_at", -1)]
            )
        else:
            raise ValueError("Must provide version_id or template_name")

        if not doc:
            raise ValueError("Template not found")

        with open(doc["file_path"], 'r', encoding='utf-8') as f:
            return f.read()

    def list_versions(self, model_name: str) -> list:
        """
        List all versions of a model, newest first.
        """
        versions = db.get_collection(self.collection).find(
            {"model_name": model_name},
            sort=[("created_at", -1)]
        )

        return [{
            "version_id": v["version_id"],
            "created_at": v["created_at"],
            "status": v["status"],
            "metadata": v.get("metadata", {})
        } for v in versions]

    def archive_version(self, version_id: str):
        """
        Archive (deactivate) a model version.

        Prints a warning instead of a success message when no record has
        the given version_id.
        """
        result = db.get_collection(self.collection).update_one(
            {"version_id": version_id},
            {"$set": {"status": "archived"}}
        )
        if result.matched_count:
            print(f"✓ Archived model: {version_id}")
        else:
            print(f"⚠ No model version found to archive: {version_id}")


# Global registry instance
registry = ModelRegistry()
services/monitoring.py ADDED
@@ -0,0 +1,232 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Logging & Monitoring Module
3
+ Author: AI Generated
4
+ Created: 2025-11-24
5
+ Purpose: Track pipeline performance, errors, and model drift
6
+ """
7
+
8
+ import logging
9
+ from datetime import datetime
10
+ from typing import Dict, Any, Optional
11
+ import json
12
+ from pathlib import Path
13
+ import numpy as np
14
+
15
+ from database import db
16
+
17
+
18
# Configure logging
LOG_DIR = Path("logs")
LOG_DIR.mkdir(exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8: log messages contain emoji and Vietnamese text,
        # which a platform-default encoding (e.g. cp1252) cannot write.
        logging.FileHandler(LOG_DIR / 'pipeline.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)
32
+
33
+
34
class PipelineMonitor:
    """
    Monitor AI pipeline performance and log metrics.
    """

    def __init__(self):
        self.metrics_collection = "PipelineMetrics"

    def _store_run(self, pipeline: str, metrics: Dict[str, Any], **extra_fields):
        """Insert one run record for *pipeline* into the metrics collection."""
        doc = {
            "pipeline": pipeline,
            **extra_fields,
            "timestamp": datetime.utcnow(),
            "metrics": metrics
        }
        db.get_collection(self.metrics_collection).insert_one(doc)

    def log_segmentation_run(self, metrics: Dict[str, Any]):
        """
        Log segmentation pipeline metrics.

        Metrics should include:
        - n_users: Number of users processed
        - n_segments: Number of segments created
        - inertia: K-means inertia
        - execution_time: Time in seconds
        - outliers_removed: Count
        """
        logger.info("Segmentation Run: %s", metrics)

        # Save to MongoDB for trend analysis
        self._store_run("segmentation", metrics)

    def log_sentiment_run(self, metrics: Dict[str, Any]):
        """
        Log sentiment analysis metrics.

        Metrics should include:
        - n_comments: Number of comments analyzed
        - sentiment_distribution: {Positive: X, Negative: Y, Neutral: Z}
        - avg_confidence: Average confidence score
        - execution_time: Time in seconds
        """
        logger.info("Sentiment Analysis Run: %s", metrics)

        self._store_run("sentiment", metrics)

    def log_genai_run(self, task: str, metrics: Dict[str, Any]):
        """
        Log Generative AI metrics.

        Metrics should include:
        - n_generated: Number of items generated
        - avg_generation_time: Average time per item
        - total_time: Total execution time
        """
        logger.info("GenAI Run (%s): %s", task, metrics)

        self._store_run("genai", metrics, task=task)

    def log_error(self, pipeline: str, error: Exception, context: Optional[Dict] = None):
        """
        Log pipeline errors.

        The error is written to the log with its traceback and stored in
        "PipelineErrors". Storing is best-effort: callers invoke this from
        an ``except`` block right before re-raising, so a database failure
        here must not replace the error being reported.
        """
        # Passing the exception object (not True) attaches its traceback
        # even when this is called outside an active ``except`` block.
        logger.error("Error in %s: %s", pipeline, error, exc_info=error)

        doc = {
            "pipeline": pipeline,
            "timestamp": datetime.utcnow(),
            "error": str(error),
            "error_type": type(error).__name__,
            "context": context or {}
        }
        try:
            db.get_collection("PipelineErrors").insert_one(doc)
        except Exception:
            logger.exception("Could not persist error record for %s", pipeline)

    def detect_drift_segmentation(
        self,
        current_centroids: np.ndarray,
        event_code: Optional[str] = None,
        threshold: float = 0.5
    ) -> Dict:
        """
        Detect drift in K-means clustering.
        Compare current centroids with previous run.

        Args:
            current_centroids: Centroid matrix of the current run.
            event_code: When given, only runs logged with this
                ``metrics.event_code`` are used as baseline; otherwise the
                most recent segmentation run of any event is used.
            threshold: Average centroid distance above which drift is
                reported.

        Call this before logging the current run, otherwise the current run
        becomes its own baseline.
        """
        query = {"pipeline": "segmentation"}
        if event_code is not None:
            query["metrics.event_code"] = event_code

        # Fetch last run's centroids
        last_metric = db.get_collection(self.metrics_collection).find_one(
            query,
            sort=[("timestamp", -1)]
        )

        if not last_metric or "centroids" not in last_metric["metrics"]:
            logger.info("No previous centroids found for drift detection")
            return {"drift_detected": False, "reason": "no_baseline"}

        # Calculate drift as Euclidean distance between centroids
        current_centroids = np.asarray(current_centroids, dtype=float)
        prev_centroids = np.array(last_metric["metrics"]["centroids"], dtype=float)

        if prev_centroids.shape != current_centroids.shape:
            return {"drift_detected": True, "reason": "shape_mismatch"}

        # NOTE(review): centroids are compared row by row, which assumes
        # cluster i of both runs describes the same segment — confirm that
        # the label order is stable between runs.
        distances = np.linalg.norm(current_centroids - prev_centroids, axis=1)
        avg_drift = float(np.mean(distances))
        max_drift = float(np.max(distances))

        drift_detected = avg_drift > threshold

        result = {
            "drift_detected": drift_detected,
            "avg_drift": avg_drift,
            "max_drift": max_drift,
            "threshold": threshold
        }

        if drift_detected:
            logger.warning(f"⚠️ Cluster drift detected: avg={avg_drift:.3f}, max={max_drift:.3f}")

        return result

    def detect_drift_sentiment(
        self,
        current_distribution: Dict[str, int],
        event_code: Optional[str] = None,
        threshold: float = 0.1
    ) -> Dict:
        """
        Detect drift in sentiment distribution.

        Args:
            current_distribution: Label -> count for the current run.
            event_code: When given, only runs logged with this
                ``metrics.event_code`` are used as baseline.
            threshold: Largest absolute change in any label's share above
                which drift is reported.
        """
        query = {"pipeline": "sentiment"}
        if event_code is not None:
            query["metrics.event_code"] = event_code

        # Fetch last run's distribution
        last_metric = db.get_collection(self.metrics_collection).find_one(
            query,
            sort=[("timestamp", -1)]
        )

        if not last_metric:
            return {"drift_detected": False, "reason": "no_baseline"}

        prev_dist = last_metric["metrics"].get("sentiment_distribution", {})

        # Calculate total counts
        prev_total = sum(prev_dist.values())
        curr_total = sum(current_distribution.values())

        if prev_total == 0 or curr_total == 0:
            return {"drift_detected": False, "reason": "insufficient_data"}

        # Absolute change in each sentiment's share of the total
        changes = {
            label: abs(
                current_distribution.get(label, 0) / curr_total
                - prev_dist.get(label, 0) / prev_total
            )
            for label in ("Positive", "Negative", "Neutral")
        }

        max_change = max(changes.values())
        drift_detected = max_change > threshold

        result = {
            "drift_detected": drift_detected,
            "changes": changes,
            "max_change": max_change,
            "threshold": threshold
        }

        if drift_detected:
            logger.warning(f"⚠️ Sentiment drift detected: max_change={max_change:.1%}")

        return result

    def get_performance_summary(self, pipeline: str, days: int = 7) -> Dict:
        """
        Get performance summary for the last N days.

        Returns ``{"error": "No metrics found"}`` when no run was logged in
        that period.
        """
        from datetime import timedelta

        cutoff = datetime.utcnow() - timedelta(days=days)

        metrics = list(db.get_collection(self.metrics_collection).find({
            "pipeline": pipeline,
            "timestamp": {"$gte": cutoff}
        }).sort("timestamp", -1))

        if not metrics:
            return {"error": "No metrics found"}

        # GenAI runs record their duration as "total_time" rather than
        # "execution_time"; accept either key.
        durations = [
            m["metrics"].get("execution_time", m["metrics"].get("total_time", 0))
            for m in metrics
        ]

        return {
            "pipeline": pipeline,
            "period_days": days,
            "total_runs": len(metrics),
            # plain float so the summary can be serialized as JSON/BSON
            "avg_execution_time": float(np.mean(durations)),
            "last_run": metrics[0]["timestamp"]
        }


# Global monitor instance
monitor = PipelineMonitor()
services/preprocessing.py ADDED
@@ -0,0 +1,191 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Data Preprocessing & Cleaning Module
3
+ Author: AI Generated
4
+ Created: 2025-11-24
5
+ Purpose: Clean and preprocess data before AI processing
6
+ """
7
+
8
+ import re
9
+ from typing import List, Dict
10
+ import numpy as np
11
+ from pyvi import ViTokenizer
12
+ from sklearn.preprocessing import StandardScaler
13
+
14
+
15
class VietnameseTextCleaner:
    """
    Clean and preprocess Vietnamese text for NLP tasks.
    """

    # Vietnamese stopwords
    STOP_WORDS = {
        'và', 'của', 'có', 'là', 'được', 'này', 'cho', 'với', 'các',
        'đã', 'trong', 'không', 'rất', 'một', 'để', 'những', 'cũng',
        'về', 'từ', 'hay', 'bị', 'như', 'làm', 'đó', 'lại', 'sẽ',
        'thì', 'nếu', 'khi', 'mà', 'hoặc', 'nên', 'trên', 'dưới'
    }

    # Patterns compiled once; clean_text runs for every comment.
    _HTML_TAG_RE = re.compile(r'<[^>]+>')
    _URL_RE = re.compile(r'http\S+|www\.\S+')
    _EMAIL_RE = re.compile(r'\S+@\S+')
    # Everything except ASCII letters, precomposed lowercase Vietnamese
    # letters and whitespace.
    _NON_LETTER_RE = re.compile(
        r'[^a-zA-ZàáảãạăắằẵặẳâấầẩẫậèéẻẽẹêếềểễệìíỉĩịòóỏõọôốồổỗộơớờởỡợùúủũụưứừửữựỳýỷỹỵđĐ\s]'
    )
    _WHITESPACE_RE = re.compile(r'\s+')

    def __init__(self):
        self.tokenizer = ViTokenizer

    def clean_text(self, text: str) -> str:
        """
        Clean Vietnamese text:
        - Normalize to NFC and lowercase
        - Remove HTML tags, URLs and emails
        - Remove special characters (digits and punctuation included)
        - Normalize whitespace

        Returns "" for empty or None input.
        """
        import unicodedata

        if not text:
            return ""

        # Normalize and lowercase FIRST: the letter allow-list below only
        # contains precomposed lowercase accented letters, so uppercase
        # ("Ấ") or decomposed (NFD) letters would otherwise be deleted.
        text = unicodedata.normalize("NFC", text).lower()

        # Remove HTML tags
        text = self._HTML_TAG_RE.sub('', text)

        # Remove URLs
        text = self._URL_RE.sub('', text)

        # Remove emails
        text = self._EMAIL_RE.sub('', text)

        # Remove special characters (keep Vietnamese)
        text = self._NON_LETTER_RE.sub(' ', text)

        # Normalize whitespace
        return self._WHITESPACE_RE.sub(' ', text).strip()

    def tokenize(self, text: str) -> List[str]:
        """
        Tokenize Vietnamese text using pyvi.
        Returns list of words (empty when nothing is left after cleaning).
        """
        text = self.clean_text(text)
        if not text:
            return []

        # Use pyvi for Vietnamese word segmentation
        return self.tokenizer.tokenize(text).split()

    def remove_stopwords(self, words: List[str]) -> List[str]:
        """
        Remove Vietnamese stopwords.
        """
        return [w for w in words if w not in self.STOP_WORDS]

    def preprocess_for_sentiment(self, text: str) -> str:
        """
        Preprocess text for PhoBERT sentiment analysis.
        PhoBERT expects word-segmented text.
        """
        # Clean and tokenize, then join back with spaces (word-segmented format)
        return ' '.join(self.tokenize(text))

    def extract_keywords(self, text: str, top_n: int = 5) -> List[str]:
        """
        Extract keywords from text.
        Simple TF approach without stopwords.

        Words of one or two characters are ignored; ties keep first-seen
        order.
        """
        words = self.remove_stopwords(self.tokenize(text))

        # Count frequency
        word_freq = {}
        for word in words:
            if len(word) > 2:  # Filter very short words
                word_freq[word] = word_freq.get(word, 0) + 1

        # Get top N
        top_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:top_n]
        return [word for word, _ in top_words]
111
+
112
+
113
class DataCleaner:
    """
    Clean and validate user feature data for clustering.
    """

    def __init__(self):
        self.scaler = StandardScaler()

    def remove_outliers(self, data: np.ndarray, threshold: float = 3.0) -> tuple:
        """
        Remove outliers using Z-score method.

        A row is kept only when every feature's absolute z-score is below
        *threshold*.

        Returns: (cleaned_data, valid_indices)
        """
        data = np.asarray(data)
        if data.shape[0] == 0:
            # Nothing to score; avoid mean/std of an empty array.
            return data, np.arange(0)

        # Calculate z-scores
        std = data.std(axis=0)
        # A constant column has std 0; dividing by it would yield NaN
        # z-scores and every row would be rejected. Dividing by 1 instead
        # gives those columns a z-score of 0.
        safe_std = np.where(std == 0, 1.0, std)
        z_scores = np.abs((data - data.mean(axis=0)) / safe_std)

        # Find rows without extreme outliers
        valid_indices = np.where(np.all(z_scores < threshold, axis=1))[0]

        cleaned_data = data[valid_indices]

        removed_count = len(data) - len(cleaned_data)
        if removed_count > 0:
            print(f" ⚠ Removed {removed_count} outliers ({removed_count/len(data)*100:.1f}%)")

        return cleaned_data, valid_indices

    def handle_missing_values(self, data: np.ndarray) -> np.ndarray:
        """
        Handle missing values (NaN, inf) by replacing with median.

        Works on a float copy, so the caller's array is not modified.
        A column without any finite value is filled with 0.
        """
        data = np.array(data, dtype=float)
        if data.size == 0:
            return data

        # Replace inf with NaN
        data[np.isinf(data)] = np.nan

        # Replace NaN with column median
        col_median = np.nanmedian(data, axis=0)
        # An all-NaN column has no median; fall back to 0 so no NaN survives.
        col_median = np.where(np.isnan(col_median), 0.0, col_median)
        inds = np.where(np.isnan(data))
        data[inds] = np.take(col_median, inds[1])

        return data

    def normalize_features(self, data: np.ndarray, fit: bool = True) -> np.ndarray:
        """
        Standardize features using StandardScaler.

        Args:
            data: Feature matrix
            fit: If True, fit scaler. If False, use existing scaler.
        """
        if fit:
            return self.scaler.fit_transform(data)
        return self.scaler.transform(data)

    def clean_user_features(self, feature_matrix: np.ndarray, remove_outliers: bool = True) -> tuple:
        """
        Complete cleaning pipeline for user features.

        Steps: fill missing values, optionally drop outlier rows, then
        standardize (the scaler is re-fitted on every call).

        Returns: (cleaned_features, valid_indices) where valid_indices are
        the row positions of *feature_matrix* that were kept.
        """
        print("🔄 Cleaning user feature data...")

        # Step 1: Handle missing values
        data = self.handle_missing_values(feature_matrix)
        print(f" ✓ Handled missing values")

        # Step 2: Remove outliers (optional)
        if remove_outliers:
            data, valid_indices = self.remove_outliers(data)
        else:
            valid_indices = np.arange(len(data))

        # Step 3: Normalize
        data = self.normalize_features(data, fit=True)
        print(f" ✓ Normalized {data.shape[0]} samples")

        return data, valid_indices
services/segmentation_service.py ADDED
@@ -0,0 +1,292 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Event-Centric Audience Segmentation Service
3
+ Author: AI Generated
4
+ Created: 2025-11-24 (Refactored for event-centric)
5
+ Purpose: Cluster users per event and save to EventAudienceSegment
6
+ """
7
+
8
+ import numpy as np
9
+ from typing import List, Dict, Tuple
10
+ from datetime import datetime
11
+ from sklearn.cluster import KMeans
12
+ from bson import ObjectId
13
+
14
+ from database import db
15
+ from config import settings
16
+ from models.event_models import EventAudienceSegment, MarketingContent
17
+ from services.data_aggregation import UserDataAggregator
18
+ from services.preprocessing import DataCleaner
19
+ from services.monitoring import monitor
20
+ from services.model_registry import registry
21
+
22
+
23
class SegmentationService:
    """
    Event-centric user segmentation via K-Means clustering.
    """

    # (feature name, key in the aggregated user record, default if absent).
    # One table keeps matrix columns and feature names in the same order.
    _FEATURE_SPEC = (
        ('event_tickets', 'event_ticket_count', 0),      # event-specific
        ('event_spend', 'event_total_spend', 0),         # event-specific
        ('is_follower', 'is_follower', 0),               # event-specific
        ('global_recency', 'global_recency', 999999),    # global RFM
        ('global_frequency', 'global_frequency', 0),     # global RFM
        ('global_monetary', 'global_monetary', 0),       # global RFM
    )

    # Centroid values come back from an inverse transform, so "zero tickets"
    # can show up as a tiny non-zero float.
    _ZERO_TICKETS_EPS = 1e-6

    def __init__(self, event_code: str, n_clusters: int = None):
        """
        Initialize segmentation for a specific event.

        Args:
            event_code: Event identifier
            n_clusters: Number of segments (default: from settings)
        """
        self.event_code = event_code
        self.n_clusters = n_clusters or settings.N_CLUSTERS
        self.aggregator = UserDataAggregator(event_code)
        self.data_cleaner = DataCleaner()
        self.kmeans = None
        self.scaler = None
        self.feature_names = []

    def prepare_feature_matrix(self, user_data: List[Dict]) -> Tuple[np.ndarray, List[str]]:
        """
        Convert aggregated user data into feature matrix.

        Uses hybrid approach:
        - Event-specific: ticket_count, spend, is_follower
        - Global: RFM (user's overall power)

        Also stores the column names in ``self.feature_names``.

        Returns: (feature_matrix, user_ids) — a float matrix with one row
        per user (shape ``(0, 6)`` for no users) and the user ids as
        strings in the same order.
        """
        feature_matrix = [
            [user.get(key, default) for _, key, default in self._FEATURE_SPEC]
            for user in user_data
        ]
        user_ids = [str(user['user_id']) for user in user_data]

        # Store feature names
        self.feature_names = [name for name, _, _ in self._FEATURE_SPEC]

        # dtype=float turns None values into NaN (filled during cleaning);
        # reshape keeps a 2-D matrix even when there are no users.
        matrix = np.array(feature_matrix, dtype=float).reshape(-1, len(self._FEATURE_SPEC))
        return matrix, user_ids

    def fit_clustering(self, feature_matrix: np.ndarray) -> Tuple[KMeans, List[int]]:
        """
        Fit K-Means with preprocessing.

        Returns: (fitted KMeans, indices of the rows that survived cleaning)

        Raises:
            ValueError: fewer rows than clusters remain after cleaning.
        """
        # Clean and normalize
        normalized_features, valid_indices = self.data_cleaner.clean_user_features(feature_matrix)

        # Save scaler for later use
        self.scaler = self.data_cleaner.scaler

        # Outlier removal can shrink the data set below the cluster count.
        remaining = normalized_features.shape[0]
        if remaining < self.n_clusters:
            raise ValueError(
                f"Only {remaining} users left after cleaning; "
                f"need at least {self.n_clusters} for clustering"
            )

        print(f"🔄 Fitting K-Means with {self.n_clusters} clusters...")
        self.kmeans = KMeans(
            n_clusters=self.n_clusters,
            random_state=settings.RANDOM_STATE,
            n_init=10
        )
        self.kmeans.fit(normalized_features)

        print(f"✓ Clustering complete. Inertia: {self.kmeans.inertia_:.2f}")

        return self.kmeans, valid_indices

    def interpret_cluster(self, cluster_id: int) -> Dict:
        """
        Interpret cluster characteristics.

        The centroid is mapped back to original feature units and a segment
        name is derived from it.
        """
        centroid = self.kmeans.cluster_centers_[cluster_id]
        centroid_original = self.scaler.inverse_transform([centroid])[0]

        interpretation = {
            feature_name: float(value)
            for feature_name, value in zip(self.feature_names, centroid_original)
        }

        # Generate segment name
        segment_name = self._generate_segment_name(
            interpretation.get('event_spend', 0),
            interpretation.get('event_tickets', 0),
            interpretation.get('global_monetary', 0),
            interpretation.get('is_follower', 0)
        )

        return {
            "segment_name": segment_name,
            "criteria": interpretation,
            "cluster_id": cluster_id
        }

    def _generate_segment_name(
        self,
        event_spend: float,
        event_tickets: float,
        global_monetary: float,
        is_follower: float
    ) -> str:
        """Generate Vietnamese segment name from cluster-average features."""

        # High spenders on this event
        if event_spend > 500000 and event_tickets > 2:
            return "Khách Hàng VIP Sự Kiện"

        # Bought tickets but moderate spend
        if event_tickets > 0 and event_spend > 100000:
            return "Khách Hàng Tích Cực"

        # Only followers, no tickets yet (tolerance instead of == 0: the
        # value is a float centroid, not an exact count)
        if is_follower > 0.5 and event_tickets < self._ZERO_TICKETS_EPS:
            return "Người Theo Dõi Tiềm Năng"

        # High global value but low event engagement
        if global_monetary > 1000000 and event_spend < 100000:
            return "Khách Hàng Chưa Khai Phá"

        # Low event engagement
        return "Khách Hàng Ít Tương Tác"

    def save_segments_to_db(
        self,
        cluster_interpretations: List[Dict],
        user_ids: List[str],
        labels: np.ndarray
    ) -> List[ObjectId]:
        """
        Save to EventAudienceSegment collection.

        One document is inserted per cluster; ``user_ids[i]`` must belong to
        ``labels[i]``.

        Returns: inserted document ids, in cluster order.
        """
        print("🔄 Saving event segments to database...")

        segment_ids = []

        for cluster_info in cluster_interpretations:
            cluster_id = cluster_info['cluster_id']

            # Get user_ids in this cluster
            cluster_user_indices = np.where(labels == cluster_id)[0]
            cluster_user_ids = [ObjectId(user_ids[i]) for i in cluster_user_indices]

            now = datetime.utcnow()
            segment = EventAudienceSegment(
                event_code=self.event_code,
                segment_name=cluster_info['segment_name'],
                segment_type=cluster_info['segment_name'],  # Can categorize further
                user_count=len(cluster_user_ids),
                user_ids=cluster_user_ids,
                criteria=cluster_info['criteria'],
                marketing_content=None,  # Will be generated by GenAI
                created_at=now,
                last_updated=now
            )

            result = db.event_audience_segments.insert_one(
                segment.dict(by_alias=True, exclude={'id'})
            )
            segment_ids.append(result.inserted_id)

            print(f" ✓ '{segment.segment_name}': {len(cluster_user_ids)} users")

        return segment_ids

    def run_segmentation(self) -> List[ObjectId]:
        """
        Execute event-centric segmentation pipeline.

        Returns: ids of the inserted segment documents, or an empty list
        when the event has fewer users than clusters.

        Raises:
            Exception: any failure is reported via ``monitor.log_error`` and
                re-raised unchanged.
        """
        import time
        # perf_counter() is monotonic, so the duration is not affected by
        # wall-clock adjustments.
        start_time = time.perf_counter()

        print("=" * 60)
        print(f"🚀 Segmenting Event: {self.event_code}")
        print("=" * 60)

        try:
            # Step 1: Aggregate event users
            user_data = self.aggregator.aggregate_user_features()

            if len(user_data) < self.n_clusters:
                print(f"⚠ Not enough users ({len(user_data)}) for {self.n_clusters} clusters")
                return []

            # Step 2: Prepare features
            feature_matrix, user_ids = self.prepare_feature_matrix(user_data)
            print(f"✓ Feature matrix: {feature_matrix.shape}")

            # Step 3: Clustering
            self.kmeans, valid_indices = self.fit_clustering(feature_matrix)
            # Keep only the users whose rows survived cleaning, so that
            # user_ids lines up with the labels below.
            user_ids = [user_ids[i] for i in valid_indices]

            # Step 4: Get labels (assigned during fit, one per kept row)
            labels = self.kmeans.labels_

            # Step 5: Interpret clusters
            cluster_interpretations = [
                self.interpret_cluster(i) for i in range(self.n_clusters)
            ]

            # Step 6: Save to EventAudienceSegment
            segment_ids = self.save_segments_to_db(
                cluster_interpretations,
                user_ids,
                labels
            )

            # Step 7: Save model
            # The scaler statistics are stored next to the model: centroids
            # live in standardized space, so the model cannot be applied to
            # new users without them.
            metadata = {
                "event_code": self.event_code,
                "n_clusters": self.n_clusters,
                "n_users": len(user_ids),
                "inertia": float(self.kmeans.inertia_),
                "feature_names": list(self.feature_names),
                "scaler_mean": self.scaler.mean_.tolist(),
                "scaler_scale": self.scaler.scale_.tolist()
            }
            registry.save_model(
                self.kmeans,
                f"kmeans_{self.event_code}",
                metadata
            )

            # Step 8: Monitoring
            execution_time = time.perf_counter() - start_time
            metrics = {
                "event_code": self.event_code,
                "n_users": len(user_ids),
                "n_segments": self.n_clusters,
                "inertia": float(self.kmeans.inertia_),
                "execution_time": execution_time,
                "centroids": self.kmeans.cluster_centers_.tolist()
            }
            monitor.log_segmentation_run(metrics)

            print("=" * 60)
            print("✅ Segmentation Complete!")
            print(f"⏱️ Time: {execution_time:.2f}s")
            print("=" * 60)

            return segment_ids

        except Exception as e:
            monitor.log_error("segmentation", e, {
                "event_code": self.event_code,
                "n_clusters": self.n_clusters
            })
            raise
services/sentiment_service.py ADDED
@@ -0,0 +1,220 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Event-Centric Sentiment Analysis Service
3
+ Author: AI Generated
4
+ Created: 2025-11-24 (Refactored)
5
+ Purpose: Analyze sentiment for event comments and generate summary
6
+ """
7
+
8
+ import torch
9
+ from transformers import AutoTokenizer, AutoModelForSequenceClassification
10
+ from typing import Tuple, List, Dict
11
+ from datetime import datetime
12
+ from bson import ObjectId
13
+
14
+ from database import db
15
+ from config import settings
16
+ from models.sentiment_models import SentimentAnalysisResult
17
+ from models.event_models import EventSentimentSummary, AIInsights
18
+ from services.preprocessing import VietnameseTextCleaner
19
+ from services.monitoring import monitor
20
+
21
+
22
class SentimentAnalysisService:
    """
    Event-centric sentiment analysis using PhoBERT.

    An instance is bound to one event code. The tokenizer and model are
    loaded lazily on first use, so constructing the service does not
    download or load any weights.
    """

    # Upper bound on comments fetched per run; comments beyond it are not analyzed.
    MAX_COMMENTS = 1000
    # Token budget per comment; longer inputs are truncated by the tokenizer.
    MAX_TOKENS = 256
    # Number of keywords extracted from each individual comment.
    KEYWORDS_PER_COMMENT = 3
    # Number of most frequent keywords kept in the per-event summary.
    TOP_KEYWORDS = 10

    def __init__(self, event_code: str):
        """
        Initialize for a specific event.

        Args:
            event_code: Event identifier
        """
        self.event_code = event_code
        self.model_name = settings.SENTIMENT_MODEL
        self.tokenizer = None
        self.model = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # NOTE(review): this index -> label order must match the checkpoint
        # named by settings.SENTIMENT_MODEL; confirm against its id2label.
        self.label_map = {0: "Negative", 1: "Positive", 2: "Neutral"}
        self.text_cleaner = VietnameseTextCleaner()

    def load_model(self):
        """Load the tokenizer and classification model onto ``self.device``."""
        print(f"🔄 Loading sentiment model: {self.model_name}")

        # An empty/unset token is normalized to None before being passed on.
        token = settings.HF_TOKEN or None

        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_name,
            token=token
        )
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.model_name,
            token=token
        )
        self.model.to(self.device)
        self.model.eval()
        print(f"✓ Model loaded on {self.device}")

    def analyze_text(self, text: str) -> Tuple[str, float]:
        """
        Classify a single text after preprocessing.

        Args:
            text: Raw comment text.

        Returns:
            ``(label, confidence)`` where ``label`` comes from
            ``self.label_map`` and ``confidence`` is the softmax probability
            of the predicted class. If preprocessing leaves nothing to
            classify, ``("Neutral", 0.5)`` is returned without running the
            model.
        """
        # Explicit None check: do not rely on the truthiness of a model object.
        if self.model is None:
            self.load_model()

        preprocessed = self.text_cleaner.preprocess_for_sentiment(text)
        if not preprocessed:
            return "Neutral", 0.5

        inputs = self.tokenizer(
            preprocessed,
            return_tensors="pt",
            truncation=True,
            max_length=self.MAX_TOKENS,
            padding=True
        ).to(self.device)

        with torch.no_grad():
            logits = self.model(**inputs).logits
            probs = torch.softmax(logits, dim=-1)
            predicted_class = torch.argmax(probs, dim=-1).item()
            confidence = probs[0][predicted_class].item()

        # Unknown class indices fall back to "Neutral".
        sentiment_label = self.label_map.get(predicted_class, "Neutral")
        return sentiment_label, confidence

    @staticmethod
    def _top_keywords(keywords: List[str], limit: int) -> List[str]:
        """Return up to ``limit`` keywords, most frequent first (ties keep first-seen order)."""
        from collections import Counter

        return [keyword for keyword, _ in Counter(keywords).most_common(limit)]

    @staticmethod
    def _percentage(count: int, total: int) -> float:
        """Return ``count`` as a percentage of ``total``; 0 when ``total`` is 0."""
        return (count / total * 100) if total else 0

    def analyze_event_comments(self) -> Dict:
        """
        Analyze all comments for this event.

        Fetches up to ``MAX_COMMENTS`` comments whose ``EventCode`` matches
        this service's event, classifies each non-empty ``CommentText``,
        bulk-inserts the per-comment results, upserts the per-event summary
        and logs run metrics.

        Returns:
            A dict with ``total_comments``, ``sentiment_distribution``,
            ``avg_confidence`` and ``top_keywords``; an empty dict when the
            event has no comments.

        Raises:
            Exception: any failure is reported through
                ``monitor.log_error`` and then re-raised unchanged.
        """
        import time
        start_time = time.time()

        print("=" * 60)
        print(f"🚀 Analyzing Sentiment for Event: {self.event_code}")
        print("=" * 60)

        try:
            if self.model is None:
                self.load_model()

            # Fetch comments for THIS EVENT only
            comments = list(db.user_comment_post.find({
                "EventCode": self.event_code
            }).limit(self.MAX_COMMENTS))

            print(f"✓ Found {len(comments)} comments for this event")

            if not comments:
                print("⚠ No comments to analyze")
                return {}

            results_to_save = []
            sentiment_counts = {"Positive": 0, "Negative": 0, "Neutral": 0}
            total_confidence = 0.0
            all_keywords = []

            for comment in comments:
                text = comment.get('CommentText', '')
                if not text:
                    continue

                sentiment, confidence = self.analyze_text(text)
                keywords = self.text_cleaner.extract_keywords(
                    text, top_n=self.KEYWORDS_PER_COMMENT
                )

                result = SentimentAnalysisResult(
                    source_id=comment['_id'],
                    source_type="UserCommentPost",
                    event_code=self.event_code,  # link the result to its event
                    sentiment_label=sentiment,
                    confidence_score=confidence,
                    key_phrases=keywords,
                    analyzed_at=datetime.utcnow()
                )
                results_to_save.append(result.dict(by_alias=True, exclude={'id'}))

                sentiment_counts[sentiment] += 1
                total_confidence += confidence
                all_keywords.extend(keywords)

            analyzed = len(results_to_save)

            # NOTE(review): every run inserts a fresh set of rows; running the
            # analysis twice for the same event presumably leaves duplicate
            # per-comment results — confirm whether that is intended.
            if results_to_save:
                db.sentiment_results.insert_many(results_to_save)
                print(f"✓ Saved {analyzed} sentiment results")

            avg_confidence = total_confidence / analyzed if analyzed else 0.0
            top_keywords = self._top_keywords(all_keywords, self.TOP_KEYWORDS)

            summary = EventSentimentSummary(
                event_code=self.event_code,
                total_comments=analyzed,
                sentiment_distribution=sentiment_counts,
                avg_confidence=avg_confidence,
                top_keywords=top_keywords,
                ai_insights=None,  # Will be filled by GenAI
                last_updated=datetime.utcnow()
            )

            # One summary document per event: create it or overwrite its fields.
            db.event_sentiment_summary.update_one(
                {"event_code": self.event_code},
                {"$set": summary.dict(by_alias=True, exclude={'id'})},
                upsert=True
            )

            print("\n📊 Sentiment Distribution:")
            for label, count in sentiment_counts.items():
                pct = self._percentage(count, analyzed)
                print(f" {label}: {count} ({pct:.1f}%)")

            execution_time = time.time() - start_time
            metrics = {
                "event_code": self.event_code,
                "n_comments": analyzed,
                "sentiment_distribution": sentiment_counts,
                "avg_confidence": avg_confidence,
                "execution_time": execution_time
            }
            monitor.log_sentiment_run(metrics)

            print("=" * 60)
            print("✅ Sentiment Analysis Complete!")
            print(f"⏱️ Time: {execution_time:.2f}s")
            print("=" * 60)

            return {
                "total_comments": analyzed,
                "sentiment_distribution": sentiment_counts,
                "avg_confidence": avg_confidence,
                "top_keywords": top_keywords
            }

        except Exception as e:
            # Record the failure with its context, then let the caller see it.
            monitor.log_error("sentiment", e, {
                "event_code": self.event_code,
                "model": self.model_name
            })
            raise