Omnibimol-Worker / multi_mutation_analysis.py
GitHub Actions Deployer
Automated Worker deployment from GitHub commit e01e0e57b4098452f16a9b5baf85b3b230865b5f
185ef9e
Raw
History Blame Contribute Delete
19.7 kB
"""Evidence-backed multi-mutation biomarker analysis."""
from __future__ import annotations
import json
import re
from dataclasses import dataclass
from itertools import combinations
from pathlib import Path
from typing import Any
# Default ruleset file: data/multi_mutation_signatures.v1.json, resolved relative
# to the directory containing this module. Used when no path is passed to
# MultiMutationAnalyzer.
RULESET_PATH = Path(__file__).resolve().parent / "data" / "multi_mutation_signatures.v1.json"
def _clamp(value: float, lower: float = 0.0, upper: float = 1.0) -> float:
    """Limit *value* to the closed interval ``[lower, upper]``."""
    # Apply the ceiling first, then the floor, so the floor wins if the bounds cross.
    capped = min(value, upper)
    return max(lower, capped)
def _text(value: Any) -> str:
    """Return *value* as a stripped string, mapping missing values to ``""``.

    ``None`` and other empty values (``""``, ``False``, empty containers) become
    the empty string. Numeric zero is kept as text so that a legitimate ``0``
    (for example a phase set or sample ID of ``0``) is not silently treated as
    missing.
    """
    # bool is a subclass of int; False must still count as "missing".
    is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
    if not value and not is_number:
        return ""
    return str(value).strip()
def _canonical_variant_id(row: dict[str, Any]) -> str:
    """Derive a stable identifier for a variant row.

    The first non-empty value among ``variant_id``, ``hgvsp``, ``hgvsc``,
    ``variant_key`` and ``id`` is used as-is. Otherwise the identifier is
    assembled from the coordinate fields as ``chrom:pos:ref>alt``, leaving out
    any component that is empty.
    """
    candidate = None
    for key in ("variant_id", "hgvsp", "hgvsc", "variant_key", "id"):
        candidate = row.get(key)
        if candidate:
            break
    explicit = _text(candidate)
    if explicit:
        return explicit

    # Each coordinate accepts a short and a long column name.
    chrom = _text(row.get("chrom") or row.get("chr"))
    pos = _text(row.get("pos") or row.get("position"))
    ref = _text(row.get("ref") or row.get("reference"))
    alt = _text(row.get("alt") or row.get("alternate"))

    parts = [chrom, pos]
    if ref or alt:
        parts.append(f"{ref}>{alt}")
    return ":".join(filter(None, parts))
def _variant_key(row: dict[str, Any]) -> tuple[str, str, str]:
    """Build the ``(sample, gene, variant)`` key used to detect duplicate rows.

    All three parts are upper-cased; the variant part additionally has every
    character other than ``A-Z`` and ``0-9`` removed.
    """
    sample = _text(row.get("sample_id")).upper()
    gene = _text(row.get("gene")).upper()
    variant = re.sub(r"[^A-Z0-9]", "", _canonical_variant_id(row).upper())
    return sample, gene, variant
def _score(row: dict[str, Any]) -> float:
    """Return the row's ``pathogenicity_score`` clamped to ``[0, 1]``.

    A missing, empty, or non-numeric score yields ``0.0``.
    """
    try:
        numeric = float(row.get("pathogenicity_score") or 0.0)
    except (TypeError, ValueError):
        return 0.0
    else:
        return _clamp(numeric)
def _is_qualifying(row: dict[str, Any]) -> bool:
    """Decide whether a variant row is strong enough to take part in a finding.

    A row qualifies when its significance label (first non-empty of
    ``clinical_significance``, ``clinvar_significance``, ``pathogenicity``) is
    pathogenic or likely pathogenic, or when its pathogenicity score is at
    least 0.7.

    Labels are compared case-insensitively with underscores and repeated
    whitespace normalised, so ``"Likely_pathogenic"`` is recognised. A
    slash-combined label such as ``"Pathogenic/Likely_pathogenic"`` qualifies
    only when every component is itself pathogenic or likely pathogenic, so
    mixed or conflicting labels still fall through to the score threshold.
    """
    qualifying_labels = {"pathogenic", "likely pathogenic"}
    label = _text(
        row.get("clinical_significance")
        or row.get("clinvar_significance")
        or row.get("pathogenicity")
    ).lower()
    # An empty label splits into [""], which is not a qualifying term.
    terms = [" ".join(term.replace("_", " ").split()) for term in label.split("/")]
    if all(term in qualifying_labels for term in terms):
        return True
    return _score(row) >= 0.7
def _supporting_variant(row: dict[str, Any]) -> dict[str, Any]:
    """Project a variant row onto the fixed set of fields reported in findings.

    Fields absent from *row* are reported as ``None``. ``variant_id`` is always
    the canonical identifier and ``gene`` is always the upper-cased symbol.
    """
    reported_fields = (
        "gene",
        "variant_id",
        "chrom",
        "pos",
        "ref",
        "alt",
        "consequence",
        "pathogenicity_score",
        "pathogenicity_tier",
        "genotype",
        "zygosity",
        "phase_set",
        "allele_fraction",
        "depth",
        "origin",
        "genome_build",
        "sample_id",
    )
    # These two are normalised rather than copied through verbatim.
    normalised = {
        "variant_id": _canonical_variant_id(row),
        "gene": _text(row.get("gene")).upper(),
    }
    return {
        name: normalised[name] if name in normalised else row.get(name)
        for name in reported_fields
    }
@dataclass
class MultiMutationBiomarker:
    """One composite finding produced from one or more variants."""

    # Identifier of the rule (curated or generated) that produced the finding.
    signature_id: str
    # Mode taken from the producing rule's "mode" entry.
    interpretation_mode: str
    relationship_type: str
    # Summaries of the variant rows that support the finding.
    participating_variants: list[dict[str, Any]]
    disease_context: list[str]
    evidence_level: str
    confidence: float
    source_references: list[str]
    limitations: list[str]
    interpretation: str
    effect_direction: str = ""
    sample_id: str | None = None
    phase_status: str = "not_applicable"
    ruleset_version: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Return the finding as a plain dict with confidence rounded to 4 places.

        List-valued fields are returned by reference, not copied.
        """
        exported = (
            "signature_id",
            "interpretation_mode",
            "relationship_type",
            "participating_variants",
            "disease_context",
            "evidence_level",
            "confidence",
            "source_references",
            "limitations",
            "interpretation",
            "effect_direction",
            "sample_id",
            "phase_status",
            "ruleset_version",
        )
        payload: dict[str, Any] = {name: getattr(self, name) for name in exported}
        payload["confidence"] = round(self.confidence, 4)
        return payload
class MultiMutationAnalyzer:
    """Apply versioned local rules without inferring unavailable phase or clonality.

    The ruleset is read and validated once at construction. ``analyze`` then
    evaluates a list of variant dictionaries in ``"somatic"`` or ``"germline"``
    mode and returns a JSON-friendly summary of the composite findings.
    """

    def __init__(self, ruleset_path: Path | None = None):
        """Load and validate the signature ruleset.

        Args:
            ruleset_path: JSON ruleset file; the module-level ``RULESET_PATH``
                is used when omitted.

        Raises:
            OSError: If the ruleset file cannot be read.
            ValueError: If the file is not valid JSON or fails validation.
        """
        self.ruleset_path = ruleset_path or RULESET_PATH
        payload = json.loads(self.ruleset_path.read_text(encoding="utf-8"))
        self._validate_ruleset(payload)
        self.ruleset_version = str(payload["ruleset_version"])
        self.reviewed_at = str(payload["reviewed_at"])
        self.signatures = list(payload["signatures"])

    @staticmethod
    def _validate_ruleset(payload: dict[str, Any]) -> None:
        """Check ruleset structure and raise ``ValueError`` on the first problem.

        Verifies the root metadata keys, the per-signature required keys, the
        signature mode, signature ID uniqueness, and that ``genes`` is a
        list that is non-empty for germline signatures (germline analysis reads
        the first gene).
        """
        required_root = {"schema_version", "ruleset_version", "reviewed_at", "signatures"}
        if not isinstance(payload, dict) or not required_root.issubset(payload):
            raise ValueError("Multi-mutation ruleset is missing required metadata")
        seen: set[str] = set()
        required_rule = {
            "signature_id",
            "mode",
            "relationship_type",
            "genes",
            "disease_context",
            "evidence_level",
            "required_predicates",
            "effect_direction",
            "source_references",
        }
        for rule in payload["signatures"]:
            if not isinstance(rule, dict) or not required_rule.issubset(rule):
                raise ValueError("Multi-mutation signature is missing required fields")
            if rule["mode"] not in {"somatic", "germline"}:
                raise ValueError(f"Invalid signature mode: {rule['mode']}")
            signature_id = str(rule["signature_id"])
            if signature_id in seen:
                raise ValueError(f"Duplicate signature ID: {signature_id}")
            seen.add(signature_id)
            genes = rule["genes"]
            # A bare string would be iterated character by character downstream.
            if not isinstance(genes, (list, tuple)):
                raise ValueError(f"Signature {signature_id} must provide genes as a list")
            # Germline analysis indexes genes[0]; fail at load time, not mid-analysis.
            if rule["mode"] == "germline" and not genes:
                raise ValueError(f"Germline signature {signature_id} must name at least one gene")

    def analyze(
        self,
        variants: list[dict[str, Any]],
        *,
        interpretation_mode: str | None,
        sample_id: str | None = None,
    ) -> dict[str, Any]:
        """Run composite analysis over *variants* for a single sample.

        Args:
            variants: Variant rows as dictionaries.
            interpretation_mode: ``"somatic"`` or ``"germline"``. Any other
                value returns a ``"disabled"`` result without analysing.
            sample_id: Restrict analysis to this sample when given.

        Returns:
            A dict with the mode, ruleset metadata, ``detected_biomarkers``
            (list of finding dicts), ``total_biomarkers``, ``status`` and
            general ``limitations``.

        Raises:
            ValueError: If *sample_id* is not among the observed samples, or
                if several samples are present and none was selected.
        """
        if interpretation_mode not in {"somatic", "germline"}:
            return {
                "interpretation_mode": None,
                "ruleset_version": self.ruleset_version,
                "ruleset_reviewed_at": self.reviewed_at,
                "detected_biomarkers": [],
                "total_biomarkers": 0,
                "status": "disabled",
                "limitations": ["Select somatic or germline mode for composite interpretation."],
            }
        normalized = self._select_and_dedupe(variants, sample_id=sample_id)
        # Keep rows whose origin is unspecified, unknown, or matches the requested mode.
        normalized = [
            row
            for row in normalized
            if _text(row.get("origin")).lower() in {"", "unknown", interpretation_mode}
        ]
        findings = (
            self._analyze_germline(normalized)
            if interpretation_mode == "germline"
            else self._analyze_somatic(normalized)
        )
        return {
            "interpretation_mode": interpretation_mode,
            "ruleset_version": self.ruleset_version,
            "ruleset_reviewed_at": self.reviewed_at,
            "detected_biomarkers": [finding.to_dict() for finding in findings],
            "total_biomarkers": len(findings),
            "status": "completed",
            "limitations": [
                "Research use only; composite findings require expert review.",
                "Allele fraction and depth do not establish clonality.",
            ],
        }

    def _select_and_dedupe(
        self, variants: list[dict[str, Any]], *, sample_id: str | None
    ) -> list[dict[str, Any]]:
        """Restrict rows to one sample and collapse duplicate variants.

        Rows without a gene or a derivable variant ID are dropped. When
        several rows share the same ``_variant_key``, the one with the highest
        score is kept (the first seen wins ties). Input rows are copied, not
        mutated.

        Raises:
            ValueError: If *sample_id* is given but not observed, or if more
                than one sample is present and no *sample_id* was given.
        """
        observed_samples = {
            _text(row.get("sample_id")) for row in variants if _text(row.get("sample_id"))
        }
        if sample_id:
            if observed_samples and sample_id not in observed_samples:
                raise ValueError(f"Sample '{sample_id}' is not present in variant evidence")
            # Rows that carry no sample ID are kept alongside the selected sample.
            variants = [
                row
                for row in variants
                if not _text(row.get("sample_id")) or _text(row.get("sample_id")) == sample_id
            ]
        elif len(observed_samples) > 1:
            raise ValueError("Multiple samples are present; select exactly one sample")
        unique: dict[tuple[str, str, str], dict[str, Any]] = {}
        for raw_row in variants:
            row = dict(raw_row)
            row["gene"] = _text(row.get("gene")).upper()
            row["variant_id"] = _canonical_variant_id(row)
            if not row["gene"] or not row["variant_id"]:
                continue
            key = _variant_key(row)
            if key not in unique or _score(row) > _score(unique[key]):
                unique[key] = row
        return list(unique.values())

    def _analyze_germline(self, variants: list[dict[str, Any]]) -> list[MultiMutationBiomarker]:
        """Evaluate germline signatures: homozygous hits and heterozygous pairs.

        For each germline rule, every qualifying homozygous variant in the
        rule's gene yields a finding, and every pair of qualifying heterozygous
        variants yields a confirmed (in trans) or possible (phase unknown)
        compound-heterozygous finding. Pairs phased in cis are skipped.
        """
        findings: list[MultiMutationBiomarker] = []
        review_note = "Clinical significance and phenotype compatibility require expert review."
        for rule in self.signatures:
            if rule["mode"] != "germline":
                continue
            # Only the first gene listed on a germline signature is evaluated.
            gene = str(rule["genes"][0]).upper()
            candidates = [row for row in variants if row["gene"] == gene and _is_qualifying(row)]
            homozygous: list[dict[str, Any]] = []
            heterozygous: list[dict[str, Any]] = []
            for row in candidates:
                (homozygous if self._is_homozygous(row) else heterozygous).append(row)
            for row in homozygous:
                findings.append(
                    self._build_finding(
                        rule,
                        [row],
                        relationship_type="homozygous",
                        phase_status="homozygous",
                        interpretation=(
                            f"A homozygous qualifying {gene} variant supports a biallelic research hypothesis."
                        ),
                        limitations=[review_note],
                    )
                )
            for first, second in combinations(heterozygous, 2):
                phase_status = self._phase_relationship(first, second)
                if phase_status == "cis":
                    continue
                confirmed = phase_status == "in_trans"
                limitations = [review_note]
                if not confirmed:
                    limitations.insert(0, "Phase is unknown; in-trans status was not inferred.")
                findings.append(
                    self._build_finding(
                        rule,
                        [first, second],
                        relationship_type=(
                            "confirmed_compound_heterozygous"
                            if confirmed
                            else "possible_compound_heterozygous"
                        ),
                        phase_status=phase_status,
                        interpretation=(
                            f"Two qualifying {gene} variants are confirmed in trans."
                            if confirmed
                            else f"Two qualifying {gene} variants provide possible compound evidence; phase is unresolved."
                        ),
                        limitations=limitations,
                    )
                )
        return self._dedupe_findings(findings)

    def _analyze_somatic(self, variants: list[dict[str, Any]]) -> list[MultiMutationBiomarker]:
        """Evaluate somatic findings: generic multi-hit, generic co-mutation, curated rules.

        Generic findings are generated from the data alone: one per gene with
        two or more qualifying variants, and one cross-gene finding when at
        least two genes carry qualifying variants (using each gene's
        highest-scoring variant). Curated somatic signatures from the ruleset
        are then matched against the qualifying variants.
        """
        findings: list[MultiMutationBiomarker] = []
        qualifying = [row for row in variants if _is_qualifying(row)]
        by_gene: dict[str, list[dict[str, Any]]] = {}
        for row in qualifying:
            by_gene.setdefault(row["gene"], []).append(row)
        for gene, gene_variants in by_gene.items():
            if len(gene_variants) < 2:
                continue
            generic_rule = {
                "signature_id": f"SOMATIC_{gene}_MULTIPLE_HITS",
                "mode": "somatic",
                "relationship_type": "same_gene_multiple_hits",
                "genes": [gene],
                "disease_context": [],
                "evidence_level": "Research",
                "effect_direction": "multi_hit_context",
                "source_references": ["Derived from sample-local structured variant evidence"],
            }
            findings.append(
                self._build_finding(
                    generic_rule,
                    gene_variants,
                    relationship_type="same_gene_multiple_hits",
                    phase_status="not_applicable",
                    interpretation=f"Multiple qualifying somatic variants were observed in {gene} within one sample.",
                    limitations=[
                        "Multiple hits do not establish biallelic inactivation or clonality.",
                        "Copy-number and tumor-purity evidence were not evaluated.",
                    ],
                )
            )
        if len(by_gene) >= 2:
            # Represent each gene by its highest-scoring qualifying variant.
            cross_gene_variants = [
                max(gene_variants, key=_score) for gene_variants in by_gene.values()
            ]
            generic_cross_gene_rule = {
                "signature_id": "SOMATIC_CROSS_GENE_CO_MUTATION",
                "mode": "somatic",
                "relationship_type": "cross_gene_co_mutation",
                "genes": sorted(by_gene),
                "disease_context": [],
                "evidence_level": "Research",
                "effect_direction": "co_mutation_context",
                "source_references": ["Derived from sample-local structured variant evidence"],
            }
            findings.append(
                self._build_finding(
                    generic_cross_gene_rule,
                    cross_gene_variants,
                    relationship_type="cross_gene_co_mutation",
                    phase_status="not_applicable",
                    interpretation=(
                        "Qualifying somatic variants were observed across multiple genes in one sample."
                    ),
                    limitations=[
                        "This generic co-mutation finding is not a curated disease-specific signature.",
                        "Allele fraction and depth do not prove that variants occur in the same clone.",
                    ],
                )
            )
        for rule in self.signatures:
            if rule["mode"] != "somatic":
                continue
            matched = self._match_somatic_rule(rule, qualifying)
            if not matched:
                continue
            findings.append(
                self._build_finding(
                    rule,
                    matched,
                    relationship_type=str(rule["relationship_type"]),
                    phase_status="not_applicable",
                    interpretation=(
                        f"The sample matches curated research signature {rule['signature_id']}."
                    ),
                    limitations=[
                        "This association is contextual and does not establish treatment eligibility.",
                        "Allele fraction and depth do not prove that variants occur in the same clone.",
                    ],
                )
            )
        return self._dedupe_findings(findings)

    @staticmethod
    def _normalized_token(value: Any) -> str:
        """Upper-case *value* and strip everything except ``A-Z`` and ``0-9``."""
        return re.sub(r"[^A-Z0-9]", "", str(value).upper())

    def _match_somatic_rule(
        self, rule: dict[str, Any], variants: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Return the variants that satisfy a curated somatic *rule*, or ``[]``.

        Every gene listed on the rule must have at least one variant. When the
        rule has ``variant_requirements`` (gene -> aliases), each such gene
        must have a variant whose normalised ID contains one of the normalised
        aliases. Genes without requirements contribute all of their variants.

        A missing or null ``variant_requirements`` means "no requirements".
        Aliases that normalise to an empty string are ignored, because an
        empty alias would otherwise match every variant; a requirement left
        with no usable alias cannot be satisfied.
        """
        genes = [str(gene).upper() for gene in rule["genes"]]
        matches = [row for row in variants if row["gene"] in genes]
        genes_present = {row["gene"] for row in matches}
        if not all(gene in genes_present for gene in genes):
            return []
        requirements = rule.get("variant_requirements") or {}
        selected: list[dict[str, Any]] = []
        for gene, aliases in requirements.items():
            # Accept a single alias given as a bare string instead of a list.
            if isinstance(aliases, str):
                aliases = [aliases]
            normalized_aliases = {
                self._normalized_token(alias) for alias in (aliases or ())
            } - {""}
            gene_rows = [row for row in matches if row["gene"] == str(gene).upper()]
            required_rows = []
            for row in gene_rows:
                variant_token = self._normalized_token(_canonical_variant_id(row))
                if any(alias in variant_token for alias in normalized_aliases):
                    required_rows.append(row)
            if not required_rows:
                return []
            selected.extend(required_rows)
        required_genes = {str(gene).upper() for gene in requirements}
        selected.extend(row for row in matches if row["gene"] not in required_genes)
        return selected or matches

    def _build_finding(
        self,
        rule: dict[str, Any],
        variants: list[dict[str, Any]],
        *,
        relationship_type: str,
        phase_status: str,
        interpretation: str,
        limitations: list[str],
    ) -> MultiMutationBiomarker:
        """Assemble a ``MultiMutationBiomarker`` from a rule and its supporting variants.

        The finding's sample ID is the first non-empty ``sample_id`` among the
        variants, or ``None`` when none of them carries one.
        """
        confidence = self._confidence(variants, phase_status=phase_status)
        return MultiMutationBiomarker(
            signature_id=str(rule["signature_id"]),
            interpretation_mode=str(rule["mode"]),
            relationship_type=relationship_type,
            participating_variants=[_supporting_variant(row) for row in variants],
            disease_context=list(rule.get("disease_context", [])),
            evidence_level=str(rule.get("evidence_level", "Research")),
            confidence=confidence,
            source_references=list(rule.get("source_references", [])),
            limitations=limitations,
            interpretation=interpretation,
            effect_direction=str(rule.get("effect_direction", "")),
            sample_id=next(
                (_text(row.get("sample_id")) for row in variants if _text(row.get("sample_id"))),
                None,
            ),
            phase_status=phase_status,
            ruleset_version=self.ruleset_version,
        )

    @staticmethod
    def _is_homozygous(row: dict[str, Any]) -> bool:
        """Return True when zygosity is "homozygous" or the genotype is 1/1 or 1|1."""
        zygosity = _text(row.get("zygosity")).lower()
        genotype = _text(row.get("genotype"))
        return zygosity == "homozygous" or genotype in {"1/1", "1|1"}

    @staticmethod
    def _phase_relationship(first: dict[str, Any], second: dict[str, Any]) -> str:
        """Classify two variants as ``"in_trans"``, ``"cis"`` or ``"unknown"``.

        Phase is only read when both genotypes are phased (contain ``|``) and
        both rows share the same non-empty ``phase_set``; it is never inferred
        from anything else.
        """
        first_gt = _text(first.get("genotype"))
        second_gt = _text(second.get("genotype"))
        first_ps = _text(first.get("phase_set"))
        second_ps = _text(second.get("phase_set"))
        if not first_ps or first_ps != second_ps or "|" not in first_gt or "|" not in second_gt:
            return "unknown"
        if {first_gt, second_gt} == {"0|1", "1|0"}:
            return "in_trans"
        if first_gt == second_gt and first_gt in {"0|1", "1|0"}:
            return "cis"
        return "unknown"

    @staticmethod
    def _numeric_values(variants: list[dict[str, Any]], field_name: str) -> list[float]:
        """Collect *field_name* as floats, skipping missing or non-numeric values."""
        values: list[float] = []
        for row in variants:
            raw = row.get(field_name)
            if raw in (None, ""):
                continue
            try:
                values.append(float(raw))
            except (TypeError, ValueError):
                # Placeholders such as "NA" or "." are treated like absent values.
                continue
        return values

    @staticmethod
    def _confidence(variants: list[dict[str, Any]], *, phase_status: str) -> float:
        """Score a finding between 0.1 and 0.98.

        Starts from the mean variant score (0.45 when there are no variants),
        adds 0.04 when every reported depth is at least 20 and 0.03 when every
        reported allele fraction lies in ``[0.05, 1.0]``, adds 0.08 for
        confirmed in-trans phase and subtracts 0.12 for unknown phase.
        Non-numeric depth or allele-fraction values are ignored rather than
        raising, matching how variant scores are parsed.
        """
        scores = [_score(row) for row in variants]
        base = sum(scores) / len(scores) if scores else 0.45
        depth_values = MultiMutationAnalyzer._numeric_values(variants, "depth")
        vaf_values = MultiMutationAnalyzer._numeric_values(variants, "allele_fraction")
        if depth_values and min(depth_values) >= 20:
            base += 0.04
        if vaf_values and all(0.05 <= value <= 1.0 for value in vaf_values):
            base += 0.03
        if phase_status == "in_trans":
            base += 0.08
        elif phase_status == "unknown":
            base -= 0.12
        return round(_clamp(base, 0.1, 0.98), 4)

    @staticmethod
    def _dedupe_findings(findings: list[MultiMutationBiomarker]) -> list[MultiMutationBiomarker]:
        """Drop repeated findings and order the rest by descending confidence.

        Two findings are duplicates when they share a signature ID and the
        same set of ``gene:variant_id`` pairs; the later one replaces the
        earlier. Ties on confidence are ordered by descending signature ID.
        """
        unique: dict[tuple[str, tuple[str, ...]], MultiMutationBiomarker] = {}
        for finding in findings:
            variants = tuple(
                sorted(
                    f"{row.get('gene')}:{row.get('variant_id')}"
                    for row in finding.participating_variants
                )
            )
            unique[(finding.signature_id, variants)] = finding
        return sorted(
            unique.values(), key=lambda item: (item.confidence, item.signature_id), reverse=True
        )