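# Page chrome captured from the Hugging Face file viewer (not part of the program):
# uploader: engresearch; commit message: "Upload 2 files"; commit 1434f14 (verified)
# viewer links: Raw / History / Blame / Contribute / Delete; file size: 33 kB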
"""
HF Spaces application for WebAI verification worker.
Uses ZeroGPU for on-demand GPU processing with webAI-ColVec1-4b.
"""
import os
import json
import time
import hashlib
import gc
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from datetime import datetime, timezone
import gradio as gr
import spaces
import torch
from transformers import AutoModel, AutoProcessor, BitsAndBytesConfig
def env_bool(name: str, default: bool = False) -> bool:
    """Read a boolean flag from an environment variable.

    Args:
        name: Name of the environment variable to read.
        default: Value returned when the variable is unset or blank.

    Returns:
        True when the value, ignoring case and surrounding whitespace, is one
        of "true", "1", "yes" or "on"; False for any other non-blank value;
        ``default`` when the variable is unset or blank.
    """
    # Strip first so values such as " true" or "1\n" are still recognised.
    value = os.getenv(name, "").strip().lower()
    if not value:
        return default
    return value in ("true", "1", "yes", "on")
import psycopg
from psycopg.rows import dict_row
import httpx
from PIL import Image
import io
# Configuration
@dataclass
class WebAIConfig:
    """Configuration for WebAI verification worker.

    Environment-backed defaults are evaluated once, when the class body runs
    (at import time), not each time an instance is created.

    Raises:
        ValueError: From ``__post_init__`` when ``database_url`` is empty or
            when either Supabase credential is missing.
    """

    # --- Database / Supabase storage ---
    database_url: str = os.getenv("DATABASE_URL", "").strip()
    # Strip whitespace before trimming slashes; otherwise a trailing newline
    # hides the final "/" from rstrip and ends up inside every storage URL.
    supabase_url: str = os.getenv("NEXT_PUBLIC_SUPABASE_URL", "").strip().rstrip("/")
    supabase_service_role_key: str = os.getenv("SUPABASE_SERVICE_ROLE_KEY", "").strip()
    storage_bucket: str = os.getenv("SUPABASE_STORAGE_BUCKET", "tender-documents").strip()

    # --- WebAI Model Configuration ---
    # Using 4b model weights with 9b processor (shared Qwen 3.5 architecture)
    model_name: str = "webAI-Official/webAI-ColVec1-4b"
    processor_name: str = "webAI-Official/webAI-ColVec1-9b"  # Has working config
    use_8bit_quantization: bool = True
    use_4bit_quantization: bool = False
    use_flash_attention_2: bool = True
    max_new_tokens: int = 512
    image_size: int = 336  # webAI models typically use 336x336
    pdf_dpi: int = int(os.getenv("PDF_DPI", "200"))  # High DPI for better extraction
    adaptive_dpi: bool = env_bool("ADAPTIVE_DPI", True)  # Scale DPI based on memory

    # --- Processing Configuration ---
    batch_size: int = 1
    timeout_seconds: int = 300

    def __post_init__(self):
        """Reject configurations that cannot reach the database or storage."""
        if not self.database_url:
            raise ValueError("DATABASE_URL is required")
        if not self.supabase_url or not self.supabase_service_role_key:
            raise ValueError("Supabase credentials are required")
# Global configuration
# Built at import time, so WebAIConfig.__post_init__ raises ValueError right
# here when DATABASE_URL or the Supabase credentials are missing.
CONFIG = WebAIConfig()
# Model loading (will be moved to CPU initially)
# Both names stay None until setup_model() assigns them.
model = None
processor = None
def log_event(event: str, payload: Dict[str, Any]) -> None:
    """Write one structured JSON log line to stdout.

    Args:
        event: Event name stored under the ``event`` key.
        payload: Extra fields merged after the standard ones, so a payload key
            named ``timestamp``, ``event`` or ``service`` replaces that field.

    Values that ``json`` cannot encode are rendered with ``str``; the line is
    flushed immediately.
    """
    emitted_at = datetime.now(timezone.utc).isoformat()
    standard_fields = {
        "timestamp": emitted_at,
        "event": event,
        "service": "webai-verification-worker",
    }
    line = json.dumps({**standard_fields, **payload}, default=str)
    print(line, flush=True)
def setup_model():
    """Load the processor and model into the module-level globals.

    The processor comes from ``CONFIG.processor_name`` and the weights from
    ``CONFIG.model_name``. Quantization follows the config flags, with 8-bit
    taking precedence when both are set. FlashAttention-2 is requested only
    when it is enabled in the config and the ``flash_attn`` package imports.

    Raises:
        Exception: Any failure is logged as ``model.setup.failed`` and
            re-raised unchanged.
    """
    global model, processor
    try:
        log_event("model.setup.start", {"model": CONFIG.model_name})

        # Configure quantization for memory efficiency
        quantization_config = None
        quantization_type = "none"
        if CONFIG.use_8bit_quantization:
            quantization_config = BitsAndBytesConfig(
                load_in_8bit=True,
            )
            quantization_type = "8bit"
        elif CONFIG.use_4bit_quantization:
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_use_double_quant=True,
            )
            quantization_type = "4bit"

        # Load processor from 9b repo (has working config) + model from 4b repo
        log_event("model.setup.using_processor_workaround", {
            "model": CONFIG.model_name,
            "processor_source": CONFIG.processor_name
        })
        processor = AutoProcessor.from_pretrained(
            CONFIG.processor_name,  # Use 9b config
            trust_remote_code=True,
        )

        model_kwargs = {
            "quantization_config": quantization_config,
            "device_map": "auto",  # Will map to CPU initially
            "trust_remote_code": True,
            "low_cpu_mem_usage": True,
        }

        # Track the outcome in a local: flash_attn is imported inside this
        # function, so it never appears in globals().
        flash_attention_enabled = False
        if CONFIG.use_flash_attention_2:
            try:
                import flash_attn
            except ImportError:
                log_event("flash_attention.unavailable", {"reason": "flash_attn not installed"})
                # Fallback to standard attention
                model_kwargs["torch_dtype"] = torch.float16
            else:
                model_kwargs.update({
                    "attn_implementation": "flash_attention_2",
                    "torch_dtype": torch.float16,  # FlashAttention-2 works best with fp16
                })
                flash_attention_enabled = True
                log_event("flash_attention.enabled", {"version": getattr(flash_attn, '__version__', 'unknown')})

        # Load model with memory optimization
        model = AutoModel.from_pretrained(
            CONFIG.model_name,
            **model_kwargs
        )

        # Describe the optimizations that were actually applied.
        optimizations = []
        if quantization_type != "none":
            optimizations.append(f"{quantization_type}_quantization")
        if flash_attention_enabled:
            optimizations.append("flash_attention")
        log_event("model.setup.success", {
            "model": CONFIG.model_name,
            "quantization": quantization_type,
            "flash_attention_2": flash_attention_enabled,
            "memory_optimization": "_".join(optimizations) or "none"
        })
    except Exception as e:
        log_event("model.setup.failed", {"error": str(e)})
        raise
def connect_database() -> psycopg.Connection:
    """Open a new PostgreSQL connection to ``CONFIG.database_url``.

    The connection is opened with ``autocommit=False`` and
    ``prepare_threshold=None``.

    Raises:
        Exception: Connection errors are logged as ``db.connect.failed`` and
            re-raised unchanged.
    """
    connect_options = {"autocommit": False, "prepare_threshold": None}
    try:
        connection = psycopg.connect(CONFIG.database_url, **connect_options)
    except Exception as exc:
        log_event("db.connect.failed", {"error": str(exc)})
        raise
    return connection
def download_document(storage_path: str) -> bytes:
    """Fetch an object from the configured Supabase storage bucket.

    Args:
        storage_path: Object path inside ``CONFIG.storage_bucket``; it is
            percent-encoded with "/" left intact.

    Returns:
        The response body as bytes.

    Raises:
        RuntimeError: When the response status is anything other than 200.
    """
    from urllib.parse import quote

    service_key = CONFIG.supabase_service_role_key
    object_path = quote(storage_path, safe="/")
    url = f"{CONFIG.supabase_url}/storage/v1/object/{CONFIG.storage_bucket}/{object_path}"
    auth_headers = {
        "Authorization": f"Bearer {service_key}",
        "apikey": service_key,
    }
    with httpx.Client(timeout=90.0, follow_redirects=True) as client:
        response = client.get(url, headers=auth_headers)
        if response.status_code == 200:
            return response.content
        raise RuntimeError(f"Failed to download document: {response.status_code}")
def prepare_image_for_model(document_bytes: bytes, mime_type: str) -> Image.Image:
    """Turn a document into the square RGB image handed to the model.

    PDFs are rasterized (first page only) with ``pdf2image``; any other MIME
    type is opened directly with PIL. The result is resized to
    ``CONFIG.image_size`` x ``CONFIG.image_size`` without preserving the
    aspect ratio.

    Args:
        document_bytes: Raw file contents.
        mime_type: MIME type of the document; only ``application/pdf`` gets
            PDF handling.

    Returns:
        The prepared image, or the placeholder image when conversion fails.
        Conversion errors are logged rather than raised.
    """
    target_size = (CONFIG.image_size, CONFIG.image_size)

    if mime_type == "application/pdf":
        try:
            from pdf2image import convert_from_bytes
        except ImportError:
            log_event("pdf.fallback", {"reason": "pdf2image not available"})
            return create_placeholder_image()

        def _render_first_page(dpi, **extra):
            """Rasterize page 1 only; returns None when nothing was rendered."""
            # Only the first page is used, so bounding the range avoids
            # rendering every page of a long PDF at high DPI.
            pages = convert_from_bytes(
                document_bytes, dpi=dpi, first_page=1, last_page=1, **extra
            )
            return pages[0] if pages else None

        page = None
        try:
            # Get optimal DPI based on available memory
            optimal_dpi = get_optimal_dpi(CONFIG)
            log_event("pdf.conversion.start", {
                "dpi": optimal_dpi,
                "adaptive": CONFIG.adaptive_dpi
            })
            page = _render_first_page(
                optimal_dpi,
                fmt='JPEG',
                thread_count=1  # Reduce memory usage
            )
            if page is not None:
                # Log image dimensions for monitoring
                original_size = page.size
                estimated_size_mb = (original_size[0] * original_size[1] * 3) / (1024**2)
                log_event("pdf.converted", {
                    "original_size": original_size,
                    "estimated_size_mb": estimated_size_mb,
                    "dpi_used": optimal_dpi
                })
        except Exception as e:
            log_event("pdf.conversion.failed", {"error": str(e)})
            # Fallback to lower DPI if high DPI fails
            try:
                page = _render_first_page(150)
            except Exception as retry_error:
                log_event("pdf.conversion.retry_failed", {"error": str(retry_error)})
                return create_placeholder_image()

        if page is None:
            # Be explicit instead of falling through to the image decoder.
            log_event("pdf.conversion.empty", {"reason": "no pages rendered"})
            return create_placeholder_image()
        try:
            # Resize to model's expected size while preserving quality
            return page.resize(target_size, Image.Resampling.LANCZOS).convert("RGB")
        except Exception as e:
            log_event("pdf.conversion.failed", {"error": str(e)})
            return create_placeholder_image()

    # For image formats, convert and resize
    try:
        img = Image.open(io.BytesIO(document_bytes))
        original_size = img.size
        if max(original_size) < 1000:  # Small image, might be low quality
            log_event("image.low_resolution", {"size": original_size})
        img = img.resize(target_size, Image.Resampling.LANCZOS)
        return img.convert("RGB")
    except Exception as e:
        log_event("image.convert.failed", {"error": str(e)})
        return create_placeholder_image()
def create_placeholder_image() -> Image.Image:
    """Build the white fallback image used when a document cannot be converted.

    Returns:
        A ``CONFIG.image_size`` square RGB image with a centred
        "Document / Processing / Error" message; the message is skipped when
        drawing it fails.
    """
    from PIL import Image, ImageDraw, ImageFont

    side = CONFIG.image_size
    canvas = Image.new('RGB', (side, side), color='white')
    pen = ImageDraw.Draw(canvas)
    try:
        default_font = ImageFont.load_default()
        message = "Document\nProcessing\nError"
        left, top, right, bottom = pen.textbbox((0, 0), message, font=default_font)
        # Centre the text box on the canvas.
        origin = ((side - (right - left)) // 2, (side - (bottom - top)) // 2)
        pen.text(origin, message, fill='black', font=default_font)
    except Exception:
        pass  # Best effort: a blank placeholder is still usable
    return canvas
@spaces.GPU
def process_with_webai(image: Image.Image, prompt: str) -> Dict[str, Any]:
    """Run the model on one image and prompt, and return the decoded text.

    Args:
        image: Image passed to the processor.
        prompt: Text prompt passed to the processor.

    Returns:
        ``{"response": text, "status": "success"}`` on success, otherwise
        ``{"response": "", "status": "failed", "error": message}``. Errors are
        logged as ``webai.process.failed`` and never raised.
    """
    inputs = None
    outputs = None
    try:
        if model is None or processor is None:
            raise RuntimeError("Model is not loaded; call setup_model() first")

        # Prepare inputs
        inputs = processor(
            images=image,
            text=prompt,
            return_tensors="pt"
        ).to("cuda")

        # Generate response
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=CONFIG.max_new_tokens,
                do_sample=True,
                temperature=0.1,
                pad_token_id=processor.tokenizer.eos_token_id
            )

        # Decoder-only models return the prompt followed by the new tokens.
        # Drop the echoed prompt so its JSON template is not mistaken for the
        # answer; the prefix check leaves other output layouts untouched.
        generated_ids = outputs[0]
        prompt_ids = inputs.get("input_ids") if hasattr(inputs, "get") else None
        if prompt_ids is not None:
            prompt_length = prompt_ids.shape[-1]
            if (
                generated_ids.shape[-1] >= prompt_length
                and torch.equal(generated_ids[:prompt_length], prompt_ids[0])
            ):
                generated_ids = generated_ids[prompt_length:]

        response_text = processor.tokenizer.decode(
            generated_ids,
            skip_special_tokens=True
        )
        result = {"response": response_text, "status": "success"}
    except Exception as e:
        log_event("webai.process.failed", {"error": str(e)})
        result = {"response": "", "status": "failed", "error": str(e)}
    finally:
        # Drop tensor references on every path before cleaning up.
        inputs = None
        outputs = None
        try:
            aggressive_memory_cleanup()
        except Exception as cleanup_error:
            log_event("webai.cleanup.failed", {"error": str(cleanup_error)})
    return result
def build_webai_prompt(tender_context: Dict[str, Any]) -> str:
    """Render the analysis prompt sent to the WebAI model.

    Args:
        tender_context: Mapping read for ``source_filename`` and
            ``organization_id``; each falls back to "Unknown" when absent.

    Returns:
        The prompt text, including the JSON template the model is asked to
        fill in.
    """
    source_name = tender_context.get('source_filename', 'Unknown')
    organization = tender_context.get('organization_id', 'Unknown')
    return f"""
Analyze this tender document and provide structured JSON output.
Context:
- Source: {source_name}
- Organization: {organization}
Provide analysis in this exact JSON format:
{{
"tenderTitle": "string",
"procuringEntity": "string",
"tenderCategory": "string",
"submissionDeadline": "YYYY-MM-DD",
"mandatoryDocuments": ["string"],
"eligibilityCriteria": ["string"],
"hardBlockers": ["string"],
"risks": ["string"],
"ambiguities": ["string"],
"confidence": 0.0,
"summary": "string",
"complianceItems": [
{{
"title": "string",
"requirementText": "string",
"status": "PASS|FAIL|UNKNOWN",
"severity": "LOW|MEDIUM|HIGH|CRITICAL",
"remediation": "string",
"pageReference": 1
}}
],
"bidScore": {{
"score": 0,
"decision": "BID|NO_BID|REVIEW",
"blockers": ["string"],
"confidence": 0.0,
"explanation": "string"
}}
}}
Focus on accuracy and completeness. Extract all visible requirements and criteria.
"""
def parse_webai_response(response_text: str) -> Dict[str, Any]:
    """Extract the JSON object from the model's reply.

    The text from the first "{" to the last "}" is parsed as JSON; when no
    braces are present the whole reply is tried instead.

    Args:
        response_text: Raw text returned by the model.

    Returns:
        The parsed object when it is a JSON object (dict). Otherwise an error
        structure with ``error`` set to "JSON_PARSE_ERROR", the first 500
        characters of the reply in ``raw_response``, ``confidence`` 0.0 and a
        ``summary``. This function does not raise for bad input.
    """
    import re

    def _parse_error(raw: str) -> Dict[str, Any]:
        """Build the error structure callers detect via the "error" key."""
        return {
            "error": "JSON_PARSE_ERROR",
            "raw_response": raw[:500],  # First 500 chars
            "confidence": 0.0,
            "summary": "Failed to parse WebAI response"
        }

    if not isinstance(response_text, str):
        # None or other non-text input cannot be searched or parsed.
        return _parse_error("")

    json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
    candidate = json_match.group(0) if json_match else response_text
    try:
        parsed = json.loads(candidate)
    except json.JSONDecodeError:
        return _parse_error(response_text)

    # Callers use dict operations on the result, so a bare list or scalar is
    # reported as a parse failure instead of being passed through.
    if not isinstance(parsed, dict):
        return _parse_error(response_text)
    return parsed
def _confidence_value(value: Any) -> float:
    """Coerce a confidence field to a finite float; anything else becomes 0.0."""
    import math

    try:
        number = float(value)
    except (TypeError, ValueError):
        return 0.0
    return number if math.isfinite(number) else 0.0


def _text_value(value: Any) -> str:
    """Return a field as stripped text, treating None as empty."""
    return "" if value is None else str(value).strip()


def _bid_decision(analysis: Dict[str, Any]) -> str:
    """Return ``bidScore.decision``, or "REVIEW" when it is missing or null."""
    bid_score = analysis.get("bidScore")
    decision = bid_score.get("decision") if isinstance(bid_score, dict) else None
    return decision if decision else "REVIEW"


def compare_analyses(primary_analysis: Dict[str, Any], webai_analysis: Dict[str, Any]) -> Dict[str, Any]:
    """Compare the primary worker's analysis with the WebAI analysis.

    The agreement score is a weighted sum: bid decision 0.4, confidence
    similarity 0.3, tender category 0.2 (0.5 credit on mismatch) and
    submission deadline 0.1. Null or non-numeric fields are normalised
    (confidence to 0.0, text to "", decision to "REVIEW") so one malformed
    field does not abort the whole comparison.

    Args:
        primary_analysis: Structured output of the primary worker.
        webai_analysis: Parsed WebAI model output.

    Returns:
        A dict with ``agreement_score``, ``differences``,
        ``confidence_comparison``, ``recommendation_comparison`` and
        ``timestamp``. On an unexpected error the error is logged, stored
        under ``error``, and the fields computed so far are returned.
    """
    comparison = {
        "agreement_score": 0.0,
        "differences": [],
        "confidence_comparison": {},
        "recommendation_comparison": {},
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    try:
        # Compare bid decisions
        primary_decision = _bid_decision(primary_analysis)
        webai_decision = _bid_decision(webai_analysis)
        decisions_agree = primary_decision == webai_decision
        comparison["recommendation_comparison"] = {
            "primary": primary_decision,
            "webai": webai_decision,
            "agrees": decisions_agree
        }

        # Compare confidence scores
        primary_conf = _confidence_value(primary_analysis.get("confidence"))
        webai_conf = _confidence_value(webai_analysis.get("confidence"))
        confidence_gap = abs(primary_conf - webai_conf)
        comparison["confidence_comparison"] = {
            "primary": primary_conf,
            "webai": webai_conf,
            "difference": confidence_gap
        }

        primary_cat = _text_value(primary_analysis.get("tenderCategory")).lower()
        webai_cat = _text_value(webai_analysis.get("tenderCategory")).lower()
        primary_deadline = _text_value(primary_analysis.get("submissionDeadline"))
        webai_deadline = _text_value(webai_analysis.get("submissionDeadline"))

        # (factor, weight) pairs; the weights sum to 1.0
        weighted_factors = (
            (1.0 if decisions_agree else 0.0, 0.4),
            (1.0 - min(confidence_gap, 1.0), 0.3),
            (1.0 if primary_cat == webai_cat else 0.5, 0.2),
            (1.0 if primary_deadline == webai_deadline else 0.0, 0.1),
        )
        comparison["agreement_score"] = sum(
            factor * weight for factor, weight in weighted_factors
        )

        # Identify key differences
        if not decisions_agree:
            comparison["differences"].append({
                "type": "bid_decision",
                "primary": primary_decision,
                "webai": webai_decision,
                "severity": "HIGH"
            })
        if confidence_gap > 0.3:
            comparison["differences"].append({
                "type": "confidence",
                "primary": primary_conf,
                "webai": webai_conf,
                "severity": "MEDIUM"
            })
    except Exception as e:
        log_event("comparison.failed", {"error": str(e)})
        comparison["error"] = str(e)
    return comparison
def store_verification_result(
    conn: psycopg.Connection,
    tender_id: str,
    webai_analysis: Dict[str, Any],
    comparison: Dict[str, Any]
) -> None:
    """Persist one verification outcome inside a transaction block.

    Inserts the analysis and comparison (as JSON) into
    ``public.webai_verifications`` and sets the tender's verification status
    to 'COMPLETED' with ``comparison["agreement_score"]`` as its score.

    Args:
        conn: Open database connection.
        tender_id: Identifier of the tender being verified.
        webai_analysis: Parsed WebAI analysis to store.
        comparison: Comparison result; must contain ``agreement_score``.

    Raises:
        Exception: Any failure is logged as ``verification.store.failed`` and
            re-raised unchanged.
    """
    insert_verification_sql = """
INSERT INTO public.webai_verifications
(tender_id, analysis, comparison, created_at)
VALUES (%s, %s::jsonb, %s::jsonb, now())
"""
    update_tender_sql = """
UPDATE public.tenders
SET verification_status = 'COMPLETED',
verification_score = %s,
updated_at = now()
WHERE id = %s
"""
    try:
        with conn.transaction(), conn.cursor() as cursor:
            # Store WebAI analysis
            cursor.execute(
                insert_verification_sql,
                (tender_id, json.dumps(webai_analysis), json.dumps(comparison)),
            )
            # Update tender with verification status
            cursor.execute(
                update_tender_sql,
                (comparison["agreement_score"], tender_id),
            )
    except Exception as exc:
        log_event("verification.store.failed", {"error": str(exc), "tender_id": tender_id})
        raise
def claim_verification_job(conn: psycopg.Connection) -> Optional[Dict[str, Any]]:
    """Claim the oldest available VERIFY job, if any.

    Within one transaction block, selects a QUEUED job whose tender is
    ANALYSIS_READY (skipping rows locked by others) and, when one is found,
    marks it RUNNING and increments its attempt count.

    Args:
        conn: Open database connection.

    Returns:
        The selected row with ``id``, ``tender_id`` and ``payload``, or None
        when no job is available.
    """
    select_job_sql = """
SELECT j.id, j.tender_id, j.payload
FROM public.processing_jobs j
JOIN public.tenders t ON j.tender_id = t.id
WHERE j.job_type = 'VERIFY'
AND j.status = 'QUEUED'
AND j.available_at <= now()
AND t.status = 'ANALYSIS_READY'
ORDER BY j.created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
"""
    mark_running_sql = """
UPDATE public.processing_jobs
SET status = 'RUNNING',
locked_at = now(),
attempt_count = attempt_count + 1,
updated_at = now()
WHERE id = %s
"""
    with conn.transaction(), conn.cursor(row_factory=dict_row) as cursor:
        cursor.execute(select_job_sql)
        claimed = cursor.fetchone()
        if claimed:
            cursor.execute(mark_running_sql, (claimed["id"],))
    return claimed
def _load_verification_context(tender_id: Any) -> Dict[str, Any]:
    """Fetch the tender row, document path and primary analysis for one tender."""
    with connect_database() as conn:
        with conn.cursor(row_factory=dict_row) as cur:
            cur.execute("""
SELECT t.id, t.organization_id, t.source_filename,
COALESCE(d.storage_path, t.storage_path) as storage_path,
e.structured_output as primary_analysis
FROM public.tenders t
LEFT JOIN public.documents d ON d.tender_id = t.id
LEFT JOIN public.extractions e ON e.tender_id = t.id
WHERE t.id = %s
""", (tender_id,))
            context = cur.fetchone()
    if not context:
        raise RuntimeError(f"Tender {tender_id} not found")
    if not context.get("primary_analysis"):
        raise RuntimeError(f"Primary analysis not available for tender {tender_id}")
    if not context.get("storage_path"):
        # Fail with a clear message instead of a TypeError inside the downloader.
        raise RuntimeError(f"No stored document path for tender {tender_id}")
    return context


def _mark_job_failed(job_id: Any, error: Exception) -> None:
    """Record a job as FAILED; log, rather than hide, a failure to do so."""
    try:
        with connect_database() as conn:
            with conn.cursor() as cur:
                cur.execute("""
UPDATE public.processing_jobs
SET status = 'FAILED',
last_error = %s,
updated_at = now()
WHERE id = %s
""", (str(error)[:2000], job_id))
            conn.commit()
    except Exception as mark_error:
        # Without this log the job would stay RUNNING with no trace of why.
        log_event("verification.mark_failed.failed", {
            "job_id": job_id,
            "error": str(mark_error)
        })


def process_verification_job(job: Dict[str, Any]) -> None:
    """Process a single verification job end to end.

    Steps: load the tender context, download the document, run the WebAI
    model, parse and compare the result, then store the outcome and mark the
    job SUCCEEDED in a single transaction. No database connection is held
    while downloading or running inference.

    Args:
        job: Claimed job row; ``id`` and ``tender_id`` are read.

    Failures are logged as ``verification.failed`` and the job is marked
    FAILED; exceptions are not propagated. Memory cleanup runs on every path.
    """
    job_id = job["id"]
    tender_id = job["tender_id"]
    try:
        context = _load_verification_context(tender_id)

        # Download and process document
        document_bytes = download_document(context["storage_path"])
        mime_type = infer_mime_type(context["storage_path"])
        # Prepare image for WebAI
        image = prepare_image_for_model(document_bytes, mime_type)
        # Build prompt and process
        prompt = build_webai_prompt(context)
        webai_result = process_with_webai(image, prompt)
        if webai_result["status"] != "success":
            raise RuntimeError(f"WebAI processing failed: {webai_result.get('error', 'Unknown error')}")
        # Parse response
        webai_analysis = parse_webai_response(webai_result["response"])
        if "error" in webai_analysis:
            raise RuntimeError(f"WebAI response parsing failed: {webai_analysis['error']}")
        # Compare with primary analysis
        comparison = compare_analyses(
            context["primary_analysis"],
            webai_analysis
        )

        with connect_database() as conn:
            # Outer transaction keeps the stored results and the job status
            # atomic; the helper's own transaction block nests inside it.
            with conn.transaction():
                store_verification_result(conn, tender_id, webai_analysis, comparison)
                with conn.cursor() as cur:
                    # Mark job as completed
                    cur.execute("""
UPDATE public.processing_jobs
SET status = 'SUCCEEDED',
updated_at = now()
WHERE id = %s
""", (job_id,))

        log_event("verification.completed", {
            "job_id": job_id,
            "tender_id": tender_id,
            "agreement_score": comparison["agreement_score"]
        })
    except Exception as e:
        log_event("verification.failed", {
            "job_id": job_id,
            "tender_id": tender_id,
            "error": str(e)
        })
        _mark_job_failed(job_id, e)
    finally:
        # Cleanup after every job, whether it succeeded or not
        try:
            aggressive_memory_cleanup()
        except Exception as cleanup_error:
            log_event("verification.cleanup.failed", {
                "job_id": job_id,
                "error": str(cleanup_error)
            })
def get_optimal_dpi(config: WebAIConfig) -> int:
    """Pick a PDF rasterization DPI from the currently available memory.

    Args:
        config: Worker configuration; ``adaptive_dpi`` and ``pdf_dpi`` are read.

    Returns:
        ``config.pdf_dpi`` when adaptive DPI is off or the memory probe fails;
        otherwise the tier DPI (300 / 250 / 200 / 150 for at least 12 / 8 / 4 /
        less than 4 GiB available), capped at ``config.pdf_dpi``.
    """
    if not config.adaptive_dpi:
        return config.pdf_dpi

    bytes_per_gib = 1024**3
    # (minimum available GiB, DPI), highest tier first
    memory_tiers = ((12, 300), (8, 250), (4, 200))
    try:
        if torch.cuda.is_available():
            # GPU present: total device memory minus what torch has allocated
            total_gb = torch.cuda.get_device_properties(0).total_memory / bytes_per_gib
            used_gb = torch.cuda.memory_allocated(0) / bytes_per_gib
            available_memory_gb = total_gb - used_gb
        else:
            # Fallback to system memory estimation
            import psutil
            available_memory_gb = psutil.virtual_memory().available / bytes_per_gib

        tier_dpi = next(
            (dpi for floor_gb, dpi in memory_tiers if available_memory_gb >= floor_gb),
            150,  # Very low memory - use conservative DPI
        )
        # Ensure we don't exceed configured maximum
        chosen_dpi = min(tier_dpi, config.pdf_dpi)
        log_event("dpi.adaptive", {
            "configured_dpi": config.pdf_dpi,
            "optimal_dpi": chosen_dpi,
            "available_memory_gb": available_memory_gb
        })
        return chosen_dpi
    except Exception as exc:
        log_event("dpi.adaptive.failed", {"error": str(exc)})
        return config.pdf_dpi
def aggressive_memory_cleanup():
    """Run garbage collection, release cached CUDA memory and log the effect.

    Emits a ``memory.cleanup`` event with before/after figures in GiB. Process
    memory figures are None when ``psutil`` cannot be used; that no longer
    prevents the cleanup itself. Errors are logged as
    ``memory.cleanup.failed`` and followed by a minimal cleanup attempt; this
    function does not raise.
    """
    cleanup_start = time.perf_counter()
    bytes_per_gib = 1024**3

    def _gpu_usage_gb():
        """Return (allocated, reserved) CUDA memory in GiB, or zeros without CUDA."""
        if torch.cuda.is_available():
            return (
                torch.cuda.memory_allocated() / bytes_per_gib,
                torch.cuda.memory_reserved() / bytes_per_gib,
            )
        return 0, 0

    def _process_rss_gb():
        """Return this process's resident memory in GiB, or None if unavailable."""
        try:
            import psutil
            return psutil.Process().memory_info().rss / bytes_per_gib
        except Exception:
            return None

    def _rounded(value, digits=3):
        """Round for logging, passing None through."""
        return None if value is None else round(value, digits)

    try:
        # Get memory before cleanup
        gpu_memory_before, gpu_memory_cached_before = _gpu_usage_gb()
        memory_before = _process_rss_gb()

        # 1. Collect Python garbage first, so tensors kept alive only by
        # reference cycles are freed before the CUDA cache is emptied.
        collected_objects = [gc.collect() for _ in range(3)]

        # 2. Release cached CUDA memory
        if torch.cuda.is_available():
            torch.cuda.synchronize()
            torch.cuda.empty_cache()

        # Get memory after cleanup
        gpu_memory_after, gpu_memory_cached_after = _gpu_usage_gb()
        memory_after = _process_rss_gb()

        # Calculate cleanup effectiveness
        if memory_before is None or memory_after is None:
            memory_freed_gb = None
        else:
            memory_freed_gb = memory_before - memory_after
        gpu_memory_freed_gb = gpu_memory_before - gpu_memory_after
        gpu_cache_freed_gb = gpu_memory_cached_before - gpu_memory_cached_after
        cleanup_time = (time.perf_counter() - cleanup_start) * 1000
        log_event("memory.cleanup", {
            "memory_freed_gb": _rounded(memory_freed_gb),
            "gpu_memory_freed_gb": _rounded(gpu_memory_freed_gb),
            "gpu_cache_freed_gb": _rounded(gpu_cache_freed_gb),
            "cleanup_time_ms": round(cleanup_time, 2),
            "collected_objects": collected_objects,
            "memory_before_gb": _rounded(memory_before),
            "memory_after_gb": _rounded(memory_after),
            "gpu_memory_before_gb": _rounded(gpu_memory_before),
            "gpu_memory_after_gb": _rounded(gpu_memory_after)
        })
        # Force a small delay to allow OS to reclaim memory
        time.sleep(0.1)
    except Exception as e:
        log_event("memory.cleanup.failed", {"error": str(e)})
        # Still try basic cleanup even if detailed monitoring fails
        try:
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            gc.collect()
        except Exception:
            pass
def infer_mime_type(storage_path: str) -> str:
    """Map a path's file extension to a MIME type.

    Args:
        storage_path: File path or name; matching is case-insensitive.

    Returns:
        The MIME type for .pdf, .png, .jpg/.jpeg and .tif/.tiff, or
        "application/octet-stream" for anything else.
    """
    from pathlib import Path

    mime_by_extension = {
        ".pdf": "application/pdf",
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".tif": "image/tiff",
        ".tiff": "image/tiff",
    }
    extension = Path(storage_path.lower()).suffix
    return mime_by_extension.get(extension, "application/octet-stream")
# Gradio Interface
def create_interface():
    """Create Gradio interface for HF Spaces.

    Returns:
        The ``gr.Blocks`` app: a file upload and a button wired to
        ``verify_document``, with a status box and an analysis box.
    """
    def verify_document(file_obj):
        """Run the WebAI analysis on an uploaded file.

        Returns:
            A ``(status, details)`` pair of strings for the two output boxes.
        """
        if file_obj is None:
            return "Please upload a document", ""
        try:
            # Accept either a plain path string or an object exposing ``.name``.
            file_path = file_obj if isinstance(file_obj, str) else file_obj.name
            with open(file_path, "rb") as f:
                document_bytes = f.read()

            # Process with WebAI
            mime_type = infer_mime_type(file_path)
            image = prepare_image_for_model(document_bytes, mime_type)
            context = {
                # Only the file name goes into the prompt, not the upload path.
                "source_filename": os.path.basename(file_path),
                "organization_id": "demo"
            }
            prompt = build_webai_prompt(context)
            result = process_with_webai(image, prompt)
            if result["status"] != "success":
                return "Processing Failed", result.get("error", "Unknown error")

            analysis = parse_webai_response(result["response"])
            # A parse failure is reported as such rather than as a success.
            status = "Analysis Parse Failed" if "error" in analysis else "Analysis Complete"
            return status, json.dumps(analysis, indent=2)
        except Exception as e:
            return f"Error: {str(e)}", ""
        finally:
            # Ensure cleanup happens even if processing fails
            try:
                aggressive_memory_cleanup()
            except Exception as cleanup_error:
                log_event("interface.cleanup.failed", {"error": str(cleanup_error)})

    # Create Gradio interface
    with gr.Blocks(title="TenderHub WebAI Verification") as interface:
        gr.Markdown("# TenderHub WebAI Verification Worker")
        gr.Markdown("Upload tender documents for AI-powered verification analysis")
        with gr.Row():
            with gr.Column():
                file_input = gr.File(
                    label="Upload Document",
                    # ".tif" is included because infer_mime_type accepts it too.
                    file_types=[".pdf", ".png", ".jpg", ".jpeg", ".tif", ".tiff"]
                )
                verify_btn = gr.Button("Verify Document", variant="primary")
            with gr.Column():
                status_output = gr.Textbox(label="Status")
                analysis_output = gr.Textbox(
                    label="Analysis Result",
                    lines=20,
                    max_lines=30
                )
        verify_btn.click(
            verify_document,
            inputs=[file_input],
            outputs=[status_output, analysis_output]
        )
    return interface
# Worker loop for background processing
def worker_loop():
    """Poll for verification jobs forever and process them one at a time.

    Sleeps 10 seconds when no job is available and 30 seconds after an
    unexpected error. Never returns.
    """
    log_event("worker.start", {})
    while True:
        try:
            # Hold the connection only for the claim, not while the job runs
            # or while the loop is sleeping.
            with connect_database() as conn:
                job = claim_verification_job(conn)
            if job:
                log_event("job.claimed", {"job_id": job["id"], "tender_id": job["tender_id"]})
                process_verification_job(job)
            else:
                # No jobs available, wait
                time.sleep(10)
        except Exception as e:
            log_event("worker.error", {"error": str(e)})
            time.sleep(30)  # Wait longer on errors
# Main execution
if __name__ == "__main__":
    import threading

    # Load the model before the worker or the UI can request inference
    setup_model()

    # Background job processing runs in a daemon thread
    worker_thread = threading.Thread(target=worker_loop, daemon=True)
    worker_thread.start()

    # Serve the Gradio UI on all interfaces; PORT defaults to 7860
    interface = create_interface()
    listen_port = int(os.getenv("PORT", "7860"))
    interface.launch(server_name="0.0.0.0", server_port=listen_port, share=False)