| """ |
| TTS module - Groq Orpheus API for fast cloud-based speech synthesis. |
| Uses the "diana" voice from canopylabs/orpheus-v1-english by default; override it with the GROQ_TTS_VOICE environment variable or set_voice(). |
| No local GPU needed — calls Groq's API endpoint. |
| """ |
| import os |
| import re |
| import time |
| import logging |
|
|
# Module-level logger; handler and level configuration is left to the application.
logger = logging.getLogger(__name__)

# Directory that receives generated audio files (created on demand by
# ensure_temp_dir()).
TEMP_DIR: str = "/tmp/tts_output"
# NOTE(review): not referenced by the visible code; presumably the sample rate
# (Hz) of the returned audio — confirm before relying on it.
SAMPLE_RATE: int = 24000

# Request parameters sent with every speech-synthesis call.
MODEL: str = "canopylabs/orpheus-v1-english"
# Default voice; can be overridden through the GROQ_TTS_VOICE environment variable.
VOICE: str = os.environ.get("GROQ_TTS_VOICE", "diana")
RESPONSE_FORMAT: str = "wav"

# Mutable module state.
_client = None  # API client instance, assigned by initialize()
_initialized: bool = False  # flipped to True once initialize() has built the client
_current_voice: str = VOICE  # voice used for requests; changed by set_voice()
|
|
|
|
def set_voice(voice_name: str) -> None:
    """Switch the voice used by subsequent synthesis requests.

    Args:
        voice_name: Voice identifier stored in the module-level
            ``_current_voice`` and later sent as the ``voice`` request
            parameter. The value is not validated here.
    """
    global _current_voice

    _current_voice = voice_name
    logger.info(f"[{_ts()}] [TTS] Voice set to: {_current_voice}")
|
|
|
|
def _ts(now: float = None) -> str:
    """Return a UTC timestamp formatted as ``HH:MM:SS.mmm`` for log prefixes.

    Args:
        now: Seconds since the epoch. Defaults to the current time.

    Returns:
        The formatted timestamp string.
    """
    # Read the clock exactly once: sampling it separately for the seconds and
    # the milliseconds can pair values from two different seconds when the
    # calls straddle a second boundary.
    if now is None:
        now = time.time()
    millis = int(now * 1000) % 1000
    return f"{time.strftime('%H:%M:%S', time.gmtime(now))}.{millis:03d}"
|
|
|
|
def ensure_temp_dir():
    """Create the output directory if it does not exist and return its path.

    Returns:
        The value of ``TEMP_DIR``.
    """
    target_dir = TEMP_DIR
    # exist_ok=True makes repeated calls harmless.
    os.makedirs(target_dir, exist_ok=True)
    return target_dir
|
|
|
|
def initialize():
    """Create the module-level Groq client once.

    Reads the API key from the ``GROQ_API_KEY`` environment variable. When the
    key is missing an error is logged and the module stays uninitialized, so a
    later call can retry. Calling this again after a successful initialization
    is a no-op.
    """
    global _client, _initialized

    if _initialized:
        return

    t0 = time.time()
    logger.info(f"[{_ts()}] [TTS] Initializing Groq Orpheus TTS...")

    # Imported lazily so that importing this module does not require groq.
    from groq import Groq

    api_key = os.environ.get("GROQ_API_KEY")
    if api_key:
        _client = Groq(api_key=api_key)
        _initialized = True
        logger.info(f"[{_ts()}] [TTS] ✓ Groq Orpheus ready in {time.time()-t0:.2f}s | voice: {_current_voice} | model: {MODEL}")
        return

    logger.error(f"[{_ts()}] [TTS] GROQ_API_KEY not set!")
|
|
|
|
def _clean_text_for_tts(text):
    """Strip markup that should not be spoken and normalize whitespace.

    Removes ``<...>`` tags and ``*...*`` asterisk-delimited actions, then
    collapses every whitespace run into a single space and trims both ends.

    Args:
        text: Raw text to prepare for synthesis.

    Returns:
        The cleaned text (possibly an empty string).
    """
    # Order matters only in that whitespace is collapsed after the removals.
    for unwanted in (r'<[^>]+>', r'\*[^*]+\*'):
        text = re.sub(unwanted, '', text)
    collapsed = re.sub(r'\s+', ' ', text)
    return collapsed.strip()
|
|
|
|
def _resolve_output_path(temp_dir: str, output_filename: str = None) -> str:
    """Build the destination ``.wav`` path inside *temp_dir*.

    A millisecond-timestamp name is generated when no filename is given, and
    the ``.wav`` suffix is appended when missing.
    """
    if output_filename is None:
        output_filename = f"tts_{int(time.time() * 1000)}"
    if not output_filename.endswith('.wav'):
        output_filename = f"{output_filename}.wav"
    return os.path.join(temp_dir, output_filename)


def _probe_audio_duration(path: str) -> float:
    """Return the duration of *path* in seconds via ffprobe, or 0.0 if unknown.

    The duration is only used for logging, so every expected failure (ffprobe
    missing, unparsable output) is reported at debug level instead of raised.
    """
    import subprocess

    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "quiet", "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1", path],
            capture_output=True, text=True
        )
        return float(probe.stdout.strip())
    except (OSError, ValueError, subprocess.SubprocessError) as exc:
        logger.debug(f"[{_ts()}] [TTS] ffprobe duration unavailable: {exc}")
        return 0.0


def generate_audio(text: str, output_filename: str = None) -> "str | None":
    """Generate speech audio from text using the Groq Orpheus API.

    The text is cleaned, split into chunks of at most 195 characters
    (presumably to stay under an API input limit — confirm), and one request
    is issued per chunk. A single chunk is written to disk as-is; several
    chunks are merged with ``_concatenate_wav_files``.

    Args:
        text: Text to speak. Tags and ``*actions*`` are stripped first.
        output_filename: Optional file name inside ``TEMP_DIR``; ``.wav`` is
            appended when missing. Defaults to ``tts_<millisecond timestamp>``.

    Returns:
        Path to the written wav file, or ``None`` when the text is empty, the
        module is not initialized, or synthesis fails.
    """
    if not text or not text.strip():
        return None

    text = _clean_text_for_tts(text)
    if not text:
        return None

    if not _initialized or _client is None:
        logger.error(f"[{_ts()}] [TTS] Not initialized!")
        return None

    output_path = _resolve_output_path(ensure_temp_dir(), output_filename)

    try:
        t0 = time.time()
        logger.info(f"[{_ts()}] [TTS] Generating: {text[:60]}...")

        chunks = _split_text(text, max_chars=195)
        all_audio = []

        for i, chunk in enumerate(chunks):
            t1 = time.time()
            response = _client.audio.speech.create(
                model=MODEL,
                voice=_current_voice,
                input=chunk,
                response_format=RESPONSE_FORMAT,
            )

            audio_bytes = response.read()
            all_audio.append(audio_bytes)
            logger.info(f"[{_ts()}] [TTS] Chunk {i+1}/{len(chunks)}: {len(chunk)} chars → {len(audio_bytes)/1024:.0f}KB in {time.time()-t1:.2f}s")

        if len(all_audio) == 1:
            # A single response is already a complete wav file.
            with open(output_path, "wb") as f:
                f.write(all_audio[0])
        else:
            _concatenate_wav_files(all_audio, output_path)

        # Raises (and is reported below) if no output file was produced.
        file_size = os.path.getsize(output_path)
        total = time.time() - t0

        # Informational only; never fails the request.
        duration = _probe_audio_duration(output_path)

        logger.info(
            f"[{_ts()}] [TTS] Saved: {output_path} ({file_size/1024:.0f}KB) "
            f"| audio: {duration:.1f}s | total: {total:.2f}s"
        )
        return output_path

    except Exception as e:
        # Top-level boundary: callers receive None instead of an exception.
        logger.error(f"[{_ts()}] [TTS] Error: {e}", exc_info=True)
        return None
|
|
|
|
def _split_text(text: str, max_chars: int = 195) -> list:
    """Split text into chunks of at most ``max_chars`` characters.

    Chunks are built from whole sentences (split after ``.``, ``!`` or ``?``)
    where possible. A sentence longer than ``max_chars`` is broken at word
    boundaries, and a single word longer than ``max_chars`` is cut into
    fixed-size pieces so that no chunk ever exceeds the limit.

    Args:
        text: Text to split.
        max_chars: Maximum length of each returned chunk; must be >= 1.

    Returns:
        A list of non-empty chunks, or ``[text]`` when the text already fits
        or contains nothing to split.

    Raises:
        ValueError: If ``max_chars`` is smaller than 1.
    """
    if max_chars < 1:
        raise ValueError("max_chars must be a positive integer")
    if len(text) <= max_chars:
        return [text]

    chunks = []
    current = ""

    for sentence in re.split(r'(?<=[.!?])\s+', text):
        if len(sentence) <= max_chars:
            pieces = [sentence]
        else:
            # An oversized sentence starts on a fresh chunk and is re-packed
            # word by word; oversized words are hard-cut into max_chars slices.
            if current.strip():
                chunks.append(current.strip())
            current = ""
            pieces = [
                word[start:start + max_chars]
                for word in sentence.split()
                for start in range(0, len(word), max_chars)
            ]

        for piece in pieces:
            if not piece:
                continue
            if not current:
                # Nothing buffered: no separator is needed, so the piece fits
                # by construction and no (empty) chunk must be flushed.
                current = piece
            elif len(current) + 1 + len(piece) > max_chars:
                chunks.append(current.strip())
                current = piece
            else:
                current = f"{current} {piece}"

    if current.strip():
        chunks.append(current.strip())

    return chunks if chunks else [text]
|
|
|
|
def _concatenate_wav_files(audio_chunks: list, output_path: str):
    """Concatenate WAV byte chunks into a single WAV file using ffmpeg.

    Each chunk is written to its own temporary file, the files are joined
    with ffmpeg's ``concat`` audio filter, and the temporary files are removed
    afterwards regardless of the outcome.

    Args:
        audio_chunks: Complete WAV payloads (bytes), in playback order.
        output_path: Destination file; overwritten if it exists (``-y``).

    Raises:
        ValueError: If ``audio_chunks`` is empty.
        RuntimeError: If ffmpeg exits with a non-zero status, so callers do
            not mistake a failed merge for a written file.
        OSError: If a temporary file cannot be written or ffmpeg cannot be
            started.
    """
    import subprocess
    import tempfile

    if not audio_chunks:
        raise ValueError("audio_chunks must contain at least one WAV payload")

    temp_files = []
    try:
        for chunk_bytes in audio_chunks:
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tf:
                # Register the name before writing so the file is cleaned up
                # even if the write fails.
                temp_files.append(tf.name)
                tf.write(chunk_bytes)

        inputs = [arg for path in temp_files for arg in ("-i", path)]

        count = len(temp_files)
        filter_str = "".join(f"[{i}:a]" for i in range(count)) + f"concat=n={count}:v=0:a=1[out]"

        cmd = ["ffmpeg", "-y"] + inputs + [
            "-filter_complex", filter_str,
            "-map", "[out]",
            output_path
        ]

        result = subprocess.run(cmd, capture_output=True)
        if result.returncode != 0:
            # errors="replace": ffmpeg output is not guaranteed to be UTF-8.
            detail = result.stderr.decode(errors="replace")[:300]
            logger.error(f"[{_ts()}] [TTS] ffmpeg concat error: {detail}")
            raise RuntimeError(
                f"ffmpeg concat failed with exit code {result.returncode}: {detail}"
            )
    finally:
        for path in temp_files:
            try:
                os.unlink(path)
            except OSError:
                # Best-effort cleanup; a leftover temp file is not fatal.
                pass