# ci-cd-anomaly-detection / preprocess_logs.py
# maryangel101 - Initial commit with calculator app and CI/CD workflow (8d2ec7a)
import os
import re
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
import joblib
import numpy as np
class LogPreprocessor:
    """Normalize raw CI/CD log text and load labeled logs from disk.

    The instance owns the TF-IDF vectorizer used to turn cleaned logs into
    features: unigrams and bigrams, English stop words removed, at most 1000
    features, and only terms that occur in at least two documents.
    """

    # Ordered (pattern, replacement) pairs applied by ``clean_log``. Order
    # matters: timestamps are replaced before the generic long-number rule,
    # and whitespace is collapsed last.
    _SUBSTITUTIONS = (
        # ISO-8601 UTC timestamps with fractional seconds
        (re.compile(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z'), '[TIMESTAMP]'),
        # Run/job identifiers and other long numeric IDs
        (re.compile(r'run_\d+'), 'run_ID'),
        (re.compile(r'job_\d+'), 'job_ID'),
        (re.compile(r'\b\d{8,}\b'), '[ID]'),
        # Environment-specific path components
        (re.compile(r'/home/[^/\s]+'), '/home/USER'),
        (re.compile(r'/tmp/[^/\s]+'), '/tmp/TEMP'),
        # Collapse every whitespace run (including newlines) to one space
        (re.compile(r'\s+'), ' '),
    )

    def __init__(self):
        self.vectorizer = TfidfVectorizer(
            max_features=1000,
            stop_words='english',
            ngram_range=(1, 2),
            min_df=2,
        )

    def clean_log(self, log_content):
        """Return ``log_content`` with run-specific details replaced by placeholders.

        Timestamps, run/job IDs, numbers of eight or more digits, and the
        first path component under ``/home`` and ``/tmp`` are replaced with
        fixed tokens; whitespace is collapsed and the result is stripped.
        """
        for pattern, replacement in self._SUBSTITUTIONS:
            log_content = pattern.sub(replacement, log_content)
        return log_content.strip()

    def _read_directory(self, directory):
        """Return the cleaned contents of every regular file in ``directory``.

        A missing directory yields an empty list. Entries are visited in
        sorted name order because ``os.listdir`` order is arbitrary, which
        would otherwise make the downstream seeded split non-reproducible.
        Sub-directories and other non-file entries are skipped.
        """
        if not os.path.isdir(directory):
            return []
        cleaned = []
        for filename in sorted(os.listdir(directory)):
            filepath = os.path.join(directory, filename)
            if not os.path.isfile(filepath):
                continue
            # Undecodable bytes are dropped rather than failing the whole load.
            with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                cleaned.append(self.clean_log(f.read()))
        return cleaned

    def load_logs(self, data_dir):
        """Load and clean the logs under ``data_dir/normal`` and ``data_dir/anomalous``.

        Returns:
            A ``(logs, labels)`` pair of equal-length lists. ``logs`` holds
            the cleaned text of each file; ``labels`` holds 0 for a normal
            log and 1 for an anomalous one. Normal logs come first. Either
            sub-directory may be absent.
        """
        logs = []
        labels = []
        for label, subdir in ((0, 'normal'), (1, 'anomalous')):
            entries = self._read_directory(os.path.join(data_dir, subdir))
            logs.extend(entries)
            labels.extend([label] * len(entries))
        return logs, labels
class AnomalyDetectionModel:
    """Random-forest classifier over TF-IDF features of cleaned CI/CD logs.

    Labels follow ``LogPreprocessor.load_logs``: 0 is normal, 1 is anomalous.
    """

    # Fixed label order for evaluation output, so the report and confusion
    # matrix always cover both classes even if one is absent from the data.
    _LABELS = [0, 1]
    _LABEL_NAMES = ['Normal', 'Anomalous']

    def __init__(self, model_path='anomaly_model.pkl', vectorizer_path='vectorizer.pkl'):
        """Create an untrained model.

        Args:
            model_path: File the fitted classifier is saved to / loaded from.
            vectorizer_path: File the fitted vectorizer is saved to / loaded from.
        """
        self.preprocessor = LogPreprocessor()
        self.model = RandomForestClassifier(
            n_estimators=100,
            random_state=42,
            class_weight='balanced',
        )
        self.is_trained = False
        self.model_path = model_path
        self.vectorizer_path = vectorizer_path

    def train(self, data_dir='data'):
        """Fit the vectorizer and classifier on the logs under ``data_dir``.

        Holds out 20% of the logs (stratified by label) for evaluation,
        prints a classification report and confusion matrix, and saves the
        fitted classifier and vectorizer with joblib.

        Raises:
            ValueError: If no logs are found under ``data_dir``.
        """
        print("Loading logs...")
        logs, labels = self.preprocessor.load_logs(data_dir)
        if not logs:
            raise ValueError("No logs found. Please run data collection first.")
        print(f"Loaded {len(logs)} logs ({labels.count(0)} normal, {labels.count(1)} anomalous)")

        # Vectorize logs
        print("Vectorizing logs...")
        X = self.preprocessor.vectorizer.fit_transform(logs)
        y = np.array(labels)

        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # Train model
        print("Training model...")
        self.model.fit(X_train, y_train)

        # Evaluate. ``labels`` is passed explicitly: without it the report
        # raises when only one class is present, because the inferred label
        # count no longer matches the two target names.
        y_pred = self.model.predict(X_test)
        print("\nModel Performance:")
        print(classification_report(
            y_test,
            y_pred,
            labels=self._LABELS,
            target_names=self._LABEL_NAMES,
            zero_division=0,
        ))
        print("\nConfusion Matrix:")
        print(confusion_matrix(y_test, y_pred, labels=self._LABELS))
        self.is_trained = True

        # Save model and vectorizer
        joblib.dump(self.model, self.model_path)
        joblib.dump(self.preprocessor.vectorizer, self.vectorizer_path)
        print(f"\nModel saved as '{self.model_path}' and '{self.vectorizer_path}'")

    def predict(self, log_content):
        """Classify a single raw log.

        If the model has not been trained in this process, the saved
        classifier and vectorizer are loaded first.

        Returns:
            A dict with ``is_anomaly`` (bool), ``confidence`` (highest class
            probability) and ``anomaly_probability`` (probability of label 1,
            or 0.0 if the classifier never saw label 1).

        Raises:
            ValueError: If the model is untrained and a saved artifact is missing.
        """
        if not self.is_trained:
            # NOTE: joblib.load unpickles arbitrary objects; only point the
            # paths at artifacts produced by a trusted training run.
            try:
                # Load both before assigning so a missing vectorizer does not
                # leave a loaded model paired with an unfitted vectorizer.
                loaded_model = joblib.load(self.model_path)
                loaded_vectorizer = joblib.load(self.vectorizer_path)
            except FileNotFoundError as err:
                raise ValueError("No trained model found. Please train first.") from err
            self.model = loaded_model
            self.preprocessor.vectorizer = loaded_vectorizer
            self.is_trained = True

        # Preprocess and predict
        cleaned_log = self.preprocessor.clean_log(log_content)
        log_vector = self.preprocessor.vectorizer.transform([cleaned_log])
        prediction = self.model.predict(log_vector)[0]
        probability = self.model.predict_proba(log_vector)[0]

        # predict_proba columns follow model.classes_, so look up the column
        # for label 1 instead of assuming it is index 1: a model fitted on
        # anomalous logs only has a single column that *is* label 1.
        classes = list(self.model.classes_)
        if 1 in classes:
            anomaly_probability = float(probability[classes.index(1)])
        else:
            anomaly_probability = 0.0

        return {
            'is_anomaly': bool(prediction),
            'confidence': float(max(probability)),
            'anomaly_probability': anomaly_probability,
        }
# Script entry point: build a detector and train it with train()'s default
# data directory.
if __name__ == "__main__":
    detector = AnomalyDetectionModel()
    detector.train()