Spaces:
Sleeping
Sleeping
Claude
docs(sprint-H.8): cleanup obsolete legacy/shim language in production docstrings
e407ec0 unverified | """Détecteurs narratifs liés à *l'historique SQLite + multi-runs* (chantier 5). | |
| 3 détecteurs déplacés depuis ``narrative/detectors.py`` : | |
| - :func:`detect_engine_off_baseline` (Sprint 73) | |
| - :func:`detect_engine_unstable` (Sprint 90) | |
| - :func:`detect_regression_in_history` (Sprint 92) | |
| """ | |
| from __future__ import annotations | |
| from picarones.domain.facts import Fact, FactImportance, FactType | |
| from picarones.reports.narrative.registry import register_detector | |
| def detect_engine_off_baseline(benchmark_data: dict) -> list[Fact]: | |
| """Émet un Fact pour chaque moteur dont le CER courant s'écarte | |
| significativement de sa moyenne historique sur le **même corpus**. | |
| Lit ``benchmark_data["baseline_comparisons"]`` (liste de dicts | |
| produits par ``compute_engine_baseline`` du module | |
| ``baseline_comparison`` Sprint 73). Si la clé est absente ou | |
| vide, le détecteur reste silencieux — typiquement le cas quand | |
| aucun historique SQLite n'a été chargé. | |
| Garde-fous : | |
| - Si ``n_runs < 5`` (déjà filtré par ``compute_engine_baseline`` | |
| qui retourne ``None`` dans ce cas). | |
| - Si ``relative_delta`` n'est pas calculable (baseline = 0). | |
| - Importance ``HIGH`` si ``|relative_delta| ≥ 50 %``, sinon | |
| ``MEDIUM``. | |
| """ | |
| comparisons = benchmark_data.get("baseline_comparisons") or [] | |
| if not isinstance(comparisons, (list, tuple)): | |
| return [] | |
| facts: list[Fact] = [] | |
| for comp in comparisons: | |
| if not isinstance(comp, dict): | |
| continue | |
| if not comp.get("off_baseline"): | |
| continue | |
| rel = comp.get("relative_delta") | |
| if rel is None: | |
| continue | |
| engine = comp.get("engine_name") | |
| cer_current = comp.get("cer_current") | |
| cer_hist_mean = comp.get("cer_historical_mean") | |
| n_runs = comp.get("n_runs") | |
| if engine is None or cer_current is None or cer_hist_mean is None: | |
| continue | |
| importance = ( | |
| FactImportance.HIGH if abs(float(rel)) >= 0.50 | |
| else FactImportance.MEDIUM | |
| ) | |
| facts.append(Fact( | |
| type=FactType.ENGINE_OFF_BASELINE, | |
| importance=importance, | |
| payload={ | |
| "engine": engine, | |
| "cer_current_pct": round(float(cer_current) * 100, 2), | |
| "cer_historical_mean_pct": round( | |
| float(cer_hist_mean) * 100, 2, | |
| ), | |
| "n_runs": int(n_runs or 0), | |
| "relative_delta_pct": round(float(rel) * 100, 1), | |
| "direction": "higher" if float(rel) > 0 else "lower", | |
| }, | |
| engines_involved=(engine,), | |
| )) | |
| return facts | |
| def detect_engine_unstable(benchmark_data: dict) -> list[Fact]: | |
| """Émet un Fact pour chaque moteur dont la stabilité multi-runs | |
| est insuffisante (Sprint 83 + 90). | |
| Lit ``benchmark_data["multirun_stability"]`` : liste de dicts | |
| avec ``engine_name`` + champs de ``compute_multirun_stability`` | |
| (cer_cv, identical_run_rate, n_runs, etc.). Si la clé est | |
| absente ou vide, le détecteur reste silencieux — typiquement | |
| le cas quand l'utilisateur n'a pas exécuté `--repeats N`. | |
| Garde-fous : | |
| - ``n_runs ≥ 2`` (déjà filtré par | |
| ``compute_multirun_stability`` qui retourne ``None``). | |
| - Déclenche si ``cer_cv > 0.10`` (variance relative > 10 % du | |
| CER moyen) **ou** ``identical_run_rate < 0.50`` (moins | |
| d'une paire de runs sur deux est identique). | |
| - Importance ``HIGH`` (l'instabilité discrédite les | |
| conclusions). | |
| """ | |
| stabilities = benchmark_data.get("multirun_stability") or [] | |
| if not isinstance(stabilities, (list, tuple)): | |
| return [] | |
| facts: list[Fact] = [] | |
| for stab in stabilities: | |
| if not isinstance(stab, dict): | |
| continue | |
| engine = stab.get("engine_name") or stab.get("engine") | |
| if not engine: | |
| continue | |
| n_runs = stab.get("n_runs") | |
| if not isinstance(n_runs, int) or n_runs < 2: | |
| continue | |
| cer_cv = stab.get("cer_cv") | |
| identical_rate = stab.get("identical_run_rate") | |
| # Critères de déclenchement | |
| cv_high = ( | |
| isinstance(cer_cv, (int, float)) and float(cer_cv) > 0.10 | |
| ) | |
| runs_diverge = ( | |
| isinstance(identical_rate, (int, float)) | |
| and float(identical_rate) < 0.50 | |
| ) | |
| if not (cv_high or runs_diverge): | |
| continue | |
| payload: dict = { | |
| "engine": engine, | |
| "n_runs": int(n_runs), | |
| } | |
| if isinstance(cer_cv, (int, float)): | |
| payload["cer_cv"] = float(cer_cv) | |
| payload["cer_cv_pct"] = round(float(cer_cv) * 100, 1) | |
| if isinstance(identical_rate, (int, float)): | |
| payload["identical_run_rate"] = float(identical_rate) | |
| payload["identical_run_rate_pct"] = round( | |
| float(identical_rate) * 100, 1, | |
| ) | |
| # Champs additionnels pour la phrase de synthèse | |
| cer_mean = stab.get("cer_mean") | |
| cer_stdev = stab.get("cer_stdev") | |
| if isinstance(cer_mean, (int, float)): | |
| payload["cer_mean_pct"] = round(float(cer_mean) * 100, 2) | |
| if isinstance(cer_stdev, (int, float)): | |
| payload["cer_stdev_pct"] = round(float(cer_stdev) * 100, 2) | |
| n_distinct = stab.get("n_distinct_outputs") | |
| if isinstance(n_distinct, int): | |
| payload["n_distinct_outputs"] = int(n_distinct) | |
| facts.append(Fact( | |
| type=FactType.ENGINE_UNSTABLE, | |
| importance=FactImportance.HIGH, | |
| payload=payload, | |
| engines_involved=(engine,), | |
| )) | |
| return facts | |
| def detect_regression_in_history(benchmark_data: dict) -> list[Fact]: | |
| """Émet un Fact pour chaque moteur dont l'historique montre | |
| une dégradation : pente positive significative ou rupture | |
| brutale (Sprint 92). | |
| Lit ``benchmark_data["longitudinal_trends"]`` : liste de | |
| dicts produits par ``compute_corpus_longitudinal`` du module | |
| ``longitudinal``. Si la clé est absente ou vide, le | |
| détecteur reste silencieux — typiquement le cas quand | |
| aucun historique n'a été chargé ou que la série est trop | |
| courte. | |
| Garde-fous : | |
| - ``n_runs ≥ 3`` (déjà filtré par | |
| ``compute_engine_longitudinal``). | |
| - Déclenche si **soit** ``trend.slope`` traduit une | |
| régression d'au moins ``slope_threshold`` (en CER/jour, | |
| défaut équivalent à +1 point CER sur 365 jours), **soit** | |
| ``change_point.delta > change_threshold`` (défaut | |
| 0.01 = +1 point de CER d'un segment à l'autre). | |
| - Importance ``HIGH`` si la dégradation cumulée | |
| ``absolute_delta`` ≥ 5 points de CER. | |
| """ | |
| trends = benchmark_data.get("longitudinal_trends") or [] | |
| if not isinstance(trends, (list, tuple)): | |
| return [] | |
| slope_threshold = ( | |
| 0.01 / 365.0 # +1 point de CER sur 365 jours minimum | |
| ) | |
| change_threshold = 0.01 | |
| facts: list[Fact] = [] | |
| for entry in trends: | |
| if not isinstance(entry, dict): | |
| continue | |
| engine = entry.get("engine_name") | |
| if not engine: | |
| continue | |
| n_runs = entry.get("n_runs") | |
| if not isinstance(n_runs, int) or n_runs < 3: | |
| continue | |
| trend = entry.get("trend") or {} | |
| cp = entry.get("change_point") | |
| slope = trend.get("slope") | |
| slope_high = ( | |
| isinstance(slope, (int, float)) | |
| and float(slope) > slope_threshold | |
| ) | |
| cp_high = ( | |
| isinstance(cp, dict) | |
| and isinstance(cp.get("delta"), (int, float)) | |
| and float(cp["delta"]) > change_threshold | |
| ) | |
| if not (slope_high or cp_high): | |
| continue | |
| absolute_delta = entry.get("absolute_delta") or 0.0 | |
| importance = ( | |
| FactImportance.HIGH | |
| if isinstance(absolute_delta, (int, float)) | |
| and abs(float(absolute_delta)) >= 0.05 | |
| else FactImportance.MEDIUM | |
| ) | |
| payload: dict = { | |
| "engine": engine, | |
| "n_runs": int(n_runs), | |
| "absolute_delta_pct": round( | |
| float(absolute_delta) * 100, 2, | |
| ) if isinstance(absolute_delta, (int, float)) else 0.0, | |
| "first_cer_pct": round( | |
| float(entry.get("first_cer") or 0.0) * 100, 2, | |
| ), | |
| "last_cer_pct": round( | |
| float(entry.get("last_cer") or 0.0) * 100, 2, | |
| ), | |
| } | |
| if slope_high: | |
| payload["slope_per_year_pct"] = round( | |
| float(slope) * 365 * 100, 2, | |
| ) | |
| payload["r_squared"] = round( | |
| float(trend.get("r_squared") or 0.0), 3, | |
| ) | |
| payload["pattern"] = "trend" | |
| if cp_high: | |
| payload["change_point_timestamp"] = str( | |
| cp.get("timestamp") or "?", | |
| ) | |
| payload["change_delta_pct"] = round( | |
| float(cp["delta"]) * 100, 2, | |
| ) | |
| payload["mean_before_pct"] = round( | |
| float(cp.get("mean_before") or 0.0) * 100, 2, | |
| ) | |
| payload["mean_after_pct"] = round( | |
| float(cp.get("mean_after") or 0.0) * 100, 2, | |
| ) | |
| # Si on a aussi une rupture, le pattern domine | |
| payload["pattern"] = ( | |
| "trend_and_change_point" if slope_high else "change_point" | |
| ) | |
| facts.append(Fact( | |
| type=FactType.REGRESSION_IN_HISTORY, | |
| importance=importance, | |
| payload=payload, | |
| engines_involved=(engine,), | |
| )) | |
| return facts | |
| # --------------------------------------------------------------------------- | |
| # Sprint A3 (item B-3) — détecteur IMPORTER_FALLBACK_TRIGGERED | |
| # --------------------------------------------------------------------------- | |
| def detect_importer_fallback(benchmark_data: dict) -> list[Fact]: | |
| """Émet un Fact par incident d'importer en mode dégradé. | |
| Lit ``benchmark_data["importer_fallbacks"]`` (liste de dicts | |
| produite par ``picarones.adapters.corpus._fallback_log.consume_fallback_log()``). | |
| Si la clé est absente ou vide, le détecteur reste silencieux — | |
| typiquement le cas pour un benchmark qui n'utilise pas d'importer | |
| distant (corpus local). | |
| Importance HIGH si **plusieurs incidents** sur le même importer | |
| (signal d'une indisponibilité prolongée plutôt qu'un échec | |
| isolé) ; MEDIUM sinon. | |
| """ | |
| fallbacks = benchmark_data.get("importer_fallbacks") or [] | |
| if not fallbacks: | |
| return [] | |
| # Compte par importer pour détecter les incidents répétés. | |
| counts: dict[str, int] = {} | |
| for entry in fallbacks: | |
| if isinstance(entry, dict): | |
| counts[str(entry.get("importer", "unknown"))] = ( | |
| counts.get(str(entry.get("importer", "unknown")), 0) + 1 | |
| ) | |
| facts: list[Fact] = [] | |
| for entry in fallbacks: | |
| if not isinstance(entry, dict): | |
| continue | |
| importer = str(entry.get("importer", "unknown")) | |
| operation = str(entry.get("operation", "unknown")) | |
| importance = ( | |
| FactImportance.HIGH if counts.get(importer, 0) >= 2 else FactImportance.MEDIUM | |
| ) | |
| payload: dict = { | |
| "importer": importer, | |
| "operation": operation, | |
| "incidents_for_importer": counts.get(importer, 1), | |
| } | |
| if entry.get("error"): | |
| payload["error_repr"] = str(entry["error"]) | |
| facts.append(Fact( | |
| type=FactType.IMPORTER_FALLBACK_TRIGGERED, | |
| importance=importance, | |
| payload=payload, | |
| engines_involved=(), | |
| )) | |
| return facts | |