kcc-agri / kcc_llm.py
hritikm15's picture
KCC AgriAdvisor v1 — code deploy
ca88a2c verified
"""
kcc_llm.py β€” Unified LLM backend for KCC Agricultural Chatbot
==============================================================
PRIMARY: Fine-tuned Llama-3.2-3B-Instruct (KCC LoRA adapter chunk4)
Trained on 16.5M KCC Q&A pairs β€” domain-expert agricultural Hindi/English
Loaded in 4-bit (QLoRA) β€” runs on RTX 3050 4GB VRAM
FALLBACK: Gemini (Google AI Studio free tier) β€” used if GPU/model not available
Usage:
from kcc_llm import generate, generate_stream, is_llama_loaded
answer = generate(prompt)
for chunk in generate_stream(prompt): ...
The prompt format matches the Alpaca training template exactly.
"""
import os
import sys
import threading
import logging
from pathlib import Path
from typing import Iterator, Optional
logger = logging.getLogger(__name__)
# ── CUDA library path fix ─────────────────────────────────────────────────────
# torch 2.10+ requires libcusparseLt which lives in nvidia-cusparselt package.
# Pre-patch LD_LIBRARY_PATH so torch can find it on import.
def _patch_cuda_ld_path():
    """Prepend bundled CUDA/torch library directories to LD_LIBRARY_PATH.

    Looks under the interpreter's site-packages directories (both "purelib"
    and "platlib", since binary wheels install under the latter) for
    ``torch/lib`` and ``nvidia/<pkg>/lib`` and prepends every directory that
    exists. Directories already listed in LD_LIBRARY_PATH are not added
    again, so calling this more than once leaves the variable unchanged.

    This is best effort: any failure is logged at debug level and never
    raised, so importing the module cannot fail because of it.
    """
    # NOTE(review): the dynamic loader normally reads LD_LIBRARY_PATH at
    # process start — confirm this in-process update is picked up by torch's
    # own library loading and not only by child processes.
    nvidia_packages = ("cusparselt", "cublas", "cuda_runtime", "cudnn",
                       "cuda_cupti", "nvjitlink", "nvtx")
    try:
        import sysconfig

        site_dirs = []
        for scheme_key in ("purelib", "platlib"):
            site = sysconfig.get_path(scheme_key)
            if site and site not in site_dirs:
                site_dirs.append(site)

        candidates = []
        for site in site_dirs:
            candidates.append(os.path.join(site, "torch", "lib"))
            candidates.extend(
                os.path.join(site, "nvidia", pkg, "lib") for pkg in nvidia_packages
            )

        current = os.environ.get("LD_LIBRARY_PATH", "")
        already_listed = set(filter(None, current.split(os.pathsep)))
        extra = []
        for path in candidates:
            if path in already_listed or path in extra:
                continue
            if os.path.isdir(path):
                extra.append(path)

        if extra:
            # The previous value is kept verbatim after the new entries.
            suffix = os.pathsep + current if current else ""
            os.environ["LD_LIBRARY_PATH"] = os.pathsep.join(extra) + suffix
    except Exception as exc:
        logging.getLogger(__name__).debug(
            "[kcc_llm] Could not patch LD_LIBRARY_PATH: %s", exc
        )


_patch_cuda_ld_path()
# ── Paths ─────────────────────────────────────────────────────────────────────
# Everything is resolved relative to the directory containing this file.
_PROJECT_DIR = Path(__file__).parent
# Zipped LoRA adapter as shipped with the project.
_ADAPTER_ZIP = _PROJECT_DIR.joinpath("KCC_Chunk", "kcc_adapter_after_chunk4.zip")
# Directory the zip is extracted into.
_ADAPTER_DIR = _PROJECT_DIR.joinpath("kcc_adapter")
# Base checkpoint, pre-quantized by Unsloth.
_BASE_MODEL = "unsloth/llama-3.2-3b-instruct-unsloth-bnb-4bit"

# Generation defaults shared by the blocking and streaming paths.
_MAX_NEW_TOKENS = 512
_TEMPERATURE = 0.3  # Low temp → more factual, consistent
_DO_SAMPLE = True
# ── Alpaca template (must match training exactly) ─────────────────────────────
# Training used: "{instruction}\n\n### Response:\n{output}"
def _alpaca_prompt(instruction: str) -> str:
    """Wrap *instruction* in the response-marker template used at training time."""
    template = "{}\n\n### Response:\n"
    return template.format(instruction)
# ── Module-level model state ──────────────────────────────────────────────────
# Populated by _load_llama(); all stay None until a load succeeds.
# (_pipeline is declared global in _load_llama but never assigned there.)
_model = _tokenizer = _pipeline = None
# Held for the whole duration of the one-time model load.
_load_lock = threading.Lock()
# _load_attempted flips as soon as a load starts; _load_ok only on success.
_load_attempted = _load_ok = False
def is_llama_loaded() -> bool:
    """Report whether the fine-tuned Llama model finished loading successfully."""
    return _load_ok
def _extract_adapter() -> bool:
    """Extract the chunk4 adapter zip into _ADAPTER_DIR if not already done.

    The archive is expected to wrap its files in one top-level folder; that
    folder is stripped so the adapter files land directly in _ADAPTER_DIR.

    Returns:
        True when ``adapter_config.json`` is present in _ADAPTER_DIR (either
        from an earlier run or after this extraction); False when the zip is
        missing, extraction fails, or the archive did not contain the config.
    """
    import shutil
    import zipfile

    config_file = _ADAPTER_DIR / "adapter_config.json"
    if _ADAPTER_DIR.exists() and config_file.exists():
        return True
    if not _ADAPTER_ZIP.exists():
        logger.warning(f"[kcc_llm] Adapter zip not found: {_ADAPTER_ZIP}")
        return False
    try:
        _ADAPTER_DIR.mkdir(parents=True, exist_ok=True)
        root = _ADAPTER_DIR.resolve()
        with zipfile.ZipFile(str(_ADAPTER_ZIP), "r") as archive:
            for info in archive.infolist():
                parts = Path(info.filename).parts
                if not parts:
                    continue
                if len(parts) == 1 and info.is_dir():
                    # The wrapper folder itself: its contents are flattened
                    # into _ADAPTER_DIR, so there is nothing to create.
                    continue
                relative = parts[1:] if len(parts) > 1 else parts
                dest = root.joinpath(*relative).resolve()
                try:
                    # Refuse members (e.g. containing "..") that would land
                    # outside the adapter directory.
                    dest.relative_to(root)
                except ValueError:
                    logger.warning(
                        f"[kcc_llm] Skipping unsafe zip member: {info.filename}"
                    )
                    continue
                if info.is_dir():
                    dest.mkdir(parents=True, exist_ok=True)
                    continue
                dest.parent.mkdir(parents=True, exist_ok=True)
                # Stream the member so large weight files are never held
                # fully in memory.
                with archive.open(info) as src, open(dest, "wb") as dst:
                    shutil.copyfileobj(src, dst)
        if not config_file.exists():
            logger.error(
                f"[kcc_llm] Adapter extraction failed: adapter_config.json "
                f"missing in {_ADAPTER_ZIP}"
            )
            return False
        logger.info(f"[kcc_llm] Adapter extracted β†’ {_ADAPTER_DIR}")
        return True
    except Exception as e:
        logger.error(f"[kcc_llm] Adapter extraction failed: {e}")
        return False
def _detect_gpu() -> bool:
    """Return True when torch imports and reports a CUDA device; log the outcome."""
    try:
        import torch

        has_gpu = bool(torch.cuda.is_available())
        if has_gpu:
            vram_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
            logger.info(f"[kcc_llm] GPU: {torch.cuda.get_device_name(0)} ({vram_gb:.1f}GB VRAM)")
            if vram_gb < 3.0:
                logger.warning("[kcc_llm] <3GB VRAM β€” Llama may OOM. Trying anyway…")
    except Exception:
        has_gpu = False
    if not has_gpu:
        # Logged both when torch is unusable and when CUDA is simply absent.
        logger.info("[kcc_llm] No GPU detected β€” Llama will use CPU (slow)")
    return has_gpu


def _load_via_unsloth():
    """Path A: load base model + adapter through Unsloth.

    Returns a ``(model, tokenizer)`` pair, or None when Unsloth is missing or
    loading fails. Nothing is stored globally here, so a half-built model is
    dropped as soon as this function returns.
    """
    try:
        from unsloth import FastLanguageModel
        from peft import PeftModel

        logger.info(f"[kcc_llm] Loading via Unsloth: {_BASE_MODEL}")
        model, tokenizer = FastLanguageModel.from_pretrained(
            model_name=_BASE_MODEL,
            max_seq_length=2048,
            dtype=None,
            load_in_4bit=True,
        )
        model = PeftModel.from_pretrained(model, str(_ADAPTER_DIR))
        FastLanguageModel.for_inference(model)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        return model, tokenizer
    except ImportError:
        logger.info("[kcc_llm] Unsloth not available β€” trying standard transformers+peft")
    except Exception as e:
        logger.warning(f"[kcc_llm] Unsloth load failed ({e}) β€” trying transformers+peft")
    return None


def _release_accelerator_memory(has_gpu: bool) -> None:
    """Collect garbage and clear the CUDA cache before a second load attempt."""
    import gc

    gc.collect()
    if not has_gpu:
        return
    try:
        import torch

        torch.cuda.empty_cache()
    except Exception as e:
        logger.debug(f"[kcc_llm] Could not clear CUDA cache: {e}")


def _load_via_transformers(has_gpu: bool):
    """Path B: load base model + adapter with transformers and peft.

    Returns a ``(model, tokenizer)`` pair; any failure propagates to the caller.
    """
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
    from peft import PeftModel

    logger.info(f"[kcc_llm] Loading base: {_BASE_MODEL}")
    # NOTE(review): trust_remote_code=True lets the model repository run its
    # own Python code at load time — confirm it is actually required here.
    if has_gpu:
        quant_cfg = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
        )
        base_model = AutoModelForCausalLM.from_pretrained(
            _BASE_MODEL, quantization_config=quant_cfg,
            device_map="auto", trust_remote_code=True,
        )
    else:
        # CPU: load in fp32 (slower but works without CUDA)
        base_model = AutoModelForCausalLM.from_pretrained(
            _BASE_MODEL, torch_dtype=torch.float32,
            device_map="cpu", trust_remote_code=True,
        )
    logger.info("[kcc_llm] Merging LoRA adapter…")
    model = PeftModel.from_pretrained(base_model, str(_ADAPTER_DIR))
    model.eval()
    tokenizer = AutoTokenizer.from_pretrained(str(_ADAPTER_DIR))
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    return model, tokenizer


def _load_llama() -> bool:
    """Load the fine-tuned Llama model once and publish it in module state.

    The whole load runs under _load_lock. Only the first call does any work;
    later calls return the recorded outcome. Unsloth is tried first, then
    plain transformers + peft.

    Returns:
        True when the model and tokenizer were loaded, False otherwise (the
        public API then uses the Gemini fallback).
    """
    global _model, _tokenizer, _load_attempted, _load_ok
    with _load_lock:
        if _load_attempted:
            return _load_ok
        _load_attempted = True

        # Step 1: the adapter must be on disk before anything heavy is loaded.
        if not _extract_adapter():
            logger.warning("[kcc_llm] Adapter unavailable β€” using Gemini fallback")
            return False

        # Step 2: GPU probe (decides quantized vs fp32 load in Path B).
        has_gpu = _detect_gpu()

        # Step 3a: Unsloth (preferred for the pre-quantized checkpoint).
        loaded = _load_via_unsloth()
        if loaded is not None:
            _model, _tokenizer = loaded
            _load_ok = True
            logger.info("[kcc_llm] βœ… Fine-tuned Llama loaded via Unsloth")
            return True

        # A failed Unsloth attempt may have allocated weights; free them so
        # the second attempt does not compete for the same memory.
        _release_accelerator_memory(has_gpu)

        # Step 3b: standard transformers + peft.
        try:
            _model, _tokenizer = _load_via_transformers(has_gpu)
            _load_ok = True
            logger.info("[kcc_llm] βœ… Fine-tuned Llama loaded via transformers+peft")
            if has_gpu:
                import torch

                vram_used = torch.cuda.memory_allocated() / 1e9
                logger.info(f"[kcc_llm] VRAM used: {vram_used:.1f}GB")
            return True
        except Exception as e:
            logger.error(f"[kcc_llm] Model load failed: {e}")
            logger.info("[kcc_llm] Falling back to Gemini")
            _model = None
            _tokenizer = None
            _load_ok = False
            return False
def _generate_llama(prompt: str, max_new_tokens: int = _MAX_NEW_TOKENS) -> str:
    """Run one blocking generation on the loaded Llama model.

    Args:
        prompt: Instruction text; it is wrapped by _alpaca_prompt before
            tokenization.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The decoded completion only (prompt tokens removed), stripped of
        surrounding whitespace.
    """
    import torch

    # NOTE(review): truncation=True cuts from the tokenizer's configured
    # truncation side; if that is the right side, an over-long prompt loses
    # its trailing "### Response:" marker — confirm. Also 1800 prompt tokens
    # plus the default 512 new tokens exceeds the 2048 mentioned below.
    encoded = _tokenizer(
        _alpaca_prompt(prompt),
        return_tensors="pt",
        truncation=True,
        max_length=1800,  # Leave room for response in 2048 context
    ).to(_model.device)
    prompt_length = encoded["input_ids"].shape[1]

    sampling = {
        "max_new_tokens": max_new_tokens,
        "temperature": _TEMPERATURE,
        "do_sample": _DO_SAMPLE,
        "top_p": 0.9,
        "repetition_penalty": 1.1,
        "pad_token_id": _tokenizer.eos_token_id,
        "eos_token_id": _tokenizer.eos_token_id,
    }
    with torch.no_grad():
        sequences = _model.generate(**encoded, **sampling)

    # Everything before prompt_length is the echoed prompt.
    completion_ids = sequences[0][prompt_length:]
    completion = _tokenizer.decode(completion_ids, skip_special_tokens=True)
    return completion.strip()
def _stream_llama(prompt: str, max_new_tokens: int = _MAX_NEW_TOKENS) -> Iterator[str]:
    """Stream decoded text pieces from the fine-tuned Llama model.

    Generation runs in a worker thread that feeds a TextIteratorStreamer;
    this generator yields whatever the streamer produces.

    Args:
        prompt: Instruction text; wrapped by _alpaca_prompt before tokenization.
        max_new_tokens: Upper bound on the number of generated tokens.

    Yields:
        Text fragments in generation order.

    Raises:
        Exception: whatever ``_model.generate`` raised in the worker thread,
            re-raised here once the stream has been drained so the caller
            can fall back instead of waiting forever.
    """
    from transformers import TextIteratorStreamer

    formatted = _alpaca_prompt(prompt)
    inputs = _tokenizer(
        formatted,
        return_tensors="pt",
        truncation=True,
        max_length=1800,
    ).to(_model.device)
    streamer = TextIteratorStreamer(
        _tokenizer, skip_prompt=True, skip_special_tokens=True
    )
    gen_kwargs = {
        **inputs,
        "max_new_tokens": max_new_tokens,
        "temperature": _TEMPERATURE,
        "do_sample": _DO_SAMPLE,
        "top_p": 0.9,
        "repetition_penalty": 1.1,
        "pad_token_id": _tokenizer.eos_token_id,
        "eos_token_id": _tokenizer.eos_token_id,
        "streamer": streamer,
    }
    failures = []

    def _run_generation():
        try:
            _model.generate(**gen_kwargs)
        except Exception as exc:
            failures.append(exc)
            # generate() did not finish, so it never closed the stream;
            # close it here or the consuming loop below would block forever.
            streamer.end()

    worker = threading.Thread(target=_run_generation, name="kcc_llm_generate")
    worker.start()
    for token in streamer:
        yield token
    worker.join()
    if failures:
        raise failures[0]
# ── Gemini fallback ───────────────────────────────────────────────────────────
# Fallback Gemini client built by this module; created on first use and reused.
_gemini_client = None


def _get_gemini_client():
    """Return a Gemini client, reusing an existing one where possible.

    The getter exported by ``step4_app`` is tried first. If that import or
    call fails, a client is built from ``config.GEMINI_API_KEY`` and kept in
    ``_gemini_client`` so later calls do not construct a new one.

    Returns:
        A client object, or None when no client could be obtained (the
        failure is logged).
    """
    global _gemini_client
    try:
        from step4_app import _get_gemini_client as _shared_getter
        return _shared_getter()
    except Exception as e:
        # Expected whenever this module is used without step4_app.
        logger.debug(f"[kcc_llm] step4_app Gemini client unavailable: {e}")
    if _gemini_client is not None:
        return _gemini_client
    try:
        import config
        from google import genai
        _gemini_client = genai.Client(api_key=config.GEMINI_API_KEY)
        return _gemini_client
    except Exception as e:
        logger.error(f"[kcc_llm] Gemini client failed: {e}")
        return None
def _generate_gemini(prompt: str) -> str:
    """Generate a complete answer with the Gemini fallback.

    Args:
        prompt: Text sent to Gemini unchanged.

    Returns:
        The response text, or a fixed "service unavailable" message when no
        client is available, the request fails, or the response has no text.
    """
    unavailable = "⚠️ Service temporarily unavailable. Please try again."
    client = _get_gemini_client()
    if client is None:
        return unavailable
    try:
        import config
        result = client.models.generate_content(
            model=config.GEMINI_MODEL, contents=prompt
        )
        text = result.text
    except Exception as e:
        logger.error(f"[kcc_llm] Gemini generate failed: {e}")
        return unavailable
    if not text:
        # A response without text would otherwise be returned as None/"".
        logger.error("[kcc_llm] Gemini generate failed: empty response")
        return unavailable
    return text
def _stream_gemini(prompt: str) -> Iterator[str]:
    """Stream an answer from the Gemini fallback.

    Args:
        prompt: Text sent to Gemini unchanged.

    Yields:
        Non-empty text chunks as they arrive. When no client is available a
        single "service unavailable" message is yielded; when the request
        fails the error is logged and yielded as an ``[Error: ...]`` chunk.
    """
    client = _get_gemini_client()
    if client is None:
        yield "⚠️ Service temporarily unavailable."
        return
    try:
        import config
        for chunk in client.models.generate_content_stream(
            model=config.GEMINI_MODEL, contents=prompt
        ):
            if chunk.text:
                yield chunk.text
    except Exception as e:
        # Record the failure server-side as well; previously it was only
        # visible in the text sent to the user.
        logger.error(f"[kcc_llm] Gemini stream failed: {e}")
        yield f"[Error: {e}]"
# ── Public API ────────────────────────────────────────────────────────────────
def generate(prompt: str, max_new_tokens: int = _MAX_NEW_TOKENS,
             prefer_llama: bool = True) -> str:
    """
    Produce a full response, preferring the fine-tuned Llama over Gemini.

    Args:
        prompt: The full assembled prompt (system + context + question)
        max_new_tokens: Maximum response length (applied to the Llama path)
        prefer_llama: If False, skip Llama and go straight to Gemini

    Returns:
        Generated text string
    """
    if not prefer_llama:
        return _generate_gemini(prompt)

    # First call triggers the (one-time) model load.
    if not _load_attempted:
        _load_llama()

    llama_ready = _load_ok and _model is not None
    if llama_ready:
        try:
            return _generate_llama(prompt, max_new_tokens)
        except Exception as e:
            logger.error(f"[kcc_llm] Llama inference error: {e} β€” falling back to Gemini")

    # Llama not loaded, or inference failed above.
    return _generate_gemini(prompt)
def generate_stream(prompt: str, max_new_tokens: int = _MAX_NEW_TOKENS,
                    prefer_llama: bool = True) -> Iterator[str]:
    """
    Yield response text incrementally, preferring Llama over Gemini.

    Args:
        prompt: The full assembled prompt
        max_new_tokens: Maximum response length (applied to the Llama path)
        prefer_llama: If False, skip Llama and stream from Gemini

    Yields:
        Text fragments from whichever backend served the request. If the
        Llama stream raises, the error is logged and the Gemini stream
        follows.
    """
    use_llama = False
    if prefer_llama:
        # First call triggers the (one-time) model load.
        if not _load_attempted:
            _load_llama()
        use_llama = _load_ok and _model is not None

    if use_llama:
        try:
            yield from _stream_llama(prompt, max_new_tokens)
        except Exception as e:
            logger.error(f"[kcc_llm] Llama stream error: {e} β€” falling back to Gemini")
        else:
            return

    yield from _stream_gemini(prompt)
def model_info() -> dict:
    """Describe the backend that requests are currently served by.

    Returns:
        A dict with "backend" and "model" keys. The Llama variant also has
        "adapter", "quantization" and "adapter_path"; the Gemini variant has
        a "reason" saying whether a Llama load was attempted yet.
    """
    if not _load_ok:
        if _load_attempted:
            reason = "Llama not loaded"
        else:
            reason = "Llama not yet attempted"
        return {
            "backend": "gemini",
            "model": "Gemini (fallback)",
            "reason": reason,
        }

    info = {}
    info["backend"] = "llama"
    info["model"] = "Llama-3.2-3B-Instruct (KCC fine-tuned)"
    info["adapter"] = "kcc_adapter_after_chunk4"
    info["quantization"] = "4-bit NF4 (QLoRA)"
    info["adapter_path"] = str(_ADAPTER_DIR)
    return info
# ── Background pre-load ───────────────────────────────────────────────────────
# Start loading the model in a background thread on import
# so it's ready by the time the first request comes in.
def _preload():
    """Thread target: run the one-time model load, logging any failure."""
    try:
        _load_llama()
    except Exception as e:
        logger.error(f"[kcc_llm] Pre-load failed: {e}")


# Daemon thread, started at import time, so loading never blocks the importer
# or keeps the interpreter alive at shutdown.
_preload_thread = threading.Thread(
    name="kcc_llm_preload",
    target=_preload,
    daemon=True,
)
_preload_thread.start()