""" Vosk Speech-to-Text for HuggingFace Spaces. Simple one-shot transcription: receives audio blob, returns text. For live preview, frontend sends the growing recording periodically. Uses the small English model (~50MB, CPU-only). """ import os import json import time import logging import subprocess import tempfile import threading logger = logging.getLogger(__name__) VOSK_MODEL_PATH = "/app/vosk-model" _model = None _initialized = False _stt_lock = threading.Lock() def _ts(): return time.strftime("%H:%M:%S", time.gmtime()) + f".{int(time.time()*1000)%1000:03d}" def initialize(): """Load the Vosk model at startup.""" global _model, _initialized if _initialized: return True t0 = time.time() logger.info(f"[{_ts()}] [STT] Initializing Vosk...") if not os.path.exists(VOSK_MODEL_PATH): logger.error(f"[{_ts()}] [STT] Model not found at {VOSK_MODEL_PATH}") return False try: from vosk import Model, SetLogLevel SetLogLevel(-1) _model = Model(VOSK_MODEL_PATH) _initialized = True logger.info(f"[{_ts()}] [STT] ✓ Vosk ready in {time.time()-t0:.2f}s") return True except Exception as e: logger.error(f"[{_ts()}] [STT] Failed to load Vosk: {e}", exc_info=True) return False def transcribe_audio(audio_bytes: bytes, content_type: str = "audio/webm") -> str: """ Transcribe audio bytes to text. Thread-safe — uses a lock to avoid concurrent ffmpeg/vosk issues. Args: audio_bytes: Raw audio data from browser MediaRecorder content_type: MIME type of the audio Returns: Transcribed text string, or empty string on failure """ if not _initialized or _model is None: return "" if not audio_bytes or len(audio_bytes) < 500: return "" with _stt_lock: return _do_transcribe(audio_bytes, content_type) def _do_transcribe(audio_bytes: bytes, content_type: str) -> str: """Actual transcription work (must be called under lock).""" t0 = time.time() tmp_in_path = None tmp_pcm_path = None try: from vosk import KaldiRecognizer # Save incoming audio to temp file suffix = ".webm" if "webm" in content_type else ".ogg" with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp_in: tmp_in.write(audio_bytes) tmp_in_path = tmp_in.name # Convert to raw 16kHz mono PCM using ffmpeg tmp_pcm_path = tmp_in_path + ".pcm" cmd = [ "ffmpeg", "-y", "-i", tmp_in_path, "-ar", "16000", "-ac", "1", "-f", "s16le", tmp_pcm_path ] result = subprocess.run(cmd, capture_output=True, timeout=10) if result.returncode != 0: return "" # Read PCM and transcribe with open(tmp_pcm_path, "rb") as f: pcm_data = f.read() if len(pcm_data) < 1600: # less than 0.05s of audio return "" rec = KaldiRecognizer(_model, 16000) # Feed in chunks offset = 0 while offset < len(pcm_data): chunk = pcm_data[offset:offset + 8000] offset += 8000 rec.AcceptWaveform(chunk) # Get final result final = json.loads(rec.FinalResult()) text = final.get("text", "").strip() elapsed = time.time() - t0 if text: logger.info(f"[{_ts()}] [STT] Transcribed in {elapsed:.2f}s: \"{text[:80]}\"") return text except Exception as e: logger.error(f"[{_ts()}] [STT] Error: {e}") return "" finally: for p in [tmp_in_path, tmp_pcm_path]: if p: try: os.unlink(p) except Exception: pass