headroom / examples /langchain_before_after.py
chopratejas's picture
Fix all ruff lint and format errors for CI
e4a41fa
Raw
History Blame
20 kB
#!/usr/bin/env python3
"""
LangChain + Headroom Integration: Before vs After Examples
This script demonstrates the real-world impact of Headroom optimization
on LangChain applications. Run with:
python examples/langchain_before_after.py
Requirements:
pip install headroom[langchain] langchain-openai
Note: Set OPENAI_API_KEY environment variable for live API tests.
For dry-run mode (no API calls), the script shows simulated results.
"""
from __future__ import annotations
import json
import os
import tempfile
import time
from dataclasses import dataclass
# Check dependencies
try:
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
LANGCHAIN_AVAILABLE = True
except ImportError:
LANGCHAIN_AVAILABLE = False
print("LangChain not installed. Install with: pip install langchain-core")
try:
from langchain_openai import ChatOpenAI # noqa: F401
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
print("langchain-openai not installed. Install with: pip install langchain-openai")
# Import Headroom
try:
from headroom import ( # noqa: F401
HeadroomClient,
HeadroomConfig,
HeadroomMode,
OpenAIProvider,
)
HEADROOM_AVAILABLE = True
except ImportError:
HEADROOM_AVAILABLE = False
print("Headroom not installed. Install with: pip install headroom")
@dataclass
class ComparisonResult:
    """Result of before/after comparison.

    One instance is produced per scenario; ``print_comparison`` renders it
    and ``main`` aggregates the savings fields across scenarios.
    """

    # Human-readable scenario label printed in the report header.
    scenario: str
    # Prompt token counts without / with optimization.
    tokens_before: int
    tokens_after: int
    # Token reduction, and that reduction as a percentage of tokens_before.
    tokens_saved: int
    savings_percent: float
    # Request latencies in milliseconds; None when the scenario did not
    # measure them (the simulated scenarios pass None for both).
    latency_before_ms: float | None
    latency_after_ms: float | None
    # Estimated costs in USD, as computed by estimate_cost().
    cost_before_usd: float
    cost_after_usd: float
    cost_saved_usd: float
def estimate_cost(tokens: int, model: str = "gpt-4o") -> float:
    """Return the estimated input cost in USD for ``tokens`` tokens.

    Prices are expressed per one million input tokens. A model that is not
    in the table is billed at the GPT-4o price ($2.50 per 1M tokens).
    """
    usd_per_million = {
        "gpt-4o": 2.50,
        "gpt-4o-mini": 0.15,
        "claude-3-5-sonnet": 3.00,
    }
    # Convert to a per-token price first, then scale by the token count.
    per_token = usd_per_million.get(model, 2.50) / 1_000_000
    return tokens * per_token
def print_comparison(result: ComparisonResult) -> None:
    """Print a formatted before/after report for one scenario.

    Args:
        result: Comparison data to render. The latency section is printed
            only when both latency fields are set (i.e. not ``None``).
    """
    banner = "=" * 60
    print(f"\n{banner}")
    print(f"Scenario: {result.scenario}")
    print(banner)

    print("\n[Token Comparison]")
    print(f" Before: {result.tokens_before:,} tokens")
    print(f" After: {result.tokens_after:,} tokens")
    print(f" Saved: {result.tokens_saved:,} tokens ({result.savings_percent:.1f}%)")

    # NOTE: the pricing label is a fixed string; the result object does not
    # record which model's rates were used to compute the cost fields.
    print("\n[Cost Impact] (GPT-4o pricing)")
    print(f" Before: ${result.cost_before_usd:.4f}")
    print(f" After: ${result.cost_after_usd:.4f}")
    print(f" Saved: ${result.cost_saved_usd:.4f}")

    # Compare against None explicitly: a measured latency of 0.0 ms is falsy
    # but is still a real measurement and must be reported.
    if result.latency_before_ms is not None and result.latency_after_ms is not None:
        print("\n[Latency]")
        print(f" Before: {result.latency_before_ms:.0f}ms")
        print(f" After: {result.latency_after_ms:.0f}ms")
def langchain_to_openai_messages(messages: list) -> list[dict]:
    """Convert LangChain messages to OpenAI chat-completions message dicts.

    Args:
        messages: LangChain message objects. ``SystemMessage``,
            ``HumanMessage``, ``AIMessage`` and ``ToolMessage`` are
            converted; any other object is skipped.

    Returns:
        A list of dicts with ``role``/``content`` keys. Assistant messages
        that carry tool calls also get a ``tool_calls`` list, and tool
        messages get a ``tool_call_id``.
    """
    openai_messages: list[dict] = []
    for msg in messages:
        if isinstance(msg, SystemMessage):
            openai_messages.append({"role": "system", "content": msg.content})
        elif isinstance(msg, HumanMessage):
            openai_messages.append({"role": "user", "content": msg.content})
        elif isinstance(msg, AIMessage):
            msg_dict = {"role": "assistant", "content": msg.content}
            tool_calls = getattr(msg, "tool_calls", None)
            if tool_calls:
                msg_dict["tool_calls"] = [
                    {
                        # Use ``or`` rather than a .get() default: a tool call
                        # may carry an "id" key whose value is None (or ""),
                        # and that must fall back to a generated id as well.
                        "id": tc.get("id") or f"call_{i}",
                        "type": "function",
                        "function": {
                            "name": tc.get("name", ""),
                            # The OpenAI format carries arguments as a JSON string.
                            "arguments": json.dumps(tc.get("args", {})),
                        },
                    }
                    for i, tc in enumerate(tool_calls)
                ]
            openai_messages.append(msg_dict)
        elif isinstance(msg, ToolMessage):
            openai_messages.append(
                {
                    "role": "tool",
                    "tool_call_id": msg.tool_call_id,
                    "content": msg.content,
                }
            )
    return openai_messages
# ============================================================================
# SCENARIO 1: Agentic Workflow with Large Tool Outputs
# ============================================================================
def scenario_agentic_workflow() -> ComparisonResult:
    """
    Scenario: AI agent that searches a database and processes results.

    Common pattern: Tool returns 100+ records, but only 5-10 are relevant.
    Without optimization, ALL records are sent to the LLM.

    Builds a synthetic 100-record tool output, converts the conversation to
    OpenAI format and asks Headroom to simulate the optimization.

    Returns:
        Token and cost figures taken from the simulation plan. Latency
        fields are ``None`` because no request is timed here.
    """
    print("\n" + "=" * 60)
    print("SCENARIO 1: Agentic Workflow with Large Tool Outputs")
    print("=" * 60)

    # Simulate a database search tool that returns many results
    search_results = [
        {
            "id": f"user-{i:04d}",
            "name": f"User {i}",
            "email": f"user{i}@example.com",
            "department": ["Engineering", "Sales", "Marketing", "Support"][i % 4],
            "status": "active" if i % 10 != 0 else "inactive",
            "created_at": f"2024-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}T10:00:00Z",
            "last_login": f"2024-12-{(i % 28) + 1:02d}T{i % 24:02d}:00:00Z",
            "metadata": {
                "preferences": {"theme": "dark", "notifications": True},
                "tags": ["premium", "verified"] if i % 5 == 0 else [],
            },
        }
        for i in range(100)
    ]
    # Serialise once; the same payload is sent as the tool output and
    # measured for the report below.
    tool_output = json.dumps(search_results)

    # The conversation in LangChain format
    lc_messages = [
        SystemMessage(
            content="""You are a helpful database assistant.
When searching for users, analyze the results and provide a summary.
Focus on active users in the Engineering department."""
        ),
        HumanMessage(content="Find users in the Engineering department"),
        AIMessage(
            content="I'll search the database for Engineering users.",
            tool_calls=[
                {"id": "call_1", "name": "search_users", "args": {"department": "Engineering"}}
            ],
        ),
        ToolMessage(
            content=tool_output,  # 100 records!
            tool_call_id="call_1",
        ),
    ]

    # Convert to OpenAI format for Headroom
    messages = langchain_to_openai_messages(lc_messages)

    # Create Headroom client for simulation. The import stays local, as in
    # the rest of this script, so the module can load without ``openai``.
    from openai import OpenAI

    db_path = os.path.join(tempfile.gettempdir(), "headroom_langchain_example.db")
    # A placeholder key is used when OPENAI_API_KEY is unset.
    base_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-fake-key"))
    client = HeadroomClient(
        original_client=base_client,
        provider=OpenAIProvider(),
        store_url=f"sqlite:///{db_path}",
        default_mode="optimize",
    )
    try:
        # Simulate optimization
        plan = client.chat.completions.simulate(
            model="gpt-4o",
            messages=messages,
        )
        tokens_before = plan.tokens_before
        tokens_after = plan.tokens_after
        tokens_saved = plan.tokens_saved
        savings_percent = (tokens_saved / tokens_before * 100) if tokens_before > 0 else 0.0

        print("\n[Before Optimization]")
        print(" - System prompt + conversation")
        print(f" - Tool output: {len(search_results)} user records ({len(tool_output)} chars)")
        # The next two lines are static descriptive text; only the
        # "Transforms" line reflects what the plan actually reported.
        print("\n[After Optimization]")
        print(" - SmartCrusher kept: first 3, last 2, + relevance matches")
        print(" - Estimated ~15 items preserved (Engineering dept matches)")
        print(f" - Transforms: {plan.transforms}")
    finally:
        # Close the client even when the simulation raises, so the handle
        # is not leaked when main() catches the error and moves on.
        client.close()

    return ComparisonResult(
        scenario="Agentic Workflow with Large Tool Outputs",
        tokens_before=tokens_before,
        tokens_after=tokens_after,
        tokens_saved=tokens_saved,
        savings_percent=savings_percent,
        latency_before_ms=None,
        latency_after_ms=None,
        cost_before_usd=estimate_cost(tokens_before),
        cost_after_usd=estimate_cost(tokens_after),
        cost_saved_usd=estimate_cost(tokens_saved),
    )
# ============================================================================
# SCENARIO 2: Long Conversation with Context Window Pressure
# ============================================================================
def scenario_long_conversation() -> ComparisonResult:
    """
    Scenario: Multi-turn conversation approaching context window limit.

    Common pattern: Chatbot accumulates history, needs to drop old turns.
    Without optimization, either hits context limit or loses coherence.

    Builds a synthetic 50-turn support chat and asks Headroom to simulate
    the optimization.

    Returns:
        Token and cost figures taken from the simulation plan. Latency
        fields are ``None`` because no request is timed here.
    """
    print("\n" + "=" * 60)
    print("SCENARIO 2: Long Conversation with Context Window Pressure")
    print("=" * 60)

    # Simulate 50-turn conversation in LangChain format
    lc_messages = [
        SystemMessage(
            content="""You are a customer support agent for TechCorp.
You have access to customer data and can help with:
- Account issues
- Billing questions
- Technical support
- Product information
Current date: 2024-12-15
Agent ID: support-agent-42
"""
        ),
    ]

    # Add 50 turns of conversation
    topics = [
        "I can't log into my account",
        "What's my current subscription?",
        "Can you explain the premium features?",
        "I was charged twice this month",
        "How do I reset my password?",
    ]
    for i in range(50):
        topic = topics[i % len(topics)]
        lc_messages.append(HumanMessage(content=f"Turn {i}: {topic}"))
        # Parenthesised to make the precedence explicit: adjacent literals
        # are joined first, so the whole two-part reply is repeated 3 times.
        reply = (
            f"Response to turn {i}: Thank you for reaching out about '{topic}'. "
            f"I can help you with that. Here's what I found... "
        ) * 3
        lc_messages.append(AIMessage(content=reply))

    # Convert to OpenAI format
    messages = langchain_to_openai_messages(lc_messages)

    # Create Headroom client for simulation. The import stays local, as in
    # the rest of this script, so the module can load without ``openai``.
    from openai import OpenAI

    db_path = os.path.join(tempfile.gettempdir(), "headroom_langchain_example.db")
    # A placeholder key is used when OPENAI_API_KEY is unset.
    base_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-fake-key"))
    client = HeadroomClient(
        original_client=base_client,
        provider=OpenAIProvider(),
        store_url=f"sqlite:///{db_path}",
        default_mode="optimize",
    )
    try:
        # Simulate optimization
        plan = client.chat.completions.simulate(
            model="gpt-4o",
            messages=messages,
        )
        tokens_before = plan.tokens_before
        tokens_after = plan.tokens_after
        tokens_saved = plan.tokens_saved
        savings_percent = (tokens_saved / tokens_before * 100) if tokens_before > 0 else 0.0

        print("\n[Before Optimization]")
        print(" - 50-turn conversation")
        print(f" - ~{tokens_before:,} tokens total")
        # The next two lines are static descriptive text; only the
        # "Transforms" line reflects what the plan actually reported.
        print("\n[After Optimization]")
        print(" - RollingWindow kept system + last N turns")
        print(" - CacheAligner moved date to dynamic tail")
        print(f" - Transforms: {plan.transforms}")
    finally:
        # Close the client even when the simulation raises, so the handle
        # is not leaked when main() catches the error and moves on.
        client.close()

    return ComparisonResult(
        scenario="Long Conversation (50 turns)",
        tokens_before=tokens_before,
        tokens_after=tokens_after,
        tokens_saved=tokens_saved,
        savings_percent=savings_percent,
        latency_before_ms=None,
        latency_after_ms=None,
        cost_before_usd=estimate_cost(tokens_before),
        cost_after_usd=estimate_cost(tokens_after),
        cost_saved_usd=estimate_cost(tokens_saved),
    )
# ============================================================================
# SCENARIO 3: RAG with Retrieved Documents
# ============================================================================
def scenario_rag_pipeline() -> ComparisonResult:
    """
    Scenario: RAG pipeline that retrieves multiple documents.

    Common pattern: Retriever returns 10 chunks, many are redundant.
    Without optimization, all chunks consume tokens.

    Builds ten synthetic document chunks, embeds them in a single user
    prompt and asks Headroom to simulate the optimization.

    Returns:
        Token and cost figures taken from the simulation plan. Latency
        fields are ``None`` because no request is timed here.
    """
    print("\n" + "=" * 60)
    print("SCENARIO 3: RAG Pipeline with Retrieved Documents")
    print("=" * 60)

    # Simulate retrieved document chunks
    chunks = [
        {
            "content": f"Document {i} content: " + "This is relevant information. " * 50,
            "source": f"doc_{i}.pdf",
            "page": i + 1,
            "relevance_score": 0.9 - (i * 0.05),
            "metadata": {
                "author": f"Author {i}",
                "date": "2024-01-15",
                "category": "Technical",
            },
        }
        for i in range(10)
    ]
    # Only source, page and content make it into the prompt text.
    context = "\n\n".join(
        f"[Source: {c['source']}, Page {c['page']}]\n{c['content']}" for c in chunks
    )

    # LangChain format
    lc_messages = [
        SystemMessage(content="You are a helpful assistant. Answer based on the provided context."),
        HumanMessage(
            content=f"""Based on the following retrieved documents:
{context}
Question: What are the key technical requirements?"""
        ),
    ]

    # Convert to OpenAI format
    messages = langchain_to_openai_messages(lc_messages)

    # Create Headroom client for simulation. The import stays local, as in
    # the rest of this script, so the module can load without ``openai``.
    from openai import OpenAI

    db_path = os.path.join(tempfile.gettempdir(), "headroom_langchain_example.db")
    # A placeholder key is used when OPENAI_API_KEY is unset.
    base_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-fake-key"))
    client = HeadroomClient(
        original_client=base_client,
        provider=OpenAIProvider(),
        store_url=f"sqlite:///{db_path}",
        default_mode="optimize",
    )
    try:
        # Simulate optimization
        plan = client.chat.completions.simulate(
            model="gpt-4o",
            messages=messages,
        )
        tokens_before = plan.tokens_before
        tokens_after = plan.tokens_after
        tokens_saved = plan.tokens_saved
        savings_percent = (tokens_saved / tokens_before * 100) if tokens_before > 0 else 0.0

        print("\n[Before Optimization]")
        print(" - 10 retrieved document chunks")
        print(f" - ~{tokens_before:,} tokens total")
        # The next line is static descriptive text; only the "Transforms"
        # line reflects what the plan actually reported.
        print("\n[After Optimization]")
        print(" - CacheAligner normalized whitespace")
        print(f" - Transforms: {plan.transforms}")
    finally:
        # Close the client even when the simulation raises, so the handle
        # is not leaked when main() catches the error and moves on.
        client.close()

    return ComparisonResult(
        scenario="RAG Pipeline (10 chunks)",
        tokens_before=tokens_before,
        tokens_after=tokens_after,
        tokens_saved=tokens_saved,
        savings_percent=savings_percent,
        latency_before_ms=None,
        latency_after_ms=None,
        cost_before_usd=estimate_cost(tokens_before),
        cost_after_usd=estimate_cost(tokens_after),
        cost_saved_usd=estimate_cost(tokens_saved),
    )
# ============================================================================
# SCENARIO 4: Real API Comparison (if API key available)
# ============================================================================
def scenario_live_api() -> ComparisonResult | None:
    """
    Scenario: Live API comparison with actual timing.

    Only runs if OPENAI_API_KEY is set.

    Sends the same short prompt through the plain OpenAI client and through
    the Headroom-wrapped client, timing each request.

    Returns:
        The measured comparison, or ``None`` when the scenario is skipped
        (no API key, or langchain-openai not installed).
    """
    if not os.environ.get("OPENAI_API_KEY"):
        print("\n[!] Skipping live API test (OPENAI_API_KEY not set)")
        return None
    if not OPENAI_AVAILABLE:
        print("\n[!] Skipping live API test (langchain-openai not installed)")
        return None

    print("\n" + "=" * 60)
    print("SCENARIO 4: Live API Comparison")
    print("=" * 60)

    from openai import OpenAI

    # Create base OpenAI client
    base_client = OpenAI()

    # Create Headroom-wrapped client
    db_path = os.path.join(tempfile.gettempdir(), "headroom_langchain_live.db")
    headroom_client = HeadroomClient(
        original_client=base_client,
        provider=OpenAIProvider(),
        store_url=f"sqlite:///{db_path}",
        default_mode="optimize",
    )

    # Test messages in OpenAI format
    messages = [
        {"role": "system", "content": "You are helpful. Be concise."},
        {"role": "user", "content": "What is 2+2?"},
    ]

    try:
        # Time the base client. perf_counter() is monotonic, so the interval
        # cannot be skewed by system clock adjustments.
        start = time.perf_counter()
        base_response = base_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            max_tokens=50,
        )
        latency_before = (time.perf_counter() - start) * 1000

        # Time the Headroom-wrapped client
        start = time.perf_counter()
        optimized_response = headroom_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            headroom_mode="optimize",
            max_tokens=50,
        )
        latency_after = (time.perf_counter() - start) * 1000

        # message.content may be None (e.g. refusals), so guard the slice.
        base_text = base_response.choices[0].message.content or ""
        optimized_text = optimized_response.choices[0].message.content or ""
        print(f"\n[Base Model Response] {base_text[:50]}...")
        print(f"[Optimized Response] {optimized_text[:50]}...")
        print(f"\n[Latency] {latency_before:.0f}ms -> {latency_after:.0f}ms")

        # Get metrics (the return value is not used by this example; the
        # call is kept as in the original flow).
        headroom_client.get_summary()
    finally:
        # Close the wrapped client even if an API call fails, so the handle
        # is not leaked when main() catches the error.
        headroom_client.close()

    # For this simple case, savings are minimal. Fall back to a nominal 20
    # tokens when the response carries no usage block.
    tokens_before = base_response.usage.prompt_tokens if base_response.usage else 20
    tokens_after = optimized_response.usage.prompt_tokens if optimized_response.usage else 20
    tokens_saved = max(0, tokens_before - tokens_after)

    return ComparisonResult(
        scenario="Live API (Simple Query)",
        tokens_before=tokens_before,
        tokens_after=tokens_after,
        tokens_saved=tokens_saved,
        savings_percent=(tokens_saved / tokens_before * 100) if tokens_before > 0 else 0.0,
        latency_before_ms=latency_before,
        latency_after_ms=latency_after,
        cost_before_usd=estimate_cost(tokens_before, "gpt-4o-mini"),
        cost_after_usd=estimate_cost(tokens_after, "gpt-4o-mini"),
        cost_saved_usd=estimate_cost(tokens_saved, "gpt-4o-mini"),
    )
# ============================================================================
# MAIN: Run All Scenarios
# ============================================================================
def main():
    """Run every scenario, print each report, then print aggregate totals.

    Exits early when LangChain or Headroom is missing. A scenario that
    raises is reported and skipped so the remaining ones still run.
    """
    rule = "=" * 70
    print("\n" + rule)
    print(" HEADROOM + LANGCHAIN: Before vs After Comparison")
    print(rule)

    if not LANGCHAIN_AVAILABLE:
        print("\n[X] Cannot run examples: LangChain not installed")
        print(" Install with: pip install langchain-core")
        return
    if not HEADROOM_AVAILABLE:
        print("\n[X] Cannot run examples: Headroom not installed")
        return

    # (label used in the failure message, scenario callable), in run order.
    runners = (
        ("Scenario 1", scenario_agentic_workflow),
        ("Scenario 2", scenario_long_conversation),
        ("Scenario 3", scenario_rag_pipeline),
        ("Live API scenario", scenario_live_api),
    )

    results = []
    for label, run in runners:
        try:
            outcome = run()
        except Exception as e:
            print(f"\n[X] {label} failed: {e}")
            continue
        # The live scenario returns None when it is skipped.
        if outcome:
            results.append(outcome)

    # Print all results
    print("\n\n" + rule)
    print(" SUMMARY: All Scenarios")
    print(rule)
    for entry in results:
        print_comparison(entry)

    # Calculate totals (nothing to aggregate if every scenario failed)
    if not results:
        return

    count = len(results)
    total_saved = sum(r.tokens_saved for r in results)
    total_cost_saved = sum(r.cost_saved_usd for r in results)
    avg_savings = sum(r.savings_percent for r in results) / count

    print("\n" + rule)
    print(" TOTAL IMPACT")
    print(rule)
    print(f"\n[Results] Across {count} scenarios:")
    print(f" Total tokens saved: {total_saved:,}")
    print(f" Average savings: {avg_savings:.1f}%")
    print(f" Total cost saved: ${total_cost_saved:.4f}")
    print("\n[Projection] At scale (1M requests/month):")
    # Average saving per scenario run, scaled to one million requests.
    print(f" Estimated monthly savings: ${total_cost_saved * 1_000_000 / count:,.2f}")
# Run the comparison only when executed as a script, not when imported.
if __name__ == "__main__":
    main()