Claude
feat: Sprint A14-S57 β€” Wave F clΓ΄ture audit (issues #15 #16 #21 #23 #24 #25 #26 #30)
7d68969 unverified
Raw
History Blame
5.78 kB
"""``ArtifactCache`` minimal in-memory β€” Sprint A14-S7.
Cache d'outputs d'Γ©tape indexΓ© par ``(content_hashes des inputs +
spec hash + code_version)``. Permet de sauter une Γ©tape coΓ»teuse
(typiquement un appel LLM cloud) si elle a dΓ©jΓ  Γ©tΓ© exΓ©cutΓ©e avec
exactement les mΓͺmes inputs et la mΓͺme spec.
S7 livre la couche de calcul ; le branchement avec
``PipelineExecutor`` viendra quand un cas d'usage concret de
rΓ©utilisation se prΓ©sentera (probablement S8 quand on aura
l'orchestration corpus-wide qui peut bΓ©nΓ©ficier d'un cache pour
les retries idempotents).
Garde-fous
----------
- Si **un seul** input n'a pas de ``content_hash``, la clΓ© n'est
pas calculable β†’ ``compute_key`` retourne ``None`` β†’
``get`` retourne ``None`` (Γ©quivalent Γ  un cache miss). Pas de
fallback hasardeux qui pourrait servir des rΓ©sultats faux.
- Pas de TTL, pas d'Γ©viction LRU β€” c'est un cache in-memory
simple, taille gardΓ©e par le caller (qui peut appeler ``clear()``
s'il veut libΓ©rer la mΓ©moire).
- Pas de persistance disque pour S7. Si un caller en a besoin,
on l'ajoutera quand le besoin sera concret (S20+ probablement).
"""
from __future__ import annotations
import hashlib
import json
from typing import Iterable
from picarones.domain.artifacts import Artifact, ArtifactType
from picarones.domain.pipeline_spec import PipelineStep
class ArtifactCache:
"""Cache in-memory d'outputs d'Γ©tape.
Thread-safe en lecture/écriture **après** l'init (les opérations
mutantes se font sur un dict β€” Python GIL garantit l'atomicitΓ©
des set/del sur un dict). Pas de mΓ©canisme de freeze technique.
"""
def __init__(self) -> None:
self._store: dict[str, dict[ArtifactType, Artifact]] = {}
# ──────────────────────────────────────────────────────────────────
# Calcul de clΓ©
# ──────────────────────────────────────────────────────────────────
def compute_key(
self,
step: PipelineStep,
input_artifacts: dict[ArtifactType, Artifact],
code_version: str,
) -> str | None:
"""Calcule la clΓ© canonique du cache pour cette exΓ©cution.
Retourne ``None`` si **un seul** input n'a pas de
``content_hash`` β€” convention "ne sert pas un rΓ©sultat
douteux".
La clΓ© combine :
- les ``content_hash`` triΓ©s par ``ArtifactType.value``,
- le hash de la spec du step (sΓ©rialisΓ©e JSON dΓ©terministe),
- le ``code_version``.
Deux exΓ©cutions avec exactement les mΓͺmes inputs (au sens
``content_hash``), la mΓͺme spec et la mΓͺme version de code
produisent la mΓͺme clΓ©.
"""
# 1. Inputs : (type β†’ content_hash), tous obligatoires.
try:
input_hashes = sorted(
(t.value, input_artifacts[t].content_hash)
for t in input_artifacts
)
except KeyError:
return None
if any(h is None for _, h in input_hashes):
return None
# 2. Spec du step : on hash la sΓ©rialisation pydantic de
# PipelineStep (params, kind, adapter_name, etc.). Tout
# changement dans la spec invalide le cache.
step_payload = step.model_dump(mode="json")
step_blob = json.dumps(
step_payload,
sort_keys=True,
ensure_ascii=False,
separators=(",", ":"),
)
# 3. Composition.
material = json.dumps(
{
"inputs": input_hashes,
"step": step_blob,
"code_version": code_version,
},
sort_keys=True,
ensure_ascii=False,
separators=(",", ":"),
)
return hashlib.sha256(material.encode("utf-8")).hexdigest()
# ──────────────────────────────────────────────────────────────────
# Get / Put / Clear
# ──────────────────────────────────────────────────────────────────
def get(self, key: str | None) -> dict[ArtifactType, Artifact] | None:
"""Retourne les outputs cachΓ©s pour la clΓ©, ou ``None``.
Tolère ``key=None`` pour faciliter le pattern :
key = cache.compute_key(...)
cached = cache.get(key)
if cached is not None:
return cached
"""
if key is None:
return None
return self._store.get(key)
def put(
self,
key: str | None,
outputs: dict[ArtifactType, Artifact],
) -> None:
"""Stocke les outputs sous la clΓ© donnΓ©e. No-op si
``key=None`` (alignement avec la convention "ne pas servir
un rΓ©sultat douteux")."""
if key is None:
return
self._store[key] = dict(outputs) # copie dΓ©fensive
def clear(self) -> None:
"""Vide complètement le cache."""
self._store.clear()
def __len__(self) -> int:
return len(self._store)
def __contains__(self, key: str) -> bool:
return key in self._store
def keys(self) -> Iterable[str]:
"""Liste des clΓ©s actuellement en cache (utile pour les tests)."""
return list(self._store.keys())
__all__ = ["ArtifactCache"]