AvatarChatbot / groq_tts.py
J Z
Update groq_tts.py
8d17527 verified
Raw
History Blame Contribute Delete
6.86 kB
"""
TTS module - Groq Orpheus API for fast cloud-based speech synthesis.
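Uses the canopylabs/orpheus-v1-english model. The voice defaults to diana and
can be overridden with the GROQ_TTS_VOICE environment variable or set_voice().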
No local GPU needed — calls Groq's API endpoint.
"""
import logging
import os
import re
import time
from typing import Optional
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Directory where generated WAV files are written (created on demand by ensure_temp_dir).
TEMP_DIR = "/tmp/tts_output"
# NOTE(review): not referenced in the visible code — presumably the sample rate (Hz)
# of the returned audio; confirm before relying on it.
SAMPLE_RATE = 24000
# Groq Orpheus config
MODEL = "canopylabs/orpheus-v1-english"
# Default voice; can be overridden through the GROQ_TTS_VOICE environment variable.
VOICE = os.environ.get("GROQ_TTS_VOICE", "diana")
# Audio format requested from the speech endpoint.
RESPONSE_FORMAT = "wav"
# Singleton state: the client is created once by initialize() and then reused.
_client = None
_initialized = False
# Voice sent with each synthesis request; changed at runtime via set_voice().
_current_voice = VOICE
def set_voice(voice_name: str):
    """Switch the voice used by subsequent synthesis requests.

    Args:
        voice_name: Voice identifier stored in the module-level
            ``_current_voice`` and sent with every later API call.
    """
    global _current_voice
    _current_voice = voice_name
    # Build the message first so the log call itself stays a one-liner.
    message = f"[{_ts()}] [TTS] Voice set to: {voice_name}"
    logger.info(message)
def _ts():
return time.strftime("%H:%M:%S", time.gmtime()) + f".{int(time.time()*1000)%1000:03d}"
def ensure_temp_dir():
    """Create the TTS output directory if needed and return its path."""
    target_dir = TEMP_DIR
    # exist_ok makes repeated calls harmless.
    os.makedirs(target_dir, exist_ok=True)
    return target_dir
def initialize():
    """Create the shared Groq client once; later calls are no-ops.

    The API key is read from the ``GROQ_API_KEY`` environment variable.
    When it is missing, an error is logged and the module stays
    uninitialized, so a later call can try again.
    """
    global _client, _initialized
    if not _initialized:
        started = time.time()
        logger.info(f"[{_ts()}] [TTS] Initializing Groq Orpheus TTS...")
        # Imported lazily so the dependency is only needed once TTS is used.
        from groq import Groq
        key = os.environ.get("GROQ_API_KEY")
        if key:
            _client = Groq(api_key=key)
            _initialized = True
            elapsed = time.time() - started
            logger.info(f"[{_ts()}] [TTS] ✓ Groq Orpheus ready in {elapsed:.2f}s | voice: {_current_voice} | model: {MODEL}")
        else:
            logger.error(f"[{_ts()}] [TTS] GROQ_API_KEY not set!")
def _clean_text_for_tts(text):
"""Remove tags and asterisk actions."""
text = re.sub(r'<[^>]+>', '', text)
text = re.sub(r'\*[^*]+\*', '', text)
text = re.sub(r'\s+', ' ', text).strip()
return text
def _probe_duration_seconds(path: str) -> float:
    """Return the duration of *path* in seconds as reported by ffprobe.

    The value is only used for logging, so a missing ffprobe binary or
    unparsable output is logged at debug level and reported as ``0.0``
    instead of failing the synthesis.
    """
    import subprocess

    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "quiet", "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1", path],
            capture_output=True, text=True
        )
        return float(probe.stdout.strip())
    except (OSError, ValueError) as exc:
        logger.debug(f"[{_ts()}] [TTS] ffprobe duration unavailable: {exc}")
        return 0.0


def generate_audio(text: str, output_filename: Optional[str] = None) -> Optional[str]:
    """Generate speech audio from text using the Groq Orpheus API.

    Args:
        text: Text to speak. Tags and asterisk actions are stripped first.
        output_filename: Optional file name inside the TTS temp directory.
            ``.wav`` is appended when missing; a millisecond-timestamp name
            is used when omitted.

    Returns:
        Path to the written WAV file, or ``None`` when the text is empty
        after cleaning, the module is not initialized, or synthesis fails.
    """
    if not text or not text.strip():
        return None
    text = _clean_text_for_tts(text)
    if not text:
        return None
    if not _initialized or _client is None:
        logger.error(f"[{_ts()}] [TTS] Not initialized!")
        return None

    temp_dir = ensure_temp_dir()
    if output_filename is None:
        output_filename = f"tts_{int(time.time() * 1000)}"
    if not output_filename.endswith('.wav'):
        output_filename = f"{output_filename}.wav"
    output_path = os.path.join(temp_dir, output_filename)

    try:
        t0 = time.time()
        logger.info(f"[{_ts()}] [TTS] Generating: {text[:60]}...")
        # Groq Orpheus has 200 char limit per request — split if needed
        chunks = _split_text(text, max_chars=195)
        all_audio = []
        for index, chunk in enumerate(chunks, start=1):
            t1 = time.time()
            response = _client.audio.speech.create(
                model=MODEL,
                voice=_current_voice,
                input=chunk,
                response_format=RESPONSE_FORMAT,
            )
            audio_bytes = response.read()
            all_audio.append(audio_bytes)
            logger.info(f"[{_ts()}] [TTS] Chunk {index}/{len(chunks)}: {len(chunk)} chars → {len(audio_bytes)/1024:.0f}KB in {time.time()-t1:.2f}s")

        # A single chunk is already a complete WAV; several need merging.
        if len(all_audio) == 1:
            with open(output_path, "wb") as f:
                f.write(all_audio[0])
        else:
            # Remove any stale file so a failed concatenation can never
            # hand back an old recording as if it were the new one.
            try:
                os.remove(output_path)
            except FileNotFoundError:
                pass
            _concatenate_wav_files(all_audio, output_path)

        # getsize raises if the file was never produced; the outer handler
        # turns that into a None return.
        file_size = os.path.getsize(output_path)
        if file_size == 0:
            raise RuntimeError("TTS produced an empty audio file")
        total = time.time() - t0

        # Duration comes from ffprobe rather than the wave module.
        duration = _probe_duration_seconds(output_path)
        logger.info(
            f"[{_ts()}] [TTS] Saved: {output_path} ({file_size/1024:.0f}KB) "
            f"| audio: {duration:.1f}s | total: {total:.2f}s"
        )
        return output_path
    except Exception as e:
        # Boundary handler: callers rely on None rather than an exception.
        logger.error(f"[{_ts()}] [TTS] Error: {e}", exc_info=True)
        return None
def _split_text(text: str, max_chars: int = 195) -> list:
"""Split text into chunks under max_chars, breaking at sentence boundaries."""
if len(text) <= max_chars:
return [text]
chunks = []
sentences = re.split(r'(?<=[.!?])\s+', text)
current = ""
for sentence in sentences:
if len(sentence) > max_chars:
# Single sentence too long — split at comma or space
if current:
chunks.append(current.strip())
current = ""
words = sentence.split()
for word in words:
if len(current) + len(word) + 1 > max_chars:
if current:
chunks.append(current.strip())
current = word
else:
current = f"{current} {word}" if current else word
elif len(current) + len(sentence) + 1 > max_chars:
chunks.append(current.strip())
current = sentence
else:
current = f"{current} {sentence}" if current else sentence
if current.strip():
chunks.append(current.strip())
return chunks if chunks else [text]
def _concatenate_wav_files(audio_chunks: list, output_path: str):
"""Concatenate multiple WAV byte chunks into a single WAV file using ffmpeg."""
import tempfile
import subprocess
# Write each chunk to a temp file
temp_files = []
try:
for i, chunk_bytes in enumerate(audio_chunks):
tf = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
tf.write(chunk_bytes)
tf.close()
temp_files.append(tf.name)
# Build ffmpeg concat filter
inputs = []
for f in temp_files:
inputs += ["-i", f]
filter_str = "".join(f"[{i}:a]" for i in range(len(temp_files))) + f"concat=n={len(temp_files)}:v=0:a=1[out]"
cmd = ["ffmpeg", "-y"] + inputs + [
"-filter_complex", filter_str,
"-map", "[out]",
output_path
]
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
logger.error(f"[{_ts()}] [TTS] ffmpeg concat error: {result.stderr.decode()[:300]}")
finally:
for f in temp_files:
try:
os.unlink(f)
except OSError:
pass