"""Helpers de câblage des métriques philologiques (Sprints 55-60) au runner. Sprint 61 — câblage backend des 6 modules philologiques : - ``unicode_blocks`` (Sprint 55) - ``abbreviations`` (Sprint 56) - ``mufi`` (Sprint 57) - ``early_modern`` (Sprint 58) - ``modern_archives`` (Sprint 59) - ``roman_numerals`` (Sprint 60) Principe « adaptive » ---------------------- Un module n'est inclus dans le résultat que si la **GT contient du signal exploitable** pour ce module. Cette logique évite de polluer les rapports sur les corpus sans marqueurs philologiques (typique sur des données XXIᵉ ou des transcriptions modernes propres). Coût ---- Les 6 calculs sont O(N) sur la longueur du texte ; le surcoût total par document est négligeable face à un appel OCR. L'activation est donc **automatique** (pas d'opt-in), contrairement aux backends NER ou calibration qui exigent une dépendance externe ou des données spécifiques. """ from __future__ import annotations import logging from typing import Optional from picarones.measurements.abbreviations import compute_abbreviation_metrics from picarones.measurements.early_modern_typography import compute_early_modern_metrics from picarones.measurements.modern_archives import compute_modern_archives_metrics from picarones.measurements.mufi import compute_mufi_coverage from picarones.measurements.roman_numerals import compute_roman_numeral_metrics from picarones.measurements.unicode_blocks import compute_unicode_block_accuracy logger = logging.getLogger(__name__) # ────────────────────────────────────────────────────────────────────────── # Critères « le module a-t-il du signal sur ce document ? » # ────────────────────────────────────────────────────────────────────────── # # Pour chaque module, on définit un prédicat sur le résultat : si vrai, # le module est inclus ; sinon, il est omis pour ne pas alourdir le # rapport. def _has_unicode_signal(result: dict) -> bool: # Le module retourne toujours du signal dès que GT non-vide ; on # n'inclut que si la GT a au moins un caractère **hors Basic # Latin** (sinon le breakdown se réduit à 100 % Basic Latin et # n'apporte rien au lecteur). per_block = result.get("per_block", {}) for block, stats in per_block.items(): if block == "Basic Latin": continue if stats.get("total", 0) > 0: return True return False def _has_abbreviation_signal(result: dict) -> bool: return result.get("n_abbreviations_in_reference", 0) > 0 def _has_mufi_signal(result: dict) -> bool: return result.get("n_mufi_chars_reference", 0) > 0 def _has_early_modern_signal(result: dict) -> bool: return result.get("n_markers_reference", 0) > 0 def _has_modern_archives_signal(result: dict) -> bool: return result.get("n_markers_reference", 0) > 0 def _has_roman_numeral_signal(result: dict) -> bool: return result.get("n_numerals_reference", 0) > 0 # Ordre fixé pour la reproductibilité des sorties. _PHILOLOGICAL_MODULES: tuple[ tuple[str, callable, callable], ... ] = ( ("unicode_blocks", compute_unicode_block_accuracy, _has_unicode_signal), ("abbreviations", compute_abbreviation_metrics, _has_abbreviation_signal), ("mufi", compute_mufi_coverage, _has_mufi_signal), ("early_modern", compute_early_modern_metrics, _has_early_modern_signal), ("modern_archives", compute_modern_archives_metrics, _has_modern_archives_signal), ("roman_numerals", compute_roman_numeral_metrics, _has_roman_numeral_signal), ) # ────────────────────────────────────────────────────────────────────────── # Calcul par document # ────────────────────────────────────────────────────────────────────────── def compute_philological_metrics( reference: Optional[str], hypothesis: Optional[str], ) -> Optional[dict]: """Calcule les 6 métriques philologiques pour un document. Retourne un dict avec une clé par module ayant du signal, ou ``None`` si aucun module n'en a (corpus sans marqueur philologique pertinent). En cas d'erreur dans un module individuel, le module est silencieusement omis et un warning est émis (les autres modules restent calculés). """ ref = reference or "" if not ref: return None out: dict = {} for name, compute_fn, has_signal_fn in _PHILOLOGICAL_MODULES: try: result = compute_fn(ref, hypothesis or "") except Exception as exc: # pragma: no cover — défense en profondeur logger.warning( "[philological_hooks] module %s a échoué : %s", name, exc, ) continue if has_signal_fn(result): out[name] = result return out if out else None # ────────────────────────────────────────────────────────────────────────── # Agrégation corpus-wide par moteur # ────────────────────────────────────────────────────────────────────────── def _aggregate_unicode(per_doc: list[dict]) -> dict: total_correct = 0 total_chars = 0 per_block: dict[str, dict[str, int]] = {} for d in per_doc: for block, stats in d.get("per_block", {}).items(): slot = per_block.setdefault(block, {"correct": 0, "total": 0}) slot["correct"] += stats.get("correct", 0) slot["total"] += stats.get("total", 0) total_correct += stats.get("correct", 0) total_chars += stats.get("total", 0) out_per_block = { block: { "correct": slot["correct"], "total": slot["total"], "accuracy": ( slot["correct"] / slot["total"] if slot["total"] > 0 else 0.0 ), } for block, slot in sorted(per_block.items()) } return { "global_accuracy": total_correct / total_chars if total_chars > 0 else 0.0, "n_chars_total": total_chars, "n_chars_correct": total_correct, "per_block": out_per_block, "doc_count": len(per_doc), } def _aggregate_abbreviations(per_doc: list[dict]) -> dict: n_total = 0 n_strict = 0 n_expansion = 0 per_abbr: dict[str, dict[str, int]] = {} for d in per_doc: n_total += d.get("n_abbreviations_in_reference", 0) n_strict += d.get("n_strict_preserved", 0) n_expansion += d.get("n_expansion_preserved", 0) for entry in d.get("per_abbreviation", []): slot = per_abbr.setdefault( entry["abbr"], {"total": 0, "strict": 0, "expansion": 0}, ) slot["total"] += 1 if entry.get("strict_preserved"): slot["strict"] += 1 if entry.get("expansion_preserved"): slot["expansion"] += 1 return { "n_abbreviations_in_reference": n_total, "n_strict_preserved": n_strict, "n_expansion_preserved": n_expansion, "global_strict_score": n_strict / n_total if n_total > 0 else 0.0, "global_expansion_score": n_expansion / n_total if n_total > 0 else 0.0, "per_abbreviation": { abbr: { "n_total": slot["total"], "n_strict": slot["strict"], "n_expansion": slot["expansion"], "strict_score": slot["strict"] / slot["total"], "expansion_score": slot["expansion"] / slot["total"], } for abbr, slot in sorted(per_abbr.items()) }, "doc_count": len(per_doc), } def _aggregate_mufi(per_doc: list[dict]) -> dict: n_total = 0 n_preserved = 0 per_char: dict[str, dict[str, int]] = {} for d in per_doc: n_total += d.get("n_mufi_chars_reference", 0) n_preserved += d.get("n_mufi_chars_preserved", 0) for ch, stats in d.get("per_char", {}).items(): slot = per_char.setdefault(ch, {"total": 0, "preserved": 0}) slot["total"] += stats.get("total", 0) slot["preserved"] += stats.get("preserved", 0) return { "n_mufi_chars_reference": n_total, "n_mufi_chars_preserved": n_preserved, "coverage": n_preserved / n_total if n_total > 0 else 0.0, "per_char": { ch: { "total": slot["total"], "preserved": slot["preserved"], "coverage": slot["preserved"] / slot["total"], } for ch, slot in sorted(per_char.items()) }, "doc_count": len(per_doc), } def _aggregate_early_modern(per_doc: list[dict]) -> dict: n_total = 0 n_preserved = 0 per_cat: dict[str, dict[str, int]] = {} for d in per_doc: n_total += d.get("n_markers_reference", 0) n_preserved += d.get("n_markers_preserved", 0) for cat, stats in d.get("per_category", {}).items(): slot = per_cat.setdefault(cat, {"total": 0, "preserved": 0}) slot["total"] += stats.get("total", 0) slot["preserved"] += stats.get("preserved", 0) return { "n_markers_reference": n_total, "n_markers_preserved": n_preserved, "global_preservation": n_preserved / n_total if n_total > 0 else 0.0, "per_category": { cat: { "total": slot["total"], "preserved": slot["preserved"], "preservation": slot["preserved"] / slot["total"], } for cat, slot in sorted(per_cat.items()) }, "doc_count": len(per_doc), } def _aggregate_modern_archives(per_doc: list[dict]) -> dict: n_total = 0 n_strict = 0 n_expansion = 0 per_cat: dict[str, dict[str, int]] = {} for d in per_doc: n_total += d.get("n_markers_reference", 0) n_strict += d.get("n_strict_preserved", 0) n_expansion += d.get("n_expansion_preserved", 0) for cat, stats in d.get("per_category", {}).items(): slot = per_cat.setdefault( cat, {"total": 0, "strict": 0, "expansion": 0}, ) slot["total"] += stats.get("n_total", 0) slot["strict"] += stats.get("n_strict_preserved", 0) slot["expansion"] += stats.get("n_expansion_preserved", 0) return { "n_markers_reference": n_total, "n_strict_preserved": n_strict, "n_expansion_preserved": n_expansion, "global_strict_score": n_strict / n_total if n_total > 0 else 0.0, "global_expansion_score": n_expansion / n_total if n_total > 0 else 0.0, "per_category": { cat: { "n_total": slot["total"], "n_strict_preserved": slot["strict"], "n_expansion_preserved": slot["expansion"], "strict_score": slot["strict"] / slot["total"], "expansion_score": slot["expansion"] / slot["total"], } for cat, slot in sorted(per_cat.items()) }, "doc_count": len(per_doc), } def _aggregate_roman_numerals(per_doc: list[dict]) -> dict: from picarones.measurements.roman_numerals import ALL_STATUSES, VALUE_PRESERVING_STATUSES n_total = 0 per_status: dict[str, int] = {s: 0 for s in ALL_STATUSES} for d in per_doc: n_total += d.get("n_numerals_reference", 0) for status, count in d.get("per_status", {}).items(): per_status[status] = per_status.get(status, 0) + count n_strict = per_status.get("strict_preserved", 0) n_value = sum(per_status.get(s, 0) for s in VALUE_PRESERVING_STATUSES) return { "n_numerals_reference": n_total, "n_strict_preserved": n_strict, "n_value_preserved": n_value, "global_strict_score": n_strict / n_total if n_total > 0 else 0.0, "global_value_score": n_value / n_total if n_total > 0 else 0.0, "per_status": per_status, "doc_count": len(per_doc), } _AGGREGATORS = { "unicode_blocks": _aggregate_unicode, "abbreviations": _aggregate_abbreviations, "mufi": _aggregate_mufi, "early_modern": _aggregate_early_modern, "modern_archives": _aggregate_modern_archives, "roman_numerals": _aggregate_roman_numerals, } def aggregate_philological_metrics( doc_metrics: list[Optional[dict]], ) -> Optional[dict]: """Agrège les ``philological_metrics`` per-document en un dict corpus-wide par module. Pour chaque module, on agrège uniquement les documents qui ont eu du signal pour ce module. Si aucun module n'a été calculé sur aucun document, retourne ``None``. """ by_module: dict[str, list[dict]] = {} for doc in doc_metrics: if not doc: continue for module, payload in doc.items(): by_module.setdefault(module, []).append(payload) if not by_module: return None out: dict = {} for module, payloads in by_module.items(): aggregator = _AGGREGATORS.get(module) if aggregator is None: # pragma: no cover logger.warning( "[philological_hooks] aucun agrégateur pour %s", module, ) continue out[module] = aggregator(payloads) return out if out else None __all__ = [ "compute_philological_metrics", "aggregate_philological_metrics", ]