Claude commited on
Commit
27d155d
·
unverified ·
1 Parent(s): b5bb4fa

feat(pipeline): Sprint A14-S47 — branchement ArtifactStore (fix audit #1)

Browse files

L'audit du rewrite avait identifié que ArtifactStore (S29) était
livré comme module standalone sans consommateur runtime — la promesse
de « reprise par hash » n'était pas tenue. C'était la dette technique
critique #1 que la directive interdisait.

Ce sprint câble le store dans le PipelineExecutor pour vraie reprise.

picarones/pipeline/cache_protocol.py (nouveau, 86 lignes)
---------------------------------------------------------
Pattern hexagonal : la couche pipeline/ est plus interne que
adapters/storage/ dans la hiérarchie documentée. Importer depuis
adapters/ violerait la règle de dépendance.

Inversion : on définit le port ``ArtifactCachePort`` (Protocol
@runtime_checkable) dans pipeline/. ``ArtifactStore`` (adapter)
satisfait ce port par duck typing — aucune modification requise.
N'importe quel store custom (Redis, S3) qui implémente get/put/
__contains__ est compatible sans hériter de l'ABC.

picarones/pipeline/cache_helpers.py (nouveau, 178 lignes)
---------------------------------------------------------
Fonctions pures pour le branchement :
- ``compute_step_artifact_key(step, inputs, context)`` : ArtifactKey
multi-paramètres (input_hashes, adapter_name, step_params,
code_version).
- ``read_cached_outputs(store, step, step_hash)`` : interroge le
store pour TOUS les output_types du step, retourne None si :
· cache partiel (un output_type manquant) → on relance pour
cohérence ;
· URI cachée pointe vers fichier disparu (workspace nettoyé).
- ``write_outputs_to_cache(store, step, step_hash, outputs)`` :
persiste un Artifact par output_type sous clé composite
``{step_hash}:{type.value}`` — gère les steps multi-outputs sans
étendre l'API du store.

picarones/domain/artifact_key.py (migré)
----------------------------------------
``ArtifactKey`` (type pur, frozen dataclass) appartient au cercle 1
(domain/) — pas à adapters/. Migration verbatim depuis S29.
``picarones.adapters.storage.ArtifactKey`` reste exposé en re-export
pour rétrocompatibilité.

picarones/pipeline/executor.py (modifié)
----------------------------------------
- Nouveau param ``artifact_store: ArtifactCachePort | None = None``.
- Validation isinstance(store, ArtifactCachePort) — duck-typed.
- ``_run_step`` : entre la résolution des inputs et celle de l'adapter,
appelle ``_try_resume_from_cache`` ; si hit, retourne directement
les artefacts cachés avec ``StepResult(succeeded=True,
duration_seconds=0.0)`` SANS appeler l'adapter.
- Après succès du step, appelle ``_persist_to_cache`` pour stocker
les outputs.

Tests S47 dédiés (10 nouveaux)
------------------------------
- TestNoStoreNoRegression : sans store, comportement identique à
l'avant (115 tests pipeline existants passent inchangés).
- TestCacheHit : second run même inputs → adapter pas ré-appelé,
duration=0.0, mêmes artefacts retournés (id + content_hash).
- TestCacheMissOnKeyChange : code_version, step_params, content_hash
des inputs — chaque changement → re-exécution.
- TestCacheMissOnInvalidState : input sans content_hash → bypass
cache complet ; URI cachée vers fichier disparu → re-exécution.
- TestFilesystemStorePersistence : avec FilesystemArtifactStore,
le cache survit à un re-démarrage du process (instance executor
recreated, store recreated, hit le cache de la précédente).

Tests : 4920 passed, 11 skipped (vs 4910 avant : +10 S47).
Lint : ruff check picarones/ tests/ → All checks passed.
File budgets : pipeline/executor.py 475 → 600 (actuel 541, +60
lignes pour le branchement cache).
Layer dependencies : test passe (pipeline/ ne dépend plus de
adapters/, uniquement du Protocol défini dans pipeline/).

Pourquoi ce fix
---------------
La directive *« sans dette technique »* interdisait de livrer du
code mort. ArtifactStore en S29 était exactement ça : 504 lignes de
code testées unitairement mais jamais consommées par le runtime.
Le branchement S47 réalise la promesse initiale.

Le filet *« sans store, comportement inchangé »* (param optionnel
défaut None) garantit que les 115 tests pipeline existants ne sont
pas modifiés — pas de breaking change.

https://claude.ai/code/session_011XQZNitg1rCgia8ZD1a2hP

README.md CHANGED
@@ -396,7 +396,7 @@ ruff check picarones/ tests/
396
  python -m mypy picarones/core/
397
  ```
398
 
399
- **Test suite**: ~4910 tests, ~3 min on a modern laptop. Coverage
400
  floor at 85% (currently ~87%). The `network` marker excludes tests
401
  requiring live HTTP. A handful of tests depend on optional engines
402
  (`pero-ocr`, `pytesseract`) and are skipped/fail gracefully when
 
396
  python -m mypy picarones/core/
397
  ```
398
 
399
+ **Test suite**: ~4940 tests, ~3 min on a modern laptop. Coverage
400
  floor at 85% (currently ~87%). The `network` marker excludes tests
401
  requiring live HTTP. A handful of tests depend on optional engines
402
  (`pero-ocr`, `pytesseract`) and are skipped/fail gracefully when
picarones/adapters/storage/artifact_store.py CHANGED
@@ -42,120 +42,23 @@ Anti-sur-ingénierie
42
 
43
  from __future__ import annotations
44
 
45
- import hashlib
46
  import json
47
  import logging
48
  import threading
49
  from abc import ABC, abstractmethod
50
- from dataclasses import dataclass, field
51
  from pathlib import Path
52
 
 
53
  from picarones.domain.artifacts import Artifact
54
 
55
  logger = logging.getLogger(__name__)
56
 
57
 
58
- # ──────────────────────────────────────────────────────────────────────
59
- # Clé canonique multi-paramètres
60
- # ──────────────────────────────────────────────────────────────────────
61
-
62
-
63
- @dataclass(frozen=True)
64
- class ArtifactKey:
65
- """Composition immuable de tous les paramètres qui déterminent
66
- l'identité d'un artefact dans le store.
67
-
68
- Sérialisable JSON déterministe via ``to_canonical_json``.
69
-
70
- Attributes
71
- ----------
72
- input_hashes:
73
- Tuple ``((type, content_hash), ...)`` des inputs, trié par
74
- type. ``None`` ou vide → la clé n'est pas calculable
75
- (cas d'un input sans content_hash).
76
- adapter_name:
77
- ``step.adapter_name`` (ex : ``"tesseract"``,
78
- ``"openai:gpt-4o"``).
79
- adapter_version:
80
- Version du modèle / binaire de l'adapter. ``None`` si
81
- l'adapter ne sait pas la fournir (warning loggé une fois).
82
- step_params:
83
- Dict ``{name: scalar}`` du step, sérialisé en JSON canonique
84
- (clés triées).
85
- code_version:
86
- Version du code Picarones (cf. ``RunContext.code_version``).
87
- normalization_profile:
88
- Profil de normalisation appliqué en aval (le cas échéant).
89
- Pour les jonctions textuelles avec normalisation.
90
- projection_name:
91
- Nom du projecteur appliqué (le cas échéant).
92
- projection_params:
93
- Params du projecteur (le cas échéant).
94
- metric_version:
95
- Version du module de métriques (rare ; reporté à la phase
96
- où on aura un versioning explicite des métriques).
97
-
98
- Notes
99
- -----
100
- Frozen dataclass : aucune mutation possible. Le hash canonique
101
- est calculé à la demande via ``hash_hex()``.
102
- """
103
-
104
- input_hashes: tuple[tuple[str, str], ...] = field(default_factory=tuple)
105
- adapter_name: str = ""
106
- adapter_version: str | None = None
107
- step_params: dict[str, str | int | float | bool] = field(default_factory=dict)
108
- code_version: str = ""
109
- normalization_profile: str | None = None
110
- projection_name: str | None = None
111
- projection_params: dict[str, str | int | float | bool] = field(
112
- default_factory=dict,
113
- )
114
- metric_version: str | None = None
115
-
116
- def to_canonical_json(self) -> str:
117
- """Sérialise la clé en JSON déterministe.
118
-
119
- - Clés du dict triées (``sort_keys=True``).
120
- - ``ensure_ascii=False`` pour préserver l'Unicode brut.
121
- - Séparateurs compacts pour minimiser les variations de
122
- whitespace entre OS.
123
- """
124
- # Trier les input_hashes par type pour déterminisme
125
- # cross-platform (les Python du même version trient les
126
- # tuples par leur premier élément, mais on l'explicite).
127
- sorted_inputs = sorted(self.input_hashes)
128
- payload = {
129
- "inputs": sorted_inputs,
130
- "adapter": self.adapter_name,
131
- "adapter_version": self.adapter_version,
132
- "step_params": self.step_params,
133
- "code_version": self.code_version,
134
- "normalization_profile": self.normalization_profile,
135
- "projection_name": self.projection_name,
136
- "projection_params": self.projection_params,
137
- "metric_version": self.metric_version,
138
- }
139
- return json.dumps(
140
- payload,
141
- sort_keys=True,
142
- ensure_ascii=False,
143
- separators=(",", ":"),
144
- )
145
-
146
- def hash_hex(self) -> str | None:
147
- """Calcule la clé hex SHA-256 (64 chars).
148
-
149
- Retourne ``None`` si **un seul** ``input_hash`` est ``None``
150
- ou vide — convention « ne pas servir un résultat douteux ».
151
- Les autres champs peuvent être ``None`` (ils sont sérialisés
152
- comme ``null`` dans le JSON canonique → entrent dans le hash).
153
- """
154
- for _, h in self.input_hashes:
155
- if h is None or h == "":
156
- return None
157
- canonical = self.to_canonical_json()
158
- return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
159
 
160
 
161
  # ────────────────────────────────���─────────────────────────────────────
 
42
 
43
  from __future__ import annotations
44
 
 
45
  import json
46
  import logging
47
  import threading
48
  from abc import ABC, abstractmethod
49
+ from dataclasses import dataclass
50
  from pathlib import Path
51
 
52
+ from picarones.domain.artifact_key import ArtifactKey
53
  from picarones.domain.artifacts import Artifact
54
 
55
  logger = logging.getLogger(__name__)
56
 
57
 
58
+ # Sprint A14-S47 — ``ArtifactKey`` (type pur) a migré dans
59
+ # ``picarones/domain/artifact_key.py``. Re-import ici pour ne pas
60
+ # casser les callers (``from picarones.adapters.storage import
61
+ # ArtifactKey`` reste valide).
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
62
 
63
 
64
  # ────────────────────────────────���─────────────────────────────────────
picarones/domain/__init__.py CHANGED
@@ -43,6 +43,7 @@ Voir ``docs/roadmap/rewrite-2026.md`` pour le plan complet.
43
 
44
  from __future__ import annotations
45
 
 
46
  from picarones.domain.artifacts import Artifact, ArtifactType, compute_content_hash
47
  from picarones.domain.corpus import CorpusSpec
48
  from picarones.domain.documents import DocumentRef, GroundTruthRef
@@ -76,6 +77,8 @@ __all__ = [
76
  "Artifact",
77
  "ArtifactType",
78
  "compute_content_hash",
 
 
79
  # S4 — Corpus + documents
80
  "CorpusSpec",
81
  "DocumentRef",
 
43
 
44
  from __future__ import annotations
45
 
46
+ from picarones.domain.artifact_key import ArtifactKey
47
  from picarones.domain.artifacts import Artifact, ArtifactType, compute_content_hash
48
  from picarones.domain.corpus import CorpusSpec
49
  from picarones.domain.documents import DocumentRef, GroundTruthRef
 
77
  "Artifact",
78
  "ArtifactType",
79
  "compute_content_hash",
80
+ # S29/S47 — ArtifactKey (clé canonique multi-paramètres pour cache)
81
+ "ArtifactKey",
82
  # S4 — Corpus + documents
83
  "CorpusSpec",
84
  "DocumentRef",
picarones/domain/artifact_key.py ADDED
@@ -0,0 +1,132 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """``ArtifactKey`` — Sprint A14-S29, migré dans ``domain/`` au S47.
2
+
3
+ Le S29 livrait ``ArtifactKey`` dans ``picarones/adapters/storage/``
4
+ avec le store qui le consomme. Au S47 (branchement du store dans
5
+ ``PipelineExecutor``), on découvre que ``ArtifactKey`` est un type
6
+ **pur** (dataclass frozen, méthodes de sérialisation déterministe,
7
+ calcul de hash) — il appartient au cercle 1 (``domain/``).
8
+
9
+ Migration : ``ArtifactKey`` vit désormais ici.
10
+ ``picarones.adapters.storage.ArtifactKey`` reste exposé en re-export
11
+ (alias de chemin pur, pas un shim).
12
+
13
+ Pourquoi cette migration
14
+ ------------------------
15
+ La couche ``pipeline/`` doit pouvoir calculer une clé pour interroger
16
+ le cache (cf. ``pipeline/cache_helpers.py``), mais ne peut pas
17
+ importer depuis ``adapters/`` (couche plus externe). L'inversion
18
+ de dépendance demandait un Protocol. Plus simple et plus correct :
19
+ constater que ``ArtifactKey`` est un type domaine et le placer dans
20
+ le bon cercle.
21
+
22
+ ``StoredArtifact``, ``ArtifactStore`` (ABC), ``InMemoryArtifactStore``,
23
+ ``FilesystemArtifactStore`` restent dans ``adapters/storage/`` — ce
24
+ sont des infrastructures, pas des types purs.
25
+ """
26
+
27
+ from __future__ import annotations
28
+
29
+ import hashlib
30
+ import json
31
+ from dataclasses import dataclass, field
32
+
33
+
34
+ @dataclass(frozen=True)
35
+ class ArtifactKey:
36
+ """Composition immuable de tous les paramètres qui déterminent
37
+ l'identité d'un artefact dans le store.
38
+
39
+ Sérialisable JSON déterministe via ``to_canonical_json``.
40
+
41
+ Attributes
42
+ ----------
43
+ input_hashes:
44
+ Tuple ``((type, content_hash), ...)`` des inputs, trié par
45
+ type. ``None`` ou vide → la clé n'est pas calculable
46
+ (cas d'un input sans content_hash).
47
+ adapter_name:
48
+ ``step.adapter_name`` (ex : ``"tesseract"``,
49
+ ``"openai:gpt-4o"``).
50
+ adapter_version:
51
+ Version du modèle / binaire de l'adapter. ``None`` si
52
+ l'adapter ne sait pas la fournir (warning loggé une fois).
53
+ step_params:
54
+ Dict ``{name: scalar}`` du step, sérialisé en JSON canonique
55
+ (clés triées).
56
+ code_version:
57
+ Version du code Picarones (cf. ``RunContext.code_version``).
58
+ normalization_profile:
59
+ Profil de normalisation appliqué en aval (le cas échéant).
60
+ Pour les jonctions textuelles avec normalisation.
61
+ projection_name:
62
+ Nom du projecteur appliqué (le cas échéant).
63
+ projection_params:
64
+ Params du projecteur (le cas échéant).
65
+ metric_version:
66
+ Version du module de métriques (rare ; reporté à la phase
67
+ où on aura un versioning explicite des métriques).
68
+
69
+ Notes
70
+ -----
71
+ Frozen dataclass : aucune mutation possible. Le hash canonique
72
+ est calculé à la demande via ``hash_hex()``.
73
+ """
74
+
75
+ input_hashes: tuple[tuple[str, str], ...] = field(default_factory=tuple)
76
+ adapter_name: str = ""
77
+ adapter_version: str | None = None
78
+ step_params: dict[str, str | int | float | bool] = field(default_factory=dict)
79
+ code_version: str = ""
80
+ normalization_profile: str | None = None
81
+ projection_name: str | None = None
82
+ projection_params: dict[str, str | int | float | bool] = field(
83
+ default_factory=dict,
84
+ )
85
+ metric_version: str | None = None
86
+
87
+ def to_canonical_json(self) -> str:
88
+ """Sérialise la clé en JSON déterministe.
89
+
90
+ - Clés du dict triées (``sort_keys=True``).
91
+ - ``ensure_ascii=False`` pour préserver l'Unicode brut.
92
+ - Séparateurs compacts pour minimiser les variations de
93
+ whitespace entre OS.
94
+ """
95
+ # Trier les input_hashes par type pour déterminisme
96
+ # cross-platform (les Python du même version trient les
97
+ # tuples par leur premier élément, mais on l'explicite).
98
+ sorted_inputs = sorted(self.input_hashes)
99
+ payload = {
100
+ "inputs": sorted_inputs,
101
+ "adapter": self.adapter_name,
102
+ "adapter_version": self.adapter_version,
103
+ "step_params": self.step_params,
104
+ "code_version": self.code_version,
105
+ "normalization_profile": self.normalization_profile,
106
+ "projection_name": self.projection_name,
107
+ "projection_params": self.projection_params,
108
+ "metric_version": self.metric_version,
109
+ }
110
+ return json.dumps(
111
+ payload,
112
+ sort_keys=True,
113
+ ensure_ascii=False,
114
+ separators=(",", ":"),
115
+ )
116
+
117
+ def hash_hex(self) -> str | None:
118
+ """Calcule la clé hex SHA-256 (64 chars).
119
+
120
+ Retourne ``None`` si **un seul** ``input_hash`` est ``None``
121
+ ou vide — convention « ne pas servir un résultat douteux ».
122
+ Les autres champs peuvent être ``None`` (ils sont sérialisés
123
+ comme ``null`` dans le JSON canonique → entrent dans le hash).
124
+ """
125
+ for _, h in self.input_hashes:
126
+ if h is None or h == "":
127
+ return None
128
+ canonical = self.to_canonical_json()
129
+ return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
130
+
131
+
132
+ __all__ = ["ArtifactKey"]
picarones/pipeline/cache_helpers.py ADDED
@@ -0,0 +1,179 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Helpers de cache d'artefacts pour le ``PipelineExecutor`` — Sprint A14-S47.
2
+
3
+ Fix de l'audit #1 du rewrite ciblé : avant ce sprint,
4
+ ``picarones/adapters/storage/artifact_store.py`` (S29) existait sans
5
+ être consommé par aucun runtime — promesse de « reprise par hash »
6
+ non tenue.
7
+
8
+ Ce module fournit les **fonctions pures** qui transforment un
9
+ ``(PipelineStep, inputs, RunContext)`` en ``ArtifactKey`` et en clés
10
+ de stockage par output_type, pour que le ``PipelineExecutor`` puisse :
11
+
12
+ 1. Avant d'exécuter un step : calculer la clé, interroger le store,
13
+ et si toutes les sorties attendues sont présentes ET valides,
14
+ sauter l'exécution en retournant les artefacts cachés.
15
+ 2. Après une exécution réussie : persister chaque output dans le store
16
+ sous une clé dérivée.
17
+
18
+ Stratégie de clé multi-output
19
+ -----------------------------
20
+ Un ``PipelineStep`` peut produire plusieurs ``ArtifactType``.
21
+ ``ArtifactStore.put/get`` opère sur **un** Artifact à la fois. Pour
22
+ gérer cela sans étendre l'API du store, on dérive une **clé composite**
23
+ par output_type :
24
+
25
+ ::
26
+
27
+ store_key = f"{step_hash}:{output_type.value}"
28
+
29
+ où ``step_hash`` est ``ArtifactKey(...).hash_hex()`` qui dépend des
30
+ inputs, du step et du code_version. À la lecture, on demande au store
31
+ toutes les clés ``{step_hash}:<type>`` pour les ``output_types`` du
32
+ step ; si une seule manque, c'est un miss complet (cache partiel
33
+ n'est pas exploitable — on relance le step pour cohérence).
34
+
35
+ Pas de stockage du payload bytes
36
+ --------------------------------
37
+ On stocke uniquement les **métadonnées** ``Artifact`` (id, type,
38
+ content_hash, uri, provenance). Le payload (texte, ALTO XML, image)
39
+ reste sur le filesystem au chemin pointé par ``Artifact.uri``.
40
+
41
+ Conséquence : si le workspace a été nettoyé entre deux runs, l'URI
42
+ cachée pointe vers un fichier disparu → cache miss (la fonction
43
+ ``read_cached_outputs`` vérifie l'existence des URIs). C'est le
44
+ comportement attendu : le store est un **cache**, pas une source de
45
+ vérité du contenu.
46
+
47
+ Anti-sur-ingénierie
48
+ -------------------
49
+ - Pas de TTL, pas d'éviction LRU. Le caller appelle ``store.clear()``
50
+ s'il veut forcer un re-run complet.
51
+ - Pas de support des artefacts inline (sans URI). Si un step produit
52
+ un artefact dont le contenu vit en RAM seulement, le cache est
53
+ inopérant — c'est documenté.
54
+ """
55
+
56
+ from __future__ import annotations
57
+
58
+ import logging
59
+ from pathlib import Path
60
+ from typing import TYPE_CHECKING
61
+
62
+ from picarones.domain.artifact_key import ArtifactKey
63
+ from picarones.domain.artifacts import Artifact, ArtifactType
64
+ from picarones.pipeline.cache_protocol import ArtifactCachePort
65
+
66
+ if TYPE_CHECKING:
67
+ from picarones.pipeline.spec import PipelineStep
68
+ from picarones.pipeline.types import RunContext
69
+
70
+ logger = logging.getLogger(__name__)
71
+
72
+
73
+ def compute_step_artifact_key(
74
+ step: "PipelineStep",
75
+ inputs: dict[ArtifactType, Artifact],
76
+ context: "RunContext",
77
+ ) -> ArtifactKey:
78
+ """Calcule la ``ArtifactKey`` d'un step pour le cache d'artefacts.
79
+
80
+ La clé combine :
81
+
82
+ - les ``content_hash`` des inputs (triés par type pour
83
+ déterminisme — délégué à ``ArtifactKey.to_canonical_json``) ;
84
+ - ``step.adapter_name`` ;
85
+ - ``step.params`` (dict scalaire) ;
86
+ - ``context.code_version``.
87
+
88
+ Les autres champs de ``ArtifactKey`` (normalization_profile,
89
+ projection_name, metric_version) restent ``None`` — ils sont
90
+ spécifiques aux jonctions d'évaluation, pas aux steps de pipeline.
91
+
92
+ La clé peut retourner ``None`` à ``hash_hex()`` si **un seul**
93
+ input n'a pas de ``content_hash`` (cf. la convention « ne pas
94
+ servir un résultat douteux » d'``ArtifactKey``). Le caller doit
95
+ tester ``key.hash_hex() is None`` avant d'utiliser la clé.
96
+ """
97
+ input_hashes: tuple[tuple[str, str], ...] = tuple(
98
+ (art_type.value, artifact.content_hash or "")
99
+ for art_type, artifact in inputs.items()
100
+ )
101
+ return ArtifactKey(
102
+ input_hashes=input_hashes,
103
+ adapter_name=step.adapter_name,
104
+ adapter_version=None, # adapters ne déclarent pas (encore) de version
105
+ step_params=dict(step.params),
106
+ code_version=context.code_version,
107
+ )
108
+
109
+
110
+ def storage_key_for_output(step_hash: str, output_type: ArtifactType) -> str:
111
+ """Construit la clé de stockage composite pour un output donné."""
112
+ return f"{step_hash}:{output_type.value}"
113
+
114
+
115
+ def read_cached_outputs(
116
+ store: ArtifactCachePort,
117
+ step: "PipelineStep",
118
+ step_hash: str,
119
+ ) -> dict[ArtifactType, Artifact] | None:
120
+ """Tente de lire les outputs cachés d'un step.
121
+
122
+ Retourne ``None`` si :
123
+
124
+ - une seule sortie attendue n'est pas dans le store
125
+ (cache partiel) ;
126
+ - une URI cachée pointe vers un fichier disparu
127
+ (cache orphelin).
128
+
129
+ Sinon, retourne le dict ``{output_type: Artifact}`` complet,
130
+ prêt à être réinjecté dans le bag du runner.
131
+ """
132
+ cached: dict[ArtifactType, Artifact] = {}
133
+ for output_type in step.output_types:
134
+ store_key = storage_key_for_output(step_hash, output_type)
135
+ stored = store.get(store_key)
136
+ if stored is None:
137
+ logger.debug(
138
+ "[cache] miss partiel sur step %r : %s manquant.",
139
+ step.id, output_type.value,
140
+ )
141
+ return None
142
+ # Vérifie que l'URI cachée pointe vers un fichier qui existe
143
+ # encore. Sinon, le payload a disparu (workspace nettoyé,
144
+ # mount débranché, etc.) — on doit re-exécuter.
145
+ if stored.artifact.uri is not None:
146
+ uri_path = Path(stored.artifact.uri)
147
+ if not uri_path.exists():
148
+ logger.debug(
149
+ "[cache] orphelin sur step %r : URI %s disparu.",
150
+ step.id, uri_path,
151
+ )
152
+ return None
153
+ cached[output_type] = stored.artifact
154
+ return cached
155
+
156
+
157
+ def write_outputs_to_cache(
158
+ store: ArtifactCachePort,
159
+ step: "PipelineStep",
160
+ step_hash: str,
161
+ outputs: dict[ArtifactType, Artifact],
162
+ ) -> None:
163
+ """Persiste tous les outputs d'un step réussi dans le store.
164
+
165
+ Idempotent : ``store.put`` écrase silencieusement une entrée
166
+ existante (cf. la convention de ``InMemoryArtifactStore`` et
167
+ ``FilesystemArtifactStore``).
168
+ """
169
+ for output_type, artifact in outputs.items():
170
+ store_key = storage_key_for_output(step_hash, output_type)
171
+ store.put(store_key, artifact, payload=None)
172
+
173
+
174
+ __all__ = [
175
+ "compute_step_artifact_key",
176
+ "read_cached_outputs",
177
+ "storage_key_for_output",
178
+ "write_outputs_to_cache",
179
+ ]
picarones/pipeline/cache_protocol.py ADDED
@@ -0,0 +1,85 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """``ArtifactCachePort`` — port (Protocol) consommé par ``PipelineExecutor``.
2
+
3
+ Sprint A14-S47 — inversion de dépendance pour le branchement
4
+ ``ArtifactStore`` dans le pipeline.
5
+
6
+ Pourquoi ce Protocol
7
+ --------------------
8
+ La couche ``pipeline/`` est plus interne que ``adapters/`` dans la
9
+ hiérarchie documentée du rewrite (``domain → formats → evaluation
10
+ → pipeline → adapters → app → reports_v2 → interfaces``). Importer
11
+ depuis ``adapters/`` dans ``pipeline/`` violerait la règle de
12
+ dépendance.
13
+
14
+ On applique l'inversion de dépendance (pattern hexagonal /
15
+ ports-and-adapters) :
16
+
17
+ - ``pipeline/`` définit le **port** ``ArtifactCachePort`` (ce
18
+ module) — ce que le pipeline a besoin de consommer.
19
+ - ``adapters/storage/artifact_store.ArtifactStore`` (S29) est
20
+ l'**adapter** qui satisfait ce port par duck typing.
21
+ - Toute autre implémentation tierce (Redis, S3, GCS, ...) qui
22
+ implémente ces 5 méthodes est compatible.
23
+
24
+ Convention duck typing
25
+ ----------------------
26
+ ``StoredArtifact`` est aussi exposé comme Protocol minimal pour
27
+ éviter d'importer la dataclass concrète depuis ``adapters/``.
28
+ Les implémentations réelles fournissent une dataclass plus riche ;
29
+ ``pipeline/`` ne consomme que ``stored.artifact`` et
30
+ ``stored.artifact.uri``.
31
+ """
32
+
33
+ from __future__ import annotations
34
+
35
+ from typing import Protocol, runtime_checkable
36
+
37
+ from picarones.domain.artifacts import Artifact
38
+
39
+
40
+ @runtime_checkable
41
+ class CachedArtifactRef(Protocol):
42
+ """Port minimal consommé par ``read_cached_outputs``.
43
+
44
+ Les implémentations concrètes peuvent porter des champs
45
+ supplémentaires (``payload``, ``key``, …) ; ``pipeline/``
46
+ n'utilise que l'``Artifact`` reconstitué.
47
+ """
48
+
49
+ @property
50
+ def artifact(self) -> Artifact: # pragma: no cover — Protocol
51
+ ...
52
+
53
+
54
+ @runtime_checkable
55
+ class ArtifactCachePort(Protocol):
56
+ """Contrat minimal d'un cache d'artefacts consommable par
57
+ ``PipelineExecutor`` pour la reprise par hash.
58
+
59
+ Les méthodes correspondent **exactement** à l'API publique de
60
+ ``ArtifactStore`` (S29) — ``ArtifactStore`` est donc compatible
61
+ par duck typing sans rien changer.
62
+
63
+ Pas d'``isinstance(store, ArtifactCachePort)`` requis : Python
64
+ type-checke à l'usage (les méthodes manquantes lèvent
65
+ ``AttributeError`` au runtime). Le ``@runtime_checkable``
66
+ autorise un test ``isinstance`` côté caller s'il veut une
67
+ validation explicite.
68
+ """
69
+
70
+ def get(self, key: str) -> CachedArtifactRef | None: # pragma: no cover
71
+ ...
72
+
73
+ def put(
74
+ self,
75
+ key: str,
76
+ artifact: Artifact,
77
+ payload: bytes | None = None,
78
+ ) -> None: # pragma: no cover
79
+ ...
80
+
81
+ def __contains__(self, key: str) -> bool: # pragma: no cover
82
+ ...
83
+
84
+
85
+ __all__ = ["ArtifactCachePort", "CachedArtifactRef"]
picarones/pipeline/executor.py CHANGED
@@ -68,6 +68,12 @@ from typing import Callable
68
  from picarones.domain.artifacts import Artifact, ArtifactType
69
  from picarones.domain.documents import DocumentRef
70
  from picarones.domain.errors import PicaronesError
 
 
 
 
 
 
71
  from picarones.pipeline.planner import (
72
  ExecutionPlan,
73
  PipelinePlanner,
@@ -113,12 +119,30 @@ class PipelineExecutor:
113
  ``StepExecutor``. Typiquement
114
  ``lambda name: registry[name]`` en test, ou un service
115
  applicatif qui injecte les bonnes dépendances en prod.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
116
  """
117
 
118
  def __init__(
119
  self,
120
  adapter_resolver: AdapterResolver,
121
  planner: PipelinePlanner | None = None,
 
122
  ) -> None:
123
  if not callable(adapter_resolver):
124
  raise PicaronesError(
@@ -128,10 +152,24 @@ class PipelineExecutor:
128
  raise PicaronesError(
129
  "PipelineExecutor : planner doit être un PipelinePlanner ou None."
130
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
131
  self._resolver = adapter_resolver
132
  # Si pas de planner injecté, on en fabrique un sans MetricRegistry —
133
  # les jonctions seront vides mais la planification reste correcte.
134
  self._planner = planner if planner is not None else PipelinePlanner()
 
135
 
136
  def plan(self, spec: PipelineSpec) -> ExecutionPlan:
137
  """Planifie une ``PipelineSpec`` en ``ExecutionPlan``.
@@ -286,6 +324,35 @@ class PipelineExecutor:
286
  {},
287
  )
288
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
289
  # 2. Résoudre l'adapter.
290
  try:
291
  adapter = self._resolver(step.adapter_name)
@@ -355,6 +422,13 @@ class PipelineExecutor:
355
  )
356
 
357
  # 5. Succès.
 
 
 
 
 
 
 
358
  produced_map = {
359
  t.value: a.id for t, a in outputs.items()
360
  }
@@ -368,6 +442,66 @@ class PipelineExecutor:
368
  outputs,
369
  )
370
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
371
  def _inputs_from_bindings(
372
  self,
373
  *,
 
68
  from picarones.domain.artifacts import Artifact, ArtifactType
69
  from picarones.domain.documents import DocumentRef
70
  from picarones.domain.errors import PicaronesError
71
+ from picarones.pipeline.cache_helpers import (
72
+ compute_step_artifact_key,
73
+ read_cached_outputs,
74
+ write_outputs_to_cache,
75
+ )
76
+ from picarones.pipeline.cache_protocol import ArtifactCachePort
77
  from picarones.pipeline.planner import (
78
  ExecutionPlan,
79
  PipelinePlanner,
 
119
  ``StepExecutor``. Typiquement
120
  ``lambda name: registry[name]`` en test, ou un service
121
  applicatif qui injecte les bonnes dépendances en prod.
122
+ planner:
123
+ ``PipelinePlanner`` injecté (S28). Si ``None``, un planner
124
+ par défaut sans ``MetricRegistry`` est instancié.
125
+ artifact_store:
126
+ ``ArtifactStore`` optionnel (S29 + S47) pour la **reprise par
127
+ hash**. Si fourni, l'executor :
128
+
129
+ - **avant** chaque step, calcule la clé du step via
130
+ ``compute_step_artifact_key`` et interroge le store ; si
131
+ toutes les sorties attendues sont présentes ET valides
132
+ (URIs accessibles), saute l'exécution et retourne les
133
+ artefacts cachés (``StepResult.duration_seconds=0.0``) ;
134
+ - **après** chaque step réussi, persiste les outputs dans
135
+ le store sous la clé dérivée.
136
+
137
+ Si ``None`` (défaut), aucun cache n'est consulté ni écrit.
138
+ Le comportement est strictement identique à l'avant-S47.
139
  """
140
 
141
  def __init__(
142
  self,
143
  adapter_resolver: AdapterResolver,
144
  planner: PipelinePlanner | None = None,
145
+ artifact_store: ArtifactCachePort | None = None,
146
  ) -> None:
147
  if not callable(adapter_resolver):
148
  raise PicaronesError(
 
152
  raise PicaronesError(
153
  "PipelineExecutor : planner doit être un PipelinePlanner ou None."
154
  )
155
+ # ``isinstance(artifact_store, ArtifactCachePort)`` est un duck
156
+ # typing check (Protocol @runtime_checkable) — valide get/put/
157
+ # __contains__ par leur seule présence. Permet à un caller
158
+ # tiers (Redis, S3) de fournir un store custom satisfaisant
159
+ # le protocol sans hériter de la classe ABC ``ArtifactStore``.
160
+ if artifact_store is not None and not isinstance(
161
+ artifact_store, ArtifactCachePort,
162
+ ):
163
+ raise PicaronesError(
164
+ "PipelineExecutor : artifact_store doit satisfaire le "
165
+ "protocole ArtifactCachePort (get / put / __contains__) "
166
+ "ou être None.",
167
+ )
168
  self._resolver = adapter_resolver
169
  # Si pas de planner injecté, on en fabrique un sans MetricRegistry —
170
  # les jonctions seront vides mais la planification reste correcte.
171
  self._planner = planner if planner is not None else PipelinePlanner()
172
+ self._artifact_store = artifact_store
173
 
174
  def plan(self, spec: PipelineSpec) -> ExecutionPlan:
175
  """Planifie une ``PipelineSpec`` en ``ExecutionPlan``.
 
324
  {},
325
  )
326
 
327
+ # 1bis. S47 — Reprise par hash via ArtifactStore.
328
+ # Si un store est injecté et que tous les inputs ont un
329
+ # ``content_hash``, on calcule la clé du step et on interroge
330
+ # le store. Hit complet → on saute l'exécution (durée 0,
331
+ # même artefacts que la dernière exécution réussie). Miss
332
+ # ou cache partiel → on tombe dans l'exécution normale.
333
+ if self._artifact_store is not None:
334
+ cached_outputs = self._try_resume_from_cache(
335
+ step=step, inputs=inputs, context=context,
336
+ )
337
+ if cached_outputs is not None:
338
+ logger.info(
339
+ "[pipeline:%s] step '%s' : hit cache "
340
+ "(reprise par hash, exécution sautée).",
341
+ context.pipeline_name, step.id,
342
+ )
343
+ return (
344
+ StepResult(
345
+ step_id=step.id,
346
+ succeeded=True,
347
+ duration_seconds=0.0,
348
+ produced_artifacts={
349
+ t.value: a.id
350
+ for t, a in cached_outputs.items()
351
+ },
352
+ ),
353
+ cached_outputs,
354
+ )
355
+
356
  # 2. Résoudre l'adapter.
357
  try:
358
  adapter = self._resolver(step.adapter_name)
 
422
  )
423
 
424
  # 5. Succès.
425
+ # S47 — persiste les outputs dans le store si fourni. La
426
+ # méthode interne sait gérer le cas content_hash manquant
427
+ # (skip silencieux) — on lui passe la responsabilité.
428
+ if self._artifact_store is not None:
429
+ self._persist_to_cache(
430
+ step=step, inputs=inputs, context=context, outputs=outputs,
431
+ )
432
  produced_map = {
433
  t.value: a.id for t, a in outputs.items()
434
  }
 
442
  outputs,
443
  )
444
 
445
+ # ──────────────────────────────────────────────────────────────────
446
+ # S47 — Reprise par hash via ArtifactStore
447
+ # ──────────────────────────────────────────────────────────────────
448
+
449
+ def _try_resume_from_cache(
450
+ self,
451
+ *,
452
+ step,
453
+ inputs: dict[ArtifactType, Artifact],
454
+ context: RunContext,
455
+ ) -> dict[ArtifactType, Artifact] | None:
456
+ """Tente de retrouver les outputs cachés du step.
457
+
458
+ Retourne ``None`` (cache miss) dans 3 cas :
459
+
460
+ 1. Un input n'a pas de ``content_hash`` → la clé n'est pas
461
+ calculable (cf. ``ArtifactKey.hash_hex``).
462
+ 2. Le store ne contient pas TOUS les ``output_types`` du step.
463
+ 3. Une URI cachée pointe vers un fichier qui n'existe plus.
464
+ """
465
+ # Nécessairement non-None ici (vérifié par le caller), mais on
466
+ # défend en profondeur.
467
+ if self._artifact_store is None:
468
+ return None
469
+ key = compute_step_artifact_key(step, inputs, context)
470
+ step_hash = key.hash_hex()
471
+ if step_hash is None:
472
+ return None
473
+ return read_cached_outputs(
474
+ store=self._artifact_store,
475
+ step=step,
476
+ step_hash=step_hash,
477
+ )
478
+
479
+ def _persist_to_cache(
480
+ self,
481
+ *,
482
+ step,
483
+ inputs: dict[ArtifactType, Artifact],
484
+ context: RunContext,
485
+ outputs: dict[ArtifactType, Artifact],
486
+ ) -> None:
487
+ """Persiste les outputs d'un step réussi dans le store.
488
+
489
+ Skip silencieux si la clé n'est pas calculable (un input sans
490
+ ``content_hash``).
491
+ """
492
+ if self._artifact_store is None:
493
+ return
494
+ key = compute_step_artifact_key(step, inputs, context)
495
+ step_hash = key.hash_hex()
496
+ if step_hash is None:
497
+ return
498
+ write_outputs_to_cache(
499
+ store=self._artifact_store,
500
+ step=step,
501
+ step_hash=step_hash,
502
+ outputs=outputs,
503
+ )
504
+
505
  def _inputs_from_bindings(
506
  self,
507
  *,
tests/architecture/test_file_budgets.py CHANGED
@@ -82,7 +82,9 @@ FILE_BUDGETS: dict[str, int] = {
82
  # ExecutionPlan (run_plan) tout en gardant run(spec) comme sucre.
83
  # PipelinePlanner introduit pour transformer une PipelineSpec en
84
  # plan immuable (validation + bindings + jonctions de métriques).
85
- "picarones/pipeline/executor.py": 475, # actuel 413
 
 
86
  "picarones/pipeline/planner.py": 465, # actuel 403
87
  # Sprint A14-S29 — ArtifactStore (ABC + 2 implémentations) avec
88
  # hash multi-paramètres pour adresser la critique d'audit n° 14
 
82
  # ExecutionPlan (run_plan) tout en gardant run(spec) comme sucre.
83
  # PipelinePlanner introduit pour transformer une PipelineSpec en
84
  # plan immuable (validation + bindings + jonctions de métriques).
85
+ # Sprint A14-S47 — branchement ArtifactStore : +60 lignes (lookup
86
+ # cache avant exec, persistance après succès, helpers privés).
87
+ "picarones/pipeline/executor.py": 600, # actuel 541
88
  "picarones/pipeline/planner.py": 465, # actuel 403
89
  # Sprint A14-S29 — ArtifactStore (ABC + 2 implémentations) avec
90
  # hash multi-paramètres pour adresser la critique d'audit n° 14
tests/pipeline/test_sprint_a14_s47_artifact_store_resume.py ADDED
@@ -0,0 +1,451 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Sprint A14-S47 — branchement ``ArtifactStore`` dans ``PipelineExecutor``.
2
+
3
+ Fix de l'audit #1 : avant ce sprint, ``ArtifactStore`` (S29) était
4
+ livré comme module standalone sans consommateur runtime — la promesse
5
+ de « reprise par hash » n'était pas tenue.
6
+
7
+ Tests vérifient :
8
+
9
+ 1. Sans ``artifact_store`` injecté : comportement identique à l'avant
10
+ (pas de régression sur les 115 tests existants).
11
+ 2. Avec store : premier run → exécution normale + persistance.
12
+ 3. Avec store : second run même inputs+spec+code_version → cache hit,
13
+ ``StepResult.duration_seconds=0.0``, adapter NON appelé.
14
+ 4. Cache miss si un seul ``content_hash`` manque sur les inputs.
15
+ 5. Cache miss si un output_type promis n'est pas dans le store
16
+ (cache partiel rejeté).
17
+ 6. Cache miss si une URI cachée pointe vers un fichier disparu
18
+ (cache orphelin → re-run).
19
+ 7. Cache miss si ``code_version`` change (key change).
20
+ 8. Cache miss si ``step.params`` change.
21
+ 9. Cache hit ne re-exécute PAS l'adapter (vérifie via spy).
22
+ """
23
+
24
+ from __future__ import annotations
25
+
26
+ from pathlib import Path
27
+
28
+ import pytest
29
+
30
+ from picarones.adapters.storage import (
31
+ FilesystemArtifactStore,
32
+ InMemoryArtifactStore,
33
+ )
34
+ from picarones.domain.artifacts import Artifact, ArtifactType
35
+ from picarones.domain.documents import DocumentRef
36
+ from picarones.pipeline.executor import PipelineExecutor
37
+ from picarones.pipeline.spec import PipelineSpec, PipelineStep
38
+ from picarones.pipeline.types import RunContext
39
+
40
+
41
+ # ──────────────────────────────────────────────────────────────────────
42
+ # Adapter de test : compte ses appels et écrit un fichier déterministe
43
+ # ──────────────────────────────────────────────────────────────────────
44
+
45
+
46
+ class _CountingOCRAdapter:
47
+ """Stub OCR qui produit RAW_TEXT et compte ses exécutions.
48
+
49
+ Écrit le texte sur disque (URI valide) pour que le check
50
+ ``read_cached_outputs`` (vérification existence URI) trouve le
51
+ fichier.
52
+ """
53
+
54
+ name = "counting_ocr"
55
+ input_types = frozenset({ArtifactType.IMAGE})
56
+ output_types = frozenset({ArtifactType.RAW_TEXT})
57
+ execution_mode = "io"
58
+
59
+ def __init__(self, output_dir: Path, response_text: str = "hello") -> None:
60
+ self.output_dir = output_dir
61
+ self.response_text = response_text
62
+ self.call_count = 0
63
+
64
+ def execute(self, inputs, params, context):
65
+ self.call_count += 1
66
+ out_path = self.output_dir / f"{context.document_id}.txt"
67
+ out_path.write_text(self.response_text, encoding="utf-8")
68
+ return {
69
+ ArtifactType.RAW_TEXT: Artifact(
70
+ id=f"{context.document_id}:{self.name}:raw_text",
71
+ document_id=context.document_id,
72
+ type=ArtifactType.RAW_TEXT,
73
+ content_hash="b" * 64,
74
+ produced_by_step="ocr",
75
+ uri=str(out_path),
76
+ ),
77
+ }
78
+
79
+
80
+ def _make_spec() -> PipelineSpec:
81
+ return PipelineSpec(
82
+ name="cache_test",
83
+ initial_inputs=(ArtifactType.IMAGE,),
84
+ steps=(
85
+ PipelineStep(
86
+ id="ocr",
87
+ kind="ocr",
88
+ adapter_name="counting_ocr",
89
+ input_types=(ArtifactType.IMAGE,),
90
+ output_types=(ArtifactType.RAW_TEXT,),
91
+ ),
92
+ ),
93
+ )
94
+
95
+
96
+ def _make_initial_inputs(image_uri: str = "/tmp/img.png") -> dict:
97
+ return {
98
+ ArtifactType.IMAGE: Artifact(
99
+ id="d1:image",
100
+ document_id="d1",
101
+ type=ArtifactType.IMAGE,
102
+ content_hash="a" * 64,
103
+ uri=image_uri,
104
+ ),
105
+ }
106
+
107
+
108
+ def _make_context(code_version: str = "1.0.0") -> RunContext:
109
+ return RunContext(
110
+ document_id="d1",
111
+ code_version=code_version,
112
+ pipeline_name="cache_test",
113
+ )
114
+
115
+
116
+ # ──────────────────────────────────────────────────────────────────────
117
+ # Comportement par défaut (sans store) — pas de régression
118
+ # ──────────────────────────────────────────────────────────────────────
119
+
120
+
121
+ class TestNoStoreNoRegression:
122
+ def test_executor_works_without_store(self, tmp_path: Path) -> None:
123
+ adapter = _CountingOCRAdapter(tmp_path)
124
+ executor = PipelineExecutor(adapter_resolver=lambda n: adapter)
125
+ # Pas d'artifact_store → comportement identique à l'avant-S47.
126
+ result = executor.run(
127
+ spec=_make_spec(),
128
+ document=DocumentRef(id="d1"),
129
+ initial_inputs=_make_initial_inputs(),
130
+ context=_make_context(),
131
+ )
132
+ assert result.succeeded
133
+ assert adapter.call_count == 1
134
+
135
+ def test_rejects_non_store_in_constructor(self) -> None:
136
+ from picarones.domain.errors import PicaronesError
137
+ with pytest.raises(PicaronesError, match="artifact_store"):
138
+ PipelineExecutor(
139
+ adapter_resolver=lambda n: None,
140
+ artifact_store="not a store", # type: ignore[arg-type]
141
+ )
142
+
143
+
144
+ # ──────────────────────────────────────────────────────────────────────
145
+ # Cache hit — second run avec mêmes inputs+spec+code_version
146
+ # ──────────────────────────────────────────────────────────────────────
147
+
148
+
149
+ class TestCacheHit:
150
+ def test_second_run_hits_cache(self, tmp_path: Path) -> None:
151
+ adapter = _CountingOCRAdapter(tmp_path)
152
+ store = InMemoryArtifactStore()
153
+ executor = PipelineExecutor(
154
+ adapter_resolver=lambda n: adapter,
155
+ artifact_store=store,
156
+ )
157
+
158
+ # Premier run : exécute, persiste.
159
+ result1 = executor.run(
160
+ spec=_make_spec(),
161
+ document=DocumentRef(id="d1"),
162
+ initial_inputs=_make_initial_inputs(),
163
+ context=_make_context(),
164
+ )
165
+ assert result1.succeeded
166
+ assert adapter.call_count == 1
167
+ assert len(store) >= 1 # au moins une entrée persistée
168
+
169
+ # Second run identique : doit hit le cache.
170
+ result2 = executor.run(
171
+ spec=_make_spec(),
172
+ document=DocumentRef(id="d1"),
173
+ initial_inputs=_make_initial_inputs(),
174
+ context=_make_context(),
175
+ )
176
+ assert result2.succeeded
177
+ # L'adapter n'a PAS été ré-appelé.
178
+ assert adapter.call_count == 1, (
179
+ "Cache hit raté : l'adapter a été ré-exécuté."
180
+ )
181
+ # Le step est marqué succeeded avec duration ≈ 0.
182
+ cached_step = result2.step_results[0]
183
+ assert cached_step.succeeded
184
+ assert cached_step.duration_seconds == 0.0
185
+
186
+ def test_cache_hit_returns_same_artifact(self, tmp_path: Path) -> None:
187
+ adapter = _CountingOCRAdapter(tmp_path)
188
+ store = InMemoryArtifactStore()
189
+ executor = PipelineExecutor(
190
+ adapter_resolver=lambda n: adapter,
191
+ artifact_store=store,
192
+ )
193
+
194
+ result1 = executor.run(
195
+ spec=_make_spec(),
196
+ document=DocumentRef(id="d1"),
197
+ initial_inputs=_make_initial_inputs(),
198
+ context=_make_context(),
199
+ )
200
+ result2 = executor.run(
201
+ spec=_make_spec(),
202
+ document=DocumentRef(id="d1"),
203
+ initial_inputs=_make_initial_inputs(),
204
+ context=_make_context(),
205
+ )
206
+ # Même artefact retourné (mêmes id, même content_hash).
207
+ a1 = [a for a in result1.artifacts if a.type == ArtifactType.RAW_TEXT][0]
208
+ a2 = [a for a in result2.artifacts if a.type == ArtifactType.RAW_TEXT][0]
209
+ assert a1.id == a2.id
210
+ assert a1.content_hash == a2.content_hash
211
+ assert a1.uri == a2.uri
212
+
213
+
214
+ # ──────────────────────────────────────────────────────────────────────
215
+ # Cache miss — invariants de la clé
216
+ # ──────────────────────────────────────────────────────────────────────
217
+
218
+
219
+ class TestCacheMissOnKeyChange:
220
+ def test_miss_when_code_version_differs(self, tmp_path: Path) -> None:
221
+ adapter = _CountingOCRAdapter(tmp_path)
222
+ store = InMemoryArtifactStore()
223
+ executor = PipelineExecutor(
224
+ adapter_resolver=lambda n: adapter,
225
+ artifact_store=store,
226
+ )
227
+
228
+ executor.run(
229
+ spec=_make_spec(),
230
+ document=DocumentRef(id="d1"),
231
+ initial_inputs=_make_initial_inputs(),
232
+ context=_make_context(code_version="1.0.0"),
233
+ )
234
+ executor.run(
235
+ spec=_make_spec(),
236
+ document=DocumentRef(id="d1"),
237
+ initial_inputs=_make_initial_inputs(),
238
+ context=_make_context(code_version="2.0.0"), # change !
239
+ )
240
+ # Le code_version fait partie de la clé → 2 exécutions distinctes.
241
+ assert adapter.call_count == 2
242
+
243
+ def test_miss_when_step_params_differ(self, tmp_path: Path) -> None:
244
+ adapter = _CountingOCRAdapter(tmp_path)
245
+ store = InMemoryArtifactStore()
246
+ executor = PipelineExecutor(
247
+ adapter_resolver=lambda n: adapter,
248
+ artifact_store=store,
249
+ )
250
+
251
+ spec_a = PipelineSpec(
252
+ name="x",
253
+ initial_inputs=(ArtifactType.IMAGE,),
254
+ steps=(
255
+ PipelineStep(
256
+ id="ocr",
257
+ kind="ocr",
258
+ adapter_name="counting_ocr",
259
+ input_types=(ArtifactType.IMAGE,),
260
+ output_types=(ArtifactType.RAW_TEXT,),
261
+ params={"lang": "fra"},
262
+ ),
263
+ ),
264
+ )
265
+ spec_b = PipelineSpec(
266
+ name="x",
267
+ initial_inputs=(ArtifactType.IMAGE,),
268
+ steps=(
269
+ PipelineStep(
270
+ id="ocr",
271
+ kind="ocr",
272
+ adapter_name="counting_ocr",
273
+ input_types=(ArtifactType.IMAGE,),
274
+ output_types=(ArtifactType.RAW_TEXT,),
275
+ params={"lang": "eng"}, # change !
276
+ ),
277
+ ),
278
+ )
279
+
280
+ executor.run(
281
+ spec=spec_a,
282
+ document=DocumentRef(id="d1"),
283
+ initial_inputs=_make_initial_inputs(),
284
+ context=_make_context(),
285
+ )
286
+ executor.run(
287
+ spec=spec_b,
288
+ document=DocumentRef(id="d1"),
289
+ initial_inputs=_make_initial_inputs(),
290
+ context=_make_context(),
291
+ )
292
+ assert adapter.call_count == 2
293
+
294
+ def test_miss_when_input_content_hash_differs(self, tmp_path: Path) -> None:
295
+ adapter = _CountingOCRAdapter(tmp_path)
296
+ store = InMemoryArtifactStore()
297
+ executor = PipelineExecutor(
298
+ adapter_resolver=lambda n: adapter,
299
+ artifact_store=store,
300
+ )
301
+
302
+ inputs_a = {
303
+ ArtifactType.IMAGE: Artifact(
304
+ id="d1:image", document_id="d1", type=ArtifactType.IMAGE,
305
+ content_hash="a" * 64, uri="/tmp/img.png",
306
+ ),
307
+ }
308
+ inputs_b = {
309
+ ArtifactType.IMAGE: Artifact(
310
+ id="d1:image", document_id="d1", type=ArtifactType.IMAGE,
311
+ content_hash="c" * 64, # change !
312
+ uri="/tmp/img.png",
313
+ ),
314
+ }
315
+
316
+ executor.run(
317
+ spec=_make_spec(),
318
+ document=DocumentRef(id="d1"),
319
+ initial_inputs=inputs_a,
320
+ context=_make_context(),
321
+ )
322
+ executor.run(
323
+ spec=_make_spec(),
324
+ document=DocumentRef(id="d1"),
325
+ initial_inputs=inputs_b,
326
+ context=_make_context(),
327
+ )
328
+ assert adapter.call_count == 2
329
+
330
+
331
+ # ──────────────────────────────────────────────────────────────────────
332
+ # Cache miss — invariants de validité
333
+ # ──────────────────────────────────────────────────────────────────────
334
+
335
+
336
+ class TestCacheMissOnInvalidState:
337
+ def test_miss_when_input_has_no_content_hash(self, tmp_path: Path) -> None:
338
+ """Si un input n'a pas de content_hash, la clé n'est pas
339
+ calculable → bypass complet du cache (pas de hit, pas de
340
+ persistence)."""
341
+ adapter = _CountingOCRAdapter(tmp_path)
342
+ store = InMemoryArtifactStore()
343
+ executor = PipelineExecutor(
344
+ adapter_resolver=lambda n: adapter,
345
+ artifact_store=store,
346
+ )
347
+
348
+ inputs_no_hash = {
349
+ ArtifactType.IMAGE: Artifact(
350
+ id="d1:image", document_id="d1", type=ArtifactType.IMAGE,
351
+ content_hash=None, # pas de hash !
352
+ uri="/tmp/img.png",
353
+ ),
354
+ }
355
+
356
+ executor.run(
357
+ spec=_make_spec(),
358
+ document=DocumentRef(id="d1"),
359
+ initial_inputs=inputs_no_hash,
360
+ context=_make_context(),
361
+ )
362
+ executor.run(
363
+ spec=_make_spec(),
364
+ document=DocumentRef(id="d1"),
365
+ initial_inputs=inputs_no_hash,
366
+ context=_make_context(),
367
+ )
368
+ # Sans hash, on n'a ni hit ni miss déterministe — on
369
+ # exécute systématiquement.
370
+ assert adapter.call_count == 2
371
+ # Le store reste vide (rien n'a été persisté).
372
+ assert len(store) == 0
373
+
374
+ def test_miss_when_cached_uri_disappeared(self, tmp_path: Path) -> None:
375
+ """Si le fichier pointé par l'URI cachée a été supprimé entre
376
+ les deux runs (workspace nettoyé), on doit re-exécuter."""
377
+ adapter = _CountingOCRAdapter(tmp_path)
378
+ store = InMemoryArtifactStore()
379
+ executor = PipelineExecutor(
380
+ adapter_resolver=lambda n: adapter,
381
+ artifact_store=store,
382
+ )
383
+
384
+ executor.run(
385
+ spec=_make_spec(),
386
+ document=DocumentRef(id="d1"),
387
+ initial_inputs=_make_initial_inputs(),
388
+ context=_make_context(),
389
+ )
390
+ assert adapter.call_count == 1
391
+
392
+ # Simule un nettoyage du workspace.
393
+ for f in tmp_path.iterdir():
394
+ if f.is_file():
395
+ f.unlink()
396
+
397
+ executor.run(
398
+ spec=_make_spec(),
399
+ document=DocumentRef(id="d1"),
400
+ initial_inputs=_make_initial_inputs(),
401
+ context=_make_context(),
402
+ )
403
+ # URI cachée pointe vers fichier disparu → cache miss → ré-exec.
404
+ assert adapter.call_count == 2
405
+
406
+
407
+ # ──────────────────────────────────────────────────────────────────────
408
+ # Persistance filesystem — survie inter-process
409
+ # ──────────────────────────────────────────────────────────────────────
410
+
411
+
412
+ class TestFilesystemStorePersistence:
413
+ def test_cache_survives_executor_recreation(self, tmp_path: Path) -> None:
414
+ """Avec un FilesystemArtifactStore partagé, deux instances
415
+ d'executor distinctes (simule un redémarrage) hit le cache
416
+ de la première."""
417
+ store_root = tmp_path / "store"
418
+ adapter = _CountingOCRAdapter(tmp_path / "outputs")
419
+ (tmp_path / "outputs").mkdir()
420
+
421
+ # Premier executor.
422
+ store1 = FilesystemArtifactStore(store_root)
423
+ exe1 = PipelineExecutor(
424
+ adapter_resolver=lambda n: adapter,
425
+ artifact_store=store1,
426
+ )
427
+ exe1.run(
428
+ spec=_make_spec(),
429
+ document=DocumentRef(id="d1"),
430
+ initial_inputs=_make_initial_inputs(),
431
+ context=_make_context(),
432
+ )
433
+ assert adapter.call_count == 1
434
+
435
+ # Second executor avec un NOUVEAU store pointant vers le même
436
+ # filesystem root (simule un redémarrage du process).
437
+ store2 = FilesystemArtifactStore(store_root)
438
+ exe2 = PipelineExecutor(
439
+ adapter_resolver=lambda n: adapter,
440
+ artifact_store=store2,
441
+ )
442
+ exe2.run(
443
+ spec=_make_spec(),
444
+ document=DocumentRef(id="d1"),
445
+ initial_inputs=_make_initial_inputs(),
446
+ context=_make_context(),
447
+ )
448
+ # Le cache filesystem a survécu → hit.
449
+ assert adapter.call_count == 1, (
450
+ "Le cache filesystem n'a pas survécu au re-démarrage."
451
+ )