"""Interface web locale Picarones — application FastAPI.
Lance avec :
picarones serve [--port 8000] [--host 127.0.0.1]
ou directement :
uvicorn picarones.web.app:app --reload --port 8000
Routes
------
GET  /                                   Page principale (SPA)
GET  /api/status                         Version et état de l'application
GET  /api/engines                        Statut des moteurs OCR et LLMs disponibles
GET  /api/corpus/browse                  Parcourir les dossiers du serveur
GET  /api/reports                        Liste des rapports générés
GET  /api/normalization/profiles         Profils de normalisation disponibles
POST /api/benchmark/start                Lancer un benchmark (retourne job_id)
GET  /api/benchmark/{job_id}/stream      Stream SSE de progression
GET  /api/benchmark/{job_id}/status      Statut courant d'un job
POST /api/benchmark/{job_id}/cancel      Annuler un job
GET  /api/htr-united/catalogue           Catalogue HTR-United
POST /api/htr-united/import              Importer un corpus HTR-United
GET  /api/huggingface/search             Rechercher des datasets HuggingFace
POST /api/huggingface/import             Importer un dataset HuggingFace
GET  /reports/{filename}                 Accéder à un rapport HTML généré
"""
from __future__ import annotations
import asyncio
import json
import os
import threading
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, AsyncIterator, Optional
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import FileResponse, HTMLResponse, StreamingResponse
from pydantic import BaseModel
from picarones import __version__
# ---------------------------------------------------------------------------
# App initialization
# ---------------------------------------------------------------------------
# Single FastAPI application object; every route in this module registers on it.
# The interactive API documentation is served under /api/docs and /api/redoc.
app = FastAPI(
title="Picarones",
description="Plateforme de comparaison de moteurs OCR/HTR pour documents patrimoniaux",
version=__version__,
docs_url="/api/docs",
redoc_url="/api/redoc",
)
# ---------------------------------------------------------------------------
# Job management
# ---------------------------------------------------------------------------
@dataclass
class BenchmarkJob:
    """State and event log of one benchmark run.

    The object is shared between the worker thread that executes the
    benchmark (which calls :meth:`add_event`) and the asyncio event loop that
    serves SSE streams (which calls :meth:`subscribe` / :meth:`unsubscribe`).
    ``asyncio.Queue`` is not thread-safe, so events are handed to subscriber
    queues through ``loop.call_soon_threadsafe`` whenever the subscriber was
    registered from a running event loop.
    """

    job_id: str
    status: str = "pending"  # pending | running | complete | error | cancelled
    progress: float = 0.0  # 0.0 – 1.0
    current_engine: str = ""
    total_docs: int = 0
    processed_docs: int = 0
    output_path: str = ""
    error: str = ""
    started_at: Optional[str] = None
    finished_at: Optional[str] = None
    # Append-only history; replayed to clients that connect late.
    events: list[dict] = field(default_factory=list)
    _subscribers: list[asyncio.Queue] = field(default_factory=list)
    # id(queue) -> event loop that was running when the queue subscribed.
    _subscriber_loops: dict[int, asyncio.AbstractEventLoop] = field(default_factory=dict)

    @staticmethod
    def _offer(q: asyncio.Queue, event: dict) -> None:
        """Put *event* on *q*, dropping it when the queue is full."""
        try:
            q.put_nowait(event)
        except asyncio.QueueFull:
            # A slow consumer must never block or crash the producer.
            pass

    def add_event(self, kind: str, data: Any) -> None:
        """Record an event and fan it out to every current subscriber.

        Safe to call from any thread.

        Args:
            kind: Event type (used as the SSE ``event:`` field by the stream).
            data: JSON-serialisable payload.
        """
        event = {"kind": kind, "data": data, "ts": _iso_now()}
        self.events.append(event)
        # Iterate over a snapshot: subscribers may come and go concurrently.
        for q in list(self._subscribers):
            loop = self._subscriber_loops.get(id(q))
            if loop is None or loop.is_closed():
                # Subscribed outside of a running loop: deliver directly.
                self._offer(q, event)
                continue
            try:
                # Wakes the loop so a consumer awaiting q.get() resumes at once.
                loop.call_soon_threadsafe(self._offer, q, event)
            except RuntimeError:
                # The loop was closed between the check and the call.
                pass

    def subscribe(self) -> asyncio.Queue:
        """Register and return a new bounded queue receiving future events."""
        q: asyncio.Queue = asyncio.Queue(maxsize=200)
        try:
            self._subscriber_loops[id(q)] = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop (e.g. synchronous caller): direct delivery is used.
            pass
        self._subscribers.append(q)
        return q

    def unsubscribe(self, q: asyncio.Queue) -> None:
        """Stop delivering events to *q*; unknown queues are ignored."""
        self._subscriber_loops.pop(id(q), None)
        try:
            self._subscribers.remove(q)
        except ValueError:
            pass

    def as_dict(self) -> dict:
        """Return the public job state (without the event history)."""
        return {
            "job_id": self.job_id,
            "status": self.status,
            "progress": self.progress,
            "current_engine": self.current_engine,
            "total_docs": self.total_docs,
            "processed_docs": self.processed_docs,
            "output_path": self.output_path,
            "error": self.error,
            "started_at": self.started_at,
            "finished_at": self.finished_at,
        }
# In-memory registry of benchmark jobs, keyed by job_id (filled by
# api_benchmark_start, read by the status/cancel/stream endpoints).
# NOTE(review): nothing in the visible code ever removes entries — confirm
# whether unbounded growth is acceptable for a long-running server.
_JOBS: dict[str, BenchmarkJob] = {}
# ---------------------------------------------------------------------------
# Pydantic models
# ---------------------------------------------------------------------------
class BenchmarkRequest(BaseModel):
    """Request body of ``POST /api/benchmark/start``."""

    corpus_path: str  # must be an existing directory (checked by api_benchmark_start)
    engines: list[str] = ["tesseract"]  # names resolved with picarones.cli._engine_from_name
    # NOTE(review): not read by the benchmark code visible in this file — confirm it is used.
    normalization_profile: str = "nfc"
    output_dir: str = "./rapports/"  # created if missing by the worker thread
    report_name: str = ""  # empty -> "rapport_<YYYYmmdd_HHMMSS>" is generated
    lang: str = "fra"  # forwarded as ``lang`` when instantiating each engine
class HTRUnitedImportRequest(BaseModel):
    """Request body of ``POST /api/htr-united/import``."""

    entry_id: str  # looked up in the catalogue with ``get_by_id``
    output_dir: str = "./corpus/"  # forwarded to import_htr_united_corpus
    max_samples: int = 100  # forwarded to import_htr_united_corpus
class HuggingFaceImportRequest(BaseModel):
    """Request body of ``POST /api/huggingface/import``.

    All fields are forwarded unchanged to ``HuggingFaceImporter.import_dataset``.
    """

    dataset_id: str
    output_dir: str = "./corpus/"
    split: str = "train"
    max_samples: int = 100
# ---------------------------------------------------------------------------
# API — status
# ---------------------------------------------------------------------------
@app.get("/api/status")
async def api_status() -> dict:
    """Report the application name, version, health flag and current UTC time."""
    payload = dict(
        app="Picarones",
        version=__version__,
        status="ok",
        timestamp=_iso_now(),
    )
    return payload
# ---------------------------------------------------------------------------
# API — engines
# ---------------------------------------------------------------------------
def _api_key_llm_status(llm_id: str, label: str, key_env: str) -> dict:
    """Describe a hosted LLM provider whose availability depends on an API key.

    The provider counts as available when the environment variable *key_env*
    is set to a non-empty value.
    """
    configured = bool(os.environ.get(key_env))
    return {
        "id": llm_id,
        "label": label,
        "type": "llm",
        "available": configured,
        "key_env": key_env,
        "status": "configured" if configured else "missing_key",
    }


@app.get("/api/engines")
async def api_engines() -> dict:
    """List the OCR engines and LLM back-ends with their availability.

    Returns:
        ``{"engines": [...], "llms": [...]}`` where each entry carries an
        ``id``, a ``label``, an ``available`` flag and a ``status`` string.
    """
    engines = [
        _check_engine("tesseract", "pytesseract"),
        _check_engine("pero_ocr", "pero_ocr", label="Pero OCR"),
        _check_engine("kraken", "kraken", label="Kraken"),
        _check_engine("calamari", "calamari_ocr", label="Calamari"),
    ]

    llms = [
        _api_key_llm_status("openai", "OpenAI (GPT-4o, GPT-4o mini)", "OPENAI_API_KEY"),
        _api_key_llm_status("anthropic", "Anthropic (Claude Sonnet, Haiku)", "ANTHROPIC_API_KEY"),
        _api_key_llm_status("mistral", "Mistral (Mistral OCR, Pixtral, Large)", "MISTRAL_API_KEY"),
    ]

    # The Ollama probes are blocking HTTP calls with a 2 s timeout each; run
    # them in the default executor so the event loop keeps serving requests.
    loop = asyncio.get_running_loop()
    ollama_available = await loop.run_in_executor(None, _check_ollama)
    if ollama_available:
        ollama_models = await loop.run_in_executor(None, _list_ollama_models)
    else:
        ollama_models = []
    llms.append({
        "id": "ollama",
        "label": "Ollama (Llama 3, Gemma, Phi — local)",
        "type": "llm_local",
        "available": ollama_available,
        "status": "running" if ollama_available else "not_running",
        "models": ollama_models,
        "base_url": "http://localhost:11434",
    })
    return {"engines": engines, "llms": llms}
def _check_engine(engine_id: str, module_name: str, label: str = "") -> dict:
label = label or engine_id.replace("_", " ").title()
try:
__import__(module_name)
installed = True
except ImportError:
installed = False
version = ""
if installed and engine_id == "tesseract":
try:
import pytesseract
version = pytesseract.get_tesseract_version()
version = str(version)
except Exception:
version = "installé"
elif installed:
try:
mod = __import__(module_name)
version = getattr(mod, "__version__", "installé")
except Exception:
version = "installé"
return {
"id": engine_id,
"label": label,
"type": "ocr",
"available": installed,
"version": version,
"status": "available" if installed else "not_installed",
}
def _check_ollama() -> bool:
import urllib.error, urllib.request
try:
with urllib.request.urlopen("http://localhost:11434/api/tags", timeout=2) as r:
return r.status == 200
except Exception:
return False
def _list_ollama_models() -> list[str]:
import urllib.error, urllib.request
try:
with urllib.request.urlopen("http://localhost:11434/api/tags", timeout=2) as r:
data = json.loads(r.read().decode())
return [m.get("name", "") for m in data.get("models", [])]
except Exception:
return []
# ---------------------------------------------------------------------------
# API — corpus browse
# ---------------------------------------------------------------------------
def _count_gt_files(directory: Path) -> int:
    """Count the ``*.gt.txt`` files directly inside *directory* (0 if unreadable)."""
    try:
        return sum(
            1
            for f in directory.iterdir()
            if f.suffix == ".txt" and f.stem.endswith(".gt")
        )
    except OSError:
        # An unreadable or vanished sub-directory must not abort the listing
        # of its parent.
        return 0


@app.get("/api/corpus/browse")
async def api_corpus_browse(path: str = Query(default=".", description="Chemin à explorer")) -> dict:
    """List the entries of a server-side directory for the corpus picker.

    Args:
        path: Directory to explore; resolved to an absolute path.

    Returns:
        ``current_path``, ``parent_path`` (``None`` at the filesystem root) and
        ``items``. Directory items also carry ``gt_count`` (number of
        ``*.gt.txt`` files directly inside) and ``has_corpus``.

    Raises:
        HTTPException: 404 when *path* is not an existing directory, 403 when
            the directory itself cannot be read.
    """
    target = Path(path).resolve()
    # is_dir() is already False for a path that does not exist.
    if not target.is_dir():
        raise HTTPException(status_code=404, detail=f"Dossier non trouvé : {path}")

    items = []
    try:
        for entry in sorted(target.iterdir()):
            is_dir = entry.is_dir()
            item: dict[str, Any] = {
                "name": entry.name,
                "path": str(entry),
                "is_dir": is_dir,
            }
            if is_dir:
                gt_count = _count_gt_files(entry)
                item["gt_count"] = gt_count
                item["has_corpus"] = gt_count > 0
            items.append(item)
    except PermissionError as exc:
        raise HTTPException(status_code=403, detail=str(exc)) from exc

    return {
        "current_path": str(target),
        "parent_path": str(target.parent) if target.parent != target else None,
        "items": items,
    }
# ---------------------------------------------------------------------------
# API — normalization profiles
# ---------------------------------------------------------------------------
@app.get("/api/normalization/profiles")
async def api_normalization_profiles() -> dict:
    """Describe the built-in normalization profiles.

    Profiles that fail to load are left out of the result (best effort).
    """
    from picarones.core.normalization import get_builtin_profile

    builtin_ids = (
        "nfc",
        "caseless",
        "minimal",
        "medieval_french",
        "early_modern_french",
        "medieval_latin",
    )
    described = []
    for profile_id in builtin_ids:
        try:
            profile = get_builtin_profile(profile_id)
            summary = {
                "id": profile_id,
                "name": profile.name,
                "description": profile.description or profile.name,
                "caseless": profile.caseless,
                "diplomatic_rules": len(profile.diplomatic_table),
            }
        except Exception:
            continue
        described.append(summary)
    return {"profiles": described}
# ---------------------------------------------------------------------------
# API — reports
# ---------------------------------------------------------------------------
@app.get("/api/reports")
async def api_reports(reports_dir: str = Query(default=".", description="Dossier rapports")) -> dict:
    """List the HTML reports found in the known report directories.

    The directories searched are, in order: *reports_dir*, the current working
    directory and ``./rapports``. Within each directory the reports are
    ordered from most to least recently modified; a file reachable through
    several of these directories is listed once.

    Returns:
        ``{"reports": [...]}`` with ``filename``, ``path``, ``size_kb``,
        ``modified`` (UTC ISO-8601) and ``url`` for each report.
    """
    target = Path(reports_dir).resolve()
    search_dirs = [target, Path(".").resolve(), Path("./rapports").resolve()]

    reports = []
    seen: set[str] = set()
    for directory in search_dirs:
        if not directory.exists():
            continue
        # stat() each file exactly once; a file deleted between glob() and
        # stat() is skipped instead of failing the whole request.
        found = []
        for f in directory.glob("*.html"):
            key = str(f)
            if key in seen:
                continue
            try:
                stat = f.stat()
            except OSError:
                continue
            seen.add(key)
            found.append((stat.st_mtime, f, stat))
        found.sort(key=lambda item: item[0], reverse=True)

        for mtime, f, stat in found:
            reports.append({
                "filename": f.name,
                "path": str(f),
                "size_kb": round(stat.st_size / 1024, 1),
                "modified": datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat(),
                "url": f"/reports/{f.name}",
            })
    return {"reports": reports}
@app.get("/reports/{filename}")
async def serve_report(filename: str) -> FileResponse:
    """Serve a generated HTML report by file name.

    The file is looked up in the current working directory, then in
    ``./rapports/``.

    Raises:
        HTTPException: 404 when the name is not a bare ``*.html`` file name or
            when no such file exists in the searched directories.
    """
    from pathlib import PureWindowsPath

    # The name comes straight from the URL: accept only a bare file name.
    # PureWindowsPath treats both "/" and "\\" as separators and recognises
    # drive prefixes, so this rejects traversal attempts on every platform.
    if PureWindowsPath(filename).name == filename:
        for directory in (Path("."), Path("./rapports")):
            candidate = directory / filename
            # is_file(): a directory named "x.html" must not be served.
            if candidate.suffix == ".html" and candidate.is_file():
                return FileResponse(str(candidate.resolve()), media_type="text/html")
    raise HTTPException(status_code=404, detail=f"Rapport non trouvé : {filename}")
# ---------------------------------------------------------------------------
# API — HTR-United
# ---------------------------------------------------------------------------
@app.get("/api/htr-united/catalogue")
async def api_htr_united_catalogue(
    query: str = Query(default="", description="Recherche textuelle"),
    language: str = Query(default="", description="Filtre langue"),
    script: str = Query(default="", description="Filtre type d'écriture"),
) -> dict:
    """Search the HTR-United catalogue built by ``HTRUnitedCatalogue.from_demo()``.

    Empty ``language`` / ``script`` filters are passed to the search as ``None``.
    """
    from picarones.importers.htr_united import HTRUnitedCatalogue

    catalogue = HTRUnitedCatalogue.from_demo()
    language_filter = language or None
    script_filter = script or None
    matches = catalogue.search(query=query, language=language_filter, script=script_filter)

    entries = [match.as_dict() for match in matches]
    return {
        "source": catalogue.source,
        "total": len(matches),
        "entries": entries,
        "available_languages": catalogue.available_languages(),
        "available_scripts": catalogue.available_scripts(),
    }
@app.post("/api/htr-united/import")
async def api_htr_united_import(req: HTRUnitedImportRequest) -> dict:
    """Import the catalogue entry named by ``req.entry_id``.

    Raises:
        HTTPException: 404 when the catalogue has no entry with that id.
    """
    from picarones.importers.htr_united import HTRUnitedCatalogue, import_htr_united_corpus

    catalogue = HTRUnitedCatalogue.from_demo()
    selected = catalogue.get_by_id(req.entry_id)
    if selected:
        return import_htr_united_corpus(
            entry=selected,
            output_dir=req.output_dir,
            max_samples=req.max_samples,
        )
    raise HTTPException(status_code=404, detail=f"Entrée non trouvée : {req.entry_id}")
# ---------------------------------------------------------------------------
# API — HuggingFace
# ---------------------------------------------------------------------------
@app.get("/api/huggingface/search")
async def api_huggingface_search(
    query: str = Query(default="", description="Requête de recherche"),
    language: str = Query(default="", description="Filtre langue"),
    tags: str = Query(default="", description="Tags séparés par des virgules"),
    limit: int = Query(default=20, ge=1, le=50),
) -> dict:
    """Search datasets through ``HuggingFaceImporter.search``.

    ``tags`` is a comma-separated list; blank items are dropped and an empty
    parameter is passed to the search as ``None``.
    """
    from picarones.importers.huggingface import HuggingFaceImporter

    if tags:
        stripped = (piece.strip() for piece in tags.split(","))
        tag_list = [piece for piece in stripped if piece]
    else:
        tag_list = None

    found = HuggingFaceImporter().search(
        query=query,
        tags=tag_list,
        language=language or None,
        limit=limit,
    )
    datasets = [dataset.as_dict() for dataset in found]
    return {"total": len(found), "datasets": datasets}
@app.post("/api/huggingface/import")
async def api_huggingface_import(req: HuggingFaceImportRequest) -> dict:
    """Import a dataset by forwarding the request to ``HuggingFaceImporter.import_dataset``."""
    from picarones.importers.huggingface import HuggingFaceImporter

    hf_importer = HuggingFaceImporter()
    return hf_importer.import_dataset(
        dataset_id=req.dataset_id,
        output_dir=req.output_dir,
        split=req.split,
        max_samples=req.max_samples,
    )
# ---------------------------------------------------------------------------
# API — benchmark
# ---------------------------------------------------------------------------
@app.post("/api/benchmark/start")
async def api_benchmark_start(req: BenchmarkRequest) -> dict:
    """Register a new benchmark job and start it in a background thread.

    Returns:
        ``{"job_id": ..., "status": "pending"}``.

    Raises:
        HTTPException: 400 when ``req.corpus_path`` is not an existing directory.
    """
    corpus_dir = Path(req.corpus_path)
    if not (corpus_dir.exists() and corpus_dir.is_dir()):
        raise HTTPException(status_code=400, detail=f"Corpus non trouvé : {req.corpus_path}")

    new_id = str(uuid.uuid4())
    new_job = BenchmarkJob(job_id=new_id)
    _JOBS[new_id] = new_job

    # Daemon thread: the benchmark runs outside of the event loop.
    worker = threading.Thread(target=_run_benchmark_thread, args=(new_job, req), daemon=True)
    worker.start()
    return {"job_id": new_id, "status": "pending"}
@app.get("/api/benchmark/{job_id}/status")
async def api_benchmark_status(job_id: str) -> dict:
    """Return the current state of a job.

    Raises:
        HTTPException: 404 when *job_id* is unknown.
    """
    found = _JOBS.get(job_id)
    if found:
        return found.as_dict()
    raise HTTPException(status_code=404, detail=f"Job non trouvé : {job_id}")
@app.post("/api/benchmark/{job_id}/cancel")
async def api_benchmark_cancel(job_id: str) -> dict:
    """Mark a job as cancelled.

    The worker only checks the flag between its steps, so cancelling does not
    interrupt the step currently running.

    Returns:
        The job id and resulting status; a ``message`` is added when the job
        had already completed or failed.

    Raises:
        HTTPException: 404 when *job_id* is unknown.
    """
    job = _JOBS.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail=f"Job non trouvé : {job_id}")
    if job.status in ("complete", "error"):
        return {"job_id": job_id, "status": job.status, "message": "Job déjà terminé."}
    # Idempotent: a repeated cancel must not append a second "cancelled"
    # event to the history replayed to every new stream subscriber.
    if job.status != "cancelled":
        job.status = "cancelled"
        job.add_event("cancelled", {"message": "Benchmark annulé par l'utilisateur."})
    return {"job_id": job_id, "status": "cancelled"}
@app.get("/api/benchmark/{job_id}/stream")
async def api_benchmark_stream(job_id: str) -> StreamingResponse:
    """Stream the events of a job as Server-Sent Events.

    The events already recorded are replayed first, then live events are
    forwarded until a terminal event arrives. A comment line is sent as
    keepalive after 30 s of silence.

    Raises:
        HTTPException: 404 when *job_id* is unknown.
    """
    job = _JOBS.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail=f"Job non trouvé : {job_id}")

    terminal_statuses = ("complete", "error", "cancelled")

    async def event_generator() -> AsyncIterator[str]:
        # Subscribe BEFORE taking the history snapshot: an event emitted while
        # the history is being replayed would otherwise be in neither the
        # snapshot nor the queue, and be lost.
        queue = job.subscribe()
        try:
            backlog = list(job.events)
            # An event may be in both the snapshot and the queue; job.events
            # keeps every event alive, so identity is a reliable key.
            replayed = {id(event) for event in backlog}
            for event in backlog:
                yield _sse_format(event["kind"], event["data"])

            if job.status in terminal_statuses:
                # Flush what was queued during the replay, then finish.
                while not queue.empty():
                    event = queue.get_nowait()
                    if id(event) not in replayed:
                        yield _sse_format(event["kind"], event["data"])
                yield _sse_format("done", {"status": job.status})
                return

            while True:
                try:
                    event = await asyncio.wait_for(queue.get(), timeout=30.0)
                except asyncio.TimeoutError:
                    # Keepalive
                    yield ": keepalive\n\n"
                    if job.status in terminal_statuses:
                        yield _sse_format("done", {"status": job.status})
                        break
                    continue
                if id(event) in replayed:
                    continue
                yield _sse_format(event["kind"], event["data"])
                if event["kind"] in ("complete", "error", "cancelled", "done"):
                    break
        finally:
            job.unsubscribe(queue)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            # Ask nginx-style reverse proxies not to buffer the stream.
            "X-Accel-Buffering": "no",
        },
    )
def _sse_format(event_type: str, data: Any) -> str:
payload = json.dumps(data, ensure_ascii=False)
return f"event: {event_type}\ndata: {payload}\n\n"
def _run_benchmark_thread(job: BenchmarkJob, req: BenchmarkRequest) -> None:
    """Run a benchmark in a worker thread, reporting progress through *job*.

    Steps: load the corpus, instantiate the requested engines (engines that
    fail to load are skipped with a "warning" event), run the benchmark,
    generate the HTML report. Any exception is recorded on the job as an
    "error" event instead of propagating.

    Cancellation is cooperative: ``job.status`` is checked between steps, and
    the progress callback stops reporting once the job is cancelled.

    Args:
        job: Job to update; its events are what the SSE stream forwards.
        req: Validated request describing the corpus, engines and output.
    """
    # A cancel may arrive before this thread gets scheduled; do not overwrite it.
    if job.status == "cancelled":
        job.finished_at = _iso_now()
        return
    job.status = "running"
    job.started_at = _iso_now()
    job.add_event("start", {"message": "Démarrage du benchmark…", "corpus": req.corpus_path})
    try:
        from picarones.core.corpus import load_corpus_from_directory
        from picarones.core.runner import run_benchmark

        # Load the corpus
        job.add_event("log", {"message": f"Chargement du corpus : {req.corpus_path}"})
        corpus = load_corpus_from_directory(req.corpus_path)
        job.total_docs = len(corpus)
        job.add_event("log", {"message": f"{job.total_docs} documents chargés."})
        if job.status == "cancelled":
            job.finished_at = _iso_now()
            return

        # Instantiate the engines
        from picarones.cli import _engine_from_name

        ocr_engines = []
        for engine_name in req.engines:
            try:
                eng = _engine_from_name(engine_name, lang=req.lang, psm=6)
                ocr_engines.append(eng)
                job.add_event("log", {"message": f"Moteur chargé : {engine_name}"})
            except Exception as exc:
                # One unusable engine must not abort the whole benchmark.
                job.add_event("warning", {"message": f"Moteur ignoré '{engine_name}' : {exc}"})
        if not ocr_engines:
            raise ValueError("Aucun moteur valide disponible.")

        # Output directory
        output_dir = Path(req.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        report_name = req.report_name or f"rapport_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        output_json = str(output_dir / f"{report_name}.json")
        output_html = str(output_dir / f"{report_name}.html")

        # Progress callback: one step per (engine, document) pair.
        total_steps = job.total_docs * len(ocr_engines)
        steps_done = 0

        def _progress_callback(engine_name: str, doc_idx: int, doc_id: str) -> None:
            nonlocal steps_done
            if job.status == "cancelled":
                return
            steps_done += 1
            job.current_engine = engine_name
            job.processed_docs = doc_idx
            # max(..., 1) avoids a division by zero on an empty corpus.
            job.progress = steps_done / max(total_steps, 1)
            job.add_event("progress", {
                "engine": engine_name,
                "doc_idx": doc_idx,
                "doc_id": doc_id,
                "progress": job.progress,
                "processed": steps_done,
                "total": total_steps,
            })

        # Run the benchmark
        result = run_benchmark(
            corpus=corpus,
            engines=ocr_engines,
            output_json=output_json,
            show_progress=False,
            progress_callback=_progress_callback,
        )
        if job.status == "cancelled":
            job.finished_at = _iso_now()
            return

        # Generate the HTML report
        job.add_event("log", {"message": "Génération du rapport HTML…"})
        from picarones.report.generator import ReportGenerator

        gen = ReportGenerator(result)
        gen.generate(output_html)
        job.output_path = output_html
        job.progress = 1.0
        job.status = "complete"
        job.finished_at = _iso_now()

        # Final ranking
        ranking = result.ranking()
        job.add_event("complete", {
            "message": "Benchmark terminé.",
            "output_html": output_html,
            "output_json": output_json,
            "ranking": ranking,
        })
    except Exception as exc:
        # Top-level boundary of the worker thread: report instead of dying silently.
        job.status = "error"
        job.error = str(exc)
        job.finished_at = _iso_now()
        job.add_event("error", {"message": f"Erreur : {exc}"})
# ---------------------------------------------------------------------------
# Page principale HTML (SPA)
# ---------------------------------------------------------------------------
@app.get("/", response_class=HTMLResponse)
async def index() -> HTMLResponse:
    """Serve the single-page application shell."""
    page = HTMLResponse(content=_HTML_TEMPLATE)
    return page
# ---------------------------------------------------------------------------
# Helper
# ---------------------------------------------------------------------------
def _iso_now() -> str:
return datetime.now(timezone.utc).isoformat(timespec="seconds")
# ---------------------------------------------------------------------------
# HTML Template (SPA, French/English, Vanilla JS)
# ---------------------------------------------------------------------------
# Page body returned verbatim by the "/" route (raw string: no escape processing).
# NOTE(review): the literal below contains text only, no HTML markup — verify
# that the template was not truncated or stripped.
_HTML_TEMPLATE = r"""
Picarones — OCR Benchmark
Import HTR-United
Catalogue communautaire de corpus HTR/OCR pour documents patrimoniaux.
Import HuggingFace Datasets
Datasets OCR/HTR publics depuis HuggingFace Hub (IAM, RIMES, CATMuS, Gallica…).
"""