"""gr.Server + FastAPI app. v1: FastAPI only; gr.Server mount added later.""" from __future__ import annotations import uuid import hashlib from dataclasses import asdict from datetime import date from fastapi import FastAPI, HTTPException, Response from fastapi.responses import StreamingResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from .api import deps from .case_state import CaseState, Deposition, Turn from .fusion import recompute as _recompute, deliver_verdict from .prompts import witness_system_prompt from .persistence import record_verdict, read_ledger class StartSessionReq(BaseModel): case_id: str class InterrogateReq(BaseModel): witness_id: str question: str class DeposeReq(BaseModel): witness_id: str class DiscountReq(BaseModel): witness_id: str alpha: float class MethodReq(BaseModel): method: str class VerdictReq(BaseModel): suspect: str class CardReq(BaseModel): session_id: str def _state_dict(s: CaseState) -> dict: d = asdict(s) if d.get("combined_mass") is not None: d["combined_mass"] = {",".join(sorted(k)): v for k, v in s.combined_mass.items()} # Stringify deposition mass keys for JSON serialization if d.get("depositions"): d["depositions"] = [ {**dep, "mass": {",".join(sorted(k)): v for k, v in src.mass.items()}} for dep, src in zip(d["depositions"], s.depositions) ] return d def build_app(use_fake_witness: bool = False) -> FastAPI: app = FastAPI(title="Dempster's Court") if use_fake_witness: deps.set_witness_engine(_FakeWitness()) @app.get("/api/cases") def list_cases(): return [ { "case_id": c.case_id, "title": c.title, "victim": c.victim, "dock": c.dock, "court_days": c.court_days, "threshold": c.threshold, "setting_blurb": c.setting_blurb, "methods_available": c.methods_available, } for c in deps.cases().values() ] @app.get("/api/cases/{case_id}") def get_case(case_id: str): c = deps.cases().get(case_id) if c is None: raise HTTPException(404, "unknown case") return { "case_id": c.case_id, "title": c.title, "victim": c.victim, "setting_blurb": c.setting_blurb, "dock": c.dock, "court_days": c.court_days, "threshold": c.threshold, "methods_available": c.methods_available, "witnesses": [ {"id": w.id, "name": w.name, "portrait_ref": w.portrait_ref, "cost_days": w.cost_days, "is_physical_evidence": w.is_physical_evidence} for w in c.witnesses ], } @app.post("/api/sessions") def start_session(req: StartSessionReq): cs = deps.cases().get(req.case_id) if cs is None: raise HTTPException(404, "unknown case") sid = uuid.uuid4().hex[:12] deps.sessions()[sid] = CaseState.start(sid, cs) return {"session_id": sid, "case_id": req.case_id} @app.get("/api/sessions/{sid}") def get_session(sid: str): s = deps.sessions().get(sid) if s is None: raise HTTPException(404, "unknown session") return _state_dict(s) @app.post("/api/sessions/{sid}/interrogate") def interrogate(sid: str, req: InterrogateReq): s = deps.sessions().get(sid) if s is None: raise HTTPException(404, "unknown session") case = deps.cases()[s.case_id] w = next((w for w in case.witnesses if w.id == req.witness_id), None) if w is None: raise HTTPException(404, "unknown witness") engine = deps.witness_engine() if engine is None: raise HTTPException(503, "witness engine not loaded") s.transcripts.setdefault(w.id, []).append(Turn(role="user", content=req.question)) system = witness_system_prompt(case, w) turns = s.transcripts[w.id] def event_stream(): # Prime the connection through HF's nginx proxy with 2KB of padding + # an initial empty data event. Otherwise nginx buffers the whole stream. yield ":" + (" " * 2048) + "\n\n" yield "event: thinking\ndata: \n\n" chunks: list[str] = [] for piece in engine.stream_interrogation(system, turns): chunks.append(piece) # Replace any embedded newlines so SSE framing stays valid safe = piece.replace("\n", "\\n") yield f"data: {safe}\n\n" full = "".join(chunks) s.transcripts[w.id].append(Turn(role="assistant", content=full)) yield "event: end\ndata: \n\n" return StreamingResponse( event_stream(), media_type="text/event-stream", headers={ "X-Accel-Buffering": "no", "Cache-Control": "no-cache, no-transform", "Connection": "keep-alive", }, ) @app.post("/api/sessions/{sid}/depose") def depose(sid: str, req: DeposeReq): s = deps.sessions().get(sid) if s is None: raise HTTPException(404, "unknown session") case = deps.cases()[s.case_id] w = next((w for w in case.witnesses if w.id == req.witness_id), None) if w is None: raise HTTPException(404, "unknown witness") engine = deps.witness_engine() if engine is None: raise HTTPException(503, "witness engine not loaded") system = witness_system_prompt(case, w) turns = s.transcripts.get(w.id, []) try: _, summary = engine.take_deposition(system, turns, case.dock) except Exception: summary = "(no statement given)" # STORY MODE: use authored target_mass for the math; summary only from model mass = w.target_mass if w.target_mass else {frozenset(case.dock): 1.0} dep = Deposition(witness_id=w.id, mass=mass, summary=summary, alpha=0.0) s.depositions = [d for d in s.depositions if d.witness_id != w.id] + [dep] s.days_remaining = max(0, s.days_remaining - w.cost_days) _recompute(s, case) return { "deposition": { "witness_id": dep.witness_id, "summary": dep.summary, "mass": {",".join(sorted(k)): v for k, v in dep.mass.items()}, }, "state": _state_dict(s), } @app.post("/api/sessions/{sid}/discount") def set_discount(sid: str, req: DiscountReq): s = deps.sessions().get(sid) if s is None: raise HTTPException(404, "unknown session") case = deps.cases()[s.case_id] for d in s.depositions: if d.witness_id == req.witness_id: d.alpha = max(0.0, min(1.0, req.alpha)) _recompute(s, case) return {"state": _state_dict(s)} @app.post("/api/sessions/{sid}/method") def set_method(sid: str, req: MethodReq): s = deps.sessions().get(sid) if s is None: raise HTTPException(404, "unknown session") case = deps.cases()[s.case_id] if req.method not in case.methods_available: raise HTTPException(400, f"method {req.method!r} not unlocked for this case") s.active_method = req.method # type: ignore[assignment] _recompute(s, case) return {"state": _state_dict(s)} @app.post("/api/sessions/{sid}/verdict") def deliver(sid: str, req: VerdictReq): s = deps.sessions().get(sid) if s is None: raise HTTPException(404, "unknown session") case = deps.cases()[s.case_id] outcome = deliver_verdict(s, case, req.suspect) # comparison panel: what would each method have done? comparison = {} original_method = s.active_method for m in case.methods_available: s.active_method = m # type: ignore[assignment] _recompute(s, case) comparison[m] = {sp: round(s.bel.get(sp, 0.0), 4) for sp in case.dock} s.active_method = original_method _recompute(s, case) record_verdict(case.case_id, req.suspect, s.verdict.method, outcome, s.verdict.suspicion) return { "outcome": outcome, "ground_truth": {"culprit": case.ground_truth.culprit, "accomplice": case.ground_truth.accomplice}, "verdict": {"suspect": req.suspect, "method": s.verdict.method, "suspicion": s.verdict.suspicion, "threshold": case.threshold}, "method_comparison": comparison, "state": _state_dict(s), } @app.get("/api/ledger") def get_ledger(): return read_ledger() @app.post("/api/verdict-cards") def make_card(req: CardReq): from .verdict_card import render_png s = deps.sessions().get(req.session_id) if s is None or s.verdict is None or s.bel is None: raise HTTPException(400, "no verdict yet") case = deps.cases()[s.case_id] png = render_png( title=f"{case.title}", suspect=s.verdict.suspect, method=s.verdict.method, suspicion=s.verdict.suspicion, shadow=s.pl.get(s.verdict.suspect, 0.0), outcome=s.outcome or "mistrial", ) return Response(content=png, media_type="image/png", headers={"Content-Disposition": "inline; filename=verdict.png"}) @app.post("/api/endless") def start_endless(): from .procedural import generate_interesting c = generate_interesting(seed_start=int.from_bytes(uuid.uuid4().bytes[:4], "big")) case_obj = deps.add_case_from_dict(c) sid = uuid.uuid4().hex[:12] deps.sessions()[sid] = CaseState.start(sid, case_obj) return {"session_id": sid, "case_id": case_obj.case_id} @app.post("/api/daily") def start_daily(): from .procedural import generate_interesting seed = int(hashlib.sha1(date.today().isoformat().encode()).hexdigest()[:8], 16) case_id = f"daily_{date.today().isoformat()}" if case_id not in deps.cases(): c = generate_interesting(seed_start=seed) c["case_id"] = case_id c["title"] = f"Daily Case — {date.today().isoformat()}" deps.add_case_from_dict(c) sid = uuid.uuid4().hex[:12] deps.sessions()[sid] = CaseState.start(sid, deps.cases()[case_id]) return {"session_id": sid, "case_id": case_id} # frontend static (mounted last so /api/* wins) try: app.mount("/", StaticFiles(directory="frontend", html=True), name="frontend") except RuntimeError: pass return app class _FakeWitness: """A test/dev witness that returns canned in-character text + a YAML-derived mass.""" def stream_interrogation(self, system_prompt, turns): yield "I have nothing further to say at this time." def take_deposition(self, system_prompt, turns, dock): return {frozenset(dock): 1.0}, "..."