""" ZEROENGINE KERNEL V0.1 Target SDK: Gradio 6.5.0 Optimized for: 2 vCPU / 16GB RAM Features: KV-Cache Stitching, Hard Partitioning, Resource Gatekeeper, Ghost Terminal """ import os import json import time import psutil import threading import logging from datetime import datetime from typing import List, Dict, Optional, Generator import gradio as gr from huggingface_hub import HfApi, hf_hub_download try: from llama_cpp import Llama except ImportError: from llama_cpp_pydist import Llama # ========================================== # SYSTEM CONFIGURATION & CONSTANTS # ========================================== HF_TOKEN = os.environ.get("HF_TOKEN") SPACE_ID = os.environ.get("SPACE_ID") LOG_FILE = "engine_telemetry.json" RAM_LIMIT_PCT = 0.50 SYSTEM_RESERVE_MB = 250 DEFAULT_MODEL = "unsloth/Llama-3.2-1B-Instruct-GGUF" DEFAULT_QUANT = "Llama-3.2-1B-Instruct-Q4_K_M.gguf" logging.basicConfig(level=logging.INFO, format='%(asctime)s - ZEROENGINE - %(message)s') logger = logging.getLogger(__name__) # ========================================== # CORE TELEMETRY & PERSISTENCE # ========================================== class TelemetryManager: """Handles JSON-based usage tracking and HF Space persistence.""" def __init__(self, api: HfApi): self.api = api self.stats = self._load_initial_stats() def _load_initial_stats(self) -> Dict: if os.path.exists(LOG_FILE): try: with open(LOG_FILE, "r") as f: return json.load(f) except Exception as e: logger.error(f"Failed to load telemetry: {e}") return { "session_start": str(datetime.now()), "load_count": {}, "total_tokens_generated": 0, "popular_repos": [] } def track_load(self, repo: str, filename: str): key = f"{repo}/{filename}" self.stats["load_count"][key] = self.stats["load_count"].get(key, 0) + 1 self._sync_to_cloud() def track_generation(self, tokens: int): self.stats["total_tokens_generated"] += tokens def _sync_to_cloud(self): if not HF_TOKEN or not SPACE_ID: return try: with open(LOG_FILE, "w") as f: json.dump(self.stats, f, indent=4) self.api.upload_file( path_or_fileobj=LOG_FILE, path_in_repo=LOG_FILE, repo_id=SPACE_ID, repo_type="space" ) logger.info("Telemetry synced to Space repository.") except Exception as e: logger.warning(f"Telemetry sync failed: {e}") # ========================================== # RESOURCE GATEKEEPER # ========================================== class ResourceMonitor: """Monitors vCPU and RAM to prevent Kernel Panics.""" @staticmethod def get_metrics() -> Dict: vm = psutil.virtual_memory() cpu_freq = psutil.cpu_freq() return { "ram_used_gb": round(vm.used / (1024**3), 2), "ram_avail_gb": round(vm.available / (1024**3), 2), "ram_total_gb": round(vm.total / (1024**3), 2), "ram_pct": vm.percent, "cpu_usage_pct": psutil.cpu_percent(interval=None), "load_avg": os.getloadavg()[0] if hasattr(os, 'getloadavg') else 0 } @staticmethod def validate_deployment(file_path: str) -> (bool, str): vm = psutil.virtual_memory() file_size_mb = os.path.getsize(file_path) / (1024**2) total_ram_mb = vm.total / (1024**2) avail_ram_mb = vm.available / (1024**2) if file_size_mb > (total_ram_mb * RAM_LIMIT_PCT): return False, f"Model size ({file_size_mb:.1f}MB) exceeds 50% System RAM limit." if (file_size_mb + SYSTEM_RESERVE_MB) > avail_ram_mb: return False, f"Insufficient headroom. Need {SYSTEM_RESERVE_MB}MB buffer." return True, "Resource check passed." # ========================================== # THE ZEROENGINE KERNEL # ========================================== class ZeroEngine: def __init__(self): self.api = HfApi(token=HF_TOKEN) self.telemetry = TelemetryManager(self.api) self.llm: Optional[Llama] = None self.active_model_info = {"repo": "", "file": ""} self.kernel_lock = threading.Lock() self.is_prefilling = False def list_ggufs(self, repo_id: str) -> List[str]: try: files = self.api.list_repo_files(repo_id=repo_id) return [f for f in files if f.endswith(".gguf")] except Exception as e: logger.error(f"HF API Error: {e}") return [] def boot_kernel(self, repo: str, filename: str) -> str: """Downloads and initializes the llama-cpp-python instance.""" try: logger.info(f"Booting Kernel with {filename}...") path = hf_hub_download(repo_id=repo, filename=filename, token=HF_TOKEN) valid, msg = ResourceMonitor.validate_deployment(path) if not valid: return msg with self.kernel_lock: if self.llm: del self.llm self.llm = Llama( model_path=path, n_ctx=4096, n_threads=1, use_mmap=True, n_batch=512, last_n_tokens_size=64, verbose=False ) self.active_model_info = {"repo": repo, "file": filename} self.telemetry.track_load(repo, filename) return f"🟢 KERNEL ONLINE: {filename} loaded successfully." except Exception as e: return f"🔴 BOOT FAILURE: {str(e)}" def stitch_cache(self, ghost_text: str) -> str: """KV-CACHE STITCHING: Pre-processes queue tokens in background.""" if not self.llm or not ghost_text: return "Kernel Idle" if self.is_prefilling: return "Kernel Busy" def _bg_eval(): self.is_prefilling = True try: tokens = self.llm.tokenize(ghost_text.encode("utf-8")) self.llm.eval(tokens) logger.info(f"KV-Cache stitched for {len(tokens)} tokens.") except Exception as e: logger.error(f"Stitching failed: {e}") finally: self.is_prefilling = False threading.Thread(target=_bg_eval, daemon=True).start() return "⚡ Ghost Cache Primed" def inference_generator(self, prompt: str, history: List, ghost_context: str) -> Generator: """Main chat generator using prefix-matched context.""" if not self.llm: yield history + [{"role": "assistant", "content": "Engine offline. Please load a model in the Sidebar."}] return full_input = f"{ghost_context}\n{prompt}" if ghost_context else prompt formatted_prompt = f"User: {full_input}\nAssistant: " response_text = "" start_time = time.time() tokens_count = 0 try: stream = self.llm( formatted_prompt, max_tokens=1024, stop=["User:", "\n\n"], stream=True ) for chunk in stream: token = chunk["choices"][0]["text"] response_text += token tokens_count += 1 elapsed = time.time() - start_time tps = round(tokens_count / elapsed, 1) if elapsed > 0 else 0 yield history + [ {"role": "user", "content": prompt}, {"role": "assistant", "content": f"{response_text}\n\n`[{tps} t/s]`"} ] self.telemetry.track_generation(tokens_count) except Exception as e: yield history + [{"role": "assistant", "content": f"Inference Error: {str(e)}"}] # ========================================== # GRADIO INTERFACE (DASHBOARD) # ========================================== kernel = ZeroEngine() with gr.Blocks( title="ZeroEngine Kernel", theme=gr.themes.Monochrome(primary_hue="blue", radius_size="none"), css=".gradio-container {background-color: #fafafa;} #sidebar {border-left: 1px solid #ddd;}" ) as demo: gr.HTML("""
STATUS: HIGH-PERFORMANCE KERNEL / VCPU-PARTITIONED