File size: 11,655 Bytes
243a84a
 
 
 
 
 
0c91c9b
243a84a
0c91c9b
 
 
243a84a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0c91c9b
 
 
 
 
 
 
 
243a84a
 
 
 
 
 
 
 
 
0c91c9b
243a84a
 
 
 
 
 
 
 
 
781c660
243a84a
 
781c660
243a84a
 
781c660
243a84a
 
 
781c660
243a84a
 
 
 
 
 
 
 
 
 
781c660
243a84a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
781c660
243a84a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31f753b
 
 
 
d40d01e
31f753b
 
 
 
243a84a
 
 
 
 
 
 
31f753b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
243a84a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31f753b
 
243a84a
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
"""Γ‰tat partagΓ© du serveur web FastAPI β€” singletons et helpers transverses.

Ce module centralise tout ce qui est partagΓ© entre routeurs : la
classe ``BenchmarkJob`` qui modΓ©lise un job en cours, le store SQLite
qui le persiste, le rate limiter, le sΓ©maphore qui borne le nombre
de jobs concurrents, ainsi que les constantes et utilitaires
datetime/HTTP utilisΓ©s Γ  plusieurs endroits.

Discipline : aucun routeur ne doit dΓ©finir ses propres ``iso_now`` /
``enforce_rate_limit`` β€” tous passent par ce module pour garantir
la cohΓ©rence.
"""

from __future__ import annotations

import asyncio
import logging
import threading
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

from fastapi import HTTPException, Request

from picarones.web.jobs import JobStore, get_default_store
from picarones.web.security import (
    RateLimiter,
    get_max_concurrent_jobs,
    get_rate_limit_per_hour,
)

_logger = logging.getLogger(__name__)


# ──────────────────────────────────────────────────────────────────────────
# Constantes partagΓ©es
# ──────────────────────────────────────────────────────────────────────────

IMAGE_EXTS = frozenset({".jpg", ".jpeg", ".png", ".tif", ".tiff", ".webp"})
"""Extensions d'image acceptΓ©es Γ  l'upload et lors de la validation corpus."""

UPLOADS_DIR = Path("./uploads")
"""Dossier oΓΉ sont stockΓ©s les corpus uploadΓ©s via l'interface web."""

SUPPORTED_LANGS = ("fr", "en")
"""Langues supportΓ©es par l'interface."""

LANG_COOKIE = "picarones_lang"
"""Nom du cookie qui mΓ©morise la langue choisie par l'utilisateur."""


# ──────────────────────────────────────────────────────────────────────────
# Helpers transverses
# ──────────────────────────────────────────────────────────────────────────

def iso_now() -> str:
    """Timestamp ISO 8601 UTC (prΓ©cision seconde)."""
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def _client_ip(request: Request) -> str:
    """IP client en respectant ``X-Forwarded-For`` derrière un proxy.

    Helper interne au module β€” utilisΓ© uniquement par
    ``enforce_rate_limit``. Pas exposΓ© dans ``__all__`` car aucun
    consommateur externe n'en a besoin (un router qui veut l'IP doit
    appeler ``enforce_rate_limit`` directement).
    """
    fwd = request.headers.get("x-forwarded-for") or ""
    if fwd:
        return fwd.split(",")[0].strip()
    return request.client.host if request.client else "unknown"


def enforce_rate_limit(request: Request) -> None:
    """Applique le rate limit ; lève ``HTTPException 429`` si dépassé."""
    try:
        RATE_LIMITER.check(_client_ip(request))
    except PermissionError as exc:
        raise HTTPException(status_code=429, detail=str(exc))


# ──────────────────────────────────────────────────────────────────────────
# Singletons : rate limiter, sΓ©maphore, job store
# ──────────────────────────────────────────────────────────────────────────

RATE_LIMITER = RateLimiter(max_per_hour=get_rate_limit_per_hour())
"""Rate limiter global (no-op si non public ou quota = 0)."""

JOBS_SEMAPHORE = threading.Semaphore(get_max_concurrent_jobs())
"""SΓ©maphore qui borne le nombre de benchmarks concurrents."""

JOB_STORE: JobStore = get_default_store()
"""Store SQLite singleton injectΓ© dans chaque ``BenchmarkJob``."""


# ──────────────────────────────────────────────────────────────────────────
# Modèle de job (avec persistance SQLite)
# ──────────────────────────────────────────────────────────────────────────

@dataclass
class BenchmarkJob:
    """Job de benchmark en cours d'exΓ©cution.

    Chaque job a un ``job_id`` unique, un statut, une progression et
    un flux d'Γ©vΓ©nements consommΓ© via SSE. La persistance est gΓ©rΓ©e
    par un ``JobStore`` SQLite optionnel β€” si prΓ©sent, chaque
    Γ©vΓ©nement est sΓ©rialisΓ© en base avant d'Γͺtre diffusΓ© aux abonnΓ©s
    SSE, ce qui permet la reprise via ``Last-Event-ID``.
    """

    job_id: str
    status: str = "pending"
    """Un des : ``pending``, ``running``, ``complete``, ``error``,
    ``cancelled``, ``interrupted``."""
    progress: float = 0.0  # 0.0 – 1.0
    current_engine: str = ""
    total_docs: int = 0
    processed_docs: int = 0
    output_path: str = ""
    error: str = ""
    started_at: Optional[str] = None
    finished_at: Optional[str] = None
    events: list[dict] = field(default_factory=list)
    _subscribers: list[asyncio.Queue] = field(default_factory=list)
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _cancel_event: threading.Event = field(default_factory=threading.Event)
    _store: Optional[JobStore] = None
    """Store SQLite optionnel injectΓ© Γ  la crΓ©ation. Si ``None``,
    le job vit uniquement en mΓ©moire."""

    def add_event(self, kind: str, data: Any) -> None:
        """Persiste l'Γ©vΓ©nement dans le store puis le diffuse aux abonnΓ©s SSE.

        L'ordre persistance β†’ diffusion garantit qu'Γ  chaque ``seq``
        rendu visible au client, le snapshot du job en base est
        cohΓ©rent avec ce que vit le client (reprise possible via
        ``Last-Event-ID``).
        """
        seq: Optional[int] = None
        if self._store is not None:
            try:
                seq = self._store.append_event(self.job_id, kind, data)
                self._store.update_progress(
                    self.job_id,
                    progress=self.progress,
                    current_engine=self.current_engine,
                    total_docs=self.total_docs,
                    processed_docs=self.processed_docs,
                    output_path=self.output_path,
                )
            except Exception as exc:  # pragma: no cover β€” dΓ©fense en profondeur
                _logger.warning(
                    "[jobs] persistance d'Γ©vΓ©nement Γ©chouΓ©e pour %s : %s",
                    self.job_id, exc,
                )
        event = {"kind": kind, "data": data, "ts": iso_now(), "seq": seq}
        with self._lock:
            self.events.append(event)
            subscribers = list(self._subscribers)
        for q in subscribers:
            try:
                q.put_nowait(event)
            except asyncio.QueueFull:
                _logger.warning(
                    "[jobs] queue SSE pleine pour job %s β€” Γ©vΓ©nement dΓ©jΓ  persistΓ© seq=%s",
                    self.job_id, seq,
                )

    def set_status(self, status: str, error: str = "") -> None:
        """Met Γ  jour le statut + persiste vers le store."""
        self.status = status
        if error:
            self.error = error
        if status in ("complete", "error", "cancelled", "interrupted"):
            self.finished_at = iso_now()
        if self._store is not None:
            try:
                self._store.set_status(
                    self.job_id, status, error=error or None,
                )
            except Exception as exc:  # pragma: no cover
                _logger.warning(
                    "[jobs] set_status persistΓ© en Γ©chec pour %s : %s",
                    self.job_id, exc,
                )

    def subscribe(self) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue(maxsize=200)
        with self._lock:
            self._subscribers.append(q)
        return q

    def unsubscribe(self, q: asyncio.Queue) -> None:
        with self._lock:
            try:
                self._subscribers.remove(q)
            except ValueError:
                pass

    def as_dict(self) -> dict:
        return {
            "job_id": self.job_id,
            "status": self.status,
            "progress": self.progress,
            "current_engine": self.current_engine,
            "total_docs": self.total_docs,
            "processed_docs": self.processed_docs,
            "output_path": self.output_path,
            "error": self.error,
            "started_at": self.started_at,
            "finished_at": self.finished_at,
        }


# ──────────────────────────────────────────────────────────────────────────
# Registre en mΓ©moire des jobs actifs
# ──────────────────────────────────────────────────────────────────────────

JOBS: dict[str, BenchmarkJob] = {}
"""Registre en mΓ©moire des jobs (par ``job_id``).

**Discipline d'accès** : tous les ``read`` et ``write`` doivent passer
par les helpers ``register_job``, ``get_job_in_memory``,
``cleanup_old_jobs`` qui prennent ``JOBS_LOCK``. Lire ou muter ce dict
sans verrou expose Γ  un ``RuntimeError: dictionary changed size
during iteration`` sous charge concurrente (le GIL protège l'atomicité
d'une opΓ©ration mais pas la cohΓ©rence d'une boucle).
"""

JOBS_MAX = 100
"""Nombre maximum de jobs conservΓ©s en mΓ©moire avant nettoyage."""

JOBS_LOCK = threading.Lock()


def register_job(job: BenchmarkJob) -> None:
    """Enregistre ``job`` dans le registre mΓ©moire (thread-safe)."""
    with JOBS_LOCK:
        JOBS[job.job_id] = job


def get_job_in_memory(job_id: str) -> Optional[BenchmarkJob]:
    """Récupère un ``BenchmarkJob`` du registre mémoire (thread-safe).

    Retourne ``None`` si le job n'est pas (ou plus) en RAM. Les
    consommateurs qui veulent un fallback DB doivent appeler
    ``JOB_STORE.get_job(job_id)`` sΓ©parΓ©ment.
    """
    with JOBS_LOCK:
        return JOBS.get(job_id)


def cleanup_old_jobs() -> None:
    """Supprime les jobs terminΓ©s les plus anciens si on dΓ©passe ``JOBS_MAX``."""
    with JOBS_LOCK:
        if len(JOBS) <= JOBS_MAX:
            return
        finished = [
            (jid, j) for jid, j in JOBS.items()
            if j.status in ("complete", "error", "cancelled")
        ]
        finished.sort(key=lambda x: x[1].finished_at or "")
        to_remove = len(JOBS) - JOBS_MAX
        for jid, _ in finished[:to_remove]:
            del JOBS[jid]


__all__ = [
    "IMAGE_EXTS",
    "UPLOADS_DIR",
    "SUPPORTED_LANGS",
    "LANG_COOKIE",
    "iso_now",
    "enforce_rate_limit",
    "RATE_LIMITER",
    "JOBS_SEMAPHORE",
    "JOB_STORE",
    "BenchmarkJob",
    "JOBS",
    "JOBS_MAX",
    "JOBS_LOCK",
    "register_job",
    "get_job_in_memory",
    "cleanup_old_jobs",
]