"""HuggingWizards — co-op pixel wizard arena with a Nemotron-4B Game Master. Single process: * FastAPI serves the canvas client (`/`) and a WebSocket (`/ws`). * A ~20 Hz asyncio task runs the authoritative simulation on CPU and broadcasts snapshots to every connected client (players + spectators). * At each round boundary the Game Master (Nemotron, burst GPU) decides rewards and the next round; every decision is logged as an agent trace. * A Gradio dashboard is mounted at `/dashboard` (live traces + leaderboard). """ from __future__ import annotations import asyncio import contextlib import json import os import time import uuid # Gradio 5+ auto-enables SSR on Spaces: a Node server takes port 7860 and # pushes the Python server to 7861, colliding with our own uvicorn bind # ("[Errno 98] address already in use"). We serve Gradio mounted INSIDE # FastAPI, so SSR must be off. Set before gradio import; assignment (not # setdefault) because Spaces exports GRADIO_SSR_MODE=true. os.environ["GRADIO_SSR_MODE"] = "false" # On ZeroGPU, `spaces` must be imported before any CUDA-touching library. with contextlib.suppress(Exception): # not installed in local dev import spaces # noqa: F401 import gradio as gr from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from game import gamemaster from game.engine import Engine, MAX_PLAYERS, TICK_HZ HERE = os.path.dirname(__file__) engine = Engine() # Active websocket connections: ws -> {"pid", "role", "name", "char"} clients: dict[WebSocket, dict] = {} # Ordered waiting line for when the 8-slot arena is full. queue: list[WebSocket] = [] # ---- persistent leaderboard ---------------------------------------------- LEADERBOARD_FILE = os.path.join(HERE, "traces", "leaderboard.json") _leaderboard: list[dict] = [] def _load_leaderboard(): global _leaderboard try: with open(LEADERBOARD_FILE, encoding="utf-8") as f: _leaderboard = json.load(f) except Exception: _leaderboard = [] def _save_leaderboard(): with contextlib.suppress(Exception): with open(LEADERBOARD_FILE, "w", encoding="utf-8") as f: json.dump(_leaderboard[:100], f, indent=2) def add_score(name: str, gold: int, level: int, wave: int) -> list[dict]: _leaderboard.append({ "name": str(name)[:16] or "Wizard", "gold": int(gold), "level": int(level), "wave": int(wave), "ts": time.strftime("%Y-%m-%d %H:%M"), }) _leaderboard.sort(key=lambda e: (e["gold"], e["wave"], e["level"]), reverse=True) del _leaderboard[100:] _save_leaderboard() return _leaderboard[:20] _load_leaderboard() async def broadcast(payload: dict): if not clients: return msg = json.dumps(payload) dead = [] for ws in list(clients.keys()): try: await ws.send_text(msg) except Exception: dead.append(ws) for ws in dead: _drop(ws) def _drop(ws: WebSocket): info = clients.pop(ws, None) if info and info.get("pid"): engine.remove_player(info["pid"]) if not engine.players and engine.status != "lobby": engine.status = "lobby" engine.status_msg = "All wizards left. Waiting for players..." async def _send(ws: WebSocket, payload: dict): with contextlib.suppress(Exception): await ws.send_text(json.dumps(payload)) async def handle_skill_request(ws: WebSocket, pid: str, prompt: str): """The round's top wizard asks the Game Master for a custom power-up.""" if engine.status != "intermission" or engine.skill_request_used \ or engine.skill_request_pid != pid or not prompt.strip(): await _send(ws, {"type": "skill_result", "ok": False, "reason": "Not available right now."}) return engine.skill_request_used = True # one wish per intermission ctx = {"round": engine.round, "player_level": engine.players[pid].level if pid in engine.players else 1} spec = await asyncio.to_thread(gamemaster.generate_skill, prompt, ctx) granted = engine.add_runtime_skill(pid, spec) if spec else None await _send(ws, {"type": "skill_result", "ok": bool(granted), "skill": granted, "reason": None if granted else "The arcane energies fizzled — try again."}) def _ws_for_pid(pid: str): for ws, info in clients.items(): if info.get("pid") == pid: return ws return None async def _broadcast_queue(): """Tell each queued connection its current place in line.""" for i, ws in enumerate(list(queue)): await _send(ws, {"type": "queue", "pos": i + 1, "size": len(queue)}) async def promote_queue(): """Fill open arena slots from the front of the queue.""" while engine.can_join() and queue: ws = queue.pop(0) info = clients.get(ws) if info is None: # disconnected while waiting continue pid = uuid.uuid4().hex[:8] engine.add_player(pid, info.get("name", "Wizard"), info.get("char", "warrior")) clients[ws] = {**info, "pid": pid, "role": "player"} await _send(ws, {"type": "welcome", "pid": pid, "role": "player", "reason": "from_queue"}) async def manage_slots(): """Eliminate out-of-lives wizards, then promote the queue into free slots.""" for pid, p in list(engine.players.items()): if p.lives <= 0 and not p.alive: ws = _ws_for_pid(pid) engine.remove_player(pid) if ws is not None: info = clients.get(ws, {}) clients[ws] = {**info, "pid": None, "role": "spectator"} await _send(ws, {"type": "eliminated", "score": {"gold": p.gold, "level": p.level, "wave": engine.round}}) if not engine.players and engine.status != "lobby": engine.status = "lobby" engine.status_msg = "Waiting for wizards to join..." await promote_queue() await _broadcast_queue() async def game_loop(): last = time.time() prev_status = engine.status gm_task: asyncio.Task | None = None gm_applied = False interval = 1.0 / TICK_HZ slot_accum = 0.0 while True: now = time.time() dt = min(0.1, now - last) last = now engine.tick(dt) # eliminate / promote queue / broadcast positions a few times a second slot_accum += dt if slot_accum >= 0.4: slot_accum = 0.0 await manage_slots() # Round just ended -> ask the Game Master (off the loop thread). if engine.status == "intermission" and prev_status == "active": summary = engine.round_summary() prev_cfg = dict(engine.cfg) gm_task = asyncio.create_task( asyncio.to_thread(gamemaster.decide, summary, prev_cfg) ) gm_applied = False # Apply the decision as soon as it's ready (enables shopping this break). if gm_task is not None and gm_task.done() and not gm_applied: with contextlib.suppress(Exception): engine.apply_gm_decision(gm_task.result()) gm_applied = True # Start the next round once the break is over AND the decision is in. if engine.status == "intermission" and gm_applied and time.time() >= engine.intermission_until: if engine.players: engine.start_round() else: engine.status = "lobby" engine.status_msg = "Waiting for wizards to join..." gm_task = None gm_applied = False prev_status = engine.status await broadcast(engine.snapshot()) await asyncio.sleep(max(0, interval - (time.time() - now))) # -------------------------------------------------------------------------- # FastAPI app # -------------------------------------------------------------------------- app = FastAPI(title="HuggingWizards") _loop_task: asyncio.Task | None = None def ensure_loop(): """Start the game loop once. Robust to Gradio overriding the app lifespan.""" global _loop_task if _loop_task is None or _loop_task.done(): _loop_task = asyncio.create_task(game_loop()) @app.get("/") async def index(): return FileResponse(os.path.join(HERE, "static", "index.html")) @app.get("/traces") async def traces(): """Recent Game Master agent traces, trimmed for the in-game panel.""" out = [] for t in gamemaster.RECENT_TRACES[:15]: out.append({ "trace_id": t.get("trace_id"), "round": t.get("round"), "ts": t.get("ts"), "model": t.get("model"), "source": t.get("source"), "latency_sec": t.get("latency_sec"), "kind": t.get("kind", "round_decision"), "error": t.get("error"), "mercy": t.get("mercy"), "raw_output": str(t.get("raw_output", ""))[:600], "decision": t.get("decision"), }) return {"traces": out} @app.websocket("/ws") async def ws_endpoint(ws: WebSocket): await ws.accept() ensure_loop() clients[ws] = {"pid": None, "role": "spectator", "name": "Wizard", "char": "warrior"} try: while True: data = json.loads(await ws.receive_text()) mtype = data.get("type") if mtype == "join": name = str(data.get("name", "Wizard"))[:16] or "Wizard" char = str(data.get("char", "warrior"))[:16] or "soldier" clients[ws]["name"] = name clients[ws]["char"] = char if clients[ws]["pid"]: continue if engine.can_join(): pid = uuid.uuid4().hex[:8] engine.add_player(pid, name, char) clients[ws] = {**clients[ws], "pid": pid, "role": "player"} await _send(ws, {"type": "welcome", "pid": pid, "role": "player"}) else: # game full -> join the queue if ws not in queue: queue.append(ws) pos = queue.index(ws) + 1 await _send(ws, {"type": "welcome", "pid": None, "role": "spectator", "reason": "queued", "pos": pos, "size": len(queue)}) elif mtype == "spectate": clients[ws]["name"] = str(data.get("name", "Wizard"))[:16] or "Wizard" clients[ws]["char"] = str(data.get("char", "warrior"))[:16] or "soldier" elif mtype == "leave_queue": if ws in queue: queue.remove(ws) elif mtype == "get_leaderboard": await _send(ws, {"type": "leaderboard", "top": _leaderboard[:20]}) elif mtype == "submit_score": top = add_score(str(data.get("name", "Wizard")), int(data.get("gold", 0)), int(data.get("level", 1)), int(data.get("wave", 0))) await _send(ws, {"type": "leaderboard", "top": top, "submitted": True}) elif mtype == "input" and clients[ws]["pid"]: engine.set_input(clients[ws]["pid"], data) elif mtype == "choose_card" and clients[ws]["pid"]: engine.choose_card(clients[ws]["pid"], data.get("key", "")) elif mtype == "request_skill" and clients[ws]["pid"]: await handle_skill_request(ws, clients[ws]["pid"], str(data.get("prompt", ""))) elif mtype == "start" and clients[ws]["pid"]: if engine.status == "lobby": engine.start_round() except WebSocketDisconnect: pass except Exception: pass finally: if ws in queue: queue.remove(ws) _drop(ws) app.mount("/static", StaticFiles(directory=os.path.join(HERE, "static")), name="static") # -------------------------------------------------------------------------- # Gradio dashboard (mounted at /dashboard) # -------------------------------------------------------------------------- def _dashboard_state(): players = sorted(engine.players.values(), key=lambda p: p.gold, reverse=True) if players: lb = "| Wizard | Level | Gold | Last-round dmg | Kills |\n|---|---|---|---|---|\n" lb += "\n".join( f"| {p.name} | {p.level} | {p.gold} | {round(p.dmg_dealt)} | {p.kills} |" for p in players ) else: lb = "_No wizards in the arena yet._" header = (f"### Round {engine.round} — `{engine.status}`\n" f"{engine.status_msg}\n\n" f"Players: {len(engine.players)}/{MAX_PLAYERS} · " f"GM: _{engine.gm_message or '—'}_") traces = gamemaster.RECENT_TRACES[:10] return header, lb, traces with gr.Blocks(title="HuggingWizards — Game Master Dashboard") as demo: gr.Markdown("# 🧙 HuggingWizards — Game Master Dashboard\n" "Play at the [arena](/). Below: live state, leaderboard, and the " "per-round **agent traces** produced by Nemotron-4B.") header_md = gr.Markdown() with gr.Row(): leaderboard_md = gr.Markdown() gr.Markdown("## Agent traces (most recent first)") traces_json = gr.JSON() timer = gr.Timer(2.0) timer.tick(_dashboard_state, outputs=[header_md, leaderboard_md, traces_json]) demo.load(_dashboard_state, outputs=[header_md, leaderboard_md, traces_json]) app = gr.mount_gradio_app(app, demo, path="/dashboard", ssr_mode=False) # ---- ZeroGPU startup report ------------------------------------------------ # The `spaces` package sends its "this app has @spaces.GPU functions" report # to the ZeroGPU controller via a hook it installs on gr.Blocks.launch(). We # serve everything through uvicorn instead of demo.launch(), so that hook # never fires and ZeroGPU kills the Space with "No @spaces.GPU function # detected during startup". Invoke the same startup task manually. It packs # the (already loaded) model for fast GPU transfer and phones home; off # Spaces, `spaces.zero` doesn't define `startup` and this is a no-op. with contextlib.suppress(Exception): from spaces import zero as _spaces_zero # type: ignore if hasattr(_spaces_zero, "startup"): _spaces_zero.startup() print("[app] ZeroGPU startup report sent") if __name__ == "__main__": import uvicorn # Spaces expects the app on 7860 (it exports GRADIO_SERVER_PORT). port = int(os.environ.get("PORT") or os.environ.get("GRADIO_SERVER_PORT") or 7860) uvicorn.run(app, host="0.0.0.0", port=port)