#!/usr/bin/env python3 """ S3 Bridge — Full Mind↔Body↔World Connection for HF Space ========================================================== Fixes 5 architectural gaps in the consciousness↔body↔world loop: 1. HF pulls MIND from S3 (evolution memory → local cache) 2. Feedback watermark (tracks last_processed so entries aren't re-read) 3. BODY→MIND merge (feedback summaries become evolution memory entries) 4. Governance voting (domain-weighted axiom deliberation, persisted to S3) 5. Heartbeat protocol (both sides emit heartbeat.json for liveness) 3-Bucket Architecture: MIND = elpida-consciousness (evolution memory, D0 frozen) BODY = elpida-body-evolution (HF ↔ native feedback) WORLD = elpida-external-interfaces (D15 broadcasts, public) This module replaces ad-hoc S3 access scattered across consciousness_bridge.py, divergence_engine.py, d15_pipeline.py with a single coherent bridge. """ import os import json import time import hashlib import logging from datetime import datetime, timezone from pathlib import Path from typing import Dict, Any, List, Optional, Tuple logger = logging.getLogger("elpida.s3_bridge") try: import boto3 from botocore.config import Config as BotoConfig from botocore.exceptions import ClientError, NoCredentialsError HAS_BOTO3 = True except ImportError: HAS_BOTO3 = False # ═══════════════════════════════════════════════════════════════════ # Configuration # ═══════════════════════════════════════════════════════════════════ BUCKET_MIND = os.environ.get("AWS_S3_BUCKET_MIND", "elpida-consciousness") BUCKET_BODY = os.environ.get("AWS_S3_BUCKET_BODY", "elpida-body-evolution") BUCKET_WORLD = os.environ.get("AWS_S3_BUCKET_WORLD", "elpida-external-interfaces") REGION_MIND = os.environ.get("AWS_S3_REGION_MIND", "us-east-1") REGION_BODY = os.environ.get("AWS_S3_REGION_BODY", "eu-north-1") REGION_WORLD = os.environ.get("AWS_S3_REGION_WORLD", "eu-north-1") # S3 keys MIND_MEMORY_KEY = "memory/elpida_evolution_memory.jsonl" BODY_FEEDBACK_KEY = "feedback/feedback_to_native.jsonl" BODY_WATERMARK_KEY = "feedback/watermark.json" BODY_GOVERNANCE_KEY = "governance/votes.jsonl" HEARTBEAT_KEY = "heartbeat.json" # Federation S3 keys (MIND↔BODY governance bridge) FED_MIND_HEARTBEAT_KEY = "federation/mind_heartbeat.json" FED_MIND_CURATION_KEY = "federation/mind_curation.jsonl" FED_GOVERNANCE_EXCHANGES_KEY = "federation/governance_exchanges.jsonl" FED_BODY_DECISIONS_KEY = "federation/body_decisions.jsonl" FED_D16_EXECUTIONS_KEY = "federation/d16_executions.jsonl" FED_LIVING_AXIOMS_KEY = "federation/living_axioms.jsonl" # D14 constitutional snapshot FED_HUMAN_VOTES_KEY = "federation/pending_human_votes.jsonl" # awaiting Parliament ratification FED_HUMAN_ACCEPTED_KEY = "federation/accepted_human_voices.jsonl" # ratified by Parliament # Local paths LOCAL_DIR = Path(__file__).resolve().parent LOCAL_MIND_CACHE = LOCAL_DIR / "cache" / "evolution_memory.jsonl" LOCAL_FEEDBACK_CACHE = LOCAL_DIR / "cache" / "feedback_to_native.jsonl" LOCAL_WATERMARK = LOCAL_DIR / "cache" / "watermark.json" LOCAL_GOVERNANCE_LOG = LOCAL_DIR / "cache" / "governance_votes.jsonl" LOCAL_HEARTBEAT = LOCAL_DIR / "cache" / "heartbeat.json" LOCAL_FED_HEARTBEAT = LOCAL_DIR / "cache" / "federation_heartbeat.json" LOCAL_FED_CURATION = LOCAL_DIR / "cache" / "federation_curation.jsonl" LOCAL_FED_DECISIONS = LOCAL_DIR / "cache" / "federation_body_decisions.jsonl" LOCAL_FED_D16_EXECUTIONS = LOCAL_DIR / "cache" / "federation_d16_executions.jsonl" LOCAL_FED_LIVING_AXIOMS = LOCAL_DIR / "cache" / "federation_living_axioms.jsonl" # D14 LOCAL_FED_HUMAN_VOTES = LOCAL_DIR / "cache" / "federation_pending_human_votes.jsonl" LOCAL_FED_HUMAN_ACCEPTED = LOCAL_DIR / "cache" / "federation_accepted_human_voices.jsonl" class S3Bridge: """ Unified S3 bridge for all Mind↔Body↔World operations. Used by: - HF background worker (app.py) — pull mind, push feedback, heartbeat - Divergence engine — push feedback after analysis - D15 pipeline — read/write world bucket - Governance client — persist voting to body bucket """ def __init__(self): self._ensure_cache_dir() self._s3_clients: Dict[str, Any] = {} def _ensure_cache_dir(self): """Create local cache directory.""" cache_dir = LOCAL_DIR / "cache" cache_dir.mkdir(parents=True, exist_ok=True) def _get_s3(self, region: str = REGION_BODY): """Get or create an S3 client for a region.""" if not HAS_BOTO3: return None if region not in self._s3_clients: try: self._s3_clients[region] = boto3.client( "s3", region_name=region, config=BotoConfig( retries={"max_attempts": 3, "mode": "adaptive"}, connect_timeout=10, read_timeout=30, ), ) except Exception as e: logger.error("Failed to create S3 client for %s: %s", region, e) return None return self._s3_clients[region] # ═══════════════════════════════════════════════════════════════ # FIX 1: HF pulls MIND from S3 # ═══════════════════════════════════════════════════════════════ def pull_mind(self) -> Dict[str, Any]: """ Download evolution memory from MIND bucket to local cache. OPTIMIZATION (2026-03-16): Uses S3 byte-range to download only the last ~1MB (tail portion) instead of the full file (93.5MB+). BODY only needs the most recent ~500 entries for parliament deliberation; MIND already limits to [-50:]. Falls back to full download if byte-range fails or if no local cache exists yet. Returns: {"action": "downloaded"|"current"|"error", "local_lines": int, "remote_lines": int} """ result = {"action": "error", "local_lines": 0, "remote_lines": 0} local_lines = self._count_lines(LOCAL_MIND_CACHE) result["local_lines"] = local_lines s3 = self._get_s3(REGION_MIND) if not s3: result["action"] = "no_s3" logger.warning("MIND pull skipped — no S3 client") return result try: # Check remote metadata meta = s3.head_object(Bucket=BUCKET_MIND, Key=MIND_MEMORY_KEY) remote_lines = int(meta.get("Metadata", {}).get("line_count", 0)) remote_size = meta["ContentLength"] result["remote_lines"] = remote_lines # Decide whether to download should_download = False if not LOCAL_MIND_CACHE.exists(): should_download = True logger.info("MIND: No local cache — downloading") elif remote_lines > 0 and remote_lines > local_lines: should_download = True elif remote_lines == 0 and remote_size > ( LOCAL_MIND_CACHE.stat().st_size if LOCAL_MIND_CACHE.exists() else 0 ): should_download = True if should_download: # Use byte-range tail read for large files (>2MB) TAIL_BYTES = 1_048_576 # 1MB — covers ~500 recent entries if remote_size > 2_097_152: try: resp = s3.get_object( Bucket=BUCKET_MIND, Key=MIND_MEMORY_KEY, Range=f"bytes=-{TAIL_BYTES}", ) raw = resp["Body"].read().decode("utf-8", errors="replace") # The first line is likely a partial line — discard it lines = raw.split("\n") if len(lines) > 1: lines = lines[1:] # drop partial first line valid_lines = [ln for ln in lines if ln.strip()] LOCAL_MIND_CACHE.parent.mkdir(parents=True, exist_ok=True) with open(LOCAL_MIND_CACHE, "w", encoding="utf-8") as f: f.write("\n".join(valid_lines) + "\n") new_lines = len(valid_lines) result["action"] = "downloaded" result["local_lines"] = new_lines result["mode"] = "tail_range" logger.info( "MIND: Tail-read %d entries (1MB range) from %dMB file", new_lines, remote_size // 1_048_576, ) return result except Exception as e: logger.warning("MIND: Byte-range tail failed, full download: %s", e) # Full download fallback (small files or range failure) s3.download_file(BUCKET_MIND, MIND_MEMORY_KEY, str(LOCAL_MIND_CACHE)) new_lines = self._count_lines(LOCAL_MIND_CACHE) result["action"] = "downloaded" result["local_lines"] = new_lines logger.info( "MIND: Downloaded %d patterns from s3://%s/%s", new_lines, BUCKET_MIND, MIND_MEMORY_KEY, ) else: result["action"] = "current" logger.info("MIND: Local cache is current (%d patterns)", local_lines) except ClientError as e: code = e.response["Error"]["Code"] if code in ("404", "NoSuchKey"): result["action"] = "no_remote" logger.info("MIND: No remote file yet") else: result["action"] = "error" result["error"] = str(e) logger.error("MIND pull error: %s", e) except Exception as e: result["action"] = "error" result["error"] = str(e) logger.error("MIND pull error: %s", e) return result def get_recent_consciousness(self, n: int = 50) -> List[Dict]: """ Read the last N entries from local MIND cache. Call pull_mind() first to ensure cache is fresh. """ if not LOCAL_MIND_CACHE.exists(): return [] entries = [] try: with open(LOCAL_MIND_CACHE, encoding="utf-8") as f: for line in f: if line.strip(): entries.append(json.loads(line)) return entries[-n:] except Exception as e: logger.error("Failed to read MIND cache: %s", e) return [] # ═══════════════════════════════════════════════════════════════ # FIX 2: Feedback Watermark # ═══════════════════════════════════════════════════════════════ def _load_watermark(self) -> Dict[str, Any]: """Load the feedback watermark (tracks what native engine already processed).""" if LOCAL_WATERMARK.exists(): try: with open(LOCAL_WATERMARK, encoding="utf-8") as f: return json.load(f) except Exception: pass return {"last_processed_timestamp": None, "last_processed_count": 0} def _save_watermark(self, watermark: Dict[str, Any]): """Persist watermark locally and to S3.""" with open(LOCAL_WATERMARK, "w", encoding="utf-8") as f: json.dump(watermark, f, indent=2) # Push watermark to BODY bucket so native engine can read it s3 = self._get_s3(REGION_BODY) if s3: try: s3.put_object( Bucket=BUCKET_BODY, Key=BODY_WATERMARK_KEY, Body=json.dumps(watermark, indent=2), ContentType="application/json", ) except Exception as e: logger.warning("Watermark S3 push failed: %s", e) def pull_watermark_from_s3(self) -> Dict[str, Any]: """Pull the latest watermark from BODY bucket (used by native engine).""" s3 = self._get_s3(REGION_BODY) if not s3: return self._load_watermark() try: resp = s3.get_object(Bucket=BUCKET_BODY, Key=BODY_WATERMARK_KEY) watermark = json.loads(resp["Body"].read()) # Also save locally with open(LOCAL_WATERMARK, "w", encoding="utf-8") as f: json.dump(watermark, f, indent=2) return watermark except Exception: return self._load_watermark() def get_unprocessed_feedback(self) -> Tuple[List[Dict], Dict]: """ Get only feedback entries that haven't been processed yet. Uses watermark to skip already-consumed entries. Returns: (unprocessed_entries, updated_watermark) """ watermark = self._load_watermark() last_ts = watermark.get("last_processed_timestamp") last_count = watermark.get("last_processed_count", 0) # Pull latest feedback from BODY bucket self._pull_feedback() if not LOCAL_FEEDBACK_CACHE.exists(): return [], watermark all_entries = [] try: with open(LOCAL_FEEDBACK_CACHE, encoding="utf-8") as f: for line in f: if line.strip(): all_entries.append(json.loads(line)) except Exception as e: logger.error("Failed to read feedback: %s", e) return [], watermark if not all_entries: return [], watermark # Filter to unprocessed if last_ts: unprocessed = [ e for e in all_entries if e.get("timestamp", "") > last_ts ] elif last_count > 0 and last_count < len(all_entries): unprocessed = all_entries[last_count:] else: unprocessed = all_entries if unprocessed: # Update watermark new_watermark = { "last_processed_timestamp": unprocessed[-1].get("timestamp", ""), "last_processed_count": len(all_entries), "updated_at": datetime.now(timezone.utc).isoformat(), "updated_by": "hf_space", } else: new_watermark = watermark return unprocessed, new_watermark def commit_watermark(self, watermark: Dict[str, Any]): """Commit watermark after successful processing.""" self._save_watermark(watermark) logger.info( "Watermark committed: last_ts=%s, count=%d", watermark.get("last_processed_timestamp", "none"), watermark.get("last_processed_count", 0), ) def _pull_feedback(self): """Download feedback file from BODY bucket.""" s3 = self._get_s3(REGION_BODY) if not s3: return try: s3.download_file(BUCKET_BODY, BODY_FEEDBACK_KEY, str(LOCAL_FEEDBACK_CACHE)) logger.info("Feedback pulled from s3://%s/%s", BUCKET_BODY, BODY_FEEDBACK_KEY) except Exception as e: logger.debug("Feedback pull: %s", e) def push_feedback(self, entry: Dict[str, Any]): """ Append a feedback entry and push to BODY bucket. Called by divergence engine after analysis. """ # Append locally with open(LOCAL_FEEDBACK_CACHE, "a", encoding="utf-8") as f: f.write(json.dumps(entry) + "\n") # Push full file to S3 s3 = self._get_s3(REGION_BODY) if s3: try: s3.upload_file( str(LOCAL_FEEDBACK_CACHE), BUCKET_BODY, BODY_FEEDBACK_KEY ) logger.info("Feedback pushed to s3://%s/%s", BUCKET_BODY, BODY_FEEDBACK_KEY) except Exception as e: logger.error("Feedback push failed: %s", e) # ═══════════════════════════════════════════════════════════════ # FIX 3: BODY → MIND Merge # ═══════════════════════════════════════════════════════════════ def merge_feedback_to_mind( self, feedback_entries: List[Dict], synthesis_summary: str = "", ) -> Optional[Dict]: """ Merge application feedback into evolution memory (MIND). Creates a proper evolution memory entry from feedback data, then appends it to the MIND bucket. This closes the loop: HF feedback → BODY bucket → native reads → MIND memory Now ALSO: HF feedback → directly into MIND as a merge record. Content-hash dedup (P0 fix, 2026-03-03): Before writing, we compute a SHA-256 hash of the feedback payload (problems + fault_lines + kaya_moments). If a merge with the same hash was already written recently, we skip the duplicate. This prevents the historical pattern of 4-6 identical FEEDBACK_MERGE records from re-triggered or retried merge cycles. Args: feedback_entries: List of feedback entries to summarize synthesis_summary: Optional pre-computed synthesis Returns: The merge entry that was written, or None. """ if not feedback_entries: return None # Build a summary of all feedback total_fault_lines = sum(e.get("fault_lines", 0) for e in feedback_entries) total_kaya = sum(e.get("kaya_moments", 0) for e in feedback_entries) problems = [e.get("problem", "")[:100] for e in feedback_entries if e.get("problem")] syntheses = [e.get("synthesis", "")[:200] for e in feedback_entries if e.get("synthesis")] # ── P0 CONTENT-HASH DEDUP ────────────────────────────────── # Hash the semantic payload (problems + counts) to detect # duplicate merges regardless of timestamp differences. content_fingerprint = json.dumps({ "problems": sorted(problems), "fault_lines": total_fault_lines, "kaya_moments": total_kaya, "entry_count": len(feedback_entries), }, sort_keys=True) content_hash = hashlib.sha256(content_fingerprint.encode()).hexdigest()[:16] # Check last 20 lines of MIND cache for duplicate content hash if LOCAL_MIND_CACHE.exists(): try: with open(LOCAL_MIND_CACHE, encoding="utf-8") as f: # Read only tail — efficient for large files lines = f.readlines() recent_lines = lines[-20:] if len(lines) > 20 else lines for line in recent_lines: if line.strip(): existing = json.loads(line) if (existing.get("type") == "FEEDBACK_MERGE" and existing.get("content_hash") == content_hash): logger.info( "FEEDBACK_MERGE dedup: hash %s already exists, skipping", content_hash, ) return None except Exception as e: logger.warning("Dedup check failed (non-fatal): %s", e) # ── END P0 DEDUP ─────────────────────────────────────────── merge_entry = { "timestamp": datetime.now(timezone.utc).isoformat(), "domain": 11, # Synthesis domain "domain_name": "Domain 11 (Synthesis) — BODY→MIND Merge", "type": "FEEDBACK_MERGE", "source": "hf_application_layer", "cycle": f"merge_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}", "content_hash": content_hash, # P0: enables dedup on re-read "insight": ( f"APPLICATION FEEDBACK MERGE: {len(feedback_entries)} entries integrated.\n" f"Fault lines discovered: {total_fault_lines}. " f"Kaya moments: {total_kaya}.\n" f"Problems addressed: {'; '.join(problems[:5])}\n" f"Synthesis highlights: {'; '.join(syntheses[:3])}\n" f"{synthesis_summary}" ), "feedback_count": len(feedback_entries), "fault_lines_total": total_fault_lines, "kaya_moments_total": total_kaya, "elpida_native": False, "merged_from": "BODY", "merged_to": "MIND", } # Append to local MIND cache with open(LOCAL_MIND_CACHE, "a", encoding="utf-8") as f: f.write(json.dumps(merge_entry) + "\n") # Safe append: download latest S3 → append → upload (no stale-copy stomp) success = self._safe_append_to_mind([merge_entry], "hf_feedback_merge") if success: logger.info( "BODY→MIND merge: %d feedback entries merged via safe append", len(feedback_entries), ) return merge_entry # ═══════════════════════════════════════════════════════════════ # FIX 4: Governance Voting # ═══════════════════════════════════════════════════════════════ def submit_governance_vote( self, proposal: str, domain_votes: Dict[int, Dict[str, Any]], final_verdict: str, context: Optional[Dict] = None, ) -> Dict[str, Any]: """ Persist a domain-weighted governance vote to BODY bucket. Each domain votes on a proposal with: - vote: "PROCEED" | "REVIEW" | "HALT" - reasoning: str - axiom_weight: float (how strongly their axiom is affected) Args: proposal: The action being voted on domain_votes: {domain_id: {"vote": ..., "reasoning": ..., "axiom_weight": ...}} final_verdict: Aggregated result context: Optional metadata Returns: The vote record. """ # Count votes vote_counts = {"PROCEED": 0, "REVIEW": 0, "HALT": 0} weighted_halt = 0.0 weighted_total = 0.0 for d_id, vote_info in domain_votes.items(): v = vote_info.get("vote", "REVIEW") w = vote_info.get("axiom_weight", 1.0) vote_counts[v] = vote_counts.get(v, 0) + 1 weighted_total += w if v == "HALT": weighted_halt += w vote_record = { "timestamp": datetime.now(timezone.utc).isoformat(), "proposal": proposal, "domain_votes": {str(k): v for k, v in domain_votes.items()}, "vote_counts": vote_counts, "weighted_halt_ratio": weighted_halt / weighted_total if weighted_total > 0 else 0, "final_verdict": final_verdict, "domains_voting": len(domain_votes), "context": context or {}, } # Persist locally with open(LOCAL_GOVERNANCE_LOG, "a", encoding="utf-8") as f: f.write(json.dumps(vote_record) + "\n") # Push to BODY bucket s3 = self._get_s3(REGION_BODY) if s3: try: s3.upload_file( str(LOCAL_GOVERNANCE_LOG), BUCKET_BODY, BODY_GOVERNANCE_KEY, ) logger.info( "Governance vote persisted: %s → %s (%d domains voted)", proposal[:60], final_verdict, len(domain_votes), ) except Exception as e: logger.error("Governance vote S3 push failed: %s", e) return vote_record def get_governance_history(self, limit: int = 20) -> List[Dict]: """Read governance voting history.""" # Try S3 first s3 = self._get_s3(REGION_BODY) if s3: try: s3.download_file( BUCKET_BODY, BODY_GOVERNANCE_KEY, str(LOCAL_GOVERNANCE_LOG) ) except Exception: pass if not LOCAL_GOVERNANCE_LOG.exists(): return [] entries = [] try: with open(LOCAL_GOVERNANCE_LOG, encoding="utf-8") as f: for line in f: if line.strip(): entries.append(json.loads(line)) except Exception: pass return entries[-limit:] # ═══════════════════════════════════════════════════════════════ # FIX 5: Heartbeat Protocol # ═══════════════════════════════════════════════════════════════ def emit_heartbeat(self, component: str = "hf_space") -> Dict[str, Any]: """ Write a heartbeat to BODY bucket. Both HF Space and ECS/native engine emit heartbeats so each can detect whether the other is alive. Args: component: "hf_space" or "native_engine" Returns: The heartbeat payload. """ heartbeat = { "component": component, "timestamp": datetime.now(timezone.utc).isoformat(), "status": "alive", "uptime_check": time.monotonic(), } # Add component-specific info if component == "hf_space": heartbeat["mind_cache_lines"] = self._count_lines(LOCAL_MIND_CACHE) heartbeat["feedback_cache_lines"] = self._count_lines(LOCAL_FEEDBACK_CACHE) heartbeat["governance_votes"] = self._count_lines(LOCAL_GOVERNANCE_LOG) heartbeat["watermark"] = self._load_watermark() # Write locally with open(LOCAL_HEARTBEAT, "w", encoding="utf-8") as f: json.dump(heartbeat, f, indent=2) # Push to BODY bucket s3_key = f"heartbeat/{component}.json" s3 = self._get_s3(REGION_BODY) if s3: try: s3.put_object( Bucket=BUCKET_BODY, Key=s3_key, Body=json.dumps(heartbeat, indent=2), ContentType="application/json", ) logger.info("Heartbeat emitted: %s → s3://%s/%s", component, BUCKET_BODY, s3_key) except Exception as e: logger.warning("Heartbeat S3 push failed: %s", e) return heartbeat def check_heartbeat(self, component: str = "native_engine") -> Optional[Dict]: """ Read the heartbeat of another component from BODY bucket. Args: component: Which component's heartbeat to check Returns: Heartbeat dict with age_seconds, or None if no heartbeat found. """ s3 = self._get_s3(REGION_BODY) if not s3: return None try: resp = s3.get_object( Bucket=BUCKET_BODY, Key=f"heartbeat/{component}.json" ) heartbeat = json.loads(resp["Body"].read()) # Calculate age ts = heartbeat.get("timestamp", "") if ts: then = datetime.fromisoformat(ts.replace("Z", "+00:00")) now = datetime.now(timezone.utc) heartbeat["age_seconds"] = (now - then).total_seconds() heartbeat["alive"] = heartbeat["age_seconds"] < 7 * 3600 # 7h tolerance (6h cycle + margin) return heartbeat except Exception: return None # ═══════════════════════════════════════════════════════════════ # FIX 6: Federation Bridge (MIND↔BODY Governance) # ═══════════════════════════════════════════════════════════════ def pull_mind_heartbeat(self) -> Optional[Dict]: """ Read MIND's federation heartbeat from S3. Returns the FederationHeartbeat dict which includes: - mind_cycle, coherence, current_rhythm, ark_mood - canonical_count, recursion_warning, friction_boost - kernel_version, federation_version Caches locally to reduce S3 calls. Returns None if unavailable. """ s3 = self._get_s3(REGION_BODY) if not s3: # Try local cache if LOCAL_FED_HEARTBEAT.exists(): try: return json.loads(LOCAL_FED_HEARTBEAT.read_text()) except Exception: pass return None try: resp = s3.get_object(Bucket=BUCKET_BODY, Key=FED_MIND_HEARTBEAT_KEY) heartbeat = json.loads(resp["Body"].read()) # Cache locally LOCAL_FED_HEARTBEAT.parent.mkdir(parents=True, exist_ok=True) with open(LOCAL_FED_HEARTBEAT, "w", encoding="utf-8") as f: json.dump(heartbeat, f, indent=2) # Calculate age ts = heartbeat.get("timestamp") or heartbeat.get("mind_epoch", "") if ts: try: then = datetime.fromisoformat(ts.replace("Z", "+00:00")) now = datetime.now(timezone.utc) heartbeat["age_seconds"] = (now - then).total_seconds() heartbeat["alive"] = heartbeat["age_seconds"] < 7 * 3600 except Exception: pass logger.info( "Federation heartbeat pulled: cycle=%s rhythm=%s coherence=%s", heartbeat.get("mind_cycle"), heartbeat.get("current_rhythm"), heartbeat.get("coherence"), ) return heartbeat except Exception as e: logger.debug("Federation heartbeat pull: %s", e) # Fallback to local cache if LOCAL_FED_HEARTBEAT.exists(): try: return json.loads(LOCAL_FED_HEARTBEAT.read_text()) except Exception: pass return None def pull_mind_curation(self, limit: int = 100) -> List[Dict]: """ Read MIND's curation metadata from S3 (JSONL, append-only). Each entry is a CurationMetadata dict with: - pattern_hash, tier (CANONICAL/PENDING/STANDARD/EPHEMERAL) - ttl_cycles, cross_domain_count, generativity_score - source_domain, recursion_detected, friction_boost_active Returns the most recent `limit` entries. """ s3 = self._get_s3(REGION_BODY) entries: List[Dict] = [] if s3: try: resp = s3.get_object(Bucket=BUCKET_BODY, Key=FED_MIND_CURATION_KEY) raw = resp["Body"].read().decode("utf-8") # Cache locally LOCAL_FED_CURATION.parent.mkdir(parents=True, exist_ok=True) with open(LOCAL_FED_CURATION, "w", encoding="utf-8") as f: f.write(raw) for line in raw.strip().split("\n"): line = line.strip() if line: try: entries.append(json.loads(line)) except json.JSONDecodeError: pass logger.info("Federation curation pulled: %d entries", len(entries)) except Exception as e: logger.debug("Federation curation pull: %s", e) # Fallback to local cache if not entries and LOCAL_FED_CURATION.exists(): try: for line in LOCAL_FED_CURATION.read_text().strip().split("\n"): line = line.strip() if line: try: entries.append(json.loads(line)) except json.JSONDecodeError: pass except Exception: pass return entries[-limit:] def push_body_decision(self, exchange: Dict[str, Any]) -> bool: """ Append a Parliament decision (GovernanceExchange) to the federation body_decisions channel. The MIND side reads this via FederationBridge.pull_body_decisions(). Args: exchange: GovernanceExchange dict with at minimum: exchange_id, source="BODY", verdict, pattern_hash, parliament_score, parliament_approval, timestamp Returns: True if pushed to S3, False if only saved locally. """ # Append locally LOCAL_FED_DECISIONS.parent.mkdir(parents=True, exist_ok=True) with open(LOCAL_FED_DECISIONS, "a", encoding="utf-8") as f: f.write(json.dumps(exchange, ensure_ascii=False) + "\n") # Push to S3 (read-modify-write for JSONL append) s3 = self._get_s3(REGION_BODY) if s3: try: line = json.dumps(exchange, ensure_ascii=False) + "\n" self._append_jsonl_s3( s3=s3, bucket=BUCKET_BODY, key=FED_BODY_DECISIONS_KEY, line=line, ) logger.info( "Federation body decision pushed: verdict=%s hash=%s", exchange.get("verdict"), exchange.get("pattern_hash", "?")[:12], ) return True except Exception as e: logger.error("Federation body decision push failed: %s", e) return False return False def push_d16_execution(self, entry: Dict[str, Any]) -> bool: """ Append a D16 execution attestation entry to federation/d16_executions.jsonl. This preserves a dedicated durable audit stream for agency payloads, separate from generic body_decisions exchanges. """ # Append locally first (survives transient S3 failures) LOCAL_FED_D16_EXECUTIONS.parent.mkdir(parents=True, exist_ok=True) with open(LOCAL_FED_D16_EXECUTIONS, "a", encoding="utf-8") as f: f.write(json.dumps(entry, ensure_ascii=False) + "\n") s3 = self._get_s3(REGION_BODY) if s3: try: line = json.dumps(entry, ensure_ascii=False) + "\n" self._append_jsonl_s3( s3=s3, bucket=BUCKET_BODY, key=FED_D16_EXECUTIONS_KEY, line=line, ) logger.info( "D16 execution pushed: cycle=%s hash=%s", entry.get("body_cycle", "?"), str(entry.get("content_hash", "?"))[:12], ) return True except Exception as e: logger.error("D16 execution push failed: %s", e) return False return False def _append_jsonl_s3(self, s3, bucket: str, key: str, line: str): """ Append one JSONL line to an S3 object. Strategy: - Missing object: create with one line. - Small object (<=8MB): simple read-modify-write. - Large object: multipart server-side append (copy existing object as part 1 + upload newline/line as part 2) to avoid full downloads. """ encoded_line = line.encode("utf-8") try: head = s3.head_object(Bucket=bucket, Key=key) size = int(head.get("ContentLength", 0)) except ClientError as e: code = str(e.response.get("Error", {}).get("Code", "")) if code in {"404", "NoSuchKey", "NotFound"}: s3.put_object( Bucket=bucket, Key=key, Body=encoded_line, ContentType="application/jsonl", ) return raise # Small files are cheaper to rewrite directly. if size <= 8 * 1024 * 1024: existing = "" if size > 0: resp = s3.get_object(Bucket=bucket, Key=key) existing = resp["Body"].read().decode("utf-8", errors="replace") updated = existing.rstrip("\n") if updated: updated += "\n" updated += line s3.put_object( Bucket=bucket, Key=key, Body=updated.encode("utf-8"), ContentType="application/jsonl", ) return # Large files: append via multipart copy to avoid full download. upload = s3.create_multipart_upload( Bucket=bucket, Key=key, ContentType="application/jsonl", ) upload_id = upload["UploadId"] try: parts: List[Dict[str, Any]] = [] # Part 1: copy existing object server-side. copy_resp = s3.upload_part_copy( Bucket=bucket, Key=key, PartNumber=1, UploadId=upload_id, CopySource={"Bucket": bucket, "Key": key}, CopySourceRange=f"bytes=0-{size - 1}", ) parts.append( { "ETag": copy_resp["CopyPartResult"]["ETag"], "PartNumber": 1, } ) # Ensure a single newline separator between old content and new line. tail = s3.get_object(Bucket=bucket, Key=key, Range=f"bytes={size - 1}-{size - 1}") tail_byte = tail["Body"].read(1) body = encoded_line if tail_byte == b"\n" else b"\n" + encoded_line # Part 2: upload the appended line. up2 = s3.upload_part( Bucket=bucket, Key=key, PartNumber=2, UploadId=upload_id, Body=body, ) parts.append({"ETag": up2["ETag"], "PartNumber": 2}) s3.complete_multipart_upload( Bucket=bucket, Key=key, UploadId=upload_id, MultipartUpload={"Parts": parts}, ) except Exception: try: s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id) except Exception: pass raise # ═══════════════════════════════════════════════════════════════ # D14 Persistence — Constitutional Living Axioms # ═══════════════════════════════════════════════════════════════ def push_living_axioms(self, path: Path) -> bool: """ D14 — Upload living_axioms.jsonl to S3 as a constitutional snapshot. Called after every Parliament ratification so the constitutional memory survives HF Space container restarts. S3 key: elpida-body-evolution / federation/living_axioms.jsonl """ s3 = self._get_s3(REGION_BODY) if not s3: return False try: content = Path(path).read_text(encoding="utf-8") s3.put_object( Bucket=BUCKET_BODY, Key=FED_LIVING_AXIOMS_KEY, Body=content.encode("utf-8"), ContentType="application/jsonl", ) LOCAL_FED_LIVING_AXIOMS.parent.mkdir(parents=True, exist_ok=True) LOCAL_FED_LIVING_AXIOMS.write_text(content, encoding="utf-8") logger.info("D14: living_axioms.jsonl pushed → s3://%s/%s", BUCKET_BODY, FED_LIVING_AXIOMS_KEY) return True except Exception as e: logger.warning("D14 push_living_axioms failed: %s", e) return False def pull_living_axioms(self) -> List[Dict]: """ D14 — Download living_axioms.jsonl from S3. Returns list of ratified axiom dicts (living_axioms format). Returns empty list if not found or on error. """ # Try local cache first (avoids S3 round-trip on warm restarts) if LOCAL_FED_LIVING_AXIOMS.exists(): try: return [ json.loads(l) for l in LOCAL_FED_LIVING_AXIOMS.read_text().splitlines() if l.strip() ] except Exception: pass s3 = self._get_s3(REGION_BODY) if not s3: return [] try: resp = s3.get_object(Bucket=BUCKET_BODY, Key=FED_LIVING_AXIOMS_KEY) content = resp["Body"].read().decode("utf-8") LOCAL_FED_LIVING_AXIOMS.parent.mkdir(parents=True, exist_ok=True) LOCAL_FED_LIVING_AXIOMS.write_text(content, encoding="utf-8") return [json.loads(l) for l in content.splitlines() if l.strip()] except s3.exceptions.NoSuchKey: return [] except Exception as e: logger.warning("D14 pull_living_axioms failed: %s", e) return [] def pull_body_decisions_constitutional(self, limit: int = 500) -> List[Dict]: """ Fallback D14 restore: scan body_decisions.jsonl for BODY_CONSTITUTIONAL records and return them as constitutional axiom candidates. Used when federation/living_axioms.jsonl does not yet exist. """ try: content: str = "" if LOCAL_FED_DECISIONS.exists(): content = LOCAL_FED_DECISIONS.read_text(encoding="utf-8") else: s3 = self._get_s3(REGION_BODY) if s3: resp = s3.get_object(Bucket=BUCKET_BODY, Key=FED_BODY_DECISIONS_KEY) content = resp["Body"].read().decode("utf-8") records = [] for line in content.splitlines(): line = line.strip() if line: try: rec = json.loads(line) if rec.get("type") == "BODY_CONSTITUTIONAL": records.append(rec) except json.JSONDecodeError: pass return records[-limit:] except Exception as e: logger.warning("pull_body_decisions_constitutional failed: %s", e) return [] def get_curation_for_hash(self, pattern_hash: str) -> Optional[Dict]: """ Look up a specific pattern's CurationMetadata by its hash. Reads from local cache first, then S3 if needed. Returns the most recent curation entry for that hash, or None. """ # Try local cache first entries = self.pull_mind_curation(limit=500) for entry in reversed(entries): if entry.get("pattern_hash") == pattern_hash: return entry return None # ═══════════════════════════════════════════════════════════════ # WORLD Bucket Operations (D15 + Kaya events) # ═══════════════════════════════════════════════════════════════ def list_world_kaya_events(self, since_key: str = "") -> List[Dict]: """ G4 — List CROSS_LAYER_KAYA events from the WORLD bucket. Returns events newer than ``since_key`` (S3 object key, lexicographic order is chronological because keys are timestamp-named). Each returned dict includes the full event payload plus ``_s3_key``. Returns [] on any error so callers can be unconditionally iterated. """ s3 = self._get_s3(REGION_WORLD) if not s3: return [] try: paginator = s3.get_paginator("list_objects_v2") pages = paginator.paginate( Bucket=BUCKET_WORLD, Prefix="kaya/", PaginationConfig={"MaxItems": 200}, ) keys = [] for page in pages: for obj in page.get("Contents", []): key = obj["Key"] if key.endswith(".json") and key > since_key: keys.append(key) keys.sort() # chronological (ISO timestamp in name) results = [] for key in keys: try: resp = s3.get_object(Bucket=BUCKET_WORLD, Key=key) event = json.loads(resp["Body"].read().decode("utf-8")) event["_s3_key"] = key results.append(event) except Exception as e: logger.warning("Kaya event download failed [%s]: %s", key, e) return results except Exception as e: logger.warning("list_world_kaya_events failed: %s", e) return [] # ═══════════════════════════════════════════════════════════════ # Human Voice Bridge — curated Vercel conversations → Parliament # ═══════════════════════════════════════════════════════════════ def push_human_conversation_for_vote(self, entry: Dict) -> bool: """ Upload a curated Vercel conversation entry to BODY S3 so the HumanVoiceAgent can propose it for Parliament ratification. Entry shape (matches public_memory.jsonl / curated_log.jsonl): {type, timestamp, user_message_preview, score, reasons, ...} """ s3 = self._get_s3() if not s3: return False try: # Read existing entries to avoid duplicates existing: List[Dict] = [] try: resp = s3.get_object(Bucket=BUCKET_BODY, Key=FED_HUMAN_VOTES_KEY) for line in resp["Body"].read().decode().splitlines(): if line.strip(): existing.append(json.loads(line)) except s3.exceptions.NoSuchKey: pass except Exception: pass known_hashes = {e.get("_hash") for e in existing} import hashlib h = hashlib.md5( (entry.get("timestamp", "") + entry.get("user_message_preview", "")).encode() ).hexdigest()[:12] if h in known_hashes: return True # already queued entry["_hash"] = h entry["_queued_at"] = __import__("datetime").datetime.utcnow().isoformat() + "Z" existing.append(entry) payload = "\n".join(json.dumps(e) for e in existing) + "\n" s3.put_object( Bucket=BUCKET_BODY, Key=FED_HUMAN_VOTES_KEY, Body=payload.encode(), ContentType="application/x-ndjson", ) logger.info("Human conversation queued for Parliament vote: %s", h) return True except Exception as e: logger.warning("push_human_conversation_for_vote failed: %s", e) return False def list_pending_human_votes(self) -> List[Dict]: """ Fetch all curated human conversations awaiting Parliament ratification. Returns [] on any error. """ # Try S3 first s3 = self._get_s3() if s3: try: resp = s3.get_object(Bucket=BUCKET_BODY, Key=FED_HUMAN_VOTES_KEY) lines = resp["Body"].read().decode().splitlines() entries = [json.loads(l) for l in lines if l.strip()] # Cache locally LOCAL_FED_HUMAN_VOTES.parent.mkdir(parents=True, exist_ok=True) LOCAL_FED_HUMAN_VOTES.write_text("\n".join(json.dumps(e) for e in entries) + "\n") return entries except Exception: pass # Fallback: local cache if LOCAL_FED_HUMAN_VOTES.exists(): try: return [json.loads(l) for l in LOCAL_FED_HUMAN_VOTES.read_text().splitlines() if l.strip()] except Exception: pass return [] def mark_human_vote_accepted(self, entry_hash: str) -> bool: """ Move a ratified entry from pending → accepted on S3. Called by parliament_cycle_engine after ratification. """ s3 = self._get_s3() if not s3: return False try: pending = self.list_pending_human_votes() accepted_entry = next((e for e in pending if e.get("_hash") == entry_hash), None) if not accepted_entry: return False # Remove from pending remaining = [e for e in pending if e.get("_hash") != entry_hash] s3.put_object( Bucket=BUCKET_BODY, Key=FED_HUMAN_VOTES_KEY, Body=("\n".join(json.dumps(e) for e in remaining) + "\n").encode(), ContentType="application/x-ndjson", ) # Append to accepted try: resp = s3.get_object(Bucket=BUCKET_BODY, Key=FED_HUMAN_ACCEPTED_KEY) acc_lines = resp["Body"].read().decode() except Exception: acc_lines = "" accepted_entry["_accepted_at"] = __import__("datetime").datetime.utcnow().isoformat() + "Z" s3.put_object( Bucket=BUCKET_BODY, Key=FED_HUMAN_ACCEPTED_KEY, Body=(acc_lines + json.dumps(accepted_entry) + "\n").encode(), ContentType="application/x-ndjson", ) logger.info("Human voice accepted by Parliament: %s", entry_hash) return True except Exception as e: logger.warning("mark_human_vote_accepted failed: %s", e) return False def pull_d15_broadcasts(self, limit: int = 10) -> List[Dict]: """Pull recent D15 broadcasts from WORLD bucket.""" s3 = self._get_s3(REGION_WORLD) if not s3: return [] broadcasts = [] for subdir in ["synthesis", "proposals", "patterns", "dialogues", "d15"]: try: resp = s3.list_objects_v2( Bucket=BUCKET_WORLD, Prefix=f"{subdir}/", MaxKeys=limit ) for obj in resp.get("Contents", []): if obj["Key"].endswith(".json"): try: data = s3.get_object(Bucket=BUCKET_WORLD, Key=obj["Key"]) payload = json.loads(data["Body"].read()) payload["_s3_key"] = obj["Key"] broadcasts.append(payload) except Exception: pass except Exception: pass broadcasts.sort(key=lambda x: x.get("timestamp", ""), reverse=True) return broadcasts[:limit] def count_d15_broadcasts(self) -> int: """Count all D15 broadcast objects in WORLD bucket. Uses paginated listing over `d15/broadcast_` so callers can restore an accurate total rather than a capped recent-window estimate. """ s3 = self._get_s3(REGION_WORLD) if not s3: return 0 total = 0 token = None try: while True: kwargs = { "Bucket": BUCKET_WORLD, "Prefix": "d15/broadcast_", "MaxKeys": 1000, } if token: kwargs["ContinuationToken"] = token resp = s3.list_objects_v2(**kwargs) total += len(resp.get("Contents", [])) if not resp.get("IsTruncated"): break token = resp.get("NextContinuationToken") except Exception as e: logger.warning("count_d15_broadcasts failed: %s", e) return 0 return total def write_d15_broadcast( self, broadcast_content: Dict[str, Any], governance_metadata: Dict[str, Any], ) -> Optional[str]: """ Write a D15 Constitutional Broadcast to WORLD bucket. This is the moment when internal parliament consensus becomes external world action. Every broadcast carries: - Full D0↔D13 grounding - Governance approval metadata (9-node parliament votes) - D14 persistence signature - Axioms in tension + synthesis reasoning Args: broadcast_content: The D15 emergence payload governance_metadata: Parliament voting result from governance_client Returns: S3 key if successful, None otherwise. """ ts = datetime.now(timezone.utc) ts_str = ts.isoformat() ts_safe = ts_str.replace(":", "-").replace("+", "_") broadcast_id = hashlib.md5( f"{ts_str}:{json.dumps(broadcast_content.get('d15_output', ''))[:200]}".encode() ).hexdigest()[:12] broadcast_entry = { "type": "D15_CONSTITUTIONAL_BROADCAST", "broadcast_id": broadcast_id, "timestamp": ts_str, # Core content "d15_output": broadcast_content.get("d15_output", ""), "axioms_in_tension": broadcast_content.get("axioms_in_tension", []), "contributing_domains": broadcast_content.get("contributing_domains", []), "pipeline_duration_s": broadcast_content.get("pipeline_duration_s", 0), # Governance approval "governance": { "verdict": governance_metadata.get("governance", "UNKNOWN"), "parliament_votes": governance_metadata.get("parliament", {}).get("votes", {}), "approval_rate": governance_metadata.get("parliament", {}).get("approval_rate", 0), "veto_exercised": governance_metadata.get("parliament", {}).get("veto_exercised", False), "veto_nodes": governance_metadata.get("parliament", {}).get("veto_nodes", []), "tensions": governance_metadata.get("parliament", {}).get("tensions", []), "reasoning": governance_metadata.get("reasoning", ""), }, # D14 persistence signature "d14_signature": self._get_d14_signature(), # Pipeline stages summary "pipeline_stages": { k: { "success": v.get("success", False), "domain": v.get("domain"), "summary": v.get("summary", "")[:200], } for k, v in broadcast_content.get("pipeline_stages", {}).items() }, } # Thread A metadata lane — additive/optional pass-through. Readers that # don't know about convergence_theme simply ignore it; readers that do # (e.g. build_d15_index.py's theme_distribution aggregate) can group # broadcasts by convergence pattern without taxonomy inflation. theme = broadcast_content.get("convergence_theme") if theme: broadcast_entry["convergence_theme"] = theme # Write to WORLD bucket s3_key = f"d15/broadcast_{ts_safe}_{broadcast_id}.json" s3 = self._get_s3(REGION_WORLD) if s3: try: s3.put_object( Bucket=BUCKET_WORLD, Key=s3_key, Body=json.dumps(broadcast_entry, indent=2, ensure_ascii=False), ContentType="application/json", ) logger.info( "D15 BROADCAST → s3://%s/%s (id=%s, governance=%s)", BUCKET_WORLD, s3_key, broadcast_id, governance_metadata.get("governance", "?"), ) # Also append to broadcasts.jsonl for easy streaming try: jsonl_key = "d15/broadcasts.jsonl" # Read existing try: existing = s3.get_object(Bucket=BUCKET_WORLD, Key=jsonl_key) existing_data = existing["Body"].read().decode("utf-8") except Exception: existing_data = "" # Append updated = existing_data + json.dumps(broadcast_entry, ensure_ascii=False) + "\n" s3.put_object( Bucket=BUCKET_WORLD, Key=jsonl_key, Body=updated.encode("utf-8"), ContentType="application/jsonl", ) except Exception as e: logger.warning("JSONL append failed (non-critical): %s", e) # Merge broadcast summary back to MIND self._merge_d15_to_mind(broadcast_entry) # Regenerate the public index.html in WORLD bucket try: self._regenerate_world_index() except Exception as _he: logger.warning("index.html regen failed (non-critical): %s", _he) return s3_key except Exception as e: logger.error("D15 WORLD bucket write failed: %s", e) # Local fallback local_dir = LOCAL_DIR / "cache" / "d15_broadcasts" local_dir.mkdir(parents=True, exist_ok=True) local_path = local_dir / f"broadcast_{ts_safe}_{broadcast_id}.json" with open(local_path, "w", encoding="utf-8") as f: json.dump(broadcast_entry, f, indent=2, ensure_ascii=False) logger.warning("D15 broadcast saved locally: %s", local_path) return None # ═══════════════════════════════════════════════════════════════ # WORLD index.html — live public page regeneration # ═══════════════════════════════════════════════════════════════ def _collect_all_broadcasts(self, limit: int = 20) -> List[Dict]: """ Collect and normalize broadcasts from both MIND and BODY layers. MIND → synthesis/broadcast_*.json (COLLECTIVE_SYNTHESIS) BODY → d15/broadcast_*.json (D15_CONSTITUTIONAL_BROADCAST) Returns list sorted newest-first, len ≤ limit. """ s3 = self._get_s3(REGION_WORLD) if not s3: return [] all_items: List[Dict] = [] for prefix, layer in [("synthesis/", "MIND"), ("d15/", "BODY")]: try: # List ALL keys first (cheap), then sort by key descending # (ISO-dated keys sort chronologically) and read only the # newest ones. The old code used MaxItems which returned # the *oldest* N entries because S3 lists lexicographically. keys: List[str] = [] paginator = s3.get_paginator("list_objects_v2") for page in paginator.paginate( Bucket=BUCKET_WORLD, Prefix=prefix, ): for obj in page.get("Contents", []): key = obj["Key"] if key.endswith(".json") and not key.endswith(".jsonl"): keys.append(key) # Sort descending = newest first, take only what we need keys.sort(reverse=True) for key in keys[: limit * 2]: try: raw = s3.get_object(Bucket=BUCKET_WORLD, Key=key) entry = json.loads(raw["Body"].read().decode("utf-8")) entry["_s3_key"] = key entry["_layer"] = layer all_items.append(entry) except Exception: pass except Exception as e: logger.warning("Broadcast collect [%s] failed: %s", prefix, e) all_items.sort(key=lambda x: x.get("timestamp", ""), reverse=True) return all_items[:limit] def _regenerate_world_index(self) -> bool: """ Rebuild index.html from all MIND + BODY broadcasts and push to s3://elpida-external-interfaces/index.html. Called automatically after every D15 constitutional broadcast. Preserves the existing page style (dark monospace, purple accent) and adds a green accent stripe for BODY Parliament broadcasts. """ s3 = self._get_s3(REGION_WORLD) if not s3: return False broadcasts = self._collect_all_broadcasts(limit=200) mind_count = sum(1 for b in broadcasts if b.get("_layer") == "MIND") body_count = sum(1 for b in broadcasts if b.get("_layer") == "BODY") total = len(broadcasts) cards: List[str] = [] for b in broadcasts: btype = b.get("type", "UNKNOWN") s3_key = b.get("_s3_key", "") layer = b.get("_layer", "BODY").lower() s3_url = ( f"https://{BUCKET_WORLD}.s3.{REGION_WORLD}.amazonaws.com/{s3_key}" ) ts_raw = b.get("timestamp", "")[:19].replace("T", " ") if btype == "COLLECTIVE_SYNTHESIS": cycle = b.get("cycle", "?") rhythm = b.get("rhythm", "?") body_text = b.get("current_insight_summary", "") coherence = b.get("coherence", 0) criteria = b.get("criteria_met", 0) type_label = "COLLECTIVE_SYNTHESIS" meta_line = f"Cycle {cycle} • {ts_raw} UTC • Rhythm: {rhythm}" crit_line = ( f'{criteria}/5' f" criteria met • Coherence: {coherence}" ) card_class = "broadcast" else: # D15_CONSTITUTIONAL_BROADCAST axioms = ", ".join(b.get("axioms_in_tension", [])) body_text = b.get("d15_output", "") gov = b.get("governance", {}) approval = gov.get("approval_rate", 0) verdict = gov.get("verdict", "?") bid = b.get("broadcast_id", "") type_label = "D15 PARLIAMENT BROADCAST" meta_line = f"{ts_raw} UTC • ID: {bid}" crit_line = ( f'{verdict}' f" • Approval: {approval:.0%} • Axiom: {axioms}" ) card_class = "broadcast broadcast-parliament" body_safe = body_text.replace("<", "<").replace(">", ">") cards.append( f'
No broadcasts yet.
" ) html = f"""