"""Translate the text inside an image through Google Lens.

Cookies come from the JSON document at ``COOKIE_JSON_URL`` or, when that
fetch fails, from a shared headless Chrome session.  ``translate_lens`` then
uploads the image to Lens and reads the ``translatedimage`` JSON endpoint.
"""

import asyncio
import base64
import hashlib
import json
import logging
import os
import re
import shutil
import threading
import time
from typing import Any, Dict
from urllib.parse import parse_qs, urlencode, urlparse

import chromedriver_autoinstaller
import httpx
from selenium import webdriver
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService

# Keep the value returned by install() (expected to be the driver path, or
# None) so _resolve_chromedriver() can fall back to it instead of ignoring it.
_AUTOINSTALLED_CHROMEDRIVER = chromedriver_autoinstaller.install()

LOGGER = logging.getLogger("lens_images_core")
if not LOGGER.handlers:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    )

COOKIE_JSON_URL = os.getenv("COOKIE_JSON_URL")
if not COOKIE_JSON_URL:
    raise RuntimeError("Missing COOKIE_JSON_URL secret. Set it in Space Settings > Secrets.")

UA = "Mozilla/5.0 (Lens OCR Images)"

_COMMON_CHROME_PATHS = [
    # Linux
    "/usr/bin/google-chrome",
    "/usr/bin/chromium",
    "/usr/bin/chromium-browser",
    "/snap/bin/chromium",
    "/opt/google/chrome/google-chrome",
    # macOS
    "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
    "/Applications/Chromium.app/Contents/MacOS/Chromium",
    # Windows (raw strings, so a single backslash is the path separator)
    r"C:\Program Files\Google\Chrome\Application\chrome.exe",
    r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe",
]

# Executable names looked up on PATH when none of the fixed locations exist.
_CHROME_EXECUTABLE_NAMES = (
    "google-chrome",
    "google-chrome-stable",
    "chromium",
    "chromium-browser",
)

_DEFAULT_CHROMEDRIVER = "/usr/bin/chromedriver"

# Marker key stored next to cached cookies; it must never be sent as a cookie.
_SOURCE_KEY = "_source"

_DATA_IMAGE_RE = re.compile(r"data:image/[a-zA-Z]+;base64,[A-Za-z0-9+/=]+")


def _find_chrome_binary() -> str | None:
    """Locate a Chrome/Chromium executable.

    Lookup order: the ``CHROME_BINARY`` environment variable (if it resolves
    to an executable), the fixed locations in ``_COMMON_CHROME_PATHS``, then
    well-known executable names on ``PATH``.

    Returns:
        The path (or, for ``CHROME_BINARY``, the configured value) of the
        first match, or ``None`` when nothing is found.
    """
    configured = os.getenv("CHROME_BINARY")
    if configured and shutil.which(configured):
        return configured

    for candidate in _COMMON_CHROME_PATHS:
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate

    # shutil.which replaces shelling out to `which`, which is not portable.
    for name in _CHROME_EXECUTABLE_NAMES:
        found = shutil.which(name)
        if found:
            return found
    return None


def _resolve_chromedriver() -> str:
    """Pick the chromedriver path handed to the Chrome service.

    An explicit ``CHROMEDRIVER`` environment variable always wins.  Otherwise
    the default system path is used when it exists, then the path reported by
    ``chromedriver_autoinstaller.install()``; the default path is returned as
    a last resort.
    """
    explicit = os.getenv("CHROMEDRIVER")
    if explicit is not None:
        return explicit
    if os.path.isfile(_DEFAULT_CHROMEDRIVER):
        return _DEFAULT_CHROMEDRIVER
    if _AUTOINSTALLED_CHROMEDRIVER and os.path.isfile(_AUTOINSTALLED_CHROMEDRIVER):
        return _AUTOINSTALLED_CHROMEDRIVER
    return _DEFAULT_CHROMEDRIVER


def _build_chrome() -> webdriver.Chrome:
    """Start a Chrome WebDriver.

    The browser binary comes from ``_find_chrome_binary`` (falling back to
    ``/usr/bin/chromium``); command-line flags come from the whitespace
    separated ``CHROME_EXTRA_ARGS`` environment variable, which defaults to a
    headless configuration.
    """
    opts = ChromeOptions()
    opts.binary_location = _find_chrome_binary() or "/usr/bin/chromium"

    extra_args = os.getenv(
        "CHROME_EXTRA_ARGS",
        "--disable-gpu --no-sandbox --disable-dev-shm-usage --window-size=1920,1080 --headless=new",
    ).split()
    for arg in extra_args:
        opts.add_argument(arg)

    service = ChromeService(executable_path=_resolve_chromedriver())
    return webdriver.Chrome(service=service, options=opts)


_cached_cookie_obj: Dict[str, Any] | None = None
_cached_cookie_fetched_at: float = 0.0
_CACHE_TTL = 300  # seconds a remotely fetched cookie set stays cached
_BROWSER_TTL = 900  # seconds a browser-harvested cookie set stays cached
_cookie_lock = threading.Lock()

_IDLE_TIMEOUT = int(os.getenv("CHROME_IDLE_SECONDS", "60"))
_driver_lock = threading.Lock()
_global_driver = None
_driver_last_use = 0.0


def _driver_locked():
    """Return the shared driver, starting it when absent.

    The caller must already hold ``_driver_lock``.
    """
    global _global_driver, _driver_last_use
    if _global_driver is None:
        LOGGER.info("▶️ starting headless Chrome for cookies")
        _global_driver = _build_chrome()
    _driver_last_use = time.time()
    return _global_driver


def _ensure_cookie_driver():
    """Return the shared cookie driver, creating it on first use."""
    with _driver_lock:
        return _driver_locked()


def _quit_cookie_driver():
    """Quit and forget the shared driver.

    Errors raised by ``quit()`` are logged and ignored; the global reference
    is cleared either way.  Callers in this module invoke it while holding
    ``_driver_lock``.
    """
    global _global_driver
    try:
        if _global_driver:
            _global_driver.quit()
    except Exception as exc:
        LOGGER.debug("ignoring error while quitting cookie driver: %s", exc)
    finally:
        _global_driver = None


def _driver_reaper_loop():
    """Poll once per second and quit the driver after ``_IDLE_TIMEOUT`` idle seconds."""
    while True:
        try:
            time.sleep(1)
            with _driver_lock:
                if _global_driver and (time.time() - _driver_last_use) > _IDLE_TIMEOUT:
                    LOGGER.info("♻️ quitting idle cookie driver")
                    _quit_cookie_driver()
        except Exception as exc:
            # Keep the loop alive, but leave a trace of what went wrong.
            LOGGER.debug("cookie driver reaper iteration failed: %s", exc)


_reaper_started = False


def _ensure_reaper_started():
    """Start the daemon reaper thread once; failures are logged, not raised."""
    global _reaper_started
    if _reaper_started:
        return
    try:
        threading.Thread(target=_driver_reaper_loop, daemon=True).start()
        _reaper_started = True
        LOGGER.debug("cookie driver reaper started")
    except Exception as e:
        LOGGER.warning("could not start cookie driver reaper: %s", e)


def _is_google_domain(domain: str) -> bool:
    """Return True for ``google.com`` and its subdomains (leading dot allowed)."""
    host = domain.lstrip(".").lower()
    return host == "google.com" or host.endswith(".google.com")


def _grab_cookies_with_browser() -> Dict[str, Any]:
    """Open Lens in the shared headless browser and collect Google cookies.

    The driver is obtained and used under a single ``_driver_lock`` hold so
    the reaper cannot quit it in between.  If the browser call fails the
    driver is discarded (so the next call starts a fresh one) and the
    exception is re-raised.

    Returns:
        ``{"cookies": {name: value, ...}, "_source": "browser"}``.
    """
    global _driver_last_use
    with _driver_lock:
        drv = _driver_locked()
        try:
            drv.get("https://lens.google.com/")
            browser_cookies = drv.get_cookies()
        except Exception:
            _quit_cookie_driver()
            raise
        # Count the navigation itself as use, so idle time starts from now.
        _driver_last_use = time.time()

    jar = {
        c["name"]: c["value"]
        for c in browser_cookies
        if _is_google_domain(c.get("domain") or "")
    }
    return {"cookies": jar, _SOURCE_KEY: "browser"}


def _format_cookie_header(obj) -> str:
    """Render a cached cookie object as a ``Cookie`` header value.

    ``obj`` is either ``{"cookies": {...}, ...}`` or a flat name -> value
    mapping.  The internal ``_source`` marker is never emitted.
    """
    jar = obj.get("cookies", obj) if isinstance(obj, dict) else obj
    return "; ".join(f"{k}={v}" for k, v in jar.items() if k != _SOURCE_KEY)


async def _cookie_header() -> str:
    """Return the ``Cookie`` header value to send to Lens.

    A cached cookie set is reused while it is younger than its TTL
    (``_BROWSER_TTL`` for browser-harvested cookies, ``_CACHE_TTL``
    otherwise).  On a miss the cookies are fetched from ``COOKIE_JSON_URL``;
    if that fails for any reason they are harvested with headless Chrome in
    the default executor.
    """
    global _cached_cookie_obj, _cached_cookie_fetched_at
    _ensure_reaper_started()

    with _cookie_lock:
        if _cached_cookie_obj:
            from_browser = _cached_cookie_obj.get(_SOURCE_KEY) == "browser"
            ttl = _BROWSER_TTL if from_browser else _CACHE_TTL
            if (time.time() - _cached_cookie_fetched_at) < ttl:
                return _format_cookie_header(_cached_cookie_obj)

    if COOKIE_JSON_URL:
        try:
            async with httpx.AsyncClient(timeout=5) as cli:
                resp = await cli.get(COOKIE_JSON_URL)
                resp.raise_for_status()
                data = resp.json()
            data[_SOURCE_KEY] = "remote"
            # Build the header before caching so an unusable payload is
            # never stored.
            header = _format_cookie_header(data)
            with _cookie_lock:
                _cached_cookie_obj, _cached_cookie_fetched_at = data, time.time()
            return header
        except Exception as e:
            LOGGER.warning("COOKIE_JSON_URL fetch failed: %s – falling back to headless chrome", e)

    loop = asyncio.get_running_loop()
    data = await loop.run_in_executor(None, _grab_cookies_with_browser)
    header = _format_cookie_header(data)
    with _cookie_lock:
        _cached_cookie_obj, _cached_cookie_fetched_at = data, time.time()
    return header


def _sap_header(cookie_header: str) -> dict:
    """Build the ``SAPISIDHASH`` authorization headers from a cookie header.

    The first ``__Secure-3PAPISID`` or ``SAPISID`` cookie found supplies the
    secret; the hash is ``sha1("<ts> <sid> <origin>")``.

    Returns:
        A dict with ``X-Origin``, ``X-Goog-AuthUser`` and ``Authorization``,
        or an empty dict when neither cookie is present.
    """
    origin = "https://lens.google.com"
    sid = None
    for pair in cookie_header.split("; "):
        if pair.startswith(("__Secure-3PAPISID=", "SAPISID=")):
            sid = pair.split("=", 1)[1]
            break
    if not sid:
        return {}

    ts = int(time.time())
    sig = hashlib.sha1(f"{ts} {sid} {origin}".encode()).hexdigest()
    return {
        "X-Origin": origin,
        "X-Goog-AuthUser": "0",
        "Authorization": f"SAPISIDHASH {ts}_{sig}",
    }


def _json_url(loc: str, tl: str) -> str:
    """Build the ``translatedimage`` URL from the upload redirect location.

    ``vsrid`` and ``gsessionid`` are copied from ``loc``'s query string and
    percent-encoded again (``parse_qs`` decodes them).  A parameter missing
    from ``loc`` is rendered as the literal ``None``.

    Args:
        loc: Redirect ``Location`` returned by the Lens upload.
        tl: Target language code.
    """
    query = parse_qs(urlparse(loc).query)
    params = {
        "vsrid": query.get("vsrid", [None])[0],
        "gsessionid": query.get("gsessionid", [None])[0],
        "sl": "auto",
        "tl": tl,
        "sf": "1.07",
        "ib": "1",
    }
    return "https://lens.google.com/translatedimage?" + urlencode(params)


async def _resolve_translated_image(cli, data_url: str, debug: Dict[str, Any]) -> str:
    """Turn the ``imageUrl`` field of the Lens response into a data URL.

    Handles three shapes: an inline ``data:image/`` URL (returned as is), an
    http(s) URL (downloaded and re-encoded as a JPEG data URL), and a base64
    blob whose decoded text embeds a ``data:image/...;base64,`` URL.
    Progress and failures are appended to ``debug``; on failure an empty
    string is returned.
    """
    if not data_url:
        return ""

    if data_url.startswith("data:image/"):
        debug["steps"].append("imageUrl already data URL")
        return data_url

    # Checked before the base64 path: a URL is never a valid base64 payload,
    # and decoding it first only produced misleading debug entries.
    if data_url.startswith(("http://", "https://")):
        try:
            fallback_img = await cli.get(data_url, headers={"User-Agent": UA}, timeout=5)
            fallback_img.raise_for_status()
            b64 = base64.b64encode(fallback_img.content).decode("utf-8")
            debug["steps"].append("fetched fallback image URL and encoded to data URL")
            return f"data:image/jpeg;base64,{b64}"
        except Exception as e:
            debug["errors"].append(f"fallback fetch of imageUrl failed: {e}")
            return ""

    try:
        html = base64.b64decode(data_url).decode("utf-8", errors="ignore")
        match = _DATA_IMAGE_RE.search(html)
        if match:
            debug["steps"].append("extracted embedded data:image from base64 HTML")
            return match.group(0)
        debug["steps"].append("no embedded data:image found inside decoded HTML")
    except Exception as e:
        debug["errors"].append(f"error decoding imageUrl: {e}")
    return ""


async def translate_lens(image_url: str, lang: str = "en") -> dict:
    """Translate the text in the image at ``image_url`` with Google Lens.

    Steps: download the image, upload it to Lens, follow the redirect
    location to the ``translatedimage`` JSON endpoint and extract the
    translated image and text.

    Args:
        image_url: URL of the source image.
        lang: Target language code (default ``"en"``).

    Returns:
        A dict with keys ``image`` (data URL or ``""``), ``text``, ``loc``,
        ``json_url``, ``raw_info`` (the parsed JSON) and ``debug`` (steps,
        errors and ``duration_sec``).

    Raises:
        RuntimeError: The source image could not be fetched, or the upload
            did not answer with a 302/303 redirect.
        Exception: The JSON parse error is re-raised when the translation
            response is not valid JSON.
    """
    start_ts = time.time()
    debug: Dict[str, Any] = {"steps": [], "errors": []}

    ck = await _cookie_header()
    hdr = {
        "User-Agent": UA,
        "Cookie": ck,
        "Referer": "https://lens.google.com/",
        **_sap_header(ck),
    }

    async with httpx.AsyncClient() as cli:
        # 1. Download the source image, sending its own origin as Referer.
        try:
            o = urlparse(image_url)
            referer = f"{o.scheme}://{o.netloc}/" if o.scheme and o.netloc else None
            hdr_img = {"User-Agent": UA}
            if referer:
                hdr_img["Referer"] = referer
            img_resp = await cli.get(image_url, headers=hdr_img, timeout=10)
            img_resp.raise_for_status()
            debug["steps"].append(
                f"fetched original image {image_url} status={img_resp.status_code}"
            )
        except httpx.HTTPStatusError as he:
            code = he.response.status_code if he.response is not None else "NA"
            debug["errors"].append(f"fetch image HTTP {code} {image_url}")
            raise RuntimeError(f"fetch image HTTP {code}") from he
        except httpx.TimeoutException as te:
            debug["errors"].append(f"fetch image TIMEOUT {image_url}")
            raise RuntimeError("fetch image TIMEOUT") from te
        except Exception as e:
            debug["errors"].append(f"fetch image ERROR {type(e).__name__} {image_url}")
            raise RuntimeError(f"fetch image ERROR {type(e).__name__}") from e

        # 2. Upload to Lens; success is signalled by a redirect.
        files = {
            "encoded_image": ("file.jpg", img_resp.content, "image/jpeg"),
            "sbisrc": (None, "browser"),
            "rt": (None, "j"),
        }
        up = await cli.post(
            "https://lens.google.com/v3/upload",
            files=files,
            headers=hdr,
            follow_redirects=False,
            timeout=10,
        )
        debug["steps"].append(f"upload response status={up.status_code}")
        if up.status_code not in (302, 303):
            msg = f"Lens upload failed {up.status_code}"
            debug["errors"].append(msg)
            raise RuntimeError(msg)

        loc = up.headers.get("location", "")
        debug["steps"].append(f"got redirect location: {loc}")

        # 3. Fetch the translation JSON.
        json_url = _json_url(loc, lang)
        debug["steps"].append(f"constructed json_url: {json_url}")
        js = await cli.get(json_url, headers=hdr, timeout=5)
        raw_body = js.text
        debug["steps"].append("fetched translation JSON")

        # Drop a leading ")]}'" guard.  lstrip() removes a character set
        # rather than a prefix; that is harmless here because a JSON document
        # cannot start with any of these characters.
        body = raw_body.lstrip(")]}'")
        try:
            info = json.loads(body)
        except Exception as e:
            debug["errors"].append(f"JSON parse failure: {e}; raw_body snippet: {body[:200]}")
            raise

        # 4. Extract the translated image; `or ""` also covers a JSON null.
        data_url = info.get("imageUrl") or ""
        extracted_data_url = await _resolve_translated_image(cli, data_url, debug)

    translated_text = info.get("translatedTextFull", "") or info.get("translatedText", "")
    debug["duration_sec"] = time.time() - start_ts

    return {
        "image": extracted_data_url,
        "text": translated_text,
        "loc": loc,
        "json_url": json_url,
        "raw_info": info,
        "debug": debug,
    }