_dash_dev / main.py
Adarshu07's picture
Create main.py
30f2952 verified
Raw
History Blame Contribute Delete
6.58 kB
import os
from typing import Literal, Optional
from fastapi import FastAPI, Query, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from gallery_scraper import GalleryScraper
# Application instance. The metadata below is passed straight to FastAPI,
# which uses it for the generated OpenAPI schema / interactive docs.
app = FastAPI(
    title="Perchance Gallery API",
    version="1.0.0",
    description="FastAPI server for Perchance gallery scraping",
)

# Fully open CORS policy: any origin, method and header may call this API.
# Credentials stay disabled: the CORS middleware documentation states that
# wildcard origins/methods/headers must not be combined with
# allow_credentials=True, and none of the routes in this file read cookies
# or Authorization headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
def root():
    """Return a small service descriptor listing the available endpoints."""
    # Route path -> short human-readable description.
    endpoint_help = {
        "/api/gallery": "Fetch gallery data",
        "/health": "Health check",
    }
    return dict(
        ok=True,
        service="Perchance Gallery API",
        endpoints=endpoint_help,
    )
@app.get("/health")
def health():
    """Health-check endpoint; always answers with a static ``ok`` status."""
    status_payload = dict(status="ok")
    return status_payload
@app.get("/api/gallery")
def api_gallery(
    page: int = Query(1, ge=1, description="Starting page, 1-based"),
    pages: int = Query(1, ge=1, le=50, description="How many pages to fetch"),
    sort: Literal["recent", "trending", "top"] = Query("top"),
    timeRange: Literal["all-time", "1-month"] = Query("all-time"),
    contentFilter: Literal["none", "pg13"] = Query("none"),
    concurrency: int = Query(1, ge=1, le=16),
    timeout: int = Query(30, ge=5, le=120),
    save: Optional[str] = Query(None, description="Optional local file path to save JSON"),
):
    """
    Fetch one or more consecutive gallery pages and return them as JSON.

    Example:
        /api/gallery?page=1&pages=3&sort=top&timeRange=all-time&contentFilter=none

    Parameters:
        page: 1-based page to start from.
        pages: number of consecutive pages to fetch.
        sort, timeRange, contentFilter: forwarded unchanged to the gallery
            request.
        concurrency: number of worker threads used when fetching pages.
        timeout: per-request timeout handed to the HTTP client.
        save: still accepted so existing callers keep working, but this
            endpoint does not use it and writes nothing to disk.

    Returns:
        A JSON object echoing the request parameters plus ``count`` (number
        of items) and ``data`` (the list of gallery items).

    Raises:
        HTTPException: 400 if the fetch raises ``ValueError``, 500 for any
            other failure.
    """
    # NOTE: the previous version built a GalleryScraper here (handing it the
    # caller-supplied ``save`` path) and then never used it. All fetching is
    # done by _fetch_from_start_page, so that dead construction was removed.
    try:
        data = _fetch_from_start_page(
            # The helper works with zero-based page indices.
            start_page=page - 1,
            pages=pages,
            sort=sort,
            time_range=timeRange,
            content_filter=contentFilter,
            concurrency=concurrency,
            timeout=timeout,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Server error: {e}") from e

    return JSONResponse(
        {
            "ok": True,
            "page": page,
            "pages": pages,
            "sort": sort,
            "timeRange": timeRange,
            "contentFilter": contentFilter,
            "count": len(data),
            "data": data,
        }
    )
def _fetch_from_start_page(
    start_page: int,
    pages: int,
    sort: str,
    time_range: str,
    content_filter: str,
    concurrency: int,
    timeout: int,
) -> list:
    """
    Fetch ``pages`` consecutive gallery pages starting at ``start_page``.

    Parameters:
        start_page: zero-based index of the first page; page ``k`` is
            requested with ``skip = k * 200`` (omitted when it is 0).
        pages: how many consecutive pages to request.
        sort, time_range, content_filter: sent as the ``sort``,
            ``timeRange`` and ``contentFilter`` query parameters.
        concurrency: values <= 1 fetch sequentially in the calling thread;
            larger values use a thread pool with that many workers.
        timeout: timeout passed to every HTTP request.

    Returns:
        A list of item dicts in page order. Each item has ``image_url``,
        ``title``, ``prompt``, ``guidance_scale``, ``seed`` and ``nsfw``,
        an optional ``negative_prompt``, and a 1-based running index ``no``.
        Pages that fail (exception or non-200 status) contribute no items.
    """
    import logging
    from concurrent.futures import ThreadPoolExecutor
    from html import unescape

    import cloudscraper
    from bs4 import BeautifulSoup

    logger = logging.getLogger(__name__)

    GALLERY_URL = "https://image-generation.perchance.org/gallery"
    PER_PAGE = 200
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/145.0.0.0 Safari/537.36"
        ),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Referer": "https://image-generation.perchance.org/",
        "Origin": "https://image-generation.perchance.org",
    }

    def clean(value) -> str:
        """Unescape HTML entities, turn CR into LF and strip; None -> ''."""
        if value is None:
            return ""
        return unescape(str(value)).replace("\r", "\n").strip()

    def build_params(page_index: int) -> dict:
        """Build the query parameters for the page at ``page_index``."""
        params = {
            "sort": sort,
            "timeRange": time_range,
            "hideIfScoreIsBelow": "-1",
            "contentFilter": content_filter,
            "subChannel": "public",
            "channel": "ai-text-to-image-generator",
        }
        skip = page_index * PER_PAGE
        # The first page is requested without any ``skip`` parameter.
        if skip > 0:
            params["skip"] = skip
        return params

    def parse_page(html: str) -> list:
        """Extract one item dict per ``.imageCtn`` element in ``html``."""
        if not html:
            return []
        soup = BeautifulSoup(html, "html.parser")
        items = []
        for card in soup.select(".imageCtn"):
            negative_prompt = clean(card.get("data-negative-prompt"))
            title_attr = clean(card.get("data-title"))
            img_tag = card.select_one(".imageWrapperInner img.image")
            title_el = card.select_one(".image-title")
            visible_title = clean(title_el.get_text(" ", strip=True)) if title_el else ""
            item = {
                "image_url": img_tag.get("src", "") if img_tag else "",
                # Prefer the data attribute; fall back to the rendered title.
                "title": title_attr or visible_title,
                "prompt": clean(card.get("data-prompt")),
                "guidance_scale": clean(card.get("data-guidance-scale")),
                "seed": clean(card.get("data-seed")),
                "nsfw": clean(card.get("data-is-nsfw")).lower() == "true",
            }
            # negative_prompt is only included when it is non-empty.
            if negative_prompt:
                item["negative_prompt"] = negative_prompt
            items.append(item)
        return items

    scraper = cloudscraper.create_scraper()

    def fetch_one(offset: int) -> list:
        """Fetch and parse one page; best-effort, returns [] on any failure."""
        page_index = start_page + offset
        try:
            resp = scraper.get(
                GALLERY_URL,
                params=build_params(page_index),
                headers=headers,
                timeout=timeout,
            )
            if resp.status_code != 200:
                logger.warning(
                    "Gallery page %d returned HTTP %s; skipping",
                    page_index,
                    resp.status_code,
                )
                return []
            return parse_page(resp.text)
        except Exception:
            # Deliberately best-effort: one bad page must not fail the whole
            # request, but the failure is logged instead of vanishing.
            logger.warning("Gallery page %d failed; skipping", page_index, exc_info=True)
            return []

    try:
        if concurrency <= 1:
            per_page = [fetch_one(offset) for offset in range(pages)]
        else:
            with ThreadPoolExecutor(max_workers=concurrency) as pool:
                # Executor.map yields results in submission (page) order.
                per_page = list(pool.map(fetch_one, range(pages)))
    finally:
        # Release the HTTP session's pooled connections; previously the
        # session was never closed.
        scraper.close()

    merged = [item for page_items in per_page for item in page_items]
    # Number the items 1..N across all fetched pages.
    for idx, item in enumerate(merged, start=1):
        item["no"] = idx
    return merged