#!/usr/bin/env python3 """ LangChain + Headroom Integration: Before vs After Examples This script demonstrates the real-world impact of Headroom optimization on LangChain applications. Run with: python examples/langchain_before_after.py Requirements: pip install headroom[langchain] langchain-openai Note: Set OPENAI_API_KEY environment variable for live API tests. For dry-run mode (no API calls), the script shows simulated results. """ from __future__ import annotations import json import os import tempfile import time from dataclasses import dataclass # Check dependencies try: from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage LANGCHAIN_AVAILABLE = True except ImportError: LANGCHAIN_AVAILABLE = False print("LangChain not installed. Install with: pip install langchain-core") try: from langchain_openai import ChatOpenAI # noqa: F401 OPENAI_AVAILABLE = True except ImportError: OPENAI_AVAILABLE = False print("langchain-openai not installed. Install with: pip install langchain-openai") # Import Headroom try: from headroom import ( # noqa: F401 HeadroomClient, HeadroomConfig, HeadroomMode, OpenAIProvider, ) HEADROOM_AVAILABLE = True except ImportError: HEADROOM_AVAILABLE = False print("Headroom not installed. Install with: pip install headroom") @dataclass class ComparisonResult: """Result of before/after comparison.""" scenario: str tokens_before: int tokens_after: int tokens_saved: int savings_percent: float latency_before_ms: float | None latency_after_ms: float | None cost_before_usd: float cost_after_usd: float cost_saved_usd: float def estimate_cost(tokens: int, model: str = "gpt-4o") -> float: """Estimate cost in USD. GPT-4o: $2.50/1M input tokens.""" rates = { "gpt-4o": 2.50 / 1_000_000, "gpt-4o-mini": 0.15 / 1_000_000, "claude-3-5-sonnet": 3.00 / 1_000_000, } return tokens * rates.get(model, 2.50 / 1_000_000) def print_comparison(result: ComparisonResult) -> None: """Print formatted comparison results.""" print(f"\n{'=' * 60}") print(f"Scenario: {result.scenario}") print(f"{'=' * 60}") print("\n[Token Comparison]") print(f" Before: {result.tokens_before:,} tokens") print(f" After: {result.tokens_after:,} tokens") print(f" Saved: {result.tokens_saved:,} tokens ({result.savings_percent:.1f}%)") print("\n[Cost Impact] (GPT-4o pricing)") print(f" Before: ${result.cost_before_usd:.4f}") print(f" After: ${result.cost_after_usd:.4f}") print(f" Saved: ${result.cost_saved_usd:.4f}") if result.latency_before_ms and result.latency_after_ms: print("\n[Latency]") print(f" Before: {result.latency_before_ms:.0f}ms") print(f" After: {result.latency_after_ms:.0f}ms") def langchain_to_openai_messages(messages: list) -> list[dict]: """Convert LangChain messages to OpenAI format.""" openai_messages = [] for msg in messages: if isinstance(msg, SystemMessage): openai_messages.append({"role": "system", "content": msg.content}) elif isinstance(msg, HumanMessage): openai_messages.append({"role": "user", "content": msg.content}) elif isinstance(msg, AIMessage): msg_dict = {"role": "assistant", "content": msg.content} if hasattr(msg, "tool_calls") and msg.tool_calls: msg_dict["tool_calls"] = [ { "id": tc.get("id", f"call_{i}"), "type": "function", "function": { "name": tc.get("name", ""), "arguments": json.dumps(tc.get("args", {})), }, } for i, tc in enumerate(msg.tool_calls) ] openai_messages.append(msg_dict) elif isinstance(msg, ToolMessage): openai_messages.append( { "role": "tool", "tool_call_id": msg.tool_call_id, "content": msg.content, } ) return openai_messages # ============================================================================ # SCENARIO 1: Agentic Workflow with Large Tool Outputs # ============================================================================ def scenario_agentic_workflow() -> ComparisonResult: """ Scenario: AI agent that searches a database and processes results. Common pattern: Tool returns 100+ records, but only 5-10 are relevant. Without optimization, ALL records are sent to the LLM. """ print("\n" + "=" * 60) print("SCENARIO 1: Agentic Workflow with Large Tool Outputs") print("=" * 60) # Simulate a database search tool that returns many results search_results = [ { "id": f"user-{i:04d}", "name": f"User {i}", "email": f"user{i}@example.com", "department": ["Engineering", "Sales", "Marketing", "Support"][i % 4], "status": "active" if i % 10 != 0 else "inactive", "created_at": f"2024-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}T10:00:00Z", "last_login": f"2024-12-{(i % 28) + 1:02d}T{i % 24:02d}:00:00Z", "metadata": { "preferences": {"theme": "dark", "notifications": True}, "tags": ["premium", "verified"] if i % 5 == 0 else [], }, } for i in range(100) ] # The conversation in LangChain format lc_messages = [ SystemMessage( content="""You are a helpful database assistant. When searching for users, analyze the results and provide a summary. Focus on active users in the Engineering department.""" ), HumanMessage(content="Find users in the Engineering department"), AIMessage( content="I'll search the database for Engineering users.", tool_calls=[ {"id": "call_1", "name": "search_users", "args": {"department": "Engineering"}} ], ), ToolMessage( content=json.dumps(search_results), # 100 records! tool_call_id="call_1", ), ] # Convert to OpenAI format for Headroom messages = langchain_to_openai_messages(lc_messages) # Create Headroom client for simulation from openai import OpenAI db_path = os.path.join(tempfile.gettempdir(), "headroom_langchain_example.db") base_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-fake-key")) provider = OpenAIProvider() client = HeadroomClient( original_client=base_client, provider=provider, store_url=f"sqlite:///{db_path}", default_mode="optimize", ) # Simulate optimization plan = client.chat.completions.simulate( model="gpt-4o", messages=messages, ) tokens_before = plan.tokens_before tokens_after = plan.tokens_after tokens_saved = plan.tokens_saved savings_percent = (tokens_saved / tokens_before * 100) if tokens_before > 0 else 0 print("\n[Before Optimization]") print(" - System prompt + conversation") print(f" - Tool output: 100 user records ({len(json.dumps(search_results))} chars)") print("\n[After Optimization]") print(" - SmartCrusher kept: first 3, last 2, + relevance matches") print(" - Estimated ~15 items preserved (Engineering dept matches)") print(f" - Transforms: {plan.transforms}") client.close() return ComparisonResult( scenario="Agentic Workflow with Large Tool Outputs", tokens_before=tokens_before, tokens_after=tokens_after, tokens_saved=tokens_saved, savings_percent=savings_percent, latency_before_ms=None, latency_after_ms=None, cost_before_usd=estimate_cost(tokens_before), cost_after_usd=estimate_cost(tokens_after), cost_saved_usd=estimate_cost(tokens_saved), ) # ============================================================================ # SCENARIO 2: Long Conversation with Context Window Pressure # ============================================================================ def scenario_long_conversation() -> ComparisonResult: """ Scenario: Multi-turn conversation approaching context window limit. Common pattern: Chatbot accumulates history, needs to drop old turns. Without optimization, either hits context limit or loses coherence. """ print("\n" + "=" * 60) print("SCENARIO 2: Long Conversation with Context Window Pressure") print("=" * 60) # Simulate 50-turn conversation in LangChain format lc_messages = [ SystemMessage( content="""You are a customer support agent for TechCorp. You have access to customer data and can help with: - Account issues - Billing questions - Technical support - Product information Current date: 2024-12-15 Agent ID: support-agent-42 """ ), ] # Add 50 turns of conversation topics = [ "I can't log into my account", "What's my current subscription?", "Can you explain the premium features?", "I was charged twice this month", "How do I reset my password?", ] for i in range(50): topic = topics[i % len(topics)] lc_messages.append(HumanMessage(content=f"Turn {i}: {topic}")) lc_messages.append( AIMessage( content=f"Response to turn {i}: Thank you for reaching out about '{topic}'. " f"I can help you with that. Here's what I found... " * 3 ) ) # Convert to OpenAI format messages = langchain_to_openai_messages(lc_messages) # Create Headroom client for simulation from openai import OpenAI db_path = os.path.join(tempfile.gettempdir(), "headroom_langchain_example.db") base_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-fake-key")) provider = OpenAIProvider() client = HeadroomClient( original_client=base_client, provider=provider, store_url=f"sqlite:///{db_path}", default_mode="optimize", ) # Simulate optimization plan = client.chat.completions.simulate( model="gpt-4o", messages=messages, ) tokens_before = plan.tokens_before tokens_after = plan.tokens_after tokens_saved = plan.tokens_saved savings_percent = (tokens_saved / tokens_before * 100) if tokens_before > 0 else 0 print("\n[Before Optimization]") print(" - 50-turn conversation") print(f" - ~{tokens_before:,} tokens total") print("\n[After Optimization]") print(" - RollingWindow kept system + last N turns") print(" - CacheAligner moved date to dynamic tail") print(f" - Transforms: {plan.transforms}") client.close() return ComparisonResult( scenario="Long Conversation (50 turns)", tokens_before=tokens_before, tokens_after=tokens_after, tokens_saved=tokens_saved, savings_percent=savings_percent, latency_before_ms=None, latency_after_ms=None, cost_before_usd=estimate_cost(tokens_before), cost_after_usd=estimate_cost(tokens_after), cost_saved_usd=estimate_cost(tokens_saved), ) # ============================================================================ # SCENARIO 3: RAG with Retrieved Documents # ============================================================================ def scenario_rag_pipeline() -> ComparisonResult: """ Scenario: RAG pipeline that retrieves multiple documents. Common pattern: Retriever returns 10 chunks, many are redundant. Without optimization, all chunks consume tokens. """ print("\n" + "=" * 60) print("SCENARIO 3: RAG Pipeline with Retrieved Documents") print("=" * 60) # Simulate retrieved document chunks chunks = [] for i in range(10): chunk = { "content": f"Document {i} content: " + "This is relevant information. " * 50, "source": f"doc_{i}.pdf", "page": i + 1, "relevance_score": 0.9 - (i * 0.05), "metadata": { "author": f"Author {i}", "date": "2024-01-15", "category": "Technical", }, } chunks.append(chunk) context = "\n\n".join( [f"[Source: {c['source']}, Page {c['page']}]\n{c['content']}" for c in chunks] ) # LangChain format lc_messages = [ SystemMessage(content="You are a helpful assistant. Answer based on the provided context."), HumanMessage( content=f"""Based on the following retrieved documents: {context} Question: What are the key technical requirements?""" ), ] # Convert to OpenAI format messages = langchain_to_openai_messages(lc_messages) # Create Headroom client for simulation from openai import OpenAI db_path = os.path.join(tempfile.gettempdir(), "headroom_langchain_example.db") base_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-fake-key")) provider = OpenAIProvider() client = HeadroomClient( original_client=base_client, provider=provider, store_url=f"sqlite:///{db_path}", default_mode="optimize", ) # Simulate optimization plan = client.chat.completions.simulate( model="gpt-4o", messages=messages, ) tokens_before = plan.tokens_before tokens_after = plan.tokens_after tokens_saved = plan.tokens_saved savings_percent = (tokens_saved / tokens_before * 100) if tokens_before > 0 else 0 print("\n[Before Optimization]") print(" - 10 retrieved document chunks") print(f" - ~{tokens_before:,} tokens total") print("\n[After Optimization]") print(" - CacheAligner normalized whitespace") print(f" - Transforms: {plan.transforms}") client.close() return ComparisonResult( scenario="RAG Pipeline (10 chunks)", tokens_before=tokens_before, tokens_after=tokens_after, tokens_saved=tokens_saved, savings_percent=savings_percent, latency_before_ms=None, latency_after_ms=None, cost_before_usd=estimate_cost(tokens_before), cost_after_usd=estimate_cost(tokens_after), cost_saved_usd=estimate_cost(tokens_saved), ) # ============================================================================ # SCENARIO 4: Real API Comparison (if API key available) # ============================================================================ def scenario_live_api() -> ComparisonResult | None: """ Scenario: Live API comparison with actual timing. Only runs if OPENAI_API_KEY is set. """ if not os.environ.get("OPENAI_API_KEY"): print("\n[!] Skipping live API test (OPENAI_API_KEY not set)") return None if not OPENAI_AVAILABLE: print("\n[!] Skipping live API test (langchain-openai not installed)") return None print("\n" + "=" * 60) print("SCENARIO 4: Live API Comparison") print("=" * 60) from openai import OpenAI # Create base OpenAI client base_client = OpenAI() # Create Headroom-wrapped client db_path = os.path.join(tempfile.gettempdir(), "headroom_langchain_live.db") provider = OpenAIProvider() headroom_client = HeadroomClient( original_client=base_client, provider=provider, store_url=f"sqlite:///{db_path}", default_mode="optimize", ) # Test messages in OpenAI format messages = [ {"role": "system", "content": "You are helpful. Be concise."}, {"role": "user", "content": "What is 2+2?"}, ] # Time the base client start = time.time() base_response = base_client.chat.completions.create( model="gpt-4o-mini", messages=messages, max_tokens=50, ) latency_before = (time.time() - start) * 1000 # Time the Headroom-wrapped client start = time.time() optimized_response = headroom_client.chat.completions.create( model="gpt-4o-mini", messages=messages, headroom_mode="optimize", max_tokens=50, ) latency_after = (time.time() - start) * 1000 print(f"\n[Base Model Response] {base_response.choices[0].message.content[:50]}...") print(f"[Optimized Response] {optimized_response.choices[0].message.content[:50]}...") print(f"\n[Latency] {latency_before:.0f}ms -> {latency_after:.0f}ms") # Get metrics headroom_client.get_summary() headroom_client.close() # For this simple case, savings are minimal tokens_before = base_response.usage.prompt_tokens if base_response.usage else 20 tokens_after = optimized_response.usage.prompt_tokens if optimized_response.usage else 20 tokens_saved = max(0, tokens_before - tokens_after) return ComparisonResult( scenario="Live API (Simple Query)", tokens_before=tokens_before, tokens_after=tokens_after, tokens_saved=tokens_saved, savings_percent=(tokens_saved / tokens_before * 100) if tokens_before > 0 else 0, latency_before_ms=latency_before, latency_after_ms=latency_after, cost_before_usd=estimate_cost(tokens_before, "gpt-4o-mini"), cost_after_usd=estimate_cost(tokens_after, "gpt-4o-mini"), cost_saved_usd=estimate_cost(tokens_saved, "gpt-4o-mini"), ) # ============================================================================ # MAIN: Run All Scenarios # ============================================================================ def main(): """Run all comparison scenarios.""" print("\n" + "=" * 70) print(" HEADROOM + LANGCHAIN: Before vs After Comparison") print("=" * 70) if not LANGCHAIN_AVAILABLE: print("\n[X] Cannot run examples: LangChain not installed") print(" Install with: pip install langchain-core") return if not HEADROOM_AVAILABLE: print("\n[X] Cannot run examples: Headroom not installed") return results = [] # Run each scenario try: results.append(scenario_agentic_workflow()) except Exception as e: print(f"\n[X] Scenario 1 failed: {e}") try: results.append(scenario_long_conversation()) except Exception as e: print(f"\n[X] Scenario 2 failed: {e}") try: results.append(scenario_rag_pipeline()) except Exception as e: print(f"\n[X] Scenario 3 failed: {e}") try: live_result = scenario_live_api() if live_result: results.append(live_result) except Exception as e: print(f"\n[X] Live API scenario failed: {e}") # Print all results print("\n\n" + "=" * 70) print(" SUMMARY: All Scenarios") print("=" * 70) for result in results: print_comparison(result) # Calculate totals if results: total_saved = sum(r.tokens_saved for r in results) total_cost_saved = sum(r.cost_saved_usd for r in results) avg_savings = sum(r.savings_percent for r in results) / len(results) print("\n" + "=" * 70) print(" TOTAL IMPACT") print("=" * 70) print(f"\n[Results] Across {len(results)} scenarios:") print(f" Total tokens saved: {total_saved:,}") print(f" Average savings: {avg_savings:.1f}%") print(f" Total cost saved: ${total_cost_saved:.4f}") print("\n[Projection] At scale (1M requests/month):") print(f" Estimated monthly savings: ${total_cost_saved * 1_000_000 / len(results):,.2f}") if __name__ == "__main__": main()