| """
|
| Feedback Loop System
|
| Author: AI Generated
|
| Created: 2025-11-24
|
| Purpose: Collect feedback metrics to improve AI models over time
|
| """
|
|
|
| from datetime import datetime
|
| from typing import Dict, Optional
|
| from bson import ObjectId
|
|
|
| from database import db
|
|
|
|
|
| class FeedbackCollector:
|
| """
|
| Collect feedback on AI outputs for continuous improvement.
|
| """
|
|
|
| def __init__(self):
|
| self.collection = "AIFeedback"
|
|
|
| def record_email_engagement(self,
|
| segment_id: str,
|
| user_id: str,
|
| opened: bool = False,
|
| clicked: bool = False,
|
| converted: bool = False,
|
| unsubscribed: bool = False):
|
| """
|
| Record email engagement metrics.
|
| Used to evaluate email generation quality.
|
| """
|
| doc = {
|
| "feedback_type": "email_engagement",
|
| "segment_id": ObjectId(segment_id),
|
| "user_id": ObjectId(user_id),
|
| "opened": opened,
|
| "clicked": clicked,
|
| "converted": converted,
|
| "unsubscribed": unsubscribed,
|
| "timestamp": datetime.utcnow()
|
| }
|
|
|
| db.get_collection(self.collection).insert_one(doc)
|
|
|
| def record_sentiment_correction(self,
|
| analysis_id: str,
|
| original_label: str,
|
| corrected_label: str,
|
| corrected_by: str):
|
| """
|
| Record manual corrections to sentiment analysis.
|
| Used to fine-tune PhoBERT.
|
| """
|
| doc = {
|
| "feedback_type": "sentiment_correction",
|
| "analysis_id": ObjectId(analysis_id),
|
| "original_label": original_label,
|
| "corrected_label": corrected_label,
|
| "corrected_by": corrected_by,
|
| "timestamp": datetime.utcnow()
|
| }
|
|
|
| db.get_collection(self.collection).insert_one(doc)
|
|
|
| def record_segment_feedback(self,
|
| segment_id: str,
|
| user_id: str,
|
| interaction_type: str,
|
| value: Optional[float] = None):
|
| """
|
| Record user interactions with segment-targeted campaigns.
|
|
|
| interaction_type: 'purchase', 'view', 'ignore', etc.
|
| value: revenue/engagement metric
|
| """
|
| doc = {
|
| "feedback_type": "segment_interaction",
|
| "segment_id": ObjectId(segment_id),
|
| "user_id": ObjectId(user_id),
|
| "interaction_type": interaction_type,
|
| "value": value,
|
| "timestamp": datetime.utcnow()
|
| }
|
|
|
| db.get_collection(self.collection).insert_one(doc)
|
|
|
| def record_insight_usefulness(self,
|
| insight_report_id: str,
|
| user_id: str,
|
| rating: int,
|
| implemented: bool = False):
|
| """
|
| Record how useful an insight report was.
|
| rating: 1-5 stars
|
| """
|
| doc = {
|
| "feedback_type": "insight_rating",
|
| "insight_report_id": ObjectId(insight_report_id),
|
| "user_id": user_id,
|
| "rating": rating,
|
| "implemented": implemented,
|
| "timestamp": datetime.utcnow()
|
| }
|
|
|
| db.get_collection(self.collection).insert_one(doc)
|
|
|
| def get_email_performance(self, segment_id: str) -> Dict:
|
| """
|
| Get aggregated email performance for a segment.
|
| """
|
| pipeline = [
|
| {
|
| "$match": {
|
| "feedback_type": "email_engagement",
|
| "segment_id": ObjectId(segment_id)
|
| }
|
| },
|
| {
|
| "$group": {
|
| "_id": None,
|
| "total_sent": {"$sum": 1},
|
| "opened": {"$sum": {"$cond": ["$opened", 1, 0]}},
|
| "clicked": {"$sum": {"$cond": ["$clicked", 1, 0]}},
|
| "converted": {"$sum": {"$cond": ["$converted", 1, 0]}},
|
| "unsubscribed": {"$sum": {"$cond": ["$unsubscribed", 1, 0]}}
|
| }
|
| }
|
| ]
|
|
|
| results = list(db.get_collection(self.collection).aggregate(pipeline))
|
|
|
| if not results:
|
| return {"error": "No data"}
|
|
|
| data = results[0]
|
| total = data["total_sent"]
|
|
|
| return {
|
| "total_sent": total,
|
| "open_rate": data["opened"] / total if total > 0 else 0,
|
| "click_rate": data["clicked"] / total if total > 0 else 0,
|
| "conversion_rate": data["converted"] / total if total > 0 else 0,
|
| "unsubscribe_rate": data["unsubscribed"] / total if total > 0 else 0
|
| }
|
|
|
| def get_sentiment_accuracy(self) -> Dict:
|
| """
|
| Calculate sentiment analysis accuracy based on corrections.
|
| """
|
| corrections = list(db.get_collection(self.collection).find({
|
| "feedback_type": "sentiment_correction"
|
| }))
|
|
|
| if not corrections:
|
| return {"error": "No corrections recorded"}
|
|
|
| total = len(corrections)
|
| correct = sum(1 for c in corrections if c["original_label"] == c["corrected_label"])
|
|
|
| accuracy = correct / total
|
|
|
|
|
| by_label = {}
|
| for c in corrections:
|
| label = c["original_label"]
|
| if label not in by_label:
|
| by_label[label] = {"total": 0, "correct": 0}
|
| by_label[label]["total"] += 1
|
| if c["original_label"] == c["corrected_label"]:
|
| by_label[label]["correct"] += 1
|
|
|
| for label in by_label:
|
| data = by_label[label]
|
| by_label[label]["accuracy"] = data["correct"] / data["total"]
|
|
|
| return {
|
| "overall_accuracy": accuracy,
|
| "total_corrections": total,
|
| "by_label": by_label
|
| }
|
|
|
| def get_retaining_dataset(self) -> tuple:
|
| """
|
| Get dataset for retraining sentiment model from corrections.
|
| Returns: (texts, labels)
|
| """
|
| corrections = list(db.get_collection(self.collection).find({
|
| "feedback_type": "sentiment_correction"
|
| }))
|
|
|
|
|
| analysis_ids = [c["analysis_id"] for c in corrections]
|
| analyses = {
|
| str(a["_id"]): a
|
| for a in db.sentiment_results.find({"_id": {"$in": analysis_ids}})
|
| }
|
|
|
|
|
| source_ids = [analyses[str(c["analysis_id"])]["source_id"] for c in corrections if str(c["analysis_id"]) in analyses]
|
| comments = {
|
| str(c["_id"]): c.get("CommentText", "")
|
| for c in db.user_comment_post.find({"_id": {"$in": source_ids}})
|
| }
|
|
|
|
|
| texts = []
|
| labels = []
|
|
|
| for c in corrections:
|
| analysis_id_str = str(c["analysis_id"])
|
| if analysis_id_str in analyses:
|
| source_id_str = str(analyses[analysis_id_str]["source_id"])
|
| if source_id_str in comments:
|
| texts.append(comments[source_id_str])
|
| labels.append(c["corrected_label"])
|
|
|
| print(f"✓ Built retraining dataset: {len(texts)} samples")
|
| return texts, labels
|
|
|
|
|
|
|
| feedback = FeedbackCollector()
|
|
|