""" Pipeline: D-FINE (person/car only) → group detections → crop regions → find all bboxes inside each crop → Jina-CLIP-v2 and Nomic embeddings on those crops. Outputs separate crop folders per model (jina_crops, nomic_crops) for visual comparison. """ import argparse import csv import time from pathlib import Path import numpy as np import torch import torch.nn.functional as F from PIL import Image from transformers import AutoImageProcessor, DFineForObjectDetection # Jina-CLIP-v2 few-shot (same refs + classify as jina_fewshot.py) from jina_fewshot import ( IMAGE_EXTS, TRUNCATE_DIM, JinaCLIPv2Encoder, build_refs, classify as jina_classify, draw_bboxes_on_image, draw_label_on_image, ) # Only these ref classes get bboxes on group crops and appear in the known-object gallery KNOWN_DISPLAY_CLASSES = {"gun", "knife", "cigarette", "phone"} # Only show objects (and group crops) with confidence >= this MIN_DISPLAY_CONF = 0.7 from nomic_fewshot import NomicTextEncoder, NomicVisionEncoder, build_refs_nomic # ----------------------------------------------------------------------------- # Detection + grouping (from reference_detection.py) # ----------------------------------------------------------------------------- def get_box_dist(box1, box2): """Euclidean distance between box centers. box = [x1, y1, x2, y2].""" c1 = np.array([(box1[0] + box1[2]) / 2, (box1[1] + box1[3]) / 2]) c2 = np.array([(box2[0] + box2[2]) / 2, (box2[1] + box2[3]) / 2]) return np.linalg.norm(c1 - c2) def group_detections(detections, threshold): """ Group detections by proximity (center distance < threshold). detections: list of {"box": [x1,y1,x2,y2], "conf", "cls", ...} Returns list of {"box": merged [x1,y1,x2,y2], "conf": best in group, "cls": best in group}. """ if not detections: return [] boxes = [d["box"] for d in detections] n = len(boxes) adj = {i: [] for i in range(n)} for i in range(n): for j in range(i + 1, n): if get_box_dist(boxes[i], boxes[j]) < threshold: adj[i].append(j) adj[j].append(i) groups = [] visited = [False] * n for i in range(n): if not visited[i]: group_indices = [] stack = [i] visited[i] = True while stack: curr = stack.pop() group_indices.append(curr) for neighbor in adj[curr]: if not visited[neighbor]: visited[neighbor] = True stack.append(neighbor) group_dets = [detections[k] for k in group_indices] x1 = min(d["box"][0] for d in group_dets) y1 = min(d["box"][1] for d in group_dets) x2 = max(d["box"][2] for d in group_dets) y2 = max(d["box"][3] for d in group_dets) best_det = max(group_dets, key=lambda x: x["conf"]) groups.append({ "box": [x1, y1, x2, y2], "conf": best_det["conf"], "cls": best_det["cls"], "label": best_det.get("label", str(best_det["cls"])), }) return groups def box_center_inside(box, crop_box): """True if center of box is inside crop_box. All [x1,y1,x2,y2].""" cx = (box[0] + box[2]) / 2 cy = (box[1] + box[3]) / 2 return ( crop_box[0] <= cx <= crop_box[2] and crop_box[1] <= cy <= crop_box[3] ) def squarify_crop_box(bx1, by1, bx2, by2, img_w, img_h): """ Expand the shorter side to match the longer (same ratio / square), centered, clamped to image. If height > width: expand width. If width >= height: expand height. Returns (bx1, by1, bx2, by2) as integers. """ orig = (int(bx1), int(by1), int(bx2), int(by2)) w = bx2 - bx1 h = by2 - by1 if w <= 0 or h <= 0: return orig if h > w: add = (h - w) / 2.0 bx1 = max(0, bx1 - add) bx2 = min(img_w, bx2 + add) else: add = (w - h) / 2.0 by1 = max(0, by1 - add) by2 = min(img_h, by2 + add) bx1, by1, bx2, by2 = int(bx1), int(by1), int(bx2), int(by2) if bx2 <= bx1 or by2 <= by1: return orig return bx1, by1, bx2, by2 def box_iou(box1, box2): """IoU of two boxes [x1,y1,x2,y2]. Returns float in [0, 1].""" ix1 = max(box1[0], box2[0]) iy1 = max(box1[1], box2[1]) ix2 = min(box1[2], box2[2]) iy2 = min(box1[3], box2[3]) inter_w = max(0, ix2 - ix1) inter_h = max(0, iy2 - iy1) inter = inter_w * inter_h a1 = (box1[2] - box1[0]) * (box1[3] - box1[1]) a2 = (box2[2] - box2[0]) * (box2[3] - box2[1]) union = a1 + a2 - inter return inter / union if union > 0 else 0.0 def deduplicate_by_iou(detections, iou_threshold=0.9): """Keep one detection per overlapping group (IoU >= iou_threshold). Prefer higher confidence.""" if not detections: return [] # Sort by confidence descending; keep first, then add only if no kept box overlaps >= threshold sorted_d = sorted(detections, key=lambda x: -x["conf"]) kept = [] for d in sorted_d: if not any(box_iou(d["box"], k["box"]) >= iou_threshold for k in kept): kept.append(d) return kept def parse_args(): p = argparse.ArgumentParser( description="D-FINE (person/car) → group → Jina-CLIP-v2 on crops inside groups" ) p.add_argument("--refs", required=True, help="Reference images folder for Jina and Nomic (e.g. refs/)") p.add_argument("--input", required=True, help="Full-frame images folder") p.add_argument("--output", default="pipeline_results", help="Output folder (CSV, etc.)") p.add_argument("--det-threshold", type=float, default=0.13, help="D-FINE score threshold") p.add_argument("--group-dist", type=float, default=None, help="Group distance (default: 0.1 * max(H,W))") p.add_argument("--min-side", type=int, default=40, help="Min side of expanded bbox in px (skip smaller)") p.add_argument("--crop-dedup-iou", type=float, default=0.35, help="Min IoU to treat two crops as same object (keep larger)") p.add_argument("--no-squarify", action="store_true", help="Skip squarify; use expanded bbox only (tighter crops, often better recognition)") p.add_argument("--padding", type=float, default=0.2, help="Crop padding around group box (0.2 = 20%%)") p.add_argument("--conf-threshold", type=float, default=0.75, help="Jina accept confidence") p.add_argument("--gap-threshold", type=float, default=0.05, help="Jina accept gap") p.add_argument("--text-weight", type=float, default=0.3) p.add_argument("--max-images", type=int, default=None) p.add_argument("--device", default=None) return p.parse_args() def get_person_car_label_ids(model): """Return set of label IDs for person and car (Objects365: Person, Car, SUV, etc.).""" id2label = getattr(model.config, "id2label", None) or {} ids = set() for idx, name in id2label.items(): try: i = int(idx) except (ValueError, TypeError): continue n = (name or "").lower() if "person" in n or n in ("car", "suv"): ids.add(i) return ids def run_dfine(image, processor, model, device, score_threshold): """Run D-FINE, return all detections as list of {box, score, label_id, label}.""" from PIL import Image if isinstance(image, Image.Image): pil = image.convert("RGB") else: pil = Image.fromarray(image).convert("RGB") w, h = pil.size target_size = torch.tensor([[h, w]], device=device) inputs = processor(images=pil, return_tensors="pt") inputs = {k: v.to(device) for k, v in inputs.items()} with torch.no_grad(): outputs = model(**inputs) target_sizes = target_size.to(outputs["logits"].device) results = processor.post_process_object_detection( outputs, target_sizes=target_sizes, threshold=score_threshold, ) id2label = getattr(model.config, "id2label", {}) or {} detections = [] for result in results: for score, label_id, box in zip( result["scores"], result["labels"], result["boxes"] ): sid = int(label_id.item()) detections.append({ "box": [float(x) for x in box.cpu().tolist()], "conf": float(score.item()), "cls": sid, "label": id2label.get(sid, str(sid)), }) return detections def main(): args = parse_args() device = args.device or ("cuda" if torch.cuda.is_available() else "cpu") input_dir = Path(args.input) output_dir = Path(args.output) refs_dir = Path(args.refs) output_dir.mkdir(parents=True, exist_ok=True) if not refs_dir.is_dir(): raise SystemExit(f"Refs folder not found: {refs_dir}") if not input_dir.is_dir(): raise SystemExit(f"Input folder not found: {input_dir}") paths = sorted( p for p in input_dir.iterdir() if p.suffix.lower() in IMAGE_EXTS ) if args.max_images is not None: paths = paths[: args.max_images] if not paths: raise SystemExit(f"No images in {input_dir}") # Load D-FINE print("[*] Loading D-FINE (dfine-medium-obj365)...") t0 = time.perf_counter() image_processor = AutoImageProcessor.from_pretrained("ustc-community/dfine-medium-obj365") dfine_model = DFineForObjectDetection.from_pretrained("ustc-community/dfine-medium-obj365") dfine_model = dfine_model.to(device).eval() person_car_ids = get_person_car_label_ids(dfine_model) print(f" Person/car label IDs: {person_car_ids} ({time.perf_counter()-t0:.1f}s)") # Load Jina-CLIP-v2 + build refs print("[*] Loading Jina-CLIP-v2 and building refs...") t0 = time.perf_counter() jina_encoder = JinaCLIPv2Encoder(device) ref_labels, ref_embs = build_refs( jina_encoder, refs_dir, TRUNCATE_DIM, args.text_weight, batch_size=16 ) print(f" Jina refs: {ref_labels} ({time.perf_counter()-t0:.1f}s)\n") # Load Nomic vision + text, build refs (same as Jina: image + text prompts, text_weight 0.3) print("[*] Loading Nomic embed-vision + embed-text and building refs...") t0 = time.perf_counter() nomic_encoder = NomicVisionEncoder(device) nomic_text_encoder = NomicTextEncoder(device) ref_labels_nomic, ref_embs_nomic = build_refs_nomic( nomic_encoder, refs_dir, batch_size=16, text_encoder=nomic_text_encoder, text_weight=args.text_weight, ) print(f" Nomic refs: {ref_labels_nomic} ({time.perf_counter()-t0:.1f}s)\n") # Separate output folders per model for visual comparison jina_crops_dir = output_dir / "jina_crops" nomic_crops_dir = output_dir / "nomic_crops" jina_crops_dir.mkdir(parents=True, exist_ok=True) nomic_crops_dir.mkdir(parents=True, exist_ok=True) # CSV csv_path = output_dir / "results.csv" f = open(csv_path, "w", newline="") w = csv.writer(f) w.writerow([ "image", "crop_filename", "group_idx", "crop_x1", "crop_y1", "crop_x2", "crop_y2", "bbox_x1", "bbox_y1", "bbox_x2", "bbox_y2", "dfine_label", "dfine_conf", "jina_prediction", "jina_confidence", "jina_status", "nomic_prediction", "nomic_confidence", "nomic_status", ]) for img_path in paths: pil = Image.open(img_path).convert("RGB") img_w, img_h = pil.size group_dist = args.group_dist if args.group_dist is not None else 0.1 * max(img_h, img_w) # 1) D-FINE: detect everything, keep all bboxes for the image detections = run_dfine( pil, image_processor, dfine_model, device, args.det_threshold ) person_car = [d for d in detections if d["cls"] in person_car_ids] if not person_car: continue # 2) Group person/car detections (same as reference) grouped = group_detections(person_car, group_dist) grouped.sort(key=lambda x: x["conf"], reverse=True) top_groups = grouped[:10] # limit groups per image # 3) Collect all candidate crops (bboxes inside person/car groups) # Each: (crop_box, crop_pil, d, gidx, crop_idx, x1, y1, x2, y2) candidates = [] for gidx, grp in enumerate(top_groups): x1, y1, x2, y2 = grp["box"] group_box = [x1, y1, x2, y2] inside = [ d for d in detections if box_center_inside(d["box"], group_box) and d["cls"] not in person_car_ids ] inside = deduplicate_by_iou(inside, iou_threshold=0.9) for crop_idx, d in enumerate(inside): bx1, by1, bx2, by2 = [float(x) for x in d["box"]] obj_w, obj_h = bx2 - bx1, by2 - by1 if obj_w <= 0 or obj_h <= 0: continue # Small objects (min side < 24 px): expand by 60%; larger: 30% min_side_obj = min(obj_w, obj_h) pad_ratio = 0.6 if min_side_obj < 24 else 0.3 pad_x = obj_w * pad_ratio pad_y = obj_h * pad_ratio bx1 = max(0, int(bx1 - pad_x)) by1 = max(0, int(by1 - pad_y)) bx2 = min(img_w, int(bx2 + pad_x)) by2 = min(img_h, int(by2 + pad_y)) if bx2 <= bx1 or by2 <= by1: continue if min(bx2 - bx1, by2 - by1) < args.min_side: continue expanded_box = [bx1, by1, bx2, by2] candidates.append((expanded_box, d, gidx, crop_idx, x1, y1, x2, y2)) # 4) Dedup on EXPANDED boxes (before squarify), keep larger; then squarify only kept def crop_area(box): return (box[2] - box[0]) * (box[3] - box[1]) candidates.sort(key=lambda c: -crop_area(c[0])) kept = [] for c in candidates: expanded_box = c[0] def is_same_object(box_a, box_b): if box_iou(box_a, box_b) >= args.crop_dedup_iou: return True if box_center_inside(box_a, box_b) or box_center_inside(box_b, box_a): return True return False if not any(is_same_object(expanded_box, k[0]) for k in kept): kept.append(c) # 5) Optionally squarify, then run Jina and Nomic only on kept crops for i, (expanded_box, d, gidx, crop_idx, x1, y1, x2, y2) in enumerate(kept): if not args.no_squarify: bx1, by1, bx2, by2 = squarify_crop_box( expanded_box[0], expanded_box[1], expanded_box[2], expanded_box[3], img_w, img_h ) else: bx1, by1, bx2, by2 = expanded_box[0], expanded_box[1], expanded_box[2], expanded_box[3] crop_pil = pil.crop((bx1, by1, bx2, by2)) crop_name = f"{img_path.stem}_g{gidx}_{i}_{bx1}_{by1}_{bx2}_{by2}{img_path.suffix}" q_jina = jina_encoder.encode_images([crop_pil], TRUNCATE_DIM) result_jina = jina_classify( q_jina, ref_labels, ref_embs, args.conf_threshold, args.gap_threshold ) if result_jina["prediction"] in ref_labels: label_jina = result_jina["prediction"] conf_jina = result_jina["confidence"] else: label_jina = f"unnamed (dfine: {d['label']})" conf_jina = 0.0 ann_jina = draw_label_on_image(crop_pil, label_jina, conf_jina) ann_jina.save(jina_crops_dir / crop_name) q_nomic = nomic_encoder.encode_images([crop_pil]) result_nomic = jina_classify( q_nomic, ref_labels_nomic, ref_embs_nomic, args.conf_threshold, args.gap_threshold ) if result_nomic["prediction"] in ref_labels_nomic: label_nomic = result_nomic["prediction"] conf_nomic = result_nomic["confidence"] else: label_nomic = f"unnamed (dfine: {d['label']})" conf_nomic = 0.0 ann_nomic = draw_label_on_image(crop_pil, label_nomic, conf_nomic) ann_nomic.save(nomic_crops_dir / crop_name) w.writerow([ img_path.name, crop_name, gidx, x1, y1, x2, y2, bx1, by1, bx2, by2, d["label"], f"{d['conf']:.4f}", result_jina["prediction"], f"{result_jina['confidence']:.4f}", result_jina["status"], result_nomic["prediction"], f"{result_nomic['confidence']:.4f}", result_nomic["status"], ]) f.close() print(f"[*] Wrote {csv_path}") print(f"[*] Jina crops: {jina_crops_dir}") print(f"[*] Nomic crops: {nomic_crops_dir}") # ----------------------------------------------------------------------------- # Single-image runner for Gradio app: D-FINE first, then Jina or Nomic (user choice) # ----------------------------------------------------------------------------- _APP_DFINE = None _APP_JINA = None _APP_NOMIC = None _APP_REFS_JINA = None _APP_REFS_NOMIC = None def run_single_image( pil_image, refs_dir, device=None, encoder_choice="jina", det_threshold=0.3, conf_threshold=0.75, gap_threshold=0.05, min_side=40, crop_dedup_iou=0.35, squarify=True, min_display_conf=None, ): """ Run D-FINE on one image, then classify small-object crops with Jina or Nomic. refs_dir: path to refs folder (str or Path). encoder_choice: "jina" or "nomic". Returns (group_crop_images, known_crop_composites, status_message). - group_crop_images: list of PIL/numpy (one per person/car group, with bboxes for known objects only). - known_crop_composites: list of PIL/numpy (label+score above + crop) for known classes only. - status_message: None on success, or error/empty-state string. """ import numpy as np if min_display_conf is None: min_display_conf = MIN_DISPLAY_CONF from PIL import Image global _APP_DFINE, _APP_JINA, _APP_NOMIC, _APP_REFS_JINA, _APP_REFS_NOMIC refs_dir = Path(refs_dir) if not refs_dir.is_dir(): return [], [], f"Refs folder not found: {refs_dir}" device = device or ("cuda" if torch.cuda.is_available() else "cpu") print(f"[*] Device: {device}") pil = pil_image.convert("RGB") if isinstance(pil_image, Image.Image) else Image.fromarray(pil_image).convert("RGB") img_w, img_h = pil.size group_dist = 0.1 * max(img_h, img_w) # Load D-FINE once if _APP_DFINE is None: image_processor = AutoImageProcessor.from_pretrained("ustc-community/dfine-medium-obj365") dfine_model = DFineForObjectDetection.from_pretrained("ustc-community/dfine-medium-obj365") dfine_model = dfine_model.to(device).eval() person_car_ids = get_person_car_label_ids(dfine_model) _APP_DFINE = (image_processor, dfine_model, person_car_ids) image_processor, dfine_model, person_car_ids = _APP_DFINE detections = run_dfine(pil, image_processor, dfine_model, device, det_threshold) person_car = [d for d in detections if d["cls"] in person_car_ids] if not person_car: return [], [], "No person/car detected. No small-object crops." grouped = group_detections(person_car, group_dist) grouped.sort(key=lambda x: x["conf"], reverse=True) top_groups = grouped[:10] candidates = [] for gidx, grp in enumerate(top_groups): x1, y1, x2, y2 = grp["box"] group_box = [x1, y1, x2, y2] inside = [ d for d in detections if box_center_inside(d["box"], group_box) and d["cls"] not in person_car_ids ] inside = deduplicate_by_iou(inside, iou_threshold=0.9) for crop_idx, d in enumerate(inside): bx1, by1, bx2, by2 = [float(x) for x in d["box"]] obj_w, obj_h = bx2 - bx1, by2 - by1 if obj_w <= 0 or obj_h <= 0: continue # Small objects (min side < 24 px): expand by 60%; larger: 30% min_side_obj = min(obj_w, obj_h) pad_ratio = 0.6 if min_side_obj < 24 else 0.3 pad_x = obj_w * pad_ratio pad_y = obj_h * pad_ratio bx1 = max(0, int(bx1 - pad_x)) by1 = max(0, int(by1 - pad_y)) bx2 = min(img_w, int(bx2 + pad_x)) by2 = min(img_h, int(by2 + pad_y)) if bx2 <= bx1 or by2 <= by1: continue if min(bx2 - bx1, by2 - by1) < min_side: continue expanded_box = [bx1, by1, bx2, by2] candidates.append((expanded_box, d, gidx, crop_idx)) def crop_area(box): return (box[2] - box[0]) * (box[3] - box[1]) candidates.sort(key=lambda c: -crop_area(c[0])) kept = [] for c in candidates: def is_same_object(box_a, box_b): if box_iou(box_a, box_b) >= crop_dedup_iou: return True if box_center_inside(box_a, box_b) or box_center_inside(box_b, box_a): return True return False if not any(is_same_object(c[0], k[0]) for k in kept): kept.append(c) if not kept: if not candidates: return [], [], "No small-object crops: D-FINE did not detect any object (gun/phone/etc.) inside person/car areas, or all were below min size. Try a higher-resolution image." return [], [], "No small-object crops (after dedup)." # Load encoder + refs for chosen model if encoder_choice == "jina": if _APP_JINA is None or _APP_REFS_JINA != str(refs_dir): jina_encoder = JinaCLIPv2Encoder(device) ref_labels, ref_embs = build_refs(jina_encoder, refs_dir, TRUNCATE_DIM, 0.3, batch_size=16) _APP_JINA = (jina_encoder, ref_labels, ref_embs) _APP_REFS_JINA = str(refs_dir) jina_encoder, ref_labels, ref_embs = _APP_JINA else: if _APP_NOMIC is None or _APP_REFS_NOMIC != str(refs_dir): nomic_encoder = NomicVisionEncoder(device) nomic_text_encoder = NomicTextEncoder(device) ref_labels, ref_embs = build_refs_nomic( nomic_encoder, refs_dir, batch_size=16, text_encoder=nomic_text_encoder, text_weight=0.3, ) _APP_NOMIC = (nomic_encoder, ref_labels, ref_embs) _APP_REFS_NOMIC = str(refs_dir) nomic_encoder, ref_labels, ref_embs = _APP_NOMIC # Classify each kept crop and store (gidx, box_in_full_image, crop_pil, pred, conf) results_per_crop = [] for expanded_box, d, gidx, crop_idx in kept: if squarify: bx1, by1, bx2, by2 = squarify_crop_box( expanded_box[0], expanded_box[1], expanded_box[2], expanded_box[3], img_w, img_h, ) else: bx1, by1, bx2, by2 = expanded_box[0], expanded_box[1], expanded_box[2], expanded_box[3] crop_pil = pil.crop((bx1, by1, bx2, by2)) if encoder_choice == "jina": q = jina_encoder.encode_images([crop_pil], TRUNCATE_DIM) result = jina_classify(q, ref_labels, ref_embs, conf_threshold, gap_threshold) else: q = nomic_encoder.encode_images([crop_pil]) result = jina_classify(q, ref_labels, ref_embs, conf_threshold, gap_threshold) pred = result["prediction"] if result["prediction"] in ref_labels else f"unknown ({d['label']})" conf = result["confidence"] results_per_crop.append((gidx, (bx1, by1, bx2, by2), crop_pil, pred, conf)) # Build group crop images: only groups that contain at least one known object with conf >= MIN_DISPLAY_CONF group_crop_images = [] for gidx, grp in enumerate(top_groups): gx1, gy1, gx2, gy2 = grp["box"] gx1, gy1 = int(gx1), int(gy1) gx2, gy2 = int(gx2), int(gy2) gx1, gy1 = max(0, gx1), max(0, gy1) gx2, gy2 = min(img_w, gx2), min(img_h, gy2) if gx2 <= gx1 or gy2 <= gy1: continue group_crop = pil.crop((gx1, gy1, gx2, gy2)).copy().convert("RGB") crop_w, crop_h = group_crop.size boxes_to_draw = [] for (gidx2, (bx1, by1, bx2, by2), _crop_pil, pred, conf) in results_per_crop: if gidx2 != gidx or pred not in KNOWN_DISPLAY_CLASSES or conf < min_display_conf: continue # Convert to group-crop-relative coords and clamp rx1 = max(0, min(crop_w, bx1 - gx1)) ry1 = max(0, min(crop_h, by1 - gy1)) rx2 = max(0, min(crop_w, bx2 - gx1)) ry2 = max(0, min(crop_h, by2 - gy1)) if rx2 > rx1 and ry2 > ry1: boxes_to_draw.append((rx1, ry1, rx2, ry2, pred, conf)) # Only show this group crop if it has at least one known object >= min_display_conf if not boxes_to_draw: continue group_crop = draw_bboxes_on_image(group_crop, boxes_to_draw) group_crop_images.append(np.array(group_crop)) # Build known-only gallery: only objects with conf >= min_display_conf known_crop_composites = [] for (_gidx, _box, crop_pil, pred, conf) in results_per_crop: if pred not in KNOWN_DISPLAY_CLASSES or conf < min_display_conf: continue composite = draw_label_on_image(crop_pil, pred, conf) known_crop_composites.append(np.array(composite)) return group_crop_images, known_crop_composites, None if __name__ == "__main__": main()