import base64 import os import secrets import time import unicodedata from concurrent.futures import ThreadPoolExecutor from typing import Any, Dict, List, Optional, Tuple import boto3 import botocore import docx import gradio as gr import pandas as pd import polars as pl from botocore.client import BaseClient from docx.oxml.ns import qn from docx.table import Table from docx.text.paragraph import Paragraph from faker import Faker from gradio import Progress from openpyxl import Workbook from presidio_analyzer import ( AnalyzerEngine, BatchAnalyzerEngine, DictAnalyzerResult, RecognizerResult, ) from presidio_anonymizer import AnonymizerEngine, BatchAnonymizerEngine from presidio_anonymizer.entities import OperatorConfig from tools.config import ( AWS_ACCESS_KEY, AWS_LLM_PII_OPTION, AWS_REGION, AWS_SECRET_KEY, CLOUD_LLM_PII_CUSTOM_INSTRUCTIONS_MODEL_CHOICE, CLOUD_LLM_PII_MODEL_CHOICE, # Legacy alias for CLOUD_LLM_PII_MODEL_CHOICE CUSTOM_ENTITIES, DEFAULT_LANGUAGE, DO_INITIAL_TABULAR_DATA_CLEAN, FULL_COMPREHEND_ENTITY_LIST, INFERENCE_SERVER_PII_OPTION, LLM_MAX_NEW_TOKENS, LLM_TEMPERATURE, LOCAL_TRANSFORMERS_LLM_PII_MODEL_CHOICE, LOCAL_TRANSFORMERS_LLM_PII_OPTION, MAX_SIMULTANEOUS_FILES, MAX_TABLE_COLUMNS, MAX_TABLE_ROWS, MAX_WORKERS, OUTPUT_FOLDER, PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS, RUN_AWS_FUNCTIONS, aws_comprehend_language_choices, ) from tools.helper_functions import ( detect_file_type, get_file_name_without_type, read_file, ) from tools.llm_entity_detection import call_llm_for_entity_detection from tools.load_spacy_model_custom_recognisers import ( CustomWordFuzzyRecognizer, create_nlp_analyser, custom_word_list_recogniser, load_spacy_model, nlp_analyser, score_threshold, ) # Use custom version of analyze_dict to be able to track progress from tools.presidio_analyzer_custom import analyze_dict, analyze_iterator_custom from tools.secure_path_utils import secure_join # AWS Comprehend billing: 1 unit = 100 characters (entity recognition, PII, etc.) COMPREHEND_CHARACTERS_PER_UNIT = 100 # Max concurrent API calls for Bedrock/LLM (avoid rate limits; Comprehend uses MAX_WORKERS) LLM_PII_MAX_CONCURRENT_REQUESTS = min(MAX_WORKERS, 10) _COMPREHEND_CONNECTIVITY_PROBE_TEXT = "connectivity check" def _is_non_retryable_aws_error(exc: Exception) -> bool: """Return True for credential/auth failures that should not be retried.""" if isinstance( exc, ( botocore.exceptions.NoCredentialsError, botocore.exceptions.TokenRetrievalError, ), ): return True if isinstance(exc, botocore.exceptions.ClientError): code = exc.response.get("Error", {}).get("Code", "") return code in ( "UnrecognizedClientException", "InvalidSignatureException", "AccessDeniedException", "ExpiredTokenException", "InvalidClientTokenId", ) err = str(exc).lower() return "token has expired" in err or ( "sso" in err and "token" in err and "retriev" in err ) def _comprehend_connectivity_error_message(exc: Exception) -> str: """User-facing message when Comprehend credentials or connectivity fail.""" if isinstance(exc, botocore.exceptions.NoCredentialsError): return ( "Cannot connect to AWS Comprehend service. Please provide access keys " "under Textract settings on the Redaction settings tab, or choose another " "PII identification method." ) if isinstance(exc, botocore.exceptions.TokenRetrievalError) or ( "token has expired" in str(exc).lower() ): return ( "Cannot connect to AWS Comprehend service. AWS SSO token has expired — " "please run `aws sso login` or provide access keys under Textract settings, " "or choose another PII identification method." ) if isinstance(exc, botocore.exceptions.ClientError): code = exc.response.get("Error", {}).get("Code", "") if code in ( "UnrecognizedClientException", "InvalidSignatureException", "AccessDeniedException", "ExpiredTokenException", "InvalidClientTokenId", ): return ( "Cannot connect to AWS Comprehend service. Please check your AWS " "credentials (SSO login or access keys under Textract settings), " "or choose another PII identification method." ) return ( f"Cannot connect to AWS Comprehend service: {exc}. Please check your AWS " "credentials or choose another PII identification method." ) def verify_comprehend_connectivity( comprehend_client: BaseClient, language: str ) -> None: """Fail fast if Comprehend credentials are missing, expired, or otherwise invalid.""" try: comprehend_client.detect_pii_entities( Text=_COMPREHEND_CONNECTIVITY_PROBE_TEXT, LanguageCode=language, ) except Exception as exc: raise Exception(_comprehend_connectivity_error_message(exc)) from exc def _comprehend_one_cell( comprehend_client: BaseClient, text_str: str, language: str, chosen_redact_comprehend_entities: List[str], in_allow_list_flat: List[str], max_retries: int = 3, retry_delay: int = 3, ) -> Tuple[List[RecognizerResult], int]: """Call AWS Comprehend for one text cell. Returns (recognizer_results, query_units).""" aws_entity_types = [ t for t in (chosen_redact_comprehend_entities or []) if t in (FULL_COMPREHEND_ENTITY_LIST or []) and t not in ("CUSTOM", "CUSTOM_FUZZY") ] if not text_str.strip() or not aws_entity_types: return ([], 0) query_units = ( len(text_str.strip()) + COMPREHEND_CHARACTERS_PER_UNIT - 1 ) // COMPREHEND_CHARACTERS_PER_UNIT for attempt in range(max_retries): try: response = comprehend_client.detect_pii_entities( Text=text_str, LanguageCode=language ) results = [] for entity in response["Entities"]: if entity.get("Type") not in aws_entity_types: continue entity_text = text_str[entity["BeginOffset"] : entity["EndOffset"]] if in_allow_list_flat: allow_list_normalized = [ item.strip().lower() for item in in_allow_list_flat if item ] if entity_text.strip().lower() in allow_list_normalized: continue results.append( RecognizerResult( entity_type=entity["Type"], start=entity["BeginOffset"], end=entity["EndOffset"], score=entity["Score"], ) ) return (results, query_units) except Exception as exc: if _is_non_retryable_aws_error(exc) or attempt == max_retries - 1: raise time.sleep(retry_delay) return ([], query_units) custom_entities = CUSTOM_ENTITIES fake = Faker("en_UK") def fake_first_name(x): return fake.first_name() # #### Some of my cleaning functions url_pattern = r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+|(?:www\.)[a-zA-Z0-9._-]+\.[a-zA-Z]{2,}" html_pattern_regex = r"<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});|\xa0| " html_start_pattern_end_dots_regex = r"<(.*?)\.\." non_ascii_pattern = r"[^\x00-\x7F]+" and_sign_regex = r"&" multiple_spaces_regex = r"\s{2,}" multiple_new_lines_regex = r"(\r\n|\n)+" multiple_punctuation_regex = r"(\p{P})\p{P}+" def initial_clean(texts: pd.Series) -> pd.Series: """ This function cleans the text by removing URLs, HTML tags, and non-ASCII characters. """ for text in texts: if not text or pd.isnull(text): text = "" # Normalize unicode characters to decompose any special forms normalized_text = unicodedata.normalize("NFKC", text) # Replace smart quotes and special punctuation with standard ASCII equivalents replacements = { "‘": "'", "’": "'", "“": '"', "”": '"', "–": "-", "—": "-", "…": "...", "•": "*", } # Perform replacements for old_char, new_char in replacements.items(): normalised_text = normalized_text.replace(old_char, new_char) text = normalised_text # Convert to polars Series texts = pl.Series(texts).str.strip_chars() # Define a list of patterns and their replacements patterns = [ (multiple_new_lines_regex, " "), (r"\r", ""), (url_pattern, " "), (html_pattern_regex, " "), (html_start_pattern_end_dots_regex, " "), (non_ascii_pattern, " "), (multiple_spaces_regex, " "), (multiple_punctuation_regex, "${1}"), (and_sign_regex, "and"), ] # Apply each regex replacement for pattern, replacement in patterns: texts = texts.str.replace_all(pattern, replacement) # Convert the series back to a list texts = texts.to_list() return texts def process_recognizer_result( result: RecognizerResult, recognizer_result: RecognizerResult, data_row: int, dictionary_key: int, df_dict: Dict[str, List[Any]], keys_to_keep: List[str], ) -> Tuple[List[str], List[Dict[str, Any]]]: output = list() output_dicts = list() if hasattr(result, "value"): text = result.value[data_row] else: text = "" if isinstance(recognizer_result, list): for sub_result in recognizer_result: if isinstance(text, str): found_text = text[sub_result.start : sub_result.end] else: found_text = "" analysis_explanation = { key: sub_result.__dict__[key] for key in keys_to_keep } analysis_explanation.update( { "data_row": str(data_row), "column": list(df_dict.keys())[dictionary_key], "entity": found_text, } ) output.append(str(analysis_explanation)) output_dicts.append(analysis_explanation) return output, output_dicts # Writing decision making process to file def generate_log( analyzer_results: List[DictAnalyzerResult], df_dict: Dict[str, List[Any]] ) -> Tuple[str, pd.DataFrame]: """ Generate a detailed output of the decision process for entity recognition. This function takes the results from the analyzer and the original data dictionary, and produces a string output detailing the decision process for each recognized entity. It includes information such as entity type, position, confidence score, and the context in which the entity was found. Args: analyzer_results (List[DictAnalyzerResult]): The results from the entity analyzer. df_dict (Dict[str, List[Any]]): The original data in dictionary format. Returns: Tuple[str, pd.DataFrame]: A tuple containing the string output and DataFrame with all columns. """ decision_process_output = list() decision_process_output_dicts = list() # New list to store dictionaries keys_to_keep = ["entity_type", "start", "end"] # Run through each column to analyse for PII for i, result in enumerate(analyzer_results): # If a single result if isinstance(result, RecognizerResult): output, output_dicts = process_recognizer_result( result, result, 0, i, df_dict, keys_to_keep ) decision_process_output.extend(output) decision_process_output_dicts.extend(output_dicts) # If a list of results elif isinstance(result, list) or isinstance(result, DictAnalyzerResult): for x, recognizer_result in enumerate(result.recognizer_results): output, output_dicts = process_recognizer_result( result, recognizer_result, x, i, df_dict, keys_to_keep ) decision_process_output.extend(output) decision_process_output_dicts.extend(output_dicts) else: try: output, output_dicts = process_recognizer_result( result, result, 0, i, df_dict, keys_to_keep ) decision_process_output.extend(output) decision_process_output_dicts.extend(output_dicts) except Exception as e: print(e) decision_process_output_str = "\n".join(decision_process_output) decision_process_output_df = pd.DataFrame(decision_process_output_dicts) return decision_process_output_str, decision_process_output_df def anon_consistent_names(df: pd.DataFrame) -> pd.DataFrame: # ## Pick out common names and replace them with the same person value df_dict = df.to_dict(orient="list") # analyzer = AnalyzerEngine() batch_analyzer = BatchAnalyzerEngine(analyzer_engine=nlp_analyser) analyzer_results = batch_analyzer.analyze_dict(df_dict, language=DEFAULT_LANGUAGE) analyzer_results = list(analyzer_results) text = analyzer_results[3].value recognizer_result = str(analyzer_results[3].recognizer_results) data_str = recognizer_result # abbreviated for brevity # Adjusting the parse_dict function to handle trailing ']' # Splitting the main data string into individual list strings list_strs = data_str[1:-1].split("], [") def parse_dict(s): s = s.strip("[]") # Removing any surrounding brackets items = s.split(", ") d = {} for item in items: key, value = item.split(": ") if key == "score": d[key] = float(value) elif key in ["start", "end"]: d[key] = int(value) else: d[key] = value return d # Re-running the improved processing code result = list() for lst_str in list_strs: # Splitting each list string into individual dictionary strings dict_strs = lst_str.split(", type: ") dict_strs = [dict_strs[0]] + [ "type: " + s for s in dict_strs[1:] ] # Prepending "type: " back to the split strings # Parsing each dictionary string dicts = [parse_dict(d) for d in dict_strs] result.append(dicts) names = list() for idx, paragraph in enumerate(text): paragraph_texts = list() for dictionary in result[idx]: if dictionary["type"] == "PERSON": paragraph_texts.append( paragraph[dictionary["start"] : dictionary["end"]] ) names.append(paragraph_texts) # Flatten the list of lists and extract unique names unique_names = list(set(name for sublist in names for name in sublist)) fake_names = pd.Series(unique_names).apply(fake_first_name) mapping_df = pd.DataFrame( data={"Unique names": unique_names, "Fake names": fake_names} ) # Convert mapping dataframe to dictionary, adding word boundaries for full-word match name_map = { r"\b" + k + r"\b": v for k, v in zip(mapping_df["Unique names"], mapping_df["Fake names"]) } name_map scrubbed_df_consistent_names = df.replace(name_map, regex=True) scrubbed_df_consistent_names return scrubbed_df_consistent_names REDACTION_EXAMPLE_PLACEHOLDER = ( "_Run redaction to see an example of the redacted output._" ) def _first_non_empty_cell_text( df: pd.DataFrame, cols: Optional[List[str]] = None ) -> str: """Return text from the first non-empty cell (row-major within chosen columns).""" if df is None or df.empty: return "" check_cols = [c for c in (cols if cols else list(df.columns)) if c in df.columns] if not check_cols: return "" for _, row in df[check_cols].iterrows(): for col in check_cols: val = row[col] if pd.notna(val) and str(val).strip(): return str(val).strip() return "" def format_redaction_example_markdown(text: str, source_label: str) -> str: """Format a sample redacted string for display in a Gradio Markdown component.""" if not text or not str(text).strip(): return REDACTION_EXAMPLE_PLACEHOLDER display = str(text).strip() max_len = 3000 if len(display) > max_len: display = display[:max_len] + "…" display = display.replace("```", "'''") return f"### Example redacted output\n\n_{source_label}_\n\n```\n{display}\n```" def _iter_docx_block_items(document: docx.Document): """Yield paragraphs and tables in document body order.""" for child in document.element.body.iterchildren(): if child.tag == qn("w:p"): yield Paragraph(child, document) elif child.tag == qn("w:tbl"): yield Table(child, document) def _docx_paragraph_page_break_before(paragraph: Paragraph) -> bool: p_pr = paragraph._element.find(qn("w:pPr")) if p_pr is not None and p_pr.find(qn("w:pageBreakBefore")) is not None: return True return False def _docx_paragraph_has_inline_page_break(paragraph: Paragraph) -> bool: for br in paragraph._element.iter(qn("w:br")): if br.get(qn("w:type")) == "page": return True if paragraph._element.find(f".//{qn('w:lastRenderedPageBreak')}") is not None: return True return False def _docx_paragraph_section_starts_new_page(paragraph: Paragraph) -> bool: p_pr = paragraph._element.find(qn("w:pPr")) if p_pr is None: return False sect_pr = p_pr.find(qn("w:sectPr")) if sect_pr is None: return False type_el = sect_pr.find(qn("w:type")) if type_el is None: return True val = type_el.get(qn("w:val")) return val in (None, "nextPage", "oddPage", "evenPage") def _iter_docx_table_unique_cells(table: Table): """Yield each physical table cell once (merged cells repeat in row.cells).""" seen_tc_ids: set[int] = set() for row in table.rows: for cell in row.cells: tc_id = id(cell._tc) if tc_id in seen_tc_ids: continue seen_tc_ids.add(tc_id) yield cell def _extract_docx_text_blocks_with_pages( document: docx.Document, ) -> List[Tuple[Any, str, int]]: """Return (element, text, page_num) tuples in document order.""" blocks: List[Tuple[Any, str, int]] = [] current_page = 1 saw_content = False for block in _iter_docx_block_items(document): if isinstance(block, Paragraph): text = block.text.strip() if not text: continue if saw_content and _docx_paragraph_page_break_before(block): current_page += 1 blocks.append((block, text, current_page)) saw_content = True if _docx_paragraph_has_inline_page_break(block) or ( _docx_paragraph_section_starts_new_page(block) ): current_page += 1 else: for cell in _iter_docx_table_unique_cells(block): text = cell.text.strip() if not text: continue blocks.append((cell, text, current_page)) saw_content = True return blocks def _accumulate_text_blocks_up_to_chars(texts: List[str], max_chars: int) -> List[str]: selected: List[str] = [] total = 0 for text in texts: cleaned = str(text).strip() if not cleaned: continue if selected and total + len(cleaned) > max_chars: break selected.append(cleaned) total += len(cleaned) + 3 return selected def _docx_first_page_redacted_preview( block_pages: List[int], anonymised_texts: List[str] ) -> str: """Join redacted text blocks from page 1 for the UI preview.""" if not anonymised_texts: return "" paired = list(zip(block_pages, anonymised_texts)) first_page_texts = [ str(text).strip() for page, text in paired if page == 1 and str(text).strip() ] if not first_page_texts: first_page_texts = [str(anonymised_texts[0]).strip()] # Without page-break markers every block stays on page 1; cap the preview size. if set(block_pages) == {1} and len(paired) > 15: first_page_texts = _accumulate_text_blocks_up_to_chars( [str(text) for _, text in paired], 2500, ) return "\n\n\n".join(first_page_texts) def handle_docx_anonymisation( file_path: str, output_folder: str, anon_strategy: str, chosen_redact_entities: List[str], in_allow_list: List[str], in_deny_list: List[str], max_fuzzy_spelling_mistakes_num: int, pii_identification_method: str, chosen_redact_comprehend_entities: List[str], comprehend_query_number: int, comprehend_client: BaseClient, language: Optional[str] = DEFAULT_LANGUAGE, out_file_paths: List[str] = list(), nlp_analyser: AnalyzerEngine = nlp_analyser, ): """ Anonymises a .docx file by extracting text, processing it, and re-inserting it. Returns: A tuple containing the output file path and the log file path. """ # 1. Load the document and extract text elements (document order, with page numbers) doc = docx.Document(file_path) blocks_with_pages = _extract_docx_text_blocks_with_pages(doc) paragraph_count = len(doc.paragraphs) if paragraph_count > MAX_TABLE_ROWS: out_message = f"Number of paragraphs in document is greater than {MAX_TABLE_ROWS}. Please submit a smaller document." print(out_message) raise Exception(out_message) if not blocks_with_pages: print(f"No text found in {file_path}. Skipping.") return out_file_paths, comprehend_query_number, 0, 0, "", "" text_elements = [block[0] for block in blocks_with_pages] original_texts = [block[1] for block in blocks_with_pages] block_pages = [block[2] for block in blocks_with_pages] # 2. Convert to a DataFrame for the existing anonymisation script df_to_anonymise = pd.DataFrame({"text_to_redact": original_texts}) # 3. Call the core anonymisation script ( anonymised_df, _, decision_log, comprehend_query_number, decision_process_output_df, llm_total_input_tokens, llm_total_output_tokens, llm_model_name, ) = anonymise_script( df=df_to_anonymise, anon_strategy=anon_strategy, language=language, chosen_redact_entities=chosen_redact_entities, in_allow_list=in_allow_list, in_deny_list=in_deny_list, max_fuzzy_spelling_mistakes_num=max_fuzzy_spelling_mistakes_num, pii_identification_method=pii_identification_method, chosen_redact_comprehend_entities=chosen_redact_comprehend_entities, comprehend_query_number=comprehend_query_number, comprehend_client=comprehend_client, nlp_analyser=nlp_analyser, output_folder=output_folder, ) anonymised_texts = anonymised_df["text_to_redact"].tolist() # 4. Re-insert the anonymised text back into the document objects for element, new_text in zip(text_elements, anonymised_texts): if isinstance(element, docx.text.paragraph.Paragraph): # Clear existing content (runs) and add the new text in a single new run element.clear() element.add_run(new_text) elif isinstance(element, docx.table._Cell): # For cells, setting .text works similarly element.text = new_text # 5. Save the redacted document and the log file base_name = os.path.basename(file_path) file_name_without_ext = os.path.splitext(base_name)[0] output_docx_path = secure_join( output_folder, f"{file_name_without_ext}_redacted.docx" ) out_file_paths.append(output_docx_path) output_xlsx_path = secure_join( output_folder, f"{file_name_without_ext}_redacted.csv" ) anonymised_df.to_csv(output_xlsx_path, encoding="utf-8-sig", index=None) doc.save(output_docx_path) out_file_paths.append(output_xlsx_path) # Reconstruct log_file_path for return value log_file_path = secure_join( output_folder, f"{file_name_without_ext}_redacted_log.csv" ) decision_process_output_df.to_csv(log_file_path, index=None, encoding="utf-8-sig") out_file_paths.append(log_file_path) first_redacted_text = _docx_first_page_redacted_preview( block_pages, anonymised_texts ) return ( out_file_paths, comprehend_query_number, llm_total_input_tokens, llm_total_output_tokens, llm_model_name, first_redacted_text, ) def anonymise_files_with_open_text( file_paths: List[str], in_text: str, anon_strategy: str, chosen_cols: List[str], chosen_redact_entities: List[str], in_allow_list: List[str] = None, latest_file_completed: int = 0, out_message: list = list(), out_file_paths: list = list(), log_files_output_paths: list = list(), in_excel_sheets: list = list(), first_loop_state: bool = False, output_folder: str = OUTPUT_FOLDER, in_deny_list: list[str] = list(), max_fuzzy_spelling_mistakes_num: int = 0, pii_identification_method: str = "Local", chosen_redact_comprehend_entities: List[str] = list(), comprehend_query_number: int = 0, aws_access_key_textbox: str = "", aws_secret_key_textbox: str = "", actual_time_taken_number: float = 0, do_initial_clean: bool = DO_INITIAL_TABULAR_DATA_CLEAN, language: Optional[str] = None, progress: Progress = Progress(track_tqdm=True), custom_llm_instructions: str = "", chosen_llm_entities: Optional[List[str]] = None, ): """ This function anonymises data files based on the provided parameters. Parameters: - file_paths (List[str]): A list of file paths to anonymise: '.xlsx', '.xls', '.csv', '.parquet', or '.docx'. - in_text (str): The text to anonymise if file_paths is 'open_text'. - anon_strategy (str): The anonymisation strategy to use. - chosen_cols (List[str]): A list of column names to anonymise. - language (str): The language of the text to anonymise. - chosen_redact_entities (List[str]): A list of entities to redact. - in_allow_list (List[str], optional): A list of allowed values. Defaults to None. - latest_file_completed (int, optional): The index of the last file completed. Defaults to 0. - out_message (list, optional): A list to store output messages. Defaults to an empty list. - out_file_paths (list, optional): A list to store output file paths. Defaults to an empty list. - log_files_output_paths (list, optional): A list to store log file paths. Defaults to an empty list. - in_excel_sheets (list, optional): A list of Excel sheet names. Defaults to an empty list. - first_loop_state (bool, optional): Indicates if this is the first loop iteration. Defaults to False. - output_folder (str, optional): The output folder path. Defaults to the global output_folder variable. - in_deny_list (list[str], optional): A list of specific terms to redact. - max_fuzzy_spelling_mistakes_num (int, optional): The maximum number of spelling mistakes allowed in a searched phrase for fuzzy matching. Can range from 0-9. - pii_identification_method (str, optional): The method to redact personal information. Either 'Local' (spacy model), or 'AWS Comprehend' (AWS Comprehend API). - chosen_redact_comprehend_entities (List[str]): A list of entity types to redact from files, chosen from the official list from AWS Comprehend service. - comprehend_query_number (int, optional): A counter for AWS Comprehend usage in units of 100 characters (1 unit = 100 characters, per AWS billing). Defaults to 0. - aws_access_key_textbox (str, optional): AWS access key for account with Textract and Comprehend permissions. - aws_secret_key_textbox (str, optional): AWS secret key for account with Textract and Comprehend permissions. - actual_time_taken_number (float, optional): Time taken to do the redaction. - language (str, optional): The language of the text to anonymise. - progress (Progress, optional): A Progress object to track progress. Defaults to a Progress object with track_tqdm=True. - do_initial_clean (bool, optional): Whether to perform an initial cleaning of the text. Defaults to True. - custom_llm_instructions (str, optional): Custom instructions for LLM entity detection (tabular). Defaults to "". - chosen_llm_entities (List[str], optional): Entity types to detect when using LLM PII method (tabular). Defaults to None (uses chosen_redact_comprehend_entities). """ tic = time.perf_counter() comprehend_client = "" out_message_out = "" llm_total_input_tokens = 0 llm_total_output_tokens = 0 llm_model_name = "" redaction_example_markdown = REDACTION_EXAMPLE_PLACEHOLDER # Normalise LLM params (Gradio may send None or single value) if custom_llm_instructions is None: custom_llm_instructions = "" if chosen_llm_entities is not None and not isinstance(chosen_llm_entities, list): chosen_llm_entities = [chosen_llm_entities] if chosen_llm_entities else None # If output folder doesn't end with a forward slash, add one if not output_folder.endswith("/"): output_folder = output_folder + "/" # Use provided language or default language = language or DEFAULT_LANGUAGE if pii_identification_method == "AWS Comprehend": if language not in aws_comprehend_language_choices: out_message = f"Please note that this language is not supported by AWS Comprehend: {language}" raise Warning(out_message) # If this is the first time around, set variables to 0/blank if first_loop_state is True: latest_file_completed = 0 out_message = list() out_file_paths = list() redaction_example_markdown = REDACTION_EXAMPLE_PLACEHOLDER # Load file # If out message or out_file_paths are blank, change to a list so it can be appended to if isinstance(out_message, str): out_message = [out_message] if isinstance(log_files_output_paths, str): log_files_output_paths = list() if not out_file_paths: out_file_paths = list() # Handle both list (new Dropdown format) and DataFrame (legacy) if isinstance(in_allow_list, list): # Dropdown component returns a list directly in_allow_list_flat = ( [str(item) for item in in_allow_list if item] if in_allow_list else list() ) elif isinstance(in_allow_list, pd.DataFrame): if not in_allow_list.empty: in_allow_list_flat = list(in_allow_list.iloc[:, 0].unique()) else: in_allow_list_flat = list() else: in_allow_list_flat = list() anon_df = pd.DataFrame() # Try to connect to AWS services directly only if RUN_AWS_FUNCTIONS environmental variable is 1, otherwise an environment variable or direct textbox input is needed. if pii_identification_method == "AWS Comprehend": print("Trying to connect to AWS Comprehend service") if RUN_AWS_FUNCTIONS and PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS: print("Connecting to Comprehend via existing SSO connection") comprehend_client = boto3.client("comprehend", region_name=AWS_REGION) elif aws_access_key_textbox and aws_secret_key_textbox: print( "Connecting to Comprehend using AWS access key and secret keys from textboxes." ) comprehend_client = boto3.client( "comprehend", aws_access_key_id=aws_access_key_textbox, aws_secret_access_key=aws_secret_key_textbox, ) elif RUN_AWS_FUNCTIONS: print("Connecting to Comprehend via existing SSO connection") comprehend_client = boto3.client("comprehend") elif AWS_ACCESS_KEY and AWS_SECRET_KEY: print("Getting Comprehend credentials from environment variables") comprehend_client = boto3.client( "comprehend", aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, ) else: comprehend_client = "" out_message = "Cannot connect to AWS Comprehend service. Please provide access keys under Textract settings on the Redaction settings tab, or choose another PII identification method." raise (out_message) verify_comprehend_connectivity(comprehend_client, language) # Create Bedrock runtime client when using LLM-based PII detection with AWS Bedrock bedrock_runtime = None if pii_identification_method == AWS_LLM_PII_OPTION: if RUN_AWS_FUNCTIONS and PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS: print("Connecting to Bedrock via existing SSO connection") bedrock_runtime = boto3.client("bedrock-runtime", region_name=AWS_REGION) elif aws_access_key_textbox and aws_secret_key_textbox: print( "Connecting to Bedrock using AWS access key and secret keys from user input." ) bedrock_runtime = boto3.client( "bedrock-runtime", aws_access_key_id=aws_access_key_textbox, aws_secret_access_key=aws_secret_key_textbox, region_name=AWS_REGION, ) elif RUN_AWS_FUNCTIONS: print("Connecting to Bedrock via existing SSO connection") bedrock_runtime = boto3.client("bedrock-runtime", region_name=AWS_REGION) elif AWS_ACCESS_KEY and AWS_SECRET_KEY: print("Getting Bedrock credentials from environment variables") bedrock_runtime = boto3.client( "bedrock-runtime", aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, region_name=AWS_REGION, ) else: out_message = "Cannot connect to AWS Bedrock service. Please provide access keys under Textract settings on the Redaction settings tab, or choose another PII identification method." print(out_message) raise Exception(out_message) # Check if files and text exist if not file_paths: if in_text: file_paths = ["open_text"] else: out_message = "Please enter text or a file to redact." raise Exception(out_message) if not isinstance(file_paths, list): file_paths = [file_paths] def _maybe_set_redaction_example(first_redacted_text: str, source_label: str): nonlocal redaction_example_markdown if ( redaction_example_markdown == REDACTION_EXAMPLE_PLACEHOLDER and first_redacted_text ): redaction_example_markdown = format_redaction_example_markdown( first_redacted_text, source_label ) if len(file_paths) > MAX_SIMULTANEOUS_FILES: out_message = f"Number of files to anonymise is greater than {MAX_SIMULTANEOUS_FILES}. Please submit a smaller number of files." print(out_message) raise Exception(out_message) # If we have already redacted the last file, return the input out_message and file list to the relevant components if latest_file_completed >= len(file_paths): print("Last file reached") # , returning files:", str(latest_file_completed)) # Set to a very high number so as not to mess with subsequent file processing by the user # latest_file_completed = 99 final_out_message = "\n".join(out_message) gr.Info(final_out_message) return ( final_out_message, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, actual_time_taken_number, comprehend_query_number, llm_total_input_tokens, llm_total_output_tokens, llm_model_name, redaction_example_markdown, ) file_path_loop = [file_paths[int(latest_file_completed)]] for anon_file in progress.tqdm( file_path_loop, desc="Anonymising files", unit="files" ): # Get a string file path if isinstance(anon_file, str): file_path = anon_file else: file_path = anon_file if anon_file == "open_text": anon_df = pd.DataFrame(data={"text": [in_text]}) chosen_cols = ["text"] out_file_part = anon_file sheet_name = "" file_type = "" ( out_file_paths, out_message, key_string, log_files_output_paths, comprehend_query_number, tbl_llm_in, tbl_llm_out, tbl_llm_model, first_redacted_text, ) = tabular_anonymise_wrapper_func( file_path, anon_df, chosen_cols, out_file_paths, out_file_part, out_message, sheet_name, anon_strategy, language, chosen_redact_entities, in_allow_list, file_type, "", log_files_output_paths, in_deny_list, max_fuzzy_spelling_mistakes_num, pii_identification_method, language, chosen_redact_comprehend_entities, comprehend_query_number, comprehend_client, output_folder=output_folder, do_initial_clean=do_initial_clean, bedrock_runtime=bedrock_runtime, custom_llm_instructions=custom_llm_instructions, chosen_llm_entities=chosen_llm_entities, ) llm_total_input_tokens += tbl_llm_in llm_total_output_tokens += tbl_llm_out if tbl_llm_model and not llm_model_name: llm_model_name = tbl_llm_model _maybe_set_redaction_example(first_redacted_text, "open text input") else: # If file is an xlsx, we are going to run through all the Excel sheets to anonymise them separately. file_type = detect_file_type(file_path) # print("File type is:", file_type) out_file_part = get_file_name_without_type(file_path) if file_type == "docx": ( out_file_paths, comprehend_query_number, docx_llm_in, docx_llm_out, docx_llm_model, first_redacted_text, ) = handle_docx_anonymisation( file_path=file_path, output_folder=output_folder, anon_strategy=anon_strategy, chosen_redact_entities=chosen_redact_entities, in_allow_list=in_allow_list_flat, in_deny_list=in_deny_list, max_fuzzy_spelling_mistakes_num=max_fuzzy_spelling_mistakes_num, pii_identification_method=pii_identification_method, chosen_redact_comprehend_entities=chosen_redact_comprehend_entities, comprehend_query_number=comprehend_query_number, comprehend_client=comprehend_client, language=language, out_file_paths=out_file_paths, ) llm_total_input_tokens += docx_llm_in llm_total_output_tokens += docx_llm_out if docx_llm_model and not llm_model_name: llm_model_name = docx_llm_model _maybe_set_redaction_example( first_redacted_text, "first page of Word document" ) elif file_type == "xlsx": # print("Running through all xlsx sheets") if not in_excel_sheets: out_message.append( "No Excel sheets selected. Please select at least one to anonymise." ) continue # Create xlsx file: anon_xlsx = pd.ExcelFile(file_path) anon_xlsx_export_file_name = ( output_folder + out_file_part + "_redacted.xlsx" ) # Iterate through the sheet names for sheet_name in progress.tqdm( in_excel_sheets, desc="Anonymising sheets", unit="sheets" ): # Read each sheet into a DataFrame if sheet_name not in anon_xlsx.sheet_names: continue anon_df = pd.read_excel(file_path, sheet_name=sheet_name) ( out_file_paths, out_message, key_string, log_files_output_paths, comprehend_query_number, tbl_llm_in, tbl_llm_out, tbl_llm_model, first_redacted_text, ) = tabular_anonymise_wrapper_func( anon_file, anon_df, chosen_cols, out_file_paths, out_file_part, out_message, sheet_name, anon_strategy, language, chosen_redact_entities, in_allow_list, file_type, anon_xlsx_export_file_name, log_files_output_paths, in_deny_list, max_fuzzy_spelling_mistakes_num, pii_identification_method, language, chosen_redact_comprehend_entities, comprehend_query_number, comprehend_client, output_folder=output_folder, do_initial_clean=do_initial_clean, bedrock_runtime=bedrock_runtime, custom_llm_instructions=custom_llm_instructions, chosen_llm_entities=chosen_llm_entities, ) llm_total_input_tokens += tbl_llm_in llm_total_output_tokens += tbl_llm_out if tbl_llm_model and not llm_model_name: llm_model_name = tbl_llm_model sheet_label = ( f"first processed cell (sheet: {sheet_name})" if sheet_name else "first processed cell" ) _maybe_set_redaction_example(first_redacted_text, sheet_label) else: sheet_name = "" anon_df = read_file(file_path) out_file_part = get_file_name_without_type(file_path) ( out_file_paths, out_message, key_string, log_files_output_paths, comprehend_query_number, tbl_llm_in, tbl_llm_out, tbl_llm_model, first_redacted_text, ) = tabular_anonymise_wrapper_func( anon_file, anon_df, chosen_cols, out_file_paths, out_file_part, out_message, sheet_name, anon_strategy, language, chosen_redact_entities, in_allow_list, file_type, "", log_files_output_paths, in_deny_list, max_fuzzy_spelling_mistakes_num, pii_identification_method, language, chosen_redact_comprehend_entities, comprehend_query_number, comprehend_client, output_folder=output_folder, do_initial_clean=do_initial_clean, bedrock_runtime=bedrock_runtime, custom_llm_instructions=custom_llm_instructions, chosen_llm_entities=chosen_llm_entities, ) llm_total_input_tokens += tbl_llm_in llm_total_output_tokens += tbl_llm_out if tbl_llm_model and not llm_model_name: llm_model_name = tbl_llm_model _maybe_set_redaction_example( first_redacted_text, "first processed cell" ) out_message_out = "" # Increase latest file completed count unless we are at the last file if latest_file_completed != len(file_paths): print("Completed file number:", str(latest_file_completed)) latest_file_completed += 1 toc = time.perf_counter() out_time_float = toc - tic out_time = f"in {out_time_float:0.1f} seconds." print(out_time) actual_time_taken_number += out_time_float actual_time_taken_number = round(actual_time_taken_number, 1) if isinstance(out_message, str): out_message = [out_message] out_message.append( "Anonymisation of file '" + out_file_part + "' successfully completed in" ) out_message_out = "\n".join(out_message) out_message_out = out_message_out + " " + out_time if anon_strategy == "encrypt": out_message_out.append(". Your decryption key is " + key_string) from tools.secure_regex_utils import safe_remove_leading_newlines out_message_out = safe_remove_leading_newlines(out_message_out) out_message_out = out_message_out.lstrip(". ") return ( out_message_out, out_file_paths, out_file_paths, latest_file_completed, log_files_output_paths, log_files_output_paths, actual_time_taken_number, comprehend_query_number, llm_total_input_tokens, llm_total_output_tokens, llm_model_name, redaction_example_markdown, ) def tabular_anonymise_wrapper_func( anon_file: str, anon_df: pd.DataFrame, chosen_cols: List[str], out_file_paths: List[str], out_file_part: str, out_message: str, excel_sheet_name: str, anon_strategy: str, language: str, chosen_redact_entities: List[str], in_allow_list: List[str], file_type: str, anon_xlsx_export_file_name: str, log_files_output_paths: List[str], in_deny_list: List[str] = list(), max_fuzzy_spelling_mistakes_num: int = 0, pii_identification_method: str = "Local", comprehend_language: Optional[str] = None, chosen_redact_comprehend_entities: List[str] = list(), comprehend_query_number: int = 0, comprehend_client: botocore.client.BaseClient = "", nlp_analyser: AnalyzerEngine = nlp_analyser, output_folder: str = OUTPUT_FOLDER, do_initial_clean: bool = DO_INITIAL_TABULAR_DATA_CLEAN, bedrock_runtime=None, custom_llm_instructions: str = "", chosen_llm_entities: Optional[List[str]] = None, ): """ This function wraps the anonymisation process for a given dataframe. It filters the dataframe based on chosen columns, applies the specified anonymisation strategy using the anonymise_script function, and exports the anonymised data to a file. Input Variables: - anon_file: The path to the file containing the data to be anonymized. - anon_df: The pandas DataFrame containing the data to be anonymized. - chosen_cols: A list of column names to be anonymized. - out_file_paths: A list of paths where the anonymized files will be saved. - out_file_part: A part of the output file name. - out_message: A message to be displayed during the anonymization process. - excel_sheet_name: The name of the Excel sheet where the anonymized data will be exported. - anon_strategy: The anonymization strategy to be applied. - language: The language of the data to be anonymized. - chosen_redact_entities: A list of entities to be redacted. - in_allow_list: A list of allowed values. - file_type: The type of file to be exported. - anon_xlsx_export_file_name: The name of the anonymized Excel file. - log_files_output_paths: A list of paths where the log files will be saved. - in_deny_list: List of specific terms to remove from the data. - max_fuzzy_spelling_mistakes_num (int, optional): The maximum number of spelling mistakes allowed in a searched phrase for fuzzy matching. Can range from 0-9. - pii_identification_method (str, optional): The method to redact personal information. Either 'Local' (spacy model), or 'AWS Comprehend' (AWS Comprehend API). - chosen_redact_comprehend_entities (List[str]): A list of entity types to redact from files, chosen from the official list from AWS Comprehend service. - comprehend_query_number (int, optional): A counter for AWS Comprehend usage in units of 100 characters (1 unit = 100 characters, per AWS billing). Defaults to 0. - comprehend_client (optional): The client object from AWS containing a client connection to AWS Comprehend if that option is chosen on the first tab. - output_folder: The folder where the anonymized files will be saved. Defaults to the 'output_folder' variable. - do_initial_clean (bool, optional): Whether to perform an initial cleaning of the text. Defaults to True. """ def check_lists(list1, list2): return any(string in list2 for string in list1) def get_common_strings(list1, list2): """ Finds the common strings between two lists. Args: list1: The first list of strings. list2: The second list of strings. Returns: A list containing the common strings. """ common_strings = list() for string in list1: if string in list2: common_strings.append(string) return common_strings if pii_identification_method == "AWS Comprehend" and comprehend_client == "": raise ( "Connection to AWS Comprehend service not found, please check connection details." ) # Check for chosen col, skip file if not found all_cols_original_order = list(anon_df.columns) if not chosen_cols: chosen_cols = list(all_cols_original_order) any_cols_found = check_lists(chosen_cols, all_cols_original_order) if any_cols_found is False: out_message = "No chosen columns found in dataframe: " + out_file_part key_string = "" print(out_message) return ( out_file_paths, out_message, key_string, log_files_output_paths, comprehend_query_number, 0, 0, "", "", ) else: chosen_cols_in_anon_df = get_common_strings( chosen_cols, all_cols_original_order ) # Split dataframe to keep only selected columns # print("Remaining columns to redact:", chosen_cols_in_anon_df) if not anon_df.index.is_unique: anon_df = anon_df.reset_index(drop=True) anon_df_part = anon_df[chosen_cols_in_anon_df] anon_df_remain = anon_df.drop(chosen_cols_in_anon_df, axis=1) row_count = anon_df_part.shape[0] if row_count > MAX_TABLE_ROWS: out_message = f"Number of rows in dataframe is greater than {MAX_TABLE_ROWS}. Please submit a smaller dataframe." print(out_message) raise Exception(out_message) column_count = anon_df_part.shape[1] if column_count > MAX_TABLE_COLUMNS: out_message = f"Number of columns in dataframe is greater than {MAX_TABLE_COLUMNS}. Please submit a smaller dataframe." print(out_message) raise Exception(out_message) # Anonymise the selected columns ( anon_df_part_out, key_string, decision_process_output_str, comprehend_query_number, decision_process_output_df, llm_total_input_tokens, llm_total_output_tokens, llm_model_name, ) = anonymise_script( anon_df_part, anon_strategy, language, chosen_redact_entities, in_allow_list, in_deny_list, max_fuzzy_spelling_mistakes_num, pii_identification_method, chosen_redact_comprehend_entities, comprehend_query_number, comprehend_client, nlp_analyser=nlp_analyser, do_initial_clean=do_initial_clean, bedrock_runtime=bedrock_runtime, file_name=out_file_part, sheet_name=excel_sheet_name if excel_sheet_name else None, output_folder=output_folder, custom_llm_instructions=custom_llm_instructions, chosen_llm_entities=chosen_llm_entities, ) anon_df_part_out.replace("^nan$", "", regex=True, inplace=True) # Rejoin the dataframe together anon_df_out = pd.concat([anon_df_part_out, anon_df_remain], axis=1) # Reorder to match original column order; add any missing columns as empty # (avoids KeyError when e.g. chosen_cols referred to columns from another sheet/file) missing_cols = [c for c in all_cols_original_order if c not in anon_df_out.columns] if missing_cols: for c in missing_cols: anon_df_out[c] = "" anon_df_out = anon_df_out[all_cols_original_order] # Export file # Rename anonymisation strategy for file path naming if anon_strategy == "replace with 'REDACTED'": anon_strat_txt = "redact_replace" elif anon_strategy == "replace with ": anon_strat_txt = "redact_entity_type" elif anon_strategy == "redact completely": anon_strat_txt = "redact_remove" else: anon_strat_txt = anon_strategy # If the file is an xlsx, add a new sheet to the existing xlsx. Otherwise, write to csv if file_type == "xlsx": anon_export_file_name = anon_xlsx_export_file_name if not os.path.exists(anon_xlsx_export_file_name): wb = Workbook() ws = wb.active # Get the default active sheet ws.title = excel_sheet_name wb.save(anon_xlsx_export_file_name) # Create a Pandas Excel writer using XlsxWriter as the engine. with pd.ExcelWriter( anon_xlsx_export_file_name, engine="openpyxl", mode="a", if_sheet_exists="replace", ) as writer: # Write each DataFrame to a different worksheet. anon_df_out.to_excel(writer, sheet_name=excel_sheet_name, index=None) decision_process_log_output_file = ( anon_xlsx_export_file_name + "_" + excel_sheet_name + "_log.csv" ) decision_process_output_df.to_csv( decision_process_log_output_file, index=None, encoding="utf-8-sig" ) else: anon_export_file_name = ( output_folder + out_file_part + "_anon_" + anon_strat_txt + ".csv" ) anon_df_out.to_csv(anon_export_file_name, index=None, encoding="utf-8-sig") decision_process_log_output_file = anon_export_file_name + "_log.csv" decision_process_output_df.to_csv( decision_process_log_output_file, index=None, encoding="utf-8-sig" ) out_file_paths.append(anon_export_file_name) out_file_paths.append(decision_process_log_output_file) # As files are created in a loop, there is a risk of duplicate file names being output. Use set to keep uniques. out_file_paths = list(set(out_file_paths)) # Print result text to output text box if just anonymising open text if anon_file == "open_text": out_message = ["'" + anon_df_out["text"][0] + "'"] if anon_file == "open_text" and "text" in anon_df_out.columns: first_redacted_text = _first_non_empty_cell_text(anon_df_out, ["text"]) else: first_redacted_text = _first_non_empty_cell_text( anon_df_part_out, chosen_cols_in_anon_df ) return ( out_file_paths, out_message, key_string, log_files_output_paths, comprehend_query_number, llm_total_input_tokens, llm_total_output_tokens, llm_model_name, first_redacted_text, ) def anonymise_script( df: pd.DataFrame, anon_strategy: str, language: str, chosen_redact_entities: List[str], in_allow_list: List[str] = list(), in_deny_list: List[str] = list(), max_fuzzy_spelling_mistakes_num: int = 0, pii_identification_method: str = "Local", chosen_redact_comprehend_entities: List[str] = list(), comprehend_query_number: int = 0, comprehend_client: botocore.client.BaseClient = "", custom_entities: List[str] = custom_entities, nlp_analyser: AnalyzerEngine = nlp_analyser, do_initial_clean: bool = DO_INITIAL_TABULAR_DATA_CLEAN, progress: Progress = Progress(track_tqdm=True), bedrock_runtime=None, model_choice: str = CLOUD_LLM_PII_MODEL_CHOICE, custom_llm_instructions: str = "", chosen_llm_entities: List[str] = None, file_name: Optional[str] = None, sheet_name: Optional[str] = None, output_folder: Optional[str] = None, **text_analyzer_kwargs, ): """ Conduct anonymisation of a dataframe using Presidio, AWS Comprehend, or LLM if chosen. Args: df (pd.DataFrame): The input DataFrame containing text to be anonymised. anon_strategy (str): The anonymisation strategy to apply (e.g., "replace with 'REDACTED'", "replace with ", "redact completely"). language (str): The language of the text for analysis (e.g., "en", "es"). chosen_redact_entities (List[str]): A list of entity types to redact using the local (Presidio) method. in_allow_list (List[str], optional): A list of terms to explicitly allow and not redact. Defaults to an empty list. in_deny_list (List[str], optional): A list of terms to explicitly deny and always redact. Defaults to an empty list. max_fuzzy_spelling_mistakes_num (int, optional): The maximum number of fuzzy spelling mistakes to tolerate for custom recognizers. Defaults to 0. pii_identification_method (str, optional): The method for PII identification ("Local", "AWS Comprehend", or "LLM (AWS Bedrock)"). Defaults to "Local". chosen_redact_comprehend_entities (List[str], optional): A list of entity types to redact using AWS Comprehend or LLM. Defaults to an empty list. comprehend_query_number (int, optional): For AWS Comprehend, counter in units of 100 characters (1 unit = 100 characters, per AWS billing). For LLM, incremented per batch. Defaults to 0. comprehend_client (botocore.client.BaseClient, optional): An initialized AWS Comprehend client. Defaults to an empty string. custom_entities (List[str], optional): A list of custom entities to be recognized. Defaults to `custom_entities`. nlp_analyser (AnalyzerEngine, optional): The Presidio AnalyzerEngine instance to use. Defaults to `nlp_analyser`. do_initial_clean (bool, optional): Whether to perform an initial cleaning of the text. Defaults to True. progress (Progress, optional): Gradio Progress object for tracking progress. Defaults to Progress(track_tqdm=False). bedrock_runtime (optional): AWS Bedrock runtime client for LLM-based entity detection. model_choice (str, optional): LLM model choice for entity detection. Defaults to CLOUD_LLM_PII_MODEL_CHOICE. custom_llm_instructions (str, optional): Custom instructions for LLM entity detection. Defaults to empty string. chosen_llm_entities (List[str], optional): List of entity types to detect using LLM. Defaults to None (uses chosen_redact_comprehend_entities). file_name (Optional[str], optional): File name for logging purposes. Defaults to None. output_folder (Optional[str], optional): Folder for LLM prompt/response logs. When None, uses OUTPUT_FOLDER from config. Pass the session output folder (e.g. from output_folder_textbox) so logs go to the same place as other outputs. **text_analyzer_kwargs: Additional keyword arguments for text analyzer (e.g., temperature, max_tokens, inference_method). """ print("Identifying personal information") analyse_tic = time.perf_counter() # LLM token counts (used when pii_identification_method is an LLM option) llm_total_input_tokens = 0 llm_total_output_tokens = 0 llm_model_name = "" # Initialize analyzer_results as an empty dictionary to store results by column results_by_column = dict() key_string = "" # Handle both list (new Dropdown format) and DataFrame (legacy) if isinstance(in_allow_list, list): # Dropdown component returns a list directly in_allow_list_flat = ( [str(item) for item in in_allow_list if item] if in_allow_list else list() ) elif isinstance(in_allow_list, pd.DataFrame): if not in_allow_list.empty: in_allow_list_flat = list(in_allow_list.iloc[:, 0].unique()) else: in_allow_list_flat = list() else: in_allow_list_flat = list() ### Language check - check if selected language packs exist try: if language != "en": progress(0.1, desc=f"Loading spaCy model for {language}") load_spacy_model(language) except Exception as e: out_message = f"Error downloading language packs for {language}: {e}" print(out_message) raise Exception(out_message) # Try updating the supported languages for the spacy analyser try: nlp_analyser = create_nlp_analyser(language, existing_nlp_analyser=nlp_analyser) # Check list of nlp_analyser recognisers and languages if language != "en": gr.Info( f"Language: {language} only supports the following entity detection: {str(nlp_analyser.registry.get_supported_entities(languages=[language]))}" ) except Exception as e: out_message = f"Error creating nlp_analyser for {language}: {e}" print(out_message) raise Exception(out_message) # Handle both list (new Dropdown format) and DataFrame (legacy) if isinstance(in_deny_list, list): # Dropdown component returns a list directly in_deny_list = ( [str(item) for item in in_deny_list if item] if in_deny_list else list() ) # Sort the strings in order from the longest string to the shortest in_deny_list = sorted(in_deny_list, key=len, reverse=True) elif isinstance(in_deny_list, pd.DataFrame): if not in_deny_list.empty: in_deny_list = in_deny_list.iloc[:, 0].tolist() else: # Handle the case where the DataFrame is empty in_deny_list = list() # or some default value # Sort the strings in order from the longest string to the shortest in_deny_list = sorted(in_deny_list, key=len, reverse=True) if in_deny_list: nlp_analyser.registry.remove_recognizer("CUSTOM") new_custom_recogniser = custom_word_list_recogniser(in_deny_list) nlp_analyser.registry.add_recognizer(new_custom_recogniser) nlp_analyser.registry.remove_recognizer("CustomWordFuzzyRecognizer") new_custom_fuzzy_recogniser = CustomWordFuzzyRecognizer( supported_entities=["CUSTOM_FUZZY"], custom_list=in_deny_list, spelling_mistakes_max=in_deny_list, search_whole_phrase=max_fuzzy_spelling_mistakes_num, ) nlp_analyser.registry.add_recognizer(new_custom_fuzzy_recogniser) # analyzer = nlp_analyser #AnalyzerEngine() batch_analyzer = BatchAnalyzerEngine(analyzer_engine=nlp_analyser) anonymizer = ( AnonymizerEngine() ) # conflict_resolution=ConflictResolutionStrategy.MERGE_SIMILAR_OR_CONTAINED) batch_anonymizer = BatchAnonymizerEngine(anonymizer_engine=anonymizer) analyzer_results = list() if do_initial_clean: progress(0.2, desc="Cleaning text") columns = list(df.columns) max_workers = min(MAX_WORKERS, len(columns)) with ThreadPoolExecutor(max_workers=max_workers) as executor: cleaned = list( progress.tqdm( executor.map(lambda col: (col, initial_clean(df[col])), columns), total=len(columns), desc="Cleaning text", unit="Columns", ) ) for col, cleaned_series in cleaned: df[col] = cleaned_series # DataFrame to dict df_dict = df.to_dict(orient="list") if pii_identification_method == "Local": # Run Local (Presidio) analysis in parallel over columns def _analyze_one_column_local(item): column_name, texts = item if not texts or (isinstance(texts, (list, tuple)) and len(texts) == 0): return DictAnalyzerResult( key=column_name, value=texts, recognizer_results=[] ) if not isinstance(texts, (list, tuple)): texts = [texts] try: results = analyze_iterator_custom( batch_analyzer, texts=texts, language=language, list_length=len(texts), context=[column_name], entities=chosen_redact_entities, score_threshold=score_threshold, return_decision_process=True, allow_list=in_allow_list_flat, ) return DictAnalyzerResult( key=column_name, value=texts, recognizer_results=results ) except Exception as e: return (column_name, None, e) local_tasks = list(df_dict.items()) max_workers = min(MAX_WORKERS, len(local_tasks)) if local_tasks else 1 with ThreadPoolExecutor(max_workers=max_workers) as executor: local_results = list( progress.tqdm( executor.map(_analyze_one_column_local, local_tasks), total=len(local_tasks), desc="Analyzing text (Local PII).", unit="columns", ) ) for result in local_results: if isinstance(result, tuple) and len(result) == 3 and result[2] is not None: _, _, err = result raise err results_by_column[result.key] = result # Convert the dictionary of results back to a list analyzer_results = list(results_by_column.values()) # AWS Comprehend calls elif pii_identification_method == "AWS Comprehend" and comprehend_client: # Match CustomImageAnalyzerEngine.analyze_text (AWS path): run Presidio first for # CUSTOM / CUSTOM_FUZZY and other custom-entity types, then merge Comprehend hits # per cell (deny list is enforced via CUSTOM / CUSTOM_FUZZY recognizers). from tools.custom_image_analyser_engine import filter_entities_for_language valid_language_entities = nlp_analyser.registry.get_supported_entities( languages=[language] ) if "CUSTOM" not in valid_language_entities: valid_language_entities.append("CUSTOM") if "CUSTOM_FUZZY" not in valid_language_entities: valid_language_entities.append("CUSTOM_FUZZY") local_custom_entities = [ entity for entity in (chosen_redact_comprehend_entities or []) if entity in (custom_entities or []) or entity in ("CUSTOM", "CUSTOM_FUZZY") ] if in_deny_list: for ent in ("CUSTOM", "CUSTOM_FUZZY"): if ent not in local_custom_entities: local_custom_entities.append(ent) language_supported_entities = [] if local_custom_entities: language_supported_entities = filter_entities_for_language( local_custom_entities, valid_language_entities, language ) if language_supported_entities: custom_results = analyze_dict( batch_analyzer, df_dict, language=language, entities=language_supported_entities, score_threshold=score_threshold, return_decision_process=True, allow_list=in_allow_list_flat, ) for result in custom_results: results_by_column[result.key] = result max_retries = 3 retry_delay = 3 # Build list of (column_name, text_idx, text_str) for all cells comprehend_tasks = [] for column_name, texts in df_dict.items(): if column_name in results_by_column: column_results = results_by_column[column_name] else: column_results = DictAnalyzerResult( recognizer_results=[[] for _ in texts], key=column_name, value=texts ) results_by_column[column_name] = column_results for text_idx, text in enumerate(texts): text_str = str(text) if text else "" comprehend_tasks.append((column_name, text_idx, text_str)) def _run_comprehend_task(item): column_name, text_idx, text_str = item try: recognizer_list, units = _comprehend_one_cell( comprehend_client, text_str, language, chosen_redact_comprehend_entities, in_allow_list_flat, max_retries=max_retries, retry_delay=retry_delay, ) return (column_name, text_idx, recognizer_list, units, None) except Exception as e: return (column_name, text_idx, [], 0, e) max_workers = min(MAX_WORKERS, len(comprehend_tasks)) if comprehend_tasks else 1 with ThreadPoolExecutor(max_workers=max_workers) as executor: completed = list( progress.tqdm( executor.map(_run_comprehend_task, comprehend_tasks), total=len(comprehend_tasks), desc="Querying AWS Comprehend service.", unit="cells", ) ) for column_name, text_idx, recognizer_list, units, err in completed: if err is not None: print( f"AWS Comprehend calls failed for cell ({column_name}, {text_idx}) due to", err, ) raise err comprehend_query_number += units prior = results_by_column[column_name].recognizer_results[text_idx] results_by_column[column_name].recognizer_results[text_idx] = list( prior ) + list(recognizer_list) # Convert the dictionary of results back to a list analyzer_results = list(results_by_column.values()) elif (pii_identification_method == "AWS Comprehend") & (not comprehend_client): raise ("Unable to redact, Comprehend connection details not found.") # LLM-based entity detection elif pii_identification_method == AWS_LLM_PII_OPTION: if not bedrock_runtime and text_analyzer_kwargs.get("inference_method") not in [ "local", "inference-server", "azure-openai", "gemini", ]: raise ValueError( "bedrock_runtime is required when using LLM-based PII detection with AWS Bedrock" ) # Set inference method to aws-bedrock if not already set if text_analyzer_kwargs.get("inference_method") is None: text_analyzer_kwargs["inference_method"] = "aws-bedrock" # Set model choice if not already set if text_analyzer_kwargs.get("model_choice") is None: text_analyzer_kwargs["model_choice"] = ( model_choice or CLOUD_LLM_PII_MODEL_CHOICE ) # Default chosen_llm_entities to chosen_redact_comprehend_entities if not provided if chosen_llm_entities is None: chosen_llm_entities = chosen_redact_comprehend_entities elif pii_identification_method == INFERENCE_SERVER_PII_OPTION: # LLM-based entity detection using inference server from tools.config import ( INFERENCE_SERVER_API_URL, ) # Set inference method to inference-server if not already set if text_analyzer_kwargs.get("inference_method") is None: text_analyzer_kwargs["inference_method"] = "inference-server" # Set API URL if not already set if text_analyzer_kwargs.get("api_url") is None: text_analyzer_kwargs["api_url"] = INFERENCE_SERVER_API_URL # Set model choice if not already set - use INFERENCE_SERVER_LLM_PII_MODEL_CHOICE if text_analyzer_kwargs.get("model_choice") is None: from tools.config import INFERENCE_SERVER_LLM_PII_MODEL_CHOICE text_analyzer_kwargs["model_choice"] = INFERENCE_SERVER_LLM_PII_MODEL_CHOICE # Use the same logic as AWS_LLM_PII_OPTION for the rest # Default chosen_llm_entities to chosen_redact_comprehend_entities if not provided if chosen_llm_entities is None: chosen_llm_entities = chosen_redact_comprehend_entities elif pii_identification_method == LOCAL_TRANSFORMERS_LLM_PII_OPTION: # LLM-based entity detection using local transformers models # Set inference method to local if not already set if text_analyzer_kwargs.get("inference_method") is None: text_analyzer_kwargs["inference_method"] = "local" # Set model choice if not already set - use LOCAL_TRANSFORMERS_LLM_PII_MODEL_CHOICE if text_analyzer_kwargs.get("model_choice") is None: text_analyzer_kwargs["model_choice"] = ( LOCAL_TRANSFORMERS_LLM_PII_MODEL_CHOICE ) # Use the same logic as AWS_LLM_PII_OPTION for the rest # Default chosen_llm_entities to chosen_redact_comprehend_entities if not provided if chosen_llm_entities is None: chosen_llm_entities = chosen_redact_comprehend_entities # Shared LLM column/cell detection for AWS Bedrock, Inference Server, and Local Transformers if pii_identification_method in ( AWS_LLM_PII_OPTION, INFERENCE_SERVER_PII_OPTION, LOCAL_TRANSFORMERS_LLM_PII_OPTION, ): # Handle custom entities first (same as AWS Comprehend) if custom_entities: custom_redact_entities = [ entity for entity in chosen_llm_entities if entity in custom_entities ] if custom_redact_entities: # Get valid language entities valid_language_entities = nlp_analyser.registry.get_supported_entities( languages=[language] ) if "CUSTOM" not in valid_language_entities: valid_language_entities.append("CUSTOM") if "CUSTOM_FUZZY" not in valid_language_entities: valid_language_entities.append("CUSTOM_FUZZY") # Filter entities to only include those supported by the language from tools.custom_image_analyser_engine import ( filter_entities_for_language, ) language_supported_entities = filter_entities_for_language( custom_redact_entities, valid_language_entities, language ) if language_supported_entities: custom_results = analyze_dict( batch_analyzer, df_dict, language=language, entities=language_supported_entities, score_threshold=score_threshold, return_decision_process=True, allow_list=in_allow_list_flat, ) # Initialize results_by_column with custom entity results for result in custom_results: results_by_column[result.key] = result # Remove 'CUSTOM' and 'CUSTOM_VLM_*' entities from the chosen_llm_entities list # CUSTOM_VLM_* entities are handled separately via VLM, not LLM llm_chosen_redact_entities = [ entity for entity in chosen_llm_entities if entity != "CUSTOM" and not entity.startswith("CUSTOM_VLM_") ] # Validate: if no standard entities and no custom instructions, raise error if not llm_chosen_redact_entities and ( not custom_llm_instructions or not custom_llm_instructions.strip() ): raise ValueError( "No standard entities selected for LLM PII detection and no custom instructions provided. " "Please select at least one entity type (excluding CUSTOM_VLM_* entities) or provide custom instructions." ) # If no LLM entities to detect but custom instructions exist, still call LLM with custom instructions only # If no entities and no custom instructions, the validation above will have raised an error # So at this point, we either have entities OR custom instructions (or both) max_retries = 3 retry_delay = 3 # Use model_choice from kwargs when set (e.g. by INFERENCE_SERVER or LOCAL_TRANSFORMERS branches) effective_model_choice = text_analyzer_kwargs.get("model_choice", model_choice) llm_total_input_tokens = 0 llm_total_output_tokens = 0 # Report the model actually used: upgraded to custom-instructions model when applicable custom_instructions_model = ( CLOUD_LLM_PII_CUSTOM_INSTRUCTIONS_MODEL_CHOICE.strip() if isinstance(CLOUD_LLM_PII_CUSTOM_INSTRUCTIONS_MODEL_CHOICE, str) and CLOUD_LLM_PII_CUSTOM_INSTRUCTIONS_MODEL_CHOICE.strip() else "" ) if ( (custom_llm_instructions or "").strip() and effective_model_choice == CLOUD_LLM_PII_MODEL_CHOICE and custom_instructions_model ): llm_model_name = custom_instructions_model else: llm_model_name = effective_model_choice or "" # Build list of (task_idx, column_name, text_idx, text_str) for non-empty cells llm_tasks = [] for column_name, texts in df_dict.items(): if column_name in results_by_column: column_results = results_by_column[column_name] else: column_results = DictAnalyzerResult( recognizer_results=[[] for _ in texts], key=column_name, value=texts, ) results_by_column[column_name] = column_results for text_idx, text in enumerate(texts): text_str = str(text) if text else "" if not text_str.strip(): continue llm_tasks.append((len(llm_tasks), column_name, text_idx, text_str)) def _run_llm_task(item): task_idx, column_name, text_idx, text_str = item for attempt in range(max_retries): try: entities, batch_input_tokens, batch_output_tokens = ( call_llm_for_entity_detection( text=text_str, entities_to_detect=llm_chosen_redact_entities, language=language, bedrock_runtime=bedrock_runtime, model_choice=effective_model_choice, temperature=text_analyzer_kwargs.get( "temperature", LLM_TEMPERATURE ), max_tokens=text_analyzer_kwargs.get( "max_tokens", LLM_MAX_NEW_TOKENS ), output_folder=( output_folder if output_folder is not None else OUTPUT_FOLDER ), batch_number=task_idx + 1, custom_instructions=custom_llm_instructions, file_name=file_name, page_number=None, sheet_name=sheet_name, column_name=column_name, row_number=text_idx + 1, inference_method=text_analyzer_kwargs.get( "inference_method" ), client=text_analyzer_kwargs.get("client"), client_config=text_analyzer_kwargs.get("client_config"), api_url=text_analyzer_kwargs.get("api_url"), ) ) return ( column_name, text_idx, text_str, entities, batch_input_tokens, batch_output_tokens, None, ) except Exception as e: if attempt == max_retries - 1: return (column_name, text_idx, text_str, [], 0, 0, e) time.sleep(retry_delay) return (column_name, text_idx, text_str, [], 0, 0, None) max_llm_workers = ( min(LLM_PII_MAX_CONCURRENT_REQUESTS, len(llm_tasks)) if llm_tasks else 1 ) with ThreadPoolExecutor(max_workers=max_llm_workers) as executor: llm_results = list( progress.tqdm( executor.map(_run_llm_task, llm_tasks), total=len(llm_tasks), desc="Querying LLM service.", unit="cells", ) ) for ( column_name, text_idx, text_str, entities, batch_input_tokens, batch_output_tokens, err, ) in llm_results: if err is not None: print( f"LLM entity detection failed for text: {text_str[:100]}... due to", err, ) raise err llm_total_input_tokens += batch_input_tokens llm_total_output_tokens += batch_output_tokens column_results = results_by_column[column_name] for entity in entities: if not isinstance(entity, dict): continue entity_type = entity.get("Type", "") begin_offset = entity.get("BeginOffset", 0) end_offset = entity.get("EndOffset", 0) entity_text = entity.get("Text", text_str[begin_offset:end_offset]) if in_allow_list_flat: allow_list_normalized = [ item.strip().lower() for item in in_allow_list_flat if item ] if entity_text.strip().lower() in allow_list_normalized: continue if ( llm_chosen_redact_entities and entity_type not in llm_chosen_redact_entities ): if not ( custom_llm_instructions and str(custom_llm_instructions).strip() ): continue recognizer_result = RecognizerResult( entity_type=entity_type, start=begin_offset, end=end_offset, score=entity.get("Score", 0.0), ) column_results.recognizer_results[text_idx].append(recognizer_result) # Convert the dictionary of results back to a list analyzer_results = list(results_by_column.values()) # Usage in the main function: decision_process_output_str, decision_process_output_df = generate_log( analyzer_results, df_dict ) analyse_toc = time.perf_counter() analyse_time_out = ( f"Analysing the text took {analyse_toc - analyse_tic:0.1f} seconds." ) print(analyse_time_out) # Set up the anonymization configuration WITHOUT DATE_TIME simple_replace_config = { "DEFAULT": OperatorConfig("replace", {"new_value": "REDACTED"}) } replace_config = {"DEFAULT": OperatorConfig("replace")} redact_config = {"DEFAULT": OperatorConfig("redact")} hash_config = {"DEFAULT": OperatorConfig("hash")} mask_config = { "DEFAULT": OperatorConfig( "mask", {"masking_char": "*", "chars_to_mask": 100, "from_end": True} ) } people_encrypt_config = { "PERSON": OperatorConfig("encrypt", {"key": key_string}) } # The encryption is using AES cypher in CBC mode and requires a cryptographic key as an input for both the encryption and the decryption. fake_first_name_config = { "PERSON": OperatorConfig("custom", {"lambda": fake_first_name}) } if anon_strategy == "replace with 'REDACTED'": chosen_mask_config = simple_replace_config elif anon_strategy == "replace_redacted": chosen_mask_config = simple_replace_config elif anon_strategy == "replace with ": chosen_mask_config = replace_config elif anon_strategy == "entity_type": chosen_mask_config = replace_config elif anon_strategy == "redact completely": chosen_mask_config = redact_config elif anon_strategy == "redact": chosen_mask_config = redact_config elif anon_strategy == "hash": chosen_mask_config = hash_config elif anon_strategy == "mask": chosen_mask_config = mask_config elif anon_strategy == "encrypt": chosen_mask_config = people_encrypt_config key = secrets.token_bytes(16) # 128 bits = 16 bytes key_string = base64.b64encode(key).decode("utf-8") # Now inject the key into the operator config for entity, operator in chosen_mask_config.items(): if operator.operator_name == "encrypt": operator.params = {"key": key_string} elif anon_strategy == "fake_first_name": chosen_mask_config = fake_first_name_config else: print("Anonymisation strategy not found. Redacting completely by default.") chosen_mask_config = redact_config # Redact completely by default combined_config = {**chosen_mask_config} anonymizer_results = batch_anonymizer.anonymize_dict( analyzer_results, operators=combined_config ) scrubbed_df = pd.DataFrame(anonymizer_results) return ( scrubbed_df, key_string, decision_process_output_str, comprehend_query_number, decision_process_output_df, llm_total_input_tokens, llm_total_output_tokens, llm_model_name, )