Spaces:
Running
Running
Claude
refactor(core): faire de core/ un cercle 1 strict, déplacer cercle 2 vers measurements/
979f3c3 unverified | """Suivi longitudinal des benchmarks — base SQLite optionnelle. | |
| Fonctionnement | |
| -------------- | |
| - Chaque run de benchmark est enregistré dans une table SQLite avec horodatage, | |
| corpus, moteurs, métriques agrégées. | |
| - L'historique permet de tracer des courbes d'évolution du CER dans le temps. | |
| - La détection de régression compare le dernier run à une baseline configurable. | |
| Structure de la base | |
| -------------------- | |
| Table ``runs`` : | |
| run_id TEXT PRIMARY KEY — UUID ou hash du run | |
| timestamp TEXT — ISO 8601 | |
| corpus_name TEXT | |
| engine_name TEXT | |
| cer_mean REAL | |
| wer_mean REAL | |
| doc_count INTEGER | |
| metadata TEXT — JSON | |
| Usage | |
| ----- | |
| from picarones.measurements.history import BenchmarkHistory | |
| history = BenchmarkHistory("~/.picarones/history.db") | |
| history.record(benchmark_result) | |
| entries = history.query(engine="tesseract", corpus="chroniques") | |
| regression = history.detect_regression(engine="tesseract", threshold=0.02) | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import sqlite3 | |
| import uuid | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import TYPE_CHECKING, Optional | |
| if TYPE_CHECKING: | |
| from picarones.core.results import BenchmarkResult | |
| logger = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------------- | |
| # Structures de données | |
| # --------------------------------------------------------------------------- | |
@dataclass
class HistoryEntry:
    """One row of the benchmark history: one engine on one corpus for one run.

    Attributes
    ----------
    run_id:
        Identifier of the row (primary key of the ``runs`` table).
    timestamp:
        Recording time as an ISO 8601 string.
    corpus_name:
        Name of the corpus the run was executed on.
    engine_name:
        Name of the engine that was evaluated.
    cer_mean:
        Mean CER as a fraction (``0.05`` means 5 %), or ``None`` if unknown.
    wer_mean:
        Mean WER as a fraction, or ``None`` if unknown.
    doc_count:
        Number of documents in the run.
    metadata:
        Free-form metadata dictionary; defaults to a fresh empty dict.
    """

    run_id: str
    timestamp: str
    corpus_name: str
    engine_name: str
    cer_mean: Optional[float]
    wer_mean: Optional[float]
    doc_count: int
    metadata: dict = field(default_factory=dict)

    @property
    def cer_percent(self) -> Optional[float]:
        """Return the mean CER scaled to a percentage, or ``None`` if unknown."""
        # Exposed as a property because callers read it as a plain attribute
        # (e.g. ``f"{entry.cer_percent:.2f}"``).
        if self.cer_mean is None:
            return None
        return self.cer_mean * 100

    def as_dict(self) -> dict:
        """Return the entry as a plain dictionary keyed by field name.

        The ``metadata`` value is the same dict object held by the entry,
        not a copy.
        """
        return {
            "run_id": self.run_id,
            "timestamp": self.timestamp,
            "corpus_name": self.corpus_name,
            "engine_name": self.engine_name,
            "cer_mean": self.cer_mean,
            "wer_mean": self.wer_mean,
            "doc_count": self.doc_count,
            "metadata": self.metadata,
        }
@dataclass
class RegressionResult:
    """Outcome of comparing the CER of a current run against a baseline run.

    Attributes
    ----------
    engine_name:
        Engine the comparison was made for.
    corpus_name:
        Corpus the comparison was restricted to.
    baseline_run_id, baseline_timestamp, baseline_cer:
        Identifier, timestamp and mean CER of the reference run.
    current_run_id, current_timestamp, current_cer:
        Identifier, timestamp and mean CER of the run being checked.
    delta_cer:
        ``current_cer - baseline_cer``, or ``None`` when it is not available.
    is_regression:
        Whether the comparison was flagged as a regression.
    threshold:
        Threshold the delta was compared against.
    """

    engine_name: str
    corpus_name: str
    baseline_run_id: str
    baseline_timestamp: str
    baseline_cer: Optional[float]
    current_run_id: str
    current_timestamp: str
    current_cer: Optional[float]
    # Delta CER (current - baseline); a positive value means the CER got worse.
    delta_cer: Optional[float]
    is_regression: bool
    threshold: float

    def as_dict(self) -> dict:
        """Return the result as a plain dictionary keyed by field name."""
        return {
            "engine_name": self.engine_name,
            "corpus_name": self.corpus_name,
            "baseline_run_id": self.baseline_run_id,
            "baseline_timestamp": self.baseline_timestamp,
            "baseline_cer": self.baseline_cer,
            "current_run_id": self.current_run_id,
            "current_timestamp": self.current_timestamp,
            "current_cer": self.current_cer,
            "delta_cer": self.delta_cer,
            "is_regression": self.is_regression,
            "threshold": self.threshold,
        }
| # --------------------------------------------------------------------------- | |
| # BenchmarkHistory | |
| # --------------------------------------------------------------------------- | |
class BenchmarkHistory:
    """SQLite-backed store for longitudinal benchmark results.

    Each recorded benchmark contributes one row per engine to a single
    ``runs`` table.  The connection is opened lazily and reused; the schema
    is (re)created whenever a new connection is opened, so the object stays
    usable after :meth:`close`.  Instances can be used as context managers,
    in which case the connection is closed on exit.

    Parameters
    ----------
    db_path:
        Path to the SQLite file (``~`` is expanded and missing parent
        directories are created).  Use ``":memory:"`` for tests.

    Examples
    --------
    >>> history = BenchmarkHistory("~/.picarones/history.db")
    >>> history.record(benchmark)
    >>> entries = history.query(engine="tesseract")
    >>> for e in entries:
    ...     print(e.timestamp, f"CER={e.cer_percent:.2f}%")
    """

    _CREATE_TABLE = """
    CREATE TABLE IF NOT EXISTS runs (
        run_id      TEXT PRIMARY KEY,
        timestamp   TEXT NOT NULL,
        corpus_name TEXT NOT NULL,
        engine_name TEXT NOT NULL,
        cer_mean    REAL,
        wer_mean    REAL,
        doc_count   INTEGER,
        metadata    TEXT
    );
    CREATE INDEX IF NOT EXISTS idx_engine ON runs (engine_name);
    CREATE INDEX IF NOT EXISTS idx_corpus ON runs (corpus_name);
    CREATE INDEX IF NOT EXISTS idx_timestamp ON runs (timestamp);
    """

    # Single INSERT statement shared by record() and record_single().
    _INSERT_ROW = """
    INSERT OR REPLACE INTO runs
        (run_id, timestamp, corpus_name, engine_name,
         cer_mean, wer_mean, doc_count, metadata)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """

    def __init__(self, db_path: str = "~/.picarones/history.db") -> None:
        if db_path != ":memory:":
            path = Path(db_path).expanduser()
            path.parent.mkdir(parents=True, exist_ok=True)
            self.db_path = str(path)
        else:
            self.db_path = ":memory:"
        self._conn: Optional[sqlite3.Connection] = None
        self._init_db()

    def __enter__(self) -> BenchmarkHistory:
        """Return the history itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the connection when leaving the ``with`` block."""
        self.close()

    def _connect(self) -> sqlite3.Connection:
        """Return the shared connection, opening and initialising it if needed."""
        if self._conn is None:
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row
            try:
                # The schema statements are idempotent (IF NOT EXISTS).  Running
                # them on every new connection keeps ":memory:" databases usable
                # after close(), since each new connection starts empty.
                conn.executescript(self._CREATE_TABLE)
                conn.commit()
            except sqlite3.Error:
                conn.close()
                raise
            self._conn = conn
        return self._conn

    def _init_db(self) -> None:
        """Open the connection, which also creates the schema."""
        self._connect()

    def close(self) -> None:
        """Close the SQLite connection if one is open."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    # ------------------------------------------------------------------
    # Recording
    # ------------------------------------------------------------------

    def record(
        self,
        benchmark_result: "BenchmarkResult",
        run_id: Optional[str] = None,
        extra_metadata: Optional[dict] = None,
    ) -> str:
        """Store the results of a benchmark, one row per engine report.

        Each row is keyed ``"<run_id>_<engine_name>"``.  All rows of the call
        are written in a single transaction: if any insert fails, none of
        them is kept.

        Parameters
        ----------
        benchmark_result:
            Results to record (``BenchmarkResult``).
        run_id:
            Identifier of the run (a random UUID is generated if ``None``).
        extra_metadata:
            Additional metadata merged into each row's metadata.

        Returns
        -------
        str
            The run identifier (without the per-engine suffix).
        """
        if run_id is None:
            run_id = str(uuid.uuid4())
        timestamp = datetime.now(timezone.utc).isoformat()

        reports = benchmark_result.engine_reports
        # The ranking does not depend on the report being processed, so it is
        # computed once rather than once per engine.
        ranking = list(benchmark_result.ranking()) if reports else []

        conn = self._connect()
        with conn:  # commit on success, roll back if any insert fails
            for report in reports:
                engine_entry = next(
                    (r for r in ranking if r["engine"] == report.engine_name),
                    None,
                )
                cer_mean = engine_entry["mean_cer"] if engine_entry else None
                wer_mean = engine_entry["mean_wer"] if engine_entry else None
                meta = {
                    "engine_version": report.engine_version,
                    "engine_config": report.engine_config,
                    "picarones_version": benchmark_result.metadata.get("picarones_version", ""),
                    **(extra_metadata or {}),
                }
                conn.execute(
                    self._INSERT_ROW,
                    (
                        f"{run_id}_{report.engine_name}",
                        timestamp,
                        benchmark_result.corpus_name,
                        report.engine_name,
                        cer_mean,
                        wer_mean,
                        benchmark_result.document_count,
                        json.dumps(meta, ensure_ascii=False),
                    ),
                )
        logger.info("Benchmark enregistré dans l'historique : run_id=%s", run_id)
        return run_id

    def record_single(
        self,
        run_id: str,
        corpus_name: str,
        engine_name: str,
        cer_mean: Optional[float],
        wer_mean: Optional[float],
        doc_count: int,
        timestamp: Optional[str] = None,
        metadata: Optional[dict] = None,
    ) -> str:
        """Insert (or replace) one history row from explicit values.

        Useful for tests, for importing external data, or for recording
        results computed outside of Picarones.  An existing row with the
        same ``run_id`` is replaced.

        Parameters
        ----------
        run_id:
            Primary key of the row; stored as given.
        corpus_name, engine_name:
            Corpus and engine the row belongs to.
        cer_mean, wer_mean:
            Mean error rates, or ``None``.
        doc_count:
            Number of documents.
        timestamp:
            ISO 8601 timestamp; defaults to the current UTC time.
        metadata:
            Metadata stored as JSON; defaults to an empty dict.

        Returns
        -------
        str
            The ``run_id`` that was written.
        """
        if timestamp is None:
            timestamp = datetime.now(timezone.utc).isoformat()
        conn = self._connect()
        with conn:
            conn.execute(
                self._INSERT_ROW,
                (
                    run_id,
                    timestamp,
                    corpus_name,
                    engine_name,
                    cer_mean,
                    wer_mean,
                    doc_count,
                    json.dumps(metadata or {}, ensure_ascii=False),
                ),
            )
        return run_id

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    @staticmethod
    def _row_to_entry(row: sqlite3.Row) -> HistoryEntry:
        """Convert a ``runs`` row into a ``HistoryEntry``, decoding the metadata."""
        return HistoryEntry(
            run_id=row["run_id"],
            timestamp=row["timestamp"],
            corpus_name=row["corpus_name"],
            engine_name=row["engine_name"],
            cer_mean=row["cer_mean"],
            wer_mean=row["wer_mean"],
            doc_count=row["doc_count"],
            metadata=json.loads(row["metadata"] or "{}"),
        )

    def _select_entries(
        self,
        *,
        engine: Optional[str] = None,
        corpus: Optional[str] = None,
        since: Optional[str] = None,
        run_id: Optional[str] = None,
        limit: int,
        newest_first: bool = False,
    ) -> list[HistoryEntry]:
        """Run a filtered SELECT on ``runs`` and return the decoded entries.

        Falsy filters are ignored.  Rows are ordered by timestamp, with
        ``rowid`` as tie-breaker so the order is deterministic; a negative
        ``limit`` means "no limit" in SQLite.
        """
        clauses: list[str] = []
        params: list = []
        for column, operator, value in (
            ("run_id", "=", run_id),
            ("engine_name", "=", engine),
            ("corpus_name", "=", corpus),
            ("timestamp", ">=", since),
        ):
            if value:
                clauses.append(f"{column} {operator} ?")
                params.append(value)
        where = f"WHERE {' AND '.join(clauses)}" if clauses else ""
        direction = "DESC" if newest_first else "ASC"
        params.append(limit)
        # Only fixed SQL fragments are interpolated; all values are bound.
        rows = self._connect().execute(
            f"SELECT * FROM runs {where} "
            f"ORDER BY timestamp {direction}, rowid {direction} LIMIT ?",
            params,
        ).fetchall()
        return [self._row_to_entry(row) for row in rows]

    def query(
        self,
        engine: Optional[str] = None,
        corpus: Optional[str] = None,
        since: Optional[str] = None,
        limit: int = 100,
    ) -> list[HistoryEntry]:
        """Return history entries, optionally filtered.

        Parameters
        ----------
        engine:
            Keep only rows for this engine name.
        corpus:
            Keep only rows for this corpus name.
        since:
            Minimal ISO 8601 date (``"2025-01-01"``); compared as text
            against the stored timestamps.
        limit:
            Maximum number of entries returned.  Because rows are sorted
            oldest first, the limit keeps the *oldest* matching rows.

        Returns
        -------
        list[HistoryEntry]
            Entries sorted by ascending timestamp.
        """
        return self._select_entries(
            engine=engine, corpus=corpus, since=since, limit=limit
        )

    def list_engines(self) -> list[str]:
        """Return the distinct engine names present in the history, sorted."""
        rows = self._connect().execute(
            "SELECT DISTINCT engine_name FROM runs ORDER BY engine_name"
        ).fetchall()
        return [row[0] for row in rows]

    def list_corpora(self) -> list[str]:
        """Return the distinct corpus names present in the history, sorted."""
        rows = self._connect().execute(
            "SELECT DISTINCT corpus_name FROM runs ORDER BY corpus_name"
        ).fetchall()
        return [row[0] for row in rows]

    def count(self) -> int:
        """Return the total number of rows in the history."""
        return self._connect().execute("SELECT COUNT(*) FROM runs").fetchone()[0]

    # ------------------------------------------------------------------
    # Evolution curves
    # ------------------------------------------------------------------

    def get_cer_curve(
        self,
        engine: str,
        corpus: Optional[str] = None,
    ) -> list[dict]:
        """Return the data points needed to plot the CER over time.

        Parameters
        ----------
        engine:
            Engine name.
        corpus:
            Specific corpus (``None`` = every corpus for this engine).

        Returns
        -------
        list[dict]
            One dict per entry that has a CER, with the keys ``timestamp``,
            ``cer``, ``cer_percent``, ``run_id`` and ``corpus_name``, sorted
            by ascending timestamp.  At most 1000 entries are read.
        """
        entries = self.query(engine=engine, corpus=corpus, limit=1000)
        return [
            {
                "timestamp": e.timestamp,
                "cer": e.cer_mean,
                "cer_percent": e.cer_percent,
                "run_id": e.run_id,
                "corpus_name": e.corpus_name,
            }
            for e in entries
            if e.cer_mean is not None
        ]

    # ------------------------------------------------------------------
    # Regression detection
    # ------------------------------------------------------------------

    def detect_regression(
        self,
        engine: str,
        corpus: Optional[str] = None,
        threshold: float = 0.01,
        baseline_run_id: Optional[str] = None,
    ) -> Optional[RegressionResult]:
        """Compare the most recent run to a baseline and flag a CER regression.

        Parameters
        ----------
        engine:
            Engine name to monitor.
        corpus:
            Specific corpus (``None`` = all corpora together).
        threshold:
            Regression threshold in absolute CER points (``0.01`` = 1 %).
            A regression is reported when ``delta_cer > threshold``.
        baseline_run_id:
            ``run_id`` of the reference row.  If ``None``, or if no matching
            row other than the current one exists, the second most recent
            run is used instead.

        Returns
        -------
        RegressionResult | None
            ``None`` when fewer than two matching runs are available.  When
            either CER is missing, ``delta_cer`` is ``None`` and
            ``is_regression`` is ``False``.
        """
        # Fetch the two newest rows directly rather than slicing an
        # oldest-first, size-limited list, which would miss the latest runs
        # once the history grows past the limit.
        latest = self._select_entries(
            engine=engine, corpus=corpus, limit=2, newest_first=True
        )
        if len(latest) < 2:
            logger.info("Pas assez de runs pour détecter une régression (moteur=%s)", engine)
            return None

        current, previous = latest
        baseline = previous
        if baseline_run_id:
            pinned = self._select_entries(
                engine=engine, corpus=corpus, run_id=baseline_run_id, limit=1
            )
            # The current run can never serve as its own baseline.
            if pinned and pinned[0].run_id != current.run_id:
                baseline = pinned[0]
            else:
                logger.warning(
                    "Baseline introuvable (run_id=%s), repli sur l'avant-dernier run",
                    baseline_run_id,
                )

        delta = None
        is_regression = False
        if current.cer_mean is not None and baseline.cer_mean is not None:
            delta = current.cer_mean - baseline.cer_mean
            is_regression = delta > threshold

        return RegressionResult(
            engine_name=engine,
            corpus_name=corpus or "tous",
            baseline_run_id=baseline.run_id,
            baseline_timestamp=baseline.timestamp,
            baseline_cer=baseline.cer_mean,
            current_run_id=current.run_id,
            current_timestamp=current.timestamp,
            current_cer=current.cer_mean,
            delta_cer=delta,
            is_regression=is_regression,
            threshold=threshold,
        )

    def detect_all_regressions(
        self,
        threshold: float = 0.01,
    ) -> list[RegressionResult]:
        """Run regression detection for every known engine/corpus combination.

        Parameters
        ----------
        threshold:
            Regression threshold passed to :meth:`detect_regression`.

        Returns
        -------
        list[RegressionResult]
            Only the combinations for which a regression was detected.
        """
        corpora = self.list_corpora()
        results: list[RegressionResult] = []
        for engine in self.list_engines():
            for corpus in corpora:
                result = self.detect_regression(engine, corpus, threshold)
                if result and result.is_regression:
                    results.append(result)
        return results

    # ------------------------------------------------------------------
    # Export
    # ------------------------------------------------------------------

    def export_json(self, output_path: str) -> Path:
        """Export the complete history to a JSON file.

        Parameters
        ----------
        output_path:
            Path of the JSON file to write (UTF-8).

        Returns
        -------
        Path
            Path of the file that was written.
        """
        # A negative LIMIT means "no upper bound" in SQLite, so the export is
        # really complete rather than silently truncated.
        entries = self._select_entries(limit=-1)
        path = Path(output_path)
        data = {
            "picarones_history": True,
            "exported_at": datetime.now(timezone.utc).isoformat(),
            "total_runs": len(entries),
            "engines": self.list_engines(),
            "corpora": self.list_corpora(),
            "runs": [e.as_dict() for e in entries],
        }
        path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
        return path

    def __repr__(self) -> str:
        return f"BenchmarkHistory(db='{self.db_path}', runs={self.count()})"
| # --------------------------------------------------------------------------- | |
| # Données de démonstration longitudinale | |
| # --------------------------------------------------------------------------- | |
def generate_demo_history(
    db: BenchmarkHistory,
    n_runs: int = 8,
    seed: int = 42,
) -> None:
    """Fill *db* with synthetic longitudinal data for demonstrations.

    Three engines are simulated on a single corpus, one run every two weeks
    starting on 2024-09-01 (UTC).  Each engine's CER follows a linear trend
    plus Gaussian noise; an extra +0.02 is injected for ``tesseract`` at the
    fifth run to simulate a regression.  CER is clamped to [0.01, 0.5] and
    WER to [0.01, 0.9]; both are rounded to four decimals.

    Parameters
    ----------
    db:
        History to fill; one ``record_single`` call is made per engine and run.
    n_runs:
        Number of runs to generate.
    seed:
        Seed of the random generator, so the output is reproducible.
    """
    import random
    from datetime import timedelta

    corpus_label = "Chroniques médiévales"
    # (engine, CER at the first run, CER change per run); a negative change
    # is an improvement: about 0.8 %, 0.5 % and 0.3 % per run respectively.
    profiles = (
        ("tesseract", 0.15, -0.008),
        ("pero_ocr", 0.09, -0.005),
        ("ancien_moteur", 0.28, -0.003),
    )
    generator = random.Random(seed)
    first_run_date = datetime(2024, 9, 1, tzinfo=timezone.utc)

    for index in range(n_runs):
        number = index + 1
        stamp = (first_run_date + timedelta(weeks=index * 2)).isoformat()
        for name, start_cer, slope in profiles:
            is_tesseract = name == "tesseract"
            # Draw order matters for reproducibility: CER noise first, then
            # the WER noise further down.
            jitter = generator.gauss(0, 0.005)
            if is_tesseract and index == 4:
                jitter += 0.02  # simulated regression
            trend = start_cer + slope * index
            cer_value = max(0.01, min(0.5, trend + jitter))
            wer_value = cer_value * 1.8 + generator.gauss(0, 0.01)
            wer_value = max(0.01, min(0.9, wer_value))

            if is_tesseract:
                version = f"5.{index}.0"
            else:
                version = "0.7.2"

            db.record_single(
                run_id=f"demo_run_{number:02d}_{name}",
                corpus_name=corpus_label,
                engine_name=name,
                cer_mean=round(cer_value, 4),
                wer_mean=round(wer_value, 4),
                doc_count=12,
                timestamp=stamp,
                metadata={
                    "note": f"Run de démonstration #{number}",
                    "engine_version": version,
                },
            )