import os
from typing import List, Type

import boto3
import pandas as pd

from tools.config import (
    AWS_ACCESS_KEY,
    AWS_REGION,
    AWS_SECRET_KEY,
    DOCUMENT_REDACTION_BUCKET,
    PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS,
    RUN_AWS_FUNCTIONS,
    S3_OUTPUTS_BUCKET,
    SAVE_LOGS_TO_CSV,
    SAVE_OUTPUTS_TO_S3,
)
from tools.secure_path_utils import secure_join

PandasDataFrame = Type[pd.DataFrame]


def _effective_aws_region() -> str:
    """Resolve region at call time (env may be set after ``tools.config`` import)."""
    return (
        os.environ.get("AWS_REGION")
        or os.environ.get("AWS_DEFAULT_REGION")
        or AWS_REGION
        or ""
    ).strip()


def _flag_enabled(flag) -> bool:
    """Return True when a config flag is switched on.

    Accepts both boolean ``True`` and the string form ``"1"``. This module
    treats ``RUN_AWS_FUNCTIONS`` as a boolean in most places but compared it
    to ``"1"`` in others; accepting both keeps either configuration style
    working.
    """
    return flag is True or flag == "1"


def _ensure_parent_dir(file_path: str) -> None:
    """Create the parent directory of ``file_path`` if it has one.

    ``os.path.dirname`` returns ``""`` for a bare file name and
    ``os.makedirs("")`` raises, so the empty case is skipped.
    """
    parent_dir = os.path.dirname(file_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)


def _list_s3_objects(s3_client, bucket_name: str, prefix: str) -> List[dict]:
    """Return every object entry under ``prefix`` in ``bucket_name``.

    Uses the ``list_objects_v2`` paginator because a single call returns at
    most 1000 keys.
    """
    paginator = s3_client.get_paginator("list_objects_v2")
    return [
        obj
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix)
        for obj in page.get("Contents", [])
    ]


def connect_to_bedrock_runtime(
    model_name_map: dict,
    model_choice: str,
    aws_access_key_textbox: str = "",
    aws_secret_key_textbox: str = "",
    aws_region_textbox: str = "",
):
    """
    Create a boto3 ``bedrock-runtime`` client for the chosen model, if it is an AWS model.

    Credentials are chosen in this order:
    1. Default boto3 credential chain (SSO) when AWS functions are enabled and
       SSO is prioritised over access keys.
    2. Access key / secret key supplied by the user.
    3. Access key / secret key from config.
    4. Default boto3 credential chain (SSO) when AWS functions are enabled.

    Args:
    - model_name_map: Mapping of model name to a dict containing a "source" entry.
    - model_choice: Key into ``model_name_map`` for the selected model.
    - aws_access_key_textbox: Optional user-supplied AWS access key.
    - aws_secret_key_textbox: Optional user-supplied AWS secret key.
    - aws_region_textbox: Optional user-supplied region; overrides env/config region.

    Returns:
    - A boto3 ``bedrock-runtime`` client, or ``None`` when the model source does
      not contain "AWS".

    Raises:
    - KeyError: if ``model_choice`` (or its "source" entry) is missing from ``model_name_map``.
    - Exception: if the model is an AWS model but no credential route is available.
    """
    model_source = model_name_map[model_choice]["source"]

    # Non-AWS models do not need a Bedrock client
    if "AWS" not in model_source:
        return None

    # Use aws_region_textbox if provided, otherwise fall back to env / config region
    region = aws_region_textbox or _effective_aws_region() or AWS_REGION

    if RUN_AWS_FUNCTIONS and _flag_enabled(PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS):
        print("Connecting to Bedrock via existing SSO connection")
        return boto3.client("bedrock-runtime", region_name=region)

    if aws_access_key_textbox and aws_secret_key_textbox:
        print(
            "Connecting to Bedrock using AWS access key and secret keys from user input."
        )
        return boto3.client(
            "bedrock-runtime",
            aws_access_key_id=aws_access_key_textbox,
            aws_secret_access_key=aws_secret_key_textbox,
            region_name=region,
        )

    if AWS_ACCESS_KEY and AWS_SECRET_KEY:
        print("Getting Bedrock credentials from environment variables")
        return boto3.client(
            "bedrock-runtime",
            aws_access_key_id=AWS_ACCESS_KEY,
            aws_secret_access_key=AWS_SECRET_KEY,
            region_name=region,
        )

    # Previously compared with "1" only, which made this fallback unreachable
    # whenever RUN_AWS_FUNCTIONS is a boolean (as it is used elsewhere here).
    if _flag_enabled(RUN_AWS_FUNCTIONS):
        print("Connecting to Bedrock via existing SSO connection")
        return boto3.client("bedrock-runtime", region_name=region)

    out_message = "Cannot connect to AWS Bedrock service. Please provide access keys under LLM settings, or choose another model type."
    print(out_message)
    raise Exception(out_message)


def get_assumed_role_info():
    """
    Look up the caller identity via the regional STS endpoint.

    Returns:
    - Tuple ``(arn, name)`` where ``arn`` is the "Arn" field of the STS
      ``get_caller_identity`` response and ``name`` is the text after the last
      "/" in that ARN.

    Raises:
    - ValueError: if no AWS region can be resolved.

    NOTE(review): for ARNs shaped like ``.../assumed-role/<role>/<session>`` the
    last segment is presumably the session name rather than the role name -
    confirm this is what callers expect.
    """
    region = _effective_aws_region()
    if not region:
        raise ValueError(
            "AWS region is not configured (set AWS_REGION or profile region)"
        )

    sts_endpoint = f"https://sts.{region}.amazonaws.com"
    sts = boto3.client("sts", region_name=region, endpoint_url=sts_endpoint)
    response = sts.get_caller_identity()

    # Extract ARN of the caller, then the final path segment of that ARN
    assumed_role_arn = response["Arn"]
    assumed_role_name = assumed_role_arn.split("/")[-1]

    return assumed_role_arn, assumed_role_name


# Import-time AWS startup checks (best effort: failures are printed, never raised)
if RUN_AWS_FUNCTIONS:
    # Empty AWS_PROFILE (common from compose ``AWS_PROFILE=${AWS_PROFILE:-}``) breaks boto3.
    if not (os.environ.get("AWS_PROFILE") or "").strip():
        os.environ.pop("AWS_PROFILE", None)

    region = _effective_aws_region()
    if region:
        try:
            session = boto3.Session(region_name=region)
        except Exception as e:
            print("Could not start boto3 session:", e)

        try:
            assumed_role_arn, assumed_role_name = get_assumed_role_info()
            print("Successfully assumed ARN role")
        except Exception as e:
            print("Could not get assumed role from STS:", e)
    else:
        print(
            "Skipping AWS startup checks: AWS_REGION is not set "
            "(set AWS_REGION or run Pi configure_aws_credentials before importing tools.aws_functions)"
        )


# Download direct from S3 - requires login credentials
def download_file_from_s3(
    bucket_name: str,
    key: str,
    local_file_path_and_name: str,
    RUN_AWS_FUNCTIONS: bool = RUN_AWS_FUNCTIONS,
):
    """
    Download a single object from S3 to a local path.

    Errors are printed rather than raised. Nothing happens when
    ``RUN_AWS_FUNCTIONS`` is falsy.

    Args:
    - bucket_name: Name of the S3 bucket.
    - key: Key of the object to download.
    - local_file_path_and_name: Destination path, including the file name.
    - RUN_AWS_FUNCTIONS: Whether AWS calls are enabled.
    """
    if not RUN_AWS_FUNCTIONS:
        return

    try:
        # Ensure the local directory exists (a bare file name has no directory part)
        _ensure_parent_dir(local_file_path_and_name)

        s3 = boto3.client("s3", region_name=_effective_aws_region())
        s3.download_file(bucket_name, key, local_file_path_and_name)
        print(
            f"File downloaded from s3://{bucket_name}/{key} to {local_file_path_and_name}"
        )
    except Exception as e:
        print("Could not download file:", key, "from s3 due to", e)


def download_folder_from_s3(
    bucket_name: str,
    s3_folder: str,
    local_folder: str,
    RUN_AWS_FUNCTIONS: bool = RUN_AWS_FUNCTIONS,
):
    """
    Download all files from an S3 folder to a local folder.

    Per-object failures are printed and do not stop the remaining downloads.

    Args:
    - bucket_name: Name of the S3 bucket.
    - s3_folder: Key prefix to download from.
    - local_folder: Local destination folder.
    - RUN_AWS_FUNCTIONS: Whether AWS calls are enabled.
    """
    if not RUN_AWS_FUNCTIONS:
        return

    if not (bucket_name and s3_folder and local_folder):
        print("One or more required variables are empty, could not download from S3")
        return

    s3 = boto3.client("s3", region_name=_effective_aws_region())

    # List every object under the prefix (paginated, so not capped at 1000 keys)
    for obj in _list_s3_objects(s3, bucket_name, s3_folder):
        object_key = obj["Key"]

        # Keys ending in "/" are folder placeholders with nothing to download
        if object_key.endswith("/"):
            continue

        try:
            # Path construction is inside the try so one rejected key does not
            # abort the rest of the folder download
            local_file_path = secure_join(
                local_folder, os.path.relpath(object_key, s3_folder)
            )
            _ensure_parent_dir(local_file_path)

            s3.download_file(bucket_name, object_key, local_file_path)
            print(
                f"Downloaded 's3://{bucket_name}/{object_key}' to '{local_file_path}'"
            )
        except Exception as e:
            print(f"Error downloading 's3://{bucket_name}/{object_key}':", e)


def download_files_from_s3(
    bucket_name: str,
    s3_folder: str,
    local_folder: str,
    filenames: List[str],
    RUN_AWS_FUNCTIONS: bool = RUN_AWS_FUNCTIONS,
):
    """
    Download specific files from an S3 folder to a local folder.

    Args:
    - bucket_name: Name of the S3 bucket.
    - s3_folder: Key prefix that holds the files.
    - local_folder: Local destination folder.
    - filenames: File names to download. The string "*" downloads every object
      found under ``s3_folder`` (by base name); any other single string is
      treated as one file name.
    - RUN_AWS_FUNCTIONS: Whether AWS calls are enabled.
    """
    if not RUN_AWS_FUNCTIONS:
        return

    if not (bucket_name and s3_folder and local_folder and filenames):
        print("One or more required variables are empty, could not download from S3")
        return

    s3 = boto3.client("s3", region_name=_effective_aws_region())

    print("Trying to download file: ", filenames)

    if filenames == "*":
        # List all objects in the S3 folder
        print("Trying to download all files in AWS folder: ", s3_folder)
        found_objects = _list_s3_objects(s3, bucket_name, s3_folder)
        print("Found files in AWS folder: ", found_objects)

        # Keep base names only; folder placeholder keys give an empty name and are dropped
        base_names = (obj["Key"].split("/")[-1] for obj in found_objects)
        filenames = [name for name in base_names if name]
        print("Found filenames in AWS folder: ", filenames)
    elif isinstance(filenames, str):
        # A lone file name would otherwise be iterated character by character
        filenames = [filenames]

    for filename in filenames:
        # NOTE(review): secure_join is also used to build the S3 key here;
        # confirm it yields "/"-separated relative keys on every platform.
        object_key = secure_join(s3_folder, filename)
        local_file_path = secure_join(local_folder, filename)

        # Create directories if necessary
        _ensure_parent_dir(local_file_path)

        # Download the object
        try:
            s3.download_file(bucket_name, object_key, local_file_path)
            print(
                f"Downloaded 's3://{bucket_name}/{object_key}' to '{local_file_path}'"
            )
        except Exception as e:
            print(f"Error downloading 's3://{bucket_name}/{object_key}':", e)


def _upload_files_with_prefix(local_file_paths, s3_key_prefix: str, s3_bucket: str) -> str:
    """Upload each local file to ``s3_bucket`` under ``s3_key_prefix`` and return the joined per-file messages."""
    try:
        s3_client = boto3.client("s3", region_name=_effective_aws_region())

        if isinstance(local_file_paths, str):
            local_file_paths = [local_file_paths]

        messages = []
        for file in local_file_paths:
            try:
                # Get file name off file path
                file_name = os.path.basename(file)
                s3_client.upload_file(file, s3_bucket, s3_key_prefix + file_name)
                out_message = "File " + file_name + " uploaded successfully!"
            except Exception as e:
                out_message = f"Error uploading file(s): {e}"
                print(out_message)
            messages.append(out_message)

        return "\n".join(messages)
    except Exception as e:
        failure_message = "Could not upload files to S3 due to: " + str(e)
        print(failure_message)
        return failure_message


def upload_file_to_s3(
    local_file_paths: List[str],
    s3_key: str,
    s3_bucket: str = DOCUMENT_REDACTION_BUCKET,
    RUN_AWS_FUNCTIONS: bool = RUN_AWS_FUNCTIONS,
):
    """
    Uploads a file from local machine to Amazon S3.

    Args:
    - local_file_paths: Local file path(s) of the file(s) to upload (a single string is accepted).
    - s3_key: Key prefix for the uploaded file(s); the file name is appended to it.
      May be empty to upload to the bucket root.
    - s3_bucket: Name of the S3 bucket.
    - RUN_AWS_FUNCTIONS: Whether AWS calls are enabled.

    Returns:
    - Message as variable/printed to console
    """
    if not RUN_AWS_FUNCTIONS:
        return "App not set to run AWS functions"

    if not (s3_bucket and local_file_paths):
        return "At least one essential variable is empty, could not upload to S3"

    # Allow empty s3_key for uploads to bucket root
    return _upload_files_with_prefix(local_file_paths, s3_key if s3_key else "", s3_bucket)


def upload_log_file_to_s3(
    local_file_paths: List[str],
    s3_key: str,
    s3_bucket: str = DOCUMENT_REDACTION_BUCKET,
    RUN_AWS_FUNCTIONS: bool = RUN_AWS_FUNCTIONS,
    SAVE_LOGS_TO_CSV: bool = SAVE_LOGS_TO_CSV,
):
    """
    Uploads a log file from local machine to Amazon S3.

    Args:
    - local_file_paths: Local file path(s) of the file(s) to upload (a single string is accepted).
    - s3_key: Key prefix for the uploaded file(s); the file name is appended to it. Required.
    - s3_bucket: Name of the S3 bucket.
    - RUN_AWS_FUNCTIONS: Whether AWS calls are enabled.
    - SAVE_LOGS_TO_CSV: Whether CSV logs are being saved; uploads are skipped when falsy.

    Returns:
    - Message as variable/printed to console
    """
    if not (RUN_AWS_FUNCTIONS and SAVE_LOGS_TO_CSV):
        return "App not set to run AWS functions"

    if not (s3_bucket and s3_key and local_file_paths):
        return "At least one essential variable is empty, could not upload to S3"

    return _upload_files_with_prefix(local_file_paths, s3_key, s3_bucket)


def s3_outputs_upload_ready(
    *,
    save_outputs_to_s3: bool | None = None,
    s3_bucket: str | None = None,
) -> bool:
    """True when automatic redaction output upload to S3 should run."""
    flag = SAVE_OUTPUTS_TO_S3 if save_outputs_to_s3 is None else save_outputs_to_s3
    bucket = (s3_bucket if s3_bucket is not None else S3_OUTPUTS_BUCKET) or ""
    return bool(flag and RUN_AWS_FUNCTIONS and bucket.strip())


def _base_stem_from_state(base_file_state) -> str | None:
    """Return the file stem (name without extension) of the analysed input file, or None."""
    if not base_file_state:
        return None

    # Gradio File components typically provide a list of objects with a `.name` attribute
    if isinstance(base_file_state, str):
        base_path = base_file_state
    elif isinstance(base_file_state, list):
        first_item = base_file_state[0]
        base_path = getattr(first_item, "name", None) or str(first_item)
    else:
        base_path = getattr(base_file_state, "name", None) or str(base_file_state)

    if not base_path:
        return None

    stem, _ = os.path.splitext(os.path.basename(base_path))
    return stem


# Helper to upload outputs to S3 when enabled in config.
def export_outputs_to_s3(
    file_list_state,
    s3_output_folder_state_value: str,
    save_outputs_to_s3_flag: bool,
    base_file_state=None,
    s3_bucket: str = S3_OUTPUTS_BUCKET,
) -> str | None:
    """
    Upload a list of local output files to the configured S3 outputs folder.

    - file_list_state: Gradio dropdown state that holds a list of file paths or a
      single path/string. If blank/empty, no action is taken.
    - s3_output_folder_state_value: Final S3 key prefix (including any session hash)
      to use as the destination folder for uploads.
    - save_outputs_to_s3_flag: Whether output upload is enabled for this call.
    - base_file_state: Optional original input file(s); the first one's file stem is
      used as the subfolder name for all outputs.
    - s3_bucket: Name of the S3 bucket.

    Returns a short user-facing warning when any upload fails, else ``None``.
    """
    try:
        if not s3_outputs_upload_ready(
            save_outputs_to_s3=save_outputs_to_s3_flag,
            s3_bucket=s3_bucket,
        ):
            return None

        if not s3_output_folder_state_value:
            # No configured S3 outputs folder - nothing to do
            return None

        if not file_list_state:
            return None

        # Gradio dropdown may return a single string or a list
        if isinstance(file_list_state, str):
            file_paths = [file_list_state]
        else:
            file_paths = file_list_state

        # Filter out any non-truthy values
        file_paths = [p for p in file_paths if p]
        if not file_paths:
            return None

        # Derive a base file stem (name without extension) from the original
        # file(s) being analysed, if provided. This is used to create an
        # additional subfolder layer so that outputs are grouped under the
        # analysed file name rather than under each output file name.
        base_stem = _base_stem_from_state(base_file_state)

        # Ensure base S3 prefix (session/date) ends with a trailing slash
        base_prefix = s3_output_folder_state_value
        if not base_prefix.endswith("/"):
            base_prefix = base_prefix + "/"

        # For each file, append a subfolder. If we have a derived base_stem
        # from the input being analysed, use that; otherwise, fall back to
        # the individual output file name stem. Final pattern:
        #   <base_prefix>/<base_stem>/<file_name>
        # or, if base_stem is not available:
        #   <base_prefix>/<output_file_stem>/<file_name>
        upload_failed = False
        last_error = ""

        for file in file_paths:
            file_name = os.path.basename(file)

            if base_stem:
                folder_stem = base_stem
            else:
                folder_stem, _ = os.path.splitext(file_name)

            per_file_prefix = base_prefix + folder_stem + "/"

            out_message = upload_file_to_s3(
                local_file_paths=[file],
                s3_key=per_file_prefix,
                s3_bucket=s3_bucket,
            )

            # Log any issues to console so failures are visible in logs/stdout
            if (
                "Error uploading file" in out_message
                or "could not upload" in out_message.lower()
            ):
                upload_failed = True
                last_error = out_message
                print("export_outputs_to_s3 encountered issues:", out_message)

        if not upload_failed:
            print("Successfully uploaded outputs to S3")
            return None

        summary = last_error or "One or more files could not be uploaded to S3."
        if "AccessDenied" in summary:
            return (
                "Could not upload outputs to S3 (AccessDenied). "
                "Check ECS task role permissions on the outputs bucket."
            )
        return f"Could not upload all outputs to S3. {summary[:400]}"

    except Exception as e:
        # Do not break the app flow if S3 upload fails - just report to console
        print(f"export_outputs_to_s3 failed with error: {e}")
        return f"S3 upload failed: {e}"