import os
import random
import json
import uuid
import time
import queue
import subprocess
from datetime import datetime, timedelta, timezone
from typing import Dict, Any, List, Optional
import spaces
import requests
from dotenv import load_dotenv
import gradio as gr
from gradio.components import LoginButton
import data_manager
from huggingface_hub import HfApi, hf_hub_download, whoami
from transformers import Mistral3ForConditionalGeneration, AutoTokenizer, TextIteratorStreamer
import threading
import torch
load_dotenv()
# Random 24-byte secret drawn at import time; a new value is generated on every process start.
APP_SECRET = os.urandom(24)
# JSON file used by save_zones_to_file() / load_zones_from_file() to persist zones.
ZONES_FILE = 'zones.json'
# Zone definitions grouped by difficulty; replaced wholesale by load_zones_from_file()
# when the file on disk is valid.
zones = {
"easy": [],
"medium": [],
"hard": []
}
# Per-user game state held in process memory, keyed by username
# (populated by gr_start_game, read by get_round / on_next).
user_sessions: Dict[str, Dict[str, Any]] = {}
# NOTE(review): not referenced in the visible part of the file — presumably the
# name used when no logged-in profile is available; confirm against callers.
DEFAULT_USERNAME = "player"
def save_zones_to_file():
    """Persist the module-level ``zones`` mapping to ``ZONES_FILE`` as 4-space indented JSON."""
    with open(ZONES_FILE, 'w') as handle:
        handle.write(json.dumps(zones, indent=4))
def load_zones_from_file():
    """Load ``zones`` from ``ZONES_FILE``, creating or repairing the file as needed.

    Behaviour:
      * File missing: the current in-memory ``zones`` are written out.
      * File valid: ``zones`` is replaced by its content. Zones lacking an
        ``'id'`` get a fresh ``uuid4`` hex id and the file is rewritten.
      * File unreadable, not JSON, or structurally invalid (not a dict with
        ``easy``/``medium``/``hard`` keys, a difficulty that is not a list, or
        a zone that is not a dict): a warning is printed and the file is
        overwritten with the current in-memory ``zones``.
    """
    global zones
    if not os.path.exists(ZONES_FILE):
        save_zones_to_file()
        return

    try:
        with open(ZONES_FILE, 'r') as f:
            loaded_zones = json.load(f)
        if not (isinstance(loaded_zones, dict) and all(k in loaded_zones for k in ("easy", "medium", "hard"))):
            raise ValueError("Invalid format")
        migrated = False
        for zone_list in loaded_zones.values():
            # Validate the nested shape explicitly: otherwise a malformed entry
            # raises TypeError/AttributeError below and escapes the recovery path.
            if not isinstance(zone_list, list):
                raise ValueError("Invalid format")
            for zone in zone_list:
                if not isinstance(zone, dict):
                    raise ValueError("Invalid format")
                if 'id' not in zone:
                    zone['id'] = uuid.uuid4().hex
                    migrated = True
    except (json.JSONDecodeError, IOError, ValueError):
        print(f"Warning: '{ZONES_FILE}' is corrupted or invalid. Recreating with empty zones.")
        save_zones_to_file()
        return

    zones = loaded_zones
    if migrated:
        print("Info: Migrated old zone data by adding unique IDs.")
        save_zones_to_file()
# Hard-coded fallback coordinates; pick_random_location() draws from this list
# when the selected difficulty has no zone that yields a usable location.
LOCATIONS = [
{'lat': 48.85824, 'lng': 2.2945},
{'lat': 40.748440, 'lng': -73.985664},
{'lat': 35.689487, 'lng': 139.691711},
{'lat': -33.856784, 'lng': 151.215297}
]
def generate_id():
    """Return a random 10-character id made of lowercase ASCII letters and digits.

    Uses the ``random`` module, so the ids are not suitable as security tokens.
    """
    alphabet = 'abcdefghijklmnopqrstuvwxyz0123456789'
    picked = random.choices(alphabet, k=10)
    return ''.join(picked)
# Hugging Face dataset repo id.
# NOTE(review): not used in the visible part of the file — presumably where
# data_manager stores game records; confirm.
HF_DATASET_REPO = 'Jofthomas/geoguessr_game_of_the_day'
# Google Maps Platform key. When unset, street_view_image_url() returns a
# placeholder image and the Street View / Roads checks are skipped.
GOOGLE_MAPS_API_KEY = os.getenv('GOOGLE_MAPS_API_KEY')
# Server-side Hugging Face token; empty string when HF_TOKEN is not set.
SERVER_HF_TOKEN = os.getenv('HF_TOKEN', '')
# Mistral hosted API (used by the "MM3.5" player).
# The Mistral SDK convention is MISTRAL_API_KEY.
# `mistral-medium-latest` is a rolling alias to the newest Medium model.
# Override MISTRAL_MM35_MODEL if/when a pinned dated id (e.g. mistral-medium-2511)
# is published for Mistral Medium 3.5.
MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY", "")
MISTRAL_API_URL = os.getenv("MISTRAL_API_URL", "https://api.mistral.ai/v1/chat/completions")
MM35_MODEL_ID = os.getenv("MISTRAL_MM35_MODEL", "mistral-medium-latest")
# Label shown for this player in the final score summary.
MM35_DISPLAY_NAME = "MM3.5"
# Wipe the ZeroGPU offload dir and any partial HF cache from a previous (possibly
# failed) container start. ZeroGPU pre-allocates the full packed-tensor blob with
# posix_fallocate; stale files there are a common cause of
# OSError: [Errno 28] No space left on device on ZeroGPU Spaces.
# Note: we keep PATH and HOME so the shell can resolve `rm` and `~`.
try:
    # Minimal environment for the shell: only PATH (to find `rm`) and HOME
    # (to expand `~`) are passed through, each with a conservative default.
    _cleanup_env = {
        "PATH": os.environ.get("PATH", "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"),
        "HOME": os.environ.get("HOME", "/root"),
    }
    # Best effort: the command itself swallows errors (`|| true`) and
    # check=False keeps a non-zero exit status from raising.
    subprocess.run(
        "rm -rf /data-nvme/zerogpu-offload/* ~/.cache/huggingface/hub/tmp* 2>/dev/null || true",
        shell=True,
        env=_cleanup_env,
        check=False,
    )
except Exception as _e:
    print(f"[startup] offload cleanup skipped: {_e}")
# Checkpoint shared by the tokenizer and the model below.
model_id = "mistralai/Magistral-Small-2509"
tokenizer = AutoTokenizer.from_pretrained(model_id, tokenizer_type="mistral", use_fast=False)
# ZeroGPU emulates CUDA while the module is being imported; a real GPU is only
# attached inside @spaces.GPU functions. The ZeroGPU docs require the model to
# be placed on "cuda" at module level, because tensor packing happens at
# startup and a lazy move inside a @spaces.GPU function is much slower.
# https://huggingface.co/docs/hub/spaces-zerogpu#model-loading
model = Mistral3ForConditionalGeneration.from_pretrained(
    model_id,
    torch_dtype=torch.bfloat16,
    low_cpu_mem_usage=True,
)
# Move to the (emulated) GPU and switch to inference mode in one step.
model = model.to("cuda").eval()
# Earlier, disabled variant of the system prompt (the geolocation instructions
# now live in USER_INSTRUCTION below):
# SYSTEM_PROMPT_TEXT = (
# "You are a world-class geolocation expert. Given a street-view style image, "
# "think step by step about visual clues and infer approximate coordinates. "
# "When you conclude, output your answer inside [ANSWER]lat,lng[/ANSWER]."
# )
# System prompt sent to both players: asks for a [THINK]...[/THINK] draft
# followed by a self-contained response.
SYSTEM_PROMPT_TEXT = """First draft your thinking process (inner monologue) until you arrive at a response. Format your response using Markdown, and use LaTeX for any mathematical equations. Write both your thoughts and the response in the same language as the input.
Your thinking process must follow the template below:[THINK]Your thoughts or/and draft, like working through an exercise on scratch paper. Be as casual and as long as you want until you are confident to generate the response to the user.[/THINK]Here, provide a self-contained response."""
# User-turn text sent alongside the image; defines the [ANSWER]lat,lng[/ANSWER]
# output format the models are asked to follow.
USER_INSTRUCTION = """You are a world-class geolocation expert. Given a street-view style image, think step by step about visual clues and infer approximate coordinates.
When you conclude, output your final answer inside [ANSWER]lat,lng[/ANSWER].
Please analyze this image and provide coordinates in the required format."""
@spaces.GPU(duration=120)
def llm_decode_image_return_text(image_bytes: bytes) -> str:
    """Run the local model on one image and return the full generated text.

    Non-streaming counterpart of ``llm_stream_image_text``: the image is sent
    as a base64 JPEG data URL together with ``SYSTEM_PROMPT_TEXT`` and
    ``USER_INSTRUCTION``.

    Args:
        image_bytes: Raw image bytes (labelled as ``image/jpeg`` in the data URL).

    Returns:
        The decoded completion with the prompt tokens removed and a trailing
        EOS token stripped. On any failure, the string ``"[Error] <message>"``
        is returned instead of raising.
    """
    print(f"[llm] decode start. image_bytes={len(image_bytes)} bytes")
    import base64
    try:
        encoded_image = base64.b64encode(image_bytes).decode('utf-8')
        data_url = f"data:image/jpeg;base64,{encoded_image}"
        messages = [
            {"role": "system", "content": [{"type": "text", "text": SYSTEM_PROMPT_TEXT}]},
            {"role": "user", "content": [
                {"type": "text", "text": USER_INSTRUCTION},
                {"type": "image_url", "image_url": {"url": data_url}},
            ]},
        ]
        print(f"[llm] messages prepared. system+user with image_url length={len(data_url)}")
        tokenized = tokenizer.apply_chat_template(messages, return_dict=True)
        print(f"[llm] tokenized keys={list(tokenized.keys())}")
        input_ids = torch.tensor(tokenized.input_ids).unsqueeze(0)
        attention_mask = torch.tensor(tokenized.attention_mask).unsqueeze(0)
        print(f"[llm] input_ids shape={tuple(input_ids.shape)} attn_mask shape={tuple(attention_mask.shape)} device={model.device}")
        kwargs = {
            'input_ids': input_ids.to(model.device),
            'attention_mask': attention_mask.to(model.device),
            # Same budget as llm_stream_image_text. Without an explicit limit,
            # generate() falls back to the generation config's default length,
            # which is far too short for a [THINK]...[ANSWER] response.
            'max_new_tokens': 8192,
        }
        if 'pixel_values' in tokenized and len(tokenized.pixel_values) > 0:
            pixel_values = torch.tensor(tokenized.pixel_values[0], dtype=model.dtype).unsqueeze(0).to(model.device)
            image_sizes = torch.tensor(pixel_values.shape[-2:]).unsqueeze(0).to(model.device)
            kwargs.update({'pixel_values': pixel_values, 'image_sizes': image_sizes})
            print(f"[llm] pixel_values shape={tuple(pixel_values.shape)} image_sizes={tuple(image_sizes.shape)}")
        output = model.generate(**kwargs)[0]
        print(f"[llm] generate done. output length={len(output)}")
        # Keep only the newly generated tokens and drop a trailing EOS if present.
        prompt_len = len(tokenized.input_ids)
        end = -1 if output[-1] == tokenizer.eos_token_id else len(output)
        decoded = tokenizer.decode(output[prompt_len:end])
        print(f"[llm] decode done. text length={len(decoded)}")
        return decoded
    except Exception as e:
        print(f"[llm] decode failed: {e}")
        return f"[Error] {e}"
@spaces.GPU(duration=120)
def llm_stream_image_text(image_bytes: bytes):
    """Stream the local model's response for one image.

    Generation runs in a background thread and feeds a ``TextIteratorStreamer``;
    this generator yields the cumulative text after every decoded chunk.

    Args:
        image_bytes: Raw image bytes (labelled as ``image/jpeg`` in the data URL).

    Yields:
        The accumulated response text so far. Failures, including ones raised
        inside the generation thread, are reported as a final
        ``"[Error] <message>"`` item instead of being raised.
    """
    print(f"[llm-stream] start. image_bytes={len(image_bytes)} bytes")
    import base64
    try:
        encoded_image = base64.b64encode(image_bytes).decode('utf-8')
        data_url = f"data:image/jpeg;base64,{encoded_image}"
        messages = [
            {"role": "system", "content": [{"type": "text", "text": SYSTEM_PROMPT_TEXT}]},
            {"role": "user", "content": [
                {"type": "text", "text": USER_INSTRUCTION},
                {"type": "image_url", "image_url": {"url": data_url}},
            ]},
        ]
        tokenized = tokenizer.apply_chat_template(messages, return_dict=True)
        input_ids = torch.tensor(tokenized.input_ids).unsqueeze(0)
        attention_mask = torch.tensor(tokenized.attention_mask).unsqueeze(0)
        kwargs = {
            'input_ids': input_ids.to(model.device),
            'attention_mask': attention_mask.to(model.device),
            'max_new_tokens': 8192,
        }
        if 'pixel_values' in tokenized and len(tokenized.pixel_values) > 0:
            pixel_values = torch.tensor(tokenized.pixel_values[0], dtype=model.dtype).unsqueeze(0).to(model.device)
            image_sizes = torch.tensor(pixel_values.shape[-2:]).unsqueeze(0).to(model.device)
            kwargs.update({'pixel_values': pixel_values, 'image_sizes': image_sizes})
        streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=False)
        kwargs['streamer'] = streamer

        generation_errors = []

        def _run_generate():
            # An exception in this thread would otherwise vanish and leave the
            # consumer loop below blocked forever on the streamer's queue.
            try:
                model.generate(**kwargs)
            except Exception as exc:
                generation_errors.append(exc)
                # Push the stop signal so iteration over the streamer ends.
                streamer.text_queue.put(streamer.stop_signal)

        thread = threading.Thread(target=_run_generate, daemon=True)
        thread.start()
        acc = ""
        for new_text in streamer:
            acc += new_text
            yield acc
        thread.join()
        if generation_errors:
            yield f"[Error] {generation_errors[0]}"
    except Exception as e:
        yield f"[Error] {e}"
def mm35_stream_image_text(image_bytes: bytes):
    """Stream a response from Mistral Medium (MM3.5) via the Mistral hosted API.

    Yields the cumulative text after each new chunk, mirroring
    `llm_stream_image_text`'s contract so the two players are interchangeable.

    Args:
        image_bytes: Raw image bytes (sent as a base64 ``image/jpeg`` data URL).

    Yields:
        The accumulated response text so far. A missing API key, an HTTP
        status >= 400 or any exception is reported as a single
        ``"[Error] ..."`` item instead of being raised.
    """
    print(f"[mm35] start. image_bytes={len(image_bytes)} bytes model={MM35_MODEL_ID}")
    if not MISTRAL_API_KEY:
        yield "[Error] MISTRAL_API_KEY is not set on this Space. MM3.5 is disabled."
        return
    import base64
    answer_open = "[ANSWER]"
    answer_close = "[/ANSWER]"
    try:
        encoded_image = base64.b64encode(image_bytes).decode("utf-8")
        data_url = f"data:image/jpeg;base64,{encoded_image}"
        payload = {
            "model": MM35_MODEL_ID,
            "messages": [
                {"role": "system", "content": SYSTEM_PROMPT_TEXT},
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": USER_INSTRUCTION},
                        {"type": "image_url", "image_url": data_url},
                    ],
                },
            ],
            # Stop on the CLOSING tag. Stopping on "[ANSWER]" would end
            # generation before the coordinates are ever produced.
            "stop": [answer_close],
            "stream": True,
            "max_tokens": 4096,
            "temperature": 0.7,
            "top_p": 0.95,
        }
        headers = {
            "Authorization": f"Bearer {MISTRAL_API_KEY}",
            "Content-Type": "application/json",
            "Accept": "text/event-stream",
        }
        acc = ""
        finish_reason = None
        with requests.post(
            MISTRAL_API_URL,
            json=payload,
            headers=headers,
            stream=True,
            timeout=180,
        ) as resp:
            if resp.status_code >= 400:
                body = resp.text[:500]
                yield f"[Error] Mistral API {resp.status_code}: {body}"
                return
            # The event stream is UTF-8. Without this, requests falls back to
            # ISO-8859-1 for text/* responses lacking a charset and garbles
            # non-ASCII characters.
            resp.encoding = "utf-8"
            for raw_line in resp.iter_lines(decode_unicode=True):
                if not raw_line or not raw_line.startswith("data:"):
                    continue
                data_str = raw_line[5:].strip()
                if data_str == "[DONE]":
                    break
                try:
                    chunk = json.loads(data_str)
                except json.JSONDecodeError:
                    continue
                choices = chunk.get("choices") or []
                if not choices:
                    continue
                choice = choices[0]
                finish_reason = choice.get("finish_reason") or finish_reason
                delta = choice.get("delta") or {}
                content = delta.get("content")
                if not content:
                    continue
                if isinstance(content, list):
                    # Content may arrive as a list of parts; only text parts are used.
                    for part in content:
                        if isinstance(part, dict):
                            text_part = part.get("text") or ""
                            if text_part:
                                acc += text_part
                                yield acc
                else:
                    acc += content
                    yield acc
        # A stop sequence is typically not echoed back, so re-close an answer
        # that was opened but not closed when generation ended normally.
        last_open = acc.rfind(answer_open)
        if finish_reason == "stop" and last_open != -1 and answer_close not in acc[last_open:]:
            acc += answer_close
            yield acc
        print(f"[mm35] done. text length={len(acc)}")
    except Exception as e:
        yield f"[Error] {e}"
def pick_random_location(difficulty: str) -> Dict[str, float]:
    """Pick a random playable coordinate for ``difficulty``.

    One zone is drawn at random from ``zones[difficulty]``. If it is a
    ``'rectangle'`` zone, a uniformly random point inside its bounds is drawn
    and passed through ``_ensure_street_view_location``. When that yields
    nothing (or there is no usable zone), a random entry of ``LOCATIONS`` is
    used instead, preferring its Street View-adjusted position.

    Returns:
        A dict with ``'lat'`` and ``'lng'`` keys.
    """
    zone_pool = zones.get(difficulty, [])
    if zone_pool:
        zone = random.choice(zone_pool)
        if zone.get('type') == 'rectangle':
            bounds = zone['bounds']
            north, south = bounds['north'], bounds['south']
            east, west = bounds['east'], bounds['west']
            # A rectangle crossing the antimeridian has west > east: widen the
            # range past 180 and wrap the sampled longitude back afterwards.
            upper_lng = east + 360 if west > east else east
            lng = random.uniform(west, upper_lng)
            if lng > 180:
                lng -= 360
            lat = random.uniform(south, north)
            located = _ensure_street_view_location(lat, lng)
            if located:
                return located
    default_spot = random.choice(LOCATIONS)
    adjusted = _ensure_street_view_location(default_spot['lat'], default_spot['lng'])
    return adjusted if adjusted else default_spot
def street_view_image_url(lat: float, lng: float) -> str:
    """Return the Street View Static API image URL for ``(lat, lng)``.

    Without a configured ``GOOGLE_MAPS_API_KEY`` a placeholder image URL is
    returned so the UI does not show a blank image.
    """
    if GOOGLE_MAPS_API_KEY:
        return (
            f"https://maps.googleapis.com/maps/api/streetview?size=1200x800&location={lat},{lng}&fov=60&pitch=0&source=outdoor&key={GOOGLE_MAPS_API_KEY}"
        )
    # No key available: fall back to a generic placeholder picture.
    return "https://picsum.photos/1200/800"
def _has_street_view(lat: float, lng: float) -> bool:
    """Report whether the Street View metadata API has outdoor imagery at ``(lat, lng)``.

    Returns ``True`` without any request when no API key is configured.
    Otherwise returns ``True`` only if the metadata status is ``"OK"`` and the
    reported ``location_type`` is not ``"INDOOR"``. Any request or parsing
    error counts as "no coverage" (``False``).
    """
    if not GOOGLE_MAPS_API_KEY:
        return True
    try:
        response = requests.get(
            "https://maps.googleapis.com/maps/api/streetview/metadata",
            params={"location": f"{lat},{lng}", "source": "outdoor", "key": GOOGLE_MAPS_API_KEY},
            timeout=5,
        )
        response.raise_for_status()
        metadata = response.json()
        if metadata.get("status") != "OK":
            return False
        # A missing location_type is accepted; only an explicit INDOOR is rejected.
        return metadata.get("location_type") != "INDOOR"
    except Exception:
        return False
def _snap_to_nearest_road(lat: float, lng: float) -> Optional[Dict[str, float]]:
    """Snap ``(lat, lng)`` to the nearest road via the Roads ``nearestRoads`` endpoint.

    Returns:
        ``{"lat": ..., "lng": ...}`` for the first snapped point, or ``None``
        when no API key is configured, nothing was snapped, the response lacks
        coordinates, or the request fails (errors are deliberately swallowed).
    """
    if not GOOGLE_MAPS_API_KEY:
        return None
    try:
        response = requests.get(
            "https://roads.googleapis.com/v1/nearestRoads",
            params={"points": f"{lat},{lng}", "key": GOOGLE_MAPS_API_KEY},
            timeout=5,
        )
        response.raise_for_status()
        snapped_points = response.json().get("snappedPoints") or []
        if snapped_points:
            location = snapped_points[0].get("location") or {}
            if "latitude" in location and "longitude" in location:
                return {"lat": float(location["latitude"]), "lng": float(location["longitude"])}
    except Exception:
        # Best effort: a failed lookup simply means "no snapped point".
        pass
    return None
def _ensure_street_view_location(lat: float, lng: float) -> Optional[Dict[str, float]]:
    """Return a coordinate with confirmed Street View coverage, snapped near a road when possible.

    Candidates are tried in order: the road-snapped point (if any), the
    original point, and, only when snapping failed, sixteen diagonal offsets
    of +/-0.0005 and +/-0.001 degrees. The first candidate accepted by
    ``_has_street_view`` is returned; ``None`` if none qualifies. Without an
    API key the input coordinate is returned unchanged.
    """
    if not GOOGLE_MAPS_API_KEY:
        return {"lat": lat, "lng": lng}

    road_point = _snap_to_nearest_road(lat, lng)
    to_try: List[Dict[str, float]] = [road_point] if road_point else []
    to_try.append({"lat": lat, "lng": lng})
    if not road_point:
        # No road nearby: probe a small grid of jittered points around the input.
        offsets = (0.0005, -0.0005, 0.001, -0.001)
        to_try.extend(
            {"lat": lat + d_lat, "lng": lng + d_lng}
            for d_lat in offsets
            for d_lng in offsets
        )

    seen = set()
    for point in to_try:
        # De-duplicate at 6 decimal places to avoid repeated metadata calls.
        fingerprint = (round(point["lat"], 6), round(point["lng"], 6))
        if fingerprint not in seen:
            seen.add(fingerprint)
            if _has_street_view(point["lat"], point["lng"]):
                return point
    return None
def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance in kilometres between two points.

    Uses the haversine formula on a sphere of radius 6371 km.

    Args:
        lat1, lon1: Latitude and longitude of the first point, in degrees.
        lat2, lon2: Latitude and longitude of the second point, in degrees.

    Returns:
        The distance in kilometres (0.0 for identical points).
    """
    from math import radians, cos, sin, asin, sqrt
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Rounding can push `a` marginally outside [0, 1] for (near-)antipodal
    # points, which would make asin() raise "math domain error".
    a = min(1.0, max(0.0, a))
    return 2 * asin(sqrt(a)) * 6371
def score_from_distance_km(distance_km: float) -> float:
    """Convert a distance into a score: 5000 minus one point per kilometre, floored at 0."""
    remaining = 5000.0 - distance_km
    if remaining > 0.0:
        return remaining
    return 0.0
def build_street_html(image_url: str) -> str:
    """Return the street-view HTML snippet with ``__IMG_URL__`` replaced by ``image_url``."""
    # NOTE(review): as visible here the template contains only a newline and
    # no `__IMG_URL__` placeholder, so the result does not depend on
    # `image_url` — the markup looks like it was lost; confirm against the
    # original file.
    base = """
"""
    return base.replace('__IMG_URL__', image_url)
def gr_start_game(difficulty: str, username: str, request: gr.Request):
    """Start a new three-round game for ``username`` and return the initial UI values.

    Three locations are drawn with ``pick_random_location``; the resulting
    session (game id, difficulty, rounds, UTC date) replaces any previous
    entry for this user in ``user_sessions``.

    Returns:
        A 6-tuple: the list of rounds, the current round index (``0``), the
        first round's id, the street-view HTML for the first round, and two
        empty strings.
    """
    date_str = datetime.now(timezone.utc).date().isoformat()
    game_id = str(uuid.uuid4())  # unique id for this game

    def _new_round() -> Dict[str, Any]:
        # Draw one location and wrap it in a fresh, unscored round record.
        spot = pick_random_location(difficulty)
        return {
            'id': generate_id(),
            'lat': spot['lat'],
            'lng': spot['lng'],
            'image_url': street_view_image_url(spot['lat'], spot['lng']),
            'human_guess': None,
            'ai_guess': None,
            'human_score': 0.0,
            'ai_score': 0.0,
        }

    rounds: List[Dict[str, Any]] = [_new_round() for _ in range(3)]
    user_sessions[username] = {
        'game_id': game_id,
        'difficulty': difficulty,
        'rounds': rounds,
        'total_score': 0.0,
        'completed': False,
        'date': date_str,
    }
    first_round = rounds[0]
    return rounds, 0, first_round['id'], build_street_html(first_round['image_url']), "", ""
def get_round(username: str, round_id: str) -> Optional[Dict[str, Any]]:
    """Look up a round by id in ``username``'s session.

    Returns:
        The matching round dict, or ``None`` when the user has no session or
        no round carries ``round_id``.
    """
    session = user_sessions.get(username)
    if session:
        return next(
            (entry for entry in session['rounds'] if entry['id'] == round_id),
            None,
        )
    return None
def gr_submit_guess(round_id: str, lat: float, lng: float, username: str, request: gr.Request):
rnd = get_round(username, round_id)
if not rnd:
return "", "Round not found", gr.update(), gr.update(), gr.update()
distance_km = haversine_km(rnd['lat'], rnd['lng'], float(lat), float(lng))
score = score_from_distance_km(distance_km)
rnd['human_guess'] = {'lat': float(lat), 'lng': float(lng)}
rnd['human_score'] = score
rnd['human_distance_km'] = float(distance_km)
result_text = f"Your guess was {distance_km:.2f} km away. You scored {score:.0f} points."
scoreboard_html = (
f"
"""
popup_html = (
popup_html_template
.replace('__SCOREBOARD__', scoreboard_html)
.replace('__RND_LAT__', str(rnd['lat']))
.replace('__RND_LNG__', str(rnd['lng']))
.replace('__H_LAT__', str(float(lat)))
.replace('__H_LNG__', str(float(lng)))
.replace('__AI_LAT__', ai_lat_str)
.replace('__AI_LNG__', ai_lng_str)
.replace('__MM_LAT__', mm_lat_str)
.replace('__MM_LNG__', mm_lng_str)
.replace('__GMAPS_KEY__', GOOGLE_MAPS_API_KEY or '')
)
yield popup_html, (txt + "\n\n" + (mag_text or "")), (mm_text or "")
# Wire the validate button to on_validate: it receives the round id, the
# lat/lng boxes and the user profile, and writes to the result popup and the
# two AI text outputs.
validate_btn.click(
on_validate,
inputs=[round_id_box, lat_box, lng_box, user_profile],
outputs=[popup_html, ai_chat, mm35_chat],
# We already stream live tokens into the two AI textboxes, so Gradio's
# default full-screen "pending" overlay is just noise that makes the UI
# look frozen.
show_progress="hidden",
)
def on_next(r_state: list, idx: int, profile: dict, request: gr.Request):
    """Advance to the next round, or finish the game and show the final summary.

    Args:
        r_state: Rounds state from the UI (not read here; the rounds come from
            the user's entry in ``user_sessions``).
        idx: Index of the round currently displayed.
        profile: User profile dict; its ``"name"`` is the session key.
        request: Gradio request object (unused).

    Returns:
        A 6-tuple matching the click handler's outputs: the new round index
        followed by updates for the street view, the round id box, the popup
        and the two AI text outputs. Without a profile or a session, the index
        is returned unchanged with no-op updates. After the last round the
        street-view slot receives the final score summary and the others are
        cleared.
    """
    if not profile:
        return idx, gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
    username = profile["name"]
    sess = user_sessions.get(username)
    if sess is None:
        # No game was started for this user: previously this crashed with
        # AttributeError on `sess.get`; leave the UI untouched instead.
        return idx, gr.update(), gr.update(), gr.update(), gr.update(), gr.update()

    idx += 1
    rounds = sess.get('rounds', [])
    if idx >= len(rounds):
        total_human = sum(float(r.get('human_score', 0.0)) for r in rounds)
        total_ai = sum(float(r.get('ai_score', 0.0)) for r in rounds)
        total_mm35 = sum(float(r.get('mm35_score', 0.0)) for r in rounds)
        game_id = sess.get('game_id', '')
        # Final scores (rounds already recorded incrementally)
        try:
            data_manager.update_game_record(
                username,
                game_id,
                final_score=total_human,
                final_ai_score=total_ai,
                final_mm35_score=total_mm35,
            )
        except TypeError:
            # Backwards-compat: older data_manager.update_game_record without mm35 arg.
            data_manager.update_game_record(
                username,
                game_id,
                final_score=total_human,
                final_ai_score=total_ai,
            )
        scores = {
            "You": total_human,
            "Magistral": total_ai,
            MM35_DISPLAY_NAME: total_mm35,
        }
        top_score = max(scores.values()) if scores else 0
        # Only a strictly positive top score can win; all-zero games have no winner.
        winners = [name for name, sc in scores.items() if sc == top_score and top_score > 0]
        if not winners:
            winner_message = "Everyone scored 0 — try a harder round!"
        elif len(winners) == 1:
            w = winners[0]
            winner_message = "Congratulations, you won!" if w == "You" else f"{w} won this round."
        else:
            winner_message = "It's a tie between " + ", ".join(winners) + "!"
        summary_html = f"""
Game Over!
Final scores:
You: {total_human:.0f}
Magistral: {total_ai:.0f}
{MM35_DISPLAY_NAME}: {total_mm35:.0f}
{winner_message}
"""
        return (
            idx,
            gr.update(value=summary_html),
            gr.update(value=""),
            gr.update(value=""),
            gr.update(value=""),
            gr.update(value=""),
        )

    r = rounds[idx]
    s_html = build_street_html(r['image_url'])
    return (
        idx,
        gr.update(value=s_html),
        gr.update(value=r['id']),
        gr.update(value=""),
        gr.update(value=""),
        gr.update(value=""),
    )
# Wire the next button to on_next: it moves to the following round (or shows
# the final summary) and resets the popup and both AI text outputs.
next_btn.click(
on_next,
inputs=[rounds_state, idx_state, user_profile],
outputs=[idx_state, street_html, round_id_box, popup_html, ai_chat, mm35_chat],
show_progress="hidden",
)
# Inject boot JS using load(js=callable) compatible format
# (the Python callback is a no-op; only the `js` argument matters here).
demo.load(fn=lambda: None, inputs=None, outputs=None, js=APP_BOOT_JS)
if __name__ == "__main__":
    # With SSR enabled (Node proxy at :7861) the UI looks like it is still
    # "loading" until the first interaction triggers React hydration, which
    # can take several seconds on a heavy multimodal Space. Turning SSR off
    # costs about 200ms of time-to-first-paint but makes every widget
    # interactive immediately.
    queued_app = demo.queue()
    queued_app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        ssr_mode=False,
    )