Spaces:
Sleeping
Sleeping
File size: 5,775 Bytes
3b65839 7d68969 3b65839 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 | """``ArtifactCache`` minimal in-memory — Sprint A14-S7.
Cache d'outputs d'étape indexé par ``(content_hashes des inputs +
spec hash + code_version)``. Permet de sauter une étape coûteuse
(typiquement un appel LLM cloud) si elle a déjà été exécutée avec
exactement les mêmes inputs et la même spec.
S7 livre la couche de calcul ; le branchement avec
``PipelineExecutor`` viendra quand un cas d'usage concret de
réutilisation se présentera (probablement S8 quand on aura
l'orchestration corpus-wide qui peut bénéficier d'un cache pour
les retries idempotents).
Garde-fous
----------
- Si **un seul** input n'a pas de ``content_hash``, la clé n'est
pas calculable → ``compute_key`` retourne ``None`` →
``get`` retourne ``None`` (équivalent à un cache miss). Pas de
fallback hasardeux qui pourrait servir des résultats faux.
- Pas de TTL, pas d'éviction LRU — c'est un cache in-memory
simple, taille gardée par le caller (qui peut appeler ``clear()``
s'il veut libérer la mémoire).
- Pas de persistance disque pour S7. Si un caller en a besoin,
on l'ajoutera quand le besoin sera concret (S20+ probablement).
"""
from __future__ import annotations
import hashlib
import json
from typing import Iterable
from picarones.domain.artifacts import Artifact, ArtifactType
from picarones.domain.pipeline_spec import PipelineStep
class ArtifactCache:
"""Cache in-memory d'outputs d'étape.
Thread-safe en lecture/écriture **après** l'init (les opérations
mutantes se font sur un dict — Python GIL garantit l'atomicité
des set/del sur un dict). Pas de mécanisme de freeze technique.
"""
def __init__(self) -> None:
self._store: dict[str, dict[ArtifactType, Artifact]] = {}
# ──────────────────────────────────────────────────────────────────
# Calcul de clé
# ──────────────────────────────────────────────────────────────────
def compute_key(
self,
step: PipelineStep,
input_artifacts: dict[ArtifactType, Artifact],
code_version: str,
) -> str | None:
"""Calcule la clé canonique du cache pour cette exécution.
Retourne ``None`` si **un seul** input n'a pas de
``content_hash`` — convention "ne sert pas un résultat
douteux".
La clé combine :
- les ``content_hash`` triés par ``ArtifactType.value``,
- le hash de la spec du step (sérialisée JSON déterministe),
- le ``code_version``.
Deux exécutions avec exactement les mêmes inputs (au sens
``content_hash``), la même spec et la même version de code
produisent la même clé.
"""
# 1. Inputs : (type → content_hash), tous obligatoires.
try:
input_hashes = sorted(
(t.value, input_artifacts[t].content_hash)
for t in input_artifacts
)
except KeyError:
return None
if any(h is None for _, h in input_hashes):
return None
# 2. Spec du step : on hash la sérialisation pydantic de
# PipelineStep (params, kind, adapter_name, etc.). Tout
# changement dans la spec invalide le cache.
step_payload = step.model_dump(mode="json")
step_blob = json.dumps(
step_payload,
sort_keys=True,
ensure_ascii=False,
separators=(",", ":"),
)
# 3. Composition.
material = json.dumps(
{
"inputs": input_hashes,
"step": step_blob,
"code_version": code_version,
},
sort_keys=True,
ensure_ascii=False,
separators=(",", ":"),
)
return hashlib.sha256(material.encode("utf-8")).hexdigest()
# ──────────────────────────────────────────────────────────────────
# Get / Put / Clear
# ──────────────────────────────────────────────────────────────────
def get(self, key: str | None) -> dict[ArtifactType, Artifact] | None:
"""Retourne les outputs cachés pour la clé, ou ``None``.
Tolère ``key=None`` pour faciliter le pattern :
key = cache.compute_key(...)
cached = cache.get(key)
if cached is not None:
return cached
"""
if key is None:
return None
return self._store.get(key)
def put(
self,
key: str | None,
outputs: dict[ArtifactType, Artifact],
) -> None:
"""Stocke les outputs sous la clé donnée. No-op si
``key=None`` (alignement avec la convention "ne pas servir
un résultat douteux")."""
if key is None:
return
self._store[key] = dict(outputs) # copie défensive
def clear(self) -> None:
"""Vide complètement le cache."""
self._store.clear()
def __len__(self) -> int:
return len(self._store)
def __contains__(self, key: str) -> bool:
return key in self._store
def keys(self) -> Iterable[str]:
"""Liste des clés actuellement en cache (utile pour les tests)."""
return list(self._store.keys())
__all__ = ["ArtifactCache"]
|