"""Evidence-backed multi-mutation biomarker analysis.""" from __future__ import annotations import json import re from dataclasses import dataclass from itertools import combinations from pathlib import Path from typing import Any RULESET_PATH = Path(__file__).resolve().parent / "data" / "multi_mutation_signatures.v1.json" def _clamp(value: float, lower: float = 0.0, upper: float = 1.0) -> float: return max(lower, min(value, upper)) def _text(value: Any) -> str: return str(value or "").strip() def _canonical_variant_id(row: dict[str, Any]) -> str: explicit = _text( row.get("variant_id") or row.get("hgvsp") or row.get("hgvsc") or row.get("variant_key") or row.get("id") ) if explicit: return explicit chrom = _text(row.get("chrom") or row.get("chr")) pos = _text(row.get("pos") or row.get("position")) ref = _text(row.get("ref") or row.get("reference")) alt = _text(row.get("alt") or row.get("alternate")) return ":".join(part for part in (chrom, pos, f"{ref}>{alt}" if ref or alt else "") if part) def _variant_key(row: dict[str, Any]) -> tuple[str, str, str]: return ( _text(row.get("sample_id")).upper(), _text(row.get("gene")).upper(), re.sub(r"[^A-Z0-9]", "", _canonical_variant_id(row).upper()), ) def _score(row: dict[str, Any]) -> float: try: return _clamp(float(row.get("pathogenicity_score") or 0.0)) except (TypeError, ValueError): return 0.0 def _is_qualifying(row: dict[str, Any]) -> bool: label = _text( row.get("clinical_significance") or row.get("clinvar_significance") or row.get("pathogenicity") ).lower() if label in {"pathogenic", "likely pathogenic"}: return True return _score(row) >= 0.7 def _supporting_variant(row: dict[str, Any]) -> dict[str, Any]: fields = ( "gene", "variant_id", "chrom", "pos", "ref", "alt", "consequence", "pathogenicity_score", "pathogenicity_tier", "genotype", "zygosity", "phase_set", "allele_fraction", "depth", "origin", "genome_build", "sample_id", ) result = {field_name: row.get(field_name) for field_name in fields} result["variant_id"] = _canonical_variant_id(row) result["gene"] = _text(row.get("gene")).upper() return result @dataclass class MultiMutationBiomarker: signature_id: str interpretation_mode: str relationship_type: str participating_variants: list[dict[str, Any]] disease_context: list[str] evidence_level: str confidence: float source_references: list[str] limitations: list[str] interpretation: str effect_direction: str = "" sample_id: str | None = None phase_status: str = "not_applicable" ruleset_version: str = "" def to_dict(self) -> dict[str, Any]: return { "signature_id": self.signature_id, "interpretation_mode": self.interpretation_mode, "relationship_type": self.relationship_type, "participating_variants": self.participating_variants, "disease_context": self.disease_context, "evidence_level": self.evidence_level, "confidence": round(self.confidence, 4), "source_references": self.source_references, "limitations": self.limitations, "interpretation": self.interpretation, "effect_direction": self.effect_direction, "sample_id": self.sample_id, "phase_status": self.phase_status, "ruleset_version": self.ruleset_version, } class MultiMutationAnalyzer: """Apply versioned local rules without inferring unavailable phase or clonality.""" def __init__(self, ruleset_path: Path | None = None): self.ruleset_path = ruleset_path or RULESET_PATH payload = json.loads(self.ruleset_path.read_text(encoding="utf-8")) self._validate_ruleset(payload) self.ruleset_version = str(payload["ruleset_version"]) self.reviewed_at = str(payload["reviewed_at"]) self.signatures = list(payload["signatures"]) @staticmethod def _validate_ruleset(payload: dict[str, Any]) -> None: required_root = {"schema_version", "ruleset_version", "reviewed_at", "signatures"} if not required_root.issubset(payload): raise ValueError("Multi-mutation ruleset is missing required metadata") seen: set[str] = set() required_rule = { "signature_id", "mode", "relationship_type", "genes", "disease_context", "evidence_level", "required_predicates", "effect_direction", "source_references", } for rule in payload["signatures"]: if not required_rule.issubset(rule): raise ValueError("Multi-mutation signature is missing required fields") if rule["mode"] not in {"somatic", "germline"}: raise ValueError(f"Invalid signature mode: {rule['mode']}") signature_id = str(rule["signature_id"]) if signature_id in seen: raise ValueError(f"Duplicate signature ID: {signature_id}") seen.add(signature_id) def analyze( self, variants: list[dict[str, Any]], *, interpretation_mode: str | None, sample_id: str | None = None, ) -> dict[str, Any]: if interpretation_mode not in {"somatic", "germline"}: return { "interpretation_mode": None, "ruleset_version": self.ruleset_version, "ruleset_reviewed_at": self.reviewed_at, "detected_biomarkers": [], "total_biomarkers": 0, "status": "disabled", "limitations": ["Select somatic or germline mode for composite interpretation."], } normalized = self._select_and_dedupe(variants, sample_id=sample_id) normalized = [ row for row in normalized if _text(row.get("origin")).lower() in {"", "unknown", interpretation_mode} ] findings = ( self._analyze_germline(normalized) if interpretation_mode == "germline" else self._analyze_somatic(normalized) ) return { "interpretation_mode": interpretation_mode, "ruleset_version": self.ruleset_version, "ruleset_reviewed_at": self.reviewed_at, "detected_biomarkers": [finding.to_dict() for finding in findings], "total_biomarkers": len(findings), "status": "completed", "limitations": [ "Research use only; composite findings require expert review.", "Allele fraction and depth do not establish clonality.", ], } def _select_and_dedupe( self, variants: list[dict[str, Any]], *, sample_id: str | None ) -> list[dict[str, Any]]: observed_samples = { _text(row.get("sample_id")) for row in variants if _text(row.get("sample_id")) } if sample_id: if observed_samples and sample_id not in observed_samples: raise ValueError(f"Sample '{sample_id}' is not present in variant evidence") variants = [ row for row in variants if not _text(row.get("sample_id")) or _text(row.get("sample_id")) == sample_id ] elif len(observed_samples) > 1: raise ValueError("Multiple samples are present; select exactly one sample") unique: dict[tuple[str, str, str], dict[str, Any]] = {} for raw_row in variants: row = dict(raw_row) row["gene"] = _text(row.get("gene")).upper() row["variant_id"] = _canonical_variant_id(row) if not row["gene"] or not row["variant_id"]: continue key = _variant_key(row) if key not in unique or _score(row) > _score(unique[key]): unique[key] = row return list(unique.values()) def _analyze_germline(self, variants: list[dict[str, Any]]) -> list[MultiMutationBiomarker]: findings: list[MultiMutationBiomarker] = [] for rule in self.signatures: if rule["mode"] != "germline": continue gene = str(rule["genes"][0]).upper() candidates = [row for row in variants if row["gene"] == gene and _is_qualifying(row)] homozygous = [row for row in candidates if self._is_homozygous(row)] for row in homozygous: findings.append( self._build_finding( rule, [row], relationship_type="homozygous", phase_status="homozygous", interpretation=( f"A homozygous qualifying {gene} variant supports a biallelic research hypothesis." ), limitations=["Clinical significance and phenotype compatibility require expert review."], ) ) heterozygous = [row for row in candidates if not self._is_homozygous(row)] for first, second in combinations(heterozygous, 2): phase_status = self._phase_relationship(first, second) if phase_status == "cis": continue confirmed = phase_status == "in_trans" limitations = ["Clinical significance and phenotype compatibility require expert review."] if not confirmed: limitations.insert(0, "Phase is unknown; in-trans status was not inferred.") findings.append( self._build_finding( rule, [first, second], relationship_type=( "confirmed_compound_heterozygous" if confirmed else "possible_compound_heterozygous" ), phase_status=phase_status, interpretation=( f"Two qualifying {gene} variants are confirmed in trans." if confirmed else f"Two qualifying {gene} variants provide possible compound evidence; phase is unresolved." ), limitations=limitations, ) ) return self._dedupe_findings(findings) def _analyze_somatic(self, variants: list[dict[str, Any]]) -> list[MultiMutationBiomarker]: findings: list[MultiMutationBiomarker] = [] qualifying = [row for row in variants if _is_qualifying(row)] by_gene: dict[str, list[dict[str, Any]]] = {} for row in qualifying: by_gene.setdefault(row["gene"], []).append(row) for gene, gene_variants in by_gene.items(): if len(gene_variants) < 2: continue generic_rule = { "signature_id": f"SOMATIC_{gene}_MULTIPLE_HITS", "mode": "somatic", "relationship_type": "same_gene_multiple_hits", "genes": [gene], "disease_context": [], "evidence_level": "Research", "effect_direction": "multi_hit_context", "source_references": ["Derived from sample-local structured variant evidence"], } findings.append( self._build_finding( generic_rule, gene_variants, relationship_type="same_gene_multiple_hits", phase_status="not_applicable", interpretation=f"Multiple qualifying somatic variants were observed in {gene} within one sample.", limitations=[ "Multiple hits do not establish biallelic inactivation or clonality.", "Copy-number and tumor-purity evidence were not evaluated.", ], ) ) if len(by_gene) >= 2: cross_gene_variants = [ max(gene_variants, key=_score) for gene_variants in by_gene.values() ] generic_cross_gene_rule = { "signature_id": "SOMATIC_CROSS_GENE_CO_MUTATION", "mode": "somatic", "relationship_type": "cross_gene_co_mutation", "genes": sorted(by_gene), "disease_context": [], "evidence_level": "Research", "effect_direction": "co_mutation_context", "source_references": ["Derived from sample-local structured variant evidence"], } findings.append( self._build_finding( generic_cross_gene_rule, cross_gene_variants, relationship_type="cross_gene_co_mutation", phase_status="not_applicable", interpretation=( "Qualifying somatic variants were observed across multiple genes in one sample." ), limitations=[ "This generic co-mutation finding is not a curated disease-specific signature.", "Allele fraction and depth do not prove that variants occur in the same clone.", ], ) ) for rule in self.signatures: if rule["mode"] != "somatic": continue matched = self._match_somatic_rule(rule, qualifying) if not matched: continue findings.append( self._build_finding( rule, matched, relationship_type=str(rule["relationship_type"]), phase_status="not_applicable", interpretation=( f"The sample matches curated research signature {rule['signature_id']}." ), limitations=[ "This association is contextual and does not establish treatment eligibility.", "Allele fraction and depth do not prove that variants occur in the same clone.", ], ) ) return self._dedupe_findings(findings) def _match_somatic_rule( self, rule: dict[str, Any], variants: list[dict[str, Any]] ) -> list[dict[str, Any]]: genes = [str(gene).upper() for gene in rule["genes"]] matches = [row for row in variants if row["gene"] in genes] if not all(any(row["gene"] == gene for row in matches) for gene in genes): return [] requirements = rule.get("variant_requirements", {}) selected: list[dict[str, Any]] = [] for gene, aliases in requirements.items(): gene_rows = [row for row in matches if row["gene"] == str(gene).upper()] normalized_aliases = {re.sub(r"[^A-Z0-9]", "", str(alias).upper()) for alias in aliases} required_rows = [ row for row in gene_rows if any( alias in re.sub(r"[^A-Z0-9]", "", _canonical_variant_id(row).upper()) for alias in normalized_aliases ) ] if not required_rows: return [] selected.extend(required_rows) required_genes = {str(gene).upper() for gene in requirements} selected.extend(row for row in matches if row["gene"] not in required_genes) return selected or matches def _build_finding( self, rule: dict[str, Any], variants: list[dict[str, Any]], *, relationship_type: str, phase_status: str, interpretation: str, limitations: list[str], ) -> MultiMutationBiomarker: confidence = self._confidence(variants, phase_status=phase_status) return MultiMutationBiomarker( signature_id=str(rule["signature_id"]), interpretation_mode=str(rule["mode"]), relationship_type=relationship_type, participating_variants=[_supporting_variant(row) for row in variants], disease_context=list(rule.get("disease_context", [])), evidence_level=str(rule.get("evidence_level", "Research")), confidence=confidence, source_references=list(rule.get("source_references", [])), limitations=limitations, interpretation=interpretation, effect_direction=str(rule.get("effect_direction", "")), sample_id=next((_text(row.get("sample_id")) for row in variants if _text(row.get("sample_id"))), None), phase_status=phase_status, ruleset_version=self.ruleset_version, ) @staticmethod def _is_homozygous(row: dict[str, Any]) -> bool: zygosity = _text(row.get("zygosity")).lower() genotype = _text(row.get("genotype")) return zygosity == "homozygous" or genotype in {"1/1", "1|1"} @staticmethod def _phase_relationship(first: dict[str, Any], second: dict[str, Any]) -> str: first_gt = _text(first.get("genotype")) second_gt = _text(second.get("genotype")) first_ps = _text(first.get("phase_set")) second_ps = _text(second.get("phase_set")) if not first_ps or first_ps != second_ps or "|" not in first_gt or "|" not in second_gt: return "unknown" if {first_gt, second_gt} == {"0|1", "1|0"}: return "in_trans" if first_gt == second_gt and first_gt in {"0|1", "1|0"}: return "cis" return "unknown" @staticmethod def _confidence(variants: list[dict[str, Any]], *, phase_status: str) -> float: scores = [_score(row) for row in variants] base = sum(scores) / len(scores) if scores else 0.45 depth_values = [float(row["depth"]) for row in variants if row.get("depth") not in (None, "")] vaf_values = [ float(row["allele_fraction"]) for row in variants if row.get("allele_fraction") not in (None, "") ] if depth_values and min(depth_values) >= 20: base += 0.04 if vaf_values and all(0.05 <= value <= 1.0 for value in vaf_values): base += 0.03 if phase_status == "in_trans": base += 0.08 elif phase_status == "unknown": base -= 0.12 return round(_clamp(base, 0.1, 0.98), 4) @staticmethod def _dedupe_findings(findings: list[MultiMutationBiomarker]) -> list[MultiMutationBiomarker]: unique: dict[tuple[str, tuple[str, ...]], MultiMutationBiomarker] = {} for finding in findings: variants = tuple( sorted( f"{row.get('gene')}:{row.get('variant_id')}" for row in finding.participating_variants ) ) unique[(finding.signature_id, variants)] = finding return sorted( unique.values(), key=lambda item: (item.confidence, item.signature_id), reverse=True )