"""MatchDay — HF Space entry point (gradio.Server mode, N1 / Off-Brand). The Space runs THIS file. It is a `gradio.Server` app: a fully custom ``index.html`` frontend is served at ``/`` while ``@app.api("plan_trip")`` is an async generator that streams N12-typed JSON events through Gradio's queue (SSE), so the frontend updates live as Nemotron decides → Python scores → Nemotron explains. This is the Off-Brand path: a bespoke UI powered by Gradio's backend (queuing, concurrency, Spaces hosting) — not stock Gradio components. Brain + Hands: Nemotron (on Modal) never calls an API or names a price; Python executes every call and scores every value. Every figure carries provenance. ``matchday/app.py`` is a compatibility shim that imports and launches this same app, so ``python3 -m matchday.app`` runs the identical non-decorative path. Reference patterns (3-codebase study, see MATCHDAY_UNCONSTRAINED_PLAN.md): - N1 gradio.Server custom-frontend architecture (Off-Brand badge): https://huggingface.co/blog/introducing-gradio-server ("Why @app.api()…"). - N35 preflight gate (fail-fast on missing SerpApi key): Claude Code utils/preflightChecks.tsx:1-60. """ from __future__ import annotations import asyncio import json import logging import os import sys import time from datetime import date from pathlib import Path # Repo-root importability when the Space runs this file directly. 
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from fastapi.responses import HTMLResponse  # noqa: E402
from gradio import Server  # noqa: E402

from matchday.agent import MatchDayAgent  # noqa: E402
from matchday.agent_loop import run_agent_loop  # noqa: E402
from matchday.intent import parse_intent  # noqa: E402
from matchday.models import TripRequest  # noqa: E402
from matchday.prompts import EXPLANATION_HINT  # noqa: E402
from matchday.render import render_full  # noqa: E402
from matchday.trip_tool import build_trip_packages, format_for_nemotron  # noqa: E402

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Nemotron primary, deterministic fallback. Flip to False to force the
# deterministic path (fast demo / Modal-down insurance).
USE_AGENT = True

HERE = Path(__file__).parent
INDEX_HTML = HERE / "index.html"

# Strong references to fire-and-forget tasks. The event loop itself only keeps
# weak references, so a task nobody else holds can be garbage-collected before
# it finishes. Entries remove themselves via a done-callback.
_BACKGROUND_TASKS: set[asyncio.Task] = set()


async def _warm_nemotron() -> None:
    """Best-effort warm generate on Space startup so the FIRST user query isn't
    stuck behind a Modal cold start (~2 min warm from the weight cache).

    Runs as a fire-and-forget background task; never blocks startup, never
    raises: any ``Exception`` (including the 240 s ``wait_for`` timeout) is
    logged at INFO level and swallowed.
    """
    try:
        agent = MatchDayAgent()
        await asyncio.wait_for(
            agent.run([{"role": "user", "content": "warmup ping"}]), timeout=240
        )
        logger.info("startup warmup ping completed — Nemotron container is hot")
    except Exception as exc:  # noqa: BLE001 — best-effort, must not break boot
        logger.info("startup warmup ping ended early (%s)", repr(exc)[:80])


async def _startup_warmup() -> None:
    """Server startup hook — schedule the warmup without blocking boot.

    The task handle is parked in ``_BACKGROUND_TASKS`` until it completes so
    the warmup cannot be garbage-collected while still running.
    """
    task = asyncio.create_task(_warm_nemotron())
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)


app = Server(on_startup=[_startup_warmup])


def _ev(**payload) -> str:
    """Serialize a typed stream event (N12) as a JSON string for the SSE stream.

    Non-ASCII characters are emitted as-is (``ensure_ascii=False``).
    """
    return json.dumps(payload, ensure_ascii=False)


async def _pulse(coro, holder, message, interval: int = 9):
    """Run ``coro`` to completion, yielding a commentary heartbeat every
    ``interval`` seconds (carrying elapsed seconds) so the SSE stream is never
    silent during a long Modal cold-start or SerpApi phase.

    Stashes the result in ``holder['result']``; re-raises if ``coro`` raised.
    If this generator is closed or cancelled before ``coro`` finishes, the
    wrapped task is cancelled too instead of being left running with nobody
    waiting for its result.

    Usage::

        h = {}
        async for beat in _pulse(coro, h, msg):
            yield beat
        value = h["result"]
    """
    task = asyncio.ensure_future(coro)
    start = time.monotonic()
    try:
        while True:
            # wait() returns on timeout without cancelling the task, so the
            # same task is simply polled again on the next iteration.
            done, _ = await asyncio.wait({task}, timeout=interval)
            if task in done:
                holder["result"] = task.result()  # re-raises coro's exception
                return
            yield _ev(
                type="commentary",
                text=f"{message} ({int(time.monotonic() - start)}s)",
            )
    finally:
        # Reached with the task still pending only when the consumer stopped
        # iterating or this coroutine was cancelled: don't orphan the work.
        if not task.done():
            task.cancel()


def _notice_status(result, *keywords: str) -> str:
    """Map a data category to ``done`` | ``fallback`` from REAL degradation notices.

    Honest per-category progress: if ``build_trip_packages`` reported a category
    as unavailable, that step is ``fallback``; otherwise ``done``. Tied to the
    real dispatch outcome — never a cosmetic timer.

    ``result.degradation_notices`` may be falsy (treated as "no notices"), a
    single string, or an iterable of notices; matching is a case-insensitive
    substring test of each keyword against the space-joined notices.
    """
    notices = result.degradation_notices or ()
    if isinstance(notices, str):
        # A bare string must be matched whole, not joined character by character.
        notices = (notices,)
    blob = " ".join(str(notice) for notice in notices).lower()
    return "fallback" if any(k in blob for k in keywords) else "done"


# Cached per-boot preflight (N35). Fail-fast ONLY on genuinely-doomed config
# (missing SerpApi key — build_trip_packages cannot fetch live flights/hotels).
# Modal cold-start is NOT a hard failure: it streams via _pulse heartbeats and
# the loop degrades to the deterministic parser, so we don't gate on it.
_PREFLIGHT_OK: bool | None = None


def _preflight() -> tuple[bool, str]:
    """Return (ok, reason). ``reason`` is empty when ok. Cached positive.

    Only a successful check is cached in ``_PREFLIGHT_OK``; while the
    ``SERPAPI_API_KEY`` environment variable is missing or empty the check is
    repeated on every call.
    """
    global _PREFLIGHT_OK
    if _PREFLIGHT_OK:
        return True, ""
    if not os.environ.get("SERPAPI_API_KEY"):
        return False, (
            "SerpApi key is not set on this Space — live flight & hotel search is "
            "unavailable. Add SERPAPI_API_KEY in Settings → Secrets, then restart."
        )
    _PREFLIGHT_OK = True
    return True, ""


async def _agent_explain(agent, user_text: str, trip: TripRequest | None, result) -> str:
    """Round 2 — Nemotron compares the packages. Best-effort ('' on failure).

    Replays the conversation as: user request → a synthetic assistant
    ``build_trip_packages`` tool call → the tool output → ``EXPLANATION_HINT``,
    then asks the agent for plain text (``tools=[]``).

    Building the conversation happens inside the ``try`` as well, so a missing
    ``trip`` or an unserializable ``result`` yields ``''`` instead of raising,
    which is what the best-effort contract promises.

    Returns:
        The stripped ``text`` field of the agent reply, or ``''`` on any failure.
    """
    try:
        args_json = json.dumps(trip.model_dump(mode="json"))
        convo = [
            {"role": "user", "content": user_text},
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [{
                    "id": "call_build",
                    "type": "function",
                    "function": {"name": "build_trip_packages", "arguments": args_json},
                }],
            },
            {
                "role": "tool",
                "tool_call_id": "call_build",
                "name": "build_trip_packages",
                "content": format_for_nemotron(result),
            },
            {"role": "user", "content": EXPLANATION_HINT},
        ]
        r2 = await agent.run(convo, tools=[])  # no tools → Nemotron must write text
        return (r2.get("text") or "").strip()
    except Exception as exc:
        logger.warning("explanation round failed: %s", exc)
        return ""


@app.api(name="plan_trip", concurrency_limit=4, stream_every=0.5)
async def plan_trip(user_text: str) -> str:
    """Stream the agentic trip build as typed events (N12 + N10).

    Yields: commentary (progress beats, sent immediately) → greenlight (parsed
    trip) | clarify | error → result (full cards+map+timeline render + Nemotron's
    explanation). Falls back to the deterministic parser if the agent is
    unavailable or hedges.

    Every yielded item is a JSON string produced by ``_ev``.
    """
    ok, why = _preflight()  # N35 — fail-fast on doomed config (missing key)
    if not ok:
        yield _ev(type="error", text=f"⚠️ {why}")
        return

    yield _ev(type="commentary", text="Reading your trip request…")
    yield _ev(type="progress", step="read", status="done", text="Read your request")
    yield _ev(type="progress", step="extract", status="running", text="Understanding your trip")

    agent = None
    if USE_AGENT:
        try:
            agent = MatchDayAgent()
        except Exception as exc:
            logger.warning("agent init failed (%s); deterministic path.", exc)

    # ── Smart path: the bounded agent loop (K1). Nemotron UNDERSTANDS the
    # request, may GROUND itself with web_search (I6), may CLARIFY to capture
    # intent (P7), and calls build_trip_packages when ready. The loop validates
    # args, dedups, and self-corrects one malformed call (A4). No more bypass.
    messages: list[dict] = [{"role": "user", "content": user_text}]
    trip: TripRequest | None = None
    result = None  # TripPackageResult produced inside the loop's build call
    agent_text = ""  # a clarify question or direct answer from the Brain

    if agent is not None:
        yield _ev(type="commentary", text="🧠 Nemotron is understanding your request…")
        for attempt in range(3):  # cap grounding rounds (web_search → build)
            h = {}
            try:
                async for beat in _pulse(
                    run_agent_loop(agent, messages),
                    h,
                    "🧠 Nemotron is understanding your request & choosing tools",
                ):
                    yield beat
                res = h.get("result")
            except Exception as exc:
                logger.warning("agent loop attempt %d failed (%s).", attempt, exc)
                res = None

            if res is None:
                break

            if res.type == "tool_called" and res.tool == "build_trip_packages":
                result = res.result.get("full_result")
                trip = res.result.get("trip")
                break

            if res.type == "tool_called" and res.tool == "web_search":
                # Brain grounded itself — thread the result back so it can build.
                tcid = f"call_ws_{attempt}"
                messages.append({
                    "role": "assistant",
                    "content": "",
                    "tool_calls": [{
                        "id": tcid,
                        "type": "function",
                        "function": {
                            "name": "web_search",
                            "arguments": json.dumps(res.result.get("query") or {}),
                        },
                    }],
                })
                messages.append({
                    "role": "tool",
                    "tool_call_id": tcid,
                    "name": "web_search",
                    # Serialized search result, capped at 1200 characters.
                    "content": json.dumps(res.result, ensure_ascii=False)[:1200],
                })
                yield _ev(
                    type="commentary",
                    text="🔎 Grounded with a web search — now building your packages…",
                )
                continue

            if res.type == "final_answer":
                agent_text = res.text or ""
                break

            # fallback_to_deterministic → EXPLICIT + user-visible degrade. Never
            # silently swap in the deterministic path. Most commonly this is a
            # Modal cold-start timeout (see the agent_loop reason) — tell the user
            # honestly so the wait / fast-mode result is understood, not hidden.
            if res.type == "fallback_to_deterministic":
                yield _ev(
                    type="commentary",
                    text="🌡️ Nemotron is warming up on Modal (cold start) — "
                         "building your packages in fast mode now, then I'll "
                         "still compare them live.",
                )
            # Only the web_search branch changes ``messages``; for every other
            # outcome a retry would replay the identical prompt, so stop here.
            break

    # ── Deterministic fallback (K3): parse intent + build directly. Used when
    # the agent is unavailable, hedged to a non-build answer, or the loop failed.
    if result is None and not agent_text:
        parsed = parse_intent(user_text)
        trip = parsed.trip_request
        if trip is not None:
            yield _ev(type="greenlight", text=trip.summary())
            yield _ev(
                type="commentary",
                text="✈️ Scanning airlines · 🏨 Finding hotels near BC Place · 🌤️ Checking the match-day forecast…",
            )
            yield _ev(type="progress", step="flights", status="running")
            yield _ev(type="progress", step="hotels", status="running")
            yield _ev(type="progress", step="weather", status="running")
            yield _ev(type="progress", step="nearby", status="running")
            hb = {}
            try:
                async for beat in _pulse(
                    build_trip_packages(trip),
                    hb,
                    "✈️ Scanning airlines · 🏨 hotels near BC Place · 🌤️ weather",
                ):
                    yield beat
                result = hb["result"]
            except Exception as exc:
                # The client gets the message; keep the traceback server-side too.
                logger.exception("deterministic package build failed")
                yield _ev(type="error", text=f"⚠️ {exc}")
                return
        else:
            yield _ev(type="progress", step="ready", status="fallback", text="Need a detail from you")
            yield _ev(
                type="clarify",
                text=parsed.question or "Tell me where you're flying from and which match you want to see.",
            )
            return

    # Clarify / direct answer from the Brain (no packages to show).
    if result is None:
        yield _ev(type="progress", step="ready", status="fallback", text="Need a detail from you")
        yield _ev(type="clarify", text=agent_text)
        return

    # ── Honest per-category progress from the REAL dispatch outcome (N10/I1):
    # each data step is done/fallback according to build_trip_packages' own
    # degradation notices — never a cosmetic timer.
    if trip is not None:
        yield _ev(type="progress", step="extract", status="done", text="Trip details captured")
    yield _ev(type="progress", step="flights", status=_notice_status(result, "flight"))
    yield _ev(type="progress", step="hotels", status=_notice_status(result, "hotels unavailable", "hotels,"))
    yield _ev(type="progress", step="weather", status=_notice_status(result, "weather"))
    yield _ev(type="progress", step="nearby", status=_notice_status(result, "amenities", "nearby"))
    yield _ev(type="progress", step="score", status="done" if result.packages else "fallback")
    yield _ev(type="progress", step="itinerary", status="running", text="Building itinerary")
    yield _ev(type="progress", step="links", status="running", text="Preparing links")

    # ── Render. greenlight confirms the captured intent just before the packages.
    # (On the deterministic path this repeats the greenlight already sent above.)
    if trip is not None:
        yield _ev(type="greenlight", text=trip.summary())
    # NOTE(review): this beat is sent even when ``agent`` is None and no
    # comparison round will run — confirm that is intended.
    yield _ev(type="commentary", text="🗺️ Nemotron is comparing your 3 packages…")
    explanation = ""
    if agent is not None:
        he = {}
        try:
            async for beat in _pulse(
                _agent_explain(agent, user_text, trip, result),
                he,
                "🗺️ Nemotron is comparing your 3 packages…",
            ):
                yield beat
            explanation = he["result"]
        except Exception as exc:
            logger.warning("explanation round failed (%s).", exc)
            explanation = ""

    # Final: the full Layla-competitive render (status + cards + map + timeline).
    # leaflet_preloaded=True → the frontend already loaded Leaflet in <head>; the
    # map's inline init script is re-run after injection (see index.html).
    yield _ev(type="progress", step="itinerary", status="done")
    yield _ev(type="progress", step="links", status="done")
    yield _ev(type="progress", step="ready", status="done", text="Your packages are ready")
    yield _ev(
        type="result",
        html=render_full(result, trip, leaflet_preloaded=True),
        explanation=explanation,
    )


@app.get("/", response_class=HTMLResponse)
async def homepage():
    """Serve the custom frontend: the UTF-8 contents of ``index.html`` at ``/``."""
    return INDEX_HTML.read_text(encoding="utf-8")


if __name__ == "__main__":
    # Listen on $PORT when set, otherwise on 7860.
    app.launch(server_port=int(os.environ.get("PORT", "7860")), show_error=True)