"""
Calls the Modal endpoint, parses Qwen's JSON reply, and rescales bbox
coordinates back into the ORIGINAL image space.

The most delicate piece is the coordinate rescale: Qwen2.5-VL returns boxes in
the smart-resized space, NOT in the original pixel space. The backend reports
the resized dimensions it actually fed to the model (`resized_size`) and
`investigate` uses those factors; `_resized_dims` is a client-side replica of
qwen-vl-utils' smart_resize that the pipeline in this module does not call.
"""

from __future__ import annotations

import base64
import io
import json
import math
import os
import re
import traceback
from dataclasses import asdict, dataclass, replace
from typing import Any

import requests
from PIL import Image

from prompts import (
    CLOSEUP_SYSTEM_PROMPT,
    CLOSEUP_USER_PROMPT,
    DETECTIVE_SYSTEM_PROMPT,
    USER_PROMPT,
)

# Same defaults as the Modal backend — KEEP IN SYNC.
QWEN_FACTOR = 28
QWEN_MIN_PIXELS = 256 * 28 * 28
QWEN_MAX_PIXELS = 5120 * 28 * 28  # keep in sync with the backend MAX_PIXELS

VALID_SEVERITY = {"low", "medium", "high", "capital"}
VALID_GRADES = {"A", "B", "C", "D", "F"}

# Ordering used wherever charges are ranked; unknown severities rank as "medium".
_SEVERITY_RANK = {"low": 0, "medium": 1, "high": 2, "capital": 3}

# Shareable case ids are interpolated into URLs, so they are matched with
# `fullmatch`: a `^...$` pattern under `re.match` would also accept an id that
# ends in a newline.
_CASE_ID_RE = re.compile(r"[A-Za-z0-9_-]{4,32}")

# Default pixel tolerances for `_collapse_stacks` (same x1 / same width).
_STACK_X_TOL = 0.02 * 4000
_STACK_WIDTH_TOL = 0.04 * 4000


# ---------------------------------------------------------------------------
# Data
# ---------------------------------------------------------------------------

@dataclass
class Evidence:
    id: int
    bbox: tuple[int, int, int, int]  # in ORIGINAL image pixel space
    crime: str
    testimony: str
    severity: str
    fix: str = ""  # the Inspector's remedy — one concrete suggested correction


@dataclass
class CaseFile:
    case_number: str
    case_title: str
    scene_summary: str
    evidence: list[Evidence]
    verdict: str
    grade: str
    closing_statement: str


# ---------------------------------------------------------------------------
# Network
# ---------------------------------------------------------------------------

def _endpoint_url() -> str:
    """Return the backend base URL from MODAL_ENDPOINT_URL, without a trailing slash.

    Raises:
        RuntimeError: if the variable is unset/empty or is not an https:// URL.
    """
    url = os.environ.get("MODAL_ENDPOINT_URL", "").rstrip("/")
    if not url:
        raise RuntimeError(
            "MODAL_ENDPOINT_URL is not set. Configure it as a Space secret "
            "(or `export MODAL_ENDPOINT_URL=...` locally)."
        )
    # Never POST user screenshots over an unencrypted/misconfigured endpoint.
    if not url.startswith("https://"):
        raise RuntimeError("MODAL_ENDPOINT_URL must be an https:// URL")
    return url


def _image_to_b64(img: Image.Image) -> str:
    """Encode `img` as an RGB PNG and return it as an ASCII base64 string."""
    buf = io.BytesIO()
    img.convert("RGB").save(buf, format="PNG")
    return base64.b64encode(buf.getvalue()).decode("ascii")


def _call_modal(
    image_b64: str,
    *,
    system_prompt: str = DETECTIVE_SYSTEM_PROMPT,
    user_prompt: str = USER_PROMPT,
    max_tokens: int = 1024,
    temperature: float = 0.2,
) -> dict[str, Any]:
    """POST one image + prompts to the backend's /investigate route.

    The X-Token header is sent only when INVESTIGATE_TOKEN is set. Returns the
    decoded JSON body; raises `requests.HTTPError` on a non-2xx response.
    """
    url = f"{_endpoint_url()}/investigate"
    headers = {}
    token = os.environ.get("INVESTIGATE_TOKEN", "")
    if token:
        headers["X-Token"] = token
    resp = requests.post(
        url,
        json={
            "image_b64": image_b64,
            "system_prompt": system_prompt,
            "user_prompt": user_prompt,
            "max_tokens": max_tokens,
            "temperature": temperature,
        },
        headers=headers,
        timeout=180,
    )
    resp.raise_for_status()
    return resp.json()


# ---------------------------------------------------------------------------
# Shareable case storage (Modal Volume via the backend endpoints)
# ---------------------------------------------------------------------------

def _valid_case_id(cid: Any) -> bool:
    """True if `cid` is a string of 4-32 URL-safe characters and nothing else."""
    return isinstance(cid, str) and _CASE_ID_RE.fullmatch(cid) is not None


def _evidence_from_dict(raw: dict[str, Any]) -> Evidence:
    """Rebuild an Evidence from its stored dict form.

    JSON has no tuple type, so the stored bbox comes back as a list; it is
    normalised here to the 4-int tuple the dataclass declares. Raises on a
    missing/malformed bbox or unexpected keys (the caller treats that as a
    failed fetch).
    """
    fields = dict(raw)
    bbox = tuple(int(v) for v in fields["bbox"])
    if len(bbox) != 4:
        raise ValueError("stored bbox must have exactly 4 coordinates")
    fields["bbox"] = bbox
    return Evidence(**fields)


def save_case(image: Image.Image, case: "CaseFile") -> str | None:
    """Persist a result so it gets a unique shareable id.

    Returns the id, or None on any failure (sharing then gracefully falls back
    to the tool link).
    """
    try:
        # downscale the clean image so the stored payload stays small
        im = image.convert("RGB")
        im.thumbnail((1400, 1400))
        buf = io.BytesIO()
        im.save(buf, format="JPEG", quality=80)
        image_b64 = base64.b64encode(buf.getvalue()).decode("ascii")
        resp = requests.post(
            f"{_endpoint_url()}/save_case",
            json={"image_b64": image_b64, "case": asdict(case)},
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json().get("id")
    except Exception:
        # Deliberately best-effort: sharing is optional.
        return None


def fetch_case(cid: str) -> tuple[Image.Image, "CaseFile"] | None:
    """Load a shared case by id -> (clean image, CaseFile). None on failure."""
    if not _valid_case_id(cid):
        return None
    try:
        resp = requests.get(f"{_endpoint_url()}/case/{cid}", timeout=30)
        resp.raise_for_status()
        data = resp.json()
        b64 = data["image_b64"]
        if len(b64) > 12_000_000:  # ~9 MB decoded — guard against memory blowups
            return None
        img = Image.open(io.BytesIO(base64.b64decode(b64))).convert("RGB")
        c = data["case"]
        case = CaseFile(
            case_number=c["case_number"],
            case_title=c["case_title"],
            scene_summary=c["scene_summary"],
            evidence=[_evidence_from_dict(e) for e in c["evidence"]],
            verdict=c["verdict"],
            grade=c["grade"],
            closing_statement=c["closing_statement"],
        )
        return img, case
    except Exception:
        # Any network, decoding or schema problem means "no such case".
        return None


# ---------------------------------------------------------------------------
# MOST WANTED public board (opt-in). The frontend builds the compact summary +
# thumbnail (it has PIL + the case); the backend just stores/returns it.
# ---------------------------------------------------------------------------

def publish_case(image: Image.Image, case: "CaseFile", case_id: str, site: str = "") -> bool:
    """Opt-in: add this case to the public MOST WANTED board. True on success."""
    if not _valid_case_id(case_id):
        return False
    try:
        thumb = image.convert("RGB")
        thumb.thumbnail((460, 460))
        buf = io.BytesIO()
        thumb.save(buf, format="JPEG", quality=72)
        thumb_b64 = base64.b64encode(buf.getvalue()).decode("ascii")
        summary = {
            "id": case_id,
            "title": case.case_title,
            "site": site,
            "grade": case.grade,
            "n_crimes": len(case.evidence),
            "thumb_b64": thumb_b64,
        }
        resp = requests.post(f"{_endpoint_url()}/publish", json={"summary": summary}, timeout=30)
        resp.raise_for_status()
        return bool(resp.json().get("ok"))
    except Exception:
        return False


def fetch_board() -> list[dict]:
    """The published MOST WANTED cases (summaries). Empty list on any failure."""
    try:
        resp = requests.get(f"{_endpoint_url()}/board", timeout=20)
        resp.raise_for_status()
        cases = resp.json().get("cases", [])
        return cases if isinstance(cases, list) else []
    except Exception:
        return []


# ---------------------------------------------------------------------------
# JSON extraction
# ---------------------------------------------------------------------------

_FENCE_RE = re.compile(r"```(?:json)?\s*(.*?)```", re.DOTALL | re.IGNORECASE)


def _complete_objects(s: str) -> list[str]:
    """Return the source of each complete, balanced top-level {...} object in
    `s`, respecting string literals. Stops at the first top-level ']' (end of
    array)."""
    out: list[str] = []
    depth = 0
    start: int | None = None
    in_str = False
    esc = False
    for i, ch in enumerate(s):
        if in_str:
            # Inside a string only the closing quote matters; a backslash
            # protects whatever character follows it.
            if esc:
                esc = False
            elif ch == "\\":
                esc = True
            elif ch == '"':
                in_str = False
            continue
        if ch == '"':
            in_str = True
        elif ch == "{":
            if depth == 0:
                start = i
            depth += 1
        elif ch == "}":
            if depth > 0:
                depth -= 1
                if depth == 0 and start is not None:
                    out.append(s[start : i + 1])
                    start = None
        elif ch == "]" and depth == 0:
            break
    return out


def _salvage_truncated(text: str) -> dict[str, Any] | None:
    """Best-effort recovery when the model's JSON was cut off mid-output (dense
    pages can blow the token budget).

    Recover the leading scalar fields and every COMPLETE evidence object; fill
    anything missing with safe defaults. Returns None when there is no '{' or
    no complete evidence object could be parsed.
    """
    start = text.find("{")
    if start == -1:
        return None
    body = text[start:]

    def _field(name: str) -> str | None:
        m = re.search(rf'"{name}"\s*:\s*"((?:[^"\\]|\\.)*)"', body)
        if not m:
            return None
        raw = m.group(1)
        # The capture is still JSON-escaped (\" \n \uXXXX); decode it so the
        # salvaged text matches what a full parse would have produced.
        # strict=False tolerates raw control characters inside the string.
        try:
            return json.loads(f'"{raw}"', strict=False)
        except json.JSONDecodeError:
            return raw

    ev_start = body.find('"evidence"')
    objs: list[dict] = []
    if ev_start != -1:
        bracket = body.find("[", ev_start)
        if bracket != -1:
            for src in _complete_objects(body[bracket + 1 :]):
                try:
                    objs.append(json.loads(src))
                except json.JSONDecodeError:
                    pass  # a balanced but malformed object: skip just that one
    if not objs:
        return None
    return {
        "case_number": _field("case_number") or "0000",
        "case_title": _field("case_title") or "Untitled Case",
        "scene_summary": _field("scene_summary") or "",
        "evidence": objs,
        "verdict": _field("verdict") or "GUILTY",
        "grade": _field("grade") or "F",
        "closing_statement": _field("closing_statement") or "",
    }


def _extract_json(text: str) -> dict[str, Any]:
    """Qwen sometimes wraps the JSON in ```json ... ``` or adds chatter. Be lenient.

    Always returns a dict: a reply that parses to anything else (null, a list,
    a bare string) is treated as unparsed and passed to the later strategies.

    Raises:
        ValueError: if no strategy yields a JSON object.
    """
    text = text.strip()

    # 1. Try direct parse.
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    # 2. Strip fences.
    m = _FENCE_RE.search(text)
    if m:
        try:
            parsed = json.loads(m.group(1).strip())
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            return parsed

    # 3. Parse the object starting at the first '{', ignoring trailing chatter.
    #    raw_decode is a real JSON parse, so braces inside string values cannot
    #    unbalance it. Only the FIRST '{' is tried on purpose: retrying at later
    #    braces would return a nested evidence object when the outer one is
    #    truncated, which is the salvage step's job.
    start = text.find("{")
    if start != -1:
        try:
            parsed, _end = json.JSONDecoder().raw_decode(text, start)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            return parsed

    # 4. Last resort: the output was truncated mid-JSON (dense page hit the token
    #    cap). Salvage the complete evidence objects we did get.
    salvaged = _salvage_truncated(text)
    if salvaged is not None:
        return salvaged

    raise ValueError(f"Could not extract JSON from model output:\n{text[:500]}")


# ---------------------------------------------------------------------------
# Text scrubbing
# ---------------------------------------------------------------------------

# The screenshots we feed the model are JPEG-compressed, and small models love to
# charge the compression itself ("blurred", "glitched" text). Those are photo
# artifacts, not design crimes — recast them deterministically into the design
# language the prompt asks for, so no report (or FLUX brief) ever blames pixels.
_ARTIFACT_RECASTS = [
    (re.compile(r"\bblurr?(?:ed|y|iness)?\b", re.IGNORECASE), "low-contrast"),
    (re.compile(r"\bglitch(?:ed|y|ing|es)?\b", re.IGNORECASE), "visually muddled"),
    (re.compile(r"\bpixel(?:ated|ation)?\b", re.IGNORECASE), "undersized"),
    (re.compile(r"\bdistort(?:ed|ion)?\b", re.IGNORECASE), "cramped"),
    (re.compile(r"\blow[- ]resolution\b", re.IGNORECASE), "low-prominence"),
]


def _scrub_artifacts(text: str) -> str:
    """Apply every artifact recast to `text`, in list order."""
    for pattern, replacement in _ARTIFACT_RECASTS:
        text = pattern.sub(replacement, text)
    return text


def _scrub_crime(text: str) -> str:
    """Scrub artifact words, then upper-case the first character (a recast at
    the start of the title would otherwise leave it lower-case)."""
    s = _scrub_artifacts(text)
    return s[:1].upper() + s[1:] if s else s


def _text(source: dict[str, Any], key: str, default: str = "") -> str:
    """Stripped string form of `source[key]`; `default` when the key is missing
    or JSON null (so a null never renders as the literal text "None")."""
    value = source.get(key)
    return default if value is None else str(value).strip()


# ---------------------------------------------------------------------------
# Coordinate rescaling — the crucial step
# ---------------------------------------------------------------------------

def _iou(a, b) -> float:
    """Intersection-over-union of two [x1,y1,x2,y2] boxes (for de-duplication)."""
    ax1, ay1, ax2, ay2 = a
    bx1, by1, bx2, by2 = b
    ix1, iy1 = max(ax1, bx1), max(ay1, by1)
    ix2, iy2 = min(ax2, bx2), min(ay2, by2)
    iw, ih = max(0, ix2 - ix1), max(0, iy2 - iy1)
    inter = iw * ih
    if inter == 0:
        return 0.0
    area_a = max(0, ax2 - ax1) * max(0, ay2 - ay1)
    area_b = max(0, bx2 - bx1) * max(0, by2 - by1)
    union = area_a + area_b - inter
    return inter / union if union else 0.0


def _valid_bbox(b) -> bool:
    """True if `b` is a 4-item list/tuple of finite numbers (or numeric strings).

    Python's json module accepts the non-standard NaN/Infinity literals, and
    those cannot be rounded to pixel coordinates, so they are rejected here.
    """
    if not isinstance(b, (list, tuple)) or len(b) != 4:
        return False
    try:
        return all(math.isfinite(float(v)) for v in b)
    except (TypeError, ValueError, OverflowError):
        return False


def _decide_scale(
    bboxes: list, *, orig_w: int, orig_h: int, qwen_w: int, qwen_h: int
) -> tuple[float, float]:
    """Decide the coordinate space ONCE for the whole response (not per-box).

    Qwen2.5-VL with the chat path returns pixel coords in the smart-resized
    image space. A few prompt conditions make it emit normalized 0-1000
    instead. We detect that across ALL boxes: if every coordinate fits within
    0-1000 while the resized image is clearly larger than 1000 on its long
    side, treat the batch as normalized. Otherwise rescale resized->original.
    This avoids the old per-box magnitude guessing that could misclassify
    individual boxes.

    `qwen_w` and `qwen_h` must be non-zero; entries of `bboxes` that fail
    `_valid_bbox` are ignored.
    """
    coords = [float(v) for b in bboxes if _valid_bbox(b) for v in b]
    if coords and max(coords) <= 1000.0 and max(qwen_w, qwen_h) > 1000:
        return orig_w / 1000.0, orig_h / 1000.0
    return orig_w / qwen_w, orig_h / qwen_h


def _clamp_round(value: float, limit: int) -> int:
    """Round `value` to an int clamped into [0, limit].

    Clamping happens BEFORE rounding so an overflowed (infinite) product
    cannot reach int(); for ordinary values the result equals
    round-then-clamp.
    """
    return int(round(min(max(value, 0.0), float(limit))))


def _rescale_bbox(
    bbox, *, orig_w: int, orig_h: int, sx: float, sy: float
) -> tuple[int, int, int, int]:
    """Map a [x1,y1,x2,y2] to original pixels using a precomputed scale.

    Corners are reordered so x1 <= x2 and y1 <= y2, clamped to the image, and
    boxes narrower/shorter than 16px are grown toward the right/bottom as far
    as the image edge allows.
    """
    x1, y1, x2, y2 = [float(v) for v in bbox]
    x1, x2 = sorted([x1 * sx, x2 * sx])
    y1, y2 = sorted([y1 * sy, y2 * sy])
    x1 = _clamp_round(x1, orig_w - 1)
    x2 = _clamp_round(x2, orig_w - 1)
    y1 = _clamp_round(y1, orig_h - 1)
    y2 = _clamp_round(y2, orig_h - 1)
    if x2 - x1 < 16:
        x2 = min(orig_w - 1, x1 + 16)
    if y2 - y1 < 16:
        y2 = min(orig_h - 1, y1 + 16)
    return (x1, y1, x2, y2)


def _resized_dims(orig_w: int, orig_h: int) -> tuple[int, int]:
    """Replicates qwen-vl-utils smart_resize without importing torchvision.

    Returns (width, height), each a multiple of QWEN_FACTOR, with the pixel
    count pulled toward [QWEN_MIN_PIXELS, QWEN_MAX_PIXELS].

    Raises:
        ValueError: for non-positive dimensions or an aspect ratio above 200.
    """
    factor = QWEN_FACTOR
    min_pixels = QWEN_MIN_PIXELS
    max_pixels = QWEN_MAX_PIXELS
    if orig_w <= 0 or orig_h <= 0:
        raise ValueError("image dimensions must be positive")
    if max(orig_h, orig_w) / min(orig_h, orig_w) > 200:
        raise ValueError("aspect ratio too extreme for qwen smart_resize")
    h_bar = max(factor, round(orig_h / factor) * factor)
    w_bar = max(factor, round(orig_w / factor) * factor)
    if h_bar * w_bar > max_pixels:
        beta = math.sqrt((orig_h * orig_w) / max_pixels)
        h_bar = max(factor, math.floor(orig_h / beta / factor) * factor)
        w_bar = max(factor, math.floor(orig_w / beta / factor) * factor)
    elif h_bar * w_bar < min_pixels:
        beta = math.sqrt(min_pixels / (orig_h * orig_w))
        h_bar = math.ceil(orig_h * beta / factor) * factor
        w_bar = math.ceil(orig_w * beta / factor) * factor
    return int(w_bar), int(h_bar)


# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------

def investigate(image: Image.Image) -> CaseFile:
    """Run the full pipeline on a PIL image.

    Returns a CaseFile with bboxes already in the ORIGINAL image's pixel space.

    Raises:
        ValueError: if the backend's `resized_size` is missing or unusable, or
            the model output contains no recoverable JSON object.
    """
    orig_w, orig_h = image.size
    image_b64 = _image_to_b64(image)
    # Dense pages produce long reports — give the sweep the backend's full token
    # budget so the JSON is less likely to truncate (salvage handles the rest).
    # Low temperature keeps the smaller model consistent and on-format.
    response = _call_modal(image_b64, max_tokens=2048, temperature=0.15)
    raw_text: str = response["text"]

    # The backend's reported resized_size is the SINGLE SOURCE OF TRUTH for the
    # coordinate space — it's exactly what was fed to the model. Never re-derive
    # it on the client (the two smart_resize impls can drift). Require it.
    backend_resized = response.get("resized_size")
    if not backend_resized:
        raise ValueError("backend did not return resized_size; cannot rescale bboxes")
    try:
        qwen_w, qwen_h = int(backend_resized[0]), int(backend_resized[1])
    except (TypeError, ValueError, IndexError, KeyError) as err:
        raise ValueError(
            f"backend returned a malformed resized_size: {backend_resized!r}"
        ) from err
    if qwen_w <= 0 or qwen_h <= 0:
        # Both values are used as divisors when the scale is decided.
        raise ValueError(
            f"backend returned a non-positive resized_size: {backend_resized!r}"
        )

    data = _extract_json(raw_text)
    return _to_case_file(
        data, image=image, orig_w=orig_w, orig_h=orig_h, qwen_w=qwen_w, qwen_h=qwen_h
    )


def _snap_to_content(image: Image.Image, box) -> tuple[int, int, int, int]:
    """Tighten a model-drawn bbox to the actual content inside it.

    The model sometimes returns a box that's *near* the element but offset by a
    few percent (e.g. the box for a section title lands a row above the title).
    This pads the box outward, then crops back to the tightest rectangle around
    edge-dense pixels — so the circle drawn from this bbox actually encircles
    the element instead of empty space next to it.

    Safe-by-default: the original box is returned untouched when it is smaller
    than 12px on a side, when there is too little edge signal, when the snapped
    area would fall below 35% of the original area, or when every edge would
    move by less than 6px.

    NOTE: `_to_case_file` no longer calls this (see the comment there).
    """
    from PIL import ImageFilter
    import numpy as np

    x1, y1, x2, y2 = box
    W, H = image.size
    bw, bh = x2 - x1, y2 - y1
    if bw < 12 or bh < 12:
        return box

    # Search window: original box padded outward by ~25% of its size
    pad_x = int(bw * 0.25)
    pad_y = int(bh * 0.25)
    sx1 = max(0, x1 - pad_x)
    sy1 = max(0, y1 - pad_y)
    sx2 = min(W, x2 + pad_x)
    sy2 = min(H, y2 + pad_y)
    if sx2 - sx1 < 12 or sy2 - sy1 < 12:
        return box

    crop = image.convert("L").crop((sx1, sy1, sx2, sy2))
    edges = np.asarray(crop.filter(ImageFilter.FIND_EDGES), dtype=np.uint8)
    # threshold to "is this pixel on an edge?" mask
    mask = edges > 30
    if mask.sum() < 25:  # too little signal — keep original
        return box

    # tightest rectangle around the edge pixels
    ys = np.where(mask.any(axis=1))[0]
    xs = np.where(mask.any(axis=0))[0]
    if len(xs) == 0 or len(ys) == 0:
        return box
    nx1 = sx1 + int(xs[0])
    nx2 = sx1 + int(xs[-1]) + 1
    ny1 = sy1 + int(ys[0])
    ny2 = sy1 + int(ys[-1]) + 1

    new_area = max(1, (nx2 - nx1) * (ny2 - ny1))
    old_area = bw * bh
    # If the snap would collapse the box too aggressively (model probably
    # intended the wider region), keep the original.
    if new_area < 0.35 * old_area:
        return box
    # If the snap barely moves things, also keep original (no churn)
    if abs(nx1 - x1) < 6 and abs(ny1 - y1) < 6 and abs(nx2 - x2) < 6 and abs(ny2 - y2) < 6:
        return box
    return (nx1, ny1, nx2, ny2)


def _region_is_empty(image: Image.Image, box) -> bool:
    """True if the boxed region has almost no content (uniform background /
    blank space) — catches the model dropping a marker on whitespace. Buttons
    and text have edges and colour variance, so they survive; empty space does
    not. Boxes under 4px on a side count as empty."""
    from PIL import ImageFilter, ImageStat

    x1, y1, x2, y2 = box
    if x2 - x1 < 4 or y2 - y1 < 4:
        return True
    crop = image.convert("RGB").crop((x1, y1, x2, y2))
    # downscale big crops for speed
    crop.thumbnail((160, 160))
    gray = crop.convert("L")
    var = ImageStat.Stat(gray).stddev[0]
    edge = ImageStat.Stat(gray.filter(ImageFilter.FIND_EDGES)).mean[0]
    # both low => flat, featureless region (blank). Tuned conservatively so real
    # flat-coloured buttons (which still have border + label edges) survive.
    return var < 6.0 and edge < 7.0


def _collapse_clusters(evidence: list, orig_w: int, orig_h: int) -> list:
    """Drop charges piled into the same tiny area.

    Smaller models sometimes stack 2-3 boxes on one element (e.g. three charges
    on a single 'Sign up' button); keep only the most severe of each tight
    cluster so the markers stay spread. Two boxes clash when their centres are
    closer than 8% of the image's short side; ties keep the earlier charge.
    """
    if len(evidence) < 2:
        return evidence
    thr = 0.08 * min(orig_w, orig_h)
    kept: list = []
    for ev in evidence:
        ecx, ecy = (ev.bbox[0] + ev.bbox[2]) / 2, (ev.bbox[1] + ev.bbox[3]) / 2
        clash_idx = None
        for i, k in enumerate(kept):
            kcx, kcy = (k.bbox[0] + k.bbox[2]) / 2, (k.bbox[1] + k.bbox[3]) / 2
            if ((ecx - kcx) ** 2 + (ecy - kcy) ** 2) ** 0.5 < thr:
                clash_idx = i
                break
        if clash_idx is None:
            kept.append(ev)
        elif _SEVERITY_RANK.get(ev.severity, 1) > _SEVERITY_RANK.get(kept[clash_idx].severity, 1):
            kept[clash_idx] = ev  # the tighter cluster keeps its worst offender
    return kept


def _collapse_stacks(
    evidence: list,
    *,
    x_tol: float = _STACK_X_TOL,
    width_tol: float = _STACK_WIDTH_TOL,
) -> list:
    """If 3+ boxes share nearly the same x1 and have similar widths, the model
    is enumerating a list/menu (e.g. a Table of Contents), not finding distinct
    crimes. Keep only the first of such a column.

    Only the left edge and the width are compared (within `x_tol` / `width_tol`
    pixels); vertical position is not checked. The defaults are fixed pixel
    values and do not scale with the image.
    """
    if len(evidence) < 3:
        return evidence
    dropped: set[int] = set()
    for i, a in enumerate(evidence):
        if i in dropped:
            continue
        a_width = a.bbox[2] - a.bbox[0]
        col = [
            j
            for j in range(i + 1, len(evidence))
            if j not in dropped
            and abs(evidence[j].bbox[0] - a.bbox[0]) <= x_tol  # ~same x1
            and abs((evidence[j].bbox[2] - evidence[j].bbox[0]) - a_width) <= width_tol  # ~same width
        ]
        if len(col) >= 2:  # a + 2 others = a stacked column
            dropped.update(col)
    return [e for k, e in enumerate(evidence) if k not in dropped]


def _to_case_file(
    data: dict[str, Any],
    *,
    image: Image.Image,
    orig_w: int,
    orig_h: int,
    qwen_w: int,
    qwen_h: int,
) -> CaseFile:
    """Turn the model's parsed JSON into a CaseFile with filtered, rescaled,
    de-duplicated and renumbered evidence. Malformed evidence entries are
    skipped rather than raising."""
    raw = data.get("evidence")
    # The model occasionally emits a non-list here or slips a bare string into
    # the array; only dict items can carry a bbox.
    evidence_raw = [it for it in raw if isinstance(it, dict)] if isinstance(raw, list) else []

    # Decide the coordinate space ONCE for the whole response.
    all_bboxes = [item.get("bbox_2d") or item.get("bbox") for item in evidence_raw]
    sx, sy = _decide_scale(
        all_bboxes, orig_w=orig_w, orig_h=orig_h, qwen_w=qwen_w, qwen_h=qwen_h
    )

    evidence: list[Evidence] = []
    for item in evidence_raw:
        bbox = item.get("bbox_2d") or item.get("bbox")
        if not _valid_bbox(bbox):
            continue  # no usable box -> skip rather than draw a marker at (0,0)
        rescaled = _rescale_bbox(bbox, orig_w=orig_w, orig_h=orig_h, sx=sx, sy=sy)

        # Drop degenerate boxes: a "whole-page" box (>82% of the image) is useless
        # noise, and a sub-pixel box carries no evidence.
        rx1, ry1, rx2, ry2 = rescaled
        frac = ((rx2 - rx1) * (ry2 - ry1)) / float(orig_w * orig_h)
        if frac > 0.82 or frac < 0.00015:
            continue

        # Drop near-duplicate boxes: the model sometimes files two charges on the
        # exact same element.
        if any(_iou(rescaled, e.bbox) > 0.55 for e in evidence):
            continue

        # Drop boxes that landed on blank/whitespace (no real element inside).
        try:
            if _region_is_empty(image, rescaled):
                continue
        except Exception:
            pass  # the blank check is advisory; if it fails, keep the box

        # Snap-to-content was making boxes WORSE on average (it expanded toward
        # external edges instead of tightening to the element). The model's raw
        # bboxes are already accurate; trust them.
        sev = _text(item, "severity", "medium").lower().strip()
        if sev not in VALID_SEVERITY:
            sev = "medium"
        evidence.append(
            Evidence(
                id=0,  # renumbered after stack-collapse below
                bbox=rescaled,
                crime=_scrub_crime(_text(item, "crime", "Unnamed crime")),
                testimony=_scrub_artifacts(_text(item, "testimony")),
                severity=sev,
                fix=_scrub_artifacts(_text(item, "fix")),
            )
        )

    # Collapse list/menu enumerations (e.g. a Table of Contents column) into one,
    # then renumber the survivors 1..N so the markers read cleanly.
    evidence = _collapse_stacks(evidence)
    evidence = _collapse_clusters(evidence, orig_w, orig_h)
    for i, ev in enumerate(evidence, start=1):
        ev.id = i  # these objects were created above, so nothing else holds them

    grade = _text(data, "grade", "F").upper()[:1]
    if grade not in VALID_GRADES:
        grade = "F"

    return CaseFile(
        case_number=_text(data, "case_number", "0000")[:8],
        case_title=_text(data, "case_title", "Untitled Case"),
        scene_summary=_text(data, "scene_summary"),
        evidence=evidence,
        verdict=_text(data, "verdict", "GUILTY"),
        grade=grade,
        closing_statement=_text(data, "closing_statement"),
    )


# ---------------------------------------------------------------------------
# Agentic multi-step investigation: SWEEP -> CLOSE-UP zoom -> assemble
# ---------------------------------------------------------------------------

def _grid_regions(
    W: int, H: int, cols: int, rows: int, overlap: float = 0.10
) -> list[tuple[int, int, int, int]]:
    """Split into a cols×rows grid of overlapping tiles so the model sees every
    quadrant at native resolution (not just the top strip).

    Tiles are (x0, y0, x1, y1) in row-major order, each widened by `overlap`
    of a tile's size on every side and clamped to the image.
    """
    out: list[tuple[int, int, int, int]] = []
    cw, ch = W / cols, H / rows
    ovx, ovy = int(cw * overlap), int(ch * overlap)
    for r in range(rows):
        for c in range(cols):
            x0 = max(0, int(c * cw) - ovx)
            y0 = max(0, int(r * ch) - ovy)
            x1 = min(W, int((c + 1) * cw) + ovx)
            y1 = min(H, int((r + 1) * ch) + ovy)
            out.append((x0, y0, x1, y1))
    return out


def _panorama_pass(image: Image.Image, case: CaseFile) -> CaseFile:
    """Panoramic screenshots (very wide/tall) get under-scanned by a single
    sweep — the model fixates on the top strip. Investigate each region at
    native resolution and merge the NEW crimes in, keeping the global sweep's
    verdict/title/grade.

    Returns `case` itself when the image is small or not panoramic; otherwise a
    new CaseFile holding at most 6 charges, most severe first. The Evidence
    objects of the incoming `case` are never modified.
    """
    W, H = image.size
    if max(W, H) < 2000:
        return case  # small enough that one hi-res sweep already covers it
    ratio = W / H if H else 1.0
    cols = rows = 1
    if ratio >= 1.8:
        cols = 3 if ratio >= 2.6 else 2
    elif ratio <= 1 / 1.8:
        rows = 3 if ratio <= 1 / 2.6 else 2
    else:
        return case  # not panoramic — nothing to do

    # If the image is also big on the other axis, add a row/column so the body
    # (not just the top strip) gets scanned too. This must be an if/elif: with
    # two independent ifs, the row just added to a wide image satisfies the
    # second test and cuts a 3-column split down to 2.
    if cols > 1 and H >= 1100:
        rows = 2
    elif rows > 1 and W >= 1100:
        cols = 2

    tiles = _grid_regions(W, H, cols, rows)
    merged: list[Evidence] = list(case.evidence)
    for (x0, y0, x1, y1) in tiles:
        crop = image.crop((x0, y0, x1, y1))
        try:
            sub = investigate(crop)
        except Exception:
            traceback.print_exc()
            continue  # one failed tile must not sink the whole pass
        for ev in sub.evidence:
            # tile-local coordinates -> full-image coordinates
            bx = (
                max(0, min(W - 1, ev.bbox[0] + x0)),
                max(0, min(H - 1, ev.bbox[1] + y0)),
                max(0, min(W - 1, ev.bbox[2] + x0)),
                max(0, min(H - 1, ev.bbox[3] + y0)),
            )
            if bx[2] - bx[0] < 12 or bx[3] - bx[1] < 12:
                continue
            if any(_iou(bx, e.bbox) > 0.4 for e in merged):
                continue  # already charged by the sweep or another tile
            try:
                if _region_is_empty(image, bx):
                    continue
            except Exception:
                pass
            merged.append(replace(ev, id=0, bbox=bx))

    merged = _collapse_clusters(merged, W, H)
    # keep the worst 6, most severe first
    merged.sort(key=lambda e: _SEVERITY_RANK.get(e.severity, 1), reverse=True)
    # Renumber on copies: some of these objects are shared with `case.evidence`.
    merged = [replace(ev, id=i) for i, ev in enumerate(merged[:6], start=1)]
    return replace(case, evidence=merged)


def investigate_agentic(image: Image.Image, *, max_zoom: int = 3) -> CaseFile:
    """Two-phase investigation that actually *investigates*:

    1. SWEEP — one full-scene pass (the existing pipeline) flags the suspects.
    2. CLOSE-UP — crop + zoom into the most serious suspects and re-examine each
       with a focused prompt to (a) confirm or clear the crime and (b) tighten
       the bbox onto the real element. Cleared suspects are dropped; confirmed
       ones keep the sharper box.

    Robust by design: any close-up failure falls back to the sweep's box, and
    if refinement fails wholesale we still return the sweep. The case narrative
    (title, summary, verdict, grade) comes from the full-scene sweep.

    `max_zoom` caps how many suspects get a close-up; values below 0 are
    treated as 0.
    """
    case = investigate(image)
    case = _panorama_pass(image, case)  # widen coverage on panoramic screenshots
    if not case.evidence:
        return case

    suspects = sorted(
        case.evidence, key=lambda e: _SEVERITY_RANK.get(e.severity, 1), reverse=True
    )[: max(0, max_zoom)]
    suspect_ids = {e.id for e in suspects}

    refined: dict[int, Evidence | None] = {}
    for ev in suspects:
        try:
            refined[ev.id] = _zoom_examine(image, ev)
        except Exception:
            traceback.print_exc()
            refined[ev.id] = ev  # keep the coarse box on any failure

    new_evidence: list[Evidence] = []
    for ev in case.evidence:
        if ev.id in suspect_ids:
            r = refined.get(ev.id, ev)
            if r is not None:  # None => crime cleared on close inspection
                new_evidence.append(r)
        else:
            new_evidence.append(ev)

    if not new_evidence:  # close-up cleared everything — keep the sweep board
        return case

    # Rebuild (don't mutate ids in place — the non-suspect items are shared with
    # `case.evidence`).
    new_evidence = [replace(ev, id=i) for i, ev in enumerate(new_evidence, start=1)]
    return replace(case, evidence=new_evidence)


def _zoom_examine(image: Image.Image, ev: Evidence) -> Evidence | None:
    """Crop+zoom the region around `ev`, re-examine it up close, and return a
    tightened Evidence (bbox in ORIGINAL image space) — or None if cleared.

    Returns `ev` unchanged whenever the refined box is unusable: missing or
    invalid, under 12px on a side, under 22% of the coarse box's area, or
    sitting on a blank region. Network and parsing errors propagate to the
    caller.
    """
    W, H = image.size
    x1, y1, x2, y2 = ev.bbox
    bw, bh = x2 - x1, y2 - y1
    pad_x = int(bw * 0.45) + 20
    pad_y = int(bh * 0.45) + 20
    cx1, cy1 = max(0, x1 - pad_x), max(0, y1 - pad_y)
    cx2, cy2 = min(W, x2 + pad_x), min(H, y2 + pad_y)
    if cx2 - cx1 < 16 or cy2 - cy1 < 16:
        return ev

    crop = image.crop((cx1, cy1, cx2, cy2))
    ocw, och = crop.size
    # Upscale small crops so the vision encoder sees real detail (better
    # grounding): crops are enlarged to a 1280px long side, larger crops are
    # sent as-is.
    # NOTE(review): an earlier comment here cited a ~960px target and a ~1.0 MP
    # backend cap, which matches neither this constant nor QWEN_MAX_PIXELS —
    # confirm the intended target against the backend.
    upscale = 1.0
    sent = crop
    if max(ocw, och) < 1280:
        upscale = 1280.0 / max(ocw, och)
        sent = crop.resize((max(1, round(ocw * upscale)), max(1, round(och * upscale))))
    sent_w, sent_h = sent.size

    resp = _call_modal(
        _image_to_b64(sent),
        # Braces in the crime text are doubled so str.format leaves them alone.
        system_prompt=CLOSEUP_SYSTEM_PROMPT.format(
            crime=ev.crime.replace("{", "{{").replace("}", "}}")
        ),
        user_prompt=CLOSEUP_USER_PROMPT,
        max_tokens=400,
        temperature=0.1,
    )
    data = _extract_json(resp["text"])

    if data.get("confirmed") is False:
        return None  # cleared on close inspection

    bbox = data.get("bbox_2d") or data.get("bbox")
    if not _valid_bbox(bbox):
        return ev  # no usable refined box -> keep the coarse one

    qwen = resp.get("resized_size") or [sent_w, sent_h]
    qw = int(qwen[0]) or sent_w
    qh = int(qwen[1]) or sent_h
    # Close-up always maps resized->sent pixels DIRECTLY. The sweep's 0-1000
    # normalized-coords heuristic is unreliable on a single small-crop box (it can
    # misread real pixel coords <=1000 as normalized), so skip it here and trust the
    # resized space the backend reported.
    sb = _rescale_bbox(
        bbox, orig_w=sent_w, orig_h=sent_h, sx=sent_w / qw, sy=sent_h / qh
    )  # SENT space

    # SENT -> original crop (undo upscale) -> full image (add crop origin)
    fx1, fx2 = sorted((cx1 + sb[0] / upscale, cx1 + sb[2] / upscale))
    fy1, fy2 = sorted((cy1 + sb[1] / upscale, cy1 + sb[3] / upscale))
    nb = (
        max(0, min(W - 1, int(round(fx1)))),
        max(0, min(H - 1, int(round(fy1)))),
        max(0, min(W - 1, int(round(fx2)))),
        max(0, min(H - 1, int(round(fy2)))),
    )
    if nb[2] - nb[0] < 12 or nb[3] - nb[1] < 12:
        return ev

    # Don't let the close-up collapse a deliberately BROAD charge (e.g. "the whole
    # category sea") into a tiny element. If the refined box is a small fraction of
    # the coarse one, the sweep's wider box was the intended evidence — keep it.
    coarse_area = max(1, (x2 - x1) * (y2 - y1))
    new_area = (nb[2] - nb[0]) * (nb[3] - nb[1])
    if new_area < 0.22 * coarse_area:
        return ev

    try:
        if _region_is_empty(image, nb):
            return ev
    except Exception:
        pass  # the blank check is advisory; if it fails, accept the refined box

    crime = _scrub_crime(str(data.get("crime") or ev.crime).strip() or ev.crime)
    testimony = _scrub_artifacts(str(data.get("testimony") or ev.testimony).strip() or ev.testimony)
    fix = _scrub_artifacts(str(data.get("fix") or ev.fix).strip())
    sev = str(data.get("severity", ev.severity)).lower().strip()
    if sev not in VALID_SEVERITY:
        sev = ev.severity
    return Evidence(id=ev.id, bbox=nb, crime=crime, testimony=testimony, severity=sev, fix=fix)