""" Genome Analysis Engine - Sequence-based disease risk prediction and biomarker detection Provides mutation analysis, biomarker detection, disease association mapping, and personalized insights """ import hashlib import json import re from dataclasses import dataclass, field from enum import Enum from typing import Any, Dict, List, Optional, Set, Tuple from multi_mutation_analysis import MultiMutationAnalyzer from variant_evidence import VCFPyParser class VariantType(Enum): """Classification of genetic variants""" PATHOGENIC = "Pathogenic" LIKELY_PATHOGENIC = "Likely Pathogenic" UNCERTAIN_SIGNIFICANCE = "Uncertain Significance" LIKELY_BENIGN = "Likely Benign" BENIGN = "Benign" RISK_FACTOR = "Risk Factor" PROTECTIVE = "Protective" class ConfidenceLevel(Enum): """Confidence levels for predictions""" VERY_HIGH = "Very High" HIGH = "High" MODERATE = "Moderate" LOW = "Low" VERY_LOW = "Very Low" CONFIDENCE_NUMERIC = { ConfidenceLevel.VERY_HIGH: 0.92, ConfidenceLevel.HIGH: 0.78, ConfidenceLevel.MODERATE: 0.62, ConfidenceLevel.LOW: 0.38, ConfidenceLevel.VERY_LOW: 0.2, } def _clamp(value: float, lower: float = 0.0, upper: float = 1.0) -> float: """Clamp a numeric value into a bounded interval.""" return max(lower, min(value, upper)) def _normalize_sex(value: Optional[str]) -> str: """Normalize user-entered sex/gender values for eligibility rules.""" if value is None: return "unknown" normalized = str(value).strip().lower() if normalized in {"m", "male", "man"}: return "male" if normalized in {"f", "female", "woman"}: return "female" return "unknown" def _dedupe_preserve_order(values: List[str]) -> List[str]: """Remove duplicates while preserving insertion order.""" seen = set() ordered = [] for value in values: if value not in seen: seen.add(value) ordered.append(value) return ordered def _to_float(value: object) -> Optional[float]: """Safely coerce user-provided numeric metadata.""" if value is None: return None try: return float(value) except (TypeError, ValueError): return None def _extract_height_meters(user_metadata: Optional[Dict]) -> Optional[float]: """Read height from metadata and normalize to meters.""" if not user_metadata: return None raw_height = ( user_metadata.get("height_cm") if "height_cm" in user_metadata else user_metadata.get("height") ) height = _to_float(raw_height) if height is None or height <= 0: return None if height > 3.0: # Treat values > 3 as centimeters. height = height / 100.0 if height < 0.5 or height > 2.5: return None return height def _calculate_bmi(weight_kg: Optional[float], height_m: Optional[float]) -> Optional[float]: """Calculate BMI if enough valid anthropometric data is present.""" if weight_kg is None or height_m is None or weight_kg <= 0 or height_m <= 0: return None return weight_kg / (height_m ** 2) @dataclass class Variant: """Represents a genetic variant detected in sequence""" gene: str variant_id: str type: str description: str position: int = 0 reference: str = "" alternate: str = "" sequence_match: str = "" confidence: float = 0.8 # 0-1 pathogenicity_score: Optional[float] = None pathogenicity_tier: Optional[int] = None pathogenicity_method: str = "" model_confidence: str = "" evidence_summary: str = "" genotype: str = "" zygosity: str = "" phase_set: Optional[str] = None allele_fraction: Optional[float] = None depth: Optional[int] = None origin: str = "" genome_build: str = "" sample_id: str = "" def to_dict(self) -> Dict: """Convert to dictionary""" return { 'gene': self.gene, 'variant_id': self.variant_id, 'type': self.type, 'description': self.description, 'position': self.position, 'reference': self.reference, 'alternate': self.alternate, 'sequence_match': self.sequence_match, 'confidence': self.confidence, 'pathogenicity_score': self.pathogenicity_score, 'pathogenicity_tier': self.pathogenicity_tier, 'pathogenicity_method': self.pathogenicity_method, 'model_confidence': self.model_confidence, 'evidence_summary': self.evidence_summary, 'genotype': self.genotype, 'zygosity': self.zygosity, 'phase_set': self.phase_set, 'allele_fraction': self.allele_fraction, 'depth': self.depth, 'origin': self.origin, 'genome_build': self.genome_build, 'sample_id': self.sample_id, } @dataclass class Biomarker: """Represents a detected biomarker in the sequence""" name: str biomarker_type: str location: str sequence_pattern: str position: int = 0 length: int = 0 match_strength: float = 1.0 # 0-1, how well it matches associated_diseases: List[str] = field(default_factory=list) clinical_significance: str = "" def to_dict(self) -> Dict: """Convert to dictionary""" return { 'name': self.name, 'type': self.biomarker_type, 'location': self.location, 'pattern': self.sequence_pattern, 'position': self.position, 'length': self.length, 'match_strength': self.match_strength, 'diseases': self.associated_diseases, 'significance': self.clinical_significance } @dataclass class DiseaseAssociation: """Represents association between detected variants/biomarkers and a disease""" disease: str risk_score: float # 0-100 confidence: ConfidenceLevel detected_variants: List[Variant] = field(default_factory=list) detected_biomarkers: List[Biomarker] = field(default_factory=list) inheritance_pattern: str = "" prevalence: float = 0.0 evidence_strength: float = 0.0 clinical_actionability: float = 0.0 risk_increase: float = 0.0 confidence_score: float = 0.0 severity_weight: float = 0.0 priority_score: float = 0.0 priority_category: str = "Research Signal" eligibility_reason: str = "" uncertainty_messages: List[str] = field(default_factory=list) replicated_signal: bool = False def to_dict(self) -> Dict: """Convert to dictionary""" return { 'disease': self.disease, 'risk_score': self.risk_score, 'confidence': self.confidence.value, 'variants': len(self.detected_variants), 'biomarkers': len(self.detected_biomarkers), 'inheritance': self.inheritance_pattern, 'prevalence': self.prevalence, 'evidence_strength': round(self.evidence_strength, 4), 'clinical_actionability': round(self.clinical_actionability, 4), 'risk_increase': round(self.risk_increase, 4), 'confidence_score': round(self.confidence_score, 4), 'severity_weight': round(self.severity_weight, 4), 'priority_score': round(self.priority_score, 4), 'priority_category': self.priority_category, 'eligibility_reason': self.eligibility_reason, 'uncertainty_messages': list(self.uncertainty_messages), 'replicated_signal': self.replicated_signal, 'risk_label': f"Elevated genetic predisposition for {self.disease}", } class MutationAnalyzer: """ Analyze clinically relevant variants for downstream disease mapping. Recommended production workflow: 1. FASTA/FASTQ -> align against a reference with BWA, Minimap2, or a local Smith-Waterman-style aligner for short targeted regions. 2. Call variants with a standard caller such as samtools/bcftools or GATK. 3. Annotate consequences with VEP/ANNOVAR or equivalent transcript-aware logic. 4. Cross-reference ClinVar/OMIM/gnomAD-style evidence before emitting disease associations. This class accepts already-called VCF/annotated-variant payloads directly and keeps raw motif/k-mer matching as an optional exploratory fallback only. """ # Disease-causing genes database DISEASE_GENES = { 'BRCA1': { 'diseases': ['Breast Cancer', 'Ovarian Cancer', 'Prostate Cancer'], 'inheritance': 'Autosomal Dominant', 'penetrance': 0.72, 'variants': [ {'id': 'c.68_69delAG', 'type': 'Frameshift', 'pathogenicity': 'Pathogenic'}, {'id': 'c.5266dupC', 'type': 'Frameshift', 'pathogenicity': 'Pathogenic'}, {'id': '5382insC', 'type': 'Frameshift', 'pathogenicity': 'Pathogenic'}, ] }, 'BRCA2': { 'diseases': ['Breast Cancer', 'Ovarian Cancer', 'Pancreatic Cancer'], 'inheritance': 'Autosomal Dominant', 'penetrance': 0.62, 'variants': [ {'id': '6174delT', 'type': 'Frameshift', 'pathogenicity': 'Pathogenic'}, {'id': 'c.9097C>T', 'type': 'Nonsense', 'pathogenicity': 'Pathogenic'}, ] }, 'TP53': { 'diseases': ['Breast Cancer', 'Colorectal Cancer', 'Sarcoma'], 'inheritance': 'Autosomal Dominant', 'penetrance': 0.73, 'variants': [ {'id': 'R175H', 'type': 'Missense', 'pathogenicity': 'Pathogenic'}, {'id': 'c.215C>G', 'type': 'Missense', 'pathogenicity': 'Pathogenic'}, ] }, 'APOE': { 'diseases': ["Alzheimer's Disease"], 'inheritance': 'Complex', 'penetrance': 0.35, 'variants': [ {'id': 'ε4 allele', 'type': 'SNP', 'pathogenicity': 'Risk Factor'}, {'id': 'ε2 allele', 'type': 'SNP', 'pathogenicity': 'Protective'}, ] }, 'CFTR': { 'diseases': ['Cystic Fibrosis'], 'inheritance': 'Autosomal Recessive', 'penetrance': 1.0, 'variants': [ {'id': 'F508del', 'type': 'Deletion', 'pathogenicity': 'Pathogenic'}, {'id': 'G551D', 'type': 'Missense', 'pathogenicity': 'Pathogenic'}, ] }, 'HFE': { 'diseases': ['Hemochromatosis'], 'inheritance': 'Autosomal Recessive', 'penetrance': 0.10, 'variants': [ {'id': 'C282Y', 'type': 'Missense', 'pathogenicity': 'Pathogenic'}, {'id': 'H63D', 'type': 'Missense', 'pathogenicity': 'Likely Benign'}, ] }, 'FTO': { 'diseases': ['Obesity', 'Type 2 Diabetes'], 'inheritance': 'Complex', 'penetrance': 0.15, 'variants': [ {'id': 'rs9939609', 'type': 'SNP', 'pathogenicity': 'Risk Factor'}, ] }, 'TCF7L2': { 'diseases': ['Type 2 Diabetes'], 'inheritance': 'Complex', 'penetrance': 0.25, 'variants': [ {'id': 'rs7903146', 'type': 'SNP', 'pathogenicity': 'Risk Factor'}, ] }, 'MTHFR': { 'diseases': ['Neural Tube Defects', 'Thrombosis'], 'inheritance': 'Autosomal Recessive', 'penetrance': 0.05, 'variants': [ {'id': 'C677T', 'type': 'Missense', 'pathogenicity': 'Risk Factor'}, {'id': 'A1298C', 'type': 'Missense', 'pathogenicity': 'Risk Factor'}, ] }, 'LDLR': { 'diseases': ['Familial Hypercholesterolemia'], 'inheritance': 'Autosomal Dominant', 'penetrance': 0.9, 'variants': [ {'id': 'Exon 2-6 deletions', 'type': 'Deletion', 'pathogenicity': 'Pathogenic'}, ] } } CLINICALLY_ACTIONABLE_LABELS = {"pathogenic", "likely pathogenic"} BENIGN_LABELS = {"benign", "likely benign"} TRUNCATING_CLASSES = {"Frameshift", "Nonsense"} EXPLORATORY_KMER_SIZE = 7 MAX_LOCAL_ALIGNMENT_MATRIX = 250000 CODON_TABLE = { "TTT": "F", "TTC": "F", "TTA": "L", "TTG": "L", "CTT": "L", "CTC": "L", "CTA": "L", "CTG": "L", "ATT": "I", "ATC": "I", "ATA": "I", "ATG": "M", "GTT": "V", "GTC": "V", "GTA": "V", "GTG": "V", "TCT": "S", "TCC": "S", "TCA": "S", "TCG": "S", "CCT": "P", "CCC": "P", "CCA": "P", "CCG": "P", "ACT": "T", "ACC": "T", "ACA": "T", "ACG": "T", "GCT": "A", "GCC": "A", "GCA": "A", "GCG": "A", "TAT": "Y", "TAC": "Y", "TAA": "*", "TAG": "*", "CAT": "H", "CAC": "H", "CAA": "Q", "CAG": "Q", "AAT": "N", "AAC": "N", "AAA": "K", "AAG": "K", "GAT": "D", "GAC": "D", "GAA": "E", "GAG": "E", "TGT": "C", "TGC": "C", "TGA": "*", "TGG": "W", "CGT": "R", "CGC": "R", "CGA": "R", "CGG": "R", "AGT": "S", "AGC": "S", "AGA": "R", "AGG": "R", "GGT": "G", "GGC": "G", "GGA": "G", "GGG": "G", } def __init__(self, variant_pipeline: Optional[Any] = None, vcf_parser: Optional[Any] = None): self.variant_pipeline = variant_pipeline self.vcf_parser = vcf_parser or VCFPyParser() def set_variant_pipeline(self, variant_pipeline: Optional[Any]) -> None: """Attach a VCF normalization/annotation pipeline after construction.""" self.variant_pipeline = variant_pipeline def analyze_mutations( self, sequence: str, annotated_variants: Optional[List[Dict[str, Any]]] = None, vcf_text: Optional[str] = None, reference_sequence: Optional[str] = None, sample_id: Optional[str] = None, genome_build: Optional[str] = None, allow_exploratory_kmers: bool = False, ) -> List[Variant]: """ Analyze clinically supported variants for downstream disease mapping. Args: sequence: DNA/FASTA/FASTQ input or a placeholder sequence when VCF/annotations are provided separately. annotated_variants: Transcript/protein-annotated variant rows from a proper caller. vcf_text: Optional raw VCF text to normalize/annotate before filtering. reference_sequence: Optional local reference for short-region fallback calling. allow_exploratory_kmers: Keep non-reference k-mer exploration available, but never treat those hits as pathogenic evidence. Returns: List of disease-relevant variants only. """ prepared_variants = self.prepare_variant_evidence( sequence=sequence, annotated_variants=annotated_variants, vcf_text=vcf_text, reference_sequence=reference_sequence, sample_id=sample_id, genome_build=genome_build, ) if prepared_variants: return self._build_disease_variants(prepared_variants) if allow_exploratory_kmers: return self._detect_exploratory_kmers(sequence) return [] def prepare_variant_evidence( self, sequence: str, annotated_variants: Optional[List[Dict[str, Any]]] = None, vcf_text: Optional[str] = None, reference_sequence: Optional[str] = None, sample_id: Optional[str] = None, genome_build: Optional[str] = None, ) -> List[Dict[str, Any]]: """ Normalize sequence/VCF/annotation inputs into annotated variant records. Disease calls are only made from these annotated records, never from raw motif matches such as `ATG`. """ if annotated_variants: normalized = [dict(row) for row in annotated_variants] return self._ensure_variant_scores(normalized) if vcf_text: if self.variant_pipeline is not None: parsed = self.variant_pipeline.parse_vcf(vcf_text) normalized = self.variant_pipeline.normalize_variants(parsed.get("variants", [])) annotated = self.variant_pipeline.annotate_variant_effects(normalized) else: parsed = self.vcf_parser.parse( vcf_text, sample_id=sample_id, genome_build=genome_build, ) annotated = parsed.get("variants", []) return self._ensure_variant_scores(annotated) if reference_sequence: return self._call_and_annotate_local_variants(sequence, reference_sequence) return [] def _ensure_variant_scores(self, annotated_variants: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Attach pathogenicity metadata when upstream annotations did not already include it.""" if not annotated_variants: return [] if all( any(key in variant for key in ("pathogenicity_score", "pathogenicity_tier", "pathogenicity_method")) for variant in annotated_variants ): return annotated_variants if self.variant_pipeline is not None: return self.variant_pipeline.score_variant_pathogenicity( annotated_variants, use_prioritization=True, ) for variant in annotated_variants: consequence_class = self._classify_variant_consequence(variant) label = self._normalize_clinical_label( variant.get("clinical_significance") or variant.get("clinvar_significance") or variant.get("pathogenicity") ) fallback_score = { "Frameshift": 0.92, "Nonsense": 0.89, "Missense": 0.66, "Synonymous": 0.08, }.get(consequence_class, 0.35) if label == "pathogenic": fallback_score = max(fallback_score, 0.95) elif label == "likely pathogenic": fallback_score = max(fallback_score, 0.84) elif label in self.BENIGN_LABELS: fallback_score = min(fallback_score, 0.2) variant["pathogenicity_score"] = round(_clamp(fallback_score), 4) variant["pathogenicity_tier"] = 1 if fallback_score >= 0.85 else 2 if fallback_score >= 0.7 else 3 variant["pathogenicity_method"] = "annotation_heuristic" variant["model_confidence"] = str(variant.get("model_confidence") or "Moderate") variant["evidence_summary"] = ( "Heuristic pathogenicity derived from annotated consequence because no " "scoring pipeline was attached." ) return annotated_variants def _build_disease_variants(self, annotated_variants: List[Dict[str, Any]]) -> List[Variant]: """Convert annotated records into clinically supported Variant objects.""" detected_variants: List[Variant] = [] for row in annotated_variants: gene = str(row.get("gene", "")).strip().upper() if not gene: continue evidence = self._evaluate_clinical_evidence(row) if not evidence["supports_disease_association"]: continue variant = Variant( gene=gene, variant_id=self._select_variant_identifier(row), type=str(evidence["classification"]), description=str(evidence["description"]), position=self._safe_int(row.get("pos", row.get("position", 0))), reference=str(row.get("ref", row.get("reference", ""))), alternate=str(row.get("alt", row.get("alternate", ""))), sequence_match="", confidence=float(evidence["confidence"]), pathogenicity_score=evidence["pathogenicity_score"], pathogenicity_tier=evidence["pathogenicity_tier"], pathogenicity_method=str(row.get("pathogenicity_method", evidence["pathogenicity_method"])), model_confidence=str(row.get("model_confidence", row.get("confidence", ""))), evidence_summary=str(evidence["summary"]), genotype=str(row.get("genotype", "") or ""), zygosity=str(row.get("zygosity", "") or ""), phase_set=(str(row.get("phase_set")) if row.get("phase_set") is not None else None), allele_fraction=_to_float(row.get("allele_fraction")), depth=self._safe_int(row.get("depth", 0)) or None, origin=str(row.get("origin", "") or ""), genome_build=str(row.get("genome_build", "") or ""), sample_id=str(row.get("sample_id", "") or ""), ) detected_variants.append(variant) return detected_variants def _evaluate_clinical_evidence(self, row: Dict[str, Any]) -> Dict[str, Any]: """Decide whether an annotated variant is strong enough for disease association.""" consequence_class = self._classify_variant_consequence(row) db_annotation = self._cross_reference_variant_databases(row) pathogenicity_score = self._variant_pathogenicity_score(row, db_annotation, consequence_class) pathogenicity_tier = row.get("pathogenicity_tier") if pathogenicity_tier is None: pathogenicity_tier = 1 if pathogenicity_score >= 0.85 else 2 if pathogenicity_score >= 0.7 else 3 confidence = self._annotation_confidence(row, db_annotation, consequence_class, pathogenicity_score) clinical_label = db_annotation["label"] pass_like = str(row.get("filter", "PASS")).upper() in {"PASS", ".", ""} truncating_with_support = ( consequence_class in self.TRUNCATING_CLASSES and pass_like and confidence >= 0.8 and pathogenicity_score >= 0.75 ) supports_disease_association = ( clinical_label in self.CLINICALLY_ACTIONABLE_LABELS or truncating_with_support ) summary_parts = [f"consequence={consequence_class}"] if clinical_label: summary_parts.append(f"label={clinical_label}") if db_annotation["source"]: summary_parts.append(f"source={db_annotation['source']}") if db_annotation["matched_variant_id"]: summary_parts.append(f"matched={db_annotation['matched_variant_id']}") summary_parts.append(f"pathogenicity_score={pathogenicity_score:.2f}") if truncating_with_support and clinical_label not in self.CLINICALLY_ACTIONABLE_LABELS: summary_parts.append("support=high_confidence_truncating_event") gene = str(row.get("gene", "UNKNOWN")).strip().upper() or "UNKNOWN" variant_id = self._select_variant_identifier(row) return { "classification": consequence_class, "pathogenicity_score": round(pathogenicity_score, 4), "pathogenicity_tier": pathogenicity_tier, "pathogenicity_method": db_annotation["source"] or str(row.get("pathogenicity_method", "annotation_heuristic")), "confidence": round(confidence, 4), "supports_disease_association": supports_disease_association, "summary": "; ".join(summary_parts), "description": f"Annotated {consequence_class} variant in {gene}: {variant_id}", } def _calculate_variant_confidence( self, gene: str, pattern: str, variant_id: str, position: int, gene_info: Dict, ) -> float: """Derive a deterministic confidence score for exploratory pattern evidence only.""" penetrance = float(gene_info.get('penetrance', 0.5)) pattern_upper = pattern.upper() gene_upper = gene.upper() specificity_bonus = 0.18 if gene_upper in pattern_upper or pattern_upper in gene_upper else 0.08 unique_fraction = len(set(pattern_upper)) / max(len(pattern_upper), 1) uniqueness_bonus = min(unique_fraction * 0.12, 0.12) position_bonus = min((position % 11) * 0.005, 0.05) variant_hint = variant_id.lower() if any(token in variant_hint for token in ('frameshift', 'nonsense', 'stop')): evidence_modifier = 0.1 elif any(token in variant_hint for token in ('del', 'dup', 'ins', 'rs', '>')): evidence_modifier = 0.06 elif 'detected' in variant_hint: evidence_modifier = -0.12 else: evidence_modifier = -0.04 confidence = 0.28 + (penetrance * 0.24) + specificity_bonus + uniqueness_bonus + position_bonus + evidence_modifier return round(_clamp(confidence, 0.1, 0.8), 4) def _generate_gene_patterns(self, gene: str) -> List[Tuple[str, str]]: """Generate exploratory gene patterns without start-codon shortcuts.""" patterns = { 'BRCA1': [('BRCA', 'BRCA1_detected')], 'BRCA2': [('BRCA', 'BRCA2_detected')], 'TP53': [('TP53', 'TP53_detected'), ('AACG', 'TP53_core')], 'APOE': [('APOE', 'APOE_detected'), ('CGC', 'APOE_core')], 'CFTR': [('CFTR', 'CFTR_detected')], 'HFE': [('HFE', 'HFE_detected'), ('TGT', 'HFE_cys282')], 'FTO': [('FTO', 'FTO_detected')], 'TCF7L2': [('TCF7L2', 'TCF7L2_detected')], 'MTHFR': [('MTHFR', 'MTHFR_detected')], 'LDLR': [('LDLR', 'LDLR_detected')] } return patterns.get(gene, [(gene, f"{gene}_detected")]) def _normalize_clinical_label(self, value: object) -> str: """Normalize ClinVar-like labels into a compact lowercase vocabulary.""" label = str(value or "").strip().lower() if not label: return "" if "likely pathogenic" in label: return "likely pathogenic" if "pathogenic" in label: return "pathogenic" if "likely benign" in label: return "likely benign" if "benign" in label: return "benign" if "risk factor" in label: return "risk factor" if "protective" in label: return "protective" return label def _cross_reference_variant_databases(self, row: Dict[str, Any]) -> Dict[str, str]: """ Resolve a local ClinVar/OMIM/gnomAD-style label for a variant. This method prefers explicit upstream annotations and then falls back to the curated local disease-gene catalogue bundled in this engine. """ explicit_label = self._normalize_clinical_label( row.get("clinical_significance") or row.get("clinvar_significance") or row.get("pathogenicity") or row.get("classification") ) if explicit_label: return { "label": explicit_label, "source": "annotated_input", "matched_variant_id": str( row.get("variant_id") or row.get("variant_key") or row.get("id") or row.get("protein") or "" ), } info = row.get("info", {}) if isinstance(row.get("info"), dict) else {} for key in ("CLNSIG", "CLINVAR", "clinvar_significance"): if key in info: normalized = self._normalize_clinical_label(info.get(key)) if normalized: return { "label": normalized, "source": f"info:{key.lower()}", "matched_variant_id": str(info.get("ALLELEID", "") or info.get("RS", "") or ""), } population_label = self._population_frequency_label(info) if population_label: return { "label": population_label, "source": "gnomad_frequency", "matched_variant_id": "", } gene = str(row.get("gene", "")).strip().upper() if gene in self.DISEASE_GENES: known_variants = self.DISEASE_GENES[gene].get("variants", []) row_identifiers = { token.upper() for token in [ row.get("variant_id"), row.get("variant_key"), row.get("id"), row.get("protein"), row.get("hgvsp"), row.get("hgvsc"), row.get("c_hgvs"), row.get("p_hgvs"), ] if token } for known in known_variants: known_id = str(known.get("id", "")).strip() if known_id and known_id.upper() in row_identifiers: return { "label": self._normalize_clinical_label(known.get("pathogenicity")), "source": "local_curated_db", "matched_variant_id": known_id, } return {"label": "", "source": "", "matched_variant_id": ""} def _population_frequency_label(self, info: Dict[str, Any]) -> str: """Use high population frequency as conservative benign evidence when available.""" for key in ("GNOMAD_AF", "gnomad_af", "AF"): raw = info.get(key) try: if raw is not None and float(raw) >= 0.05: return "benign" except (TypeError, ValueError): continue return "" def _variant_pathogenicity_score( self, row: Dict[str, Any], db_annotation: Dict[str, str], consequence_class: str, ) -> float: """Prefer upstream pathogenicity scores, then fall back to consequence/database evidence.""" raw_score = row.get("pathogenicity_score") try: if raw_score is not None: return _clamp(float(raw_score)) except (TypeError, ValueError): pass impact_score = row.get("impact_score") try: if impact_score is not None: impact_score = float(impact_score) else: impact_score = 0.0 except (TypeError, ValueError): impact_score = 0.0 label = db_annotation["label"] if label == "pathogenic": return max(0.95, impact_score) if label == "likely pathogenic": return max(0.85, impact_score) if label == "benign": return min(impact_score or 0.15, 0.12) if label == "likely benign": return min(impact_score or 0.2, 0.2) default_scores = { "Frameshift": 0.88, "Nonsense": 0.86, "Missense": 0.65, "Synonymous": 0.08, } return _clamp(max(impact_score, default_scores.get(consequence_class, 0.35))) def _annotation_confidence( self, row: Dict[str, Any], db_annotation: Dict[str, str], consequence_class: str, pathogenicity_score: float, ) -> float: """Combine filter, annotation, and clinical evidence into a bounded confidence score.""" confidence_value = self._normalize_confidence_value( row.get("model_confidence", row.get("confidence")) ) pass_like = str(row.get("filter", "PASS")).upper() in {"PASS", ".", ""} consequence_bonus = { "Frameshift": 0.12, "Nonsense": 0.1, "Missense": 0.04, "Synonymous": -0.08, }.get(consequence_class, 0.0) label_bonus = { "pathogenic": 0.16, "likely pathogenic": 0.12, "benign": -0.18, "likely benign": -0.12, }.get(db_annotation["label"], 0.0) filter_bonus = 0.06 if pass_like else -0.18 annotation_bonus = 0.08 if row.get("consequence") else -0.1 base = max(confidence_value, pathogenicity_score * 0.8) return round(_clamp(base + consequence_bonus + label_bonus + filter_bonus + annotation_bonus, 0.05, 0.99), 4) def _normalize_confidence_value(self, value: object) -> float: """Normalize string or numeric confidence fields to a 0-1 score.""" if isinstance(value, (int, float)): return _clamp(float(value)) normalized = str(value or "").strip().lower() if normalized in {"very high", "very_high"}: return 0.92 if normalized == "high": return 0.84 if normalized in {"moderate", "medium"}: return 0.68 if normalized == "low": return 0.4 if normalized == "very low": return 0.2 return 0.5 def _classify_variant_consequence(self, row: Dict[str, Any]) -> str: """Collapse detailed consequence tags into disease-mapping consequence classes.""" consequence = str(row.get("consequence", "")).lower() ref = str(row.get("ref", row.get("reference", ""))).upper() alt = str(row.get("alt", row.get("alternate", ""))).upper() if "frameshift" in consequence or self._is_frameshift(ref, alt): return "Frameshift" if any(token in consequence for token in ("stop_gained", "nonsense", "stopgain")): return "Nonsense" if "missense" in consequence: return "Missense" if "synonymous" in consequence: return "Synonymous" if len(ref) == 1 and len(alt) == 1: return "SNP" if len(ref) != len(alt): return "Indel" return str(row.get("variant_type") or "Variant") def _is_frameshift(self, ref: str, alt: str) -> bool: """Identify frameshift indels from allele length differences.""" if not ref or not alt or len(ref) == len(alt): return False return abs(len(ref) - len(alt)) % 3 != 0 def _select_variant_identifier(self, row: Dict[str, Any]) -> str: """Prefer HGVS-like identifiers before falling back to a positional key.""" for key in ("variant_id", "hgvsc", "hgvsp", "protein", "variant_key", "id"): value = row.get(key) if value: return str(value) chrom = row.get("chrom", "") pos = row.get("pos", row.get("position", "")) ref = row.get("ref", row.get("reference", "")) alt = row.get("alt", row.get("alternate", "")) return f"{chrom}:{pos}:{ref}>{alt}".strip(":>") def _safe_int(self, value: object) -> int: """Safely coerce a position-like field to integer.""" try: return int(value) except (TypeError, ValueError): return 0 def _call_and_annotate_local_variants( self, sequence: str, reference_sequence: str, ) -> List[Dict[str, Any]]: """ Lightweight local fallback for short targeted regions when no external aligner is available. This is intentionally conservative and suitable only for small local cases. Whole-genome or long-read analyses should use BWA/Minimap2 plus a dedicated caller upstream. """ query = self._extract_primary_nucleotide_sequence(sequence) reference = self._extract_primary_nucleotide_sequence(reference_sequence) if not query or not reference: return [] if len(query) * len(reference) > self.MAX_LOCAL_ALIGNMENT_MATRIX: return [] aligned_ref, aligned_query = self._global_align(reference, query) return self._variants_from_alignment(aligned_ref, aligned_query, reference) def _extract_primary_nucleotide_sequence(self, raw_input: str) -> str: """Extract the primary nucleotide sequence from FASTA/FASTQ/plain-text input.""" if not raw_input: return "" lines = [line.strip() for line in str(raw_input).splitlines() if line.strip()] if not lines: return "" if lines[0].startswith(">"): return "".join(line for line in lines if not line.startswith(">")).upper() if lines[0].startswith("@") and len(lines) >= 2: fastq_bases: List[str] = [] index = 0 while index + 1 < len(lines): if lines[index].startswith("@"): fastq_bases.append(lines[index + 1]) index += 4 continue index += 1 if fastq_bases: return "".join(fastq_bases).upper() return re.sub(r"[^ACGTNacgtn]", "", "".join(lines)).upper() def _global_align(self, reference: str, query: str) -> Tuple[str, str]: """Needleman-Wunsch alignment for short local fallback cases.""" match_score = 2 mismatch_penalty = -1 gap_penalty = -2 rows = len(reference) + 1 cols = len(query) + 1 score = [[0] * cols for _ in range(rows)] trace = [[""] * cols for _ in range(rows)] for i in range(1, rows): score[i][0] = i * gap_penalty trace[i][0] = "U" for j in range(1, cols): score[0][j] = j * gap_penalty trace[0][j] = "L" for i in range(1, rows): for j in range(1, cols): diag = score[i - 1][j - 1] + (match_score if reference[i - 1] == query[j - 1] else mismatch_penalty) up = score[i - 1][j] + gap_penalty left = score[i][j - 1] + gap_penalty best = max(diag, up, left) score[i][j] = best trace[i][j] = "D" if best == diag else "U" if best == up else "L" aligned_ref: List[str] = [] aligned_query: List[str] = [] i = len(reference) j = len(query) while i > 0 or j > 0: direction = trace[i][j] if i >= 0 and j >= 0 else "" if i > 0 and j > 0 and direction == "D": aligned_ref.append(reference[i - 1]) aligned_query.append(query[j - 1]) i -= 1 j -= 1 elif i > 0 and (j == 0 or direction == "U"): aligned_ref.append(reference[i - 1]) aligned_query.append("-") i -= 1 else: aligned_ref.append("-") aligned_query.append(query[j - 1]) j -= 1 return "".join(reversed(aligned_ref)), "".join(reversed(aligned_query)) def _variants_from_alignment( self, aligned_ref: str, aligned_query: str, reference: str, ) -> List[Dict[str, Any]]: """Call SNPs and indels from an aligned short region.""" variants: List[Dict[str, Any]] = [] ref_pos = 0 index = 0 while index < len(aligned_ref): ref_base = aligned_ref[index] query_base = aligned_query[index] if ref_base == query_base: if ref_base != "-": ref_pos += 1 index += 1 continue start_pos = ref_pos + 1 ref_segment: List[str] = [] alt_segment: List[str] = [] while index < len(aligned_ref) and aligned_ref[index] != aligned_query[index]: if aligned_ref[index] != "-": ref_segment.append(aligned_ref[index]) ref_pos += 1 if aligned_query[index] != "-": alt_segment.append(aligned_query[index]) index += 1 ref_allele = "".join(ref_segment) or "-" alt_allele = "".join(alt_segment) or "-" consequence = self._infer_local_consequence(reference, start_pos, ref_allele, alt_allele) variants.append( { "chrom": "local_reference", "pos": start_pos, "ref": ref_allele, "alt": alt_allele, "variant_key": f"local_reference:{start_pos}:{ref_allele}>{alt_allele}", "gene": "UNKNOWN", "consequence": consequence, "filter": "PASS", "confidence": "Moderate", "pathogenicity_method": "local_alignment_fallback", } ) return self._ensure_variant_scores(variants) def _infer_local_consequence(self, reference: str, position: int, ref: str, alt: str) -> str: """Infer a basic coding consequence from a short-region local call.""" if self._is_frameshift(ref, alt): return "frameshift_variant" if len(ref) != len(alt): return "inframe_indel" if abs(len(ref) - len(alt)) % 3 == 0 else "frameshift_variant" if len(ref) == 1 and len(alt) == 1: ref_aa, alt_aa = self._translate_snv_effect(reference, position, alt) if ref_aa and alt_aa: if ref_aa == alt_aa: return "synonymous_variant" if alt_aa == "*": return "stop_gained" return "missense_variant" return "snv" return "complex_substitution" def _translate_snv_effect(self, reference: str, position: int, alt_base: str) -> Tuple[str, str]: """Translate a single-base substitution assuming reading frame starts at position 1.""" zero_based = position - 1 codon_start = (zero_based // 3) * 3 if codon_start < 0 or codon_start + 3 > len(reference): return "", "" ref_codon = reference[codon_start:codon_start + 3].upper() if any(base not in "ACGT" for base in ref_codon + alt_base.upper()): return "", "" alt_codon = list(ref_codon) alt_codon[zero_based % 3] = alt_base.upper() return self.CODON_TABLE.get(ref_codon, ""), self.CODON_TABLE.get("".join(alt_codon), "") def _detect_exploratory_kmers(self, sequence: str) -> List[Variant]: """ Optional exploratory k-mer matching for unsupported inputs. Returned signals stay low-confidence and are never used by the normal disease-calling path unless a caller explicitly opts into this fallback. """ sequence_upper = self._extract_primary_nucleotide_sequence(sequence) if len(sequence_upper) < self.EXPLORATORY_KMER_SIZE: return [] detected_variants: List[Variant] = [] for gene, gene_info in self.DISEASE_GENES.items(): for pattern, variant_id in self._generate_gene_patterns(gene): if pattern.upper() not in sequence_upper: continue position = sequence_upper.find(pattern.upper()) confidence = self._calculate_variant_confidence( gene=gene, pattern=pattern, variant_id=variant_id, position=position, gene_info=gene_info, ) detected_variants.append( Variant( gene=gene, variant_id=f"exploratory:{variant_id}", type="Exploratory", description=f"Exploratory k-mer match for {gene}; not a disease call.", position=position, sequence_match=pattern, confidence=confidence, pathogenicity_score=0.0, pathogenicity_tier=3, pathogenicity_method="exploratory_kmer", model_confidence="Low", evidence_summary="Exploratory k-mer signal only; no aligned or annotated variant evidence.", ) ) return detected_variants def _get_variant_type(self, gene_info: Dict) -> str: """Get variant type from gene info""" if 'inheritance' in gene_info: if 'Dominant' in gene_info['inheritance']: return 'Dominant' elif 'Recessive' in gene_info['inheritance']: return 'Recessive' return 'Complex' def calculate_risk_score(self, variants: List[Variant], user_metadata: Optional[Dict] = None) -> float: """ Calculate disease risk score based on detected variants. Args: variants: List of detected variants user_metadata: Optional user data (age, gender, etc.) Returns: Risk score 0-100 """ if not variants: return 10.0 # Baseline population risk risk_score = 10.0 # Start with baseline for variant in variants: gene_info = self.DISEASE_GENES.get(variant.gene, {}) penetrance = gene_info.get('penetrance', 0.5) confidence = max( variant.confidence, variant.pathogenicity_score if variant.pathogenicity_score is not None else 0.0, ) # Contribution increases with penetrance and confidence risk_contribution = penetrance * confidence * 40 # Scale to 0-40 risk_score += risk_contribution # Age adjustment (higher risk with age for late-onset diseases) if user_metadata and 'age' in user_metadata: age = user_metadata['age'] age_factor = 1.0 + (age - 40) * 0.01 if age > 40 else 1.0 risk_score *= min(age_factor, 2.0) # Cap at 2x return min(risk_score, 100.0) class BiomarkerDetector: """Detects disease-associated biomarkers in sequences""" # Known biomarker database BIOMARKERS = { 'HER2': { 'type': 'Protein-coding', 'patterns': ['ERBB2', 'HER2_amplification', 'GRB7'], 'diseases': ['Breast Cancer'], 'significance': 'Therapeutic Target', 'clinical_use': 'Trastuzumab (Herceptin) eligibility' }, 'EGFR': { 'type': 'Protein-coding', 'patterns': ['EGFR', 'EGF_receptor'], 'diseases': ['Lung Cancer', 'Glioblastoma'], 'significance': 'Therapeutic Target', 'clinical_use': 'EGFR inhibitor therapy' }, 'KRAS': { 'type': 'Oncogene', 'patterns': ['KRAS', 'G12C', 'G12V'], 'diseases': ['Colorectal Cancer', 'Pancreatic Cancer', 'Lung Cancer'], 'significance': 'Prognostic Marker', 'clinical_use': 'Prognosis and treatment selection' }, 'BRAF': { 'type': 'Oncogene', 'patterns': ['BRAF', 'V600E'], 'diseases': ['Melanoma', 'Colorectal Cancer'], 'significance': 'Therapeutic Target', 'clinical_use': 'BRAF inhibitor therapy' }, 'ER': { 'type': 'Receptor', 'patterns': ['ESR1', 'ERalpha'], 'diseases': ['Breast Cancer'], 'significance': 'Treatment Indicator', 'clinical_use': 'Hormone therapy eligibility' }, 'PR': { 'type': 'Receptor', 'patterns': ['PGR', 'PRG'], 'diseases': ['Breast Cancer'], 'significance': 'Treatment Indicator', 'clinical_use': 'Hormone therapy eligibility' }, 'PD-L1': { 'type': 'Immune Checkpoint', 'patterns': ['CD274', 'PD-L1'], 'diseases': ['Lung Cancer', 'Melanoma', 'Colorectal Cancer'], 'significance': 'Therapeutic Target', 'clinical_use': 'Immunotherapy eligibility' }, 'MSI': { 'type': 'Genomic Signature', 'patterns': ['microsatellite_instability', 'MSI-H'], 'diseases': ['Colorectal Cancer', 'Gastric Cancer'], 'significance': 'Prognostic Marker', 'clinical_use': 'Immunotherapy response prediction' }, 'TMPRSS2-ERG': { 'type': 'Gene Fusion', 'patterns': ['TMPRSS2_ERG_fusion', 'ERG_overexpression'], 'diseases': ['Prostate Cancer'], 'significance': 'Prognostic Marker', 'clinical_use': 'Risk stratification' }, 'ABL1': { 'type': 'Oncogene', 'patterns': ['BCR_ABL', 'BCR_ABL1'], 'diseases': ['Chronic Myeloid Leukemia'], 'significance': 'Diagnostic Marker', 'clinical_use': 'TKI therapy target' } } def detect_biomarkers(self, sequence: str, variants: Optional[List[Variant]] = None) -> List[Biomarker]: """ Detect disease-associated biomarkers from variant annotations when available. Falls back to raw sequence pattern matching only when annotations are not provided. Args: sequence: DNA sequence string variants: Optional list of detected/annotated variants Returns: List of detected biomarkers """ if variants is not None: return self._detect_biomarkers_from_variants(variants) return self._detect_biomarkers_from_sequence(sequence) def _detect_biomarkers_from_variants(self, variants: List[Variant]) -> List[Biomarker]: """Detect biomarkers by matching variant gene annotations against biomarker aliases.""" detected_biomarkers = [] seen_biomarkers: Set[str] = set() variant_gene_symbols = self._extract_variant_gene_symbols(variants) for biomarker_name, biomarker_info in self.BIOMARKERS.items(): if biomarker_name in seen_biomarkers: continue biomarker_aliases = self._build_biomarker_aliases(biomarker_name, biomarker_info) if not variant_gene_symbols.intersection(biomarker_aliases): continue matched_gene_symbol = None matched_variant = None for variant in variants: gene_symbol = self._normalize_symbol(self._get_variant_gene_symbol(variant)) if not gene_symbol: continue if gene_symbol in biomarker_aliases: matched_gene_symbol = self._get_variant_gene_symbol(variant).upper() matched_variant = variant break if not matched_gene_symbol: continue position = self._get_variant_position(matched_variant) match_strength = self._calculate_biomarker_strength( biomarker_name=biomarker_name, pattern=matched_gene_symbol, biomarker_info=biomarker_info, position=position, ) biomarker = Biomarker( name=biomarker_name, biomarker_type=biomarker_info['type'], location=f"Gene annotation: {matched_gene_symbol}", sequence_pattern=matched_gene_symbol, position=position, length=len(matched_gene_symbol), match_strength=match_strength, associated_diseases=biomarker_info['diseases'], clinical_significance=biomarker_info['significance'] ) detected_biomarkers.append(biomarker) seen_biomarkers.add(biomarker_name) return detected_biomarkers def _detect_biomarkers_from_sequence(self, sequence: str) -> List[Biomarker]: """Fallback sequence-based biomarker detection.""" detected_biomarkers = [] sequence_upper = sequence.upper() for biomarker_name, biomarker_info in self.BIOMARKERS.items(): canonical_patterns = ( [biomarker_name] if len(self._normalize_symbol(biomarker_name)) >= 3 else [] ) patterns = _dedupe_preserve_order(canonical_patterns + list(biomarker_info['patterns'])) for pattern in patterns: pattern_upper = pattern.upper() # Look for pattern in sequence if pattern_upper in sequence_upper: position = sequence_upper.find(pattern_upper) match_strength = self._calculate_biomarker_strength( biomarker_name=biomarker_name, pattern=pattern_upper, biomarker_info=biomarker_info, position=position, ) biomarker = Biomarker( name=biomarker_name, biomarker_type=biomarker_info['type'], location=f"Position {position}", sequence_pattern=pattern_upper, position=position, length=len(pattern_upper), match_strength=match_strength, associated_diseases=biomarker_info['diseases'], clinical_significance=biomarker_info['significance'] ) detected_biomarkers.append(biomarker) break # Count biomarker once per sequence return detected_biomarkers def _extract_variant_gene_symbols(self, variants: List[Variant]) -> Set[str]: """Collect normalized gene symbols from variant annotations.""" symbols: Set[str] = set() for variant in variants: gene_symbol = self._normalize_symbol(self._get_variant_gene_symbol(variant)) if gene_symbol: symbols.add(gene_symbol) return symbols def _get_variant_gene_symbol(self, variant: Variant) -> str: """Read gene symbol from Variant-like records.""" if isinstance(variant, dict): value = variant.get('gene', '') else: value = getattr(variant, 'gene', '') return str(value).strip() def _get_variant_position(self, variant: Optional[Variant]) -> int: """Read variant position from Variant-like records.""" if variant is None: return 0 raw_value: object if isinstance(variant, dict): raw_value = variant.get('position', 0) else: raw_value = getattr(variant, 'position', 0) try: return int(raw_value) except (TypeError, ValueError): return 0 def _normalize_symbol(self, symbol: str) -> str: """Normalize gene/biomarker symbols for robust matching.""" return re.sub(r'[^A-Z0-9]+', '', symbol.upper()) def _build_biomarker_aliases(self, biomarker_name: str, biomarker_info: Dict) -> Set[str]: """Build normalized aliases from biomarker name and configured patterns.""" aliases: Set[str] = set() source_terms = [biomarker_name] + list(biomarker_info.get('patterns', [])) stopwords = {'AMPLIFICATION', 'OVEREXPRESSION', 'FUSION', 'RECEPTOR', 'INSTABILITY'} for term in source_terms: term_upper = str(term).upper() normalized_term = self._normalize_symbol(term_upper) if normalized_term: aliases.add(normalized_term) for token in re.split(r'[^A-Z0-9]+', term_upper): if not token: continue if token in stopwords: continue if token.isalpha() and len(token) > 6: continue normalized_token = self._normalize_symbol(token) if normalized_token: aliases.add(normalized_token) return aliases def _calculate_biomarker_strength( self, biomarker_name: str, pattern: str, biomarker_info: Dict, position: int, ) -> float: """Derive a deterministic biomarker strength from pattern specificity.""" significance = biomarker_info.get('significance', '') significance_bonus = { 'Therapeutic Target': 0.2, 'Treatment Indicator': 0.16, 'Prognostic Marker': 0.1, 'Diagnostic Marker': 0.08, }.get(significance, 0.05) biomarker_upper = biomarker_name.upper() pattern_bonus = 0.16 if biomarker_upper in pattern or pattern in biomarker_upper else 0.08 complexity_bonus = 0.08 if any(token in pattern for token in ('_', '-', '+')) or len(pattern) > 6 else 0.03 position_bonus = min((position % 13) * 0.004, 0.04) strength = 0.4 + significance_bonus + pattern_bonus + complexity_bonus + position_bonus return round(_clamp(strength, 0.35, 0.98), 4) class DiseaseAssociationMapper: """Maps detected variants and biomarkers to disease associations""" # Disease-variant/biomarker associations DISEASE_ASSOCIATIONS = { 'Breast Cancer': { 'variants': ['BRCA1', 'BRCA2', 'TP53'], 'biomarkers': ['HER2', 'ER', 'PR'], 'inheritance': 'Autosomal Dominant (hereditary)', 'prevalence': 0.121, # ~12% lifetime risk 'baseline_risk': 12.0, 'supported_sexes': ['female'], 'min_age': 18, 'max_age': None, 'strict_rule': True, 'male_exception_policy': 'strong_brca_only', 'clinical_actionability': 0.86, 'severity_weight': 0.93, }, 'Ovarian Cancer': { 'variants': ['BRCA1', 'BRCA2'], 'biomarkers': ['HER2'], 'inheritance': 'Autosomal Dominant', 'prevalence': 0.014, 'baseline_risk': 1.4, 'supported_sexes': ['female'], 'min_age': 18, 'max_age': None, 'strict_rule': True, 'male_exception_policy': None, 'clinical_actionability': 0.79, 'severity_weight': 0.91, }, 'Colorectal Cancer': { 'variants': ['TP53', 'KRAS'], 'biomarkers': ['KRAS', 'MSI'], 'inheritance': 'Complex', 'prevalence': 0.046, 'baseline_risk': 4.6, 'supported_sexes': ['all'], 'min_age': 18, 'max_age': None, 'strict_rule': False, 'male_exception_policy': None, 'clinical_actionability': 0.8, 'severity_weight': 0.85, }, 'Lung Cancer': { 'variants': ['TP53'], 'biomarkers': ['EGFR', 'KRAS', 'PD-L1'], 'inheritance': 'Complex (mostly sporadic)', 'prevalence': 0.065, 'baseline_risk': 6.5, 'supported_sexes': ['all'], 'min_age': 18, 'max_age': None, 'strict_rule': False, 'male_exception_policy': None, 'clinical_actionability': 0.58, 'severity_weight': 0.9, }, "Alzheimer's Disease": { 'variants': ['APOE'], 'biomarkers': [], 'inheritance': 'Complex (late-onset)', 'prevalence': 0.065, 'baseline_risk': 6.5, 'supported_sexes': ['all'], 'min_age': 40, 'max_age': None, 'strict_rule': False, 'male_exception_policy': None, 'clinical_actionability': 0.42, 'severity_weight': 0.88, }, 'Type 2 Diabetes': { 'variants': ['FTO', 'TCF7L2', 'MTHFR'], 'biomarkers': [], 'inheritance': 'Complex (multifactorial)', 'prevalence': 0.097, 'baseline_risk': 9.7, 'supported_sexes': ['all'], 'min_age': 18, 'max_age': None, 'strict_rule': False, 'male_exception_policy': None, 'clinical_actionability': 0.74, 'severity_weight': 0.7, }, 'Cystic Fibrosis': { 'variants': ['CFTR'], 'biomarkers': [], 'inheritance': 'Autosomal Recessive', 'prevalence': 0.0003, 'baseline_risk': 0.03, 'supported_sexes': ['all'], 'min_age': 0, 'max_age': None, 'strict_rule': False, 'male_exception_policy': None, 'clinical_actionability': 0.61, 'severity_weight': 0.94, }, 'Hemochromatosis': { 'variants': ['HFE'], 'biomarkers': [], 'inheritance': 'Autosomal Recessive', 'prevalence': 0.001, 'baseline_risk': 0.1, 'supported_sexes': ['all'], 'min_age': 18, 'max_age': None, 'strict_rule': False, 'male_exception_policy': None, 'clinical_actionability': 0.73, 'severity_weight': 0.68, }, 'Prostate Cancer': { 'variants': ['BRCA1', 'BRCA2', 'TP53'], 'biomarkers': ['TMPRSS2-ERG'], 'inheritance': 'Complex', 'prevalence': 0.121, 'baseline_risk': 12.1, 'supported_sexes': ['male'], 'min_age': 18, 'max_age': None, 'strict_rule': True, 'male_exception_policy': None, 'clinical_actionability': 0.82, 'severity_weight': 0.84, }, 'Melanoma': { 'variants': ['TP53'], 'biomarkers': ['BRAF', 'PD-L1'], 'inheritance': 'Complex', 'prevalence': 0.024, 'baseline_risk': 2.4, 'supported_sexes': ['all'], 'min_age': 18, 'max_age': None, 'strict_rule': False, 'male_exception_policy': None, 'clinical_actionability': 0.55, 'severity_weight': 0.82, }, 'Chronic Myeloid Leukemia': { 'variants': [], 'biomarkers': ['ABL1'], 'inheritance': 'Somatic (acquired)', 'prevalence': 0.0002, 'baseline_risk': 0.02, 'supported_sexes': ['all'], 'min_age': 18, 'max_age': None, 'strict_rule': False, 'male_exception_policy': None, 'clinical_actionability': 0.64, 'severity_weight': 0.81, } } def __init__(self): self.last_suppressed_associations: List[Dict] = [] def is_eligible( self, user_metadata: Optional[Dict], disease_name: str, evidence_context: Optional[Dict] = None, ) -> Dict[str, object]: """Determine if a disease association is demographically eligible.""" disease_info = self.DISEASE_ASSOCIATIONS.get(disease_name, {}) supported_sexes = disease_info.get('supported_sexes', ['all']) strict_rule = bool(disease_info.get('strict_rule', False)) male_exception_policy = disease_info.get('male_exception_policy') age = None if not user_metadata else user_metadata.get('age') user_sex = _normalize_sex(None if not user_metadata else user_metadata.get('gender')) min_age = disease_info.get('min_age') max_age = disease_info.get('max_age') if min_age is not None and age is not None and age < min_age: return { 'eligible': False, 'reason': f"Age {age} is below the supported range for {disease_name}.", 'directive': 'suppress', } if max_age is not None and age is not None and age > max_age: return { 'eligible': False, 'reason': f"Age {age} is above the supported range for {disease_name}.", 'directive': 'suppress', } if 'all' in supported_sexes: return { 'eligible': True, 'reason': f"{disease_name} is not sex-restricted in this research model.", 'directive': 'none', } if user_sex in supported_sexes: return { 'eligible': True, 'reason': f"User metadata is compatible with {disease_name}.", 'directive': 'none', } if disease_name == 'Breast Cancer' and user_sex == 'male' and male_exception_policy == 'strong_brca_only': if self._passes_male_breast_exception(evidence_context or {}): return { 'eligible': True, 'reason': 'Male breast-cancer exception passed because strong BRCA evidence is present.', 'directive': 'none', } return { 'eligible': False, 'reason': 'Male breast-cancer signals are suppressed unless strong BRCA evidence clears the exception policy.', 'directive': 'suppress', } if strict_rule: supported = ", ".join(sex.title() for sex in supported_sexes) return { 'eligible': False, 'reason': f"{disease_name} is restricted to {supported} users in this research model.", 'directive': 'suppress', } return { 'eligible': True, 'reason': f"Demographic metadata for {disease_name} is incomplete; applying a cautionary penalty.", 'directive': 'penalize', } def _passes_male_breast_exception(self, evidence_context: Dict) -> bool: """Allow rare male breast-cancer prioritization only for strong BRCA evidence.""" for variant in evidence_context.get('variants', []): if variant.gene not in {'BRCA1', 'BRCA2'}: continue signal_strength = self._variant_signal_strength(variant) confidence = self._variant_confidence_score(variant) pathogenicity = float(variant.pathogenicity_score or 0.0) if signal_strength >= 0.82 and max(confidence, pathogenicity) >= 0.88: return True return False def map_disease_associations(self, variants: List[Variant], biomarkers: List[Biomarker], user_metadata: Optional[Dict] = None) -> List[DiseaseAssociation]: """ Map detected variants and biomarkers to disease associations. Args: variants: List of detected variants biomarkers: List of detected biomarkers user_metadata: Optional user data Returns: List of disease associations sorted by risk """ associations = [] self.last_suppressed_associations = [] for disease, disease_info in self.DISEASE_ASSOCIATIONS.items(): # Check for variant matches matching_variants = self._dedupe_matching_variants( [v for v in variants if v.gene in disease_info['variants']] ) matching_biomarkers = self._dedupe_matching_biomarkers( [b for b in biomarkers if b.name in disease_info['biomarkers']] ) if not matching_variants and not matching_biomarkers: continue # Skip diseases with no matches evidence_context = { 'variants': matching_variants, 'biomarkers': matching_biomarkers, } eligibility = self.is_eligible(user_metadata, disease, evidence_context) if not eligibility['eligible']: self.last_suppressed_associations.append( { 'disease': disease, 'reason': eligibility['reason'], 'directive': eligibility['directive'], 'variants': len(matching_variants), 'biomarkers': len(matching_biomarkers), 'risk_label': f"Suppressed research signal for {disease}", } ) continue scoring = self._score_disease_association( disease=disease, disease_info=disease_info, matching_variants=matching_variants, matching_biomarkers=matching_biomarkers, directive=str(eligibility['directive']), ) association = DiseaseAssociation( disease=disease, risk_score=scoring['risk_score'], confidence=scoring['confidence'], detected_variants=matching_variants, detected_biomarkers=matching_biomarkers, inheritance_pattern=disease_info['inheritance'], prevalence=disease_info['prevalence'], evidence_strength=scoring['evidence_strength'], clinical_actionability=scoring['clinical_actionability'], risk_increase=scoring['risk_increase'], confidence_score=scoring['confidence_score'], severity_weight=scoring['severity_weight'], priority_score=scoring['priority_score'], priority_category=scoring['priority_category'], eligibility_reason=str(eligibility['reason']), uncertainty_messages=scoring['uncertainty_messages'], replicated_signal=scoring['replicated_signal'], ) associations.append(association) # Sort deterministically by priority then risk. associations.sort( key=lambda x: ( x.priority_score, x.risk_score, x.confidence_score, x.disease, ), reverse=True, ) return associations def _score_disease_association( self, disease: str, disease_info: Dict, matching_variants: List[Variant], matching_biomarkers: List[Biomarker], directive: str, ) -> Dict[str, object]: """Score an eligible association using deterministic, research-oriented heuristics.""" base_risk = float(disease_info['baseline_risk']) variant_genes = {variant.gene.upper() for variant in matching_variants} independent_biomarkers = [ biomarker for biomarker in matching_biomarkers if biomarker.name.upper() not in variant_genes and biomarker.sequence_pattern.upper() not in variant_genes ] variant_strengths = [self._variant_signal_strength(variant) for variant in matching_variants] biomarker_strengths = [ self._biomarker_signal_strength(biomarker) for biomarker in independent_biomarkers ] variant_confidences = [self._variant_confidence_score(variant) for variant in matching_variants] biomarker_confidences = [ round(biomarker.match_strength, 4) for biomarker in independent_biomarkers ] all_strengths = variant_strengths + biomarker_strengths all_confidences = variant_confidences + biomarker_confidences num_matches = len(all_strengths) variant_contributions = sorted( ( self._variant_risk_contribution(variant, strength) for variant, strength in zip(matching_variants, variant_strengths, strict=True) ), reverse=True, ) variant_risk = sum(variant_contributions[:2]) biomarker_risk = sum(sorted((strength * 16.0 for strength in biomarker_strengths), reverse=True)[:1]) synergy_bonus = 6.0 if matching_variants and independent_biomarkers else 0.0 total_risk = min(base_risk + variant_risk + biomarker_risk + synergy_bonus, 100.0) risk_increase = 0.0 if total_risk <= base_risk else (total_risk - base_risk) / max(100.0 - base_risk, 1.0) evidence_strength = round(sum(all_strengths) / num_matches, 4) if num_matches else 0.0 confidence_score = round(sum(all_confidences) / num_matches, 4) if num_matches else 0.0 replicated_signal = ( num_matches >= 2 or (matching_variants and matching_biomarkers) or sum(1 for strength in all_strengths if strength >= 0.72) >= 2 ) clinical_actionability = float(disease_info.get('clinical_actionability', 0.5)) severity_weight = float(disease_info.get('severity_weight', 0.5)) confidence = self._confidence_level_for_score(confidence_score, num_matches, evidence_strength) low_confidence = confidence_score < 0.55 or confidence in {ConfidenceLevel.LOW, ConfidenceLevel.VERY_LOW} weak_replication = not replicated_signal weak_evidence = evidence_strength < 0.45 priority_score = ( (evidence_strength * 0.30) + (clinical_actionability * 0.25) + (risk_increase * 0.20) + (confidence_score * 0.15) + (severity_weight * 0.10) ) uncertainty_messages: List[str] = [] if low_confidence: priority_score *= 0.3 uncertainty_messages.append( "Confidence is below the preferred threshold, so this signal should be treated cautiously." ) if weak_replication: priority_score *= 0.5 uncertainty_messages.append( "Evidence is not replicated across multiple independent signals." ) if weak_evidence: priority_score *= 0.65 uncertainty_messages.append( "Evidence strength is modest and may reflect an exploratory association." ) if directive == 'penalize': priority_score *= 0.7 uncertainty_messages.append( "Demographic metadata is incomplete, so a cautionary penalty was applied." ) priority_score = round(_clamp(priority_score), 4) priority_category = self._categorize_priority( priority_score=priority_score, evidence_strength=evidence_strength, clinical_actionability=clinical_actionability, risk_increase=risk_increase, confidence_score=confidence_score, ) return { 'risk_score': round(min(total_risk, 100.0), 4), 'confidence': confidence, 'evidence_strength': evidence_strength, 'clinical_actionability': round(clinical_actionability, 4), 'risk_increase': round(risk_increase, 4), 'confidence_score': confidence_score, 'severity_weight': round(severity_weight, 4), 'priority_score': priority_score, 'priority_category': priority_category, 'uncertainty_messages': uncertainty_messages, 'replicated_signal': replicated_signal, } @staticmethod def _dedupe_matching_variants(variants: List[Variant]) -> List[Variant]: unique: Dict[Tuple[str, str, int, str, str], Variant] = {} for variant in variants: key = ( variant.gene.upper(), variant.variant_id.upper(), int(variant.position or 0), variant.reference.upper(), variant.alternate.upper(), ) existing = unique.get(key) if existing is None or float(variant.confidence) > float(existing.confidence): unique[key] = variant return list(unique.values()) @staticmethod def _dedupe_matching_biomarkers(biomarkers: List[Biomarker]) -> List[Biomarker]: unique: Dict[str, Biomarker] = {} for biomarker in biomarkers: key = biomarker.name.upper() existing = unique.get(key) if existing is None or biomarker.match_strength > existing.match_strength: unique[key] = biomarker return list(unique.values()) def _variant_signal_strength(self, variant: Variant) -> float: """Estimate variant evidence strength from consequence class and annotation confidence.""" confidence_component = self._variant_confidence_score(variant) pathogenicity_component = float(variant.pathogenicity_score or 0.0) annotation_bonus = { 1: 0.12, 2: 0.07, 3: 0.02, }.get(variant.pathogenicity_tier, 0.0) type_bonus = { 'Frameshift': 0.18, 'Nonsense': 0.16, 'Missense': 0.11, 'Synonymous': -0.08, 'SNP': 0.07, }.get(variant.type, 0.04) strength = max(confidence_component, pathogenicity_component * 0.95) strength += annotation_bonus + type_bonus return round(_clamp(strength, 0.1, 1.0), 4) def _variant_confidence_score(self, variant: Variant) -> float: """Convert variant evidence fields into a bounded confidence score.""" base = float(variant.confidence) pathogenicity = float(variant.pathogenicity_score or 0.0) tier_bonus = { 1: 0.12, 2: 0.07, 3: 0.02, }.get(variant.pathogenicity_tier, 0.0) model_bonus = { 'very high': 0.14, 'high': 0.1, 'medium': 0.04, 'moderate': 0.04, 'low': -0.06, }.get(variant.model_confidence.strip().lower(), 0.0) combined = max(base, pathogenicity) + tier_bonus + model_bonus return round(_clamp(combined, 0.05, 1.0), 4) def _variant_risk_contribution(self, variant: Variant, strength: float) -> float: """Turn variant evidence into a deterministic risk contribution.""" penetrance = float(MutationAnalyzer.DISEASE_GENES.get(variant.gene, {}).get('penetrance', 0.35)) return (strength * 18.0) + (penetrance * 16.0) def _biomarker_signal_strength(self, biomarker: Biomarker) -> float: """Estimate biomarker evidence strength from match quality and significance.""" significance_bonus = { 'Therapeutic Target': 0.08, 'Treatment Indicator': 0.06, 'Prognostic Marker': 0.04, 'Diagnostic Marker': 0.03, }.get(biomarker.clinical_significance, 0.01) strength = biomarker.match_strength + significance_bonus return round(_clamp(strength, 0.05, 1.0), 4) def _confidence_level_for_score( self, confidence_score: float, num_matches: int, evidence_strength: float, ) -> ConfidenceLevel: """Map a numeric confidence estimate to the response enum.""" if confidence_score >= 0.84 and evidence_strength >= 0.7 and num_matches >= 2: return ConfidenceLevel.VERY_HIGH if confidence_score >= 0.72 and evidence_strength >= 0.58: return ConfidenceLevel.HIGH if confidence_score >= 0.55: return ConfidenceLevel.MODERATE if confidence_score >= 0.35: return ConfidenceLevel.LOW return ConfidenceLevel.VERY_LOW def _categorize_priority( self, priority_score: float, evidence_strength: float, clinical_actionability: float, risk_increase: float, confidence_score: float, ) -> str: """Assign the research-oriented priority category.""" if ( priority_score >= 0.72 and evidence_strength >= 0.65 and clinical_actionability >= 0.65 and confidence_score >= 0.6 and risk_increase >= 0.35 ): return 'High Priority' if priority_score >= 0.5 and evidence_strength >= 0.45: return 'Moderate Priority' if priority_score >= 0.3: return 'Informational' return 'Research Signal' class PersonalizedRecommendationEngine: """Generates personalized recommendations based on genomic profile""" # Therapeutic recommendations database THERAPEUTIC_DATABASE = { 'Breast Cancer': { 'first_line': [ {'drug': 'Tamoxifen', 'indication': 'ER+ tumors', 'biomarker': 'ER+', 'notes': 'Monitor for side effects'}, {'drug': 'Aromatase Inhibitors (AI)', 'indication': 'Postmenopausal ER+ tumors', 'biomarker': 'ER+', 'notes': 'Bone health monitoring'}, ], 'targeted': [ {'drug': 'Trastuzumab (Herceptin)', 'indication': 'HER2+ tumors', 'biomarker': 'HER2+', 'notes': 'Requires cardiac monitoring'}, {'drug': 'Pertuzumab', 'indication': 'HER2+ advanced disease', 'biomarker': 'HER2+', 'notes': 'Used with Trastuzumab'}, {'drug': 'PARP Inhibitors (Olaparib)', 'indication': 'BRCA1/2 mutations', 'biomarker': 'BRCA+', 'notes': 'Maintenance therapy'}, ], 'lifestyle': ['Regular exercise', 'Mediterranean diet', 'Stress management', 'Weight management'], 'monitoring': ['Regular mammography', 'Clinical breast exams', 'Tumor markers'] }, "Alzheimer's Disease": { 'first_line': [ {'drug': 'Donepezil', 'indication': 'Mild to moderate AD', 'biomarker': 'APOE-ε4', 'notes': 'Cholinesterase inhibitor'}, {'drug': 'Memantine', 'indication': 'Moderate to severe AD', 'biomarker': 'General', 'notes': 'NMDA antagonist'}, ], 'targeted': [ {'drug': 'Lecanemab', 'indication': 'Early cognitive decline', 'biomarker': 'Amyloid-β+', 'notes': 'Anti-amyloid monoclonal'}, ], 'lifestyle': ['Cognitive training', 'Mediterranean diet', 'Physical activity', 'Social engagement'], 'monitoring': ['Cognitive testing', 'MRI surveillance', 'Caregiver support'] }, 'Type 2 Diabetes': { 'first_line': [ {'drug': 'Metformin', 'indication': 'First-line agent', 'biomarker': 'General', 'notes': 'Gastrointestinal side effects'}, {'drug': 'Lifestyle modification', 'indication': 'Diet and exercise', 'biomarker': 'General', 'notes': 'Most important intervention'}, ], 'targeted': [ {'drug': 'GLP-1 Agonists', 'indication': 'Additional glucose control needed', 'biomarker': 'FTO+', 'notes': 'Weight loss benefit'}, {'drug': 'SGLT2 Inhibitors', 'indication': 'Cardiovascular/renal protection', 'biomarker': 'General', 'notes': 'Additional benefits beyond glucose'}, ], 'lifestyle': ['Low glycemic diet', 'Regular exercise (150 min/week)', 'Weight loss', 'Stress management'], 'monitoring': ['HbA1c testing', 'Lipid panel', 'Kidney function', 'Blood pressure'] }, 'Colorectal Cancer': { 'first_line': [ {'drug': '5-Fluorouracil (5-FU)', 'indication': 'Standard chemotherapy', 'biomarker': 'General', 'notes': 'Often combined with Leucovorin'}, ], 'targeted': [ {'drug': 'Cetuximab', 'indication': 'KRAS wild-type tumors', 'biomarker': 'KRAS-WT', 'notes': 'EGFR inhibitor'}, {'drug': 'Pembrolizumab', 'indication': 'MSI-H tumors', 'biomarker': 'MSI-H', 'notes': 'Checkpoint inhibitor'}, ], 'lifestyle': ['High-fiber diet', 'Regular exercise', 'Limited alcohol', 'No smoking'], 'monitoring': ['CEA tumor marker', 'Colonoscopy surveillance', 'Imaging studies'] }, 'Hemochromatosis': { 'first_line': [ {'drug': 'Phlebotomy', 'indication': 'Iron removal', 'biomarker': 'HFE+', 'notes': 'Induction phase: weekly'}, {'drug': 'Deferasirox', 'indication': 'Iron chelation if phlebotomy not tolerated', 'biomarker': 'HFE+', 'notes': 'Oral agent'}, ], 'targeted': [ {'drug': 'Dietary iron restriction', 'indication': 'Maintenance therapy', 'biomarker': 'HFE+', 'notes': 'Avoid iron supplements'}, ], 'lifestyle': ['Low iron diet', 'Avoid alcohol', 'Avoid raw shellfish', 'Regular monitoring'], 'monitoring': ['Serum ferritin', 'Transferrin saturation', 'Liver function', 'Cardiac assessment'] } } # Pharmacogenomic guidance PHARMACOGENOMIC_GUIDANCE = { 'CYP2D6': { 'enzyme': 'Cytochrome P450 2D6', 'substrates': ['Codeine', 'Tramadol', 'Tamoxifen', 'Fluoxetine', 'Risperidone'], 'phenotypes': { 'Ultra-rapid metabolizer': {'action': 'May require higher doses or alternative drugs', 'risk': 'Therapeutic failure'}, 'Rapid metabolizer': {'action': 'Standard dosing usually appropriate', 'risk': 'Slight therapeutic benefit reduction'}, 'Normal metabolizer': {'action': 'Standard dosing', 'risk': 'No special concerns'}, 'Intermediate metabolizer': {'action': 'Monitor closely; may need dose adjustment', 'risk': 'Reduced efficacy or increased side effects'}, 'Poor metabolizer': {'action': 'Use alternative drug or significantly reduce dose', 'risk': 'Severe side effects'}, } }, 'CYP2C19': { 'enzyme': 'Cytochrome P450 2C19', 'substrates': ['Clopidogrel', 'Omeprazole', 'Escitalopram', 'Pantoprazole', 'Voriconazole'], 'phenotypes': { 'Rapid metabolizer': {'action': 'Higher doses needed for therapeutic effect', 'risk': 'Reduced efficacy'}, 'Normal metabolizer': {'action': 'Standard dosing', 'risk': 'No special concerns'}, 'Intermediate metabolizer': {'action': 'May need dose adjustment', 'risk': 'Monitor for efficacy'}, 'Poor metabolizer': {'action': 'Use alternative or reduce dose significantly', 'risk': 'Increased side effects'}, } }, 'TPMT': { 'enzyme': 'Thiopurine S-methyltransferase', 'substrates': ['Azathioprine', '6-Mercaptopurine', '6-Thioguanine'], 'phenotypes': { 'High activity': {'action': 'Standard dosing', 'risk': 'No special concerns'}, 'Intermediate activity': {'action': 'Reduce dose by 30-50%', 'risk': 'Bone marrow suppression risk'}, 'Low activity': {'action': 'Consider alternative; if used, significantly reduce dose', 'risk': 'Severe toxicity'}, } }, 'VKORC1': { 'enzyme': 'Vitamin K Epoxide Reductase', 'substrates': ['Warfarin'], 'phenotypes': { 'High activity': {'action': 'Higher warfarin doses usually needed', 'risk': 'Subtherapeutic INR'}, 'Normal activity': {'action': 'Standard dosing', 'risk': 'No special concerns'}, 'Low activity': {'action': 'Lower warfarin doses required', 'risk': 'Bleeding risk'}, } } } PRIORITY_WEIGHTS = { 'High Priority': 1.0, 'Moderate Priority': 0.82, 'Informational': 0.62, 'Research Signal': 0.42, } def generate_recommendations( self, associations: List[DiseaseAssociation], user_metadata: Dict, suppressed_associations: Optional[List[Dict]] = None, variant_evidence: Optional[List[Any]] = None, ) -> Dict: """ Generate personalized recommendations based on disease associations. Args: associations: List of disease associations user_metadata: User demographics and clinical data Returns: Dictionary with comprehensive recommendations """ recommendations = { 'high_priority': [], 'moderate_priority': [], 'informational': [], 'research_signal': [], 'lifestyle': [], 'monitoring': [], 'pharmacogenomics': [], 'disclaimers': [], 'suppressed_insights': list(suppressed_associations or []), 'risk_stratification': {}, } # Add standard disclaimer recommendations['disclaimers'].append( "These outputs describe research-based genetic predisposition signals only. They are not diagnoses and should not be used for patient-care decisions." ) recommendations['disclaimers'].append( "Eligibility rules suppress biologically implausible signals before prioritization, but all remaining insights still require external validation." ) grouped_keys = { 'High Priority': 'high_priority', 'Moderate Priority': 'moderate_priority', 'Informational': 'informational', 'Research Signal': 'research_signal', } user_factors = self._evaluate_user_factors(user_metadata, associations) lifestyle_candidates: List[Dict[str, object]] = [] monitoring_candidates: List[Dict[str, object]] = [] for assoc in sorted( associations, key=lambda item: (item.priority_score, item.risk_score, item.disease), reverse=True, ): insight = self._build_recommendation_entry(assoc) recommendations[grouped_keys[assoc.priority_category]].append(insight) therapy_info = self.THERAPEUTIC_DATABASE.get(assoc.disease, {}) for lifestyle in therapy_info.get('lifestyle', []): lifestyle_candidates.append( { 'disease': assoc.disease, 'priority_category': assoc.priority_category, 'priority_score': assoc.priority_score, 'risk_score': assoc.risk_score, 'evidence_strength': assoc.evidence_strength, 'text': lifestyle, } ) for monitoring in therapy_info.get('monitoring', []): monitoring_candidates.append( { 'disease': assoc.disease, 'priority_category': assoc.priority_category, 'priority_score': assoc.priority_score, 'risk_score': assoc.risk_score, 'evidence_strength': assoc.evidence_strength, 'text': monitoring, } ) recommendations['lifestyle'] = self._build_lifestyle_recommendations( lifestyle_candidates, user_factors ) recommendations['monitoring'] = self._build_monitoring_recommendations( monitoring_candidates, user_factors ) recommendations['risk_stratification'] = { 'score': user_factors['risk_score'], 'bmi': user_factors['bmi'], 'risk_factors': user_factors['risk_factors'], 'detected_associations': user_factors['detected_associations'], 'has_elevated_genomic_risk': user_factors['has_elevated_genomic_risk'], } # Add pharmacogenomic guidance recommendations['pharmacogenomics'] = self._get_pharmacogenomic_guidance( user_metadata, variant_evidence=[] if variant_evidence is None else variant_evidence, ) return recommendations def _evaluate_user_factors( self, user_metadata: Optional[Dict], associations: List[DiseaseAssociation], ) -> Dict[str, object]: age = None if user_metadata: raw_age = user_metadata.get('age') if raw_age is None: age = None else: try: age = int(raw_age) except (ValueError, TypeError): age = None sex = _normalize_sex(None if not user_metadata else user_metadata.get('gender')) weight_kg = _to_float(None if not user_metadata else user_metadata.get('weight')) height_m = _extract_height_meters(user_metadata) bmi = _calculate_bmi(weight_kg, height_m) risk_factors: List[str] = [] risk_score = 0.0 if bmi is not None: if bmi > 30.0: risk_score += 2.0 risk_factors.append('obesity') elif bmi > 25.0: risk_score += 1.5 risk_factors.append('overweight') elif bmi < 18.5: risk_score += 1.5 risk_factors.append('underweight') if age is not None: if age > 65: risk_score += 2.0 risk_factors.append('advanced_age') elif age > 50: risk_score += 1.5 risk_factors.append('older_age') elif age >= 40: risk_score += 1.0 risk_factors.append('midlife') detected_associations = [assoc.disease for assoc in associations] has_elevated_genomic_risk = any( assoc.priority_category in {'High Priority', 'Moderate Priority'} or assoc.risk_score >= 25.0 for assoc in associations ) if has_elevated_genomic_risk: risk_score += 2.0 risk_factors.append('elevated_genomic_risk') return { 'age': age, 'sex': sex, 'weight_kg': weight_kg, 'height_m': height_m, 'bmi': round(bmi, 2) if bmi is not None else None, 'risk_score': round(risk_score, 2), 'risk_factors': _dedupe_preserve_order(risk_factors), 'detected_associations': detected_associations, 'has_elevated_genomic_risk': has_elevated_genomic_risk, } def _association_relevance_score(self, candidate: Dict[str, object]) -> float: priority_category = str(candidate.get('priority_category', 'Research Signal')) priority_weight = self.PRIORITY_WEIGHTS.get(priority_category, self.PRIORITY_WEIGHTS['Research Signal']) priority_score = float(candidate.get('priority_score', 0.0) or 0.0) risk_score = float(candidate.get('risk_score', 0.0) or 0.0) evidence_strength = float(candidate.get('evidence_strength', 0.0) or 0.0) return (priority_weight * 40.0) + (priority_score * 35.0) + (risk_score * 0.45) + (evidence_strength * 20.0) def _format_weight_reduction_target(self, user_factors: Dict[str, object]) -> Optional[str]: current_weight = _to_float(user_factors.get('weight_kg')) if current_weight is None or current_weight <= 0: return None lower_target = current_weight * 0.9 upper_target = current_weight * 0.95 return ( f"Target 5-10% weight reduction (from current {current_weight:.1f}kg " f"to {lower_target:.1f}-{upper_target:.1f}kg range)." ) def _build_lifestyle_recommendations( self, candidates: List[Dict[str, object]], user_factors: Dict[str, object], ) -> List[str]: recommendations: List[str] = [] bmi = _to_float(user_factors.get('bmi')) weight_target = self._format_weight_reduction_target(user_factors) for candidate in candidates: if self._association_relevance_score(candidate) < 45.0: continue text = str(candidate.get('text', '')).strip() lower_text = text.lower() if 'weight loss' in lower_text or 'weight management' in lower_text: if bmi is None: continue if bmi > 25.0 and weight_target: recommendations.append(weight_target) continue recommendations.append(text) if bmi is not None and bmi < 18.5: recommendations.append( "Increase caloric intake by ~300-500 kcal/day with nutrient-dense foods until BMI reaches at least 18.5." ) return _dedupe_preserve_order(recommendations) def _build_monitoring_recommendations( self, candidates: List[Dict[str, object]], user_factors: Dict[str, object], ) -> List[str]: recommendations: List[str] = [] age = user_factors.get('age') sex = user_factors.get('sex') has_elevated_genomic_risk = bool(user_factors.get('has_elevated_genomic_risk')) for candidate in candidates: if self._association_relevance_score(candidate) < 42.0: continue recommendations.append(str(candidate.get('text', '')).strip()) if isinstance(age, int): if age > 65: recommendations.append("Screening interval: every 6-12 months for elevated-risk domains.") elif age > 50: recommendations.append("Screening interval: every 12 months for elevated-risk domains.") elif age >= 40 and has_elevated_genomic_risk: recommendations.append("Screening interval: every 12-24 months for elevated-risk domains.") if sex == 'female' and isinstance(age, int) and age > 50 and has_elevated_genomic_risk: recommendations.append( "Postmenopausal screening: annual bone-density and cardiometabolic risk review." ) return _dedupe_preserve_order(recommendations) def _build_recommendation_entry(self, assoc: DiseaseAssociation) -> Dict: """Create research-oriented recommendation payloads while preserving legacy keys.""" therapy_info = self.THERAPEUTIC_DATABASE.get(assoc.disease, {}) evidence_bits = [variant.gene for variant in assoc.detected_variants] evidence_bits.extend(biomarker.name for biomarker in assoc.detected_biomarkers) evidence_text = ", ".join(_dedupe_preserve_order(evidence_bits)) if evidence_bits else "current sequence patterns" title = f"Elevated genetic predisposition for {assoc.disease}" if assoc.priority_category == 'Research Signal': title = f"Preliminary research signal related to {assoc.disease}" actions = _dedupe_preserve_order( therapy_info.get('monitoring', [])[:2] + therapy_info.get('lifestyle', [])[:2] ) uncertainty_message = " ".join(assoc.uncertainty_messages).strip() if not uncertainty_message and assoc.confidence_score < 0.6: uncertainty_message = ( "Confidence is below the preferred threshold, so this association should be treated as exploratory." ) action_summary = ( "Consider preventive risk review and confirmatory follow-up in an appropriate research or clinical setting." ) if actions: action_summary = f"Preventive follow-up could prioritize {actions[0].lower()}." return { 'disease': assoc.disease, 'title': title, 'summary': ( f"{title} supported by {evidence_text}. This is a predisposition signal rather than a diagnosis." ), 'category': 'Preventive Health Recommendation', 'priority_category': assoc.priority_category, 'priority_score': round(assoc.priority_score, 4), 'treatment': action_summary, 'indication': 'Research-oriented risk management follow-up', 'confidence': assoc.confidence.value, 'confidence_score': round(assoc.confidence_score, 4), 'risk_score': round(assoc.risk_score, 4), 'risk_increase': round(assoc.risk_increase, 4), 'notes': assoc.eligibility_reason, 'recommended_actions': actions, 'research_context': [entry['drug'] for entry in therapy_info.get('targeted', [])[:2]], 'uncertainty_message': uncertainty_message, 'evidence_strength': round(assoc.evidence_strength, 4), } def _get_pharmacogenomic_guidance( self, user_metadata: Dict, variant_evidence: Optional[List[Any]] = None, ) -> List[Dict]: """Get pharmacogenomic guidance from detected PGx variant evidence.""" guidance = [] age = user_metadata.get('age') if user_metadata else None sex = _normalize_sex(None if not user_metadata else user_metadata.get('gender')) demographic_context = "" if sex == 'female' and isinstance(age, int) and 18 <= age <= 50: demographic_context = ( "Demographic context: Female of childbearing age; consider pregnancy-safe alternatives and " "confirm pregnancy status before selecting metabolized therapies." ) elif sex in {'female', 'male'} and age is not None: demographic_context = f"Demographic context: {sex.title()}, age {age}." elif sex in {'female', 'male'}: demographic_context = f"Demographic context: {sex.title()}." metabolizer_phenotypes = self._infer_pharmacogenomic_phenotypes(variant_evidence) if variant_evidence is None: # Legacy direct-helper behavior for callers that have not supplied variant evidence. metabolizer_phenotypes = { 'CYP2D6': ('Intermediate metabolizer', "Default metabolizer phenotype; no variant evidence supplied."), 'CYP2C19': ('Normal metabolizer', "Default metabolizer phenotype; no variant evidence supplied."), 'TPMT': ('Normal metabolizer', "Default metabolizer phenotype; no variant evidence supplied."), } for gene, phenotype_payload in metabolizer_phenotypes.items(): if gene in self.PHARMACOGENOMIC_GUIDANCE: phenotype, evidence_summary = phenotype_payload gene_info = self.PHARMACOGENOMIC_GUIDANCE[gene] phenotype_info = gene_info['phenotypes'].get(phenotype, {}) action = phenotype_info.get('action', 'Monitor') if demographic_context: action = f"{action}. {demographic_context}" guidance.append({ 'gene': gene, 'enzyme': gene_info['enzyme'], 'phenotype': phenotype, 'affected_drugs': gene_info['substrates'], 'action': action, 'risk': phenotype_info.get('risk', 'None'), 'evidence': evidence_summary, }) return guidance def _infer_pharmacogenomic_phenotypes( self, variant_evidence: Optional[List[Any]], ) -> Dict[str, Tuple[str, str]]: """Infer one conservative PGx phenotype per supported gene.""" if variant_evidence is None: return {} phenotypes: Dict[str, Tuple[str, str, int]] = {} severity_rank = { 'Ultra-rapid metabolizer': 5, 'Rapid metabolizer': 4, 'Poor metabolizer': 5, 'Low activity': 5, 'Intermediate metabolizer': 3, 'Intermediate activity': 3, 'High activity': 2, 'Normal metabolizer': 1, 'Normal activity': 1, } for raw_variant in variant_evidence: row = self._variant_to_mapping(raw_variant) gene = str(row.get('gene', '')).strip().upper() if gene not in self.PHARMACOGENOMIC_GUIDANCE: continue phenotype = self._extract_pgx_phenotype(row, gene) if not phenotype: continue evidence_summary = self._format_pgx_evidence(row) rank = severity_rank.get(phenotype, 1) current = phenotypes.get(gene) if current is None or rank > current[2]: phenotypes[gene] = (phenotype, evidence_summary, rank) return {gene: (payload[0], payload[1]) for gene, payload in phenotypes.items()} def _variant_to_mapping(self, variant: Any) -> Dict[str, Any]: if isinstance(variant, dict): return dict(variant) if hasattr(variant, 'to_dict'): return variant.to_dict() return { 'gene': getattr(variant, 'gene', ''), 'variant_id': getattr(variant, 'variant_id', ''), 'type': getattr(variant, 'type', ''), 'description': getattr(variant, 'description', ''), 'reference': getattr(variant, 'reference', ''), 'alternate': getattr(variant, 'alternate', ''), 'confidence': getattr(variant, 'confidence', None), 'pathogenicity_score': getattr(variant, 'pathogenicity_score', None), 'evidence_summary': getattr(variant, 'evidence_summary', ''), } def _extract_pgx_phenotype(self, row: Dict[str, Any], gene: str) -> str: valid_phenotypes = set(self.PHARMACOGENOMIC_GUIDANCE[gene]['phenotypes']) for key in ( 'pgx_phenotype', 'metabolizer_phenotype', 'predicted_phenotype', 'diplotype_phenotype', 'activity_phenotype', 'phenotype', ): normalized = self._normalize_pgx_phenotype(row.get(key), valid_phenotypes) if normalized: return normalized activity_score = _to_float(row.get('activity_score')) if activity_score is None: activity_score = _to_float(row.get('pgx_activity_score')) if activity_score is not None: score_based = self._phenotype_from_activity_score(gene, activity_score) if score_based: return score_based functional_text = " ".join( str(row.get(key, '')) for key in ( 'function', 'functional_status', 'clinical_significance', 'clinvar_significance', 'pathogenicity', 'type', 'consequence', 'description', 'evidence_summary', ) ).lower() variant_text = " ".join( str(row.get(key, '')) for key in ('variant_id', 'variant_key', 'id', 'hgvsc', 'hgvsp', 'protein', 'diplotype') ).lower() combined_text = f"{variant_text} {functional_text}" return self._phenotype_from_variant_text(gene, combined_text, row) def _normalize_pgx_phenotype(self, value: Any, valid_phenotypes: Set[str]) -> str: if value is None: return "" normalized = re.sub(r'[_-]+', ' ', str(value).strip().lower()) normalized = re.sub(r'\s+', ' ', normalized) if not normalized: return "" aliases = { 'ultrarapid metabolizer': 'Ultra-rapid metabolizer', 'ultra rapid metabolizer': 'Ultra-rapid metabolizer', 'rapid metabolizer': 'Rapid metabolizer', 'normal metabolizer': 'Normal metabolizer', 'extensive metabolizer': 'Normal metabolizer', 'intermediate metabolizer': 'Intermediate metabolizer', 'poor metabolizer': 'Poor metabolizer', 'high activity': 'High activity', 'normal activity': 'Normal activity', 'intermediate activity': 'Intermediate activity', 'low activity': 'Low activity', } phenotype = aliases.get(normalized) if phenotype in valid_phenotypes: return phenotype for candidate in valid_phenotypes: if normalized == candidate.lower(): return candidate return "" def _phenotype_from_activity_score(self, gene: str, activity_score: float) -> str: if gene == 'CYP2D6': if activity_score <= 0: return 'Poor metabolizer' if activity_score < 1.25: return 'Intermediate metabolizer' if activity_score <= 2.25: return 'Normal metabolizer' return 'Ultra-rapid metabolizer' if gene == 'CYP2C19': if activity_score <= 0: return 'Poor metabolizer' if activity_score < 1.5: return 'Intermediate metabolizer' if activity_score <= 2.25: return 'Normal metabolizer' return 'Rapid metabolizer' if gene in {'TPMT', 'VKORC1'}: if activity_score <= 0.5: return 'Low activity' if activity_score < 1.5: return 'Intermediate activity' return 'Normal activity' return "" def _phenotype_from_variant_text(self, gene: str, combined_text: str, row: Dict[str, Any]) -> str: has_homozygous_hint = any(token in combined_text for token in ('hom ', 'homozyg', '1/1', 'biallelic')) has_low_function_hint = any( token in combined_text for token in ( 'poor metabolizer', 'low activity', 'no function', 'loss of function', 'loss-of-function', 'lof', 'decreased function', 'reduced function', 'pathogenic', ) ) has_normal_hint = any(token in combined_text for token in ('normal function', 'normal metabolizer', 'normal activity')) if gene == 'CYP2D6': if any(token in combined_text for token in ('duplication', 'copy number gain', 'x2', '*1xn', '*2xn')): return 'Ultra-rapid metabolizer' if any(token in combined_text for token in ('*3', '*4', '*5', '*6', 'gene deletion', 'whole gene deletion')): return 'Poor metabolizer' if has_homozygous_hint else 'Intermediate metabolizer' if any(token in combined_text for token in ('*10', '*17', '*29', '*41')) or has_low_function_hint: return 'Intermediate metabolizer' if has_normal_hint or '*1' in combined_text or '*2' in combined_text: return 'Normal metabolizer' if gene == 'CYP2C19': if '*17' in combined_text or 'increased function' in combined_text: return 'Rapid metabolizer' if any(token in combined_text for token in ('*2', '*3', '*4', '*5', '*6', '*7', '*8')) or has_low_function_hint: return 'Poor metabolizer' if has_homozygous_hint else 'Intermediate metabolizer' if has_normal_hint or '*1' in combined_text: return 'Normal metabolizer' if gene == 'TPMT': if any(token in combined_text for token in ('*2', '*3a', '*3b', '*3c')) or has_low_function_hint: return 'Low activity' if has_homozygous_hint else 'Intermediate activity' if has_normal_hint or '*1' in combined_text: return 'Normal activity' if gene == 'VKORC1': alt = str(row.get('alt', row.get('alternate', ''))).strip().upper() if 'rs9923231' in combined_text or '-1639' in combined_text: if has_homozygous_hint or alt == 'A': return 'Low activity' return 'Intermediate activity' if has_low_function_hint: return 'Low activity' if has_homozygous_hint else 'Intermediate activity' if has_normal_hint: return 'Normal activity' return "" def _format_pgx_evidence(self, row: Dict[str, Any]) -> str: gene = str(row.get('gene', '')).strip().upper() variant_id = str( row.get('variant_id') or row.get('variant_key') or row.get('id') or row.get('hgvsc') or row.get('hgvsp') or 'detected variant' ).strip() evidence_bits = [f"{gene} {variant_id}".strip()] explicit_summary = str(row.get('evidence_summary', '')).strip() if explicit_summary: evidence_bits.append(explicit_summary) return "; ".join(_dedupe_preserve_order(evidence_bits)) class GenomeAnalysisEngine: """ Comprehensive genome analysis engine combining all components. Provides sequence-driven mutation analysis, biomarker detection, and personalized recommendations. Supports caching for performance optimization. """ def __init__(self, cache_manager=None, variant_scorer=None): self.cache_manager = cache_manager # Optional cache manager for persistent storage self.variant_scorer = variant_scorer self.mutation_analyzer = MutationAnalyzer(variant_pipeline=self.variant_scorer) self.biomarker_detector = BiomarkerDetector() self.multi_mutation_analyzer = MultiMutationAnalyzer() self.disease_mapper = DiseaseAssociationMapper() self.recommendation_engine = PersonalizedRecommendationEngine() def _generate_sequence_hash(self, sequence: str) -> str: """Generate a hash of the sequence for caching purposes""" return hashlib.md5(sequence.upper().encode()).hexdigest() def _build_cache_key( self, sequence: str, user_metadata: Optional[Dict], annotated_variants: Optional[List[Dict]], vcf_text: Optional[str] = None, reference_sequence: Optional[str] = None, biomarker_sequence: Optional[str] = None, interpretation_mode: Optional[str] = None, sample_id: Optional[str] = None, genome_build: Optional[str] = None, ) -> str: """Build a cache key scoped to sequence + user profile + recommendation logic version.""" sequence_hash = self._generate_sequence_hash(sequence) metadata = user_metadata or {} metadata_key = { "age": metadata.get("age"), "gender": _normalize_sex(metadata.get("gender")), "weight": _to_float(metadata.get("weight")), "height_cm": _to_float( metadata.get("height_cm") if metadata.get("height_cm") is not None else metadata.get("height") ), } annotated_hash = hashlib.md5( json.dumps(annotated_variants or [], sort_keys=True).encode("utf-8") ).hexdigest() vcf_hash = hashlib.md5((vcf_text or "").encode("utf-8")).hexdigest() reference_hash = self._generate_sequence_hash(reference_sequence or "") biomarker_hash = hashlib.md5((biomarker_sequence or "").encode("utf-8")).hexdigest() # Bump when variant evidence, biomarker detection, or recommendation rules change. recommendation_logic_version = "recommendation_v5_multi_mutation" profile_hash = hashlib.md5( json.dumps( { "metadata": metadata_key, "annotated_hash": annotated_hash, "vcf_hash": vcf_hash, "reference_hash": reference_hash, "biomarker_hash": biomarker_hash, "interpretation_mode": interpretation_mode, "sample_id": sample_id, "genome_build": genome_build, "multi_mutation_ruleset": self.multi_mutation_analyzer.ruleset_version, "logic_version": recommendation_logic_version, }, sort_keys=True, ).encode("utf-8") ).hexdigest() return f"genome_analysis_{sequence_hash}_{profile_hash}" def analyze_genome( self, sequence: str, user_metadata: Optional[Dict] = None, annotated_variants: Optional[List[Dict]] = None, vcf_text: Optional[str] = None, reference_sequence: Optional[str] = None, biomarker_sequence: Optional[str] = None, interpretation_mode: Optional[str] = None, sample_id: Optional[str] = None, genome_build: Optional[str] = None, ) -> Dict: """ Comprehensive genome analysis pipeline with caching support. Args: sequence: DNA sequence string user_metadata: User data (age, gender, weight, etc.) annotated_variants: Optional annotated/scored variants from a proper caller vcf_text: Optional VCF text to normalize/annotate before disease mapping reference_sequence: Optional short local reference for fallback variant calling biomarker_sequence: Optional unstripped input text for exploratory biomarker motifs Returns: Comprehensive analysis results """ if not user_metadata: user_metadata = {'age': 50, 'gender': 'Unknown', 'weight': 70, 'height_cm': 170} # Check cache if available if self.cache_manager: cache_key = self._build_cache_key( sequence, user_metadata, annotated_variants, vcf_text=vcf_text, reference_sequence=reference_sequence, biomarker_sequence=biomarker_sequence, interpretation_mode=interpretation_mode, sample_id=sample_id, genome_build=genome_build, ) cached_result = self.cache_manager.get(cache_key) if cached_result: return json.loads(cached_result) scored_variants_payload = None if annotated_variants: scored_variants_payload = self._score_annotated_variants(annotated_variants) prepared_variant_evidence = self.mutation_analyzer.prepare_variant_evidence( sequence=sequence, annotated_variants=scored_variants_payload, vcf_text=vcf_text, reference_sequence=reference_sequence, sample_id=sample_id, genome_build=genome_build, ) # Step 1: Mutation analysis variants = self.mutation_analyzer.analyze_mutations( sequence, annotated_variants=prepared_variant_evidence, sample_id=sample_id, genome_build=genome_build, allow_exploratory_kmers= not prepared_variant_evidence and not vcf_text and not reference_sequence, ) # Step 2: Biomarker detection has_structured_variant_source = ( annotated_variants is not None or vcf_text is not None or reference_sequence is not None ) biomarker_detection_sequence = biomarker_sequence if biomarker_sequence is not None else sequence biomarker_variants = prepared_variant_evidence if has_structured_variant_source else None biomarkers = self.biomarker_detector.detect_biomarkers( biomarker_detection_sequence, variants=biomarker_variants, ) multi_mutation_analysis = self.multi_mutation_analyzer.analyze( prepared_variant_evidence, interpretation_mode=interpretation_mode, sample_id=sample_id, ) # Step 3: Disease association mapping disease_associations = self.disease_mapper.map_disease_associations( variants, biomarkers, user_metadata ) suppressed_associations = list(self.disease_mapper.last_suppressed_associations) variant_prioritization = None if prepared_variant_evidence: variant_prioritization = { 'scored_variants': prepared_variant_evidence, 'total_variants': len(prepared_variant_evidence), 'high_confidence_variants': len([ row for row in prepared_variant_evidence if float(row.get('pathogenicity_score', 0.0) or 0.0) >= 0.75 ]), } # Step 4: Generate recommendations recommendations = self.recommendation_engine.generate_recommendations( disease_associations, user_metadata, suppressed_associations=suppressed_associations, variant_evidence=prepared_variant_evidence, ) # Compile results results = { 'sequence_analysis': { 'length': len(sequence), 'gc_content': self._calculate_gc_content(sequence), 'valid_nucleotides': sum(1 for c in sequence if c.upper() in 'ATCG') }, 'mutation_analysis': { 'detected_variants': [v.to_dict() for v in variants], 'total_variants': len(variants), 'high_risk_variants': len([v for v in variants if v.confidence > 0.85]) }, 'biomarker_detection': { 'detected_biomarkers': [b.to_dict() for b in biomarkers], 'total_biomarkers': len(biomarkers), 'therapeutic_targets': len([b for b in biomarkers if 'Therapeutic' in b.clinical_significance]) }, 'multi_mutation_analysis': multi_mutation_analysis, 'disease_associations': { 'associations': [assoc.to_dict() for assoc in disease_associations], 'high_confidence': len([a for a in disease_associations if a.confidence == ConfidenceLevel.VERY_HIGH]), 'moderate_confidence': len([a for a in disease_associations if a.confidence == ConfidenceLevel.HIGH]), 'suppressed_associations': suppressed_associations, }, 'variant_prioritization': variant_prioritization, 'recommendations': recommendations, 'suppressed_insights': suppressed_associations, 'analysis_metadata': { 'user_age': user_metadata.get('age'), 'user_gender': user_metadata.get('gender'), 'user_weight': user_metadata.get('weight'), 'user_height_cm': user_metadata.get('height_cm'), 'analysis_type': 'Research/Educational', 'interpretation_mode': interpretation_mode, 'selected_sample_id': sample_id, 'genome_build': genome_build, 'multi_mutation_ruleset_version': self.multi_mutation_analyzer.ruleset_version, } } # Cache results if cache manager available if self.cache_manager: try: self.cache_manager.set(cache_key, json.dumps(results)) except Exception: # Caching failure shouldn't break analysis pass return results def _score_annotated_variants(self, annotated_variants: List[Dict]) -> List[Dict]: """Score annotated variant payloads when a prioritizer is available.""" if not annotated_variants: return [] if self.variant_scorer is None: return [dict(variant) for variant in annotated_variants] try: return self.variant_scorer.score_variant_pathogenicity( [dict(variant) for variant in annotated_variants], use_prioritization=True, ) except Exception: return self.variant_scorer.score_variant_pathogenicity( [dict(variant) for variant in annotated_variants], use_prioritization=False, ) def _calculate_gc_content(self, sequence: str) -> float: """Calculate GC content percentage""" seq_upper = sequence.upper() gc_count = seq_upper.count('G') + seq_upper.count('C') total = len([c for c in seq_upper if c in 'ATGC']) return (gc_count / total * 100) if total > 0 else 0