File size: 15,020 Bytes
faa1393
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
162c559
faa1393
 
 
 
162c559
faa1393
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
"""Sprint A14-S22 — CLI du nouveau monde (``import-corpus`` + ``report``).

Tests via ``click.testing.CliRunner`` (sans subprocess) :

- Group help liste les 2 sous-commandes attendues.
- ``import-corpus`` : import basique, sortie quiet, erreurs (ZIP
  invalide, --metadata mal formée).
- ``report`` : rendu vers fichier, rendu vers stdout, run_dir vide
  (FileNotFoundError typé).
- Bilingue --lang fr/en.
"""

from __future__ import annotations

import io
import json
import zipfile
from datetime import datetime, timezone
from pathlib import Path

import pytest
from click.testing import CliRunner

from picarones.interfaces.cli import cli
from picarones.app.services import BenchmarkService
from picarones.domain.evaluation_spec import EvaluationView
from picarones.domain.artifacts import ArtifactType
from picarones.domain.run_manifest import RunManifest
from picarones.app.results import RunResult


# ──────────────────────────────────────────────────────────────────
# Fixtures
# ──────────────────────────────────────────────────────────────────


@pytest.fixture
def runner() -> CliRunner:
    """Provide a newly constructed Click ``CliRunner`` to the requesting test."""
    cli_runner = CliRunner()
    return cli_runner


def _make_zip(entries: dict[str, bytes]) -> bytes:
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
        for name, data in entries.items():
            zf.writestr(name, data)
    return buf.getvalue()


def _png_bytes() -> bytes:
    return (
        b"\x89PNG\r\n\x1a\n"
        b"\x00\x00\x00\rIHDR"
        b"\x00\x00\x00\x01\x00\x00\x00\x01\x08\x06\x00\x00\x00"
        b"\x1f\x15\xc4\x89"
    )


def _build_minimal_run_dir(out_dir: Path, *, corpus_name: str = "test") -> None:
    """Persist a minimal ``RunResult`` into ``out_dir``.

    The manifest declares zero documents, no pipelines and a single
    ``text_final`` evaluation view; the result is written through
    ``BenchmarkService.persist``.

    Args:
        out_dir: Target directory; created (with parents) when missing.
        corpus_name: Passed to the manifest as its ``corpus_name``.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    code_version = "1.0.0-cli-test"
    text_view = EvaluationView(
        name="text_final",
        description="Test view",
        candidate_types=frozenset({ArtifactType.RAW_TEXT}),
        metric_names=("cer",),
    )
    run_manifest = RunManifest(
        run_id="cli_test_run",
        corpus_name=corpus_name,
        n_documents=0,
        pipeline_names=(),
        view_specs=(text_view,),
        code_version=code_version,
        started_at=datetime(2026, 5, 4, 9, 0, 0, tzinfo=timezone.utc),
        completed_at=datetime(2026, 5, 4, 9, 0, 1, tzinfo=timezone.utc),
    )
    minimal_result = RunResult(manifest=run_manifest, document_results=())

    # Shortcut: only ``BenchmarkService.persist`` is needed, so the service is
    # wired with throwaway collaborators rather than its real dependencies.
    from picarones.evaluation.registry import MetricRegistry
    from picarones.evaluation.projectors import ProjectorRegistry
    from picarones.evaluation.views import DefaultEvaluationViewExecutor
    from picarones.pipeline import CorpusRunner, PipelineExecutor

    def _empty_loader(art):
        # Stand-in artifact loader: always yields an empty string.
        return ""

    def _no_adapter(n):
        # Stand-in adapter resolver: never resolves anything.
        return None

    view_executor = DefaultEvaluationViewExecutor(
        MetricRegistry(), ProjectorRegistry(), _empty_loader,
    )
    pipeline_executor = PipelineExecutor(adapter_resolver=_no_adapter)
    corpus_runner = CorpusRunner(
        pipeline_executor,
        max_in_flight=1,
        timeout_seconds_per_doc=1.0,
        poll_interval_seconds=0.001,
    )
    service = BenchmarkService(
        corpus_runner=corpus_runner,
        view_executor=view_executor,
        code_version=code_version,
    )
    service.persist(minimal_result, out_dir)


# ──────────────────────────────────────────────────────────────────
# Group + help
# ──────────────────────────────────────────────────────────────────


class TestGroup:
    """Checks on the top-level ``cli`` group and its help output."""

    def test_help_lists_both_subcommands(self, runner: CliRunner) -> None:
        """``--help`` exits with code 0 and mentions both subcommands."""
        outcome = runner.invoke(cli, ["--help"])
        assert outcome.exit_code == 0
        for subcommand in ("import-corpus", "report"):
            assert subcommand in outcome.output

    def test_no_subcommand_shows_help(self, runner: CliRunner) -> None:
        """Invoking the group with no arguments prints a command list or usage."""
        outcome = runner.invoke(cli, [])
        # By default Click exits with code 2 when the subcommand is missing.
        acceptable_codes = (0, 2)
        assert outcome.exit_code in acceptable_codes
        printed = outcome.output
        assert any(marker in printed for marker in ("import-corpus", "Usage"))


# ──────────────────────────────────────────────────────────────────
# import-corpus
# ──────────────────────────────────────────────────────────────────


class TestImportCorpus:
    """Tests for the ``import-corpus`` subcommand, driven through ``CliRunner``.

    Every test writes a small ZIP under ``tmp_path``, invokes the command
    with ``--output-dir`` pointing at ``tmp_path / "ws"`` and checks the exit
    code and/or the captured output. Exit-code assertions carry the captured
    output as their message so a failure shows what the CLI printed.
    """

    def test_basic_import(
        self, runner: CliRunner, tmp_path: Path,
    ) -> None:
        """A ZIP with ``doc01.png`` and ``doc01.gt.txt`` is reported as one document."""
        zip_path = tmp_path / "corpus.zip"
        zip_path.write_bytes(_make_zip({
            "doc01.png": _png_bytes(),
            "doc01.gt.txt": b"hello",
        }))
        out_dir = tmp_path / "ws"
        result = runner.invoke(cli, [
            "import-corpus", str(zip_path),
            "--output-dir", str(out_dir),
            "--corpus-name", "test_corpus",
        ])
        assert result.exit_code == 0, result.output
        assert "documents      : 1" in result.output

    def test_quiet_mode_only_prints_path(
        self, runner: CliRunner, tmp_path: Path,
    ) -> None:
        """With ``--quiet`` the output is a single line naming an existing path."""
        zip_path = tmp_path / "corpus.zip"
        zip_path.write_bytes(_make_zip({"doc.png": _png_bytes()}))
        out_dir = tmp_path / "ws"
        result = runner.invoke(cli, [
            "import-corpus", str(zip_path),
            "--output-dir", str(out_dir),
            "--quiet",
        ])
        assert result.exit_code == 0, result.output
        # Exactly one non-empty output line (the path). ``splitlines`` is used
        # so a ``\r\n`` line ending cannot leave a stray ``\r`` in the path.
        lines = [ln for ln in result.output.strip().splitlines() if ln]
        assert len(lines) == 1
        assert Path(lines[0]).exists()

    def test_default_corpus_name_from_zip_stem(
        self, runner: CliRunner, tmp_path: Path,
    ) -> None:
        """Without ``--corpus-name`` the printed directory name contains the ZIP stem."""
        zip_path = tmp_path / "bnf_xviiie.zip"
        zip_path.write_bytes(_make_zip({"doc.png": _png_bytes()}))
        out_dir = tmp_path / "ws"
        result = runner.invoke(cli, [
            "import-corpus", str(zip_path),
            "--output-dir", str(out_dir),
            "--quiet",
        ])
        assert result.exit_code == 0, result.output
        # The extracted sub-directory carries the derived name.
        extracted = Path(result.output.strip())
        assert "bnf_xviiie" in extracted.name

    def test_metadata_flag_pairs(
        self, runner: CliRunner, tmp_path: Path,
    ) -> None:
        """Repeated ``--metadata key=value`` options are accepted."""
        zip_path = tmp_path / "corpus.zip"
        zip_path.write_bytes(_make_zip({"doc.png": _png_bytes()}))
        out_dir = tmp_path / "ws"
        result = runner.invoke(cli, [
            "import-corpus", str(zip_path),
            "--output-dir", str(out_dir),
            "--metadata", "language=fr",
            "--metadata", "period=early_modern",
        ])
        assert result.exit_code == 0, result.output

    def test_metadata_invalid_pair_rejected(
        self, runner: CliRunner, tmp_path: Path,
    ) -> None:
        """A ``--metadata`` value without ``=`` fails with an explicit message."""
        zip_path = tmp_path / "corpus.zip"
        zip_path.write_bytes(_make_zip({"doc.png": _png_bytes()}))
        out_dir = tmp_path / "ws"
        result = runner.invoke(cli, [
            "import-corpus", str(zip_path),
            "--output-dir", str(out_dir),
            "--metadata", "no_equals",
        ])
        assert result.exit_code != 0, result.output
        # The expected message is French and contains accented characters; it
        # must be spelled with real "é" characters to match the CLI output.
        assert "métadonnée invalide" in result.output

    def test_corrupt_zip_returns_exit_code_1(
        self, runner: CliRunner, tmp_path: Path,
    ) -> None:
        """A file that is not a ZIP archive yields exit code 1 and an error message."""
        zip_path = tmp_path / "broken.zip"
        zip_path.write_bytes(b"not a zip file")
        out_dir = tmp_path / "ws"
        result = runner.invoke(cli, [
            "import-corpus", str(zip_path),
            "--output-dir", str(out_dir),
        ])
        assert result.exit_code == 1, result.output
        assert "erreur" in result.output.lower()

    def test_traversal_zip_returns_exit_code_1(
        self, runner: CliRunner, tmp_path: Path,
    ) -> None:
        """A ZIP whose member name starts with ``../`` yields exit code 1."""
        zip_path = tmp_path / "evil.zip"
        zip_path.write_bytes(_make_zip({"../escape.txt": b"evil"}))
        out_dir = tmp_path / "ws"
        result = runner.invoke(cli, [
            "import-corpus", str(zip_path),
            "--output-dir", str(out_dir),
        ])
        assert result.exit_code == 1, result.output
        assert "Traversal" in result.output

    def test_max_zip_mb_enforced(
        self, runner: CliRunner, tmp_path: Path,
    ) -> None:
        """``--max-zip-mb 0`` ends with exit code 0 or 1 (no other failure mode)."""
        zip_path = tmp_path / "corpus.zip"
        zip_path.write_bytes(_make_zip({
            f"f{i}.png": b"x" * 1024 for i in range(10)
        }))
        out_dir = tmp_path / "ws"
        result = runner.invoke(cli, [
            "import-corpus", str(zip_path),
            "--output-dir", str(out_dir),
            # A 0 MB cap means 0 bytes, so any non-empty ZIP exceeds it.
            "--max-zip-mb", "0",
        ])
        # NOTE(review): both 0 and 1 are accepted here, depending on the limit
        # semantics (the original note says the code compares with a strict
        # ``>``); tighten this once the intended behavior is confirmed.
        assert result.exit_code in (0, 1), result.output


# ──────────────────────────────────────────────────────────────────
# report
# ──────────────────────────────────────────────────────────────────


class TestReport:
    """Tests for the ``report`` subcommand, driven through ``CliRunner``.

    Run directories are produced with ``_build_minimal_run_dir``. Exit-code
    assertions carry the captured output as their message so a failure shows
    what the CLI printed.
    """

    def test_report_to_file(
        self, runner: CliRunner, tmp_path: Path,
    ) -> None:
        """``--output`` writes the HTML report to a file and announces its path."""
        run_dir = tmp_path / "run"
        _build_minimal_run_dir(run_dir, corpus_name="test_cli")
        # The parent directory ``out`` is deliberately not created here.
        html_path = tmp_path / "out" / "rapport.html"
        result = runner.invoke(cli, [
            "report", str(run_dir),
            "--output", str(html_path),
        ])
        assert result.exit_code == 0, result.output
        assert html_path.exists()
        html = html_path.read_text(encoding="utf-8")
        assert "<!DOCTYPE html>" in html
        assert "test_cli" in html
        # The confirmation message is French; it must be spelled with a real
        # "é" to match the CLI output.
        assert f"Rapport HTML écrit dans : {html_path}" in result.output

    def test_report_to_stdout(
        self, runner: CliRunner, tmp_path: Path,
    ) -> None:
        """Without ``--output`` the HTML document is printed to the output."""
        run_dir = tmp_path / "run"
        _build_minimal_run_dir(run_dir, corpus_name="stdout_test")
        result = runner.invoke(cli, ["report", str(run_dir)])
        assert result.exit_code == 0, result.output
        assert "<!DOCTYPE html>" in result.output
        assert "stdout_test" in result.output

    def test_report_missing_run_dir_returns_exit_code_2(
        self, runner: CliRunner, tmp_path: Path,
    ) -> None:
        """A non-existent run directory is rejected with exit code 2."""
        # The run directory does not exist: Click is expected to reject it via
        # ``type=click.Path(exists=True)`` before the service is even invoked.
        missing = tmp_path / "does_not_exist"
        result = runner.invoke(cli, ["report", str(missing)])
        assert result.exit_code == 2, result.output
        # A single substring check suffices: any message containing
        # "not exist" also contains "exist".
        assert "exist" in result.output.lower()

    def test_report_dir_without_manifest_returns_exit_code_1(
        self, runner: CliRunner, tmp_path: Path,
    ) -> None:
        """An existing but empty directory fails with exit code 1 and names the manifest."""
        empty_dir = tmp_path / "empty"
        empty_dir.mkdir()
        result = runner.invoke(cli, ["report", str(empty_dir)])
        assert result.exit_code == 1, result.output
        assert "run_manifest.json" in result.output

    def test_report_lang_en(
        self, runner: CliRunner, tmp_path: Path,
    ) -> None:
        """``--lang en`` produces an English report."""
        run_dir = tmp_path / "run"
        _build_minimal_run_dir(run_dir, corpus_name="english_test")
        result = runner.invoke(cli, [
            "report", str(run_dir),
            "--lang", "en",
        ])
        assert result.exit_code == 0, result.output
        assert 'lang="en"' in result.output
        assert "Pipelines executed" in result.output

    def test_report_lang_invalid_rejected(
        self, runner: CliRunner, tmp_path: Path,
    ) -> None:
        """An unsupported ``--lang`` value is rejected with a usage error."""
        run_dir = tmp_path / "run"
        _build_minimal_run_dir(run_dir, corpus_name="x")
        result = runner.invoke(cli, [
            "report", str(run_dir),
            "--lang", "zh",
        ])
        assert result.exit_code != 0, result.output
        assert "Invalid value" in result.output or "not one of" in result.output


# ──────────────────────────────────────────────────────────────────
# Smoke E2E : import → (manuel) persist → report
# ──────────────────────────────────────────────────────────────────


class TestSmokeE2E:
    """End-to-end smoke test chaining ``import-corpus`` and ``report``."""

    def test_import_then_report_chain(
        self, runner: CliRunner, tmp_path: Path,
    ) -> None:
        """Demonstrate the full CLI workflow: import a corpus, then render
        a report from a persisted run.

        Note: the ``benchmark`` step (between the two) is not a CLI command
        yet (S23+).  For this smoke test, ``BenchmarkService.persist`` is
        used directly (through ``_build_minimal_run_dir``).
        """
        # 1. Import.
        zip_path = tmp_path / "corpus.zip"
        zip_path.write_bytes(_make_zip({
            "doc01.png": _png_bytes(),
            "doc01.gt.txt": b"hello",
        }))
        ws_dir = tmp_path / "ws"
        r1 = runner.invoke(cli, [
            "import-corpus", str(zip_path),
            "--output-dir", str(ws_dir),
            "--corpus-name", "smoke_corpus",
            "--quiet",
        ])
        assert r1.exit_code == 0, r1.output

        # 2. (Benchmark bypassed: a minimal run is persisted directly.)
        run_dir = tmp_path / "run"
        _build_minimal_run_dir(run_dir, corpus_name="smoke_corpus")

        # 3. Check that the 3 expected files are present.
        for fname in ("run_manifest.json", "pipeline_results.jsonl",
                      "view_results.jsonl"):
            assert (run_dir / fname).exists(), f"missing {fname}"
        # Check the manifest. It is read explicitly as UTF-8, like the other
        # file reads in this module, so the check does not depend on the
        # platform's default encoding.
        manifest_text = (run_dir / "run_manifest.json").read_text(
            encoding="utf-8",
        )
        manifest = json.loads(manifest_text)
        assert manifest["corpus_name"] == "smoke_corpus"

        # 4. Report.
        html_path = tmp_path / "rapport.html"
        r2 = runner.invoke(cli, [
            "report", str(run_dir),
            "--output", str(html_path),
        ])
        assert r2.exit_code == 0, r2.output
        assert html_path.exists()
        assert "smoke_corpus" in html_path.read_text(encoding="utf-8")