"""
Event-Centric Audience Segmentation Service

Author: AI Generated
Created: 2025-11-24 (Refactored for event-centric)
Purpose: Cluster users per event and save to EventAudienceSegment
"""

import numpy as np
from typing import List, Dict, Optional, Tuple
from datetime import datetime
from sklearn.cluster import KMeans
from bson import ObjectId

from database import db
from config import settings
from models.event_models import EventAudienceSegment, MarketingContent
from services.data_aggregation import UserDataAggregator
from services.preprocessing import DataCleaner
from services.monitoring import monitor
from services.model_registry import registry


class SegmentationService:
    """
    Event-centric user segmentation via K-Means clustering.

    Typical use is ``SegmentationService(event_code).run_segmentation()``,
    which aggregates the event's users, clusters them, names each cluster
    and stores one EventAudienceSegment document per cluster.
    """

    # Ordered feature layout: (key in the aggregated user dict, default used
    # when the key is missing, name stored in ``feature_names``).
    # Column order in the feature matrix follows this table.
    _FEATURE_SPECS = (
        ('event_ticket_count', 0, 'event_tickets'),
        ('event_total_spend', 0, 'event_spend'),
        ('is_follower', 0, 'is_follower'),
        # Missing recency defaults to a very large value (i.e. "long ago").
        ('global_recency', 999999, 'global_recency'),
        ('global_frequency', 0, 'global_frequency'),
        ('global_monetary', 0, 'global_monetary'),
    )

    # Segment naming thresholds, compared against centroid values expressed
    # in original (un-normalized) feature units.
    # NOTE(review): monetary values are presumably VND - confirm.
    VIP_MIN_EVENT_SPEND = 500000
    VIP_MIN_EVENT_TICKETS = 2
    ACTIVE_MIN_EVENT_SPEND = 100000
    FOLLOWER_MIN_RATIO = 0.5
    UNTAPPED_MIN_GLOBAL_MONETARY = 1000000
    UNTAPPED_MAX_EVENT_SPEND = 100000
    # Centroids are floats that went through scaling and inverse scaling, so
    # "zero tickets" has to be tested with a tolerance, not with ``== 0``.
    ZERO_TICKETS_TOLERANCE = 1e-6

    def __init__(self, event_code: str, n_clusters: Optional[int] = None):
        """
        Initialize segmentation for a specific event.

        Args:
            event_code: Event identifier
            n_clusters: Number of segments. A falsy value (None or 0)
                falls back to ``settings.N_CLUSTERS``.
        """
        self.event_code = event_code
        self.n_clusters = n_clusters or settings.N_CLUSTERS

        self.aggregator = UserDataAggregator(event_code)
        self.data_cleaner = DataCleaner()

        # Populated by fit_clustering().
        self.kmeans = None
        self.scaler = None
        # Populated by prepare_feature_matrix().
        self.feature_names = []

    def prepare_feature_matrix(self, user_data: List[Dict]) -> Tuple[np.ndarray, List[str]]:
        """
        Convert aggregated user data into feature matrix.

        Uses hybrid approach:
        - Event-specific: ticket_count, spend, is_follower
        - Global: RFM (user's overall power)

        Side effect: sets ``self.feature_names`` to the column names, in
        column order.

        Args:
            user_data: One dict per user. ``user_id`` is required; every
                feature key is optional and replaced by its default when
                missing.

        Returns:
            (feature_matrix, user_ids) where row ``i`` of the matrix
            belongs to ``user_ids[i]`` (stringified ``user_id``).

        Raises:
            KeyError: If a user dict has no ``user_id``.
        """
        feature_matrix = []
        user_ids = []

        for user in user_data:
            feature_matrix.append(
                [user.get(key, default) for key, default, _ in self._FEATURE_SPECS]
            )
            user_ids.append(str(user['user_id']))

        # Store feature names
        self.feature_names = [name for _, _, name in self._FEATURE_SPECS]

        return np.array(feature_matrix), user_ids

    def fit_clustering(self, feature_matrix: np.ndarray) -> Tuple[KMeans, List[int]]:
        """
        Fit K-Means with preprocessing.

        Side effects: stores the fitted model in ``self.kmeans`` and the
        cleaner's scaler in ``self.scaler``.

        Args:
            feature_matrix: Raw feature matrix as produced by
                prepare_feature_matrix().

        Returns:
            (fitted KMeans, valid_indices). ``valid_indices`` is whatever
            ``DataCleaner.clean_user_features`` returns as its second
            value - presumably the row indices of ``feature_matrix`` that
            survived cleaning, in order; verify against DataCleaner.
        """
        # Clean and normalize
        normalized_features, valid_indices = self.data_cleaner.clean_user_features(feature_matrix)

        # Save scaler for later use (interpret_cluster maps centroids back
        # to original units with it).
        self.scaler = self.data_cleaner.scaler

        print(f"🔄 Fitting K-Means with {self.n_clusters} clusters...")

        self.kmeans = KMeans(
            n_clusters=self.n_clusters,
            random_state=settings.RANDOM_STATE,
            n_init=10
        )
        self.kmeans.fit(normalized_features)

        print(f"✓ Clustering complete. Inertia: {self.kmeans.inertia_:.2f}")

        return self.kmeans, valid_indices

    def interpret_cluster(self, cluster_id: int) -> Dict:
        """
        Interpret cluster characteristics.

        The cluster centroid is mapped back to original feature units via
        the stored scaler and turned into a ``feature name -> value`` dict,
        from which a segment name is derived.

        Args:
            cluster_id: Index into ``self.kmeans.cluster_centers_``.

        Returns:
            Dict with keys ``segment_name``, ``criteria`` (the centroid in
            original units) and ``cluster_id``.

        Raises:
            RuntimeError: If no model has been fitted yet.
        """
        if self.kmeans is None or self.scaler is None:
            raise RuntimeError(
                "interpret_cluster() requires a fitted model; "
                "call fit_clustering() first"
            )

        centroid = self.kmeans.cluster_centers_[cluster_id]
        centroid_original = self.scaler.inverse_transform([centroid])[0]

        interpretation = {
            feature_name: float(value)
            for feature_name, value in zip(self.feature_names, centroid_original)
        }

        # Generate segment name
        segment_name = self._generate_segment_name(
            interpretation.get('event_spend', 0),
            interpretation.get('event_tickets', 0),
            interpretation.get('global_monetary', 0),
            interpretation.get('is_follower', 0)
        )

        return {
            "segment_name": segment_name,
            "criteria": interpretation,
            "cluster_id": cluster_id
        }

    def _generate_segment_name(
        self,
        event_spend: float,
        event_tickets: float,
        global_monetary: float,
        is_follower: float
    ) -> str:
        """
        Generate Vietnamese segment name.

        Rules are evaluated in order; the first match wins. All arguments
        are centroid values in original feature units.
        """
        # High spenders on this event
        if (event_spend > self.VIP_MIN_EVENT_SPEND
                and event_tickets > self.VIP_MIN_EVENT_TICKETS):
            return "Khách Hàng VIP Sự Kiện"

        # Bought tickets but moderate spend
        if event_tickets > 0 and event_spend > self.ACTIVE_MIN_EVENT_SPEND:
            return "Khách Hàng Tích Cực"

        # Only followers, no tickets yet. Tolerance instead of ``== 0``:
        # an all-zero cluster rarely inverse-transforms to exactly 0.0.
        if (is_follower > self.FOLLOWER_MIN_RATIO
                and abs(event_tickets) <= self.ZERO_TICKETS_TOLERANCE):
            return "Người Theo Dõi Tiềm Năng"

        # High global value but low event engagement
        if (global_monetary > self.UNTAPPED_MIN_GLOBAL_MONETARY
                and event_spend < self.UNTAPPED_MAX_EVENT_SPEND):
            return "Khách Hàng Chưa Khai Phá"

        # Low event engagement
        return "Khách Hàng Ít Tương Tác"

    def save_segments_to_db(
        self,
        cluster_interpretations: List[Dict],
        user_ids: List[str],
        labels: np.ndarray
    ) -> List[ObjectId]:
        """
        Save to EventAudienceSegment collection.

        Inserts one document per entry of ``cluster_interpretations`` into
        ``db.event_audience_segments``.

        Args:
            cluster_interpretations: Dicts as returned by
                interpret_cluster().
            user_ids: User ids (ObjectId-compatible strings), aligned with
                ``labels``.
            labels: Cluster label of each user; ``labels[i]`` belongs to
                ``user_ids[i]``. Any array-like is accepted.

        Returns:
            Inserted document ids, in the order of
            ``cluster_interpretations``.

        Raises:
            ValueError: If ``user_ids`` and ``labels`` differ in length.
        """
        print("🔄 Saving event segments to database...")

        # A plain list compared with ``== cluster_id`` yields a scalar
        # False, which would silently produce empty segments.
        labels = np.asarray(labels)
        if len(user_ids) != labels.size:
            raise ValueError(
                f"user_ids ({len(user_ids)}) and labels ({labels.size}) "
                "must have the same length"
            )

        # One timestamp for the whole run so created_at == last_updated
        # and all segments of this run share it.
        now = datetime.utcnow()

        segment_ids = []

        for cluster_info in cluster_interpretations:
            cluster_id = cluster_info['cluster_id']

            # Get user_ids in this cluster
            cluster_user_indices = np.where(labels == cluster_id)[0]
            cluster_user_ids = [ObjectId(user_ids[i]) for i in cluster_user_indices]

            segment = EventAudienceSegment(
                event_code=self.event_code,
                segment_name=cluster_info['segment_name'],
                segment_type=cluster_info['segment_name'],  # Can categorize further
                user_count=len(cluster_user_ids),
                user_ids=cluster_user_ids,
                criteria=cluster_info['criteria'],
                marketing_content=None,  # Will be generated by GenAI
                created_at=now,
                last_updated=now
            )

            result = db.event_audience_segments.insert_one(
                segment.dict(by_alias=True, exclude={'id'})
            )

            segment_ids.append(result.inserted_id)
            print(f" ✓ '{segment.segment_name}': {len(cluster_user_ids)} users")

        return segment_ids

    def run_segmentation(self) -> List[ObjectId]:
        """
        Execute event-centric segmentation pipeline.

        Steps: aggregate users, build the feature matrix, fit K-Means,
        interpret each cluster, persist the segments, save the model to
        the registry and log run metrics.

        Returns:
            Ids of the inserted segment documents, or an empty list when
            the event has fewer users than clusters.

        Raises:
            Exception: Any error from the pipeline is reported through
                ``monitor.log_error`` and then re-raised unchanged.
        """
        import time
        start_time = time.time()

        print("=" * 60)
        print(f"🚀 Segmenting Event: {self.event_code}")
        print("=" * 60)

        try:
            # Step 1: Aggregate event users
            user_data = self.aggregator.aggregate_user_features()

            if len(user_data) < self.n_clusters:
                print(f"⚠ Not enough users ({len(user_data)}) for {self.n_clusters} clusters")
                return []

            # Step 2: Prepare features
            feature_matrix, user_ids = self.prepare_feature_matrix(user_data)
            print(f"✓ Feature matrix: {feature_matrix.shape}")

            # Step 3: Clustering
            self.kmeans, valid_indices = self.fit_clustering(feature_matrix)
            # Keep only the users whose rows were actually clustered.
            user_ids = [user_ids[i] for i in valid_indices]

            # Step 4: Get labels (one per row that was fitted; assumes the
            # cleaner keeps rows in valid_indices order - TODO confirm)
            labels = self.kmeans.labels_

            # Step 5: Interpret clusters
            cluster_interpretations = [
                self.interpret_cluster(i) for i in range(self.n_clusters)
            ]

            # Step 6: Save to EventAudienceSegment
            segment_ids = self.save_segments_to_db(
                cluster_interpretations,
                user_ids,
                labels
            )

            # Step 7: Save model
            metadata = {
                "event_code": self.event_code,
                "n_clusters": self.n_clusters,
                "n_users": len(user_ids),
                "inertia": float(self.kmeans.inertia_)
            }
            registry.save_model(
                self.kmeans,
                f"kmeans_{self.event_code}",
                metadata
            )

            # Step 8: Monitoring
            execution_time = time.time() - start_time
            metrics = {
                "event_code": self.event_code,
                "n_users": len(user_ids),
                "n_segments": self.n_clusters,
                "inertia": float(self.kmeans.inertia_),
                "execution_time": execution_time,
                "centroids": self.kmeans.cluster_centers_.tolist()
            }
            monitor.log_segmentation_run(metrics)

            print("=" * 60)
            print("✅ Segmentation Complete!")
            print(f"⏱️ Time: {execution_time:.2f}s")
            print("=" * 60)

            return segment_ids

        except Exception as e:
            # Top-level boundary: report, then let the caller see the error.
            monitor.log_error("segmentation", e, {
                "event_code": self.event_code,
                "n_clusters": self.n_clusters
            })
            raise