document_redaction / tools /word_segmenter.py
seanpedrickcase's picture
Sync: Merge pull request #199 from seanpedrick-case/startup_optimise
a2e06b3
Raw
History Blame Contribute Delete
83.5 kB
import os
from bisect import bisect_left
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np

from tools.config import OUTPUT_FOLDER, SAVE_WORD_SEGMENTER_OUTPUT_IMAGES
# ---------------------------------------------------------------------------
# Adaptive thresholding parameters
# (resolution-independent: derived from line_height / median connected-component height)
# ---------------------------------------------------------------------------
BLOCK_SIZE_FACTOR = 0.5  # Fraction of line_height used as block size when median CC height is unavailable
BLOCK_SIZE_MEDIAN_CC_FACTOR = 1.2  # Block size = median_cc_height * this when available
C_VALUE = 2  # Constant subtracted from mean in adaptive thresholding
REFERENCE_LINE_HEIGHT = 50  # Line height (px) at which NOISE_THRESHOLD is defined

# ---------------------------------------------------------------------------
# Word segmentation search parameters
# ---------------------------------------------------------------------------
INITIAL_KERNEL_WIDTH_FACTOR = 0.0  # Starting kernel width factor for Stage 2 search
INITIAL_VALLEY_THRESHOLD_FACTOR = (
    0.0  # Starting valley threshold factor for Stage 1 search
)
MAIN_VALLEY_THRESHOLD_FACTOR = (
    0.15  # Primary valley threshold factor for word separation
)
MIN_SPACE_FACTOR = 0.2  # Minimum space width relative to character width
MATCH_TOLERANCE = 0  # Tolerance for word count matching

# ---------------------------------------------------------------------------
# Noise removal parameters (resolution-independent: derived from line_height)
# ---------------------------------------------------------------------------
MIN_AREA_HEIGHT_FRACTION = 0.05  # MIN_AREA = (line_height * this)^2
MIN_AREA_FLOOR = 2  # Minimum pixel area floor for very low-res lines
DEFAULT_TRIM_PERCENTAGE = (
    0.2  # Percentage to trim from top/bottom for vertical cropping
)

# ---------------------------------------------------------------------------
# Skew detection parameters (angles in degrees)
# ---------------------------------------------------------------------------
MIN_SKEW_THRESHOLD = 0.5  # Ignore angles smaller than this (likely noise)
MAX_SKEW_THRESHOLD = 15.0  # Angles larger than this are extreme and likely errors
# Baseline (Hough) skew: minimum bottom points to use baseline method; Hough threshold
SKEW_BASELINE_MIN_POINTS = 20
SKEW_HOUGH_THRESHOLD = 25  # Min votes for a line to be considered

# Maximum allowed difference in word count between the target and the detected
# words during the word segmentation process. If above this, it will use the
# fallback segmenter.
ALLOWED_WORD_MISMATCH_COUNT = 0

# Noise detection: if estimated noise (Laplacian variance) is above this (at REFERENCE_LINE_HEIGHT),
# skip primary segmentation and use fallback. Scaled by line_height for resolution independence.
NOISE_THRESHOLD = 800

# Polarity: binarization assumes dark text on light background. If estimated background
# mean is below this, the image is treated as light-on-dark and inverted before binarization.
POLARITY_MEAN_THRESHOLD = 128
POLARITY_CORNER_FRACTION = (
    0.15  # Fraction of width/height used for corner/edge sampling
)

# Step sizes for the two-stage adaptive search: each stage first sweeps its factor
# with the coarse step, then re-sweeps around the best coarse value with the fine step.
# Stage 1 sweeps the valley threshold factor; Stage 2 sweeps the kernel width factor.
SEARCH_STAGE1_COARSE_STEP = 0.06
SEARCH_STAGE1_FINE_STEP = 0.02
SEARCH_STAGE2_COARSE_STEP = 0.05
SEARCH_STAGE2_FINE_STEP = 0.02
def _find_widest_zero_gaps(
vertical_projection: np.ndarray,
n: int,
gap_threshold: float = 0.0,
) -> List[Tuple[int, int]]:
"""
Find the N widest contiguous zero-gaps (or near-zero) in the vertical projection.
Used for justified text: anchor word cut points to the centers of these gaps.
Returns list of (start, end) in left-to-right order, or empty if not enough gaps.
"""
if vertical_projection is None or n <= 0:
return []
w = len(vertical_projection)
gaps = []
in_gap = False
start = 0
for x in range(w):
val = vertical_projection[x] if x < w else 0
if val <= gap_threshold and not in_gap:
start = x
in_gap = True
elif val > gap_threshold and in_gap:
gaps.append((start, x))
in_gap = False
if in_gap:
gaps.append((start, w))
if not gaps:
return []
# Sort by width descending, take first n
gaps_by_width = sorted(gaps, key=lambda g: g[1] - g[0], reverse=True)
selected = gaps_by_width[:n]
# Sort by position (left-to-right) for cutting
selected.sort(key=lambda g: g[0])
return selected
# Punctuation that often sits after a word with a visible gap (anchor to include in word box)
TRAILING_PUNCTUATION_CHARS = frozenset(".,:;\"'!?)]}")
def _word_ends_with_punctuation(word: str) -> bool:
"""True if word ends with a punctuation character that may have a gap before it."""
return bool(word and word[-1] in TRAILING_PUNCTUATION_CHARS)
def get_weighted_length(text: str) -> float:
    """
    Estimate the rendered width of text in a proportional font.

    Instead of counting characters, each character contributes a width weight:
    narrow glyphs (i, l, 1, j, some punctuation) count for less than 1.0, wide
    glyphs (W, M, m, w, @, %) for more. Characters without a specific weight
    count 1.1 when uppercase and 1.0 otherwise.
    Used by HybridWordSegmenter.convert_line_to_word_level for better blind estimation.

    Args:
        text: The string to measure.
    Returns:
        The sum of per-character weights (0.0 for an empty string).
    """
    char_weights = {
        **dict.fromkeys("il1j", 0.4),
        "t": 0.6,
        **dict.fromkeys(".,!'", 0.3),
        **dict.fromkeys("WMm", 1.3),
        **dict.fromkeys("w@%", 1.2),
        " ": 0.5,  # space between words
    }

    total = 0.0
    for ch in text:
        if ch in char_weights:
            total += char_weights[ch]
        elif ch.isupper():
            total += 1.1
        else:
            total += 1.0
    return total
def _sanitize_filename(filename: str, max_length: int = 100) -> str:
"""
Sanitizes a string to be used as a valid filename.
Removes or replaces invalid characters for Windows/Linux file systems.
Args:
filename: The string to sanitize
max_length: Maximum length of the sanitized filename
Returns:
A sanitized string safe for use in file names
"""
if not filename:
return "unnamed"
# Replace spaces with underscores
sanitized = filename.replace(" ", "_")
# Remove or replace invalid characters for Windows/Linux
# Invalid: < > : " / \ | ? *
invalid_chars = '<>:"/\\|?*'
for char in invalid_chars:
sanitized = sanitized.replace(char, "_")
# Remove control characters
sanitized = "".join(
char for char in sanitized if ord(char) >= 32 or char in "\n\r\t"
)
# Remove leading/trailing dots and spaces (Windows doesn't allow these)
sanitized = sanitized.strip(". ")
# Replace multiple consecutive underscores with a single one
while "__" in sanitized:
sanitized = sanitized.replace("__", "_")
# Truncate if too long
if len(sanitized) > max_length:
sanitized = sanitized[:max_length]
# Ensure it's not empty after sanitization
if not sanitized:
sanitized = "unnamed"
return sanitized
class AdaptiveSegmenter:
"""
Line to word segmentation pipeline. It features:
1. Adaptive Thresholding.
2. Targeted Noise Removal using Connected Component Analysis.
3. The robust two-stage adaptive search (Valley -> Kernel).
4. CCA for final pixel-perfect refinement.
"""
def __init__(self, output_folder: str = OUTPUT_FOLDER):
    """
    Set up the segmenter.

    Args:
        output_folder: Base directory stored on the instance; segment() writes
            its debug images beneath it when SAVE_WORD_SEGMENTER_OUTPUT_IMAGES
            is enabled. Defaults to the configured OUTPUT_FOLDER.
    """
    self.output_folder = output_folder
    # Secondary segmenter that segment() delegates to (single-word lines, noisy images).
    # HybridWordSegmenter is defined elsewhere in this module.
    self.fallback_segmenter = HybridWordSegmenter()
def _correct_orientation(
    self, gray_image: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Detect a line image whose text runs vertically and rotate it by 90 degrees.

    The image is binarized, and the bounding box of all foreground pixels is
    measured. If that box is taller than it is wide, the image is rotated by
    90 degrees onto a canvas with swapped dimensions; otherwise it is returned
    untouched.

    Args:
        gray_image: 2-D grayscale line image.
    Returns:
        (image, M): the possibly rotated image and the 2x3 affine matrix that
        was applied (a zero-degree rotation matrix when nothing was changed).
    """
    height, width = gray_image.shape
    pivot = (width // 2, height // 2)

    # adaptiveThreshold needs an odd window; shrink it for short images
    window = 21
    if height < window:
        window = height - 1 if height % 2 == 0 else height

    if window > 3:
        ink_mask = cv2.adaptiveThreshold(
            gray_image,
            255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV,
            window,
            4,
        )
    else:
        # Image too short for a meaningful local window: use global Otsu
        _, ink_mask = cv2.threshold(
            gray_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
        )

    # Opening with a 2x2 kernel removes isolated specks before measuring extent
    ink_mask = cv2.morphologyEx(
        ink_mask, cv2.MORPH_OPEN, np.ones((2, 2), np.uint8)
    )

    ink_rows, ink_cols = np.where(ink_mask > 0)
    if len(ink_rows) < 50:
        # Too little ink to judge orientation reliably
        return gray_image, cv2.getRotationMatrix2D(pivot, 0, 1.0)

    box_height = ink_rows.max() - ink_rows.min()
    box_width = ink_cols.max() - ink_cols.min()
    if box_height <= box_width:
        # Ink is at least as wide as it is tall: already horizontal
        return gray_image, cv2.getRotationMatrix2D(pivot, 0, 1.0)

    rotation = cv2.getRotationMatrix2D(pivot, 90.0, 1.0)
    # The output canvas has swapped dimensions; shift so the content stays centred
    rotated_w, rotated_h = height, width
    rotation[0, 2] += (rotated_w - width) / 2
    rotation[1, 2] += (rotated_h - height) / 2

    rotated = cv2.warpAffine(
        gray_image,
        rotation,
        (rotated_w, rotated_h),
        flags=cv2.INTER_CUBIC,
        borderMode=cv2.BORDER_REPLICATE,
    )
    return rotated, rotation
def _skew_angle_from_baseline(self, binary: np.ndarray) -> Optional[float]:
    """
    Estimate skew angle from the text baseline using bottom points of foreground
    and Hough line transform. More stable than minAreaRect for short words or
    lines with ascenders/descenders (e.g. "all").

    Args:
        binary: 2-D binarized line image; pixels > 0 are foreground.
    Returns:
        Correction angle in degrees, or None if the baseline cannot be reliably
        estimated (fewer than SKEW_BASELINE_MIN_POINTS columns contain
        foreground, Hough finds no line, or no line has a bottom point near it).
    """
    h, w = binary.shape
    foreground = binary > 0

    # Columns that contain at least one foreground pixel
    xs = np.flatnonzero(foreground.any(axis=0))
    if len(xs) < SKEW_BASELINE_MIN_POINTS:
        return None

    # Bottom-most foreground row per column (baseline point): scanning the
    # column from the bottom up, argmax gives the first foreground hit.
    ys = (h - 1) - np.argmax(foreground[::-1, xs], axis=0)

    # Draw baseline points on a blank image for Hough
    baseline_img = np.zeros((h, w), dtype=np.uint8)
    baseline_img[ys, xs] = 255
    # Slight dilation so Hough sees a denser line
    baseline_img = cv2.dilate(baseline_img, np.ones((2, 2), np.uint8))

    lines = cv2.HoughLines(
        baseline_img,
        rho=1,
        theta=np.pi / 180,
        threshold=SKEW_HOUGH_THRESHOLD,
    )
    if lines is None or len(lines) == 0:
        return None

    # Score each line by number of bottom points near it; take best
    best_angle = None
    best_score = 0
    dist_thresh = max(2, h // 30)
    for line in lines:
        rho, theta = line[0]
        # Line equation: rho = x*cos(theta) + y*sin(theta). Perpendicular is at angle theta.
        # Baseline angle from horizontal = theta - 90°. To level it we rotate by -(theta - 90°) = 90° - theta.
        correction_deg = 90.0 - np.degrees(theta)
        # Normalize to [-90, 90] for comparison
        if correction_deg > 90:
            correction_deg -= 180
        elif correction_deg < -90:
            correction_deg += 180

        # Distance from every bottom point to the line, computed in one
        # vectorized pass (the trig terms are evaluated once per line).
        cos_t = np.cos(theta)
        sin_t = np.sin(theta)
        distances = np.abs(xs * cos_t + ys * sin_t - rho)
        score = int(np.count_nonzero(distances <= dist_thresh))

        if score > best_score:
            best_score = score
            best_angle = correction_deg

    if best_angle is None:
        return None
    return float(best_angle)
def _skew_angle_from_min_area_rect(
self, coords: np.ndarray, w: int, h: int
) -> float:
"""Fallback: skew angle from minAreaRect of all foreground pixels."""
if len(coords) < 50:
return 0.0
rect = cv2.minAreaRect(coords[:, ::-1])
rect_width, rect_height = rect[1]
angle = rect[2]
if rect_width < rect_height:
angle += 90
if angle > 45:
angle -= 90
elif angle < -45:
angle += 90
return float(angle)
def _deskew_image(self, gray_image: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Straighten a slightly rotated line image.

    Skew is measured from the text baseline (Hough on bottom points of letters)
    when possible, which is more stable for short words and
    ascenders/descenders; otherwise from the minimum-area rectangle of the
    foreground. Angles below MIN_SKEW_THRESHOLD or above MAX_SKEW_THRESHOLD
    are treated as zero.

    Args:
        gray_image: 2-D grayscale line image.
    Returns:
        (image, M): the rotated image (same size as the input) and the 2x3
        affine matrix applied. When fewer than 50 foreground pixels are found,
        the input image itself is returned with a zero-degree matrix.
    """
    height, width = gray_image.shape
    pivot = (width // 2, height // 2)

    # adaptiveThreshold needs an odd window; shrink it for short images
    window = 21
    if height < window:
        window = height - 1 if height % 2 == 0 else height

    if window > 3:
        ink_mask = cv2.adaptiveThreshold(
            gray_image,
            255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV,
            window,
            4,
        )
    else:
        _, ink_mask = cv2.threshold(
            gray_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
        )

    ink_mask = cv2.morphologyEx(
        ink_mask, cv2.MORPH_OPEN, np.ones((2, 2), np.uint8)
    )

    ink_coords = np.column_stack(np.where(ink_mask > 0))
    if len(ink_coords) < 50:
        # Not enough ink to estimate an angle
        return gray_image, cv2.getRotationMatrix2D(pivot, 0, 1.0)

    # Prefer baseline-based skew (stable for short words / ascenders-descenders)
    angle = self._skew_angle_from_baseline(ink_mask)
    if angle is None:
        angle = self._skew_angle_from_min_area_rect(ink_coords, width, height)

    # Tiny angles are likely noise and extreme ones likely errors: correct neither
    magnitude = abs(angle)
    if magnitude < MIN_SKEW_THRESHOLD or magnitude > MAX_SKEW_THRESHOLD:
        angle = 0.0

    rotation = cv2.getRotationMatrix2D(pivot, angle, 1.0)
    straightened = cv2.warpAffine(
        gray_image,
        rotation,
        (width, height),
        flags=cv2.INTER_CUBIC,
        borderMode=cv2.BORDER_REPLICATE,
    )
    return straightened, rotation
def _estimate_quick_skew_degrees(self, gray_image: np.ndarray) -> float:
"""Cheap skew estimate used to skip expensive orientation/deskew when safe."""
if gray_image is None or gray_image.size == 0:
return 0.0
h, w = gray_image.shape[:2]
if h < 8 or w < 8:
return 0.0
_, quick_bin = cv2.threshold(
gray_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
)
coords = np.column_stack(np.where(quick_bin > 0))
if len(coords) < 30:
return 0.0
rect = cv2.minAreaRect(coords[:, ::-1])
rect_w, rect_h = rect[1]
angle = float(rect[2])
if rect_w < rect_h:
angle += 90.0
if angle > 45.0:
angle -= 90.0
elif angle < -45.0:
angle += 90.0
return angle
def _can_skip_expensive_preprocess(self, gray_image: np.ndarray) -> bool:
"""Return True when line is already horizontal enough for direct segmentation."""
if gray_image is None or gray_image.size == 0:
return True
h, w = gray_image.shape[:2]
if h <= 0 or w <= 0:
return True
if w <= int(h * 1.2):
return False
skew = self._estimate_quick_skew_degrees(gray_image)
return abs(skew) < 1.2
def _get_boxes_from_profile(
self,
binary_image: np.ndarray,
stable_avg_char_width: float,
min_space_factor: float,
valley_threshold_factor: float,
) -> List:
"""
Extracts word bounding boxes from vertical projection profile.
"""
img_h, img_w = binary_image.shape
vertical_projection = np.sum(binary_image, axis=0)
peaks = vertical_projection[vertical_projection > 0]
if len(peaks) == 0:
return []
avg_peak_height = np.mean(peaks)
valley_threshold = int(avg_peak_height * valley_threshold_factor)
min_space_width = int(stable_avg_char_width * min_space_factor)
patched_projection = vertical_projection.copy()
in_gap = False
gap_start = 0
for x, col_sum in enumerate(patched_projection):
if col_sum <= valley_threshold and not in_gap:
in_gap = True
gap_start = x
elif col_sum > valley_threshold and in_gap:
in_gap = False
if (x - gap_start) < min_space_width:
patched_projection[gap_start:x] = int(avg_peak_height)
unlabeled_boxes = []
in_word = False
start_x = 0
for x, col_sum in enumerate(patched_projection):
if col_sum > valley_threshold and not in_word:
start_x = x
in_word = True
elif col_sum <= valley_threshold and in_word:
# [NOTE] Returns full height stripe
unlabeled_boxes.append((start_x, 0, x - start_x, img_h))
in_word = False
if in_word:
unlabeled_boxes.append((start_x, 0, img_w - start_x, img_h))
return unlabeled_boxes
def _enforce_logical_constraints(
self, output: Dict[str, List], image_width: int, image_height: int
) -> Dict[str, List]:
"""
Enforces geometric sanity checks with 2D awareness.
"""
if not output or not output["text"]:
return output
num_items = len(output["text"])
boxes = []
for i in range(num_items):
boxes.append(
{
"text": output["text"][i],
"left": int(output["left"][i]),
"top": int(output["top"][i]),
"width": int(output["width"][i]),
"height": int(output["height"][i]),
"conf": output["conf"][i],
}
)
valid_boxes = []
for box in boxes:
x0 = max(0, box["left"])
y0 = max(0, box["top"])
x1 = min(image_width, box["left"] + box["width"])
y1 = min(image_height, box["top"] + box["height"])
w = x1 - x0
h = y1 - y0
if w > 0 and h > 0:
box["left"] = x0
box["top"] = y0
box["width"] = w
box["height"] = h
valid_boxes.append(box)
boxes = valid_boxes
is_vertical = image_height > (image_width * 1.2)
if is_vertical:
boxes.sort(key=lambda b: (b["top"], b["left"]))
else:
boxes.sort(key=lambda b: (b["left"], -b["width"]))
final_pass_boxes = []
if boxes:
keep_indices = [True] * len(boxes)
# Fast path: adjacent comparisons after sorting removes most duplicates
# without full O(n^2) cross checks.
for i in range(len(boxes) - 1):
b1 = boxes[i]
b2 = boxes[i + 1]
x_nested = (b1["left"] >= b2["left"] - 2) and (
b1["left"] + b1["width"] <= b2["left"] + b2["width"] + 2
)
y_nested = (b1["top"] >= b2["top"] - 2) and (
b1["top"] + b1["height"] <= b2["top"] + b2["height"] + 2
)
if x_nested and y_nested and b1["text"] == b2["text"]:
if b1["width"] * b1["height"] <= b2["width"] * b2["height"]:
keep_indices[i] = False
# Also evaluate opposite containment (b2 inside b1).
x_nested_rev = (b2["left"] >= b1["left"] - 2) and (
b2["left"] + b2["width"] <= b1["left"] + b1["width"] + 2
)
y_nested_rev = (b2["top"] >= b1["top"] - 2) and (
b2["top"] + b2["height"] <= b1["top"] + b1["height"] + 2
)
if x_nested_rev and y_nested_rev and b1["text"] == b2["text"]:
if b2["width"] * b2["height"] <= b1["width"] * b1["height"]:
keep_indices[i + 1] = False
for i, keep in enumerate(keep_indices):
if keep:
final_pass_boxes.append(boxes[i])
boxes = final_pass_boxes
if is_vertical:
boxes.sort(key=lambda b: (b["top"], b["left"]))
else:
boxes.sort(key=lambda b: (b["left"], -b["width"]))
for i in range(len(boxes) - 1):
b1 = boxes[i]
b2 = boxes[i + 1]
x_overlap = min(b1["left"] + b1["width"], b2["left"] + b2["width"]) - max(
b1["left"], b2["left"]
)
y_overlap = min(b1["top"] + b1["height"], b2["top"] + b2["height"]) - max(
b1["top"], b2["top"]
)
if x_overlap > 0 and y_overlap > 0:
if is_vertical:
if b1["top"] < b2["top"]:
b1["height"] = max(1, b2["top"] - b1["top"])
else:
if b1["left"] < b2["left"]:
b1_right = b1["left"] + b1["width"]
b2_right = b2["left"] + b2["width"]
left_slice_width = max(0, b2["left"] - b1["left"])
right_slice_width = max(0, b1_right - b2_right)
if b1_right > b2_right and right_slice_width > left_slice_width:
b1["left"] = b2_right
b1["width"] = right_slice_width
else:
b1["width"] = max(1, left_slice_width)
cleaned_output = {
k: [] for k in ["text", "left", "top", "width", "height", "conf"]
}
if is_vertical:
boxes.sort(key=lambda b: (b["top"], b["left"]))
else:
boxes.sort(key=lambda b: (b["left"], -b["width"]))
for box in boxes:
for key in cleaned_output.keys():
cleaned_output[key].append(box[key])
return cleaned_output
def _is_geometry_valid(
self,
boxes: List[Tuple[int, int, int, int]],
words: List[str],
expected_height: float = 0,
) -> bool:
"""
Validates if the detected boxes are physically plausible.
[FIX] Improved robustness for punctuation and mixed-case text.
"""
if len(boxes) != len(words):
return False
baseline = expected_height
# Use median only if provided expected height is unreliable
if baseline < 5:
heights = [b[3] for b in boxes]
if heights:
baseline = np.median(heights)
if baseline < 5:
return True
for i, box in enumerate(boxes):
word = words[i]
# [FIX] Check for punctuation/symbols. They are allowed to be small.
# If word is just punctuation, skip geometry checks
is_punctuation = not any(c.isalnum() for c in word)
if is_punctuation:
continue
# Standard checks for alphanumeric words
num_chars = len(word)
if num_chars < 1:
continue
width = box[2]
height = box[3]
# [FIX] Only reject height if it's REALLY small compared to baseline
# A period might be small, but we skipped that check above.
# This check ensures a real word like "The" isn't 2 pixels tall.
if height < (baseline * 0.20):
return False
avg_char_width = width / num_chars
min_expected = baseline * 0.20
# Only reject if it fails BOTH absolute (4px) and relative checks
if avg_char_width < min_expected and avg_char_width < 4:
# Exception: If the word is 1 char long (e.g. "I", "l", "1"), allow it to be skinny.
if num_chars == 1 and avg_char_width >= 2:
continue
return False
return True
def _estimate_noise(self, gray: np.ndarray) -> float:
"""
Estimate image noisiness using Laplacian variance. Noisy images tend to have
high high-frequency content, so higher values indicate more noise (or very
sharp edges). Used to skip the primary segmentation pipeline when above
NOISE_THRESHOLD and use the fallback segmenter instead.
"""
if gray is None or gray.size == 0:
return 0.0
lap = cv2.Laplacian(gray, cv2.CV_64F, ksize=3)
return float(lap.var())
def _block_size_from_median_cc_height(
self, gray: np.ndarray, line_height: int, fallback_block_size: int
) -> int:
"""
Determine adaptive threshold block size from median height of connected components
(resolution-independent). Uses an Otsu pre-pass to get CCs; if median height is
valid, returns block_size = median_cc_height * BLOCK_SIZE_MEDIAN_CC_FACTOR.
Otherwise returns fallback_block_size (e.g. from line_height).
"""
if gray is None or gray.size == 0 or line_height < 3:
return fallback_block_size
_, otsu_binary = cv2.threshold(
gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
)
num_labels, _, stats, _ = cv2.connectedComponentsWithStats(
otsu_binary, 8, cv2.CV_32S
)
if num_labels < 3: # background + need at least 2 components
return fallback_block_size
areas = stats[1:, cv2.CC_STAT_AREA]
heights = stats[1:, cv2.CC_STAT_HEIGHT]
min_area_cc = max(2, int((line_height * 0.02) ** 2))
valid = areas >= min_area_cc
if not np.any(valid):
return fallback_block_size
median_h = np.median(heights[valid])
if np.isnan(median_h) or median_h < 2:
return fallback_block_size
block = max(3, int(median_h * BLOCK_SIZE_MEDIAN_CC_FACTOR))
if block % 2 == 0:
block += 1
return block
def _normalize_polarity_for_binarization(self, gray: np.ndarray) -> np.ndarray:
"""
Ensure we work with dark-text-on-light-background for binarization. If the
image is mostly dark (light text on dark background), invert it so that
adaptive threshold and projection profile logic behave correctly.
Uses corner/edge regions to estimate background (typical in documents);
falls back to global mean for very small or full-page line crops.
"""
if gray is None or gray.size == 0:
return gray
h, w = gray.shape
frac = POLARITY_CORNER_FRACTION
# Sample corners and edges (background is often visible there)
margin_w = max(1, int(w * frac))
margin_h = max(1, int(h * frac))
corner_pixels = []
if margin_w < w and margin_h < h:
top_left = gray[:margin_h, :margin_w]
top_right = gray[:margin_h, -margin_w:]
bottom_left = gray[-margin_h:, :margin_w]
bottom_right = gray[-margin_h:, -margin_w:]
for region in (top_left, top_right, bottom_left, bottom_right):
corner_pixels.append(region.ravel())
if corner_pixels:
corner_pixels = np.concatenate(corner_pixels)
background_mean = float(np.mean(corner_pixels))
else:
background_mean = float(np.mean(gray))
else:
background_mean = float(np.mean(gray))
if background_mean < POLARITY_MEAN_THRESHOLD:
return cv2.bitwise_not(gray)
return gray
def segment(
self,
line_data: Dict[str, List],
line_image: np.ndarray,
min_space_factor=MIN_SPACE_FACTOR,
match_tolerance=MATCH_TOLERANCE,
image_name: str = None,
) -> Tuple[Dict[str, List], bool]:
if (
line_image is None
or not isinstance(line_image, np.ndarray)
or line_image.size == 0
):
return ({}, False)
# Allow grayscale (2 dims) or color (3 dims)
if len(line_image.shape) < 2:
return ({}, False)
if not line_data or not line_data.get("text") or len(line_data["text"]) == 0:
return ({}, False)
line_text = line_data["text"][0]
words = line_text.split()
# Early return if 1 or fewer words
if len(words) <= 1:
img_h, img_w = line_image.shape[:2]
one_word_result = self.fallback_segmenter.convert_line_to_word_level(
line_data, img_w, img_h
)
return (one_word_result, False)
# Validate that line_image is not empty before processing
if line_image is None or line_image.size == 0 or len(line_image.shape) < 2:
# If line_image is empty, fall back to proportional estimation
return {}, False
line_number = line_data["line"][0]
safe_image_name = "image"
safe_line_number = str(line_number)
safe_shortened_line_text = "line"
if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES:
safe_image_name = _sanitize_filename(image_name or "image", max_length=50)
safe_line_number = _sanitize_filename(str(line_number), max_length=10)
safe_shortened_line_text = _sanitize_filename(line_text, max_length=10)
if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES:
os.makedirs(self.output_folder, exist_ok=True)
output_path = f"{self.output_folder}/word_segmentation/{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_original.png"
os.makedirs(f"{self.output_folder}/word_segmentation", exist_ok=True)
# Only write if image is valid
if line_image.size > 0 and len(line_image.shape) >= 2:
cv2.imwrite(output_path, line_image)
if len(line_image.shape) == 3:
gray = cv2.cvtColor(line_image, cv2.COLOR_BGR2GRAY)
else:
gray = line_image.copy()
# ========================================================================
# IMAGE PREPROCESSING (Deskew / Rotate)
# ========================================================================
if self._can_skip_expensive_preprocess(gray):
h, w = gray.shape[:2]
deskewed_gray = gray
deskewed_line_image = line_image.copy()
M = np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]], dtype=np.float32)
else:
oriented_gray, M_orient = self._correct_orientation(gray)
deskewed_gray, M_skew = self._deskew_image(oriented_gray)
# Combine matrices: M_total = M_skew * M_orient
M_orient_3x3 = np.vstack([M_orient, [0, 0, 1]])
M_skew_3x3 = np.vstack([M_skew, [0, 0, 1]])
M_total_3x3 = M_skew_3x3 @ M_orient_3x3
M = M_total_3x3[0:2, :] # Extract 2x3 affine matrix
# Apply transformation to the original color image
h, w = deskewed_gray.shape
deskewed_line_image = cv2.warpAffine(
line_image,
M,
(w, h),
flags=cv2.INTER_CUBIC,
borderMode=cv2.BORDER_REPLICATE,
)
# [FIX] Create Local Line Data that matches the deskewed/rotated image dimensions.
# This prevents the fallback segmenter from using vertical dimensions on a horizontal image.
local_line_data = {
"text": line_data["text"],
"conf": line_data["conf"],
"left": [0], # Local coordinate system starts at 0
"top": [0],
"width": [w], # Use the ROTATED width
"height": [h], # Use the ROTATED height
"line": line_data.get("line", [0]),
}
if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES:
os.makedirs(self.output_folder, exist_ok=True)
output_path = f"{self.output_folder}/word_segmentation/{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_deskewed.png"
cv2.imwrite(output_path, deskewed_line_image)
# ========================================================================
# MAIN SEGMENTATION PIPELINE
# ========================================================================
approx_char_count = len(line_data["text"][0].replace(" ", ""))
if approx_char_count == 0:
return {}, False
img_h, img_w = deskewed_gray.shape
line_height = img_h
estimated_char_height = img_h * 0.6
avg_char_width_approx = img_w / approx_char_count
# Block size from line height (resolution-independent); could be refined from median CC height in two-pass
block_size = max(3, int(line_height * BLOCK_SIZE_FACTOR))
if block_size % 2 == 0:
block_size += 1
# Noise threshold scaled by line height so behavior is resolution-independent
effective_noise_threshold = NOISE_THRESHOLD * (
line_height / REFERENCE_LINE_HEIGHT
)
# --- Noise check: skip primary pipeline if image is too noisy ---
noise_level = self._estimate_noise(deskewed_gray)
if noise_level > effective_noise_threshold:
used_fallback = True
final_output = self.fallback_segmenter.refine_words_bidirectional(
local_line_data, deskewed_line_image
)
else:
# --- Polarity: ensure dark text on light background for binarization ---
gray_for_binary = self._normalize_polarity_for_binarization(deskewed_gray)
# Refine block size from median CC height (Otsu pre-pass) when possible
block_size = self._block_size_from_median_cc_height(
gray_for_binary, line_height, block_size
)
# --- Binarization ---
binary_adaptive = cv2.adaptiveThreshold(
gray_for_binary,
255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY_INV,
block_size,
C_VALUE,
)
otsu_thresh_val, _ = cv2.threshold(
gray_for_binary, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
)
strict_thresh_val = otsu_thresh_val * 0.75
_, binary_strict = cv2.threshold(
gray_for_binary, strict_thresh_val, 255, cv2.THRESH_BINARY_INV
)
binary = cv2.bitwise_and(binary_adaptive, binary_strict)
if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES:
output_path = f"{self.output_folder}/word_segmentation/{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_binary.png"
cv2.imwrite(output_path, binary)
# --- Morphological Closing ---
morph_width = max(3, int(avg_char_width_approx * 0.40))
morph_height = max(2, int(avg_char_width_approx * 0.1))
kernel = cv2.getStructuringElement(
cv2.MORPH_RECT, (morph_width, morph_height)
)
closed_binary = cv2.morphologyEx(
binary, cv2.MORPH_CLOSE, kernel, iterations=1
)
# --- Noise Removal ---
num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
closed_binary, 8, cv2.CV_32S
)
clean_binary = np.zeros_like(binary)
force_fallback = False
significant_labels = 0
if num_labels > 1:
# Only count components with area > 3 pixels
significant_labels = np.sum(stats[1:, cv2.CC_STAT_AREA] > 3)
if approx_char_count > 0 and significant_labels > (approx_char_count * 12):
force_fallback = True
if num_labels > 1:
areas = stats[1:, cv2.CC_STAT_AREA]
if len(areas) == 0:
clean_binary = binary
areas = np.array([0])
else:
p1 = np.percentile(areas, 1)
img_h, img_w = binary.shape
line_h = img_h
estimated_char_height = img_h * 0.7
# Resolution-independent min area: (line_height * 0.05)^2 with floor
min_area_threshold = max(
MIN_AREA_FLOOR,
int((line_h * MIN_AREA_HEIGHT_FRACTION) ** 2),
)
estimated_min_letter_area = max(
2,
int(estimated_char_height * 0.2 * estimated_char_height * 0.15),
)
area_threshold = max(
min_area_threshold, min(p1, estimated_min_letter_area)
)
# Gap detection logic...
sorted_areas = np.sort(areas)
area_diffs = np.diff(sorted_areas)
if len(sorted_areas) > 10 and len(area_diffs) > 0:
jump_threshold = np.percentile(area_diffs, 95)
significant_jump_thresh = max(10, jump_threshold * 3)
jump_indices = np.where(area_diffs > significant_jump_thresh)[0]
if len(jump_indices) > 0:
gap_idx = jump_indices[0]
area_before_gap = sorted_areas[gap_idx]
final_threshold = max(area_before_gap + 1, area_threshold)
final_threshold = min(final_threshold, 15)
area_threshold = final_threshold
for i in range(1, num_labels):
if stats[i, cv2.CC_STAT_AREA] >= area_threshold:
clean_binary[labels == i] = 255
else:
clean_binary = binary
# Validate clean_binary is not empty before proceeding
if (
clean_binary is None
or clean_binary.size == 0
or len(clean_binary.shape) < 2
):
# If clean_binary is empty, fall back to proportional estimation
return {}, False
# --- Vertical Cropping ---
horizontal_projection = np.sum(clean_binary, axis=1)
y_start = 0
non_zero_rows = np.where(horizontal_projection > 0)[0]
if len(non_zero_rows) > 0:
p_top = int(np.percentile(non_zero_rows, 5))
p_bottom = int(np.percentile(non_zero_rows, 95))
core_height = p_bottom - p_top
trim_pixels = int(core_height * 0.1)
y_start = max(0, p_top + trim_pixels)
y_end = min(clean_binary.shape[0], p_bottom - trim_pixels)
if y_end - y_start < 5:
y_start = p_top
y_end = p_bottom
# Ensure y_end > y_start to avoid empty slice
if y_end > y_start:
analysis_image = clean_binary[y_start:y_end, :]
else:
# If slice would be empty, use the full image
analysis_image = clean_binary
else:
analysis_image = clean_binary
# Validate that analysis_image is not empty before proceeding
if (
analysis_image is None
or analysis_image.size == 0
or len(analysis_image.shape) < 2
):
# If analysis_image is empty, fall back to proportional estimation
return {}, False
if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES:
# Validate that analysis_image is not empty before writing
if analysis_image.size > 0 and len(analysis_image.shape) >= 2:
output_path = f"{self.output_folder}/word_segmentation/{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_clean_binary.png"
cv2.imwrite(output_path, analysis_image)
# --- Adaptive Search ---
best_boxes = None
successful_binary_image = None
if not force_fallback:
words = line_data["text"][0].split()
target = len(words)
backup_boxes_s1 = None
best_stage1_diff = float("inf")
best_stage1_factor = INITIAL_VALLEY_THRESHOLD_FACTOR
# STAGE 1
stage1_coarse = np.arange(
INITIAL_VALLEY_THRESHOLD_FACTOR, 0.60, SEARCH_STAGE1_COARSE_STEP
)
for v_factor in stage1_coarse:
curr_boxes = self._get_boxes_from_profile(
analysis_image,
avg_char_width_approx,
min_space_factor,
v_factor,
)
diff = abs(target - len(curr_boxes))
is_geom_valid = self._is_geometry_valid(
curr_boxes, words, estimated_char_height
)
if diff < best_stage1_diff:
best_stage1_diff = diff
best_stage1_factor = float(v_factor)
if diff == 0:
if is_geom_valid:
best_boxes = curr_boxes
successful_binary_image = analysis_image
break
else:
if backup_boxes_s1 is None:
backup_boxes_s1 = curr_boxes
if (
diff <= ALLOWED_WORD_MISMATCH_COUNT
and backup_boxes_s1 is None
and is_geom_valid
):
backup_boxes_s1 = curr_boxes
# Refine around best coarse factor only when needed.
if best_boxes is None:
lower = max(
INITIAL_VALLEY_THRESHOLD_FACTOR,
best_stage1_factor - SEARCH_STAGE1_COARSE_STEP,
)
upper = min(0.60, best_stage1_factor + SEARCH_STAGE1_COARSE_STEP)
for v_factor in np.arange(
lower, upper + 1e-9, SEARCH_STAGE1_FINE_STEP
):
curr_boxes = self._get_boxes_from_profile(
analysis_image,
avg_char_width_approx,
min_space_factor,
v_factor,
)
diff = abs(target - len(curr_boxes))
is_geom_valid = self._is_geometry_valid(
curr_boxes, words, estimated_char_height
)
if diff == 0 and is_geom_valid:
best_boxes = curr_boxes
successful_binary_image = analysis_image
break
if (
diff <= ALLOWED_WORD_MISMATCH_COUNT
and backup_boxes_s1 is None
and is_geom_valid
):
backup_boxes_s1 = curr_boxes
# STAGE 2 (if needed)
if best_boxes is None:
backup_boxes_s2 = None
best_stage2_diff = float("inf")
best_stage2_factor = INITIAL_KERNEL_WIDTH_FACTOR
for k_factor in np.arange(
INITIAL_KERNEL_WIDTH_FACTOR, 0.5, SEARCH_STAGE2_COARSE_STEP
):
k_w = max(1, int(avg_char_width_approx * k_factor))
s2_bin = cv2.morphologyEx(
clean_binary, cv2.MORPH_CLOSE, np.ones((1, k_w), np.uint8)
)
s2_img = (
s2_bin[y_start:y_end, :]
if len(non_zero_rows) > 0
else s2_bin
)
if s2_img is None or s2_img.size == 0:
continue
curr_boxes = self._get_boxes_from_profile(
s2_img,
avg_char_width_approx,
min_space_factor,
MAIN_VALLEY_THRESHOLD_FACTOR,
)
diff = abs(target - len(curr_boxes))
if diff < best_stage2_diff:
best_stage2_diff = diff
best_stage2_factor = float(k_factor)
is_geom_valid = self._is_geometry_valid(
curr_boxes, words, estimated_char_height
)
if diff == 0 and is_geom_valid:
best_boxes = curr_boxes
successful_binary_image = s2_bin
break
if (
diff <= ALLOWED_WORD_MISMATCH_COUNT
and backup_boxes_s2 is None
and is_geom_valid
):
backup_boxes_s2 = curr_boxes
if best_boxes is None:
lower = max(
INITIAL_KERNEL_WIDTH_FACTOR,
best_stage2_factor - SEARCH_STAGE2_COARSE_STEP,
)
upper = min(0.5, best_stage2_factor + SEARCH_STAGE2_COARSE_STEP)
for k_factor in np.arange(
lower, upper + 1e-9, SEARCH_STAGE2_FINE_STEP
):
k_w = max(1, int(avg_char_width_approx * k_factor))
s2_bin = cv2.morphologyEx(
clean_binary,
cv2.MORPH_CLOSE,
np.ones((1, k_w), np.uint8),
)
s2_img = (
s2_bin[y_start:y_end, :]
if len(non_zero_rows) > 0
else s2_bin
)
if s2_img is None or s2_img.size == 0:
continue
curr_boxes = self._get_boxes_from_profile(
s2_img,
avg_char_width_approx,
min_space_factor,
MAIN_VALLEY_THRESHOLD_FACTOR,
)
diff = abs(target - len(curr_boxes))
is_geom_valid = self._is_geometry_valid(
curr_boxes, words, estimated_char_height
)
if diff == 0 and is_geom_valid:
best_boxes = curr_boxes
successful_binary_image = s2_bin
break
if (
diff <= ALLOWED_WORD_MISMATCH_COUNT
and backup_boxes_s2 is None
and is_geom_valid
):
backup_boxes_s2 = curr_boxes
if best_boxes is None:
if backup_boxes_s1 is not None:
best_boxes = backup_boxes_s1
successful_binary_image = analysis_image
elif backup_boxes_s2 is not None:
best_boxes = backup_boxes_s2
successful_binary_image = clean_binary
final_output = None
used_fallback = False
if best_boxes is None:
# --- FALLBACK WITH ROTATED DATA ---
used_fallback = True
# [FIX] Use local_line_data (rotated dims) instead of line_data (original dims)
final_output = self.fallback_segmenter.refine_words_bidirectional(
local_line_data, deskewed_line_image
)
else:
# --- CCA Refinement ---
unlabeled_boxes = best_boxes
if successful_binary_image is analysis_image:
cca_source_image = clean_binary
else:
cca_source_image = successful_binary_image
num_labels, _, stats, _ = cv2.connectedComponentsWithStats(
cca_source_image, 8, cv2.CV_32S
)
cca_img_h, cca_img_w = cca_source_image.shape[:2]
component_assignments = {}
num_proc = min(len(words), len(unlabeled_boxes))
min_valid_component_area = estimated_char_height * 2
box_meta = []
for i in range(num_proc):
box_x, box_y, box_w, box_h = unlabeled_boxes[i]
box_r = box_x + box_w
box_center_x = box_x + box_w / 2
box_meta.append((i, box_x, box_r, box_center_x, box_w))
box_starts = [meta[1] for meta in box_meta]
for j in range(1, num_labels):
comp_x = stats[j, cv2.CC_STAT_LEFT]
comp_w = stats[j, cv2.CC_STAT_WIDTH]
comp_area = stats[j, cv2.CC_STAT_AREA]
comp_r = comp_x + comp_w
comp_center_x = comp_x + comp_w / 2
comp_y = stats[j, cv2.CC_STAT_TOP]
comp_h = stats[j, cv2.CC_STAT_HEIGHT]
comp_center_y = comp_y + comp_h / 2
if (
comp_center_y < cca_img_h * 0.1
or comp_center_y > cca_img_h * 0.9
):
continue
if comp_area < min_valid_component_area:
continue
best_box_idx = None
max_overlap = 0
best_center_distance = float("inf")
component_center_in_box = False
# Assign components to boxes...
# Candidate pruning: only evaluate boxes near this component.
left_search = max(0, comp_x - comp_w)
right_search = comp_r + comp_w
start_idx = bisect_left(box_starts, left_search)
idx = start_idx
while idx < len(box_meta) and box_meta[idx][1] <= right_search:
i, box_x, box_r, box_center_x, box_w = box_meta[idx]
idx += 1
if comp_w > box_w * 1.5:
continue
if comp_x < box_r and box_x < comp_r:
overlap_start = max(comp_x, box_x)
overlap_end = min(comp_r, box_r)
overlap = overlap_end - overlap_start
if overlap > 0:
center_in_box = box_x <= comp_center_x < box_r
center_distance = abs(comp_center_x - box_center_x)
if center_in_box:
if (
not component_center_in_box
or overlap > max_overlap
):
component_center_in_box = True
best_center_distance = center_distance
max_overlap = overlap
best_box_idx = i
elif not component_center_in_box:
if center_distance < best_center_distance or (
center_distance == best_center_distance
and overlap > max_overlap
):
best_center_distance = center_distance
max_overlap = overlap
best_box_idx = i
if best_box_idx is not None:
component_assignments[j] = best_box_idx
refined_boxes_list = []
for i in range(num_proc):
word_label = words[i]
components_in_box = [
stats[j] for j, b in component_assignments.items() if b == i
]
use_original_box = False
if not components_in_box:
use_original_box = True
else:
min_x = min(c[cv2.CC_STAT_LEFT] for c in components_in_box)
min_y = min(c[cv2.CC_STAT_TOP] for c in components_in_box)
max_r = max(
c[cv2.CC_STAT_LEFT] + c[cv2.CC_STAT_WIDTH]
for c in components_in_box
)
max_b = max(
c[cv2.CC_STAT_TOP] + c[cv2.CC_STAT_HEIGHT]
for c in components_in_box
)
cca_h = max(1, max_b - min_y)
if cca_h < (estimated_char_height * 0.35):
use_original_box = True
if use_original_box:
box_x, box_y, box_w, box_h = unlabeled_boxes[i]
adjusted_box_y = y_start + box_y
refined_boxes_list.append(
{
"text": word_label,
"left": box_x,
"top": adjusted_box_y,
"width": box_w,
"height": box_h,
"conf": line_data["conf"][0],
}
)
else:
refined_boxes_list.append(
{
"text": word_label,
"left": min_x,
"top": min_y,
"width": max(1, max_r - min_x),
"height": cca_h,
"conf": line_data["conf"][0],
}
)
# Check validity
cca_check_list = [
(b["left"], b["top"], b["width"], b["height"])
for b in refined_boxes_list
]
if not self._is_geometry_valid(
cca_check_list, words, estimated_char_height
):
if abs(len(refined_boxes_list) - len(words)) > 1:
best_boxes = None # Trigger fallback
else:
final_output = {
k: []
for k in ["text", "left", "top", "width", "height", "conf"]
}
for box in refined_boxes_list:
for key in final_output.keys():
final_output[key].append(box[key])
else:
final_output = {
k: []
for k in ["text", "left", "top", "width", "height", "conf"]
}
for box in refined_boxes_list:
for key in final_output.keys():
final_output[key].append(box[key])
# --- REPEAT FALLBACK IF VALIDATION FAILED ---
if best_boxes is None and not used_fallback:
used_fallback = True
# [FIX] Use local_line_data here too
final_output = self.fallback_segmenter.refine_words_bidirectional(
local_line_data, deskewed_line_image
)
# ========================================================================
# COORDINATE TRANSFORMATION (Map back to Original)
# ========================================================================
M_inv = cv2.invertAffineTransform(M)
remapped_boxes_list = []
for i in range(len(final_output["text"])):
left, top = final_output["left"][i], final_output["top"][i]
width, height = final_output["width"][i], final_output["height"][i]
# Map the 4 corners
corners = np.array(
[
[left, top],
[left + width, top],
[left + width, top + height],
[left, top + height],
],
dtype="float32",
)
corners_expanded = np.expand_dims(corners, axis=1)
original_corners = cv2.transform(corners_expanded, M_inv)
squeezed_corners = original_corners.squeeze(axis=1)
# Get axis aligned bounding box in original space
min_x = int(np.min(squeezed_corners[:, 0]))
max_x = int(np.max(squeezed_corners[:, 0]))
min_y = int(np.min(squeezed_corners[:, 1]))
max_y = int(np.max(squeezed_corners[:, 1]))
remapped_boxes_list.append(
{
"text": final_output["text"][i],
"left": min_x,
"top": min_y,
"width": max_x - min_x,
"height": max_y - min_y,
"conf": final_output["conf"][i],
}
)
remapped_output = {k: [] for k in final_output.keys()}
for box in remapped_boxes_list:
for key in remapped_output.keys():
remapped_output[key].append(box[key])
img_h, img_w = line_image.shape[:2]
remapped_output = self._enforce_logical_constraints(
remapped_output, img_w, img_h
)
# ========================================================================
# FINAL SAFETY NET
# ========================================================================
words = line_data["text"][0].split()
target_count = len(words)
current_count = len(remapped_output["text"])
has_collapsed_boxes = any(w < 3 for w in remapped_output["width"])
if current_count > 0:
total_text_len = sum(len(t) for t in remapped_output["text"])
total_box_width = sum(remapped_output["width"])
avg_width_pixels = total_box_width / max(1, total_text_len)
else:
avg_width_pixels = 0
is_suspiciously_thin = avg_width_pixels < 4
if current_count != target_count or is_suspiciously_thin or has_collapsed_boxes:
used_fallback = True
# [FIX] Do NOT use original line_image/line_data here.
# Use the local_line_data + deskewed_line_image pipeline,
# then transform back using M_inv (same as above).
# 1. Run fallback on rotated data
temp_local_output = self.fallback_segmenter.refine_words_bidirectional(
local_line_data, deskewed_line_image
)
# 2. If bidirectional failed to split correctly, use purely mathematical split on rotated data
if len(temp_local_output["text"]) != target_count:
h, w = deskewed_line_image.shape[:2]
temp_local_output = self.fallback_segmenter.convert_line_to_word_level(
local_line_data, w, h
)
# 3. Transform the result back to original coordinates (M_inv)
# (Repeating the transformation logic for the safety net result)
remapped_boxes_list = []
for i in range(len(temp_local_output["text"])):
left, top = temp_local_output["left"][i], temp_local_output["top"][i]
width, height = (
temp_local_output["width"][i],
temp_local_output["height"][i],
)
corners = np.array(
[
[left, top],
[left + width, top],
[left + width, top + height],
[left, top + height],
],
dtype="float32",
)
corners_expanded = np.expand_dims(corners, axis=1)
original_corners = cv2.transform(corners_expanded, M_inv)
squeezed_corners = original_corners.squeeze(axis=1)
min_x = int(np.min(squeezed_corners[:, 0]))
max_x = int(np.max(squeezed_corners[:, 0]))
min_y = int(np.min(squeezed_corners[:, 1]))
max_y = int(np.max(squeezed_corners[:, 1]))
remapped_boxes_list.append(
{
"text": temp_local_output["text"][i],
"left": min_x,
"top": min_y,
"width": max_x - min_x,
"height": max_y - min_y,
"conf": temp_local_output["conf"][i],
}
)
remapped_output = {k: [] for k in temp_local_output.keys()}
for box in remapped_boxes_list:
for key in remapped_output.keys():
remapped_output[key].append(box[key])
if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES:
output_path = f"{self.output_folder}/word_segmentation/{safe_image_name}_{safe_shortened_line_text}_final_boxes.png"
os.makedirs(f"{self.output_folder}/word_segmentation", exist_ok=True)
output_image_vis = line_image.copy()
for i in range(len(remapped_output["text"])):
x, y, w, h = (
int(remapped_output["left"][i]),
int(remapped_output["top"][i]),
int(remapped_output["width"][i]),
int(remapped_output["height"][i]),
)
cv2.rectangle(output_image_vis, (x, y), (x + w, y + h), (0, 255, 0), 2)
cv2.imwrite(output_path, output_image_vis)
return remapped_output, used_fallback
class HybridWordSegmenter:
"""
Implements a two-step approach for word segmentation:
1. Proportional estimation based on text (primary; avoids image noise).
2. Image-based refinement with a "Bounded Scan" that cannot shrink boxes
beyond a fraction of the text-based width.
Design: Relies more on expected character spacing from the text than on
image analysis, so noisy images are less likely to produce tiny or
missing boxes.
Situations that could otherwise cause very small boxes (and how we mitigate):
- False gaps in the vertical projection (noise/speckle) -> refinement is
bounded by shrink_limit_fraction; initial boxes use proportional only.
- Image-based "justified" gap anchoring picking wrong cuts -> we do not
use vertical_projection for initial segmentation here; only proportional.
- Bidirectional scan snapping to a thin low-density strip inside a word ->
same shrink bound; fallback "thinnest point" also clamped.
- De-overlapping stealing space from the next word -> shrink bound keeps
each box at least (1 - shrink_limit_fraction) of initial width.
ROBUSTNESS UPGRADES:
- Uses Horizontal Smearing to prevent cutting inside noisy characters.
- Uses Gaussian Blur to suppress speckle noise.
- Implements 'Noise Floors' for gap detection (never assumes perfect 0).
"""
def convert_line_to_word_level(
    self,
    line_data: Dict[str, List],
    image_width: int,
    image_height: int,
    vertical_projection: np.ndarray = None,
) -> Dict[str, List]:
    """
    Split a single line-level OCR record into one box per whitespace-separated word.

    Only the first entry (index 0) of each list in ``line_data`` is read.

    Two strategies are tried, in this order:

    1. Gap anchoring - only when ``vertical_projection`` is supplied, its length
       equals ``image_width`` and the line has more than one word. The widest
       low-ink gaps are requested from ``_find_widest_zero_gaps``; if exactly
       ``len(words) - 1`` gaps come back, each word spans from one gap midpoint
       to the next (offset by the line's ``left``).
    2. Proportional estimation - each word's width is its share of the line
       width according to ``get_weighted_length``, with a minimum width applied.

    Args:
        line_data: Dict with "text", "left", "top", "width", "height", "conf" lists.
        image_width: Width used to validate the projection and to clamp ``left``.
        image_height: Height used to scale the gap noise threshold.
        vertical_projection: Optional per-column ink sums.

    Returns:
        Dict with the same six keys, one entry per word. All lists are empty
        when ``line_data`` is missing/empty or the text contains no words.
    """
    word_level = {
        field: [] for field in ("text", "left", "top", "width", "height", "conf")
    }

    def emit(word, left, width):
        # top / height / conf are inherited unchanged from the line record.
        word_level["text"].append(word)
        word_level["left"].append(left)
        word_level["top"].append(line_top)
        word_level["width"].append(width)
        word_level["height"].append(line_height)
        word_level["conf"].append(line_conf)

    if not (line_data and line_data.get("text")):
        return word_level

    line_text = line_data["text"][0]
    line_left, line_top, line_width, line_height = (
        float(line_data[field][0]) for field in ("left", "top", "width", "height")
    )
    line_conf = line_data["conf"][0]

    words = line_text.split()
    if not words:
        return word_level

    num_chars = sum(len(word) for word in words)
    if num_chars == 0:
        return word_level
    num_spaces = len(words) - 1

    # --- Strategy 1: anchor cut points on the widest gaps of the projection ---
    projection_usable = (
        vertical_projection is not None
        and len(vertical_projection) == image_width
        and num_spaces > 0
    )
    if projection_usable:
        # A column still counts as "gap" with up to 3% of its height inked,
        # and never with a tolerance below two fully-inked pixels.
        dynamic_gap_threshold = max(255.0 * 0.03 * image_height, 255.0 * 2)
        gaps = _find_widest_zero_gaps(
            vertical_projection, n=num_spaces, gap_threshold=dynamic_gap_threshold
        )
        if len(gaps) == num_spaces:
            # NOTE(review): cut order follows the order of `gaps`; presumably
            # the helper returns them left-to-right - confirm.
            cuts = [0, *((start + end) // 2 for start, end in gaps), image_width]
            for word, left_px, right_px in zip(words, cuts, cuts[1:]):
                emit(word, line_left + left_px, max(1, right_px - left_px))
            return word_level

    # --- Strategy 2: distribute the line width by weighted text length ---
    total_line_weight = get_weighted_length(line_text)
    if total_line_weight <= 0:
        total_line_weight = 1.0
    avg_weight_unit = line_width / total_line_weight
    estimated_space_width = get_weighted_length(" ") * avg_weight_unit

    avg_char_width = max(3.0, line_width / num_chars)
    min_word_width = max(5.0, avg_char_width * 0.5)

    cursor = line_left
    for word in words:
        word_width = max(min_word_width, get_weighted_length(word) * avg_weight_unit)
        emit(word, max(0, min(cursor, image_width)), word_width)
        cursor += word_width + estimated_space_width
    return word_level
def _run_single_pass(
self,
initial_boxes: List[Dict],
vertical_projection: np.ndarray,
max_scan_distance: int,
img_w: int,
img_h: int,
direction: str = "ltr",
trailing_punctuation: List[bool] = None,
shrink_limit_fraction: float = 0.5,
) -> List[Dict]:
"""
Helper function to run one pass of refinement.
ROBUSTNESS UPGRADE:
- Uses a 'gap_noise_floor' instead of looking for 0.
- Enforces 'safety_density_limit': if the "thinnest" point is still thick (ink),
it refuses to cut there (prevents cutting bold letters).
- shrink_limit_fraction: Refinement cannot shrink a box by more than this fraction
of its initial (text-based) width from either edge. Prevents noise from creating
tiny boxes; keeps segmentation anchored to expected character spacing.
"""
refined_boxes = [box.copy() for box in initial_boxes]
if trailing_punctuation is None:
trailing_punctuation = [False] * len(initial_boxes)
# ROBUSTNESS: Define what constitutes a "gap" vs "ink"
# 1. Gap Floor: Anything below 5% of image height is treated as empty space (noise tolerance)
gap_noise_floor = 255.0 * (img_h * 0.05)
# 2. Ink Safety Limit: If the "thinnest" point has > 25% ink density, it is NOT a gap.
# It's a character. Do not cut.
safety_density_limit = 255.0 * (img_h * 0.25)
if direction == "ltr":
last_corrected_right_edge = 0
indices = range(len(refined_boxes))
else: # rtl
next_corrected_left_edge = img_w
indices = range(len(refined_boxes) - 1, -1, -1)
for i in indices:
box = refined_boxes[i]
left = int(box["left"])
right = int(box["left"] + box["width"])
init_width = max(1, int(box["width"]))
# Bounds from initial (text-based) box: don't let image refinement shrink too much
min_right = right - int(shrink_limit_fraction * init_width)
max_left = left + int(shrink_limit_fraction * init_width)
left = max(0, min(left, img_w - 1))
right = max(0, min(right, img_w - 1))
new_left, new_right = left, right
if direction == "ltr" or direction == "both": # Scan right
if right < img_w:
scan_limit = min(img_w, right + max_scan_distance)
search_range = range(right, scan_limit)
best_x = right
min_density = float("inf")
found_gap = False
first_gap_x = None
for x in search_range:
density = vertical_projection[x]
# Check for Gap
if density <= gap_noise_floor:
first_gap_x = x
found_gap = True
break
# Track minimum density for fallback
if density < min_density:
min_density = density
best_x = x
if found_gap and first_gap_x is not None:
if trailing_punctuation[i]:
# Logic to jump over the gap and include the punctuation blob
# ... (same safety limits as before) ...
proj_len = len(vertical_projection)
x_pos = first_gap_x
# 1. Cross the gap
gap_safety_limit = x_pos + (max_scan_distance // 2)
while (
x_pos < scan_limit
and x_pos < proj_len
and vertical_projection[x_pos] <= gap_noise_floor
):
if x_pos >= gap_safety_limit:
break
x_pos += 1
# 2. Consume blob
blob_start = x_pos
blob_safety_limit = blob_start + max(1, int(img_h * 0.5))
while (
x_pos < scan_limit
and x_pos < proj_len
and vertical_projection[x_pos] > gap_noise_floor
):
if x_pos >= blob_safety_limit:
x_pos = first_gap_x # Revert
break
x_pos += 1
new_right = min(x_pos, scan_limit)
else:
new_right = first_gap_x
elif not found_gap:
# Fallback: No clear gap found.
# ROBUSTNESS CHECK: Is the "thinnest" point actually thin?
if min_density < safety_density_limit:
new_right = best_x
else:
# The thinnest point is still very dark (ink).
# Don't cut through a letter. Keep original guess or limit.
new_right = right
if direction == "rtl" or direction == "both": # Scan left
if left > 0:
scan_limit = max(0, left - max_scan_distance)
search_range = range(left, scan_limit, -1)
best_x = left
min_density = float("inf")
found_gap = False
for x in search_range:
density = vertical_projection[x]
if density <= gap_noise_floor:
new_left = x
found_gap = True
break
if density < min_density:
min_density = density
best_x = x
if not found_gap:
# ROBUSTNESS CHECK
if min_density < safety_density_limit:
new_left = best_x
else:
# Refuse to cut through dense ink
new_left = left
# --- Anchor to text: don't shrink past allowed fraction of initial width ---
new_right = max(new_right, min_right)
new_left = min(new_left, max_left)
# --- Directional de-overlapping ---
if direction == "ltr":
if new_left < last_corrected_right_edge:
new_left = last_corrected_right_edge
if new_right <= new_left:
new_right = new_left + 1
last_corrected_right_edge = new_right
else: # rtl
if new_right > next_corrected_left_edge:
new_right = next_corrected_left_edge
if new_left >= new_right:
new_left = new_right - 1
next_corrected_left_edge = new_left
box["left"] = new_left
box["width"] = max(1, new_right - new_left)
return refined_boxes
def refine_words_bidirectional(
self,
line_data: Dict[str, List],
line_image: np.ndarray,
) -> Dict[str, List]:
"""
Refines boxes using a robust bidirectional scan.
DIFFERENCE FROM MAIN SEGMENTER: Uses aggressive smoothing and horizontal
smearing to force-merge characters, prioritizing word separation over
character detail.
"""
if line_image is None:
return line_data
# Handle grayscale (2D) or BGR (3D) line images
if len(line_image.shape) == 2:
gray = np.ascontiguousarray(line_image)
else:
gray = cv2.cvtColor(line_image, cv2.COLOR_BGR2GRAY)
img_h, img_w = gray.shape[:2]
# OpenCV GaussianBlur(5,5) and later adaptiveThreshold need minimum dimensions.
# Avoid "Unknown C++ exception" on very small line crops (e.g. 1–4 px).
if img_h < 5 or img_w < 5:
return self.convert_line_to_word_level(line_data, img_w, img_h)
if line_data and line_data.get("text"):
words = line_data["text"][0].split()
if len(words) <= 1:
return self.convert_line_to_word_level(line_data, img_w, img_h)
# --- PRE-PROCESSING: The "Bulldozer" Approach ---
# 1. Gaussian Blur: Suppress high-frequency speckle noise that confuses the main segmenter
# We accept slight edge blurring for the sake of noise reduction.
# OpenCV can intermittently throw low-information C++ exceptions on some
# page crops (often due to dtype/range/nan/inf issues). If that happens,
# fall back to the non-image-based word conversion to keep OCR flowing.
try:
# Guard against NaN/Inf propagating into OpenCV internals.
if gray.dtype.kind in ("f", "c"):
gray = np.nan_to_num(gray, nan=0.0, posinf=255.0, neginf=0.0)
# GaussianBlur is most stable on uint8 or float32. If we have another
# dtype (e.g. int16/float64/object), normalize and cast.
if gray.dtype != np.uint8 and gray.dtype != np.float32:
# Normalize to 0..255 if range looks unusual.
gmin = float(np.min(gray)) if gray.size else 0.0
gmax = float(np.max(gray)) if gray.size else 255.0
if gmax > 255.0 or gmin < 0.0:
gray = cv2.normalize(gray, None, 0, 255, cv2.NORM_MINMAX)
gray = np.clip(gray, 0, 255).astype(np.uint8)
blurred_gray = cv2.GaussianBlur(gray, (5, 5), 0)
except Exception:
return self.convert_line_to_word_level(line_data, img_w, img_h)
# 2. Aggressive Thresholding
# We use a larger block size here to be less sensitive to local texture variations
block_size = max(25, int(img_h * 0.5))
if block_size % 2 == 0:
block_size += 1
binary = cv2.adaptiveThreshold(
blurred_gray,
255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY_INV,
block_size,
10,
)
# 3. Horizontal Smearing (The critical difference)
# We intentionally smear mostly horizontally to bridge gaps inside noisy letters.
# Kernel width: ~15-20% of line height.
smear_w = max(3, int(img_h * 0.20))
smear_h = max(1, int(img_h * 0.05))
kernel_smear = cv2.getStructuringElement(cv2.MORPH_RECT, (smear_w, smear_h))
# Apply Morphological Closing
binary_smeared = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel_smear)
# Calculate projection on the SMEARED image
vertical_projection = np.sum(binary_smeared, axis=0)
# --- Setup for Scan ---
# Detect blobs to estimate character width for scan limiting
char_blobs = []
in_blob = False
blob_start = 0
for x, col_sum in enumerate(vertical_projection):
if col_sum > 0 and not in_blob:
blob_start = x
in_blob = True
elif col_sum == 0 and in_blob:
char_blobs.append((blob_start, x))
in_blob = False
if in_blob:
char_blobs.append((blob_start, img_w))
if not char_blobs:
return self.convert_line_to_word_level(line_data, img_w, img_h)
total_chars = len("".join(words))
if total_chars > 0:
geom_avg_char_width = img_w / total_chars
else:
geom_avg_char_width = 10
blob_avg_char_width = np.mean([end - start for start, end in char_blobs])
safe_avg_char_width = min(blob_avg_char_width, geom_avg_char_width * 1.5)
# Scan distance parameters
max_scan_distance = max(int(safe_avg_char_width * 2.5), int(img_h * 0.6))
min_safe_box_width = max(4, int(safe_avg_char_width * 0.5))
# --- Standard Logic Continues ---
# Use proportional estimation only (no vertical_projection) so initial boxes
# are driven by text/character spacing. Image-based gap anchoring on noisy
# images can produce tiny slices; refinement will still run but is bounded.
estimated_data = self.convert_line_to_word_level(line_data, img_w, img_h)
if not estimated_data["text"]:
return estimated_data
initial_boxes = []
for i in range(len(estimated_data["text"])):
initial_boxes.append(
{
"text": estimated_data["text"][i],
"left": estimated_data["left"][i],
"top": estimated_data["top"][i],
"width": estimated_data["width"][i],
"height": estimated_data["height"][i],
"conf": estimated_data["conf"][i],
}
)
trailing_punctuation = [
_word_ends_with_punctuation(estimated_data["text"][j])
for j in range(len(estimated_data["text"]))
]
# Run passes (ensure _run_single_pass uses the robust gap logic)
ltr_boxes = self._run_single_pass(
initial_boxes,
vertical_projection,
max_scan_distance,
img_w,
img_h,
"ltr",
trailing_punctuation,
)
rtl_boxes = self._run_single_pass(
initial_boxes,
vertical_projection,
max_scan_distance,
img_w,
img_h,
"rtl",
trailing_punctuation,
)
# [Re-use stitching logic from previous code...]
combined_boxes = [box.copy() for box in initial_boxes]
for i in range(len(combined_boxes)):
final_left = ltr_boxes[i]["left"]
rtl_right = rtl_boxes[i]["left"] + rtl_boxes[i]["width"]
combined_boxes[i]["left"] = final_left
combined_boxes[i]["width"] = max(min_safe_box_width, rtl_right - final_left)
for i in range(len(combined_boxes) - 1):
if combined_boxes[i + 1]["left"] <= combined_boxes[i]["left"]:
combined_boxes[i + 1]["left"] = (
combined_boxes[i]["left"] + min_safe_box_width
)
for i in range(len(combined_boxes) - 1):
curr = combined_boxes[i]
nxt = combined_boxes[i + 1]
gap_width = nxt["left"] - curr["left"]
curr["width"] = max(min_safe_box_width, gap_width)
final_output = {k: [] for k in estimated_data.keys()}
for box in combined_boxes:
# Always keep one box per word; enforce minimum width 1 for valid geometry
box_width = max(1, box["width"])
box["width"] = box_width
for key in final_output.keys():
final_output[key].append(box[key])
return final_output