Claude
docs(sprint-H.8): cleanup obsolete legacy/shim language in production docstrings
e407ec0 unverified
Raw
History Blame
12.6 kB
"""Détecteurs narratifs liés à *l'historique SQLite + multi-runs* (chantier 5).
3 détecteurs déplacés depuis ``narrative/detectors.py`` :
- :func:`detect_engine_off_baseline` (Sprint 73)
- :func:`detect_engine_unstable` (Sprint 90)
- :func:`detect_regression_in_history` (Sprint 92)
"""
from __future__ import annotations
from picarones.domain.facts import Fact, FactImportance, FactType
from picarones.reports.narrative.registry import register_detector
@register_detector(
FactType.ENGINE_OFF_BASELINE,
priority=150,
importance=FactImportance.MEDIUM,
)
def detect_engine_off_baseline(benchmark_data: dict) -> list[Fact]:
"""Émet un Fact pour chaque moteur dont le CER courant s'écarte
significativement de sa moyenne historique sur le **même corpus**.
Lit ``benchmark_data["baseline_comparisons"]`` (liste de dicts
produits par ``compute_engine_baseline`` du module
``baseline_comparison`` Sprint 73). Si la clé est absente ou
vide, le détecteur reste silencieux — typiquement le cas quand
aucun historique SQLite n'a été chargé.
Garde-fous :
- Si ``n_runs < 5`` (déjà filtré par ``compute_engine_baseline``
qui retourne ``None`` dans ce cas).
- Si ``relative_delta`` n'est pas calculable (baseline = 0).
- Importance ``HIGH`` si ``|relative_delta| ≥ 50 %``, sinon
``MEDIUM``.
"""
comparisons = benchmark_data.get("baseline_comparisons") or []
if not isinstance(comparisons, (list, tuple)):
return []
facts: list[Fact] = []
for comp in comparisons:
if not isinstance(comp, dict):
continue
if not comp.get("off_baseline"):
continue
rel = comp.get("relative_delta")
if rel is None:
continue
engine = comp.get("engine_name")
cer_current = comp.get("cer_current")
cer_hist_mean = comp.get("cer_historical_mean")
n_runs = comp.get("n_runs")
if engine is None or cer_current is None or cer_hist_mean is None:
continue
importance = (
FactImportance.HIGH if abs(float(rel)) >= 0.50
else FactImportance.MEDIUM
)
facts.append(Fact(
type=FactType.ENGINE_OFF_BASELINE,
importance=importance,
payload={
"engine": engine,
"cer_current_pct": round(float(cer_current) * 100, 2),
"cer_historical_mean_pct": round(
float(cer_hist_mean) * 100, 2,
),
"n_runs": int(n_runs or 0),
"relative_delta_pct": round(float(rel) * 100, 1),
"direction": "higher" if float(rel) > 0 else "lower",
},
engines_involved=(engine,),
))
return facts
@register_detector(
FactType.ENGINE_UNSTABLE,
priority=160,
importance=FactImportance.HIGH,
)
def detect_engine_unstable(benchmark_data: dict) -> list[Fact]:
"""Émet un Fact pour chaque moteur dont la stabilité multi-runs
est insuffisante (Sprint 83 + 90).
Lit ``benchmark_data["multirun_stability"]`` : liste de dicts
avec ``engine_name`` + champs de ``compute_multirun_stability``
(cer_cv, identical_run_rate, n_runs, etc.). Si la clé est
absente ou vide, le détecteur reste silencieux — typiquement
le cas quand l'utilisateur n'a pas exécuté `--repeats N`.
Garde-fous :
- ``n_runs ≥ 2`` (déjà filtré par
``compute_multirun_stability`` qui retourne ``None``).
- Déclenche si ``cer_cv > 0.10`` (variance relative > 10 % du
CER moyen) **ou** ``identical_run_rate < 0.50`` (moins
d'une paire de runs sur deux est identique).
- Importance ``HIGH`` (l'instabilité discrédite les
conclusions).
"""
stabilities = benchmark_data.get("multirun_stability") or []
if not isinstance(stabilities, (list, tuple)):
return []
facts: list[Fact] = []
for stab in stabilities:
if not isinstance(stab, dict):
continue
engine = stab.get("engine_name") or stab.get("engine")
if not engine:
continue
n_runs = stab.get("n_runs")
if not isinstance(n_runs, int) or n_runs < 2:
continue
cer_cv = stab.get("cer_cv")
identical_rate = stab.get("identical_run_rate")
# Critères de déclenchement
cv_high = (
isinstance(cer_cv, (int, float)) and float(cer_cv) > 0.10
)
runs_diverge = (
isinstance(identical_rate, (int, float))
and float(identical_rate) < 0.50
)
if not (cv_high or runs_diverge):
continue
payload: dict = {
"engine": engine,
"n_runs": int(n_runs),
}
if isinstance(cer_cv, (int, float)):
payload["cer_cv"] = float(cer_cv)
payload["cer_cv_pct"] = round(float(cer_cv) * 100, 1)
if isinstance(identical_rate, (int, float)):
payload["identical_run_rate"] = float(identical_rate)
payload["identical_run_rate_pct"] = round(
float(identical_rate) * 100, 1,
)
# Champs additionnels pour la phrase de synthèse
cer_mean = stab.get("cer_mean")
cer_stdev = stab.get("cer_stdev")
if isinstance(cer_mean, (int, float)):
payload["cer_mean_pct"] = round(float(cer_mean) * 100, 2)
if isinstance(cer_stdev, (int, float)):
payload["cer_stdev_pct"] = round(float(cer_stdev) * 100, 2)
n_distinct = stab.get("n_distinct_outputs")
if isinstance(n_distinct, int):
payload["n_distinct_outputs"] = int(n_distinct)
facts.append(Fact(
type=FactType.ENGINE_UNSTABLE,
importance=FactImportance.HIGH,
payload=payload,
engines_involved=(engine,),
))
return facts
@register_detector(
FactType.REGRESSION_IN_HISTORY,
priority=170,
importance=FactImportance.MEDIUM,
)
def detect_regression_in_history(benchmark_data: dict) -> list[Fact]:
"""Émet un Fact pour chaque moteur dont l'historique montre
une dégradation : pente positive significative ou rupture
brutale (Sprint 92).
Lit ``benchmark_data["longitudinal_trends"]`` : liste de
dicts produits par ``compute_corpus_longitudinal`` du module
``longitudinal``. Si la clé est absente ou vide, le
détecteur reste silencieux — typiquement le cas quand
aucun historique n'a été chargé ou que la série est trop
courte.
Garde-fous :
- ``n_runs ≥ 3`` (déjà filtré par
``compute_engine_longitudinal``).
- Déclenche si **soit** ``trend.slope`` traduit une
régression d'au moins ``slope_threshold`` (en CER/jour,
défaut équivalent à +1 point CER sur 365 jours), **soit**
``change_point.delta > change_threshold`` (défaut
0.01 = +1 point de CER d'un segment à l'autre).
- Importance ``HIGH`` si la dégradation cumulée
``absolute_delta`` ≥ 5 points de CER.
"""
trends = benchmark_data.get("longitudinal_trends") or []
if not isinstance(trends, (list, tuple)):
return []
slope_threshold = (
0.01 / 365.0 # +1 point de CER sur 365 jours minimum
)
change_threshold = 0.01
facts: list[Fact] = []
for entry in trends:
if not isinstance(entry, dict):
continue
engine = entry.get("engine_name")
if not engine:
continue
n_runs = entry.get("n_runs")
if not isinstance(n_runs, int) or n_runs < 3:
continue
trend = entry.get("trend") or {}
cp = entry.get("change_point")
slope = trend.get("slope")
slope_high = (
isinstance(slope, (int, float))
and float(slope) > slope_threshold
)
cp_high = (
isinstance(cp, dict)
and isinstance(cp.get("delta"), (int, float))
and float(cp["delta"]) > change_threshold
)
if not (slope_high or cp_high):
continue
absolute_delta = entry.get("absolute_delta") or 0.0
importance = (
FactImportance.HIGH
if isinstance(absolute_delta, (int, float))
and abs(float(absolute_delta)) >= 0.05
else FactImportance.MEDIUM
)
payload: dict = {
"engine": engine,
"n_runs": int(n_runs),
"absolute_delta_pct": round(
float(absolute_delta) * 100, 2,
) if isinstance(absolute_delta, (int, float)) else 0.0,
"first_cer_pct": round(
float(entry.get("first_cer") or 0.0) * 100, 2,
),
"last_cer_pct": round(
float(entry.get("last_cer") or 0.0) * 100, 2,
),
}
if slope_high:
payload["slope_per_year_pct"] = round(
float(slope) * 365 * 100, 2,
)
payload["r_squared"] = round(
float(trend.get("r_squared") or 0.0), 3,
)
payload["pattern"] = "trend"
if cp_high:
payload["change_point_timestamp"] = str(
cp.get("timestamp") or "?",
)
payload["change_delta_pct"] = round(
float(cp["delta"]) * 100, 2,
)
payload["mean_before_pct"] = round(
float(cp.get("mean_before") or 0.0) * 100, 2,
)
payload["mean_after_pct"] = round(
float(cp.get("mean_after") or 0.0) * 100, 2,
)
# Si on a aussi une rupture, le pattern domine
payload["pattern"] = (
"trend_and_change_point" if slope_high else "change_point"
)
facts.append(Fact(
type=FactType.REGRESSION_IN_HISTORY,
importance=importance,
payload=payload,
engines_involved=(engine,),
))
return facts
# ---------------------------------------------------------------------------
# Sprint A3 (item B-3) — détecteur IMPORTER_FALLBACK_TRIGGERED
# ---------------------------------------------------------------------------
@register_detector(
FactType.IMPORTER_FALLBACK_TRIGGERED,
# Priorité 180 — en queue, après les détecteurs de tendance historique.
# L'incident d'importer est *informationnel sur l'acquisition*, pas
# sur le ranking ou la performance d'un moteur — il vient logiquement
# après tout le reste de la synthèse.
priority=180,
importance=FactImportance.MEDIUM,
)
def detect_importer_fallback(benchmark_data: dict) -> list[Fact]:
"""Émet un Fact par incident d'importer en mode dégradé.
Lit ``benchmark_data["importer_fallbacks"]`` (liste de dicts
produite par ``picarones.adapters.corpus._fallback_log.consume_fallback_log()``).
Si la clé est absente ou vide, le détecteur reste silencieux —
typiquement le cas pour un benchmark qui n'utilise pas d'importer
distant (corpus local).
Importance HIGH si **plusieurs incidents** sur le même importer
(signal d'une indisponibilité prolongée plutôt qu'un échec
isolé) ; MEDIUM sinon.
"""
fallbacks = benchmark_data.get("importer_fallbacks") or []
if not fallbacks:
return []
# Compte par importer pour détecter les incidents répétés.
counts: dict[str, int] = {}
for entry in fallbacks:
if isinstance(entry, dict):
counts[str(entry.get("importer", "unknown"))] = (
counts.get(str(entry.get("importer", "unknown")), 0) + 1
)
facts: list[Fact] = []
for entry in fallbacks:
if not isinstance(entry, dict):
continue
importer = str(entry.get("importer", "unknown"))
operation = str(entry.get("operation", "unknown"))
importance = (
FactImportance.HIGH if counts.get(importer, 0) >= 2 else FactImportance.MEDIUM
)
payload: dict = {
"importer": importer,
"operation": operation,
"incidents_for_importer": counts.get(importer, 1),
}
if entry.get("error"):
payload["error_repr"] = str(entry["error"])
facts.append(Fact(
type=FactType.IMPORTER_FALLBACK_TRIGGERED,
importance=importance,
payload=payload,
engines_involved=(),
))
return facts