""" Test script to demonstrate automatic job cleanup functionality. This script shows: 1. Jobs are automatically cleaned up after reaching terminal states (completed/error) 2. The cleanup happens after a configurable delay (default 30 seconds) 3. Manual cleanup of terminal jobs is also available """ import sys import os import time # Add the src directory to the path sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src')) from ai_med_extract.services.job_manager import get_job_manager from ai_med_extract.utils.constants import JOB_STATUS def print_job_status(): """Print current job status.""" job_manager = get_job_manager() all_jobs = job_manager.get_all_jobs() print(f"\n{'='*60}") print(f"Total jobs in memory: {len(all_jobs)}") print(f"{'='*60}") if all_jobs: for job_id, job in all_jobs.items(): status = job.get('status', 'unknown') created = time.time() - job.get('created_at', time.time()) updated = time.time() - job.get('updated_at', time.time()) print(f" Job {job_id[:8]}... | Status: {status:12} | Created: {created:5.1f}s ago | Updated: {updated:5.1f}s ago") else: print(" No jobs in memory") print(f"{'='*60}\n") def test_auto_cleanup(): """Test automatic cleanup of terminal jobs.""" print("\n" + "="*60) print("TEST: Automatic Job Cleanup") print("="*60) job_manager = get_job_manager() # Set a shorter cleanup delay for testing (5 seconds instead of 30) job_manager._terminal_job_cleanup_delay = 5 print("\n1. Creating 6 test jobs...") job_ids = [] for i in range(6): job_id = job_manager.create_job(request_id=f"test-{i}", initial_data={'test': i}) job_ids.append(job_id) print(f" Created job {i+1}: {job_id[:8]}...") print_job_status() print("2. Marking jobs as completed/errored...") # Mark first 3 as completed for i in range(3): job_manager.update_job(job_ids[i], status=JOB_STATUS["COMPLETED"], progress=100) print(f" Job {i+1} marked as COMPLETED") # Mark next 2 as error for i in range(3, 5): job_manager.update_job(job_ids[i], status=JOB_STATUS["ERROR"], error="Test error") print(f" Job {i+1} marked as ERROR") # Leave last one as queued print(f" Job 6 left as QUEUED") print_job_status() print(f"3. Waiting {job_manager._terminal_job_cleanup_delay} seconds for auto-cleanup...") for i in range(job_manager._terminal_job_cleanup_delay): time.sleep(1) print(f" {i+1}s...", end='\r') print() # Give a bit more time for the timer threads to execute time.sleep(1) print("\n4. After auto-cleanup:") print_job_status() print("[OK] Expected: Only 1 job remaining (the QUEUED one)") print(f"[OK] Actual: {job_manager.get_job_count()} job(s) remaining") # Cleanup remaining job job_manager.delete_job(job_ids[5]) def test_manual_cleanup(): """Test manual cleanup of terminal jobs.""" print("\n" + "="*60) print("TEST: Manual Terminal Job Cleanup") print("="*60) job_manager = get_job_manager() # Disable auto-cleanup for this test job_manager._auto_cleanup_enabled = False print("\n1. Creating 5 test jobs...") job_ids = [] for i in range(5): job_id = job_manager.create_job(request_id=f"manual-test-{i}", initial_data={'test': i}) job_ids.append(job_id) print_job_status() print("2. Marking jobs with different statuses...") job_manager.update_job(job_ids[0], status=JOB_STATUS["COMPLETED"], progress=100) job_manager.update_job(job_ids[1], status=JOB_STATUS["ERROR"], error="Test error") job_manager.update_job(job_ids[2], status=JOB_STATUS["COMPLETED"], progress=100) job_manager.update_job(job_ids[3], status=JOB_STATUS["PROCESSING"], progress=50) # job_ids[4] stays as QUEUED print_job_status() print("3. Calling cleanup_terminal_jobs()...") cleaned = job_manager.cleanup_terminal_jobs() print(f" Cleaned up {cleaned} terminal jobs") print("\n4. After manual cleanup:") print_job_status() print("[OK] Expected: 2 jobs remaining (PROCESSING and QUEUED)") print(f"[OK] Actual: {job_manager.get_job_count()} job(s) remaining") # Cleanup remaining jobs for job_id in job_ids[3:]: job_manager.delete_job(job_id) # Re-enable auto-cleanup job_manager._auto_cleanup_enabled = True if __name__ == "__main__": print("\n" + "="*60) print("JOB CLEANUP DEMONSTRATION") print("="*60) # Test 1: Automatic cleanup test_auto_cleanup() # Test 2: Manual cleanup test_manual_cleanup() print("\n" + "="*60) print("ALL TESTS COMPLETED") print("="*60) print("\nSummary:") print(" [OK] Automatic cleanup: Jobs are deleted 30s after reaching terminal state") print(" [OK] Manual cleanup: cleanup_terminal_jobs() removes all completed/errored jobs") print(" [OK] This prevents queue buildup and 'always queued' issues") print("="*60 + "\n")