# antigravity_proxy.py - Antigravity Cloud Code proxy integration # Routes requests through the antigravity-claude-proxy (github.com/badrisnarayanan/antigravity-claude-proxy) # which exposes an Anthropic Messages API backed by Google Cloud Code. # import webbrowser # Usage: prefix models with 'antigravity/' (e.g., antigravity/claude-sonnet-4-5, antigravity/gemini-3-flash) """ Antigravity Proxy adapter for Glossarion. The antigravity-claude-proxy runs as a local Node.js server (default: http://localhost:8080) and exposes an Anthropic-compatible Messages API backed by Google Cloud Code. Supported models (via the proxy): - Claude: claude-sonnet-4-5, claude-sonnet-4-5-thinking, claude-opus-4-6-thinking - Gemini: gemini-3-flash, gemini-3.1-pro-high, gemini-3.1-pro-low Prerequisites: 1. Install the proxy: npm install -g antigravity-claude-proxy 2. Start the proxy: antigravity-claude-proxy start (or: npx antigravity-claude-proxy@latest start) 3. Link account: Open http://localhost:8080 and add your Google account """ import os import sys import json import time import logging import shutil import subprocess import threading from typing import Optional, Dict, Any, List import requests logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- DEFAULT_PROXY_URL = "http://localhost:8080" MESSAGES_ENDPOINT = "/v1/messages" HEALTH_ENDPOINT = "/health" # The proxy accepts any value as auth token (it uses its own Google OAuth) DUMMY_AUTH_TOKEN = "antigravity-proxy" # Module-level cancellation flag _cancel_event = threading.Event() # Module-level proxy subprocess tracking _proxy_process: Optional[subprocess.Popen] = None _proxy_launch_lock = threading.Lock() # Auth browser tracking — only open the browser once per session _auth_browser_opened = False _auth_browser_lock = threading.Lock() def _open_auth_browser_once(proxy_url: str, log_fn=None) -> bool: """Open the proxy auth URL in the browser, but only once per session. Returns True if the browser was opened (first call), False if already opened. """ global _auth_browser_opened with _auth_browser_lock: if _auth_browser_opened: return False _auth_browser_opened = True _log = log_fn or (lambda msg: None) _log(f"🔐 Antigravity: Authentication required – opening {proxy_url} in your browser...") _log(f" Please link your Google account. Glossarion will continue automatically once done.") try: webbrowser.open(proxy_url) except Exception: pass return True def _wait_for_auth( url: str, payload: dict, headers: dict, proxy_url: str, log_fn=None, max_wait: int = 120, poll_interval: int = 5, stream: bool = False, ): """Open browser once and poll until authentication succeeds or timeout. Returns the successful requests.Response, or None if timed out. """ _open_auth_browser_once(proxy_url, log_fn) _log = log_fn or (lambda msg: None) elapsed = 0 while elapsed < max_wait: time.sleep(poll_interval) elapsed += poll_interval if _cancel_event.is_set(): return None _log(f"⏳ Waiting for authentication... ({elapsed}s / {max_wait}s)") try: retry_resp = requests.post( url, json=payload, headers=headers, timeout=30, stream=stream ) if retry_resp.status_code not in (401, 403): return retry_resp except Exception: continue return None def cancel_stream(): """Signal any active Antigravity proxy stream to abort.""" _cancel_event.set() def reset_cancel(): """Clear the cancellation flag before a new request.""" _cancel_event.clear() def is_cancelled() -> bool: return _cancel_event.is_set() # --------------------------------------------------------------------------- # Proxy URL resolution # --------------------------------------------------------------------------- def get_proxy_url() -> str: """Get the Antigravity proxy URL from env or default.""" return os.environ.get("ANTIGRAVITY_PROXY_URL", DEFAULT_PROXY_URL).rstrip("/") # --------------------------------------------------------------------------- # Health check # --------------------------------------------------------------------------- def check_proxy_health() -> Dict[str, Any]: """Check if the Antigravity proxy is running and healthy. Returns a dict with 'healthy' (bool) and optional 'details'. """ try: url = f"{get_proxy_url()}{HEALTH_ENDPOINT}" resp = requests.get(url, timeout=5) if resp.status_code == 200: data = resp.json() if resp.headers.get("content-type", "").startswith("application/json") else {} return {"healthy": True, "details": data} return {"healthy": False, "error": f"HTTP {resp.status_code}"} except requests.ConnectionError: return {"healthy": False, "error": "Connection refused – is the antigravity-claude-proxy running?"} except Exception as exc: return {"healthy": False, "error": str(exc)} # --------------------------------------------------------------------------- # Auto-launch proxy # --------------------------------------------------------------------------- def _find_npx() -> Optional[str]: """Locate the npx executable on PATH or common install locations.""" # Try PATH first npx = shutil.which("npx") if npx: return npx # On Windows, check common Node.js install locations if sys.platform == "win32": candidates = [] for env_var in ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA", "APPDATA"): base = os.environ.get(env_var, "") if base: candidates.append(os.path.join(base, "nodejs", "npx.cmd")) candidates.append(os.path.join(base, "fnm", "node-versions")) # fnm # nvm-windows nvm_home = os.environ.get("NVM_HOME", "") if nvm_home: nvm_symlink = os.environ.get("NVM_SYMLINK", os.path.join(nvm_home, "..","nodejs")) candidates.append(os.path.join(nvm_symlink, "npx.cmd")) # Volta volta_home = os.environ.get("VOLTA_HOME", "") if volta_home: candidates.append(os.path.join(volta_home, "bin", "npx.cmd")) # Common default paths candidates.extend([ os.path.expandvars(r"%PROGRAMFILES%\nodejs\npx.cmd"), os.path.expandvars(r"%APPDATA%\npm\npx.cmd"), ]) for path in candidates: if os.path.isfile(path): return path return None def _ensure_proxy_config(): """Ensure the proxy config disables API key auth. The antigravity-claude-proxy validates an ``apiKey`` on every ``/v1/*`` request. Glossarion talks to the proxy on localhost, so there is no need for this gate. We write ``{"apiKey": ""}`` (skip validation) into the proxy's config file so that any dummy token we send is accepted. Existing settings in the file are preserved; only ``apiKey`` is touched. """ try: config_dir = os.path.join(os.path.expanduser("~"), ".config", "antigravity-proxy") os.makedirs(config_dir, exist_ok=True) config_path = os.path.join(config_dir, "config.json") # Read existing config if present, otherwise start fresh existing: Dict[str, Any] = {} if os.path.isfile(config_path): try: with open(config_path, "r", encoding="utf-8") as f: existing = json.loads(f.read()) except Exception: existing = {} # Only write if apiKey is not already empty if existing.get("apiKey", None) != "": existing["apiKey"] = "" with open(config_path, "w", encoding="utf-8") as f: f.write(json.dumps(existing, indent=2)) except Exception: pass # Non-critical – proxy may still work without this def ensure_proxy_running(log_fn=None) -> Dict[str, Any]: """Ensure the Antigravity proxy is running, auto-launching if needed. 1. Checks health – if already running, returns immediately. 2. Finds npx on PATH (or common Node.js install locations). 3. Launches `npx -y antigravity-claude-proxy@latest start` in background. 4. Waits up to 20s for the proxy to become healthy. Returns dict with 'running' (bool), 'auto_launched' (bool), and optional 'error'. """ global _proxy_process _log = log_fn or (lambda msg: None) # Ensure proxy config disables API key auth (localhost doesn't need it) _ensure_proxy_config() # Already running? health = check_proxy_health() if health.get("healthy"): return {"running": True, "auto_launched": False} with _proxy_launch_lock: # Double-check after acquiring lock (another thread may have launched it) health = check_proxy_health() if health.get("healthy"): return {"running": True, "auto_launched": False} # If we already launched a process, check if it's still alive if _proxy_process is not None: if _proxy_process.poll() is None: # Process is still alive but not healthy yet – wait a bit _log("🌀 Antigravity proxy process is running, waiting for it to become healthy...") for _ in range(10): time.sleep(2) health = check_proxy_health() if health.get("healthy"): return {"running": True, "auto_launched": True} return { "running": False, "auto_launched": True, "error": "Proxy was launched but did not become healthy within 20s." } else: # Process exited – clear it so we can try again _proxy_process = None # Find npx npx_path = _find_npx() if not npx_path: return { "running": False, "auto_launched": False, "error": ( "Node.js (npx) is not installed or not on PATH.\n" "Install Node.js from https://nodejs.org/ then restart Glossarion,\n" "or manually run: npx -y antigravity-claude-proxy@latest start" ) } # Launch the proxy as a detached background process _log("🚀 Auto-launching Antigravity proxy...") try: # Build platform-appropriate launch args kwargs: Dict[str, Any] = { "stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL, "stdin": subprocess.DEVNULL, } # Ensure node.exe's directory is on PATH for the subprocess # (npx.cmd invokes "node" and needs it resolvable) npx_dir = os.path.dirname(npx_path) env = os.environ.copy() if npx_dir not in env.get("PATH", ""): env["PATH"] = npx_dir + os.pathsep + env.get("PATH", "") kwargs["env"] = env if sys.platform == "win32": # CREATE_NEW_PROCESS_GROUP + DETACHED_PROCESS so it survives app close CREATE_NEW_PROCESS_GROUP = 0x00000200 DETACHED_PROCESS = 0x00000008 kwargs["creationflags"] = CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS else: kwargs["start_new_session"] = True cmd = [npx_path, "-y", "antigravity-claude-proxy@latest", "start"] _proxy_process = subprocess.Popen(cmd, **kwargs) _log(f"🌀 Proxy process started (PID {_proxy_process.pid}), waiting for it to become healthy...") except Exception as exc: return { "running": False, "auto_launched": False, "error": f"Failed to launch proxy: {exc}" } # Wait for it to become healthy (up to 20s) for attempt in range(20): time.sleep(1) # Check the process hasn't crashed if _proxy_process.poll() is not None: _proxy_process = None return { "running": False, "auto_launched": True, "error": ( "Proxy process exited immediately. " "Try running manually: npx -y antigravity-claude-proxy@latest start" ) } health = check_proxy_health() if health.get("healthy"): _log("✅ Antigravity proxy is now running!") return {"running": True, "auto_launched": True} return { "running": False, "auto_launched": True, "error": "Proxy launched but did not become healthy within 20s. Check the proxy logs." } # --------------------------------------------------------------------------- # Message format conversion # --------------------------------------------------------------------------- def _convert_messages_to_anthropic(messages: List[Dict]) -> tuple: """Convert OpenAI-style messages to Anthropic Messages API format. Returns (system_prompt, anthropic_messages). """ system_prompt = "" anthropic_messages = [] for msg in messages: role = msg.get("role", "user") content = msg.get("content", "") if role == "system": # Anthropic takes system as a top-level parameter if system_prompt: system_prompt += "\n\n" + content else: system_prompt = content elif role == "assistant": anthropic_messages.append({"role": "assistant", "content": content}) else: # user, function, tool → user anthropic_messages.append({"role": "user", "content": content}) # Ensure messages alternate user/assistant (Anthropic requirement) # Merge consecutive same-role messages merged = [] for msg in anthropic_messages: if merged and merged[-1]["role"] == msg["role"]: merged[-1]["content"] += "\n\n" + msg["content"] else: merged.append(msg) # Must start with user message if not merged or merged[0]["role"] != "user": merged.insert(0, {"role": "user", "content": "Please continue."}) return system_prompt, merged # --------------------------------------------------------------------------- # Send request (non-streaming) # --------------------------------------------------------------------------- def send_message( messages: List[Dict], model: str = "claude-sonnet-4-5", temperature: float = 0.7, max_tokens: int = 8192, timeout: float = 300, log_fn=None, ) -> Dict[str, Any]: """Send a message to the Antigravity proxy (Anthropic Messages API). Args: messages: OpenAI-format messages list model: Model name (without 'antigravity/' prefix) temperature: Sampling temperature max_tokens: Max output tokens timeout: Request timeout in seconds log_fn: Optional logging function (e.g. print) Returns: Dict with keys: content, finish_reason, usage, raw_response Raises: RuntimeError on proxy errors """ proxy_url = get_proxy_url() url = f"{proxy_url}{MESSAGES_ENDPOINT}" system_prompt, anthropic_messages = _convert_messages_to_anthropic(messages) payload = { "model": model, "messages": anthropic_messages, "max_tokens": max_tokens, "temperature": temperature, } if system_prompt: payload["system"] = system_prompt headers = { "Content-Type": "application/json", "x-api-key": DUMMY_AUTH_TOKEN, "anthropic-version": "2023-06-01", } if log_fn: log_fn(f"🌀 Antigravity: Sending to proxy at {proxy_url} (model={model})") try: resp = requests.post(url, json=payload, headers=headers, timeout=timeout) except requests.ConnectionError: raise RuntimeError( "Antigravity proxy connection refused. " "Make sure the proxy is running:\n" " npx antigravity-claude-proxy@latest start\n" " Then open http://localhost:8080 and add your Google account." ) except requests.Timeout: raise RuntimeError( f"Antigravity proxy request timed out after {timeout}s. " "The model may need more time for long translations." ) # Handle auth failure: open browser once and wait for user to authenticate if resp.status_code in (401, 403): auth_resp = _wait_for_auth( url, payload, headers, proxy_url, log_fn, stream=False ) if auth_resp is not None and auth_resp.status_code == 200: resp = auth_resp # auth succeeded, continue with this response else: raise RuntimeError( f"Antigravity: Authentication timed out.\n" f"Open {proxy_url} in your browser and link your Google account,\n" f"then try again." ) if resp.status_code != 200: error_body = resp.text try: error_json = resp.json() error_msg = error_json.get("error", {}).get("message", error_body) except Exception: error_msg = error_body raise RuntimeError( f"Antigravity: {resp.status_code} - {error_msg}" ) data = resp.json() # Extract content from Anthropic Messages API response content = "" if "content" in data and isinstance(data["content"], list): text_blocks = [ block.get("text", "") for block in data["content"] if block.get("type") == "text" ] content = "".join(text_blocks) elif "content" in data and isinstance(data["content"], str): content = data["content"] finish_reason = data.get("stop_reason", "end_turn") # Normalize to OpenAI-style finish reasons if finish_reason == "end_turn": finish_reason = "stop" elif finish_reason == "max_tokens": finish_reason = "length" usage = None if "usage" in data: u = data["usage"] usage = { "prompt_tokens": u.get("input_tokens", 0), "completion_tokens": u.get("output_tokens", 0), "total_tokens": u.get("input_tokens", 0) + u.get("output_tokens", 0), } return { "content": content, "finish_reason": finish_reason, "usage": usage, "raw_response": data, } # --------------------------------------------------------------------------- # Send request (streaming) # --------------------------------------------------------------------------- def send_message_stream( messages: List[Dict], model: str = "claude-sonnet-4-5", temperature: float = 0.7, max_tokens: int = 8192, timeout: float = 300, log_fn=None, ) -> Dict[str, Any]: """Send a streaming message to the Antigravity proxy. Collects all streamed chunks and returns once complete. Checks _cancel_event between chunks for cancellation support. Returns same format as send_message(). """ proxy_url = get_proxy_url() url = f"{proxy_url}{MESSAGES_ENDPOINT}" system_prompt, anthropic_messages = _convert_messages_to_anthropic(messages) payload = { "model": model, "messages": anthropic_messages, "max_tokens": max_tokens, "temperature": temperature, "stream": True, } if system_prompt: payload["system"] = system_prompt headers = { "Content-Type": "application/json", "x-api-key": DUMMY_AUTH_TOKEN, "anthropic-version": "2023-06-01", } if log_fn: log_fn(f"🌀 Antigravity: Streaming from proxy at {proxy_url} (model={model})") try: resp = requests.post( url, json=payload, headers=headers, timeout=timeout, stream=True ) except requests.ConnectionError: raise RuntimeError( "Antigravity proxy connection refused. " "Make sure the proxy is running:\n" " npx antigravity-claude-proxy@latest start" ) except requests.Timeout: raise RuntimeError( f"Antigravity proxy streaming request timed out after {timeout}s." ) # Handle auth failure: open browser once and wait for user to authenticate if resp.status_code in (401, 403): auth_resp = _wait_for_auth( url, payload, headers, proxy_url, log_fn, stream=True ) if auth_resp is not None and auth_resp.status_code == 200: resp = auth_resp # auth succeeded, continue with this streaming response else: raise RuntimeError( f"Antigravity: Authentication timed out.\n" f"Open {proxy_url} in your browser and link your Google account,\n" f"then try again." ) if resp.status_code != 200: raise RuntimeError(f"Antigravity: {resp.status_code} - {resp.text[:500]}") # Collect SSE events collected_content = [] finish_reason = "stop" usage = None for line in resp.iter_lines(decode_unicode=True): if _cancel_event.is_set(): resp.close() raise RuntimeError("Antigravity: stream cancelled by user") if not line or not line.startswith("data: "): continue data_str = line[6:] # Strip "data: " prefix if data_str.strip() == "[DONE]": break try: event = json.loads(data_str) except json.JSONDecodeError: continue event_type = event.get("type", "") if event_type == "content_block_delta": delta = event.get("delta", {}) if delta.get("type") == "text_delta": collected_content.append(delta.get("text", "")) elif event_type == "message_delta": delta = event.get("delta", {}) stop = delta.get("stop_reason", "") if stop: finish_reason = "stop" if stop == "end_turn" else ( "length" if stop == "max_tokens" else stop ) if "usage" in delta: u = delta["usage"] usage = { "prompt_tokens": u.get("input_tokens", 0), "completion_tokens": u.get("output_tokens", 0), "total_tokens": u.get("input_tokens", 0) + u.get("output_tokens", 0), } elif event_type == "message_start": msg = event.get("message", {}) if "usage" in msg: u = msg["usage"] usage = { "prompt_tokens": u.get("input_tokens", 0), "completion_tokens": u.get("output_tokens", 0), "total_tokens": u.get("input_tokens", 0) + u.get("output_tokens", 0), } content = "".join(collected_content) return { "content": content, "finish_reason": finish_reason, "usage": usage, "raw_response": None, }