#!/usr/bin/env python3
"""Run a 6-output detection axmodel on a single image and save the result.

Steps performed by :func:`main`: letterbox the image to the model input
size, run the session, decode three (class, box) output pairs at the strides
listed in ``STRIDES``, apply per-class NMS, map the boxes back to source-image
coordinates, log the detections and write an annotated copy of the image.
"""
import argparse
import logging
import os
import sys
import time

import cv2
import numpy as np
import axengine as ort

logging.basicConfig(
    level=logging.DEBUG,
    format='[%(name)s] [%(asctime)s.%(msecs)03d] [%(levelname)s] %(message)s',
    datefmt='%H:%M:%S',
)
logger = logging.getLogger("Aquarium-YOLOv8-6way")

# Default score / IoU thresholds used by the command-line options.
PROB_THRESHOLD = 0.45
NMS_THRESHOLD = 0.45
# Number of bins per box side in the box-regression output (4 * REG_MAX channels).
REG_MAX = 16
# One (cls, bbox) output pair is expected per stride.
STRIDES = (8, 16, 32)

DEFAULT_NAMES = ["fish", "turtle", "shrimp", "crab", "snail"]
# Colour tuples handed to the cv2 drawing calls, indexed by class id.
DEFAULT_COLORS = [
    (56, 56, 255),
    (151, 157, 255),
    (31, 112, 255),
    (29, 178, 255),
    (49, 210, 207),
]


def infer_hw_layout(shape):
    """Guess ``(height, width, layout)`` from a model input shape.

    A 4-D shape whose last dimension is 3 is treated as NHWC; otherwise a
    4-D shape whose second dimension is 3 is treated as NCHW.  Falsy
    height/width entries (``None`` or ``0``) fall back to 640, and any other
    shape yields ``(640, 640, "NCHW")``.
    """
    shape = list(shape)
    if len(shape) == 4 and shape[-1] == 3:
        h = int(shape[1] or 640)
        w = int(shape[2] or 640)
        return h, w, "NHWC"
    if len(shape) == 4 and shape[1] == 3:
        h = int(shape[2] or 640)
        w = int(shape[3] or 640)
        return h, w, "NCHW"
    return 640, 640, "NCHW"


def letterbox(bgr, dst_h, dst_w, pad_value=114):
    """Resize ``bgr`` to fit ``dst_h`` x ``dst_w`` keeping aspect ratio, then pad.

    The image is scaled by the largest factor that fits both dimensions and
    centred; the remaining border is filled with ``pad_value`` on every
    channel.

    Returns:
        ``(out, meta)`` where ``out`` is the padded image and ``meta`` holds
        the source/destination sizes, the scale and the top/left padding
        needed by :func:`unletterbox`.
    """
    h, w = bgr.shape[:2]
    scale = min(dst_h / h, dst_w / w)
    # Clamp to 1 px: for extreme aspect ratios the short side can round to 0,
    # which cv2.resize rejects.
    new_h = max(1, int(round(h * scale)))
    new_w = max(1, int(round(w * scale)))
    resized = cv2.resize(bgr, (new_w, new_h), interpolation=cv2.INTER_LINEAR)

    top = (dst_h - new_h) // 2
    bot = dst_h - new_h - top
    left = (dst_w - new_w) // 2
    right = dst_w - new_w - left
    out = cv2.copyMakeBorder(
        resized, top, bot, left, right,
        cv2.BORDER_CONSTANT,
        value=(pad_value, pad_value, pad_value),
    )
    meta = {
        "src_h": h,
        "src_w": w,
        "dst_h": dst_h,
        "dst_w": dst_w,
        "scale": scale,
        "pad_top": top,
        "pad_left": left,
    }
    return out, meta


def _to_hwc(t, c_expected):
    """Return the first batch item of ``t`` as an (H, W, C) array.

    A 3-D input gets a leading batch axis first.  The channel axis is located
    by matching ``c_expected`` against the last axis (channels-last, checked
    first) or axis 1 (channels-first, transposed to channels-last).

    Raises:
        ValueError: if neither axis has ``c_expected`` entries.
    """
    a = np.asarray(t)
    if a.ndim == 3:
        a = a[None, ...]
    if a.shape[-1] == c_expected:
        return a[0]
    if a.shape[1] == c_expected:
        return np.transpose(a[0], (1, 2, 0))
    raise ValueError(f"unexpected shape {a.shape!r} for C={c_expected}")


def group_outputs(out_names, outs, cls_num):
    """Pair up model outputs as ``{stride: (cls_hwc, bbox_hwc)}``.

    If every ``stride_<s>_cls`` / ``stride_<s>_bbox`` name is present the
    outputs are matched by name.  Otherwise they are classified by channel
    count (``cls_num`` vs ``4 * REG_MAX``), ordered from largest to smallest
    tensor and assigned to ``STRIDES`` in that order.

    Raises:
        ValueError: if the fallback does not find exactly 3 class and 3 box
            tensors, or a tensor has no axis with the expected channel count.
    """
    name_to_arr = dict(zip(out_names, outs))
    by_stride = {}

    named = all(
        f"stride_{s}_{suf}" in name_to_arr
        for s in STRIDES
        for suf in ("cls", "bbox")
    )
    if named:
        for s in STRIDES:
            by_stride[s] = (
                _to_hwc(name_to_arr[f"stride_{s}_cls"], cls_num),
                _to_hwc(name_to_arr[f"stride_{s}_bbox"], 4 * REG_MAX),
            )
        return by_stride

    cls_outs, bb_outs = [], []
    for t in outs:
        a = np.asarray(t)
        if a.ndim == 3:
            a = a[None, ...]
        c_last, c_first = a.shape[-1], a.shape[1]
        if cls_num in (c_last, c_first):
            cls_outs.append(a)
        elif (4 * REG_MAX) in (c_last, c_first):
            bb_outs.append(a)

    # Within each group the channel count is fixed, so total element count
    # orders the tensors by spatial size for either memory layout.
    cls_outs.sort(key=lambda x: -x.size)
    bb_outs.sort(key=lambda x: -x.size)
    if len(cls_outs) != 3 or len(bb_outs) != 3:
        raise ValueError(
            f"expected 3 cls + 3 bbox, got {len(cls_outs)} cls + {len(bb_outs)} bbox"
        )
    for s, ct, bt in zip(STRIDES, cls_outs, bb_outs):
        by_stride[s] = (_to_hwc(ct, cls_num), _to_hwc(bt, 4 * REG_MAX))
    return by_stride


def decode_one_scale(stride, cls_hwc, bbox_hwc, prob_thr, dst_h, dst_w):
    """Decode one output scale into boxes, scores and labels.

    Args:
        stride: pixel size of one grid cell at this scale.
        cls_hwc: (H, W, num_classes) class logits (a sigmoid is applied here).
        bbox_hwc: (H, W, 4 * REG_MAX) per-side bin logits.
        prob_thr: minimum class probability for a cell to be kept.
        dst_h, dst_w: network input size; boxes are clipped to it.

    Returns:
        ``(boxes, probs, labels)``: float32 (N, 4) xyxy boxes in network-input
        pixels, float32 (N,) probabilities and int32 (N,) class ids.

    Raises:
        ValueError: if ``bbox_hwc`` does not match the grid of ``cls_hwc`` or
            does not have ``4 * REG_MAX`` channels.
    """
    hf, wf, _ = cls_hwc.shape
    if bbox_hwc.shape[:2] != (hf, wf) or bbox_hwc.shape[2] != 4 * REG_MAX:
        raise ValueError(
            f"bbox shape {bbox_hwc.shape!r} does not match cls grid "
            f"({hf}, {wf}) with {4 * REG_MAX} channels"
        )

    # Compare in logit space so the sigmoid only runs on surviving cells.
    if 0 < prob_thr < 1:
        logit_thr = -np.log(1.0 / prob_thr - 1.0)
    elif prob_thr >= 1:
        # No finite logit reaches probability 1, so nothing should pass.
        logit_thr = np.inf
    else:
        logit_thr = -np.inf

    cls_max = cls_hwc.max(axis=2)
    cls_arg = cls_hwc.argmax(axis=2)
    keep = cls_max >= logit_thr
    if not keep.any():
        return (
            np.empty((0, 4), np.float32),
            np.empty((0,), np.float32),
            np.empty((0,), np.int32),
        )

    yi, xi = np.where(keep)
    logits = cls_max[yi, xi].astype(np.float64)
    probs = (1.0 / (1.0 + np.exp(-logits))).astype(np.float32)
    labels = cls_arg[yi, xi].astype(np.int32)

    # Softmax over the REG_MAX bins of each side (max-subtracted for
    # numerical stability), then the expected bin index scaled by the stride
    # gives the left/top/right/bottom distances from the cell centre.
    dfl = bbox_hwc[yi, xi].reshape(-1, 4, REG_MAX).astype(np.float64)
    dfl = dfl - dfl.max(axis=-1, keepdims=True)
    e = np.exp(dfl)
    sm = e / e.sum(axis=-1, keepdims=True)
    proj = np.arange(REG_MAX, dtype=np.float64)
    ltrb = (sm * proj).sum(axis=-1) * stride

    cx = (xi + 0.5) * stride
    cy = (yi + 0.5) * stride
    x0 = cx - ltrb[:, 0]
    y0 = cy - ltrb[:, 1]
    x1 = cx + ltrb[:, 2]
    y1 = cy + ltrb[:, 3]

    boxes = np.stack([x0, y0, x1, y1], axis=1).astype(np.float32)
    boxes[:, [0, 2]] = np.clip(boxes[:, [0, 2]], 0, dst_w - 1)
    boxes[:, [1, 3]] = np.clip(boxes[:, [1, 3]], 0, dst_h - 1)
    return boxes, probs, labels


def per_class_nms(boxes_xyxy, scores, labels, score_thr, iou_thr):
    """Run ``cv2.dnn.NMSBoxes`` separately for each class label.

    Returns:
        int64 array of indices into the input arrays for the boxes that were
        kept, grouped by ascending class id.
    """
    if len(boxes_xyxy) == 0:
        return np.empty((0,), np.int64)

    keep_global = []
    for c in np.unique(labels):
        idx = np.where(labels == c)[0]
        # NMSBoxes takes (x, y, w, h) rectangles.
        rects_xywh = np.column_stack([
            boxes_xyxy[idx, 0],
            boxes_xyxy[idx, 1],
            boxes_xyxy[idx, 2] - boxes_xyxy[idx, 0],
            boxes_xyxy[idx, 3] - boxes_xyxy[idx, 1],
        ]).tolist()
        kept = cv2.dnn.NMSBoxes(rects_xywh, scores[idx].tolist(), score_thr, iou_thr)
        # Normalise the result to a flat int array whether it arrives as an
        # empty tuple, an Nx1 array or a flat array.
        kept = np.asarray(kept, dtype=np.int64).reshape(-1)
        keep_global.extend(int(idx[k]) for k in kept)
    return np.array(keep_global, dtype=np.int64)


def unletterbox(boxes_xyxy, meta):
    """Map xyxy boxes from letterboxed coordinates back to the source image.

    Subtracts the padding, divides by the scale and clips to the source image
    bounds using the ``meta`` dict produced by :func:`letterbox`.  The input
    array is not modified; an empty input is returned unchanged.
    """
    if len(boxes_xyxy) == 0:
        return boxes_xyxy
    out = boxes_xyxy.copy()
    out[:, [0, 2]] -= meta["pad_left"]
    out[:, [1, 3]] -= meta["pad_top"]
    out /= meta["scale"]
    out[:, [0, 2]] = np.clip(out[:, [0, 2]], 0, meta["src_w"] - 1)
    out[:, [1, 3]] = np.clip(out[:, [1, 3]], 0, meta["src_h"] - 1)
    return out


def draw(img, boxes_xyxy, scores, labels, names, colors):
    """Return a copy of ``img`` with a box and a ``"name score"`` tag per detection.

    Colours cycle through ``colors`` by class id; class ids outside ``names``
    are rendered as their number.
    """
    vis = img.copy()
    for b, s, c in zip(boxes_xyxy, scores, labels):
        x0, y0, x1, y1 = [int(round(v)) for v in b]
        color = colors[int(c) % len(colors)]
        nm = names[int(c)] if 0 <= int(c) < len(names) else str(int(c))
        cv2.rectangle(vis, (x0, y0), (x1, y1), color, 2)

        text = f"{nm} {float(s):.2f}"
        (tw, th), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
        # Keep the label inside the image when the box touches the top edge.
        y_text = max(th + 2, y0)
        cv2.rectangle(vis, (x0, y_text - th - 2), (x0 + tw + 2, y_text + 1), color, -1)
        cv2.putText(vis, text, (x0 + 1, y_text - 2),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, cv2.LINE_AA)
    return vis


def main():
    """Parse the command line, run inference on one image and save the result.

    Exits with status 1 if the model or image is missing, the image cannot be
    read, no class names are given, or the output image cannot be written.
    """
    ap = argparse.ArgumentParser(description="aquarium YOLOv8s 6-way axmodel inference (AXERARuntime)")
    ap.add_argument('--model-path', type=str, default='aquarium_yolov8s_6way.axmodel')
    ap.add_argument('--test-img', type=str, default='test.jpg')
    ap.add_argument('--img-save-path', type=str, default='result_aquarium_yolov8.jpg')
    ap.add_argument('--score-thres', type=float, default=PROB_THRESHOLD)
    ap.add_argument('--nms-thres', type=float, default=NMS_THRESHOLD)
    ap.add_argument('--repeat', type=int, default=1)
    ap.add_argument('--names', type=str, default=",".join(DEFAULT_NAMES))
    ap.add_argument('--providers', type=str, default='AxEngineExecutionProvider')
    opt = ap.parse_args()

    if not os.path.exists(opt.model_path):
        logger.error(f"Model not found: {opt.model_path}")
        sys.exit(1)
    if not os.path.exists(opt.test_img):
        logger.error(f"Image not found: {opt.test_img}")
        sys.exit(1)

    names = [s.strip() for s in opt.names.split(",") if s.strip()]
    if not names:
        # The class count drives output grouping, so it cannot be zero.
        logger.error("No class names given (--names is empty)")
        sys.exit(1)
    cls_num = len(names)

    t0 = time.perf_counter()
    providers = [p.strip() for p in opt.providers.split(",") if p.strip()] or None
    sess = ort.InferenceSession(opt.model_path, providers=providers)
    logger.debug(f"\033[1;31mLoad model time = {(time.perf_counter() - t0) * 1000:.2f} ms\033[0m")

    inp = sess.get_inputs()[0]
    input_name = inp.name
    m_h, m_w, layout = infer_hw_layout(inp.shape)

    img = cv2.imread(opt.test_img)
    if img is None:
        logger.error(f"Failed to read image: {opt.test_img}")
        sys.exit(1)

    t0 = time.perf_counter()
    pad_bgr, meta = letterbox(img, m_h, m_w, pad_value=114)
    rgb = cv2.cvtColor(pad_bgr, cv2.COLOR_BGR2RGB)
    if layout == "NHWC":
        batched = rgb[None, ...]
    else:
        batched = np.transpose(rgb, (2, 0, 1))[None, ...]
    # A transposed view is not C-contiguous; force C order in case the runtime
    # copies the raw buffer.  NOTE(review): confirm how axengine reads inputs.
    input_tensor = np.ascontiguousarray(batched, dtype=np.uint8)
    logger.debug(f"\033[1;31mPre-process time = {(time.perf_counter() - t0) * 1000:.2f} ms\033[0m")

    out_infos = sess.get_outputs()
    out_names = [o.name for o in out_infos]

    times = []
    outs = None
    # Always run at least once, even for --repeat <= 0.
    for _ in range(max(opt.repeat, 1)):
        t0 = time.perf_counter()
        outs = sess.run(None, {input_name: input_tensor})
        times.append((time.perf_counter() - t0) * 1000.0)
    logger.debug(
        f"\033[1;31mForward time min/avg/max = "
        f"{min(times):.2f}/{sum(times)/len(times):.2f}/{max(times):.2f} ms (n={len(times)})\033[0m"
    )

    if len(outs) != 6:
        raise ValueError(f"need 6 outputs, got {len(outs)}: {out_names}")

    t0 = time.perf_counter()
    by_s = group_outputs(out_names, outs, cls_num)
    boxes_all, scores_all, labels_all = [], [], []
    for s in STRIDES:
        cl, bb = by_s[s]
        sb, sp, sl = decode_one_scale(s, cl, bb, opt.score_thres, m_h, m_w)
        if len(sb):
            boxes_all.append(sb)
            scores_all.append(sp)
            labels_all.append(sl)

    if boxes_all:
        boxes = np.concatenate(boxes_all)
        scores = np.concatenate(scores_all)
        labels = np.concatenate(labels_all)
        keep = per_class_nms(boxes, scores, labels, opt.score_thres, opt.nms_thres)
        boxes = unletterbox(boxes[keep], meta)
        scores = scores[keep]
        labels = labels[keep]
    else:
        boxes = np.empty((0, 4), np.float32)
        scores = np.empty((0,), np.float32)
        labels = np.empty((0,), np.int32)
    logger.debug(f"\033[1;31mPost-process time = {(time.perf_counter() - t0) * 1000:.2f} ms\033[0m")

    counts = {n: 0 for n in names}
    logger.info(f"\033[1;32mDetections: {len(boxes)}\033[0m")
    for b, s, c in zip(boxes, scores, labels):
        x0, y0, x1, y1 = b
        nm = names[int(c)] if 0 <= int(c) < len(names) else str(int(c))
        counts[nm] = counts.get(nm, 0) + 1
        logger.info(f" {nm:8s} score={float(s):.3f} xyxy=({x0:.1f},{y0:.1f},{x1:.1f},{y1:.1f})")
    logger.info(f"per-class: {counts}")

    if opt.img_save_path:
        vis = draw(img, boxes, scores, labels, names, DEFAULT_COLORS)
        os.makedirs(os.path.dirname(os.path.abspath(opt.img_save_path)) or ".", exist_ok=True)
        # imwrite reports failure through its return value, not an exception.
        if not cv2.imwrite(opt.img_save_path, vis):
            logger.error(f"Failed to write image: {opt.img_save_path}")
            sys.exit(1)
        logger.info(f"Saved to {opt.img_save_path}")


if __name__ == "__main__":
    main()