# mypy: enable-error-code=var-annotated """ Sequence Analysis Suite - Comprehensive computational analysis of biological sequences Supports DNA, RNA, and protein sequences with multiple analysis tools. """ import re import io import math from typing import Any, Dict, List, Optional, Tuple, TypedDict from dataclasses import dataclass from collections import Counter, defaultdict import numpy as np from Bio import SeqIO, Align from Bio.Seq import Seq from Bio.SeqRecord import SeqRecord from Bio.Align import MultipleSeqAlignment from Bio.Phylo.TreeConstruction import DistanceCalculator, DistanceTreeConstructor from Bio.Phylo import write import networkx as nx import json import warnings warnings.filterwarnings('ignore') class AlignmentStats(TypedDict): alignment_length: int num_sequences: int conserved_positions: int conservation_percentage: float gap_positions: int gap_percentage: float class TreeMetadata(TypedDict, total=False): method: str num_taxa: int tree_length: float newick_format: str distance_method: str alignment_used: bool kmer_size: int class MotifAnnotation(TypedDict): motif: str start: int end: int length: int conservation: float sequences: list[str] @dataclass class Sequence: """Represents a biological sequence with metadata""" id: str sequence: str description: str = "" sequence_type: str = "unknown" # 'dna', 'rna', 'protein' def __post_init__(self) -> None: """Auto-detect sequence type if not specified""" if self.sequence_type == "unknown": self.sequence_type = self._detect_sequence_type() def _detect_sequence_type(self) -> str: """Detect if sequence is DNA, RNA, or protein""" seq_upper = self.sequence.upper().replace('-', '').replace('N', '').replace('X', '') if not seq_upper: return "unknown" # Check for RNA-specific nucleotides if 'U' in seq_upper and 'T' not in seq_upper: return "rna" # Check for DNA (T but no U) if 'T' in seq_upper and 'U' not in seq_upper: # Could be DNA or protein, check for protein-specific amino acids protein_chars = set('ACDEFGHIKLMNPQRSTVWY') if all(c in protein_chars for c in seq_upper): # Check if it's likely protein (has many non-ATGC chars) non_dna = len([c for c in seq_upper if c not in 'ATGC']) if non_dna > len(seq_upper) * 0.1: # More than 10% non-DNA chars return "protein" return "dna" # Check for protein amino acids protein_chars = set('ACDEFGHIKLMNPQRSTVWY') if all(c in protein_chars for c in seq_upper): return "protein" return "unknown" class FASTAParser: """Parse and validate FASTA format sequences""" @staticmethod def parse(file_content: str) -> list[Sequence]: """Parse FASTA content from string""" sequences = [] try: raw_content = file_content if isinstance(file_content, str) else file_content.read() # Handle both file-like objects and strings file_handle = io.StringIO(raw_content) try: for record in SeqIO.parse(file_handle, "fasta"): seq = Sequence( id=record.id, sequence=str(record.seq), description=record.description ) sequences.append(seq) except ValueError: sequences = [] if not sequences: plain_sequence = FASTAParser._parse_plain_sequence(raw_content) if plain_sequence: sequences.append( Sequence( id="Pasted_Sequence", sequence=plain_sequence, description="Plain sequence input", ) ) except Exception as e: raise ValueError(f"FASTA parsing error: {str(e)}") return sequences @staticmethod def _parse_plain_sequence(file_content: str) -> str: """Treat non-FASTA pasted text as one raw biological sequence.""" lines = [ line.strip() for line in str(file_content).splitlines() if line.strip() and not line.lstrip().startswith((">", ";", "#")) ] if not lines: return "" candidate = re.sub(r"\s+", "", "".join(lines)).upper() candidate = re.sub(r"[^A-Z*.-]", "", candidate) biological_symbols = set("ACDEFGHIKLMNPQRSTVWYUX*-.") if candidate and set(candidate) <= biological_symbols: return candidate.replace(".", "-") return "" @staticmethod def validate(sequences: list[Sequence]) -> tuple[bool, list[str]]: """Validate sequences and return (is_valid, error_messages)""" errors = [] if not sequences: errors.append("No sequences found in input") return False, errors # Check for empty sequences for seq in sequences: if not seq.sequence or len(seq.sequence.strip()) == 0: errors.append(f"Sequence {seq.id} is empty") # Check for invalid characters valid_dna = set('ATCGN-') valid_rna = set('AUCGN-') valid_protein = set('ACDEFGHIKLMNPQRSTVWYX*-') for seq in sequences: seq_upper = seq.sequence.upper() if seq.sequence_type == "dna": invalid = set(seq_upper) - valid_dna elif seq.sequence_type == "rna": invalid = set(seq_upper) - valid_rna elif seq.sequence_type == "protein": invalid = set(seq_upper) - valid_protein else: continue if invalid: errors.append(f"Sequence {seq.id} contains invalid characters: {invalid}") return len(errors) == 0, errors class MultipleSequenceAligner: """Perform Multiple Sequence Alignment using various algorithms""" def __init__(self, method: str = "clustalw") -> None: """ Initialize aligner Methods: 'clustalw', 'muscle', 'mafft', 'simple' """ self.method = method.lower() self.aligner = Align.PairwiseAligner() self.aligner.mode = 'global' self.aligner.match_score = 2.0 self.aligner.mismatch_score = -1.0 self.aligner.open_gap_score = -0.5 self.aligner.extend_gap_score = -0.1 def align(self, sequences: list[Sequence]) -> tuple[MultipleSeqAlignment, AlignmentStats]: """ Perform multiple sequence alignment Returns (alignment, metadata) """ if len(sequences) < 2: raise ValueError("Need at least 2 sequences for alignment") # Convert to BioPython SeqRecord objects seq_records = [] for seq in sequences: seq_rec = SeqRecord(Seq(seq.sequence), id=seq.id, description=seq.description) seq_records.append(seq_rec) # For now, use progressive alignment (simple method) # In production, would call external tools like MUSCLE or MAFFT alignment = self._progressive_align(seq_records) # Calculate alignment statistics metadata = self._calculate_alignment_stats(alignment) return alignment, metadata def _progressive_align(self, seq_records: list[SeqRecord]) -> MultipleSeqAlignment: """Progressively align sequences to the first sequence as a center.""" if len(seq_records) == 1: return MultipleSeqAlignment(seq_records) center_sequence = str(seq_records[0].seq) center_aligned = center_sequence aligned_sequences = [center_aligned] for seq_record in seq_records[1:]: pair_center, pair_query = self._pairwise_align_strings( center_sequence, str(seq_record.seq) ) center_aligned, aligned_sequences = self._merge_center_alignment( center_sequence, center_aligned, aligned_sequences, pair_center, pair_query, ) aligned_records = [ SeqRecord( Seq(aligned_seq), id=record.id, description=record.description, ) for record, aligned_seq in zip(seq_records, aligned_sequences) ] return MultipleSeqAlignment(aligned_records) def _pairwise_align_strings(self, reference: str, query: str) -> tuple[str, str]: """Return one optimal pairwise alignment without enumerating all optima.""" try: alignments = self.aligner.align(reference, query) alignment = next(iter(alignments)) aligned = self._alignment_to_strings(alignment) if len(aligned) == 2 and len(aligned[0]) == len(aligned[1]): return aligned[0], aligned[1] except Exception: pass max_len = max(len(reference), len(query)) return reference.ljust(max_len, "-"), query.ljust(max_len, "-") @staticmethod def _alignment_to_strings(alignment: Any) -> list[str]: """Extract gapped sequence strings from a Biopython pairwise alignment.""" aligned_sequences: list[str] = [] current: list[str] = [] for line in format(alignment, "fasta").splitlines(): if line.startswith(">"): if current: aligned_sequences.append("".join(current)) current = [] else: current.append(line.strip()) if current: aligned_sequences.append("".join(current)) return aligned_sequences @staticmethod def _split_center_columns( aligned_center: str, aligned_members: list[str], center_length: int ) -> tuple[list[list[list[str]]], list[list[str]]]: """Group alignment columns by center-sequence position.""" gap_columns = [[] for _ in range(center_length + 1)] residue_columns: list[list[str]] = [] center_pos = 0 for column_index, center_char in enumerate(aligned_center): column = [member[column_index] for member in aligned_members] if center_char == "-": gap_columns[center_pos].append(column) else: residue_columns.append(column) center_pos += 1 if len(residue_columns) != center_length: raise ValueError("Center alignment does not match the reference sequence") return gap_columns, residue_columns def _merge_center_alignment( self, center_sequence: str, center_aligned: str, aligned_sequences: list[str], pair_center: str, pair_query: str, ) -> tuple[str, list[str]]: """Merge a new pairwise center alignment into the growing MSA.""" center_length = len(center_sequence) existing_count = len(aligned_sequences) old_gaps, old_residues = self._split_center_columns( center_aligned, aligned_sequences, center_length ) new_gaps, new_residues = self._split_center_columns( pair_center, [pair_query], center_length ) merged_center: list[str] = [] merged_members: list[list[str]] = [[] for _ in range(existing_count + 1)] for pos in range(center_length + 1): gap_count = max(len(old_gaps[pos]), len(new_gaps[pos])) for gap_index in range(gap_count): merged_center.append("-") old_column = ( old_gaps[pos][gap_index] if gap_index < len(old_gaps[pos]) else ["-"] * existing_count ) new_char = ( new_gaps[pos][gap_index][0] if gap_index < len(new_gaps[pos]) else "-" ) for member_index, char in enumerate(old_column): merged_members[member_index].append(char) merged_members[-1].append(new_char) if pos < center_length: merged_center.append(center_sequence[pos]) old_column = old_residues[pos] new_char = new_residues[pos][0] for member_index, char in enumerate(old_column): merged_members[member_index].append(char) merged_members[-1].append(new_char) return "".join(merged_center), ["".join(member) for member in merged_members] def _calculate_alignment_stats(self, alignment: MultipleSeqAlignment) -> AlignmentStats: """Calculate alignment statistics""" if not alignment: return { "alignment_length": 0, "num_sequences": 0, "conserved_positions": 0, "conservation_percentage": 0.0, "gap_positions": 0, "gap_percentage": 0.0, } length = alignment.get_alignment_length() num_seqs = len(alignment) # Count conserved positions conserved = 0 gaps = 0 for i in range(length): column = alignment[:, i] # Check if all non-gap characters are the same non_gaps = [c for c in column if c != '-'] if len(non_gaps) > 0 and len(set(non_gaps)) == 1: conserved += 1 if '-' in column: gaps += 1 stats: AlignmentStats = { "alignment_length": length, "num_sequences": num_seqs, "conserved_positions": conserved, "conservation_percentage": (conserved / length * 100) if length > 0 else 0, "gap_positions": gaps, "gap_percentage": (gaps / length * 100) if length > 0 else 0 } return stats class PhylogeneticTreeBuilder: """Construct phylogenetic trees from aligned sequences""" def __init__(self, method: str = "neighbor_joining") -> None: """ Initialize tree builder Methods: 'neighbor_joining', 'upgma' """ self.method = method.lower() self.calculator = DistanceCalculator('identity') # For protein, use 'blosum62' def build_tree(self, alignment: MultipleSeqAlignment) -> tuple[str, TreeMetadata]: """ Build phylogenetic tree Returns (newick_string, metadata) """ if len(alignment) < 2: raise ValueError("Need at least 2 sequences for tree construction") # Calculate distance matrix try: dm = self.calculator.get_distance(alignment) except Exception as e: # Fallback to simple distance calculation dm = self._simple_distance_matrix(alignment) # Build tree constructor = DistanceTreeConstructor() if self.method == "neighbor_joining": tree = constructor.nj(dm) else: # UPGMA tree = constructor.upgma(dm) # Convert to Newick format from io import StringIO handle = StringIO() write([tree], handle, "newick") newick_string = handle.getvalue().strip() # Calculate tree statistics metadata: TreeMetadata = { "method": self.method, "num_taxa": len(alignment), "tree_length": sum(tree.depths().values()), "newick_format": newick_string, "distance_method": "aligned_identity", "alignment_used": True, } return newick_string, metadata def _simple_distance_matrix(self, alignment: MultipleSeqAlignment) -> Any: """Simple distance matrix calculation""" from Bio.Phylo.TreeConstruction import DistanceMatrix num_seqs = len(alignment) distances = [] for i in range(num_seqs): row = [] for j in range(i + 1): if i == j: row.append(0.0) else: # Calculate pairwise identity seq1 = str(alignment[i].seq) seq2 = str(alignment[j].seq) matches = sum(c1 == c2 and c1 != '-' for c1, c2 in zip(seq1, seq2)) total = sum(1 for c1, c2 in zip(seq1, seq2) if c1 != '-' or c2 != '-') identity = matches / total if total > 0 else 0.0 distance = 1.0 - identity row.append(distance) distances.append(row) names = [record.id for record in alignment] return DistanceMatrix(names, distances) def build_kmer_tree( self, sequences: list[Sequence], sequence_type: str ) -> tuple[str, TreeMetadata]: """Build an alignment-free neighbor-joining tree from exact k-mer sets.""" if len(sequences) < 2: raise ValueError("Need at least 2 sequences for tree construction") kmer_size = 5 if sequence_type == "protein" else 9 signatures = [ self._kmer_signature(seq.sequence, kmer_size, sequence_type) for seq in sequences ] names = self._unique_taxon_names([seq.id for seq in sequences]) distances: list[list[float]] = [] for i, signature in enumerate(signatures): row: list[float] = [] for j in range(i + 1): if i == j: row.append(0.0) continue other = signatures[j] union_size = len(signature | other) similarity = len(signature & other) / union_size if union_size else 1.0 row.append(1.0 - similarity) distances.append(row) from Bio.Phylo.TreeConstruction import DistanceMatrix tree = DistanceTreeConstructor().nj(DistanceMatrix(names, distances)) handle = io.StringIO() write([tree], handle, "newick") newick_string = handle.getvalue().strip() metadata: TreeMetadata = { "method": self.method, "num_taxa": len(sequences), "tree_length": sum(tree.depths().values()), "newick_format": newick_string, "distance_method": "kmer_jaccard", "alignment_used": False, "kmer_size": kmer_size, } return newick_string, metadata @staticmethod def _unique_taxon_names(names: list[str]) -> list[str]: """Return stable unique labels accepted by Biopython distance matrices.""" counts: dict[str, int] = {} unique: list[str] = [] for index, raw_name in enumerate(names, start=1): base = raw_name or f"Sequence_{index}" counts[base] = counts.get(base, 0) + 1 unique.append(base if counts[base] == 1 else f"{base}_{counts[base]}") return unique @staticmethod def _kmer_signature(sequence: str, k: int, sequence_type: str) -> set[Any]: """Encode unambiguous k-mers as integers to bound memory for long inputs.""" cleaned = sequence.upper().replace("-", "") if sequence_type == "rna": cleaned = cleaned.replace("U", "T") alphabet = "ACGT" if sequence_type in {"dna", "rna"} else "ACDEFGHIKLMNPQRSTVWY" symbols = {symbol: index for index, symbol in enumerate(alphabet)} if len(cleaned) < k: short = "".join(symbol for symbol in cleaned if symbol in symbols) return {short} if short else set() base = len(alphabet) high_place = base ** (k - 1) signature: set[Any] = set() value = 0 valid_length = 0 for symbol in cleaned: encoded = symbols.get(symbol) if encoded is None: value = 0 valid_length = 0 continue if valid_length < k: value = value * base + encoded valid_length += 1 if valid_length == k: signature.add(value) else: value = (value % high_place) * base + encoded signature.add(value) return signature class DomainIdentifier: """Identify functional domains in protein sequences""" def __init__(self) -> None: """Initialize domain identifier""" # Common protein domains patterns (simplified) # In production, would use Pfam HMMs or InterPro self.domain_patterns = { "Zinc Finger": [r"C.{2,4}C.{12,15}H.{3,5}H", r"H.{3,5}H.{12,15}C.{2,4}C"], "Helix-Turn-Helix": [r"[ILV].{5,7}[ILV].{5,7}[ILV]"], "Leucine Zipper": [r"L.{6}L.{6}L.{6}L"], "EF-hand": [r"D.{3}D.{3}[ILV].{6}[DE].{6}Y"], } def identify_domains(self, sequences: list[Sequence], alignment: Optional[MultipleSeqAlignment] = None) -> dict[str, list[dict[str, Any]]]: """ Identify domains in sequences Returns dict mapping sequence_id to list of domain annotations """ results = {} for seq in sequences: if seq.sequence_type != "protein": continue domains = [] seq_upper = seq.sequence.upper() # Search for known domain patterns for domain_name, patterns in self.domain_patterns.items(): for pattern in patterns: matches = re.finditer(pattern, seq_upper) for match in matches: domains.append({ "domain_name": domain_name, "start": match.start() + 1, # 1-indexed "end": match.end(), "sequence": match.group(), "confidence": 0.7, # Simplified confidence score "method": "pattern_match" }) # Remove overlapping domains (keep first) domains = self._remove_overlaps(domains) results[seq.id] = domains return results def _remove_overlaps(self, domains: list[dict[str, Any]]) -> list[dict[str, Any]]: """Remove overlapping domain annotations""" if not domains: return [] # Sort by start position sorted_domains = sorted(domains, key=lambda x: x["start"]) non_overlapping: list[dict[str, Any]] = [sorted_domains[0]] for domain in sorted_domains[1:]: last = non_overlapping[-1] # Check if overlapping if domain["start"] > last["end"]: non_overlapping.append(domain) elif domain["end"] - domain["start"] > last["end"] - last["start"]: # Replace if current domain is longer non_overlapping[-1] = domain return non_overlapping class MotifFinder: """Find conserved motifs in sequences""" def __init__(self, min_length: int = 4, max_length: int = 20) -> None: """Initialize motif finder""" self.min_length = min_length self.max_length = max_length def find_motifs(self, sequences: list[Sequence], alignment: Optional[MultipleSeqAlignment] = None) -> dict[str, Any]: """ Find conserved motifs Returns dict with motifs and their positions """ if alignment: return self._find_motifs_in_alignment(alignment) else: return self._find_motifs_in_sequences(sequences) def _find_motifs_in_alignment(self, alignment: MultipleSeqAlignment) -> dict[str, Any]: """Find motifs in aligned sequences""" motifs: list[dict[str, Any]] = [] length = alignment.get_alignment_length() # Look for conserved regions for window_size in range(self.min_length, min(self.max_length + 1, length // 2)): for start in range(length - window_size + 1): column = alignment[:, start:start + window_size] # Check conservation conserved_seqs = [] for i, seq in enumerate(column): seq_str = ''.join(seq) if '-' not in seq_str: conserved_seqs.append((i, seq_str)) if len(conserved_seqs) >= len(alignment) * 0.7: # 70% conservation # Check if all sequences have same pattern patterns = [seq for _, seq in conserved_seqs] if len(set(patterns)) == 1: motifs.append({ "motif": patterns[0], "start": start + 1, "end": start + window_size, "length": window_size, "conservation": len(conserved_seqs) / len(alignment), "sequences": [alignment[i].id for i, _ in conserved_seqs] }) # Remove duplicates and overlapping motifs motifs = self._deduplicate_motifs(motifs) return { "motifs": motifs, "num_motifs": len(motifs), "method": "alignment_based" } def _find_motifs_in_sequences(self, sequences: list[Sequence]) -> dict[str, Any]: """Find motifs using k-mer frequency analysis""" if len(sequences) < 2: return {"motifs": [], "num_motifs": 0, "method": "kmer_frequency"} # Count k-mers across all sequences kmer_counts: defaultdict[str, list[Tuple[str, int]]] = defaultdict(list) for k in range(self.min_length, self.max_length + 1): for seq in sequences: seq_upper = seq.sequence.upper().replace('-', '') for i in range(len(seq_upper) - k + 1): kmer = seq_upper[i:i+k] kmer_counts[kmer].append((seq.id, i + 1)) # Find k-mers present in multiple sequences motifs: list[dict[str, Any]] = [] for kmer, positions in kmer_counts.items(): unique_seqs = set(seq_id for seq_id, _ in positions) if len(unique_seqs) >= max(2, len(sequences) * 0.5): # Present in at least 50% of sequences motifs.append({ "motif": kmer, "length": len(kmer), "frequency": len(positions), "sequences": list(unique_seqs), "positions": positions[:10] # Limit positions }) # Sort by frequency motifs.sort(key=lambda x: x["frequency"], reverse=True) return { "motifs": motifs[:20], # Top 20 motifs "num_motifs": len(motifs), "method": "kmer_frequency" } def _deduplicate_motifs(self, motifs: list[dict[str, Any]]) -> list[dict[str, Any]]: """Remove duplicate and overlapping motifs""" if not motifs: return [] # Sort by length (longer first) and conservation sorted_motifs = sorted(motifs, key=lambda x: (x["length"], x.get("conservation", 0)), reverse=True) unique: list[dict[str, Any]] = [] for motif in sorted_motifs: # Check if overlaps with existing motifs overlaps = False for existing in unique: if (motif["start"] <= existing["end"] and motif["end"] >= existing["start"]): overlaps = True break if not overlaps: unique.append(motif) return unique class ConservationScorer: """Calculate conservation scores for aligned sequences""" def __init__(self, method: str = "shannon_entropy") -> None: """ Initialize conservation scorer Methods: 'shannon_entropy', 'simple_frequency' """ self.method = method.lower() def calculate_conservation(self, alignment: MultipleSeqAlignment) -> dict[str, Any]: """ Calculate conservation scores for each position Returns dict with position-wise scores and statistics """ if not alignment: return {} length = alignment.get_alignment_length() num_seqs = len(alignment) scores: list[float] = [] positions: list[dict[str, Any]] = [] for i in range(length): column = alignment[:, i] # Remove gaps non_gaps = [c for c in column if c != '-'] if not non_gaps: score = 0.0 # All gaps elif self.method == "shannon_entropy": score = self._shannon_entropy(non_gaps) else: # simple_frequency score = self._simple_frequency(non_gaps) scores.append(score) positions.append({ "position": i + 1, "score": score, "residues": dict(Counter(non_gaps)), "gap_count": column.count('-') }) # Calculate statistics scores_array = np.array(scores) results: dict[str, Any] = { "scores": positions, "mean_conservation": float(np.mean(scores_array)), "std_conservation": float(np.std(scores_array)), "min_conservation": float(np.min(scores_array)), "max_conservation": float(np.max(scores_array)), "highly_conserved_positions": [i + 1 for i, s in enumerate(scores) if s > np.percentile(scores_array, 90)], "method": self.method } return results def _shannon_entropy(self, residues: List[str]) -> float: """Calculate Shannon entropy (lower = more conserved)""" if not residues: return 0.0 counts = Counter(residues) total = len(residues) entropy = 0.0 for count in counts.values(): p = count / total if p > 0: entropy -= p * np.log2(p) # Normalize to 0-1 scale (1 = completely conserved, 0 = highly variable) max_entropy = np.log2(len(set(residues))) if len(set(residues)) > 1 else 0 if max_entropy > 0: conservation = 1.0 - (entropy / max_entropy) else: conservation = 1.0 return max(0.0, conservation) def _simple_frequency(self, residues: List[str]) -> float: """Simple frequency-based conservation (fraction of most common residue)""" if not residues: return 0.0 counts = Counter(residues) most_common_count = max(counts.values()) total = len(residues) return most_common_count / total if total > 0 else 0.0 class SequenceSpecificAnalyzer: """Generate per-sequence findings so FASTA reports reflect the submitted input.""" DNA_COMPLEMENT = str.maketrans("ATCGNatcgn", "TAGCNtagcn") RNA_COMPLEMENT = str.maketrans("AUCGNaucgn", "UAGCNuagcn") PROTEIN_MASSES = { "A": 89.09, "R": 174.20, "N": 132.12, "D": 133.10, "C": 121.15, "E": 147.13, "Q": 146.15, "G": 75.07, "H": 155.16, "I": 131.17, "L": 131.17, "K": 146.19, "M": 149.21, "F": 165.19, "P": 115.13, "S": 105.09, "T": 119.12, "W": 204.23, "Y": 181.19, "V": 117.15, } KYTE_DOOLITTLE = { "A": 1.8, "R": -4.5, "N": -3.5, "D": -3.5, "C": 2.5, "Q": -3.5, "E": -3.5, "G": -0.4, "H": -3.2, "I": 4.5, "L": 3.8, "K": -3.9, "M": 1.9, "F": 2.8, "P": -1.6, "S": -0.8, "T": -0.7, "W": -0.9, "Y": -1.3, "V": 4.2, } HYDROPHOBIC = set("AILMFWYV") POLAR = set("STNQCY") POSITIVE = set("KRH") NEGATIVE = set("DE") AROMATIC = set("FWY") def analyze(self, sequences: list[Sequence]) -> dict[str, dict[str, Any]]: """Return sequence-specific metrics and short findings keyed by FASTA ID.""" return {seq.id: self._analyze_one(seq) for seq in sequences} def _analyze_one(self, seq: Sequence) -> dict[str, Any]: clean = self._clean_sequence(seq.sequence) common = self._common_metrics(clean) if seq.sequence_type in {"dna", "rna"}: specific = self._analyze_nucleotide(clean, seq.sequence_type) elif seq.sequence_type == "protein": specific = self._analyze_protein(clean) else: specific = { "metrics": {}, "features": [], "composition": dict(Counter(clean)), "summary": "Sequence type could not be confidently classified; only common metrics were calculated.", } summary_points = self._build_summary(seq, common, specific) return { "id": seq.id, "type": seq.sequence_type, "length": len(clean), "common": common, **specific, "summary_points": summary_points, } @staticmethod def _clean_sequence(sequence: str) -> str: return "".join(c for c in sequence.upper() if c.isalpha() or c in {"-", "*"}) def _common_metrics(self, sequence: str) -> dict[str, Any]: ungapped = sequence.replace("-", "") counts = Counter(ungapped) length = len(ungapped) top_symbols = counts.most_common(5) return { "ungapped_length": length, "gap_count": sequence.count("-"), "terminal_stop_count": 1 if ungapped.endswith("*") else 0, "complexity": round(self._normalized_entropy(ungapped), 3), "longest_homopolymer": self._longest_homopolymer(ungapped), "top_symbols": [{"symbol": symbol, "count": count} for symbol, count in top_symbols], } def _analyze_nucleotide(self, sequence: str, sequence_type: str) -> dict[str, Any]: alphabet = "AUCGN" if sequence_type == "rna" else "ATCGN" seq = "".join(c for c in sequence if c in alphabet) length = len(seq) counts = Counter(seq) gc_count = counts["G"] + counts["C"] ambiguous_count = counts["N"] at_or_au_count = counts["A"] + (counts["U"] if sequence_type == "rna" else counts["T"]) start_codon = "AUG" if sequence_type == "rna" else "ATG" stop_codons = {"UAA", "UAG", "UGA"} if sequence_type == "rna" else {"TAA", "TAG", "TGA"} longest_orf = self._longest_orf(seq, start_codon, stop_codons) repeat_features = self._find_repeated_kmers(seq, k=6, limit=5) rev_comp = seq.translate( self.RNA_COMPLEMENT if sequence_type == "rna" else self.DNA_COMPLEMENT )[::-1] metrics = { "gc_percent": round((gc_count / length * 100) if length else 0.0, 2), "at_or_au_percent": round((at_or_au_count / length * 100) if length else 0.0, 2), "ambiguous_base_percent": round((ambiguous_count / length * 100) if length else 0.0, 2), "start_codon_count": self._count_overlapping(seq, start_codon), "stop_codon_count": sum(self._count_overlapping(seq, codon) for codon in stop_codons), "cpg_count": self._count_overlapping(seq.replace("U", "T"), "CG"), "longest_orf_length": longest_orf["length"], "longest_orf_frame": longest_orf["frame"], "reverse_complement_preview": rev_comp[:60], } features: list[dict[str, Any]] = [] if longest_orf["length"] >= 90: features.append( { "feature": "Longest ORF", "location": f"{longest_orf['start']}-{longest_orf['end']} ({longest_orf['frame']})", "detail": f"{longest_orf['length']} nt, {longest_orf['aa_length']} codons", } ) if repeat_features: features.extend(repeat_features) return { "metrics": metrics, "features": features, "composition": {base: counts.get(base, 0) for base in alphabet}, "summary": self._nucleotide_summary(seq, sequence_type, metrics, repeat_features), } def _analyze_protein(self, sequence: str) -> dict[str, Any]: seq = "".join(c for c in sequence if c in self.PROTEIN_MASSES or c in {"X", "*"}) residues = seq.replace("*", "") length = len(residues) counts = Counter(residues) hydrophobic_count = sum(counts[aa] for aa in self.HYDROPHOBIC) polar_count = sum(counts[aa] for aa in self.POLAR) positive_count = sum(counts[aa] for aa in self.POSITIVE) negative_count = sum(counts[aa] for aa in self.NEGATIVE) aromatic_count = sum(counts[aa] for aa in self.AROMATIC) known_residues = [aa for aa in residues if aa in self.PROTEIN_MASSES] mass = sum(self.PROTEIN_MASSES[aa] for aa in known_residues) if known_residues: mass -= 18.015 * max(len(known_residues) - 1, 0) gravy = ( sum(self.KYTE_DOOLITTLE.get(aa, 0.0) for aa in known_residues) / len(known_residues) if known_residues else 0.0 ) net_charge_proxy = positive_count - negative_count features = self._find_protein_features(residues) metrics = { "molecular_weight_kda": round(mass / 1000, 2), "hydrophobic_percent": round((hydrophobic_count / length * 100) if length else 0.0, 2), "polar_percent": round((polar_count / length * 100) if length else 0.0, 2), "charged_percent": round( ((positive_count + negative_count) / length * 100) if length else 0.0, 2 ), "positive_residues": positive_count, "negative_residues": negative_count, "net_charge_proxy": net_charge_proxy, "aromatic_percent": round((aromatic_count / length * 100) if length else 0.0, 2), "cysteine_count": counts["C"], "gravy": round(gravy, 3), "unknown_residue_percent": round((counts["X"] / length * 100) if length else 0.0, 2), } return { "metrics": metrics, "features": features, "composition": {aa: counts.get(aa, 0) for aa in sorted(self.PROTEIN_MASSES)}, "summary": self._protein_summary(residues, metrics, features), } def _build_summary( self, seq: Sequence, common: dict[str, Any], specific: dict[str, Any] ) -> list[str]: points = [ f"{seq.id} is classified as {seq.sequence_type} with {common['ungapped_length']} ungapped symbols.", specific.get("summary", ""), ] homopolymer = common.get("longest_homopolymer", {}) if homopolymer.get("length", 0) >= 6: points.append( f"Longest single-symbol run is {homopolymer['symbol']} x{homopolymer['length']} at positions {homopolymer['start']}-{homopolymer['end']}." ) if common.get("complexity", 0) < 0.55: points.append("Sequence complexity is low, so repetitive or composition-biased regions may dominate downstream signals.") return [point for point in points if point] def _nucleotide_summary( self, sequence: str, sequence_type: str, metrics: dict[str, Any], repeat_features: list[dict[str, Any]], ) -> str: molecule = "RNA" if sequence_type == "rna" else "DNA" gc = metrics["gc_percent"] if gc >= 60: gc_text = "GC-rich" elif gc <= 40: gc_text = "AT/AU-rich" else: gc_text = "balanced GC" orf_text = ( f"longest ORF is {metrics['longest_orf_length']} nt in {metrics['longest_orf_frame']}" if metrics["longest_orf_length"] else "no start-to-stop ORF was detected" ) repeat_text = f"; {len(repeat_features)} repeated 6-mers flagged" if repeat_features else "" return f"{molecule} profile is {gc_text} ({gc:.2f}% GC); {orf_text}{repeat_text}." def _protein_summary( self, sequence: str, metrics: dict[str, Any], features: list[dict[str, Any]] ) -> str: if metrics["gravy"] > 0.4: solubility_hint = "hydrophobic" elif metrics["gravy"] < -0.4: solubility_hint = "hydrophilic" else: solubility_hint = "mixed hydrophobic/hydrophilic" charge = metrics["net_charge_proxy"] feature_names = ", ".join(feature["feature"] for feature in features[:3]) feature_text = f" Detected features include {feature_names}." if feature_names else "" return ( f"Protein profile is {solubility_hint} with charge proxy {charge:+d}, " f"{metrics['hydrophobic_percent']:.2f}% hydrophobic residues, and " f"estimated mass {metrics['molecular_weight_kda']:.2f} kDa." f"{feature_text}" ) @staticmethod def _normalized_entropy(sequence: str) -> float: if not sequence: return 0.0 counts = Counter(sequence) entropy = 0.0 for count in counts.values(): p = count / len(sequence) entropy -= p * math.log2(p) max_entropy = math.log2(len(counts)) if len(counts) > 1 else 1.0 return entropy / max_entropy if max_entropy else 0.0 @staticmethod def _longest_homopolymer(sequence: str) -> dict[str, Any]: if not sequence: return {"symbol": "", "length": 0, "start": 0, "end": 0} best_symbol = sequence[0] best_length = 1 best_start = 1 current_symbol = sequence[0] current_start = 1 current_length = 1 for index, symbol in enumerate(sequence[1:], start=2): if symbol == current_symbol: current_length += 1 else: if current_length > best_length: best_symbol = current_symbol best_length = current_length best_start = current_start current_symbol = symbol current_start = index current_length = 1 if current_length > best_length: best_symbol = current_symbol best_length = current_length best_start = current_start return { "symbol": best_symbol, "length": best_length, "start": best_start, "end": best_start + best_length - 1, } @staticmethod def _count_overlapping(sequence: str, pattern: str) -> int: if not sequence or not pattern: return 0 return sum(1 for i in range(0, len(sequence) - len(pattern) + 1) if sequence[i : i + len(pattern)] == pattern) def _longest_orf( self, sequence: str, start_codon: str, stop_codons: set[str] ) -> dict[str, Any]: best = {"start": 0, "end": 0, "length": 0, "frame": "N/A", "aa_length": 0} for frame in range(3): active_start: Optional[int] = None for pos in range(frame, len(sequence) - 2, 3): codon = sequence[pos : pos + 3] if active_start is None and codon == start_codon: active_start = pos elif active_start is not None and codon in stop_codons: length = pos + 3 - active_start if length > best["length"]: best = { "start": active_start + 1, "end": pos + 3, "length": length, "frame": f"+{frame + 1}", "aa_length": length // 3, } active_start = None if active_start is not None: length = len(sequence) - active_start length -= length % 3 if length > best["length"]: best = { "start": active_start + 1, "end": active_start + length, "length": length, "frame": f"+{frame + 1}", "aa_length": length // 3, } return best def _find_repeated_kmers(self, sequence: str, k: int, limit: int) -> list[dict[str, Any]]: if len(sequence) < k: return [] positions: defaultdict[str, list[int]] = defaultdict(list) for i in range(len(sequence) - k + 1): kmer = sequence[i : i + k] if "N" not in kmer: positions[kmer].append(i + 1) repeated = [ { "feature": "Repeated k-mer", "location": ", ".join(map(str, sites[:5])), "detail": f"{kmer} occurs {len(sites)} times", "count": len(sites), } for kmer, sites in positions.items() if len(sites) >= 3 ] repeated.sort(key=lambda item: item["count"], reverse=True) return repeated[:limit] def _find_protein_features(self, sequence: str) -> list[dict[str, Any]]: features: list[dict[str, Any]] = [] features.extend(self._regex_features(sequence, "N-glycosylation motif", r"N[^P][ST][^P]")) features.extend(self._regex_features(sequence, "Cysteine pair", r"C.{2,8}C")) features.extend(self._regex_features(sequence, "Basic cleavage-like site", r"[KR][KR].{0,2}[KR]")) features.extend(self._low_complexity_features(sequence)) features.extend(self._hydrophobic_segment_features(sequence)) signal = self._signal_peptide_hint(sequence) if signal: features.append(signal) return features[:20] @staticmethod def _regex_features(sequence: str, name: str, pattern: str) -> list[dict[str, Any]]: return [ { "feature": name, "location": f"{match.start() + 1}-{match.end()}", "detail": match.group(), } for match in re.finditer(pattern, sequence) ] def _low_complexity_features(self, sequence: str) -> list[dict[str, Any]]: features = [] for match in re.finditer(r"([A-Z])\1{5,}", sequence): features.append( { "feature": "Low-complexity run", "location": f"{match.start() + 1}-{match.end()}", "detail": f"{match.group(1)} x{len(match.group())}", } ) return features def _hydrophobic_segment_features(self, sequence: str) -> list[dict[str, Any]]: features = [] window = 19 if len(sequence) < window: return features for start in range(0, len(sequence) - window + 1): fragment = sequence[start : start + window] avg = sum(self.KYTE_DOOLITTLE.get(aa, 0.0) for aa in fragment) / window hydrophobic_fraction = sum(1 for aa in fragment if aa in self.HYDROPHOBIC) / window if avg >= 1.6 and hydrophobic_fraction >= 0.65: features.append( { "feature": "Hydrophobic segment", "location": f"{start + 1}-{start + window}", "detail": f"avg hydropathy {avg:.2f}; possible transmembrane/span region", } ) if len(features) >= 5: break return features def _signal_peptide_hint(self, sequence: str) -> Optional[dict[str, Any]]: n_terminal = sequence[:30] if len(n_terminal) < 18: return None positively_charged = any(aa in self.POSITIVE for aa in n_terminal[:8]) hydrophobic_core = any( sum(1 for aa in n_terminal[start : start + 8] if aa in self.HYDROPHOBIC) >= 6 for start in range(0, max(len(n_terminal) - 7, 1)) ) if positively_charged and hydrophobic_core: return { "feature": "Signal peptide-like N-terminus", "location": "1-30", "detail": "basic N-region with hydrophobic core", } return None class SequenceAnalysisSuite: """Main class orchestrating all sequence analysis tools""" ALIGNMENT_MAX_SEQUENCE_LENGTH = 5_000 ALIGNMENT_MAX_TOTAL_SYMBOLS = 25_000 INTERACTIVE_MAX_SEQUENCES = 20 INTERACTIVE_MAX_SEQUENCE_LENGTH = 100_000 INTERACTIVE_MAX_TOTAL_SYMBOLS = 2_000_000 def __init__(self): """Initialize the analysis suite""" self.parser = FASTAParser() self.aligner = MultipleSequenceAligner() self.tree_builder = PhylogeneticTreeBuilder() self.domain_identifier = DomainIdentifier() self.motif_finder = MotifFinder() self.conservation_scorer = ConservationScorer() self.sequence_specific_analyzer = SequenceSpecificAnalyzer() def analyze(self, fasta_content: str, run_alignment: bool = True, run_phylogeny: bool = True, run_domains: bool = True, run_motifs: bool = True, run_conservation: bool = True) -> Dict[str, Any]: """ Run complete analysis pipeline Returns comprehensive analysis results """ results: Dict[str, Any] = { "input_sequences": [], "alignment": None, "phylogenetic_tree": None, "domains": {}, "motifs": {}, "conservation": {}, "sequence_insights": {}, "workload": {}, "warnings": [], "errors": [], } try: # 1. Parse sequences sequences = self.parser.parse(fasta_content) validation_result, errors = self.parser.validate(sequences) if not validation_result: results["errors"].extend(errors) return results results["input_sequences"] = [ { "id": seq.id, "description": seq.description, "length": len(seq.sequence), "type": seq.sequence_type } for seq in sequences ] results["sequence_insights"] = self.sequence_specific_analyzer.analyze(sequences) sequence_count = len(sequences) sequence_lengths = [len(seq.sequence) for seq in sequences] max_sequence_length = max(sequence_lengths, default=0) total_symbols = sum(sequence_lengths) sequence_types = {seq.sequence_type for seq in sequences} alignment_eligible = ( sequence_count >= 2 and max_sequence_length <= self.ALIGNMENT_MAX_SEQUENCE_LENGTH and total_symbols <= self.ALIGNMENT_MAX_TOTAL_SYMBOLS ) phylogeny_eligible = ( sequence_count >= 2 and sequence_count <= self.INTERACTIVE_MAX_SEQUENCES and max_sequence_length <= self.INTERACTIVE_MAX_SEQUENCE_LENGTH and total_symbols <= self.INTERACTIVE_MAX_TOTAL_SYMBOLS ) if sequence_count < 2: strategy = "single_sequence" elif alignment_eligible: strategy = "alignment" elif phylogeny_eligible: strategy = "kmer_fallback" else: strategy = "per_sequence_only" results["workload"] = { "sequence_count": sequence_count, "max_sequence_length": max_sequence_length, "total_symbols": total_symbols, "alignment_eligible": alignment_eligible, "phylogeny_eligible": phylogeny_eligible, "strategy": strategy, } # 2. Multiple Sequence Alignment alignment = None needs_alignment = ( run_alignment or run_conservation or run_phylogeny or run_motifs ) and alignment_eligible if needs_alignment: try: alignment, alignment_metadata = self.aligner.align(sequences) if run_alignment: results["alignment"] = { "aligned_sequences": [ { "id": record.id, "sequence": str(record.seq), "length": len(record.seq) } for record in alignment ], "metadata": alignment_metadata, "format": "fasta_aligned" } except Exception as e: results["errors"].append(f"Alignment failed: {str(e)}") elif run_alignment and sequence_count < 2: results["warnings"].append( f"Alignment skipped: at least 2 sequences are required; parsed {len(sequences)}." ) elif run_alignment and not alignment_eligible: results["warnings"].append( "Multiple sequence alignment skipped because the full input exceeds the " f"interactive alignment envelope ({self.ALIGNMENT_MAX_SEQUENCE_LENGTH:,} " f"symbols per sequence or {self.ALIGNMENT_MAX_TOTAL_SYMBOLS:,} total)." ) # 3. Conservation Scoring (requires alignment) if run_conservation and alignment: try: conservation_results = self.conservation_scorer.calculate_conservation(alignment) results["conservation"] = conservation_results except Exception as e: results["errors"].append(f"Conservation scoring failed: {str(e)}") elif run_conservation: results["warnings"].append( "Conservation scoring skipped because a multiple sequence alignment is unavailable." ) # 4. Domain Identification if run_domains: try: domains = self.domain_identifier.identify_domains(sequences, alignment) results["domains"] = domains except Exception as e: results["errors"].append(f"Domain identification failed: {str(e)}") # 5. Motif Finding if run_motifs and (alignment or sequence_count < 2): try: motifs = self.motif_finder.find_motifs(sequences, alignment) results["motifs"] = motifs except Exception as e: results["errors"].append(f"Motif finding failed: {str(e)}") elif run_motifs: results["warnings"].append( "Alignment-based motif finding skipped because a multiple sequence alignment " "is unavailable." ) # 6. Phylogenetic Tree Construction if run_phylogeny and sequence_count < 2: results["warnings"].append( f"Phylogenetic tree skipped: at least 2 sequences are required; parsed {sequence_count}." ) elif run_phylogeny and (len(sequence_types) != 1 or "unknown" in sequence_types): results["warnings"].append( "Phylogenetic tree skipped because all sequences must have the same detected " "molecule type." ) elif run_phylogeny and not phylogeny_eligible: results["warnings"].append( "Phylogenetic tree skipped because the input exceeds the interactive envelope " f"({self.INTERACTIVE_MAX_SEQUENCES} sequences, " f"{self.INTERACTIVE_MAX_SEQUENCE_LENGTH:,} symbols per sequence, or " f"{self.INTERACTIVE_MAX_TOTAL_SYMBOLS:,} total symbols)." ) elif run_phylogeny and alignment: try: newick_tree, tree_metadata = self.tree_builder.build_tree(alignment) results["phylogenetic_tree"] = { "newick": newick_tree, "metadata": tree_metadata } except Exception as e: results["errors"].append(f"Phylogenetic tree construction failed: {str(e)}") elif run_phylogeny: try: sequence_type = next(iter(sequence_types)) newick_tree, tree_metadata = self.tree_builder.build_kmer_tree( sequences, sequence_type ) results["phylogenetic_tree"] = { "newick": newick_tree, "metadata": tree_metadata, } results["warnings"].append( "The phylogenetic tree uses full-sequence k-mer Jaccard distances because " "multiple sequence alignment was unavailable." ) except Exception as e: results["errors"].append(f"Phylogenetic tree construction failed: {str(e)}") except Exception as e: results["errors"].append(f"Analysis pipeline error: {str(e)}") return results def generate_report(self, results: Dict) -> str: """Generate a comprehensive text report from analysis results""" report = [] report.append("=" * 80) report.append("SEQUENCE ANALYSIS REPORT") report.append("=" * 80) report.append("") # Input sequences report.append("INPUT SEQUENCES") report.append("-" * 80) for seq in results.get("input_sequences", []): report.append(f"ID: {seq['id']}") report.append(f" Type: {seq['type']}") report.append(f" Length: {seq['length']} residues") report.append(f" Description: {seq.get('description', 'N/A')}") report.append("") if results.get("workload"): workload = results["workload"] report.append("WORKLOAD STRATEGY") report.append("-" * 80) report.append(f"Strategy: {workload.get('strategy', 'N/A')}") report.append(f"Total Symbols: {workload.get('total_symbols', 0)}") report.append("") # Alignment if results.get("alignment"): align_data = results["alignment"] report.append("MULTIPLE SEQUENCE ALIGNMENT") report.append("-" * 80) metadata = align_data.get("metadata", {}) report.append(f"Alignment Length: {metadata.get('alignment_length', 'N/A')}") report.append(f"Number of Sequences: {metadata.get('num_sequences', 'N/A')}") report.append(f"Conserved Positions: {metadata.get('conserved_positions', 'N/A')}") report.append(f"Conservation: {metadata.get('conservation_percentage', 0):.2f}%") report.append("") # Sequence-specific insights if results.get("sequence_insights"): report.append("SEQUENCE-SPECIFIC INSIGHTS") report.append("-" * 80) for seq_id, insight in results["sequence_insights"].items(): report.append(f"Sequence: {seq_id}") for point in insight.get("summary_points", []): report.append(f" - {point}") metrics = insight.get("metrics", {}) if metrics: metric_text = ", ".join( f"{key}: {value}" for key, value in metrics.items() if not isinstance(value, (dict, list)) ) if metric_text: report.append(f" Metrics: {metric_text}") features = insight.get("features", []) if features: report.append(" Features:") for feature in features[:10]: report.append( f" - {feature.get('feature', 'Feature')} at {feature.get('location', 'N/A')}: {feature.get('detail', '')}" ) report.append("") # Conservation if results.get("conservation"): cons_data = results["conservation"] report.append("CONSERVATION ANALYSIS") report.append("-" * 80) report.append(f"Mean Conservation Score: {cons_data.get('mean_conservation', 0):.4f}") report.append(f"Highly Conserved Positions (>90th percentile): {len(cons_data.get('highly_conserved_positions', []))}") report.append("") # Domains if results.get("domains"): report.append("DOMAIN IDENTIFICATION") report.append("-" * 80) for seq_id, domains in results["domains"].items(): if domains: report.append(f"Sequence: {seq_id}") for domain in domains: report.append(f" {domain['domain_name']}: positions {domain['start']}-{domain['end']} (confidence: {domain['confidence']:.2f})") report.append("") # Motifs if results.get("motifs"): motifs_data = results["motifs"] report.append("MOTIF ANALYSIS") report.append("-" * 80) report.append(f"Number of Motifs Found: {motifs_data.get('num_motifs', 0)}") report.append(f"Method: {motifs_data.get('method', 'N/A')}") for motif in motifs_data.get("motifs", [])[:10]: # Top 10 report.append(f" Motif: {motif.get('motif', 'N/A')} (length: {motif.get('length', 'N/A')})") report.append("") # Phylogenetic Tree if results.get("phylogenetic_tree"): tree_data = results["phylogenetic_tree"] report.append("PHYLOGENETIC TREE") report.append("-" * 80) metadata = tree_data.get("metadata", {}) report.append(f"Method: {metadata.get('method', 'N/A')}") report.append(f"Distance Method: {metadata.get('distance_method', 'N/A')}") report.append(f"Alignment Used: {metadata.get('alignment_used', 'N/A')}") report.append(f"Newick Format:") report.append(tree_data.get("newick", "N/A")) report.append("") if results.get("warnings"): report.append("WARNINGS") report.append("-" * 80) for warning in results["warnings"]: report.append(f" - {warning}") report.append("") # Errors if results.get("errors"): report.append("ERRORS") report.append("-" * 80) for error in results["errors"]: report.append(f" - {error}") report.append("") report.append("=" * 80) report.append("End of Report") report.append("=" * 80) return "\n".join(report)