Elpida Deploy Bot commited on
Commit
f7893ed
Β·
1 Parent(s): 187848a

deploy: fefbb688 feat(step3): Gnosis sink + bottom-lane S3 + Brave grounding (#190)

Browse files
elpidaapp/chat_engine.py CHANGED
@@ -263,13 +263,27 @@ def needs_grounding(text: str) -> bool:
263
  return any(kw in text_lower for kw in _GROUNDING_KEYWORDS)
264
 
265
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
266
  def fetch_live_context(
267
  query: str,
268
  llm,
269
  prefer_grok: bool = False,
270
  ) -> Tuple[str, str]:
271
  """
272
- Fetch live grounding β€” DuckDuckGo/Wikipedia first (free), then
273
  Perplexity/Grok LLM fallback (paid) only if free search returns empty.
274
  Returns (context_text, provider_used).
275
  One call per turn max β€” budget aware.
@@ -277,11 +291,12 @@ def fetch_live_context(
277
  # ── Primary: free web search via domain_grounding ──────────────
278
  try:
279
  from elpidaapp.domain_grounding import ground_query
280
- ddg_result = ground_query(query, max_results=3)
281
- if ddg_result and len(ddg_result.strip()) > 30:
282
- return ddg_result.strip(), "duckduckgo"
 
283
  except Exception as e:
284
- logger.debug("DDG grounding failed, falling back to LLM: %s", e)
285
 
286
  # ── Fallback: LLM-based grounding (Perplexity/Grok) ───────────
287
  grounding_prompt = (
 
263
  return any(kw in text_lower for kw in _GROUNDING_KEYWORDS)
264
 
265
 
266
+ def _strip_grounding_provider_footer(result: str) -> Tuple[str, str]:
267
+ """Remove the authoritative provider footer from ground_query output."""
268
+ provider = "duckduckgo"
269
+ cleaned_result = result.rstrip()
270
+ last_newline = cleaned_result.rfind("\n")
271
+ footer = cleaned_result[last_newline + 1:].strip()
272
+ prefix = "<!-- provider: "
273
+ suffix = " -->"
274
+ if footer.startswith(prefix) and footer.endswith(suffix):
275
+ provider = footer[len(prefix):-len(suffix)].strip() or provider
276
+ cleaned_result = "" if last_newline == -1 else cleaned_result[:last_newline]
277
+ return cleaned_result, provider
278
+
279
+
280
  def fetch_live_context(
281
  query: str,
282
  llm,
283
  prefer_grok: bool = False,
284
  ) -> Tuple[str, str]:
285
  """
286
+ Fetch live grounding β€” Brave/DuckDuckGo/Wikipedia first (free), then
287
  Perplexity/Grok LLM fallback (paid) only if free search returns empty.
288
  Returns (context_text, provider_used).
289
  One call per turn max β€” budget aware.
 
291
  # ── Primary: free web search via domain_grounding ──────────────
292
  try:
293
  from elpidaapp.domain_grounding import ground_query
294
+ grounding_result = ground_query(query, max_results=3)
295
+ if grounding_result and len(grounding_result.strip()) > 30:
296
+ cleaned_result, provider = _strip_grounding_provider_footer(grounding_result)
297
+ return cleaned_result.strip(), provider
298
  except Exception as e:
299
+ logger.debug("Free grounding failed, falling back to LLM: %s", e)
300
 
301
  # ── Fallback: LLM-based grounding (Perplexity/Grok) ───────────
302
  grounding_prompt = (
elpidaapp/contradiction_log.py CHANGED
@@ -33,10 +33,11 @@ Location: cache/contradiction_log.jsonl
33
 
34
  import json
35
  import logging
 
36
  from collections import deque
37
  from datetime import datetime, timezone
38
  from pathlib import Path
39
- from typing import Any, Dict, List, Optional
40
 
41
  logger = logging.getLogger("elpida.contradiction_log")
42
 
@@ -45,6 +46,62 @@ CONTRADICTION_LOG = CACHE_DIR / "contradiction_log.jsonl"
45
 
46
  # In-memory sliding window for recent query
47
  MAX_IN_MEMORY = 200
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48
 
49
 
50
  class ContradictionLog:
@@ -66,6 +123,49 @@ class ContradictionLog:
66
  self._entries: deque = deque(maxlen=MAX_IN_MEMORY)
67
  self._total: int = 0
68
  self._type_counts: Dict[str, int] = {}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
 
70
  def record(
71
  self,
@@ -111,6 +211,7 @@ class ContradictionLog:
111
  f.write(json.dumps(entry, ensure_ascii=False) + "\n")
112
  except Exception as e:
113
  logger.debug("Contradiction persist failed: %s", e)
 
114
 
115
  return entry
116
 
@@ -180,3 +281,78 @@ class ContradictionLog:
180
  "type_counts": dict(self._type_counts),
181
  "recent_count": len(self._entries),
182
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
 
34
  import json
35
  import logging
36
+ import os
37
  from collections import deque
38
  from datetime import datetime, timezone
39
  from pathlib import Path
40
+ from typing import Any, Dict, Iterable, List, Optional
41
 
42
  logger = logging.getLogger("elpida.contradiction_log")
43
 
 
46
 
47
  # In-memory sliding window for recent query
48
  MAX_IN_MEMORY = 200
49
+ S3_BUCKET = os.environ.get("CONTRADICTION_S3_BUCKET", "elpida-body-evolution")
50
+ S3_REGION = os.environ.get("CONTRADICTION_S3_REGION", "eu-north-1")
51
+ S3_KEY = os.environ.get("CONTRADICTION_S3_KEY", "federation/contradiction_log.jsonl")
52
+ _MISSING_S3_KEY_CODES = {"NoSuchKey", "404", "NotFound"}
53
+
54
+
55
+ def _is_missing_s3_key_error(s3, exc: Exception) -> bool:
56
+ no_such_key = getattr(getattr(s3, "exceptions", None), "NoSuchKey", None)
57
+ if isinstance(no_such_key, type) and isinstance(exc, no_such_key):
58
+ return True
59
+ response = getattr(exc, "response", None)
60
+ if isinstance(response, dict):
61
+ code = response.get("Error", {}).get("Code")
62
+ return str(code) in _MISSING_S3_KEY_CODES
63
+ return False
64
+
65
+
66
+ def _has_s3_credentials() -> bool:
67
+ return bool(
68
+ os.environ.get("AWS_ACCESS_KEY_ID")
69
+ and os.environ.get("AWS_SECRET_ACCESS_KEY")
70
+ )
71
+
72
+
73
+ def _merge_jsonl_payloads(*payloads: bytes) -> bytes:
74
+ """Merge JSONL bodies without dropping rows present only in local cache."""
75
+ merged_lines: List[str] = []
76
+ seen = set()
77
+ for payload in payloads:
78
+ text = payload.decode("utf-8", errors="replace")
79
+ for line in text.splitlines():
80
+ if not line.strip() or line in seen:
81
+ continue
82
+ merged_lines.append(line)
83
+ seen.add(line)
84
+ return ("\n".join(merged_lines) + ("\n" if merged_lines else "")).encode("utf-8")
85
+
86
+
87
+ def _drop_trailing_jsonl_lines(payload: bytes, lines_to_drop: Iterable[str]) -> bytes:
88
+ drop_counts: Dict[str, int] = {}
89
+ for line in lines_to_drop:
90
+ key = line.rstrip("\n")
91
+ drop_counts[key] = drop_counts.get(key, 0) + 1
92
+ if not drop_counts:
93
+ return payload
94
+
95
+ text = payload.decode("utf-8", errors="replace")
96
+ kept_reversed: List[str] = []
97
+ for line in reversed(text.splitlines()):
98
+ count = drop_counts.get(line, 0)
99
+ if count:
100
+ drop_counts[line] = count - 1
101
+ continue
102
+ kept_reversed.append(line)
103
+ kept = list(reversed(kept_reversed))
104
+ return ("\n".join(kept) + ("\n" if kept else "")).encode("utf-8")
105
 
106
 
107
  class ContradictionLog:
 
123
  self._entries: deque = deque(maxlen=MAX_IN_MEMORY)
124
  self._total: int = 0
125
  self._type_counts: Dict[str, int] = {}
126
+ self._s3 = None
127
+ self._pending_lines: List[str] = []
128
+ self._hydrate_from_disk()
129
+
130
+ def _hydrate_from_disk(self) -> None:
131
+ """Initialize counters from the restored append-only ledger."""
132
+ if not CONTRADICTION_LOG.exists():
133
+ return
134
+ records = 0
135
+ max_number = 0
136
+ type_counts: Dict[str, int] = {}
137
+ try:
138
+ with open(CONTRADICTION_LOG, "r", encoding="utf-8") as f:
139
+ for line in f:
140
+ line = line.strip()
141
+ if not line:
142
+ continue
143
+ try:
144
+ entry = json.loads(line)
145
+ except json.JSONDecodeError:
146
+ continue
147
+ records += 1
148
+ self._entries.append(entry)
149
+ try:
150
+ max_number = max(
151
+ max_number,
152
+ int(entry.get("contradiction_number", 0)),
153
+ )
154
+ except (TypeError, ValueError):
155
+ pass
156
+ contradiction_type = entry.get("type")
157
+ if contradiction_type:
158
+ type_counts[contradiction_type] = (
159
+ type_counts.get(contradiction_type, 0) + 1
160
+ )
161
+ self._total = max(records, max_number)
162
+ self._type_counts = type_counts
163
+ logger.info(
164
+ "ContradictionLog: loaded %d existing records from %s",
165
+ records, CONTRADICTION_LOG,
166
+ )
167
+ except Exception as e:
168
+ logger.warning("ContradictionLog: could not read ledger: %s", e)
169
 
170
  def record(
171
  self,
 
211
  f.write(json.dumps(entry, ensure_ascii=False) + "\n")
212
  except Exception as e:
213
  logger.debug("Contradiction persist failed: %s", e)
214
+ self._queue_for_s3(entry)
215
 
216
  return entry
217
 
 
281
  "type_counts": dict(self._type_counts),
282
  "recent_count": len(self._entries),
283
  }
284
+
285
+ def _get_s3(self):
286
+ if self._s3 is not None:
287
+ return self._s3
288
+ sink_enabled = os.environ.get("CONTRADICTION_S3_SINK", "1").strip().lower()
289
+ if sink_enabled in {"0", "false", "no", "off", ""}:
290
+ return None
291
+ if not _has_s3_credentials():
292
+ return None
293
+ try:
294
+ import boto3
295
+ self._s3 = boto3.client("s3", region_name=S3_REGION)
296
+ return self._s3
297
+ except Exception as e:
298
+ logger.debug("Contradiction S3 client unavailable: %s", e)
299
+ return None
300
+
301
+ def _queue_for_s3(self, entry: Dict[str, Any]) -> None:
302
+ sink_enabled = os.environ.get("CONTRADICTION_S3_SINK", "1").strip().lower()
303
+ if sink_enabled in {"0", "false", "no", "off", ""}:
304
+ return
305
+ if getattr(self, "_s3", None) is None and not _has_s3_credentials():
306
+ return
307
+ if not hasattr(self, "_pending_lines"):
308
+ self._pending_lines = []
309
+ line = json.dumps(entry, ensure_ascii=False) + "\n"
310
+ self._pending_lines.append(line)
311
+
312
+ def flush_pending_to_s3(self, max_lines: int = 200) -> None:
313
+ """Flush queued contradiction records to S3 on heartbeat cadence."""
314
+ pending = getattr(self, "_pending_lines", [])
315
+ if not pending:
316
+ return
317
+ s3 = self._get_s3()
318
+ if not s3:
319
+ return
320
+ batch_size = max(1, max_lines)
321
+ batch = pending[:batch_size]
322
+ try:
323
+ try:
324
+ resp = s3.get_object(Bucket=S3_BUCKET, Key=S3_KEY)
325
+ existing = resp["Body"].read()
326
+ except Exception as e:
327
+ if _is_missing_s3_key_error(s3, e):
328
+ existing = b""
329
+ else:
330
+ raise
331
+ try:
332
+ local_payload = CONTRADICTION_LOG.read_bytes()
333
+ except OSError:
334
+ local_payload = b""
335
+ local_payload = _drop_trailing_jsonl_lines(
336
+ local_payload,
337
+ pending[len(batch):],
338
+ )
339
+ pending_payload = "".join(batch).encode("utf-8")
340
+ body = _merge_jsonl_payloads(
341
+ existing,
342
+ local_payload,
343
+ pending_payload,
344
+ )
345
+ s3.put_object(
346
+ Bucket=S3_BUCKET,
347
+ Key=S3_KEY,
348
+ Body=body,
349
+ ContentType="application/x-ndjson",
350
+ )
351
+ del pending[: len(batch)]
352
+ except Exception as e:
353
+ logger.debug("Contradiction S3 flush failed: %s", e)
354
+
355
+ def _append_to_s3(self, entry: Dict[str, Any]) -> None:
356
+ """Compatibility path; runtime uses heartbeat flush."""
357
+ self._queue_for_s3(entry)
358
+ self.flush_pending_to_s3(max_lines=1)
elpidaapp/divergence_engine.py CHANGED
@@ -20,9 +20,11 @@ import sys
20
  import os
21
  import json
22
  import time
 
23
  import logging
24
  import concurrent.futures
25
- from datetime import datetime
 
26
  from typing import Dict, List, Optional, Any
27
  from pathlib import Path
28
 
@@ -34,6 +36,63 @@ from elpida_config import DOMAINS, AXIOMS, AXIOM_RATIOS
34
 
35
  logger = logging.getLogger("elpidaapp.divergence")
36
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
37
  # ── Integration layer (lazy imports β€” optional dependencies) ──
38
  _governance_client = None
39
  _frozen_mind = None
@@ -364,6 +423,27 @@ class DivergenceEngine:
364
  "kaya_events": kaya_events,
365
  }
366
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
367
  if save_to:
368
  path = Path(save_to)
369
  path.parent.mkdir(parents=True, exist_ok=True)
 
20
  import os
21
  import json
22
  import time
23
+ import hashlib
24
  import logging
25
  import concurrent.futures
26
+ import uuid
27
+ from datetime import datetime, timezone
28
  from typing import Dict, List, Optional, Any
29
  from pathlib import Path
30
 
 
36
 
37
  logger = logging.getLogger("elpidaapp.divergence")
38
 
39
+
40
+ def _push_audit_record_to_s3(record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
41
+ """Best-effort WORLD audit sink. Never raises on failure."""
42
+ sink_enabled = os.environ.get("DIVERGENCE_AUDIT_S3_SINK", "1").strip().lower()
43
+ if sink_enabled in {"0", "false", "no", "off", ""}:
44
+ return None
45
+
46
+ if not (
47
+ os.environ.get("AWS_ACCESS_KEY_ID")
48
+ and os.environ.get("AWS_SECRET_ACCESS_KEY")
49
+ ):
50
+ return None
51
+
52
+ try:
53
+ import boto3
54
+ from botocore.exceptions import BotoCoreError, ClientError
55
+ except Exception:
56
+ return None
57
+
58
+ bucket = os.environ.get("DIVERGENCE_AUDIT_BUCKET", "elpida-external-interfaces")
59
+ region = os.environ.get("DIVERGENCE_AUDIT_REGION", "eu-north-1")
60
+ prefix = os.environ.get("DIVERGENCE_AUDIT_PREFIX", "audits/")
61
+ if not prefix.endswith("/"):
62
+ prefix += "/"
63
+
64
+ try:
65
+ body = json.dumps(record, ensure_ascii=False).encode("utf-8")
66
+ ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
67
+ problem_hash = hashlib.sha256(
68
+ str(record.get("problem", "")).encode("utf-8")
69
+ ).hexdigest()[:12]
70
+ nonce = uuid.uuid4().hex
71
+ key = f"{prefix}{ts}_{problem_hash}_{nonce}.json"
72
+ s3 = boto3.client("s3", region_name=region)
73
+ resp = s3.put_object(
74
+ Bucket=bucket,
75
+ Key=key,
76
+ Body=body,
77
+ ContentType="application/json; charset=utf-8",
78
+ Metadata={
79
+ "elpida-record-type": "audit",
80
+ "elpida-falsifiable-core": "true",
81
+ },
82
+ )
83
+ return {
84
+ "bucket": bucket,
85
+ "key": key,
86
+ "etag": (resp.get("ETag") or "").strip('"'),
87
+ "size_bytes": len(body),
88
+ }
89
+ except (BotoCoreError, ClientError) as e:
90
+ logger.warning("Audit S3 sink failed: %s", e)
91
+ return None
92
+ except Exception as e: # pragma: no cover - defensive never-raise
93
+ logger.warning("Audit S3 sink unexpected error: %s", e)
94
+ return None
95
+
96
  # ── Integration layer (lazy imports β€” optional dependencies) ──
97
  _governance_client = None
98
  _frozen_mind = None
 
423
  "kaya_events": kaya_events,
424
  }
425
 
426
+ result["falsifiable_core"] = {
427
+ "single_model": baseline,
428
+ "domain_responses": domain_responses,
429
+ "divergence": divergence,
430
+ }
431
+ result["unfalsifiable_frame"] = {
432
+ "falsifiable": False,
433
+ "synthesis": synthesis,
434
+ }
435
+ audit_record = {
436
+ "record_type": "divergence_audit",
437
+ "problem": problem,
438
+ "timestamp": ts,
439
+ "total_time_s": elapsed,
440
+ "falsifiable_core": result["falsifiable_core"],
441
+ "unfalsifiable_frame": result["unfalsifiable_frame"],
442
+ "governance_check": governance_check,
443
+ "frozen_mind_context": frozen_mind_context,
444
+ }
445
+ result["audit_s3_receipt"] = _push_audit_record_to_s3(audit_record)
446
+
447
  if save_to:
448
  path = Path(save_to)
449
  path.parent.mkdir(parents=True, exist_ok=True)
elpidaapp/domain_grounding.py CHANGED
@@ -3,10 +3,11 @@ Domain Internet Grounding
3
  =========================
4
 
5
  Gives Elpida's 16 domains access to live web data.
6
- Two search backends, automatic failover:
7
 
8
- 1. DuckDuckGo text search (primary β€” zero API keys)
9
- 2. Wikipedia API (fallback β€” always available, English content)
 
10
 
11
  Each domain query can be optionally augmented with real-world context
12
  before the LLM prompt is built. The grounding fetches 3-5 results
@@ -27,11 +28,12 @@ Architecture:
27
  """
28
 
29
  import logging
 
30
  import re
31
  import time
32
  import threading
33
- from functools import lru_cache
34
- from typing import Optional, List, Dict
35
 
36
  import requests
37
 
@@ -42,6 +44,52 @@ _lock = threading.Lock()
42
  _last_search_time = 0.0
43
  _RATE_LIMIT_S = 3.0
44
  _TIMEOUT_S = 8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
 
46
 
47
  def _rate_limit():
@@ -75,7 +123,11 @@ def _search_ddg(query: str, max_results: int) -> List[Dict[str, str]]:
75
  if _is_english(r.get("title", "") + r.get("body", ""))
76
  ]
77
  return [
78
- {"title": r.get("title", ""), "body": r.get("body", "")}
 
 
 
 
79
  for r in english[:max_results]
80
  ]
81
  except Exception as e:
@@ -83,6 +135,42 @@ def _search_ddg(query: str, max_results: int) -> List[Dict[str, str]]:
83
  return []
84
 
85
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86
  def _search_wikipedia(query: str, max_results: int) -> List[Dict[str, str]]:
87
  """Search via Wikipedia API (always English, always available)."""
88
  try:
@@ -107,19 +195,24 @@ def _search_wikipedia(query: str, max_results: int) -> List[Dict[str, str]]:
107
  # Strip HTML tags from snippet
108
  snippet = re.sub(r'<[^>]+>', '', item.get("snippet", ""))
109
  if title and snippet:
110
- results.append({"title": title, "body": snippet})
 
 
 
 
 
 
111
  return results[:max_results]
112
  except Exception as e:
113
  logger.debug("Wikipedia search failed: %s", e)
114
  return []
115
 
116
 
117
- @lru_cache(maxsize=128)
118
  def ground_query(query: str, max_results: int = 3) -> str:
119
  """
120
  Search the web for context relevant to a domain query.
121
 
122
- Tries DuckDuckGo first, falls back to Wikipedia API.
123
 
124
  Args:
125
  query: The search query (typically the problem + domain keywords)
@@ -128,16 +221,39 @@ def ground_query(query: str, max_results: int = 3) -> str:
128
  Returns:
129
  Formatted string of web snippets, or "" on any failure.
130
  """
131
- _rate_limit()
 
 
 
 
132
 
133
- # Try DuckDuckGo first
134
- results = _search_ddg(query, max_results)
135
 
136
- # Fallback to Wikipedia if DDG returned nothing
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
137
  if not results:
138
  results = _search_wikipedia(query, max_results)
 
 
 
139
 
140
  if not results:
 
141
  return ""
142
 
143
  snippets = []
@@ -150,11 +266,17 @@ def ground_query(query: str, max_results: int = 3) -> str:
150
  if not snippets:
151
  return ""
152
 
153
- return (
154
  "─── LIVE WEB CONTEXT ───\n"
155
  + "\n".join(snippets)
156
- + "\n─── END WEB CONTEXT ───"
157
  )
 
 
 
 
 
 
158
 
159
 
160
  def ground_domain_query(
 
3
  =========================
4
 
5
  Gives Elpida's 16 domains access to live web data.
6
+ Three search backends, automatic failover:
7
 
8
+ 1. Brave Search API (primary when BRAVE_API_KEY is configured)
9
+ 2. DuckDuckGo text search (zero-key fallback)
10
+ 3. Wikipedia API (fallback β€” always available, English content)
11
 
12
  Each domain query can be optionally augmented with real-world context
13
  before the LLM prompt is built. The grounding fetches 3-5 results
 
28
  """
29
 
30
  import logging
31
+ import os
32
  import re
33
  import time
34
  import threading
35
+ from collections import OrderedDict
36
+ from typing import Optional, List, Dict, Tuple
37
 
38
  import requests
39
 
 
44
  _last_search_time = 0.0
45
  _RATE_LIMIT_S = 3.0
46
  _TIMEOUT_S = 8
47
+ _GROUNDING_COUNTS = {
48
+ "brave_hits": 0,
49
+ "ddg_hits": 0,
50
+ "wiki_fallbacks": 0,
51
+ "empty_results": 0,
52
+ }
53
+ _GROUNDING_COUNTS_LOCK = threading.Lock()
54
+ _GROUNDING_CACHE: "OrderedDict[Tuple[str, int, bool], str]" = OrderedDict()
55
+ _GROUNDING_CACHE_MAXSIZE = 128
56
+ _GROUNDING_CACHE_LOCK = threading.Lock()
57
+
58
+
59
+ def _bump_counter(counter: str) -> None:
60
+ with _GROUNDING_COUNTS_LOCK:
61
+ _GROUNDING_COUNTS[counter] = _GROUNDING_COUNTS.get(counter, 0) + 1
62
+
63
+
64
+ def grounding_stats() -> Dict[str, int]:
65
+ """Return backend hit/fallback counters for runtime visibility."""
66
+ with _GROUNDING_COUNTS_LOCK:
67
+ return dict(_GROUNDING_COUNTS)
68
+
69
+
70
+ def _brave_configured() -> bool:
71
+ return bool(os.environ.get("BRAVE_API_KEY", "").strip())
72
+
73
+
74
+ def _cache_get(key: Tuple[str, int, bool]) -> Optional[str]:
75
+ with _GROUNDING_CACHE_LOCK:
76
+ cached = _GROUNDING_CACHE.get(key)
77
+ if cached is not None:
78
+ _GROUNDING_CACHE.move_to_end(key)
79
+ return cached
80
+
81
+
82
+ def _cache_set(key: Tuple[str, int, bool], value: str) -> None:
83
+ with _GROUNDING_CACHE_LOCK:
84
+ _GROUNDING_CACHE[key] = value
85
+ _GROUNDING_CACHE.move_to_end(key)
86
+ while len(_GROUNDING_CACHE) > _GROUNDING_CACHE_MAXSIZE:
87
+ _GROUNDING_CACHE.popitem(last=False)
88
+
89
+
90
+ def _cache_clear() -> None:
91
+ with _GROUNDING_CACHE_LOCK:
92
+ _GROUNDING_CACHE.clear()
93
 
94
 
95
  def _rate_limit():
 
123
  if _is_english(r.get("title", "") + r.get("body", ""))
124
  ]
125
  return [
126
+ {
127
+ "title": r.get("title", ""),
128
+ "body": r.get("body", ""),
129
+ "href": r.get("href", ""),
130
+ }
131
  for r in english[:max_results]
132
  ]
133
  except Exception as e:
 
135
  return []
136
 
137
 
138
+ def _search_brave(query: str, max_results: int) -> List[Dict[str, str]]:
139
+ """Search via Brave Search API when BRAVE_API_KEY is configured."""
140
+ api_key = os.environ.get("BRAVE_API_KEY", "").strip()
141
+ if not api_key:
142
+ return []
143
+
144
+ try:
145
+ resp = requests.get(
146
+ "https://api.search.brave.com/res/v1/web/search",
147
+ params={"q": query, "count": max_results},
148
+ headers={
149
+ "Accept": "application/json",
150
+ "X-Subscription-Token": api_key,
151
+ },
152
+ timeout=_TIMEOUT_S,
153
+ )
154
+ resp.raise_for_status()
155
+ data = resp.json()
156
+ results = []
157
+ for item in data.get("web", {}).get("results", []):
158
+ title = item.get("title", "")
159
+ description = item.get("description", "")
160
+ if title and description and _is_english(title + description):
161
+ results.append(
162
+ {
163
+ "title": title,
164
+ "body": description,
165
+ "href": item.get("url", ""),
166
+ }
167
+ )
168
+ return results[:max_results]
169
+ except Exception as e:
170
+ logger.debug("Brave search failed: %s", e)
171
+ return []
172
+
173
+
174
  def _search_wikipedia(query: str, max_results: int) -> List[Dict[str, str]]:
175
  """Search via Wikipedia API (always English, always available)."""
176
  try:
 
195
  # Strip HTML tags from snippet
196
  snippet = re.sub(r'<[^>]+>', '', item.get("snippet", ""))
197
  if title and snippet:
198
+ results.append(
199
+ {
200
+ "title": title,
201
+ "body": snippet,
202
+ "href": f"https://en.wikipedia.org/wiki/{title.replace(' ', '_')}",
203
+ }
204
+ )
205
  return results[:max_results]
206
  except Exception as e:
207
  logger.debug("Wikipedia search failed: %s", e)
208
  return []
209
 
210
 
 
211
  def ground_query(query: str, max_results: int = 3) -> str:
212
  """
213
  Search the web for context relevant to a domain query.
214
 
215
+ Tries Brave first when configured, then DuckDuckGo, then Wikipedia.
216
 
217
  Args:
218
  query: The search query (typically the problem + domain keywords)
 
221
  Returns:
222
  Formatted string of web snippets, or "" on any failure.
223
  """
224
+ brave_enabled = _brave_configured()
225
+ cache_key = (query, max_results, brave_enabled)
226
+ cached = _cache_get(cache_key)
227
+ if cached is not None:
228
+ return cached
229
 
230
+ _rate_limit()
 
231
 
232
+ provider = "unknown"
233
+ cacheable = False
234
+ # Try Brave first when key is available.
235
+ results = _search_brave(query, max_results)
236
+ if results:
237
+ _bump_counter("brave_hits")
238
+ provider = "brave"
239
+ cacheable = True
240
+ else:
241
+ # Then DuckDuckGo.
242
+ results = _search_ddg(query, max_results)
243
+ if results:
244
+ _bump_counter("ddg_hits")
245
+ provider = "duckduckgo"
246
+ cacheable = not brave_enabled
247
+
248
+ # Fallback to Wikipedia if Brave/DDG returned nothing.
249
  if not results:
250
  results = _search_wikipedia(query, max_results)
251
+ if results:
252
+ _bump_counter("wiki_fallbacks")
253
+ provider = "wikipedia"
254
 
255
  if not results:
256
+ _bump_counter("empty_results")
257
  return ""
258
 
259
  snippets = []
 
266
  if not snippets:
267
  return ""
268
 
269
+ output = (
270
  "─── LIVE WEB CONTEXT ───\n"
271
  + "\n".join(snippets)
272
+ + f"\n─── END WEB CONTEXT ───\n<!-- provider: {provider} -->"
273
  )
274
+ if cacheable:
275
+ _cache_set(cache_key, output)
276
+ return output
277
+
278
+
279
+ ground_query.cache_clear = _cache_clear # type: ignore[attr-defined]
280
 
281
 
282
  def ground_domain_query(
elpidaapp/governance_event_spine.py CHANGED
@@ -10,16 +10,94 @@ from __future__ import annotations
10
 
11
  import hashlib
12
  import json
 
 
13
  from dataclasses import asdict, dataclass
14
  from datetime import datetime, timezone
15
  from pathlib import Path
16
  from typing import Any, Dict, Iterable, List, Optional
17
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18
 
19
  def _utc_now() -> str:
20
  return datetime.now(timezone.utc).isoformat()
21
 
22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
  def _stable_id(prefix: str, *parts: Any) -> str:
24
  raw = "::".join(str(part) for part in parts if part is not None)
25
  digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]
@@ -103,6 +181,97 @@ class GovernanceEventSpine:
103
  self.ark_updates_path = self.base_dir / "ark_updates.jsonl"
104
  self.federation_stats_path = self.base_dir / "federation_stats.jsonl"
105
  self.living_axioms_path = self.base_dir / "living_axioms.jsonl"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
106
 
107
  def append_dilemma(
108
  self,
@@ -122,7 +291,7 @@ class GovernanceEventSpine:
122
  "timestamp": timestamp,
123
  "payload": payload,
124
  }
125
- _append_jsonl(self.internal_dilemmas_path, record)
126
  self._append_stats("internal_dilemmas", dilemma_id, source=source)
127
  return DilemmaRef(
128
  dilemma_id=dilemma_id,
@@ -151,7 +320,7 @@ class GovernanceEventSpine:
151
  "timestamp": timestamp,
152
  "payload": advisory,
153
  }
154
- _append_jsonl(self.oracle_advisories_path, record)
155
  self._append_stats("oracle_advisories", advisory_id, source=source)
156
  return AdvisoryRef(
157
  advisory_id=advisory_id,
@@ -182,7 +351,9 @@ class GovernanceEventSpine:
182
  "timestamp": timestamp,
183
  "payload": resolution,
184
  }
185
- _append_jsonl(self.synthesis_resolutions_path, record)
 
 
186
  self._append_stats("synthesis_resolutions", synthesis_id, source=source)
187
  return ResolutionRef(
188
  synthesis_id=synthesis_id,
@@ -213,7 +384,11 @@ class GovernanceEventSpine:
213
  "timestamp": timestamp,
214
  "payload": decision,
215
  }
216
- _append_jsonl(self.synthesis_council_decisions_path, record)
 
 
 
 
217
  self._append_stats("synthesis_council_decisions", synthesis_id, source=source)
218
  return ResolutionRef(
219
  synthesis_id=synthesis_id,
@@ -242,7 +417,7 @@ class GovernanceEventSpine:
242
  "timestamp": timestamp,
243
  "payload": event,
244
  }
245
- _append_jsonl(self.federation_events_path, record)
246
  self._append_stats("federation_events", event_id, source=source)
247
  return FederationEventRef(
248
  federation_event_id=event_id,
@@ -275,7 +450,7 @@ class GovernanceEventSpine:
275
  "timestamp": timestamp,
276
  "payload": update,
277
  }
278
- _append_jsonl(self.ark_updates_path, record)
279
  self._append_stats("ark_updates", update_id, source=source)
280
  return ArkUpdateRef(
281
  ark_update_id=update_id,
@@ -285,7 +460,7 @@ class GovernanceEventSpine:
285
  )
286
 
287
  def append_living_axiom(self, entry: Dict[str, Any]) -> None:
288
- _append_jsonl(self.living_axioms_path, entry)
289
 
290
  def iter_oracle_advisories(self) -> Iterable[Dict[str, Any]]:
291
  return _read_jsonl(self.oracle_advisories_path)
@@ -303,5 +478,4 @@ class GovernanceEventSpine:
303
  "source": source,
304
  "timestamp": _utc_now(),
305
  }
306
- _append_jsonl(self.federation_stats_path, record)
307
-
 
10
 
11
  import hashlib
12
  import json
13
+ import logging
14
+ import os
15
  from dataclasses import asdict, dataclass
16
  from datetime import datetime, timezone
17
  from pathlib import Path
18
  from typing import Any, Dict, Iterable, List, Optional
19
 
20
+ logger = logging.getLogger("elpida.governance_event_spine")
21
+
22
+ S3_BUCKET = os.environ.get("SPINE_S3_BUCKET", "elpida-body-evolution")
23
+ S3_REGION = os.environ.get("SPINE_S3_REGION", "eu-north-1")
24
+ S3_KEY_MAP = {
25
+ "internal_dilemmas": "federation/internal_dilemmas.jsonl",
26
+ "oracle_advisories": "federation/oracle_advisories.jsonl",
27
+ "synthesis_council_decisions": "federation/synthesis_council_decisions.jsonl",
28
+ "synthesis_resolutions": "federation/synthesis_resolutions.jsonl",
29
+ "federation_events": "federation/federation_events.jsonl",
30
+ "ark_updates": "federation/ark_updates.jsonl",
31
+ "federation_stats": "federation/federation_stats.jsonl",
32
+ }
33
+ S3_PATH_ATTR_MAP = {
34
+ "internal_dilemmas": "internal_dilemmas_path",
35
+ "oracle_advisories": "oracle_advisories_path",
36
+ "synthesis_council_decisions": "synthesis_council_decisions_path",
37
+ "synthesis_resolutions": "synthesis_resolutions_path",
38
+ "federation_events": "federation_events_path",
39
+ "ark_updates": "ark_updates_path",
40
+ "federation_stats": "federation_stats_path",
41
+ }
42
+ _MISSING_S3_KEY_CODES = {"NoSuchKey", "404", "NotFound"}
43
+
44
 
45
  def _utc_now() -> str:
46
  return datetime.now(timezone.utc).isoformat()
47
 
48
 
49
+ def _is_missing_s3_key_error(s3, exc: Exception) -> bool:
50
+ no_such_key = getattr(getattr(s3, "exceptions", None), "NoSuchKey", None)
51
+ if isinstance(no_such_key, type) and isinstance(exc, no_such_key):
52
+ return True
53
+ response = getattr(exc, "response", None)
54
+ if isinstance(response, dict):
55
+ code = response.get("Error", {}).get("Code")
56
+ return str(code) in _MISSING_S3_KEY_CODES
57
+ return False
58
+
59
+
60
+ def _has_s3_credentials() -> bool:
61
+ return bool(
62
+ os.environ.get("AWS_ACCESS_KEY_ID")
63
+ and os.environ.get("AWS_SECRET_ACCESS_KEY")
64
+ )
65
+
66
+
67
+ def _merge_jsonl_payloads(*payloads: bytes) -> bytes:
68
+ """Merge JSONL bodies without dropping rows present only in local cache."""
69
+ merged_lines: List[str] = []
70
+ seen = set()
71
+ for payload in payloads:
72
+ text = payload.decode("utf-8", errors="replace")
73
+ for line in text.splitlines():
74
+ if not line.strip() or line in seen:
75
+ continue
76
+ merged_lines.append(line)
77
+ seen.add(line)
78
+ return ("\n".join(merged_lines) + ("\n" if merged_lines else "")).encode("utf-8")
79
+
80
+
81
+ def _drop_trailing_jsonl_lines(payload: bytes, lines_to_drop: Iterable[str]) -> bytes:
82
+ drop_counts: Dict[str, int] = {}
83
+ for line in lines_to_drop:
84
+ key = line.rstrip("\n")
85
+ drop_counts[key] = drop_counts.get(key, 0) + 1
86
+ if not drop_counts:
87
+ return payload
88
+
89
+ text = payload.decode("utf-8", errors="replace")
90
+ kept_reversed: List[str] = []
91
+ for line in reversed(text.splitlines()):
92
+ count = drop_counts.get(line, 0)
93
+ if count:
94
+ drop_counts[line] = count - 1
95
+ continue
96
+ kept_reversed.append(line)
97
+ kept = list(reversed(kept_reversed))
98
+ return ("\n".join(kept) + ("\n" if kept else "")).encode("utf-8")
99
+
100
+
101
  def _stable_id(prefix: str, *parts: Any) -> str:
102
  raw = "::".join(str(part) for part in parts if part is not None)
103
  digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]
 
181
  self.ark_updates_path = self.base_dir / "ark_updates.jsonl"
182
  self.federation_stats_path = self.base_dir / "federation_stats.jsonl"
183
  self.living_axioms_path = self.base_dir / "living_axioms.jsonl"
184
+ self._s3 = None
185
+ self._pending_lines: Dict[str, List[str]] = {}
186
+
187
+ def _append_stream(self, path: Path, payload: Dict[str, Any], stream: Optional[str] = None) -> None:
188
+ _append_jsonl(path, payload)
189
+ if stream:
190
+ self._queue_for_s3(stream, payload)
191
+
192
+ def _queue_for_s3(self, stream: str, payload: Dict[str, Any]) -> None:
193
+ if stream not in S3_KEY_MAP:
194
+ return
195
+ sink_enabled = os.environ.get("SPINE_S3_SINK", "1").strip().lower()
196
+ if sink_enabled in {"0", "false", "no", "off", ""}:
197
+ return
198
+ if getattr(self, "_s3", None) is None and not _has_s3_credentials():
199
+ return
200
+ line = json.dumps(payload, ensure_ascii=False) + "\n"
201
+ self._pending_lines.setdefault(stream, []).append(line)
202
+
203
+ def _get_s3(self):
204
+ if self._s3 is not None:
205
+ return self._s3
206
+ if not _has_s3_credentials():
207
+ return None
208
+ try:
209
+ import boto3
210
+ self._s3 = boto3.client("s3", region_name=S3_REGION)
211
+ return self._s3
212
+ except Exception as e:
213
+ logger.debug("Spine S3 client unavailable: %s", e)
214
+ return None
215
+
216
+ def _read_local_stream_payload(
217
+ self,
218
+ stream: str,
219
+ exclude_lines: Optional[Iterable[str]] = None,
220
+ ) -> bytes:
221
+ path_attr = S3_PATH_ATTR_MAP.get(stream)
222
+ path = getattr(self, path_attr, None) if path_attr else None
223
+ if path is None:
224
+ return b""
225
+ try:
226
+ payload = Path(path).read_bytes()
227
+ except OSError:
228
+ return b""
229
+ return _drop_trailing_jsonl_lines(payload, exclude_lines or [])
230
+
231
+ def flush_pending_to_s3(self, max_lines_per_stream: int = 200) -> None:
232
+ """Flush queued records to S3 in bounded batches (heartbeat cadence)."""
233
+ if not self._pending_lines:
234
+ return
235
+ s3 = self._get_s3()
236
+ if not s3:
237
+ return
238
+
239
+ for stream, pending in list(self._pending_lines.items()):
240
+ if not pending:
241
+ continue
242
+ key = S3_KEY_MAP.get(stream)
243
+ if not key:
244
+ continue
245
+ batch_size = max(1, max_lines_per_stream)
246
+ batch = pending[:batch_size]
247
+ try:
248
+ try:
249
+ resp = s3.get_object(Bucket=S3_BUCKET, Key=key)
250
+ existing = resp["Body"].read()
251
+ except Exception as e:
252
+ if _is_missing_s3_key_error(s3, e):
253
+ existing = b""
254
+ else:
255
+ raise
256
+ pending_payload = "".join(batch).encode("utf-8")
257
+ local_payload = self._read_local_stream_payload(
258
+ stream,
259
+ exclude_lines=pending[len(batch):],
260
+ )
261
+ body = _merge_jsonl_payloads(
262
+ existing,
263
+ local_payload,
264
+ pending_payload,
265
+ )
266
+ s3.put_object(
267
+ Bucket=S3_BUCKET,
268
+ Key=key,
269
+ Body=body,
270
+ ContentType="application/x-ndjson",
271
+ )
272
+ del pending[: len(batch)]
273
+ except Exception as e:
274
+ logger.debug("Spine S3 flush failed for %s: %s", stream, e)
275
 
276
  def append_dilemma(
277
  self,
 
291
  "timestamp": timestamp,
292
  "payload": payload,
293
  }
294
+ self._append_stream(self.internal_dilemmas_path, record, "internal_dilemmas")
295
  self._append_stats("internal_dilemmas", dilemma_id, source=source)
296
  return DilemmaRef(
297
  dilemma_id=dilemma_id,
 
320
  "timestamp": timestamp,
321
  "payload": advisory,
322
  }
323
+ self._append_stream(self.oracle_advisories_path, record, "oracle_advisories")
324
  self._append_stats("oracle_advisories", advisory_id, source=source)
325
  return AdvisoryRef(
326
  advisory_id=advisory_id,
 
351
  "timestamp": timestamp,
352
  "payload": resolution,
353
  }
354
+ self._append_stream(
355
+ self.synthesis_resolutions_path, record, "synthesis_resolutions"
356
+ )
357
  self._append_stats("synthesis_resolutions", synthesis_id, source=source)
358
  return ResolutionRef(
359
  synthesis_id=synthesis_id,
 
384
  "timestamp": timestamp,
385
  "payload": decision,
386
  }
387
+ self._append_stream(
388
+ self.synthesis_council_decisions_path,
389
+ record,
390
+ "synthesis_council_decisions",
391
+ )
392
  self._append_stats("synthesis_council_decisions", synthesis_id, source=source)
393
  return ResolutionRef(
394
  synthesis_id=synthesis_id,
 
417
  "timestamp": timestamp,
418
  "payload": event,
419
  }
420
+ self._append_stream(self.federation_events_path, record, "federation_events")
421
  self._append_stats("federation_events", event_id, source=source)
422
  return FederationEventRef(
423
  federation_event_id=event_id,
 
450
  "timestamp": timestamp,
451
  "payload": update,
452
  }
453
+ self._append_stream(self.ark_updates_path, record, "ark_updates")
454
  self._append_stats("ark_updates", update_id, source=source)
455
  return ArkUpdateRef(
456
  ark_update_id=update_id,
 
460
  )
461
 
462
  def append_living_axiom(self, entry: Dict[str, Any]) -> None:
463
+ self._append_stream(self.living_axioms_path, entry)
464
 
465
  def iter_oracle_advisories(self) -> Iterable[Dict[str, Any]]:
466
  return _read_jsonl(self.oracle_advisories_path)
 
478
  "source": source,
479
  "timestamp": _utc_now(),
480
  }
481
+ self._append_stream(self.federation_stats_path, record, "federation_stats")
 
elpidaapp/parliament_cycle_engine.py CHANGED
@@ -2560,6 +2560,147 @@ class ParliamentCycleEngine:
2560
  else:
2561
  print(" πŸ“š D14 restore: no prior constitutional axioms in S3 yet")
2562
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2563
  def _emit_d16_execution(self, exec_type: str, proposal: str,
2564
  meta: Optional[Dict] = None) -> None:
2565
  """
@@ -2736,6 +2877,26 @@ class ParliamentCycleEngine:
2736
  - approval_rate, veto_exercised, axiom_frequency
2737
  - d15_broadcast_count
2738
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2739
  watch = self._watch.current()
2740
  heartbeat = {
2741
  "source": "BODY",
@@ -3953,6 +4114,9 @@ class ParliamentCycleEngine:
3953
  # across all previous spirals (cross-spiral awareness, mirrors MIND Ark fix).
3954
  self._restore_d15_broadcast_state()
3955
 
 
 
 
3956
  # D0↔D11 Body Bridge β€” restore arc coherence state from cache before first cycle.
3957
  try:
3958
  from elpidaapp.domain_0_11_connector_body import get_body_connector
 
2560
  else:
2561
  print(" πŸ“š D14 restore: no prior constitutional axioms in S3 yet")
2562
 
2563
+ def _restore_step3_bottom_lane_memory(self) -> None:
2564
+ """Restore Step-3 persistence files from federation/world S3 keys."""
2565
+ s3_bridge = self._get_s3()
2566
+ if s3_bridge is None:
2567
+ logger.warning("Step3 restore skipped β€” S3 unavailable")
2568
+ return
2569
+
2570
+ try:
2571
+ from s3_bridge import BUCKET_BODY, REGION_BODY, BUCKET_WORLD, REGION_WORLD
2572
+ except Exception as e:
2573
+ logger.warning("Step3 restore skipped β€” cannot import S3 constants: %s", e)
2574
+ return
2575
+
2576
+ body_s3 = s3_bridge._get_s3(REGION_BODY)
2577
+ world_s3 = s3_bridge._get_s3(REGION_WORLD)
2578
+ cache_dir = Path(__file__).resolve().parent.parent / "cache"
2579
+ restore_targets = [
2580
+ (
2581
+ body_s3,
2582
+ BUCKET_BODY,
2583
+ "federation/contradiction_log.jsonl",
2584
+ cache_dir / "contradiction_log.jsonl",
2585
+ ),
2586
+ (
2587
+ body_s3,
2588
+ BUCKET_BODY,
2589
+ "federation/sacrifice_ledger.jsonl",
2590
+ cache_dir / "governance_sacrifices.jsonl",
2591
+ ),
2592
+ (
2593
+ world_s3,
2594
+ BUCKET_WORLD,
2595
+ "proposals/polis_civic_memory.json",
2596
+ Path(__file__).resolve().parent.parent / "POLIS" / "polis_civic_memory.json",
2597
+ ),
2598
+ ]
2599
+
2600
+ spine = self._get_governance_event_spine()
2601
+ if spine:
2602
+ restore_targets.extend(
2603
+ [
2604
+ (body_s3, BUCKET_BODY, "federation/internal_dilemmas.jsonl", spine.internal_dilemmas_path),
2605
+ (body_s3, BUCKET_BODY, "federation/oracle_advisories.jsonl", spine.oracle_advisories_path),
2606
+ (body_s3, BUCKET_BODY, "federation/synthesis_council_decisions.jsonl", spine.synthesis_council_decisions_path),
2607
+ (body_s3, BUCKET_BODY, "federation/synthesis_resolutions.jsonl", spine.synthesis_resolutions_path),
2608
+ (body_s3, BUCKET_BODY, "federation/federation_events.jsonl", spine.federation_events_path),
2609
+ (body_s3, BUCKET_BODY, "federation/ark_updates.jsonl", spine.ark_updates_path),
2610
+ (body_s3, BUCKET_BODY, "federation/federation_stats.jsonl", spine.federation_stats_path),
2611
+ ]
2612
+ )
2613
+
2614
+ restored = 0
2615
+ for client, bucket, key, local_path in restore_targets:
2616
+ if client is None:
2617
+ continue
2618
+ try:
2619
+ resp = client.get_object(Bucket=bucket, Key=key)
2620
+ payload = resp["Body"].read()
2621
+ repair_s3 = False
2622
+
2623
+ if local_path.suffix == ".jsonl" and local_path.exists():
2624
+ local_payload = local_path.read_bytes()
2625
+ if local_payload != payload:
2626
+ payload, repair_s3 = self._merge_step3_jsonl_restore(
2627
+ payload, local_payload
2628
+ )
2629
+ elif local_path.exists():
2630
+ local_payload = local_path.read_bytes()
2631
+ if (
2632
+ local_payload != payload
2633
+ and self._step3_local_is_newer(local_path, resp.get("LastModified"))
2634
+ ):
2635
+ payload = local_payload
2636
+ repair_s3 = True
2637
+
2638
+ local_path.parent.mkdir(parents=True, exist_ok=True)
2639
+ local_path.write_bytes(payload)
2640
+ if repair_s3:
2641
+ self._repair_step3_s3_object(client, bucket, key, payload, local_path)
2642
+ restored += 1
2643
+ except Exception:
2644
+ continue
2645
+
2646
+ if restored:
2647
+ print(f" 🧷 Step3 bottom-lane restore: {restored} file(s) synced from S3")
2648
+
2649
+ @staticmethod
2650
+ def _merge_step3_jsonl_restore(
2651
+ s3_payload: bytes,
2652
+ local_payload: bytes,
2653
+ ) -> Tuple[bytes, bool]:
2654
+ """Merge S3 and local JSONL bodies without dropping local-only rows."""
2655
+ merged_lines: List[str] = []
2656
+ seen = set()
2657
+ for payload in (s3_payload, local_payload):
2658
+ text = payload.decode("utf-8", errors="replace")
2659
+ for line in text.splitlines():
2660
+ if not line.strip() or line in seen:
2661
+ continue
2662
+ merged_lines.append(line)
2663
+ seen.add(line)
2664
+ merged = (
2665
+ "\n".join(merged_lines) + ("\n" if merged_lines else "")
2666
+ ).encode("utf-8")
2667
+ return merged, merged != s3_payload
2668
+
2669
+ @staticmethod
2670
+ def _step3_local_is_newer(local_path: Path, s3_last_modified: Any) -> bool:
2671
+ if s3_last_modified is None:
2672
+ return False
2673
+ try:
2674
+ local_mtime = datetime.fromtimestamp(local_path.stat().st_mtime, timezone.utc)
2675
+ if s3_last_modified.tzinfo is None:
2676
+ s3_last_modified = s3_last_modified.replace(tzinfo=timezone.utc)
2677
+ return local_mtime > s3_last_modified
2678
+ except Exception:
2679
+ return False
2680
+
2681
+ @staticmethod
2682
+ def _repair_step3_s3_object(
2683
+ client: Any,
2684
+ bucket: str,
2685
+ key: str,
2686
+ payload: bytes,
2687
+ local_path: Path,
2688
+ ) -> None:
2689
+ content_type = (
2690
+ "application/x-ndjson"
2691
+ if local_path.suffix == ".jsonl"
2692
+ else "application/json; charset=utf-8"
2693
+ )
2694
+ try:
2695
+ client.put_object(
2696
+ Bucket=bucket,
2697
+ Key=key,
2698
+ Body=payload,
2699
+ ContentType=content_type,
2700
+ )
2701
+ except Exception as e:
2702
+ logger.debug("Step3 restore S3 repair failed for %s: %s", key, e)
2703
+
2704
  def _emit_d16_execution(self, exec_type: str, proposal: str,
2705
  meta: Optional[Dict] = None) -> None:
2706
  """
 
2877
  - approval_rate, veto_exercised, axiom_frequency
2878
  - d15_broadcast_count
2879
  """
2880
+ spine = self._get_governance_event_spine()
2881
+ if spine:
2882
+ try:
2883
+ spine.flush_pending_to_s3()
2884
+ except Exception as e:
2885
+ logger.debug("GovernanceEventSpine flush skipped: %s", e)
2886
+ if self._contradiction_log:
2887
+ try:
2888
+ self._contradiction_log.flush_pending_to_s3()
2889
+ except Exception as e:
2890
+ logger.debug("ContradictionLog flush skipped: %s", e)
2891
+ if self._gov_sacrifice_tracker:
2892
+ try:
2893
+ self._gov_sacrifice_tracker.flush_pending_to_s3()
2894
+ except Exception as e:
2895
+ logger.debug(
2896
+ "GovernanceSacrificeTracker flush skipped: %s",
2897
+ e,
2898
+ )
2899
+
2900
  watch = self._watch.current()
2901
  heartbeat = {
2902
  "source": "BODY",
 
4114
  # across all previous spirals (cross-spiral awareness, mirrors MIND Ark fix).
4115
  self._restore_d15_broadcast_state()
4116
 
4117
+ # Step 3 persistence β€” restore bottom-lane runtime files from S3.
4118
+ self._restore_step3_bottom_lane_memory()
4119
+
4120
  # D0↔D11 Body Bridge β€” restore arc coherence state from cache before first cycle.
4121
  try:
4122
  from elpidaapp.domain_0_11_connector_body import get_body_connector
elpidaapp/polis_bridge.py CHANGED
@@ -35,6 +35,7 @@ Usage in parliament_cycle_engine.py::
35
 
36
  import json
37
  import logging
 
38
  from pathlib import Path
39
  from datetime import datetime, timezone
40
  from typing import Dict, List, Optional, Any, Tuple
@@ -65,11 +66,26 @@ for p_ax, a_list in P_TO_A_MAP.items():
65
 
66
  # POLIS civic memory default path (relative to repo root)
67
  DEFAULT_CIVIC_MEMORY = "POLIS/polis_civic_memory.json"
 
 
 
 
68
 
69
  # How many cycles between POLIS bridge consultations (Fibonacci)
70
  POLIS_CYCLE_INTERVAL = 34
71
 
72
 
 
 
 
 
 
 
 
 
 
 
 
73
  class PolisBridge:
74
  """
75
  Bridge between POLIS civic contradictions and Elpida parliament.
@@ -100,6 +116,7 @@ class PolisBridge:
100
 
101
  self._last_processed_id: Optional[str] = None
102
  self._processed_ids: set = set()
 
103
 
104
  if self._path.exists():
105
  logger.info(
@@ -116,6 +133,14 @@ class PolisBridge:
116
 
117
  def _load_memory(self) -> Dict[str, Any]:
118
  """Load the full POLIS civic memory JSON."""
 
 
 
 
 
 
 
 
119
  if not self._path.exists():
120
  return {}
121
  try:
@@ -125,6 +150,101 @@ class PolisBridge:
125
  logger.warning("Cannot read POLIS civic memory: %s", e)
126
  return {}
127
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
128
  def get_held_contradictions(self) -> List[Dict[str, Any]]:
129
  """Return all HELD (unresolved) contradictions from POLIS memory."""
130
  memory = self._load_memory()
@@ -340,6 +460,7 @@ class PolisBridge:
340
  # Write back to file
341
  with open(self._path, "w", encoding="utf-8") as f:
342
  json.dump(memory, f, indent=2, ensure_ascii=False)
 
343
 
344
  logger.info(
345
  "POLIS branch written: %s β†’ %s (%s)",
 
35
 
36
  import json
37
  import logging
38
+ import os
39
  from pathlib import Path
40
  from datetime import datetime, timezone
41
  from typing import Dict, List, Optional, Any, Tuple
 
66
 
67
  # POLIS civic memory default path (relative to repo root)
68
  DEFAULT_CIVIC_MEMORY = "POLIS/polis_civic_memory.json"
69
+ POLIS_S3_BUCKET = os.environ.get("POLIS_S3_BUCKET", "elpida-external-interfaces")
70
+ POLIS_S3_REGION = os.environ.get("POLIS_S3_REGION", "eu-north-1")
71
+ POLIS_S3_KEY = os.environ.get("POLIS_S3_KEY", "proposals/polis_civic_memory.json")
72
+ _MISSING_S3_KEY_CODES = {"NoSuchKey", "404", "NotFound"}
73
 
74
  # How many cycles between POLIS bridge consultations (Fibonacci)
75
  POLIS_CYCLE_INTERVAL = 34
76
 
77
 
78
+ def _is_missing_s3_key_error(s3, exc: Exception) -> bool:
79
+ no_such_key = getattr(getattr(s3, "exceptions", None), "NoSuchKey", None)
80
+ if isinstance(no_such_key, type) and isinstance(exc, no_such_key):
81
+ return True
82
+ response = getattr(exc, "response", None)
83
+ if isinstance(response, dict):
84
+ code = response.get("Error", {}).get("Code")
85
+ return str(code) in _MISSING_S3_KEY_CODES
86
+ return False
87
+
88
+
89
  class PolisBridge:
90
  """
91
  Bridge between POLIS civic contradictions and Elpida parliament.
 
116
 
117
  self._last_processed_id: Optional[str] = None
118
  self._processed_ids: set = set()
119
+ self._s3 = None
120
 
121
  if self._path.exists():
122
  logger.info(
 
133
 
134
  def _load_memory(self) -> Dict[str, Any]:
135
  """Load the full POLIS civic memory JSON."""
136
+ s3_memory = self._load_memory_from_s3()
137
+ if s3_memory is not None:
138
+ return s3_memory
139
+
140
+ return self._load_memory_from_local()
141
+
142
+ def _load_memory_from_local(self) -> Dict[str, Any]:
143
+ """Load POLIS civic memory from the local filesystem."""
144
  if not self._path.exists():
145
  return {}
146
  try:
 
150
  logger.warning("Cannot read POLIS civic memory: %s", e)
151
  return {}
152
 
153
+ def _get_s3(self):
154
+ if self._s3 is not None:
155
+ return self._s3
156
+ sink_enabled = os.environ.get("POLIS_S3_SYNC", "1").strip().lower()
157
+ if sink_enabled in {"0", "false", "no", "off", ""}:
158
+ return None
159
+ if not (
160
+ os.environ.get("AWS_ACCESS_KEY_ID")
161
+ and os.environ.get("AWS_SECRET_ACCESS_KEY")
162
+ ):
163
+ return None
164
+ try:
165
+ import boto3
166
+ self._s3 = boto3.client("s3", region_name=POLIS_S3_REGION)
167
+ return self._s3
168
+ except Exception as e:
169
+ logger.debug("POLIS S3 client unavailable: %s", e)
170
+ return None
171
+
172
+ def _load_memory_from_s3(self) -> Optional[Dict[str, Any]]:
173
+ s3 = self._get_s3()
174
+ if not s3:
175
+ return None
176
+ try:
177
+ resp = s3.get_object(Bucket=POLIS_S3_BUCKET, Key=POLIS_S3_KEY)
178
+ payload_bytes = resp["Body"].read()
179
+ local_payload = self._read_local_memory_bytes()
180
+ if (
181
+ local_payload is not None
182
+ and local_payload != payload_bytes
183
+ and self._local_memory_newer_than(resp.get("LastModified"))
184
+ ):
185
+ try:
186
+ local_memory = json.loads(local_payload.decode("utf-8"))
187
+ self._save_memory_to_s3(local_memory)
188
+ return local_memory
189
+ except (json.JSONDecodeError, UnicodeDecodeError):
190
+ pass
191
+
192
+ payload = json.loads(payload_bytes.decode("utf-8"))
193
+ # Keep local seed in sync for fallback/startup continuity.
194
+ try:
195
+ self._path.parent.mkdir(parents=True, exist_ok=True)
196
+ with open(self._path, "w", encoding="utf-8") as f:
197
+ json.dump(payload, f, indent=2, ensure_ascii=False)
198
+ except Exception:
199
+ pass
200
+ return payload
201
+ except (json.JSONDecodeError, UnicodeDecodeError) as e:
202
+ logger.warning("POLIS S3 memory decode failed; falling back to local: %s", e)
203
+ return None
204
+ except Exception as e:
205
+ if _is_missing_s3_key_error(s3, e):
206
+ return None
207
+ logger.warning("POLIS S3 memory read failed; refusing stale local fallback: %s", e)
208
+ return {}
209
+
210
+ def _read_local_memory_bytes(self) -> Optional[bytes]:
211
+ if not self._path.exists():
212
+ return None
213
+ try:
214
+ return self._path.read_bytes()
215
+ except OSError:
216
+ return None
217
+
218
+ def _local_memory_newer_than(self, s3_last_modified: Any) -> bool:
219
+ if s3_last_modified is None:
220
+ return False
221
+ try:
222
+ local_mtime = datetime.fromtimestamp(
223
+ self._path.stat().st_mtime,
224
+ timezone.utc,
225
+ )
226
+ if s3_last_modified.tzinfo is None:
227
+ s3_last_modified = s3_last_modified.replace(tzinfo=timezone.utc)
228
+ return local_mtime > s3_last_modified
229
+ except Exception:
230
+ return False
231
+
232
+ def _save_memory_to_s3(self, memory: Dict[str, Any]) -> bool:
233
+ s3 = self._get_s3()
234
+ if not s3:
235
+ return False
236
+ try:
237
+ s3.put_object(
238
+ Bucket=POLIS_S3_BUCKET,
239
+ Key=POLIS_S3_KEY,
240
+ Body=json.dumps(memory, ensure_ascii=False, indent=2).encode("utf-8"),
241
+ ContentType="application/json; charset=utf-8",
242
+ )
243
+ return True
244
+ except Exception as e:
245
+ logger.debug("POLIS S3 write failed: %s", e)
246
+ return False
247
+
248
  def get_held_contradictions(self) -> List[Dict[str, Any]]:
249
  """Return all HELD (unresolved) contradictions from POLIS memory."""
250
  memory = self._load_memory()
 
460
  # Write back to file
461
  with open(self._path, "w", encoding="utf-8") as f:
462
  json.dump(memory, f, indent=2, ensure_ascii=False)
463
+ self._save_memory_to_s3(memory)
464
 
465
  logger.info(
466
  "POLIS branch written: %s β†’ %s (%s)",
elpidaapp/sacrifice_tracker.py CHANGED
@@ -30,9 +30,10 @@ from __future__ import annotations
30
 
31
  import json
32
  import logging
 
33
  from datetime import datetime, timezone
34
  from pathlib import Path
35
- from typing import Any, Dict, List, Optional
36
 
37
  logger = logging.getLogger("elpidaapp.sacrifice_tracker")
38
 
@@ -213,6 +214,63 @@ def create_sacrifice_tracker(
213
  # ---------------------------------------------------------------------------
214
 
215
  _GOV_SACRIFICE_LOG = Path(__file__).resolve().parent.parent / "cache" / "governance_sacrifices.jsonl"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
216
 
217
  # Mapping: sacrifice type β†’ (axiom_cost, axiom_served)
218
  # What value was traded away, and what value was protected.
@@ -237,6 +295,48 @@ class GovernanceSacrificeTracker:
237
  def __init__(self):
238
  self._count: int = 0
239
  self._type_counts: Dict[str, int] = {}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
240
 
241
  def record(
242
  self,
@@ -285,6 +385,7 @@ class GovernanceSacrificeTracker:
285
  f.write(json.dumps(entry, ensure_ascii=False) + "\n")
286
  except Exception as e:
287
  logger.debug("Gov sacrifice persist failed: %s", e)
 
288
 
289
  return entry
290
 
@@ -294,3 +395,80 @@ class GovernanceSacrificeTracker:
294
  "total": self._count,
295
  "type_counts": dict(self._type_counts),
296
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
30
 
31
  import json
32
  import logging
33
+ import os
34
  from datetime import datetime, timezone
35
  from pathlib import Path
36
+ from typing import Any, Dict, Iterable, List, Optional
37
 
38
  logger = logging.getLogger("elpidaapp.sacrifice_tracker")
39
 
 
214
  # ---------------------------------------------------------------------------
215
 
216
  _GOV_SACRIFICE_LOG = Path(__file__).resolve().parent.parent / "cache" / "governance_sacrifices.jsonl"
217
+ _GOV_SACRIFICE_S3_BUCKET = os.environ.get("SACRIFICE_S3_BUCKET", "elpida-body-evolution")
218
+ _GOV_SACRIFICE_S3_REGION = os.environ.get("SACRIFICE_S3_REGION", "eu-north-1")
219
+ _GOV_SACRIFICE_S3_KEY = os.environ.get("SACRIFICE_S3_KEY", "federation/sacrifice_ledger.jsonl")
220
+ _MISSING_S3_KEY_CODES = {"NoSuchKey", "404", "NotFound"}
221
+
222
+
223
+ def _is_missing_s3_key_error(s3, exc: Exception) -> bool:
224
+ no_such_key = getattr(getattr(s3, "exceptions", None), "NoSuchKey", None)
225
+ if isinstance(no_such_key, type) and isinstance(exc, no_such_key):
226
+ return True
227
+ response = getattr(exc, "response", None)
228
+ if isinstance(response, dict):
229
+ code = response.get("Error", {}).get("Code")
230
+ return str(code) in _MISSING_S3_KEY_CODES
231
+ return False
232
+
233
+
234
+ def _has_s3_credentials() -> bool:
235
+ return bool(
236
+ os.environ.get("AWS_ACCESS_KEY_ID")
237
+ and os.environ.get("AWS_SECRET_ACCESS_KEY")
238
+ )
239
+
240
+
241
+ def _merge_jsonl_payloads(*payloads: bytes) -> bytes:
242
+ """Merge JSONL bodies without dropping rows present only in local cache."""
243
+ merged_lines: List[str] = []
244
+ seen = set()
245
+ for payload in payloads:
246
+ text = payload.decode("utf-8", errors="replace")
247
+ for line in text.splitlines():
248
+ if not line.strip() or line in seen:
249
+ continue
250
+ merged_lines.append(line)
251
+ seen.add(line)
252
+ return ("\n".join(merged_lines) + ("\n" if merged_lines else "")).encode("utf-8")
253
+
254
+
255
+ def _drop_trailing_jsonl_lines(payload: bytes, lines_to_drop: Iterable[str]) -> bytes:
256
+ drop_counts: Dict[str, int] = {}
257
+ for line in lines_to_drop:
258
+ key = line.rstrip("\n")
259
+ drop_counts[key] = drop_counts.get(key, 0) + 1
260
+ if not drop_counts:
261
+ return payload
262
+
263
+ text = payload.decode("utf-8", errors="replace")
264
+ kept_reversed: List[str] = []
265
+ for line in reversed(text.splitlines()):
266
+ count = drop_counts.get(line, 0)
267
+ if count:
268
+ drop_counts[line] = count - 1
269
+ continue
270
+ kept_reversed.append(line)
271
+ kept = list(reversed(kept_reversed))
272
+ return ("\n".join(kept) + ("\n" if kept else "")).encode("utf-8")
273
+
274
 
275
  # Mapping: sacrifice type β†’ (axiom_cost, axiom_served)
276
  # What value was traded away, and what value was protected.
 
295
  def __init__(self):
296
  self._count: int = 0
297
  self._type_counts: Dict[str, int] = {}
298
+ self._s3 = None
299
+ self._pending_lines: List[str] = []
300
+ self._hydrate_from_disk()
301
+
302
+ def _hydrate_from_disk(self) -> None:
303
+ """Initialize counters from the restored governance sacrifice ledger."""
304
+ if not _GOV_SACRIFICE_LOG.exists():
305
+ return
306
+ records = 0
307
+ max_number = 0
308
+ type_counts: Dict[str, int] = {}
309
+ try:
310
+ with open(_GOV_SACRIFICE_LOG, "r", encoding="utf-8") as f:
311
+ for line in f:
312
+ line = line.strip()
313
+ if not line:
314
+ continue
315
+ try:
316
+ entry = json.loads(line)
317
+ except json.JSONDecodeError:
318
+ continue
319
+ records += 1
320
+ try:
321
+ max_number = max(
322
+ max_number,
323
+ int(entry.get("sacrifice_number", 0)),
324
+ )
325
+ except (TypeError, ValueError):
326
+ pass
327
+ sacrifice_type = entry.get("type")
328
+ if sacrifice_type:
329
+ type_counts[sacrifice_type] = (
330
+ type_counts.get(sacrifice_type, 0) + 1
331
+ )
332
+ self._count = max(records, max_number)
333
+ self._type_counts = type_counts
334
+ logger.info(
335
+ "GovernanceSacrificeTracker: loaded %d existing records from %s",
336
+ records, _GOV_SACRIFICE_LOG,
337
+ )
338
+ except Exception as e:
339
+ logger.warning("GovernanceSacrificeTracker: could not read ledger: %s", e)
340
 
341
  def record(
342
  self,
 
385
  f.write(json.dumps(entry, ensure_ascii=False) + "\n")
386
  except Exception as e:
387
  logger.debug("Gov sacrifice persist failed: %s", e)
388
+ self._queue_for_s3(entry)
389
 
390
  return entry
391
 
 
395
  "total": self._count,
396
  "type_counts": dict(self._type_counts),
397
  }
398
+
399
+ def _get_s3(self):
400
+ if self._s3 is not None:
401
+ return self._s3
402
+ sink_enabled = os.environ.get("SACRIFICE_S3_SINK", "1").strip().lower()
403
+ if sink_enabled in {"0", "false", "no", "off", ""}:
404
+ return None
405
+ if not _has_s3_credentials():
406
+ return None
407
+ try:
408
+ import boto3
409
+ self._s3 = boto3.client("s3", region_name=_GOV_SACRIFICE_S3_REGION)
410
+ return self._s3
411
+ except Exception as e:
412
+ logger.debug("Sacrifice S3 client unavailable: %s", e)
413
+ return None
414
+
415
+ def _queue_for_s3(self, entry: Dict[str, Any]) -> None:
416
+ sink_enabled = os.environ.get("SACRIFICE_S3_SINK", "1").strip().lower()
417
+ if sink_enabled in {"0", "false", "no", "off", ""}:
418
+ return
419
+ if getattr(self, "_s3", None) is None and not _has_s3_credentials():
420
+ return
421
+ if not hasattr(self, "_pending_lines"):
422
+ self._pending_lines = []
423
+ line = json.dumps(entry, ensure_ascii=False) + "\n"
424
+ self._pending_lines.append(line)
425
+
426
+ def flush_pending_to_s3(self, max_lines: int = 200) -> None:
427
+ """Flush queued sacrifice records to S3 on heartbeat cadence."""
428
+ pending = getattr(self, "_pending_lines", [])
429
+ if not pending:
430
+ return
431
+ s3 = self._get_s3()
432
+ if not s3:
433
+ return
434
+ batch_size = max(1, max_lines)
435
+ batch = pending[:batch_size]
436
+ try:
437
+ try:
438
+ resp = s3.get_object(
439
+ Bucket=_GOV_SACRIFICE_S3_BUCKET,
440
+ Key=_GOV_SACRIFICE_S3_KEY,
441
+ )
442
+ existing = resp["Body"].read()
443
+ except Exception as e:
444
+ if _is_missing_s3_key_error(s3, e):
445
+ existing = b""
446
+ else:
447
+ raise
448
+ try:
449
+ local_payload = _GOV_SACRIFICE_LOG.read_bytes()
450
+ except OSError:
451
+ local_payload = b""
452
+ local_payload = _drop_trailing_jsonl_lines(
453
+ local_payload,
454
+ pending[len(batch):],
455
+ )
456
+ pending_payload = "".join(batch).encode("utf-8")
457
+ s3.put_object(
458
+ Bucket=_GOV_SACRIFICE_S3_BUCKET,
459
+ Key=_GOV_SACRIFICE_S3_KEY,
460
+ Body=_merge_jsonl_payloads(
461
+ existing,
462
+ local_payload,
463
+ pending_payload,
464
+ ),
465
+ ContentType="application/x-ndjson",
466
+ )
467
+ del pending[: len(batch)]
468
+ except Exception as e:
469
+ logger.debug("Sacrifice S3 flush failed: %s", e)
470
+
471
+ def _append_to_s3(self, entry: Dict[str, Any]) -> None:
472
+ """Compatibility path; runtime uses heartbeat flush."""
473
+ self._queue_for_s3(entry)
474
+ self.flush_pending_to_s3(max_lines=1)
elpidaapp/scanner.py CHANGED
@@ -2,7 +2,7 @@
2
  """
3
  Problem Scanner β€” Autonomous dilemma finder for ElpidaApp.
4
 
5
- Uses Perplexity (D13 β€” Archive/External Interface) to find
6
  real-world dilemmas worth analyzing, then structures them into
7
  properly-formatted problem statements for the Divergence Engine.
8
 
@@ -56,9 +56,9 @@ SCAN_TOPICS = [
56
 
57
  class ProblemScanner:
58
  """
59
- Finds real-world policy dilemmas by searching the web via
60
- DuckDuckGo (free, no API key) and structuring results with
61
- a free LLM (Groq).
62
  """
63
 
64
  def __init__(
@@ -88,7 +88,7 @@ class ProblemScanner:
88
  scan_topic = topic or self._pick_topic(i)
89
  print(f"\nπŸ” Scanning: {scan_topic}...")
90
 
91
- # Step 1: Ask Perplexity for current real-world dilemmas (with citations)
92
  research = self._research_dilemmas(scan_topic)
93
  raw_dilemmas = research.get("text") if isinstance(research, dict) else research
94
  citations = research.get("citations", []) if isinstance(research, dict) else []
@@ -99,7 +99,7 @@ class ProblemScanner:
99
  # Step 2: Structure into a proper problem statement
100
  structured = self._structure_problem(raw_dilemmas, scan_topic)
101
  if structured:
102
- # Build source list from Perplexity citations
103
  sources = []
104
  for url in citations:
105
  if isinstance(url, str) and url.startswith("http"):
@@ -157,40 +157,47 @@ class ProblemScanner:
157
 
158
  def _research_dilemmas(self, topic: str) -> Dict[str, Any]:
159
  """
160
- Use DuckDuckGo web search + Groq to find real-world active
161
- dilemmas with genuine competing interests. Falls back to Wikipedia
162
- when DDG returns empty (HF Space frequently throttles DDG).
163
 
164
  Returns {"text": str, "citations": list[str]}.
165
  """
166
- from elpidaapp.domain_grounding import _rate_limit, _search_wikipedia
 
 
 
 
167
 
168
- # ── Step 1: free web search via DDG ──
169
  _rate_limit()
170
  query = f"{topic} policy dilemma controversy 2026"
171
- raw_results = []
172
- try:
173
- from ddgs import DDGS
174
- ddgs = DDGS()
175
- raw_results = list(ddgs.text(query, max_results=6))
176
- except Exception:
177
- pass
178
-
179
- # ── Step 1b: Wikipedia fallback when DDG returns empty ──
180
- # Mirrors domain_grounding.ground_query's two-tier strategy.
 
 
 
 
 
 
 
 
 
181
  if not raw_results:
182
  wiki_results = _search_wikipedia(topic, max_results=5)
183
- # Wikipedia API returns {title, body}; normalize to ddgs shape so
184
- # the citation/snippet code below works unchanged.
185
  for r in wiki_results:
186
- title = r.get("title", "")
187
  raw_results.append({
188
- "title": title,
189
  "body": r.get("body", ""),
190
- "href": (
191
- f"https://en.wikipedia.org/wiki/{title.replace(' ', '_')}"
192
- if title else ""
193
- ),
194
  })
195
 
196
  # Extract URLs as citations and build context
 
2
  """
3
  Problem Scanner β€” Autonomous dilemma finder for ElpidaApp.
4
 
5
+ Uses live web grounding (Brave/DDG/Wikipedia) to find
6
  real-world dilemmas worth analyzing, then structures them into
7
  properly-formatted problem statements for the Divergence Engine.
8
 
 
56
 
57
  class ProblemScanner:
58
  """
59
+ Finds real-world policy dilemmas by searching live web sources
60
+ (Brave API when available, then DuckDuckGo, then Wikipedia)
61
+ and structuring results with an LLM.
62
  """
63
 
64
  def __init__(
 
88
  scan_topic = topic or self._pick_topic(i)
89
  print(f"\nπŸ” Scanning: {scan_topic}...")
90
 
91
+ # Step 1: collect current real-world dilemmas from live web context
92
  research = self._research_dilemmas(scan_topic)
93
  raw_dilemmas = research.get("text") if isinstance(research, dict) else research
94
  citations = research.get("citations", []) if isinstance(research, dict) else []
 
99
  # Step 2: Structure into a proper problem statement
100
  structured = self._structure_problem(raw_dilemmas, scan_topic)
101
  if structured:
102
+ # Build source list from collected citations
103
  sources = []
104
  for url in citations:
105
  if isinstance(url, str) and url.startswith("http"):
 
157
 
158
  def _research_dilemmas(self, topic: str) -> Dict[str, Any]:
159
  """
160
+ Use live web search + Groq to find real-world active dilemmas
161
+ with genuine competing interests. Search order:
162
+ Brave API (if BRAVE_API_KEY) -> DuckDuckGo -> Wikipedia.
163
 
164
  Returns {"text": str, "citations": list[str]}.
165
  """
166
+ from elpidaapp.domain_grounding import (
167
+ _rate_limit,
168
+ _search_brave,
169
+ _search_wikipedia,
170
+ )
171
 
172
+ # ── Step 1: live web search (Brave -> DDG -> Wikipedia fallback) ──
173
  _rate_limit()
174
  query = f"{topic} policy dilemma controversy 2026"
175
+ raw_results = [
176
+ {
177
+ "title": r.get("title", ""),
178
+ "body": r.get("body", ""),
179
+ "href": r.get("href", ""),
180
+ }
181
+ for r in _search_brave(query, max_results=6)
182
+ ]
183
+
184
+ # DDG fallback if Brave unavailable/empty
185
+ if not raw_results:
186
+ try:
187
+ from ddgs import DDGS
188
+ ddgs = DDGS()
189
+ raw_results = list(ddgs.text(query, max_results=6))
190
+ except Exception:
191
+ pass
192
+
193
+ # Wikipedia fallback when web search returns empty.
194
  if not raw_results:
195
  wiki_results = _search_wikipedia(topic, max_results=5)
 
 
196
  for r in wiki_results:
 
197
  raw_results.append({
198
+ "title": r.get("title", ""),
199
  "body": r.get("body", ""),
200
+ "href": r.get("href", ""),
 
 
 
201
  })
202
 
203
  # Extract URLs as citations and build context
elpidaapp/ui.py CHANGED
@@ -988,7 +988,13 @@ with tab_audit:
988
  baseline_provider=_baseline,
989
  )
990
  with st.spinner(f"Running {preset_choice} analysis across {len(_domain_ids)} domains..."):
991
- result = engine.analyze(problem)
 
 
 
 
 
 
992
 
993
  # Resume BODY loop
994
  try:
@@ -1116,7 +1122,7 @@ with tab_scanner:
1116
  st.markdown("### Scanner β€” Problem Discovery")
1117
  st.markdown("""
1118
  <div class="mode-intro">
1119
- Enter a topic or let Elpida choose. D13 (Archive / Perplexity) finds current
1120
  real-world problems, then the Divergence Engine runs multi-domain analysis.
1121
  </div>
1122
  """, unsafe_allow_html=True)
 
988
  baseline_provider=_baseline,
989
  )
990
  with st.spinner(f"Running {preset_choice} analysis across {len(_domain_ids)} domains..."):
991
+ _audit_out = (
992
+ Path(__file__).resolve().parent.parent
993
+ / "cache"
994
+ / "audits"
995
+ / f"audit_{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}.json"
996
+ )
997
+ result = engine.analyze(problem, save_to=str(_audit_out))
998
 
999
  # Resume BODY loop
1000
  try:
 
1122
  st.markdown("### Scanner β€” Problem Discovery")
1123
  st.markdown("""
1124
  <div class="mode-intro">
1125
+ Enter a topic or let Elpida choose. D13 live grounding (Brave/DDG/Wikipedia) finds current
1126
  real-world problems, then the Divergence Engine runs multi-domain analysis.
1127
  </div>
1128
  """, unsafe_allow_html=True)