Spaces:
Sleeping
Sleeping
Claude
feat(migration): Phase 4-quater — relocate core/corpus.py vers evaluation/
3300273 unverified | """Orchestrateur principal du benchmark. | |
| Contient :func:`run_benchmark` et son helper :func:`_build_pipeline_info`. | |
| Le runner exécute chaque moteur de la liste sur le corpus complet : | |
| - Pour les moteurs CPU-bound (``execution_mode == "cpu"`` : | |
| Tesseract, Pero OCR, Kraken), utilise un ``ProcessPoolExecutor`` | |
| et délègue aux workers picklables de :mod:`workers`. | |
| - Pour les moteurs IO-bound (Mistral, Google Vision, Azure, LLMs), | |
| utilise un ``ThreadPoolExecutor``. | |
| Les résultats partiels (NDJSON par moteur) sont gérés par | |
| :mod:`partial` ; le calcul d'un :class:`DocumentResult` individuel | |
| par :mod:`document` ; l'agrégation finale par les hooks délégués à | |
| :mod:`builtin_hooks` (chantier 2 post-Sprint 97). | |
| """ | |
| from __future__ import annotations | |
| import concurrent.futures | |
| import logging | |
| import threading | |
| import time | |
| from pathlib import Path | |
| from typing import Optional | |
| from tqdm import tqdm | |
| from picarones.evaluation.corpus import Corpus | |
| from picarones.evaluation.benchmark_result import BenchmarkResult, DocumentResult, EngineReport | |
| from picarones.engines.base import BaseOCREngine | |
| from picarones.measurements.runner.document import ( | |
| _make_error_doc_result, | |
| _make_timeout_doc_result, | |
| ) | |
| from picarones.measurements.runner.ner_attach import ( | |
| _aggregate_ner, | |
| _attach_ner_metrics, | |
| ) | |
| from picarones.measurements.runner.partial import ( | |
| _delete_partial, | |
| _load_partial, | |
| _save_partial_line, | |
| ) | |
| from picarones.measurements.runner.workers import ( | |
| _cpu_doc_worker, | |
| _io_doc_worker, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| def run_benchmark( | |
| corpus: Corpus, | |
| engines: list[BaseOCREngine], | |
| output_json: Optional[str | Path] = None, | |
| show_progress: bool = True, | |
| progress_callback: Optional[callable] = None, | |
| char_exclude: Optional[frozenset] = None, | |
| max_workers: int = 4, | |
| timeout_seconds: float = 60.0, | |
| partial_dir: Optional[str | Path] = None, | |
| cancel_event: Optional[threading.Event] = None, | |
| entity_extractor: Optional[callable] = None, | |
| profile: str = "standard", | |
| normalization_profile: Optional[str] = None, | |
| ) -> BenchmarkResult: | |
| """Exécute le benchmark d'un ou plusieurs moteurs/pipelines sur un corpus. | |
| Les pipelines OCR+LLM (``OCRLLMPipeline``) sont traités exactement comme | |
| les moteurs OCR classiques — ils implémentent la même interface | |
| ``BaseOCREngine`` et produisent les mêmes métriques CER/WER. | |
| Parallélisation | |
| --------------- | |
| * Moteurs CPU-bound (Tesseract, Pero OCR, Kraken) : ``ProcessPoolExecutor`` | |
| * Moteurs IO-bound / API (Mistral, Google, Azure, LLMs) : ``ThreadPoolExecutor`` | |
| Reprise sur interruption | |
| ------------------------ | |
| Les résultats partiels sont sauvegardés document par document dans | |
| ``{partial_dir}/{corpus}_{engine}.partial.json``. Si le benchmark est | |
| interrompu, la prochaine exécution repart automatiquement de là où elle | |
| s'est arrêtée. | |
| Parameters | |
| ---------- | |
| corpus: | |
| Corpus à évaluer. | |
| engines: | |
| Liste d'adaptateurs moteurs ou de pipelines OCR+LLM. | |
| output_json: | |
| Chemin optionnel pour écrire le résultat JSON. | |
| show_progress: | |
| Affiche une barre de progression tqdm. | |
| progress_callback: | |
| Fonction ``(engine_name, doc_idx, doc_id) → None`` appelée après chaque | |
| document traité. Une exception dans le callback est loguée en WARNING | |
| et n'interrompt pas le benchmark. | |
| char_exclude: | |
| Ensemble de caractères à exclure du calcul CER/WER. | |
| max_workers: | |
| Taille maximale des pools de threads/processus (défaut : 4). | |
| Peut être défini via le champ ``max_workers`` du YAML de configuration. | |
| timeout_seconds: | |
| Timeout par document en secondes (défaut : 60). Un document dépassant | |
| ce délai est marqué comme erreur ``timeout`` et le benchmark continue. | |
| partial_dir: | |
| Répertoire pour les fichiers de reprise (défaut : répertoire temporaire | |
| système). | |
| cancel_event: | |
| ``threading.Event`` optionnel. Si défini et signalé (``set()``), | |
| le benchmark s'interrompt proprement dès que possible et retourne | |
| les résultats partiels collectés jusque-là. | |
| profile: | |
| Profil de calcul des métriques (chantier 2 post-Sprint 97). | |
| Valeurs : ``"minimal"`` (CER/WER seuls), ``"standard"`` (défaut, | |
| comportement historique avec les 12 hooks), ``"philological"``, | |
| ``"diagnostics"``, ``"economics"``, ``"pipeline"``, ``"full"``. | |
| Le profil ``"standard"`` est strictement rétrocompatible avec | |
| le runner pré-chantier-2. | |
| normalization_profile: | |
| Identifiant d'un profil de normalisation diplomatique | |
| (cf. ``measurements.normalization.NORMALIZATION_PROFILES``). | |
| Sprint A14-S1 — A.I.0 P0 : auparavant l'API web exposait ce | |
| paramètre mais il était silencieusement perdu avant | |
| d'atteindre ``compute_metrics``, ce qui rendait | |
| scientifiquement faux tout benchmark lancé via la web app. | |
| Désormais propagé end-to-end : web → run_benchmark → workers | |
| → compute_metrics. ``None`` = profil par défaut (medieval_french). | |
| Returns | |
| ------- | |
| BenchmarkResult | |
| """ | |
| # Validation du profil dès l'entrée pour échouer rapidement sur | |
| # une faute de frappe utilisateur, avant de soumettre des futures | |
| # aux pools. Eager-load des hooks natifs pour peupler le registre | |
| # dans le main process (les sous-processus du pool feront leur | |
| # propre import dans ``_compute_document_result``). | |
| import picarones.measurements.builtin_hooks # noqa: F401 | |
| from picarones.evaluation.metric_hooks import ( | |
| run_corpus_aggregators, validate_profile, | |
| ) | |
| validate_profile(profile) | |
| # Sprint A14-S1 — résolution one-shot du profil de normalisation. | |
| # On le fait ici (main process) pour échouer rapidement sur un ID | |
| # invalide avant de soumettre des futures aux pools, et pour | |
| # éviter de re-résoudre N fois côté workers. | |
| norm_profile_obj = None | |
| if normalization_profile is not None: | |
| from picarones.measurements.normalization import get_builtin_profile | |
| norm_profile_obj = get_builtin_profile(normalization_profile) | |
| def _is_cancelled() -> bool: | |
| return cancel_event is not None and cancel_event.is_set() | |
| engine_reports: list[EngineReport] = [] | |
| # Sprint 36 — collecte des hypothèses brutes par moteur avant | |
| # ``compact()`` pour pouvoir calculer la divergence taxonomique et | |
| # la complémentarité (oracle) en fin de benchmark. | |
| per_engine_outputs: dict[str, dict[str, str]] = {} | |
| ground_truths_by_doc: dict[str, str] = {} | |
| # Sprint 45 — A.III stratification : capture du ``script_type`` par | |
| # document avant ``compact()`` (qui efface ``image_quality``). | |
| doc_strata: dict[str, str] = {} | |
| # Sprint 87 — langue du corpus pour le delta Flesch (A.II.2). | |
| # Lecture depuis corpus.metadata, fallback "fr". | |
| corpus_lang: str = (corpus.metadata or {}).get("language", "fr") | |
| if corpus_lang not in ("fr", "en"): | |
| # Sprint 52 ne supporte que fr/en — fallback "fr" en warning. | |
| logger.warning( | |
| "[readability] langue '%s' non supportée, fallback 'fr'.", | |
| corpus_lang, | |
| ) | |
| corpus_lang = "fr" | |
| for engine in engines: | |
| if _is_cancelled(): | |
| logger.info("Benchmark annulé avant le moteur '%s'.", engine.name) | |
| break | |
| logger.info("Démarrage : %s", engine.name) | |
| # Reprise depuis résultats partiels d'une éventuelle exécution précédente | |
| partial_path, loaded_results = _load_partial(corpus.name, engine.name, partial_dir) | |
| loaded_doc_ids = {dr.doc_id for dr in loaded_results} | |
| if loaded_results: | |
| logger.info( | |
| "Reprise depuis résultats partiels : %d/%d documents déjà traités.", | |
| len(loaded_results), len(corpus), | |
| ) | |
| docs_to_process = [doc for doc in corpus.documents if doc.doc_id not in loaded_doc_ids] | |
| if loaded_doc_ids: | |
| logger.info( | |
| "[%s] %d doc(s) ignorés (résultats partiels existants) — " | |
| "supprimer le fichier partiel '%s' pour forcer le recalcul.", | |
| engine.name, len(loaded_doc_ids), partial_path, | |
| ) | |
| document_results: list[DocumentResult] = list(loaded_results) | |
| # Sélection du type d'exécution selon execution_mode du moteur | |
| is_cpu_bound = getattr(engine, "execution_mode", "io") == "cpu" | |
| ExecutorClass = ( | |
| concurrent.futures.ProcessPoolExecutor | |
| if is_cpu_bound | |
| else concurrent.futures.ThreadPoolExecutor | |
| ) | |
| logger.info( | |
| "[%s] classe=%s, exécuteur=%s, docs à traiter=%d (reprise=%d).", | |
| engine.name, | |
| engine.__class__.__name__, | |
| "ProcessPoolExecutor" if is_cpu_bound else "ThreadPoolExecutor", | |
| len(docs_to_process), | |
| len(loaded_results), | |
| ) | |
| pbar = tqdm( | |
| total=len(corpus.documents), | |
| initial=len(loaded_results), | |
| desc=f"[{engine.name}]", | |
| unit="doc", | |
| disable=not show_progress, | |
| ) | |
| processed_count = len(loaded_results) | |
| executor = ExecutorClass(max_workers=max_workers) | |
| try: | |
| # Soumission de tous les documents au pool | |
| future_to_doc: dict = {} | |
| submitted_at: dict = {} | |
| for doc in docs_to_process: | |
| if _is_cancelled(): | |
| logger.info("[%s] annulation — arrêt de la soumission.", engine.name) | |
| break | |
| if is_cpu_bound: | |
| engine_module = engine.__class__.__module__ | |
| engine_class_name = engine.__class__.__name__ | |
| char_exclude_tuple = tuple(char_exclude) if char_exclude else () | |
| future = executor.submit( | |
| _cpu_doc_worker, | |
| (engine_module, engine_class_name, engine.config, | |
| doc.doc_id, str(doc.image_path), doc.ground_truth, | |
| char_exclude_tuple, corpus_lang, profile, | |
| norm_profile_obj), | |
| ) | |
| else: | |
| future = executor.submit( | |
| _io_doc_worker, engine, doc, char_exclude, | |
| corpus_lang, profile, norm_profile_obj, | |
| ) | |
| future_to_doc[future] = doc | |
| submitted_at[future] = time.monotonic() | |
| remaining = set(future_to_doc) | |
| while remaining: | |
| if _is_cancelled(): | |
| logger.info("[%s] annulation — annulation des futures restantes.", engine.name) | |
| for f in remaining: | |
| f.cancel() | |
| break | |
| done, remaining = concurrent.futures.wait( | |
| remaining, | |
| timeout=0.5, | |
| return_when=concurrent.futures.FIRST_COMPLETED, | |
| ) | |
| for future in done: | |
| doc = future_to_doc[future] | |
| try: | |
| doc_result = future.result() | |
| except Exception as e: | |
| logger.warning( | |
| "[%s] doc %s : erreur inattendue : %s", | |
| engine.name, doc.doc_id, e, | |
| ) | |
| doc_result = _make_error_doc_result(doc, str(e)) | |
| document_results.append(doc_result) | |
| _save_partial_line(partial_path, doc_result) | |
| pbar.update(1) | |
| if progress_callback is not None: | |
| try: | |
| progress_callback(engine.name, processed_count, doc.doc_id) | |
| except Exception as e: | |
| logger.warning("[progress_callback] fonctionnalité dégradée : %s", e) | |
| processed_count += 1 | |
| # Vérification des timeouts par document | |
| now = time.monotonic() | |
| timed_out = [ | |
| f for f in remaining | |
| if now - submitted_at[f] > timeout_seconds | |
| ] | |
| for future in timed_out: | |
| remaining.discard(future) | |
| doc = future_to_doc[future] | |
| future.cancel() | |
| logger.warning( | |
| "[%s] doc %s : timeout (%.0fs), document marqué en erreur.", | |
| engine.name, doc.doc_id, timeout_seconds, | |
| ) | |
| doc_result = _make_timeout_doc_result(doc, timeout_seconds) | |
| document_results.append(doc_result) | |
| _save_partial_line(partial_path, doc_result) | |
| pbar.update(1) | |
| if progress_callback is not None: | |
| try: | |
| progress_callback(engine.name, processed_count, doc.doc_id) | |
| except Exception as e: | |
| logger.warning( | |
| "[progress_callback] fonctionnalité dégradée : %s", e | |
| ) | |
| processed_count += 1 | |
| finally: | |
| executor.shutdown(wait=False, cancel_futures=True) | |
| pbar.close() | |
| if _is_cancelled(): | |
| logger.info( | |
| "[%s] annulé — %d documents traités sur %d.", | |
| engine.name, len(document_results) - len(loaded_results), | |
| len(docs_to_process), | |
| ) | |
| # Conserver le fichier partiel pour reprise ultérieure | |
| break | |
| # Réordonner selon l'ordre du corpus pour reproductibilité | |
| doc_order = {doc.doc_id: i for i, doc in enumerate(corpus.documents)} | |
| document_results.sort(key=lambda dr: doc_order.get(dr.doc_id, len(doc_order))) | |
| logger.info( | |
| "[%s] collecte terminée — %d/%d documents (dont %d chargés depuis reprise).", | |
| engine.name, | |
| len(document_results), | |
| len(corpus.documents), | |
| len(loaded_results), | |
| ) | |
| if not document_results: | |
| logger.warning( | |
| "[%s] aucun DocumentResult collecté — le rapport affichera 0/0 documents. " | |
| "Vérifier que le moteur/pipeline a bien produit des résultats.", | |
| engine.name, | |
| ) | |
| # Supprimer le fichier partiel — moteur terminé avec succès | |
| _delete_partial(partial_path) | |
| engine_version = engine._safe_version() | |
| pipeline_info = _build_pipeline_info(engine, document_results) | |
| # Chantier 2 (post-Sprint 97) — agrégation déléguée au registre. | |
| # Les 12 appels manuels aux fonctions ``_aggregate_*`` sont | |
| # remplacés par un seul appel qui itère sur les agrégateurs | |
| # actifs du profil. Le profil ``"standard"`` (défaut) reproduit | |
| # exactement le comportement pré-chantier-2. | |
| aggregated = run_corpus_aggregators(profile, document_results) | |
| report = EngineReport( | |
| engine_name=engine.name, | |
| engine_version=engine_version, | |
| engine_config=engine.config, | |
| document_results=document_results, | |
| pipeline_info=pipeline_info, | |
| aggregated_confusion=aggregated.get("aggregated_confusion"), | |
| aggregated_char_scores=aggregated.get("aggregated_char_scores"), | |
| aggregated_taxonomy=aggregated.get("aggregated_taxonomy"), | |
| aggregated_structure=aggregated.get("aggregated_structure"), | |
| aggregated_image_quality=aggregated.get("aggregated_image_quality"), | |
| aggregated_line_metrics=aggregated.get("aggregated_line_metrics"), | |
| aggregated_hallucination=aggregated.get("aggregated_hallucination"), | |
| aggregated_calibration=aggregated.get("aggregated_calibration"), | |
| aggregated_philological=aggregated.get("aggregated_philological"), | |
| aggregated_searchability=aggregated.get("aggregated_searchability"), | |
| aggregated_numerical_sequences=aggregated.get("aggregated_numerical_sequences"), | |
| aggregated_readability=aggregated.get("aggregated_readability"), | |
| ) | |
| engine_reports.append(report) | |
| logger.info( | |
| "%s terminé — CER moyen : %.2f%%", | |
| engine.name, | |
| (report.mean_cer or 0) * 100, | |
| ) | |
| # Sprint 36 — capture des hypothèses brutes pour le calcul | |
| # inter-moteurs (effectué après la boucle, avant la sérialisation). | |
| # On clone les chaînes pour ne pas dépendre de la durée de vie des | |
| # DocumentResult après ``compact()``. | |
| per_engine_outputs[engine.name] = { | |
| dr.doc_id: dr.hypothesis for dr in document_results | |
| if dr.engine_error is None | |
| } | |
| for dr in document_results: | |
| if dr.doc_id not in ground_truths_by_doc and dr.ground_truth: | |
| ground_truths_by_doc[dr.doc_id] = dr.ground_truth | |
| # Sprint 45 — capture script_type avant compact() | |
| if dr.doc_id not in doc_strata and dr.image_quality: | |
| st = dr.image_quality.get("script_type") | |
| if st: | |
| doc_strata[dr.doc_id] = str(st) | |
| # Sprint 40 — calcul des métriques NER si : | |
| # 1. l'utilisateur a fourni un EntityExtractor au runner ; | |
| # 2. ET le document a un niveau de GT ENTITIES (Sprint 32). | |
| # Fait dans le main process (pas dans les sous-processus du pool) | |
| # pour éviter de pickler l'extracteur (spaCy + modèle). | |
| if entity_extractor is not None: | |
| _attach_ner_metrics(corpus, document_results, entity_extractor) | |
| agg_ner = _aggregate_ner(document_results) | |
| report.aggregated_ner = agg_ner | |
| # Sprint A14-S1 — A.I.0 P0 : la compaction inconditionnelle qui | |
| # vivait ici amputait silencieusement le JSON exporté (et donc | |
| # le rapport HTML qui le consomme) en supprimant 13 dicts | |
| # d'analyse per-document et en tronquant les textes à 200 chars. | |
| # ``DocumentResult.compact()`` est désormais opt-in (paramètres | |
| # ``text_limit`` et ``drop_analyses``) ; le runner ne compacte | |
| # plus par défaut afin que ``output_json`` contienne réellement | |
| # toutes les analyses détaillées promises par le README. | |
| # Un caller qui veut un JSON léger peut appeler | |
| # ``dr.compact(text_limit=200, drop_analyses=True)`` lui-même | |
| # après ``run_benchmark`` et avant la sérialisation finale. | |
| # Sprint 36 — analyse inter-moteurs (divergence taxonomique + | |
| # complémentarité / oracle). N'est calculée qu'à partir de 2 | |
| # moteurs ; en deçà l'analyse n'a pas de sens. | |
| inter_engine_payload: Optional[dict] = None | |
| if len(engine_reports) >= 2: | |
| try: | |
| from picarones.measurements.inter_engine import compute_inter_engine_analysis | |
| taxonomy_distros = { | |
| report.engine_name: ( | |
| report.aggregated_taxonomy.get("class_distribution", {}) | |
| if report.aggregated_taxonomy | |
| else {} | |
| ) | |
| for report in engine_reports | |
| } | |
| # Élimine les moteurs sans distribution taxonomique pour ne pas | |
| # polluer la matrice. | |
| taxonomy_distros = { | |
| name: dist for name, dist in taxonomy_distros.items() if dist | |
| } | |
| inter_engine_payload = compute_inter_engine_analysis( | |
| per_engine_outputs=per_engine_outputs, | |
| ground_truths=ground_truths_by_doc, | |
| taxonomy_distributions=taxonomy_distros or None, | |
| ) | |
| except Exception as exc: # noqa: BLE001 | |
| logger.warning( | |
| "[runner] analyse inter-moteurs dégradée : %s — section omise du rapport", | |
| exc, | |
| ) | |
| benchmark = BenchmarkResult( | |
| corpus_name=corpus.name, | |
| corpus_source=corpus.source_path, | |
| document_count=len(corpus), | |
| engine_reports=engine_reports, | |
| inter_engine_analysis=inter_engine_payload, | |
| doc_strata=dict(doc_strata) if doc_strata else None, | |
| ) | |
| if output_json: | |
| path = benchmark.to_json(output_json) | |
| logger.info("Résultats écrits dans : %s", path) | |
| return benchmark | |
| def _build_pipeline_info(engine: BaseOCREngine, doc_results: list[DocumentResult]) -> dict: | |
| """Construit le dictionnaire pipeline_info pour un EngineReport.""" | |
| first_with_meta = next( | |
| (dr for dr in doc_results if dr.pipeline_metadata), None | |
| ) | |
| if first_with_meta is None: | |
| return {} | |
| meta = first_with_meta.pipeline_metadata | |
| info: dict = { | |
| "pipeline_mode": meta.get("pipeline_mode"), | |
| "prompt_file": meta.get("prompt_file"), | |
| "llm_model": meta.get("llm_model"), | |
| "llm_provider": meta.get("llm_provider"), | |
| } | |
| try: | |
| from picarones.pipelines.base import OCRLLMPipeline | |
| if isinstance(engine, OCRLLMPipeline): | |
| info["pipeline_steps"] = engine._build_steps_info() | |
| info["prompt_template"] = engine._prompt_template | |
| except ImportError: | |
| pass | |
| over_norm_results = [ | |
| dr.pipeline_metadata.get("over_normalization") | |
| for dr in doc_results | |
| if dr.pipeline_metadata.get("over_normalization") is not None | |
| ] | |
| if over_norm_results: | |
| total_correct = sum(r["total_correct_ocr_words"] for r in over_norm_results) | |
| total_over = sum(r["over_normalized_count"] for r in over_norm_results) | |
| info["over_normalization"] = { | |
| "score": round(total_over / total_correct, 4) if total_correct > 0 else 0.0, | |
| "total_correct_ocr_words": total_correct, | |
| "over_normalized_count": total_over, | |
| "document_count": len(over_norm_results), | |
| } | |
| return info | |
| __all__ = ["_build_pipeline_info", "run_benchmark"] | |