"""Tests Sprint 20 — modélisation coût + vue Pareto. Sprint 5 du plan rapport. Couvre : 1. `pricing.py` : chargement de la table, estimation locale vs cloud. 2. `compute_pareto_front` : cas canoniques + dégénérés. 3. Intégration `_build_report_data` : coût annoté, front calculé, JSON ok. 4. Détecteurs narratifs `pareto_alternative` et `cost_outlier`. 5. Rendu HTML : section Pareto, toggles axes, notes méthodologiques. """ from __future__ import annotations import re from pathlib import Path import pytest from picarones.measurements.narrative import build_synthesis from picarones.measurements.narrative.detectors import ( detect_cost_outlier, detect_pareto_alternative, ) from picarones.core.facts import FactType from picarones.measurements.pricing import ( build_costs_for_benchmark, estimate_cost, load_pricing_database, ) from picarones.measurements.statistics import compute_pareto_front # --------------------------------------------------------------------------- # 1. Pricing # --------------------------------------------------------------------------- class TestLoadPricingDatabase: def test_default_file_loads(self): defaults, table = load_pricing_database() assert defaults.currency == "EUR" assert defaults.last_updated # doit être rempli assert "tesseract" in table assert "gpt-4o" in table assert "google_vision" in table def test_missing_file_returns_empty(self, tmp_path): missing = tmp_path / "nope.yaml" defaults, table = load_pricing_database(missing) assert table == {} assert defaults.currency == "EUR" # fallback class TestEstimateCost: def test_cloud_api_uses_listed_price(self): cost = estimate_cost("google_vision") assert cost.type == "cloud_api" assert cost.cost_per_1k_pages_eur > 0 assert cost.pricing_source_url is not None assert cost.api_price_per_1k_pages == cost.cost_per_1k_pages_eur def test_local_engine_uses_seconds_times_rate(self): cost = estimate_cost("tesseract") assert cost.type == "local" # 2s/page × 1000 pages / 3600 × 0.08 €/h ≈ 0.044 € assert 
cost.cost_per_1k_pages_eur == pytest.approx(0.044, abs=0.01) assert "Temps d'inférence" in " ".join(cost.assumptions) def test_measured_seconds_override_indicative(self): cost = estimate_cost("tesseract", measured_seconds_per_page=10.0) # Rate = 0.08 €/h → 10 × 1000 / 3600 × 0.08 ≈ 0.22 € assert cost.cost_per_1k_pages_eur == pytest.approx(0.222, abs=0.01) assert "mesuré" in " ".join(cost.assumptions) def test_pipeline_prefers_llm_model(self): cost = estimate_cost( engine_name="tesseract → gpt-4o", llm_model="gpt-4o", is_pipeline=True, ) assert cost.engine_key == "gpt-4o" assert cost.type == "cloud_api" def test_unknown_engine_returns_unknown_type(self): cost = estimate_cost("totally-not-a-real-engine") assert cost.type == "unknown" assert cost.cost_per_1k_pages_eur is None assert "Aucune entrée" in " ".join(cost.assumptions) def test_hourly_rate_override(self): cheap = estimate_cost("tesseract", hourly_rate_override_eur=0.01) expensive = estimate_cost("tesseract", hourly_rate_override_eur=10.0) assert expensive.cost_per_1k_pages_eur > cheap.cost_per_1k_pages_eur def test_carbon_estimate_computed(self): cost = estimate_cost("gpt-4o") assert cost.co2_per_1k_pages_g is not None assert cost.co2_per_1k_pages_g > 0 # kWh × grid intensity → positive et cohérent expected = cost.kwh_per_1k_pages * cost.grid_intensity_g_co2_per_kwh assert cost.co2_per_1k_pages_g == pytest.approx(expected) class TestBuildCostsForBenchmark: def test_annotates_all_engines(self): engines = [ {"name": "tesseract", "is_pipeline": False, "pipeline_info": {}}, {"name": "pipeline", "is_pipeline": True, "pipeline_info": {"llm_model": "gpt-4o"}}, ] durations = {"tesseract": 1.5, "pipeline": 12.0} costs = build_costs_for_benchmark(engines, durations) assert "tesseract" in costs assert "pipeline" in costs assert costs["tesseract"]["type"] == "local" assert costs["pipeline"]["type"] == "cloud_api" # --------------------------------------------------------------------------- # 2. 
# Pareto
# ---------------------------------------------------------------------------


class TestComputeParetoFront:
    """Checks on ``compute_pareto_front``: canonical and degenerate inputs."""

    def test_trivial_front(self):
        """Two non-dominated points form the front; the dominated one is out."""
        candidates = [
            {"engine": "A", "cer": 0.05, "cost": 1.0},  # best CER
            {"engine": "B", "cer": 0.10, "cost": 0.1},  # best cost
            {"engine": "C", "cer": 0.08, "cost": 2.0},  # dominated by A
        ]
        result = compute_pareto_front(candidates)
        assert set(result) == {"A", "B"}

    def test_empty_input(self):
        """No points in, empty front out."""
        assert compute_pareto_front([]) == []

    def test_single_point_is_its_own_front(self):
        """A lone point is returned as the front."""
        only_point = {"engine": "X", "cer": 0.1, "cost": 1.0}
        assert compute_pareto_front([only_point]) == ["X"]

    def test_skips_points_with_missing_values(self):
        """Points lacking one of the objectives are left out of the front."""
        candidates = [
            {"engine": "A", "cer": 0.05, "cost": 1.0},
            {"engine": "B", "cost": 0.5},  # no cer
            {"engine": "C", "cer": 0.10},  # no cost
        ]
        result = compute_pareto_front(candidates)
        assert result == ["A"]

    def test_three_dimensional_front(self):
        """Three objectives to minimise: checks the computation for k > 2."""
        columns = ("name", "a", "b", "c")
        rows = [
            ("A", 1, 10, 100),  # best in a
            ("B", 10, 1, 100),  # best in b
            ("C", 10, 10, 1),  # best in c
            ("D", 20, 20, 200),  # dominated everywhere
        ]
        candidates = [dict(zip(columns, row)) for row in rows]
        result = compute_pareto_front(
            candidates,
            objectives=("a", "b", "c"),
            name_key="name",
        )
        assert set(result) == {"A", "B", "C"}
        assert "D" not in result

    def test_mixed_min_max(self):
        """Minimise CER while maximising the anchor score."""
        candidates = [
            {"engine": "A", "cer": 0.05, "anchor": 0.95},  # best CER
            {"engine": "B", "cer": 0.10, "anchor": 0.85},  # dominated
            {"engine": "C", "cer": 0.08, "anchor": 0.99},  # best anchor
        ]
        result = compute_pareto_front(
            candidates,
            objectives=("cer", "anchor"),
            minimize=(True, False),
        )
        assert set(result) == {"A", "C"}

    def test_minimize_length_mismatch_raises(self):
        """``minimize`` shorter than ``objectives`` raises ``ValueError``."""
        single = [{"engine": "A", "cer": 0.1, "cost": 1.0}]
        with pytest.raises(ValueError):
            compute_pareto_front(
                single,
                objectives=("cer", "cost"),
                minimize=(True,),
            )
--------------------------------------------------------------------------- # 3. Détecteurs narratifs Pareto / cost # --------------------------------------------------------------------------- def _pareto_data(cost_points, front=None, speed_points=None, co2_points=None): return { "ranking": [{"engine": p["engine"], "mean_cer": p["cer"], "documents": 10, "failed": 0} for p in cost_points], "pareto": { "cost": {"points": cost_points, "front": front or [p["engine"] for p in cost_points]}, "speed": {"points": speed_points or [], "front": []}, "co2": {"points": co2_points or [], "front": []}, }, } class TestDetectParetoAlternative: def test_emits_when_alt_is_cheaper(self): data = _pareto_data( [ {"engine": "best", "cer": 0.02, "cost": 5.0}, {"engine": "cheap", "cer": 0.04, "cost": 0.1}, {"engine": "dominated", "cer": 0.05, "cost": 3.0}, ], front=["best", "cheap"], ) # Forcer "best" comme leader data["ranking"] = [ {"engine": "best", "mean_cer": 0.02, "documents": 10, "failed": 0}, {"engine": "cheap", "mean_cer": 0.04, "documents": 10, "failed": 0}, {"engine": "dominated", "mean_cer": 0.05, "documents": 10, "failed": 0}, ] facts = detect_pareto_alternative(data) assert len(facts) == 1 assert facts[0].payload["engine"] == "cheap" assert facts[0].payload["leader"] == "best" assert facts[0].payload["cost_saving_ratio"] >= 10 def test_empty_when_front_has_only_leader(self): data = _pareto_data( [{"engine": "best", "cer": 0.02, "cost": 5.0}], front=["best"], ) assert detect_pareto_alternative(data) == [] def test_empty_when_no_pareto_section(self): assert detect_pareto_alternative({}) == [] class TestDetectCostOutlier: def test_flags_expensive_dominated_engine(self): data = _pareto_data( [ {"engine": "cheap", "cer": 0.05, "cost": 0.1}, {"engine": "normal", "cer": 0.08, "cost": 1.0}, {"engine": "expensive_bad", "cer": 0.15, "cost": 20.0}, ], front=["cheap"], ) facts = detect_cost_outlier(data) assert any(f.payload["engine"] == "expensive_bad" for f in facts) def 
test_does_not_flag_expensive_on_front(self): # Un moteur cher mais sur le front = coût justifié par qualité unique data = _pareto_data( [ {"engine": "cheap", "cer": 0.30, "cost": 0.1}, {"engine": "normal", "cer": 0.15, "cost": 1.0}, {"engine": "expensive_best", "cer": 0.02, "cost": 20.0}, ], front=["cheap", "expensive_best"], ) facts = detect_cost_outlier(data) names = {f.payload["engine"] for f in facts} assert "expensive_best" not in names # --------------------------------------------------------------------------- # 4. Intégration rapport HTML # --------------------------------------------------------------------------- @pytest.fixture(scope="module") def benchmark_result(): from picarones import fixtures return fixtures.generate_sample_benchmark(n_docs=8) class TestReportIntegration: def test_report_contains_pareto_card(self, benchmark_result, tmp_path): from picarones.report.generator import ReportGenerator out = tmp_path / "report.html" ReportGenerator(benchmark_result).generate(out) html = out.read_text(encoding="utf-8") assert 'class="chart-card pareto-card"' in html assert 'id="pareto-chart"' in html assert 'setParetoAxis(\'cost\')' in html assert 'setParetoAxis(\'speed\')' in html assert 'setParetoAxis(\'co2\')' in html assert "pareto-experimental" in html # étiquette expérimental def test_report_json_contains_pareto_data(self, benchmark_result): from picarones.report.generator import _build_report_data data = _build_report_data(benchmark_result, images_b64={}) pareto = data.get("pareto", {}) assert "cost" in pareto assert "speed" in pareto assert "co2" in pareto assert "pricing_meta" in pareto # Les moteurs doivent porter leur champ cost for e in data["engines"]: assert "cost" in e, f"Moteur {e.get('name')} sans champ cost" def test_synthesis_may_include_pareto_sentence(self, benchmark_result, tmp_path): # Sur la fixture de démo, pero_ocr + tesseract sont sur le front → la # synthèse doit remonter une alternative moins chère from 
picarones.report.generator import ReportGenerator out = tmp_path / "report.html" ReportGenerator(benchmark_result).generate(out) html = out.read_text(encoding="utf-8") m = re.search(r'