Spaces:
Sleeping
Sleeping
File size: 11,655 Bytes
243a84a 0c91c9b 243a84a 0c91c9b 243a84a 0c91c9b 243a84a 0c91c9b 243a84a 781c660 243a84a 781c660 243a84a 781c660 243a84a 781c660 243a84a 781c660 243a84a 781c660 243a84a 31f753b d40d01e 31f753b 243a84a 31f753b 243a84a 31f753b 243a84a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 | """Γtat partagΓ© du serveur web FastAPI β singletons et helpers transverses.
Ce module centralise tout ce qui est partagé entre routeurs : la
classe ``BenchmarkJob`` qui modélise un job en cours, le store SQLite
qui le persiste, le rate limiter, le sémaphore qui borne le nombre
de jobs concurrents, ainsi que les constantes et utilitaires
datetime/HTTP utilisés à plusieurs endroits.
Discipline : aucun routeur ne doit définir ses propres ``iso_now`` /
``enforce_rate_limit`` — tous passent par ce module pour garantir
la cohérence.
"""
from __future__ import annotations
import asyncio
import logging
import threading
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
from fastapi import HTTPException, Request
from picarones.web.jobs import JobStore, get_default_store
from picarones.web.security import (
RateLimiter,
get_max_concurrent_jobs,
get_rate_limit_per_hour,
)
_logger = logging.getLogger(__name__)

# ──────────────────────────────────────────────────────────────────────────
# Shared constants
# ──────────────────────────────────────────────────────────────────────────

#: Image file extensions accepted on upload and during corpus validation.
IMAGE_EXTS = frozenset((".jpg", ".jpeg", ".png", ".tif", ".tiff", ".webp"))

#: Directory where corpora uploaded through the web interface are stored.
UPLOADS_DIR = Path("./uploads")

#: Languages supported by the interface.
SUPPORTED_LANGS = ("fr", "en")

#: Name of the cookie that remembers the language chosen by the user.
LANG_COOKIE = "picarones_lang"
# ──────────────────────────────────────────────────────────────────────────
# Cross-cutting helpers
# ──────────────────────────────────────────────────────────────────────────
def iso_now() -> str:
    """Return the current UTC time as an ISO 8601 string, to the second.

    The result has the shape ``YYYY-MM-DDTHH:MM:SS+00:00`` (no fractional
    seconds).
    """
    now_utc = datetime.now(tz=timezone.utc)
    # Zeroing microseconds makes isoformat() omit the fractional part,
    # exactly like timespec="seconds".
    return now_utc.replace(microsecond=0).isoformat()
def _client_ip(request: Request) -> str:
"""IP client en respectant ``X-Forwarded-For`` derrière un proxy.
Helper interne au module β utilisΓ© uniquement par
``enforce_rate_limit``. Pas exposΓ© dans ``__all__`` car aucun
consommateur externe n'en a besoin (un router qui veut l'IP doit
appeler ``enforce_rate_limit`` directement).
"""
fwd = request.headers.get("x-forwarded-for") or ""
if fwd:
return fwd.split(",")[0].strip()
return request.client.host if request.client else "unknown"
def enforce_rate_limit(request: Request) -> None:
    """Apply the global per-client rate limit to ``request``.

    The client key comes from ``_client_ip`` and is checked against
    ``RATE_LIMITER``.

    Raises:
        HTTPException: status 429 when the limiter rejects the client.
            The limiter signals this with ``PermissionError``, whose
            message becomes the response ``detail``.
    """
    client_key = _client_ip(request)
    try:
        RATE_LIMITER.check(client_key)
    except PermissionError as exc:
        # Explicit chaining keeps the limiter's error as __cause__ in logs
        # and tracebacks (ruff B904).
        raise HTTPException(status_code=429, detail=str(exc)) from exc
# ──────────────────────────────────────────────────────────────────────────
# Singletons: rate limiter, semaphore, job store
# ──────────────────────────────────────────────────────────────────────────

#: Global rate limiter (per the original note: a no-op when the server is
#: not public or the quota is 0).
RATE_LIMITER = RateLimiter(
    max_per_hour=get_rate_limit_per_hour(),
)

#: Semaphore bounding the number of benchmarks running concurrently.
JOBS_SEMAPHORE = threading.Semaphore(value=get_max_concurrent_jobs())

#: SQLite store singleton injected into every ``BenchmarkJob``.
JOB_STORE: JobStore = get_default_store()
# ──────────────────────────────────────────────────────────────────────────
# Job model (with SQLite persistence)
# ──────────────────────────────────────────────────────────────────────────
@dataclass
class BenchmarkJob:
    """A benchmark job being executed.

    Each job has a unique ``job_id``, a status, a progress value and an
    event stream consumed over SSE. Persistence is handled by an optional
    SQLite ``JobStore``: when one is present, every event is written to the
    store before it is broadcast to SSE subscribers, which makes resuming
    via ``Last-Event-ID`` possible.

    Threading: shared state is guarded by a ``threading.Lock``, so
    ``add_event`` may run on a thread other than the event-loop thread that
    serves SSE. ``asyncio.Queue`` is not thread-safe, so each subscriber
    queue remembers the event loop it was created in. Events produced on
    another thread are handed over with ``loop.call_soon_threadsafe``.
    """

    job_id: str
    # One of: ``pending``, ``running``, ``complete``, ``error``,
    # ``cancelled``, ``interrupted``.
    status: str = "pending"
    progress: float = 0.0  # 0.0 to 1.0
    current_engine: str = ""
    total_docs: int = 0
    processed_docs: int = 0
    output_path: str = ""
    error: str = ""
    started_at: Optional[str] = None
    finished_at: Optional[str] = None
    events: list[dict] = field(default_factory=list)
    _subscribers: list[asyncio.Queue] = field(default_factory=list)
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _cancel_event: threading.Event = field(default_factory=threading.Event)
    # Optional SQLite store injected at creation time. When ``None`` the job
    # only lives in memory.
    _store: Optional[JobStore] = None
    # Event loop owning each subscriber queue (``None`` when the queue was
    # created outside a running loop). Internal: not an ``__init__``
    # parameter and ignored by ``repr``/``==``.
    _subscriber_loops: dict[asyncio.Queue, Optional[asyncio.AbstractEventLoop]] = field(
        default_factory=dict, init=False, repr=False, compare=False,
    )

    def add_event(self, kind: str, data: Any) -> None:
        """Persist an event in the store, then broadcast it to SSE subscribers.

        Persisting before broadcasting guarantees that, for every ``seq``
        made visible to a client, the job snapshot in the store is
        consistent with what the client received (resume via
        ``Last-Event-ID``).

        Args:
            kind: Event type label.
            data: Event payload, stored and broadcast as-is.
        """
        seq: Optional[int] = None
        if self._store is not None:
            try:
                seq = self._store.append_event(self.job_id, kind, data)
                self._store.update_progress(
                    self.job_id,
                    progress=self.progress,
                    current_engine=self.current_engine,
                    total_docs=self.total_docs,
                    processed_docs=self.processed_docs,
                    output_path=self.output_path,
                )
            except Exception as exc:  # pragma: no cover — defence in depth
                # Best effort: a storage failure must not kill the job; the
                # event is still broadcast (possibly with ``seq=None``).
                _logger.warning(
                    "[jobs] persistance d'Γ©vΓ©nement Γ©chouΓ©e pour %s : %s",
                    self.job_id, exc,
                )
        event = {"kind": kind, "data": data, "ts": iso_now(), "seq": seq}
        with self._lock:
            self.events.append(event)
            # Snapshot so delivery happens outside the lock.
            targets = [(q, self._subscriber_loops.get(q)) for q in self._subscribers]
        try:
            current_loop: Optional[asyncio.AbstractEventLoop] = asyncio.get_running_loop()
        except RuntimeError:
            current_loop = None  # called from a plain (non-asyncio) thread
        for q, owner_loop in targets:
            if owner_loop is None or owner_loop is current_loop:
                # Already on the queue's loop thread (or no loop known):
                # a direct put is safe.
                self._put_event(q, event, seq)
                continue
            try:
                # Cross-thread: let the owning loop perform the put, which
                # also wakes a reader blocked in ``await q.get()``.
                owner_loop.call_soon_threadsafe(self._put_event, q, event, seq)
            except RuntimeError:
                # The subscriber's loop is closed: nobody can read this queue.
                _logger.warning(
                    "[jobs] boucle fermée pour un abonné SSE du job %s — seq=%s non diffusé",
                    self.job_id, seq,
                )

    def _put_event(self, q: asyncio.Queue, event: dict, seq: Optional[int]) -> None:
        """Enqueue ``event`` on ``q``; log instead of raising when it is full.

        Must run on the thread of ``q``'s event loop (or when ``q`` has no
        loop affinity). The event is already persisted, so a dropped SSE
        push can be recovered through ``Last-Event-ID``.
        """
        try:
            q.put_nowait(event)
        except asyncio.QueueFull:
            _logger.warning(
                "[jobs] queue SSE pleine pour job %s β Γ©vΓ©nement dΓ©jΓ persistΓ© seq=%s",
                self.job_id, seq,
            )

    def set_status(self, status: str, error: str = "") -> None:
        """Update the status in memory and mirror it to the store.

        A non-empty ``error`` replaces ``self.error``; an empty one leaves
        the previous message untouched. Terminal statuses (``complete``,
        ``error``, ``cancelled``, ``interrupted``) stamp ``finished_at``.
        Store failures are logged, not raised.
        """
        self.status = status
        if error:
            self.error = error
        if status in ("complete", "error", "cancelled", "interrupted"):
            self.finished_at = iso_now()
        if self._store is not None:
            try:
                self._store.set_status(
                    self.job_id, status, error=error or None,
                )
            except Exception as exc:  # pragma: no cover
                _logger.warning(
                    "[jobs] set_status persistΓ© en Γ©chec pour %s : %s",
                    self.job_id, exc,
                )

    def subscribe(self) -> asyncio.Queue:
        """Register and return a new SSE queue bounded to 200 events.

        The running event loop, if any, is recorded alongside the queue so
        that events produced on other threads can be delivered thread-safely.
        """
        q: asyncio.Queue = asyncio.Queue(maxsize=200)
        try:
            owner_loop: Optional[asyncio.AbstractEventLoop] = asyncio.get_running_loop()
        except RuntimeError:
            owner_loop = None
        with self._lock:
            self._subscribers.append(q)
            self._subscriber_loops[q] = owner_loop
        return q

    def unsubscribe(self, q: asyncio.Queue) -> None:
        """Detach ``q`` from the broadcast list; unknown queues are ignored."""
        with self._lock:
            try:
                self._subscribers.remove(q)
            except ValueError:
                pass
            self._subscriber_loops.pop(q, None)

    def as_dict(self) -> dict:
        """Return a snapshot of the public job fields (events and internals excluded)."""
        return {
            "job_id": self.job_id,
            "status": self.status,
            "progress": self.progress,
            "current_engine": self.current_engine,
            "total_docs": self.total_docs,
            "processed_docs": self.processed_docs,
            "output_path": self.output_path,
            "error": self.error,
            "started_at": self.started_at,
            "finished_at": self.finished_at,
        }
# ──────────────────────────────────────────────────────────────────────────
# In-memory registry of active jobs
# ──────────────────────────────────────────────────────────────────────────

#: In-memory registry of jobs, keyed by ``job_id``.
#:
#: **Access discipline**: every read and write must go through the
#: ``register_job``, ``get_job_in_memory`` and ``cleanup_old_jobs`` helpers,
#: which take ``JOBS_LOCK``. Reading or mutating this dict without the lock
#: exposes callers to ``RuntimeError: dictionary changed size during
#: iteration`` under concurrent load (the GIL makes a single operation
#: atomic, but not a whole loop).
JOBS: dict[str, BenchmarkJob] = dict()

#: Maximum number of jobs kept in memory before cleanup kicks in.
JOBS_MAX = 100

#: Lock serialising every access to ``JOBS``.
JOBS_LOCK = threading.Lock()
def register_job(job: BenchmarkJob) -> None:
    """Store ``job`` in the in-memory registry under its ``job_id``.

    The write is performed while holding ``JOBS_LOCK``; an existing entry
    with the same id is replaced.
    """
    key = job.job_id
    with JOBS_LOCK:
        JOBS.update({key: job})
def get_job_in_memory(job_id: str) -> Optional[BenchmarkJob]:
    """Look up ``job_id`` in the in-memory registry while holding ``JOBS_LOCK``.

    Returns ``None`` when the job is not (or no longer) held in RAM.
    Callers that want a database fallback must query
    ``JOB_STORE.get_job(job_id)`` themselves.
    """
    with JOBS_LOCK:
        try:
            return JOBS[job_id]
        except KeyError:
            return None
def cleanup_old_jobs() -> None:
    """Evict the oldest finished jobs once the registry exceeds ``JOBS_MAX``.

    Only jobs in a terminal status are eviction candidates. This is the
    same set that ``BenchmarkJob.set_status`` stamps with ``finished_at``:
    ``complete``, ``error``, ``cancelled`` and ``interrupted``. Candidates
    are evicted oldest ``finished_at`` first; a missing timestamp sorts
    first. Jobs in any other status (e.g. ``pending``/``running``) are
    never evicted, so the registry may remain above ``JOBS_MAX`` when too
    few jobs have finished. Runs while holding ``JOBS_LOCK``.
    """
    # "interrupted" was previously missing here, so interrupted jobs could
    # never be evicted and kept the registry above JOBS_MAX.
    terminal = ("complete", "error", "cancelled", "interrupted")
    with JOBS_LOCK:
        excess = len(JOBS) - JOBS_MAX
        if excess <= 0:
            return
        finished = sorted(
            ((jid, job) for jid, job in JOBS.items() if job.status in terminal),
            key=lambda item: item[1].finished_at or "",
        )
        for jid, _ in finished[:excess]:
            del JOBS[jid]
# Public API of this module, grouped by section (order unchanged).
__all__ = [
    # Shared constants
    "IMAGE_EXTS", "UPLOADS_DIR", "SUPPORTED_LANGS", "LANG_COOKIE",
    # Cross-cutting helpers (``_client_ip`` is deliberately private)
    "iso_now", "enforce_rate_limit",
    # Singletons
    "RATE_LIMITER", "JOBS_SEMAPHORE", "JOB_STORE",
    # Job model
    "BenchmarkJob",
    # In-memory job registry and its helpers
    "JOBS", "JOBS_MAX", "JOBS_LOCK",
    "register_job", "get_job_in_memory", "cleanup_old_jobs",
]
|