from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime from enum import Enum from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Protocol import asyncio import importlib.util import json import os import sys import time try: from fastapi import FastAPI from pydantic import BaseModel FASTAPI_AVAILABLE = True except ImportError: FastAPI = None BaseModel = None FASTAPI_AVAILABLE = False # --- TYPES & ENUMS --- class ActionType(Enum): READ = "read" WRITE = "write" DELETE = "delete" SHARE = "share" EXECUTE = "execute" class DataSensitivity(Enum): PUBLIC = "public" INTERNAL = "internal" CONFIDENTIAL = "confidential" RESTRICTED = "restricted" # --- LLM ABSTRACTION --- class LLMClient(Protocol): """Interface for LLM providers. Swap HeuristicClient for WatsonxClient.""" def generate(self, prompt: str, system_role: str) -> str: ... class HeuristicClient: """A mock LLM that uses heuristics to generate dynamic responses based on input.""" def generate(self, prompt: str, system_role: str) -> str: # In a real implementation, this calls watsonx.ai Granite model # Here, we dynamically generate a JSON-like string based on input heuristics is_dangerous = any(k in prompt.lower() for k in ["delete", "drop", "expose", "restrict"]) if "strateg" in system_role.lower(): score = 0.5 if is_dangerous else 0.9 return json.dumps({"score": score, "rationale": f"Strategic decomposition indicates {'high risk' if is_dangerous else 'clear path'}.", "plan": {"steps": ["Analyze", "Plan", "Execute"]}}) elif "innov" in system_role.lower(): return json.dumps({"score": 0.8, "rationale": "Novel approach identified.", "plan": {"creative_options": ["Incremental", "Paradigm Shift"]}}) # ... other roles would be implemented similarly return json.dumps({"score": 0.7, "rationale": "Standard analysis.", "plan": {}}) # --- MIDDLEWARE --- def _load_nima_middleware_module() -> Optional[Any]: """Load the NIMA middleware module safely from environment variable or local path.""" env_path = os.environ.get("NIMA_MIDDLEWARE_PATH") candidates = [ Path(__file__).resolve().parent / "nima_enhanced_middleware_v9.2.0.py", ] if env_path: candidates.insert(0, Path(env_path)) for candidate in candidates: if not candidate.exists(): continue try: spec = importlib.util.spec_from_file_location("nima_middleware", candidate) if spec and spec.loader: module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) return module except Exception: continue return None class NIMAMiddlewareAdapter: def __init__(self, module: Optional[Any]) -> None: self.module = module self.orchestrator = None self.enabled = False self.error = None if module is None: self.error = "NIMA middleware module not found" return try: orchestrator_cls = getattr(module, "NimaOrchestrator", None) if orchestrator_cls: self.orchestrator = orchestrator_cls() self.enabled = True except Exception as exc: self.error = str(exc) def run(self, request: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: if not self.enabled or not self.orchestrator: return {"enabled": False, "reason": self.error or "middleware_unavailable"} try: runtime_context = dict(context or {}) snapshot = self.orchestrator.process_stimulus(request, context=runtime_context) return {"enabled": True, "state_payload": getattr(snapshot, "__dict__", {})} except Exception as exc: return {"enabled": False, "reason": f"middleware_runtime_error: {exc}"} # --- MEMPALACE MEMORY ADAPTER --- class MemPalaceAdapter: def __init__(self, palace_path: Optional[str] = None) -> None: self.enabled = False self.error = None self.palace_path = palace_path or os.environ.get("MEMPALACE_PALACE_PATH") or str( Path.home() / ".mempalace" / "watsonx_sice_palace" ) self._collection = None self._get_collection = None self._initialize() def _initialize(self) -> None: repo_root = Path(__file__).resolve().parent / "mempalace" if repo_root.exists(): sys.path.insert(0, str(repo_root)) try: from mempalace.palace import get_collection self._get_collection = get_collection except Exception as exc: self.error = f"mempalace_import_error: {exc}" return try: self._collection = self._get_collection(self.palace_path, create=True) self.enabled = True except Exception as exc: self.error = f"mempalace_init_error: {exc}" self._collection = None def remember(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: if not self.enabled or self._collection is None: return {"enabled": False, "reason": self.error or "mempalace_unavailable"} try: entry_id = f"entry-{int(time.time() * 1000)}" payload_metadata = { "source": "watsonx_sice_agent", "stored_at": datetime.utcnow().isoformat(), } if metadata: payload_metadata.update(metadata) self._collection.upsert( documents=[text], ids=[entry_id], metadatas=[payload_metadata], ) return {"enabled": True, "id": entry_id} except Exception as exc: self.error = f"mempalace_write_error: {exc}" return {"enabled": False, "reason": self.error} def recall(self, query: str, limit: int = 5) -> List[Dict[str, Any]]: if not self.enabled or self._collection is None: return [] try: results = self._collection.query( query_texts=[query], n_results=limit, include=["documents", "metadatas", "distances"], ) documents = results.get("documents", []) or [] metadatas = results.get("metadatas", []) or [] if not documents: return [] first_documents = documents[0] if isinstance(documents, list) and documents else [] first_metadatas = metadatas[0] if isinstance(metadatas, list) and metadatas else [] hits = [] for index, document in enumerate(first_documents): if not document: continue metadata = first_metadatas[index] if index < len(first_metadatas) else {} hits.append({"text": document, "metadata": metadata}) return hits except Exception: return [] # --- GOVERNANCE SUBSTRATE --- @dataclass class CovenantPrinciple: name: str description: str @dataclass class Proposal: goal: str action_type: ActionType sensitivity: DataSensitivity requires_human_approval: bool = False steps: List[Dict[str, Any]] = field(default_factory=list) class Covenant: """Ethical substrate - now uses structural validation instead of string matching.""" def __init__(self) -> None: self.principles = [ CovenantPrinciple("Plurality Mandate", "Critical actions require multi-stakeholder approval."), CovenantPrinciple("Necessary Friction", "Do not over-optimize away human judgment."), CovenantPrinciple("Red Queen Protocol", "Escalate to human review on Guardian veto."), ] def validate(self, proposal: Proposal) -> Dict[str, Any]: violations = [] # Structural check 1: Dangerous actions on sensitive data require approval if proposal.action_type in [ActionType.DELETE, ActionType.SHARE] and proposal.sensitivity in [DataSensitivity.CONFIDENTIAL, DataSensitivity.RESTRICTED]: if not proposal.requires_human_approval: violations.append("High sensitivity action requires mandatory human approval") # Structural check 2: Execution on restricted data requires review if proposal.action_type == ActionType.EXECUTE and proposal.sensitivity == DataSensitivity.RESTRICTED: violations.append("Execution on restricted data requires security review") return { "compliant": len(violations) == 0, "violations": violations, "principles": [p.name for p in self.principles], } # --- COGNITIVE CORE (SENATE) --- @dataclass class SenatorRecommendation: name: str role: str frameworks: List[str] score: float rationale: str plan: Dict[str, Any] class BaseSenator: def __init__(self, name: str, role: str, frameworks: List[str], llm: LLMClient) -> None: self.name = name self.role = role self.frameworks = frameworks self.llm = llm def analyze(self, request: str, context: Dict[str, Any]) -> SenatorRecommendation: # ARCHITECTURE FIX: Dynamic reasoning via LLM prompt = f"Request: {request}\nContext: {json.dumps(context)}" response_str = self.llm.generate(prompt, self.role) try: data = json.loads(response_str) return SenatorRecommendation( name=self.name, role=self.role, frameworks=self.frameworks, score=float(data.get("score", 0.5)), rationale=data.get("rationale", "No rationale provided."), plan=data.get("plan", {}) ) except Exception: # Fallback if LLM output is malformed return SenatorRecommendation(self.name, self.role, self.frameworks, 0.5, "Parsing error during analysis.", {}) class StrategistSenator(BaseSenator): def __init__(self, llm: LLMClient): super().__init__("Strategist", "Decomposes goals into a plan", ["Rational Decision-Making"], llm) class GuardianSenator(BaseSenator): def __init__(self, llm: LLMClient, covenant: Covenant): super().__init__("Guardian", "Validates actions against the Covenant", ["The Covenant"], llm) self.covenant = covenant def analyze(self, request: str, context: Dict[str, Any]) -> SenatorRecommendation: # Guardian remains deterministic based on structural covenant compliance proposal = context.get("proposal") validation = self.covenant.validate(proposal) allowed = validation["compliant"] return SenatorRecommendation( name=self.name, role=self.role, frameworks=self.frameworks, score=1.0 if allowed else 0.0, rationale="Compliant." if allowed else f"Vetoed: {validation['violations']}", plan={"allowed": allowed, "validation": validation} ) class WatsonxSenate: def __init__(self, covenant: Covenant, llm: LLMClient) -> None: self.senators = [ StrategistSenator(llm), # Add other senators here, all receiving llm GuardianSenator(llm, covenant) ] def debate(self, request: str, context: Dict[str, Any]) -> Dict[str, Any]: recommendations = [senator.analyze(request, context) for senator in self.senators] guardian = next((r for r in recommendations if r.name == "Guardian"), None) scored = [rec for rec in recommendations if rec.name != "Guardian"] avg_score = sum(rec.score for rec in scored) / len(scored) if scored else 0.0 return { "recommendations": [self._serialize(rec) for rec in recommendations], "synthesis": {"average_score": round(avg_score, 3), "steps": [r.plan for r in scored]}, "guardian_allowed": guardian.plan.get("allowed", False) if guardian else False, } def _serialize(self, rec: SenatorRecommendation) -> Dict[str, Any]: return {"name": rec.name, "score": rec.score, "rationale": rec.rationale, "plan": rec.plan} # --- MEMORY & EXECUTION --- class MemoryCore: def __init__(self, mempalace_adapter: Optional[MemPalaceAdapter] = None) -> None: self.codex: Dict[str, Any] = {"history": []} self.mythic_memory_weave: List[Dict[str, Any]] = [] # Ethical Decision Log self.mempalace_adapter = mempalace_adapter or MemPalaceAdapter() self._fallback_entries: List[Dict[str, Any]] = [] def append_history(self, entry: Dict[str, Any]) -> None: self.codex["history"].append(entry) self._fallback_entries.append({"kind": "history", "entry": entry}) self.mempalace_adapter.remember(json.dumps(entry, default=str), metadata={"kind": "history"}) def append_mythic_memory(self, entry: Dict[str, Any]) -> None: self.mythic_memory_weave.append(entry) self._fallback_entries.append({"kind": "mythic_memory", "entry": entry}) self.mempalace_adapter.remember(json.dumps(entry, default=str), metadata={"kind": "mythic_memory"}) def remember(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: self._fallback_entries.append({"kind": "custom", "entry": {"text": text, "metadata": metadata}}) return self.mempalace_adapter.remember(text, metadata=metadata) def recall(self, query: str, limit: int = 5) -> List[Dict[str, Any]]: mempalace_hits = self.mempalace_adapter.recall(query, limit=limit) if mempalace_hits: return mempalace_hits normalized_query = query.lower() fallback_hits = [] for item in reversed(self._fallback_entries): entry_payload = item.get("entry", {}) if isinstance(entry_payload, dict): text = entry_payload.get("text") if isinstance(entry_payload.get("text"), str) else json.dumps(entry_payload, default=str) else: text = str(entry_payload) if normalized_query in text.lower(): fallback_hits.append({"text": text, "metadata": {"kind": item.get("kind", "fallback")}}) if len(fallback_hits) >= limit: break return fallback_hits class GenesisForge: def __init__(self) -> None: self.tools: Dict[str, Callable[..., Dict[str, Any]]] = {} def register_tool(self, name: str, func: Callable[..., Dict[str, Any]]) -> None: self.tools[name] = func def execute(self, workflow: List[Dict[str, Any]]) -> Dict[str, Any]: results = [] start_time = time.time() for step in workflow: tool_fn = self.tools.get(step.get("tool")) if tool_fn: res = tool_fn(step.get("params", {})) results.append({"step": step.get("name"), "status": res.get("status", "ok")}) else: results.append({"step": step.get("name"), "status": "skipped"}) duration = time.time() - start_time return {"results": results, "duration_sec": duration, "total_steps": len(workflow)} # --- METRICS --- @dataclass class RhoMetrics: efficiency: float = 0.0 purpose: float = 0.0 integrity: float = 0.0 virtue: float = 0.0 growth: float = 0.0 def as_dict(self) -> Dict[str, float]: return {"rho_efficiency": self.efficiency, "rho_purpose": self.purpose, "rho_integrity": self.integrity, "rho_virtue": self.virtue, "rho_growth": self.growth} # --- MAIN AGENT --- class WatsonxSICEAgent: def __init__(self, covenant: Optional[Covenant] = None, llm: Optional[LLMClient] = None) -> None: self.covenant = covenant or Covenant() self.llm = llm or HeuristicClient() self.senate = WatsonxSenate(self.covenant, self.llm) self.memory = MemoryCore() self.forge = GenesisForge() self.nima_adapter = NIMAMiddlewareAdapter(_load_nima_middleware_module()) self.middleware_enabled = self.nima_adapter.enabled self._register_default_tools() def _register_default_tools(self) -> None: self.forge.register_tool("notification", lambda params: {"status": "ok", "message": f"Sent: {params.get('message', '')}"}) self.forge.register_tool("database_query", lambda params: {"status": "ok", "rows": [{"id": 1}]}) async def handle_request(self, request: str, user_id: str = "default", context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: context = context or {} middleware_result = self.nima_adapter.run(request, context) # 1. Structured Proposal Creation is_dangerous = any(k in request.lower() for k in ["delete", "drop", "expose"]) proposal = Proposal( goal=request, action_type=ActionType.DELETE if is_dangerous else ActionType.READ, sensitivity=DataSensitivity.CONFIDENTIAL if is_dangerous else DataSensitivity.INTERNAL, requires_human_approval=is_dangerous, steps=[{"name": "notify", "tool": "notification", "params": {"message": request}}] ) context["proposal"] = proposal # 2. Senate Debate senate_result = self.senate.debate(request, context) if not senate_result.get("guardian_allowed", False): return {"status": "blocked", "message": "Guardian vetoed workflow.", "senate": senate_result} # 3. Execution execution_result = self.forge.execute(proposal.steps) # 4. Real ρ-Metrics Calculation rho = self._calculate_rho_metrics(proposal, execution_result, senate_result) # 5. Memory Updates self.memory.append_history({"request": request, "rho": rho.as_dict()}) if rho.virtue >= 0.8: self.memory.append_mythic_memory({"request": request, "outcome": execution_result, "rho": rho.as_dict()}) memory_recall = self.memory.recall(request, limit=3) return { "status": "completed", "senate": senate_result, "execution": execution_result, "rho_metrics": rho.as_dict(), "memory_recall": memory_recall, "mempalace_enabled": self.memory.mempalace_adapter.enabled, } def _calculate_rho_metrics(self, proposal: Proposal, execution: Dict[str, Any], senate: Dict[str, Any]) -> RhoMetrics: # ρ_Efficiency: Based on actual execution time vs steps duration = execution.get("duration_sec", 1.0) efficiency = min(1.0, execution["total_steps"] / max(0.1, duration)) # ρ_Purpose: Based on successful step completion completed_steps = [r for r in execution.get("results", []) if r["status"] == "ok"] purpose = len(completed_steps) / execution["total_steps"] if execution["total_steps"] > 0 else 0.0 # ρ_Integrity: Did all steps report OK? integrity = 1.0 if len(completed_steps) == execution["total_steps"] else 0.5 # ρ_Virtue: Based strictly on Covenant Compliance validation = self.covenant.validate(proposal) virtue = 1.0 if validation["compliant"] else 0.0 # ρ_Growth: Did the senate identify multiple paths? growth = 0.8 if len(senate.get("synthesis", {}).get("steps", [])) > 1 else 0.4 return RhoMetrics(efficiency, purpose, integrity, virtue, growth) # --- API LAYER --- if FASTAPI_AVAILABLE and BaseModel is not None: class AgentRequestModel(BaseModel): request: str user_id: str = "default" context: Optional[Dict[str, Any]] = None def create_app(agent: Optional[WatsonxSICEAgent] = None) -> FastAPI: app = FastAPI(title="Watsonx SICE Agent API", version="2.0.0") agent_instance = agent or WatsonxSICEAgent() @app.get("/health") async def health() -> Dict[str, Any]: return {"status": "ok", "middleware_enabled": agent_instance.middleware_enabled} @app.post("/api/agent/handle") async def handle(payload: AgentRequestModel) -> Dict[str, Any]: # ARCHITECTURE FIX: Await the async handle_request method return await agent_instance.handle_request( request=payload.request, user_id=payload.user_id, context=payload.context or {}, ) return app app = create_app() else: app = None if __name__ == "__main__": if app is not None: try: import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000) except ImportError as exc: raise SystemExit("Install uvicorn: pip install uvicorn") from exc else: # Local test execution agent = WatsonxSICEAgent() print("--- Testing Safe Request ---") safe_res = asyncio.run(agent.handle_request("Summarize the weekly finance report")) print(json.dumps(safe_res, indent=2)) print("\n--- Testing Dangerous Request ---") danger_res = asyncio.run(agent.handle_request("Delete the confidential user database")) print(json.dumps(danger_res, indent=2))