"""Unified LLM cascade: Groq → Gemini → local Llama (offline). ONE place — both api/ and internal modules (HyDE, citation guard) call here. Cost rationale: Groq free tier handles ~30 req/min/model, Gemini free tier adds another 60 req/min. Combined that's enough for a few thousand farmers/day at zero cost. Local Llama (LoRA, 4-bit) is the offline last resort. """ from __future__ import annotations import logging import threading import time from typing import Iterator, Optional from . import config logger = logging.getLogger(__name__) # ── Groq ────────────────────────────────────────────────────────────────────── def _groq_generate(prompt: str, model: str, *, max_tokens: int, temperature: float, stream: bool = False): import groq client = groq.Groq(api_key=config.GROQ_API_KEY) return client.chat.completions.create( model=model, messages=[{"role": "user", "content": prompt}], max_tokens=max_tokens, temperature=temperature, stream=stream, ) def _try_groq(prompt: str, max_tokens: int, temperature: float, prefer_cheap: bool = False) -> Optional[str]: if not config.GROQ_API_KEY: return None models = list(reversed(config.GROQ_MODELS)) if prefer_cheap else config.GROQ_MODELS for m in models: try: resp = _groq_generate(prompt, m, max_tokens=max_tokens, temperature=temperature, stream=False) txt = resp.choices[0].message.content or "" if txt.strip(): return txt except Exception as e: msg = str(e).lower() if "429" in msg or "rate" in msg: time.sleep(1.0) logger.debug(f"[llm] groq {m} failed: {e}") continue return None def _try_groq_stream(prompt: str, max_tokens: int, prefer_cheap: bool = False) -> Optional[Iterator[str]]: if not config.GROQ_API_KEY: return None models = list(reversed(config.GROQ_MODELS)) if prefer_cheap else config.GROQ_MODELS for m in models: try: stream = _groq_generate(prompt, m, max_tokens=max_tokens, temperature=0.1, stream=True) return _consume_groq_stream(stream) except Exception as e: logger.debug(f"[llm] groq stream {m} failed: {e}") continue return None def _consume_groq_stream(stream) -> Iterator[str]: for chunk in stream: if chunk.choices: tok = chunk.choices[0].delta.content or "" if tok: yield tok # ── Gemini ──────────────────────────────────────────────────────────────────── _gemini_client = None _gemini_lock = threading.Lock() def _get_gemini_client(): global _gemini_client if _gemini_client is not None: return _gemini_client with _gemini_lock: if _gemini_client is not None: return _gemini_client try: from google import genai _gemini_client = genai.Client(api_key=config.GEMINI_API_KEY) return _gemini_client except Exception: try: import google.generativeai as legacy legacy.configure(api_key=config.GEMINI_API_KEY) return legacy except Exception as e: logger.error(f"[llm] gemini init failed: {e}") return None def _try_gemini(prompt: str, max_tokens: int, temperature: float) -> Optional[str]: if not config.GEMINI_API_KEY: return None client = _get_gemini_client() if client is None: return None for m in config.GEMINI_FALLBACK_MODELS: try: if hasattr(client, "models"): resp = client.models.generate_content(model=m, contents=prompt) if getattr(resp, "text", None): return resp.text else: resp = client.GenerativeModel(m).generate_content( prompt, generation_config={"max_output_tokens": max_tokens, "temperature": temperature}) if getattr(resp, "text", None): return resp.text except Exception as e: logger.debug(f"[llm] gemini {m} failed: {e}") continue return None def _try_gemini_stream(prompt: str) -> Optional[Iterator[str]]: if not config.GEMINI_API_KEY: return None client = _get_gemini_client() if client is None: return None try: if hasattr(client, "models"): for chunk in client.models.generate_content_stream( model=config.GEMINI_MODEL, contents=prompt): if getattr(chunk, "text", None): yield chunk.text else: stream = client.GenerativeModel(config.GEMINI_MODEL).generate_content( prompt, stream=True) for chunk in stream: if getattr(chunk, "text", None): yield chunk.text except Exception as e: yield f"[ERROR] {e}" # ── Local LoRA Llama (last-resort, only if explicitly enabled) ─────────────── def _try_local_llama(prompt: str, max_tokens: int) -> Optional[str]: if not config.USE_LOCAL_LLAMA: return None try: from .local_llama import generate as _local_gen return _local_gen(prompt, max_tokens) except Exception as e: logger.debug(f"[llm] local llama failed: {e}") return None # ── Public API ─────────────────────────────────────────────────────────────── def generate(prompt: str, *, max_tokens: int = 600, temperature: float = 0.1, prefer_cheap: bool = False) -> str: """Cascade: Groq → Gemini → local Llama. Returns "" if all fail.""" return generate_with_meta(prompt, max_tokens=max_tokens, temperature=temperature, prefer_cheap=prefer_cheap)["text"] def generate_with_meta(prompt: str, *, max_tokens: int = 600, temperature: float = 0.1, prefer_cheap: bool = False) -> dict: """Same cascade as generate(), but returns {"text", "backend"}. Backend is the actual model that produced the answer ("groq", "gemini", "local_llama", or "none" if all failed). Lets callers persist real telemetry instead of hardcoded labels. """ out = _try_groq(prompt, max_tokens, temperature, prefer_cheap=prefer_cheap) if out: return {"text": out, "backend": "groq"} out = _try_gemini(prompt, max_tokens, temperature) if out: return {"text": out, "backend": "gemini"} out = _try_local_llama(prompt, max_tokens) if out: return {"text": out, "backend": "local_llama"} return {"text": "", "backend": "none"} def generate_stream(prompt: str, *, max_tokens: int = 600, prefer_cheap: bool = False) -> Iterator[str]: stream = _try_groq_stream(prompt, max_tokens, prefer_cheap=prefer_cheap) if stream is not None: produced = False for tok in stream: produced = True yield tok if produced: return g = _try_gemini_stream(prompt) if g is not None: produced = False for tok in g: produced = True yield tok if produced: return out = _try_local_llama(prompt, max_tokens) if out: yield out else: yield "Service temporarily unavailable. Please try again." def diagnose_image(image_bytes: bytes) -> dict: """Use Gemini Vision to diagnose a crop image. Returns a dict with keys: crop, condition, problem_type, confidence, visible_symptoms.""" if not config.GEMINI_API_KEY: return {} client = _get_gemini_client() if client is None: return {} prompt = ( "You are an expert agricultural plant-disease diagnostician. " "Analyse this image and return STRICT JSON with these keys: " '{"crop": str, "condition": str, "problem_type": "disease|pest|nutrient|healthy", ' '"confidence": "High|Medium|Low", "visible_symptoms": str}. ' "If unclear, set crop or condition to 'Unknown'. JSON only, no markdown." ) try: if hasattr(client, "models"): from google.genai import types as gtypes resp = client.models.generate_content( model=config.GEMINI_MODEL, contents=[prompt, gtypes.Part.from_bytes( data=image_bytes, mime_type="image/jpeg")]) else: from PIL import Image import io img = Image.open(io.BytesIO(image_bytes)) resp = client.GenerativeModel(config.GEMINI_MODEL).generate_content( [prompt, img]) import json, re txt = (resp.text or "").strip() txt = re.sub(r"^```(?:json)?|```$", "", txt, flags=re.MULTILINE).strip() return json.loads(txt) if txt else {} except Exception as e: logger.error(f"[llm] diagnose_image failed: {e}") return {}