"""
Event-Centric Sentiment Analysis Service
Author: AI Generated
Created: 2025-11-24 (Fixed for actual MongoDB schema)
Purpose: Analyze sentiment for comments nested in PostSocialMedia.Images
"""

import time
from collections import Counter
from datetime import datetime, timezone
from typing import Tuple, List, Dict

import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from bson import ObjectId

from database import db
from config import settings
from models.sentiment_models import SentimentAnalysisResult
from models.event_models import EventSentimentSummary, AIInsights
from services.preprocessing import VietnameseTextCleaner
from services.monitoring import monitor


class SentimentAnalysisService:
    """
    Event-centric sentiment analysis using PhoBERT.

    Comments are nested: PostSocialMedia.Images.UserCommentPosts

    The model and tokenizer are loaded lazily on first use (see
    ``load_model``), so constructing the service is cheap.
    """

    def __init__(self, event_code: str):
        """
        Initialize for a specific event.

        Args:
            event_code: Event identifier (ObjectId string)
        """
        self.event_code = event_code
        self.model_name = settings.SENTIMENT_MODEL
        # Populated by load_model(); None means "not loaded yet".
        self.tokenizer = None
        self.model = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # NOTE(review): assumes the model's class indices follow this order —
        # confirm against the model's own id2label config.
        self.label_map = {0: "Negative", 1: "Positive", 2: "Neutral"}
        self.text_cleaner = VietnameseTextCleaner()

    def load_model(self):
        """Load the tokenizer and classification model named by ``self.model_name``.

        The model is moved to ``self.device`` and switched to eval mode.
        ``settings.HF_TOKEN`` is forwarded as the Hugging Face token when it
        is truthy; otherwise ``None`` is passed.
        """
        print(f"🔄 Loading sentiment model: {self.model_name}")

        token = settings.HF_TOKEN or None

        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_name,
            token=token,
        )
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.model_name,
            token=token,
        )
        self.model.to(self.device)
        self.model.eval()

        print(f"✓ Model loaded on {self.device}")

    def analyze_text(self, text: str) -> Tuple[str, float]:
        """Analyze a single text.

        Args:
            text: Raw comment text.

        Returns:
            ``(label, confidence)`` where ``label`` comes from
            ``self.label_map`` and ``confidence`` is the softmax probability
            of the predicted class. If preprocessing yields an empty result,
            returns ``("Neutral", 0.5)`` without running the model.
        """
        # Explicit None check: truthiness of an nn.Module is not a reliable
        # "is loaded" signal.
        if self.model is None or self.tokenizer is None:
            self.load_model()

        # Preprocess
        preprocessed = self.text_cleaner.preprocess_for_sentiment(text)
        if not preprocessed:
            return "Neutral", 0.5

        # Tokenize
        inputs = self.tokenizer(
            preprocessed,
            return_tensors="pt",
            truncation=True,
            max_length=256,
            padding=True,
        ).to(self.device)

        # Predict
        with torch.no_grad():
            outputs = self.model(**inputs)
            logits = outputs.logits
            probs = torch.softmax(logits, dim=-1)
            predicted_class = torch.argmax(probs, dim=-1).item()
            confidence = probs[0][predicted_class].item()

        # Unknown class indices fall back to "Neutral".
        sentiment_label = self.label_map.get(predicted_class, "Neutral")
        return sentiment_label, confidence

    def extract_comments_from_posts(self, limit: int = 1000) -> List[Dict]:
        """
        Extract comments from PostSocialMedia for this event.

        Structure: PostSocialMedia → Images[] → UserCommentPosts[]

        Args:
            limit: Maximum number of comments returned by the aggregation
                (passed to ``$limit``; defaults to 1000). Comments beyond
                the limit are not returned.

        Returns:
            One dict per comment with the keys ``post_id``, ``image_id``,
            ``comment_id``, ``user_id``, ``comment_text`` and
            ``commented_at`` (plus ``_id``, which ``$project`` keeps by
            default).
        """
        pipeline = [
            # Match posts for this event
            {
                "$match": {
                    "eventCode": ObjectId(self.event_code)
                }
            },
            # Unwind images array
            {
                "$unwind": {
                    "path": "$images",
                    "preserveNullAndEmptyArrays": False
                }
            },
            # Unwind UserCommentPosts within each image
            {
                "$unwind": {
                    "path": "$images.userCommentPosts",
                    "preserveNullAndEmptyArrays": False
                }
            },
            # Project the fields we need
            {
                "$project": {
                    "post_id": "$_id",
                    "image_id": "$images.imageInPostId",
                    "comment_id": "$images.userCommentPosts.commentId",
                    "user_id": "$images.userCommentPosts.userId",
                    "comment_text": "$images.userCommentPosts.commentText",
                    "commented_at": "$images.userCommentPosts.commentedAt"
                }
            },
            # Limit for performance
            {
                "$limit": limit
            }
        ]

        return list(db.post_social_media.aggregate(pipeline))

    def analyze_event_comments(self) -> Dict:
        """
        Analyze all comments for this event.

        Runs the model over every extracted comment with non-empty text,
        bulk-inserts the per-comment results into ``db.sentiment_results``,
        upserts an aggregate document into ``db.event_sentiment_summary``
        keyed by ``event_code``, and reports metrics to ``monitor``.

        Returns:
            ``{}`` when no comments were found; otherwise a dict with
            ``total_comments``, ``sentiment_distribution``,
            ``avg_confidence`` and ``top_keywords``.

        Raises:
            Exception: Any failure is logged through ``monitor.log_error``
                and re-raised unchanged.
        """
        start_time = time.time()

        print("=" * 60)
        print(f"🚀 Analyzing Sentiment for Event: {self.event_code}")
        print("=" * 60)

        try:
            if self.model is None or self.tokenizer is None:
                self.load_model()

            # Extract comments
            comments = self.extract_comments_from_posts()
            print(f"✓ Found {len(comments)} comments for this event")

            if not comments:
                print("⚠ No comments to analyze")
                return {}

            # Analyze each
            results_to_save = []
            sentiment_counts = {"Positive": 0, "Negative": 0, "Neutral": 0}
            total_confidence = 0
            all_keywords = []

            for comment in comments:
                text = comment.get('comment_text', '')
                if not text:
                    # Comments without text are not counted in any total.
                    continue

                sentiment, confidence = self.analyze_text(text)
                keywords = self.text_cleaner.extract_keywords(text, top_n=3)

                # Save individual result
                result = SentimentAnalysisResult(
                    source_id=ObjectId(comment['comment_id']),
                    source_type="UserCommentPost",
                    event_code=self.event_code,
                    sentiment_label=sentiment,
                    confidence_score=confidence,
                    key_phrases=keywords,
                    analyzed_at=datetime.now(timezone.utc),
                )
                results_to_save.append(result.dict(by_alias=True, exclude={'id'}))

                # Update counts
                sentiment_counts[sentiment] += 1
                total_confidence += confidence
                all_keywords.extend(keywords)

            analyzed_total = len(results_to_save)

            # Bulk insert
            if results_to_save:
                db.sentiment_results.insert_many(results_to_save)
                print(f"✓ Saved {len(results_to_save)} sentiment results")

            # Calculate summary (guard against every comment having empty text)
            avg_confidence = total_confidence / analyzed_total if analyzed_total else 0

            # Top keywords: most frequent first; ties keep first-seen order.
            top_keywords = [kw for kw, _ in Counter(all_keywords).most_common(10)]

            # Save summary
            summary = EventSentimentSummary(
                event_code=self.event_code,
                total_comments=analyzed_total,
                sentiment_distribution=sentiment_counts,
                avg_confidence=avg_confidence,
                top_keywords=top_keywords,
                ai_insights=None,
                last_updated=datetime.now(timezone.utc),
            )

            db.event_sentiment_summary.update_one(
                {"event_code": self.event_code},
                {"$set": summary.dict(by_alias=True, exclude={'id'})},
                upsert=True,
            )

            # Print summary
            print("\n📊 Sentiment Distribution:")
            for label, count in sentiment_counts.items():
                pct = (count / analyzed_total * 100) if analyzed_total else 0
                print(f" {label}: {count} ({pct:.1f}%)")

            # Monitoring
            execution_time = time.time() - start_time
            metrics = {
                "event_code": self.event_code,
                "n_comments": analyzed_total,
                "sentiment_distribution": sentiment_counts,
                "avg_confidence": avg_confidence,
                "execution_time": execution_time
            }
            monitor.log_sentiment_run(metrics)

            print("=" * 60)
            print("✅ Sentiment Analysis Complete!")
            print(f"⏱️ Time: {execution_time:.2f}s")
            print("=" * 60)

            return {
                "total_comments": analyzed_total,
                "sentiment_distribution": sentiment_counts,
                "avg_confidence": avg_confidence,
                "top_keywords": top_keywords
            }

        except Exception as e:
            # Top-level boundary: record the failure, then let callers see it.
            monitor.log_error("sentiment", e, {
                "event_code": self.event_code,
                "model": self.model_name
            })
            raise