"""Unified loading and text generation for several model back-ends.

The module exposes one loader class per back-end (Hugging Face Transformers,
GGUF, OpenVINO), all implementing ``BaseModelLoader``, plus
``UnifiedModelManager`` which creates, caches and falls back between them.
A ready-to-use manager is available as the module-level ``model_manager``.
"""

import logging
import os
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Tuple, Union

import torch

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _default_device() -> str:
    """Return ``"cuda"`` when torch reports a CUDA device, else ``"cpu"``."""
    return "cuda" if torch.cuda.is_available() else "cpu"


def _hf_cache_dir() -> str:
    """Return the Hugging Face cache directory (``HF_HOME`` or a /tmp default)."""
    return os.environ.get('HF_HOME', '/tmp/huggingface')


class BaseModelLoader(ABC):
    """Abstract base class for model loaders"""

    @abstractmethod
    def load(self) -> Any:
        """Load and return the model"""

    @abstractmethod
    def generate(self, prompt: str, **kwargs) -> str:
        """Generate text from prompt"""

    @abstractmethod
    def get_model_info(self) -> Dict[str, Any]:
        """Get model information"""


class TransformersModelLoader(BaseModelLoader):
    """Loader for Hugging Face Transformers models"""

    def __init__(self, model_name: str, model_type: str, device: Optional[str] = None):
        """Store the configuration; nothing is loaded until ``load()`` is called.

        Args:
            model_name: Model name or path handed to ``from_pretrained``.
            model_type: Pipeline task name (e.g. ``"text-generation"``).
            device: Target device; defaults to ``"cuda"`` when available,
                otherwise ``"cpu"``.
        """
        self.model_name = model_name
        self.model_type = model_type
        self.device = device or _default_device()
        self._model = None
        self._tokenizer = None
        self._pipeline = None

    def load(self):
        """Build the tokenizer, model and pipeline on first call and cache them.

        Returns:
            The cached ``transformers`` pipeline object.

        Raises:
            RuntimeError: If any step of loading fails; the original exception
                is attached as ``__cause__``.
        """
        if self._pipeline is not None:
            return self._pipeline

        try:
            from transformers import (
                AutoModelForCausalLM,
                AutoModelForSeq2SeqLM,
                AutoTokenizer,
                pipeline,
            )

            logger.info(f"Loading Transformers model: {self.model_name} ({self.model_type})")
            torch.cuda.empty_cache()

            use_cuda = self.device == "cuda"
            cache_dir = _hf_cache_dir()

            # NOTE(review): trust_remote_code=True executes code shipped with the
            # model repository; only use with model names you trust.
            self._tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                trust_remote_code=True,
                cache_dir=cache_dir,
            )
            if self._tokenizer.pad_token is None:
                self._tokenizer.pad_token = self._tokenizer.eos_token

            # "text-generation" uses a causal LM; every other task type is
            # loaded as a seq2seq LM.
            # NOTE(review): task types that are not seq2seq (e.g. "ner") will
            # presumably fail here and end up on the manager's fallback path.
            if self.model_type == "text-generation":
                model_cls = AutoModelForCausalLM
            else:
                model_cls = AutoModelForSeq2SeqLM

            self._model = model_cls.from_pretrained(
                self.model_name,
                trust_remote_code=True,
                device_map="auto" if use_cuda else None,
                low_cpu_mem_usage=True,
                torch_dtype=torch.float16 if use_cuda else torch.float32,
                cache_dir=cache_dir,
            )

            # A model placed with device_map="auto" must not also be given an
            # explicit pipeline device, so only pass `device` on the CPU path.
            pipeline_kwargs: Dict[str, Any] = {}
            if not use_cuda:
                pipeline_kwargs["device"] = -1

            self._pipeline = pipeline(
                task=self.model_type,
                model=self._model,
                tokenizer=self._tokenizer,
                **pipeline_kwargs,
            )
            logger.info(f"Transformers model loaded successfully: {self.model_name}")
        except Exception as e:
            logger.error(f"Failed to load Transformers model: {e}")
            raise RuntimeError(f"Transformers model loading failed: {str(e)}") from e

        return self._pipeline

    def generate(self, prompt: str, **kwargs) -> str:
        """Run the pipeline on ``prompt`` and return the produced text.

        Args:
            prompt: Input text.
            **kwargs: For ``"text-generation"``: ``max_new_tokens`` (4000),
                ``do_sample`` (False), ``temperature`` (0.7, only forwarded
                when sampling). For other task types: ``max_length`` (512),
                ``min_length`` (50), ``do_sample`` (False).

        Returns:
            The generated continuation (without the prompt) for
            ``"text-generation"``, otherwise the ``summary_text`` of the first
            result, or a string rendering of the raw result as a last resort.

        Raises:
            RuntimeError: If loading or generation fails.
        """
        pipe = self.load()
        try:
            if self.model_type == "text-generation":
                do_sample = kwargs.get('do_sample', False)
                gen_kwargs: Dict[str, Any] = {
                    "max_new_tokens": kwargs.get('max_new_tokens', 4000),
                    "do_sample": do_sample,
                    "pad_token_id": self._tokenizer.eos_token_id,
                    # Ask the pipeline for the continuation only instead of
                    # str.replace()-ing the prompt out afterwards, which would
                    # also delete later occurrences of the prompt text.
                    "return_full_text": False,
                }
                if do_sample:
                    # temperature only has an effect in sampling mode.
                    gen_kwargs["temperature"] = kwargs.get('temperature', 0.7)

                outputs = pipe(prompt, **gen_kwargs)
                if isinstance(outputs, list) and outputs:
                    return outputs[0].get('generated_text', '').strip()
                return str(outputs)

            outputs = pipe(
                prompt,
                max_length=kwargs.get('max_length', 512),
                min_length=kwargs.get('min_length', 50),
                do_sample=kwargs.get('do_sample', False),
            )
            if isinstance(outputs, list) and outputs:
                first = outputs[0]
                return first.get('summary_text', str(first))
            return str(outputs)
        except Exception as e:
            logger.error(f"Generation failed: {e}")
            raise RuntimeError(f"Text generation failed: {str(e)}") from e

    def get_model_info(self) -> Dict[str, Any]:
        """Return a dict describing this loader and whether it is loaded."""
        return {
            "type": "transformers",
            "model_name": self.model_name,
            "model_type": self.model_type,
            "device": self.device,
            "loaded": self._pipeline is not None,
        }

    @property
    def tokenizer(self):
        """The tokenizer; triggers ``load()`` if it has not been created yet."""
        if self._tokenizer is None:
            self.load()
        return self._tokenizer

    @property
    def model(self):
        """The model; triggers ``load()`` if it has not been created yet."""
        if self._model is None:
            self.load()
        return self._model


class GGUFModelLoader(BaseModelLoader):
    """Loader for GGUF models using llama.cpp"""

    def __init__(self, model_name: str, filename: Optional[str] = None, device: Optional[str] = None):
        """Store the configuration; nothing is loaded until ``load()`` is called.

        Args:
            model_name: Model name passed to ``GGUFModelPipeline``.
            filename: Optional file name passed as second pipeline argument.
            device: Device label reported by ``get_model_info``; defaults to
                ``"cuda"`` when available, otherwise ``"cpu"``.
        """
        self.model_name = model_name
        self.filename = filename
        self.device = device or _default_device()
        self._pipeline = None

    def load(self):
        """Create the GGUF pipeline on first call and cache it.

        If creating the real pipeline fails, the error is logged and the
        pipeline returned by ``create_fallback_pipeline()`` is used instead.

        Returns:
            The cached pipeline object (real or fallback).
        """
        if self._pipeline is not None:
            return self._pipeline

        try:
            from .model_loader_gguf import GGUFModelPipeline

            logger.info(f"Loading GGUF model: {self.model_name}")
            if self.filename:
                self._pipeline = GGUFModelPipeline(self.model_name, self.filename)
            else:
                self._pipeline = GGUFModelPipeline(self.model_name)
            logger.info(f"GGUF model loaded successfully: {self.model_name}")
        except Exception as e:
            logger.error(f"Failed to load GGUF model: {e}")
            # Fallback to text-based response
            from .model_loader_gguf import create_fallback_pipeline

            self._pipeline = create_fallback_pipeline()
            logger.warning(f"Using fallback pipeline for {self.model_name}")

        return self._pipeline

    def generate(self, prompt: str, **kwargs) -> str:
        """Generate text with the GGUF pipeline.

        Uses ``generate_full_summary`` when the pipeline offers it, otherwise
        the plain ``generate`` method.

        Args:
            prompt: Input text.
            **kwargs: ``max_tokens`` (4000), ``temperature`` (0.7),
                ``top_p`` (0.95) and ``max_loops`` (1). ``temperature`` and
                ``top_p`` are only forwarded to the plain ``generate`` path.

        Returns:
            Whatever the pipeline method returns.

        Raises:
            RuntimeError: If generation fails.
        """
        pipe = self.load()
        try:
            max_tokens = kwargs.get('max_tokens', 4000)
            if hasattr(pipe, 'generate_full_summary'):
                return pipe.generate_full_summary(
                    prompt,
                    max_tokens=max_tokens,
                    max_loops=kwargs.get('max_loops', 1),
                )
            return pipe.generate(
                prompt,
                max_tokens=max_tokens,
                temperature=kwargs.get('temperature', 0.7),
                top_p=kwargs.get('top_p', 0.95),
            )
        except Exception as e:
            logger.error(f"GGUF generation failed: {e}")
            raise RuntimeError(f"GGUF generation failed: {str(e)}") from e

    def get_model_info(self) -> Dict[str, Any]:
        """Return a dict describing this loader and whether it is loaded."""
        return {
            "type": "gguf",
            "model_name": self.model_name,
            "filename": self.filename,
            "device": self.device,
            "loaded": self._pipeline is not None,
        }


class OpenVINOModelLoader(BaseModelLoader):
    """Loader for OpenVINO models"""

    # Pad token id used when the tokenizer defines no EOS token id.
    _DEFAULT_PAD_TOKEN_ID = 32000

    def __init__(self, model_name: str, device: Optional[str] = None):
        """Store the configuration; nothing is loaded until ``load()`` is called.

        Args:
            model_name: Model name passed to ``get_openvino_pipeline``.
            device: Device label reported by ``get_model_info``; defaults to
                ``"cuda"`` when available, otherwise ``"cpu"``.
        """
        self.model_name = model_name
        self.device = device or _default_device()
        self._pipeline = None

    def load(self):
        """Create the OpenVINO pipeline on first call and cache it.

        Returns:
            The cached pipeline object.

        Raises:
            RuntimeError: If the pipeline cannot be created.
        """
        if self._pipeline is not None:
            return self._pipeline

        try:
            from .model_loader_spaces import get_openvino_pipeline

            logger.info(f"Loading OpenVINO model: {self.model_name}")
            self._pipeline = get_openvino_pipeline(self.model_name)
            logger.info(f"OpenVINO model loaded successfully: {self.model_name}")
        except Exception as e:
            logger.error(f"Failed to load OpenVINO model: {e}")
            raise RuntimeError(f"OpenVINO model loading failed: {str(e)}") from e

        return self._pipeline

    def generate(self, prompt: str, **kwargs) -> str:
        """Tokenize ``prompt``, run greedy generation and decode the first output.

        Args:
            prompt: Input text.
            **kwargs: ``max_new_tokens`` (500).

        Returns:
            The decoded first output sequence with special tokens skipped.

        Raises:
            RuntimeError: If loading or generation fails.
        """
        pipe = self.load()
        try:
            tokenizer = pipe.tokenizer
            # Explicit None check: an EOS id of 0 is valid and must not be
            # replaced by the default.
            pad_token_id = tokenizer.eos_token_id
            if pad_token_id is None:
                pad_token_id = self._DEFAULT_PAD_TOKEN_ID

            # OpenVINO models typically use the same interface as transformers
            inputs = tokenizer([prompt], return_tensors="pt")
            outputs = pipe.model.generate(
                **inputs,
                max_new_tokens=kwargs.get('max_new_tokens', 500),
                do_sample=False,
                pad_token_id=pad_token_id,
            )
            return tokenizer.decode(outputs[0], skip_special_tokens=True)
        except Exception as e:
            logger.error(f"OpenVINO generation failed: {e}")
            raise RuntimeError(f"OpenVINO generation failed: {str(e)}") from e

    def get_model_info(self) -> Dict[str, Any]:
        """Return a dict describing this loader and whether it is loaded."""
        return {
            "type": "openvino",
            "model_name": self.model_name,
            "device": self.device,
            "loaded": self._pipeline is not None,
        }


class _FallbackLoader(BaseModelLoader):
    """Last-resort loader wrapping the pipeline from ``create_fallback_pipeline``."""

    def __init__(self, model_name: str, model_type: str):
        """Create the fallback pipeline immediately.

        Args:
            model_name: Name of the model that could not be loaded.
            model_type: Type of the model that could not be loaded.
        """
        # Imported lazily so the module can be imported without the GGUF helper.
        from .model_loader_gguf import create_fallback_pipeline

        self.model_name = model_name
        self.model_type = model_type
        self._pipeline = create_fallback_pipeline()

    def load(self):
        """Return the fallback pipeline created in ``__init__``."""
        return self._pipeline

    def generate(self, prompt: str, **kwargs) -> str:
        """Delegate to the fallback pipeline's ``generate``."""
        return self._pipeline.generate(prompt, **kwargs)

    def get_model_info(self) -> Dict[str, Any]:
        """Return a dict describing this loader; it always reports as loaded."""
        return {
            "type": "fallback",
            "model_name": self.model_name,
            "model_type": self.model_type,
            "loaded": True,
        }


class UnifiedModelManager:
    """Unified model manager that can handle any model type"""

    def __init__(self):
        """Create an empty loader cache and the per-type fallback model table."""
        self._model_cache: Dict[str, BaseModelLoader] = {}
        self._fallback_models = {
            "text-generation": "facebook/bart-base",
            "summarization": "Falconsai/medical_summarization",
            "ner": "dslim/bert-base-NER",
            "gguf": "microsoft/Phi-3-mini-4k-instruct-gguf",
        }

    @staticmethod
    def _create_loader(
        model_name: str,
        model_type: str,
        filename: Optional[str] = None,
    ) -> BaseModelLoader:
        """Instantiate (without loading) the loader class matching ``model_type``."""
        if model_type == "gguf":
            return GGUFModelLoader(model_name, filename)
        if model_type == "openvino":
            return OpenVINOModelLoader(model_name)
        # Default to transformers for text-generation, summarization, ner, etc.
        return TransformersModelLoader(model_name, model_type)

    def get_model_loader(
        self,
        model_name: str,
        model_type: str,
        filename: Optional[str] = None,
        force_reload: bool = False
    ) -> BaseModelLoader:
        """
        Get a model loader for the specified model and type

        The requested model is tried first. If it fails to load, the fallback
        model registered for ``model_type`` is tried, and finally a basic
        fallback loader is created. Whichever loader results is cached under
        the key of the original request.

        Args:
            model_name: Name or path of the model
            model_type: Type of model (text-generation, summarization, ner, gguf, openvino)
            filename: Optional filename for GGUF models
            force_reload: Force reload the model even if cached

        Returns:
            BaseModelLoader instance
        """
        cache_key = f"{model_name}:{model_type}:{filename or ''}"

        if not force_reload and cache_key in self._model_cache:
            return self._model_cache[cache_key]

        try:
            loader = self._create_loader(model_name, model_type, filename)
            # Test load the model
            loader.load()
            self._model_cache[cache_key] = loader
            logger.info(f"Model loader created successfully: {model_name} ({model_type})")
            return loader
        except Exception as e:
            logger.error(f"Failed to create model loader for {model_name} ({model_type}): {e}")

        # Try fallback model
        fallback_name = self._fallback_models.get(model_type)
        if fallback_name and fallback_name != model_name:
            logger.warning(f"Trying fallback model: {fallback_name}")
            try:
                loader = self._create_loader(fallback_name, model_type)
                loader.load()
                self._model_cache[cache_key] = loader
                logger.info(f"Fallback model loaded successfully: {fallback_name}")
                return loader
            except Exception as fallback_error:
                logger.error(f"Fallback model also failed: {fallback_error}")

        # Create a basic fallback
        fallback_loader = _FallbackLoader(model_name, model_type)
        self._model_cache[cache_key] = fallback_loader
        return fallback_loader

    def generate_text(
        self,
        model_name: str,
        model_type: str,
        prompt: str,
        filename: Optional[str] = None,
        **kwargs
    ) -> str:
        """
        Generate text using the specified model

        Args:
            model_name: Name or path of the model
            model_type: Type of model
            prompt: Input prompt
            filename: Optional filename for GGUF models
            **kwargs: Additional generation parameters

        Returns:
            Generated text
        """
        loader = self.get_model_loader(model_name, model_type, filename)
        return loader.generate(prompt, **kwargs)

    def get_model_info(self, model_name: str, model_type: str, filename: Optional[str] = None) -> Dict[str, Any]:
        """Get information about a specific model"""
        loader = self.get_model_loader(model_name, model_type, filename)
        return loader.get_model_info()

    def clear_cache(self):
        """Clear the model cache"""
        self._model_cache.clear()
        torch.cuda.empty_cache()
        logger.info("Model cache cleared")

    def list_loaded_models(self) -> Dict[str, Dict[str, Any]]:
        """List all loaded models and their information"""
        return {
            cache_key: loader.get_model_info()
            for cache_key, loader in self._model_cache.items()
        }


# Global instance
model_manager = UnifiedModelManager()