| """
|
| Logging & Monitoring Module
|
| Author: AI Generated
|
| Created: 2025-11-24
|
| Purpose: Track pipeline performance, errors, and model drift
|
| """
|
|
|
| import logging
|
| from datetime import datetime
|
| from typing import Dict, Any, Optional
|
| import json
|
| from pathlib import Path
|
| import numpy as np
|
|
|
| from database import db
|
|
|
|
|
|
|
# Directory that receives the pipeline log file. It is created at import time
# because the FileHandler below opens its file as soon as it is constructed.
LOG_DIR = Path("logs")
LOG_DIR.mkdir(parents=True, exist_ok=True)

# Configure the root logger once: INFO and above go both to logs/pipeline.log
# and to the console stream. Per the logging docs, basicConfig() does nothing
# if the root logger already has handlers configured.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8: this module logs messages containing non-ASCII
        # characters (the drift warnings start with an emoji), which would
        # fail to encode under a non-UTF-8 platform default encoding.
        logging.FileHandler(LOG_DIR / 'pipeline.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)

# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
class PipelineMonitor:
    """
    Monitor AI pipeline performance and log metrics.

    Run metrics are written to the ``PipelineMetrics`` collection and errors
    to the ``PipelineErrors`` collection of the module-level ``db`` object.
    Each write is also mirrored to the module logger. The drift detectors
    compare a freshly computed value against the most recent stored run of
    the same pipeline.
    """

    # Labels compared by the sentiment drift check.
    SENTIMENT_LABELS = ("Positive", "Negative", "Neutral")

    def __init__(self):
        self.metrics_collection = "PipelineMetrics"
        self.errors_collection = "PipelineErrors"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive datetime.

        Produces the same kind of value as the deprecated
        ``datetime.utcnow()`` so stored timestamps keep their format.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _record_run(self, pipeline: str, metrics: Dict[str, Any], **extra: Any) -> None:
        """Insert one run document into the metrics collection.

        ``extra`` fields (e.g. ``task``) are placed between ``pipeline`` and
        ``timestamp`` in the stored document.
        """
        doc = {
            "pipeline": pipeline,
            **extra,
            "timestamp": self._utc_now(),
            "metrics": metrics,
        }
        db.get_collection(self.metrics_collection).insert_one(doc)

    @staticmethod
    def _centroid_drift(prev_centroids, current_centroids, threshold: float = 0.5) -> Dict:
        """Compute row-wise centroid drift between two runs (no I/O).

        Returns ``shape_mismatch`` (drift) when the arrays differ in shape,
        ``insufficient_data`` (no drift) when they are empty, otherwise the
        average and maximum Euclidean distance between corresponding rows;
        drift is flagged when the average exceeds ``threshold``.
        """
        prev = np.asarray(prev_centroids, dtype=float)
        curr = np.asarray(current_centroids, dtype=float)

        if prev.shape != curr.shape:
            return {"drift_detected": True, "reason": "shape_mismatch"}

        # np.max raises on an empty array; there is nothing to compare.
        if curr.size == 0:
            return {"drift_detected": False, "reason": "insufficient_data"}

        # NOTE(review): rows are compared position-by-position. If cluster
        # order is not stable between runs this over-reports drift — confirm
        # that the upstream pipeline orders centroids consistently.
        distances = np.linalg.norm(curr - prev, axis=1)
        avg_drift = float(np.mean(distances))
        max_drift = float(np.max(distances))

        return {
            "drift_detected": avg_drift > threshold,
            "avg_drift": avg_drift,
            "max_drift": max_drift,
            "threshold": threshold,
        }

    @classmethod
    def _distribution_drift(cls, prev_dist, current_distribution, threshold: float = 0.1) -> Dict:
        """Compute the change in label share between two distributions (no I/O).

        Shares are each label's count divided by the sum of all counts in its
        distribution. Drift is flagged when the largest absolute change in
        share among ``SENTIMENT_LABELS`` exceeds ``threshold``.
        """
        prev_dist = prev_dist or {}
        current_distribution = current_distribution or {}

        prev_total = sum(prev_dist.values())
        curr_total = sum(current_distribution.values())

        # Avoid dividing by zero when either side has no counts.
        if prev_total == 0 or curr_total == 0:
            return {"drift_detected": False, "reason": "insufficient_data"}

        changes = {
            label: abs(
                current_distribution.get(label, 0) / curr_total
                - prev_dist.get(label, 0) / prev_total
            )
            for label in cls.SENTIMENT_LABELS
        }
        max_change = max(changes.values())

        return {
            "drift_detected": max_change > threshold,
            "changes": changes,
            "max_change": max_change,
            "threshold": threshold,
        }

    @staticmethod
    def _average_run_time(runs) -> float:
        """Average the reported duration over the runs that reported one.

        Uses ``execution_time`` and falls back to ``total_time`` (the key the
        GenAI metrics are documented to carry). Runs with neither key are
        skipped rather than counted as zero seconds. Returns ``0.0`` when no
        run reported a duration.
        """
        durations = []
        for run in runs:
            run_metrics = run.get("metrics") or {}
            duration = run_metrics.get("execution_time")
            if duration is None:
                duration = run_metrics.get("total_time")
            if duration is not None:
                durations.append(duration)

        return float(np.mean(durations)) if durations else 0.0

    # ------------------------------------------------------------------
    # Run / error logging
    # ------------------------------------------------------------------

    def log_segmentation_run(self, metrics: Dict[str, Any]):
        """
        Log segmentation pipeline metrics.

        Metrics should include:
        - n_users: Number of users processed
        - n_segments: Number of segments created
        - inertia: K-means inertia
        - execution_time: Time in seconds
        - outliers_removed: Count

        A ``centroids`` entry is needed for detect_drift_segmentation() to
        have a baseline on the next run.
        """
        logger.info("Segmentation Run: %s", metrics)
        self._record_run("segmentation", metrics)

    def log_sentiment_run(self, metrics: Dict[str, Any]):
        """
        Log sentiment analysis metrics.

        Metrics should include:
        - n_comments: Number of comments analyzed
        - sentiment_distribution: {Positive: X, Negative: Y, Neutral: Z}
        - avg_confidence: Average confidence score
        - execution_time: Time in seconds
        """
        logger.info("Sentiment Analysis Run: %s", metrics)
        self._record_run("sentiment", metrics)

    def log_genai_run(self, task: str, metrics: Dict[str, Any]):
        """
        Log Generative AI metrics.

        Metrics should include:
        - n_generated: Number of items generated
        - avg_generation_time: Average time per item
        - total_time: Total execution time
        """
        logger.info("GenAI Run (%s): %s", task, metrics)
        self._record_run("genai", metrics, task=task)

    def log_error(self, pipeline: str, error: Exception, context: Optional[Dict] = None):
        """
        Log pipeline errors.

        Writes the error to the module logger and stores a document with the
        pipeline name, timestamp, message, exception type name and context
        in the errors collection.
        """
        # Pass the exception itself so its traceback is logged even when this
        # method is called outside an ``except`` block (exc_info=True would
        # log "NoneType: None" there).
        logger.error("Error in %s: %s", pipeline, error, exc_info=error)

        doc = {
            "pipeline": pipeline,
            "timestamp": self._utc_now(),
            "error": str(error),
            "error_type": type(error).__name__,
            "context": context or {},
        }
        db.get_collection(self.errors_collection).insert_one(doc)

    # ------------------------------------------------------------------
    # Drift detection
    # ------------------------------------------------------------------

    def detect_drift_segmentation(self, current_centroids: np.ndarray, threshold: float = 0.5) -> Dict:
        """
        Detect drift in K-means clustering.

        Compare current centroids with those stored in the most recent
        segmentation run. Returns a dict with ``drift_detected`` plus either
        a ``reason`` (``no_baseline``, ``shape_mismatch``,
        ``insufficient_data``) or ``avg_drift``, ``max_drift`` and
        ``threshold``. Drift is flagged when the average centroid distance
        exceeds ``threshold`` (default 0.5).
        """
        last_metric = db.get_collection(self.metrics_collection).find_one(
            {"pipeline": "segmentation"},
            sort=[("timestamp", -1)],
        )

        stored_metrics = (last_metric or {}).get("metrics") or {}
        prev_centroids = stored_metrics.get("centroids")
        if prev_centroids is None:
            logger.info("No previous centroids found for drift detection")
            return {"drift_detected": False, "reason": "no_baseline"}

        result = self._centroid_drift(prev_centroids, current_centroids, threshold)

        if result.get("reason") == "shape_mismatch":
            logger.warning("Cluster drift detected: centroid shape changed since previous run")
        elif result["drift_detected"]:
            avg_drift = result["avg_drift"]
            max_drift = result["max_drift"]
            logger.warning(f"⚠️ Cluster drift detected: avg={avg_drift:.3f}, max={max_drift:.3f}")

        return result

    def detect_drift_sentiment(self, current_distribution: Dict[str, int], threshold: float = 0.1) -> Dict:
        """
        Detect drift in sentiment distribution.

        Compares each label's share of the total against the most recent
        stored sentiment run. Returns a dict with ``drift_detected`` plus
        either a ``reason`` (``no_baseline``, ``insufficient_data``) or
        ``changes``, ``max_change`` and ``threshold``. Drift is flagged when
        any label's share moved by more than ``threshold`` (default 0.1).
        """
        last_metric = db.get_collection(self.metrics_collection).find_one(
            {"pipeline": "sentiment"},
            sort=[("timestamp", -1)],
        )

        if not last_metric:
            return {"drift_detected": False, "reason": "no_baseline"}

        prev_dist = (last_metric.get("metrics") or {}).get("sentiment_distribution") or {}
        result = self._distribution_drift(prev_dist, current_distribution, threshold)

        if result["drift_detected"]:
            max_change = result["max_change"]
            logger.warning(f"⚠️ Sentiment drift detected: max_change={max_change:.1%}")

        return result

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def get_performance_summary(self, pipeline: str, days: int = 7) -> Dict:
        """
        Get performance summary for the last N days.

        Returns ``{"error": "No metrics found"}`` when no run of ``pipeline``
        was recorded in the window; otherwise the run count, the average
        reported run time and the timestamp of the newest run.
        """
        from datetime import timedelta

        cutoff = self._utc_now() - timedelta(days=days)

        cursor = db.get_collection(self.metrics_collection).find({
            "pipeline": pipeline,
            "timestamp": {"$gte": cutoff},
        }).sort("timestamp", -1)
        runs = list(cursor)

        if not runs:
            return {"error": "No metrics found"}

        return {
            "pipeline": pipeline,
            "period_days": days,
            "total_runs": len(runs),
            "avg_execution_time": self._average_run_time(runs),
            # Results are sorted newest-first, so the first entry is the last run.
            "last_run": runs[0]["timestamp"],
        }
|
|
|
|
|
|
|
# Shared PipelineMonitor instance, created when this module is imported.
monitor: PipelineMonitor = PipelineMonitor()
|
|
|