# # Parse vitals list into dictionary (retained from first code) # from typing import Dict, List # from attrs import field # def parse_vitals(vitals_list): # vitals_dict = { # "BP": "N/A", "HR": "N/A", "Temp": "N/A", # "SpO2": "N/A", "Height": "N/A", "Weight": "N/A", "BMI": "N/A" # } # if not isinstance(vitals_list, list): # return vitals_dict # bp_sys = None # bp_dia = None # for item in vitals_list: # if not isinstance(item, dict): # continue # name = str(item.get("name", "")).lower() # value = str(item.get("value", "N/A")) # if "bp" in name and "sys" in name: # bp_sys = value # elif "bp" in name and "dia" in name: # bp_dia = value # elif "pulse" in name or "hr" in name: # vitals_dict["HR"] = value # elif "temp" in name: # vitals_dict["Temp"] = value # elif "spo2" in name or "o2 sat" in name: # vitals_dict["SpO2"] = value # elif "height" in name: # vitals_dict["Height"] = value # elif "weight" in name: # vitals_dict["Weight"] = value # elif "bmi" in name: # vitals_dict["BMI"] = value # if bp_sys and bp_dia: # vitals_dict["BP"] = f"{bp_sys}/{bp_dia}" # elif bp_sys: # vitals_dict["BP"] = f"{bp_sys}/N/A" # return vitals_dict # def extract_vital_trends(chart_dtl): # """Extract time-series data for key vitals.""" # bp_trend = [] # hr_trend = [] # weight_trend = [] # spo2_trend = [] # for chart in sorted(chart_dtl, key=lambda x: x.get("chartdate", "")): # chart_date = chart["chartdate"][:10] # vitals = chart.get("vitals", []) # bp_sys = bp_dia = hr = wt = spo2 = None # for v in vitals: # name = v.get("name", "").lower() # val = v.get("value", "N/A") # if "bp(sys)" in name: bp_sys = val # elif "bp(dia)" in name: bp_dia = val # elif "pulse" in name or "hr" in name: hr = val # elif "weight" in name: wt = val # elif "spo2" in name: spo2 = val # if bp_sys and bp_dia: # bp_trend.append(f"{chart_date}: {bp_sys}/{bp_dia}") # if hr: # hr_trend.append(f"{chart_date}: {hr}") # if wt: # weight_trend.append(f"{chart_date}: {wt} kg") # if spo2: # spo2_trend.append(f"{chart_date}: 
{spo2}%") # return { # "bp_trend": bp_trend[-3:], # "hr_trend": hr_trend[-3:], # "weight_trend": weight_trend[-3:], # "spo2_trend": spo2_trend[-3:] # } # def clean_patient_data(raw_data: Dict) -> Dict: # result = raw_data.get("result", raw_data) # def get_value(field, default="Unknown"): # val = result.get(field) # return val if val not in [None, "", "Null", "NULL"] else default # def safe_list(value, default=None): # if default is None: # default = [] # return value if isinstance(value, list) and len(value) > 0 else default # cleaned = {} # chart_dtl = safe_list(result.get("chartsummarydtl")) # # Basic details # cleaned["Patient Name"] = get_value("patientname", "Unknown") # cleaned["Patient Number"] = get_value("patientnumber", "Unknown") # cleaned["Gender"] = get_value("gender", "Unknown") # cleaned["Age"] = str(get_value("agey", "Unknown")) # cleaned["DOB"] = get_value("dob", "N/A") # cleaned["Blood Group"] = get_value("bloodgrp", "N/A") # cleaned["Last Visit"] = get_value("lastvisitdt", "N/A") # cleaned["Chief Complaint"] = get_value("chief_complaint", "Not specified") # cleaned["Social History"] = get_value("social_history", "Not specified") # cleaned["Assessment Plan"] = get_value("assessment_plan", "No plan specified") # cleaned["Past Medical History"] = ", ".join(safe_list(result.get("pattypelst"))) or "None" # # Collections across visits # all_medications = [] # all_diagnoses = [] # all_labtests = [] # all_radiology = [] # allergies_set = set() # for chart in chart_dtl: # # Allergies # for allergy in safe_list(chart.get("allergies")): # if allergy not in ["None", "None known", "N/A", ""]: # allergies_set.add(allergy) # # Medications # for med in safe_list(chart.get("medications")): # if "||" in med: # clean_med = med.split("||")[0].strip() # if clean_med and clean_med not in ["", "None"]: # all_medications.append(clean_med) # # Diagnosis # for diag in safe_list(chart.get("diagnosis")): # if diag not in ["None", "N/A", ""]: # all_diagnoses.append(diag) # # 
Lab tests # all_labtests.extend([t for t in safe_list(chart.get("labtests")) if t not in ["None", "N/A", ""]]) # # Radiology # for rad in safe_list(chart.get("radiologyorders")): # if isinstance(rad, dict) and rad.get("name"): # name = rad["name"].strip() # if name not in ["None", "N/A", ""]: # all_radiology.append(name) # elif isinstance(rad, str) and rad.strip() not in ["None", "N/A", ""]: # all_radiology.append(rad.strip()) # # Assign cleaned values # cleaned["Allergies"] = ", ".join(sorted(allergies_set)) if allergies_set else "None known" # cleaned["Medications"] = ", ".join(sorted(set(all_medications))) if all_medications else "None" # cleaned["Diagnosis"] = ", ".join(sorted(set(all_diagnoses))) if all_diagnoses else "None" # cleaned["Lab Tests"] = ", ".join(sorted(set(all_labtests))) if all_labtests else "None" # cleaned["Radiology Orders"] = ", ".join(sorted(set(all_radiology))) if all_radiology else "None" # cleaned["Lab Results"] = ", ".join(safe_list(result.get("labresultlst"))) or "N/A" # # Extract trends # trends = extract_vital_trends(chart_dtl) # cleaned["BP Trend (Last 3 Visits)"] = "; ".join(trends["bp_trend"]) if trends["bp_trend"] else "Stable" # cleaned["HR Trend (Last 3 Visits)"] = "; ".join(trends["hr_trend"]) if trends["hr_trend"] else "Stable" # cleaned["Weight Trend (Last 3 Visits)"] = "; ".join(trends["weight_trend"]) if trends["weight_trend"] else "Stable" # cleaned["SpO2 Trend (Last 3 Visits)"] = "; ".join(trends["spo2_trend"]) if trends["spo2_trend"] else "Stable" # # Latest vitals # latest_chart = max(chart_dtl, key=lambda x: x.get("chartdate", ""), default={}) # latest_vitals = parse_vitals(latest_chart.get("vitals")) # for k, v in latest_vitals.items(): # cleaned[f"Latest {k}"] = v # # Copy rest (optional fields) # for key, value in result.items(): # if key not in cleaned and key not in ["chartsummarydtl", "pattypelst"]: # cleaned[key.capitalize()] = value if value not in [None, "", "Null", "NULL"] else "N/A" # return {"result": 
cleaned} # def flatten_to_string_list(cleaned_data: Dict) -> List[str]: # """Convert cleaned data into a clean list of formatted strings.""" # def format_value(val, indent=0): # prefix = " " * indent # lines = [] # if isinstance(val, dict): # for k, v in val.items(): # if isinstance(v, (dict, list)): # lines.append(f"{prefix}{k}:") # lines.extend(format_value(v, indent + 1)) # else: # display_v = v if v not in [None, "", "Unknown", "N/A"] else "N/A" # lines.append(f"{prefix}{k}: {display_v}") # elif isinstance(val, list): # if not val or val == ["None"]: # lines.append(f"{prefix}- None") # else: # for item in val: # if isinstance(item, (dict, list)): # lines.extend(format_value(item, indent + 1)) # else: # display_item = item if item not in [None, "", "Unknown", "N/A"] else "N/A" # lines.append(f"{prefix}- {display_item}") # else: # display_val = val if val not in [None, "", "Unknown", "N/A"] else "N/A" # lines.append(f"{prefix}{display_val}") # return lines # formatted_list = [] # data = cleaned_data.get("result", {}) # for key, value in data.items(): # formatted_list.append(f"{key}:") # formatted_list.extend(format_value(value, 1)) # return formatted_list # # Chunk text (from second code) # def patient_chunk_text(lines, chunk_size=1500): # chunks, chunk = [], "" # for line in lines: # if len(chunk) + len(line) < chunk_size: # chunk += line + ";" # else: # chunks.append(chunk.strip()) # chunk = line + ";" # if chunk: # chunks.append(chunk.strip()) # return chunks # # utils.py # from typing import Dict, List, Any # def parse_vitals(vitals_list: Any) -> Dict[str, str]: # vitals = {"BP": "N/A", "HR": "N/A", "SpO2": "N/A", "Weight": "N/A"} # if not isinstance(vitals_list, list): # return vitals # bp_sys = bp_dia = None # for v in vitals_list: # if not isinstance(v, dict): # continue # name = v.get("name", "").lower() # val = str(v.get("value", "N/A")) # if "bp(sys)" in name: bp_sys = val # elif "bp(dia)" in name: bp_dia = val # elif "pulse" in name or "hr" in name: 
vitals["HR"] = val # elif "spo2" in name: vitals["SpO2"] = val # elif "weight" in name: vitals["Weight"] = val # if bp_sys and bp_dia: # vitals["BP"] = f"{bp_sys}/{bp_dia}" # return vitals # def extract_trends(chart_dtl: List[Dict]) -> Dict[str, str]: # """Extract clean, readable trend strings.""" # bp = [] # hr = [] # wt = [] # spo2 = [] # # Sort by date ascending # sorted_charts = sorted( # [c for c in chart_dtl if c.get("chartdate")], # key=lambda x: x["chartdate"] # ) # for chart in sorted_charts: # date = chart["chartdate"][:10] # vitals = parse_vitals(chart.get("vitals", [])) # bp_val = vitals["BP"] # if "N/A" not in bp_val: # bp.append(f"{date}: {bp_val}") # if vitals["HR"] != "N/A": # hr.append(f"{date}: {vitals['HR']}") # if vitals["Weight"] != "N/A": # wt.append(f"{date}: {vitals['Weight']} kg") # if vitals["SpO2"] != "N/A": # spo2.append(f"{date}: {vitals['SpO2']}%") # return { # "bp_trend": "; ".join(bp[-3:]) if bp else "No data", # "hr_trend": "; ".join(hr[-3:]) if hr else "No data", # "weight_trend": "; ".join(wt[-3:]) if wt else "No data", # "spo2_trend": "; ".join(spo2[-3:]) if spo2 else "No data" # } # def extract_lab_history(chart_dtl: List[Dict]) -> str: # seen = {} # for chart in chart_dtl: # for test in chart.get("labtests", []): # if isinstance(test, dict) and test.get("name"): # name = test["name"].strip() # date = chart["chartdate"][:10] # if name not in seen: # seen[name] = [] # seen[name].append(f"{date}") # lines = [f"{k}: ordered on {', '.join(v)}" for k, v in seen.items()] # return ";".join(lines) if lines else "None" # def extract_radiology_history(chart_dtl: List[Dict]) -> str: # seen = {} # for chart in chart_dtl: # for order in chart.get("radiologyorders", []): # if isinstance(order, dict) and order.get("name"): # name = order["name"].strip() # date = chart["chartdate"][:10] # if name not in seen: # seen[name] = [] # seen[name].append(date) # lines = [f"{k}: ordered on {', '.join(v)}" for k, v in seen.items()] # return 
";".join(lines) if lines else "None" # def extract_medications(chart_dtl: List[Dict]) -> str: # meds = {} # for chart in chart_dtl: # for med in chart.get("medications", []): # if isinstance(med, str) and "||" in med: # name = med.split("||")[0].strip() # if name: # if name not in meds: # meds[name] = 0 # meds[name] += 1 # # Chronic if in ≥2 visits # chronic = [m for m, c in meds.items() if c >= 2] # return ", ".join(chronic) if chronic else "None identified" # def extract_allergies(chart_dtl: List[Dict]) -> str: # allergies = set() # for chart in chart_dtl: # for a in chart.get("allergies", []): # if isinstance(a, str) and a.lower() not in ["none", "n/a", ""]: # allergies.add(a.strip()) # return ", ".join(sorted(allergies)) if allergies else "None known" # def extract_diagnosis(chart_dtl: List[Dict]) -> str: # diagnoses = set() # for chart in chart_dtl: # for d in chart.get("diagnosis", []): # if isinstance(d, str) and d.lower() not in ["none", "n/a", ""]: # diagnoses.add(d.strip()) # return ", ".join(sorted(diagnoses)) if diagnoses else "None" # def clean_patient_data(raw_data: Dict) -> Dict: # result = raw_data.get("result", raw_data) # chart_dtl = result.get("chartsummarydtl", []) # # Only extract what we need, as clean strings # cleaned = { # "Patient Name": result.get("patientname") or "Anonymous", # "Patient Number": result.get("patientnumber", "Unknown"), # "Gender": result.get("gender", "Unknown"), # "Age": str(result.get("agey", "Unknown")), # "DOB": result.get("dob", "N/A"), # "Last Visit": result.get("lastvisitdt", "N/A")[:10], # "Allergies": extract_allergies(chart_dtl), # "Diagnosis": extract_diagnosis(chart_dtl), # "Medications": ", ".join(sorted({med.split('||')[0].strip() for chart in chart_dtl for med in chart.get("medications", []) if isinstance(med, str) and "||" in med})) or "None", # "Chronic Medications": extract_medications(chart_dtl), # "Lab Test History": extract_lab_history(chart_dtl), # "Radiology Order History": 
# extract_radiology_history(chart_dtl),
#     }
#     # Vital Trends
#     trends = extract_trends(chart_dtl)
#     cleaned.update({
#         "BP Trend (Last 3 Visits)": trends["bp_trend"],
#         "HR Trend (Last 3 Visits)": trends["hr_trend"],
#         "Weight Trend (Last 3 Visits)": trends["weight_trend"],
#         "SpO2 Trend (Last 3 Visits)": trends["spo2_trend"],
#     })
#     # Latest Vitals
#     if chart_dtl:
#         latest = max(chart_dtl, key=lambda x: x.get("chartdate", ""))
#         latest_vitals = parse_vitals(latest.get("vitals", []))
#         for k, v in latest_vitals.items():
#             cleaned[f"Latest {k}"] = v
#     return {"result": cleaned}
# def flatten_to_string_list(cleaned_data: Dict) -> List[str]:
#     """Convert cleaned data to list of 'key: value' strings. Value must be string."""
#     lines = []
#     data = cleaned_data.get("result", {})
#     for key, value in data.items():
#         if isinstance(value, (dict, list)):
#             continue  # Skip nested data — we already made it flat
#         lines.append(f"{key}: {value if value else 'N/A'}")
#     return lines

# utils.py
from typing import Dict, List, Any, Union
import json
import logging

logger = logging.getLogger(__name__)

# Lower-cased strings that mean "nothing recorded" in list-valued chart fields.
_PLACEHOLDER_TEXT = frozenset({"", "none", "n/a"})

# Keys under which the list of encounters is looked up, in priority order.
_CHART_LIST_KEYS = (
    "chartsummarydtl", "chartSummaryDtl", "encounters", "visits",
    "charts", "history", "timeline",
)

# (normalized field, accepted source keys, placeholder when nothing is found)
_DEMOGRAPHIC_FIELDS = (
    ("patientname", ("patientname", "patient_name", "name", "patient"), "Anonymous"),
    ("patientnumber", ("patientnumber", "patient_number", "id", "patientid"), "Unknown"),
    ("gender", ("gender", "sex"), "Unknown"),
    ("agey", ("agey", "age", "age_years"), "Unknown"),
    ("dob", ("dob", "dateofbirth", "birthdate"), "N/A"),
    ("lastvisitdt", ("lastvisitdt", "last_visit", "last_visit_date"), "N/A"),
)

# Per-encounter fields that are coerced to lists during normalization.
_CHART_LIST_FIELDS = ("allergies", "diagnosis", "medications", "labtests", "radiologyorders")


# General helpers for robust parsing/normalization

def _is_missing(value: Any) -> bool:
    """Return True for None, blank strings and empty containers.

    Numeric zero and ``False`` are real values and are NOT considered missing.
    """
    if value is None:
        return True
    if isinstance(value, str):
        return not value.strip()
    if isinstance(value, (list, tuple, dict, set)):
        return not value
    return False


def _try_json_loads(text: str) -> Any:
    """Return the decoded JSON value of *text*, or None when decoding fails.

    Note that the JSON literal ``null`` also decodes to None, so callers
    cannot tell it apart from a failure.
    """
    try:
        return json.loads(text)
    except (TypeError, ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError subclass.
        return None


def _parse_key_value_lines(text: str) -> Dict[str, Any]:
    """Parse simple ``key: value`` text into a dict.

    Records are separated by newlines; when the text contains no newline at
    all, semicolons are used as the separator instead.  Segments without a
    colon or with an empty key are ignored.  Trailing commas/semicolons are
    stripped from values.  Non-string input yields an empty dict.
    """
    result: Dict[str, Any] = {}
    if not isinstance(text, str):
        return result

    normalized = text.replace("\r", "\n")
    # The semicolon fallback must key off "no newline present"; testing for
    # "no non-blank lines" would never trigger for non-empty text.
    separator = "\n" if "\n" in normalized else ";"
    for segment in normalized.split(separator):
        key, colon, val = segment.partition(":")
        if not colon:
            continue
        key = key.strip()
        if key:
            result[key] = val.strip().strip(",;").strip()
    return result


def _first_match(dct: Dict[str, Any], candidate_keys: List[str], default: Any = None,
                 *, skip_missing: bool = False) -> Any:
    """Return the value of the first candidate key found in *dct*.

    All candidates are tried with an exact match first, then again
    case-insensitively.  With ``skip_missing=True`` a key whose value is
    None/blank/empty is passed over so a later candidate can supply the value.
    Returns *default* when nothing matches or *dct* is not a dict.
    """
    if not isinstance(dct, dict):
        return default

    def usable(value: Any) -> bool:
        return not (skip_missing and _is_missing(value))

    for key in candidate_keys:
        if key in dct and usable(dct[key]):
            return dct[key]

    # case-insensitive scan (non-string keys cannot be lower-cased; skip them)
    lower_map = {k.lower(): k for k in dct if isinstance(k, str)}
    for key in candidate_keys:
        actual = lower_map.get(key.lower())
        if actual is not None and usable(dct[actual]):
            return dct[actual]
    return default


def _as_list(value: Any) -> List[Any]:
    """Coerce a chart field to a list: lists pass through, strings are split
    on commas (blank pieces dropped), anything else becomes an empty list."""
    if isinstance(value, list):
        return value
    if isinstance(value, str):
        return [piece.strip() for piece in value.split(",") if piece.strip()]
    return []


def _as_chart_list(value: Any) -> List[Dict[str, Any]]:
    """Return only the dict entries of *value*; non-list input gives []."""
    if not isinstance(value, list):
        return []
    return [chart for chart in value if isinstance(chart, dict)]


def _chart_sort_key(chart: Dict[str, Any]) -> str:
    """Sort key for an encounter: its ``chartdate`` (or ``date``) as text.

    NOTE(review): ordering is lexicographic, so it is chronological only for
    ISO-like date strings - confirm the upstream date format.
    """
    return str(chart.get("chartdate") or chart.get("date") or "")


def _chart_date(chart: Dict[str, Any], default: str = "") -> str:
    """Return the first 10 characters of the encounter date, or *default*."""
    raw = chart.get("chartdate") or chart.get("date")
    return str(raw)[:10] if raw else default


def _clean_strings(items: Any) -> List[str]:
    """Return stripped string items, dropping non-strings and placeholders
    such as "None" / "N/A" / ""."""
    cleaned = []
    for item in _as_list(items):
        if isinstance(item, str) and item.strip().lower() not in _PLACEHOLDER_TEXT:
            cleaned.append(item.strip())
    return cleaned


def _item_names(items: Any) -> List[str]:
    """Return the names of lab/radiology entries.

    Entries may be dicts carrying a ``name`` key or plain strings (the form
    the normalizer produces from comma-separated text).
    """
    names = []
    for item in _as_list(items):
        name = item.get("name") if isinstance(item, dict) else item
        if isinstance(name, str) and name.strip().lower() not in _PLACEHOLDER_TEXT:
            names.append(name.strip())
    return names


def _medication_names(items: Any) -> List[str]:
    """Return the non-empty text before ``||`` of each medication string.

    Entries that are not strings or do not contain ``||`` are ignored.
    """
    names = []
    for med in _as_list(items):
        if isinstance(med, str) and "||" in med:
            name = med.split("||")[0].strip()
            if name:
                names.append(name)
    return names


def _first_vital_value(vitals: List[Dict[str, Any]], needle: str) -> Any:
    """Return the value of the first vital whose lower-cased name contains
    *needle*, or None."""
    for vital in vitals:
        if needle in str(vital.get("name", "")).lower():
            return vital.get("value")
    return None


def _ensure_vitals_list(vitals: Any) -> List[Dict[str, Any]]:
    """Coerce various vitals formats into list of {name, value} dicts.

    Accepted shapes:
    * list whose items are ``{"name": ..., "value": ...}`` dicts, single-key
      dicts such as ``{"BP": "120/80"}``, or ``"BP: 120/80"`` strings
      (other items are dropped);
    * dict mapping vital name to value;
    * ``key: value`` text, parsed with ``_parse_key_value_lines``.
    Anything else yields an empty list.
    """
    if isinstance(vitals, str):
        vitals = _parse_key_value_lines(vitals)
    if isinstance(vitals, dict):
        return [{"name": k, "value": v} for k, v in vitals.items()]
    if not isinstance(vitals, list):
        return []

    normalized: List[Dict[str, Any]] = []
    for item in vitals:
        if isinstance(item, dict):
            if "name" in item and "value" in item:
                normalized.append({"name": item.get("name"), "value": item.get("value")})
            elif len(item) == 1:
                # Single-key dict, e.g. {"BP": "120/80"}
                k, v = next(iter(item.items()))
                normalized.append({"name": k, "value": v})
        elif isinstance(item, str) and ":" in item:
            # e.g., "BP: 120/80"
            k, v = item.split(":", 1)
            normalized.append({"name": k.strip(), "value": v.strip()})
    return normalized


def _find_chart_list(obj: Any) -> List[Dict[str, Any]]:
    """Try to locate the list of encounter/visit dictionaries in a flexible way.

    For a dict, the well-known keys in ``_CHART_LIST_KEYS`` are checked first
    (a list containing at least one dict qualifies); otherwise the values are
    searched recursively.  For a list, a non-empty list made only of dicts is
    returned as-is; otherwise its items are searched.  This is a heuristic:
    any nested list of dicts can be picked up.  Returns [] when nothing fits.
    """
    if isinstance(obj, dict):
        for key in _CHART_LIST_KEYS:
            val = _first_match(obj, [key])
            if isinstance(val, list) and any(isinstance(x, dict) for x in val):
                return val
        children = obj.values()
    elif isinstance(obj, list):
        if obj and all(isinstance(x, dict) for x in obj):
            return obj
        children = obj
    else:
        return []

    for child in children:
        found = _find_chart_list(child)
        if found:
            return found
    return []


def normalize_raw_patient_response(raw: Any) -> Dict[str, Any]:
    """
    Normalize arbitrary raw payload (dict/list/string) into a dict suitable for
    downstream processing.

    Returns ``{"result": {...}}``.  For dict payloads the inner dict carries the
    demographics ``patientname``, ``patientnumber``, ``gender``, ``agey``,
    ``dob`` and ``lastvisitdt`` (with placeholders when absent), a
    ``chartsummarydtl`` list when encounters were found, and every other
    original field.  Strings are JSON-decoded when possible, else parsed as
    ``key: value`` text, else wrapped as ``raw_text``.  A list of dicts is
    treated as the encounter list.
    """
    # Decode if it's a string
    if isinstance(raw, str):
        loaded = _try_json_loads(raw)
        if loaded is None:
            kv = _parse_key_value_lines(raw)
            return {"result": kv or {"raw_text": raw}}
        raw = loaded

    if isinstance(raw, list):
        # If list of dicts, assume encounters
        if raw and all(isinstance(x, dict) for x in raw):
            return {"result": {"chartsummarydtl": raw}}
        # Else return as raw_text
        return {"result": {"raw_text": "\n".join(str(x) for x in raw)}}

    if not isinstance(raw, dict):
        # Unknown shape
        return {"result": {"raw_text": str(raw)}}

    result = raw.get("result")
    if result is None:
        # No usable envelope: the payload itself is the record.
        result = raw
    elif not isinstance(result, dict):
        # Envelope wrapping a list/string: normalize the wrapped payload.
        return normalize_raw_patient_response(result)

    # Map common demographics using flexible key matching.  Only None/blank
    # values fall back to the placeholder, so e.g. an age of 0 is kept.
    norm: Dict[str, Any] = {
        field: _first_match(result, aliases, default=placeholder, skip_missing=True)
        for field, aliases, placeholder in _DEMOGRAPHIC_FIELDS
    }

    # Encounters, with vitals and list-valued fields in predictable forms
    normalized_charts: List[Dict[str, Any]] = []
    for entry in _find_chart_list(result):
        if not isinstance(entry, dict):
            continue
        entry_copy = dict(entry)
        entry_copy["vitals"] = _ensure_vitals_list(entry_copy.get("vitals", []))
        for list_key in _CHART_LIST_FIELDS:
            entry_copy[list_key] = _as_list(entry_copy.get(list_key))
        normalized_charts.append(entry_copy)
    if normalized_charts:
        norm["chartsummarydtl"] = normalized_charts

    # Preserve any fields not mapped
    for k, v in result.items():
        norm.setdefault(k, v)
    return {"result": norm}


def parse_vitals(vitals_list: List[Dict]) -> str:
    """Extract readable vitals string from list.

    Returns ``"N/A"`` when *vitals_list* is not a list.  Otherwise returns
    ``"BP: .., HR: .., SpO2: ..%, Weight: .. kg"``; any vital that is absent
    or blank is shown as ``N/A`` (without a unit).  When a vital appears more
    than once the last occurrence wins.
    """
    if not isinstance(vitals_list, list):
        return "N/A"

    bp_sys = bp_dia = hr = spo2 = wt = None
    for v in _ensure_vitals_list(vitals_list):
        # Names may be None or non-strings after normalization.
        name = str(v.get("name") or "").lower()
        val = v.get("value")
        if "bp(sys)" in name:
            bp_sys = val
        elif "bp(dia)" in name:
            bp_dia = val
        elif "pulse" in name or "hr" in name:
            hr = val
        elif "spo2" in name:
            spo2 = val
        elif "weight" in name:
            wt = val

    def shown(value: Any, unit: str = "") -> str:
        return "N/A" if _is_missing(value) else f"{value}{unit}"

    bp = f"{bp_sys}/{bp_dia}" if bp_sys and bp_dia else "N/A"
    return f"BP: {bp}, HR: {shown(hr)}, SpO2: {shown(spo2, '%')}, Weight: {shown(wt, ' kg')}"


def build_patient_timeline(chart_dtl: List[Dict]) -> str:
    """
    Build a chronological narrative timeline from multiple visits.
    This is the core of trend-aware summarization.

    Visits are ordered by ``chartdate`` (falling back to ``date``).  Each visit
    contributes one sentence group with its vitals and, when present,
    diagnosis, medications, lab tests, imaging and allergies.  Returns
    ``"No visit data available."`` when there are no visit dicts.
    """
    charts = _as_chart_list(chart_dtl)
    if not charts:
        return "No visit data available."

    timeline_parts = []
    for chart in sorted(charts, key=_chart_sort_key):
        date = _chart_date(chart, default="Unknown")
        vitals_str = parse_vitals(chart.get("vitals", []))
        sentences = [f"On {date}, the patient presented with vitals: {vitals_str}."]

        # (label, values) in the order the sections appear in the narrative
        sections = (
            ("Diagnosis", _clean_strings(chart.get("diagnosis"))),
            ("Medications prescribed", _medication_names(chart.get("medications"))),
            ("Laboratory tests ordered", _item_names(chart.get("labtests"))),
            ("Imaging ordered", _item_names(chart.get("radiologyorders"))),
            ("Allergies noted", _clean_strings(chart.get("allergies"))),
        )
        for label, values in sections:
            if values:
                sentences.append(f"{label}: {', '.join(values)}.")
        timeline_parts.append(" ".join(sentences))

    return " ".join(timeline_parts)


def extract_trends_and_recommendations(raw_data: Dict) -> Dict:
    """
    Extract trend logic and generate structured insights (like in the Longformer script).

    Reads ``chartsummarydtl`` from ``raw_data["result"]`` (or from *raw_data*
    itself) and returns a dict with:
    ``latest_date``, ``bp_trend`` and ``weight_trend`` (chronological,
    ``"No data"`` when empty), ``chronic_meds`` (medications present in at
    least two visits), ``repeated_imaging`` (imaging names ordered more than
    once) and ``total_visits``.  Returns ``{}`` when there are no visits.
    """
    result = raw_data.get("result", raw_data) if isinstance(raw_data, dict) else None
    charts = _as_chart_list(result.get("chartsummarydtl")) if isinstance(result, dict) else []
    if not charts:
        return {}

    # Walk the visits oldest-first so trends read chronologically.
    ordered = sorted(charts, key=_chart_sort_key)

    bp_vals: List[str] = []
    weight_vals: List[str] = []
    visits_per_med: Dict[str, int] = {}
    imaging_orders: Dict[str, int] = {}
    for chart in ordered:
        date = _chart_date(chart)
        vitals = _ensure_vitals_list(chart.get("vitals", []))

        systolic = _first_vital_value(vitals, "bp(sys)")
        diastolic = _first_vital_value(vitals, "bp(dia)")
        if systolic and diastolic:
            bp_vals.append(f"{date}: {systolic}/{diastolic}")

        weight = _first_vital_value(vitals, "weight")
        if weight:
            weight_vals.append(f"{date}: {weight} kg")

        # Count each medication once per visit; dict.fromkeys de-duplicates
        # while keeping a deterministic first-seen order.
        for name in dict.fromkeys(_medication_names(chart.get("medications"))):
            visits_per_med[name] = visits_per_med.get(name, 0) + 1

        for name in _item_names(chart.get("radiologyorders")):
            imaging_orders[name] = imaging_orders.get(name, 0) + 1

    return {
        "latest_date": _chart_date(ordered[-1], default="Unknown"),
        "bp_trend": "; ".join(bp_vals) if bp_vals else "No data",
        "weight_trend": "; ".join(weight_vals) if weight_vals else "No data",
        "chronic_meds": [m for m, c in visits_per_med.items() if c >= 2],
        "repeated_imaging": [name for name, count in imaging_orders.items() if count > 1],
        "total_visits": len(charts),
    }


def clean_patient_data(raw_data: Union[Dict, List, str]) -> Dict:
    """
    Clean and enrich patient data with timeline and insights.
    No flattening yet — we'll pass timeline directly to model.

    Returns ``{"result": {...}}`` holding the demographics, ``Timeline``
    (narrative string), ``Insights`` (dict) and the normalized
    ``chartsummarydtl`` list.  Any unexpected failure is logged and reported as
    ``{"result": {"error": "Failed to process data"}}`` instead of raising.
    """
    try:
        normalized = normalize_raw_patient_response(raw_data)
        result = normalized.get("result", {})
        chart_dtl = _as_chart_list(result.get("chartsummarydtl"))

        cleaned = {
            "Patient Name": result.get("patientname") or "Anonymous",
            "Patient Number": result.get("patientnumber", "Unknown"),
            "Gender": result.get("gender", "Unknown"),
            "Age": str(result.get("agey", "Unknown")),
            "DOB": result.get("dob", "N/A"),
            # str() so a numeric date does not break the slice
            "Last Visit": str(result.get("lastvisitdt") or "N/A")[:10],
            "Timeline": build_patient_timeline(chart_dtl),
            "Insights": extract_trends_and_recommendations(normalized),
            # Expose normalized encounters for downstream structured summaries
            "chartsummarydtl": chart_dtl,
        }
        return {"result": cleaned}
    except Exception:
        # Top-level boundary: callers get an error payload, details go to the log.
        logger.exception("[ERROR] clean_patient_data")
        return {"result": {"error": "Failed to process data"}}


def flatten_to_string_list(cleaned_data: Dict) -> List[str]:
    """
    Now we flatten, but Timeline is a single string.

    Produces one ``"key: value"`` line per entry of ``cleaned_data["result"]``.
    ``Insights`` is skipped (it is used in the prompt, not in the flattened
    text).  Dicts and lists are rendered as compact JSON, None as ``N/A`` and
    every other value with ``str()``.
    """
    data = cleaned_data.get("result", {})
    if not isinstance(data, dict):
        return []

    lines = []
    for key, value in data.items():
        if key == "Insights":
            continue
        if isinstance(value, (dict, list)):
            try:
                rendered = json.dumps(value, ensure_ascii=False)
            except (TypeError, ValueError):
                # Not JSON-serializable: fall back to the Python repr text.
                rendered = str(value)
        elif value is None:
            rendered = "N/A"
        else:
            # Numbers/bools used to be dropped silently; keep them.
            rendered = str(value)
        lines.append(f"{key}: {rendered}")
    return lines