Picarones / tests /app /test_run_orchestrator.py
Claude
feat(adapters/llm): Sprint A14-S44 β€” BaseLLMAdapter implΓ©mente StepExecutor
dd0db4e unverified
Raw
History Blame
19.7 kB
"""Tests unitaires de :class:`RunOrchestrator` (couche ``app/services/``).
Le ``RunOrchestrator`` est testΓ© ici **directement** (sans passer par
la CLI Click). Les tests ``tests/cli/test_sprint_a14_s24_run_command.py``
le testent indirectement via le wrapper Click β€” c'est complΓ©mentaire
mais pas suffisant pour vΓ©rifier le contrat du service.
Couverture
----------
- ``execute()`` retourne un :class:`OrchestrationResult` complet
(run_result, extracted_corpus_dir, persisted_files, report_path).
- ``report_renderer=None`` ne gΓ©nΓ¨re aucun rapport, mΓͺme si
``spec.report_html`` est renseignΓ©.
- ``report_renderer=callable`` SANS ``spec.report_html`` ne génère
rien (l'orchestrateur ne dΓ©cide pas seul d'un chemin).
- ``report_renderer=callable`` ET ``spec.report_html`` β†’ invocation
du renderer avec le ``RunResult``, ``output_path`` et ``lang``.
- Le corpus chargΓ© est sandboxΓ© sous l'``output_dir`` du caller.
- Les 3 fichiers persistΓ©s sont Γ©crits dans ``output_dir/results/``.
- Une ``CorpusImportError`` (corpus invalide) propage proprement.
- Une ``RunSpecLoadError`` (adapter dotted-path inconnu) propage
proprement.
- Le helper ``_default_gt_factory`` traite ``CORRECTED_TEXT`` comme
comparable Γ  la GT ``RAW_TEXT`` (les deux sont du texte plat).
- Le helper ``_default_inputs_factory`` lève quand ``image_uri`` est
absent.
- Le ``_filesystem_payload_loader`` lit RAW_TEXT/CORRECTED_TEXT/
ALTO_XML, lève sur type non géré ou URI absent.
- Disambiguation ``_build_pipelines`` : 2 pipelines avec la mΓͺme
classe d'adapter mais des kwargs distincts β†’ 2 instances
distinctes (cas ``PrecomputedTextAdapter`` Γ— ``source_label``).
"""
from __future__ import annotations
import io
import textwrap
import zipfile
from pathlib import Path
import pytest
from picarones.app.results import RunResult
from picarones.app.schemas import load_run_spec_from_yaml
from picarones.app.services import (
CorpusImportError,
OrchestrationResult,
RunOrchestrator,
)
from picarones.app.services.run_orchestrator import (
_default_gt_factory,
_default_inputs_factory,
_filesystem_payload_loader,
_kwargs_signature,
_make_context_factory,
)
from picarones.app.schemas.run_spec import RunSpecLoadError
from picarones.domain.artifacts import Artifact, ArtifactType
from picarones.domain.documents import DocumentRef, GroundTruthRef
# ──────────────────────────────────────────────────────────────────
# Helpers communs
# ──────────────────────────────────────────────────────────────────
def _png_bytes() -> bytes:
return (
b"\x89PNG\r\n\x1a\n"
b"\x00\x00\x00\rIHDR"
b"\x00\x00\x00\x01\x00\x00\x00\x01\x08\x06\x00\x00\x00"
b"\x1f\x15\xc4\x89"
)
def _make_corpus_zip(n_docs: int = 2) -> bytes:
buf = io.BytesIO()
with zipfile.ZipFile(buf, mode="w") as zf:
for i in range(1, n_docs + 1):
doc_id = f"doc{i:02d}"
zf.writestr(f"{doc_id}.png", _png_bytes())
zf.writestr(f"{doc_id}.gt.txt", "Bonjour le monde")
# Source prΓ©-calculΓ©e pour PrecomputedTextAdapter.
zf.writestr(f"{doc_id}.tess.txt", "Bonjour le monde")
return buf.getvalue()
def _build_spec_yaml(
*,
corpus_zip: Path,
output_dir: Path,
report_html: str | None = None,
) -> str:
base = textwrap.dedent(f"""
corpus_zip: {corpus_zip}
corpus_name: orchestrator_test
pipelines:
- name: tess_only
initial_inputs: [image]
steps:
- id: ocr
adapter_class: picarones.adapters.ocr.precomputed.PrecomputedTextAdapter
adapter_kwargs:
source_label: tess
input_types: [image]
output_types: [raw_text]
views: [text_final]
output_dir: {output_dir}
code_version: "1.0.0-orch-test"
""")
if report_html is not None:
base += f"report_html: {report_html}\n"
return base
# ──────────────────────────────────────────────────────────────────
# Cycle de vie ``execute()``
# ──────────────────────────────────────────────────────────────────
def _stub_renderer_called(records: list) -> "callable":
"""CrΓ©e un renderer qui enregistre ses appels et Γ©crit un fichier
minimal. Utile pour vΓ©rifier l'invocation sans dΓ©pendre de
``HtmlReportRenderer``."""
def _render(result: RunResult, output_path: Path, lang: str) -> Path:
records.append({"corpus": result.manifest.corpus_name, "lang": lang})
output_path.write_text(f"stub:{lang}", encoding="utf-8")
return output_path
return _render
class TestExecuteHappyPath:
def test_returns_orchestration_result_complete(
self, tmp_path: Path,
) -> None:
corpus_zip = tmp_path / "c.zip"
corpus_zip.write_bytes(_make_corpus_zip(n_docs=2))
out_dir = tmp_path / "out"
spec = load_run_spec_from_yaml(
_build_spec_yaml(corpus_zip=corpus_zip, output_dir=out_dir),
)
orchestrator = RunOrchestrator(out_dir)
result = orchestrator.execute(spec)
assert isinstance(result, OrchestrationResult)
assert isinstance(result.run_result, RunResult)
assert result.run_result.n_documents == 2
assert result.run_result.manifest.corpus_name == "orchestrator_test"
# Corpus extrait sous le workspace. ``.resolve()`` normalise
# cross-OS (macOS rΓ©sout ``/var/folders/...`` β†’
# ``/private/var/folders/...``).
assert result.extracted_corpus_dir.exists()
assert result.extracted_corpus_dir.resolve().is_relative_to(
out_dir.resolve(),
)
# S41 β€” 4 fichiers persistΓ©s (artifacts_index sΓ©parΓ©).
assert set(result.persisted_files) == {
"manifest", "pipeline_results", "artifacts_index", "view_results",
}
for path in result.persisted_files.values():
assert path.exists()
assert path.resolve().is_relative_to(out_dir.resolve())
# Pas de rapport car aucun renderer fourni.
assert result.report_path is None
def test_persisted_files_under_results_subdir(
self, tmp_path: Path,
) -> None:
corpus_zip = tmp_path / "c.zip"
corpus_zip.write_bytes(_make_corpus_zip())
out_dir = tmp_path / "out"
spec = load_run_spec_from_yaml(
_build_spec_yaml(corpus_zip=corpus_zip, output_dir=out_dir),
)
result = RunOrchestrator(out_dir).execute(spec)
expected_parent = (out_dir / "results").resolve()
for path in result.persisted_files.values():
assert path.parent.resolve() == expected_parent
class TestReportRendererInjection:
def test_no_renderer_skips_report_even_with_spec_path(
self, tmp_path: Path,
) -> None:
corpus_zip = tmp_path / "c.zip"
corpus_zip.write_bytes(_make_corpus_zip())
out_dir = tmp_path / "out"
report_path = out_dir / "rapport.html"
spec = load_run_spec_from_yaml(_build_spec_yaml(
corpus_zip=corpus_zip,
output_dir=out_dir,
report_html=str(report_path),
))
result = RunOrchestrator(out_dir).execute(spec, report_renderer=None)
assert result.report_path is None
assert not report_path.exists()
def test_renderer_without_spec_path_skips(
self, tmp_path: Path,
) -> None:
corpus_zip = tmp_path / "c.zip"
corpus_zip.write_bytes(_make_corpus_zip())
out_dir = tmp_path / "out"
spec = load_run_spec_from_yaml(_build_spec_yaml(
corpus_zip=corpus_zip,
output_dir=out_dir,
report_html=None,
))
records: list[dict] = []
result = RunOrchestrator(out_dir).execute(
spec, report_renderer=_stub_renderer_called(records),
)
assert result.report_path is None
assert records == [] # renderer pas invoquΓ©
def test_renderer_invoked_when_both_present(
self, tmp_path: Path,
) -> None:
corpus_zip = tmp_path / "c.zip"
corpus_zip.write_bytes(_make_corpus_zip())
out_dir = tmp_path / "out"
report_path = out_dir / "rapport.html"
spec = load_run_spec_from_yaml(_build_spec_yaml(
corpus_zip=corpus_zip,
output_dir=out_dir,
report_html=str(report_path),
))
records: list[dict] = []
result = RunOrchestrator(out_dir).execute(
spec, report_renderer=_stub_renderer_called(records),
)
assert result.report_path == report_path
assert report_path.exists()
assert report_path.read_text(encoding="utf-8").startswith("stub:")
assert records == [
{"corpus": "orchestrator_test", "lang": "fr"},
]
# ──────────────────────────────────────────────────────────────────
# Erreurs typΓ©es propagΓ©es
# ──────────────────────────────────────────────────────────────────
class TestErrorPropagation:
def test_corpus_dir_inexistant_raises(self, tmp_path: Path) -> None:
out_dir = tmp_path / "out"
spec = load_run_spec_from_yaml(textwrap.dedent(f"""
corpus_dir: {tmp_path / "does_not_exist"}
pipelines:
- name: p
initial_inputs: [image]
steps:
- id: ocr
adapter_class: picarones.adapters.ocr.precomputed.PrecomputedTextAdapter
adapter_kwargs:
source_label: tess
input_types: [image]
output_types: [raw_text]
views: [text_final]
output_dir: {out_dir}
"""))
with pytest.raises(CorpusImportError, match="n'est pas un rΓ©pertoire"):
RunOrchestrator(out_dir).execute(spec)
def test_unknown_adapter_class_raises(self, tmp_path: Path) -> None:
corpus_zip = tmp_path / "c.zip"
corpus_zip.write_bytes(_make_corpus_zip())
out_dir = tmp_path / "out"
spec = load_run_spec_from_yaml(textwrap.dedent(f"""
corpus_zip: {corpus_zip}
pipelines:
- name: p
initial_inputs: [image]
steps:
- id: ocr
adapter_class: tests.does_not_exist.Nope
input_types: [image]
output_types: [raw_text]
views: [text_final]
output_dir: {out_dir}
"""))
with pytest.raises(RunSpecLoadError, match="introuvable"):
RunOrchestrator(out_dir).execute(spec)
# ──────────────────────────────────────────────────────────────────
# Disambiguation des adapters
# ──────────────────────────────────────────────────────────────────
class TestPipelineDisambiguation:
def test_same_class_different_kwargs_yields_distinct_instances(
self, tmp_path: Path,
) -> None:
"""Cas BnF : 2 pipelines utilisent ``PrecomputedTextAdapter``
mais avec ``source_label`` diffΓ©rents β†’ ils doivent recevoir
des instances distinctes (sinon le 2Γ¨me lirait les fichiers
du 1er)."""
# Corpus avec 2 sources prΓ©-calculΓ©es diffΓ©rentes.
buf = io.BytesIO()
with zipfile.ZipFile(buf, mode="w") as zf:
zf.writestr("doc01.png", _png_bytes())
zf.writestr("doc01.gt.txt", "Bonjour")
zf.writestr("doc01.tess.txt", "Bonjour") # source 1
zf.writestr("doc01.gpt4v.txt", "Bonjur") # source 2 (1 erreur)
corpus_zip = tmp_path / "c.zip"
corpus_zip.write_bytes(buf.getvalue())
out_dir = tmp_path / "out"
spec = load_run_spec_from_yaml(textwrap.dedent(f"""
corpus_zip: {corpus_zip}
pipelines:
- name: tess
initial_inputs: [image]
steps:
- id: ocr
adapter_class: picarones.adapters.ocr.precomputed.PrecomputedTextAdapter
adapter_kwargs:
source_label: tess
input_types: [image]
output_types: [raw_text]
- name: gpt
initial_inputs: [image]
steps:
- id: ocr
adapter_class: picarones.adapters.ocr.precomputed.PrecomputedTextAdapter
adapter_kwargs:
source_label: gpt4v
input_types: [image]
output_types: [raw_text]
views: [text_final]
output_dir: {out_dir}
"""))
result = RunOrchestrator(out_dir).execute(spec)
# 1 doc Γ— 2 pipelines = 2 ViewResult. Ils doivent avoir des
# candidate_artifact_id distincts (preuves d'instances distinctes).
view_results = result.run_result.view_results_for("text_final")
owners = {
"tess" if "precomputed_tess" in vr.candidate_artifact_id and "tess:" in vr.candidate_artifact_id
else "gpt" if "precomputed_gpt4v" in vr.candidate_artifact_id else "?"
for vr in view_results
}
# Au moins 2 owners distincts.
assert len(owners) >= 2
# ──────────────────────────────────────────────────────────────────
# Helpers privΓ©s (importΓ©s directement pour couverture explicite)
# ──────────────────────────────────────────────────────────────────
class TestDefaultGtFactory:
def test_returns_artifact_for_present_gt(self) -> None:
doc = DocumentRef(
id="doc01",
ground_truths=(
GroundTruthRef(type=ArtifactType.RAW_TEXT, uri="/path/gt.txt"),
),
)
gt = _default_gt_factory(doc, ArtifactType.RAW_TEXT)
assert gt is not None
assert gt.type == ArtifactType.RAW_TEXT
assert gt.uri == "/path/gt.txt"
def test_corrected_text_falls_back_to_raw_text_gt(self) -> None:
"""Convention : un candidat CORRECTED_TEXT est comparΓ© contre
la GT RAW_TEXT (les deux sont du texte plat)."""
doc = DocumentRef(
id="doc01",
ground_truths=(
GroundTruthRef(type=ArtifactType.RAW_TEXT, uri="/path/gt.txt"),
),
)
gt = _default_gt_factory(doc, ArtifactType.CORRECTED_TEXT)
assert gt is not None
assert gt.type == ArtifactType.RAW_TEXT # fallback explicite
def test_returns_none_when_gt_absent(self) -> None:
doc = DocumentRef(id="doc01", ground_truths=())
gt = _default_gt_factory(doc, ArtifactType.RAW_TEXT)
assert gt is None
class TestDefaultInputsFactory:
def test_returns_image_artifact(self) -> None:
doc = DocumentRef(id="doc01", image_uri="/path/img.png")
inputs = _default_inputs_factory(doc)
assert ArtifactType.IMAGE in inputs
assert inputs[ArtifactType.IMAGE].uri == "/path/img.png"
def test_raises_when_image_uri_absent(self) -> None:
doc = DocumentRef(id="doc01")
with pytest.raises(CorpusImportError, match="sans ``image_uri``"):
_default_inputs_factory(doc)
class TestContextFactory:
def test_factory_propagates_code_version(self) -> None:
factory = _make_context_factory("1.2.3")
doc = DocumentRef(id="doc01", image_uri="/x")
ctx = factory(doc, "my_pipeline")
assert ctx.document_id == "doc01"
assert ctx.code_version == "1.2.3"
assert ctx.pipeline_name == "my_pipeline"
class TestFilesystemPayloadLoader:
def test_loads_raw_text(self, tmp_path: Path) -> None:
path = tmp_path / "t.txt"
path.write_text("Hello", encoding="utf-8")
art = Artifact(
id="d:t", document_id="d", type=ArtifactType.RAW_TEXT, uri=str(path),
)
assert _filesystem_payload_loader(art) == "Hello"
def test_loads_corrected_text(self, tmp_path: Path) -> None:
path = tmp_path / "c.txt"
path.write_text("Bonjour", encoding="utf-8")
art = Artifact(
id="d:c", document_id="d", type=ArtifactType.CORRECTED_TEXT,
uri=str(path),
)
assert _filesystem_payload_loader(art) == "Bonjour"
def test_loads_alto_xml(self, tmp_path: Path) -> None:
from picarones.formats.alto.types import (
AltoBBox, AltoDocument, AltoLine, AltoPage, AltoString,
AltoTextBlock,
)
from picarones.formats.alto.writer import write_alto
doc = AltoDocument(pages=(AltoPage(blocks=(AltoTextBlock(lines=(AltoLine(strings=(
AltoString(content="Hi", bbox=AltoBBox(hpos=0, vpos=0, width=10, height=10)),
),),),),),),))
path = tmp_path / "a.xml"
path.write_bytes(write_alto(doc))
art = Artifact(
id="d:a", document_id="d", type=ArtifactType.ALTO_XML, uri=str(path),
)
loaded = _filesystem_payload_loader(art)
assert loaded.pages[0].blocks[0].lines[0].strings[0].content == "Hi"
def test_raises_on_missing_uri(self) -> None:
art = Artifact(
id="d:x", document_id="d", type=ArtifactType.RAW_TEXT,
)
with pytest.raises(FileNotFoundError, match="sans URI"):
_filesystem_payload_loader(art)
def test_raises_on_unsupported_type(self, tmp_path: Path) -> None:
path = tmp_path / "x.bin"
path.write_bytes(b"\x00" * 4)
art = Artifact(
id="d:x", document_id="d", type=ArtifactType.IMAGE, uri=str(path),
)
with pytest.raises(ValueError, match="non gΓ©rΓ©"):
_filesystem_payload_loader(art)
class TestKwargsSignature:
def test_empty_dict(self) -> None:
assert _kwargs_signature({}) == ""
def test_single_kwarg(self) -> None:
assert _kwargs_signature({"k": "v"}) == "k='v'"
def test_sorted_stable(self) -> None:
# Ordre d'insertion ne doit pas changer la signature.
sig_a = _kwargs_signature({"b": 2, "a": 1})
sig_b = _kwargs_signature({"a": 1, "b": 2})
assert sig_a == sig_b
def test_distinguishes_values(self) -> None:
assert (
_kwargs_signature({"k": 1})
!= _kwargs_signature({"k": 2})
)