| """ |
| CLI Usage Logger - A simplified version of the Gradio CSVLogger_custom for CLI usage logging. |
| This module provides functionality to log usage data from CLI operations to CSV files and optionally DynamoDB. |
| """ |
|
|
| import csv |
| import os |
| import uuid |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Any, List |
|
|
| import boto3 |
|
|
| from tools.aws_functions import upload_log_file_to_s3 |
| from tools.config import ( |
| AWS_ACCESS_KEY, |
| AWS_REGION, |
| AWS_SECRET_KEY, |
| CSV_USAGE_LOG_HEADERS, |
| DOCUMENT_REDACTION_BUCKET, |
| DYNAMODB_USAGE_LOG_HEADERS, |
| HOST_NAME, |
| RUN_AWS_FUNCTIONS, |
| S3_USAGE_LOGS_FOLDER, |
| SAVE_LOGS_TO_CSV, |
| SAVE_LOGS_TO_DYNAMODB, |
| USAGE_LOG_DYNAMODB_TABLE_NAME, |
| USAGE_LOGS_FOLDER, |
| ) |
|
|
|
|
class CLIUsageLogger:
    """
    A simplified usage logger for CLI operations that mimics the functionality
    of the Gradio CSVLogger_custom class.

    Each logged record is a sequence of values to which a generated UUID and a
    timestamp are appended. A record can be appended to a local CSV file, the
    CSV file can be handed to upload_log_file_to_s3, and the record can be
    written as an item to a DynamoDB table.
    """

    def __init__(
        self, dataset_file_name: str = "usage_log.csv", logs_folder: str = None
    ):
        """
        Initialize the CLI usage logger.

        Args:
            dataset_file_name: Name of the CSV file to store logs
            logs_folder: Custom folder for logs (uses USAGE_LOGS_FOLDER if None
                or empty)
        """
        self.dataset_file_name = dataset_file_name
        self.flagging_dir = Path(logs_folder if logs_folder else USAGE_LOGS_FOLDER)
        # Both are populated by setup(); until then log_usage() skips the CSV
        # and S3 steps because there is no file path to work with.
        self.dataset_filepath = None
        self.headers = None

    def setup(self, headers: List[str]):
        """
        Setup the logger with the specified headers.

        Stores the headers and makes sure the log folder and CSV file exist.

        Args:
            headers: List of column headers for the CSV file ("id" and
                "timestamp" columns are appended automatically)
        """
        self.headers = headers
        self._create_dataset_file()

    def _create_dataset_file(self):
        """Create the dataset CSV file with headers if it doesn't exist."""
        self.flagging_dir.mkdir(parents=True, exist_ok=True)

        # Copy into a new list so tuple headers work and the caller's
        # sequence is never mutated.
        full_headers = [*self.headers, "id", "timestamp"]

        self.dataset_filepath = self.flagging_dir / self.dataset_file_name

        if self.dataset_filepath.exists():
            # An existing file is reused untouched; its header row is not
            # checked against the current headers.
            print(f"Using existing usage log file at: {self.dataset_filepath}")
            return

        with open(
            self.dataset_filepath, "w", newline="", encoding="utf-8"
        ) as csvfile:
            csv.writer(csvfile).writerow(full_headers)
        print(f"Created usage log file at: {self.dataset_filepath}")

    def log_usage(
        self,
        data: List[Any],
        save_to_csv: bool = None,
        save_to_dynamodb: bool = None,
        save_to_s3: bool = None,
        s3_bucket: str = None,
        s3_key_prefix: str = None,
        dynamodb_table_name: str = None,
        dynamodb_headers: List[str] = None,
        replacement_headers: List[str] = None,
    ) -> int:
        """
        Log usage data to CSV and optionally DynamoDB and S3.

        One id and one timestamp are generated per call and shared by the CSV
        row and the DynamoDB item, so the same record can be matched across
        both stores. Failures in any individual sink are printed and do not
        raise.

        Args:
            data: List of data values to log
            save_to_csv: Whether to save to CSV (defaults to config setting)
            save_to_dynamodb: Whether to save to DynamoDB (defaults to config setting)
            save_to_s3: Whether to save to S3 (defaults to
                RUN_AWS_FUNCTIONS and SAVE_LOGS_TO_CSV)
            s3_bucket: S3 bucket name (defaults to config setting)
            s3_key_prefix: S3 key prefix (defaults to config setting)
            dynamodb_table_name: DynamoDB table name (defaults to config setting)
            dynamodb_headers: DynamoDB headers (defaults to config setting)
            replacement_headers: Accepted for interface compatibility;
                currently not used by this method

        Returns:
            Number of lines written to the CSV file: 1 if the row was
            appended, otherwise 0
        """
        # Resolve unset options from the configuration.
        if save_to_csv is None:
            save_to_csv = SAVE_LOGS_TO_CSV
        if save_to_dynamodb is None:
            save_to_dynamodb = SAVE_LOGS_TO_DYNAMODB
        if save_to_s3 is None:
            save_to_s3 = RUN_AWS_FUNCTIONS and SAVE_LOGS_TO_CSV
        if s3_bucket is None:
            s3_bucket = DOCUMENT_REDACTION_BUCKET
        if s3_key_prefix is None:
            s3_key_prefix = S3_USAGE_LOGS_FOLDER
        if dynamodb_table_name is None:
            dynamodb_table_name = USAGE_LOG_DYNAMODB_TABLE_NAME
        if dynamodb_headers is None:
            dynamodb_headers = DYNAMODB_USAGE_LOG_HEADERS

        # Materialise once so tuples and other iterables are accepted and the
        # same values feed both the CSV row and the DynamoDB item.
        values = list(data)
        record_id = str(uuid.uuid4())
        # Local time, truncated from microseconds to milliseconds.
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

        line_count = 0

        if save_to_csv and self.dataset_filepath:
            line_count = self._append_csv_row([*values, record_id, timestamp])

        # The upload is attempted whenever it is enabled and a file path is
        # set, independently of whether the CSV write above ran or succeeded.
        if save_to_s3 and self.dataset_filepath and s3_bucket and s3_key_prefix:
            self._upload_csv_to_s3(s3_bucket, s3_key_prefix)

        if save_to_dynamodb and dynamodb_table_name and dynamodb_headers:
            self._put_dynamodb_item(
                dynamodb_table_name, dynamodb_headers, values, record_id, timestamp
            )

        return line_count

    def _append_csv_row(self, row: List[Any]) -> int:
        """Append one row to the CSV log; return 1 on success, 0 on failure."""
        try:
            # Plain utf-8 matches the encoding used when the header row was
            # written, so the file never mixes BOM and non-BOM writes.
            with open(
                self.dataset_filepath, "a", newline="", encoding="utf-8"
            ) as csvfile:
                csv.writer(csvfile).writerow(row)
        except Exception as e:
            # Best effort: usage logging must not abort the CLI operation.
            print(f"Error writing to CSV: {e}")
            return 0
        print(f"Logged usage data to CSV: {self.dataset_filepath}")
        return 1

    def _upload_csv_to_s3(self, s3_bucket: str, s3_key_prefix: str) -> None:
        """Pass the CSV log file to upload_log_file_to_s3, printing the outcome."""
        try:
            upload_result = upload_log_file_to_s3(
                local_file_paths=[str(self.dataset_filepath)],
                s3_key=s3_key_prefix,
                s3_bucket=s3_bucket,
                RUN_AWS_FUNCTIONS=RUN_AWS_FUNCTIONS,
                SAVE_LOGS_TO_CSV=SAVE_LOGS_TO_CSV,
            )
            print(f"S3 upload result: {upload_result}")
        except Exception as e:
            print(f"Error uploading log file to S3: {e}")

    def _put_dynamodb_item(
        self,
        table_name: str,
        headers: List[str],
        values: List[Any],
        record_id: str,
        timestamp: str,
    ) -> None:
        """Write one usage record to DynamoDB, printing the outcome."""
        try:
            # Use explicit credentials when configured; otherwise let boto3
            # resolve credentials itself.
            if AWS_ACCESS_KEY and AWS_SECRET_KEY:
                dynamodb = boto3.resource(
                    "dynamodb",
                    region_name=AWS_REGION,
                    aws_access_key_id=AWS_ACCESS_KEY,
                    aws_secret_access_key=AWS_SECRET_KEY,
                )
            else:
                dynamodb = boto3.resource("dynamodb", region_name=AWS_REGION)

            table = dynamodb.Table(table_name)

            item = {
                "id": record_id,
                "timestamp": timestamp,
            }
            # Values are stored as strings. zip() stops at the shorter of the
            # two sequences, so surplus headers or values are dropped.
            item.update({header: str(value) for header, value in zip(headers, values)})

            table.put_item(Item=item)
            print("Successfully uploaded usage log to DynamoDB")
        except Exception as e:
            print(f"Could not upload usage log to DynamoDB: {e}")
|
|
|
|
def create_cli_usage_logger(logs_folder: str = None) -> CLIUsageLogger:
    """
    Create and setup a CLI usage logger with the standard headers.

    The headers come from CSV_USAGE_LOG_HEADERS. When that setting is empty, a
    built-in fallback list is used instead. The fallback has one column for
    each of the 20 values that log_redaction_usage() writes per record, in the
    same order, so the header row and the data rows stay aligned.

    Args:
        logs_folder: Custom folder for logs (uses USAGE_LOGS_FOLDER if None)

    Returns:
        Configured CLIUsageLogger instance
    """
    headers = CSV_USAGE_LOG_HEADERS
    if not headers:
        print("Error using CSV usage log headers: Empty headers list")
        headers = [
            "session_hash_textbox",
            "doc_full_file_name_textbox",
            "data_full_file_name_textbox",
            "actual_time_taken_number",
            "total_page_count",
            "textract_query_number",
            "pii_detection_method",
            "comprehend_query_number",
            "cost_code",
            "textract_handwriting_signature",
            "host_name_textbox",
            "text_extraction_method",
            "is_this_a_textract_api_call",
            "task",
            # Token-usage columns, named after the matching
            # log_redaction_usage() parameters.
            "vlm_model_name",
            "vlm_total_input_tokens",
            "vlm_total_output_tokens",
            "llm_model_name",
            "llm_total_input_tokens",
            "llm_total_output_tokens",
        ]

    logger = CLIUsageLogger(logs_folder=logs_folder)
    logger.setup(headers)
    return logger
|
|
|
|
def log_redaction_usage(
    logger: CLIUsageLogger,
    session_hash: str,
    doc_file_name: str,
    data_file_name: str,
    time_taken: float,
    total_pages: int,
    textract_queries: int,
    pii_method: str,
    comprehend_queries: int,
    cost_code: str,
    handwriting_signature: str,
    text_extraction_method: str,
    is_textract_call: bool,
    task: str,
    save_to_dynamodb: bool = None,
    save_to_s3: bool = None,
    s3_bucket: str = None,
    s3_key_prefix: str = None,
    vlm_model_name: str = "",
    vlm_total_input_tokens: int = 0,
    vlm_total_output_tokens: int = 0,
    llm_model_name: str = "",
    llm_total_input_tokens: int = 0,
    llm_total_output_tokens: int = 0,
) -> int:
    """
    Log redaction usage data using the provided logger.

    Builds one 20-value record (the arguments below plus HOST_NAME from the
    configuration) and passes it to logger.log_usage().

    Args:
        logger: CLIUsageLogger instance
        session_hash: Session identifier
        doc_file_name: Document file name (or placeholder if not displaying names)
        data_file_name: Data file name (or placeholder if not displaying names)
        time_taken: Time taken for processing in seconds (logged rounded to
            two decimal places)
        total_pages: Total number of pages processed
        textract_queries: Number of Textract API calls made
        pii_method: PII detection method used
        comprehend_queries: Number of Comprehend API calls made
        cost_code: Cost code for the operation
        handwriting_signature: Handwriting/signature extraction options
        text_extraction_method: Text extraction method used
        is_textract_call: Whether this was a Textract API call
        task: The task performed (redact, deduplicate, textract)
        save_to_dynamodb: Whether to save to DynamoDB (overrides config default)
        save_to_s3: Whether to save to S3 (overrides config default)
        s3_bucket: S3 bucket name (overrides config default)
        s3_key_prefix: S3 key prefix (overrides config default)
        vlm_model_name: VLM model name used for OCR
        vlm_total_input_tokens: Total VLM input tokens used
        vlm_total_output_tokens: Total VLM output tokens used
        llm_model_name: LLM model name used for PII detection
        llm_total_input_tokens: Total LLM input tokens used
        llm_total_output_tokens: Total LLM output tokens used

    Returns:
        The value returned by logger.log_usage(): the number of CSV lines
        written for this record.
    """
    # The order of this list defines the column order of the logged record.
    data = [
        session_hash,
        doc_file_name,
        data_file_name,
        round(time_taken, 2),
        total_pages,
        textract_queries,
        pii_method,
        comprehend_queries,
        cost_code,
        handwriting_signature,
        HOST_NAME,
        text_extraction_method,
        is_textract_call,
        task,
        vlm_model_name,
        vlm_total_input_tokens,
        vlm_total_output_tokens,
        llm_model_name,
        llm_total_input_tokens,
        llm_total_output_tokens,
    ]

    # Propagate the result so callers can tell whether a row was written.
    return logger.log_usage(
        data,
        save_to_dynamodb=save_to_dynamodb,
        save_to_s3=save_to_s3,
        s3_bucket=s3_bucket,
        s3_key_prefix=s3_key_prefix,
    )
|
|