Spaces:
Running on Zero
Running on Zero
| """Optional model-based PII redaction using ``openai/privacy-filter``. | |
| The deterministic pipeline always runs regex redaction (:mod:`redaction`). On the | |
| Hugging Face Space GPU this module adds a second pass: a token-classification | |
| model (``openai/privacy-filter``) flags personal or sensitive spans that regex | |
| patterns miss — names, phone numbers, postal addresses, and the like — and masks | |
| them with typed placeholders. | |
| Heavy imports (``torch``/``transformers``) load lazily so the deterministic | |
| analyzer, the test suite, and local development keep working without GPU | |
| dependencies. If the model cannot be loaded, the caller falls back to regex-only | |
| redaction and records the reason in the privacy notes. | |
| """ | |
| from __future__ import annotations | |
| import functools | |
| import time | |
| from collections import Counter | |
| from typing import Any, Callable | |
| from model_runtime import resolve_device | |
| from profiling import get_logger | |
| from redaction import RedactionResult | |
| logger = get_logger() | |
| PRIVACY_MODEL_ID = "openai/privacy-filter" | |
| # Only mask spans the model is reasonably confident about. | |
| PRIVACY_MIN_SCORE = 0.5 | |
| # Model entity group -> (placeholder written into the text, human label for notes). | |
| PII_TYPES: dict[str, tuple[str, str]] = { | |
| "private_person": ("[REDACTED_NAME]", "personal name"), | |
| "private_email": ("[REDACTED_EMAIL]", "email address"), | |
| "private_phone": ("[REDACTED_PHONE]", "phone number"), | |
| "private_address": ("[REDACTED_ADDRESS]", "postal address"), | |
| "private_url": ("[REDACTED_URL]", "personal URL"), | |
| "private_date": ("[REDACTED_DATE]", "personal date"), | |
| "account_number": ("[REDACTED_ACCOUNT]", "account number"), | |
| "secret": ("[REDACTED_SECRET]", "secret"), | |
| } | |
| # (texts) -> per-text list of {"start", "end", "label"} spans. | |
| DetectFn = Callable[[list[str]], list[list[dict[str, Any]]]] | |
| _PIPELINE_CACHE: dict[str, Any] = {} | |
| def redact_texts( | |
| texts: list[str], | |
| *, | |
| detect: DetectFn | None = None, | |
| device: str | None = None, | |
| ) -> list[RedactionResult]: | |
| """Detect and mask PII in each text, returning one result per input. | |
| ``detect`` defaults to :func:`_local_detect` (the lazy model); tests inject a | |
| stand-in so the masking logic runs without ``torch``. ``device`` forces the | |
| compute device for the default detector (``cuda`` / ``mps`` / ``cpu``). | |
| """ | |
| detector = detect or functools.partial(_local_detect, device=device) | |
| spans_per_text = detector(texts) | |
| return [_apply_spans(text, spans) for text, spans in zip(texts, spans_per_text)] | |
| def _merge_spans(text: str, spans: list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| """Drop malformed spans and merge same-label runs into clean, disjoint spans. | |
| ``openai/privacy-filter`` uses BIOES tags, which the pipeline's IOB-oriented | |
| "simple" aggregation can split into adjacent fragments of one entity (and a | |
| leading separator can leave a one-character gap). Merging same-label spans | |
| that overlap or sit within one character keeps each entity to a single | |
| placeholder; a remaining different-label overlap is clipped to stay disjoint. | |
| """ | |
| valid = [ | |
| span | |
| for span in spans | |
| if span.get("label") in PII_TYPES | |
| and 0 <= int(span["start"]) < int(span["end"]) <= len(text) | |
| ] | |
| valid.sort(key=lambda span: (int(span["start"]), int(span["end"]))) | |
| merged: list[dict[str, Any]] = [] | |
| for span in valid: | |
| start, end, label = int(span["start"]), int(span["end"]), span["label"] | |
| if merged: | |
| prev = merged[-1] | |
| if label == prev["label"] and start <= prev["end"] + 1: | |
| prev["end"] = max(prev["end"], end) | |
| continue | |
| if start < prev["end"]: # different-label overlap: keep them disjoint | |
| start = prev["end"] | |
| if start >= end: | |
| continue | |
| merged.append({"start": start, "end": end, "label": label}) | |
| return merged | |
| def _apply_spans(text: str, spans: list[dict[str, Any]]) -> RedactionResult: | |
| """Replace detected spans with typed placeholders, right-to-left.""" | |
| counts: Counter[str] = Counter() | |
| redacted = text | |
| for span in sorted(_merge_spans(text, spans), key=lambda span: span["start"], reverse=True): | |
| placeholder, label = PII_TYPES[span["label"]] | |
| redacted = redacted[: span["start"]] + placeholder + redacted[span["end"] :] | |
| counts[label] += 1 | |
| notes = [f"{label}: {count}" for label, count in sorted(counts.items())] | |
| return RedactionResult(text=redacted, notes=notes, count=sum(counts.values())) | |
| def _local_detect(texts: list[str], device: str | None = None) -> list[list[dict[str, Any]]]: | |
| """Run ``openai/privacy-filter`` and return confident PII spans per text. | |
| Imported lazily: ``transformers``/``torch`` only need to exist where the | |
| model actually runs, never for the deterministic path, tests, or light local | |
| development. | |
| """ | |
| pipe = _load_pipeline(device=device) | |
| started = time.perf_counter() | |
| results: list[list[dict[str, Any]]] = [] | |
| for text in texts: | |
| if not text.strip(): | |
| results.append([]) | |
| continue | |
| entities = pipe(text) | |
| spans = [ | |
| { | |
| "start": int(entity["start"]), | |
| "end": int(entity["end"]), | |
| "label": entity["entity_group"], | |
| } | |
| for entity in entities | |
| if entity.get("entity_group") in PII_TYPES | |
| and entity.get("start") is not None | |
| and entity.get("end") is not None | |
| and float(entity.get("score", 1.0)) >= PRIVACY_MIN_SCORE | |
| ] | |
| results.append(spans) | |
| detected = sum(len(spans) for spans in results) | |
| logger.debug( | |
| "privacy-filter scanned %d messages, %d raw spans in %.2fs", | |
| len(texts), | |
| detected, | |
| time.perf_counter() - started, | |
| ) | |
| return results | |
| def _load_pipeline(device: str | None = None) -> Any: | |
| """Lazily build and cache the token-classification pipeline per device.""" | |
| resolved = resolve_device(device) | |
| cached = _PIPELINE_CACHE.get(resolved) | |
| if cached is not None: | |
| return cached | |
| from transformers import pipeline | |
| # transformers pipeline device: 0 for cuda, "mps"/"cpu" otherwise. | |
| pipe_device = 0 if resolved == "cuda" else resolved | |
| started = time.perf_counter() | |
| pipe = pipeline( | |
| "token-classification", | |
| model=PRIVACY_MODEL_ID, | |
| aggregation_strategy="simple", | |
| device=pipe_device, | |
| ) | |
| logger.info( | |
| "loaded %s on %s in %.1fs", PRIVACY_MODEL_ID, resolved, time.perf_counter() - started | |
| ) | |
| _PIPELINE_CACHE[resolved] = pipe | |
| return pipe | |