Picarones / picarones /pipeline /validation.py
Claude
feat(pipeline): Sprint A14-S6 — PipelineSpec déclaratif + validation + YAML round-trip
b9ff8de unverified
Raw
History Blame
7.66 kB
"""``validate_spec`` — Sprint A14-S6.
Validation statique d'une ``PipelineSpec`` : vérifier que les
types s'enchaînent, qu'il n'y a pas d'IDs dupliqués, que les
références ``inputs_from`` pointent bien vers des étapes
antérieures qui produisent le bon type, et (optionnellement) que
les ``adapter_name`` existent dans un registre fourni.
S'exécute **sans instancier aucun adapter** — c'est le bénéfice
clé de la séparation déclaratif/runtime du S6.
API :
>>> errors = validate_spec(spec)
>>> if errors:
... for e in errors:
... print(f"{e.step_id}: {e.message}")
Le caller décide de la suite — typiquement un service applicatif
refuse de démarrer un run si la spec a des erreurs.
Anti-sur-ingénierie
-------------------
Pas de détection de cycles graphes complexe (le DAG est exprimé
par ordre des steps, donc impossible de référencer une étape
postérieure : si tu as une boucle, c'est qu'une référence pointe
vers un nom inconnu, déjà détecté).
Pas de validation des params (chaque adapter validera les siens
au moment de l'exécution — le format libre des params est un
choix assumé).
"""
from __future__ import annotations
from pydantic import BaseModel, ConfigDict
from picarones.domain.artifacts import ArtifactType
from picarones.pipeline.spec import INITIAL_STEP_ID, PipelineSpec, PipelineStep
class ValidationError(BaseModel):
"""Une erreur de validation d'une ``PipelineSpec``.
Format structuré pour faciliter le rendu (CLI, rapport, JSON).
Volontairement plat — pas de hiérarchie d'erreurs ; on ajoute
un ``code`` discriminant si un caller en a besoin.
"""
model_config = ConfigDict(frozen=True, extra="forbid")
step_id: str | None
"""Step concerné, ou ``None`` pour les erreurs globales (DAG vide,
ID dupliqué détecté entre deux steps...)."""
code: str
"""Identifiant court (``"duplicate_id"``, ``"unknown_adapter"``,
``"missing_input"``, ``"unknown_input_source"``, ...). Permet
à un test d'asserter sur le code plutôt que sur le message
français.
"""
message: str
"""Description humainement lisible (français)."""
def validate_spec(
spec: PipelineSpec,
available_adapters: set[str] | None = None,
) -> list[ValidationError]:
"""Vérifie une ``PipelineSpec`` et retourne la liste des erreurs.
Parameters
----------
spec:
La spec à valider.
available_adapters:
Set des noms d'adapters connus. Si fourni, chaque
``adapter_name`` du DAG est vérifié. Si ``None`` (défaut),
cette validation est sautée — utile pour les tests qui
valident la cohérence d'un YAML sans avoir le runtime
chargé.
Returns
-------
list[ValidationError]
Liste vide si la spec est valide ; sinon un ou plusieurs
problèmes (ne s'arrête pas à la première erreur — le
caller veut tout voir d'un coup).
"""
errors: list[ValidationError] = []
# -- 0. Steps absents
if not spec.steps:
errors.append(ValidationError(
step_id=None,
code="empty_pipeline",
message="pipeline vide : au moins une étape est requise",
))
return errors # impossible de continuer
# -- 1. IDs dupliqués
seen_ids: dict[str, int] = {}
for i, step in enumerate(spec.steps):
if step.id in seen_ids:
errors.append(ValidationError(
step_id=step.id,
code="duplicate_id",
message=(
f"id dupliqué : '{step.id}' apparaît à l'étape {i} "
f"et précédemment à {seen_ids[step.id]}"
),
))
else:
seen_ids[step.id] = i
# -- 2. Adapter inconnu (si registre fourni)
if available_adapters is not None:
for step in spec.steps:
if step.adapter_name not in available_adapters:
errors.append(ValidationError(
step_id=step.id,
code="unknown_adapter",
message=(
f"adapter '{step.adapter_name}' non disponible. "
f"Adapters connus : {sorted(available_adapters)}"
),
))
# -- 3. Cohérence des types et des références inputs_from
# On simule un parcours topologique en ordre de spec.steps.
# À chaque étape :
# a) Tout type de input_types doit être disponible (soit
# initial, soit produit par une étape antérieure).
# b) Si inputs_from[type] = "src", "src" doit être une étape
# antérieure connue (ou "__initial__") qui produit ce type.
# Map { step_id (ou "__initial__") -> set(types qu'elle produit) }.
step_outputs: dict[str, set[ArtifactType]] = {
INITIAL_STEP_ID: set(spec.initial_inputs),
}
# Set des types disponibles à un instant t (latest seulement).
available: set[ArtifactType] = set(spec.initial_inputs)
for step in spec.steps:
errors.extend(_validate_step_against_state(
step=step,
step_outputs=step_outputs,
available=available,
))
# Mise à jour de l'état pour les étapes suivantes.
step_outputs[step.id] = set(step.output_types)
available.update(step.output_types)
return errors
def _validate_step_against_state(
*,
step: PipelineStep,
step_outputs: dict[str, set[ArtifactType]],
available: set[ArtifactType],
) -> list[ValidationError]:
"""Valide une étape donnée contre l'état des types
disponibles et des outputs des étapes antérieures."""
errors: list[ValidationError] = []
# 3.a — entrées disponibles
missing = [t for t in step.input_types if t not in available]
if missing:
errors.append(ValidationError(
step_id=step.id,
code="missing_input",
message=(
f"types d'entrée non disponibles : "
f"{[t.value for t in missing]}. "
f"Disponibles : {sorted(t.value for t in available)}"
),
))
# 3.b — références inputs_from
for ref_type, ref_step in step.inputs_from.items():
if ref_type not in step.input_types:
errors.append(ValidationError(
step_id=step.id,
code="inputs_from_unused",
message=(
f"inputs_from[{ref_type.value}]={ref_step!r} "
"mais l'étape ne consomme pas ce type "
f"(input_types = {[t.value for t in step.input_types]})"
),
))
continue
if ref_step not in step_outputs:
errors.append(ValidationError(
step_id=step.id,
code="unknown_input_source",
message=(
f"inputs_from[{ref_type.value}]={ref_step!r} "
"ne désigne pas une étape antérieure connue "
f"({INITIAL_STEP_ID!r} pour les entrées initiales)"
),
))
continue
if ref_type not in step_outputs[ref_step]:
errors.append(ValidationError(
step_id=step.id,
code="source_does_not_produce_type",
message=(
f"inputs_from[{ref_type.value}]={ref_step!r} "
f"mais '{ref_step}' ne produit pas {ref_type.value!r}"
),
))
return errors
__all__ = ["validate_spec", "ValidationError"]