#!/usr/bin/env python3 """End-to-End Test: Fast Memory with Zero-Latency Extraction. This demonstrates the complete flow: 1. User shares information → Memory extracted INLINE (no extra latency) 2. User asks follow-up → Memory retrieved semantically 3. Assistant uses memory in response Usage: export OPENAI_API_KEY="sk-..." python examples/fast_memory_e2e.py """ from __future__ import annotations import os import sys import tempfile import time from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent)) from openai import OpenAI from headroom.memory.fast_wrapper import with_fast_memory def run_conversation_test(): """Test multi-turn conversation with memory.""" print("=" * 70) print("FAST MEMORY E2E TEST") print("Zero-latency inline extraction + semantic retrieval") print("=" * 70) api_key = os.environ.get("OPENAI_API_KEY") if not api_key: print("ERROR: OPENAI_API_KEY not set") sys.exit(1) openai_client = OpenAI(api_key=api_key) # Use temp directory for clean test with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test_memory.db" # Create wrapped client print("\nšŸ“¦ Creating fast memory client...") print(" Using local embeddings (sentence-transformers)") client = with_fast_memory( openai_client, user_id="test_user", db_path=db_path, use_local_embeddings=True, ) # Conversation 1: Share preferences print("\n" + "─" * 70) print("TURN 1: User shares preferences") print("─" * 70) user_msg1 = "I'm a Python developer who prefers async/await patterns. I work at a fintech company and we use PostgreSQL." print(f"\nšŸ§‘ User: {user_msg1}") start = time.perf_counter() response1 = client.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": user_msg1}, ], ) latency1 = time.perf_counter() - start print(f"\nšŸ¤– Assistant: {response1.choices[0].message.content[:200]}...") print(f"\nā±ļø Latency: {latency1 * 1000:.0f}ms (includes inline memory extraction)") # Check what was stored memories = client.memory.get_all() print(f"\nšŸ“ Memories stored: {len(memories)}") for mem in memories: print(f" - {mem.text}") # Conversation 2: Ask related question print("\n" + "─" * 70) print("TURN 2: User asks related question (memory should be retrieved)") print("─" * 70) user_msg2 = "What database should I use for my new project?" print(f"\nšŸ§‘ User: {user_msg2}") start = time.perf_counter() response2 = client.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": user_msg2}, ], ) latency2 = time.perf_counter() - start print(f"\nšŸ¤– Assistant: {response2.choices[0].message.content}") print(f"\nā±ļø Latency: {latency2 * 1000:.0f}ms") # Check if PostgreSQL is mentioned (should be from memory) response_text = response2.choices[0].message.content.lower() if "postgresql" in response_text or "postgres" in response_text: print("\nāœ… SUCCESS: Assistant referenced PostgreSQL from memory!") else: print("\nāš ļø Note: Assistant didn't explicitly mention PostgreSQL") print(" (Memory was still injected - check if response is contextual)") # Conversation 3: Different topic print("\n" + "─" * 70) print("TURN 3: User asks about coding patterns") print("─" * 70) user_msg3 = "What's the best way to handle concurrent operations in my code?" print(f"\nšŸ§‘ User: {user_msg3}") start = time.perf_counter() response3 = client.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": user_msg3}, ], ) latency3 = time.perf_counter() - start print(f"\nšŸ¤– Assistant: {response3.choices[0].message.content}") print(f"\nā±ļø Latency: {latency3 * 1000:.0f}ms") # Check if async/await is mentioned response_text = response3.choices[0].message.content.lower() if "async" in response_text or "await" in response_text: print("\nāœ… SUCCESS: Assistant referenced async/await from memory!") else: print("\nāš ļø Note: Assistant didn't explicitly mention async/await") # Final summary print("\n" + "=" * 70) print("SUMMARY") print("=" * 70) all_memories = client.memory.get_all() avg_latency = (latency1 + latency2 + latency3) / 3 print(f"\nTotal memories stored: {len(all_memories)}") print(f"Average latency: {avg_latency * 1000:.0f}ms") print("\nLatency breakdown:") print(f" Turn 1 (extraction): {latency1 * 1000:.0f}ms") print(f" Turn 2 (retrieval): {latency2 * 1000:.0f}ms") print(f" Turn 3 (retrieval): {latency3 * 1000:.0f}ms") print("\nāœ… ZERO extra latency - all memory ops happen inline!") print("āœ… Semantic search - finds conceptually related memories") print("āœ… Local embeddings - sub-50ms retrieval (no API calls)") def benchmark_memory_overhead(): """Measure the overhead of memory operations.""" print("\n" + "=" * 70) print("BENCHMARK: Memory Overhead") print("=" * 70) api_key = os.environ.get("OPENAI_API_KEY") if not api_key: return openai_client = OpenAI(api_key=api_key) test_message = "I prefer Python and use PostgreSQL." # Baseline: No memory print("\n1. BASELINE (no memory wrapper)") baseline_latencies = [] for i in range(3): start = time.perf_counter() openai_client.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": test_message}, ], ) elapsed = time.perf_counter() - start baseline_latencies.append(elapsed * 1000) print(f" Run {i + 1}: {elapsed * 1000:.0f}ms") # With memory print("\n2. WITH FAST MEMORY (inline extraction)") with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "bench_memory.db" client = with_fast_memory( openai_client, user_id="bench", db_path=db_path, use_local_embeddings=True, ) memory_latencies = [] for i in range(3): start = time.perf_counter() client.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": test_message}, ], ) elapsed = time.perf_counter() - start memory_latencies.append(elapsed * 1000) print(f" Run {i + 1}: {elapsed * 1000:.0f}ms") baseline_avg = sum(baseline_latencies) / len(baseline_latencies) memory_avg = sum(memory_latencies) / len(memory_latencies) overhead = memory_avg - baseline_avg print(f"\n{'─' * 70}") print(f"{'Approach':<30} {'Avg Latency':<15} {'Overhead':<15}") print(f"{'─' * 70}") print(f"{'Baseline (no memory)':<30} {baseline_avg:>10.0f}ms {'0ms':>15}") print(f"{'With fast memory':<30} {memory_avg:>10.0f}ms {f'{overhead:+.0f}ms':>15}") print(f"{'─' * 70}") if overhead < 100: print(f"\nāœ… Memory overhead is only {overhead:.0f}ms - negligible!") else: print(f"\nāš ļø Memory overhead is {overhead:.0f}ms") print(" This is mostly from the memory instruction in the prompt.") if __name__ == "__main__": run_conversation_test() benchmark_memory_overhead()