# Web file-viewer header captured together with the source (not code):
# api-ocr-test / app/lens_images_core.py
# plan291037's picture
# Update app/lens_images_core.py
# 3d91b9b verified
# Raw
# History Blame
# 11.3 kB
import os, json, time, hashlib, httpx, base64, re, asyncio, threading, shutil, logging
from typing import Dict, Any
from urllib.parse import urlparse
import chromedriver_autoinstaller
from selenium import webdriver
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.chrome.service import Service as ChromeService
# Executed once at import time; the return value is discarded.
# NOTE(review): presumably this downloads a chromedriver matching the local
# Chrome and exposes it on PATH - confirm against the chromedriver_autoinstaller docs.
chromedriver_autoinstaller.install()
# Module-wide logger used by every helper in this file.
LOGGER = logging.getLogger("lens_images_core")
if len(LOGGER.handlers) == 0:
    # This logger has no handler of its own yet: apply the default root
    # logging configuration (INFO level, pipe-separated fields).
    logging.basicConfig(
        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        level=logging.INFO,
    )
# URL of the remote cookie JSON. Importing this module fails when it is unset or empty.
COOKIE_JSON_URL = os.environ.get("COOKIE_JSON_URL")
if COOKIE_JSON_URL is None or COOKIE_JSON_URL == "":
    raise RuntimeError("Missing COOKIE_JSON_URL secret. Set it in Space Settings > Secrets.")

# User-Agent string used for the image download and the Lens requests.
UA = "Mozilla/5.0 (Lens OCR Images)"
# Well-known Chrome/Chromium install locations, probed in order by
# _find_chrome_binary() when CHROME_BINARY does not resolve to an executable.
_COMMON_CHROME_PATHS = [
    # Linux
    "/usr/bin/google-chrome",
    "/usr/bin/chromium",
    "/usr/bin/chromium-browser",
    "/snap/bin/chromium",
    "/opt/google/chrome/google-chrome",
    # macOS
    "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
    "/Applications/Chromium.app/Contents/MacOS/Chromium",
    # Windows - raw strings, so every separator is written as ONE backslash
    # (doubling them inside r"..." would put literal "\\" pairs in the path).
    r"C:\Program Files\Google\Chrome\Application\chrome.exe",
    r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe",
]
def _find_chrome_binary() -> str | None:
env = os.getenv("CHROME_BINARY")
if env and shutil.which(env):
return env
for p in _COMMON_CHROME_PATHS:
if os.path.isfile(p) and os.access(p, os.X_OK):
return p
try:
import subprocess, shlex
out = subprocess.check_output(shlex.split("which google-chrome"), stderr=subprocess.DEVNULL).decode().strip()
if out:
return out
except Exception:
pass
return None
def _build_chrome() -> webdriver.Chrome:
    """Start a Chrome WebDriver session configured from environment variables.

    Environment:
        CHROMEDRIVER: chromedriver path or command name
            (default ``/usr/bin/chromedriver``).
        CHROME_EXTRA_ARGS: whitespace-separated Chrome command-line flags;
            the default runs headless at 1920x1080 without sandbox or GPU.

    Returns:
        A newly started ``webdriver.Chrome``; the caller owns it and must quit it.
    """
    opts = ChromeOptions()

    bin_loc = _find_chrome_binary()
    if bin_loc:
        opts.binary_location = bin_loc
    # Otherwise leave binary_location unset: /usr/bin/chromium is one of the
    # locations _find_chrome_binary() has already rejected, so forcing it
    # could only fail. Unset, the driver falls back to its own discovery.

    default_args = (
        "--disable-gpu --no-sandbox --disable-dev-shm-usage "
        "--window-size=1920,1080 --headless=new"
    )
    # str.split() without arguments never yields empty strings.
    for arg in os.getenv("CHROME_EXTRA_ARGS", default_args).split():
        opts.add_argument(arg)

    drv_path = os.getenv("CHROMEDRIVER", "/usr/bin/chromedriver")
    resolved_driver = shutil.which(drv_path)
    if resolved_driver:
        service = ChromeService(executable_path=resolved_driver)
    else:
        # No usable driver at the configured location: do not force a dead
        # path, let Selenium locate one instead.
        # NOTE(review): presumably this picks up the driver installed by
        # chromedriver_autoinstaller at import time - confirm.
        LOGGER.warning("chromedriver not found at %s; letting Selenium locate a driver", drv_path)
        service = ChromeService()
    return webdriver.Chrome(service=service, options=opts)
_cached_cookie_obj: Dict[str, Any] | None = None
_cached_cookie_fetched_at: float = 0.0
_CACHE_TTL = 300
_BROWSER_TTL = 900
_cookie_lock = threading.Lock()
_IDLE_TIMEOUT = int(os.getenv("CHROME_IDLE_SECONDS", "60"))
_driver_lock = threading.Lock()
_global_driver = None
_driver_last_use = 0.0
def _ensure_cookie_driver():
    """Return the shared cookie driver, starting Chrome on first use.

    Runs under ``_driver_lock`` and refreshes ``_driver_last_use`` on every
    call, whether or not a new driver had to be built.
    """
    global _global_driver, _driver_last_use
    with _driver_lock:
        driver = _global_driver
        if driver is None:
            LOGGER.info("▶️ starting headless Chrome for cookies")
            driver = _global_driver = _build_chrome()
        _driver_last_use = time.time()
    return driver
def _quit_cookie_driver():
    """Quit the shared cookie driver (if any) and forget it.

    Best effort: a failing ``quit()`` is logged instead of raised, and the
    global reference is cleared in every case so a fresh driver is built on
    the next request. This function does not take ``_driver_lock`` itself;
    the reaper loop calls it while already holding that lock.
    """
    global _global_driver
    try:
        if _global_driver is not None:
            _global_driver.quit()
    except Exception as exc:
        # Do not crash the caller, but leave a trace: a failed quit may
        # leave a browser process behind.
        LOGGER.warning("cookie driver quit failed: %s", exc)
    finally:
        _global_driver = None
def _driver_reaper_loop():
    """Background loop that quits the shared driver once it has been idle.

    Wakes up every second and, under ``_driver_lock``, quits the driver when
    more than ``_IDLE_TIMEOUT`` seconds have passed since its last use.
    Never returns.
    """
    while True:
        try:
            time.sleep(1)
            with _driver_lock:
                if not _global_driver:
                    continue
                idle_for = time.time() - _driver_last_use
                if idle_for > _IDLE_TIMEOUT:
                    LOGGER.info("♻️ quitting idle cookie driver")
                    _quit_cookie_driver()
        except Exception:
            # Any error in one iteration is ignored so the loop keeps running.
            pass
# Set to True once the reaper thread has been started successfully.
_reaper_started = False


def _ensure_reaper_started():
    """Start the idle-driver reaper thread unless it is already running.

    The thread is a daemon running ``_driver_reaper_loop``. A failure to
    start it is logged as a warning and leaves the flag unset, so a later
    call will try again.
    """
    global _reaper_started
    if not _reaper_started:
        try:
            reaper = threading.Thread(target=_driver_reaper_loop, daemon=True)
            reaper.start()
            _reaper_started = True
            LOGGER.debug("cookie driver reaper started")
        except Exception as err:
            LOGGER.warning("could not start cookie driver reaper: %s", err)
def _grab_cookies_with_browser() -> Dict[str, Any]:
    """Collect google.com cookies by opening Lens in the shared headless browser.

    Blocking; ``_cookie_header`` runs it in an executor.

    Returns:
        ``{"cookies": {name: value, ...}, "_source": "browser"}`` containing
        only cookies whose domain is ``google.com`` or one of its subdomains.
    """
    global _driver_last_use
    drv = _ensure_cookie_driver()
    jar: Dict[str, str] = {}
    with _driver_lock:
        drv.get("https://lens.google.com/")
        for cookie in drv.get_cookies():
            # Cookie domains may carry a leading dot (".google.com").
            host = (cookie.get("domain") or "").lstrip(".").lower()
            # Exact host or a true subdomain only: a bare suffix test would
            # also accept unrelated hosts such as "notgoogle.com".
            if host == "google.com" or host.endswith(".google.com"):
                jar[cookie["name"]] = cookie["value"]
        # The page load counts as use, so restart the idle timer afterwards.
        _driver_last_use = time.time()
    return {"cookies": jar, "_source": "browser"}
async def _cookie_header() -> str:
    """Return the value for the ``Cookie`` request header.

    Resolution order:
      1. the cached cookies, while younger than their TTL (``_BROWSER_TTL``
         for browser-sourced cookies, ``_CACHE_TTL`` otherwise);
      2. the JSON document served at ``COOKIE_JSON_URL``;
      3. cookies harvested with the headless browser (run in an executor).

    The payload may be a flat ``{name: value}`` mapping or carry the jar
    under a ``"cookies"`` key. The internal ``"_source"`` marker stored next
    to the cookies is never emitted as a cookie.

    Returns:
        ``name=value`` pairs joined with ``"; "``.
    """
    global _cached_cookie_obj, _cached_cookie_fetched_at
    now = time.time()
    _ensure_reaper_started()

    def render(obj: Dict[str, Any]) -> str:
        nested = obj.get("cookies")
        if isinstance(nested, dict):
            pairs = nested.items()
        else:
            # Flat payload: the bookkeeping key lives in the same dict as the
            # cookies and must not leak into the header.
            pairs = ((k, v) for k, v in obj.items() if k != "_source")
        return "; ".join(f"{k}={v}" for k, v in pairs)

    with _cookie_lock:
        if _cached_cookie_obj:
            ttl = _BROWSER_TTL if _cached_cookie_obj.get("_source") == "browser" else _CACHE_TTL
            if (now - _cached_cookie_fetched_at) < ttl:
                return render(_cached_cookie_obj)

    if COOKIE_JSON_URL:
        try:
            async with httpx.AsyncClient(timeout=5) as cli:
                resp = await cli.get(COOKIE_JSON_URL)
                resp.raise_for_status()
                data = resp.json()
            if not isinstance(data, dict):
                raise TypeError(f"expected a JSON object, got {type(data).__name__}")
            # Render BEFORE caching so a malformed payload can never be
            # stored and then break every call until the TTL expires.
            header = render(data)
            data["_source"] = "remote"
            with _cookie_lock:
                _cached_cookie_obj, _cached_cookie_fetched_at = data, now
            return header
        except Exception as e:
            LOGGER.warning("COOKIE_JSON_URL fetch failed: %s – falling back to headless chrome", e)

    # Selenium is blocking, so keep it off the event loop.
    loop = asyncio.get_running_loop()
    browser_data: Dict[str, Any] = await loop.run_in_executor(None, _grab_cookies_with_browser)
    with _cookie_lock:
        _cached_cookie_obj, _cached_cookie_fetched_at = browser_data, now
    return render(browser_data)
def _sap_header(cookie_header: str) -> dict:
    """Derive the SAPISIDHASH authorization headers from a Cookie header string.

    The first cookie named ``__Secure-3PAPISID`` or ``SAPISID`` (cookies are
    expected to be separated by ``"; "``) supplies the secret. The signature
    is the SHA-1 hex digest of ``"<timestamp> <secret> <origin>"``.

    Returns:
        A dict with ``X-Origin``, ``X-Goog-AuthUser`` and ``Authorization``,
        or an empty dict when no such cookie (or only an empty one) is present.
    """
    origin = "https://lens.google.com"
    wanted_prefixes = ("__Secure-3PAPISID=", "SAPISID=")
    sid = next(
        (
            part.partition("=")[2]
            for part in cookie_header.split("; ")
            if part.startswith(wanted_prefixes)
        ),
        None,
    )
    if not sid:
        return {}

    ts = int(time.time())
    digest = hashlib.sha1(f"{ts} {sid} {origin}".encode()).hexdigest()
    headers = {
        "X-Origin": origin,
        "X-Goog-AuthUser": "0",
        "Authorization": f"SAPISIDHASH {ts}_{digest}",
    }
    return headers
def _json_url(loc: str, tl: str) -> str:
    """Build the Lens ``translatedimage`` URL from an upload redirect location.

    Args:
        loc: Redirect URL whose query string carries ``vsrid`` and ``gsessionid``.
        tl: Target language code.

    Returns:
        The ``https://lens.google.com/translatedimage`` URL with ``vsrid``,
        ``gsessionid``, ``sl=auto``, ``tl``, ``sf=1.07`` and ``ib=1``. A
        parameter missing from ``loc`` is sent empty.
    """
    from urllib.parse import parse_qs, urlencode, urlparse

    query = parse_qs(urlparse(loc).query)
    params = {
        # parse_qs() returns DECODED values; urlencode() below re-escapes
        # them, so reserved characters such as "+", "&" or "=" survive.
        "vsrid": query.get("vsrid", [""])[0],
        "gsessionid": query.get("gsessionid", [""])[0],
        "sl": "auto",
        "tl": tl,
        "sf": "1.07",
        "ib": "1",
    }
    return "https://lens.google.com/translatedimage?" + urlencode(params)
async def translate_lens(image_url: str, lang: str = "en") -> dict:
    """Translate the text in an image through Google Lens.

    Steps: download ``image_url``, upload the bytes to the Lens upload
    endpoint, follow the redirect it returns to the ``translatedimage`` JSON
    endpoint, then extract the translated image (as a ``data:`` URL) and the
    translated text from that JSON.

    Args:
        image_url: URL of the image to translate.
        lang: Target language code (default ``"en"``).

    Returns:
        A dict with the keys ``image`` (data URL, or ``""`` if none could be
        extracted), ``text``, ``loc`` (upload redirect location), ``json_url``,
        ``raw_info`` (the parsed JSON) and ``debug`` (``steps``, ``errors``,
        ``duration_sec``).

    Raises:
        RuntimeError: the source image could not be fetched, or the upload
            did not answer with a 302/303 redirect.
        Exception: a JSON parse failure is re-raised unchanged. Errors from
            the upload and JSON requests themselves are not caught here.
    """
    start_ts = time.time()
    debug: Dict[str, Any] = {"steps": [], "errors": []}
    ck = await _cookie_header()
    hdr = {
        "User-Agent": UA,
        "Cookie": ck,
        "Referer": "https://lens.google.com/",
        **_sap_header(ck),
    }

    async with httpx.AsyncClient() as cli:
        # 1) Download the source image, sending its own site root as Referer.
        try:
            o = urlparse(image_url)
            referer = f"{o.scheme}://{o.netloc}/" if o.scheme and o.netloc else None
            hdr_img = {"User-Agent": UA}
            if referer:
                hdr_img["Referer"] = referer
            img_resp = await cli.get(image_url, headers=hdr_img, timeout=10)
            img_resp.raise_for_status()
            debug["steps"].append(f"fetched original image {image_url} status={img_resp.status_code}")
        except httpx.HTTPStatusError as he:
            code = he.response.status_code if he.response is not None else "NA"
            debug["errors"].append(f"fetch image HTTP {code} {image_url}")
            raise RuntimeError(f"fetch image HTTP {code}") from he
        except httpx.TimeoutException as te:
            debug["errors"].append(f"fetch image TIMEOUT {image_url}")
            raise RuntimeError("fetch image TIMEOUT") from te
        except Exception as e:
            debug["errors"].append(f"fetch image ERROR {type(e).__name__} {image_url}")
            raise RuntimeError(f"fetch image ERROR {type(e).__name__}") from e

        # 2) Upload the image. Redirects are NOT followed: the Location
        #    header is what carries the ids needed for the JSON request.
        files = {
            "encoded_image": ("file.jpg", img_resp.content, "image/jpeg"),
            "sbisrc": (None, "browser"),
            "rt": (None, "j"),
        }
        up = await cli.post(
            "https://lens.google.com/v3/upload",
            files=files,
            headers=hdr,
            follow_redirects=False,
            timeout=10,
        )
        debug["steps"].append(f"upload response status={up.status_code}")
        if up.status_code not in (302, 303):
            msg = f"Lens upload failed {up.status_code}"
            debug["errors"].append(msg)
            raise RuntimeError(msg)
        loc = up.headers.get("location", "")
        debug["steps"].append(f"got redirect location: {loc}")

        # 3) Fetch and parse the translation JSON.
        json_url = _json_url(loc, lang)
        debug["steps"].append(f"constructed json_url: {json_url}")
        js = await cli.get(json_url, headers=hdr, timeout=5)
        raw_body = js.text
        debug["steps"].append("fetched translation JSON")
        # Drop any leading ")", "]", "}" and "'" characters before parsing.
        # NOTE(review): presumably an anti-JSON-hijacking prefix - confirm.
        body = raw_body.lstrip(")]}'")
        try:
            info = json.loads(body)
        except Exception as e:
            debug["errors"].append(f"JSON parse failure: {e}; raw_body snippet: {body[:200]}")
            raise

        # 4) Turn "imageUrl" into a data URL. `or ""` also covers an explicit
        #    JSON null, which would otherwise crash on .startswith() below.
        data_url = info.get("imageUrl") or ""
        extracted_data_url = ""
        if data_url:
            if data_url.startswith("data:image/"):
                extracted_data_url = data_url
                debug["steps"].append("imageUrl already data URL")
            else:
                # The value may be base64-encoded HTML embedding a data URL.
                try:
                    html = base64.b64decode(data_url).decode("utf-8", errors="ignore")
                    m = re.search(r"data:image/[a-zA-Z]+;base64,[A-Za-z0-9+/=]+", html)
                    if m:
                        extracted_data_url = m.group(0)
                        debug["steps"].append("extracted embedded data:image from base64 HTML")
                    else:
                        debug["steps"].append("no embedded data:image found inside decoded HTML")
                except Exception as e:
                    debug["errors"].append(f"error decoding imageUrl: {e}")

        # 5) Last resort: "imageUrl" is a plain http(s) URL - download it.
        if not extracted_data_url and data_url.startswith(("http://", "https://")):
            try:
                fallback_img = await cli.get(data_url, headers={"User-Agent": UA}, timeout=5)
                fallback_img.raise_for_status()
                b64 = base64.b64encode(fallback_img.content).decode("utf-8")
                # Label the data URL with the real image type when the server
                # reports one; fall back to JPEG otherwise.
                ctype = fallback_img.headers.get("content-type", "").split(";", 1)[0].strip().lower()
                mime = ctype if ctype.startswith("image/") else "image/jpeg"
                extracted_data_url = f"data:{mime};base64,{b64}"
                debug["steps"].append("fetched fallback image URL and encoded to data URL")
            except Exception as e:
                debug["errors"].append(f"fallback fetch of imageUrl failed: {e}")

    translated_text = info.get("translatedTextFull", "") or info.get("translatedText", "")
    duration = time.time() - start_ts
    debug["duration_sec"] = duration
    return {
        "image": extracted_data_url,
        "text": translated_text,
        "loc": loc,
        "json_url": json_url,
        "raw_info": info,
        "debug": debug,
    }