Picarones / picarones /interfaces /web /corpus_utils.py
Claude
post-rewrite wiring audit: Phases 1-5 (sécurité, méthodologie, moteurs, zombie, naming)
5e48c0b unverified
Raw
History Blame
11.8 kB
"""Utilitaires de manipulation de corpus côté web.
Détection ALTO/PAGE, extraction de texte GT, analyse de la structure
d'un dossier corpus, extraction de ZIP avec garde-fous (taille
décompressée, nombre de fichiers, validation image extraite,
détection de collision de basename). Le parsing XML sécurisé délègue
à :func:`picarones.formats._xml_utils.safe_parse_xml`.
"""
from __future__ import annotations
import logging
import xml.etree.ElementTree as ET
import zipfile
from pathlib import Path
from picarones.formats._xml_utils import safe_parse_xml
from picarones.interfaces.web.state import IMAGE_EXTS
logger = logging.getLogger(__name__)
# Garde-fous ZIP-bomb pour l'upload
MAX_ZIP_TOTAL_SIZE = 500 * 1024 * 1024
"""500 Mo décompressé maximum."""
MAX_ZIP_FILES = 2000
"""Nombre maximum de fichiers extraits."""
# ──────────────────────────────────────────────────────────────────────────
# Détection ALTO / PAGE depuis bytes XML
# ──────────────────────────────────────────────────────────────────────────
def detect_xml_gt(xml_bytes: bytes) -> tuple[str, str] | None:
"""Détecte si ``xml_bytes`` est un fichier ALTO ou PAGE XML.
Retourne ``(format_label, texte_gt)`` ou ``None`` si le format
n'est pas reconnu.
"""
root = safe_parse_xml(xml_bytes)
if root is None:
return None
tag = root.tag
# ALTO XML : namespace contient loc.gov/standards/alto ou balise racine "alto"
ns_alto = "http://www.loc.gov/standards/alto"
is_alto = (
ns_alto in tag
or tag.lower() == "alto"
or (tag.startswith("{") and tag.split("}")[1].lower() in ("alto",))
)
if is_alto:
return ("ALTO XML", extract_alto_text(root))
# PAGE XML : balise racine PcGts (avec ou sans namespace)
local = tag.split("}")[-1] if "}" in tag else tag
if local == "PcGts":
return ("PAGE XML", extract_page_text(root))
return None
def extract_alto_text(root: ET.Element) -> str:
"""Extrait le texte plein d'un arbre ALTO XML.
Concatène les attributs ``CONTENT`` des balises ``<String>`` dans
l'ordre de lecture (bloc → ligne → mot), avec un espace entre mots
et un saut de ligne entre lignes.
"""
lines: list[str] = []
for elem in root.iter():
local = elem.tag.split("}")[-1] if "}" in elem.tag else elem.tag
if local == "TextLine":
words: list[str] = []
for child in elem.iter():
child_local = child.tag.split("}")[-1] if "}" in child.tag else child.tag
if child_local == "String":
content = child.get("CONTENT", "")
if content:
words.append(content)
if words:
lines.append(" ".join(words))
return "\n".join(lines)
def extract_page_text(root: ET.Element) -> str:
"""Extrait le texte plein d'un arbre PAGE XML.
Concatène le contenu des balises ``<Unicode>`` dans l'ordre de
lecture.
"""
texts: list[str] = []
for elem in root.iter():
local = elem.tag.split("}")[-1] if "}" in elem.tag else elem.tag
if local == "Unicode" and elem.text:
texts.append(elem.text.strip())
return "\n".join(t for t in texts if t)
# ──────────────────────────────────────────────────────────────────────────
# Analyse d'un dossier corpus
# ──────────────────────────────────────────────────────────────────────────
def analyze_corpus_dir(path: Path) -> dict:
"""Analyse un dossier et retourne un résumé des paires image/GT détectées.
- Détecte les paires ``image.{jpg,png,...}`` + ``image.gt.txt``.
- Détecte les paires ``image.{jpg,...}`` + ``image.xml`` (ALTO ou
PAGE) et matérialise le ``image.gt.txt`` correspondant pour le
chargeur de corpus.
- Identifie le format dominant et le nombre de fichiers
``image.ocr.txt`` (corpus triplets pré-OCRisés).
"""
# Exclure les fichiers cachés macOS (._* AppleDouble) et tout fichier
# débutant par un point.
images = sorted(
f.name for f in path.iterdir()
if f.suffix.lower() in IMAGE_EXTS and not f.name.startswith(".")
)
pairs: list[dict] = []
missing_gt: list[str] = []
for img in images:
stem = Path(img).stem
gt_txt = path / (stem + ".gt.txt")
gt_xml = path / (stem + ".xml")
if gt_txt.exists():
pairs.append({"image": img, "gt": stem + ".gt.txt", "gt_format": "texte brut"})
elif gt_xml.exists():
result = detect_xml_gt(gt_xml.read_bytes())
if result is not None:
fmt, text = result
gt_txt.write_text(text, encoding="utf-8")
pairs.append({"image": img, "gt": stem + ".gt.txt", "gt_format": fmt})
else:
missing_gt.append(img)
else:
missing_gt.append(img)
# Format dominant
formats = {p["gt_format"] for p in pairs}
if len(formats) == 1:
dominant_format: str = formats.pop()
elif formats:
dominant_format = "mixte"
else:
dominant_format = "texte brut"
ocr_text_count = sum(
1 for p in pairs
if (path / (Path(p["image"]).stem + ".ocr.txt")).exists()
)
return {
"doc_count": len(pairs),
"pairs": pairs[:20],
"total_pairs": len(pairs),
"missing_gt": missing_gt[:10],
"has_missing_gt": len(missing_gt) > 0,
"warnings": [f"GT manquant : {img}" for img in missing_gt[:5]],
"usable": len(pairs) > 0,
"gt_format": dominant_format,
"has_ocr_text": ocr_text_count > 0,
"ocr_text_count": ocr_text_count,
}
# ──────────────────────────────────────────────────────────────────────────
# Extraction ZIP sécurisée
# ──────────────────────────────────────────────────────────────────────────
def _slug_dirname(source_path: Path) -> str:
"""Slugifie le ``dirname`` d'une entrée ZIP pour préfixer en cas de collision.
``a/b/img.png`` → ``a_b``. Caractères non sûrs (``..``, séparateurs)
sont normalisés en ``_``. Vide si l'entrée est à la racine du ZIP.
"""
parent = source_path.parent
if parent == Path() or str(parent) == ".":
return ""
parts = [
part.replace("..", "_").replace("/", "_").replace("\\", "_")
for part in parent.parts
if part not in ("", "/", "\\")
]
return "_".join(p for p in parts if p)
def _resolve_collision(
name: str, source_path: Path, taken: set[str],
) -> str:
"""Renomme ``name`` pour éviter une collision avec ``taken``.
Stratégie :
1. Préfixe avec le slug du dirname source (traçabilité). Si pas de
dirname ou si déjà pris, ajoute un suffixe numérique.
2. Lève ``ValueError`` après 1000 tentatives (corpus pathologique).
"""
slug = _slug_dirname(source_path)
if slug:
candidate = f"{slug}__{name}"
if candidate not in taken:
return candidate
stem = Path(name).stem
suffix = "".join(Path(name).suffixes)
for n in range(2, 1001):
candidate = f"{stem}_{n}{suffix}"
if candidate not in taken:
return candidate
raise ValueError(
f"Impossible de résoudre la collision de basename pour {name!r} "
f"après 1000 tentatives — corpus pathologique ?",
)
def flatten_zip_to_dir(
zf: zipfile.ZipFile,
dest: Path,
*,
validate_images: bool = True,
) -> None:
"""Extrait un ZIP en aplatissant les paires image/.gt.txt/.xml dans ``dest``.
Garde-fous :
- Ignore les fichiers cachés macOS (préfixe ``.`` ou ``__MACOSX``).
- Refuse si la taille décompressée totale dépasse ``MAX_ZIP_TOTAL_SIZE``.
- Refuse si le nombre de fichiers extraits dépasse ``MAX_ZIP_FILES``.
- **Détection de collision de basename** : ``a/img.png`` et
``b/img.png`` ne s'écrasent plus silencieusement — le second est
renommé avec un préfixe dérivé de son dossier source (ex.
``b__img.png``) et un warning est loggué. Sans ce garde-fou,
l'utilisateur pouvait associer silencieusement une image à une
GT incorrecte.
- **Validation image** : chaque image extraite passe par
:func:`validate_image_safe` (Pillow.verify, anti-bombe), de la
même manière que les uploads directs. Désactivable via
``validate_images=False`` (utile aux tests qui ne fournissent
pas de PNG complets).
"""
# Import retardé : ``security`` dépend de ``state`` qui dépend de
# ``corpus_utils`` → circulaire si import toplevel.
from picarones.interfaces.web.security import validate_image_safe
dest.mkdir(parents=True, exist_ok=True)
total_size = 0
file_count = 0
written_names: set[str] = set()
for member in zf.infolist():
if member.is_dir():
continue
p = Path(member.filename)
name = p.name
if name.startswith("."):
continue
suffix_lower = p.suffix.lower()
is_image = suffix_lower in IMAGE_EXTS
if not (
is_image
or name.endswith(".gt.txt")
or name.endswith(".ocr.txt")
or suffix_lower == ".xml"
):
continue
total_size += member.file_size
if total_size > MAX_ZIP_TOTAL_SIZE:
raise ValueError(
f"ZIP trop volumineux : taille décompressée > "
f"{MAX_ZIP_TOTAL_SIZE // (1024*1024)} Mo"
)
file_count += 1
if file_count > MAX_ZIP_FILES:
raise ValueError(f"ZIP contient trop de fichiers (> {MAX_ZIP_FILES})")
data = zf.read(member.filename)
# Validation image après extraction (les images directes sont
# déjà validées par ``api_corpus_upload``, mais celles extraites
# d'un ZIP ne passaient pas par cette vérification — vecteur de
# zip bomb passant les 500 Mo brut).
if is_image and validate_images:
validate_image_safe(data, filename=name)
# Détection de collision : ``a/img.png`` et ``b/img.png`` ne
# doivent pas s'écraser silencieusement (vecteur de
# mauvaise association image/GT après aplatissement).
if name in written_names:
new_name = _resolve_collision(name, p, written_names)
logger.warning(
"[flatten_zip] collision de basename %r — renommé en %r "
"(source ZIP : %r)",
name, new_name, member.filename,
)
name = new_name
written_names.add(name)
(dest / name).write_bytes(data)
__all__ = [
"MAX_ZIP_TOTAL_SIZE",
"MAX_ZIP_FILES",
"safe_parse_xml",
"detect_xml_gt",
"extract_alto_text",
"extract_page_text",
"analyze_corpus_dir",
"flatten_zip_to_dir",
]