"""VCF parsing and variant evidence normalization helpers.""" from __future__ import annotations import re import tempfile from pathlib import Path from typing import Any class VariantEvidenceError(ValueError): """Raised when structured variant evidence cannot be interpreted safely.""" def _format_genotype(gt: tuple[int | None, ...] | None, phased: bool) -> str: if not gt: return "" separator = "|" if phased else "/" return separator.join("." if allele is None else str(allele) for allele in gt) def _zygosity(gt: tuple[int | None, ...] | None) -> str: called = [allele for allele in (gt or ()) if allele is not None] if not called or all(allele == 0 for allele in called): return "reference" non_reference = [allele for allele in called if allele > 0] if len(non_reference) == len(called) and len(set(called)) == 1: return "homozygous" if non_reference: return "heterozygous" return "unknown" def _annotation_fields(record: Any) -> dict[str, str]: info = getattr(record, "INFO", None) or getattr(record, "info", {}) annotations = info.get("ANN") or info.get("CSQ") or () if isinstance(annotations, str): annotations = (annotations,) if not annotations: return {} parts = str(annotations[0]).split("|") return { "consequence": parts[1] if len(parts) > 1 else "", "gene": parts[3] if len(parts) > 3 else "", "hgvsc": parts[9] if len(parts) > 9 else "", "hgvsp": parts[10] if len(parts) > 10 else "", } class VCFPyParser: """Parse VCF text with vcfpy and preserve sample-level genotype evidence.""" def parse( self, vcf_text: str, *, sample_id: str | None = None, genome_build: str | None = None, ) -> dict[str, Any]: try: import vcfpy except ImportError as exc: # pragma: no cover - depends on environment packaging raise RuntimeError("VCF parsing requires the 'vcfpy' package") from exc if not str(vcf_text or "").strip(): raise VariantEvidenceError("VCF input is empty") path: Path | None = None try: with tempfile.NamedTemporaryFile( mode="w", suffix=".vcf", encoding="utf-8", delete=False ) as handle: handle.write(vcf_text) path = Path(handle.name) reader = vcfpy.Reader.from_path(str(path)) try: samples = list(reader.header.samples.names) selected_sample = sample_id.strip() if sample_id else None if selected_sample and selected_sample not in samples: raise VariantEvidenceError( f"Sample '{selected_sample}' is not present in the VCF" ) if len(samples) > 1 and not selected_sample: raise VariantEvidenceError( "VCF contains multiple samples; select exactly one sample for interpretation" ) if selected_sample is None and len(samples) == 1: selected_sample = samples[0] variants: list[dict[str, Any]] = [] for record in reader: annotation = _annotation_fields(record) sample = record.call_for_sample[selected_sample] if selected_sample else None sample_data = sample.data if sample is not None else {} raw_gt = str(sample_data.get("GT") or "") gt = tuple( None if allele == "." else int(allele) for allele in re.split(r"[/|]", raw_gt) if allele != "" ) phased = "|" in raw_gt ad = list(sample_data.get("AD") or []) dp = sample_data.get("DP") phase_set = sample_data.get("PS") for alt_index, alternate_record in enumerate(record.ALT or (), start=1): alternate = alternate_record.value alt_depth = ad[alt_index] if len(ad) > alt_index else None allele_fraction = ( float(alt_depth) / float(dp) if alt_depth is not None and dp not in (None, 0) else None ) variant_id = str( next(iter(record.ID or []), "") or annotation.get("hgvsp") or annotation.get("hgvsc") or "" ) if not variant_id: variant_id = f"{record.CHROM}:{record.POS}:{record.REF}>{alternate}" variants.append( { "chrom": str(record.CHROM), "pos": int(record.POS), "ref": str(record.REF), "alt": str(alternate), "variant_id": variant_id, "gene": annotation.get("gene", ""), "consequence": annotation.get("consequence", ""), "hgvsc": annotation.get("hgvsc", ""), "hgvsp": annotation.get("hgvsp", ""), "filter": ";".join(record.FILTER or []) or "PASS", "genotype": _format_genotype(gt, phased), "zygosity": _zygosity(gt), "phase_set": str(phase_set) if phase_set is not None else None, "allele_fraction": allele_fraction, "depth": int(dp) if dp is not None else None, "allele_depth": int(alt_depth) if alt_depth is not None else None, "sample_id": selected_sample, "genome_build": genome_build, "info": dict(record.INFO), } ) return {"variants": variants, "samples": samples, "selected_sample": selected_sample} finally: reader.close() except VariantEvidenceError: raise except Exception as exc: raise VariantEvidenceError(f"Unable to parse VCF: {exc}") from exc finally: if path is not None: path.unlink(missing_ok=True) # Backward-compatible import name used by earlier development builds. PysamVCFParser = VCFPyParser