Spaces:
Sleeping
Sleeping
| """Sprint A14-S8 — définition de done : 1000 docs synthétiques en | |
| moins de 10 minutes sans dépasser 500 MB de RAM. | |
| Test scaled-down pour CI rapide (200 docs, mais avec mesure de RAM | |
| qui doit rester très basse vu la nature synthétique du benchmark). | |
| Le critère réel "1000 docs / 10 min / 500MB" est atteint trivialement | |
| avec ces stubs ; le test garde ces ordres de grandeur en | |
| inégalité large pour éviter d'être flaky en CI. | |
| Cross-OS | |
| -------- | |
| - ``resource`` est POSIX-only — sur Windows tout le fichier est | |
| skipé via :func:`pytest.importorskip`. | |
| - ``ru_maxrss`` a une unité **différente** selon la plateforme : | |
| Linux → KB, BSD/macOS → bytes (cf. ``man getrusage``). La | |
| fonction ``_rss_mb`` détecte la plateforme et convertit | |
| correctement. | |
| """ | |
| from __future__ import annotations | |
| import sys | |
| import time | |
| import pytest | |
| # ``resource`` est POSIX-only. Sur Windows, ``importorskip`` skip | |
| # l'intégralité du module au lieu de planter la collection. | |
| resource = pytest.importorskip( | |
| "resource", | |
| reason="``resource`` est POSIX-only — test skipé sur Windows.", | |
| ) | |
| from picarones.domain import Artifact, ArtifactType, DocumentRef | |
| from picarones.pipeline import ( | |
| CorpusRunner, | |
| PipelineExecutor, | |
| PipelineSpec, | |
| PipelineStep, | |
| RunContext, | |
| ) | |
| class _FastStub: | |
| """Adapter ultra-rapide pour mesurer les overheads d'orchestration.""" | |
| name = "fast" | |
| input_types = frozenset({ArtifactType.IMAGE}) | |
| output_types = frozenset({ArtifactType.RAW_TEXT}) | |
| execution_mode = "io" | |
| def execute(self, inputs, params, context): | |
| return { | |
| ArtifactType.RAW_TEXT: Artifact( | |
| id=f"{context.document_id}:raw_text", | |
| document_id=context.document_id, | |
| type=ArtifactType.RAW_TEXT, | |
| content_hash="0" * 64, | |
| ), | |
| } | |
| def _build(max_in_flight: int = 8): | |
| registry = {"fast": _FastStub()} | |
| exe = PipelineExecutor(adapter_resolver=lambda n: registry[n]) | |
| runner = CorpusRunner( | |
| exe, | |
| max_in_flight=max_in_flight, | |
| timeout_seconds_per_doc=60.0, | |
| poll_interval_seconds=0.01, | |
| ) | |
| spec = PipelineSpec( | |
| name="dod", initial_inputs=(ArtifactType.IMAGE,), | |
| steps=(PipelineStep( | |
| id="s", kind="ocr", adapter_name="fast", | |
| input_types=(ArtifactType.IMAGE,), | |
| output_types=(ArtifactType.RAW_TEXT,), | |
| ),), | |
| ) | |
| return runner, spec | |
| def _factories(): | |
| def inputs(doc): | |
| return {ArtifactType.IMAGE: Artifact( | |
| id=f"{doc.id}:image", | |
| document_id=doc.id, | |
| type=ArtifactType.IMAGE, | |
| )} | |
| def ctx(doc): | |
| return RunContext( | |
| document_id=doc.id, code_version="1.0.0", pipeline_name="dod", | |
| ) | |
| return inputs, ctx | |
| def _rss_mb() -> float: | |
| """RSS en mégaoctets, **avec conversion d'unité par plateforme**. | |
| Selon ``man getrusage`` : | |
| - Linux : ``ru_maxrss`` est en **kilo-octets** | |
| - macOS / BSD : ``ru_maxrss`` est en **octets** | |
| Cette différence est explicite dans la doc POSIX et a déjà | |
| été source de bugs cross-OS dans ce projet — d'où la | |
| conversion conditionnelle. | |
| """ | |
| rusage = resource.getrusage(resource.RUSAGE_SELF) | |
| if sys.platform == "darwin": | |
| # macOS : bytes → MB | |
| return rusage.ru_maxrss / (1024 * 1024) | |
| # Linux : KB → MB (et autres POSIX qui suivent la convention Linux) | |
| return rusage.ru_maxrss / 1024 | |
| def test_def_of_done_scaled(n_docs: int) -> None: | |
| """Critère : N docs en moins de 10 min, RAM bornée. | |
| Avec 200 docs synthétiques, on attend < 10s et < 500 MB RAM. | |
| """ | |
| runner, spec = _build(max_in_flight=8) | |
| inputs, ctx = _factories() | |
| docs = [ | |
| DocumentRef(id=f"d{i:04d}", image_uri=f"/tmp/{i}.png") | |
| for i in range(n_docs) | |
| ] | |
| rss_before = _rss_mb() | |
| t0 = time.perf_counter() | |
| result = runner.run(spec, docs, inputs, ctx, corpus_name="dod") | |
| elapsed = time.perf_counter() - t0 | |
| rss_after = _rss_mb() | |
| rss_growth = rss_after - rss_before | |
| assert result.n_documents == n_docs | |
| assert result.n_succeeded == n_docs | |
| # Critère temps (large marge pour CI lente). | |
| assert elapsed < 60.0, ( | |
| f"trop lent : {n_docs} docs en {elapsed:.1f}s" | |
| ) | |
| # Critère RAM (la croissance pendant le run doit rester | |
| # raisonnable — pas un test strict, juste un garde-fou contre | |
| # une régression "submit all upfront" qui ferait exploser). | |
| assert rss_growth < 200.0, ( | |
| f"croissance RAM excessive : +{rss_growth:.1f}MB" | |
| ) | |
| def test_throughput_with_backpressure_reasonable() -> None: | |
| """Avec max_in_flight=4 et un adapter ultra-rapide, on doit | |
| traiter 100 docs en bien moins d'une seconde.""" | |
| runner, spec = _build(max_in_flight=4) | |
| inputs, ctx = _factories() | |
| docs = [DocumentRef(id=f"d{i}") for i in range(100)] | |
| t0 = time.perf_counter() | |
| result = runner.run(spec, docs, inputs, ctx) | |
| elapsed = time.perf_counter() - t0 | |
| assert result.n_succeeded == 100 | |
| # Threshold large : 100 docs synthétiques en moins de 5s. | |
| assert elapsed < 5.0, f"throughput trop bas : {elapsed:.2f}s" | |