"""PPE detection pipeline. Runs the fine-tuned D-FINE-Medium PPE detector (6 classes: goggles, helmet, mask, shoes, vest, glove) on a single image and returns a copy with bounding boxes + class names + scores drawn on it. The checkpoint is in the Peterande/D-FINE format, so the model is built with the vendored D-FINE source (``D-FINE/src``) and config, exactly like the original ``scripts/infer.py``. Inference is CPU-only. The model is loaded once and cached. """ import json import os import sys from pathlib import Path import torch import torch.nn as nn import torchvision.transforms as T from PIL import Image, ImageDraw, ImageFont BASE_DIR = Path(__file__).resolve().parent DFINE_DIR = BASE_DIR / "D-FINE" CFG_PATH = DFINE_DIR / "configs/dfine/custom/objects365/dfine_hgnetv2_m_ppe.yml" # Slim deploy checkpoint: EMA weights only, stored fp16 (~39 MB), upcast to fp32 at load. CKPT_PATH = BASE_DIR / "models/ppe_dfine_m_deploy.pth" CLASSES_JSON = BASE_DIR / "models/ppe_classes.json" DEVICE = "cpu" INPUT_SIZE = 640 # Distinct colors per class id (0-5). 
PALETTE = [
    (255, 56, 56),   # red
    (255, 159, 0),   # orange
    (255, 221, 0),   # yellow
    (0, 199, 89),    # green
    (0, 162, 255),   # blue
    (170, 0, 255),   # purple
]

# Lazily-initialised singletons; populated on first use by ``_get_model``.
_PPE_MODEL = None
_PPE_NAMES = None


def _load_class_names():
    """Return a ``{class_id: class_name}`` dict read from ``CLASSES_JSON``.

    Returns an empty dict when the file does not exist; callers then fall
    back to showing the numeric class id.
    """
    if not CLASSES_JSON.exists():
        return {}
    # JSON is UTF-8 by specification; do not depend on the locale's default encoding.
    cats = json.loads(CLASSES_JSON.read_text(encoding="utf-8"))["categories"]
    return {c["id"]: c["name"] for c in cats}


def _load_font(size):
    """Return a DejaVu Sans TrueType font at ``size``, or PIL's default font.

    The default font is used when neither DejaVu file is present or when a
    present file cannot be loaded.
    """
    candidates = (
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
    )
    for p in candidates:
        if os.path.exists(p):
            try:
                return ImageFont.truetype(p, size)
            except OSError:
                # Unreadable/corrupt font file: try the next candidate rather
                # than failing the whole annotation step.
                continue
    return ImageFont.load_default()


def _build_model():
    """Build the D-FINE PPE model on CPU and load the trained EMA weights.

    Returns an ``nn.Module`` in eval mode whose ``forward(images, orig_sizes)``
    runs the deployed model followed by the deployed postprocessor.
    """
    dfine_root = str(DFINE_DIR)
    if dfine_root not in sys.path:
        sys.path.insert(0, dfine_root)
    from src.core import YAMLConfig  # noqa: E402 (registers model components)

    cfg = YAMLConfig(str(CFG_PATH), resume=str(CKPT_PATH))
    if "HGNetv2" in cfg.yaml_cfg:
        # Presumably avoids fetching backbone weights that the checkpoint
        # overrides anyway -- confirm against the D-FINE config loader.
        cfg.yaml_cfg["HGNetv2"]["pretrained"] = False

    # NOTE(security): weights_only=False unpickles arbitrary objects. This is
    # only acceptable because CKPT_PATH is a local, trusted file; never point
    # it at a checkpoint from an untrusted source.
    checkpoint = torch.load(str(CKPT_PATH), map_location="cpu", weights_only=False)
    state = checkpoint["ema"]["module"] if "ema" in checkpoint else checkpoint["model"]
    # Deploy checkpoint stores fp16 weights; upcast to fp32 (CPU runs fp32 only).
    state = {
        k: (v.float() if torch.is_tensor(v) and v.is_floating_point() else v)
        for k, v in state.items()
    }
    cfg.model.load_state_dict(state)

    class Model(nn.Module):
        """Deployed detector + postprocessor wrapped as a single module."""

        def __init__(self):
            super().__init__()
            self.model = cfg.model.deploy()
            self.postprocessor = cfg.postprocessor.deploy()

        def forward(self, images, orig_sizes):
            return self.postprocessor(self.model(images), orig_sizes)

    return Model().to(DEVICE).eval()


def _get_model():
    """Return ``(model, class_names)``, building and caching them on first call."""
    global _PPE_MODEL, _PPE_NAMES
    if _PPE_MODEL is None:
        print("[*] Loading PPE D-FINE model (CPU)...")
        names = _load_class_names()
        model = _build_model()
        # Publish both globals together, only after the build has succeeded.
        _PPE_NAMES = names
        _PPE_MODEL = model
        print("[*] PPE model loaded.")
    return _PPE_MODEL, _PPE_NAMES


def _to_rgb(image):
    """Return ``image`` (PIL image or array accepted by ``Image.fromarray``) as RGB PIL."""
    if isinstance(image, Image.Image):
        return image.convert("RGB")
    return Image.fromarray(image).convert("RGB")


@torch.no_grad()
def _detect(image, threshold):
    """Run the detector once; the single inference path for the public functions.

    Returns ``(rgb_image, detections)`` where ``detections`` is a list of
    ``(class_id, class_name, score, [x1, y1, x2, y2])`` for every prediction
    with ``score >= threshold``, in the order the model produced them. Box
    coordinates are floats in original-image pixel coordinates.
    """
    model, names = _get_model()
    im = _to_rgb(image)
    w, h = im.size
    # The postprocessor receives the original size as (width, height).
    orig_size = torch.tensor([[w, h]]).to(DEVICE)
    preprocess = T.Compose([T.Resize((INPUT_SIZE, INPUT_SIZE)), T.ToTensor()])
    im_data = preprocess(im).unsqueeze(0).to(DEVICE)

    labels, boxes, scores = model(im_data, orig_size)

    detections = []
    # Index 0: the batch holds exactly one image.
    for lab, box, scr in zip(labels[0], boxes[0], scores[0]):
        s = float(scr)
        if s < threshold:
            continue
        cid = int(lab)
        detections.append((cid, names.get(cid, str(cid)), s, [float(v) for v in box]))
    return im, detections


def detect_ppe_boxes(image, threshold=0.4):
    """Run the PPE detector and return raw detections.

    Returns a list of ``(class_name, score, [x1, y1, x2, y2])`` in
    original-image pixel coords, keeping only scores ``>= threshold``.
    Shares its inference path (``_detect``) with ``run_ppe``.
    """
    _, detections = _detect(image, threshold)
    return [(name, score, box) for _cid, name, score, box in detections]


def run_ppe(image, threshold=0.4):
    """Detect PPE in ``image`` and return ``(annotated_image, status_text)``.

    ``image`` may be a PIL image or an array accepted by ``Image.fromarray``.
    When ``image`` is ``None`` the result is ``(None, "Upload an image.")``
    and the model is not loaded. Otherwise the annotated image is an RGB copy
    with one box and one ``"<name> <score>"`` label per kept detection, and
    the status text lists the detections sorted by descending score.
    """
    if image is None:
        return None, "Upload an image."

    im, detections = _detect(image, threshold)

    out = im.copy()
    draw = ImageDraw.Draw(out)
    # Scale the label font with the image, never below 14 px.
    font = _load_font(max(14, int(0.02 * max(out.size))))

    for cid, name, s, (x1, y1, x2, y2) in detections:
        color = PALETTE[cid % len(PALETTE)]
        draw.rectangle([x1, y1, x2, y2], outline=color, width=3)
        text = f"{name} {s:.2f}"
        tb = draw.textbbox((0, 0), text, font=font)
        tw, th = tb[2] - tb[0], tb[3] - tb[1]
        # Put the label above the box, clamped so it stays inside the image.
        ty = max(0, y1 - th - 4)
        draw.rectangle([x1, ty, x1 + tw + 6, ty + th + 4], fill=color)
        draw.text((x1 + 3, ty + 2), text, fill=(255, 255, 255), font=font)

    kept = [(name, s) for _cid, name, s, _box in detections]
    if kept:
        lines = [f"{len(kept)} detection(s) at threshold {threshold:.2f}:"]
        # Stable sort: equal scores keep model order.
        lines += [f" {n}: {sc:.3f}" for n, sc in sorted(kept, key=lambda x: -x[1])]
        status = "\n".join(lines)
    else:
        status = f"No PPE detected at threshold {threshold:.2f}."
    return out, status