Spaces:
Sleeping
Sleeping
File size: 20,719 Bytes
a705e16 de9192c a705e16 ff7895c de9192c a705e16 5e48c0b a705e16 de9192c 5e48c0b a705e16 5112943 5e48c0b a705e16 ff7895c a705e16 ff7895c a705e16 ff7895c a705e16 ff7895c a705e16 de9192c a705e16 5112943 a705e16 5e48c0b a705e16 5112943 a705e16 5e48c0b a705e16 5112943 a705e16 5112943 a705e16 5e48c0b a705e16 5112943 a705e16 5112943 a705e16 5e48c0b a705e16 5112943 a705e16 5112943 a705e16 5112943 a705e16 5112943 a705e16 5e48c0b a705e16 5112943 a705e16 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 
415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 | """Sprint D.2.b β reprise sur interruption (``partial_dir``) dans
``run_benchmark_via_service``.
Couvre :
- Helpers ``picarones.app.services.partial_store`` (chemin,
sérialisation NDJSON, tolérance aux lignes corrompues).
- Comportement bout-en-bout de ``run_benchmark_via_service`` quand
``partial_dir`` est fourni :
reprise depuis un partial existant, suppression à la fin d'un
engine traité avec succès, isolation per-engine.
"""
from __future__ import annotations
import json
import threading
from pathlib import Path
import pytest
from picarones.adapters.ocr.base import BaseOCRAdapter
from picarones.domain.artifacts import Artifact, ArtifactType
from picarones.app.services.partial_store import (
_delete_partial,
_load_partial,
_partial_path,
_sanitize_filename,
_save_partial_line,
partial_path_for_engine,
)
from picarones.app.services.benchmark_runner import (
_engine_config_for_fingerprint,
)
from tests._migration_helpers import run_via_orchestrator
def _partial_path_for_run(corpus, engine, partial_dir):
    """Return the partial-file path expected for ``engine`` on ``corpus``.

    Test helper. Per the original note, it builds the path with the
    fingerprint inputs the runner uses by default: no normalization
    profile, no ``char_exclude`` and the ``standard`` profile. Since
    Phase 2.3 of the post-rewrite work the partial key includes a
    fingerprint, so runs with different configurations do not reuse each
    other's partial files by accident.
    """
    import importlib

    try:
        picarones_pkg = importlib.import_module("picarones")
        code_version = picarones_pkg.__version__
    except (ImportError, AttributeError):
        # Package not importable or exposes no version: use a fixed marker.
        code_version = "unknown"

    default_run_settings = {
        "normalization_profile": None,
        "char_exclude": None,
        "profile": "standard",
        "code_version": code_version,
    }
    return partial_path_for_engine(
        corpus=corpus,
        engine=engine,
        partial_dir=partial_dir,
        engine_config=_engine_config_for_fingerprint(engine),
        **default_run_settings,
    )
from picarones.evaluation.benchmark_result import DocumentResult
from picarones.evaluation.corpus import Corpus, Document
from picarones.evaluation.metric_result import MetricsResult
# ──────────────────────────────────────────────────────────────────────
# Mocks
# ──────────────────────────────────────────────────────────────────────
class _MockOCR(BaseOCRAdapter):
    """Minimal adapter used as a stand-in OCR engine in these tests.

    For convenience with the legacy test pattern, a test may assign
    ``ocr._run_ocr = lambda p: "..."`` after construction to customise
    the produced text; ``execute()`` calls that override when present.
    Without an override the text is ``"ocr text"``.
    """

    def __init__(self, name: str = "mock_ocr") -> None:
        self._name = name

    @property
    def name(self) -> str:
        """Engine name given at construction."""
        return self._name

    def execute(self, inputs, params, context):
        """Write the mock text to the workspace and return a RAW_TEXT artifact.

        ``inputs`` and ``params`` are accepted but not used. The output
        file is ``<workspace_uri>/<document_id>_mock.txt``.
        """
        workspace = Path(context.workspace_uri)
        workspace.mkdir(parents=True, exist_ok=True)
        target = workspace / f"{context.document_id}_mock.txt"

        # Optional per-test hook; it receives the output path.
        override = getattr(self, "_run_ocr", None)
        text = override(target) if callable(override) else "ocr text"
        target.write_text(text, encoding="utf-8")

        raw_text = Artifact(
            id=f"{context.document_id}:{self._name}:raw_text",
            document_id=context.document_id,
            type=ArtifactType.RAW_TEXT,
            produced_by_step="ocr",
            uri=str(target),
        )
        return {ArtifactType.RAW_TEXT: raw_text}
def _make_doc_result(doc_id: str, hyp: str = "h", cer: float = 0.1) -> DocumentResult:
    """Build a small ``DocumentResult`` fixture for ``doc_id``.

    All seven error-rate fields of the metrics receive the same value
    ``cer``; both length fields are 1. The ground truth is ``"g"``, the
    image path is ``/tmp/<doc_id>.png`` and the duration is 0.5 s.
    """
    rate_fields = (
        "cer",
        "cer_nfc",
        "cer_caseless",
        "wer",
        "wer_normalized",
        "mer",
        "wil",
    )
    metrics = MetricsResult(
        **dict.fromkeys(rate_fields, cer),
        reference_length=1,
        hypothesis_length=1,
    )
    return DocumentResult(
        doc_id=doc_id,
        image_path=f"/tmp/{doc_id}.png",
        ground_truth="g",
        hypothesis=hyp,
        metrics=metrics,
        duration_seconds=0.5,
    )
# ──────────────────────────────────────────────────────────────────────
# 1. Helpers partial_store
# ──────────────────────────────────────────────────────────────────────
class TestSanitizeFilename:
    """Expectations on ``_sanitize_filename``."""

    def test_keeps_word_chars_and_dash(self) -> None:
        # Word characters, digits, dash and underscore pass through untouched.
        already_safe = "abc-123_def"
        assert _sanitize_filename(already_safe) == already_safe

    def test_replaces_special_chars(self) -> None:
        # Slash, colon and space each become an underscore.
        sanitized = _sanitize_filename("a/b:c d")
        assert sanitized == "a_b_c_d"

    def test_truncates_to_64_chars(self) -> None:
        oversized = "a" * 100
        sanitized = _sanitize_filename(oversized)
        assert len(sanitized) == 64
        assert sanitized == "a" * 64
class TestPartialPath:
    """Expectations on ``_partial_path``."""

    def test_uses_partial_dir(self, tmp_path: Path) -> None:
        partial = _partial_path("corpus_x", "engine_y", tmp_path)
        assert partial.parent == tmp_path
        for fragment in ("corpus_x", "engine_y"):
            assert fragment in partial.name
        assert partial.suffix == ".jsonl"

    def test_sanitizes_names_in_path(self, tmp_path: Path) -> None:
        # Separators may appear in the directory part (tmp_path) but
        # must not survive in the file name itself.
        filename = _partial_path("c/orpus", "engine:a", tmp_path).name
        for forbidden in ("/", ":"):
            assert forbidden not in filename

    def test_none_partial_dir_falls_back_to_tempdir(self) -> None:
        import tempfile

        system_tmp = Path(tempfile.gettempdir())
        assert _partial_path("c", "e", None).parent == system_tmp
class TestSaveAndLoad:
    """Round-trip behaviour of ``_save_partial_line`` and ``_load_partial``."""

    def test_round_trip_single_result(self, tmp_path: Path) -> None:
        target = tmp_path / "r.jsonl"
        _save_partial_line(target, _make_doc_result("doc1", hyp="hello", cer=0.05))

        restored = _load_partial(target)

        assert len(restored) == 1
        only = restored[0]
        assert only.doc_id == "doc1"
        assert only.hypothesis == "hello"
        assert only.metrics.cer == pytest.approx(0.05)

    def test_round_trip_preserves_optional_fields(self, tmp_path: Path) -> None:
        target = tmp_path / "r.jsonl"
        expected_metadata = {"mode": "post_correction_texte"}
        result = _make_doc_result("doc1")
        result.ocr_intermediate = "intermediate"
        result.pipeline_metadata = {"mode": "post_correction_texte"}
        _save_partial_line(target, result)

        first = _load_partial(target)[0]

        assert first.ocr_intermediate == "intermediate"
        assert first.pipeline_metadata == expected_metadata

    def test_appends_multiple_results(self, tmp_path: Path) -> None:
        target = tmp_path / "r.jsonl"
        expected_ids = [f"doc{i}" for i in range(3)]
        for doc_id in expected_ids:
            _save_partial_line(target, _make_doc_result(doc_id))

        restored_ids = [d.doc_id for d in _load_partial(target)]

        assert restored_ids == expected_ids

    def test_empty_file_returns_empty_list(self, tmp_path: Path) -> None:
        target = tmp_path / "empty.jsonl"
        target.write_text("", encoding="utf-8")
        assert _load_partial(target) == []

    def test_missing_file_returns_empty_list(self, tmp_path: Path) -> None:
        assert _load_partial(tmp_path / "nope.jsonl") == []

    def test_corrupted_line_is_skipped(
        self, tmp_path: Path, caplog: pytest.LogCaptureFixture,
    ) -> None:
        target = tmp_path / "r.jsonl"
        # Layout: one valid line, one corrupted line, one valid line.
        _save_partial_line(target, _make_doc_result("doc0"))
        with target.open("a", encoding="utf-8") as handle:
            handle.write("not valid json\n")
        _save_partial_line(target, _make_doc_result("doc2"))

        # Records are captured at WARNING level; only the loaded
        # documents are asserted on.
        with caplog.at_level("WARNING"):
            restored = _load_partial(target)

        assert [d.doc_id for d in restored] == ["doc0", "doc2"]

    def test_save_creates_parent_directory(self, tmp_path: Path) -> None:
        nested = tmp_path / "subdir" / "r.jsonl"
        _save_partial_line(nested, _make_doc_result("doc0"))
        assert nested.exists()

    def test_concurrent_writes_are_safe(self, tmp_path: Path) -> None:
        """Appends from several threads must never leave a truncated line.

        The original note attributes this guarantee to a module-level
        lock in the store that serialises appends.
        """
        target = tmp_path / "concurrent.jsonl"
        n_threads, per_thread = 8, 10
        expected_total = n_threads * per_thread

        def writer(tid: int) -> None:
            for i in range(per_thread):
                _save_partial_line(target, _make_doc_result(f"t{tid}_d{i}"))

        workers = [
            threading.Thread(target=writer, args=(tid,))
            for tid in range(n_threads)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        restored = _load_partial(target)
        assert len(restored) == expected_total
        # Every doc_id is unique and well formed.
        assert len({d.doc_id for d in restored}) == expected_total
class TestDelete:
    """Expectations on ``_delete_partial``."""

    def test_delete_existing_file(self, tmp_path: Path) -> None:
        victim = tmp_path / "r.jsonl"
        victim.write_text("x\n", encoding="utf-8")

        _delete_partial(victim)

        assert not victim.exists()

    def test_delete_missing_file_is_noop(self, tmp_path: Path) -> None:
        # The test passes as long as this call does not raise.
        _delete_partial(tmp_path / "nope.jsonl")
# ──────────────────────────────────────────────────────────────────────
# 2. Resume bout-en-bout dans run_benchmark_via_service
# ──────────────────────────────────────────────────────────────────────
class TestResumeViaPartialDir:
    """Sprint D.2.b: resume behaviour when ``partial_dir`` is supplied.

    Per the original notes, ``run_benchmark_via_service`` is expected to
    resume from an existing partial, if any, and to persist each
    ``DocumentResult`` as it is produced. The tests here drive runs
    through ``run_via_orchestrator``.

    The skip reasons and failure messages are kept in French; their
    mis-encoded characters have been restored so pytest output is
    readable.
    """

    def _make_corpus(self, tmp_path: Path, n: int = 3) -> Corpus:
        """Create ``n`` one-byte image files and wrap them in a ``Corpus``."""
        docs = []
        for i in range(n):
            img = tmp_path / f"doc{i}.png"
            img.write_bytes(b"x")
            docs.append(Document(
                image_path=img,
                ground_truth=f"gt {i}",
                doc_id=f"doc{i}",
            ))
        return Corpus(name="resume_test", documents=docs)

    def test_fresh_run_deletes_partial_on_success(self, tmp_path: Path) -> None:
        partial_dir = tmp_path / "partials"
        corpus = self._make_corpus(tmp_path, n=2)
        ocr = _MockOCR(name="resumable")
        ocr._run_ocr = lambda p: "match"

        bm = run_via_orchestrator(
            corpus, [ocr], partial_dir=partial_dir,
        )

        assert bm.document_count == 2
        # No partial file remains for this engine after a successful run.
        partial_path = _partial_path_for_run(corpus, ocr, partial_dir)
        assert not partial_path.exists()

    @pytest.mark.skip(reason=(
        "Phase B4 migration Option B (2026-05) : ce test pré-écrit un "
        "partial au format legacy ``partial_store._save_partial_line`` "
        "(sérialise DocumentResult) qui n'est pas compatible avec le "
        "format pipeline-pivoted de ``_orchestrator_partial.py`` "
        "(sérialise PipelineResult). La sémantique resume du "
        "RunOrchestrator est couverte par TestParityPartialDir dans "
        "tests/app/services/test_run_orchestrator_feature_parity.py. "
        "Retrait définitif prévu Phase B8."
    ))
    def test_resume_skips_already_done_docs(self, tmp_path: Path) -> None:
        """A doc already present in the partial is not recomputed.

        If the partial holds doc0, the run must not invoke the engine
        for doc0 again; it takes the stored result as is.
        """
        partial_dir = tmp_path / "partials"
        partial_dir.mkdir()
        corpus = self._make_corpus(tmp_path, n=3)
        ocr = _MockOCR(name="resumable2")

        # Count how many times the engine is invoked.
        call_count = {"n": 0}

        def counting_ocr(p):
            call_count["n"] += 1
            return "match"

        ocr._run_ocr = counting_ocr

        # Pre-write a partial for doc0 with a made-up CER of 0.99 so the
        # test can tell the stored value from a fresh execution.
        partial_path = _partial_path_for_run(corpus, ocr, partial_dir)
        pre_existing = _make_doc_result("doc0", hyp="from_partial", cer=0.99)
        _save_partial_line(partial_path, pre_existing)

        bm = run_via_orchestrator(
            corpus, [ocr], partial_dir=partial_dir,
        )

        # The engine ran only for doc1 and doc2, not doc0.
        assert call_count["n"] == 2
        # The final result holds all 3 docs, doc0 coming from the
        # partial (CER 0.99).
        report = bm.engine_reports[0]
        assert len(report.document_results) == 3
        doc0_result = next(d for d in report.document_results if d.doc_id == "doc0")
        assert doc0_result.hypothesis == "from_partial"
        assert doc0_result.metrics.cer == pytest.approx(0.99)

    @pytest.mark.skip(reason=(
        "Phase B4 migration — partial pré-écrit au format legacy "
        "incompatible avec _orchestrator_partial. Couvert par "
        "TestParityPartialDir."
    ))
    def test_all_docs_already_done_skips_engine_entirely(
        self, tmp_path: Path,
    ) -> None:
        partial_dir = tmp_path / "partials"
        partial_dir.mkdir()
        corpus = self._make_corpus(tmp_path, n=2)
        ocr = _MockOCR(name="alldone")
        # Any engine invocation fails the test immediately.
        ocr._run_ocr = lambda p: pytest.fail(
            "Engine ne devrait pas être appelé — tout est dans le partial.",
        )

        partial_path = _partial_path_for_run(corpus, ocr, partial_dir)
        for i in range(2):
            _save_partial_line(
                partial_path, _make_doc_result(f"doc{i}", hyp=f"prefilled{i}"),
            )

        bm = run_via_orchestrator(
            corpus, [ocr], partial_dir=partial_dir,
        )

        report = bm.engine_reports[0]
        assert len(report.document_results) == 2
        # The original corpus order is preserved.
        assert [d.doc_id for d in report.document_results] == ["doc0", "doc1"]
        assert [d.hypothesis for d in report.document_results] == [
            "prefilled0", "prefilled1",
        ]

    @pytest.mark.skip(reason=(
        "Phase B4 migration — isolation per-engine du legacy partial_store, "
        "format incompatible. L'isolation per-pipeline du RunOrchestrator "
        "est testée via TestParityPartialDir."
    ))
    def test_per_engine_isolation(self, tmp_path: Path) -> None:
        """Each engine has its own partial file.

        A partial written for engine_a must not leak into engine_b.
        """
        partial_dir = tmp_path / "partials"
        partial_dir.mkdir()
        corpus = self._make_corpus(tmp_path, n=2)

        ocr_a = _MockOCR(name="engine_a")
        ocr_a._run_ocr = lambda p: "from_a"
        ocr_b = _MockOCR(name="engine_b")
        ocr_b._run_ocr = lambda p: "from_b"

        # Pre-fill only engine_a's partial, and only for doc0.
        partial_a = _partial_path_for_run(corpus, ocr_a, partial_dir)
        _save_partial_line(
            partial_a, _make_doc_result("doc0", hyp="A_pre"),
        )

        bm = run_via_orchestrator(
            corpus, [ocr_a, ocr_b], partial_dir=partial_dir,
        )

        report_a = next(r for r in bm.engine_reports if r.engine_name == "engine_a")
        report_b = next(r for r in bm.engine_reports if r.engine_name == "engine_b")
        # engine_a: doc0 comes from the partial, doc1 is computed.
        a_doc0 = next(d for d in report_a.document_results if d.doc_id == "doc0")
        assert a_doc0.hypothesis == "A_pre"
        # engine_b: doc0 is computed ("from_b"); there is no partial for B.
        b_doc0 = next(d for d in report_b.document_results if d.doc_id == "doc0")
        assert b_doc0.hypothesis == "from_b"

    def test_partial_files_removed_on_success(self, tmp_path: Path) -> None:
        partial_dir = tmp_path / "partials"
        corpus = self._make_corpus(tmp_path, n=2)
        engines = [_MockOCR(name=f"e{i}") for i in range(3)]
        for e in engines:
            e._run_ocr = lambda p: "match"

        run_via_orchestrator(
            corpus, engines, partial_dir=partial_dir,
        )

        # No partial file survives a successful run.
        leftovers = list(partial_dir.glob("*.partial.jsonl"))
        assert leftovers == [], f"partials résiduels : {leftovers}"

    def test_no_partial_dir_keeps_unified_path(self, tmp_path: Path) -> None:
        """Without ``partial_dir`` no partial files are created.

        The original note describes this as the code keeping its fast
        unified path.
        """
        corpus = self._make_corpus(tmp_path, n=2)
        ocr = _MockOCR(name="no_partial")
        ocr._run_ocr = lambda p: "match"

        bm = run_via_orchestrator(corpus, [ocr])

        assert bm.document_count == 2
        # No .partial.jsonl appears anywhere under tmp_path, since the
        # unified path does not write partials.
        leftovers = list(tmp_path.rglob("*.partial.jsonl"))
        assert leftovers == []

    @pytest.mark.skip(reason=(
        "Phase B4 migration — partial pré-écrit au format legacy. "
        "Couvert par TestParityPartialDir.test_partial_dir_fingerprint_isolation."
    ))
    def test_partial_persists_when_engine_was_not_finished(
        self, tmp_path: Path,
    ) -> None:
        """An unfinished engine's partial survives a cancelled run.

        Scenario from the original note: the run succeeded for engine_a
        (partial removed) but only 1 of 2 docs is in engine_b's partial
        before the cancel; engine_b's partial must remain for resume.
        """
        partial_dir = tmp_path / "partials"
        partial_dir.mkdir()
        corpus = self._make_corpus(tmp_path, n=2)

        # Simulate a post-crash state: engine_b has a partial with doc0
        # but not doc1, and cancel_event is set before the next engine.
        ocr_b = _MockOCR(name="incomplete_b")
        partial_b = _partial_path_for_run(corpus, ocr_b, partial_dir)
        _save_partial_line(
            partial_b, _make_doc_result("doc0", hyp="B0_pre"),
        )

        # cancel_event is already set, so the engine loop is never
        # entered and no document is processed during this run.
        cancel = threading.Event()
        cancel.set()

        bm = run_via_orchestrator(
            corpus, [ocr_b],
            partial_dir=partial_dir,
            cancel_event=cancel,
        )

        # No engine was processed (cancelled before the first engine).
        assert bm.engine_reports == []
        # engine_b's partial is kept for the next execution.
        assert partial_b.exists()
# ──────────────────────────────────────────────────────────────────────
# 3. Sérialisation NDJSON cross-process
# ──────────────────────────────────────────────────────────────────────
class TestNDJSONFormat:
    """Checks on the NDJSON layout (one JSON line per document).

    Per the original note, this layout is what makes resume robust: a
    crash in the middle of a write truncates one line at worst, and
    every complete line stays readable.
    """

    def test_one_json_per_line(self, tmp_path: Path) -> None:
        path = tmp_path / "r.jsonl"
        _save_partial_line(path, _make_doc_result("doc0"))
        _save_partial_line(path, _make_doc_result("doc1"))

        lines = path.read_text(encoding="utf-8").splitlines()

        assert len(lines) == 2
        # Each line must parse as JSON on its own.
        for line in lines:
            payload = json.loads(line)
            assert "doc_id" in payload
            assert "metrics" in payload

    def test_unicode_preserved_in_hypothesis(self, tmp_path: Path) -> None:
        # Accented French letters, a ligature and an em dash. The
        # literal was mis-encoded in the source and is restored here;
        # one variable keeps the written and expected values identical.
        accented = "Église — œ ç à é"
        path = tmp_path / "r.jsonl"
        dr = _make_doc_result("doc1")
        dr.hypothesis = accented
        _save_partial_line(path, dr)

        loaded = _load_partial(path)

        assert loaded[0].hypothesis == accented
|