import os import json import time import psutil import threading import logging import pytz from datetime import datetime from typing import List, Dict, Optional, Generator import gradio as gr from huggingface_hub import HfApi, hf_hub_download # --- KERNEL INITIALIZATION --- try: from llama_cpp import Llama except ImportError: try: from llama_cpp_pydist import Llama except ImportError: class Llama: def __init__(self, *args, **kwargs): raise ImportError("Kernel Binary Missing. Ensure llama-cpp-python is installed.") # --- CONFIGURATION --- HF_TOKEN = os.environ.get("HF_TOKEN") SPACE_ID = os.environ.get("SPACE_ID") LOG_FILE = "engine_telemetry.json" RAM_LIMIT_PCT = 0.50 SYSTEM_RESERVE_MB = 250 DEFAULT_MODEL = "unsloth/Llama-3.2-1B-Instruct-GGUF" DEFAULT_QUANT = "Llama-3.2-1B-Instruct-Q4_K_M.gguf" logging.basicConfig(level=logging.INFO, format='%(asctime)s - ZEROENGINE - %(message)s') logger = logging.getLogger(__name__) # --- TELEMETRY MODULE --- class TelemetryManager: def __init__(self, api: HfApi): self.api = api self.stats = self._load_initial_stats() def _load_initial_stats(self) -> Dict: if os.path.exists(LOG_FILE): try: with open(LOG_FILE, "r", encoding="utf-8") as f: return json.load(f) except Exception: pass return { "session_start": str(datetime.now(pytz.utc)), "load_count": {}, "total_tokens_generated": 0, "popular_repos": [] } def track_load(self, repo: str, filename: str): key = f"{repo}/{filename}" self.stats["load_count"][key] = self.stats["load_count"].get(key, 0) + 1 self._sync_to_cloud() def track_generation(self, tokens: int): self.stats["total_tokens_generated"] += tokens def _sync_to_cloud(self): if not HF_TOKEN or not SPACE_ID: return try: with open(LOG_FILE, "w", encoding="utf-8") as f: json.dump(self.stats, f, indent=4) self.api.upload_file( path_or_fileobj=LOG_FILE, path_in_repo=LOG_FILE, repo_id=SPACE_ID, repo_type="space" ) except Exception as e: logger.error(f"Sync Failure: {e}") # --- RESOURCE MONITOR --- class ResourceMonitor: @staticmethod def get_metrics() -> Dict: vm = psutil.virtual_memory() return { "ram_used_gb": round(vm.used / (1024**3), 2), "ram_avail_gb": round(vm.available / (1024**3), 2), "ram_total_gb": round(vm.total / (1024**3), 2), "ram_pct": vm.percent, "cpu_usage_pct": psutil.cpu_percent(interval=None), "load_avg": os.getloadavg()[0] if hasattr(os, 'getloadavg') else 0 } @staticmethod def validate_deployment(file_path: str) -> (bool, str): vm = psutil.virtual_memory() file_size_mb = os.path.getsize(file_path) / (1024**2) total_ram_mb = vm.total / (1024**2) avail_ram_mb = vm.available / (1024**2) if file_size_mb > (total_ram_mb * RAM_LIMIT_PCT): return False, f"Model size ({file_size_mb:.1f}MB) exceeds safety limit." if (file_size_mb + SYSTEM_RESERVE_MB) > avail_ram_mb: return False, f"Insufficient headroom for context (Need ~{file_size_mb+SYSTEM_RESERVE_MB:.1f}MB)." return True, "Passed." # --- ENGINE CORE --- class ZeroEngine: def __init__(self): self.api = HfApi(token=HF_TOKEN) self.telemetry = TelemetryManager(self.api) self.llm: Optional[Llama] = None self.active_model_info = {"repo": "", "file": ""} self.kernel_lock = threading.Lock() self.is_prefilling = False def list_ggufs(self, repo_id: str) -> List[str]: try: files = self.api.list_repo_files(repo_id=repo_id) return [f for f in files if f.endswith(".gguf")] except Exception as e: logger.error(f"Scan error: {e}") return [] def boot_kernel(self, repo: str, filename: str) -> str: try: logger.info(f"Downloading {filename} from {repo}...") path = hf_hub_download(repo_id=repo, filename=filename, token=HF_TOKEN) valid, msg = ResourceMonitor.validate_deployment(path) if not valid: return msg with self.kernel_lock: if self.llm: del self.llm self.llm = Llama( model_path=path, n_ctx=2048, n_threads=2, use_mmap=True, n_batch=512, verbose=False ) self.active_model_info = {"repo": repo, "file": filename} self.telemetry.track_load(repo, filename) return f"🟢 KERNEL ONLINE: {filename}" except Exception as e: return f"🔴 BOOT FAILURE: {str(e)}" def stitch_cache(self, ghost_text: str) -> str: if not self.llm or not ghost_text or self.is_prefilling: return "Kernel Idle/Busy" def _bg_eval(): self.is_prefilling = True try: tokens = self.llm.tokenize(ghost_text.encode("utf-8")) self.llm.eval(tokens) except Exception as e: logger.error(f"KV Cache priming failed: {e}") finally: self.is_prefilling = False threading.Thread(target=_bg_eval, daemon=True).start() return "⚡ Ghost Cache Primed" def inference_generator(self, prompt: str, history: List[Dict], ghost_context: str) -> Generator: if not self.llm: history.append({"role": "assistant", "content": "⚠️ Engine offline. BOOT a kernel first."}) yield history return # Prepare input full_input = f"{ghost_context}\n{prompt}" if ghost_context else prompt formatted_prompt = f"User: {full_input}\nAssistant: " # Add User Message & Empty Assistant Message for Streaming history.append({"role": "user", "content": prompt}) history.append({"role": "assistant", "content": "..."}) yield history response_text = "" start_time = time.time() tokens_count = 0 try: stream = self.llm( formatted_prompt, max_tokens=1024, stop=["User:", "<|eot_id|>", "\n\n"], stream=True ) for chunk in stream: token = chunk["choices"][0]["text"] response_text += token tokens_count += 1 elapsed = time.time() - start_time tps = round(tokens_count / elapsed, 1) if elapsed > 0 else 0 # Gradio 6.5.0: Update history dict structure history[-1]["content"] = f"{response_text}\n\n`[{tps} t/s]`" yield history self.telemetry.track_generation(tokens_count) except Exception as e: history[-1]["content"] = f"🔴 Runtime Error: {str(e)}" yield history # --- UI INTERFACE --- kernel = ZeroEngine() # Removed 'theme' from gr.Blocks constructor (Moved to .launch()) with gr.Blocks(title="ZeroEngine Kernel 6.5") as demo: gr.HTML("
Gradio 6.5.0 Production Build