trace-field-notes / privacy_filter.py
JacobLinCool's picture
feat: add privacy filtering and execution modes
8457788 verified
Raw
History Blame Contribute Delete
6.71 kB
"""Optional model-based PII redaction using ``openai/privacy-filter``.
The deterministic pipeline always runs regex redaction (:mod:`redaction`). On the
Hugging Face Space GPU this module adds a second pass: a token-classification
model (``openai/privacy-filter``) flags personal or sensitive spans that regex
patterns miss — names, phone numbers, postal addresses, and the like — and masks
them with typed placeholders.
Heavy imports (``torch``/``transformers``) load lazily so the deterministic
analyzer, the test suite, and local development keep working without GPU
dependencies. If the model cannot be loaded, the caller falls back to regex-only
redaction and records the reason in the privacy notes.
"""
from __future__ import annotations
import functools
import time
from collections import Counter
from typing import Any, Callable
from model_runtime import resolve_device
from profiling import get_logger
from redaction import RedactionResult
logger = get_logger()
# Model identifier handed to the token-classification pipeline.
PRIVACY_MODEL_ID = "openai/privacy-filter"

# Confidence floor: detections scoring below this value are not masked.
PRIVACY_MIN_SCORE = 0.5

# Keyed by the model's entity group. Each value pairs the placeholder that
# replaces the span in the text with the human-readable label used in notes.
PII_TYPES: dict[str, tuple[str, str]] = {
    "private_person": ("[REDACTED_NAME]", "personal name"),
    "private_email": ("[REDACTED_EMAIL]", "email address"),
    "private_phone": ("[REDACTED_PHONE]", "phone number"),
    "private_address": ("[REDACTED_ADDRESS]", "postal address"),
    "private_url": ("[REDACTED_URL]", "personal URL"),
    "private_date": ("[REDACTED_DATE]", "personal date"),
    "account_number": ("[REDACTED_ACCOUNT]", "account number"),
    "secret": ("[REDACTED_SECRET]", "secret"),
}

# Detector signature: receives the texts and returns, for each text, a list of
# {"start", "end", "label"} span dicts.
DetectFn = Callable[[list[str]], list[list[dict[str, Any]]]]

# Already-built pipelines keyed by resolved device name, so each loads once.
_PIPELINE_CACHE: dict[str, Any] = {}
def redact_texts(
    texts: list[str],
    *,
    detect: DetectFn | None = None,
    device: str | None = None,
) -> list[RedactionResult]:
    """Detect and mask PII in each text, returning one result per input.

    Args:
        texts: Texts to scan; results come back in the same order.
        detect: Span detector to use. Defaults to :func:`_local_detect` (the
            lazily loaded model); tests inject a stand-in so the masking logic
            runs without ``torch``.
        device: Compute device forced on the default detector (``cuda`` /
            ``mps`` / ``cpu``). Not used when ``detect`` is supplied.

    Returns:
        One ``RedactionResult`` per entry of ``texts``.

    Raises:
        ValueError: If the detector returns a different number of span lists
            than there are texts.
    """
    if detect is not None:
        detector = detect
    else:
        detector = functools.partial(_local_detect, device=device)

    spans_per_text = list(detector(texts))
    # A bare zip() stops at the shorter input, which would silently drop the
    # trailing texts (or span lists) and break the one-result-per-input
    # contract. Fail loudly rather than return a misaligned, partial result.
    if len(spans_per_text) != len(texts):
        raise ValueError(
            f"detector returned {len(spans_per_text)} span lists for {len(texts)} texts"
        )
    return [_apply_spans(text, spans) for text, spans in zip(texts, spans_per_text)]
def _merge_spans(text: str, spans: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Drop malformed spans and merge same-label runs into clean, disjoint spans.

    ``openai/privacy-filter`` uses BIOES tags, which the pipeline's IOB-oriented
    "simple" aggregation can split into adjacent fragments of one entity (and a
    leading separator can leave a one-character gap). Merging same-label spans
    that overlap or sit within one character keeps each entity to a single
    placeholder; a remaining different-label overlap is clipped to stay disjoint.

    Args:
        text: The text the offsets refer to; only its length is used.
        spans: Candidate ``{"start", "end", "label"}`` dicts in any order.

    Returns:
        New span dicts sorted by ``start``, pairwise non-overlapping, each with
        integer offsets satisfying ``0 <= start < end <= len(text)``.
    """
    limit = len(text)

    # Normalise once. A span that lacks a key, carries offsets that cannot be
    # read as integers, or has an unusable label is malformed and is dropped
    # instead of aborting redaction for the whole text.
    usable: list[tuple[int, int, Any]] = []
    for span in spans:
        try:
            label = span["label"]
            start, end = int(span["start"]), int(span["end"])
            known = label in PII_TYPES
        except (KeyError, TypeError, ValueError, OverflowError):
            continue
        if known and 0 <= start < end <= limit:
            usable.append((start, end, label))
    usable.sort(key=lambda item: (item[0], item[1]))

    merged: list[dict[str, Any]] = []
    for start, end, label in usable:
        if merged:
            prev = merged[-1]
            # Same entity, touching or separated by at most one character.
            if label == prev["label"] and start <= prev["end"] + 1:
                prev["end"] = max(prev["end"], end)
                continue
            if start < prev["end"]:  # different-label overlap: keep them disjoint
                start = prev["end"]
                if start >= end:
                    # Entirely covered by the previous span; nothing left.
                    continue
        merged.append({"start": start, "end": end, "label": label})
    return merged
def _apply_spans(text: str, spans: list[dict[str, Any]]) -> RedactionResult:
    """Swap every detected span for its typed placeholder.

    The spans are first cleaned by :func:`_merge_spans`, so they are disjoint;
    the output is assembled in one left-to-right pass by alternating untouched
    slices of ``text`` with placeholders.

    Returns:
        A ``RedactionResult`` holding the masked text, one ``"<label>: <n>"``
        note per PII kind (sorted by label), and the total number of masks.
    """
    ordered = sorted(_merge_spans(text, spans), key=lambda item: item["start"])

    tally: Counter[str] = Counter()
    pieces: list[str] = []
    cursor = 0
    for hit in ordered:
        mask, kind = PII_TYPES[hit["label"]]
        pieces.append(text[cursor : hit["start"]])  # text kept as-is
        pieces.append(mask)
        cursor = hit["end"]
        tally[kind] += 1
    pieces.append(text[cursor:])  # tail after the last span

    notes = [f"{label}: {count}" for label, count in sorted(tally.items())]
    return RedactionResult(text="".join(pieces), notes=notes, count=sum(tally.values()))
def _local_detect(texts: list[str], device: str | None = None) -> list[list[dict[str, Any]]]:
    """Run ``openai/privacy-filter`` over each text and collect confident spans.

    The pipeline comes from :func:`_load_pipeline`, which imports
    ``transformers`` only when called, so the heavy dependency is needed only
    where this detector actually runs.

    Args:
        texts: Texts to scan. Whitespace-only entries are not sent to the model.
        device: Forwarded to :func:`_load_pipeline`.

    Returns:
        For each input text, a list of ``{"start", "end", "label"}`` dicts for
        entities whose group is in ``PII_TYPES``, that carry both offsets, and
        whose score reaches ``PRIVACY_MIN_SCORE``.
    """
    pipe = _load_pipeline(device=device)
    t0 = time.perf_counter()

    per_text: list[list[dict[str, Any]]] = []
    total = 0
    for message in texts:
        found: list[dict[str, Any]] = []
        if message.strip():
            for ent in pipe(message):
                group = ent.get("entity_group")
                if group not in PII_TYPES:
                    continue
                begin, stop = ent.get("start"), ent.get("end")
                if begin is None or stop is None:
                    continue
                # A missing score counts as fully confident (1.0).
                if not float(ent.get("score", 1.0)) >= PRIVACY_MIN_SCORE:
                    continue
                found.append({"start": int(begin), "end": int(stop), "label": group})
        per_text.append(found)
        total += len(found)

    logger.debug(
        "privacy-filter scanned %d messages, %d raw spans in %.2fs",
        len(texts),
        total,
        time.perf_counter() - t0,
    )
    return per_text
def _load_pipeline(device: str | None = None) -> Any:
    """Return the token-classification pipeline for a device, building it once.

    The requested device goes through ``resolve_device``; the result is the key
    into ``_PIPELINE_CACHE``. On a cache miss the pipeline is created, its load
    time is logged, and it is stored for later calls.

    Args:
        device: Requested device, or ``None`` to let ``resolve_device`` decide.

    Returns:
        The cached or newly built ``transformers`` pipeline object.
    """
    target = resolve_device(device)
    if (ready := _PIPELINE_CACHE.get(target)) is not None:
        return ready

    # Deferred import: only pay for transformers when a pipeline is built.
    from transformers import pipeline

    t0 = time.perf_counter()
    built = pipeline(
        "token-classification",
        model=PRIVACY_MODEL_ID,
        aggregation_strategy="simple",
        # transformers pipeline device: 0 for cuda, "mps"/"cpu" otherwise.
        device=0 if target == "cuda" else target,
    )
    elapsed = time.perf_counter() - t0
    logger.info("loaded %s on %s in %.1fs", PRIVACY_MODEL_ID, target, elapsed)

    _PIPELINE_CACHE[target] = built
    return built