import os import re import sys import time import json import sqlite3 import random import threading import logging from datetime import datetime from urllib.parse import urlparse, unquote from contextlib import contextmanager from bs4 import BeautifulSoup, Comment import markdownify import pandas as pd import gradio as gr from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError from huggingface_hub import HfApi, repo_exists, hf_hub_download # --- Logging Configuration --- logging.basicConfig( level=logging.INFO, format='[%(asctime)s] [%(levelname)s] [%(threadName)s] %(message)s', handlers=[logging.StreamHandler(sys.stdout)] ) logger = logging.getLogger("WikiniumMaker") # --- Constants & Global Configurations --- DB_PATH = "wikinium_engine.db" REPOS_ID = "Rikunarita-ORG/Wikinium" COMMIT_INTERVAL_SECONDS = 3600 # Commit and push every 1 hour INITIAL_LANGUAGES = ["ja", "en", "zh", "ko", "de", "fr"] MAX_LANG_RATIO = 0.25 # 特定言語が全体の25%を超えたら制限をかける # Strict multi-language noise section header exact-match keywords BAD_SECTIONS = { # Japanese "関連項目", "参考文献", "外部リンク", "注釈", "出典", "脚注", "参考資料", "文献", "関連文献", "アルバム解説", # English "see also", "references", "external links", "notes", "further reading", "bibliography", "sources", "academic journals", "citations", # Chinese "参见", "参考文献", "外部链接", "注释", "相关条目", "外部連結", "參見", # Korean "같이 보기", "각주", "참고 문헌", "외부 링크", "참고 자료", # German "siehe auch", "einzelnachweise", "weblinks", "literatur", "anmerkungen", "quellen", # French "voir aussi", "notes et références", "liens externes", "bibliographie", "articles connexes", "sources" } HF_TOKEN = os.environ.get("HF_TOKEN") db_lock = threading.RLock() # 再入可能ロックに変更しデッドロックを完全予防 # --- SQLite Database Operations (Thread-Safe with WAL Mode & Auto-Close Context) --- @contextmanager def get_db_context(): """ sqlite3の接続リークを完全に防ぐための堅牢なコンテキストマネージャ。 __exit__ 時に commit/rollback だけでなく、必ず .close() を実行してファイル記述子を解放します。 """ conn = sqlite3.connect(DB_PATH, timeout=60.0) conn.execute("PRAGMA journal_mode=WAL;") conn.execute("PRAGMA synchronous=NORMAL;") try: yield conn conn.commit() except Exception: conn.rollback() raise finally: conn.close() def init_db(): with get_db_context() as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS visited_urls ( url TEXT PRIMARY KEY, lang TEXT, title TEXT, visited_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS queue_urls ( url TEXT PRIMARY KEY, lang TEXT, added_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS buffered_articles ( id INTEGER PRIMARY KEY AUTOINCREMENT, lang TEXT, title TEXT, text TEXT, url TEXT, added_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS statistics ( key TEXT PRIMARY KEY, value TEXT ) """) conn.execute("INSERT OR IGNORE INTO statistics (key, value) VALUES ('total_processed', '0')") conn.execute("INSERT OR IGNORE INTO statistics (key, value) VALUES ('last_commit_time', 'Never')") init_db() def is_url_visited(url): with db_lock: with get_db_context() as conn: cursor = conn.cursor() cursor.execute("SELECT 1 FROM visited_urls WHERE url = ?", (url,)) return cursor.fetchone() is not None def add_visited_url(url, lang, title): with db_lock: with get_db_context() as conn: conn.execute("INSERT OR IGNORE INTO visited_urls (url, lang, title) VALUES (?, ?, ?)", (url, lang, title)) conn.execute("UPDATE statistics SET value = CAST(CAST(value AS INTEGER) + 1 AS TEXT) WHERE key = 'total_processed'") def enqueue_urls(urls_with_lang): # 偏り防止のため、制限対象の言語はキューに追加しない stats, lang_counts = get_statistics() try: total = int(stats.get("total_processed", "0")) except ValueError: total = 0 restricted_langs = set() if total > 500: # ある程度データが集まってから判定 for l, count in lang_counts.items(): if (count / total) > MAX_LANG_RATIO: restricted_langs.add(l) filtered_urls = [item for item in urls_with_lang if item[1] not in restricted_langs] if not filtered_urls: return with db_lock: with get_db_context() as conn: cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM queue_urls") if cursor.fetchone()[0] > 50000: return conn.executemany("INSERT OR IGNORE INTO queue_urls (url, lang) VALUES (?, ?)", filtered_urls) def dequeue_any_url(): with db_lock: with get_db_context() as conn: cursor = conn.cursor() cursor.execute("SELECT url, lang FROM queue_urls ORDER BY added_at ASC LIMIT 1") row = cursor.fetchone() if row: url, lang = row[0], row[1] conn.execute("DELETE FROM queue_urls WHERE url = ?", (url,)) return url, lang return None, None def buffer_article(lang, title, text, url): with db_lock: with get_db_context() as conn: conn.execute( "INSERT INTO buffered_articles (lang, title, text, url) VALUES (?, ?, ?, ?)", (lang, title, text, url) ) def get_active_languages(): stats, lang_counts = get_statistics() try: total = int(stats.get("total_processed", "0")) except ValueError: total = 0 with db_lock: with get_db_context() as conn: cursor = conn.cursor() cursor.execute("SELECT DISTINCT lang FROM visited_urls") langs = [r[0] for r in cursor.fetchall()] all_langs = list(set(langs + INITIAL_LANGUAGES)) # 25%を超えている言語はランダム選択の候補から除外(偏り防止) if total > 500: available_langs = [l for l in all_langs if (lang_counts.get(l, 0) / total) <= MAX_LANG_RATIO] if available_langs: return available_langs return all_langs def get_statistics(): with db_lock: with get_db_context() as conn: cursor = conn.cursor() cursor.execute("SELECT key, value FROM statistics") stats = dict(cursor.fetchall()) cursor.execute("SELECT lang, COUNT(*) FROM visited_urls GROUP BY lang") lang_counts = dict(cursor.fetchall()) cursor.execute("SELECT COUNT(*) FROM queue_urls") queue_count = cursor.fetchone()[0] stats["queue_count"] = str(queue_count) return stats, lang_counts # --- Robust Multi-Language Markdown Clean & Convert Logic --- def clean_and_convert_html(html_content, url): soup = BeautifulSoup(html_content, "html.parser") # Pre-parse: 完全に不要なシステムタグを最初からパージしてメモリと構文解析を最適化 for noisy_tag in list(soup(["script", "style", "noscript", "iframe", "header", "footer", "nav"])): noisy_tag.extract() # コメントのパージ for comment in list(soup.find_all(string=lambda text: isinstance(text, Comment))): comment.extract() content_area = soup.find(class_="mw-parser-output") if not content_area: content_area = soup.find(id="content") if not content_area: content_area = soup # テーブルセルの構造維持を強化(マークダウン変換崩れの防止) for table in content_area.find_all("table"): for cell in table.find_all(["th", "td"]): for br in list(cell.find_all("br")): br.replace_with(" ") # セル内のブロック要素をアンラップしてインライン化(イテレータ破壊防止のため静的リスト化) for block_el in list(cell.find_all(["p", "div"])): block_el.unwrap() # 1. Advanced Math Equation Protection & Conversion Setup # markdownifyによる「$」記号のエスケープ(\$への強制変換)を防ぐためのプレースホルダー辞書 math_placeholders = {} ph_counter = 0 for math_el in content_area.find_all([re.compile(r"span|div|semantics"), "math"], class_=re.compile(r"mwe-math|math|texhtml")): img = math_el.find("img") tex = None if img and img.get("alt"): tex = img["alt"].strip() else: # さまざまなTex型エンコーディング属性(text/x-texなど)を柔軟に捕捉して脱落を防ぐ annotation = math_el.find("annotation", encoding=re.compile(r"tex", re.IGNORECASE)) if annotation: tex = annotation.get_text().strip() if tex: # 括弧のDepthネストカウントによって安全に最外周の \displaystyle のみを除去 if tex.startswith(r'{\displaystyle') and tex.endswith('}'): depth = 0 is_pure_envelope = True for idx, char in enumerate(tex): if char == '{': depth += 1 elif char == '}': depth -= 1 if depth == 0 and idx < len(tex) - 1: # 末尾に到達する前に最外周の波括弧が閉じているため、独立した複数ブロック構造と判定 is_pure_envelope = False break if is_pure_envelope and depth == 0: # 安全に最外周の '{\displaystyle' と末尾の '}' を剥ぎ取る tex = tex[14:-1].strip() tex = tex.strip().replace('\n', ' ') is_block = False if "block" in "".join(math_el.get("class", [])).lower() or math_el.name == "div": is_block = True elif math_el.parent and math_el.parent.name in ["div", "dd"]: is_block = True # 一時的な安全プレースホルダーに置換 ph_key = f"WIKINIUM_MATH_PH_{ph_counter}" ph_counter += 1 math_placeholders[ph_key] = (is_block, tex) math_el.replace_with(soup.new_string(f" {ph_key} ")) else: tex_text = math_el.get_text().strip().replace('\n', ' ') if tex_text: is_block = (math_el.name == "div") ph_key = f"WIKINIUM_MATH_PH_{ph_counter}" ph_counter += 1 math_placeholders[ph_key] = (is_block, tex_text) math_el.replace_with(soup.new_string(f" {ph_key} ")) # 2. Layout, UI, Navigation & Template elements の厳格な排除 bad_selectors = [ ".mw-editsection", ".toc", "#toc", ".navbox", ".ambox", ".asiabox", ".banner", ".metadata", ".catlinks", ".reference", "sup.reference", ".mw-jump-link", ".hatnote", ".sidebar", ".vertical-navbox", ".noprint", "#mw-navigation", ".mw-indicators", ".reflist", "ol.references", ".mw-references-wrap", ".thumbaction", ".sistersitebox", ".mbox-small", ".navigation-not-searchable", ".geo-default", ".geo-nondefault", ".cs1-doc-fragment", ".mw-empty-elt", ".mw-authority-control", ".asbox", "#coordinates", ".navbox-styles", ".printfooter", ".mw-editsection-like", ".mw-pt-languages", ".mw-translation-selector" ] for selector in bad_selectors: for element in list(content_area.select(selector)): element.extract() # 画像タグそのものは削除(LLMはテキスト情報を学習するため) for img in list(content_area.find_all("img")): img.extract() # 3. 参考文献、外部リンク、関連項目などの不要セクションのピンポイント範囲削除(イテレータ保護のためリスト化) for heading in list(content_area.find_all(["h2", "h3"])): if heading.parent is None: continue headline_span = heading.find(class_="mw-headline") text = headline_span.get_text().strip().lower() if headline_span else heading.get_text().strip().lower() text_clean = re.sub(r'^[::\s.・]+', '', text) text_clean = re.sub(r'[::\s.・]+$', '', text_clean).strip() if text_clean in BAD_SECTIONS: targets = [heading] for sibling in heading.find_next_siblings(): if sibling.name in ["h1", "h2", "h3"]: if heading.name == "h2" and sibling.name in ["h1", "h2"]: break if heading.name == "h3" and sibling.name in ["h1", "h2", "h3"]: break targets.append(sibling) for t in targets: if t.parent: t.extract() # 4. ハイパーリンクの除去(テキストの文脈を維持) for a_tag in list(content_area.find_all("a")): if a_tag.get("href") and (a_tag["href"].startswith("#cite_note") or "reference" in a_tag.get("class", [])): a_tag.extract() continue a_tag.unwrap() # 5. Markdownifyによる高品質構造変換(不正な独自引数を排除しTypeErrorを防止) md_output = markdownify.markdownify( str(content_area), heading_style="ATX" ) # 6. ポストプロセス:生成されたマークダウンデータの徹底的なクレンジングと整形強化 md_output = re.sub(r'https?://\S+', '', md_output) # 生URL文字列の除去 md_output = re.sub(r'\textbf+\s*\textbf+', '', md_output) # 空の強調記法のパージ md_output = re.sub(r'\*\*+\s*\*+', '', md_output) md_output = re.sub(r'\s_+\s_+', '', md_output) # 空の斜体記法のパージ # 見出し記号(#)の後に適切なスペースを保証 md_output = re.sub(r'^(#+)\s*([^#\s].*)$', r'\1 \2', md_output, flags=re.MULTILINE) md_output = "\n".join([line.rstrip() for line in md_output.splitlines()]) # 各行の末尾空白をトリミング md_output = re.sub(r'\n{3,}', '\n\n', md_output) # 3つ以上の過剰な連続改行を2つに集約 # 7. すべてのクレンジング完了後、数式プレースホルダーを正規のLaTeX記法に完全安全差し戻し for ph_key, (is_block, tex_val) in math_placeholders.items(): if is_block: md_output = md_output.replace(ph_key, f"\n$$\n{tex_val}\n$$\n") else: md_output = md_output.replace(ph_key, f" ${tex_val} ") return md_output.strip() # --- Continuous Crawler Core Loop --- def crawler_worker(thread_id): logger.info(f"Worker-Thread-{thread_id} initiated and running fully.") with sync_playwright() as p: browser = p.chromium.launch( headless=True, args=["--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", "--disable-gpu"] ) context = browser.new_context( user_agent="WikiniumMaker/2.0 (High-Speed Parquet Dataset Pipeline; Engine Spin)" ) page = context.new_page() page.route("**/*", lambda route: route.abort() if route.request.resource_type in ["image", "stylesheet", "font", "media"] else route.continue_()) while True: target_url, current_lang = dequeue_any_url() if not target_url: active_langs = get_active_languages() current_lang = random.choice(active_langs) target_url = f"https://{current_lang}.wikipedia.org/wiki/Special:Random" if is_url_visited(target_url) and "Special:Random" not in target_url: continue logger.info(f"[Worker-{thread_id}] Scraping [{current_lang}]: {target_url}") html_content, final_url, page_title = "", "", "" success = False for attempt in range(3): try: response = page.goto(target_url, timeout=20000, wait_until="domcontentloaded") if response and response.status == 429: time.sleep(5.0) continue page.wait_for_selector("#firstHeading", timeout=6000) html_content = page.content() final_url = page.url page_title = page.locator("#firstHeading").inner_text().strip() success = True break except PlaywrightTimeoutError: time.sleep(2.0) except Exception as e: logger.error(f"[Worker-{thread_id}] Browser exception: {str(e)}") break if not success or not html_content or not page_title: continue # リダイレクト後の最終URLで再度重複排除の厳格な二重チェック if is_url_visited(final_url): continue try: soup_for_links = BeautifulSoup(html_content, "html.parser") discovered_links = [] for a in soup_for_links.find_all("a", href=True): href = a["href"] if href.startswith("/wiki/") and not any(x in href for x in [":", "#", "Main_Page"]): full_link = f"https://{current_lang}.wikipedia.org{href}" discovered_links.append((full_link, current_lang)) for a in soup_for_links.find_all("a", class_="interlanguage-link-target"): href = a.get("href") if href and ".wikipedia.org/wiki/" in href: parsed = urlparse(href) lang_code = parsed.netloc.split(".")[0] if not any(x in parsed.path for x in [":", "Main_Page"]): discovered_links.append((href, lang_code)) if discovered_links: enqueue_urls(discovered_links) markdown_text = clean_and_convert_html(html_content, final_url) if markdown_text and len(markdown_text) > 150: # 記事本文の最上部に構造化されたタイトルをインジェクト embedded_text = f"# {page_title}\n\n{markdown_text}" buffer_article(current_lang, page_title, embedded_text, final_url) add_visited_url(final_url, current_lang, page_title) except Exception as e: logger.error(f"Parsing error on {final_url}: {str(e)}") # --- Hugging Face Dataset Engine (Atomic Parquet Commits) --- def hf_uploader_loop(): if not HF_TOKEN: logger.error("HF_TOKEN is missing from environment. Automated Parquet export suspended.") return api = HfApi(token=HF_TOKEN) try: if not repo_exists(repo_id=REPOS_ID, repo_type="dataset"): api.create_repo(repo_id=REPOS_ID, repo_type="dataset", private=True) logger.info(f"Initialized private dataset repository: {REPOS_ID}") except Exception as e: logger.error(f"Repository verification failed: {str(e)}") while True: time.sleep(COMMIT_INTERVAL_SECONDS) logger.info("Initializing atomic Parquet serialization and sync batch...") with db_lock: with get_db_context() as conn: cursor = conn.cursor() cursor.execute("SELECT id, lang, title, text, url FROM buffered_articles") rows = cursor.fetchall() if not rows: logger.info("No buffered articles found. Skipping synchronization loop.") continue lang_groups = {} for r in rows: row_id, lang, title, text, url = r if lang not in lang_groups: lang_groups[lang] = [] lang_groups[lang].append({"id": row_id, "title": title, "text": text, "url": url}) for lang, items in lang_groups.items(): df = pd.DataFrame(items) df_dataset = df[["title", "text", "url"]].copy() timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") local_filename = f"tmp_{lang}_{timestamp}.parquet" repo_file_path = f"{lang}/train-{timestamp}.parquet" try: # Fast Apache Parquet Serialization df_dataset.to_parquet(local_filename, index=False, engine='pyarrow', compression='snappy') # 指数バックオフによる堅牢なHFアップロードリトライロジック uploaded = False for attempt in range(3): try: api.upload_file( path_or_fileobj=local_filename, path_in_repo=repo_file_path, repo_id=REPOS_ID, repo_type="dataset" ) uploaded = True break except Exception as upload_err: wait_time = 2 ** attempt logger.warning(f"Upload attempt {attempt+1} failed for [{lang}]. Retrying in {wait_time}s... Error: {str(upload_err)}") time.sleep(wait_time) if not uploaded: raise RuntimeError(f"Failed to upload {repo_file_path} after multiple retries.") logger.info(f"Committed {len(df_dataset)} items to private subset [{lang}] -> {repo_file_path}") uploaded_ids = [x["id"] for x in items] # ループの外側でコンテキストを定義し、1トランザクションで一括削除を高速実行 with db_lock: with get_db_context() as conn: for i in range(0, len(uploaded_ids), 400): chunk = uploaded_ids[i:i+400] placeholders = ",".join(["?"] * len(chunk)) conn.execute(f"DELETE FROM buffered_articles WHERE id IN ({placeholders})", chunk) except Exception as e: logger.error(f"Failed synchronization shard for [{lang}]: {str(e)}") finally: if os.path.exists(local_filename): os.remove(local_filename) # README.md の自動生成とアップロード(YAMLメタデータの安全な追記・統合ロジック) local_readme_filename = "tmp_README.md" try: with db_lock: with get_db_context() as conn: cursor = conn.cursor() cursor.execute("SELECT DISTINCT lang FROM visited_urls") db_langs = [r[0] for r in cursor.fetchall()] all_current_langs = sorted(list(set(INITIAL_LANGUAGES + db_langs))) # リモートから既存のREADME.mdの内容の取得を試みる existing_content = "" try: downloaded_path = hf_hub_download( repo_id=REPOS_ID, filename="README.md", repo_type="dataset", token=HF_TOKEN ) with open(downloaded_path, "r", encoding="utf-8") as rf: existing_content = rf.read() if os.path.exists(downloaded_path): os.remove(downloaded_path) except Exception: logger.info("No existing README.md found or couldn't download. Creating a new one.") # 既存の記述を破壊しないためのパース処理 existing_content_stripped = existing_content.lstrip() yaml_part = "" remaining_text = "" if existing_content_stripped.startswith("---"): parts = existing_content_stripped.split("---", 2) if len(parts) >= 3: yaml_part = parts[1] remaining_text = parts[2] else: remaining_text = existing_content else: remaining_text = existing_content # 既存のYAMLからconfigsセクションのみを除去して重複を防ぐ other_yaml_lines = [] in_configs = False if yaml_part: for line in yaml_part.splitlines(): if line.strip() and not (line.startswith(" ") or line.startswith("\t")): if line.rstrip().startswith("configs:"): in_configs = True continue else: in_configs = False if in_configs: continue other_yaml_lines.append(line) # フロントマターの再構成 new_yaml_content = "---\n" for line in other_yaml_lines: if line.strip(): new_yaml_content += line + "\n" # 動的なconfigs配列の書き込み new_yaml_content += "configs:\n" for idx, l in enumerate(all_current_langs): new_yaml_content += f" - config_name: {l}\n" new_yaml_content += " data_files:\n" new_yaml_content += " - split: train\n" new_yaml_content += f" path: {l}/*.parquet\n" new_yaml_content += "---\n" final_readme_content = new_yaml_content + remaining_text with open(local_readme_filename, "w", encoding="utf-8") as f: f.write(final_readme_content) uploaded_readme = False for attempt in range(3): try: api.upload_file( path_or_fileobj=local_readme_filename, path_in_repo="README.md", repo_id=REPOS_ID, repo_type="dataset" ) uploaded_readme = True break except Exception as upload_err: wait_time = 2 ** attempt logger.warning(f"README.md upload attempt {attempt+1} failed. Retrying in {wait_time}s... Error: {str(upload_err)}") time.sleep(wait_time) if not uploaded_readme: logger.error("Failed to upload README.md after multiple retries.") except Exception as e: logger.error(f"Failed to generate or upload README.md: {str(e)}") finally: if os.path.exists(local_readme_filename): os.remove(local_readme_filename) with db_lock: with get_db_context() as conn: conn.execute("UPDATE statistics SET value = ? WHERE key = 'last_commit_time'", (datetime.now().strftime("%Y-%m-%d %H:%M:%S"),)) # --- Thread Spawning --- t1 = threading.Thread(target=crawler_worker, args=(1,), name="Worker-1", daemon=True) t2 = threading.Thread(target=crawler_worker, args=(2,), name="Worker-2", daemon=True) tu = threading.Thread(target=hf_uploader_loop, name="Uploader", daemon=True) t1.start() t2.start() tu.start() # --- Gradio System Monitor Dashboard --- def get_ui_data(): try: stats, lang_counts = get_statistics() total = stats.get("total_processed", "0") last_commit = stats.get("last_commit_time", "Never") queue_size = stats.get("queue_count", "0") table_html = """
| Subset Language | Collected Documents Count |
|---|---|
| {lang} | {count:,} |