""" Math Engine Validation Suite — Orsync Scenarist v7.0 PRD Constraints ============================================================= Targets: vectorizer (PCA/EVR), heatmap (Mahalanobis), quality_gate (KL Divergence) """ from __future__ import annotations import numpy as np import pytest from numpy.testing import assert_allclose from sklearn.decomposition import PCA from backend.app.services.campaign_vectorizer import FEATURE_KEYS from backend.app.services.vectorizer import vectorize_doctors from backend.app.services.heatmap import mahalanobis_distance, build_heatmap from backend.app.services.quality_gate import ( kl_divergence, mmd_rbf, pass_quality_gate, regenerate_until_quality, ) # ═══════════════════════════════════════════════════════════════════ # 1. PCA / EVR — Weighted-average activates strictly when PC1 < 0.80 # ═══════════════════════════════════════════════════════════════════ def _make_records(n: int, dim: int, dominant: bool) -> list[dict[str, float]]: """Generate synthetic doctor profiles. When *dominant* is True, feature_0 carries ~99 % of variance so PC1-EVR will be > 0.80. When False, variance is spread uniformly so PC1-EVR will be < 0.80. Data is centered (mean ~0) with moderate scale so the vectorizer's skewness guard (|skew| > 2) does NOT trigger log1p — keeping the variance structure intact for the PCA assertion. """ rng = np.random.RandomState(42) if dominant: latent = rng.randn(n, 1) loadings = rng.randn(1, dim) * 10.0 data = latent @ loadings + rng.randn(n, dim) * 0.1 + 500.0 else: data = rng.randn(n, dim) * 10.0 + 500.0 active_keys = FEATURE_KEYS[:dim] records: list[dict[str, float]] = [] for row in data: record = {key: 500.0 for key in FEATURE_KEYS} for j, key in enumerate(active_keys): record[key] = float(row[j]) records.append(record) return records class TestPCAWeightedAverageActivation: """PRD §2: weighted-average path activates IFF PC1 EVR < 0.80.""" def test_pc1_dominant_uses_first_component_only(self): records = _make_records(n=50, dim=6, dominant=True) result = vectorize_doctors(records) evr = result["explained_variance_ratio"] assert evr[0] >= 0.80, f"Expected dominant PC1 but got EVR[0]={evr[0]:.4f}" vectors = np.array(result["vectors"]) assert vectors.shape[1] == 1 def test_pc1_weak_activates_weighted_average(self): records = _make_records(n=50, dim=6, dominant=False) result = vectorize_doctors(records) evr = result["explained_variance_ratio"] assert evr[0] < 0.80, f"Expected weak PC1 but got EVR[0]={evr[0]:.4f}" vectors = np.array(result["vectors"]) assert vectors.shape[1] == 1 def test_weighted_average_formula_matches_manual_calculation(self): records = _make_records(n=40, dim=5, dominant=False) result = vectorize_doctors(records) evr = np.array(result["explained_variance_ratio"]) assert evr[0] < 0.80 numeric_keys = list(FEATURE_KEYS) raw = np.array( [[float(r.get(k, 0.0)) for k in numeric_keys] for r in records], dtype=float, ) from scipy.stats import skew as sp_skew from sklearn.preprocessing import MinMaxScaler, RobustScaler if np.any(np.abs(sp_skew(raw, axis=0, nan_policy="omit")) > 2.0): x_scaled = np.log1p(np.clip(raw, 0.0, None)) else: x_scaled = RobustScaler().fit_transform(raw) pca = PCA(n_components=min(x_scaled.shape)) x_pca = pca.fit_transform(x_scaled) evr_manual = pca.explained_variance_ratio_ weighted = (x_pca * evr_manual).sum(axis=1, keepdims=True) / max( float(evr_manual.sum()), 1e-9 ) bounded = MinMaxScaler(feature_range=(0.0, 1.0)).fit_transform(weighted) assert_allclose(np.array(result["vectors"]), bounded, atol=1e-9) def test_empty_records_returns_empty(self): result = vectorize_doctors([]) assert result["vectors"] == [] def test_evr_boundary_at_exactly_080(self): """The threshold is strictly < 0.80; at exactly 0.80 we use PC1 path.""" rng = np.random.RandomState(99) for _ in range(20): records = _make_records(n=60, dim=4, dominant=True) result = vectorize_doctors(records) evr = result["explained_variance_ratio"] if abs(evr[0] - 0.80) < 0.01: break assert result["vectors"] # ═══════════════════════════════════════════════════════════════════ # 2. Mahalanobis Distance — Inverse Covariance Matrix (Σ⁻¹) # ═══════════════════════════════════════════════════════════════════ class TestMahalanobisDistance: """PRD §3: correct implementation of Σ⁻¹ for campaign–cluster distance.""" def test_identity_covariance_equals_euclidean(self): x = np.array([3.0, 4.0]) mu = np.array([0.0, 0.0]) sigma = np.eye(2) d = mahalanobis_distance(x, mu, sigma) assert_allclose(d, 5.0, atol=1e-9) def test_scaled_covariance(self): x = np.array([3.0, 4.0]) mu = np.array([0.0, 0.0]) sigma = np.array([[9.0, 0.0], [0.0, 16.0]]) d = mahalanobis_distance(x, mu, sigma) expected = np.sqrt((3.0**2) / 9.0 + (4.0**2) / 16.0) assert_allclose(d, expected, atol=1e-9) def test_correlated_covariance(self): x = np.array([1.0, 1.0]) mu = np.array([0.0, 0.0]) sigma = np.array([[1.0, 0.5], [0.5, 1.0]]) sigma_inv = np.linalg.inv(sigma) delta = x - mu expected = float(np.sqrt(delta @ sigma_inv @ delta)) d = mahalanobis_distance(x, mu, sigma) assert_allclose(d, expected, atol=1e-9) def test_singular_covariance_uses_pseudoinverse(self): x = np.array([1.0, 2.0]) mu = np.array([0.0, 0.0]) sigma = np.array([[1.0, 1.0], [1.0, 1.0]]) # rank 1 d = mahalanobis_distance(x, mu, sigma) assert np.isfinite(d) def test_heatmap_ranking_order_matches_distance(self): campaign = [1.0, 0.0] near = [1.0, 0.1] far = [10.0, 10.0] cov = [[1.0, 0.0], [0.0, 1.0]] result = build_heatmap(campaign, [near, far], [cov, cov]) ranking = result["ranking"] assert ranking[0]["distance"] < ranking[1]["distance"] assert ranking[0]["cluster_id"] == 0 def test_heatmap_probabilities_sum_to_one(self): campaign = [2.0, 3.0] centroids = [[0.0, 0.0], [5.0, 5.0], [2.0, 3.0]] cov = [[1.0, 0.0], [0.0, 1.0]] result = build_heatmap(campaign, centroids, [cov] * 3) total_prob = sum(r["probability"] for r in result["ranking"]) assert_allclose(total_prob, 1.0, atol=1e-9) def test_sigma_inv_manual_matches_numpy(self): sigma = np.array([[2.0, 1.0], [1.0, 3.0]]) sigma_inv_expected = np.linalg.inv(sigma) sigma_inv_actual = np.linalg.pinv(sigma) assert_allclose(sigma_inv_actual, sigma_inv_expected, atol=1e-9) # ═══════════════════════════════════════════════════════════════════ # 3. Quality Gate — KL Divergence rejection + regeneration # ═══════════════════════════════════════════════════════════════════ class TestKLDivergenceQualityGate: """PRD §4: D_KL > threshold ⟹ reject and regenerate immediately.""" def test_identical_distributions_have_near_zero_kl(self): rng = np.random.RandomState(0) data = rng.randn(500) d_kl = kl_divergence(data, data.copy()) assert d_kl < 0.01 def test_divergent_distributions_have_high_kl(self): rng = np.random.RandomState(0) p = rng.randn(500) q = rng.randn(500) + 10.0 d_kl = kl_divergence(p, q) assert d_kl > 0.05 def test_quality_gate_accepts_when_below_threshold(self): rng = np.random.RandomState(7) ref = rng.randn(100, 1).tolist() syn = (rng.randn(100, 1) * 1.01).tolist() verdict = pass_quality_gate(ref, syn, kl_threshold=5.0) assert verdict["accepted"] is True assert verdict["kl_divergence"] <= 5.0 def test_quality_gate_rejects_when_above_threshold(self): rng = np.random.RandomState(7) ref = rng.randn(200, 1).tolist() syn = (rng.randn(200, 1) + 50.0).tolist() verdict = pass_quality_gate(ref, syn, kl_threshold=0.01) assert verdict["accepted"] is False assert verdict["kl_divergence"] > 0.01 def test_rejection_triggers_regeneration(self): """Simulate D_KL >> threshold: regenerate_until_quality must accumulate rejection entries before (possibly) accepting.""" rng = np.random.RandomState(42) ref = rng.randn(50, 2).tolist() mu_far = [100.0, 100.0] sigma_tight = [[0.001, 0.0], [0.0, 0.001]] result = regenerate_until_quality( mu=mu_far, sigma=sigma_tight, reference_vectors=ref, max_attempts=5, kl_threshold=0.001, ) assert len(result["rejections"]) > 0, "Expected at least one rejection event" for rej in result["rejections"]: assert rej["kl_divergence"] > 0.001 def test_regeneration_accepts_matching_distribution(self): rng = np.random.RandomState(42) ref = rng.multivariate_normal([0.0, 0.0], [[1.0, 0.0], [0.0, 1.0]], size=200).tolist() result = regenerate_until_quality( mu=[0.0, 0.0], sigma=[[1.0, 0.0], [0.0, 1.0]], reference_vectors=ref, max_attempts=10, kl_threshold=1.0, ) assert result["accepted"] is True assert len(result["vector_samples"]) > 0 def test_mmd_rbf_identical_is_near_zero(self): rng = np.random.RandomState(0) x = rng.randn(50, 3) assert mmd_rbf(x, x.copy()) < 1e-9 def test_mmd_rbf_different_is_positive(self): rng = np.random.RandomState(0) x = rng.randn(50, 3) y = rng.randn(50, 3) + 5.0 assert mmd_rbf(x, y) > 0.0 def test_kl_divergence_is_non_negative(self): rng = np.random.RandomState(1) p = rng.randn(300) q = rng.randn(300) * 2.0 assert kl_divergence(p, q) >= 0.0