#!/usr/bin/env python3
"""
Visual Search Product Indexer

Indexes Shopify products into Pinecone using local Jina CLIP v2 model.
Uses the SAME model as the HF Space search endpoint for compatible embeddings.

Usage:
    python index.py                  # Index products matching the default --tags
    python index.py --limit 10       # Test with 10 products
    python index.py --clear          # Clear index first
    python index.py --dry-run        # Test without uploading
"""

import os
import sys
import argparse
import time
from io import BytesIO
from pathlib import Path

try:
    import torch
    from PIL import Image
    import requests
    from tqdm import tqdm
    from pinecone import Pinecone
except ImportError as e:
    print(f"Missing package: {e}")
    print("Run: pip install -r requirements.txt")
    sys.exit(1)


def load_env():
    """Load KEY=VALUE pairs from the .env file next to this script.

    Blank lines, lines starting with '#', and lines without '=' are ignored.
    Surrounding whitespace and quote characters are stripped from values.
    Values from the file overwrite variables already present in os.environ.
    """
    env_path = Path(__file__).parent / '.env'
    if not env_path.exists():
        return

    print(f"Loading {env_path}")
    # Read as UTF-8 explicitly so the result does not depend on the
    # platform's default locale encoding.
    for line in env_path.read_text(encoding='utf-8').splitlines():
        line = line.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        key, value = line.split('=', 1)
        os.environ[key.strip()] = value.strip().strip('"\'')


load_env()

# Config
SHOPIFY_STORE = os.environ.get('SHOPIFY_STORE', '25c0da-4')
SHOPIFY_ADMIN_TOKEN = os.environ.get('SHOPIFY_ADMIN_TOKEN')
PINECONE_API_KEY = os.environ.get('PINECONE_API_KEY')
PINECONE_HOST = os.environ.get('PINECONE_HOST')
API_VERSION = "2024-01"

# Embeddings longer than this are truncated before upload.
# NOTE(review): presumably this must equal the Pinecone index dimension and
# the dimension used by the search endpoint - confirm before changing.
EMBEDDING_DIM = 512

# Model (loaded lazily)
model = None
device = None


def check_config():
    """Validate environment variables.

    Prints the names of any missing required variables and exits the
    process with status 1 if at least one is missing.
    """
    required = {
        'SHOPIFY_ADMIN_TOKEN': SHOPIFY_ADMIN_TOKEN,
        'PINECONE_API_KEY': PINECONE_API_KEY,
        'PINECONE_HOST': PINECONE_HOST,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        print("Missing environment variables:")
        for v in missing:
            print(f" - {v}")
        print("\nCopy .env.example to .env and fill in values")
        sys.exit(1)


def load_model():
    """Load Jina CLIP v2 model into the module-level ``model`` global.

    Also sets the module-level ``device`` to "cuda" when torch reports a
    CUDA device, otherwise "cpu".
    """
    global model, device
    print("Loading Jina CLIP v2 model...")
    print("(First run downloads ~2GB)")

    # Imported here so the heavy transformers import only happens when the
    # model is actually needed.
    from transformers import AutoModel

    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using: {device.upper()}")

    model = AutoModel.from_pretrained(
        "jinaai/jina-clip-v2",
        trust_remote_code=True
    ).to(device).eval()
    print("Model loaded!")


def get_pinecone():
    """Connect to Pinecone and return the index handle.

    PINECONE_HOST may be given with or without a URL scheme; "https://" is
    added only when no scheme is present.
    """
    print("Connecting to Pinecone...")
    pc = Pinecone(api_key=PINECONE_API_KEY)

    host = PINECONE_HOST.strip()
    if not host.startswith(('http://', 'https://')):
        host = f"https://{host}"

    index = pc.Index(host=host)
    stats = index.describe_index_stats()
    print(f"Connected! {stats.get('total_vector_count', 0)} vectors")
    return index


def _split_tags(raw):
    """Split a comma-separated tag string into stripped, non-empty tags."""
    return [t.strip() for t in (raw or '').split(',') if t.strip()]


def fetch_products(limit=None, tags=None):
    """Fetch products from Shopify.

    Args:
        limit: Maximum number of products to return; falsy means no limit.
        tags: Comma-separated tag names. When given, only products carrying
            at least one of these tags (case-insensitive) are kept. Empty
            entries are ignored; if no usable tag remains, nothing is
            filtered.

    Returns:
        A list of product dicts as returned by the Shopify products endpoint.

    Raises:
        requests.HTTPError: If Shopify answers with an error status.
    """
    print(f"Fetching products from {SHOPIFY_STORE}...")
    if tags:
        print(f" Tags filter: {tags}")

    # Parse the filter once instead of on every page.
    wanted = {t.lower() for t in _split_tags(tags)}

    products = []
    url = (
        f"https://{SHOPIFY_STORE}.myshopify.com/admin/api/{API_VERSION}"
        "/products.json?limit=250&status=active&order=created_at%20desc"
    )
    headers = {"X-Shopify-Access-Token": SHOPIFY_ADMIN_TOKEN}

    while url:
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        batch = resp.json().get('products', [])

        # Filter by tags
        if wanted:
            batch = [
                p for p in batch
                if wanted.intersection(
                    t.lower() for t in _split_tags(p.get('tags'))
                )
            ]

        products.extend(batch)
        print(f" {len(products)} products...", end='\r')

        if limit and len(products) >= limit:
            products = products[:limit]
            break

        # Pagination: requests parses the Link header into resp.links,
        # keyed by rel; no "next" entry means this was the last page.
        url = resp.links.get('next', {}).get('url')

    print(f"\nFetched {len(products)} products")
    return products


def download_image(url):
    """Download image as PIL.

    Appends a ``width=512`` query parameter to the URL before fetching.

    Returns:
        An RGB ``PIL.Image.Image``, or None if the download or decoding
        fails.
    """
    try:
        url = url + ('&' if '?' in url else '?') + 'width=512'
        resp = requests.get(url, timeout=15)
        resp.raise_for_status()
        return Image.open(BytesIO(resp.content)).convert('RGB')
    except (
        requests.RequestException,      # network / HTTP failures
        OSError,                        # includes PIL's UnidentifiedImageError
        ValueError,
        TypeError,                      # e.g. url is None
        Image.DecompressionBombError,
    ):
        # Best effort: the caller counts a None result as an error.
        # KeyboardInterrupt / SystemExit are deliberately not caught.
        return None


def get_embedding(image):
    """Generate embedding.

    Encodes the image with the loaded model, keeps at most EMBEDDING_DIM
    leading components and L2-normalizes the result.

    Returns:
        A list of floats with unit L2 norm, or None if encoding fails or
        the embedding has zero norm.
    """
    try:
        with torch.no_grad():
            emb = model.encode_image(image)

        if hasattr(emb, 'cpu'):
            emb = emb.cpu().numpy()
        emb = emb.flatten()

        # Truncate first, then normalize: normalizing before truncation
        # would leave the stored vector with a norm below 1.
        if len(emb) > EMBEDDING_DIM:
            emb = emb[:EMBEDDING_DIM]

        norm = (emb ** 2).sum() ** 0.5
        if not norm > 0:
            # Dividing would yield NaN/inf values.
            print("\nEmbedding error: zero-norm embedding")
            return None

        return (emb / norm).tolist()
    except Exception as e:
        print(f"\nEmbedding error: {e}")
        return None


def get_price(product):
    """Extract price from variants.

    Returns the first variant's price as a float, or 0.0 when the product
    has no variants or the price is missing or not numeric.
    """
    try:
        return float(product.get('variants', [{}])[0].get('price', 0))
    except (AttributeError, IndexError, KeyError, TypeError, ValueError):
        return 0.0


def _build_vector(product, emb, img_url):
    """Build the Pinecone record (id, values, metadata) for one product."""
    tags = _split_tags(product.get('tags'))
    return {
        'id': str(product['id']),
        'values': emb,
        'metadata': {
            'product_id': product['id'],
            'handle': product['handle'],
            'title': product['title'],
            # "or ''" turns an explicit null from the API into an empty
            # string so no None value ends up in the metadata.
            'vendor': product.get('vendor') or '',
            'product_type': product.get('product_type') or '',
            'tags': tags[:20],
            'price': get_price(product),
            'created_at': product.get('created_at') or '',
            'image_url': img_url
        }
    }


def main():
    """Command-line entry point: fetch, embed and upsert products."""
    parser = argparse.ArgumentParser(description='Index products for visual search')
    parser.add_argument('--limit', type=int, help='Limit products')
    parser.add_argument('--tags', type=str, default='clothing,footwear', help='Filter by tags')
    parser.add_argument('--batch-size', type=int, default=100, help='Pinecone batch size')
    parser.add_argument('--clear', action='store_true', help='Clear index first')
    parser.add_argument('--dry-run', action='store_true', help='No upload')
    args = parser.parse_args()

    print("=" * 50)
    print(" Visual Search Indexer")
    print("=" * 50)

    check_config()
    load_model()

    index = None
    if not args.dry_run:
        index = get_pinecone()
        # Clearing is nested here so a dry run never touches the index.
        if args.clear:
            print("Clearing index...")
            index.delete(delete_all=True)
            time.sleep(2)

    products = fetch_products(limit=args.limit, tags=args.tags)
    if not products:
        print("No products found!")
        return

    print(f"\nIndexing {len(products)} products...")

    vectors = []
    ok, skip, err = 0, 0, 0

    for product in tqdm(products, desc="Processing"):
        images = product.get('images')
        if not images:
            skip += 1
            continue

        try:
            # Get default image (position 1), falling back to the first one
            img_data = next((i for i in images if i.get('position') == 1), images[0])
            img_url = img_data['src']

            # Download & embed
            img = download_image(img_url)
            if img is None:
                err += 1
                continue

            emb = get_embedding(img)
            if not emb:
                err += 1
                continue

            # Build vector with metadata for future analysis
            vectors.append(_build_vector(product, emb, img_url))
            ok += 1

            # Batch upload
            if len(vectors) >= args.batch_size:
                if not args.dry_run:
                    index.upsert(vectors=vectors)
                # Drop the batch in dry-run mode too, so memory stays
                # bounded on large catalogs.
                vectors = []

        except Exception as e:
            err += 1
            # tqdm.write keeps the progress bar intact while reporting.
            tqdm.write(f"Error processing product {product.get('id')}: {e}")

    # Final batch
    if vectors and not args.dry_run:
        index.upsert(vectors=vectors)

    print("\n" + "=" * 50)
    print(" Done!")
    print("=" * 50)
    print(f" Indexed: {ok}")
    print(f" Skipped: {skip}")
    print(f" Errors: {err}")
    if args.dry_run:
        print(" (dry run - nothing uploaded)")
    print("=" * 50)


if __name__ == "__main__":
    main()