Claude commited on
Commit
3b65839
·
unverified ·
1 Parent(s): b9ff8de

feat(pipeline): Sprint A14-S7 — PipelineExecutor mono-doc + ArtifactCache

Browse files

Sprint S7 du plan rewrite ciblé. **Phase 2 démarrée** (pipeline
executor + migration des calculs).

Première version réelle de l'exécuteur du nouveau pipeline.
Mono-document, séquentiel, capture gracieuse des erreurs. Plus
un ``ArtifactCache`` minimal in-memory en couche de calcul (pas
encore branché à l'executor — viendra quand un cas d'usage
concret de réutilisation se présentera).

Modules livrés
--------------
``picarones/pipeline/executor.py``
``PipelineExecutor(adapter_resolver)`` :

- ``adapter_resolver: Callable[[str], StepExecutor]`` injecté au
constructeur. Permet aux tests d'utiliser un dict simple, et
au S19 d'injecter un service applicatif complet.
- ``run(spec, document, initial_inputs, context) -> PipelineResult``
exécute la pipeline en séquentiel.

Garanties :
- Validation défensive : ``validate_spec()`` appelée avant
toute exécution → ``PipelineSpecInvalid`` levée si la spec
est incohérente (bug de programmation, pas runtime).
- Bag versionné ``(ArtifactType, step_id) → Artifact`` + map
``latest_producer`` pour la résolution des inputs. Respecte
``inputs_from`` quand présent (DAG branchant Sprint 66
historique), sinon prend la version la plus récente.
- Capture gracieuse des erreurs par étape :
* adapter qui lève → ``error="adapter_raised: <Type>: <msg>"``
* adapter introuvable → ``error="adapter_not_found: <name>"``
* input manquant → ``error="missing_input: <type>[@<step>]"``
* output promis manquant → ``error="missing_output: [<types>]"``
- Mesure ``time.perf_counter()`` autour de ``execute()`` pour
chaque step + total. Le timeout depuis le début d'exécution
réelle vient au S8.

Pas implémenté (reportés) :
- Annulation propre par signal aux workers (S8).
- Branchement avec ``ArtifactCache`` (cas d'usage concret S8+).
- Parallélisation inter-étapes (post-livraison probable).

``picarones/pipeline/cache.py``
``ArtifactCache`` in-memory :

- ``compute_key(step, input_artifacts, code_version)`` →
SHA-256 hex de ``(content_hashes triés + step.model_dump()
sérialisé déterministe + code_version)``. Retourne ``None``
si un seul input n'a pas de ``content_hash``
(convention "ne pas servir un résultat douteux").
- ``get(key)`` / ``put(key, outputs)`` / ``clear()`` /
``__contains__`` / ``__len__`` / ``keys()``.
- ``put`` fait une copie défensive du dict d'outputs.
- Pas de TTL, pas d'éviction LRU, pas de persistance disque
pour S7.

Tests — 29 nouveaux tests
-------------------------
- tests/pipeline/test_sprint_a14_s7_executor.py (13) — pipeline
mono-step, deux-step, fork avec inputs_from explicite, fallback
latest sans inputs_from, capture step qui lève, adapter
inconnu, output manquant, input manquant, spec invalide
(PipelineSpecInvalid), resolver non-callable rejeté.

- tests/pipeline/test_sprint_a14_s7_timing.py (4) — duration
reflète le sleep, total >= somme des steps, durée non-négative
même en échec, **def of done : pipeline mock en < 100ms**.

- tests/pipeline/test_sprint_a14_s7_artifact_cache.py (12) —
compute_key déterministe, sensible aux content_hash / code_version
/ step.params, retourne None sans hash, get/put/clear,
none-key no-op, copie défensive.

Critère go/no-go S7 atteint
---------------------------
``PipelineExecutor.run`` exécute une pipeline mock 2 étapes en
**0.06 ms** (smoke test) — largement sous les 100 ms requis.
Le ``PipelineResult`` retourné contient les durées par étape, la
liste de tous les artefacts (initial + produits), le succeeded
agrégé, et chaque ``StepResult`` détaillé.

État de la suite
----------------
``pytest tests/ -q`` → 4103 passed, 6 skipped, 2 failed.
+29 tests par rapport à S6. Les 2 fails restants sont
strictement environnementaux (sous-process pytest sans
``pip install -e .``). Aucune régression S7.

Prêt pour S8 (CorpusRunner avec backpressure, timeout depuis le
début d'exécution réelle, annulation propre).

https://claude.ai/code/session_011XQZNitg1rCgia8ZD1a2hP

picarones/pipeline/__init__.py CHANGED
@@ -19,15 +19,20 @@ Modules livrés au S6
19
  ``ValidationError``. Validation statique sans instancier de module.
20
  - ``yaml_io.py`` — ``dump_spec_to_yaml`` / ``load_spec_from_yaml``.
21
 
22
- À venir aux Sprints S7-S8
23
- -------------------------
24
- - ``executor.py`` — ``PipelineExecutor.run(spec, document, inputs,
25
- context)`` exécute mono-document avec capture gracieuse des erreurs.
 
 
 
 
 
 
 
26
  - ``runner.py`` — ``CorpusRunner`` orchestre l'executor sur un corpus
27
  complet avec **backpressure**, **timeout depuis le début
28
  d'exécution réelle**, **annulation propre**.
29
- - ``cache.py`` — ``ArtifactCache`` indexé par
30
- ``hash(content + spec + code_version)``.
31
 
32
  Cible du Sprint S12 : équivalence numérique CER/WER avec l'ancien
33
  ``measurements.runner`` à 1e-9 près sur les fixtures.
@@ -35,6 +40,12 @@ Cible du Sprint S12 : équivalence numérique CER/WER avec l'ancien
35
 
36
  from __future__ import annotations
37
 
 
 
 
 
 
 
38
  from picarones.pipeline.protocols import ExecutionMode, StepExecutor
39
  from picarones.pipeline.spec import INITIAL_STEP_ID, PipelineSpec, PipelineStep
40
  from picarones.pipeline.types import PipelineResult, RunContext, StepResult
@@ -59,4 +70,10 @@ __all__ = [
59
  # YAML IO
60
  "dump_spec_to_yaml",
61
  "load_spec_from_yaml",
 
 
 
 
 
 
62
  ]
 
19
  ``ValidationError``. Validation statique sans instancier de module.
20
  - ``yaml_io.py`` — ``dump_spec_to_yaml`` / ``load_spec_from_yaml``.
21
 
22
+ Modules livrés au S7
23
+ --------------------
24
+ - ``executor.py`` — ``PipelineExecutor.run(spec, document,
25
+ initial_inputs, context)`` exécute mono-document avec capture
26
+ gracieuse des erreurs et bag d'artefacts versionné.
27
+ ``AdapterResolver`` type alias.
28
+ - ``cache.py`` — ``ArtifactCache`` minimal in-memory indexé par
29
+ ``hash(content + spec + code_version)``.
30
+
31
+ À venir au Sprint S8
32
+ --------------------
33
  - ``runner.py`` — ``CorpusRunner`` orchestre l'executor sur un corpus
34
  complet avec **backpressure**, **timeout depuis le début
35
  d'exécution réelle**, **annulation propre**.
 
 
36
 
37
  Cible du Sprint S12 : équivalence numérique CER/WER avec l'ancien
38
  ``measurements.runner`` à 1e-9 près sur les fixtures.
 
40
 
41
  from __future__ import annotations
42
 
43
+ from picarones.pipeline.cache import ArtifactCache
44
+ from picarones.pipeline.executor import (
45
+ AdapterResolver,
46
+ PipelineExecutor,
47
+ PipelineSpecInvalid,
48
+ )
49
  from picarones.pipeline.protocols import ExecutionMode, StepExecutor
50
  from picarones.pipeline.spec import INITIAL_STEP_ID, PipelineSpec, PipelineStep
51
  from picarones.pipeline.types import PipelineResult, RunContext, StepResult
 
70
  # YAML IO
71
  "dump_spec_to_yaml",
72
  "load_spec_from_yaml",
73
+ # Executor (S7)
74
+ "PipelineExecutor",
75
+ "PipelineSpecInvalid",
76
+ "AdapterResolver",
77
+ # Cache (S7)
78
+ "ArtifactCache",
79
  ]
picarones/pipeline/cache.py ADDED
@@ -0,0 +1,154 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """``ArtifactCache`` minimal in-memory — Sprint A14-S7.
2
+
3
+ Cache d'outputs d'étape indexé par ``(content_hashes des inputs +
4
+ spec hash + code_version)``. Permet de sauter une étape coûteuse
5
+ (typiquement un appel LLM cloud) si elle a déjà été exécutée avec
6
+ exactement les mêmes inputs et la même spec.
7
+
8
+ S7 livre la couche de calcul ; le branchement avec
9
+ ``PipelineExecutor`` viendra quand un cas d'usage concret de
10
+ réutilisation se présentera (probablement S8 quand on aura
11
+ l'orchestration corpus-wide qui peut bénéficier d'un cache pour
12
+ les retries idempotents).
13
+
14
+ Garde-fous
15
+ ----------
16
+ - Si **un seul** input n'a pas de ``content_hash``, la clé n'est
17
+ pas calculable → ``compute_key`` retourne ``None`` →
18
+ ``get`` retourne ``None`` (équivalent à un cache miss). Pas de
19
+ fallback hasardeux qui pourrait servir des résultats faux.
20
+ - Pas de TTL, pas d'éviction LRU — c'est un cache in-memory
21
+ simple, taille gardée par le caller (qui peut appeler ``clear()``
22
+ s'il veut libérer la mémoire).
23
+ - Pas de persistance disque pour S7. Si un caller en a besoin,
24
+ on l'ajoutera quand le besoin sera concret (S20+ probablement).
25
+ """
26
+
27
+ from __future__ import annotations
28
+
29
+ import hashlib
30
+ import json
31
+ from typing import Iterable
32
+
33
+ from picarones.domain.artifacts import Artifact, ArtifactType
34
+ from picarones.pipeline.spec import PipelineStep
35
+
36
+
37
+ class ArtifactCache:
38
+ """Cache in-memory d'outputs d'étape.
39
+
40
+ Thread-safe en lecture/écriture **après** l'init (les opérations
41
+ mutantes se font sur un dict — Python GIL garantit l'atomicité
42
+ des set/del sur un dict). Pas de mécanisme de freeze technique.
43
+ """
44
+
45
+ def __init__(self) -> None:
46
+ self._store: dict[str, dict[ArtifactType, Artifact]] = {}
47
+
48
+ # ──────────────────────────────────────────────────────────────────
49
+ # Calcul de clé
50
+ # ──────────────────────────────────────────────────────────────────
51
+
52
+ def compute_key(
53
+ self,
54
+ step: PipelineStep,
55
+ input_artifacts: dict[ArtifactType, Artifact],
56
+ code_version: str,
57
+ ) -> str | None:
58
+ """Calcule la clé canonique du cache pour cette exécution.
59
+
60
+ Retourne ``None`` si **un seul** input n'a pas de
61
+ ``content_hash`` — convention "ne sert pas un résultat
62
+ douteux".
63
+
64
+ La clé combine :
65
+
66
+ - les ``content_hash`` triés par ``ArtifactType.value``,
67
+ - le hash de la spec du step (sérialisée JSON déterministe),
68
+ - le ``code_version``.
69
+
70
+ Deux exécutions avec exactement les mêmes inputs (au sens
71
+ ``content_hash``), la même spec et la même version de code
72
+ produisent la même clé.
73
+ """
74
+ # 1. Inputs : (type → content_hash), tous obligatoires.
75
+ try:
76
+ input_hashes = sorted(
77
+ (t.value, input_artifacts[t].content_hash)
78
+ for t in input_artifacts
79
+ )
80
+ except KeyError:
81
+ return None
82
+ if any(h is None for _, h in input_hashes):
83
+ return None
84
+
85
+ # 2. Spec du step : on hash la sérialisation pydantic de
86
+ # PipelineStep (params, kind, adapter_name, etc.). Tout
87
+ # changement dans la spec invalide le cache.
88
+ step_payload = step.model_dump(mode="json")
89
+ step_blob = json.dumps(
90
+ step_payload,
91
+ sort_keys=True,
92
+ ensure_ascii=False,
93
+ separators=(",", ":"),
94
+ )
95
+
96
+ # 3. Composition.
97
+ material = json.dumps(
98
+ {
99
+ "inputs": input_hashes,
100
+ "step": step_blob,
101
+ "code_version": code_version,
102
+ },
103
+ sort_keys=True,
104
+ ensure_ascii=False,
105
+ separators=(",", ":"),
106
+ )
107
+ return hashlib.sha256(material.encode("utf-8")).hexdigest()
108
+
109
+ # ──────────────────────────────────────────────────────────────────
110
+ # Get / Put / Clear
111
+ # ──────────────────────────────────────────────────────────────────
112
+
113
+ def get(self, key: str | None) -> dict[ArtifactType, Artifact] | None:
114
+ """Retourne les outputs cachés pour la clé, ou ``None``.
115
+
116
+ Tolère ``key=None`` pour faciliter le pattern :
117
+
118
+ key = cache.compute_key(...)
119
+ cached = cache.get(key)
120
+ if cached is not None:
121
+ return cached
122
+ """
123
+ if key is None:
124
+ return None
125
+ return self._store.get(key)
126
+
127
+ def put(
128
+ self,
129
+ key: str | None,
130
+ outputs: dict[ArtifactType, Artifact],
131
+ ) -> None:
132
+ """Stocke les outputs sous la clé donnée. No-op si
133
+ ``key=None`` (alignement avec la convention "ne pas servir
134
+ un résultat douteux")."""
135
+ if key is None:
136
+ return
137
+ self._store[key] = dict(outputs) # copie défensive
138
+
139
+ def clear(self) -> None:
140
+ """Vide complètement le cache."""
141
+ self._store.clear()
142
+
143
+ def __len__(self) -> int:
144
+ return len(self._store)
145
+
146
+ def __contains__(self, key: str) -> bool:
147
+ return key in self._store
148
+
149
+ def keys(self) -> Iterable[str]:
150
+ """Liste des clés actuellement en cache (utile pour les tests)."""
151
+ return list(self._store.keys())
152
+
153
+
154
+ __all__ = ["ArtifactCache"]
picarones/pipeline/executor.py ADDED
@@ -0,0 +1,355 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """``PipelineExecutor`` mono-document — Sprint A14-S7.
2
+
3
+ Première version réelle de l'exécuteur du nouveau pipeline.
4
+ Mono-document, séquentiel, capture gracieuse des erreurs par
5
+ étape. L'orchestration corpus-wide (backpressure, timeout réel,
6
+ annulation propre) arrive au Sprint S8.
7
+
8
+ Contrat
9
+ -------
10
+ Le caller (typiquement un service applicatif au S19) fournit :
11
+
12
+ - une ``PipelineSpec`` validée (le caller doit avoir appelé
13
+ ``validate_spec`` en amont — l'executor re-valide quand même
14
+ pour défendre en profondeur),
15
+ - un ``DocumentRef`` du document à traiter,
16
+ - un dict ``{ArtifactType: Artifact}`` des entrées initiales
17
+ (typiquement ``{IMAGE: Artifact(...)}``),
18
+ - un ``RunContext`` qui porte ``document_id``, ``code_version``,
19
+ ``pipeline_name`` et un éventuel ``workspace_uri``,
20
+ - un ``adapter_resolver: Callable[[str], StepExecutor]`` qui
21
+ résout ``adapter_name`` → instance d'adapter. Au S19, ce
22
+ resolver sera fourni par ``app/services/adapter_registry``.
23
+
24
+ L'executor garantit :
25
+
26
+ - Les étapes sont exécutées dans l'ordre de ``spec.steps``.
27
+ - Chaque entrée d'une étape est résolue depuis le **bag versionné** :
28
+ si ``inputs_from[type] = "step_x"``, on prend la version
29
+ produite par ``step_x`` ; sinon, on prend la dernière version
30
+ disponible (comportement Sprint 66 historique).
31
+ - Toute exception levée par un adapter est capturée — le step
32
+ est marqué ``succeeded=False`` avec ``error=str(exc)``, et le
33
+ pipeline continue (les étapes en aval pourront échouer si
34
+ elles dépendaient des outputs de ce step, ce qui est explicite).
35
+ - Les ``output_types`` déclarés par l'adapter sont validés au
36
+ retour : si un type promis est manquant, le step est marqué
37
+ en échec avec ``error="missing_output: <type>"``.
38
+
39
+ L'executor ne garantit PAS (reportés à des sprints suivants) :
40
+
41
+ - Mesure du temps depuis le début d'exécution réelle (S8 — pour
42
+ l'instant, ``time.perf_counter()`` autour de ``execute()``).
43
+ - Annulation propre par signal aux workers en cours (S8).
44
+ - Cache d'artefacts inter-runs (S7 livre ``ArtifactCache`` mais
45
+ l'executor ne s'y branche pas encore — ça vient quand on aura
46
+ un cas d'usage concret de réutilisation).
47
+ - Parallélisation inter-documents ou inter-étapes (S8).
48
+
49
+ Définition de done du S7
50
+ ------------------------
51
+ ``PipelineExecutor.run(spec, document, initial_inputs, context)``
52
+ exécute une pipeline mock en moins de 100 ms et produit un
53
+ ``PipelineResult`` complet (durées par étape, artefacts produits,
54
+ ``succeeded`` agrégé).
55
+ """
56
+
57
+ from __future__ import annotations
58
+
59
+ import logging
60
+ import time
61
+ from typing import Callable
62
+
63
+ from picarones.domain.artifacts import Artifact, ArtifactType
64
+ from picarones.domain.documents import DocumentRef
65
+ from picarones.domain.errors import PicaronesError
66
+ from picarones.pipeline.protocols import StepExecutor
67
+ from picarones.pipeline.spec import INITIAL_STEP_ID, PipelineSpec, PipelineStep
68
+ from picarones.pipeline.types import PipelineResult, RunContext, StepResult
69
+ from picarones.pipeline.validation import validate_spec
70
+
71
+ logger = logging.getLogger(__name__)
72
+
73
+
74
+ class PipelineSpecInvalid(PicaronesError):
75
+ """``PipelineSpec`` mal formée — l'executor refuse de démarrer."""
76
+
77
+
78
+ #: Type alias pour le resolver d'adapters. Une fonction qui
79
+ #: prend un ``adapter_name`` (str) et retourne une instance
80
+ #: ``StepExecutor`` prête à l'emploi. Si le resolver lève
81
+ #: ``KeyError``, l'executor traduit en step en échec avec
82
+ #: ``error="adapter_not_found: ..."``.
83
+ AdapterResolver = Callable[[str], StepExecutor]
84
+
85
+
86
+ class PipelineExecutor:
87
+ """Exécuteur séquentiel mono-document.
88
+
89
+ Une instance peut traiter plusieurs documents (l'état est
90
+ porté par les paramètres de ``run()``, pas par le constructeur).
91
+ L'instance est thread-safe en lecture (rien n'est muté après
92
+ construction).
93
+
94
+ Parameters
95
+ ----------
96
+ adapter_resolver:
97
+ Callable qui résout un ``adapter_name`` en instance
98
+ ``StepExecutor``. Typiquement
99
+ ``lambda name: registry[name]`` en test, ou un service
100
+ applicatif qui injecte les bonnes dépendances en prod.
101
+ """
102
+
103
+ def __init__(self, adapter_resolver: AdapterResolver) -> None:
104
+ if not callable(adapter_resolver):
105
+ raise PicaronesError(
106
+ "PipelineExecutor : adapter_resolver doit être callable."
107
+ )
108
+ self._resolver = adapter_resolver
109
+
110
+ def run(
111
+ self,
112
+ spec: PipelineSpec,
113
+ document: DocumentRef,
114
+ initial_inputs: dict[ArtifactType, Artifact],
115
+ context: RunContext,
116
+ ) -> PipelineResult:
117
+ """Exécute une pipeline complète sur un document.
118
+
119
+ Returns
120
+ -------
121
+ PipelineResult
122
+ ``succeeded`` global = True ssi toutes les étapes ont
123
+ réussi. Une étape en échec n'arrête PAS l'exécution —
124
+ les étapes suivantes peuvent quand même tourner si
125
+ leurs entrées ne dépendent pas du step en échec.
126
+
127
+ Raises
128
+ ------
129
+ PipelineSpecInvalid
130
+ Si ``validate_spec`` détecte des erreurs de
131
+ cohérence. L'executor ne masque pas ce type d'erreur :
132
+ c'est un bug de programmation, pas un problème runtime.
133
+ """
134
+ # 1. Validation défensive.
135
+ errors = validate_spec(spec)
136
+ if errors:
137
+ messages = "; ".join(
138
+ f"{e.step_id or '<global>'}: {e.message}" for e in errors
139
+ )
140
+ raise PipelineSpecInvalid(
141
+ f"Spec '{spec.name}' invalide : {messages}"
142
+ )
143
+
144
+ # 2. Bag versionné : map (type, step_id) → Artifact.
145
+ # Plus une map type → step_id "le plus récent" pour le
146
+ # fallback quand inputs_from ne précise pas la source.
147
+ versioned: dict[tuple[ArtifactType, str], Artifact] = {}
148
+ latest_producer: dict[ArtifactType, str] = {}
149
+
150
+ for art_type, art in initial_inputs.items():
151
+ versioned[(art_type, INITIAL_STEP_ID)] = art
152
+ latest_producer[art_type] = INITIAL_STEP_ID
153
+
154
+ # 3. Exécution séquentielle.
155
+ step_results: list[StepResult] = []
156
+ all_artifacts: list[Artifact] = list(initial_inputs.values())
157
+ run_started = time.perf_counter()
158
+
159
+ for step in spec.steps:
160
+ result, produced = self._run_step(
161
+ step=step,
162
+ versioned=versioned,
163
+ latest_producer=latest_producer,
164
+ context=context,
165
+ )
166
+ step_results.append(result)
167
+ for art_type, art in produced.items():
168
+ versioned[(art_type, step.id)] = art
169
+ latest_producer[art_type] = step.id
170
+ all_artifacts.append(art)
171
+
172
+ run_duration = time.perf_counter() - run_started
173
+ succeeded = all(r.succeeded for r in step_results)
174
+
175
+ return PipelineResult(
176
+ pipeline_name=spec.name,
177
+ document_id=document.id,
178
+ step_results=tuple(step_results),
179
+ succeeded=succeeded,
180
+ duration_seconds=run_duration,
181
+ artifacts=tuple(all_artifacts),
182
+ )
183
+
184
+ # ──────────────────────────────────────────────────────────────────
185
+ # Helpers internes
186
+ # ──────────────────────────────────────────────────────────────────
187
+
188
+ def _run_step(
189
+ self,
190
+ *,
191
+ step: PipelineStep,
192
+ versioned: dict[tuple[ArtifactType, str], Artifact],
193
+ latest_producer: dict[ArtifactType, str],
194
+ context: RunContext,
195
+ ) -> tuple[StepResult, dict[ArtifactType, Artifact]]:
196
+ """Exécute une étape, retourne (result, artefacts produits).
197
+
198
+ Le tuple est important : si le step échoue, on retourne quand
199
+ même un dict vide pour les artefacts → le caller peut
200
+ continuer la boucle proprement.
201
+ """
202
+ step_started = time.perf_counter()
203
+
204
+ # 1. Résoudre les inputs depuis le bag.
205
+ try:
206
+ inputs = self._resolve_inputs(
207
+ step=step,
208
+ versioned=versioned,
209
+ latest_producer=latest_producer,
210
+ )
211
+ except _InputResolutionError as exc:
212
+ duration = time.perf_counter() - step_started
213
+ return (
214
+ StepResult(
215
+ step_id=step.id,
216
+ succeeded=False,
217
+ duration_seconds=duration,
218
+ error=str(exc),
219
+ ),
220
+ {},
221
+ )
222
+
223
+ # 2. Résoudre l'adapter.
224
+ try:
225
+ adapter = self._resolver(step.adapter_name)
226
+ except KeyError:
227
+ duration = time.perf_counter() - step_started
228
+ return (
229
+ StepResult(
230
+ step_id=step.id,
231
+ succeeded=False,
232
+ duration_seconds=duration,
233
+ error=f"adapter_not_found: {step.adapter_name}",
234
+ ),
235
+ {},
236
+ )
237
+ except Exception as exc: # noqa: BLE001
238
+ duration = time.perf_counter() - step_started
239
+ return (
240
+ StepResult(
241
+ step_id=step.id,
242
+ succeeded=False,
243
+ duration_seconds=duration,
244
+ error=f"adapter_resolver_failed: {exc}",
245
+ ),
246
+ {},
247
+ )
248
+
249
+ # 3. Exécuter. Toute exception est capturée → step en échec.
250
+ try:
251
+ outputs = adapter.execute(inputs, dict(step.params), context)
252
+ except Exception as exc: # noqa: BLE001
253
+ duration = time.perf_counter() - step_started
254
+ logger.warning(
255
+ "[pipeline:%s] step '%s' a levé : %s",
256
+ context.pipeline_name, step.id, exc,
257
+ )
258
+ return (
259
+ StepResult(
260
+ step_id=step.id,
261
+ succeeded=False,
262
+ duration_seconds=duration,
263
+ error=f"adapter_raised: {type(exc).__name__}: {exc}",
264
+ ),
265
+ {},
266
+ )
267
+
268
+ # 4. Valider les outputs déclarés.
269
+ missing = [
270
+ t for t in step.output_types
271
+ if t not in outputs
272
+ ]
273
+ duration = time.perf_counter() - step_started
274
+ if missing:
275
+ return (
276
+ StepResult(
277
+ step_id=step.id,
278
+ succeeded=False,
279
+ duration_seconds=duration,
280
+ error=(
281
+ "missing_output: "
282
+ f"{[t.value for t in missing]}"
283
+ ),
284
+ ),
285
+ # On garde quand même les outputs qui ont été produits,
286
+ # pour que les éventuels steps en aval puissent les
287
+ # utiliser si la pipeline est résiliente.
288
+ outputs,
289
+ )
290
+
291
+ # 5. Succès.
292
+ produced_map = {
293
+ t.value: a.id for t, a in outputs.items()
294
+ }
295
+ return (
296
+ StepResult(
297
+ step_id=step.id,
298
+ succeeded=True,
299
+ duration_seconds=duration,
300
+ produced_artifacts=produced_map,
301
+ ),
302
+ outputs,
303
+ )
304
+
305
+ def _resolve_inputs(
306
+ self,
307
+ *,
308
+ step: PipelineStep,
309
+ versioned: dict[tuple[ArtifactType, str], Artifact],
310
+ latest_producer: dict[ArtifactType, str],
311
+ ) -> dict[ArtifactType, Artifact]:
312
+ """Construit le dict ``{ArtifactType: Artifact}`` à passer
313
+ à l'adapter, en respectant ``step.inputs_from``.
314
+
315
+ Algorithme :
316
+
317
+ - Pour chaque type dans ``step.input_types`` :
318
+ - si ``step.inputs_from[type]`` est défini : exiger la
319
+ version produite par cette étape, lever sinon ;
320
+ - sinon : prendre la dernière version disponible
321
+ (``latest_producer[type]``), lever si aucune.
322
+ """
323
+ inputs: dict[ArtifactType, Artifact] = {}
324
+ for input_type in step.input_types:
325
+ source_step = step.inputs_from.get(input_type)
326
+ if source_step is None:
327
+ source_step = latest_producer.get(input_type)
328
+ if source_step is None:
329
+ raise _InputResolutionError(
330
+ f"missing_input: {input_type.value} "
331
+ "non disponible dans le bag d'artefacts"
332
+ )
333
+ key = (input_type, source_step)
334
+ if key not in versioned:
335
+ raise _InputResolutionError(
336
+ f"missing_input: {input_type.value}"
337
+ f"@{source_step}"
338
+ )
339
+ inputs[input_type] = versioned[key]
340
+ return inputs
341
+
342
+
343
+ class _InputResolutionError(Exception):
344
+ """Erreur interne signalant qu'un input n'a pas pu être résolu.
345
+
346
+ Capturée par ``_run_step`` qui la traduit en ``StepResult``
347
+ en échec avec ``error="missing_input: ..."``.
348
+ """
349
+
350
+
351
+ __all__ = [
352
+ "AdapterResolver",
353
+ "PipelineExecutor",
354
+ "PipelineSpecInvalid",
355
+ ]
tests/pipeline/test_sprint_a14_s7_artifact_cache.py ADDED
@@ -0,0 +1,151 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Sprint A14-S7 — ``ArtifactCache`` minimal.
2
+
3
+ Vérifie compute_key déterministe, get/put basique, et garde-fou
4
+ "un seul input sans content_hash → pas de clé".
5
+ """
6
+
7
+ from __future__ import annotations
8
+
9
+ from picarones.domain import Artifact, ArtifactType
10
+ from picarones.pipeline import ArtifactCache, PipelineStep
11
+
12
+
13
+ def _hashed_artifact(
14
+ suffix: str, type_: ArtifactType, content_hash: str | None = None,
15
+ ) -> Artifact:
16
+ return Artifact(
17
+ id=f"d1:{suffix}",
18
+ document_id="d1",
19
+ type=type_,
20
+ content_hash=content_hash,
21
+ )
22
+
23
+
24
+ def _ocr_step() -> PipelineStep:
25
+ return PipelineStep(
26
+ id="ocr", kind="ocr", adapter_name="tesseract",
27
+ params={"lang": "fra"},
28
+ input_types=(ArtifactType.IMAGE,),
29
+ output_types=(ArtifactType.RAW_TEXT,),
30
+ )
31
+
32
+
33
+ class TestComputeKey:
34
+ def test_returns_string_when_all_inputs_have_hash(self) -> None:
35
+ cache = ArtifactCache()
36
+ img = _hashed_artifact("img", ArtifactType.IMAGE, "a" * 64)
37
+ key = cache.compute_key(_ocr_step(), {ArtifactType.IMAGE: img}, "1.0.0")
38
+ assert key is not None
39
+ assert len(key) == 64 # SHA-256 hex
40
+
41
+ def test_deterministic(self) -> None:
42
+ cache = ArtifactCache()
43
+ img = _hashed_artifact("img", ArtifactType.IMAGE, "a" * 64)
44
+ k1 = cache.compute_key(_ocr_step(), {ArtifactType.IMAGE: img}, "1.0.0")
45
+ k2 = cache.compute_key(_ocr_step(), {ArtifactType.IMAGE: img}, "1.0.0")
46
+ assert k1 == k2
47
+
48
+ def test_different_content_hash_different_key(self) -> None:
49
+ cache = ArtifactCache()
50
+ img_a = _hashed_artifact("a", ArtifactType.IMAGE, "a" * 64)
51
+ img_b = _hashed_artifact("b", ArtifactType.IMAGE, "b" * 64)
52
+ k_a = cache.compute_key(_ocr_step(), {ArtifactType.IMAGE: img_a}, "1.0.0")
53
+ k_b = cache.compute_key(_ocr_step(), {ArtifactType.IMAGE: img_b}, "1.0.0")
54
+ assert k_a != k_b
55
+
56
+ def test_different_code_version_different_key(self) -> None:
57
+ cache = ArtifactCache()
58
+ img = _hashed_artifact("img", ArtifactType.IMAGE, "a" * 64)
59
+ k1 = cache.compute_key(_ocr_step(), {ArtifactType.IMAGE: img}, "1.0.0")
60
+ k2 = cache.compute_key(_ocr_step(), {ArtifactType.IMAGE: img}, "2.0.0")
61
+ assert k1 != k2
62
+
63
+ def test_different_step_params_different_key(self) -> None:
64
+ cache = ArtifactCache()
65
+ img = _hashed_artifact("img", ArtifactType.IMAGE, "a" * 64)
66
+ step_fra = PipelineStep(
67
+ id="ocr", kind="ocr", adapter_name="tesseract",
68
+ params={"lang": "fra"},
69
+ input_types=(ArtifactType.IMAGE,),
70
+ output_types=(ArtifactType.RAW_TEXT,),
71
+ )
72
+ step_eng = PipelineStep(
73
+ id="ocr", kind="ocr", adapter_name="tesseract",
74
+ params={"lang": "eng"},
75
+ input_types=(ArtifactType.IMAGE,),
76
+ output_types=(ArtifactType.RAW_TEXT,),
77
+ )
78
+ k_fra = cache.compute_key(step_fra, {ArtifactType.IMAGE: img}, "1.0.0")
79
+ k_eng = cache.compute_key(step_eng, {ArtifactType.IMAGE: img}, "1.0.0")
80
+ assert k_fra != k_eng
81
+
82
+ def test_returns_none_when_input_has_no_hash(self) -> None:
83
+ cache = ArtifactCache()
84
+ img = _hashed_artifact("img", ArtifactType.IMAGE, content_hash=None)
85
+ key = cache.compute_key(_ocr_step(), {ArtifactType.IMAGE: img}, "1.0.0")
86
+ assert key is None
87
+
88
+
89
+ class TestGetPutClear:
90
+ def test_get_miss_returns_none(self) -> None:
91
+ cache = ArtifactCache()
92
+ assert cache.get("non_existent") is None
93
+
94
+ def test_put_then_get_returns_outputs(self) -> None:
95
+ cache = ArtifactCache()
96
+ artifacts = {
97
+ ArtifactType.RAW_TEXT: _hashed_artifact(
98
+ "raw", ArtifactType.RAW_TEXT, "f" * 64,
99
+ ),
100
+ }
101
+ cache.put("k1", artifacts)
102
+ cached = cache.get("k1")
103
+ assert cached is not None
104
+ assert ArtifactType.RAW_TEXT in cached
105
+
106
+ def test_put_with_none_key_is_noop(self) -> None:
107
+ cache = ArtifactCache()
108
+ cache.put(None, {ArtifactType.RAW_TEXT: _hashed_artifact(
109
+ "raw", ArtifactType.RAW_TEXT, "f" * 64,
110
+ )})
111
+ assert len(cache) == 0
112
+
113
+ def test_get_with_none_key_returns_none(self) -> None:
114
+ cache = ArtifactCache()
115
+ assert cache.get(None) is None
116
+
117
+ def test_clear(self) -> None:
118
+ cache = ArtifactCache()
119
+ cache.put("k", {ArtifactType.RAW_TEXT: _hashed_artifact(
120
+ "raw", ArtifactType.RAW_TEXT, "f" * 64,
121
+ )})
122
+ assert len(cache) == 1
123
+ cache.clear()
124
+ assert len(cache) == 0
125
+
126
+ def test_contains(self) -> None:
127
+ cache = ArtifactCache()
128
+ cache.put("foo", {})
129
+ assert "foo" in cache
130
+ assert "bar" not in cache
131
+
132
+ def test_keys(self) -> None:
133
+ cache = ArtifactCache()
134
+ cache.put("a", {})
135
+ cache.put("b", {})
136
+ assert sorted(cache.keys()) == ["a", "b"]
137
+
138
+ def test_put_makes_defensive_copy(self) -> None:
139
+ """Modifier le dict d'origine après put() ne doit pas
140
+ affecter le contenu du cache."""
141
+ cache = ArtifactCache()
142
+ artifacts = {
143
+ ArtifactType.RAW_TEXT: _hashed_artifact(
144
+ "raw", ArtifactType.RAW_TEXT, "f" * 64,
145
+ ),
146
+ }
147
+ cache.put("k", artifacts)
148
+ artifacts.clear()
149
+ cached = cache.get("k")
150
+ assert cached is not None
151
+ assert ArtifactType.RAW_TEXT in cached
tests/pipeline/test_sprint_a14_s7_executor.py ADDED
@@ -0,0 +1,465 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Sprint A14-S7 — ``PipelineExecutor`` mono-document.
2
+
3
+ Tous les tests utilisent des stubs ``StepExecutor`` définis dans
4
+ ce fichier — aucun adapter réel n'est instancié, ce qui rend la
5
+ suite rapide et déterministe.
6
+
7
+ Couvre les cas critiques :
8
+
9
+ - pipeline qui réussit complètement,
10
+ - step qui lève → step en échec, pipeline continue,
11
+ - adapter introuvable (KeyError du resolver),
12
+ - output manquant (adapter ne retourne pas un type promis),
13
+ - input manquant (initial_inputs incomplet),
14
+ - fork avec ``inputs_from`` explicite (reprise du Sprint 66),
15
+ - spec invalide → ``PipelineSpecInvalid`` levée,
16
+ - bag versionné : étape qui consomme l'output d'une étape antérieure.
17
+ """
18
+
19
+ from __future__ import annotations
20
+
21
+ import pytest
22
+
23
+ from picarones.domain import (
24
+ Artifact,
25
+ ArtifactType,
26
+ DocumentRef,
27
+ PicaronesError,
28
+ )
29
+ from picarones.pipeline import (
30
+ PipelineExecutor,
31
+ PipelineResult,
32
+ PipelineSpec,
33
+ PipelineSpecInvalid,
34
+ PipelineStep,
35
+ RunContext,
36
+ )
37
+
38
+
39
+ # ──────────────────────────────────────────────────────────────────────
40
+ # Stubs ``StepExecutor``
41
+ # ──────────────────────────────────────────────────────────────────────
42
+
43
+
44
+ class _StubOCR:
45
+ name = "stub_ocr"
46
+ input_types = frozenset({ArtifactType.IMAGE})
47
+ output_types = frozenset({ArtifactType.RAW_TEXT, ArtifactType.ALTO_XML})
48
+ execution_mode = "cpu"
49
+
50
+ def execute(self, inputs, params, context):
51
+ return {
52
+ ArtifactType.RAW_TEXT: Artifact(
53
+ id=f"{context.document_id}:ocr:raw_text",
54
+ document_id=context.document_id,
55
+ type=ArtifactType.RAW_TEXT,
56
+ produced_by_step="ocr",
57
+ ),
58
+ ArtifactType.ALTO_XML: Artifact(
59
+ id=f"{context.document_id}:ocr:alto_xml",
60
+ document_id=context.document_id,
61
+ type=ArtifactType.ALTO_XML,
62
+ produced_by_step="ocr",
63
+ ),
64
+ }
65
+
66
+
67
+ class _StubLLM:
68
+ name = "stub_llm"
69
+ input_types = frozenset({ArtifactType.RAW_TEXT})
70
+ output_types = frozenset({ArtifactType.CORRECTED_TEXT})
71
+ execution_mode = "io"
72
+
73
+ def execute(self, inputs, params, context):
74
+ return {
75
+ ArtifactType.CORRECTED_TEXT: Artifact(
76
+ id=f"{context.document_id}:llm:corrected_text",
77
+ document_id=context.document_id,
78
+ type=ArtifactType.CORRECTED_TEXT,
79
+ produced_by_step="llm",
80
+ ),
81
+ }
82
+
83
+
84
+ class _CrashingStub:
85
+ name = "crashing"
86
+ input_types = frozenset({ArtifactType.RAW_TEXT})
87
+ output_types = frozenset({ArtifactType.CORRECTED_TEXT})
88
+ execution_mode = "cpu"
89
+
90
+ def execute(self, inputs, params, context):
91
+ raise RuntimeError("simulated boom")
92
+
93
+
94
+ class _IncompleteOutputStub:
95
+ """Promet RAW_TEXT mais ne le retourne pas — viole le contrat."""
96
+
97
+ name = "incomplete"
98
+ input_types = frozenset({ArtifactType.IMAGE})
99
+ output_types = frozenset({ArtifactType.RAW_TEXT})
100
+ execution_mode = "cpu"
101
+
102
+ def execute(self, inputs, params, context):
103
+ return {} # vide intentionnellement
104
+
105
+
106
+ class _SecondOCRStub:
107
+ """Second OCR pour tester le fork via inputs_from."""
108
+
109
+ name = "ocr_b"
110
+ input_types = frozenset({ArtifactType.IMAGE})
111
+ output_types = frozenset({ArtifactType.RAW_TEXT})
112
+ execution_mode = "cpu"
113
+
114
+ def execute(self, inputs, params, context):
115
+ return {
116
+ ArtifactType.RAW_TEXT: Artifact(
117
+ id=f"{context.document_id}:ocr_b:raw_text",
118
+ document_id=context.document_id,
119
+ type=ArtifactType.RAW_TEXT,
120
+ produced_by_step="ocr_b",
121
+ ),
122
+ }
123
+
124
+
125
+ # ──────────────────────────────────────────────────────────────────────
126
+ # Fixtures
127
+ # ──────────────────────────────────────────────────────────────────────
128
+
129
+
130
+ @pytest.fixture
131
+ def registry() -> dict[str, object]:
132
+ return {
133
+ "stub_ocr": _StubOCR(),
134
+ "stub_ocr_b": _SecondOCRStub(),
135
+ "stub_llm": _StubLLM(),
136
+ "crashing": _CrashingStub(),
137
+ "incomplete": _IncompleteOutputStub(),
138
+ }
139
+
140
+
141
+ @pytest.fixture
142
+ def executor(registry: dict[str, object]) -> PipelineExecutor:
143
+ return PipelineExecutor(adapter_resolver=lambda name: registry[name])
144
+
145
+
146
+ @pytest.fixture
147
+ def doc() -> DocumentRef:
148
+ return DocumentRef(id="doc1", image_uri="/tmp/x.png")
149
+
150
+
151
+ @pytest.fixture
152
+ def ctx() -> RunContext:
153
+ return RunContext(
154
+ document_id="doc1", code_version="1.0.0", pipeline_name="test",
155
+ )
156
+
157
+
158
+ @pytest.fixture
159
+ def image_artifact() -> Artifact:
160
+ return Artifact(
161
+ id="doc1:image",
162
+ document_id="doc1",
163
+ type=ArtifactType.IMAGE,
164
+ uri="/tmp/x.png",
165
+ )
166
+
167
+
168
+ def _ocr_only_spec() -> PipelineSpec:
169
+ return PipelineSpec(
170
+ name="ocr_only",
171
+ initial_inputs=(ArtifactType.IMAGE,),
172
+ steps=(
173
+ PipelineStep(
174
+ id="ocr", kind="ocr", adapter_name="stub_ocr",
175
+ input_types=(ArtifactType.IMAGE,),
176
+ output_types=(
177
+ ArtifactType.RAW_TEXT, ArtifactType.ALTO_XML,
178
+ ),
179
+ ),
180
+ ),
181
+ )
182
+
183
+
184
+ def _ocr_llm_spec() -> PipelineSpec:
185
+ return PipelineSpec(
186
+ name="ocr_llm",
187
+ initial_inputs=(ArtifactType.IMAGE,),
188
+ steps=(
189
+ PipelineStep(
190
+ id="ocr", kind="ocr", adapter_name="stub_ocr",
191
+ input_types=(ArtifactType.IMAGE,),
192
+ output_types=(
193
+ ArtifactType.RAW_TEXT, ArtifactType.ALTO_XML,
194
+ ),
195
+ ),
196
+ PipelineStep(
197
+ id="llm", kind="post_correction", adapter_name="stub_llm",
198
+ input_types=(ArtifactType.RAW_TEXT,),
199
+ output_types=(ArtifactType.CORRECTED_TEXT,),
200
+ inputs_from={ArtifactType.RAW_TEXT: "ocr"},
201
+ ),
202
+ ),
203
+ )
204
+
205
+
206
+ # ──────────────────────────────────────────────────────────────────────
207
+ # Cas nominaux
208
+ # ──────────────────────────────────────────────────────────────────────
209
+
210
+
211
+ class TestNominalRun:
212
+ def test_single_step_pipeline(
213
+ self, executor, doc, ctx, image_artifact,
214
+ ) -> None:
215
+ spec = _ocr_only_spec()
216
+ result = executor.run(
217
+ spec, doc, {ArtifactType.IMAGE: image_artifact}, ctx,
218
+ )
219
+ assert isinstance(result, PipelineResult)
220
+ assert result.succeeded
221
+ assert result.pipeline_name == "ocr_only"
222
+ assert result.document_id == "doc1"
223
+ assert len(result.step_results) == 1
224
+ assert result.step_results[0].succeeded
225
+ assert result.step_results[0].step_id == "ocr"
226
+
227
+ def test_two_step_pipeline_chains_artifacts(
228
+ self, executor, doc, ctx, image_artifact,
229
+ ) -> None:
230
+ spec = _ocr_llm_spec()
231
+ result = executor.run(
232
+ spec, doc, {ArtifactType.IMAGE: image_artifact}, ctx,
233
+ )
234
+ assert result.succeeded
235
+ # Tous les artefacts sont là : initial + 2 OCR + 1 LLM = 4
236
+ assert len(result.artifacts) == 4
237
+ types = {a.type for a in result.artifacts}
238
+ assert ArtifactType.IMAGE in types
239
+ assert ArtifactType.RAW_TEXT in types
240
+ assert ArtifactType.ALTO_XML in types
241
+ assert ArtifactType.CORRECTED_TEXT in types
242
+
243
+ def test_step_results_record_produced_artifacts(
244
+ self, executor, doc, ctx, image_artifact,
245
+ ) -> None:
246
+ result = executor.run(
247
+ _ocr_llm_spec(), doc,
248
+ {ArtifactType.IMAGE: image_artifact}, ctx,
249
+ )
250
+ ocr_result = result.step_result_by_id("ocr")
251
+ assert ocr_result is not None
252
+ assert "raw_text" in ocr_result.produced_artifacts
253
+ assert "alto_xml" in ocr_result.produced_artifacts
254
+
255
+
256
+ # ──────────────────────────────────────────────────────────────────────
257
+ # Cas d'erreur — capture gracieuse
258
+ # ──────────────────────────────────────────────────────────────────────
259
+
260
+
261
+ class TestErrorCapture:
262
+ def test_step_that_raises_marks_step_failed(
263
+ self, executor, doc, ctx, image_artifact,
264
+ ) -> None:
265
+ """Un step qui lève → step en échec, pipeline continue."""
266
+ spec = PipelineSpec(
267
+ name="ocr_then_crash",
268
+ initial_inputs=(ArtifactType.IMAGE,),
269
+ steps=(
270
+ PipelineStep(
271
+ id="ocr", kind="ocr", adapter_name="stub_ocr",
272
+ input_types=(ArtifactType.IMAGE,),
273
+ output_types=(
274
+ ArtifactType.RAW_TEXT, ArtifactType.ALTO_XML,
275
+ ),
276
+ ),
277
+ PipelineStep(
278
+ id="boom", kind="post_correction",
279
+ adapter_name="crashing",
280
+ input_types=(ArtifactType.RAW_TEXT,),
281
+ output_types=(ArtifactType.CORRECTED_TEXT,),
282
+ ),
283
+ ),
284
+ )
285
+ result = executor.run(
286
+ spec, doc, {ArtifactType.IMAGE: image_artifact}, ctx,
287
+ )
288
+ assert not result.succeeded
289
+ assert result.step_results[0].succeeded
290
+ assert not result.step_results[1].succeeded
291
+ assert "adapter_raised" in (result.step_results[1].error or "")
292
+ assert "simulated boom" in (result.step_results[1].error or "")
293
+
294
+ def test_unknown_adapter_yields_step_failure(
295
+ self, executor, doc, ctx, image_artifact,
296
+ ) -> None:
297
+ spec = PipelineSpec(
298
+ name="bad_adapter",
299
+ initial_inputs=(ArtifactType.IMAGE,),
300
+ steps=(
301
+ PipelineStep(
302
+ id="ocr", kind="ocr", adapter_name="not_in_registry",
303
+ input_types=(ArtifactType.IMAGE,),
304
+ output_types=(ArtifactType.RAW_TEXT,),
305
+ ),
306
+ ),
307
+ )
308
+ result = executor.run(
309
+ spec, doc, {ArtifactType.IMAGE: image_artifact}, ctx,
310
+ )
311
+ assert not result.succeeded
312
+ assert "adapter_not_found" in (result.step_results[0].error or "")
313
+
314
+ def test_adapter_returns_missing_output(
315
+ self, executor, doc, ctx, image_artifact,
316
+ ) -> None:
317
+ spec = PipelineSpec(
318
+ name="incomplete",
319
+ initial_inputs=(ArtifactType.IMAGE,),
320
+ steps=(
321
+ PipelineStep(
322
+ id="bad", kind="ocr", adapter_name="incomplete",
323
+ input_types=(ArtifactType.IMAGE,),
324
+ output_types=(ArtifactType.RAW_TEXT,),
325
+ ),
326
+ ),
327
+ )
328
+ result = executor.run(
329
+ spec, doc, {ArtifactType.IMAGE: image_artifact}, ctx,
330
+ )
331
+ assert not result.succeeded
332
+ assert "missing_output" in (result.step_results[0].error or "")
333
+
334
+ def test_initial_inputs_missing_blocks_first_step(
335
+ self, executor, doc, ctx,
336
+ ) -> None:
337
+ """Si initial_inputs ne fournit pas IMAGE alors qu'un step en
338
+ a besoin, le step échoue avec missing_input."""
339
+ # On garde la spec valide (initial_inputs déclare IMAGE) mais
340
+ # le caller "oublie" de fournir l'artefact → résolution
341
+ # d'inputs échoue au runtime.
342
+ spec = _ocr_only_spec()
343
+ result = executor.run(spec, doc, {}, ctx) # vide
344
+ assert not result.succeeded
345
+ assert "missing_input" in (result.step_results[0].error or "")
346
+
347
+
348
+ # ──────────────────────────────────────────────────────────────────────
349
+ # Bag versionné — fork via ``inputs_from`` (Sprint 66 historique)
350
+ # ──────────────────────────────────────────────────────────────────────
351
+
352
+
353
+ class TestBagVersionedFork:
354
+ def test_inputs_from_explicit_picks_correct_version(
355
+ self, executor, doc, ctx, image_artifact,
356
+ ) -> None:
357
+ """Deux OCR successifs produisent RAW_TEXT. L'étape LLM
358
+ précise ``inputs_from = "ocr_a"`` et doit consommer la
359
+ version A, pas la dernière (B)."""
360
+ spec = PipelineSpec(
361
+ name="fork",
362
+ initial_inputs=(ArtifactType.IMAGE,),
363
+ steps=(
364
+ PipelineStep(
365
+ id="ocr_a", kind="ocr", adapter_name="stub_ocr",
366
+ input_types=(ArtifactType.IMAGE,),
367
+ output_types=(
368
+ ArtifactType.RAW_TEXT, ArtifactType.ALTO_XML,
369
+ ),
370
+ ),
371
+ PipelineStep(
372
+ id="ocr_b", kind="ocr", adapter_name="stub_ocr_b",
373
+ input_types=(ArtifactType.IMAGE,),
374
+ output_types=(ArtifactType.RAW_TEXT,),
375
+ ),
376
+ PipelineStep(
377
+ id="llm", kind="post_correction",
378
+ adapter_name="stub_llm",
379
+ input_types=(ArtifactType.RAW_TEXT,),
380
+ output_types=(ArtifactType.CORRECTED_TEXT,),
381
+ inputs_from={ArtifactType.RAW_TEXT: "ocr_a"},
382
+ ),
383
+ ),
384
+ )
385
+ result = executor.run(
386
+ spec, doc, {ArtifactType.IMAGE: image_artifact}, ctx,
387
+ )
388
+ assert result.succeeded
389
+ # 1 image initiale + 2 (ocr_a) + 1 (ocr_b) + 1 (llm) = 5
390
+ assert len(result.artifacts) == 5
391
+
392
+ def test_default_picks_latest_when_no_inputs_from(
393
+ self, executor, doc, ctx, image_artifact,
394
+ ) -> None:
395
+ """Sans ``inputs_from``, le LLM consomme le dernier RAW_TEXT,
396
+ donc ``ocr_b`` (dernière étape qui a produit le type)."""
397
+ spec = PipelineSpec(
398
+ name="latest",
399
+ initial_inputs=(ArtifactType.IMAGE,),
400
+ steps=(
401
+ PipelineStep(
402
+ id="ocr_a", kind="ocr", adapter_name="stub_ocr",
403
+ input_types=(ArtifactType.IMAGE,),
404
+ output_types=(
405
+ ArtifactType.RAW_TEXT, ArtifactType.ALTO_XML,
406
+ ),
407
+ ),
408
+ PipelineStep(
409
+ id="ocr_b", kind="ocr", adapter_name="stub_ocr_b",
410
+ input_types=(ArtifactType.IMAGE,),
411
+ output_types=(ArtifactType.RAW_TEXT,),
412
+ ),
413
+ PipelineStep(
414
+ id="llm", kind="post_correction",
415
+ adapter_name="stub_llm",
416
+ input_types=(ArtifactType.RAW_TEXT,),
417
+ output_types=(ArtifactType.CORRECTED_TEXT,),
418
+ # pas d'inputs_from
419
+ ),
420
+ ),
421
+ )
422
+ result = executor.run(
423
+ spec, doc, {ArtifactType.IMAGE: image_artifact}, ctx,
424
+ )
425
+ assert result.succeeded
426
+
427
+
428
+ # ──────────────────────────────────────────────────────────────────────
429
+ # Validation défensive
430
+ # ──────────────────────────────────────────────────────────────────────
431
+
432
+
433
+ class TestDefensiveValidation:
434
+ def test_invalid_spec_raises(
435
+ self, executor, doc, ctx, image_artifact,
436
+ ) -> None:
437
+ """Spec avec ID dupliqué — l'executor lève sans appeler
438
+ aucun adapter."""
439
+ spec = PipelineSpec(
440
+ name="dup",
441
+ initial_inputs=(ArtifactType.IMAGE,),
442
+ steps=(
443
+ PipelineStep(
444
+ id="step", kind="ocr", adapter_name="stub_ocr",
445
+ input_types=(ArtifactType.IMAGE,),
446
+ output_types=(
447
+ ArtifactType.RAW_TEXT, ArtifactType.ALTO_XML,
448
+ ),
449
+ ),
450
+ PipelineStep(
451
+ id="step", kind="post_correction",
452
+ adapter_name="stub_llm",
453
+ input_types=(ArtifactType.RAW_TEXT,),
454
+ output_types=(ArtifactType.CORRECTED_TEXT,),
455
+ ),
456
+ ),
457
+ )
458
+ with pytest.raises(PipelineSpecInvalid, match="dupliqué"):
459
+ executor.run(
460
+ spec, doc, {ArtifactType.IMAGE: image_artifact}, ctx,
461
+ )
462
+
463
+ def test_non_callable_resolver_rejected(self) -> None:
464
+ with pytest.raises(PicaronesError, match="callable"):
465
+ PipelineExecutor(adapter_resolver="not_callable") # type: ignore[arg-type]
tests/pipeline/test_sprint_a14_s7_timing.py ADDED
@@ -0,0 +1,188 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Sprint A14-S7 — mesure de temps par étape.
2
+
3
+ Vérifie que ``StepResult.duration_seconds`` reflète le temps réel
4
+ d'exécution de l'adapter (pas zéro, pas négatif), et que la durée
5
+ totale est cohérente avec la somme des étapes.
6
+
7
+ Définition de done : pipeline mock en moins de 100 ms.
8
+ """
9
+
10
+ from __future__ import annotations
11
+
12
+ import time
13
+
14
+ import pytest
15
+
16
+ from picarones.domain import Artifact, ArtifactType, DocumentRef
17
+ from picarones.pipeline import (
18
+ PipelineExecutor,
19
+ PipelineSpec,
20
+ PipelineStep,
21
+ RunContext,
22
+ )
23
+
24
+
25
+ class _SlowStub:
26
+ """Adapter qui dort un certain temps avant de retourner."""
27
+
28
+ def __init__(self, sleep_seconds: float) -> None:
29
+ self._sleep = sleep_seconds
30
+
31
+ name = "slow"
32
+ input_types = frozenset({ArtifactType.IMAGE})
33
+ output_types = frozenset({ArtifactType.RAW_TEXT})
34
+ execution_mode = "cpu"
35
+
36
+ def execute(self, inputs, params, context):
37
+ time.sleep(self._sleep)
38
+ return {
39
+ ArtifactType.RAW_TEXT: Artifact(
40
+ id=f"{context.document_id}:slow:raw_text",
41
+ document_id=context.document_id,
42
+ type=ArtifactType.RAW_TEXT,
43
+ produced_by_step="slow",
44
+ ),
45
+ }
46
+
47
+
48
+ class _InstantStub:
49
+ name = "instant"
50
+ input_types = frozenset({ArtifactType.RAW_TEXT})
51
+ output_types = frozenset({ArtifactType.CORRECTED_TEXT})
52
+ execution_mode = "io"
53
+
54
+ def execute(self, inputs, params, context):
55
+ return {
56
+ ArtifactType.CORRECTED_TEXT: Artifact(
57
+ id=f"{context.document_id}:instant:corrected",
58
+ document_id=context.document_id,
59
+ type=ArtifactType.CORRECTED_TEXT,
60
+ produced_by_step="instant",
61
+ ),
62
+ }
63
+
64
+
65
+ @pytest.fixture
66
+ def doc() -> DocumentRef:
67
+ return DocumentRef(id="d1", image_uri="/tmp/x.png")
68
+
69
+
70
+ @pytest.fixture
71
+ def ctx() -> RunContext:
72
+ return RunContext(
73
+ document_id="d1", code_version="1.0.0", pipeline_name="timing",
74
+ )
75
+
76
+
77
+ @pytest.fixture
78
+ def image_artifact() -> Artifact:
79
+ return Artifact(
80
+ id="d1:image", document_id="d1", type=ArtifactType.IMAGE,
81
+ uri="/tmp/x.png",
82
+ )
83
+
84
+
85
+ def _spec_two_steps() -> PipelineSpec:
86
+ return PipelineSpec(
87
+ name="timing",
88
+ initial_inputs=(ArtifactType.IMAGE,),
89
+ steps=(
90
+ PipelineStep(
91
+ id="slow", kind="ocr", adapter_name="slow",
92
+ input_types=(ArtifactType.IMAGE,),
93
+ output_types=(ArtifactType.RAW_TEXT,),
94
+ ),
95
+ PipelineStep(
96
+ id="instant", kind="post_correction",
97
+ adapter_name="instant",
98
+ input_types=(ArtifactType.RAW_TEXT,),
99
+ output_types=(ArtifactType.CORRECTED_TEXT,),
100
+ inputs_from={ArtifactType.RAW_TEXT: "slow"},
101
+ ),
102
+ ),
103
+ )
104
+
105
+
106
+ class TestExecutorTiming:
107
+ def test_step_duration_reflects_sleep(
108
+ self, doc, ctx, image_artifact,
109
+ ) -> None:
110
+ registry = {"slow": _SlowStub(0.05), "instant": _InstantStub()}
111
+ executor = PipelineExecutor(adapter_resolver=lambda n: registry[n])
112
+
113
+ result = executor.run(
114
+ _spec_two_steps(), doc,
115
+ {ArtifactType.IMAGE: image_artifact}, ctx,
116
+ )
117
+ assert result.succeeded
118
+ slow_dur = result.step_result_by_id("slow").duration_seconds # type: ignore[union-attr]
119
+ # Marges larges pour absorber le bruit OS.
120
+ assert 0.04 < slow_dur < 0.5
121
+
122
+ def test_total_duration_at_least_sum_of_steps(
123
+ self, doc, ctx, image_artifact,
124
+ ) -> None:
125
+ registry = {"slow": _SlowStub(0.02), "instant": _InstantStub()}
126
+ executor = PipelineExecutor(adapter_resolver=lambda n: registry[n])
127
+
128
+ result = executor.run(
129
+ _spec_two_steps(), doc,
130
+ {ArtifactType.IMAGE: image_artifact}, ctx,
131
+ )
132
+ sum_steps = sum(r.duration_seconds for r in result.step_results)
133
+ # Le total inclut l'overhead orchestration → légèrement >.
134
+ assert result.duration_seconds >= sum_steps - 0.01
135
+ # Marge raisonnable pour ne pas exploser à cause du timing.
136
+ assert result.duration_seconds < sum_steps + 0.5
137
+
138
+ def test_duration_is_non_negative_even_on_failure(
139
+ self, doc, ctx, image_artifact,
140
+ ) -> None:
141
+ class _Crasher:
142
+ name = "crash"
143
+ input_types = frozenset({ArtifactType.IMAGE})
144
+ output_types = frozenset({ArtifactType.RAW_TEXT})
145
+ execution_mode = "cpu"
146
+
147
+ def execute(self, *a, **kw):
148
+ raise RuntimeError("boom")
149
+
150
+ registry = {"crash": _Crasher()}
151
+ executor = PipelineExecutor(adapter_resolver=lambda n: registry[n])
152
+ spec = PipelineSpec(
153
+ name="crashing",
154
+ initial_inputs=(ArtifactType.IMAGE,),
155
+ steps=(
156
+ PipelineStep(
157
+ id="bad", kind="ocr", adapter_name="crash",
158
+ input_types=(ArtifactType.IMAGE,),
159
+ output_types=(ArtifactType.RAW_TEXT,),
160
+ ),
161
+ ),
162
+ )
163
+ result = executor.run(
164
+ spec, doc, {ArtifactType.IMAGE: image_artifact}, ctx,
165
+ )
166
+ assert not result.succeeded
167
+ assert result.step_results[0].duration_seconds >= 0.0
168
+
169
+ def test_def_of_done_under_100ms(
170
+ self, doc, ctx, image_artifact,
171
+ ) -> None:
172
+ """Définition de done du S7 : pipeline mock en < 100ms."""
173
+ registry = {
174
+ "slow": _SlowStub(0.0), # pas de sleep
175
+ "instant": _InstantStub(),
176
+ }
177
+ executor = PipelineExecutor(adapter_resolver=lambda n: registry[n])
178
+
179
+ t0 = time.perf_counter()
180
+ result = executor.run(
181
+ _spec_two_steps(), doc,
182
+ {ArtifactType.IMAGE: image_artifact}, ctx,
183
+ )
184
+ elapsed = time.perf_counter() - t0
185
+
186
+ assert result.succeeded
187
+ # Marge généreuse pour la CI : 100ms est largement atteignable.
188
+ assert elapsed < 0.1, f"trop lent : {elapsed * 1000:.2f}ms"