File size: 5,250 Bytes
823fb32
 
 
 
 
 
 
 
dbf17e9
 
 
 
 
 
 
 
 
823fb32
 
 
 
dbf17e9
823fb32
 
 
 
dbf17e9
 
 
 
 
 
 
823fb32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dbf17e9
 
 
 
 
 
 
 
 
 
 
823fb32
dbf17e9
 
 
 
 
823fb32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""Sprint A14-S8 — définition de done : 1000 docs synthétiques en
moins de 10 minutes sans dépasser 500 MB de RAM.

Test scaled-down pour CI rapide (200 docs, mais avec mesure de RAM
qui doit rester très basse vu la nature synthétique du benchmark).
Le critère réel "1000 docs / 10 min / 500MB" est atteint trivialement
avec ces stubs ; le test garde ces ordres de grandeur en
inégalité large pour éviter d'être flaky en CI.

Cross-OS
--------
- ``resource`` est POSIX-only — sur Windows tout le fichier est
  skipé via :func:`pytest.importorskip`.
- ``ru_maxrss`` a une unité **différente** selon la plateforme :
  Linux → KB, BSD/macOS → bytes (cf. ``man getrusage``).  La
  fonction ``_rss_mb`` détecte la plateforme et convertit
  correctement.
"""

from __future__ import annotations

import sys
import time

import pytest

# ``resource`` est POSIX-only.  Sur Windows, ``importorskip`` skip
# l'intégralité du module au lieu de planter la collection.
resource = pytest.importorskip(
    "resource",
    reason="``resource`` est POSIX-only — test skipé sur Windows.",
)

from picarones.domain import Artifact, ArtifactType, DocumentRef
from picarones.pipeline import (
    CorpusRunner,
    PipelineExecutor,
    PipelineSpec,
    PipelineStep,
    RunContext,
)


class _FastStub:
    """Adapter ultra-rapide pour mesurer les overheads d'orchestration."""

    name = "fast"
    input_types = frozenset({ArtifactType.IMAGE})
    output_types = frozenset({ArtifactType.RAW_TEXT})
    execution_mode = "io"

    def execute(self, inputs, params, context):
        return {
            ArtifactType.RAW_TEXT: Artifact(
                id=f"{context.document_id}:raw_text",
                document_id=context.document_id,
                type=ArtifactType.RAW_TEXT,
                content_hash="0" * 64,
            ),
        }


def _build(max_in_flight: int = 8):
    registry = {"fast": _FastStub()}
    exe = PipelineExecutor(adapter_resolver=lambda n: registry[n])
    runner = CorpusRunner(
        exe,
        max_in_flight=max_in_flight,
        timeout_seconds_per_doc=60.0,
        poll_interval_seconds=0.01,
    )
    spec = PipelineSpec(
        name="dod", initial_inputs=(ArtifactType.IMAGE,),
        steps=(PipelineStep(
            id="s", kind="ocr", adapter_name="fast",
            input_types=(ArtifactType.IMAGE,),
            output_types=(ArtifactType.RAW_TEXT,),
        ),),
    )
    return runner, spec


def _factories():
    def inputs(doc):
        return {ArtifactType.IMAGE: Artifact(
            id=f"{doc.id}:image",
            document_id=doc.id,
            type=ArtifactType.IMAGE,
        )}

    def ctx(doc):
        return RunContext(
            document_id=doc.id, code_version="1.0.0", pipeline_name="dod",
        )
    return inputs, ctx


def _rss_mb() -> float:
    """RSS en mégaoctets, **avec conversion d'unité par plateforme**.

    Selon ``man getrusage`` :

    - Linux : ``ru_maxrss`` est en **kilo-octets**
    - macOS / BSD : ``ru_maxrss`` est en **octets**

    Cette différence est explicite dans la doc POSIX et a déjà
    été source de bugs cross-OS dans ce projet — d'où la
    conversion conditionnelle.
    """
    rusage = resource.getrusage(resource.RUSAGE_SELF)
    if sys.platform == "darwin":
        # macOS : bytes → MB
        return rusage.ru_maxrss / (1024 * 1024)
    # Linux : KB → MB (et autres POSIX qui suivent la convention Linux)
    return rusage.ru_maxrss / 1024


@pytest.mark.parametrize("n_docs", [200])
def test_def_of_done_scaled(n_docs: int) -> None:
    """Critère : N docs en moins de 10 min, RAM bornée.

    Avec 200 docs synthétiques, on attend < 10s et < 500 MB RAM.
    """
    runner, spec = _build(max_in_flight=8)
    inputs, ctx = _factories()
    docs = [
        DocumentRef(id=f"d{i:04d}", image_uri=f"/tmp/{i}.png")
        for i in range(n_docs)
    ]

    rss_before = _rss_mb()
    t0 = time.perf_counter()
    result = runner.run(spec, docs, inputs, ctx, corpus_name="dod")
    elapsed = time.perf_counter() - t0
    rss_after = _rss_mb()

    rss_growth = rss_after - rss_before

    assert result.n_documents == n_docs
    assert result.n_succeeded == n_docs

    # Critère temps (large marge pour CI lente).
    assert elapsed < 60.0, (
        f"trop lent : {n_docs} docs en {elapsed:.1f}s"
    )

    # Critère RAM (la croissance pendant le run doit rester
    # raisonnable — pas un test strict, juste un garde-fou contre
    # une régression "submit all upfront" qui ferait exploser).
    assert rss_growth < 200.0, (
        f"croissance RAM excessive : +{rss_growth:.1f}MB"
    )


def test_throughput_with_backpressure_reasonable() -> None:
    """Avec max_in_flight=4 et un adapter ultra-rapide, on doit
    traiter 100 docs en bien moins d'une seconde."""
    runner, spec = _build(max_in_flight=4)
    inputs, ctx = _factories()
    docs = [DocumentRef(id=f"d{i}") for i in range(100)]

    t0 = time.perf_counter()
    result = runner.run(spec, docs, inputs, ctx)
    elapsed = time.perf_counter() - t0

    assert result.n_succeeded == 100
    # Threshold large : 100 docs synthétiques en moins de 5s.
    assert elapsed < 5.0, f"throughput trop bas : {elapsed:.2f}s"