sachinchandrankallar commited on
Commit
47d30e5
·
1 Parent(s): d58f7ff

Enhance patient summary processing with queue management and improved error handling. Introduced a queue manager to handle request slots, ensuring efficient processing and timeout management. Updated background task logic to include performance metrics and detailed error responses, enhancing overall reliability and maintainability of the patient summary generation workflow.

Browse files
HF_SPACES_CONCURRENT_HANDLING.md ADDED
@@ -0,0 +1,182 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Hugging Face Spaces & Concurrent Request Handling
2
+
3
+ ## Overview
4
+ The system now supports Hugging Face Spaces deployment (T4 medium GPU) with proper concurrent request handling.
5
+
6
+ ## Features Implemented
7
+
8
+ ### 1. ✅ Request Queue Manager
9
+ **File**: `services/ai-service/src/ai_med_extract/services/request_queue.py`
10
+
11
+ **Features**:
12
+ - **Concurrent Request Limiting**: Max 2 concurrent requests for T4 medium GPU
13
+ - **Request Queuing**: Queue of up to 5 requests when all slots are busy
14
+ - **Priority System**: High/Normal/Low priority support
15
+ - **Automatic Slot Management**: Releases slots when requests complete
16
+ - **Queue Status API**: `/api/queue_status` endpoint for monitoring
17
+
18
+ **HF Spaces Configuration**:
19
+ - Max concurrent: 2 requests (T4 medium GPU limitation)
20
+ - Max queue size: 5 requests
21
+ - Queue timeout: 5 minutes
22
+
23
+ **Local/Dev Configuration**:
24
+ - Max concurrent: 4 requests
25
+ - Max queue size: 20 requests
26
+ - Queue timeout: 10 minutes
27
+
28
+ ### 2. ✅ Queue Integration in Routes
29
+
30
+ **Endpoints Updated**:
31
+ - `/generate_patient_summary` (streaming mode)
32
+ - `/generate_patient_summary_streaming`
33
+ - `/generate_patient_summary_large_data`
34
+
35
+ **How It Works**:
36
+ 1. Request arrives → Check queue capacity
37
+ 2. If capacity available → Enqueue request
38
+ 3. Create job → Wait for processing slot
39
+ 4. When slot available → Start background processing
40
+ 5. When complete → Release slot automatically
41
+
42
+ ### 3. ✅ HF Spaces Optimizations
43
+
44
+ **Automatic Detection**:
45
+ - Detects `HF_SPACES` environment variable
46
+ - Adjusts limits automatically for T4 medium GPU
47
+ - Optimizes memory usage
48
+
49
+ **Resource Management**:
50
+ - Limits concurrent GPU operations
51
+ - Prevents OOM errors
52
+ - Manages model loading/unloading
53
+
54
+ ## Usage
55
+
56
+ ### Check Queue Status
57
+ ```bash
58
+ GET /api/queue_status
59
+ ```
60
+
61
+ Response:
62
+ ```json
63
+ {
64
+ "active_requests": 1,
65
+ "queue_size": 2,
66
+ "max_concurrent": 2,
67
+ "max_queue_size": 5,
68
+ "total_processed": 10,
69
+ "total_rejected": 0,
70
+ "total_timeout": 0,
71
+ "queue_positions": [
72
+ {
73
+ "request_id": "...",
74
+ "job_id": "...",
75
+ "priority": "NORMAL",
76
+ "wait_time": 5.2
77
+ }
78
+ ]
79
+ }
80
+ ```
81
+
82
+ ### Making Requests
83
+
84
+ **Normal Request** (non-streaming):
85
+ - No queue management (processed immediately)
86
+ - Suitable for fast rule-based generation
87
+
88
+ **Streaming Request**:
89
+ - Automatically queued if slots are full
90
+ - Returns 503 if queue is full
91
+ - Streams progress updates including queue position
92
+
93
+ ## Error Handling
94
+
95
+ ### Queue Full (503 Service Unavailable)
96
+ ```json
97
+ {
98
+ "detail": "Queue full (5/5). Please try again later."
99
+ }
100
+ ```
101
+
102
+ ### Queue Timeout
103
+ - If request waits >5 minutes in queue
104
+ - Job marked as error
105
+ - Slot released automatically
106
+
107
+ ## Performance
108
+
109
+ ### T4 Medium GPU Limits
110
+ - **Concurrent Requests**: 2 (prevents GPU OOM)
111
+ - **Queue Size**: 5 (reasonable wait time)
112
+ - **Memory**: ~16GB GPU, shared between requests
113
+
114
+ ### Resource Sharing
115
+ - Models are cached and shared between requests
116
+ - GPU memory is managed per request
117
+ - CPU memory is cleaned up after each request
118
+
119
+ ## Monitoring
120
+
121
+ ### Queue Metrics
122
+ - Active requests count
123
+ - Queue size
124
+ - Total processed/rejected/timeout
125
+ - Average wait time
126
+
127
+ ### Job Status
128
+ - Queue position shown in job data
129
+ - Progress updates include queue status
130
+ - SSE stream shows queue position
131
+
132
+ ## Best Practices for HF Spaces
133
+
134
+ 1. **Use Streaming**: Always use `stream=true` for long operations
135
+ 2. **Monitor Queue**: Check `/api/queue_status` before making requests
136
+ 3. **Handle 503**: Implement retry logic for queue full errors
137
+ 4. **Timeout Handling**: Set appropriate client timeouts (>5 minutes)
138
+ 5. **Resource Limits**: Be aware of T4 medium GPU limitations
139
+
140
+ ## Configuration
141
+
142
+ ### Environment Variables
143
+ - `HF_SPACES=true` - Enables HF Spaces mode
144
+ - `SPACE_ID` - Auto-detected on HF Spaces
145
+
146
+ ### Adjusting Limits
147
+ Edit `services/ai-service/src/ai_med_extract/services/request_queue.py`:
148
+ ```python
149
+ # For HF Spaces
150
+ RequestQueueManager(
151
+ max_concurrent=2, # Adjust based on GPU
152
+ max_queue_size=5, # Adjust based on expected load
153
+ queue_timeout=300 # 5 minutes
154
+ )
155
+ ```
156
+
157
+ ## Testing Concurrent Requests
158
+
159
+ ```python
160
+ import requests
161
+ import concurrent.futures
162
+
163
+ def make_request(i):
164
+ response = requests.post(
165
+ "https://your-space.hf.space/generate_patient_summary",
166
+ json={"patientid": "...", "token": "...", "key": "...", "stream": True},
167
+ stream=True
168
+ )
169
+ return i, response.status_code
170
+
171
+ # Test 5 concurrent requests
172
+ with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
173
+ results = executor.map(make_request, range(5))
174
+ for i, status in results:
175
+ print(f"Request {i}: {status}")
176
+ ```
177
+
178
+ Expected behavior:
179
+ - 2 requests start immediately
180
+ - 3 requests queued
181
+ - Requests process in order as slots become available
182
+
services/ai-service/src/ai_med_extract/api/routes_fastapi.py CHANGED
@@ -38,12 +38,17 @@ from ..utils.constants import (
38
  CHUNKING_SIZE_THRESHOLD, CHUNK_SIZE_VISITS, SSE_CONFIG,
39
  JOB_STATUS, GENERATION_MODES
40
  )
41
- from ..services.job_manager import get_job_manager, update_job, cleanup_job
42
  from ..services.error_handler import (
43
  log_error_safely, handle_error_gracefully, update_job_with_error,
44
  ErrorCategory, PatientSummaryError
45
  )
46
- from ..services.sse_generator import sse_generator as sse_generator_service, sse_generator_extended as sse_generator_extended_service
 
 
 
 
 
47
  from ..utils.common_helpers import (
48
  extract_text_from_pipeline_result, validate_required_fields,
49
  is_error_response, create_error_dict, merge_config
@@ -55,8 +60,9 @@ GGUF_PIPELINE_CACHE = {}
55
  # Global agents variable - will be set during registration
56
  agents = {}
57
 
58
- # Initialize job manager
59
  job_manager = get_job_manager()
 
60
 
61
  # ========== PERFORMANCE TUNING HELPERS ==========
62
  def _effective_max_new_tokens(requested: int | None, default: int = 1024) -> int:
@@ -1663,144 +1669,137 @@ Current settings:
1663
  return error_response
1664
 
1665
  def process_patient_summary_background(data, job_id):
1666
- """Enhanced background task for patient summary generation with intelligent timeout handling"""
1667
- print(f"🚀 Enhanced background task started for job_id: {job_id}")
1668
  start_time = time.perf_counter()
1669
 
1670
  try:
1671
- # Create a new event loop for this thread
1672
- loop = asyncio.new_event_loop()
1673
- asyncio.set_event_loop(loop)
1674
-
1675
  try:
1676
- # Detect data size and adjust timeout mode accordingly
1677
- patientid = data.get("patientid")
1678
- token = data.get("token")
1679
- key = data.get("key")
1680
 
1681
- # Quick data size check to determine timeout mode
1682
- timeout_mode = data.get('timeout_mode', 'normal')
1683
- if timeout_mode == 'normal':
1684
- try:
1685
- # Get a sample of the data to estimate size using the same pattern as async_patient_summary
1686
- import requests
1687
- ehr_url = f"{key.strip()}/Transactionapi/api/PatientList/patientsummary"
1688
- headers = {"Authorization": f"Bearer {token}", "X-API-Key": key}
1689
-
1690
- response = requests.post(
1691
- ehr_url,
1692
- json={"patientid": patientid},
1693
- headers=headers,
1694
- timeout=30
1695
- )
1696
- if response.status_code == 200:
1697
- sample_data = response.json()
1698
- data_size = len(str(sample_data))
1699
- else:
1700
- data_size = 0 # Default to small data if can't fetch
1701
-
1702
- if data_size > 100000: # >100KB
1703
- timeout_mode = 'large_data'
1704
- data['timeout_mode'] = 'large_data'
1705
- print(f"📊 Large dataset detected ({data_size} chars), switching to large_data timeout mode")
1706
- elif data_size > 50000: # >50KB
1707
- timeout_mode = 'extended'
1708
- data['timeout_mode'] = 'extended'
1709
- print(f"📊 Medium dataset detected ({data_size} chars), switching to extended timeout mode")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1710
 
1711
- except Exception as e:
1712
- print(f"⚠️ Could not check data size, using default timeout mode: {e}")
1713
-
1714
- # Always use optimized parallel generation for better timeout handling
1715
- generation_mode = data.get('generation_mode', 'rule').lower()
1716
-
1717
- # Force optimized generation for large data or when explicitly requested
1718
- if (generation_mode in ['gguf', 'summarization'] or
1719
- timeout_mode in ['extended', 'large_data'] or
1720
- data_size > 30000): # Force optimization for >30KB data
 
 
 
 
1721
 
1722
- print(f"🚀 Using optimized parallel generation (mode: {generation_mode}, timeout: {timeout_mode}, size: {data_size} chars)")
1723
- result = loop.run_until_complete(async_patient_summary_optimized(data, job_id))
1724
- else:
1725
- print(f"⚠️ Using legacy generation (mode: {generation_mode}, timeout: {timeout_mode}, size: {data_size} chars)")
1726
- try:
1727
- result = loop.run_until_complete(async_patient_summary(data, job_id))
1728
- except Exception as legacy_error:
1729
- if "timeout" in str(legacy_error).lower() and data_size > 20000:
1730
- print(f"🔄 Legacy generation timed out, retrying with optimized generation...")
1731
- # Force optimized generation on timeout
1732
- data['generation_mode'] = 'summarization'
1733
- result = loop.run_until_complete(async_patient_summary_optimized(data, job_id))
1734
- else:
1735
- raise legacy_error
1736
-
1737
- # Add performance metrics
1738
- processing_time = time.perf_counter() - start_time
1739
- result["timing"]["total"] = processing_time
1740
- result["performance"] = {
1741
- "parallel_generation": generation_mode in ['gguf', 'summarization'],
1742
- "processing_time_seconds": processing_time,
1743
- "timeout_mode_used": timeout_mode,
1744
- "job_id": job_id
1745
- }
1746
-
1747
- update_job(job_id, 'completed', progress=100, data=result)
1748
- print(f"✅ Enhanced background task completed successfully for job_id: {job_id} in {processing_time:.2f}s (timeout_mode: {timeout_mode})")
1749
-
1750
- except Exception as e:
1751
- processing_time = time.perf_counter() - start_time
1752
- print(f"❌ Async task error for job_id {job_id} after {processing_time:.2f}s: {str(e)}")
1753
- import traceback
1754
- traceback.print_exc()
1755
- try:
1756
- log_exception_with_memory(f"[STREAM] Background task error (job_id={job_id})", e)
1757
- except Exception:
1758
- pass
1759
-
1760
- # Create detailed error response for background task errors
1761
- error_type = "generation_failed"
1762
- if "timeout" in str(e).lower():
1763
- error_type = "generation_timeout"
1764
- elif "memory" in str(e).lower():
1765
- error_type = "memory_error"
1766
- elif "connection" in str(e).lower():
1767
- error_type = "connection_error"
1768
-
1769
- error_response = {
1770
- "error": str(e),
1771
- "error_type": error_type,
1772
- "status": "error",
1773
- "timing": {
1774
- "total": processing_time,
1775
- "background_task": True
1776
- },
1777
- "prompt_info": {
1778
- "prompt_size_chars": 0,
1779
- "prompt_preview": "Background task error - prompt not available",
1780
- "full_prompt": "Background task error - prompt not available",
1781
- "data_size_chars": 0
1782
- },
1783
- "recommendations": [
1784
- "Check the error details",
1785
- "Consider using timeout_mode='extended' or 'large_data'",
1786
- "Try reducing data size or using chunking",
1787
- "Use the /generate_patient_summary_large_data endpoint for large datasets"
1788
- ]
1789
- }
1790
-
1791
- update_job_with_error(job_id, str(e), error_type, error_response)
1792
  finally:
1793
- loop.close()
1794
-
 
1795
  except Exception as e:
1796
  processing_time = time.perf_counter() - start_time
1797
- print(f"Background task outer error for job_id {job_id} after {processing_time:.2f}s: {str(e)}")
1798
- import traceback
1799
- traceback.print_exc()
1800
- try:
1801
- log_exception_with_memory(f"[STREAM] Background task outer error (job_id={job_id})", e)
1802
- except Exception:
1803
- pass
1804
 
1805
  # Create detailed error response for outer background task errors
1806
  error_type = "generation_failed"
@@ -1834,6 +1833,9 @@ def process_patient_summary_background(data, job_id):
1834
  }
1835
 
1836
  update_job_with_error(job_id, str(e), error_type, error_response)
 
 
 
1837
 
1838
  async def async_patient_summary_optimized(data, job_id=None):
1839
  """Optimized async implementation using enhanced timeout handling"""
@@ -2118,9 +2120,17 @@ async def ready():
2118
  except Exception:
2119
  return {"error": "Failed to get readiness status"}
2120
 
2121
- @router.get("/metrics")
2122
- async def metrics():
2123
- return {"performance": PERFORMANCE_METRICS, "loaded_models": {}}
 
 
 
 
 
 
 
 
2124
 
2125
  # Home route
2126
  @router.get("/", response_class=HTMLResponse)
@@ -2420,14 +2430,36 @@ async def generate_patient_summary_large_data(
2420
  # Log request start - use safe logging
2421
  log_error_safely(None, f"[LARGE_DATA] Starting large data processing request_id={request_id} timeout_mode={timeout_mode}", level=logging.INFO)
2422
 
 
 
 
 
 
 
 
 
 
 
2423
  # Create job for processing using job manager
2424
  job_id = job_manager.create_job(request_id=request_id, initial_data={
2425
- 'message': f'🚀 Starting large data processing with {timeout_mode} timeout mode...'
 
2426
  })
2427
  job_manager.update_job(job_id, JOB_STATUS["QUEUED"], progress=0)
2428
-
2429
- # Start background task with optimized generation
2430
- threading.Thread(target=process_patient_summary_background, args=(data, job_id), daemon=True).start()
 
 
 
 
 
 
 
 
 
 
 
2431
 
2432
  # Use SSE generator service instead of custom generator
2433
  return StreamingResponse(
@@ -2470,14 +2502,36 @@ async def generate_patient_summary_streaming(
2470
  # Log request start - use safe logging
2471
  log_error_safely(None, f"[STREAMING] Enhanced parallel generation start request_id={request_id}", level=logging.INFO)
2472
 
 
 
 
 
 
 
 
 
 
 
2473
  # Create job for streaming using job manager
2474
  job_id = job_manager.create_job(request_id=request_id, initial_data={
2475
- 'message': '🚀 Starting enhanced parallel generation...'
 
2476
  })
2477
  job_manager.update_job(job_id, JOB_STATUS["QUEUED"], progress=0)
2478
 
2479
- # Start background task with optimized generation
2480
- threading.Thread(target=process_patient_summary_background, args=(data, job_id), daemon=True).start()
 
 
 
 
 
 
 
 
 
 
 
2481
 
2482
  # Use SSE generator service instead of custom generator
2483
  return StreamingResponse(
@@ -2541,15 +2595,42 @@ async def generate_patient_summary(
2541
  print(f"🚀 Using extended streaming generator for ALL requests to prevent timeout issues")
2542
  print(f"🔍 Detection - HF Spaces: {is_hf_spaces}, GGUF Mode: {is_gguf_mode}")
2543
 
 
 
 
 
 
 
 
 
 
 
 
2544
  # Create job for streaming using job manager
2545
  job_id = job_manager.create_job(request_id=request_id, initial_data={
2546
  'request_id': request_id,
2547
- 'message': 'GGUF model loading and generation in progress...' if is_gguf_mode else 'Job queued...'
 
2548
  })
2549
  job_manager.update_job(job_id, JOB_STATUS["QUEUED"], progress=0)
2550
 
2551
- # Start background task
2552
- threading.Thread(target=process_patient_summary_background, args=(data, job_id), daemon=True).start()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2553
 
2554
  # ALWAYS use extended generator to prevent timeout issues
2555
  # Use SSE generator service directly
 
38
  CHUNKING_SIZE_THRESHOLD, CHUNK_SIZE_VISITS, SSE_CONFIG,
39
  JOB_STATUS, GENERATION_MODES
40
  )
41
+ from ..services.job_manager import get_job_manager
42
  from ..services.error_handler import (
43
  log_error_safely, handle_error_gracefully, update_job_with_error,
44
  ErrorCategory, PatientSummaryError
45
  )
46
+ from ..services.sse_generator import (
47
+ sse_generator as sse_generator_service,
48
+ sse_generator_extended as sse_generator_extended_service
49
+ )
50
+ from ..services.request_queue import get_queue_manager, RequestPriority, QueuedRequest
51
+ from collections import deque
52
  from ..utils.common_helpers import (
53
  extract_text_from_pipeline_result, validate_required_fields,
54
  is_error_response, create_error_dict, merge_config
 
60
  # Global agents variable - will be set during registration
61
  agents = {}
62
 
63
+ # Initialize job manager and queue manager
64
  job_manager = get_job_manager()
65
+ queue_manager = get_queue_manager()
66
 
67
  # ========== PERFORMANCE TUNING HELPERS ==========
68
  def _effective_max_new_tokens(requested: int | None, default: int = 1024) -> int:
 
1669
  return error_response
1670
 
1671
  def process_patient_summary_background(data, job_id):
1672
+ """Enhanced background task for patient summary generation with queue management"""
1673
+ request_id = data.get("request_id") or "n/a"
1674
  start_time = time.perf_counter()
1675
 
1676
  try:
1677
+ # Release queue slot when processing completes (in finally block)
 
 
 
1678
  try:
1679
+ # Create a new event loop for this thread
1680
+ loop = asyncio.new_event_loop()
1681
+ asyncio.set_event_loop(loop)
 
1682
 
1683
+ try:
1684
+ # Detect data size and adjust timeout mode accordingly
1685
+ patientid = data.get("patientid")
1686
+ token = data.get("token")
1687
+ key = data.get("key")
1688
+ data_size = 0
1689
+
1690
+ # Quick data size check to determine timeout mode
1691
+ timeout_mode = data.get('timeout_mode', 'normal')
1692
+ if timeout_mode == 'normal':
1693
+ try:
1694
+ # Get a sample of the data to estimate size using the same pattern as async_patient_summary
1695
+ import requests
1696
+ ehr_url = f"{key.strip()}/Transactionapi/api/PatientList/patientsummary"
1697
+ headers = {"Authorization": f"Bearer {token}", "X-API-Key": key}
1698
+
1699
+ response = requests.post(
1700
+ ehr_url,
1701
+ json={"patientid": patientid},
1702
+ headers=headers,
1703
+ timeout=30
1704
+ )
1705
+ if response.status_code == 200:
1706
+ sample_data = response.json()
1707
+ data_size = len(str(sample_data))
1708
+ else:
1709
+ data_size = 0 # Default to small data if can't fetch
1710
+
1711
+ if data_size >= LARGE_DATA_THRESHOLD:
1712
+ timeout_mode = 'large_data'
1713
+ data['timeout_mode'] = 'large_data'
1714
+ log_error_safely(None, f"Large dataset detected ({data_size} chars), switching to large_data timeout mode", level=logging.INFO)
1715
+ elif data_size >= MEDIUM_DATA_THRESHOLD:
1716
+ timeout_mode = 'extended'
1717
+ data['timeout_mode'] = 'extended'
1718
+ log_error_safely(None, f"Medium dataset detected ({data_size} chars), switching to extended timeout mode", level=logging.INFO)
1719
+ except Exception as e:
1720
+ log_error_safely(e, "Could not check data size, using default timeout mode", job_id)
1721
+
1722
+ # Always use optimized parallel generation for better timeout handling
1723
+ generation_mode = data.get('generation_mode', 'rule').lower()
1724
+
1725
+ # Force optimized generation for large data or when explicitly requested
1726
+ if (generation_mode in ['gguf', 'summarization'] or
1727
+ timeout_mode in ['extended', 'large_data']):
1728
 
1729
+ log_error_safely(None, f"Using optimized parallel generation (mode: {generation_mode}, timeout: {timeout_mode})", level=logging.INFO)
1730
+ result = loop.run_until_complete(async_patient_summary_optimized(data, job_id))
1731
+ else:
1732
+ log_error_safely(None, f"Using legacy generation (mode: {generation_mode}, timeout: {timeout_mode})", level=logging.INFO)
1733
+ try:
1734
+ result = loop.run_until_complete(async_patient_summary(data, job_id))
1735
+ except Exception as legacy_error:
1736
+ if "timeout" in str(legacy_error).lower():
1737
+ log_error_safely(None, "Legacy generation timed out, retrying with optimized generation...", level=logging.WARNING)
1738
+ # Force optimized generation on timeout
1739
+ data['generation_mode'] = 'summarization'
1740
+ result = loop.run_until_complete(async_patient_summary_optimized(data, job_id))
1741
+ else:
1742
+ raise legacy_error
1743
 
1744
+ # Add performance metrics
1745
+ processing_time = time.perf_counter() - start_time
1746
+ if isinstance(result, dict) and 'timing' in result:
1747
+ result["timing"]["total"] = processing_time
1748
+ result["performance"] = {
1749
+ "parallel_generation": generation_mode in ['gguf', 'summarization'],
1750
+ "processing_time_seconds": processing_time,
1751
+ "timeout_mode_used": timeout_mode,
1752
+ "job_id": job_id
1753
+ }
1754
+
1755
+ update_job(job_id, 'completed', progress=100, data=result)
1756
+ log_error_safely(None, f"Background task completed successfully for job_id: {job_id} (timeout_mode: {timeout_mode})", level=logging.INFO)
1757
+
1758
+ except Exception as e:
1759
+ processing_time = time.perf_counter() - start_time
1760
+ log_error_safely(e, f"Async task error for job_id {job_id}", job_id)
1761
+
1762
+ # Create detailed error response for background task errors
1763
+ error_type = "generation_failed"
1764
+ if "timeout" in str(e).lower():
1765
+ error_type = "generation_timeout"
1766
+ elif "memory" in str(e).lower():
1767
+ error_type = "memory_error"
1768
+ elif "connection" in str(e).lower():
1769
+ error_type = "connection_error"
1770
+
1771
+ error_response = {
1772
+ "error": str(e),
1773
+ "error_type": error_type,
1774
+ "status": "error",
1775
+ "timing": {
1776
+ "total": processing_time,
1777
+ "background_task": True
1778
+ },
1779
+ "prompt_info": {
1780
+ "prompt_size_chars": 0,
1781
+ "prompt_preview": "Background task error - prompt not available",
1782
+ "full_prompt": "Background task error - prompt not available",
1783
+ "data_size_chars": 0
1784
+ },
1785
+ "recommendations": [
1786
+ "Check the error details",
1787
+ "Consider using timeout_mode='extended' or 'large_data'",
1788
+ "Try reducing data size or using chunking",
1789
+ "Use the /generate_patient_summary_large_data endpoint for large datasets"
1790
+ ]
1791
+ }
1792
+
1793
+ update_job_with_error(job_id, str(e), error_type, error_response)
1794
+ finally:
1795
+ loop.close()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1796
  finally:
1797
+ # Always release queue slot when done
1798
+ queue_manager.release_slot(request_id)
1799
+
1800
  except Exception as e:
1801
  processing_time = time.perf_counter() - start_time
1802
+ log_error_safely(e, f"Background task outer error for job_id {job_id}", job_id)
 
 
 
 
 
 
1803
 
1804
  # Create detailed error response for outer background task errors
1805
  error_type = "generation_failed"
 
1833
  }
1834
 
1835
  update_job_with_error(job_id, str(e), error_type, error_response)
1836
+
1837
+ # Release queue slot even on outer error
1838
+ queue_manager.release_slot(request_id)
1839
 
1840
  async def async_patient_summary_optimized(data, job_id=None):
1841
  """Optimized async implementation using enhanced timeout handling"""
 
2120
  except Exception:
2121
  return {"error": "Failed to get readiness status"}
2122
 
2123
+ @router.get("/api/queue_status")
2124
+ async def get_queue_status():
2125
+ """Get current request queue status."""
2126
+ try:
2127
+ status = queue_manager.get_queue_status()
2128
+ return JSONResponse(content=status)
2129
+ except Exception as e:
2130
+ return JSONResponse(
2131
+ status_code=500,
2132
+ content={"error": f"Failed to get queue status: {str(e)}"}
2133
+ )
2134
 
2135
  # Home route
2136
  @router.get("/", response_class=HTMLResponse)
 
2430
  # Log request start - use safe logging
2431
  log_error_safely(None, f"[LARGE_DATA] Starting large data processing request_id={request_id} timeout_mode={timeout_mode}", level=logging.INFO)
2432
 
2433
+ # Check queue capacity and enqueue request
2434
+ accepted, error_msg = queue_manager.enqueue_request(
2435
+ request_id=request_id,
2436
+ job_id=None, # Will be set after job creation
2437
+ priority=RequestPriority.NORMAL
2438
+ )
2439
+
2440
+ if not accepted:
2441
+ raise HTTPException(status_code=503, detail=error_msg)
2442
+
2443
  # Create job for processing using job manager
2444
  job_id = job_manager.create_job(request_id=request_id, initial_data={
2445
+ 'message': f'🚀 Starting large data processing with {timeout_mode} timeout mode...',
2446
+ 'queue_position': queue_manager.get_queue_status()['queue_size'] + 1
2447
  })
2448
  job_manager.update_job(job_id, JOB_STATUS["QUEUED"], progress=0)
2449
+
2450
+ # Wait for processing slot and start background task
2451
+ def wait_and_process():
2452
+ """Wait for slot and then process."""
2453
+ if queue_manager.wait_for_slot(request_id, timeout=300):
2454
+ # Start background task with optimized generation
2455
+ threading.Thread(target=process_patient_summary_background, args=(data, job_id), daemon=True).start()
2456
+ else:
2457
+ # Timeout waiting for slot
2458
+ job_manager.update_job(job_id, JOB_STATUS["ERROR"], error="Request timed out waiting for processing slot")
2459
+ queue_manager.release_slot(request_id)
2460
+
2461
+ # Start waiting thread
2462
+ threading.Thread(target=wait_and_process, daemon=True).start()
2463
 
2464
  # Use SSE generator service instead of custom generator
2465
  return StreamingResponse(
 
2502
  # Log request start - use safe logging
2503
  log_error_safely(None, f"[STREAMING] Enhanced parallel generation start request_id={request_id}", level=logging.INFO)
2504
 
2505
+ # Check queue capacity and enqueue request
2506
+ accepted, error_msg = queue_manager.enqueue_request(
2507
+ request_id=request_id,
2508
+ job_id=None, # Will be set after job creation
2509
+ priority=RequestPriority.NORMAL
2510
+ )
2511
+
2512
+ if not accepted:
2513
+ raise HTTPException(status_code=503, detail=error_msg)
2514
+
2515
  # Create job for streaming using job manager
2516
  job_id = job_manager.create_job(request_id=request_id, initial_data={
2517
+ 'message': '🚀 Starting enhanced parallel generation...',
2518
+ 'queue_position': queue_manager.get_queue_status()['queue_size'] + 1
2519
  })
2520
  job_manager.update_job(job_id, JOB_STATUS["QUEUED"], progress=0)
2521
 
2522
+ # Wait for processing slot and start background task
2523
+ def wait_and_process():
2524
+ """Wait for slot and then process."""
2525
+ if queue_manager.wait_for_slot(request_id, timeout=300):
2526
+ # Start background task with optimized generation
2527
+ threading.Thread(target=process_patient_summary_background, args=(data, job_id), daemon=True).start()
2528
+ else:
2529
+ # Timeout waiting for slot
2530
+ job_manager.update_job(job_id, JOB_STATUS["ERROR"], error="Request timed out waiting for processing slot")
2531
+ queue_manager.release_slot(request_id)
2532
+
2533
+ # Start waiting thread
2534
+ threading.Thread(target=wait_and_process, daemon=True).start()
2535
 
2536
  # Use SSE generator service instead of custom generator
2537
  return StreamingResponse(
 
2595
  print(f"🚀 Using extended streaming generator for ALL requests to prevent timeout issues")
2596
  print(f"🔍 Detection - HF Spaces: {is_hf_spaces}, GGUF Mode: {is_gguf_mode}")
2597
 
2598
+ # Check queue capacity and enqueue request (for streaming only)
2599
+ if stream:
2600
+ accepted, error_msg = queue_manager.enqueue_request(
2601
+ request_id=request_id,
2602
+ job_id=None, # Will be set after job creation
2603
+ priority=RequestPriority.NORMAL
2604
+ )
2605
+
2606
+ if not accepted:
2607
+ raise HTTPException(status_code=503, detail=error_msg)
2608
+
2609
  # Create job for streaming using job manager
2610
  job_id = job_manager.create_job(request_id=request_id, initial_data={
2611
  'request_id': request_id,
2612
+ 'message': 'GGUF model loading and generation in progress...' if is_gguf_mode else 'Job queued...',
2613
+ 'queue_position': queue_manager.get_queue_status()['queue_size'] + 1 if stream else 0
2614
  })
2615
  job_manager.update_job(job_id, JOB_STATUS["QUEUED"], progress=0)
2616
 
2617
+ # Wait for processing slot and start background task (for streaming only)
2618
+ if stream:
2619
+ def wait_and_process():
2620
+ """Wait for slot and then process."""
2621
+ if queue_manager.wait_for_slot(request_id, timeout=300):
2622
+ # Start background task
2623
+ threading.Thread(target=process_patient_summary_background, args=(data, job_id), daemon=True).start()
2624
+ else:
2625
+ # Timeout waiting for slot
2626
+ job_manager.update_job(job_id, JOB_STATUS["ERROR"], error="Request timed out waiting for processing slot")
2627
+ queue_manager.release_slot(request_id)
2628
+
2629
+ # Start waiting thread
2630
+ threading.Thread(target=wait_and_process, daemon=True).start()
2631
+ else:
2632
+ # Non-streaming: process immediately (no queue management needed)
2633
+ pass
2634
 
2635
  # ALWAYS use extended generator to prevent timeout issues
2636
  # Use SSE generator service directly
services/ai-service/src/ai_med_extract/services/__init__.py ADDED
@@ -0,0 +1,31 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Services module for patient summary generation.
3
+ """
4
+
5
+ from .job_manager import get_job_manager, JobManager
6
+ from .error_handler import (
7
+ log_error_safely, handle_error_gracefully, update_job_with_error,
8
+ ErrorCategory, PatientSummaryError, categorize_error, create_error_response
9
+ )
10
+ from .sse_generator import sse_generator, sse_generator_extended, SSEGenerator
11
+ from .request_queue import get_queue_manager, RequestQueueManager, RequestPriority, QueuedRequest
12
+
13
+ __all__ = [
14
+ 'get_job_manager',
15
+ 'JobManager',
16
+ 'log_error_safely',
17
+ 'handle_error_gracefully',
18
+ 'update_job_with_error',
19
+ 'ErrorCategory',
20
+ 'PatientSummaryError',
21
+ 'categorize_error',
22
+ 'create_error_response',
23
+ 'sse_generator',
24
+ 'sse_generator_extended',
25
+ 'SSEGenerator',
26
+ 'get_queue_manager',
27
+ 'RequestQueueManager',
28
+ 'RequestPriority',
29
+ 'QueuedRequest',
30
+ ]
31
+
services/ai-service/src/ai_med_extract/services/request_queue.py ADDED
@@ -0,0 +1,303 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Request Queue Manager for handling concurrent requests on Hugging Face Spaces.
3
+
4
+ Provides request queuing, throttling, and resource management for limited-resource environments.
5
+ """
6
+
7
+ import asyncio
8
+ import time
9
+ import threading
10
+ from typing import Dict, Optional, Callable, Any
11
+ from collections import deque
12
+ from dataclasses import dataclass
13
+ from enum import Enum
14
+ import logging
15
+
16
+ logger = logging.getLogger(__name__)
17
+
18
+
19
+ class RequestPriority(Enum):
20
+ """Request priority levels."""
21
+ HIGH = 1
22
+ NORMAL = 2
23
+ LOW = 3
24
+
25
+
26
+ @dataclass
27
+ class QueuedRequest:
28
+ """Represents a queued request."""
29
+ request_id: str
30
+ job_id: str
31
+ priority: RequestPriority
32
+ timestamp: float
33
+ callback: Optional[Callable] = None
34
+
35
+
36
+ class RequestQueueManager:
37
+ """
38
+ Manages concurrent requests with queuing and resource limits.
39
+
40
+ Designed for Hugging Face Spaces with limited resources (T4 medium GPU).
41
+ """
42
+
43
+ def __init__(
44
+ self,
45
+ max_concurrent: int = 2, # T4 medium can handle 2 concurrent requests
46
+ max_queue_size: int = 10,
47
+ queue_timeout: int = 300 # 5 minutes max wait in queue
48
+ ):
49
+ """
50
+ Initialize request queue manager.
51
+
52
+ Args:
53
+ max_concurrent: Maximum concurrent requests (default: 2 for T4 medium)
54
+ max_queue_size: Maximum queue size
55
+ queue_timeout: Maximum time to wait in queue (seconds)
56
+ """
57
+ self.max_concurrent = max_concurrent
58
+ self.max_queue_size = max_queue_size
59
+ self.queue_timeout = queue_timeout
60
+
61
+ self._queue: deque = deque()
62
+ self._active_requests: Dict[str, float] = {} # request_id -> start_time
63
+ self._lock = threading.RLock()
64
+ self._condition = threading.Condition(self._lock)
65
+
66
+ # Statistics
67
+ self._total_processed = 0
68
+ self._total_rejected = 0
69
+ self._total_timeout = 0
70
+
71
+ def can_accept_request(self) -> bool:
72
+ """
73
+ Check if a new request can be accepted.
74
+
75
+ Returns:
76
+ True if request can be accepted
77
+ """
78
+ with self._lock:
79
+ active_count = len(self._active_requests)
80
+ queue_size = len(self._queue)
81
+
82
+ # Check if we can accept immediately
83
+ if active_count < self.max_concurrent:
84
+ return True
85
+
86
+ # Check if queue has space
87
+ if queue_size < self.max_queue_size:
88
+ return True
89
+
90
+ return False
91
+
92
+ def enqueue_request(
93
+ self,
94
+ request_id: str,
95
+ job_id: str,
96
+ priority: RequestPriority = RequestPriority.NORMAL
97
+ ) -> tuple[bool, Optional[str]]:
98
+ """
99
+ Enqueue a request for processing.
100
+
101
+ Args:
102
+ request_id: Request identifier
103
+ job_id: Job identifier
104
+ priority: Request priority
105
+
106
+ Returns:
107
+ Tuple of (accepted, error_message)
108
+ """
109
+ with self._lock:
110
+ # Check if we can process immediately
111
+ if len(self._active_requests) < self.max_concurrent:
112
+ self._active_requests[request_id] = time.time()
113
+ self._total_processed += 1
114
+ logger.info(f"Request {request_id} accepted immediately (active: {len(self._active_requests)})")
115
+ return True, None
116
+
117
+ # Check queue capacity
118
+ if len(self._queue) >= self.max_queue_size:
119
+ self._total_rejected += 1
120
+ error_msg = f"Queue full ({len(self._queue)}/{self.max_queue_size}). Please try again later."
121
+ logger.warning(f"Request {request_id} rejected: {error_msg}")
122
+ return False, error_msg
123
+
124
+ # Add to queue
125
+ queued_request = QueuedRequest(
126
+ request_id=request_id,
127
+ job_id=job_id,
128
+ priority=priority,
129
+ timestamp=time.time()
130
+ )
131
+
132
+ # Insert based on priority
133
+ inserted = False
134
+ for i, req in enumerate(self._queue):
135
+ if priority.value < req.priority.value:
136
+ self._queue.insert(i, queued_request)
137
+ inserted = True
138
+ break
139
+
140
+ if not inserted:
141
+ self._queue.append(queued_request)
142
+
143
+ logger.info(f"Request {request_id} queued (position: {len(self._queue)}, active: {len(self._active_requests)})")
144
+
145
+ # Notify waiting threads
146
+ self._condition.notify_all()
147
+
148
+ return True, None
149
+
150
+ def wait_for_slot(self, request_id: str, timeout: Optional[int] = None) -> bool:
151
+ """
152
+ Wait for a processing slot to become available.
153
+
154
+ Args:
155
+ request_id: Request identifier
156
+ timeout: Timeout in seconds (defaults to queue_timeout)
157
+
158
+ Returns:
159
+ True if slot acquired, False if timeout
160
+ """
161
+ timeout = timeout or self.queue_timeout
162
+ start_time = time.time()
163
+
164
+ with self._condition:
165
+ while True:
166
+ # Check if we can process now
167
+ if len(self._active_requests) < self.max_concurrent:
168
+ # Remove from queue if present
169
+ self._queue = deque([r for r in self._queue if r.request_id != request_id])
170
+ self._active_requests[request_id] = time.time()
171
+ logger.info(f"Request {request_id} acquired slot (active: {len(self._active_requests)})")
172
+ return True
173
+
174
+ # Check timeout
175
+ elapsed = time.time() - start_time
176
+ if elapsed >= timeout:
177
+ # Remove from queue
178
+ self._queue = deque([r for r in self._queue if r.request_id != request_id])
179
+ self._total_timeout += 1
180
+ logger.warning(f"Request {request_id} timed out waiting for slot ({elapsed:.1f}s)")
181
+ return False
182
+
183
+ # Wait for notification
184
+ remaining_timeout = timeout - elapsed
185
+ self._condition.wait(timeout=min(remaining_timeout, 5.0))
186
+
187
+ def release_slot(self, request_id: str) -> None:
188
+ """
189
+ Release a processing slot.
190
+
191
+ Args:
192
+ request_id: Request identifier
193
+ """
194
+ with self._lock:
195
+ if request_id in self._active_requests:
196
+ processing_time = time.time() - self._active_requests[request_id]
197
+ del self._active_requests[request_id]
198
+ logger.info(f"Request {request_id} released slot (processing time: {processing_time:.1f}s, active: {len(self._active_requests)})")
199
+
200
+ # Notify waiting threads
201
+ self._condition.notify_all()
202
+
203
+ def get_queue_status(self) -> Dict[str, Any]:
204
+ """
205
+ Get current queue status.
206
+
207
+ Returns:
208
+ Dictionary with queue statistics
209
+ """
210
+ with self._lock:
211
+ return {
212
+ "active_requests": len(self._active_requests),
213
+ "queue_size": len(self._queue),
214
+ "max_concurrent": self.max_concurrent,
215
+ "max_queue_size": self.max_queue_size,
216
+ "total_processed": self._total_processed,
217
+ "total_rejected": self._total_rejected,
218
+ "total_timeout": self._total_timeout,
219
+ "queue_positions": [
220
+ {
221
+ "request_id": req.request_id,
222
+ "job_id": req.job_id,
223
+ "priority": req.priority.name,
224
+ "wait_time": time.time() - req.timestamp
225
+ }
226
+ for req in self._queue
227
+ ]
228
+ }
229
+
230
+ def cleanup_old_requests(self, max_age: int = 3600) -> int:
231
+ """
232
+ Clean up old requests from tracking.
233
+
234
+ Args:
235
+ max_age: Maximum age in seconds
236
+
237
+ Returns:
238
+ Number of requests cleaned up
239
+ """
240
+ with self._lock:
241
+ current_time = time.time()
242
+ cleaned = 0
243
+
244
+ # Clean active requests
245
+ to_remove = [
246
+ req_id for req_id, start_time in self._active_requests.items()
247
+ if current_time - start_time > max_age
248
+ ]
249
+
250
+ for req_id in to_remove:
251
+ del self._active_requests[req_id]
252
+ cleaned += 1
253
+
254
+ # Clean queue
255
+ queue_size_before = len(self._queue)
256
+ self._queue = deque([
257
+ req for req in self._queue
258
+ if current_time - req.timestamp < max_age
259
+ ])
260
+ cleaned += queue_size_before - len(self._queue)
261
+
262
+ if cleaned > 0:
263
+ logger.info(f"Cleaned up {cleaned} old requests")
264
+
265
+ return cleaned
266
+
267
+
268
+ # Global singleton instance
269
+ _queue_manager: Optional[RequestQueueManager] = None
270
+
271
+
272
+ def get_queue_manager() -> RequestQueueManager:
273
+ """
274
+ Get the global queue manager instance (singleton pattern).
275
+
276
+ Returns:
277
+ RequestQueueManager instance
278
+ """
279
+ global _queue_manager
280
+ if _queue_manager is None:
281
+ # Detect HF Spaces and adjust limits
282
+ import os
283
+ is_hf_spaces = os.environ.get('HF_SPACES', 'false').lower() == 'true'
284
+
285
+ if is_hf_spaces:
286
+ # T4 medium: 2 concurrent requests, queue of 5
287
+ _queue_manager = RequestQueueManager(
288
+ max_concurrent=2,
289
+ max_queue_size=5,
290
+ queue_timeout=300
291
+ )
292
+ logger.info("Initialized RequestQueueManager for Hugging Face Spaces (T4 medium)")
293
+ else:
294
+ # Local/dev: more generous limits
295
+ _queue_manager = RequestQueueManager(
296
+ max_concurrent=4,
297
+ max_queue_size=20,
298
+ queue_timeout=600
299
+ )
300
+ logger.info("Initialized RequestQueueManager for local/development")
301
+
302
+ return _queue_manager
303
+