""" Test script to verify that queue slots are released immediately when jobs complete. This test verifies: 1. Queue slots are released IMMEDIATELY when jobs complete (not after 30s) 2. New requests can start without queuing after slot release 3. Jobs are still cleaned up after 30 seconds """ import time import sys from pathlib import Path # Add the src directory to the path src_path = Path(__file__).parent / "src" sys.path.insert(0, str(src_path)) from ai_med_extract.services.job_manager import get_job_manager from ai_med_extract.services.request_queue import get_queue_manager from ai_med_extract.utils.constants import JOB_STATUS def test_immediate_slot_release(): """Test that queue slots are released immediately when jobs complete.""" print("=" * 80) print("IMMEDIATE SLOT RELEASE TEST") print("=" * 80) print() job_manager = get_job_manager() queue_manager = get_queue_manager() # Get initial state initial_queue_status = queue_manager.get_queue_status() print(f"Initial state:") print(f" Active requests: {initial_queue_status['active_requests']}") print(f" Max concurrent: {initial_queue_status['max_concurrent']}") print() # Step 1: Enqueue a request and create a job print("Step 1: Creating request and job...") request_id = "test-slot-release-001" accepted, error_msg = queue_manager.enqueue_request(request_id, job_id=None) if not accepted: print(f"[FAIL] Request not accepted: {error_msg}") return False print(f"[OK] Request enqueued: {request_id}") # Wait for slot if not queue_manager.wait_for_slot(request_id, timeout=5): print("[FAIL] Failed to acquire slot") return False print(f"[OK] Slot acquired") # Create job job_id = job_manager.create_job(request_id=request_id) print(f"[OK] Created job: {job_id}") print() # Check queue status - should have 1 active request queue_status = queue_manager.get_queue_status() print(f"After slot acquisition:") print(f" Active requests: {queue_status['active_requests']}") if queue_status['active_requests'] != 1: print(f"[FAIL] Expected 1 active request, got {queue_status['active_requests']}") return False print(f"[OK] Slot is occupied (1 active request)") print() # Step 2: Complete the job print("Step 2: Completing the job...") completion_time = time.time() job_manager.update_job( job_id, status=JOB_STATUS["COMPLETED"], progress=100, data={"result": "Test completed"} ) print(f"[OK] Job completed at: {time.strftime('%H:%M:%S', time.localtime(completion_time))}") print() # Step 3: Verify slot is released IMMEDIATELY (not after 30s) print("Step 3: Verifying IMMEDIATE slot release...") time.sleep(1) # Wait 1 second (much less than 30s) queue_status = queue_manager.get_queue_status() print(f"After 1 second:") print(f" Active requests: {queue_status['active_requests']}") if queue_status['active_requests'] != 0: print(f"[FAIL] Slot NOT released immediately! Still {queue_status['active_requests']} active requests") print(" This means new requests will queue unnecessarily!") return False print(f"[OK] SUCCESS: Slot released IMMEDIATELY!") print(f" New requests can now start without queuing") print() # Step 4: Verify job still exists (cleanup happens after 30s) print("Step 4: Verifying job still exists (cleanup is delayed)...") if not job_manager.job_exists(job_id): print("[FAIL] Job was deleted too early!") return False print(f"[OK] Job still exists (will be cleaned up after 30s)") print() # Step 5: Verify a new request can start immediately print("Step 5: Testing that new request can start immediately...") request_id_2 = "test-slot-release-002" accepted, error_msg = queue_manager.enqueue_request(request_id_2, job_id=None) if not accepted: print(f"[FAIL] New request not accepted: {error_msg}") return False # Should get slot immediately since previous was released if not queue_manager.wait_for_slot(request_id_2, timeout=1): print("[FAIL] New request couldn't get slot immediately!") return False print(f"[OK] New request got slot immediately (no queuing!)") # Clean up queue_manager.release_slot(request_id_2) print() return True def test_error_job_slot_release(): """Test that queue slots are released immediately when jobs error.""" print("=" * 80) print("ERROR JOB SLOT RELEASE TEST") print("=" * 80) print() job_manager = get_job_manager() queue_manager = get_queue_manager() # Create request and job print("Creating request and job...") request_id = "test-error-slot-001" queue_manager.enqueue_request(request_id, job_id=None) queue_manager.wait_for_slot(request_id, timeout=5) job_id = job_manager.create_job(request_id=request_id) print(f"[OK] Created job: {job_id}") print() # Check active requests queue_status = queue_manager.get_queue_status() print(f"Active requests before error: {queue_status['active_requests']}") # Mark job as errored print("Marking job as ERROR...") job_manager.update_job( job_id, status=JOB_STATUS["ERROR"], error="Test error" ) print(f"[OK] Job errored") print() # Verify slot released immediately print("Verifying immediate slot release...") time.sleep(1) queue_status = queue_manager.get_queue_status() print(f"Active requests after error: {queue_status['active_requests']}") if queue_status['active_requests'] != 0: print(f"[FAIL] Slot NOT released for error job!") return False print(f"[OK] SUCCESS: Slot released immediately for error job!") print() return True def main(): """Run all slot release tests.""" print("\n") print("+" + "=" * 78 + "+") print("|" + " " * 20 + "QUEUE SLOT RELEASE VERIFICATION" + " " * 27 + "|") print("+" + "=" * 78 + "+") print() print(f"Test started at: {time.strftime('%Y-%m-%d %H:%M:%S')}") print() results = [] # Test 1: Immediate slot release on completion try: result1 = test_immediate_slot_release() results.append(("Immediate Slot Release on Completion", result1)) except Exception as e: print(f"[FAIL] Test failed with exception: {e}") import traceback traceback.print_exc() results.append(("Immediate Slot Release on Completion", False)) print() # Test 2: Immediate slot release on error try: result2 = test_error_job_slot_release() results.append(("Immediate Slot Release on Error", result2)) except Exception as e: print(f"[FAIL] Test failed with exception: {e}") import traceback traceback.print_exc() results.append(("Immediate Slot Release on Error", False)) # Print summary print() print("=" * 80) print("TEST SUMMARY") print("=" * 80) print() for test_name, passed in results: status = "[OK] PASSED" if passed else "[FAIL] FAILED" print(f"{status}: {test_name}") print() all_passed = all(result for _, result in results) if all_passed: print("+" + "=" * 78 + "+") print("|" + " " * 25 + "ALL TESTS PASSED!" + " " * 36 + "|") print("+" + "=" * 78 + "+") print() print("[OK] Queue slots are released IMMEDIATELY on job completion") print("[OK] New requests can start without queuing") print("[OK] Jobs are still cleaned up after 30 seconds") else: print("+" + "=" * 78 + "+") print("|" + " " * 25 + "SOME TESTS FAILED!" + " " * 35 + "|") print("+" + "=" * 78 + "+") print() print(f"Test completed at: {time.strftime('%Y-%m-%d %H:%M:%S')}") print() return 0 if all_passed else 1 if __name__ == "__main__": exit_code = main() sys.exit(exit_code)