Spaces:
Sleeping
Sleeping
Claude
feat(adapters/llm): Sprint A14-S44 β BaseLLMAdapter implΓ©mente StepExecutor
dd0db4e unverified | """Tests unitaires de :class:`RunOrchestrator` (couche ``app/services/``). | |
| Le ``RunOrchestrator`` est testΓ© ici **directement** (sans passer par | |
| la CLI Click). Les tests ``tests/cli/test_sprint_a14_s24_run_command.py`` | |
| le testent indirectement via le wrapper Click β c'est complΓ©mentaire | |
| mais pas suffisant pour vΓ©rifier le contrat du service. | |
| Couverture | |
| ---------- | |
| - ``execute()`` retourne un :class:`OrchestrationResult` complet | |
| (run_result, extracted_corpus_dir, persisted_files, report_path). | |
| - ``report_renderer=None`` ne gΓ©nΓ¨re aucun rapport, mΓͺme si | |
| ``spec.report_html`` est renseignΓ©. | |
| - ``report_renderer=callable`` SANS ``spec.report_html`` ne génère | |
| rien (l'orchestrateur ne dΓ©cide pas seul d'un chemin). | |
| - ``report_renderer=callable`` ET ``spec.report_html`` β invocation | |
| du renderer avec le ``RunResult``, ``output_path`` et ``lang``. | |
| - Le corpus chargΓ© est sandboxΓ© sous l'``output_dir`` du caller. | |
| - Les 3 fichiers persistΓ©s sont Γ©crits dans ``output_dir/results/``. | |
| - Une ``CorpusImportError`` (corpus invalide) propage proprement. | |
| - Une ``RunSpecLoadError`` (adapter dotted-path inconnu) propage | |
| proprement. | |
| - Le helper ``_default_gt_factory`` traite ``CORRECTED_TEXT`` comme | |
| comparable Γ la GT ``RAW_TEXT`` (les deux sont du texte plat). | |
| - Le helper ``_default_inputs_factory`` lève quand ``image_uri`` est | |
| absent. | |
| - Le ``_filesystem_payload_loader`` lit RAW_TEXT/CORRECTED_TEXT/ | |
| ALTO_XML, lève sur type non géré ou URI absent. | |
| - Disambiguation ``_build_pipelines`` : 2 pipelines avec la mΓͺme | |
| classe d'adapter mais des kwargs distincts β 2 instances | |
| distinctes (cas ``PrecomputedTextAdapter`` Γ ``source_label``). | |
| """ | |
| from __future__ import annotations | |
| import io | |
| import textwrap | |
| import zipfile | |
| from pathlib import Path | |
| import pytest | |
| from picarones.app.results import RunResult | |
| from picarones.app.schemas import load_run_spec_from_yaml | |
| from picarones.app.services import ( | |
| CorpusImportError, | |
| OrchestrationResult, | |
| RunOrchestrator, | |
| ) | |
| from picarones.app.services.run_orchestrator import ( | |
| _default_gt_factory, | |
| _default_inputs_factory, | |
| _filesystem_payload_loader, | |
| _kwargs_signature, | |
| _make_context_factory, | |
| ) | |
| from picarones.app.schemas.run_spec import RunSpecLoadError | |
| from picarones.domain.artifacts import Artifact, ArtifactType | |
| from picarones.domain.documents import DocumentRef, GroundTruthRef | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Helpers communs | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _png_bytes() -> bytes: | |
| return ( | |
| b"\x89PNG\r\n\x1a\n" | |
| b"\x00\x00\x00\rIHDR" | |
| b"\x00\x00\x00\x01\x00\x00\x00\x01\x08\x06\x00\x00\x00" | |
| b"\x1f\x15\xc4\x89" | |
| ) | |
| def _make_corpus_zip(n_docs: int = 2) -> bytes: | |
| buf = io.BytesIO() | |
| with zipfile.ZipFile(buf, mode="w") as zf: | |
| for i in range(1, n_docs + 1): | |
| doc_id = f"doc{i:02d}" | |
| zf.writestr(f"{doc_id}.png", _png_bytes()) | |
| zf.writestr(f"{doc_id}.gt.txt", "Bonjour le monde") | |
| # Source prΓ©-calculΓ©e pour PrecomputedTextAdapter. | |
| zf.writestr(f"{doc_id}.tess.txt", "Bonjour le monde") | |
| return buf.getvalue() | |
| def _build_spec_yaml( | |
| *, | |
| corpus_zip: Path, | |
| output_dir: Path, | |
| report_html: str | None = None, | |
| ) -> str: | |
| base = textwrap.dedent(f""" | |
| corpus_zip: {corpus_zip} | |
| corpus_name: orchestrator_test | |
| pipelines: | |
| - name: tess_only | |
| initial_inputs: [image] | |
| steps: | |
| - id: ocr | |
| adapter_class: picarones.adapters.ocr.precomputed.PrecomputedTextAdapter | |
| adapter_kwargs: | |
| source_label: tess | |
| input_types: [image] | |
| output_types: [raw_text] | |
| views: [text_final] | |
| output_dir: {output_dir} | |
| code_version: "1.0.0-orch-test" | |
| """) | |
| if report_html is not None: | |
| base += f"report_html: {report_html}\n" | |
| return base | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Cycle de vie ``execute()`` | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _stub_renderer_called(records: list) -> "callable": | |
| """CrΓ©e un renderer qui enregistre ses appels et Γ©crit un fichier | |
| minimal. Utile pour vΓ©rifier l'invocation sans dΓ©pendre de | |
| ``HtmlReportRenderer``.""" | |
| def _render(result: RunResult, output_path: Path, lang: str) -> Path: | |
| records.append({"corpus": result.manifest.corpus_name, "lang": lang}) | |
| output_path.write_text(f"stub:{lang}", encoding="utf-8") | |
| return output_path | |
| return _render | |
| class TestExecuteHappyPath: | |
| def test_returns_orchestration_result_complete( | |
| self, tmp_path: Path, | |
| ) -> None: | |
| corpus_zip = tmp_path / "c.zip" | |
| corpus_zip.write_bytes(_make_corpus_zip(n_docs=2)) | |
| out_dir = tmp_path / "out" | |
| spec = load_run_spec_from_yaml( | |
| _build_spec_yaml(corpus_zip=corpus_zip, output_dir=out_dir), | |
| ) | |
| orchestrator = RunOrchestrator(out_dir) | |
| result = orchestrator.execute(spec) | |
| assert isinstance(result, OrchestrationResult) | |
| assert isinstance(result.run_result, RunResult) | |
| assert result.run_result.n_documents == 2 | |
| assert result.run_result.manifest.corpus_name == "orchestrator_test" | |
| # Corpus extrait sous le workspace. ``.resolve()`` normalise | |
| # cross-OS (macOS rΓ©sout ``/var/folders/...`` β | |
| # ``/private/var/folders/...``). | |
| assert result.extracted_corpus_dir.exists() | |
| assert result.extracted_corpus_dir.resolve().is_relative_to( | |
| out_dir.resolve(), | |
| ) | |
| # S41 β 4 fichiers persistΓ©s (artifacts_index sΓ©parΓ©). | |
| assert set(result.persisted_files) == { | |
| "manifest", "pipeline_results", "artifacts_index", "view_results", | |
| } | |
| for path in result.persisted_files.values(): | |
| assert path.exists() | |
| assert path.resolve().is_relative_to(out_dir.resolve()) | |
| # Pas de rapport car aucun renderer fourni. | |
| assert result.report_path is None | |
| def test_persisted_files_under_results_subdir( | |
| self, tmp_path: Path, | |
| ) -> None: | |
| corpus_zip = tmp_path / "c.zip" | |
| corpus_zip.write_bytes(_make_corpus_zip()) | |
| out_dir = tmp_path / "out" | |
| spec = load_run_spec_from_yaml( | |
| _build_spec_yaml(corpus_zip=corpus_zip, output_dir=out_dir), | |
| ) | |
| result = RunOrchestrator(out_dir).execute(spec) | |
| expected_parent = (out_dir / "results").resolve() | |
| for path in result.persisted_files.values(): | |
| assert path.parent.resolve() == expected_parent | |
| class TestReportRendererInjection: | |
| def test_no_renderer_skips_report_even_with_spec_path( | |
| self, tmp_path: Path, | |
| ) -> None: | |
| corpus_zip = tmp_path / "c.zip" | |
| corpus_zip.write_bytes(_make_corpus_zip()) | |
| out_dir = tmp_path / "out" | |
| report_path = out_dir / "rapport.html" | |
| spec = load_run_spec_from_yaml(_build_spec_yaml( | |
| corpus_zip=corpus_zip, | |
| output_dir=out_dir, | |
| report_html=str(report_path), | |
| )) | |
| result = RunOrchestrator(out_dir).execute(spec, report_renderer=None) | |
| assert result.report_path is None | |
| assert not report_path.exists() | |
| def test_renderer_without_spec_path_skips( | |
| self, tmp_path: Path, | |
| ) -> None: | |
| corpus_zip = tmp_path / "c.zip" | |
| corpus_zip.write_bytes(_make_corpus_zip()) | |
| out_dir = tmp_path / "out" | |
| spec = load_run_spec_from_yaml(_build_spec_yaml( | |
| corpus_zip=corpus_zip, | |
| output_dir=out_dir, | |
| report_html=None, | |
| )) | |
| records: list[dict] = [] | |
| result = RunOrchestrator(out_dir).execute( | |
| spec, report_renderer=_stub_renderer_called(records), | |
| ) | |
| assert result.report_path is None | |
| assert records == [] # renderer pas invoquΓ© | |
| def test_renderer_invoked_when_both_present( | |
| self, tmp_path: Path, | |
| ) -> None: | |
| corpus_zip = tmp_path / "c.zip" | |
| corpus_zip.write_bytes(_make_corpus_zip()) | |
| out_dir = tmp_path / "out" | |
| report_path = out_dir / "rapport.html" | |
| spec = load_run_spec_from_yaml(_build_spec_yaml( | |
| corpus_zip=corpus_zip, | |
| output_dir=out_dir, | |
| report_html=str(report_path), | |
| )) | |
| records: list[dict] = [] | |
| result = RunOrchestrator(out_dir).execute( | |
| spec, report_renderer=_stub_renderer_called(records), | |
| ) | |
| assert result.report_path == report_path | |
| assert report_path.exists() | |
| assert report_path.read_text(encoding="utf-8").startswith("stub:") | |
| assert records == [ | |
| {"corpus": "orchestrator_test", "lang": "fr"}, | |
| ] | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Erreurs typΓ©es propagΓ©es | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestErrorPropagation: | |
| def test_corpus_dir_inexistant_raises(self, tmp_path: Path) -> None: | |
| out_dir = tmp_path / "out" | |
| spec = load_run_spec_from_yaml(textwrap.dedent(f""" | |
| corpus_dir: {tmp_path / "does_not_exist"} | |
| pipelines: | |
| - name: p | |
| initial_inputs: [image] | |
| steps: | |
| - id: ocr | |
| adapter_class: picarones.adapters.ocr.precomputed.PrecomputedTextAdapter | |
| adapter_kwargs: | |
| source_label: tess | |
| input_types: [image] | |
| output_types: [raw_text] | |
| views: [text_final] | |
| output_dir: {out_dir} | |
| """)) | |
| with pytest.raises(CorpusImportError, match="n'est pas un rΓ©pertoire"): | |
| RunOrchestrator(out_dir).execute(spec) | |
| def test_unknown_adapter_class_raises(self, tmp_path: Path) -> None: | |
| corpus_zip = tmp_path / "c.zip" | |
| corpus_zip.write_bytes(_make_corpus_zip()) | |
| out_dir = tmp_path / "out" | |
| spec = load_run_spec_from_yaml(textwrap.dedent(f""" | |
| corpus_zip: {corpus_zip} | |
| pipelines: | |
| - name: p | |
| initial_inputs: [image] | |
| steps: | |
| - id: ocr | |
| adapter_class: tests.does_not_exist.Nope | |
| input_types: [image] | |
| output_types: [raw_text] | |
| views: [text_final] | |
| output_dir: {out_dir} | |
| """)) | |
| with pytest.raises(RunSpecLoadError, match="introuvable"): | |
| RunOrchestrator(out_dir).execute(spec) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Disambiguation des adapters | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestPipelineDisambiguation: | |
| def test_same_class_different_kwargs_yields_distinct_instances( | |
| self, tmp_path: Path, | |
| ) -> None: | |
| """Cas BnF : 2 pipelines utilisent ``PrecomputedTextAdapter`` | |
| mais avec ``source_label`` diffΓ©rents β ils doivent recevoir | |
| des instances distinctes (sinon le 2Γ¨me lirait les fichiers | |
| du 1er).""" | |
| # Corpus avec 2 sources prΓ©-calculΓ©es diffΓ©rentes. | |
| buf = io.BytesIO() | |
| with zipfile.ZipFile(buf, mode="w") as zf: | |
| zf.writestr("doc01.png", _png_bytes()) | |
| zf.writestr("doc01.gt.txt", "Bonjour") | |
| zf.writestr("doc01.tess.txt", "Bonjour") # source 1 | |
| zf.writestr("doc01.gpt4v.txt", "Bonjur") # source 2 (1 erreur) | |
| corpus_zip = tmp_path / "c.zip" | |
| corpus_zip.write_bytes(buf.getvalue()) | |
| out_dir = tmp_path / "out" | |
| spec = load_run_spec_from_yaml(textwrap.dedent(f""" | |
| corpus_zip: {corpus_zip} | |
| pipelines: | |
| - name: tess | |
| initial_inputs: [image] | |
| steps: | |
| - id: ocr | |
| adapter_class: picarones.adapters.ocr.precomputed.PrecomputedTextAdapter | |
| adapter_kwargs: | |
| source_label: tess | |
| input_types: [image] | |
| output_types: [raw_text] | |
| - name: gpt | |
| initial_inputs: [image] | |
| steps: | |
| - id: ocr | |
| adapter_class: picarones.adapters.ocr.precomputed.PrecomputedTextAdapter | |
| adapter_kwargs: | |
| source_label: gpt4v | |
| input_types: [image] | |
| output_types: [raw_text] | |
| views: [text_final] | |
| output_dir: {out_dir} | |
| """)) | |
| result = RunOrchestrator(out_dir).execute(spec) | |
| # 1 doc Γ 2 pipelines = 2 ViewResult. Ils doivent avoir des | |
| # candidate_artifact_id distincts (preuves d'instances distinctes). | |
| view_results = result.run_result.view_results_for("text_final") | |
| owners = { | |
| "tess" if "precomputed_tess" in vr.candidate_artifact_id and "tess:" in vr.candidate_artifact_id | |
| else "gpt" if "precomputed_gpt4v" in vr.candidate_artifact_id else "?" | |
| for vr in view_results | |
| } | |
| # Au moins 2 owners distincts. | |
| assert len(owners) >= 2 | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Helpers privΓ©s (importΓ©s directement pour couverture explicite) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestDefaultGtFactory: | |
| def test_returns_artifact_for_present_gt(self) -> None: | |
| doc = DocumentRef( | |
| id="doc01", | |
| ground_truths=( | |
| GroundTruthRef(type=ArtifactType.RAW_TEXT, uri="/path/gt.txt"), | |
| ), | |
| ) | |
| gt = _default_gt_factory(doc, ArtifactType.RAW_TEXT) | |
| assert gt is not None | |
| assert gt.type == ArtifactType.RAW_TEXT | |
| assert gt.uri == "/path/gt.txt" | |
| def test_corrected_text_falls_back_to_raw_text_gt(self) -> None: | |
| """Convention : un candidat CORRECTED_TEXT est comparΓ© contre | |
| la GT RAW_TEXT (les deux sont du texte plat).""" | |
| doc = DocumentRef( | |
| id="doc01", | |
| ground_truths=( | |
| GroundTruthRef(type=ArtifactType.RAW_TEXT, uri="/path/gt.txt"), | |
| ), | |
| ) | |
| gt = _default_gt_factory(doc, ArtifactType.CORRECTED_TEXT) | |
| assert gt is not None | |
| assert gt.type == ArtifactType.RAW_TEXT # fallback explicite | |
| def test_returns_none_when_gt_absent(self) -> None: | |
| doc = DocumentRef(id="doc01", ground_truths=()) | |
| gt = _default_gt_factory(doc, ArtifactType.RAW_TEXT) | |
| assert gt is None | |
| class TestDefaultInputsFactory: | |
| def test_returns_image_artifact(self) -> None: | |
| doc = DocumentRef(id="doc01", image_uri="/path/img.png") | |
| inputs = _default_inputs_factory(doc) | |
| assert ArtifactType.IMAGE in inputs | |
| assert inputs[ArtifactType.IMAGE].uri == "/path/img.png" | |
| def test_raises_when_image_uri_absent(self) -> None: | |
| doc = DocumentRef(id="doc01") | |
| with pytest.raises(CorpusImportError, match="sans ``image_uri``"): | |
| _default_inputs_factory(doc) | |
| class TestContextFactory: | |
| def test_factory_propagates_code_version(self) -> None: | |
| factory = _make_context_factory("1.2.3") | |
| doc = DocumentRef(id="doc01", image_uri="/x") | |
| ctx = factory(doc, "my_pipeline") | |
| assert ctx.document_id == "doc01" | |
| assert ctx.code_version == "1.2.3" | |
| assert ctx.pipeline_name == "my_pipeline" | |
| class TestFilesystemPayloadLoader: | |
| def test_loads_raw_text(self, tmp_path: Path) -> None: | |
| path = tmp_path / "t.txt" | |
| path.write_text("Hello", encoding="utf-8") | |
| art = Artifact( | |
| id="d:t", document_id="d", type=ArtifactType.RAW_TEXT, uri=str(path), | |
| ) | |
| assert _filesystem_payload_loader(art) == "Hello" | |
| def test_loads_corrected_text(self, tmp_path: Path) -> None: | |
| path = tmp_path / "c.txt" | |
| path.write_text("Bonjour", encoding="utf-8") | |
| art = Artifact( | |
| id="d:c", document_id="d", type=ArtifactType.CORRECTED_TEXT, | |
| uri=str(path), | |
| ) | |
| assert _filesystem_payload_loader(art) == "Bonjour" | |
| def test_loads_alto_xml(self, tmp_path: Path) -> None: | |
| from picarones.formats.alto.types import ( | |
| AltoBBox, AltoDocument, AltoLine, AltoPage, AltoString, | |
| AltoTextBlock, | |
| ) | |
| from picarones.formats.alto.writer import write_alto | |
| doc = AltoDocument(pages=(AltoPage(blocks=(AltoTextBlock(lines=(AltoLine(strings=( | |
| AltoString(content="Hi", bbox=AltoBBox(hpos=0, vpos=0, width=10, height=10)), | |
| ),),),),),),)) | |
| path = tmp_path / "a.xml" | |
| path.write_bytes(write_alto(doc)) | |
| art = Artifact( | |
| id="d:a", document_id="d", type=ArtifactType.ALTO_XML, uri=str(path), | |
| ) | |
| loaded = _filesystem_payload_loader(art) | |
| assert loaded.pages[0].blocks[0].lines[0].strings[0].content == "Hi" | |
| def test_raises_on_missing_uri(self) -> None: | |
| art = Artifact( | |
| id="d:x", document_id="d", type=ArtifactType.RAW_TEXT, | |
| ) | |
| with pytest.raises(FileNotFoundError, match="sans URI"): | |
| _filesystem_payload_loader(art) | |
| def test_raises_on_unsupported_type(self, tmp_path: Path) -> None: | |
| path = tmp_path / "x.bin" | |
| path.write_bytes(b"\x00" * 4) | |
| art = Artifact( | |
| id="d:x", document_id="d", type=ArtifactType.IMAGE, uri=str(path), | |
| ) | |
| with pytest.raises(ValueError, match="non gΓ©rΓ©"): | |
| _filesystem_payload_loader(art) | |
| class TestKwargsSignature: | |
| def test_empty_dict(self) -> None: | |
| assert _kwargs_signature({}) == "" | |
| def test_single_kwarg(self) -> None: | |
| assert _kwargs_signature({"k": "v"}) == "k='v'" | |
| def test_sorted_stable(self) -> None: | |
| # Ordre d'insertion ne doit pas changer la signature. | |
| sig_a = _kwargs_signature({"b": 2, "a": 1}) | |
| sig_b = _kwargs_signature({"a": 1, "b": 2}) | |
| assert sig_a == sig_b | |
| def test_distinguishes_values(self) -> None: | |
| assert ( | |
| _kwargs_signature({"k": 1}) | |
| != _kwargs_signature({"k": 2}) | |
| ) | |