File size: 5,775 Bytes
3b65839
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7d68969
3b65839
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""``ArtifactCache`` minimal in-memory — Sprint A14-S7.

Cache d'outputs d'étape indexé par ``(content_hashes des inputs +
spec hash + code_version)``.  Permet de sauter une étape coûteuse
(typiquement un appel LLM cloud) si elle a déjà été exécutée avec
exactement les mêmes inputs et la même spec.

S7 livre la couche de calcul ; le branchement avec
``PipelineExecutor`` viendra quand un cas d'usage concret de
réutilisation se présentera (probablement S8 quand on aura
l'orchestration corpus-wide qui peut bénéficier d'un cache pour
les retries idempotents).

Garde-fous
----------
- Si **un seul** input n'a pas de ``content_hash``, la clé n'est
  pas calculable → ``compute_key`` retourne ``None`` →
  ``get`` retourne ``None`` (équivalent à un cache miss).  Pas de
  fallback hasardeux qui pourrait servir des résultats faux.
- Pas de TTL, pas d'éviction LRU — c'est un cache in-memory
  simple, taille gardée par le caller (qui peut appeler ``clear()``
  s'il veut libérer la mémoire).
- Pas de persistance disque pour S7.  Si un caller en a besoin,
  on l'ajoutera quand le besoin sera concret (S20+ probablement).
"""

from __future__ import annotations

import hashlib
import json
from typing import Iterable

from picarones.domain.artifacts import Artifact, ArtifactType
from picarones.domain.pipeline_spec import PipelineStep


class ArtifactCache:
    """Cache in-memory d'outputs d'étape.

    Thread-safe en lecture/écriture **après** l'init (les opérations
    mutantes se font sur un dict — Python GIL garantit l'atomicité
    des set/del sur un dict).  Pas de mécanisme de freeze technique.
    """

    def __init__(self) -> None:
        self._store: dict[str, dict[ArtifactType, Artifact]] = {}

    # ──────────────────────────────────────────────────────────────────
    # Calcul de clé
    # ──────────────────────────────────────────────────────────────────

    def compute_key(
        self,
        step: PipelineStep,
        input_artifacts: dict[ArtifactType, Artifact],
        code_version: str,
    ) -> str | None:
        """Calcule la clé canonique du cache pour cette exécution.

        Retourne ``None`` si **un seul** input n'a pas de
        ``content_hash`` — convention "ne sert pas un résultat
        douteux".

        La clé combine :

        - les ``content_hash`` triés par ``ArtifactType.value``,
        - le hash de la spec du step (sérialisée JSON déterministe),
        - le ``code_version``.

        Deux exécutions avec exactement les mêmes inputs (au sens
        ``content_hash``), la même spec et la même version de code
        produisent la même clé.
        """
        # 1. Inputs : (type → content_hash), tous obligatoires.
        try:
            input_hashes = sorted(
                (t.value, input_artifacts[t].content_hash)
                for t in input_artifacts
            )
        except KeyError:
            return None
        if any(h is None for _, h in input_hashes):
            return None

        # 2. Spec du step : on hash la sérialisation pydantic de
        #    PipelineStep (params, kind, adapter_name, etc.).  Tout
        #    changement dans la spec invalide le cache.
        step_payload = step.model_dump(mode="json")
        step_blob = json.dumps(
            step_payload,
            sort_keys=True,
            ensure_ascii=False,
            separators=(",", ":"),
        )

        # 3. Composition.
        material = json.dumps(
            {
                "inputs": input_hashes,
                "step": step_blob,
                "code_version": code_version,
            },
            sort_keys=True,
            ensure_ascii=False,
            separators=(",", ":"),
        )
        return hashlib.sha256(material.encode("utf-8")).hexdigest()

    # ──────────────────────────────────────────────────────────────────
    # Get / Put / Clear
    # ──────────────────────────────────────────────────────────────────

    def get(self, key: str | None) -> dict[ArtifactType, Artifact] | None:
        """Retourne les outputs cachés pour la clé, ou ``None``.

        Tolère ``key=None`` pour faciliter le pattern :

            key = cache.compute_key(...)
            cached = cache.get(key)
            if cached is not None:
                return cached
        """
        if key is None:
            return None
        return self._store.get(key)

    def put(
        self,
        key: str | None,
        outputs: dict[ArtifactType, Artifact],
    ) -> None:
        """Stocke les outputs sous la clé donnée.  No-op si
        ``key=None`` (alignement avec la convention "ne pas servir
        un résultat douteux")."""
        if key is None:
            return
        self._store[key] = dict(outputs)  # copie défensive

    def clear(self) -> None:
        """Vide complètement le cache."""
        self._store.clear()

    def __len__(self) -> int:
        return len(self._store)

    def __contains__(self, key: str) -> bool:
        return key in self._store

    def keys(self) -> Iterable[str]:
        """Liste des clés actuellement en cache (utile pour les tests)."""
        return list(self._store.keys())


__all__ = ["ArtifactCache"]