""" Test script to verify the auto-cleanup mechanism for completed jobs. This script tests that: 1. Jobs are created and tracked correctly 2. Jobs transition to COMPLETED status 3. Jobs are automatically cleaned up after 30 seconds 4. Slots are freed after cleanup """ import time import sys from pathlib import Path # Add the src directory to the path src_path = Path(__file__).parent / "src" sys.path.insert(0, str(src_path)) from ai_med_extract.services.job_manager import get_job_manager from ai_med_extract.utils.constants import JOB_STATUS def test_auto_cleanup_after_completion(): """Test that completed jobs are automatically cleaned up after 30 seconds.""" print("=" * 80) print("AUTO-CLEANUP VERIFICATION TEST") print("=" * 80) print() job_manager = get_job_manager() # Get initial job count initial_count = job_manager.get_job_count() print(f"Initial job count: {initial_count}") print() # Step 1: Create a test job print("Step 1: Creating test job...") job_id = job_manager.create_job(request_id="test-cleanup-001") print(f"[OK] Created job: {job_id}") print(f" Current job count: {job_manager.get_job_count()}") print() # Step 2: Verify job exists print("Step 2: Verifying job exists...") job = job_manager.get_job(job_id) if job: print(f"[OK] Job found with status: {job['status']}") print(f" Created at: {time.strftime('%H:%M:%S', time.localtime(job['created_at']))}") else: print("[FAIL] ERROR: Job not found!") return False print() # Step 3: Update job to PROCESSING print("Step 3: Updating job to PROCESSING...") job_manager.update_job(job_id, status=JOB_STATUS["PROCESSING"], progress=50) job = job_manager.get_job(job_id) print(f"[OK] Job status: {job['status']}, Progress: {job['progress']}%") print() # Step 4: Complete the job (this should trigger auto-cleanup timer) print("Step 4: Completing the job (triggering auto-cleanup)...") completion_time = time.time() job_manager.update_job( job_id, status=JOB_STATUS["COMPLETED"], progress=100, data={"result": "Test summary generated successfully"} ) job = job_manager.get_job(job_id) print(f"[OK] Job completed at: {time.strftime('%H:%M:%S', time.localtime(completion_time))}") print(f" Status: {job['status']}") print(f" Auto-cleanup scheduled for: {time.strftime('%H:%M:%S', time.localtime(completion_time + 30))}") print() # Step 5: Verify job still exists immediately after completion print("Step 5: Verifying job still exists immediately after completion...") time.sleep(1) # Wait 1 second if job_manager.job_exists(job_id): print(f"[OK] Job still exists (as expected)") print(f" Current job count: {job_manager.get_job_count()}") else: print("[FAIL] ERROR: Job was deleted too early!") return False print() # Step 6: Wait for auto-cleanup (30 seconds) cleanup_delay = 30 print(f"Step 6: Waiting {cleanup_delay} seconds for auto-cleanup...") print(" Progress: ", end="", flush=True) for i in range(cleanup_delay): time.sleep(1) # Print progress indicator if (i + 1) % 5 == 0: print(f"{i + 1}s ", end="", flush=True) else: print(".", end="", flush=True) print() print() # Step 7: Verify job has been cleaned up print("Step 7: Verifying job has been cleaned up...") time.sleep(2) # Give a bit of extra time for the cleanup thread to execute if job_manager.job_exists(job_id): print("[FAIL] FAILED: Job still exists after cleanup delay!") job = job_manager.get_job(job_id) print(f" Job status: {job['status']}") print(f" Updated at: {time.strftime('%H:%M:%S', time.localtime(job['updated_at']))}") return False else: print(f"[OK] SUCCESS: Job was automatically cleaned up!") print(f" Current job count: {job_manager.get_job_count()}") print(f" Slot is now FREE for new jobs") print() return True def test_error_job_cleanup(): """Test that jobs with ERROR status are also cleaned up.""" print("=" * 80) print("ERROR JOB CLEANUP TEST") print("=" * 80) print() job_manager = get_job_manager() # Create a job that will error print("Creating job that will error...") job_id = job_manager.create_job(request_id="test-error-001") print(f"[OK] Created job: {job_id}") print() # Mark job as errored print("Marking job as ERROR (triggering auto-cleanup)...") error_time = time.time() job_manager.update_job( job_id, status=JOB_STATUS["ERROR"], error="Test error message", error_data={"error_type": "TestError"} ) print(f"[OK] Job errored at: {time.strftime('%H:%M:%S', time.localtime(error_time))}") print(f" Auto-cleanup scheduled for: {time.strftime('%H:%M:%S', time.localtime(error_time + 30))}") print() # Verify job exists immediately print("Verifying job still exists...") time.sleep(1) if job_manager.job_exists(job_id): print("[OK] Job still exists (as expected)") else: print("[FAIL] ERROR: Job was deleted too early!") return False print() # Wait for cleanup print("Waiting 30 seconds for auto-cleanup...") print(" Progress: ", end="", flush=True) for i in range(30): time.sleep(1) if (i + 1) % 5 == 0: print(f"{i + 1}s ", end="", flush=True) else: print(".", end="", flush=True) print() print() # Verify cleanup time.sleep(2) if job_manager.job_exists(job_id): print("[FAIL] FAILED: Error job still exists after cleanup delay!") return False else: print("[OK] SUCCESS: Error job was automatically cleaned up!") print() return True def main(): """Run all cleanup verification tests.""" print("\n") print("+" + "=" * 78 + "+") print("|" + " " * 20 + "JOB CLEANUP VERIFICATION SUITE" + " " * 28 + "|") print("+" + "=" * 78 + "+") print() print(f"Test started at: {time.strftime('%Y-%m-%d %H:%M:%S')}") print() results = [] # Test 1: Completed job cleanup try: result1 = test_auto_cleanup_after_completion() results.append(("Completed Job Auto-Cleanup", result1)) except Exception as e: print(f"[FAIL] Test failed with exception: {e}") import traceback traceback.print_exc() results.append(("Completed Job Auto-Cleanup", False)) print() # Test 2: Error job cleanup try: result2 = test_error_job_cleanup() results.append(("Error Job Auto-Cleanup", result2)) except Exception as e: print(f"[FAIL] Test failed with exception: {e}") import traceback traceback.print_exc() results.append(("Error Job Auto-Cleanup", False)) # Print summary print() print("=" * 80) print("TEST SUMMARY") print("=" * 80) print() for test_name, passed in results: status = "[OK] PASSED" if passed else "[FAIL] FAILED" print(f"{status}: {test_name}") print() all_passed = all(result for _, result in results) if all_passed: print("+" + "=" * 78 + "+") print("|" + " " * 25 + "ALL TESTS PASSED!" + " " * 36 + "|") print("+" + "=" * 78 + "+") print() print("[OK] Auto-cleanup mechanism is working correctly") print("[OK] Jobs are removed 30 seconds after completion") print("[OK] Slots are freed for new jobs") else: print("+" + "=" * 78 + "+") print("|" + " " * 25 + "SOME TESTS FAILED!" + " " * 35 + "|") print("+" + "=" * 78 + "+") print() print(f"Test completed at: {time.strftime('%Y-%m-%d %H:%M:%S')}") print() return 0 if all_passed else 1 if __name__ == "__main__": exit_code = main() sys.exit(exit_code)