Spaces:
Sleeping
Sleeping
| """Sprint S8.7 β couverture des helpers env-var fallback et | |
| dΓ©fense Pillow de ``picarones/interfaces/web/security.py``. | |
| Cible (avant) : 92.18% patch coverage avec 15 lignes manquantes | |
| sur des chemins testables sans mock lourd : | |
| - ``compute_workspace_roots`` avec ``PICARONES_WORKSPACE_ROOTS`` set ; | |
| - ``get_max_upload_mb`` / ``get_max_concurrent_jobs`` / | |
| ``get_rate_limit_per_hour`` sur valeur invalide β fallback log ; | |
| - ``validate_image_safe`` sur ``DecompressionBombError`` (vraie | |
| image bomb simulΓ©e via abaissement temporaire de | |
| ``MAX_IMAGE_PIXELS``) ; | |
| - ``_get_csrf_secret`` génère un secret runtime quand | |
| ``PICARONES_CSRF_SECRET`` absent ; | |
| - ``RateLimiter.check`` purge les hits hors fenΓͺtre. | |
| Tous les tests sont des assertions de comportement rΓ©el β pas | |
| de simple Β« Γ§a ne plante pas Β». | |
| """ | |
| from __future__ import annotations | |
| import io | |
| import os | |
| import time | |
| import pytest | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Env var fallbacks β doivent retourner le default sur valeur invalide | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestEnvVarFallbacks: | |
| def test_max_upload_mb_invalid_returns_default( | |
| self, monkeypatch, caplog, | |
| ) -> None: | |
| from picarones.interfaces.web.security import get_max_upload_mb | |
| monkeypatch.setenv("PICARONES_MAX_UPLOAD_MB", "not-a-number") | |
| with caplog.at_level("WARNING"): | |
| value = get_max_upload_mb() | |
| assert value == 100, "default value not returned on invalid env" | |
| assert any( | |
| "PICARONES_MAX_UPLOAD_MB" in rec.message for rec in caplog.records | |
| ), "warning log not emitted on invalid env" | |
| def test_max_upload_mb_valid_overrides_default( | |
| self, monkeypatch, | |
| ) -> None: | |
| from picarones.interfaces.web.security import get_max_upload_mb | |
| monkeypatch.setenv("PICARONES_MAX_UPLOAD_MB", "250") | |
| assert get_max_upload_mb() == 250 | |
| def test_max_upload_mb_clamped_to_one(self, monkeypatch) -> None: | |
| """Valeur β€ 0 β clampΓ©e Γ 1 (pas un upload de 0 Mo acceptΓ©).""" | |
| from picarones.interfaces.web.security import get_max_upload_mb | |
| monkeypatch.setenv("PICARONES_MAX_UPLOAD_MB", "0") | |
| assert get_max_upload_mb() == 1 | |
| def test_max_concurrent_jobs_invalid_returns_default( | |
| self, monkeypatch, caplog, | |
| ) -> None: | |
| from picarones.interfaces.web.security import get_max_concurrent_jobs | |
| monkeypatch.setenv("PICARONES_MAX_CONCURRENT_JOBS", "abc") | |
| with caplog.at_level("WARNING"): | |
| value = get_max_concurrent_jobs() | |
| assert value == 2 | |
| assert any( | |
| "PICARONES_MAX_CONCURRENT_JOBS" in rec.message | |
| for rec in caplog.records | |
| ) | |
| def test_rate_limit_invalid_in_public_mode_returns_default( | |
| self, monkeypatch, | |
| ) -> None: | |
| from picarones.interfaces.web.security import get_rate_limit_per_hour | |
| monkeypatch.setenv("PICARONES_PUBLIC_MODE", "1") | |
| monkeypatch.setenv("PICARONES_RATE_LIMIT_PER_HOUR", "not-int") | |
| assert get_rate_limit_per_hour() == 5 | |
| def test_rate_limit_dev_mode_returns_zero(self, monkeypatch) -> None: | |
| """Hors mode public, pas de rate limit (0 = illimitΓ©).""" | |
| from picarones.interfaces.web.security import get_rate_limit_per_hour | |
| monkeypatch.delenv("PICARONES_PUBLIC_MODE", raising=False) | |
| assert get_rate_limit_per_hour() == 0 | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # compute_workspace_roots avec env var explicite | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestComputeWorkspaceRoots: | |
| def test_env_var_overrides_defaults(self, monkeypatch, tmp_path) -> None: | |
| from picarones.interfaces.web.security import compute_workspace_roots | |
| d1 = tmp_path / "ws1" | |
| d2 = tmp_path / "ws2" | |
| d1.mkdir() | |
| d2.mkdir() | |
| monkeypatch.setenv( | |
| "PICARONES_WORKSPACE_ROOTS", f"{d1}{os.pathsep}{d2}", | |
| ) | |
| roots = compute_workspace_roots(tmp_path / "uploads") | |
| # Les deux paths explicites doivent Γͺtre prΓ©sents et rΓ©solus. | |
| resolved = [r.resolve() for r in roots] | |
| assert d1.resolve() in resolved | |
| assert d2.resolve() in resolved | |
| def test_no_env_var_uses_defaults(self, monkeypatch, tmp_path) -> None: | |
| from picarones.interfaces.web.security import compute_workspace_roots | |
| monkeypatch.delenv("PICARONES_WORKSPACE_ROOTS", raising=False) | |
| uploads = tmp_path / "uploads" | |
| uploads.mkdir() | |
| roots = compute_workspace_roots(uploads) | |
| # Au moins ``uploads`` ou un parent doit Γͺtre inclus. | |
| resolved = [r.resolve() for r in roots] | |
| assert any( | |
| uploads.resolve() == r or uploads.resolve().is_relative_to(r) | |
| for r in resolved | |
| ) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # validate_image_safe β branche DecompressionBombError | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _tiny_png_bytes() -> bytes: | |
| """Produit un PNG 4Γ4 minimal (assez pour dΓ©clencher la bomb | |
| si ``MAX_IMAGE_PIXELS`` est abaissΓ© Γ 1).""" | |
| from PIL import Image | |
| img = Image.new("RGB", (4, 4), color=(255, 255, 255)) | |
| buf = io.BytesIO() | |
| img.save(buf, format="PNG") | |
| return buf.getvalue() | |
| class TestValidateImageSafe: | |
| def test_decompression_bomb_rejected(self, monkeypatch) -> None: | |
| """Simule une bomb en abaissant ``MAX_IMAGE_PIXELS`` sous la | |
| taille de l'image β Pillow lΓ¨ve alors | |
| ``DecompressionBombError`` que le helper doit transformer | |
| en ``ValueError`` propre.""" | |
| from PIL import Image | |
| from picarones.interfaces.web.security import validate_image_safe | |
| data = _tiny_png_bytes() | |
| monkeypatch.setattr(Image, "MAX_IMAGE_PIXELS", 2) | |
| with pytest.raises(ValueError, match="bombe|dΓ©compression"): | |
| validate_image_safe(data, filename="bomb.png") | |
| def test_size_limit_enforced(self, monkeypatch) -> None: | |
| """Buffer trop gros β rejet sans tenter Pillow.""" | |
| from picarones.interfaces.web.security import validate_image_safe | |
| monkeypatch.setenv("PICARONES_MAX_UPLOAD_MB", "1") | |
| data = b"\x00" * (2 * 1024 * 1024) # 2 MB > 1 MB limit | |
| with pytest.raises(ValueError, match="taille"): | |
| validate_image_safe(data, filename="big.bin") | |
| def test_valid_image_passes(self) -> None: | |
| """ContrΓ΄le positif : image valide β aucune exception.""" | |
| from picarones.interfaces.web.security import validate_image_safe | |
| validate_image_safe(_tiny_png_bytes(), filename="ok.png") # no raise | |
| def test_corrupt_bytes_rejected(self) -> None: | |
| """DonnΓ©es non-image β ``ValueError`` (UnidentifiedImage ou | |
| autre).""" | |
| from picarones.interfaces.web.security import validate_image_safe | |
| with pytest.raises(ValueError): | |
| validate_image_safe(b"not-an-image-at-all", filename="nope.png") | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # _get_csrf_secret β fallback runtime | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestCSRFSecretRuntime: | |
| def test_env_var_used_when_set(self, monkeypatch) -> None: | |
| import picarones.interfaces.web.security as sec | |
| monkeypatch.setenv("PICARONES_CSRF_SECRET", "fixed-secret") | |
| # Reset le runtime secret pour s'assurer qu'on prend bien l'env. | |
| monkeypatch.setattr(sec, "_csrf_secret_runtime", None) | |
| secret = sec._get_csrf_secret() | |
| assert secret == b"fixed-secret" | |
| def test_runtime_generated_when_env_absent( | |
| self, monkeypatch, caplog, | |
| ) -> None: | |
| import picarones.interfaces.web.security as sec | |
| monkeypatch.delenv("PICARONES_CSRF_SECRET", raising=False) | |
| monkeypatch.setattr(sec, "_csrf_secret_runtime", None) | |
| with caplog.at_level("WARNING"): | |
| secret1 = sec._get_csrf_secret() | |
| assert isinstance(secret1, bytes) | |
| assert len(secret1) == 32, "secrets.token_bytes(32) attendu" | |
| # Warning Γ©mis pour signaler la config manquante. | |
| assert any( | |
| "PICARONES_CSRF_SECRET" in rec.message for rec in caplog.records | |
| ) | |
| # Appel suivant β mΓͺme secret (persistant durant la vie du process). | |
| secret2 = sec._get_csrf_secret() | |
| assert secret1 == secret2 | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # RateLimiter.check β pruning de la fenΓͺtre | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class TestRateLimiterPruning: | |
| def test_prunes_expired_hits(self) -> None: | |
| """Un hit > 1h β purgΓ© du bucket Γ l'appel suivant. Couvre | |
| la branche ``while bucket and bucket[0] < cutoff: popleft()``.""" | |
| from collections import deque | |
| from picarones.interfaces.web.security import RateLimiter | |
| rl = RateLimiter(max_per_hour=2) | |
| # Pose un hit ancien (> 3600s) directement dans le bucket | |
| # interne pour simuler le passage du temps sans sleep. | |
| rl._buckets["1.2.3.4"] = deque([time.monotonic() - 7200.0]) | |
| rl.check("1.2.3.4") # ne doit pas lever | |
| # Le hit ancien est purgΓ©, seul le nouveau reste. | |
| assert len(rl._buckets["1.2.3.4"]) == 1, ( | |
| "le hit ancien aurait dΓ» Γͺtre purgΓ©" | |
| ) | |
| def test_quota_exceeded_raises(self) -> None: | |
| from picarones.interfaces.web.security import RateLimiter | |
| rl = RateLimiter(max_per_hour=2) | |
| rl.check("5.6.7.8") | |
| rl.check("5.6.7.8") | |
| with pytest.raises(PermissionError, match="Quota"): | |
| rl.check("5.6.7.8") | |
| def test_disabled_when_max_zero(self) -> None: | |
| """``max_per_hour=0`` β dΓ©sactivΓ©, jamais de PermissionError.""" | |
| from picarones.interfaces.web.security import RateLimiter | |
| rl = RateLimiter(max_per_hour=0) | |
| for _ in range(100): | |
| rl.check("9.9.9.9") # no raise | |