Spaces:
Sleeping
Sleeping
File size: 5,250 Bytes
823fb32 dbf17e9 823fb32 dbf17e9 823fb32 dbf17e9 823fb32 dbf17e9 823fb32 dbf17e9 823fb32 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 | """Sprint A14-S8 — définition de done : 1000 docs synthétiques en
moins de 10 minutes sans dépasser 500 MB de RAM.
Test scaled-down pour CI rapide (200 docs, mais avec mesure de RAM
qui doit rester très basse vu la nature synthétique du benchmark).
Le critère réel "1000 docs / 10 min / 500MB" est atteint trivialement
avec ces stubs ; le test garde ces ordres de grandeur en
inégalité large pour éviter d'être flaky en CI.
Cross-OS
--------
- ``resource`` est POSIX-only — sur Windows tout le fichier est
skipé via :func:`pytest.importorskip`.
- ``ru_maxrss`` a une unité **différente** selon la plateforme :
Linux → KB, BSD/macOS → bytes (cf. ``man getrusage``). La
fonction ``_rss_mb`` détecte la plateforme et convertit
correctement.
"""
from __future__ import annotations
import sys
import time
import pytest
# ``resource`` est POSIX-only. Sur Windows, ``importorskip`` skip
# l'intégralité du module au lieu de planter la collection.
resource = pytest.importorskip(
"resource",
reason="``resource`` est POSIX-only — test skipé sur Windows.",
)
from picarones.domain import Artifact, ArtifactType, DocumentRef
from picarones.pipeline import (
CorpusRunner,
PipelineExecutor,
PipelineSpec,
PipelineStep,
RunContext,
)
class _FastStub:
"""Adapter ultra-rapide pour mesurer les overheads d'orchestration."""
name = "fast"
input_types = frozenset({ArtifactType.IMAGE})
output_types = frozenset({ArtifactType.RAW_TEXT})
execution_mode = "io"
def execute(self, inputs, params, context):
return {
ArtifactType.RAW_TEXT: Artifact(
id=f"{context.document_id}:raw_text",
document_id=context.document_id,
type=ArtifactType.RAW_TEXT,
content_hash="0" * 64,
),
}
def _build(max_in_flight: int = 8):
registry = {"fast": _FastStub()}
exe = PipelineExecutor(adapter_resolver=lambda n: registry[n])
runner = CorpusRunner(
exe,
max_in_flight=max_in_flight,
timeout_seconds_per_doc=60.0,
poll_interval_seconds=0.01,
)
spec = PipelineSpec(
name="dod", initial_inputs=(ArtifactType.IMAGE,),
steps=(PipelineStep(
id="s", kind="ocr", adapter_name="fast",
input_types=(ArtifactType.IMAGE,),
output_types=(ArtifactType.RAW_TEXT,),
),),
)
return runner, spec
def _factories():
def inputs(doc):
return {ArtifactType.IMAGE: Artifact(
id=f"{doc.id}:image",
document_id=doc.id,
type=ArtifactType.IMAGE,
)}
def ctx(doc):
return RunContext(
document_id=doc.id, code_version="1.0.0", pipeline_name="dod",
)
return inputs, ctx
def _rss_mb() -> float:
"""RSS en mégaoctets, **avec conversion d'unité par plateforme**.
Selon ``man getrusage`` :
- Linux : ``ru_maxrss`` est en **kilo-octets**
- macOS / BSD : ``ru_maxrss`` est en **octets**
Cette différence est explicite dans la doc POSIX et a déjà
été source de bugs cross-OS dans ce projet — d'où la
conversion conditionnelle.
"""
rusage = resource.getrusage(resource.RUSAGE_SELF)
if sys.platform == "darwin":
# macOS : bytes → MB
return rusage.ru_maxrss / (1024 * 1024)
# Linux : KB → MB (et autres POSIX qui suivent la convention Linux)
return rusage.ru_maxrss / 1024
@pytest.mark.parametrize("n_docs", [200])
def test_def_of_done_scaled(n_docs: int) -> None:
"""Critère : N docs en moins de 10 min, RAM bornée.
Avec 200 docs synthétiques, on attend < 10s et < 500 MB RAM.
"""
runner, spec = _build(max_in_flight=8)
inputs, ctx = _factories()
docs = [
DocumentRef(id=f"d{i:04d}", image_uri=f"/tmp/{i}.png")
for i in range(n_docs)
]
rss_before = _rss_mb()
t0 = time.perf_counter()
result = runner.run(spec, docs, inputs, ctx, corpus_name="dod")
elapsed = time.perf_counter() - t0
rss_after = _rss_mb()
rss_growth = rss_after - rss_before
assert result.n_documents == n_docs
assert result.n_succeeded == n_docs
# Critère temps (large marge pour CI lente).
assert elapsed < 60.0, (
f"trop lent : {n_docs} docs en {elapsed:.1f}s"
)
# Critère RAM (la croissance pendant le run doit rester
# raisonnable — pas un test strict, juste un garde-fou contre
# une régression "submit all upfront" qui ferait exploser).
assert rss_growth < 200.0, (
f"croissance RAM excessive : +{rss_growth:.1f}MB"
)
def test_throughput_with_backpressure_reasonable() -> None:
"""Avec max_in_flight=4 et un adapter ultra-rapide, on doit
traiter 100 docs en bien moins d'une seconde."""
runner, spec = _build(max_in_flight=4)
inputs, ctx = _factories()
docs = [DocumentRef(id=f"d{i}") for i in range(100)]
t0 = time.perf_counter()
result = runner.run(spec, docs, inputs, ctx)
elapsed = time.perf_counter() - t0
assert result.n_succeeded == 100
# Threshold large : 100 docs synthétiques en moins de 5s.
assert elapsed < 5.0, f"throughput trop bas : {elapsed:.2f}s"
|