# File-viewer residue preserved as a comment: revision 9bc686b, file size 20,392 bytes.
import os
import cv2
import numpy as np
import logging
# onnxruntime is an optional dependency: when it cannot be imported, `ort` is
# bound to None so the rest of the module can test `ort is None` and fall back
# to mock mode instead of failing at import time.
try:
    import onnxruntime as ort
except ImportError:
    ort = None
from app.core.config import settings
logger = logging.getLogger("FaceEngine")
class FaceEngine:
    """Face detection, alignment, embedding and liveness pipeline on ONNX Runtime.

    The engine loads four ONNX models from ``settings.MODELS_DIR``:

    * ``det_2.5g.onnx``     - SCRFD face detector (required)
    * ``w600k_r50.onnx``    - ArcFace embedding model (required)
    * ``2.7k_80x80.onnx``   - MiniFASNet liveness model, 2.7x crop (required)
    * ``1.8k_128x128.onnx`` - MiniFASNet liveness model, 1.8x crop (optional)

    If onnxruntime is not installed, a required model file is missing, or
    session creation fails, the engine switches to *mock mode*: detection
    returns one synthetic centred face, embeddings are pseudo-random vectors
    derived from the image, and liveness always passes.
    """

    def __init__(self):
        """Resolve model paths and create the ONNX sessions, or enter mock mode."""
        self.models_dir = settings.MODELS_DIR
        self.mock_mode = False
        self.embeddings_cache = None

        # Define every session attribute up front so it exists on all code
        # paths, including the early returns into mock mode below.
        self.det_session = None
        self.rec_session = None
        self.live_session_27 = None
        self.live_session_18 = None

        # Paths to models
        self.det_model_path = os.path.join(self.models_dir, "det_2.5g.onnx")
        self.rec_model_path = os.path.join(self.models_dir, "w600k_r50.onnx")
        self.liveness_model_27 = os.path.join(self.models_dir, "2.7k_80x80.onnx")
        self.liveness_model_18 = os.path.join(self.models_dir, "1.8k_128x128.onnx")

        # Without onnxruntime nothing can be loaded.
        if ort is None:
            logger.warning("onnxruntime is not installed. Running in MOCK MODE.")
            self.mock_mode = True
            return

        # The 1.8 liveness model is optional, so it is not part of this check.
        required_models = [self.det_model_path, self.rec_model_path, self.liveness_model_27]
        missing = [m for m in required_models if not os.path.exists(m)]
        if missing:
            logger.warning("The following models are missing: %s. Running in MOCK MODE.", missing)
            logger.warning("To run in production mode, please execute the download_models.py script.")
            self.mock_mode = True
            return

        try:
            opts = ort.SessionOptions()
            opts.intra_op_num_threads = 4

            # CPU is always available; CUDA is preferred when onnxruntime reports it.
            providers = ['CPUExecutionProvider']
            if 'CUDAExecutionProvider' in ort.get_available_providers():
                providers = ['CUDAExecutionProvider'] + providers
            logger.info("Initializing ONNX sessions with providers: %s", providers)

            self.det_session = ort.InferenceSession(self.det_model_path, opts, providers=providers)
            self.rec_session = ort.InferenceSession(self.rec_model_path, opts, providers=providers)
            self.live_session_27 = ort.InferenceSession(self.liveness_model_27, opts, providers=providers)

            # Optional 1.8 liveness model; stays None when the file is absent.
            if os.path.exists(self.liveness_model_18):
                self.live_session_18 = ort.InferenceSession(self.liveness_model_18, opts, providers=providers)

            logger.info("FaceEngine initialized successfully with all required AI models.")
        except Exception as e:
            # Deliberate best-effort: any load failure degrades to mock mode
            # rather than preventing the application from starting.
            logger.error("Error initializing ONNX sessions: %s. Falling back to MOCK MODE.", e)
            self.mock_mode = True

    def detect_faces(self, image_np, conf_threshold=0.5):
        """Detect faces with the SCRFD detector.

        Args:
            image_np: H x W x 3 uint8 image array.
            conf_threshold: Minimum detector score for a face to be kept.

        Returns:
            A list of dicts
            ``{"bbox": [x1, y1, x2, y2], "confidence": score, "landmarks": [[x, y], ...]}``
            in original-image pixel coordinates. Returns ``[]`` if inference
            fails. In mock mode, returns a single synthetic centred face.
        """
        if self.mock_mode:
            return self._mock_detection(image_np)

        try:
            h, w = image_np.shape[:2]

            # Letterbox: scale the longer side to 640 and zero-pad bottom/right
            # to a square, so detector coordinates map back with one division.
            target_size = 640
            scale = target_size / max(h, w)
            # Clamp to 1 px so extreme aspect ratios do not yield a zero-sized resize.
            nh, nw = max(1, int(h * scale)), max(1, int(w * scale))
            resized = cv2.resize(image_np, (nw, nh))
            padded = np.zeros((target_size, target_size, 3), dtype=np.uint8)
            padded[:nh, :nw, :] = resized

            # Normalize to roughly [-1, 1] and convert HWC -> NCHW.
            # NOTE(review): no channel swap is performed here. If callers pass
            # BGR (OpenCV) images and the model expects RGB, a swap is missing -
            # confirm against the model's export settings.
            blob = (padded.astype(np.float32) - 127.5) / 128.0
            blob = np.expand_dims(np.transpose(blob, (2, 0, 1)), axis=0)

            outputs = self.det_session.run(None, {self.det_session.get_inputs()[0].name: blob})
            return self._parse_scrfd(outputs, scale, w, h, conf_threshold, input_size=target_size)
        except Exception as e:
            logger.error("Error in detect_faces: %s", e)
            return []

    @staticmethod
    def _mock_detection(image_np):
        """Return one synthetic face centred in the image (mock mode only)."""
        h, w = image_np.shape[:2]
        cx, cy = w // 2, h // 2
        bw, bh = int(w * 0.4), int(h * 0.5)
        x1, y1 = max(0, cx - bw // 2), max(0, cy - bh // 2)
        x2, y2 = min(w, cx + bw // 2), min(h, cy + bh // 2)
        mock_landmarks = [
            [cx - bw // 6, cy - bh // 8],  # Left Eye
            [cx + bw // 6, cy - bh // 8],  # Right Eye
            [cx, cy],                      # Nose
            [cx - bw // 8, cy + bh // 6],  # Left Mouth
            [cx + bw // 8, cy + bh // 6],  # Right Mouth
        ]
        return [{
            "bbox": [float(x1), float(y1), float(x2), float(y2)],
            "confidence": 0.99,
            "landmarks": mock_landmarks,
        }]

    @staticmethod
    def _anchor_centers(feat_h, feat_w, stride, num_anchors):
        """Build the (feat_h * feat_w * num_anchors, 2) array of anchor centres.

        Centres are ``grid_index * stride`` in (x, y) order, rows outermost,
        with each cell repeated ``num_anchors`` times consecutively.
        """
        xs, ys = np.meshgrid(np.arange(feat_w), np.arange(feat_h))
        centers = np.stack([xs, ys], axis=-1).reshape(-1, 2).astype(np.float32) * stride
        return np.repeat(centers, num_anchors, axis=0)

    def _parse_scrfd(self, outputs, scale, orig_w, orig_h, conf_threshold, input_size=640):
        """Decode SCRFD ONNX outputs into face detections.

        The model is expected to emit 9 tensors (3 strides x 3 types):
            outputs[0,1,2]: scores    (N, 1)   -- strides 8, 16, 32
            outputs[3,4,5]: bbox_pred (N, 4)   -- strides 8, 16, 32
            outputs[6,7,8]: kps_pred  (N, 10)  -- strides 8, 16, 32
        with N = (input_size / stride)^2 * 2 anchors. A leading batch
        dimension of size 1 is also accepted.

        Args:
            outputs: Sequence of the 9 output arrays described above.
            scale: Resize factor applied to the original image before padding.
            orig_w: Original image width, used to clamp coordinates.
            orig_h: Original image height, used to clamp coordinates.
            conf_threshold: Minimum score for a candidate to be kept.
            input_size: Side length of the square padded detector input.

        Returns:
            List of face dicts (``bbox``, ``confidence``, ``landmarks``) in
            original-image coordinates, after NMS at IoU 0.4.
        """
        strides = (8, 16, 32)
        num_anchors = 2  # SCRFD 2.5G uses 2 anchors per cell

        faces = []
        for idx, stride in enumerate(strides):
            # Flatten so both (N, C) and batched (1, N, C) outputs decode alike.
            scores = np.asarray(outputs[idx]).reshape(-1)
            bbox_raw = np.asarray(outputs[idx + 3]).reshape(-1, 4)
            kps_raw = np.asarray(outputs[idx + 6]).reshape(-1, 10)

            keep = np.flatnonzero(scores >= conf_threshold)
            if keep.size == 0:
                continue

            # Anchor centres sit at grid_index * stride with no half-cell
            # offset, matching the reference SCRFD decoder. Adding 0.5 * stride
            # would shift every box and landmark by half a stride.
            feat = input_size // stride
            anchors = self._anchor_centers(feat, feat, stride, num_anchors)[keep]

            # Boxes are predicted as [left, top, right, bottom] distances from
            # the anchor centre, in units of stride.
            dist = bbox_raw[keep] * stride
            x1 = np.maximum((anchors[:, 0] - dist[:, 0]) / scale, 0)
            y1 = np.maximum((anchors[:, 1] - dist[:, 1]) / scale, 0)
            x2 = np.minimum((anchors[:, 0] + dist[:, 2]) / scale, orig_w)
            y2 = np.minimum((anchors[:, 1] + dist[:, 3]) / scale, orig_h)
            boxes = np.stack([x1, y1, x2, y2], axis=1)

            # Landmarks: 5 (dx, dy) offsets from the anchor centre, laid out
            # as [kp0_x, kp0_y, kp1_x, kp1_y, ...], in units of stride.
            kps = (anchors[:, None, :] + kps_raw[keep].reshape(-1, 5, 2) * stride) / scale
            kps[..., 0] = np.clip(kps[..., 0], 0, orig_w)
            kps[..., 1] = np.clip(kps[..., 1], 0, orig_h)

            # tolist() converts NumPy scalars to plain Python floats.
            for box, score, points in zip(boxes.tolist(), scores[keep].tolist(), kps.tolist()):
                faces.append({
                    "bbox": box,
                    "confidence": score,
                    "landmarks": points,
                })

        return self._nms(faces, iou_threshold=0.4)

    def _nms(self, faces, iou_threshold):
        """Greedy non-maximum suppression over face dicts.

        Faces are visited in descending confidence; one is kept only if its
        IoU with every already-kept face is below ``iou_threshold``.
        """
        if not faces:
            return []

        ranked = sorted(faces, key=lambda f: f["confidence"], reverse=True)
        kept = []
        for candidate in ranked:
            if all(self._iou(k["bbox"], candidate["bbox"]) < iou_threshold for k in kept):
                kept.append(candidate)
        return kept

    def _iou(self, box1, box2):
        """Return intersection-over-union of two ``[x1, y1, x2, y2]`` boxes."""
        x1_1, y1_1, x2_1, y2_1 = box1
        x1_2, y1_2, x2_2, y2_2 = box2

        inter_w = max(0, min(x2_1, x2_2) - max(x1_1, x1_2))
        inter_h = max(0, min(y2_1, y2_2) - max(y1_1, y1_2))
        inter_area = inter_w * inter_h

        box1_area = (x2_1 - x1_1) * (y2_1 - y1_1)
        box2_area = (x2_2 - x1_2) * (y2_2 - y1_2)
        union_area = box1_area + box2_area - inter_area

        # Degenerate boxes have no area; report no overlap instead of dividing by zero.
        return inter_area / union_area if union_area > 0 else 0

    def align_face(self, image_np, landmarks):
        """Warp a face to the 112x112 ArcFace template using 5 landmarks.

        Args:
            image_np: Source image array.
            landmarks: Five ``[x, y]`` points (left eye, right eye, nose,
                left mouth corner, right mouth corner); list or NumPy array.

        Returns:
            A 112x112 aligned face. If fewer than 5 landmarks are supplied or
            no transform can be estimated, the whole image is resized to
            112x112 instead.
        """
        # `not landmarks` would raise on a NumPy array, so test None and length explicitly.
        if landmarks is None or len(landmarks) < 5:
            return cv2.resize(image_np, (112, 112))

        # Standard ArcFace reference points
        reference_landmarks = np.array([
            [38.2946, 51.6963],  # Left Eye
            [73.5318, 51.6963],  # Right Eye
            [56.0252, 71.7366],  # Nose
            [41.5493, 92.3655],  # Left Mouth Corner
            [70.7299, 92.3655],  # Right Mouth Corner
        ], dtype=np.float32)
        src = np.array(landmarks, dtype=np.float32)

        # estimateAffinePartial2D fits a similarity transform
        # (rotation, translation, uniform scaling).
        M, _inliers = cv2.estimateAffinePartial2D(src, reference_landmarks)
        if M is None:
            return cv2.resize(image_np, (112, 112))

        return cv2.warpAffine(image_np, M, (112, 112))

    @staticmethod
    def _mock_embedding(aligned_face):
        """Derive a deterministic unit vector from the image (mock mode only).

        The face is downsampled to 4x4 grayscale and quantized to 8 levels, so
        small lighting changes map to the same 16-digit octal seed and the
        same embedding across frames and sessions.
        """
        try:
            tiny = cv2.resize(aligned_face, (4, 4), interpolation=cv2.INTER_AREA)
            tiny_gray = cv2.cvtColor(tiny, cv2.COLOR_BGR2GRAY) if len(tiny.shape) == 3 else tiny
            quantized = (tiny_gray // 32).flatten()  # values 0-7
            seed_str = ''.join([str(v) for v in quantized])
            seed_val = int(seed_str, 8) % 2147483647  # octal digit string -> int seed
        except Exception:
            # Ultimate fallback: any stable value
            seed_val = 42

        # A private RandomState yields the same sequence as seeding the global
        # generator, without resetting RNG state for the rest of the process.
        vec = np.random.RandomState(seed_val).randn(512).astype(np.float32)
        norm = np.linalg.norm(vec)
        return vec / norm if norm > 0 else vec

    def extract_embedding(self, aligned_face):
        """Extract an L2-normalized 512-D embedding with the ArcFace model.

        Args:
            aligned_face: 112x112x3 aligned face image.

        Returns:
            A unit-length 512-D float array. In mock mode this is a
            deterministic pseudo-random vector derived from the image. If
            inference fails, a random unit vector is returned and the error
            is logged.
        """
        if self.mock_mode:
            return self._mock_embedding(aligned_face)

        try:
            # Same normalization as the detector, then HWC -> NCHW [1, 3, 112, 112].
            # NOTE(review): channel order is passed through unchanged; confirm
            # whether this model export expects BGR or RGB.
            blob = (aligned_face.astype(np.float32) - 127.5) / 128.0
            blob = np.expand_dims(np.transpose(blob, (2, 0, 1)), axis=0)

            outputs = self.rec_session.run(None, {self.rec_session.get_inputs()[0].name: blob})
            embedding = outputs[0][0]

            # Unit length lets cosine similarity reduce to a dot product.
            norm = np.linalg.norm(embedding)
            if norm > 0:
                embedding = embedding / norm
            return embedding
        except Exception as e:
            logger.error("Error in extract_embedding: %s", e)
            # Return random unit vector on failure
            vec = np.random.randn(512).astype(np.float32)
            return vec / np.linalg.norm(vec)

    @staticmethod
    def _softmax(x):
        """Numerically stable softmax over the first axis."""
        e_x = np.exp(x - np.max(x))
        return e_x / e_x.sum(axis=0)

    def _liveness_score(self, session, image_np, cx, cy, w, h, crop_scale, input_size):
        """Run one liveness model on a scaled crop around the face.

        The crop is the face box enlarged by ``crop_scale`` about its centre,
        clamped to the image, and resized to ``input_size`` x ``input_size``.

        Returns:
            The softmax probability at index 1 (used as the "live" score), or
            ``None`` when the clamped crop is empty.
        """
        half_w, half_h = w * crop_scale / 2, h * crop_scale / 2
        left = int(max(0, cx - half_w))
        top = int(max(0, cy - half_h))
        right = int(min(image_np.shape[1], cx + half_w))
        bottom = int(min(image_np.shape[0], cy + half_h))

        crop = image_np[top:bottom, left:right]
        if crop.size == 0:
            return None

        resized = cv2.resize(crop, (input_size, input_size))
        # HWC -> NCHW; pixel values are passed to the model unscaled.
        blob = np.expand_dims(np.transpose(resized, (2, 0, 1)).astype(np.float32), axis=0)
        logits = session.run(None, {session.get_inputs()[0].name: blob})[0][0]
        return float(self._softmax(logits)[1])

    def check_liveness(self, image_np, bbox):
        """Score whether a detected face is live, using MiniFASNet anti-spoofing.

        Args:
            image_np: Full source image.
            bbox: Face box ``[x1, y1, x2, y2]`` in image coordinates.

        Returns:
            ``(liveness_score, is_live)``. The score comes from the 2.7x / 80x80
            model, averaged with the 1.8x / 128x128 model when that model is
            loaded and its crop is non-empty. ``is_live`` compares the score
            with ``settings.KIOSK_LIVENESS_THRESHOLD``. Returns ``(0.0, False)``
            on an empty crop or any error. Mock mode always returns
            ``(0.92, True)``.
        """
        if self.mock_mode:
            # Mock mode performs no analysis: every face is reported live.
            return 0.92, True

        try:
            x1, y1, x2, y2 = bbox
            w, h = x2 - x1, y2 - y1
            cx, cy = x1 + w / 2, y1 + h / 2

            score_27 = self._liveness_score(self.live_session_27, image_np, cx, cy, w, h, 2.7, 80)
            if score_27 is None:
                return 0.0, False

            avg_score = score_27
            if self.live_session_18 is not None:
                score_18 = self._liveness_score(self.live_session_18, image_np, cx, cy, w, h, 1.8, 128)
                if score_18 is not None:
                    avg_score = (score_27 + score_18) / 2.0

            is_live = avg_score >= settings.KIOSK_LIVENESS_THRESHOLD
            return avg_score, is_live
        except Exception as e:
            # Fail closed: an inference error is treated as "not live".
            logger.error("Error in check_liveness: %s", e)
            return 0.0, False

    def cosine_similarity(self, embedding1, embedding2):
        """Return the cosine similarity of two L2-normalized embeddings.

        Because both inputs are expected to be unit length, this is just
        their dot product.
        """
        return float(np.dot(embedding1, embedding2))

    def load_embeddings_cache(self, db_session):
        """Load every stored face embedding into ``self.embeddings_cache``.

        Args:
            db_session: Database session exposing ``query(...).all()``
                (presumably a SQLAlchemy session - verify against caller).

        On success the cache is a list of dicts with keys ``id``,
        ``employee_id`` and ``embedding`` (float32 array). On failure the
        error is logged and the cache is set to an empty list.
        """
        # Imported locally, as in the original; presumably to avoid an import
        # cycle with the models package - TODO confirm.
        import json
        from app.models import models

        try:
            records = db_session.query(models.FaceEmbedding).all()
            cache = []
            for r in records:
                # SQLite stores vectors as JSON text, while postgres returns native lists
                raw = json.loads(r.embedding) if isinstance(r.embedding, str) else r.embedding
                cache.append({
                    "id": r.id,
                    "employee_id": r.employee_id,
                    "embedding": np.array(raw, dtype=np.float32),
                })
            self.embeddings_cache = cache
            logger.info("Loaded %s face embeddings into local memory cache.", len(cache))
        except Exception as e:
            logger.error("Failed to load embeddings cache: %s", e)
            self.embeddings_cache = []

    def invalidate_cache(self):
        """Drop the in-memory embeddings cache (sets it back to ``None``)."""
        self.embeddings_cache = None
        logger.info("FaceEngine memory cache invalidated.")
# End of module.