Picarones / tests /pipeline /test_sprint_a14_s8_def_of_done.py
Claude
fix(tests): test_sprint_a14_s8_def_of_done — bugs réels (pas spéculatifs)
dbf17e9 unverified
Raw
History Blame
5.25 kB
"""Sprint A14-S8 — définition de done : 1000 docs synthétiques en
moins de 10 minutes sans dépasser 500 MB de RAM.
Test scaled-down pour CI rapide (200 docs, mais avec mesure de RAM
qui doit rester très basse vu la nature synthétique du benchmark).
Le critère réel "1000 docs / 10 min / 500MB" est atteint trivialement
avec ces stubs ; le test garde ces ordres de grandeur en
inégalité large pour éviter d'être flaky en CI.
Cross-OS
--------
- ``resource`` est POSIX-only — sur Windows tout le fichier est
skipé via :func:`pytest.importorskip`.
- ``ru_maxrss`` a une unité **différente** selon la plateforme :
Linux → KB, BSD/macOS → bytes (cf. ``man getrusage``). La
fonction ``_rss_mb`` détecte la plateforme et convertit
correctement.
"""
from __future__ import annotations
import sys
import time
import pytest
# ``resource`` est POSIX-only. Sur Windows, ``importorskip`` skip
# l'intégralité du module au lieu de planter la collection.
resource = pytest.importorskip(
"resource",
reason="``resource`` est POSIX-only — test skipé sur Windows.",
)
from picarones.domain import Artifact, ArtifactType, DocumentRef
from picarones.pipeline import (
CorpusRunner,
PipelineExecutor,
PipelineSpec,
PipelineStep,
RunContext,
)
class _FastStub:
"""Adapter ultra-rapide pour mesurer les overheads d'orchestration."""
name = "fast"
input_types = frozenset({ArtifactType.IMAGE})
output_types = frozenset({ArtifactType.RAW_TEXT})
execution_mode = "io"
def execute(self, inputs, params, context):
return {
ArtifactType.RAW_TEXT: Artifact(
id=f"{context.document_id}:raw_text",
document_id=context.document_id,
type=ArtifactType.RAW_TEXT,
content_hash="0" * 64,
),
}
def _build(max_in_flight: int = 8):
registry = {"fast": _FastStub()}
exe = PipelineExecutor(adapter_resolver=lambda n: registry[n])
runner = CorpusRunner(
exe,
max_in_flight=max_in_flight,
timeout_seconds_per_doc=60.0,
poll_interval_seconds=0.01,
)
spec = PipelineSpec(
name="dod", initial_inputs=(ArtifactType.IMAGE,),
steps=(PipelineStep(
id="s", kind="ocr", adapter_name="fast",
input_types=(ArtifactType.IMAGE,),
output_types=(ArtifactType.RAW_TEXT,),
),),
)
return runner, spec
def _factories():
def inputs(doc):
return {ArtifactType.IMAGE: Artifact(
id=f"{doc.id}:image",
document_id=doc.id,
type=ArtifactType.IMAGE,
)}
def ctx(doc):
return RunContext(
document_id=doc.id, code_version="1.0.0", pipeline_name="dod",
)
return inputs, ctx
def _rss_mb() -> float:
"""RSS en mégaoctets, **avec conversion d'unité par plateforme**.
Selon ``man getrusage`` :
- Linux : ``ru_maxrss`` est en **kilo-octets**
- macOS / BSD : ``ru_maxrss`` est en **octets**
Cette différence est explicite dans la doc POSIX et a déjà
été source de bugs cross-OS dans ce projet — d'où la
conversion conditionnelle.
"""
rusage = resource.getrusage(resource.RUSAGE_SELF)
if sys.platform == "darwin":
# macOS : bytes → MB
return rusage.ru_maxrss / (1024 * 1024)
# Linux : KB → MB (et autres POSIX qui suivent la convention Linux)
return rusage.ru_maxrss / 1024
@pytest.mark.parametrize("n_docs", [200])
def test_def_of_done_scaled(n_docs: int) -> None:
"""Critère : N docs en moins de 10 min, RAM bornée.
Avec 200 docs synthétiques, on attend < 10s et < 500 MB RAM.
"""
runner, spec = _build(max_in_flight=8)
inputs, ctx = _factories()
docs = [
DocumentRef(id=f"d{i:04d}", image_uri=f"/tmp/{i}.png")
for i in range(n_docs)
]
rss_before = _rss_mb()
t0 = time.perf_counter()
result = runner.run(spec, docs, inputs, ctx, corpus_name="dod")
elapsed = time.perf_counter() - t0
rss_after = _rss_mb()
rss_growth = rss_after - rss_before
assert result.n_documents == n_docs
assert result.n_succeeded == n_docs
# Critère temps (large marge pour CI lente).
assert elapsed < 60.0, (
f"trop lent : {n_docs} docs en {elapsed:.1f}s"
)
# Critère RAM (la croissance pendant le run doit rester
# raisonnable — pas un test strict, juste un garde-fou contre
# une régression "submit all upfront" qui ferait exploser).
assert rss_growth < 200.0, (
f"croissance RAM excessive : +{rss_growth:.1f}MB"
)
def test_throughput_with_backpressure_reasonable() -> None:
"""Avec max_in_flight=4 et un adapter ultra-rapide, on doit
traiter 100 docs en bien moins d'une seconde."""
runner, spec = _build(max_in_flight=4)
inputs, ctx = _factories()
docs = [DocumentRef(id=f"d{i}") for i in range(100)]
t0 = time.perf_counter()
result = runner.run(spec, docs, inputs, ctx)
elapsed = time.perf_counter() - t0
assert result.n_succeeded == 100
# Threshold large : 100 docs synthétiques en moins de 5s.
assert elapsed < 5.0, f"throughput trop bas : {elapsed:.2f}s"