Picarones / picarones /evaluation /metrics /longitudinal.py
Claude
refactor(evaluation): Sprint A14-S10 — déplacement de 23 fichiers de calcul vers evaluation/metrics/
052fb51 unverified
Raw
History Blame
11.4 kB
"""Métriques longitudinales — Sprint 92 (A.II.9).
Sprint 92 — A.II.9 du plan d'évolution 2026.
Pourquoi ce module
------------------
L'historique SQLite (`core/history.py`, Sprint 8) collecte les
résultats de chaque run de benchmark, mais aucune métrique
n'en sortait dans le rapport. Ce module exploite la série
temporelle des CER d'un moteur pour répondre à deux
questions :
1. **Y a-t-il une tendance ?** Régression linéaire simple
(méthode des moindres carrés) sur ``(t, CER)`` — pente,
ordonnée à l'origine, R², n_runs. Une pente > 0 signale
une régression progressive ; une pente < 0 une amélioration.
2. **Y a-t-il un point de rupture ?** Algorithme de
change-point pur Python (différence de moyennes maximale,
variante de Pettitt simplifiée). Identifie l'index où la
série se sépare en deux segments avec moyennes les plus
différentes — typiquement le run où un modèle a changé de
comportement.
Pas de scipy
------------
Pour rester sans dépendance lourde, on implémente :
- la régression linéaire en pur Python (closed-form OLS) ;
- le change-point par balayage exhaustif (O(N) pour de petits
N — l'historique d'une institution dépasse rarement quelques
centaines de runs).
"""
from __future__ import annotations
import logging
import math
import statistics
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, Optional
logger = logging.getLogger(__name__)
@dataclass
class LinearTrend:
"""Résultat d'une régression linéaire sur une série CER."""
slope: float
"""Pente (CER par jour). Positif = régression."""
intercept: float
"""Ordonnée à l'origine."""
r_squared: float
"""Qualité de l'ajustement, ∈ [0, 1]."""
n_runs: int
"""Nombre de points utilisés."""
def as_dict(self) -> dict:
return {
"slope": self.slope,
"intercept": self.intercept,
"r_squared": self.r_squared,
"n_runs": self.n_runs,
}
@dataclass
class ChangePointResult:
"""Résultat d'une détection de point de rupture."""
index: int
"""Index de la rupture (0-based, le segment 1 est [0:index],
le segment 2 est [index:N])."""
timestamp: str
"""Timestamp du run à la rupture."""
mean_before: float
mean_after: float
delta: float
"""``mean_after - mean_before``. Positif = régression."""
n_before: int
n_after: int
def as_dict(self) -> dict:
return {
"index": self.index,
"timestamp": self.timestamp,
"mean_before": self.mean_before,
"mean_after": self.mean_after,
"delta": self.delta,
"n_before": self.n_before,
"n_after": self.n_after,
}
def _parse_timestamp(ts: str) -> Optional[float]:
"""Parse un ISO timestamp en jour ordinal float.
Tolère ``YYYY-MM-DD`` et ``YYYY-MM-DDTHH:MM:SS``. Retourne
``None`` si non parsable.
"""
if not ts:
return None
formats = (
"%Y-%m-%dT%H:%M:%S.%f",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d",
)
for fmt in formats:
try:
dt = datetime.strptime(ts.split("+")[0].split("Z")[0], fmt)
return dt.toordinal() + (
dt.hour * 3600 + dt.minute * 60 + dt.second
) / 86400.0
except ValueError:
continue
return None
def compute_linear_trend(
cer_series: Iterable[tuple[str, float]],
) -> Optional[LinearTrend]:
"""Régression linéaire OLS sur une série temporelle de CER.
Parameters
----------
cer_series:
Itérable de ``(timestamp_iso, cer)``. Au moins 2 points
valides requis.
Returns
-------
LinearTrend | None
``None`` si moins de 2 points ou si tous les timestamps
sont identiques (variance nulle sur t).
"""
points: list[tuple[float, float]] = []
for ts, cer in cer_series:
t = _parse_timestamp(ts)
if t is None or cer is None:
continue
try:
cer_f = float(cer)
except (TypeError, ValueError):
continue
points.append((t, cer_f))
n = len(points)
if n < 2:
return None
xs = [p[0] for p in points]
ys = [p[1] for p in points]
x_mean = statistics.fmean(xs)
y_mean = statistics.fmean(ys)
sxx = sum((x - x_mean) ** 2 for x in xs)
sxy = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys))
if sxx == 0:
return None
slope = sxy / sxx
intercept = y_mean - slope * x_mean
syy = sum((y - y_mean) ** 2 for y in ys)
if syy == 0:
# Tous les CER sont égaux → R² mathématiquement indéfini ;
# on retourne 1.0 (parfaite "non-tendance").
r_squared = 1.0
else:
ss_res = sum(
(y - (slope * x + intercept)) ** 2
for x, y in zip(xs, ys)
)
r_squared = max(0.0, 1.0 - ss_res / syy)
return LinearTrend(
slope=slope,
intercept=intercept,
r_squared=r_squared,
n_runs=n,
)
def detect_change_point(
cer_series: Iterable[tuple[str, float]],
min_segment_size: int = 3,
) -> Optional[ChangePointResult]:
"""Détecte le point de rupture maximisant l'écart de moyennes.
Algorithme : balayage des indices ``i`` où la série se
sépare en deux segments d'au moins ``min_segment_size``
points chacun ; on retient l'index où ``|mean_after -
mean_before|`` est maximal. Variante simplifiée de Pettitt.
Parameters
----------
cer_series:
Itérable de ``(timestamp_iso, cer)``.
min_segment_size:
Taille minimale des deux segments. Défaut 3.
Returns
-------
ChangePointResult | None
``None`` si la série a moins de ``2 × min_segment_size``
points valides.
"""
points: list[tuple[str, float, float]] = []
for ts, cer in cer_series:
t = _parse_timestamp(ts)
if t is None or cer is None:
continue
try:
cer_f = float(cer)
except (TypeError, ValueError):
continue
points.append((ts, t, cer_f))
if len(points) < 2 * min_segment_size:
return None
points.sort(key=lambda p: p[1])
n = len(points)
best_index = -1
best_abs_delta = -1.0
best_delta = 0.0
best_mean_before = 0.0
best_mean_after = 0.0
for i in range(min_segment_size, n - min_segment_size + 1):
before = [p[2] for p in points[:i]]
after = [p[2] for p in points[i:]]
mean_b = statistics.fmean(before)
mean_a = statistics.fmean(after)
delta = mean_a - mean_b
abs_delta = abs(delta)
if abs_delta > best_abs_delta:
best_abs_delta = abs_delta
best_index = i
best_delta = delta
best_mean_before = mean_b
best_mean_after = mean_a
if best_index < 0:
return None
return ChangePointResult(
index=best_index,
timestamp=points[best_index][0],
mean_before=best_mean_before,
mean_after=best_mean_after,
delta=best_delta,
n_before=best_index,
n_after=n - best_index,
)
def compute_engine_longitudinal(
history_entries: Iterable,
engine_name: str,
corpus_name: Optional[str] = None,
*,
min_runs_for_trend: int = 3,
min_segment_size: int = 3,
change_point_threshold: float = 0.01,
) -> Optional[dict]:
"""Calcule trend + change_point pour un moteur.
Parameters
----------
history_entries:
Liste de ``HistoryEntry`` (ou dicts compatibles).
engine_name:
Filtre sur le nom du moteur.
corpus_name:
Filtre optionnel sur le corpus. ``None`` (défaut) : tous
les corpus.
min_runs_for_trend:
Minimum de runs pour calculer une tendance.
min_segment_size:
Taille minimale des segments pour le change-point.
change_point_threshold:
Magnitude absolue minimale du delta (en CER) pour
retenir le change-point. Défaut 0.01 (1 point de CER).
Returns
-------
dict | None
``{
"engine_name", "corpus_name", "n_runs", "trend",
"change_point", # ou None
"first_timestamp", "last_timestamp",
"first_cer", "last_cer", "absolute_delta_pct",
}`` ou ``None`` si moins de ``min_runs_for_trend`` runs.
"""
series: list[tuple[str, float]] = []
for entry in history_entries:
if hasattr(entry, "as_dict"):
data = entry.as_dict()
else:
data = entry
if data.get("engine_name") != engine_name:
continue
if corpus_name is not None and data.get("corpus_name") != corpus_name:
continue
cer = data.get("cer_mean")
ts = data.get("timestamp")
if cer is None or ts is None:
continue
series.append((ts, float(cer)))
if len(series) < min_runs_for_trend:
return None
series.sort(key=lambda p: _parse_timestamp(p[0]) or 0.0)
trend = compute_linear_trend(series)
cp = detect_change_point(series, min_segment_size=min_segment_size)
if cp is not None and abs(cp.delta) < change_point_threshold:
cp = None
first_ts, first_cer = series[0]
last_ts, last_cer = series[-1]
return {
"engine_name": engine_name,
"corpus_name": corpus_name,
"n_runs": len(series),
"trend": trend.as_dict() if trend else None,
"change_point": cp.as_dict() if cp else None,
"first_timestamp": first_ts,
"last_timestamp": last_ts,
"first_cer": first_cer,
"last_cer": last_cer,
"absolute_delta": last_cer - first_cer,
"absolute_delta_pct": round((last_cer - first_cer) * 100, 2),
}
def compute_corpus_longitudinal(
history_entries: Iterable,
corpus_name: Optional[str] = None,
*,
min_runs_for_trend: int = 3,
min_segment_size: int = 3,
change_point_threshold: float = 0.01,
) -> list[dict]:
"""Pour chaque moteur présent dans l'historique sur ``corpus_name``,
calcule trend + change_point.
Returns
-------
list[dict]
Une entrée par moteur (filtrée), liste vide si rien.
"""
entries = list(history_entries)
engines: set[str] = set()
for entry in entries:
data = entry.as_dict() if hasattr(entry, "as_dict") else entry
if corpus_name is not None and data.get("corpus_name") != corpus_name:
continue
name = data.get("engine_name")
if name:
engines.add(name)
out: list[dict] = []
for engine in sorted(engines):
result = compute_engine_longitudinal(
entries, engine, corpus_name=corpus_name,
min_runs_for_trend=min_runs_for_trend,
min_segment_size=min_segment_size,
change_point_threshold=change_point_threshold,
)
if result is not None:
out.append(result)
return out
__all__ = [
"LinearTrend",
"ChangePointResult",
"compute_linear_trend",
"detect_change_point",
"compute_engine_longitudinal",
"compute_corpus_longitudinal",
]
# Marqueur d'évitement d'import inutilisé (math)
_ = math