ZeroEngine / app.py
turtle170's picture
Update app.py
438c44e verified
Raw
History Blame
17.3 kB
import json
import logging
import os
import threading
import time
from datetime import datetime
from typing import Dict, Generator, List, Optional, Tuple

import gradio as gr
import psutil
import pytz
from huggingface_hub import HfApi, hf_hub_download
# --- KERNEL INITIALIZATION ---
# Resolve the Llama class from the first backend that imports; otherwise fall
# back to a stub so the module still loads and the failure surfaces only when
# a model is actually constructed.
Llama = None
try:
    from llama_cpp import Llama
except ImportError:
    pass

if Llama is None:
    try:
        from llama_cpp_pydist import Llama
    except ImportError:
        pass

if Llama is None:
    class Llama:
        """Placeholder used when no llama.cpp binding is importable."""

        def __init__(self, *args, **kwargs):
            raise ImportError("Kernel Binary Missing. Ensure llama-cpp-python is installed.")
# --- CONFIGURATION ---
# Hugging Face access token; passed to HfApi and hf_hub_download. None when unset.
HF_TOKEN = os.environ.get("HF_TOKEN")
# Repo id used as the upload target (repo_type="space") for the telemetry file. None when unset.
SPACE_ID = os.environ.get("SPACE_ID")
# Path of the JSON telemetry file, both locally and inside the Space repo.
LOG_FILE = "engine_telemetry.json"
# Largest fraction of *total* RAM a model file may occupy before validation rejects it.
RAM_LIMIT_PCT = 0.85 # Increased from 0.50 to prevent false rejections
# Headroom (MB) that must remain in *available* RAM on top of the model file size.
SYSTEM_RESERVE_MB = 500 # Increased reserve
# Default value shown in the repository textbox.
DEFAULT_MODEL = "unsloth/Llama-3.2-1B-Instruct-GGUF"
# NOTE(review): not referenced anywhere else in the visible source.
DEFAULT_QUANT = "Llama-3.2-1B-Instruct-Q4_K_M.gguf"
logging.basicConfig(level=logging.INFO, format='%(asctime)s - ZEROENGINE - %(message)s')
logger = logging.getLogger(__name__)
# --- TELEMETRY MODULE ---
class TelemetryManager:
    """Keeps usage counters in memory and mirrors them to LOG_FILE / the Space repo.

    Counters are restored from LOG_FILE at construction time when that file
    exists and is readable. They are written back (and uploaded) only from
    ``track_load``; ``track_generation`` updates memory only.
    """

    def __init__(self, api: HfApi):
        """Store the Hub client and load any previously persisted counters."""
        self.api = api
        self.stats = self._load_initial_stats()

    @staticmethod
    def _default_stats() -> Dict:
        """Return a fresh counter structure for a new session."""
        return {
            "session_start": str(datetime.now(pytz.utc)),
            "load_count": {},
            "total_tokens_generated": 0,
            "popular_repos": []
        }

    def _load_initial_stats(self) -> Dict:
        """Return persisted stats merged over the defaults.

        A missing, unreadable, or malformed file yields the defaults. Keys
        absent from the file are filled in so later ``track_*`` calls cannot
        fail on a missing or wrongly typed counter.
        """
        stats = self._default_stats()
        if os.path.exists(LOG_FILE):
            try:
                with open(LOG_FILE, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
                logger.warning(f"Telemetry load failed, starting fresh: {e}")
            else:
                if isinstance(loaded, dict):
                    stats.update(loaded)
                else:
                    logger.warning("Telemetry file is not a JSON object, starting fresh")

        # Repair counters whose persisted type would break track_load/track_generation.
        if not isinstance(stats.get("load_count"), dict):
            stats["load_count"] = {}
        if not isinstance(stats.get("total_tokens_generated"), int):
            stats["total_tokens_generated"] = 0
        return stats

    def track_load(self, repo: str, filename: str):
        """Increment the load counter for ``repo/filename`` and persist the stats."""
        key = f"{repo}/{filename}"
        self.stats["load_count"][key] = self.stats["load_count"].get(key, 0) + 1
        self._sync_to_cloud()

    def track_generation(self, tokens: int):
        """Add ``tokens`` to the running total (in memory only; no sync)."""
        self.stats["total_tokens_generated"] += tokens

    def _sync_to_cloud(self):
        """Write the stats to LOG_FILE and upload it to the Space repo.

        Does nothing when HF_TOKEN or SPACE_ID is unset. Any failure is logged
        and swallowed so telemetry can never break the caller.
        """
        if not HF_TOKEN or not SPACE_ID:
            return
        try:
            with open(LOG_FILE, "w", encoding="utf-8") as f:
                json.dump(self.stats, f, indent=4)
            # NOTE(review): this commits a file into the Space repository on
            # every model load - confirm that this does not trigger a rebuild.
            self.api.upload_file(
                path_or_fileobj=LOG_FILE,
                path_in_repo=LOG_FILE,
                repo_id=SPACE_ID,
                repo_type="space"
            )
        except Exception as e:
            logger.error(f"Sync Failure: {e}")
# --- RESOURCE MONITOR ---
class ResourceMonitor:
    """Stateless helpers that read host RAM/CPU figures and gate model loads on memory."""

    @staticmethod
    def get_metrics() -> Dict:
        """Return a snapshot of memory, CPU and load-average readings.

        Returns:
            Dict with ``ram_used_gb``, ``ram_avail_gb``, ``ram_total_gb``
            (GiB, rounded to 2 decimals), ``ram_pct``, ``cpu_usage_pct`` and
            ``load_avg`` (1-minute load average, or 0 when unavailable).
        """
        vm = psutil.virtual_memory()
        gib = 1024 ** 3
        try:
            load_avg = os.getloadavg()[0]
        except (AttributeError, OSError):
            # AttributeError: platform lacks getloadavg.
            # OSError: the load average could not be obtained.
            load_avg = 0
        return {
            "ram_used_gb": round(vm.used / gib, 2),
            "ram_avail_gb": round(vm.available / gib, 2),
            "ram_total_gb": round(vm.total / gib, 2),
            "ram_pct": vm.percent,
            "cpu_usage_pct": psutil.cpu_percent(interval=None),
            "load_avg": load_avg
        }

    @staticmethod
    def validate_deployment(file_path: str) -> Tuple[bool, str]:
        """Decide whether the model file at ``file_path`` may be loaded.

        Two checks are applied to the on-disk file size:
        it must not exceed ``RAM_LIMIT_PCT`` of total RAM, and the size plus
        ``SYSTEM_RESERVE_MB`` must fit in currently available RAM.

        Returns:
            ``(True, "Validation Passed.")`` on success, otherwise ``(False,
            reason)``. Any exception (e.g. a missing file) is logged and
            reported as a failed validation rather than raised.
        """
        try:
            vm = psutil.virtual_memory()
            mib = 1024 ** 2
            file_size_mb = os.path.getsize(file_path) / mib
            total_ram_mb = vm.total / mib
            avail_ram_mb = vm.available / mib
            logger.info(f"Validation - Model: {file_size_mb:.1f}MB | Available RAM: {avail_ram_mb:.1f}MB | Total: {total_ram_mb:.1f}MB")

            size_cap_mb = total_ram_mb * RAM_LIMIT_PCT
            if file_size_mb > size_cap_mb:
                return False, f"Model size ({file_size_mb:.1f}MB) exceeds safety limit ({size_cap_mb:.1f}MB)."

            needed_mb = file_size_mb + SYSTEM_RESERVE_MB
            if needed_mb > avail_ram_mb:
                return False, f"Insufficient RAM. Need {needed_mb:.1f}MB, have {avail_ram_mb:.1f}MB available."

            return True, "Validation Passed."
        except Exception as e:
            logger.error(f"Validation error: {e}")
            return False, f"Validation error: {str(e)}"
# --- ENGINE CORE ---
class ZeroEngine:
    """Owns the currently loaded model and exposes scan, boot, cache-priming and chat operations.

    ``llm`` is ``None`` until ``boot_kernel`` succeeds. ``kernel_lock`` only
    serialises model replacement inside ``boot_kernel``.

    NOTE(review): inference and ghost-cache evaluation do not take
    ``kernel_lock`` and may run against the same model object concurrently;
    confirm whether the llama.cpp binding tolerates that.
    """

    def __init__(self):
        """Create the Hub client, telemetry manager and empty model slot."""
        self.api = HfApi(token=HF_TOKEN)
        self.telemetry = TelemetryManager(self.api)
        self.llm: Optional[Llama] = None
        self.active_model_info = {"repo": "", "file": ""}
        self.kernel_lock = threading.Lock()
        # True while a background ghost-cache evaluation is in flight.
        self.is_prefilling = False

    def list_ggufs(self, repo_id: str) -> List[str]:
        """Return the ``.gguf`` file names in ``repo_id``; empty list on any error."""
        try:
            files = self.api.list_repo_files(repo_id=repo_id)
            ggufs = [f for f in files if f.endswith(".gguf")]
            logger.info(f"Found {len(ggufs)} GGUF files in {repo_id}")
            return ggufs
        except Exception as e:
            logger.error(f"Scan error: {e}")
            return []

    def boot_kernel(self, repo: str, filename: str) -> str:
        """Download ``filename`` from ``repo``, validate it against RAM and load it.

        Never raises: every failure is logged and reported in the returned
        status string so the UI keeps running.

        Returns:
            A human-readable status line describing success or the failure stage.
        """
        try:
            if not repo or not filename:
                return "πŸ”΄ ERROR: Repository or filename missing"

            logger.info(f"[BOOT] Starting download: {filename} from {repo}")
            # No timeout is configured here; the call blocks until the file is
            # downloaded (or found in the local cache) or an error is raised.
            try:
                path = hf_hub_download(
                    repo_id=repo,
                    filename=filename,
                    token=HF_TOKEN,
                    local_files_only=False
                )
                logger.info(f"[BOOT] Download complete: {path}")
            except Exception as e:
                logger.error(f"[BOOT] Download failed: {e}")
                return f"πŸ”΄ DOWNLOAD FAILED: {str(e)}"

            # Validate before loading
            valid, msg = ResourceMonitor.validate_deployment(path)
            if not valid:
                logger.warning(f"[BOOT] Validation failed: {msg}")
                return f"πŸ”΄ VALIDATION FAILED: {msg}"
            logger.info("[BOOT] Validation passed, initializing model...")

            with self.kernel_lock:
                if self.llm:
                    logger.info("[BOOT] Clearing previous model...")
                    # Rebind instead of `del`: the attribute must always exist
                    # because other threads read `self.llm` without the lock.
                    self.llm = None

                try:
                    logger.info("[BOOT] Loading model into memory...")
                    self.llm = Llama(
                        model_path=path,
                        n_ctx=2048,
                        n_threads=2,
                        use_mmap=True, # Critical: memory map to reduce RAM usage
                        n_batch=256, # Reduced from 512 to be safer
                        n_gpu_layers=0, # Force CPU only
                        verbose=False
                    )
                    self.active_model_info = {"repo": repo, "file": filename}
                    self.telemetry.track_load(repo, filename)
                    logger.info("[BOOT] Model loaded successfully!")
                    return f"🟒 KERNEL ONLINE: {filename}"
                except Exception as e:
                    logger.error(f"[BOOT] Model loading failed: {e}")
                    self.llm = None
                    # No model is loaded any more, so do not keep advertising one.
                    self.active_model_info = {"repo": "", "file": ""}
                    return f"πŸ”΄ LOAD FAILED: {str(e)}"
        except Exception as e:
            logger.error(f"[BOOT] Unexpected error: {e}")
            return f"πŸ”΄ BOOT FAILURE: {str(e)}"

    def stitch_cache(self, ghost_text: str) -> str:
        """Start evaluating ``ghost_text`` on the model in a background thread.

        Returns immediately. The returned status only means the work was
        started; completion or failure is reported through the logger.

        Returns:
            ``"Kernel Idle/Busy"`` when no model is loaded, the text is empty,
            or a previous priming run is still active; otherwise the
            "primed" status string.
        """
        # Snapshot the model so the worker keeps using the same object even if
        # boot_kernel swaps `self.llm` while it runs.
        llm = self.llm
        if not llm or not ghost_text or self.is_prefilling:
            return "Kernel Idle/Busy"

        # Claim the busy flag before the thread starts so a second call made
        # right away is rejected instead of launching a duplicate evaluation.
        self.is_prefilling = True

        def _bg_eval():
            try:
                tokens = llm.tokenize(ghost_text.encode("utf-8"))
                llm.eval(tokens)
                logger.info(f"Ghost cache primed: {len(tokens)} tokens")
            except Exception as e:
                logger.error(f"KV Cache priming failed: {e}")
            finally:
                self.is_prefilling = False

        try:
            threading.Thread(target=_bg_eval, daemon=True).start()
        except Exception:
            # The worker never ran, so its `finally` cannot clear the flag.
            self.is_prefilling = False
            raise
        return "⚑ Ghost Cache Primed"

    def inference_generator(self, prompt: str, history: List[Dict], ghost_context: str) -> Generator:
        """Stream a reply to ``prompt``, yielding the updated chat history after each chunk.

        Args:
            prompt: The user's message.
            history: List of ``{"role", "content"}`` dicts; mutated in place.
                ``None`` is treated as an empty history.
            ghost_context: Optional text prepended to the prompt sent to the model.

        Yields:
            The history list, whose last entry holds the partial reply followed
            by a chunks-per-second figure.
        """
        if history is None:
            history = []

        # Snapshot so a concurrent boot cannot turn the model into None
        # between this check and the call below.
        llm = self.llm
        if not llm:
            history.append({"role": "assistant", "content": "⚠️ Engine offline. BOOT a kernel first."})
            yield history
            return

        # Only the current prompt (plus ghost context) is sent to the model;
        # earlier turns in `history` are not part of the prompt.
        full_input = f"{ghost_context}\n{prompt}" if ghost_context else prompt
        formatted_prompt = f"User: {full_input}\nAssistant: "

        # Add the user message and a placeholder assistant message to stream into.
        history.append({"role": "user", "content": prompt})
        history.append({"role": "assistant", "content": "..."})
        yield history

        response_text = ""
        start_time = time.time()
        tokens_count = 0  # counts streamed chunks
        try:
            stream = llm(
                formatted_prompt,
                max_tokens=1024,
                stop=["User:", "<|eot_id|>", "\n\n"],
                stream=True
            )
            for chunk in stream:
                token = chunk["choices"][0]["text"]
                response_text += token
                tokens_count += 1
                elapsed = time.time() - start_time
                tps = round(tokens_count / elapsed, 1) if elapsed > 0 else 0
                history[-1]["content"] = f"{response_text}\n\n`[{tps} t/s]`"
                yield history
            self.telemetry.track_generation(tokens_count)
        except Exception as e:
            logger.error(f"Inference error: {e}")
            history[-1]["content"] = f"πŸ”΄ Runtime Error: {str(e)}"
            yield history
# --- CUSTOM CSS ---
# Stylesheet handed to gr.Blocks(css=...): forces a Consolas/monospace font,
# rounds the corners of containers and controls, and adds hover/transition effects.
CUSTOM_CSS = """
@import url('https://fonts.cdnfonts.com/css/consolas');
* {
font-family: 'Consolas', 'Courier New', monospace !important;
}
/* Global smooth rounded corners */
.gradio-container {
border-radius: 24px !important;
}
/* All buttons */
button {
border-radius: 16px !important;
transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1) !important;
font-family: 'Consolas', monospace !important;
}
button:hover {
transform: translateY(-2px);
box-shadow: 0 8px 16px rgba(0,0,0,0.2) !important;
}
/* Input fields */
input, textarea, .gr-textbox, .gr-dropdown {
border-radius: 12px !important;
font-family: 'Consolas', monospace !important;
}
/* Chat messages */
.message {
border-radius: 16px !important;
font-family: 'Consolas', monospace !important;
}
/* Code blocks */
.gr-code {
border-radius: 12px !important;
font-family: 'Consolas', monospace !important;
}
/* Labels */
.gr-label {
border-radius: 12px !important;
font-family: 'Consolas', monospace !important;
}
/* Sidebar */
.gr-sidebar {
border-radius: 20px !important;
background: linear-gradient(135deg, rgba(20,20,40,0.95), rgba(10,10,20,0.98)) !important;
backdrop-filter: blur(10px) !important;
}
/* Markdown sections */
.gr-markdown {
font-family: 'Consolas', monospace !important;
}
/* Chatbot container */
.chatbot {
border-radius: 20px !important;
font-family: 'Consolas', monospace !important;
}
/* Dropdown menus */
.gr-dropdown-menu {
border-radius: 12px !important;
font-family: 'Consolas', monospace !important;
}
/* Column containers */
.gr-column {
border-radius: 16px !important;
}
/* Row containers */
.gr-row {
border-radius: 12px !important;
}
/* Smooth animations for all interactive elements */
* {
transition: all 0.2s ease !important;
}
/* Header styling */
h1, h2, h3, h4, h5, h6 {
font-family: 'Consolas', monospace !important;
}
"""
# --- UI INTERFACE ---
# Single engine instance shared by every UI callback below.
kernel = ZeroEngine()

# NOTE(review): the layout nesting below was reconstructed from the call
# order; in particular, confirm that `log_output` belongs in the side column.
with gr.Blocks(title="ZeroEngine Kernel 6.5", css=CUSTOM_CSS) as demo:
    gr.HTML("""
<div style='text-align: center; padding: 30px; border-radius: 24px;
background: linear-gradient(135deg, #1a1a2e 0%, #16213e 100%);
margin-bottom: 30px; box-shadow: 0 10px 30px rgba(0,0,0,0.3);'>
<h1 style='margin: 0; font-size: 3em; background: linear-gradient(90deg, #00d4ff, #7b2ff7);
-webkit-background-clip: text; -webkit-text-fill-color: transparent;
font-family: Consolas, monospace;'>
πŸ›°οΈ ZEROENGINE V0.1
</h1>
<p style='margin: 10px 0 0 0; color: #888; font-family: Consolas, monospace;'>
Gradio 6.5.0 Production Build | Smooth Rounded UI
</p>
</div>
""")
    with gr.Row():
        # Main pane: chat transcript and the prompt input.
        with gr.Column(scale=8):
            chat_box = gr.Chatbot(
                label="Main Engine Feedback",
                height=650,
                show_label=False,
                autoscroll=True,
                container=True
            )
            with gr.Row():
                user_input = gr.Textbox(
                    placeholder="Input command...",
                    label="Terminal",
                    container=False,
                    scale=9
                )
                send_btn = gr.Button("EXE", variant="primary", scale=1)
        # Side pane: hardware readouts, model selection and ghost cache controls.
        with gr.Column(scale=3):
            gr.Markdown("### πŸ› οΈ Hardware Status")
            ram_metric = gr.Label(label="RAM Usage", value="0/0 GB")
            cpu_metric = gr.Label(label="CPU Load", value="0%")
            gr.Markdown("---")
            gr.Markdown("### πŸ“‘ Model Control")
            repo_input = gr.Textbox(label="HuggingFace Repo", value=DEFAULT_MODEL)
            quant_dropdown = gr.Dropdown(label="Available Quants", choices=[], interactive=True)
            with gr.Row():
                scan_btn = gr.Button("SCAN", size="sm")
                boot_btn = gr.Button("BOOT", variant="primary", size="sm")
            boot_status = gr.Markdown("Status: `STANDBY`")
            gr.Markdown("---")
            gr.Markdown("### πŸ‘» Ghost Cache")
            ghost_buffer = gr.Textbox(
                label="Background Context",
                placeholder="Queue priming tokens here...",
                lines=3
            )
            stitch_status = gr.Markdown("Cache: `EMPTY`")
            stitch_btn = gr.Button("STITCH", size="sm")
            log_output = gr.Code(
                label="Kernel Logs",
                language="shell",
                value="[INIT] System Ready.",
                lines=5
            )

    # --- UI LOGIC ---
    def update_stats():
        """Return (RAM text, CPU text) for the two hardware labels."""
        try:
            m = ResourceMonitor.get_metrics()
            return f"{m['ram_used_gb']}/{m['ram_total_gb']} GB", f"{m['cpu_usage_pct']}%"
        except Exception as e:
            logger.error(f"Stats update error: {e}")
            return "Error", "Error"

    def on_scan(repo):
        """List the GGUF files of ``repo``; returns (dropdown update, log text)."""
        try:
            if not repo:
                return gr.update(choices=[], value=None), "⚠️ Please enter a repository ID"
            logger.info(f"Scanning repository: {repo}")
            files = kernel.list_ggufs(repo)
            if not files:
                return gr.update(choices=[], value=None), f"❌ No GGUFs found in {repo}"
            return gr.update(choices=files, value=files[0]), f"βœ… Found {len(files)} GGUF file(s)"
        except Exception as e:
            logger.error(f"Scan error: {e}")
            return gr.update(choices=[], value=None), f"πŸ”΄ Scan failed: {str(e)}"

    def on_boot(repo, file):
        """Boot the selected model, yielding (status text, unchanged log) as progress updates."""
        try:
            if not repo or not file:
                yield "πŸ”΄ ERROR: Repository and filename required", gr.update()
                return
            yield "βš™οΈ System: Initiating boot sequence...", gr.update()
            time.sleep(0.5) # Small delay for UI feedback
            result = kernel.boot_kernel(repo, file)
            yield result, gr.update()
        except Exception as e:
            logger.error(f"Boot UI error: {e}")
            yield f"πŸ”΄ BOOT ERROR: {str(e)}", gr.update()

    # Timer for periodic stats updates (every 2 seconds).
    timer = gr.Timer(value=2)
    timer.tick(update_stats, None, [ram_metric, cpu_metric])

    # Event handlers
    scan_btn.click(on_scan, [repo_input], [quant_dropdown, log_output])
    boot_btn.click(on_boot, [repo_input, quant_dropdown], [boot_status, log_output])
    stitch_btn.click(
        lambda x: f"Cache: `{kernel.stitch_cache(x)}`",
        [ghost_buffer],
        [stitch_status]
    )

    inference_args = [user_input, chat_box, ghost_buffer]
    user_input.submit(kernel.inference_generator, inference_args, [chat_box])
    send_btn.click(kernel.inference_generator, inference_args, [chat_box])
    # Clear the prompt box after either way of sending, so the EXE button
    # behaves the same as pressing Enter.
    user_input.submit(lambda: "", None, [user_input])
    send_btn.click(lambda: "", None, [user_input])
# --- LAUNCH ---
if __name__ == "__main__":
    # Turn on the event queue (max_size=20), then serve on all interfaces at
    # port 7860 without creating a public share link.
    queued_demo = demo.queue(max_size=20)
    queued_demo.launch(server_name="0.0.0.0", server_port=7860, share=False)