"""Ensemble: RF-DETR + YOLO26x + EfficientNet classifier for low-confidence corrections.

Pipeline, per input image:

1. Run both ONNX detectors (RF-DETR and YOLO) on the image.
2. Merge their detections with Weighted Boxes Fusion (WBF).
3. Re-classify fused detections whose score is below ``CLS_OVERRIDE_THRESH``
   with an EfficientNet crop classifier, and replace the detector label when
   the classifier is confident enough.
4. Append COCO-style records (``image_id``, ``category_id``, ``bbox`` as
   ``[x, y, w, h]`` in pixels, ``score``) to a single JSON output file.
"""

import argparse
import json
from pathlib import Path

import numpy as np
import onnxruntime as ort
import timm
import torch
from ensemble_boxes import weighted_boxes_fusion
from PIL import Image
from safetensors.torch import load_file
from torchvision import transforms

RFDETR_MEANS = np.array([0.485, 0.456, 0.406], dtype=np.float32).reshape(1, 3, 1, 1)
RFDETR_STDS = np.array([0.229, 0.224, 0.225], dtype=np.float32).reshape(1, 3, 1, 1)
RFDETR_RES = 704
RFDETR_TOPK = 300  # Maximum number of (query, class) pairs kept per image
YOLO_RES = 1600
CONF_THRESH = 0.05
WBF_IOU = 0.55
WBF_SKIP = 0.15
CLS_OVERRIDE_THRESH = 0.4  # Override class if WBF score below this
CLS_MIN_CONF = 0.3  # Classifier must be at least this confident to override
CLS_BATCH_SIZE = 64  # Number of crops sent to the classifier per forward pass
MIN_CROP_PX = 5  # Crops must be strictly larger than this on both sides
IMAGE_SUFFIXES = (".jpg", ".jpeg", ".png")


def rfdetr_infer(session, img):
    """Run the RF-DETR ONNX model on one image.

    Args:
        session: ONNX Runtime session. Its first output is indexed as
            per-query boxes and its second as per-query class logits.
            NOTE(review): boxes are treated as normalized ``(cx, cy, w, h)``;
            confirm against the exported model.
        img: RGB ``PIL.Image``.

    Returns:
        Tuple ``(boxes, class_ids, scores)``: ``boxes`` is an ``(N, 4)`` array
        of ``x1, y1, x2, y2`` clipped to ``[0, 1]``, ``class_ids`` is an int
        array and ``scores`` a float32 array, ordered by descending score and
        restricted to scores above ``CONF_THRESH``.
    """
    resized = img.resize((RFDETR_RES, RFDETR_RES), Image.BILINEAR)
    arr = np.array(resized, dtype=np.float32) / 255.0
    arr = arr.transpose(2, 0, 1)[np.newaxis, ...]  # HWC -> 1xCxHxW
    arr = ((arr - RFDETR_MEANS) / RFDETR_STDS).astype(np.float32)

    outs = session.run(None, {session.get_inputs()[0].name: arr})
    boxes, logits = outs[0][0], outs[1][0]

    # Sigmoid evaluated in float64, then the *probabilities* are cast to
    # float32 (the cast previously applied to the denominator only).
    probs = (1.0 / (1.0 + np.exp(-logits.astype(np.float64)))).astype(np.float32)
    flat = probs.reshape(-1)
    if flat.size == 0:
        # argpartition cannot handle an empty array; report "no detections".
        return (
            np.zeros((0, 4), dtype=np.float32),
            np.zeros(0, dtype=int),
            np.zeros(0, dtype=np.float32),
        )

    # Top-k over all (query, class) pairs, sorted by descending score.
    k = min(RFDETR_TOPK, flat.size)
    topk = np.argpartition(flat, -k)[-k:]
    topk = topk[np.argsort(flat[topk])[::-1]]

    num_classes = logits.shape[1]
    q_idx, c_idx, scores = topk // num_classes, topk % num_classes, flat[topk]
    keep = scores > CONF_THRESH
    q_idx, c_idx, scores = q_idx[keep], c_idx[keep], scores[keep]

    # Center/size -> corner coordinates.
    sel = boxes[q_idx]
    half_w, half_h = sel[:, 2] / 2, sel[:, 3] / 2
    x1, y1 = sel[:, 0] - half_w, sel[:, 1] - half_h
    x2, y2 = sel[:, 0] + half_w, sel[:, 1] + half_h
    return np.stack([x1, y1, x2, y2], axis=1).clip(0, 1), c_idx.astype(int), scores


def yolo26_infer(session, img, orig_w, orig_h):
    """Run the YOLO ONNX model on one letterboxed image.

    The image is resized to fit inside a ``YOLO_RES`` square while keeping its
    aspect ratio, centered on a gray (114) canvas, and the predicted boxes are
    mapped back to coordinates normalized by the original image size.

    Args:
        session: ONNX Runtime session. NOTE(review): its first output is
            indexed as rows of ``[x1, y1, x2, y2, score, class_id]`` in
            letterboxed pixel coordinates; confirm against the export.
        img: RGB ``PIL.Image``.
        orig_w: Width of ``img`` in pixels.
        orig_h: Height of ``img`` in pixels.

    Returns:
        Tuple ``(boxes, class_ids, scores)`` with ``boxes`` as an ``(N, 4)``
        array of ``x1, y1, x2, y2`` clipped to ``[0, 1]``, restricted to
        scores above ``CONF_THRESH``.
    """
    scale = min(YOLO_RES / orig_w, YOLO_RES / orig_h)
    # Clamp to 1 px so extreme aspect ratios cannot produce a zero-sized
    # resize (and a division by zero when un-letterboxing below).
    new_w = max(1, int(orig_w * scale))
    new_h = max(1, int(orig_h * scale))
    pad_w, pad_h = (YOLO_RES - new_w) // 2, (YOLO_RES - new_h) // 2

    resized = img.resize((new_w, new_h), Image.BILINEAR)
    canvas = Image.new("RGB", (YOLO_RES, YOLO_RES), (114, 114, 114))
    canvas.paste(resized, (pad_w, pad_h))

    arr = np.array(canvas, dtype=np.float32) / 255.0
    arr = arr.transpose(2, 0, 1)[np.newaxis, ...]  # HWC -> 1xCxHxW
    output = session.run(None, {session.get_inputs()[0].name: arr})[0]

    preds = output[0]
    scores = preds[:, 4]
    cls_ids = preds[:, 5].astype(int)
    keep = scores > CONF_THRESH
    x1, y1, x2, y2 = preds[keep, 0], preds[keep, 1], preds[keep, 2], preds[keep, 3]
    scores, cls_ids = scores[keep], cls_ids[keep]

    # Remove the letterbox padding and normalize by the resized content size.
    boxes = np.stack(
        [
            (x1 - pad_w) / new_w,
            (y1 - pad_h) / new_h,
            (x2 - pad_w) / new_w,
            (y2 - pad_h) / new_h,
        ],
        axis=1,
    ).clip(0, 1)
    return boxes, cls_ids, scores


class ProductClassifier:
    """Crop classifier built with timm and loaded from a safetensors file."""

    def __init__(self, weights_path, model_name="efficientnet_b4", num_classes=356):
        """Build the model, load its weights and prepare the preprocessing.

        Args:
            weights_path: Path to the ``.safetensors`` state dict.
            model_name: timm architecture name.
            num_classes: Size of the classification head; must match the
                checkpoint.
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model = timm.create_model(model_name, pretrained=False, num_classes=num_classes)
        state_dict = load_file(str(weights_path))
        self.model.load_state_dict(state_dict)
        self.model = self.model.to(self.device).eval()
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ])

    def classify_batch(self, crops):
        """Classify a list of PIL crops in a single forward pass.

        Args:
            crops: List of ``PIL.Image`` crops.

        Returns:
            Tuple ``(class_ids, scores)`` of Python lists holding the argmax
            class and its softmax probability per crop. Both lists are empty
            when ``crops`` is empty.
        """
        if not crops:
            return [], []
        tensors = torch.stack([self.transform(c) for c in crops]).to(self.device)
        with torch.no_grad():
            logits = self.model(tensors)
            probs = torch.softmax(logits, dim=1)
            scores, cls_ids = probs.max(dim=1)
        return cls_ids.cpu().tolist(), scores.cpu().tolist()


def _low_conf_crops(img, fused_boxes, fused_scores):
    """Cut out fused detections whose score is below ``CLS_OVERRIDE_THRESH``.

    Returns ``(indices, crops)``: positions into ``fused_boxes`` and the
    matching image crops. Boxes that are not larger than ``MIN_CROP_PX`` on
    both sides are skipped.
    """
    width, height = img.size
    indices, crops = [], []
    for i, (box, score) in enumerate(zip(fused_boxes, fused_scores)):
        if score >= CLS_OVERRIDE_THRESH:
            continue
        x1 = max(0, int(box[0] * width))
        y1 = max(0, int(box[1] * height))
        x2 = min(width, int(box[2] * width))
        y2 = min(height, int(box[3] * height))
        if x2 - x1 > MIN_CROP_PX and y2 - y1 > MIN_CROP_PX:
            crops.append(img.crop((x1, y1, x2, y2)))
            indices.append(i)
    return indices, crops


def _classifier_overrides(classifier, indices, crops, batch_size=CLS_BATCH_SIZE):
    """Map detection index -> classifier label for confident predictions only."""
    overrides = {}
    for start in range(0, len(crops), batch_size):
        stop = start + batch_size
        cls_ids, cls_scores = classifier.classify_batch(crops[start:stop])
        for idx, cls_id, cls_score in zip(indices[start:stop], cls_ids, cls_scores):
            if cls_score > CLS_MIN_CONF:
                overrides[idx] = cls_id
    return overrides


def main():
    """Run the ensemble over ``--input`` and write predictions to ``--output``.

    ``--input`` is a directory of images; the numeric id is parsed from the
    text after the last underscore of each file stem (a ``ValueError`` is
    raised if that text is not an integer). ``--output`` is the JSON file to
    write; its parent directory is created if needed. Model files are loaded
    from the directory containing this script.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", required=True)
    args = parser.parse_args()

    script_dir = Path(__file__).parent
    providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
    rfdetr = ort.InferenceSession(str(script_dir / "inference_model.onnx"), providers=providers)
    yolo = ort.InferenceSession(str(script_dir / "yolo_best.onnx"), providers=providers)
    classifier = ProductClassifier(script_dir / "classifier.safetensors")

    predictions = []
    for img_path in sorted(Path(args.input).iterdir()):
        if img_path.suffix.lower() not in IMAGE_SUFFIXES:
            continue
        image_id = int(img_path.stem.split("_")[-1])
        # convert() returns an independent, fully loaded image, so the file
        # handle can be released immediately.
        with Image.open(img_path) as src:
            img = src.convert("RGB")
        orig_w, orig_h = img.size

        # Detect with both models
        rf_boxes, rf_cls, rf_scores = rfdetr_infer(rfdetr, img)
        yl_boxes, yl_cls, yl_scores = yolo26_infer(yolo, img, orig_w, orig_h)
        if len(rf_boxes) == 0 and len(yl_boxes) == 0:
            continue

        # NOTE(review): fusion matches boxes per label, so both detectors are
        # assumed to share the same class-id space - confirm.
        fused_boxes, fused_scores, fused_labels = weighted_boxes_fusion(
            [rf_boxes.tolist(), yl_boxes.tolist()],
            [rf_scores.tolist(), yl_scores.tolist()],
            [rf_cls.tolist(), yl_cls.tolist()],
            weights=[1.0, 1.0],
            iou_thr=WBF_IOU,
            skip_box_thr=WBF_SKIP,
        )

        # Ask the classifier for a second opinion on low-confidence boxes
        low_conf_indices, crops = _low_conf_crops(img, fused_boxes, fused_scores)
        overrides = _classifier_overrides(classifier, low_conf_indices, crops)

        # Build predictions
        for i, (box, score, label) in enumerate(zip(fused_boxes, fused_scores, fused_labels)):
            # Plain Python floats: json cannot serialize every numpy scalar
            # type (e.g. float32), so do not depend on the dtype WBF returns.
            x1, y1, x2, y2 = (float(v) for v in box)
            predictions.append({
                "image_id": image_id,
                "category_id": overrides.get(i, int(label)),
                "bbox": [
                    round(x1 * orig_w, 1),
                    round(y1 * orig_h, 1),
                    round((x2 - x1) * orig_w, 1),
                    round((y2 - y1) * orig_h, 1),
                ],
                "score": round(float(score), 4),
            })

    Path(args.output).parent.mkdir(parents=True, exist_ok=True)
    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(predictions, f)


if __name__ == "__main__":
    main()