Spaces:
Configuration error
Configuration error
File size: 4,997 Bytes
import re
from typing import Optional

import joblib
import numpy as np
import shap
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
# Application object; the startup hook and route decorators below register on it.
app = FastAPI(title="CI/CD Log Anomaly Detection API")

# Module-level state shared by the handlers. All three names start as None.
# `model` and `vectorizer` are assigned by the startup hook; nothing in the
# visible code ever assigns `explainer`.
model = vectorizer = explainer = None
class LogRequest(BaseModel):
    """Request body accepted by the ``/predict`` endpoint."""

    # Raw log text to classify; the handler passes it through clean_log
    # before vectorizing.
    log_content: str

    # When true, the handler attaches a feature-weight explanation to the
    # response. Off by default.
    include_explanation: bool = False
class PredictionResponse(BaseModel):
    """Response body returned by the ``/predict`` endpoint."""

    # Predicted label, coerced to bool by the handler.
    is_anomaly: bool

    # Largest entry of the model's class-probability vector.
    confidence: float

    # Entry at index 1 of that vector, or 0.0 when the vector has one entry.
    anomaly_probability: float

    # Feature-weight explanation; stays None unless the request asked for one.
    # Declared Optional because None is both the default and the usual value:
    # a bare `dict` annotation describes a non-nullable object in the schema
    # and rejects an explicit None under validation.
    explanation: Optional[dict] = None
# (pattern, replacement) pairs used by clean_log, applied in the listed order.
_LOG_NORMALIZATION_RULES = (
    # ISO-8601 UTC timestamps that include fractional seconds
    (re.compile(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z'), '[TIMESTAMP]'),
    # Run / job identifiers and standalone digit runs of 8 or more
    (re.compile(r'run_\d+'), 'run_ID'),
    (re.compile(r'job_\d+'), 'job_ID'),
    (re.compile(r'\b\d{8,}\b'), '[ID]'),
    # First path component under /home and /tmp (environment-specific)
    (re.compile(r'/home/[^/\s]+'), '/home/USER'),
    (re.compile(r'/tmp/[^/\s]+'), '/tmp/TEMP'),
    # Collapse every whitespace run (including newlines) to one space
    (re.compile(r'\s+'), ' '),
)


def clean_log(log_content):
    """Return *log_content* with volatile details replaced by placeholders.

    Timestamps, run/job ids, long digit runs and user/temp path components
    are substituted with fixed tokens, whitespace runs are collapsed to a
    single space, and the result is stripped of leading/trailing whitespace.

    Only timestamps of the form ``YYYY-MM-DDTHH:MM:SS.fffZ`` (fractional
    seconds present) are recognised; other timestamp shapes pass through.
    """
    normalized = log_content
    for pattern, replacement in _LOG_NORMALIZATION_RULES:
        normalized = pattern.sub(replacement, normalized)
    return normalized.strip()
@app.on_event("startup")
async def load_model():
    """Load the classifier and vectorizer into module globals at startup.

    Both artifacts are read with ``joblib.load`` from paths relative to the
    process working directory. A missing file is reported on stdout and
    swallowed, so the application still starts; whichever global had not
    been assigned yet keeps its previous value. Any other loading error
    propagates to the caller.

    NOTE(review): despite the message printed on success, no SHAP explainer
    is constructed here and ``explainer`` is never assigned.
    NOTE(review): joblib.load unpickles its input -- only point it at
    trusted files.
    """
    global model, vectorizer, explainer
    try:
        model = joblib.load("anomaly_model.pkl")
        vectorizer = joblib.load("vectorizer.pkl")
    except FileNotFoundError:
        print("Warning: Model files not found. Please train the model first.")
    else:
        # Reached only when both artifacts loaded.
        print("Initializing SHAP explainer...")
def get_feature_importance_explanation(log_content, prediction_proba):
    """Summarise which vectorizer features weigh most in a log.

    The log is cleaned and vectorized again with the module-level
    ``vectorizer``; up to five features with the largest positive values in
    that vector are reported, heaviest first. The weights are the
    vectorizer's values for this document, not per-feature attributions
    computed from the model.

    Args:
        log_content: Raw (uncleaned) log text.
        prediction_proba: Class-probability sequence for this log. Index 1
            is read as the anomaly probability when it exists; a
            single-entry sequence is treated as "normal", matching how
            ``predict_anomaly`` reports ``anomaly_probability``.

    Returns:
        A dict with ``'top_contributing_features'`` (list of dicts holding
        ``'feature'``, ``'weight'`` and ``'impact'``) and ``'explanation'``.
        If anything raises, a dict with ``'error'`` and ``'explanation'`` is
        returned instead; this function never raises.
    """
    try:
        cleaned_log = clean_log(log_content)
        feature_weights = vectorizer.transform([cleaned_log]).toarray()[0]
        feature_names = vectorizer.get_feature_names_out()

        # The impact label is the same for every feature, so compute it once.
        # Guard the index: a model fitted on a single class yields a
        # one-element probability vector, which must not abort the explanation.
        is_anomalous = len(prediction_proba) > 1 and prediction_proba[1] > 0.5
        impact = 'anomaly' if is_anomalous else 'normal'

        # argsort is ascending, so the last five reversed are the five
        # largest weights in descending order.
        top_indices = np.argsort(feature_weights)[-5:][::-1]
        top_features = [
            {
                'feature': feature_names[idx],
                'weight': float(feature_weights[idx]),
                'impact': impact,
            }
            for idx in top_indices
            if feature_weights[idx] > 0  # skip terms absent from this log
        ]

        return {
            'top_contributing_features': top_features,
            'explanation': "Features with higher weights contributed more to the prediction"
        }
    except Exception as e:
        # Best-effort: an explanation failure must not fail the prediction.
        return {
            'error': f"Could not generate explanation: {str(e)}",
            'explanation': "Feature importance analysis failed"
        }
@app.post("/predict", response_model=PredictionResponse)
async def predict_anomaly(request: LogRequest):
    """Classify one log and report the model's probabilities.

    The log text is cleaned, vectorized with the module-level ``vectorizer``
    and passed to the module-level ``model``. When the request sets
    ``include_explanation``, a feature-weight summary is attached.

    Raises:
        HTTPException: status 500 when the model or vectorizer is not
            loaded, or when any step of the prediction raises.

    NOTE(review): index 1 of ``predict_proba`` is presumed to be the anomaly
    class -- confirm against ``model.classes_``.
    """
    if model is None or vectorizer is None:
        raise HTTPException(status_code=500, detail="Model not loaded. Please ensure model files exist.")

    try:
        features = vectorizer.transform([clean_log(request.log_content)])

        predicted_label = model.predict(features)[0]
        class_probs = model.predict_proba(features)[0]

        # A one-entry probability vector has no index 1; report 0.0 then.
        if len(class_probs) > 1:
            anomaly_prob = float(class_probs[1])
        else:
            anomaly_prob = 0.0

        response = PredictionResponse(
            is_anomaly=bool(predicted_label),
            confidence=float(max(class_probs)),
            anomaly_probability=anomaly_prob,
        )

        if request.include_explanation:
            response.explanation = get_feature_importance_explanation(request.log_content, class_probs)

        return response
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Prediction failed: {str(e)}")
@app.get("/health")
async def health_check():
    """Report service status and whether the model artifacts are loaded.

    ``status`` is always ``"healthy"``; the two flags say whether the
    module-level ``model`` and ``vectorizer`` are currently non-None.
    """
    model_ready = model is not None
    vectorizer_ready = vectorizer is not None
    return {
        "status": "healthy",
        "model_loaded": model_ready,
        "vectorizer_loaded": vectorizer_ready,
    }
@app.get("/")
async def root():
    """Return a static description of the API: name, version and endpoints."""
    endpoints = {
        "predict": "/predict (POST)",
        "health": "/health (GET)",
        "docs": "/docs (GET)",
    }
    return {
        "message": "CI/CD Log Anomaly Detection API",
        "version": "1.0.0",
        "endpoints": endpoints,
    }
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when this file is run as a script.
    import uvicorn

    # host 0.0.0.0 listens on every network interface, not just localhost.
    uvicorn.run(app, host="0.0.0.0", port=8000)