# ═══════════════════════════════════════════════════════════════════════════════ # File: app/utils/audit_logger.py # Description: Enterprise Audit Logging for SOC2/ISO27001 Compliance # ═══════════════════════════════════════════════════════════════════════════════ """ Audit Logging System for Enterprise Compliance. Features: - Immutable audit trail - Who accessed what data - All API operations logged - CERT-In and SOC2 compatible format - Export to SIEM (Splunk/Sentinel ready via Syslog) """ import json import time import uuid import logging import logging.handlers import socket from datetime import datetime from typing import Dict, Any, Optional, List from enum import Enum from pathlib import Path from app.config import settings from app.utils.logger import AgentLogger class AuditEventType(str, Enum): """Types of audit events.""" # API Events API_REQUEST = "API_REQUEST" API_RESPONSE = "API_RESPONSE" # Security Events AUTH_SUCCESS = "AUTH_SUCCESS" AUTH_FAILURE = "AUTH_FAILURE" RATE_LIMIT_EXCEEDED = "RATE_LIMIT_EXCEEDED" # Intelligence Events SCAM_DETECTED = "SCAM_DETECTED" INTELLIGENCE_EXTRACTED = "INTELLIGENCE_EXTRACTED" THREAT_ANALYZED = "THREAT_ANALYZED" # Enforcement Events REPORT_FILED = "REPORT_FILED" UPI_FREEZE_RECOMMENDED = "UPI_FREEZE_RECOMMENDED" CALLBACK_SENT = "CALLBACK_SENT" PERSONA_SELECTED = "PERSONA_SELECTED" # Data Events CONVERSATION_CREATED = "CONVERSATION_CREATED" CONVERSATION_UPDATED = "CONVERSATION_UPDATED" DATA_EXPORTED = "DATA_EXPORTED" class AuditLog: """Single audit log entry.""" def __init__( self, event_type: AuditEventType, actor: str, resource: str, action: str, details: Optional[Dict] = None, ip_address: Optional[str] = None, session_id: Optional[str] = None, risk_level: str = "low" ): self.id = f"AUDIT_{uuid.uuid4().hex[:12]}" self.timestamp = datetime.utcnow().isoformat() + "Z" self.event_type = event_type self.actor = actor # API key, user ID, or "system" self.resource = resource # Endpoint, conversation ID, etc. self.action = action # What was done self.details = details or {} self.ip_address = ip_address self.session_id = session_id self.risk_level = risk_level # Compliance fields self.platform = "sentinel-honeypot" self.version = settings.VERSION self.environment = "sandbox" if settings.SANDBOX_MODE else "production" def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for JSON serialization.""" return { "audit_id": self.id, "timestamp": self.timestamp, "event_type": self.event_type.value if isinstance(self.event_type, AuditEventType) else self.event_type, "actor": self.actor, "resource": self.resource, "action": self.action, "details": self.details, "ip_address": self.ip_address, "session_id": self.session_id, "risk_level": self.risk_level, "platform": self.platform, "version": self.version, "environment": self.environment } def to_json(self) -> str: """Convert to JSON string for file/SIEM export.""" return json.dumps(self.to_dict()) class AuditLogger: """ Centralized audit logging system. Provides: - In-memory buffer for fast writes - File-based persistence (JSONL format) - SIEM-compatible export - Query capabilities """ def __init__(self, log_dir: str = "./data/audit"): self.log_dir = Path(log_dir) self.log_dir.mkdir(parents=True, exist_ok=True) self._buffer: List[AuditLog] = [] self._buffer_size = 100 self._logger = AgentLogger("audit") # Current log file (rotates daily) self._current_file = self._get_log_file() # Syslog Handler for SIEM (Standard: UDP 514) self._setup_syslog() def _setup_syslog(self) -> None: """Configure Syslog for SIEM integration.""" self.syslog_enabled = getattr(settings, "SYSLOG_ENABLED", False) if not self.syslog_enabled: return syslog_host = getattr(settings, "SYSLOG_HOST", "localhost") syslog_port = getattr(settings, "SYSLOG_PORT", 514) try: self.syslog_handler = logging.handlers.SysLogHandler( address=(syslog_host, syslog_port), facility=logging.handlers.SysLogHandler.LOG_LOCAL7 ) # Use JSON formatter for Syslog to make it easily parsable by SIEM formatter = logging.Formatter('%(message)s') self.syslog_handler.setFormatter(formatter) self._logger.info(f"Syslog enabled: {syslog_host}:{syslog_port}") except Exception as e: self._logger.error(f"Failed to setup Syslog: {e}") self.syslog_enabled = False def _get_log_file(self) -> Path: """Get today's log file path.""" date_str = datetime.utcnow().strftime("%Y-%m-%d") return self.log_dir / f"audit_{date_str}.jsonl" def log( self, event_type: AuditEventType, actor: str, resource: str, action: str, details: Optional[Dict] = None, ip_address: Optional[str] = None, session_id: Optional[str] = None, risk_level: str = "low" ) -> AuditLog: """ Record an audit event. Args: event_type: Type of event actor: Who performed the action resource: What was affected action: Description of what happened details: Additional context ip_address: Client IP session_id: Session/conversation ID risk_level: low, medium, high, critical Returns: The created AuditLog entry """ entry = AuditLog( event_type=event_type, actor=actor, resource=resource, action=action, details=details, ip_address=ip_address, session_id=session_id, risk_level=risk_level ) # Add to buffer self._buffer.append(entry) # Persist immediately for high-risk events if risk_level in ["high", "critical"] or event_type in [ AuditEventType.AUTH_FAILURE, AuditEventType.REPORT_FILED, AuditEventType.RATE_LIMIT_EXCEEDED ]: self._flush_to_file([entry]) # Flush buffer if full if len(self._buffer) >= self._buffer_size: self._flush_buffer() return entry def _flush_buffer(self) -> None: """Write buffer to file.""" if self._buffer: self._flush_to_file(self._buffer) self._buffer.clear() def _flush_to_file(self, entries: List[AuditLog]) -> None: """Write entries to log file.""" log_file = self._get_log_file() try: with open(log_file, "a", encoding="utf-8") as f: for entry in entries: entry_json = entry.to_json() f.write(entry_json + "\n") # Forward to Syslog if enabled if self.syslog_enabled and hasattr(self, "syslog_handler"): # Format as a standard Syslog message with app name # Sentinel: {json_payload} self.syslog_handler.emit( logging.LogRecord( name="sentinel", level=logging.INFO, pathname="", lineno=0, msg=f"SentinelAudit: {entry_json}", args=None, exc_info=None ) ) except Exception as e: self._logger.error(f"Failed to write audit log: {e}") # Convenience methods for common events def log_api_request( self, endpoint: str, method: str, ip_address: str, api_key: Optional[str] = None, session_id: Optional[str] = None ) -> AuditLog: """Log an API request.""" return self.log( event_type=AuditEventType.API_REQUEST, actor=api_key or "anonymous", resource=endpoint, action=f"{method} {endpoint}", details={"method": method}, ip_address=ip_address, session_id=session_id ) def log_scam_detected( self, session_id: str, scam_type: str, confidence: float, intelligence: Dict, ip_address: Optional[str] = None ) -> AuditLog: """Log scam detection event.""" return self.log( event_type=AuditEventType.SCAM_DETECTED, actor="scam_detector", resource=f"session/{session_id}", action=f"Detected {scam_type} scam", details={ "scam_type": scam_type, "confidence": confidence, "intel_count": sum(len(v) for v in intelligence.values() if isinstance(v, list)) }, ip_address=ip_address, session_id=session_id, risk_level="high" if confidence > 0.8 else "medium" ) def log_persona_selected( self, session_id: str, persona_key: str, persona_name: str, reasoning: str, vulnerability_score: float = 0.5 ) -> AuditLog: """Log dynamic persona selection.""" return self.log( event_type=AuditEventType.PERSONA_SELECTED, actor="persona_engine", resource=f"persona/{persona_key}", action=f"Selected persona {persona_name}", details={ "persona_key": persona_key, "persona_name": persona_name, "reasoning": reasoning, "vulnerability_score": vulnerability_score }, session_id=session_id ) def log_report_filed( self, report_id: str, report_type: str, session_id: str, scam_type: str ) -> AuditLog: """Log law enforcement report.""" return self.log( event_type=AuditEventType.REPORT_FILED, actor="enforcement_api", resource=f"report/{report_id}", action=f"Filed {report_type} report", details={ "report_id": report_id, "report_type": report_type, "scam_type": scam_type }, session_id=session_id, risk_level="high" ) def log_auth_failure( self, endpoint: str, ip_address: str, reason: str ) -> AuditLog: """Log authentication failure.""" return self.log( event_type=AuditEventType.AUTH_FAILURE, actor="unknown", resource=endpoint, action="Authentication failed", details={"reason": reason}, ip_address=ip_address, risk_level="high" ) def log_callback_sent( self, session_id: str, success: bool, target: str = "GUVI" ) -> AuditLog: """Log callback to external system.""" return self.log( event_type=AuditEventType.CALLBACK_SENT, actor="callback_client", resource=f"callback/{target}", action=f"Callback to {target}: {'success' if success else 'failed'}", details={"success": success, "target": target}, session_id=session_id, risk_level="medium" if not success else "low" ) def get_recent_logs(self, count: int = 100) -> List[Dict]: """Get recent audit logs.""" # Flush buffer first self._flush_buffer() # Read from file log_file = self._get_log_file() if not log_file.exists(): return [] logs = [] with open(log_file, "r", encoding="utf-8") as f: for line in f: try: logs.append(json.loads(line.strip())) except: continue return logs[-count:] def export_siem_format(self) -> str: """Export logs in SIEM-compatible JSONL format.""" self._flush_buffer() log_file = self._get_log_file() if log_file.exists(): return log_file.read_text(encoding="utf-8") return "" def flush(self) -> None: """Force flush all buffered entries.""" self._flush_buffer() # Global audit logger instance audit_logger = AuditLogger() __all__ = ["AuditLogger", "AuditLog", "AuditEventType", "audit_logger"]