Picarones / picarones /core /inter_engine.py
Claude
sprint36: cΓ’blage inter-moteurs au runner et au moteur narratif
6377044 unverified
Raw
History Blame
19.1 kB
"""MΓ©triques inter-moteurs (Sprint 35 β€” Γ‰tape 2 du plan d'Γ©volution).
Deux familles de mesures qui rΓ©pondent Γ  des questions diffΓ©rentes mais
liΓ©es :
1. **Divergence taxonomique** (`kl_divergence`, `jensen_shannon_divergence`,
`taxonomy_divergence_matrix`) β€” *Γ  quel point les moteurs font-ils des
erreurs de natures diffΓ©rentes ?* Une divergence Γ©levΓ©e signale des
moteurs spΓ©cialisΓ©s sur des classes d'erreurs distinctes (visual vs
abrΓ©viation vs casse) et donc des candidats pour un voting ensemble.
2. **ComplΓ©mentaritΓ©** (`oracle_token_recall`, `complementarity_gap`,
`pairwise_disagreement_rate`) β€” *quel CER serait atteignable si on
combinait les moteurs ?* La borne infΓ©rieure du CER atteignable par
un voting majoritaire token-level est ``1 - oracle_token_recall``.
Si elle est très inférieure au CER du meilleur moteur seul, l'effort
d'un pipeline d'ensemble se justifie. Sinon non.
Convention de typage
--------------------
Toutes les fonctions sont enregistrables dans le registre Sprint 34 si
on les wrappe par un adaptateur ``(input_types=(TEXT, TEXT))``. Pour
limiter le bruit, on ne les enregistre **pas** automatiquement : ce sont
des mΓ©triques d'agrΓ©gation (multi-moteurs ou multi-documents) qui ne
correspondent pas au modèle « une jonction = une métrique » du runner.
Elles sont consommΓ©es par les dΓ©tecteurs narratifs et le rapport HTML.
Note sur l'oracle
-----------------
La mΓ©trique ``oracle_token_recall`` retournΓ©e ici utilise un alignement
bag-of-words pondΓ©rΓ© par multiplicitΓ©. Ce n'est **pas** une vraie
borne atteignable par voting majoritaire sΓ©quentiel β€” c'est une borne
supΓ©rieure (proxy optimiste). La vraie borne demanderait un
alignement séquentiel des hypothèses, ce qui est plus coûteux. Pour
le diagnostic Β« ensemble vaut-il le coup ? Β», le proxy suffit
largement ; on documente clairement la limite dans le glossaire et le
rapport.
"""
from __future__ import annotations
import logging
import math
from collections import Counter
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────────────────
# Divergence taxonomique (KL / Jensen-Shannon)
# ──────────────────────────────────────────────────────────────────────────
def _smoothed_distribution(
distribution: dict[str, float],
keys: list[str],
epsilon: float = 1e-12,
) -> list[float]:
"""Aligne une distribution sur l'ordre de ``keys`` et lisse les zΓ©ros.
Le lissage Γ©vite ``log(0)`` dans la KL. ``epsilon`` est volontairement
minuscule pour ne pas modifier le résultat de manière sensible.
"""
smoothed = [max(distribution.get(k, 0.0), epsilon) for k in keys]
total = sum(smoothed)
return [v / total for v in smoothed]
def kl_divergence(p: dict[str, float], q: dict[str, float]) -> float:
"""KL-divergence ``D(P||Q)`` en bits, sur l'union des clΓ©s.
Les distributions n'ont pas besoin de partager exactement les mΓͺmes
clΓ©s ; les clΓ©s manquantes sont lissΓ©es Γ  ``epsilon`` puis
renormalisΓ©es.
Returns
-------
float
``D(P||Q) β‰₯ 0``. Vaut 0 si et seulement si P == Q. N'est pas
symΓ©trique : ``kl(p, q) != kl(q, p)`` en gΓ©nΓ©ral.
"""
keys = sorted(set(p.keys()) | set(q.keys()))
if not keys:
return 0.0
p_vec = _smoothed_distribution(p, keys)
q_vec = _smoothed_distribution(q, keys)
return sum(pi * math.log2(pi / qi) for pi, qi in zip(p_vec, q_vec))
def jensen_shannon_divergence(
p: dict[str, float],
q: dict[str, float],
) -> float:
"""JS-divergence symΓ©trique en bits, bornΓ©e dans ``[0, 1]``.
``JS(P, Q) = Β½ D(P||M) + Β½ D(Q||M)`` avec ``M = (P + Q) / 2``.
SymΓ©trique et bornΓ©e β€” prΓ©fΓ©rable Γ  la KL pour construire une
matrice triangulaire de divergences entre moteurs.
"""
keys = sorted(set(p.keys()) | set(q.keys()))
if not keys:
return 0.0
p_vec = _smoothed_distribution(p, keys)
q_vec = _smoothed_distribution(q, keys)
m_vec = [(pi + qi) / 2.0 for pi, qi in zip(p_vec, q_vec)]
def _kl(a: list[float], b: list[float]) -> float:
return sum(ai * math.log2(ai / bi) for ai, bi in zip(a, b) if ai > 0)
js = 0.5 * _kl(p_vec, m_vec) + 0.5 * _kl(q_vec, m_vec)
# Borne théorique : JS ∈ [0, 1] en bits. Clamp pour absorber les
# erreurs d'arrondi flottant.
return max(0.0, min(1.0, js))
def taxonomy_divergence_matrix(
distributions: dict[str, dict[str, float]],
metric: str = "js",
) -> dict[str, dict[str, float]]:
"""Construit la matrice de divergence triangulaire entre moteurs.
Parameters
----------
distributions:
``{engine_name: {error_class: probability}}``. Chaque
distribution doit sommer Γ  environ 1 (pas de validation stricte
β€” les distributions taxonomiques de Picarones sont dΓ©jΓ 
normalisΓ©es par ``aggregate_taxonomy``).
metric:
``"js"`` (dΓ©faut, symΓ©trique) ou ``"kl"`` (asymΓ©trique).
Returns
-------
dict[str, dict[str, float]]
Matrice ``{engine_a: {engine_b: divergence}}`` symΓ©trique pour
``js``, asymΓ©trique pour ``kl``. La diagonale vaut 0.
"""
if metric not in ("js", "kl"):
raise ValueError(f"metric doit Γͺtre 'js' ou 'kl' β€” reΓ§u {metric!r}")
fn = jensen_shannon_divergence if metric == "js" else kl_divergence
engines = sorted(distributions.keys())
matrix: dict[str, dict[str, float]] = {a: {} for a in engines}
for a in engines:
for b in engines:
if a == b:
matrix[a][b] = 0.0
elif metric == "js" and b in matrix and a in matrix[b]:
# SymΓ©trique : recopie pour Γ©viter de recalculer
matrix[a][b] = matrix[b][a]
else:
matrix[a][b] = fn(distributions[a], distributions[b])
return matrix
# ──────────────────────────────────────────────────────────────────────────
# ComplΓ©mentaritΓ© (oracle token recall)
# ──────────────────────────────────────────────────────────────────────────
def _word_multiset(text: str) -> Counter[str]:
"""DΓ©composition en multiset de tokens (sΓ©parateur whitespace)."""
return Counter(tok for tok in text.split() if tok)
def oracle_token_recall(
reference: str,
hypotheses: dict[str, str],
) -> float:
"""Borne supΓ©rieure (proxy bag-of-words) du token-recall atteignable
par un voting majoritaire entre tous les moteurs fournis.
Pour chaque token de la rΓ©fΓ©rence (avec sa multiplicitΓ©), on
considère qu'il est "préservé" par l'ensemble si au moins un moteur
en produit une occurrence non encore comptΓ©e. Le score est le ratio
d'occurrences GT prΓ©servΓ©es sur le total.
Parameters
----------
reference:
Texte GT.
hypotheses:
``{engine_name: hypothesis_text}``.
Returns
-------
float
Ratio dans ``[0, 1]``. ``1.0`` = chaque token GT est prΓ©sent
dans au moins une hypothèse à hauteur de sa multiplicité.
Note
----
Cette borne est **optimiste** (supΓ©rieure Γ  la vraie borne par
voting sΓ©quentiel) car elle ignore l'ordre d'apparition. Pour le
diagnostic Β« un voting vaut-il l'effort ? Β» le proxy suffit ; pour
une vraie borne il faudrait un alignement sΓ©quentiel.
"""
ref_counter = _word_multiset(reference)
if not ref_counter or not hypotheses:
return 1.0 if not ref_counter else 0.0
hyp_counters = [_word_multiset(h) for h in hypotheses.values()]
total_ref = sum(ref_counter.values())
preserved = 0
for token, gt_count in ref_counter.items():
# Pour chaque moteur, le nombre d'occurrences disponibles, plafonnΓ©
# Γ  la multiplicitΓ© GT. L'oracle prend le max sur les moteurs.
best = max((min(gt_count, hc.get(token, 0)) for hc in hyp_counters), default=0)
preserved += best
return preserved / total_ref
def complementarity_gap(
reference: str,
hypotheses: dict[str, str],
) -> dict[str, float]:
"""Compare l'oracle au meilleur moteur seul.
Returns
-------
dict
``{
"oracle_recall": float, # bag-of-words recall de l'oracle
"best_single_recall": float, # meilleur recall token d'un moteur seul
"best_engine": str, # nom du moteur correspondant
"absolute_gap": float, # oracle - best_single (toujours β‰₯ 0)
"relative_gap": float, # absolute_gap / (1 - best_single + Ξ΅)
# = fraction des erreurs encore Γ©vitables
# par un ensemble
}``
"""
ref_counter = _word_multiset(reference)
total = sum(ref_counter.values())
if not total:
return {
"oracle_recall": 1.0,
"best_single_recall": 1.0,
"best_engine": "",
"absolute_gap": 0.0,
"relative_gap": 0.0,
}
def _single_recall(hyp_text: str) -> float:
hc = _word_multiset(hyp_text)
preserved = sum(min(gt, hc.get(tok, 0)) for tok, gt in ref_counter.items())
return preserved / total
if not hypotheses:
return {
"oracle_recall": 0.0,
"best_single_recall": 0.0,
"best_engine": "",
"absolute_gap": 0.0,
"relative_gap": 0.0,
}
per_engine = {name: _single_recall(h) for name, h in hypotheses.items()}
best_engine, best_recall = max(per_engine.items(), key=lambda kv: kv[1])
oracle = oracle_token_recall(reference, hypotheses)
absolute_gap = max(0.0, oracle - best_recall)
# relative_gap : fraction des erreurs du meilleur moteur que l'ensemble
# serait théoriquement capable de récupérer (∈ [0, 1])
headroom = max(1.0 - best_recall, 1e-12)
relative_gap = min(1.0, absolute_gap / headroom)
return {
"oracle_recall": oracle,
"best_single_recall": best_recall,
"best_engine": best_engine,
"absolute_gap": absolute_gap,
"relative_gap": relative_gap,
}
def pairwise_disagreement_rate(
reference: str,
hyp_a: str,
hyp_b: str,
) -> float:
"""Fraction de tokens GT pour lesquels A et B sont en dΓ©saccord.
Un dΓ©saccord = (l'un prΓ©serve le token, l'autre non) OU
(les deux le ratent mais avec des substitutions diffΓ©rentes β€” non
capturΓ© ici, on reste sur la version simple prΓ©sence/absence).
Returns
-------
float
Ratio dans ``[0, 1]``. ``0`` = A et B font les mΓͺmes choix
(pas de gain d'ensemble). ``1`` = A et B sont toujours en
dΓ©saccord (gain d'ensemble maximal).
"""
ref_counter = _word_multiset(reference)
if not ref_counter:
return 0.0
a = _word_multiset(hyp_a)
b = _word_multiset(hyp_b)
total = sum(ref_counter.values())
disagree = 0
for tok, gt_count in ref_counter.items():
a_pres = min(gt_count, a.get(tok, 0))
b_pres = min(gt_count, b.get(tok, 0))
# Compte les positions oΓΉ A et B donnent une rΓ©ponse diffΓ©rente
disagree += abs(a_pres - b_pres)
return disagree / total
# ──────────────────────────────────────────────────────────────────────────
# AgrΓ©gation au niveau benchmark (Sprint 36)
# ──────────────────────────────────────────────────────────────────────────
def compute_inter_engine_analysis(
*,
per_engine_outputs: dict[str, dict[str, str]],
ground_truths: dict[str, str],
taxonomy_distributions: dict[str, dict[str, float]] | None = None,
divergence_metric: str = "js",
) -> dict:
"""Agrège les métriques inter-moteurs sur l'ensemble du corpus.
Parameters
----------
per_engine_outputs:
``{engine_name: {doc_id: hypothesis_text}}``. Une entrΓ©e par
moteur, avec une hypothèse par document. Les documents absents
d'un moteur (Γ©checs, timeouts) sont simplement ignorΓ©s pour ce
moteur β€” l'oracle est calculΓ© sur les moteurs qui ont produit
une sortie pour le doc.
ground_truths:
``{doc_id: ground_truth_text}``. La GT est la mΓͺme pour tous
les moteurs ; on la passe une seule fois.
taxonomy_distributions:
``{engine_name: {error_class: probability}}`` β€” typiquement
``EngineReport.aggregated_taxonomy["class_distribution"]``. Si
``None`` ou vide, la divergence taxonomique n'est pas calculΓ©e.
divergence_metric:
``"js"`` (dΓ©faut, symΓ©trique) ou ``"kl"``.
Returns
-------
dict
Structure stable consommable par les dΓ©tecteurs narratifs et le
rapport HTML :
``{
"complementarity": {
"oracle_recall": float,
"best_single_recall": float,
"best_engine": str,
"absolute_gap": float,
"relative_gap": float,
"doc_count": int,
"per_doc": [{doc_id, oracle, best, gap}, ...] # max 50 docs
},
"taxonomy_divergence": {
"metric": "js"|"kl",
"matrix": {engine_a: {engine_b: divergence}},
"max_pair": [engine_a, engine_b, value] # paire la plus divergente
} | None,
"engines": [...], # liste des moteurs analysΓ©s (ordre stable)
}``
"""
engines = sorted(per_engine_outputs.keys())
result: dict = {"engines": engines}
# ── ComplΓ©mentaritΓ© agrΓ©gΓ©e doc par doc ──────────────────────────────
if not engines:
result["complementarity"] = None
else:
total_oracle_preserved = 0
total_ref_tokens = 0
per_engine_preserved: dict[str, int] = {name: 0 for name in engines}
per_doc_records: list[dict] = []
for doc_id, gt in ground_truths.items():
ref_counter = _word_multiset(gt)
ref_total = sum(ref_counter.values())
if not ref_total:
continue
total_ref_tokens += ref_total
doc_hyps: dict[str, str] = {}
for name in engines:
hyp = per_engine_outputs.get(name, {}).get(doc_id)
if hyp is not None:
doc_hyps[name] = hyp
if not doc_hyps:
continue
hyp_counters = {n: _word_multiset(h) for n, h in doc_hyps.items()}
doc_oracle = 0
doc_best_per_engine: dict[str, int] = {n: 0 for n in doc_hyps}
for tok, gt_count in ref_counter.items():
# Oracle : meilleur des moteurs sur ce token
best_for_token = 0
for name, hc in hyp_counters.items():
preserved = min(gt_count, hc.get(tok, 0))
doc_best_per_engine[name] += preserved
if preserved > best_for_token:
best_for_token = preserved
doc_oracle += best_for_token
total_oracle_preserved += doc_oracle
for name, count in doc_best_per_engine.items():
per_engine_preserved[name] += count
doc_best = max(doc_best_per_engine.values()) if doc_best_per_engine else 0
per_doc_records.append({
"doc_id": doc_id,
"oracle_recall": doc_oracle / ref_total,
"best_single_recall": doc_best / ref_total,
"absolute_gap": (doc_oracle - doc_best) / ref_total,
})
if total_ref_tokens == 0:
result["complementarity"] = None
else:
oracle_recall = total_oracle_preserved / total_ref_tokens
recalls = {
name: per_engine_preserved[name] / total_ref_tokens
for name in engines
}
best_engine, best_recall = max(recalls.items(), key=lambda kv: kv[1])
absolute_gap = max(0.0, oracle_recall - best_recall)
headroom = max(1.0 - best_recall, 1e-12)
relative_gap = min(1.0, absolute_gap / headroom)
# Garder les ``per_doc_records`` les plus instructifs : tri par
# gap absolu dΓ©croissant, top 50. Les dΓ©tecteurs narratifs
# n'en consomment que quelques-uns.
per_doc_records.sort(key=lambda r: r["absolute_gap"], reverse=True)
per_doc_top = per_doc_records[:50]
result["complementarity"] = {
"oracle_recall": oracle_recall,
"best_single_recall": best_recall,
"best_engine": best_engine,
"absolute_gap": absolute_gap,
"relative_gap": relative_gap,
"doc_count": len(per_doc_records),
"per_engine_recall": recalls,
"per_doc": per_doc_top,
}
# ── Divergence taxonomique ─────────────────────────────────────────
if not taxonomy_distributions:
result["taxonomy_divergence"] = None
else:
matrix = taxonomy_divergence_matrix(
taxonomy_distributions,
metric=divergence_metric,
)
# Cherche la paire la plus divergente (utile pour la synthèse
# narrative qui veut nommer les deux moteurs candidats Γ 
# l'ensemble).
max_pair: tuple[str, str, float] = ("", "", 0.0)
names = sorted(matrix.keys())
for i, a in enumerate(names):
for b in names[i + 1:]:
v = matrix[a][b]
if v > max_pair[2]:
max_pair = (a, b, v)
result["taxonomy_divergence"] = {
"metric": divergence_metric,
"matrix": matrix,
"max_pair": list(max_pair) if max_pair[2] > 0 else None,
}
return result
__all__ = [
"kl_divergence",
"jensen_shannon_divergence",
"taxonomy_divergence_matrix",
"oracle_token_recall",
"complementarity_gap",
"pairwise_disagreement_rate",
"compute_inter_engine_analysis",
]