sachinchandrankallar committed on
Commit
0447ebe
·
1 Parent(s): 56882e1

Immediate queue slot release and delayed job cleanup

Browse files
services/ai-service/src/ai_med_extract/services/job_manager.py CHANGED
@@ -33,6 +33,10 @@ class JobManager:
33
  self._max_job_age = 7200 # 2 hours
34
  self._terminal_job_cleanup_delay = 30 # 30 seconds delay before cleaning up completed/errored jobs
35
  self._auto_cleanup_enabled = True # Enable automatic cleanup of terminal jobs
 
 
 
 
36
 
37
  def create_job(self, request_id: Optional[str] = None, initial_data: Optional[Dict] = None) -> str:
38
  """
@@ -107,10 +111,26 @@ class JobManager:
107
 
108
  job['updated_at'] = time.time()
109
 
110
- # Auto-cleanup terminal jobs after delay
111
- if self._auto_cleanup_enabled and status is not None:
112
- if status in [JOB_STATUS["COMPLETED"], JOB_STATUS["ERROR"]]:
113
- logger.info(f"Job {job_id} reached terminal state '{status}', scheduling cleanup in {self._terminal_job_cleanup_delay}s")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
114
  threading.Timer(
115
  self._terminal_job_cleanup_delay,
116
  lambda: self.delete_job(job_id)
 
33
  self._max_job_age = 7200 # 2 hours
34
  self._terminal_job_cleanup_delay = 30 # 30 seconds delay before cleaning up completed/errored jobs
35
  self._auto_cleanup_enabled = True # Enable automatic cleanup of terminal jobs
36
+ self._auto_release_slots = True # Enable automatic queue slot release on terminal status
37
+
38
+ # Lazy import to avoid circular dependency
39
+ self._queue_manager = None
40
 
41
  def create_job(self, request_id: Optional[str] = None, initial_data: Optional[Dict] = None) -> str:
42
  """
 
111
 
112
  job['updated_at'] = time.time()
113
 
114
+ # Auto-release queue slot and schedule cleanup for terminal jobs
115
+ if status is not None and status in [JOB_STATUS["COMPLETED"], JOB_STATUS["ERROR"]]:
116
+ request_id = job.get('request_id')
117
+
118
+ # Release queue slot IMMEDIATELY to free up capacity
119
+ if self._auto_release_slots and request_id:
120
+ try:
121
+ # Lazy import to avoid circular dependency
122
+ if self._queue_manager is None:
123
+ from .request_queue import get_queue_manager
124
+ self._queue_manager = get_queue_manager()
125
+
126
+ self._queue_manager.release_slot(request_id)
127
+ logger.info(f"Job {job_id} reached terminal state '{status}', released queue slot for request {request_id}")
128
+ except Exception as e:
129
+ logger.warning(f"Failed to release queue slot for request {request_id}: {e}")
130
+
131
+ # Schedule job cleanup after delay (job data remains accessible for self._terminal_job_cleanup_delay seconds, 30 by default)
132
+ if self._auto_cleanup_enabled:
133
+ logger.info(f"Scheduling cleanup for job {job_id} in {self._terminal_job_cleanup_delay}s")
134
  threading.Timer(
135
  self._terminal_job_cleanup_delay,
136
  lambda: self.delete_job(job_id)
services/ai-service/test_slot_release.py ADDED
@@ -0,0 +1,260 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Test script to verify that queue slots are released immediately when jobs complete.
3
+
4
+ This test verifies:
5
+ 1. Queue slots are released IMMEDIATELY when jobs complete (not after 30s)
6
+ 2. New requests can start without queuing after slot release
7
+ 3. Jobs still exist right after completion (cleanup is delayed by 30 seconds; the delayed deletion itself is not exercised here)
8
+ """
9
+
10
+ import time
11
+ import sys
12
+ from pathlib import Path
13
+
14
+ # Add the src directory to the path
15
+ src_path = Path(__file__).parent / "src"
16
+ sys.path.insert(0, str(src_path))
17
+
18
+ from ai_med_extract.services.job_manager import get_job_manager
19
+ from ai_med_extract.services.request_queue import get_queue_manager
20
+ from ai_med_extract.utils.constants import JOB_STATUS
21
+
22
+
23
+ def test_immediate_slot_release():
24
+ """Test that queue slots are released immediately when jobs complete."""
25
+
26
+ print("=" * 80)
27
+ print("IMMEDIATE SLOT RELEASE TEST")
28
+ print("=" * 80)
29
+ print()
30
+
31
+ job_manager = get_job_manager()
32
+ queue_manager = get_queue_manager()
33
+
34
+ # Get initial state
35
+ initial_queue_status = queue_manager.get_queue_status()
36
+ print(f"Initial state:")
37
+ print(f" Active requests: {initial_queue_status['active_requests']}")
38
+ print(f" Max concurrent: {initial_queue_status['max_concurrent']}")
39
+ print()
40
+
41
+ # Step 1: Enqueue a request and create a job
42
+ print("Step 1: Creating request and job...")
43
+ request_id = "test-slot-release-001"
44
+
45
+ accepted, error_msg = queue_manager.enqueue_request(request_id, job_id=None)
46
+ if not accepted:
47
+ print(f"[FAIL] Request not accepted: {error_msg}")
48
+ return False
49
+
50
+ print(f"[OK] Request enqueued: {request_id}")
51
+
52
+ # Wait for slot
53
+ if not queue_manager.wait_for_slot(request_id, timeout=5):
54
+ print("[FAIL] Failed to acquire slot")
55
+ return False
56
+
57
+ print(f"[OK] Slot acquired")
58
+
59
+ # Create job
60
+ job_id = job_manager.create_job(request_id=request_id)
61
+ print(f"[OK] Created job: {job_id}")
62
+ print()
63
+
64
+ # Check queue status - should have 1 active request
65
+ queue_status = queue_manager.get_queue_status()
66
+ print(f"After slot acquisition:")
67
+ print(f" Active requests: {queue_status['active_requests']}")
68
+
69
+ if queue_status['active_requests'] != 1:
70
+ print(f"[FAIL] Expected 1 active request, got {queue_status['active_requests']}")
71
+ return False
72
+ print(f"[OK] Slot is occupied (1 active request)")
73
+ print()
74
+
75
+ # Step 2: Complete the job
76
+ print("Step 2: Completing the job...")
77
+ completion_time = time.time()
78
+ job_manager.update_job(
79
+ job_id,
80
+ status=JOB_STATUS["COMPLETED"],
81
+ progress=100,
82
+ data={"result": "Test completed"}
83
+ )
84
+ print(f"[OK] Job completed at: {time.strftime('%H:%M:%S', time.localtime(completion_time))}")
85
+ print()
86
+
87
+ # Step 3: Verify slot is released IMMEDIATELY (not after 30s)
88
+ print("Step 3: Verifying IMMEDIATE slot release...")
89
+ time.sleep(1) # Wait 1 second (much less than 30s)
90
+
91
+ queue_status = queue_manager.get_queue_status()
92
+ print(f"After 1 second:")
93
+ print(f" Active requests: {queue_status['active_requests']}")
94
+
95
+ if queue_status['active_requests'] != 0:
96
+ print(f"[FAIL] Slot NOT released immediately! Still {queue_status['active_requests']} active requests")
97
+ print(" This means new requests will queue unnecessarily!")
98
+ return False
99
+
100
+ print(f"[OK] SUCCESS: Slot released IMMEDIATELY!")
101
+ print(f" New requests can now start without queuing")
102
+ print()
103
+
104
+ # Step 4: Verify job still exists (cleanup happens after 30s)
105
+ print("Step 4: Verifying job still exists (cleanup is delayed)...")
106
+ if not job_manager.job_exists(job_id):
107
+ print("[FAIL] Job was deleted too early!")
108
+ return False
109
+
110
+ print(f"[OK] Job still exists (will be cleaned up after 30s)")
111
+ print()
112
+
113
+ # Step 5: Verify a new request can start immediately
114
+ print("Step 5: Testing that new request can start immediately...")
115
+ request_id_2 = "test-slot-release-002"
116
+
117
+ accepted, error_msg = queue_manager.enqueue_request(request_id_2, job_id=None)
118
+ if not accepted:
119
+ print(f"[FAIL] New request not accepted: {error_msg}")
120
+ return False
121
+
122
+ # Should get slot immediately since previous was released
123
+ if not queue_manager.wait_for_slot(request_id_2, timeout=1):
124
+ print("[FAIL] New request couldn't get slot immediately!")
125
+ return False
126
+
127
+ print(f"[OK] New request got slot immediately (no queuing!)")
128
+
129
+ # Clean up
130
+ queue_manager.release_slot(request_id_2)
131
+ print()
132
+
133
+ return True
134
+
135
+
136
+ def test_error_job_slot_release():
137
+ """Test that queue slots are released immediately when jobs error."""
138
+
139
+ print("=" * 80)
140
+ print("ERROR JOB SLOT RELEASE TEST")
141
+ print("=" * 80)
142
+ print()
143
+
144
+ job_manager = get_job_manager()
145
+ queue_manager = get_queue_manager()
146
+
147
+ # Create request and job
148
+ print("Creating request and job...")
149
+ request_id = "test-error-slot-001"
150
+
151
+ queue_manager.enqueue_request(request_id, job_id=None)
152
+ queue_manager.wait_for_slot(request_id, timeout=5)
153
+
154
+ job_id = job_manager.create_job(request_id=request_id)
155
+ print(f"[OK] Created job: {job_id}")
156
+ print()
157
+
158
+ # Check active requests
159
+ queue_status = queue_manager.get_queue_status()
160
+ print(f"Active requests before error: {queue_status['active_requests']}")
161
+
162
+ # Mark job as errored
163
+ print("Marking job as ERROR...")
164
+ job_manager.update_job(
165
+ job_id,
166
+ status=JOB_STATUS["ERROR"],
167
+ error="Test error"
168
+ )
169
+ print(f"[OK] Job errored")
170
+ print()
171
+
172
+ # Verify slot released immediately
173
+ print("Verifying immediate slot release...")
174
+ time.sleep(1)
175
+
176
+ queue_status = queue_manager.get_queue_status()
177
+ print(f"Active requests after error: {queue_status['active_requests']}")
178
+
179
+ if queue_status['active_requests'] != 0:
180
+ print(f"[FAIL] Slot NOT released for error job!")
181
+ return False
182
+
183
+ print(f"[OK] SUCCESS: Slot released immediately for error job!")
184
+ print()
185
+
186
+ return True
187
+
188
+
189
+ def main():
190
+ """Run all slot release tests."""
191
+
192
+ print("\n")
193
+ print("+" + "=" * 78 + "+")
194
+ print("|" + " " * 20 + "QUEUE SLOT RELEASE VERIFICATION" + " " * 27 + "|")
195
+ print("+" + "=" * 78 + "+")
196
+ print()
197
+ print(f"Test started at: {time.strftime('%Y-%m-%d %H:%M:%S')}")
198
+ print()
199
+
200
+ results = []
201
+
202
+ # Test 1: Immediate slot release on completion
203
+ try:
204
+ result1 = test_immediate_slot_release()
205
+ results.append(("Immediate Slot Release on Completion", result1))
206
+ except Exception as e:
207
+ print(f"[FAIL] Test failed with exception: {e}")
208
+ import traceback
209
+ traceback.print_exc()
210
+ results.append(("Immediate Slot Release on Completion", False))
211
+
212
+ print()
213
+
214
+ # Test 2: Immediate slot release on error
215
+ try:
216
+ result2 = test_error_job_slot_release()
217
+ results.append(("Immediate Slot Release on Error", result2))
218
+ except Exception as e:
219
+ print(f"[FAIL] Test failed with exception: {e}")
220
+ import traceback
221
+ traceback.print_exc()
222
+ results.append(("Immediate Slot Release on Error", False))
223
+
224
+ # Print summary
225
+ print()
226
+ print("=" * 80)
227
+ print("TEST SUMMARY")
228
+ print("=" * 80)
229
+ print()
230
+
231
+ for test_name, passed in results:
232
+ status = "[OK] PASSED" if passed else "[FAIL] FAILED"
233
+ print(f"{status}: {test_name}")
234
+
235
+ print()
236
+ all_passed = all(result for _, result in results)
237
+
238
+ if all_passed:
239
+ print("+" + "=" * 78 + "+")
240
+ print("|" + " " * 25 + "ALL TESTS PASSED!" + " " * 36 + "|")
241
+ print("+" + "=" * 78 + "+")
242
+ print()
243
+ print("[OK] Queue slots are released IMMEDIATELY on job completion")
244
+ print("[OK] New requests can start without queuing")
245
+ print("[OK] Jobs are still cleaned up after 30 seconds")
246
+ else:
247
+ print("+" + "=" * 78 + "+")
248
+ print("|" + " " * 25 + "SOME TESTS FAILED!" + " " * 35 + "|")
249
+ print("+" + "=" * 78 + "+")
250
+
251
+ print()
252
+ print(f"Test completed at: {time.strftime('%Y-%m-%d %H:%M:%S')}")
253
+ print()
254
+
255
+ return 0 if all_passed else 1
256
+
257
+
258
+ if __name__ == "__main__":
259
+ exit_code = main()
260
+ sys.exit(exit_code)