Claude
feat(migration): Phase 4-ter — relocalisation Cercle 1 → Cercle 2
0864c88 unverified
Raw
History Blame
4.95 kB
"""Persistance des résultats partiels du benchmark (NDJSON).
Quand le runner traite un corpus, il écrit chaque ``DocumentResult``
dans un fichier ``{partial_dir}/picarones_{corpus}_{engine}.partial.json``
au format NDJSON. Si le benchmark est interrompu (Ctrl+C, crash, kill),
la prochaine exécution reprend depuis ce fichier sans perdre le travail
déjà fait.
Thread-safe : le module utilise un :class:`threading.Lock` partagé
entre toutes les écritures pour sérialiser les appends.
"""
from __future__ import annotations
import json
import logging
import re
import tempfile
import threading
from pathlib import Path
from typing import Optional
from picarones.evaluation.benchmark_result import DocumentResult
from picarones.measurements.metrics import MetricsResult
logger = logging.getLogger(__name__)
# Lock pour la sérialisation des écritures de résultats partiels.
# Partagé entre tous les call sites (workers IO et CPU se relayent
# sur la même file).
_partial_write_lock = threading.Lock()
def _sanitize_filename(s: str) -> str:
return re.sub(r"[^\w\-]", "_", s)[:64]
def _partial_path(
corpus_name: str,
engine_name: str,
partial_dir: Optional[str | Path],
) -> Path:
base = Path(partial_dir) if partial_dir else Path(tempfile.gettempdir())
name = (
f"picarones_{_sanitize_filename(corpus_name)}"
f"_{_sanitize_filename(engine_name)}.partial.json"
)
return base / name
def _load_partial(
corpus_name: str,
engine_name: str,
partial_dir: Optional[str | Path],
) -> tuple[Path, list[DocumentResult]]:
"""Charge les résultats partiels d'une exécution précédente interrompue.
Returns
-------
(path, results) — chemin du fichier partiel et liste des
DocumentResult déjà calculés.
"""
path = _partial_path(corpus_name, engine_name, partial_dir)
results: list[DocumentResult] = []
if not path.exists():
return path, results
try:
with path.open("r", encoding="utf-8") as fh:
for line in fh:
line = line.strip()
if not line:
continue
d = json.loads(line)
m = d.get("metrics", {})
metrics = MetricsResult(
cer=m.get("cer", 1.0),
cer_nfc=m.get("cer_nfc", 1.0),
cer_caseless=m.get("cer_caseless", 1.0),
wer=m.get("wer", 1.0),
wer_normalized=m.get("wer_normalized", 1.0),
mer=m.get("mer", 1.0),
wil=m.get("wil", 1.0),
reference_length=m.get("reference_length", 0),
hypothesis_length=m.get("hypothesis_length", 0),
error=m.get("error"),
)
results.append(DocumentResult(
doc_id=d["doc_id"],
image_path=d.get("image_path", ""),
ground_truth=d.get("ground_truth", ""),
hypothesis=d.get("hypothesis", ""),
metrics=metrics,
duration_seconds=d.get("duration_seconds", 0.0),
engine_error=d.get("engine_error"),
ocr_intermediate=d.get("ocr_intermediate"),
pipeline_metadata=d.get("pipeline_metadata", {}),
confusion_matrix=d.get("confusion_matrix"),
char_scores=d.get("char_scores"),
taxonomy=d.get("taxonomy"),
structure=d.get("structure"),
image_quality=d.get("image_quality"),
line_metrics=d.get("line_metrics"),
hallucination_metrics=d.get("hallucination_metrics"),
))
except Exception as e:
logger.warning("Impossible de charger les résultats partiels '%s' : %s", path, e)
results = []
return path, results
def _save_partial_line(partial_path: Path, doc_result: DocumentResult) -> None:
"""Ajoute une entrée NDJSON au fichier de résultats partiels (thread-safe)."""
try:
line = json.dumps(doc_result.as_dict(), ensure_ascii=False) + "\n"
with _partial_write_lock:
with partial_path.open("a", encoding="utf-8") as fh:
fh.write(line)
except Exception as e:
logger.warning("Impossible d'écrire dans le fichier partiel '%s' : %s", partial_path, e)
def _delete_partial(partial_path: Path) -> None:
"""Supprime le fichier de résultats partiels à la fin d'un moteur."""
try:
if partial_path.exists():
partial_path.unlink()
except Exception as e:
logger.warning("Impossible de supprimer le fichier partiel '%s' : %s", partial_path, e)
__all__ = [
"_delete_partial",
"_load_partial",
"_partial_path",
"_partial_write_lock",
"_sanitize_filename",
"_save_partial_line",
]