| """ |
| Secure path utilities to prevent path injection attacks. |
| |
| This module provides secure alternatives to os.path operations that validate |
| and sanitize file paths to prevent directory traversal and other path-based attacks. |
| """ |
|
|
| import logging |
| import os |
| import re |
| import tempfile |
| from pathlib import Path |
| from typing import Optional, Union |
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


# Parent of the directory that contains this file (after resolving the file's
# path). Relative redaction IO paths are anchored here, and it is one of the
# allowed IO roots.
_REDACTION_REPO_ROOT = Path(__file__).resolve().parent.parent
|
|
|
|
def sanitize_filename(filename: str, max_length: int = 255) -> str:
    """
    Sanitize a filename to prevent path injection attacks.

    Directory components are discarded (forward slashes and backslashes are
    both treated as separators on every platform), reserved and control
    characters are replaced with underscores, runs of dots are collapsed,
    and leading/trailing dots and spaces are removed.

    Args:
        filename: The filename to sanitize
        max_length: Maximum length of the sanitized filename (must be >= 1)

    Returns:
        A sanitized filename safe for use in file operations. If nothing is
        left after sanitizing, ``"sanitized_file"`` (truncated to
        ``max_length`` if necessary) is returned.

    Raises:
        ValueError: If ``filename`` is empty or not a string, or if
            ``max_length`` is smaller than 1
    """
    if not filename or not isinstance(filename, str):
        raise ValueError("Filename must be a non-empty string")
    if max_length < 1:
        raise ValueError("max_length must be at least 1")

    # os.path.basename only understands the host separator, so on POSIX a
    # Windows-style path ("..\\..\\evil.txt") would keep its directory parts.
    # Normalise backslashes first so both styles are reduced to the last part.
    filename = os.path.basename(filename.replace("\\", "/"))

    # Replace characters that are reserved on common filesystems, plus
    # ASCII control characters.
    sanitized = re.sub(r'[<>:"|?*\x00-\x1f]', "_", filename)

    # Collapse any run of dots so ".." can never survive.
    sanitized = re.sub(r"\.{2,}", ".", sanitized)

    # Drop leading/trailing dots and spaces.
    sanitized = sanitized.strip(". ")

    if not sanitized:
        sanitized = "sanitized_file"

    if len(sanitized) > max_length:
        name, ext = os.path.splitext(sanitized)
        if len(ext) < max_length:
            # Keep the extension and as much of the stem as fits. Strip the
            # cut end so a stem ending in "." cannot re-create ".." when the
            # extension is appended.
            stem = name[: max_length - len(ext)].rstrip(". ")
            sanitized = stem + ext
        else:
            # The extension alone would use up the whole budget (a negative
            # slice here used to return an over-long name); hard-truncate.
            sanitized = sanitized[:max_length].rstrip(". ")

    return sanitized
|
|
|
|
def secure_path_join(base_path: Union[str, Path], *path_parts: str) -> Path:
    """
    Safely join paths while preventing directory traversal attacks.

    Each part is split on both ``/`` and ``\\``; empty and ``.`` segments are
    skipped, every remaining segment is passed through ``sanitize_filename``,
    and the result is joined beneath the normalized absolute base path.

    Args:
        base_path: The base directory path (``~`` is expanded)
        *path_parts: Additional path components to join; ``None`` and blank
            parts are ignored

    Returns:
        A Path object representing the safe joined path

    Raises:
        ValueError: If a part contains a null byte or a segment sanitizes to
            an empty or dot-only name
        PermissionError: If a segment is ``..`` or the resulting path would
            escape the base directory
    """
    root = os.path.normpath(os.path.abspath(os.path.expanduser(str(base_path))))

    safe_segments: list[str] = []
    for raw_part in path_parts:
        if raw_part is None:
            continue
        text = str(raw_part).strip()
        if text == "":
            continue
        if "\x00" in text:
            raise ValueError("Invalid path component contains null byte")

        # Treat both separator styles alike and vet every segment on its own.
        for segment in text.replace("\\", "/").split("/"):
            segment = segment.strip()
            if segment in ("", "."):
                continue
            if segment == "..":
                raise PermissionError("Path traversal '..' is not allowed")
            cleaned = sanitize_filename(segment)
            if cleaned in {"", ".", ".."}:
                raise ValueError("Invalid sanitized path component")
            safe_segments.append(cleaned)

    joined = os.path.normpath(os.path.join(root, *safe_segments))

    # Final containment check: the joined path must share the base as its
    # common prefix. commonpath raises ValueError for incomparable paths.
    try:
        shared = os.path.commonpath([joined, root])
    except ValueError as e:
        raise PermissionError(f"Path would escape base directory: {joined}") from e
    if shared != root:
        raise PermissionError(f"Path would escape base directory: {joined}")

    return Path(joined)
|
|
|
|
def secure_file_write(
    base_path: Union[str, Path],
    filename: str,
    content: str,
    mode: str = "w",
    encoding: Optional[str] = None,
    **kwargs,
) -> None:
    """
    Safely write content to a file within a base directory with path validation.

    The destination is computed with ``secure_path_join`` and any missing
    parent directories are created before the file is opened.

    Args:
        base_path: The base directory under which to write the file
        filename: The target file name or relative path (untrusted)
        content: The content to write
        mode: File open mode (default: 'w')
        encoding: Text encoding; forwarded to open() only when truthy
        **kwargs: Additional arguments for open(); these override ``mode``
            and ``encoding`` if they use the same keys
    """
    target = secure_path_join(base_path, filename)

    # Make sure the destination directory exists.
    target.parent.mkdir(parents=True, exist_ok=True)

    # Later entries win, so caller-supplied kwargs take precedence.
    open_options = {
        "mode": mode,
        **({"encoding": encoding} if encoding else {}),
        **kwargs,
    }

    with open(target, **open_options) as handle:
        handle.write(content)
|
|
|
|
def secure_file_read(
    base_path: Union[str, Path],
    filename: str,
    mode: str = "r",
    encoding: Optional[str] = None,
    **kwargs,
) -> str:
    """
    Safely read content from a file within a base directory with path validation.

    The location is computed with ``secure_path_join`` and must be an
    existing regular file.

    Args:
        base_path: The base directory under which to read the file
        filename: The target file name or relative path (untrusted)
        mode: File open mode (default: 'r')
        encoding: Text encoding; forwarded to open() only when truthy
        **kwargs: Additional arguments for open(); these override ``mode``
            and ``encoding`` if they use the same keys

    Returns:
        The file content

    Raises:
        FileNotFoundError: If the resolved path does not exist
        ValueError: If the resolved path exists but is not a file
    """
    source = secure_path_join(base_path, filename)

    if not source.exists():
        raise FileNotFoundError(f"File not found: {source}")
    if not source.is_file():
        raise ValueError(f"Path is not a file: {source}")

    # Later entries win, so caller-supplied kwargs take precedence.
    open_options = {
        "mode": mode,
        **({"encoding": encoding} if encoding else {}),
        **kwargs,
    }

    with open(source, **open_options) as handle:
        return handle.read()
|
|
|
|
def validate_path_safety(
    path: Union[str, Path], base_path: Optional[Union[str, Path]] = None
) -> bool:
    """
    Validate that a path is safe and doesn't contain dangerous patterns.

    The path is converted through ``Path`` and rejected if its string form
    contains ``..`` or ``//`` anywhere (substring match), or a backslash when
    not running on Windows. When ``base_path`` is given, the path must also
    normalize to a location inside it.

    Args:
        path: The path to validate
        base_path: Optional base path to check against

    Returns:
        True if the path is safe, False otherwise (including when any
        exception occurs during validation)
    """
    try:
        path_text = str(Path(path))

        # NOTE(review): Path() already collapses interior duplicate
        # separators, so the "//" pattern can only match a leading "//".
        forbidden = ["..", "//"]
        if os.name != "nt":
            # Backslashes are not separators here; treat them as suspicious.
            forbidden.append("\\")

        if any(token in path_text for token in forbidden):
            return False

        if not base_path:
            return True

        root = os.path.normpath(os.path.abspath(str(base_path)))
        requested = os.path.normpath(path_text)
        if os.path.isabs(requested):
            resolved = os.path.normpath(os.path.abspath(requested))
        else:
            # Relative paths are interpreted beneath the base directory.
            resolved = os.path.normpath(os.path.join(root, requested))

        try:
            return os.path.commonpath([resolved, root]) == root
        except ValueError:
            # commonpath cannot compare the two paths: treat as unsafe.
            return False

    except Exception:
        # Fail closed: anything unexpected means the path is not trusted.
        return False
|
|
|
|
def validate_path_containment(
    path: Union[str, Path], base_path: Union[str, Path]
) -> bool:
    """
    Validate that an existing file is contained within a base directory.

    Containment is checked with ``os.path.commonpath`` on the normalized
    absolute paths (symlinks are not resolved). Paths that look like test,
    example or temp locations are accepted without a containment check.

    NOTE(review): the test/example/temp allowance is a case-insensitive
    substring match on the whole path, so any path containing e.g. "_test"
    or "examples" returns True regardless of ``base_path``. Consider
    narrowing it if this function guards untrusted input.

    Args:
        path: The path to validate
        base_path: The trusted base directory

    Returns:
        True if the path matches the test/example/temp allowance, or if the
        base is an existing directory, the path is an existing file, and the
        path lies under the base; False otherwise (including on any error)
    """
    try:
        target = os.path.normpath(os.path.abspath(str(path)))
        root = os.path.normpath(os.path.abspath(str(base_path)))

        # Allowance for test/example/temp paths (see NOTE in the docstring).
        lowered = str(target).lower()
        markers = (
            "test_output_",
            "temp",
            "tmp",
            "test_",
            "_test",
            "example_data",
            "examples",
        )
        if any(marker in lowered for marker in markers):
            system_tmp = tempfile.gettempdir().lower()
            if system_tmp in lowered or "test" in lowered or "example" in lowered:
                return True

        # The base must be an existing directory...
        if not (os.path.exists(root) and os.path.isdir(root)):
            return False

        # ...and the target an existing regular file.
        if not (os.path.exists(target) and os.path.isfile(target)):
            return False

        try:
            return os.path.commonpath([target, root]) == root
        except ValueError:
            # Paths that cannot be compared are not contained.
            return False

    except Exception:
        return False
|
|
|
|
def validate_folder_containment(
    path: Union[str, Path], base_path: Union[str, Path]
) -> bool:
    """
    Validate that a folder path is contained within a base directory.

    Containment is checked with ``os.path.commonpath`` on the normalized
    absolute paths (symlinks are not resolved). Folders that look like test,
    example or temp locations are accepted without a containment check.

    NOTE(review): the test/example/temp allowance is a case-insensitive
    substring match on either the path or the base, so any path containing
    e.g. "tmp" or "_test" returns True regardless of containment. Consider
    narrowing it if this function guards untrusted input.

    Args:
        path: The folder path to validate
        base_path: The trusted base directory

    Returns:
        True if the path or base matches the test/example/temp allowance, or
        if the base is an existing directory and the path lies under it;
        False otherwise (including on any error, which is logged)
    """
    try:
        normalized_path = os.path.normpath(os.path.abspath(str(path)))
        normalized_base = os.path.normpath(os.path.abspath(str(base_path)))

        path_str = normalized_path.lower()
        base_str = normalized_base.lower()

        # Allowance for test/example/temp locations (see NOTE in docstring).
        test_markers = (
            "test_output_",
            "temp",
            "tmp",
            "test_",
            "_test",
            "example_data",
            "examples",
        )
        if any(marker in path_str or marker in base_str for marker in test_markers):
            return True

        # isdir() is False for missing paths, so it covers the exists check.
        if not os.path.isdir(normalized_base):
            return False

        try:
            return (
                os.path.commonpath([normalized_path, normalized_base])
                == normalized_base
            )
        except ValueError:
            # Paths that cannot be compared are not contained.
            return False

    except Exception:
        # Report through logging rather than print() so library callers
        # control where diagnostics go; still fail closed.
        logging.getLogger(__name__).exception("Error validating folder containment")
        return False
|
|
|
|
| |
def secure_join(*paths: str) -> str:
    """
    Secure alternative to os.path.join that prevents path injection.

    The first argument is the base. If any later component contains reserved
    or control characters, a run of two or more dots, or is absolute/rooted,
    all components are joined through ``secure_path_join`` (which sanitizes
    each segment and keeps the result under the base). Otherwise the
    components are joined as-is.

    Args:
        *paths: Path components to join

    Returns:
        A safe joined path string, or an empty string when called with no
        arguments
    """
    if not paths:
        return ""

    base_path, *path_parts = paths

    def _needs_sanitizing(part: str) -> bool:
        # A rooted component would make Path()/os.path.join discard the base
        # entirely (e.g. ("out", "/etc/passwd") -> "/etc/passwd"), so it must
        # take the sanitizing route just like the other suspicious inputs.
        if re.search(r'[<>:"|?*\x00-\x1f]|\.{2,}', part):
            return True
        return os.path.isabs(part) or part.startswith(("/", "\\"))

    if any(_needs_sanitizing(part) for part in path_parts):
        return str(secure_path_join(Path(base_path), *path_parts))

    # Nothing suspicious: plain join preserves the caller's relative form.
    return str(Path(*paths))
|
|
|
|
def redaction_allowed_io_roots() -> list[str]:
    """
    Return the trusted root directories for agent/CLI redaction file IO.

    The list holds, in order: the repo root, the configured INPUT_FOLDER and
    OUTPUT_FOLDER (each only when set to a truthy value), and the system temp
    directory (pytest ``tmp_path`` and ephemeral outputs).
    """
    # Imported at call time rather than at module import.
    from tools.config import INPUT_FOLDER, OUTPUT_FOLDER

    configured = [
        str(folder) for folder in (INPUT_FOLDER, OUTPUT_FOLDER) if folder
    ]
    return [str(_REDACTION_REPO_ROOT), *configured, tempfile.gettempdir()]
|
|
|
|
def _candidate_abs_io_path(path: str) -> str:
    """
    Expand ``~`` and return the real absolute form of a user-supplied path.

    Relative paths are anchored at the repo root. Symlinks are resolved via
    ``os.path.realpath``.

    Raises:
        ValueError: If the path is empty/blank or contains a null byte
    """
    cleaned = str(path or "").strip()
    if cleaned == "":
        raise ValueError("Path must not be empty.")
    if "\x00" in cleaned:
        raise ValueError("Path contains invalid null byte.")

    location = os.path.expanduser(cleaned)
    if not os.path.isabs(location):
        # Relative input is interpreted beneath the repo root.
        location = os.path.join(str(_REDACTION_REPO_ROOT), location)
    return os.path.realpath(os.path.abspath(location))
|
|
|
|
def _is_under_allowed_io_root(candidate: str) -> bool:
    """
    Report whether ``candidate`` lies under any allowed redaction IO root.

    Each root is passed through ``os.path.realpath`` before comparison;
    roots that ``os.path.commonpath`` cannot compare with the candidate are
    skipped.
    """
    for allowed in redaction_allowed_io_roots():
        resolved_root = os.path.realpath(str(allowed))
        try:
            shared = os.path.commonpath([candidate, resolved_root])
        except ValueError:
            # Not comparable with this root; try the next one.
            continue
        if shared == resolved_root:
            return True
    return False
|
|
|
|
def resolve_existing_io_path(path: Union[str, Path]) -> str:
    """
    Resolve a readable file path under allowed redaction IO roots.

    Do not call ``Path.resolve()`` on untrusted input before this helper
    (CodeQL py/path-injection); this function uses ``os.path.realpath``.

    Args:
        path: User-supplied path to an existing file

    Returns:
        The resolved absolute path as a string

    Raises:
        ValueError: If the path is empty or invalid, is not an existing
            file, or lies outside every allowed root
    """
    resolved = _candidate_abs_io_path(str(path))

    # Existence is checked first, then containment.
    if not os.path.isfile(resolved):
        raise ValueError(f"Not a file or missing: {resolved}")

    if _is_under_allowed_io_root(resolved):
        return resolved

    raise ValueError(
        "Path must be under the app repo, INPUT_FOLDER, OUTPUT_FOLDER, "
        "or system temp"
    )
|
|
|
|
def resolve_writable_io_path(path: Union[str, Path]) -> str:
    """
    Resolve an output path whose parent directory is under allowed IO roots.

    The file may not exist yet; only the real path of its parent directory
    is checked against the allowed roots. As with
    :func:`resolve_existing_io_path`, normalization uses
    ``os.path.realpath`` rather than ``Path.resolve()``.

    Args:
        path: User-supplied output path

    Returns:
        The resolved absolute path as a string

    Raises:
        ValueError: If the path is empty or invalid, or its parent directory
            lies outside every allowed root
    """
    resolved = _candidate_abs_io_path(str(path))
    containing_dir = os.path.realpath(os.path.dirname(resolved))

    if _is_under_allowed_io_root(containing_dir):
        return resolved

    raise ValueError(
        "Path must be under the app repo, INPUT_FOLDER, OUTPUT_FOLDER, "
        "or system temp"
    )
|
|
|
|
def secure_basename(path: str) -> str:
    """
    Secure alternative to os.path.basename that sanitizes the result.

    Forward slashes and backslashes are both treated as separators on every
    platform, so Windows-style input cannot carry directory components
    through on POSIX. The final component is passed through
    ``sanitize_filename`` only when it contains reserved or control
    characters or a run of two or more dots; otherwise it is returned as-is.

    Args:
        path: The path to get the basename from

    Returns:
        A sanitized basename (an empty string if the path has no final
        component)
    """
    # os.path.basename only splits on the host separator; normalise first so
    # "dir\\file.txt" yields "file.txt" everywhere.
    basename = os.path.basename(path.replace("\\", "/"))

    if re.search(r'[<>:"|?*\x00-\x1f]|\.{2,}', basename):
        return sanitize_filename(basename)
    return basename
|
|