""" Event-Centric Generative AI Service Author: AI Generated Created: 2025-11-24 (Using HuggingFace Inference API) Purpose: Generate marketing content using Vistral-7B-Chat via API """ from huggingface_hub import InferenceClient from typing import Dict, List from datetime import datetime from bson import ObjectId from database import db from config import settings from models.event_models import EventSentimentSummary, AIInsights, MarketingContent from services.monitoring import monitor class GenerativeAIService: """ Event-centric GenAI using Vistral-7B-Chat via HuggingFace Inference API. Much faster and lighter than loading model locally. """ def __init__(self, event_code: str): """ Initialize for a specific event. Args: event_code: Event identifier """ self.event_code = event_code self.model_name = settings.LLM_MODEL_NAME self.client = None # System prompt theo official docs self.system_prompt = ( "Bạn là một trợ lí Tiếng Việt nhiệt tình và trung thực. " "Hãy luôn trả lời một cách hữu ích nhất có thể, đồng thời giữ an toàn.\n" "Câu trả lời của bạn không nên chứa bất kỳ nội dung gây hại, " "phân biệt chủng tộc, phân biệt giới tính, độc hại, nguy hiểm hoặc bất hợp pháp nào." ) def get_client(self): """Get or create InferenceClient""" if not self.client: print(f"🔄 Initializing HuggingFace Inference API") print(f" Model: {self.model_name}") self.client = InferenceClient( model=self.model_name, token=settings.HF_TOKEN if settings.HF_TOKEN else None ) print("✓ Inference client ready!") return self.client def generate_text(self, prompt: str, max_new_tokens: int = 512) -> str: """ Generate text using Vistral via HuggingFace Inference API. Args: prompt: User prompt max_new_tokens: Max tokens to generate Returns: Generated text """ try: client = self.get_client() # Build messages with system prompt messages = [ {"role": "system", "content": self.system_prompt}, {"role": "user", "content": prompt} ] # Call Inference API response = client.chat_completion( messages=messages, max_tokens=max_new_tokens, temperature=0.7, top_p=0.95, ) # Extract generated text generated = response.choices[0].message.content return generated.strip() except Exception as e: print(f"❌ Error calling Inference API: {str(e)}") print(f"⚠️ Returning empty response") return "" def generate_email_for_segment(self, segment: Dict) -> MarketingContent: """ Generate personalized email for a segment. """ # Get event info event = db.event_versions.find_one({"_id": ObjectId(self.event_code)}) event_name = event.get("EventName", "Sự kiện") if event else "Sự kiện" # Build prompt prompt = f"""Bạn là chuyên gia marketing sự kiện. Sự kiện: {event_name} Phân khúc khách hàng: {segment['segment_name']} Đặc điểm phân khúc: - Số vé trung bình: {segment['criteria'].get('event_tickets', 0):.1f} - Chi tiêu trung bình: {segment['criteria'].get('event_spend', 0):,.0f} VNĐ Nhiệm vụ: Tạo email marketing cá nhân hóa cho phân khúc này. Định dạng: SUBJECT: [tiêu đề email ngắn gọn, hấp dẫn] BODY: [Nội dung email bằng tiếng Việt, 2-3 đoạn văn, tập trung vào giá trị cho khách hàng] """ generated = self.generate_text(prompt, max_new_tokens=400) if not generated: return MarketingContent( email_subject=f"Ưu đãi đặc biệt cho {segment['segment_name']}", email_body="Nội dung email sẽ được tạo khi API khả dụng.", status="Draft", generated_at=datetime.utcnow() ) # Parse response lines = generated.split('\n') subject = "" body_lines = [] in_body = False for line in lines: if line.startswith("SUBJECT:"): subject = line.replace("SUBJECT:", "").strip() elif line.startswith("BODY:"): in_body = True elif in_body and line.strip(): body_lines.append(line.strip()) if not subject: subject = f"Ưu đãi đặc biệt cho {segment['segment_name']}" body = "\n\n".join(body_lines) if body_lines else generated return MarketingContent( email_subject=subject, email_body=body, status="Draft", generated_at=datetime.utcnow() ) def generate_emails_for_all_segments(self): """ Generate emails for all segments of this event. """ import time start_time = time.time() print("=" * 60) print(f"🚀 Generating Emails for Event: {self.event_code}") print("=" * 60) try: # Find segments without marketing content segments = list(db.event_audience_segments.find({ "event_code": self.event_code, "marketing_content": None })) if not segments: print("✓ All segments already have marketing content") return print(f"✓ Generating for {len(segments)} segments") for segment in segments: print(f"\n🔄 {segment['segment_name']}...") email_content = self.generate_email_for_segment(segment) # Update segment db.event_audience_segments.update_one( {"_id": segment['_id']}, {"$set": { "marketing_content": email_content.dict(), "last_updated": datetime.utcnow() }} ) print(f" ✓ Subject: {email_content.email_subject[:50]}...") # Monitoring execution_time = time.time() - start_time metrics = { "event_code": self.event_code, "n_generated": len(segments), "total_time": execution_time } monitor.log_genai_run("email_generation", metrics) print("=" * 60) print("✅ Email Generation Complete!") print(f"⏱️ Time: {execution_time:.2f}s") print("=" * 60) except Exception as e: monitor.log_error("genai_email", e, { "event_code": self.event_code }) raise def generate_insights_from_sentiment(self) -> AIInsights: """ Generate AI insights from negative comments. """ # Get negative sentiment results (already analyzed and saved) negative_results = list(db.sentiment_results.find({ "event_code": self.event_code, "sentiment_label": "Negative" }).limit(50)) if not negative_results: return AIInsights( summary="Không có phản hồi tiêu cực.", top_issues=[], improvement_suggestions=[], predicted_nps=70.0 ) # Get original comment texts from PostSocialMedia comment_ids = [ObjectId(r['source_id']) for r in negative_results] # Extract comments from nested structure pipeline = [ {"$unwind": "$images"}, {"$unwind": "$images.userCommentPosts"}, { "$match": { "images.userCommentPosts.commentId": {"$in": comment_ids} } }, { "$project": { "comment_text": "$images.userCommentPosts.commentText" } } ] comments = list(db.post_social_media.aggregate(pipeline)) negative_texts = [c.get('comment_text', '') for c in comments if c.get('comment_text')] if not negative_texts: return AIInsights( summary="Không thể truy xuất nội dung feedback tiêu cực.", top_issues=[], improvement_suggestions=[], predicted_nps=60.0 ) # Build prompt comments_sample = "\n".join([f"- {text[:100]}" for text in negative_texts[:15]]) prompt = f"""Phân tích feedback tiêu cực cho sự kiện. Số feedback tiêu cực: {len(negative_texts)} Mẫu feedback: {comments_sample} Nhiệm vụ: Đưa ra phân tích chi tiết: 1. TOP 5 VẤN ĐỀ (mỗi vấn đề 1 dòng, ngắn gọn): 2. ĐỀ XUẤT CẢI THIỆN (3-5 đề xuất cụ thể): 3. DỰ ĐOÁN NPS (điểm từ 0-100): Định dạng: TOP_ISSUES: 1. [vấn đề] 2. [vấn đề] ... SUGGESTIONS: - [đề xuất] - [đề xuất] ... NPS: [số] """ generated = self.generate_text(prompt, max_new_tokens=400) # Parse top_issues = [] suggestions = [] predicted_nps = 60.0 if generated: lines = generated.split('\n') current_section = None for line in lines: line = line.strip() if "TOP_ISSUES" in line or "VẤN ĐỀ" in line: current_section = "issues" elif "SUGGESTIONS" in line or "ĐỀ XUẤT" in line: current_section = "suggestions" elif "NPS" in line: try: import re numbers = re.findall(r'\d+', line) if numbers: predicted_nps = float(numbers[0]) except: pass elif current_section == "issues" and (line.startswith("-") or line[0].isdigit()): issue = line.lstrip("0123456789.-) ").strip() if issue and len(issue) > 5: top_issues.append(issue) elif current_section == "suggestions" and line.startswith("-"): suggestion = line.lstrip("- ").strip() if suggestion and len(suggestion) > 5: suggestions.append(suggestion) # Create summary total_comments = db.sentiment_results.count_documents({"event_code": self.event_code}) negative_pct = (len(negative_results) / total_comments * 100) if total_comments > 0 else 0 summary = f"Sự kiện nhận được {total_comments} phản hồi, trong đó {len(negative_results)} ({negative_pct:.1f}%) phản hồi tiêu cực." return AIInsights( summary=summary, top_issues=top_issues[:5], improvement_suggestions=suggestions[:5], predicted_nps=predicted_nps ) def update_sentiment_summary_with_insights(self): """ Generate and update EventSentimentSummary with AI insights. """ print("=" * 60) print(f"🚀 Generating Insights for Event: {self.event_code}") print("=" * 60) try: insights = self.generate_insights_from_sentiment() # Update summary db.event_sentiment_summary.update_one( {"event_code": self.event_code}, {"$set": { "ai_insights": insights.dict(), "last_updated": datetime.utcnow() }}, upsert=True ) print("✓ Insights generated:") print(f" Top Issues: {len(insights.top_issues)}") print(f" Suggestions: {len(insights.improvement_suggestions)}") print(f" Predicted NPS: {insights.predicted_nps}") print("=" * 60) print("✅ Insights Complete!") print("=" * 60) return insights except Exception as e: monitor.log_error("genai_insights", e, { "event_code": self.event_code }) raise