File size: 7,073 Bytes
a2e06b3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
"""
post_redaction_pass1_qa.py
==========================
Optional Pass 1 sanity QA at the end of initial redaction (pre-review-apply).

Writes a coverage JSON report and optionally a sibling pruned review CSV.
Does not run VLM or call /review_apply.
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import pandas as pd

from tools.config import (
    POST_REDACT_PASS1_AUTO_PRUNE,
    POST_REDACT_PASS1_INCLUDE_IN_OUTPUTS,
    POST_REDACT_PASS1_MIN_WORD_LENGTH,
    POST_REDACT_PASS1_MUST_NOT_REDACT_PATH,
    POST_REDACT_PASS1_MUST_REDACT_PATH,
    POST_REDACT_PASS1_QA,
    POST_REDACT_PASS1_USE_DENY_ALLOW_LISTS,
)
from tools.verify_redaction_coverage import (
    prune_suspicious_review_csv,
    verify_redaction_coverage,
)


def load_regex_patterns_from_csv(path: str | Path) -> list[str]:
    """Load regex patterns from column 0 of a CSV (same shape as deny/allow list files).

    The file is read without a header row, so its first line is treated as a
    pattern. Every cell is read as literal text: numeric-looking patterns keep
    their exact spelling (``007`` stays ``007``) and tokens that pandas would
    otherwise interpret as missing values (``NA``, ``null``, ``None``, ...)
    are kept as patterns instead of being silently dropped.

    Args:
        path: Location of the CSV file.

    Returns:
        Stripped, non-empty patterns in file order. An empty list is returned
        when ``path`` is not a regular file or the file contains no rows.
    """
    csv_path = Path(path)
    if not csv_path.is_file():
        return []
    try:
        df = pd.read_csv(
            csv_path,
            header=None,
            dtype=str,  # keep "007" / "1.0" exactly as written
            na_filter=False,  # "NA", "null", ... are patterns, not missing data
            low_memory=False,
        )
    except pd.errors.EmptyDataError:
        # A zero-byte or blank-only file has no columns to parse; treat it
        # the same as a file with no patterns rather than crashing the QA run.
        return []
    if df.empty:
        return []
    stripped = (str(cell).strip() for cell in df.iloc[:, 0].dropna())
    return [pattern for pattern in stripped if pattern]


def merge_policy_patterns(
    deny_list: list[str] | None,
    allow_list: list[str] | None,
    *,
    must_redact_path: str = "",
    must_not_redact_path: str = "",
    use_deny_allow_lists: bool = True,
) -> tuple[list[str], list[str]]:
    """Combine run-level lists and CSV-backed patterns into two policy lists.

    Args:
        deny_list: Run-level terms added to the must-redact list when
            ``use_deny_allow_lists`` is true.
        allow_list: Run-level terms added to the must-not-redact list when
            ``use_deny_allow_lists`` is true.
        must_redact_path: CSV of extra must-redact patterns; when empty,
            ``POST_REDACT_PASS1_MUST_REDACT_PATH`` is used instead.
        must_not_redact_path: CSV of extra must-not-redact patterns; when
            empty, ``POST_REDACT_PASS1_MUST_NOT_REDACT_PATH`` is used instead.
        use_deny_allow_lists: Whether ``deny_list`` / ``allow_list`` contribute.

    Returns:
        ``(must_redact, must_not_redact)``. Run-level entries (stripped, blanks
        removed) come first, followed by patterns loaded from the CSV paths.
    """

    def _non_blank(items: list[str]) -> list[str]:
        # Stringify and strip each entry, discarding those that end up empty.
        return [text for text in (str(item).strip() for item in items) if text]

    redact_patterns: list[str] = []
    keep_patterns: list[str] = []

    if use_deny_allow_lists and deny_list:
        redact_patterns += _non_blank(deny_list)
    if use_deny_allow_lists and allow_list:
        keep_patterns += _non_blank(allow_list)

    # An explicit path argument wins; otherwise fall back to the configured one.
    redact_csv = must_redact_path or POST_REDACT_PASS1_MUST_REDACT_PATH
    keep_csv = must_not_redact_path or POST_REDACT_PASS1_MUST_NOT_REDACT_PATH

    if redact_csv:
        redact_patterns += load_regex_patterns_from_csv(redact_csv)
    if keep_csv:
        keep_patterns += load_regex_patterns_from_csv(keep_csv)

    return redact_patterns, keep_patterns


def _pruned_review_csv_path(review_csv_path: str | Path) -> Path:
    """Return the sibling ``<stem>_pruned.csv`` path for a review CSV path."""
    source = Path(review_csv_path)
    pruned_name = f"{source.stem}_pruned.csv"
    return source.with_name(pruned_name)


def _coverage_report_path(review_csv_path: str | Path) -> Path:
    """Return the sibling ``<stem>_coverage_report.json`` path for a review CSV path."""
    source = Path(review_csv_path)
    report_name = f"{source.stem}_coverage_report.json"
    return source.with_name(report_name)


def build_qa_summary(report: dict[str, Any]) -> str:
    """Format a one-line, human-readable Pass 1 QA summary for combined_out_message.

    Reads ``pass_strict`` (falling back to ``pass``), ``pass_with_cleanup`` and
    the page lists under ``summary`` from ``report``. Missing or empty page
    lists are counted as zero pages.
    """
    page_lists = report.get("summary") or {}
    vlm_count = len(page_lists.get("pages_flagged_for_vlm") or [])
    cleanup_count = len(page_lists.get("pages_needing_csv_cleanup") or [])

    strict_result = report.get("pass_strict", report.get("pass"))
    cleanup_result = report.get("pass_with_cleanup")

    return "".join(
        (
            "Pass 1 QA: ",
            f"pass_strict={strict_result}, ",
            f"pass_with_cleanup={cleanup_result}, ",
            f"pages_flagged_for_vlm={vlm_count}, ",
            f"pages_needing_csv_cleanup={cleanup_count}.",
        )
    )


def run_post_redaction_pass1_qa(
    *,
    review_csv_path: str | Path,
    ocr_words_csv_path: str | Path,
    output_folder: str | None = None,
    total_pages: int | None = None,
    must_redact: list[str] | None = None,
    must_not_redact: list[str] | None = None,
    deny_list: list[str] | None = None,
    allow_list: list[str] | None = None,
    auto_prune: bool | None = None,
    min_word_length: int | None = None,
    enabled: bool | None = None,
    use_deny_allow_lists: bool | None = None,
    include_in_outputs: bool | None = None,
) -> dict[str, Any]:
    """
    Run post-redaction Pass 1 QA on the initial review CSV and word-level OCR CSV.

    Every option left as ``None`` falls back to the matching
    ``POST_REDACT_PASS1_*`` setting imported from ``tools.config``.

    Steps, in order:
      1. Return early when QA is disabled or either input CSV is missing.
      2. Fill in any missing ``must_redact`` / ``must_not_redact`` list via
         ``merge_policy_patterns(deny_list, allow_list, ...)``.
      3. When pruning is on, write ``<stem>_pruned.csv`` next to the review
         CSV and use it as the input for the coverage check.
      4. Run ``verify_redaction_coverage`` and write the report as
         ``<stem>_coverage_report.json`` (inside ``output_folder`` when given,
         otherwise next to the review CSV).

    Returns a dict that always has the keys ``enabled``, ``paths_created``,
    ``report``, ``summary`` and ``prune_log``. When an input file is missing
    it also has ``error`` (``"review_csv_missing"`` or
    ``"ocr_words_csv_missing"``). After a completed run it also has
    ``review_csv_for_report`` and ``coverage_report_path``. ``paths_created``
    is emptied when outputs are not to be included, even though the files
    have still been written.
    """

    def _skipped(is_enabled: bool, error: str | None = None) -> dict[str, Any]:
        # Result shape shared by every early exit (nothing ran, nothing written).
        outcome: dict[str, Any] = {
            "enabled": is_enabled,
            "paths_created": [],
            "report": None,
            "summary": "",
            "prune_log": None,
        }
        if error is not None:
            outcome["error"] = error
        return outcome

    if enabled is None:
        qa_enabled = POST_REDACT_PASS1_QA
    else:
        qa_enabled = bool(enabled)
    if not qa_enabled:
        return _skipped(False)

    review_file = Path(review_csv_path)
    words_file = Path(ocr_words_csv_path)
    if not review_file.is_file():
        print("Post-redaction Pass 1 QA skipped: review CSV not found.")
        return _skipped(True, "review_csv_missing")
    if not words_file.is_file():
        print("Post-redaction Pass 1 QA skipped: OCR words CSV not found.")
        return _skipped(True, "ocr_words_csv_missing")

    # Only build the merged policy lists when the caller left at least one out;
    # an explicitly supplied list is never overridden.
    if must_redact is None or must_not_redact is None:
        if use_deny_allow_lists is None:
            lists_flag = POST_REDACT_PASS1_USE_DENY_ALLOW_LISTS
        else:
            lists_flag = use_deny_allow_lists
        fallback_redact, fallback_keep = merge_policy_patterns(
            deny_list,
            allow_list,
            use_deny_allow_lists=lists_flag,
        )
        must_redact = fallback_redact if must_redact is None else must_redact
        must_not_redact = (
            fallback_keep if must_not_redact is None else must_not_redact
        )

    if min_word_length is None:
        word_length_floor = POST_REDACT_PASS1_MIN_WORD_LENGTH
    else:
        word_length_floor = min_word_length

    if auto_prune is None:
        prune_requested = POST_REDACT_PASS1_AUTO_PRUNE
    else:
        prune_requested = bool(auto_prune)

    artifacts: list[str] = []
    prune_log: dict[str, Any] | None = None
    report_input = review_file

    if prune_requested:
        pruned_file = _pruned_review_csv_path(review_file)
        prune_log = prune_suspicious_review_csv(
            review_file,
            pruned_file,
            must_redact=must_redact,
            min_word_length=word_length_floor,
        )
        # Coverage is measured against the pruned CSV from here on.
        report_input = pruned_file
        artifacts.append(str(pruned_file))

    coverage = verify_redaction_coverage(
        report_input,
        words_file,
        must_redact=must_redact,
        must_not_redact=must_not_redact,
        total_pages=total_pages,
        min_word_length=word_length_floor,
    )
    report = coverage.to_dict()

    # The report file name always derives from the original review CSV, even
    # when the pruned CSV was the one analysed.
    report_file = _coverage_report_path(review_file)
    if output_folder:
        report_file = Path(output_folder) / report_file.name
    report_file.parent.mkdir(parents=True, exist_ok=True)
    report_file.write_text(json.dumps(report, indent=2), encoding="utf-8")
    artifacts.append(str(report_file))

    if include_in_outputs is None:
        expose_paths = POST_REDACT_PASS1_INCLUDE_IN_OUTPUTS
    else:
        expose_paths = include_in_outputs
    if not expose_paths:
        # Files stay on disk; they are just not reported as outputs.
        artifacts = []

    summary = build_qa_summary(report)
    print(summary)

    return {
        "enabled": True,
        "paths_created": artifacts,
        "report": report,
        "summary": summary,
        "prune_log": prune_log,
        "review_csv_for_report": str(report_input),
        "coverage_report_path": str(report_file),
    }