File size: 5,567 Bytes
8d2ec7a
 
 
 
 
 
 
7340e3d
8d2ec7a
 
 
 
 
 
7340e3d
8d2ec7a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7340e3d
8d2ec7a
 
 
 
7340e3d
 
 
 
 
 
 
 
 
 
 
8d2ec7a
7340e3d
 
8d2ec7a
7340e3d
 
 
8d2ec7a
 
7340e3d
8d2ec7a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import re
from typing import Optional

import joblib
import numpy as np
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
# NOTE: shap is intentionally not imported; importing it was causing an error.

# ASGI application object; the startup hook and route decorators in this file
# register themselves on it.
app = FastAPI(title="CI/CD Log Anomaly Detection API")

# Global variables for model and vectorizer.
# Both start as None and are assigned by the startup hook (load_model);
# the /predict handler returns HTTP 500 while either one is still None.
model = None
vectorizer = None
# No SHAP explainer is kept: explanations are built from TF-IDF weights instead.

# Request body accepted by POST /predict.
class LogRequest(BaseModel):
    # Raw log text to classify; the handler passes it through clean_log
    # before vectorizing.
    log_content: str
    # When True, the response also carries a feature-weight explanation.
    include_explanation: bool = False

# Response body returned by POST /predict.
class PredictionResponse(BaseModel):
    # True when the label predicted by the model is truthy.
    is_anomaly: bool
    # Largest class probability reported by the model for this log.
    confidence: float
    # Second entry of the model's class probabilities, or 0.0 when the model
    # reports only a single class.
    anomaly_probability: float
    # Filled in only when the request sets include_explanation; otherwise it
    # stays None. Declared Optional so the annotation agrees with the None
    # default and with the null value actually sent back to clients.
    explanation: Optional[dict] = None

# (pattern, replacement) rules used by clean_log, applied top to bottom.
_LOG_NORMALIZATION_RULES = (
    # Timestamps of the form 2024-01-15T10:30:45.123Z (fractional seconds required)
    (re.compile(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z'), '[TIMESTAMP]'),
    # Run / job identifiers and any standalone number of 8+ digits
    (re.compile(r'run_\d+'), 'run_ID'),
    (re.compile(r'job_\d+'), 'job_ID'),
    (re.compile(r'\b\d{8,}\b'), '[ID]'),
    # First path component under /home and /tmp (environment-specific)
    (re.compile(r'/home/[^/\s]+'), '/home/USER'),
    (re.compile(r'/tmp/[^/\s]+'), '/tmp/TEMP'),
    # Collapse every whitespace run (including newlines) to one space
    (re.compile(r'\s+'), ' '),
)


def clean_log(log_content):
    """Return *log_content* with volatile details replaced by placeholders.

    Timestamps, run/job identifiers, long numbers and user/temp directory
    names are swapped for fixed tokens, whitespace runs are collapsed to a
    single space, and leading/trailing whitespace is stripped.
    """
    normalized = log_content
    for pattern, replacement in _LOG_NORMALIZATION_RULES:
        normalized = pattern.sub(replacement, normalized)
    return normalized.strip()

def _build_mock_model():
    """Train the small stand-in classifier used when no saved model exists.

    Returns:
        A ``(model, vectorizer)`` pair: a 10-tree random forest fitted on
        four hard-coded sample phrases, and the TF-IDF vectorizer fitted on
        those same phrases.
    """
    # Each sample is (text, label) with 1=anomaly, 0=normal
    samples = [
        ("error failure exception crash", 1),
        ("success passed completed ok", 0),
        ("warning slow performance issue", 1),
        ("build successful tests passed", 0),
    ]
    texts = [text for text, _ in samples]
    labels = [label for _, label in samples]

    mock_vectorizer = TfidfVectorizer(max_features=50)
    features = mock_vectorizer.fit_transform(texts)

    mock_model = RandomForestClassifier(n_estimators=10, random_state=42)
    mock_model.fit(features, labels)
    return mock_model, mock_vectorizer


@app.on_event("startup")
async def load_model():
    """Populate the global model and vectorizer when the app starts.

    Tries to load 'anomaly_model.pkl' and 'vectorizer.pkl' from the working
    directory. If either file is missing, both globals are replaced with a
    freshly trained mock model so the service can still answer requests.
    Only FileNotFoundError triggers the fallback; other load errors propagate.
    """
    global model, vectorizer

    try:
        # NOTE: joblib.load unpickles these files, so they must be trusted.
        model = joblib.load('anomaly_model.pkl')
        vectorizer = joblib.load('vectorizer.pkl')
    except FileNotFoundError:
        print("⚠️ No trained model found. Creating mock model...")
        model, vectorizer = _build_mock_model()
        print("✅ Mock model created for deployment")
    else:
        print("✅ Real model loaded successfully")

def get_feature_importance_explanation(log_content, prediction_proba):
    """Explain a prediction by listing the log's heaviest TF-IDF terms.

    No SHAP is involved: the log is cleaned and vectorized again, and the
    terms with the largest positive TF-IDF weights in this log are reported.
    Every reported term gets the same 'impact' tag, derived from whether
    ``prediction_proba[1]`` exceeds 0.5.

    Args:
        log_content: Raw log text; it is cleaned here with ``clean_log``.
        prediction_proba: Class probabilities for this log; index 1 is read
            to choose the 'anomaly' / 'normal' tag.

    Returns:
        A dict with 'top_contributing_features' (at most five entries) and a
        fixed 'explanation' string. If anything raises along the way, a dict
        with 'error' and 'explanation' keys is returned instead, so callers
        never see an exception from this function.
    """
    candidate_count = 10  # how many of the heaviest terms are considered
    reported_count = 5    # how many of those are returned

    try:
        tfidf_row = vectorizer.transform([clean_log(log_content)])
        vocabulary = vectorizer.get_feature_names_out()
        weights = tfidf_row.toarray()[0]

        # Indices of the heaviest terms, largest weight first.
        ranked = np.argsort(weights)[-candidate_count:][::-1]

        # Terms absent from this log have weight 0 and are skipped.
        contributors = [
            {
                'feature': vocabulary[position],
                'weight': float(weights[position]),
                'impact': 'anomaly' if prediction_proba[1] > 0.5 else 'normal',
            }
            for position in ranked
            if weights[position] > 0
        ]
    except Exception as e:
        return {
            'error': f"Could not generate explanation: {str(e)}",
            'explanation': "Feature importance analysis failed"
        }

    return {
        'top_contributing_features': contributors[:reported_count],
        'explanation': "Features with higher weights contributed more to the prediction"
    }

@app.post("/predict", response_model=PredictionResponse)
async def predict_anomaly(request: LogRequest):
    """Predict if a log indicates an anomaly"""
    # Flow: refuse when the model is not loaded, clean and vectorize the log,
    # classify it, then optionally attach a feature-weight explanation.
    # Any failure while predicting is reported to the client as HTTP 500.
    artifacts_ready = model is not None and vectorizer is not None
    if not artifacts_ready:
        raise HTTPException(status_code=500, detail="Model not loaded. Please ensure model files exist.")

    try:
        features = vectorizer.transform([clean_log(request.log_content)])

        predicted_label = model.predict(features)[0]
        class_probabilities = model.predict_proba(features)[0]

        is_anomaly = bool(predicted_label)
        confidence = float(max(class_probabilities))
        # A model that reports a single class has no second probability column.
        if len(class_probabilities) > 1:
            anomaly_probability = float(class_probabilities[1])
        else:
            anomaly_probability = 0.0

        result = PredictionResponse(
            is_anomaly=is_anomaly,
            confidence=confidence,
            anomaly_probability=anomaly_probability,
        )

        # The explanation is opt-in; it is attached after construction.
        if request.include_explanation:
            result.explanation = get_feature_importance_explanation(
                request.log_content, class_probabilities
            )

        return result

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Prediction failed: {str(e)}")

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    # "status" is always "healthy"; the two flags report whether the startup
    # hook has populated the global model and vectorizer.
    model_ready = model is not None
    vectorizer_ready = vectorizer is not None
    return dict(
        status="healthy",
        model_loaded=model_ready,
        vectorizer_loaded=vectorizer_ready,
    )

@app.get("/")
async def root():
    """Root endpoint with API information"""
    # Static service description: name, version and a map of the endpoints.
    endpoint_index = {
        "predict": "/predict (POST)",
        "health": "/health (GET)",
        "docs": "/docs (GET)",
    }
    service_info = {
        "message": "CI/CD Log Anomaly Detection API",
        "version": "1.0.0",
        "endpoints": endpoint_index,
    }
    return service_info

# Script entry point: runs only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    # Imported here so uvicorn is only required when launching this way.
    import uvicorn
    # Serve the app on all network interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)