Picarones / picarones /measurements /runner /orchestration.py
Claude
feat(migration): Phase 4-quater — relocate core/corpus.py vers evaluation/
3300273 unverified
Raw
History Blame
22.3 kB
"""Orchestrateur principal du benchmark.
Contient :func:`run_benchmark` et son helper :func:`_build_pipeline_info`.
Le runner exécute chaque moteur de la liste sur le corpus complet :
- Pour les moteurs CPU-bound (``execution_mode == "cpu"`` :
Tesseract, Pero OCR, Kraken), utilise un ``ProcessPoolExecutor``
et délègue aux workers picklables de :mod:`workers`.
- Pour les moteurs IO-bound (Mistral, Google Vision, Azure, LLMs),
utilise un ``ThreadPoolExecutor``.
Les résultats partiels (NDJSON par moteur) sont gérés par
:mod:`partial` ; le calcul d'un :class:`DocumentResult` individuel
par :mod:`document` ; l'agrégation finale par les hooks délégués à
:mod:`builtin_hooks` (chantier 2 post-Sprint 97).
"""
from __future__ import annotations
import concurrent.futures
import logging
import threading
import time
from pathlib import Path
from typing import Optional
from tqdm import tqdm
from picarones.evaluation.corpus import Corpus
from picarones.evaluation.benchmark_result import BenchmarkResult, DocumentResult, EngineReport
from picarones.engines.base import BaseOCREngine
from picarones.measurements.runner.document import (
_make_error_doc_result,
_make_timeout_doc_result,
)
from picarones.measurements.runner.ner_attach import (
_aggregate_ner,
_attach_ner_metrics,
)
from picarones.measurements.runner.partial import (
_delete_partial,
_load_partial,
_save_partial_line,
)
from picarones.measurements.runner.workers import (
_cpu_doc_worker,
_io_doc_worker,
)
logger = logging.getLogger(__name__)
def run_benchmark(
corpus: Corpus,
engines: list[BaseOCREngine],
output_json: Optional[str | Path] = None,
show_progress: bool = True,
progress_callback: Optional[callable] = None,
char_exclude: Optional[frozenset] = None,
max_workers: int = 4,
timeout_seconds: float = 60.0,
partial_dir: Optional[str | Path] = None,
cancel_event: Optional[threading.Event] = None,
entity_extractor: Optional[callable] = None,
profile: str = "standard",
normalization_profile: Optional[str] = None,
) -> BenchmarkResult:
"""Exécute le benchmark d'un ou plusieurs moteurs/pipelines sur un corpus.
Les pipelines OCR+LLM (``OCRLLMPipeline``) sont traités exactement comme
les moteurs OCR classiques — ils implémentent la même interface
``BaseOCREngine`` et produisent les mêmes métriques CER/WER.
Parallélisation
---------------
* Moteurs CPU-bound (Tesseract, Pero OCR, Kraken) : ``ProcessPoolExecutor``
* Moteurs IO-bound / API (Mistral, Google, Azure, LLMs) : ``ThreadPoolExecutor``
Reprise sur interruption
------------------------
Les résultats partiels sont sauvegardés document par document dans
``{partial_dir}/{corpus}_{engine}.partial.json``. Si le benchmark est
interrompu, la prochaine exécution repart automatiquement de là où elle
s'est arrêtée.
Parameters
----------
corpus:
Corpus à évaluer.
engines:
Liste d'adaptateurs moteurs ou de pipelines OCR+LLM.
output_json:
Chemin optionnel pour écrire le résultat JSON.
show_progress:
Affiche une barre de progression tqdm.
progress_callback:
Fonction ``(engine_name, doc_idx, doc_id) → None`` appelée après chaque
document traité. Une exception dans le callback est loguée en WARNING
et n'interrompt pas le benchmark.
char_exclude:
Ensemble de caractères à exclure du calcul CER/WER.
max_workers:
Taille maximale des pools de threads/processus (défaut : 4).
Peut être défini via le champ ``max_workers`` du YAML de configuration.
timeout_seconds:
Timeout par document en secondes (défaut : 60). Un document dépassant
ce délai est marqué comme erreur ``timeout`` et le benchmark continue.
partial_dir:
Répertoire pour les fichiers de reprise (défaut : répertoire temporaire
système).
cancel_event:
``threading.Event`` optionnel. Si défini et signalé (``set()``),
le benchmark s'interrompt proprement dès que possible et retourne
les résultats partiels collectés jusque-là.
profile:
Profil de calcul des métriques (chantier 2 post-Sprint 97).
Valeurs : ``"minimal"`` (CER/WER seuls), ``"standard"`` (défaut,
comportement historique avec les 12 hooks), ``"philological"``,
``"diagnostics"``, ``"economics"``, ``"pipeline"``, ``"full"``.
Le profil ``"standard"`` est strictement rétrocompatible avec
le runner pré-chantier-2.
normalization_profile:
Identifiant d'un profil de normalisation diplomatique
(cf. ``measurements.normalization.NORMALIZATION_PROFILES``).
Sprint A14-S1 — A.I.0 P0 : auparavant l'API web exposait ce
paramètre mais il était silencieusement perdu avant
d'atteindre ``compute_metrics``, ce qui rendait
scientifiquement faux tout benchmark lancé via la web app.
Désormais propagé end-to-end : web → run_benchmark → workers
→ compute_metrics. ``None`` = profil par défaut (medieval_french).
Returns
-------
BenchmarkResult
"""
# Validation du profil dès l'entrée pour échouer rapidement sur
# une faute de frappe utilisateur, avant de soumettre des futures
# aux pools. Eager-load des hooks natifs pour peupler le registre
# dans le main process (les sous-processus du pool feront leur
# propre import dans ``_compute_document_result``).
import picarones.measurements.builtin_hooks # noqa: F401
from picarones.evaluation.metric_hooks import (
run_corpus_aggregators, validate_profile,
)
validate_profile(profile)
# Sprint A14-S1 — résolution one-shot du profil de normalisation.
# On le fait ici (main process) pour échouer rapidement sur un ID
# invalide avant de soumettre des futures aux pools, et pour
# éviter de re-résoudre N fois côté workers.
norm_profile_obj = None
if normalization_profile is not None:
from picarones.measurements.normalization import get_builtin_profile
norm_profile_obj = get_builtin_profile(normalization_profile)
def _is_cancelled() -> bool:
return cancel_event is not None and cancel_event.is_set()
engine_reports: list[EngineReport] = []
# Sprint 36 — collecte des hypothèses brutes par moteur avant
# ``compact()`` pour pouvoir calculer la divergence taxonomique et
# la complémentarité (oracle) en fin de benchmark.
per_engine_outputs: dict[str, dict[str, str]] = {}
ground_truths_by_doc: dict[str, str] = {}
# Sprint 45 — A.III stratification : capture du ``script_type`` par
# document avant ``compact()`` (qui efface ``image_quality``).
doc_strata: dict[str, str] = {}
# Sprint 87 — langue du corpus pour le delta Flesch (A.II.2).
# Lecture depuis corpus.metadata, fallback "fr".
corpus_lang: str = (corpus.metadata or {}).get("language", "fr")
if corpus_lang not in ("fr", "en"):
# Sprint 52 ne supporte que fr/en — fallback "fr" en warning.
logger.warning(
"[readability] langue '%s' non supportée, fallback 'fr'.",
corpus_lang,
)
corpus_lang = "fr"
for engine in engines:
if _is_cancelled():
logger.info("Benchmark annulé avant le moteur '%s'.", engine.name)
break
logger.info("Démarrage : %s", engine.name)
# Reprise depuis résultats partiels d'une éventuelle exécution précédente
partial_path, loaded_results = _load_partial(corpus.name, engine.name, partial_dir)
loaded_doc_ids = {dr.doc_id for dr in loaded_results}
if loaded_results:
logger.info(
"Reprise depuis résultats partiels : %d/%d documents déjà traités.",
len(loaded_results), len(corpus),
)
docs_to_process = [doc for doc in corpus.documents if doc.doc_id not in loaded_doc_ids]
if loaded_doc_ids:
logger.info(
"[%s] %d doc(s) ignorés (résultats partiels existants) — "
"supprimer le fichier partiel '%s' pour forcer le recalcul.",
engine.name, len(loaded_doc_ids), partial_path,
)
document_results: list[DocumentResult] = list(loaded_results)
# Sélection du type d'exécution selon execution_mode du moteur
is_cpu_bound = getattr(engine, "execution_mode", "io") == "cpu"
ExecutorClass = (
concurrent.futures.ProcessPoolExecutor
if is_cpu_bound
else concurrent.futures.ThreadPoolExecutor
)
logger.info(
"[%s] classe=%s, exécuteur=%s, docs à traiter=%d (reprise=%d).",
engine.name,
engine.__class__.__name__,
"ProcessPoolExecutor" if is_cpu_bound else "ThreadPoolExecutor",
len(docs_to_process),
len(loaded_results),
)
pbar = tqdm(
total=len(corpus.documents),
initial=len(loaded_results),
desc=f"[{engine.name}]",
unit="doc",
disable=not show_progress,
)
processed_count = len(loaded_results)
executor = ExecutorClass(max_workers=max_workers)
try:
# Soumission de tous les documents au pool
future_to_doc: dict = {}
submitted_at: dict = {}
for doc in docs_to_process:
if _is_cancelled():
logger.info("[%s] annulation — arrêt de la soumission.", engine.name)
break
if is_cpu_bound:
engine_module = engine.__class__.__module__
engine_class_name = engine.__class__.__name__
char_exclude_tuple = tuple(char_exclude) if char_exclude else ()
future = executor.submit(
_cpu_doc_worker,
(engine_module, engine_class_name, engine.config,
doc.doc_id, str(doc.image_path), doc.ground_truth,
char_exclude_tuple, corpus_lang, profile,
norm_profile_obj),
)
else:
future = executor.submit(
_io_doc_worker, engine, doc, char_exclude,
corpus_lang, profile, norm_profile_obj,
)
future_to_doc[future] = doc
submitted_at[future] = time.monotonic()
remaining = set(future_to_doc)
while remaining:
if _is_cancelled():
logger.info("[%s] annulation — annulation des futures restantes.", engine.name)
for f in remaining:
f.cancel()
break
done, remaining = concurrent.futures.wait(
remaining,
timeout=0.5,
return_when=concurrent.futures.FIRST_COMPLETED,
)
for future in done:
doc = future_to_doc[future]
try:
doc_result = future.result()
except Exception as e:
logger.warning(
"[%s] doc %s : erreur inattendue : %s",
engine.name, doc.doc_id, e,
)
doc_result = _make_error_doc_result(doc, str(e))
document_results.append(doc_result)
_save_partial_line(partial_path, doc_result)
pbar.update(1)
if progress_callback is not None:
try:
progress_callback(engine.name, processed_count, doc.doc_id)
except Exception as e:
logger.warning("[progress_callback] fonctionnalité dégradée : %s", e)
processed_count += 1
# Vérification des timeouts par document
now = time.monotonic()
timed_out = [
f for f in remaining
if now - submitted_at[f] > timeout_seconds
]
for future in timed_out:
remaining.discard(future)
doc = future_to_doc[future]
future.cancel()
logger.warning(
"[%s] doc %s : timeout (%.0fs), document marqué en erreur.",
engine.name, doc.doc_id, timeout_seconds,
)
doc_result = _make_timeout_doc_result(doc, timeout_seconds)
document_results.append(doc_result)
_save_partial_line(partial_path, doc_result)
pbar.update(1)
if progress_callback is not None:
try:
progress_callback(engine.name, processed_count, doc.doc_id)
except Exception as e:
logger.warning(
"[progress_callback] fonctionnalité dégradée : %s", e
)
processed_count += 1
finally:
executor.shutdown(wait=False, cancel_futures=True)
pbar.close()
if _is_cancelled():
logger.info(
"[%s] annulé — %d documents traités sur %d.",
engine.name, len(document_results) - len(loaded_results),
len(docs_to_process),
)
# Conserver le fichier partiel pour reprise ultérieure
break
# Réordonner selon l'ordre du corpus pour reproductibilité
doc_order = {doc.doc_id: i for i, doc in enumerate(corpus.documents)}
document_results.sort(key=lambda dr: doc_order.get(dr.doc_id, len(doc_order)))
logger.info(
"[%s] collecte terminée — %d/%d documents (dont %d chargés depuis reprise).",
engine.name,
len(document_results),
len(corpus.documents),
len(loaded_results),
)
if not document_results:
logger.warning(
"[%s] aucun DocumentResult collecté — le rapport affichera 0/0 documents. "
"Vérifier que le moteur/pipeline a bien produit des résultats.",
engine.name,
)
# Supprimer le fichier partiel — moteur terminé avec succès
_delete_partial(partial_path)
engine_version = engine._safe_version()
pipeline_info = _build_pipeline_info(engine, document_results)
# Chantier 2 (post-Sprint 97) — agrégation déléguée au registre.
# Les 12 appels manuels aux fonctions ``_aggregate_*`` sont
# remplacés par un seul appel qui itère sur les agrégateurs
# actifs du profil. Le profil ``"standard"`` (défaut) reproduit
# exactement le comportement pré-chantier-2.
aggregated = run_corpus_aggregators(profile, document_results)
report = EngineReport(
engine_name=engine.name,
engine_version=engine_version,
engine_config=engine.config,
document_results=document_results,
pipeline_info=pipeline_info,
aggregated_confusion=aggregated.get("aggregated_confusion"),
aggregated_char_scores=aggregated.get("aggregated_char_scores"),
aggregated_taxonomy=aggregated.get("aggregated_taxonomy"),
aggregated_structure=aggregated.get("aggregated_structure"),
aggregated_image_quality=aggregated.get("aggregated_image_quality"),
aggregated_line_metrics=aggregated.get("aggregated_line_metrics"),
aggregated_hallucination=aggregated.get("aggregated_hallucination"),
aggregated_calibration=aggregated.get("aggregated_calibration"),
aggregated_philological=aggregated.get("aggregated_philological"),
aggregated_searchability=aggregated.get("aggregated_searchability"),
aggregated_numerical_sequences=aggregated.get("aggregated_numerical_sequences"),
aggregated_readability=aggregated.get("aggregated_readability"),
)
engine_reports.append(report)
logger.info(
"%s terminé — CER moyen : %.2f%%",
engine.name,
(report.mean_cer or 0) * 100,
)
# Sprint 36 — capture des hypothèses brutes pour le calcul
# inter-moteurs (effectué après la boucle, avant la sérialisation).
# On clone les chaînes pour ne pas dépendre de la durée de vie des
# DocumentResult après ``compact()``.
per_engine_outputs[engine.name] = {
dr.doc_id: dr.hypothesis for dr in document_results
if dr.engine_error is None
}
for dr in document_results:
if dr.doc_id not in ground_truths_by_doc and dr.ground_truth:
ground_truths_by_doc[dr.doc_id] = dr.ground_truth
# Sprint 45 — capture script_type avant compact()
if dr.doc_id not in doc_strata and dr.image_quality:
st = dr.image_quality.get("script_type")
if st:
doc_strata[dr.doc_id] = str(st)
# Sprint 40 — calcul des métriques NER si :
# 1. l'utilisateur a fourni un EntityExtractor au runner ;
# 2. ET le document a un niveau de GT ENTITIES (Sprint 32).
# Fait dans le main process (pas dans les sous-processus du pool)
# pour éviter de pickler l'extracteur (spaCy + modèle).
if entity_extractor is not None:
_attach_ner_metrics(corpus, document_results, entity_extractor)
agg_ner = _aggregate_ner(document_results)
report.aggregated_ner = agg_ner
# Sprint A14-S1 — A.I.0 P0 : la compaction inconditionnelle qui
# vivait ici amputait silencieusement le JSON exporté (et donc
# le rapport HTML qui le consomme) en supprimant 13 dicts
# d'analyse per-document et en tronquant les textes à 200 chars.
# ``DocumentResult.compact()`` est désormais opt-in (paramètres
# ``text_limit`` et ``drop_analyses``) ; le runner ne compacte
# plus par défaut afin que ``output_json`` contienne réellement
# toutes les analyses détaillées promises par le README.
# Un caller qui veut un JSON léger peut appeler
# ``dr.compact(text_limit=200, drop_analyses=True)`` lui-même
# après ``run_benchmark`` et avant la sérialisation finale.
# Sprint 36 — analyse inter-moteurs (divergence taxonomique +
# complémentarité / oracle). N'est calculée qu'à partir de 2
# moteurs ; en deçà l'analyse n'a pas de sens.
inter_engine_payload: Optional[dict] = None
if len(engine_reports) >= 2:
try:
from picarones.measurements.inter_engine import compute_inter_engine_analysis
taxonomy_distros = {
report.engine_name: (
report.aggregated_taxonomy.get("class_distribution", {})
if report.aggregated_taxonomy
else {}
)
for report in engine_reports
}
# Élimine les moteurs sans distribution taxonomique pour ne pas
# polluer la matrice.
taxonomy_distros = {
name: dist for name, dist in taxonomy_distros.items() if dist
}
inter_engine_payload = compute_inter_engine_analysis(
per_engine_outputs=per_engine_outputs,
ground_truths=ground_truths_by_doc,
taxonomy_distributions=taxonomy_distros or None,
)
except Exception as exc: # noqa: BLE001
logger.warning(
"[runner] analyse inter-moteurs dégradée : %s — section omise du rapport",
exc,
)
benchmark = BenchmarkResult(
corpus_name=corpus.name,
corpus_source=corpus.source_path,
document_count=len(corpus),
engine_reports=engine_reports,
inter_engine_analysis=inter_engine_payload,
doc_strata=dict(doc_strata) if doc_strata else None,
)
if output_json:
path = benchmark.to_json(output_json)
logger.info("Résultats écrits dans : %s", path)
return benchmark
def _build_pipeline_info(engine: BaseOCREngine, doc_results: list[DocumentResult]) -> dict:
"""Construit le dictionnaire pipeline_info pour un EngineReport."""
first_with_meta = next(
(dr for dr in doc_results if dr.pipeline_metadata), None
)
if first_with_meta is None:
return {}
meta = first_with_meta.pipeline_metadata
info: dict = {
"pipeline_mode": meta.get("pipeline_mode"),
"prompt_file": meta.get("prompt_file"),
"llm_model": meta.get("llm_model"),
"llm_provider": meta.get("llm_provider"),
}
try:
from picarones.pipelines.base import OCRLLMPipeline
if isinstance(engine, OCRLLMPipeline):
info["pipeline_steps"] = engine._build_steps_info()
info["prompt_template"] = engine._prompt_template
except ImportError:
pass
over_norm_results = [
dr.pipeline_metadata.get("over_normalization")
for dr in doc_results
if dr.pipeline_metadata.get("over_normalization") is not None
]
if over_norm_results:
total_correct = sum(r["total_correct_ocr_words"] for r in over_norm_results)
total_over = sum(r["over_normalized_count"] for r in over_norm_results)
info["over_normalization"] = {
"score": round(total_over / total_correct, 4) if total_correct > 0 else 0.0,
"total_correct_ocr_words": total_correct,
"over_normalized_count": total_over,
"document_count": len(over_norm_results),
}
return info
__all__ = ["_build_pipeline_info", "run_benchmark"]