# ZeroEngine / app.py
# Snapshot of the Hugging Face Space file view: commit daa3fb2 ("Update app.py")
# by turtle170 (verified), file size 11.4 kB.
import json
import logging
import os
import threading
import time
from datetime import datetime
from typing import Dict, Generator, List, Optional, Tuple

import gradio as gr
import psutil
import pytz
from huggingface_hub import HfApi, hf_hub_download
# --- KERNEL INITIALIZATION ---
def _resolve_llama_backend():
    """Return the first importable ``Llama`` class, or a stub that fails on use.

    ``llama_cpp`` is tried first, then ``llama_cpp_pydist``. When neither can
    be imported the module still loads; the returned placeholder raises
    ``ImportError`` only when something tries to instantiate it.
    """
    try:
        from llama_cpp import Llama as backend
    except ImportError:
        pass
    else:
        return backend

    try:
        from llama_cpp_pydist import Llama as backend
    except ImportError:
        pass
    else:
        return backend

    class Llama:
        """Placeholder used when no llama.cpp binding is installed."""

        def __init__(self, *args, **kwargs):
            raise ImportError("Kernel Binary Missing. Ensure llama-cpp-python is installed.")

    return Llama


Llama = _resolve_llama_backend()
# --- CONFIGURATION ---
# Hub credentials and the Space identifier are read from the environment;
# either may be None when the variable is not set.
HF_TOKEN = os.getenv("HF_TOKEN")
SPACE_ID = os.getenv("SPACE_ID")

# Telemetry is written to this file locally and uploaded under the same name.
LOG_FILE = "engine_telemetry.json"

# Deployment guard rails used by ResourceMonitor.validate_deployment:
# a model may not exceed this fraction of total RAM ...
RAM_LIMIT_PCT = 0.50
# ... and this many MB must remain available on top of the model file size.
SYSTEM_RESERVE_MB = 250

# Repository pre-filled in the UI, and the matching default quantisation file.
DEFAULT_MODEL = "unsloth/Llama-3.2-1B-Instruct-GGUF"
DEFAULT_QUANT = "Llama-3.2-1B-Instruct-Q4_K_M.gguf"

logging.basicConfig(
    format='%(asctime)s - ZEROENGINE - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# --- TELEMETRY MODULE ---
class TelemetryManager:
    """Keep usage counters in memory and mirror them to the Space repository.

    Counters live in ``self.stats``. They are seeded from ``LOG_FILE`` when
    that file exists and is readable, otherwise from fresh defaults.
    """

    def __init__(self, api: HfApi):
        """Store the Hub client used for uploads and load the starting stats."""
        self.api = api
        self.stats = self._load_initial_stats()

    def _load_initial_stats(self) -> Dict:
        """Return the stats dict, merging any saved file over fresh defaults.

        A missing, unreadable, or malformed file is not fatal: the problem is
        logged and defaults are used. Keys absent from a saved file are filled
        from the defaults so the ``track_*`` methods never hit a missing key.
        """
        stats = {
            "session_start": str(datetime.now(pytz.utc)),
            "load_count": {},
            "total_tokens_generated": 0,
            "popular_repos": [],
        }
        if not os.path.exists(LOG_FILE):
            return stats

        try:
            with open(LOG_FILE, "r", encoding="utf-8") as f:
                saved = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both JSON decode and text decode failures.
            logger.warning("Telemetry file %s unreadable, starting fresh: %s", LOG_FILE, e)
            return stats

        if not isinstance(saved, dict):
            logger.warning("Telemetry file %s is not a JSON object, starting fresh.", LOG_FILE)
            return stats

        stats.update(saved)
        # These two fields are mutated in place later, so their types must hold.
        if not isinstance(stats["load_count"], dict):
            stats["load_count"] = {}
        if not isinstance(stats["total_tokens_generated"], int):
            stats["total_tokens_generated"] = 0
        return stats

    def track_load(self, repo: str, filename: str):
        """Increment the load counter for ``repo/filename`` and sync."""
        key = f"{repo}/{filename}"
        self.stats["load_count"][key] = self.stats["load_count"].get(key, 0) + 1
        self._sync_to_cloud()

    def track_generation(self, tokens: int):
        """Add ``tokens`` to the running total (in memory only; no sync here)."""
        self.stats["total_tokens_generated"] += tokens

    def _sync_to_cloud(self):
        """Write the stats to ``LOG_FILE`` and upload it to the Space repo.

        Does nothing when ``HF_TOKEN`` or ``SPACE_ID`` is unset. Failures are
        logged rather than raised so telemetry never breaks the caller.
        """
        if not HF_TOKEN or not SPACE_ID:
            return
        try:
            with open(LOG_FILE, "w", encoding="utf-8") as f:
                json.dump(self.stats, f, indent=4)
            self.api.upload_file(
                path_or_fileobj=LOG_FILE,
                path_in_repo=LOG_FILE,
                repo_id=SPACE_ID,
                repo_type="space",
            )
        except Exception as e:
            logger.error("Sync Failure: %s", e)
# --- RESOURCE MONITOR ---
class ResourceMonitor:
    """Report host memory/CPU figures and gate model loads on RAM headroom."""

    @staticmethod
    def get_metrics() -> Dict:
        """Return a snapshot of RAM, CPU and load-average readings.

        RAM figures are in GiB rounded to two decimals. ``load_avg`` is the
        one-minute load average, or 0 where the platform cannot provide it.
        """
        vm = psutil.virtual_memory()
        gib = 1024 ** 3
        try:
            load_avg = os.getloadavg()[0]
        except (AttributeError, OSError):
            # AttributeError: platform has no getloadavg.
            # OSError: the call exists but the value was unobtainable.
            load_avg = 0
        return {
            "ram_used_gb": round(vm.used / gib, 2),
            "ram_avail_gb": round(vm.available / gib, 2),
            "ram_total_gb": round(vm.total / gib, 2),
            "ram_pct": vm.percent,
            # interval=None keeps this call non-blocking.
            "cpu_usage_pct": psutil.cpu_percent(interval=None),
            "load_avg": load_avg,
        }

    @staticmethod
    def validate_deployment(file_path: str) -> Tuple[bool, str]:
        """Check whether the model file at ``file_path`` is safe to load.

        Returns ``(True, "Passed.")`` when the file is no larger than
        ``RAM_LIMIT_PCT`` of total RAM and the file size plus
        ``SYSTEM_RESERVE_MB`` fits in currently available RAM; otherwise
        ``(False, reason)``.

        Raises:
            OSError: if ``file_path`` cannot be stat'ed.
        """
        vm = psutil.virtual_memory()
        mib = 1024 ** 2
        file_size_mb = os.path.getsize(file_path) / mib
        total_ram_mb = vm.total / mib
        avail_ram_mb = vm.available / mib

        if file_size_mb > (total_ram_mb * RAM_LIMIT_PCT):
            return False, f"Model size ({file_size_mb:.1f}MB) exceeds safety limit."
        if (file_size_mb + SYSTEM_RESERVE_MB) > avail_ram_mb:
            return False, f"Insufficient headroom for context (Need ~{file_size_mb+SYSTEM_RESERVE_MB:.1f}MB)."
        return True, "Passed."
# --- ENGINE CORE ---
class ZeroEngine:
    """Own the loaded model and expose scan, boot, cache-priming and chat calls."""

    def __init__(self):
        """Create the Hub client and telemetry, with no model loaded yet."""
        self.api = HfApi(token=HF_TOKEN)
        self.telemetry = TelemetryManager(self.api)
        self.llm: Optional[Llama] = None
        self.active_model_info = {"repo": "", "file": ""}
        # Serialises model replacement in boot_kernel.
        self.kernel_lock = threading.Lock()
        # True while a background priming thread is running.
        self.is_prefilling = False

    def list_ggufs(self, repo_id: str) -> List[str]:
        """Return the ``.gguf`` file names in ``repo_id``; empty list on error."""
        try:
            files = self.api.list_repo_files(repo_id=repo_id)
        except Exception as e:
            logger.error("Scan error: %s", e)
            return []
        return [f for f in files if f.endswith(".gguf")]

    def boot_kernel(self, repo: str, filename: str) -> str:
        """Download ``filename`` from ``repo`` and load it as the active model.

        Returns a human-readable status string in every case: the validation
        message when the file fails the RAM checks, a success line, or a
        failure line carrying the exception text. Never raises.
        """
        try:
            logger.info("Downloading %s from %s...", filename, repo)
            path = hf_hub_download(repo_id=repo, filename=filename, token=HF_TOKEN)
            valid, msg = ResourceMonitor.validate_deployment(path)
            if not valid:
                return msg
            with self.kernel_lock:
                # Release the previous model before building the next one.
                # Assign None instead of `del`: if Llama() raises below, the
                # attribute must still exist so later `self.llm` checks report
                # "offline" rather than failing with AttributeError.
                self.llm = None
                self.active_model_info = {"repo": "", "file": ""}
                self.llm = Llama(
                    model_path=path,
                    n_ctx=2048,
                    n_threads=2,
                    use_mmap=True,
                    n_batch=512,
                    verbose=False,
                )
                self.active_model_info = {"repo": repo, "file": filename}
                self.telemetry.track_load(repo, filename)
            return f"🟢 KERNEL ONLINE: {filename}"
        except Exception as e:
            return f"🔴 BOOT FAILURE: {str(e)}"

    def stitch_cache(self, ghost_text: str) -> str:
        """Tokenise and evaluate ``ghost_text`` on a background thread.

        Returns ``"Kernel Idle/Busy"`` when no model is loaded, the text is
        empty, or a priming thread is already running; otherwise starts the
        thread and returns immediately.
        """
        llm = self.llm
        if not llm or not ghost_text or self.is_prefilling:
            return "Kernel Idle/Busy"

        # Set the busy flag before the thread starts so a second call arriving
        # right away is rejected instead of starting a second evaluation.
        self.is_prefilling = True

        def _bg_eval():
            try:
                tokens = llm.tokenize(ghost_text.encode("utf-8"))
                llm.eval(tokens)
            except Exception as e:
                logger.error("KV Cache priming failed: %s", e)
            finally:
                self.is_prefilling = False

        try:
            threading.Thread(target=_bg_eval, daemon=True).start()
        except Exception:
            # The thread never ran, so its `finally` will not clear the flag.
            self.is_prefilling = False
            raise
        return "⚡ Ghost Cache Primed"

    def inference_generator(self, prompt: str, history: List[Dict], ghost_context: str) -> Generator:
        """Stream a reply to ``prompt``, yielding the updated chat history.

        ``history`` is a list of ``{"role", "content"}`` dicts and is extended
        in place; ``None`` is treated as an empty history. The first yield
        shows the user message with a placeholder reply; each later yield
        carries the text so far plus a tokens-per-second suffix. Errors during
        generation are written into the last message instead of being raised.
        """
        if history is None:
            history = []

        # Work on a snapshot so a concurrent boot cannot swap the model
        # between the availability check and the call.
        llm = self.llm
        if not llm:
            history.append({"role": "assistant", "content": "⚠️ Engine offline. BOOT a kernel first."})
            yield history
            return

        # Prepare input
        full_input = f"{ghost_context}\n{prompt}" if ghost_context else prompt
        formatted_prompt = f"User: {full_input}\nAssistant: "

        # Add User Message & Empty Assistant Message for Streaming
        history.append({"role": "user", "content": prompt})
        history.append({"role": "assistant", "content": "..."})
        yield history

        response_text = ""
        start_time = time.time()
        tokens_count = 0
        try:
            stream = llm(
                formatted_prompt,
                max_tokens=1024,
                stop=["User:", "<|eot_id|>", "\n\n"],
                stream=True,
            )
            for chunk in stream:
                token = chunk["choices"][0]["text"]
                response_text += token
                tokens_count += 1
                elapsed = time.time() - start_time
                tps = round(tokens_count / elapsed, 1) if elapsed > 0 else 0
                # Gradio 6.5.0: Update history dict structure
                history[-1]["content"] = f"{response_text}\n\n`[{tps} t/s]`"
                yield history
            self.telemetry.track_generation(tokens_count)
        except Exception as e:
            history[-1]["content"] = f"🔴 Runtime Error: {str(e)}"
            yield history
# --- UI INTERFACE ---
# Single engine instance shared by every UI callback below.
kernel = ZeroEngine()

# Removed 'theme' from gr.Blocks constructor (Moved to .launch())
with gr.Blocks(title="ZeroEngine Kernel 6.5") as demo:
    gr.HTML("<div style='text-align: center; border-bottom: 2px solid #333; margin-bottom: 20px;'><h1>🛰️ ZEROENGINE V0.1</h1><p>Gradio 6.5.0 Production Build</p></div>")

    with gr.Row():
        with gr.Column(scale=8):
            # FIXED: Removed 'type="messages"' (deprecated/auto-detected in 6.5.0)
            chat_box = gr.Chatbot(
                label="Main Engine Feedback",
                height=650,
                show_label=False,
                autoscroll=True,
            )
            with gr.Row():
                user_input = gr.Textbox(
                    placeholder="Input command...",
                    label="Terminal",
                    container=False,
                    scale=9,
                )
                send_btn = gr.Button("EXE", variant="primary", scale=1)

    with gr.Sidebar(label="Engine Room", open=True, width=350):
        gr.Markdown("### 🛠️ Hardware Status")
        ram_metric = gr.Label(label="RAM Usage", value="0/0 GB")
        cpu_metric = gr.Label(label="CPU Load", value="0%")
        gr.Markdown("---")
        gr.Markdown("### 📡 Model Control")
        repo_input = gr.Textbox(label="HuggingFace Repo", value=DEFAULT_MODEL)
        quant_dropdown = gr.Dropdown(label="Available Quants", choices=[])
        with gr.Row():
            scan_btn = gr.Button("SCAN", size="sm")
            boot_btn = gr.Button("BOOT", variant="primary", size="sm")
        boot_status = gr.Markdown("Status: `STANDBY`")
        gr.Markdown("---")
        gr.Markdown("### 👻 Ghost Cache")
        ghost_buffer = gr.Textbox(
            label="Background Context",
            placeholder="Queue priming tokens here...",
            lines=3,
        )
        stitch_status = gr.Markdown("Cache: `EMPTY`")
        stitch_btn = gr.Button("STITCH", size="sm")
        log_output = gr.Code(label="Kernel Logs", language="shell", value="[INIT] System Ready.")

    # --- UI LOGIC ---
    def update_stats():
        """Return the (RAM, CPU) label strings for the hardware panel."""
        m = ResourceMonitor.get_metrics()
        return f"{m['ram_used_gb']}/{m['ram_total_gb']} GB", f"{m['cpu_usage_pct']}%"

    def on_scan(repo):
        """List GGUF files in ``repo``; returns (dropdown update, log text)."""
        files = kernel.list_ggufs(repo)
        if not files:
            return gr.update(choices=[], value=None), "No GGUFs found in repo."
        return gr.update(choices=files, value=files[0]), f"Found {len(files)} quants."

    def on_boot(repo, file):
        """Boot the selected model, streaming (status, log update) pairs.

        This is a generator, so every message shown to the user must be
        yielded; a value passed to ``return`` would never reach the UI.
        """
        if not repo or not file:
            yield "Selection Missing", gr.update()
            return
        yield "System: Booting Kernel...", gr.update()
        yield kernel.boot_kernel(repo, file), gr.update()

    # FIXED: Use gr.Timer for periodic updates (Gradio 6.5.0 compatible)
    timer = gr.Timer(value=2)
    timer.tick(update_stats, None, [ram_metric, cpu_metric])

    scan_btn.click(on_scan, [repo_input], [quant_dropdown, log_output])
    boot_btn.click(on_boot, [repo_input, quant_dropdown], [boot_status, log_output])
    stitch_btn.click(
        lambda x: f"Cache: `{kernel.stitch_cache(x)}`",
        [ghost_buffer],
        [stitch_status],
    )

    inference_args = [user_input, chat_box, ghost_buffer]
    user_input.submit(kernel.inference_generator, inference_args, [chat_box])
    send_btn.click(kernel.inference_generator, inference_args, [chat_box])

    # Clear the input box after sending, whether by Enter or the EXE button.
    user_input.submit(lambda: "", None, [user_input])
    send_btn.click(lambda: "", None, [user_input])
# --- LAUNCH ---
if __name__ == "__main__":
    # FIXED: Theme and CSS parameters moved here for Gradio 6 compatibility
    launch_options = {
        "server_name": "0.0.0.0",
        "share": False,
        "theme": gr.themes.Monochrome(primary_hue="blue", radius_size="none"),
    }
    # Enable the request queue (at most 20 waiting) and start the server.
    queued_app = demo.queue(max_size=20)
    queued_app.launch(**launch_options)