Spaces:
Sleeping
Sleeping
Claude
feat: Sprint A14-S57 β Wave F clΓ΄ture audit (issues #15 #16 #21 #23 #24 #25 #26 #30)
7d68969 unverified | """``ArtifactCache`` minimal in-memory β Sprint A14-S7. | |
| Cache d'outputs d'Γ©tape indexΓ© par ``(content_hashes des inputs + | |
| spec hash + code_version)``. Permet de sauter une Γ©tape coΓ»teuse | |
| (typiquement un appel LLM cloud) si elle a dΓ©jΓ Γ©tΓ© exΓ©cutΓ©e avec | |
| exactement les mΓͺmes inputs et la mΓͺme spec. | |
| S7 livre la couche de calcul ; le branchement avec | |
| ``PipelineExecutor`` viendra quand un cas d'usage concret de | |
| rΓ©utilisation se prΓ©sentera (probablement S8 quand on aura | |
| l'orchestration corpus-wide qui peut bΓ©nΓ©ficier d'un cache pour | |
| les retries idempotents). | |
| Garde-fous | |
| ---------- | |
| - Si **un seul** input n'a pas de ``content_hash``, la clΓ© n'est | |
| pas calculable β ``compute_key`` retourne ``None`` β | |
| ``get`` retourne ``None`` (Γ©quivalent Γ un cache miss). Pas de | |
| fallback hasardeux qui pourrait servir des rΓ©sultats faux. | |
| - Pas de TTL, pas d'Γ©viction LRU β c'est un cache in-memory | |
| simple, taille gardΓ©e par le caller (qui peut appeler ``clear()`` | |
| s'il veut libΓ©rer la mΓ©moire). | |
| - Pas de persistance disque pour S7. Si un caller en a besoin, | |
| on l'ajoutera quand le besoin sera concret (S20+ probablement). | |
| """ | |
| from __future__ import annotations | |
| import hashlib | |
| import json | |
| from typing import Iterable | |
| from picarones.domain.artifacts import Artifact, ArtifactType | |
| from picarones.domain.pipeline_spec import PipelineStep | |
| class ArtifactCache: | |
| """Cache in-memory d'outputs d'Γ©tape. | |
| Thread-safe en lecture/écriture **après** l'init (les opérations | |
| mutantes se font sur un dict β Python GIL garantit l'atomicitΓ© | |
| des set/del sur un dict). Pas de mΓ©canisme de freeze technique. | |
| """ | |
| def __init__(self) -> None: | |
| self._store: dict[str, dict[ArtifactType, Artifact]] = {} | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Calcul de clΓ© | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def compute_key( | |
| self, | |
| step: PipelineStep, | |
| input_artifacts: dict[ArtifactType, Artifact], | |
| code_version: str, | |
| ) -> str | None: | |
| """Calcule la clΓ© canonique du cache pour cette exΓ©cution. | |
| Retourne ``None`` si **un seul** input n'a pas de | |
| ``content_hash`` β convention "ne sert pas un rΓ©sultat | |
| douteux". | |
| La clΓ© combine : | |
| - les ``content_hash`` triΓ©s par ``ArtifactType.value``, | |
| - le hash de la spec du step (sΓ©rialisΓ©e JSON dΓ©terministe), | |
| - le ``code_version``. | |
| Deux exΓ©cutions avec exactement les mΓͺmes inputs (au sens | |
| ``content_hash``), la mΓͺme spec et la mΓͺme version de code | |
| produisent la mΓͺme clΓ©. | |
| """ | |
| # 1. Inputs : (type β content_hash), tous obligatoires. | |
| try: | |
| input_hashes = sorted( | |
| (t.value, input_artifacts[t].content_hash) | |
| for t in input_artifacts | |
| ) | |
| except KeyError: | |
| return None | |
| if any(h is None for _, h in input_hashes): | |
| return None | |
| # 2. Spec du step : on hash la sΓ©rialisation pydantic de | |
| # PipelineStep (params, kind, adapter_name, etc.). Tout | |
| # changement dans la spec invalide le cache. | |
| step_payload = step.model_dump(mode="json") | |
| step_blob = json.dumps( | |
| step_payload, | |
| sort_keys=True, | |
| ensure_ascii=False, | |
| separators=(",", ":"), | |
| ) | |
| # 3. Composition. | |
| material = json.dumps( | |
| { | |
| "inputs": input_hashes, | |
| "step": step_blob, | |
| "code_version": code_version, | |
| }, | |
| sort_keys=True, | |
| ensure_ascii=False, | |
| separators=(",", ":"), | |
| ) | |
| return hashlib.sha256(material.encode("utf-8")).hexdigest() | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Get / Put / Clear | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get(self, key: str | None) -> dict[ArtifactType, Artifact] | None: | |
| """Retourne les outputs cachΓ©s pour la clΓ©, ou ``None``. | |
| Tolère ``key=None`` pour faciliter le pattern : | |
| key = cache.compute_key(...) | |
| cached = cache.get(key) | |
| if cached is not None: | |
| return cached | |
| """ | |
| if key is None: | |
| return None | |
| return self._store.get(key) | |
| def put( | |
| self, | |
| key: str | None, | |
| outputs: dict[ArtifactType, Artifact], | |
| ) -> None: | |
| """Stocke les outputs sous la clΓ© donnΓ©e. No-op si | |
| ``key=None`` (alignement avec la convention "ne pas servir | |
| un rΓ©sultat douteux").""" | |
| if key is None: | |
| return | |
| self._store[key] = dict(outputs) # copie dΓ©fensive | |
| def clear(self) -> None: | |
| """Vide complètement le cache.""" | |
| self._store.clear() | |
| def __len__(self) -> int: | |
| return len(self._store) | |
| def __contains__(self, key: str) -> bool: | |
| return key in self._store | |
| def keys(self) -> Iterable[str]: | |
| """Liste des clΓ©s actuellement en cache (utile pour les tests).""" | |
| return list(self._store.keys()) | |
| __all__ = ["ArtifactCache"] | |