import os
import json
import re
import textwrap
import difflib
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeoutError
from copy import deepcopy


def parse_ehr_chartsummarydtl(chartsummarydtl):
    """
    Converts EHR API chartsummarydtl list to the internal visit format expected by the summarizer.

    Args:
        chartsummarydtl: iterable of dict entries. Keys read here are
            "chartdate", "vitals" (list of "Name: value" strings),
            "allergies", "diagnosis", "medications", "labtests" (list of
            dicts with "name"/"value") and "radiologyorders" (list of dicts
            with "name").

    Returns:
        A list of visit dicts. Each always has "chartdate", "vitals" and
        "labtests"; "weight", "allergies", "diagnosis", "medications" and
        "radiologyorders" are only present when the entry supplied them.
    """
    visits = []
    for entry in chartsummarydtl:
        # Keep only the date part (YYYY-MM-DD); tolerate a null chartdate.
        visit = {"chartdate": str(entry.get("chartdate") or "")[:10]}

        # Vitals arrive as "Name: value" strings; weight is pulled out into
        # its own top-level key, everything else goes into the vitals dict.
        vitals_dict = {}
        weight = None
        for raw_vital in entry.get("vitals") or []:
            if not isinstance(raw_vital, str) or ":" not in raw_vital:
                continue
            key, _, value = raw_vital.partition(":")
            key = key.strip()
            value = value.strip()
            if key.lower().startswith("weight"):
                weight = value
            else:
                vitals_dict[key] = value
        visit["vitals"] = vitals_dict
        if weight:
            visit["weight"] = weight

        # Allergies, diagnosis and medications are copied through untouched.
        for field in ("allergies", "diagnosis", "medications"):
            if field in entry:
                visit[field] = entry[field]

        # Lab tests: drop entries that carry neither a name nor a value.
        labtests = []
        for lab in entry.get("labtests") or []:
            name = lab.get("name", "")
            value = lab.get("value", "")
            if name or value:
                labtests.append({"name": name, "value": value})
        visit["labtests"] = labtests

        # Radiology orders: keep just the non-empty order names.
        if "radiologyorders" in entry:
            visit["radiologyorders"] = [
                order.get("name", "")
                for order in (entry["radiologyorders"] or [])
                if order.get("name")
            ]

        visits.append(visit)
    return visits


# ========== PROMPT, DELTA, VALIDATION LOGIC (adapted from your script) =============

# Maps a canonical key path to the alternative key paths that are tried, in
# order, when looking up the latest value for that measurement.
ALIASES = {
    ("vitals", "Bp(sys)(mmHg)"): [("vitals", "Bp(sys)(mmHg)"), ("vitals", "Bp_sys"), ("vitals", "SBP")],
    ("vitals", "Bp(dia)(mmHg)"): [("vitals", "Bp(dia)(mmHg)"), ("vitals", "Bp_dia"), ("vitals", "DBP")],
    ("labtests", "HbA1c (%)"): [("labtests", "HbA1c (%)"), ("labtests", "HbA1c")],
    ("labtests", "Creatinine Ratio"): [("labtests", "Creatinine Ratio"), ("labtests", "Creatinine")],
}


def visits_sorted(v):
    """Return the visits in *v* sorted ascending by their "chartdate" string.

    A missing or null chartdate sorts as the empty string (i.e. first).
    """
    return sorted(v, key=lambda visit: visit.get("chartdate") or "")


def to_float(val):
    """Extract the first number found in ``str(val)`` as a float.

    Returns None when no number is present (including for ``None``) or when
    the conversion fails.
    """
    try:
        matches = re.findall(r"-?\d+\.?\d*", str(val))
        return float(matches[0]) if matches else None
    except Exception:
        # Best-effort parse: any failure means "no numeric value".
        return None


def _latest_value_exact(visits, key_path):
    """Return the value at *key_path* from the most recent visit that has it.

    For ``("labtests", name)`` the visit's lab list is searched for an entry
    whose "name" equals ``name``; any other path is followed through nested
    dicts. Returns None when no visit contains the path.
    """
    v_sorted = visits_sorted(visits)
    if not v_sorted:
        return None

    if key_path[0] == "labtests":
        for visit in reversed(v_sorted):
            for lab in visit.get("labtests", []):
                if lab.get("name") == key_path[1]:
                    return lab.get("value")
        return None

    for visit in reversed(v_sorted):
        cur = visit
        found = True
        for key in key_path:
            if isinstance(cur, dict) and key in cur:
                cur = cur[key]
            else:
                found = False
                break
        if found:
            return cur
    return None


def latest_value(visits, key_path):
    """Return the latest non-None value for *key_path*, trying its ALIASES in order."""
    for candidate in ALIASES.get(key_path, [key_path]):
        val = _latest_value_exact(visits, candidate)
        if val is not None:
            return val
    return None


def active_set(visits, field):
    """Return the union of all items listed under *field* across *visits*.

    A string value is treated as a single item (not iterated character by
    character); a missing or null value contributes nothing.
    """
    items = set()
    for visit in visits:
        value = visit.get(field)
        if value is None:
            continue
        if isinstance(value, str):
            if value:
                items.add(value)
        else:
            items.update(value)
    return items


def _fmt(x, spec=None):
    """Format *x* with the optional format *spec*; None becomes "N/A".

    Falls back to ``str(x)`` when the spec does not apply to the value.
    """
    if x is None:
        return "N/A"
    try:
        return format(x, spec) if spec else str(x)
    except Exception:
        return str(x)


def _collect_lab_names(*visit_lists):
    """Return the set of non-empty lab test names found in the given visit lists."""
    names = set()
    for visits in visit_lists:
        for visit in visits:
            for lab in visit.get("labtests", []):
                if lab.get("name"):
                    names.add(lab["name"])
    return names


def _numeric_delta(prev, curr):
    """Return ``curr - prev``, or None when either side is missing."""
    if prev is None or curr is None:
        return None
    return curr - prev


def compute_deltas(old_visits, new_visits):
    """Compare the state before and after *new_visits* are added.

    "Previous" is *old_visits*; "current" is *old_visits* followed by
    *new_visits*.

    Returns:
        A dict with "added_dx", "started_meds", "stopped_meds" (sorted
        lists), "weight", "bp_sys", "bp_dia" (each a dict with "prev",
        "curr", "delta") and "labs" (lab name -> same prev/curr/delta dict,
        in sorted name order).
    """
    prev_all = list(old_visits or [])
    curr_all = prev_all + list(new_visits or [])

    def get_val(visits, path):
        return to_float(latest_value(visits, path))

    def measurement(path):
        prev, curr = get_val(prev_all, path), get_val(curr_all, path)
        # Compare against None explicitly so a legitimate 0.0 still yields a delta.
        return {"prev": prev, "curr": curr, "delta": _numeric_delta(prev, curr)}

    # Calculate deltas for all lab tests dynamically. Sorted so the output
    # (and any prompt built from it) is deterministic between runs.
    lab_deltas = {}
    for lab_name in sorted(_collect_lab_names(prev_all, curr_all)):
        entry = measurement(("labtests", lab_name))
        if entry["prev"] is not None or entry["curr"] is not None:
            lab_deltas[lab_name] = entry

    prev_dx, curr_dx = active_set(prev_all, "diagnosis"), active_set(curr_all, "diagnosis")
    prev_meds, curr_meds = active_set(prev_all, "medications"), active_set(curr_all, "medications")

    return {
        "added_dx": sorted(curr_dx - prev_dx),
        "started_meds": sorted(curr_meds - prev_meds),
        # NOTE(review): curr_all is a superset of prev_all, so this difference
        # is always empty and stopped medications are never reported. The
        # intended definition of "stopped" needs confirming before changing it.
        "stopped_meds": sorted(prev_meds - curr_meds),
        "weight": measurement(("weight",)),
        "bp_sys": measurement(("vitals", "Bp(sys)(mmHg)")),
        "bp_dia": measurement(("vitals", "Bp(dia)(mmHg)")),
        "labs": lab_deltas,
    }


def build_compact_baseline(all_visits):
    """Build the multi-line baseline text: latest date, diagnoses, medications, vitals and labs.

    Missing values are rendered as "N/A".
    """
    # Build lab string dynamically from every lab name seen in any visit.
    lab_strings = []
    for lab_name in sorted(_collect_lab_names(all_visits)):
        lab_value = latest_value(all_visits, ("labtests", lab_name))
        if lab_value is not None:
            lab_strings.append(f"{lab_name}: {lab_value}")
    labs_text = ", ".join(lab_strings) if lab_strings else "N/A"

    bp_sys = _fmt(latest_value(all_visits, ("vitals", "Bp(sys)(mmHg)")))
    bp_dia = _fmt(latest_value(all_visits, ("vitals", "Bp(dia)(mmHg)")))
    weight = _fmt(latest_value(all_visits, ("weight",)))

    return (
        f"Latest date: {latest_value(all_visits, ('chartdate',)) or 'N/A'}\n"
        f"Active Diagnoses: {', '.join(sorted(active_set(all_visits, 'diagnosis'))) or 'N/A'}\n"
        f"Active Medications: {', '.join(sorted(active_set(all_visits, 'medications'))) or 'N/A'}\n"
        f"Latest Vitals: Bp: {bp_sys}/{bp_dia} mmHg, Weight: {weight}\n"
        f"Latest Labs: {labs_text}"
    )


def delta_to_text(delta):
    """Render the dict produced by compute_deltas as newline-separated text."""
    lines = []
    if delta["added_dx"]:
        lines.append("New Diagnoses: " + ", ".join(delta["added_dx"]))
    if delta["started_meds"]:
        lines.append("Medications Started: " + ", ".join(delta["started_meds"]))
    if delta["stopped_meds"]:
        lines.append("Medications Stopped: " + ", ".join(delta["stopped_meds"]))

    w = delta["weight"]
    lines.append(f"Weight: {_fmt(w['prev'])} -> {_fmt(w['curr'])} (Δ {_fmt(w['delta'], '+.1f')})")

    s, d = delta["bp_sys"], delta["bp_dia"]
    lines.append(
        f"BP: {_fmt(s['curr'])}/{_fmt(d['curr'])} "
        f"(Δs {_fmt(s['delta'], '+.0f')}, Δd {_fmt(d['delta'], '+.0f')})"
    )

    # Add all lab deltas dynamically
    for lab_name, lab_data in delta["labs"].items():
        if lab_data["prev"] is not None or lab_data["curr"] is not None:
            lines.append(
                f"{lab_name}: {_fmt(lab_data['prev'])} -> {_fmt(lab_data['curr'])} "
                f"(Δ {_fmt(lab_data['delta'], '+.1f')})"
            )
    return "\n".join(lines)


def generate_section(pipeline, prompt, section_name, timeout=60):
    """Generate one section with timeout protection.

    Args:
        pipeline: object exposing ``generate_full_summary(prompt, max_tokens=, max_loops=)``.
        prompt: prompt text passed to the pipeline.
        section_name: section title; the result always starts with ``## <section_name>``.
        timeout: seconds to wait for the pipeline before giving up.

    Returns:
        The cleaned section text, or a placeholder section when generation
        raises or times out (the failure is logged, never propagated).
    """
    header = f"## {section_name}"
    try:
        # Run the call in a worker so we can stop waiting after `timeout`.
        # shutdown(wait=False) is essential: a `with` block would wait for the
        # worker on exit and defeat the timeout. The worker itself cannot be
        # interrupted and keeps running in the background until it returns.
        executor = ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(
                pipeline.generate_full_summary, prompt, max_tokens=2000, max_loops=3
            )
            raw = future.result(timeout=timeout)
        finally:
            executor.shutdown(wait=False)

        # Clean: remove instruction residue, keeping what follows the last
        # occurrence of each marker.
        patterns_to_split = [
            "Now generate the complete",
            "## Clinical Assessment",
            "# Clinical Assessment",
            "Clinical Assessment",
            "Output ONLY the section content",
        ]
        content = raw
        for pat in patterns_to_split:
            if pat in content:
                content = content.split(pat)[-1].strip()

        # Ensure it starts with section header if not present
        if not content.startswith(header):
            content = f"{header}\n{content.strip()}"
        return content.strip()
    except FuturesTimeoutError:
        logging.getLogger(__name__).error(
            "Section '%s' generation timed out after %s seconds", section_name, timeout
        )
    except Exception as e:
        logging.getLogger(__name__).error(
            "Section '%s' generation failed: %s", section_name, e
        )
    # Return placeholder if generation fails
    return f"{header}\n- *Generation failed or timed out. Please retry or check logs.*"


def build_main_prompt(baseline, delta_text, patient_info="", section=None):
    """Build the LLM prompt from the baseline, the delta text and patient info.

    When *section* is given, the prompt asks for that single section only;
    otherwise it asks for the full four-section summary.
    """
    base_prompt = (
        "You are an expert clinical AI assistant. Your task is to generate a patient summary.\n"
        "Use the chartsummarydtl for context. The STRUCTURED BASELINE and DELTAS are the absolute ground truth.\n"
        "Produce a concise, physician-ready summary. Never omit critical new information from the deltas.\n\n"
        f"PATIENT INFORMATION:\n{patient_info}\n\n"
        f"STRUCTURED BASELINE (authoritative):\n{baseline}\n\n"
        f"STRUCTURED DELTAS (authoritative):\n{delta_text}\n\n"
    )

    if section:
        section_prompts = {
            "Clinical Assessment": "Generate ONLY the 'Clinical Assessment' section. Be concise, accurate, and evidence-based.",
            "Key Trends & Changes": "Generate ONLY the 'Key Trends & Changes' section. Focus on deltas, trends, vitals, labs, and med changes.",
            "Plan & Suggested Actions": "Generate ONLY the 'Plan & Suggested Actions' section. Suggest next steps, monitoring, referrals, or med adjustments.",
            "Direct Guidance for Physician": "Generate ONLY the 'Direct Guidance for Physician' section. Give clear, actionable advice for the clinician.",
        }
        # Unknown section names get a generic instruction.
        instruction = section_prompts.get(section, f"Generate the '{section}' section.")
        return base_prompt + f"{instruction}\n\nOutput ONLY the section content. Do not include headers unless specified.\n\n"

    # Default: generate full 4-section summary
    return base_prompt + (
        "The summary MUST have four sections:\n"
        "1) Clinical Assessment\n"
        "2) Key Trends & Changes\n"
        "3) Plan & Suggested Actions\n"
        "4) Direct Guidance for Physician\n\n"
        "Now generate the complete clinical summary with all four sections in markdown format:"
    )


def validate_and_compare_summaries(old_summary, new_summary, update_name=""):
    """Return a markdown report containing a unified diff of the two summaries.

    A None summary is treated as empty text.
    """
    report = f"### Validation Report for {update_name}\n"
    report += "This report validates that the updated summary incorporates new information correctly.\n"
    report += "\n**Unified Diff (Line-by-Line Changes):**\n"

    diff = difflib.unified_diff(
        (old_summary or "").splitlines(),
        (new_summary or "").splitlines(),
        fromfile='Previous Summary',
        tofile='Current Summary',
        lineterm='',
    )
    diff_text = "\n".join(diff)

    if not diff_text:
        report += "No textual differences found between summaries.\n"
    else:
        report += "```diff\n" + diff_text + "\n```\n"
    return report