TheNormsOfIntelligence's picture
Upload 3 files
4f65a37 verified
Raw
History Blame Contribute Delete
135 kB
#!/usr/bin/env python3
"""
OmniVoice Engine v2.0.0 — "Mind Through Voice"
================================================
A consciousness-aware real-time voice conversation engine that feels
like a mind speaking through a voice, not a synthesizer.
ARCHITECTURE:
- Whisper ASR (local) for real speech-to-text + interrupt detection
- Coqui XTTS for neural text-to-speech with voice cloning
- Procedural numpy DSP for non-verbal expressions (laughter, sighs, gasps, etc.)
- Smart interrupt awareness that ignores non-verbals, backchannels, and
collaborative turn-sharing
- Graceful interruption responses ("I'm sorry, were you saying something?")
- NIMA-integrated adapter (reads ConsciousnessSnapshot to drive prosody)
v2.0.0 NEW MODULES — the "mind through voice" layer:
CONVERSATIONAL FLOW:
- AdaptiveProsodyShaper: emotion → pitch/rhythm/timbre dynamics
(softer when empathetic, brighter when excited)
- MicroIntonationInjector: hesitations, breaths, emphasis shifts
that signal thoughtfulness or uncertainty
- TurnTakingPredictor: predicts when user will finish, smoothly
takes the floor instead of waiting for silence
EMOTIONAL & COGNITIVE GROUNDING:
- AffectiveMirror: matches user's emotional tone (calm, energetic,
concerned) with subtle vocal adjustments
- SomaticFeedbackIntegrator: ties voice modulation to system strain
or energy states (biological fatigue signals)
- EmpathyPhraseGenerator: contextual empathy inserts ("That must
feel tough") instead of generic nods
MEMORY & CONTINUITY:
- VoiceEventMemoryBridge: stores every utterance as an episodic
voice event in MemPalace with affective tags
- NarrativeContinuityEngine: references past conversations naturally
("As you mentioned yesterday, you sounded excited about...")
EXPRESSIVE EXTENSIONS:
- SingingInterjectionModule: short melodic phrases (humming, tonal
affirmations) woven into speech
- MultimodalCueEmitter: pairs voice with haptic/visual signals
(soft vibration or light pulse when nodding)
- DynamicLaughterSynth: adaptive laughter (chuckle → full laugh)
scaled by intensity instead of fixed samples
INTERRUPT HANDLING REFINEMENT:
- ContextAwareApologyGenerator: casual vs serious apologies
("Sorry, please go ahead" vs "I didn't mean to cut you off")
- NonBlockingContinuationManager: keeps voice flowing after
acknowledging an interrupt, so it feels conversational
Author: Norman de la Paz-Tabora
"""
from __future__ import annotations
import asyncio
import json
import logging
import math
import os
import random
import struct
import sys
import threading
import time
import uuid
import wave
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import (
Any, AsyncGenerator, Callable, Deque, Dict, Generator,
List, Optional, Tuple, Union,
)
import numpy as np
# ── Optional dependencies (all gracefully degrade) ──
# Every optional name (whisper, WhisperModel, CoquiTTS) ends up bound either
# to the real object or to None, so later code can test it without risking
# a NameError.

# ASR: try openai-whisper first, then faster-whisper
WhisperModel = None  # type: ignore[assignment, misc]
try:
    import whisper
    WHISPER_AVAILABLE = True
    WHISPER_BACKEND = "openai-whisper"
except ImportError:
    WHISPER_AVAILABLE = False
    WHISPER_BACKEND = None
    whisper = None  # type: ignore[assignment]

if not WHISPER_AVAILABLE:
    try:
        from faster_whisper import WhisperModel
        WHISPER_AVAILABLE = True
        WHISPER_BACKEND = "faster-whisper"
    except ImportError:
        WhisperModel = None  # type: ignore[assignment, misc]
        WHISPER_BACKEND = None

# TTS: try coqui-tts.  A single attempt is enough: retrying the very same
# `TTS.api` import after an ImportError cannot succeed.
try:
    from TTS.api import TTS as CoquiTTS
    COQUI_TTS_AVAILABLE = True
except ImportError:
    COQUI_TTS_AVAILABLE = False
    CoquiTTS = None  # type: ignore[assignment, misc]
# ── Logging ──
logger = logging.getLogger("OmniVoice")
# Configure the logger only when it has no handler yet, so running this
# module-level code again does not attach a second stdout handler.
if not logger.handlers:
    _formatter = logging.Formatter(
        fmt="%(asctime)s [%(levelname)s] %(name)s :: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    _h = logging.StreamHandler(sys.stdout)
    _h.setFormatter(_formatter)
    logger.addHandler(_h)
    logger.setLevel(logging.INFO)

# Version string of this engine build.
OMNIVOICE_VERSION = "2.0.0-MIND-THROUGH-VOICE"
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 1 — Enums & Data Structures
# ═══════════════════════════════════════════════════════════════════════════
class NonVerbalType(Enum):
    """Categories of non-verbal vocal expressions.

    Each value is the lowercase label of the expression.
    """

    LAUGHTER = "laughter"
    GIGGLE = "giggle"
    # Historical, mis-cased spelling.  It stays the canonical member so that
    # existing references to ``NonVerbalType.GASp`` (and its ``.name``) keep
    # working.
    GASp = "gasp"
    # Correctly-cased alias: same value, therefore the very same member.
    GASP = "gasp"
    GROAN = "groan"
    MOAN = "moan"
    SIGH = "sigh"
    CLUCK = "cluck"
    CLICK = "click"
    AWW = "aww"
    OH = "oh"
    MM = "mm"
    WOW = "wow"
class ConversationPhase(Enum):
    """Which phase of the conversation we're in."""

    # No one speaking
    IDLE = "idle"
    # User has the floor
    USER_SPEAKING = "user_speaking"
    # Nima has the floor
    NIMA_SPEAKING = "nima_speaking"
    # Both speaking (potential interrupt)
    OVERLAP = "overlap"
    # Nima yielding the floor after interrupt
    YIELDING = "yielding"
class InterruptType(Enum):
    """Classification of detected user speech during Nima's turn."""

    # User is taking the turn
    REAL_INTERRUPT = "real_interrupt"
    # Laughter, sigh, etc. — IGNORE
    NON_VERBAL = "non_verbal"
    # "yeah", "mm-hmm" — IGNORE
    BACKCHANNEL = "backchannel"
    # User finishing sentence — IGNORE
    COLLABORATIVE_TURN_SHARING = "collaborative"
    # No speech detected
    SILENCE = "silence"
class BackchannelTrigger(Enum):
    """Why a backchannel was emitted."""

    # Every N seconds of user speech
    PERIODIC = "periodic"
    # User paused 0.3-0.8s mid-utterance
    ON_PAUSE = "on_pause"
    # User's prosody shifted (arousal spike)
    ON_EMOTION_SHIFT = "emotion"
class TTSMode(Enum):
    """Which TTS backend is active."""

    COQUI_XTTS = "coqui_xtts"
    # Fallback
    PROCEDURAL = "procedural"
class ASRMode(Enum):
    """Which ASR backend is active."""

    WHISPER = "whisper"
    # Fallback: detects speech but no transcription
    VAD_ONLY = "vad_only"
@dataclass
class AudioFrame:
    """A chunk of audio with metadata."""

    # Sample buffer; its length divided by `sample_rate` gives `duration`.
    samples: np.ndarray
    sample_rate: int = 16000
    # Defaults to the wall-clock time at which the frame object is created.
    timestamp: float = field(default_factory=time.time)
    is_speech: bool = False
    energy: float = 0.0

    @property
    def duration(self) -> float:
        """Length of the frame in seconds (sample count / sample rate)."""
        n_samples = len(self.samples)
        return n_samples / self.sample_rate
@dataclass
class TranscriptSegment:
    """A transcribed segment of user speech."""

    # Required fields.
    text: str
    start_time: float
    end_time: float
    # Optional fields.
    confidence: float = 0.0
    is_backchannel: bool = False
    is_non_verbal: bool = False
@dataclass
class BackchannelEvent:
    """A backchannel emission (verbal nod or non-verbal expression)."""

    trigger: BackchannelTrigger
    audio: np.ndarray
    sample_rate: int = 22050
    # True = "mm-hmm", False = laughter/sigh
    is_verbal: bool = True
    # "mm-hmm", "laughter", "sigh", etc.
    label: str = ""
    # Defaults to the wall-clock time at which the event object is created.
    timestamp: float = field(default_factory=time.time)
@dataclass
class InterruptClassification:
    """Result of classifying detected user speech during Nima's turn."""

    interrupt_type: InterruptType
    confidence: float = 0.0
    reason: str = ""
    transcript: str = ""
    duration_s: float = 0.0
    # A fresh dict per instance (default_factory), never shared.
    spectral_features: Dict[str, float] = field(default_factory=dict)
@dataclass
class ProsodyParams:
    """Prosody parameters driven by consciousness state."""

    # Pitch and pacing.
    base_pitch_hz: float = 180.0
    speech_rate_wpm: float = 140.0
    # Loudness and voice colour.
    energy: float = 0.8
    breathiness: float = 0.1
    vibrato_depth: float = 0.0
    # 0 = cold/clinical, 1 = warm/intimate
    warmth: float = 0.7
    pitch_variance: float = 0.15
    emotional_tone: str = "neutral"
@dataclass
class ConversationState:
    """Tracks the current state of the conversation."""

    phase: ConversationPhase = ConversationPhase.IDLE

    # Turn timing for the user and for Nima.
    user_speech_start: float = 0.0
    user_speech_duration: float = 0.0
    nima_speech_start: float = 0.0
    nima_speech_duration: float = 0.0

    # Backchannel / pause bookkeeping.
    last_backchannel_time: float = 0.0
    last_user_pause_time: float = 0.0

    # Estimated user emotion.
    user_emotion_arousal: float = 0.3
    user_emotion_valence: float = 0.0
    last_arousal_sample: float = 0.3

    # Counters.
    interrupt_count: int = 0
    backchannel_count: int = 0

    # Text currently associated with the conversation and progress through it.
    current_text: str = ""
    # 0.0 = just started, 1.0 = finished
    current_text_position: float = 0.0
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 2 — Procedural Non-Verbal Synthesizer (numpy DSP)
# ═══════════════════════════════════════════════════════════════════════════
class ProceduralNonVerbalSynth:
    """
    Synthesizes non-verbal vocal expressions using pure numpy DSP.

    Each expression type has a hand-crafted signal model (default durations
    apply when no override is given):

    LAUGHTER: 100 ms "ha" cycles (pitch 180 +/- 20 Hz), 1.2 s, level decaying
              by 30 % across the sequence
    GIGGLE:   70 ms cycles at a higher pitch (260 +/- 30 Hz), 0.8 s
    GASP:     250 ms high-passed noise, sharp onset, exponential decay
    GROAN:    pitch gliding 90 -> 60 Hz plus noise, low-passed, 600 ms
    MOAN:     140 Hz tone with 5.5 Hz vibrato, band-passed, 800 ms
    SIGH:     noise through a lowpass whose cutoff steps down from 2000 Hz
              toward 400 Hz, 500 ms
    CLUCK:    20 ms windowed noise burst plus a 5 ms click, 80 ms
    CLICK:    3 ms noise impulse with fast decay, high-passed, 30 ms ("tsk")
    AWW:      150 Hz "aw" vowel, 400 ms
    OH:       200 Hz "oh" vowel, 300 ms
    MM:       120 Hz nasal-filtered hum, 400 ms
    WOW:      pitch rising 180 -> 260 Hz through the "aw" formants, 500 ms

    Noise comes from ``np.random`` and pitch jitter from ``random``, so the
    output is reproducible only if the caller seeds both generators.
    """

    SAMPLE_RATE: int = 22050

    # ── Vowel formants for vowel-based expressions (aww, oh, mm, wow) ──
    # F1..F3 are centre frequencies in Hz, "bw" the matching bandwidths.
    VOWEL_FORMANTS: Dict[str, Dict] = {
        "aw": {"F1": 570, "F2": 840, "F3": 2410, "bw": [55, 75, 115]},
        "oh": {"F1": 480, "F2": 760, "F3": 2300, "bw": [50, 70, 110]},
        "mm": {"F1": 280, "F2": 900, "F3": 2200, "bw": [45, 65, 105]},
        "ah": {"F1": 730, "F2": 1090, "F3": 2440, "bw": [60, 80, 120]},
    }

    # Relative level of the F1 / F2 / F3 resonator outputs when summed.
    _FORMANT_GAINS = (1.0, 0.6, 0.3)

    def __init__(self, sample_rate: int = 22050):
        self.sample_rate = sample_rate

    def synth(self, expr_type: NonVerbalType, intensity: float = 0.7,
              duration_override: Optional[float] = None) -> np.ndarray:
        """Synthesize a non-verbal expression. Returns float32 audio.

        Args:
            expr_type: expression to render; its ``value`` selects the
                ``_synth_<value>`` method.  Unknown values fall back to a
                sigh (with a warning).
            intensity: clamped to [0.1, 1.0]; the output peak is
                ``0.7 * intensity``.
            duration_override: length in seconds; ``None`` (or 0) keeps the
                expression's default duration.
        """
        intensity = float(max(0.1, min(1.0, intensity)))
        method = getattr(self, f"_synth_{expr_type.value}", None)
        if method is None:
            logger.warning("Unknown non-verbal type: %s, falling back to sigh", expr_type)
            method = self._synth_sigh
        audio = method(intensity, duration_override)
        # Normalize to target amplitude
        max_val = float(np.max(np.abs(audio))) if len(audio) > 0 else 0.0
        if max_val > 0:
            audio = audio / max_val * 0.7 * intensity
        return audio.astype(np.float32)

    # ── Laughter: periodic "ha-ha-ha" bursts ──
    def _synth_laughter(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        total_dur = dur or 1.2
        ha_period = 0.10  # 100ms per "ha"
        n_has = int(total_dur / ha_period)
        # The silent inter-ha gap is identical for every cycle: build it once.
        gap = np.zeros(int(self.sample_rate * ha_period * 0.3))
        chunks = []
        for i in range(n_has):
            ha = self._gen_ha_burst(ha_period * 0.7, intensity,
                                    pitch=180 + random.uniform(-20, 20))
            # Decay across the laughter
            decay = 1.0 - 0.3 * (i / max(1, n_has - 1))
            chunks.append(ha * decay)
            chunks.append(gap)
        return np.concatenate(chunks) if chunks else np.zeros(0)

    def _gen_ha_burst(self, duration: float, intensity: float, pitch: float = 180) -> np.ndarray:
        """Generate a single 'ha' burst — voiced segment with fast onset/offset."""
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        # Glottal source (sawtooth-like via harmonics)
        phase = 2.0 * np.pi * pitch * t
        source = np.sin(phase)
        for h in range(2, 5):
            source += (0.4 / h) * np.sin(phase * h)
        source /= 3.0
        # Add breathy noise
        noise = np.random.normal(0, 0.3, n)
        mixed = source * 0.6 + noise * 0.4
        # Bandpass filter around vowel region (rough "ah" formant)
        mixed = self._bandpass(mixed, 400, 3000)
        # Envelope: fast attack, fast decay (ha-ha character)
        env = self._attack_release_envelope(n, 0.01, 0.04, 0.0)
        return mixed * env * intensity

    # ── Giggle: faster, higher-pitched laughter ──
    def _synth_giggle(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        total_dur = dur or 0.8
        hee_period = 0.07
        n_hees = int(total_dur / hee_period)
        gap = np.zeros(int(self.sample_rate * hee_period * 0.4))
        chunks = []
        for _ in range(n_hees):
            hee = self._gen_ha_burst(hee_period * 0.6, intensity * 0.8,
                                     pitch=260 + random.uniform(-30, 30))
            chunks.append(hee)
            chunks.append(gap)
        return np.concatenate(chunks) if chunks else np.zeros(0)

    # ── Gasp: short sharp intake ──
    def _synth_gasp(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        duration = dur or 0.25
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        # Sharp onset noise burst (intake)
        noise = np.random.normal(0, 1, n)
        # High-pass to make it breathy/sharp
        filtered = self._highpass(noise, 800)
        # Quick attack, exponential decay
        env = np.exp(-t * 15.0)
        attack = min(int(0.005 * self.sample_rate), n // 10)
        if attack > 0:
            env[:attack] = np.linspace(0, 1, attack) * env[:attack] / max(env[:attack].max(), 1e-6)
        # Add a faint glottal pulse
        pulse = 0.2 * np.sin(2 * np.pi * 200 * t) * np.exp(-t * 10)
        return (filtered * 0.7 + pulse * 0.3) * env * intensity

    # ── Groan: low-pitched descending ──
    def _synth_groan(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        duration = dur or 0.6
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        # Descending pitch 90 -> 60 Hz; the phase is the running sum of f0.
        f0 = 90.0 - 30.0 * (t / duration)
        phase = 2.0 * np.pi * np.cumsum(f0) / self.sample_rate
        source = np.sin(phase)
        for h in range(2, 4):
            source += (0.3 / h) * np.sin(phase * h)
        source /= 2.0
        # Add low-frequency noise
        noise = np.random.normal(0, 0.2, n)
        mixed = source * 0.7 + noise * 0.3
        mixed = self._lowpass(mixed, 600)
        # Envelope: slow attack, sustain, slow release
        env = self._attack_release_envelope(n, 0.08, 0.15, 0.3)
        return mixed * env * intensity

    # ── Moan: sustained mid-pitch with vibrato ──
    def _synth_moan(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        duration = dur or 0.8
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        f0 = 140.0
        vibrato = 4.0 * np.sin(2 * np.pi * 5.5 * t)  # 5.5 Hz vibrato
        phase = 2.0 * np.pi * np.cumsum(f0 + vibrato) / self.sample_rate
        source = np.sin(phase)
        for h in range(2, 5):
            source += (0.4 / h) * np.sin(phase * h)
        source /= 3.0
        # Add breathiness
        noise = np.random.normal(0, 0.15, n)
        mixed = source * 0.85 + noise * 0.15
        mixed = self._bandpass(mixed, 200, 2000)
        # Envelope: slow attack, sustain, slow release
        env = self._attack_release_envelope(n, 0.1, 0.2, 0.4)
        return mixed * env * intensity

    # ── Sigh: downward-filtered noise ──
    def _synth_sigh(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        duration = dur or 0.5
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        noise = np.random.normal(0, 1, n)
        # Lowpass sweep from 2000 Hz toward 400 Hz (exhale character).  The
        # cutoff is stepped once per chunk, but the one-pole filter state is
        # carried across chunk boundaries: restarting the filter for every
        # chunk would put a small discontinuity at each boundary.
        chunk_size = max(1, n // 10)
        dt = 1.0 / self.sample_rate
        filtered = np.zeros(n)
        state = 0.0
        for start in range(0, n, chunk_size):
            cutoff = 2000.0 - 1600.0 * (start / max(1, n))
            rc = 1.0 / (2 * math.pi * cutoff)
            alpha = dt / (rc + dt)
            for i in range(start, min(start + chunk_size, n)):
                state += alpha * (noise[i] - state)
                filtered[i] = state
        # Add faint glottal pulse for voicing
        pulse = 0.15 * np.sin(2 * np.pi * 120 * t) * np.exp(-t * 2)
        mixed = filtered * 0.8 + pulse * 0.2
        # Envelope: medium attack, long decay (exhale)
        env = self._attack_release_envelope(n, 0.05)
        env *= np.exp(-t * 2.5)  # gradual decay
        return mixed * env * intensity

    # ── Cluck: plosive + click (tongue sound) ──
    def _synth_cluck(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        duration = dur or 0.08
        n = int(self.sample_rate * duration)
        # Short plosive burst
        burst_len = min(int(0.02 * self.sample_rate), n)
        audio = np.zeros(n)
        if burst_len > 0:
            audio[:burst_len] = np.random.normal(0, 1, burst_len) * np.hanning(burst_len)
        # Click component (shorter than total), mixed in after the burst.
        # The offset is capped so the click always fits inside the buffer.
        click_len = min(int(0.005 * self.sample_rate), n)
        if click_len > 0:
            click = np.random.normal(0, 1, click_len) * 0.6
            offset = min(int(0.03 * self.sample_rate), n - click_len)
            audio[offset:offset + click_len] += click * 0.6
        # Lowpass
        audio = self._lowpass(audio, 3000)
        return audio * intensity

    # ── Click: single "tsk" impulse ──
    def _synth_click(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        duration = dur or 0.03
        n = int(self.sample_rate * duration)
        # Short impulse with quick decay
        impulse = np.zeros(n)
        impulse_len = min(int(0.003 * self.sample_rate), n)
        if impulse_len > 0:
            impulse[:impulse_len] = np.random.normal(0, 1, impulse_len)
        # Quick exponential decay
        t = np.linspace(0, duration, n)
        audio = impulse * np.exp(-t * 100)
        # Highpass to make it sharp
        audio = self._highpass(audio, 1500)
        return audio * intensity

    # ── Aww: low-pitched warm vowel ──
    def _synth_aww(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        return self._synth_vowel_expr("aw", 150, 0.4, intensity, dur)

    # ── Oh: mid-pitched vowel ──
    def _synth_oh(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        return self._synth_vowel_expr("oh", 200, 0.3, intensity, dur)

    # ── Mm: humming ──
    def _synth_mm(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        return self._synth_vowel_expr("mm", 120, 0.4, intensity, dur, nasal=True)

    # ── Wow: rising pitch vowel ──
    def _synth_wow(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        duration = dur or 0.5
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        # Rising pitch 180 -> 260 Hz
        f0 = 180.0 + 80.0 * (t / duration)
        phase = 2.0 * np.pi * np.cumsum(f0) / self.sample_rate
        audio = self._formant_filter(np.sin(phase), "aw", n)
        env = self._attack_release_envelope(n, 0.05, 0.1, 0.5)
        return audio * env * intensity

    # ── Helper: vowel-based expression with formant filtering ──
    def _synth_vowel_expr(self, vowel: str, f0: float, duration: float,
                          intensity: float, dur_override: Optional[float],
                          nasal: bool = False) -> np.ndarray:
        """Render a steady-pitch vowel; ``dur_override`` wins over ``duration``."""
        duration = dur_override or duration
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        # Glottal source
        phase = 2.0 * np.pi * f0 * t
        source = np.sin(phase)
        for h in range(2, 6):
            source += (0.4 / h) * np.sin(phase * h)
        source /= 3.0
        # Formant filter
        audio = self._formant_filter(source, vowel, n)
        if nasal:
            # Nasal: reduce high frequencies, add low resonance
            audio = self._lowpass(audio, 1500)
            audio += 0.2 * np.sin(2 * np.pi * 250 * t)
        env = self._attack_release_envelope(n, 0.05, 0.1, 0.5)
        return audio * env * intensity

    # ── DSP helpers ──
    def _attack_release_envelope(self, n: int, attack_s: float,
                                 release_s: float = 0.0,
                                 release_floor: float = 0.0) -> np.ndarray:
        """Return an ``n``-sample gain curve: linear attack, sustain, linear release.

        The attack ramps 0 -> 1 and the release ramps 1 -> ``release_floor``.
        Each ramp is capped at a quarter of the buffer so a short buffer
        still gets both; a ramp of zero samples is skipped.
        """
        env = np.ones(n)
        attack = min(int(attack_s * self.sample_rate), n // 4)
        release = min(int(release_s * self.sample_rate), n // 4)
        if attack > 0:
            env[:attack] = np.linspace(0, 1, attack)
        if release > 0:
            env[-release:] = np.linspace(1, release_floor, release)
        return env

    def _formant_filter(self, signal: np.ndarray, vowel: str, n: int) -> np.ndarray:
        """Apply 3-formant resonator filter for vowel synthesis.

        Unknown vowels use the "ah" formants.  The three two-pole resonators
        run in parallel on ``signal`` and are summed with decreasing gains.
        """
        formants = self.VOWEL_FORMANTS.get(vowel, self.VOWEL_FORMANTS["ah"])
        centres = (formants["F1"], formants["F2"], formants["F3"])
        output = np.zeros(n)
        for fn, bw, level in zip(centres, formants["bw"], self._FORMANT_GAINS):
            r = float(np.exp(-np.pi * bw / self.sample_rate))
            cos_w = math.cos(2 * math.pi * fn / self.sample_rate)
            a1 = -2 * r * cos_w
            a2 = r * r
            gain = (1 - r) * math.sqrt(max(0, 1 - 2 * r * cos_w + r * r))
            filtered = np.zeros(n)
            # The recursion needs two past outputs, so it starts at index 2.
            for i in range(2, n):
                filtered[i] = gain * signal[i] - a1 * filtered[i - 1] - a2 * filtered[i - 2]
            output += filtered * level
        return output

    def _lowpass(self, signal: np.ndarray, cutoff_hz: float) -> np.ndarray:
        """Simple one-pole lowpass filter."""
        if len(signal) == 0:
            return signal
        rc = 1.0 / (2 * math.pi * cutoff_hz)
        dt = 1.0 / self.sample_rate
        alpha = dt / (rc + dt)
        output = np.zeros_like(signal)
        output[0] = signal[0] * alpha
        for i in range(1, len(signal)):
            output[i] = output[i - 1] + alpha * (signal[i] - output[i - 1])
        return output

    def _highpass(self, signal: np.ndarray, cutoff_hz: float) -> np.ndarray:
        """Simple one-pole highpass filter."""
        if len(signal) == 0:
            return signal
        rc = 1.0 / (2 * math.pi * cutoff_hz)
        dt = 1.0 / self.sample_rate
        alpha = rc / (rc + dt)
        output = np.zeros_like(signal)
        output[0] = signal[0]
        for i in range(1, len(signal)):
            output[i] = alpha * (output[i - 1] + signal[i] - signal[i - 1])
        return output

    def _bandpass(self, signal: np.ndarray, low_hz: float, high_hz: float) -> np.ndarray:
        """Bandpass = lowpass (at ``high_hz``) followed by highpass (at ``low_hz``)."""
        return self._highpass(self._lowpass(signal, high_hz), low_hz)
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 3 — ASR Layer (Whisper + Energy VAD fallback)
# ═══════════════════════════════════════════════════════════════════════════
class EnergyVAD:
    """
    Energy-based Voice Activity Detection. Detects WHEN speech occurs
    but not WHAT is said. Used as a fallback when Whisper is unavailable,
    and as a fast pre-filter even when Whisper is active.

    The detector is stateful: every low-energy frame passed to
    ``detect_speech`` (directly or through ``detect_pause``) nudges the
    running noise-floor estimate.
    """

    def __init__(self, sample_rate: int = 16000, frame_duration_ms: int = 20,
                 energy_threshold: float = 0.005):
        self.sample_rate = sample_rate
        self.frame_duration_ms = frame_duration_ms
        # Samples per analysis frame.
        self.frame_size = int(sample_rate * frame_duration_ms / 1000)
        self.energy_threshold = energy_threshold
        self._noise_floor = 0.001
        self._adaptation_rate = 0.01

    @staticmethod
    def _to_float32(audio: np.ndarray) -> np.ndarray:
        """Convert PCM samples to float32; integer PCM is scaled to [-1, 1]."""
        if audio.dtype == np.int16:
            return audio.astype(np.float32) / 32768.0
        if audio.dtype == np.int32:
            return audio.astype(np.float32) / 2147483648.0
        if audio.dtype == np.uint8:
            # Unsigned 8-bit PCM is centred on 128.
            return (audio.astype(np.float32) - 128) / 128.0
        if audio.dtype != np.float32:
            return audio.astype(np.float32)
        return audio

    def detect_speech(self, audio: np.ndarray) -> bool:
        """Return True if the audio frame contains speech.

        Side effect: frames quieter than half the fixed threshold update the
        adaptive noise floor.
        """
        if len(audio) == 0:
            return False
        rms = self.compute_energy(audio)
        # Adapt noise floor (only for low-energy frames)
        if rms < self.energy_threshold * 0.5:
            rate = self._adaptation_rate
            self._noise_floor = (1 - rate) * self._noise_floor + rate * rms
        # Speech if energy exceeds max of fixed threshold or 3x noise floor
        threshold = max(self.energy_threshold, self._noise_floor * 3)
        return rms > threshold

    def compute_energy(self, audio: np.ndarray) -> float:
        """Return the RMS energy of ``audio`` after normalisation (0.0 if empty)."""
        if len(audio) == 0:
            return 0.0
        samples = self._to_float32(audio)
        return float(np.sqrt(np.mean(samples ** 2)))

    def detect_pause(self, audio: np.ndarray, min_pause_s: float = 0.3,
                     max_pause_s: float = 0.8) -> Tuple[bool, float]:
        """
        Detect whether the longest run of non-speech frames in ``audio`` has
        pause length (by default 0.3-0.8 s).

        Returns (is_pause, longest_silence_seconds).  Trailing samples that
        do not fill a whole frame are ignored; fewer than two whole frames
        yields ``(False, 0.0)``.
        """
        # frame_size can be 0 for a tiny sample_rate * frame_duration_ms.
        if len(audio) == 0 or self.frame_size <= 0:
            return False, 0.0
        n_frames = len(audio) // self.frame_size
        if n_frames < 2:
            return False, 0.0
        frame_dur = self.frame_duration_ms / 1000.0
        # Count silent frames as integers and convert to seconds once.
        # Subtracting float timestamps instead makes the measured length of
        # an N-frame silence depend on where it starts, which can flip the
        # result right at the min/max boundaries.
        longest_run = 0
        current_run = 0
        for i in range(n_frames):
            frame = audio[i * self.frame_size:(i + 1) * self.frame_size]
            if self.detect_speech(frame):
                current_run = 0
            else:
                current_run += 1
                longest_run = max(longest_run, current_run)
        max_silence = longest_run * frame_dur
        is_pause = min_pause_s <= max_silence <= max_pause_s
        return is_pause, max_silence
class WhisperASR:
    """
    Whisper ASR backend (openai-whisper or faster-whisper). Transcribes user
    speech to text. Falls back to VAD-only mode if no Whisper backend is
    installed or the model fails to load.
    """

    # Short acknowledgements that do not count as taking the turn.
    _BACKCHANNEL_VOCAB = frozenset({
        "yeah", "yes", "yep", "yup", "mhm", "mm-hmm", "mm", "hmm",
        "uh-huh", "right", "sure", "ok", "okay", "i see", "got it",
        "makes sense", "true", "exactly", "wow", "oh", "ah",
    })

    def __init__(self, model_name: str = "base", device: Optional[str] = None):
        """Load the Whisper model if a backend is available.

        Args:
            model_name: model size / name handed to the backend loader.
            device: target device.  ``None`` means "cuda if torch reports it,
                else cpu" for openai-whisper and "auto" for faster-whisper.
        """
        self.model_name = model_name
        self.mode = ASRMode.WHISPER if WHISPER_AVAILABLE else ASRMode.VAD_ONLY
        self._model = None
        self._backend = WHISPER_BACKEND if WHISPER_AVAILABLE else None
        self.vad = EnergyVAD()
        if self.mode == ASRMode.WHISPER:
            try:
                logger.info("[WhisperASR] loading model '%s' via %s...", model_name, self._backend)
                if self._backend == "openai-whisper":
                    device = device or ("cuda" if _torch_cuda_available() else "cpu")
                    self._model = whisper.load_model(model_name, device=device)
                elif self._backend == "faster-whisper":
                    # faster-whisper uses model size names like "base", "small", etc.
                    # and downloads automatically from HuggingFace.
                    # The requested device must be forwarded, otherwise the
                    # compute type is chosen for a device the model is not on.
                    fw_device = device or "auto"
                    compute_type = "float16" if fw_device == "cuda" else "int8"
                    self._model = WhisperModel(model_name, device=fw_device,
                                               compute_type=compute_type)
                logger.info("[WhisperASR] model loaded (backend=%s)", self._backend)
            except Exception as e:
                # Best effort: any load failure degrades to VAD-only.
                logger.warning("[WhisperASR] failed to load Whisper (%s); falling back to VAD-only", e)
                self.mode = ASRMode.VAD_ONLY
                self._model = None
        else:
            logger.warning("[WhisperASR] whisper not installed; using VAD-only mode")

    @staticmethod
    def _logprob_to_confidence(avg_logprob: float) -> float:
        """Map an average log-probability to [0, 1] as ``logprob + 1``, clamped."""
        return max(0.0, min(1.0, avg_logprob + 1.0))

    def _transcribe_with_model(self, audio: np.ndarray) -> Tuple[str, float]:
        """Run the loaded backend on ``audio`` and return ``(text, confidence)``.

        Confidence is 0.0 when the backend returns no segments.

        Raises:
            ValueError: if the configured backend name is not recognised.
        """
        if self._backend == "openai-whisper":
            result = self._model.transcribe(audio, fp16=False, language="en")
            text = result.get("text", "").strip()
            logprobs = [s.get("avg_logprob", -1) for s in result.get("segments", [])]
        elif self._backend == "faster-whisper":
            segments_iter, _info = self._model.transcribe(audio, language="en", beam_size=1)
            segments_list = list(segments_iter)
            text = " ".join(s.text.strip() for s in segments_list).strip()
            # faster-whisper segments expose `avg_logprob`.
            logprobs = [s.avg_logprob for s in segments_list]
        else:
            raise ValueError(f"unknown Whisper backend: {self._backend!r}")
        if not logprobs:
            return text, 0.0
        return text, self._logprob_to_confidence(float(np.mean(logprobs)))

    def transcribe(self, audio: np.ndarray, sample_rate: int = 16000) -> TranscriptSegment:
        """
        Transcribe audio to text. Returns a TranscriptSegment.

        In VAD-only mode (or if the model call fails) the text is
        "[speech detected]" when the energy VAD fires and "" otherwise,
        with confidence 0.0.

        NOTE(review): ``sample_rate`` is only used to back-date
        ``start_time``; the audio is handed to the model unchanged (no
        resampling or down-mixing) — confirm callers pass what the backend
        expects.
        """
        if audio.dtype != np.float32:
            audio = audio.astype(np.float32)
        if audio.size == 0:
            now = time.time()
            return TranscriptSegment(text="", start_time=now, end_time=now, confidence=0.0)
        if self.mode == ASRMode.WHISPER and self._model is not None:
            try:
                text, confidence = self._transcribe_with_model(audio)
            except Exception as e:
                # Best effort: fall through to the VAD answer below.
                logger.warning("[WhisperASR] transcription failed: %s", e)
            else:
                end_time = time.time()
                return TranscriptSegment(
                    text=text,
                    start_time=end_time - len(audio) / sample_rate,
                    end_time=end_time,
                    confidence=confidence,
                )
        # VAD-only fallback
        is_speech = self.vad.detect_speech(audio)
        end_time = time.time()
        return TranscriptSegment(
            text="[speech detected]" if is_speech else "",
            start_time=end_time - len(audio) / sample_rate,
            end_time=end_time,
            confidence=0.0,
        )

    def is_backchannel_text(self, text: str) -> bool:
        """Check if transcribed text is a backchannel ('yeah', 'mm-hmm', etc.).

        Matching ignores case, surrounding whitespace and surrounding
        ``.?!,`` punctuation; the whole utterance must be a backchannel.
        """
        if not text:
            return False
        text_lower = text.lower().strip().strip(".?!,")
        return text_lower in self._BACKCHANNEL_VOCAB
def _torch_cuda_available() -> bool:
"""Check if torch + CUDA are available."""
try:
import torch
return torch.cuda.is_available()
except ImportError:
return False
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 4 — TTS Layer (Coqui XTTS + Procedural fallback)
# ═══════════════════════════════════════════════════════════════════════════
class ProceduralFormantTTS:
"""
Fallback TTS using formant synthesis. Produces understandable but
robotic speech. Used when Coqui XTTS is not available.
"""
SAMPLE_RATE: int = 22050
# Phoneme β†’ (type, duration, formant_vowel_or_noise)
PHONEME_MAP: Dict[str, Tuple[str, float, str]] = {
"a": ("vowel", 0.10, "ah"), "e": ("vowel", 0.10, "eh"),
"i": ("vowel", 0.10, "ee"), "o": ("vowel", 0.10, "oh"),
"u": ("vowel", 0.10, "oo"),
"b": ("plosive", 0.05, ""), "p": ("plosive", 0.05, ""),
"t": ("plosive", 0.05, ""), "d": ("plosive", 0.05, ""),
"k": ("plosive", 0.05, ""), "g": ("plosive", 0.05, ""),
"s": ("fricative", 0.12, ""), "z": ("fricative", 0.12, ""),
"f": ("fricative", 0.10, ""), "v": ("fricative", 0.10, ""),
"h": ("fricative", 0.08, ""),
"m": ("nasal", 0.08, ""), "n": ("nasal", 0.08, ""),
"l": ("approximant", 0.07, ""), "r": ("approximant", 0.07, ""),
"w": ("approximant", 0.07, ""), "y": ("approximant", 0.07, ""),
}
VOWEL_FORMANTS: Dict[str, Dict] = {
"ah": {"F1": 730, "F2": 1090, "F3": 2440, "bw": [60, 80, 120]},
"eh": {"F1": 530, "F2": 1840, "F3": 2480, "bw": [50, 70, 110]},
"ee": {"F1": 270, "F2": 2290, "F3": 3010, "bw": [40, 60, 100]},
"oh": {"F1": 570, "F2": 840, "F3": 2410, "bw": [55, 75, 115]},
"oo": {"F1": 300, "F2": 870, "F3": 2240, "bw": [45, 65, 105]},
}
def __init__(self, sample_rate: int = 22050):
    """Create the fallback synthesizer.

    Args:
        sample_rate: output rate in Hz; also handed to the non-verbal
            synthesizer created here.
    """
    self._nonverbal = ProceduralNonVerbalSynth(sample_rate)
    self.sample_rate = sample_rate
def synthesize(self, text: str, prosody: ProsodyParams) -> np.ndarray:
"""Synthesize text to speech using formant synthesis."""
if not text.strip():
return np.zeros(0, dtype=np.float32)
# Decompose text into phonemes
frames = self._text_to_phonemes(text)
if not frames:
return np.zeros(0, dtype=np.float32)
# Synthesize each phoneme
chunks = []
for ptype, duration, vowel_or_noise in frames:
if ptype == "pause":
chunks.append(np.zeros(int(self.sample_rate * duration), dtype=np.float32))
elif ptype == "vowel":
chunks.append(self._synth_vowel(vowel_or_noise, duration, prosody))
elif ptype == "plosive":
chunks.append(self._synth_plosive(duration, prosody))
elif ptype == "fricative":
chunks.append(self._synth_fricative(duration, prosody))
elif ptype == "nasal":
chunks.append(self._synth_nasal(duration, prosody))
elif ptype == "approximant":
chunks.append(self._synth_approximant(duration, prosody))
audio = np.concatenate(chunks) if chunks else np.zeros(0, dtype=np.float32)
# Apply prosody modifications
audio = self._apply_prosody(audio, prosody)
return audio.astype(np.float32)
def _text_to_phonemes(self, text: str) -> List[Tuple[str, float, str]]:
"""Simple grapheme-to-phoneme: one char β†’ one phoneme."""
frames = []
text = text.lower()
i = 0
while i < len(text):
char = text[i]
if char in self.PHONEME_MAP:
ptype, dur, vowel = self.PHONEME_MAP[char]
frames.append((ptype, dur, vowel))
elif char == " ":
frames.append(("pause", 0.08, ""))
elif char in ".,!?;:":
frames.append(("pause", 0.20, ""))
i += 1
return frames
def _synth_vowel(self, vowel: str, duration: float, prosody: ProsodyParams) -> np.ndarray:
n = int(self.sample_rate * duration)
if n < 2:
return np.zeros(max(2, n), dtype=np.float32)
t = np.linspace(0, duration, n, dtype=np.float64)
# Glottal source with pitch + vibrato
f0 = prosody.base_pitch_hz
vibrato = prosody.vibrato_depth * np.sin(2 * np.pi * 5.5 * t)
phase = 2.0 * np.pi * np.cumsum(f0 + vibrato) / self.sample_rate
source = np.sin(phase)
for h in range(2, 6):
source += (0.4 / h) * np.sin(phase * h)
source /= 3.0
# Formant filter
formants = self.VOWEL_FORMANTS.get(vowel, self.VOWEL_FORMANTS["ah"])
output = np.zeros(n)
for fi, (fn, bw) in enumerate(zip(
[formants["F1"], formants["F2"], formants["F3"]], formants["bw"]
)):
r = float(np.exp(-np.pi * bw / self.sample_rate))
a1 = -2 * r * math.cos(2 * math.pi * fn / self.sample_rate)
a2 = r * r
gain = (1 - r) * math.sqrt(max(0, 1 - 2 * r * math.cos(2 * math.pi * fn / self.sample_rate) + r * r))
filtered = np.zeros(n)
for i in range(2, n):
filtered[i] = gain * source[i] - a1 * filtered[i - 1] - a2 * filtered[i - 2]
formant_gains = [1.0, 0.6, 0.3]
output += filtered * formant_gains[fi]
# Add breathiness
noise = np.random.normal(0, prosody.breathiness, n)
output += noise * 0.3
# Envelope
attack = min(int(0.015 * self.sample_rate), n // 4)
release = min(int(0.025 * self.sample_rate), n // 4)
env = np.ones(n)
if attack > 0:
env[:attack] = np.linspace(0, 1, attack)
if release > 0:
env[-release:] = np.linspace(1, 0, release)
return (output * env * prosody.energy).astype(np.float32)
def _synth_plosive(self, duration: float, prosody: ProsodyParams) -> np.ndarray:
    """Render a plosive as a short Hann-windowed noise burst followed by silence.

    Args:
        duration: total length in seconds (burst plus trailing silence).
        prosody: supplies ``energy``, used as a linear gain.

    Returns:
        float32 samples of length ``int(sample_rate * duration)`` (never
        negative). The burst lasts about 8 ms (at least two samples) but is
        never longer than the buffer itself.
    """
    n = max(0, int(self.sample_rate * duration))
    audio = np.zeros(n, dtype=np.float32)
    # Clamp the burst to the buffer: previously a buffer of 0 or 1 samples
    # still asked for a 2-sample burst and the slice assignment raised.
    burst_len = min(max(2, int(0.008 * self.sample_rate)), n)
    if burst_len > 0:
        audio[:burst_len] = np.random.normal(0, 1, burst_len) * np.hanning(burst_len)
    # Cast explicitly so the dtype matches the other phoneme synthesizers even
    # when prosody.energy is a NumPy float64 scalar.
    return (audio * prosody.energy * 0.5).astype(np.float32)
def _synth_fricative(self, duration: float, prosody: ProsodyParams) -> np.ndarray:
    """Render a fricative: white noise passed through the 3000-7000 Hz band filter.

    The filtered noise is scaled by ``prosody.energy * 0.3`` and returned as
    float32.
    """
    sample_count = int(self.sample_rate * duration)
    white = np.random.normal(0, 1, sample_count)
    hiss = self._bandpass_simple(white, 3000, 7000)
    scaled = hiss * prosody.energy * 0.3
    return scaled.astype(np.float32)
def _synth_nasal(self, duration: float, prosody: ProsodyParams) -> np.ndarray:
    """Render a nasal: a pure tone at 0.8x the base pitch, low-passed at 1500 Hz.

    The result is scaled by ``prosody.energy * 0.4`` and returned as float32.
    """
    sample_count = int(self.sample_rate * duration)
    timeline = np.linspace(0, duration, sample_count, dtype=np.float64)
    nasal_f0 = prosody.base_pitch_hz * 0.8
    hum = np.sin(2 * np.pi * nasal_f0 * timeline)
    muffled = self._lowpass_simple(hum, 1500)
    return (muffled * prosody.energy * 0.4).astype(np.float32)
def _synth_approximant(self, duration: float, prosody: ProsodyParams) -> np.ndarray:
    """Render an approximant: a tone at the base pitch with a little added noise.

    The tone has amplitude 0.7; Gaussian noise (std-dev 0.2) is mixed in at
    0.3. The sum is scaled by ``prosody.energy * 0.35`` and returned as
    float32.
    """
    sample_count = int(self.sample_rate * duration)
    timeline = np.linspace(0, duration, sample_count, dtype=np.float64)
    tone = np.sin(2 * np.pi * prosody.base_pitch_hz * timeline) * 0.7
    grit = np.random.normal(0, 0.2, sample_count)
    mixed = tone + grit * 0.3
    return (mixed * prosody.energy * 0.35).astype(np.float32)
def _apply_prosody(self, audio: np.ndarray, prosody: ProsodyParams) -> np.ndarray:
    """Apply utterance-wide colouring, then peak-normalise to 0.85.

    Steps, in order:
      1. Warmth above 0.5 mixes in an 800 Hz low-passed copy of the signal,
         weighted by ``(warmth - 0.5) * 0.5``.
      2. A positive ``pitch_variance`` applies a slow 2 Hz sinusoidal gain
         wobble of depth ``pitch_variance * 0.05``. This is a deterministic
         amplitude modulation; the pitch itself is not changed here.
      3. The result is scaled so its absolute peak is 0.85 (skipped for an
         all-zero signal).

    Empty input is returned untouched.
    """
    if len(audio) == 0:
        return audio

    if prosody.warmth > 0.5:
        lows = self._lowpass_simple(audio, 800)
        audio = audio + lows * (prosody.warmth - 0.5) * 0.5

    if prosody.pitch_variance > 0:
        sample_idx = np.arange(len(audio))
        wobble = np.sin(2 * np.pi * 2.0 * sample_idx / self.sample_rate)
        audio = audio * (1.0 + prosody.pitch_variance * 0.05 * wobble)

    peak = float(np.max(np.abs(audio)))
    if peak > 0:
        audio = audio / peak * 0.85
    return audio
def _lowpass_simple(self, signal: np.ndarray, cutoff_hz: float) -> np.ndarray:
    """One-pole (RC-style) low-pass filter, evaluated sample by sample.

    The smoothing factor is ``alpha = dt / (rc + dt)`` with
    ``rc = 1 / (2*pi*cutoff_hz)`` and ``dt = 1 / sample_rate``. The first
    output is ``signal[0] * alpha``; each later output moves from the previous
    output toward the current input by ``alpha``.

    Returns an array shaped and typed like ``signal``; empty input is
    returned as-is.
    """
    total = len(signal)
    if total == 0:
        return signal
    rc = 1.0 / (2 * math.pi * cutoff_hz)
    dt = 1.0 / self.sample_rate
    alpha = dt / (rc + dt)
    smoothed = np.zeros_like(signal)
    smoothed[0] = signal[0] * alpha
    for idx in range(1, total):
        # Read the previous value back from the array so any dtype rounding
        # applied on store is part of the recursion.
        prev = smoothed[idx - 1]
        smoothed[idx] = prev + alpha * (signal[idx] - prev)
    return smoothed
def _bandpass_simple(self, signal: np.ndarray, low_hz: float, high_hz: float) -> np.ndarray:
    """Crude band-pass built from two one-pole low-pass passes.

    The signal is first low-passed at ``high_hz``. That result is low-passed
    again at ``low_hz`` and the second result is subtracted from the first,
    which removes the content below ``low_hz`` (high-pass by subtraction).
    """
    upper_limited = self._lowpass_simple(signal, high_hz)
    below_low = self._lowpass_simple(upper_limited, low_hz)
    return upper_limited - below_low
class CoquiXTTSBackend:
    """
    Coqui XTTS neural TTS backend with optional voice cloning.

    If the Coqui package is unavailable, or the model fails to load, the
    backend switches permanently to ``ProceduralFormantTTS``. If a single
    synthesis call fails, only that utterance is rendered by the fallback.
    """

    def __init__(self, model_name: str = "tts_models/multilingual/multi-dataset/xtts_v2",
                 speaker_wav: Optional[str] = None,
                 language: str = "en"):
        """
        Args:
            model_name: Coqui model identifier passed to ``CoquiTTS``.
            speaker_wav: optional reference recording for voice cloning; when
                falsy, a named built-in speaker is requested instead.
            language: language code forwarded to the model on every call.
        """
        self.model_name = model_name
        self.speaker_wav = speaker_wav
        self.language = language
        self.mode = TTSMode.COQUI_XTTS if COQUI_TTS_AVAILABLE else TTSMode.PROCEDURAL
        self._model = None
        # Always built, so a per-utterance failure can fall back immediately.
        self._fallback = ProceduralFormantTTS()
        if self.mode == TTSMode.COQUI_XTTS:
            try:
                logger.info("[CoquiXTTS] loading model '%s'...", model_name)
                self._model = CoquiTTS(model_name)
                logger.info("[CoquiXTTS] model loaded")
            except Exception as e:
                # Deliberate best-effort: any load failure degrades to the
                # procedural synthesizer instead of aborting start-up.
                logger.warning("[CoquiXTTS] failed to load (%s); falling back to procedural", e)
                self.mode = TTSMode.PROCEDURAL
                self._model = None
        else:
            logger.warning("[CoquiXTTS] TTS package not installed; using procedural formant fallback")

    def synthesize(self, text: str, prosody: ProsodyParams) -> np.ndarray:
        """
        Synthesize ``text`` to mono float32 audio.

        Blank text yields an empty array. On the neural path the output is
        peak-normalised and then scaled by ``prosody.energy`` (see
        ``_apply_prosody``); no pitch or rate change is applied here.

        NOTE(review): the original docstring stated the output is at 22050 Hz;
        the neural model's native rate is not visible in this file — confirm.
        """
        if not text.strip():
            return np.zeros(0, dtype=np.float32)
        if self.mode == TTSMode.COQUI_XTTS and self._model is not None:
            try:
                kwargs = {"text": text, "language": self.language}
                if self.speaker_wav:
                    kwargs["speaker_wav"] = self.speaker_wav
                else:
                    # NOTE(review): confirm this name exists in the loaded
                    # model's speaker list; an unknown speaker would make every
                    # call fall through to the procedural fallback.
                    kwargs["speaker"] = "Ana NeP"
                wav = self._model.tts(**kwargs)
                audio = np.array(wav, dtype=np.float32)
                return self._apply_prosody(audio, prosody)
            except Exception as e:
                logger.warning("[CoquiXTTS] synthesis failed (%s); using fallback for this utterance", e)
        return self._fallback.synthesize(text, prosody)

    def _apply_prosody(self, audio: np.ndarray, prosody: ProsodyParams) -> np.ndarray:
        """
        Peak-normalise Coqui output to 0.9, then apply ``prosody.energy`` as gain.

        The order matters: scaling by energy *before* normalising (as the code
        previously did) is cancelled by the normalisation, so every utterance
        came out at the same level regardless of energy. Negative energy is
        treated as 0, and the result is clipped to [-1, 1] so energy above 1
        cannot overflow the valid sample range.

        Empty input is returned unchanged.
        """
        if len(audio) == 0:
            return audio
        peak = float(np.max(np.abs(audio)))
        if peak > 0:
            audio = audio / peak * 0.9
        audio = audio * max(0.0, float(prosody.energy))
        return np.clip(audio, -1.0, 1.0).astype(np.float32)
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 5 β€” Interrupt Detector (smart classification)
# ═══════════════════════════════════════════════════════════════════════════
class InterruptDetector:
    """
    Classifies user speech detected during Nima's turn into:

      - REAL_INTERRUPT: user is taking the turn
      - NON_VERBAL: laughter/sigh/gasp/etc. — not an interrupt
      - BACKCHANNEL: "yeah", "mm-hmm" — not an interrupt
      - COLLABORATIVE_TURN_SHARING: user overlapping the very end of
        Nima's utterance — not an interrupt
      - SILENCE: no speech detected

    Not all user speech is treated as an interrupt: backchannels and
    non-verbal expressions are natural parts of conversation and should
    not make Nima stop.
    """

    # Backchannel vocabulary — short utterances that signal "I'm listening".
    BACKCHANNEL_VOCAB: Set[str] = {
        "yeah", "yes", "yep", "yup", "mhm", "mm-hmm", "mm", "hmm",
        "uh-huh", "right", "sure", "ok", "okay", "i see", "got it",
        "makes sense", "true", "exactly", "wow", "oh", "ah", "ha",
    }

    # Duration thresholds (seconds).
    BACKCHANNEL_MAX_DURATION: float = 0.8     # shorter than this = possible backchannel
    NON_VERBAL_MAX_DURATION: float = 1.5
    REAL_INTERRUPT_MIN_DURATION: float = 1.0
    # Collaborative turn-sharing: speech in the last 300 ms of Nima's utterance.
    COLLABORATIVE_WINDOW_S: float = 0.3

    def __init__(self, asr: WhisperASR):
        """Keep the ASR engine and reuse its VAD (``asr.vad``)."""
        self._asr = asr
        self._vad = asr.vad

    def classify(self, audio: np.ndarray, sample_rate: int = 16000,
                 nima_text_progress: float = 1.0,
                 nima_speech_remaining_s: float = 0.0) -> InterruptClassification:
        """
        Classify a segment of user audio captured during Nima's turn.

        Checks run in this order and the first match wins: empty audio, VAD
        silence, backchannel (short and matching text or spectrum),
        non-verbal expression (spectral signature), collaborative
        turn-sharing (near the end of Nima's utterance), otherwise a real
        interrupt whose confidence grows with duration (capped at 1.0 at 2 s).

        Args:
            audio: user audio (float32, mono)
            sample_rate: audio sample rate
            nima_text_progress: 0.0 = Nima just started, 1.0 = Nima finished
            nima_speech_remaining_s: seconds left in Nima's current utterance

        Returns:
            InterruptClassification with the verdict.
        """
        if len(audio) == 0:
            return InterruptClassification(
                interrupt_type=InterruptType.SILENCE,
                reason="no audio",
            )

        duration = len(audio) / sample_rate
        # NOTE(review): the return value is unused; the call is kept only in
        # case the VAD updates internal state here — confirm and remove.
        self._vad.compute_energy(audio)
        spectral = self._compute_spectral_features(audio, sample_rate)

        if not self._vad.detect_speech(audio):
            return InterruptClassification(
                interrupt_type=InterruptType.SILENCE,
                reason="below VAD threshold",
                duration_s=duration,
                spectral_features=spectral,
            )

        segment = self._asr.transcribe(audio, sample_rate)
        transcript = segment.text.strip().lower()

        # 1. Backchannel: short, and either the text or the spectrum matches.
        if duration < self.BACKCHANNEL_MAX_DURATION:
            if self._asr.is_backchannel_text(transcript) or self._is_backchannel_spectral(spectral):
                return InterruptClassification(
                    interrupt_type=InterruptType.BACKCHANNEL,
                    confidence=0.85,
                    reason=f"short ({duration:.2f}s) + backchannel vocab/spectral",
                    transcript=transcript,
                    duration_s=duration,
                    spectral_features=spectral,
                )

        # 2. Non-verbal expression (spectral signature).
        non_verbal_match = self._classify_non_verbal(spectral, duration)
        if non_verbal_match:
            return InterruptClassification(
                interrupt_type=InterruptType.NON_VERBAL,
                confidence=non_verbal_match[1],
                reason=f"non-verbal spectral match: {non_verbal_match[0]}",
                transcript=transcript,
                duration_s=duration,
                spectral_features=spectral,
            )

        # 3. Collaborative turn-sharing: short speech at the end of Nima's turn.
        if nima_text_progress > 0.7 and nima_speech_remaining_s < self.COLLABORATIVE_WINDOW_S:
            if duration < 1.5:
                return InterruptClassification(
                    interrupt_type=InterruptType.COLLABORATIVE_TURN_SHARING,
                    confidence=0.70,
                    reason=f"speech at end of Nima's turn (progress={nima_text_progress:.2f})",
                    transcript=transcript,
                    duration_s=duration,
                    spectral_features=spectral,
                )

        # 4. Otherwise: real interrupt. Longer speech = more confident.
        confidence = min(1.0, duration / 2.0)
        return InterruptClassification(
            interrupt_type=InterruptType.REAL_INTERRUPT,
            confidence=confidence,
            reason=f"real speech ({duration:.2f}s, progress={nima_text_progress:.2f})",
            transcript=transcript,
            duration_s=duration,
            spectral_features=spectral,
        )

    def _compute_spectral_features(self, audio: np.ndarray, sr: int) -> Dict[str, float]:
        """
        Compute the features used by the backchannel / non-verbal heuristics.

        Returns an empty dict for clips shorter than 256 samples. Otherwise
        the dict holds: ``centroid_hz``, ``rolloff_hz`` (85 % of cumulative
        magnitude), ``zcr`` (fraction of adjacent samples that change sign),
        ``energy`` (RMS), ``low_freq_ratio`` (share of spectral magnitude
        below 500 Hz), ``periodicity``, ``duration_s`` and ``energy_cv``
        (coefficient of variation of RMS over 20 ms frames).
        """
        if len(audio) < 256:
            return {}

        magnitude = np.abs(np.fft.rfft(audio.astype(np.float32)))
        freqs = np.fft.rfftfreq(len(audio), 1.0 / sr)

        # Spectral centroid (brightness).
        if magnitude.sum() > 0:
            centroid = float(np.sum(freqs * magnitude) / np.sum(magnitude))
        else:
            centroid = 0.0

        # Spectral rolloff: frequency below which 85 % of the magnitude lies.
        cumsum = np.cumsum(magnitude)
        if cumsum[-1] > 0:
            rolloff_idx = np.searchsorted(cumsum, 0.85 * cumsum[-1])
            rolloff = float(freqs[min(rolloff_idx, len(freqs) - 1)])
        else:
            rolloff = 0.0

        zcr = float(np.mean(np.abs(np.diff(np.sign(audio))) > 0))
        energy = float(np.sqrt(np.mean(audio ** 2)))
        low_mask = freqs < 500
        low_energy = float(np.sum(magnitude[low_mask]) / max(1e-10, np.sum(magnitude)))
        periodicity = self._estimate_periodicity(audio, sr)

        # Frame-energy variation separates burst-like sounds (laughter: bursts
        # and gaps, high variation) from sustained voicing (low variation).
        # frame_size is floored at 1 so a very low sample rate cannot cause a
        # division by zero.
        frame_size = max(1, int(sr * 0.02))  # 20 ms frames
        n_frames = max(1, len(audio) // frame_size)
        frames = (audio[k * frame_size:(k + 1) * frame_size] for k in range(n_frames))
        frame_energies = [
            float(np.sqrt(np.mean(frame ** 2))) for frame in frames if len(frame) > 0
        ]
        if len(frame_energies) >= 3:
            energy_mean = float(np.mean(frame_energies))
            energy_std = float(np.std(frame_energies))
            energy_cv = energy_std / max(1e-6, energy_mean)
        else:
            energy_cv = 0.0

        return {
            "centroid_hz": centroid,
            "rolloff_hz": rolloff,
            "zcr": zcr,
            "energy": energy,
            "low_freq_ratio": low_energy,
            "periodicity": periodicity,
            "duration_s": len(audio) / sr,
            "energy_cv": energy_cv,  # burst-like vs sustained
        }

    def _estimate_periodicity(self, audio: np.ndarray, sr: int) -> float:
        """
        Estimate periodicity in [0, 1] (0 = aperiodic, 1 = strongly periodic).

        The value is the largest normalised autocorrelation found at lags
        between 50 ms and 200 ms (repetition rates of 5-20 Hz, the laughter
        "ha" rate). Clips shorter than 50 ms, clips not longer than the
        200 ms maximum lag, and near-constant clips all yield 0.0.

        The autocorrelation is computed via FFT (zero-padded so it equals
        the linear autocorrelation), which is O(n log n) rather than the
        O(n^2) of a direct ``np.correlate(..., mode="full")``.
        """
        n = len(audio)
        if n < sr * 0.05:
            return 0.0
        min_lag = int(sr * 0.05)  # 50 ms
        max_lag = int(sr * 0.20)  # 200 ms
        if max_lag >= n or min_lag >= max_lag:
            return 0.0

        centered = np.asarray(audio, dtype=np.float64)
        centered = centered - np.mean(centered)
        if np.std(centered) < 1e-6:
            return 0.0

        # Pad to at least 2n-1 so circular correlation does not wrap around.
        nfft = 1 << (2 * n - 1).bit_length()
        spectrum = np.fft.rfft(centered, nfft)
        autocorr = np.fft.irfft(spectrum * np.conj(spectrum), nfft)[:n]
        if autocorr[0] <= 0:
            return 0.0

        peak = float(np.max(autocorr[min_lag:max_lag]) / autocorr[0])
        return max(0.0, min(1.0, peak))

    def _is_backchannel_spectral(self, spectral: Dict[str, float]) -> bool:
        """
        Return True when the features look like a short, voiced, smooth,
        low-centroid sound with non-trivial energy (an "mm-hmm" pattern).
        """
        if not spectral:
            return False
        energy = spectral.get("energy", 0.0)
        low_ratio = spectral.get("low_freq_ratio", 0.0)
        zcr = spectral.get("zcr", 0.5)
        periodicity = spectral.get("periodicity", 0.0)
        centroid = spectral.get("centroid_hz", 0.0)
        duration = spectral.get("duration_s", 1.0)

        if duration > 0.8:
            return False
        is_voiced = periodicity > 0.2 or low_ratio > 0.25
        is_smooth = zcr < 0.35
        is_low_centroid = centroid < 1500
        return is_voiced and is_smooth and is_low_centroid and energy > 0.01

    def _classify_non_verbal(self, spectral: Dict[str, float],
                             duration: float) -> Optional[Tuple[str, float]]:
        """
        Match the features against non-verbal expression signatures.

        Returns ``(expression_name, confidence)`` for the first rule that
        matches, or None. Rules are tried in the order laughter/giggle, sigh,
        gasp, groan, moan, click. Audio longer than 1.0 s with low frame-energy
        variation is treated as probable speech and never matches.
        """
        if not spectral:
            return None
        periodicity = spectral.get("periodicity", 0.0)
        centroid = spectral.get("centroid_hz", 0.0)
        energy = spectral.get("energy", 0.0)
        zcr = spectral.get("zcr", 0.0)
        low_ratio = spectral.get("low_freq_ratio", 0.0)
        energy_cv = spectral.get("energy_cv", 0.0)

        # Guard: sustained audio is likely real speech; let the caller fall
        # through to REAL_INTERRUPT.
        if duration > 1.0 and energy_cv < 0.3:
            return None

        # Laughter: burst-like (ha-ha-ha gaps), periodic, moderate energy.
        if energy_cv > 0.3 and periodicity > 0.1 and 0.3 < duration < 2.0 and energy > 0.03:
            if centroid > 1500:
                return ("laughter", 0.8)
            return ("giggle", 0.7)

        # Sigh: low periodicity, breathy (high centroid), short.
        if periodicity < 0.2 and 0.3 < duration < 1.0 and centroid > 1500 and energy > 0.02:
            return ("sigh", 0.65)

        # Gasp: very short, breathy, moderate energy.
        if duration < 0.35 and centroid > 1500 and energy > 0.03:
            return ("gasp", 0.75)

        # Groan: low centroid, voiced, short.
        if duration > 0.4 and duration < 0.8 and centroid < 1200 and periodicity > 0.2:
            return ("groan", 0.65)

        # Moan: mid centroid, strong low-frequency share, medium duration.
        if 0.5 < duration < 1.0 and 1000 < centroid < 2000 and low_ratio > 0.35:
            return ("moan", 0.6)

        # Click: very short with a high zero-crossing rate.
        if duration < 0.12 and zcr > 0.3:
            return ("click", 0.5)

        return None
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 6 β€” Backchannel Controller
# ═══════════════════════════════════════════════════════════════════════════
class BackchannelController:
    """
    Decides when to emit a backchannel while the user is speaking.

    Triggers (per user's spec):
      - ON_PAUSE: the pause detector reports a pause within
        [PAUSE_MIN_S, PAUSE_MAX_S] → soft verbal nod ("mm-hmm")
      - ON_EMOTION_SHIFT: the user's arousal jumps above its recent
        baseline by more than AROUSAL_SPIKE_THRESHOLD → non-verbal reaction

    At most one backchannel is emitted per MIN_BACKCHANNEL_INTERVAL_S.
    """

    MIN_BACKCHANNEL_INTERVAL_S: float = 1.5
    PAUSE_MIN_S: float = 0.3
    PAUSE_MAX_S: float = 0.8
    AROUSAL_SPIKE_THRESHOLD: float = 0.3  # +0.3 arousal = spike

    # Candidate verbal nods; one is picked at random per ON_PAUSE event.
    VERBAL_NODS: List[str] = ["mm-hmm", "yeah", "right", "i see", "mhm", "uh-huh"]

    # Emotion label → non-verbal reaction.
    # NOTE(review): `GASp` has unusual casing; the enum is defined outside this
    # section, so confirm the member name matches.
    EMOTION_REACTIONS: Dict[str, NonVerbalType] = {
        "surprise": NonVerbalType.GASp,
        "joy": NonVerbalType.LAUGHTER,
        "sadness": NonVerbalType.AWW,
        "fear": NonVerbalType.GASp,
        "anger": NonVerbalType.GROAN,
        "neutral": NonVerbalType.MM,
    }

    def __init__(self, tts: CoquiXTTSBackend, nonverbal_synth: ProceduralNonVerbalSynth,
                 sample_rate: int = 22050):
        """Store the two synthesizers and reset throttle / arousal tracking."""
        self._tts = tts
        self._nonverbal = nonverbal_synth
        self.sample_rate = sample_rate
        self._last_backchannel_time: float = 0.0
        self._last_arousal: float = 0.3
        self._arousal_history: Deque[float] = deque(maxlen=10)

    def should_backchannel(self, state: ConversationState,
                           audio: Optional[np.ndarray] = None) -> Optional[BackchannelEvent]:
        """
        Decide whether a backchannel should fire right now.

        Returns a BackchannelEvent when one should be emitted, else None.
        Nothing fires while throttled or when the phase is not USER_SPEAKING.
        The pause trigger is evaluated first (only when ``audio`` is given and
        non-empty); the arousal history is updated only when the pause trigger
        did not fire.
        """
        now = time.time()
        throttled = now - self._last_backchannel_time < self.MIN_BACKCHANNEL_INTERVAL_S
        if throttled or state.phase != ConversationPhase.USER_SPEAKING:
            return None

        # ── Trigger 1: ON_PAUSE ──
        if audio is not None and len(audio) > 0:
            pause_detector = EnergyVAD(sample_rate=16000)
            paused, pause_len = pause_detector.detect_pause(
                audio, self.PAUSE_MIN_S, self.PAUSE_MAX_S
            )
            if paused:
                nod = random.choice(self.VERBAL_NODS)
                nod_prosody = ProsodyParams(
                    base_pitch_hz=160, energy=0.4, warmth=0.8, breathiness=0.2,
                )
                nod_audio = self._tts.synthesize(nod, nod_prosody)
                nod_event = BackchannelEvent(
                    trigger=BackchannelTrigger.ON_PAUSE,
                    audio=nod_audio,
                    is_verbal=True,
                    label=nod,
                )
                self._last_backchannel_time = now
                logger.debug("[Backchannel] ON_PAUSE nod: '%s' (pause=%.2fs)", nod, pause_len)
                return nod_event

        # ── Trigger 2: ON_EMOTION_SHIFT ──
        arousal_now = state.user_emotion_arousal
        self._arousal_history.append(arousal_now)
        if len(self._arousal_history) < 3:
            return None

        # Baseline excludes the two most recent readings.
        baseline = float(np.mean(list(self._arousal_history)[:-2]))
        jump = arousal_now - baseline
        if not (jump > self.AROUSAL_SPIKE_THRESHOLD):
            return None

        emotion = self._classify_emotion_shift(state.user_emotion_valence, arousal_now)
        reaction = self.EMOTION_REACTIONS.get(emotion, NonVerbalType.MM)
        reaction_audio = self._nonverbal.synth(reaction, intensity=0.6)
        reaction_event = BackchannelEvent(
            trigger=BackchannelTrigger.ON_EMOTION_SHIFT,
            audio=reaction_audio,
            sample_rate=self._nonverbal.sample_rate,
            is_verbal=False,
            label=reaction.value,
        )
        self._last_backchannel_time = now
        logger.debug("[Backchannel] ON_EMOTION_SHIFT: %s (arousal %.2f→%.2f)",
                     reaction.value, baseline, arousal_now)
        return reaction_event

    def _classify_emotion_shift(self, valence: float, arousal: float) -> str:
        """Map (valence, arousal) to one of the EMOTION_REACTIONS keys."""
        very_aroused = arousal > 0.7
        if very_aroused and valence > 0.3:
            return "joy"
        if very_aroused and valence < -0.3:
            return "anger"
        aroused = arousal > 0.6
        if aroused and valence < -0.2:
            return "fear"
        if aroused:
            return "surprise"
        if valence < -0.3:
            return "sadness"
        return "neutral"
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 7 β€” Interruption Response
# ═══════════════════════════════════════════════════════════════════════════
class InterruptionResponse:
    """
    Chooses what Nima says when a real interrupt is detected.

    Instead of just stopping, Nima says one of:
      - "I'm sorry, were you saying something?" (early in Nima's utterance)
      - "Sorry, please go ahead."               (mid/late in Nima's utterance)
      - a minimal "go ahead" when the user sounds urgent (high arousal)

    A cooldown stops Nima from apologising on every interrupt.
    """

    # NOTE: these strings are handed to the speech synthesizer, so they must
    # contain real punctuation. The em dashes below were previously stored as
    # mis-decoded byte sequences.
    EARLY_RESPONSES: List[str] = [
        "I'm sorry, were you saying something?",
        "Oh, sorry — please, go ahead.",
        "My apologies, you were saying?",
    ]
    LATE_RESPONSES: List[str] = [
        "Sorry, please go ahead.",
        "Go right ahead — I can wait.",
        "Of course, after you.",
    ]
    URGENT_RESPONSES: List[str] = [
        "Of course, go ahead.",
        "Please, go on.",
        "I'm listening — go ahead.",
    ]

    # Don't apologize more than once every 30s.
    COOLDOWN_S: float = 30.0

    def __init__(self):
        self._last_response_time: float = 0.0
        self._response_count: int = 0

    def should_respond(self, classification: InterruptClassification) -> bool:
        """
        Return True when a spoken response is warranted.

        That requires a REAL_INTERRUPT verdict and at least COOLDOWN_S
        seconds since the last generated response.
        """
        if classification.interrupt_type != InterruptType.REAL_INTERRUPT:
            return False
        now = time.time()
        if now - self._last_response_time < self.COOLDOWN_S:
            return False
        return True

    def generate_response(self, classification: InterruptClassification,
                          nima_text_progress: float,
                          user_arousal: float = 0.3) -> str:
        """
        Pick the response text and start the cooldown.

        Args:
            classification: the interrupt classification (not inspected here)
            nima_text_progress: 0.0 = Nima just started, 1.0 = Nima almost done
            user_arousal: detected arousal level of the user's interrupt

        Returns:
            A random entry from URGENT_RESPONSES when ``user_arousal > 0.7``,
            otherwise from EARLY_RESPONSES when less than 30 % of the utterance
            was spoken, otherwise from LATE_RESPONSES.
        """
        self._last_response_time = time.time()
        self._response_count += 1

        if user_arousal > 0.7:
            return random.choice(self.URGENT_RESPONSES)
        if nima_text_progress < 0.3:
            return random.choice(self.EARLY_RESPONSES)
        return random.choice(self.LATE_RESPONSES)
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 7.5 β€” v2.0.0 "MIND THROUGH VOICE" MODULES
# ═══════════════════════════════════════════════════════════════════════════
#
# These modules add the layers that separate a synthesizer from a voice
# with a mind behind it: adaptive prosody, micro-intonation, affective
# mirroring, somatic feedback, episodic memory, narrative continuity,
# singing interjections, dynamic laughter, and refined interrupt handling.
# ── CONVERSATIONAL FLOW ─────────────────────────────────────────────────────
class AdaptiveProsodyShaper:
    """
    Adjusts pitch, rate, energy and timbre to fit an emotional context:
    softer when empathetic, brighter when excited.

    An emotion label plus valence, arousal and empathy level are turned
    into concrete changes layered on a copy of the base ProsodyParams.
    """

    # Emotion archetype → multipliers / additive offsets.
    EMOTION_PROFILES: Dict[str, Dict[str, float]] = {
        "empathetic": {
            "pitch_mult": 0.92, "rate_mult": 0.88, "warmth_add": 0.20,
            "breathiness_add": 0.08, "energy_mult": 0.85,
        },
        "excited": {
            "pitch_mult": 1.18, "rate_mult": 1.12, "warmth_add": 0.05,
            "breathiness_add": -0.03, "energy_mult": 1.25,
        },
        "contemplative": {
            "pitch_mult": 0.96, "rate_mult": 0.82, "warmth_add": 0.10,
            "breathiness_add": 0.05, "energy_mult": 0.90,
        },
        "concerned": {
            "pitch_mult": 0.88, "rate_mult": 0.90, "warmth_add": 0.15,
            "breathiness_add": 0.10, "energy_mult": 0.80,
        },
        "joyful": {
            "pitch_mult": 1.10, "rate_mult": 1.08, "warmth_add": 0.12,
            "breathiness_add": -0.02, "energy_mult": 1.15,
        },
        "vulnerable": {
            "pitch_mult": 0.85, "rate_mult": 0.85, "warmth_add": 0.25,
            "breathiness_add": 0.15, "energy_mult": 0.70,
        },
        "assertive": {
            "pitch_mult": 0.98, "rate_mult": 1.05, "warmth_add": -0.05,
            "breathiness_add": -0.05, "energy_mult": 1.20,
        },
    }

    @staticmethod
    def _clamp(value: float, low: float, high: float) -> float:
        """Limit ``value`` to the closed range [low, high]."""
        return float(min(high, max(low, value)))

    def shape(self, base_prosody: ProsodyParams,
              emotion: str = "neutral",
              valence: float = 0.0,
              arousal: float = 0.3,
              empathy_level: float = 0.5) -> ProsodyParams:
        """
        Build a shaped copy of ``base_prosody``; the input is not modified.

        Adjustments are applied in this order: emotion profile (if the label
        is known), valence, arousal, empathy. Note that ``pitch_variance`` is
        recomputed from valence alone, replacing the base value.

        Args:
            base_prosody: the starting prosody params
            emotion: emotion label (empathetic, excited, contemplative, etc.)
            valence: [-1, 1] emotional valence
            arousal: [0, 1] emotional arousal
            empathy_level: [0, 1] how empathetic the response should be

        Returns:
            New ProsodyParams with ``emotional_tone`` set to ``emotion``.
        """
        shaped = ProsodyParams(
            base_pitch_hz=base_prosody.base_pitch_hz,
            speech_rate_wpm=base_prosody.speech_rate_wpm,
            energy=base_prosody.energy,
            breathiness=base_prosody.breathiness,
            warmth=base_prosody.warmth,
            vibrato_depth=base_prosody.vibrato_depth,
            pitch_variance=base_prosody.pitch_variance,
            emotional_tone=emotion,
        )

        # Emotion archetype: unknown labels leave the copy untouched.
        deltas = self.EMOTION_PROFILES.get(emotion)
        if deltas:
            shaped.base_pitch_hz = shaped.base_pitch_hz * deltas.get("pitch_mult", 1.0)
            shaped.speech_rate_wpm = shaped.speech_rate_wpm * deltas.get("rate_mult", 1.0)
            shaped.energy = shaped.energy * deltas.get("energy_mult", 1.0)
            shaped.warmth = self._clamp(
                shaped.warmth + deltas.get("warmth_add", 0.0), 0.0, 1.0
            )
            shaped.breathiness = self._clamp(
                shaped.breathiness + deltas.get("breathiness_add", 0.0), 0.0, 0.5
            )

        # Valence → pitch variance (positive = more expressive).
        shaped.pitch_variance = self._clamp(0.15 + valence * 0.10, 0.05, 0.4)

        # Arousal → energy (capped at 1.0) and speaking rate.
        shaped.energy = float(min(1.0, shaped.energy * (0.7 + arousal * 0.6)))
        shaped.speech_rate_wpm = shaped.speech_rate_wpm * (0.9 + arousal * 0.3)

        # Empathy above the midpoint → warmer, breathier, slightly lower pitch.
        if empathy_level > 0.5:
            boost = (empathy_level - 0.5) * 2.0  # rescaled to [0, 1]
            shaped.warmth = float(min(1.0, shaped.warmth + 0.15 * boost))
            shaped.breathiness = float(min(0.5, shaped.breathiness + 0.05 * boost))
            shaped.base_pitch_hz = shaped.base_pitch_hz * (1.0 - 0.03 * boost)

        return shaped
class MicroIntonationInjector:
    """
    Adds small hesitations, breaths and emphasis cues that signal
    thoughtfulness or uncertainty.

    Event kinds produced:
      - "hesitation": a filler ("..." or "um") prepended to the text, with audio
      - "breath": after a word ending in "," or ".", with audio
      - "emphasis": a pitch bump marker on an emphasis word (no audio)
      - "uncertainty_drop": a trailing pitch-drop marker (no audio)
    """

    # Words that tend to receive emphasis.
    EMPHASIS_WORDS: Set[str] = {
        "really", "truly", "actually", "honestly", "important",
        "never", "always", "exactly", "absolutely", "indeed",
    }

    # Hesitation markers (fillers); only the first two are used by inject().
    HESITATIONS: List[str] = ["...", "um", "hmm", "well"]

    def __init__(self, sample_rate: int = 22050):
        self.sample_rate = sample_rate
        self._breath_synth = ProceduralNonVerbalSynth(sample_rate)

    def inject(self, text: str, prosody: ProsodyParams,
               thoughtfulness: float = 0.3,
               uncertainty: float = 0.2) -> Tuple[str, List[Dict[str, Any]]]:
        """
        Analyse ``text`` and attach micro-intonation events.

        Args:
            text: the input text
            prosody: current prosody params (used for the hesitation sound)
            thoughtfulness: [0, 1]; above 0.4 a leading hesitation may be
                added, above 0.3 breaths may follow commas/periods
            uncertainty: [0, 1]; above 0.5 a trailing pitch drop is added

        Returns:
            ``(modified_text, events)``. Each event is a dict with at least
            "type" and "position" (0.0-1.0 through the text). When the
            breath pass runs, the text is re-joined with single spaces.
        """
        modified = text
        events: List[Dict[str, Any]] = []

        # 1. Leading hesitation.
        if thoughtfulness > 0.4 and random.random() < thoughtfulness:
            filler = random.choice(self.HESITATIONS[:2])  # "..." or "um"
            modified = f"{filler} {modified}"
            pause_s = 0.1 + thoughtfulness * 0.15
            events.append({
                "type": "hesitation",
                "position": 0.0,
                "duration_s": pause_s,
                "audio": self._gen_hesitation_audio(pause_s, prosody),
            })

        # 2. Breaths after clause/sentence endings.
        if thoughtfulness > 0.3:
            breath_chance = thoughtfulness * 0.6
            tokens = modified.split()
            for ordinal, token in enumerate(tokens, start=1):
                if token.endswith((",", ".")) and random.random() < breath_chance:
                    events.append({
                        "type": "breath",
                        "position": ordinal / len(tokens),
                        "duration_s": 0.08,
                        "audio": self._breath_synth.synth(NonVerbalType.SIGH, intensity=0.2),
                    })
            modified = " ".join(tokens)

        # 3. Emphasis markers on key words.
        tokens = modified.split()
        token_total = max(1, len(tokens))
        for index, token in enumerate(tokens):
            if token.lower().strip(".,!?;:") in self.EMPHASIS_WORDS:
                events.append({
                    "type": "emphasis",
                    "position": index / token_total,
                    "word": token,
                    "pitch_bump": 30.0,  # Hz bump
                })

        # 4. Uncertainty → trailing pitch drop.
        if uncertainty > 0.5:
            events.append({
                "type": "uncertainty_drop",
                "position": 1.0,
                "pitch_drop": 20.0 * uncertainty,
            })

        return modified, events

    def _gen_hesitation_audio(self, duration: float, prosody: ProsodyParams) -> np.ndarray:
        """Generate a quiet low hum (0.7x base pitch) with linear fade-in/out."""
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        hum_f0 = prosody.base_pitch_hz * 0.7
        hum = np.sin(2 * np.pi * hum_f0 * t) * 0.3

        # 30 ms fade-in and 50 ms fade-out, each capped at a third of the clip.
        fade_in = min(int(0.03 * self.sample_rate), n // 3)
        fade_out = min(int(0.05 * self.sample_rate), n // 3)
        envelope = np.ones(n)
        if fade_in > 0:
            envelope[:fade_in] = np.linspace(0, 1, fade_in)
        if fade_out > 0:
            envelope[-fade_out:] = np.linspace(1, 0, fade_out)
        return (hum * envelope * 0.3).astype(np.float32)
class TurnTakingPredictor:
    """
    Estimates whether the user is about to finish speaking, so the system
    can take the floor smoothly instead of waiting for silence.

    Four cues each add a fixed amount to the score:
      - speech rate deceleration (+0.30)
      - pitch declination        (+0.25)
      - a long most-recent pause (+0.20)
      - a turn-end filler phrase (+0.25)
    """

    # Phrases that tend to close a turn; matched as substrings of the
    # lower-cased transcript.
    TURN_END_FILLERS: Set[str] = {
        "you know", "so yeah", "i think", "something like that",
        "that's about it", "yeah", "right", "anyway",
    }

    def __init__(self):
        self._speech_rate_history: Deque[float] = deque(maxlen=10)
        self._pitch_history: Deque[float] = deque(maxlen=10)
        self._pause_history: Deque[float] = deque(maxlen=5)

    def update(self, speech_rate: float, pitch: float, pause_duration: float):
        """Record one new observation of rate, pitch and pause length."""
        self._speech_rate_history.append(speech_rate)
        self._pitch_history.append(pitch)
        self._pause_history.append(pause_duration)

    def predict_turn_end_probability(self, transcript: str = "") -> float:
        """
        Return a score in [0, 1] that the user is about to finish.

        Rate and pitch cues compare the newest of the last three
        observations with the oldest of those three; the pause cue looks
        only at the newest pause once at least two have been recorded.
        """
        score = 0.0

        # 1. Speech rate fell by more than 20 % across the last three samples.
        rates = list(self._speech_rate_history)
        if len(rates) >= 3 and rates[-1] < rates[-3] * 0.8:
            score += 0.3

        # 2. Pitch fell by more than 10 % across the last three samples.
        pitches = list(self._pitch_history)
        if len(pitches) >= 3 and pitches[-1] < pitches[-3] * 0.9:
            score += 0.25

        # 3. Most recent pause longer than 500 ms.
        pauses = list(self._pause_history)
        if len(pauses) >= 2 and pauses[-1] > 0.5:
            score += 0.2

        # 4. Any turn-end filler present in the transcript.
        if transcript:
            lowered = transcript.lower()
            if any(filler in lowered for filler in self.TURN_END_FILLERS):
                score += 0.25

        return float(min(1.0, score))

    def should_take_floor(self, transcript: str = "") -> bool:
        """Return True when the turn-end score exceeds 0.6."""
        return self.predict_turn_end_probability(transcript) > 0.6
# ── EMOTIONAL & COGNITIVE GROUNDING ─────────────────────────────────────────
class AffectiveMirror:
    """
    Subtly reflects the user's emotional tone in the voice without
    mimicking it overtly.

    Mapping:
      - User calm      → slightly slower, warmer
      - User energetic → slightly faster, higher pitch, more energy
      - User concerned → lower pitch, breathier, warmer
      - User joyful    → more pitch variance, slightly higher pitch
    """

    @staticmethod
    def _label_for(user_valence: float, user_arousal: float) -> str:
        """Bucket (valence, arousal) into one emotion label; first match wins."""
        if user_arousal < 0.3 and abs(user_valence) < 0.3:
            return "calm"
        if user_arousal > 0.6 and user_valence > 0.3:
            return "energetic"
        if user_valence < -0.3:
            return "concerned"
        if user_valence > 0.4:
            return "joyful"
        return "neutral"

    def mirror(self, user_valence: float, user_arousal: float,
               base_prosody: ProsodyParams) -> Tuple[ProsodyParams, str]:
        """
        Build a copy of ``base_prosody`` nudged toward the user's state.

        Returns:
            (mirrored_prosody, emotion_label)
        """
        # NOTE(review): emotional_tone is not carried over from base_prosody,
        # so the copy gets the ProsodyParams default — confirm this is intended.
        mirrored = ProsodyParams(
            base_pitch_hz=base_prosody.base_pitch_hz,
            speech_rate_wpm=base_prosody.speech_rate_wpm,
            energy=base_prosody.energy,
            breathiness=base_prosody.breathiness,
            warmth=base_prosody.warmth,
            vibrato_depth=base_prosody.vibrato_depth,
            pitch_variance=base_prosody.pitch_variance,
        )

        emotion = self._label_for(user_valence, user_arousal)
        if emotion == "calm":
            mirrored.speech_rate_wpm = mirrored.speech_rate_wpm * 0.95
            mirrored.warmth = float(min(1.0, mirrored.warmth + 0.05))
        elif emotion == "energetic":
            mirrored.speech_rate_wpm = mirrored.speech_rate_wpm * 1.08
            mirrored.base_pitch_hz = mirrored.base_pitch_hz * 1.05
            mirrored.energy = float(min(1.0, mirrored.energy * 1.1))
        elif emotion == "concerned":
            mirrored.base_pitch_hz = mirrored.base_pitch_hz * 0.95
            mirrored.breathiness = float(min(0.3, mirrored.breathiness + 0.05))
            mirrored.warmth = float(min(1.0, mirrored.warmth + 0.10))
        elif emotion == "joyful":
            mirrored.pitch_variance = float(min(0.35, mirrored.pitch_variance + 0.08))
            mirrored.base_pitch_hz = mirrored.base_pitch_hz * 1.03
        # "neutral": the copy is returned unmodified.

        return mirrored, emotion
class SomaticFeedbackIntegrator:
    """
    Ties voice modulation to the system's "strain" and "energy" state,
    like a biological fatigue signal: under strain the voice becomes
    slightly slower, breathier and lower-pitched.

    The state is fed in through ``update_from_nima`` (strain and
    allostatic load) and applied with ``apply_somatic_modulation``.
    """

    def __init__(self):
        self._current_strain: float = 0.0
        self._current_energy: float = 1.0
        self._allostatic_load: float = 0.0

    def update_from_nima(self, strain: float, allostatic_load: float = 0.0):
        """
        Store the latest metrics.

        Strain is clamped to [0, 2] and allostatic load to [0, 1]. Energy is
        derived as ``1 - 0.3*strain - 0.2*load`` with a floor of 0.3.
        """
        strain_clamped = float(max(0.0, min(2.0, strain)))
        load_clamped = float(max(0.0, min(1.0, allostatic_load)))
        self._current_strain = strain_clamped
        self._allostatic_load = load_clamped
        self._current_energy = float(
            max(0.3, 1.0 - 0.3 * self._current_strain - 0.2 * self._allostatic_load)
        )

    def apply_somatic_modulation(self, prosody: ProsodyParams) -> ProsodyParams:
        """
        Return prosody adjusted for the current strain and allostatic load.

        When both values are below 0.1 the input object itself is returned;
        otherwise a modified copy is built and the input is left untouched.
        """
        relaxed = self._current_strain < 0.1 and self._allostatic_load < 0.1
        if relaxed:
            return prosody

        modulated = ProsodyParams(
            base_pitch_hz=prosody.base_pitch_hz,
            speech_rate_wpm=prosody.speech_rate_wpm,
            energy=prosody.energy,
            breathiness=prosody.breathiness,
            warmth=prosody.warmth,
            vibrato_depth=prosody.vibrato_depth,
            pitch_variance=prosody.pitch_variance,
            emotional_tone=prosody.emotional_tone,
        )

        # Strain (its effect saturates at 1.0) → lower pitch, slower, breathier.
        strain_factor = min(1.0, self._current_strain)
        modulated.base_pitch_hz = modulated.base_pitch_hz * (1.0 - 0.05 * strain_factor)
        modulated.speech_rate_wpm = modulated.speech_rate_wpm * (1.0 - 0.10 * strain_factor)
        modulated.breathiness = float(min(0.4, modulated.breathiness + 0.08 * strain_factor))

        # Allostatic load → reduced energy, a touch more warmth (self-soothing).
        load = self._allostatic_load
        modulated.energy = modulated.energy * (1.0 - 0.15 * load)
        modulated.warmth = float(min(1.0, modulated.warmth + 0.05 * load))
        return modulated

    @property
    def strain(self) -> float:
        """Current (clamped) strain value."""
        return self._current_strain

    @property
    def energy(self) -> float:
        """Energy level derived from the last ``update_from_nima`` call."""
        return self._current_energy
class EmpathyPhraseGenerator:
    """
    Produces short, contextual empathy inserts in place of generic nods.

    Rather than "mm-hmm", it returns phrases such as "That must feel tough"
    or "I get what you mean." The phrase pool is chosen from the user's
    emotion label, or inferred from valence/arousal when the label is
    "neutral".
    """

    # Phrase pools keyed by emotion label.
    EMPATHY_PHRASES: Dict[str, List[str]] = {
        "sadness": [
            "That sounds really hard.",
            "I can hear how much this weighs on you.",
            "That must feel tough.",
            "I'm sorry you're going through this.",
        ],
        "joy": [
            "That's wonderful to hear.",
            "I can feel your excitement.",
            "That sounds amazing.",
            "I love that for you.",
        ],
        "anger": [
            "That sounds frustrating.",
            "I can see why that would upset you.",
            "That would make me angry too.",
            "You have every right to feel that way.",
        ],
        "fear": [
            "That sounds scary.",
            "I can understand why you'd be worried.",
            "It makes sense that you're concerned.",
            "That's a lot to sit with.",
        ],
        "surprise": [
            "Oh wow.",
            "That's unexpected.",
            "I didn't see that coming either.",
            "Hmm, that's something.",
        ],
        "neutral": [
            "I hear you.",
            "I get what you mean.",
            "That makes sense.",
            "I'm following you.",
            "Go on, I'm listening.",
        ],
    }

    def generate(self, user_emotion: str = "neutral",
                 user_valence: float = 0.0,
                 user_arousal: float = 0.3) -> str:
        """
        Pick a random empathy phrase for the user's state.

        Args:
            user_emotion: emotion label; when "neutral", a label is
                inferred from valence/arousal instead.
            user_valence: valence used for the inference.
            user_arousal: arousal used for the inference.

        Returns:
            One phrase from the matching pool; unknown labels fall back
            to the "neutral" pool.
        """
        label = user_emotion
        if label == "neutral":
            if user_valence < -0.3:
                # Negative valence: high arousal reads as anger, else sadness.
                label = "anger" if user_arousal > 0.5 else "sadness"
            elif user_arousal > 0.6:
                # High arousal without negative valence: joy or surprise.
                label = "joy" if user_valence > 0.4 else "surprise"

        pool = self.EMPATHY_PHRASES.get(label)
        if pool is None:
            pool = self.EMPATHY_PHRASES["neutral"]
        return random.choice(pool)
# ── MEMORY & CONTINUITY ─────────────────────────────────────────────────────
def _new_voice_event_id() -> str:
    """Return a fresh id of the form ``ve_`` + 12 hex characters."""
    return f"ve_{uuid.uuid4().hex[:12]}"


@dataclass
class VoiceEvent:
    """
    One episodic voice event, as stored in MemPalace.

    Fields (in constructor order):
        event_id: generated "ve_<12 hex>" identifier.
        timestamp: creation time from time.time().
        speaker: "nima" or "user".
        text: what was said.
        audio_duration_s: length of the audio in seconds.
        prosody_snapshot: prosody values captured for the utterance.
        emotion / valence / arousal / strain: affective tags.
        conversation_phase: phase label at the time of the event.
        interrupt_count / backchannel_count: counters at the time of the event.
    """

    # Identity and timing
    event_id: str = field(default_factory=_new_voice_event_id)
    timestamp: float = field(default_factory=time.time)

    # Content
    speaker: str = "nima"
    text: str = ""
    audio_duration_s: float = 0.0
    prosody_snapshot: Dict[str, float] = field(default_factory=dict)

    # Affective tags
    emotion: str = "neutral"
    valence: float = 0.0
    arousal: float = 0.3
    strain: float = 0.0

    # Conversation context
    conversation_phase: str = "nima_speaking"
    interrupt_count: int = 0
    backchannel_count: int = 0
class VoiceEventMemoryBridge:
    """
    Stores every utterance as an episodic voice event with affective tags,
    so the system can later recall not just what was said but how.

    Events always go into a bounded in-memory buffer (last 500). When a
    palace object is supplied, each event is additionally forwarded to its
    ``store_episode`` method on a best-effort basis.
    """

    def __init__(self, palace: Any = None):
        """
        Args:
            palace: object exposing ``store_episode(...)`` (a NIMA
                MemoryPalace). If None, events live only in the local
                buffer (no persistence).
        """
        self._palace = palace
        self._local_events: Deque[VoiceEvent] = deque(maxlen=500)
        self._event_count = 0

    def store_voice_event(self, event: VoiceEvent) -> str:
        """
        Record a voice event locally and, if connected, in the palace.

        A palace failure is logged and swallowed so that memory problems
        never interrupt the voice pipeline.

        Returns:
            The event's ``event_id``.
        """
        self._local_events.append(event)
        self._event_count += 1

        if self._palace is not None:
            try:
                self._palace.store_episode(
                    processor_name=f"voice_{event.speaker}",
                    sensory_intensity=event.arousal,
                    # Equal blend of valence magnitude and arousal.
                    affective_weight=abs(event.valence) * 0.5 + event.arousal * 0.5,
                    score=event.strain,
                    valence=event.valence,
                    arousal=event.arousal,
                    novelty=0.3,  # fixed placeholder; could be computed from text novelty
                    input_text=event.text[:500],
                    content={
                        "speaker": event.speaker,
                        "audio_duration_s": event.audio_duration_s,
                        "prosody_snapshot": event.prosody_snapshot,
                        "emotion": event.emotion,
                        "conversation_phase": event.conversation_phase,
                        "interrupt_count": event.interrupt_count,
                        "backchannel_count": event.backchannel_count,
                        "event_type": "voice_event",
                    },
                )
            except Exception as e:
                logger.warning("[VoiceEventMemoryBridge] MemPalace store failed: %s", e)
        return event.event_id

    def recall_voice_events(self, speaker: Optional[str] = None,
                            emotion: Optional[str] = None,
                            limit: int = 5) -> List[VoiceEvent]:
        """
        Return the most recent buffered events, oldest first.

        Args:
            speaker: keep only events from this speaker (falsy = no filter).
            emotion: keep only events with this emotion (falsy = no filter).
            limit: maximum number of events to return; a non-positive
                limit yields an empty list.
        """
        # Guard first: a slice of [-0:] would return the whole buffer.
        if limit <= 0:
            return []
        results = [
            e for e in self._local_events
            if (not speaker or e.speaker == speaker)
            and (not emotion or e.emotion == emotion)
        ]
        return results[-limit:]

    def get_stats(self) -> Dict[str, Any]:
        """Return counters: total stored, currently buffered, palace attached."""
        return {
            "total_events": self._event_count,
            "buffered_events": len(self._local_events),
            "palace_connected": self._palace is not None,
        }
class NarrativeContinuityEngine:
    """
    References past conversation naturally, e.g. "Earlier you mentioned
    ... You sounded excited about it."

    Phrases are built from earlier *user* voice events fetched through the
    memory bridge, using the event's text for the topic and its emotion /
    valence for the descriptive word.
    """

    # Spoken templates; {topic} is always present, {emotion} is optional.
    CONTINUITY_TEMPLATES: List[str] = [
        "Earlier you mentioned {topic}. You sounded {emotion} about it.",
        "As you said before, {topic}. I remember how {emotion} you were.",
        "Going back to what you said about {topic} — you seemed {emotion}.",
        "I was thinking about what you said earlier, about {topic}.",
        "You mentioned {topic} earlier. That stayed with me.",
    ]

    def __init__(self, memory_bridge: VoiceEventMemoryBridge):
        self._memory = memory_bridge

    def generate_continuity_phrase(self, current_topic: str = "",
                                   current_emotion: str = "neutral") -> Optional[str]:
        """
        Build a continuity phrase that refers to a past user utterance.

        Args:
            current_topic: accepted for interface compatibility; not
                currently used for matching.
            current_emotion: accepted for interface compatibility; not
                currently used for matching.

        Returns:
            A formatted phrase, or None when no suitable past event exists
            (no events, or none besides the latest with text > 10 chars).
        """
        past_events = self._memory.recall_voice_events(speaker="user", limit=10)
        if not past_events:
            return None

        # Skip the most recent event (that is what the user just said) and
        # take the newest earlier one with enough text to quote.
        candidate = next(
            (ev for ev in reversed(past_events[:-1])
             if ev.text and len(ev.text) > 10),
            None,
        )
        if candidate is None:
            return None

        topic = self._extract_topic(candidate.text)
        emotion_word = self._emotion_to_word(candidate.emotion, candidate.valence)
        template = random.choice(self.CONTINUITY_TEMPLATES)
        return template.format(topic=topic, emotion=emotion_word)

    def _extract_topic(self, text: str) -> str:
        """
        Extract a short topic phrase from past text.

        Texts of up to five words are used whole; longer texts contribute
        a five-word window around the middle. Surrounding sentence
        punctuation is stripped in both cases so the templates do not
        render doubled punctuation such as "!.".
        """
        words = text.split()
        if len(words) > 5:
            start = max(0, len(words) // 2 - 2)
            words = words[start:start + 5]
        return " ".join(words).strip(".,!?")

    def _emotion_to_word(self, emotion: str, valence: float) -> str:
        """Map an emotion label (refined by valence) to a descriptive word."""
        mapping = {
            "joy": "excited" if valence > 0.5 else "positive",
            "sadness": "down" if valence < -0.3 else "thoughtful",
            "anger": "frustrated",
            "fear": "worried",
            "surprise": "surprised",
            "neutral": "engaged" if valence > 0 else "reflective",
        }
        # Unknown labels fall back to a safe, neutral descriptor.
        return mapping.get(emotion, "engaged")
# ── EXPRESSIVE EXTENSIONS ───────────────────────────────────────────────────
class SingingInterjectionModule:
    """
    Short melodic phrases (humming, tonal affirmations) woven into speech.

    Interjection types:
    - affirmation_hum: a rising "mm-mm" confirming what was said
    - thinking_hum: a contemplative "hmmm" while processing
    - transition_tone: a brief melodic bridge between topics
    - warmth_chord: a soft harmonic when expressing empathy

    Every synth method returns mono float32 audio at ``self.sample_rate``.
    """

    def __init__(self, sample_rate: int = 22050):
        self.sample_rate = sample_rate
        # Reused for its low-pass filter helper.
        self._nonverbal = ProceduralNonVerbalSynth(sample_rate)

    def _fade_envelope(self, n: int, attack_s: float, release_s: float,
                       release_floor: float, cap_divisor: int) -> np.ndarray:
        """
        Build an amplitude envelope of length ``n``.

        Ramps 0 -> 1 over the attack and 1 -> ``release_floor`` over the
        release. Each ramp is capped at ``n // cap_divisor`` samples, and a
        zero-length ramp is skipped entirely (a ``[-0:]`` slice would
        otherwise address the whole array).
        """
        env = np.ones(n)
        cap = n // cap_divisor
        attack = min(int(attack_s * self.sample_rate), cap)
        release = min(int(release_s * self.sample_rate), cap)
        if attack > 0:
            env[:attack] = np.linspace(0, 1, attack)
        if release > 0:
            env[-release:] = np.linspace(1, release_floor, release)
        return env

    def synth_affirmation_hum(self, duration: float = 0.4) -> np.ndarray:
        """A rising 'mm-mm' that affirms what was said."""
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        # Pitch glides linearly from 120 Hz up to 180 Hz.
        f0 = 120.0 + 60.0 * (t / duration)
        # Integrate frequency to get phase (needed for a time-varying pitch).
        phase = 2.0 * np.pi * np.cumsum(f0) / self.sample_rate
        source = np.sin(phase) * 0.5
        # Low-pass for a closed-mouth, nasal quality.
        audio = self._nonverbal._lowpass(source, 1500)
        env = self._fade_envelope(n, 0.05, 0.08, 0.3, 4)
        return (audio * env * 0.4).astype(np.float32)

    def synth_thinking_hum(self, duration: float = 0.6) -> np.ndarray:
        """A contemplative 'hmmm' while processing."""
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        # 140 Hz with a slow 3 Hz, +/-10 Hz waver.
        f0 = 140.0 + 10.0 * np.sin(2 * np.pi * 3.0 * t)
        phase = 2.0 * np.pi * np.cumsum(f0) / self.sample_rate
        source = np.sin(phase) * 0.4
        audio = self._nonverbal._lowpass(source, 1200)
        env = self._fade_envelope(n, 0.08, 0.12, 0.4, 4)
        return (audio * env * 0.35).astype(np.float32)

    def synth_transition_tone(self, duration: float = 0.5) -> np.ndarray:
        """
        A brief melodic bridge between topics.

        Plays four equal-length rising notes (220, 261, 293, 329 Hz), each
        with its own short fade in/out. Very short notes simply get no
        fade instead of raising.
        """
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        notes = (220, 261, 293, 329)  # A-C-D-E, pentatonic-ish
        note_duration = duration / len(notes)
        audio = np.zeros(n)
        for i, freq in enumerate(notes):
            start = int(i * note_duration * self.sample_rate)
            end = min(n, int((i + 1) * note_duration * self.sample_rate))
            note_len = end - start
            if note_len <= 0:
                continue
            # Each note restarts at phase 0 (uses the head of the time axis).
            note_audio = np.sin(2 * np.pi * freq * t[:note_len]) * 0.3
            note_env = self._fade_envelope(note_len, 0.02, 0.02, 0.3, 3)
            audio[start:end] = note_audio * note_env
        return (audio * 0.3).astype(np.float32)

    def synth_warmth_chord(self, duration: float = 0.8) -> np.ndarray:
        """A soft harmonic chord when expressing empathy."""
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        # Major triad C-E-G (130, 165, 196 Hz), root loudest; 2.2 is the
        # sum of the partial weights, keeping the mix within [-1, 1].
        chord = (np.sin(2 * np.pi * 130 * t) +
                 0.7 * np.sin(2 * np.pi * 165 * t) +
                 0.5 * np.sin(2 * np.pi * 196 * t)) / 2.2
        audio = self._nonverbal._lowpass(chord, 800)
        env = self._fade_envelope(n, 0.15, 0.25, 0.2, 3)
        return (audio * env * 0.25).astype(np.float32)
@dataclass
class MultimodalCue:
    """
    A non-audio cue paired with a voice event.

    Fields:
        cue_type: "haptic", "visual" or "light".
        intensity: cue strength (default 0.5).
        duration_s: cue length in seconds (default 0.3).
        pattern: "pulse", "wave" or "steady" (default "pulse").
        timestamp: creation time from time.time().
    """

    cue_type: str
    intensity: float = 0.5
    duration_s: float = 0.3
    pattern: str = "pulse"
    timestamp: float = field(default_factory=time.time)
class MultimodalCueEmitter:
    """
    Pairs voice with subtle haptic or visual signals, e.g. a soft
    vibration or light pulse when nodding.

    This class produces no audio. It creates MultimodalCue records, keeps
    the last 100 in a history buffer, and hands each one to an optional
    callback so an external system (robotics, display, haptic actuator)
    can act on it.
    """

    def __init__(self):
        self._cue_history: Deque[MultimodalCue] = deque(maxlen=100)
        self._cue_callback: Optional[Callable[[MultimodalCue], None]] = None

    def set_callback(self, callback: Callable[[MultimodalCue], None]):
        """Register the callable that receives each cue as it is emitted."""
        self._cue_callback = callback

    def emit_for_backchannel(self, is_verbal: bool, intensity: float = 0.5):
        """
        Emit a short haptic pulse for a backchannel.

        Args:
            is_verbal: accepted for interface compatibility; the cue is
                currently the same for verbal and non-verbal backchannels.
            intensity: scales cue strength as 0.3 + 0.3 * intensity.
        """
        self._emit(MultimodalCue(
            cue_type="haptic",
            intensity=0.3 + intensity * 0.3,
            duration_s=0.2,
            pattern="pulse",
        ))

    def emit_for_empathy(self, emotion: str = "neutral"):
        """Emit a light wave for an empathy phrase (stronger for sadness/fear)."""
        strength = 0.4 if emotion in ("sadness", "fear") else 0.3
        self._emit(MultimodalCue(
            cue_type="light",
            intensity=strength,
            duration_s=0.5,
            pattern="wave",
        ))

    def emit_for_laughter(self, intensity: float = 0.7):
        """Emit a haptic pulse for laughter; strength is 0.4 + 0.4 * intensity."""
        self._emit(MultimodalCue(
            cue_type="haptic",
            intensity=0.4 + intensity * 0.4,
            duration_s=0.3,
            pattern="pulse",
        ))

    def _emit(self, cue: MultimodalCue):
        """Record the cue and forward it to the callback, if any."""
        self._cue_history.append(cue)
        if self._cue_callback:
            try:
                self._cue_callback(cue)
            except Exception as e:
                # A misbehaving consumer must not break the voice pipeline.
                logger.warning("[MultimodalCueEmitter] callback failed: %s", e)

    def get_recent_cues(self, n: int = 10) -> List[MultimodalCue]:
        """
        Return up to the ``n`` most recent cues, oldest first.

        A non-positive ``n`` yields an empty list (a ``[-0:]`` slice would
        otherwise return the entire history).
        """
        if n <= 0:
            return []
        return list(self._cue_history)[-n:]
class DynamicLaughterSynth:
    """
    Procedural laughter that adapts to intensity:
    chuckle (low intensity) -> full laugh (high intensity).

    Instead of fixed samples, it scales:
    - Number of "ha" bursts
    - Pitch (higher for chuckle, lower for full laugh)
    - Energy
    - Breathiness
    """

    def __init__(self, sample_rate: int = 22050):
        self.sample_rate = sample_rate
        # Reused for its band-pass filter and the trailing sigh.
        self._nonverbal = ProceduralNonVerbalSynth(sample_rate)

    def synth(self, intensity: float = 0.5,
              duration: Optional[float] = None) -> np.ndarray:
        """
        Synthesize adaptive laughter.

        Args:
            intensity: clamped to [0.1, 1.0]. Below 0.3 = chuckle, below
                0.6 = normal laugh, otherwise full laugh.
            duration: requested length in seconds. When given (and
                positive) the number of bursts is chosen to fill it and
                the result is padded/trimmed to exactly
                ``int(sample_rate * duration)`` samples. When None or
                non-positive the length follows from the random burst count.

        Returns:
            Laughter audio (float32), peak-normalised to 0.7 * intensity.
        """
        intensity = float(max(0.1, min(1.0, intensity)))

        # Pick burst count, burst period, pitch and energy by intensity tier.
        if intensity < 0.3:
            # Chuckle: 2-3 "ha"s, higher pitch, quiet
            n_has = random.randint(2, 3)
            ha_period = 0.12
            pitch = 240 + random.uniform(-20, 20)
            energy = 0.4
        elif intensity < 0.6:
            # Normal laugh: 4-6 "ha"s
            n_has = random.randint(4, 6)
            ha_period = 0.10
            pitch = 180 + random.uniform(-15, 15)
            energy = 0.6
        else:
            # Full laugh: 6-9 "ha"s, lower pitch, loud
            n_has = random.randint(6, 9)
            ha_period = 0.09
            pitch = 150 + random.uniform(-10, 10)
            energy = 0.8

        add_breath = intensity > 0.5
        breath_s = 0.3

        # Honour an explicit duration: size the burst train to fill it,
        # leaving room for the trailing breath when there is one.
        target_len: Optional[int] = None
        if duration is not None and duration > 0:
            target_len = int(self.sample_rate * duration)
            burst_span = duration - (breath_s if add_breath else 0.0)
            n_has = max(1, int(round(max(ha_period, burst_span) / ha_period)))

        # Each period is 70% voiced burst + 30% silence.
        gap = np.zeros(int(self.sample_rate * ha_period * 0.3))
        chunks = []
        for i in range(n_has):
            ha = self._gen_ha(ha_period * 0.7, energy, pitch)
            # Bursts fade by up to 20% across the laugh.
            decay = 1.0 - 0.2 * (i / max(1, n_has - 1))
            chunks.append(ha * decay)
            chunks.append(gap)

        # Louder laughs end with a short exhale.
        if add_breath:
            breath = self._nonverbal.synth(NonVerbalType.SIGH, intensity=0.3)
            chunks.append(breath[:int(self.sample_rate * breath_s)])

        audio = np.concatenate(chunks) if chunks else np.zeros(0)

        # Peak-normalise, then scale overall loudness by intensity.
        max_val = float(np.max(np.abs(audio))) if len(audio) > 0 else 0.0
        if max_val > 0:
            audio = audio / max_val * 0.7 * intensity

        if target_len is not None:
            audio = self._fit_to_length(audio, target_len)
        return audio.astype(np.float32)

    def _fit_to_length(self, audio: np.ndarray, target_len: int) -> np.ndarray:
        """Zero-pad or trim (with a 20 ms fade-out) to exactly target_len samples."""
        current = len(audio)
        if current < target_len:
            return np.concatenate([audio, np.zeros(target_len - current)])
        if current > target_len:
            trimmed = np.array(audio[:target_len], dtype=np.float64)
            fade = min(int(0.02 * self.sample_rate), target_len)
            if fade > 0:
                # Avoid an audible click at the cut.
                trimmed[-fade:] *= np.linspace(1.0, 0.0, fade)
            return trimmed
        return audio

    def _gen_ha(self, duration: float, intensity: float, pitch: float) -> np.ndarray:
        """
        Generate a single 'ha' burst.

        Args:
            duration: burst length in seconds.
            intensity: output gain applied to the burst.
            pitch: fundamental frequency in Hz.

        Returns:
            float32 audio; silence of at least 2 samples if the burst
            would be shorter than 2 samples.
        """
        n = int(self.sample_rate * duration)
        if n < 2:
            return np.zeros(max(2, n), dtype=np.float32)
        t = np.linspace(0, duration, n, dtype=np.float64)

        # Glottal source: fundamental plus harmonics 2-4 at falling levels.
        phase = 2.0 * np.pi * pitch * t
        source = np.sin(phase)
        for h in range(2, 5):
            source += (0.4 / h) * np.sin(phase * h)
        source /= 3.0

        # Blend in noise for breathiness, then band-limit the mix.
        noise = np.random.normal(0, 0.3, n)
        mixed = source * 0.6 + noise * 0.4
        mixed = self._nonverbal._bandpass(mixed, 400, 3000)

        # Fast attack (10 ms), longer release (40 ms), each at most n // 4.
        env = np.ones(n)
        attack = min(int(0.01 * self.sample_rate), n // 4)
        release = min(int(0.04 * self.sample_rate), n // 4)
        if attack > 0:
            env[:attack] = np.linspace(0, 1, attack)
        if release > 0:
            env[-release:] = np.linspace(1, 0, release)
        return (mixed * env * intensity).astype(np.float32)
# ── INTERRUPT HANDLING REFINEMENT ───────────────────────────────────────────
class ContextAwareApologyGenerator:
    """
    Chooses between casual, serious and urgent interruption responses.

    Casual: "Sorry, please go ahead"
    Serious: "I didn't mean to cut you off, please continue"
    Urgent: a very short hand-over for a highly aroused user

    Seriousness is determined from:
    - How far into the utterance the interrupt occurred (early = more serious)
    - User's arousal (high = more serious)
    - Frequency of interrupts (repeated within 60 s = more serious)
    """

    CASUAL_RESPONSES: List[str] = [
        "Sorry, please go ahead.",
        "Go right ahead.",
        "After you.",
        "Of course — go on.",
    ]
    SERIOUS_RESPONSES: List[str] = [
        "I'm sorry, I didn't mean to cut you off. Please continue.",
        "My apologies — please, go ahead, I'm listening.",
        "I'm sorry, were you saying something? Please, continue.",
        "Forgive me — I didn't mean to interrupt. What were you saying?",
    ]
    URGENT_RESPONSES: List[str] = [
        "Of course, go ahead.",
        "Please, go on.",
        "I'm listening.",
    ]
    # Minimum seconds between two spoken apologies.
    COOLDOWN_S: float = 15.0

    def __init__(self):
        self._last_response_time: float = 0.0
        # Timestamps of the last 10 interrupts passed to generate().
        self._interrupt_history: Deque[float] = deque(maxlen=10)

    def generate(self, nima_text_progress: float, user_arousal: float = 0.3,
                 interrupt_count: int = 0) -> str:
        """
        Generate a context-appropriate apology and start the cooldown.

        Args:
            nima_text_progress: fraction of the utterance already spoken;
                below 0.2 counts as a serious (very early) interrupt.
            user_arousal: above 0.7 is serious, above 0.8 is urgent.
            interrupt_count: accepted for interface compatibility; repeat
                frequency is tracked internally from call timestamps.

        Returns:
            A randomly chosen phrase from the urgent, serious or casual list.
        """
        now = time.time()
        self._interrupt_history.append(now)
        self._last_response_time = now

        # Interrupts seen in the last minute, including this one.
        recent = sum(1 for t in self._interrupt_history if now - t < 60.0)

        # Urgent takes precedence: keep it short when the user is worked up.
        if user_arousal > 0.8:
            return random.choice(self.URGENT_RESPONSES)

        is_serious = (
            nima_text_progress < 0.2  # very early in the utterance
            or user_arousal > 0.7     # user is aroused
            or recent > 2             # repeated interrupts
        )
        if is_serious:
            return random.choice(self.SERIOUS_RESPONSES)
        return random.choice(self.CASUAL_RESPONSES)

    def should_respond(self, interrupt_type: InterruptType) -> bool:
        """
        Decide whether an apology should be spoken.

        Returns True only for a real interrupt that falls outside the
        cooldown window since the last generated response.
        """
        if interrupt_type != InterruptType.REAL_INTERRUPT:
            return False
        return time.time() - self._last_response_time >= self.COOLDOWN_S
class NonBlockingContinuationManager:
    """
    Keeps the voice stream flowing even after acknowledging an interrupt.

    Instead of stopping entirely, the system:
    1. Pauses briefly (200ms)
    2. Speaks the apology ("Sorry, please go ahead")
    3. Yields the floor but remains ready to resume

    This class only tracks the paused state and the deferred text; the
    caller drives it by polling `should_resume`.
    """

    PAUSE_BEFORE_APOLOGY_S: float = 0.2
    RESUME_THRESHOLD_S: float = 1.5  # seconds of user silence before resuming

    def __init__(self):
        self._is_paused: bool = False
        # Start of the current silence window: set when the floor is
        # yielded and pushed forward whenever the user is heard speaking.
        self._pause_start: float = 0.0
        self._deferred_text: str = ""
        self._deferred_position: int = 0  # character position to resume from

    def yield_floor(self, deferred_text: str, position: int):
        """Yield the floor and remember the text and character offset to resume from."""
        self._is_paused = True
        self._pause_start = time.time()
        self._deferred_text = deferred_text
        self._deferred_position = position

    def should_resume(self, user_speaking: bool) -> bool:
        """
        Check whether the deferred utterance should resume now.

        Resumes (and clears the paused flag) only after the user has been
        silent for more than RESUME_THRESHOLD_S. Any call made while the
        user is speaking restarts that silence window, so a long user turn
        is not followed by an immediate resume.
        """
        if not self._is_paused:
            return False
        now = time.time()
        if user_speaking:
            self._pause_start = now  # silence window starts over
            return False
        if now - self._pause_start > self.RESUME_THRESHOLD_S:
            self._is_paused = False
            return True
        return False

    def get_resume_text(self) -> Optional[str]:
        """
        Return the remaining text prefixed with "As I was saying, ".

        Returns None when nothing is deferred or nothing (besides
        whitespace) remains after the stored position.
        """
        if not self._deferred_text:
            return None
        offset = max(0, self._deferred_position)
        remaining = self._deferred_text[offset:].lstrip()
        if not remaining:
            return None
        return f"As I was saying, {self._soften_leading_capital(remaining)}"

    @staticmethod
    def _soften_leading_capital(text: str) -> str:
        """
        Lower-case only the first letter so the text reads as a continuation.

        The rest of the text keeps its casing. The first word is left
        alone when it is the pronoun "I" (or a contraction of it) or an
        all-caps acronym. Ordinary proper nouns cannot be detected and
        are still lower-cased.
        """
        first_word = text.split(None, 1)[0].strip(".,!?;:\"'")
        keep = (
            first_word == "I"
            or first_word.startswith("I'")
            or (len(first_word) > 1 and first_word.isupper())
        )
        if keep:
            return text
        return text[0].lower() + text[1:]

    @property
    def is_paused(self) -> bool:
        """True while the floor is yielded and no resume has happened."""
        return self._is_paused
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 8 β€” OmniVoice Engine (main orchestrator)
# ═══════════════════════════════════════════════════════════════════════════
class OmniVoiceEngine:
    """
    The main OmniVoice engine. Orchestrates ASR, TTS, non-verbal synthesis,
    backchannel emission, and interrupt handling into a unified real-time
    voice conversation system.

    Usage:
        engine = OmniVoiceEngine()
        async for audio_chunk in engine.stream("Hello, how are you?"):
            play(audio_chunk)
    """

    # User audio is treated as 16 kHz throughout (buffer sizing and the
    # sample rate passed to the interrupt detector).
    _USER_SAMPLE_RATE = 16000
    _USER_BUFFER_SECONDS = 2
    # Less than 100 ms of audio is not worth classifying.
    _MIN_INTERRUPT_SAMPLES = 1600
    # Classify at most the latest ~500 ms.
    _INTERRUPT_WINDOW_SAMPLES = 8000

    def __init__(self,
                 whisper_model: str = "base",
                 coqui_model: str = "tts_models/multilingual/multi-dataset/xtts_v2",
                 speaker_wav: Optional[str] = None,
                 language: str = "en",
                 sample_rate: int = 22050,
                 palace: Any = None):
        """
        Args:
            whisper_model: model name handed to WhisperASR.
            coqui_model: model name handed to CoquiXTTSBackend.
            speaker_wav: optional reference voice passed to the TTS backend.
            language: language code passed to the TTS backend.
            sample_rate: output sample rate used for chunking and the
                procedural synthesizers.
            palace: optional memory palace handed to VoiceEventMemoryBridge.
        """
        logger.info("[OmniVoice] initializing v%s...", OMNIVOICE_VERSION)
        self.sample_rate = sample_rate

        # Initialize backends
        self.asr = WhisperASR(model_name=whisper_model)
        self.tts = CoquiXTTSBackend(model_name=coqui_model, speaker_wav=speaker_wav,
                                    language=language)
        self.nonverbal = ProceduralNonVerbalSynth(sample_rate=sample_rate)
        self.backchannel = BackchannelController(self.tts, self.nonverbal, sample_rate)
        self.interrupt_detector = InterruptDetector(self.asr)
        self.interrupt_response = InterruptionResponse()

        # ── v2.0.0 "Mind Through Voice" modules ──
        self.prosody_shaper = AdaptiveProsodyShaper()
        self.micro_intonation = MicroIntonationInjector(sample_rate)
        self.turn_predictor = TurnTakingPredictor()
        self.affective_mirror = AffectiveMirror()
        self.somatic_integrator = SomaticFeedbackIntegrator()
        self.empathy_generator = EmpathyPhraseGenerator()
        self.voice_memory = VoiceEventMemoryBridge(palace=palace)
        self.narrative_engine = NarrativeContinuityEngine(self.voice_memory)
        self.singing = SingingInterjectionModule(sample_rate)
        self.multimodal = MultimodalCueEmitter()
        self.dynamic_laughter = DynamicLaughterSynth(sample_rate)
        self.apology_generator = ContextAwareApologyGenerator()
        self.continuation_manager = NonBlockingContinuationManager()

        # State
        self.state = ConversationState()
        self._nima_audio_queue: Deque[np.ndarray] = deque()
        self._user_audio_buffer: List[np.ndarray] = []
        # Guards _user_audio_buffer.
        self._lock = threading.Lock()

        logger.info("[OmniVoice] ready (ASR=%s, TTS=%s)",
                    self.asr.mode.value, self.tts.mode.value)

    def update_prosody_from_nima(self, snapshot: Any) -> ProsodyParams:
        """
        Build prosody parameters from a NIMA ConsciousnessSnapshot.

        Each mapping is applied only when the corresponding attribute is
        present and truthy. Any failure while reading the snapshot is
        logged, and whatever was mapped up to that point is returned.

        Args:
            snapshot: object that may expose ``phi``, ``rho``, ``emotion``
                and ``qualia``; None yields default prosody.

        Returns:
            A new ProsodyParams instance.
        """
        prosody = ProsodyParams()
        if snapshot is None:
            return prosody
        try:
            # phi composite → energy, clamped to [0.3, 1.0]
            if hasattr(snapshot, "phi") and snapshot.phi:
                prosody.energy = float(max(0.3, min(1.0, 0.5 + snapshot.phi.phi_composite * 0.5)))
            # rho integrity → warmth, clamped to [0.2, 1.0]
            if hasattr(snapshot, "rho") and snapshot.rho:
                prosody.warmth = float(max(0.2, min(1.0, snapshot.rho.integrity)))
            # emotion → pitch (arousal-driven), tone label, pitch variance
            if hasattr(snapshot, "emotion") and snapshot.emotion:
                prosody.base_pitch_hz = 180.0 + (snapshot.emotion.arousal - 0.3) * 60.0
                prosody.emotional_tone = getattr(snapshot.emotion, "label", "neutral")
                if snapshot.emotion.valence < -0.3:
                    prosody.pitch_variance = 0.08  # flat for sad
                elif snapshot.emotion.valence > 0.3:
                    prosody.pitch_variance = 0.25  # expressive for happy
            # qualia authenticity → breathiness (more authentic = less breathy)
            if hasattr(snapshot, "qualia") and snapshot.qualia:
                prosody.breathiness = float(max(0.05, 0.3 - snapshot.qualia.authenticity_index * 0.25))
        except Exception as e:
            logger.warning("[OmniVoice] NIMA snapshot mapping failed: %s", e)
        return prosody

    async def stream(self, text: str,
                     prosody: Optional[ProsodyParams] = None,
                     user_audio_stream: Optional[AsyncGenerator[np.ndarray, None]] = None,
                     ) -> AsyncGenerator[np.ndarray, None]:
        """
        Stream synthesized speech for `text`, yielding audio chunks.

        The whole utterance is synthesized first and then emitted in 50 ms
        chunks. If `user_audio_stream` is provided, user audio is collected
        in the background and checked for interrupts before each chunk.
        On a real interrupt the current chunk and a spoken response are
        yielded, the phase becomes YIELDING, and streaming stops.
        Backchannels are not emitted here; see `emit_backchannel`.

        Args:
            text: text to synthesize
            prosody: prosody parameters (if None, uses defaults)
            user_audio_stream: async generator of user audio frames
                (for real-time interrupt detection)

        Yields:
            Audio chunks (slices of the TTS output at self.sample_rate Hz).
        """
        prosody = prosody or ProsodyParams()
        self.state.phase = ConversationPhase.NIMA_SPEAKING
        self.state.nima_speech_start = time.time()
        self.state.current_text = text
        self.state.current_text_position = 0.0

        # Synthesize the full utterance up front.
        full_audio = self.tts.synthesize(text, prosody)
        total_samples = len(full_audio)
        if total_samples == 0:
            self.state.phase = ConversationPhase.IDLE
            return

        # 50 ms chunks; never 0, which would make range() raise.
        chunk_size = max(1, int(self.sample_rate * 0.05))

        # Playback-only mode: no user audio to monitor.
        if user_audio_stream is None:
            for i in range(0, total_samples, chunk_size):
                self.state.current_text_position = min(1.0, (i + chunk_size) / total_samples)
                yield full_audio[i:i + chunk_size]
            self.state.phase = ConversationPhase.IDLE
            return

        # ── Real-time mode: stream audio + monitor user ──
        # Drop audio left over from a previous utterance so it cannot be
        # mistaken for an interrupt of this one.
        with self._lock:
            self._user_audio_buffer.clear()
        user_audio_task = asyncio.create_task(self._collect_user_audio(user_audio_stream))
        try:
            for i in range(0, total_samples, chunk_size):
                chunk = full_audio[i:i + chunk_size]
                # Sample-based progress (same formula as playback-only mode).
                self.state.current_text_position = min(1.0, (i + chunk_size) / total_samples)
                self.state.nima_speech_duration = time.time() - self.state.nima_speech_start

                # Check for interrupts
                remaining_s = (total_samples - i) / self.sample_rate
                interrupt = self._check_for_interrupt(remaining_s)
                if interrupt and self.interrupt_response.should_respond(interrupt):
                    response_text = self.interrupt_response.generate_response(
                        interrupt, self.state.current_text_position,
                        self.state.user_emotion_arousal,
                    )
                    response_audio = self.tts.synthesize(response_text, ProsodyParams(
                        base_pitch_hz=200, energy=0.6, warmth=0.8,
                    ))
                    yield chunk  # finish the chunk in progress
                    # Then speak the response, chunked like normal speech.
                    for j in range(0, len(response_audio), chunk_size):
                        yield response_audio[j:j + chunk_size]
                    self.state.phase = ConversationPhase.YIELDING
                    self.state.interrupt_count += 1
                    logger.info("[OmniVoice] interrupted at %.0f%%: '%s'",
                                self.state.current_text_position * 100, response_text)
                    return  # Stop streaming Nima's audio

                yield chunk

            # Finished speaking without interruption
            self.state.phase = ConversationPhase.IDLE
        finally:
            # Always stop the background collector, whichever way we exit.
            user_audio_task.cancel()
            try:
                await user_audio_task
            except asyncio.CancelledError:
                pass

    async def _collect_user_audio(self, stream: AsyncGenerator[np.ndarray, None]):
        """Background task: buffer user audio frames, keeping about the last 2 s."""
        max_samples = self._USER_SAMPLE_RATE * self._USER_BUFFER_SECONDS
        try:
            async for frame in stream:
                with self._lock:
                    self._user_audio_buffer.append(frame)
                    # Drop the oldest frames once over the cap.
                    total = sum(len(f) for f in self._user_audio_buffer)
                    while total > max_samples and self._user_audio_buffer:
                        removed = self._user_audio_buffer.pop(0)
                        total -= len(removed)
        except asyncio.CancelledError:
            pass

    def _check_for_interrupt(self, remaining_s: float) -> Optional[InterruptClassification]:
        """
        Classify buffered user audio and report real interrupts.

        Audio is consumed only once at least 100 ms has accumulated;
        shorter amounts stay in the buffer so that small frames can add up
        across successive checks instead of being discarded.

        Args:
            remaining_s: seconds of Nima's speech still to be played.

        Returns:
            The classification when it is a REAL_INTERRUPT, else None.
        """
        with self._lock:
            buffered = sum(len(f) for f in self._user_audio_buffer)
            if buffered < self._MIN_INTERRUPT_SAMPLES:
                return None  # keep accumulating
            audio = np.concatenate(self._user_audio_buffer)[-self._INTERRUPT_WINDOW_SAMPLES:]
            self._user_audio_buffer.clear()

        classification = self.interrupt_detector.classify(
            audio, sample_rate=self._USER_SAMPLE_RATE,
            nima_text_progress=self.state.current_text_position,
            nima_speech_remaining_s=remaining_s,
        )
        if classification.interrupt_type == InterruptType.REAL_INTERRUPT:
            return classification
        # Log ignored interrupts (backchannels, non-verbals)
        if classification.interrupt_type != InterruptType.SILENCE:
            logger.debug("[OmniVoice] ignored %s: %s",
                         classification.interrupt_type.value, classification.reason)
        return None

    def emit_backchannel(self, user_audio: np.ndarray) -> Optional[BackchannelEvent]:
        """
        Check if a backchannel should be emitted while the user is speaking.

        Call this with recent user audio frames.
        Returns a BackchannelEvent if one should fire, else None.
        """
        return self.backchannel.should_backchannel(self.state, user_audio)

    def synth_non_verbal(self, expr_type: NonVerbalType, intensity: float = 0.7) -> np.ndarray:
        """Synthesize a non-verbal expression directly."""
        return self.nonverbal.synth(expr_type, intensity)

    def get_stats(self) -> Dict[str, Any]:
        """Return a status snapshot: version, backend modes, state and module info."""
        return {
            "version": OMNIVOICE_VERSION,
            "asr_mode": self.asr.mode.value,
            "tts_mode": self.tts.mode.value,
            "sample_rate": self.sample_rate,
            "conversation_state": {
                "phase": self.state.phase.value,
                "interrupt_count": self.state.interrupt_count,
                "backchannel_count": self.state.backchannel_count,
            },
            # v2.0.0 module stats
            "v2_modules": {
                "prosody_shaper": "active",
                "micro_intonation": "active",
                "turn_predictor": "active",
                "affective_mirror": "active",
                "somatic_integrator": {
                    "strain": self.somatic_integrator.strain,
                    "energy": self.somatic_integrator.energy,
                },
                "empathy_generator": "active",
                "voice_memory": self.voice_memory.get_stats(),
                "narrative_engine": "active",
                "singing_interjections": "active",
                "multimodal_cues": len(self.multimodal.get_recent_cues(1000)),
                "dynamic_laughter": "active",
                "apology_generator": "active",
                "continuation_manager": {
                    "is_paused": self.continuation_manager.is_paused,
                },
            },
        }
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 9 β€” NIMA Voice Adapter
# ═══════════════════════════════════════════════════════════════════════════
class NimaVoiceAdapter:
"""
Bridges NIMA's ConsciousnessSnapshot → OmniVoice prosody params.
Also bridges NIMA's CTM tournament + MemoryPalace episodes → voice context.
v2.0.0: Now integrates ALL "mind through voice" modules:
- AdaptiveProsodyShaper (emotion → prosody dynamics)
- AffectiveMirror (mirrors user's emotional tone)
- SomaticFeedbackIntegrator (strain → voice fatigue)
- VoiceEventMemoryBridge (stores voice events in MemPalace)
- NarrativeContinuityEngine (references past conversations)
Usage:
adapter = NimaVoiceAdapter(engine)
prosody = adapter.snapshot_to_prosody(nima_snapshot)
async for chunk in engine.stream(text, prosody=prosody):
...
Full NIMA + CTM + MemPalace integration:
# After NIMA's process_stimulus():
adapter.update_from_snapshot(snapshot)
adapter.update_from_ctm_winner(ctm_winner)
adapter.update_somatic_from_nima(snapshot.phi, snapshot.rho)
prosody = adapter.get_contextual_prosody()
# After speaking:
adapter.store_voice_event(text, prosody, duration_s)
"""
def __init__(self, engine: OmniVoiceEngine):
self._engine = engine
self._last_snapshot: Any = None
self._last_ctm_winner: Optional[Dict[str, Any]] = None
self._last_episode_context: Optional[Dict[str, Any]] = None
self._user_emotion: str = "neutral"
self._user_valence: float = 0.0
self._user_arousal: float = 0.3
def update_from_snapshot(self, snapshot: Any) -> ProsodyParams:
"""Update engine prosody from a NIMA ConsciousnessSnapshot."""
self._last_snapshot = snapshot
# Extract user emotion from snapshot (if available)
if snapshot and hasattr(snapshot, "emotion") and snapshot.emotion:
self._user_valence = float(getattr(snapshot.emotion, "valence", 0.0))
self._user_arousal = float(getattr(snapshot.emotion, "arousal", 0.3))
self._user_emotion = getattr(snapshot.emotion, "label", "neutral")
return self._engine.update_prosody_from_nima(snapshot)
def update_from_ctm_winner(self, ctm_winner: Optional[Dict[str, Any]]) -> None:
"""
Update engine context from a CTM tournament winner.
The winning processor's character influences voice style:
- memory_palace β†’ warmer, more nostalgic
- somatic_registry β†’ more emotionally resonant
- wernicke β†’ clearer, more articulate
- broca β†’ faster, more fluent
"""
if ctm_winner is None:
self._last_ctm_winner = None
return
self._last_ctm_winner = ctm_winner
logger.debug("[NimaVoiceAdapter] CTM winner: %s (score=%.3f)",
ctm_winner.get("processor_name", "?"),
ctm_winner.get("score", 0.0))
def update_somatic_from_nima(self, phi: Any, rho: Any) -> None:
"""
Update the somatic feedback integrator from NIMA's phi + rho.
Ties voice modulation to system strain (biological fatigue signals).
"""
strain = 0.0
allostatic = 0.0
if phi and hasattr(phi, "phenomenological_strain"):
strain = float(phi.phenomenological_strain)
# Allostatic load approximation from rho dissonance
if rho and hasattr(rho, "dissonance"):
allostatic = float(rho.dissonance)
self._engine.somatic_integrator.update_from_nima(strain, allostatic)
def update_from_episode(self, episode: Optional[Dict[str, Any]]) -> None:
"""
Update engine context from a MemoryPalace episode.
If the episode has high strain or negative valence, the voice
should reflect that (lower pitch, more breathiness).
"""
if episode is None:
self._last_episode_context = None
return
self._last_episode_context = episode
logger.debug("[NimaVoiceAdapter] episode context updated: valence=%.2f",
episode.get("valence", 0.0))
def get_contextual_prosody(self) -> ProsodyParams:
"""
Get prosody params that reflect NIMA state + CTM winner + episode
context + somatic feedback + affective mirroring + adaptive shaping.
This is the FULL v2.0.0 integration β€” all modules contribute.
"""
# 1. Start with NIMA snapshot β†’ base prosody
prosody = self._engine.update_prosody_from_nima(self._last_snapshot)
# 2. Apply affective mirroring (match user's emotional tone)
prosody, mirror_emotion = self._engine.affective_mirror.mirror(
self._user_valence, self._user_arousal, prosody
)
# 3. Apply adaptive prosody shaping (emotion β†’ pitch/rhythm/timbre)
empathy_level = 0.5
if self._user_valence < -0.3:
empathy_level = 0.8 # more empathetic when user is negative
emotion_for_shaping = self._user_emotion if self._user_emotion != "neutral" else mirror_emotion
prosody = self._engine.prosody_shaper.shape(
prosody, emotion=emotion_for_shaping,
valence=self._user_valence, arousal=self._user_arousal,
empathy_level=empathy_level,
)
# 4. Apply somatic feedback (strain β†’ voice fatigue)
prosody = self._engine.somatic_integrator.apply_somatic_modulation(prosody)
# 5. Apply CTM winner influence on voice character
if self._last_ctm_winner:
processor = self._last_ctm_winner.get("processor_name", "")
if processor == "memory_palace":
prosody.warmth = float(min(1.0, prosody.warmth + 0.10))
prosody.speech_rate_wpm *= 0.95 # more measured, nostalgic
elif processor == "somatic_registry":
prosody.breathiness = float(min(0.3, prosody.breathiness + 0.05))
prosody.pitch_variance = float(min(0.35, prosody.pitch_variance + 0.05))
elif processor == "wernicke":
prosody.speech_rate_wpm *= 1.05 # clearer, more articulate
elif processor == "broca":
prosody.speech_rate_wpm *= 1.08 # faster, more fluent
# 6. Apply episode context modifications
if self._last_episode_context:
ep = self._last_episode_context
strain = ep.get("score", 0.0)
if strain > 0.5:
prosody.base_pitch_hz -= 10.0
prosody.breathiness = float(min(0.4, prosody.breathiness + 0.05))
if ep.get("valence", 0.0) < -0.3:
prosody.warmth = float(min(1.0, prosody.warmth + 0.1))
prosody.speech_rate_wpm -= 10.0
return prosody
def store_voice_event(self, text: str, prosody: ProsodyParams,
duration_s: float, speaker: str = "nima") -> str:
"""
Store a voice event in MemPalace with full affective tags.
Call this after each utterance to build episodic voice memory.
"""
event = VoiceEvent(
speaker=speaker,
text=text,
audio_duration_s=duration_s,
prosody_snapshot={
"pitch_hz": prosody.base_pitch_hz,
"rate_wpm": prosody.speech_rate_wpm,
"energy": prosody.energy,
"warmth": prosody.warmth,
"breathiness": prosody.breathiness,
},
emotion=prosody.emotional_tone,
valence=self._user_valence,
arousal=self._user_arousal,
strain=self._engine.somatic_integrator.strain,
conversation_phase=self._engine.state.phase.value,
interrupt_count=self._engine.state.interrupt_count,
backchannel_count=self._engine.state.backchannel_count,
)
return self._engine.voice_memory.store_voice_event(event)
def get_narrative_continuity(self, current_topic: str = "") -> Optional[str]:
"""
Generate a narrative continuity phrase referencing a past voice event.
Returns None if no suitable past event exists.
"""
return self._engine.narrative_engine.generate_continuity_phrase(current_topic)
def get_empathy_phrase(self) -> str:
"""Generate a contextual empathy phrase based on current user state."""
return self._engine.empathy_generator.generate(
self._user_emotion, self._user_valence, self._user_arousal
)
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 10 β€” Utility functions
# ═══════════════════════════════════════════════════════════════════════════
def save_wav(audio: np.ndarray, path: str, sample_rate: int = 22050) -> str:
    """
    Write ``audio`` to ``path`` as a mono 16-bit PCM WAV file.

    Args:
        audio: Samples scaled so that +/-1.0 is full scale. Any array-like
            is accepted (it is converted with ``np.asarray``); values outside
            the 16-bit range after scaling are clipped.
        path: Destination file path.
        sample_rate: Frame rate written to the WAV header.

    Returns:
        ``path``, unchanged.
    """
    # np.asarray guards against plain lists, for which ``audio * 32767``
    # would repeat the list instead of scaling it.
    samples = np.asarray(audio)
    if not np.issubdtype(samples.dtype, np.floating):
        samples = samples.astype(np.float32)
    # NaN has no defined int16 value, so write it as silence; infinities are
    # mapped to the same limits np.clip would give them.
    scaled = np.nan_to_num(samples * 32767, nan=0.0, posinf=32767.0, neginf=-32768.0)
    pcm = np.clip(scaled, -32768, 32767).astype(np.int16)
    with wave.open(path, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(sample_rate)
        wf.writeframes(pcm.tobytes())
    return path
def load_wav(path: str) -> Tuple[np.ndarray, int]:
    """
    Load a PCM WAV file into a float32 numpy array.

    Supports 8-bit unsigned and 16/24/32-bit signed PCM. Samples are scaled
    so that full scale maps to roughly [-1.0, 1.0). For multi-channel files
    only the first channel is returned.

    Args:
        path: Path of the WAV file to read.

    Returns:
        ``(audio, sample_rate)`` where ``audio`` is a 1-D float32 array.

    Raises:
        ValueError: If the file's sample width is not 1, 2, 3 or 4 bytes.
    """
    with wave.open(path, "rb") as wf:
        n_channels = wf.getnchannels()
        sampwidth = wf.getsampwidth()
        sample_rate = wf.getframerate()
        frames = wf.readframes(wf.getnframes())

    # The wave module returns frames in native byte order, so native numpy
    # integer dtypes are the correct view for 16- and 32-bit data.
    if sampwidth == 2:
        audio = np.frombuffer(frames, dtype=np.int16).astype(np.float32) / 32768.0
    elif sampwidth == 1:
        # 8-bit WAV is unsigned with a midpoint of 128.
        audio = (np.frombuffer(frames, dtype=np.uint8).astype(np.float32) - 128) / 128.0
    elif sampwidth == 4:
        wide = np.frombuffer(frames, dtype=np.int32).astype(np.float64) / 2147483648.0
        audio = wide.astype(np.float32)
    elif sampwidth == 3:
        import sys  # local import: only this branch needs the host byte order

        triplets = np.frombuffer(frames, dtype=np.uint8).reshape(-1, 3).astype(np.int32)
        if sys.byteorder == "big":
            triplets = triplets[:, ::-1]
        # Assemble least-significant-byte-first, then sign-extend bit 23.
        ints = triplets[:, 0] | (triplets[:, 1] << 8) | (triplets[:, 2] << 16)
        ints = ints - ((ints & 0x800000) << 1)
        audio = ints.astype(np.float32) / 8388608.0
    else:
        raise ValueError(f"Unsupported sample width: {sampwidth}")

    if n_channels > 1:
        # Frames are interleaved; keep the first channel only.
        audio = audio[::n_channels]
    return audio, sample_rate
async def demo(output_dir: str = "/home/z/my-project/download"):
    """
    OmniVoice Engine demo.

    Builds an OmniVoiceEngine and runs six smoke tests, printing the results:
    basic synthesis, non-verbal expressions, streaming, interrupt
    classification, interruption responses and the NIMA voice adapter.
    Tests 1-3 each write a WAV file into ``output_dir``.

    Args:
        output_dir: Directory that receives the generated WAV files. It is
            created if missing. The default keeps the original location.
    """
    import os  # local import so this function does not rely on a file-level one

    os.makedirs(output_dir, exist_ok=True)

    def _out(filename: str) -> str:
        """Return the path of ``filename`` inside the output directory."""
        return os.path.join(output_dir, filename)

    banner = "=" * 70
    print("\n" + banner)
    print(f" OmniVoice Engine v{OMNIVOICE_VERSION} β€” Demo")
    print(banner + "\n")
    engine = OmniVoiceEngine()
    rate = engine.sample_rate
    print(f"ASR mode: {engine.asr.mode.value}")
    print(f"TTS mode: {engine.tts.mode.value}")
    print()

    # Test 1: Basic TTS synthesis
    print("[Test 1] Basic speech synthesis...")
    prosody = ProsodyParams(base_pitch_hz=180, energy=0.8, warmth=0.7)
    audio = engine.tts.synthesize("Hello, I am OmniVoice. Nice to meet you.", prosody)
    print(f" Audio: {len(audio)} samples, {len(audio)/rate:.2f}s")
    save_wav(audio, _out("omnivoice_test1_speech.wav"), rate)
    print(" Saved: omnivoice_test1_speech.wav")
    print()

    # Test 2: Non-verbal expressions
    print("[Test 2] Non-verbal expressions...")
    # NOTE(review): ``GASp`` has unusual casing next to the other members —
    # confirm it matches the NonVerbalType definition.
    expressions = [NonVerbalType.LAUGHTER, NonVerbalType.SIGH, NonVerbalType.GASp,
                   NonVerbalType.GROAN, NonVerbalType.AWW, NonVerbalType.MM]
    for expr in expressions:
        audio = engine.synth_non_verbal(expr, intensity=0.7)
        print(f" {expr.value:12s}: {len(audio)} samples, {len(audio)/rate:.2f}s")
    # Save laughter for verification
    laugh = engine.synth_non_verbal(NonVerbalType.LAUGHTER)
    save_wav(laugh, _out("omnivoice_test2_laughter.wav"), rate)
    print(" Saved: omnivoice_test2_laughter.wav")
    print()

    # Test 3: Streaming
    print("[Test 3] Streaming speech...")
    chunks = []
    async for chunk in engine.stream("This is a streaming test of the OmniVoice engine.", prosody=prosody):
        chunks.append(chunk)
    full = np.concatenate(chunks)
    print(f" Streamed {len(chunks)} chunks, total {len(full)} samples, {len(full)/rate:.2f}s")
    save_wav(full, _out("omnivoice_test3_stream.wav"), rate)
    print(" Saved: omnivoice_test3_stream.wav")
    print()

    # Test 4: Interrupt classification
    print("[Test 4] Interrupt classification...")
    # Simulate different types of user speech.
    # NOTE(review): clips are synthesized by the engine but sliced and
    # classified as 16 kHz audio — confirm this matches engine.sample_rate.
    test_cases = [
        ("Backchannel 'yeah'", engine.synth_non_verbal(NonVerbalType.MM, 0.3)[:int(16000*0.4)]),
        ("Laughter", engine.synth_non_verbal(NonVerbalType.LAUGHTER, 0.7)[:int(16000*0.8)]),
        ("Sigh", engine.synth_non_verbal(NonVerbalType.SIGH, 0.6)[:int(16000*0.5)]),
    ]
    for name, clip in test_cases:
        result = engine.interrupt_detector.classify(clip, sample_rate=16000)
        print(f" {name:25s} β†’ {result.interrupt_type.value} (conf={result.confidence:.2f}, reason='{result.reason}')")
    print()

    # Test 5: Interruption response
    print("[Test 5] Interruption responses...")
    for progress in [0.1, 0.5, 0.9]:
        fake_interrupt = InterruptClassification(
            interrupt_type=InterruptType.REAL_INTERRUPT,
            confidence=0.8,
            duration_s=1.5,
        )
        response = engine.interrupt_response.generate_response(
            fake_interrupt, nima_text_progress=progress, user_arousal=0.4,
        )
        print(f" Progress {progress:.0%}: '{response}'")
    print()

    # Test 6: NIMA adapter
    print("[Test 6] NIMA voice adapter...")
    adapter = NimaVoiceAdapter(engine)
    prosody = adapter.get_contextual_prosody()
    print(f" Default prosody: pitch={prosody.base_pitch_hz:.0f}Hz, energy={prosody.energy:.2f}, warmth={prosody.warmth:.2f}")
    # Simulate episode context
    adapter.update_from_episode({"valence": -0.5, "score": 0.7, "processor_name": "somatic_registry"})
    prosody2 = adapter.get_contextual_prosody()
    print(f" With episode (val=-0.5, strain=0.7): pitch={prosody2.base_pitch_hz:.0f}Hz, "
          f"energy={prosody2.energy:.2f}, warmth={prosody2.warmth:.2f}, breath={prosody2.breathiness:.2f}")
    print()
    print(banner)
    print(f" OmniVoice v{OMNIVOICE_VERSION} Demo Complete")
    print(banner)
if __name__ == "__main__":
    # Script entry point: run the demo coroutine to completion via asyncio.run().
    asyncio.run(demo())