Spaces:
Sleeping
Sleeping
Claude
refactor(evaluation): Sprint A14-S10 — déplacement de 23 fichiers de calcul vers evaluation/metrics/
052fb51 unverified | """Métriques longitudinales — Sprint 92 (A.II.9). | |
| Sprint 92 — A.II.9 du plan d'évolution 2026. | |
| Pourquoi ce module | |
| ------------------ | |
| L'historique SQLite (`core/history.py`, Sprint 8) collecte les | |
| résultats de chaque run de benchmark, mais aucune métrique | |
| n'en sortait dans le rapport. Ce module exploite la série | |
| temporelle des CER d'un moteur pour répondre à deux | |
| questions : | |
| 1. **Y a-t-il une tendance ?** Régression linéaire simple | |
| (méthode des moindres carrés) sur ``(t, CER)`` — pente, | |
| ordonnée à l'origine, R², n_runs. Une pente > 0 signale | |
| une régression progressive ; une pente < 0 une amélioration. | |
| 2. **Y a-t-il un point de rupture ?** Algorithme de | |
| change-point pur Python (différence de moyennes maximale, | |
| variante de Pettitt simplifiée). Identifie l'index où la | |
| série se sépare en deux segments avec moyennes les plus | |
| différentes — typiquement le run où un modèle a changé de | |
| comportement. | |
| Pas de scipy | |
| ------------ | |
| Pour rester sans dépendance lourde, on implémente : | |
| - la régression linéaire en pur Python (closed-form OLS) ; | |
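| - le change-point par balayage exhaustif : O(N) positions de | |
| coupure testées, les deux moyennes étant recalculées à chaque | |
| position, soit O(N²) au total — négligeable, car l'historique | |
| d'une institution dépasse rarement quelques centaines de runs. | |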
| """ | |
| from __future__ import annotations | |
| import logging | |
| import math | |
| import statistics | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from typing import Iterable, Optional | |
| logger = logging.getLogger(__name__) | |
@dataclass
class LinearTrend:
    """Result of a linear regression over a CER time series.

    Attributes
    ----------
    slope:
        Slope of the fitted line, in CER per day. Positive means the
        CER is rising over time (a regression).
    intercept:
        Intercept of the fitted line.
    r_squared:
        Goodness of fit, in [0, 1].
    n_runs:
        Number of points used for the fit.
    """

    slope: float
    intercept: float
    r_squared: float
    n_runs: int

    def as_dict(self) -> dict:
        """Return the four fields as a plain dictionary."""
        return {
            "slope": self.slope,
            "intercept": self.intercept,
            "r_squared": self.r_squared,
            "n_runs": self.n_runs,
        }
@dataclass
class ChangePointResult:
    """Result of a change-point detection on a CER time series.

    Attributes
    ----------
    index:
        0-based index of the break: segment 1 is ``[0:index]`` and
        segment 2 is ``[index:N]``.
    timestamp:
        Timestamp of the run located at the break.
    mean_before:
        Mean CER of segment 1.
    mean_after:
        Mean CER of segment 2.
    delta:
        ``mean_after - mean_before``. Positive means a regression.
    n_before:
        Number of runs in segment 1.
    n_after:
        Number of runs in segment 2.
    """

    index: int
    timestamp: str
    mean_before: float
    mean_after: float
    delta: float
    n_before: int
    n_after: int

    def as_dict(self) -> dict:
        """Return all fields as a plain dictionary."""
        return {
            "index": self.index,
            "timestamp": self.timestamp,
            "mean_before": self.mean_before,
            "mean_after": self.mean_after,
            "delta": self.delta,
            "n_before": self.n_before,
            "n_after": self.n_after,
        }
| def _parse_timestamp(ts: str) -> Optional[float]: | |
| """Parse un ISO timestamp en jour ordinal float. | |
| Tolère ``YYYY-MM-DD`` et ``YYYY-MM-DDTHH:MM:SS``. Retourne | |
| ``None`` si non parsable. | |
| """ | |
| if not ts: | |
| return None | |
| formats = ( | |
| "%Y-%m-%dT%H:%M:%S.%f", | |
| "%Y-%m-%dT%H:%M:%S", | |
| "%Y-%m-%d %H:%M:%S", | |
| "%Y-%m-%d", | |
| ) | |
| for fmt in formats: | |
| try: | |
| dt = datetime.strptime(ts.split("+")[0].split("Z")[0], fmt) | |
| return dt.toordinal() + ( | |
| dt.hour * 3600 + dt.minute * 60 + dt.second | |
| ) / 86400.0 | |
| except ValueError: | |
| continue | |
| return None | |
def compute_linear_trend(
    cer_series: Iterable[tuple[str, float]],
) -> Optional[LinearTrend]:
    """Fit an ordinary-least-squares line to a CER time series.

    Parameters
    ----------
    cer_series:
        Iterable of ``(timestamp_iso, cer)`` pairs. A pair is ignored when
        its timestamp cannot be parsed, or when its CER is ``None``, not
        convertible to ``float``, or not finite (NaN / infinity).

    Returns
    -------
    LinearTrend | None
        ``None`` when fewer than 2 valid points remain or when all
        timestamps are identical (zero variance on t).
    """
    xs: list[float] = []
    ys: list[float] = []
    for ts, cer in cer_series:
        t = _parse_timestamp(ts)
        if t is None or cer is None:
            continue
        try:
            cer_f = float(cer)
        except (TypeError, ValueError, OverflowError):
            continue
        # A single NaN/inf would propagate into slope, intercept and R².
        if not math.isfinite(cer_f):
            continue
        xs.append(t)
        ys.append(cer_f)

    n = len(xs)
    if n < 2:
        return None

    x_mean = statistics.fmean(xs)
    y_mean = statistics.fmean(ys)
    sxx = sum((x - x_mean) ** 2 for x in xs)
    if sxx == 0:
        # All runs share the same timestamp: the slope is undefined.
        return None
    sxy = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys))

    slope = sxy / sxx
    intercept = y_mean - slope * x_mean

    syy = sum((y - y_mean) ** 2 for y in ys)
    if syy == 0:
        # All CER values are equal: R² is mathematically undefined, so 1.0
        # is reported (the flat line fits every point exactly).
        r_squared = 1.0
    else:
        ss_res = sum(
            (y - (slope * x + intercept)) ** 2
            for x, y in zip(xs, ys)
        )
        # Clamp tiny negative values caused by floating-point rounding.
        r_squared = max(0.0, 1.0 - ss_res / syy)

    return LinearTrend(
        slope=slope,
        intercept=intercept,
        r_squared=r_squared,
        n_runs=n,
    )
def detect_change_point(
    cer_series: Iterable[tuple[str, float]],
    min_segment_size: int = 3,
) -> Optional[ChangePointResult]:
    """Find the split that maximises the gap between segment means.

    Every index ``i`` that leaves at least ``min_segment_size`` points on
    each side is tried; the one with the largest
    ``|mean_after - mean_before|`` wins (the earliest such index on ties).
    This is a simplified variant of Pettitt's test.

    Parameters
    ----------
    cer_series:
        Iterable of ``(timestamp_iso, cer)`` pairs. A pair is ignored when
        its timestamp cannot be parsed, or when its CER is ``None``, not
        convertible to ``float``, or not finite (NaN / infinity). Valid
        points are ordered by timestamp before scanning.
    min_segment_size:
        Minimum number of points in each segment. Default 3. Values below
        1 are treated as 1, because a mean needs at least one point.

    Returns
    -------
    ChangePointResult | None
        ``None`` when fewer than ``2 × min_segment_size`` valid points are
        available.
    """
    # An empty segment has no mean; never allow a segment size below 1.
    segment_floor = max(1, min_segment_size)

    points: list[tuple[str, float, float]] = []
    for ts, cer in cer_series:
        t = _parse_timestamp(ts)
        if t is None or cer is None:
            continue
        try:
            cer_f = float(cer)
        except (TypeError, ValueError, OverflowError):
            continue
        # A single NaN/inf would turn every segment mean into NaN/inf.
        if not math.isfinite(cer_f):
            continue
        points.append((ts, t, cer_f))

    n = len(points)
    if n < 2 * segment_floor:
        return None

    points.sort(key=lambda p: p[1])
    values = [p[2] for p in points]

    best_index = -1
    best_abs_delta = -1.0
    best_delta = 0.0
    best_mean_before = 0.0
    best_mean_after = 0.0
    for i in range(segment_floor, n - segment_floor + 1):
        mean_b = statistics.fmean(values[:i])
        mean_a = statistics.fmean(values[i:])
        delta = mean_a - mean_b
        abs_delta = abs(delta)
        # Strict comparison keeps the earliest split when several tie.
        if abs_delta > best_abs_delta:
            best_abs_delta = abs_delta
            best_index = i
            best_delta = delta
            best_mean_before = mean_b
            best_mean_after = mean_a

    if best_index < 0:
        return None

    return ChangePointResult(
        index=best_index,
        # The run at ``index`` is the first run of the second segment.
        timestamp=points[best_index][0],
        mean_before=best_mean_before,
        mean_after=best_mean_after,
        delta=best_delta,
        n_before=best_index,
        n_after=n - best_index,
    )
def compute_engine_longitudinal(
    history_entries: Iterable,
    engine_name: str,
    corpus_name: Optional[str] = None,
    *,
    min_runs_for_trend: int = 3,
    min_segment_size: int = 3,
    change_point_threshold: float = 0.01,
) -> Optional[dict]:
    """Compute the trend and change-point of one engine's CER history.

    Parameters
    ----------
    history_entries:
        Iterable of history entries: objects exposing ``as_dict()`` or
        mappings. The keys read are ``engine_name``, ``corpus_name``,
        ``cer_mean`` and ``timestamp``.
    engine_name:
        Only entries whose ``engine_name`` equals this value are kept.
    corpus_name:
        Optional filter on ``corpus_name``. ``None`` (default) keeps every
        corpus.
    min_runs_for_trend:
        Minimum number of usable runs required to produce a result.
    min_segment_size:
        Minimum segment size passed to the change-point detection.
    change_point_threshold:
        Minimum absolute delta (in CER) for a change-point to be reported.
        Default 0.01.

    Returns
    -------
    dict | None
        ``None`` when no usable run exists or when there are fewer than
        ``min_runs_for_trend`` of them. Otherwise a dictionary with the
        keys ``engine_name``, ``corpus_name``, ``n_runs``, ``trend`` (dict
        or ``None``), ``change_point`` (dict or ``None``),
        ``first_timestamp``, ``last_timestamp``, ``first_cer``,
        ``last_cer``, ``absolute_delta`` and ``absolute_delta_pct``.

    Notes
    -----
    A run is usable when its CER converts to a finite float and its
    timestamp can be parsed. Runs that cannot be placed in time are left
    out so they are neither counted in ``n_runs`` nor reported as the
    first run of the series.
    """
    dated: list[tuple[float, str, float]] = []
    for entry in history_entries:
        data = entry.as_dict() if hasattr(entry, "as_dict") else entry
        if data.get("engine_name") != engine_name:
            continue
        if corpus_name is not None and data.get("corpus_name") != corpus_name:
            continue
        cer = data.get("cer_mean")
        ts = data.get("timestamp")
        if cer is None or ts is None:
            continue
        try:
            cer_f = float(cer)
        except (TypeError, ValueError, OverflowError):
            # Same policy as the trend / change-point functions: skip.
            continue
        if not math.isfinite(cer_f):
            continue
        t = _parse_timestamp(ts)
        if t is None:
            continue
        dated.append((t, ts, cer_f))

    # The emptiness check protects the first/last lookups below when
    # ``min_runs_for_trend`` is zero or negative.
    if not dated or len(dated) < min_runs_for_trend:
        return None

    # Stable sort on the parsed time keeps input order for equal times.
    dated.sort(key=lambda item: item[0])
    series: list[tuple[str, float]] = [(ts, cer) for _, ts, cer in dated]

    trend = compute_linear_trend(series)
    cp = detect_change_point(series, min_segment_size=min_segment_size)
    if cp is not None and abs(cp.delta) < change_point_threshold:
        # Too small a shift to be worth reporting.
        cp = None

    first_ts, first_cer = series[0]
    last_ts, last_cer = series[-1]
    return {
        "engine_name": engine_name,
        "corpus_name": corpus_name,
        "n_runs": len(series),
        "trend": trend.as_dict() if trend else None,
        "change_point": cp.as_dict() if cp else None,
        "first_timestamp": first_ts,
        "last_timestamp": last_ts,
        "first_cer": first_cer,
        "last_cer": last_cer,
        "absolute_delta": last_cer - first_cer,
        "absolute_delta_pct": round((last_cer - first_cer) * 100, 2),
    }
def compute_corpus_longitudinal(
    history_entries: Iterable,
    corpus_name: Optional[str] = None,
    *,
    min_runs_for_trend: int = 3,
    min_segment_size: int = 3,
    change_point_threshold: float = 0.01,
) -> list[dict]:
    """Compute trend and change-point for every engine seen on a corpus.

    The engine names are collected from the entries that match
    ``corpus_name`` (all entries when it is ``None``), then each engine is
    analysed in alphabetical order with ``compute_engine_longitudinal``.

    Returns
    -------
    list[dict]
        One result per engine for which an analysis was produced; an empty
        list when there is none.
    """
    # Materialise once: the entries are traversed again for each engine.
    records = list(history_entries)

    engine_names: set[str] = set()
    for record in records:
        fields = record.as_dict() if hasattr(record, "as_dict") else record
        if corpus_name is None or not (fields.get("corpus_name") != corpus_name):
            engine = fields.get("engine_name")
            if engine:
                engine_names.add(engine)

    options = {
        "corpus_name": corpus_name,
        "min_runs_for_trend": min_runs_for_trend,
        "min_segment_size": min_segment_size,
        "change_point_threshold": change_point_threshold,
    }
    reports = (
        compute_engine_longitudinal(records, engine, **options)
        for engine in sorted(engine_names)
    )
    return [report for report in reports if report is not None]
# Names exported by ``from <module> import *``.
__all__ = [
    "LinearTrend",
    "ChangePointResult",
    "compute_linear_trend",
    "detect_change_point",
    "compute_engine_longitudinal",
    "compute_corpus_longitudinal",
]

# Marker that keeps the ``math`` import from being reported as unused.
_ = math