Spaces:
Runtime error
Runtime error
| """ | |
| HF Spaces application for WebAI verification worker. | |
| Uses ZeroGPU for on-demand GPU processing with webAI-ColVec1-4b. | |
| """ | |
| import os | |
| import json | |
| import time | |
| import hashlib | |
| import gc | |
| from typing import Dict, Any, Optional, List | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| import gradio as gr | |
| import spaces | |
| import torch | |
| from transformers import AutoModel, AutoProcessor, BitsAndBytesConfig | |
def env_bool(name: str, default: bool = False) -> bool:
    """Parse a boolean flag from an environment variable.

    Args:
        name: Name of the environment variable to read.
        default: Value returned when the variable is unset, empty, or
            contains only whitespace.

    Returns:
        True when the value, ignoring case and surrounding whitespace, is
        one of "true", "1", "yes" or "on"; False for any other non-blank
        value; ``default`` otherwise.
    """
    # Strip before testing so a value such as "true " is not silently read
    # as False, and a whitespace-only value still falls back to the default.
    value = os.getenv(name, "").strip().lower()
    if not value:
        return default
    return value in ("true", "1", "yes", "on")
| import psycopg | |
| from psycopg.rows import dict_row | |
| import httpx | |
| from PIL import Image | |
| import io | |
| # Configuration | |
@dataclass
class WebAIConfig:
    """Configuration for WebAI verification worker.

    Defaults that come from the environment are read once, when the class
    body is executed. The class is a dataclass so that ``__post_init__``
    actually runs on construction; on a plain class that hook is never
    called and the credential checks below would be dead code.

    Raises:
        ValueError: on instantiation when ``database_url`` is empty, or
            when ``supabase_url`` or ``supabase_service_role_key`` is empty.
    """

    # Database and Supabase storage access
    database_url: str = os.getenv("DATABASE_URL", "").strip()
    supabase_url: str = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "").rstrip("/")
    supabase_service_role_key: str = os.getenv("SUPABASE_SERVICE_ROLE_KEY", "").strip()
    storage_bucket: str = os.getenv("SUPABASE_STORAGE_BUCKET", "tender-documents")

    # WebAI Model Configuration
    # Using 4b model weights with 9b processor (shared Qwen 3.5 architecture)
    model_name: str = "webAI-Official/webAI-ColVec1-4b"
    processor_name: str = "webAI-Official/webAI-ColVec1-9b"  # Has working config
    use_8bit_quantization: bool = True
    use_4bit_quantization: bool = False
    use_flash_attention_2: bool = True
    max_new_tokens: int = 512
    image_size: int = 336  # webAI models typically use 336x336
    pdf_dpi: int = int(os.getenv("PDF_DPI", "200"))  # High DPI for better extraction
    adaptive_dpi: bool = env_bool("ADAPTIVE_DPI", True)  # Scale DPI based on memory

    # Processing Configuration
    batch_size: int = 1
    timeout_seconds: int = 300

    def __post_init__(self) -> None:
        """Reject a configuration that lacks database or Supabase credentials."""
        if not self.database_url:
            raise ValueError("DATABASE_URL is required")
        if not self.supabase_url or not self.supabase_service_role_key:
            raise ValueError("Supabase credentials are required")
# Global configuration, built once at import time and read by the functions below
CONFIG = WebAIConfig()

# Model and processor handles. Both start as None and are assigned by
# setup_model(); inference code reads them as module globals.
model = None
processor = None
def log_event(event: str, payload: Dict[str, Any]) -> None:
    """Write one structured JSON log line to stdout.

    The line carries a UTC ISO-8601 timestamp, the event name and the
    service name, followed by every key of ``payload``; a payload key that
    clashes with one of those three replaces its value. Values that JSON
    cannot encode are rendered with ``str``. Output is flushed immediately.
    """
    header = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "event": event,
        "service": "webai-verification-worker",
    }
    entry = {**header, **payload}
    line = json.dumps(entry, default=str)
    print(line, flush=True)
def setup_model():
    """Load the WebAI processor and model into the module globals.

    The processor is loaded from ``CONFIG.processor_name`` and the model
    weights from ``CONFIG.model_name``. Quantization follows the CONFIG
    flags (8-bit takes precedence over 4-bit when both are set), and
    FlashAttention-2 is requested only when ``flash_attn`` can be imported.

    Raises:
        Exception: any error raised while loading is logged as
            ``model.setup.failed`` and re-raised unchanged.
    """
    global model, processor
    try:
        log_event("model.setup.start", {"model": CONFIG.model_name})

        # Configure quantization for memory efficiency
        quantization_config = None
        quantization_type = "none"
        if CONFIG.use_8bit_quantization:
            quantization_config = BitsAndBytesConfig(
                load_in_8bit=True,
            )
            quantization_type = "8bit"
        elif CONFIG.use_4bit_quantization:
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_use_double_quant=True,
            )
            quantization_type = "4bit"

        # Load processor from 9b repo (has working config) + model from 4b repo
        log_event("model.setup.using_processor_workaround", {
            "model": CONFIG.model_name,
            "processor_source": CONFIG.processor_name
        })
        processor = AutoProcessor.from_pretrained(
            CONFIG.processor_name,  # Use 9b config
            trust_remote_code=True,
        )
        # Note: processor is tied to model architecture, both use Qwen 3.5-VL base

        model_kwargs = {
            "quantization_config": quantization_config,
            "device_map": "auto",
            "trust_remote_code": True,
            "low_cpu_mem_usage": True,
        }

        # Track whether FlashAttention-2 was really enabled. The import
        # below is function-local, so looking the module up in globals()
        # can never succeed and would always report it as disabled.
        flash_attention_enabled = False
        if CONFIG.use_flash_attention_2:
            # fp16 is used on both the FlashAttention-2 and the fallback path
            model_kwargs["torch_dtype"] = torch.float16
            try:
                import flash_attn
            except ImportError:
                # Fallback to standard attention
                log_event("flash_attention.unavailable", {"reason": "flash_attn not installed"})
            else:
                model_kwargs["attn_implementation"] = "flash_attention_2"
                flash_attention_enabled = True
                log_event("flash_attention.enabled", {"version": getattr(flash_attn, '__version__', 'unknown')})

        # Load model with memory optimization
        model = AutoModel.from_pretrained(
            CONFIG.model_name,
            **model_kwargs
        )

        # Describe the optimizations that are actually active instead of a
        # hard-coded "8bit_quantization" label.
        optimizations = []
        if quantization_type != "none":
            optimizations.append(f"{quantization_type}_quantization")
        if flash_attention_enabled:
            optimizations.append("flash_attention")

        log_event("model.setup.success", {
            "model": CONFIG.model_name,
            "quantization": quantization_type,
            "flash_attention_2": flash_attention_enabled,
            "memory_optimization": "_".join(optimizations) or "none"
        })
    except Exception as e:
        log_event("model.setup.failed", {"error": str(e)})
        raise
def connect_database() -> psycopg.Connection:
    """Open a new PostgreSQL connection to ``CONFIG.database_url``.

    The connection is created with ``autocommit=False`` and
    ``prepare_threshold=None``.

    Raises:
        Exception: whatever ``psycopg.connect`` raises; the error is logged
            as ``db.connect.failed`` before being re-raised.
    """
    connect_options = {"autocommit": False, "prepare_threshold": None}
    try:
        connection = psycopg.connect(CONFIG.database_url, **connect_options)
    except Exception as exc:
        log_event("db.connect.failed", {"error": str(exc)})
        raise
    return connection
def download_document(storage_path: str) -> bytes:
    """Fetch an object from the configured Supabase storage bucket.

    Args:
        storage_path: Object path inside ``CONFIG.storage_bucket``; it is
            percent-encoded with "/" left intact.

    Returns:
        The raw response body.

    Raises:
        RuntimeError: when the response status is anything other than 200.
    """
    from urllib.parse import quote

    service_key = CONFIG.supabase_service_role_key
    request_headers = {
        "Authorization": f"Bearer {service_key}",
        "apikey": service_key,
    }
    encoded_path = quote(storage_path, safe="/")
    object_url = f"{CONFIG.supabase_url}/storage/v1/object/{CONFIG.storage_bucket}/{encoded_path}"

    with httpx.Client(timeout=90.0, follow_redirects=True) as http:
        reply = http.get(object_url, headers=request_headers)
        if reply.status_code == 200:
            return reply.content
        raise RuntimeError(f"Failed to download document: {reply.status_code}")
def _render_first_pdf_page(document_bytes: bytes, dpi: int, **options: Any) -> Optional[Image.Image]:
    """Rasterize only page 1 of a PDF; return None when nothing was rendered."""
    from pdf2image import convert_from_bytes

    # first_page/last_page are 1-based page numbers. Bounding both ends
    # keeps the converter from rendering every page of the document at high
    # DPI when only the first one is used.
    pages = convert_from_bytes(
        document_bytes,
        dpi=dpi,
        first_page=1,
        last_page=1,
        **options
    )
    return pages[0] if pages else None


def prepare_image_for_model(document_bytes: bytes, mime_type: str) -> Image.Image:
    """Convert a document into a square image sized for the WebAI model.

    PDFs are rasterized (first page only) at the DPI chosen by
    ``get_optimal_dpi`` and, if that fails, once more at 150 DPI. Any other
    MIME type is opened directly as an image. The result is always resized
    to ``CONFIG.image_size`` x ``CONFIG.image_size``.

    Args:
        document_bytes: Raw file content.
        mime_type: MIME type of the content; only "application/pdf" selects
            the PDF path.

    Returns:
        The prepared image, or the image from ``create_placeholder_image``
        when the document cannot be converted. This function does not raise
        for conversion failures.
    """
    target_size = (CONFIG.image_size, CONFIG.image_size)

    if mime_type == "application/pdf":
        try:
            # Get optimal DPI based on available memory
            optimal_dpi = get_optimal_dpi(CONFIG)
            log_event("pdf.conversion.start", {
                "dpi": optimal_dpi,
                "adaptive": CONFIG.adaptive_dpi
            })
            page = _render_first_pdf_page(
                document_bytes,
                optimal_dpi,
                fmt='JPEG',
                thread_count=1  # Reduce memory usage
            )
        except ImportError:
            log_event("pdf.fallback", {"reason": "pdf2image not available"})
            return create_placeholder_image()
        except Exception as e:
            log_event("pdf.conversion.failed", {"error": str(e)})
            # Fallback to lower DPI if high DPI fails
            try:
                page = _render_first_pdf_page(document_bytes, 150)
                if page is not None:
                    return page.resize(target_size, Image.Resampling.LANCZOS)
            except Exception as retry_error:
                log_event("pdf.conversion.failed", {"error": str(retry_error), "dpi": 150})
            return create_placeholder_image()

        if page is None:
            # A PDF that yields no pages cannot be decoded as an image
            # either, so stop here instead of falling through.
            log_event("pdf.conversion.failed", {"error": "no pages rendered"})
            return create_placeholder_image()

        # Log image dimensions for monitoring
        original_size = page.size
        estimated_size_mb = (original_size[0] * original_size[1] * 3) / (1024**2)
        log_event("pdf.converted", {
            "original_size": original_size,
            "estimated_size_mb": estimated_size_mb,
            "dpi_used": optimal_dpi
        })
        # Resize to model's expected size while preserving quality
        return page.resize(target_size, Image.Resampling.LANCZOS)

    # For image formats, convert and resize
    try:
        img = Image.open(io.BytesIO(document_bytes))
        original_size = img.size
        if max(original_size) < 1000:  # Small image, might be low quality
            log_event("image.low_resolution", {"size": original_size})
        img = img.resize(target_size, Image.Resampling.LANCZOS)
        return img.convert("RGB")
    except Exception as e:
        log_event("image.convert.failed", {"error": str(e)})
        return create_placeholder_image()
def create_placeholder_image() -> Image.Image:
    """Build the stand-in image returned when a document cannot be converted.

    The image is a white ``CONFIG.image_size`` square with the text
    "Document / Processing / Error" centred on it in black. If drawing the
    text fails for any reason, the blank white square is returned instead.
    """
    from PIL import Image, ImageDraw, ImageFont

    side = CONFIG.image_size
    canvas = Image.new('RGB', (side, side), color='white')
    pen = ImageDraw.Draw(canvas)
    try:
        default_font = ImageFont.load_default()
        message = "Document\nProcessing\nError"
        box = pen.textbbox((0, 0), message, font=default_font)
        message_width = box[2] - box[0]
        message_height = box[3] - box[1]
        origin = ((side - message_width) // 2, (side - message_height) // 2)
        pen.text(origin, message, fill='black', font=default_font)
    except Exception:
        # The caption is cosmetic; a blank placeholder is still usable.
        pass
    return canvas
def process_with_webai(image: Image.Image, prompt: str) -> Dict[str, Any]:
    """Run the WebAI model on one image/prompt pair and return its text.

    Args:
        image: Prepared document image.
        prompt: Instruction text passed to the processor with the image.

    Returns:
        ``{"response": <text>, "status": "success"}`` on success, or
        ``{"response": "", "status": "failed", "error": <message>}`` when
        anything raises. This function never propagates an exception.
    """
    inputs = None
    outputs = None
    try:
        # Prepare inputs
        inputs = processor(
            images=image,
            text=prompt,
            return_tensors="pt"
        ).to("cuda")

        # Generate response
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=CONFIG.max_new_tokens,
                do_sample=True,
                temperature=0.1,
                pad_token_id=processor.tokenizer.eos_token_id
            )

        # Decoder-only models return the prompt tokens followed by the new
        # tokens. Drop the echoed prompt so the decoded text holds only the
        # answer. The prompt contains a JSON template of its own, which
        # would otherwise end up in the text handed to the JSON parser.
        sequence = outputs[0]
        prompt_ids = inputs["input_ids"][0]
        prompt_length = prompt_ids.shape[-1]
        if sequence.shape[-1] >= prompt_length and torch.equal(sequence[:prompt_length], prompt_ids):
            sequence = sequence[prompt_length:]

        # Decode response
        response_text = processor.tokenizer.decode(
            sequence,
            skip_special_tokens=True
        )
        return {"response": response_text, "status": "success"}
    except Exception as e:
        log_event("webai.process.failed", {"error": str(e)})
        return {"response": "", "status": "failed", "error": str(e)}
    finally:
        # Release tensor references on every path (success or failure)
        # before the cleanup pass, so the memory they hold can be reclaimed.
        del inputs, outputs
        try:
            aggressive_memory_cleanup()
        except Exception:
            # Cleanup is best-effort and must not mask the result above.
            pass
def build_webai_prompt(tender_context: Dict[str, Any]) -> str:
    """Build prompt for WebAI model analysis.

    Args:
        tender_context: Mapping read with ``.get`` for the keys
            ``source_filename`` and ``organization_id``; a missing key is
            rendered as "Unknown".

    Returns:
        The instruction text, which embeds the two context values and the
        JSON template the model is asked to fill in.
    """
    # Doubled braces ({{ and }}) are f-string escapes for literal braces.
    return f"""
Analyze this tender document and provide structured JSON output.
Context:
- Source: {tender_context.get('source_filename', 'Unknown')}
- Organization: {tender_context.get('organization_id', 'Unknown')}
Provide analysis in this exact JSON format:
{{
"tenderTitle": "string",
"procuringEntity": "string",
"tenderCategory": "string",
"submissionDeadline": "YYYY-MM-DD",
"mandatoryDocuments": ["string"],
"eligibilityCriteria": ["string"],
"hardBlockers": ["string"],
"risks": ["string"],
"ambiguities": ["string"],
"confidence": 0.0,
"summary": "string",
"complianceItems": [
{{
"title": "string",
"requirementText": "string",
"status": "PASS|FAIL|UNKNOWN",
"severity": "LOW|MEDIUM|HIGH|CRITICAL",
"remediation": "string",
"pageReference": 1
}}
],
"bidScore": {{
"score": 0,
"decision": "BID|NO_BID|REVIEW",
"blockers": ["string"],
"confidence": 0.0,
"explanation": "string"
}}
}}
Focus on accuracy and completeness. Extract all visible requirements and criteria.
"""
def parse_webai_response(response_text: str) -> Dict[str, Any]:
    """Extract the model's JSON object from free-form response text.

    The text is scanned for "{" characters that open a top-level brace
    group, and each one is tried as the start of a JSON object. The last
    object that decodes completely is returned; text that precedes the
    answer (for example a restated template) is thereby skipped, and stray
    braces in surrounding prose are ignored. Objects nested inside a group
    that failed to decode are not considered, so a truncated or malformed
    answer is reported as an error instead of yielding an inner fragment.

    Args:
        response_text: Raw text produced by the model.

    Returns:
        The decoded JSON object, or an error structure with the keys
        ``error`` ("JSON_PARSE_ERROR"), ``raw_response`` (first 500
        characters), ``confidence`` and ``summary`` when no JSON object
        can be decoded.
    """
    decoder = json.JSONDecoder()
    parsed: Optional[Dict[str, Any]] = None
    depth = 0  # naive brace depth, tracked only across text that failed to decode
    index = 0
    text_length = len(response_text)

    while index < text_length:
        char = response_text[index]
        if char == "{":
            if depth == 0:
                try:
                    candidate, end = decoder.raw_decode(response_text, index)
                except json.JSONDecodeError:
                    pass
                else:
                    # A value that starts with "{" always decodes to a dict.
                    parsed = candidate
                    index = end
                    continue
            depth += 1
        elif char == "}" and depth > 0:
            depth -= 1
        index += 1

    if parsed is not None:
        return parsed

    # Return error structure if parsing fails
    return {
        "error": "JSON_PARSE_ERROR",
        "raw_response": response_text[:500],  # First 500 chars
        "confidence": 0.0,
        "summary": "Failed to parse WebAI response"
    }
def _confidence_value(value: Any) -> float:
    """Coerce a confidence field to a finite float, using 0.0 when unusable."""
    import math

    try:
        number = float(value)
    except (TypeError, ValueError):
        return 0.0
    return number if math.isfinite(number) else 0.0


def _bid_decision(analysis: Dict[str, Any]) -> str:
    """Return ``bidScore.decision`` from an analysis, or "REVIEW" when absent."""
    bid_score = analysis.get("bidScore")
    decision = bid_score.get("decision") if isinstance(bid_score, dict) else None
    return decision or "REVIEW"


def compare_analyses(primary_analysis: Dict[str, Any], webai_analysis: Dict[str, Any]) -> Dict[str, Any]:
    """Compare primary worker and WebAI analysis results.

    Fields that are missing, ``None`` or of an unexpected type are treated
    as absent (decision "REVIEW", confidence 0.0, empty category/deadline)
    rather than aborting the whole comparison.

    Args:
        primary_analysis: Structured output of the primary worker.
        webai_analysis: Structured output parsed from the WebAI model.

    Returns:
        A dict with ``agreement_score`` (weighted 0.0-1.0), ``differences``,
        ``confidence_comparison``, ``recommendation_comparison`` and
        ``timestamp``. If the comparison still fails, the error text is
        stored under ``error`` and the remaining fields keep the values
        they had at that point. This function does not raise.
    """
    comparison = {
        "agreement_score": 0.0,
        "differences": [],
        "confidence_comparison": {},
        "recommendation_comparison": {},
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    try:
        # Compare bid decisions
        primary_decision = _bid_decision(primary_analysis)
        webai_decision = _bid_decision(webai_analysis)
        decisions_agree = primary_decision == webai_decision
        comparison["recommendation_comparison"] = {
            "primary": primary_decision,
            "webai": webai_decision,
            "agrees": decisions_agree
        }

        # Compare confidence scores
        primary_conf = _confidence_value(primary_analysis.get("confidence"))
        webai_conf = _confidence_value(webai_analysis.get("confidence"))
        conf_gap = abs(primary_conf - webai_conf)
        comparison["confidence_comparison"] = {
            "primary": primary_conf,
            "webai": webai_conf,
            "difference": conf_gap
        }

        primary_cat = str(primary_analysis.get("tenderCategory") or "").strip().lower()
        webai_cat = str(webai_analysis.get("tenderCategory") or "").strip().lower()
        primary_deadline = primary_analysis.get("submissionDeadline") or ""
        webai_deadline = webai_analysis.get("submissionDeadline") or ""

        # (factor, weight) pairs for the overall agreement score
        weighted_factors = [
            (1.0 if decisions_agree else 0.0, 0.4),  # bid decision
            (1.0 - min(conf_gap, 1.0), 0.3),  # confidence similarity
            # No similarity measure is implemented for categories: any
            # mismatch receives a flat 0.5.
            (1.0 if primary_cat == webai_cat else 0.5, 0.2),
            (1.0 if primary_deadline == webai_deadline else 0.0, 0.1),  # deadline
        ]
        comparison["agreement_score"] = sum(factor * weight for factor, weight in weighted_factors)

        # Identify key differences
        if not decisions_agree:
            comparison["differences"].append({
                "type": "bid_decision",
                "primary": primary_decision,
                "webai": webai_decision,
                "severity": "HIGH"
            })
        if conf_gap > 0.3:
            comparison["differences"].append({
                "type": "confidence",
                "primary": primary_conf,
                "webai": webai_conf,
                "severity": "MEDIUM"
            })
    except Exception as e:
        log_event("comparison.failed", {"error": str(e)})
        comparison["error"] = str(e)
    return comparison
def store_verification_result(
    conn: psycopg.Connection,
    tender_id: str,
    webai_analysis: Dict[str, Any],
    comparison: Dict[str, Any]
) -> None:
    """Persist one verification outcome inside a single transaction block.

    A row holding the JSON-encoded analysis and comparison is inserted into
    ``public.webai_verifications``; the tender row is then marked
    ``COMPLETED`` with ``comparison["agreement_score"]`` as its
    verification score.

    Raises:
        Exception: any failure is logged as ``verification.store.failed``
            and re-raised.
    """
    insert_verification = """
        INSERT INTO public.webai_verifications
        (tender_id, analysis, comparison, created_at)
        VALUES (%s, %s::jsonb, %s::jsonb, now())
    """
    mark_tender_verified = """
        UPDATE public.tenders
        SET verification_status = 'COMPLETED',
        verification_score = %s,
        updated_at = now()
        WHERE id = %s
    """
    try:
        with conn.transaction(), conn.cursor() as cur:
            # Store WebAI analysis
            verification_row = (
                tender_id,
                json.dumps(webai_analysis),
                json.dumps(comparison),
            )
            cur.execute(insert_verification, verification_row)
            # Update tender with verification status
            cur.execute(mark_tender_verified, (comparison["agreement_score"], tender_id))
    except Exception as exc:
        log_event("verification.store.failed", {"error": str(exc), "tender_id": tender_id})
        raise
def claim_verification_job(conn: psycopg.Connection) -> Optional[Dict[str, Any]]:
    """Pick the next verification job and mark it as running.

    Within one transaction block, selects the oldest ``QUEUED`` job of type
    ``VERIFY`` whose ``available_at`` has passed and whose tender is
    ``ANALYSIS_READY``, using ``FOR UPDATE SKIP LOCKED``. A job that is
    found is set to ``RUNNING``, gets ``locked_at`` stamped and its
    ``attempt_count`` incremented.

    Returns:
        The selected row as a dict with ``id``, ``tender_id`` and
        ``payload``, or None when no job qualifies.
    """
    select_next_job = """
        SELECT j.id, j.tender_id, j.payload
        FROM public.processing_jobs j
        JOIN public.tenders t ON j.tender_id = t.id
        WHERE j.job_type = 'VERIFY'
        AND j.status = 'QUEUED'
        AND j.available_at <= now()
        AND t.status = 'ANALYSIS_READY'
        ORDER BY j.created_at ASC
        FOR UPDATE SKIP LOCKED
        LIMIT 1
    """
    mark_running = """
        UPDATE public.processing_jobs
        SET status = 'RUNNING',
        locked_at = now(),
        attempt_count = attempt_count + 1,
        updated_at = now()
        WHERE id = %s
    """
    with conn.transaction(), conn.cursor(row_factory=dict_row) as cur:
        cur.execute(select_next_job)
        claimed = cur.fetchone()
        if claimed:
            cur.execute(mark_running, (claimed["id"],))
    return claimed
def process_verification_job(job: Dict[str, Any]) -> None:
    """Process a single verification job with aggressive memory cleanup.

    Loads the tender context, downloads the document, runs it through the
    WebAI model, compares the result with the primary analysis, stores the
    outcome and marks the job ``SUCCEEDED``. Any failure is logged and the
    job is marked ``FAILED`` with the error text (truncated to 2000
    characters) in ``last_error``.

    Args:
        job: Claimed job row; the keys ``id`` and ``tender_id`` are read.

    This function does not raise for processing failures; a ``KeyError``
    for a job dict missing ``id`` or ``tender_id`` does propagate.
    """
    job_id = job["id"]
    tender_id = job["tender_id"]
    try:
        with connect_database() as conn:
            with conn.cursor(row_factory=dict_row) as cur:
                # Get tender context
                cur.execute("""
                    SELECT t.id, t.organization_id, t.source_filename,
                    COALESCE(d.storage_path, t.storage_path) as storage_path,
                    e.structured_output as primary_analysis
                    FROM public.tenders t
                    LEFT JOIN public.documents d ON d.tender_id = t.id
                    LEFT JOIN public.extractions e ON e.tender_id = t.id
                    WHERE t.id = %s
                """, (tender_id,))
                context = cur.fetchone()
                if not context:
                    raise RuntimeError(f"Tender {tender_id} not found")
                if not context.get("primary_analysis"):
                    raise RuntimeError(f"Primary analysis not available for tender {tender_id}")

                # Both storage_path columns may be NULL; fail with a clear
                # message instead of a TypeError from URL-quoting None.
                storage_path = context.get("storage_path")
                if not storage_path:
                    raise RuntimeError(f"No storage path recorded for tender {tender_id}")

                # Download and process document
                document_bytes = download_document(storage_path)
                mime_type = infer_mime_type(storage_path)

                # Prepare image for WebAI
                image = prepare_image_for_model(document_bytes, mime_type)

                # Build prompt and process
                prompt = build_webai_prompt(context)
                webai_result = process_with_webai(image, prompt)
                if webai_result["status"] != "success":
                    raise RuntimeError(f"WebAI processing failed: {webai_result.get('error', 'Unknown error')}")

                # Parse response
                webai_analysis = parse_webai_response(webai_result["response"])
                if "error" in webai_analysis:
                    raise RuntimeError(f"WebAI response parsing failed: {webai_analysis['error']}")

                # Compare with primary analysis
                comparison = compare_analyses(
                    context["primary_analysis"],
                    webai_analysis
                )

                # Store results
                store_verification_result(conn, tender_id, webai_analysis, comparison)

                # Mark job as completed
                cur.execute("""
                    UPDATE public.processing_jobs
                    SET status = 'SUCCEEDED',
                    updated_at = now()
                    WHERE id = %s
                """, (job_id,))
                conn.commit()

                log_event("verification.completed", {
                    "job_id": job_id,
                    "tender_id": tender_id,
                    "agreement_score": comparison["agreement_score"]
                })
    except Exception as e:
        log_event("verification.failed", {
            "job_id": job_id,
            "tender_id": tender_id,
            "error": str(e)
        })
        # Mark job as failed
        try:
            with connect_database() as conn:
                with conn.cursor() as cur:
                    cur.execute("""
                        UPDATE public.processing_jobs
                        SET status = 'FAILED',
                        last_error = %s,
                        updated_at = now()
                        WHERE id = %s
                    """, (str(e)[:2000], job_id))
                conn.commit()
        except Exception as mark_error:
            # The job stays RUNNING in this case; record that instead of
            # hiding it so a stuck job can be traced.
            log_event("verification.mark_failed.failed", {
                "job_id": job_id,
                "tender_id": tender_id,
                "error": str(mark_error)
            })
    finally:
        # Aggressive cleanup after every job to prevent ghost memory
        try:
            aggressive_memory_cleanup()
        except Exception:
            # Cleanup is best-effort and must never fail the job.
            pass
def get_optimal_dpi(config: WebAIConfig) -> int:
    """Choose a PDF rasterization DPI from the memory currently available.

    With ``config.adaptive_dpi`` off, ``config.pdf_dpi`` is returned as is.
    Otherwise the available memory is estimated (total minus allocated GPU
    memory on device 0 when CUDA is available, free system memory via
    psutil when it is not) and mapped to a tier: 300 DPI from 12 GB, 250
    from 8 GB, 200 from 4 GB, else 150. The result never exceeds
    ``config.pdf_dpi``.

    Returns:
        The chosen DPI, or ``config.pdf_dpi`` if the estimate fails.
    """
    if not config.adaptive_dpi:
        return config.pdf_dpi

    gib = 1024**3
    # (minimum available GB, DPI) pairs, highest tier first
    dpi_tiers = ((12, 300), (8, 250), (4, 200))
    try:
        if torch.cuda.is_available():
            total_gb = torch.cuda.get_device_properties(0).total_memory / gib
            allocated_gb = torch.cuda.memory_allocated(0) / gib
            available_memory_gb = total_gb - allocated_gb
        else:
            # Fallback to system memory estimation
            import psutil
            available_memory_gb = psutil.virtual_memory().available / gib

        tier_dpi = next(
            (dpi for floor_gb, dpi in dpi_tiers if available_memory_gb >= floor_gb),
            150,
        )
        # Ensure we don't exceed configured maximum
        optimal_dpi = min(tier_dpi, config.pdf_dpi)
        log_event("dpi.adaptive", {
            "configured_dpi": config.pdf_dpi,
            "optimal_dpi": optimal_dpi,
            "available_memory_gb": available_memory_gb
        })
        return optimal_dpi
    except Exception as exc:
        log_event("dpi.adaptive.failed", {"error": str(exc)})
        return config.pdf_dpi
def aggressive_memory_cleanup():
    """Run garbage collection, empty the CUDA cache and log what was freed.

    Process RSS (via psutil) and, when CUDA is available, allocated and
    reserved GPU memory are sampled before and after the cleanup and
    reported in a ``memory.cleanup`` log event. If the detailed pass fails
    for any reason (for example psutil is not installed), the failure is
    logged as ``memory.cleanup.failed`` and a minimal cleanup is attempted
    instead. This function does not raise.
    """
    import gc

    cleanup_start = time.perf_counter()
    gib = 1024**3
    try:
        import psutil

        cuda_available = torch.cuda.is_available()
        process = psutil.Process()

        def gpu_usage_gb():
            """Return (allocated, reserved) GPU memory in GB, or zeros without CUDA."""
            if not cuda_available:
                return 0, 0
            return torch.cuda.memory_allocated() / gib, torch.cuda.memory_reserved() / gib

        # Get memory before cleanup
        gpu_memory_before, gpu_memory_cached_before = gpu_usage_gb()
        memory_before = process.memory_info().rss / gib

        # 1. Collect unreachable Python objects first. Tensors kept alive
        # only by reference cycles are released here, so the cache flush
        # below can actually hand their memory back.
        collected_objects = [gc.collect() for _ in range(3)]

        # 2. Wait for pending GPU work, then release cached blocks
        if cuda_available:
            torch.cuda.synchronize()
            torch.cuda.empty_cache()

        # 3. Final cleanup pass
        gc.collect()

        # Get memory after cleanup
        gpu_memory_after, gpu_memory_cached_after = gpu_usage_gb()
        memory_after = process.memory_info().rss / gib

        # Calculate cleanup effectiveness
        memory_freed_gb = memory_before - memory_after
        gpu_memory_freed_gb = gpu_memory_before - gpu_memory_after
        gpu_cache_freed_gb = gpu_memory_cached_before - gpu_memory_cached_after
        cleanup_time = (time.perf_counter() - cleanup_start) * 1000
        log_event("memory.cleanup", {
            "memory_freed_gb": round(memory_freed_gb, 3),
            "gpu_memory_freed_gb": round(gpu_memory_freed_gb, 3),
            "gpu_cache_freed_gb": round(gpu_cache_freed_gb, 3),
            "cleanup_time_ms": round(cleanup_time, 2),
            "collected_objects": collected_objects,
            "memory_before_gb": round(memory_before, 3),
            "memory_after_gb": round(memory_after, 3),
            "gpu_memory_before_gb": round(gpu_memory_before, 3),
            "gpu_memory_after_gb": round(gpu_memory_after, 3)
        })
        # Force a small delay to allow OS to reclaim memory
        time.sleep(0.1)
    except Exception as e:
        log_event("memory.cleanup.failed", {"error": str(e)})
        # Still try basic cleanup even if detailed monitoring fails
        try:
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        except Exception:
            # Nothing further can be done; cleanup is best-effort.
            pass
def infer_mime_type(storage_path: str) -> str:
    """Map a path's file extension to a MIME type.

    The extension is matched case-insensitively. PDF, PNG, JPEG
    (.jpg/.jpeg) and TIFF (.tif/.tiff) are recognised; anything else,
    including a path without an extension, yields
    "application/octet-stream".
    """
    from pathlib import Path

    mime_by_extension = {
        ".pdf": "application/pdf",
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".tif": "image/tiff",
        ".tiff": "image/tiff",
    }
    extension = Path(storage_path.lower()).suffix
    return mime_by_extension.get(extension, "application/octet-stream")
| # Gradio Interface | |
def create_interface():
    """Create Gradio interface for HF Spaces.

    Returns:
        A ``gr.Blocks`` app with a file upload, a "Verify Document" button,
        and two text boxes showing the status and the analysis result.
    """

    # The handler performs GPU inference. The ZeroGPU documentation asks
    # for GPU-dependent functions to be wrapped with spaces.GPU and
    # describes the decorator as having no effect outside ZeroGPU.
    @spaces.GPU
    def verify_document(file_obj):
        """Interface function for document verification with memory cleanup.

        Returns a ``(status, details)`` pair of strings; it never raises.
        """
        if file_obj is None:
            return "Please upload a document", ""
        try:
            # The upload may arrive as an object exposing ``.name`` or as a
            # plain path string; accept both.
            file_path = getattr(file_obj, "name", file_obj)

            # Read file
            with open(file_path, "rb") as f:
                document_bytes = f.read()

            # Process with WebAI
            mime_type = infer_mime_type(file_path)
            image = prepare_image_for_model(document_bytes, mime_type)
            context = {
                "source_filename": file_path,
                "organization_id": "demo"
            }
            prompt = build_webai_prompt(context)
            result = process_with_webai(image, prompt)
            if result["status"] == "success":
                analysis = parse_webai_response(result["response"])
                return "Analysis Complete", json.dumps(analysis, indent=2)
            return "Processing Failed", result.get("error", "Unknown error")
        except Exception as e:
            return f"Error: {str(e)}", ""
        finally:
            # Ensure cleanup happens even if processing fails
            try:
                aggressive_memory_cleanup()
            except Exception:
                # Cleanup is best-effort and must not replace the result.
                pass

    # Create Gradio interface
    with gr.Blocks(title="TenderHub WebAI Verification") as interface:
        gr.Markdown("# TenderHub WebAI Verification Worker")
        gr.Markdown("Upload tender documents for AI-powered verification analysis")
        with gr.Row():
            with gr.Column():
                file_input = gr.File(
                    label="Upload Document",
                    # ".tif" is accepted as well, matching infer_mime_type
                    file_types=[".pdf", ".png", ".jpg", ".jpeg", ".tif", ".tiff"]
                )
                verify_btn = gr.Button("Verify Document", variant="primary")
            with gr.Column():
                status_output = gr.Textbox(label="Status")
                analysis_output = gr.Textbox(
                    label="Analysis Result",
                    lines=20,
                    max_lines=30
                )
        verify_btn.click(
            verify_document,
            inputs=[file_input],
            outputs=[status_output, analysis_output]
        )
    return interface
| # Worker loop for background processing | |
def worker_loop():
    """Background worker loop for processing verification jobs.

    Runs forever: claims one job at a time and processes it, sleeping 10
    seconds when the queue is empty and 30 seconds after an unexpected
    error. It never returns, so it is meant to run in its own thread.
    """
    log_event("worker.start", {})
    while True:
        try:
            # Hold the connection only for the claim itself. Processing
            # opens its own connection, so keeping this one open during
            # inference or while sleeping would only tie up a database slot.
            with connect_database() as conn:
                job = claim_verification_job(conn)

            if job:
                log_event("job.claimed", {"job_id": job["id"], "tender_id": job["tender_id"]})
                process_verification_job(job)
            else:
                # No jobs available, wait
                time.sleep(10)
        except Exception as e:
            log_event("worker.error", {"error": str(e)})
            time.sleep(30)  # Wait longer on errors
| # Main execution | |
if __name__ == "__main__":
    import threading

    # Load the model before anything can request inference.
    setup_model()

    # Poll for verification jobs in the background. The thread is a daemon,
    # so it does not keep the process alive once the web server exits.
    worker_thread = threading.Thread(target=worker_loop, daemon=True)
    worker_thread.start()

    # Serve the Gradio UI on all interfaces; PORT overrides the default 7860.
    interface = create_interface()
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": int(os.getenv("PORT", "7860")),
        "share": False,
    }
    interface.launch(**launch_options)