# # File: app/core/llm_client.py # Module: Artificial Intelligence Interface Layer # Description: Unified abstraction layer for Large Language Model (LLM) interactions. # Implements Strategy Pattern to support multi-provider routing # (OpenAI, Anthropic, Groq) with automatic fallback and connection pooling. # """ LLM Client Abstraction Layer. This module provides a unified interface for interacting with various LLM providers. It abstracts provider-specific API details, ensuring that the core application logic remains agnostic to the underlying model infrastructure. Key Features: - **Multi-Provider Support:** Seamless switching between OpenAI, Anthropic, and Groq. - **Connection Pooling:** Shared AsyncClient for high-throughput performance. - **Standardized Interfaces:** Uniform `generate` and `check_connectivity` methods. - **Role-Based Routing:** Integration with ModelRegistry for capability-based model selection. """ import httpx import json import asyncio import time import re from typing import Optional, Dict, Any, List, Tuple from abc import ABC, abstractmethod from app.config import settings from app.core.model_registry import model_registry, Capability from app.utils.logger import logger # Groq-Correct Error Taxonomy (Added per audit) from app.core.groq_errors import ( classify_groq_error, get_recovery_action, update_rate_state, GroqErrorType ) # Prompt Cache for Token Storm Prevention from app.core.prompt_cache import prompt_cache # Shared HTTP Client for performance (Connection Pooling) _shared_client = httpx.AsyncClient(timeout=30.0) from enum import Enum class ModelRole(Enum): """ Semantic roles for model assignment. Used to decouple logical intent (e.g., 'Safety Check') from specific model implementations (e.g., 'Llama-Guard-3'). 
""" FAST_CHAT = "FAST_CHAT_MODEL" # Low latency interactions SMART_REASONING = "SMART_REASONING_MODEL" # Complex logic and planning STRUCTURED_OUTPUT = "STRUCTURED_OUTPUT_MODEL" # JSON extraction and compliance SAFETY_GUARD = "SAFETY_GUARD_MODEL" # Input/Output content filtering FORENSIC_SEARCH = "FORENSIC_SEARCH_MODEL" # Real-time web search and browser tool FALLBACK = "FALLBACK_MODEL" # High-availability redundancy class BudgetExceeded(Exception): """Exception raised when LLM budget is exceeded.""" pass class LLMResponse: """ Stateless container for LLM output and metadata. Enables concurrency-safe tracking of reasoning and tools. """ def __init__( self, content: str, reasoning: Optional[str] = None, usage: Optional[Dict[str, Any]] = None, executed_tools: Optional[List[Dict[str, Any]]] = None, model: Optional[str] = None ): self.content = content self.reasoning = reasoning self.usage = usage or {} self.executed_tools = executed_tools or [] self.model = model def __str__(self) -> str: return self.content def __getitem__(self, key): # Support dict-like access for backward compatibility with JSON extraction if key == "content": return self.content if key == "reasoning": return self.reasoning # If content is JSON, allow access to its keys try: data = json.loads(self.content) if isinstance(data, dict): return data.get(key) except: pass return None def get(self, key, default=None): val = self[key] return val if val is not None else default def keys(self): try: data = json.loads(self.content) if isinstance(data, dict): return list(data.keys()) + ["content", "reasoning"] except: pass return ["content", "reasoning"] def __contains__(self, key): return key in self.keys() class BaseLLMClient(ABC): """ Abstract Base Class for LLM Providers. Enforces a standard contract for all model integrations, ensuring interchangeability and consistent error handling patterns. 
""" @abstractmethod async def generate(self, prompt: str, **kwargs) -> str: """ Execute a text generation request. Args: prompt (str): The input context or query. **kwargs: Provider-specific parameters (temperature, max_tokens, etc.) Returns: str: The generated response content. Raises: RuntimeError: If the provider client is not initialized or fails. """ pass @abstractmethod async def check_connectivity(self) -> bool: """ Verify provider availability. Performs a minimal health check against the provider's API (e.g., model listing) to ensure operational readiness. Returns: bool: True if the provider is reachable and authenticated. """ pass class OpenAIClient(BaseLLMClient): """OpenAI GPT client.""" def __init__(self): # MULTI-KEY SUPPORT: Parse comma-separated keys for load balancing raw_keys = settings.OPENAI_API_KEY or "" self.api_keys = [k.strip() for k in raw_keys.split(",") if k.strip()] self.current_key_idx = 0 self.api_key = self.api_keys[0] if self.api_keys else None self.client = None self.model = settings.GPT_MODEL def _rotate_key(self): """Switch to the next available API key.""" if not self.api_keys or len(self.api_keys) <= 1: return False self.current_key_idx = (self.current_key_idx + 1) % len(self.api_keys) self.api_key = self.api_keys[self.current_key_idx] from openai import AsyncOpenAI self.client = AsyncOpenAI(api_key=self.api_key) print(f" OpenAI Key Swapped! 
(Key ending in ...{self.api_key[-4:]})") return True async def initialize(self): """Initialize OpenAI client.""" if self.api_key: try: from openai import AsyncOpenAI self.client = AsyncOpenAI(api_key=self.api_key) except ImportError: pass async def generate( self, prompt: str, temperature: float = 0.7, max_tokens: int = 500, model: Optional[str] = None, **kwargs ) -> str: """Generate response using GPT with built-in key rotation.""" if not self.client: await self.initialize() if not self.client: raise RuntimeError("OpenAI client not initialized") # Merge kwargs for flexible parameters # PRODUCTION HARDENING: Limit cascade depth to 2 (Primary + 1 Fallback) max_retries = 2 # Hard limit to prevent retry storms for attempt in range(max_retries): try: response = await self.client.chat.completions.create( model=model or self.model, messages=[{"role": "user", "content": prompt}], temperature=temperature, max_tokens=max_tokens, **kwargs ) return response.choices[0].message.content except Exception as e: # Check for rate limit (429) or authentication errors that might require rotation err_str = str(e).lower() if "rate_limit" in err_str or "429" in err_str or "insufficient_quota" in err_str: print(f" OpenAI Key Rate Limited! 
Attempting rotation...") if self._rotate_key(): continue if attempt == max_retries - 1: print(f" OpenAI Final Attempt Failed: {e}") raise RuntimeError(f"OpenAI generation failed after {max_retries} attempts: {e}") wait = 1.0 * (attempt + 1) await asyncio.sleep(wait) async def generate_structured(self, prompt: str, schema: Dict[str, Any], model: Optional[str] = None, **kwargs) -> LLMResponse: """Structured output for OpenAI (supports JSON mode).""" prompt_with_json = f"{prompt}\n\nReturn response as valid JSON matching this schema: {json.dumps(schema)}" content = await self.generate(prompt_with_json, model=model, **kwargs) return LLMResponse(content=content, model=model or self.model) async def check_connectivity(self) -> bool: if not self.client: return False try: await self.client.models.list() return True except: return False class AnthropicClient(BaseLLMClient): """Anthropic Claude client.""" def __init__(self): # MULTI-KEY SUPPORT: Parse comma-separated keys for load balancing raw_keys = settings.ANTHROPIC_API_KEY or "" self.api_keys = [k.strip() for k in raw_keys.split(",") if k.strip()] self.current_key_idx = 0 self.api_key = self.api_keys[0] if self.api_keys else None self.client = None self.model = settings.CLAUDE_MODEL def _rotate_key(self): """Switch to the next available API key.""" if not self.api_keys or len(self.api_keys) <= 1: return False self.current_key_idx = (self.current_key_idx + 1) % len(self.api_keys) self.api_key = self.api_keys[self.current_key_idx] from anthropic import AsyncAnthropic self.client = AsyncAnthropic(api_key=self.api_key) print(f" Anthropic Key Swapped! 
(Key ending in ...{self.api_key[-4:]})") return True async def initialize(self): """Initialize Anthropic client.""" if self.api_key: try: from anthropic import AsyncAnthropic self.client = AsyncAnthropic(api_key=self.api_key) except ImportError: pass async def generate( self, prompt: str, temperature: float = 0.7, max_tokens: int = 500, model: Optional[str] = None, **kwargs ) -> str: """Generate response using Claude with built-in key rotation.""" if not self.client: await self.initialize() if not self.client: raise RuntimeError("Anthropic client not initialized") max_retries = max(3, len(self.api_keys)) for attempt in range(max_retries): try: response = await self.client.messages.create( model=model or self.model, messages=[{"role": "user", "content": prompt}], temperature=temperature, max_tokens=max_tokens, **kwargs ) return response.content[0].text except Exception as e: # Check for rate limit (429) err_str = str(e).lower() if "rate_limit" in err_str or "429" in err_str: print(f" Anthropic Key Rate Limited! Attempting rotation...") if self._rotate_key(): continue if attempt == max_retries - 1: print(f" Anthropic Final Attempt Failed: {e}") raise RuntimeError(f"Anthropic generation failed after {max_retries} attempts: {e}") wait = 1.0 * (attempt + 1) await asyncio.sleep(wait) async def generate_structured(self, prompt: str, schema: Dict[str, Any], model: Optional[str] = None, **kwargs) -> LLMResponse: """Structured output for Anthropic.""" prompt_with_json = f"{prompt}\n\nReturn response as valid JSON matching this schema: {json.dumps(schema)}" content = await self.generate(prompt_with_json, model=model, **kwargs) return LLMResponse(content=content, model=model or self.model) async def check_connectivity(self) -> bool: if not self.client: return False try: await self.client.models.list() return True except: return False class GroqClient(BaseLLMClient): """ Groq LLM client - FAST and FREE! Uses Llama 3.1 70B with lightning-fast inference. 
""" def __init__(self): # MULTI-KEY SUPPORT: Parse comma-separated keys for load balancing raw_keys = settings.GROQ_API_KEY or "" self.api_keys = [k.strip() for k in raw_keys.split(",") if k.strip()] self.current_key_idx = 0 self.api_key = self.api_keys[0] if self.api_keys else None # --- SMART KEY TRACKING --- self.key_cooldowns = {k: 0.0 for k in self.api_keys} self.model = settings.GROQ_MODEL self.base_url = "https://api.groq.com/openai/v1/chat/completions" # --- RATE LIMIT MONITOR --- self.remaining_requests = 1000 self.remaining_tokens = 6000 self.reset_requests = "0s" self.reset_tokens = "0s" # --- MODEL-LEVEL BLACKLIST (COOLDOWNS) --- # Used to hide models that hit hard daily limits (RPD/TPD) self.model_cooldowns = {} # {model_id: expire_time} # --- API TELEMETRY --- self.total_api_calls = 0 self.logger = logger def _rotate_key(self, retry_after: Optional[float] = None): """ Switch to the next available API key. If retry_after is provided, the current key is put on cooldown. """ if not self.api_keys or len(self.api_keys) <= 1: return False now = time.time() # 1. Mark current key as cooled down if needed if retry_after: self.key_cooldowns[self.api_key] = now + retry_after # 2. Look for the first key that is NOT on cooldown for i in range(1, len(self.api_keys) + 1): next_idx = (self.current_key_idx + i) % len(self.api_keys) next_key = self.api_keys[next_idx] if self.key_cooldowns[next_key] <= now: self.current_key_idx = next_idx self.api_key = next_key print(f" Groq Key Swapped! (Key ending in ...{self.api_key[-4:]} is available)", flush=True) return True # 3. If all are on cooldown, pick the one that expires SHORTEST earliest_key = min(self.key_cooldowns, key=self.key_cooldowns.get) self.current_key_idx = self.api_keys.index(earliest_key) self.api_key = earliest_key print(f" Groq Key Pool Exhausted. 
Using earliest available key (expires in {self.key_cooldowns[earliest_key] - now:.1f}s)") return True def _get_fallback_model( self, current_model: str, tried_models: Optional[set] = None, role: Optional[str] = None, required_caps: Optional[List[Capability]] = None ) -> str: """ Intelligent Cascading: Capability-Locked Failover -> Precision -> Survival. """ # 1. ATTEMPT SAME-ROLE PEER FAILOVER if role: clean_role = role.replace("_MODEL", "") from app.core.model_registry import model_registry # Find all candidates for this role from Groq provider role_models = [k for k, v in model_registry.MODELS.items() if v.get("provider") == "groq" and v.get("role") == clean_role] for candidate in role_models: if (tried_models is None or candidate not in tried_models) and \ candidate != current_model and \ (candidate not in self.model_cooldowns or self.model_cooldowns[candidate] <= time.time()): # CAPABILITY LOCK CHECK: Ensure candidate supports requirements if required_caps: if not all(model_registry.supports(candidate, c) for c in required_caps): continue print(f" [RELIABILITY] Role-Aware Fallback: {current_model} -> {candidate} (Peer for {clean_role})") return candidate # 2. STANDARD PRECISION CHAIN (Powered by ModelRegistry) try: from app.core.model_registry import model_registry ctx_role = role if role else "SMART_REASONING" chain = model_registry.get_fallback_chain("groq", ctx_role) start_index = 0 if current_model in chain: start_index = chain.index(current_model) + 1 now = time.time() for candidate in chain[start_index:]: if (tried_models is None or candidate not in tried_models) and \ (candidate not in self.model_cooldowns or self.model_cooldowns[candidate] <= now): # CAPABILITY LOCK CHECK if required_caps: if not all(model_registry.supports(candidate, c) for c in required_caps): continue print(f" [RELIABILITY] Registry Chain Fallback: {current_model} -> {candidate}") return candidate except Exception as e: print(f"Fallback Chain Error: {e}") # 3. 
SURVIVAL RELAXATION (If locked failover failed, grab anything that works) if required_caps: print(f" [WARNING] SOC EMERGENCY: No models available with capabilities {required_caps}. Relaxing constraints...") return self._get_fallback_model(current_model, tried_models, role, required_caps=None) return "llama-3.1-8b-instant" async def _log_rate_limit_telemetry(self, headers: Dict[str, str]): """EXTRACT & TRACK REAL-TIME QUOTAS (Aligned with GroqDocs).""" try: # Limits (Capacity) limit_req = headers.get("x-ratelimit-limit-requests") limit_tok = headers.get("x-ratelimit-limit-tokens") # Remaining (State) rem_req = headers.get("x-ratelimit-remaining-requests") rem_tok = headers.get("x-ratelimit-remaining-tokens") if rem_req: self.remaining_requests = int(rem_req) if rem_tok: self.remaining_tokens = int(rem_tok) self.reset_requests = headers.get("x-ratelimit-reset-requests", self.reset_requests) self.reset_tokens = headers.get("x-ratelimit-reset-tokens", self.reset_tokens) # Smart Alerting: Calculate utilization if limits are available if limit_tok and rem_tok: l_tok = float(limit_tok) r_tok = float(rem_tok) if l_tok > 0 and (r_tok / l_tok) < 0.2: # Less than 20% remaining print(f" [ALERT] SOC ALERT: High Token Load ({int(r_tok)}/{int(l_tok)} TPM left). Reset in {self.reset_tokens}", flush=True) elif self.remaining_tokens < 1000: # Fallback absolute floor print(f" [ALERT] SOC ALERT: Critical Token Quota ({self.remaining_tokens} left).", flush=True) if limit_req and rem_req: l_req = float(limit_req) r_req = float(rem_req) if l_req > 0 and (r_req / l_req) < 0.1: # Less than 10% daily requests remaining print(f" [ALERT] SOC ALERT: Daily Request Limits Critical ({int(r_req)}/{int(l_req)} RPD left).", flush=True) except (ValueError, TypeError): pass async def initialize(self): """No special initialization needed.""" pass def _static_fallback_response(self, role: str) -> LLMResponse: """ Absolute last resort. Returns a pre-defined static response. 
GUARANTEE: This function NEVER fails. System never crashes. Used when: - All API keys exhausted - All fallback models exhausted - Network completely unavailable - Budget exceeded """ # 🔥 DYNAMIC/HUMAN FALLBACKS (Requirement for Realism) import random static_responses = { "FAST_CHAT": [ "Hmm, ek minute ruko, main check karke bataati hoon...", "Arey, thoda busy hoon abhi... ek second ruko.", "Baad mein baat karte hain? Mera beta thoda pareshaan kar raha hai.", "Haan haan, sun rahi hoon... bas thoda connection problem hai.", "Ji, ek minute... aap thoda line pe wait karo please." ], "SMART_REASONING": ['{"scam_type": "unknown", "confidence": 0.3}'], "STRUCTURED_OUTPUT": ['{"extracted": [], "status": "fallback"}'], "SAFETY_GUARD": ['{"safe": true, "reason": "fallback_mode"}'], "NATURAL_CHAT": [ "Suno, main abhi thode der mein reply karta hoon...", "Arey yaar, internet slow hai... wait karo thoda." ], } role_key = role.replace("_MODEL", "") options = static_responses.get(role_key, ["Processing... please wait."]) content = random.choice(options) self.logger.warning(f" [CRASH-PROOF] Static fallback used for role: {role}") return LLMResponse( content=content, model="static_fallback", reasoning="System in fallback mode due to API exhaustion" ) def _harden_schema_for_strict_mode(self, schema: Dict[str, Any]) -> Dict[str, Any]: """ Recursively hardens a JSON schema for Groq's `strict: true` mode. - Adds `additionalProperties: false` to all objects. - Ensures all defined properties are in the `required` array. - Handles optionality by wrapping types in ["original_type", "null"]. """ if not isinstance(schema, dict): return schema hardened = schema.copy() orig_required = set(hardened.get("required", [])) if hardened.get("type") == "object": # 1. Force additionalProperties: false hardened["additionalProperties"] = False # 2. 
Process properties props = hardened.get("properties", {}) if props: new_props = {} for k, v in props.items(): prop_schema = self._harden_schema_for_strict_mode(v) # 3. Handle Optionality via Null-Unions # If field was NOT in 'required', we make it nullable but still 'required' by Groq if k not in orig_required: if "type" in prop_schema: current_type = prop_schema["type"] if isinstance(current_type, list): if "null" not in current_type: prop_schema["type"] = current_type + ["null"] elif current_type != "null": prop_schema["type"] = [current_type, "null"] elif "anyOf" in prop_schema: # Add null to anyOf if not already there if not any(item.get("type") == "null" for item in prop_schema["anyOf"]): prop_schema["anyOf"].append({"type": "null"}) new_props[k] = prop_schema hardened["properties"] = new_props # 4. Now all properties MUST be in the required list for Groq Strict Mode hardened["required"] = list(props.keys()) elif hardened.get("type") == "array": if "items" in hardened: hardened["items"] = self._harden_schema_for_strict_mode(hardened["items"]) elif "anyOf" in hardened: hardened["anyOf"] = [self._harden_schema_for_strict_mode(s) for s in hardened["anyOf"]] return hardened async def generate( self, prompt: str, temperature: float = 0.7, max_tokens: int = 500, json_mode: bool = False, model: Optional[str] = None, role: Optional[str] = "CHAT", **kwargs ) -> LLMResponse: """Generate response using Groq.""" if not self.api_key: raise RuntimeError("Groq API key not set") # [REMOVED] Stateful reset. Client is now stateless. 
# Pop reasoning-specific kwargs reasoning_effort = kwargs.get("reasoning_effort") include_reasoning = kwargs.get("include_reasoning") reasoning_format = kwargs.get("reasoning_format") enabled_tools = kwargs.get("enabled_tools") # Extract context for budget tracking (if passed) context = kwargs.get("context") # ConversationContext for budget_exceeded tracking if not prompt and "messages" not in kwargs: raise ValueError("Either 'prompt' or 'messages' must be provided") source_messages = kwargs.get("messages", [{"role": "user", "content": prompt}]) # --- CACHING OPTIMIZATION (System Separate) --- # Groq caches the entire message list if the prefix matches. # Keeping system instructions in a separate 'system' message is optimal. is_reasoning_model = model_registry.supports(self.model, Capability.REASONING) processed_messages = [] system_content = "" # Separately collect system instructions for msg in source_messages: if msg.get("role") == "system": system_content += msg.get("content", "") + "\n\n" else: processed_messages.append(msg) # Build final message list final_messages = [] if system_content: # Reasoning models often prefer instructions in a single stream or 'developer' role, # but standardizing on a leading 'system' message for prefix caching. 
final_messages.append({"role": "system", "content": system_content.strip()}) final_messages.extend(processed_messages) # --- PREFILL OPTIMIZATION --- prefill = kwargs.get("prefill") if prefill: # Inject prefilled assistant message to guide output final_messages.append({"role": "assistant", "content": prefill}) # Auto-Stop optimization for code blocks if prefill.strip().startswith("```") and "stop" not in kwargs: kwargs["stop"] = "```" payload = { "model": model or self.model, "messages": final_messages, "temperature": temperature, "max_tokens": max_tokens } if kwargs.get("stop"): payload["stop"] = kwargs["stop"] # ═══════════════════════════════════════════════════════════════════════════ # 🔒 PROMPT CACHE: Token Storm Prevention # Check cache before making LLM call. Same prompt = same response (30-120s TTL) # ═══════════════════════════════════════════════════════════════════════════ session_id = "default" if context and hasattr(context, "session_id"): session_id = context.session_id elif context and hasattr(context, "session") and isinstance(context.session, dict): session_id = context.session.get("session_id", "default") # Build fingerprint from system + history + user message user_msg = prompt or (final_messages[-1].get("content", "") if final_messages else "") cache_fingerprint = prompt_cache.get_fingerprint( system_prompt=system_content, conversation_history=processed_messages[:-1] if len(processed_messages) > 1 else [], user_message=user_msg, role=role or "" ) # Check cache (only for non-tool, non-schema calls) if not enabled_tools and not kwargs.get("schema"): cached_response = prompt_cache.get(cache_fingerprint, session_id) if cached_response: self.logger.info(f"[CACHE HIT] Fingerprint: {cache_fingerprint}, Session: {session_id}") return cached_response # --- RETRY LOGIC (Handling 429s with Key/Model Rotation) --- # ⚡ OPTIMIZATION: STRICT CASCADE LIMIT (Max 2 attempts per request) # Prevents API storms and high latency. 
max_retries = 2 current_model = model or self.model now = time.time() # Proactively check if the requested model is on a daily quota blacklist if current_model in self.model_cooldowns and self.model_cooldowns[current_model] > now: new_model = self._get_fallback_model(current_model) if new_model != current_model: print(f" [RELIABILITY] {role} PROACTIVE REDIRECT: Model {current_model} is on Quota Cooldown. Using {new_model}...", flush=True) current_model = new_model tried_models = set() # REQUIRED CAPABILITY GATING required_caps = [] if json_mode: required_caps.append(Capability.JSON_OBJECT) if enabled_tools: required_caps.append(Capability.TOOLS) # --- 🛡️ THE FAST_CHAT FIREWALL (Humanization Layer) --- # Implementation of Phase 4 Safety: Isolate human replies from technical logic. is_fast_chat = (role == "FAST_CHAT" or role == "FAST_CHAT_MODEL") if is_fast_chat: # 1. Strip Technical Contaminants json_mode = False enabled_tools = [] required_caps = [] # 2. Pin Humanized Parameters # Temperature >= 0.85 (Humanity/Variety), max_tokens <= 60 (Chat brevity) temperature = max(temperature, 0.85) max_tokens = min(max_tokens, 60) # 3. Clean ALL technical kwargs to prevent 400 errors # Schema & JSON kwargs.pop("schema", None) kwargs.pop("response_format", None) kwargs.pop("json_mode", None) # Reasoning (CRITICAL: These cause 400 on non-reasoning models) kwargs.pop("reasoning_effort", None) kwargs.pop("reasoning_format", None) kwargs.pop("include_reasoning", None) # Reset local vars to None reasoning_effort = None reasoning_format = None include_reasoning = None # Tools kwargs.pop("tools", None) kwargs.pop("tool_choice", None) kwargs.pop("parallel_tool_calls", None) enabled_tools = None # Prefill (can cause schema leakage) kwargs.pop("prefill", None) self.logger.info(f" [FIREWALL] FAST_CHAT pipeline isolated. 
Temp: {temperature}, Tokens: {max_tokens}") # --- TOKEN RESILIENCE: Predictive Balancing --- from app.utils.token_utils import calculate_payload_tokens model_meta = model_registry.MODELS.get(current_model, {}) max_context = model_meta.get("context_window", 131072) safety_threshold = int(max_context * 0.9) for attempt in range(max_retries): tried_models.add(current_model) # 1. Update Capabilities for current model is_reasoning_model = model_registry.supports(current_model, Capability.REASONING) # 2. Re-build payload from scratch to avoid corruption loop_messages = [m.copy() for m in final_messages] # Predictive Pruning: Remove oldest history messages (after system) if over threshold # Messages: [system, history1, history2, ..., current_user] current_tokens = calculate_payload_tokens(loop_messages) if current_tokens > safety_threshold: self.logger.warning(f" [TOKEN SAFETY] Payload ({current_tokens}) exceeds threshold ({safety_threshold}). Pruning history...") # Keep system (index 0) and the last user message while len(loop_messages) > 3 and calculate_payload_tokens(loop_messages) > safety_threshold: loop_messages.pop(1) # Remove oldest history turn print(f" [TOKEN SAFETY] Pruned payload to {calculate_payload_tokens(loop_messages)} tokens.") payload = { "model": current_model, "messages": loop_messages, "temperature": temperature, "max_tokens": max_tokens } if kwargs.get("stop"): payload["stop"] = kwargs["stop"] # 🔒 COMPOUND CUSTOM: Only for Groq Compound models if enabled_tools: is_compound = "compound" in current_model.lower() if is_compound: payload["compound_custom"] = {"tools": {"enabled_tools": enabled_tools}} else: # For standard models, we ignore enabled_tools as passed here (list of strings) # because standard models require full tool definitions. 
pass # JSON Mode Handling if json_mode: if model_registry.supports(current_model, Capability.JSON_OBJECT): payload["response_format"] = {"type": "json_object"} # Only append hint if not already present if "json" not in prompt.lower() and "json" not in payload["messages"][-1]["content"].lower(): # Check if user message is last, append to it if payload["messages"][-1]["role"] == "user": payload["messages"][-1]["content"] += "\n\n(Respond in JSON)" else: # Append system instruction if last msg is not user payload["messages"].append({"role": "user", "content": "(Respond in JSON)"}) else: # Raw fallback logic for JSON if "json" not in prompt.lower() and "json" not in payload["messages"][-1]["content"].lower(): if payload["messages"][-1]["role"] == "user": payload["messages"][-1]["content"] += "\n\nCRITICAL: Respond ONLY with a valid JSON object." # Reasoning Optimization if is_reasoning_model: if reasoning_effort: payload["reasoning_effort"] = reasoning_effort # Dynamic Parameter Handling based on Model Family if "qwen" in current_model.lower(): if reasoning_format: payload["reasoning_format"] = reasoning_format else: payload["reasoning_format"] = "parsed" elif "gpt-oss" in current_model.lower(): # GPT-OSS uses include_reasoning=True, NOT reasoning_format if include_reasoning is not None: payload["include_reasoning"] = include_reasoning else: payload["include_reasoning"] = True # Defaults to medium effort if not set, handled by API default else: # Forward compatible default payload["reasoning_format"] = "parsed" # 3. 
Update headers/key headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } if current_model and current_model.startswith("groq/compound"): headers["Groq-Model-Version"] = "latest" # --- TELEMETRY TRACKING --- self.total_api_calls += 1 print(f" [TELEMETRY] API Call Sequence #{self.total_api_calls} | Target: {current_model} | Role: {role}", flush=True) response = await _shared_client.post( self.base_url, headers=headers, json=payload ) # --- CONTEXT ERROR DETECTION (NON-RECOVERABLE) --- # These errors cannot be fixed by retrying with the same prompt if response.status_code == 400: err_body = response.text.lower() is_context_error = any(x in err_body for x in [ "context length", "too many tokens", "maximum context", "token limit", "context_length_exceeded", "max_tokens" ]) if is_context_error: print(f" [!!!] CONTEXT ERROR: Non-recoverable. Prompt too large for model.", flush=True) if context and hasattr(context, "budget_exceeded"): context.budget_exceeded = True raise BudgetExceeded(f"Context length exceeded - non-recoverable for {current_model}") # [OPTIMIZATION] FAST FAILOVER for Generic 400 (Bad Request) # Don't raise/retry same model. Switch immediately. print(f" [GROQ] 400 Bad Request (Generic). Switching model...", flush=True) new_model = self._get_fallback_model(current_model, tried_models, role=role, required_caps=required_caps) if new_model != current_model: print(f" [GROQ] 400 Fallback: {current_model} -> {new_model}", flush=True) current_model = new_model continue # --- GROQ ERROR TAXONOMY HANDLING (Per Audit) --- # Update rate-limit state from headers on ALL responses update_rate_state(current_model, dict(response.headers)) # [413] PAYLOAD TOO LARGE - Truncate and retry if response.status_code == 413: error_type = classify_groq_error(413) recovery = get_recovery_action(error_type, current_model) print(f" [GROQ] 413 Payload Too Large. 
Triggering smart_truncate...", flush=True) # Truncate messages by removing older history if len(loop_messages) > 2: loop_messages = [loop_messages[0], loop_messages[-1]] # Keep system + last user print(f" [GROQ] Truncated to {len(loop_messages)} messages") continue else: raise RuntimeError("413: Cannot truncate further - minimal payload still too large") # [422] SEMANTIC MODEL FAILURE - Retry once with lower temp, then fallback if response.status_code == 422: error_type = classify_groq_error(422) recovery = get_recovery_action(error_type, current_model) print(f" [GROQ] 422 Semantic Failure. Retrying with lower temperature...", flush=True) if attempt == 0: # First attempt: retry with lower temperature temperature = max(0.1, temperature * 0.5) continue else: # Second attempt: fallback model new_model = self._get_fallback_model(current_model, tried_models, role=role, required_caps=required_caps) if new_model != current_model: print(f" [GROQ] 422 Fallback: {current_model} -> {new_model}", flush=True) current_model = new_model continue # [498] FLEX CAPACITY EXCEEDED - Delay and switch model tier if response.status_code == 498: error_type = classify_groq_error(498) recovery = get_recovery_action(error_type, current_model) print(f" [GROQ] 498 Flex Capacity Exceeded. 
Switching model tier...", flush=True) self.model_cooldowns[current_model] = time.time() + 300 # 5 min cooldown new_model = self._get_fallback_model(current_model, tried_models, role=role, required_caps=required_caps) if new_model != current_model: print(f" [GROQ] 498 Model Tier Switch: {current_model} -> {new_model}", flush=True) current_model = new_model await asyncio.sleep(recovery["delay_seconds"]) continue if response.status_code == 429: err_body = response.text.lower() is_daily_limit = "tokens per day" in err_body or "requests per day" in err_body retry_after_str = response.headers.get("retry-after") retry_after = float(retry_after_str) if retry_after_str else None # FIX: Try key rotation FIRST - keys may be from different accounts! # Only skip to model fallback if key rotation fails. model_meta = model_registry.MODELS.get(current_model, {}) tpm_limit = model_meta.get("tpm", 6000) estimated_tokens = sum(len(m.get("content", "")) for m in loop_messages) / 3.5 # 1. ALWAYS try key rotation first (keys from different accounts have separate quotas) if self._rotate_key(retry_after): reason = "DAILY_LIMIT" if is_daily_limit else "RATE_LIMIT" # [OPTIMIZATION] Don't sleep if we have a fresh key! print(f" [RELIABILITY] {reason} hit - Rotated to Key ...{self.api_key[-4:]} (Immediate Retry)", flush=True) await asyncio.sleep(0.1) continue # 2. Key Pool Exhausted - All keys on cooldown, cascade to fallback model # Only skip rotation if request is huge (>50% TPM) - unlikely to help any key should_cascade = (estimated_tokens > (tpm_limit * 0.5)) new_model = self._get_fallback_model(current_model, tried_models, role=role, required_caps=required_caps) if new_model != current_model: reason_msg = "DAILY QUOTA REACHED (All Keys)" if is_daily_limit else "Key Pool Exhausted" print(f" [RELIABILITY] {role} ALERT: {reason_msg}. 
Cascading: {current_model} -> {new_model}", flush=True) if is_daily_limit: self.model_cooldowns[current_model] = now + 600 current_model = new_model self.current_key_idx = 0 self.api_key = self.api_keys[0] # Update capability gating for the new model in the next loop continue await asyncio.sleep(retry_after or 1.0) continue response.raise_for_status() break else: # === CRASH-PROOF GUARANTEE === # Instead of raising, return static response. System NEVER crashes. print(f" [CRASH-PROOF] All retries exhausted for role {role}. Using static fallback.", flush=True) return self._static_fallback_response(role) # --- RATE LIMIT TELEMETRY --- await self._log_rate_limit_telemetry(response.headers) data = response.json() # Cache Hit Telemetry usage = data.get("usage", {}) cached_tokens = usage.get("prompt_tokens_details", {}).get("cached_tokens", 0) if cached_tokens > 0: msg = f" CACHE HIT: Reused {cached_tokens} tokens! (Speedup Active)" print(msg.encode('ascii', errors='replace').decode('ascii'), flush=True) # --- REASONING EXTRACTION --- message = data["choices"][0]["message"] content = message.get("content", "") reasoning = message.get("reasoning") # --- EXECUTED TOOLS EXTRACTION (Compound AI) --- executed_tools = message.get("executed_tools") self.last_executed_tools = executed_tools if executed_tools: log_msg = f" [AGENT LOG] Compound tools used: {executed_tools}" # Ensure safe printing on Windows terminal print(log_msg.encode('ascii', errors='replace').decode('ascii'), flush=True) # Handle 'raw' format where tags are in content if not reasoning and "" in content and "" in content: think_match = re.search(r"(.*?)", content, re.DOTALL) if think_match: reasoning = think_match.group(1).strip() # Optionally strip think tags from content if requested or by default content = re.sub(r".*?", "", content, flags=re.DOTALL).strip() self.last_reasoning = reasoning if reasoning: print(f" [BRAIN] NATIVE REASONING CAPTURED ({len(reasoning)} tokens) | Source: {current_model}", 
flush=True) # --- COMPOUND SYSTEM TELEMETRY (Forensic Audit) --- executed_tools = message.get("executed_tools") if executed_tools: print(f" [ACTIONS] COMPOUND ACTIONS: {len(executed_tools)} tools executed.") for tool in executed_tools: print(f" -> Tool: {tool.get('type','unknown')} | Args: {tool.get('arguments')}") usage_breakdown = data.get("usage_breakdown") if usage_breakdown: print(f" [USAGE] USAGE BREAKDOWN (Multi-Model Orchestration):") for entry in usage_breakdown.get("models", []): m_name = entry.get("model") m_usage = entry.get("usage", {}) print(f" -> {m_name}: {m_usage.get('total_tokens')} tokens in {m_usage.get('total_time', 0):.2f}s") # Re-assemble the full response if prefilled if prefill: content = prefill + content # ═══════════════════════════════════════════════════════════════════════════ # 🔒 CACHE STORAGE: Store successful response for future reuse # ═══════════════════════════════════════════════════════════════════════════ final_response = LLMResponse( content=content, reasoning=reasoning, usage=usage, executed_tools=executed_tools, model=current_model ) # Only cache non-tool, non-schema responses if not enabled_tools and not kwargs.get("schema"): prompt_cache.set( fingerprint=cache_fingerprint, session_id=session_id, response=final_response, model=current_model, ttl=60 # 60 second TTL ) self.logger.info(f"[CACHE SET] Fingerprint: {cache_fingerprint[:8]}... | Model: {current_model}") return final_response async def generate_structured( self, prompt: str, schema: Dict[str, Any], model: str = "openai/gpt-oss-20b", temperature: float = 0.1, schema_name: str = "structured_data_extraction", reasoning_effort: Optional[str] = None, include_reasoning: Optional[bool] = None, reasoning_format: Optional[str] = None, role: str = "STRUCTURED", **kwargs ) -> LLMResponse: """ Produce STRICT schema-compliant JSON using Groq Structured Outputs or JSON Mode. 
""" if not self.api_key: raise RuntimeError("Groq API key not set") # [REMOVED] Stateful reset # DYNAMIC CAPABILITY DETECTION (Powered by ModelRegistry) # Check if the PROVIDED model supports specific tech is_strict_model = model_registry.supports(model, Capability.STRICT_MODE) is_schema_model = is_strict_model or model_registry.supports(model, Capability.JSON_SCHEMA) # 0. SMART CONTEXT CHECK (Handled by LLMClient Switchboard) current_model = model now = time.time() # Proactively check if the requested model is on a daily quota blacklist if current_model in self.model_cooldowns and self.model_cooldowns[current_model] > now: new_model = self._get_fallback_model(current_model) if new_model != current_model: print(f" [RELIABILITY] {role} PROACTIVE REDIRECT: Model {current_model} is on Quota Cooldown. Using {new_model}...", flush=True) current_model = new_model if not prompt and "messages" not in kwargs: raise ValueError("Either 'prompt' or 'messages' must be provided") source_messages = kwargs.get("messages", [{"role": "user", "content": prompt}]) # --- CACHING OPTIMIZATION --- processed_messages = [] system_content = "" for msg in source_messages: if msg.get("role") == "system": system_content += msg.get("content", "") + "\n\n" else: processed_messages.append(msg) final_messages = [] if system_content: final_messages.append({"role": "system", "content": system_content.strip()}) final_messages.extend(processed_messages) # --- RETRY LOGIC (Handling 429s with Key/Model Rotation) --- max_retries = max(5, len(self.api_keys) * 3) current_model = model tried_models = set() # REQUIRED CAPABILITY GATING for Structured Output required_caps = [Capability.JSON_SCHEMA] # Base requirement for attempt in range(max_retries): # 1. 
Update Capabilities for current model is_strict_model = model_registry.supports(current_model, Capability.STRICT_MODE) is_schema_model = is_strict_model or model_registry.supports(current_model, Capability.JSON_SCHEMA) is_reasoning_model = model_registry.supports(current_model, Capability.REASONING) tried_models.add(current_model) # 2. Re-build payload from scratch to avoid corruption loop_messages = [m.copy() for m in final_messages] payload = { "model": current_model, "messages": loop_messages, "temperature": temperature, } # Structured Output Parameters if is_schema_model: final_schema = schema if is_strict_model: final_schema = self._harden_schema_for_strict_mode(schema) payload["response_format"] = { "type": "json_schema", "json_schema": { "name": schema_name, "strict": is_strict_model, "schema": final_schema } } elif model_registry.supports(current_model, Capability.JSON_OBJECT): payload["response_format"] = {"type": "json_object"} if "json" not in prompt.lower(): payload["messages"][0]["content"] += "\n\nCRITICAL: Respond ONLY with a valid JSON object matching the requested schema." else: # Raw Fallback if "json" not in prompt.lower(): payload["messages"][0]["content"] += "\n\nCRITICAL: Respond ONLY with a valid JSON object matching the requested schema." 
# Reasoning Optimization if is_reasoning_model: if reasoning_effort: payload["reasoning_effort"] = reasoning_effort if reasoning_format: payload["reasoning_format"] = reasoning_format elif include_reasoning is not None: payload["include_reasoning"] = include_reasoning elif "gpt-oss" not in current_model.lower(): payload["reasoning_format"] = "parsed" # Update headers with rotated key headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } # --- TELEMETRY TRACKING --- self.total_api_calls += 1 print(f" [TELEMETRY] API Call Sequence #{self.total_api_calls} | Target: {current_model} | Role: {role}", flush=True) response = await _shared_client.post( self.base_url, headers=headers, json=payload ) if response.status_code == 429: err_body = response.text.lower() is_daily_limit = "tokens per day" in err_body or "requests per day" in err_body # Structured Scalability Check model_meta = model_registry.MODELS.get(current_model, {}) tpm_limit = model_meta.get("tpm", 6000) estimated_tokens = sum(len(m.get("content", "")) for m in loop_messages) / 3.5 should_escalate = is_daily_limit or (estimated_tokens > (tpm_limit * 0.5)) retry_after_str = response.headers.get("retry-after") retry_after = float(retry_after_str) if retry_after_str else None if not should_escalate and self._rotate_key(retry_after): # [OPTIMIZATION] Key rotated successfully - minimal safety delay await asyncio.sleep(0.1) continue # 2. Key Pool Exhausted or Daily Limit - Cascading Failover new_model = self._get_fallback_model(current_model, tried_models, role=role, required_caps=required_caps) if new_model != current_model: reason_msg = "DAILY QUOTA REACHED" if is_daily_limit else "Key Pool Exhausted" print(f" [RELIABILITY] {role} ALERT: {reason_msg}. 
Cascading: {current_model} -> {new_model}", flush=True) if is_daily_limit: # Blacklist the model for 10 minutes session-wide self.model_cooldowns[current_model] = time.time() + 600 current_model = new_model self.current_key_idx = 0 self.api_key = self.api_keys[0] continue await asyncio.sleep(retry_after or 1.0) continue # --- SCHEMA MISMATCH RETRY --- # If best-effort mode (strict: false) returned a 400, retry as per Groq docs if response.status_code == 400 and not is_strict_model: if "Generated JSON does not match the expected schema" in response.text: import random wait = 1.0 + random.uniform(0, 0.5) print(f" [SOC] Groq Structured 400 (Schema Mismatch) on {current_model} - Retrying in {wait:.2f}s... (Attempt {attempt+1})") await asyncio.sleep(wait) continue response.raise_for_status() break else: response.raise_for_status() # Final attempt failure if response.status_code != 200: print(f" Groq Structured Error [{model}]: {response.text}") response.raise_for_status() # --- RATE LIMIT TELEMETRY --- await self._log_rate_limit_telemetry(response.headers) data = response.json() # Cache Hit Telemetry usage = data.get("usage", {}) cached_tokens = usage.get("prompt_tokens_details", {}).get("cached_tokens", 0) if cached_tokens > 0: print(f" CACHE HIT: Reused {cached_tokens} tokens!") message = data["choices"][0]["message"] content = message.get("content", "") reasoning = message.get("reasoning") # --- EXECUTED TOOLS EXTRACTION (Compound AI) --- executed_tools = message.get("executed_tools") self.last_executed_tools = executed_tools if executed_tools: print(f" [AGENT LOG] Compound tools used: {executed_tools}", flush=True) self.last_reasoning = reasoning if reasoning: print(f" [BRAIN] NATIVE STRUCTURED REASONING CAPTURED.") # ATOMIC SANITIZATION: Handle common LLM output formats if isinstance(content, str): content = content.strip() try: from app.utils.json_utils import robust_json_loads result = robust_json_loads(content) if result is None: raise ValueError("No JSON found 
in response") # Inject native reasoning into the result DICT but also return it in LLMResponse if isinstance(result, dict) and reasoning: result["_native_reasoning"] = reasoning return LLMResponse( content=json.dumps(result), # Normalized content reasoning=reasoning, usage=usage, executed_tools=executed_tools, model=current_model ) except Exception as e: if isinstance(content, str) and content.startswith('"') and content.endswith('"'): return LLMResponse(content=content.strip('"'), model=current_model) raise ValueError(f"Failed to parse structured output: {content[:100]}") from e async def generate_tool_call( self, prompt: str, tools: list[Dict[str, Any]], model: Optional[str] = None, temperature: float = 0.0, parallel_tool_calls: bool = True ) -> Optional[list[Dict[str, Any]]]: """ Hardened Groq Native Tool Use (Feb 2026 Compliant). Includes temperature scaling on retry for 400 errors. """ if not self.api_key: return None target_model = model or "llama-3.3-70b-versatile" current_temp = temperature max_retries = 3 # Detect Parallel Capability supports_parallel = model_registry.supports(target_model, Capability.PARALLEL_TOOLS) for attempt in range(max_retries): try: payload = { "model": target_model, "messages": [{"role": "user", "content": prompt}], "tools": tools, "tool_choice": "auto", "temperature": current_temp, "parallel_tool_calls": parallel_tool_calls if supports_parallel else False } response = await _shared_client.post( self.base_url, headers={"Authorization": f"Bearer {self.api_key}"}, json=payload ) # --- RATE LIMIT TELEMETRY --- await self._log_rate_limit_telemetry(response.headers) if response.status_code == 400: # Potential invalid tool call schema/generation if attempt < max_retries - 1: current_temp = min(current_temp + 0.2, 1.0) continue response.raise_for_status() data = response.json() message = data["choices"][0]["message"] return message.get("tool_calls") except Exception as e: if attempt == max_retries - 1: print(f" [!] 
class OpenRouterClient(BaseLLMClient):
    """
    OpenRouter client - Access to many models with one API key.

    Supports several comma-separated keys in ``settings.OPENROUTER_API_KEY``
    and rotates to the next key when a request is rate limited (HTTP 429).
    """

    def __init__(self):
        # MULTI-KEY SUPPORT: Parse comma-separated keys for load balancing
        raw_keys = settings.OPENROUTER_API_KEY or ""
        self.api_keys = [k.strip() for k in raw_keys.split(",") if k.strip()]
        self.current_key_idx = 0
        self.api_key = self.api_keys[0] if self.api_keys else None
        self.model = settings.OPENROUTER_MODEL
        self.base_url = "https://openrouter.ai/api/v1/chat/completions"

    def _rotate_key(self) -> bool:
        """Switch to the next API key; return False when there is no other key."""
        if len(self.api_keys) <= 1:
            return False

        self.current_key_idx = (self.current_key_idx + 1) % len(self.api_keys)
        self.api_key = self.api_keys[self.current_key_idx]
        print(f" OpenRouter Key Swapped! (Key ending in ...{self.api_key[-4:]})")
        return True

    async def generate(self, prompt: str, **kwargs) -> str:
        """
        Generate response using OpenRouter with rotation.

        Args:
            prompt: Sent as a single user message.
            **kwargs: ``temperature`` and ``max_tokens`` override the settings
                defaults; other keys are ignored.

        Returns:
            The message content of the first choice.

        Raises:
            RuntimeError: No API key is configured.
            Exception: The error of the final attempt (including
                httpx.HTTPStatusError for a 429 that persists to the end).
        """
        if not self.api_key:
            raise RuntimeError("OpenRouter API key not set")

        max_retries = max(3, len(self.api_keys))
        payload = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": kwargs.get("temperature", settings.LLM_TEMPERATURE),
            "max_tokens": kwargs.get("max_tokens", settings.LLM_MAX_TOKENS)
        }

        for attempt in range(max_retries):
            is_last = attempt == max_retries - 1
            try:
                response = await _shared_client.post(
                    self.base_url,
                    headers={
                        # Built per attempt: the key may have been rotated.
                        "Authorization": f"Bearer {self.api_key}",
                        "HTTP-Referer": "https://huggingface.co/spaces",
                        "X-Title": "Scam Honeypot"
                    },
                    json=payload
                )

                # Rotate only while another attempt remains; on the last
                # attempt the 429 must surface as an error instead of the
                # loop ending and an empty string being returned as success.
                if response.status_code == 429 and not is_last and self._rotate_key():
                    continue

                response.raise_for_status()
                data = response.json()
                return data["choices"][0]["message"]["content"]
            except Exception:
                if is_last:
                    raise
                await asyncio.sleep(1.0 * (attempt + 1))

        # Not reachable (the last attempt always returns or raises); kept so
        # the function can never fall through and return None.
        raise RuntimeError("OpenRouter retries exhausted")

    async def generate_structured(self, prompt: str, schema: Dict[str, Any], model: Optional[str] = None, **kwargs) -> LLMResponse:
        """
        Structured output for OpenRouter.

        The schema is embedded in the prompt as text; the reply is returned
        as-is without validation. ``model`` only labels the response.
        """
        prompt_with_json = f"{prompt}\n\nReturn response as valid JSON matching this schema: {json.dumps(schema)}"
        content = await self.generate(prompt_with_json, **kwargs)
        return LLMResponse(content=content, model=model or self.model)

    async def check_connectivity(self) -> bool:
        """Verify API key validity with a one-token request (5 s timeout)."""
        if not self.api_key:
            return False
        try:
            res = await _shared_client.post(
                self.base_url,
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={"model": self.model, "messages": [{"role": "user", "content": "hi"}], "max_tokens": 1},
                timeout=5.0
            )
            return res.status_code == 200
        except Exception:
            # `Exception` (not a bare except) so task cancellation still propagates.
            return False
class MockLLMClient(BaseLLMClient):
    """Mock LLM client for when no API keys are available."""

    async def generate(self, prompt: str, **kwargs) -> str:
        """
        Return mock response with JSON stability.

        The prompt is matched (case-insensitively) against marker words and
        answered with a fixed JSON document; anything else gets a random
        canned persona line.
        """
        import random

        prompt_lower = (prompt or "").lower()

        # 1. Detection Prompt
        if "is_scam" in prompt_lower and "scam_type" in prompt_lower:
            return json.dumps({
                "is_scam": True,
                "scam_type": "banking_scam",
                "confidence": 0.85,
                "threat_level": "high",
                "intent": "money_theft",
                "reasoning": "Mock: Highly suspicious banking request detected in patterns.",
                "risk_indicators": ["Mock: Urgency", "Mock: Payment Request"]
            })

        # 2. Intelligence Extraction Prompt
        if "phone_numbers" in prompt_lower and "upi_ids" in prompt_lower:
            return json.dumps({
                "phone_numbers": ["+91-9876543210"],
                "upi_ids": ["scammer@ybl"],
                "bank_accounts": [],
                "urls": ["http://fake-bank.site"],
                "crypto_addresses": [],
                "ifsc_codes": [],
                "pan_cards": [],
                "aadhar_numbers": []
            })

        # 3. Persona Selection Prompt
        if "selected_persona_key" in prompt_lower:
            return json.dumps({
                "selected_persona_key": "elderly_excited",
                "reasoning": "Mock: Matches high excitement in message.",
                "victim_profile": {"bank": "HDFC"}
            })

        # 4. Fallback PERSONA Response (Realistic Hinglish for honeypot)
        persona_fallbacks = [
            "Haan ji, main sun raha hoon... aap kaun bol rahe ho?",
            "Beta, aap kon ho? Mujhe samajh nahi aa raha...",
            "Arey, mera phone pe kya ho gaya? Aap kiske taraf se baat kar rahe ho?",
            "Main thoda busy hoon abhi, baad mein call karo na.",
            "Haan haan, bol rahe ho toh suno... lekin jaldi bolo.",
            "Aapka number mere phone pe save nahi hai. Kaun bol rahe ho?",
            "Main abhi samajh nahi paa raha, thoda clearly bolo please.",
            "OTP? Kaunsa OTP? Mujhe koi OTP nahi aaya..."
        ]
        return random.choice(persona_fallbacks)

    async def generate_structured(self, prompt: str, schema: Dict[str, Any], model: Optional[str] = None, **kwargs) -> LLMResponse:
        """
        Structured output for Mock client.

        ``schema`` and ``model`` are ignored; the content is whatever
        ``generate`` returns for the prompt.
        """
        content = await self.generate(prompt, **kwargs)
        return LLMResponse(content=content, model="mock")

    async def check_connectivity(self) -> bool:
        """The mock is always reachable."""
        return True
Fallback Generic Response (Anti-Loop) import random defaults = [ "Main abhi busy hoon, baad mein baat karte hain.", "Phone pe baat nahi ho paayegi abhi.", "Aap kaun bol rahe hain?", "Mere paas abhi time nahi hai.", "Main abhi drive kar raha hoon." ] return random.choice(defaults) async def check_connectivity(self) -> bool: return True class LLMClient: """ Unified LLM client with provider switching and fallback. Supports: - OpenAI GPT-4 Turbo - Anthropic Claude 3 - Groq Llama 3.1 70B (FAST & FREE!) - OpenRouter (multiple models) - Mock client (fallback) """ def __init__(self): self.primary: Optional[BaseLLMClient] = None self.fallback: Optional[BaseLLMClient] = None self.mock = MockLLMClient() self.initialized = False self.provider_name = "none" @property def is_available(self) -> bool: """Check if any LLM provider is available.""" return self.primary is not None async def initialize(self) -> None: """Initialize LLM clients based on configuration.""" provider = settings.LLM_PROVIDER.lower() # Initialize based on provider preference if provider == "groq" and settings.GROQ_API_KEY: self.primary = GroqClient() await self.primary.initialize() self.provider_name = "groq" elif provider == "openrouter" and settings.OPENROUTER_API_KEY: self.primary = OpenRouterClient() await self.primary.initialize() self.provider_name = "openrouter" elif provider == "openai" and settings.OPENAI_API_KEY: self.primary = OpenAIClient() await self.primary.initialize() self.provider_name = "openai" elif provider == "anthropic" and settings.ANTHROPIC_API_KEY: self.primary = AnthropicClient() await self.primary.initialize() self.provider_name = "anthropic" # Try to set up any available fallback if settings.GROQ_API_KEY and self.provider_name != "groq": self.fallback = GroqClient() await self.fallback.initialize() elif settings.OPENAI_API_KEY and self.provider_name != "openai": self.fallback = OpenAIClient() await self.fallback.initialize() self.initialized = True if self.primary: is_valid = await 
self.primary.check_connectivity() if not is_valid: print(f"WARNING: {self.provider_name.upper()} API key is INVALID or EXPIRED.") print(f"INFO: Sentinel is falling back to MOCK mode for safety.") self.primary = None # Fallback else: print(f"\n" + "="*60) print(f"SENTINEL ADAPTIVE ROLE SWITCHBOARD (ONLINE)") print(f" Provider: {self.provider_name.upper()}") # List key roles and their registry assignments for role in [ModelRole.SMART_REASONING, ModelRole.FAST_CHAT, ModelRole.STRUCTURED_OUTPUT]: m, reason = self._switchboard(role) caps = [c.value for c in model_registry.get_capabilities(m)] cap_str = f"[{', '.join(caps)}]" if caps else "[standard]" print(f" [{role.name}] -> {m} {cap_str}") print("="*60 + "\n") else: print("No LLM API key configured - using keyword detection + internal patterns") if not settings.GROQ_API_KEY and not settings.OPENROUTER_API_KEY: print("Tip: Add GROQ_API_KEY to your environment/secrets to enable high-intelligence agents.") def _get_subclass_static_fallback(self, role: str = "FAST_CHAT") -> LLMResponse: """ Crash-proof static fallback for budget exceeded scenarios. Delegates to underlying provider if available, otherwise uses defaults. GUARANTEE: This function NEVER fails. """ if self.primary and hasattr(self.primary, "_static_fallback_response"): return self.primary._static_fallback_response(role) # Fallback if no provider available import random static_responses = { "FAST_CHAT": [ "Hmm, ek minute ruko...", "Wait karo thoda, connection issues hain...", "Aap bolo, main sun rahi hoon..." ], "SMART_REASONING": ['{"scam_type": "unknown", "confidence": 0.3}'], "STRUCTURED_OUTPUT": ['{"extracted": [], "status": "fallback"}'], "SAFETY_GUARD": ['{"safe": true, "reason": "fallback_mode"}'], } role_key = role.replace("_MODEL", "") options = static_responses.get(role_key, ["Processing... 
please wait."]) content = random.choice(options) return LLMResponse( content=content, model="static_fallback", reasoning="System in fallback mode due to budget exhaustion" ) def _switchboard(self, role: ModelRole, prompt_text: str = "") -> tuple[str, str]: """ Adaptive Model Selector powered by ModelRegistry. Detects prompt length and routes to high-context models if needed. """ provider = self.provider_name role_name = role.name # 1. Check for manual override in settings setting_map = { ModelRole.SAFETY_GUARD: "GROQ_SAFETY_MODEL", ModelRole.STRUCTURED_OUTPUT: "GROQ_STRUCTURED_MODEL", ModelRole.SMART_REASONING: "GROQ_SMART_MODEL", ModelRole.FAST_CHAT: "GROQ_FAST_MODEL" } override_attr = setting_map.get(role) if override_attr: val = getattr(settings, override_attr, None) if val: return val, f"Manual override ({override_attr})" # 2. ADAPTIVE ROUTING (Context + Throughput) estimated_tokens = len(prompt_text) // 3.5 # 3. Dynamic Registry Lookup model_id = model_registry.get_preferred_model(provider, role_name) model_meta = model_registry.MODELS.get(model_id, {}) # --- CONTEXT WINDOW SAFETY CHECK --- window = model_meta.get("context_window", 131072) # If payload > 80% of window, force high-context model if estimated_tokens > (window * 0.8): wide_model = "moonshotai/kimi-k2-instruct-0905" # 262k window champion if model_registry.MODELS.get(wide_model) and wide_model != model_id: return wide_model, f"Adaptive Context Switch (Prompt ~{int(estimated_tokens)} tokens > 80% of {model_id} window)" # --- THROUGHPUT SAFETY CHECK (TPM/TPD Protection) --- tpm_limit = model_meta.get("tpm", 6000) tpd_limit = model_meta.get("tpd", 200000) # PROACTIVE ESCALATION: # If estimated tokens > 60% of TPM OR > 10% of Daily Quota (TPD) in a single call, # route to a high-capacity "Workhorse" model immediately. 
if estimated_tokens > (tpm_limit * 0.6) or estimated_tokens > (tpd_limit * 0.1): # Select appropriate Workhorse based on provider if provider == "groq": workhorse = "meta-llama/llama-4-maverick-17b-128e-instruct" # 500k TPD if role == ModelRole.FAST_CHAT: workhorse = "meta-llama/llama-4-scout-17b-16e-instruct" # 30K TPM if model_registry.MODELS.get(workhorse) and workhorse != model_id: return workhorse, f"Proactive Throughput Switch (Dense Task: ~{int(estimated_tokens)} tokens)" reason = f"Registry-optimized for {role_name} on {provider}" return model_id, reason def _log_switchboard(self, role: ModelRole, model: str, reason: str): """Mandatory SOC Audit Logging.""" print(f"\n[MODEL_SELECTED]: {role.value}") print(f"[REASON]: {reason} -> {model}") async def generate( self, prompt: str, role: ModelRole = ModelRole.FAST_CHAT, temperature: Optional[float] = None, max_tokens: Optional[int] = None, context: Optional[Any] = None, # NEW: Pass TurnContext **kwargs ) -> str: """ Generate text with SOC Switchboard routing. Returns ONLY the string content. """ res = await self.generate_res(prompt, role, temperature, max_tokens, context=context, **kwargs) return str(res) async def generate_res( self, prompt: str, role: ModelRole = ModelRole.FAST_CHAT, temperature: Optional[float] = None, max_tokens: Optional[int] = None, context: Optional[Any] = None, # NEW: Pass TurnContext **kwargs ) -> LLMResponse: """ Generate response and return full LLMResponse object (Stateless). """ # --- GLOBAL BUDGET GATE (PRODUCTION HARDENING) --- if context and hasattr(context, "llm_call_count"): MAX_PER_TURN = 5 # 🔥 SYNCED BUDGET (Increased from 1 to allow multi-agent reasoning) MAX_PER_SESSION = 50 # Hard session limit (allows ~25 turn scam sessions) # 1. Check TURN budget if context.llm_call_count >= MAX_PER_TURN: print(f" [CRASH-PROOF] Turn budget ({MAX_PER_TURN}) reached. 
Using static fallback.") if hasattr(context, "budget_exceeded"): context.budget_exceeded = True # CRASH-PROOF: Return static response instead of raising return self._get_subclass_static_fallback(role=kwargs.get("role", "FAST_CHAT")) # 2. Check SESSION budget (if session object is available) if hasattr(context, "session") and context.session: session_calls = context.session.get("session_llm_calls", 0) if session_calls >= MAX_PER_SESSION: print(f" [CRASH-PROOF] Session budget ({MAX_PER_SESSION}) reached. Using static fallback.") if hasattr(context, "budget_exceeded"): context.budget_exceeded = True # CRASH-PROOF: Return static response instead of raising return self._get_subclass_static_fallback(role=kwargs.get("role", "FAST_CHAT")) # Increment budget BEFORE the call to ensure atomicity in execution flow context.llm_call_count += 1 print(f" [SOC] LLM Turn Budget: {context.llm_call_count}/{MAX_PER_TURN}") # Avoid duplicate role in kwargs if passed from wrapper kwargs.pop("role", None) # Override model if explicitly passed in kwargs model_override = kwargs.pop("model", None) model, reason = self._switchboard(role, prompt_text=prompt) if model_override: model = model_override reason = "Manual override (kwargs.model)" self._log_switchboard(role, model, reason) temp = temperature if temperature is not None else settings.LLM_TEMPERATURE tokens = max_tokens if max_tokens is not None else settings.LLM_MAX_TOKENS # --- PROMPT CACHE CHECK (Token Storm Prevention) --- # Generate fingerprint for this prompt (SHA256-based) session_id = getattr(context, "conversation_id", "default") if context else "default" cache_fingerprint = prompt_cache.get_fingerprint( system_prompt="", # System prompt handled inside provider conversation_history=[], user_message=prompt, role=role.name ) # Check cache BEFORE making expensive LLM call cached_response = prompt_cache.get(cache_fingerprint, session_id) if cached_response: print(f" [CACHE HIT] Reusing response for fingerprint 
{cache_fingerprint[:8]}... (Token Storm Prevented)") return cached_response # Try primary provider if self.primary: try: # Use explicit model parameter instead of stateful swapping (Thread-Safe) res = await self.primary.generate( prompt, temperature=temp, max_tokens=tokens, model=model, role=role.name, **kwargs ) # If primary returns a string (legacy), wrap it if isinstance(res, str): response = LLMResponse(content=res, model=model) else: response = res # --- CACHE SET (Store successful response) --- prompt_cache.set(cache_fingerprint, session_id, response, model=model, ttl=60) return response except Exception as e: print(f" Primary Role {role.value} Failed: {e}") # Automatic Fallback if self.fallback: fb_model, fb_reason = self._switchboard(ModelRole.FALLBACK) self._log_switchboard(ModelRole.FALLBACK, fb_model, fb_reason) try: res = await self.fallback.generate( prompt, temperature=temp, max_tokens=tokens, model=fb_model ) if isinstance(res, str): return LLMResponse(content=res, model=fb_model) return res except Exception as e: print(f" Fallback Failed: {e}") # Mock Fallback (Stateless) mock_content = await self.mock.generate(prompt) return LLMResponse(content=mock_content, model="mock") async def generate_fast(self, prompt: str, context: Optional[Any] = None, **kwargs) -> str: """Use Fast Model role for chat/realtime.""" return await self.generate(prompt, role=ModelRole.FAST_CHAT, context=context, **kwargs) async def generate_smart(self, prompt: str, schema: Optional[Dict[str, Any]] = None, context: Optional[Any] = None, **kwargs) -> LLMResponse: """Use Smart Model role for reasoning/extraction.""" if schema: return await self.generate_structured(prompt, schema, role=ModelRole.SMART_REASONING, context=context, **kwargs) return await self.generate_res(prompt, role=ModelRole.SMART_REASONING, context=context, **kwargs) async def close(self) -> None: """Cleanup resources.""" await _shared_client.aclose() async def check_safety(self, prompt: str) -> bool: """ 
GUARDRAIL (Legacy): Check prompt for malicious intent using Llama Guard. Returns: True if SAFE, False if UNSAFE. """ if not isinstance(self.primary, GroqClient): return True # Skip if not on Groq try: # Pass safety model explicitly to generate (Thread-Safe) safety_model = settings.GROQ_SAFETY_MODEL # Call Llama Guard (Raw text mode, no JSON) res = await self.generate( prompt, role=ModelRole.SAFETY_GUARD, temperature=0.0, max_tokens=10 ) if "unsafe" in res.lower(): print(f"SECURITY ALERT: Prompt Injection Blocked! Content: {prompt[:50]}...") return False return True except Exception as e: print(f"Safety Check Failed: {e}") return True # Fail open to avoid blocking valid traffic on error async def check_safeguard(self, prompt: str, context: Optional[Any] = None, **kwargs) -> bool: """ Policy-following Safety Check using GPT-OSS-Safeguard 20B. Optimized for BYO-Policy classification. """ if not self.initialized: await self.initialize() if not self.primary: return True try: # 1. Prepare Policy from app.core.prompts import PROMPT_INJECTION_POLICY formatted_policy = PROMPT_INJECTION_POLICY.replace("{{USER_INPUT}}", prompt) # 2. Extract structured violation data # Force GPT-OSS-Safeguard 20B for Policy Following target_model = "openai/gpt-oss-safeguard-20b" # Use generate_structured but override with Safety Role res = await self.generate_structured( prompt=formatted_policy, schema={ "type": "object", "properties": { "violation": {"type": "integer"}, "category": {"type": ["string", "null"]}, "rationale": {"type": "string"} }, "required": ["violation", "rationale"] }, model=target_model, context=kwargs.get("context") # Pass context if available ) if res.get("violation") == 1: print(f"\n[!!!] 
async def generate_structured(
    self,
    prompt: str,
    schema: Dict[str, Any],
    model: Optional[str] = None,
    context: Optional[Any] = None,
    **kwargs
) -> LLMResponse:
    """
    Produce STRICT JSON output using STRUCTURED_OUTPUT_MODEL role.

    Args:
        prompt: Prompt text sent to the model.
        schema: JSON schema handed to the primary provider's
            ``generate_structured``.
        model: Optional explicit model id. When truthy it replaces the model
            selected by the switchboard for the STRUCTURED_OUTPUT role.
        context: Optional per-turn object. When it exposes an
            ``llm_call_count`` attribute, this call is counted against the
            per-turn budget.
        **kwargs: Extra options forwarded to the primary provider. A ``role``
            entry is discarded because the role is fixed by this method.

    Returns:
        The primary provider's response (a bare string is wrapped in an
        ``LLMResponse``), or the JSON-mode fallback response when there is no
        primary provider or the primary call raised.

    Raises:
        BudgetExceeded: If ``context.llm_call_count`` has already reached the
            per-turn limit.
    """
    # --- GLOBAL BUDGET GATE ---
    if context and hasattr(context, "llm_call_count"):
        # NOTE(review): presumably mirrors the per-turn limit enforced by the
        # other generate paths -- confirm they stay in sync.
        MAX_PER_TURN = 5
        if context.llm_call_count >= MAX_PER_TURN:
            print(f" [!!!] BUDGET EXCEEDED (Structured): Turn budget reached.")
            raise BudgetExceeded(f"LLM Budget of {MAX_PER_TURN} calls per turn exceeded.")
        context.llm_call_count += 1
        print(f" [SOC] LLM Turn Budget (Structured): {context.llm_call_count}/{MAX_PER_TURN}")

    # The role is fixed below; drop a caller-supplied duplicate so it is not
    # forwarded to the provider.
    kwargs.pop("role", None)

    role = ModelRole.STRUCTURED_OUTPUT
    target_model, reason = self._switchboard(role)

    # `model` is a named parameter, so Python never places it in **kwargs.
    # Read the parameter itself; looking it up in kwargs silently dropped
    # every explicit override.
    if model:
        target_model = model
        reason = "Manual override (kwargs.model)"

    self._log_switchboard(role, target_model, reason)

    if self.primary:
        try:
            res = await self.primary.generate_structured(
                prompt, schema, model=target_model, **kwargs
            )
            if isinstance(res, str):
                return LLMResponse(content=res, model=target_model)
            return res
        except Exception as e:
            # Best effort: any provider failure falls through to JSON mode.
            print(f" Primary Structured Failed: {e}")

    # Fallback to standard JSON mode.
    # NOTE(review): `context` and the caller's kwargs are not forwarded here;
    # confirm whether generate_res should count against the same budget.
    res = await self.generate_res(
        prompt + "\n\nResponse must be valid JSON.",
        role=ModelRole.SMART_REASONING,
        json_mode=True
    )
    try:
        # Round-trip through the parser so the content is normalised JSON.
        res.content = json.dumps(json.loads(res.content))
    except (TypeError, ValueError):
        # Content is missing or not valid JSON: hand back the raw response
        # unchanged rather than failing the whole call.
        pass
    return res
async def generate_tool_call(
    self,
    prompt: str,
    tools: list[Dict[str, Any]],
    model: Optional[str] = None
) -> Optional[list[Dict[str, Any]]]:
    """
    Request native tool calls from the Groq provider.

    Delegates to the primary client's ``generate_tool_call`` when that client
    is a ``GroqClient``. For any other primary client (or none) the result
    is ``None``.
    """
    provider = self.primary
    if not isinstance(provider, GroqClient):
        # Tool calling is only routed through the Groq client here.
        return None
    return await provider.generate_tool_call(prompt, tools, model)
async def generate_dense(
    self,
    context: str,
    initial_summary: str = "",
    rounds: int = 3,
    turn_context: Optional[Any] = None,
    **kwargs
) -> LLMResponse:
    """
    Implementation of Chain of Density (CoD) - Iterative Loop.

    Args:
        context: Source text to summarise. The full text is embedded in every
            densification prompt.
        initial_summary: Starting summary. When empty, one is generated from
            the first 2000 characters of ``context`` with the FAST_CHAT role.
        rounds: Number of densification passes. Zero or a negative value
            skips the loop and returns the starting summary.
        turn_context: Optional per-turn object passed to ``generate_res`` as
            its ``context`` argument. It is a separate parameter because
            ``context`` here already names the source text, so it could never
            arrive through ``**kwargs``.
        **kwargs: Extra options forwarded to each densification call.

    Returns:
        An ``LLMResponse`` holding the final summary, the per-round reasoning
        joined by newlines, and the model reported by the last round
        (``None`` when no round ran).
    """
    current_summary = initial_summary
    if not current_summary:
        # Seed the loop with a quick summary of a truncated prefix.
        seed_res = await self.generate_res(
            f"Summarize this: {context[:2000]}",
            role=ModelRole.FAST_CHAT,
            context=turn_context
        )
        current_summary = seed_res.content

    all_reasoning = []
    last_model = None

    for i in range(rounds):
        dense_prompt = (
            f"CONTEXT: {context}\n\n"
            f"CURRENT SUMMARY: {current_summary}\n\n"
            f"ROUND {i+1} TASK: Identify 1-3 missing entities and fuse them into the summary."
        )
        res = await self.generate_res(
            dense_prompt,
            role=ModelRole.SMART_REASONING,
            context=turn_context,
            **kwargs
        )
        # Each round's output becomes the input summary of the next round.
        current_summary = res.content
        last_model = res.model
        if res.reasoning:
            all_reasoning.append(f"Round {i+1}: {res.reasoning}")

    return LLMResponse(
        content=current_summary,
        reasoning="\n".join(all_reasoning),
        model=last_model
    )