# File size: 7,073 Bytes (revision a2e06b3)
"""
post_redaction_pass1_qa.py
==========================
Optional Pass 1 sanity QA at the end of initial redaction (pre-review-apply).
Writes a coverage JSON report and optionally a sibling pruned review CSV.
Does not run VLM or call /review_apply.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import pandas as pd
from tools.config import (
POST_REDACT_PASS1_AUTO_PRUNE,
POST_REDACT_PASS1_INCLUDE_IN_OUTPUTS,
POST_REDACT_PASS1_MIN_WORD_LENGTH,
POST_REDACT_PASS1_MUST_NOT_REDACT_PATH,
POST_REDACT_PASS1_MUST_REDACT_PATH,
POST_REDACT_PASS1_QA,
POST_REDACT_PASS1_USE_DENY_ALLOW_LISTS,
)
from tools.verify_redaction_coverage import (
prune_suspicious_review_csv,
verify_redaction_coverage,
)
def load_regex_patterns_from_csv(path: str | Path) -> list[str]:
    """Load regex patterns from column 0 of a header-less CSV.

    The file has the same shape as the deny/allow list files: one pattern per
    row in the first column. Any further columns are ignored.

    Args:
        path: Location of the CSV file.

    Returns:
        The non-blank first-column values, stripped of surrounding whitespace,
        in file order. An empty list is returned when ``path`` is not a file
        or the file contains no rows.
    """
    csv_path = Path(path)
    if not csv_path.is_file():
        return []
    try:
        # dtype=str keeps every pattern verbatim; with type inference an
        # all-numeric column such as "007" would come back as the integer 7.
        frame = pd.read_csv(csv_path, header=None, dtype=str, low_memory=False)
    except pd.errors.EmptyDataError:
        # pandas raises instead of returning an empty frame when the file has
        # nothing to parse (e.g. a zero-byte list file); treat it as "no patterns".
        return []
    if frame.empty:
        return []
    stripped = (str(value).strip() for value in frame.iloc[:, 0].dropna())
    return [pattern for pattern in stripped if pattern]
def merge_policy_patterns(
    deny_list: list[str] | None,
    allow_list: list[str] | None,
    *,
    must_redact_path: str = "",
    must_not_redact_path: str = "",
    use_deny_allow_lists: bool = True,
) -> tuple[list[str], list[str]]:
    """Combine run-level lists and CSV files into the two QA pattern lists.

    Args:
        deny_list: Run deny list; contributes to the must-redact patterns.
        allow_list: Run allow list; contributes to the must-not-redact patterns.
        must_redact_path: CSV of extra must-redact patterns. When empty, the
            configured POST_REDACT_PASS1_MUST_REDACT_PATH is used instead.
        must_not_redact_path: CSV of extra must-not-redact patterns. When
            empty, POST_REDACT_PASS1_MUST_NOT_REDACT_PATH is used instead.
        use_deny_allow_lists: When false, ``deny_list`` and ``allow_list`` are
            ignored; the CSV paths are still consulted.

    Returns:
        ``(must_redact, must_not_redact)`` with run-list entries first,
        followed by the patterns loaded from the CSV files.
    """

    def _non_blank(entries: list[str] | None) -> list[str]:
        # Stringify and strip every entry, dropping the ones left empty.
        texts = (str(entry).strip() for entry in entries or [])
        return [text for text in texts if text]

    redact_patterns: list[str] = _non_blank(deny_list) if use_deny_allow_lists else []
    keep_patterns: list[str] = _non_blank(allow_list) if use_deny_allow_lists else []

    # An explicit path argument wins over the configured default path.
    redact_csv = must_redact_path or POST_REDACT_PASS1_MUST_REDACT_PATH
    keep_csv = must_not_redact_path or POST_REDACT_PASS1_MUST_NOT_REDACT_PATH
    if redact_csv:
        redact_patterns += load_regex_patterns_from_csv(redact_csv)
    if keep_csv:
        keep_patterns += load_regex_patterns_from_csv(keep_csv)
    return redact_patterns, keep_patterns
def _pruned_review_csv_path(review_csv_path: str | Path) -> Path:
    """Return the sibling ``<stem>_pruned.csv`` path for a review CSV path."""
    source = Path(review_csv_path)
    pruned_name = f"{source.stem}_pruned.csv"
    return source.with_name(pruned_name)
def _coverage_report_path(review_csv_path: str | Path) -> Path:
    """Return the sibling ``<stem>_coverage_report.json`` path for a review CSV path."""
    source = Path(review_csv_path)
    report_name = f"{source.stem}_coverage_report.json"
    return source.with_name(report_name)
def build_qa_summary(report: dict[str, Any]) -> str:
    """Format the one-line Pass 1 QA status message from a coverage report dict.

    The strict verdict is read from ``pass_strict`` and falls back to ``pass``
    when that key is absent. Page counts are the lengths of the
    ``pages_flagged_for_vlm`` and ``pages_needing_csv_cleanup`` lists under
    ``summary``; a missing or empty value counts as zero.
    """
    counts = report.get("summary") or {}
    strict_verdict = report.get('pass_strict', report.get('pass'))
    cleanup_verdict = report.get('pass_with_cleanup')
    vlm_pages = counts.get("pages_flagged_for_vlm") or []
    cleanup_pages = counts.get("pages_needing_csv_cleanup") or []
    pieces = [
        "Pass 1 QA: ",
        f"pass_strict={strict_verdict}, ",
        f"pass_with_cleanup={cleanup_verdict}, ",
        f"pages_flagged_for_vlm={len(vlm_pages)}, ",
        f"pages_needing_csv_cleanup={len(cleanup_pages)}.",
    ]
    return "".join(pieces)
def run_post_redaction_pass1_qa(
    *,
    review_csv_path: str | Path,
    ocr_words_csv_path: str | Path,
    output_folder: str | None = None,
    total_pages: int | None = None,
    must_redact: list[str] | None = None,
    must_not_redact: list[str] | None = None,
    deny_list: list[str] | None = None,
    allow_list: list[str] | None = None,
    auto_prune: bool | None = None,
    min_word_length: int | None = None,
    enabled: bool | None = None,
    use_deny_allow_lists: bool | None = None,
    include_in_outputs: bool | None = None,
) -> dict[str, Any]:
    """
    Run the Pass 1 coverage check on the initial review CSV and word-level OCR CSV.

    Every option left as ``None`` falls back to its ``POST_REDACT_PASS1_*``
    configuration value. ``must_redact`` / ``must_not_redact`` that are not
    supplied are built with ``merge_policy_patterns`` from ``deny_list`` and
    ``allow_list``.

    Steps, in order:
      1. Return early when QA is disabled or either input CSV is missing.
      2. When pruning is on, call ``prune_suspicious_review_csv`` with a
         sibling ``<stem>_pruned.csv`` path and verify against that file.
      3. Call ``verify_redaction_coverage`` and write its ``to_dict()`` result
         as JSON to ``<stem>_coverage_report.json`` (inside ``output_folder``
         when given, otherwise next to the review CSV).

    Returns:
        Dict with keys ``enabled``, ``paths_created``, ``report``, ``summary``
        and ``prune_log``. Skipped runs caused by a missing input also carry
        ``error``; completed runs also carry ``review_csv_for_report`` and
        ``coverage_report_path``. ``paths_created`` is emptied when outputs
        are not to be included, although the files are still written.
    """

    def _skipped(is_enabled: bool, error: str | None = None) -> dict[str, Any]:
        # Shared shape of every early-exit result.
        outcome: dict[str, Any] = {
            "enabled": is_enabled,
            "paths_created": [],
            "report": None,
            "summary": "",
            "prune_log": None,
        }
        if error is not None:
            outcome["error"] = error
        return outcome

    if enabled is None:
        qa_active = POST_REDACT_PASS1_QA
    else:
        qa_active = bool(enabled)
    if not qa_active:
        return _skipped(False)

    review_file = Path(review_csv_path)
    ocr_file = Path(ocr_words_csv_path)
    if not review_file.is_file():
        print("Post-redaction Pass 1 QA skipped: review CSV not found.")
        return _skipped(True, "review_csv_missing")
    if not ocr_file.is_file():
        print("Post-redaction Pass 1 QA skipped: OCR words CSV not found.")
        return _skipped(True, "ocr_words_csv_missing")

    # Only build the merged policy lists when at least one was not supplied.
    if must_redact is None or must_not_redact is None:
        if use_deny_allow_lists is None:
            honour_run_lists = POST_REDACT_PASS1_USE_DENY_ALLOW_LISTS
        else:
            honour_run_lists = use_deny_allow_lists
        fallback_redact, fallback_keep = merge_policy_patterns(
            deny_list,
            allow_list,
            use_deny_allow_lists=honour_run_lists,
        )
        must_redact = fallback_redact if must_redact is None else must_redact
        must_not_redact = fallback_keep if must_not_redact is None else must_not_redact

    if min_word_length is None:
        word_length_floor = POST_REDACT_PASS1_MIN_WORD_LENGTH
    else:
        word_length_floor = min_word_length
    if auto_prune is None:
        prune_requested = POST_REDACT_PASS1_AUTO_PRUNE
    else:
        prune_requested = bool(auto_prune)

    created: list[str] = []
    prune_log: dict[str, Any] | None = None
    checked_csv = review_file
    if prune_requested:
        pruned_file = _pruned_review_csv_path(review_file)
        prune_log = prune_suspicious_review_csv(
            review_file,
            pruned_file,
            must_redact=must_redact,
            min_word_length=word_length_floor,
        )
        # Coverage is verified against the pruned copy from here on.
        checked_csv = pruned_file
        created.append(str(pruned_file))

    coverage = verify_redaction_coverage(
        checked_csv,
        ocr_file,
        must_redact=must_redact,
        must_not_redact=must_not_redact,
        total_pages=total_pages,
        min_word_length=word_length_floor,
    )
    report = coverage.to_dict()

    # The report is always named after the original review CSV, not the pruned one.
    report_file = _coverage_report_path(review_file)
    if output_folder:
        report_file = Path(output_folder) / report_file.name
    report_file.parent.mkdir(parents=True, exist_ok=True)
    report_file.write_text(json.dumps(report, indent=2), encoding="utf-8")
    created.append(str(report_file))

    if include_in_outputs is None:
        expose_paths = POST_REDACT_PASS1_INCLUDE_IN_OUTPUTS
    else:
        expose_paths = include_in_outputs

    summary = build_qa_summary(report)
    print(summary)
    return {
        "enabled": True,
        "paths_created": created if expose_paths else [],
        "report": report,
        "summary": summary,
        "prune_log": prune_log,
        "review_csv_for_report": str(checked_csv),
        "coverage_report_path": str(report_file),
    }
|