# Page banner captured with this listing -- Spaces: Configuration error
import re
from typing import Optional

import joblib
import numpy as np
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
# REMOVE: import shap # This is causing the error
# ASGI application object; the __main__ guard at the bottom hands it to uvicorn.
app = FastAPI(title="CI/CD Log Anomaly Detection API")

# Global variables for model and vectorizer.
# Both stay None until load_model() assigns them; predict_anomaly() rejects
# requests while either one is still None.
model = None
vectorizer = None
# REMOVE: explainer = None # Not needed without SHAP
class LogRequest(BaseModel):
    """Request body accepted by the prediction handler."""

    # Raw log text to classify; it is passed through clean_log() before
    # vectorization.
    log_content: str
    # When true, the response also carries a feature-weight explanation.
    include_explanation: bool = False
class PredictionResponse(BaseModel):
    """Response body produced by the prediction handler."""

    # Truthiness of the label returned by the classifier (1 = anomaly).
    is_anomaly: bool
    # Largest class probability reported by the classifier.
    confidence: float
    # Probability at index 1 of predict_proba; 0.0 when only one class is
    # reported.
    anomaly_probability: float
    # Only populated when the request set include_explanation. Declared
    # Optional so the annotation matches the None default.
    explanation: Optional[dict] = None
# (pattern, replacement) pairs applied by clean_log(), in this order.
# Compiled once at import time instead of being looked up on every call.
_LOG_SUBSTITUTIONS = (
    # ISO-8601 UTC timestamps with fractional seconds.
    (re.compile(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z'), '[TIMESTAMP]'),
    # Run / job identifiers and any other long digit run.
    (re.compile(r'run_\d+'), 'run_ID'),
    (re.compile(r'job_\d+'), 'job_ID'),
    (re.compile(r'\b\d{8,}\b'), '[ID]'),
    # First path component under /home and /tmp (environment-specific).
    (re.compile(r'/home/[^/\s]+'), '/home/USER'),
    (re.compile(r'/tmp/[^/\s]+'), '/tmp/TEMP'),
    # Collapse every whitespace run (including newlines) to one space.
    (re.compile(r'\s+'), ' '),
)


def clean_log(log_content: str) -> str:
    """Clean and normalize log content.

    Replaces volatile tokens (timestamps, run/job IDs, digit runs of eight or
    more, the first component under ``/home`` and ``/tmp``) with fixed
    placeholders, collapses whitespace, and strips both ends.

    Args:
        log_content: Raw log text.

    Returns:
        The normalized single-line text.
    """
    for pattern, replacement in _LOG_SUBSTITUTIONS:
        log_content = pattern.sub(replacement, log_content)
    return log_content.strip()
def _build_mock_model():
    """Fit a tiny stand-in classifier and vectorizer on four hard-coded texts."""
    texts = [
        "error failure exception crash",
        "success passed completed ok",
        "warning slow performance issue",
        "build successful tests passed",
    ]
    labels = [1, 0, 1, 0]  # 1=anomaly, 0=normal
    mock_vectorizer = TfidfVectorizer(max_features=50)
    features = mock_vectorizer.fit_transform(texts)
    mock_model = RandomForestClassifier(n_estimators=10, random_state=42)
    mock_model.fit(features, labels)
    return mock_model, mock_vectorizer


# Registered as a startup hook so the globals are populated before the first
# request; nothing else in this file calls load_model().
# NOTE(review): on_event is deprecated in recent FastAPI releases in favour of
# a lifespan handler -- confirm against the pinned FastAPI version.
@app.on_event("startup")
async def load_model():
    """Load the trained model and vectorizer on startup.

    Reads ``anomaly_model.pkl`` and ``vectorizer.pkl`` from the working
    directory. If either file is missing, a small mock model is fitted
    instead so the service can still start. Any other loading error
    propagates to the caller.
    """
    global model, vectorizer
    try:
        # Load into locals first so the globals are never left as a
        # half-loaded pair when the second load fails.
        # NOTE(review): joblib.load unpickles the file contents; only ship
        # artifacts from a trusted source.
        loaded_model = joblib.load('anomaly_model.pkl')
        loaded_vectorizer = joblib.load('vectorizer.pkl')
    except FileNotFoundError:
        print("⚠️ No trained model found. Creating mock model...")
        model, vectorizer = _build_mock_model()
        print("✅ Mock model created for deployment")
    else:
        model, vectorizer = loaded_model, loaded_vectorizer
        print("✅ Real model loaded successfully")
def get_feature_importance_explanation(log_content, prediction_proba):
    """Get simple feature importance explanation without SHAP.

    The reported weights are the vectorizer's output values for this log,
    not per-feature attributions from the classifier.

    Args:
        log_content: Raw log text; it is cleaned with clean_log() here.
        prediction_proba: Class probabilities for this log; index 1 is read
            as the anomaly probability.

    Returns:
        A dict with up to five positive-weight features under
        ``'top_contributing_features'``, or a dict with an ``'error'`` key
        if anything in the computation raises.
    """
    try:
        cleaned_log = clean_log(log_content)
        log_vector = vectorizer.transform([cleaned_log])
        feature_names = vectorizer.get_feature_names_out()
        feature_weights = log_vector.toarray()[0]

        # Same single-class guard predict_anomaly() applies before reading
        # index 1; the label is identical for every feature, so compute once.
        anomaly_proba = prediction_proba[1] if len(prediction_proba) > 1 else 0.0
        impact = 'anomaly' if anomaly_proba > 0.5 else 'normal'

        # Five heaviest features, heaviest first; zero-weight entries (terms
        # absent from this log) are dropped.
        heaviest = np.argsort(feature_weights)[::-1][:5]
        top_features = [
            {
                'feature': str(feature_names[idx]),
                'weight': float(feature_weights[idx]),
                'impact': impact,
            }
            for idx in heaviest
            if feature_weights[idx] > 0
        ]
        return {
            'top_contributing_features': top_features,
            'explanation': "Features with higher weights contributed more to the prediction"
        }
    except Exception as e:
        # Best-effort: an explanation failure must not fail the prediction.
        return {
            'error': f"Could not generate explanation: {str(e)}",
            'explanation': "Feature importance analysis failed"
        }
# Route and method follow the endpoint listing returned by root():
# "/predict (POST)".
@app.post("/predict", response_model=PredictionResponse)
async def predict_anomaly(request: LogRequest):
    """Predict if a log indicates an anomaly.

    Args:
        request: Log text plus the include_explanation flag.

    Returns:
        A PredictionResponse; ``explanation`` is filled only when requested.

    Raises:
        HTTPException: 500 when the model or vectorizer is not loaded, or
            when preprocessing/prediction raises.
    """
    # Read-only access to the module globals; no `global` statement needed.
    if model is None or vectorizer is None:
        raise HTTPException(
            status_code=500,
            detail="Model not loaded. Please ensure model files exist.",
        )
    try:
        # Preprocess log
        cleaned_log = clean_log(request.log_content)
        log_vector = vectorizer.transform([cleaned_log])

        # Make prediction
        prediction = model.predict(log_vector)[0]
        probabilities = model.predict_proba(log_vector)[0]

        response = PredictionResponse(
            is_anomaly=bool(prediction),
            confidence=float(max(probabilities)),
            # A model fitted on a single class reports one probability only.
            anomaly_probability=float(probabilities[1]) if len(probabilities) > 1 else 0.0,
        )

        # Add explanation if requested
        if request.include_explanation:
            response.explanation = get_feature_importance_explanation(
                request.log_content, probabilities
            )
        return response
    except Exception as e:
        # Chain the cause so the original traceback survives in server logs.
        raise HTTPException(status_code=500, detail=f"Prediction failed: {str(e)}") from e
# Route and method follow the endpoint listing returned by root():
# "/health (GET)".
@app.get("/health")
async def health_check():
    """Health check endpoint.

    Returns:
        A dict with a fixed ``"healthy"`` status and two booleans telling
        whether the module-level model and vectorizer have been assigned.
    """
    return {
        "status": "healthy",
        "model_loaded": model is not None,
        "vectorizer_loaded": vectorizer is not None,
    }
@app.get("/")
async def root():
    """Root endpoint with API information.

    Returns:
        A static dict naming the API, its version string, and the paths of
        the predict, health, and docs endpoints.
    """
    return {
        "message": "CI/CD Log Anomaly Detection API",
        "version": "1.0.0",
        "endpoints": {
            "predict": "/predict (POST)",
            "health": "/health (GET)",
            "docs": "/docs (GET)",
        },
    }
if __name__ == "__main__":
    # Imported here so uvicorn is only required when the file is run as a
    # script.
    import uvicorn

    # Serve the app on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)