Spaces:
Sleeping
Sleeping
| """MΓ©triques inter-moteurs (Sprint 35 β Γtape 2 du plan d'Γ©volution). | |
| Deux familles de mesures qui rΓ©pondent Γ des questions diffΓ©rentes mais | |
| liΓ©es : | |
| 1. **Divergence taxonomique** (`kl_divergence`, `jensen_shannon_divergence`, | |
| `taxonomy_divergence_matrix`) β *Γ quel point les moteurs font-ils des | |
| erreurs de natures diffΓ©rentes ?* Une divergence Γ©levΓ©e signale des | |
| moteurs spΓ©cialisΓ©s sur des classes d'erreurs distinctes (visual vs | |
| abrΓ©viation vs casse) et donc des candidats pour un voting ensemble. | |
| 2. **ComplΓ©mentaritΓ©** (`oracle_token_recall`, `complementarity_gap`, | |
| `pairwise_disagreement_rate`) β *quel CER serait atteignable si on | |
| combinait les moteurs ?* La borne infΓ©rieure du CER atteignable par | |
| un voting majoritaire token-level est ``1 - oracle_token_recall``. | |
| Si elle est très inférieure au CER du meilleur moteur seul, l'effort | |
| d'un pipeline d'ensemble se justifie. Sinon non. | |
| Convention de typage | |
| -------------------- | |
| Toutes les fonctions sont enregistrables dans le registre Sprint 34 si | |
| on les wrappe par un adaptateur ``(input_types=(TEXT, TEXT))``. Pour | |
| limiter le bruit, on ne les enregistre **pas** automatiquement : ce sont | |
| des mΓ©triques d'agrΓ©gation (multi-moteurs ou multi-documents) qui ne | |
| correspondent pas au modèle « une jonction = une métrique » du runner. | |
| Elles sont consommΓ©es par les dΓ©tecteurs narratifs et le rapport HTML. | |
| Note sur l'oracle | |
| ----------------- | |
| La mΓ©trique ``oracle_token_recall`` retournΓ©e ici utilise un alignement | |
| bag-of-words pondΓ©rΓ© par multiplicitΓ©. Ce n'est **pas** une vraie | |
| borne atteignable par voting majoritaire sΓ©quentiel β c'est une borne | |
| supΓ©rieure (proxy optimiste). La vraie borne demanderait un | |
| alignement séquentiel des hypothèses, ce qui est plus coûteux. Pour | |
| le diagnostic Β« ensemble vaut-il le coup ? Β», le proxy suffit | |
| largement ; on documente clairement la limite dans le glossaire et le | |
| rapport. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import math | |
| from collections import Counter | |
| logger = logging.getLogger(__name__) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Divergence taxonomique (KL / Jensen-Shannon) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _smoothed_distribution( | |
| distribution: dict[str, float], | |
| keys: list[str], | |
| epsilon: float = 1e-12, | |
| ) -> list[float]: | |
| """Aligne une distribution sur l'ordre de ``keys`` et lisse les zΓ©ros. | |
| Le lissage Γ©vite ``log(0)`` dans la KL. ``epsilon`` est volontairement | |
| minuscule pour ne pas modifier le résultat de manière sensible. | |
| """ | |
| smoothed = [max(distribution.get(k, 0.0), epsilon) for k in keys] | |
| total = sum(smoothed) | |
| return [v / total for v in smoothed] | |
| def kl_divergence(p: dict[str, float], q: dict[str, float]) -> float: | |
| """KL-divergence ``D(P||Q)`` en bits, sur l'union des clΓ©s. | |
| Les distributions n'ont pas besoin de partager exactement les mΓͺmes | |
| clΓ©s ; les clΓ©s manquantes sont lissΓ©es Γ ``epsilon`` puis | |
| renormalisΓ©es. | |
| Returns | |
| ------- | |
| float | |
| ``D(P||Q) β₯ 0``. Vaut 0 si et seulement si P == Q. N'est pas | |
| symΓ©trique : ``kl(p, q) != kl(q, p)`` en gΓ©nΓ©ral. | |
| """ | |
| keys = sorted(set(p.keys()) | set(q.keys())) | |
| if not keys: | |
| return 0.0 | |
| p_vec = _smoothed_distribution(p, keys) | |
| q_vec = _smoothed_distribution(q, keys) | |
| return sum(pi * math.log2(pi / qi) for pi, qi in zip(p_vec, q_vec)) | |
| def jensen_shannon_divergence( | |
| p: dict[str, float], | |
| q: dict[str, float], | |
| ) -> float: | |
| """JS-divergence symΓ©trique en bits, bornΓ©e dans ``[0, 1]``. | |
| ``JS(P, Q) = Β½ D(P||M) + Β½ D(Q||M)`` avec ``M = (P + Q) / 2``. | |
| SymΓ©trique et bornΓ©e β prΓ©fΓ©rable Γ la KL pour construire une | |
| matrice triangulaire de divergences entre moteurs. | |
| """ | |
| keys = sorted(set(p.keys()) | set(q.keys())) | |
| if not keys: | |
| return 0.0 | |
| p_vec = _smoothed_distribution(p, keys) | |
| q_vec = _smoothed_distribution(q, keys) | |
| m_vec = [(pi + qi) / 2.0 for pi, qi in zip(p_vec, q_vec)] | |
| def _kl(a: list[float], b: list[float]) -> float: | |
| return sum(ai * math.log2(ai / bi) for ai, bi in zip(a, b) if ai > 0) | |
| js = 0.5 * _kl(p_vec, m_vec) + 0.5 * _kl(q_vec, m_vec) | |
| # Borne thΓ©orique : JS β [0, 1] en bits. Clamp pour absorber les | |
| # erreurs d'arrondi flottant. | |
| return max(0.0, min(1.0, js)) | |
| def taxonomy_divergence_matrix( | |
| distributions: dict[str, dict[str, float]], | |
| metric: str = "js", | |
| ) -> dict[str, dict[str, float]]: | |
| """Construit la matrice de divergence triangulaire entre moteurs. | |
| Parameters | |
| ---------- | |
| distributions: | |
| ``{engine_name: {error_class: probability}}``. Chaque | |
| distribution doit sommer Γ environ 1 (pas de validation stricte | |
| β les distributions taxonomiques de Picarones sont dΓ©jΓ | |
| normalisΓ©es par ``aggregate_taxonomy``). | |
| metric: | |
| ``"js"`` (dΓ©faut, symΓ©trique) ou ``"kl"`` (asymΓ©trique). | |
| Returns | |
| ------- | |
| dict[str, dict[str, float]] | |
| Matrice ``{engine_a: {engine_b: divergence}}`` symΓ©trique pour | |
| ``js``, asymΓ©trique pour ``kl``. La diagonale vaut 0. | |
| """ | |
| if metric not in ("js", "kl"): | |
| raise ValueError(f"metric doit Γͺtre 'js' ou 'kl' β reΓ§u {metric!r}") | |
| fn = jensen_shannon_divergence if metric == "js" else kl_divergence | |
| engines = sorted(distributions.keys()) | |
| matrix: dict[str, dict[str, float]] = {a: {} for a in engines} | |
| for a in engines: | |
| for b in engines: | |
| if a == b: | |
| matrix[a][b] = 0.0 | |
| elif metric == "js" and b in matrix and a in matrix[b]: | |
| # SymΓ©trique : recopie pour Γ©viter de recalculer | |
| matrix[a][b] = matrix[b][a] | |
| else: | |
| matrix[a][b] = fn(distributions[a], distributions[b]) | |
| return matrix | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # ComplΓ©mentaritΓ© (oracle token recall) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _word_multiset(text: str) -> Counter[str]: | |
| """DΓ©composition en multiset de tokens (sΓ©parateur whitespace).""" | |
| return Counter(tok for tok in text.split() if tok) | |
| def oracle_token_recall( | |
| reference: str, | |
| hypotheses: dict[str, str], | |
| ) -> float: | |
| """Borne supΓ©rieure (proxy bag-of-words) du token-recall atteignable | |
| par un voting majoritaire entre tous les moteurs fournis. | |
| Pour chaque token de la rΓ©fΓ©rence (avec sa multiplicitΓ©), on | |
| considère qu'il est "préservé" par l'ensemble si au moins un moteur | |
| en produit une occurrence non encore comptΓ©e. Le score est le ratio | |
| d'occurrences GT prΓ©servΓ©es sur le total. | |
| Parameters | |
| ---------- | |
| reference: | |
| Texte GT. | |
| hypotheses: | |
| ``{engine_name: hypothesis_text}``. | |
| Returns | |
| ------- | |
| float | |
| Ratio dans ``[0, 1]``. ``1.0`` = chaque token GT est prΓ©sent | |
| dans au moins une hypothèse à hauteur de sa multiplicité. | |
| Note | |
| ---- | |
| Cette borne est **optimiste** (supΓ©rieure Γ la vraie borne par | |
| voting sΓ©quentiel) car elle ignore l'ordre d'apparition. Pour le | |
| diagnostic Β« un voting vaut-il l'effort ? Β» le proxy suffit ; pour | |
| une vraie borne il faudrait un alignement sΓ©quentiel. | |
| """ | |
| ref_counter = _word_multiset(reference) | |
| if not ref_counter or not hypotheses: | |
| return 1.0 if not ref_counter else 0.0 | |
| hyp_counters = [_word_multiset(h) for h in hypotheses.values()] | |
| total_ref = sum(ref_counter.values()) | |
| preserved = 0 | |
| for token, gt_count in ref_counter.items(): | |
| # Pour chaque moteur, le nombre d'occurrences disponibles, plafonnΓ© | |
| # Γ la multiplicitΓ© GT. L'oracle prend le max sur les moteurs. | |
| best = max((min(gt_count, hc.get(token, 0)) for hc in hyp_counters), default=0) | |
| preserved += best | |
| return preserved / total_ref | |
| def complementarity_gap( | |
| reference: str, | |
| hypotheses: dict[str, str], | |
| ) -> dict[str, float]: | |
| """Compare l'oracle au meilleur moteur seul. | |
| Returns | |
| ------- | |
| dict | |
| ``{ | |
| "oracle_recall": float, # bag-of-words recall de l'oracle | |
| "best_single_recall": float, # meilleur recall token d'un moteur seul | |
| "best_engine": str, # nom du moteur correspondant | |
| "absolute_gap": float, # oracle - best_single (toujours β₯ 0) | |
| "relative_gap": float, # absolute_gap / (1 - best_single + Ξ΅) | |
| # = fraction des erreurs encore Γ©vitables | |
| # par un ensemble | |
| }`` | |
| """ | |
| ref_counter = _word_multiset(reference) | |
| total = sum(ref_counter.values()) | |
| if not total: | |
| return { | |
| "oracle_recall": 1.0, | |
| "best_single_recall": 1.0, | |
| "best_engine": "", | |
| "absolute_gap": 0.0, | |
| "relative_gap": 0.0, | |
| } | |
| def _single_recall(hyp_text: str) -> float: | |
| hc = _word_multiset(hyp_text) | |
| preserved = sum(min(gt, hc.get(tok, 0)) for tok, gt in ref_counter.items()) | |
| return preserved / total | |
| if not hypotheses: | |
| return { | |
| "oracle_recall": 0.0, | |
| "best_single_recall": 0.0, | |
| "best_engine": "", | |
| "absolute_gap": 0.0, | |
| "relative_gap": 0.0, | |
| } | |
| per_engine = {name: _single_recall(h) for name, h in hypotheses.items()} | |
| best_engine, best_recall = max(per_engine.items(), key=lambda kv: kv[1]) | |
| oracle = oracle_token_recall(reference, hypotheses) | |
| absolute_gap = max(0.0, oracle - best_recall) | |
| # relative_gap : fraction des erreurs du meilleur moteur que l'ensemble | |
| # serait thΓ©oriquement capable de rΓ©cupΓ©rer (β [0, 1]) | |
| headroom = max(1.0 - best_recall, 1e-12) | |
| relative_gap = min(1.0, absolute_gap / headroom) | |
| return { | |
| "oracle_recall": oracle, | |
| "best_single_recall": best_recall, | |
| "best_engine": best_engine, | |
| "absolute_gap": absolute_gap, | |
| "relative_gap": relative_gap, | |
| } | |
| def pairwise_disagreement_rate( | |
| reference: str, | |
| hyp_a: str, | |
| hyp_b: str, | |
| ) -> float: | |
| """Fraction de tokens GT pour lesquels A et B sont en dΓ©saccord. | |
| Un dΓ©saccord = (l'un prΓ©serve le token, l'autre non) OU | |
| (les deux le ratent mais avec des substitutions diffΓ©rentes β non | |
| capturΓ© ici, on reste sur la version simple prΓ©sence/absence). | |
| Returns | |
| ------- | |
| float | |
| Ratio dans ``[0, 1]``. ``0`` = A et B font les mΓͺmes choix | |
| (pas de gain d'ensemble). ``1`` = A et B sont toujours en | |
| dΓ©saccord (gain d'ensemble maximal). | |
| """ | |
| ref_counter = _word_multiset(reference) | |
| if not ref_counter: | |
| return 0.0 | |
| a = _word_multiset(hyp_a) | |
| b = _word_multiset(hyp_b) | |
| total = sum(ref_counter.values()) | |
| disagree = 0 | |
| for tok, gt_count in ref_counter.items(): | |
| a_pres = min(gt_count, a.get(tok, 0)) | |
| b_pres = min(gt_count, b.get(tok, 0)) | |
| # Compte les positions oΓΉ A et B donnent une rΓ©ponse diffΓ©rente | |
| disagree += abs(a_pres - b_pres) | |
| return disagree / total | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # AgrΓ©gation au niveau benchmark (Sprint 36) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def compute_inter_engine_analysis( | |
| *, | |
| per_engine_outputs: dict[str, dict[str, str]], | |
| ground_truths: dict[str, str], | |
| taxonomy_distributions: dict[str, dict[str, float]] | None = None, | |
| divergence_metric: str = "js", | |
| ) -> dict: | |
| """Agrège les métriques inter-moteurs sur l'ensemble du corpus. | |
| Parameters | |
| ---------- | |
| per_engine_outputs: | |
| ``{engine_name: {doc_id: hypothesis_text}}``. Une entrΓ©e par | |
| moteur, avec une hypothèse par document. Les documents absents | |
| d'un moteur (Γ©checs, timeouts) sont simplement ignorΓ©s pour ce | |
| moteur β l'oracle est calculΓ© sur les moteurs qui ont produit | |
| une sortie pour le doc. | |
| ground_truths: | |
| ``{doc_id: ground_truth_text}``. La GT est la mΓͺme pour tous | |
| les moteurs ; on la passe une seule fois. | |
| taxonomy_distributions: | |
| ``{engine_name: {error_class: probability}}`` β typiquement | |
| ``EngineReport.aggregated_taxonomy["class_distribution"]``. Si | |
| ``None`` ou vide, la divergence taxonomique n'est pas calculΓ©e. | |
| divergence_metric: | |
| ``"js"`` (dΓ©faut, symΓ©trique) ou ``"kl"``. | |
| Returns | |
| ------- | |
| dict | |
| Structure stable consommable par les dΓ©tecteurs narratifs et le | |
| rapport HTML : | |
| ``{ | |
| "complementarity": { | |
| "oracle_recall": float, | |
| "best_single_recall": float, | |
| "best_engine": str, | |
| "absolute_gap": float, | |
| "relative_gap": float, | |
| "doc_count": int, | |
| "per_doc": [{doc_id, oracle, best, gap}, ...] # max 50 docs | |
| }, | |
| "taxonomy_divergence": { | |
| "metric": "js"|"kl", | |
| "matrix": {engine_a: {engine_b: divergence}}, | |
| "max_pair": [engine_a, engine_b, value] # paire la plus divergente | |
| } | None, | |
| "engines": [...], # liste des moteurs analysΓ©s (ordre stable) | |
| }`` | |
| """ | |
| engines = sorted(per_engine_outputs.keys()) | |
| result: dict = {"engines": engines} | |
| # ββ ComplΓ©mentaritΓ© agrΓ©gΓ©e doc par doc ββββββββββββββββββββββββββββββ | |
| if not engines: | |
| result["complementarity"] = None | |
| else: | |
| total_oracle_preserved = 0 | |
| total_ref_tokens = 0 | |
| per_engine_preserved: dict[str, int] = {name: 0 for name in engines} | |
| per_doc_records: list[dict] = [] | |
| for doc_id, gt in ground_truths.items(): | |
| ref_counter = _word_multiset(gt) | |
| ref_total = sum(ref_counter.values()) | |
| if not ref_total: | |
| continue | |
| total_ref_tokens += ref_total | |
| doc_hyps: dict[str, str] = {} | |
| for name in engines: | |
| hyp = per_engine_outputs.get(name, {}).get(doc_id) | |
| if hyp is not None: | |
| doc_hyps[name] = hyp | |
| if not doc_hyps: | |
| continue | |
| hyp_counters = {n: _word_multiset(h) for n, h in doc_hyps.items()} | |
| doc_oracle = 0 | |
| doc_best_per_engine: dict[str, int] = {n: 0 for n in doc_hyps} | |
| for tok, gt_count in ref_counter.items(): | |
| # Oracle : meilleur des moteurs sur ce token | |
| best_for_token = 0 | |
| for name, hc in hyp_counters.items(): | |
| preserved = min(gt_count, hc.get(tok, 0)) | |
| doc_best_per_engine[name] += preserved | |
| if preserved > best_for_token: | |
| best_for_token = preserved | |
| doc_oracle += best_for_token | |
| total_oracle_preserved += doc_oracle | |
| for name, count in doc_best_per_engine.items(): | |
| per_engine_preserved[name] += count | |
| doc_best = max(doc_best_per_engine.values()) if doc_best_per_engine else 0 | |
| per_doc_records.append({ | |
| "doc_id": doc_id, | |
| "oracle_recall": doc_oracle / ref_total, | |
| "best_single_recall": doc_best / ref_total, | |
| "absolute_gap": (doc_oracle - doc_best) / ref_total, | |
| }) | |
| if total_ref_tokens == 0: | |
| result["complementarity"] = None | |
| else: | |
| oracle_recall = total_oracle_preserved / total_ref_tokens | |
| recalls = { | |
| name: per_engine_preserved[name] / total_ref_tokens | |
| for name in engines | |
| } | |
| best_engine, best_recall = max(recalls.items(), key=lambda kv: kv[1]) | |
| absolute_gap = max(0.0, oracle_recall - best_recall) | |
| headroom = max(1.0 - best_recall, 1e-12) | |
| relative_gap = min(1.0, absolute_gap / headroom) | |
| # Garder les ``per_doc_records`` les plus instructifs : tri par | |
| # gap absolu dΓ©croissant, top 50. Les dΓ©tecteurs narratifs | |
| # n'en consomment que quelques-uns. | |
| per_doc_records.sort(key=lambda r: r["absolute_gap"], reverse=True) | |
| per_doc_top = per_doc_records[:50] | |
| result["complementarity"] = { | |
| "oracle_recall": oracle_recall, | |
| "best_single_recall": best_recall, | |
| "best_engine": best_engine, | |
| "absolute_gap": absolute_gap, | |
| "relative_gap": relative_gap, | |
| "doc_count": len(per_doc_records), | |
| "per_engine_recall": recalls, | |
| "per_doc": per_doc_top, | |
| } | |
| # ββ Divergence taxonomique βββββββββββββββββββββββββββββββββββββββββ | |
| if not taxonomy_distributions: | |
| result["taxonomy_divergence"] = None | |
| else: | |
| matrix = taxonomy_divergence_matrix( | |
| taxonomy_distributions, | |
| metric=divergence_metric, | |
| ) | |
| # Cherche la paire la plus divergente (utile pour la synthèse | |
| # narrative qui veut nommer les deux moteurs candidats Γ | |
| # l'ensemble). | |
| max_pair: tuple[str, str, float] = ("", "", 0.0) | |
| names = sorted(matrix.keys()) | |
| for i, a in enumerate(names): | |
| for b in names[i + 1:]: | |
| v = matrix[a][b] | |
| if v > max_pair[2]: | |
| max_pair = (a, b, v) | |
| result["taxonomy_divergence"] = { | |
| "metric": divergence_metric, | |
| "matrix": matrix, | |
| "max_pair": list(max_pair) if max_pair[2] > 0 else None, | |
| } | |
| return result | |
| __all__ = [ | |
| "kl_divergence", | |
| "jensen_shannon_divergence", | |
| "taxonomy_divergence_matrix", | |
| "oracle_token_recall", | |
| "complementarity_gap", | |
| "pairwise_disagreement_rate", | |
| "compute_inter_engine_analysis", | |
| ] | |