HNTAI / services /ai-service /test_cleanup_verification.py
sachinchandrankallar's picture
feat: Add JobManager for asynchronous task tracking with auto-cleanup and a verification test.
56882e1
Raw
History Blame
8.2 kB
"""
Test script to verify the auto-cleanup mechanism for completed jobs.
This script tests that:
1. Jobs are created and tracked correctly
2. Jobs transition to COMPLETED status
3. Jobs are automatically cleaned up after 30 seconds
4. Slots are freed after cleanup
"""
import time
import sys
from pathlib import Path
# Add the src directory to the path
src_path = Path(__file__).parent / "src"
sys.path.insert(0, str(src_path))
from ai_med_extract.services.job_manager import get_job_manager
from ai_med_extract.utils.constants import JOB_STATUS
def test_auto_cleanup_after_completion():
"""Test that completed jobs are automatically cleaned up after 30 seconds."""
print("=" * 80)
print("AUTO-CLEANUP VERIFICATION TEST")
print("=" * 80)
print()
job_manager = get_job_manager()
# Get initial job count
initial_count = job_manager.get_job_count()
print(f"Initial job count: {initial_count}")
print()
# Step 1: Create a test job
print("Step 1: Creating test job...")
job_id = job_manager.create_job(request_id="test-cleanup-001")
print(f"[OK] Created job: {job_id}")
print(f" Current job count: {job_manager.get_job_count()}")
print()
# Step 2: Verify job exists
print("Step 2: Verifying job exists...")
job = job_manager.get_job(job_id)
if job:
print(f"[OK] Job found with status: {job['status']}")
print(f" Created at: {time.strftime('%H:%M:%S', time.localtime(job['created_at']))}")
else:
print("[FAIL] ERROR: Job not found!")
return False
print()
# Step 3: Update job to PROCESSING
print("Step 3: Updating job to PROCESSING...")
job_manager.update_job(job_id, status=JOB_STATUS["PROCESSING"], progress=50)
job = job_manager.get_job(job_id)
print(f"[OK] Job status: {job['status']}, Progress: {job['progress']}%")
print()
# Step 4: Complete the job (this should trigger auto-cleanup timer)
print("Step 4: Completing the job (triggering auto-cleanup)...")
completion_time = time.time()
job_manager.update_job(
job_id,
status=JOB_STATUS["COMPLETED"],
progress=100,
data={"result": "Test summary generated successfully"}
)
job = job_manager.get_job(job_id)
print(f"[OK] Job completed at: {time.strftime('%H:%M:%S', time.localtime(completion_time))}")
print(f" Status: {job['status']}")
print(f" Auto-cleanup scheduled for: {time.strftime('%H:%M:%S', time.localtime(completion_time + 30))}")
print()
# Step 5: Verify job still exists immediately after completion
print("Step 5: Verifying job still exists immediately after completion...")
time.sleep(1) # Wait 1 second
if job_manager.job_exists(job_id):
print(f"[OK] Job still exists (as expected)")
print(f" Current job count: {job_manager.get_job_count()}")
else:
print("[FAIL] ERROR: Job was deleted too early!")
return False
print()
# Step 6: Wait for auto-cleanup (30 seconds)
cleanup_delay = 30
print(f"Step 6: Waiting {cleanup_delay} seconds for auto-cleanup...")
print(" Progress: ", end="", flush=True)
for i in range(cleanup_delay):
time.sleep(1)
# Print progress indicator
if (i + 1) % 5 == 0:
print(f"{i + 1}s ", end="", flush=True)
else:
print(".", end="", flush=True)
print()
print()
# Step 7: Verify job has been cleaned up
print("Step 7: Verifying job has been cleaned up...")
time.sleep(2) # Give a bit of extra time for the cleanup thread to execute
if job_manager.job_exists(job_id):
print("[FAIL] FAILED: Job still exists after cleanup delay!")
job = job_manager.get_job(job_id)
print(f" Job status: {job['status']}")
print(f" Updated at: {time.strftime('%H:%M:%S', time.localtime(job['updated_at']))}")
return False
else:
print(f"[OK] SUCCESS: Job was automatically cleaned up!")
print(f" Current job count: {job_manager.get_job_count()}")
print(f" Slot is now FREE for new jobs")
print()
return True
def test_error_job_cleanup():
"""Test that jobs with ERROR status are also cleaned up."""
print("=" * 80)
print("ERROR JOB CLEANUP TEST")
print("=" * 80)
print()
job_manager = get_job_manager()
# Create a job that will error
print("Creating job that will error...")
job_id = job_manager.create_job(request_id="test-error-001")
print(f"[OK] Created job: {job_id}")
print()
# Mark job as errored
print("Marking job as ERROR (triggering auto-cleanup)...")
error_time = time.time()
job_manager.update_job(
job_id,
status=JOB_STATUS["ERROR"],
error="Test error message",
error_data={"error_type": "TestError"}
)
print(f"[OK] Job errored at: {time.strftime('%H:%M:%S', time.localtime(error_time))}")
print(f" Auto-cleanup scheduled for: {time.strftime('%H:%M:%S', time.localtime(error_time + 30))}")
print()
# Verify job exists immediately
print("Verifying job still exists...")
time.sleep(1)
if job_manager.job_exists(job_id):
print("[OK] Job still exists (as expected)")
else:
print("[FAIL] ERROR: Job was deleted too early!")
return False
print()
# Wait for cleanup
print("Waiting 30 seconds for auto-cleanup...")
print(" Progress: ", end="", flush=True)
for i in range(30):
time.sleep(1)
if (i + 1) % 5 == 0:
print(f"{i + 1}s ", end="", flush=True)
else:
print(".", end="", flush=True)
print()
print()
# Verify cleanup
time.sleep(2)
if job_manager.job_exists(job_id):
print("[FAIL] FAILED: Error job still exists after cleanup delay!")
return False
else:
print("[OK] SUCCESS: Error job was automatically cleaned up!")
print()
return True
def main():
"""Run all cleanup verification tests."""
print("\n")
print("+" + "=" * 78 + "+")
print("|" + " " * 20 + "JOB CLEANUP VERIFICATION SUITE" + " " * 28 + "|")
print("+" + "=" * 78 + "+")
print()
print(f"Test started at: {time.strftime('%Y-%m-%d %H:%M:%S')}")
print()
results = []
# Test 1: Completed job cleanup
try:
result1 = test_auto_cleanup_after_completion()
results.append(("Completed Job Auto-Cleanup", result1))
except Exception as e:
print(f"[FAIL] Test failed with exception: {e}")
import traceback
traceback.print_exc()
results.append(("Completed Job Auto-Cleanup", False))
print()
# Test 2: Error job cleanup
try:
result2 = test_error_job_cleanup()
results.append(("Error Job Auto-Cleanup", result2))
except Exception as e:
print(f"[FAIL] Test failed with exception: {e}")
import traceback
traceback.print_exc()
results.append(("Error Job Auto-Cleanup", False))
# Print summary
print()
print("=" * 80)
print("TEST SUMMARY")
print("=" * 80)
print()
for test_name, passed in results:
status = "[OK] PASSED" if passed else "[FAIL] FAILED"
print(f"{status}: {test_name}")
print()
all_passed = all(result for _, result in results)
if all_passed:
print("+" + "=" * 78 + "+")
print("|" + " " * 25 + "ALL TESTS PASSED!" + " " * 36 + "|")
print("+" + "=" * 78 + "+")
print()
print("[OK] Auto-cleanup mechanism is working correctly")
print("[OK] Jobs are removed 30 seconds after completion")
print("[OK] Slots are freed for new jobs")
else:
print("+" + "=" * 78 + "+")
print("|" + " " * 25 + "SOME TESTS FAILED!" + " " * 35 + "|")
print("+" + "=" * 78 + "+")
print()
print(f"Test completed at: {time.strftime('%Y-%m-%d %H:%M:%S')}")
print()
return 0 if all_passed else 1
if __name__ == "__main__":
exit_code = main()
sys.exit(exit_code)