""" MedVidBench Leaderboard - Interactive leaderboard for evaluating Video-Language Models on the MedVidBench benchmark across 8 medical video understanding tasks. """ import gradio as gr import pandas as pd import json import os import shutil import subprocess import sys from datetime import datetime from pathlib import Path from typing import Dict, List, Tuple, Optional from collections import defaultdict from huggingface_hub import hf_hub_download def load_ground_truth(): """ Load ground truth from private HuggingFace dataset repository. Falls back to local file for development. """ try: # Get token from environment (HF Space secret) token = os.environ.get('HF_TOKEN') if not token: print("⚠️ HF_TOKEN not found in environment, trying local file...") raise ValueError("HF_TOKEN not found") # Download from private repository print("⏳ Downloading ground truth from private repository...") gt_file = hf_hub_download( repo_id="UIIAmerica/MedVidBench-GroundTruth", filename="ground_truth.json", repo_type="dataset", token=token, cache_dir="./cache" # Cache locally to avoid re-downloading ) # Load the data with open(gt_file) as f: data = json.load(f) print(f"✓ Loaded ground truth from private repo: {len(data)} samples") return data except Exception as e: print(f"⚠️ Could not load from private repo: {e}") # Fallback to local file for development local_file = Path("data/ground_truth.json") if local_file.exists(): with open(local_file) as f: data = json.load(f) print(f"✓ Loaded ground truth from local file: {len(data)} samples") return data else: raise FileNotFoundError( "Ground truth not found. Please set HF_TOKEN secret or provide local file." ) # Configuration SUBMISSIONS_DIR = Path("submissions") RESULTS_DIR = Path("results") LEADERBOARD_FILE = Path("leaderboard.json") EVAL_SCRIPT = Path("evaluation/evaluate_all_pai.py") # Local copy in repo # Ensure directories exist SUBMISSIONS_DIR.mkdir(exist_ok=True) RESULTS_DIR.mkdir(exist_ok=True) # Load ground truth at startup print("=" * 60) print("LOADING GROUND TRUTH DATA") print("=" * 60) GROUND_TRUTH = load_ground_truth() print("=" * 60) # MedVidBench Metrics Definitions (10 metrics from 8 tasks) # Note: TAL has 2 metrics, DVC has 2 metrics, others have 1 metric each METRICS = { "cvs_acc": { "name": "CVS_acc", "full_name": "CVS Assessment Accuracy", "higher_better": True, "description": "Clinical variable scoring accuracy" }, "nap_acc": { "name": "NAP_acc", "full_name": "Next Action Prediction Accuracy", "higher_better": True, "description": "Accuracy in predicting next surgical step" }, "sa_acc": { "name": "SA_acc", "full_name": "Skill Assessment Accuracy", "higher_better": True, "description": "Surgical skill level evaluation accuracy" }, "stg_miou": { "name": "STG_mIoU", "full_name": "Spatiotemporal Grounding mIoU", "higher_better": True, "description": "Mean IoU for spatial+temporal localization" }, "tag_miou_03": { "name": "TAG_mIoU@0.3", "full_name": "Temporal Action Grounding mIoU@0.3", "higher_better": True, "description": "Mean IoU at threshold 0.3 for temporal localization" }, "tag_miou_05": { "name": "TAG_mIoU@0.5", "full_name": "Temporal Action Grounding mIoU@0.5", "higher_better": True, "description": "Mean IoU at threshold 0.5 for temporal localization" }, "dvc_llm": { "name": "DVC_llm", "full_name": "Dense Video Captioning LLM Score", "higher_better": True, "description": "Caption quality score (LLM judge or semantic similarity)" }, "dvc_f1": { "name": "DVC_F1", "full_name": "Dense Video Captioning F1", "higher_better": True, "description": "F1 score for temporal segment localization" }, "vs_llm": { "name": "VS_llm", "full_name": "Video Summary LLM Score", "higher_better": True, "description": "Video summary quality score" }, "rc_llm": { "name": "RC_llm", "full_name": "Region Caption LLM Score", "higher_better": True, "description": "Region caption quality score" }, } # Task descriptions for Tasks & Metrics tab TASKS = { "tal": { "name": "Temporal Action Localization (TAL)", "key": "tal", "metrics": "TAG_mIoU@0.3, TAG_mIoU@0.5", "description": "Identify and temporally localize surgical actions in video" }, "stg": { "name": "Spatiotemporal Grounding (STG)", "key": "stg", "metrics": "STG_mIoU", "description": "Localize objects in both space (bbox) and time (temporal span)" }, "next_action": { "name": "Next Action Prediction (NAP)", "key": "next_action", "metrics": "NAP_acc", "description": "Predict the next surgical step given current video context" }, "dvc": { "name": "Dense Video Captioning (DVC)", "key": "dvc", "metrics": "DVC_llm, DVC_F1", "description": "Generate captions for multiple events with temporal localization" }, "vs": { "name": "Video Summary (VS)", "key": "vs", "metrics": "VS_llm", "description": "Generate comprehensive summary of surgical procedure" }, "rc": { "name": "Region Caption (RC)", "key": "rc", "metrics": "RC_llm", "description": "Describe specific spatial regions in surgical frames" }, "skill_assessment": { "name": "Skill Assessment (SA)", "key": "skill_assessment", "metrics": "SA_acc", "description": "Evaluate surgeon skill level (novice/intermediate/expert)" }, "cvs_assessment": { "name": "CVS Assessment", "key": "cvs_assessment", "metrics": "CVS_acc", "description": "Score clinical variables in surgical performance" }, } # Test set statistics TEST_SET_STATS = { "total_samples": 6245, "datasets": ["AVOS", "CholecT50", "CholecTrack20", "Cholec80_CVS", "CoPESD", "EgoSurgery", "NurViD", "JIGSAWS"], "video_frames": 103742, } def load_leaderboard() -> pd.DataFrame: """Load existing leaderboard from JSON file.""" if LEADERBOARD_FILE.exists(): with open(LEADERBOARD_FILE, 'r') as f: data = json.load(f) if data: df = pd.DataFrame(data) # Remove any 'average' column from old leaderboard format if 'average' in df.columns: df = df.drop('average', axis=1) # Sort by first metric (CVS_acc) descending - no overall average if 'cvs_acc' in df.columns: df = df.sort_values('cvs_acc', ascending=False).reset_index(drop=True) return df # Return empty dataframe with correct structure (no average column) columns = ["rank", "model_name", "organization"] + list(METRICS.keys()) + ["date", "contact"] return pd.DataFrame(columns=columns) def save_leaderboard(df: pd.DataFrame): """Save leaderboard to JSON file.""" # Add rank column df['rank'] = range(1, len(df) + 1) # Save to JSON with open(LEADERBOARD_FILE, 'w') as f: json.dump(df.to_dict('records'), f, indent=2) def detect_evaluation_output_format(file_path: str) -> Tuple[bool, str]: """ Detect if uploaded file is pre-processed evaluation output with LLM judge scores. Expected evaluation output format: { "model_name": "...", "evaluated_samples": [ { "sample_id": "...", "dataset": "...", "evaluations": { "0.3": [{"prediction": "...", "ground_truth": "...", "llm_judge": {"R2": 4, "R3": 2, ...}}], "0.5": [...], "0.7": [...] } } ], "aggregated_results": {...} } Returns: (is_evaluation_output, message) Note: Pre-processed files typically only have LLM judge scores for captioning tasks (DVC, VS, RC). Other metrics (TAL, STG, NAP, SA, CVS) still need to be calculated from raw inference results. """ try: with open(file_path, 'r') as f: data = json.load(f) # Check for evaluation output structure if isinstance(data, dict): has_model_name = "model_name" in data has_evaluated_samples = "evaluated_samples" in data has_aggregated_results = "aggregated_results" in data if has_model_name and has_evaluated_samples and has_aggregated_results: # Verify structure by checking first evaluated sample if len(data["evaluated_samples"]) > 0: sample = data["evaluated_samples"][0] if "evaluations" in sample and isinstance(sample["evaluations"], dict): return True, "✓ Detected pre-processed evaluation output with LLM judge scores (captioning tasks only)" return False, "Not an evaluation output file" except Exception as e: return False, f"Error detecting format: {str(e)}" def check_for_precomputed_llm_scores(file_path: str) -> Tuple[bool, Optional[Dict]]: """ Check if the results file has pre-computed LLM judge scores in struc_info. Returns: (has_precomputed_scores, llm_score_dict or None) llm_score_dict format: {'dvc': score, 'vs': score, 'rc': score} """ try: with open(file_path, 'r') as f: data = json.load(f) # Handle both list and dict formats if isinstance(data, dict): records = list(data.values()) elif isinstance(data, list): records = data else: return False, None if len(records) == 0: return False, None # Check if records have struc_info with llm_judge scores has_llm_scores = False for record in records[:10]: # Check first 10 samples if "struc_info" in record: struc_info = record.get("struc_info", []) if isinstance(struc_info, list) and len(struc_info) > 0: for item in struc_info: if isinstance(item, dict) and "llm_judge" in item: has_llm_scores = True break if has_llm_scores: break return has_llm_scores, None # Return None for score dict as we'll get it from evaluation except Exception as e: return False, None def validate_results_file(file_path: str) -> Tuple[bool, str, bool]: """ Validate uploaded file - accepts both prediction-only and merged formats. Expected format for predictions (preferred): [ { "id": "video_id&&start&&end&&fps", "qa_type": "tal/stg/next_action/dvc/vs/rc/skill_assessment/cvs_assessment", "prediction": "Model's answer..." }, ... ] Also accepts merged format (for testing): { "0": { "metadata": {...}, "qa_type": "tal", "question": "...", "answer": "...", "gnd": "...", "struc_info": [...] }, ... } Returns: (valid, message, has_precomputed_llm_scores) """ try: with open(file_path, 'r') as f: data = json.load(f) # Handle both list and dict formats if isinstance(data, dict): records = list(data.values()) elif isinstance(data, list): records = data else: return False, f"Invalid format: expected list or dict, got {type(data)}", False if len(records) == 0: return False, "Empty predictions file", False # Check first record to detect format sample = records[0] # Format 1: Prediction-only (id, qa_type, prediction) is_prediction_only = "id" in sample and "prediction" in sample # Format 2: Merged (metadata, question, answer, gnd, struc_info) is_merged = "metadata" in sample and "question" in sample and "answer" in sample if is_prediction_only: # Validate prediction-only format if "qa_type" not in sample: return False, "Missing required field: 'qa_type'", False # Check qa_type is valid valid_qa_types = ["tal", "stg", "next_action", "dense_captioning", "video_summary", "region_caption", "skill_assessment", "cvs_assessment"] qa_type = sample.get("qa_type", "") if not any(valid in qa_type for valid in valid_qa_types): return False, f"Invalid qa_type: {qa_type}", False # Check if file has reasonable number of samples if len(records) < 5000: return False, f"Too few samples ({len(records)}). Expected ~6245 samples for full test set.", False return True, f"✓ Valid predictions file (prediction-only format) with {len(records)} samples", False elif is_merged: # Validate merged format if "qa_type" not in sample: return False, "Missing required field: 'qa_type'", False # Check if file has reasonable number of samples if len(records) < 5000: return False, f"Too few samples ({len(records)}). Expected ~6245 samples for full test set.", False return True, f"✓ Valid predictions file (merged format) with {len(records)} samples", False else: return False, "Invalid format: Must be either prediction-only (id, qa_type, prediction) or merged format (metadata, question, answer)", False except json.JSONDecodeError as e: return False, f"Invalid JSON: {str(e)}", False except Exception as e: return False, f"Error validating file: {str(e)}", False def extract_metrics_from_evaluation_output(file_path: str) -> Tuple[bool, Dict, str]: """ Extract metrics directly from pre-processed evaluation output. Returns: (success, metrics_dict, message) """ try: with open(file_path, 'r') as f: data = json.load(f) if not isinstance(data, dict) or "aggregated_results" not in data: return False, {}, "Invalid evaluation output structure" aggregated = data["aggregated_results"] metrics = {} # Extract metrics from aggregated results # The structure varies by task, so we need to handle each carefully # TAL: Extract mIoU@0.3 and mIoU@0.5 if "tal" in aggregated or "TAL" in aggregated: tal_results = aggregated.get("tal", aggregated.get("TAL", {})) if "overall" in tal_results: overall = tal_results["overall"] metrics["tag_miou_03"] = overall.get("meanIoU@0.3", 0.0) metrics["tag_miou_05"] = overall.get("meanIoU@0.5", 0.0) # STG: Extract mIoU if "stg" in aggregated or "STG" in aggregated: stg_results = aggregated.get("stg", aggregated.get("STG", {})) if "overall" in stg_results: overall = stg_results["overall"] metrics["stg_miou"] = overall.get("mean_iou", 0.0) # Next Action: Extract accuracy if "next_action" in aggregated or "NEXT_ACTION" in aggregated: nap_results = aggregated.get("next_action", aggregated.get("NEXT_ACTION", {})) if "overall" in nap_results: overall = nap_results["overall"] metrics["nap_acc"] = overall.get("accuracy", 0.0) # DVC: Extract caption_score and temporal_f1 if "dvc" in aggregated or "DVC" in aggregated or "dense_captioning" in aggregated: dvc_results = aggregated.get("dvc", aggregated.get("DVC", aggregated.get("dense_captioning", {}))) if "overall" in dvc_results: overall = dvc_results["overall"] metrics["dvc_llm"] = overall.get("caption_score", 0.0) metrics["dvc_f1"] = overall.get("temporal_f1", 0.0) # VS: Extract LLM score if "vs" in aggregated or "VS" in aggregated or "video_summary" in aggregated: vs_results = aggregated.get("vs", aggregated.get("VS", aggregated.get("video_summary", {}))) if "overall" in vs_results: overall = vs_results["overall"] # Try multiple possible field names metrics["vs_llm"] = overall.get("score", overall.get("average_score", overall.get("caption_score", 0.0))) # RC: Extract LLM score if "rc" in aggregated or "RC" in aggregated or "region_caption" in aggregated: rc_results = aggregated.get("rc", aggregated.get("RC", aggregated.get("region_caption", {}))) if "overall" in rc_results: overall = rc_results["overall"] # Try multiple possible field names metrics["rc_llm"] = overall.get("score", overall.get("average_score", overall.get("caption_score", 0.0))) # Skill Assessment: Extract accuracy if "skill_assessment" in aggregated or "SKILL" in aggregated: sa_results = aggregated.get("skill_assessment", aggregated.get("SKILL", {})) if "overall" in sa_results: overall = sa_results["overall"] metrics["sa_acc"] = overall.get("accuracy", 0.0) # CVS Assessment: Extract accuracy if "cvs_assessment" in aggregated or "CVS" in aggregated: cvs_results = aggregated.get("cvs_assessment", aggregated.get("CVS", {})) if "overall" in cvs_results: overall = cvs_results["overall"] metrics["cvs_acc"] = overall.get("accuracy", 0.0) # Check if we got all 10 metrics if len(metrics) < 10: missing = [m for m in METRICS.keys() if m not in metrics] return False, metrics, f"Incomplete metrics extracted. Missing: {missing}" return True, metrics, "✓ Metrics extracted from pre-processed evaluation output" except Exception as e: return False, {}, f"Error extracting metrics: {str(e)}" def run_evaluation(results_file: str, model_name: str, has_precomputed_llm: bool = False, log_callback=None) -> Tuple[bool, Dict, str]: """ Run evaluation using evaluate_predictions.py wrapper. Handles both prediction-only and merged formats automatically. Args: results_file: Path to predictions JSON (either format) model_name: Name of the model has_precomputed_llm: Not used (kept for compatibility) log_callback: Optional callback function(line) to stream logs Returns: (success, metrics_dict, message) """ try: # Create output directory for this submission output_dir = RESULTS_DIR / model_name.replace(" ", "_") output_dir.mkdir(exist_ok=True) # Save user file input_file = output_dir / "input.json" shutil.copy(results_file, input_file) # Use evaluate_predictions.py wrapper which handles both formats eval_wrapper = Path("evaluation/evaluate_predictions.py") cmd = [ sys.executable, str(eval_wrapper), str(input_file), "--grouping", "overall", "--ground-truth", "data/ground_truth.json" ] print("=" * 60) print("RUNNING EVALUATION") print("=" * 60) print(f"Command: {' '.join(cmd)}") # Run with real-time output streaming process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1 # Line buffered ) # Capture output while streaming to callback output_lines = [] for line in process.stdout: line = line.rstrip() output_lines.append(line) print(line) # Print to server logs # Stream to callback if provided if log_callback and line.strip(): log_callback(line) # Wait for completion process.wait(timeout=600) if process.returncode != 0: full_output = '\n'.join(output_lines) return False, {}, f"Evaluation failed (exit code {process.returncode})" # Parse output to extract metrics full_output = '\n'.join(output_lines) metrics = parse_evaluation_output(full_output) # Save evaluation output with open(output_dir / "eval_output.txt", 'w') as f: f.write(full_output) print(f"✓ Evaluation completed") print("=" * 60) return True, metrics, "✓ Evaluation completed successfully" except subprocess.TimeoutExpired: if process: process.kill() return False, {}, "Evaluation timed out (>10 minutes)" except Exception as e: return False, {}, f"Error running evaluation: {str(e)}" def parse_evaluation_output(output: str) -> Dict[str, float]: """ Parse evaluation output to extract 10 metrics. Returns dict with keys: cvs_acc, nap_acc, sa_acc, stg_miou, tag_miou_03, tag_miou_05, dvc_llm, dvc_f1, vs_llm, rc_llm """ metrics = {} lines = output.split('\n') current_task = None current_iou_section = None # Track IoU_0.3 or IoU_0.5 sections for TAL for i, line in enumerate(lines): line = line.strip() # Detect task headers if "TAL" in line and "Overall" in line: current_task = "tal" elif "STG" in line and "Overall" in line: current_task = "stg" elif "NEXT_ACTION" in line and "Overall" in line or "Next Action" in line: current_task = "next_action" elif "DVC" in line and "Overall" in line or "Dense Video Captioning" in line: current_task = "dvc" elif "RC" in line and "Overall" in line or "Region Caption" in line: current_task = "rc" elif "VS" in line and "Overall" in line or "Video Summary" in line: current_task = "vs" elif "SKILL" in line and "Overall" in line or "Skill Assessment" in line: current_task = "skill_assessment" elif "CVS" in line and "Overall" in line or "CVS Assessment" in line: current_task = "cvs_assessment" # Detect IoU sections for TAL (new format) if current_task == "tal": if "IoU_0.3:" in line: current_iou_section = "0.3" elif "IoU_0.5:" in line: current_iou_section = "0.5" # Extract metrics based on task if current_task: # TAL: Extract both mIoU@0.3 and mIoU@0.5 if current_task == "tal": # Old format: meanIoU@0.3 or mIoU@0.3 if "meanIoU@0.3" in line or "mIoU@0.3" in line: try: value = float(line.split(":")[-1].strip()) metrics["tag_miou_03"] = value except: pass if "meanIoU@0.5" in line or "mIoU@0.5" in line: try: value = float(line.split(":")[-1].strip()) metrics["tag_miou_05"] = value except: pass # New format: IoU_0.3: section with meanIoU: if current_iou_section and "meanIoU:" in line and "meanIoU@" not in line: try: value = float(line.split(":")[-1].strip()) if current_iou_section == "0.3": metrics["tag_miou_03"] = value elif current_iou_section == "0.5": metrics["tag_miou_05"] = value except: pass # STG: Extract mIoU elif current_task == "stg" and ("mean_iou" in line.lower() or "miou" in line.lower()): try: value = float(line.split(":")[-1].strip()) metrics["stg_miou"] = value except: pass # Next Action: Extract accuracy elif current_task == "next_action" and "accuracy" in line.lower(): try: value = float(line.split(":")[-1].strip()) metrics["nap_acc"] = value except: pass # DVC: Extract both caption_score and temporal_f1 elif current_task == "dvc": if "caption_score" in line.lower() or "caption score" in line.lower(): try: value = float(line.split(":")[-1].strip()) metrics["dvc_llm"] = value except: pass if "temporal_f1" in line.lower() or "temporal f1" in line.lower(): try: value = float(line.split(":")[-1].strip()) metrics["dvc_f1"] = value except: pass # VS: Extract LLM score elif current_task == "vs" and ("score" in line.lower() or "average" in line.lower()): try: value = float(line.split(":")[-1].strip()) metrics["vs_llm"] = value except: pass # RC: Extract LLM score elif current_task == "rc" and ("score" in line.lower() or "average" in line.lower()): try: value = float(line.split(":")[-1].strip()) metrics["rc_llm"] = value except: pass # Skill Assessment: Extract Overall Accuracy (not Aspect Balanced Accuracy) elif current_task == "skill_assessment" and "overall accuracy:" in line.lower() and "aspect" not in line.lower(): try: # Extract from "Overall Accuracy: 0.2437 (39/160)" value = float(line.split(":")[1].split("(")[0].strip()) metrics["sa_acc"] = value except: pass # CVS Assessment: Extract accuracy (not component_balanced_accuracy) elif current_task == "cvs_assessment" and "accuracy:" in line and "component_balanced" not in line: try: value = float(line.split(":")[-1].strip()) metrics["cvs_acc"] = value except: pass return metrics def submit_model(file, model_name: str, organization: str, contact: str = "", progress=gr.Progress()): """ Process model submission: validate, evaluate, and add to leaderboard. Yields progress updates during evaluation. Returns: (success, message) """ # Validation if not file: yield "❌ Please upload a results file" return if not model_name or not organization: yield "❌ Please provide both model name and organization" return # Step 1: Check if model exists progress(0.05, desc="Checking model name...") yield "🔍 **Step 1/6**: Checking if model name is available..." df = load_leaderboard() if model_name in df['model_name'].values: yield f"❌ Model '{model_name}' already exists in leaderboard. Please use a different name." return # Step 2: Validate file format progress(0.15, desc="Validating file format...") yield "📋 **Step 2/6**: Validating predictions file format..." valid, msg, has_precomputed_llm = validate_results_file(file.name) if not valid: yield f"❌ Invalid results file: {msg}" return yield f"✓ {msg}" # Step 3: Run evaluation with real-time log streaming progress(0.25, desc="Running evaluation...") import time # Start evaluation eval_wrapper = Path("evaluation/evaluate_predictions.py") output_dir = RESULTS_DIR / model_name.replace(" ", "_") output_dir.mkdir(exist_ok=True) input_file = output_dir / "input.json" shutil.copy(file.name, input_file) cmd = [ sys.executable, "-u", # Unbuffered output str(eval_wrapper), str(input_file), "--grouping", "overall", "--ground-truth", "data/ground_truth.json", "--skip-llm-judge" # Skip DVC/VS/RC for faster testing ] process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, env={**os.environ, "PYTHONUNBUFFERED": "1"} # Force unbuffered ) yield "⚙️ **Step 3/6**: Running evaluation (streaming logs)...\n\n```\nStarting evaluation subprocess...\n```" log_buffer = [] last_update = time.time() last_heartbeat = time.time() line_count = 0 start_time = time.time() # Non-blocking read with select (for timeout handling) import select while True: # Check if process has finished if process.poll() is not None: # Process finished, read any remaining output remaining = process.stdout.read() if remaining: for line in remaining.split('\n'): line = line.rstrip() if line.strip() and 'WARNING: All log messages' not in line: log_buffer.append(line) break # Check for available output (with timeout) ready, _, _ = select.select([process.stdout], [], [], 0.5) if ready: # Read available line line = process.stdout.readline() if not line: break line = line.rstrip() if not line.strip(): continue # Filter noise if 'WARNING: All log messages' in line: continue log_buffer.append(line) line_count += 1 last_heartbeat = time.time() # Update UI every 0.5 seconds if time.time() - last_update > 0.5: # Show heartbeat if no logs yet if not log_buffer: elapsed = int(time.time() - start_time) log_text = f"⚙️ **Step 3/6**: Running evaluation...\n\n```\nWaiting for evaluation output... ({elapsed}s elapsed)\n```" yield log_text else: # Show last 25 lines recent = log_buffer[-25:] log_text = "⚙️ **Step 3/6**: Running evaluation...\n\n```\n" log_text += '\n'.join(recent) log_text += "\n```" yield log_text last_update = time.time() # Increment progress gradually from 25% to 75% # Assume ~500 lines of output, increment by 0.1% per line progress_increment = min(0.75, 0.25 + (line_count / 500) * 0.50) progress(progress_increment, desc="Running evaluation...") # Wait for process to fully complete process.wait() # Final log display if log_buffer: final_logs = log_buffer[-30:] log_text = "⚙️ **Step 3/6**: Evaluation completed\n\n```\n" log_text += '\n'.join(final_logs) log_text += "\n```" yield log_text # Save output with open(output_dir / "eval_output.txt", 'w') as f: f.write('\n'.join(log_buffer)) # Check if evaluation succeeded if process.returncode != 0: yield f"\n❌ Evaluation failed (exit code {process.returncode})" return # Parse metrics from output full_output = '\n'.join(log_buffer) metrics = parse_evaluation_output(full_output) if not metrics: yield f"\n❌ Failed to parse evaluation metrics" return # Step 4: Check metrics progress(0.80, desc="Validating metrics...") yield "✓ Evaluation completed!" yield "🔍 **Step 4/6**: Validating extracted metrics..." # Fill missing caption metrics with 0 if skip-llm-judge was used caption_metrics = ['dvc_llm', 'dvc_f1', 'vs_llm', 'rc_llm'] missing_metrics = [m for m in METRICS.keys() if m not in metrics] # If only caption metrics are missing, fill them with 0 missing_caption = [m for m in missing_metrics if m in caption_metrics] missing_other = [m for m in missing_metrics if m not in caption_metrics] if missing_caption: for metric in missing_caption: metrics[metric] = 0.0 yield f"⚠️ Skipped caption tasks, setting to 0: {missing_caption}" if missing_other: yield f"❌ Evaluation incomplete. Missing required metrics: {missing_other}" return yield f"✓ All 10 metrics successfully computed" # Step 5: Add to leaderboard progress(0.90, desc="Adding to leaderboard...") yield "📊 **Step 5/6**: Adding model to leaderboard..." new_entry = { "model_name": model_name, "organization": organization, **{metric: round(metrics.get(metric, 0.0), 4) for metric in METRICS.keys()}, "date": datetime.now().strftime("%Y-%m-%d"), "contact": contact } df = pd.concat([df, pd.DataFrame([new_entry])], ignore_index=True) # Sort by first metric (CVS_acc) df = df.sort_values('cvs_acc', ascending=False).reset_index(drop=True) save_leaderboard(df) yield "✓ Leaderboard updated!" # Step 6: Build success message progress(1.0, desc="Complete!") yield "✅ **Step 6/6**: Submission complete!" # Build final success message success_msg = f""" --- ## ✅ Submission Successful! **Model**: {model_name} **Organization**: {organization} """ # Add note if LLM judge was skipped if has_precomputed_llm: success_msg += "\n📊 **Note**: Used pre-computed LLM judge scores from struc_info (skipped re-evaluation of DVC/VS/RC)\n" else: success_msg += "\n⚙️ **Note**: Full evaluation completed (including LLM judge for DVC/VS/RC)\n" success_msg += "\n### 📈 Metric Scores\n" for metric_key, metric_info in METRICS.items(): score = metrics.get(metric_key, 0.0) success_msg += f"- **{metric_info['name']}**: {score:.4f}\n" rank = df[df['model_name'] == model_name].index[0] + 1 success_msg += f"\n### 🏆 Ranking\n**Rank**: #{rank} out of {len(df)} models\n" success_msg += "\nRefresh the Leaderboard tab to see your model's position!" yield success_msg def format_leaderboard_display(df: pd.DataFrame) -> pd.DataFrame: """Format leaderboard dataframe for display with 10 metrics (no average).""" if df.empty: return df # Remove 'average' column if it exists (from old format) if 'average' in df.columns: df = df.drop('average', axis=1) # Create display dataframe with selected columns (no average) display_cols = ["rank", "model_name", "organization"] # Add metric columns in order for metric_key in METRICS.keys(): if metric_key in df.columns: display_cols.append(metric_key) # Add date and contact display_cols.extend(["date", "contact"]) # Filter to only existing columns display_cols = [col for col in display_cols if col in df.columns] # Rename columns for display display_df = df[display_cols].copy() # Build column names column_names = [] for col in display_cols: if col == "rank": column_names.append("Rank") elif col == "model_name": column_names.append("Model") elif col == "organization": column_names.append("Organization") elif col == "date": column_names.append("Date") elif col == "contact": column_names.append("Contact") elif col in METRICS: column_names.append(METRICS[col]["name"]) else: column_names.append(col) display_df.columns = column_names return display_df # Create Gradio interface with gr.Blocks(title="MedVidBench Leaderboard", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🏥 MedVidBench Leaderboard Interactive leaderboard for evaluating **Video-Language Models** on the **MedVidBench benchmark** - 8 medical video understanding tasks across 8 surgical datasets. 📄 **Paper**: [MedGRPO: Multi-Task Reinforcement Learning for Heterogeneous Medical Video Understanding](https://arxiv.org/abs/2512.06581) 🌐 **Project**: [yuhaosu.github.io/MedGRPO](https://yuhaosu.github.io/MedGRPO/) 💾 **Dataset**: [huggingface.co/datasets/UIIAmerica/MedVidBench](https://huggingface.co/datasets/UIIAmerica/MedVidBench) 💻 **GitHub**: [github.com/YuhaoSu/MedGRPO](https://github.com/YuhaoSu/MedGRPO) """) with gr.Tabs(): # Tab 1: Leaderboard with gr.Tab("🏆 Leaderboard"): gr.Markdown("### Current Rankings") leaderboard_table = gr.Dataframe( value=format_leaderboard_display(load_leaderboard()), interactive=False ) refresh_btn = gr.Button("🔄 Refresh Leaderboard", size="sm") refresh_btn.click( fn=lambda: format_leaderboard_display(load_leaderboard()), outputs=leaderboard_table ) # Tab 2: Submit with gr.Tab("📤 Submit Results"): gr.Markdown(""" ### Submit Your Model Results Upload your model's **predictions only** on the **MedVidBench test set (6,245 samples)** to be added to the leaderboard. #### 📋 Requirements 1. **Run inference** on the full test set (download from [HuggingFace](https://huggingface.co/datasets/UIIAmerica/MedVidBench)) 2. **Upload predictions JSON** in the format below (NO ground truth needed) 3. **Provide model info** (name, organization) #### 📄 Expected File Format Your predictions JSON should contain **6,245 samples** with this structure: ```json [ { "id": "video_id&&start&&end&&fps", "qa_type": "tal", "prediction": "Your model's answer here" }, { "id": "another_video&&0&&10&&1.0", "qa_type": "video_summary", "prediction": "The surgeon performs..." } ] ``` **Required fields**: - `id`: Sample identifier (matches test data from HuggingFace dataset) - `qa_type`: Task type (tal/stg/next_action/dense_captioning/video_summary/region_caption/skill_assessment/cvs_assessment) - `prediction`: Your model's answer (text output) **Important**: - ✅ Submit **predictions only** (no ground truth needed) - ✅ Must include all 6,245 test samples - ✅ Format can be list or dict (dict values will be extracted) - ❌ Do NOT include ground truth fields (server handles this securely) #### ⚙️ Evaluation Process After upload, the system will: 1. **Validate** your predictions file format 2. **Merge** your predictions with server-side ground truth (private) 3. **Run evaluation** for all 8 tasks across 10 metrics 4. **Add to leaderboard** if successful **Evaluation takes**: ~5-10 minutes (includes LLM judge for caption quality assessment) **Security**: Ground truth data is stored privately and never exposed to users. """) with gr.Row(): with gr.Column(): model_name_input = gr.Textbox( label="Model Name", placeholder="e.g., Qwen2.5-VL-7B-MedGRPO", info="Unique identifier for your model" ) org_input = gr.Textbox( label="Organization / Author", placeholder="e.g., University Name or Your Name", info="Who developed this model?" ) contact_input = gr.Textbox( label="Contact (Optional)", placeholder="email@example.com or github.com/username", info="For follow-up questions" ) with gr.Column(): results_file_input = gr.File( label="Upload Results JSON", file_types=[".json"], file_count="single" ) submit_btn = gr.Button("🚀 Submit to Leaderboard", variant="primary", size="lg") submission_output = gr.Markdown(label="Submission Status") # Wire up submission with progress tracking submit_btn.click( fn=submit_model, inputs=[results_file_input, model_name_input, org_input, contact_input], outputs=submission_output ) # Tab 3: Tasks & Metrics with gr.Tab("📊 Tasks & Metrics"): gr.Markdown(""" ### MedVidBench Benchmark Tasks The benchmark evaluates models across **8 diverse tasks** spanning video, segment, and frame-level understanding: """) # Create tasks table tasks_data = [] for task_key, task_info in TASKS.items(): tasks_data.append({ "Task": task_info["name"], "Key": task_info["key"], "Metrics": task_info["metrics"], "Description": task_info["description"] }) tasks_df = pd.DataFrame(tasks_data) gr.Dataframe(value=tasks_df, interactive=False) gr.Markdown(""" ### Evaluation Metrics - **TAL** (Temporal Action Localization): **mAP@0.5** - mean Average Precision at IoU threshold 0.5 - **STG** (Spatiotemporal Grounding): **mIoU** - mean Intersection over Union (spatial + temporal) - **Next Action**: **Accuracy** - Classification accuracy - **DVC** (Dense Video Captioning): **LLM Judge** - GPT-4.1/Gemini scoring (average of top-5 aspects) - **VS** (Video Summary): **LLM Judge** - GPT-4.1/Gemini scoring (average of top-5 aspects) - **RC** (Region Caption): **LLM Judge** - GPT-4.1/Gemini scoring (average of top-5 aspects) - **Skill Assessment**: **Accuracy** - Surgical skill level classification (JIGSAWS) - **CVS Assessment**: **Accuracy** - Clinical variable scoring #### LLM Judge Details Caption tasks (DVC, VS, RC) use GPT-4.1 or Gemini-Pro with rubric-based scoring (1-5 scale) across 5 key aspects: - **R2**: Relevance & Medical Terminology - **R4**: Actionable Surgical Actions - **R5**: Comprehensive Detail Level - **R7**: Anatomical & Instrument Precision - **R8**: Clinical Context & Coherence The **final score** is the average across these 5 aspects. ### Test Set Statistics - **Total samples**: 6,245 - **Source datasets**: 8 (AVOS, CholecT50, CholecTrack20, Cholec80_CVS, CoPESD, EgoSurgery, NurViD, JIGSAWS) - **Video frames**: ~103,742 - **Task distribution**: - TAL: ~800 samples - STG: ~900 samples - Next Action: ~700 samples - DVC: ~800 samples - VS: ~900 samples - RC: ~1000 samples - Skill Assessment: ~600 samples - CVS Assessment: ~545 samples """) # Tab 4: About with gr.Tab("ℹ️ About"): gr.Markdown(""" ### About MedVidBench **MedVidBench** is a comprehensive benchmark for evaluating Video-Language Models on medical and surgical video understanding. It was introduced in the **MedGRPO** paper (Multi-Task Reinforcement Learning for Heterogeneous Medical Video Understanding). #### Key Features - **8 diverse tasks** covering multiple levels of video understanding - **8 source datasets** from various surgical procedures - **6,245 test samples** with high-quality annotations - **Automatic evaluation** with standardized metrics - **LLM-based judging** for caption quality assessment #### Paper ```bibtex @article{su2024medgrpo, title={MedGRPO: Multi-Task Reinforcement Learning for Heterogeneous Medical Video Understanding}, author={Su, Yuhao and Choudhuri, Anwesa and Gao, Zhongpai and Planche, Benjamin and Nguyen, Van Nguyen and Zheng, Meng and Shen, Yuhan and Innanje, Arun and Chen, Terrence and Elhamifar, Ehsan and Wu, Ziyan}, journal={arXiv preprint arXiv:2512.06581}, year={2025} } ``` #### Links - 📄 **Paper**: [https://arxiv.org/abs/2512.06581](https://arxiv.org/abs/2512.06581) - 🌐 **Project Page**: [https://yuhaosu.github.io/MedGRPO/](https://yuhaosu.github.io/MedGRPO/) - 💾 **Dataset**: [https://huggingface.co/datasets/UIIAmerica/MedVidBench](https://huggingface.co/datasets/UIIAmerica/MedVidBench) - 💻 **GitHub**: [https://github.com/YuhaoSu/MedGRPO](https://github.com/YuhaoSu/MedGRPO) - 🏆 **Leaderboard**: [https://huggingface.co/spaces/UIIAmerica/MedVidBench-Leaderboard](https://huggingface.co/spaces/UIIAmerica/MedVidBench-Leaderboard) #### Dataset The MedVidBench benchmark includes: - 21,060 training samples - 6,245 test samples - Multi-modal annotations (video, text, temporal spans, bounding boxes) - 8 source datasets covering various medical procedures #### License - **Dataset**: CC BY-NC-SA 4.0 (Non-commercial, Share-alike) - **Leaderboard Code**: Apache 2.0 - **Evaluation Scripts**: MIT #### Contact For questions or issues: - Open an issue on [GitHub](https://github.com/YuhaoSu/MedGRPO) - Visit the [project page](https://yuhaosu.github.io/MedGRPO/) - Email: [Contact via GitHub](https://github.com/YuhaoSu) """) if __name__ == "__main__": demo.queue(default_concurrency_limit=5).launch(share=True, server_name="0.0.0.0")