Spaces:
Sleeping
Sleeping
| """ | |
| eval_llm.py | |
| =========== | |
| Evaluate a local HuggingFace model (base or GRPO-trained) against the | |
| IntelliCredit environment server and save results in the SAME format as | |
| baseline_results.json — so compare_results.py works directly. | |
| Usage: | |
| # Evaluate the GRPO-trained model (default): | |
| python eval_llm.py | |
| # Evaluate the base model: | |
| python eval_llm.py --model mistralai/Mistral-7B-Instruct-v0.3 --out base_results.json | |
| # Evaluate the trained model explicitly: | |
| python eval_llm.py --model vssksn/intellicredit-mistral-7b-grpo --out grpo_results.json | |
| # Then compare: | |
| python compare_results.py --baseline baseline_results.json --after grpo_results.json | |
| Requirements: | |
| pip install transformers torch accelerate | |
| Environment server must be running at ENV_URL (default: http://localhost:7860) | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import os | |
| import re | |
| import sys | |
| import textwrap | |
| import time | |
| import uuid | |
| import requests | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Configuration | |
| # ───────────────────────────────────────────────────────────────────────────── | |
# Base URL of the environment server; overridable through the ENV_URL variable.
ENV_URL = os.environ.get("ENV_URL", "http://localhost:7860")

# Model identifiers: the GRPO-trained checkpoint (default) and its base model.
DEFAULT_MODEL = "vssksn/intellicredit-mistral-7b-grpo"
BASE_MODEL = "mistralai/Mistral-7B-Instruct-v0.3"

# Generation settings.
TEMPERATURE = 0.2
MAX_NEW_TOKENS = 128  # We only need APPROVE/CONDITIONAL/REJECT (+ brief reasoning)

# Per-task step budget and human-readable description, keyed by task id.
TASK_CONFIGS = {
    task_id: {"num_steps": num_steps, "description": description}
    for task_id, num_steps, description in (
        ("task1", 5, "Easy β Clean profiles"),
        ("task2", 8, "Medium β Forensic alerts"),
        ("task3", 12, "Medium-Hard β Macro shocks + missing data"),
    )
}
# System message sent with every application; one tuple entry per prompt line.
SYSTEM_PROMPT = "\n".join((
    "You are an experienced Senior Credit Officer at an Indian MSME lending institution.",
    "You are evaluating corporate credit applications one at a time.",
    "For each application, review the financial profile, banking behavior, GST compliance,",
    "management quality, and any forensic alerts carefully.",
    "You must respond with EXACTLY ONE of these decisions:",
    "- APPROVE (action 0): Strong financials, low risk, no red flags",
    "- CONDITIONAL (action 1): Acceptable but with concerns requiring monitoring",
    "- REJECT (action 2): High risk, hard rule violations, or severe red flags",
    "IMPORTANT RULES:",
    "- If DSCR < 1.0 β ALWAYS REJECT (regulatory requirement)",
    "- If any RED forensic alert β ALWAYS REJECT",
    "- If Cheque Bounce Rate > 25% β ALWAYS REJECT",
    "- If CIBIL < 550 equivalent β Strong lean toward REJECT",
    "- Consider portfolio diversification and capital constraints",
    "- When financial data shows 'DATA UNAVAILABLE', treat this as increased uncertainty.",
    "Missing critical metrics (DSCR, Current Ratio, Collateral) should bias toward",
    "CONDITIONAL or REJECT unless other indicators are exceptionally strong.",
    "- Consider portfolio alerts (CC Spike, Bounce Surge, etc.) as systemic risk signals.",
    "Respond with ONLY the decision word: APPROVE, CONDITIONAL, or REJECT.",
    "Do NOT include explanations or any other text.",
))
# Lower-cased decision phrases the model may emit, mapped to action codes.
ACTION_MAP = dict([
    ("approve", 0),
    ("conditional", 1),
    ("reject", 2),
    ("conditional_approve", 1),
    ("conditional approve", 1),
])

# Display label for each action code.
DECISION_LABELS = dict(enumerate(("APPROVE", "CONDITIONAL", "REJECT")))
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Environment HTTP Client (same as inference.py) | |
| # ───────────────────────────────────────────────────────────────────────────── | |
class EnvHTTPClient:
    """Small JSON-over-HTTP client for the environment server.

    Wraps a single ``requests.Session`` and exposes the three endpoints the
    evaluation loop uses: ``/health``, ``/reset`` and ``/step``.
    """

    def __init__(self, base_url: str):
        # Drop trailing slashes so endpoint paths can be appended directly.
        self.base = base_url.rstrip("/")
        self.session = requests.Session()
        self.session.headers.update({"Content-Type": "application/json"})

    def _post(self, path: str, payload: dict) -> dict:
        """POST *payload* as JSON to *path*; raise on HTTP errors, return parsed JSON."""
        response = self.session.post(f"{self.base}{path}", json=payload, timeout=60)
        response.raise_for_status()
        return response.json()

    def health(self) -> bool:
        """Return True only if GET /health answers with status 200; any error yields False."""
        try:
            response = self.session.get(f"{self.base}/health", timeout=15)
            return response.status_code == 200
        except Exception:
            return False

    def reset(self, task_id: str, seed: int = 42, episode_id: str | None = None) -> dict:
        """Start an episode via POST /reset and return the decoded response."""
        payload: dict = {"seed": seed, "task_id": task_id}
        if episode_id:
            # Only sent when the caller supplies a non-empty id.
            payload["episode_id"] = episode_id
        return self._post("/reset", payload)

    def step(self, action: dict, episode_id: str | None = None) -> dict:
        """Submit one action via POST /step and return the decoded response."""
        payload: dict = {"action": action}
        if episode_id:
            payload["episode_id"] = episode_id
        return self._post("/step", payload)
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Local LLM Agent | |
| # ───────────────────────────────────────────────────────────────────────────── | |
class LocalLLMAgent:
    """
    Wraps a local HuggingFace model (loaded via transformers pipeline)
    and provides a decide() method compatible with the environment loop.

    Attributes set by ``__init__``:
        model_name: the model id or path passed in.
        tokenizer:  tokenizer loaded with ``AutoTokenizer.from_pretrained``.
        pipe:       the ``text-generation`` pipeline used by ``decide``.
    """

    def __init__(self, model_name: str):
        """Load tokenizer and pipeline for *model_name* and pin generation settings."""
        print(f"\n{'β'*60}")
        print(f" Loading model: {model_name}")
        print(f"{'β'*60}")

        # Heavy dependencies are imported here rather than at module import time.
        import torch
        from transformers import AutoTokenizer, GenerationConfig, pipeline

        self.model_name = model_name
        device = 0 if torch.cuda.is_available() else -1
        device_label = "GPU (CUDA)" if device == 0 else "CPU"
        print(f" Device: {device_label}")

        # Load tokenizer separately to apply chat template
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.pipe = pipeline(
            "text-generation",
            model=model_name,
            tokenizer=self.tokenizer,
            device=device,
            dtype=torch.bfloat16 if device == 0 else None,
            trust_remote_code=True,
        )

        # Override any saved generation_config that might conflict with our
        # max_new_tokens (e.g. a saved max_length=20 from GRPO training).
        self.pipe.model.generation_config = GenerationConfig(
            max_new_tokens=MAX_NEW_TOKENS,
            do_sample=True,
            temperature=TEMPERATURE,
            pad_token_id=self.tokenizer.eos_token_id,
        )
        print(f" β Model ready\n")

    def decide(self, obs_data: dict) -> int:
        """
        Format the observation as a prompt, call the model, parse the decision.

        If the pipeline call or the extraction of its output raises, the error
        is reported on stderr and the decision falls back to REJECT.

        Returns: 0 (APPROVE), 1 (CONDITIONAL), 2 (REJECT)
        """
        user_prompt = self._build_prompt(obs_data)
        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ]
        try:
            # All generation params are already set in generation_config;
            # passing them again here causes the deprecation warning.
            output = self.pipe(messages)
            # pipeline returns list of dicts; extract the generated assistant turn
            generated = output[0]["generated_text"]
            if isinstance(generated, list):
                # Chat format: list of role/content dicts
                response_text = generated[-1].get("content", "")
            else:
                response_text = str(generated)
        except Exception as exc:
            print(f" β LLM call failed: {exc} β defaulting to REJECT", file=sys.stderr)
            response_text = "REJECT"
        return self._parse(response_text)

    def _build_prompt(self, obs_data: dict) -> str:
        """Build user message from environment observation (same format as inference.py).

        Reads ``application_summary`` (its ``text_summary`` when it is a dict),
        ``portfolio_state`` and ``alert_state`` from *obs_data*; missing keys
        fall back to empty text / all-zero lists.
        """
        app_summary = obs_data.get("application_summary", {})
        if isinstance(app_summary, dict):
            summary = app_summary.get("text_summary", "")
        else:
            summary = str(app_summary)

        portfolio_state = obs_data.get("portfolio_state", [0] * 10)
        alert_state = obs_data.get("alert_state", [0] * 5)
        alert_labels = ["CC Spike", "Bounce Surge", "GST Filing Miss", "Adverse Media", "Credit Degradation"]
        # An alert counts as active when its numeric flag exceeds 0.5.
        active_alerts = [
            alert_labels[i]
            for i, v in enumerate(alert_state)
            if i < len(alert_labels) and isinstance(v, (int, float)) and v > 0.5
        ]
        alert_text = ", ".join(active_alerts) if active_alerts else "None"

        def safe_pct(lst, idx, default=0.0):
            """Return lst[idx] scaled to percent, or *default* if absent or non-numeric."""
            try:
                return float(lst[idx]) * 100
            except (IndexError, KeyError, TypeError, ValueError):
                return default

        portfolio_info = (
            f"\nββ PORTFOLIO STATUS ββ\n"
            f" Capital Remaining: {safe_pct(portfolio_state, 0):.0f}%\n"
            f" Capital Deployed: {safe_pct(portfolio_state, 1):.0f}%\n"
            f" NPA Rate: {safe_pct(portfolio_state, 2):.1f}%\n"
            f" CRAR: {safe_pct(portfolio_state, 9):.1f}%\n"
            f" Sector Conc.: {safe_pct(portfolio_state, 4):.0f}%\n"
            f" Portfolio Alerts: {alert_text}\n"
        )
        return f"{summary}\n{portfolio_info}"

    @staticmethod
    def _parse(text: str) -> int:
        """Parse APPROVE / CONDITIONAL / REJECT from model output.

        The decision phrase that appears earliest in the text wins; when two
        phrases start at the same position the longer one is preferred, so
        "conditional approve" is not mistaken for "approve". If no phrase is
        found, the first standalone digit 0/1/2 is used. Anything else
        (including empty input) yields 2 (REJECT), the safe default.
        """
        if not text:
            return 2
        lower = text.strip().lower()

        best = None  # (start position, -key length, action code)
        for key, val in ACTION_MAP.items():
            pos = lower.find(key)
            if pos == -1:
                continue
            candidate = (pos, -len(key), val)
            if best is None or candidate < best:
                best = candidate
        if best is not None:
            return best[2]

        digit = re.search(r"\b[012]\b", lower)
        if digit:
            return int(digit.group())
        return 2  # default safe: REJECT
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Episode Runner (mirrors inference.py logic but uses local model) | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def run_episode(env: EnvHTTPClient, agent: LocalLLMAgent,
                task_id: str, seed: int = 42) -> dict:
    """Play one episode of *task_id* and summarise it from the step rewards.

    The agent decides on each observation until the server reports ``done``
    or the task's step budget from TASK_CONFIGS is used up. All metrics are
    derived locally from the per-step rewards and decisions.

    Returns a dict with ``task_id``, ``score``, ``total_reward``,
    ``decisions`` (numeric action codes) and a ``breakdown`` of metrics.
    """
    task_cfg = TASK_CONFIGS[task_id]
    num_steps = task_cfg["num_steps"]
    episode_id = f"{task_id}-{uuid.uuid4().hex[:8]}"
    print(f"\n ββ {task_id}: {task_cfg['description']} (seed={seed}) ββ")

    first_resp = env.reset(task_id=task_id, seed=seed, episode_id=episode_id)
    # The observation may be nested under "observation" or be the response itself.
    obs_data = first_resp.get("observation", first_resp)
    done = bool(first_resp.get("done", False) or obs_data.get("done", False))

    rewards: list[float] = []
    decisions: list[int] = []  # store numeric for baseline compat
    step = 0
    while step < num_steps and not done:
        step += 1
        decision = agent.decide(obs_data)
        label = DECISION_LABELS[decision]
        reply = env.step(action={"decision": decision}, episode_id=episode_id)
        obs_data = reply.get("observation", reply)
        reward = float(reply.get("reward", 0.0) or 0.0)
        done = bool(reply.get("done", False) or obs_data.get("done", False))
        rewards.append(reward)
        decisions.append(decision)
        print(f" Step {step:2d}: {label:<12} | reward={reward:+.2f} | "
              f"cumulative={sum(rewards):+.2f}")

    # Compute local metrics from step rewards.
    # The HTTP API only returns {observation, reward, done} per step;
    # episode_score and breakdown are NOT in the response, so compute locally.
    n_steps = len(rewards)
    total_reward = round(sum(rewards), 4)

    # A step counts as correct when its reward is positive.
    correct = len([r for r in rewards if r > 0])
    accuracy = round(correct / n_steps, 4) if n_steps else 0.0

    # Approximate hard-rule compliance:
    # large negative rewards (< -5) typically indicate hard-rule violations.
    hr_violations = len([r for r in rewards if r < -5.0])
    if n_steps:
        hr_compliance = round(1.0 - (hr_violations / n_steps), 4)
    else:
        hr_compliance = 1.0

    # Survival: full marks unless the server ended the episode before the budget.
    ended_early = done and step < num_steps
    survival_rate = round(step / num_steps, 4) if ended_early else 1.0

    # NPA proxy: steps with moderately negative rewards (-5 <= r < 0).
    npa_signals = len([r for r in rewards if -5.0 <= r < 0])
    npa_rate = round(npa_signals / max(n_steps, 1), 4)

    # Capital utilization: approvals / total decisions.
    cap_util = round(decisions.count(0) / max(n_steps, 1), 4)

    # Weighted score (matches baseline_results.json format)
    score = round(accuracy * 0.5 + hr_compliance * 0.3 + survival_rate * 0.2, 4)

    breakdown = {
        "accuracy": accuracy,
        "correct_decisions": correct,
        "total_decisions": n_steps,
        "hard_rule_compliance": hr_compliance,
        "forensic_handling": hr_compliance,  # proxy
        "npa_count": npa_signals,
        "npa_rate": npa_rate,
        "crar": 1.0,  # not available via HTTP
        "capital_utilization": cap_util,
        "survival_rate": survival_rate,
        "episode_terminated": ended_early,
        "termination_reason": "",
    }
    print(f" Score: {score:.4f} | Accuracy: {accuracy:.1%} | "
          f"HR comply: {hr_compliance:.1%} | Reward: {total_reward:+.2f}")

    return {
        "task_id": task_id,
        "score": score,
        "total_reward": total_reward,
        "decisions": decisions,
        "breakdown": breakdown,
    }
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Main | |
| # ───────────────────────────────────────────────────────────────────────────── | |
def main() -> None:
    """CLI entry point: evaluate a model on the selected tasks and save JSON results.

    Waits for the environment server's health endpoint (exits with status 1
    if it never answers), loads the model, runs one episode per task, prints
    a summary table and writes the per-task results to the output file.
    """
    cli = argparse.ArgumentParser(description="IntelliCredit LLM Evaluation")
    cli.add_argument("--model", default=DEFAULT_MODEL,
                     help=f"HF model name or local path (default: {DEFAULT_MODEL})")
    cli.add_argument("--seed", type=int, default=42, help="Environment seed")
    cli.add_argument("--tasks", nargs="+", default=["task1", "task2", "task3"],
                     choices=list(TASK_CONFIGS.keys()),
                     help="Tasks to evaluate (default: task1 task2 task3)")
    cli.add_argument("--env-url", default=ENV_URL,
                     help=f"Environment server URL (default: {ENV_URL})")
    cli.add_argument("--out", default=None,
                     help="Output JSON path (default: auto-named from model)")
    args = cli.parse_args()

    # Auto-name output file
    if args.out is None:
        tag = "base"
        if "grpo" in args.model.lower():
            tag = "grpo"
        args.out = f"{tag}_results.json"

    print("ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ")
    print("β IntelliCredit β Local LLM Evaluation β")
    print("ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ")
    for label, value in (
        ("Model", args.model),
        ("Tasks", ", ".join(args.tasks)),
        ("Seed", args.seed),
        ("Env URL", args.env_url),
        ("Output", args.out),
    ):
        print(f" {label} : {value}")

    # Check environment server: up to 20 probes, 3 seconds apart.
    env = EnvHTTPClient(args.env_url)
    print("\n Waiting for environment server...", end="", flush=True)
    for _ in range(20):
        if env.health():
            print(" β Ready")
            break
        time.sleep(3)
        print(".", end="", flush=True)
    else:
        print(f"\n β Environment server not reachable at {args.env_url}/health")
        print(" Start it first: python -m server.app (or check ENV_URL in .env)")
        sys.exit(1)

    # Load model
    agent = LocalLLMAgent(args.model)

    # Run episodes; a failing task is recorded with zeroed results instead of aborting.
    results: dict = {}
    started = time.time()
    for task_id in args.tasks:
        try:
            results[task_id] = run_episode(env, agent, task_id=task_id, seed=args.seed)
        except Exception as exc:
            print(f"\n β Task {task_id} failed: {exc}", file=sys.stderr)
            results[task_id] = {
                "task_id": task_id,
                "score": 0.0,
                "total_reward": 0.0,
                "decisions": [],
                "breakdown": {},
            }
    elapsed = time.time() - started

    # Summary table
    wide_rule = "β" * 60
    narrow_rule = "β" * 52
    short_name = args.model.rsplit("/", 1)[-1]
    print(f"\n{wide_rule}")
    print(f" RESULTS SUMMARY β {short_name}")
    print(f"{wide_rule}")
    print(f" {'Task':<8} {'Score':>7} {'Reward':>8} {'Steps':>6} {'Decision Counts'}")
    print(f" {narrow_rule}")
    for task_id, res in results.items():
        decs = res["decisions"]
        # decisions are stored as ints: 0=APPROVE, 1=CONDITIONAL, 2=REJECT
        n_app, n_con, n_rej = (decs.count(code) for code in (0, 1, 2))
        print(f" {task_id:<8} {res['score']:>7.4f} {res['total_reward']:>8.2f} "
              f"{len(decs):>6} A:{n_app} C:{n_con} R:{n_rej}")

    n_tasks = max(len(results), 1)
    avg_score = sum(res["score"] for res in results.values()) / n_tasks
    avg_reward = sum(res["total_reward"] for res in results.values()) / n_tasks
    print(f" {narrow_rule}")
    print(f" {'AVG':<8} {avg_score:>7.4f} {avg_reward:>8.2f}")
    print(f" Elapsed: {elapsed:.1f}s")
    print(f"{wide_rule}")

    # Save JSON (matches baseline_results.json format exactly)
    with open(args.out, "w") as f:
        json.dump(results, f, indent=2)
    print(f"\n β Saved β {args.out}")
    print(f" Run comparison with:")
    print(f" python compare_results.py --baseline baseline_results.json --after {args.out}")


if __name__ == "__main__":
    main()