"""Interface web locale Picarones — application FastAPI. Lance avec : picarones serve [--port 8000] [--host 127.0.0.1] ou directement : uvicorn picarones.web.app:app --reload --port 8000 Routes ------ GET / Page principale (SPA) GET /api/status Version et état de l'application GET /api/engines Statut des moteurs OCR et LLMs disponibles GET /api/corpus/browse Parcourir les dossiers du serveur GET /api/reports Liste des rapports générés GET /api/normalization/profiles Profils de normalisation disponibles POST /api/benchmark/start Lancer un benchmark (retourne job_id) GET /api/benchmark/{job_id}/stream Stream SSE de progression GET /api/benchmark/{job_id}/status Statut courant d'un job POST /api/benchmark/{job_id}/cancel Annuler un job GET /api/htr-united/catalogue Catalogue HTR-United POST /api/htr-united/import Importer un corpus HTR-United GET /api/huggingface/search Rechercher des datasets HuggingFace POST /api/huggingface/import Importer un dataset HuggingFace GET /reports/{filename} Accéder à un rapport HTML généré """ from __future__ import annotations import asyncio import json import os import threading import time import uuid from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Any, AsyncIterator, Optional from fastapi import FastAPI, HTTPException, Query from fastapi.responses import FileResponse, HTMLResponse, StreamingResponse from pydantic import BaseModel from picarones import __version__ # --------------------------------------------------------------------------- # App initialization # --------------------------------------------------------------------------- app = FastAPI( title="Picarones", description="Plateforme de comparaison de moteurs OCR/HTR pour documents patrimoniaux", version=__version__, docs_url="/api/docs", redoc_url="/api/redoc", ) # --------------------------------------------------------------------------- # Job management # 
# ---------------------------------------------------------------------------

@dataclass
class BenchmarkJob:
    """In-memory state of one benchmark run plus its event fan-out.

    Every event is appended to ``events`` (so late subscribers can replay
    the history) and offered to each subscriber queue. ``add_event`` may be
    called from a thread other than the one running the subscribers' event
    loop (the benchmark itself runs in a ``threading.Thread``), so delivery
    is marshalled onto the loop that created each queue.
    """

    job_id: str
    status: str = "pending"  # pending | running | complete | error | cancelled
    progress: float = 0.0  # 0.0 – 1.0
    current_engine: str = ""
    total_docs: int = 0
    processed_docs: int = 0
    output_path: str = ""
    error: str = ""
    started_at: Optional[str] = None
    finished_at: Optional[str] = None
    events: list[dict] = field(default_factory=list)
    _subscribers: list[asyncio.Queue] = field(default_factory=list)
    # Event loop that owns each subscriber queue, keyed by id(queue).
    _subscriber_loops: dict[int, asyncio.AbstractEventLoop] = field(
        default_factory=dict, repr=False, compare=False
    )

    @staticmethod
    def _offer(queue: asyncio.Queue, event: dict) -> None:
        """Put *event* on *queue*, dropping it silently if the queue is full."""
        try:
            queue.put_nowait(event)
        except asyncio.QueueFull:
            pass

    def add_event(self, kind: str, data: Any) -> None:
        """Record an event and push it to every current subscriber.

        Args:
            kind: Event type (used as the SSE ``event:`` name by the stream).
            data: JSON-serialisable payload.
        """
        event = {"kind": kind, "data": data, "ts": _iso_now()}
        self.events.append(event)
        # Iterate over a snapshot: subscribers may come and go concurrently.
        for queue in list(self._subscribers):
            loop = self._subscriber_loops.get(id(queue))
            if loop is None:
                # Queue was created outside a running loop: nothing to wake up.
                self._offer(queue, event)
                continue
            try:
                # asyncio.Queue is not thread-safe; hand the put over to the
                # loop that owns the queue so its waiter is woken promptly.
                loop.call_soon_threadsafe(self._offer, queue, event)
            except RuntimeError:
                # The loop has been closed; this subscriber is gone.
                pass

    def subscribe(self) -> asyncio.Queue:
        """Register and return a new bounded queue receiving future events."""
        queue: asyncio.Queue = asyncio.Queue(maxsize=200)
        try:
            self._subscriber_loops[id(queue)] = asyncio.get_running_loop()
        except RuntimeError:
            # Not called from a coroutine: fall back to direct delivery.
            pass
        self._subscribers.append(queue)
        return queue

    def unsubscribe(self, q: asyncio.Queue) -> None:
        """Stop delivering events to *q*; unknown queues are ignored."""
        self._subscriber_loops.pop(id(q), None)
        try:
            self._subscribers.remove(q)
        except ValueError:
            pass

    def as_dict(self) -> dict:
        """Return the public job state (without events or subscribers)."""
        return {
            "job_id": self.job_id,
            "status": self.status,
            "progress": self.progress,
            "current_engine": self.current_engine,
            "total_docs": self.total_docs,
            "processed_docs": self.processed_docs,
            "output_path": self.output_path,
            "error": self.error,
            "started_at": self.started_at,
            "finished_at": self.finished_at,
        }


# Registry of all jobs created by this process, keyed by job id.
_JOBS: dict[str, BenchmarkJob] = {}


# ---------------------------------------------------------------------------
# Pydantic models
# ---------------------------------------------------------------------------

class BenchmarkRequest(BaseModel):
    """Body of ``POST /api/benchmark/start``."""

    corpus_path: str
    engines: list[str] = ["tesseract"]
    normalization_profile: str = "nfc"
    output_dir: str = "./rapports/"
    report_name: str = ""
    lang: str = "fra"


class HTRUnitedImportRequest(BaseModel):
    """Body of ``POST /api/htr-united/import``."""

    entry_id: str
    output_dir: str = "./corpus/"
    max_samples: int = 100


class HuggingFaceImportRequest(BaseModel):
    """Body of ``POST /api/huggingface/import``."""

    dataset_id: str
    output_dir: str = "./corpus/"
    split: str = "train"
    max_samples: int = 100
# ---------------------------------------------------------------------------
# API — status
# ---------------------------------------------------------------------------

@app.get("/api/status")
async def api_status() -> dict:
    """Return the application name, version, status flag and a UTC timestamp."""
    return {
        "app": "Picarones",
        "version": __version__,
        "status": "ok",
        "timestamp": _iso_now(),
    }


# ---------------------------------------------------------------------------
# API — engines
# ---------------------------------------------------------------------------

# Hosted LLM providers: (id, label, environment variable holding the API key).
_LLM_PROVIDERS = (
    ("openai", "OpenAI (GPT-4o, GPT-4o mini)", "OPENAI_API_KEY"),
    ("anthropic", "Anthropic (Claude Sonnet, Haiku)", "ANTHROPIC_API_KEY"),
    ("mistral", "Mistral (Mistral OCR, Pixtral, Large)", "MISTRAL_API_KEY"),
)


@app.get("/api/engines")
async def api_engines() -> dict:
    """Report which OCR engines are importable and which LLMs are usable.

    Returns:
        ``{"engines": [...], "llms": [...]}``. A hosted LLM counts as
        available when its API-key environment variable is non-empty; Ollama
        counts as available when its local HTTP endpoint answers.
    """
    engines = [
        _check_engine("tesseract", "pytesseract"),
        _check_engine("pero_ocr", "pero_ocr", label="Pero OCR"),
        _check_engine("kraken", "kraken", label="Kraken"),
        _check_engine("calamari", "calamari_ocr", label="Calamari"),
    ]

    llms = []
    for provider_id, provider_label, key_env in _LLM_PROVIDERS:
        has_key = bool(os.environ.get(key_env))
        llms.append({
            "id": provider_id,
            "label": provider_label,
            "type": "llm",
            "available": has_key,
            "key_env": key_env,
            "status": "configured" if has_key else "missing_key",
        })

    # The Ollama probes are blocking HTTP calls (2 s timeout each): run them
    # in the default executor so they do not stall the event loop.
    loop = asyncio.get_running_loop()
    ollama_available = await loop.run_in_executor(None, _check_ollama)
    ollama_models = (
        await loop.run_in_executor(None, _list_ollama_models)
        if ollama_available
        else []
    )
    llms.append({
        "id": "ollama",
        "label": "Ollama (Llama 3, Gemma, Phi — local)",
        "type": "llm_local",
        "available": ollama_available,
        "status": "running" if ollama_available else "not_running",
        "models": ollama_models,
        "base_url": "http://localhost:11434",
    })

    return {"engines": engines, "llms": llms}


def _check_engine(engine_id: str, module_name: str, label: str = "") -> dict:
    """Describe an OCR engine based on whether its Python module imports.

    Args:
        engine_id: Identifier reported in the result.
        module_name: Name of the module whose import proves installation.
        label: Display label; derived from ``engine_id`` when empty.

    Returns:
        A dict with ``id``, ``label``, ``type``, ``available``, ``version``
        and ``status`` keys.
    """
    label = label or engine_id.replace("_", " ").title()

    try:
        module = __import__(module_name)
    except ImportError:
        module = None
    installed = module is not None

    version = ""
    if installed:
        try:
            if engine_id == "tesseract":
                import pytesseract

                version = str(pytesseract.get_tesseract_version())
            else:
                # Always report a string, like the tesseract branch does.
                version = str(getattr(module, "__version__", "installé"))
        except Exception:
            # Module is present but its version cannot be determined.
            version = "installé"

    return {
        "id": engine_id,
        "label": label,
        "type": "ocr",
        "available": installed,
        "version": version,
        "status": "available" if installed else "not_installed",
    }


def _check_ollama() -> bool:
    """Return True when the local Ollama ``/api/tags`` endpoint answers 200."""
    import urllib.error
    import urllib.request

    try:
        with urllib.request.urlopen("http://localhost:11434/api/tags", timeout=2) as r:
            return r.status == 200
    except Exception:
        # Best effort: any failure means "not running".
        return False


def _list_ollama_models() -> list[str]:
    """Return the model names listed by the local Ollama, or [] on any failure."""
    import urllib.error
    import urllib.request

    try:
        with urllib.request.urlopen("http://localhost:11434/api/tags", timeout=2) as r:
            data = json.loads(r.read().decode())
        return [m.get("name", "") for m in data.get("models", [])]
    except Exception:
        # Best effort: unreachable server or unexpected payload.
        return []


# ---------------------------------------------------------------------------
# API — corpus browse
# ---------------------------------------------------------------------------

def _count_ground_truth_files(directory: Path) -> int:
    """Count the ``*.gt.txt`` files directly inside *directory*.

    Returns 0 when the directory cannot be listed, so that one unreadable
    sub-directory does not make the whole parent listing fail.
    """
    try:
        return sum(
            1
            for f in directory.iterdir()
            if f.suffix == ".txt" and f.stem.endswith(".gt")
        )
    except OSError:
        return 0


@app.get("/api/corpus/browse")
async def api_corpus_browse(path: str = Query(default=".", description="Chemin à explorer")) -> dict:
    """List the entries of a server-side directory.

    Each sub-directory additionally carries ``gt_count`` (number of
    ``*.gt.txt`` files it directly contains) and ``has_corpus``.

    Raises:
        HTTPException: 404 when *path* is not an existing directory, 403
            when the directory itself cannot be listed.
    """
    target = Path(path).resolve()
    if not target.is_dir():
        raise HTTPException(status_code=404, detail=f"Dossier non trouvé : {path}")

    items = []
    try:
        for entry in sorted(target.iterdir()):
            is_dir = entry.is_dir()
            item: dict[str, Any] = {
                "name": entry.name,
                "path": str(entry),
                "is_dir": is_dir,
            }
            if is_dir:
                # Count the image / ground-truth pairs.
                gt_count = _count_ground_truth_files(entry)
                item["gt_count"] = gt_count
                item["has_corpus"] = gt_count > 0
            items.append(item)
    except PermissionError as exc:
        raise HTTPException(status_code=403, detail=str(exc)) from exc

    return {
        "current_path": str(target),
        "parent_path": str(target.parent) if target.parent != target else None,
        "items": items,
    }


# ---------------------------------------------------------------------------
# API — normalization profiles
# ---------------------------------------------------------------------------

@app.get("/api/normalization/profiles")
async def api_normalization_profiles() -> dict:
    """Describe the built-in normalization profiles that can be loaded."""
    from picarones.core.normalization import get_builtin_profile

    profile_ids = [
        "nfc",
        "caseless",
        "minimal",
        "medieval_french",
        "early_modern_french",
        "medieval_latin",
    ]
    profiles = []
    for pid in profile_ids:
        try:
            p = get_builtin_profile(pid)
            profiles.append({
                "id": pid,
                "name": p.name,
                "description": p.description or p.name,
                "caseless": p.caseless,
                "diplomatic_rules": len(p.diplomatic_table),
            })
        except Exception:
            # Best effort: a profile that fails to load is left out.
            continue
    return {"profiles": profiles}


# ---------------------------------------------------------------------------
# API — reports
# ---------------------------------------------------------------------------

@app.get("/api/reports")
async def api_reports(reports_dir: str = Query(default=".", description="Dossier rapports")) -> dict:
    """List the ``*.html`` files found in the report directories.

    Searches *reports_dir*, the current directory and ``./rapports``, in
    that order. Within each directory files are sorted newest first; a file
    is listed only once.
    """
    target = Path(reports_dir).resolve()
    search_dirs = [target, Path(".").resolve(), Path("./rapports").resolve()]

    reports = []
    seen: set[str] = set()
    for directory in search_dirs:
        if not directory.is_dir():
            continue
        found = []
        for f in directory.glob("*.html"):
            key = str(f)
            if key in seen:
                continue
            try:
                stat = f.stat()
            except OSError:
                # File vanished (or became unreadable) since the glob.
                continue
            seen.add(key)
            found.append((stat.st_mtime, f, stat))
        found.sort(key=lambda item: item[0], reverse=True)
        for mtime, f, stat in found:
            reports.append({
                "filename": f.name,
                "path": str(f),
                "size_kb": round(stat.st_size / 1024, 1),
                "modified": datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat(),
                "url": f"/reports/{f.name}",
            })
    return {"reports": reports}


@app.get("/reports/{filename}")
async def serve_report(filename: str) -> FileResponse:
    """Serve an HTML report located in the current directory or ``./rapports``.

    Raises:
        HTTPException: 404 when no such ``.html`` file exists, or when
            *filename* is not a bare file name.
    """
    # Accept bare file names only, so the lookup cannot leave the two
    # report directories.
    is_bare_name = Path(filename).name == filename and "\\" not in filename
    if is_bare_name:
        # Look in the current directory and in ./rapports/.
        for directory in (Path("."), Path("./rapports")):
            candidate = directory / filename
            if candidate.is_file() and candidate.suffix == ".html":
                return FileResponse(str(candidate.resolve()), media_type="text/html")
    raise HTTPException(status_code=404, detail=f"Rapport non trouvé : {filename}")


# ---------------------------------------------------------------------------
# API — HTR-United
# ---------------------------------------------------------------------------

@app.get("/api/htr-united/catalogue")
async def api_htr_united_catalogue(
    query: str = Query(default="", description="Recherche textuelle"),
    language: str = Query(default="", description="Filtre langue"),
    script: str = Query(default="", description="Filtre type d'écriture"),
) -> dict:
    """Search the HTR-United catalogue; empty filters are passed as ``None``."""
    from picarones.importers.htr_united import HTRUnitedCatalogue

    cat = HTRUnitedCatalogue.from_demo()
    results = cat.search(
        query=query,
        language=language or None,
        script=script or None,
    )
    return {
        "source": cat.source,
        "total": len(results),
        "entries": [e.as_dict() for e in results],
        "available_languages": cat.available_languages(),
        "available_scripts": cat.available_scripts(),
    }


@app.post("/api/htr-united/import")
async def api_htr_united_import(req: HTRUnitedImportRequest) -> dict:
    """Import the catalogue entry ``req.entry_id``.

    Raises:
        HTTPException: 404 when the entry id is not in the catalogue.
    """
    from picarones.importers.htr_united import HTRUnitedCatalogue, import_htr_united_corpus

    cat = HTRUnitedCatalogue.from_demo()
    entry = cat.get_by_id(req.entry_id)
    if not entry:
        raise HTTPException(status_code=404, detail=f"Entrée non trouvée : {req.entry_id}")

    return import_htr_united_corpus(
        entry=entry,
        output_dir=req.output_dir,
        max_samples=req.max_samples,
    )


# ---------------------------------------------------------------------------
# API — HuggingFace
# ---------------------------------------------------------------------------

@app.get("/api/huggingface/search")
async def api_huggingface_search(
    query: str = Query(default="", description="Requête de recherche"),
    language: str = Query(default="", description="Filtre langue"),
    tags: str = Query(default="", description="Tags séparés par des virgules"),
    limit: int = Query(default=20, ge=1, le=50),
) -> dict:
    """Search HuggingFace datasets; *tags* is a comma-separated list."""
    from picarones.importers.huggingface import HuggingFaceImporter

    tag_list = [t.strip() for t in tags.split(",") if t.strip()] if tags else None
    importer = HuggingFaceImporter()
    results = importer.search(
        query=query,
        tags=tag_list,
        language=language or None,
        limit=limit,
    )
    return {
        "total": len(results),
        "datasets": [ds.as_dict() for ds in results],
    }


@app.post("/api/huggingface/import")
async def api_huggingface_import(req: HuggingFaceImportRequest) -> dict:
    """Import a HuggingFace dataset and return the importer's result."""
    from picarones.importers.huggingface import HuggingFaceImporter

    importer = HuggingFaceImporter()
    return importer.import_dataset(
        dataset_id=req.dataset_id,
        output_dir=req.output_dir,
        split=req.split,
        max_samples=req.max_samples,
    )


# ---------------------------------------------------------------------------
# API — benchmark
# ---------------------------------------------------------------------------

# Job statuses after which no further progress is expected.
_TERMINAL_STATUSES = ("complete", "error", "cancelled")


@app.post("/api/benchmark/start")
async def api_benchmark_start(req: BenchmarkRequest) -> dict:
    """Register a new job and run the benchmark in a daemon thread.

    Raises:
        HTTPException: 400 when ``req.corpus_path`` is not an existing
            directory.
    """
    corpus_path = Path(req.corpus_path)
    if not corpus_path.is_dir():
        raise HTTPException(status_code=400, detail=f"Corpus non trouvé : {req.corpus_path}")

    job_id = str(uuid.uuid4())
    job = BenchmarkJob(job_id=job_id)
    _JOBS[job_id] = job

    # Run the benchmark in a separate thread.
    threading.Thread(
        target=_run_benchmark_thread,
        args=(job, req),
        daemon=True,
    ).start()

    return {"job_id": job_id, "status": "pending"}


@app.get("/api/benchmark/{job_id}/status")
async def api_benchmark_status(job_id: str) -> dict:
    """Return the current state of a job (404 when the id is unknown)."""
    job = _JOBS.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail=f"Job non trouvé : {job_id}")
    return job.as_dict()


@app.post("/api/benchmark/{job_id}/cancel")
async def api_benchmark_cancel(job_id: str) -> dict:
    """Flag a job as cancelled (404 when the id is unknown).

    The worker thread polls the flag between its steps. Cancelling a job
    that is already cancelled is a no-op, so no duplicate event is emitted.
    """
    job = _JOBS.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail=f"Job non trouvé : {job_id}")
    if job.status in ("complete", "error"):
        return {"job_id": job_id, "status": job.status, "message": "Job déjà terminé."}
    if job.status == "cancelled":
        return {"job_id": job_id, "status": "cancelled"}
    job.status = "cancelled"
    job.add_event("cancelled", {"message": "Benchmark annulé par l'utilisateur."})
    return {"job_id": job_id, "status": "cancelled"}


@app.get("/api/benchmark/{job_id}/stream")
async def api_benchmark_stream(job_id: str) -> StreamingResponse:
    """Stream a job's events as Server-Sent Events (404 when unknown).

    Past events are replayed first, then live events follow. A keepalive
    comment is sent after 30 s without events.
    """
    job = _JOBS.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail=f"Job non trouvé : {job_id}")

    async def event_generator() -> AsyncIterator[str]:
        # Subscribe BEFORE taking the replay snapshot so that an event
        # emitted in between is not lost; it may then show up both in the
        # snapshot and in the queue, hence the identity-based de-duplication
        # (the very same dict object is stored in both places).
        queue = job.subscribe()
        try:
            backlog = list(job.events)
            replayed_ids = {id(event) for event in backlog}

            # First send the events produced so far.
            for event in backlog:
                yield _sse_format(event["kind"], event["data"])

            if job.status in _TERMINAL_STATUSES:
                yield _sse_format("done", {"status": job.status})
                return

            while True:
                try:
                    event = await asyncio.wait_for(queue.get(), timeout=30.0)
                except asyncio.TimeoutError:
                    # Keepalive
                    yield ": keepalive\n\n"
                    if job.status in _TERMINAL_STATUSES:
                        yield _sse_format("done", {"status": job.status})
                        break
                    continue

                if id(event) in replayed_ids:
                    continue  # already sent during the replay
                yield _sse_format(event["kind"], event["data"])
                if event["kind"] in ("complete", "error", "cancelled", "done"):
                    break
        finally:
            job.unsubscribe(queue)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )


def _sse_format(event_type: str, data: Any) -> str:
    """Serialise one Server-Sent Event with a JSON ``data`` field."""
    payload = json.dumps(data, ensure_ascii=False)
    return f"event: {event_type}\ndata: {payload}\n\n"


def _run_benchmark_thread(job: BenchmarkJob, req: BenchmarkRequest) -> None:
    """Run the benchmark (meant for a worker thread) and emit job events.

    Loads the corpus, instantiates the requested engines (engines that fail
    to load are skipped with a ``warning`` event), runs the benchmark, then
    generates the HTML report. Any exception ends the job in the ``error``
    state. Cancellation is cooperative: the ``cancelled`` status is checked
    between the steps.
    """
    if job.status == "cancelled":
        # Cancelled before this thread got to run: do not flip the job back
        # to "running".
        job.finished_at = job.finished_at or _iso_now()
        return

    job.status = "running"
    job.started_at = _iso_now()
    job.add_event("start", {"message": "Démarrage du benchmark…", "corpus": req.corpus_path})

    try:
        from picarones.core.corpus import load_corpus_from_directory
        from picarones.core.runner import run_benchmark

        # Load the corpus.
        job.add_event("log", {"message": f"Chargement du corpus : {req.corpus_path}"})
        corpus = load_corpus_from_directory(req.corpus_path)
        job.total_docs = len(corpus)
        job.add_event("log", {"message": f"{job.total_docs} documents chargés."})

        if job.status == "cancelled":
            return

        # Instantiate the engines.
        from picarones.cli import _engine_from_name

        ocr_engines = []
        for engine_name in req.engines:
            try:
                engine = _engine_from_name(engine_name, lang=req.lang, psm=6)
            except Exception as exc:
                # Also covers click.BadParameter, which subclasses Exception.
                job.add_event("warning", {"message": f"Moteur ignoré '{engine_name}' : {exc}"})
            else:
                ocr_engines.append(engine)
                job.add_event("log", {"message": f"Moteur chargé : {engine_name}"})

        if not ocr_engines:
            raise ValueError("Aucun moteur valide disponible.")

        # Output directory.
        output_dir = Path(req.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        report_name = req.report_name or f"rapport_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        output_json = str(output_dir / f"{report_name}.json")
        output_html = str(output_dir / f"{report_name}.html")

        # Progress callback handed to the runner.
        total_steps = job.total_docs * len(ocr_engines)
        steps_done = 0

        def _progress_callback(engine_name: str, doc_idx: int, doc_id: str) -> None:
            nonlocal steps_done
            if job.status == "cancelled":
                return
            steps_done += 1
            job.current_engine = engine_name
            job.processed_docs = doc_idx
            job.progress = steps_done / max(total_steps, 1)
            job.add_event("progress", {
                "engine": engine_name,
                "doc_idx": doc_idx,
                "doc_id": doc_id,
                "progress": job.progress,
                "processed": steps_done,
                "total": total_steps,
            })

        # Run the benchmark.
        result = run_benchmark(
            corpus=corpus,
            engines=ocr_engines,
            output_json=output_json,
            show_progress=False,
            progress_callback=_progress_callback,
        )

        if job.status == "cancelled":
            return

        # Generate the HTML report.
        job.add_event("log", {"message": "Génération du rapport HTML…"})
        from picarones.report.generator import ReportGenerator

        ReportGenerator(result).generate(output_html)

        # Final ranking — computed before the status flips to "complete" so
        # that a failure here is reported as an error only.
        ranking = result.ranking()

        job.output_path = output_html
        job.progress = 1.0
        job.status = "complete"
        job.finished_at = _iso_now()
        job.add_event("complete", {
            "message": "Benchmark terminé.",
            "output_html": output_html,
            "output_json": output_json,
            "ranking": ranking,
        })

    except Exception as exc:
        job.status = "error"
        job.error = str(exc)
        job.finished_at = _iso_now()
        job.add_event("error", {"message": f"Erreur : {exc}"})
    finally:
        # The early returns taken on cancellation do not stamp the end time.
        if job.finished_at is None:
            job.finished_at = _iso_now()


# ---------------------------------------------------------------------------
# Main HTML page (SPA)
# ---------------------------------------------------------------------------

@app.get("/", response_class=HTMLResponse)
async def index() -> HTMLResponse:
    """Serve the single-page application."""
    return HTMLResponse(content=_HTML_TEMPLATE)


# ---------------------------------------------------------------------------
# Helper
# ---------------------------------------------------------------------------

def _iso_now() -> str:
    """Return the current UTC time as an ISO-8601 string, to the second."""
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


# ---------------------------------------------------------------------------
# HTML Template (SPA, French/English, Vanilla JS)
# ---------------------------------------------------------------------------

_HTML_TEMPLATE = r""" Picarones — OCR Benchmark

1. Corpus

2. Moteurs et pipelines

Chargement…

3. Options

Rapports générés

Chargement…

Moteurs OCR

Chargement…

LLMs disponibles

Chargement…

Import HTR-United

Catalogue communautaire de corpus HTR/OCR pour documents patrimoniaux.

Import HuggingFace Datasets

Datasets OCR/HTR publics depuis HuggingFace Hub (IAM, RIMES, CATMuS, Gallica…).

"""