Claude
refactor(core): faire de core/ un cercle 1 strict, déplacer cercle 2 vers measurements/
979f3c3 unverified
Raw
History Blame
19.7 kB
"""Suivi longitudinal des benchmarks — base SQLite optionnelle.
Fonctionnement
--------------
- Chaque run de benchmark est enregistré dans une table SQLite avec horodatage,
corpus, moteurs, métriques agrégées.
- L'historique permet de tracer des courbes d'évolution du CER dans le temps.
- La détection de régression compare le dernier run à une baseline configurable.
Structure de la base
--------------------
Table ``runs`` :
run_id TEXT PRIMARY KEY — UUID ou hash du run
timestamp TEXT — ISO 8601
corpus_name TEXT
engine_name TEXT
cer_mean REAL
wer_mean REAL
doc_count INTEGER
metadata TEXT — JSON
Usage
-----
>>> from picarones.measurements.history import BenchmarkHistory
>>> history = BenchmarkHistory("~/.picarones/history.db")
>>> history.record(benchmark_result)
>>> df = history.query(engine="tesseract", corpus="chroniques")
>>> regression = history.detect_regression(engine="tesseract", threshold=0.02)
"""
from __future__ import annotations
import json
import logging
import sqlite3
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from picarones.core.results import BenchmarkResult
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Structures de données
# ---------------------------------------------------------------------------
@dataclass
class HistoryEntry:
"""Un enregistrement dans l'historique des benchmarks."""
run_id: str
timestamp: str
corpus_name: str
engine_name: str
cer_mean: Optional[float]
wer_mean: Optional[float]
doc_count: int
metadata: dict = field(default_factory=dict)
@property
def cer_percent(self) -> Optional[float]:
return self.cer_mean * 100 if self.cer_mean is not None else None
def as_dict(self) -> dict:
return {
"run_id": self.run_id,
"timestamp": self.timestamp,
"corpus_name": self.corpus_name,
"engine_name": self.engine_name,
"cer_mean": self.cer_mean,
"wer_mean": self.wer_mean,
"doc_count": self.doc_count,
"metadata": self.metadata,
}
@dataclass
class RegressionResult:
"""Résultat d'une détection de régression."""
engine_name: str
corpus_name: str
baseline_run_id: str
baseline_timestamp: str
baseline_cer: Optional[float]
current_run_id: str
current_timestamp: str
current_cer: Optional[float]
delta_cer: Optional[float]
"""Delta CER (current - baseline). Positif = régression."""
is_regression: bool
threshold: float
def as_dict(self) -> dict:
return {
"engine_name": self.engine_name,
"corpus_name": self.corpus_name,
"baseline_run_id": self.baseline_run_id,
"baseline_timestamp": self.baseline_timestamp,
"baseline_cer": self.baseline_cer,
"current_run_id": self.current_run_id,
"current_timestamp": self.current_timestamp,
"current_cer": self.current_cer,
"delta_cer": self.delta_cer,
"is_regression": self.is_regression,
"threshold": self.threshold,
}
# ---------------------------------------------------------------------------
# BenchmarkHistory
# ---------------------------------------------------------------------------
class BenchmarkHistory:
"""Gestionnaire de l'historique des benchmarks dans SQLite.
Parameters
----------
db_path:
Chemin vers le fichier SQLite. Utiliser ``":memory:"`` pour les tests.
Examples
--------
>>> history = BenchmarkHistory("~/.picarones/history.db")
>>> history.record(benchmark)
>>> entries = history.query(engine="tesseract")
>>> for e in entries:
... print(e.timestamp, f"CER={e.cer_percent:.2f}%")
"""
_CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS runs (
run_id TEXT PRIMARY KEY,
timestamp TEXT NOT NULL,
corpus_name TEXT NOT NULL,
engine_name TEXT NOT NULL,
cer_mean REAL,
wer_mean REAL,
doc_count INTEGER,
metadata TEXT
);
CREATE INDEX IF NOT EXISTS idx_engine ON runs (engine_name);
CREATE INDEX IF NOT EXISTS idx_corpus ON runs (corpus_name);
CREATE INDEX IF NOT EXISTS idx_timestamp ON runs (timestamp);
"""
def __init__(self, db_path: str = "~/.picarones/history.db") -> None:
if db_path != ":memory:":
path = Path(db_path).expanduser()
path.parent.mkdir(parents=True, exist_ok=True)
self.db_path = str(path)
else:
self.db_path = ":memory:"
self._conn: Optional[sqlite3.Connection] = None
self._init_db()
def _connect(self) -> sqlite3.Connection:
if self._conn is None:
self._conn = sqlite3.connect(self.db_path)
self._conn.row_factory = sqlite3.Row
return self._conn
def _init_db(self) -> None:
conn = self._connect()
conn.executescript(self._CREATE_TABLE)
conn.commit()
def close(self) -> None:
"""Ferme la connexion SQLite."""
if self._conn:
self._conn.close()
self._conn = None
# ------------------------------------------------------------------
# Enregistrement
# ------------------------------------------------------------------
def record(
self,
benchmark_result: "BenchmarkResult",
run_id: Optional[str] = None,
extra_metadata: Optional[dict] = None,
) -> str:
"""Enregistre les résultats d'un benchmark dans l'historique.
Parameters
----------
benchmark_result:
Résultats à enregistrer (``BenchmarkResult``).
run_id:
Identifiant du run (auto-généré si None).
extra_metadata:
Métadonnées supplémentaires à stocker.
Returns
-------
str
L'identifiant du run enregistré.
"""
if run_id is None:
run_id = str(uuid.uuid4())
timestamp = datetime.now(timezone.utc).isoformat()
conn = self._connect()
for report in benchmark_result.engine_reports:
ranking = benchmark_result.ranking()
engine_entry = next(
(r for r in ranking if r["engine"] == report.engine_name),
None,
)
cer_mean = engine_entry["mean_cer"] if engine_entry else None
wer_mean = engine_entry["mean_wer"] if engine_entry else None
meta = {
"engine_version": report.engine_version,
"engine_config": report.engine_config,
"picarones_version": benchmark_result.metadata.get("picarones_version", ""),
**(extra_metadata or {}),
}
conn.execute(
"""
INSERT OR REPLACE INTO runs
(run_id, timestamp, corpus_name, engine_name,
cer_mean, wer_mean, doc_count, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
f"{run_id}_{report.engine_name}",
timestamp,
benchmark_result.corpus_name,
report.engine_name,
cer_mean,
wer_mean,
benchmark_result.document_count,
json.dumps(meta, ensure_ascii=False),
),
)
conn.commit()
logger.info("Benchmark enregistré dans l'historique : run_id=%s", run_id)
return run_id
def record_single(
self,
run_id: str,
corpus_name: str,
engine_name: str,
cer_mean: Optional[float],
wer_mean: Optional[float],
doc_count: int,
timestamp: Optional[str] = None,
metadata: Optional[dict] = None,
) -> str:
"""Enregistre manuellement une entrée dans l'historique.
Utile pour les tests, les imports de données externes, ou pour
enregistrer des résultats calculés en dehors de Picarones.
Returns
-------
str
run_id enregistré.
"""
if timestamp is None:
timestamp = datetime.now(timezone.utc).isoformat()
conn = self._connect()
conn.execute(
"""
INSERT OR REPLACE INTO runs
(run_id, timestamp, corpus_name, engine_name,
cer_mean, wer_mean, doc_count, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
run_id,
timestamp,
corpus_name,
engine_name,
cer_mean,
wer_mean,
doc_count,
json.dumps(metadata or {}, ensure_ascii=False),
),
)
conn.commit()
return run_id
# ------------------------------------------------------------------
# Requêtes
# ------------------------------------------------------------------
def query(
self,
engine: Optional[str] = None,
corpus: Optional[str] = None,
since: Optional[str] = None,
limit: int = 100,
) -> list[HistoryEntry]:
"""Retourne l'historique des runs, avec filtres optionnels.
Parameters
----------
engine:
Filtre sur le nom du moteur.
corpus:
Filtre sur le nom du corpus.
since:
Date ISO 8601 minimale (``"2025-01-01"``).
limit:
Nombre maximum d'entrées retournées.
Returns
-------
list[HistoryEntry]
Entrées triées par timestamp croissant.
"""
clauses: list[str] = []
params: list = []
if engine:
clauses.append("engine_name = ?")
params.append(engine)
if corpus:
clauses.append("corpus_name = ?")
params.append(corpus)
if since:
clauses.append("timestamp >= ?")
params.append(since)
where = f"WHERE {' AND '.join(clauses)}" if clauses else ""
params.append(limit)
conn = self._connect()
rows = conn.execute(
f"SELECT * FROM runs {where} ORDER BY timestamp ASC LIMIT ?",
params,
).fetchall()
return [
HistoryEntry(
run_id=row["run_id"],
timestamp=row["timestamp"],
corpus_name=row["corpus_name"],
engine_name=row["engine_name"],
cer_mean=row["cer_mean"],
wer_mean=row["wer_mean"],
doc_count=row["doc_count"],
metadata=json.loads(row["metadata"] or "{}"),
)
for row in rows
]
def list_engines(self) -> list[str]:
"""Retourne la liste des moteurs présents dans l'historique."""
conn = self._connect()
rows = conn.execute(
"SELECT DISTINCT engine_name FROM runs ORDER BY engine_name"
).fetchall()
return [row[0] for row in rows]
def list_corpora(self) -> list[str]:
"""Retourne la liste des corpus présents dans l'historique."""
conn = self._connect()
rows = conn.execute(
"SELECT DISTINCT corpus_name FROM runs ORDER BY corpus_name"
).fetchall()
return [row[0] for row in rows]
def count(self) -> int:
"""Nombre total d'entrées dans l'historique."""
conn = self._connect()
return conn.execute("SELECT COUNT(*) FROM runs").fetchone()[0]
# ------------------------------------------------------------------
# Courbes d'évolution
# ------------------------------------------------------------------
def get_cer_curve(
self,
engine: str,
corpus: Optional[str] = None,
) -> list[dict]:
"""Retourne les données pour tracer la courbe d'évolution du CER.
Parameters
----------
engine:
Nom du moteur.
corpus:
Corpus spécifique (None = tous les corpus pour ce moteur).
Returns
-------
list[dict]
Chaque dict contient ``{"timestamp": str, "cer": float, "run_id": str}``.
"""
entries = self.query(engine=engine, corpus=corpus, limit=1000)
return [
{
"timestamp": e.timestamp,
"cer": e.cer_mean,
"cer_percent": e.cer_percent,
"run_id": e.run_id,
"corpus_name": e.corpus_name,
}
for e in entries
if e.cer_mean is not None
]
# ------------------------------------------------------------------
# Détection de régression
# ------------------------------------------------------------------
def detect_regression(
self,
engine: str,
corpus: Optional[str] = None,
threshold: float = 0.01,
baseline_run_id: Optional[str] = None,
) -> Optional[RegressionResult]:
"""Détecte une régression du CER entre deux runs.
Compare le run le plus récent à une baseline (le run précédent ou
un run spécifique).
Parameters
----------
engine:
Nom du moteur à surveiller.
corpus:
Corpus spécifique (None = tous).
threshold:
Seuil de régression en points absolus de CER (ex : 0.01 = 1%).
Si delta_cer > threshold → régression détectée.
baseline_run_id:
run_id de référence. Si None, utilise l'avant-dernier run.
Returns
-------
RegressionResult | None
None si moins de 2 runs disponibles.
"""
entries = self.query(engine=engine, corpus=corpus, limit=1000)
if len(entries) < 2:
logger.info("Pas assez de runs pour détecter une régression (moteur=%s)", engine)
return None
current = entries[-1]
if baseline_run_id:
baseline_list = [e for e in entries[:-1] if e.run_id == baseline_run_id]
baseline = baseline_list[0] if baseline_list else entries[-2]
else:
baseline = entries[-2]
delta = None
is_regression = False
if current.cer_mean is not None and baseline.cer_mean is not None:
delta = current.cer_mean - baseline.cer_mean
is_regression = delta > threshold
return RegressionResult(
engine_name=engine,
corpus_name=corpus or "tous",
baseline_run_id=baseline.run_id,
baseline_timestamp=baseline.timestamp,
baseline_cer=baseline.cer_mean,
current_run_id=current.run_id,
current_timestamp=current.timestamp,
current_cer=current.cer_mean,
delta_cer=delta,
is_regression=is_regression,
threshold=threshold,
)
def detect_all_regressions(
self,
threshold: float = 0.01,
) -> list[RegressionResult]:
"""Détecte les régressions pour tous les moteurs et corpus connus.
Parameters
----------
threshold:
Seuil de régression.
Returns
-------
list[RegressionResult]
Uniquement les moteurs où une régression est détectée.
"""
results: list[RegressionResult] = []
engines = self.list_engines()
corpora = self.list_corpora()
for engine in engines:
for corpus in corpora:
result = self.detect_regression(engine, corpus, threshold)
if result and result.is_regression:
results.append(result)
return results
# ------------------------------------------------------------------
# Export
# ------------------------------------------------------------------
def export_json(self, output_path: str) -> Path:
"""Exporte l'historique complet en JSON.
Parameters
----------
output_path:
Chemin du fichier JSON de sortie.
Returns
-------
Path
Chemin vers le fichier créé.
"""
entries = self.query(limit=100_000)
path = Path(output_path)
data = {
"picarones_history": True,
"exported_at": datetime.now(timezone.utc).isoformat(),
"total_runs": len(entries),
"engines": self.list_engines(),
"corpora": self.list_corpora(),
"runs": [e.as_dict() for e in entries],
}
path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
return path
def __repr__(self) -> str:
return f"BenchmarkHistory(db='{self.db_path}', runs={self.count()})"
# ---------------------------------------------------------------------------
# Données de démonstration longitudinale
# ---------------------------------------------------------------------------
def generate_demo_history(
db: BenchmarkHistory,
n_runs: int = 8,
seed: int = 42,
) -> None:
"""Insère des données fictives de suivi longitudinal pour la démo.
Simule l'amélioration progressive d'un modèle tesseract sur 8 runs,
avec une légère régression au run 5.
Parameters
----------
db:
Base d'historique à remplir.
n_runs:
Nombre de runs à générer.
seed:
Graine aléatoire.
"""
import random
rng = random.Random(seed)
engines = ["tesseract", "pero_ocr", "ancien_moteur"]
corpus = "Chroniques médiévales"
# Trajectoires de CER simulées (amélioration progressive + bruit)
base_cers = {
"tesseract": 0.15,
"pero_ocr": 0.09,
"ancien_moteur": 0.28,
}
improvements = {
"tesseract": -0.008, # améliore de ~0.8% par run
"pero_ocr": -0.005, # améliore de ~0.5% par run
"ancien_moteur": -0.003,
}
from datetime import timedelta
base_date = datetime(2024, 9, 1, tzinfo=timezone.utc)
for run_idx in range(n_runs):
run_date = base_date + timedelta(weeks=run_idx * 2)
run_id = f"demo_run_{run_idx + 1:02d}"
for engine in engines:
cer = base_cers[engine] + improvements[engine] * run_idx
# Ajouter du bruit + régression au run 5
noise = rng.gauss(0, 0.005)
if run_idx == 4 and engine == "tesseract":
noise += 0.02 # régression simulée
cer = max(0.01, min(0.5, cer + noise))
wer = cer * 1.8 + rng.gauss(0, 0.01)
wer = max(0.01, min(0.9, wer))
db.record_single(
run_id=f"{run_id}_{engine}",
corpus_name=corpus,
engine_name=engine,
cer_mean=round(cer, 4),
wer_mean=round(wer, 4),
doc_count=12,
timestamp=run_date.isoformat(),
metadata={
"note": f"Run de démonstration #{run_idx + 1}",
"engine_version": f"5.{run_idx}.0" if engine == "tesseract" else "0.7.2",
},
)