"""Utilitaires de manipulation de corpus côté web. Détection ALTO/PAGE, extraction de texte GT, analyse de la structure d'un dossier corpus, extraction de ZIP avec garde-fous (taille décompressée, nombre de fichiers, validation image extraite, détection de collision de basename). Le parsing XML sécurisé délègue à :func:`picarones.formats._xml_utils.safe_parse_xml`. """ from __future__ import annotations import logging import xml.etree.ElementTree as ET import zipfile from pathlib import Path from picarones.formats._xml_utils import safe_parse_xml from picarones.interfaces.web.state import IMAGE_EXTS logger = logging.getLogger(__name__) # Garde-fous ZIP-bomb pour l'upload MAX_ZIP_TOTAL_SIZE = 500 * 1024 * 1024 """500 Mo décompressé maximum.""" MAX_ZIP_FILES = 2000 """Nombre maximum de fichiers extraits.""" # ────────────────────────────────────────────────────────────────────────── # Détection ALTO / PAGE depuis bytes XML # ────────────────────────────────────────────────────────────────────────── def detect_xml_gt(xml_bytes: bytes) -> tuple[str, str] | None: """Détecte si ``xml_bytes`` est un fichier ALTO ou PAGE XML. Retourne ``(format_label, texte_gt)`` ou ``None`` si le format n'est pas reconnu. """ root = safe_parse_xml(xml_bytes) if root is None: return None tag = root.tag # ALTO XML : namespace contient loc.gov/standards/alto ou balise racine "alto" ns_alto = "http://www.loc.gov/standards/alto" is_alto = ( ns_alto in tag or tag.lower() == "alto" or (tag.startswith("{") and tag.split("}")[1].lower() in ("alto",)) ) if is_alto: return ("ALTO XML", extract_alto_text(root)) # PAGE XML : balise racine PcGts (avec ou sans namespace) local = tag.split("}")[-1] if "}" in tag else tag if local == "PcGts": return ("PAGE XML", extract_page_text(root)) return None def extract_alto_text(root: ET.Element) -> str: """Extrait le texte plein d'un arbre ALTO XML. Concatène les attributs ``CONTENT`` des balises ```` dans l'ordre de lecture (bloc → ligne → mot), avec un espace entre mots et un saut de ligne entre lignes. """ lines: list[str] = [] for elem in root.iter(): local = elem.tag.split("}")[-1] if "}" in elem.tag else elem.tag if local == "TextLine": words: list[str] = [] for child in elem.iter(): child_local = child.tag.split("}")[-1] if "}" in child.tag else child.tag if child_local == "String": content = child.get("CONTENT", "") if content: words.append(content) if words: lines.append(" ".join(words)) return "\n".join(lines) def extract_page_text(root: ET.Element) -> str: """Extrait le texte plein d'un arbre PAGE XML. Concatène le contenu des balises ```` dans l'ordre de lecture. """ texts: list[str] = [] for elem in root.iter(): local = elem.tag.split("}")[-1] if "}" in elem.tag else elem.tag if local == "Unicode" and elem.text: texts.append(elem.text.strip()) return "\n".join(t for t in texts if t) # ────────────────────────────────────────────────────────────────────────── # Analyse d'un dossier corpus # ────────────────────────────────────────────────────────────────────────── def analyze_corpus_dir(path: Path) -> dict: """Analyse un dossier et retourne un résumé des paires image/GT détectées. - Détecte les paires ``image.{jpg,png,...}`` + ``image.gt.txt``. - Détecte les paires ``image.{jpg,...}`` + ``image.xml`` (ALTO ou PAGE) et matérialise le ``image.gt.txt`` correspondant pour le chargeur de corpus. - Identifie le format dominant et le nombre de fichiers ``image.ocr.txt`` (corpus triplets pré-OCRisés). """ # Exclure les fichiers cachés macOS (._* AppleDouble) et tout fichier # débutant par un point. images = sorted( f.name for f in path.iterdir() if f.suffix.lower() in IMAGE_EXTS and not f.name.startswith(".") ) pairs: list[dict] = [] missing_gt: list[str] = [] for img in images: stem = Path(img).stem gt_txt = path / (stem + ".gt.txt") gt_xml = path / (stem + ".xml") if gt_txt.exists(): pairs.append({"image": img, "gt": stem + ".gt.txt", "gt_format": "texte brut"}) elif gt_xml.exists(): result = detect_xml_gt(gt_xml.read_bytes()) if result is not None: fmt, text = result gt_txt.write_text(text, encoding="utf-8") pairs.append({"image": img, "gt": stem + ".gt.txt", "gt_format": fmt}) else: missing_gt.append(img) else: missing_gt.append(img) # Format dominant formats = {p["gt_format"] for p in pairs} if len(formats) == 1: dominant_format: str = formats.pop() elif formats: dominant_format = "mixte" else: dominant_format = "texte brut" ocr_text_count = sum( 1 for p in pairs if (path / (Path(p["image"]).stem + ".ocr.txt")).exists() ) return { "doc_count": len(pairs), "pairs": pairs[:20], "total_pairs": len(pairs), "missing_gt": missing_gt[:10], "has_missing_gt": len(missing_gt) > 0, "warnings": [f"GT manquant : {img}" for img in missing_gt[:5]], "usable": len(pairs) > 0, "gt_format": dominant_format, "has_ocr_text": ocr_text_count > 0, "ocr_text_count": ocr_text_count, } # ────────────────────────────────────────────────────────────────────────── # Extraction ZIP sécurisée # ────────────────────────────────────────────────────────────────────────── def _slug_dirname(source_path: Path) -> str: """Slugifie le ``dirname`` d'une entrée ZIP pour préfixer en cas de collision. ``a/b/img.png`` → ``a_b``. Caractères non sûrs (``..``, séparateurs) sont normalisés en ``_``. Vide si l'entrée est à la racine du ZIP. """ parent = source_path.parent if parent == Path() or str(parent) == ".": return "" parts = [ part.replace("..", "_").replace("/", "_").replace("\\", "_") for part in parent.parts if part not in ("", "/", "\\") ] return "_".join(p for p in parts if p) def _resolve_collision( name: str, source_path: Path, taken: set[str], ) -> str: """Renomme ``name`` pour éviter une collision avec ``taken``. Stratégie : 1. Préfixe avec le slug du dirname source (traçabilité). Si pas de dirname ou si déjà pris, ajoute un suffixe numérique. 2. Lève ``ValueError`` après 1000 tentatives (corpus pathologique). """ slug = _slug_dirname(source_path) if slug: candidate = f"{slug}__{name}" if candidate not in taken: return candidate stem = Path(name).stem suffix = "".join(Path(name).suffixes) for n in range(2, 1001): candidate = f"{stem}_{n}{suffix}" if candidate not in taken: return candidate raise ValueError( f"Impossible de résoudre la collision de basename pour {name!r} " f"après 1000 tentatives — corpus pathologique ?", ) def flatten_zip_to_dir( zf: zipfile.ZipFile, dest: Path, *, validate_images: bool = True, ) -> None: """Extrait un ZIP en aplatissant les paires image/.gt.txt/.xml dans ``dest``. Garde-fous : - Ignore les fichiers cachés macOS (préfixe ``.`` ou ``__MACOSX``). - Refuse si la taille décompressée totale dépasse ``MAX_ZIP_TOTAL_SIZE``. - Refuse si le nombre de fichiers extraits dépasse ``MAX_ZIP_FILES``. - **Détection de collision de basename** : ``a/img.png`` et ``b/img.png`` ne s'écrasent plus silencieusement — le second est renommé avec un préfixe dérivé de son dossier source (ex. ``b__img.png``) et un warning est loggué. Sans ce garde-fou, l'utilisateur pouvait associer silencieusement une image à une GT incorrecte. - **Validation image** : chaque image extraite passe par :func:`validate_image_safe` (Pillow.verify, anti-bombe), de la même manière que les uploads directs. Désactivable via ``validate_images=False`` (utile aux tests qui ne fournissent pas de PNG complets). """ # Import retardé : ``security`` dépend de ``state`` qui dépend de # ``corpus_utils`` → circulaire si import toplevel. from picarones.interfaces.web.security import validate_image_safe dest.mkdir(parents=True, exist_ok=True) total_size = 0 file_count = 0 written_names: set[str] = set() for member in zf.infolist(): if member.is_dir(): continue p = Path(member.filename) name = p.name if name.startswith("."): continue suffix_lower = p.suffix.lower() is_image = suffix_lower in IMAGE_EXTS if not ( is_image or name.endswith(".gt.txt") or name.endswith(".ocr.txt") or suffix_lower == ".xml" ): continue total_size += member.file_size if total_size > MAX_ZIP_TOTAL_SIZE: raise ValueError( f"ZIP trop volumineux : taille décompressée > " f"{MAX_ZIP_TOTAL_SIZE // (1024*1024)} Mo" ) file_count += 1 if file_count > MAX_ZIP_FILES: raise ValueError(f"ZIP contient trop de fichiers (> {MAX_ZIP_FILES})") data = zf.read(member.filename) # Validation image après extraction (les images directes sont # déjà validées par ``api_corpus_upload``, mais celles extraites # d'un ZIP ne passaient pas par cette vérification — vecteur de # zip bomb passant les 500 Mo brut). if is_image and validate_images: validate_image_safe(data, filename=name) # Détection de collision : ``a/img.png`` et ``b/img.png`` ne # doivent pas s'écraser silencieusement (vecteur de # mauvaise association image/GT après aplatissement). if name in written_names: new_name = _resolve_collision(name, p, written_names) logger.warning( "[flatten_zip] collision de basename %r — renommé en %r " "(source ZIP : %r)", name, new_name, member.filename, ) name = new_name written_names.add(name) (dest / name).write_bytes(data) __all__ = [ "MAX_ZIP_TOTAL_SIZE", "MAX_ZIP_FILES", "safe_parse_xml", "detect_xml_gt", "extract_alto_text", "extract_page_text", "analyze_corpus_dir", "flatten_zip_to_dir", ]