import os import json import time import psutil import threading import logging import pytz from datetime import datetime from typing import List, Dict, Optional, Generator import gradio as gr from huggingface_hub import HfApi, hf_hub_download try: from llama_cpp import Llama except ImportError: try: from llama_cpp_pydist import Llama except ImportError: Llama = None HF_TOKEN = os.environ.get("HF_TOKEN") SPACE_ID = os.environ.get("SPACE_ID") LOG_FILE = "engine_telemetry.json" RAM_LIMIT_PCT = 0.50 SYSTEM_RESERVE_MB = 250 DEFAULT_MODEL = "unsloth/Llama-3.2-1B-Instruct-GGUF" DEFAULT_QUANT = "Llama-3.2-1B-Instruct-Q4_K_M.gguf" logging.basicConfig(level=logging.INFO, format='%(asctime)s - ZEROENGINE - %(message)s') logger = logging.getLogger(__name__) class TelemetryManager: def __init__(self, api: HfApi): self.api = api self.stats = self._load_initial_stats() def _load_initial_stats(self) -> Dict: if os.path.exists(LOG_FILE): try: with open(LOG_FILE, "r") as f: return json.load(f) except Exception as e: logger.error(f"Failed to load telemetry: {e}") return { "session_start": str(datetime.now(pytz.utc)), "load_count": {}, "total_tokens_generated": 0, "popular_repos": [] } def track_load(self, repo: str, filename: str): key = f"{repo}/{filename}" self.stats["load_count"][key] = self.stats["load_count"].get(key, 0) + 1 self._sync_to_cloud() def track_generation(self, tokens: int): self.stats["total_tokens_generated"] += tokens def _sync_to_cloud(self): if not HF_TOKEN or not SPACE_ID: return try: with open(LOG_FILE, "w") as f: json.dump(self.stats, f, indent=4) self.api.upload_file( path_or_fileobj=LOG_FILE, path_in_repo=LOG_FILE, repo_id=SPACE_ID, repo_type="space" ) except Exception as e: logger.warning(f"Telemetry sync failed: {e}") class ResourceMonitor: @staticmethod def get_metrics() -> Dict: vm = psutil.virtual_memory() return { "ram_used_gb": round(vm.used / (1024**3), 2), "ram_avail_gb": round(vm.available / (1024**3), 2), "ram_total_gb": round(vm.total / (1024**3), 2), "ram_pct": vm.percent, "cpu_usage_pct": psutil.cpu_percent(interval=None), "load_avg": os.getloadavg()[0] if hasattr(os, 'getloadavg') else 0 } @staticmethod def validate_deployment(file_path: str) -> (bool, str): vm = psutil.virtual_memory() file_size_mb = os.path.getsize(file_path) / (1024**2) total_ram_mb = vm.total / (1024**2) avail_ram_mb = vm.available / (1024**2) if file_size_mb > (total_ram_mb * RAM_LIMIT_PCT): return False, f"Model size ({file_size_mb:.1f}MB) exceeds 50% System RAM limit." if (file_size_mb + SYSTEM_RESERVE_MB) > avail_ram_mb: return False, f"Insufficient headroom. Need {SYSTEM_RESERVE_MB}MB buffer." return True, "Resource check passed." class ZeroEngine: def __init__(self): self.api = HfApi(token=HF_TOKEN) self.telemetry = TelemetryManager(self.api) self.llm: Optional[Llama] = None self.active_model_info = {"repo": "", "file": ""} self.kernel_lock = threading.Lock() self.is_prefilling = False def list_ggufs(self, repo_id: str) -> List[str]: try: files = self.api.list_repo_files(repo_id=repo_id) return [f for f in files if f.endswith(".gguf")] except Exception as e: return [] def boot_kernel(self, repo: str, filename: str) -> str: try: if Llama is None: return "🔴 KERNEL ERROR: llama-cpp-python not installed correctly." path = hf_hub_download(repo_id=repo, filename=filename, token=HF_TOKEN) valid, msg = ResourceMonitor.validate_deployment(path) if not valid: return msg with self.kernel_lock: if self.llm: del self.llm self.llm = Llama( model_path=path, n_ctx=2048, n_threads=2, use_mmap=True, n_batch=512, verbose=False ) self.active_model_info = {"repo": repo, "file": filename} self.telemetry.track_load(repo, filename) return f"🟢 KERNEL ONLINE: {filename} loaded successfully." except Exception as e: return f"🔴 BOOT FAILURE: {str(e)}" def stitch_cache(self, ghost_text: str) -> str: if not self.llm or not ghost_text: return "Kernel Idle" if self.is_prefilling: return "Kernel Busy" def _bg_eval(): self.is_prefilling = True try: tokens = self.llm.tokenize(ghost_text.encode("utf-8")) self.llm.eval(tokens) except Exception: pass finally: self.is_prefilling = False threading.Thread(target=_bg_eval, daemon=True).start() return "⚡ Ghost Cache Primed" def inference_generator(self, prompt: str, history: List, ghost_context: str) -> Generator: if not self.llm: yield history + [{"role": "assistant", "content": "Engine offline. Please load a model in the Sidebar."}] return full_input = f"{ghost_context}\n{prompt}" if ghost_context else prompt formatted_prompt = f"User: {full_input}\nAssistant: " response_text = "" start_time = time.time() tokens_count = 0 try: stream = self.llm( formatted_prompt, max_tokens=1024, stop=["User:", "\n\n"], stream=True ) for chunk in stream: token = chunk["choices"][0]["text"] response_text += token tokens_count += 1 elapsed = time.time() - start_time tps = round(tokens_count / elapsed, 1) if elapsed > 0 else 0 yield history + [ {"role": "user", "content": prompt}, {"role": "assistant", "content": f"{response_text}\n\n`[{tps} t/s]`"} ] self.telemetry.track_generation(tokens_count) except Exception as e: yield history + [{"role": "assistant", "content": f"Inference Error: {str(e)}"}] kernel = ZeroEngine() with gr.Blocks(title="ZeroEngine Kernel") as demo: gr.HTML("""
STATUS: HIGH-PERFORMANCE KERNEL / VCPU-PARTITIONED