# ═══════════════════════════════════════════════════════════════════════════════
# File: app/core/memory.py
# Description: Conversation memory management and storage
# ═══════════════════════════════════════════════════════════════════════════════
"""Conversation memory store for multi-turn engagement."""

from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Any
import uuid
import random


def _utcnow() -> datetime:
    """Return the current UTC time as a *naive* ``datetime``.

    Drop-in replacement for ``datetime.utcnow()`` (deprecated since Python
    3.12). The tzinfo is stripped so stored ISO strings keep their previous
    format (no ``+00:00`` suffix) and stay comparable with older values.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _as_items(values: Any) -> List[Any]:
    """Normalise an intelligence value into a list of items.

    ``None`` yields no items; lists/tuples are used element-wise; any other
    value (e.g. a single string) is treated as ONE item, so a bare string is
    never split into its characters.
    """
    if values is None:
        return []
    if isinstance(values, (list, tuple)):
        return list(values)
    return [values]


class ConversationMemory:
    """
    In-memory conversation storage with TTL support.

    Stores conversation history, extracted intelligence, and conversation
    metadata for multi-turn honeypot engagement. A conversation whose
    ``updated_at`` is older than ``ttl_hours`` counts as expired; expired
    conversations are removed by ``cleanup_expired()``, which is also run
    lazily (with probability ``CLEANUP_PROBABILITY``) from ``get_or_create()``.
    """

    # Probability that a get_or_create() call triggers a lazy TTL sweep.
    CLEANUP_PROBABILITY = 0.1
    # Maximum history records kept per conversation (one record per turn).
    MAX_HISTORY_RECORDS = 20
    # Maximum "reasoning_history" entries kept in aggregated intelligence.
    MAX_REASONING_SEGMENTS = 5

    def __init__(self, ttl_hours: int = 24):
        self.conversations: Dict[str, Dict] = {}
        self.ttl_hours = ttl_hours

        # Global cumulative statistics (not decremented when conversations expire)
        self.stats = {
            "total_conversations": 0,
            "total_messages": 0,
            "scams_detected": 0,
            "intelligence_extracted": 0,
        }

    def get_or_create(
        self,
        conversation_id: Optional[str] = None,
        sender_id: Optional[str] = None,
    ) -> Dict:
        """
        Get existing conversation or create new one.

        Args:
            conversation_id: Optional existing conversation ID; a random
                ``conv_<12 hex>`` ID is generated when empty/None.
            sender_id: Optional sender identifier (only stored on creation).

        Returns:
            Conversation dictionary (the live object held by the store).
        """
        # Lazy cleanup bounds memory growth without a background task.
        if random.random() < self.CLEANUP_PROBABILITY:
            self.cleanup_expired()

        if not conversation_id:
            conversation_id = f"conv_{uuid.uuid4().hex[:12]}"

        if conversation_id in self.conversations:
            return self.conversations[conversation_id]

        now = _utcnow().isoformat()
        conversation = {
            "id": conversation_id,
            "sender_id": sender_id,
            "created_at": now,
            "updated_at": now,
            "message_count": 0,
            "phase": "hook",
            "trust_score": 0.0,  # Trust evolution tracking
            "scam_type": None,
            "persona": None,
            "history": [],
            "aggregated_intelligence": {
                "phone_numbers": [],
                "upi_ids": [],
                "bank_accounts": [],
                "ifsc_codes": [],
                "emails": [],
                "urls": [],
                "credit_cards": [],
                "otps": [],
                "rat_apps": [],
                "pan_cards": [],
                "aadhar_numbers": [],
                "metadata_agitation": [],         # persisted across turns
                "metadata_agitation_reason": [],  # persisted across turns
            },
            "threat_intel": None,
            "risk_score": 0.0,
            "session_llm_calls": 0,
        }

        self.conversations[conversation_id] = conversation
        self.stats["total_conversations"] += 1
        return conversation

    def get(self, conversation_id: str) -> Optional[Dict]:
        """Get conversation by ID, or None if it is unknown/expired-and-removed."""
        return self.conversations.get(conversation_id)

    def update(
        self,
        conversation_id: str,
        scammer_message: str,
        honeypot_response: str,
        intelligence: Dict,
        phase: str,
        scam_type: Optional[str] = None,
        persona: Optional[str] = None,
        risk_score: float = 0.0,
        trust_score: float = 0.0,
    ) -> Dict:
        """
        Update conversation with new message exchange.

        Args:
            conversation_id: Conversation ID (created if it does not exist)
            scammer_message: Message from scammer
            honeypot_response: Response from honeypot
            intelligence: Extracted intelligence from message; only keys the
                conversation already tracks are merged (use
                ``update_intelligence`` to add new keys). Values may be a
                list/tuple of items or a single item.
            phase: Current conversation phase
            scam_type: Detected scam type (ignored when falsy)
            persona: Persona used for response (ignored when falsy)
            risk_score: Risk score to store (always overwritten)
            trust_score: Trust score to store (always overwritten)

        Returns:
            The updated conversation dictionary.
        """
        conv = self.get_or_create(conversation_id)

        conv["message_count"] += 1
        self.stats["total_messages"] += 1

        now = _utcnow().isoformat()
        conv["updated_at"] = now
        conv["phase"] = phase
        if scam_type:
            # Count each conversation once, the first time it gets a scam
            # type -- not only when detection happens on the very first turn.
            if not conv.get("scam_type"):
                self.stats["scams_detected"] += 1
            conv["scam_type"] = scam_type
        if persona:
            conv["persona"] = persona
        conv["risk_score"] = risk_score
        conv["trust_score"] = trust_score

        conv["history"].append({
            "turn": conv["message_count"],
            "timestamp": now,
            "scammer_message": scammer_message,
            "honeypot_response": honeypot_response,
            "phase": phase,
            "intelligence": intelligence,
        })

        # Cap history (one record per turn) to prevent linear memory growth
        # and latency spikes in long sessions; drop the oldest records.
        overflow = len(conv["history"]) - self.MAX_HISTORY_RECORDS
        if overflow > 0:
            del conv["history"][:overflow]

        intelligence = intelligence or {}
        for key, bucket in conv["aggregated_intelligence"].items():
            if key in intelligence:
                self._merge_items(bucket, intelligence[key])

        self._prune_reasoning(conv)
        return conv

    def update_intelligence(self, conversation_id: str, intelligence: Dict[str, Any]) -> Dict:
        """
        Explicitly merge intelligence fields into an existing conversation.

        Unlike ``update``, unknown keys are created. Values may be a list/tuple
        of items or a single item; ``None`` adds nothing.

        Returns:
            The conversation dictionary, or ``{}`` if the ID is unknown.
        """
        conv = self.get(conversation_id)
        if not conv:
            return {}

        aggregated = conv["aggregated_intelligence"]
        for key, values in (intelligence or {}).items():
            self._merge_items(aggregated.setdefault(key, []), values)

        self._prune_reasoning(conv)
        return conv

    def _merge_items(self, bucket: List[Any], values: Any) -> None:
        """Append items from *values* not already in *bucket*, counting each new one."""
        for item in _as_items(values):
            if item not in bucket:
                bucket.append(item)
                self.stats["intelligence_extracted"] += 1

    def _prune_reasoning(self, conv: Dict) -> None:
        """Keep only the newest MAX_REASONING_SEGMENTS reasoning entries, if present."""
        aggregated = conv["aggregated_intelligence"]
        reasoning = aggregated.get("reasoning_history")
        limit = self.MAX_REASONING_SEGMENTS
        if reasoning and len(reasoning) > limit:
            aggregated["reasoning_history"] = reasoning[len(reasoning) - limit:]

    def get_history_text(self, conversation_id: str, max_turns: int = 10) -> str:
        """
        Get the last ``max_turns`` turns as "Scammer: ..." / "You: ..." lines.

        Returns an empty string for unknown conversations or ``max_turns <= 0``
        (a plain ``history[-0:]`` slice would otherwise return everything).
        """
        conv = self.get(conversation_id)
        if not conv or max_turns <= 0:
            return ""

        lines = []
        for record in conv["history"][-max_turns:]:
            lines.append(f"Scammer: {record['scammer_message']}")
            lines.append(f"You: {record['honeypot_response']}")
        return "\n".join(lines)

    def _cutoff(self) -> datetime:
        """Return the naive-UTC instant before which a conversation is expired."""
        return _utcnow() - timedelta(hours=self.ttl_hours)

    @staticmethod
    def _is_expired(conv: Dict, cutoff: datetime) -> bool:
        """Shared expiry predicate so count_active and cleanup_expired agree."""
        return datetime.fromisoformat(conv["updated_at"]) < cutoff

    def count_active(self) -> int:
        """Count active (not expired) conversations."""
        cutoff = self._cutoff()
        return sum(
            1 for conv in self.conversations.values()
            if not self._is_expired(conv, cutoff)
        )

    def get_statistics(self) -> Dict[str, Any]:
        """Get global statistics plus active count and per-scam-type distribution."""
        distribution = Counter(
            str(conv.get("scam_type") or "pending")
            for conv in self.conversations.values()
        )
        return {
            **self.stats,
            "active_conversations": self.count_active(),
            "scam_distribution": dict(distribution),
        }

    def cleanup_expired(self) -> int:
        """Remove expired conversations. Returns count removed."""
        cutoff = self._cutoff()
        expired = [
            conv_id for conv_id, conv in self.conversations.items()
            if self._is_expired(conv, cutoff)
        ]
        for conv_id in expired:
            del self.conversations[conv_id]
        return len(expired)


# Global memory instance
memory_store = ConversationMemory()