"""Sprint A14-S8 — définition de done : 1000 docs synthétiques en moins de 10 minutes sans dépasser 500 MB de RAM. Test scaled-down pour CI rapide (200 docs, mais avec mesure de RAM qui doit rester très basse vu la nature synthétique du benchmark). Le critère réel "1000 docs / 10 min / 500MB" est atteint trivialement avec ces stubs ; le test garde ces ordres de grandeur en inégalité large pour éviter d'être flaky en CI. Cross-OS -------- - ``resource`` est POSIX-only — sur Windows tout le fichier est skipé via :func:`pytest.importorskip`. - ``ru_maxrss`` a une unité **différente** selon la plateforme : Linux → KB, BSD/macOS → bytes (cf. ``man getrusage``). La fonction ``_rss_mb`` détecte la plateforme et convertit correctement. """ from __future__ import annotations import sys import time import pytest # ``resource`` est POSIX-only. Sur Windows, ``importorskip`` skip # l'intégralité du module au lieu de planter la collection. resource = pytest.importorskip( "resource", reason="``resource`` est POSIX-only — test skipé sur Windows.", ) from picarones.domain import Artifact, ArtifactType, DocumentRef from picarones.pipeline import ( CorpusRunner, PipelineExecutor, PipelineSpec, PipelineStep, RunContext, ) class _FastStub: """Adapter ultra-rapide pour mesurer les overheads d'orchestration.""" name = "fast" input_types = frozenset({ArtifactType.IMAGE}) output_types = frozenset({ArtifactType.RAW_TEXT}) execution_mode = "io" def execute(self, inputs, params, context): return { ArtifactType.RAW_TEXT: Artifact( id=f"{context.document_id}:raw_text", document_id=context.document_id, type=ArtifactType.RAW_TEXT, content_hash="0" * 64, ), } def _build(max_in_flight: int = 8): registry = {"fast": _FastStub()} exe = PipelineExecutor(adapter_resolver=lambda n: registry[n]) runner = CorpusRunner( exe, max_in_flight=max_in_flight, timeout_seconds_per_doc=60.0, poll_interval_seconds=0.01, ) spec = PipelineSpec( name="dod", initial_inputs=(ArtifactType.IMAGE,), steps=(PipelineStep( id="s", kind="ocr", adapter_name="fast", input_types=(ArtifactType.IMAGE,), output_types=(ArtifactType.RAW_TEXT,), ),), ) return runner, spec def _factories(): def inputs(doc): return {ArtifactType.IMAGE: Artifact( id=f"{doc.id}:image", document_id=doc.id, type=ArtifactType.IMAGE, )} def ctx(doc): return RunContext( document_id=doc.id, code_version="1.0.0", pipeline_name="dod", ) return inputs, ctx def _rss_mb() -> float: """RSS en mégaoctets, **avec conversion d'unité par plateforme**. Selon ``man getrusage`` : - Linux : ``ru_maxrss`` est en **kilo-octets** - macOS / BSD : ``ru_maxrss`` est en **octets** Cette différence est explicite dans la doc POSIX et a déjà été source de bugs cross-OS dans ce projet — d'où la conversion conditionnelle. """ rusage = resource.getrusage(resource.RUSAGE_SELF) if sys.platform == "darwin": # macOS : bytes → MB return rusage.ru_maxrss / (1024 * 1024) # Linux : KB → MB (et autres POSIX qui suivent la convention Linux) return rusage.ru_maxrss / 1024 @pytest.mark.parametrize("n_docs", [200]) def test_def_of_done_scaled(n_docs: int) -> None: """Critère : N docs en moins de 10 min, RAM bornée. Avec 200 docs synthétiques, on attend < 10s et < 500 MB RAM. """ runner, spec = _build(max_in_flight=8) inputs, ctx = _factories() docs = [ DocumentRef(id=f"d{i:04d}", image_uri=f"/tmp/{i}.png") for i in range(n_docs) ] rss_before = _rss_mb() t0 = time.perf_counter() result = runner.run(spec, docs, inputs, ctx, corpus_name="dod") elapsed = time.perf_counter() - t0 rss_after = _rss_mb() rss_growth = rss_after - rss_before assert result.n_documents == n_docs assert result.n_succeeded == n_docs # Critère temps (large marge pour CI lente). assert elapsed < 60.0, ( f"trop lent : {n_docs} docs en {elapsed:.1f}s" ) # Critère RAM (la croissance pendant le run doit rester # raisonnable — pas un test strict, juste un garde-fou contre # une régression "submit all upfront" qui ferait exploser). assert rss_growth < 200.0, ( f"croissance RAM excessive : +{rss_growth:.1f}MB" ) def test_throughput_with_backpressure_reasonable() -> None: """Avec max_in_flight=4 et un adapter ultra-rapide, on doit traiter 100 docs en bien moins d'une seconde.""" runner, spec = _build(max_in_flight=4) inputs, ctx = _factories() docs = [DocumentRef(id=f"d{i}") for i in range(100)] t0 = time.perf_counter() result = runner.run(spec, docs, inputs, ctx) elapsed = time.perf_counter() - t0 assert result.n_succeeded == 100 # Threshold large : 100 docs synthétiques en moins de 5s. assert elapsed < 5.0, f"throughput trop bas : {elapsed:.2f}s"