| import os |
| from bisect import bisect_left |
| from typing import Dict, List, Tuple |
|
|
| import cv2 |
| import numpy as np |
|
|
| from tools.config import OUTPUT_FOLDER, SAVE_WORD_SEGMENTER_OUTPUT_IMAGES |
|
|
| |
# --- Adaptive thresholding -------------------------------------------------
# Fallback adaptive-threshold block size, as a fraction of the line height.
BLOCK_SIZE_FACTOR = 0.5
# Block size = median connected-component height * this factor.
BLOCK_SIZE_MEDIAN_CC_FACTOR = 1.2
# Constant passed as ``C`` to cv2.adaptiveThreshold in ``segment``.
C_VALUE = 2
# Line height at which NOISE_THRESHOLD applies unscaled; the effective
# threshold is NOISE_THRESHOLD * (line_height / REFERENCE_LINE_HEIGHT).
REFERENCE_LINE_HEIGHT = 50

# --- Two-stage adaptive search ---------------------------------------------
# Starting value of the stage-2 (closing kernel width) sweep.
INITIAL_KERNEL_WIDTH_FACTOR = 0.0
# Starting value of the stage-1 (valley threshold) sweep.
INITIAL_VALLEY_THRESHOLD_FACTOR = 0.0
# Fixed valley threshold factor used while sweeping the kernel in stage 2.
MAIN_VALLEY_THRESHOLD_FACTOR = 0.15
# Default ``min_space_factor`` of ``AdaptiveSegmenter.segment``.
MIN_SPACE_FACTOR = 0.2
# Default ``match_tolerance`` of ``AdaptiveSegmenter.segment``.
MATCH_TOLERANCE = 0

# --- Noise removal ---------------------------------------------------------
# Minimum component area is (line_height * this fraction) ** 2 ...
MIN_AREA_HEIGHT_FRACTION = 0.05
# ... but never below this many pixels.
MIN_AREA_FLOOR = 2
# NOTE(review): not referenced in the visible part of this file; presumably a
# default trim fraction used further down -- confirm before changing.
DEFAULT_TRIM_PERCENTAGE = 0.2

# --- Deskewing -------------------------------------------------------------
# Correction angles (degrees) with magnitude below MIN or above MAX are
# ignored by ``_deskew_image`` (treated as 0).
MIN_SKEW_THRESHOLD = 0.5
MAX_SKEW_THRESHOLD = 15.0
# Minimum number of per-column bottom points needed for baseline estimation.
SKEW_BASELINE_MIN_POINTS = 20
# Accumulator threshold passed to cv2.HoughLines.
SKEW_HOUGH_THRESHOLD = 25

# Largest |expected words - detected boxes| still accepted as a backup result.
ALLOWED_WORD_MISMATCH_COUNT = 0

# Laplacian-variance level (at REFERENCE_LINE_HEIGHT) above which the primary
# pipeline is skipped in favour of the fallback segmenter.
NOISE_THRESHOLD = 800

# --- Polarity detection ----------------------------------------------------
# A background mean below this value is treated as "light text on dark" and
# the image is inverted before binarization.
POLARITY_MEAN_THRESHOLD = 128
# Fraction of width/height sampled at each corner to estimate the background.
POLARITY_CORNER_FRACTION = 0.15

# --- Search step sizes (coarse sweep, then fine sweep around the best) ------
SEARCH_STAGE1_COARSE_STEP = 0.06
SEARCH_STAGE1_FINE_STEP = 0.02
SEARCH_STAGE2_COARSE_STEP = 0.05
SEARCH_STAGE2_FINE_STEP = 0.02
|
|
|
|
| def _find_widest_zero_gaps( |
| vertical_projection: np.ndarray, |
| n: int, |
| gap_threshold: float = 0.0, |
| ) -> List[Tuple[int, int]]: |
| """ |
| Find the N widest contiguous zero-gaps (or near-zero) in the vertical projection. |
| Used for justified text: anchor word cut points to the centers of these gaps. |
| Returns list of (start, end) in left-to-right order, or empty if not enough gaps. |
| """ |
| if vertical_projection is None or n <= 0: |
| return [] |
| w = len(vertical_projection) |
| gaps = [] |
| in_gap = False |
| start = 0 |
| for x in range(w): |
| val = vertical_projection[x] if x < w else 0 |
| if val <= gap_threshold and not in_gap: |
| start = x |
| in_gap = True |
| elif val > gap_threshold and in_gap: |
| gaps.append((start, x)) |
| in_gap = False |
| if in_gap: |
| gaps.append((start, w)) |
| if not gaps: |
| return [] |
| |
| gaps_by_width = sorted(gaps, key=lambda g: g[1] - g[0], reverse=True) |
| selected = gaps_by_width[:n] |
| |
| selected.sort(key=lambda g: g[0]) |
| return selected |
|
|
|
|
| |
| TRAILING_PUNCTUATION_CHARS = frozenset(".,:;\"'!?)]}") |
|
|
|
|
| def _word_ends_with_punctuation(word: str) -> bool: |
| """True if word ends with a punctuation character that may have a gap before it.""" |
| return bool(word and word[-1] in TRAILING_PUNCTUATION_CHARS) |
|
|
|
|
# Relative glyph widths for a proportional font; characters that are not
# listed fall back to 1.1 (upper case) or 1.0 (everything else).
_CHAR_WIDTH_WEIGHTS = {
    **dict.fromkeys("il1j", 0.4),
    "t": 0.6,
    **dict.fromkeys(".,!'", 0.3),
    **dict.fromkeys("WMm", 1.3),
    **dict.fromkeys("w@%", 1.2),
    " ": 0.5,
}


def get_weighted_length(text: str) -> float:
    """
    Estimate the rendered width of ``text`` in "average character" units.

    Instead of counting characters, each one contributes a width weight:
    narrow glyphs (``i``, ``l``, ``1``, punctuation) count for less than 1.0
    and wide glyphs (``W``, ``M``, ``w``) for more. Unlisted upper-case
    characters count 1.1, everything else 1.0. Used by
    HybridWordSegmenter.convert_line_to_word_level for better blind estimation.

    Returns:
        The summed weights as a float (0.0 for empty text).
    """
    total = 0.0
    for char in text:
        weight = _CHAR_WIDTH_WEIGHTS.get(char)
        if weight is None:
            weight = 1.1 if char.isupper() else 1.0
        total += weight
    return total
|
|
|
|
| def _sanitize_filename(filename: str, max_length: int = 100) -> str: |
| """ |
| Sanitizes a string to be used as a valid filename. |
| Removes or replaces invalid characters for Windows/Linux file systems. |
| |
| Args: |
| filename: The string to sanitize |
| max_length: Maximum length of the sanitized filename |
| |
| Returns: |
| A sanitized string safe for use in file names |
| """ |
| if not filename: |
| return "unnamed" |
|
|
| |
| sanitized = filename.replace(" ", "_") |
|
|
| |
| |
| invalid_chars = '<>:"/\\|?*' |
| for char in invalid_chars: |
| sanitized = sanitized.replace(char, "_") |
|
|
| |
| sanitized = "".join( |
| char for char in sanitized if ord(char) >= 32 or char in "\n\r\t" |
| ) |
|
|
| |
| sanitized = sanitized.strip(". ") |
|
|
| |
| while "__" in sanitized: |
| sanitized = sanitized.replace("__", "_") |
|
|
| |
| if len(sanitized) > max_length: |
| sanitized = sanitized[:max_length] |
|
|
| |
| if not sanitized: |
| sanitized = "unnamed" |
|
|
| return sanitized |
|
|
|
|
| class AdaptiveSegmenter: |
| """ |
| Line to word segmentation pipeline. It features: |
| 1. Adaptive Thresholding. |
| 2. Targeted Noise Removal using Connected Component Analysis. |
| 3. The robust two-stage adaptive search (Valley -> Kernel). |
| 4. CCA for final pixel-perfect refinement. |
| """ |
|
|
def __init__(self, output_folder: str = OUTPUT_FOLDER):
    """Create a segmenter.

    Args:
        output_folder: Base directory under which debug images are written
            when SAVE_WORD_SEGMENTER_OUTPUT_IMAGES is enabled.
    """
    # Estimation-based segmenter used whenever the adaptive pipeline cannot
    # produce a trustworthy result.
    fallback = HybridWordSegmenter()
    self.output_folder = output_folder
    self.fallback_segmenter = fallback
|
|
def _correct_orientation(
    self, gray_image: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Rotate a line image by 90 degrees when its ink is taller than it is wide.

    The image is binarized (adaptive threshold, or Otsu for very short
    images), lightly opened to remove specks, and the bounding box of the
    remaining foreground is measured.

    Args:
        gray_image: 2-D grayscale image.

    Returns:
        ``(image, matrix)`` where ``matrix`` is the 2x3 affine transform that
        was applied. When fewer than 50 foreground pixels are found, or the
        ink is not taller than wide, the input image is returned unchanged
        together with a zero-degree rotation matrix.
    """
    rows, cols = gray_image.shape
    pivot = (cols // 2, rows // 2)

    # Adaptive threshold window: 21 px, shrunk to the largest odd size that
    # fits when the image is shorter than that.
    window = 21
    if rows < window:
        window = rows if rows % 2 != 0 else rows - 1

    if window > 3:
        ink = cv2.adaptiveThreshold(
            gray_image,
            255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV,
            window,
            4,
        )
    else:
        ink = cv2.threshold(
            gray_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
        )[1]

    ink = cv2.morphologyEx(ink, cv2.MORPH_OPEN, np.ones((2, 2), np.uint8))

    ink_rows, ink_cols = np.where(ink > 0)
    # Short-circuit keeps min()/max() away from an empty selection.
    if len(ink_rows) < 50 or not (
        ink_rows.max() - ink_rows.min() > ink_cols.max() - ink_cols.min()
    ):
        return gray_image, cv2.getRotationMatrix2D(pivot, 0, 1.0)

    rotation = cv2.getRotationMatrix2D(pivot, 90.0, 1.0)
    # The rotated canvas swaps width and height; shift the result so it stays
    # centred on the new canvas.
    out_w, out_h = rows, cols
    rotation[0, 2] += (out_w - cols) / 2
    rotation[1, 2] += (out_h - rows) / 2

    rotated = cv2.warpAffine(
        gray_image,
        rotation,
        (out_w, out_h),
        flags=cv2.INTER_CUBIC,
        borderMode=cv2.BORDER_REPLICATE,
    )
    return rotated, rotation
|
|
def _skew_angle_from_baseline(self, binary: np.ndarray) -> float:
    """
    Estimate the skew correction angle from the text baseline.

    For every column that contains foreground, the lowest foreground pixel is
    taken as a baseline sample. The samples are drawn into an empty image,
    dilated, and passed to a Hough line transform; each candidate line is
    scored by how many samples lie within a small distance of it, and the
    best-scoring line determines the angle. More stable than minAreaRect for
    short words or lines with ascenders/descenders (e.g. "all").

    Args:
        binary: 2-D binary image, foreground > 0.

    Returns:
        Correction angle in degrees, or ``None`` (despite the ``float``
        annotation) when there are fewer than SKEW_BASELINE_MIN_POINTS
        samples, Hough finds no line, or no line scores above zero.
    """
    rows, cols = binary.shape

    foreground = binary > 0
    occupied_cols = np.flatnonzero(foreground.any(axis=0))
    if occupied_cols.size < SKEW_BASELINE_MIN_POINTS:
        return None

    # Lowest foreground row per occupied column: the first hit when the
    # rows are scanned bottom-up.
    lowest_rows = (rows - 1) - np.argmax(foreground[::-1, occupied_cols], axis=0)
    # Plain Python ints keep the scoring arithmetic below unchanged.
    bottom_points = list(zip(occupied_cols.tolist(), lowest_rows.tolist()))

    baseline_img = np.zeros((rows, cols), dtype=np.uint8)
    baseline_img[lowest_rows, occupied_cols] = 255
    baseline_img = cv2.dilate(baseline_img, np.ones((2, 2), np.uint8))

    lines = cv2.HoughLines(
        baseline_img,
        rho=1,
        theta=np.pi / 180,
        threshold=SKEW_HOUGH_THRESHOLD,
    )
    if lines is None or len(lines) == 0:
        return None

    tolerance = max(2, rows // 30)
    winner = None
    winner_support = 0
    for rho, theta in lines[:, 0]:
        # theta is the direction of the line's normal, so the line itself is
        # tilted by (90 - theta) degrees; fold the value into [-90, 90].
        candidate = 90.0 - np.degrees(theta)
        if candidate > 90:
            candidate -= 180
        elif candidate < -90:
            candidate += 180

        cos_t = np.cos(theta)
        sin_t = np.sin(theta)
        support = sum(
            1
            for x, y in bottom_points
            if abs(x * cos_t + y * sin_t - rho) <= tolerance
        )
        # Strict comparison: the first line reaching a score wins ties.
        if support > winner_support:
            winner_support = support
            winner = candidate

    return None if winner is None else float(winner)
|
|
| def _skew_angle_from_min_area_rect( |
| self, coords: np.ndarray, w: int, h: int |
| ) -> float: |
| """Fallback: skew angle from minAreaRect of all foreground pixels.""" |
| if len(coords) < 50: |
| return 0.0 |
| rect = cv2.minAreaRect(coords[:, ::-1]) |
| rect_width, rect_height = rect[1] |
| angle = rect[2] |
| if rect_width < rect_height: |
| angle += 90 |
| if angle > 45: |
| angle -= 90 |
| elif angle < -45: |
| angle += 90 |
| return float(angle) |
|
|
def _deskew_image(self, gray_image: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Straighten a slightly rotated line image.

    The skew is measured on a binarized copy, preferably from the text
    baseline (``_skew_angle_from_baseline``) and otherwise from the
    minimum-area rectangle of the ink. Angles whose magnitude is below
    MIN_SKEW_THRESHOLD or above MAX_SKEW_THRESHOLD are treated as 0.

    Args:
        gray_image: 2-D grayscale image.

    Returns:
        ``(image, matrix)``: the rotated image (same size as the input) and
        the 2x3 rotation matrix used. With fewer than 50 foreground pixels
        the input image itself is returned with a zero-degree matrix.
    """
    rows, cols = gray_image.shape
    pivot = (cols // 2, rows // 2)

    # Same binarization recipe as the orientation check: 21 px window,
    # shrunk to the largest odd size that fits, Otsu when it gets too small.
    window = 21
    if rows < window:
        window = rows if rows % 2 != 0 else rows - 1

    if window > 3:
        ink = cv2.adaptiveThreshold(
            gray_image,
            255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV,
            window,
            4,
        )
    else:
        ink = cv2.threshold(
            gray_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
        )[1]

    ink = cv2.morphologyEx(ink, cv2.MORPH_OPEN, np.ones((2, 2), np.uint8))

    ink_coords = np.column_stack(np.where(ink > 0))
    if len(ink_coords) < 50:
        return gray_image, cv2.getRotationMatrix2D(pivot, 0, 1.0)

    angle = self._skew_angle_from_baseline(ink)
    if angle is None:
        angle = self._skew_angle_from_min_area_rect(ink_coords, cols, rows)

    # Ignore negligible skew and implausibly large estimates alike.
    magnitude = abs(angle)
    if magnitude < MIN_SKEW_THRESHOLD or magnitude > MAX_SKEW_THRESHOLD:
        angle = 0.0

    rotation = cv2.getRotationMatrix2D(pivot, angle, 1.0)
    straightened = cv2.warpAffine(
        gray_image,
        rotation,
        (cols, rows),
        flags=cv2.INTER_CUBIC,
        borderMode=cv2.BORDER_REPLICATE,
    )
    return straightened, rotation
|
|
| def _estimate_quick_skew_degrees(self, gray_image: np.ndarray) -> float: |
| """Cheap skew estimate used to skip expensive orientation/deskew when safe.""" |
| if gray_image is None or gray_image.size == 0: |
| return 0.0 |
| h, w = gray_image.shape[:2] |
| if h < 8 or w < 8: |
| return 0.0 |
| _, quick_bin = cv2.threshold( |
| gray_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU |
| ) |
| coords = np.column_stack(np.where(quick_bin > 0)) |
| if len(coords) < 30: |
| return 0.0 |
| rect = cv2.minAreaRect(coords[:, ::-1]) |
| rect_w, rect_h = rect[1] |
| angle = float(rect[2]) |
| if rect_w < rect_h: |
| angle += 90.0 |
| if angle > 45.0: |
| angle -= 90.0 |
| elif angle < -45.0: |
| angle += 90.0 |
| return angle |
|
|
| def _can_skip_expensive_preprocess(self, gray_image: np.ndarray) -> bool: |
| """Return True when line is already horizontal enough for direct segmentation.""" |
| if gray_image is None or gray_image.size == 0: |
| return True |
| h, w = gray_image.shape[:2] |
| if h <= 0 or w <= 0: |
| return True |
| if w <= int(h * 1.2): |
| return False |
| skew = self._estimate_quick_skew_degrees(gray_image) |
| return abs(skew) < 1.2 |
|
|
| def _get_boxes_from_profile( |
| self, |
| binary_image: np.ndarray, |
| stable_avg_char_width: float, |
| min_space_factor: float, |
| valley_threshold_factor: float, |
| ) -> List: |
| """ |
| Extracts word bounding boxes from vertical projection profile. |
| """ |
| img_h, img_w = binary_image.shape |
| vertical_projection = np.sum(binary_image, axis=0) |
| peaks = vertical_projection[vertical_projection > 0] |
| if len(peaks) == 0: |
| return [] |
| avg_peak_height = np.mean(peaks) |
| valley_threshold = int(avg_peak_height * valley_threshold_factor) |
| min_space_width = int(stable_avg_char_width * min_space_factor) |
|
|
| patched_projection = vertical_projection.copy() |
| in_gap = False |
| gap_start = 0 |
|
|
| for x, col_sum in enumerate(patched_projection): |
| if col_sum <= valley_threshold and not in_gap: |
| in_gap = True |
| gap_start = x |
| elif col_sum > valley_threshold and in_gap: |
| in_gap = False |
| if (x - gap_start) < min_space_width: |
| patched_projection[gap_start:x] = int(avg_peak_height) |
|
|
| unlabeled_boxes = [] |
| in_word = False |
| start_x = 0 |
| for x, col_sum in enumerate(patched_projection): |
| if col_sum > valley_threshold and not in_word: |
| start_x = x |
| in_word = True |
| elif col_sum <= valley_threshold and in_word: |
| |
| unlabeled_boxes.append((start_x, 0, x - start_x, img_h)) |
| in_word = False |
| if in_word: |
| unlabeled_boxes.append((start_x, 0, img_w - start_x, img_h)) |
| return unlabeled_boxes |
|
|
| def _enforce_logical_constraints( |
| self, output: Dict[str, List], image_width: int, image_height: int |
| ) -> Dict[str, List]: |
| """ |
| Enforces geometric sanity checks with 2D awareness. |
| """ |
| if not output or not output["text"]: |
| return output |
|
|
| num_items = len(output["text"]) |
| boxes = [] |
| for i in range(num_items): |
| boxes.append( |
| { |
| "text": output["text"][i], |
| "left": int(output["left"][i]), |
| "top": int(output["top"][i]), |
| "width": int(output["width"][i]), |
| "height": int(output["height"][i]), |
| "conf": output["conf"][i], |
| } |
| ) |
|
|
| valid_boxes = [] |
| for box in boxes: |
| x0 = max(0, box["left"]) |
| y0 = max(0, box["top"]) |
| x1 = min(image_width, box["left"] + box["width"]) |
| y1 = min(image_height, box["top"] + box["height"]) |
|
|
| w = x1 - x0 |
| h = y1 - y0 |
|
|
| if w > 0 and h > 0: |
| box["left"] = x0 |
| box["top"] = y0 |
| box["width"] = w |
| box["height"] = h |
| valid_boxes.append(box) |
| boxes = valid_boxes |
|
|
| is_vertical = image_height > (image_width * 1.2) |
| if is_vertical: |
| boxes.sort(key=lambda b: (b["top"], b["left"])) |
| else: |
| boxes.sort(key=lambda b: (b["left"], -b["width"])) |
|
|
| final_pass_boxes = [] |
| if boxes: |
| keep_indices = [True] * len(boxes) |
| |
| |
| for i in range(len(boxes) - 1): |
| b1 = boxes[i] |
| b2 = boxes[i + 1] |
| x_nested = (b1["left"] >= b2["left"] - 2) and ( |
| b1["left"] + b1["width"] <= b2["left"] + b2["width"] + 2 |
| ) |
| y_nested = (b1["top"] >= b2["top"] - 2) and ( |
| b1["top"] + b1["height"] <= b2["top"] + b2["height"] + 2 |
| ) |
| if x_nested and y_nested and b1["text"] == b2["text"]: |
| if b1["width"] * b1["height"] <= b2["width"] * b2["height"]: |
| keep_indices[i] = False |
| |
| x_nested_rev = (b2["left"] >= b1["left"] - 2) and ( |
| b2["left"] + b2["width"] <= b1["left"] + b1["width"] + 2 |
| ) |
| y_nested_rev = (b2["top"] >= b1["top"] - 2) and ( |
| b2["top"] + b2["height"] <= b1["top"] + b1["height"] + 2 |
| ) |
| if x_nested_rev and y_nested_rev and b1["text"] == b2["text"]: |
| if b2["width"] * b2["height"] <= b1["width"] * b1["height"]: |
| keep_indices[i + 1] = False |
|
|
| for i, keep in enumerate(keep_indices): |
| if keep: |
| final_pass_boxes.append(boxes[i]) |
|
|
| boxes = final_pass_boxes |
|
|
| if is_vertical: |
| boxes.sort(key=lambda b: (b["top"], b["left"])) |
| else: |
| boxes.sort(key=lambda b: (b["left"], -b["width"])) |
|
|
| for i in range(len(boxes) - 1): |
| b1 = boxes[i] |
| b2 = boxes[i + 1] |
| x_overlap = min(b1["left"] + b1["width"], b2["left"] + b2["width"]) - max( |
| b1["left"], b2["left"] |
| ) |
| y_overlap = min(b1["top"] + b1["height"], b2["top"] + b2["height"]) - max( |
| b1["top"], b2["top"] |
| ) |
|
|
| if x_overlap > 0 and y_overlap > 0: |
| if is_vertical: |
| if b1["top"] < b2["top"]: |
| b1["height"] = max(1, b2["top"] - b1["top"]) |
| else: |
| if b1["left"] < b2["left"]: |
| b1_right = b1["left"] + b1["width"] |
| b2_right = b2["left"] + b2["width"] |
| left_slice_width = max(0, b2["left"] - b1["left"]) |
| right_slice_width = max(0, b1_right - b2_right) |
| if b1_right > b2_right and right_slice_width > left_slice_width: |
| b1["left"] = b2_right |
| b1["width"] = right_slice_width |
| else: |
| b1["width"] = max(1, left_slice_width) |
|
|
| cleaned_output = { |
| k: [] for k in ["text", "left", "top", "width", "height", "conf"] |
| } |
| if is_vertical: |
| boxes.sort(key=lambda b: (b["top"], b["left"])) |
| else: |
| boxes.sort(key=lambda b: (b["left"], -b["width"])) |
|
|
| for box in boxes: |
| for key in cleaned_output.keys(): |
| cleaned_output[key].append(box[key]) |
|
|
| return cleaned_output |
|
|
| def _is_geometry_valid( |
| self, |
| boxes: List[Tuple[int, int, int, int]], |
| words: List[str], |
| expected_height: float = 0, |
| ) -> bool: |
| """ |
| Validates if the detected boxes are physically plausible. |
| [FIX] Improved robustness for punctuation and mixed-case text. |
| """ |
| if len(boxes) != len(words): |
| return False |
|
|
| baseline = expected_height |
| |
| if baseline < 5: |
| heights = [b[3] for b in boxes] |
| if heights: |
| baseline = np.median(heights) |
|
|
| if baseline < 5: |
| return True |
|
|
| for i, box in enumerate(boxes): |
| word = words[i] |
|
|
| |
| |
| is_punctuation = not any(c.isalnum() for c in word) |
| if is_punctuation: |
| continue |
|
|
| |
| num_chars = len(word) |
| if num_chars < 1: |
| continue |
|
|
| width = box[2] |
| height = box[3] |
|
|
| |
| |
| |
| if height < (baseline * 0.20): |
| return False |
|
|
| avg_char_width = width / num_chars |
| min_expected = baseline * 0.20 |
|
|
| |
| if avg_char_width < min_expected and avg_char_width < 4: |
| |
| if num_chars == 1 and avg_char_width >= 2: |
| continue |
| return False |
|
|
| return True |
|
|
| def _estimate_noise(self, gray: np.ndarray) -> float: |
| """ |
| Estimate image noisiness using Laplacian variance. Noisy images tend to have |
| high high-frequency content, so higher values indicate more noise (or very |
| sharp edges). Used to skip the primary segmentation pipeline when above |
| NOISE_THRESHOLD and use the fallback segmenter instead. |
| """ |
| if gray is None or gray.size == 0: |
| return 0.0 |
| lap = cv2.Laplacian(gray, cv2.CV_64F, ksize=3) |
| return float(lap.var()) |
|
|
| def _block_size_from_median_cc_height( |
| self, gray: np.ndarray, line_height: int, fallback_block_size: int |
| ) -> int: |
| """ |
| Determine adaptive threshold block size from median height of connected components |
| (resolution-independent). Uses an Otsu pre-pass to get CCs; if median height is |
| valid, returns block_size = median_cc_height * BLOCK_SIZE_MEDIAN_CC_FACTOR. |
| Otherwise returns fallback_block_size (e.g. from line_height). |
| """ |
| if gray is None or gray.size == 0 or line_height < 3: |
| return fallback_block_size |
| _, otsu_binary = cv2.threshold( |
| gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU |
| ) |
| num_labels, _, stats, _ = cv2.connectedComponentsWithStats( |
| otsu_binary, 8, cv2.CV_32S |
| ) |
| if num_labels < 3: |
| return fallback_block_size |
| areas = stats[1:, cv2.CC_STAT_AREA] |
| heights = stats[1:, cv2.CC_STAT_HEIGHT] |
| min_area_cc = max(2, int((line_height * 0.02) ** 2)) |
| valid = areas >= min_area_cc |
| if not np.any(valid): |
| return fallback_block_size |
| median_h = np.median(heights[valid]) |
| if np.isnan(median_h) or median_h < 2: |
| return fallback_block_size |
| block = max(3, int(median_h * BLOCK_SIZE_MEDIAN_CC_FACTOR)) |
| if block % 2 == 0: |
| block += 1 |
| return block |
|
|
| def _normalize_polarity_for_binarization(self, gray: np.ndarray) -> np.ndarray: |
| """ |
| Ensure we work with dark-text-on-light-background for binarization. If the |
| image is mostly dark (light text on dark background), invert it so that |
| adaptive threshold and projection profile logic behave correctly. |
| |
| Uses corner/edge regions to estimate background (typical in documents); |
| falls back to global mean for very small or full-page line crops. |
| """ |
| if gray is None or gray.size == 0: |
| return gray |
| h, w = gray.shape |
| frac = POLARITY_CORNER_FRACTION |
| |
| margin_w = max(1, int(w * frac)) |
| margin_h = max(1, int(h * frac)) |
| corner_pixels = [] |
| if margin_w < w and margin_h < h: |
| top_left = gray[:margin_h, :margin_w] |
| top_right = gray[:margin_h, -margin_w:] |
| bottom_left = gray[-margin_h:, :margin_w] |
| bottom_right = gray[-margin_h:, -margin_w:] |
| for region in (top_left, top_right, bottom_left, bottom_right): |
| corner_pixels.append(region.ravel()) |
| if corner_pixels: |
| corner_pixels = np.concatenate(corner_pixels) |
| background_mean = float(np.mean(corner_pixels)) |
| else: |
| background_mean = float(np.mean(gray)) |
| else: |
| background_mean = float(np.mean(gray)) |
| if background_mean < POLARITY_MEAN_THRESHOLD: |
| return cv2.bitwise_not(gray) |
| return gray |
|
|
| def segment( |
| self, |
| line_data: Dict[str, List], |
| line_image: np.ndarray, |
| min_space_factor=MIN_SPACE_FACTOR, |
| match_tolerance=MATCH_TOLERANCE, |
| image_name: str = None, |
| ) -> Tuple[Dict[str, List], bool]: |
| if ( |
| line_image is None |
| or not isinstance(line_image, np.ndarray) |
| or line_image.size == 0 |
| ): |
| return ({}, False) |
| |
| if len(line_image.shape) < 2: |
| return ({}, False) |
| if not line_data or not line_data.get("text") or len(line_data["text"]) == 0: |
| return ({}, False) |
|
|
| line_text = line_data["text"][0] |
| words = line_text.split() |
|
|
| |
| if len(words) <= 1: |
| img_h, img_w = line_image.shape[:2] |
| one_word_result = self.fallback_segmenter.convert_line_to_word_level( |
| line_data, img_w, img_h |
| ) |
| return (one_word_result, False) |
|
|
| |
| if line_image is None or line_image.size == 0 or len(line_image.shape) < 2: |
| |
| return {}, False |
|
|
| line_number = line_data["line"][0] |
| safe_image_name = "image" |
| safe_line_number = str(line_number) |
| safe_shortened_line_text = "line" |
| if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: |
| safe_image_name = _sanitize_filename(image_name or "image", max_length=50) |
| safe_line_number = _sanitize_filename(str(line_number), max_length=10) |
| safe_shortened_line_text = _sanitize_filename(line_text, max_length=10) |
|
|
| if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: |
| os.makedirs(self.output_folder, exist_ok=True) |
| output_path = f"{self.output_folder}/word_segmentation/{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_original.png" |
| os.makedirs(f"{self.output_folder}/word_segmentation", exist_ok=True) |
| |
| if line_image.size > 0 and len(line_image.shape) >= 2: |
| cv2.imwrite(output_path, line_image) |
|
|
| if len(line_image.shape) == 3: |
| gray = cv2.cvtColor(line_image, cv2.COLOR_BGR2GRAY) |
| else: |
| gray = line_image.copy() |
|
|
| |
| |
| |
| if self._can_skip_expensive_preprocess(gray): |
| h, w = gray.shape[:2] |
| deskewed_gray = gray |
| deskewed_line_image = line_image.copy() |
| M = np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]], dtype=np.float32) |
| else: |
| oriented_gray, M_orient = self._correct_orientation(gray) |
| deskewed_gray, M_skew = self._deskew_image(oriented_gray) |
|
|
| |
| M_orient_3x3 = np.vstack([M_orient, [0, 0, 1]]) |
| M_skew_3x3 = np.vstack([M_skew, [0, 0, 1]]) |
| M_total_3x3 = M_skew_3x3 @ M_orient_3x3 |
| M = M_total_3x3[0:2, :] |
|
|
| |
| h, w = deskewed_gray.shape |
| deskewed_line_image = cv2.warpAffine( |
| line_image, |
| M, |
| (w, h), |
| flags=cv2.INTER_CUBIC, |
| borderMode=cv2.BORDER_REPLICATE, |
| ) |
|
|
| |
| |
| local_line_data = { |
| "text": line_data["text"], |
| "conf": line_data["conf"], |
| "left": [0], |
| "top": [0], |
| "width": [w], |
| "height": [h], |
| "line": line_data.get("line", [0]), |
| } |
|
|
| if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: |
| os.makedirs(self.output_folder, exist_ok=True) |
| output_path = f"{self.output_folder}/word_segmentation/{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_deskewed.png" |
| cv2.imwrite(output_path, deskewed_line_image) |
|
|
| |
| |
| |
| approx_char_count = len(line_data["text"][0].replace(" ", "")) |
| if approx_char_count == 0: |
| return {}, False |
|
|
| img_h, img_w = deskewed_gray.shape |
| line_height = img_h |
| estimated_char_height = img_h * 0.6 |
| avg_char_width_approx = img_w / approx_char_count |
|
|
| |
| block_size = max(3, int(line_height * BLOCK_SIZE_FACTOR)) |
| if block_size % 2 == 0: |
| block_size += 1 |
|
|
| |
| effective_noise_threshold = NOISE_THRESHOLD * ( |
| line_height / REFERENCE_LINE_HEIGHT |
| ) |
|
|
| |
| noise_level = self._estimate_noise(deskewed_gray) |
| if noise_level > effective_noise_threshold: |
| used_fallback = True |
| final_output = self.fallback_segmenter.refine_words_bidirectional( |
| local_line_data, deskewed_line_image |
| ) |
| else: |
| |
| gray_for_binary = self._normalize_polarity_for_binarization(deskewed_gray) |
|
|
| |
| block_size = self._block_size_from_median_cc_height( |
| gray_for_binary, line_height, block_size |
| ) |
|
|
| |
| binary_adaptive = cv2.adaptiveThreshold( |
| gray_for_binary, |
| 255, |
| cv2.ADAPTIVE_THRESH_GAUSSIAN_C, |
| cv2.THRESH_BINARY_INV, |
| block_size, |
| C_VALUE, |
| ) |
| otsu_thresh_val, _ = cv2.threshold( |
| gray_for_binary, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU |
| ) |
| strict_thresh_val = otsu_thresh_val * 0.75 |
| _, binary_strict = cv2.threshold( |
| gray_for_binary, strict_thresh_val, 255, cv2.THRESH_BINARY_INV |
| ) |
| binary = cv2.bitwise_and(binary_adaptive, binary_strict) |
|
|
| if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: |
| output_path = f"{self.output_folder}/word_segmentation/{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_binary.png" |
| cv2.imwrite(output_path, binary) |
|
|
| |
| morph_width = max(3, int(avg_char_width_approx * 0.40)) |
| morph_height = max(2, int(avg_char_width_approx * 0.1)) |
| kernel = cv2.getStructuringElement( |
| cv2.MORPH_RECT, (morph_width, morph_height) |
| ) |
| closed_binary = cv2.morphologyEx( |
| binary, cv2.MORPH_CLOSE, kernel, iterations=1 |
| ) |
|
|
| |
| num_labels, labels, stats, _ = cv2.connectedComponentsWithStats( |
| closed_binary, 8, cv2.CV_32S |
| ) |
| clean_binary = np.zeros_like(binary) |
|
|
| force_fallback = False |
| significant_labels = 0 |
| if num_labels > 1: |
| |
| significant_labels = np.sum(stats[1:, cv2.CC_STAT_AREA] > 3) |
|
|
| if approx_char_count > 0 and significant_labels > (approx_char_count * 12): |
| force_fallback = True |
|
|
| if num_labels > 1: |
| areas = stats[1:, cv2.CC_STAT_AREA] |
| if len(areas) == 0: |
| clean_binary = binary |
| areas = np.array([0]) |
| else: |
| p1 = np.percentile(areas, 1) |
| img_h, img_w = binary.shape |
| line_h = img_h |
| estimated_char_height = img_h * 0.7 |
| |
| min_area_threshold = max( |
| MIN_AREA_FLOOR, |
| int((line_h * MIN_AREA_HEIGHT_FRACTION) ** 2), |
| ) |
| estimated_min_letter_area = max( |
| 2, |
| int(estimated_char_height * 0.2 * estimated_char_height * 0.15), |
| ) |
| area_threshold = max( |
| min_area_threshold, min(p1, estimated_min_letter_area) |
| ) |
|
|
| |
| sorted_areas = np.sort(areas) |
| area_diffs = np.diff(sorted_areas) |
| if len(sorted_areas) > 10 and len(area_diffs) > 0: |
| jump_threshold = np.percentile(area_diffs, 95) |
| significant_jump_thresh = max(10, jump_threshold * 3) |
| jump_indices = np.where(area_diffs > significant_jump_thresh)[0] |
| if len(jump_indices) > 0: |
| gap_idx = jump_indices[0] |
| area_before_gap = sorted_areas[gap_idx] |
| final_threshold = max(area_before_gap + 1, area_threshold) |
| final_threshold = min(final_threshold, 15) |
| area_threshold = final_threshold |
|
|
| for i in range(1, num_labels): |
| if stats[i, cv2.CC_STAT_AREA] >= area_threshold: |
| clean_binary[labels == i] = 255 |
| else: |
| clean_binary = binary |
|
|
| |
| if ( |
| clean_binary is None |
| or clean_binary.size == 0 |
| or len(clean_binary.shape) < 2 |
| ): |
| |
| return {}, False |
|
|
| |
| horizontal_projection = np.sum(clean_binary, axis=1) |
| y_start = 0 |
| non_zero_rows = np.where(horizontal_projection > 0)[0] |
| if len(non_zero_rows) > 0: |
| p_top = int(np.percentile(non_zero_rows, 5)) |
| p_bottom = int(np.percentile(non_zero_rows, 95)) |
| core_height = p_bottom - p_top |
| trim_pixels = int(core_height * 0.1) |
| y_start = max(0, p_top + trim_pixels) |
| y_end = min(clean_binary.shape[0], p_bottom - trim_pixels) |
| if y_end - y_start < 5: |
| y_start = p_top |
| y_end = p_bottom |
| |
| if y_end > y_start: |
| analysis_image = clean_binary[y_start:y_end, :] |
| else: |
| |
| analysis_image = clean_binary |
| else: |
| analysis_image = clean_binary |
|
|
| |
| if ( |
| analysis_image is None |
| or analysis_image.size == 0 |
| or len(analysis_image.shape) < 2 |
| ): |
| |
| return {}, False |
|
|
| if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: |
| |
| if analysis_image.size > 0 and len(analysis_image.shape) >= 2: |
| output_path = f"{self.output_folder}/word_segmentation/{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_clean_binary.png" |
| cv2.imwrite(output_path, analysis_image) |
|
|
| |
| best_boxes = None |
| successful_binary_image = None |
|
|
| if not force_fallback: |
| words = line_data["text"][0].split() |
| target = len(words) |
| backup_boxes_s1 = None |
| best_stage1_diff = float("inf") |
| best_stage1_factor = INITIAL_VALLEY_THRESHOLD_FACTOR |
|
|
| |
| stage1_coarse = np.arange( |
| INITIAL_VALLEY_THRESHOLD_FACTOR, 0.60, SEARCH_STAGE1_COARSE_STEP |
| ) |
| for v_factor in stage1_coarse: |
| curr_boxes = self._get_boxes_from_profile( |
| analysis_image, |
| avg_char_width_approx, |
| min_space_factor, |
| v_factor, |
| ) |
| diff = abs(target - len(curr_boxes)) |
| is_geom_valid = self._is_geometry_valid( |
| curr_boxes, words, estimated_char_height |
| ) |
| if diff < best_stage1_diff: |
| best_stage1_diff = diff |
| best_stage1_factor = float(v_factor) |
|
|
| if diff == 0: |
| if is_geom_valid: |
| best_boxes = curr_boxes |
| successful_binary_image = analysis_image |
| break |
| else: |
| if backup_boxes_s1 is None: |
| backup_boxes_s1 = curr_boxes |
| if ( |
| diff <= ALLOWED_WORD_MISMATCH_COUNT |
| and backup_boxes_s1 is None |
| and is_geom_valid |
| ): |
| backup_boxes_s1 = curr_boxes |
|
|
| |
| if best_boxes is None: |
| lower = max( |
| INITIAL_VALLEY_THRESHOLD_FACTOR, |
| best_stage1_factor - SEARCH_STAGE1_COARSE_STEP, |
| ) |
| upper = min(0.60, best_stage1_factor + SEARCH_STAGE1_COARSE_STEP) |
| for v_factor in np.arange( |
| lower, upper + 1e-9, SEARCH_STAGE1_FINE_STEP |
| ): |
| curr_boxes = self._get_boxes_from_profile( |
| analysis_image, |
| avg_char_width_approx, |
| min_space_factor, |
| v_factor, |
| ) |
| diff = abs(target - len(curr_boxes)) |
| is_geom_valid = self._is_geometry_valid( |
| curr_boxes, words, estimated_char_height |
| ) |
| if diff == 0 and is_geom_valid: |
| best_boxes = curr_boxes |
| successful_binary_image = analysis_image |
| break |
| if ( |
| diff <= ALLOWED_WORD_MISMATCH_COUNT |
| and backup_boxes_s1 is None |
| and is_geom_valid |
| ): |
| backup_boxes_s1 = curr_boxes |
|
|
| |
| if best_boxes is None: |
| backup_boxes_s2 = None |
| best_stage2_diff = float("inf") |
| best_stage2_factor = INITIAL_KERNEL_WIDTH_FACTOR |
| for k_factor in np.arange( |
| INITIAL_KERNEL_WIDTH_FACTOR, 0.5, SEARCH_STAGE2_COARSE_STEP |
| ): |
| k_w = max(1, int(avg_char_width_approx * k_factor)) |
| s2_bin = cv2.morphologyEx( |
| clean_binary, cv2.MORPH_CLOSE, np.ones((1, k_w), np.uint8) |
| ) |
| s2_img = ( |
| s2_bin[y_start:y_end, :] |
| if len(non_zero_rows) > 0 |
| else s2_bin |
| ) |
|
|
| if s2_img is None or s2_img.size == 0: |
| continue |
|
|
| curr_boxes = self._get_boxes_from_profile( |
| s2_img, |
| avg_char_width_approx, |
| min_space_factor, |
| MAIN_VALLEY_THRESHOLD_FACTOR, |
| ) |
| diff = abs(target - len(curr_boxes)) |
| if diff < best_stage2_diff: |
| best_stage2_diff = diff |
| best_stage2_factor = float(k_factor) |
| is_geom_valid = self._is_geometry_valid( |
| curr_boxes, words, estimated_char_height |
| ) |
|
|
| if diff == 0 and is_geom_valid: |
| best_boxes = curr_boxes |
| successful_binary_image = s2_bin |
| break |
|
|
| if ( |
| diff <= ALLOWED_WORD_MISMATCH_COUNT |
| and backup_boxes_s2 is None |
| and is_geom_valid |
| ): |
| backup_boxes_s2 = curr_boxes |
|
|
| if best_boxes is None: |
| lower = max( |
| INITIAL_KERNEL_WIDTH_FACTOR, |
| best_stage2_factor - SEARCH_STAGE2_COARSE_STEP, |
| ) |
| upper = min(0.5, best_stage2_factor + SEARCH_STAGE2_COARSE_STEP) |
| for k_factor in np.arange( |
| lower, upper + 1e-9, SEARCH_STAGE2_FINE_STEP |
| ): |
| k_w = max(1, int(avg_char_width_approx * k_factor)) |
| s2_bin = cv2.morphologyEx( |
| clean_binary, |
| cv2.MORPH_CLOSE, |
| np.ones((1, k_w), np.uint8), |
| ) |
| s2_img = ( |
| s2_bin[y_start:y_end, :] |
| if len(non_zero_rows) > 0 |
| else s2_bin |
| ) |
| if s2_img is None or s2_img.size == 0: |
| continue |
| curr_boxes = self._get_boxes_from_profile( |
| s2_img, |
| avg_char_width_approx, |
| min_space_factor, |
| MAIN_VALLEY_THRESHOLD_FACTOR, |
| ) |
| diff = abs(target - len(curr_boxes)) |
| is_geom_valid = self._is_geometry_valid( |
| curr_boxes, words, estimated_char_height |
| ) |
| if diff == 0 and is_geom_valid: |
| best_boxes = curr_boxes |
| successful_binary_image = s2_bin |
| break |
| if ( |
| diff <= ALLOWED_WORD_MISMATCH_COUNT |
| and backup_boxes_s2 is None |
| and is_geom_valid |
| ): |
| backup_boxes_s2 = curr_boxes |
|
|
| if best_boxes is None: |
| if backup_boxes_s1 is not None: |
| best_boxes = backup_boxes_s1 |
| successful_binary_image = analysis_image |
| elif backup_boxes_s2 is not None: |
| best_boxes = backup_boxes_s2 |
| successful_binary_image = clean_binary |
|
|
| final_output = None |
| used_fallback = False |
|
|
| if best_boxes is None: |
| |
| used_fallback = True |
| |
| final_output = self.fallback_segmenter.refine_words_bidirectional( |
| local_line_data, deskewed_line_image |
| ) |
| else: |
| |
| unlabeled_boxes = best_boxes |
| if successful_binary_image is analysis_image: |
| cca_source_image = clean_binary |
| else: |
| cca_source_image = successful_binary_image |
|
|
| num_labels, _, stats, _ = cv2.connectedComponentsWithStats( |
| cca_source_image, 8, cv2.CV_32S |
| ) |
| cca_img_h, cca_img_w = cca_source_image.shape[:2] |
|
|
| component_assignments = {} |
| num_proc = min(len(words), len(unlabeled_boxes)) |
| min_valid_component_area = estimated_char_height * 2 |
| box_meta = [] |
| for i in range(num_proc): |
| box_x, box_y, box_w, box_h = unlabeled_boxes[i] |
| box_r = box_x + box_w |
| box_center_x = box_x + box_w / 2 |
| box_meta.append((i, box_x, box_r, box_center_x, box_w)) |
| box_starts = [meta[1] for meta in box_meta] |
|
|
| for j in range(1, num_labels): |
| comp_x = stats[j, cv2.CC_STAT_LEFT] |
| comp_w = stats[j, cv2.CC_STAT_WIDTH] |
| comp_area = stats[j, cv2.CC_STAT_AREA] |
| comp_r = comp_x + comp_w |
| comp_center_x = comp_x + comp_w / 2 |
| comp_y = stats[j, cv2.CC_STAT_TOP] |
| comp_h = stats[j, cv2.CC_STAT_HEIGHT] |
| comp_center_y = comp_y + comp_h / 2 |
|
|
| if ( |
| comp_center_y < cca_img_h * 0.1 |
| or comp_center_y > cca_img_h * 0.9 |
| ): |
| continue |
| if comp_area < min_valid_component_area: |
| continue |
|
|
| best_box_idx = None |
| max_overlap = 0 |
| best_center_distance = float("inf") |
| component_center_in_box = False |
|
|
| |
| |
| left_search = max(0, comp_x - comp_w) |
| right_search = comp_r + comp_w |
| start_idx = bisect_left(box_starts, left_search) |
| idx = start_idx |
| while idx < len(box_meta) and box_meta[idx][1] <= right_search: |
| i, box_x, box_r, box_center_x, box_w = box_meta[idx] |
| idx += 1 |
| if comp_w > box_w * 1.5: |
| continue |
|
|
| if comp_x < box_r and box_x < comp_r: |
| overlap_start = max(comp_x, box_x) |
| overlap_end = min(comp_r, box_r) |
| overlap = overlap_end - overlap_start |
|
|
| if overlap > 0: |
| center_in_box = box_x <= comp_center_x < box_r |
| center_distance = abs(comp_center_x - box_center_x) |
|
|
| if center_in_box: |
| if ( |
| not component_center_in_box |
| or overlap > max_overlap |
| ): |
| component_center_in_box = True |
| best_center_distance = center_distance |
| max_overlap = overlap |
| best_box_idx = i |
| elif not component_center_in_box: |
| if center_distance < best_center_distance or ( |
| center_distance == best_center_distance |
| and overlap > max_overlap |
| ): |
| best_center_distance = center_distance |
| max_overlap = overlap |
| best_box_idx = i |
|
|
| if best_box_idx is not None: |
| component_assignments[j] = best_box_idx |
|
|
| refined_boxes_list = [] |
| for i in range(num_proc): |
| word_label = words[i] |
| components_in_box = [ |
| stats[j] for j, b in component_assignments.items() if b == i |
| ] |
|
|
| use_original_box = False |
| if not components_in_box: |
| use_original_box = True |
| else: |
| min_x = min(c[cv2.CC_STAT_LEFT] for c in components_in_box) |
| min_y = min(c[cv2.CC_STAT_TOP] for c in components_in_box) |
| max_r = max( |
| c[cv2.CC_STAT_LEFT] + c[cv2.CC_STAT_WIDTH] |
| for c in components_in_box |
| ) |
| max_b = max( |
| c[cv2.CC_STAT_TOP] + c[cv2.CC_STAT_HEIGHT] |
| for c in components_in_box |
| ) |
| cca_h = max(1, max_b - min_y) |
| if cca_h < (estimated_char_height * 0.35): |
| use_original_box = True |
|
|
| if use_original_box: |
| box_x, box_y, box_w, box_h = unlabeled_boxes[i] |
| adjusted_box_y = y_start + box_y |
| refined_boxes_list.append( |
| { |
| "text": word_label, |
| "left": box_x, |
| "top": adjusted_box_y, |
| "width": box_w, |
| "height": box_h, |
| "conf": line_data["conf"][0], |
| } |
| ) |
| else: |
| refined_boxes_list.append( |
| { |
| "text": word_label, |
| "left": min_x, |
| "top": min_y, |
| "width": max(1, max_r - min_x), |
| "height": cca_h, |
| "conf": line_data["conf"][0], |
| } |
| ) |
|
|
| |
| cca_check_list = [ |
| (b["left"], b["top"], b["width"], b["height"]) |
| for b in refined_boxes_list |
| ] |
| if not self._is_geometry_valid( |
| cca_check_list, words, estimated_char_height |
| ): |
| if abs(len(refined_boxes_list) - len(words)) > 1: |
| best_boxes = None |
| else: |
| final_output = { |
| k: [] |
| for k in ["text", "left", "top", "width", "height", "conf"] |
| } |
| for box in refined_boxes_list: |
| for key in final_output.keys(): |
| final_output[key].append(box[key]) |
| else: |
| final_output = { |
| k: [] |
| for k in ["text", "left", "top", "width", "height", "conf"] |
| } |
| for box in refined_boxes_list: |
| for key in final_output.keys(): |
| final_output[key].append(box[key]) |
|
|
| |
| if best_boxes is None and not used_fallback: |
| used_fallback = True |
| |
| final_output = self.fallback_segmenter.refine_words_bidirectional( |
| local_line_data, deskewed_line_image |
| ) |
|
|
| |
| |
| |
| M_inv = cv2.invertAffineTransform(M) |
| remapped_boxes_list = [] |
| for i in range(len(final_output["text"])): |
| left, top = final_output["left"][i], final_output["top"][i] |
| width, height = final_output["width"][i], final_output["height"][i] |
|
|
| |
| corners = np.array( |
| [ |
| [left, top], |
| [left + width, top], |
| [left + width, top + height], |
| [left, top + height], |
| ], |
| dtype="float32", |
| ) |
| corners_expanded = np.expand_dims(corners, axis=1) |
| original_corners = cv2.transform(corners_expanded, M_inv) |
| squeezed_corners = original_corners.squeeze(axis=1) |
|
|
| |
| min_x = int(np.min(squeezed_corners[:, 0])) |
| max_x = int(np.max(squeezed_corners[:, 0])) |
| min_y = int(np.min(squeezed_corners[:, 1])) |
| max_y = int(np.max(squeezed_corners[:, 1])) |
|
|
| remapped_boxes_list.append( |
| { |
| "text": final_output["text"][i], |
| "left": min_x, |
| "top": min_y, |
| "width": max_x - min_x, |
| "height": max_y - min_y, |
| "conf": final_output["conf"][i], |
| } |
| ) |
|
|
| remapped_output = {k: [] for k in final_output.keys()} |
| for box in remapped_boxes_list: |
| for key in remapped_output.keys(): |
| remapped_output[key].append(box[key]) |
|
|
| img_h, img_w = line_image.shape[:2] |
| remapped_output = self._enforce_logical_constraints( |
| remapped_output, img_w, img_h |
| ) |
|
|
| |
| |
| |
| words = line_data["text"][0].split() |
| target_count = len(words) |
| current_count = len(remapped_output["text"]) |
| has_collapsed_boxes = any(w < 3 for w in remapped_output["width"]) |
|
|
| if current_count > 0: |
| total_text_len = sum(len(t) for t in remapped_output["text"]) |
| total_box_width = sum(remapped_output["width"]) |
| avg_width_pixels = total_box_width / max(1, total_text_len) |
| else: |
| avg_width_pixels = 0 |
| is_suspiciously_thin = avg_width_pixels < 4 |
|
|
| if current_count != target_count or is_suspiciously_thin or has_collapsed_boxes: |
| used_fallback = True |
|
|
| |
| |
| |
|
|
| |
| temp_local_output = self.fallback_segmenter.refine_words_bidirectional( |
| local_line_data, deskewed_line_image |
| ) |
|
|
| |
| if len(temp_local_output["text"]) != target_count: |
| h, w = deskewed_line_image.shape[:2] |
| temp_local_output = self.fallback_segmenter.convert_line_to_word_level( |
| local_line_data, w, h |
| ) |
|
|
| |
| |
| remapped_boxes_list = [] |
| for i in range(len(temp_local_output["text"])): |
| left, top = temp_local_output["left"][i], temp_local_output["top"][i] |
| width, height = ( |
| temp_local_output["width"][i], |
| temp_local_output["height"][i], |
| ) |
|
|
| corners = np.array( |
| [ |
| [left, top], |
| [left + width, top], |
| [left + width, top + height], |
| [left, top + height], |
| ], |
| dtype="float32", |
| ) |
| corners_expanded = np.expand_dims(corners, axis=1) |
| original_corners = cv2.transform(corners_expanded, M_inv) |
| squeezed_corners = original_corners.squeeze(axis=1) |
|
|
| min_x = int(np.min(squeezed_corners[:, 0])) |
| max_x = int(np.max(squeezed_corners[:, 0])) |
| min_y = int(np.min(squeezed_corners[:, 1])) |
| max_y = int(np.max(squeezed_corners[:, 1])) |
|
|
| remapped_boxes_list.append( |
| { |
| "text": temp_local_output["text"][i], |
| "left": min_x, |
| "top": min_y, |
| "width": max_x - min_x, |
| "height": max_y - min_y, |
| "conf": temp_local_output["conf"][i], |
| } |
| ) |
|
|
| remapped_output = {k: [] for k in temp_local_output.keys()} |
| for box in remapped_boxes_list: |
| for key in remapped_output.keys(): |
| remapped_output[key].append(box[key]) |
|
|
| if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: |
| output_path = f"{self.output_folder}/word_segmentation/{safe_image_name}_{safe_shortened_line_text}_final_boxes.png" |
| os.makedirs(f"{self.output_folder}/word_segmentation", exist_ok=True) |
| output_image_vis = line_image.copy() |
| for i in range(len(remapped_output["text"])): |
| x, y, w, h = ( |
| int(remapped_output["left"][i]), |
| int(remapped_output["top"][i]), |
| int(remapped_output["width"][i]), |
| int(remapped_output["height"][i]), |
| ) |
| cv2.rectangle(output_image_vis, (x, y), (x + w, y + h), (0, 255, 0), 2) |
| cv2.imwrite(output_path, output_image_vis) |
|
|
| return remapped_output, used_fallback |
|
|
|
|
class HybridWordSegmenter:
    """
    Implements a two-step approach for word segmentation:
    1. Proportional estimation based on text (primary; avoids image noise).
    2. Image-based refinement with a "Bounded Scan" that cannot shrink boxes
       beyond a fraction of the text-based width.

    Design: Relies more on expected character spacing from the text than on
    image analysis, so noisy images are less likely to produce tiny or
    missing boxes.

    Situations that could otherwise cause very small boxes (and how we mitigate):
    - False gaps in the vertical projection (noise/speckle) -> refinement is
      bounded by shrink_limit_fraction; initial boxes use proportional only.
    - Image-based "justified" gap anchoring picking wrong cuts -> we do not
      use vertical_projection for initial segmentation here; only proportional.
    - Bidirectional scan snapping to a thin low-density strip inside a word ->
      same shrink bound; fallback "thinnest point" also clamped.
    - De-overlapping stealing space from the next word -> shrink bound keeps
      each box at least (1 - shrink_limit_fraction) of initial width.

    ROBUSTNESS UPGRADES:
    - Uses Horizontal Smearing to prevent cutting inside noisy characters.
    - Uses Gaussian Blur to suppress speckle noise.
    - Implements 'Noise Floors' for gap detection (never assumes perfect 0).
    """

    def convert_line_to_word_level(
        self,
        line_data: Dict[str, List],
        image_width: int,
        image_height: int,
        vertical_projection: np.ndarray = None,
    ) -> Dict[str, List]:
        """
        Step 1: Converts line-level OCR results to word-level using proportional estimation.
        Includes noise-tolerant gap anchoring for justified text.

        Args:
            line_data: Dict of parallel lists ("text", "left", "top", "width",
                "height", "conf"); only the first entry of each list is read.
            image_width: Width of the line image in pixels.
            image_height: Height of the line image in pixels (only used to scale
                the gap threshold when ``vertical_projection`` is given).
            vertical_projection: Optional per-column ink sums. When supplied and
                its length equals ``image_width``, word cuts are anchored to the
                centres of the widest gaps instead of being estimated from text.

        Returns:
            Dict with the same six keys, one entry per whitespace-separated
            word. All lists are empty when there is no usable text.
        """
        output = {
            "text": [],
            "left": [],
            "top": [],
            "width": [],
            "height": [],
            "conf": [],
        }

        if not line_data or not line_data.get("text"):
            return output

        # Only the first line entry is converted.
        line_text = line_data["text"][0]
        line_left = float(line_data["left"][0])
        line_top = float(line_data["top"][0])
        line_width = float(line_data["width"][0])
        line_height = float(line_data["height"][0])
        line_conf = line_data["conf"][0]

        if not line_text.strip():
            return output
        words = line_text.split()
        if not words:
            return output
        num_chars = len("".join(words))
        num_spaces = len(words) - 1
        if num_chars == 0:
            return output

        # Gap anchoring: only attempted when a projection matching the image
        # width is available and there is at least one inter-word space.
        if (
            vertical_projection is not None
            and len(vertical_projection) == image_width
            and num_spaces > 0
        ):
            # Noise-tolerant threshold: 3% of the column height worth of ink,
            # but never less than two fully-set pixels.
            dynamic_gap_threshold = max(255.0 * 0.03 * image_height, 255.0 * 2)
            gaps = _find_widest_zero_gaps(
                vertical_projection, n=num_spaces, gap_threshold=dynamic_gap_threshold
            )
            if len(gaps) == num_spaces:
                # Cut at the centre of each gap; image edges close the outer words.
                cuts = [0]
                cuts.extend((start + end) // 2 for start, end in gaps)
                cuts.append(image_width)

                for idx, word in enumerate(words):
                    left_px = cuts[idx]
                    right_px = cuts[idx + 1]
                    width_px = max(1, right_px - left_px)
                    output["text"].append(word)
                    output["left"].append(line_left + left_px)
                    output["top"].append(line_top)
                    output["width"].append(width_px)
                    output["height"].append(line_height)
                    output["conf"].append(line_conf)
                return output

        # Proportional estimation: distribute the line width over the words
        # according to their weighted text length.
        total_line_weight = get_weighted_length(line_text)
        if total_line_weight <= 0:
            total_line_weight = 1.0
        avg_weight_unit = line_width / total_line_weight
        estimated_space_width = get_weighted_length(" ") * avg_weight_unit

        # num_chars is guaranteed > 0 here (checked above).
        avg_char_width = max(3.0, line_width / num_chars)
        min_word_width = max(5.0, avg_char_width * 0.5)

        current_left = line_left
        for word in words:
            raw_word_width = get_weighted_length(word) * avg_weight_unit
            word_width = max(min_word_width, raw_word_width)

            # Keep the left edge inside the image even if earlier words
            # were widened by the minimum-width floor.
            clamped_left = max(0, min(current_left, image_width))
            output["text"].append(word)
            output["left"].append(clamped_left)
            output["top"].append(line_top)
            output["width"].append(word_width)
            output["height"].append(line_height)
            output["conf"].append(line_conf)
            current_left += word_width + estimated_space_width

        return output

    def _run_single_pass(
        self,
        initial_boxes: List[Dict],
        vertical_projection: np.ndarray,
        max_scan_distance: int,
        img_w: int,
        img_h: int,
        direction: str = "ltr",
        trailing_punctuation: List[bool] = None,
        shrink_limit_fraction: float = 0.5,
    ) -> List[Dict]:
        """
        Helper function to run one pass of refinement.
        ROBUSTNESS UPGRADE:
        - Uses a 'gap_noise_floor' instead of looking for 0.
        - Enforces 'safety_density_limit': if the "thinnest" point is still thick (ink),
          it refuses to cut there (prevents cutting bold letters).
        - shrink_limit_fraction: Refinement cannot shrink a box by more than this fraction
          of its initial (text-based) width from either edge. Prevents noise from creating
          tiny boxes; keeps segmentation anchored to expected character spacing.

        Args:
            initial_boxes: Word boxes (dicts with at least "left" and "width").
                They are copied, never mutated.
            vertical_projection: Per-column ink sums, indexed by x.
            max_scan_distance: Maximum number of columns to scan away from an edge.
            img_w: Image width in pixels.
            img_h: Image height in pixels (scales the density thresholds).
            direction: "ltr" refines right edges scanning rightwards, "rtl"
                refines left edges scanning leftwards, "both" does both
                (processed in right-to-left box order).
            trailing_punctuation: Per-box flags; when set, an LTR scan skips the
                first gap and absorbs one following small blob into the box.
            shrink_limit_fraction: See above.

        Returns:
            New list of box dicts with integer "left" and "width" (>= 1).
        """
        refined_boxes = [box.copy() for box in initial_boxes]
        if trailing_punctuation is None:
            trailing_punctuation = [False] * len(initial_boxes)

        # A column counts as "gap" when at most 5% of its height is ink.
        gap_noise_floor = 255.0 * (img_h * 0.05)

        # With no gap found, only cut at the thinnest column if it holds
        # less than 25% ink; otherwise leave the edge untouched.
        safety_density_limit = 255.0 * (img_h * 0.25)

        if direction == "ltr":
            last_corrected_right_edge = 0
            indices = range(len(refined_boxes))
        else:
            next_corrected_left_edge = img_w
            indices = range(len(refined_boxes) - 1, -1, -1)

        for i in indices:
            box = refined_boxes[i]
            left = int(box["left"])
            right = int(box["left"] + box["width"])
            init_width = max(1, int(box["width"]))
            # Shrink bounds are derived from the unclamped, text-based edges.
            max_shrink = int(shrink_limit_fraction * init_width)
            min_right = right - max_shrink
            max_left = left + max_shrink

            left = max(0, min(left, img_w - 1))
            right = max(0, min(right, img_w - 1))

            new_left, new_right = left, right

            if direction in ("ltr", "both"):
                if right < img_w:
                    scan_limit = min(img_w, right + max_scan_distance)

                    best_x = right
                    min_density = float("inf")
                    first_gap_x = None

                    for x in range(right, scan_limit):
                        density = vertical_projection[x]

                        # First column at or below the noise floor ends the scan.
                        if density <= gap_noise_floor:
                            first_gap_x = x
                            break

                        # Otherwise remember the thinnest column seen so far.
                        if density < min_density:
                            min_density = density
                            best_x = x

                    if first_gap_x is not None:
                        if trailing_punctuation[i]:
                            proj_len = len(vertical_projection)
                            x_pos = first_gap_x

                            # Walk through the gap, but no further than half
                            # the scan distance.
                            gap_safety_limit = x_pos + (max_scan_distance // 2)
                            while (
                                x_pos < scan_limit
                                and x_pos < proj_len
                                and vertical_projection[x_pos] <= gap_noise_floor
                            ):
                                if x_pos >= gap_safety_limit:
                                    break
                                x_pos += 1

                            # Walk through the following blob; if it is wider
                            # than half the image height, fall back to the
                            # first gap column instead of absorbing it.
                            blob_start = x_pos
                            blob_safety_limit = blob_start + max(1, int(img_h * 0.5))
                            while (
                                x_pos < scan_limit
                                and x_pos < proj_len
                                and vertical_projection[x_pos] > gap_noise_floor
                            ):
                                if x_pos >= blob_safety_limit:
                                    x_pos = first_gap_x
                                    break
                                x_pos += 1
                            new_right = min(x_pos, scan_limit)
                        else:
                            new_right = first_gap_x
                    elif min_density < safety_density_limit:
                        # No gap: cut at the thinnest column if it is thin enough.
                        new_right = best_x
                    else:
                        # Everything in range is dense ink; keep the original edge.
                        new_right = right

            if direction in ("rtl", "both"):
                if left > 0:
                    scan_limit = max(0, left - max_scan_distance)

                    best_x = left
                    min_density = float("inf")
                    found_gap = False

                    for x in range(left, scan_limit, -1):
                        density = vertical_projection[x]

                        if density <= gap_noise_floor:
                            new_left = x
                            found_gap = True
                            break

                        if density < min_density:
                            min_density = density
                            best_x = x

                    if not found_gap:
                        if min_density < safety_density_limit:
                            new_left = best_x
                        else:
                            new_left = left

            # Enforce the shrink bound relative to the initial box.
            new_right = max(new_right, min_right)
            new_left = min(new_left, max_left)

            # De-overlap against the neighbour processed just before this box.
            if direction == "ltr":
                if new_left < last_corrected_right_edge:
                    new_left = last_corrected_right_edge
                if new_right <= new_left:
                    new_right = new_left + 1
                last_corrected_right_edge = new_right
            else:
                if new_right > next_corrected_left_edge:
                    new_right = next_corrected_left_edge
                if new_left >= new_right:
                    new_left = new_right - 1
                next_corrected_left_edge = new_left

            box["left"] = new_left
            box["width"] = max(1, new_right - new_left)

        return refined_boxes

    def refine_words_bidirectional(
        self,
        line_data: Dict[str, List],
        line_image: np.ndarray,
    ) -> Dict[str, List]:
        """
        Refines boxes using a robust bidirectional scan.
        DIFFERENCE FROM MAIN SEGMENTER: Uses aggressive smoothing and horizontal
        smearing to force-merge characters, prioritizing word separation over
        character detail.

        Args:
            line_data: Line-level OCR dict (see ``convert_line_to_word_level``).
            line_image: Grayscale (2-D) or BGR image of the line.

        Returns:
            Word-level dict of parallel lists. ``line_data`` itself is returned
            unchanged when ``line_image`` is None. The purely proportional
            estimate is returned when the image is smaller than 5 px in either
            dimension, the line has at most one word, preprocessing fails, or
            no ink is found. An empty word-level dict is returned when there is
            no text.
        """
        if line_image is None:
            return line_data

        if len(line_image.shape) == 2:
            gray = np.ascontiguousarray(line_image)
        else:
            gray = cv2.cvtColor(line_image, cv2.COLOR_BGR2GRAY)
        img_h, img_w = gray.shape[:2]

        # Too small for blur/threshold kernels to be meaningful.
        if img_h < 5 or img_w < 5:
            return self.convert_line_to_word_level(line_data, img_w, img_h)

        # Without text there is nothing to segment. Returning here also keeps
        # `words` from being referenced while unbound further down.
        if not line_data or not line_data.get("text"):
            return self.convert_line_to_word_level(line_data, img_w, img_h)

        words = line_data["text"][0].split()
        if len(words) <= 1:
            return self.convert_line_to_word_level(line_data, img_w, img_h)

        try:
            # Replace NaN/inf so min/max and the blur behave.
            if gray.dtype.kind in ("f", "c"):
                gray = np.nan_to_num(gray, nan=0.0, posinf=255.0, neginf=0.0)

            # cv2.adaptiveThreshold below only accepts 8-bit single-channel
            # input, so every other dtype (float32 included) is converted.
            if gray.dtype != np.uint8:
                gmin = float(np.min(gray)) if gray.size else 0.0
                gmax = float(np.max(gray)) if gray.size else 255.0
                if gmax > 255.0 or gmin < 0.0:
                    gray = cv2.normalize(gray, None, 0, 255, cv2.NORM_MINMAX)
                gray = np.clip(gray, 0, 255).astype(np.uint8)

            blurred_gray = cv2.GaussianBlur(gray, (5, 5), 0)
        except Exception:
            # Deliberate best-effort: fall back to the text-only estimate.
            return self.convert_line_to_word_level(line_data, img_w, img_h)

        # Adaptive threshold block size must be odd.
        block_size = max(25, int(img_h * 0.5))
        if block_size % 2 == 0:
            block_size += 1

        binary = cv2.adaptiveThreshold(
            blurred_gray,
            255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV,
            block_size,
            10,
        )

        # Horizontal smearing: close small gaps so characters of one word merge.
        smear_w = max(3, int(img_h * 0.20))
        smear_h = max(1, int(img_h * 0.05))
        kernel_smear = cv2.getStructuringElement(cv2.MORPH_RECT, (smear_w, smear_h))
        binary_smeared = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel_smear)

        # Per-column ink sum.
        vertical_projection = np.sum(binary_smeared, axis=0)

        # Collect contiguous runs of non-empty columns to estimate blob width.
        char_blobs = []
        in_blob = False
        blob_start = 0
        for x, col_sum in enumerate(vertical_projection):
            if col_sum > 0 and not in_blob:
                blob_start = x
                in_blob = True
            elif col_sum == 0 and in_blob:
                char_blobs.append((blob_start, x))
                in_blob = False
        if in_blob:
            char_blobs.append((blob_start, img_w))

        if not char_blobs:
            return self.convert_line_to_word_level(line_data, img_w, img_h)

        total_chars = len("".join(words))
        if total_chars > 0:
            geom_avg_char_width = img_w / total_chars
        else:
            geom_avg_char_width = 10

        # Cap the blob-based estimate by the text-based one so merged blobs
        # cannot inflate the scan distance.
        blob_avg_char_width = np.mean([end - start for start, end in char_blobs])
        safe_avg_char_width = min(blob_avg_char_width, geom_avg_char_width * 1.5)

        max_scan_distance = max(int(safe_avg_char_width * 2.5), int(img_h * 0.6))
        min_safe_box_width = max(4, int(safe_avg_char_width * 0.5))

        # Initial boxes come from text proportions only (no projection passed).
        estimated_data = self.convert_line_to_word_level(line_data, img_w, img_h)
        if not estimated_data["text"]:
            return estimated_data

        box_keys = ("text", "left", "top", "width", "height", "conf")
        initial_boxes = [
            {key: estimated_data[key][i] for key in box_keys}
            for i in range(len(estimated_data["text"]))
        ]

        trailing_punctuation = [
            _word_ends_with_punctuation(text) for text in estimated_data["text"]
        ]

        ltr_boxes = self._run_single_pass(
            initial_boxes,
            vertical_projection,
            max_scan_distance,
            img_w,
            img_h,
            "ltr",
            trailing_punctuation,
        )
        rtl_boxes = self._run_single_pass(
            initial_boxes,
            vertical_projection,
            max_scan_distance,
            img_w,
            img_h,
            "rtl",
            trailing_punctuation,
        )

        # Combine: left edge from the LTR pass, right edge from the RTL pass.
        combined_boxes = [box.copy() for box in initial_boxes]
        for combined, ltr_box, rtl_box in zip(combined_boxes, ltr_boxes, rtl_boxes):
            final_left = ltr_box["left"]
            rtl_right = rtl_box["left"] + rtl_box["width"]
            combined["left"] = final_left
            combined["width"] = max(min_safe_box_width, rtl_right - final_left)

        # Force strictly increasing left edges.
        for i in range(len(combined_boxes) - 1):
            if combined_boxes[i + 1]["left"] <= combined_boxes[i]["left"]:
                combined_boxes[i + 1]["left"] = (
                    combined_boxes[i]["left"] + min_safe_box_width
                )

        # Each box (except the last) spans up to the next box's left edge.
        for i in range(len(combined_boxes) - 1):
            curr = combined_boxes[i]
            nxt = combined_boxes[i + 1]
            gap_width = nxt["left"] - curr["left"]
            curr["width"] = max(min_safe_box_width, gap_width)

        final_output = {k: [] for k in estimated_data.keys()}
        for box in combined_boxes:
            # Never emit a zero or negative width.
            box["width"] = max(1, box["width"])
            for key in final_output.keys():
                final_output[key].append(box[key])

        return final_output
|
|