""" Medallion Pipeline Audit — Orsync Scenarist v7.0 PRD Constraints ========================================================== Traces a mock JSON payload from Bronze → Silver → Gold. Asserts: Median Imputation, Zero-PII Anonymization, quality filtering. """ from __future__ import annotations import json import os import tempfile from typing import Any from unittest import mock import numpy as np import pandas as pd import pytest REAL_NAMES = [ "Dr. Sarah Johnson", "Prof. Ahmed Al-Rashid", "Dr. Lena Müller", "Dr. Chen Wei", "Dr. María García", ] def _bronze_payload() -> list[dict[str, Any]]: """Simulates raw Bronze records with real names and missing fields.""" return [ { "display_name": REAL_NAMES[0], "code_name": "Dr_OA_Oncology_001", "source": "OpenAlex", "last_known_institution": "University of Cape Town", "institution_type": "education", "institution_country": "ZA", "h_index": 12, "works_count": 45, "cited_by_count": 800, "i10_index": 8, "2yr_mean_citedness": 3.2, "years_active": 10, "publication_count": None, "journal_impact_proxy": None, "co_author_count": 15, "grant_count": 3, "trials_led": None, "enrollment_size": None, "topics": ["Oncology", "Immunotherapy"], }, { "display_name": REAL_NAMES[1], "code_name": "Dr_OA_Oncology_002", "source": "OpenAlex", "last_known_institution": "Stellenbosch University", "institution_type": "education", "institution_country": "ZA", "h_index": 25, "works_count": 120, "cited_by_count": 3200, "i10_index": 20, "2yr_mean_citedness": 7.5, "years_active": 18, "publication_count": 110, "journal_impact_proxy": 4.5, "co_author_count": 40, "grant_count": 8, "trials_led": 5, "enrollment_size": 300, "topics": ["Oncology", "Clinical Trials"], }, { "display_name": REAL_NAMES[2], "code_name": "Dr_PM_Oncology_001", "source": "PubMed", "last_known_institution": "Groote Schuur Hospital", "institution_type": "healthcare", "institution_country": "ZA", "h_index": 8, "works_count": 20, "cited_by_count": 150, "i10_index": None, "2yr_mean_citedness": None, "years_active": 5, "publication_count": 18, "journal_impact_proxy": 2.1, "co_author_count": None, "grant_count": None, "trials_led": 1, "enrollment_size": 50, "topics": ["Radiation Oncology"], }, { "display_name": REAL_NAMES[3], "code_name": "Dr_CT_Oncology_001", "source": "ClinicalTrials", "last_known_institution": "Tygerberg Hospital", "institution_type": "healthcare", "institution_country": "ZA", "h_index": None, "works_count": None, "cited_by_count": None, "i10_index": None, "2yr_mean_citedness": None, "years_active": None, "publication_count": None, "journal_impact_proxy": None, "co_author_count": None, "grant_count": None, "trials_led": 3, "enrollment_size": 200, "topics": [], "trial_phases": ["Phase 2"], "trial_status": "RECRUITING", }, { "display_name": REAL_NAMES[4], "code_name": "Dr_OA_Oncology_003", "source": "OpenAlex", "last_known_institution": "Unknown", "institution_type": "unknown", "institution_country": "ZA", }, ] class TestZeroPIIAnonymization: """PRD §5: real names must NEVER survive past Bronze normalization.""" def test_silver_normalize_drops_display_name(self): silver_mod = _import_silver() raw = _bronze_payload()[0] normalized = silver_mod._normalize(raw) assert "display_name" not in normalized assert "name" not in normalized for value in normalized.values(): if isinstance(value, str): for real_name in REAL_NAMES: assert real_name not in value def test_silver_output_contains_only_code_names(self): silver_mod = _import_silver() for raw in _bronze_payload(): normalized = silver_mod._normalize(raw) assert "code_name" in normalized assert normalized["code_name"] is not None assert normalized["code_name"].startswith("Dr_") def test_no_pii_leaks_in_full_pipeline(self): silver_records = _run_silver_on_mock_bronze() json_dump = json.dumps(silver_records) for real_name in REAL_NAMES: assert real_name not in json_dump, f"PII leak: '{real_name}' found in Silver output" class TestMedianImputation: """PRD §6: Silver layer applies median imputation for numeric fields.""" def test_null_numeric_fields_are_imputed(self): silver_records = _run_silver_on_mock_bronze() for record in silver_records: for field in [ "h_index", "works_count", "cited_by_count", "i10_index", "2yr_mean_citedness", "years_active", "publication_count", "journal_impact_proxy", "co_author_count", "grant_count", "trials_led", "enrollment_size", ]: value = record.get(field) assert value is not None, f"Field '{field}' should be imputed, got None" assert not (isinstance(value, float) and np.isnan(value)), ( f"Field '{field}' contains NaN after imputation" ) def test_imputed_values_are_medians(self): """Verify imputation uses median, not mean or zero-fill.""" silver_records = _run_silver_on_mock_bronze() frame = pd.DataFrame.from_records(silver_records) bronze = _bronze_payload() known_h_indices = [r["h_index"] for r in bronze if r.get("h_index") is not None] expected_median = float(np.nanmedian(known_h_indices)) records_that_had_null_h = [ r for i, r in enumerate(silver_records) if _bronze_payload()[i].get("h_index") is None ] if len(silver_records) == len(_bronze_payload()) else [] if records_that_had_null_h: for r in records_that_had_null_h: assert abs(r["h_index"] - expected_median) < 1e-6 def test_ordinal_encoding_applied(self): silver_records = _run_silver_on_mock_bronze() for record in silver_records: assert "institution_type_encoded" in record assert "institution_country_encoded" in record assert isinstance(record["institution_type_encoded"], (int, float)) class TestGoldLayerQualityFilter: """PRD §7: Gold rejects records with > 50% missing data.""" def test_sparse_record_excluded_from_gold(self): silver_records = _run_silver_on_mock_bronze() gold_records = _run_gold_on_silver(silver_records) for record in gold_records: total = len(record) missing = sum(1 for v in record.values() if v in (None, "", [], {})) assert missing / total <= 0.5, ( f"Gold record has {missing}/{total} missing fields (>{50}%)" ) def test_feature_manifest_generated(self): silver_records = _run_silver_on_mock_bronze() gold_records = _run_gold_on_silver(silver_records) manifest = _build_manifest(gold_records) assert "features" in manifest assert "record_count" in manifest assert manifest["record_count"] == len(gold_records) # ═══════════════════════════════════════════════════════════════════ # Helpers — run pipeline stages in isolation with temp directories # ═══════════════════════════════════════════════════════════════════ def _import_silver(): """Import the silver pipeline module from the data-sandbox.""" import importlib import sys sandbox_root = os.path.abspath( os.path.join(os.path.dirname(__file__), "..", "data-sandbox") ) pipeline_dir = os.path.join(sandbox_root, "pipeline") if sandbox_root not in sys.path: sys.path.insert(0, sandbox_root) if pipeline_dir not in sys.path: sys.path.insert(0, pipeline_dir) import silver as silver_mod importlib.reload(silver_mod) return silver_mod def _run_silver_on_mock_bronze() -> list[dict[str, Any]]: """Execute Silver normalization + imputation on in-memory Bronze data.""" silver_mod = _import_silver() records = _bronze_payload() normalized = [silver_mod._normalize(r) for r in records] deduped = silver_mod._dedupe(normalized) imputed = silver_mod._impute_and_encode(deduped) return imputed def _run_gold_on_silver(silver_records: list[dict[str, Any]]) -> list[dict[str, Any]]: def _missing_ratio(record: dict[str, Any]) -> float: total = len(record) if total == 0: return 1.0 missing = sum(1 for v in record.values() if v in (None, "", [], {})) return missing / total return [r for r in silver_records if _missing_ratio(r) <= 0.5] def _build_manifest(records: list[dict[str, Any]]) -> dict[str, Any]: if not records: return {"features": [], "record_count": 0} sample = records[0] features = [] for key, value in sample.items(): numeric_values = [r[key] for r in records if isinstance(r.get(key), (int, float))] features.append({ "name": key, "type": type(value).__name__, "min": min(numeric_values) if numeric_values else None, "max": max(numeric_values) if numeric_values else None, }) return {"features": features, "record_count": len(records)}