"""Streamlit dashboard: food lookup, insulin dose estimate and simulated CGM view.

NOTE(review): the CGM readings used here are randomly simulated and the dose
formulas are simple heuristics; output must not be treated as medical advice.
"""

import datetime
import io
import json
import os
import random  # For simulating CGM data
from difflib import get_close_matches
from typing import Any, Dict, Optional

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import streamlit as st
import torch
from datasets import load_dataset
from PIL import Image
from transformers import AutoImageProcessor, AutoModelForImageClassification

# =============================================================================
# CONFIGURATION
# =============================================================================

# Action profile per insulin type; the report prints these values in hours.
INSULIN_TYPES = {
    "Rapid-Acting": {"onset": 0.25, "duration": 4, "peak_time": 1.0},
    "Long-Acting": {"onset": 2, "duration": 24, "peak_time": 8},
}

# Basal rate (reported as units/h) keyed by "HH:MM-HH:MM" interval.
DEFAULT_BASAL_RATES = {
    "00:00-06:00": 0.8,
    "06:00-12:00": 1.0,
    "12:00-18:00": 0.9,
    "18:00-24:00": 0.7,
}

# Inclusive glycemic-index bounds for each level.
GI_RANGES = {
    "low": (0, 55),
    "medium": (56, 69),
    "high": (70, 100),
}


# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================

def estimate_gi_timing(gi_value: Optional[int]) -> tuple[float, float]:
    """Return ``(peak_time, duration)`` in hours for a glycemic index.

    Args:
        gi_value: Glycemic index, or None when it is not known.

    Returns:
        A default of (1.0, 2.5) for an unknown index, otherwise a pair that
        gets shorter as the index gets higher.
    """
    if gi_value is None:
        return 1.0, 2.5
    if gi_value <= 55:
        return 1.0, 3.0
    if gi_value <= 69:
        return 0.75, 2.0
    return 0.5, 1.5


def load_food_data():
    """Load the food dataset into a DataFrame with normalized names.

    Column names and the ``food_name`` column are lower-cased and stripped.

    Returns:
        The food DataFrame, or an empty DataFrame when loading fails (the
        error is printed rather than raised).
    """
    try:
        ds = load_dataset("Anupam007/diabetic-food-analyzer")
        food_data = pd.DataFrame(ds['train'])
        food_data.columns = [col.lower().strip() for col in food_data.columns]
        food_data['food_name'] = food_data['food_name'].str.lower().str.strip()
        return food_data
    except Exception as e:
        print(f"Error loading food data: {e}")
        return pd.DataFrame()


# Load the image classifier once at import time; the app keeps working without
# it (classify_food then always answers "unknown").
try:
    processor = AutoImageProcessor.from_pretrained("rajistics/finetuned-indian-food")
    model = AutoModelForImageClassification.from_pretrained("rajistics/finetuned-indian-food")
    model_loaded = True
except Exception as e:
    print(f"Model Load Error: {e}")
    model_loaded = False
    processor = None
    model = None


def classify_food(image):
    """Classify a food image and return the predicted label in lower case.

    Args:
        image: The image to classify (a PIL image when called from ``main``),
            or None.

    Returns:
        The predicted label, or "unknown" when the model is not loaded, no
        image was given, or classification fails.
    """
    if not model_loaded or image is None:
        return "unknown"
    try:
        # Uploaded PNGs are often RGBA or palette images; convert them so the
        # processor always receives three channels.
        if isinstance(image, Image.Image) and image.mode != "RGB":
            image = image.convert("RGB")
        inputs = processor(images=image, return_tensors="pt")
        with torch.no_grad():
            outputs = model(**inputs)
        predicted_idx = torch.argmax(outputs.logits, dim=-1).item()
        return model.config.id2label.get(predicted_idx, "unknown").lower()
    except Exception as e:
        print(f"Classify food error: {e}")
        return "unknown"


def get_food_nutrition(
    food_name: str,
    food_data: pd.DataFrame,
    weight_grams: Optional[float] = None,
) -> tuple[Optional[Dict[str, Any]], str]:
    """Fuzzy-match a food name against the dataset and compute its carbs.

    Args:
        food_name: Name typed by the user or predicted by the classifier.
        food_data: Food table with a ``food_name`` column.
        weight_grams: Optional eaten weight; when positive and the serving
            unit is known, carbs are scaled by weight / serving weight.

    Returns:
        ``(nutrition_info, note)``. ``nutrition_info`` is None when nothing
        matches, in which case ``note`` explains why; otherwise ``note`` is
        either empty or describes the name correction that was applied.
    """
    no_match = (None, f"No close match found for '{food_name}'")
    if food_data.empty or 'food_name' not in food_data.columns:
        return no_match

    food_name_lower = food_name.lower().strip()
    # get_close_matches needs strings only, so drop missing names first.
    known_names = food_data['food_name'].dropna().tolist()
    matches = get_close_matches(food_name_lower, known_names, n=1, cutoff=0.6)
    if not matches:
        return no_match

    corrected_name = matches[0]
    correction_note = ""
    if corrected_name != food_name_lower:
        correction_note = f"Note: '{food_name}' corrected to '{corrected_name}'"

    matched_row = food_data[food_data['food_name'] == corrected_name].iloc[0]
    base_carbs = float(
        matched_row.get('unit_serving_carb_g', matched_row.get('carb_g', 0.0))
    )
    serving_size = matched_row.get('servings_unit', 'unknown')

    gi_value = matched_row.get('glycemic_index', None)
    if pd.isna(gi_value):
        gi_value = None
    else:
        try:
            gi_value = int(float(gi_value))
        except (ValueError, TypeError, OverflowError):
            gi_value = None

    # Fall back to a single serving unless a usable weight AND a usable
    # serving weight are both available (zero/NaN serving weight would
    # otherwise crash or poison the carb total).
    portion_size = 1.0
    if weight_grams is not None and weight_grams > 0 and serving_size != 'unknown':
        try:
            serving_weight = float(matched_row.get('serving_weight_grams', 100))
        except (ValueError, TypeError):
            serving_weight = float('nan')
        if np.isfinite(serving_weight) and serving_weight > 0:
            portion_size = weight_grams / serving_weight

    nutrition_info = {
        'matched_food': matched_row['food_name'],
        'category': matched_row.get('primarysource', 'unknown'),
        'subcategory': 'unknown',
        'base_carbs': base_carbs,
        'adjusted_carbs': base_carbs * portion_size,
        'serving_size': f"1 {serving_size}",
        'portion_multiplier': portion_size,
        'notes': 'none',
        'glycemic_index': gi_value,
    }
    return nutrition_info, correction_note


def determine_gi_level(gi_value: Optional[int]) -> str:
    """Map a glycemic index onto "Low"/"Medium"/"High" using ``GI_RANGES``.

    Returns "Unknown" for None or for a value outside every range.
    """
    if gi_value is None:
        return "Unknown"
    for level, (lower, upper) in GI_RANGES.items():
        if lower <= gi_value <= upper:
            return level.capitalize()
    return "Unknown"


def get_basal_rate(current_time_hour: float, basal_rates: Dict[str, float]) -> float:
    """Return the basal rate whose interval contains the given hour.

    Args:
        current_time_hour: Hour of day (may be fractional).
        basal_rates: Mapping of "HH:MM-HH:MM" intervals to rates; only the
            hour part of each bound is used.

    Returns:
        The matching rate, or 0.8 when no interval matches. Malformed
        intervals are reported and skipped.
    """
    for interval, rate in basal_rates.items():
        try:
            start, end = [int(x.split(':')[0]) for x in interval.split('-')]
        except (ValueError, AttributeError, TypeError) as e:
            print(f"Invalid basal interval {interval}: {e}")
            continue
        # An interval ending at 24 also covers hour values of 24 and above.
        if start <= current_time_hour < end or (start <= current_time_hour and end == 24):
            return rate
    return 0.8


def insulin_activity(
    t: float,
    insulin_type: str,
    bolus_dose: float,
    bolus_duration: float = 0,
) -> float:
    """Return the insulin activity ``t`` hours after a dose.

    Args:
        t: Time since the dose; negative values give 0.
        insulin_type: Key of ``INSULIN_TYPES``; unknown keys fall back to
            "Rapid-Acting".
        bolus_dose: Dose amount.
        bolus_duration: When positive, the dose is spread evenly over this
            duration and the peak/decay curve is not used.

    Returns:
        The activity value; 0 outside the active window.
    """
    insulin_data = INSULIN_TYPES.get(insulin_type, INSULIN_TYPES["Rapid-Acting"])
    peak_time = insulin_data['peak_time']
    duration = insulin_data['duration']

    if bolus_duration > 0:
        if 0 <= t <= bolus_duration:
            return bolus_dose / bolus_duration
        return 0

    if t < 0:
        return 0
    if t < peak_time:
        # Rising phase: reaches bolus_dose exactly at t == peak_time.
        return bolus_dose * (t / peak_time) * np.exp(1 - t / peak_time)
    if t < duration:
        # Exponential decay from the peak until the end of the duration.
        return bolus_dose * np.exp((peak_time - t) / (duration - peak_time))
    return 0


def calculate_active_insulin(insulin_history: list, current_time: float) -> float:
    """Sum the activity of every dose in the history at ``current_time``.

    Args:
        insulin_history: Iterable of
            ``(dose_time, dose_amount, insulin_type, bolus_duration)`` tuples.
        current_time: Time at which to evaluate, on the same scale as
            ``dose_time``.
    """
    return sum(
        insulin_activity(current_time - dose_time, insulin_type, dose_amount, bolus_duration)
        for dose_time, dose_amount, insulin_type, bolus_duration in insulin_history
    )


def calculate_insulin_needs(
    carbs: float,
    glucose_current: float,
    glucose_target: float,
    tdd: float,
    weight: float,
    insulin_type: str = "Rapid-Acting",
    override_correction_dose: Optional[float] = None,
) -> Dict[str, Any]:
    """Estimate carb, correction and basal doses.

    Uses ICR = 400 / tdd, ISF = 1700 / tdd and basal = 0.5 * weight. The
    total bolus is clamped at zero.

    Args:
        carbs: Carbohydrates to cover, in grams.
        glucose_current: Current glucose reading.
        glucose_target: Target glucose.
        tdd: Total daily dose; must be positive.
        weight: Body weight; must be positive.
        insulin_type: Key of ``INSULIN_TYPES``; unknown keys fall back to
            "Rapid-Acting" for the profile values.
        override_correction_dose: When not None, replaces the computed
            correction dose.

    Returns:
        A dict of rounded results plus the insulin profile, or
        ``{'error': ...}`` when ``tdd`` or ``weight`` is not positive.
    """
    if tdd <= 0 or weight <= 0:
        return {'error': 'TDD and weight must be positive'}

    insulin_data = INSULIN_TYPES.get(insulin_type, INSULIN_TYPES["Rapid-Acting"])
    icr = 400 / tdd
    isf = 1700 / tdd
    if override_correction_dose is None:
        correction_dose = (glucose_current - glucose_target) / isf
    else:
        correction_dose = override_correction_dose
    carb_dose = carbs / icr
    total_bolus = max(0, carb_dose + correction_dose)
    basal_dose = weight * 0.5

    return {
        'icr': round(icr, 2),
        'isf': round(isf, 2),
        'correction_dose': round(correction_dose, 2),
        'carb_dose': round(carb_dose, 2),
        'total_bolus': round(total_bolus, 2),
        'basal_dose': round(basal_dose, 2),
        'insulin_type': insulin_type,
        'insulin_onset': insulin_data['onset'],
        'insulin_duration': insulin_data['duration'],
        'peak_time': insulin_data['peak_time'],
    }


def create_detailed_report(
    nutrition_info: Dict[str, Any],
    insulin_info: Dict[str, Any],
    current_basal_rate: float,
    correction_note: str,
) -> tuple[str, str, str]:
    """Build the three text sections shown to the user.

    Args:
        nutrition_info: Result dict from ``get_food_nutrition``.
        insulin_info: Result dict from ``calculate_insulin_needs``.
        current_basal_rate: Basal rate to display.
        correction_note: Optional name-correction note appended to the meal
            section.

    Returns:
        ``(glucose_meal_details, insulin_details, basal_details)``.
    """
    gi_value = nutrition_info.get('glycemic_index')
    gi_level = determine_gi_level(gi_value)
    peak_time, duration = estimate_gi_timing(gi_value)
    # The key is always present (possibly None), so a .get() default would
    # never apply; substitute the placeholder explicitly.
    gi_display = 'N/A' if gi_value is None else gi_value

    meal_lines = [
        "GLUCOSE & MEAL DETAILS:",
        f"- Detected Food: {nutrition_info['matched_food']}",
        f"- Category: {nutrition_info['category']}",
        f"- Glycemic Index: {gi_display} ({gi_level})",
        f"- Peak Glucose Time: {peak_time} hours",
        f"- Glucose Effect Duration: {duration} hours",
        f"- Serving Size: {nutrition_info['serving_size']}",
        f"- Carbs per Serving: {nutrition_info['base_carbs']}g",
        f"- Portion Multiplier: {nutrition_info['portion_multiplier']}x",
        f"- Total Carbs: {nutrition_info['adjusted_carbs']}g",
    ]
    if correction_note:
        # Blank line keeps the note out of the preceding list item.
        meal_lines.extend(["", correction_note])
    glucose_meal_details = "\n".join(meal_lines)

    insulin_details = "\n".join([
        "INSULIN DETAILS:",
        f"- ICR: 1:{insulin_info['icr']}",
        f"- ISF: 1:{insulin_info['isf']}",
        f"- Insulin Type: {insulin_info['insulin_type']}",
        f"- Onset: {insulin_info['insulin_onset']}h",
        f"- Duration: {insulin_info['insulin_duration']}h",
        f"- Peak: {insulin_info['peak_time']}h",
        f"- Correction Dose: {insulin_info['correction_dose']} units",
        f"- Carb Dose: {insulin_info['carb_dose']} units",
        f"- Total Bolus: {insulin_info['total_bolus']} units",
    ])

    basal_details = "\n".join([
        "BASAL SETTINGS:",
        f"- Basal Dose: {insulin_info['basal_dose']} units/day",
        f"- Current Basal Rate: {current_basal_rate} units/h",
    ])

    return glucose_meal_details, insulin_details, basal_details


# =============================================================================
# CGM SIMULATION FUNCTIONS
# =============================================================================

def simulate_cgm_data(start_time, end_time, frequency_minutes=5):
    """
    Simulates CGM data for a given time range.

    Args:
        start_time: A datetime object representing the start time.
        end_time: A datetime object representing the end time.
        frequency_minutes: The interval between readings in minutes.

    Returns:
        A list of (timestamp, glucose_value) tuples.

    Raises:
        ValueError: If frequency_minutes is not positive (the loop would
            otherwise never terminate).
    """
    if frequency_minutes <= 0:
        raise ValueError("frequency_minutes must be positive")

    step = datetime.timedelta(minutes=frequency_minutes)
    data = []
    current_time = start_time
    while current_time <= end_time:
        # Uniform random placeholder reading in the 70-180 mg/dL range.
        glucose = random.randint(70, 180)
        data.append((current_time, glucose))
        current_time += step
    return data


def get_latest_glucose(cgm_data):
    """
    Extracts the most recent glucose value from the CGM data.

    Args:
        cgm_data: A list of (timestamp, glucose_value) tuples, assumed to be
            sorted by time.

    Returns:
        The latest glucose value, or None if no data is available.
    """
    if not cgm_data:
        return None
    return cgm_data[-1][1]


def calculate_cgm_statistics(cgm_data, target_range_lower, target_range_upper):
    """
    Calculates statistics from the CGM data.

    Args:
        cgm_data: List of (timestamp, glucose_value) tuples.
        target_range_lower: Lower bound of the target range (inclusive).
        target_range_upper: Upper bound of the target range (inclusive).

    Returns:
        Tuple: (summary_text, avg_glucose, time_in_range), where
        time_in_range is a percentage of readings.
    """
    if not cgm_data:
        return "No CGM data available", 0, 0

    glucose_values = [data[1] for data in cgm_data]
    avg_glucose = np.mean(glucose_values)
    std_dev = np.std(glucose_values)

    in_range_count = sum(
        1 for glucose in glucose_values
        if target_range_lower <= glucose <= target_range_upper
    )
    time_in_range = (in_range_count / len(cgm_data)) * 100

    summary_text = (
        f"Average Glucose: {avg_glucose:.2f} mg/dL\n"
        f"Standard Deviation: {std_dev:.2f} mg/dL\n"
        f"Time in Range: {time_in_range:.2f}%"
    )
    return summary_text, avg_glucose, time_in_range


def update_glucose_plot(cgm_data, target_glucose, target_range_lower, target_range_upper):
    """
    Builds the glucose plot with a customizable target range using Plotly.

    Args:
        cgm_data: List of (timestamp, glucose_value) tuples.
        target_glucose: The target glucose level.
        target_range_lower: Lower bound of the target range.
        target_range_upper: Upper bound of the target range.

    Returns:
        A Plotly figure, or None if there is no data.
    """
    if not cgm_data:
        return None

    timestamps = [data[0] for data in cgm_data]
    glucose_values = [data[1] for data in cgm_data]
    n_points = len(timestamps)

    fig = go.Figure()

    # CGM Glucose trace
    fig.add_trace(go.Scatter(x=timestamps, y=glucose_values, mode='lines', name='CGM Glucose'))

    # Target line
    fig.add_trace(go.Scatter(
        x=timestamps,
        y=[target_glucose] * n_points,
        mode='lines',
        name='Target',
        line=dict(dash='dash'),
    ))

    # Target range shading: trace the upper bound left-to-right, then the
    # lower bound right-to-left, and fill the closed shape.
    fig.add_trace(go.Scatter(
        x=timestamps + timestamps[::-1],
        y=[target_range_upper] * n_points + [target_range_lower] * n_points,
        fill='toself',
        fillcolor='rgba(0,128,0,0.1)',
        line=dict(color='rgba(255,255,255,0)'),  # No line
        hoverinfo='skip',
        showlegend=False,
        name='Target Range',
    ))

    fig.update_layout(
        xaxis_title='Time',
        yaxis_title='Glucose (mg/dL)',
        xaxis_tickformat='%H:%M',
        yaxis_range=[min(glucose_values) - 10, max(glucose_values) + 10],
        title='CGM Glucose Over Time',
        template="plotly_white",
    )
    return fig


# =============================================================================
# STREAMLIT APP
# =============================================================================

def main():
    """Render the dashboard and run the calculation when the button is pressed."""
    st.title("Type 1 Diabetes Management Dashboard")

    # Sidebar inputs
    initial_glucose = st.sidebar.number_input("Manual Glucose (mg/dL)", value=120)
    target_glucose = st.sidebar.number_input("Target Glucose (mg/dL)", value=100)
    weight = st.sidebar.number_input("Weight (kg)", value=70)
    tdd = st.sidebar.number_input("Total Daily Dose (units)", value=40)

    # Main panel inputs
    food_name_input = st.text_input("Food Name (optional)", placeholder="Enter food name manually")
    weight_grams = st.number_input("Weight (grams, optional)", value=None)
    food_image = st.file_uploader("Food Image (optional)", type=["png", "jpg", "jpeg"])
    insulin_type = st.selectbox("Insulin Type", list(INSULIN_TYPES.keys()), index=0)
    override_correction_dose = st.number_input("Override Correction Dose (units)", value=None)
    # The following widgets are rendered but their values are not yet used by
    # the calculation below.
    extended_bolus_duration = st.number_input("Extended Bolus Duration (h)", value=0)
    basal_rates_input = st.text_area(
        "Basal Rates (JSON)", value=json.dumps(DEFAULT_BASAL_RATES), height=75
    )
    stress_level = st.slider("Stress Level", 1, 10, value=1)
    sleep_hours = st.number_input("Sleep Hours", value=7)
    exercise_duration = st.number_input("Exercise Duration (min)", value=0)
    exercise_intensity = st.slider("Exercise Intensity", 1, 10, value=1)
    time_hours = st.slider("Prediction Time (h)", 1, 24, value=6)
    target_range_lower = st.number_input("Target Range Lower (mg/dL)", value=70)
    target_range_upper = st.number_input("Target Range Upper (mg/dL)", value=180)

    # Button to trigger calculation
    if not st.button("Calculate"):
        return

    food_data = load_food_data()
    if food_data.empty:
        st.error("Error loading food data")
        return

    # A typed name wins over the image classifier.
    if food_name_input and food_name_input.strip():
        food_name = food_name_input.strip()
    elif food_image:
        try:
            food_name = classify_food(Image.open(food_image))
        except OSError as e:
            # Corrupt or unreadable upload; treat it like a failed
            # classification instead of crashing the app.
            print(f"Image open error: {e}")
            food_name = 'unknown'
    else:
        food_name = 'unknown'

    nutrition_info, correction_note = get_food_nutrition(food_name, food_data, weight_grams)
    if not nutrition_info:
        st.error(correction_note)
        return

    try:
        basal_rates = json.loads(basal_rates_input)
    except (json.JSONDecodeError, TypeError):
        basal_rates = None
    # Valid JSON that is not an object (e.g. a list) is just as unusable.
    if not isinstance(basal_rates, dict):
        st.warning("Invalid JSON for Basal Rates. Using Default Rates.")
        basal_rates = DEFAULT_BASAL_RATES

    # =========================================================================
    # CGM DATA INTEGRATION (Simulated)
    # =========================================================================
    now = datetime.datetime.now()
    cgm_start_time = now - datetime.timedelta(hours=1)
    cgm_data = simulate_cgm_data(cgm_start_time, now)

    # Prefer the latest CGM reading; fall back to the manual sidebar value.
    latest_glucose = get_latest_glucose(cgm_data)
    current_glucose = latest_glucose if latest_glucose is not None else initial_glucose

    insulin_info = calculate_insulin_needs(
        nutrition_info['adjusted_carbs'],
        current_glucose,
        target_glucose,
        tdd,
        weight,
        insulin_type,
        override_correction_dose,
    )
    if 'error' in insulin_info:
        st.error(insulin_info['error'])
        return

    # Look up the basal rate for the actual time of day.
    current_hour = now.hour + now.minute / 60
    current_basal_rate = get_basal_rate(current_hour, basal_rates)

    glucose_meal_details, insulin_details, basal_details = create_detailed_report(
        nutrition_info, insulin_info, current_basal_rate, correction_note
    )

    st.subheader("Glucose & Meal Details")
    st.write(glucose_meal_details)

    st.subheader("Insulin Details")
    st.write(insulin_details)

    st.subheader("Basal Settings")
    st.write(basal_details)

    # CGM Plotting and Statistics
    cgm_plot = update_glucose_plot(cgm_data, target_glucose, target_range_lower, target_range_upper)
    if cgm_plot:
        st.subheader("CGM Glucose Data")
        st.plotly_chart(cgm_plot)

    summary_text, _, _ = calculate_cgm_statistics(cgm_data, target_range_lower, target_range_upper)
    st.subheader("CGM Summary")
    st.write(summary_text)


if __name__ == "__main__":
    main()