Spaces:
Starting
Starting
| # SPDX-License-Identifier: AGPL-3.0-or-later | |
| # Copyright (C) 2026 Vinay Umrethe and the heretic-project.org | |
| from __future__ import annotations | |
| import csv | |
| import html | |
| import json | |
| import math | |
| import os | |
| import re | |
| import subprocess | |
| import threading | |
| import time | |
| from collections.abc import Iterable | |
| from dataclasses import dataclass | |
| from datetime import UTC, datetime | |
| from pathlib import Path | |
| from typing import Any | |
| import gradio as gr | |
| from dotenv import load_dotenv | |
| from filelock import FileLock, Timeout | |
| from gradio.utils import get_upload_folder | |
| load_dotenv() | |
| def get_icon(slug: str | None, local_path: Path | None = None) -> str: | |
| src = ( | |
| f"/gradio_api/file={local_path.as_posix()}" | |
| if local_path is not None | |
| else f"https://cdn.simpleicons.org/{slug}/{SOCIAL_ICON_COLOR}" | |
| ) | |
| return f'<img class="hx-icon" src="{src}" alt="" loading="lazy">' | |
| APP_TITLE = "Heretic Grimoire" | |
| CLI_TIMEOUT_SECONDS = 2 * 60 * 60 | |
| RUN_ON_STARTUP = True | |
| AUTO_REFRESH_SECONDS = 600 | |
| SCHEDULE_HOURS_UTC = "0,12" | |
| SCHEDULE_MINUTE_UTC = 15 | |
| REPO_SHA_RE = re.compile(r"^(?P<repo>.+)-(?P<sha>[0-9a-f]{7,40})\.json$", re.IGNORECASE) | |
| ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]") | |
| BASE_DIR = Path(__file__).resolve().parent | |
| LOGO_PATH = BASE_DIR / "assets" / "Heretic-Grimoire-Logo.png" | |
| SOCIAL_ICON_COLOR = "f8fafc" | |
| SOCIAL_LINKS = [ | |
| { | |
| "label": "Homepage", | |
| "url": "https://heretic-project.org", | |
| "icon": get_icon(None, local_path=BASE_DIR / "assets" / "Heretic-Logo.png"), | |
| }, | |
| { | |
| "label": "GitHub", | |
| "url": "https://github.com/p-e-w/heretic", | |
| "icon": get_icon("github"), | |
| }, | |
| { | |
| "label": "Join Discord", | |
| "url": "https://discord.gg/gdXc48gSyT", | |
| "icon": get_icon("discord"), | |
| }, | |
| { | |
| "label": "Matrix", | |
| "url": "https://matrix.to/#/#heretic:matrix.org", | |
| "icon": get_icon("matrix"), | |
| }, | |
| ] | |
| TABLE_HEADERS = [ | |
| ("Heretic Model", "text"), | |
| ("Base Model", "text"), | |
| ("Created On", "date"), | |
| ("Version", "version"), | |
| ("KLD", "number"), | |
| ("Refusals", "number"), | |
| ("Base Refusals", "number"), | |
| ("Trials", "number"), | |
| ("Accelerator", "text"), | |
| ("JSON", "text"), | |
| ] | |
| INDEX_CSV_FIELDS = [ | |
| "index_generated_at", | |
| "index_count", | |
| "index_data_json_count", | |
| "index_invalid_count", | |
| ] | |
| RECORD_CSV_FIELDS = [ | |
| "source_repo", | |
| "source_repo_url", | |
| "reproduce_json_url", | |
| "source_commit_short", | |
| "base_model", | |
| "base_model_url", | |
| "base_model_commit", | |
| "timestamp", | |
| "reproduce_version", | |
| "heretic_version", | |
| "pytorch_version", | |
| "python_version", | |
| "os_platform", | |
| "accelerator", | |
| "kl_divergence", | |
| "refusals", | |
| "base_refusals", | |
| "n_bad_prompts", | |
| "direction_index", | |
| "row_normalization", | |
| "n_trials", | |
| "n_startup_trials", | |
| "seed", | |
| "good_prompts_dataset", | |
| "bad_prompts_dataset", | |
| "good_eval_dataset", | |
| "bad_eval_dataset", | |
| "local_path", | |
| ] | |
| CUSTOM_CSS = """ | |
| :root { | |
| --hx-bg: #05070d; | |
| --hx-bg-2: #0b1020; | |
| --hx-line: rgba(148, 163, 184, 0.20); | |
| --hx-text: #f8fafc; | |
| --hx-muted: rgba(226, 232, 240, 0.72); | |
| --hx-faint: rgba(148, 163, 184, 0.68); | |
| --hx-gold: #f59e0b; | |
| --hx-orange-2: #fb923c; | |
| --hx-green: #22c55e; | |
| --hx-red: #ef4444; | |
| --hx-font: Inter, ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; | |
| --hx-mono: "SFMono-Regular", Consolas, "Liberation Mono", Menlo, monospace; | |
| } | |
| html, body, .gradio-container { | |
| min-height: 100%; | |
| background: | |
| radial-gradient(circle at 50% 0%, rgba(245, 158, 11, 0.18), transparent 34rem), | |
| radial-gradient(circle at 90% 10%, rgba(249, 115, 22, 0.12), transparent 28rem), | |
| linear-gradient(135deg, var(--hx-bg), var(--hx-bg-2) 54%, #09090b); | |
| color: var(--hx-text); | |
| font-family: var(--hx-font) !important; | |
| } | |
| .gradio-container, | |
| .gradio-container .contain, | |
| .gradio-container .wrap, | |
| .gradio-container .main, | |
| .gradio-container main, | |
| .gradio-container .block { | |
| max-width: none !important; | |
| width: 100% !important; | |
| padding: 0 !important; | |
| margin: 0 !important; | |
| background: transparent !important; | |
| border: 0 !important; | |
| box-shadow: none !important; | |
| overflow: visible !important; | |
| } | |
| #app-shell-top { | |
| position: relative !important; | |
| z-index: 10 !important; | |
| overflow: visible !important; | |
| } | |
| .hx-app, .hx-hero { | |
| overflow: visible !important; | |
| } | |
| #bridge-search, | |
| #bridge-max-kl, | |
| #bridge-max-refusals, | |
| #bridge-refresh, | |
| #bridge-download { | |
| position: fixed !important; | |
| left: -10000px !important; | |
| top: auto !important; | |
| width: 1px !important; | |
| height: 1px !important; | |
| overflow: hidden !important; | |
| opacity: 0 !important; | |
| z-index: -1 !important; | |
| } | |
| #app-shell-top, #app-shell-top > div, #app-metrics, #app-metrics > div, #app-toolbar, #app-toolbar > div, #app-table, #app-table > div, #app-log, #app-log > div { width: 100% !important; max-width: none !important; margin: 0 !important; padding: 0 !important; } | |
| a, a:visited, | |
| .hx-table a, .hx-table a:visited, | |
| .hx-project-link, .hx-project-link:visited, | |
| .hx-sort { | |
| color: var(--hx-orange-2) !important; | |
| } | |
| a:hover, .hx-table a:hover, .hx-project-link:hover { | |
| color: var(--hx-gold) !important; | |
| } | |
| .hx-app { | |
| box-sizing: border-box; | |
| width: 100%; | |
| margin: 0; | |
| padding: clamp(.8rem, 2vw, 2rem) 0 0; | |
| } | |
| .hx-hero { | |
| width: 100%; | |
| box-sizing: border-box; | |
| display: flex; | |
| flex-wrap: wrap; | |
| justify-content: center; | |
| align-items: center; | |
| gap: clamp(.9rem, 3vw, 2.6rem); | |
| text-align: center; | |
| padding: clamp(.8rem, 2.8vw, 2.4rem) clamp(.6rem, 2vw, 1.2rem) clamp(5.5rem, 10vw, 7rem); | |
| } | |
| .hx-brand, | |
| .hx-projects { flex: 1 1 18rem; min-width: 0; } | |
| .hx-logo-wrap { flex: 0 0 auto; } | |
| .hx-brand { display: grid; justify-items: center; container-type: inline-size; } | |
| .hx-ascii { | |
| margin: 0; | |
| max-width: 100%; | |
| overflow: visible; | |
| color: var(--hx-gold); | |
| font-family: var(--hx-mono) !important; | |
| font-size: clamp(.74rem, 7.5cqw, 2.5rem); | |
| font-weight: 900; | |
| line-height: 1; | |
| white-space: pre; | |
| letter-spacing: -0.08em; | |
| text-shadow: | |
| 0 0 4px rgba(255, 247, 237, 0.20), | |
| 0 0 8px rgba(245, 158, 11, 0.45), | |
| 0 0 16px rgba(249, 115, 22, 0.25); | |
| filter: drop-shadow(0 0 6px rgba(249, 115, 22, 0.20)); | |
| } | |
| .hx-title-copy { | |
| max-width: 50rem; | |
| margin: 1rem auto 0; | |
| color: var(--hx-muted); | |
| font-size: clamp(.95rem, 1.5vw, 1.22rem); | |
| line-height: 1.58; | |
| } | |
| .hx-kicker { | |
| width: 100%; | |
| margin: 0; | |
| color: var(--hx-faint); | |
| font-size: .76rem; | |
| font-weight: 850; | |
| letter-spacing: .15em; | |
| text-align: center; | |
| text-transform: uppercase; | |
| } | |
| .hx-projects { | |
| display: flex; | |
| flex-wrap: wrap; | |
| align-content: center; | |
| align-items: center; | |
| justify-content: center; | |
| gap: .55rem .6rem; | |
| } | |
| .hx-project-link { | |
| display: inline-flex; | |
| align-items: center; | |
| justify-content: center; | |
| gap: .55rem; | |
| min-height: 2.6rem; | |
| max-width: 22rem; | |
| padding: .7rem .95rem; | |
| border: 1px solid rgba(148, 163, 184, .28); | |
| border-radius: 999px; | |
| background: rgba(2, 6, 23, .28); | |
| text-decoration: none !important; | |
| font-weight: 850; | |
| box-shadow: 0 10px 26px rgba(0,0,0,.18), 0 0 26px rgba(249,115,22,.10); | |
| transition: transform .16s ease, border-color .16s ease, box-shadow .16s ease; | |
| white-space: nowrap; | |
| } | |
| .hx-project-link:hover { transform: translateY(-1px); border-color: rgba(245,158,11,.70); box-shadow: 0 16px 34px rgba(0,0,0,.26), 0 0 34px rgba(249,115,22,.18); text-decoration: none !important; } | |
| .hx-icon { width: 1.08rem; height: 1.08rem; fill: currentColor; flex: 0 0 auto; } | |
| .hx-logo-wrap { | |
| display: grid; | |
| place-items: center; | |
| width: clamp(14rem, 26vw, 24rem); | |
| aspect-ratio: 1 / 1; | |
| filter: drop-shadow(0 0 28px rgba(251, 146, 60, 0.62)) drop-shadow(0 0 78px rgba(249, 115, 22, 0.32)); | |
| will-change: filter; | |
| transform: translate3d(0, 0, 0); | |
| overflow: visible !important; | |
| } | |
| .hx-logo { | |
| width: 100%; | |
| height: 100%; | |
| object-fit: contain; | |
| } | |
| .hx-toolbar { | |
| display: grid; | |
| grid-template-columns: minmax(min(100%, 22rem), 1fr) repeat(2, minmax(min(100%, 9rem), .22fr)) auto; | |
| gap: .7rem; | |
| align-items: end; | |
| padding: clamp(1rem, 2vw, 1.45rem) clamp(.35rem, 1vw, .9rem) .9rem; | |
| border-bottom: 1px solid var(--hx-line); | |
| background: rgba(2, 6, 23, .25); | |
| } | |
| .hx-field { display: grid; gap: .34rem; min-width: 0; } | |
| .hx-field span { color: var(--hx-orange-2); font-size: .72rem; font-weight: 850; letter-spacing: .08em; text-transform: uppercase; } | |
| .hx-input { | |
| width: 100%; | |
| min-height: 2.78rem; | |
| box-sizing: border-box; | |
| border: 1px solid rgba(148, 163, 184, .24); | |
| border-radius: 999px; | |
| padding: .72rem .92rem; | |
| color: var(--hx-text); | |
| background: rgba(2, 6, 23, .50); | |
| outline: none; | |
| font: 750 .94rem/1.2 var(--hx-font); | |
| transition: border-color .16s ease, box-shadow .16s ease, background .16s ease; | |
| } | |
| .hx-input::placeholder { color: rgba(148,163,184,.56); } | |
| .hx-input:focus { border-color: rgba(249,115,22,.66); box-shadow: 0 0 0 4px rgba(249,115,22,.14); background: rgba(2, 6, 23, .72); } | |
| .hx-actions { display: flex; flex-wrap: wrap; gap: .55rem; justify-content: flex-end; align-items: stretch; } | |
| .hx-btn { | |
| appearance: none; | |
| display: inline-flex; | |
| align-items: center; | |
| justify-content: center; | |
| box-sizing: border-box; | |
| min-height: 2.78rem; | |
| height: 2.78rem; | |
| border: 1px solid rgba(148, 163, 184, .28); | |
| border-radius: 999px; | |
| padding: .74rem 1rem; | |
| color: var(--hx-text) !important; | |
| background: rgba(15, 23, 42, .72); | |
| cursor: pointer; | |
| font: 850 .9rem/1 var(--hx-font); | |
| box-shadow: 0 12px 28px rgba(0,0,0,.22); | |
| transition: transform .16s ease, border-color .16s ease, background .16s ease, box-shadow .16s ease; | |
| } | |
| .hx-btn:hover { transform: translateY(-1px); border-color: rgba(245,158,11,.74); background: rgba(249,115,22,.12); box-shadow: 0 16px 34px rgba(0,0,0,.30), 0 0 24px rgba(249,115,22,.14); } | |
| .hx-metrics { | |
| display: grid; | |
| grid-template-columns: repeat(auto-fit, minmax(min(100%, 12rem), 1fr)); | |
| gap: .7rem; | |
| padding: clamp(1rem, 2vw, 1.45rem) clamp(.35rem, 1vw, .9rem) .9rem; | |
| } | |
| .hx-card { | |
| min-width: 0; | |
| padding: .95rem; | |
| border: 1px solid rgba(148,163,184,.20); | |
| border-radius: 16px; | |
| background: linear-gradient(180deg, rgba(148,163,184,.09), rgba(148,163,184,.04)); | |
| } | |
| .hx-card-label { color: var(--hx-orange-2); font-size: .72rem; font-weight: 850; letter-spacing: .08em; text-transform: uppercase; } | |
| .hx-card-value { margin-top: .22rem; color: var(--hx-text); font-size: clamp(1.3rem, 2.4vw, 2rem); font-weight: 950; font-variant-numeric: tabular-nums; } | |
| .hx-status-ok { color: var(--hx-green); font-weight: 950; } | |
| .hx-status-bad { color: var(--hx-red); font-weight: 950; } | |
| .hx-status-running { color: var(--hx-orange-2); font-weight: 950; } | |
| .hx-table-wrap { | |
| width: 100% !important; | |
| max-height: 650px !important; | |
| overflow: auto !important; | |
| display: block !important; | |
| scrollbar-width: thin; | |
| scrollbar-color: rgba(249, 115, 22, 0.42) rgba(15, 23, 42, 0.24); | |
| -webkit-overflow-scrolling: touch; | |
| } | |
| .hx-table { width: 100%; min-width: max(100%, 900px); border-collapse: separate; border-spacing: 0; color: var(--hx-text); font-size: clamp(.82rem, .9vw, .92rem); line-height: 1.36; border: none !important; } | |
| .hx-table th { position: sticky; top: 0; z-index: 5; padding: .72rem .62rem; text-align: left; border: none !important; border-bottom: 1px solid rgba(249,115,22,.40) !important; background: #0f172a !important; box-shadow: 0 10px 20px rgba(0,0,0,.18); } | |
| .hx-table td { padding: .64rem .62rem; vertical-align: top; border: none !important; border-bottom: 1px solid rgba(148,163,184,.06) !important; text-align: left !important; text-indent: 0 !important; } | |
| .hx-table tbody tr:hover td { background: rgba(249,115,22,.06); } | |
| .hx-sort { all: unset; cursor: pointer; display: inline-flex; align-items: center; gap: .4rem; font-weight: 900; white-space: nowrap; transition: color .2s ease; } | |
| .hx-sort:hover { color: var(--hx-gold); } | |
| .hx-sort::after { content: ""; display: inline-block; width: .8rem; height: .8rem; opacity: 0.6; background-image: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 16 16' fill='none' stroke='%2394a3b8' stroke-width='2' stroke-linecap='round' stroke-linejoin='round'%3E%3Cpath d='M5 6.5L8 3.5L11 6.5M5 9.5L8 12.5L11 9.5'/%3E%3C/svg%3E"); background-size: contain; background-repeat: no-repeat; background-position: center; transition: all .2s ease; } | |
| .hx-sort:hover::after { opacity: 1; background-image: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 16 16' fill='none' stroke='%23f59e0b' stroke-width='2' stroke-linecap='round' stroke-linejoin='round'%3E%3Cpath d='M5 6.5L8 3.5L11 6.5M5 9.5L8 12.5L11 9.5'/%3E%3C/svg%3E"); } | |
| .hx-sort[data-sort-dir="asc"]::after { opacity: 1; background-image: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 16 16' fill='none' stroke='%23f59e0b' stroke-width='2.5' stroke-linecap='round' stroke-linejoin='round'%3E%3Cpath d='M4 10l4-4 4 4'/%3E%3C/svg%3E"); } | |
| .hx-sort[data-sort-dir="desc"]::after { opacity: 1; background-image: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 16 16' fill='none' stroke='%23f59e0b' stroke-width='2.5' stroke-linecap='round' stroke-linejoin='round'%3E%3Cpath d='M4 6l4 4 4-4'/%3E%3C/svg%3E"); } | |
| .hx-table a { text-decoration: none !important; font-weight: 850; padding: 0 !important; margin: 0 !important; display: inline !important; word-break: break-all !important; } | |
| .hx-table a:hover { text-decoration: underline !important; } | |
| .hx-num { text-align: right; font-variant-numeric: tabular-nums; white-space: nowrap; } | |
| .hx-center, .hx-table th.hx-center { text-align: center; } | |
| .hx-center .hx-num { text-align: center; } | |
| .hx-center .hx-sort { justify-content: center; } | |
| .hx-mono { font-family: var(--hx-mono) !important; font-size: .84rem; } | |
| .hx-empty { margin: .85rem clamp(.35rem, 1vw, .9rem); padding: 1rem; border: 1px dashed rgba(249,115,22,.35); border-radius: 16px; color: var(--hx-muted); background: rgba(2, 6, 23, .22); } | |
| .hx-log { margin: .85rem clamp(.35rem, 1vw, .9rem) .9rem; border: 1px solid rgba(148,163,184,.20); border-radius: 16px; background: rgba(2,6,23,.50); overflow: hidden; } | |
| .hx-log-head { | |
| display: flex; | |
| flex-wrap: wrap; | |
| align-items: center; | |
| justify-content: space-between; | |
| gap: .55rem 1rem; | |
| padding: .82rem .95rem; | |
| border-bottom: 1px solid rgba(148,163,184,.15); | |
| } | |
| .hx-log-title { color: var(--hx-orange-2); font-size: .8rem; font-weight: 700; letter-spacing: .11em; text-transform: uppercase; } | |
| .hx-log-status { color: var(--hx-muted); font-size: .86rem; font-weight: 400; line-height: 1.35; } | |
| .hx-log-status .hx-status-ok, .hx-log-status .hx-status-bad, .hx-log-status .hx-status-running { font-weight: 400; } | |
| .hx-log pre { min-height: 10rem; max-height: 17rem; overflow: auto; margin: 0; padding: .95rem; color: rgba(226,232,240,.86); font: 400 .84rem/1.55 var(--hx-mono); white-space: pre-wrap; } | |
| @media (max-width: 760px) { | |
| .hx-toolbar { grid-template-columns: 1fr; } | |
| .hx-actions { justify-content: stretch; } | |
| .hx-btn { flex: 1 1 9rem; } | |
| .hx-project-link { max-width: 100%; } | |
| .hx-logo-wrap { width: clamp(12rem, 70vw, 20rem); } | |
| } | |
| """ | |
| TABLE_SORT_JS = r""" | |
| (() => { | |
| const getInput = (id) => document.querySelector(`#${id} textarea, #${id} input`); | |
| const setBridgeValue = (id, value) => { | |
| const input = getInput(id); | |
| if (!input) return false; | |
| const nativeSetter = Object.getOwnPropertyDescriptor( | |
| Object.getPrototypeOf(input), | |
| "value" | |
| )?.set; | |
| if (nativeSetter) nativeSetter.call(input, value); | |
| else input.value = value; | |
| input.dispatchEvent(new InputEvent("input", { bubbles: true, inputType: "insertText", data: null })); | |
| input.dispatchEvent(new Event("change", { bubbles: true })); | |
| return true; | |
| }; | |
| const clickBridge = (id) => { | |
| const button = document.querySelector(`#${id} button, #${id} a, #${id}`); | |
| if (button) button.click(); | |
| }; | |
| const debounce = (fn, wait = 180) => { | |
| let handle = null; | |
| return (...args) => { | |
| window.clearTimeout(handle); | |
| handle = window.setTimeout(() => fn(...args), wait); | |
| }; | |
| }; | |
| window.sortHereticTable = function sortHereticTable(columnIndex, type, button) { | |
| const table = button.closest("table"); | |
| const tbody = table?.querySelector("tbody"); | |
| if (!table || !tbody) return; | |
| const rows = Array.from(tbody.querySelectorAll("tr")); | |
| const nextDir = button.dataset.sortDir === "asc" ? "desc" : "asc"; | |
| table.querySelectorAll(".hx-sort").forEach((item) => { item.dataset.sortDir = ""; }); | |
| button.dataset.sortDir = nextDir; | |
| const parseValue = (row) => { | |
| const cell = row.children[columnIndex]; | |
| const text = (cell?.innerText || "").trim(); | |
| const sortValue = cell?.dataset?.sortValue; | |
| if (type === "number") { | |
| const source = sortValue || text; | |
| const match = source.match(/-?\d+(?:\.\d+)?/); | |
| return match ? Number(match[0]) : Number.POSITIVE_INFINITY; | |
| } | |
| if (type === "date") { | |
| const timestamp = Number(sortValue || 0); | |
| return Number.isFinite(timestamp) ? timestamp : 0; | |
| } | |
| if (type === "version") { | |
| if (!text || text === "—") return [Number.NEGATIVE_INFINITY]; | |
| return text.split(".").map((part) => Number(part.match(/^\d+/)?.[0] || 0)); | |
| } | |
| return text.toLowerCase(); | |
| }; | |
| const compareValues = (left, right) => { | |
| if (Array.isArray(left) && Array.isArray(right)) { | |
| const length = Math.max(left.length, right.length); | |
| for (let index = 0; index < length; index += 1) { | |
| const leftPart = left[index] ?? 0; | |
| const rightPart = right[index] ?? 0; | |
| if (leftPart < rightPart) return -1; | |
| if (leftPart > rightPart) return 1; | |
| } | |
| return 0; | |
| } | |
| if (left < right) return -1; | |
| if (left > right) return 1; | |
| return 0; | |
| }; | |
| rows.sort((left, right) => { | |
| const order = compareValues(parseValue(left), parseValue(right)); | |
| return nextDir === "asc" ? order : -order; | |
| }); | |
| rows.forEach((row) => tbody.appendChild(row)); | |
| }; | |
| const pushAllBridgeValues = () => { | |
| document.querySelectorAll("[data-bridge-input]").forEach((input) => { | |
| setBridgeValue(input.dataset.bridgeInput, input.value || ""); | |
| }); | |
| }; | |
| const syncBridgeValues = debounce(pushAllBridgeValues, 200); | |
| const visibleInput = (id) => document.querySelector(`[data-bridge-input="${id}"]`); | |
| const sanitizeNumericInput = (input, allowDecimal) => { | |
| const original = input.value; | |
| const pattern = allowDecimal ? /[^0-9.]/g : /[^0-9]/g; | |
| let next = original.replace(pattern, ""); | |
| if (allowDecimal) { | |
| const firstDot = next.indexOf("."); | |
| if (firstDot !== -1) { | |
| next = next.slice(0, firstDot + 1) + next.slice(firstDot + 1).split(".").join(""); | |
| } | |
| } | |
| if (next === original) return; | |
| const caret = Math.max(0, (input.selectionStart ?? next.length) - (original.length - next.length)); | |
| input.value = next; | |
| input.setSelectionRange(caret, caret); | |
| }; | |
| const parseLimit = (value) => { | |
| if (!value) return null; | |
| const number = Number(value); | |
| return Number.isNaN(number) ? null : number; | |
| }; | |
| const findMetricValue = (label) => { | |
| const card = Array.from(document.querySelectorAll(".hx-card")).find( | |
| (item) => item.querySelector(".hx-card-label")?.textContent.trim() === label | |
| ); | |
| return card?.querySelector(".hx-card-value") || null; | |
| }; | |
| const applyTableFilters = () => { | |
| const tableWrap = document.getElementById("app-table"); | |
| const tbody = tableWrap?.querySelector(".hx-table tbody"); | |
| if (!tableWrap || !tbody) return; | |
| const search = (visibleInput("bridge-search")?.value || "").trim().toLowerCase(); | |
| const maxKl = parseLimit(visibleInput("bridge-max-kl")?.value); | |
| const maxRefusals = parseLimit(visibleInput("bridge-max-refusals")?.value); | |
| let visible = 0; | |
| tbody.querySelectorAll("tr").forEach((row) => { | |
| let show = true; | |
| if (search && !(row.dataset.search || "").includes(search)) show = false; | |
| if (show && maxKl !== null) { | |
| show = row.dataset.kl !== undefined && Number(row.dataset.kl) <= maxKl; | |
| } | |
| if (show && maxRefusals !== null) { | |
| show = row.dataset.refusals !== undefined && Number(row.dataset.refusals) <= maxRefusals; | |
| } | |
| row.style.display = show ? "" : "none"; | |
| if (show) visible += 1; | |
| }); | |
| const emptyMessage = tableWrap.querySelector("[data-filter-empty]"); | |
| if (emptyMessage) emptyMessage.hidden = visible !== 0; | |
| const visibleValue = findMetricValue("Visible after filters"); | |
| if (visibleValue) { | |
| const text = visible.toLocaleString(); | |
| if (visibleValue.textContent !== text) visibleValue.textContent = text; | |
| } | |
| }; | |
| const resetSortIndicators = () => { | |
| document.querySelectorAll(".hx-sort").forEach((button) => { | |
| button.removeAttribute("data-sort-dir"); | |
| }); | |
| }; | |
| const observeTable = () => { | |
| const tableWrap = document.getElementById("app-table"); | |
| if (!tableWrap || tableWrap.dataset.filterObserverBound === "true") return; | |
| tableWrap.dataset.filterObserverBound = "true"; | |
| new MutationObserver(applyTableFilters).observe(tableWrap, { | |
| childList: true, | |
| subtree: true, | |
| }); | |
| }; | |
| const syncCustomInputs = () => { | |
| document.querySelectorAll("[data-bridge-input]").forEach((input) => { | |
| if (input.dataset.bound === "true") return; | |
| input.dataset.bound = "true"; | |
| const onInput = () => { | |
| if (input.dataset.bridgeInput === "bridge-max-kl") sanitizeNumericInput(input, true); | |
| if (input.dataset.bridgeInput === "bridge-max-refusals") sanitizeNumericInput(input, false); | |
| applyTableFilters(); | |
| syncBridgeValues(); | |
| }; | |
| input.addEventListener("input", onInput); | |
| input.addEventListener("change", onInput); | |
| input.addEventListener("keydown", (event) => { | |
| if (event.key === "Enter") event.preventDefault(); | |
| }); | |
| }); | |
| document.querySelectorAll("[data-bridge-click]").forEach((button) => { | |
| if (button.dataset.bound === "true") return; | |
| button.dataset.bound = "true"; | |
| button.addEventListener("click", () => { | |
| if (button.dataset.bridgeClick === "bridge-refresh") resetSortIndicators(); | |
| pushAllBridgeValues(); | |
| clickBridge(button.dataset.bridgeClick); | |
| }); | |
| }); | |
| observeTable(); | |
| }; | |
| syncCustomInputs(); | |
| applyTableFilters(); | |
| new MutationObserver(syncCustomInputs).observe(document.documentElement, { | |
| childList: true, | |
| subtree: true, | |
| }); | |
| })(); | |
| """ | |
| class AppPaths: | |
| data_root: Path | |
| data_dir: Path | |
| index_file: Path | |
| status_file: Path | |
| log_file: Path | |
| lock_file: Path | |
| def from_env(cls) -> AppPaths: | |
| default_root = ( | |
| Path("/data/heretic-reproducibles") | |
| if Path("/data").exists() | |
| else Path("data") | |
| ) | |
| data_root = ( | |
| Path(os.getenv("DATA_ROOT", str(default_root))).expanduser().resolve() | |
| ) | |
| data_dir = ( | |
| Path(os.getenv("DATA_DIR", str(data_root / "data"))).expanduser().resolve() | |
| ) | |
| return cls( | |
| data_root=data_root, | |
| data_dir=data_dir, | |
| index_file=Path(os.getenv("INDEX_FILE", str(data_root / "index.json"))) | |
| .expanduser() | |
| .resolve(), | |
| status_file=Path(os.getenv("STATUS_FILE", str(data_root / "status.json"))) | |
| .expanduser() | |
| .resolve(), | |
| log_file=Path(os.getenv("LOG_FILE", str(data_root / "collector.log"))) | |
| .expanduser() | |
| .resolve(), | |
| lock_file=Path(os.getenv("LOCK_FILE", str(data_root / ".collector.lock"))) | |
| .expanduser() | |
| .resolve(), | |
| ) | |
| def ensure(self) -> None: | |
| self.data_root.mkdir(parents=True, exist_ok=True) | |
| self.data_dir.mkdir(parents=True, exist_ok=True) | |
| for path in [ | |
| self.index_file, | |
| self.status_file, | |
| self.log_file, | |
| self.lock_file, | |
| ]: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| PATHS = AppPaths.from_env() | |
| PATHS.ensure() | |
| def utc_now() -> datetime: | |
| return datetime.now(UTC) | |
| def iso_now() -> str: | |
| return utc_now().replace(microsecond=0).isoformat() | |
| def read_json(path: Path, default: Any) -> Any: | |
| try: | |
| if not path.exists(): | |
| return default | |
| return json.loads(path.read_text(encoding="utf-8")) | |
| except (OSError, json.JSONDecodeError, UnicodeDecodeError): | |
| return default | |
| def write_json_atomic(path: Path, value: Any) -> None: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| tmp = path.with_suffix(path.suffix + ".tmp") | |
| tmp.write_text(json.dumps(value, ensure_ascii=False, indent=2), encoding="utf-8") | |
| tmp.replace(path) | |
| def clean_log_text(value: Any) -> str: | |
| text = ANSI_RE.sub("", str(value)).replace("\r", "") | |
| text = text.encode("utf-8", errors="replace").decode("utf-8", errors="replace") | |
| return " ".join(line.strip() for line in text.splitlines() if line.strip()) | |
| def append_log(message: str, **fields: Any) -> None: | |
| PATHS.ensure() | |
| suffix = "" | |
| if fields: | |
| compact = json.dumps( | |
| fields, ensure_ascii=False, sort_keys=True, separators=(",", ":") | |
| ) | |
| suffix = " " + compact | |
| line = f"[{iso_now()}] {clean_log_text(message)}{suffix}\n" | |
| with PATHS.log_file.open("a", encoding="utf-8", errors="replace") as handle: | |
| handle.write(line) | |
| def clear_log_file() -> None: | |
| PATHS.ensure() | |
| PATHS.log_file.write_text("", encoding="utf-8") | |
| def read_log_tail(max_lines: int = 100) -> str: | |
| if not PATHS.log_file.exists(): | |
| return "" | |
| try: | |
| lines = PATHS.log_file.read_text( | |
| encoding="utf-8", errors="replace" | |
| ).splitlines() | |
| return "\n".join(lines[-max_lines:]) | |
| except OSError as exc: | |
| return f"Could not read log: {exc}" | |
| def load_status() -> dict[str, Any]: | |
| base = { | |
| "state": "idle", | |
| "last_started_at": None, | |
| "last_finished_at": None, | |
| "last_ok": None, | |
| "last_error": None, | |
| "last_summary": {}, | |
| } | |
| loaded = read_json(PATHS.status_file, {}) | |
| if isinstance(loaded, dict): | |
| base.update(loaded) | |
| return base | |
| def set_status(**updates: Any) -> dict[str, Any]: | |
| current = load_status() | |
| current.update(updates) | |
| write_json_atomic(PATHS.status_file, current) | |
| return current | |
| def get_nested(data: Any, *keys: str, default: Any = None) -> Any: | |
| current = data | |
| for key in keys: | |
| if not isinstance(current, dict) or key not in current: | |
| return default | |
| current = current[key] | |
| return current | |
| def round_float(value: Any, digits: int = 6) -> Any: | |
| if isinstance(value, (int, float)) and not isinstance(value, bool): | |
| return round(float(value), digits) | |
| return value | |
| def format_accelerator(data: dict[str, Any]) -> str | None: | |
| accelerators = get_nested(data, "system", "accelerators", default={}) | |
| if not isinstance(accelerators, dict): | |
| return None | |
| acc_type = accelerators.get("type") | |
| devices = accelerators.get("devices") | |
| if isinstance(devices, list) and devices: | |
| device_counts: dict[str, int] = {} | |
| device_order: list[str] = [] | |
| for item in devices: | |
| if not isinstance(item, dict): | |
| continue | |
| name = item.get("name") or "unknown" | |
| vram = item.get("vram_gb") | |
| dev_str = ( | |
| f"{name} ({float(vram):.1f} GB)" | |
| if isinstance(vram, (int, float)) | |
| else str(name) | |
| ) | |
| if dev_str not in device_counts: | |
| device_counts[dev_str] = 0 | |
| device_order.append(dev_str) | |
| device_counts[dev_str] += 1 | |
| formatted_devices: list[str] = [] | |
| for dev_str in device_order: | |
| count = device_counts[dev_str] | |
| if count > 1: | |
| formatted_devices.append(f"{count} x {dev_str}") | |
| else: | |
| formatted_devices.append(dev_str) | |
| if formatted_devices: | |
| return f"{acc_type or 'accelerator'}: " + ", ".join(formatted_devices) | |
| return str(acc_type) if acc_type else None | |
| def iter_data_json_files(data_dir: Path) -> Iterable[Path]: | |
| official_root = data_dir / "huggingface.co" | |
| if not official_root.exists(): | |
| return [] | |
| return sorted(path for path in official_root.glob("*/*.json") if path.is_file()) | |
| def count_data_json_files(data_dir: Path) -> int: | |
| return sum(1 for _ in iter_data_json_files(data_dir)) | |
| def newest_data_mtime(data_dir: Path) -> float: | |
| mtimes = [path.stat().st_mtime for path in iter_data_json_files(data_dir)] | |
| return max(mtimes) if mtimes else 0.0 | |
| def infer_repo_and_commit( | |
| data_file: Path, data_dir: Path | |
| ) -> tuple[str | None, str | None, str]: | |
| local_path = data_file.as_posix() | |
| try: | |
| rel = data_file.relative_to(data_dir / "huggingface.co") | |
| if len(rel.parts) != 2: | |
| return None, None, local_path | |
| owner = rel.parts[0] | |
| filename = rel.parts[1] | |
| match = REPO_SHA_RE.match(filename) | |
| if not match: | |
| return f"{owner}/{data_file.stem}", None, local_path | |
| return f"{owner}/{match.group('repo')}", match.group("sha"), local_path | |
| except ValueError: | |
| return None, None, local_path | |
| def normalize_record( | |
| data_file: Path, data_dir: Path | |
| ) -> tuple[dict[str, Any] | None, str | None]: | |
| try: | |
| payload = json.loads(data_file.read_text(encoding="utf-8")) | |
| except (OSError, json.JSONDecodeError, UnicodeDecodeError) as exc: | |
| return None, f"{data_file.name}: invalid JSON: {exc}" | |
| if not isinstance(payload, dict): | |
| return None, f"{data_file.name}: root value is not an object" | |
| repo_id, repo_commit_short, local_path = infer_repo_and_commit(data_file, data_dir) | |
| settings = ( | |
| payload.get("settings") if isinstance(payload.get("settings"), dict) else {} | |
| ) | |
| metrics = payload.get("metrics") if isinstance(payload.get("metrics"), dict) else {} | |
| params = ( | |
| payload.get("parameters") if isinstance(payload.get("parameters"), dict) else {} | |
| ) | |
| base_model = ( | |
| settings.get("model") or payload.get("base_model") or payload.get("model") | |
| ) | |
| source_url = f"https://huggingface.co/{repo_id}" if repo_id else None | |
| json_url = ( | |
| f"{source_url}/blob/main/reproduce/reproduce.json" if source_url else None | |
| ) | |
| base_model_url = ( | |
| f"https://huggingface.co/{base_model}" | |
| if isinstance(base_model, str) and "/" in base_model | |
| else None | |
| ) | |
| refusals = metrics.get("refusals") | |
| n_bad = metrics.get("n_bad_prompts") | |
| return { | |
| "source_repo": repo_id, | |
| "source_repo_url": source_url, | |
| "reproduce_json_url": json_url, | |
| "source_commit_short": repo_commit_short, | |
| "base_model": base_model, | |
| "base_model_url": base_model_url, | |
| "base_model_commit": settings.get("model_commit") | |
| or payload.get("base_model_commit"), | |
| "timestamp": payload.get("timestamp") or payload.get("created_at"), | |
| "reproduce_version": payload.get("version"), | |
| "heretic_version": get_nested(payload, "environment", "heretic", "version") | |
| or payload.get("heretic_version"), | |
| "pytorch_version": get_nested(payload, "environment", "pytorch_version"), | |
| "python_version": get_nested(payload, "system", "python", "version"), | |
| "os_platform": get_nested(payload, "system", "os", "platform"), | |
| "accelerator": format_accelerator(payload), | |
| "kl_divergence": round_float(metrics.get("kl_divergence") or metrics.get("kl")), | |
| "refusals": refusals, | |
| "base_refusals": metrics.get("base_refusals"), | |
| "n_bad_prompts": n_bad, | |
| "direction_index": round_float(params.get("direction_index")), | |
| "row_normalization": settings.get("row_normalization"), | |
| "n_trials": settings.get("n_trials"), | |
| "n_startup_trials": settings.get("n_startup_trials"), | |
| "seed": settings.get("seed"), | |
| "good_prompts_dataset": get_nested(settings, "good_prompts", "dataset"), | |
| "bad_prompts_dataset": get_nested(settings, "bad_prompts", "dataset"), | |
| "good_eval_dataset": get_nested(settings, "good_evaluation_prompts", "dataset"), | |
| "bad_eval_dataset": get_nested(settings, "bad_evaluation_prompts", "dataset"), | |
| "local_path": local_path, | |
| }, None | |
| def record_key(record: dict[str, Any]) -> str: | |
| repo = record.get("source_repo") | |
| commit = record.get("source_commit_short") | |
| if repo and commit: | |
| return f"{repo}@{commit}" | |
| local_path = record.get("local_path") | |
| if local_path: | |
| return str(local_path) | |
| return json.dumps(record, ensure_ascii=False, sort_keys=True) | |
| def merge_index_records( | |
| existing_records: list[dict[str, Any]], scanned_records: list[dict[str, Any]] | |
| ) -> tuple[list[dict[str, Any]], int]: | |
| existing = { | |
| record_key(record): record | |
| for record in existing_records | |
| if isinstance(record, dict) | |
| } | |
| scanned = {record_key(record): record for record in scanned_records} | |
| merged = {**existing, **scanned} | |
| retained_count = len(set(existing) - set(scanned)) | |
| return sorted( | |
| merged.values(), | |
| key=lambda item: str(item.get("timestamp") or ""), | |
| reverse=True, | |
| ), retained_count | |
| def build_index() -> dict[str, Any]: | |
| PATHS.ensure() | |
| existing_index = read_json(PATHS.index_file, {}) | |
| existing_records = ( | |
| existing_index.get("records", []) if isinstance(existing_index, dict) else [] | |
| ) | |
| existing_records = existing_records if isinstance(existing_records, list) else [] | |
| scanned_records: list[dict[str, Any]] = [] | |
| errors: list[str] = [] | |
| for data_file in iter_data_json_files(PATHS.data_dir): | |
| record, error = normalize_record(data_file, PATHS.data_dir) | |
| if record is not None: | |
| scanned_records.append(record) | |
| if error is not None: | |
| errors.append(error) | |
| records, retained_count = merge_index_records(existing_records, scanned_records) | |
| payload = { | |
| "generated_at": iso_now(), | |
| "count": len(records), | |
| "data_json_count": count_data_json_files(PATHS.data_dir), | |
| "invalid_count": len(errors), | |
| "errors": errors[:200], | |
| "records": records, | |
| } | |
| write_json_atomic(PATHS.index_file, payload) | |
| append_log( | |
| "index rebuilt", | |
| records=len(records), | |
| scanned=len(scanned_records), | |
| retained=retained_count, | |
| invalid=len(errors), | |
| ) | |
| return payload | |
| def index_is_stale(index: dict[str, Any]) -> bool: | |
| data_count = count_data_json_files(PATHS.data_dir) | |
| if data_count == 0: | |
| return not PATHS.index_file.exists() | |
| if not PATHS.index_file.exists(): | |
| return True | |
| if not isinstance(index.get("records"), list): | |
| return True | |
| if int(index.get("data_json_count") or 0) != data_count: | |
| return True | |
| if len(index.get("records") or []) == 0 and data_count > 0: | |
| return True | |
| return newest_data_mtime(PATHS.data_dir) > PATHS.index_file.stat().st_mtime | |
| def load_index() -> dict[str, Any]: | |
| index = read_json(PATHS.index_file, {}) | |
| if not isinstance(index, dict): | |
| index = {} | |
| if index_is_stale(index): | |
| return build_index() | |
| if isinstance(index.get("records"), list): | |
| return index | |
| return build_index() | |
| def parse_cli_summary(output: str) -> dict[str, int | None]: | |
| summary: dict[str, int | None] = { | |
| "found": None, | |
| "downloaded": None, | |
| "already_stored": None, | |
| } | |
| patterns = { | |
| "found": r"Found:\s*(\d+)", | |
| "downloaded": r"Downloaded:\s*(\d+)", | |
| "already_stored": r"Already stored:\s*(\d+)", | |
| } | |
| cleaned = clean_log_text(output) | |
| for key, pattern in patterns.items(): | |
| match = re.search(pattern, cleaned, flags=re.IGNORECASE) | |
| if match: | |
| summary[key] = int(match.group(1)) | |
| return summary | |
| def run_heretic_collect() -> dict[str, Any]: | |
| cmd = [ | |
| os.getenv("HERETIC_BIN", "heretic"), | |
| "--collect-reproducibles", | |
| str(PATHS.data_dir), | |
| ] | |
| append_log("cli collection started") | |
| started = time.perf_counter() | |
| env = { | |
| **os.environ, | |
| "PYTHONUNBUFFERED": "1", | |
| "PYTHONUTF8": "1", | |
| "PYTHONIOENCODING": "utf-8", | |
| "NO_COLOR": "1", | |
| "TERM": "dumb", | |
| } | |
| completed = subprocess.run( | |
| cmd, | |
| cwd=str(PATHS.data_root), | |
| text=True, | |
| encoding="utf-8", | |
| errors="replace", | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, | |
| timeout=CLI_TIMEOUT_SECONDS, | |
| env=env, | |
| check=False, | |
| ) | |
| elapsed = round(time.perf_counter() - started, 3) | |
| output = completed.stdout or "" | |
| if completed.returncode != 0: | |
| tail = re.sub(r"[╭╮╰╯│─━═┌┐└┘├┤┬┴┼]+", " ", clean_log_text(output)) | |
| tail = re.sub(r"\s+", " ", tail).strip()[-900:] | |
| append_log("cli collection failed", code=completed.returncode, output_tail=tail) | |
| raise RuntimeError(f"Heretic CLI failed with exit code {completed.returncode}") | |
| summary = parse_cli_summary(output) | |
| append_log( | |
| "cli collection completed", | |
| **{k: v for k, v in summary.items() if v is not None}, | |
| seconds=elapsed, | |
| ) | |
| return {"method": "heretic_cli", "elapsed_seconds": elapsed, **summary} | |
| def collect_and_index() -> dict[str, Any]: | |
| PATHS.ensure() | |
| lock = FileLock(str(PATHS.lock_file), timeout=2) | |
| try: | |
| with lock: | |
| clear_log_file() | |
| set_status( | |
| state="running", | |
| last_started_at=iso_now(), | |
| last_finished_at=None, | |
| last_ok=None, | |
| last_error=None, | |
| ) | |
| append_log("collection started") | |
| try: | |
| result = run_heretic_collect() | |
| index = build_index() | |
| summary = { | |
| **result, | |
| "data_json_files": index.get("data_json_count", 0), | |
| "indexed_records": index.get("count", 0), | |
| "invalid_records": index.get("invalid_count", 0), | |
| "index_generated_at": index.get("generated_at"), | |
| } | |
| set_status( | |
| state="idle", | |
| last_finished_at=iso_now(), | |
| last_ok=True, | |
| last_summary=summary, | |
| last_error=None, | |
| ) | |
| append_log( | |
| "collection completed", | |
| indexed=summary["indexed_records"], | |
| data=summary["data_json_files"], | |
| invalid=summary["invalid_records"], | |
| ) | |
| return summary | |
| except Exception as exc: | |
| error = clean_log_text(exc) | |
| set_status( | |
| state="idle", | |
| last_finished_at=iso_now(), | |
| last_ok=False, | |
| last_error=error, | |
| ) | |
| append_log("collection failed", error=error) | |
| raise | |
| except Timeout: | |
| append_log("collection skipped; another run is active") | |
| return { | |
| "method": "locked", | |
| "message": "Another collection is already running.", | |
| "status": load_status(), | |
| } | |
| def parse_iso_datetime(value: str | None) -> datetime | None: | |
| if not value: | |
| return None | |
| try: | |
| return datetime.fromisoformat(str(value).replace("Z", "+00:00")) | |
| except (TypeError, ValueError): | |
| return None | |
| def datetime_sort_value(value: str | None) -> str: | |
| dt = parse_iso_datetime(value) | |
| return str(dt.timestamp()) if dt else "0" | |
| def to_display_time(value: str | None) -> str: | |
| dt = parse_iso_datetime(value) | |
| if dt is None: | |
| return "—" if not value else str(value) | |
| return dt.strftime("%d-%m-%Y %I:%M %p") | |
| def html_text(value: Any) -> str: | |
| if value is None or value == "": | |
| return "—" | |
| return html.escape(str(value), quote=True) | |
| def html_link(label: Any, url: Any) -> str: | |
| if label is None or label == "": | |
| return "—" | |
| safe_label = html.escape(str(label), quote=True) | |
| if isinstance(url, str) and url.startswith(("http://", "https://")): | |
| safe_url = html.escape(url, quote=True) | |
| return f'<a href="{safe_url}" target="_blank" rel="noopener noreferrer">{safe_label}</a>' | |
| return safe_label | |
| def format_number(value: Any, digits: int | None = None) -> str: | |
| if value is None or value == "": | |
| return "—" | |
| if isinstance(value, bool): | |
| return str(value) | |
| try: | |
| number = float(value) | |
| if math.isnan(number) or math.isinf(number): | |
| return "—" | |
| if digits is not None: | |
| return f"{number:.{digits}f}".rstrip("0").rstrip(".") | |
| if number.is_integer(): | |
| return str(int(number)) | |
| return f"{number:.6f}".rstrip("0").rstrip(".") | |
| except (TypeError, ValueError, OverflowError): | |
| return html_text(value) | |
| def parse_optional_float(value: Any) -> float | None: | |
| if value is None: | |
| return None | |
| if isinstance(value, str) and not value.strip(): | |
| return None | |
| try: | |
| number = float(value) | |
| if math.isnan(number) or math.isinf(number): | |
| return None | |
| return number | |
| except (TypeError, ValueError, OverflowError): | |
| return None | |
| def records_from_index(index: dict[str, Any]) -> list[dict[str, Any]]: | |
| records = index.get("records", []) | |
| return records if isinstance(records, list) else [] | |
| def format_csv_cell(value: Any) -> Any: | |
| if value is None: | |
| return "" | |
| if isinstance(value, dict | list): | |
| return json.dumps(value, ensure_ascii=False, sort_keys=True) | |
| return value | |
| def write_archive_csv() -> str: | |
| index = load_index() | |
| records = records_from_index(index) | |
| extra_fields = sorted( | |
| {key for record in records for key in record if key not in RECORD_CSV_FIELDS} | |
| ) | |
| fieldnames = [ | |
| *INDEX_CSV_FIELDS, | |
| *RECORD_CSV_FIELDS, | |
| *extra_fields, | |
| "record_json", | |
| "index_errors_json", | |
| ] | |
| metadata = { | |
| "index_generated_at": index.get("generated_at"), | |
| "index_count": index.get("count"), | |
| "index_data_json_count": index.get("data_json_count"), | |
| "index_invalid_count": index.get("invalid_count"), | |
| "index_errors_json": index.get("errors", []), | |
| } | |
| rows = records or [{}] | |
| timestamp = utc_now().strftime("%Y%m%d_%H%M%S") | |
| csv_path = ( | |
| Path(get_upload_folder()) / f"heretic_reproducibles_archive_{timestamp}.csv" | |
| ) | |
| csv_path.parent.mkdir(parents=True, exist_ok=True) | |
| tmp = csv_path.with_suffix(csv_path.suffix + ".tmp") | |
| with tmp.open("w", encoding="utf-8-sig", newline="") as handle: | |
| writer = csv.DictWriter(handle, fieldnames=fieldnames, extrasaction="ignore") | |
| writer.writeheader() | |
| for record in rows: | |
| record = record if isinstance(record, dict) else {} | |
| row = {key: format_csv_cell(value) for key, value in metadata.items()} | |
| row.update( | |
| {key: format_csv_cell(record.get(key)) for key in RECORD_CSV_FIELDS} | |
| ) | |
| row.update({key: format_csv_cell(record.get(key)) for key in extra_fields}) | |
| row["record_json"] = format_csv_cell(record) | |
| writer.writerow(row) | |
| tmp.replace(csv_path) | |
| append_log("archive csv written", path=str(csv_path), records=len(records)) | |
| return str(csv_path) | |
| def prepare_archive_csv_download() -> dict[str, Any]: | |
| return gr.update(value=write_archive_csv()) | |
| def format_ratio(numerator: Any, denominator: Any) -> str: | |
| if numerator is None or numerator == "": | |
| return "—" | |
| if denominator is None or denominator == "": | |
| return format_number(numerator) | |
| return f"{format_number(numerator)}/{format_number(denominator)}" | |
| def sort_records_by_timestamp(records: list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| return sorted( | |
| records, key=lambda item: str(item.get("timestamp") or ""), reverse=True | |
| ) | |
| SEARCH_FIELDS = [ | |
| "source_repo", | |
| "base_model", | |
| "heretic_version", | |
| "accelerator", | |
| "row_normalization", | |
| "good_prompts_dataset", | |
| "bad_prompts_dataset", | |
| "good_eval_dataset", | |
| "bad_eval_dataset", | |
| "pytorch_version", | |
| "python_version", | |
| "os_platform", | |
| ] | |
| def record_search_text(record: dict[str, Any]) -> str: | |
| return " ".join(str(record.get(key, "")) for key in SEARCH_FIELDS).lower() | |
| def filter_records( | |
| records: list[dict[str, Any]], | |
| search: str, | |
| max_kl: Any, | |
| max_refusals: Any, | |
| ) -> list[dict[str, Any]]: | |
| query = (search or "").strip().lower() | |
| kl_limit = parse_optional_float(max_kl) | |
| refusal_limit = parse_optional_float(max_refusals) | |
| filtered = records | |
| if query: | |
| filtered = [ | |
| record for record in filtered if query in record_search_text(record) | |
| ] | |
| if kl_limit is not None: | |
| filtered = [ | |
| record | |
| for record in filtered | |
| if (kl_val := parse_optional_float(record.get("kl_divergence"))) is not None | |
| and kl_val <= kl_limit | |
| ] | |
| if refusal_limit is not None: | |
| filtered = [ | |
| record | |
| for record in filtered | |
| if (ref_val := parse_optional_float(record.get("refusals"))) is not None | |
| and ref_val <= refusal_limit | |
| ] | |
| return sort_records_by_timestamp(filtered) | |
| def render_records_table_html(records: list[dict[str, Any]]) -> str: | |
| if not records: | |
| return '<div class="hx-empty">No records visible. Clear filters or press <b>Refresh</b>.</div>' | |
| center_columns = {3, 4, 5, 6, 7} | |
| head_parts: list[str] = [] | |
| for index, (column, sort_type) in enumerate(TABLE_HEADERS): | |
| class_attr = ' class="hx-center"' if index in center_columns else "" | |
| head_parts.append( | |
| f"<th{class_attr}>" | |
| f'<button class="hx-sort" type="button" onclick="sortHereticTable({index}, \'{html.escape(sort_type)}\', this)">' | |
| f"{html.escape(column)}</button>" | |
| "</th>" | |
| ) | |
| head = "".join(head_parts) | |
| body_rows: list[str] = [] | |
| for record in records: | |
| n_bad = record.get("n_bad_prompts") | |
| created_sort = html.escape( | |
| datetime_sort_value(record.get("timestamp")), quote=True | |
| ) | |
| kl_value = parse_optional_float(record.get("kl_divergence")) | |
| refusals_value = parse_optional_float(record.get("refusals")) | |
| row_index = len(body_rows) | |
| row_attrs = f' data-row-index="{row_index}" data-search="{html.escape(record_search_text(record), quote=True)}"' | |
| if kl_value is not None: | |
| row_attrs += f' data-kl="{kl_value}"' | |
| if refusals_value is not None: | |
| row_attrs += f' data-refusals="{refusals_value}"' | |
| row_cells = [ | |
| (html_link(record.get("source_repo"), record.get("source_repo_url")), ""), | |
| (html_link(record.get("base_model"), record.get("base_model_url")), ""), | |
| ( | |
| html_text(to_display_time(record.get("timestamp"))), | |
| f' data-sort-value="{created_sort}"', | |
| ), | |
| (html_text(record.get("heretic_version")), ' class="hx-center"'), | |
| ( | |
| f'<div class="hx-num">{format_number(record.get("kl_divergence"), 6)}</div>', | |
| ' class="hx-center"', | |
| ), | |
| ( | |
| f'<div class="hx-num">{format_ratio(record.get("refusals"), n_bad)}</div>', | |
| ' class="hx-center"', | |
| ), | |
| ( | |
| f'<div class="hx-num">{format_ratio(record.get("base_refusals"), n_bad)}</div>', | |
| ' class="hx-center"', | |
| ), | |
| ( | |
| f'<div class="hx-num">{format_number(record.get("n_trials"))}</div>', | |
| ' class="hx-center"', | |
| ), | |
| (f'<div class="hx-mono">{html_text(record.get("accelerator"))}</div>', ""), | |
| (html_link("open", record.get("reproduce_json_url")), ""), | |
| ] | |
| body_rows.append( | |
| f"<tr{row_attrs}>" | |
| + "".join(f"<td{attrs}>{cell}</td>" for cell, attrs in row_cells) | |
| + "</tr>" | |
| ) | |
| return ( | |
| '<div class="hx-table-wrap">' | |
| '<table class="hx-table">' | |
| f"<thead><tr>{head}</tr></thead>" | |
| f"<tbody>{''.join(body_rows)}</tbody>" | |
| "</table></div>" | |
| '<div class="hx-empty" data-filter-empty hidden>No records match the current filters.</div>' | |
| ) | |
| def render_collector_status_html( | |
| index: dict[str, Any], records: list[dict[str, Any]] | |
| ) -> str: | |
| st = load_status() | |
| last_ok = st.get("last_ok") | |
| if st.get("state") == "running": | |
| status_html = '<span class="hx-status-running">running</span>' | |
| elif last_ok is True: | |
| status_html = '<span class="hx-status-ok">ok</span>' | |
| elif last_ok is False: | |
| status_html = '<span class="hx-status-bad">failed</span>' | |
| else: | |
| status_html = "idle" | |
| unique_repos = len({r.get("source_repo") for r in records if r.get("source_repo")}) | |
| last_updated = to_display_time( | |
| st.get("last_finished_at") | |
| or st.get("last_started_at") | |
| or index.get("generated_at") | |
| ) | |
| return ( | |
| f"Collector status: {status_html} · Unique repositories: {unique_repos:,} · " | |
| f"Last updated: {last_updated}" | |
| ) | |
| def render_metric_cards_html( | |
| records: list[dict[str, Any]], | |
| filtered_count: int | None = None, | |
| ) -> str: | |
| visible = filtered_count if filtered_count is not None else len(records) | |
| unique_base = len({r.get("base_model") for r in records if r.get("base_model")}) | |
| cards = [ | |
| ("Indexed records", f"{len(records):,}"), | |
| ("Visible after filters", f"{visible:,}"), | |
| ("Unique base models", f"{unique_base:,}"), | |
| ] | |
| card_html = "".join( | |
| f'<article class="hx-card"><div class="hx-card-label">{html.escape(label)}</div><div class="hx-card-value">{html.escape(value)}</div></article>' | |
| for label, value in cards | |
| ) | |
| return ( | |
| f'<section class="hx-metrics" aria-label="Index metrics">{card_html}</section>' | |
| ) | |
| def render_header_html() -> str: | |
| logo_src = html.escape( | |
| os.getenv("LOGO_SRC", f"/gradio_api/file={LOGO_PATH.as_posix()}"), quote=True | |
| ) | |
| logo_html = ( | |
| f'<div class="hx-logo-wrap"><img class="hx-logo" src="{logo_src}" alt="Heretic logo"></div>' | |
| if LOGO_PATH.exists() | |
| else '<div class="hx-logo-wrap" aria-hidden="true"></div>' | |
| ) | |
| links_str = "\n ".join( | |
| f'<a class="hx-project-link" href="{html.escape(link["url"], quote=True)}" target="_blank" rel="noopener noreferrer" aria-label="{html.escape(link["label"], quote=True)}">' | |
| f"{link['icon']}{html.escape(link['label'], quote=True)}</a>" | |
| for link in SOCIAL_LINKS | |
| ) | |
| return f""" | |
| <header class="hx-hero"> | |
| <div class="hx-brand"> | |
| <pre class="hx-ascii">█░█░█▀▀░█▀▄░█▀▀░▀█▀░█░█▀▀ | |
| █▀█░█▀▀░█▀▄░█▀▀░░█░░█░█░░ | |
| ▀░▀░▀▀▀░▀░▀░▀▀▀░░▀░░▀░▀▀▀</pre> | |
| <p class="hx-title-copy">Search, filter, collect, and download reproducibility records from public Heretic model archives.<br>Updates twice daily.</p> | |
| </div> | |
| <nav class="hx-projects" aria-label="Project links"> | |
| <p class="hx-kicker">Project</p> | |
| {links_str} | |
| </nav> | |
| {logo_html} | |
| </header> | |
| """ | |
| def render_toolbar_html(search: str, max_kl: Any, max_refusals: Any) -> str: | |
| search_value = html.escape(str(search or ""), quote=True) | |
| max_kl_value = html.escape(str(max_kl or ""), quote=True) | |
| max_refusals_value = html.escape(str(max_refusals or ""), quote=True) | |
| return f''' | |
| <section class="hx-toolbar" aria-label="Filtering controls"> | |
| <label class="hx-field"><span>Search</span><input class="hx-input" data-bridge-input="bridge-search" value="{search_value}" placeholder="repo, base model, dataset, accelerator…" autocomplete="off"></label> | |
| <label class="hx-field"><span>Max KL</span><input class="hx-input" data-bridge-input="bridge-max-kl" value="{max_kl_value}" placeholder="blank = any" inputmode="decimal"></label> | |
| <label class="hx-field"><span>Max refusals</span><input class="hx-input" data-bridge-input="bridge-max-refusals" value="{max_refusals_value}" placeholder="blank = any" inputmode="numeric"></label> | |
| <div class="hx-actions"> | |
| <button class="hx-btn" type="button" data-bridge-click="bridge-refresh">Refresh</button> | |
| <button class="hx-btn" type="button" data-bridge-click="bridge-download">Download CSV</button> | |
| </div> | |
| </section> | |
| ''' | |
| def render_log_html(status_html: str) -> str: | |
| log_text = html.escape( | |
| read_log_tail() or "No collector log entries yet.", quote=False | |
| ) | |
| return f""" | |
| <section class="hx-log" aria-label="Collector log"> | |
| <div class="hx-log-head"><span class="hx-log-title">Collector log</span><span class="hx-log-status">{status_html}</span></div> | |
| <pre>{log_text}</pre> | |
| </section> | |
| """ | |
| def refresh_view(search: str, max_kl: Any, max_refusals: Any) -> tuple[str, str, str]: | |
| index = load_index() | |
| records = records_from_index(index) | |
| filtered = filter_records(records, search, max_kl, max_refusals) | |
| return ( | |
| render_metric_cards_html(records, len(filtered)), | |
| render_records_table_html(sort_records_by_timestamp(records)), | |
| render_log_html(render_collector_status_html(index, records)), | |
| ) | |
| def run_scheduled_collection() -> None: | |
| try: | |
| collect_and_index() | |
| except Exception as exc: | |
| append_log("scheduled collection failed", error=clean_log_text(exc)) | |
| def scheduled_hours() -> set[int]: | |
| hours: set[int] = set() | |
| for chunk in SCHEDULE_HOURS_UTC.split(","): | |
| chunk = chunk.strip() | |
| if not chunk: | |
| continue | |
| try: | |
| hour = int(chunk) | |
| except ValueError: | |
| continue | |
| if 0 <= hour <= 23: | |
| hours.add(hour) | |
| return hours or {0, 12} | |
| def run_scheduler_loop() -> None: | |
| hours = scheduled_hours() | |
| seen_slots: set[str] = set() | |
| while True: | |
| now = datetime.now(UTC) | |
| slot = now.strftime("%Y-%m-%dT%H:%M") | |
| if ( | |
| now.hour in hours | |
| and now.minute == SCHEDULE_MINUTE_UTC | |
| and slot not in seen_slots | |
| ): | |
| seen_slots.add(slot) | |
| run_scheduled_collection() | |
| if len(seen_slots) > 16: | |
| seen_slots = set(sorted(seen_slots)[-8:]) | |
| time.sleep(20) | |
| def start_scheduler() -> None: | |
| thread = threading.Thread( | |
| target=run_scheduler_loop, name="twice-daily-heretic-cli", daemon=True | |
| ) | |
| thread.start() | |
| def start_initial_collection_if_enabled() -> None: | |
| if RUN_ON_STARTUP: | |
| thread = threading.Thread( | |
| target=run_scheduled_collection, name="startup-heretic-cli", daemon=True | |
| ) | |
| thread.start() | |
| def build_gradio_app() -> gr.Blocks: | |
| index = load_index() | |
| records = records_from_index(index) | |
| filtered = filter_records(records, "", "", "") | |
| with gr.Blocks( | |
| title=APP_TITLE, | |
| fill_width=True, | |
| delete_cache=(3600, 3600), | |
| ) as demo: | |
| gr.HTML( | |
| value='<div class="hx-app">' + render_header_html() + "</div>", | |
| elem_id="app-shell-top", | |
| container=False, | |
| padding=False, | |
| ) | |
| metrics_html = gr.HTML( | |
| value=render_metric_cards_html(records, len(filtered)), | |
| elem_id="app-metrics", | |
| container=False, | |
| padding=False, | |
| ) | |
| gr.HTML( | |
| value=render_toolbar_html("", "", ""), | |
| elem_id="app-toolbar", | |
| container=False, | |
| padding=False, | |
| ) | |
| table_html = gr.HTML( | |
| value=render_records_table_html(sort_records_by_timestamp(records)), | |
| elem_id="app-table", | |
| container=False, | |
| padding=False, | |
| ) | |
| log_html = gr.HTML( | |
| value=render_log_html(render_collector_status_html(index, records)), | |
| elem_id="app-log", | |
| container=False, | |
| padding=False, | |
| ) | |
| search = gr.Textbox( | |
| value="", show_label=False, container=False, elem_id="bridge-search" | |
| ) | |
| max_kl = gr.Textbox( | |
| value="", show_label=False, container=False, elem_id="bridge-max-kl" | |
| ) | |
| max_refusals = gr.Textbox( | |
| value="", show_label=False, container=False, elem_id="bridge-max-refusals" | |
| ) | |
| refresh_btn = gr.Button("Refresh", elem_id="bridge-refresh") | |
| download_btn = gr.DownloadButton( | |
| "Download CSV", value=None, elem_id="bridge-download" | |
| ) | |
| inputs = [search, max_kl, max_refusals] | |
| outputs = [metrics_html, table_html, log_html] | |
| demo.load( | |
| refresh_view, | |
| inputs=inputs, | |
| outputs=outputs, | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| refresh_btn.click( | |
| refresh_view, | |
| inputs=inputs, | |
| outputs=outputs, | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| download_btn.click( | |
| prepare_archive_csv_download, | |
| outputs=[download_btn], | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| timer = gr.Timer(value=AUTO_REFRESH_SECONDS, active=True) | |
| timer.tick( | |
| refresh_view, | |
| inputs=inputs, | |
| outputs=outputs, | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| return demo | |
| start_scheduler() | |
| start_initial_collection_if_enabled() | |
| demo = build_gradio_app() | |
| if __name__ == "__main__": | |
| demo.queue(default_concurrency_limit=1).launch( | |
| theme=gr.themes.Base(), | |
| css=CUSTOM_CSS, | |
| js=TABLE_SORT_JS, | |
| allowed_paths=[str(BASE_DIR / "assets")], | |
| ssr_mode=False, | |
| ) | |