document_redaction / tools /post_redaction_pass1_qa.py
seanpedrickcase's picture
Sync: Merge pull request #199 from seanpedrick-case/startup_optimise
a2e06b3
Raw
History Blame Contribute Delete
7.07 kB
"""
post_redaction_pass1_qa.py
==========================
Optional Pass 1 sanity QA at the end of initial redaction (pre-review-apply).
Writes a coverage JSON report and optionally a sibling pruned review CSV.
Does not run VLM or call /review_apply.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import pandas as pd
from tools.config import (
POST_REDACT_PASS1_AUTO_PRUNE,
POST_REDACT_PASS1_INCLUDE_IN_OUTPUTS,
POST_REDACT_PASS1_MIN_WORD_LENGTH,
POST_REDACT_PASS1_MUST_NOT_REDACT_PATH,
POST_REDACT_PASS1_MUST_REDACT_PATH,
POST_REDACT_PASS1_QA,
POST_REDACT_PASS1_USE_DENY_ALLOW_LISTS,
)
from tools.verify_redaction_coverage import (
prune_suspicious_review_csv,
verify_redaction_coverage,
)
def load_regex_patterns_from_csv(path: str | Path) -> list[str]:
    """Load regex patterns from column 0 of a headerless CSV.

    The file has the same shape as the deny/allow list files: no header row,
    one pattern per row in the first column. Any further columns are ignored.

    Args:
        path: Location of the CSV file.

    Returns:
        The stripped, non-empty cells of the first column in file order.
        An empty list is returned when ``path`` is not an existing file or
        when the file holds no parseable data.
    """
    csv_path = Path(path)
    if not csv_path.is_file():
        return []
    try:
        frame = pd.read_csv(
            csv_path,
            header=None,
            # Patterns are text: without dtype=str an all-numeric column is
            # parsed as numbers, so "007" would come back as "7".
            dtype=str,
            # Keep literal patterns such as "NA", "null" or "None" instead of
            # letting pandas turn them into NaN (which would drop them).
            keep_default_na=False,
            low_memory=False,
        )
    except pd.errors.EmptyDataError:
        # Zero-byte / blank-only file: pandas has no columns to parse.
        return []
    if frame.empty:
        return []
    stripped_cells = (str(cell).strip() for cell in frame.iloc[:, 0].dropna())
    return [text for text in stripped_cells if text]
def merge_policy_patterns(
    deny_list: list[str] | None,
    allow_list: list[str] | None,
    *,
    must_redact_path: str = "",
    must_not_redact_path: str = "",
    use_deny_allow_lists: bool = True,
) -> tuple[list[str], list[str]]:
    """Assemble the must-redact and must-not-redact regex lists.

    Patterns come from two sources, appended in this order:

    1. The run's ``deny_list`` / ``allow_list`` (only when
       ``use_deny_allow_lists`` is true); entries are stringified, stripped,
       and blank entries are skipped.
    2. A CSV of patterns per list. An explicit ``must_redact_path`` /
       ``must_not_redact_path`` wins; when it is empty the matching
       ``POST_REDACT_PASS1_*_PATH`` config value is used instead.

    Returns:
        ``(must_redact, must_not_redact)`` as two lists of pattern strings.
    """
    redact_patterns: list[str] = []
    keep_patterns: list[str] = []

    if use_deny_allow_lists:
        for entries, target in (
            (deny_list, redact_patterns),
            (allow_list, keep_patterns),
        ):
            if not entries:
                continue
            for entry in entries:
                text = str(entry).strip()
                if text:
                    target.append(text)

    # Explicit argument first, configured path as the fallback.
    redact_csv = must_redact_path or POST_REDACT_PASS1_MUST_REDACT_PATH
    keep_csv = must_not_redact_path or POST_REDACT_PASS1_MUST_NOT_REDACT_PATH

    if redact_csv:
        redact_patterns += load_regex_patterns_from_csv(redact_csv)
    if keep_csv:
        keep_patterns += load_regex_patterns_from_csv(keep_csv)

    return redact_patterns, keep_patterns
def _pruned_review_csv_path(review_csv_path: str | Path) -> Path:
    """Return the sibling path ``<stem>_pruned.csv`` for a review CSV path."""
    source = Path(review_csv_path)
    pruned_name = f"{source.stem}_pruned.csv"
    # with_name keeps the parent directory and swaps only the final component.
    return source.with_name(pruned_name)
def _coverage_report_path(review_csv_path: str | Path) -> Path:
    """Return the sibling path ``<stem>_coverage_report.json`` for a review CSV path."""
    source = Path(review_csv_path)
    report_name = f"{source.stem}_coverage_report.json"
    # with_name keeps the parent directory and swaps only the final component.
    return source.with_name(report_name)
def build_qa_summary(report: dict[str, Any]) -> str:
    """Render a one-line, human-readable Pass 1 QA summary.

    Intended for ``combined_out_message``. Reads ``pass_strict`` (falling back
    to ``pass`` when that key is absent), ``pass_with_cleanup``, and the
    lengths of the two page lists under ``summary``; a missing or empty
    ``summary`` / page list counts as zero pages.
    """
    page_lists = report.get("summary") or {}
    vlm_page_count = len(page_lists.get("pages_flagged_for_vlm") or [])
    cleanup_page_count = len(page_lists.get("pages_needing_csv_cleanup") or [])

    legacy_pass = report.get("pass")
    strict_result = report.get("pass_strict", legacy_pass)

    fields = [
        f"pass_strict={strict_result}",
        f"pass_with_cleanup={report.get('pass_with_cleanup')}",
        f"pages_flagged_for_vlm={vlm_page_count}",
        f"pages_needing_csv_cleanup={cleanup_page_count}",
    ]
    return "Pass 1 QA: " + ", ".join(fields) + "."
def _pass1_qa_skipped_result(
    *, enabled: bool, error: str | None = None
) -> dict[str, Any]:
    """Build the result dict returned when Pass 1 QA produces no report."""
    result: dict[str, Any] = {
        "enabled": enabled,
        "paths_created": [],
        "report": None,
        "summary": "",
        "prune_log": None,
    }
    if error is not None:
        result["error"] = error
    return result


def run_post_redaction_pass1_qa(
    *,
    review_csv_path: str | Path,
    ocr_words_csv_path: str | Path,
    output_folder: str | None = None,
    total_pages: int | None = None,
    must_redact: list[str] | None = None,
    must_not_redact: list[str] | None = None,
    deny_list: list[str] | None = None,
    allow_list: list[str] | None = None,
    auto_prune: bool | None = None,
    min_word_length: int | None = None,
    enabled: bool | None = None,
    use_deny_allow_lists: bool | None = None,
    include_in_outputs: bool | None = None,
) -> dict[str, Any]:
    """
    Run post-redaction Pass 1 QA on the initial review CSV plus word-level OCR.

    Every option that is left as ``None`` falls back to the matching
    ``POST_REDACT_PASS1_*`` config value.

    Args:
        review_csv_path: Review CSV produced by the initial redaction.
        ocr_words_csv_path: Word-level OCR CSV for the same document.
        output_folder: When set, the coverage report is written here instead
            of next to the review CSV.
        total_pages: Passed through to ``verify_redaction_coverage``.
        must_redact: Patterns that must be redacted; built with
            ``merge_policy_patterns`` when ``None``.
        must_not_redact: Patterns that must not be redacted; built with
            ``merge_policy_patterns`` when ``None``.
        deny_list: Run deny list handed to ``merge_policy_patterns``.
        allow_list: Run allow list handed to ``merge_policy_patterns``.
        auto_prune: Whether to write a pruned sibling review CSV first and
            run the coverage check against it.
        min_word_length: Passed through to the prune and coverage steps.
        enabled: Master switch; when false nothing is read or written.
        use_deny_allow_lists: Whether ``deny_list`` / ``allow_list`` feed the
            policy patterns.
        include_in_outputs: When false, files are still written but
            ``paths_created`` is returned empty.

    Returns:
        Dict with keys ``enabled``, ``paths_created``, ``report``, ``summary``
        and ``prune_log``. A skipped run caused by a missing input file also
        carries ``error``; a completed run also carries
        ``review_csv_for_report`` and ``coverage_report_path``.
    """
    qa_enabled = bool(enabled) if enabled is not None else POST_REDACT_PASS1_QA
    if not qa_enabled:
        return _pass1_qa_skipped_result(enabled=False)

    review_csv = Path(review_csv_path)
    ocr_words_csv = Path(ocr_words_csv_path)

    # Both inputs must exist; the first missing one ends the run with a notice.
    required_inputs = (
        (
            review_csv,
            "Post-redaction Pass 1 QA skipped: review CSV not found.",
            "review_csv_missing",
        ),
        (
            ocr_words_csv,
            "Post-redaction Pass 1 QA skipped: OCR words CSV not found.",
            "ocr_words_csv_missing",
        ),
    )
    for candidate, notice, error_code in required_inputs:
        if not candidate.is_file():
            print(notice)
            return _pass1_qa_skipped_result(enabled=True, error=error_code)

    # Only fill in the pattern lists the caller did not supply explicitly.
    if must_redact is None or must_not_redact is None:
        if use_deny_allow_lists is None:
            lists_flag = POST_REDACT_PASS1_USE_DENY_ALLOW_LISTS
        else:
            lists_flag = use_deny_allow_lists
        default_redact, default_keep = merge_policy_patterns(
            deny_list,
            allow_list,
            use_deny_allow_lists=lists_flag,
        )
        if must_redact is None:
            must_redact = default_redact
        if must_not_redact is None:
            must_not_redact = default_keep

    if min_word_length is not None:
        word_length_floor = min_word_length
    else:
        word_length_floor = POST_REDACT_PASS1_MIN_WORD_LENGTH

    if auto_prune is not None:
        prune_first = bool(auto_prune)
    else:
        prune_first = POST_REDACT_PASS1_AUTO_PRUNE

    written_paths: list[str] = []
    prune_details: dict[str, Any] | None = None
    report_input = review_csv

    if prune_first:
        pruned_csv = _pruned_review_csv_path(review_csv)
        prune_details = prune_suspicious_review_csv(
            review_csv,
            pruned_csv,
            must_redact=must_redact,
            min_word_length=word_length_floor,
        )
        # The coverage check then runs against the pruned copy.
        report_input = pruned_csv
        written_paths.append(str(pruned_csv))

    coverage = verify_redaction_coverage(
        report_input,
        ocr_words_csv,
        must_redact=must_redact,
        must_not_redact=must_not_redact,
        total_pages=total_pages,
        min_word_length=word_length_floor,
    ).to_dict()

    # The report name always derives from the original review CSV, even when
    # the pruned copy was the one checked.
    report_file = _coverage_report_path(review_csv)
    if output_folder:
        report_file = Path(output_folder) / report_file.name
    report_file.parent.mkdir(parents=True, exist_ok=True)
    report_file.write_text(json.dumps(coverage, indent=2), encoding="utf-8")
    written_paths.append(str(report_file))

    if include_in_outputs is None:
        expose_paths = POST_REDACT_PASS1_INCLUDE_IN_OUTPUTS
    else:
        expose_paths = include_in_outputs

    qa_summary = build_qa_summary(coverage)
    print(qa_summary)

    return {
        "enabled": True,
        # Files stay on disk either way; this only controls what is reported.
        "paths_created": written_paths if expose_paths else [],
        "report": coverage,
        "summary": qa_summary,
        "prune_log": prune_details,
        "review_csv_for_report": str(report_input),
        "coverage_report_path": str(report_file),
    }