import os import random import json import uuid import time import queue import subprocess from datetime import datetime, timedelta, timezone from typing import Dict, Any, List, Optional import spaces import requests from dotenv import load_dotenv import gradio as gr from gradio.components import LoginButton import data_manager from huggingface_hub import HfApi, hf_hub_download, whoami from transformers import Mistral3ForConditionalGeneration, AutoTokenizer, TextIteratorStreamer import threading import torch load_dotenv() APP_SECRET = os.urandom(24) ZONES_FILE = 'zones.json' zones = { "easy": [], "medium": [], "hard": [] } user_sessions: Dict[str, Dict[str, Any]] = {} DEFAULT_USERNAME = "player" def save_zones_to_file(): with open(ZONES_FILE, 'w') as f: json.dump(zones, f, indent=4) def load_zones_from_file(): global zones if os.path.exists(ZONES_FILE): try: with open(ZONES_FILE, 'r') as f: loaded_zones = json.load(f) if not (isinstance(loaded_zones, dict) and all(k in loaded_zones for k in ["easy", "medium", "hard"])): raise ValueError("Invalid format") migrated = False for difficulty in loaded_zones: for zone in loaded_zones[difficulty]: if 'id' not in zone: zone['id'] = uuid.uuid4().hex migrated = True zones = loaded_zones print(zones) if migrated: print("Info: Migrated old zone data by adding unique IDs.") save_zones_to_file() except (json.JSONDecodeError, IOError, ValueError): print(f"Warning: '{ZONES_FILE}' is corrupted or invalid. Recreating with empty zones.") save_zones_to_file() else: save_zones_to_file() LOCATIONS = [ {'lat': 48.85824, 'lng': 2.2945}, {'lat': 40.748440, 'lng': -73.985664}, {'lat': 35.689487, 'lng': 139.691711}, {'lat': -33.856784, 'lng': 151.215297} ] def generate_id(): return ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=10)) HF_DATASET_REPO = 'Jofthomas/geoguessr_game_of_the_day' GOOGLE_MAPS_API_KEY = os.getenv('GOOGLE_MAPS_API_KEY') SERVER_HF_TOKEN = os.getenv('HF_TOKEN', '') # Mistral hosted API (used by the "MM3.5" player). # The Mistral SDK convention is MISTRAL_API_KEY. # `mistral-medium-latest` is a rolling alias to the newest Medium model. # Override MISTRAL_MM35_MODEL if/when a pinned dated id (e.g. mistral-medium-2511) # is published for Mistral Medium 3.5. MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY", "") MISTRAL_API_URL = os.getenv("MISTRAL_API_URL", "https://api.mistral.ai/v1/chat/completions") MM35_MODEL_ID = os.getenv("MISTRAL_MM35_MODEL", "mistral-medium-latest") MM35_DISPLAY_NAME = "MM3.5" # Wipe the ZeroGPU offload dir and any partial HF cache from a previous (possibly # failed) container start. ZeroGPU pre-allocates the full packed-tensor blob with # posix_fallocate; stale files there are a common cause of # OSError: [Errno 28] No space left on device on ZeroGPU Spaces. # Note: we keep PATH and HOME so the shell can resolve `rm` and `~`. try: subprocess.run( "rm -rf /data-nvme/zerogpu-offload/* ~/.cache/huggingface/hub/tmp* 2>/dev/null || true", shell=True, env={ "PATH": os.environ.get("PATH", "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"), "HOME": os.environ.get("HOME", "/root"), }, check=False, ) except Exception as _e: print(f"[startup] offload cleanup skipped: {_e}") model_id = "mistralai/Magistral-Small-2509" tokenizer = AutoTokenizer.from_pretrained(model_id, tokenizer_type="mistral", use_fast=False) # On ZeroGPU, CUDA is emulated at module load and becomes a real GPU only inside # @spaces.GPU functions. The docs explicitly require model placement to happen at # the root module level (`.to("cuda")`); lazy moves inside @spaces.GPU are much # slower because tensor packing happens at startup. # https://huggingface.co/docs/hub/spaces-zerogpu#model-loading model = ( Mistral3ForConditionalGeneration.from_pretrained( model_id, torch_dtype=torch.bfloat16, low_cpu_mem_usage=True, ) .to("cuda") .eval() ) # SYSTEM_PROMPT_TEXT = ( # "You are a world-class geolocation expert. Given a street-view style image, " # "think step by step about visual clues and infer approximate coordinates. " # "When you conclude, output your answer inside [ANSWER]lat,lng[/ANSWER]." # ) SYSTEM_PROMPT_TEXT = """First draft your thinking process (inner monologue) until you arrive at a response. Format your response using Markdown, and use LaTeX for any mathematical equations. Write both your thoughts and the response in the same language as the input. Your thinking process must follow the template below:[THINK]Your thoughts or/and draft, like working through an exercise on scratch paper. Be as casual and as long as you want until you are confident to generate the response to the user.[/THINK]Here, provide a self-contained response.""" USER_INSTRUCTION = """You are a world-class geolocation expert. Given a street-view style image, think step by step about visual clues and infer approximate coordinates. When you conclude, output your final answer inside [ANSWER]lat,lng[/ANSWER]. Please analyze this image and provide coordinates in the required format.""" @spaces.GPU(duration=120) def llm_decode_image_return_text(image_bytes: bytes) -> str: print(f"[llm] decode start. image_bytes={len(image_bytes)} bytes") import base64, mimetypes try: encoded_image = base64.b64encode(image_bytes).decode('utf-8') mime_type = 'image/jpeg' data_url = f"data:{mime_type};base64,{encoded_image}" messages = [ {"role": "system", "content": [{"type": "text", "text": SYSTEM_PROMPT_TEXT}]}, {"role": "user", "content": [ {"type": "text", "text": USER_INSTRUCTION}, {"type": "image_url", "image_url": {"url": data_url}}, ]}, ] print(f"[llm] messages prepared. system+user with image_url length={len(data_url)}") tokenized = tokenizer.apply_chat_template(messages, return_dict=True) print(f"[llm] tokenized keys={list(tokenized.keys())}") import torch input_ids = torch.tensor(tokenized.input_ids).unsqueeze(0) attention_mask = torch.tensor(tokenized.attention_mask).unsqueeze(0) print(f"[llm] input_ids shape={tuple(input_ids.shape)} attn_mask shape={tuple(attention_mask.shape)} device={model.device}") kwargs = { 'input_ids': input_ids.to(model.device), 'attention_mask': attention_mask.to(model.device), } if 'pixel_values' in tokenized and len(tokenized.pixel_values) > 0: pixel_values = torch.tensor(tokenized.pixel_values[0], dtype=model.dtype).unsqueeze(0).to(model.device) image_sizes = torch.tensor(pixel_values.shape[-2:]).unsqueeze(0).to(model.device) kwargs.update({'pixel_values': pixel_values, 'image_sizes': image_sizes}) print(f"[llm] pixel_values shape={tuple(pixel_values.shape)} image_sizes={tuple(image_sizes.shape)}") output = model.generate(**kwargs)[0] print(f"[llm] generate done. output length={len(output)}") decoded = tokenizer.decode(output[len(tokenized.input_ids): ( -1 if output[-1] == tokenizer.eos_token_id else len(output) )]) print(f"[llm] decode done. text length={len(decoded)}") return decoded except Exception as e: print(f"[llm] decode failed: {e}") return f"[Error] {e}" @spaces.GPU(duration=120) def llm_stream_image_text(image_bytes: bytes): print(f"[llm-stream] start. image_bytes={len(image_bytes)} bytes") import base64 try: encoded_image = base64.b64encode(image_bytes).decode('utf-8') data_url = f"data:image/jpeg;base64,{encoded_image}" messages = [ {"role": "system", "content": [{"type": "text", "text": SYSTEM_PROMPT_TEXT}]}, {"role": "user", "content": [ {"type": "text", "text": USER_INSTRUCTION}, {"type": "image_url", "image_url": {"url": data_url}}, ]}, ] tokenized = tokenizer.apply_chat_template(messages, return_dict=True) input_ids = torch.tensor(tokenized.input_ids).unsqueeze(0) attention_mask = torch.tensor(tokenized.attention_mask).unsqueeze(0) kwargs = { 'input_ids': input_ids.to(model.device), 'attention_mask': attention_mask.to(model.device), 'max_new_tokens': 8192, } if 'pixel_values' in tokenized and len(tokenized.pixel_values) > 0: pixel_values = torch.tensor(tokenized.pixel_values[0], dtype=model.dtype).unsqueeze(0).to(model.device) image_sizes = torch.tensor(pixel_values.shape[-2:]).unsqueeze(0).to(model.device) kwargs.update({'pixel_values': pixel_values, 'image_sizes': image_sizes}) streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=False) kwargs['streamer'] = streamer thread = threading.Thread(target=model.generate, kwargs=kwargs) thread.start() acc = "" for new_text in streamer: acc += new_text yield acc except Exception as e: yield f"[Error] {e}" def mm35_stream_image_text(image_bytes: bytes): """Stream a response from Mistral Medium (MM3.5) via the Mistral hosted API. Yields the cumulative text after each new chunk, mirroring `llm_stream_image_text`'s contract so the two players are interchangeable. """ print(f"[mm35] start. image_bytes={len(image_bytes)} bytes model={MM35_MODEL_ID}") if not MISTRAL_API_KEY: yield "[Error] MISTRAL_API_KEY is not set on this Space. MM3.5 is disabled." return import base64 try: encoded_image = base64.b64encode(image_bytes).decode("utf-8") data_url = f"data:image/jpeg;base64,{encoded_image}" payload = { "model": MM35_MODEL_ID, "messages": [ {"role": "system", "content": SYSTEM_PROMPT_TEXT}, { "role": "user", "content": [ {"type": "text", "text": USER_INSTRUCTION}, {"type": "image_url", "image_url": data_url}, ], }, ], "stop": ["[ANSWER]"], "stream": True, "max_tokens": 4096, "temperature": 0.7, "top_p": 0.95, } headers = { "Authorization": f"Bearer {MISTRAL_API_KEY}", "Content-Type": "application/json", "Accept": "text/event-stream", } acc = "" with requests.post( MISTRAL_API_URL, json=payload, headers=headers, stream=True, timeout=180, ) as resp: if resp.status_code >= 400: body = resp.text[:500] yield f"[Error] Mistral API {resp.status_code}: {body}" return for raw_line in resp.iter_lines(decode_unicode=True): if not raw_line: continue if not raw_line.startswith("data:"): continue data_str = raw_line[5:].strip() if data_str == "[DONE]": break try: chunk = json.loads(data_str) except json.JSONDecodeError: continue choices = chunk.get("choices") or [] if not choices: continue delta = choices[0].get("delta") or {} content = delta.get("content") if not content: continue if isinstance(content, list): for part in content: if isinstance(part, dict): text_part = part.get("text") or "" if text_part: acc += text_part yield acc else: acc += content yield acc print(f"[mm35] done. text length={len(acc)}") except Exception as e: yield f"[Error] {e}" def pick_random_location(difficulty: str) -> Dict[str, float]: candidates = zones.get(difficulty, []) if candidates: selected_zone = random.choice(candidates) if selected_zone.get('type') == 'rectangle': b = selected_zone['bounds'] north, south, east, west = b['north'], b['south'], b['east'], b['west'] if west > east: east += 360 lng = random.uniform(west, east) if lng > 180: lng -= 360 lat = random.uniform(south, north) ensured = _ensure_street_view_location(lat, lng) if ensured: return ensured fallback = random.choice(LOCATIONS) ensured_fallback = _ensure_street_view_location(fallback['lat'], fallback['lng']) return ensured_fallback or fallback def street_view_image_url(lat: float, lng: float) -> str: if not GOOGLE_MAPS_API_KEY: # Fallback placeholder to avoid blank image when key is missing return "https://picsum.photos/1200/800" return ( f"https://maps.googleapis.com/maps/api/streetview?size=1200x800&location={lat},{lng}&fov=60&pitch=0&source=outdoor&key={GOOGLE_MAPS_API_KEY}" ) def _has_street_view(lat: float, lng: float) -> bool: if not GOOGLE_MAPS_API_KEY: return True try: resp = requests.get( "https://maps.googleapis.com/maps/api/streetview/metadata", params={"location": f"{lat},{lng}", "source": "outdoor", "key": GOOGLE_MAPS_API_KEY}, timeout=5, ) resp.raise_for_status() data = resp.json() # Check if it's OK and preferably outdoor (not inside buildings) if data.get("status") == "OK": # Prefer locations that are explicitly outdoor location_type = data.get("location_type") # If location_type is available, check it's not indoors if location_type and location_type == "INDOOR": return False return True return False except Exception: return False def _snap_to_nearest_road(lat: float, lng: float) -> Optional[Dict[str, float]]: if not GOOGLE_MAPS_API_KEY: return None try: resp = requests.get( "https://roads.googleapis.com/v1/nearestRoads", params={"points": f"{lat},{lng}", "key": GOOGLE_MAPS_API_KEY}, timeout=5, ) resp.raise_for_status() data = resp.json() points = data.get("snappedPoints") or [] if not points: return None loc = points[0].get("location") or {} if "latitude" in loc and "longitude" in loc: return {"lat": float(loc["latitude"]), "lng": float(loc["longitude"])} except Exception: pass return None def _ensure_street_view_location(lat: float, lng: float) -> Optional[Dict[str, float]]: """Return a coordinate with confirmed Street View coverage, snapped near a road when possible.""" if not GOOGLE_MAPS_API_KEY: return {"lat": lat, "lng": lng} checked: set[tuple] = set() snapped = _snap_to_nearest_road(lat, lng) candidates: List[Dict[str, float]] = [] if snapped: candidates.append(snapped) candidates.append({"lat": lat, "lng": lng}) # Explore a few jittered points if needed if not snapped: increments = [0.0005, -0.0005, 0.001, -0.001] for d_lat in increments: for d_lng in increments: if d_lat == 0 and d_lng == 0: continue candidates.append({"lat": lat + d_lat, "lng": lng + d_lng}) for candidate in candidates: key = (round(candidate["lat"], 6), round(candidate["lng"], 6)) if key in checked: continue checked.add(key) if _has_street_view(candidate["lat"], candidate["lng"]): return candidate return None def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float: from math import radians, cos, sin, asin, sqrt lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2]) dlon = lon2 - lon1 dlat = lat2 - lat1 a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2 c = 2 * asin(sqrt(a)) r = 6371 return c * r def score_from_distance_km(distance_km: float) -> float: max_score = 5000.0 return max(0.0, max_score - distance_km) def build_street_html(image_url: str) -> str: base = """
""" return base.replace('__IMG_URL__', image_url) def gr_start_game(difficulty: str, username: str, request: gr.Request): rounds: List[Dict[str, Any]] = [] date_str = datetime.now(timezone.utc).date().isoformat() game_id = str(uuid.uuid4()) # Generate unique game ID for _ in range(3): loc = pick_random_location(difficulty) round_id = generate_id() rounds.append({ 'id': round_id, 'lat': loc['lat'], 'lng': loc['lng'], 'image_url': street_view_image_url(loc['lat'], loc['lng']), 'human_guess': None, 'ai_guess': None, 'human_score': 0.0, 'ai_score': 0.0, }) user_sessions[username] = { 'game_id': game_id, 'difficulty': difficulty, 'rounds': rounds, 'total_score': 0.0, 'completed': False, 'date': date_str, } r0 = rounds[0] street_html = build_street_html(r0['image_url']) return rounds, 0, r0['id'], street_html, "", "" def get_round(username: str, round_id: str) -> Optional[Dict[str, Any]]: session_data = user_sessions.get(username) if not session_data: return None for r in session_data['rounds']: if r['id'] == round_id: return r return None def gr_submit_guess(round_id: str, lat: float, lng: float, username: str, request: gr.Request): rnd = get_round(username, round_id) if not rnd: return "", "Round not found", gr.update(), gr.update(), gr.update() distance_km = haversine_km(rnd['lat'], rnd['lng'], float(lat), float(lng)) score = score_from_distance_km(distance_km) rnd['human_guess'] = {'lat': float(lat), 'lng': float(lng)} rnd['human_score'] = score rnd['human_distance_km'] = float(distance_km) result_text = f"Your guess was {distance_km:.2f} km away. You scored {score:.0f} points." scoreboard_html = ( f"
" f"
Human Guess Recorded
" f"
{result_text}
" f"
" f" Distance: {distance_km:.2f} km" f" Score: {score:.0f} pts" f"
" f"
AI analysis will be added once the model finishes.
" f"
" ) popup_html = """

Round Results

__SCOREBOARD__
G = Ground Truth H = Human A = AI
""".replace('__SCOREBOARD__', scoreboard_html)\ .replace('__RND_LAT__', str(rnd['lat']))\ .replace('__RND_LNG__', str(rnd['lng']))\ .replace('__H_LAT__', str(float(lat)))\ .replace('__H_LNG__', str(float(lng)))\ .replace('__AI_LAT__', str(float(lat)))\ .replace('__AI_LNG__', str(float(lng)))\ .replace('__GMAPS_KEY__', GOOGLE_MAPS_API_KEY or '') return popup_html, result_text, rnd['lat'], rnd['lng'], score def extract_coords_from_text(text: str): import re m = re.search(r"\[ANSWER\]\s*([+-]?\d+(?:\.\d+)?)\s*,\s*([+-]?\d+(?:\.\d+)?)\s*\[/ANSWER\]", text, re.IGNORECASE) if not m: return None try: lat = float(m.group(1)) lng = float(m.group(2)) return {'lat': lat, 'lng': lng} except Exception: return {'lat': 0, 'lng': 0} # instead of defaulting to geocode it will return 0 as default when failing def geocode_text_to_coords(query: str) -> Optional[Dict[str, float]]: if not GOOGLE_MAPS_API_KEY: return None resp = requests.get('https://maps.googleapis.com/maps/api/geocode/json', params={'address': query, 'key': GOOGLE_MAPS_API_KEY}) try: j = resp.json() if j.get('results'): loc = j['results'][0]['geometry']['location'] return {'lat': loc['lat'], 'lng': loc['lng']} except Exception: return None return None def format_coords(coords: Optional[Dict[str, float]]) -> str: if not coords or 'lat' not in coords or 'lng' not in coords: return "N/A" return f"lat: {coords['lat']:.2f}, lng: {coords['lng']:.2f}" load_zones_from_file() def _read_text(path: str) -> str: try: with open(path, 'r') as f: return f.read() except Exception: return "" APP_CSS = _read_text('static/style.css') + "\n#lat_box, #lng_box { display:none; }\n" + """ #app-styles { display: none !important; } #next_btn { position: absolute; left: -9999px; } #lobby_group, #game_group{max-width:1024px;margin:24px auto;padding:0;} #start_btn{height:48px;font-weight:700} .gradio-container{background:#FFFBEB} body, .gradio-container, .gradio-container *{color:#111 !important} /* force markdown text to be dark */ .gradio-container .prose, .gradio-container .prose *{color:#111 !important} /* override: LLM output textboxes use white font on dark background */ #ai_chat, #ai_chat *, #mm35_chat, #mm35_chat *{color:#fff !important} #ai_chat textarea, #mm35_chat textarea{background:#111 !important;color:#fff !important;border-radius:10px !important;} #ai_chat label, #mm35_chat label{color:#fff !important} #ai_panels{gap:14px;align-items:stretch;} #ai_panels > .form{flex:1;} /* difficulty dropdown white text */ #difficulty_select, #difficulty_select *{color:#fff !important} /* keep dropdown menu items readable */ .svelte-3lgy39 .wrap-inner, .wrap-inner{ color: inherit; } #popup-overlay, #popup-overlay * {color:#f9fafb !important} #popup-overlay #ai-analysis-box, #popup-overlay #ai-analysis-box * { color: #1e293b !important; } /* Game Over summary text in orange */ .gradio-container div[style*='text-align:center'] h2, .gradio-container div[style*='text-align:center'] h3, .gradio-container div[style*='text-align:center'] p { color: #FA500F !important; } /* Flatten Gradio's panel chrome: no thick grey borders / shadows around groups, columns or input wrappers. */ .gradio-container button, .gradio-container .form, .gradio-container .block, .gradio-container input, .gradio-container textarea, .gradio-container select, .gradio-container .gr-box, .gradio-container .gr-input, .gradio-container .gr-form, .gradio-container .gr-group, .gradio-container .panel, .gradio-container .container, .gradio-container .wrap { border: none !important; box-shadow: none !important; } .gradio-container .gr-group, .gradio-container .gr-box, .gradio-container .block, .gradio-container .form, .gradio-container .panel { background: transparent !important; padding: 0 !important; } #lobby_group, #game_group { background: transparent !important; box-shadow: none !important; border: none !important; } /* Style login button to be compact */ #login_button {max-width: 180px !important;} #login_button button {padding: 6px 14px !important; font-size: 0.9rem !important;} /* Style Hugging Face GPU notifications + the ZeroGPU "processing | X/Y" chip that the `spaces` package injects in the top-right of the page. */ .toast-wrap, .toast-body {background: #f8fafc !important; color: #111 !important; border: none !important;} .toast-title {color: #111 !important;} .toast-text {color: #334155 !important;} .toast-close {color: #111 !important;} [class*='zero-gpu'], [class*='zerogpu'], [data-testid*='zero-gpu'], .gpu-processing, .gradio-zero-gpu-indicator, #zero-gpu-indicator { font-size: 0.72rem !important; padding: 4px 8px !important; border-radius: 999px !important; opacity: 0.65 !important; transform: scale(0.85); transform-origin: top right; } """ # Client boot JS to initialize the mini-map reliably in Gradio (scripts in HTML are sanitized) APP_BOOT_JS = """ () => { const GMAPS_KEY = "__GMAPS_KEY__"; const log = (...a) => { try { console.log('[boot]', ...a); } catch(_) {} }; function ensureMapsLoaded(cb) { if (window.google && google.maps) return cb(); if (!GMAPS_KEY) { log('No GOOGLE_MAPS_API_KEY; mini-map disabled'); return; } window.__gmapsQueue = window.__gmapsQueue || []; window.__gmapsQueue.push(cb); if (window.__gmapsLoading) return; window.__gmapsLoading = true; window.__mini_cb__ = () => { log('Google Maps ready'); const q = window.__gmapsQueue || []; q.forEach(fn => { try { fn(); } catch(_) {} }); window.__gmapsQueue = []; }; const s = document.createElement('script'); s.async = true; s.defer = true; s.dataset.gmapsLoader = '1'; s.src = 'https://maps.googleapis.com/maps/api/js?key=' + GMAPS_KEY + '&callback=__mini_cb__'; s.onerror = () => log('Failed to load Google Maps script'); document.head.appendChild(s); } function initMiniMapIfPresent() { const el = document.getElementById('mini-map'); if (!el || el.dataset.initialized === '1') return; ensureMapsLoaded(() => { try { const map = new google.maps.Map(el, { center: { lat: 0, lng: 0 }, zoom: 1, streetViewControl: false, mapTypeControl: false, fullscreenControl: false }); window._miniMapInstance = map; el.dataset.initialized = '1'; let marker=null; map.addListener('click',(e)=>{ if(marker) marker.setMap(null); marker=new google.maps.Marker({position:e.latLng, map}); const latBox=document.querySelector('#lat_box input, #lat_box textarea, #lat_box input[type=number]'); const lngBox=document.querySelector('#lng_box input, #lng_box textarea, #lng_box input[type=number]'); if(latBox){ latBox.value=e.latLng.lat(); latBox.dispatchEvent(new Event('input',{bubbles:true})); } if(lngBox){ lngBox.value=e.latLng.lng(); lngBox.dispatchEvent(new Event('input',{bubbles:true})); } }); setTimeout(() => { try { google.maps.event.trigger(map, 'resize'); map.setCenter({ lat: 0, lng: 0 }); } catch(_) {} }, 150); log('Mini-map initialized'); } catch (e) { log('Mini-map init error', e); } }); } function initPopupIfPresent() { log('initPopupIfPresent called'); const el = document.getElementById('popup-map'); if (!el || el.dataset.initialized === '1') return; log("Raw AI dataset values:", { lat: el.dataset.aiLat, lng: el.dataset.aiLng }); log("Raw MM3.5 dataset values:", { lat: el.dataset.mmLat, lng: el.dataset.mmLng }); const rnd = { lat: parseFloat(el.dataset.rndLat), lng: parseFloat(el.dataset.rndLng) }; const human = { lat: parseFloat(el.dataset.hLat), lng: parseFloat(el.dataset.hLng) }; const ai = { lat: parseFloat(el.dataset.aiLat), lng: parseFloat(el.dataset.aiLng) }; const mm = { lat: parseFloat(el.dataset.mmLat), lng: parseFloat(el.dataset.mmLng) }; log("Parsed AI coords:", ai); log("Parsed MM3.5 coords:", mm); ensureMapsLoaded(() => { try { log('Popup map element found, ensuring maps loaded...'); const mapOpts={zoom:6,center:rnd,mapTypeControl:false,streetViewControl:false,fullscreenControl:false}; const m = new google.maps.Map(el, mapOpts); el.dataset.initialized = '1'; const bounds = new google.maps.LatLngBounds(); const markerIcon = (fill, stroke) => ({ path: google.maps.SymbolPath.CIRCLE, scale: 9.5, fillColor: fill, fillOpacity: 1, strokeColor: stroke, strokeWeight: 2 }); const markerLabel = (text) => ({ text, color: '#ffffff', fontWeight: '700', fontSize: '12px' }); const gMk = new google.maps.Marker({ position: rnd, map: m, label: markerLabel('G'), icon: markerIcon('#22c55e', '#166534') }); bounds.extend(gMk.getPosition()); if (Number.isFinite(ai.lat) && Number.isFinite(ai.lng)) { log("Magistral coords valid, creating marker."); const aMk = new google.maps.Marker({ position: ai, map: m, label: markerLabel('M'), icon: markerIcon('#FA500F', '#c2410c') }); bounds.extend(aMk.getPosition()); new google.maps.Polyline({ path: [rnd, ai], geodesic: true, strokeColor: '#FA500F', strokeOpacity: 1.0, strokeWeight: 2, map: m }); } else { log("Magistral coords NOT valid, skipping marker."); } if (Number.isFinite(mm.lat) && Number.isFinite(mm.lng)) { log("MM3.5 coords valid, creating marker."); const mmMk = new google.maps.Marker({ position: mm, map: m, label: markerLabel('3'), icon: markerIcon('#A855F7', '#6B21A8') }); bounds.extend(mmMk.getPosition()); new google.maps.Polyline({ path: [rnd, mm], geodesic: true, strokeColor: '#A855F7', strokeOpacity: 1.0, strokeWeight: 2, map: m }); } else { log("MM3.5 coords NOT valid, skipping marker."); } if (Number.isFinite(human.lat) && Number.isFinite(human.lng)) { const hMk = new google.maps.Marker({ position: human, map: m, label: markerLabel('H'), icon: markerIcon('#2563EB', '#1e3a8a') }); bounds.extend(hMk.getPosition()); new google.maps.Polyline({ path: [rnd, human], geodesic: true, strokeColor: '#2563EB', strokeOpacity: 1.0, strokeWeight: 2, map: m }); } const ne = bounds.getNorthEast(); const sw = bounds.getSouthWest(); if (ne && sw && ne.equals(sw)) { m.setCenter(ne); log("Setting zoom to 18"); m.setZoom(18); } else { log("Fitting map to bounds"); m.fitBounds(bounds); } setTimeout(() => { try { google.maps.event.trigger(m, 'resize'); const ne2 = bounds.getNorthEast(); const sw2 = bounds.getSouthWest(); if (ne2 && sw2 && ne2.equals(sw2)) { m.setCenter(ne2); log("Setting zoom to 18 after resize"); m.setZoom(18); } else { log("Fitting map to bounds after resize"); m.fitBounds(bounds); } } catch (e) { log('Resize error', e); } }, 120); const closeButtons = [document.getElementById('popup-close-next'), document.getElementById('popup-close-next-footer')]; closeButtons.forEach((btn) => { if (!btn) return; if (!btn.dataset.bound) { btn.addEventListener('click', () => { const nxt = document.getElementById('next_btn'); if (nxt) nxt.click(); }); btn.dataset.bound = '1'; } }); window.addEventListener('keydown', (ev) => { if (ev.key === 'Escape') { const nxt = document.getElementById('next_btn'); if (nxt) nxt.click(); } }, { once: true }); log('Popup map initialized'); } catch (e) { log('Popup map init error', e); } }); } function initFullscreenButtonIfPresent() { log('initFullscreenButtonIfPresent called'); const btn = document.getElementById('fullscreen-btn'); const wrapper = document.getElementById('fullscreen-wrapper'); const img = document.getElementById('street-image'); if (!btn || !wrapper || !img || btn.dataset.initialized === '1') return; const enterIcon = ''; const exitIcon = ''; btn.innerHTML = enterIcon; const originalImgParentStyle = img.parentElement.style; const originalImgStyle = { width: img.style.width, height: img.style.height, objectFit: img.style.objectFit, borderRadius: img.style.borderRadius, }; btn.addEventListener('click', () => { if (!document.fullscreenElement) { wrapper.requestFullscreen().catch(err => { log('Fullscreen error:', err); }); } else { document.exitFullscreen(); } }); document.addEventListener('fullscreenchange', () => { if (document.fullscreenElement === wrapper) { btn.innerHTML = exitIcon; img.style.width = '100%'; img.style.height = '100%'; img.style.objectFit = 'contain'; img.style.borderRadius = '0'; } else { btn.innerHTML = enterIcon; img.style.width = originalImgStyle.width; img.style.height = originalImgStyle.height; img.style.objectFit = originalImgStyle.objectFit; img.style.borderRadius = originalImgStyle.borderRadius; } }); btn.dataset.initialized = '1'; log('Fullscreen button initialized'); } function initMapControlsIfPresent() { const plusBtn = document.getElementById('map-size-plus'); const minusBtn = document.getElementById('map-size-minus'); const mapWrap = document.getElementById('mini-map-wrap'); if (!plusBtn || !minusBtn || !mapWrap || mapWrap.dataset.controlsInitialized) return; const sizes = [{w: 200, h: 130}, {w: 280, h: 180}, {w: 400, h: 260}, {w: 550, h: 360}]; let currentSizeIndex = 1; const updateSize = () => { const newSize = sizes[currentSizeIndex]; mapWrap.style.width = newSize.w + 'px'; mapWrap.style.height = newSize.h + 'px'; if (window._miniMapInstance) { setTimeout(() => { google.maps.event.trigger(window._miniMapInstance, 'resize'); }, 300); } log('Map size changed to', newSize); }; plusBtn.addEventListener('click', (e) => { e.stopPropagation(); if (currentSizeIndex < sizes.length - 1) { currentSizeIndex++; updateSize(); } }); minusBtn.addEventListener('click', (e) => { e.stopPropagation(); if (currentSizeIndex > 0) { currentSizeIndex--; updateSize(); } }); mapWrap.dataset.controlsInitialized = '1'; updateSize(); log('Map controls initialized'); } const obs = new MutationObserver(() => { initMiniMapIfPresent(); initPopupIfPresent(); initMapControlsIfPresent(); initFullscreenButtonIfPresent(); }); obs.observe(document.documentElement, { childList: true, subtree: true }); initMiniMapIfPresent(); initPopupIfPresent(); initMapControlsIfPresent(); initFullscreenButtonIfPresent(); } """.replace("__GMAPS_KEY__", GOOGLE_MAPS_API_KEY or '') with gr.Blocks(title="LLM GeoGuessr") as demo: # Gradio 6 dropped `css=` on `Blocks(...)`, and `launch(css=...)` isn't # always honoured under SSR. Injecting a ", elem_id="app-styles") user_profile = gr.State() with gr.Row(): gr.Markdown("## LLM GeoGuessr", elem_id="title_md") login_button = LoginButton(visible=True, elem_id="login_button", scale=0, min_width=300) gr.Markdown(f""" ### Your opponents You play against **two** Mistral AI models on every round: - **Magistral** — an open multimodal reasoning model (`Magistral-Small-2509`, 24B) running locally on ZeroGPU. - **{MM35_DISPLAY_NAME}** — Mistral Medium 3.5, called through the Mistral hosted API. Neither model was trained on GeoGuessr data and neither has access to any tools (maps, search, …), so the playing field stays fair — they only see what you see: the Street View image. ### How to Play 1. **Login** using your Hugging Face account and click "Start Game". 2. You'll be shown a random Google Street View image. 3. Place a marker on the mini-map to guess the location. 4. Submit your guess and watch both AIs reason in parallel. Score = 5000 − (distance in km). The game consists of 3 rounds! """) login_prompt_md = gr.Markdown("### Please log in with your Hugging Face account to play.", visible=True) logged_in_md = gr.Markdown(visible=False) with gr.Group(visible=False, elem_id="lobby_group") as lobby_group: start_btn = gr.Button("Start Game", variant="primary", elem_id="start_btn") limit_msg = gr.Markdown(visible=False) with gr.Group(visible=False, elem_id="game_group") as game_group: rounds_state = gr.State([]) idx_state = gr.State(0) round_id_box = gr.Textbox(visible=False) lat_box = gr.Number(visible=True, elem_id="lat_box", label="lat") lng_box = gr.Number(visible=True, elem_id="lng_box", label="lng") street_html = gr.HTML(visible=True) map_html = gr.HTML(visible=False) validate_btn = gr.Button("Validate Guess", visible=True) result_md = gr.Markdown() popup_html = gr.HTML() with gr.Row(elem_id="ai_panels"): with gr.Column(scale=1): ai_chat = gr.Textbox( label="Magistral (local, ZeroGPU)", interactive=False, elem_id="ai_chat", lines=15, ) with gr.Column(scale=1): mm35_chat = gr.Textbox( label=f"{MM35_DISPLAY_NAME} (Mistral API)", interactive=False, elem_id="mm35_chat", lines=15, ) next_btn = gr.Button("Next", visible=True, elem_id="next_btn") final_md = gr.Markdown(visible=True) def on_login(token: gr.OAuthToken | None): if not token: return ( None, gr.update(visible=True), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(), gr.update(visible=False), ) try: profile = whoami(token=token.token) username = profile["name"] # Use server token to read the dataset, not user's token todays_games = data_manager.get_todays_games(token=SERVER_HF_TOKEN) # Only check if user played today if BLOCK_MULTIPLE_GAMES is enabled has_played = data_manager.BLOCK_MULTIPLE_GAMES and data_manager.has_user_played_today(username, todays_games) except Exception as e: gr.Warning(f"Could not check your game status. Please try again. Error: {e}") return ( None, gr.update(visible=True), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(), gr.update(visible=False), ) welcome_message = f"Welcome, **{profile.get('fullname', username)}**! You are logged in as **{username}**." updates = [ profile, gr.update(visible=False), gr.update(visible=True), gr.update(visible=True, value=welcome_message), gr.update(visible=True), ] if has_played: limit_message = "You have already played today. Please come back tomorrow for a new challenge!" updates.extend([ gr.update(interactive=False), gr.update(visible=True, value=limit_message), ]) else: updates.extend([ gr.update(interactive=True), gr.update(visible=False), ]) return tuple(updates) # Use demo.load to set the initial UI state when the app loads with an existing token. # This is the key fix for the UI flickering issue. demo.load( on_login, outputs=[ user_profile, login_prompt_md, login_button, logged_in_md, lobby_group, start_btn, limit_msg, ], show_progress="hidden", ) # The click handler is still needed to initiate the login flow if the user is not logged in. login_button.click( on_login, outputs=[ user_profile, login_prompt_md, login_button, logged_in_md, lobby_group, start_btn, limit_msg, ], show_progress="hidden", ) def start_click(profile: dict, request: gr.Request): if not profile: gr.Warning("Please log in before starting the game.") return None, 0, "", "", "", "", gr.update(), gr.update(), gr.update(), gr.update(), gr.update() r, idx, rid, s_html, m_html, err = gr_start_game("easy", profile["name"], request) return ( r, idx, rid, s_html, m_html, gr.update(value=""), gr.update(visible=True), gr.update(visible=False), gr.update(value=""), gr.update(value=""), gr.update(value=""), ) start_btn.click( start_click, inputs=[user_profile], outputs=[ rounds_state, idx_state, round_id_box, street_html, map_html, result_md, game_group, lobby_group, limit_msg, ai_chat, mm35_chat, ], # Avoid the gray "processing" overlay on the freshly-revealed game widgets # while gr_start_game hits the Street View / Roads APIs. show_progress="hidden", ) def on_validate(rid, lat, lng, profile: dict, request: gr.Request): if not profile: return username = profile["name"] _popup, txt, a_lat, a_lng, _score = gr_submit_guess(rid, lat, lng, username, request) yield "", txt + "\n\n[Magistral] Analyzing image...", f"[{MM35_DISPLAY_NAME}] Analyzing image..." rnd = get_round(username, rid) if not rnd: yield "", txt + "\n\n[Error] Round not found", "" return try: img_resp = requests.get(rnd['image_url']) img_resp.raise_for_status() image_bytes = img_resp.content except Exception as e: yield "", txt + f"\n\n[Error] {e}", "" return # Run both players in parallel: Magistral via ZeroGPU + MM3.5 via Mistral API. mag_q: "queue.Queue[tuple]" = queue.Queue() mm_q: "queue.Queue[tuple]" = queue.Queue() def _runner(stream_fn, q): try: for partial in stream_fn(image_bytes): q.put(("text", partial)) except Exception as ex: q.put(("error", str(ex))) finally: q.put(("done", None)) t_mag = threading.Thread(target=_runner, args=(llm_stream_image_text, mag_q), daemon=True) t_mm = threading.Thread(target=_runner, args=(mm35_stream_image_text, mm_q), daemon=True) t_mag.start() t_mm.start() mag_text = "" mm_text = "" mag_done = False mm_done = False mag_error = None mm_error = None def _drain(q, current_text, done_flag, err): try: while True: kind, data = q.get_nowait() if kind == "text": current_text = data elif kind == "error": err = data current_text = (current_text + f"\n[Error] {data}").strip() elif kind == "done": done_flag = True break except queue.Empty: pass return current_text, done_flag, err while not (mag_done and mm_done): prev = (mag_text, mm_text, mag_done, mm_done) if not mag_done: mag_text, mag_done, mag_error = _drain(mag_q, mag_text, mag_done, mag_error) if not mm_done: mm_text, mm_done, mm_error = _drain(mm_q, mm_text, mm_done, mm_error) if (mag_text, mm_text, mag_done, mm_done) != prev: yield ( "", txt + "\n\n" + (mag_text or "[Magistral] (no output yet)"), mm_text or f"[{MM35_DISPLAY_NAME}] (no output yet)", ) else: time.sleep(0.05) t_mag.join(timeout=1.0) t_mm.join(timeout=1.0) # Compute guesses + scores for both AIs for player, last_text, prefix in ( ("ai", mag_text, "ai"), ("mm35", mm_text, "mm35"), ): coords = extract_coords_from_text(last_text) or geocode_text_to_coords(last_text[-256:]) if coords: rnd[f'{prefix}_guess'] = coords dist_km = haversine_km(rnd['lat'], rnd['lng'], coords['lat'], coords['lng']) rnd[f'{prefix}_distance_km'] = float(dist_km) rnd[f'{prefix}_score'] = score_from_distance_km(dist_km) rnd[f'{prefix}_analysis'] = last_text # Record this round's data immediately to prevent abuse sess = user_sessions.get(username, {}) game_id = sess.get('game_id', '') round_idx = next((i for i, rr in enumerate(sess.get('rounds', [])) if rr['id'] == rid), 0) + 1 round_record = { "round_number": round_idx, "actual_location": {"lat": rnd.get('lat'), "lng": rnd.get('lng')}, "human_guess": rnd.get('human_guess'), "human_distance_km": round(rnd.get('human_distance_km', 0), 2), "human_score": float(round(rnd.get('human_score', 0))), "ai_guess": rnd.get('ai_guess'), "ai_distance_km": round(rnd.get('ai_distance_km', 0), 2) if rnd.get('ai_distance_km') else None, "ai_score": float(round(rnd.get('ai_score', 0))) if rnd.get('ai_score') else 0.0, "ai_analysis": rnd.get('ai_analysis', ''), "mm35_guess": rnd.get('mm35_guess'), "mm35_distance_km": round(rnd.get('mm35_distance_km', 0), 2) if rnd.get('mm35_distance_km') else None, "mm35_score": float(round(rnd.get('mm35_score', 0))) if rnd.get('mm35_score') else 0.0, "mm35_analysis": rnd.get('mm35_analysis', ''), "mm35_model": MM35_MODEL_ID, } try: data_manager.update_game_record(username, game_id, round_data=round_record) except Exception as e: print(f"[on_validate] record update failed: {e}") # Build the scoreboard popup with all three players sess = user_sessions.get(username, {}) total_human = sum(float(r.get('human_score', 0.0)) for r in sess.get('rounds', [])) total_ai = sum(float(r.get('ai_score', 0.0)) for r in sess.get('rounds', [])) total_mm35 = sum(float(r.get('mm35_score', 0.0)) for r in sess.get('rounds', [])) import html as _html mag_text_safe = _html.escape(mag_text or "") mm_text_safe = _html.escape(mm_text or "") summary_safe = _html.escape(txt) human_guess_str = format_coords(rnd.get('human_guess')) ai_guess_str = format_coords(rnd.get('ai_guess')) mm35_guess_str = format_coords(rnd.get('mm35_guess')) scoreboard_html = ( f"
" f"
Round {round_idx}
" f"
{summary_safe}
" f"
" f" Human: {rnd.get('human_score',0):.0f} pts ( {human_guess_str} ) ({rnd.get('human_distance_km',0.0):.1f} km)" f" Magistral: {rnd.get('ai_score',0):.0f} pts ( {ai_guess_str} ) ({rnd.get('ai_distance_km',0.0):.1f} km)" f" {MM35_DISPLAY_NAME}: {rnd.get('mm35_score',0):.0f} pts ( {mm35_guess_str} ) ({rnd.get('mm35_distance_km',0.0):.1f} km)" f"
" f"
Totals — Human {total_human:.0f} / Magistral {total_ai:.0f} / {MM35_DISPLAY_NAME} {total_mm35:.0f}
" f"
" f"
" f"
Magistral
" f"
" + mag_text_safe + "
" f"
" f"
" f"
{MM35_DISPLAY_NAME}
" f"
" + mm_text_safe + "
" f"
" f"
" f"
" ) def _coord_strs(d): if not d: return "", "" return str(float(d.get('lat', 0.0))), str(float(d.get('lng', 0.0))) ai_lat_str, ai_lng_str = _coord_strs(rnd.get('ai_guess')) mm_lat_str, mm_lng_str = _coord_strs(rnd.get('mm35_guess')) popup_html_template = """ """ popup_html = ( popup_html_template .replace('__SCOREBOARD__', scoreboard_html) .replace('__RND_LAT__', str(rnd['lat'])) .replace('__RND_LNG__', str(rnd['lng'])) .replace('__H_LAT__', str(float(lat))) .replace('__H_LNG__', str(float(lng))) .replace('__AI_LAT__', ai_lat_str) .replace('__AI_LNG__', ai_lng_str) .replace('__MM_LAT__', mm_lat_str) .replace('__MM_LNG__', mm_lng_str) .replace('__GMAPS_KEY__', GOOGLE_MAPS_API_KEY or '') ) yield popup_html, (txt + "\n\n" + (mag_text or "")), (mm_text or "") validate_btn.click( on_validate, inputs=[round_id_box, lat_box, lng_box, user_profile], outputs=[popup_html, ai_chat, mm35_chat], # We already stream live tokens into the two AI textboxes, so Gradio's # default full-screen "pending" overlay is just noise that makes the UI # look frozen. show_progress="hidden", ) def on_next(r_state: list, idx: int, profile: dict, request: gr.Request): if not profile: return idx, gr.update(), gr.update(), gr.update(), gr.update(), gr.update() username = profile["name"] idx += 1 sess = user_sessions.get(username) if not sess or idx >= len(sess['rounds']): total_human = sum(float(r.get('human_score', 0.0)) for r in sess.get('rounds', [])) total_ai = sum(float(r.get('ai_score', 0.0)) for r in sess.get('rounds', [])) total_mm35 = sum(float(r.get('mm35_score', 0.0)) for r in sess.get('rounds', [])) game_id = sess.get('game_id', '') # Final scores (rounds already recorded incrementally) try: data_manager.update_game_record( username, game_id, final_score=total_human, final_ai_score=total_ai, final_mm35_score=total_mm35, ) except TypeError: # Backwards-compat: older data_manager.update_game_record without mm35 arg. data_manager.update_game_record( username, game_id, final_score=total_human, final_ai_score=total_ai, ) scores = { "You": total_human, "Magistral": total_ai, MM35_DISPLAY_NAME: total_mm35, } top_score = max(scores.values()) if scores else 0 winners = [name for name, sc in scores.items() if sc == top_score and top_score > 0] if not winners: winner_message = "Everyone scored 0 — try a harder round!" elif len(winners) == 1: w = winners[0] winner_message = "Congratulations, you won!" if w == "You" else f"{w} won this round." else: winner_message = "It's a tie between " + ", ".join(winners) + "!" summary_html = f"""

Game Over!

Final scores:

You: {total_human:.0f}

Magistral: {total_ai:.0f}

{MM35_DISPLAY_NAME}: {total_mm35:.0f}

{winner_message}

""" return ( idx, gr.update(value=summary_html), gr.update(value=""), gr.update(value=""), gr.update(value=""), gr.update(value=""), ) r = sess['rounds'][idx] s_html = build_street_html(r['image_url']) return ( idx, gr.update(value=s_html), gr.update(value=r['id']), gr.update(value=""), gr.update(value=""), gr.update(value=""), ) next_btn.click( on_next, inputs=[rounds_state, idx_state, user_profile], outputs=[idx_state, street_html, round_id_box, popup_html, ai_chat, mm35_chat], show_progress="hidden", ) # Inject boot JS using load(js=callable) compatible format demo.load(fn=lambda: None, inputs=None, outputs=None, js=APP_BOOT_JS) if __name__ == "__main__": # SSR (the Node proxy at :7861) makes the whole UI look like it's still # "loading" until the first interaction triggers React hydration -- which on # a heavy multimodal Space can take several seconds. Disabling SSR pays a # ~200ms time-to-first-paint cost but every widget is interactive immediately. demo.queue().launch( server_name="0.0.0.0", server_port=7860, ssr_mode=False, )