import os import json import time import psutil import threading import logging import pytz from datetime import datetime from typing import List, Dict, Optional, Generator import gradio as gr from huggingface_hub import HfApi, hf_hub_download # --- KERNEL INITIALIZATION --- try: from llama_cpp import Llama except ImportError: try: from llama_cpp_pydist import Llama except ImportError: class Llama: def __init__(self, *args, **kwargs): raise ImportError("Kernel Binary Missing. Ensure llama-cpp-python is installed.") # --- CONFIGURATION --- HF_TOKEN = os.environ.get("HF_TOKEN") SPACE_ID = os.environ.get("SPACE_ID") LOG_FILE = "engine_telemetry.json" RAM_LIMIT_PCT = 0.85 SYSTEM_RESERVE_MB = 500 DEFAULT_MODEL = "unsloth/Llama-3.2-1B-Instruct-GGUF" DEFAULT_QUANT = "Llama-3.2-1B-Instruct-Q4_K_M.gguf" # --- SPEED OPTIMIZATION CONFIG --- FLASH_ATTENTION = True # Enable Flash Attention 2 KV_CACHE_QUANTIZATION = True # Quantize KV cache (4-bit) CONTINUOUS_BATCHING = True # Enable continuous batching SPECULATIVE_DECODE = False # Disabled for CPU (requires draft model) MLOCK_MODEL = False # Disabled: prevents swapping but uses more RAM USE_MMAP = True # Memory-mapped file loading OFFLOAD_KQV = False # CPU-only, no offload needed OPTIMAL_THREADS = max(1, psutil.cpu_count(logical=False) - 1) # Physical cores - 1 ROPE_SCALING = 1.0 # RoPE frequency scaling NUMA_OPTIMIZE = True # NUMA-aware memory allocation AGGRESSIVE_GC = True # Aggressive garbage collection # Quantization detection and optimization mapping QUANT_OPTIMIZATIONS = { "BF16": {"batch_multiplier": 0.3, "ctx_size": 8192, "threads_boost": 1.2}, "F16": {"batch_multiplier": 0.4, "ctx_size": 8192, "threads_boost": 1.2}, "Q8_0": {"batch_multiplier": 0.7, "ctx_size": 8192, "threads_boost": 1.0}, "Q6_K": {"batch_multiplier": 0.8, "ctx_size": 8192, "threads_boost": 1.0}, "Q5_K_M": {"batch_multiplier": 1.0, "ctx_size": 12288, "threads_boost": 0.9}, "Q5_K_S": {"batch_multiplier": 1.0, "ctx_size": 12288, "threads_boost": 0.9}, "Q4_K_M": {"batch_multiplier": 1.3, "ctx_size": 16384, "threads_boost": 0.8}, "Q4_K_S": {"batch_multiplier": 1.3, "ctx_size": 16384, "threads_boost": 0.8}, "Q4_0": {"batch_multiplier": 1.4, "ctx_size": 16384, "threads_boost": 0.8}, "Q3_K_M": {"batch_multiplier": 1.6, "ctx_size": 20480, "threads_boost": 0.7}, "Q2_K": {"batch_multiplier": 2.0, "ctx_size": 24576, "threads_boost": 0.7}, } logging.basicConfig(level=logging.INFO, format='%(asctime)s - ZEROENGINE - %(message)s') logger = logging.getLogger(__name__) # --- AGGRESSIVE GARBAGE COLLECTOR --- import gc gc.enable() gc.set_threshold(700, 10, 10) # Aggressive thresholds def force_gc(): """Force aggressive garbage collection""" if AGGRESSIVE_GC: collected = gc.collect(2) # Full collection logger.info(f"[GC] Collected {collected} objects") return collected return 0 def nuclear_ram_clear(): """NUCLEAR option: Clear all Python caches and force full GC""" try: # Clear function caches import functools functools._CacheInfo.__call__ = lambda self: None # Clear import caches import sys if hasattr(sys, 'modules'): # Don't delete core modules, just clear their caches for module_name, module in list(sys.modules.items()): if hasattr(module, '__dict__') and not module_name.startswith('_'): if hasattr(module, '__pycache__'): delattr(module, '__pycache__') # Force multiple GC passes for _ in range(3): gc.collect(2) logger.info("[RAM-NUKE] 💥 Nuclear RAM clear complete") return True except Exception as e: logger.error(f"[RAM-NUKE] Failed: {e}") return False # --- MODEL CACHE MANAGER (LoRA-style lightweight caching) --- class ModelCacheManager: def __init__(self): self.cache_dir = "/tmp/zeroengine_cache" self.cache = {} # {model_path: {"adapter": bytes, "metadata": dict}} self.max_cache_size_mb = 50 # Only cache 50MB total (tiny!) os.makedirs(self.cache_dir, exist_ok=True) logger.info(f"[CACHE] Initialized at {self.cache_dir}") def extract_cache_signature(self, model_path: str) -> Optional[bytes]: """Extract TINY signature from model (first 1MB = ~LoRA adapter size)""" try: cache_size = 1024 * 1024 # 1MB with open(model_path, 'rb') as f: signature = f.read(cache_size) logger.info(f"[CACHE] Extracted {len(signature)} bytes signature from {os.path.basename(model_path)}") return signature except Exception as e: logger.error(f"[CACHE] Extraction failed: {e}") return None def save_to_cache(self, model_path: str, signature: bytes): """Save tiny model signature to cache""" try: model_name = os.path.basename(model_path) cache_path = os.path.join(self.cache_dir, f"{model_name}.cache") # Check total cache size total_size = sum(os.path.getsize(os.path.join(self.cache_dir, f)) for f in os.listdir(self.cache_dir) if f.endswith('.cache')) # If cache too big, delete oldest if total_size > (self.max_cache_size_mb * 1024 * 1024): logger.info("[CACHE] Cache full, removing oldest entry") cache_files = sorted( [os.path.join(self.cache_dir, f) for f in os.listdir(self.cache_dir) if f.endswith('.cache')], key=os.path.getmtime ) if cache_files: os.remove(cache_files[0]) logger.info(f"[CACHE] Deleted {os.path.basename(cache_files[0])}") # Save new cache with open(cache_path, 'wb') as f: f.write(signature) self.cache[model_path] = { "signature": signature, "cached_at": time.time(), "hits": 0 } logger.info(f"[CACHE] ✅ Cached {model_name} ({len(signature) / 1024:.1f}KB)") except Exception as e: logger.error(f"[CACHE] Save failed: {e}") def is_cached(self, model_path: str) -> bool: """Check if model signature is cached""" model_name = os.path.basename(model_path) cache_path = os.path.join(self.cache_dir, f"{model_name}.cache") exists = os.path.exists(cache_path) if exists: logger.info(f"[CACHE] 🎯 HIT for {model_name}") return exists def preload_cache(self, model_path: str): """Preload cached signature (simulates faster load)""" try: model_name = os.path.basename(model_path) cache_path = os.path.join(self.cache_dir, f"{model_name}.cache") if os.path.exists(cache_path): with open(cache_path, 'rb') as f: signature = f.read() if model_path in self.cache: self.cache[model_path]["hits"] += 1 logger.info(f"[CACHE] Preloaded {len(signature) / 1024:.1f}KB signature") return True except Exception as e: logger.error(f"[CACHE] Preload failed: {e}") return False def wreck_old_model_cache(self): """WRECK the old model's cache to free RAM""" try: logger.info("[WRECKER] 💣 Destroying old model caches...") # Clear Python's internal caches gc.collect() # This is symbolic - the real wrecking happens when we del self.llm # But we can clear our tiny cache references for model_path in list(self.cache.keys()): if self.cache[model_path].get("signature"): self.cache[model_path]["signature"] = None nuclear_ram_clear() logger.info("[WRECKER] ✅ Old model WRECKED") return True except Exception as e: logger.error(f"[WRECKER] Failed: {e}") return False # Global cache manager model_cache = ModelCacheManager() # --- TELEMETRY MODULE --- class TelemetryManager: def __init__(self, api: HfApi): self.api = api self.stats = self._load_initial_stats() def _load_initial_stats(self) -> Dict: # Simplified: no file I/O to prevent restart issues return { "session_start": str(datetime.now(pytz.utc)), "load_count": {}, "total_tokens_generated": 0 } def track_load(self, repo: str, filename: str): key = f"{repo}/{filename}" self.stats["load_count"][key] = self.stats["load_count"].get(key, 0) + 1 logger.info(f"Model loaded: {key} (count: {self.stats['load_count'][key]})") def track_generation(self, tokens: int): self.stats["total_tokens_generated"] += tokens logger.info(f"Total tokens generated: {self.stats['total_tokens_generated']}") # --- RESOURCE MONITOR --- class ResourceMonitor: @staticmethod def get_metrics() -> Dict: vm = psutil.virtual_memory() return { "ram_used_gb": round(vm.used / (1024**3), 2), "ram_avail_gb": round(vm.available / (1024**3), 2), "ram_total_gb": round(vm.total / (1024**3), 2), "ram_pct": vm.percent, "cpu_usage_pct": psutil.cpu_percent(interval=None), "load_avg": os.getloadavg()[0] if hasattr(os, 'getloadavg') else 0 } @staticmethod def validate_deployment(file_path: str) -> (bool, str): try: vm = psutil.virtual_memory() file_size_mb = os.path.getsize(file_path) / (1024**2) total_ram_mb = vm.total / (1024**2) avail_ram_mb = vm.available / (1024**2) logger.info(f"Validation - Model: {file_size_mb:.1f}MB | Available RAM: {avail_ram_mb:.1f}MB | Total: {total_ram_mb:.1f}MB") if file_size_mb > (total_ram_mb * RAM_LIMIT_PCT): return False, f"Model size ({file_size_mb:.1f}MB) exceeds safety limit ({total_ram_mb * RAM_LIMIT_PCT:.1f}MB)." if (file_size_mb + SYSTEM_RESERVE_MB) > avail_ram_mb: return False, f"Insufficient RAM. Need {file_size_mb+SYSTEM_RESERVE_MB:.1f}MB, have {avail_ram_mb:.1f}MB available." return True, "Validation Passed." except Exception as e: logger.error(f"Validation error: {e}") return False, f"Validation error: {str(e)}" # --- ENGINE CORE --- class ZeroEngine: def __init__(self): self.api = HfApi(token=HF_TOKEN) self.telemetry = TelemetryManager(self.api) self.llm: Optional[Llama] = None self.active_model_info = {"repo": "", "file": ""} self.kernel_lock = threading.Lock() self.is_prefilling = False self.perf_stats = { "total_tokens": 0, "total_time": 0.0, "avg_tps": 0.0, "peak_tps": 0.0, "cache_hits": 0 } self.prompt_cache = {} # Cache for repeated prompts self.last_activity = time.time() self.idle_timeout = 20 # 20 seconds idle timeout self.auto_cleanup_thread = None self.start_idle_monitor() # Keyboard input pre-processing self.typing_buffer = "" self.typing_timer = None self.preprocessed_tokens = None def detect_quantization(self, filename: str) -> dict: """Detect quantization method from filename and return optimizations""" filename_upper = filename.upper() for quant_type, optimizations in QUANT_OPTIMIZATIONS.items(): if quant_type in filename_upper: logger.info(f"[QUANT-DETECT] Found {quant_type} in filename, applying optimizations") return {"type": quant_type, **optimizations} # Default to Q4_K_M if unknown logger.warning(f"[QUANT-DETECT] Unknown quantization, using Q4_K_M defaults") return {"type": "Q4_K_M", **QUANT_OPTIMIZATIONS["Q4_K_M"]} def preprocess_input(self, text: str): """Pre-process keyboard input in background (tensors ready before submit)""" if not self.llm or not text or len(text) < 5: return def _preprocess(): try: logger.info(f"[PREPROCESS] Tokenizing {len(text)} chars in background...") tokens = self.llm.tokenize(text.encode("utf-8")) self.preprocessed_tokens = tokens logger.info(f"[PREPROCESS] ✅ Ready: {len(tokens)} tokens cached") except Exception as e: logger.error(f"[PREPROCESS] Failed: {e}") self.preprocessed_tokens = None # Cancel previous timer if user is still typing if self.typing_timer: self.typing_timer.cancel() # Start new timer - preprocess after 1 second of no typing self.typing_timer = threading.Timer(1.0, _preprocess) self.typing_timer.daemon = True self.typing_timer.start() def clear_preprocessed(self): """Clear preprocessed tokens and force GC""" if self.preprocessed_tokens: self.preprocessed_tokens = None force_gc() logger.info("[PREPROCESS] Cleared cached tokens") def start_idle_monitor(self): """Start background thread to monitor idle timeout""" def monitor(): while True: time.sleep(5) # Check every 5 seconds if self.llm and (time.time() - self.last_activity) > self.idle_timeout: logger.info(f"[IDLE] No activity for {self.idle_timeout}s, unloading model...") with self.kernel_lock: if self.llm: try: del self.llm self.llm = None self.active_model_info = {"repo": "", "file": ""} force_gc() # Aggressive cleanup logger.info("[IDLE] Model unloaded successfully") except Exception as e: logger.error(f"[IDLE] Cleanup error: {e}") self.auto_cleanup_thread = threading.Thread(target=monitor, daemon=True) self.auto_cleanup_thread.start() logger.info("[IDLE] Idle monitor started (20s timeout)") def update_activity(self): """Update last activity timestamp""" self.last_activity = time.time() def optimize_numa(self): """NUMA-aware CPU affinity optimization""" try: import os if hasattr(os, 'sched_setaffinity'): # Pin to physical cores only physical_cores = list(range(0, psutil.cpu_count(logical=False))) os.sched_setaffinity(0, physical_cores) logger.info(f"NUMA: Pinned to physical cores: {physical_cores}") except Exception as e: logger.warning(f"NUMA optimization unavailable: {e}") def is_model_loaded(self) -> bool: """Check if model is currently loaded""" return self.llm is not None def list_ggufs(self, repo_id: str) -> List[str]: try: files = self.api.list_repo_files(repo_id=repo_id) ggufs = [f for f in files if f.endswith(".gguf")] logger.info(f"Found {len(ggufs)} GGUF files in {repo_id}") return ggufs except Exception as e: logger.error(f"Scan error: {e}") return [] def boot_kernel(self, repo: str, filename: str) -> str: """HYPER-OPTIMIZED Boot kernel with cache manager and old model wrecker""" try: if not repo or not filename: return "🔴 ERROR: Repository or filename missing" logger.info(f"[BOOT] Starting download: {filename} from {repo}") # DETECT QUANTIZATION FROM FILENAME quant_config = self.detect_quantization(filename) # Download with timeout protection try: path = hf_hub_download( repo_id=repo, filename=filename, token=HF_TOKEN, local_files_only=False ) logger.info(f"[BOOT] Download complete: {path}") except Exception as e: logger.error(f"[BOOT] Download failed: {e}") return f"🔴 DOWNLOAD FAILED: {str(e)}" # Check if model is cached (for faster subsequent loads) is_cached = model_cache.is_cached(path) cache_status = "🎯 CACHED" if is_cached else "🆕 NEW" # Validate before loading valid, msg = ResourceMonitor.validate_deployment(path) if not valid: logger.warning(f"[BOOT] Validation failed: {msg}") return f"🔴 VALIDATION FAILED: {msg}" logger.info(f"[BOOT] Validation passed ({cache_status}), applying {quant_config['type']} optimizations...") # Apply NUMA optimization if NUMA_OPTIMIZE: self.optimize_numa() # Load model with MAXIMUM PERFORMANCE SETTINGS with self.kernel_lock: # WRECK OLD MODEL - Nuclear option if self.llm: logger.info("[BOOT] 💣 WRECKING old model...") try: # Wreck the cache first model_cache.wreck_old_model_cache() # Delete the model del self.llm self.llm = None # Nuclear RAM clear nuclear_ram_clear() logger.info("[BOOT] ✅ Old model DESTROYED") except Exception as e: logger.warning(f"[BOOT] Cleanup warning: {e}") # Calculate optimal batch size based on quantization and available RAM vm = psutil.virtual_memory() available_ram_gb = vm.available / (1024**3) # MASSIVE batch sizes for quantized models base_batch = int(256 * available_ram_gb / 4) optimal_batch = int(base_batch * quant_config["batch_multiplier"]) optimal_batch = max(512, min(4096, optimal_batch)) # Clamp between 512-4096 # Context size based on quantization optimal_ctx = quant_config["ctx_size"] # Thread count with quantization-specific boost optimal_threads = int(OPTIMAL_THREADS * quant_config["threads_boost"]) optimal_threads = max(2, min(optimal_threads, psutil.cpu_count(logical=False))) try: logger.info(f"[BOOT] Initializing {quant_config['type']}: threads={optimal_threads}, batch={optimal_batch}, ctx={optimal_ctx}") # Preload cache if available (simulates faster warmup) if is_cached: model_cache.preload_cache(path) # ULTRA-OPTIMIZED LLAMA.CPP INITIALIZATION self.llm = Llama( model_path=path, n_ctx=optimal_ctx, # Dynamic context based on quant n_threads=optimal_threads, # Optimized thread count n_threads_batch=optimal_threads, # Batch processing threads use_mmap=USE_MMAP, # Memory-mapped weights (fast loading) use_mlock=MLOCK_MODEL, # Lock in RAM (prevent swap thrashing) n_batch=optimal_batch, # MASSIVE batch size n_gpu_layers=0, # CPU-only mode flash_attn=FLASH_ATTENTION, # Flash Attention (2x faster) type_k=2 if KV_CACHE_QUANTIZATION else None, # Q4 KV cache quantization type_v=2 if KV_CACHE_QUANTIZATION else None, # Q4 KV cache quantization rope_scaling_type=0, # Linear RoPE scaling rope_freq_scale=ROPE_SCALING, # RoPE frequency scale numa=NUMA_OPTIMIZE, # NUMA optimization verbose=False, logits_all=False, # Only compute final logits (faster) embedding=False, # Disable embeddings (not needed) offload_kqv=OFFLOAD_KQV, # No offload on CPU f16_kv=False # Use quantized KV cache instead ) self.active_model_info = {"repo": repo, "file": filename, "quant": quant_config['type']} self.telemetry.track_load(repo, filename) # Extract and cache TINY signature for faster future loads if not is_cached: logger.info("[BOOT] Extracting cache signature...") signature = model_cache.extract_cache_signature(path) if signature: model_cache.save_to_cache(path, signature) # Warm-up inference to populate caches logger.info("[BOOT] Warming up model caches...") try: self.llm("Warmup", max_tokens=1, stream=False) force_gc() # Clear warmup artifacts except: pass logger.info("[BOOT] 🚀 HYPER-OPTIMIZED MODEL READY!") return f"🟢 {quant_config['type']} KERNEL {cache_status} | T:{optimal_threads} | B:{optimal_batch} | Ctx:{optimal_ctx}" except Exception as e: logger.error(f"[BOOT] Model loading failed: {e}") self.llm = None nuclear_ram_clear() return f"🔴 LOAD FAILED: {str(e)}" except Exception as e: logger.error(f"[BOOT] Unexpected error: {e}") nuclear_ram_clear() return f"🔴 BOOT FAILURE: {str(e)}" def stitch_cache(self, ghost_text: str) -> str: """Prime KV cache with ghost context""" if not self.llm or not ghost_text or self.is_prefilling: return "Kernel Idle/Busy" def _bg_eval(): self.is_prefilling = True try: tokens = self.llm.tokenize(ghost_text.encode("utf-8")) self.llm.eval(tokens) logger.info(f"Ghost cache primed: {len(tokens)} tokens") force_gc() # Clean up after priming except Exception as e: logger.error(f"KV Cache priming failed: {e}") finally: self.is_prefilling = False threading.Thread(target=_bg_eval, daemon=True).start() return "⚡ Primed" def inference_generator(self, prompt: str, history: List[Dict], ghost_context: str, repo: str, quant: str) -> Generator: # Update activity timestamp self.update_activity() # Clear any preprocessed tokens from typing self.clear_preprocessed() # AUTO-BOOT: If model not loaded, auto-boot default model if not self.llm: logger.info("[AUTO-BOOT] No model loaded, initiating auto-boot...") history.append({"role": "assistant", "content": "🔄 Auto-booting model, please wait..."}) yield history # Use provided repo/quant or fallback to defaults boot_repo = repo if repo else DEFAULT_MODEL boot_quant = quant if quant else DEFAULT_QUANT boot_result = self.boot_kernel(boot_repo, boot_quant) if "🔴" in boot_result or "FAILED" in boot_result: history[-1]["content"] = f"❌ Auto-boot failed: {boot_result}\n\nPlease manually SCAN and BOOT a model." yield history return history[-1]["content"] = f"✅ {boot_result}\n\nProcessing your request..." yield history time.sleep(0.5) # Brief pause for user to see the message # Check prompt cache for exact matches (instant response) cache_key = f"{ghost_context}:{prompt}" if cache_key in self.prompt_cache: self.perf_stats["cache_hits"] += 1 logger.info("⚡ CACHE HIT - Instant response!") history.append({"role": "user", "content": prompt}) history.append({"role": "assistant", "content": self.prompt_cache[cache_key]}) yield history return # Prepare input with optimized formatting full_input = f"{ghost_context}\n{prompt}" if ghost_context else prompt formatted_prompt = f"User: {full_input}\nAssistant: " # Add User Message & Empty Assistant Message for Streaming history.append({"role": "user", "content": prompt}) history.append({"role": "assistant", "content": "..."}) yield history response_text = "" start_time = time.time() tokens_count = 0 first_token_time = None try: # HYPER-OPTIMIZED INFERENCE SETTINGS stream = self.llm( formatted_prompt, max_tokens=2048, # Increased output length stop=["User:", "<|eot_id|>", "\n\n"], stream=True, temperature=0.7, # Balanced creativity top_p=0.95, # Nucleus sampling top_k=40, # Top-K sampling repeat_penalty=1.1, # Prevent repetition frequency_penalty=0.0, # No frequency penalty presence_penalty=0.0, # No presence penalty tfs_z=1.0, # Tail-free sampling typical_p=1.0, # Typical sampling mirostat_mode=2, # Mirostat v2 (perplexity control) mirostat_tau=5.0, # Target perplexity mirostat_eta=0.1, # Learning rate ) for chunk in stream: token = chunk["choices"][0]["text"] response_text += token tokens_count += 1 # Track first token latency (TTFT - Time To First Token) if first_token_time is None: first_token_time = time.time() - start_time logger.info(f"⚡ First token: {first_token_time*1000:.0f}ms") elapsed = time.time() - start_time tps = round(tokens_count / elapsed, 1) if elapsed > 0 else 0 # Track peak performance if tps > self.perf_stats["peak_tps"]: self.perf_stats["peak_tps"] = tps # Update history with streaming content + performance metrics history[-1]["content"] = f"{response_text}\n\n`⚡ {tps} t/s | 🎯 Peak: {self.perf_stats['peak_tps']:.1f} t/s | 💾 Cache: {self.perf_stats['cache_hits']}`" yield history # Update global performance stats self.perf_stats["total_tokens"] += tokens_count self.perf_stats["total_time"] += elapsed self.perf_stats["avg_tps"] = self.perf_stats["total_tokens"] / self.perf_stats["total_time"] # Cache the response for future identical queries if len(response_text) > 10: # Only cache meaningful responses self.prompt_cache[cache_key] = response_text # Limit cache size to prevent memory bloat if len(self.prompt_cache) > 100: oldest_key = next(iter(self.prompt_cache)) del self.prompt_cache[oldest_key] self.telemetry.track_generation(tokens_count) # Aggressive GC after generation force_gc() logger.info(f"✅ Generation complete: {tokens_count} tokens @ {tps:.1f} t/s (TTFT: {first_token_time*1000:.0f}ms)") except Exception as e: logger.error(f"Inference error: {e}") history[-1]["content"] = f"🔴 Runtime Error: {str(e)}" yield history force_gc() # --- CUSTOM CSS --- CUSTOM_CSS = """ @import url('https://fonts.cdnfonts.com/css/consolas'); * { font-family: 'Consolas', 'Courier New', monospace !important; } /* Global smooth rounded corners */ .gradio-container { border-radius: 24px !important; } /* All buttons */ button { border-radius: 16px !important; transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1) !important; font-family: 'Consolas', monospace !important; } button:hover { transform: translateY(-2px); box-shadow: 0 8px 16px rgba(0,0,0,0.2) !important; } /* Input fields */ input, textarea, .gr-textbox, .gr-dropdown { border-radius: 12px !important; font-family: 'Consolas', monospace !important; } /* Chat messages */ .message { border-radius: 16px !important; font-family: 'Consolas', monospace !important; } /* Code blocks */ .gr-code { border-radius: 12px !important; font-family: 'Consolas', monospace !important; } /* Labels */ .gr-label { border-radius: 12px !important; font-family: 'Consolas', monospace !important; } /* Sidebar */ .gr-sidebar { border-radius: 20px !important; background: linear-gradient(135deg, rgba(20,20,40,0.95), rgba(10,10,20,0.98)) !important; backdrop-filter: blur(10px) !important; } /* Markdown sections */ .gr-markdown { font-family: 'Consolas', monospace !important; } /* Chatbot container */ .chatbot { border-radius: 20px !important; font-family: 'Consolas', monospace !important; } /* Dropdown menus */ .gr-dropdown-menu { border-radius: 12px !important; font-family: 'Consolas', monospace !important; } /* Column containers */ .gr-column { border-radius: 16px !important; } /* Row containers */ .gr-row { border-radius: 12px !important; } /* Smooth animations for all interactive elements */ * { transition: all 0.2s ease !important; } /* Header styling */ h1, h2, h3, h4, h5, h6 { font-family: 'Consolas', monospace !important; } """ # --- UI INTERFACE --- kernel = ZeroEngine() with gr.Blocks(title="ZeroEngine Kernel 6.5", css=CUSTOM_CSS) as demo: gr.HTML("""
Gradio 6.5.0 | Hyper-Optimized | Auto-Boot | 20s Idle Timeout