Spaces:
Paused
Paused
feat: Add JobManager for asynchronous task tracking with auto-cleanup and a verification test.
56882e1 | """ | |
| Test script to verify the auto-cleanup mechanism for completed jobs. | |
| This script tests that: | |
| 1. Jobs are created and tracked correctly | |
| 2. Jobs transition to COMPLETED status | |
| 3. Jobs that reach COMPLETED or ERROR status are automatically cleaned up after 30 seconds | |
| 4. Slots are freed after cleanup | |
| """ | |
| import time | |
| import sys | |
| from pathlib import Path | |
| # Add the src directory to the path | |
| src_path = Path(__file__).parent / "src" | |
| sys.path.insert(0, str(src_path)) | |
| from ai_med_extract.services.job_manager import get_job_manager | |
| from ai_med_extract.utils.constants import JOB_STATUS | |
def test_auto_cleanup_after_completion():
    """Test that completed jobs are automatically cleaned up after 30 seconds.

    Walks one job through creation, PROCESSING and COMPLETED, confirms it is
    still present one second after completion, waits ``cleanup_delay`` seconds
    plus a 2-second grace period, and then checks that ``job_exists`` reports
    the job as gone. Progress is printed to stdout at every step.

    Returns:
        bool: True when the job was gone after the wait; False when the job
        could not be looked up at an earlier step, vanished too early, or
        was still present after the wait.
    """
    # Seconds this script waits between completion and the removal check.
    # NOTE(review): presumably mirrors the delay configured in the job
    # manager -- confirm against its implementation.
    cleanup_delay = 30

    def clock(timestamp):
        """Format an epoch timestamp as local HH:MM:SS."""
        return time.strftime('%H:%M:%S', time.localtime(timestamp))

    print("=" * 80)
    print("AUTO-CLEANUP VERIFICATION TEST")
    print("=" * 80)
    print()

    job_manager = get_job_manager()

    # Get initial job count
    initial_count = job_manager.get_job_count()
    print(f"Initial job count: {initial_count}")
    print()

    # Step 1: Create a test job
    print("Step 1: Creating test job...")
    job_id = job_manager.create_job(request_id="test-cleanup-001")
    print(f"[OK] Created job: {job_id}")
    print(f" Current job count: {job_manager.get_job_count()}")
    print()

    # Step 2: Verify job exists
    print("Step 2: Verifying job exists...")
    job = job_manager.get_job(job_id)
    if not job:
        print("[FAIL] ERROR: Job not found!")
        return False
    print(f"[OK] Job found with status: {job['status']}")
    print(f" Created at: {clock(job['created_at'])}")
    print()

    # Step 3: Update job to PROCESSING
    print("Step 3: Updating job to PROCESSING...")
    job_manager.update_job(job_id, status=JOB_STATUS["PROCESSING"], progress=50)
    job = job_manager.get_job(job_id)
    if not job:
        # Report a clean failure instead of crashing on job['status'].
        print("[FAIL] ERROR: Job not found after update to PROCESSING!")
        return False
    print(f"[OK] Job status: {job['status']}, Progress: {job['progress']}%")
    print()

    # Step 4: Complete the job (this should trigger auto-cleanup timer)
    print("Step 4: Completing the job (triggering auto-cleanup)...")
    completion_time = time.time()
    job_manager.update_job(
        job_id,
        status=JOB_STATUS["COMPLETED"],
        progress=100,
        data={"result": "Test summary generated successfully"}
    )
    job = job_manager.get_job(job_id)
    if not job:
        print("[FAIL] ERROR: Job not found right after completion!")
        return False
    print(f"[OK] Job completed at: {clock(completion_time)}")
    print(f" Status: {job['status']}")
    print(f" Auto-cleanup scheduled for: {clock(completion_time + cleanup_delay)}")
    print()

    # Step 5: Verify job still exists immediately after completion
    print("Step 5: Verifying job still exists immediately after completion...")
    time.sleep(1)  # Wait 1 second
    if not job_manager.job_exists(job_id):
        print("[FAIL] ERROR: Job was deleted too early!")
        return False
    print("[OK] Job still exists (as expected)")
    print(f" Current job count: {job_manager.get_job_count()}")
    print()

    # Step 6: Wait for auto-cleanup
    print(f"Step 6: Waiting {cleanup_delay} seconds for auto-cleanup...")
    print(" Progress: ", end="", flush=True)
    for elapsed in range(1, cleanup_delay + 1):
        time.sleep(1)
        # Print a dot per second and the elapsed count every fifth second
        if elapsed % 5 == 0:
            print(f"{elapsed}s ", end="", flush=True)
        else:
            print(".", end="", flush=True)
    print()
    print()

    # Step 7: Verify job has been cleaned up
    print("Step 7: Verifying job has been cleaned up...")
    time.sleep(2)  # Give a bit of extra time for the cleanup thread to execute
    if job_manager.job_exists(job_id):
        print("[FAIL] FAILED: Job still exists after cleanup delay!")
        # The job may be removed between job_exists() and get_job(), so only
        # print its details when the second lookup still returns something.
        job = job_manager.get_job(job_id)
        if job:
            print(f" Job status: {job['status']}")
            print(f" Updated at: {clock(job['updated_at'])}")
        return False

    print("[OK] SUCCESS: Job was automatically cleaned up!")
    print(f" Current job count: {job_manager.get_job_count()}")
    print(" Slot is now FREE for new jobs")
    print()
    return True
def test_error_job_cleanup():
    """Test that jobs with ERROR status are also cleaned up.

    Creates a job, marks it ERROR, confirms it is still present one second
    later, waits ``cleanup_delay`` seconds plus a 2-second grace period, and
    then checks that ``job_exists`` reports the job as gone.

    Returns:
        bool: True when the errored job was gone after the wait; False when
        it vanished too early or was still present afterwards.
    """
    # Single source for the wait length so the printed schedule, the
    # "Waiting ..." message and the actual sleep loop cannot drift apart.
    # NOTE(review): presumably mirrors the delay configured in the job
    # manager -- confirm against its implementation.
    cleanup_delay = 30

    print("=" * 80)
    print("ERROR JOB CLEANUP TEST")
    print("=" * 80)
    print()

    job_manager = get_job_manager()

    # Create a job that will error
    print("Creating job that will error...")
    job_id = job_manager.create_job(request_id="test-error-001")
    print(f"[OK] Created job: {job_id}")
    print()

    # Mark job as errored
    print("Marking job as ERROR (triggering auto-cleanup)...")
    error_time = time.time()
    job_manager.update_job(
        job_id,
        status=JOB_STATUS["ERROR"],
        error="Test error message",
        error_data={"error_type": "TestError"}
    )
    print(f"[OK] Job errored at: {time.strftime('%H:%M:%S', time.localtime(error_time))}")
    print(f" Auto-cleanup scheduled for: {time.strftime('%H:%M:%S', time.localtime(error_time + cleanup_delay))}")
    print()

    # Verify job exists immediately
    print("Verifying job still exists...")
    time.sleep(1)
    if not job_manager.job_exists(job_id):
        print("[FAIL] ERROR: Job was deleted too early!")
        return False
    print("[OK] Job still exists (as expected)")
    print()

    # Wait for cleanup
    print(f"Waiting {cleanup_delay} seconds for auto-cleanup...")
    print(" Progress: ", end="", flush=True)
    for elapsed in range(1, cleanup_delay + 1):
        time.sleep(1)
        # Print a dot per second and the elapsed count every fifth second
        if elapsed % 5 == 0:
            print(f"{elapsed}s ", end="", flush=True)
        else:
            print(".", end="", flush=True)
    print()
    print()

    # Verify cleanup (after a 2-second grace period)
    time.sleep(2)
    if job_manager.job_exists(job_id):
        print("[FAIL] FAILED: Error job still exists after cleanup delay!")
        return False

    print("[OK] SUCCESS: Error job was automatically cleaned up!")
    print()
    return True
def main():
    """Run all cleanup verification tests.

    Runs the completed-job check and the error-job check in turn, treating
    any exception raised by a check as a failure, then prints a per-test
    summary and a closing banner.

    Returns:
        int: 0 when every check produced a truthy result, 1 otherwise.
    """
    border = "+" + "=" * 78 + "+"
    rule = "=" * 80
    outcomes = []

    def record_crash(label, exc):
        """Report an exception raised by a check and log the check as failed."""
        print(f"[FAIL] Test failed with exception: {exc}")
        import traceback
        traceback.print_exc()
        outcomes.append((label, False))

    print("\n")
    print(border)
    print("|" + " " * 20 + "JOB CLEANUP VERIFICATION SUITE" + " " * 28 + "|")
    print(border)
    print()
    print(f"Test started at: {time.strftime('%Y-%m-%d %H:%M:%S')}")
    print()

    # Test 1: Completed job cleanup
    try:
        completed_ok = test_auto_cleanup_after_completion()
        outcomes.append(("Completed Job Auto-Cleanup", completed_ok))
    except Exception as exc:
        record_crash("Completed Job Auto-Cleanup", exc)

    print()

    # Test 2: Error job cleanup
    try:
        errored_ok = test_error_job_cleanup()
        outcomes.append(("Error Job Auto-Cleanup", errored_ok))
    except Exception as exc:
        record_crash("Error Job Auto-Cleanup", exc)

    # Print summary
    print()
    print(rule)
    print("TEST SUMMARY")
    print(rule)
    print()

    for label, ok in outcomes:
        verdict = "[OK] PASSED" if ok else "[FAIL] FAILED"
        print(f"{verdict}: {label}")
    print()

    all_passed = all(ok for _, ok in outcomes)

    # Headline text and right padding chosen so the box stays 80 columns wide.
    if all_passed:
        headline, right_pad = "ALL TESTS PASSED!", 36
    else:
        headline, right_pad = "SOME TESTS FAILED!", 35

    print(border)
    print("|" + " " * 25 + headline + " " * right_pad + "|")
    print(border)

    if all_passed:
        print()
        print("[OK] Auto-cleanup mechanism is working correctly")
        print("[OK] Jobs are removed 30 seconds after completion")
        print("[OK] Slots are freed for new jobs")

    print()
    print(f"Test completed at: {time.strftime('%Y-%m-%d %H:%M:%S')}")
    print()

    return int(not all_passed)
if __name__ == "__main__":
    # When run as a script, use main()'s return value as the process exit status.
    sys.exit(main())