Spaces:
Running
Running
GitHub Actions Deployer
Automated Worker deployment from GitHub commit e01e0e57b4098452f16a9b5baf85b3b230865b5f
185ef9e | """Evidence-backed multi-mutation biomarker analysis.""" | |
| from __future__ import annotations | |
| import json | |
| import re | |
| from dataclasses import dataclass | |
| from itertools import combinations | |
| from pathlib import Path | |
| from typing import Any | |
# Default ruleset file: a JSON document located in the "data" directory next to
# this module. MultiMutationAnalyzer falls back to it when no path is supplied.
RULESET_PATH = Path(__file__).resolve().parent / "data" / "multi_mutation_signatures.v1.json"
| def _clamp(value: float, lower: float = 0.0, upper: float = 1.0) -> float: | |
| return max(lower, min(value, upper)) | |
| def _text(value: Any) -> str: | |
| return str(value or "").strip() | |
def _first_text(row: dict[str, Any], *keys: str) -> str:
    """Return the first non-blank normalised value found under *keys*, else ``""``."""
    for key in keys:
        candidate = _text(row.get(key))
        if candidate:
            return candidate
    return ""


def _canonical_variant_id(row: dict[str, Any]) -> str:
    """Return the identifier used to refer to a variant row.

    An explicit identifier wins, looked up in this order: ``variant_id``,
    ``hgvsp``, ``hgvsc``, ``variant_key``, ``id``. When none is usable the
    identifier is assembled from coordinates as ``chrom:pos:ref>alt``,
    accepting the aliases ``chr``, ``position``, ``reference`` and
    ``alternate``; missing parts are omitted.

    Args:
        row: Variant record as a plain mapping.

    Returns:
        The identifier, or ``""`` when the row carries no usable field.
    """
    # Each alias is normalised before it is tested, so a whitespace-only value
    # (truthy, but blank once stripped) no longer hides a later usable alias.
    explicit = _first_text(row, "variant_id", "hgvsp", "hgvsc", "variant_key", "id")
    if explicit:
        return explicit

    chrom = _first_text(row, "chrom", "chr")
    pos = _first_text(row, "pos", "position")
    ref = _first_text(row, "ref", "reference")
    alt = _first_text(row, "alt", "alternate")
    # The allele change is emitted as soon as either allele is known.
    change = f"{ref}>{alt}" if ref or alt else ""
    return ":".join(part for part in (chrom, pos, change) if part)
def _variant_key(row: dict[str, Any]) -> tuple[str, str, str]:
    """Build the case-insensitive ``(sample, gene, variant)`` de-duplication key.

    The variant component is the canonical variant identifier upper-cased
    with every character other than ``A-Z`` and ``0-9`` removed.
    """
    sample = _text(row.get("sample_id")).upper()
    gene = _text(row.get("gene")).upper()
    compact_id = re.sub(r"[^A-Z0-9]", "", _canonical_variant_id(row).upper())
    return sample, gene, compact_id
def _score(row: dict[str, Any]) -> float:
    """Return the row's ``pathogenicity_score`` clamped to ``[0, 1]``.

    A missing, falsy or non-numeric score is reported as ``0.0``.
    """
    try:
        raw = row.get("pathogenicity_score")
        numeric = float(raw) if raw else 0.0
        return _clamp(numeric)
    except (TypeError, ValueError):
        return 0.0
def _is_qualifying(row: dict[str, Any]) -> bool:
    """Decide whether a variant row may take part in a composite finding.

    A row qualifies when its significance label (first non-empty of
    ``clinical_significance``, ``clinvar_significance``, ``pathogenicity``) is
    "pathogenic" or "likely pathogenic", or failing that when its clamped
    pathogenicity score is at least ``0.7``.

    Label matching is case-insensitive and tolerant of the spellings used in
    ClinVar exports: underscores count as spaces (``Likely_pathogenic``) and a
    ``/``-combined label (``Pathogenic/Likely_pathogenic``) qualifies only if
    every part does. Labels such as
    ``Conflicting_interpretations_of_pathogenicity`` still do not match.

    Args:
        row: Variant record as a plain mapping.

    Returns:
        ``True`` when the row qualifies by label or by score.
    """
    qualifying_labels = {"pathogenic", "likely pathogenic"}
    label = _text(
        row.get("clinical_significance")
        or row.get("clinvar_significance")
        or row.get("pathogenicity")
    ).lower()
    if label:
        # Normalise "likely_pathogenic" / "likely  pathogenic" to one spelling.
        parts = [" ".join(part.replace("_", " ").split()) for part in label.split("/")]
        if all(part in qualifying_labels for part in parts):
            return True
    return _score(row) >= 0.7
def _supporting_variant(row: dict[str, Any]) -> dict[str, Any]:
    """Project a variant row onto the fixed field set reported with a finding.

    Fields absent from *row* are reported as ``None``. ``variant_id`` is
    replaced by the canonical identifier and ``gene`` by its stripped,
    upper-cased form.
    """
    exported_keys = (
        "gene",
        "variant_id",
        "chrom",
        "pos",
        "ref",
        "alt",
        "consequence",
        "pathogenicity_score",
        "pathogenicity_tier",
        "genotype",
        "zygosity",
        "phase_set",
        "allele_fraction",
        "depth",
        "origin",
        "genome_build",
        "sample_id",
    )
    summary: dict[str, Any] = {}
    for key in exported_keys:
        summary[key] = row.get(key)
    # Overwrite the two normalised fields; their position in the dict is kept.
    summary.update(
        variant_id=_canonical_variant_id(row),
        gene=_text(row.get("gene")).upper(),
    )
    return summary
@dataclass
class MultiMutationBiomarker:
    """One composite (multi-variant) finding produced by the analyzer.

    The class is a dataclass so that it can be built with keyword arguments;
    without the decorator the annotated names below are only annotations and
    keyword construction raises ``TypeError``.
    """

    # Identifier of the rule (curated or generic) that produced the finding.
    signature_id: str
    # Mode recorded on the rule that produced the finding.
    interpretation_mode: str
    # Relationship between the participating variants, e.g. "homozygous".
    relationship_type: str
    # Reduced copies of the variant rows that support the finding.
    participating_variants: list[dict[str, Any]]
    disease_context: list[str]
    evidence_level: str
    # Heuristic confidence; rounded to four decimals on serialisation.
    confidence: float
    source_references: list[str]
    limitations: list[str]
    # Human-readable summary sentence.
    interpretation: str
    effect_direction: str = ""
    sample_id: str | None = None
    phase_status: str = "not_applicable"
    ruleset_version: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Return the finding as a plain dict with a fixed key order.

        List-valued fields are returned by reference, not copied.
        """
        return {
            "signature_id": self.signature_id,
            "interpretation_mode": self.interpretation_mode,
            "relationship_type": self.relationship_type,
            "participating_variants": self.participating_variants,
            "disease_context": self.disease_context,
            "evidence_level": self.evidence_level,
            "confidence": round(self.confidence, 4),
            "source_references": self.source_references,
            "limitations": self.limitations,
            "interpretation": self.interpretation,
            "effect_direction": self.effect_direction,
            "sample_id": self.sample_id,
            "phase_status": self.phase_status,
            "ruleset_version": self.ruleset_version,
        }
class MultiMutationAnalyzer:
    """Apply versioned local rules without inferring unavailable phase or clonality.

    A JSON ruleset is loaded and validated once at construction. ``analyze``
    then evaluates a list of variant rows (plain dicts) in ``"somatic"`` or
    ``"germline"`` mode and returns a JSON-serialisable result dict.
    """

    def __init__(self, ruleset_path: Path | str | None = None):
        """Load and validate the ruleset.

        Args:
            ruleset_path: Location of the ruleset JSON file. A ``str`` is
                accepted and converted to ``Path``; a falsy value selects the
                module default ``RULESET_PATH``.

        Raises:
            ValueError: If the ruleset fails validation.
        """
        self.ruleset_path = Path(ruleset_path) if ruleset_path else RULESET_PATH
        payload = json.loads(self.ruleset_path.read_text(encoding="utf-8"))
        self._validate_ruleset(payload)
        self.ruleset_version = str(payload["ruleset_version"])
        self.reviewed_at = str(payload["reviewed_at"])
        self.signatures = list(payload["signatures"])

    @staticmethod
    def _validate_ruleset(payload: dict[str, Any]) -> None:
        """Check the structure of a decoded ruleset document.

        Raises:
            ValueError: On missing root metadata, a non-list ``signatures``
                value, a signature that is not a mapping or lacks required
                fields, an invalid ``mode``, a missing/empty ``genes`` list,
                or a duplicate ``signature_id``.
        """
        required_root = {"schema_version", "ruleset_version", "reviewed_at", "signatures"}
        if not isinstance(payload, dict) or not required_root.issubset(payload):
            raise ValueError("Multi-mutation ruleset is missing required metadata")
        signatures = payload["signatures"]
        if not isinstance(signatures, list):
            raise ValueError("Multi-mutation ruleset 'signatures' must be a list")
        seen: set[str] = set()
        required_rule = {
            "signature_id",
            "mode",
            "relationship_type",
            "genes",
            "disease_context",
            "evidence_level",
            "required_predicates",
            "effect_direction",
            "source_references",
        }
        for rule in signatures:
            if not isinstance(rule, dict) or not required_rule.issubset(rule):
                raise ValueError("Multi-mutation signature is missing required fields")
            # Tuple membership compares by equality, so an unhashable value is
            # reported as an invalid mode instead of raising TypeError.
            if rule["mode"] not in ("somatic", "germline"):
                raise ValueError(f"Invalid signature mode: {rule['mode']}")
            signature_id = str(rule["signature_id"])
            genes = rule["genes"]
            # Germline analysis indexes genes[0] and somatic matching iterates
            # the list, so an empty list or a bare string is rejected up front.
            if not isinstance(genes, (list, tuple)) or not genes:
                raise ValueError(f"Signature {signature_id} must list at least one gene")
            if signature_id in seen:
                raise ValueError(f"Duplicate signature ID: {signature_id}")
            seen.add(signature_id)

    def analyze(
        self,
        variants: list[dict[str, Any]],
        *,
        interpretation_mode: str | None,
        sample_id: str | None = None,
    ) -> dict[str, Any]:
        """Run composite interpretation over *variants* for one sample.

        Args:
            variants: Variant rows as plain dicts.
            interpretation_mode: ``"somatic"`` or ``"germline"``. Any other
                value returns a ``"disabled"`` result with no findings.
            sample_id: Optional sample to restrict the analysis to.

        Returns:
            A dict with the mode, ruleset metadata, the serialised findings
            (``detected_biomarkers``), their count, a ``status`` and general
            ``limitations``.

        Raises:
            ValueError: If *sample_id* is not among the observed samples, or
                if several samples are present and none was selected.
        """
        if interpretation_mode not in {"somatic", "germline"}:
            return {
                "interpretation_mode": None,
                "ruleset_version": self.ruleset_version,
                "ruleset_reviewed_at": self.reviewed_at,
                "detected_biomarkers": [],
                "total_biomarkers": 0,
                "status": "disabled",
                "limitations": ["Select somatic or germline mode for composite interpretation."],
            }
        normalized = self._select_and_dedupe(variants, sample_id=sample_id)
        # Keep rows whose origin is unstated/unknown or agrees with the mode.
        normalized = [
            row
            for row in normalized
            if _text(row.get("origin")).lower() in {"", "unknown", interpretation_mode}
        ]
        findings = (
            self._analyze_germline(normalized)
            if interpretation_mode == "germline"
            else self._analyze_somatic(normalized)
        )
        return {
            "interpretation_mode": interpretation_mode,
            "ruleset_version": self.ruleset_version,
            "ruleset_reviewed_at": self.reviewed_at,
            "detected_biomarkers": [finding.to_dict() for finding in findings],
            "total_biomarkers": len(findings),
            "status": "completed",
            "limitations": [
                "Research use only; composite findings require expert review.",
                "Allele fraction and depth do not establish clonality.",
            ],
        }

    def _select_and_dedupe(
        self, variants: list[dict[str, Any]], *, sample_id: str | None
    ) -> list[dict[str, Any]]:
        """Restrict rows to one sample and collapse duplicate variants.

        Rows without a ``sample_id`` are always kept. Rows lacking a gene or a
        derivable variant identifier are dropped. Among rows sharing the same
        ``_variant_key`` the one with the highest score is kept (the first one
        on ties). Returned rows are copies with ``gene`` and ``variant_id``
        normalised.

        Raises:
            ValueError: If *sample_id* is not among the observed samples, or
                if several samples are present and none was selected.
        """
        # Materialise once: the rows are scanned twice below, which would
        # silently yield nothing on the second pass for a one-shot iterable.
        variants = list(variants or [])
        # Observed IDs are stripped, so the requested ID must be normalised
        # the same way before the two are compared.
        sample_id = _text(sample_id)
        observed_samples = {
            _text(row.get("sample_id")) for row in variants if _text(row.get("sample_id"))
        }
        if sample_id:
            if observed_samples and sample_id not in observed_samples:
                raise ValueError(f"Sample '{sample_id}' is not present in variant evidence")
            variants = [
                row
                for row in variants
                if not _text(row.get("sample_id")) or _text(row.get("sample_id")) == sample_id
            ]
        elif len(observed_samples) > 1:
            raise ValueError("Multiple samples are present; select exactly one sample")

        unique: dict[tuple[str, str, str], dict[str, Any]] = {}
        for raw_row in variants:
            row = dict(raw_row)  # never mutate the caller's rows
            row["gene"] = _text(row.get("gene")).upper()
            row["variant_id"] = _canonical_variant_id(row)
            if not row["gene"] or not row["variant_id"]:
                continue
            key = _variant_key(row)
            if key not in unique or _score(row) > _score(unique[key]):
                unique[key] = row
        return list(unique.values())

    def _analyze_germline(self, variants: list[dict[str, Any]]) -> list[MultiMutationBiomarker]:
        """Evaluate germline rules against qualifying variants.

        Only the first gene of each germline rule is evaluated. Every
        homozygous qualifying variant yields a ``homozygous`` finding. Every
        pair of non-homozygous qualifying variants yields a compound finding
        unless the pair is phased in cis; the finding is "confirmed" only when
        the pair is phased in trans.
        """
        review_note = "Clinical significance and phenotype compatibility require expert review."
        findings: list[MultiMutationBiomarker] = []
        for rule in self.signatures:
            if rule["mode"] != "germline":
                continue
            gene = str(rule["genes"][0]).upper()
            candidates = [row for row in variants if row["gene"] == gene and _is_qualifying(row)]
            homozygous = [row for row in candidates if self._is_homozygous(row)]
            for row in homozygous:
                findings.append(
                    self._build_finding(
                        rule,
                        [row],
                        relationship_type="homozygous",
                        phase_status="homozygous",
                        interpretation=(
                            f"A homozygous qualifying {gene} variant supports a biallelic research hypothesis."
                        ),
                        limitations=[review_note],
                    )
                )
            # NOTE(review): every candidate that is not homozygous is treated
            # as heterozygous here, including rows with no genotype at all.
            heterozygous = [row for row in candidates if not self._is_homozygous(row)]
            for first, second in combinations(heterozygous, 2):
                phase_status = self._phase_relationship(first, second)
                if phase_status == "cis":
                    # Two variants on the same haplotype are not compound evidence.
                    continue
                confirmed = phase_status == "in_trans"
                limitations = [review_note]
                if not confirmed:
                    limitations.insert(0, "Phase is unknown; in-trans status was not inferred.")
                findings.append(
                    self._build_finding(
                        rule,
                        [first, second],
                        relationship_type=(
                            "confirmed_compound_heterozygous"
                            if confirmed
                            else "possible_compound_heterozygous"
                        ),
                        phase_status=phase_status,
                        interpretation=(
                            f"Two qualifying {gene} variants are confirmed in trans."
                            if confirmed
                            else f"Two qualifying {gene} variants provide possible compound evidence; phase is unresolved."
                        ),
                        limitations=limitations,
                    )
                )
        return self._dedupe_findings(findings)

    def _analyze_somatic(self, variants: list[dict[str, Any]]) -> list[MultiMutationBiomarker]:
        """Evaluate generic and curated somatic patterns.

        Three kinds of finding are produced from the qualifying variants:
        a generic "multiple hits" finding per gene with two or more variants,
        one generic cross-gene finding when two or more genes are hit (using
        the highest-scoring variant of each gene), and one finding per curated
        somatic rule that matches.
        """
        findings: list[MultiMutationBiomarker] = []
        qualifying = [row for row in variants if _is_qualifying(row)]
        by_gene: dict[str, list[dict[str, Any]]] = {}
        for row in qualifying:
            by_gene.setdefault(row["gene"], []).append(row)

        for gene, gene_variants in by_gene.items():
            if len(gene_variants) < 2:
                continue
            generic_rule = {
                "signature_id": f"SOMATIC_{gene}_MULTIPLE_HITS",
                "mode": "somatic",
                "relationship_type": "same_gene_multiple_hits",
                "genes": [gene],
                "disease_context": [],
                "evidence_level": "Research",
                "effect_direction": "multi_hit_context",
                "source_references": ["Derived from sample-local structured variant evidence"],
            }
            findings.append(
                self._build_finding(
                    generic_rule,
                    gene_variants,
                    relationship_type="same_gene_multiple_hits",
                    phase_status="not_applicable",
                    interpretation=f"Multiple qualifying somatic variants were observed in {gene} within one sample.",
                    limitations=[
                        "Multiple hits do not establish biallelic inactivation or clonality.",
                        "Copy-number and tumor-purity evidence were not evaluated.",
                    ],
                )
            )

        if len(by_gene) >= 2:
            # One representative (highest-scoring) variant per gene.
            cross_gene_variants = [
                max(gene_variants, key=_score) for gene_variants in by_gene.values()
            ]
            generic_cross_gene_rule = {
                "signature_id": "SOMATIC_CROSS_GENE_CO_MUTATION",
                "mode": "somatic",
                "relationship_type": "cross_gene_co_mutation",
                "genes": sorted(by_gene),
                "disease_context": [],
                "evidence_level": "Research",
                "effect_direction": "co_mutation_context",
                "source_references": ["Derived from sample-local structured variant evidence"],
            }
            findings.append(
                self._build_finding(
                    generic_cross_gene_rule,
                    cross_gene_variants,
                    relationship_type="cross_gene_co_mutation",
                    phase_status="not_applicable",
                    interpretation=(
                        "Qualifying somatic variants were observed across multiple genes in one sample."
                    ),
                    limitations=[
                        "This generic co-mutation finding is not a curated disease-specific signature.",
                        "Allele fraction and depth do not prove that variants occur in the same clone.",
                    ],
                )
            )

        for rule in self.signatures:
            if rule["mode"] != "somatic":
                continue
            matched = self._match_somatic_rule(rule, qualifying)
            if not matched:
                continue
            findings.append(
                self._build_finding(
                    rule,
                    matched,
                    relationship_type=str(rule["relationship_type"]),
                    phase_status="not_applicable",
                    interpretation=(
                        f"The sample matches curated research signature {rule['signature_id']}."
                    ),
                    limitations=[
                        "This association is contextual and does not establish treatment eligibility.",
                        "Allele fraction and depth do not prove that variants occur in the same clone.",
                    ],
                )
            )
        return self._dedupe_findings(findings)

    def _match_somatic_rule(
        self, rule: dict[str, Any], variants: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Return the rows that satisfy a curated somatic rule, or ``[]``.

        Every gene listed by the rule must have at least one row. If the rule
        has ``variant_requirements`` (gene -> aliases), each such gene must
        have a row whose normalised variant identifier contains one of the
        normalised aliases; only those rows are kept for that gene, while
        rows of genes without requirements are all kept.
        """
        genes = [str(gene).upper() for gene in rule["genes"]]
        matches = [row for row in variants if row["gene"] in genes]
        if not all(any(row["gene"] == gene for row in matches) for gene in genes):
            return []
        # ``or {}`` also covers an explicit JSON null for the optional field.
        requirements = rule.get("variant_requirements") or {}
        selected: list[dict[str, Any]] = []
        for gene, aliases in requirements.items():
            gene_rows = [row for row in matches if row["gene"] == str(gene).upper()]
            normalized_aliases = {re.sub(r"[^A-Z0-9]", "", str(alias).upper()) for alias in aliases}
            required_rows = []
            for row in gene_rows:
                # Normalise the identifier once per row, not once per alias.
                compact_id = re.sub(r"[^A-Z0-9]", "", _canonical_variant_id(row).upper())
                if any(alias in compact_id for alias in normalized_aliases):
                    required_rows.append(row)
            if not required_rows:
                return []
            selected.extend(required_rows)
        required_genes = {str(gene).upper() for gene in requirements}
        selected.extend(row for row in matches if row["gene"] not in required_genes)
        return selected or matches

    def _build_finding(
        self,
        rule: dict[str, Any],
        variants: list[dict[str, Any]],
        *,
        relationship_type: str,
        phase_status: str,
        interpretation: str,
        limitations: list[str],
    ) -> MultiMutationBiomarker:
        """Assemble a finding from a rule and its supporting variant rows.

        The finding's ``sample_id`` is the first non-empty ``sample_id`` among
        *variants*, or ``None`` when no row carries one.
        """
        confidence = self._confidence(variants, phase_status=phase_status)
        return MultiMutationBiomarker(
            signature_id=str(rule["signature_id"]),
            interpretation_mode=str(rule["mode"]),
            relationship_type=relationship_type,
            participating_variants=[_supporting_variant(row) for row in variants],
            disease_context=list(rule.get("disease_context", [])),
            evidence_level=str(rule.get("evidence_level", "Research")),
            confidence=confidence,
            source_references=list(rule.get("source_references", [])),
            limitations=limitations,
            interpretation=interpretation,
            effect_direction=str(rule.get("effect_direction", "")),
            sample_id=next(
                (_text(row.get("sample_id")) for row in variants if _text(row.get("sample_id"))),
                None,
            ),
            phase_status=phase_status,
            ruleset_version=self.ruleset_version,
        )

    @staticmethod
    def _is_homozygous(row: dict[str, Any]) -> bool:
        """Return True if ``zygosity`` is "homozygous" or ``genotype`` is ``1/1`` / ``1|1``."""
        zygosity = _text(row.get("zygosity")).lower()
        genotype = _text(row.get("genotype"))
        return zygosity == "homozygous" or genotype in {"1/1", "1|1"}

    @staticmethod
    def _phase_relationship(first: dict[str, Any], second: dict[str, Any]) -> str:
        """Classify two rows as ``"in_trans"``, ``"cis"`` or ``"unknown"``.

        Phase is only read when both rows share a non-empty ``phase_set`` and
        both genotypes are phased (contain ``|``); otherwise, and for any
        genotype pair other than ``0|1``/``1|0`` combinations, the result is
        ``"unknown"``.
        """
        first_gt = _text(first.get("genotype"))
        second_gt = _text(second.get("genotype"))
        first_ps = _text(first.get("phase_set"))
        second_ps = _text(second.get("phase_set"))
        if not first_ps or first_ps != second_ps or "|" not in first_gt or "|" not in second_gt:
            return "unknown"
        if {first_gt, second_gt} == {"0|1", "1|0"}:
            return "in_trans"
        if first_gt == second_gt and first_gt in {"0|1", "1|0"}:
            return "cis"
        return "unknown"

    @staticmethod
    def _as_float(value: Any) -> float | None:
        """Return *value* as a float, or ``None`` if missing, unparseable or NaN."""
        if value in (None, ""):
            return None
        try:
            number = float(value)
        except (TypeError, ValueError):
            return None
        # NaN is the only float unequal to itself; treat it as missing.
        return None if number != number else number

    @staticmethod
    def _confidence(variants: list[dict[str, Any]], *, phase_status: str) -> float:
        """Compute the heuristic confidence of a finding.

        The base is the mean clamped score of *variants* (``0.45`` when there
        are none), adjusted by: +0.04 if every usable depth is at least 20,
        +0.03 if every usable allele fraction lies in ``[0.05, 1.0]``, +0.08
        for ``in_trans`` phase and -0.12 for ``unknown`` phase. The result is
        clamped to ``[0.1, 0.98]`` and rounded to four decimals.

        Depth and allele-fraction values that cannot be parsed as numbers
        (for example ``"."``) are ignored rather than raising, mirroring how
        ``_score`` tolerates bad scores.
        """
        as_float = MultiMutationAnalyzer._as_float
        scores = [_score(row) for row in variants]
        base = sum(scores) / len(scores) if scores else 0.45
        depth_values = [
            depth
            for depth in (as_float(row.get("depth")) for row in variants)
            if depth is not None
        ]
        vaf_values = [
            fraction
            for fraction in (as_float(row.get("allele_fraction")) for row in variants)
            if fraction is not None
        ]
        if depth_values and min(depth_values) >= 20:
            base += 0.04
        if vaf_values and all(0.05 <= value <= 1.0 for value in vaf_values):
            base += 0.03
        if phase_status == "in_trans":
            base += 0.08
        elif phase_status == "unknown":
            base -= 0.12
        return round(_clamp(base, 0.1, 0.98), 4)

    @staticmethod
    def _dedupe_findings(findings: list[MultiMutationBiomarker]) -> list[MultiMutationBiomarker]:
        """Drop repeated findings and order the rest by confidence.

        Findings are identical when they share a ``signature_id`` and the same
        set of ``gene:variant_id`` pairs; the last occurrence wins. The result
        is sorted by ``(confidence, signature_id)`` in descending order.
        """
        unique: dict[tuple[str, tuple[str, ...]], MultiMutationBiomarker] = {}
        for finding in findings:
            variants = tuple(
                sorted(
                    f"{row.get('gene')}:{row.get('variant_id')}"
                    for row in finding.participating_variants
                )
            )
            unique[(finding.signature_id, variants)] = finding
        return sorted(
            unique.values(), key=lambda item: (item.confidence, item.signature_id), reverse=True
        )