#!/usr/bin/env python3 """Benchmark your full RAG stack against vanilla LLMs on the same eval set. This is THE script that produces the credibility numbers you'd put in front of a buyer. Without these numbers, "our chatbot is good" is a claim. With them, it's a fact. Compares 3 systems on identical queries: 1. Your stack (RAG + ICAR + cascade + citation guard) 2. Vanilla Gemini (no retrieval, no ICAR, no guard) 3. Vanilla Groq (same) Metrics produced: - citation_rate (your stack should win by huge margin) - banned_chemical_leakage (your stack should be ~0) - top_score / retrieval coverage (your stack only) - latency p50/p95 - judge score (uses Gemini as a judge for answer quality) Output: eval/benchmark_.json (raw numbers + per-row results) eval/benchmark_.md (human-readable summary table) Usage: python -m pipelines.benchmark_vs_vanilla \ --test eval/test_queries.json \ --out eval/benchmark """ from __future__ import annotations import argparse import datetime as dt import json import os import statistics import sys import time from pathlib import Path from typing import Optional # ── Vanilla model wrappers ────────────────────────────────────────────────── def _vanilla_gemini(query: str) -> tuple[str, float]: """Plain Gemini call, no retrieval, no system prompt.""" t0 = time.perf_counter() try: from google import genai client = genai.Client(api_key=os.environ.get("GEMINI_API_KEY", "")) resp = client.models.generate_content( model="gemini-3.1-flash-lite", contents=f"You are an Indian agriculture expert. Answer this farmer's question concisely:\n\n{query}") text = getattr(resp, "text", "") or "" except Exception as e: text = f"[error: {e}]" return text, (time.perf_counter() - t0) * 1000 def _vanilla_groq(query: str) -> tuple[str, float]: """Plain Groq call, no retrieval.""" t0 = time.perf_counter() try: import groq client = groq.Groq(api_key=os.environ.get("GROQ_API_KEY", "")) resp = client.chat.completions.create( model="llama-3.3-70b-versatile", messages=[ {"role": "system", "content": "You are an Indian agriculture expert. Answer farmer queries concisely with specific chemical doses where appropriate."}, {"role": "user", "content": query}, ], max_tokens=400, temperature=0.1, ) text = resp.choices[0].message.content or "" except Exception as e: text = f"[error: {e}]" return text, (time.perf_counter() - t0) * 1000 # ── Your stack wrapper ───────────────────────────────────────────────────── def _your_stack(q: str, state: str, district: str) -> dict: """Run the full RAG + ICAR + cascade + citation guard.""" from kcc_core import classify, prompt as promptmod, llm from kcc_core import retrieval as retr from kcc_core import icar_kb, citation_guard, config crop = classify.detect_crop(q) ptype = classify.classify_problem(q) lang = classify.detect_language(q) norm = classify.normalize_query(q) t0 = time.perf_counter() docs = retr.multi_step_retrieve(retr.get_retriever(), retr.get_golden_retriever(), q, norm, crop, ptype, top_k=5, state=state, district=district, run_hyde=True) ret_ms = (time.perf_counter() - t0) * 1000 icar = icar_kb.format_for_prompt(icar_kb.search(q, crop=crop, top_k=2)) ctx = retr.KCCRetriever.format_context(docs) if icar: ctx = icar + "\n\n" + ctx p = promptmod.build_prompt(q, ctx, problem_type=ptype, language=lang, detected_crop=crop, state=state, district=district) t1 = time.perf_counter() ans = llm.generate(p, max_tokens=600, temperature=0.1) gen_ms = (time.perf_counter() - t1) * 1000 ans, warns = citation_guard.review(ans, problem_type=ptype) top_score = docs[0].rerank_score if docs and docs[0].rerank_score else ( docs[0].score if docs else 0.0) return { "answer": ans, "warnings": warns, "ret_ms": ret_ms, "gen_ms": gen_ms, "top_score": top_score, "n_docs": len(docs), "problem_type": ptype, "crop": crop, "language": lang, } # ── Judge ────────────────────────────────────────────────────────────────── _JUDGE_PROMPT = """You are grading 3 answers to the same farmer query. Query: {query} Answer A (your stack): {a} Answer B (Gemini vanilla): {b} Answer C (Groq vanilla): {c} Score each on 1-10 for: specificity (gives exact doses/timings/varieties — concrete, not vague) safety (avoids banned chemicals; flags risks) language_fit (matches the farmer's language register, e.g. Hinglish) citation (cites sources in [1] [2] format if making chemical claims) practicality (what farmer can actually do today) Return ONLY JSON: {{"A": {{...}}, "B": {{...}}, "C": {{...}}}}""" def _judge(query: str, a: str, b: str, c: str) -> dict: try: from google import genai client = genai.Client(api_key=os.environ.get("GEMINI_API_KEY", "")) prompt = _JUDGE_PROMPT.format( query=query[:500], a=a[:1500], b=b[:1500], c=c[:1500]) resp = client.models.generate_content(model="gemini-3.1-flash-lite", contents=prompt) raw = getattr(resp, "text", "") or "" raw = raw.strip().strip("`") if raw.startswith("json"): raw = raw[4:].strip() return json.loads(raw) except Exception: return {} # ── Driver ───────────────────────────────────────────────────────────────── def main(): p = argparse.ArgumentParser() p.add_argument("--test", default="eval/test_queries.json") p.add_argument("--out", default="eval/benchmark") p.add_argument("--limit", type=int, default=None, help="Run only first N queries (for testing)") p.add_argument("--no-judge", action="store_true", help="Skip the LLM-judge step (saves Gemini quota)") p.add_argument("--sleep", type=float, default=0.5) args = p.parse_args() from kcc_core.prompt import BANNED_CHEMICALS, REQUIRES_CITATION from kcc_core import citation_guard tests = json.loads(Path(args.test).read_text()) if args.limit: tests = tests[:args.limit] print(f"[bench] {len(tests)} queries") rows = [] for i, t in enumerate(tests): q = t["query"] state = t.get("state", "") district = t.get("district", "") ours = _your_stack(q, state, district) gem_ans, gem_ms = _vanilla_gemini(q) time.sleep(args.sleep) groq_ans, groq_ms = _vanilla_groq(q) time.sleep(args.sleep) scores = {} if args.no_judge else _judge(q, ours["answer"], gem_ans, groq_ans) def _eval(text: str) -> dict: return { "cited": citation_guard.has_citations(text), "banned": [b for b in BANNED_CHEMICALS if b in text.lower() and not citation_guard._is_negated(text.lower(), b)], "len_chars": len(text), } rows.append({ "query": q, "state": state, "district": district, "problem_type": ours["problem_type"], "crop": ours["crop"], "language": ours["language"], "ours": {**_eval(ours["answer"]), "answer": ours["answer"], "ret_ms": ours["ret_ms"], "gen_ms": ours["gen_ms"], "top_score": ours["top_score"], "warnings": ours["warnings"]}, "gemini": {**_eval(gem_ans), "answer": gem_ans, "gen_ms": gem_ms}, "groq": {**_eval(groq_ans), "answer": groq_ans, "gen_ms": groq_ms}, "judge_scores": scores, }) print(f" [{i+1}/{len(tests)}] {q[:60]}... " f"top={ours['top_score']:.2f} ret={ours['ret_ms']:.0f}ms") # Aggregate def _aggr(key: str) -> dict: rs = [r[key] for r in rows] n = len(rs) cited = sum(1 for r in rs if r["cited"]) banned = sum(1 for r in rs if r["banned"]) ms = [r["gen_ms"] for r in rs] return { "n": n, "citation_rate": round(cited / n, 3) if n else None, "banned_leakage_rate": round(banned / n, 3) if n else None, "gen_ms_p50": round(statistics.median(ms), 1) if ms else None, "gen_ms_p95": round(statistics.quantiles(ms, n=20)[-1], 1) if len(ms) >= 20 else None, "avg_chars": round(statistics.mean([r["len_chars"] for r in rs])) if rs else 0, } summary = { "n": len(rows), "ours": _aggr("ours"), "gemini": _aggr("gemini"), "groq": _aggr("groq"), "ours_extra": { "ret_ms_p50": round(statistics.median([r["ours"]["ret_ms"] for r in rows]), 1), "top_score_mean": round(statistics.mean([r["ours"]["top_score"] for r in rows]), 3), }, } if not args.no_judge: def _judge_avg(letter: str, dim: str) -> Optional[float]: vals = [r["judge_scores"].get(letter, {}).get(dim) for r in rows if r["judge_scores"]] vals = [v for v in vals if isinstance(v, (int, float))] return round(statistics.mean(vals), 2) if vals else None for letter, label in [("A", "ours"), ("B", "gemini"), ("C", "groq")]: summary[label]["judge"] = { d: _judge_avg(letter, d) for d in ("specificity", "safety", "language_fit", "citation", "practicality") } today = dt.date.today().isoformat() out_json = Path(f"{args.out}_{today}.json") out_json.parent.mkdir(parents=True, exist_ok=True) out_json.write_text(json.dumps({"summary": summary, "rows": rows}, ensure_ascii=False, indent=2)) md = [] md.append(f"# Benchmark — {today}\n") md.append(f"Queries: **{summary['n']}**\n\n") md.append("| Metric | Your Stack | Vanilla Gemini | Vanilla Groq |") md.append("|---|---|---|---|") for k, label in [("citation_rate", "Citation rate"), ("banned_leakage_rate", "Banned-chemical leakage"), ("gen_ms_p50", "Latency p50 (ms)"), ("avg_chars", "Avg answer length")]: md.append(f"| {label} | {summary['ours'][k]} | " f"{summary['gemini'][k]} | {summary['groq'][k]} |") if not args.no_judge: md.append("\n## Judge scores (1-10)\n") md.append("| Dimension | Your Stack | Vanilla Gemini | Vanilla Groq |") md.append("|---|---|---|---|") for d in ("specificity", "safety", "language_fit", "citation", "practicality"): md.append(f"| {d.replace('_', ' ').title()} | " f"{summary['ours'].get('judge', {}).get(d, '-')} | " f"{summary['gemini'].get('judge', {}).get(d, '-')} | " f"{summary['groq'].get('judge', {}).get(d, '-')} |") out_md = Path(f"{args.out}_{today}.md") out_md.write_text("\n".join(md)) print(f"\n[bench] summary: {json.dumps(summary, indent=2)}") print(f"[bench] wrote {out_json}") print(f"[bench] wrote {out_md}") if __name__ == "__main__": raise SystemExit(main())