"""PPE compliance: per-person missing-PPE detection for the Gradio app.

Runs TWO detectors and reports, per person, which REQUIRED PPE items are
MISSING (the "negation" logic):

1. Person detector — the SAME HF-transformers D-FINE model the
   "Detect & Classify" tab uses (``ustc-community/dfine-*``), run on the full
   image and filtered to the COCO/Objects365 "person" class. We reuse that
   tab's already-loaded model when it matches, so the Space doesn't load a
   third detector.
2. PPE detector — the fine-tuned D-FINE-M (6 classes: goggles/helmet/mask/
   shoes/vest/glove), reused from ``ppe_pipeline``. Rather than one full-frame
   pass (which shrinks every worker in the 640x640 resize and misses small PPE
   like vests), we run it on EACH padded person crop and map the boxes back to
   full-image coordinates. (On the author's test image this reportedly lifted
   vest detections from 0 -> 4.)

The PPE detections from all crops are pooled and de-duplicated, then each one
is credited to exactly ONE person: among the persons whose box contains enough
of the item, the one where the item sits closest to its expected vertical
position wins. A person missing any required item is a VIOLATION.

Rendering: green box = OK, red box = violation, with a compact per-person
checklist (1=present / 0=missing) drawn beside each person's crop.

Adapted from ``ppe/scripts/compliance.py``; the original's body-region bands
are dropped — per-person cropping already localises PPE to the right worker,
so the band only hurt recall on bent/crouched poses.
"""
import torch
from PIL import Image, ImageDraw

import dfine_jina_pipeline as djp
from dfine_jina_pipeline import DFINE_MODEL_IDS, run_dfine
from ppe_pipeline import detect_ppe_boxes, _load_font, PALETTE
from transformers import AutoImageProcessor, DFineForObjectDetection

# Device the person detector is moved to when this module loads its own copy.
DEVICE = "cpu"

# The six PPE class names. The list order also fixes each class's palette
# colour and the row order of the per-person checklist.
ALL_PPE = ["goggles", "helmet", "mask", "shoes", "vest", "glove"]

# Expected vertical position of each item within its wearer's box (0 = top of the
# person box, 1 = bottom). Used ONLY to disambiguate which of several overlapping
# persons an item belongs to — never to reject an item outright.
PPE_VFRAC = {
    "helmet": 0.10,
    "goggles": 0.13,
    "mask": 0.20,
    "vest": 0.45,
    "glove": 0.62,
    "shoes": 0.92,
}

# Person detector defaults to the same model the "Detect & Classify" tab uses.
DEFAULT_PERSON_MODEL = "medium-obj2coco"

# Per-person PPE detection: pad each person box before cropping so PPE near the
# edges isn't clipped; cap the number of crops so crowded frames stay responsive
# (each crop is one PPE forward pass, CPU on the Space).
PERSON_CROP_PAD = 0.15
MAX_PERSON_CROPS = 20

# Collapse the same physical item seen in two overlapping (padded) crops.
DEDUP_IOU = 0.55

# Outline / text colours for a compliant (GREEN) or violating (RED) person.
GREEN, RED = (40, 200, 90), (235, 45, 55)

_PERSON = None  # (model_key, processor, model, person_ids)


def _get_person_label_ids(model):
    """Return the set of integer label IDs whose name contains 'person'.

    Reads ``model.config.id2label`` (handles COCO + Objects365 naming). Keys
    that cannot be converted to ``int`` are skipped; a missing or empty
    mapping yields an empty set.
    """
    id2label = getattr(model.config, "id2label", {}) or {}
    ids = set()
    for idx, name in id2label.items():
        try:
            i = int(idx)
        except (ValueError, TypeError):
            continue
        if "person" in (name or "").lower():
            ids.add(i)
    return ids


def _get_person_model(model_key=DEFAULT_PERSON_MODEL):
    """Load (and cache) the HF D-FINE person detector.

    Reuses the Detect & Classify tab's loaded model (``djp._APP_DFINE``) when
    its key matches ``model_key``, which avoids a duplicate load. Otherwise
    the checkpoint is loaded onto ``DEVICE`` and cached in ``_PERSON``; an
    unknown ``model_key`` falls back to the default checkpoint.

    Returns:
        ``(model_key, processor, model, person_ids)``.
    """
    global _PERSON
    # Reuse the classify tab's model if it's the same checkpoint.
    if djp._APP_DFINE is not None and djp._APP_DFINE[0] == model_key:
        _, proc, mdl, _ids = djp._APP_DFINE
        return model_key, proc, mdl, _get_person_label_ids(mdl)
    if _PERSON is None or _PERSON[0] != model_key:
        model_id = DFINE_MODEL_IDS.get(model_key, DFINE_MODEL_IDS[DEFAULT_PERSON_MODEL])
        print(f"[*] Loading person detector ({model_id})...")
        proc = AutoImageProcessor.from_pretrained(model_id)
        mdl = DFineForObjectDetection.from_pretrained(model_id).to(DEVICE).eval()
        _PERSON = (model_key, proc, mdl, _get_person_label_ids(mdl))
        print("[*] Person detector loaded.")
    return _PERSON


def _contain_frac(inner, outer):
    """Fraction of ``inner`` box area that lies inside ``outer`` box.

    Boxes are ``(x1, y1, x2, y2)``. The denominator is floored at ``1e-6`` so
    a zero-area ``inner`` box yields 0.0 instead of dividing by zero.
    """
    ix1, iy1 = max(inner[0], outer[0]), max(inner[1], outer[1])
    ix2, iy2 = min(inner[2], outer[2]), min(inner[3], outer[3])
    iw, ih = max(0.0, ix2 - ix1), max(0.0, iy2 - iy1)
    inter = iw * ih
    area = max(1e-6, (inner[2] - inner[0]) * (inner[3] - inner[1]))
    return inter / area


def _box_iou(a, b):
    """Intersection-over-union of two ``(x1, y1, x2, y2)`` boxes (0.0 if the union is empty)."""
    ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
    ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0.0, ix2 - ix1) * max(0.0, iy2 - iy1)
    union = (a[2] - a[0]) * (a[3] - a[1]) + (b[2] - b[0]) * (b[3] - b[1]) - inter
    return inter / union if union > 0 else 0.0


def _dedup_ppe(dets, iou=DEDUP_IOU):
    """Drop near-duplicate detections of the SAME class (same item caught in
    two overlapping person crops). Keep the highest-scoring box.

    ``dets`` is an iterable of ``(name, score, box)``; the result is a list in
    descending score order.
    """
    kept = []
    for name, s, box in sorted(dets, key=lambda d: -d[1]):
        if any(n == name and _box_iou(box, kb) >= iou for n, _, kb in kept):
            continue
        kept.append((name, s, box))
    return kept


def _color_of(name):
    """Palette colour for a PPE class name; neutral grey for unknown names."""
    return PALETTE[ALL_PPE.index(name) % len(PALETTE)] if name in ALL_PPE else (150, 150, 150)


def _padded_crop_box(box, pad, width, height):
    """Grow ``box`` by ``pad`` (a fraction of its own size) and clip it to the image.

    Returns integer ``(cx1, cy1, cx2, cy2)``. The result can be degenerate
    (``cx2 <= cx1`` or ``cy2 <= cy1``) when the box lies off the image or is
    thinner than a pixel; callers must check before cropping.
    """
    x1, y1, x2, y2 = box
    bw, bh = x2 - x1, y2 - y1
    cx1 = max(0, int(x1 - pad * bw))
    cy1 = max(0, int(y1 - pad * bh))
    cx2 = min(width, int(x2 + pad * bw))
    cy2 = min(height, int(y2 + pad * bh))
    return cx1, cy1, cx2, cy2


def _attribute_ppe(ppe, people, assoc):
    """Credit each PPE detection to at most ONE person (mutates ``people``).

    Among the persons whose box contains the item (containment >= ``assoc``),
    pick the one where it sits closest to the anatomically expected height
    (``PPE_VFRAC``: helmet near the top, shoes near the bottom, ...). This
    stops a helmet that lies in the MIDDLE of a tall overlapping neighbour
    from being credited to them when it's really at the TOP (head) of the
    person beside them. It's a tie-break among containers, not a hard reject,
    so a uniquely-contained item is still credited regardless of pose. Per
    person and class only the highest-scoring item is kept, stored as
    ``person["present"][name] = (score, box)``.
    """
    for name, score, box in ppe:
        target = PPE_VFRAC.get(name, 0.5)
        cy = 0.5 * (box[1] + box[3])
        best_i, best_d = -1, None
        for i, person in enumerate(people):
            pb = person["box"]
            if _contain_frac(box, pb) < assoc:
                continue
            # Distance between the item's relative height in this person's
            # box and where that class is expected to sit.
            d = abs((cy - pb[1]) / max(1e-6, pb[3] - pb[1]) - target)
            if best_d is None or d < best_d:
                best_i, best_d = i, d
        if best_i >= 0:
            cur = people[best_i]["present"].get(name)
            if cur is None or score > cur[0]:
                people[best_i]["present"][name] = (score, box)


def _draw_overview(im, ppe, verdicts, min_side):
    """Render the overview: thin class-coloured PPE boxes plus numbered person boxes.

    The image is upscaled (never downscaled) so its longest side is at least
    ``min_side``; box coordinates are scaled to match. Person boxes are green
    when OK and red on violation. Returns a new image; ``im`` is not modified.
    """
    scale = max(1.0, min_side / max(im.size))
    overview = im
    boxes = [(i, [v * scale for v in p["box"]], ok) for i, p, _present, _missing, ok in verdicts]
    if scale > 1.0:
        overview = im.resize((round(im.size[0] * scale), round(im.size[1] * scale)), Image.LANCZOS)
    overview = overview.copy()
    draw = ImageDraw.Draw(overview)
    ow, oh = overview.size
    tag_font = _load_font(max(16, int(0.030 * max(ow, oh))))
    person_w = max(3, int(0.006 * max(ow, oh)))

    # PPE item boxes first (thin), so the thicker person boxes sit on top.
    ppe_w = max(2, int(0.004 * max(ow, oh)))
    for name, _score, box in ppe:
        b = [int(v * scale) for v in box]
        draw.rectangle(b, outline=_color_of(name), width=ppe_w)

    for i, box, ok in boxes:
        col = GREEN if ok else RED
        x1, y1, x2, y2 = [int(v) for v in box]
        draw.rectangle([x1, y1, x2, y2], outline=col, width=person_w)
        tag = f"#{i}"
        tb = draw.textbbox((0, 0), tag, font=tag_font)
        tw, th = tb[2] - tb[0], tb[3] - tb[1]
        # Place the tag above the box, clamped so it stays inside the image.
        ty = max(0, y1 - th - 6)
        draw.rectangle([x1, ty, x1 + tw + 8, ty + th + 6], fill=col)
        draw.text((x1 + 4, ty + 2), tag, font=tag_font, fill=(255, 255, 255))
    return overview


@torch.no_grad()
def run_ppe_compliance(
    image,
    person_threshold=0.75,
    ppe_threshold=0.25,
    assoc=0.5,
    person_model=DEFAULT_PERSON_MODEL,
    min_side=960,
):
    """Detect persons + PPE and flag per-person missing PPE (all 6 required).

    Args:
        image: a PIL image or an array accepted by ``Image.fromarray``;
            ``None`` returns the "Upload an image." placeholder result.
        person_threshold: score threshold passed to the person detector.
        ppe_threshold: score threshold passed to the PPE detector.
        assoc: minimum fraction of a PPE box that must lie inside a person box
            for that person to be a candidate owner.
        person_model: key selecting the person-detector checkpoint.
        min_side: the overview is upscaled so its longest side is at least this.

    Returns:
        ``(overview_PIL, person_cards, status_text)`` where ``overview_PIL``
        shows the thin PPE item boxes plus the numbered person boxes
        (green=OK / red=violation), ``person_cards`` is a list of
        ``(crop_with_checklist_PIL, caption)`` for a per-person gallery, and
        ``status_text`` is the plain-text per-person report.
    """
    if image is None:
        return None, [], "Upload an image."
    required = list(ALL_PPE)  # all 6 items are required
    im = image.convert("RGB") if isinstance(image, Image.Image) else Image.fromarray(image).convert("RGB")
    W, H = im.size

    # 1) Person detector (reused HF D-FINE) — keep only 'person' above threshold.
    # Use the model's own device (it may be reused from the Detect & Classify tab,
    # which could be on GPU); run_dfine moves inputs to that device.
    _, proc, mdl, person_ids = _get_person_model(person_model)
    pdev = str(next(mdl.parameters()).device)
    dets = run_dfine(im, proc, mdl, pdev, person_threshold)
    persons = [(d["conf"], d["box"]) for d in dets if d["cls"] in person_ids]
    # Keep the most confident persons if the frame is crowded (cap PPE passes).
    persons.sort(key=lambda ps: -ps[0])
    persons = persons[:MAX_PERSON_CROPS]

    # 2) PPE detector (fine-tuned D-FINE-M), run on EACH person crop so small PPE
    # (vests, distant helmets) isn't lost to the 640x640 full-frame resize. Boxes
    # are mapped back to full-image coords; the union is de-duplicated across crops.
    people = [{"score": conf, "box": pb, "present": {}} for conf, pb in persons]
    ppe = []  # (name, score, full_box)
    for _, pb in persons:
        cx1, cy1, cx2, cy2 = _padded_crop_box(pb, PERSON_CROP_PAD, W, H)
        if cx2 <= cx1 or cy2 <= cy1:
            continue  # nothing to crop; the person is still reported
        crop = im.crop((cx1, cy1, cx2, cy2))
        for name, s, b in detect_ppe_boxes(crop, threshold=ppe_threshold):
            ppe.append((name, s, [b[0] + cx1, b[1] + cy1, b[2] + cx1, b[3] + cy1]))
    ppe = _dedup_ppe(ppe)

    # 3) Attribute each PPE item to EXACTLY ONE person (containment gate plus
    # expected-height tie-break; see _attribute_ppe).
    _attribute_ppe(ppe, people, assoc)

    # 4) Verdicts + report.
    verdicts = []
    lines = [f"{len(people)} person(s), {len(ppe)} PPE detection(s). Required: {required}", ""]
    for i, p in enumerate(people):
        present = set(p["present"])
        missing = [c for c in required if c not in present]
        verdicts.append((i, p, present, missing, not missing))
        status = "OK" if not missing else "VIOLATION"
        lines.append(f"Person #{i} (score {p['score']:.2f}) [{status}]")
        lines.append(f" present: {sorted(present) or '-'}")
        lines.append(f" missing: {missing or '-'}")
    if not people:
        lines.append("No persons detected — lower the person threshold or try another image.")

    # 5) Build per-person CARDS first (crop + PPE boxes + checklist beside it),
    # off the original image, before the overview is upscaled.
    cards = [_person_card(im, i, p, ok) for i, p, _present, _missing, ok in verdicts]

    # 6) Overview image: thin coloured boxes for the PPE items plus the numbered
    # person boxes. No checklist text on top; that 1/0 detail lives in the cards.
    overview = _draw_overview(im, ppe, verdicts, min_side)
    return overview, cards, "\n".join(lines)


def _person_card(im, idx, person, ok, pad=0.12):
    """One person's crop with PPE boxes drawn, plus a checklist column beside
    it (item 1 = present / 0 = missing).

    Args:
        im: the full RGB image the person box refers to.
        idx: person number shown in the card header and caption.
        person: dict with ``"box"`` and ``"present"`` (``name -> (score, box)``).
        ok: ``True`` when the person has no missing items.
        pad: fraction of the person box added on each side before cropping.

    Returns:
        ``(PIL_card, caption)`` for a gallery.
    """
    W, H = im.size
    cx1, cy1, cx2, cy2 = _padded_crop_box(person["box"], pad, W, H)
    if cx2 <= cx1 or cy2 <= cy1:
        # Off-image or sub-pixel box: same condition the detection loop skips.
        # Use a 1x1 placeholder so the checklist panel is still produced
        # instead of handing an invalid rectangle to ``im.crop``.
        crop = Image.new("RGB", (1, 1), (245, 245, 245))
    else:
        crop = im.crop((cx1, cy1, cx2, cy2)).convert("RGB")
    cw, ch = crop.size

    # Draw the present items' boxes on the crop (with class colour + score).
    cd = ImageDraw.Draw(crop)
    cf = _load_font(max(12, int(0.045 * max(cw, ch))))
    outline_w = max(2, int(0.006 * max(cw, ch)))
    for name, (s, box) in person["present"].items():
        # Full-image coordinates -> crop coordinates.
        b = [box[0] - cx1, box[1] - cy1, box[2] - cx1, box[3] - cy1]
        col = _color_of(name)
        cd.rectangle(b, outline=col, width=outline_w)
        t = f"{name} {s:.2f}"
        tb = cd.textbbox((0, 0), t, font=cf)
        tw, th = tb[2] - tb[0], tb[3] - tb[1]
        ty = max(0, b[1] - th - 3)
        cd.rectangle([b[0], ty, b[0] + tw + 5, ty + th + 3], fill=col)
        cd.text((b[0] + 2, ty + 1), t, font=cf, fill=(255, 255, 255))

    # Build the checklist panel to the RIGHT of the crop.
    present = set(person["present"])
    pf = _load_font(max(14, int(ch / 14)))
    line_h = pf.getbbox("Hg1")[3] + 6
    # First row is the header (pres is None); the rest are one row per PPE class.
    rows = [(f"#{idx} {'OK' if ok else 'VIOLATION'}", None)] + [(it, it in present) for it in ALL_PPE]
    txts = [lab if pres is None else f"{lab} {1 if pres else 0}" for lab, pres in rows]
    panel_pad = 12
    # ``cd`` is only used to measure text here; nothing is drawn.
    panel_w = max(cd.textbbox((0, 0), t, font=pf)[2] for t in txts) + 2 * panel_pad
    panel_h = line_h * len(rows) + 2 * panel_pad
    card_h = max(ch, panel_h)
    card = Image.new("RGB", (cw + panel_w, card_h), (245, 245, 245))
    card.paste(crop, (0, (card_h - ch) // 2))
    pd = ImageDraw.Draw(card)
    x0 = cw + panel_pad
    y0 = (card_h - panel_h) // 2 + panel_pad
    for k, (_lab, pres) in enumerate(rows):
        col = (GREEN if ok else RED) if pres is None else (GREEN if pres else RED)
        pd.text((x0, y0 + k * line_h), txts[k], font=pf, fill=col)

    caption = f"#{idx} — " + ("OK" if ok else "VIOLATION: no " + ", ".join(c for c in ALL_PPE if c not in present))
    return (card, caption)