# Spaces: Configuration error
# File size: 5,044 Bytes
# Revision: 8d2ec7a
import os
import re
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
import joblib
import numpy as np
class LogPreprocessor:
    """Normalize raw log text and load labelled log files from disk.

    The instance owns a ``TfidfVectorizer`` (at most 1000 features, English
    stop words removed, unigrams and bigrams, terms must occur in at least
    two documents) that callers fit and apply to the cleaned text.
    """

    # Ordered (compiled pattern, replacement) pairs applied by ``clean_log``.
    # The sequence mirrors the cleaning steps one by one; whitespace
    # collapsing is deliberately last so it also tidies gaps left by
    # the earlier substitutions.
    _SUBSTITUTIONS = (
        # ISO-8601 UTC timestamps that carry fractional seconds.
        (re.compile(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z'), '[TIMESTAMP]'),
        # Run/job identifiers and long standalone digit strings.
        (re.compile(r'run_\d+'), 'run_ID'),
        (re.compile(r'job_\d+'), 'job_ID'),
        (re.compile(r'\b\d{8,}\b'), '[ID]'),
        # First path component below /home and /tmp (environment-specific).
        (re.compile(r'/home/[^/\s]+'), '/home/USER'),
        (re.compile(r'/tmp/[^/\s]+'), '/tmp/TEMP'),
        # Collapse every whitespace run (including newlines) to one space.
        (re.compile(r'\s+'), ' '),
    )

    # Sub-directory of the data directory -> integer class label.
    _LABELLED_DIRS = (
        ('normal', 0),      # 0 for normal
        ('anomalous', 1),   # 1 for anomalous
    )

    def __init__(self):
        self.vectorizer = TfidfVectorizer(
            max_features=1000,
            stop_words='english',
            ngram_range=(1, 2),
            min_df=2,
        )

    def clean_log(self, log_content: str) -> str:
        """Return *log_content* with volatile tokens replaced by placeholders.

        Timestamps, run/job IDs, digit strings of eight or more characters
        and the first component under ``/home`` and ``/tmp`` are replaced
        with fixed placeholders; whitespace runs are collapsed to a single
        space and the result is stripped.
        """
        for pattern, replacement in self._SUBSTITUTIONS:
            log_content = pattern.sub(replacement, log_content)
        return log_content.strip()

    def load_logs(self, data_dir):
        """Read and clean every log file under ``normal/`` and ``anomalous/``.

        Args:
            data_dir: Directory expected to contain ``normal`` and/or
                ``anomalous`` sub-directories. A missing sub-directory is
                skipped silently.

        Returns:
            A ``(logs, labels)`` pair of equal-length lists: cleaned log
            texts and their labels (0 for normal, 1 for anomalous). Normal
            logs come first; within each class files are in sorted
            filename order.
        """
        logs = []
        labels = []
        for subdir, label in self._LABELLED_DIRS:
            class_dir = os.path.join(data_dir, subdir)
            if not os.path.isdir(class_dir):
                continue
            # os.listdir order is arbitrary; sorting keeps the sample order
            # (and therefore any seeded split done by the caller) stable.
            for filename in sorted(os.listdir(class_dir)):
                filepath = os.path.join(class_dir, filename)
                # Nested directories cannot be opened as files; skip them
                # instead of crashing the whole load.
                if not os.path.isfile(filepath):
                    continue
                # Undecodable bytes are dropped rather than raising, since
                # logs may contain arbitrary binary noise.
                with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                    logs.append(self.clean_log(f.read()))
                labels.append(label)
        return logs, labels
class AnomalyDetectionModel:
    """Random-forest classifier that flags log text as normal or anomalous.

    Logs are cleaned and TF-IDF vectorized through a ``LogPreprocessor``.
    The fitted classifier and vectorizer are persisted with ``joblib`` so
    that ``predict`` can lazily reload them in a fresh process.
    """

    def __init__(self, model_path='anomaly_model.pkl', vectorizer_path='vectorizer.pkl'):
        """Create an untrained model.

        Args:
            model_path: File the fitted classifier is saved to / loaded from.
            vectorizer_path: File the fitted vectorizer is saved to /
                loaded from.
        """
        self.preprocessor = LogPreprocessor()
        self.model = RandomForestClassifier(
            n_estimators=100,
            random_state=42,
            class_weight='balanced',
        )
        self.is_trained = False
        self.model_path = model_path
        self.vectorizer_path = vectorizer_path

    def train(self, data_dir='data'):
        """Fit the vectorizer and classifier on the logs in *data_dir*.

        Holds out 20% of the logs (stratified by label, fixed seed), prints
        a classification report and confusion matrix for the held-out set,
        then saves the classifier and vectorizer to the configured paths.

        Args:
            data_dir: Directory passed to ``LogPreprocessor.load_logs``.

        Raises:
            ValueError: If no logs are found under *data_dir*.
        """
        print("Loading logs...")
        logs, labels = self.preprocessor.load_logs(data_dir)
        if not logs:
            raise ValueError("No logs found. Please run data collection first.")
        print(f"Loaded {len(logs)} logs ({labels.count(0)} normal, {labels.count(1)} anomalous)")

        # Split the raw text first so the TF-IDF vocabulary and IDF weights
        # are learned from the training portion only; fitting on all logs
        # would leak held-out information into the evaluation.
        y = np.array(labels)
        logs_train, logs_test, y_train, y_test = train_test_split(
            logs, y, test_size=0.2, random_state=42, stratify=y
        )

        print("Vectorizing logs...")
        X_train = self.preprocessor.vectorizer.fit_transform(logs_train)
        X_test = self.preprocessor.vectorizer.transform(logs_test)

        print("Training model...")
        self.model.fit(X_train, y_train)

        # Evaluate. Labels are passed explicitly so the report still lines
        # up with target_names when the held-out set lacks one class.
        y_pred = self.model.predict(X_test)
        print("\nModel Performance:")
        print(classification_report(
            y_test,
            y_pred,
            labels=[0, 1],
            target_names=['Normal', 'Anomalous'],
            zero_division=0,
        ))
        print("\nConfusion Matrix:")
        print(confusion_matrix(y_test, y_pred, labels=[0, 1]))
        self.is_trained = True

        # Save model and vectorizer
        joblib.dump(self.model, self.model_path)
        joblib.dump(self.preprocessor.vectorizer, self.vectorizer_path)
        print(f"\nModel saved as '{self.model_path}' and '{self.vectorizer_path}'")

    def predict(self, log_content):
        """Classify a single raw log.

        If the model has not been trained in this process, the saved
        classifier and vectorizer are loaded from disk first.

        Args:
            log_content: Raw log text.

        Returns:
            A dict with ``is_anomaly`` (bool), ``confidence`` (highest class
            probability) and ``anomaly_probability`` (probability assigned
            to label 1, or 0.0 if the model never saw that label).

        Raises:
            ValueError: If no trained model is in memory and the saved
                artifacts cannot be found.
        """
        if not self.is_trained:
            # NOTE(security): joblib.load unpickles the files; only load
            # artifacts that come from a trusted source.
            try:
                self.model = joblib.load(self.model_path)
                self.preprocessor.vectorizer = joblib.load(self.vectorizer_path)
            except FileNotFoundError as err:
                raise ValueError("No trained model found. Please train first.") from err
            self.is_trained = True

        # Preprocess and predict
        cleaned_log = self.preprocessor.clean_log(log_content)
        log_vector = self.preprocessor.vectorizer.transform([cleaned_log])
        probability = self.model.predict_proba(log_vector)[0]

        # predict_proba columns follow model.classes_, so the anomalous
        # column must be looked up rather than assumed to be index 1
        # (a model fitted on a single class has only one column).
        classes = [int(c) for c in self.model.classes_]
        prediction = classes[int(np.argmax(probability))]
        anomaly_probability = float(probability[classes.index(1)]) if 1 in classes else 0.0

        return {
            'is_anomaly': bool(prediction),
            'confidence': float(max(probability)),
            'anomaly_probability': anomaly_probability,
        }
def main():
    """Train the anomaly detector with its default settings and save it."""
    model = AnomalyDetectionModel()
    model.train()


if __name__ == "__main__":
    main()