""" Main experiment runner for SLM+RAG anonymization evaluation. Pipeline: 1. Load dataset (tumeteor/Security-TTP-Mapping) 2. Fine-tune ATT&CK classifier (SecureBERT) on original data 3. Evaluate classifier on original test set → F1_original 4. Apply anonymization strategies to test set 5. Evaluate classifier on anonymized test set → F1_anon 6. Compute ΔF1 and statistical significance """ import argparse import ast import json import os import sys import time from collections import Counter, defaultdict from datetime import datetime import numpy as np import torch from datasets import load_dataset from sklearn.metrics import ( classification_report, f1_score, precision_score, recall_score, ) from sklearn.preprocessing import MultiLabelBinarizer from torch.utils.data import DataLoader from transformers import ( AutoModelForSequenceClassification, AutoTokenizer, Trainer, TrainingArguments, ) # Add project root to path sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) from src.anonymizer.ner_detector import CTIEntityDetector from src.anonymizer.strategies import get_strategy def parse_labels(label_str): """Parse label string to list of technique IDs.""" if isinstance(label_str, list): return label_str try: return ast.literal_eval(label_str) except (ValueError, SyntaxError): return [label_str] def load_ttp_dataset(): """Load and prepare the Security-TTP-Mapping dataset.""" print("Loading tumeteor/Security-TTP-Mapping...") ds = load_dataset("tumeteor/Security-TTP-Mapping") all_labels = set() for split in ds: for example in ds[split]: labels = parse_labels(example["labels"]) all_labels.update(labels) all_labels = sorted(all_labels) label2id = {label: idx for idx, label in enumerate(all_labels)} id2label = {idx: label for label, idx in label2id.items()} print(f" Total unique techniques: {len(all_labels)}") print(f" Train: {len(ds['train'])}, Val: {len(ds['validation'])}, Test: {len(ds['test'])}") train_labels = [] for ex in ds["train"]: train_labels.extend(parse_labels(ex["labels"])) label_counts = Counter(train_labels) print(f" Top-10 techniques in train:") for label, count in label_counts.most_common(10): print(f" {label}: {count}") return ds, label2id, id2label class TTPDataset(torch.utils.data.Dataset): """PyTorch dataset for TTP classification.""" def __init__(self, texts, labels_list, tokenizer, label2id, max_length=512): self.texts = texts self.labels_list = labels_list self.tokenizer = tokenizer self.label2id = label2id self.max_length = max_length self.num_labels = len(label2id) def __len__(self): return len(self.texts) def __getitem__(self, idx): text = self.texts[idx] labels = self.labels_list[idx] encoding = self.tokenizer( text, truncation=True, max_length=self.max_length, padding="max_length", return_tensors="pt", ) label_vector = torch.zeros(self.num_labels) for label in labels: if label in self.label2id: label_vector[self.label2id[label]] = 1.0 return { "input_ids": encoding["input_ids"].squeeze(), "attention_mask": encoding["attention_mask"].squeeze(), "labels": label_vector, } def compute_metrics(eval_pred, id2label, threshold=0.5): """Compute multi-label classification metrics.""" logits, labels = eval_pred predictions = (torch.sigmoid(torch.tensor(logits)) > threshold).int().numpy() labels = labels.astype(int) return { "f1_micro": f1_score(labels, predictions, average="micro", zero_division=0), "f1_macro": f1_score(labels, predictions, average="macro", zero_division=0), "f1_weighted": f1_score(labels, predictions, average="weighted", zero_division=0), "precision_micro": precision_score(labels, predictions, average="micro", zero_division=0), "recall_micro": recall_score(labels, predictions, average="micro", zero_division=0), } def train_classifier(ds, label2id, id2label, model_name="ehsanaghaei/SecureBERT", output_dir="./results/classifier", hub_model_id=None, epochs=5, batch_size=16, learning_rate=2e-5): """Fine-tune SecureBERT for multi-label ATT&CK classification.""" print(f"\n=== Training ATT&CK Classifier ({model_name}) ===") tokenizer = AutoTokenizer.from_pretrained(model_name) model = AutoModelForSequenceClassification.from_pretrained( model_name, num_labels=len(label2id), problem_type="multi_label_classification", id2label=id2label, label2id=label2id, ) train_texts = [ex["text1"] for ex in ds["train"]] train_labels = [parse_labels(ex["labels"]) for ex in ds["train"]] val_texts = [ex["text1"] for ex in ds["validation"]] val_labels = [parse_labels(ex["labels"]) for ex in ds["validation"]] train_dataset = TTPDataset(train_texts, train_labels, tokenizer, label2id) val_dataset = TTPDataset(val_texts, val_labels, tokenizer, label2id) training_args = TrainingArguments( output_dir=output_dir, num_train_epochs=epochs, per_device_train_batch_size=batch_size, per_device_eval_batch_size=batch_size * 2, learning_rate=learning_rate, warmup_ratio=0.1, weight_decay=0.01, eval_strategy="epoch", save_strategy="epoch", load_best_model_at_end=True, metric_for_best_model="f1_micro", greater_is_better=True, logging_strategy="steps", logging_steps=50, logging_first_step=True, disable_tqdm=True, push_to_hub=hub_model_id is not None, hub_model_id=hub_model_id, fp16=torch.cuda.is_available(), seed=42, report_to="none", ) trainer = Trainer( model=model, args=training_args, train_dataset=train_dataset, eval_dataset=val_dataset, compute_metrics=lambda p: compute_metrics(p, id2label), ) trainer.train() print("\n=== Training Complete ===") eval_results = trainer.evaluate() print(f"Validation results: {json.dumps(eval_results, indent=2)}") return trainer, tokenizer def evaluate_on_texts(trainer, tokenizer, texts, labels_list, label2id, id2label, threshold=0.5): """Evaluate the classifier on a set of texts.""" dataset = TTPDataset(texts, labels_list, tokenizer, label2id) predictions_output = trainer.predict(dataset) logits = predictions_output.predictions probs = torch.sigmoid(torch.tensor(logits)).numpy() binary_preds = (probs > threshold).astype(int) num_labels = len(label2id) gt = np.zeros((len(labels_list), num_labels)) for i, labels in enumerate(labels_list): for label in labels: if label in label2id: gt[i, label2id[label]] = 1 metrics = { "f1_micro": f1_score(gt, binary_preds, average="micro", zero_division=0), "f1_macro": f1_score(gt, binary_preds, average="macro", zero_division=0), "f1_weighted": f1_score(gt, binary_preds, average="weighted", zero_division=0), "precision_micro": precision_score(gt, binary_preds, average="micro", zero_division=0), "recall_micro": recall_score(gt, binary_preds, average="micro", zero_division=0), } per_technique_f1 = {} for label, idx in label2id.items(): if gt[:, idx].sum() > 0: per_technique_f1[label] = f1_score(gt[:, idx], binary_preds[:, idx], zero_division=0) metrics["per_technique_f1"] = per_technique_f1 return metrics, binary_preds def anonymize_texts(texts, strategy_id, detector=None, **strategy_kwargs): """Apply an anonymization strategy to a list of texts.""" print(f"\n--- Anonymizing with strategy: {strategy_id} ---") strategy = get_strategy(strategy_id, **strategy_kwargs) if strategy_id == "baseline": return texts anonymized = [] entity_stats = defaultdict(int) for i, text in enumerate(texts): if detector is not None: entities = detector.detect_all(text) else: detector_fallback = CTIEntityDetector() entities = detector_fallback.detect_entities_regex(text) for e in entities: entity_stats[e["type"]] += 1 anon_text = strategy.anonymize(text, entities) anonymized.append(anon_text) if (i + 1) % 100 == 0: print(f" Anonymized {i + 1}/{len(texts)} texts") print(f" Entity stats: {dict(entity_stats)}") print(f" Total entities replaced: {sum(entity_stats.values())}") return anonymized def mcnemar_test(pred_original, pred_anonymized, ground_truth): """McNemar's test for paired nominal data.""" from scipy.stats import chi2 correct_orig = (pred_original == ground_truth).all(axis=1) correct_anon = (pred_anonymized == ground_truth).all(axis=1) b = ((correct_orig == True) & (correct_anon == False)).sum() c = ((correct_orig == False) & (correct_anon == True)).sum() if b + c == 0: return {"statistic": 0, "p_value": 1.0, "b": int(b), "c": int(c)} statistic = (abs(b - c) - 1) ** 2 / (b + c) p_value = 1 - chi2.cdf(statistic, df=1) return {"statistic": float(statistic), "p_value": float(p_value), "b": int(b), "c": int(c)} def bootstrap_f1_ci(gt, preds, n_iterations=10000, confidence=0.95): """Bootstrap confidence interval for F1 score.""" n = len(gt) f1_scores = [] rng = np.random.RandomState(42) for _ in range(n_iterations): indices = rng.randint(0, n, size=n) f1 = f1_score(gt[indices], preds[indices], average="micro", zero_division=0) f1_scores.append(f1) f1_scores = sorted(f1_scores) lower = f1_scores[int((1 - confidence) / 2 * n_iterations)] upper = f1_scores[int((1 + confidence) / 2 * n_iterations)] return {"mean": float(np.mean(f1_scores)), "lower": float(lower), "upper": float(upper)} def run_experiment(args): """Run the full experiment pipeline.""" results = { "experiment_id": datetime.now().strftime("%Y%m%d_%H%M%S"), "config": vars(args), "strategies": {}, } # Step 1: Load dataset print("\n" + "=" * 60) print("STEP 1: Loading Dataset") print("=" * 60) ds, label2id, id2label = load_ttp_dataset() # Step 2: Train classifier print("\n" + "=" * 60) print("STEP 2: Training ATT&CK Classifier") print("=" * 60) trainer, tokenizer = train_classifier( ds, label2id, id2label, model_name=args.classifier_model, output_dir=args.output_dir, hub_model_id=args.hub_model_id, epochs=args.epochs, batch_size=args.batch_size, learning_rate=args.learning_rate, ) # Step 3: Evaluate on original test set print("\n" + "=" * 60) print("STEP 3: Evaluating on Original Test Set") print("=" * 60) test_texts = [ex["text1"] for ex in ds["test"]] test_labels = [parse_labels(ex["labels"]) for ex in ds["test"]] original_metrics, original_preds = evaluate_on_texts( trainer, tokenizer, test_texts, test_labels, label2id, id2label ) print(f"\nOriginal test set results:") for k, v in original_metrics.items(): if k != "per_technique_f1": print(f" {k}: {v:.4f}") results["strategies"]["baseline"] = { "metrics": {k: v for k, v in original_metrics.items() if k != "per_technique_f1"}, "per_technique_f1": original_metrics.get("per_technique_f1", {}), } # Step 4: Apply anonymization strategies and evaluate print("\n" + "=" * 60) print("STEP 4: Anonymization + Evaluation") print("=" * 60) detector = None if args.use_gliner: print("Loading GLiNER model for NER...") detector = CTIEntityDetector(model_name=args.ner_model) detector.load_model() strategies_to_test = ["placeholder", "full_redact", "slm_replace"] if args.use_slm_rag: strategies_to_test.append("slm_rag") num_labels = len(label2id) gt_binary = np.zeros((len(test_labels), num_labels)) for i, labels in enumerate(test_labels): for label in labels: if label in label2id: gt_binary[i, label2id[label]] = 1 for strategy_id in strategies_to_test: print(f"\n--- Strategy: {strategy_id} ---") anon_texts = anonymize_texts(test_texts, strategy_id, detector=detector) anon_metrics, anon_preds = evaluate_on_texts( trainer, tokenizer, anon_texts, test_labels, label2id, id2label ) delta_f1 = original_metrics["f1_micro"] - anon_metrics["f1_micro"] mcnemar_result = mcnemar_test(original_preds, anon_preds, gt_binary.astype(int)) bootstrap_ci = bootstrap_f1_ci(gt_binary.astype(int), anon_preds) print(f"\n{strategy_id} results:") for k, v in anon_metrics.items(): if k != "per_technique_f1": print(f" {k}: {v:.4f}") print(f" ΔF1 (original - anonymized): {delta_f1:.4f}") print(f" McNemar's test p-value: {mcnemar_result['p_value']:.6f}") print(f" Bootstrap 95% CI: [{bootstrap_ci['lower']:.4f}, {bootstrap_ci['upper']:.4f}]") results["strategies"][strategy_id] = { "metrics": {k: v for k, v in anon_metrics.items() if k != "per_technique_f1"}, "per_technique_f1": anon_metrics.get("per_technique_f1", {}), "delta_f1_micro": delta_f1, "mcnemar_test": mcnemar_result, "bootstrap_ci_95": bootstrap_ci, "anonymization_example": { "original": test_texts[0][:500] if test_texts else "", "anonymized": anon_texts[0][:500] if anon_texts else "", }, } # Step 5: Summary print("\n" + "=" * 60) print("EXPERIMENT SUMMARY") print("=" * 60) print(f"\n{'Strategy':<20} {'F1_micro':<12} {'ΔF1':<12} {'p-value':<12} {'Significant?'}") print("-" * 68) baseline_f1 = results["strategies"]["baseline"]["metrics"]["f1_micro"] print(f"{'baseline':<20} {baseline_f1:<12.4f} {'N/A':<12} {'N/A':<12} {'N/A'}") for strategy_id in strategies_to_test: if strategy_id in results["strategies"]: s = results["strategies"][strategy_id] f1 = s["metrics"]["f1_micro"] delta = s["delta_f1_micro"] p_val = s["mcnemar_test"]["p_value"] sig = "YES" if p_val < 0.05 else "NO" print(f"{strategy_id:<20} {f1:<12.4f} {delta:<12.4f} {p_val:<12.6f} {sig}") # Save results os.makedirs(args.output_dir, exist_ok=True) results_path = os.path.join(args.output_dir, "experiment_results.json") def convert_numpy(obj): if isinstance(obj, np.floating): return float(obj) if isinstance(obj, np.integer): return int(obj) if isinstance(obj, np.ndarray): return obj.tolist() return obj with open(results_path, "w") as f: json.dump(results, f, indent=2, default=convert_numpy) print(f"\nResults saved to: {results_path}") return results def main(): parser = argparse.ArgumentParser(description="SLM+RAG Anonymization Experiment for TRAM") parser.add_argument("--classifier-model", default="ehsanaghaei/SecureBERT") parser.add_argument("--epochs", type=int, default=5) parser.add_argument("--batch-size", type=int, default=16) parser.add_argument("--learning-rate", type=float, default=2e-5) parser.add_argument("--ner-model", default="urchade/gliner_mediumv2.1") parser.add_argument("--use-gliner", action="store_true") parser.add_argument("--use-slm-rag", action="store_true") parser.add_argument("--slm-model", default="fdtn-ai/Foundation-Sec-8B-Instruct") parser.add_argument("--output-dir", default="./results/experiment") parser.add_argument("--hub-model-id", default=None) args = parser.parse_args() run_experiment(args) if __name__ == "__main__": main()