ci-cd-anomaly-detection / .github /scripts /anomaly_detector.py
maryangel101
Add robust anomaly detector with mock fallback
62ccdb1
Raw
History Blame Contribute Delete
6.59 kB
import io
import json
import os
import sys
import zipfile
from urllib.parse import urljoin

import requests
def download_workflow_logs(run_id, token, timeout=30):
    """Download the logs of a GitHub Actions workflow run as plain text.

    The repository is read from the ``GITHUB_REPOSITORY`` environment
    variable. The GitHub "download workflow run logs" endpoint answers with
    a redirect to a zip archive of log files, so a zip payload is unpacked
    and its members are concatenated (in sorted name order). Any non-zip
    payload is returned as the decoded response text.

    Args:
        run_id: Identifier of the workflow run whose logs are wanted.
        token: GitHub token sent in the ``Authorization`` header.
        timeout: Seconds to wait on the HTTP request before giving up.

    Returns:
        The log text, or ``None`` when ``GITHUB_REPOSITORY`` is not set, the
        request fails, the API answers with a non-200 status, or the
        archive cannot be read.
    """
    # Without a repository the URL would silently contain "None".
    repo_full_name = os.environ.get('GITHUB_REPOSITORY')
    if not repo_full_name:
        print("Failed to download logs: GITHUB_REPOSITORY is not set")
        return None

    headers = {
        'Authorization': f'token {token}',
        'Accept': 'application/vnd.github.v3+json',
    }
    url = f'https://api.github.com/repos/{repo_full_name}/actions/runs/{run_id}/logs'

    # A timeout keeps a stalled connection from hanging the CI step, and
    # network errors are reported as "no logs" so the caller can exit
    # gracefully instead of crashing with a traceback.
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
    except requests.exceptions.RequestException as exc:
        print(f"Failed to download logs: {exc}")
        return None

    if response.status_code != 200:
        print(f"Failed to download logs: {response.status_code}")
        print(f"Response: {response.text}")
        return None

    payload = response.content
    if not zipfile.is_zipfile(io.BytesIO(payload)):
        # Not an archive: keep the plain-text behaviour.
        return response.text

    try:
        with zipfile.ZipFile(io.BytesIO(payload)) as archive:
            # Skip directory entries; sort for a deterministic order.
            members = sorted(
                name for name in archive.namelist() if not name.endswith('/')
            )
            return "\n".join(
                archive.read(name).decode('utf-8', errors='replace')
                for name in members
            )
    except zipfile.BadZipFile as exc:
        print(f"Failed to download logs: unreadable log archive ({exc})")
        return None
def _resolve_prediction_url(model_url):
    """Return the ``/predict`` endpoint for *model_url*, ignoring trailing slashes."""
    base = model_url.rstrip('/')
    if base.endswith('/predict'):
        return base
    return urljoin(base + '/', 'predict')


def _fall_back_to_mock(log_content, *reasons):
    """Print each reason, announce the fallback, and return the mock analysis."""
    for reason in reasons:
        print(reason)
    print("πŸ’‘ Falling back to mock response")
    return generate_mock_response(log_content)


def analyze_logs_with_model(log_content, model_url):
    """Send logs to the model API for analysis, falling back to a mock result.

    The mock result from ``generate_mock_response`` is used when no usable
    URL is configured (empty, the ``https://httpbin.org`` placeholder, or
    missing an ``http://``/``https://`` scheme), when the request fails or
    times out, when the API answers with a non-200 status, or when the
    response body is not a JSON object.

    Args:
        log_content: Log text to analyse.
        model_url: Base URL of the model API, or the full ``/predict``
            endpoint. Surrounding whitespace and trailing slashes are
            ignored. May be ``None`` or empty.

    Returns:
        The decoded JSON object returned by the API, or the mock analysis
        dictionary.
    """
    try:
        # Environment values often carry stray whitespace or a trailing slash.
        model_url = (model_url or "").strip()

        # If no model URL provided, use mock response
        if not model_url or model_url.rstrip('/') == "https://httpbin.org":
            print("πŸ€– Using mock response (no valid API URL provided)")
            return generate_mock_response(log_content)

        # Ensure the URL has a scheme
        if not model_url.startswith(('http://', 'https://')):
            print(f"❌ MODEL_API_URL is missing scheme: {model_url}")
            print("πŸ’‘ Using mock response instead")
            return generate_mock_response(log_content)

        # ".../predict/" must not turn into ".../predict/predict".
        prediction_url = _resolve_prediction_url(model_url)
        print(f"πŸ”— Calling model API: {prediction_url}")

        response = requests.post(
            prediction_url,
            json={
                "log_content": log_content,
                "include_explanation": True,
            },
            timeout=30,
        )
        if response.status_code != 200:
            return _fall_back_to_mock(
                log_content,
                f"❌ Model API error: {response.status_code}",
                f"Response: {response.text}",
            )

        result = response.json()
        # The caller reads the result with ``.get``; anything but a JSON
        # object would crash it.
        if not isinstance(result, dict):
            return _fall_back_to_mock(
                log_content,
                f"❌ Model API returned an unexpected payload: {type(result).__name__}",
            )
        return result

    except requests.exceptions.Timeout:
        return _fall_back_to_mock(
            log_content, "❌ Model API request timed out (30 seconds)"
        )
    except requests.exceptions.ConnectionError:
        return _fall_back_to_mock(
            log_content, "❌ Failed to connect to Model API - check the URL"
        )
    except requests.exceptions.RequestException as e:
        return _fall_back_to_mock(log_content, f"❌ Failed to call model API: {e}")
    except Exception as e:
        # Deliberate catch-all: analysis is best-effort and must never
        # crash the workflow step.
        return _fall_back_to_mock(log_content, f"❌ Unexpected error: {e}")
def generate_mock_response(log_content):
    """Generate a keyword-based mock analysis for when the API is unavailable.

    The check is case-insensitive and counts how many *distinct* keywords
    from each list occur in the log (not how often they occur). Keywords
    are matched as substrings, so ``'failed'`` also matches ``'fail'``.

    Args:
        log_content: Log text to scan. ``None`` or an empty string is
            treated as a log with no matches.

    Returns:
        A dictionary with ``is_anomaly``, ``confidence`` (rounded to two
        decimals, capped at 0.95), ``anomaly_probability`` (0.8 or 0.2) and
        an ``explanation`` sub-dictionary listing the matched keywords and
        their counts.
    """
    error_keywords = ['error', 'fail', 'failed', 'exception', 'traceback', 'timeout']
    warning_keywords = ['warning', 'slow', 'retry', 'timeout']

    # Lower-case the (possibly very large) log once instead of once per
    # keyword check, and tolerate a missing log.
    haystack = (log_content or "").lower()
    errors_found = [kw for kw in error_keywords if kw in haystack]
    warnings_found = [kw for kw in warning_keywords if kw in haystack]
    error_count = len(errors_found)
    warning_count = len(warnings_found)

    # Anomalous when several error keywords appear, or at least one error
    # keyword appears together with every warning keyword.
    is_anomaly = error_count > 2 or (error_count > 0 and warning_count > 3)

    # Confidence grows with the number of matched keywords, capped at 0.95.
    confidence = min(0.95, 0.7 + (error_count * 0.1) + (warning_count * 0.05))

    return {
        'is_anomaly': is_anomaly,
        'confidence': round(confidence, 2),
        'anomaly_probability': 0.8 if is_anomaly else 0.2,
        'explanation': {
            'detection_method': 'mock_analysis',
            'error_keywords_found': errors_found,
            'warning_keywords_found': warnings_found,
            'error_count': error_count,
            'warning_count': warning_count,
            'message': 'This analysis used mock detection. Set up a real model API for accurate results.'
        }
    }
def main():
    """Run the anomaly check for one workflow run and exit with its verdict.

    Reads ``GITHUB_TOKEN``, ``WORKFLOW_RUN_ID`` and (optionally)
    ``MODEL_API_URL`` from the environment, downloads the run's logs and
    analyses them. The process exits with status 1 only when an anomaly is
    reported; missing configuration, a failed download, a failed analysis
    and a clean result all exit with status 0 so the workflow is not failed.
    """
    def _mark(value):
        # Check mark for a present value, cross for a missing one.
        return 'βœ…' if value else '❌'

    token = os.environ.get('GITHUB_TOKEN')
    run_id = os.environ.get('WORKFLOW_RUN_ID')
    api_url = os.environ.get('MODEL_API_URL')

    print("πŸ” Environment variables check:")
    print(f" GITHUB_TOKEN: {_mark(token)}")
    print(f" WORKFLOW_RUN_ID: {_mark(run_id)} -> {run_id}")
    print(f" MODEL_API_URL: {_mark(api_url)} -> {api_url}")

    # The token and run id are mandatory; the model URL is optional.
    if not (token and run_id):
        print("❌ Missing required environment variables: GITHUB_TOKEN or WORKFLOW_RUN_ID")
        sys.exit(0)  # Exit gracefully, don't fail the workflow

    print(f"πŸ“Š Analyzing workflow run: {run_id}")

    print("⬇️ Downloading workflow logs...")
    log_text = download_workflow_logs(run_id, token)
    if not log_text:
        print("❌ Failed to download logs")
        sys.exit(0)  # Exit gracefully, don't fail the workflow
    print(f"πŸ“„ Logs downloaded ({len(log_text)} characters)")

    # Real model when a URL is configured, mock analysis otherwise.
    verdict = analyze_logs_with_model(log_text, api_url)
    if not verdict:
        print("❌ Failed to analyze logs")
        sys.exit(0)  # Exit gracefully, don't fail the workflow
    print(f"πŸ“‹ Analysis result: {json.dumps(verdict, indent=2)}")

    if not verdict.get('is_anomaly', False):
        print("βœ… No anomalies detected")
        sys.exit(0)

    print("🚨 ANOMALY DETECTED!")
    print(f" Confidence: {verdict.get('confidence', 0):.2%}")
    print(f" Anomaly Probability: {verdict.get('anomaly_probability', 0):.2%}")
    explanation = verdict.get('explanation')
    if explanation:
        print(" Explanation:", json.dumps(explanation, indent=2))
    # A non-zero status marks the workflow step as failed.
    sys.exit(1)
# Run the detector only when this file is executed as a script.
if __name__ == '__main__':
    main()