"""
TTS module - Groq Orpheus API for fast cloud-based speech synthesis.

Uses the diana voice from canopylabs/orpheus-v1-english.
No local GPU needed — calls Groq's API endpoint.
"""

import logging
import os
import re
import subprocess
import tempfile
import time
from typing import Optional

logger = logging.getLogger(__name__)

TEMP_DIR = "/tmp/tts_output"
SAMPLE_RATE = 24000

# Groq Orpheus config
MODEL = "canopylabs/orpheus-v1-english"
VOICE = os.environ.get("GROQ_TTS_VOICE", "diana")
RESPONSE_FORMAT = "wav"

# Singleton
_client = None
_initialized = False
_current_voice = VOICE


def set_voice(voice_name: str):
    """Change the TTS voice at runtime."""
    global _current_voice
    _current_voice = voice_name
    logger.info(f"[{_ts()}] [TTS] Voice set to: {_current_voice}")


def _ts():
    """Return the current UTC time as ``HH:MM:SS.mmm`` for log prefixes."""
    # Take a single clock reading so the seconds and the milliseconds always
    # describe the same instant.
    now = time.time()
    return time.strftime("%H:%M:%S", time.gmtime(now)) + f".{int(now * 1000) % 1000:03d}"


def ensure_temp_dir():
    """Create TEMP_DIR if it does not exist yet and return its path."""
    os.makedirs(TEMP_DIR, exist_ok=True)
    return TEMP_DIR


def initialize():
    """Initialize the Groq client.

    Does nothing when already initialized. When GROQ_API_KEY is not set, an
    error is logged and the module stays uninitialized.
    """
    global _client, _initialized

    if _initialized:
        return

    t0 = time.time()
    logger.info(f"[{_ts()}] [TTS] Initializing Groq Orpheus TTS...")

    # Imported lazily so this module can be imported without the groq package.
    from groq import Groq

    api_key = os.environ.get("GROQ_API_KEY")
    if not api_key:
        logger.error(f"[{_ts()}] [TTS] GROQ_API_KEY not set!")
        return

    _client = Groq(api_key=api_key)
    _initialized = True
    logger.info(f"[{_ts()}] [TTS] ✓ Groq Orpheus ready in {time.time()-t0:.2f}s | voice: {_current_voice} | model: {MODEL}")


def _clean_text_for_tts(text):
    """Remove tags and asterisk actions, then collapse whitespace."""
    text = re.sub(r'<[^>]+>', '', text)
    text = re.sub(r'\*[^*]+\*', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text


def _probe_duration(path: str) -> float:
    """Return the duration of *path* in seconds via ffprobe, or 0.0 if unavailable.

    The duration is only used for logging, so any failure (ffprobe missing,
    empty or unparsable output) is reported at debug level and mapped to 0.0.
    """
    # Get duration via ffprobe (more reliable than wave module for 48kHz)
    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "quiet", "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1", path],
            capture_output=True, text=True
        )
        return float(probe.stdout.strip())
    except (OSError, ValueError, subprocess.SubprocessError) as exc:
        logger.debug(f"[{_ts()}] [TTS] ffprobe duration unavailable: {exc}")
        return 0.0


def generate_audio(text: str, output_filename: Optional[str] = None) -> Optional[str]:
    """
    Generate speech audio from text using Groq Orpheus API.

    Args:
        text: Text to speak. Tags and *asterisk actions* are stripped first.
        output_filename: File name inside TEMP_DIR; ".wav" is appended when
            missing. Defaults to a millisecond-timestamp based name.

    Returns:
        Path to the wav file, or None on failure (empty text, module not
        initialized, API error, or audio concatenation error).
    """
    if not text or not text.strip():
        return None

    text = _clean_text_for_tts(text)
    if not text:
        return None

    if not _initialized or _client is None:
        logger.error(f"[{_ts()}] [TTS] Not initialized!")
        return None

    temp_dir = ensure_temp_dir()

    if output_filename is None:
        output_filename = f"tts_{int(time.time() * 1000)}"
    if not output_filename.endswith('.wav'):
        output_filename = f"{output_filename}.wav"
    output_path = os.path.join(temp_dir, output_filename)

    try:
        t0 = time.time()
        logger.info(f"[{_ts()}] [TTS] Generating: {text[:60]}...")

        # Groq Orpheus has 200 char limit per request — split if needed
        chunks = _split_text(text, max_chars=195)
        all_audio = []

        for i, chunk in enumerate(chunks):
            t1 = time.time()
            response = _client.audio.speech.create(
                model=MODEL,
                voice=_current_voice,
                input=chunk,
                response_format=RESPONSE_FORMAT,
            )
            # Read the audio bytes
            audio_bytes = response.read()
            all_audio.append(audio_bytes)
            logger.info(f"[{_ts()}] [TTS] Chunk {i+1}/{len(chunks)}: {len(chunk)} chars → {len(audio_bytes)/1024:.0f}KB in {time.time()-t1:.2f}s")

        # If single chunk, write directly; if multiple, concatenate WAV data
        if len(all_audio) == 1:
            with open(output_path, "wb") as f:
                f.write(all_audio[0])
        else:
            # Raises on ffmpeg failure, so a stale file left by an earlier call
            # with the same name is never reported as this call's result.
            _concatenate_wav_files(all_audio, output_path)

        file_size = os.path.getsize(output_path)
        total = time.time() - t0
        duration = _probe_duration(output_path)

        logger.info(
            f"[{_ts()}] [TTS] Saved: {output_path} ({file_size/1024:.0f}KB) "
            f"| audio: {duration:.1f}s | total: {total:.2f}s"
        )
        return output_path

    except Exception as e:
        # Top-level boundary: callers rely on None (not an exception) on failure.
        logger.error(f"[{_ts()}] [TTS] Error: {e}", exc_info=True)
        return None


def _pack_piece(chunks: list, current: str, piece: str, max_chars: int) -> str:
    """Add *piece* to the pending chunk and return the new pending chunk.

    When *piece* (plus a joining space) does not fit, the pending chunk is
    flushed to *chunks* and *piece* starts a new one.
    """
    if not current:
        # Nothing pending: no joining space is needed and nothing to flush.
        return piece
    if len(current) + 1 + len(piece) > max_chars:
        flushed = current.strip()
        if flushed:
            chunks.append(flushed)
        return piece
    return f"{current} {piece}"


def _split_text(text: str, max_chars: int = 195) -> list:
    """Split text into chunks of at most max_chars, breaking at sentence boundaries.

    Sentences are packed greedily. A sentence longer than max_chars is packed
    word by word, and a single word longer than max_chars is hard-split, so
    no returned chunk exceeds the limit (except whitespace-only input, which
    is returned unchanged). Empty chunks are never produced.

    Raises:
        ValueError: if max_chars is smaller than 1.
    """
    if max_chars < 1:
        raise ValueError(f"max_chars must be >= 1, got {max_chars}")
    if len(text) <= max_chars:
        return [text]

    chunks = []
    current = ""

    for sentence in re.split(r'(?<=[.!?])\s+', text):
        if len(sentence) <= max_chars:
            current = _pack_piece(chunks, current, sentence, max_chars)
            continue

        # Single sentence too long — start a fresh chunk and split at spaces
        if current.strip():
            chunks.append(current.strip())
        current = ""
        for word in sentence.split():
            # A word longer than the limit is cut into max_chars-sized slices.
            for start in range(0, len(word), max_chars):
                current = _pack_piece(
                    chunks, current, word[start:start + max_chars], max_chars
                )

    if current.strip():
        chunks.append(current.strip())

    return chunks if chunks else [text]


def _concatenate_wav_files(audio_chunks: list, output_path: str):
    """Concatenate multiple WAV byte chunks into a single WAV file using ffmpeg.

    Args:
        audio_chunks: Complete WAV files as bytes, in playback order.
        output_path: Destination file; overwritten if it exists.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
        OSError: if a temp file cannot be written or ffmpeg cannot be started.
    """
    temp_files = []
    try:
        # Write each chunk to a temp file
        for chunk_bytes in audio_chunks:
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tf:
                # Register the name before writing so the file is removed even
                # when the write itself fails.
                temp_files.append(tf.name)
                tf.write(chunk_bytes)

        # Build ffmpeg concat filter
        inputs = [arg for path in temp_files for arg in ("-i", path)]
        filter_str = (
            "".join(f"[{i}:a]" for i in range(len(temp_files)))
            + f"concat=n={len(temp_files)}:v=0:a=1[out]"
        )

        # -hide_banner / -loglevel error keep stderr limited to real errors, so
        # the truncated message below is meaningful.
        cmd = ["ffmpeg", "-y", "-hide_banner", "-loglevel", "error"] + inputs + [
            "-filter_complex", filter_str,
            "-map", "[out]",
            output_path
        ]
        result = subprocess.run(cmd, capture_output=True)
        if result.returncode != 0:
            stderr = result.stderr.decode(errors="replace")
            raise RuntimeError(
                f"ffmpeg concat error (exit {result.returncode}): {stderr[:300]}"
            )
    finally:
        for path in temp_files:
            try:
                os.unlink(path)
            except OSError:
                pass