| """ |
| post_redaction_pass1_qa.py |
| ========================== |
| Optional Pass 1 sanity QA at the end of initial redaction (pre-review-apply). |
| |
| Writes a coverage JSON report and optionally a sibling pruned review CSV. |
| Does not run VLM or call /review_apply. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
| import pandas as pd |
|
|
| from tools.config import ( |
| POST_REDACT_PASS1_AUTO_PRUNE, |
| POST_REDACT_PASS1_INCLUDE_IN_OUTPUTS, |
| POST_REDACT_PASS1_MIN_WORD_LENGTH, |
| POST_REDACT_PASS1_MUST_NOT_REDACT_PATH, |
| POST_REDACT_PASS1_MUST_REDACT_PATH, |
| POST_REDACT_PASS1_QA, |
| POST_REDACT_PASS1_USE_DENY_ALLOW_LISTS, |
| ) |
| from tools.verify_redaction_coverage import ( |
| prune_suspicious_review_csv, |
| verify_redaction_coverage, |
| ) |
|
|
|
|
def load_regex_patterns_from_csv(path: str | Path) -> list[str]:
    """Load regex patterns from column 0 of a headerless CSV.

    The file has the same shape as the deny/allow list files: one pattern
    per row in the first column, with no header row.

    Args:
        path: Location of the CSV file.

    Returns:
        The whitespace-stripped, non-empty values of the first column, in
        file order. An empty list is returned when ``path`` is not an
        existing file or when the file contains no parseable rows.
    """
    csv_path = Path(path)
    if not csv_path.is_file():
        return []
    try:
        frame = pd.read_csv(
            csv_path,
            header=None,
            # Patterns are text: without this, a value such as "007" is
            # type-inferred as the integer 7 and comes back as "7".
            dtype=str,
            # Keep literal patterns like "NA" or "null" instead of having
            # pandas turn them into missing values that are then dropped.
            keep_default_na=False,
            low_memory=False,
        )
    except pd.errors.EmptyDataError:
        # read_csv raises (rather than returning an empty frame) for a
        # zero-byte or blank file; treat that the same as "no patterns".
        return []
    if frame.empty:
        return []
    stripped = (str(cell).strip() for cell in frame.iloc[:, 0].dropna())
    return [pattern for pattern in stripped if pattern]
|
|
|
|
def merge_policy_patterns(
    deny_list: list[str] | None,
    allow_list: list[str] | None,
    *,
    must_redact_path: str = "",
    must_not_redact_path: str = "",
    use_deny_allow_lists: bool = True,
) -> tuple[list[str], list[str]]:
    """Combine run-level lists and CSV pattern files into two regex lists.

    Args:
        deny_list: Run-level entries added to the must-redact list when
            ``use_deny_allow_lists`` is true.
        allow_list: Run-level entries added to the must-not-redact list
            when ``use_deny_allow_lists`` is true.
        must_redact_path: CSV of must-redact patterns; when empty, the
            ``POST_REDACT_PASS1_MUST_REDACT_PATH`` config value is used.
        must_not_redact_path: CSV of must-not-redact patterns; when empty,
            the ``POST_REDACT_PASS1_MUST_NOT_REDACT_PATH`` config value is
            used.
        use_deny_allow_lists: Whether ``deny_list`` / ``allow_list``
            contribute at all.

    Returns:
        ``(must_redact, must_not_redact)``. Run-level entries come first,
        followed by patterns loaded from the CSV files.
    """

    def _non_blank(entries):
        # Stringify and strip every entry, discarding those left empty.
        texts = (str(entry).strip() for entry in entries)
        return [text for text in texts if text]

    redact_patterns: list[str] = []
    keep_patterns: list[str] = []

    if use_deny_allow_lists and deny_list:
        redact_patterns += _non_blank(deny_list)
    if use_deny_allow_lists and allow_list:
        keep_patterns += _non_blank(allow_list)

    # An explicit path argument wins; otherwise use the configured default.
    redact_csv = (
        must_redact_path
        if must_redact_path
        else POST_REDACT_PASS1_MUST_REDACT_PATH
    )
    keep_csv = (
        must_not_redact_path
        if must_not_redact_path
        else POST_REDACT_PASS1_MUST_NOT_REDACT_PATH
    )

    if redact_csv:
        redact_patterns += load_regex_patterns_from_csv(redact_csv)
    if keep_csv:
        keep_patterns += load_regex_patterns_from_csv(keep_csv)

    return redact_patterns, keep_patterns
|
|
|
|
def _pruned_review_csv_path(review_csv_path: str | Path) -> Path:
    """Return the sibling ``<stem>_pruned.csv`` path for a review CSV."""
    source = Path(review_csv_path)
    pruned_name = source.stem + "_pruned.csv"
    return source.with_name(pruned_name)
|
|
|
|
def _coverage_report_path(review_csv_path: str | Path) -> Path:
    """Return the sibling ``<stem>_coverage_report.json`` path for a review CSV."""
    source = Path(review_csv_path)
    report_name = source.stem + "_coverage_report.json"
    return source.with_name(report_name)
|
|
|
|
def build_qa_summary(report: dict[str, Any]) -> str:
    """Format a one-line, human-readable Pass 1 QA summary.

    Args:
        report: Coverage report mapping. The keys read are ``pass_strict``
            (falling back to ``pass`` when absent), ``pass_with_cleanup``
            and ``summary``; within ``summary``, the
            ``pages_flagged_for_vlm`` and ``pages_needing_csv_cleanup``
            entries are counted. Missing or falsy entries count as zero.

    Returns:
        The summary sentence used for combined_out_message.
    """
    page_lists = report.get("summary") or {}
    vlm_pages = page_lists.get("pages_flagged_for_vlm") or []
    cleanup_pages = page_lists.get("pages_needing_csv_cleanup") or []
    strict_verdict = report.get("pass_strict", report.get("pass"))
    cleanup_verdict = report.get("pass_with_cleanup")

    fields = [
        f"pass_strict={strict_verdict}",
        f"pass_with_cleanup={cleanup_verdict}",
        f"pages_flagged_for_vlm={len(vlm_pages)}",
        f"pages_needing_csv_cleanup={len(cleanup_pages)}",
    ]
    return "Pass 1 QA: " + ", ".join(fields) + "."
|
|
|
|
def run_post_redaction_pass1_qa(
    *,
    review_csv_path: str | Path,
    ocr_words_csv_path: str | Path,
    output_folder: str | None = None,
    total_pages: int | None = None,
    must_redact: list[str] | None = None,
    must_not_redact: list[str] | None = None,
    deny_list: list[str] | None = None,
    allow_list: list[str] | None = None,
    auto_prune: bool | None = None,
    min_word_length: int | None = None,
    enabled: bool | None = None,
    use_deny_allow_lists: bool | None = None,
    include_in_outputs: bool | None = None,
) -> dict[str, Any]:
    """
    Run post-redaction Pass 1 QA on the initial review CSV + word OCR.

    Every option left as ``None`` falls back to its ``POST_REDACT_PASS1_*``
    config value. When either ``must_redact`` or ``must_not_redact`` is
    ``None``, the missing list is built with ``merge_policy_patterns`` from
    ``deny_list`` / ``allow_list``.

    Steps, in order: optionally write a sibling ``*_pruned.csv``, run the
    coverage verification (against the pruned CSV when pruning ran), write
    the JSON report next to the review CSV or into ``output_folder``, then
    print the summary line.

    Returns a dict with keys: enabled, paths_created, report, summary,
    prune_log. When an input file is missing an ``error`` key is added
    (``review_csv_missing`` / ``ocr_words_csv_missing``); on a completed run
    ``review_csv_for_report`` and ``coverage_report_path`` are added.
    ``paths_created`` is emptied when outputs are not to be included, even
    though the files are still written.
    """

    def _skipped(is_enabled: bool, error: str | None = None) -> dict[str, Any]:
        # Result shape shared by the disabled and missing-input exits.
        outcome: dict[str, Any] = {
            "enabled": is_enabled,
            "paths_created": [],
            "report": None,
            "summary": "",
            "prune_log": None,
        }
        if error is not None:
            outcome["error"] = error
        return outcome

    qa_enabled = bool(enabled) if enabled is not None else POST_REDACT_PASS1_QA
    if not qa_enabled:
        return _skipped(False)

    review_path = Path(review_csv_path)
    ocr_path = Path(ocr_words_csv_path)
    required_inputs = (
        (
            review_path,
            "Post-redaction Pass 1 QA skipped: review CSV not found.",
            "review_csv_missing",
        ),
        (
            ocr_path,
            "Post-redaction Pass 1 QA skipped: OCR words CSV not found.",
            "ocr_words_csv_missing",
        ),
    )
    for candidate, notice, error_code in required_inputs:
        if not candidate.is_file():
            print(notice)
            return _skipped(True, error_code)

    if must_redact is None or must_not_redact is None:
        honour_run_lists = (
            use_deny_allow_lists
            if use_deny_allow_lists is not None
            else POST_REDACT_PASS1_USE_DENY_ALLOW_LISTS
        )
        fallback_redact, fallback_keep = merge_policy_patterns(
            deny_list,
            allow_list,
            use_deny_allow_lists=honour_run_lists,
        )
        # Only fill in the list(s) the caller did not supply.
        must_redact = fallback_redact if must_redact is None else must_redact
        must_not_redact = (
            fallback_keep if must_not_redact is None else must_not_redact
        )

    if min_word_length is not None:
        min_wl = min_word_length
    else:
        min_wl = POST_REDACT_PASS1_MIN_WORD_LENGTH

    if auto_prune is not None:
        do_prune = bool(auto_prune)
    else:
        do_prune = POST_REDACT_PASS1_AUTO_PRUNE

    written: list[str] = []
    prune_log: dict[str, Any] | None = None
    csv_for_report = review_path

    if do_prune:
        pruned_path = _pruned_review_csv_path(review_path)
        prune_log = prune_suspicious_review_csv(
            review_path,
            pruned_path,
            must_redact=must_redact,
            min_word_length=min_wl,
        )
        # Coverage is then verified against the pruned CSV, not the original.
        csv_for_report = pruned_path
        written.append(str(pruned_path))

    coverage = verify_redaction_coverage(
        csv_for_report,
        ocr_path,
        must_redact=must_redact,
        must_not_redact=must_not_redact,
        total_pages=total_pages,
        min_word_length=min_wl,
    )
    report = coverage.to_dict()

    # The report name always derives from the original review CSV.
    report_path = _coverage_report_path(review_path)
    if output_folder:
        report_path = Path(output_folder) / report_path.name
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_json = json.dumps(report, indent=2)
    report_path.write_text(report_json, encoding="utf-8")
    written.append(str(report_path))

    if include_in_outputs is not None:
        include = include_in_outputs
    else:
        include = POST_REDACT_PASS1_INCLUDE_IN_OUTPUTS
    paths_created: list[str] = written if include else []

    summary = build_qa_summary(report)
    print(summary)

    return {
        "enabled": True,
        "paths_created": paths_created,
        "report": report,
        "summary": summary,
        "prune_log": prune_log,
        "review_csv_for_report": str(csv_for_report),
        "coverage_report_path": str(report_path),
    }
|
|