import os
import time
import uuid
import json
import html
from pathlib import Path
import gradio as gr
try:
import spaces # only available on Hugging Face Spaces runtime
except ImportError:
# Provide a dummy so @spaces.GPU(duration=...) is a no-op locally
# while keeping the decorator statically visible in the source code.
class _spaces:
@staticmethod
def GPU(duration=300):
return lambda fn: fn
spaces = _spaces
from app.services.retrieval import load_games_dataset, normalize_game_record, retrieve_examples
from app.services.generator import (
generate_game, generate_game_with_model, build_generation_prompt,
unload_nemotron,
)
from app.services.validator import validate_game, repair_game
from app.services.schema_validator import create_minimal_game_template
from app.services.tracing import log_event, log_generation_trace, load_events
from app.services.journal import (
create_journal_entry, save_journal_entry, summarize_journal,
load_journal_entries, detect_mood, assess_story_value,
)
from app.services.scoring import compute_scores, HINT_PENALTY
from app.services.story import build_story_packet, generate_story
from app.services.image_gen import generate_poster_sync
# ── Voice-journal language options ─────────────────────────────────────────
# Shown on the Create Lobby screen as full names; mapped to the ISO codes the
# ASR service expects before being routed into the play-screen audio handlers.
LANGUAGE_CHOICES = [
"English",
"French",
"Spanish",
"German",
"Italian",
"Portuguese",
"Dutch",
"Japanese",
"Chinese (Mandarin)",
"Arabic",
"Hindi",
"Korean",
"Russian",
"Turkish",
"Polish",
]
LANGUAGE_TO_ISO = {
"English": "en", "French": "fr", "Spanish": "es",
"German": "de", "Italian": "it", "Portuguese": "pt",
"Dutch": "nl", "Japanese": "ja", "Chinese (Mandarin)": "zh",
"Arabic": "ar", "Hindi": "hi", "Korean": "ko",
"Russian": "ru", "Turkish": "tr", "Polish": "pl",
}
# ── Load datasets once on startup ──────────────────────────────────────────
BASE_DIR = Path(__file__).resolve().parent
DATASET_PATHS = {
"old": BASE_DIR / "app/data/games_dataset.json",
"scavenger_hunt": BASE_DIR / "app/data/scavenger_hunt/dataset.json",
"hide_and_seek": BASE_DIR / "app/data/hide_and_seek/dataset.json",
"tag": BASE_DIR / "app/data/tag/dataset.json",
}
DATA_RECORDS = []
for src, path in DATASET_PATHS.items():
try:
raw = load_games_dataset(str(path))
normalized = [normalize_game_record(r) for r in raw]
DATA_RECORDS.extend(normalized)
print(f"[OK] Loaded {len(normalized)} records from {src} dataset ({path.name})")
except FileNotFoundError:
print(f"[!] Dataset not found at {path}, skipping {src}")
except Exception as e:
print(f"[!] Error loading {src} dataset from {path}: {e}")
print(f"[OK] Total: {len(DATA_RECORDS)} game records loaded for retrieval")
# ── Shared multiplayer state lives in app.services.rooms ───────────────────
# SESSION_STORE / ADVENTURE_CODES are owned by the rooms module so the lobby,
# task wall and wait-lobby all read/write the same in-process truth.
from app.services import rooms
from app.services.rooms import SESSION_STORE, ADVENTURE_CODES, save_state, load_state
# ── HF Spaces Zero GPU generation ──────────────────────────────────────────
# This function is statically decorated so HF Spaces detects the GPU
# requirement at startup. The 2.84 GB GGUF model is lazily downloaded
# on first call inside the GPU context.
@spaces.GPU(duration=300)
def _generate_with_gpu(config: dict, retrieved: list[dict]):
"""Generate game using GPU (Nemotron 3 Nano 4B via llama.cpp).
Runs inside @spaces.GPU so CUDA is available. The model download
and llama.cpp initialisation happen lazily on first call.
"""
import json
prompt = build_generation_prompt(config, retrieved)
json_str = generate_game_with_model(prompt, model_name="nemotron")
if json_str:
try:
game = json.loads(json_str)
if all(field in game for field in ["game_id", "title", "setup", "tasks", "safety"]):
return game
except json.JSONDecodeError:
pass
return None
def _generate_game(config: dict, retrieved: list[dict]):
"""Try GPU generation; fall back to CPU/mock if unavailable."""
if spaces is not None:
game = _generate_with_gpu(config, retrieved)
if game is not None:
return game
print("[app] GPU generation returned None, falling back to CPU/mock")
return generate_game(config, retrieved)
# ── Pipeline entry point ──────────────────────────────────────────────────────
def run_pipeline(
game_type: str,
city: str,
area: str,
location_type: str,
duration_minutes: int,
num_players: int,
difficulty: str,
age_group: str,
energy_level: str,
):
"""Run the full AI generation pipeline end-to-end."""
session_id = str(uuid.uuid4())
config = {
"game_type": game_type,
"city": city or "Paris",
"area": area or "Downtown",
"location_type": location_type,
"duration_minutes": int(duration_minutes),
"num_players": int(num_players),
"difficulty": difficulty,
"age_group": age_group,
"energy_level": energy_level,
"photo_enabled": True,
# New fields for enhanced retrieval scoring
"landscape_tags": [],
"theme": "",
"mobility": "standard",
"allow_transport": False,
}
state = {}
# 1 ── Retrieval
state["num_retrieved"] = 0
if DATA_RECORDS:
retrieved = retrieve_examples(config, DATA_RECORDS, k=3)
state["num_retrieved"] = len(retrieved)
state["retrieved_ids"] = [r["id"] for r in retrieved]
else:
retrieved = []
# 2 ── Generation (GPU if available, CPU/mock fallback otherwise)
game = _generate_game(config, retrieved)
state["game_id"] = game["game_id"]
state["game_title"] = game["title"]
# Free GPU memory — Nemotron is no longer needed after generation.
# This makes room for FLUX poster / Cohere ASR later.
unload_nemotron()
# 3 ── Validation
is_valid, failures = validate_game(game, config)
state["validation_passed"] = is_valid
state["validation_failures"] = failures
# 4 ── Repair (if needed)
repaired = None
if not is_valid:
repaired = repair_game(game, failures, config)
state["repair_applied"] = True
is_valid2, failures2 = validate_game(repaired, config)
state["repair_valid"] = is_valid2
state["remaining_failures"] = failures2
else:
state["repair_applied"] = False
final_game = repaired if repaired is not None else game
# Stamp the selected game type onto the game so it's recorded explicitly
# (the model output / schema don't carry it) and can't drift from config.
final_game["game_type"] = config.get("game_type", "scavenger_hunt")
# 5 ── Log generation trace
log_generation_trace(
session_id=session_id,
config=config,
retrieved_examples=retrieved,
game=final_game,
validation_passed=is_valid or (repaired is not None and state.get("repair_valid", False)),
validation_failures=failures,
repaired_game=repaired,
)
# 6 ── Reveal all tasks as events
for task in final_game.get("tasks", []):
log_event(session_id, "task_revealed", {
"task_id": task["task_id"],
"title": task["title"],
"points": task["points"],
})
# 7 ── Store session
SESSION_STORE[session_id] = {
"config": config,
"game": final_game,
"events": [],
"journals": [],
}
# 8 ── Build summary text
summary = build_summary(final_game, state, session_id)
return summary, session_id
def generate_for_session(session_id: str) -> dict:
"""Generate (and validate/repair) a game for an *existing* lobby session.
Used when the host presses Start: the room/players already exist, so we
only run retrieval → generation → validation → repair, store the game on
the session, and reveal the tasks. Returns the final game dict.
Reuses the same pipeline pieces as :func:`run_pipeline`.
"""
session = SESSION_STORE.get(session_id)
if not session:
raise ValueError("Unknown session")
# Idempotent: once a quest exists for this session, never re-roll it. A
# second call (double-clicked Start, a poll race, a reload mid-flow) must
# return the SAME tasks — players completing a task should never see a
# different set of tasks appear under them.
if session.get("game"):
return session["game"]
config = session["config"]
retrieved = retrieve_examples(config, DATA_RECORDS, k=3) if DATA_RECORDS else []
game = _generate_game(config, retrieved)
unload_nemotron() # free GPU for ASR / MiniCPM / FLUX
is_valid, failures = validate_game(game, config)
repaired = repair_game(game, failures, config) if not is_valid else None
final_game = repaired if repaired is not None else game
# Record the selected game type on the game object (authoritative = config).
final_game["game_type"] = config.get("game_type", "scavenger_hunt")
log_generation_trace(
session_id=session_id, config=config, retrieved_examples=retrieved,
game=final_game,
validation_passed=is_valid or (repaired is not None),
validation_failures=failures, repaired_game=repaired,
)
for task in final_game.get("tasks", []):
log_event(session_id, "task_revealed", {
"task_id": task["task_id"], "title": task["title"],
"points": task["points"],
})
session["game"] = final_game
# Persist immediately so a process restart (e.g. dev hot-reload) between
# generation and begin_play() can't lose the quest and trigger a re-roll.
save_state()
return final_game
# ── Phase 3: Gameplay helpers ─────────────────────────────────────────────────
def complete_task(session_id: str, task_id: str, team_id: str = "team-a"):
"""Log a task completion event and return updated scoreboard."""
if session_id not in SESSION_STORE:
return "⚠ Unknown session"
ev = log_event(session_id, "task_completed", {
"task_id": task_id,
"summary": f"Team {team_id} completed {task_id}",
}, team_id=team_id)
SESSION_STORE[session_id]["events"].append(ev)
return f"✅ Task {task_id} completed!"
def skip_task(session_id: str, task_id: str, team_id: str = "team-a"):
"""Log a task skip event."""
if session_id not in SESSION_STORE:
return "⚠ Unknown session"
ev = log_event(session_id, "task_skipped", {
"task_id": task_id,
"summary": f"Team {team_id} skipped {task_id}",
}, team_id=team_id)
SESSION_STORE[session_id]["events"].append(ev)
return f"⏭️ Task {task_id} skipped."
def use_hint(session_id: str, task_id: str, team_id: str = "team-a",
question: str = "", answer: str = ""):
"""Log a hint usage event and return the team's new hint count for this task.
The question/answer are stored on the event so the detail view can show a
hint history. Scoring (``compute_scores``) deducts ``HINT_PENALTY`` per
``hint_used`` event, so no separate score mutation is needed here."""
if session_id not in SESSION_STORE:
return 0
ev = log_event(session_id, "hint_used", {
"task_id": task_id,
"question": (question or "").strip(),
"answer": (answer or "").strip(),
"summary": f"Team {team_id} used a hint for {task_id}",
}, team_id=team_id)
SESSION_STORE[session_id]["events"].append(ev)
return rooms.hints_used_count(session_id, team_id, task_id)
@spaces.GPU(duration=120)
def record_journal(
session_id: str,
transcript: str = "",
task_id: str = "",
location_note: str = "",
team_id: str = "team-a",
audio_path: str = "",
language: str = "en",
):
"""Record a journal entry, summarize it, and return the result.
Decorated with ``@spaces.GPU`` so the Cohere ASR model can run on the
GPU. On HF ZeroGPU, ``torch.cuda.is_available()`` is ``False`` outside
a ``@spaces.GPU`` function, so without this the ASR model would load
on CPU and the voice-journal button would appear to hang.
Two input paths are supported:
* **Voice path** — pass ``audio_path`` (e.g. a path returned by
``gr.Audio(type="filepath")``). The audio is transcribed with the
Cohere ASR service (``CohereLabs/cohere-transcribe-03-2026``) and
the transcript is stored with ASR metadata.
* **Typed path** — pass ``transcript`` directly. The user can also
edit an ASR transcript in the UI before submitting; if both
``audio_path`` and ``transcript`` are present, the typed
transcript wins and is treated as a manual correction
(``transcript_source == "hybrid"``).
"""
if session_id not in SESSION_STORE:
return "[!] Unknown session"
asr_metadata: dict | None = None
audio_ref: str | None = None
transcript_source = "typed"
# ── 1. Voice path — transcribe audio if provided ───────────────────
if audio_path:
audio_ref = audio_path
try:
from app.services.asr import transcribe as _asr_transcribe
asr_result = _asr_transcribe(audio_path, language=language)
asr_metadata = {
"model": asr_result.get("model"),
"language": asr_result.get("language"),
"status": asr_result.get("status"),
"error": asr_result.get("error"),
}
asr_text = (asr_result.get("transcript") or "").strip()
if asr_text and not transcript.strip():
transcript = asr_text
transcript_source = "asr"
elif asr_text and transcript.strip() and asr_text != transcript.strip():
transcript_source = "hybrid"
except Exception as exc:
print(f"[app] ASR transcription failed: {type(exc).__name__}: {exc}")
asr_metadata = {
"model": None,
"language": language,
"status": "error",
"error": f"{type(exc).__name__}: {exc}",
}
if not transcript or not transcript.strip():
# Surface the actual error instead of a generic message
hint = ""
if asr_metadata:
err = asr_metadata.get("error") or ""
status = asr_metadata.get("status") or ""
if status == "error" and err:
hint = f"\n\n**ASR error:** {err}"
elif status == "skipped":
hint = (
"\n\n**ASR was skipped** — set `CITYQUEST_SKIP_MODEL=0` "
"and ensure `HF_TOKEN` is configured."
)
return (
"⚠️ No transcript available — record audio or type a note first."
+ hint
)
# ── 2. Build journal entry ────────────────────────────────────────
entry = create_journal_entry(
transcript=transcript,
session_id=session_id,
team_id=team_id,
task_id=task_id or None,
location_note=location_note,
audio_ref=audio_ref,
asr_metadata=asr_metadata,
transcript_source=transcript_source,
)
# Summarize
summary = summarize_journal(transcript, task_id=task_id or None, location_note=location_note)
entry["moment_summary"] = summary["moment_summary"]
entry["tags"] = summary["tags"]
entry["story_value"] = summary["story_value"]
# Persist
save_journal_entry(entry)
# Log event
ev = log_event(session_id, "journal_recorded", {
"journal_id": entry["journal_id"],
"mood": entry["mood"],
"story_value": summary["story_value"],
"summary": summary["moment_summary"],
"transcript_source": transcript_source,
"asr_status": asr_metadata.get("status") if asr_metadata else None,
"asr_model": asr_metadata.get("model") if asr_metadata else None,
}, team_id=team_id)
SESSION_STORE[session_id]["events"].append(ev)
SESSION_STORE[session_id]["journals"].append(entry)
# Build display text
source_label = {
"asr": "🎙️ (transcribed)",
"hybrid": "🎙️✏️ (transcribed, edited)",
"typed": "⌨️ (typed)",
}.get(transcript_source, transcript_source)
asr_line = ""
if asr_metadata and asr_metadata.get("status") != "ok":
asr_line = f"- ASR status: **{asr_metadata.get('status')}** — {asr_metadata.get('error') or ''}\n"
display = (
f"🎙️ **Journal recorded!** {source_label}\n"
f"- Mood: *{entry['mood']}*\n"
f"- Story value: **{summary['story_value']}**\n"
f"- Tags: {', '.join(summary['tags'])}\n"
f"{asr_line}"
f"- Summary: {summary['moment_summary']}"
)
return display
def upload_photo(
session_id: str,
photo_file,
caption: str = "",
task_id: str = "",
team_id: str = "team-a",
):
"""Log a photo upload event and return a confirmation."""
if session_id not in SESSION_STORE:
return "[!] Unknown session"
photo_name = ""
if photo_file is not None:
# Gradio 4+ returns a filepath string or a PIL image
if isinstance(photo_file, str):
photo_name = photo_file.split("/")[-1].split("\\")[-1]
else:
photo_name = getattr(photo_file, "name", "photo")
photo_id = f"photo-{uuid.uuid4().hex[:8]}"
payload = {
"photo_id": photo_id,
"photo_name": photo_name,
"caption": caption,
"summary": f"Team {team_id} uploaded photo for {task_id or 'general'}",
}
if task_id:
payload["task_id"] = task_id
ev = log_event(session_id, "photo_uploaded", payload, team_id=team_id)
SESSION_STORE[session_id]["events"].append(ev)
# Track photo in session store for recap
if "photos" not in SESSION_STORE[session_id]:
SESSION_STORE[session_id]["photos"] = []
SESSION_STORE[session_id]["photos"].append({
"photo_id": photo_id,
"photo_name": photo_name,
"photo_path": photo_file if isinstance(photo_file, str) else "",
"caption": caption,
"task_id": task_id,
"team_id": team_id,
})
# Build gallery list
photos = SESSION_STORE[session_id]["photos"]
gallery_lines = [f"📸 **{p['photo_id']}** — {p['caption'] or '(no caption)'} [{p['task_id'] or 'general'}]" for p in photos]
display = (
f"📸 **Photo uploaded!**\n"
f"- ID: `{photo_id}`\n"
f"- Caption: {caption or '(none)'}\n"
f"- Related task: {task_id or 'general'}\n\n"
f"**All photos ({len(photos)}):**\n" + "\n".join(gallery_lines)
)
return display
def end_game(session_id: str, team_id: str = "team-a"):
"""End the game, compute scores, and return a scoreboard."""
if session_id not in SESSION_STORE:
return "[!] Unknown session"
session = SESSION_STORE[session_id]
game = session["game"]
events = load_events(session_id=session_id)
ev = log_event(session_id, "game_finished", {
"summary": f"Game finished — session {session_id}",
}, team_id=team_id)
events.append(ev)
scores = compute_scores(events, game)
session["scores"] = scores
lines = ["# Final Scoreboard\n"]
for ts in scores.get("team_scores", []):
marker = " [WINNER]" if ts["team_id"] == scores.get("winner") else ""
lines.append(f"### Team: {ts['team_id']}{marker}")
lines.append(f"- **Total points:** {ts['points']}")
lines.append(f"- Tasks completed: {ts['completed_tasks']}/{ts['total_tasks']}")
lines.append(f"- Hints used: {ts['hints_used']}")
if ts.get("bonuses"):
lines.append(f"- Bonuses: {', '.join(ts['bonuses'])}")
lines.append("")
lines.append("**Breakdown:**")
for b in ts.get("scoring_breakdown", []):
lines.append(f" * {b}")
lines.append("")
if scores.get("winner"):
lines.append(f"**Winner: {scores['winner']}**")
return "\n".join(lines)
def generate_recap(session_id: str):
"""Generate the final story recap from all collected session data."""
if session_id not in SESSION_STORE:
return "[!] Unknown session", {}
session = SESSION_STORE[session_id]
game = session["game"]
events = load_events(session_id=session_id)
journals = load_journal_entries(session_id=session_id)
scores = session.get("scores", compute_scores(events, game))
photos = session.get("photos", [])
# Build story packet
packet = build_story_packet(
game=game,
events=events,
scores=scores,
journal_entries=journals,
photo_captions=photos,
)
# Generate story
result = generate_story(packet, session_id=session_id)
# Format for display (poster goes in dedicated section below)
lines = [
"# Episode Recap\n",
result["short_recap"],
"",
result["long_summary"],
"",
]
# NOTE: poster_prompt is internal — do NOT render to user.
# NOTE: do not duplicate Final Standings here; left column shows it.
# TODO: result["long_summary"] may still contain a "Final standings"
# sub-section produced upstream in generate_story(); if so, strip it
# in the story generator module rather than here.
return "\n".join(lines), result
@spaces.GPU(duration=300)
def generate_poster_from_session(session_id: str):
"""Generate a poster image from an existing session's recap data."""
if session_id not in SESSION_STORE:
return None, "[!] Unknown session - generate a recap first"
session = SESSION_STORE[session_id]
# Check if we have a recap result or can build one
if "poster_url" in session:
return session["poster_url"], "Poster ready from previous generation"
# Build story packet from stored data
game = session.get("game")
if not game:
return None, "[!] No game data in session"
events = load_events(session_id=session_id)
journals = load_journal_entries(session_id=session_id)
scores = session.get("scores", compute_scores(events, game))
photos = session.get("photos", [])
packet = build_story_packet(
game=game, events=events, scores=scores,
journal_entries=journals, photo_captions=photos,
)
result = generate_story(packet, session_id=session_id)
poster_url, status = generate_poster_sync(
session_id,
result["poster_prompt"],
photo_paths=[p.get("photo_path", "") for p in photos if p.get("photo_path")],
)
session["poster_url"] = poster_url
return poster_url, status
# ── Build summary text ────────────────────────────────────────────────────────
def build_summary(game: dict, state: dict, session_id: str = "") -> str:
"""Format a human-readable game summary for the UI."""
lines = []
lines.append(f"# 🎮 {game.get('title', 'Untitled')}")
lines.append("")
if session_id:
lines.append(f"> Session `{session_id[:12]}…` — paste this in the **Play** tab to log progress.")
lines.append("")
lines.append("## 📋 Setup")
setup = game.get("setup", {})
lines.append(f"- **Location:** {setup.get('city', '?')} — {setup.get('area', '?')}")
lines.append(f"- **Meeting point:** {setup.get('meeting_point', '?')}")
lines.append(f"- **Duration:** {setup.get('duration_minutes', '?')} min")
lines.append(f"- **Players:** {setup.get('num_players', '?')}")
lines.append("")
lines.append("## 📜 Rules")
for i, rule in enumerate(game.get("rules", []), 1):
lines.append(f" {i}. {rule}")
lines.append("")
lines.append("## 🎯 Tasks")
for t in game.get("tasks", []):
time_str = f"{t.get('time_limit_minutes', '∞')} min" if t.get("time_limit_minutes") else "No time limit"
lines.append(f" **{t.get('task_id', '?')}:** {t.get('title', '?')}")
lines.append(f" - *{t.get('description', '')[:80]}*")
lines.append(f" - 🏆 {t.get('points', 0)} pts | ⏱ {time_str} | 📸 {t.get('proof_type', '?')}")
lines.append(f" - 💡 {t.get('hint', '')[:70]}")
lines.append(f" - 🛡 {t.get('safety_note', '')[:70]}")
lines.append("")
lines.append("## 💡 Global Hints")
for h in game.get("global_hints", []):
lines.append(f" • {h}")
lines.append("")
lines.append("## 🔒 Safety")
safety = game.get("safety", {})
lines.append(f"- **Zone:** {safety.get('allowed_zone', '?')}")
lines.append(f"- **Supervision required:** {'Yes' if safety.get('adult_supervision') else 'No'}")
lines.append(f"- **Forbidden:** {', '.join(safety.get('forbidden_behaviors', []))}")
lines.append("")
lines.append("## 📊 Scoring")
for s in game.get("score_rules", []):
lines.append(f" • {s}")
lines.append(f"- **Tie-breaker:** {game.get('tie_breaker', '?')}")
lines.append("")
lines.append("## 📖 Story Seed")
seed = game.get("story_seed", {})
lines.append(f"- **Tone:** {seed.get('tone', '?')}")
lines.append(f"- **Motifs:** {', '.join(seed.get('motifs', []))}")
lines.append(f"- **Recap style:** {seed.get('recap_style', '?')}")
lines.append("")
# Pipeline trace
lines.append("---")
lines.append("### 🔍 Pipeline trace")
lines.append(f"- Retrieval: {state.get('num_retrieved', 0)} examples found")
if state.get("retrieved_ids"):
lines.append(f"- Retrieved IDs: {', '.join(state['retrieved_ids'])}")
lines.append(f"- Game ID: {state.get('game_id', '?')}")
if state.get("validation_passed"):
lines.append(f"- ✅ Validation passed")
else:
lines.append(f"- ❌ Validation failed ({len(state.get('validation_failures', []))} issues)")
if state.get("repair_applied"):
lines.append(f"- 🔧 Repair applied → {'✅ Passed' if state.get('repair_valid') else '❌ Still has issues'}")
for f in state.get("validation_failures", [])[:5]:
lines.append(f" - {f}")
leftover = state.get("remaining_failures", [])
if leftover:
for f in leftover[:5]:
lines.append(f" - ⚠ {f}")
return "\n".join(lines)
# ══════════════════════════════════════════════════════════════════════════
# LIQUID-FLOW MULTIPLAYER UI
# ══════════════════════════════════════════════════════════════════════════
from app.ui.theme import CSS, JS_INIT, gradio_theme
rooms.load_state() # restore live rooms after a restart
ORDER = ["home", "create", "join", "lobby", "play", "wait", "win"]
def vis(target: str):
"""Visibility updates for the 7 top-level screen groups."""
return tuple(gr.update(visible=(name == target)) for name in ORDER)
def _mmss(secs) -> str:
m, s = divmod(max(0, int(secs)), 60)
return f"{m:02d}:{s:02d}"
# ── HTML fragment renderers (polled) ──────────────────────────────────────
def code_html(session_id: str) -> str:
s = SESSION_STORE.get(session_id) or {}
code = s.get("room_code", "——————")
return (
"
"
"
Share this code with your friends to join
"
f"
{code}
"
)
def copy_btn_html(session_id: str) -> str:
"""A clipboard button beside the room code. Kept in its own (non-polled)
HTML component so the 'Copied!' state isn't reset by the 1.5s lobby poll.
Pure inline JS — Gradio can't reach the clipboard from Python."""
s = SESSION_STORE.get(session_id) or {}
code = s.get("room_code", "")
js = (
"var b=this;navigator.clipboard.writeText('" + code + "').then(function(){"
"b.classList.add('copied');var l=b.querySelector('.cq-copy-label');"
"if(l)l.textContent='✓ Copied!';"
"setTimeout(function(){b.classList.remove('copied');"
"if(l)l.textContent='📋 Copy';},2000);});"
)
return (
"
"
f"
"
)
def _teams_ready(session_id: str):
"""(teams_with_at_least_one_player, total_teams) for the lobby."""
s = SESSION_STORE.get(session_id) or {}
teams = s.get("teams", [])
have = {p["team_id"] for p in s.get("players", [])}
ready = sum(1 for t in teams if t["id"] in have)
return ready, len(teams)
def roster_html(session_id: str, me: str = "") -> str:
s = SESSION_STORE.get(session_id) or {}
host = s.get("host_name", "")
ready, total = _teams_ready(session_id)
pct = int(round(100 * ready / total)) if total else 0
parts = ["
"]
parts.append(
"
"
f"
{ready} / {total} teams ready
"
"
"
f"
"
)
for t in s.get("teams", []):
members = [p for p in s.get("players", []) if p["team_id"] == t["id"]]
if members:
chips = "".join(
f""
f"{p['name']}{' ⭐' if p['name'] == host else ''}"
for p in members
)
else:
chips = "waiting…"
parts.append(
"
"
f"
"
f"{t['name']} · {t['role']}
"
f"
{chips}
"
)
parts.append("
")
return "".join(parts)
def timer_html(session_id: str, team_id: str) -> str:
ph = rooms.team_phase(session_id, team_id)
meta = rooms.team_meta(session_id, team_id)
if ph["phase"] == "waiting":
label, val, sub = "Head start in progress — you launch in", _mmss(ph["seconds_to_start"]), "Sit tight, your quest is about to open"
elif ph["phase"] == "overtime":
label, val, sub = "Time's up — wrap up and finish", "00:00", ""
else:
label, val, sub = "Time remaining", _mmss(ph["seconds_remaining"]), ""
return (
"
"
f"
"
f"{meta['name']} · {meta['role']}
"
f"
{label}
"
f"
{val}
"
f"
{sub}
"
)
def status_strip_html(session_id: str, my_team: str = "") -> str:
s = SESSION_STORE.get(session_id) or {}
pills = []
for t in s.get("teams", []):
tid = t["id"]
if rooms.team_finished(session_id, tid):
cls, txt = "done", f"{t['name']} · finished 🏁"
elif rooms.is_unlocked(session_id, tid):
cls, txt = "live", f"{t['name']} · exploring"
else:
ph = rooms.team_phase(session_id, tid)
cls, txt = "wait", f"{t['name']} · starts in {_mmss(ph['seconds_to_start'])}"
me = " (you)" if tid == my_team else ""
pills.append(f"{txt}{me}")
return "
" + "".join(pills) + "
"
def scoreboard_md(session_id: str) -> str:
s = SESSION_STORE.get(session_id) or {}
game = s.get("game") or {}
events = load_events(session_id=session_id)
scores = compute_scores(events, game)
s["scores"] = scores
winner = scores.get("winner")
ts_sorted = sorted(scores.get("team_scores", []), key=lambda x: x.get("points", 0), reverse=True)
medals = ["🥇", "🥈", "🥉"]
lines = ["# 🏆 Final Standings", ""]
for i, ts in enumerate(ts_sorted):
meta = rooms.team_meta(session_id, ts["team_id"])
medal = medals[i] if i < 3 else "•"
crown = " — Champion" if ts["team_id"] == winner else ""
lines.append(f"### {medal} {meta['name']}{crown}")
lines.append(
f"- **{ts['points']} points** · {ts['completed_tasks']}/{ts['total_tasks']} tasks · "
f"{ts['hints_used']} hints used"
)
lines.append("")
return "\n".join(lines)
def _task_id_list(session_id: str) -> list[str]:
s = SESSION_STORE.get(session_id) or {}
return [t["task_id"] for t in (s.get("game") or {}).get("tasks", [])[:12]]
def _task_by_id(session_id: str, task_id: str) -> dict:
s = SESSION_STORE.get(session_id) or {}
return next((x for x in (s.get("game") or {}).get("tasks", []) if x["task_id"] == task_id), {})
def task_detail_md(session_id: str, team_id: str, task_id: str) -> str:
t = _task_by_id(session_id, task_id)
tl = t.get("time_limit_minutes")
tls = f"{tl} min" if tl else "no time limit"
return (
f"## {t.get('title', 'Task')}\n\n"
f"**🏆 {t.get('points', 0)} pts · ⏱ {tls} · 📸 {t.get('proof_type', 'photo')}**\n\n"
f"{t.get('description', '')}"
)
def task_hint_html(session_id: str, team_id: str, task_id: str) -> str:
"""Hint rendered with its own class so it reads as 'discovery' (amber),
visually distinct from a safety warning."""
hint = (_task_by_id(session_id, task_id).get("hint") or "").strip()
if not hint:
return ""
return f"
💡 {html.escape(hint)}
"
def task_safety_html(session_id: str, team_id: str, task_id: str) -> str:
"""Safety flag rendered with a red 'warning' treatment, distinct from a hint."""
note = (_task_by_id(session_id, task_id).get("safety_note")
or "Stay aware of your surroundings.").strip()
return f"
🛡 {html.escape(note)}
"
def task_gallery(session_id: str, team_id: str, task_id: str):
return [
(p["photo_path"], p.get("caption") or "")
for p in rooms.team_photos(session_id, team_id, task_id)
if p.get("photo_path")
]
def proof_md(session_id: str, team_id: str, task_id: str) -> str:
n_photos, n_journals = rooms.proof_count(session_id, team_id, task_id)
if n_photos + n_journals > 0:
return f"✅ Proof captured — **{n_photos}** photo(s), **{n_journals}** journal(s). You can complete this task."
return "🔒 Add at least **one photo** or **one voice journal** to unlock **Mark Completed**."
def journal_log_md(session_id: str, team_id: str, task_id: str) -> str:
"""List the voice/typed journals already saved for this task so players
can keep adding more recordings."""
entries = rooms.team_journals(session_id, team_id, task_id)
if not entries:
return ""
lines = [f"**🎙️ Journals for this task ({len(entries)})** — record more anytime:"]
for i, e in enumerate(entries, 1):
txt = (e.get("transcript") or "").strip().replace("\n", " ")
if len(txt) > 90:
txt = txt[:90] + "…"
lines.append(f"{i}. {txt or '*(no text)*'}")
return "\n".join(lines)
def _hint_points_update(session_id: str, team_id: str, task_id: str):
"""Live 'available points' for this task: base points minus the per-hint
penalty for every hint this team has revealed, clamped at 0. Turns
orange-red once any hint has been spent."""
base = _task_by_id(session_id, task_id).get("points", 0)
used = rooms.hints_used_count(session_id, team_id, task_id)
avail = max(0, base - used * HINT_PENALTY)
cls = ["points-display"] + (["reduced"] if used > 0 else [])
return gr.update(value=f"⭐ {avail} pts available (−{HINT_PENALTY} pts per hint)",
elem_classes=cls)
def _hint_btn_state(remaining: int):
"""Hint button label/interactivity from the number of hints still allowed."""
if remaining <= 0:
return gr.update(value="No hints left", interactive=False,
elem_classes=["btn-hint", "spent"])
return gr.update(value=f"Get a hint −{HINT_PENALTY} pts ({remaining} left)",
interactive=True, elem_classes=["btn-hint"])
def _hint_btn_update(session_id: str, team_id: str, task_id: str):
used = rooms.hints_used_count(session_id, team_id, task_id)
return _hint_btn_state(max(0, rooms.HINT_CAP - used))
def hint_log_md(session_id: str, team_id: str, task_id: str) -> str:
"""History of hints already revealed for this task so players don't have to
re-buy a hint they've seen — shows the guide's answer for each."""
hints = rooms.team_hints(session_id, team_id, task_id)
if not hints:
return ""
total = len(hints) * HINT_PENALTY
lines = [f"**Hints used ({len(hints)} of {rooms.HINT_CAP}) — −{total} pts total**"]
for i, p in enumerate(hints, 1):
ans = (p.get("answer") or "").strip().replace("\n", " ")
if len(ans) > 140:
ans = ans[:140] + "…"
lines.append(f"💡 Hint {i}: {ans or '*(no answer captured)*'}")
return "\n\n".join(lines)
def _detail_tab_updates(session_id: str, team_id: str, active_tid: str):
"""Updates for the 12 horizontal task-tab buttons in the detail view: each
shows ✅/○ + name + points, the completed ones flagged 'done', and the one
you're viewing flagged 'active'."""
s = SESSION_STORE.get(session_id) or {}
tasks = (s.get("game") or {}).get("tasks", [])[:12]
wall = rooms.task_wall_state(session_id, team_id)
ups = []
for i in range(12):
if i < len(tasks):
t = tasks[i]
tid = t["task_id"]
done = wall.get(tid, False)
name = t.get("title", "Task")
if len(name) > 22:
name = name[:21] + "…"
cls = ["task-tab"]
if done:
cls.append("task-tab-done")
if tid == active_tid:
cls.append("task-tab-active")
ups.append(gr.update(
value=f"{'✅' if done else '○'} {name} · {t.get('points', 0)}pts",
visible=True, elem_classes=cls,
))
else:
ups.append(gr.update(visible=False))
return ups
def _task_position(session_id: str, active_tid: str) -> str:
s = SESSION_STORE.get(session_id) or {}
ids = [t["task_id"] for t in (s.get("game") or {}).get("tasks", [])[:12]]
if not ids:
return ""
idx = ids.index(active_tid) if active_tid in ids else 0
return f"Task {idx + 1} of {len(ids)}"
def _complete_btn_update(can: bool):
"""Locked (grey, disabled) vs unlocked (green, active) Mark Completed button."""
if can:
return gr.update(interactive=True, value="✅ Mark completed",
elem_classes=["mark-complete-btn", "complete-btn", "unlocked"])
return gr.update(interactive=False, value="🔒 Mark completed",
elem_classes=["mark-complete-btn", "complete-btn", "locked"])
def _completion_instruction_update(can: bool):
"""The elevated unlock instruction shown just above Mark Completed."""
if can:
return gr.update(value="✅ Proof added — you can mark this complete!",
elem_classes=["completion-instruction", "unlocked"])
return gr.update(value="🔒 Add a photo or voice note to unlock completion",
elem_classes=["completion-instruction", "locked"])
# ── Polling dispatchers (attached to the shared gr.Timer) ─────────────────
# Adaptive polling: stay snappy (FAST) while anything is changing, ease to
# SLOW after a stretch of identical ticks. The countdown timer on the play
# screen changes every second, so play naturally stays FAST; lobby/wait back
# off when idle. Capped at SLOW so screen transitions still arrive within ~3s.
POLL_FAST = 1.5
POLL_SLOW = 3.0
POLL_IDLE_LIMIT = 4 # identical ticks before easing off
def poll_tick(session_id, screen, team_id, player_name, is_host, poll_state):
nav = [gr.update()] * 7
new_screen = screen
lobby_code = lobby_roster = play_timer = play_status = wait_status = win_score = gr.update()
lobby_start = lobby_start_anyway = lobby_hint = gr.update()
wait_line = wait_results_btn = gr.update()
if session_id:
status = rooms.session_status(session_id)
# Canonical team id — never "" — so same-team members converge on one id.
team_id = rooms.resolve_team_id(session_id, player_name) or team_id
if screen == "lobby":
lobby_code = code_html(session_id)
lobby_roster = roster_html(session_id, player_name)
# Start gate (Model B): the primary Start enables only when the whole
# party is here (full AND min_ok). The host-only "Start anyway"
# override covers the short-handed-but-legal case (min_ok, not full).
pcs = rooms.player_count_status(session_id)
full, min_ok = pcs["full"], pcs["min_ok"]
lobby_start = gr.update(interactive=(full and min_ok))
if is_host:
lobby_start_anyway = gr.update(
visible=(not full), interactive=(min_ok and not full),
value=f"▶ Start anyway ({pcs['joined']}/{pcs['target']} here)")
lobby_hint = lobby_hint_text(session_id, is_host)
if status == "playing":
new_screen = "play"
nav = list(vis("play"))
play_timer = timer_html(session_id, team_id)
play_status = status_strip_html(session_id, team_id)
elif screen == "play":
play_timer = timer_html(session_id, team_id)
play_status = status_strip_html(session_id, team_id)
elif screen == "wait":
# The wait screen NEVER navigates on its own — for any player, under
# any condition. Each tick only UPDATES: the team strip, the
# "who's still playing" line, and the results button's enabled state.
# The wait->win transition happens ONLY when a player clicks the
# "See final results" button (see do_see_results).
wait_status = status_strip_html(session_id, team_id)
wait_line = wait_playing_md(session_id)
pcs = rooms.player_count_status(session_id)
can_results = rooms.all_finished(session_id) and pcs["min_ok"]
wait_results_btn = gr.update(interactive=can_results)
# ── Decide next poll interval from whether anything changed ──
def _s(x):
return x if isinstance(x, str) else ""
sig = "|".join([new_screen, _s(lobby_code), _s(lobby_roster), _s(play_timer),
_s(play_status), _s(wait_status), _s(wait_line), _s(win_score)])
last_sig, idle, cur_iv = poll_state or ("", 0, POLL_FAST)
if sig != last_sig:
idle, target_iv = 0, POLL_FAST
else:
idle += 1
target_iv = POLL_SLOW if idle >= POLL_IDLE_LIMIT else POLL_FAST
# Only emit a Timer change when the interval actually flips, so we don't
# reset the running countdown every tick.
timer_update = gr.Timer(target_iv) if target_iv != cur_iv else gr.update()
new_state = (sig, idle, target_iv)
return (*nav, new_screen, lobby_code, lobby_roster, lobby_start, lobby_start_anyway,
lobby_hint, play_timer, play_status, wait_status, wait_line,
wait_results_btn, win_score, timer_update, new_state)
def play_tick(session_id, screen, team_id, taskids, cur_task):
"""On the play screen keep the task-id list current and, on first entry
(no task open yet), open the first task in place. This covers both the host
(who clicks Start) and guests (who auto-transition to play on the poll), so
the detail view is always populated without a separate landing screen.
Returns the full detail-open tuple plus the refreshed ``s_taskids``."""
if screen != "play" or not session_id:
return (gr.update(),) * _N_DETAIL_OUTPUTS + (taskids,)
ids = _task_id_list(session_id)
ids_json = json.dumps(ids)
if not cur_task and ids:
# First time on the play screen for this client: open task 1.
return _open_task(session_id, team_id, ids[0]) + (ids_json,)
# Already viewing a task — leave the detail view untouched (don't reset the
# player's in-progress photo/transcript every tick), just sync the id list.
return (gr.update(),) * _N_DETAIL_OUTPUTS + (ids_json,)
# ── Screen-action handlers ────────────────────────────────────────────────
# Lobby instruction copy — differs for the host vs. everyone else. The host
# copy shows progress (joined/target); HINT_HOST_WAIT/HINT_HOST_SHORT are
# format strings filled by lobby_hint_text().
HINT_HOST_WAIT = "⏳ Waiting for players… {joined}/{target} joined"
HINT_HOST_SHORT = "▶ {joined}/{target} here — start now, or wait for the rest."
HINT_HOST_READY = "✅ Everyone's in — start the adventure whenever you're ready."
HINT_GUEST = "⏳ Waiting for the host to start the adventure…"
def lobby_hint_text(session_id: str, is_host: bool) -> str:
"""Lobby instruction line. Guests always wait on the host; the host sees
progress and which gate (full vs. structural minimum) they're at."""
if not is_host:
return HINT_GUEST
pcs = rooms.player_count_status(session_id)
if pcs["full"] and pcs["min_ok"]:
return HINT_HOST_READY
if pcs["min_ok"]:
return HINT_HOST_SHORT.format(joined=pcs["joined"], target=pcs["target"])
return HINT_HOST_WAIT.format(joined=pcs["joined"], target=pcs["target"])
def wait_playing_md(session_id: str) -> str:
"""Live 'who's still playing' line for the wait screen — active teams
(>=1 player) that haven't finished yet."""
active = rooms.active_team_ids(session_id)
still = [tid for tid in active if not rooms.team_finished(session_id, tid)]
if not still:
return "Everyone's finished — see results whenever you're ready."
names = ", ".join(rooms.team_meta(session_id, tid)["name"] for tid in still)
return f"Waiting on: {names}"
def _team_btn_updates(session_id, selected_id):
"""Button updates for the 4 team-pick slots: label + visibility, with the
player's current team flagged 'selected' so it reads as chosen (green)."""
teams = rooms.list_teams(session_id)
ups = []
for i in range(4):
if i < len(teams):
cls = ["team-pick", "selected"] if teams[i]["id"] == selected_id else ["team-pick"]
ups.append(gr.update(value=f"{teams[i]['name']} · {teams[i]['role']}",
visible=True, elem_classes=cls))
else:
ups.append(gr.update(visible=False))
return ups
def do_create(name, gtype, city, area, duration, teams_n, head_start,
difficulty, age_group, energy, players_n):
name = (name or "Host").strip() or "Host"
# Defense-in-depth: the slider min is 2, but guard here too so a game can
# never be created with a single team (build_teams also floors at 2).
teams_n = int(teams_n)
if teams_n < 2:
gr.Warning("Minimum 2 teams required — using 2.")
teams_n = 2
session_id = str(uuid.uuid4())
config = {
"game_type": gtype, "city": city or "Paris", "area": area or "Downtown",
"location_type": "mixed", "duration_minutes": int(duration),
"num_players": int(players_n), "num_teams": int(teams_n),
"head_start_seconds": int(head_start), "difficulty": difficulty,
"age_group": age_group, "energy_level": energy, "photo_enabled": True,
"landscape_tags": [], "theme": "", "mobility": "standard", "allow_transport": False,
}
rooms.create_room(name, config, session_id)
teams = rooms.list_teams(session_id)
tb = _team_btn_updates(session_id, teams[0]["id"])
# Host's s_team is their real team id (teams[0] — create_room files the host
# under it too), NOT "", so host proof/journal/finish state and final
# standings all key off the same id as every other same-team member.
return (*vis("lobby"), "lobby", session_id, name, teams[0]["id"], True,
code_html(session_id), roster_html(session_id, name),
gr.update(visible=True, interactive=False),
copy_btn_html(session_id), lobby_hint_text(session_id, True), *tb)
def do_join(code, name):
session_id, msg, ok = rooms.join_room(code, name)
if not ok:
gr.Warning(msg)
return (*vis("join"), "join", "", "", "", False, f"⚠ {msg}",
gr.update(), gr.update(), gr.update(visible=False),
gr.update(), gr.update(),
*([gr.update(visible=False)] * 4))
gr.Info(f"Joined room — {msg}")
name = (name or "").strip()
teams = rooms.list_teams(session_id)
tb = _team_btn_updates(session_id, teams[0]["id"])
return (*vis("lobby"), "lobby", session_id, name, teams[0]["id"], False, f"✅ {msg}",
code_html(session_id), roster_html(session_id, name),
gr.update(visible=False),
copy_btn_html(session_id), HINT_GUEST, *tb)
def make_pick(idx):
def _pick(session_id, player):
teams = rooms.list_teams(session_id)
if idx >= len(teams):
return (gr.update(), gr.update(), *([gr.update()] * 4))
tid = teams[idx]["id"]
rooms.pick_team(session_id, player, tid)
return (tid, roster_html(session_id, player),
*_team_btn_updates(session_id, tid))
return _pick
def prepare_game(session_id):
"""Generate the game during the lobby wait so Start is instant.
Triggered right after the host creates the room. Config is fully known at
creation time (teams/players don't affect Nemotron), so we download +
generate here, hidden behind the lobby. Runs in a normal Gradio event
(request context) so the inner @spaces.GPU generation gets a real GPU.
"""
if not session_id:
yield ""
return
s = SESSION_STORE.get(session_id)
if not s:
yield ""
return
if s.get("game"):
yield "
✓ Quest ready
"
return
yield "
Preparing your quest — warming Nemotron & generating tasks…
"
try:
generate_for_session(session_id)
yield "
✓ Quest ready — start whenever everyone's in
"
except Exception as e:
print(f"[prepare_game] {type(e).__name__}: {e}")
yield "
Couldn't pre-generate — it'll generate when you start.
"
def do_start(session_id, team_id=""):
s = SESSION_STORE.get(session_id) or {}
# Render the starter's OWN team strip/timer, not always team A — a host who
# picked a later team shouldn't see Team A's strip. Never empty: fall back
# to the first team (mirrors rooms.resolve_team_id's fallback).
team_id = team_id or (s.get("teams") or [{"id": "team-a"}])[0]["id"]
if not s.get("game"):
# Fallback: lobby pre-generation didn't finish — show a loader.
yield (*vis("lobby"), "lobby", gr.update(), gr.update(),
"
Summoning your quest…
")
generate_for_session(session_id)
rooms.begin_play(session_id, SESSION_STORE[session_id].get("game"))
yield (*vis("play"), "play", timer_html(session_id, team_id),
status_strip_html(session_id, team_id), "")
def do_leave():
return (*vis("home"), "home", "", "", "", False)
# Number of outputs produced by _open_task / consumed by DETAIL_OPEN_OUTPUTS.
# 17 fixed slots + 12 task-tab buttons.
_N_DETAIL_OUTPUTS = 19 + 12
def _open_task(session_id, team_id, tid):
"""Render the full task-detail view for one task. Shared by the play-screen
entry, the horizontal task tabs, and the prev/next arrows so every entry
point produces an identical, consistent detail screen."""
# NOTE: tasks are session-global (session["game"]["tasks"]) and are the SAME
# for everyone — never slice/filter them per player. team_id here only keys
# the per-team completion/proof/hint OVERLAY, so same-team members (sharing
# one resolved team_id) see one shared task/proof view. Do not regress this.
can = rooms.can_complete(session_id, team_id, tid)
return (
tid,
gr.update(visible=True), # detail view (always shown on the play screen)
task_detail_md(session_id, team_id, tid),
task_hint_html(session_id, team_id, tid),
task_safety_html(session_id, team_id, tid),
task_gallery(session_id, team_id, tid),
proof_md(session_id, team_id, tid),
_complete_btn_update(can),
_completion_instruction_update(can),
"", None, "", "", "", # transcript, audio, tagline, ask answer, capture status
journal_log_md(session_id, team_id, tid),
_task_position(session_id, tid),
_hint_points_update(session_id, team_id, tid),
hint_log_md(session_id, team_id, tid),
_hint_btn_update(session_id, team_id, tid),
*_detail_tab_updates(session_id, team_id, tid),
)
def open_task_factory(idx):
"""Open the task at position ``idx`` in the team's task list. Used by both
the wall cards and the horizontal task tabs (both index into s_taskids)."""
def _open(session_id, team_id, taskids):
ids = json.loads(taskids or "[]")
if idx >= len(ids):
return (gr.update(),) * _N_DETAIL_OUTPUTS
return _open_task(session_id, team_id, ids[idx])
return _open
def go_adjacent_factory(delta):
"""Step to the previous (delta=-1) or next (delta=+1) task, clamped to the
list ends, and open it in place."""
def _go(session_id, team_id, taskids, cur_tid):
ids = json.loads(taskids or "[]")
if not ids:
return (gr.update(),) * _N_DETAIL_OUTPUTS
i = ids.index(cur_tid) if cur_tid in ids else 0
i = max(0, min(len(ids) - 1, i + delta))
return _open_task(session_id, team_id, ids[i])
return _go
def do_transcribe(audio_path, language):
if not audio_path:
yield "No recording yet — tap record above, then transcribe."
return
yield "🌀 Transcribing your note…"
text = transcribe_only(audio_path, language or "en")
yield text or "(No speech detected — pick the right spoken language, or type your note.)"
def do_save_journal(session_id, transcript, audio_path, language, task_id, team_id):
# 7 outputs: capture status, proof, complete-btn, completion instruction,
# journal log, audio, transcript
yield ("🌀 Saving your journal…", gr.update(), gr.update(),
gr.update(), gr.update(), gr.update(), gr.update())
msg = record_journal(session_id, transcript=transcript or "", task_id=task_id,
team_id=team_id, audio_path=audio_path or "", language=language or "en")
if msg.lstrip().startswith("⚠"):
gr.Warning("Couldn't save — record audio or type a note first.")
else:
gr.Info("Journal saved 🎙️")
can = rooms.can_complete(session_id, team_id, task_id)
# Clear the recorder + transcript so the player can immediately record another.
yield (msg, proof_md(session_id, team_id, task_id), _complete_btn_update(can),
_completion_instruction_update(can),
journal_log_md(session_id, team_id, task_id), None, "")
def do_add_photo(session_id, image, tagline, task_id, team_id):
# 7 outputs: capture status, gallery, proof, complete-btn, completion
# instruction, photo, tagline
if image is None:
gr.Warning("Choose a photo first.")
return ("Choose a photo first.", task_gallery(session_id, team_id, task_id),
proof_md(session_id, team_id, task_id), gr.update(), gr.update(), image, tagline)
upload_photo(session_id, image, caption=tagline or "", task_id=task_id, team_id=team_id)
can = rooms.can_complete(session_id, team_id, task_id)
gr.Info("Photo added 📸")
return ("📸 Photo added with its tagline.", task_gallery(session_id, team_id, task_id),
proof_md(session_id, team_id, task_id), _complete_btn_update(can),
_completion_instruction_update(can), None, "")
@spaces.GPU(duration=120)
def _ask_gpu(question, session_id, team_id, task_id):
s = SESSION_STORE.get(session_id) or {}
game = s.get("game") or {}
t = next((x for x in game.get("tasks", []) if x["task_id"] == task_id), {})
city = game.get("setup", {}).get("city", "")
ctx = f"City: {city}. Task: {t.get('title','')} — {t.get('description','')}. Hint: {t.get('hint','')}"
ans = None
try:
from app.services import minicpm
ans = minicpm.ask(question, context=ctx)
except Exception as e:
print(f"[ask] failed: {e}")
if not ans:
ans = "*(The guide is resting — re-read the hint and scan for the named landmark. Try again in a moment.)*"
return ans
def on_get_hint(question, session_id, team_id, task_id):
"""Reveal a hint from the in-game guide and charge the team for it.
Outputs (4): guide answer, available-points display, hint button, hint log.
Points are only charged once a real answer is produced — an empty question
or a spent task costs nothing.
"""
used = rooms.hints_used_count(session_id, team_id, task_id)
remaining = max(0, rooms.HINT_CAP - used)
if remaining <= 0:
gr.Warning("No hints left for this task.")
yield (gr.update(value="You've used all the hints for this task — check the hint history below."),
gr.update(), _hint_btn_state(0), gr.update())
return
if not (question or "").strip():
yield ("Tell me where you're stuck, or ask for a clue — then tap the button.",
gr.update(), gr.update(), gr.update())
return
# Thinking: disable the button so the hint can't be double-charged; don't
# deduct until we actually have an answer.
yield ("🌀 The guide is thinking…", gr.update(),
gr.update(value="Thinking…", interactive=False), gr.update())
answer = _ask_gpu(question, session_id, team_id, task_id)
used_now = use_hint(session_id, task_id, team_id, question=question, answer=answer)
remaining_now = max(0, rooms.HINT_CAP - used_now)
gr.Info(f"−{HINT_PENALTY} pts · Hint used · {remaining_now} hint(s) left")
yield (f"💡 {answer}",
_hint_points_update(session_id, team_id, task_id),
_hint_btn_state(remaining_now),
hint_log_md(session_id, team_id, task_id))
def do_complete(session_id, team_id, task_id):
if rooms.can_complete(session_id, team_id, task_id):
complete_task(session_id, task_id, team_id)
gr.Info("Task completed — nice work! 🎉")
# Re-render the same task so its tab shows ✓ and the completed state
# is reflected in place (there's no wall to return to).
return _open_task(session_id, team_id, task_id)
gr.Warning("Add a photo or voice journal first to complete this task.")
return (gr.update(),) * _N_DETAIL_OUTPUTS
def do_finish(session_id, team_id):
log_event(session_id, "game_finished", {"summary": f"{team_id} finished"}, team_id=team_id)
rooms.finish_team(session_id, team_id)
return (*vis("wait"), "wait", status_strip_html(session_id, team_id))
def do_see_results(session_id):
"""Manual wait->win advance — the ONLY path to the win screen. Fired by the
'See final results' button (gated/enabled by poll_tick), never automatic."""
return (*vis("win"), "win", scoreboard_md(session_id))
def _gather_moments(session_id, team_id):
bits = [j.get("transcript", "") for j in rooms.team_journals(session_id, team_id)]
bits += [p.get("caption", "") for p in rooms.team_photos(session_id, team_id)]
return [b for b in bits if b][:12]
@spaces.GPU(duration=180)
def _funny_recap_gpu(moments):
prompt = (
"Our playful city adventure moments: " + "; ".join(moments) +
". Write a short, funny, warm recap (4-6 sentences). Keep it light and silly. "
"Do NOT mention winning, losing, scores, or rankings."
)
ans = None
try:
from app.services import minicpm
ans = minicpm.ask(prompt, context="", max_tokens=320)
except Exception as e:
print(f"[funny_recap] {e}")
if not ans:
ans = "### A few ridiculous highlights\n\n- " + "\n- ".join(moments)
return ans
def _stream_text(full: str):
"""Yield a string progressively for a narrative reveal (UI pacing only).
Reveals paragraph-by-paragraph, falling back to sentence chunks for a
single long block, accumulating so the Markdown stays well-formed at every
step. The text itself is produced by the backend and is never altered.
"""
full = (full or "").strip()
if not full:
yield full
return
paras = [p for p in full.split("\n\n") if p.strip()]
if len(paras) <= 1:
# One block — chunk by sentence so long recaps still feel alive.
import re
paras = re.findall(r"[^.!?]+[.!?]*\s*", full) or [full]
sep = ""
else:
sep = "\n\n"
acc = ""
for i, chunk in enumerate(paras):
acc += (sep if i else "") + chunk
yield acc
time.sleep(0.12)
def do_funny_recap(session_id, team_id):
moments = _gather_moments(session_id, team_id)
if not moments:
yield "_Snap a photo or record a voice note first — then I'll spin a silly recap!_"
return
yield "🌀 Spinning a silly recap…"
yield from _stream_text(_funny_recap_gpu(moments))
@spaces.GPU(duration=300)
def _funny_post_gpu(session_id, team_id):
paths = [p["photo_path"] for p in rooms.team_photos(session_id, team_id) if p.get("photo_path")]
prompt = ("A whimsical, funny travel scrapbook poster, warm pastel beige tones, playful "
"hand-drawn doodles, lighthearted city adventure, cozy and silly, no text")
return generate_poster_sync(f"{session_id}-funny", prompt, photo_paths=paths)
def do_funny_post(session_id, team_id):
yield None, "🎨 Painting your funny poster…"
yield _funny_post_gpu(session_id, team_id)
@spaces.GPU(duration=180)
def _win_recap_gpu(session_id):
md, _ = generate_recap(session_id)
return md
def do_win_recap(session_id):
yield "🌀 Writing your winning recap…"
yield from _stream_text(_win_recap_gpu(session_id))
@spaces.GPU(duration=300)
def _win_poster_gpu(session_id, kind):
s = SESSION_STORE.get(session_id) or {}
paths = [p["photo_path"] for p in s.get("photos", []) if p.get("photo_path")]
prompts = {
"ending": ("A triumphant golden-hour ending poster for a city adventure, warm beige and "
"teal tones, celebratory and cinematic, no text"),
"group": ("A joyful group celebration scrapbook poster, warm pastel beige tones, friends "
"adventuring together, confetti, no text"),
}
return generate_poster_sync(f"{session_id}-{kind}", prompts.get(kind, prompts["ending"]), photo_paths=paths)
def do_win_endpost(session_id):
yield None, "🎨 Painting your ending poster…"
yield _win_poster_gpu(session_id, "ending")
def do_win_grouppost(session_id):
yield None, "🎨 Painting your group poster…"
yield _win_poster_gpu(session_id, "group")
def do_new():
return (*vis("home"), "home", "", "", "", False)
# ── Live form validation (pure UI, no backend calls) ──────────────────────
# Mirror the room-code charset/length from rooms so the live hint matches what
# join_room() will accept (it upper-cases + strips before checking).
_CODE_LEN = 6
_CODE_CHARS = getattr(rooms, "_CODE_ALPHABET", "ABCDEFGHJKMNPQRSTUVWXYZ23456789")
def validate_join_code(code: str, name: str):
"""Live feedback for the join form: code counter/validity + submit gating.
Returns (info_html, submit_update). Purely presentational — never calls
the backend; join_room() still does the authoritative check on submit.
"""
raw = (code or "").strip().upper()
n = len(raw)
bad = [c for c in raw if c not in _CODE_CHARS]
has_name = bool((name or "").strip())
if n == 0:
pill = "enter your 6-character code"
elif n < _CODE_LEN:
pill = f"{n}/{_CODE_LEN} characters"
elif n > _CODE_LEN or bad:
detail = "too long" if n > _CODE_LEN else f"unexpected: {' '.join(sorted(set(bad)))}"
pill = f"{n}/{_CODE_LEN} · {detail}"
else:
pill = f"✓ {n}/{_CODE_LEN} · looks good"
name_pill = "" if has_name else "add your name"
info = f"
{pill}{name_pill}
"
valid = (n == _CODE_LEN and not bad and has_name)
return info, gr.update(interactive=valid)
# Button busy-state helpers for .then() chaining around LLM/IO calls.
# Disabling during the call prevents double-submits and gives a visual cue;
# the in-component text loaders ("🌀 …") still communicate progress.
def _btn_off():
return gr.update(interactive=False)
def _btn_on():
return gr.update(interactive=True)
_GTYPE_LABELS = {
"scavenger_hunt": "Scavenger hunt",
"hide_and_seek": "Hide & seek",
"tag": "Tag",
}
def create_summary(name, gtype, city, area, teams, duration, players):
"""Live one-line recap of the create form. Pure string formatting."""
bits = [
_GTYPE_LABELS.get(gtype, gtype),
(city or "Paris").strip(),
]
if (area or "").strip():
bits.append(area.strip())
bits.append(f"{int(teams)} team{'s' if int(teams) != 1 else ''}")
bits.append(f"{int(duration)} min")
bits.append(f"{int(players)} players")
summary = " · ".join(bits)
host = (name or "").strip()
who = f"Hosted by {host} — " if host else ""
return (
"
"
f"{who}"
f"{summary}
"
)
# ── ASR transcribe-only helper (no save) ──────────────────────────────────
@spaces.GPU(duration=120)
def transcribe_only(audio_path: str, language: str = "en") -> str:
if not audio_path:
return ""
try:
from app.services.asr import transcribe as _t
result = _t(audio_path, language=language or "en")
text = (result.get("transcript") or "").strip()
print(f"[transcribe_only] lang={language} status={result.get('status')} "
f"len={len(text)} preview={text[:80]!r}")
return text
except Exception as e:
print(f"[transcribe_only] {type(e).__name__}: {e}")
return ""
# ══════════════════════════════════════════════════════════════════════════
# BLOCKS
# ══════════════════════════════════════════════════════════════════════════
with gr.Blocks(title="CityQuest · AI") as demo:
# per-browser state
s_session = gr.State("")
s_player = gr.State("")
s_team = gr.State("")
s_host = gr.State(False)
s_screen = gr.State("home")
s_task = gr.State("")
s_taskids = gr.State("[]")
s_lang = gr.State("en") # voice-journal ISO code, set on the Create screen
s_poll_state = gr.State(("", 0, POLL_FAST)) # (signature, idle_ticks, interval)
gr.HTML(
"
"
"🌲 CityQuest · AI"
"
explore your city, the unhurried way
"
)
# ── SCREEN: HOME ──────────────────────────────────────────────────────
with gr.Group(visible=True, elem_classes=["cq-screen", "cq-home"]) as g_home:
gr.HTML("
"
"
A calm, playful city adventure
"
"
")
with gr.Row():
with gr.Column():
with gr.Group(elem_classes=["cq-card", "cq-home-card"]):
gr.Markdown("Host a new adventure and invite your friends.", elem_classes=["landing-description"])
home_create = gr.Button("Create a room", variant="primary", size="lg", elem_id="home-create", elem_classes=["landing-btn"])
with gr.Column():
with gr.Group(elem_classes=["cq-card", "cq-home-card"]):
gr.Markdown("Got a code from a friend? Hop in.", elem_classes=["landing-description"])
home_join = gr.Button("Join a room", variant="secondary", size="lg", elem_id="home-join", elem_classes=["landing-btn"])
# ── SCREEN: CREATE ────────────────────────────────────────────────────
with gr.Group(visible=False, elem_classes=["cq-screen", "cq-create"]) as g_create:
with gr.Column(elem_classes=["cq-card"]):
gr.Markdown("## Host a new adventure")
c_name = gr.Textbox(label="What to call you?", placeholder="e.g. Arjun", value="")
with gr.Row():
c_type = gr.Dropdown(["scavenger_hunt", "hide_and_seek", "tag"],
value="scavenger_hunt", label="Game type",
filterable=False, elem_classes=["cq-dropdown"])
c_teams = gr.Slider(2, 4, value=2, step=1, label="Teams (min 2)")
with gr.Row():
c_city = gr.Textbox(label="City", placeholder="Paris")
c_area = gr.Textbox(label="Area / neighbourhood", placeholder="Le Marais")
with gr.Row():
c_duration = gr.Slider(15, 120, value=45, step=5, label="Duration (min)")
# Starts at 0 / hidden because the default game type is
# scavenger_hunt (no head start). toggle_head_start raises it to
# 60 and reveals it when a staggered-start game type is picked.
c_headstart = gr.Slider(0, 300, value=0, step=15,
label="Head start between teams (sec)",
visible=False)
with gr.Row():
c_diff = gr.Dropdown(["easy", "medium", "hard"], value="medium", label="Difficulty",
elem_classes=["cq-dropdown"])
c_age = gr.Dropdown(["kids", "teens", "adults", "all"], value="all", label="Age group",
elem_classes=["cq-dropdown"])
with gr.Row():
c_energy = gr.Dropdown(["chill", "balanced", "high"], value="balanced", label="Energy",
elem_classes=["cq-dropdown"])
c_players = gr.Slider(2, 20, value=4, step=1, label="Players")
with gr.Row():
c_language = gr.Dropdown(LANGUAGE_CHOICES, value="English",
label="Voice journal language",
filterable=False, elem_classes=["cq-dropdown"])
create_summary_html = gr.HTML("", elem_classes=["game-summary"])
with gr.Row():
create_back = gr.Button("← Back", variant="secondary", elem_id="create-back",
elem_classes=["back-btn", "create-btn"])
create_submit = gr.Button("Open the lobby →", variant="primary", elem_id="create-submit",
elem_classes=["open-lobby-btn", "create-btn"])
# ── SCREEN: JOIN ──────────────────────────────────────────────────────
with gr.Group(visible=False, elem_classes=["cq-screen"]) as g_join:
with gr.Column(elem_classes=["cq-card"]):
gr.Markdown("## Join a room")
j_code = gr.Textbox(label="Room code", placeholder="e.g. WAVE42")
join_code_info = gr.HTML("")
j_name = gr.Textbox(label="Your name", placeholder="e.g. Maya")
join_feedback = gr.Markdown("")
with gr.Row():
join_back = gr.Button("← Back", variant="secondary", elem_id="join-back")
join_submit = gr.Button("Join →", variant="primary",
interactive=False, elem_id="join-submit")
# ── SCREEN: LOBBY ─────────────────────────────────────────────────────
with gr.Group(visible=False, elem_classes=["cq-screen"]) as g_lobby:
with gr.Column(elem_classes=["cq-card"]):
lobby_code = gr.HTML("")
lobby_copy = gr.HTML("")
gr.HTML("
Pick your team
")
with gr.Row():
team_btn = [gr.Button("", variant="secondary", visible=False,
elem_classes=["team-pick"]) for _ in range(4)]
gr.HTML("
Who's here
")
lobby_roster = gr.HTML("")
lobby_prep = gr.HTML("", elem_classes=["lobby-prep"])
lobby_hint = gr.Markdown(HINT_GUEST, elem_classes=["lobby-hint"])
with gr.Row():
lobby_leave = gr.Button("Leave", variant="secondary", elem_id="lobby-leave")
lobby_start = gr.Button("Start the adventure →", variant="primary",
visible=False, interactive=False,
elem_id="lobby-start", elem_classes=["lobby-start-btn"])
# Host-only short-handed override, sitting just below Start as a green
# underlined text link (not a button). Hidden until poll_tick reveals
# it (min_ok and not full); the normal Start covers the full case.
lobby_start_anyway = gr.Button("▶ Start anyway", variant="secondary",
visible=False, interactive=False,
elem_id="lobby-start-anyway",
elem_classes=["lobby-start-link"])
# ── SCREEN: PLAY ──────────────────────────────────────────────────────
with gr.Group(visible=False, elem_classes=["cq-screen"]) as g_play:
play_timer = gr.HTML("")
gr.HTML("
Teams
")
play_status = gr.HTML("")
# The play screen lands directly in the task detail; the horizontal task
# tabs below are the only task switcher (the old vertical task wall was
with gr.Group(visible=True, elem_classes=["cq-reveal"]) as g_detail:
# Wrap everything in a master row with spacers to align the ENTIRE page
with gr.Row():
# 1. Left global spacer
gr.Column(scale=1, min_width=0)
# 2. Main Central Column (Aligns Tabs, Text, and Boxes perfectly)
with gr.Column(scale=10, elem_classes=["play-card"]):
# --- Tabs & Nav ---
with gr.Row(elem_classes=["task-tab-row"]):
detail_tab_btn = [gr.Button("", visible=False, size="sm",
elem_classes=["task-tab"]) for _ in range(12)]
with gr.Row(elem_classes=["task-nav-row"]):
prev_task_btn = gr.Button("← Prev", size="sm", scale=0,
elem_id="td-prev", elem_classes=["task-nav-btn"])
task_position_label = gr.Markdown("", elem_classes=["task-position"])
next_task_btn = gr.Button("Next →", size="sm", scale=0,
elem_id="td-next", elem_classes=["task-nav-btn"])
# --- Text Content ---
detail_md = gr.Markdown("")
detail_hint = gr.HTML("")
detail_safety = gr.HTML("")
# --- Voice & Photos ---
with gr.Row(equal_height=True, elem_classes=["panels-row"]):
with gr.Column(scale=1, min_width=250, elem_classes=["proof-col"]):
gr.HTML("
🎙️ Voice journal
")
d_audio = gr.Audio(sources=["microphone", "upload"], type="filepath",
format="wav", show_label=False,
elem_classes=["cq-audio"],
waveform_options=gr.WaveformOptions(
show_recording_waveform=False))
d_transcript = gr.Textbox(label="Your notes", lines=3,
placeholder="Transcribe your audio, or type a note…")
with gr.Row():
d_transcribe = gr.Button("Transcribe", variant="secondary",
elem_id="td-transcribe",
elem_classes=["btn-secondary-action"])
d_savejournal = gr.Button("Save journal", variant="primary",
elem_id="td-savejournal",
elem_classes=["btn-primary-action"])
d_journal_log = gr.Markdown("")
with gr.Column(scale=1, min_width=250, elem_classes=["proof-col"]):
gr.HTML("
📸 Photos & taglines
")
d_photo = gr.Image(type="filepath", show_label=False, height=170)
d_tagline = gr.Textbox(label="Caption", placeholder="A caption for this photo", lines=3)
with gr.Row():
d_addphoto = gr.Button("Add photo", variant="primary",
elem_id="td-addphoto",
elem_classes=["btn-primary-action"])
d_gallery = gr.Gallery(label="Photos for this task", columns=3, height=170)
# --- Footer Actions ---
d_capture_status = gr.Markdown("")
with gr.Row(elem_classes=["hints-header"]):
gr.Markdown("💡 Stuck? Take a hint..", elem_classes=["guide-header", "hints-label"])
d_points = gr.Markdown("", elem_classes=["points-display"])
with gr.Row(elem_classes=["hints-input-row"]):
d_ask_q = gr.Textbox(show_label=False, scale=1,
placeholder="Ask for a clue, or tell me where you're stuck…",
elem_classes=["hint-input"])
d_ask_btn = gr.Button(f"Get a hint −{HINT_PENALTY} pts", scale=0,
min_width=220,
elem_id="td-ask", elem_classes=["btn-hint"])
d_ask_a = gr.Markdown("")
d_hint_log = gr.Markdown("")
d_proof = gr.Markdown("")
completion_instruction = gr.Markdown(
"🔒 Add a photo or voice note to unlock completion",
elem_classes=["completion-instruction", "locked"])
d_complete = gr.Button("🔒 Mark completed", variant="primary",
interactive=False, elem_id="td-complete",
elem_classes=["mark-complete-btn", "complete-btn", "locked"])
d_finish = gr.Button("🏁 Finish & wait for others", variant="primary",
elem_id="td-finish", elem_classes=["finish-btn"])
# 3. Right global spacer
gr.Column(scale=1, min_width=0)
# ── SCREEN: WAIT-LOBBY ────────────────────────────────────────────────
with gr.Group(visible=False, elem_classes=["cq-screen"]) as g_wait:
with gr.Column(elem_classes=["cq-card"]):
gr.Markdown("## 🌿 You're done — nicely paced!")
gr.Markdown("*Waiting for the other teams to wrap up. Meanwhile, make something silly:*")
wait_status = gr.HTML("")
# Live "who's still playing" line + manual advance to results. The
# button stays DISABLED until the results gate passes (all active
# teams finished AND min_ok); poll_tick flips its enabled state. The
# wait screen NEVER auto-navigates — this click is the only path.
wait_status_line = gr.Markdown("", elem_classes=["lobby-hint"])
wait_see_results = gr.Button("🏆 See final results", variant="primary",
interactive=False, elem_id="wait-see-results",
elem_classes=["lobby-start-btn"])
with gr.Row():
wait_funny_recap = gr.Button("😄 Funny recap", variant="secondary", elem_id="wait-funny-recap")
wait_funny_post = gr.Button("🎨 Funny poster", variant="secondary", elem_id="wait-funny-post")
wait_recap_md = gr.Markdown("")
wait_post_img = gr.Image(label="Funny poster", height=320, visible=True, elem_classes=["cq-poster"])
wait_post_status = gr.Markdown("")
# ── SCREEN: WIN ───────────────────────────────────────────────────────
with gr.Group(visible=False, elem_classes=["cq-screen", "cq-win"]) as g_win:
with gr.Column(elem_classes=["cq-win-standings"]):
win_scoreboard = gr.Markdown("Final standings will appear here.")
with gr.Group(elem_classes=["cq-card", "cq-win-celebrate"]):
gr.Markdown("### 🎉 Celebrate the run")
win_recap = gr.Button("📖 Winning recap", variant="primary", elem_id="win-recap", elem_classes=["cq-win-btn"])
win_recap_md = gr.Markdown("")
with gr.Row():
win_endpost = gr.Button("🌅 Ending poster", variant="secondary", elem_id="win-endpost", elem_classes=["cq-win-btn"])
win_grouppost = gr.Button("👯 Group poster", variant="secondary", elem_id="win-grouppost", elem_classes=["cq-win-btn"])
win_poster_img = gr.Image(label="Poster", height=320, visible=True)
win_poster_status = gr.Markdown("")
win_new = gr.Button("✨ Start a new adventure", variant="primary", elem_id="win-new", elem_classes=["cq-win-btn", "cq-win-new"])
# ── Shared poll timer ─────────────────────────────────────────────────
poll = gr.Timer(1.5)
nav_groups = [g_home, g_create, g_join, g_lobby, g_play, g_wait, g_win]
poll.tick(poll_tick, inputs=[s_session, s_screen, s_team, s_player, s_host, s_poll_state],
outputs=nav_groups + [s_screen, lobby_code, lobby_roster, lobby_start,
lobby_start_anyway, lobby_hint,
play_timer, play_status, wait_status, wait_status_line,
wait_see_results, win_scoreboard,
poll, s_poll_state])
# ── Navigation ────────────────────────────────────────────────────────
create_inputs = [c_name, c_type, c_city, c_area, c_teams, c_duration, c_players]
home_create.click(lambda: (*vis("create"), "create"), outputs=nav_groups + [s_screen]
).then(create_summary, inputs=create_inputs, outputs=[create_summary_html])
home_join.click(lambda: (*vis("join"), "join"), outputs=nav_groups + [s_screen])
create_back.click(lambda: (*vis("home"), "home"), outputs=nav_groups + [s_screen])
join_back.click(lambda: (*vis("home"), "home"), outputs=nav_groups + [s_screen])
# ── Head start visibility: hide for scavenger hunt (irrelevant there) ──
def toggle_head_start(game_type):
if game_type == "scavenger_hunt":
return gr.update(visible=False, value=0)
return gr.update(visible=True, value=60)
c_type.change(toggle_head_start, inputs=[c_type], outputs=[c_headstart])
# ── Live form validation (pure UI) ────────────────────────────────────
for comp in create_inputs:
comp.change(create_summary, inputs=create_inputs, outputs=[create_summary_html])
for comp in (j_code, j_name):
comp.change(validate_join_code, inputs=[j_code, j_name],
outputs=[join_code_info, join_submit])
# Route the host's chosen voice-journal language (full name) into the ISO
# code held in s_lang, which the play-screen audio handlers consume.
c_language.change(lambda name: LANGUAGE_TO_ISO.get(name, "en"),
inputs=[c_language], outputs=[s_lang])
create_submit.click(_btn_off, None, [create_submit]).then(
do_create,
inputs=[c_name, c_type, c_city, c_area, c_duration, c_teams, c_headstart,
c_diff, c_age, c_energy, c_players],
outputs=nav_groups + [s_screen, s_session, s_player, s_team, s_host,
lobby_code, lobby_roster, lobby_start,
lobby_copy, lobby_hint] + team_btn,
).then(prepare_game, inputs=[s_session], outputs=[lobby_prep]
).then(_btn_on, None, [create_submit])
join_submit.click(
do_join, inputs=[j_code, j_name],
outputs=nav_groups + [s_screen, s_session, s_player, s_team, s_host,
join_feedback, lobby_code, lobby_roster, lobby_start,
lobby_copy, lobby_hint] + team_btn,
)
for i, tb in enumerate(team_btn):
tb.click(make_pick(i), inputs=[s_session, s_player],
outputs=[s_team, lobby_roster] + team_btn)
lobby_leave.click(do_leave, outputs=nav_groups + [s_screen, s_session, s_player, s_team, s_host])
# Both Start and Start-anyway funnel through the SAME do_start path — no
# duplicated start logic; only the gating (in poll_tick) differs.
lobby_start.click(_btn_off, None, [lobby_start]).then(
do_start, inputs=[s_session, s_team],
outputs=nav_groups + [s_screen, play_timer, play_status, lobby_prep]
).then(_btn_on, None, [lobby_start])
lobby_start_anyway.click(_btn_off, None, [lobby_start_anyway]).then(
do_start, inputs=[s_session, s_team],
outputs=nav_groups + [s_screen, play_timer, play_status, lobby_prep]
).then(_btn_on, None, [lobby_start_anyway])
# ── Task tabs → detail ────────────────────────────────────────────────
# Order must match _open_task's return tuple (19 fixed + 12 tab buttons).
detail_open_outputs = [s_task, g_detail, detail_md, detail_hint, detail_safety,
d_gallery, d_proof, d_complete, completion_instruction,
d_transcript, d_audio, d_tagline, d_ask_a, d_capture_status,
d_journal_log, task_position_label,
d_points, d_hint_log, d_ask_btn] + detail_tab_btn
# Keep the task-id list fresh and auto-open the first task when a client
# first reaches the play screen (host via Start, guests via poll).
poll.tick(play_tick, inputs=[s_session, s_screen, s_team, s_taskids, s_task],
outputs=detail_open_outputs + [s_taskids])
# The in-detail horizontal task tabs index into s_taskids.
for i, dtb in enumerate(detail_tab_btn):
dtb.click(open_task_factory(i), inputs=[s_session, s_team, s_taskids], outputs=detail_open_outputs)
prev_task_btn.click(go_adjacent_factory(-1), inputs=[s_session, s_team, s_taskids, s_task],
outputs=detail_open_outputs)
next_task_btn.click(go_adjacent_factory(+1), inputs=[s_session, s_team, s_taskids, s_task],
outputs=detail_open_outputs)
d_finish.click(do_finish, inputs=[s_session, s_team],
outputs=nav_groups + [s_screen, wait_status])
d_transcribe.click(_btn_off, None, [d_transcribe]).then(
do_transcribe, inputs=[d_audio, s_lang], outputs=[d_transcript]
).then(_btn_on, None, [d_transcribe])
# Auto-transcribe once the clip is actually on the server. stop_recording
# (mic) and upload (file) fire *after* Gradio has committed/uploaded the
# audio, so do_transcribe always sees a real filepath. This removes the
# race where clicking "Transcribe" immediately after recording sent a
# stale/None path and seemed to do nothing until you clicked around.
d_audio.stop_recording(_btn_off, None, [d_transcribe]).then(
do_transcribe, inputs=[d_audio, s_lang], outputs=[d_transcript]
).then(_btn_on, None, [d_transcribe])
d_audio.upload(_btn_off, None, [d_transcribe]).then(
do_transcribe, inputs=[d_audio, s_lang], outputs=[d_transcript]
).then(_btn_on, None, [d_transcribe])
d_savejournal.click(_btn_off, None, [d_savejournal]).then(
do_save_journal,
inputs=[s_session, d_transcript, d_audio, s_lang, s_task, s_team],
outputs=[d_capture_status, d_proof, d_complete, completion_instruction,
d_journal_log, d_audio, d_transcript]
).then(_btn_on, None, [d_savejournal])
d_addphoto.click(do_add_photo, inputs=[s_session, d_photo, d_tagline, s_task, s_team],
outputs=[d_capture_status, d_gallery, d_proof, d_complete,
completion_instruction, d_photo, d_tagline])
# on_get_hint manages the button itself (disable while thinking, then set
# the "(N left)"/"No hints left" state), so it does NOT use the generic
# _btn_off/_btn_on toggle — that would re-enable a spent button.
d_ask_btn.click(
on_get_hint, inputs=[d_ask_q, s_session, s_team, s_task],
outputs=[d_ask_a, d_points, d_ask_btn, d_hint_log]
)
# After completing, re-render the same task in place so the ✓ shows on its
# tab and the completed state is reflected (no wall to bounce back to).
d_complete.click(do_complete, inputs=[s_session, s_team, s_task],
outputs=detail_open_outputs)
# ── Wait-lobby actions ────────────────────────────────────────────────
# Manual, click-only advance to the win screen (gate/enabled by poll_tick).
wait_see_results.click(do_see_results, inputs=[s_session],
outputs=nav_groups + [s_screen, win_scoreboard])
wait_funny_recap.click(_btn_off, None, [wait_funny_recap]).then(
do_funny_recap, inputs=[s_session, s_team], outputs=[wait_recap_md]
).then(_btn_on, None, [wait_funny_recap])
wait_funny_post.click(_btn_off, None, [wait_funny_post]).then(
do_funny_post, inputs=[s_session, s_team], outputs=[wait_post_img, wait_post_status]
).then(_btn_on, None, [wait_funny_post])
# ── Winning actions ───────────────────────────────────────────────────
win_recap.click(_btn_off, None, [win_recap]).then(
do_win_recap, inputs=[s_session], outputs=[win_recap_md]
).then(_btn_on, None, [win_recap])
win_endpost.click(_btn_off, None, [win_endpost]).then(
do_win_endpost, inputs=[s_session], outputs=[win_poster_img, win_poster_status]
).then(_btn_on, None, [win_endpost])
win_grouppost.click(_btn_off, None, [win_grouppost]).then(
do_win_grouppost, inputs=[s_session], outputs=[win_poster_img, win_poster_status]
).then(_btn_on, None, [win_grouppost])
win_new.click(do_new, outputs=nav_groups + [s_screen, s_session, s_player, s_team, s_host])
# ── Entry point ────────────────────────────────────────────────────────────
if __name__ == "__main__":
# ssr_mode=False avoids the Node SSR proxy that emits spurious
# `sse_stream 404` errors with our 1.5s polling timer.
demo.launch(theme=gradio_theme(), css=CSS, js=JS_INIT, ssr_mode=False,share=True)