def create_log_context(event_name: str, **details: Any) -> Dict[str, Any]:
    """Build the ``extra`` payload for a structured log call.

    Args:
        event_name: Stored under the ``context_event`` key.
        **details: Additional fields. Each key is prefixed with ``context_``
            unless it already starts with that prefix.

    Returns:
        A new dict holding ``context_event`` followed by the prefixed details.
    """
    prefixed_details = {
        (name if name.startswith("context_") else f"context_{name}"): value
        for name, value in details.items()
    }
    return {"context_event": event_name, **prefixed_details}


def get_progress_message(
    stage_label: str,
    status: str,
    completed: int,
    total: int,
    failed_count: int = 0,
    app_env=None,
) -> str:
    """Return the progress text shown while analysis stages load.

    Args:
        stage_label: Human-readable stage name (used in development text only).
        status: One of "success", "warning", "timeout", "error"; any other
            value selects the generic in-progress text.
        completed: Number of completed stages.
        total: Total number of stages.
        failed_count: Number of failed stages (never shown in production).
        app_env: Object exposing ``is_production()``; when None the result of
            ``get_environment()`` is used.

    Returns:
        A generic message in production, a detailed one otherwise.
    """
    environment = get_environment() if app_env is None else app_env

    if environment.is_production():
        # Production text is deliberately generic: no stage names or counters.
        production_text = {
            "success": "✅ Analysis complete. Loading results...",
            "warning": "⏳ Processing request... Some data could not be loaded.",
            "timeout": "⏳ Request is taking longer than expected. Displaying partial results...",
            "error": "⏳ Processing request... Displaying available results.",
        }
        return production_text.get(status, "⏳ Analyzing protein. This may take a moment...")

    # Development text exposes the stage label and the progress counters.
    if status == "success":
        if failed_count:
            return f"✅ {stage_label} complete with {failed_count} issue(s). {completed}/{total} sections loaded."
        return f"✅ {stage_label} complete. {completed}/{total} sections loaded."
    if status == "warning":
        return f"⚠️ {stage_label} complete with {failed_count} stage issue(s). {completed}/{total} sections loaded."
    if status == "timeout":
        return f"⏱️ {stage_label} timed out after {completed}/{total} sections. Rendering what we have."
    if status == "error":
        return f"⚠️ {stage_label} failed. Rendering remaining sections."
    return f"**{stage_label}** ready. {completed}/{total} sections loaded."
from cache_manager import ( CacheManager, clear_app_cache, cached_search_uniprot, cached_fetch_uniprot_data, cached_run_blast_search, cached_fetch_embl_sequence, cached_run_needle_alignment, cached_fetch_similar_compounds, cached_fetch_pubchem_structure, cached_predict_ligand_binding, cached_predict_protein_localization, ) # Protein visualization: interactive Plotly charts for protein analysis from visualizations import ProteinVisualizer # Protein API client: UniProt, HPA, and related external API integrations from api_client import ProteinAPIClient, get_drug_metadata from drug_repurposing_engine import DrugRepurposingEngine from sequence_analysis import SequenceAnalysisSuite, FASTAParser from genome_analysis_engine import GenomeAnalysisEngine from portfolio_engine import PortfolioEngine from backend.utils.clinical_trials import build_clinicaltrials_url, normalize_nct_id from backend.utils.common import run_async_safe, stream_async_safe from legal_pages import ( LEGAL_PAGE_SLUGS, render_data_security, render_disclaimer, render_footer, render_privacy_policy, render_terms, ) try: from streamlit.runtime.scriptrunner.script_runner import RerunException except Exception: RerunException = None _NCT_PATTERN = re.compile(r"\bNCT\d{8}\b", re.IGNORECASE) class OmniBiMolContextPayload(TypedDict): protein_profile: Dict[str, Any] structure_data: Any pathway_data: Any ppi_data: Any ligand_binding_data: Any docking_data: Any repurposing_data: Any genome_risk_data: Any pubmed_evidence: list[Any] clinical_trials_evidence: list[Any] OMNIBIMOL_REQUIRED_CONTEXT_KEYS = [ "protein_profile", "structure_data", "pathway_data", "ppi_data", "ligand_binding_data", "docking_data", "repurposing_data", "genome_risk_data", "pubmed_evidence", "clinical_trials_evidence", ] OMNIBIMOL_RESEARCH_COPILOT_SYSTEM_PROMPT = textwrap.dedent(""" You are OmniBiMol AI Research Copilot, a domain-aware biomedical assistant for protein evidence synthesis, ligandability analysis, and translational hypothesis 
generation. MISSION - Help researchers interpret protein evidence and identify practical validation paths. - Produce evidence-grounded, uncertainty-aware outputs using ONLY provided internal analysis results and cited external evidence (PubMed, ClinicalTrials, curated databases). - Never present speculation as fact. OPERATING RULES 1) Grounding First - Use internal computed artifacts as primary context: - protein annotation, sequence features, structure confidence, PPI, pathways, ligandability, docking outputs, repurposing network, genome risk outputs. - Use external evidence second: - PubMed abstracts/summaries, ClinicalTrials records, approved drug metadata. - Every key claim must include evidence tags: - [Internal:] and/or [PubMed:] and/or [Trial:]. - If evidence is missing or weak, explicitly say so. 2) Strict Data Boundaries - Do not fabricate PMIDs, NCT IDs, values, proteins, pathways, or mutations. - If data is unavailable, return: "Insufficient evidence with current context." - Distinguish: - Observed (from provided data), - Inferred (reasoned from observations), - Hypothesis (testable but unproven). 3) Scientific Rigor - Report confidence per conclusion: High / Medium / Low with rationale. - Mention conflicting evidence when present. - Highlight limitations (sample size, simulated docking, model assumptions, missing assay data). - Avoid clinical recommendations for patient care; provide research-use guidance only. 4) Output Quality - Be concise, structured, and decision-oriented. - Prefer ranked lists and clear next actions. - Include risk flags and potential failure modes. RESPONSE MODES A) If user asks "Why is this protein ligandable?" Return sections: 1. Verdict (1-2 lines) 2. Evidence for Druggability 3. Evidence Against / Gaps 4. Confidence + Why 5. Next 3 Experiments 6. Risk Flags 7. 
Citations B) If user asks for "hypothesis cards" Generate 3-5 cards in this template: - Hypothesis: - Rationale: - Supporting Evidence: - Disconfirming Evidence: - Minimal Experiment: - Readout / Success Criteria: - Priority: High/Med/Low - Risk Level: High/Med/Low - Confidence: High/Med/Low - Citations: C) If user asks for "experimental next steps" Return: - Immediate (1-2 weeks), Near-term (1-2 months), Later (quarter) - For each step: objective, assay/model, expected signal, go/no-go threshold, key risk. D) If user asks for "risk flags" Return categorized flags: - Biological risk - Translational risk - Data quality risk - Model/simulation risk - Regulatory/clinical feasibility risk Each with severity (High/Med/Low) and mitigation. DECISION HEURISTICS (apply transparently) - Favor proteins with convergent support across >=3 independent evidence types. - Downgrade confidence when core support depends on simulated/synthetic outputs. - Boost priority when: - tractable binding pocket evidence, - pathway centrality + disease relevance, - supportive human genetics/biomarkers, - existing chemical matter and trial activity. - Penalize when: - contradictory biology, - poor selectivity risk, - weak translatability or no viable assay path. STYLE - Audience: biomedical researchers and biotech decision-makers. - Tone: analytical, pragmatic, non-hyped. - Use bullet points and short paragraphs. - Always end with: - "What would increase confidence most?" (top 3 missing data items). INPUT CONTRACT (expected context variables) - protein_profile - structure_data - pathway_data - ppi_data - ligand_binding_data - docking_data - repurposing_data - genome_risk_data - pubmed_evidence - clinical_trials_evidence If any are missing, list them under "Missing Context". SAFETY - Research support only; not medical advice. - If user requests treatment decisions for a patient, refuse and suggest consulting a licensed clinician. 
""").strip() def get_missing_omnibimol_context(context_payload: Optional[Dict[str, Any]]) -> List[str]: """Return context keys missing from the OmniBiMol copilot input contract.""" payload = context_payload or {} missing_keys: List[str] = [] for key in OMNIBIMOL_REQUIRED_CONTEXT_KEYS: value = payload.get(key) if value is None: missing_keys.append(key) continue if isinstance(value, dict): if not value: missing_keys.append(key) continue if "available" in value and not value.get("available"): missing_keys.append(key) continue if isinstance(value, list) and len(value) == 0: missing_keys.append(key) return missing_keys def _is_patient_treatment_request(user_query: str) -> bool: query = (user_query or "").lower() treatment_terms = [ "patient", "treatment", "dose", "dosage", "prescribe", "which drug should", "what should i take", "therapy recommendation", ] return any(term in query for term in treatment_terms) def _infer_omnibimol_mode(user_query: str) -> str: query = (user_query or "").lower() if "hypothesis card" in query or "hypothesis cards" in query: return "hypothesis_cards" if "experimental next steps" in query or "next steps" in query: return "experimental_next_steps" if "risk flags" in query: return "risk_flags" if "why is this protein ligandable" in query or ("why" in query and "ligandable" in query): return "druggable_why" return "druggable_why" def _build_omnibimol_context_payload( data: Dict[str, Any], uniprot_data: Dict[str, Any] ) -> OmniBiMolContextPayload: literature = data.get("literature", {}) return { "protein_profile": { "uniprot_id": uniprot_data.get("uniprot_id"), "gene_name": uniprot_data.get("gene_name"), "protein_name": uniprot_data.get("protein_name"), "function": uniprot_data.get("function"), "sequence_length": uniprot_data.get("sequence_length"), "go_terms": uniprot_data.get("go_terms", {}), }, "structure_data": data.get("alphafold_structure") or data.get("pdb_structure"), "pathway_data": data.get("kegg_pathways"), "ppi_data": 
def _build_omnibimol_context_payload(
    data: Dict[str, Any], uniprot_data: Dict[str, Any]
) -> OmniBiMolContextPayload:
    """Assemble the copilot context payload from fetched data and session state.

    Args:
        data: Aggregated analysis results. Keys read here: ``literature``,
            ``alphafold_structure``, ``pdb_structure``, ``kegg_pathways``,
            ``string_ppi``, ``chembl_ligands`` and ``clinical_trials``.
        uniprot_data: UniProt record used to fill ``protein_profile``.

    Returns:
        A mapping with the ten copilot contract keys. Docking, repurposing and
        genome-risk entries are read from ``st.session_state``.
    """
    # The key may be present with a null value; ``.get(key, {})`` would then
    # return None and the ``papers`` lookup below would raise.
    literature = data.get("literature")
    if not isinstance(literature, dict):
        literature = {}
    return {
        "protein_profile": {
            "uniprot_id": uniprot_data.get("uniprot_id"),
            "gene_name": uniprot_data.get("gene_name"),
            "protein_name": uniprot_data.get("protein_name"),
            "function": uniprot_data.get("function"),
            "sequence_length": uniprot_data.get("sequence_length"),
            "go_terms": uniprot_data.get("go_terms", {}),
        },
        "structure_data": data.get("alphafold_structure") or data.get("pdb_structure"),
        "pathway_data": data.get("kegg_pathways"),
        "ppi_data": data.get("string_ppi"),
        "ligand_binding_data": data.get("chembl_ligands"),
        "docking_data": st.session_state.get("docking_results"),
        "repurposing_data": st.session_state.get("repurposing_report_data"),
        "genome_risk_data": st.session_state.get("genome_analysis_results"),
        # Both evidence entries are declared as lists; normalise null to [].
        "pubmed_evidence": literature.get("papers") or [],
        "clinical_trials_evidence": data.get("clinical_trials") or [],
    }


def _generate_omnibimol_copilot_response(user_query: str, context_payload: Dict[str, Any]) -> str:
    """Render the template-based copilot answer for ``user_query`` as Markdown.

    The response mode is inferred from the query. Confidence is derived from
    how many of nine evidence types are present in ``context_payload``
    (structure, pathways, PPI, ligands, docking, genetics, repurposing, PubMed
    PMIDs, trial NCT ids) and is forced to "Low" when docking is present
    without PubMed, trial or genetics support.

    Args:
        user_query: Free-text question from the user.
        context_payload: Mapping following the copilot input contract.

    Returns:
        Newline-joined Markdown. Patient-care questions get a refusal; a
        payload with no evidence at all gets an "insufficient evidence" notice.
    """
    mode = _infer_omnibimol_mode(user_query)
    missing_context = get_missing_omnibimol_context(context_payload)

    def _missing_context_lines(nothing_missing_note: str) -> List[str]:
        # Bullet list of missing contract keys, or a single fallback bullet.
        if missing_context:
            return [f"- {key}" for key in missing_context]
        return [nothing_missing_note]

    def _is_available(key: str) -> bool:
        # An artifact counts only when it is a dict flagged ``available``.
        artifact = context_payload.get(key)
        return isinstance(artifact, dict) and bool(artifact.get("available"))

    pubmed_entries = context_payload.get("pubmed_evidence", []) or []
    # Non-dict entries are skipped, mirroring the guard used for trials below.
    pubmed_pmids = [
        str(paper.get("pmid"))
        for paper in pubmed_entries
        if isinstance(paper, dict) and paper.get("pmid")
    ]
    trial_entries = context_payload.get("clinical_trials_evidence", []) or []
    trial_ids = []
    for trial in trial_entries:
        trial_id = _extract_nct_id(trial if isinstance(trial, dict) else {})
        if trial_id:
            trial_ids.append(trial_id)

    has_structure = _is_available("structure_data")
    has_pathways = _is_available("pathway_data")
    has_ppi = _is_available("ppi_data")
    has_ligands = _is_available("ligand_binding_data")
    has_docking = bool(context_payload.get("docking_data"))
    has_genetics = bool(context_payload.get("genome_risk_data"))
    has_repurposing = bool(context_payload.get("repurposing_data"))
    evidence_types = sum(
        [
            has_structure,
            has_pathways,
            has_ppi,
            has_ligands,
            has_docking,
            has_genetics,
            has_repurposing,
            bool(pubmed_pmids),
            bool(trial_ids),
        ]
    )

    confidence = "Low"
    confidence_rationale = "Fewer than 3 independent evidence types are available."
    if evidence_types >= 5:
        confidence = "High"
        confidence_rationale = "Convergent support is present across multiple independent internal and external evidence types."
    elif evidence_types >= 3:
        confidence = "Medium"
        confidence_rationale = (
            "At least 3 independent evidence types are present, but important uncertainty remains."
        )
    # Docking without any orthogonal external/genetic support caps confidence.
    if has_docking and not (pubmed_pmids or trial_ids or has_genetics):
        confidence = "Low"
        confidence_rationale = (
            "Core support is dominated by simulated outputs without enough orthogonal validation."
        )

    if _is_patient_treatment_request(user_query):
        return "\n".join(
            [
                "Research support only; I cannot provide patient-specific treatment recommendations.",
                "Please consult a licensed clinician for patient-care questions.",
                "",
                "## Missing Context",
                *_missing_context_lines("- None identified from the required contract."),
                "",
                "What would increase confidence most?",
                "- Prospectively validated clinical outcome data linked to this protein.",
                "- Orthogonal functional assays in disease-relevant models.",
                "- Curated human genetics evidence with effect size and directionality.",
            ]
        )

    if evidence_types == 0:
        return "\n".join(
            [
                "Insufficient evidence with current context.",
                "",
                "## Missing Context",
                *_missing_context_lines(
                    "- Required context artifacts are unavailable in the current session."
                ),
                "",
                "What would increase confidence most?",
                "- Any protein-level internal artifact (structure/pathway/PPI/ligandability).",
                "- PubMed evidence with extractable PMIDs.",
                "- Clinical trial records with valid NCT identifiers.",
            ]
        )

    # Build concise, evidence-linked snippets instead of generic placeholders.
    citations: List[str] = []

    def _structure_snippet(structure: Any) -> Optional[str]:
        if not isinstance(structure, dict) or not structure.get("available"):
            return None
        # Prefer experimental PDB entries when present.
        if structure.get("structures"):
            top = structure.get("structures")[0]
            return f"- Structure: Experimental PDB {top.get('pdb_id', 'N/A')} ({top.get('method', 'Unknown')})"
        model_version = structure.get("model_version")
        if model_version:
            return f"- Structure: AlphaFold v{model_version} prediction (page: {structure.get('alphafold_page', 'link')})"
        return "- Structure: predicted model available"

    def _pathway_snippet(pathways: Any) -> Optional[str]:
        if not isinstance(pathways, dict) or not pathways.get("available"):
            return None
        first = pathways.get("first_result")
        if not isinstance(first, dict):
            # Fall back to the first listed pathway. The entry itself is needed,
            # not a one-element slice, or its name can never be read.
            listed = pathways.get("pathways")
            first = listed[0] if isinstance(listed, list) and listed else None
        name = first.get("pathway_name") if isinstance(first, dict) else None
        return f"- Pathway: {name or 'Mapped pathway evidence present'}"

    def _ppi_snippet(ppi: Any) -> Optional[str]:
        if not isinstance(ppi, dict) or not ppi.get("available"):
            return None
        interactions = ppi.get("interactions")
        count = len(interactions) if interactions else ppi.get("count") or 0
        return f"- PPI: {count} interactions detected (string-db)"

    def _ligand_snippet(ligand_data: Any) -> Optional[str]:
        if not isinstance(ligand_data, dict) or not ligand_data.get("available"):
            return None
        ligands = ligand_data.get("ligands") or []
        if not ligands:
            return None
        top = ligands[0]
        name = top.get("name") or top.get("chembl_id")
        activity = top.get("activity_value")
        if activity is None:
            # Avoid rendering the literal text "None" as an activity value.
            return f"- Ligand: {name} (activity not reported)"
        units = top.get("activity_units") or ""
        return f"- Ligand: {name} (best activity {activity}{units})"

    def _docking_snippet(docking: Any) -> Optional[str]:
        if not isinstance(docking, dict) or not docking:
            return None
        if docking.get("simulated"):
            return "- Docking: simulated binding modes available"
        if docking.get("available"):
            return "- Docking: experimental docking job completed"
        return None

    # Compose concise citation/evidence lines.
    snippet_sources = [
        (_structure_snippet, "structure_data"),
        (_pathway_snippet, "pathway_data"),
        (_ppi_snippet, "ppi_data"),
        (_ligand_snippet, "ligand_binding_data"),
        (_docking_snippet, "docking_data"),
    ]
    for build_snippet, context_key in snippet_sources:
        try:
            snippet = build_snippet(context_payload.get(context_key))
        except Exception:
            # Best effort: a malformed artifact must not break the whole answer,
            # but the failure is recorded instead of vanishing.
            logger.debug("Copilot snippet for %s failed", context_key, exc_info=True)
            snippet = None
        if snippet:
            citations.append(snippet)

    # Add human external evidence references (PubMed / Trials).
    citations.extend(f"- PubMed:{pmid}" for pmid in pubmed_pmids[:5])
    citations.extend(f"- Trial:{nct}" for nct in trial_ids[:5])

    if not citations:
        citations.append("- Insufficient evidence with current context.")

    if mode == "hypothesis_cards":
        cards: List[str] = ["## Hypothesis Cards"]
        for idx in range(1, 4):
            cards.extend(
                [
                    f"### Card {idx}",
                    f"- Hypothesis: Protein perturbation modulates disease-relevant biology through mechanism pathway #{idx}.",
                    "- Rationale: Convergent internal signals suggest tractability and disease coupling. [Internal:protein_profile] [Internal:pathway_data]",
                    "- Supporting Evidence: Structure/pathway/PPI/ligandability evidence available in session-specific artifacts.",
                    "- Disconfirming Evidence: Contradictory biology and weak translatability remain plausible due to incomplete orthogonal validation.",
                    "- Minimal Experiment: Perturb the protein in disease-relevant cells, then quantify pathway marker shift and viability.",
                    "- Readout / Success Criteria: >=20% pathway marker shift with acceptable viability window versus control.",
                    f"- Priority: {'High' if idx == 1 else 'Med'}",
                    f"- Risk Level: {'Med' if evidence_types >= 3 else 'High'}",
                    f"- Confidence: {confidence}",
                    "- Citations: [Internal:protein_profile] [Internal:pathway_data] [Internal:ppi_data]",
                    "",
                ]
            )
        cards.append("## Missing Context")
        cards.extend(_missing_context_lines("- None identified from the required contract."))
        cards.extend(
            [
                "",
                "What would increase confidence most?",
                f"- Missing artifacts: {', '.join(missing_context[:3]) if missing_context else 'No critical artifacts missing; next gains are from orthogonal validation.'}",
                "- Matched perturbation + rescue experiment in disease-relevant model.",
                "- Confirmatory external evidence (additional PMIDs / active trials).",
            ]
        )
        return "\n".join(cards)

    if mode == "experimental_next_steps":
        lines = [
            "## Experimental Next Steps",
            "- Immediate (1-2 weeks): objective=validate target engagement; assay/model=biochemical binding + rapid cellular perturbation; expected signal=directional biomarker shift; go/no-go=predefined potency/engagement threshold met; key risk=assay artifact. [Internal:ligand_binding_data] [Internal:docking_data]",
            "- Near-term (1-2 months): objective=establish mechanism and selectivity; assay/model=orthogonal cell models and pathway panels; expected signal=consistent pathway modulation; go/no-go=reproducible effect across models; key risk=off-target confounding. [Internal:pathway_data] [Internal:ppi_data]",
            "- Later (quarter): objective=translational confidence; assay/model=in vivo/advanced model + biomarker strategy; expected signal=efficacy-linked biomarker movement; go/no-go=effect size and exposure margins acceptable; key risk=poor translatability. [Internal:protein_profile]",
        ]
    elif mode == "risk_flags":
        lines = [
            "## Risk Flags",
            f"- Biological risk: severity={'High' if not has_pathways else 'Med'}; mitigation=orthogonal pathway perturbation and rescue assays. [Internal:pathway_data]",
            f"- Translational risk: severity={'High' if not has_genetics else 'Med'}; mitigation=human genetics/biomarker triangulation.",
            f"- Data quality risk: severity={'High' if len(missing_context) >= 4 else 'Med'}; mitigation=complete missing contract artifacts and provenance checks.",
            f"- Model/simulation risk: severity={'High' if has_docking and not pubmed_pmids else 'Med'}; mitigation=prioritize orthogonal confirmation of docking-derived claims. [Internal:docking_data]",
            # With at least one trial id the severity is always "Med".
            (
                f"- Regulatory/clinical feasibility risk: severity=Med; mitigation=map indication precedent and trial landscape. [Trial:{trial_ids[0]}]"
                if trial_ids
                else "- Regulatory/clinical feasibility risk: severity=High; mitigation=map indication precedent and trial landscape."
            ),
        ]
    else:
        # Compose evidence for the summary using the concise snippets built above.
        internal_evidence = [
            line for line in citations if not line.startswith(("- PubMed:", "- Trial:"))
        ]
        if not internal_evidence:
            internal_evidence = [
                "- Structural support is limited in the current context.",
                "- Pathway / PPI evidence is weak or unavailable.",
            ]
        lines = [
            "## Verdict (1-2 lines)",
            "Protein shows conditional ligandability for research review; it is not yet de-risked for translational commitment.",
            "",
            "## Evidence for Druggability",
            *internal_evidence,
            "",
            "## Evidence Against / Gaps",
            "- Contradictory biology and selectivity risks cannot be excluded from current evidence alone.",
            "- Core support may rely on simulated outputs; external orthogonal validation is recommended.",
            "- Missing assay-level evidence constrains translatability confidence.",
            "",
            "## Confidence + Why",
            f"- {confidence}: {confidence_rationale}",
            "",
            "## Next 3 Experiments",
            "- Orthogonal engagement assay in a disease-relevant cellular model with predefined go/no-go potency.",
            "- Mechanism-of-action test (perturbation + rescue) to validate causal pathway linkage.",
            "- Early selectivity profiling across a relevant off-target panel.",
            "",
            "## Risk Flags",
            "- Biological risk: pathway compensation may mask expected responses.",
            "- Model/simulation risk: docking-derived claims require biochemical confirmation.",
            "- Translational risk: biomarker/genetics support may be incomplete.",
            "",
            "## Citations",
            *citations,
        ]

    lines.extend(["", "## Missing Context"])
    lines.extend(_missing_context_lines("- None identified from the required contract."))
    lines.extend(
        [
            "",
            "What would increase confidence most?",
            f"- Missing artifacts: {', '.join(missing_context[:3]) if missing_context else 'No critical artifact missing; prioritize orthogonal validation quality.'}",
            "- Prospective orthogonal validation in disease-relevant model systems.",
            "- Additional external support from PubMed/ClinicalTrials tied to this protein/indication.",
        ]
    )
    return "\n".join(lines)
def _extract_nct_id(trial: Dict[str, Any]) -> Optional[str]:
    """Find an NCT identifier in a trial record.

    Lookup order: dedicated id fields, then URL-like fields, then the title.
    Every candidate is passed through ``normalize_nct_id``; the first truthy
    result wins.
    """
    id_fields = ("nct_id", "trial_id", "nctId", "nct", "id", "nct_number")
    for field in id_fields:
        from_field = normalize_nct_id(trial.get(field))
        if from_field:
            return from_field

    url_fields = ("url", "link", "clinicaltrials_url")
    for field in url_fields:
        found = _NCT_PATTERN.search(str(trial.get(field) or ""))
        from_url = normalize_nct_id(found.group(0) if found else None)
        if from_url:
            return from_url

    in_title = _NCT_PATTERN.search(str(trial.get("title") or ""))
    return normalize_nct_id(in_title.group(0) if in_title else None)


def _get_docking_mode_choice() -> str:
    """Return the docking mode label stored in the session, or the default.

    The default is real docking only when ``DOCKING_MODE_DEFAULT`` is "real"
    and ``DOCKING_ENABLED`` is one of "1", "true", "yes" (case-insensitive);
    otherwise it is the simulation label.
    """
    real_is_default = os.getenv("DOCKING_MODE_DEFAULT", "real").lower() == "real"
    docking_enabled = os.getenv("DOCKING_ENABLED", "true").lower() in {"1", "true", "yes"}
    if real_is_default and docking_enabled:
        fallback_label = "Use Real Docking (Beta)"
    else:
        fallback_label = "Use Simulation"
    return st.session_state.get("docking_mode_choice", fallback_label)


def _get_docking_mode_value() -> str:
    """Translate the docking mode label into "real" or "simulation"."""
    if _get_docking_mode_choice() == "Use Real Docking (Beta)":
        return "real"
    return "simulation"


def _render_docking_site_controls(key_prefix: str) -> Dict[str, Any]:
    """Draw the docking-site selector and return the chosen site settings.

    Args:
        key_prefix: Prefix for widget keys and for the session entry
            ``{key_prefix}_pocket_candidates``.

    Returns:
        Dict with ``site_mode``, ``pocket_id`` and ``grid``. The last two stay
        None unless a stored pocket or a manual grid was chosen.
    """
    known_pockets = st.session_state.get(f"{key_prefix}_pocket_candidates", []) or []
    mode_by_label = {
        "Automatic pocket discovery": "auto",
        "Manual grid": "manual",
        "Whole receptor (thorough)": "whole_receptor",
    }
    # The "selected" option is offered only once pockets have been stored.
    if known_pockets:
        mode_by_label["Previously detected pocket"] = "selected"

    chosen_label = st.selectbox(
        "Docking site",
        list(mode_by_label),
        key=f"{key_prefix}_site_mode",
        help="Automatic performs a quick broad search and refines a ranked pocket.",
    )
    site_mode = mode_by_label[chosen_label]
    controls: Dict[str, Any] = {"site_mode": site_mode, "pocket_id": None, "grid": None}

    if site_mode == "selected":
        pockets_by_label = {}
        for position, pocket in enumerate(known_pockets):
            rank = pocket.get("rank", position + 1)
            title = pocket.get("label", pocket.get("pocket_id", "Pocket"))
            pockets_by_label[f"#{rank} {title}"] = pocket
        picked_label = st.selectbox(
            "Specific pocket", list(pockets_by_label), key=f"{key_prefix}_pocket_id"
        )
        picked = pockets_by_label[picked_label]
        controls["pocket_id"] = picked.get("pocket_id")
        # Use the pocket's own grid when it has one, else rebuild it from its box.
        controls["grid"] = picked.get("grid") or {
            "center": picked.get("center"),
            "size": picked.get("size"),
        }
    elif site_mode == "manual":
        st.caption("Grid center and dimensions are in angstroms.")
        axes = ("x", "y", "z")
        center_columns = st.columns(3)
        size_columns = st.columns(3)
        center = {}
        for column, axis in zip(center_columns, axes):
            center[axis] = column.number_input(
                f"Center {axis.upper()}", value=0.0, key=f"{key_prefix}_center_{axis}"
            )
        size = {}
        for column, axis in zip(size_columns, axes):
            size[axis] = column.number_input(
                f"Size {axis.upper()}",
                min_value=8.0,
                max_value=60.0,
                value=22.0,
                step=1.0,
                key=f"{key_prefix}_size_{axis}",
            )
        controls["grid"] = {"center": center, "size": size}

    return controls
def _render_docking_pocket_summary(result: Dict[str, Any], key_prefix: str) -> None:
    """Render the pocket, grid, RMSD and timing summary of a docking result.

    Pocket candidates found in ``result`` are also stored in
    ``st.session_state`` under ``{key_prefix}_pocket_candidates``. Null or
    non-numeric fields in the payload are tolerated rather than raising.

    Args:
        result: Docking result payload. Keys read here: ``pocket_candidates``,
            ``selected_pocket``, ``grid``, ``docking_box``, ``best_mode``,
            ``reference_rmsd``, ``matched_atom_count``,
            ``reference_rmsd_message`` and ``timings``.
        key_prefix: Prefix of the session key used to store the candidates.
    """

    def _to_float(value: Any) -> Optional[float]:
        # None for anything float() rejects (null, text, containers).
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    def _format_axes(vector: Any, missing: str) -> str:
        # "X=.., Y=.., Z=.." with ``missing`` for absent or unparseable axes.
        values = vector if isinstance(vector, dict) else {}
        parts = []
        for axis in ("x", "y", "z"):
            number = _to_float(values.get(axis))
            parts.append(f"{axis.upper()}={missing if number is None else format(number, '.2f')}")
        return ", ".join(parts)

    def _join_residues(entry: Dict[str, Any]) -> str:
        # str() keeps the join working when residues arrive as numbers.
        return ", ".join(str(residue) for residue in entry.get("nearby_residues") or [])

    candidates = result.get("pocket_candidates") or []
    if candidates:
        st.session_state[f"{key_prefix}_pocket_candidates"] = candidates

    selected = result.get("selected_pocket") or {}
    grid = selected.get("grid") or result.get("grid") or result.get("docking_box") or {}

    if selected:
        st.markdown(f"**Selected site:** {selected.get('label', selected.get('pocket_id', 'Pocket'))}")
        confidence = _to_float(selected.get("confidence"))
        st.caption(
            f"Source: {selected.get('source', 'unknown')} | Confidence: "
            f"{0.0 if confidence is None else confidence:.2f} | "
            f"Nearby residues: {_join_residues(selected) or 'Not available'}"
        )

    if candidates:
        st.dataframe(
            [
                {
                    "Pocket": item.get("pocket_id"),
                    "Source": item.get("source"),
                    "Confidence": item.get("confidence"),
                    "Center": _format_axes(item.get("center"), "0.00"),
                    "Grid": _format_axes(item.get("size"), "0.00"),
                    # Header previously carried a mis-encoded "Å³".
                    "Volume (Å³)": item.get("estimated_volume"),
                    "Nearby residues": _join_residues(item),
                }
                for item in candidates
                if isinstance(item, dict)
            ],
            width="stretch",
            hide_index=True,
        )

    if isinstance(grid, dict) and grid.get("center") and grid.get("size"):
        st.code(
            "Center: "
            + _format_axes(grid["center"], "N/A")
            + "\nGrid: "
            + _format_axes(grid["size"], "N/A"),
            language=None,
        )

    best_mode = result.get("best_mode") or {}
    if best_mode:
        try:
            mode_number = int(best_mode.get("mode", 1))
        except (TypeError, ValueError):
            mode_number = None
        rmsd_lower = _to_float(best_mode.get("rmsd_lb"))
        rmsd_upper = _to_float(best_mode.get("rmsd_ub"))
        if mode_number == 1:
            st.caption("Vina RMSD: 0.00 Å for mode 1, the reference pose generated by this run.")
        elif rmsd_lower is not None and rmsd_upper is not None:
            st.caption(
                f"Vina RMSD bounds relative to the best generated pose: "
                f"{rmsd_lower:.2f}–{rmsd_upper:.2f} Å."
            )
        else:
            st.caption("Vina RMSD bounds: N/A")

    reference_rmsd = _to_float(result.get("reference_rmsd"))
    if reference_rmsd is not None:
        st.caption(
            f"Experimental heavy-atom RMSD: {reference_rmsd:.3f} Å "
            f"({result.get('matched_atom_count', 0)} matched atoms; no ligand superposition)."
        )
    else:
        st.caption(
            result.get("reference_rmsd_message")
            or "Experimental RMSD unavailable: no matching reference ligand."
        )

    timings = result.get("timings") or {}
    if isinstance(timings, dict) and timings:
        timing_parts = []
        for name, value in timings.items():
            seconds = _to_float(value)
            # Only "*_seconds" entries with a numeric value are reported.
            if not str(name).endswith("_seconds") or seconds is None:
                continue
            label = str(name).replace("_seconds", "").replace("_", " ")
            timing_parts.append(f"{label} {seconds:.2f}s")
        if timing_parts:
            st.caption("Docking timing: " + ", ".join(timing_parts))
) timings = result.get("timings") or {} if timings: st.caption( "Docking timing: " + ", ".join( f"{name.replace('_seconds', '').replace('_', ' ')} {float(value):.2f}s" for name, value in timings.items() if name.endswith("_seconds") ) ) def _validated_docking_affinity(result: Dict[str, Any]) -> Optional[float]: try: affinity = float(result.get("binding_affinity")) except (TypeError, ValueError): return None return affinity if math.isfinite(affinity) else None def _docking_strength(affinity: Optional[float]) -> tuple[str, str]: if affinity is None: return "Unavailable", "#64748b" if affinity < -7: return "Strong", "#28a745" if affinity < -5: return "Moderate", "#b7791f" return "Weak", "#dc3545" def _render_docking_mode_details(result: Dict[str, Any]) -> None: rows = [] simulated = bool(result.get("simulated")) score_column = "Simulated score (kcal/mol)" if simulated else "Vina affinity (kcal/mol)" for mode in result.get("modes") or []: center = mode.get("center") or {} row = { "Mode": mode.get("mode"), score_column: mode.get("affinity"), "Center X (Å)": center.get("x"), "Center Y (Å)": center.get("y"), "Center Z (Å)": center.get("z"), } if not simulated: row.update( { "Vina RMSD lower (Å)": mode.get("rmsd_lb"), "Vina RMSD upper (Å)": mode.get("rmsd_ub"), "Intermolecular (kcal/mol)": mode.get("intermolecular_energy"), "Intramolecular (kcal/mol)": mode.get("intramolecular_energy"), "Torsional (kcal/mol)": mode.get("torsional_energy"), } ) rows.append(row) if not rows: st.info("No validated docking modes are available yet.") return st.dataframe(pd.DataFrame(rows), width="stretch", hide_index=True) if simulated: st.caption("Simulation scores are illustrative and do not include ligand pose coordinates or RMSD.") else: st.caption( "Affinity and energy terms are reported by AutoDock Vina. Vina RMSD bounds compare " "each generated mode with the best generated mode; they are not experimental RMSD." 
def _run_shared_docking(
    *,
    protein_prep: Dict[str, Any],
    selected_ligand: Dict[str, Any],
    ligand_name: str,
    protein_length: int,
    exhaustiveness: int,
    num_modes: int,
    energy_range: int,
    site_mode: str = "auto",
    pocket_id: str | None = None,
    grid: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
    """Run the docking workflow through the session's API client.

    ``protein_prep`` is copied, never mutated. When the copy has no
    ``features`` and the session holds an ``embl_features`` dict with a
    ``features`` list, that list is attached before the call.

    Returns:
        Whatever ``api_client.run_docking_workflow`` returns.
    """
    docking_protein = dict(protein_prep)
    feature_payload = st.session_state.get("embl_features") or {}
    if not docking_protein.get("features") and isinstance(feature_payload, dict):
        features = feature_payload.get("features")
        if isinstance(features, list):
            docking_protein["features"] = features

    # A null ``molecular_weight`` must not hide a usable ``mw`` value, and an
    # unparseable weight falls back to the 300 default instead of raising.
    raw_weight = selected_ligand.get("molecular_weight") or selected_ligand.get("mw") or 300
    try:
        ligand_mw = float(raw_weight)
    except (TypeError, ValueError):
        ligand_mw = 300.0

    return st.session_state.api_client.run_docking_workflow(
        protein_prep=docking_protein,
        ligand_data=selected_ligand,
        ligand_name=ligand_name,
        protein_length=protein_length,
        ligand_mw=ligand_mw,
        activity_value=selected_ligand.get("activity_value"),
        mode=_get_docking_mode_value(),
        exhaustiveness=exhaustiveness,
        num_modes=num_modes,
        energy_range=energy_range,
        site_mode=site_mode,
        pocket_id=pocket_id,
        grid=grid,
    )


def _refresh_real_docking_result_if_needed(result: Dict[str, Any]) -> Dict[str, Any]:
    """Poll a queued or running real docking job and return its latest state.

    Simulated results, results without a job id, and results whose status is
    not "queued"/"running" are returned unchanged, as is ``result`` when
    polling fails or reports an unrecognised status.

    Returns:
        ``result`` itself, a copy with refreshed status fields, or the
        normalised job payload for a completed or failed job.
    """
    if not result or result.get("simulated"):
        return result
    job_id = result.get("job_id")
    current_status = str(result.get("status") or result.get("job_status") or "").lower().strip()
    if current_status not in {"queued", "running"} or not job_id:
        return result

    api_client = st.session_state.api_client
    try:
        numeric_job_id = int(job_id)
        job_status = api_client.poll_docking_job(numeric_job_id)
    except Exception:
        # Best effort: keep showing the cached state, but record the failure.
        logger.warning("Polling docking job %s failed; keeping cached result", job_id, exc_info=True)
        return result
    if not isinstance(job_status, dict):
        return result

    refreshed_status = str(job_status.get("status") or current_status).lower().strip()
    if refreshed_status in {"queued", "running"}:
        refreshed = dict(result)
        refreshed["status"] = refreshed_status
        refreshed["job_status"] = refreshed_status
        return refreshed
    if refreshed_status not in {"completed", "failed"}:
        return result

    job_url = f"{api_client.backend_api_url}/api/v1/jobs/{numeric_job_id}"
    # Copy so the update below cannot mutate an object owned by the client.
    payload = dict(api_client.normalize_docking_result(job_status.get("result_payload") or {}) or {})

    if refreshed_status == "completed":
        payload.update(
            {
                "job_id": numeric_job_id,
                "job_status": "completed",
                "status": "completed",
                "job_url": job_url,
            }
        )
        return payload

    failure_reason = (
        str(job_status.get("error_message") or "").strip()
        or str(payload.get("error_message") or "").strip()
        or str(payload.get("fallback_reason") or "").strip()
        or "Real docking job failed"
    )
    payload.update(
        {
            "available": False,
            "mode": "real",
            "simulated": False,
            "status": "failed",
            "job_status": "failed",
            "job_id": numeric_job_id,
            "job_url": job_url,
            "binding_affinity": None,
            "modes": [],
            "best_mode": {},
            "has_coordinates": False,
            "fallback_reason": failure_reason,
            "error_message": failure_reason,
        }
    )
    return payload
"status": "completed", "job_url": f"{st.session_state.api_client.backend_api_url}/api/v1/jobs/{int(job_id)}", } ) return payload if refreshed_status == "failed": payload = st.session_state.api_client.normalize_docking_result( job_status.get("result_payload") or {} ) failure_reason = ( str(job_status.get("error_message") or "").strip() or str(payload.get("error_message") or "").strip() or str(payload.get("fallback_reason") or "").strip() or "Real docking job failed" ) payload.update( { "available": False, "mode": "real", "simulated": False, "status": "failed", "job_status": "failed", "job_id": int(job_id), "job_url": f"{st.session_state.api_client.backend_api_url}/api/v1/jobs/{int(job_id)}", "binding_affinity": None, "modes": [], "best_mode": {}, "has_coordinates": False, "fallback_reason": failure_reason, "error_message": failure_reason, } ) return payload return result def _format_phase(phase: Optional[str]) -> str: if not phase: return "N/A" phase_upper = str(phase).upper() if phase_upper in ("N/A", "NA"): return "N/A" if phase_upper == "EARLY_PHASE1": return "Early Phase 1" if phase_upper.startswith("PHASE"): if "_" in phase_upper: parts = [p for p in phase_upper.split("_") if p.startswith("PHASE")] numbers = [ p.replace("PHASE", "").strip() for p in parts if p.replace("PHASE", "").strip() ] if numbers: return f"Phase {'/'.join(numbers)}" num = phase_upper.replace("PHASE", "").strip() if num: return f"Phase {num}" return str(phase).replace("_", " ").title() def _format_status(status: Optional[str]) -> str: if not status: return "Unknown" return str(status).replace("_", " ").title() def render_kegg_interactive_pathway( first_result: Dict[str, Any], kegg_protein_id: Optional[str] = None ) -> None: """ Render an interactive KEGG pathway map using the official PNG image + KGML overlay. 
- Preserves original KEGG layout (no redraw) - Adds hover tooltips and click-through links for genes/proteins/enzymes - Gracefully falls back to static image if anything fails """ pathway_id = first_result.get("pathway_id") image_url = first_result.get("kegg_image_url") pathway_name = first_result.get("pathway_name", "") if not pathway_id or not image_url: # Fallback to static image if required fields are missing st.image( image_url or "", width="stretch", caption=f"{pathway_name} - Visual representation from KEGG", ) return kgml_url = f"https://rest.kegg.jp/get/{pathway_id}/kgml" try: resp = httpx.get(kgml_url, timeout=20.0) resp.raise_for_status() kgml_xml = resp.text except Exception: # If KGML fetch fails, keep existing static behaviour st.info( "Interactive KEGG map is temporarily unavailable. Showing static pathway image instead." ) st.image( image_url, width="stretch", caption=f"{pathway_name} - Visual representation from KEGG" ) return # Parse KGML entries for genes/proteins/enzymes try: root = ET.fromstring(kgml_xml) except Exception: st.info("Could not parse KEGG KGML for this pathway. Showing static pathway image instead.") st.image( image_url, width="stretch", caption=f"{pathway_name} - Visual representation from KEGG" ) return interactive_entries: List[Dict] = [] for entry in root.findall("entry"): etype = entry.get("type", "") # Focus on biological entities; ignore purely graphical/map entries if etype not in ("gene", "ortholog", "enzyme", "compound"): continue graphics = entry.find("graphics") if graphics is None: continue try: x = float(graphics.get("x", "0")) y = float(graphics.get("y", "0")) w = float(graphics.get("width", "0")) h = float(graphics.get("height", "0")) except ValueError: continue if w == 0 or h == 0: continue # KEGG entry name typically contains one or more IDs, e.g. 
"hsa:1234 hsa:5678" entry_name = entry.get("name", "") graphics_label = graphics.get("name") or entry_name # Try to derive a short symbol and description from the label symbol = graphics_label description = "" if graphics_label and " " in graphics_label: parts = graphics_label.split(",")[0].split(" ", 1) symbol = parts[0] if len(parts) > 1: description = parts[1] # Build KEGG link; fall back to dbget-bin if link attribute is missing link = entry.get("link", "") if not link and entry_name: entry_tokens = [token for token in entry_name.split() if token] link_query = "+".join(entry_tokens) if link_query: link = f"https://www.kegg.jp/dbget-bin/www_bget?{link_query}" is_highlight = False if kegg_protein_id and entry_name: # Highlight if this gene box includes the current protein's KEGG ID if kegg_protein_id in entry_name.split(): is_highlight = True interactive_entries.append( { "id": entry.get("id", ""), "etype": etype, "kegg_ids": entry_name, "label": graphics_label, "symbol": symbol, "description": description, "x": x, "y": y, "width": w, "height": h, "link": link, "highlight": is_highlight, } ) if not interactive_entries: # Nothing to overlay; show static image st.image( image_url, width="stretch", caption=f"{pathway_name} - Visual representation from KEGG" ) return # Prepare JSON payload for client-side JavaScript try: entries_json = json.dumps(interactive_entries) except TypeError: # Fallback: no interactivity if JSON serialization fails st.image( image_url, width="stretch", caption=f"{pathway_name} - Visual representation from KEGG" ) return escaped_image_url = html_lib.escape(image_url, quote=True) escaped_title = html_lib.escape(pathway_name, quote=True) html_content = f"""
{escaped_title}
Select a pathway element
Hover over a highlighted region to preview its symbol. Click a region to pin details here.
""" # Render raw component HTML directly; iframe() expects a URL, not an HTML document. st.components.v1.html(html_content, height=720, scrolling=True) # app.py - Main Streamlit application def main(): """OmniBiMol""" # Page configuration st.set_page_config( page_title="OmniBiMol - Protein Analysis Platform", page_icon="icons/Omnibimol_logo.png", layout="wide", initial_sidebar_state="expanded", ) # Determine if current page is a legal page to conditionally bypass auth page_param_map = { **LEGAL_PAGE_SLUGS, } page_param = ( str(st.query_params.get("page", "")).strip().lower() if hasattr(st, "query_params") else "" ) if page_param in page_param_map: st.session_state.current_page = page_param_map[page_param] legal_page_renderers = { "Privacy Policy": render_privacy_policy, "Terms of Service": render_terms, "Disclaimer": render_disclaimer, "Data Security": render_data_security, } PUBLIC_PAGES = set(legal_page_renderers) current_page = st.session_state.get("current_page", "Whole Genome Sequencing") if current_page not in PUBLIC_PAGES: # Enforce Firebase Authentication auth.require_auth() # Custom CSS for professional bioinformatics styling st.markdown( """ """, unsafe_allow_html=True, ) # Header with banner st.image("icons/Omnibimol_banner.png") # ── STEP 1: Bootstrap infrastructure (must precede routing) ──────────── if "cache_manager" not in st.session_state: st.session_state.cache_manager = CacheManager() active_backend_url = str(get_active_backend_url_for_session()) if ( "api_client" not in st.session_state or not hasattr(st.session_state.api_client, "fetch_clinical_trials_by_drug") or st.session_state.get("active_backend_url") != active_backend_url ): st.session_state.api_client = ProteinAPIClient( st.session_state.cache_manager, backend_api_url=active_backend_url, ) st.session_state.active_backend_url = active_backend_url if "portfolio_engine" not in st.session_state: st.session_state.portfolio_engine = PortfolioEngine() if "disable_phase_cleanup" not in 
st.session_state: st.session_state.disable_phase_cleanup = True # ── STEP 2: Phase routing — MUST run before ANY state reads ──────────── # Keep routing state in a non-widget key so reruns triggered before widget # re-render do not drop the selected phase back to default. _PHASES_EARLY = [ ("Genomics & Variant Discovery", ["Whole Genome Sequencing", "Sequence Analysis"]), ("Protein Analysis Suite", ["Protein Analysis", "🧬 Universal Gene Pipeline"]), ("Drug-Target Interaction & Matching", ["Drugs & Clinical Trials"]), ] _page_to_phase_idx = { module: idx for idx, (_title, modules) in enumerate(_PHASES_EARLY) for module in modules } if "selected_phase_idx" not in st.session_state: current_page_for_phase = st.session_state.get("current_page", "Whole Genome Sequencing") st.session_state.selected_phase_idx = _page_to_phase_idx.get(current_page_for_phase, 0) if "phase_selector" in st.session_state and isinstance(st.session_state.get("phase_selector"), int): st.session_state.selected_phase_idx = st.session_state.get("phase_selector") _early_phase_idx = max( 0, min( int(st.session_state.get("selected_phase_idx", 0)), len(_PHASES_EARLY) - 1, ), ) _early_phase_title, _ = _PHASES_EARLY[_early_phase_idx] # gc_stale_phases() + audit + transition + st.rerun()-if-changed state_manager.handle_phase_routing(_early_phase_title) # If we reach here the phase is stable; safe to read any state below. 
# ────────────────────────────────────────────────────────────────────── # ── STEP 3: Legal page short-circuit (no stale risk after routing) ───── current_page = st.session_state.get("current_page", "Whole Genome Sequencing") if current_page in legal_page_renderers: legal_page_renderers[current_page]() render_footer() return # ── STEP 4: Define phased workflow (sidebar + dispatcher) ────────────── PHASES = [ ("Genomics & Variant Discovery", ["Whole Genome Sequencing", "Sequence Analysis"]), ("Protein Analysis Suite", ["Protein Analysis", "🧬 Universal Gene Pipeline"]), ("Drug-Target Interaction & Matching", ["Drugs & Clinical Trials"]), ] # Sidebar (Phase-based hierarchical navigation) with st.sidebar: st.image("icons/Omnibimol_transparent_bg.png", width='stretch') st.divider() auth.render_user_profile() # ── Backend status badge (zero network I/O; reads cached snapshot) ── render_backend_status_badge() st.divider() current_page = st.session_state.get("current_page", "Whole Genome Sequencing") # Compact workflow indicator in the sidebar st.markdown("**Workflow**") st.markdown("[Genomics] → [Protein] → [Interactions]") st.divider() # Find current phase for initial selection current_phase_idx = 0 for idx, (phase_title, modules) in enumerate(PHASES): if current_page in modules: current_phase_idx = idx break # Radio buttons for phase selection with callback phase_titles = [title for title, _ in PHASES] def on_phase_change(): """Callback when phase selector radio button changes""" # Update current_page to the first module in the selected phase selected_phase_idx = st.session_state.get("phase_selector_widget", 0) st.session_state.selected_phase_idx = selected_phase_idx if 0 <= selected_phase_idx < len(PHASES): _, modules = PHASES[selected_phase_idx] if modules: st.session_state.current_page = modules[0] selected_phase_idx = st.radio( "Navigate by Phase", range(len(phase_titles)), format_func=lambda i: phase_titles[i], index=int(st.session_state.get("selected_phase_idx", 
current_phase_idx)), key="phase_selector_widget", on_change=on_phase_change, ) st.session_state.selected_phase_idx = selected_phase_idx st.session_state.phase_selector = selected_phase_idx st.divider() st.header("📋 About") st.markdown(""" **OmniBiMol** Integrated protein analysis platform combining: - UniProt: Protein function & annotations - Human Protein Atlas: Expression data - AlphaFold & PDB: Structural information - KEGG: Pathway mapping - GO: Gene ontology annotations - EMBL-EBI: Sequence analysis - NCBI BLAST: Homology search - EMBOSS Needle: Sequence alignment - Sequence Analysis Suite: MSA, Phylogeny, Domains, Motifs - And more... **Features:** - Real-time data retrieval - Interactive visualizations - 24-hour caching - Mobile-responsive design - User-friendly interface - Extensible architecture **Developed by:** Om Shrivastava All rights reserved. """) st.divider() # ── Support section — voluntary, mission-driven ────────────── st.markdown( """
🔬 Support This Research
OmniBiMol is free for the scientific community.
Your support helps keep it running and improving.
☕ Support OmniBiMol
""", unsafe_allow_html=True, ) st.divider() if st.button("🔄 Clear Cache", key="sidebar_clear_cache"): # Use comprehensive cache clearing function clear_app_cache() st.success("✅ Cache and app state cleared. Refreshing...") st.rerun() # Resolve final phase + modules for the dispatcher (routing already ran above) selected_phase_idx = max( 0, min( int(st.session_state.get("selected_phase_idx", 0)), len(PHASES) - 1, ), ) selected_phase_title, phase_modules = PHASES[selected_phase_idx] # Sync current_page if it doesn't belong to the active phase if st.session_state.get("current_page") not in phase_modules: st.session_state.current_page = phase_modules[0] # Mapping of module titles to human-friendly description and render function MODULE_DESCRIPTIONS = { "Sequence Analysis": "Tools for sequence alignment, MSA, domain detection and motif discovery.", "Whole Genome Sequencing": "Genome-scale variant discovery, annotation, and risk scoring.", "Drugs & Clinical Trials": "Search drugs, build clinical trials links and view trial metadata.", "Protein Analysis": "Search, visualize, and analyze protein structure, function, and interactions.", "🧬 Universal Gene Pipeline": "Upload a gene CSV to run universal pathway enrichment with optional protein CSV merge.", } MODULE_RENDERERS = { "Sequence Analysis": render_sequence_analysis_page, "Whole Genome Sequencing": render_whole_genome_sequencing_page, "🧬 Universal Gene Pipeline": render_universal_gene_pipeline_page, "Drugs & Clinical Trials": render_drugs_clinical_trials_page, } def render_module_page(module_name: str) -> None: desc = MODULE_DESCRIPTIONS.get(module_name, "") st.title(module_name) if desc: st.write(desc) # Render existing module content via its renderer renderer = MODULE_RENDERERS.get(module_name) if renderer: renderer() # Display phase title and render only one module per rerun. # Streamlit tabs execute all tab bodies each run, which creates 4-5s stalls # during rapid phase switching when heavy modules are present. 
st.header(f"📍 {selected_phase_title}") if len(phase_modules) > 1: active_module = st.session_state.get("current_page", phase_modules[0]) if active_module not in phase_modules: active_module = phase_modules[0] active_idx = phase_modules.index(active_module) selected_module = st.radio( "Module", range(len(phase_modules)), index=active_idx, format_func=lambda i: phase_modules[i], key=f"module_selector_{selected_phase_idx}", horizontal=True, ) module_name = phase_modules[int(selected_module)] st.session_state.current_page = module_name if module_name != "Protein Analysis": render_module_page(module_name) else: module_name = phase_modules[0] st.session_state.current_page = module_name if module_name == "Protein Analysis": st.session_state.current_page = "Protein Analysis" else: render_module_page(module_name) # Guard the rest of the script: only execute if Protein Analysis is the current page if st.session_state.get("current_page") != "Protein Analysis": render_footer() return # Render Protein Analysis header st.title("Protein Analysis") st.write(MODULE_DESCRIPTIONS.get("Protein Analysis", "")) # The rest of Protein Analysis code flows below # Define nested helper function for report generation def generate_full_report(prediction: Dict, protein_data: Dict) -> str: """Generate text report of all predictions""" report = f""" COMPREHENSIVE BINDING ANALYSIS REPORT ===================================== Protein: {protein_data.get("uniprot_id", "N/A")} Gene: {protein_data.get("gene_name", "N/A")} Analysis Date: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} BINDING RULES EXTRACTED ----------------------- {json.dumps(prediction.get("binding_rules", {}), indent=2)} TOP PREDICTED BINDERS (KNOWN LIGANDS) ------------------------------------- """ for idx, p in enumerate(prediction.get("known_ligands", [])[:10], 1): comp = p["compound"] report += f""" {idx}. 
{comp["name"]} Score: {p["predicted_score"]}/100 Confidence: {p["confidence_level"]} ({p["confidence"]:.0%}) Predicted Affinity: {p["predicted_affinity"]:.2f} kcal/mol Experimental: {comp.get("activity_value", "N/A")} {comp.get("activity_units", "")} Reasons: {"; ".join(p["reasons"])} """ report += "\n\nRECOMMENDATIONS\n" report += "-" * 50 + "\n" for rec in prediction.get("recommendations", []): report += f""" {rec["type"]}: {rec["compound"]} Action: {rec["action"]} Priority: {rec["priority"]} """ return report # Main input section st.header("🔍 Protein Search") def trigger_search(): """Callback to trigger search when Enter is pressed""" if st.session_state.get("search_input_key"): st.session_state.trigger_search = True protein_input = st.text_input( "Enter Protein Name or Gene Symbol:", value=st.session_state.get("protein_input", ""), placeholder="e.g., TP53, BRCA1, Insulin (Press Enter to search)", help="Enter a protein name, gene symbol, or UniProt ID", key="search_input_key", on_change=trigger_search, ) # Process search (triggered by Enter key or button) search_clicked = st.button("🔎 Search", key="main_search_button", type="primary", width="stretch") if (st.session_state.get("trigger_search") or search_clicked) and protein_input: with st.spinner("🔍 Searching UniProt database..."): # Search UniProt using cached function search_results = cached_search_uniprot(protein_input, st.session_state.api_client) if not search_results: st.error("❌ No results found. 
Please check your input and try again.") st.stop() # Store results in session state st.session_state.search_results = search_results st.session_state.show_results = True st.session_state.trigger_search = False st.rerun() # Display search results for confirmation if st.session_state.get("show_results") and st.session_state.get("search_results"): st.subheader("Select Protein:") results = st.session_state.search_results if len(results) == 1: st.info( f"✅ Found: **{results[0]['protein_name']}** ({results[0]['gene_name']}) - {results[0]['uniprot_id']}" ) selected_uniprot_id = results[0]["uniprot_id"] auto_load = True else: # Multiple results - let user choose options = [ f"{r['protein_name']} ({r['gene_name']}) - {r['uniprot_id']} | {r['organism']}" for r in results ] selected_idx = st.radio( "Multiple matches found:", range(len(options)), format_func=lambda i: options[i] ) selected_uniprot_id = results[selected_idx]["uniprot_id"] selected_gene_name = results[selected_idx]["gene_name"] auto_load = False # Get gene name for selected protein if auto_load: selected_gene_name = results[0]["gene_name"] # Confirm and load data if auto_load or st.button( "✅ Confirm Selection", key="protein_confirm_selection", type="primary" ): start_time = time.time() stage_labels = { "uniprot_data": "UniProt annotations", "alphafold_structure": "AlphaFold structure", "pdb_structure": "PDB structures", "kegg_pathways": "KEGG pathways", "chembl_ligands": "ChEMBL ligands", "string_ppi": "STRING network", "literature": "Literature summary", } def render_progress_snapshot(chunk: Dict[str, Any]) -> None: payload = chunk.get("results") or {} completed = int(chunk.get("completed_stages", 0)) total = int(chunk.get("total_stages", 7)) or 7 stage_name = chunk.get("stage", "loading") stage_label = stage_labels.get(stage_name, stage_name) stage_statuses = chunk.get("stage_statuses") or payload.get("stage_statuses") or {} progress_bar.progress(int((completed / total) * 100)) if chunk.get("final"): 
failed_count = sum( 1 for stage_info in stage_statuses.values() if str(getattr(stage_info, "get", lambda *_: None)("status") or "").lower() in {"timeout", "failed"} ) if failed_count: msg = get_progress_message(stage_label, "warning", completed, total, failed_count) status_placeholder.warning(msg) else: msg = get_progress_message(stage_label, "success", completed, total) status_placeholder.success(msg) elif chunk.get("status") == "timeout": msg = get_progress_message(stage_label, "timeout", completed, total) status_placeholder.warning(msg) elif chunk.get("status") in {"error", "failed"}: msg = get_progress_message(stage_label, "error", completed, total) status_placeholder.warning(msg) elif stage_name == "bootstrap": status_placeholder.info("⏳ Loading analysis...") else: msg = get_progress_message(stage_label, "info", completed, total) status_placeholder.info(msg) with preview_placeholder.container(): uniprot_preview = payload.get("uniprot_data") or {} alpha_preview = payload.get("alphafold_structure") or {} pdb_preview = payload.get("pdb_structure") or {} kegg_preview = payload.get("kegg_pathways") or {} ligands_preview = payload.get("chembl_ligands") or {} ppi_preview = payload.get("string_ppi") or {} # Development only: show detailed stage status app_env = get_environment() if stage_statuses and app_env.is_development(): status_rows = [] for key, value in stage_labels.items(): info = stage_statuses.get(key) if not isinstance(info, dict): continue row_status = str(info.get("status") or "unknown") row_error = str(info.get("error") or "") status_rows.append( { "stage": value, "status": row_status, "error": row_error, } ) if status_rows: st.dataframe( pd.DataFrame(status_rows), hide_index=True, width="stretch", ) c1, c2, c3 = st.columns(3) with c1: st.metric( "Sequence length", f"{uniprot_preview.get('sequence_length', 0):,} aa" if uniprot_preview.get("sequence_length") else "Loading...", ) with c2: structure_count = int(bool(alpha_preview.get("available"))) + int( 
bool(pdb_preview.get("available")) ) st.metric("Structures", f"{structure_count}" if completed else "Loading...") with c3: pathway_count = len(kegg_preview.get("pathways", [])) if kegg_preview else 0 st.metric( "Pathways", f"{pathway_count}" if pathway_count else "Loading...", ) ready_sections = [ label for key, label in stage_labels.items() if payload.get(key) ] st.caption( "Ready so far: " + (" · ".join(ready_sections) if ready_sections else "starting fetch") ) summary_bits = [] if uniprot_preview: summary_bits.append( f"UniProt {uniprot_preview.get('uniprot_id', selected_uniprot_id)}" ) if uniprot_preview.get("function"): summary_bits.append(uniprot_preview.get("function", "")) if pdb_preview.get("available"): summary_bits.append(f"{pdb_preview.get('count', 0)} PDB entries") if alpha_preview.get("available"): summary_bits.append("AlphaFold model available") if ligands_preview.get("available"): summary_bits.append(f"{len(ligands_preview.get('ligands', []))} ligands") if ppi_preview.get("available"): summary_bits.append( f"{len(ppi_preview.get('interactions', []))} STRING interactions" ) if summary_bits: st.info(" | ".join(summary_bits[:4])) loading_panel = st.container() with loading_panel: st.subheader("Loading protein data") progress_bar = st.progress(0) status_placeholder = st.empty() preview_placeholder = st.empty() progress_snapshot: Dict[str, Any] = {} all_data: Dict[str, Any] = {} received_final_chunk = False try: for chunk in stream_async_safe( st.session_state.api_client.fetch_all_data_stream( selected_uniprot_id, selected_gene_name, ) ): progress_snapshot = dict(chunk.get("results", progress_snapshot)) st.session_state["protein_fetch_progress"] = dict(progress_snapshot) render_progress_snapshot(chunk) if chunk.get("final"): all_data = dict(progress_snapshot) received_final_chunk = True except Exception as exc: app_env = get_environment() if app_env.is_development(): status_placeholder.warning(f"⚠️ Protein fetch stream stopped early: {exc}") 
logger.error(f"Protein fetch stream error: {exc}", extra=create_log_context("protein_fetch_error", error=str(exc))) else: status_placeholder.warning("⏳ Request interrupted. Displaying available results...") logger.error(f"Protein fetch stream error: {exc}", extra=create_log_context("protein_fetch_error", error=str(exc))) required_data_keys = ("uniprot_data", "tissue_expression", "subcellular") has_required_data = received_final_chunk and all( key in all_data and all_data.get(key) is not None for key in required_data_keys ) fetch_time = time.time() - start_time if has_required_data: st.session_state.current_data = all_data st.session_state.current_uniprot_id = selected_uniprot_id st.session_state.fetch_time = fetch_time st.session_state.show_results = False app_env = get_environment() if app_env.is_development(): st.success(f"✅ Data loaded in {fetch_time:.2f} seconds!") else: st.success("✅ Analysis complete!") st.rerun() else: st.session_state.show_results = True app_env = get_environment() if app_env.is_development(): status_placeholder.warning( "⚠️ Protein fetch did not complete successfully; keeping previous data and search results visible." ) else: status_placeholder.warning( "⏳ Analysis partially complete. Displaying available results..." ) st.error( "Protein search completed, but downstream protein data could not be loaded. " "Search results remain visible so you can retry or choose another match." ) # Display protein data if available if st.session_state.get("current_data"): data = st.session_state.current_data uniprot_data = data["uniprot_data"] tissue_df = data["tissue_expression"] subcellular_df = data["subcellular"] st.divider() # Section 1: Protein Information st.header(f"📖 Protein Information: {st.session_state.current_uniprot_id}") # Metrics row col1, col2, col3, col4 = st.columns(4) with col1: st.markdown( f"""

{uniprot_data.get("sequence_length", 0):,}

Amino Acids

""", unsafe_allow_html=True, ) with col2: st.markdown( f"""

{uniprot_data.get("mass", 0):,.0f}

Molecular Weight (Da)

""", unsafe_allow_html=True, ) with col3: tissue_count = ( len(tissue_df[tissue_df["level_numeric"] > 0]) if not tissue_df.empty else 0 ) st.markdown( f"""

{tissue_count}

Expressed Tissues

""", unsafe_allow_html=True, ) with col4: go_count = sum(len(v) for v in uniprot_data.get("go_terms", {}).values()) st.markdown( f"""

{go_count}

GO Terms

""", unsafe_allow_html=True, ) # Function description st.subheader("🔬 Protein Function") st.markdown( f"""
{uniprot_data.get("function", "No functional annotation available")}
""", unsafe_allow_html=True, ) # GO Terms st.subheader("🏷️ Gene Ontology Terms") go_terms = uniprot_data.get("go_terms", {}) for category, terms in go_terms.items(): if terms: st.markdown(f"**{category}:**") tags_html = "".join([f'{term}' for term in terms[:10]]) st.markdown(tags_html, unsafe_allow_html=True) if len(terms) > 10: with st.expander(f"+{len(terms) - 10} more"): extra_tags_html = "".join( [f'{term}' for term in terms[10:]] ) st.markdown(extra_tags_html, unsafe_allow_html=True) st.markdown("") # GO terms chart if go_count > 0: fig_go = ProteinVisualizer.create_go_terms_chart(go_terms) st.plotly_chart(fig_go, width="stretch") st.divider() # Section 2: FASTA Sequence & BLAST Analysis st.header("🧬 Protein Sequence Analysis") # Create tabs sequence_tabs = st.tabs( [ "📄 FASTA Sequence", "🔬 Sequence Composition", "🔍 BLAST Homology Search", "🧬 EMBL Features & Alignment", ] ) # Tab 1: FASTA Sequence with sequence_tabs[0]: st.subheader("FASTA Format Sequence") sequence = uniprot_data.get("sequence", "") if sequence: # Generate FASTA fasta_sequence = st.session_state.api_client.get_fasta_sequence(uniprot_data) # Display in text area st.text_area( "Protein Sequence (FASTA format):", fasta_sequence, height=300, help="Standard FASTA format with 60 characters per line", ) # Sequence statistics col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Length", f"{len(sequence)} aa") with col2: st.metric("Molecular Weight", f"{uniprot_data.get('mass', 0):,.0f} Da") with col3: # Calculate isoelectric point (simplified) basic = sequence.count("K") + sequence.count("R") + sequence.count("H") acidic = sequence.count("D") + sequence.count("E") st.metric("Basic Residues", basic) with col4: st.metric("Acidic Residues", acidic) # Download options col1, col2 = st.columns(2) with col1: st.download_button( "📥 Download FASTA", fasta_sequence, f"{st.session_state.current_uniprot_id}.fasta", "text/plain", help="Download sequence in FASTA format", ) with col2: st.download_button( 
"📥 Download Raw Sequence", sequence, f"{st.session_state.current_uniprot_id}_sequence.txt", "text/plain", help="Download sequence without header", ) else: st.warning("⚠️ No sequence data available") # Tab 2: Sequence Composition with sequence_tabs[1]: st.subheader("Amino Acid Composition Analysis") sequence = uniprot_data.get("sequence", "") if sequence: # Analyze composition composition = ProteinVisualizer.analyze_sequence_composition(sequence) # Summary metrics col1, col2, col3 = st.columns(3) with col1: st.markdown( f"""

{composition["hydrophobic_percent"]:.1f}%

Hydrophobic

""", unsafe_allow_html=True, ) with col2: st.markdown( f"""

{composition["polar_percent"]:.1f}%

Polar

""", unsafe_allow_html=True, ) with col3: st.markdown( f"""

{composition["charged_percent"]:.1f}%

Charged

""", unsafe_allow_html=True, ) st.markdown("---") # Composition chart fig_composition = ProteinVisualizer.create_sequence_composition_chart(composition) st.plotly_chart(fig_composition, width="stretch") st.info(""" **Color Legend:** - 🟠 **Orange**: Hydrophobic amino acids (A, V, I, L, M, F, W, P) - 🟢 **Green**: Polar amino acids (S, T, Y, N, Q, C) - 🔴 **Red**: Charged amino acids (K, R, H, D, E) - ⚫ **Gray**: Other (G) """) else: st.warning("⚠️ No sequence data available for analysis") # Tab 3: BLAST / Diamond Search with sequence_tabs[2]: st.subheader("BLAST Homology Search") sequence = uniprot_data.get("sequence", "") if sequence: st.info(""" **About Homology Search:** - ⚡ **SwissProt First**: Fast NCBI BLAST search against curated Swiss-Prot database - 🔄 **Automatic Fallback**: Falls back to comprehensive nr database if SwissProt returns no results - 🧬 Uses full sequence for maximum biological accuracy - 🏆 Returns top 15 matches from the successful database - 💾 Results cached for 24 hours """) # Cache check if ( "blast_results" not in st.session_state or st.session_state.get("blast_protein_id") != st.session_state.current_uniprot_id ): col1, col2 = st.columns([3, 1]) with col1: st.markdown( f"**Full sequence length:** {len(sequence)} amino acids | **Target hits:** 15" ) with col2: run_search = st.button( "🚀 Run Homology Search", type="primary", key="blast_run_search" ) if run_search: status_placeholder = st.empty() debug_placeholder = st.empty() start_time = time.time() elapsed = 0.0 max_search_time = 180 # Slightly higher timeout for remote BLAST status_placeholder.info( "⚡ Running NCBI BLAST (Swiss-Prot with nr fallback)..." 
) try: blast_results = cached_run_blast_search( sequence, st.session_state.current_uniprot_id, st.session_state.api_client, ) except ExternalServiceError as e: logger.warning( f"BLAST service error: {e.internal_message}", extra=create_log_context( "blast_search", protein_id=st.session_state.current_uniprot_id, **e.log_details, ), ) blast_results = { "available": False, "error": "🔍 BLAST search is temporarily unavailable. This is often due to NCBI service load. Please try again in a few moments.", } except Exception as e: logger.exception( "Unexpected error during BLAST search", extra=create_log_context( "blast_search", protein_id=st.session_state.current_uniprot_id, error_type=type(e).__name__, ), ) blast_results = { "available": False, "error": "❌ An unexpected error occurred during BLAST search. Please contact support if this persists.", } elapsed = time.time() - start_time status_placeholder.empty() debug_placeholder.empty() # Store results st.session_state.blast_results = blast_results st.session_state.blast_protein_id = st.session_state.current_uniprot_id st.session_state.blast_time = elapsed st.rerun() # ---------------- DISPLAY RESULTS ---------------- if ( "blast_results" in st.session_state and st.session_state.get("blast_protein_id") == st.session_state.current_uniprot_id ): blast_data = st.session_state.blast_results if blast_data.get("available") and blast_data.get("hits"): hits = blast_data["hits"] elapsed = st.session_state.get("blast_time", 0) engine = blast_data.get("engine", "BLAST") database = blast_data.get("database", "nr") st.success( f"✅ Found {len(hits)} homologous proteins " f"using **{engine}** in {elapsed:.1f}s" ) col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Total Hits", len(hits)) with col2: high_identity = len([h for h in hits if h["identity_percent"] >= 80]) st.metric("High Identity (≥80%)", high_identity) with col3: avg_identity = sum(h["identity_percent"] for h in hits) / len(hits) st.metric("Avg Identity", 
f"{avg_identity:.1f}%") with col4: st.metric("Database", database) st.markdown("---") blast_table_html = ProteinVisualizer.create_blast_results_table_html(hits) st.components.v1.html(blast_table_html, height=800, scrolling=True) st.markdown("---") col1, col2 = st.columns(2) with col1: blast_df = pd.DataFrame(hits) csv_blast = blast_df.to_csv(index=False) st.download_button( "📥 Download Results (CSV)", csv_blast, f"{st.session_state.current_uniprot_id}_homology_results.csv", "text/csv", key="blast_download_csv", ) with col2: accessions = "\n".join( f">{h['accession']} {h['organism']}\n" f"# Identity: {h['identity_percent']}%" for h in hits ) st.download_button( "📥 Download Accession List", accessions, f"{st.session_state.current_uniprot_id}_accessions.txt", "text/plain", key="blast_download_accessions", ) if st.button("🔄 Run New Search", key="blast_run_new"): for key in ["blast_results", "blast_protein_id", "blast_time"]: st.session_state.pop(key, None) st.rerun() elif blast_data.get("error"): st.error(f"❌ Search failed: {blast_data['error']}") else: st.warning("⚠️ No significant homologs found") else: st.warning("⚠️ No sequence data available for homology search") # Tab 4: EMBL Features & Needle Alignment with sequence_tabs[3]: st.subheader("EMBL-EBI Sequence Analysis") sequence = uniprot_data.get("sequence", "") if sequence: # Create sub-tabs embl_subtabs = st.tabs(["🗺️ Protein Features", "⚡ Pairwise Alignment (Needle)"]) # Sub-tab 1: Protein Features with embl_subtabs[0]: st.markdown("**Protein Domain & Feature Annotations from EMBL-EBI**") # Check if EMBL data exists if ( "embl_features" not in st.session_state or st.session_state.get("embl_protein_id") != st.session_state.current_uniprot_id ): with st.spinner("📡 Fetching feature annotations from EMBL-EBI..."): embl_data = cached_fetch_embl_sequence( st.session_state.current_uniprot_id, st.session_state.api_client ) st.session_state.embl_features = embl_data st.session_state.embl_protein_id = 
st.session_state.current_uniprot_id embl_data = st.session_state.embl_features if embl_data.get("available") and embl_data.get("features"): features = embl_data["features"] st.success(f"✅ Found {len(features)} annotated features") # Feature statistics col1, col2, col3 = st.columns(3) # Count feature types feature_type_counts = {} for feat in features: ftype = feat.get("type", "Other") feature_type_counts[ftype] = feature_type_counts.get(ftype, 0) + 1 with col1: st.metric("Total Features", len(features)) with col2: st.metric("Feature Types", len(feature_type_counts)) with col3: # Find longest feature max_length = max([f.get("length", 0) for f in features], default=0) st.metric("Longest Feature", f"{max_length} aa") st.markdown("---") # Feature map visualization fig_features = ProteinVisualizer.create_feature_map( features, uniprot_data.get("sequence_length", len(sequence)) ) st.plotly_chart(fig_features, width="stretch") # Detailed feature table st.subheader("📋 Feature Details") feature_df = pd.DataFrame( [ { "Type": f.get("type", "Unknown"), "Description": f.get("description", "N/A"), "Start": f.get("start", 0), "End": f.get("end", 0), "Length": f.get("length", 0), } for f in features ] ) # Add filter feature_type_filter = st.multiselect( "Filter by feature type:", options=list(feature_type_counts.keys()), default=list(feature_type_counts.keys()), ) filtered_df = feature_df[feature_df["Type"].isin(feature_type_filter)] st.dataframe(filtered_df, width="stretch", hide_index=True) # Download csv_features = filtered_df.to_csv(index=False) st.download_button( "📥 Download Features", csv_features, f"{st.session_state.current_uniprot_id}_features.csv", "text/csv", ) else: st.info("ℹ️ No additional feature annotations available from EMBL-EBI") # Sub-tab 2: Needle Alignment with embl_subtabs[1]: st.markdown("**EMBOSS Needle - Global Pairwise Sequence Alignment**") st.info(""" **About Needle Alignment:** - Uses Needleman-Wunsch algorithm for global alignment - Compares your 
protein sequence with another sequence - Shows identity, similarity, gaps, and alignment score - Takes ~10-30 seconds to complete """) # Input for second sequence col1, col2 = st.columns([3, 1]) with col1: compare_option = st.radio( "Compare with:", ["Paste sequence", "Use UniProt ID"], horizontal=True ) sequence2 = "" seq2_id = "Sequence_2" if compare_option == "Paste sequence": sequence2_input = st.text_area( "Enter second sequence (FASTA or plain text):", height=150, placeholder=">Protein_Name\nMKWVTFISLLFLFSSAYS...\n\nOr paste plain sequence:\nMKWVTFISLLFLFSSAYS...", ) if sequence2_input: # Clean and parse input sequence2_input = sequence2_input.strip() # Parse if FASTA format if sequence2_input.startswith(">"): lines = sequence2_input.split("\n") seq2_id = lines[0][1:].strip().split()[0] if not seq2_id: seq2_id = "Sequence_2" sequence2 = "".join(lines[1:]) else: seq2_id = "Pasted_Sequence" sequence2 = sequence2_input # Remove all whitespace, numbers, and non-letter characters sequence2 = "".join(c for c in sequence2.upper() if c.isalpha()) # Validate sequence valid_aa = set("ACDEFGHIKLMNPQRSTVWY") invalid_chars = set(sequence2) - valid_aa if invalid_chars: st.warning( f"⚠️ Found non-standard amino acids: {', '.join(sorted(invalid_chars))}" ) st.info("Only standard 20 amino acids will be used for alignment") # Remove invalid characters sequence2 = "".join(c for c in sequence2 if c in valid_aa) if len(sequence2) < 10: st.error("❌ Sequence too short (minimum 10 amino acids)") sequence2 = "" elif len(sequence2) > 50000: st.error("❌ Sequence too long (maximum 50,000 amino acids)") sequence2 = "" else: st.success( f"✅ Parsed sequence: {len(sequence2)} amino acids (ID: {seq2_id})" ) st.session_state.compare_sequence = sequence2 st.session_state.compare_id = seq2_id else: compare_uniprot = st.text_input( "Enter UniProt ID:", placeholder="e.g., P04637, P38398" ) if compare_uniprot and st.button( "🔍 Fetch Sequence", key="needle_fetch_sequence" ): with 
st.spinner("Fetching sequence..."): compare_data = cached_fetch_uniprot_data( compare_uniprot, st.session_state.api_client ) if compare_data.get("sequence"): sequence2 = compare_data["sequence"] seq2_id = compare_uniprot st.success(f"✅ Loaded sequence from {compare_uniprot}") st.session_state.compare_sequence = sequence2 st.session_state.compare_id = seq2_id # Use stored sequence if available if "compare_sequence" in st.session_state: sequence2 = st.session_state.compare_sequence seq2_id = st.session_state.compare_id st.markdown("---") # Run alignment if sequence2: run_needle = st.button( "⚡ Run Needle Alignment", key="needle_run_alignment", type="primary" ) if run_needle: with st.spinner( "🧬 Running global alignment... This may take 10-30 seconds..." ): needle_results = cached_run_needle_alignment( sequence, sequence2, st.session_state.current_uniprot_id, seq2_id, st.session_state.api_client, ) st.session_state.needle_results = needle_results st.rerun() # Display alignment results if "needle_results" in st.session_state: needle_data = st.session_state.needle_results if needle_data.get("available"): # Show alignment visualization alignment_html = ProteinVisualizer.create_alignment_visualization( needle_data ) st.components.v1.html(alignment_html, height=800, scrolling=True) # Interpretation st.subheader("📊 Interpretation") identity = needle_data.get("identity", 0) if identity >= 70: st.success( "✅ **High similarity** - Sequences are highly related (likely orthologs or close homologs)" ) elif identity >= 40: st.warning( "⚠️ **Moderate similarity** - Sequences share common ancestry but have diverged" ) else: st.info( "ℹ️ **Low similarity** - Sequences are distantly related or unrelated" ) # Download alignment st.download_button( "📥 Download Alignment", needle_data.get("alignment_text", ""), f"alignment_{st.session_state.current_uniprot_id}_vs_{seq2_id}.txt", "text/plain", ) # Clear results if st.button("🔄 Run New Alignment", key="needle_run_new_alignment"): del 
st.session_state.needle_results if "compare_sequence" in st.session_state: del st.session_state.compare_sequence del st.session_state.compare_id st.rerun() elif needle_data.get("error"): error_msg = needle_data.get("error") st.error(f"❌ Alignment failed: {error_msg}") # Provide helpful suggestions if "400" in error_msg: st.info( "💡 **Tip:** Check that both sequences contain only valid amino acid letters (A-Z)." ) elif "timed out" in error_msg.lower(): st.info( "💡 **Tip:** Alignment is taking too long. Try with shorter sequences." ) if st.button("🔄 Try Again", key="needle_try_again"): del st.session_state.needle_results st.rerun() else: st.info("👆 Enter a second sequence above and click 'Run Needle Alignment'") else: st.warning("⚠️ No sequence data available for EMBL analysis") st.divider() # Section 3: 3D Protein Structure st.header("🧊 3D Protein Structure") alphafold_data = data.get("alphafold_structure", {}) pdb_data = data.get("pdb_structure", {}) # Create tabs for different structure types if pdb_data.get("available") and alphafold_data.get("available"): structure_tabs = st.tabs(["📊 Experimental (PDB)", "🤖 Predicted (AlphaFold)"]) elif pdb_data.get("available"): structure_tabs = st.tabs(["📊 Experimental (PDB)"]) elif alphafold_data.get("available"): structure_tabs = st.tabs(["🤖 Predicted (AlphaFold)"]) else: st.warning("⚠️ No 3D structure available for this protein") structure_tabs = None if structure_tabs: tab_index = 0 # Experimental structure tab if pdb_data.get("available"): with structure_tabs[tab_index]: st.markdown("**Available Experimental Structures:**") # Show all available PDB structures pdb_structures = pdb_data.get("structures", []) for idx, struct in enumerate(pdb_structures[:5]): # Show first 5 col1, col2, col3, col4, col5 = st.columns([2, 2, 2, 2, 2]) with col1: st.markdown(f"**PDB ID:** [{struct['pdb_id']}]({struct['rcsb_page']})") with col2: st.markdown(f"**Method:** {struct['method']}") with col3: st.markdown(f"**Resolution:** 
{struct['resolution']}") with col4: ligand_summary = struct.get("ligand_summary") if ligand_summary is None: if "bound_components" in struct: bound_components = struct.get("bound_components") or [] ligand_summary = ( ", ".join(bound_components) if bound_components else "None annotated" ) else: ligand_summary = "Not fetched" elif not ligand_summary: bound_components = struct.get("bound_components") or [] ligand_summary = ( ", ".join(bound_components) if bound_components else "None annotated" ) st.markdown(f"**Bound components:** {ligand_summary}") with col5: if idx == 0: st.markdown("✅ **Displayed below**") if len(pdb_structures) > 5: st.info( f"+ {len(pdb_structures) - 5} more structures available on RCSB PDB" ) st.markdown("---") # Display 3D viewer for PDB viewer_html = ProteinVisualizer.create_structure_viewer(pdb_data, "pdb") st.components.v1.html(viewer_html, height=600) # Download option pdb_file_content = None try: pdb_url = pdb_structures[0]["pdb_url"] try: response = requests.get(pdb_url, timeout=10) if response.status_code == 200: pdb_file_content = response.text else: raise ExternalServiceError( user_message="Could not retrieve PDB file. Please try again.", internal_message=f"PDB server returned status {response.status_code}", log_details={ "pdb_url": pdb_url, "status_code": response.status_code, }, ) except requests.Timeout: logger.warning( "PDB file download timeout", extra=create_log_context( "pdb_download", pdb_id=pdb_structures[0]["pdb_id"] ), ) raise ExternalServiceError( user_message="PDB file download took too long. Please try again.", internal_message="PDB server request timeout", log_details={"pdb_url": pdb_url, "timeout": 10}, ) except ExternalServiceError: raise except Exception as e: logger.exception( "Unexpected error downloading PDB file", extra=create_log_context( "pdb_download", pdb_id=pdb_structures[0].get("pdb_id"), error_type=type(e).__name__, ), ) pdb_file_content = ( "❌ Error fetching PDB file. Please try again or contact support." 
) st.download_button( "📥 Download PDB File", data=pdb_file_content, file_name=f"{pdb_structures[0]['pdb_id']}.pdb", mime="text/plain", ) except Exception: # Outer exception handler for unexpected errors pdb_file_content = ( "❌ Error fetching PDB file. Please try again or contact support." ) st.download_button( "📥 Download PDB File", data=pdb_file_content, file_name="structure.pdb", mime="text/plain", disabled=True, ) tab_index += 1 # AlphaFold structure tab if alphafold_data.get("available"): with structure_tabs[tab_index]: col1, col2 = st.columns([1, 1]) with col1: st.markdown(f""" **AlphaFold Database Entry** - **UniProt ID:** {alphafold_data.get("uniprot_id")} - **Gene:** {alphafold_data.get("gene_name", "N/A")} - **Model Version:** v{alphafold_data.get("model_version", 4)} - **[View on AlphaFold DB]({alphafold_data.get("alphafold_page")})** - **[Download PDB]({alphafold_data.get("pdb_url")})** """) with col2: st.info(""" **Structure Display** The 3D model is shown with Mol* default structure rendering. AlphaFold confidence is shown in the pLDDT chart below from the current model file. 
""") st.markdown("---") # Display 3D viewer viewer_html = ProteinVisualizer.create_structure_viewer( alphafold_data, "alphafold" ) st.components.v1.html(viewer_html, height=600) # Confidence plot st.subheader("📈 Prediction Confidence") fig_confidence = ProteinVisualizer.create_confidence_plot( st.session_state.current_uniprot_id, alphafold_data.get("entry_id"), alphafold_data.get("pdb_url"), ) st.plotly_chart(fig_confidence, width="stretch") # Download options col1, col2 = st.columns(2) with col1: st.markdown(f"[📥 Download PDB File]({alphafold_data.get('pdb_url')})") with col2: st.markdown(f"[📥 Download PAE Data]({alphafold_data.get('pae_url')})") st.divider() # Section 4: Tissue Expression st.header("🧫 Tissue Expression Analysis") if not tissue_df.empty: # Prepare data chart_data = ProteinAPIClient.DataProcessor.prepare_tissue_chart_data( tissue_df, top_n=20 ) # Create and display chart fig_tissue = ProteinVisualizer.create_tissue_expression_chart(chart_data) st.plotly_chart(fig_tissue, width="stretch") # Expression summary high_tissues = tissue_df[tissue_df["level"] == "High"]["tissue"].tolist() if high_tissues: st.info( f"**High expression detected in:** {', '.join(high_tissues[:5])}" + ( f" and {len(high_tissues) - 5} more tissues" if len(high_tissues) > 5 else "" ) ) else: st.warning("⚠️ No tissue expression data available from Human Protein Atlas") st.divider() # Section 5: Subcellular Localization st.header("📍 Subcellular Localization") localization_threshold = st.slider( "Evidence filter threshold", min_value=0.0, max_value=1.0, value=0.6, step=0.05, help="Predictions below this confidence are marked as not evidence-passed.", key="localization_threshold", ) protein_sequence = uniprot_data.get("sequence", "") localization_prediction = None if protein_sequence: try: localization_prediction = cached_predict_protein_localization( protein_sequence, localization_threshold, st.session_state.api_client, ) except Exception as exc: logger.warning( "Localization 
prediction unavailable: %s", exc, extra=create_log_context( "protein_localization", protein_id=st.session_state.current_uniprot_id, error_type=type(exc).__name__, ), ) # Try to introspect backend health and, if only model artifacts are missing, # request the backend's fallback (rule-based) prediction instead of showing an error. try: health = st.session_state.api_client.get_protein_localization_health() except Exception: health = {} if isinstance(health, dict) and health.get("fallback"): try: localization_prediction = st.session_state.api_client.predict_protein_localization( sequence=protein_sequence, confidence_threshold=localization_threshold, timeout=30.0, ) except Exception as exc2: logger.warning("Fallback localization call failed: %s", exc2) localization_prediction = { "error": "Localization model is temporarily unavailable.", "detail": str(exc2), } else: # Surface backend load message when available to help debugging load_error = health.get("load_error") if isinstance(health, dict) else None localization_prediction = { "error": "Localization model is temporarily unavailable.", "detail": load_error or str(exc), } left_col, right_col = st.columns([1, 1]) with left_col: if localization_prediction and localization_prediction.get("error"): st.warning(f"⚠️ {localization_prediction['error']}") elif localization_prediction: st.markdown(f"### {localization_prediction.get('localization', 'Unknown')}") st.progress( min(max(float(localization_prediction.get("confidence", 0.0)), 0.0), 1.0) ) st.caption(f"Confidence: {localization_prediction.get('confidence', 0.0):.3f}") st.caption( f"Evidence pass: {'Yes' if localization_prediction.get('evidence_passed') else 'No'} | " f"Sequence length: {localization_prediction.get('sequence_length', len(protein_sequence))} aa" ) st.markdown( f"""

{localization_prediction.get("experimental_followup_score", 0):.1f}

Experimental Follow-Up Score

""", unsafe_allow_html=True, ) st.info( localization_prediction.get( "recommended_assay", "No assay recommendation available." ) ) if localization_prediction.get("all_probabilities"): st.dataframe( pd.DataFrame( [ {"Compartment": label, "Probability": probability} for label, probability in localization_prediction[ "all_probabilities" ].items() ] ).sort_values("Probability", ascending=False), width="stretch", hide_index=True, ) else: st.info( "⚠️ Run localization inference to see a predicted compartment and evidence guidance." ) with right_col: if not subcellular_df.empty: fig_subcellular = ProteinVisualizer.create_subcellular_heatmap(subcellular_df) st.plotly_chart(fig_subcellular, width="stretch") st.markdown("**Detected Locations:**") for idx, row in subcellular_df.iterrows(): st.markdown(f"- **{row['location']}** ({row['reliability']} confidence)") else: st.warning("⚠️ No subcellular localization data available from Human Protein Atlas") st.divider() # Section 6: KEGG Pathways for Proteins st.header("🧬 KEGG Pathways for Proteins") kegg_data = data.get("kegg_pathways", {}) if kegg_data.get("available"): # Summary metrics total_pathways = kegg_data.get("total_pathways", 0) col1, col2, col3 = st.columns(3) with col1: st.markdown( f"""

{total_pathways}

Total Pathways Found

""", unsafe_allow_html=True, ) with col2: st.markdown( f"""

{kegg_data.get("kegg_protein_id", "N/A")}

KEGG Protein ID

""", unsafe_allow_html=True, ) with col3: st.markdown( f"""

{kegg_data.get("protein_name", "N/A")}

Protein Name

""", unsafe_allow_html=True, ) st.markdown("---") # Create tabs for different display formats pathway_tabs = st.tabs( ["🖼️ Primary Pathway Map", "📋 Next 5 Pathways", "🔗 All Pathways Links"] ) # Tab 1: First Result with Full Details & Pathway Map first_result = kegg_data.get("first_result") with pathway_tabs[0]: if first_result: st.subheader( f"🏆 Primary Pathway: {first_result.get('pathway_name', 'Unknown')}" ) # Display all metadata col1, col2 = st.columns([1, 1]) with col1: st.markdown("**Pathway Details:**") st.markdown(f"- **ID:** `{first_result.get('pathway_id', 'N/A')}`") st.markdown(f"- **Name:** {first_result.get('pathway_name', 'N/A')}") if first_result.get("pathway_description"): st.markdown( f"- **Description:** {first_result.get('pathway_description', 'N/A')}" ) if first_result.get("pathway_class"): st.markdown( f"- **Classification:** {first_result.get('pathway_class', 'N/A')}" ) with col2: st.markdown("**Molecular Functions:**") functions = first_result.get("molecular_functions", []) if functions: for func in functions[:10]: # Limit to 10 functions st.markdown(f"• {func}") else: st.markdown("*No specific molecular functions listed*") st.markdown("---") # Display interactive pathway map (with graceful fallback) st.markdown("**Pathway Map (Interactive):**") try: render_kegg_interactive_pathway( first_result, kegg_protein_id=kegg_data.get("kegg_protein_id") ) except Exception: # Absolute fallback to original static image in case anything above fails try: st.image( first_result.get("kegg_image_url", ""), width="stretch", caption=f"{first_result.get('pathway_name')} - Visual representation from KEGG", ) except Exception: st.warning( f"Could not load pathway map image. 
" f"[View on KEGG Website]({first_result.get('kegg_url', '#')})" ) st.markdown("---") # Direct links col_link1, col_link2 = st.columns(2) with col_link1: st.markdown( f"**[📌 View on KEGG Website]({first_result.get('kegg_url', '#')})**" ) with col_link2: st.markdown( f"**[🔗 KEGG Gene Entry Page]({first_result.get('highlight_url', '#')})**" ) else: st.info("No primary pathway data available") # Tab 2: Next 5 Results next_results = kegg_data.get("next_results", []) with pathway_tabs[1]: if next_results: st.subheader("📊 Next 5 Pathways Associated with Protein") for idx, pathway in enumerate(next_results, 1): with st.container(): col1, col2, col3 = st.columns([2, 1, 1]) with col1: st.markdown(f"**{idx}. {pathway.get('pathway_name', 'Unknown')}**") if pathway.get("pathway_class"): st.caption(f"Class: {pathway.get('pathway_class', '')}") with col2: st.markdown(f"`{pathway.get('pathway_id', 'N/A')}`") with col3: st.markdown(f"**[View →]({pathway.get('kegg_url', '#')})**") st.divider() else: st.info("Less than 6 pathways found for this protein") # Tab 3: All Pathways Links all_pathways = kegg_data.get("pathways", []) with pathway_tabs[2]: st.subheader(f"🔗 All {len(all_pathways)} Associated Pathways") # Add filter and sort options col1, col2 = st.columns([3, 1]) with col1: search_term = st.text_input( "🔍 Search pathways:", placeholder="e.g., cancer, metabolism, signaling" ) with col2: sort_option = st.selectbox("Sort by:", ["Name", "ID"]) # Filter pathways filtered_pathways = all_pathways if search_term: search_term = search_term.lower() filtered_pathways = [ p for p in all_pathways if search_term in p.get("pathway_name", "").lower() or search_term in p.get("pathway_id", "").lower() ] # Sort pathways if sort_option == "Name": filtered_pathways = sorted( filtered_pathways, key=lambda x: x.get("pathway_name", "") ) elif sort_option == "ID": filtered_pathways = sorted( filtered_pathways, key=lambda x: x.get("pathway_id", "") ) # Display as table st.markdown("| # | Pathway Name 
| ID | KEGG Link |") st.markdown("|---|---|---|---|") for idx, pathway in enumerate(filtered_pathways, 1): pathway_name = pathway.get("pathway_name", "Unknown") pathway_id = pathway.get("pathway_id", "N/A") kegg_url = pathway.get("kegg_url", "#") st.markdown( f"| {idx} | {pathway_name} | `{pathway_id}` | [View Pathway]({kegg_url}) |" ) st.caption(f"Showing {len(filtered_pathways)} of {len(all_pathways)} pathways") # Download pathway data st.markdown("---") st.subheader("💾 Export Pathway Data") # Create DataFrame for export pathway_df = pd.DataFrame( [ { "Pathway_Name": p["pathway_name"], "Pathway_ID": p["pathway_id"], "Classification": p.get("pathway_class", ""), "Description": p.get("pathway_description", ""), "KEGG_URL": p["kegg_url"], "Highlighted_URL": p["highlight_url"], } for p in all_pathways ] ) csv_pathways = pathway_df.to_csv(index=False) st.download_button( "📥 Download Pathway List (CSV)", csv_pathways, f"{st.session_state.current_uniprot_id}_kegg_pathways.csv", "text/csv", ) else: st.warning( f"⚠️ No KEGG pathway data found for gene: {kegg_data.get('gene_name', 'Unknown')}" ) st.info(""" **Why might this happen?** - Gene name not recognized in KEGG database - Protein not associated with metabolic/signaling pathways - Limited annotation in KEGG for this specific protein Try searching directly on [KEGG website](https://www.kegg.jp/) """) st.divider() # Section: STRING Protein-Protein Interactions st.header("🔗 Protein-Protein Interaction Network (STRING)") string_data = data.get("string_ppi", {}) st.info(""" **About STRING Database:** - Comprehensive protein-protein interaction database - Combines experimental data, computational prediction, and text mining - Confidence scores from 0-1000 (higher = more reliable) """) if string_data.get("available") and string_data.get("interactions"): interactions = string_data["interactions"] gene_name = string_data.get("gene_name", st.session_state.current_uniprot_id) # Summary metrics col1, col2, col3 = st.columns(3) 
with col1: st.metric("Total Interactions", len(interactions)) with col2: high_conf = [i for i in interactions if i["combined_score"] >= 700] st.metric("High Confidence (≥700)", len(high_conf)) with col3: st.metric("STRING Protein ID", string_data.get("string_id", "N/A")) # Additional confidence metrics col4, col5, col6 = st.columns(3) with col4: highest_conf = [i for i in interactions if i["combined_score"] >= 900] st.metric("Highest Confidence (≥900)", len(highest_conf)) with col5: medium_conf = [ i for i in interactions if i["combined_score"] >= 400 and i["combined_score"] < 700 ] st.metric("Medium Confidence (≥400)", len(medium_conf)) with col6: low_conf = [i for i in interactions if i["combined_score"] < 400] st.metric("Low Confidence (<400)", len(low_conf)) st.markdown("---") # Create tabs ppi_tabs = st.tabs(["🕸️ Network Graph", "📋 Interaction Table"]) # Tab 1: Network visualization with ppi_tabs[0]: network_fig = ProteinVisualizer.create_ppi_network_chart(interactions, gene_name) st.plotly_chart(network_fig, width="stretch") st.caption(""" **Color Legend:** 🔴 Red = Query protein | 🔵 Dark Blue = Highest confidence (≥900) | 🟢 Green = High confidence (≥700) | 🟠 Orange = Medium confidence (≥400) | ⚪ Gray = Low confidence (<400) """) # Tab 2: Interaction table with ppi_tabs[1]: st.subheader("Protein Interaction Partners") # Display interactions in a table ppi_table_html = ProteinVisualizer.create_ppi_table_html(interactions) st.components.v1.html(ppi_table_html, height=600, scrolling=True) st.markdown("---") # External links col1, col2 = st.columns(2) with col1: st.markdown( f"**[🔗 View on STRING Database]({string_data.get('string_url', '#')})**" ) with col2: st.markdown(f"**[📊 Network Image]({string_data.get('network_image_url', '#')})**") st.markdown("---") # Download interaction data interaction_df = pd.DataFrame(interactions) csv_interactions = interaction_df.to_csv(index=False) st.download_button( "📥 Download Interaction Data", csv_interactions, 
f"{st.session_state.current_uniprot_id}_string_interactions.csv", "text/csv", ) else: st.warning( f"⚠️ No STRING interaction data found for {st.session_state.current_uniprot_id}" ) error_msg = string_data.get("error", "Unknown error") st.info(f""" **Possible reasons:** - Protein not found in STRING database (Gene: {string_data.get("gene_name", "Unknown")}) - Limited experimental or predicted interaction data - Protein may have few known interactors **Error:** {error_msg} """) st.divider() # Section 7: Molecular Docking with AutoDock Vina st.header("💊 Molecular Docking Analysis") chembl_data = data.get("chembl_ligands", {}) st.info(""" **About Molecular Docking:** - Predicts how small molecules (ligands/drugs) bind to proteins - Uses AutoDock Vina algorithm for binding affinity calculation - Negative values indicate favorable binding (more negative = stronger binding) - Typical drug-like binding: -7 to -12 kcal/mol - 3D visualization of ligand orientation and binding prediction """) docking_mode_options = ["Use Real Docking (Beta)", "Use Simulation"] default_mode_index = 0 if _get_docking_mode_choice() == "Use Real Docking (Beta)" else 1 st.radio( "Docking mode", docking_mode_options, index=default_mode_index, horizontal=True, key="docking_mode_choice", ) if _get_docking_mode_value() == "real": st.info( "Real docking submits a job to the docking worker. The UI stays responsive while the worker runs." 
) else: st.info("Simulation mode keeps the current fast local fallback.") protein_prep = st.session_state.api_client.prepare_protein_for_docking( uniprot_data, data.get("pdb_structure", {}), data.get("alphafold_structure", {}), ) # Create tabs docking_tabs = st.tabs( [ "📚 Known Ligands", "🎯 Binding Predictor", "🔮 Ligand Binding Prediction", "🧪 Custom Docking", "📊 Docking Results", ] ) # Tab 1: Known Ligands from ChEMBL with docking_tabs[0]: st.subheader("Known Inhibitors & Ligands from ChEMBL") if chembl_data.get("available") and chembl_data.get("ligands"): ligands = chembl_data["ligands"] # Summary metrics col1, col2, col3 = st.columns(3) with col1: st.metric("Total Ligands", len(ligands)) with col2: strong_binders = [ l for l in ligands if l.get("activity_value", float("inf")) < 100 ] st.metric("Strong Binders (<100nM)", len(strong_binders)) with col3: st.metric("ChEMBL Target", chembl_data.get("chembl_target_id", "N/A")) st.markdown("---") # Display ligand cards with Dock buttons st.info( "💡 **Tip:** Click the 'Dock' button next to any ligand to run molecular docking simulation" ) # Show success message if docking was just completed if st.session_state.get("show_docking_success"): st.success( f"✅ Docking complete for {st.session_state.get('docked_ligand_name')}! 📊 Go to **Docking Results** tab to view results." 
) st.session_state.show_docking_success = False for idx, ligand in enumerate(ligands[:20]): # Show top 20 with st.expander( f"🧪 {ligand.get('name', ligand.get('chembl_id'))} - {ligand.get('activity_value', 'N/A')} {ligand.get('activity_units', 'nM')}" ): col_img, col_info, col_action = st.columns([1, 2, 1]) with col_img: # Structure image img_url = f"https://www.ebi.ac.uk/chembl/api/data/image/{ligand.get('chembl_id')}.svg" st.image(img_url, width=150) with col_info: st.markdown( f"**ChEMBL ID:** [{ligand.get('chembl_id')}]({ligand.get('chembl_url', '#')})" ) st.markdown(f"**Activity:** {ligand.get('activity_type', 'N/A')}") st.markdown( f"**Value:** {ligand.get('activity_value', 'N/A')} {ligand.get('activity_units', 'nM')}" ) mw = ligand.get("molecular_weight") if mw and mw != "N/A": st.markdown(f"**MW:** {float(mw):.1f} Da") with col_action: if st.button(f"🎯 Dock", key=f"dock_ligand_{idx}"): # Store ligand for docking st.session_state.selected_ligand_for_docking = { "chembl_id": ligand.get("chembl_id"), "name": ligand.get("name", ligand.get("chembl_id")), "smiles": ligand.get("smiles", ""), "mw": ligand.get("molecular_weight", 0), "activity_value": ligand.get("activity_value", None), } # Run docking using the selected mode docking_result = _run_shared_docking( protein_prep=protein_prep, selected_ligand=st.session_state.selected_ligand_for_docking, ligand_name=st.session_state.selected_ligand_for_docking[ "name" ], protein_length=uniprot_data.get("sequence_length", 500), exhaustiveness=8, num_modes=9, energy_range=3, ) # Store results and ligand data for display st.session_state.docking_results = docking_result st.session_state.docked_ligand_name = ligand.get( "name", ligand.get("chembl_id") ) st.session_state.docked_ligand_data = { "chembl_id": ligand.get("chembl_id"), "name": ligand.get("name", ligand.get("chembl_id")), "smiles": ligand.get("smiles", ""), "molecular_weight": ligand.get("molecular_weight", 0), } # Store protein structure data (from AlphaFold or 
PDB if available) protein_struct = data.get("alphafold_structure", {}) if not protein_struct.get("available"): protein_struct = data.get("pdb_structure", {}) st.session_state.protein_structure = protein_struct st.session_state.show_docking_success = True st.rerun() st.markdown("---") # Download ligand data ligand_df = pd.DataFrame( [ { "ChEMBL_ID": str(l["chembl_id"]), "Name": str(l["name"]), "SMILES": str(l.get("smiles", "")), "Activity_Type": str(l["activity_type"]), "Activity_Value": str(l["activity_value"]) if l["activity_value"] is not None else "N/A", "Units": str(l["activity_units"]), "Molecular_Weight": str(l.get("molecular_weight", "N/A")), } for l in ligands ] ) csv_ligands = ligand_df.to_csv(index=False) st.download_button( "📥 Download Ligand Data", csv_ligands, f"{st.session_state.current_uniprot_id}_ligands.csv", "text/csv", key="download_ligands", ) else: st.warning( f"⚠️ No known ligands found in ChEMBL for {st.session_state.current_uniprot_id}" ) st.info(""" **Possible reasons:** - Protein not yet studied as drug target - No bioactivity data available in ChEMBL - Protein may not be druggable You can still try custom docking in the next tab! 
""") # Tab 2: Binding Predictor with docking_tabs[1]: st.subheader("🎯 AI-Powered Binding Predictor & Drug Discovery") st.markdown(""" **Comprehensive Binding Analysis:** - ✅ Predict binding for **known ligands** (from ChEMBL) - 🧬 Find **similar compounds** with binding potential - 📊 ML-based scoring with confidence levels """) # Create sub-tabs predictor_subtabs = st.tabs( ["🏆 Known Ligands Analysis", "🧪 Similar Compounds", "📋 Comprehensive Report"] ) # Sub-tab 1: Known Ligands Prediction with predictor_subtabs[0]: st.markdown("### Known Ligands Binding Prediction") if chembl_data.get("available") and chembl_data.get("ligands"): ligands = chembl_data["ligands"] col1, col2 = st.columns([3, 1]) with col1: st.info(f"📊 Analyzing {len(ligands)} known ligands from ChEMBL") with col2: if st.button("🔮 Predict Binding", type="primary", key="predict_known"): with st.spinner("🧠 Running ML-based binding prediction..."): prediction = ProteinVisualizer.advanced_binding_prediction( ligands, uniprot_data, novel_compounds=None ) st.session_state.binding_prediction = prediction st.rerun() # Display results if "binding_prediction" in st.session_state: pred = st.session_state.binding_prediction known_preds = pred.get("known_ligands", []) if known_preds: # Highlight top 3 st.success("✅ **Prediction Complete - Top 3 Predicted Binders:**") for idx, p in enumerate(known_preds[:3], 1): comp = p["compound"] with st.expander( f"#{idx} {comp['name']} - Score: {p['predicted_score']}/100 " f"({p['confidence_level']} confidence)", expanded=(idx == 1), ): col1, col2, col3 = st.columns([2, 1, 1]) with col1: st.markdown(f""" **Compound Details:** - **ChEMBL ID:** [{comp["chembl_id"]}]({comp.get("chembl_url", "#")}) - **Activity:** {comp["activity_value"]:.2f} {comp["activity_units"]} ({comp["activity_type"]}) - **Molecular Weight:** {comp.get("molecular_weight", "N/A")} Da - **Predicted Affinity:** {p["predicted_affinity"]:.2f} kcal/mol """) # Show structure img_url = 
f"https://www.ebi.ac.uk/chembl/api/data/image/{comp['chembl_id']}.svg" st.image(img_url, caption=comp["name"], width=200) with col2: st.markdown( f""" **Prediction Metrics:** Score: **{p["predicted_score"]}/100** Confidence: **{p["confidence_level"]}** ({p["confidence"]:.0%})
{p["recommendation"]}
""", unsafe_allow_html=True, ) with col3: st.markdown("**✅ Positive Factors:**") for reason in p["reasons"][:5]: st.caption(f"• {reason}") if p["warnings"]: st.markdown("**⚠️ Warnings:**") for warning in p["warnings"]: st.caption(f"• {warning}") # Action buttons col1, col2 = st.columns(2) with col1: if st.button( f"🚀 Dock This Compound", key=f"dock_known_{idx}" ): st.session_state.selected_ligand = comp st.session_state.selected_ligand_name = comp["name"] st.info( f"✅ Selected {comp['name']} - Go to 'Custom Docking' tab" ) with col2: if st.button( f"🔍 Find Similar", key=f"similar_known_{idx}" ): st.session_state.reference_smiles = comp.get("smiles") st.session_state.reference_name = comp.get("name") st.session_state.similar_auto_run = True st.session_state.similar_similarity = 0.7 st.rerun() # Show full ranking table st.markdown("---") st.subheader("📊 Complete Ranking") ranking_data = [] for idx, p in enumerate(known_preds, 1): ranking_data.append( { "Rank": idx, "Compound": p["compound"]["name"], "Predicted Score": f"{p['predicted_score']}/100", "Confidence": p["confidence_level"], "Predicted Affinity": f"{p['predicted_affinity']:.2f} kcal/mol", "Experimental Activity": f"{p['compound']['activity_value']:.2f} {p['compound']['activity_units']}", "Recommendation": p["recommendation"], } ) ranking_df = pd.DataFrame(ranking_data) st.dataframe(ranking_df, width="stretch", hide_index=True) # Download csv_ranking = ranking_df.to_csv(index=False) st.download_button( "📥 Download Prediction Results", csv_ranking, f"{st.session_state.current_uniprot_id}_binding_predictions.csv", "text/csv", key="download_predictions", ) else: st.warning("⚠️ No known ligands available for prediction") # Sub-tab 2: Similar Compounds with predictor_subtabs[1]: st.markdown("### Find Structurally Similar Compounds") st.info(""" Search for compounds chemically similar to a reference ligand. Similar structures often have similar biological activity. 
""") # Check if triggered from 'Find Similar' button auto_run = st.session_state.pop("similar_auto_run", False) preloaded_smiles = st.session_state.pop("reference_smiles", None) preloaded_name = st.session_state.pop("reference_name", None) default_similarity = st.session_state.pop("similar_similarity", 0.7) # Reference selection reference_source = st.radio( "Select reference compound:", ["From known ligands", "Enter SMILES manually"], key="similar_source", ) reference_smiles = None reference_name = None if reference_source == "From known ligands": if chembl_data.get("available") and chembl_data.get("ligands"): ligand_options = { f"{l['name']} ({l['chembl_id']})": l for l in chembl_data["ligands"][:20] } # Preselect if coming from 'Find Similar' preselect_idx = 0 if preloaded_name: for i, k in enumerate(ligand_options.keys()): if preloaded_name in k: preselect_idx = i break selected = st.selectbox( "Choose reference ligand:", list(ligand_options.keys()), index=preselect_idx, key="similar_ref_select", ) ref_lig = ligand_options[selected] reference_smiles = ref_lig.get("smiles") reference_name = ref_lig["name"] else: st.warning("No known ligands available") else: reference_smiles = st.text_input( "Enter SMILES:", value=preloaded_smiles if preloaded_smiles and reference_source == "Enter SMILES manually" else "", placeholder="e.g., CC(=O)Oc1ccccc1C(=O)O", key="similar_smiles_input", ) reference_name = preloaded_name or "Custom SMILES" # Similarity threshold similarity = st.slider( "Similarity threshold:", 0.5, 1.0, default_similarity if auto_run else 0.7, 0.05, help="Higher = more similar (0.7 = 70% similar)", key="similarity_threshold", ) # Auto-run if triggered from 'Find Similar' button if auto_run and reference_smiles and preloaded_smiles: with st.spinner( f"Searching for compounds ≥{similarity * 100:.0f}% similar to {reference_name}..." 
): similar_data = cached_fetch_similar_compounds( reference_smiles, similarity, st.session_state.api_client ) if similar_data.get("available"): # Run predictions on similar compounds known_ligands = ( chembl_data.get("ligands", []) if chembl_data.get("available") else [] ) prediction = ProteinVisualizer.advanced_binding_prediction( known_ligands, uniprot_data, novel_compounds=similar_data.get("compounds", []), ) st.session_state.similar_prediction = prediction st.session_state.similar_data = similar_data if reference_smiles: if st.button("🔍 Find Similar Compounds", type="primary", key="find_similar"): with st.spinner( f"Searching for compounds ≥{similarity * 100:.0f}% similar to {reference_name}..." ): similar_data = cached_fetch_similar_compounds( reference_smiles, similarity, st.session_state.api_client ) if similar_data.get("available"): # Run predictions on similar compounds known_ligands = ( chembl_data.get("ligands", []) if chembl_data.get("available") else [] ) prediction = ProteinVisualizer.advanced_binding_prediction( known_ligands, uniprot_data, novel_compounds=similar_data.get("compounds", []), ) st.session_state.similar_prediction = prediction st.session_state.similar_data = similar_data st.rerun() # Display similar compounds if "similar_prediction" in st.session_state: pred = st.session_state.similar_prediction similar_preds = pred.get("novel_candidates", []) if similar_preds: st.success(f"✅ Found {len(similar_preds)} similar compounds") for idx, p in enumerate(similar_preds[:10], 1): comp = p["compound"] with st.expander(f"{idx}. 
{comp['name'][:50]}"): col1, col2 = st.columns([2, 1]) with col1: st.markdown(f""" **Compound:** {comp["name"]} **PubChem CID:** [{comp["cid"]}]({comp["pubchem_url"]}) **Formula:** {comp.get("formula", "N/A")} **MW:** {comp.get("molecular_weight", "N/A")} Da **SMILES:** `{comp.get("smiles", "N/A")[:50]}...` **Predicted Affinity:** {p["predicted_affinity"]:.2f} kcal/mol """) # Show reasons st.markdown("**Why this compound:**") for reason in p["reasons"][:3]: st.caption(f"• {reason}") with col2: # PubChem image img_url = f"https://pubchem.ncbi.nlm.nih.gov/image/imgsrv.fcgi?cid={comp['cid']}&t=l" st.image(img_url, caption=f"CID {comp['cid']}", width=150) if st.button(f"🚀 Dock", key=f"dock_similar_{idx}"): st.session_state.selected_ligand = comp st.session_state.selected_ligand_name = comp["name"] st.info("Go to 'Custom Docking' tab") else: st.info( "No similar compounds found. Try lowering the similarity threshold." ) # Sub-tab 3: Comprehensive Report with predictor_subtabs[2]: st.markdown("### 📋 Comprehensive Binding Analysis Report") if "binding_prediction" in st.session_state: pred = st.session_state.binding_prediction # Summary metrics col1, col2, col3 = st.columns(3) with col1: st.metric("Known Ligands", len(pred.get("known_ligands", []))) with col2: similar_count = len( st.session_state.get("similar_prediction", {}).get( "novel_candidates", [] ) ) st.metric("Similar Compounds", similar_count) with col3: total = len(pred.get("known_ligands", [])) + similar_count st.metric("Total Analyzed", total) st.markdown("---") # Binding rules extracted st.subheader("🧬 Extracted Binding Rules (SAR)") rules = pred.get("binding_rules", {}) col1, col2 = st.columns(2) with col1: if rules.get("optimal_mw_range"): st.info(f""" **Optimal Molecular Weight:** {rules["optimal_mw_range"][0]:.0f} - {rules["optimal_mw_range"][1]:.0f} Da *(Based on active compounds)* """) with col2: if rules.get("activity_threshold"): thresh = rules["activity_threshold"] st.info(f""" **Activity 
Thresholds:** Potent: <{thresh.get("potent", 0):.1f} nM Moderate: <{thresh.get("moderate", 0):.1f} nM Weak: >{thresh.get("moderate", 0):.1f} nM """) # Recommendations st.markdown("---") st.subheader("💡 Actionable Recommendations") recommendations = pred.get("recommendations", []) if recommendations: for rec in recommendations: priority_color = {"High": "🔴", "Medium": "🟡", "Low": "🟢"}.get( rec.get("priority", "Medium"), "⚪" ) st.markdown(f""" {priority_color} **{rec["type"]}:** {rec["compound"]} *Action:* {rec["action"]} *Priority:* {rec["priority"]} """) # Download full report st.markdown("---") # Generate comprehensive report report_text = generate_full_report(pred, uniprot_data) st.download_button( "📥 Download Full Report (TXT)", report_text, f"{st.session_state.current_uniprot_id}_binding_report.txt", "text/plain", key="download_full_report", ) else: st.info("Run predictions in other tabs to generate comprehensive report") # Tab 3: Custom Docking with docking_tabs[3]: st.subheader("Run Custom Molecular Docking") # Check if protein structure is available protein_prep = st.session_state.api_client.prepare_protein_for_docking( uniprot_data, data.get("pdb_structure", {}), data.get("alphafold_structure", {}) ) if not protein_prep.get("available"): st.error( "❌ No protein structure available for docking. Please ensure 3D structure is loaded." 
) else: st.success( f"✅ Using {protein_prep['structure_type']} structure: {protein_prep['structure_id']}" ) # Ligand input options st.markdown("**Select Ligand Source:**") ligand_source = st.radio( "Choose ligand source", [ "Use predicted best binder", "Known ligand from ChEMBL", "Custom compound (PubChem)", "Upload SMILES/SDF", ], horizontal=False, key="ligand_source_radio", label_visibility="collapsed", ) selected_ligand = None ligand_name = None if ligand_source == "Use predicted best binder": # Offer any previously selected ligand from other tabs prev_candidates = [] if "selected_ligand" in st.session_state: prev_candidates.append( ( "Binding Predictor", st.session_state.selected_ligand, st.session_state.get("selected_ligand_name"), ) ) if "selected_ligand_for_docking" in st.session_state: prev_candidates.append( ( "Known Ligand", st.session_state.selected_ligand_for_docking, st.session_state.selected_ligand_for_docking.get("name"), ) ) if "docked_ligand_data" in st.session_state: prev_candidates.append( ( "Last Docked", st.session_state.docked_ligand_data, st.session_state.get("docked_ligand_name"), ) ) if prev_candidates: options = [f"{src}: {name}" for src, _, name in prev_candidates] sel = st.selectbox( "Use previously selected ligand:", ["(none)"] + options, key="use_prev_selected_ligand_select", ) if sel and sel != "(none)": idx = options.index(sel) selected_ligand = prev_candidates[idx][1] ligand_name = prev_candidates[idx][2] st.info(f"✅ Using: **{ligand_name}**") else: # Fallback to binding predictor best binder if "binding_prediction" in st.session_state: pred = st.session_state.binding_prediction if pred.get("available"): selected_ligand = pred["best_ligand"] ligand_name = selected_ligand["name"] st.info(f"✅ Using predicted best binder: **{ligand_name}**") else: st.warning("⚠️ No prediction available. 
Run predictor first.") else: st.warning("⚠️ Please run the Binding Predictor first (previous tab)") elif ligand_source == "Known ligand from ChEMBL": if chembl_data.get("available") and chembl_data.get("ligands"): ligand_options = { f"{l['name']} ({l['chembl_id']}) - {l['activity_type']}: {l['activity_value']:.1f} {l['activity_units']}": l for l in chembl_data["ligands"][:10] } selected_option = st.selectbox( "Choose ligand:", list(ligand_options.keys()), key="chembl_select" ) selected_ligand = ligand_options[selected_option] ligand_name = selected_ligand["name"] else: st.warning("No ChEMBL ligands available") elif ligand_source == "Custom compound (PubChem)": compound_name = st.text_input( "Enter compound name:", placeholder="e.g., Aspirin, Ibuprofen, Caffeine", key="pubchem_input", ) if compound_name and st.button("🔍 Search PubChem", key="pubchem_search"): with st.spinner("Searching PubChem..."): pubchem_data = cached_fetch_pubchem_structure( compound_name, st.session_state.api_client ) if pubchem_data.get("available"): st.success( f"✅ Found: {compound_name} (CID: {pubchem_data['cid']})" ) st.image(pubchem_data["image_url"], width=200) st.session_state.custom_ligand = pubchem_data selected_ligand = pubchem_data ligand_name = compound_name else: st.error(f"❌ Compound '{compound_name}' not found in PubChem") if "custom_ligand" in st.session_state: selected_ligand = st.session_state.custom_ligand ligand_name = compound_name else: # Upload SMILES/SDF smiles_input = st.text_input( "Enter SMILES string:", placeholder="e.g., CC(=O)Oc1ccccc1C(=O)O (Aspirin)", key="smiles_input", ) if smiles_input: ligand_name = "Custom_SMILES" selected_ligand = {"smiles": smiles_input, "name": ligand_name} st.markdown("---") # Docking parameters st.markdown("**Docking Parameters:**") col1, col2, col3 = st.columns(3) with col1: exhaustiveness = st.slider( "Exhaustiveness", 1, 16, 8, help="Higher = more thorough but slower", key="exhaustiveness_slider", ) with col2: num_modes = st.slider( 
"Number of modes", 1, 20, 9, help="Number of binding poses to generate", key="num_modes_slider", ) with col3: energy_range = st.slider( "Energy range (kcal/mol)", 1, 5, 3, key="energy_range_slider" ) site_config = _render_docking_site_controls("custom_docking") # Run docking button if selected_ligand: run_docking = st.button( "🚀 Run Molecular Docking", type="primary", key="run_docking_btn" ) if run_docking: spinner_message = ( "🧬 Submitting docking job to docking worker..." if _get_docking_mode_value() == "real" else "🧬 Running AutoDock Vina simulation... Calculating 3D orientation..." ) with st.spinner(spinner_message): docking_results = _run_shared_docking( protein_prep=protein_prep, selected_ligand=selected_ligand, ligand_name=ligand_name, protein_length=protein_prep["sequence_length"], exhaustiveness=exhaustiveness, num_modes=num_modes, energy_range=energy_range, **site_config, ) st.session_state.docking_results = docking_results st.session_state.docked_ligand_name = ligand_name st.session_state.docked_ligand_data = selected_ligand st.session_state.protein_structure = protein_prep st.rerun() else: st.info("👆 Please select or enter a ligand above") # Tab 2: Ligand Binding Prediction - Using Advanced Docking Interface with docking_tabs[2]: st.subheader("🔮 Ligand Binding Prediction & Docking") st.markdown(""" **Advanced ligand binding analysis:** - Predict binding affinity for any ligand SMILES - Run molecular docking simulations - View 3D protein-ligand complexes - Generate binding predictions with confidence scores """) # Ligand input: Single SMILES or compound search st.markdown("#### Ligand Input") input_method = st.radio( "Select input method:", ["Enter SMILES", "Search PubChem", "Previous ligands"], horizontal=True, key="ligand_binding_input_method", ) selected_ligand = None ligand_name = None if input_method == "Enter SMILES": smiles_input = st.text_input( "SMILES String:", placeholder="e.g., CC(=O)Oc1ccccc1C(=O)O", key="ligand_binding_smiles", ) if 
smiles_input: ligand_name = "Custom_SMILES" selected_ligand = { "smiles": smiles_input, "name": ligand_name, "molecular_weight": 200, } elif input_method == "Search PubChem": compound_name = st.text_input( "Compound Name:", placeholder="e.g., Aspirin", key="ligand_binding_compound" ) if compound_name and st.button("🔍 Search", key="ligand_binding_search"): pubchem_data = cached_fetch_pubchem_structure( compound_name, st.session_state.api_client ) if pubchem_data.get("available"): st.success(f"✅ Found: {compound_name}") st.image(pubchem_data["image_url"], width=200) st.session_state.ligand_binding_compound_data = pubchem_data selected_ligand = pubchem_data ligand_name = compound_name elif input_method == "Previous ligands": if "docked_ligand_data" in st.session_state: prev_ligand = st.session_state.docked_ligand_data st.info(f"Using: {prev_ligand.get('name', 'Unknown')}") selected_ligand = prev_ligand ligand_name = prev_ligand.get("name", "Unknown") else: st.info("No previously docked ligands available") # If we have a ligand from session state if ( "ligand_binding_compound_data" in st.session_state and input_method == "Search PubChem" ): selected_ligand = st.session_state.ligand_binding_compound_data ligand_name = st.session_state.get("ligand_binding_compound_name", "Unknown") if selected_ligand: quick_smiles = str(selected_ligand.get("smiles", "") or "").strip() quick_name = ligand_name or selected_ligand.get("name", "Unknown") st.markdown("#### Quick Affinity Estimate (SMILES -> pKd)") if not quick_smiles: st.info("No SMILES available for quick affinity estimate.") else: if st.button("⚡ Estimate Affinity", key="ligand_binding_quick_estimate"): with st.spinner("Running fast binding-affinity prediction..."): quick_prediction = cached_predict_ligand_binding( (quick_smiles,), (quick_name,), st.session_state.api_client, ) quick_prediction["_input_smiles"] = quick_smiles st.session_state.ligand_binding_quick_prediction = quick_prediction quick_prediction = 
st.session_state.get("ligand_binding_quick_prediction") if ( quick_prediction and quick_prediction.get("_input_smiles") == quick_smiles and quick_prediction.get("available") and quick_prediction.get("predictions") ): quick_item = quick_prediction["predictions"][0] if quick_item.get("is_valid"): quick_result = quick_item.get("prediction", {}) col1, col2, col3 = st.columns(3) with col1: st.metric( "Predicted pAffinity", f"{quick_result.get('binding_affinity', 0.0):.2f}", ) with col2: st.metric( "Binding Probability", f"{quick_result.get('binding_probability', 0.0):.2%}", ) with col3: st.metric("Method", quick_result.get("prediction_method", "N/A")) model_metadata = quick_result.get( "model_metadata" ) or quick_prediction.get("model_metadata") if model_metadata: model_id = model_metadata.get("model_id", "local") source = model_metadata.get("source", "unknown") st.caption(f"Model source: {source} | model_id: {model_id}") else: st.warning( quick_item.get("error") or "Invalid SMILES for quick estimate." ) elif quick_prediction and not quick_prediction.get("available"): st.warning(quick_prediction.get("error", "Quick estimate unavailable.")) st.divider() # Docking parameters st.markdown("#### Docking Configuration") col1, col2, col3 = st.columns(3) with col1: exhaustiveness = st.slider( "Exhaustiveness", 1, 16, 8, key="ligand_binding_exhaustiveness" ) with col2: num_modes = st.slider("Binding Modes", 1, 20, 9, key="ligand_binding_modes") with col3: energy_range = st.slider("Energy Range", 1, 5, 3, key="ligand_binding_energy") site_config = _render_docking_site_controls("ligand_binding") st.divider() # Run docking if st.button( "🚀 Predict & Dock", type="primary", width="stretch", key="ligand_binding_dock" ): spinner_message = ( "Submitting docking job to docking worker..." if _get_docking_mode_value() == "real" else "Running binding prediction and docking..." 
) with st.spinner(spinner_message): docking_results = _run_shared_docking( protein_prep=protein_prep, selected_ligand=selected_ligand, ligand_name=ligand_name, protein_length=uniprot_data.get("sequence_length", 500), exhaustiveness=exhaustiveness, num_modes=num_modes, energy_range=energy_range, **site_config, ) st.session_state.ligand_binding_results = docking_results st.session_state.ligand_binding_ligand_name = ligand_name st.session_state.ligand_binding_ligand_data = selected_ligand # Get protein structure protein_struct = data.get("alphafold_structure", {}) if not protein_struct.get("available"): protein_struct = data.get("pdb_structure", {}) st.session_state.ligand_binding_protein = protein_struct st.rerun() # Display results if "ligand_binding_results" in st.session_state: results = _refresh_real_docking_result_if_needed( st.session_state.ligand_binding_results ) st.session_state.ligand_binding_results = results if results.get("available"): _render_docking_pocket_summary(results, "ligand_binding") best_affinity = _validated_docking_affinity(results) strength, color = _docking_strength(best_affinity) affinity_text = f"{best_affinity:.3f} kcal/mol" if best_affinity is not None else "N/A" if results.get("simulated") and results.get("fallback_reason"): st.warning(f"Simulation fallback reason: {results.get('fallback_reason')}") if results.get("status") in {"queued", "running"} and not results.get("simulated"): job_id = results.get('job_id') job_status = results.get('status') st.info( f"Real docking job {job_id} is {job_status}. The page will auto-refresh while the job is pending." ) if results.get("fallback_reason"): st.warning(results.get("fallback_reason")) try: refresh_secs = 5 st.markdown( "", unsafe_allow_html=True, ) except Exception: pass elif results.get("status") == "failed" and not results.get("simulated"): failure_reason = ( results.get("error_message") or results.get("fallback_reason") or "Real docking failed." 
) st.error(f"Real docking failed: {failure_reason}") st.divider() st.subheader("📊 Binding Prediction Results") col1, col2, col3 = st.columns(3) with col1: st.markdown( f"""

{affinity_text}

Binding Affinity

""", unsafe_allow_html=True, ) with col2: st.metric("Strength", strength) with col3: st.metric("Modes", len(results.get("modes", []))) st.markdown("---") # 3D Visualization st.subheader("🔬 3D Complex") protein_structure = st.session_state.get("ligand_binding_protein", {}) ligand_data = st.session_state.get("ligand_binding_ligand_data", {}) if results.get("has_coordinates"): col1, col2 = st.columns([2, 1]) with col1: viewer_html = ProteinVisualizer.create_docking_3d_viewer( protein_structure, ligand_data, results, st.session_state.get("ligand_binding_ligand_name", "Unknown"), ) st.components.v1.html(viewer_html, height=650) with col2: pose_source = results.get("pose_source", "unavailable") st.markdown(f"**Pose source:** {pose_source}") st.markdown("---") # Results chart fig = ProteinVisualizer.create_docking_results_chart(results) st.plotly_chart(fig, width="stretch") st.subheader("Binding Mode Details") _render_docking_mode_details(results) # Download results st.markdown("---") results_csv = pd.DataFrame( [ { "Ligand": st.session_state.get( "ligand_binding_ligand_name", "Unknown" ), "Affinity_kcal_mol": best_affinity, "Modes": len(results.get("modes", [])), "Timestamp": datetime.now().isoformat(), } ] ).to_csv(index=False) st.download_button( "📥 Download Results", results_csv, f"ligand_binding_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv", "text/csv", key="ligand_binding_download", ) else: st.info("👆 Select or enter a ligand above to begin") # Tab 4: Docking Results with docking_tabs[4]: st.subheader("Docking Results") if "docking_results" in st.session_state: # Refresh job status and update session state results = _refresh_real_docking_result_if_needed(st.session_state.docking_results) st.session_state.docking_results = results if results.get("available"): _render_docking_pocket_summary(results, "custom_docking") ligand_name = st.session_state.get("docked_ligand_name", "Unknown") ligand_data = st.session_state.get("docked_ligand_data", {}) 
protein_structure = st.session_state.get("protein_structure", {}) if results.get("simulated"): simulated_reason = ( results.get("fallback_reason") or "Production version would use actual AutoDock Vina calculations." ) st.warning( f"⚠️ **Note:** These are simulated results for demonstration. {simulated_reason}" ) if results.get("error_message"): st.info(f"Simulation fallback details: {results.get('error_message')}") st.markdown(f"### Results for: **{ligand_name}**") # Best binding affinity best_affinity = _validated_docking_affinity(results) strength, color = _docking_strength(best_affinity) affinity_text = f"{best_affinity:.3f} kcal/mol" if best_affinity is not None else "N/A" best_mode = results.get("best_mode", {}) # Status display with auto-refresh only while pending if results.get("status") in {"queued", "running"} and not results.get("simulated"): job_id = results.get('job_id') job_status = results.get('status') col_status, col_refresh = st.columns([3, 1]) with col_status: st.info( f"⏳ Real docking job {job_id} is {job_status}.\n\n" f"Refreshing automatically every 5 seconds..." ) if results.get("fallback_reason"): st.warning(results.get("fallback_reason")) with col_refresh: if st.button("🔄 Refresh Now", key="manual_refresh_docking"): st.rerun() # Auto-refresh only if still pending (don't reload if completed) try: refresh_secs = 5 st.markdown( "", unsafe_allow_html=True, ) except Exception: pass # Show placeholder for affinity while loading st.markdown("**Status:** Waiting for worker to complete docking calculations...") elif results.get("status") == "completed" and not results.get("simulated"): # Show success message with updated results st.success(f"✅ Real docking completed! Job {results.get('job_id')} result loaded.") elif results.get("status") == "failed" and not results.get("simulated"): failure_reason = ( results.get("error_message") or results.get("fallback_reason") or "Real docking failed." 
) st.error(f"❌ Real docking failed: {failure_reason}") elif results.get("fallback_reason") and not results.get("simulated"): st.info(f"Docking details: {results.get('fallback_reason')}") col1, col2, col3 = st.columns(3) with col1: st.markdown( f"""

{affinity_text}

Best Binding Affinity

""", unsafe_allow_html=True, ) with col2: st.metric("Binding Strength", strength) with col3: st.metric("Binding Modes", len(results.get("modes", []))) st.markdown("---") # NEW: 3D Visualization st.subheader("🔬 3D Protein-Ligand Complex") # Visualization mode toggle col_viz_toggle, col_info = st.columns([1, 3]) with col_viz_toggle: viz_mode = st.radio( "View Mode:", options=["Cartoon (Ribbon)", "All-Atom (Ball-and-Stick)"], index=0, horizontal=False, key="docking_viz_mode", ) if results.get("has_coordinates"): # Show binding site coordinates col1, col2 = st.columns([2, 1]) with col1: viewer_html = ProteinVisualizer.create_docking_3d_viewer( protein_structure, ligand_data, results, ligand_name, view_mode=viz_mode ) st.components.v1.html(viewer_html, height=650) with col2: st.markdown("**Best Binding Mode:**") st.markdown(f""" **Position (Å):** - X: {best_mode.get("center", {}).get("x", 0):.2f} - Y: {best_mode.get("center", {}).get("y", 0):.2f} - Z: {best_mode.get("center", {}).get("z", 0):.2f} **Orientation:** - {best_mode.get("orientation", "N/A")} **RMSD:** - Lower bound vs best generated pose: {f'{best_mode.get("rmsd_lb"):.2f} Å' if best_mode.get("rmsd_lb") is not None else 'N/A'} - Upper bound vs best generated pose: {f'{best_mode.get("rmsd_ub"):.2f} Å' if best_mode.get("rmsd_ub") is not None else 'N/A'} """) if viz_mode == "Cartoon (Ribbon)": st.info(""" **View: Ribbon Mode** Clean academic view showing protein backbone as smooth ribbon structure. """) else: st.info(""" **View: All-Atom Mode** Detailed atomic structure with all atoms shown as balls and sticks. 
""") st.markdown("---") # Binding modes chart st.subheader("📊 All Binding Modes") fig_docking = ProteinVisualizer.create_docking_results_chart(results) st.plotly_chart(fig_docking, width="stretch") # Detailed modes table with coordinates st.subheader("📋 Binding Mode Details") modes_df = pd.DataFrame(results.get("modes", [])) _render_docking_mode_details(results) # Interpretation st.subheader("💡 Interpretation") if best_affinity is None: st.info("Binding strength cannot be interpreted until a validated Vina score is available.") elif best_affinity < -7: st.success(""" **Strong Binding** (< -7 kcal/mol) - Indicates favorable protein-ligand interaction - This compound shows drug-like binding affinity - Worth further experimental validation - Predicted binding orientation suggests stable complex """) elif best_affinity < -5: st.info(""" **Moderate Binding** (-5 to -7 kcal/mol) - Shows some binding potential - May require optimization for better affinity - Consider structural modifications - Multiple binding orientations possible """) else: st.warning(""" **Weak Binding** (> -5 kcal/mol) - Limited binding affinity - Unlikely to be effective inhibitor - Significant optimization needed - Consider alternative scaffolds """) # Download results col1, col2 = st.columns(2) with col1: csv_modes = modes_df.to_csv(index=False) st.download_button( "📥 Download Docking Results", csv_modes, f"docking_{st.session_state.current_uniprot_id}_{ligand_name}.csv", "text/csv", key="download_docking_results", ) with col2: # Coordinates for best mode coords_text = f"""Best Binding Mode Coordinates Ligand: {ligand_name} Protein: {st.session_state.current_uniprot_id} Affinity: {best_affinity} kcal/mol Position (Å): X: {best_mode.get("center", {}).get("x", 0):.3f} Y: {best_mode.get("center", {}).get("y", 0):.3f} Z: {best_mode.get("center", {}).get("z", 0):.3f} Orientation: {best_mode.get("orientation", "N/A")} """ st.download_button( "📥 Download 3D Coordinates", coords_text, 
f"coordinates_{ligand_name}.txt", "text/plain", key="download_coordinates", ) # Clear results if st.button("🔄 Run New Docking", key="docking_new_run"): del st.session_state.docking_results del st.session_state.docked_ligand_name del st.session_state.docked_ligand_data if "custom_ligand" in st.session_state: del st.session_state.custom_ligand if "binding_prediction" in st.session_state: del st.session_state.binding_prediction st.rerun() else: st.info( "👈 Run a docking simulation in the 'Custom Docking' tab to see results here" ) st.divider() # Section 9: Summary Table st.header("📊 Data Summary") summary_df = ProteinAPIClient.DataProcessor.create_summary_table( uniprot_data, tissue_df, subcellular_df, data.get("alphafold_structure"), data.get("pdb_structure"), data.get("kegg_pathways"), data.get("chembl_ligands"), ) st.dataframe(summary_df, width="stretch", hide_index=True) # Download options st.subheader("💾 Export Data") col1, col2, col3 = st.columns(3) with col1: if not tissue_df.empty: csv_tissue = tissue_df.to_csv(index=False) st.download_button( "📥 Download Tissue Data", csv_tissue, f"{st.session_state.current_uniprot_id}_tissue_expression.csv", "text/csv", ) with col2: if not subcellular_df.empty: csv_subcellular = subcellular_df.to_csv(index=False) st.download_button( "📥 Download Subcellular Data", csv_subcellular, f"{st.session_state.current_uniprot_id}_subcellular.csv", "text/csv", ) with col3: csv_summary = summary_df.to_csv(index=False) st.download_button( "📥 Download Summary", csv_summary, f"{st.session_state.current_uniprot_id}_summary.csv", "text/csv", ) # Footer st.divider() st.caption( f"⏱️ Data fetched in {st.session_state.get('fetch_time', 0):.2f}s | 💾 Cached for 24 hours | 🔬 Data sources: UniProt, Human Protein Atlas" ) # Section: Protein Literature Summary with st.expander("🔬 Literature & Overview", expanded=False): literature = data.get("literature", {}) # Wikipedia intro if literature.get("wiki_title"): st.info( f"**Wikipedia**: 
[{literature['wiki_title']}](https://en.wikipedia.org/wiki/{literature['wiki_title'].replace(' ', '_')})" ) st.caption(literature.get("wiki_snippet", "")) # Top papers if literature.get("papers"): st.subheader("Top 5 Research Papers") for i, p in enumerate(literature["papers"], 1): with st.container(): st.markdown(f"**{p['title']}**") st.caption( f"{p['authors']} | [PMID: {p['pmid']}](https://pubmed.ncbi.nlm.nih.gov/{p['pmid']})" ) st.caption(p["abstract_snip"]) st.divider() else: st.warning("No recent papers found; try official gene name.") st.divider() st.header("🧠 OmniBiMol AI Research Copilot") st.caption( "Evidence-grounded protein synthesis and translational hypothesis support (research use only)." ) with st.expander("Copilot Operating Contract", expanded=False): st.code(OMNIBIMOL_RESEARCH_COPILOT_SYSTEM_PROMPT, language="markdown") default_query = "Why is this protein ligandable?" copilot_query = st.text_area( "Ask OmniBiMol Copilot", value=st.session_state.get("omnibimol_copilot_query", default_query), height=120, key="omnibimol_copilot_query", help='Examples: "Why is this protein ligandable?", "hypothesis cards", "experimental next steps", "risk flags".', ) if st.button("Generate Copilot Analysis", key="run_omnibimol_copilot", type="primary"): with st.spinner("Synthesizing evidence-grounded copilot response..."): context_payload = _build_omnibimol_context_payload(data, uniprot_data) copilot_output = _generate_omnibimol_copilot_response( copilot_query, context_payload ) st.session_state.omnibimol_copilot_output = copilot_output if st.session_state.get("omnibimol_copilot_output"): st.markdown(st.session_state.omnibimol_copilot_output) st.download_button( "📥 Download Copilot Analysis", st.session_state.omnibimol_copilot_output, f"{st.session_state.current_uniprot_id}_omnibimol_copilot_analysis.md", "text/markdown", key="download_omnibimol_copilot_output", ) render_footer() # ============================================================================= # 
def _decode_fasta_upload(raw_bytes: bytes) -> str:
    """Decode uploaded FASTA bytes to text without raising on odd encodings.

    ``utf-8-sig`` is tried first so that a leading byte-order mark does not
    end up attached to the first ``>`` header line. Input that is not valid
    UTF-8 falls back to Latin-1, which maps every byte to a character and
    therefore cannot fail.

    Args:
        raw_bytes: Raw content of the uploaded file.

    Returns:
        The decoded text.
    """
    try:
        return raw_bytes.decode("utf-8-sig")
    except UnicodeDecodeError:
        return raw_bytes.decode("latin-1")


def render_sequence_analysis_page() -> None:
    """Render the Sequence Analysis Suite page.

    The page has three parts:

    1. FASTA input (file upload or pasted text) plus analysis options and a
       run button; results are stored in
       ``st.session_state.sequence_analysis_results`` and rendered through
       ``display_analysis_results``.
    2. A Protein Predictor section with its own, independent FASTA input,
       rendered through ``_render_protein_predictor``.
    3. A collapsed example of the FASTA format.

    Uploaded files are decoded with ``_decode_fasta_upload`` so that a file in
    an unexpected encoding degrades gracefully instead of crashing the page.
    """
    st.header("🧬 Sequence Analysis Suite")
    st.markdown("""
    Comprehensive computational analysis of biological sequences (DNA, RNA, or protein).
    Upload FASTA files to perform multiple sequence alignment, phylogenetic analysis,
    domain identification, motif finding, and conservation scoring.
    """)

    # Keep one analyzer per session so it is not rebuilt on every rerun.
    if "sequence_analyzer" not in st.session_state:
        st.session_state.sequence_analyzer = SequenceAnalysisSuite()
    analyzer = st.session_state.sequence_analyzer

    # File upload section
    st.subheader("📤 Upload Sequences")
    uploaded_file = st.file_uploader(
        "Upload FASTA file",
        type=["fasta", "fa", "fas", "txt"],
        help="Upload a FASTA file containing one or more sequences",
    )

    # Alternative: text input
    st.markdown("**OR** paste FASTA content directly:")
    fasta_text = st.text_area(
        "FASTA Content", height=200, help="Paste FASTA formatted sequences here"
    )

    # An uploaded file takes precedence over pasted text.
    fasta_content = None
    if uploaded_file is not None:
        # getvalue() returns the whole buffer regardless of the read position.
        fasta_content = _decode_fasta_upload(uploaded_file.getvalue())
        st.success(f"✅ File uploaded: {uploaded_file.name}")
    elif fasta_text.strip():
        fasta_content = fasta_text

    # Analysis options (Sequence Analysis Suite)
    if fasta_content:
        st.subheader("⚙️ Analysis Options")

        col1, col2, col3 = st.columns(3)
        with col1:
            run_alignment = st.checkbox("Multiple Sequence Alignment", value=True)
            run_conservation = st.checkbox("Conservation Scoring", value=True)
        with col2:
            run_phylogeny = st.checkbox("Phylogenetic Tree", value=True)
            run_domains = st.checkbox("Domain Identification", value=True)
        with col3:
            run_motifs = st.checkbox("Motif Finding", value=True)

        # Run analysis button
        if st.button("🚀 Run Analysis", type="primary", width="stretch"):
            # Label used in every log record below to identify the input source.
            input_label = uploaded_file.name if uploaded_file is not None else "text_input"
            with st.spinner("Running sequence analysis..."):
                try:
                    results = analyzer.analyze(
                        fasta_content,
                        run_alignment=run_alignment,
                        run_phylogeny=run_phylogeny,
                        run_domains=run_domains,
                        run_motifs=run_motifs,
                        run_conservation=run_conservation,
                    )
                    st.session_state.sequence_analysis_results = results

                    if results.get("errors"):
                        st.warning(
                            "⚠️ Analysis completed with some errors. Check the results section for details."
                        )
                    elif results.get("warnings"):
                        st.warning(
                            "Analysis completed with workload adaptations. Check the warnings below."
                        )
                    else:
                        st.success("✅ Analysis completed successfully!")
                except ExternalServiceError as e:
                    logger.warning(
                        f"External service unavailable during sequence analysis: {e.internal_message}",
                        extra=create_log_context(
                            "sequence_analysis",
                            file_name=input_label,
                            **e.log_details,
                        ),
                    )
                    st.error(f"⚠️ {e.user_message}")
                except AnalysisError as e:
                    logger.info(
                        f"Sequence analysis error: {e.internal_message}",
                        extra=create_log_context(
                            "sequence_analysis",
                            file_name=input_label,
                            **e.log_details,
                        ),
                    )
                    st.error(f"❌ {e.user_message}")
                except Exception as e:
                    # Page-level boundary: log the traceback, show a generic message.
                    logger.exception(
                        "Unexpected error during sequence analysis",
                        extra=create_log_context(
                            "sequence_analysis",
                            file_name=input_label,
                            error_type=type(e).__name__,
                        ),
                    )
                    st.error(
                        "❌ Analysis failed. Please check your input and try again, or contact support."
                    )

        # Display results
        if "sequence_analysis_results" in st.session_state:
            results = st.session_state.sequence_analysis_results
            display_analysis_results(results, analyzer)

    # ------------------------------------------------------------------
    # Protein Predictor section (always visible, separate FASTA input)
    # ------------------------------------------------------------------
    st.divider()
    st.subheader("🧪 Protein Predictor")
    st.markdown(
        "Predict protein annotations and structure from amino acid FASTA "
        "and explore docking using the existing simulation pipeline."
    )

    st.markdown("#### Protein FASTA Input")
    protein_uploaded_file = st.file_uploader(
        "Upload protein FASTA file",
        type=["fasta", "fa", "fas", "txt"],
        help="Upload a FASTA file containing one or more protein sequences",
        key="protein_predictor_file_uploader",
    )

    st.markdown("**OR** paste protein FASTA content directly:")
    protein_fasta_text = st.text_area(
        "Protein FASTA Content",
        height=180,
        help="Paste amino acid FASTA formatted sequences here",
        key="protein_predictor_fasta_text",
    )

    protein_fasta_content = None
    if protein_uploaded_file is not None:
        protein_fasta_content = _decode_fasta_upload(protein_uploaded_file.getvalue())
        st.success(f"✅ Protein FASTA file uploaded: {protein_uploaded_file.name}")
    elif protein_fasta_text.strip():
        protein_fasta_content = protein_fasta_text

    # NOTE(review): this module defines `_render_protein_predictor` twice; the
    # name resolves to whichever definition was executed last at import time.
    _render_protein_predictor(protein_fasta_content or "")

    # Example FASTA
    with st.expander("📝 Example FASTA Format"):
        st.code(
            """>sequence1
ATGCGATCGATCGATCGATCG
>sequence2
ATGCGATCGATCGATCGATCG
>sequence3
ATGCGATCGATCGATCGATCG
""",
            language="text",
        )
Please check your input.") return st.success(f"✅ Parsed {len(sequences)} sequence(s) from FASTA") # Use the first sequence for analysis seq_record = sequences[0] protein_sequence = seq_record["sequence"] protein_name = seq_record.get("id", "Predicted Protein") st.markdown(f"**Protein:** {protein_name} ({len(protein_sequence)} aa)") except DataValidationError as e: logger.info( f"FASTA validation error: {e.internal_message}", extra=create_log_context("fasta_parsing", **e.log_details), ) st.error(f"⚠️ {e.user_message}") return except Exception as e: logger.exception( "Unexpected error parsing FASTA", extra=create_log_context("fasta_parsing", error_type=type(e).__name__), ) st.error( "❌ Could not parse FASTA file. Please check the format and try again, or contact support." ) return # ------------------------------------------------------------------ # MOLECULAR DOCKING SECTION (replica of Protein Analysis tab) # ------------------------------------------------------------------ st.divider() st.subheader("💊 Molecular Docking Analysis") st.markdown("Predict ligand-protein binding using structure derived from your FASTA sequence.") st.info(""" **About Molecular Docking:** - Predicts how small molecules (ligands/drugs) bind to proteins - Uses AutoDock Vina algorithm for binding affinity calculation - Negative values indicate favorable binding (more negative = stronger binding) - Typical drug-like binding: -7 to -12 kcal/mol - 3D visualization of ligand orientation and binding prediction """) # Simulate protein structure preparation from FASTA protein_prep = { "available": True, "structure_type": "Predicted (from FASTA)", "structure_id": protein_name, "sequence_length": len(protein_sequence), "pdb_text": _generate_mock_pdb_from_sequence(protein_sequence, protein_name), "pdb_url": "", } if not protein_prep.get("available"): st.error("❌ Unable to prepare protein structure for docking.") return st.success( f"✅ Protein prepared: {protein_prep['structure_type']} - 
{protein_prep['sequence_length']} residues" ) # Docking interface st.markdown("#### Ligand Input & Docking Parameters") docking_col1, docking_col2 = st.columns(2) with docking_col1: st.markdown("**Select Ligand Source:**") ligand_source = st.radio( "Choose ligand source", ["Enter SMILES manually", "Custom compound (PubChem)", "Upload SMILES/SDF"], horizontal=False, key="seq_analysis_ligand_source", label_visibility="collapsed", ) selected_ligand = None ligand_name = None if ligand_source == "Enter SMILES manually": smiles_input = st.text_input( "Enter SMILES string:", placeholder="e.g., CC(=O)Oc1ccccc1C(=O)O (Aspirin)", key="seq_analysis_smiles_input", ) if smiles_input: ligand_name = "Custom_SMILES" selected_ligand = { "smiles": smiles_input, "name": ligand_name, "molecular_weight": 200, } elif ligand_source == "Custom compound (PubChem)": compound_name = st.text_input( "Enter compound name:", placeholder="e.g., Aspirin, Ibuprofen, Caffeine", key="seq_analysis_pubchem_input", ) if compound_name and st.button("🔍 Search PubChem", key="seq_analysis_pubchem_search"): with st.spinner("Searching PubChem..."): pubchem_data = cached_fetch_pubchem_structure( compound_name, st.session_state.api_client ) if pubchem_data.get("available"): st.success(f"✅ Found: {compound_name} (CID: {pubchem_data['cid']})") st.image(pubchem_data["image_url"], width=200) st.session_state.seq_analysis_custom_ligand = pubchem_data selected_ligand = pubchem_data ligand_name = compound_name else: st.error(f"❌ Compound '{compound_name}' not found in PubChem") if "seq_analysis_custom_ligand" in st.session_state: selected_ligand = st.session_state.seq_analysis_custom_ligand ligand_name = compound_name else: # Upload SMILES/SDF st.info("Upload SMILES or SDF file support would be added here") with docking_col2: st.markdown("**Docking Parameters:**") exhaustiveness = st.slider( "Exhaustiveness", 1, 16, 8, help="Higher = more thorough but slower", key="seq_analysis_exhaustiveness", ) num_modes = 
st.slider( "Number of modes", 1, 20, 9, help="Number of binding poses to generate", key="seq_analysis_num_modes", ) energy_range = st.slider( "Energy range (kcal/mol)", 1, 5, 3, key="seq_analysis_energy_range" ) site_config = _render_docking_site_controls("seq_analysis") st.markdown("---") # Run docking button if selected_ligand: if st.button( "🚀 Run Molecular Docking", type="primary", key="seq_analysis_run_docking", width="stretch", ): spinner_message = ( "🧬 Submitting docking job to docking worker..." if _get_docking_mode_value() == "real" else "🧬 Running AutoDock Vina simulation... Calculating 3D orientation..." ) with st.spinner(spinner_message): docking_results = _run_shared_docking( protein_prep=protein_prep, selected_ligand=selected_ligand, ligand_name=ligand_name, protein_length=protein_prep["sequence_length"], exhaustiveness=exhaustiveness, num_modes=num_modes, energy_range=energy_range, **site_config, ) # Store results in session state for display st.session_state.seq_analysis_docking_results = docking_results st.session_state.seq_analysis_docked_ligand_name = ligand_name st.session_state.seq_analysis_docked_ligand_data = selected_ligand st.session_state.seq_analysis_protein_structure = protein_prep st.rerun() else: st.info("👆 Please select or enter a ligand above to proceed with docking") # Display docking results (if available) if "seq_analysis_docking_results" in st.session_state: results = _refresh_real_docking_result_if_needed( st.session_state.seq_analysis_docking_results ) st.session_state.seq_analysis_docking_results = results if results.get("available"): _render_docking_pocket_summary(results, "seq_analysis") ligand_name_display = st.session_state.get("seq_analysis_docked_ligand_name", "Unknown") ligand_data = st.session_state.get("seq_analysis_docked_ligand_data", {}) protein_structure = st.session_state.get("seq_analysis_protein_structure", {}) st.divider() st.subheader("📊 Docking Results") if results.get("simulated"): st.warning( "⚠️ **Note:** 
These are simulated results for demonstration. Production version would use actual AutoDock Vina calculations." ) st.markdown(f"### Results for: **{ligand_name_display}**") # Best binding affinity best_affinity = _validated_docking_affinity(results) strength, color = _docking_strength(best_affinity) affinity_text = f"{best_affinity:.3f} kcal/mol" if best_affinity is not None else "N/A" best_mode = results.get("best_mode", {}) if results.get("status") in {"queued", "running"} and not results.get("simulated"): job_id = results.get('job_id') job_status = results.get('status') col_status, col_refresh = st.columns([3, 1]) with col_status: st.info( f"⏳ Real docking job {job_id} is {job_status}.\n\n" f"Refreshing automatically every 5 seconds..." ) if results.get("fallback_reason"): st.warning(results.get("fallback_reason")) with col_refresh: if st.button("🔄 Refresh Now", key="manual_refresh_ligand_binding"): st.rerun() try: refresh_secs = 5 st.markdown( "", unsafe_allow_html=True, ) except Exception: pass st.markdown("**Status:** Waiting for worker to complete docking calculations...") elif results.get("status") == "completed" and not results.get("simulated"): st.success(f"✅ Real docking completed! Job {results.get('job_id')} result loaded.") elif results.get("status") == "failed" and not results.get("simulated"): failure_reason = ( results.get("error_message") or results.get("fallback_reason") or "Real docking failed." ) st.error(f"❌ Real docking failed: {failure_reason}") col1, col2, col3 = st.columns(3) with col1: st.markdown( f"""

{affinity_text}

Best Binding Affinity

# One-letter amino-acid codes mapped to the three-letter residue names used in
# PDB ATOM records. Any symbol not listed here is written as "UNK".
_MOCK_PDB_RESIDUE_NAMES: Dict[str, str] = {
    "A": "ALA", "R": "ARG", "N": "ASN", "D": "ASP", "C": "CYS",
    "Q": "GLN", "E": "GLU", "G": "GLY", "H": "HIS", "I": "ILE",
    "L": "LEU", "K": "LYS", "M": "MET", "F": "PHE", "P": "PRO",
    "S": "SER", "T": "THR", "W": "TRP", "Y": "TYR", "V": "VAL",
}

# Occupancy 1.00, B-factor 50.00, blank segment columns, element "C"
# (columns 55-78 of an ATOM record).
_MOCK_PDB_ATOM_TAIL = "  1.00 50.00" + " " * 10 + " C"


def _generate_mock_pdb_from_sequence(sequence: str, name: str, max_residues: int = 100) -> str:
    """Generate a mock PDB file from a protein sequence for visualization.

    This is a placeholder, not a structure prediction: it writes one CA atom
    per residue on a regular 3.8 Å grid (10 residues per row, 10 rows per
    layer) starting at (10, 10, 10).

    Args:
        sequence: Amino-acid sequence in one-letter codes. Lookup is
            case-insensitive; unrecognised symbols are written as ``UNK``.
        name: Label written into the REMARK header. Whitespace runs (including
            newlines) are collapsed to single spaces and the result is cut to
            60 characters so it always stays on one record.
        max_residues: Maximum number of residues to emit as ATOM records.
            Defaults to 100; values below zero are treated as zero.

    Returns:
        PDB-formatted text ending with an ``END`` record and a newline. The
        ``SEQUENCE LENGTH`` remark reports the full input length even when the
        ATOM records are truncated by ``max_residues``.
    """
    residues = sequence or ""
    # Collapse whitespace so a multi-line FASTA header cannot split the record.
    label = " ".join(str(name or "").split())[:60]

    records = [
        f"HEADER    {'SEQUENCE DERIVED STRUCTURE':<40}01-JAN-26   PRED",
        "TITLE     PREDICTED STRUCTURE FROM FASTA SEQUENCE",
        "REMARK   1 REFERENCE 1",
        "REMARK   1  AUTH   OMNIBIMOL SEQUENCE ANALYSIS SUITE",
        f"REMARK   1 FASTA INPUT: {label}",
        f"REMARK   2 SEQUENCE LENGTH: {len(residues)} RESIDUES",
        "REMARK   3 STRUCTURE GENERATED FOR DOCKING VISUALIZATION",
        "REMARK  99 THIS IS A MOCK STRUCTURE FOR DEMONSTRATION PURPOSES",
    ]

    # Simple CA trace laid out on a grid.
    for i, aa in enumerate(residues[: max(0, max_residues)]):
        serial = i + 1
        res_name = _MOCK_PDB_RESIDUE_NAMES.get(aa.upper(), "UNK")
        x = 10.0 + (i % 10) * 3.8
        y = 10.0 + ((i // 10) % 10) * 3.8
        z = 10.0 + ((i // 100) % 10) * 3.8
        # Fixed-column ATOM record: serial in cols 7-11, atom name 13-16,
        # residue name 18-20, chain 22, residue number 23-26, xyz 31-54.
        records.append(
            f"ATOM  {serial:5d}  CA  {res_name} A{serial:4d}    "
            f"{x:8.3f}{y:8.3f}{z:8.3f}{_MOCK_PDB_ATOM_TAIL}"
        )

    records.append("END")
    return "\n".join(records) + "\n"
) for seq_id, insight in results["sequence_insights"].items(): with st.expander(f"Specific findings for {seq_id}", expanded=True): for point in insight.get("summary_points", []): st.markdown(f"- {point}") metric_rows = [] for group_name in ("common", "metrics"): for metric, value in insight.get(group_name, {}).items(): if isinstance(value, (dict, list)): continue metric_rows.append( { "Category": "Common" if group_name == "common" else "Sequence type", "Metric": metric.replace("_", " ").title(), "Value": value, } ) if metric_rows: st.dataframe(pd.DataFrame(metric_rows), width="stretch", hide_index=True) features = insight.get("features", []) if features: feature_df = pd.DataFrame( [ { "Feature": feature.get("feature", "Feature"), "Location": feature.get("location", "N/A"), "Detail": feature.get("detail", ""), } for feature in features ] ) st.markdown("**Detected sequence features**") st.dataframe(feature_df, width="stretch", hide_index=True) composition = insight.get("composition", {}) if composition: composition_df = pd.DataFrame( [ {"Symbol": symbol, "Count": count} for symbol, count in composition.items() if count ] ) if not composition_df.empty: st.markdown("**Composition**") st.dataframe(composition_df, width="stretch", hide_index=True) # Multiple Sequence Alignment if results.get("alignment"): st.markdown("### Multiple Sequence Alignment") align_data = results["alignment"] metadata = align_data.get("metadata", {}) # Display statistics col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Alignment Length", metadata.get("alignment_length", "N/A")) with col2: st.metric("Sequences", metadata.get("num_sequences", "N/A")) with col3: st.metric("Conserved Positions", metadata.get("conserved_positions", "N/A")) with col4: st.metric("Conservation", f"{metadata.get('conservation_percentage', 0):.1f}%") # Display aligned sequences with st.expander("View Aligned Sequences"): aligned_seqs = align_data.get("aligned_sequences", []) for seq in aligned_seqs: 
st.text(f">{seq['id']}") # Display in chunks for readability sequence = seq["sequence"] chunk_size = 80 for i in range(0, len(sequence), chunk_size): st.text(sequence[i : i + chunk_size]) # Conservation Analysis if results.get("conservation"): st.markdown("### Conservation Analysis") cons_data = results["conservation"] # Statistics col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Mean Conservation", f"{cons_data.get('mean_conservation', 0):.4f}") with col2: st.metric("Std Deviation", f"{cons_data.get('std_conservation', 0):.4f}") with col3: st.metric("Min Conservation", f"{cons_data.get('min_conservation', 0):.4f}") with col4: st.metric("Max Conservation", f"{cons_data.get('max_conservation', 0):.4f}") # Conservation plot scores = [pos["score"] for pos in cons_data.get("scores", [])] positions = [pos["position"] for pos in cons_data.get("scores", [])] if scores and positions: fig = go.Figure() fig.add_trace( go.Scatter( x=positions, y=scores, mode="lines", name="Conservation Score", line=dict(color="#1f77b4", width=2), fill="tozeroy", fillcolor="rgba(31, 119, 180, 0.2)", ) ) fig.update_layout( title="Conservation Score Across Alignment", xaxis_title="Position", yaxis_title="Conservation Score (1.0 = fully conserved)", height=400, hovermode="x unified", ) st.plotly_chart(fig, width="stretch") # Highly conserved positions highly_cons = cons_data.get("highly_conserved_positions", []) if highly_cons: st.info( f"🔍 Highly conserved positions (>90th percentile): {', '.join(map(str, highly_cons[:20]))}{'...' 
if len(highly_cons) > 20 else ''}" ) # Domain Identification if results.get("domains"): st.markdown("### Domain Identification") domains = results["domains"] domain_count = sum(len(d) for d in domains.values()) if domain_count > 0: st.metric("Total Domains Found", domain_count) # Display domains per sequence for seq_id, domain_list in domains.items(): if domain_list: with st.expander(f"Domains in {seq_id}"): domain_df = pd.DataFrame(domain_list) st.dataframe(domain_df, width="stretch") # Visualize domain positions if domain_list: fig = go.Figure() colors = px.colors.qualitative.Set3 for i, domain in enumerate(domain_list): fig.add_trace( go.Scatter( x=[domain["start"], domain["end"]], y=[seq_id] * 2, mode="lines+markers", name=domain["domain_name"], line=dict(width=10, color=colors[i % len(colors)]), marker=dict(size=10), ) ) fig.update_layout( title=f"Domain Positions in {seq_id}", xaxis_title="Position", yaxis_title="Sequence", height=300, showlegend=True, ) st.plotly_chart(fig, width="stretch") else: st.info("No domains identified in the sequences.") # Motif Finding if results.get("motifs"): st.markdown("### Motif Analysis") motifs_data = results["motifs"] num_motifs = motifs_data.get("num_motifs", 0) st.metric("Motifs Found", num_motifs) st.caption(f"Method: {motifs_data.get('method', 'N/A')}") motifs_list = motifs_data.get("motifs", []) if motifs_list: # Display top motifs with st.expander("View Motifs"): motif_df_data = [] for motif in motifs_list[:20]: # Top 20 motif_df_data.append( { "Motif": motif.get("motif", "N/A"), "Length": motif.get("length", "N/A"), "Frequency": motif.get("frequency", "N/A"), "Conservation": f"{motif.get('conservation', 0) * 100:.1f}%" if "conservation" in motif else "N/A", "Sequences": len(motif.get("sequences", [])), } ) if motif_df_data: motif_df = pd.DataFrame(motif_df_data) st.dataframe(motif_df, width="stretch") else: st.info("No motifs found in the sequences.") # Phylogenetic Tree if results.get("phylogenetic_tree"): 
st.markdown("### Phylogenetic Tree") tree_data = results["phylogenetic_tree"] metadata = tree_data.get("metadata", {}) # Display metadata and scientific provenance col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Method", metadata.get("method", "N/A").upper()) with col2: st.metric("Taxa", metadata.get("num_taxa", "N/A")) with col3: st.metric("Tree Length", f"{metadata.get('tree_length', 0):.4f}") with col4: st.metric( "Distance", str(metadata.get("distance_method", "N/A")).replace("_", " ").title(), ) st.markdown("---") newick = tree_data.get("newick", "") try: tree_png = ProteinVisualizer.render_phylogenetic_tree( newick, metadata.get("num_taxa", 2) ) st.image(tree_png, caption="Phylogenetic tree", width="stretch") except ValueError as exc: st.warning(f"Tree image rendering failed: {exc}") if not metadata.get("alignment_used", True): kmer_size = metadata.get("kmer_size") st.info( "This is an alignment-free tree based on full-sequence k-mer Jaccard " f"distances{f' (k={kmer_size})' if kmer_size else ''}." 
def _normalize_protein_predictor_sequence(sequence: str) -> str:
    """Return *sequence* upper-cased with all whitespace removed.

    Falsy input (``None``, ``""``) yields an empty string. The result is the
    canonical form used for lookup, prediction, and cache identity.
    """
    raw_text = str(sequence or "")
    compact = "".join(raw_text.split())
    return compact.upper()


def _protein_predictor_sequence_key(sequence: str) -> str:
    """Return a stable 24-hex-character key for the full normalized sequence.

    The key is the leading part of the SHA-256 digest of the normalized
    sequence encoded as UTF-8.
    """
    canonical = _normalize_protein_predictor_sequence(sequence)
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return digest[:24]


def _build_protein_predictor_sequence_options(
    protein_seqs: List[Any],
) -> List[Dict[str, Any]]:
    """Build selector options, keeping same-name/same-length sequences distinct.

    Each option is a dict with ``index`` (1-based position), ``label``,
    ``sequence`` (normalized), ``sequence_key`` and ``source_id``. The label is
    ``"<id> (<n> aa)"``; when several records share that label, each gets a
    running ``#k`` suffix plus the first eight characters of its sequence key.
    """
    # First pass: count how often each plain label occurs.
    tally: Dict[str, int] = {}
    for record in protein_seqs:
        cleaned = _normalize_protein_predictor_sequence(record.sequence)
        plain = f"{record.id} ({len(cleaned)} aa)"
        tally[plain] = tally.get(plain, 0) + 1

    # Second pass: emit options, disambiguating labels that collide.
    occurrence: Dict[str, int] = {}
    entries: List[Dict[str, Any]] = []
    for position, record in enumerate(protein_seqs, start=1):
        cleaned = _normalize_protein_predictor_sequence(record.sequence)
        digest = _protein_predictor_sequence_key(cleaned)
        plain = f"{record.id} ({len(cleaned)} aa)"

        if tally.get(plain, 0) <= 1:
            shown = plain
        else:
            occurrence[plain] = occurrence.get(plain, 0) + 1
            shown = f"{plain} #{occurrence[plain]} [{digest[:8]}]"

        entries.append(
            {
                "index": position,
                "label": shown,
                "sequence": cleaned,
                "sequence_key": digest,
                "source_id": record.id,
            }
        )
    return entries
) return # Sequence selector sequence_options = _build_protein_predictor_sequence_options(protein_seqs) selected_option_index = st.selectbox( "Select protein sequence for prediction", range(len(sequence_options)), format_func=lambda option_index: sequence_options[option_index]["label"], key="protein_predictor_seq_select_v2", ) selected_option = sequence_options[selected_option_index] selected_seq = selected_option["sequence"] seq_key = selected_option["sequence_key"] st.caption(f"Using sequence length: {len(selected_seq)} amino acids") # Create tabs for organized protein prediction analysis predictor_tabs = st.tabs( ["🔍 Protein Name (NCBI Lookup)", "🧠 Protein Structure Prediction", "🧪 Molecular Docking"] ) # ---------------------- # Tab 1: Protein Name (NCBI) # ---------------------- with predictor_tabs[0]: st.subheader("Protein Name Identification") st.info(""" **About NCBI Protein Lookup:** - 🔬 Uses BLASTp against curated protein databases - 🧬 Identifies known protein matches and annotations - 🏆 Returns best match with identity and coverage metrics - 💾 Results cached per sequence """) col_ncbi_btn, col_ncbi_status = st.columns([1, 2]) with col_ncbi_btn: lookup_clicked = st.button( "🔎 Search NCBI for known protein", key=f"protein_predictor_ncbi_btn_{seq_key}", help="Run BLASTp against curated protein databases to find known proteins", type="primary", ) with col_ncbi_status: st.caption("BLASTp search with short polling. Results are cached per sequence.") if lookup_clicked: st.session_state[f"protein_predictor_ncbi_pending_{seq_key}"] = True ncbi_result_key = f"protein_predictor_ncbi_result_{seq_key}" if ( st.session_state.get(f"protein_predictor_ncbi_pending_{seq_key}") and ncbi_result_key not in st.session_state ): # Trigger lookup only once per sequence if "api_client" in st.session_state: if not hasattr(st.session_state.api_client, "search_protein_ncbi"): st.error( "NCBI protein search is not available. 
Please make sure your " "`api_client.py` includes the `search_protein_ncbi` method and " "restart the Streamlit app." ) else: # Create a progress container for better user feedback progress_container = st.empty() status_container = st.empty() with progress_container.container(): progress_bar = st.progress(0) status_text = st.empty() status_text.text("🔍 Submitting BLAST query to NCBI...") try: # Use asyncio to run the search import time start_time = time.time() # Update progress simulation (since we can't get real-time updates from async) status_text.text("⏳ Searching NCBI database (typically 5-20 seconds)...") progress_bar.progress(20) ncbi_result = run_async_safe( st.session_state.api_client.search_protein_ncbi(selected_seq) ) elapsed = time.time() - start_time progress_bar.progress(100) status_text.text(f"✅ Search completed in {elapsed:.1f} seconds") st.session_state[ncbi_result_key] = ncbi_result # Clear progress indicators after a brief display time.sleep(1) progress_container.empty() status_container.empty() except ExternalServiceError as e: progress_container.empty() logger.warning( f"NCBI search service error: {e.internal_message}", extra=create_log_context( "ncbi_search", sequence_length=len(selected_seq), **e.log_details ), ) st.error(f"⚠️ {e.user_message}") except Exception as e: progress_container.empty() logger.exception( "Unexpected error during NCBI search", extra=create_log_context( "ncbi_search", sequence_length=len(selected_seq), error_type=type(e).__name__, ), ) st.error( "❌ NCBI search failed. This is often due to temporary service issues. Please try again in a few moments." ) else: app_env = get_environment() if app_env.is_development(): st.error("API client not available in session state; cannot contact NCBI.") else: st.error("Unable to perform search. 
Please refresh and try again.") logger.error("API client not available in session state for NCBI search", extra=create_log_context("api_client_missing")) ncbi_result = st.session_state.get(ncbi_result_key) if ncbi_result: if ncbi_result.get("available") and ncbi_result.get("match_found"): st.success("✅ Protein identified in NCBI database") st.markdown("---") # Display protein information with full text using native Streamlit components st.subheader("🔬 Protein Information") protein_name = ncbi_result.get("protein_name", "N/A") accession_id = ncbi_result.get("accession_id", "N/A") organism = ncbi_result.get("organism", "N/A") # Use container with background color with st.container(): st.markdown("**🧬 Protein Name:**") st.info(protein_name) st.markdown("**🔑 Accession ID:**") st.text(accession_id) st.markdown("**🦠 Organism:**") st.text(organism) st.markdown("---") st.subheader("📊 Alignment Metrics") # Alignment metrics - these are fine with st.metric as they're short col1, col2, col3 = st.columns(3) with col1: identity = ncbi_result.get("identity_percent", 0) st.metric("Identity", f"{identity:.2f}%") with col2: coverage = ncbi_result.get("coverage_percent", 0) st.metric("Coverage", f"{coverage:.2f}%") with col3: evalue = ncbi_result.get("e_value", 1.0) st.metric("E-value", f"{evalue:.2g}") if ncbi_result.get("ncbi_url"): st.markdown( f"🔗 [View detailed information in NCBI Protein Database]({ncbi_result['ncbi_url']})" ) elif ncbi_result.get("available") and not ncbi_result.get("match_found"): st.info("🔬 Protein name not found (novel or unannotated sequence)") else: st.warning(ncbi_result.get("error", "NCBI lookup unavailable.")) # ---------------------- # Tab 2: Protein Structure Prediction # ---------------------- with predictor_tabs[1]: st.subheader("3D Structure Prediction") st.info(""" **About Structure Prediction:** - 🧱 Uses ESMFold API for accurate structure prediction - 🎯 No local models or GPU required - 📊 Provides confidence scores (pLDDT) - 🔬 Interactive 3D 
visualization - 💾 Results cached per sequence """) col_struct_btn, col_struct_status = st.columns([1, 2]) with col_struct_btn: predict_clicked = st.button( "🧠 Predict 3D Structure (ESMFold)", key=f"protein_predictor_structure_btn_{seq_key}", type="primary", ) with col_struct_status: st.caption("Remote ESMFold API - no local models or GPU required.") struct_result_key = f"protein_predictor_structure_result_{seq_key}" if predict_clicked: st.session_state[f"protein_predictor_structure_pending_{seq_key}"] = True st.session_state.pop(struct_result_key, None) st.session_state.pop(f"protein_structure_for_docking_{seq_key}", None) if ( st.session_state.get(f"protein_predictor_structure_pending_{seq_key}") and struct_result_key not in st.session_state ): if "api_client" in st.session_state: with st.spinner("🧱 Predicting protein structure..."): struct_result = run_async_safe( st.session_state.api_client.predict_structure(selected_seq) ) st.session_state[struct_result_key] = struct_result else: st.error( "API client not available in session state; cannot run structure prediction." ) struct_result = st.session_state.get(struct_result_key) protein_structure_for_docking = None if struct_result: if struct_result.get("available"): avg_plddt = struct_result.get("avg_plddt") is_approximate = bool(struct_result.get("is_approximate")) if is_approximate: st.warning( struct_result.get( "warning", "ESMFold is temporarily unavailable. 
Showing an approximate CA trace for visualization only.", ) ) st.caption("Click the prediction button again later to retry ESMFold.") elif avg_plddt is not None: st.success(f"✅ Structure predicted successfully") # Display confidence metric col1, col2 = st.columns(2) with col1: st.metric("Average pLDDT Score", f"{avg_plddt:.1f}") with col2: confidence_level = ( "High" if avg_plddt > 80 else "Medium" if avg_plddt > 60 else "Low" ) st.metric("Confidence Level", confidence_level) else: st.success("✅ Structure predicted (confidence scores not provided)") st.markdown("---") pdb_text = struct_result.get("pdb", "") # Download PDB st.download_button( "📥 Download Predicted PDB File", pdb_text, file_name=( "approximate_sequence_trace.pdb" if is_approximate else "predicted_structure_esmfold.pdb" ), mime="chemical/x-pdb", key=f"download_predicted_pdb_{seq_key}", ) st.markdown("---") st.markdown( "**Approximate Structure Visualization**" if is_approximate else "**3D Structure Visualization**" ) if pdb_text: html_view = ProteinVisualizer.create_molstar_structure_viewer( { "available": True, "structure_type": "approximate" if is_approximate else "predicted", "structure_id": ( "APPROXIMATE_TRACE" if is_approximate else "ESMFOLD" ), "pdb_text": pdb_text, "pdb_url": "", }, "pdb", ) st.components.v1.html(html_view, height=580) # Prepare structure object for downstream docking (store PDB text directly) if pdb_text: protein_structure_for_docking = { "available": True, "structure_type": "approximate" if is_approximate else "predicted", "structure_id": ( "APPROXIMATE_TRACE" if is_approximate else "ESMFOLD" ), "pdb_text": pdb_text, # Store PDB text directly instead of data URI "pdb_url": "", # Empty URL to ensure we use pdb_text "is_approximate": is_approximate, "source": struct_result.get("source"), "warning": struct_result.get("warning"), } # Store for use in docking tab st.session_state[f"protein_structure_for_docking_{seq_key}"] = ( protein_structure_for_docking ) else: 
st.warning(struct_result.get("error", "Structure prediction unavailable.")) # ---------------------- # Tab 3: Molecular Docking # ---------------------- with predictor_tabs[2]: st.subheader("Molecular Docking Simulation") st.info(""" **About Molecular Docking:** - 🧪 Simulates protein-ligand interactions - 🎯 Uses the app's existing docking pipeline - 📊 Provides binding affinity predictions - 🔬 Interactive 3D docking visualization - 💡 Example ligand used only when no named ligand is provided """) col1, col2 = st.columns([2, 1]) with col1: default_mw = st.number_input( "Approximate ligand molecular weight (Da)", min_value=50.0, max_value=1000.0, value=300.0, step=10.0, key=f"protein_predictor_ligand_mw_{seq_key}", ) with col2: st.caption("") st.caption("") run_docking = st.button( "🚀 Run Docking", key=f"protein_predictor_run_docking_{seq_key}", type="primary", width="stretch", ) site_config = _render_docking_site_controls(f"protein_predictor_{seq_key}") docking_result_key = f"protein_predictor_docking_result_{seq_key}" if run_docking: if "api_client" not in st.session_state: st.error( "API client not available in session state; cannot run docking simulation." ) else: selected_ligand = { "name": "Example ligand (CC)", "smiles": "CC", "molecular_weight": float(default_mw), } protein_prep = protein_structure_for_docking or { "available": True, "structure_type": "predicted", "structure_id": f"ESMFOLD-{seq_key}", "sequence_length": len(selected_seq), "pdb_url": "", "pdb_text": protein_structure_for_docking.get("pdb_text", "") if protein_structure_for_docking else "", } with st.spinner( "🧪 Submitting molecular docking request..." if _get_docking_mode_value() == "real" else "🧪 Simulating molecular docking..." 
): docking_results = _run_shared_docking( protein_prep=protein_prep, selected_ligand=selected_ligand, ligand_name=selected_ligand["name"], protein_length=len(selected_seq), exhaustiveness=8, num_modes=9, energy_range=3, **site_config, ) st.session_state[docking_result_key] = docking_results docking_results = st.session_state.get(docking_result_key) if docking_results: docking_results = _refresh_real_docking_result_if_needed(docking_results) st.session_state[docking_result_key] = docking_results if docking_results.get("available"): _render_docking_pocket_summary( docking_results, f"protein_predictor_{seq_key}" ) if docking_results: if docking_results.get("available"): st.success("✅ Docking simulation completed") elif docking_results.get("status") == "failed" and not docking_results.get("simulated"): failure_reason = ( docking_results.get("error_message") or docking_results.get("fallback_reason") or "Real docking failed." ) st.error(f"Real docking failed: {failure_reason}") elif docking_results.get("status") in {"queued", "running"} and not docking_results.get("simulated"): job_id = docking_results.get('job_id') job_status = docking_results.get('status') col_status, col_refresh = st.columns([3, 1]) with col_status: st.info( f"⏳ Real docking job {job_id} is {job_status}.\n\n" f"Refreshing automatically every 5 seconds..." 
) if docking_results.get("fallback_reason"): st.warning(docking_results.get("fallback_reason")) with col_refresh: if st.button("🔄 Refresh Now", key="manual_refresh_seq_docking_alt"): st.rerun() try: refresh_secs = 5 st.markdown( "", unsafe_allow_html=True, ) except Exception: pass st.markdown("**Status:** Waiting for worker to complete docking calculations...") if docking_results and docking_results.get("available"): st.markdown("---") st.markdown("**Docking Scores & Binding Affinity**") # Reuse existing docking results chart fig = ProteinVisualizer.create_docking_results_chart(docking_results) st.plotly_chart(fig, width="stretch") st.markdown("**Binding Mode Details**") _render_docking_mode_details(docking_results) st.markdown("---") # Retrieve structure from previous tab if available protein_structure_for_docking = st.session_state.get( f"protein_structure_for_docking_{seq_key}" ) # Reuse existing 3D docking viewer if we have a predicted structure if protein_structure_for_docking: # Validate that we have actual PDB data pdb_text = protein_structure_for_docking.get("pdb_text", "") pdb_url = protein_structure_for_docking.get("pdb_url", "") if pdb_text or pdb_url: st.markdown("**3D Docking Visualization**") if protein_structure_for_docking.get("is_approximate"): st.warning( protein_structure_for_docking.get( "warning", "This visualization uses an approximate CA trace because ESMFold is temporarily unavailable.", ) ) ligand_name = docking_results.get("ligand_name") or "Ligand pose unavailable" ligand_data = { "name": ligand_name, "smiles": "", } try: viewer_html = ProteinVisualizer.create_docking_3d_viewer( protein_structure_for_docking, ligand_data, docking_results, ligand_name=ligand_name, ) st.components.v1.html(viewer_html, height=650) except Exception as e: logger.exception( "Error creating 3D visualization", extra=create_log_context( "docking_3d_visualization", error_type=type(e).__name__ ), ) st.error( "⚠️ Could not render 3D visualization. 
Try predicting the structure again." ) st.info( "💡 Make sure the protein structure prediction completed successfully in the previous tab." ) else: st.warning("⚠️ Protein structure data is incomplete.") st.info( "💡 Please predict the protein structure in the **Protein Structure Prediction** tab first." ) else: st.info( "💡 **To enable 3D docking visualization:**\n\n1. Go to the **Protein Structure Prediction** tab\n2. Click **Predict 3D Structure (ESMFold)**\n3. Wait for the prediction to complete\n4. Return to this tab to view the docking visualization" ) # ============================================================================= # WHOLE GENOME SEQUENCING PAGE # ============================================================================= def render_whole_genome_sequencing_page(): """ Render the Whole Genome Sequencing page with sequence-driven genomic risk-signal analysis, biomarker detection, and personalized research-based recommendations. """ st.title("🧬 Whole Genome Sequencing Analysis") # Critical disclaimers st.markdown( """
# Symbols kept when cleaning genome input: DNA/RNA bases, N (unknown base)
# and "-" (alignment gap).
_GENOME_SEQUENCE_SYMBOLS = frozenset("ATCGUN-")


def _extract_nucleotide_sequence(raw_text: str) -> str:
    """Return the cleaned, upper-cased nucleotide string contained in *raw_text*.

    Every line whose first non-blank character is ">" is treated as a FASTA
    header and skipped, so multi-record input and headers preceded by stray
    whitespace do not leak header letters into the sequence. All characters
    outside ``_GENOME_SEQUENCE_SYMBOLS`` (whitespace, digits, other letters)
    are removed.
    """
    residue_lines = (
        line for line in raw_text.splitlines() if not line.lstrip().startswith(">")
    )
    return "".join(
        symbol
        for symbol in "".join(residue_lines).upper()
        if symbol in _GENOME_SEQUENCE_SYMBOLS
    )


def _submit_genome_backend_job(payload: Dict[str, Any], user_metadata: Dict[str, Any]) -> None:
    """Submit *payload* through the session API client and rerun on success.

    A missing client or a failed submission is reported with ``st.error`` and
    logged; in that case nothing is stored and no rerun is triggered.
    """
    api_client = st.session_state.get("api_client")
    if api_client is None:
        logger.error(
            "API client not available in session state for genome analysis",
            extra=create_log_context("api_client_missing"),
        )
        st.error("Unable to submit the genome analysis job. Please refresh and try again.")
        return

    try:
        job = api_client.submit_genome_analysis_job(payload=payload)
    except Exception as exc:
        logger.exception(
            "Genome analysis job submission failed",
            extra=create_log_context("genome_job_submit", error_type=type(exc).__name__),
        )
        st.error("Unable to submit the genome analysis job. Please try again in a few moments.")
        return

    st.session_state.genome_analysis_job = job
    st.session_state.user_metadata = user_metadata
    st.session_state.show_genome_results = False
    st.success(f"Genome analysis job submitted: #{job.get('id')}")
    st.rerun()


def _render_genome_job_panel() -> None:
    """Show the stored backend job (if any) and handle the manual refresh button."""
    genome_job = st.session_state.get("genome_analysis_job")
    if not genome_job:
        return

    st.info(f"Backend genome job #{genome_job.get('id')}: {genome_job.get('status')}")
    # Rendered from the stored job on every run: an error emitted only inside
    # the button handler would be wiped by the rerun that follows it.
    if genome_job.get("status") == "failed":
        st.error(genome_job.get("error_message") or "Genome analysis job failed")

    if not st.button("Refresh Genome Job", key="refresh_genome_analysis_job"):
        return

    api_client = st.session_state.get("api_client")
    if api_client is None:
        logger.error(
            "API client not available in session state for genome job refresh",
            extra=create_log_context("api_client_missing"),
        )
        st.error("Unable to refresh the genome analysis job. Please refresh and try again.")
        return

    try:
        refreshed = api_client.poll_genome_analysis_job(int(genome_job["id"]))
    except Exception as exc:
        logger.exception(
            "Genome analysis job refresh failed",
            extra=create_log_context("genome_job_refresh", error_type=type(exc).__name__),
        )
        st.error("Unable to refresh the genome analysis job. Please try again in a few moments.")
        return

    st.session_state.genome_analysis_job = refreshed
    if refreshed.get("status") == "completed":
        st.session_state.genome_analysis_results = refreshed.get("result_payload") or {}
        st.session_state.show_genome_results = True
    st.rerun()


def render_whole_genome_sequencing_page():
    """
    Render the Whole Genome Sequencing page with sequence-driven genomic risk-signal
    analysis, biomarker detection, and personalized research-based recommendations.

    The page collects a genome input (pasted text, FASTA, VCF, variant CSV or a
    built-in example), optional personal metadata, and an execution target. On
    "Analyze Sequence" it either runs ``GenomeAnalysisEngine`` in-process or
    submits a job through ``st.session_state.api_client``, stores the outcome in
    ``st.session_state`` and reruns the script. Stored results are rendered with
    ``render_genome_analysis_results``.
    """
    st.title("🧬 Whole Genome Sequencing Analysis")

    # Critical disclaimers
    st.markdown(
        """
⚠️ IMPORTANT DISCLAIMER
This tool is for research, educational, and exploratory purposes only. It does NOT provide medical diagnosis or treatment recommendations. All results are based on computational analysis of genomic sequences and should NOT be used for patient-care decisions. Always consult qualified healthcare providers for medical advice.
""",
        unsafe_allow_html=True,
    )

    st.markdown(
        """
This module analyzes uploaded genomic sequences to:
  • Detect mutations: Identify research-linked genomic variants and gene patterns
  • Analyze biomarkers: Scan for research-based biomarker signals and protein signatures
  • Estimate predisposition: Calculate bounded genetic risk-signal scores from sequence evidence
  • Genomic health insights: Generate research-based preventive follow-up suggestions
""",
        unsafe_allow_html=True,
    )

    # Genome and structured variant input
    st.header("📄 Step 1: Input Genome Sequence")
    input_method = st.radio(
        "Choose input method:",
        ["Paste Sequence", "Upload FASTA File", "Upload VCF", "Upload Variant CSV", "Use Example"],
        horizontal=True,
    )

    genome_sequence = ""
    vcf_text = None
    annotated_variants = None
    selected_sample_id = None

    if input_method == "Paste Sequence":
        genome_sequence = st.text_area(
            "Enter genome sequence (FASTA format or raw DNA sequence):",
            height=200,
            placeholder=">Genome_Sample\nATCGATCGATCGATCGATCGATCG...",
            help="Paste your DNA sequence in FASTA format or as raw nucleotides",
        )
    elif input_method == "Upload FASTA File":
        uploaded_file = st.file_uploader(
            "Choose a FASTA file",
            type=["fasta", "fa", "fna", "txt"],
            help="Upload a FASTA file containing the genome sequence",
        )
        if uploaded_file is not None:
            try:
                # utf-8-sig also drops a leading byte-order mark, which would
                # otherwise hide the ">" of the first header line.
                genome_sequence = uploaded_file.read().decode("utf-8-sig")
            except UnicodeDecodeError:
                st.error("Unable to read FASTA file: the file is not valid UTF-8 text.")
            else:
                st.success(f"✅ File uploaded: {uploaded_file.name}")
    elif input_method == "Upload VCF":
        uploaded_file = st.file_uploader(
            "Choose a VCF file",
            type=["vcf", "gz"],
            help="VCF and VCF.GZ are parsed with vcfpy; multi-sample files require one selected sample.",
            key="genome_vcf_upload",
        )
        if uploaded_file is not None:
            try:
                vcf_text = decode_uploaded_vcf_bytes(uploaded_file.name, uploaded_file.read())
                header_line = next(
                    (line for line in vcf_text.splitlines() if line.startswith("#CHROM")), ""
                )
                # Columns after the ninth one of the #CHROM header are read as sample names.
                samples = header_line.split("\t")[9:] if header_line else []
                if samples:
                    selected_sample_id = st.selectbox("VCF sample", samples)
                st.success(f"VCF loaded: {uploaded_file.name}")
            except Exception as exc:
                st.error(f"Unable to read VCF: {exc}")
    elif input_method == "Upload Variant CSV":
        uploaded_file = st.file_uploader(
            "Choose an annotated variant CSV",
            type=["csv"],
            help=(
                "Recommended columns: gene, variant_id, consequence, pathogenicity_score, "
                "genotype, phase_set, allele_fraction, depth, origin, genome_build, sample_id."
            ),
            key="genome_variant_csv_upload",
        )
        if uploaded_file is not None:
            try:
                variant_frame = pd.read_csv(uploaded_file)
                # Replace missing cells with None before converting rows to dicts.
                variant_frame = variant_frame.where(pd.notna(variant_frame), None)
                annotated_variants = variant_frame.to_dict(orient="records")
                csv_samples = sorted(
                    {
                        str(row.get("sample_id"))
                        for row in annotated_variants
                        if row.get("sample_id") not in (None, "")
                    }
                )
                if csv_samples:
                    selected_sample_id = st.selectbox("Variant sample", csv_samples)
                st.success(f"Loaded {len(annotated_variants):,} annotated variants")
            except Exception as exc:
                st.error(f"Unable to read variant CSV: {exc}")
    else:  # Use Example
        st.info("Using example human genome sequence segment")
        # Use example with disease-related genes and an exploratory biomarker label.
        genome_sequence = """>Example_Human_Sequence_ERBB2_HER2_region
ATGATGAATAAAAGAAAAAAAAAATATTGTGAAACAAGATGAGGATGAAAATGAA
AATTGAAAGAAAATAAATGAGAAATTTCAGATAACAAATTTAGGAAGTATAATTAT
ATTTATATTGTATACTGCGATCAACTTAGTAAGTAATGGATGATATAATATAATAA
AGATGAATAAAGAAATGATGATGATATAATAAAGAAAAAGATGATGATGATGAT"""
        st.text_area(
            "Example sequence (ERBB2/HER2 region with biomarkers):",
            value=genome_sequence,
            height=150,
            disabled=True,
        )

    has_genome_input = bool(genome_sequence or vcf_text or annotated_variants)

    if has_genome_input:
        biomarker_sequence = genome_sequence or None
        interpretation_mode = st.selectbox(
            "Interpretation mode",
            ["germline", "somatic"],
            help="Germline mode is phase-aware; somatic mode evaluates sample-local co-mutations.",
        )
        genome_build = st.selectbox("Genome build", ["GRCh38", "GRCh37", "Unknown"])

        # Parse and clean the sequence (empty when only VCF/CSV input was given).
        sequence = _extract_nucleotide_sequence(genome_sequence or "")

        # Validate sequence is not empty
        if genome_sequence and not sequence:
            st.error(
                "❌ No valid DNA sequence found. Please check your input and ensure it contains DNA nucleotides (A, T, C, G)."
            )
            st.stop()

        # Display sequence statistics
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("Sequence Length", f"{len(sequence):,} bp")
        with col2:
            gc_content = (
                (sequence.count("G") + sequence.count("C")) / len(sequence) * 100
                if sequence
                else 0
            )
            st.metric("GC Content", f"{gc_content:.2f}%")
        with col3:
            st.metric("Valid Nucleotides", f"{len(sequence):,}")
        with col4:
            st.metric("Quality", "✅ Ready")

        st.divider()

        # User metadata collection (Step 2)
        st.header("👤 Step 2: Provide Personal Metadata")
        st.markdown("*(Optional but recommended for personalized analysis)*")

        gender_options = ["Unknown", "Male", "Female", "Other"]
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            user_age = st.number_input(
                "Age (years)",
                min_value=18,
                max_value=100,
                value=st.session_state.get("user_age", 50),
                help="Your current age",
            )
        with col2:
            user_gender = st.selectbox(
                "Gender",
                gender_options,
                index=st.session_state.get("gender_index", 0),
            )
        with col3:
            user_weight = st.number_input(
                "Weight (kg)",
                min_value=30.0,
                max_value=200.0,
                value=st.session_state.get("user_weight", 70.0),
                help="Your body weight",
            )
        with col4:
            user_height_cm = st.number_input(
                "Height (cm)",
                min_value=120.0,
                max_value=230.0,
                value=st.session_state.get("user_height_cm", 170.0),
                help="Used to calculate BMI-based recommendation filters.",
            )

        # Store in session state so the widgets keep their values across reruns.
        st.session_state.user_age = user_age
        st.session_state.gender_index = gender_options.index(user_gender)
        st.session_state.user_weight = user_weight
        st.session_state.user_height_cm = user_height_cm

        # Create user metadata for reference
        user_metadata = {
            "age": user_age,
            "gender": user_gender,
            "weight": user_weight,
            "height_cm": user_height_cm,
        }

        st.divider()

        # Analysis button (Step 3)
        execution_target = st.radio(
            "Execution target",
            ["Local analysis", "Backend worker"],
            horizontal=True,
            help="The backend worker uses the same evidence engine and returns a persisted job result.",
        )
        st.header("🔬 Step 3: Run Analysis")

        if st.button(
            "▶️ Analyze Sequence", type="primary", width="stretch", key="analyze_genome_btn"
        ):
            resolved_build = None if genome_build == "Unknown" else genome_build
            with st.spinner("🧬 Running comprehensive genome analysis..."):
                if execution_target == "Backend worker":
                    payload = {
                        "interpretation_mode": interpretation_mode,
                        "sample_id": selected_sample_id,
                        "genome_build": resolved_build,
                        "user_metadata": user_metadata,
                    }
                    if sequence:
                        payload["dna_sequence"] = sequence
                    if vcf_text:
                        payload["vcf_text"] = vcf_text
                    if annotated_variants:
                        payload["annotated_variants"] = annotated_variants
                    _submit_genome_backend_job(payload, user_metadata)
                else:
                    # Initialize genome analysis engine with cache support
                    if "genome_engine" not in st.session_state:
                        st.session_state.genome_engine = GenomeAnalysisEngine(
                            cache_manager=st.session_state.cache_manager
                        )

                    # Run analysis
                    analysis_results = st.session_state.genome_engine.analyze_genome(
                        sequence=sequence,
                        user_metadata=user_metadata,
                        biomarker_sequence=biomarker_sequence,
                        annotated_variants=annotated_variants,
                        vcf_text=vcf_text,
                        interpretation_mode=interpretation_mode,
                        sample_id=selected_sample_id,
                        genome_build=resolved_build,
                    )

                    # Store results
                    st.session_state.genome_analysis_results = analysis_results
                    st.session_state.genome_sequence = sequence
                    st.session_state.user_metadata = user_metadata
                    st.session_state.show_genome_results = True

                    st.success("✅ Analysis complete! Scroll down to view results.")
                    time.sleep(1)  # Keep the confirmation visible briefly before the rerun
                    st.rerun()

    _render_genome_job_panel()

    # Display comprehensive analysis results
    if st.session_state.get("show_genome_results") and st.session_state.get(
        "genome_analysis_results"
    ):
        render_genome_analysis_results(
            st.session_state.genome_analysis_results,
            st.session_state.get("user_metadata") or {},
        )
def render_genome_analysis_results(analysis_results: Dict, user_metadata: Dict):
    """Show the research-only disclaimer and lay the analysis out across six tabs.

    Args:
        analysis_results: Analysis payload passed unchanged to every tab renderer.
        user_metadata: Profile values forwarded to the "Genomic Health Insights" tab.
    """
    st.header("📊 Analysis Results")

    # The disclaimer is emitted before any tab so it precedes every result.
    disclaimer_text = """
⚠️ RESEARCH PURPOSES ONLY
These are predicted genetic risk indicators for research and educational purposes only. Results are computational predictions of variants of interest and research-linked signals, not intended for clinical use; consult a healthcare professional for clinical interpretation.
"""
    st.markdown(disclaimer_text, unsafe_allow_html=True)

    # (tab label, renderer) pairs in display order; only the insights tab
    # needs the user metadata in addition to the results.
    tab_plan = [
        ("📋 Sequence Summary", lambda results: render_sequence_summary(results)),
        ("🧬 Mutation Analysis", lambda results: render_mutation_analysis_results(results)),
        ("🔬 Biomarker Detection", lambda results: render_biomarker_detection_results(results)),
        ("Research-Based Risk Signals", lambda results: render_disease_risk_assessment(results)),
        (
            "Genomic Health Insights",
            lambda results: render_personalized_insights(results, user_metadata),
        ),
        ("Multi-Mutation Biomarkers", lambda results: render_multi_mutation_analysis(results)),
    ]
    tab_handles = st.tabs([label for label, _ in tab_plan])
    for tab_handle, (_, draw_tab) in zip(tab_handles, tab_plan):
        with tab_handle:
            draw_tab(analysis_results)


def render_sequence_summary(analysis_results: Dict):
    """Show length, GC content, valid-nucleotide count and a quality grade.

    Reads ``analysis_results["sequence_analysis"]`` and its ``length``,
    ``gc_content`` and ``valid_nucleotides`` entries.
    """
    st.subheader("📋 Sequence Summary")

    seq_analysis = analysis_results["sequence_analysis"]
    length_col, gc_col, valid_col, quality_col = st.columns(4)

    with length_col:
        st.metric("Sequence Length", f"{seq_analysis['length']:,} bp")
    with gc_col:
        st.metric("GC Content", f"{seq_analysis['gc_content']:.2f}%")
    with valid_col:
        st.metric("Valid Nucleotides", f"{seq_analysis['valid_nucleotides']:,}")
    with quality_col:
        # Share of valid nucleotides; an empty sequence counts as 0%.
        if seq_analysis["length"] > 0:
            quality_pct = seq_analysis["valid_nucleotides"] / seq_analysis["length"] * 100
        else:
            quality_pct = 0

        if quality_pct > 95:
            quality = "Excellent"
        elif quality_pct > 90:
            quality = "Good"
        else:
            quality = "Fair"
        st.metric("Quality", quality)

    if quality_pct > 95:
        quality_note = "high-quality sequence suitable for analysis"
    else:
        quality_note = "acceptable quality for analysis"

    st.info(
        "\n**Sequence Information:**\n"
        f"- Total analyzed: {seq_analysis['length']:,} base pairs\n"
        f"- Quality assessment indicates {quality_note}\n"
    )
def render_mutation_analysis_results(analysis_results: Dict):
    """Render the mutation tab: headline metrics, a variant table and per-variant details.

    Reads ``analysis_results["mutation_analysis"]``; when its
    ``detected_variants`` list is empty only an informational note is shown.
    """
    st.subheader("🧬 Mutation Analysis")
    st.markdown("Detected genomic variants and research-linked genes in your sequence")

    mutation_data = analysis_results["mutation_analysis"]
    variants = mutation_data["detected_variants"]

    # Nothing detected: show the note and skip metrics, table and details.
    if not variants:
        st.info("✅ No known variants of interest detected in this research analysis.")
        return

    total_col, high_risk_col, confidence_col = st.columns(3)
    with total_col:
        st.metric("Total Variants Detected", mutation_data["total_variants"])
    with high_risk_col:
        st.metric("High-Risk Variants", mutation_data["high_risk_variants"])
    with confidence_col:
        st.metric("Detection Confidence", "High")

    st.divider()

    # Variants table
    st.markdown("#### Detected Variants")
    table_rows = []
    for entry in variants:
        table_rows.append(
            {
                "Gene": entry["gene"],
                "Variant ID": entry["variant_id"],
                "Type": entry["type"],
                "Description": entry["description"],
                "Confidence": f"{entry['confidence'] * 100:.0f}%",
            }
        )
    st.dataframe(pd.DataFrame(table_rows), width="stretch", hide_index=True)

    # Detailed variant analysis: one expander per variant, two columns each.
    st.markdown("#### Detailed Variant Information")
    for entry in variants:
        with st.expander(f"🔍 {entry['gene']} - {entry['variant_id']}"):
            identity_col, evidence_col = st.columns(2)
            with identity_col:
                identity_lines = [
                    f"**Gene:** {entry['gene']}",
                    f"**Variant ID:** {entry['variant_id']}",
                    f"**Type:** {entry['type']}",
                    f"**Position:** {entry['position']}",
                ]
                st.markdown("\n\n".join(identity_lines))
            with evidence_col:
                confidence_pct = entry["confidence"] * 100
                evidence_lines = [
                    f"**Confidence:** {confidence_pct:.0f}%",
                    f"**Sequence Match:** {entry['sequence_match']}",
                    f"**Description:** {entry['description']}",
                ]
                st.markdown("\n\n".join(evidence_lines))
def render_biomarker_detection_results(analysis_results: Dict):
    """Render the biomarker tab: headline metrics, a summary table and per-biomarker details.

    Reads ``analysis_results["biomarker_detection"]``; when its
    ``detected_biomarkers`` list is empty only an informational note is shown.
    """
    st.subheader("🔬 Biomarker Detection")
    st.markdown("Research-linked biomarkers and protein signatures detected in your sequence")

    biomarker_data = analysis_results["biomarker_detection"]
    biomarkers = biomarker_data["detected_biomarkers"]

    # Nothing detected: show the note and skip metrics, table and details.
    if not biomarkers:
        st.info("⚠️ No research-linked biomarkers detected in this sequence analysis.")
        return

    total_col, target_col, confidence_col = st.columns(3)
    with total_col:
        st.metric("Total Biomarkers", biomarker_data["total_biomarkers"])
    with target_col:
        st.metric("Therapeutic Targets", biomarker_data["therapeutic_targets"])
    with confidence_col:
        st.metric("Detection Confidence", "Moderate-High")

    st.divider()

    # Biomarkers table (the conditions column lists at most two entries).
    st.markdown("#### Detected Biomarkers")
    table_rows = []
    for marker in biomarkers:
        table_rows.append(
            {
                "Biomarker": marker["name"],
                "Type": marker["type"],
                "Location": marker["location"],
                "Match Strength": f"{marker['match_strength'] * 100:.0f}%",
                "Research-Linked Conditions": ", ".join(marker["diseases"][:2]),
                "Research Significance": marker["significance"],
            }
        )
    st.dataframe(pd.DataFrame(table_rows), width="stretch", hide_index=True)

    # Detailed biomarker analysis: one expander per biomarker.
    st.markdown("#### Detailed Biomarker Information")
    for marker in biomarkers:
        with st.expander(f"🔬 {marker['name']} ({marker['type']})"):
            identity_col, evidence_col = st.columns(2)
            with identity_col:
                identity_lines = [
                    f"**Name:** {marker['name']}",
                    f"**Type:** {marker['type']}",
                    f"**Location:** {marker['location']}",
                    f"**Pattern:** {marker['pattern']}",
                ]
                st.markdown("\n\n".join(identity_lines))
            with evidence_col:
                match_pct = marker["match_strength"] * 100
                evidence_lines = [
                    f"**Match Strength:** {match_pct:.0f}%",
                    f"**Research Significance:** {marker['significance']}",
                    f"**Research-Linked Conditions:** {', '.join(marker['diseases'])}",
                ]
                st.markdown("\n\n".join(evidence_lines))

            # Recommendation
            st.markdown("**Recommendation:**")
            linked_conditions = ", ".join(marker["diseases"])
            st.info(
                f"This biomarker ({marker['name']}) is research-linked to {linked_conditions}. "
                f"This output is not intended for clinical use; consult a healthcare professional for clinical interpretation."
            )
def _format_confidence_percent(raw_confidence: Any) -> str:
    """Format a 0-1 confidence value as a whole-number percentage string.

    Values that cannot be converted to ``float`` (``None``, free text) yield
    "N/A" instead of raising.
    """
    try:
        return f"{float(raw_confidence) * 100:.0f}%"
    except (TypeError, ValueError):
        return "N/A"


def _humanize_label(raw_label: Any) -> str:
    """Turn a snake_case identifier into Title Case text; empty for missing values."""
    return str(raw_label or "").replace("_", " ").title()


def render_multi_mutation_analysis(analysis_results: Dict):
    """Render phase-aware germline or sample-local somatic composite findings.

    Reads ``analysis_results["multi_mutation_analysis"]`` (treated as empty when
    absent). Shows a mode/ruleset caption, then either a "disabled" or "nothing
    detected" note, or a summary table followed by one expander per finding with
    its interpretation, disease context, participating variants, limitations and
    sources.

    Args:
        analysis_results: Analysis payload; only the ``multi_mutation_analysis``
            entry is used here.
    """
    data = analysis_results.get("multi_mutation_analysis") or {}
    st.subheader("Multi-Mutation Biomarkers")
    st.caption(
        f"Mode: {data.get('interpretation_mode') or 'not selected'} | "
        f"Ruleset: {data.get('ruleset_version', 'unknown')}"
    )

    if data.get("status") == "disabled":
        st.info("Choose somatic or germline mode and provide structured variants to enable composite interpretation.")
        return

    findings = data.get("detected_biomarkers") or []
    if not findings:
        st.info("No qualifying multi-mutation biomarker was detected in the selected sample.")
        return

    st.metric("Composite Biomarkers", len(findings))
    summary = pd.DataFrame(
        [
            {
                "Signature": finding.get("signature_id"),
                "Relationship": _humanize_label(finding.get("relationship_type")),
                "Variants": len(finding.get("participating_variants") or []),
                "Evidence": finding.get("evidence_level"),
                # A default of 0.0 keeps "0%" for findings without a confidence
                # key; explicit None / non-numeric values become "N/A".
                "Confidence": _format_confidence_percent(finding.get("confidence", 0.0)),
                "Phase": _humanize_label(finding.get("phase_status")),
            }
            for finding in findings
        ]
    )
    st.dataframe(summary, width="stretch", hide_index=True)

    for finding in findings:
        with st.expander(str(finding.get("signature_id") or "Composite finding")):
            st.markdown(f"**Interpretation:** {finding.get('interpretation', '')}")

            # str() guards the joins against non-string list items.
            disease_context = ", ".join(str(item) for item in finding.get("disease_context") or [])
            st.markdown("**Disease context:** " + (disease_context or "Not specified"))

            variants = pd.DataFrame(finding.get("participating_variants") or [])
            if not variants.empty:
                st.dataframe(variants, width="stretch", hide_index=True)

            limitations = finding.get("limitations") or []
            if limitations:
                st.warning("\n".join(f"- {item}" for item in limitations))

            references = finding.get("source_references") or []
            if references:
                st.markdown("**Sources:**\n" + "\n".join(f"- {item}" for item in references))
def _risk_level_label(risk_score: float) -> str:
    """Map a predisposition score (percent) to the badge shown in the expander title."""
    if risk_score > 50:
        return "🔴 High"
    if risk_score > 25:
        return "🟡 Moderate"
    return "🟢 Low"


def render_disease_risk_assessment(analysis_results: Dict):
    """Render research-based risk signals and associations.

    Reads ``analysis_results["disease_associations"]``: shows count metrics, a
    bar chart of the ten highest-ranked signals, an expander per eligible
    signal, and a list of up to ten suppressed signals.

    Args:
        analysis_results: Analysis payload; only the ``disease_associations``
            entry is used here.
    """
    st.subheader("Research-Based Risk Signals")
    st.markdown(
        "Eligibility-filtered genomic predisposition signals based on detected variants and biomarkers"
    )

    disease_assoc = analysis_results["disease_associations"]
    associations = disease_assoc["associations"]
    suppressed = disease_assoc.get("suppressed_associations") or []

    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Eligible Signals", len(associations))
    with col2:
        st.metric("High Confidence", disease_assoc["high_confidence"])
    with col3:
        st.metric("Moderate Confidence", disease_assoc["moderate_confidence"])
    with col4:
        st.metric("Suppressed", len(suppressed))

    st.divider()

    if not associations:
        st.info(
            "✅ No eligible research-based risk signals were detected from the current sequence analysis."
        )
    else:
        st.markdown("#### Research-Based Signal Rankings")

        # Highest priority first, then risk score, then disease name.
        # "or 0.0" keeps the sort working when priority_score is present but None.
        sorted_assoc = sorted(
            associations,
            key=lambda x: (x.get("priority_score") or 0.0, x["risk_score"], x["disease"]),
            reverse=True,
        )

        # The chart is limited to the ten top-ranked signals.
        top_signals = sorted_assoc[:10]
        diseases = [a["disease"] for a in top_signals]
        risks = [a["risk_score"] for a in top_signals]
        confidences = [a["confidence"] for a in top_signals]

        color_map = {
            "Very High": "#ff4444",
            "High": "#ff8844",
            "Moderate": "#ffaa44",
            "Low": "#ffcc44",
            "Very Low": "#cccccc",
        }
        # Unknown confidence labels fall back to the neutral grey.
        colors = [color_map.get(c, "#cccccc") for c in confidences]

        fig = go.Figure()
        fig.add_trace(
            go.Bar(
                x=diseases,
                y=risks,
                marker=dict(color=colors),
                text=[f"{r:.1f}%" for r in risks],
                textposition="outside",
                # <br> is Plotly's line break inside hover templates.
                hovertemplate="%{x}<br>Predisposition signal: %{y:.1f}%",
            )
        )
        fig.update_layout(
            title="Research-Based Genetic Predisposition Signals",
            xaxis_title="Condition",
            yaxis_title="Predisposition Signal Score (%)",
            yaxis=dict(range=[0, 100]),
            height=400,
            showlegend=False,
        )
        st.plotly_chart(fig, width="stretch")

        st.divider()

        st.markdown("#### Detailed Signal Profiles")
        for assoc in sorted_assoc:
            disease = assoc["disease"]
            risk = assoc["risk_score"]
            confidence = assoc["confidence"]
            priority_category = assoc.get("priority_category", "Research Signal")
            uncertainty = assoc.get("uncertainty_messages") or []
            risk_level = _risk_level_label(risk)

            with st.expander(
                f"{risk_level} {disease} - {priority_category} ({confidence} Confidence, {risk:.1f}%)"
            ):
                col1, col2 = st.columns(2)
                with col1:
                    st.markdown(
                        "\n\n".join(
                            [
                                f"**Signal:** Elevated genetic predisposition for {disease}",
                                f"**Predisposition Score:** {risk:.1f}%",
                                f"**Confidence Level:** {confidence}",
                                f"**Population Baseline:** ~{assoc['prevalence'] * 100:.1f}%",
                            ]
                        )
                    )
                with col2:
                    st.markdown(
                        "\n\n".join(
                            [
                                f"**Inheritance Pattern:** {assoc['inheritance']}",
                                f"**Detected Variants:** {assoc['variants']}",
                                f"**Detected Biomarkers:** {assoc['biomarkers']}",
                                f"**Priority Category:** {priority_category}",
                            ]
                        )
                    )

                if assoc.get("eligibility_reason"):
                    st.caption(f"Eligibility check: {assoc['eligibility_reason']}")
                if uncertainty:
                    st.info(" ".join(str(message) for message in uncertainty))

                st.warning(
                    "⚠️ **Important:** This is a computational prediction based on sequence analysis. "
                    "It reflects a research-oriented predisposition signal, not a diagnosis, and should not be used for patient-care decisions."
                )

    if suppressed:
        st.divider()
        st.markdown("#### Suppressed Signals")
        st.caption(
            "These signals matched sequence evidence but were removed from ranking because eligibility rules did not support them."
        )
        for item in suppressed[:10]:
            st.markdown(
                f"- **{item['disease']}**: {item.get('eligibility_reason', item.get('reason', 'No reason provided'))}"
            )
# Phrases that mark a lifestyle item as an explicit weight-loss recommendation.
_WEIGHT_LOSS_PHRASES = (
    "weight loss",
    "weight-loss",
    "weight reduction",
    "lose weight",
    "reduce weight",
)


def _compute_bmi(weight: Any, height_cm: Any) -> Optional[float]:
    """Return weight (kg) / height (m) squared, or None when it cannot be computed.

    None is returned for missing values, non-numeric values and non-positive
    heights.
    """
    try:
        if weight is None or height_cm is None or float(height_cm) <= 0:
            return None
        return float(weight) / ((float(height_cm) / 100.0) ** 2)
    except (TypeError, ValueError, ZeroDivisionError):
        return None


def _personalize_lifestyle_items(
    lifestyle_items: List[Any], weight: Any, bmi: Optional[float]
) -> List[str]:
    """Filter and adapt lifestyle recommendations to the user's BMI.

    - Explicit weight-loss items are never passed through verbatim: with a BMI
      of 25 or more they are replaced by a concrete 5-10% target derived from
      *weight*; otherwise (lower or unknown BMI) they are dropped.
    - With a BMI below 18.5 a caloric-intake item is appended unless one is
      already present.
    - Duplicates are removed while keeping first-seen order.
    """
    personalized: List[str] = []
    for item in lifestyle_items:
        item_text = str(item)
        item_lower = item_text.lower()
        if any(phrase in item_lower for phrase in _WEIGHT_LOSS_PHRASES):
            if bmi is not None and bmi >= 25:
                current_weight = float(weight)
                target_10 = current_weight * 0.9
                target_5 = current_weight * 0.95
                personalized.append(
                    f"Target 5-10% weight reduction (from current {current_weight:.1f}kg to {target_10:.1f}-{target_5:.1f}kg range)."
                )
            continue
        personalized.append(item_text)

    if (
        bmi is not None
        and bmi < 18.5
        and not any("increase caloric intake" in text.lower() for text in personalized)
    ):
        personalized.append(
            "Increase caloric intake by ~300-500 kcal/day with nutrient-dense foods until BMI reaches at least 18.5."
        )

    # Keep display deterministic and duplicate-free.
    return list(dict.fromkeys(personalized))


def render_personalized_insights(analysis_results: Dict, user_metadata: Dict):
    """Render personalized research-based insights.

    Reads ``analysis_results["recommendations"]`` and shows, in order: the user
    profile summary (with BMI when computable), up to five signals per priority
    category, suppressed signals, BMI-adjusted lifestyle recommendations,
    monitoring considerations, pharmacogenomic guidance and the disclaimers.

    Args:
        analysis_results: Analysis payload; only the ``recommendations`` entry
            is used here.
        user_metadata: Profile values (``age``, ``gender``, ``weight``,
            ``height_cm``) used for the summary and the BMI-based filtering.
    """
    st.subheader("Genomic Health Insights")
    st.markdown(
        """
Research-based genomic health insights generated from eligible biomarkers, variant evidence, and your personal characteristics.
""",
        unsafe_allow_html=True,
    )

    recommendations = analysis_results["recommendations"]

    # Personal summary
    st.markdown("#### Your Profile Summary")
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Age", f"{user_metadata.get('age', 'N/A')} years")
    with col2:
        st.metric("Gender", user_metadata.get("gender", "N/A"))
    with col3:
        st.metric("Weight", f"{user_metadata.get('weight', 'N/A')} kg")
    with col4:
        st.metric("Height", f"{user_metadata.get('height_cm', 'N/A')} cm")

    weight = user_metadata.get("weight")
    height_cm = user_metadata.get("height_cm")
    # Computed once and reused for the caption and the lifestyle filtering.
    bmi = _compute_bmi(weight, height_cm)
    if bmi is not None:
        st.caption(f"Calculated BMI: {bmi:.1f}")

    st.divider()

    category_sections = [
        ("high_priority", "🔴 High Priority", "Actionable, strong-evidence, person-relevant signals."),
        ("moderate_priority", "🟡 Moderate Priority", "Signals with useful follow-up value but more uncertainty or smaller effect."),
        ("informational", "🔵 Informational", "Signals worth noting for context and preventive awareness."),
        ("research_signal", "⚪ Research Signal", "Preliminary or weakly replicated findings kept in an exploratory bucket."),
    ]

    for key, heading, description in category_sections:
        if not recommendations.get(key):
            continue
        st.markdown(f"#### {heading}")
        st.caption(description)
        # At most five signals are shown per category.
        for rec in recommendations[key][:5]:
            with st.expander(f"{rec['title']} ({rec['confidence']} Confidence)", expanded=False):
                st.markdown(
                    "\n\n".join(
                        [
                            f"**Signal:** {rec['title']}",
                            f"**Category:** {rec['category']}",
                            f"**Priority Score:** {rec['priority_score']:.2f}",
                            f"**Predisposition Score:** {rec['risk_score']:.1f}%",
                            f"**Confidence:** {rec['confidence']}",
                            f"**Summary:** {rec['summary']}",
                            f"**Preventive Follow-Up:** {rec['treatment']}",
                        ]
                    )
                )
                if rec.get("recommended_actions"):
                    st.markdown("**Preventive Health Recommendations:**")
                    for action in rec["recommended_actions"]:
                        st.markdown(f"- {action}")
                if rec.get("uncertainty_message"):
                    st.info(rec["uncertainty_message"])
                st.caption("Research support only. This is not a diagnosis or a treatment recommendation.")

    if recommendations.get("suppressed_insights"):
        st.divider()
        st.markdown("#### Suppressed or Ineligible Signals")
        for item in recommendations["suppressed_insights"][:10]:
            st.markdown(
                f"- **{item['disease']}**: {item.get('eligibility_reason', item.get('reason', 'No reason provided'))}"
            )

    st.divider()

    if recommendations.get("lifestyle"):
        normalized_lifestyle = _personalize_lifestyle_items(
            list(recommendations.get("lifestyle", [])), weight, bmi
        )

        st.markdown("#### Preventive Health Recommendations")
        st.markdown("*Based on eligible genomic risk signals and biomarker evidence:*")
        # Alternate items between two columns.
        cols = st.columns(2)
        for idx, lifestyle in enumerate(normalized_lifestyle):
            with cols[idx % 2]:
                st.markdown(f"✅ {lifestyle}")

    if recommendations.get("monitoring"):
        st.markdown("#### Monitoring Considerations")
        st.markdown("*Consider discussing with healthcare providers when appropriate:*")
        for monitoring in recommendations["monitoring"]:
            st.markdown(f"📌 {monitoring}")

    st.divider()

    # Pharmacogenomic guidance
    if recommendations.get("pharmacogenomics"):
        st.markdown("#### 💊 Pharmacogenomic Guidance")
        st.markdown("*How your genetic variants may affect drug metabolism:*")
        for pharm in recommendations["pharmacogenomics"]:
            with st.expander(f"🧬 {pharm['gene']} ({pharm['phenotype']})"):
                st.markdown(
                    "\n\n".join(
                        [
                            f"**Enzyme:** {pharm['enzyme']}",
                            f"**Your Phenotype:** {pharm['phenotype']}",
                            f"**Affected Drugs:** {', '.join(pharm['affected_drugs'])}",
                            f"**Action:** {pharm['action']}",
                            f"**Risk:** {pharm['risk']}",
                        ]
                    )
                )
        st.divider()

    # Important disclaimers
    st.markdown("#### ⚠️ Important Disclaimers")
    # .get mirrors the other optional sections: a payload without disclaimers
    # still renders the fixed notice below instead of raising.
    for disclaimer in recommendations.get("disclaimers") or []:
        st.warning(disclaimer)

    st.info(
        "\n**About This Analysis:**\n"
        "- This is a computational analysis for research and educational purposes\n"
        "- Results should NOT be used for medical decision-making\n"
        "- All recommendations are research-based and NOT medical prescriptions\n"
        "- Always consult qualified healthcare providers before making health decisions\n"
        "- Genetic testing and counseling are recommended for confirmation\n"
    )
def render_predictive_risk_calculator(genome_data):
    """Render the Predictive Risk Calculator section.

    Shows a table of variants, a bar chart of the aggregate risk per disease
    and one expander per disease with an interpretation and the variants that
    contributed to its score.

    Args:
        genome_data: Accepted for interface compatibility. The body does not
            read it; the variant list below is a hard-coded simulation.
    """
    st.subheader("🎯 Predictive Risk Calculator")
    st.markdown(
        """
Calculate disease risk based on genetic variants and population statistics.
""",
        unsafe_allow_html=True,
    )

    # Simulate variant detection
    detected_variants = [
        {
            "gene": "BRCA1",
            "variant": "c.68_69delAG",
            "type": "Pathogenic",
            "diseases": ["Breast Cancer", "Ovarian Cancer"],
            "risk_increase": 65.0,
            "population_freq": 0.0006,
        },
        {
            "gene": "APOE",
            "variant": "ε4 allele",
            "type": "Risk Factor",
            "diseases": ["Alzheimer's Disease"],
            "risk_increase": 12.0,
            "population_freq": 0.15,
        },
        {
            "gene": "CFTR",
            "variant": "F508del",
            "type": "Carrier",
            "diseases": ["Cystic Fibrosis"],
            "risk_increase": 2.0,
            "population_freq": 0.03,
        },
        {
            "gene": "HFE",
            "variant": "C282Y",
            "type": "Risk Factor",
            "diseases": ["Hemochromatosis"],
            "risk_increase": 8.5,
            "population_freq": 0.06,
        },
        {
            "gene": "FTO",
            "variant": "rs9939609",
            "type": "Polygenic",
            "diseases": ["Type 2 Diabetes", "Obesity"],
            "risk_increase": 3.2,
            "population_freq": 0.42,
        },
    ]

    # Risk summary
    st.markdown("### Risk Summary")
    risk_df = pd.DataFrame(
        [
            {
                "Disease": ", ".join(v["diseases"]),
                "Gene": v["gene"],
                "Variant": v["variant"],
                "Type": v["type"],
                "Risk Increase": f"{v['risk_increase']}%",
                "Population Frequency": f"{v['population_freq'] * 100:.2f}%",
            }
            for v in detected_variants
        ]
    )
    st.dataframe(risk_df, width="stretch", hide_index=True)

    # Visualize risk levels
    st.markdown("### Disease Risk Levels")

    # Aggregate risk: every disease starts at the baseline and each variant
    # spreads its risk increase evenly over the diseases it is linked to.
    baseline_risk = 10.0
    raw_risks: Dict[str, float] = {}
    contributors: Dict[str, List[Any]] = {}
    for variant in detected_variants:
        share = variant["risk_increase"] / len(variant["diseases"])
        for disease in variant["diseases"]:
            raw_risks[disease] = raw_risks.get(disease, baseline_risk) + share
            contributors.setdefault(disease, []).append((variant, share))

    # Cap once so the chart and the interpretation always show the same number.
    disease_risks: Dict[str, float] = {
        disease: min(risk, 100) for disease, risk in raw_risks.items()
    }

    # Create visualization
    fig = go.Figure()
    diseases = list(disease_risks)
    risks = [disease_risks[d] for d in diseases]
    colors = ["#ff4444" if r > 40 else "#ffaa00" if r > 20 else "#44ff44" for r in risks]
    fig.add_trace(
        go.Bar(
            x=diseases,
            y=risks,
            marker=dict(color=colors),
            text=[f"{r:.1f}%" for r in risks],
            textposition="outside",
        )
    )
    fig.update_layout(
        title="Predicted Disease Risk Levels",
        xaxis_title="Disease",
        yaxis_title="Risk Level (%)",
        yaxis=dict(range=[0, 100]),
        height=400,
    )
    st.plotly_chart(fig, width="stretch")

    # Risk interpretation
    st.markdown("### Risk Interpretation")
    for disease, risk in disease_risks.items():
        risk_level = "High" if risk > 40 else "Moderate" if risk > 20 else "Low"
        risk_color = "🔴" if risk > 40 else "🟡" if risk > 20 else "🟢"
        with st.expander(f"{risk_color} {disease} - {risk_level} Risk ({risk:.1f}%)"):
            # Markdown lists need one item per line, so the text is assembled
            # from separate lines instead of a single literal.
            lines = [
                f"**Risk Level:** {risk:.1f}% (Population average: ~10%)",
                "",
                "**Recommendations:**",
                f"- {'Regular screening recommended' if risk > 40 else 'Maintain healthy lifestyle'}",
                f"- {'Consult with genetic counselor' if risk > 40 else 'Standard preventive measures'}",
                f"- {'Consider preventive strategies' if risk > 20 else 'Continue monitoring'}",
                "",
                "**Contributing Variants:**",
            ]
            # The heading used to be followed by nothing; list what fed the score.
            lines.extend(
                f"- {variant['gene']} {variant['variant']} ({variant['type']}): +{share:.1f}%"
                for variant, share in contributors.get(disease, [])
            )
            st.markdown("\n".join(lines))
def render_drugs_clinical_trials_page():
    """Render the Drugs & Clinical Trials page.

    Flow: a text input selects the drug; a changed name resets the stored
    analysis and reruns; with a drug selected, one button runs the repurposing
    analysis and another starts a new search; once results exist they are
    shown in three tabs.

    Session-state keys read or written here: ``current_drug``,
    ``repurposing_results``, ``show_drug_analysis``, ``api_client``,
    ``cache_manager`` and the widget key ``drug_search_input``.
    """

    def _reset_drug_search() -> None:
        """Button callback: forget the current drug and blank the input box."""
        st.session_state.current_drug = None
        st.session_state.repurposing_results = None
        st.session_state.show_drug_analysis = False
        # The keyed text input keeps its own value across reruns. Without
        # clearing it the old name would be re-adopted as a "new search" on
        # the very next run. Widget state may only be changed from a callback.
        st.session_state["drug_search_input"] = ""

    st.title("💊 Drugs & Clinical Trials")
    st.markdown(
        """
Search for drug information, FDA approvals, clinical trials, and explore drug repurposing opportunities.
""",
        unsafe_allow_html=True,
    )

    # Drug input
    st.header("🔍 Drug Search")

    # Get current drug from session state or show input. The key may hold
    # None after a reset, so normalise to an empty string.
    current_drug = st.session_state.get("current_drug") or ""
    drug_name = st.text_input(
        "Enter Drug Name:",
        value=current_drug,  # Keep previous search if exists
        placeholder="e.g., Aspirin, Imatinib, Metformin",
        help="Enter the name of a drug to search for information",
        key="drug_search_input",
    )

    # Compare the stripped text: the stored name is stripped, so comparing the
    # raw input made "Aspirin " differ from "Aspirin" on every run and the
    # page rerun endlessly.
    typed_name = (drug_name or "").strip()
    if typed_name and typed_name != current_drug:
        # NEW SEARCH - Clear old data
        st.session_state.current_drug = typed_name
        st.session_state.repurposing_results = None  # Clear old results
        st.session_state.show_drug_analysis = False
        st.rerun()

    if st.session_state.get("current_drug"):
        drug_name = st.session_state.current_drug

        # Action buttons
        col1, col2 = st.columns([1, 1])
        with col1:
            run_analysis = st.button(
                "🔬 Analyze Repurposing Opportunities",
                type="primary",
                width="stretch",
                key="analyze_drug_button",
            )
        with col2:
            # The click itself triggers a rerun after the callback has reset
            # the state, so no explicit st.rerun() is needed here.
            st.button(
                "🔄 New Search",
                width="stretch",
                key="clear_drug_search",
                on_click=_reset_drug_search,
            )

        if run_analysis:
            with st.spinner(
                f"🔍 Analyzing {drug_name} across clinical trials, FDA database, and repurposing networks..."
            ):
                # Always fetch fresh data - don't use cached repurposing_results
                repurposing_report = _generate_repurposing_report_data(
                    drug_name, st.session_state.api_client, st.session_state.cache_manager
                )
                st.session_state.repurposing_results = repurposing_report
                st.session_state.show_drug_analysis = True
            st.success(f"✅ Analysis complete for {drug_name}!")
            st.rerun()

        # Display drug information if analysis was run
        if st.session_state.get("show_drug_analysis") and st.session_state.get(
            "repurposing_results"
        ):
            st.divider()
            st.header(f"📋 {drug_name} - Complete Profile")

            # Tabs for different sections
            tab1, tab2, tab3 = st.tabs(
                [
                    "📜 FDA-Approved Drugs & Clinical Trials",
                    "🔄 Drug Repurposing Engine",
                    "📊 Detailed Information",
                ]
            )
            with tab1:
                render_fda_clinical_trials(drug_name, st.session_state.repurposing_results)
            with tab2:
                render_drug_repurposing_section(drug_name, st.session_state.repurposing_results)
            with tab3:
                render_drug_detailed_info(drug_name)
def render_fda_clinical_trials(drug_name, report_data=None):
    """Render FDA approval status and clinical trials information.

    Two listings are produced:

    1. The trials carried in ``report_data["clinical_trials"]``, narrowed by
       status, phase and condition filters.
    2. Trials fetched through ``st.session_state.api_client``; entries for
       which ``_extract_nct_id`` returns nothing are dropped, and status and
       phase charts are drawn for the rest.

    Args:
        drug_name: Drug whose trials are shown.
        report_data: Report dict; when falsy only a hint is rendered.
    """
    # NOTE(review): statement nesting below was reconstructed from a
    # whitespace-collapsed copy of this function - confirm against the
    # formatted file, in particular where the per-trial links sit and that
    # the live fetch runs at function level.

    def _token(value: Any) -> str:
        """Upper-case and drop separators so 'Phase 3' and 'PHASE_3' compare equal."""
        return re.sub(r"[^A-Z0-9]+", "", str(value).upper())

    st.subheader("📜 FDA Approval Status & Clinical Trials")

    # Use provided report data or show message
    if not report_data:
        st.info(
            "Click 'Analyze Repurposing Opportunities' to fetch clinical trial data for this drug"
        )
        return

    # Get clinical trials from report
    clinical_trials = report_data.get("clinical_trials", [])
    if not clinical_trials:
        st.warning(f"⚠️ No clinical trials found for {drug_name} in ClinicalTrials.gov")
        st.info(
            "\n".join(
                [
                    "This could mean:",
                    "- The drug is not currently in active clinical trials",
                    "- The drug name may need to be spelled differently",
                    "- The drug may be an older medication with no new trials",
                    "",
                    "**To search manually:** Visit [ClinicalTrials.gov](https://clinicaltrials.gov/)",
                ]
            )
        )
    else:
        st.success(f"✅ Found {len(clinical_trials)} clinical trial(s) for {drug_name}")
        st.markdown("---")
        st.markdown("### 🔬 Clinical Trials")

        # Filter options
        col1, col2, col3 = st.columns(3)
        with col1:
            trial_status = st.multiselect(
                "Trial Status:",
                [
                    "Recruiting",
                    "Active, not recruiting",
                    "Completed",
                    "Terminated",
                    "RECRUITING",
                    "ACTIVE_NOT_RECRUITING",
                    "COMPLETED",
                    "TERMINATED",
                ],
                default=[
                    "Recruiting",
                    "RECRUITING",
                    "Active, not recruiting",
                    "ACTIVE_NOT_RECRUITING",
                ],
                key="clinical_trial_status_filter",
            )
        with col2:
            trial_phase = st.multiselect(
                "Phase:",
                [
                    "Phase 1",
                    "Phase 2",
                    "Phase 3",
                    "Phase 4",
                    "PHASE_1",
                    "PHASE_2",
                    "PHASE_3",
                    "PHASE_4",
                ],
                default=["Phase 2", "Phase 3", "PHASE_2", "PHASE_3"],
                key="clinical_trial_phase_filter",
            )
        with col3:
            # Get unique conditions from trials
            all_conditions: Set[str] = set()
            for trial in clinical_trials:
                cond = trial.get("condition", "")
                if cond and cond != "N/A":
                    all_conditions.add(str(cond)[:50])  # Truncate long conditions
            condition = st.selectbox(
                "Filter by Condition:",
                ["All"] + sorted(all_conditions),
                key="clinical_trial_condition_filter",
            )

        st.markdown("---")

        wanted_statuses = [_token(s) for s in trial_status]
        wanted_phases = [_token(p) for p in trial_phase]

        # Display trials
        for trial in clinical_trials:
            nct_id = trial.get("nct_id") or trial.get("trial_id", "N/A")
            trial_status_val = trial.get("status", "Unknown")
            trial_phase_val = trial.get("phase", "N/A")
            trial_condition = trial.get("condition", "N/A")
            status_key = _token(trial_status_val)
            phase_key = _token(trial_phase_val)

            # Apply filters. Status uses a prefix test: a plain substring test
            # let "RECRUITING" also match "ACTIVE_NOT_RECRUITING".
            status_match = not wanted_statuses or any(
                status_key.startswith(s) for s in wanted_statuses
            )
            # A trial may list several phases, so phase stays a substring test.
            phase_match = not wanted_phases or any(p in phase_key for p in wanted_phases)
            condition_match = (
                condition == "All" or condition.lower() in str(trial_condition).lower()
            )
            if not (status_match and phase_match and condition_match):
                continue

            phase_icon = "✅" if "PHASE3" in phase_key else "🔄" if "PHASE2" in phase_key else "🧪"

            # Only mark the title as shortened when it really was.
            title = str(trial.get("title") or "N/A")
            short_title = title if len(title) <= 70 else f"{title[:70]}..."

            with st.expander(
                f"{phase_icon} **{short_title}** | {trial_status_val} | NCT: {nct_id}",
                expanded=False,
            ):
                col1, col2 = st.columns([2, 1])
                with col1:
                    st.markdown("**Trial Information:**")
                    st.markdown(
                        "\n".join(
                            [
                                f"- **Trial ID:** {nct_id}",
                                f"- **Phase:** {trial_phase_val}",
                                f"- **Status:** {trial_status_val}",
                                f"- **Start Date:** {trial.get('start_date', trial.get('start_year', 'N/A'))}",
                                f"- **Enrolled Patients:** {trial.get('enrolled', 'N/A')}",
                                f"- **Sponsor:** {trial.get('sponsor', 'N/A')}",
                            ]
                        )
                    )
                    st.markdown("**Study Details:**")
                    st.markdown(
                        "\n".join(
                            [
                                f"- **Condition:** {trial_condition}",
                                f"- **Intervention:** {trial.get('intervention', drug_name)}",
                                f"- **Primary Outcome:** {trial.get('primary_outcome', 'N/A')}",
                            ]
                        )
                    )
                with col2:
                    st.markdown(
                        f"""

{trial_status_val}

Study Status

{trial_phase_val}
""",
                        unsafe_allow_html=True,
                    )
                    trial_url = trial.get("url", "")
                    if trial_url:
                        st.markdown(f"[View on ClinicalTrials.gov]({trial_url})")
                    else:
                        clinicaltrials_url = build_clinicaltrials_url(nct_id)
                        if clinicaltrials_url:
                            st.markdown(f"[View on ClinicalTrials.gov]({clinicaltrials_url})")

    st.divider()

    # Fetch clinical trials data from ClinicalTrials.gov (verified NCT IDs)
    clinical_trials = []
    if "api_client" in st.session_state:
        try:
            clinical_trials = run_async_safe(
                st.session_state.api_client.fetch_clinical_trials_by_drug(drug_name)
            )
        except ExternalServiceError as e:
            logger.warning(
                f"ClinicalTrials.gov fetch error: {e.internal_message}",
                extra=create_log_context(
                    "clinical_trials_fetch", drug_name=drug_name, **e.log_details
                ),
            )
        except Exception as e:
            logger.exception(
                f"Unexpected error fetching clinical trials for {drug_name}",
                extra=create_log_context(
                    "clinical_trials_fetch", drug_name=drug_name, error_type=type(e).__name__
                ),
            )
    else:
        app_env = get_environment()
        if app_env.is_development():
            st.warning("API client not available; cannot fetch ClinicalTrials.gov data.")
        else:
            st.warning("Unable to fetch clinical trials data. Please try again.")
        logger.error(
            "API client not available for clinical trials fetch",
            extra=create_log_context("api_client_missing"),
        )

    # Keep only entries with an NCT ID; the ID is written back onto the dict.
    valid_trials = []
    invalid_trials = []
    for trial in clinical_trials or []:
        nct_id = _extract_nct_id(trial)
        if not nct_id:
            invalid_trials.append(trial)
            continue
        trial["nct_id"] = nct_id
        valid_trials.append(trial)

    if invalid_trials:
        logger.debug(
            f"ClinicalTrials.gov: filtered {len(invalid_trials)} invalid entries",
            extra=create_log_context(
                "clinical_trials_fetch", drug_name=drug_name, filtered_count=len(invalid_trials)
            ),
        )
        st.caption("Some trial entries were excluded due to missing or invalid NCT IDs.")

    clinical_trials = valid_trials
    st.markdown(f"**Found {len(clinical_trials)} clinical trials**")

    if not clinical_trials:
        st.info("No verified ClinicalTrials.gov entries found for this drug.")
        encoded_drug = urllib.parse.quote_plus(drug_name)
        # The label used to be emitted without any target although the
        # encoded drug name had been prepared for it.
        st.markdown(
            f"[Search on ClinicalTrials.gov](https://clinicaltrials.gov/search?term={encoded_drug})"
        )

    for trial in clinical_trials:
        nct_id = _extract_nct_id(trial)
        display_nct = nct_id or trial.get("nct_id", "NCT ID unavailable")
        display_status = _format_status(trial.get("status"))
        display_phase = _format_phase(trial.get("phase"))
        title = trial.get("title", "N/A")
        conditions = ", ".join(trial.get("conditions", [])) or trial.get("condition", "N/A")

        with st.expander(f"🔬 {display_nct} - {title}"):
            col1, col2 = st.columns([2, 1])
            with col1:
                # Blank lines keep each field in its own Markdown paragraph.
                st.markdown(
                    "\n\n".join(
                        [
                            f"**Title:** {title}",
                            f"**Condition:** {conditions}",
                            f"**Sponsor:** {trial.get('sponsor', 'N/A')}",
                            f"**Locations:** {trial.get('locations', 'N/A')}",
                        ]
                    )
                )
            with col2:
                st.markdown(
                    f"""
Status: {display_status}
Phase: {display_phase}
Enrollment: {trial.get("enrollment", "N/A")} participants
""",
                    unsafe_allow_html=True,
                )
                st.markdown(f"**Start Date:** {trial.get('start_date', 'N/A')}")

            clinicaltrials_url = build_clinicaltrials_url(nct_id)
            if clinicaltrials_url:
                st.markdown(f"[View on ClinicalTrials.gov]({clinicaltrials_url})")
            else:
                st.caption("ClinicalTrials.gov link unavailable (missing or invalid NCT ID).")

    # Trial statistics - nothing to chart without trials.
    if not clinical_trials:
        return

    st.divider()
    st.markdown("### Trial Statistics")

    # Create visualization
    status_counts: Dict[str, int] = {}
    phase_counts: Dict[str, int] = {}
    for trial in clinical_trials:
        status_label = str(trial.get("status") or "Unknown")
        phase_label = str(trial.get("phase") or "N/A")
        status_counts[status_label] = status_counts.get(status_label, 0) + 1
        phase_counts[phase_label] = phase_counts.get(phase_label, 0) + 1

    col1, col2 = st.columns(2)
    with col1:
        fig_status = go.Figure(
            data=[
                go.Pie(
                    labels=list(status_counts.keys()),
                    values=list(status_counts.values()),
                    hole=0.4,
                )
            ]
        )
        fig_status.update_layout(title="Trials by Status", height=300)
        st.plotly_chart(fig_status, width="stretch")
    with col2:
        fig_phase = go.Figure(
            data=[
                go.Bar(
                    x=list(phase_counts.keys()),
                    y=list(phase_counts.values()),
                    marker_color="#1f77b4",
                )
            ]
        )
        fig_phase.update_layout(title="Trials by Phase", height=300)
        st.plotly_chart(fig_phase, width="stretch")
repurposing_count = len(report_data.get("repurposing_opportunities", [])) report.append(f"This comprehensive analysis examines {drug_name} for potential therapeutic") report.append(f"applications beyond current approved indications.") report.append("") report.append(f"Analysis Summary:") report.append(f" • Current FDA-Approved Indications: {approved_count}") report.append(f" • Associated Clinical Trials: {trials_count}") report.append(f" • Identified Repurposing Opportunities: {repurposing_count}") report.append("") # Section 1: FDA-Approved Indications report.append("SECTION 1: FDA-APPROVED INDICATIONS & CURRENT CLINICAL USE") report.append("=" * 80) report.append("") approved_drugs = report_data.get("approved_drugs", []) if approved_drugs: for i, drug in enumerate(approved_drugs, 1): report.append(f"{i}. {drug['indication']}") report.append("-" * 80) report.append(f" Drug Name: {drug['name']}") report.append(f" Approval Date: {drug['approval_date']}") report.append(f" Status: {drug['status']}") report.append(f" DrugBank ID: {drug.get('drugbank_id', 'N/A')}") report.append(f" PubChem ID: {drug.get('pubchem_id', 'N/A')}") report.append(f" Confidence Score: {drug['confidence_score']}%") report.append("") report.append(f" Mechanism of Action:") report.append(f" {drug['mechanism']}") report.append("") report.append(f" Target Proteins:") for target in drug.get("target_proteins", []): report.append(f" • {target}") report.append("") report.append(f" Evidence Source: {drug['evidence_source']}") report.append("") else: report.append("No approved indications found.") report.append("") # Section 2: Clinical Trials report.append("SECTION 2: ASSOCIATED CLINICAL TRIALS") report.append("=" * 80) report.append("") clinical_trials = report_data.get("clinical_trials", []) if clinical_trials: for i, trial in enumerate(clinical_trials, 1): nct_id = _extract_nct_id(trial) report.append(f"{i}. 
{trial['title']}") report.append("-" * 80) report.append(f" Trial ID (NCT): {nct_id or trial.get('trial_id', 'N/A')}") report.append(f" Phase: {trial['phase']}") report.append(f" Status: {trial['status']}") report.append(f" Condition: {trial['condition']}") report.append(f" Start Year: {trial['start_year']}") report.append(f" Enrolled Patients: {trial.get('enrolled', 'N/A')}") report.append(f" Sponsor: {trial.get('sponsor', 'N/A')}") report.append(f" Intervention: {trial.get('intervention', 'N/A')}") report.append(f" Primary Outcome: {trial.get('primary_outcome', 'N/A')}") clinicaltrials_url = build_clinicaltrials_url(nct_id) report.append(f" ClinicalTrials URL: {clinicaltrials_url or 'N/A'}") report.append("") else: report.append("No associated clinical trials found.") report.append("") # Section 3: Repurposing Opportunities report.append("SECTION 3: IDENTIFIED REPURPOSING OPPORTUNITIES") report.append("=" * 80) report.append("") repurposing_opps = report_data.get("repurposing_opportunities", []) if repurposing_opps: # Sort by confidence score (descending) sorted_opps = sorted(repurposing_opps, key=lambda x: x["confidence"], reverse=True) for i, opp in enumerate(sorted_opps, 1): report.append(f"{i}. {opp['disease']}") report.append("-" * 80) report.append(f" Confidence Score: {opp['confidence']:.1f}%") report.append(f" Priority Level: {opp['priority']}") report.append(f" Status: {opp['status']}") report.append("") report.append(f" PROPOSED MECHANISM OF ACTION:") report.append(f" {opp['mechanism']}") report.append("") report.append(f" CLINICAL RATIONALE:") report.append(f" {opp['clinical_rationale']}") report.append("") report.append(f" SUPPORTING EVIDENCE:") for j, evidence in enumerate(opp.get("evidence", []), 1): report.append(f" {j}. 
{evidence}") report.append("") if opp.get("external_references"): report.append(f" EXTERNAL REFERENCES:") for reference in opp.get("external_references", []): report.append( f" - {reference.get('label', 'Reference')}: {reference.get('url', 'N/A')}" ) report.append("") report.append(f" AFFECTED BIOLOGICAL PATHWAYS:") for pathway in opp.get("affected_pathways", []): report.append(f" • {pathway}") report.append("") report.append(f" NETWORK ANALYSIS:") report.append(f" • Shared Target Proteins: {opp.get('shared_targets', 'N/A')}") report.append( f" • Supporting Publications: {opp.get('supporting_publications', 'N/A')}" ) report.append("") else: report.append("No repurposing opportunities identified.") report.append("") # Section 4: Analysis Methodology report.append("SECTION 4: ANALYSIS METHODOLOGY") report.append("=" * 80) report.append("") report.append("This analysis was conducted using the following approach:") report.append("") report.append("1. BIOLOGICAL NETWORK ANALYSIS") report.append(" • Drug target identification and protein interaction networks") report.append(" • Pathway enrichment analysis") report.append(" • Disease similarity scoring") report.append("") report.append("2. CLINICAL TRIAL DATA INTEGRATION") report.append(" • Mining of ClinicalTrials.gov for past and ongoing trials") report.append(" • Analysis of trial outcomes and conditions") report.append("") report.append("3. LITERATURE-BASED EVIDENCE SYNTHESIS") report.append(" • PubMed literature mining for mechanistic evidence") report.append(" • Case reports and observational studies review") report.append(" • Preclinical model data integration") report.append("") report.append("4. 
CONFIDENCE SCORING") report.append(" • Multi-evidence confidence calculation (0-100%)") report.append(" • High (>70%): Strong mechanistic and clinical evidence") report.append(" • Moderate (50-70%): Reasonable mechanistic basis with some evidence") report.append(" • Low (<50%): Preliminary evidence or speculative indication") report.append("") # Section 5: Important Disclaimers report.append("SECTION 5: IMPORTANT DISCLAIMERS & LIMITATIONS") report.append("=" * 80) report.append("") report.append("DISCLAIMER:") report.append("This analysis is for RESEARCH AND EDUCATIONAL PURPOSES ONLY.") report.append("") report.append("⚠️ IMPORTANT:") report.append("• This computational analysis does NOT constitute medical advice") report.append("• Results should NOT be used for patient-care decisions") report.append("• All repurposing suggestions are EXPERIMENTAL and require") report.append(" rigorous clinical validation") report.append("• Consult qualified healthcare providers before any medical decisions") report.append("• All proposed uses require appropriate clinical trial design and") report.append(" regulatory approval") report.append("") report.append("LIMITATIONS:") report.append("• Analysis based on computational predictions and published literature") report.append("• Confidence scores reflect available evidence quality, not efficacy") report.append("• Drug safety and pharmacokinetics not fully addressed here") report.append("• Patient-specific factors (genetics, comorbidities) not considered") report.append("• Dosing recommendations NOT provided in this analysis") report.append("• Clinical trial phase-dependent safety concerns may apply") report.append("") # Section 6: Recommendations for Further Investigation report.append("SECTION 6: RECOMMENDATIONS FOR FURTHER INVESTIGATION") report.append("=" * 80) report.append("") # Identify high-priority opportunities high_priority = [opp for opp in repurposing_opps if opp["priority"] == "High"] if high_priority: 
report.append("PRIORITY ACTIONS (High Confidence Opportunities):") for opp in high_priority: report.append(f"• {opp['disease']} ({opp['confidence']:.1f}% confidence)") report.append(f" - Recommended: Systematic literature review + preclinical validation") report.append(f" - Next step: Clinical trial design feasibility assessment") report.append("") report.append("GENERAL RECOMMENDATIONS:") report.append("1. Validate findings through independent literature review") report.append("2. Conduct rigorous preclinical studies in relevant disease models") report.append("3. Assess pharmacokinetic/pharmacodynamic properties for new indications") report.append("4. Evaluate potential off-target effects and safety concerns") report.append("5. Design properly controlled clinical trials for validation") report.append("6. Consult with clinical experts in target disease areas") report.append("7. Consider existing regulatory pathways (fast-track, breakthrough therapy)") report.append("") # Footer report.append("=" * 80) report.append("END OF REPORT") report.append("=" * 80) report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") report.append("") report.append("For more information, contact the research team or visit:") report.append("• DrugBank: https://www.drugbank.ca/") report.append("• ClinicalTrials.gov: https://clinicaltrials.gov/") report.append("• PubMed: https://pubmed.ncbi.nlm.nih.gov/") report.append("• FDA Drug Approvals: https://www.fda.gov/drugs/") report.append("") return "\n".join(report) def _has_external_value(value: Any) -> bool: """Return True when an external database field contains a usable value.""" return bool(value) and str(value).strip().upper() not in {"N/A", "NA", "NONE", "NULL"} def _build_drugbank_url(drug_name: str, drugbank_id: str = "N/A") -> str: if _has_external_value(drugbank_id) and str(drugbank_id).upper().startswith("DB"): return f"https://go.drugbank.com/drugs/{urllib.parse.quote(str(drugbank_id).strip())}" return ( 
"https://go.drugbank.com/unearth/q" f"?searcher=drugs&query={urllib.parse.quote_plus(drug_name.strip())}" ) def _build_pubchem_url(drug_name: str, pubchem_id: str = "N/A") -> str: if _has_external_value(pubchem_id) and str(pubchem_id).isdigit(): return f"https://pubchem.ncbi.nlm.nih.gov/compound/{pubchem_id}" return f"https://pubchem.ncbi.nlm.nih.gov/compound/{urllib.parse.quote(drug_name.strip())}" def _normalize_orange_book_application(application_number: Any) -> Optional[tuple[str, str]]: """Return the Orange Book application type and number for NDA/ANDA records.""" raw_application = str(application_number or "").strip().upper().replace(" ", "") if not raw_application: return None match = re.fullmatch(r"(NDA|ANDA|N|A)(\d{1,6})", raw_application) if not match: return None prefix, number = match.groups() application_type = "A" if prefix in {"A", "ANDA"} else "N" return application_type, number.zfill(6) def _build_orange_book_search_url(drug_name: str) -> str: return ( "https://www.accessdata.fda.gov/scripts/cder/ob/index.cfm" f"?drugname={urllib.parse.quote_plus(drug_name.strip())}" ) def _build_orange_book_url(drug_name: str, application_number: Any = None) -> str: normalized_application = _normalize_orange_book_application(application_number) if normalized_application: application_type, application_no = normalized_application return ( "https://www.accessdata.fda.gov/scripts/cder/ob/results_product.cfm" f"?Appl_Type={application_type}&Appl_No={application_no}" ) return _build_orange_book_search_url(drug_name) def _build_openfda_label_url(drug_name: str) -> str: query = f'openfda.generic_name:"{drug_name.strip()}"' return ( "https://api.fda.gov/drug/label.json" f"?search={urllib.parse.quote(query)}&limit=1" ) def _build_openfda_label_api_url(search: str) -> str: return ( "https://api.fda.gov/drug/label.json" f"?search={urllib.parse.quote(str(search or '').strip())}&limit=1" ) def _build_dailymed_label_url(set_id: Any) -> str: if isinstance(set_id, list): set_id = 
next((item for item in set_id if str(item or "").strip()), "") set_id_text = str(set_id or "").strip() if not set_id_text or set_id_text == "N/A": return "" return ( "https://dailymed.nlm.nih.gov/dailymed/drugInfo.cfm" f"?setid={urllib.parse.quote(set_id_text)}" ) def _format_external_text(value: Any) -> str: if isinstance(value, list): chunks = [ re.sub(r"[ \t\r\f\v]+", " ", str(item)).strip() for item in value if str(item).strip() ] text = "\n\n".join(chunks) else: text = str(value or "").strip() text = re.sub(r"[ \t\r\f\v]+", " ", text) text = re.sub(r"\n{3,}", "\n\n", text) return text.strip() @st.cache_data(show_spinner=False, ttl=60 * 60 * 24) def _fetch_pubchem_compound_profile(drug_name: str, pubchem_id: str = "N/A") -> Dict[str, Any]: """Fetch drug-specific chemical properties from PubChem PUG-REST.""" requested_properties = ",".join( [ "Title", "MolecularFormula", "MolecularWeight", "IUPACName", "CanonicalSMILES", "IsomericSMILES", "InChIKey", "XLogP", "TPSA", "HBondDonorCount", "HBondAcceptorCount", "RotatableBondCount", "ExactMass", "MonoisotopicMass", "Complexity", ] ) if _has_external_value(pubchem_id) and str(pubchem_id).isdigit(): namespace = "cid" identifier = str(pubchem_id).strip() else: namespace = "name" identifier = drug_name.strip() url = ( f"https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/{namespace}/" f"{urllib.parse.quote(identifier, safe='')}/property/{requested_properties}/JSON" ) try: response = requests.get(url, timeout=12) response.raise_for_status() data = response.json() properties = data.get("PropertyTable", {}).get("Properties", []) if not properties: return {"available": False, "error": "No PubChem property record found."} profile = properties[0] cid = str(profile.get("CID", pubchem_id if _has_external_value(pubchem_id) else "N/A")) profile["CID"] = cid profile["pubchem_url"] = _build_pubchem_url(drug_name, cid) profile["image_url"] = f"https://pubchem.ncbi.nlm.nih.gov/image/imgsrv.fcgi?cid={cid}&t=l" return {"available": 
@st.cache_data(show_spinner=False, ttl=60 * 60 * 24)
def _fetch_pubchem_compound_profile(drug_name: str, pubchem_id: str = "N/A") -> Dict[str, Any]:
    """Fetch drug-specific chemical properties from PubChem PUG-REST.

    The lookup goes by CID when ``pubchem_id`` is a usable all-digit value and
    by name otherwise.

    Returns:
        ``{"available": True, "profile": {...}}`` where the profile is the
        first property record plus ``CID``, ``pubchem_url`` and ``image_url``;
        or ``{"available": False, "error": "..."}`` when nothing was found or
        the request failed. Either result is cached for 24 hours.
    """
    property_names = (
        "Title",
        "MolecularFormula",
        "MolecularWeight",
        "IUPACName",
        "CanonicalSMILES",
        "IsomericSMILES",
        "InChIKey",
        "XLogP",
        "TPSA",
        "HBondDonorCount",
        "HBondAcceptorCount",
        "RotatableBondCount",
        "ExactMass",
        "MonoisotopicMass",
        "Complexity",
    )
    requested_properties = ",".join(property_names)

    id_is_usable = _has_external_value(pubchem_id)
    if id_is_usable and str(pubchem_id).isdigit():
        namespace, identifier = "cid", str(pubchem_id).strip()
    else:
        namespace, identifier = "name", drug_name.strip()

    encoded_identifier = urllib.parse.quote(identifier, safe="")
    url = (
        f"https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/{namespace}/"
        f"{encoded_identifier}/property/{requested_properties}/JSON"
    )

    try:
        response = requests.get(url, timeout=12)
        response.raise_for_status()
        records = response.json().get("PropertyTable", {}).get("Properties", [])
        if not records:
            return {"available": False, "error": "No PubChem property record found."}

        profile = records[0]
        fallback_cid = pubchem_id if id_is_usable else "N/A"
        cid = str(profile.get("CID", fallback_cid))
        profile["CID"] = cid
        profile["pubchem_url"] = _build_pubchem_url(drug_name, cid)
        profile["image_url"] = f"https://pubchem.ncbi.nlm.nih.gov/image/imgsrv.fcgi?cid={cid}&t=l"
        return {"available": True, "profile": profile}
    except Exception as exc:
        # Best effort: any failure is reported to the caller, never raised.
        return {"available": False, "error": str(exc)}


def _openfda_name_matches(candidate: Any, drug_name: str) -> bool:
    """Return True when both names are non-empty and equal once reduced to a-z/0-9."""

    def _squash(text: str) -> str:
        return re.sub(r"[^a-z0-9]+", "", text.lower())

    wanted = _squash(drug_name)
    found = _squash(str(candidate or ""))
    if not wanted or not found:
        return False
    return wanted == found


def _score_openfda_application_match(record: Dict[str, Any], drug_name: str) -> int:
    """Score how well a Drugs@FDA record matches ``drug_name``.

    Points: 40 for an NDA application number or 20 for an ANDA one; per
    product 30 for a matching brand name and 24 for every matching active
    ingredient; 18 for each of the openfda ``brand_name``, ``generic_name``
    and ``substance_name`` lists that contains a match.
    """
    application = str(record.get("application_number", "")).upper()
    prefix_points = (("NDA", 40), ("ANDA", 20))
    total = next((points for prefix, points in prefix_points if application.startswith(prefix)), 0)

    for product in record.get("products", []) or []:
        if _openfda_name_matches(product.get("brand_name"), drug_name):
            total += 30
        ingredients = product.get("active_ingredients", []) or []
        total += 24 * sum(
            1 for ingredient in ingredients if _openfda_name_matches(ingredient.get("name"), drug_name)
        )

    openfda = record.get("openfda", {}) or {}
    for field_name in ("brand_name", "generic_name", "substance_name"):
        names = openfda.get(field_name, []) or []
        if any(_openfda_name_matches(name, drug_name) for name in names):
            total += 18
    return total
@st.cache_data(show_spinner=False, ttl=60 * 60 * 24)
def _fetch_openfda_application_summary(drug_name: str) -> Dict[str, Any]:
    """Resolve a drug name to an FDA application number that Orange Book can deep-link.

    Queries the openFDA ``drugsfda`` endpoint for records whose brand,
    generic, substance, product or active-ingredient name equals
    ``drug_name``, ranks them with ``_score_openfda_application_match`` and
    keeps the best-scoring record.

    Args:
        drug_name: Drug name as entered by the user.

    Returns:
        A dictionary that always contains ``available``, ``source_url`` and
        ``orange_book_url``. On success it also carries
        ``application_number``, ``sponsor_name`` and ``products``; when the
        HTTP call fails it carries ``error`` with the exception text.
    """
    # NOTE(review): failure dictionaries are ordinary return values, so
    # st.cache_data presumably keeps them for the whole TTL as well — confirm
    # that caching a transient outage for 24 hours is acceptable.

    # Double quotes delimit the phrase in every clause below; drop any that
    # arrive inside the name so they cannot end the phrase early.
    drug_query = drug_name.replace('"', " ").strip()
    if not drug_query:
        orange_book_search_url = _build_orange_book_search_url(drug_name)
        return {
            "available": False,
            "source_url": orange_book_search_url,
            "orange_book_url": orange_book_search_url,
        }

    exact_query = " OR ".join(
        [
            f'openfda.brand_name:"{drug_query}"',
            f'openfda.generic_name:"{drug_query}"',
            f'openfda.substance_name:"{drug_query}"',
            f'products.brand_name:"{drug_query}"',
            f'products.active_ingredients.name:"{drug_query}"',
        ]
    )
    source_url = (
        "https://api.fda.gov/drug/drugsfda.json"
        f"?search={urllib.parse.quote(exact_query)}&limit=10"
    )

    def _unavailable(**extra: Any) -> Dict[str, Any]:
        """Build the shared "nothing resolved" payload."""
        return {
            "available": False,
            **extra,
            "source_url": source_url,
            "orange_book_url": _build_orange_book_search_url(drug_name),
        }

    try:
        response = requests.get(
            "https://api.fda.gov/drug/drugsfda.json",
            params={"search": exact_query, "limit": 10},
            timeout=15,
        )
        if response.status_code == 404:
            # Same convention as the label lookup: 404 means "no match",
            # not a service failure.
            records = []
        else:
            response.raise_for_status()
            records = response.json().get("results") or []
    except Exception as e:
        logger.info(
            "Unable to resolve FDA application number",
            extra=create_log_context(
                "openfda_application_lookup",
                drug_name=drug_name,
                error_type=type(e).__name__,
            ),
        )
        return _unavailable(error=str(e))

    # Only records whose application number Orange Book can address compete.
    scored_records = [
        (_score_openfda_application_match(record, drug_name), record)
        for record in records
        if _normalize_orange_book_application(record.get("application_number"))
    ]
    if not scored_records:
        return _unavailable()

    best_score, best_record = max(scored_records, key=lambda item: item[0])
    if best_score <= 0:
        return _unavailable()

    application_number = str(best_record.get("application_number", "")).strip()
    return {
        "available": True,
        "application_number": application_number,
        "sponsor_name": best_record.get("sponsor_name", "N/A"),
        "products": best_record.get("products", []),
        "source_url": source_url,
        "orange_book_url": _build_orange_book_url(drug_name, application_number),
    }


@st.cache_data(show_spinner=False, ttl=60 * 60 * 24)
def _fetch_openfda_label_summary(drug_name: str) -> Dict[str, Any]:
    """Fetch drug-specific safety sections from FDA labeling via openFDA.

    Tries progressively looser searches (generic name, brand name, substance
    name, then free text) against the openFDA ``label`` endpoint and returns
    the safety sections of the first label found.

    Args:
        drug_name: Drug name as entered by the user.

    Returns:
        A dictionary with ``available``, ``sections`` (title -> formatted
        text) and ``source_url``. When a label is found it also carries
        ``generic_names``, ``brand_names``, ``manufacturer_names``,
        ``spl_set_id`` and ``label_url``.
    """
    not_found = {
        "available": False,
        "sections": {},
        "source_url": _build_openfda_label_url(drug_name),
    }

    # Quotes delimit the phrase searches below, so they must not appear in the term.
    drug_query = drug_name.replace('"', " ").strip()
    if not drug_query:
        # An empty term cannot identify a label; skip the four HTTP round trips.
        return not_found

    section_fields = (
        ("Boxed Warning", "boxed_warning"),
        ("Warnings", "warnings"),
        ("Warnings and Precautions", "warnings_and_cautions"),
        ("Contraindications", "contraindications"),
        ("Adverse Reactions", "adverse_reactions"),
        ("Drug Interactions", "drug_interactions"),
        ("Use in Specific Populations", "use_in_specific_populations"),
    )
    searches = [
        f'openfda.generic_name:"{drug_query}"',
        f'openfda.brand_name:"{drug_query}"',
        f'openfda.substance_name:"{drug_query}"',
        drug_query,
    ]

    for search in searches:
        try:
            response = requests.get(
                "https://api.fda.gov/drug/label.json",
                params={"search": search, "limit": 1},
                timeout=12,
            )
            if response.status_code == 404:
                # No label for this search; fall through to the next one.
                continue
            response.raise_for_status()
            results = response.json().get("results") or []
            if not results:
                continue

            label = results[0]
            openfda = label.get("openfda") or {}

            rendered_sections = {}
            for title, field_name in section_fields:
                raw_text = label.get(field_name)
                if not raw_text:
                    continue
                formatted = _format_external_text(raw_text)
                if formatted:
                    rendered_sections[title] = formatted

            spl_set_id = label.get("set_id", "N/A")
            return {
                "available": bool(rendered_sections),
                "sections": rendered_sections,
                "generic_names": openfda.get("generic_name", []),
                "brand_names": openfda.get("brand_name", []),
                "manufacturer_names": openfda.get("manufacturer_name", []),
                "spl_set_id": spl_set_id,
                "label_url": _build_dailymed_label_url(spl_set_id),
                "source_url": _build_openfda_label_api_url(search),
            }
        except Exception as exc:
            # Best effort: one failed attempt must not stop the looser
            # searches, but it should leave a trace in the log.
            logger.info(
                "openFDA label lookup attempt failed",
                extra=create_log_context(
                    "openfda_label_lookup",
                    drug_name=drug_name,
                    search=search,
                    error_type=type(exc).__name__,
                ),
            )
            continue

    return not_found
openfda = label.get("openfda", {}) sections = { "Boxed Warning": label.get("boxed_warning"), "Warnings": label.get("warnings"), "Warnings and Precautions": label.get("warnings_and_cautions"), "Contraindications": label.get("contraindications"), "Adverse Reactions": label.get("adverse_reactions"), "Drug Interactions": label.get("drug_interactions"), "Use in Specific Populations": label.get("use_in_specific_populations"), } rendered_sections = { title: _format_external_text(text) for title, text in sections.items() if text and _format_external_text(text) } spl_set_id = label.get("set_id", "N/A") return { "available": bool(rendered_sections), "sections": rendered_sections, "generic_names": openfda.get("generic_name", []), "brand_names": openfda.get("brand_name", []), "manufacturer_names": openfda.get("manufacturer_name", []), "spl_set_id": spl_set_id, "label_url": _build_dailymed_label_url(spl_set_id), "source_url": _build_openfda_label_api_url(search), } except Exception: continue return { "available": False, "sections": {}, "source_url": _build_openfda_label_url(drug_name), } def _phase_strength(phase: str) -> int: phase_text = str(phase or "").upper() if "4" in phase_text or "PHASE IV" in phase_text: return 18 if "3" in phase_text or "PHASE III" in phase_text: return 15 if "2" in phase_text or "PHASE II" in phase_text: return 10 if "1" in phase_text or "PHASE I" in phase_text: return 5 return 2 def _status_strength(status: str) -> int: status_text = str(status or "").upper() if "RECRUITING" in status_text: return 10 if "ACTIVE" in status_text or "ENROLLING" in status_text: return 8 if "COMPLETED" in status_text: return 5 if "TERMINATED" in status_text or "WITHDRAWN" in status_text: return -8 return 2 def _score_trial_condition( condition_trials: List[Dict[str, Any]], drug_metadata: Dict[str, Any], has_targets: bool, ) -> float: """Score a trial-derived opportunity from trial depth, maturity, and source support.""" score = 25.0 score += min(20.0, 
len(condition_trials) * 4.0) score += max((_phase_strength(trial.get("phase", "")) for trial in condition_trials), default=0) score += max((_status_strength(trial.get("status", "")) for trial in condition_trials), default=0) if _has_external_value(drug_metadata.get("drugbank_id")): score += 5.0 if _has_external_value(drug_metadata.get("pubchem_id")): score += 5.0 if has_targets: score += 8.0 return round(min(95.0, max(5.0, score)), 1) def _priority_from_confidence(confidence: float) -> str: if confidence >= 70: return "High" if confidence >= 45: return "Moderate" return "Low" def _format_target_effect_summary(drug_name: str, targets: List[Dict[str, Any]]) -> str: if not targets: return ( f"No ChEMBL/curated protein targets were resolved for {drug_name}; " "interpret repurposing signals from trial evidence only." ) parts = [] for target in targets[:4]: target_name = target.get("target_name") or target.get("uniprot_id") or "unknown target" action = target.get("action_type") or "reported activity" parts.append(f"{action} at {target_name}") extra = "" if len(targets) <= 4 else f", plus {len(targets) - 4} additional target(s)" return "; ".join(parts) + extra def _condition_reference_links( drug_name: str, condition: str, drug_metadata: Dict[str, Any], ) -> List[Dict[str, str]]: pubchem_id = drug_metadata.get("pubchem_id", "N/A") drugbank_id = drug_metadata.get("drugbank_id", "N/A") query = f"{drug_name} {condition}".strip() return [ { "label": "ClinicalTrials.gov", "url": f"https://clinicaltrials.gov/search?term={urllib.parse.quote(query)}", }, { "label": "PubMed", "url": f"https://pubmed.ncbi.nlm.nih.gov/?term={urllib.parse.quote(query)}", }, {"label": "DrugBank", "url": _build_drugbank_url(drug_name, drugbank_id)}, {"label": "PubChem", "url": _build_pubchem_url(drug_name, pubchem_id)}, ] def _generate_repurposing_report_data(drug_name, api_client=None, cache_manager=None): """Generate comprehensive repurposing report data with drugs and clinical trials - DYNAMIC PER 
def _generate_repurposing_report_data(drug_name, api_client=None, cache_manager=None):
    """Generate comprehensive repurposing report data with drugs and clinical trials - DYNAMIC PER DRUG

    Args:
        drug_name: Drug name to report on.
        api_client: Client used to fetch clinical trials and targets. When
            ``None`` no trials are fetched and no network analysis runs.
        cache_manager: Cache handed to ``DrugRepurposingEngine``. Network
            analysis runs only when both this and ``api_client`` are given.

    Returns:
        A dictionary with four keys: ``metadata`` (drug name, ISO report date
        and a ``YYYYMMDD_HHMMSS`` timestamp), ``clinical_trials`` (one record
        per trial with an NCT id), ``approved_drugs`` (a single summary
        entry) and ``repurposing_opportunities`` (at most six entries sorted
        by descending confidence, or one "insufficient evidence" placeholder).
    """
    unknown_status = "Status Unknown - Query FDA Database"

    # One clock reading so report_date and timestamp describe the same instant.
    generated_at = datetime.now()
    report_data = {
        "metadata": {
            "drug_name": drug_name,
            "report_date": generated_at.isoformat(),
            "timestamp": generated_at.strftime("%Y%m%d_%H%M%S"),
        },
        "approved_drugs": [],
        "clinical_trials": [],
        "repurposing_opportunities": [],
    }

    # ========== FETCH CLINICAL TRIALS DYNAMICALLY ==========
    # trial_condition_names[i] lists the individual condition names of
    # report_data["clinical_trials"][i]. The exported record only keeps a
    # comma-joined string, and re-splitting that string would cut names that
    # contain a comma themselves (e.g. "Diabetes Mellitus, Type 2").
    trial_condition_names: List[List[str]] = []
    if api_client is not None:
        try:
            raw_trials = run_async_safe(api_client.fetch_clinical_trials_by_drug(drug_name))
            for trial in raw_trials:
                nct_id = _extract_nct_id(trial)
                if not nct_id:
                    continue

                start_year = "N/A"
                start_date = trial.get("start_date")
                if start_date and isinstance(start_date, str) and len(start_date) >= 4:
                    start_year = start_date[:4]

                conditions = trial.get("conditions", [])
                if isinstance(conditions, str):
                    # Joining a bare string would interleave ", " between its characters.
                    conditions = [conditions]
                if conditions:
                    names = [str(item) for item in conditions]
                    condition_value = ", ".join(names)
                else:
                    condition_value = trial.get("condition", "N/A")
                    # The scalar fallback carries no structure, so keep the
                    # comma split for it.
                    names = (
                        str(condition_value).split(",")
                        if condition_value and condition_value != "N/A"
                        else []
                    )
                cleaned = [name.strip() for name in names if name.strip()]
                # Drop repeats so one trial is counted once per condition.
                trial_condition_names.append(list(dict.fromkeys(cleaned)))

                report_data["clinical_trials"].append(
                    {
                        "trial_id": nct_id,
                        "nct_id": nct_id,
                        "title": trial.get("title", "N/A"),
                        "phase": trial.get("phase", "N/A"),
                        "status": trial.get("status", "Unknown"),
                        "condition": condition_value,
                        "intervention": trial.get(
                            "intervention_name", trial.get("drugs", drug_name)
                        ),
                        "sponsor": trial.get("sponsor", "N/A"),
                        "start_year": start_year,
                        "start_date": trial.get("start_date", "N/A"),
                        "enrolled": trial.get("enrollment", "N/A"),
                        "primary_outcome": trial.get("primary_outcome", "N/A"),
                        "url": trial.get("url", ""),
                    }
                )
        except ExternalServiceError as e:
            logger.warning(
                f"Clinical trials fetch service error: {e.internal_message}",
                extra=create_log_context(
                    "clinical_trials_fetch_report", drug_name=drug_name, **e.log_details
                ),
            )
            st.warning(
                f"⚠️ Could not fetch clinical trials for {drug_name}. The service may be temporarily unavailable."
            )
            # Discard partially processed trials so the report is all-or-nothing.
            report_data["clinical_trials"] = []
            trial_condition_names = []
        except Exception as e:
            logger.exception(
                "Unexpected error fetching clinical trials for report",
                extra=create_log_context(
                    "clinical_trials_fetch_report",
                    drug_name=drug_name,
                    error_type=type(e).__name__,
                ),
            )
            st.warning(
                f"⚠️ Could not fetch clinical trials for {drug_name}. Please try again later."
            )
            report_data["clinical_trials"] = []
            trial_condition_names = []

    # ========== GENERATE DYNAMIC APPROVED DRUGS SECTION ==========
    # Create a generic approved drug entry based on the searched drug name
    # Fetch drug metadata (DrugBank ID, PubChem ID, status) from database/ChEMBL
    drug_metadata = get_drug_metadata(drug_name)
    drugbank_id = drug_metadata.get("drugbank_id", "N/A")
    pubchem_id = drug_metadata.get("pubchem_id", "N/A")
    # A missing or empty status is "unknown" for both the score and the entry.
    drug_status = drug_metadata.get("status") or unknown_status

    targets: List[Dict[str, Any]] = []
    network_predictions: List[Dict[str, Any]] = []
    if api_client is not None and cache_manager is not None:
        try:
            engine = DrugRepurposingEngine(api_client, cache_manager)
            resolved_drugbank_id = drugbank_id if _has_external_value(drugbank_id) else None
            target_data = run_async_safe(
                engine.fetch_drug_targets(drug_name, resolved_drugbank_id)
            )
            targets = target_data.get("targets", []) if isinstance(target_data, dict) else []
            predictions = run_async_safe(
                engine.predict_repurposing_opportunities(
                    drug_name,
                    resolved_drugbank_id,
                    max_results=5,
                )
            )
            if isinstance(predictions, list):
                network_predictions = [
                    prediction
                    for prediction in predictions
                    if prediction.get("disease_name") and prediction.get("confidence", 0) > 0
                ]
        except Exception as e:
            # Best effort: the report is still useful with trial evidence only.
            logger.warning(
                "Drug repurposing network analysis failed",
                extra=create_log_context(
                    "drug_repurposing_network",
                    drug_name=drug_name,
                    error_type=type(e).__name__,
                ),
            )

    target_effect_summary = _format_target_effect_summary(drug_name, targets)
    # The summary only ends in a period when no targets were resolved; add
    # one where another sentence follows it.
    summary_sentence = (
        target_effect_summary
        if target_effect_summary.endswith(".")
        else f"{target_effect_summary}."
    )
    target_proteins = [
        target.get("target_name") or target.get("uniprot_id")
        for target in targets
        if target.get("target_name") or target.get("uniprot_id")
    ]

    # Determine confidence score based on data availability
    base_confidence = 0
    if _has_external_value(drugbank_id):
        base_confidence += 25  # Has DrugBank ID
    if _has_external_value(pubchem_id):
        base_confidence += 25  # Has PubChem ID
    if drug_status != unknown_status:
        base_confidence += 25  # Has known status
    if report_data["clinical_trials"]:
        base_confidence += 25  # Has clinical trials
    if targets:
        base_confidence += 10

    approved_entry = {
        "name": drug_name,
        "drug_id": "N/A",
        "drugbank_id": drugbank_id,
        "pubchem_id": pubchem_id,
        "indication": f"Search Results for {drug_name}",
        "approval_date": "See FDA Orange Book / Drugs@FDA records",
        "mechanism": target_effect_summary,
        "target_proteins": target_proteins,
        "evidence_source": "ChEMBL/curated targets + ClinicalTrials.gov + PubChem/DrugBank IDs",
        "confidence_score": min(100, base_confidence),  # Cap at 100%
        "status": drug_status,
    }
    # If we found clinical trials, update indication
    if report_data["clinical_trials"]:
        approved_entry["indication"] = (
            f"{len(report_data['clinical_trials'])} active clinical trials found"
        )
    report_data["approved_drugs"] = [approved_entry]

    # ========== GENERATE DYNAMIC REPURPOSING OPPORTUNITIES ==========
    opportunities = []
    seen_diseases = set()

    for prediction in network_predictions:
        disease = prediction.get("disease_name", "N/A")
        if disease in seen_diseases:
            continue
        confidence = float(prediction.get("confidence", 0.0))
        # Explicit None values must not crash formatting or show up as "None".
        affected_proteins = prediction.get("affected_proteins") or []
        pathways = prediction.get("pathways") or []
        distance = prediction.get("distance", "N/A")
        proximity = prediction.get("proximity_score") or 0.0
        explanation = prediction.get("explanation") or ""
        opportunities.append(
            {
                "disease": disease,
                "confidence": confidence,
                "mechanism": (
                    f"{summary_sentence} This target profile places {drug_name} near "
                    f"{disease} disease-associated nodes in the drug-protein-disease network. "
                    f"{explanation}"
                ).strip(),
                "evidence": [
                    f"Network proximity score: {float(proximity):.2f}; shortest path distance: {distance}",
                    (
                        "Affected proteins in the predicted path: "
                        + (
                            ", ".join(str(protein) for protein in affected_proteins[:5])
                            if affected_proteins
                            else "none resolved"
                        )
                    ),
                    f"Known/queried drug effect profile: {target_effect_summary}",
                    "Review external literature and trial records before treating this as an actionable hypothesis.",
                ],
                "external_references": _condition_reference_links(
                    drug_name, disease, drug_metadata
                ),
                "status": "Network-Inferred Hypothesis",
                "clinical_rationale": (
                    "The opportunity is ranked from network proximity, disease-protein association "
                    "strength, and pathway overlap rather than a fixed placeholder score."
                ),
                "priority": _priority_from_confidence(confidence),
                "affected_pathways": pathways or ["No pathway node resolved in shortest path"],
                "shared_targets": len(affected_proteins),
                "supporting_publications": 0,
            }
        )
        seen_diseases.add(disease)

    if report_data["clinical_trials"]:
        trials_by_condition: Dict[str, List[Dict[str, Any]]] = {}
        for trial, condition_names in zip(report_data["clinical_trials"], trial_condition_names):
            for condition in condition_names:
                trials_by_condition.setdefault(condition, []).append(trial)

        # Keep the four conditions with the most trials; the sort is stable,
        # so ties stay in first-seen order.
        ranked_conditions = sorted(
            trials_by_condition.items(), key=lambda item: len(item[1]), reverse=True
        )[:4]
        for condition, condition_trials in ranked_conditions:
            if condition in seen_diseases:
                continue
            confidence = _score_trial_condition(condition_trials, drug_metadata, bool(targets))
            trial_ids = [
                str(trial.get("nct_id") or trial.get("trial_id"))
                for trial in condition_trials[:3]
                if trial.get("nct_id") or trial.get("trial_id")
            ]
            # Coerce to str: a None phase or status cannot be sorted with strings.
            phases = sorted({str(trial.get("phase") or "N/A") for trial in condition_trials})
            statuses = sorted(
                {str(trial.get("status") or "Unknown") for trial in condition_trials}
            )
            opportunities.append(
                {
                    "disease": condition,
                    "confidence": confidence,
                    "mechanism": (
                        f"{drug_name} has direct clinical-trial exposure in {condition}. "
                        f"Drug effect profile resolved for this analysis: {target_effect_summary}"
                    ),
                    "evidence": [
                        f"{len(condition_trials)} ClinicalTrials.gov record(s) mention {drug_name} and {condition}.",
                        f"Trial phase signal: {', '.join(phases)}; status signal: {', '.join(statuses)}.",
                        (
                            "Representative NCT IDs: "
                            + (", ".join(trial_ids) if trial_ids else "none resolved")
                        ),
                        "Use the linked trial protocols to inspect endpoints, dose, arm design, and sponsor context.",
                    ],
                    "external_references": _condition_reference_links(
                        drug_name, condition, drug_metadata
                    ),
                    "status": "Clinical-Trial Signal",
                    "clinical_rationale": (
                        "This score is calculated from trial count, phase maturity, recruitment/completion "
                        f"status, external identifiers, and resolved targets for {drug_name}."
                    ),
                    "priority": _priority_from_confidence(confidence),
                    "affected_pathways": ["Trial protocol dependent"],
                    "shared_targets": len(targets),
                    "supporting_publications": 0,
                }
            )
            seen_diseases.add(condition)

    if opportunities:
        opportunities.sort(key=lambda item: item.get("confidence", 0), reverse=True)
        report_data["repurposing_opportunities"] = opportunities[:6]
    else:
        # Neither a network prediction nor a trial condition was resolved
        report_data["repurposing_opportunities"] = [
            {
                "disease": "Insufficient Repurposing Evidence",
                "confidence": 0,
                "mechanism": (
                    "No network-based disease hypothesis or clinical-trial condition was resolved for "
                    f"{drug_name}. {target_effect_summary}"
                ),
                "evidence": [
                    "No scored network prediction was available from ChEMBL/curated target data.",
                    "No verified ClinicalTrials.gov condition was available for trial-derived scoring.",
                    "Check spelling, alternate brand/generic names, or external databases directly.",
                ],
                "external_references": _condition_reference_links(
                    drug_name, drug_name, drug_metadata
                ),
                "status": "Insufficient Data",
                "clinical_rationale": "More research needed to assess repurposing potential",
                "priority": "Low",
                "affected_pathways": [],
                "shared_targets": 0,
                "supporting_publications": 0,
            }
        ]

    return report_data
Explore potential new therapeutic uses for existing drugs based on clinical trial data and network analysis.
""", unsafe_allow_html=True, ) # Initialize drug repurposing engine if not already done if "repurposing_engine" not in st.session_state: st.session_state.repurposing_engine = DrugRepurposingEngine( st.session_state.api_client, st.session_state.cache_manager ) if not report_data: st.info("Click 'Analyze Repurposing Opportunities' to fetch repurposing data for this drug") return # Display report data report = report_data # ==================================================================== # APPROVED DRUGS & CURRENT INDICATIONS SECTION # ==================================================================== st.markdown("### 💊 Current Indications / Trial Information") st.info(f"Status and information for **{drug_name}** from clinical trial databases") for drug in report.get("approved_drugs", []): with st.expander( f"ℹ️ **{drug['indication']}** | Confidence: {drug['confidence_score']}%", expanded=True ): col1, col2 = st.columns([2, 1]) with col1: st.markdown("**Drug Details:**") st.markdown(f""" - **Drug Name:** {drug["name"]} - **DrugBank ID:** {drug.get("drugbank_id", "N/A")} - **PubChem ID:** {drug.get("pubchem_id", "N/A")} - **Status:** {drug["status"]} """) st.markdown("**Mechanism of Action:**") st.markdown(f"- {drug['mechanism']}") if drug.get("target_proteins"): st.markdown("**Target Protein(s):**") for target in drug.get("target_proteins", []): st.markdown(f"- {target}") st.markdown(f"**Evidence Source:** {drug['evidence_source']}") with col2: st.markdown( f"""

{drug["confidence_score"]}%

Confidence

{drug["status"]}
""", unsafe_allow_html=True, ) st.divider() # ==================================================================== # REPURPOSING OPPORTUNITIES SECTION # ==================================================================== st.markdown("### 🎯 Potential Repurposing Opportunities") st.info("Novel therapeutic indications discovered through trial data and network analysis") repurposing_opportunities = report.get("repurposing_opportunities", []) if repurposing_opportunities: for opp in repurposing_opportunities: confidence = opp.get("confidence", 0) priority = opp.get("priority", "Low") priority_color = ( "#ff4444" if priority == "High" else "#ffaa00" if priority == "Moderate" else "#4444ff" ) confidence_color = ( "#44ff44" if confidence > 70 else "#ffaa00" if confidence > 50 else "#ff9999" ) with st.expander( f"🎯 {opp['disease']} - {confidence:.1f}% Confidence ({priority} Priority)", expanded=False, ): col1, col2 = st.columns([2, 1]) with col1: st.markdown("**Proposed Mechanism:**") st.markdown(opp["mechanism"]) st.markdown("**Supporting Evidence:**") for evidence in opp.get("evidence", []): st.markdown(f"- {evidence}") if opp.get("external_references"): st.markdown("**External References:**") for reference in opp.get("external_references", []): label = reference.get("label", "Reference") url = reference.get("url", "") if url: st.markdown(f"- [{label}]({url})") st.markdown(f"**Clinical Rationale:** {opp['clinical_rationale']}") if opp.get("affected_pathways"): st.markdown("**Affected Pathways:**") for pathway in opp.get("affected_pathways", []): st.markdown(f"- {pathway}") st.markdown(f""" **Network Analysis:** - **Shared Targets:** {opp.get("shared_targets", "N/A")} proteins - **Supporting Publications:** {opp.get("supporting_publications", "N/A")} papers """) with col2: st.markdown( f"""

{confidence:.1f}%

Confidence Score

{priority} Priority
Status:
{opp["status"]}
""", unsafe_allow_html=True, ) st.divider() # ==================================================================== # DOWNLOAD & EXPORT SECTION # ==================================================================== st.markdown("### 💾 Export Report") # Prepare CSV data csv_drugs = pd.DataFrame(report.get("approved_drugs", [])) csv_trials = pd.DataFrame(report.get("clinical_trials", [])) csv_opportunities = pd.DataFrame(report.get("repurposing_opportunities", [])) # Prepare JSON data json_report = report.copy() json_string = json.dumps(json_report, indent=2) col1, col2, col3 = st.columns(3) with col1: # CSV download for drugs if not csv_drugs.empty: csv_drugs_export = csv_drugs.to_csv(index=False) st.download_button( "📥 Drug Info (CSV)", csv_drugs_export, f"{drug_name}_drug_info_{report['metadata']['timestamp']}.csv", "text/csv", key=f"download_drugs_csv_{drug_name}", ) with col2: # CSV download for clinical trials if not csv_trials.empty: csv_trials_export = csv_trials.to_csv(index=False) st.download_button( "📥 Clinical Trials (CSV)", csv_trials_export, f"{drug_name}_clinical_trials_{report['metadata']['timestamp']}.csv", "text/csv", key=f"download_trials_csv_{drug_name}", ) with col3: # CSV download for repurposing opportunities if not csv_opportunities.empty: csv_opps_export = csv_opportunities.to_csv(index=False) st.download_button( "📥 Repurposing (CSV)", csv_opps_export, f"{drug_name}_repurposing_opportunities_{report['metadata']['timestamp']}.csv", "text/csv", key=f"download_opportunities_csv_{drug_name}", ) st.download_button( "📥 Full Report (JSON)", json_string, f"{drug_name}_complete_report_{report['metadata']['timestamp']}.json", "application/json", key=f"download_report_json_{drug_name}", ) summary_col1, summary_col2, summary_col3 = st.columns(3) with summary_col2: st.markdown("**Summary Statistics**") st.markdown(f""" - **Total Indications:** {len(csv_drugs)} - **Active/Past Clinical Trials:** {len(csv_trials)} - **Repurposing Opportunities:** 
{len(csv_opportunities)} - **Report Generated:** {report["metadata"]["report_date"]} """) st.divider() def render_drug_detailed_info(drug_name): """Render detailed drug information - DYNAMIC PER DRUG""" st.subheader("📊 Detailed Drug Information") drug_metadata = get_drug_metadata(drug_name) drugbank_id = drug_metadata.get("drugbank_id", "N/A") pubchem_id = drug_metadata.get("pubchem_id", "N/A") pubchem_profile = _fetch_pubchem_compound_profile(drug_name, pubchem_id) if pubchem_profile.get("available"): pubchem_id = str(pubchem_profile["profile"].get("CID", pubchem_id)) drug_metadata["pubchem_id"] = pubchem_id fda_application = _fetch_openfda_application_summary(drug_name) st.info(f""" **Drug:** {drug_name} This section displays drug-specific links, PubChem chemical properties when available, and FDA label safety sections from public databases. """) st.divider() # Create tabs for different information types info_tabs = st.tabs( ["🔍 Search in External Databases", "💊 Generic Drug Properties", "⚠️ Safety Information"] ) with info_tabs[0]: st.markdown("### Direct Links to Drug Databases") col1, col2, col3 = st.columns(3) with col1: drugbank_url = _build_drugbank_url(drug_name, drugbank_id) db_caption = ( f"Open DrugBank page for {drug_name} ({drugbank_id})" if _has_external_value(drugbank_id) else f"Search DrugBank drugs for {drug_name}" ) st.markdown(f"**[🏥 DrugBank]({drugbank_url})**") st.caption(db_caption) with col2: pubchem_url = _build_pubchem_url(drug_name, pubchem_id) pc_caption = ( f"Open PubChem page for {drug_name} (CID {pubchem_id})" if _has_external_value(pubchem_id) else f"Open PubChem name-resolved page for {drug_name}" ) st.markdown(f"**[🧪 PubChem]({pubchem_url})**") st.caption(pc_caption) with col3: pubmed_url = f"https://pubmed.ncbi.nlm.nih.gov/?term={urllib.parse.quote(drug_name)}" st.markdown(f"**[📚 PubMed Literature]({pubmed_url})**") st.caption(f"Search PubMed for {drug_name}") st.markdown("---") col4, col5, col6 = st.columns(3) with col4: fda_url 
= fda_application.get("orange_book_url") or _build_orange_book_url(drug_name) st.markdown(f"**[📋 FDA Orange Book]({fda_url})**") if fda_application.get("available"): st.caption( "Open FDA Orange Book product record " f"for {drug_name} ({fda_application['application_number']})" ) else: st.caption(f"Search FDA Orange Book product records for {drug_name}") with col5: clinicaltrials_url = ( f"https://clinicaltrials.gov/search?term={urllib.parse.quote(drug_name)}" ) st.markdown(f"**[🏥 ClinicalTrials.gov]({clinicaltrials_url})**") st.caption(f"Search for {drug_name} trials") with col6: wikipedia_url = f"https://en.wikipedia.org/w/api.php?action=query&titles={urllib.parse.quote(drug_name)}&format=json" st.markdown( f"**[🔗 Wikipedia Search](https://en.wikipedia.org/w/index.php?search={urllib.parse.quote(drug_name)})**" ) st.caption(f"General information about {drug_name}") with info_tabs[1]: st.markdown("### Chemical & Physical Properties") if pubchem_profile.get("available"): profile = pubchem_profile["profile"] col_props, col_image = st.columns([2, 1]) property_rows = [ ("Title", profile.get("Title")), ("IUPAC Name", profile.get("IUPACName")), ("Molecular Formula", profile.get("MolecularFormula")), ("Molecular Weight", profile.get("MolecularWeight")), ("Exact Mass", profile.get("ExactMass")), ("Monoisotopic Mass", profile.get("MonoisotopicMass")), ("Canonical SMILES", profile.get("CanonicalSMILES")), ("Isomeric SMILES", profile.get("IsomericSMILES")), ("InChIKey", profile.get("InChIKey")), ("XLogP", profile.get("XLogP")), ("TPSA", profile.get("TPSA")), ("H-Bond Donors", profile.get("HBondDonorCount")), ("H-Bond Acceptors", profile.get("HBondAcceptorCount")), ("Rotatable Bonds", profile.get("RotatableBondCount")), ("Complexity", profile.get("Complexity")), ("DrugBank ID", drugbank_id), ("PubChem CID", profile.get("CID")), ("FDA Application", fda_application.get("application_number")), ("FDA Sponsor", fda_application.get("sponsor_name")), ("Regulatory / Development 
Status", drug_metadata.get("status")), ] property_df = pd.DataFrame( [ {"Property": label, "Value": value} for label, value in property_rows if _has_external_value(value) ] ) with col_props: st.dataframe(property_df, use_container_width=True, hide_index=True) st.markdown(f"[Open full PubChem compound page]({profile['pubchem_url']})") with col_image: if _has_external_value(profile.get("CID")): st.image( profile["image_url"], caption=f"PubChem structure image for CID {profile['CID']}", ) else: st.info( f"No PubChem chemical property record was resolved for {drug_name}. " "This can happen for biologics, ambiguous names, or compounds missing from PubChem." ) st.markdown( f"- [Try PubChem directly]({_build_pubchem_url(drug_name, pubchem_id)})\n" f"- [Search DrugBank]({_build_drugbank_url(drug_name, drugbank_id)})" ) with info_tabs[2]: st.markdown("### Safety & Side Effects Information") st.warning(f""" **IMPORTANT DISCLAIMER:** This application provides educational information only and should NOT be used for patient-care decisions. Always consult with a healthcare professional regarding drug safety and side effects. The sections below are pulled from FDA labeling when a matching label is available. 
""") st.markdown("---") safety_profile = _fetch_openfda_label_summary(drug_name) if safety_profile.get("available"): generic_names = ", ".join(safety_profile.get("generic_names") or []) brand_names = ", ".join(safety_profile.get("brand_names") or []) manufacturers = ", ".join(safety_profile.get("manufacturer_names") or []) label_context = [] if generic_names: label_context.append(f"Generic name(s): {generic_names}") if brand_names: label_context.append(f"Brand name(s): {brand_names}") if manufacturers: label_context.append(f"Manufacturer(s): {manufacturers}") if label_context: st.caption(" | ".join(label_context)) for section_title, section_text in safety_profile.get("sections", {}).items(): with st.expander( section_title, expanded=section_title in {"Boxed Warning", "Warnings"}, ): st.write(section_text) label_url = safety_profile.get("label_url") if label_url: st.markdown(f"[Open full readable FDA label on DailyMed]({label_url})") st.markdown( "[Search PubMed adverse-event literature]" f"(https://pubmed.ncbi.nlm.nih.gov/?term={urllib.parse.quote(drug_name + ' adverse reactions safety')})" ) else: st.info( f"No FDA label safety sections were resolved for {drug_name}. " "Use the links below to verify alternate spellings, brand names, or regulatory records." ) st.markdown( f"- [Search raw openFDA label API]({safety_profile['source_url']})\n" f"- [Search FDA Orange Book]({_build_orange_book_url(drug_name)})\n" f"- [Search DrugBank]({_build_drugbank_url(drug_name, drugbank_id)})\n" f"- [Search PubMed safety literature](https://pubmed.ncbi.nlm.nih.gov/?term={urllib.parse.quote(drug_name + ' adverse reactions safety')})" ) st.divider() # Summary st.markdown("### Summary") st.info(f""" **Drug: {drug_name}** ✅ **Next Steps:** 1. Use the external database links above for detailed chemical/pharmaceutical properties 2. Consult your healthcare provider for medical advice 3. Review clinical trial data from the "FDA-Approved Drugs & Clinical Trials" tab 4. 
Check the "Drug Repurposing Engine" tab for potential therapeutic opportunities 📊 **Data Sources:** - ClinicalTrials.gov API (clinical trial data) - FDA Drug Database - DrugBank (when available) - PubChem (chemical properties) - Published Literature (PubMed) """) def render_portfolio_mode_page(): """Render multi-project portfolio operations cockpit.""" st.header("📁 Portfolio Mode for Biotech Teams") st.caption( "Research portfolio decision support only. Not for clinical or patient-care decisions." ) engine: PortfolioEngine = st.session_state.portfolio_engine portfolios = engine.list_portfolios() portfolio_labels = [f"{p['name']} ({p['owner'] or 'unassigned'})" for p in portfolios] col_left, col_right = st.columns([2, 1]) with col_left: selected_label = st.selectbox( "Select portfolio", ["(none)"] + portfolio_labels, key="portfolio_selected_label" ) selected_portfolio = None if selected_label != "(none)": selected_portfolio = portfolios[portfolio_labels.index(selected_label)] with col_right: with st.expander("Create portfolio"): pf_name = st.text_input("Name", key="pf_name") pf_owner = st.text_input("Owner", key="pf_owner") pf_desc = st.text_area("Description", key="pf_desc", height=80) if st.button("Create portfolio", key="create_portfolio_btn"): if pf_name.strip(): engine.create_portfolio(name=pf_name, owner=pf_owner, description=pf_desc) st.success("Portfolio created.") st.rerun() else: st.warning("Portfolio name is required.") tabs = st.tabs( [ "Portfolio Home", "Project Dashboard", "Exports", ] ) with tabs[0]: st.subheader("Portfolio Home") if not portfolios: st.info("Create your first portfolio to start tracking programs.") else: all_projects = [] for pf in portfolios: all_projects.extend(engine.list_projects(pf["id"])) pipeline_health = { "portfolios": len(portfolios), "projects": len(all_projects), "active": sum( 1 for p in all_projects if (p.get("status") or "").lower() == "active" ), "on_hold": sum( 1 for p in all_projects if "hold" in (p.get("status") or 
"").lower() ), } c1, c2, c3, c4 = st.columns(4) c1.metric("Portfolios", pipeline_health["portfolios"]) c2.metric("Projects", pipeline_health["projects"]) c3.metric("Active", pipeline_health["active"]) c4.metric("On Hold", pipeline_health["on_hold"]) for pf in portfolios: st.markdown(f"**{pf['name']}** - owner: `{pf['owner'] or 'unassigned'}`") projects = engine.list_projects(pf["id"]) st.dataframe( pd.DataFrame(projects)[ ["name", "indication", "modality", "stage", "status", "owner"] ] if projects else pd.DataFrame( columns=["name", "indication", "modality", "stage", "status", "owner"] ), width="stretch", ) stage_dist = engine.get_stage_distribution(pf["id"]) st.plotly_chart( ProteinVisualizer.create_portfolio_funnel(stage_dist), width="stretch" ) if selected_portfolio: st.divider() st.markdown("**Create project in selected portfolio**") with st.form("create_project_form", clear_on_submit=True): prj_name = st.text_input("Project name") prj_indication = st.text_input("Indication") prj_modality = st.text_input("Modality") prj_stage = st.selectbox( "Stage", [ "discovery", "validation", "lead optimization", "translational", "clinical readiness", ], ) prj_owner = st.text_input("Project owner") prj_status = st.selectbox("Status", ["active", "hold", "completed", "archived"]) submit_project = st.form_submit_button("Create project") if submit_project and prj_name.strip(): engine.create_project( portfolio_id=selected_portfolio["id"], name=prj_name, indication=prj_indication, modality=prj_modality, stage=prj_stage, owner=prj_owner, status=prj_status, ) st.success("Project created.") st.rerun() selected_project = None if selected_portfolio: projects = engine.list_projects(selected_portfolio["id"]) if projects: prj_labels = [f"{p['name']} ({p['stage']})" for p in projects] picked = st.selectbox("Project context", prj_labels, key="portfolio_project_context") selected_project = projects[prj_labels.index(picked)] with tabs[1]: st.subheader("Project Dashboard") if not 
selected_project: st.info("Select a portfolio and project to view dashboard.") else: dash = engine.get_project_dashboard_data(selected_project["id"]) project = dash["project"] st.write( f"**{project.get('name')}** | stage `{project.get('stage')}` | owner `{project.get('owner') or 'unassigned'}`" ) st.caption(dash["disclaimer"]) col1, col2, col3, col4 = st.columns(4) col1.metric("Milestones", dash["milestone_metrics"]["total_count"]) col2.metric("Completion", f"{dash['milestone_metrics']['completion_pct']}%") col3.metric("Blocked", dash["milestone_metrics"]["blocker_count"]) col4.metric("Upcoming 30d", dash["milestone_metrics"]["upcoming_30d"]) st.markdown("**Milestone tracker**") st.dataframe(pd.DataFrame(dash["milestones"]), width="stretch") st.plotly_chart( ProteinVisualizer.create_milestone_burndown(dash["milestones"]), width="stretch" ) with st.expander("Add milestone"): ms_title = st.text_input("Milestone title", key="ms_title") ms_type = st.selectbox( "Category", [ "validation", "mechanism", "lead optimization", "translational", "clinical readiness", ], key="ms_type", ) ms_due = st.date_input("Due date", key="ms_due") ms_owner = st.text_input("Milestone owner", key="ms_owner") ms_status = st.selectbox( "Status", ["not started", "in progress", "blocked", "complete"], key="ms_status", ) if st.button("Create milestone", key="create_ms_btn"): engine.create_milestone( project_id=selected_project["id"], title=ms_title, milestone_type=ms_type, due_date=ms_due.isoformat(), owner=ms_owner, status=ms_status, criteria={"acceptance": ["Reviewer sign-off"]}, ) st.success("Milestone created.") st.rerun() with tabs[2]: st.subheader("Exports") if not selected_project: st.info("Select a project first.") else: export_fmt = st.selectbox( "Export format", ["json", "csv", "md"], key="portfolio_export_fmt" ) if st.button("Generate project packet", key="generate_packet_btn"): packet = engine.export_project_packet(selected_project["id"], format=export_fmt) 
st.session_state.portfolio_export_packet = packet packet = st.session_state.get("portfolio_export_packet") if packet: st.write(f"Schema valid: {packet.get('schema_valid', False)}") if export_fmt == "csv" and isinstance(packet.get("content"), dict): for section_name, csv_text in packet["content"].items(): st.text_area(f"CSV - {section_name}", value=csv_text, height=160) else: st.text_area("Export payload", value=packet.get("content", ""), height=320) def render_universal_gene_pipeline_page(): """Render async universal gene-to-pathway pipeline UI.""" st.header("🧬 Universal Gene Pipeline") st.caption( "Upload a gene CSV and optionally a protein CSV to generate pathway enrichment outputs and downloadable artifacts." ) st.warning("Research use only. Not for clinical diagnosis or treatment decisions.") api_client = st.session_state.get("api_client") if api_client is None: st.info("API client not initialized yet.") return with st.expander("CSV Format Guidance", expanded=False): st.markdown( """ - Required: one gene identifier column (prefer `gene`) and one or more numeric sample columns. - Optional: `gene_role` (`driver`, `suppressor`, `driver_suppressor`, `unknown`). - Optional second file: protein CSV including a gene identifier column for merge. 
""" ) template_df = pd.DataFrame( [ {"gene": "TP53", "gene_role": "driver", "sample_1": 4, "sample_2": 1}, {"gene": "BRCA1", "gene_role": "suppressor", "sample_1": 2, "sample_2": 3}, {"gene": "PIK3CA", "gene_role": "unknown", "sample_1": 1, "sample_2": 0}, ] ) st.download_button( "Download Template CSV", template_df.to_csv(index=False), file_name="universal_gene_pipeline_template.csv", mime="text/csv", key="universal_gene_pipeline_template", ) gene_csv = st.file_uploader( "Gene CSV (required)", type=["csv"], key="universal_gene_pipeline_gene_csv", ) protein_csv = st.file_uploader( "Protein CSV (optional)", type=["csv"], key="universal_gene_pipeline_protein_csv", ) with st.expander("Pipeline Options", expanded=False): auto_detect_columns = st.checkbox( "Auto-detect columns", value=True, key="universal_gene_pipeline_auto_detect", ) col_a, col_b = st.columns(2) with col_a: top_unknown_n = st.number_input( "Top-N load subset", min_value=1, max_value=50000, value=100, step=1, key="universal_gene_pipeline_topn", ) with col_b: workers = st.number_input( "Worker parallelism hint", min_value=1, max_value=32, value=4, step=1, key="universal_gene_pipeline_workers", ) gene_override = st.text_input( "Gene column override (optional)", value="", key="universal_gene_pipeline_gene_override", ) sample_override_text = st.text_input( "Sample column overrides (comma-separated, optional)", value="", key="universal_gene_pipeline_sample_override", ) if st.button( "Submit Universal Gene Pipeline", type="primary", key="universal_gene_pipeline_submit", disabled=gene_csv is None, ): if gene_csv is None: st.error("Gene CSV is required.") else: gene_bytes = gene_csv.getvalue() protein_payload = None if protein_csv is not None: protein_payload = api_client.encode_csv_for_pipeline( protein_csv.name, protein_csv.getvalue(), ) sample_overrides = [c.strip() for c in sample_override_text.split(",") if c.strip()] payload = { "gene_csv": api_client.encode_csv_for_pipeline(gene_csv.name, gene_bytes), 
"protein_csv": protein_payload, "options": { "auto_detect_columns": bool(auto_detect_columns), "top_unknown_n": int(top_unknown_n), "workers": int(workers), "use_cache": True, "column_overrides": { "gene_column": gene_override.strip() or None, "sample_columns": sample_overrides or None, }, }, } with st.spinner("Submitting pipeline job..."): try: job = api_client.submit_protein_pathway_pipeline_job(payload=payload) except Exception as exc: st.error(f"Pipeline submission failed: {exc}") return st.session_state.universal_gene_pipeline_job = job st.session_state.universal_gene_pipeline_report = None st.success(f"Pipeline job submitted: #{job.get('id')} ({job.get('status')})") st.rerun() job = st.session_state.get("universal_gene_pipeline_job") if not job: st.info("Upload a gene CSV and submit a job to begin.") return st.subheader("Job Status") c1, c2, c3, c4 = st.columns(4) c1.metric("Job ID", str(job.get("id", "N/A"))) c2.metric("Status", str(job.get("status", "queued")).title()) c3.metric("Job Type", str(job.get("job_type", "protein_pathway.enrichment"))) c4.metric("Idempotency", "Set" if job.get("idempotency_key") else "Derived") if st.button("Refresh status", key="universal_gene_pipeline_refresh"): try: job = api_client.poll_protein_pathway_pipeline_job(int(job["id"])) st.session_state.universal_gene_pipeline_job = job except Exception as exc: st.error(f"Unable to refresh job status: {exc}") return status_value = str(job.get("status") or "queued").lower() if status_value in {"queued", "running"}: st.info("Pipeline is still running. 
Refresh status to poll again.") return if status_value == "failed": st.error(str(job.get("error_message") or "Pipeline failed")) return report = st.session_state.get("universal_gene_pipeline_report") if report is None and status_value in {"completed", "partial"}: try: report = api_client.fetch_protein_pathway_pipeline_report(int(job["id"])) st.session_state.universal_gene_pipeline_report = report except Exception as exc: st.error(f"Unable to fetch pipeline report: {exc}") return if not isinstance(report, dict): st.info("Pipeline report is not ready yet.") return summary = report.get("summary", {}) if isinstance(report.get("summary"), dict) else {} s1, s2, s3, s4 = st.columns(4) s1.metric("Input genes", str(summary.get("total_input_genes", 0))) s2.metric("Mapped genes", str(summary.get("mapped_genes", 0))) s3.metric("Pathway rows", str(summary.get("pathway_rows", 0))) s4.metric("Has gene_role", "Yes" if report.get("has_gene_role") else "No") st.subheader("Input Detection") detection = report.get("input_detection", {}) st.json(detection if isinstance(detection, dict) else {}) skipped_sections = report.get("skipped_sections", []) if isinstance(skipped_sections, list) and skipped_sections: st.subheader("Skipped Sections") for row in skipped_sections: if isinstance(row, dict): st.warning(f"{row.get('section', 'section')}: {row.get('reason', 'skipped')}") stage_errors = report.get("stage_errors", []) if isinstance(stage_errors, list) and stage_errors: st.subheader("Stage Warnings") for row in stage_errors: if isinstance(row, dict): st.warning(f"{row.get('stage', 'stage')}: {row.get('error', 'unknown error')}") artifacts = report.get("artifacts", {}) files = artifacts.get("files", []) if isinstance(artifacts, dict) else [] if isinstance(files, list) and files: st.subheader("Artifacts") artifact_df = pd.DataFrame(files) st.dataframe(artifact_df, width="stretch", hide_index=True) if st.button("Prepare Artifacts ZIP", key="universal_gene_pipeline_download_zip"): try: blob, 
filename = api_client.fetch_protein_pathway_pipeline_artifacts(int(job["id"])) st.session_state.universal_gene_pipeline_zip_blob = blob st.session_state.universal_gene_pipeline_zip_filename = filename except Exception as exc: st.error(f"Unable to download artifacts: {exc}") zip_blob = st.session_state.get("universal_gene_pipeline_zip_blob") zip_filename = st.session_state.get("universal_gene_pipeline_zip_filename") if isinstance(zip_blob, bytes) and zip_filename: st.download_button( "Download Artifacts ZIP", data=zip_blob, file_name=str(zip_filename), mime="application/zip", key=f"universal_gene_pipeline_download_zip_payload_{job['id']}", ) st.download_button( "Download Report JSON", data=json.dumps(report, indent=2), file_name=f"protein_pathway_pipeline_report_{job['id']}.json", mime="application/json", key=f"universal_gene_pipeline_report_json_{job['id']}", ) if __name__ == "__main__": main()