| |
| """ |
| OmniVoice Engine v2.0.0 — "Mind Through Voice" |
| ================================================ |
| A consciousness-aware real-time voice conversation engine that feels |
| like a mind speaking through a voice, not a synthesizer. |
| |
| ARCHITECTURE: |
| - Whisper ASR (local) for real speech-to-text + interrupt detection |
| - Coqui XTTS for neural text-to-speech with voice cloning |
| - Procedural numpy DSP for non-verbal expressions (laughter, sighs, gasps, etc.) |
| - Smart interrupt awareness that ignores non-verbals, backchannels, and |
| collaborative turn-sharing |
| - Graceful interruption responses ("I'm sorry, were you saying something?") |
| - NIMA-integrated adapter (reads ConsciousnessSnapshot to drive prosody) |
| |
| v2.0.0 NEW MODULES — the "mind through voice" layer: |
| |
| CONVERSATIONAL FLOW: |
| - AdaptiveProsodyShaper: emotion → pitch/rhythm/timbre dynamics |
| (softer when empathetic, brighter when excited) |
| - MicroIntonationInjector: hesitations, breaths, emphasis shifts |
| that signal thoughtfulness or uncertainty |
| - TurnTakingPredictor: predicts when user will finish, smoothly |
| takes the floor instead of waiting for silence |
| |
| EMOTIONAL & COGNITIVE GROUNDING: |
| - AffectiveMirror: matches user's emotional tone (calm, energetic, |
| concerned) with subtle vocal adjustments |
| - SomaticFeedbackIntegrator: ties voice modulation to system strain |
| or energy states (biological fatigue signals) |
| - EmpathyPhraseGenerator: contextual empathy inserts ("That must |
| feel tough") instead of generic nods |
| |
| MEMORY & CONTINUITY: |
| - VoiceEventMemoryBridge: stores every utterance as an episodic |
| voice event in MemPalace with affective tags |
| - NarrativeContinuityEngine: references past conversations naturally |
| ("As you mentioned yesterday, you sounded excited about...") |
| |
| EXPRESSIVE EXTENSIONS: |
| - SingingInterjectionModule: short melodic phrases (humming, tonal |
| affirmations) woven into speech |
| - MultimodalCueEmitter: pairs voice with haptic/visual signals |
| (soft vibration or light pulse when nodding) |
| - DynamicLaughterSynth: adaptive laughter (chuckle → full laugh) |
| scaled by intensity instead of fixed samples |
| |
| INTERRUPT HANDLING REFINEMENT: |
| - ContextAwareApologyGenerator: casual vs serious apologies |
| ("Sorry, please go ahead" vs "I didn't mean to cut you off") |
| - NonBlockingContinuationManager: keeps voice flowing after |
| acknowledging an interrupt, so it feels conversational |
| |
| Author: Norman de la Paz-Tabora |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import json |
| import logging |
| import math |
| import os |
| import random |
| import struct |
| import sys |
| import threading |
| import time |
| import uuid |
| import wave |
| from collections import deque |
| from dataclasses import dataclass, field |
| from enum import Enum |
| from typing import ( |
| Any, AsyncGenerator, Callable, Deque, Dict, Generator, |
| List, Optional, Tuple, Union, |
| ) |
|
|
| import numpy as np |
|
|
| |
|
|
| |
# Optional ASR backend detection.
#
# openai-whisper is preferred; faster-whisper is only tried when the former is
# missing. All four names below are bound unconditionally first, so the rest of
# the module can probe them without risking a NameError whichever backend (if
# any) is installed.
whisper = None
WhisperModel = None
WHISPER_AVAILABLE = False
WHISPER_BACKEND = None

try:
    import whisper
except ImportError:
    try:
        from faster_whisper import WhisperModel
    except ImportError:
        pass  # neither backend installed: stay in the "unavailable" state
    else:
        WHISPER_AVAILABLE = True
        WHISPER_BACKEND = "faster-whisper"
else:
    WHISPER_AVAILABLE = True
    WHISPER_BACKEND = "openai-whisper"
|
|
| |
# Optional neural TTS backend (Coqui). A single import attempt is enough:
# retrying the identical import after an ImportError cannot succeed.
try:
    from TTS.api import TTS as CoquiTTS
except ImportError:
    CoquiTTS = None
COQUI_TTS_AVAILABLE = CoquiTTS is not None
|
|
| |
# Module logger. A stdout handler is installed only when nothing has been
# attached to the "OmniVoice" logger yet, so re-importing the module does not
# duplicate log lines.
logger = logging.getLogger("OmniVoice")
if not logger.handlers:
    _h = logging.StreamHandler(sys.stdout)
    _h.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s [%(levelname)s] %(name)s :: %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )
    logger.addHandler(_h)
    logger.setLevel(logging.INFO)


# Version tag of the engine.
OMNIVOICE_VERSION = "2.0.0-MIND-THROUGH-VOICE"
|
|
|
|
| |
| |
| |
|
|
class NonVerbalType(Enum):
    """Categories of non-verbal vocal expressions.

    The string value of each member is what the synthesizer uses to look up
    its rendering method (``_synth_<value>``).
    """

    LAUGHTER = "laughter"
    GIGGLE = "giggle"
    # Historical (mis-cased) spelling. It stays first so it remains the
    # canonical member and existing references / ``.name`` values keep working.
    GASp = "gasp"
    # Conventionally spelled alias for the same member.
    GASP = "gasp"
    GROAN = "groan"
    MOAN = "moan"
    SIGH = "sigh"
    CLUCK = "cluck"
    CLICK = "click"
    AWW = "aww"
    OH = "oh"
    MM = "mm"
    WOW = "wow"
|
|
|
|
class ConversationPhase(Enum):
    """Phase the conversation is currently in."""

    IDLE = "idle"
    USER_SPEAKING = "user_speaking"
    NIMA_SPEAKING = "nima_speaking"
    OVERLAP = "overlap"
    YIELDING = "yielding"
|
|
|
|
class InterruptType(Enum):
    """Classification of user speech detected during Nima's turn."""

    REAL_INTERRUPT = "real_interrupt"
    NON_VERBAL = "non_verbal"
    BACKCHANNEL = "backchannel"
    COLLABORATIVE_TURN_SHARING = "collaborative"
    SILENCE = "silence"
|
|
|
|
class BackchannelTrigger(Enum):
    """Reason a backchannel was emitted."""

    PERIODIC = "periodic"
    ON_PAUSE = "on_pause"
    ON_EMOTION_SHIFT = "emotion"
|
|
|
|
class TTSMode(Enum):
    """Active text-to-speech backend."""

    COQUI_XTTS = "coqui_xtts"
    PROCEDURAL = "procedural"
|
|
|
|
class ASRMode(Enum):
    """Active speech-recognition backend."""

    WHISPER = "whisper"
    VAD_ONLY = "vad_only"
|
|
|
|
@dataclass
class AudioFrame:
    """One chunk of audio samples together with its metadata."""

    samples: np.ndarray
    sample_rate: int = 16000
    # Creation time (``time.time()``), filled in per instance.
    timestamp: float = field(default_factory=time.time)
    is_speech: bool = False
    energy: float = 0.0

    @property
    def duration(self) -> float:
        """Length of the frame in seconds (sample count / sample rate)."""
        n_samples = len(self.samples)
        return n_samples / self.sample_rate
|
|
|
|
@dataclass
class TranscriptSegment:
    """A transcribed stretch of user speech."""

    # Required fields.
    text: str
    start_time: float
    end_time: float
    # Optional fields.
    confidence: float = 0.0
    is_backchannel: bool = False
    is_non_verbal: bool = False
|
|
|
|
@dataclass
class BackchannelEvent:
    """One backchannel emission: a verbal nod or a non-verbal expression."""

    # Required fields.
    trigger: BackchannelTrigger
    audio: np.ndarray
    # Optional fields.
    sample_rate: int = 22050
    is_verbal: bool = True
    label: str = ""
    # Creation time (``time.time()``), filled in per instance.
    timestamp: float = field(default_factory=time.time)
|
|
|
|
@dataclass
class InterruptClassification:
    """Outcome of classifying user speech detected during Nima's turn."""

    interrupt_type: InterruptType
    confidence: float = 0.0
    reason: str = ""
    transcript: str = ""
    duration_s: float = 0.0
    # Fresh dict per instance (never shared between classifications).
    spectral_features: Dict[str, float] = field(default_factory=dict)
|
|
|
|
@dataclass
class ProsodyParams:
    """Prosody settings driven by the consciousness state."""

    # Pitch and tempo.
    base_pitch_hz: float = 180.0
    speech_rate_wpm: float = 140.0
    # Loudness and voice quality.
    energy: float = 0.8
    breathiness: float = 0.1
    vibrato_depth: float = 0.0
    warmth: float = 0.7
    pitch_variance: float = 0.15
    # Free-form label of the emotional tone.
    emotional_tone: str = "neutral"
|
|
|
|
@dataclass
class ConversationState:
    """Mutable record of where the conversation currently stands."""

    phase: ConversationPhase = ConversationPhase.IDLE

    # Timing of the user's and Nima's speech.
    user_speech_start: float = 0.0
    user_speech_duration: float = 0.0
    nima_speech_start: float = 0.0
    nima_speech_duration: float = 0.0
    last_backchannel_time: float = 0.0
    last_user_pause_time: float = 0.0

    # Estimated user emotion.
    user_emotion_arousal: float = 0.3
    user_emotion_valence: float = 0.0
    last_arousal_sample: float = 0.3

    # Counters.
    interrupt_count: int = 0
    backchannel_count: int = 0

    # Text currently associated with the state and a position within it.
    current_text: str = ""
    current_text_position: float = 0.0
|
|
|
|
| |
| |
| |
|
|
class ProceduralNonVerbalSynth:
    """
    Synthesize non-verbal vocal expressions with pure numpy DSP.

    Every expression type has a hand-crafted signal model:

    LAUGHTER: train of voiced "ha" bursts (100 ms cycle, ~180 Hz), fading out
    GIGGLE:   faster, higher-pitched burst train (70 ms cycle, ~260 Hz)
    GASP:     250 ms high-passed noise, sharp onset, quick exponential decay
    GROAN:    600 ms descending (90 → 60 Hz) harmonic tone with noise
    MOAN:     800 ms sustained 140 Hz tone with 5.5 Hz vibrato
    SIGH:     500 ms noise through a low-pass sweeping 2000 → 400 Hz
    CLUCK:    80 ms plosive burst followed by a short click
    CLICK:    30 ms single impulse with quick decay — a "tsk" sound
    AWW:      400 ms "aw" vowel at 150 Hz
    OH:       300 ms "oh" vowel at 200 Hz
    MM:       400 ms nasal hum at 120 Hz
    WOW:      500 ms "aw" vowel with pitch rising 180 → 260 Hz

    The noise and pitch jitter come from ``random`` / ``np.random``, so the
    output differs from call to call.
    """

    SAMPLE_RATE: int = 22050

    # Formant centre frequencies (Hz) and matching bandwidths (Hz) per vowel.
    VOWEL_FORMANTS: Dict[str, Dict] = {
        "aw": {"F1": 570, "F2": 840, "F3": 2410, "bw": [55, 75, 115]},
        "oh": {"F1": 480, "F2": 760, "F3": 2300, "bw": [50, 70, 110]},
        "mm": {"F1": 280, "F2": 900, "F3": 2200, "bw": [45, 65, 105]},
        "ah": {"F1": 730, "F2": 1090, "F3": 2440, "bw": [60, 80, 120]},
    }

    def __init__(self, sample_rate: int = 22050):
        self.sample_rate = sample_rate

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------

    def synth(self, expr_type: NonVerbalType, intensity: float = 0.7,
              duration_override: Optional[float] = None) -> np.ndarray:
        """Synthesize a non-verbal expression.

        Args:
            expr_type: Expression to render; ``expr_type.value`` selects the
                ``_synth_<value>`` method.
            intensity: Loudness/energy, clamped to [0.1, 1.0].
            duration_override: Length in seconds. ``None``, zero or a negative
                value selects the expression's default duration.

        Returns:
            float32 audio whose peak is normalised to ``0.7 * intensity``
            (an empty array stays empty).
        """
        intensity = float(max(0.1, min(1.0, intensity)))
        # A negative duration would otherwise blow up inside np.linspace;
        # treat it like 0, which already meant "use the default".
        if duration_override is not None and duration_override <= 0:
            duration_override = None
        method = getattr(self, f"_synth_{expr_type.value}", None)
        if method is None:
            logger.warning("Unknown non-verbal type: %s, falling back to sigh", expr_type)
            method = self._synth_sigh
        audio = method(intensity, duration_override)

        peak = float(np.max(np.abs(audio))) if len(audio) > 0 else 0.0
        if peak > 0:
            audio = audio / peak * 0.7 * intensity
        return audio.astype(np.float32)

    # ------------------------------------------------------------------
    # Burst-based expressions
    # ------------------------------------------------------------------

    def _synth_laughter(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        """Train of ~180 Hz "ha" bursts, 100 ms apart, fading by up to 30 %."""
        return self._burst_train(dur or 1.2, period=0.10, voiced_frac=0.7,
                                 intensity=intensity, pitch=180.0, jitter=20.0,
                                 decay=0.3)

    def _synth_giggle(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        """Train of ~260 Hz "hee" bursts, 70 ms apart, at 80 % intensity."""
        return self._burst_train(dur or 0.8, period=0.07, voiced_frac=0.6,
                                 intensity=intensity * 0.8, pitch=260.0,
                                 jitter=30.0)

    def _burst_train(self, total_dur: float, period: float, voiced_frac: float,
                     intensity: float, pitch: float, jitter: float,
                     decay: float = 0.0) -> np.ndarray:
        """Concatenate voiced bursts separated by silent gaps.

        Each cycle lasts ``period`` seconds, of which ``voiced_frac`` is a
        burst and the rest silence. ``decay`` is the fraction by which the
        last burst is quieter than the first.
        """
        # The epsilon keeps float division from dropping a whole cycle:
        # 1.2 / 0.10 evaluates to 11.999..., which int() would cut to 11.
        n_bursts = int(total_dur / period + 1e-9)
        gap = np.zeros(int(self.sample_rate * period * (1.0 - voiced_frac)))
        chunks = []
        for i in range(n_bursts):
            burst = self._gen_ha_burst(period * voiced_frac, intensity,
                                       pitch=pitch + random.uniform(-jitter, jitter))
            fade = 1.0 - decay * (i / max(1, n_bursts - 1))
            chunks.append(burst * fade)
            chunks.append(gap)
        return np.concatenate(chunks) if chunks else np.zeros(0)

    def _gen_ha_burst(self, duration: float, intensity: float, pitch: float = 180) -> np.ndarray:
        """Generate a single 'ha' burst — voiced segment with fast onset/offset."""
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        source = self._harmonic_stack(2.0 * np.pi * pitch * t, 4, 0.4, 3.0)
        noise = np.random.normal(0, 0.3, n)
        mixed = self._bandpass(source * 0.6 + noise * 0.4, 400, 3000)
        return mixed * self._envelope(n, 0.01, 0.04) * intensity

    # ------------------------------------------------------------------
    # Noise / tone expressions
    # ------------------------------------------------------------------

    def _synth_gasp(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        """High-passed noise with a 5 ms onset and a fast exponential decay."""
        duration = dur or 0.25
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)

        filtered = self._highpass(np.random.normal(0, 1, n), 800)

        env = np.exp(-t * 15.0)
        attack = min(int(0.005 * self.sample_rate), n // 10)
        if attack > 0:
            # Ramp the first few samples up from zero, rescaled by their peak.
            head = env[:attack]
            env[:attack] = np.linspace(0, 1, attack) * head / max(head.max(), 1e-6)

        pulse = 0.2 * np.sin(2 * np.pi * 200 * t) * np.exp(-t * 10)
        return (filtered * 0.7 + pulse * 0.3) * env * intensity

    def _synth_groan(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        """Low harmonic tone gliding from 90 Hz down to 60 Hz, with noise."""
        duration = dur or 0.6
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)

        f0 = 90.0 - 30.0 * (t / duration)
        phase = 2.0 * np.pi * np.cumsum(f0) / self.sample_rate
        source = self._harmonic_stack(phase, 3, 0.3, 2.0)

        noise = np.random.normal(0, 0.2, n)
        mixed = self._lowpass(source * 0.7 + noise * 0.3, 600)
        return mixed * self._envelope(n, 0.08, 0.15, 0.3) * intensity

    def _synth_moan(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        """Sustained 140 Hz harmonic tone with a +/-4 Hz, 5.5 Hz vibrato."""
        duration = dur or 0.8
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)

        vibrato = 4.0 * np.sin(2 * np.pi * 5.5 * t)
        phase = 2.0 * np.pi * np.cumsum(140.0 + vibrato) / self.sample_rate
        source = self._harmonic_stack(phase, 4, 0.4, 3.0)

        noise = np.random.normal(0, 0.15, n)
        mixed = self._bandpass(source * 0.85 + noise * 0.15, 200, 2000)
        return mixed * self._envelope(n, 0.1, 0.2, 0.4) * intensity

    def _synth_sigh(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        """Noise through a low-pass sweeping 2000 → 400 Hz, decaying in level."""
        duration = dur or 0.5
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)

        filtered = self._lowpass_sweep(np.random.normal(0, 1, n), 2000.0, 400.0)
        pulse = 0.15 * np.sin(2 * np.pi * 120 * t) * np.exp(-t * 2)
        mixed = filtered * 0.8 + pulse * 0.2

        env = self._envelope(n, 0.05) * np.exp(-t * 2.5)
        return mixed * env * intensity

    def _synth_cluck(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        """20 ms windowed noise burst plus a 5 ms click starting near 30 ms."""
        duration = dur or 0.08
        n = int(self.sample_rate * duration)
        audio = np.zeros(n)

        burst_len = min(int(0.02 * self.sample_rate), n)
        if burst_len > 0:
            audio[:burst_len] = np.random.normal(0, 1, burst_len) * np.hanning(burst_len)

        click_len = min(int(0.005 * self.sample_rate), n)
        if click_len > 0:
            # Clamp the offset so the click always fits inside the buffer.
            offset = min(int(0.03 * self.sample_rate), n - click_len)
            # 0.36 = 0.6 * 0.6, the click level relative to the burst.
            audio[offset:offset + click_len] += np.random.normal(0, 1, click_len) * 0.36

        return self._lowpass(audio, 3000) * intensity

    def _synth_click(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        """3 ms noise impulse with a very fast decay, high-passed at 1500 Hz."""
        duration = dur or 0.03
        n = int(self.sample_rate * duration)

        impulse = np.zeros(n)
        impulse_len = min(int(0.003 * self.sample_rate), n)
        if impulse_len > 0:
            impulse[:impulse_len] = np.random.normal(0, 1, impulse_len)

        t = np.linspace(0, duration, n)
        return self._highpass(impulse * np.exp(-t * 100), 1500) * intensity

    # ------------------------------------------------------------------
    # Vowel-based expressions
    # ------------------------------------------------------------------

    def _synth_aww(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        """"aw" vowel at 150 Hz, 400 ms by default."""
        return self._synth_vowel_expr("aw", 150, 0.4, intensity, dur)

    def _synth_oh(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        """"oh" vowel at 200 Hz, 300 ms by default."""
        return self._synth_vowel_expr("oh", 200, 0.3, intensity, dur)

    def _synth_mm(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        """Nasal hum at 120 Hz, 400 ms by default."""
        return self._synth_vowel_expr("mm", 120, 0.4, intensity, dur, nasal=True)

    def _synth_wow(self, intensity: float, dur: Optional[float]) -> np.ndarray:
        """"aw" vowel whose pitch rises from 180 Hz to 260 Hz."""
        duration = dur or 0.5
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)

        f0 = 180.0 + 80.0 * (t / duration)
        phase = 2.0 * np.pi * np.cumsum(f0) / self.sample_rate
        audio = self._formant_filter(np.sin(phase), "aw", n)
        return audio * self._envelope(n, 0.05, 0.1, 0.5) * intensity

    def _synth_vowel_expr(self, vowel: str, f0: float, duration: float,
                          intensity: float, dur_override: Optional[float],
                          nasal: bool = False) -> np.ndarray:
        """Steady-pitch vowel: harmonic source shaped by the vowel's formants.

        With ``nasal=True`` the result is additionally low-passed at 1500 Hz
        and a 250 Hz sine is mixed in.
        """
        duration = dur_override or duration
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)

        source = self._harmonic_stack(2.0 * np.pi * f0 * t, 5, 0.4, 3.0)
        audio = self._formant_filter(source, vowel, n)
        if nasal:
            audio = self._lowpass(audio, 1500)
            audio += 0.2 * np.sin(2 * np.pi * 250 * t)

        return audio * self._envelope(n, 0.05, 0.1, 0.5) * intensity

    # ------------------------------------------------------------------
    # Shared building blocks
    # ------------------------------------------------------------------

    @staticmethod
    def _harmonic_stack(phase: np.ndarray, top_harmonic: int, weight: float,
                        norm: float) -> np.ndarray:
        """Fundamental plus harmonics 2..top_harmonic (amplitude weight/h), divided by norm."""
        source = np.sin(phase)
        for h in range(2, top_harmonic + 1):
            source += (weight / h) * np.sin(phase * h)
        return source / norm

    def _envelope(self, n: int, attack_s: float, release_s: float = 0.0,
                  release_floor: float = 0.0) -> np.ndarray:
        """Linear attack/release envelope of length n.

        Attack ramps 0 → 1, release ramps 1 → ``release_floor``; each ramp is
        capped at a quarter of the signal so short sounds keep a plateau.
        """
        env = np.ones(n)
        attack = min(int(attack_s * self.sample_rate), n // 4)
        release = min(int(release_s * self.sample_rate), n // 4)
        if attack > 0:
            env[:attack] = np.linspace(0, 1, attack)
        if release > 0:
            env[-release:] = np.linspace(1, release_floor, release)
        return env

    def _formant_filter(self, signal: np.ndarray, vowel: str, n: int) -> np.ndarray:
        """Apply 3-formant resonator filter for vowel synthesis.

        Unknown vowels fall back to "ah". The three resonators run in
        parallel and are mixed with gains 1.0 / 0.6 / 0.3.
        """
        formants = self.VOWEL_FORMANTS.get(vowel, self.VOWEL_FORMANTS["ah"])
        centres = (formants["F1"], formants["F2"], formants["F3"])
        output = np.zeros(n)
        for fn, bw, mix in zip(centres, formants["bw"], (1.0, 0.6, 0.3)):
            r = math.exp(-math.pi * bw / self.sample_rate)
            cos_w = math.cos(2 * math.pi * fn / self.sample_rate)
            a1 = -2 * r * cos_w
            a2 = r * r
            gain = (1 - r) * math.sqrt(max(0, 1 - 2 * r * cos_w + r * r))
            filtered = np.zeros(n)
            y1 = y2 = 0.0  # previous two outputs of the two-pole recursion
            for i in range(2, n):
                y = gain * signal[i] - a1 * y1 - a2 * y2
                filtered[i] = y
                y2, y1 = y1, y
            output += filtered * mix
        return output

    def _lowpass_alpha(self, cutoff_hz: float) -> float:
        """Smoothing coefficient of the one-pole low-pass for a given cutoff."""
        rc = 1.0 / (2 * math.pi * cutoff_hz)
        dt = 1.0 / self.sample_rate
        return dt / (rc + dt)

    def _lowpass(self, signal: np.ndarray, cutoff_hz: float) -> np.ndarray:
        """Simple one-pole lowpass filter (state starts at zero)."""
        if len(signal) == 0:
            return signal
        alpha = self._lowpass_alpha(cutoff_hz)
        output = np.zeros_like(signal)
        state = 0.0
        for i in range(len(signal)):
            state += alpha * (signal[i] - state)
            output[i] = state
        return output

    def _lowpass_sweep(self, signal: np.ndarray, start_hz: float, end_hz: float,
                       steps: int = 10) -> np.ndarray:
        """One-pole lowpass whose cutoff moves from start_hz towards end_hz.

        The cutoff is updated once per chunk (about ``steps`` chunks), but the
        filter state is carried across chunk boundaries so the output has no
        discontinuity where the cutoff changes.
        """
        n = len(signal)
        output = np.zeros(n)
        chunk = max(1, n // steps)
        state = 0.0
        for start in range(0, n, chunk):
            cutoff = start_hz + (end_hz - start_hz) * (start / max(1, n))
            alpha = self._lowpass_alpha(cutoff)
            for i in range(start, min(start + chunk, n)):
                state += alpha * (signal[i] - state)
                output[i] = state
        return output

    def _highpass(self, signal: np.ndarray, cutoff_hz: float) -> np.ndarray:
        """Simple one-pole highpass filter."""
        if len(signal) == 0:
            return signal
        rc = 1.0 / (2 * math.pi * cutoff_hz)
        dt = 1.0 / self.sample_rate
        alpha = rc / (rc + dt)
        output = np.zeros_like(signal)
        output[0] = signal[0]
        for i in range(1, len(signal)):
            output[i] = alpha * (output[i - 1] + signal[i] - signal[i - 1])
        return output

    def _bandpass(self, signal: np.ndarray, low_hz: float, high_hz: float) -> np.ndarray:
        """Bandpass = lowpass (at high_hz) followed by highpass (at low_hz)."""
        return self._highpass(self._lowpass(signal, high_hz), low_hz)
|
|
|
|
| |
| |
| |
|
|
class EnergyVAD:
    """
    Voice activity detection based purely on signal energy.

    It tells WHEN speech occurs, not WHAT is said. Serves as the fallback
    when Whisper is unavailable and as a cheap pre-filter when it is active.
    """

    def __init__(self, sample_rate: int = 16000, frame_duration_ms: int = 20,
                 energy_threshold: float = 0.005):
        self.sample_rate = sample_rate
        self.frame_duration_ms = frame_duration_ms
        self.frame_size = int(sample_rate * frame_duration_ms / 1000)
        self.energy_threshold = energy_threshold
        # Running estimate of the background level and how fast it adapts.
        self._noise_floor = 0.001
        self._adaptation_rate = 0.01

    @staticmethod
    def _rms(audio: np.ndarray) -> float:
        """RMS of ``audio`` after converting integer PCM to float32 in [-1, 1)."""
        kind = audio.dtype
        if kind == np.int16:
            audio = audio.astype(np.float32) / 32768.0
        elif kind == np.int32:
            audio = audio.astype(np.float32) / 2147483648.0
        elif kind == np.uint8:
            # Unsigned 8-bit PCM is centred on 128.
            audio = (audio.astype(np.float32) - 128) / 128.0
        elif kind != np.float32:
            audio = audio.astype(np.float32)
        return float(np.sqrt(np.mean(audio ** 2)))

    def detect_speech(self, audio: np.ndarray) -> bool:
        """Return True if the audio frame contains speech.

        Frames quieter than half the energy threshold also update the
        adaptive noise floor; the decision threshold is the larger of the
        fixed threshold and three times that floor.
        """
        if len(audio) == 0:
            return False

        level = self._rms(audio)

        if level < self.energy_threshold * 0.5:
            rate = self._adaptation_rate
            self._noise_floor = (1 - rate) * self._noise_floor + rate * level

        return level > max(self.energy_threshold, self._noise_floor * 3)

    def compute_energy(self, audio: np.ndarray) -> float:
        """Return the RMS energy of ``audio`` (0.0 for an empty array)."""
        return self._rms(audio) if len(audio) != 0 else 0.0

    def detect_pause(self, audio: np.ndarray, min_pause_s: float = 0.3,
                     max_pause_s: float = 0.8) -> Tuple[bool, float]:
        """
        Look for a mid-utterance pause in ``audio``.

        The audio is scanned frame by frame (trailing samples that do not
        fill a frame are ignored). Returns ``(is_pause, longest_silence_s)``
        where ``is_pause`` is True when the longest silent run lies within
        ``[min_pause_s, max_pause_s]``.
        """
        if len(audio) == 0:
            return False, 0.0
        n_frames = len(audio) // self.frame_size
        if n_frames < 2:
            return False, 0.0

        frame_dur = self.frame_duration_ms / 1000.0
        run_start = None  # start time of the current silent run, if any
        longest = 0.0
        for idx in range(n_frames):
            lo = idx * self.frame_size
            if self.detect_speech(audio[lo:lo + self.frame_size]):
                run_start = None
                continue
            if run_start is None:
                run_start = idx * frame_dur
            longest = max(longest, (idx + 1) * frame_dur - run_start)

        return (min_pause_s <= longest <= max_pause_s), longest
|
|
|
|
class WhisperASR:
    """
    Whisper-based ASR front end. Transcribes user speech to text.

    Uses whichever backend was detected at import time (openai-whisper or
    faster-whisper) and drops to VAD-only mode when none is installed or the
    model fails to load.
    """

    # Short utterances that signal listening rather than taking the turn.
    _BACKCHANNEL_VOCAB = frozenset({
        "yeah", "yes", "yep", "yup", "mhm", "mm-hmm", "mm", "hmm",
        "uh-huh", "right", "sure", "ok", "okay", "i see", "got it",
        "makes sense", "true", "exactly", "wow", "oh", "ah",
    })

    def __init__(self, model_name: str = "base", device: Optional[str] = None):
        """Load the Whisper model if a backend is available.

        Args:
            model_name: Whisper model size/name passed to the backend.
            device: Target device (e.g. "cpu", "cuda"). ``None`` lets
                openai-whisper pick CUDA when torch reports it and lets
                faster-whisper auto-select.
        """
        self.model_name = model_name
        self.mode = ASRMode.WHISPER if WHISPER_AVAILABLE else ASRMode.VAD_ONLY
        self._model = None
        self._backend = WHISPER_BACKEND if WHISPER_AVAILABLE else None
        self.vad = EnergyVAD()

        if self.mode != ASRMode.WHISPER:
            logger.warning("[WhisperASR] whisper not installed; using VAD-only mode")
            return

        try:
            logger.info("[WhisperASR] loading model '%s' via %s...", model_name, self._backend)
            if self._backend == "openai-whisper":
                device = device or ("cuda" if _torch_cuda_available() else "cpu")
                self._model = whisper.load_model(model_name, device=device)
            elif self._backend == "faster-whisper":
                compute_type = "float16" if device == "cuda" else "int8"
                # Forward the requested device; without it faster-whisper
                # auto-selects and an explicit "cpu" request would be ignored.
                self._model = WhisperModel(model_name, device=device or "auto",
                                           compute_type=compute_type)
            logger.info("[WhisperASR] model loaded (backend=%s)", self._backend)
        except Exception as e:
            # Best effort: any load failure degrades to VAD-only, never crashes.
            logger.warning("[WhisperASR] failed to load Whisper (%s); falling back to VAD-only", e)
            self.mode = ASRMode.VAD_ONLY
            self._model = None

    def transcribe(self, audio: np.ndarray, sample_rate: int = 16000) -> TranscriptSegment:
        """
        Transcribe audio to text. Returns a TranscriptSegment.

        Integer PCM input is scaled to float32 in [-1, 1) first. When Whisper
        is unavailable or fails, the energy VAD decides between an empty text
        and the placeholder "[speech detected]" (confidence 0.0).

        NOTE(review): ``sample_rate`` is only used to derive the timestamps;
        the samples are handed to Whisper unchanged, so callers presumably
        supply 16 kHz audio — confirm.
        """
        audio = self._pcm_to_float32(audio)
        if audio.size == 0:
            now = time.time()
            return TranscriptSegment(text="", start_time=now, end_time=now, confidence=0.0)

        duration_s = len(audio) / sample_rate

        if self.mode == ASRMode.WHISPER and self._model is not None:
            try:
                text, confidence = self._run_whisper(audio)
            except Exception as e:
                # Fall through to the VAD path below.
                logger.warning("[WhisperASR] transcription failed: %s", e)
            else:
                end = time.time()
                return TranscriptSegment(text=text, start_time=end - duration_s,
                                         end_time=end, confidence=confidence)

        is_speech = self.vad.detect_speech(audio)
        end = time.time()
        return TranscriptSegment(
            text="[speech detected]" if is_speech else "",
            start_time=end - duration_s,
            end_time=end,
            confidence=0.0,
        )

    def _run_whisper(self, audio: np.ndarray) -> Tuple[str, float]:
        """Run the loaded backend and return ``(text, confidence)``."""
        if self._backend == "openai-whisper":
            result = self._model.transcribe(audio, fp16=False, language="en")
            text = result.get("text", "").strip()
            logprobs = [s.get("avg_logprob", -1) for s in result.get("segments", [])]
        elif self._backend == "faster-whisper":
            segments_iter, _info = self._model.transcribe(audio, language="en", beam_size=1)
            segments = list(segments_iter)
            text = " ".join(s.text.strip() for s in segments).strip()
            # faster-whisper segments expose ``avg_logprob`` (no second underscore).
            logprobs = [s.avg_logprob for s in segments]
        else:
            raise RuntimeError(f"unsupported Whisper backend: {self._backend!r}")
        return text, self._logprob_to_confidence(logprobs)

    @staticmethod
    def _logprob_to_confidence(logprobs) -> float:
        """Map mean segment log-probability to [0, 1] (``mean + 1``, clamped).

        No segments means nothing was recognised, so the confidence is 0.0.
        """
        if not logprobs:
            return 0.0
        return max(0.0, min(1.0, float(np.mean(logprobs)) + 1.0))

    @staticmethod
    def _pcm_to_float32(audio: np.ndarray) -> np.ndarray:
        """Return ``audio`` as float32, scaling integer PCM into [-1, 1)."""
        audio = np.asarray(audio)
        if audio.dtype == np.float32:
            return audio
        if audio.dtype == np.int16:
            return audio.astype(np.float32) / 32768.0
        if audio.dtype == np.int32:
            return audio.astype(np.float32) / 2147483648.0
        if audio.dtype == np.uint8:
            return (audio.astype(np.float32) - 128) / 128.0
        return audio.astype(np.float32)

    def is_backchannel_text(self, text: str) -> bool:
        """Check if transcribed text is a backchannel ('yeah', 'mm-hmm', etc.).

        Matching is case-insensitive and ignores surrounding whitespace and
        trailing/leading ``. ? ! ,`` punctuation; the whole text must match.
        """
        if not text:
            return False
        normalised = text.lower().strip().strip(".?!,")
        return normalised in self._BACKCHANNEL_VOCAB
|
|
|
|
| def _torch_cuda_available() -> bool: |
| """Check if torch + CUDA are available.""" |
| try: |
| import torch |
| return torch.cuda.is_available() |
| except ImportError: |
| return False |
|
|
|
|
| |
| |
| |
|
|
| class ProceduralFormantTTS: |
| """ |
| Fallback TTS using formant synthesis. Produces understandable but |
| robotic speech. Used when Coqui XTTS is not available. |
| """ |
|
|
| SAMPLE_RATE: int = 22050 |
|
|
| |
| PHONEME_MAP: Dict[str, Tuple[str, float, str]] = { |
| "a": ("vowel", 0.10, "ah"), "e": ("vowel", 0.10, "eh"), |
| "i": ("vowel", 0.10, "ee"), "o": ("vowel", 0.10, "oh"), |
| "u": ("vowel", 0.10, "oo"), |
| "b": ("plosive", 0.05, ""), "p": ("plosive", 0.05, ""), |
| "t": ("plosive", 0.05, ""), "d": ("plosive", 0.05, ""), |
| "k": ("plosive", 0.05, ""), "g": ("plosive", 0.05, ""), |
| "s": ("fricative", 0.12, ""), "z": ("fricative", 0.12, ""), |
| "f": ("fricative", 0.10, ""), "v": ("fricative", 0.10, ""), |
| "h": ("fricative", 0.08, ""), |
| "m": ("nasal", 0.08, ""), "n": ("nasal", 0.08, ""), |
| "l": ("approximant", 0.07, ""), "r": ("approximant", 0.07, ""), |
| "w": ("approximant", 0.07, ""), "y": ("approximant", 0.07, ""), |
| } |
|
|
| VOWEL_FORMANTS: Dict[str, Dict] = { |
| "ah": {"F1": 730, "F2": 1090, "F3": 2440, "bw": [60, 80, 120]}, |
| "eh": {"F1": 530, "F2": 1840, "F3": 2480, "bw": [50, 70, 110]}, |
| "ee": {"F1": 270, "F2": 2290, "F3": 3010, "bw": [40, 60, 100]}, |
| "oh": {"F1": 570, "F2": 840, "F3": 2410, "bw": [55, 75, 115]}, |
| "oo": {"F1": 300, "F2": 870, "F3": 2240, "bw": [45, 65, 105]}, |
| } |
|
|
| def __init__(self, sample_rate: int = 22050): |
| self.sample_rate = sample_rate |
| self._nonverbal = ProceduralNonVerbalSynth(sample_rate) |
|
|
| def synthesize(self, text: str, prosody: ProsodyParams) -> np.ndarray: |
| """Synthesize text to speech using formant synthesis.""" |
| if not text.strip(): |
| return np.zeros(0, dtype=np.float32) |
| |
| frames = self._text_to_phonemes(text) |
| if not frames: |
| return np.zeros(0, dtype=np.float32) |
| |
| chunks = [] |
| for ptype, duration, vowel_or_noise in frames: |
| if ptype == "pause": |
| chunks.append(np.zeros(int(self.sample_rate * duration), dtype=np.float32)) |
| elif ptype == "vowel": |
| chunks.append(self._synth_vowel(vowel_or_noise, duration, prosody)) |
| elif ptype == "plosive": |
| chunks.append(self._synth_plosive(duration, prosody)) |
| elif ptype == "fricative": |
| chunks.append(self._synth_fricative(duration, prosody)) |
| elif ptype == "nasal": |
| chunks.append(self._synth_nasal(duration, prosody)) |
| elif ptype == "approximant": |
| chunks.append(self._synth_approximant(duration, prosody)) |
| audio = np.concatenate(chunks) if chunks else np.zeros(0, dtype=np.float32) |
| |
| audio = self._apply_prosody(audio, prosody) |
| return audio.astype(np.float32) |
|
|
| def _text_to_phonemes(self, text: str) -> List[Tuple[str, float, str]]: |
| """Simple grapheme-to-phoneme: one char β one phoneme.""" |
| frames = [] |
| text = text.lower() |
| i = 0 |
| while i < len(text): |
| char = text[i] |
| if char in self.PHONEME_MAP: |
| ptype, dur, vowel = self.PHONEME_MAP[char] |
| frames.append((ptype, dur, vowel)) |
| elif char == " ": |
| frames.append(("pause", 0.08, "")) |
| elif char in ".,!?;:": |
| frames.append(("pause", 0.20, "")) |
| i += 1 |
| return frames |
|
|
| def _synth_vowel(self, vowel: str, duration: float, prosody: ProsodyParams) -> np.ndarray: |
| n = int(self.sample_rate * duration) |
| if n < 2: |
| return np.zeros(max(2, n), dtype=np.float32) |
| t = np.linspace(0, duration, n, dtype=np.float64) |
| |
| f0 = prosody.base_pitch_hz |
| vibrato = prosody.vibrato_depth * np.sin(2 * np.pi * 5.5 * t) |
| phase = 2.0 * np.pi * np.cumsum(f0 + vibrato) / self.sample_rate |
| source = np.sin(phase) |
| for h in range(2, 6): |
| source += (0.4 / h) * np.sin(phase * h) |
| source /= 3.0 |
| |
| formants = self.VOWEL_FORMANTS.get(vowel, self.VOWEL_FORMANTS["ah"]) |
| output = np.zeros(n) |
| for fi, (fn, bw) in enumerate(zip( |
| [formants["F1"], formants["F2"], formants["F3"]], formants["bw"] |
| )): |
| r = float(np.exp(-np.pi * bw / self.sample_rate)) |
| a1 = -2 * r * math.cos(2 * math.pi * fn / self.sample_rate) |
| a2 = r * r |
| gain = (1 - r) * math.sqrt(max(0, 1 - 2 * r * math.cos(2 * math.pi * fn / self.sample_rate) + r * r)) |
| filtered = np.zeros(n) |
| for i in range(2, n): |
| filtered[i] = gain * source[i] - a1 * filtered[i - 1] - a2 * filtered[i - 2] |
| formant_gains = [1.0, 0.6, 0.3] |
| output += filtered * formant_gains[fi] |
| |
| noise = np.random.normal(0, prosody.breathiness, n) |
| output += noise * 0.3 |
| |
| attack = min(int(0.015 * self.sample_rate), n // 4) |
| release = min(int(0.025 * self.sample_rate), n // 4) |
| env = np.ones(n) |
| if attack > 0: |
| env[:attack] = np.linspace(0, 1, attack) |
| if release > 0: |
| env[-release:] = np.linspace(1, 0, release) |
| return (output * env * prosody.energy).astype(np.float32) |
|
|
| def _synth_plosive(self, duration: float, prosody: ProsodyParams) -> np.ndarray: |
| n = int(self.sample_rate * duration) |
| burst_len = max(2, min(int(0.008 * self.sample_rate), n)) |
| audio = np.zeros(n, dtype=np.float32) |
| audio[:burst_len] = np.random.normal(0, 1, burst_len) * np.hanning(burst_len) |
| return audio * prosody.energy * 0.5 |
|
|
def _synth_fricative(self, duration: float, prosody: ProsodyParams) -> np.ndarray:
    """
    Synthesize a fricative as band-limited white noise.

    Args:
        duration: length of the segment in seconds
        prosody: prosody params; only ``energy`` is read (output gain)

    Returns:
        float32 array of ``int(sample_rate * duration)`` samples.
    """
    sample_count = int(self.sample_rate * duration)
    # Unit-variance white noise restricted to the 3-7 kHz band.
    hiss = self._bandpass_simple(np.random.normal(0, 1, sample_count), 3000, 7000)
    scaled = hiss * prosody.energy
    scaled = scaled * 0.3
    return scaled.astype(np.float32)
|
|
def _synth_nasal(self, duration: float, prosody: ProsodyParams) -> np.ndarray:
    """
    Synthesize a nasal as a low-passed sine hum below the base pitch.

    Args:
        duration: length of the segment in seconds
        prosody: prosody params; ``base_pitch_hz`` and ``energy`` are read

    Returns:
        float32 array of ``int(sample_rate * duration)`` samples.
    """
    sample_count = int(self.sample_rate * duration)
    timeline = np.linspace(0, duration, sample_count, dtype=np.float64)
    # The hum sits at 80% of the speaker's base pitch.
    hum_hz = prosody.base_pitch_hz * 0.8
    tone = np.sin(2 * np.pi * hum_hz * timeline)
    muffled = self._lowpass_simple(tone, 1500)
    return (muffled * prosody.energy * 0.4).astype(np.float32)
|
|
def _synth_approximant(self, duration: float, prosody: ProsodyParams) -> np.ndarray:
    """
    Synthesize an approximant: a sine at the base pitch plus light noise.

    Args:
        duration: length of the segment in seconds
        prosody: prosody params; ``base_pitch_hz`` and ``energy`` are read

    Returns:
        float32 array of ``int(sample_rate * duration)`` samples.
    """
    sample_count = int(self.sample_rate * duration)
    timeline = np.linspace(0, duration, sample_count, dtype=np.float64)
    voiced = np.sin(2 * np.pi * prosody.base_pitch_hz * timeline) * 0.7
    # A small amount of Gaussian noise is mixed into the voiced tone.
    hiss = np.random.normal(0, 0.2, sample_count)
    mixed = voiced + hiss * 0.3
    return (mixed * prosody.energy * 0.35).astype(np.float32)
|
|
def _apply_prosody(self, audio: np.ndarray, prosody: ProsodyParams) -> np.ndarray:
    """
    Apply global prosody modifications (warmth, slow modulation, peak level).

    Args:
        audio: mono sample array
        prosody: prosody params; ``warmth`` and ``pitch_variance`` are read

    Returns:
        float32 array of the same length, peak-normalized to 0.85 unless the
        signal is all zeros. The result is always float32 regardless of
        which branches ran.
    """
    if len(audio) == 0:
        return np.asarray(audio, dtype=np.float32)

    # Warmth above 0.5 mixes a low-passed (<800 Hz) copy back into the signal.
    if prosody.warmth > 0.5:
        low_boost = self._lowpass_simple(audio, 800)
        audio = audio + low_boost * (prosody.warmth - 0.5) * 0.5

    # NOTE(review): despite the parameter name, this is a 2 Hz *amplitude*
    # modulation (up to +/-5% * pitch_variance), not a pitch change.
    if prosody.pitch_variance > 0:
        n = len(audio)
        mod = 1.0 + prosody.pitch_variance * 0.05 * np.sin(
            2 * np.pi * 2.0 * np.arange(n) / self.sample_rate
        )
        audio = audio * mod

    max_val = float(np.max(np.abs(audio)))
    if max_val > 0:
        audio = audio / max_val * 0.85
    # The modulation above promotes to float64; cast back so every path
    # returns the same dtype.
    return np.asarray(audio, dtype=np.float32)
|
|
def _lowpass_simple(self, signal: np.ndarray, cutoff_hz: float) -> np.ndarray:
    """
    Smooth ``signal`` with a first-order (one-pole RC) low-pass filter.

    Args:
        signal: 1-D sample array
        cutoff_hz: corner frequency of the equivalent RC circuit, in Hz

    Returns:
        Array shaped and typed like ``signal``; an empty input is handed
        back untouched.
    """
    total = len(signal)
    if total == 0:
        return signal
    # Discrete RC smoothing factor: alpha = dt / (RC + dt).
    step = 1.0 / self.sample_rate
    time_constant = 1.0 / (2 * math.pi * cutoff_hz)
    alpha = step / (time_constant + step)
    smoothed = np.zeros_like(signal)
    # The filter state starts at zero, so the first sample is only scaled.
    smoothed[0] = signal[0] * alpha
    for idx in range(1, total):
        prev = smoothed[idx - 1]
        smoothed[idx] = prev + alpha * (signal[idx] - prev)
    return smoothed
|
|
def _bandpass_simple(self, signal: np.ndarray, low_hz: float, high_hz: float) -> np.ndarray:
    """
    Crude band-pass built from two one-pole low-pass passes.

    The signal is first low-passed at ``high_hz``; the part of that result
    that lies below ``low_hz`` is then subtracted out.

    Args:
        signal: 1-D sample array
        low_hz: lower corner frequency in Hz
        high_hz: upper corner frequency in Hz

    Returns:
        The band-limited signal.
    """
    below_high = self._lowpass_simple(signal, high_hz)
    below_low = self._lowpass_simple(below_high, low_hz)
    return below_high - below_low
|
|
|
|
class CoquiXTTSBackend:
    """
    Coqui XTTS neural TTS backend. Produces high-quality natural speech
    with optional voice cloning. Falls back to ProceduralFormantTTS if
    Coqui is not installed or model loading fails.
    """

    # Built-in speaker requested when no cloning reference is supplied.
    _DEFAULT_SPEAKER = "Ana NeP"
    # Peak level that full-energy output is normalized to.
    _PEAK_LEVEL = 0.9

    def __init__(self, model_name: str = "tts_models/multilingual/multi-dataset/xtts_v2",
                 speaker_wav: Optional[str] = None,
                 language: str = "en"):
        """
        Try to load the Coqui model; degrade to the procedural synth on failure.

        Args:
            model_name: Coqui model identifier
            speaker_wav: optional path to a reference recording for voice cloning
            language: language code passed to the model on every call
        """
        self.model_name = model_name
        self.speaker_wav = speaker_wav
        self.language = language
        self.mode = TTSMode.COQUI_XTTS if COQUI_TTS_AVAILABLE else TTSMode.PROCEDURAL
        self._model = None
        # Always available, so a failed neural synthesis never yields silence.
        self._fallback = ProceduralFormantTTS()
        if self.mode == TTSMode.COQUI_XTTS:
            try:
                logger.info("[CoquiXTTS] loading model '%s'...", model_name)
                self._model = CoquiTTS(model_name)
                logger.info("[CoquiXTTS] model loaded")
            except Exception as e:
                # Deliberate best-effort: any load failure downgrades the mode.
                logger.warning("[CoquiXTTS] failed to load (%s); falling back to procedural", e)
                self.mode = TTSMode.PROCEDURAL
                self._model = None
        else:
            logger.warning("[CoquiXTTS] TTS package not installed; using procedural formant fallback")

    def synthesize(self, text: str, prosody: ProsodyParams) -> np.ndarray:
        """
        Synthesize text to speech. Returns float32 audio at 22050 Hz.

        NOTE(review): the 22050 Hz figure is inherited from the original
        docstring; XTTS v2 is documented to emit 24 kHz audio, so confirm
        what sample rate callers assume for the neural path.

        Args:
            text: text to speak; blank text yields an empty array
            prosody: prosody params forwarded to post-processing / fallback

        Returns:
            float32 mono audio. Any exception on the neural path is logged
            and the utterance is produced by the procedural fallback.
        """
        if not text.strip():
            return np.zeros(0, dtype=np.float32)
        if self.mode == TTSMode.COQUI_XTTS and self._model is not None:
            try:
                kwargs = {"text": text, "language": self.language}
                if self.speaker_wav:
                    kwargs["speaker_wav"] = self.speaker_wav
                else:
                    kwargs["speaker"] = self._resolve_speaker()
                wav = self._model.tts(**kwargs)
                audio = np.array(wav, dtype=np.float32)
                return self._apply_prosody(audio, prosody)
            except Exception as e:
                logger.warning("[CoquiXTTS] synthesis failed (%s); using fallback for this utterance", e)
        return self._fallback.synthesize(text, prosody)

    def _resolve_speaker(self) -> str:
        """
        Pick the built-in speaker name to request from the model.

        The preferred name is used whenever the model does not expose a
        speaker list or the list contains it. If the model reports a list
        that lacks the preferred name, the first reported speaker is used
        instead, so an unknown name does not force the procedural fallback
        on every utterance.
        """
        speakers = getattr(self._model, "speakers", None)
        if speakers:
            names = list(speakers)
            if self._DEFAULT_SPEAKER not in names:
                return names[0]
        return self._DEFAULT_SPEAKER

    def _apply_prosody(self, audio: np.ndarray, prosody: ProsodyParams) -> np.ndarray:
        """
        Apply prosody modifications to Coqui output.

        The signal is peak-normalized first and *then* scaled by
        ``prosody.energy`` (clamped to [0, 1]). Scaling before normalizing,
        as was done previously, made the energy setting a no-op because the
        normalization cancelled it.

        Returns:
            float32 audio whose peak is ``0.9 * energy`` (empty input is
            returned unchanged).
        """
        if len(audio) == 0:
            return audio

        max_val = float(np.max(np.abs(audio)))
        if max_val > 0:
            audio = audio / max_val * self._PEAK_LEVEL
        gain = min(1.0, max(0.0, float(prosody.energy)))
        return (audio * gain).astype(np.float32)
|
|
|
|
| |
| |
| |
|
|
class InterruptDetector:
    """
    Classifies detected user speech during Nima's turn into:
    - REAL_INTERRUPT: user is taking the turn (long speech, starts mid-Nima)
    - NON_VERBAL: laughter/sigh/gasp/etc. -> IGNORE (not an interrupt)
    - BACKCHANNEL: "yeah", "mm-hmm" -> IGNORE (not an interrupt)
    - COLLABORATIVE_TURN_SHARING: user finishing Nima's sentence -> IGNORE
    - SILENCE: no speech detected

    This is the KEY DIFFERENTIATOR: the system doesn't treat all user
    speech as an interrupt. Backchannels and non-verbal expressions are
    natural parts of conversation and should NOT trigger Nima to stop.
    """

    # Short acknowledgement tokens. Not referenced by the methods of this
    # class; text matching is delegated to ``asr.is_backchannel_text``.
    BACKCHANNEL_VOCAB: Set[str] = {
        "yeah", "yes", "yep", "yup", "mhm", "mm-hmm", "mm", "hmm",
        "uh-huh", "right", "sure", "ok", "okay", "i see", "got it",
        "makes sense", "true", "exactly", "wow", "oh", "ah", "ha",
    }

    # Duration thresholds in seconds. Only BACKCHANNEL_MAX_DURATION is read
    # by the methods of this class.
    BACKCHANNEL_MAX_DURATION: float = 0.8
    NON_VERBAL_MAX_DURATION: float = 1.5
    REAL_INTERRUPT_MIN_DURATION: float = 1.0

    # Speech this close (seconds) to the end of Nima's utterance may count
    # as collaborative turn sharing.
    COLLABORATIVE_WINDOW_S: float = 0.3

    def __init__(self, asr: WhisperASR):
        """Keep the ASR engine and reuse its VAD for speech detection."""
        self._asr = asr
        self._vad = asr.vad

    def classify(self, audio: np.ndarray, sample_rate: int = 16000,
                 nima_text_progress: float = 1.0,
                 nima_speech_remaining_s: float = 0.0) -> InterruptClassification:
        """
        Classify a segment of user speech detected during Nima's turn.

        Checks run in this order and the first match wins: empty audio,
        VAD silence, backchannel, non-verbal, collaborative turn sharing,
        and finally real interrupt.

        Args:
            audio: user audio (float32, mono)
            sample_rate: audio sample rate
            nima_text_progress: 0.0 = Nima just started, 1.0 = Nima finished
            nima_speech_remaining_s: seconds left in Nima's current utterance

        Returns:
            InterruptClassification with the verdict.
        """
        if len(audio) == 0:
            return InterruptClassification(
                interrupt_type=InterruptType.SILENCE,
                reason="no audio",
            )

        duration = len(audio) / sample_rate
        spectral = self._compute_spectral_features(audio, sample_rate)

        if not self._vad.detect_speech(audio):
            return InterruptClassification(
                interrupt_type=InterruptType.SILENCE,
                reason="below VAD threshold",
                duration_s=duration,
                spectral_features=spectral,
            )

        segment = self._asr.transcribe(audio, sample_rate)
        transcript = segment.text.strip().lower()
        # Fields shared by every verdict below.
        common = {
            "transcript": transcript,
            "duration_s": duration,
            "spectral_features": spectral,
        }

        # Short utterances that look or read like an acknowledgement.
        if duration < self.BACKCHANNEL_MAX_DURATION:
            if self._asr.is_backchannel_text(transcript) or self._is_backchannel_spectral(spectral):
                return InterruptClassification(
                    interrupt_type=InterruptType.BACKCHANNEL,
                    confidence=0.85,
                    reason=f"short ({duration:.2f}s) + backchannel vocab/spectral",
                    **common,
                )

        non_verbal_match = self._classify_non_verbal(spectral, duration)
        if non_verbal_match:
            return InterruptClassification(
                interrupt_type=InterruptType.NON_VERBAL,
                confidence=non_verbal_match[1],
                reason=f"non-verbal spectral match: {non_verbal_match[0]}",
                **common,
            )

        # Brief speech right at the end of Nima's utterance is treated as
        # the user completing the thought rather than cutting in.
        if nima_text_progress > 0.7 and nima_speech_remaining_s < self.COLLABORATIVE_WINDOW_S:
            if duration < 1.5:
                return InterruptClassification(
                    interrupt_type=InterruptType.COLLABORATIVE_TURN_SHARING,
                    confidence=0.70,
                    reason=f"speech at end of Nima's turn (progress={nima_text_progress:.2f})",
                    **common,
                )

        # Everything else is a real interrupt; confidence grows with length
        # and saturates at 2 seconds.
        confidence = min(1.0, duration / 2.0)
        return InterruptClassification(
            interrupt_type=InterruptType.REAL_INTERRUPT,
            confidence=confidence,
            reason=f"real speech ({duration:.2f}s, progress={nima_text_progress:.2f})",
            **common,
        )

    def _compute_spectral_features(self, audio: np.ndarray, sr: int) -> Dict[str, float]:
        """
        Compute spectral features for non-verbal classification.

        Returns:
            Empty dict for fewer than 256 samples; otherwise a dict with
            ``centroid_hz``, ``rolloff_hz`` (85% magnitude roll-off),
            ``zcr``, ``energy`` (RMS), ``low_freq_ratio`` (share of spectral
            magnitude below 500 Hz), ``periodicity``, ``duration_s`` and
            ``energy_cv`` (coefficient of variation of 20 ms frame RMS).
        """
        if len(audio) < 256:
            return {}

        fft = np.fft.rfft(audio.astype(np.float32))
        magnitude = np.abs(fft)
        freqs = np.fft.rfftfreq(len(audio), 1.0 / sr)

        if magnitude.sum() > 0:
            centroid = float(np.sum(freqs * magnitude) / np.sum(magnitude))
        else:
            centroid = 0.0

        cumsum = np.cumsum(magnitude)
        if cumsum[-1] > 0:
            rolloff_idx = np.searchsorted(cumsum, 0.85 * cumsum[-1])
            rolloff = float(freqs[min(rolloff_idx, len(freqs) - 1)])
        else:
            rolloff = 0.0

        # Fraction of adjacent sample pairs whose sign differs.
        zcr = float(np.mean(np.abs(np.diff(np.sign(audio))) > 0))

        energy = float(np.sqrt(np.mean(audio ** 2)))

        low_mask = freqs < 500
        low_energy = float(np.sum(magnitude[low_mask]) / max(1e-10, np.sum(magnitude)))

        periodicity = self._estimate_periodicity(audio, sr)

        # RMS per 20 ms frame; how much it fluctuates separates bursty
        # sounds from steady ones.
        frame_size = int(sr * 0.02)
        n_frames = max(1, len(audio) // frame_size)
        frames = (audio[i * frame_size:(i + 1) * frame_size] for i in range(n_frames))
        frame_energies = [
            float(np.sqrt(np.mean(frame ** 2))) for frame in frames if len(frame) > 0
        ]
        if len(frame_energies) >= 3:
            energy_mean = float(np.mean(frame_energies))
            energy_std = float(np.std(frame_energies))
            energy_cv = energy_std / max(1e-6, energy_mean)
        else:
            energy_cv = 0.0
        return {
            "centroid_hz": centroid,
            "rolloff_hz": rolloff,
            "zcr": zcr,
            "energy": energy,
            "low_freq_ratio": low_energy,
            "periodicity": periodicity,
            "duration_s": len(audio) / sr,
            "energy_cv": energy_cv,
        }

    def _estimate_periodicity(self, audio: np.ndarray, sr: int) -> float:
        """
        Estimate periodicity (0=aperiodic/noise, 1=strongly periodic).

        The score is the largest normalized autocorrelation value for lags
        between 50 ms and 200 ms. The autocorrelation is computed through
        the FFT (O(n log n)) instead of a direct O(n^2) correlation, since
        this runs on the interrupt-detection path.

        Returns:
            Value in [0, 1]; 0.0 when the clip is shorter than 50 ms, is
            (near-)constant, or is not longer than the 200 ms maximum lag.
        """
        n = len(audio)
        if n < sr * 0.05:
            return 0.0

        audio_centered = audio - np.mean(audio)
        if np.std(audio_centered) < 1e-6:
            return 0.0

        min_lag = int(sr * 0.05)
        max_lag = int(sr * 0.20)
        if max_lag >= n or max_lag <= min_lag:
            return 0.0

        # Zero-pad to at least 2n-1 so the circular correlation produced by
        # the FFT equals the linear autocorrelation for every lag we read.
        nfft = 1 << (2 * n - 1).bit_length()
        spectrum = np.fft.rfft(audio_centered.astype(np.float64), nfft)
        autocorr = np.fft.irfft(spectrum * np.conj(spectrum), nfft)[:max_lag]
        if autocorr[0] <= 0:
            return 0.0

        peak = float(np.max(autocorr[min_lag:max_lag]) / autocorr[0])
        return max(0.0, min(1.0, peak))

    def _is_backchannel_spectral(self, spectral: Dict[str, float]) -> bool:
        """Check if spectral features match a backchannel (short, voiced, soft-ish)."""
        if not spectral:
            return False
        energy = spectral.get("energy", 0.0)
        low_ratio = spectral.get("low_freq_ratio", 0.0)
        zcr = spectral.get("zcr", 0.5)
        periodicity = spectral.get("periodicity", 0.0)
        centroid = spectral.get("centroid_hz", 0.0)
        duration = spectral.get("duration_s", 1.0)

        if duration > self.BACKCHANNEL_MAX_DURATION:
            return False
        is_voiced = periodicity > 0.2 or low_ratio > 0.25
        is_smooth = zcr < 0.35
        is_low_centroid = centroid < 1500
        return is_voiced and is_smooth and is_low_centroid and energy > 0.01

    def _classify_non_verbal(self, spectral: Dict[str, float],
                             duration: float) -> Optional[Tuple[str, float]]:
        """
        Classify non-verbal expression from spectral features.
        Returns (expression_name, confidence) or None.

        Key insight: non-verbal expressions have DISTINCTIVE spectral
        signatures + are typically SHORT (<1.5s). Sustained voiced
        audio >1.0s with low energy variance is likely real speech,
        NOT a non-verbal expression -- even if periodicity is high.

        Rules are evaluated top to bottom and the first match wins.
        """
        if not spectral:
            return None
        periodicity = spectral.get("periodicity", 0.0)
        centroid = spectral.get("centroid_hz", 0.0)
        energy = spectral.get("energy", 0.0)
        zcr = spectral.get("zcr", 0.0)
        low_ratio = spectral.get("low_freq_ratio", 0.0)
        energy_cv = spectral.get("energy_cv", 0.0)

        # Long and steady: treat as speech, never as a non-verbal sound.
        if duration > 1.0 and energy_cv < 0.3:
            return None

        # Bursty energy with some periodicity: laughter family.
        if energy_cv > 0.3 and periodicity > 0.1 and 0.3 < duration < 2.0 and energy > 0.03:
            if centroid > 1500:
                return ("laughter", 0.8)
            return ("giggle", 0.7)

        # Aperiodic, bright, sub-second: sigh.
        if periodicity < 0.2 and 0.3 < duration < 1.0 and centroid > 1500 and energy > 0.02:
            return ("sigh", 0.65)

        # Very short and bright: gasp.
        if duration < 0.35 and centroid > 1500 and energy > 0.03:
            return ("gasp", 0.75)

        # Dark and periodic: groan.
        if 0.4 < duration < 0.8 and centroid < 1200 and periodicity > 0.2:
            return ("groan", 0.65)

        # Mid-centroid with strong low-frequency content: moan.
        if 0.5 < duration < 1.0 and 1000 < centroid < 2000 and low_ratio > 0.35:
            return ("moan", 0.6)

        # Extremely short and noisy: click.
        if duration < 0.12 and zcr > 0.3:
            return ("click", 0.5)

        return None
|
|
|
|
| |
| |
| |
|
|
class BackchannelController:
    """
    Decides when to emit backchannels (verbal nods + non-verbal expressions)
    while the user is speaking.

    Triggers (per user's spec):
    - ON_PAUSE: user paused 0.3-0.8s mid-utterance -> soft verbal nod ("mm-hmm")
    - ON_EMOTION_SHIFT: user's prosody shifted (arousal spike) -> non-verbal reaction

    The controller also avoids over-backchanneling: minimum 1.5s between
    any two backchannels.
    """

    MIN_BACKCHANNEL_INTERVAL_S: float = 1.5
    PAUSE_MIN_S: float = 0.3
    PAUSE_MAX_S: float = 0.8
    # Minimum rise of current arousal above the recent baseline that counts
    # as an emotional shift.
    AROUSAL_SPIKE_THRESHOLD: float = 0.3

    # Spoken acknowledgements used for ON_PAUSE.
    VERBAL_NODS: List[str] = ["mm-hmm", "yeah", "right", "i see", "mhm", "uh-huh"]

    # Non-verbal reaction per detected emotion.
    # NOTE(review): the member is spelled ``GASp`` (lower-case p) here; the
    # enum definition is outside this excerpt -- confirm it matches.
    EMOTION_REACTIONS: Dict[str, NonVerbalType] = {
        "surprise": NonVerbalType.GASp,
        "joy": NonVerbalType.LAUGHTER,
        "sadness": NonVerbalType.AWW,
        "fear": NonVerbalType.GASp,
        "anger": NonVerbalType.GROAN,
        "neutral": NonVerbalType.MM,
    }

    def __init__(self, tts: CoquiXTTSBackend, nonverbal_synth: ProceduralNonVerbalSynth,
                 sample_rate: int = 22050):
        """
        Args:
            tts: backend used to speak verbal nods
            nonverbal_synth: synthesizer for non-verbal reactions
            sample_rate: sample rate reported for verbal-nod audio
        """
        self._tts = tts
        self._nonverbal = nonverbal_synth
        self.sample_rate = sample_rate
        self._last_backchannel_time: float = 0.0
        self._last_arousal: float = 0.3
        self._arousal_history: Deque[float] = deque(maxlen=10)

    def should_backchannel(self, state: ConversationState,
                           audio: Optional[np.ndarray] = None) -> Optional[BackchannelEvent]:
        """
        Check if a backchannel should be emitted based on current state.

        The pause trigger is evaluated first; the arousal history is only
        updated when no pause nod fires and the rate limit / phase checks
        pass.

        Args:
            state: current conversation state (phase, user valence/arousal)
            audio: recent user audio used for pause detection, if any

        Returns:
            A BackchannelEvent if one should fire, else None.
        """
        now = time.time()

        # Rate limit: never emit two backchannels too close together.
        if now - self._last_backchannel_time < self.MIN_BACKCHANNEL_INTERVAL_S:
            return None

        # Backchannels only make sense while the user holds the floor.
        if state.phase != ConversationPhase.USER_SPEAKING:
            return None

        # Trigger 1: a short mid-utterance pause gets a soft verbal nod.
        if audio is not None and len(audio) > 0:
            vad = EnergyVAD(sample_rate=16000)
            is_pause, pause_dur = vad.detect_pause(audio, self.PAUSE_MIN_S, self.PAUSE_MAX_S)
            if is_pause:
                nod_text = random.choice(self.VERBAL_NODS)
                audio_out = self._tts.synthesize(nod_text, ProsodyParams(
                    base_pitch_hz=160, energy=0.4, warmth=0.8, breathiness=0.2,
                ))
                event = BackchannelEvent(
                    trigger=BackchannelTrigger.ON_PAUSE,
                    audio=audio_out,
                    # State the rate explicitly, as the emotion-shift branch
                    # does, instead of relying on the event's default.
                    sample_rate=self.sample_rate,
                    is_verbal=True,
                    label=nod_text,
                )
                self._last_backchannel_time = now
                logger.debug("[Backchannel] ON_PAUSE nod: '%s' (pause=%.2fs)", nod_text, pause_dur)
                return event

        # Trigger 2: arousal jumps above the baseline formed by all history
        # entries except the two most recent ones.
        current_arousal = state.user_emotion_arousal
        self._arousal_history.append(current_arousal)
        if len(self._arousal_history) >= 3:
            baseline = float(np.mean(list(self._arousal_history)[:-2]))
            shift = current_arousal - baseline
            if shift > self.AROUSAL_SPIKE_THRESHOLD:
                emotion = self._classify_emotion_shift(
                    state.user_emotion_valence, current_arousal
                )
                expr_type = self.EMOTION_REACTIONS.get(emotion, NonVerbalType.MM)
                audio_out = self._nonverbal.synth(expr_type, intensity=0.6)
                event = BackchannelEvent(
                    trigger=BackchannelTrigger.ON_EMOTION_SHIFT,
                    audio=audio_out,
                    sample_rate=self._nonverbal.sample_rate,
                    is_verbal=False,
                    label=expr_type.value,
                )
                self._last_backchannel_time = now
                logger.debug("[Backchannel] ON_EMOTION_SHIFT: %s (arousal %.2f->%.2f)",
                             expr_type.value, baseline, current_arousal)
                return event

        return None

    def _classify_emotion_shift(self, valence: float, arousal: float) -> str:
        """
        Classify the emotion from valence + arousal.

        Rules are checked from most to least specific; the result is one of
        the keys of ``EMOTION_REACTIONS``.
        """
        if arousal > 0.7 and valence > 0.3:
            return "joy"
        if arousal > 0.7 and valence < -0.3:
            return "anger"
        if arousal > 0.6 and valence < -0.2:
            return "fear"
        if arousal > 0.6:
            return "surprise"
        if valence < -0.3:
            return "sadness"
        return "neutral"
|
|
|
|
| |
| |
| |
|
|
class InterruptionResponse:
    """
    Generates context-dependent responses when a real interrupt is detected.
    Instead of just stopping, Nima says one of:
    - "I'm sorry, were you saying something?" (early in Nima's utterance)
    - "Sorry, please go ahead." (mid/late in Nima's utterance)

    The response is chosen based on:
    - How far into the utterance the interrupt occurred
    - Whether the user's speech seems urgent (high arousal)
    - Conversation history (don't apologize every time)
    """

    # These strings are spoken by the TTS. Dashes are written as the escape
    # "\u2014" (em dash) so the source stays ASCII and cannot be corrupted by
    # an encoding round-trip.
    EARLY_RESPONSES: List[str] = [
        "I'm sorry, were you saying something?",
        "Oh, sorry \u2014 please, go ahead.",
        "My apologies, you were saying?",
    ]

    LATE_RESPONSES: List[str] = [
        "Sorry, please go ahead.",
        "Go right ahead \u2014 I can wait.",
        "Of course, after you.",
    ]

    URGENT_RESPONSES: List[str] = [
        "Of course, go ahead.",
        "Please, go on.",
        "I'm listening \u2014 go ahead.",
    ]

    # Minimum seconds between two spoken interruption responses.
    COOLDOWN_S: float = 30.0

    def __init__(self):
        self._last_response_time: float = 0.0
        self._response_count: int = 0

    def should_respond(self, classification: InterruptClassification) -> bool:
        """
        Check if an interruption response should be emitted.

        Returns True only for a REAL_INTERRUPT that arrives after the
        cooldown since the last generated response has elapsed.
        """
        if classification.interrupt_type != InterruptType.REAL_INTERRUPT:
            return False

        now = time.time()
        if now - self._last_response_time < self.COOLDOWN_S:
            return False
        return True

    def generate_response(self, classification: InterruptClassification,
                          nima_text_progress: float,
                          user_arousal: float = 0.3) -> str:
        """
        Generate the appropriate interruption response text.

        Calling this restarts the cooldown and increments the response
        counter. ``classification`` is accepted for interface symmetry and
        is not inspected here.

        Args:
            classification: the interrupt classification
            nima_text_progress: 0.0 = Nima just started, 1.0 = Nima almost done
            user_arousal: detected arousal level of the user's interrupt

        Returns:
            Response text string.
        """
        self._last_response_time = time.time()
        self._response_count += 1

        # Urgency wins over position: yield the floor without preamble.
        if user_arousal > 0.7:
            return random.choice(self.URGENT_RESPONSES)

        # Interrupted in the first 30% of the utterance.
        if nima_text_progress < 0.3:
            return random.choice(self.EARLY_RESPONSES)

        return random.choice(self.LATE_RESPONSES)
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| |
|
|
class AdaptiveProsodyShaper:
    """
    Dynamically adjusts pitch, rhythm, and timbre based on emotional state
    or context. Softer tone when empathetic, brighter when excited.

    Maps an emotional context (valence + arousal + empathy_level) to
    concrete prosody modifications applied on top of the base ProsodyParams.
    """

    # Per-emotion adjustments: *_mult values scale, *_add values offset.
    EMOTION_PROFILES: Dict[str, Dict[str, float]] = {
        "empathetic": {"pitch_mult": 0.92, "rate_mult": 0.88, "warmth_add": 0.20, "breathiness_add": 0.08, "energy_mult": 0.85},
        "excited": {"pitch_mult": 1.18, "rate_mult": 1.12, "warmth_add": 0.05, "breathiness_add": -0.03, "energy_mult": 1.25},
        "contemplative": {"pitch_mult": 0.96, "rate_mult": 0.82, "warmth_add": 0.10, "breathiness_add": 0.05, "energy_mult": 0.90},
        "concerned": {"pitch_mult": 0.88, "rate_mult": 0.90, "warmth_add": 0.15, "breathiness_add": 0.10, "energy_mult": 0.80},
        "joyful": {"pitch_mult": 1.10, "rate_mult": 1.08, "warmth_add": 0.12, "breathiness_add": -0.02, "energy_mult": 1.15},
        "vulnerable": {"pitch_mult": 0.85, "rate_mult": 0.85, "warmth_add": 0.25, "breathiness_add": 0.15, "energy_mult": 0.70},
        "assertive": {"pitch_mult": 0.98, "rate_mult": 1.05, "warmth_add": -0.05, "breathiness_add": -0.05, "energy_mult": 1.20},
    }

    def shape(self, base_prosody: ProsodyParams,
              emotion: str = "neutral",
              valence: float = 0.0,
              arousal: float = 0.3,
              empathy_level: float = 0.5) -> ProsodyParams:
        """
        Apply adaptive shaping to base prosody.

        Adjustments are layered in a fixed order: emotion profile, valence,
        arousal, then empathy. The input object is not modified.

        Args:
            base_prosody: the starting prosody params
            emotion: emotion label (empathetic, excited, contemplative, etc.)
            valence: [-1, 1] emotional valence
            arousal: [0, 1] emotional arousal
            empathy_level: [0, 1] how empathetic the response should be

        Returns:
            New ProsodyParams with adaptive modifications applied.
        """

        def bounded(value: float, low: float, high: float) -> float:
            return float(min(high, max(low, value)))

        # Work on a copy tagged with the requested emotion.
        out = ProsodyParams(
            base_pitch_hz=base_prosody.base_pitch_hz,
            speech_rate_wpm=base_prosody.speech_rate_wpm,
            energy=base_prosody.energy,
            breathiness=base_prosody.breathiness,
            warmth=base_prosody.warmth,
            vibrato_depth=base_prosody.vibrato_depth,
            pitch_variance=base_prosody.pitch_variance,
            emotional_tone=emotion,
        )

        # Layer 1: named emotion profile (unknown labels leave the copy as is).
        preset = self.EMOTION_PROFILES.get(emotion, {})
        if preset:
            out.base_pitch_hz *= preset.get("pitch_mult", 1.0)
            out.speech_rate_wpm *= preset.get("rate_mult", 1.0)
            out.energy *= preset.get("energy_mult", 1.0)
            out.warmth = bounded(out.warmth + preset.get("warmth_add", 0.0), 0.0, 1.0)
            out.breathiness = bounded(out.breathiness + preset.get("breathiness_add", 0.0), 0.0, 0.5)

        # Layer 2: valence sets the pitch variance outright (0.05..0.4).
        out.pitch_variance = bounded(0.15 + valence * 0.10, 0.05, 0.4)

        # Layer 3: arousal scales loudness (capped at 1.0) and tempo.
        out.energy = float(min(1.0, out.energy * (0.7 + arousal * 0.6)))
        out.speech_rate_wpm *= (0.9 + arousal * 0.3)

        # Layer 4: empathy above the midpoint warms and softens the voice.
        if empathy_level > 0.5:
            boost = (empathy_level - 0.5) * 2.0
            out.warmth = float(min(1.0, out.warmth + 0.15 * boost))
            out.breathiness = float(min(0.5, out.breathiness + 0.05 * boost))
            out.base_pitch_hz *= (1.0 - 0.03 * boost)

        return out
|
|
|
|
class MicroIntonationInjector:
    """
    Adds tiny hesitations, breaths, and emphasis shifts that signal
    thoughtfulness or uncertainty. These make speech feel alive.

    Injects micro-events at sentence boundaries and before key words:
    - "..." hesitation (50-150ms pause + subtle pitch drop)
    - inhale breath (80ms)
    - emphasis shift (pitch bump on the emphasized word)
    """

    # Words that receive an emphasis event when they occur in the text.
    EMPHASIS_WORDS: Set[str] = {
        "really", "truly", "actually", "honestly", "important",
        "never", "always", "exactly", "absolutely", "indeed",
    }

    # Hesitation tokens; only the first two are ever prepended by inject().
    HESITATIONS: List[str] = ["...", "um", "hmm", "well"]

    def __init__(self, sample_rate: int = 22050):
        self.sample_rate = sample_rate
        self._breath_synth = ProceduralNonVerbalSynth(sample_rate)

    def inject(self, text: str, prosody: ProsodyParams,
               thoughtfulness: float = 0.3,
               uncertainty: float = 0.2) -> Tuple[str, List[Dict[str, Any]]]:
        """
        Analyze text and inject micro-intonation events.

        Args:
            text: the input text
            prosody: current prosody params
            thoughtfulness: [0, 1] how thoughtful/reflective (more hesitations)
            uncertainty: [0, 1] how uncertain (more fillers + pitch drops)

        Returns:
            (modified_text, events) where events is a list of dicts:
            {"type": "hesitation"|"breath"|"emphasis", "position": float, "audio": np.ndarray}
        """
        events: List[Dict[str, Any]] = []
        modified = text

        # Optional leading hesitation, more likely the more thoughtful we are.
        if thoughtfulness > 0.4 and random.random() < thoughtfulness:
            filler = random.choice(self.HESITATIONS[:2])
            pause_s = 0.1 + thoughtfulness * 0.15
            modified = f"{filler} {modified}"
            events.append({
                "type": "hesitation",
                "position": 0.0,
                "duration_s": pause_s,
                "audio": self._gen_hesitation_audio(pause_s, prosody),
            })

        # Breaths may follow words that end in a comma or a full stop.
        if thoughtfulness > 0.3:
            breath_chance = thoughtfulness * 0.6
            tokens = modified.split()
            for ordinal, token in enumerate(tokens, start=1):
                if token.endswith((",", ".")) and random.random() < breath_chance:
                    events.append({
                        "type": "breath",
                        "position": ordinal / len(tokens),
                        "duration_s": 0.08,
                        "audio": self._breath_synth.synth(NonVerbalType.SIGH, intensity=0.2),
                    })
            # Re-joining collapses any run of whitespace to single spaces.
            modified = " ".join(tokens)

        # Emphasis markers for known intensifier words.
        tokens = modified.split()
        denominator = max(1, len(tokens))
        events.extend(
            {
                "type": "emphasis",
                "position": index / denominator,
                "word": token,
                "pitch_bump": 30.0,
            }
            for index, token in enumerate(tokens)
            if token.lower().strip(".,!?;:") in self.EMPHASIS_WORDS
        )

        # A closing pitch drop signals uncertainty.
        if uncertainty > 0.5:
            events.append({
                "type": "uncertainty_drop",
                "position": 1.0,
                "pitch_drop": 20.0 * uncertainty,
            })

        return modified, events

    def _gen_hesitation_audio(self, duration: float, prosody: ProsodyParams) -> np.ndarray:
        """Generate a subtle hesitation sound (low 'um' or breath)."""
        count = int(self.sample_rate * duration)
        timeline = np.linspace(0, duration, count, dtype=np.float64)

        # Quiet sine at 70% of the base pitch.
        hum_hz = prosody.base_pitch_hz * 0.7
        tone = np.sin(2 * np.pi * hum_hz * timeline) * 0.3

        # Linear fade-in (30 ms) and fade-out (50 ms), each capped at a
        # third of the clip.
        fade_in = min(int(0.03 * self.sample_rate), count // 3)
        fade_out = min(int(0.05 * self.sample_rate), count // 3)
        envelope = np.ones(count)
        if fade_in > 0:
            envelope[:fade_in] = np.linspace(0, 1, fade_in)
        if fade_out > 0:
            envelope[-fade_out:] = np.linspace(1, 0, fade_out)
        return (tone * envelope * 0.3).astype(np.float32)
|
|
|
|
class TurnTakingPredictor:
    """
    Predicts when the user is about to finish speaking, so the system
    can smoothly take the floor instead of waiting for silence.

    Uses a combination of:
    - Speech rate deceleration (users slow down at turn ends)
    - Pitch declination (pitch drops at sentence ends)
    - Pause lengthening (longer pauses near turn end)
    - Filler detection ("you know", "so yeah")
    """

    # Phrases that tend to close a turn.
    TURN_END_FILLERS: Set[str] = {
        "you know", "so yeah", "i think", "something like that",
        "that's about it", "yeah", "right", "anyway",
    }

    # Characters stripped from the edges of each word before filler matching.
    _EDGE_PUNCTUATION = ".,!?;:\"'()"

    def __init__(self):
        self._speech_rate_history: Deque[float] = deque(maxlen=10)
        self._pitch_history: Deque[float] = deque(maxlen=10)
        self._pause_history: Deque[float] = deque(maxlen=5)

    def update(self, speech_rate: float, pitch: float, pause_duration: float):
        """Update the predictor with recent observations."""
        self._speech_rate_history.append(speech_rate)
        self._pitch_history.append(pitch)
        self._pause_history.append(pause_duration)

    def predict_turn_end_probability(self, transcript: str = "") -> float:
        """
        Predict the probability [0, 1] that the user is about to finish.

        Four independent cues each add a fixed weight:
        - +0.30 when the latest speech rate is below 80% of the rate three
          observations ago
        - +0.25 when the latest pitch is below 90% of the pitch three
          observations ago
        - +0.20 when at least two pauses are recorded and the latest one
          exceeds 0.5 s
        - +0.25 when the transcript contains a turn-end filler as whole
          words (so "bright" no longer counts as "right")

        Args:
            transcript: current (partial) transcript of the user's speech
        """
        prob = 0.0

        if len(self._speech_rate_history) >= 3:
            recent = list(self._speech_rate_history)[-3:]
            if recent[2] < recent[0] * 0.8:
                prob += 0.3

        if len(self._pitch_history) >= 3:
            recent = list(self._pitch_history)[-3:]
            if recent[2] < recent[0] * 0.9:
                prob += 0.25

        if len(self._pause_history) >= 2:
            recent = list(self._pause_history)[-2:]
            if recent[1] > 0.5:
                prob += 0.2

        if transcript and self._has_turn_end_filler(transcript):
            prob += 0.25
        return float(min(1.0, prob))

    def _has_turn_end_filler(self, transcript: str) -> bool:
        """Return True when any filler phrase occurs as whole words."""
        words = (w.strip(self._EDGE_PUNCTUATION) for w in transcript.lower().split())
        # Space-padding both sides lets a plain substring test enforce word
        # boundaries, including for multi-word fillers.
        padded = " " + " ".join(w for w in words if w) + " "
        return any(f" {filler} " in padded for filler in self.TURN_END_FILLERS)

    def should_take_floor(self, transcript: str = "") -> bool:
        """Returns True if the system should start speaking now."""
        return self.predict_turn_end_probability(transcript) > 0.6
|
|
|
|
| |
|
|
class AffectiveMirror:
    """
    Matches the user's emotional tone (calm, energetic, concerned) with
    subtle vocal adjustments. The voice subtly reflects the user's state
    without mimicking it overtly.

    Mapping:
    - User calm -> Nima slightly slower, warmer
    - User energetic -> Nima slightly faster, brighter
    - User concerned -> Nima softer, lower pitch
    - User joyful -> Nima lighter, more pitch variance
    """

    def mirror(self, user_valence: float, user_arousal: float,
               base_prosody: ProsodyParams) -> Tuple[ProsodyParams, str]:
        """
        Mirror the user's emotional state in the voice.

        The input prosody is copied, not modified. The copy keeps the
        base's ``emotional_tone`` so a tone set earlier in the pipeline is
        not lost, matching the other prosody-copying stages in this file.

        Args:
            user_valence: detected valence of the user
            user_arousal: detected arousal of the user
            base_prosody: prosody to start from

        Returns:
            (mirrored_prosody, emotion_label) where the label is one of
            "calm", "energetic", "concerned", "joyful" or "neutral".
        """
        mirrored = ProsodyParams(
            base_pitch_hz=base_prosody.base_pitch_hz,
            speech_rate_wpm=base_prosody.speech_rate_wpm,
            energy=base_prosody.energy,
            breathiness=base_prosody.breathiness,
            warmth=base_prosody.warmth,
            vibrato_depth=base_prosody.vibrato_depth,
            pitch_variance=base_prosody.pitch_variance,
            emotional_tone=base_prosody.emotional_tone,
        )

        # Branches are checked in order; the first match decides the label.
        if user_arousal < 0.3 and abs(user_valence) < 0.3:
            emotion = "calm"
            mirrored.speech_rate_wpm *= 0.95
            mirrored.warmth = float(min(1.0, mirrored.warmth + 0.05))
        elif user_arousal > 0.6 and user_valence > 0.3:
            emotion = "energetic"
            mirrored.speech_rate_wpm *= 1.08
            mirrored.base_pitch_hz *= 1.05
            mirrored.energy = float(min(1.0, mirrored.energy * 1.1))
        elif user_valence < -0.3:
            emotion = "concerned"
            mirrored.base_pitch_hz *= 0.95
            mirrored.breathiness = float(min(0.3, mirrored.breathiness + 0.05))
            mirrored.warmth = float(min(1.0, mirrored.warmth + 0.10))
        elif user_valence > 0.4:
            emotion = "joyful"
            mirrored.pitch_variance = float(min(0.35, mirrored.pitch_variance + 0.08))
            mirrored.base_pitch_hz *= 1.03
        else:
            emotion = "neutral"

        return mirrored, emotion
|
|
|
|
class SomaticFeedbackIntegrator:
    """
    Ties voice modulation to system "strain" or "energy" states.
    Like biological fatigue signals -- when the system is under strain,
    the voice becomes slightly slower, breathier, lower-pitched.

    Reads NIMA's phenomenological_strain and allostatic_load to modulate
    the voice. This makes the voice itself a signal of the system's
    internal state.
    """

    def __init__(self):
        self._current_strain: float = 0.0
        self._current_energy: float = 1.0
        self._allostatic_load: float = 0.0

    def update_from_nima(self, strain: float, allostatic_load: float = 0.0):
        """
        Update the somatic state from NIMA's metrics.

        Strain is clamped to [0, 2] and load to [0, 1]; the derived energy
        level falls with both and never drops below 0.3.
        """
        capped_strain = min(2.0, strain)
        capped_load = min(1.0, allostatic_load)
        self._current_strain = float(max(0.0, capped_strain))
        self._allostatic_load = float(max(0.0, capped_load))

        remaining = 1.0 - 0.3 * self._current_strain - 0.2 * self._allostatic_load
        self._current_energy = float(max(0.3, remaining))

    def apply_somatic_modulation(self, prosody: ProsodyParams) -> ProsodyParams:
        """
        Apply fatigue/strain modulation to prosody.

        When both strain and load are below 0.1 the input object itself is
        returned; otherwise a modified copy is returned.
        """
        load = self._allostatic_load
        if self._current_strain < 0.1 and load < 0.1:
            return prosody

        tired = ProsodyParams(
            base_pitch_hz=prosody.base_pitch_hz,
            speech_rate_wpm=prosody.speech_rate_wpm,
            energy=prosody.energy,
            breathiness=prosody.breathiness,
            warmth=prosody.warmth,
            vibrato_depth=prosody.vibrato_depth,
            pitch_variance=prosody.pitch_variance,
            emotional_tone=prosody.emotional_tone,
        )

        # Strain beyond 1.0 has no additional effect on the voice.
        strain_factor = min(1.0, self._current_strain)
        tired.base_pitch_hz *= (1.0 - 0.05 * strain_factor)
        tired.speech_rate_wpm *= (1.0 - 0.10 * strain_factor)
        tired.breathiness = float(min(0.4, tired.breathiness + 0.08 * strain_factor))

        # Allostatic load lowers loudness and adds a little warmth.
        tired.energy *= (1.0 - 0.15 * load)
        tired.warmth = float(min(1.0, tired.warmth + 0.05 * load))
        return tired

    @property
    def strain(self) -> float:
        """Current clamped strain value."""
        return self._current_strain

    @property
    def energy(self) -> float:
        """Energy level derived from strain and load by update_from_nima()."""
        return self._current_energy
|
|
|
|
class EmpathyPhraseGenerator:
    """
    Picks a short, emotion-specific empathy phrase to use in place of a
    generic acknowledgement such as "mm-hmm".

    The phrase pool is chosen from the user's emotion label; when the label
    is "neutral" the pool is inferred from valence and arousal instead.
    """

    # Phrase pools keyed by emotion label. "neutral" doubles as the
    # fallback pool for labels that have no entry here.
    EMPATHY_PHRASES: Dict[str, List[str]] = {
        "sadness": [
            "That sounds really hard.",
            "I can hear how much this weighs on you.",
            "That must feel tough.",
            "I'm sorry you're going through this.",
        ],
        "joy": [
            "That's wonderful to hear.",
            "I can feel your excitement.",
            "That sounds amazing.",
            "I love that for you.",
        ],
        "anger": [
            "That sounds frustrating.",
            "I can see why that would upset you.",
            "That would make me angry too.",
            "You have every right to feel that way.",
        ],
        "fear": [
            "That sounds scary.",
            "I can understand why you'd be worried.",
            "It makes sense that you're concerned.",
            "That's a lot to sit with.",
        ],
        "surprise": [
            "Oh wow.",
            "That's unexpected.",
            "I didn't see that coming either.",
            "Hmm, that's something.",
        ],
        "neutral": [
            "I hear you.",
            "I get what you mean.",
            "That makes sense.",
            "I'm following you.",
            "Go on, I'm listening.",
        ],
    }

    @staticmethod
    def _infer_from_affect(valence: float, arousal: float) -> str:
        """Map a (valence, arousal) pair to an emotion label.

        Clearly negative valence (< -0.3) reads as anger when arousal is
        above 0.5 and as sadness otherwise. Failing that, arousal above 0.6
        reads as joy when valence is above 0.4 and as surprise otherwise.
        Anything else stays "neutral".
        """
        if valence < -0.3:
            return "anger" if arousal > 0.5 else "sadness"
        if arousal > 0.6:
            return "joy" if valence > 0.4 else "surprise"
        return "neutral"

    def generate(self, user_emotion: str = "neutral",
                 user_valence: float = 0.0,
                 user_arousal: float = 0.3) -> str:
        """Return a randomly chosen empathy phrase for the user's state.

        Args:
            user_emotion: emotion label. Valence and arousal are consulted
                only when this is "neutral".
            user_valence: valence used for inference.
            user_arousal: arousal used for inference.

        Returns:
            One phrase from the matching pool, or from the "neutral" pool
            when the label has no pool of its own.
        """
        label = user_emotion
        if label == "neutral":
            label = self._infer_from_affect(user_valence, user_arousal)

        pool = self.EMPATHY_PHRASES.get(label)
        if pool is None:
            pool = self.EMPATHY_PHRASES["neutral"]
        return random.choice(pool)
|
|
|
|
| |
|
|
def _new_voice_event_id() -> str:
    """Build a "ve_"-prefixed id from the first 12 hex digits of a random UUID."""
    return "ve_" + uuid.uuid4().hex[:12]


@dataclass
class VoiceEvent:
    """A single utterance recorded as an episodic voice event.

    Every field has a default, so ``VoiceEvent()`` yields a fresh event with
    a unique id, the current wall-clock time and neutral affect values.
    """

    # Identity and timing
    event_id: str = field(default_factory=_new_voice_event_id)
    timestamp: float = field(default_factory=time.time)

    # What was said, and by whom
    speaker: str = "nima"
    text: str = ""
    audio_duration_s: float = 0.0

    # How it was said
    prosody_snapshot: Dict[str, float] = field(default_factory=dict)
    emotion: str = "neutral"
    valence: float = 0.0
    arousal: float = 0.3
    strain: float = 0.0

    # Conversation context at the time of the utterance
    conversation_phase: str = "nima_speaking"
    interrupt_count: int = 0
    backchannel_count: int = 0
|
|
|
|
class VoiceEventMemoryBridge:
    """
    Stores every utterance as an episodic voice event with affective tags,
    so that later recall covers how something was said as well as what.

    Events always go into a bounded in-memory buffer (the newest 500). When
    a palace object is supplied, each event is additionally forwarded to
    its ``store_episode`` method on a best-effort basis.
    """

    def __init__(self, palace: Any = None):
        """
        Args:
            palace: a NIMA MemoryPalace instance. If None, voice events
                are kept only in the in-memory buffer (no persistence).
        """
        self._palace = palace
        self._local_events: Deque[VoiceEvent] = deque(maxlen=500)
        # Counts every stored event, including ones the buffer has evicted.
        self._event_count = 0

    def store_voice_event(self, event: "VoiceEvent") -> str:
        """Buffer ``event`` locally and forward it to the palace if present.

        A palace failure is logged as a warning and otherwise ignored, so
        the local buffer stays authoritative for this session.

        Returns:
            The event's ``event_id``.
        """
        self._local_events.append(event)
        self._event_count += 1

        if self._palace is not None:
            try:
                self._palace.store_episode(
                    processor_name=f"voice_{event.speaker}",
                    sensory_intensity=event.arousal,
                    # Equal blend of emotional magnitude and arousal.
                    affective_weight=abs(event.valence) * 0.5 + event.arousal * 0.5,
                    score=event.strain,
                    valence=event.valence,
                    arousal=event.arousal,
                    novelty=0.3,
                    input_text=event.text[:500],
                    content={
                        "speaker": event.speaker,
                        "audio_duration_s": event.audio_duration_s,
                        "prosody_snapshot": event.prosody_snapshot,
                        "emotion": event.emotion,
                        "conversation_phase": event.conversation_phase,
                        "interrupt_count": event.interrupt_count,
                        "backchannel_count": event.backchannel_count,
                        "event_type": "voice_event",
                    },
                )
            except Exception as e:
                # Deliberate best-effort: persistence must never break speech.
                logger.warning("[VoiceEventMemoryBridge] MemPalace store failed: %s", e)
        return event.event_id

    def recall_voice_events(self, speaker: Optional[str] = None,
                            emotion: Optional[str] = None,
                            limit: int = 5) -> List["VoiceEvent"]:
        """Return the most recent buffered events, oldest first.

        Args:
            speaker: keep only events from this speaker (falsy = no filter).
            emotion: keep only events with this emotion (falsy = no filter).
            limit: maximum number of events to return. Zero or a negative
                value yields an empty list.
        """
        # A plain ``results[-limit:]`` would return everything for limit == 0
        # (because -0 == 0) and drop the oldest items for a negative limit.
        if limit <= 0:
            return []
        matches = [
            event for event in self._local_events
            if (not speaker or event.speaker == speaker)
            and (not emotion or event.emotion == emotion)
        ]
        return matches[-limit:]

    def get_stats(self) -> Dict[str, Any]:
        """Return lifetime and buffered event counts plus palace connectivity."""
        return {
            "total_events": self._event_count,
            "buffered_events": len(self._local_events),
            "palace_connected": self._palace is not None,
        }
|
|
|
|
class NarrativeContinuityEngine:
    """
    References past conversations naturally, e.g. "Earlier you mentioned
    ... You sounded excited about it."

    Phrases are built from earlier user voice events obtained from the
    memory bridge, combining a short topic fragment of the past text with a
    word describing the emotion recorded on that event.
    """

    # Spoken templates. ``{topic}`` is always used; ``{emotion}`` only in
    # some templates (str.format ignores unused keyword arguments).
    CONTINUITY_TEMPLATES: List[str] = [
        "Earlier you mentioned {topic}. You sounded {emotion} about it.",
        "As you said before, {topic}. I remember how {emotion} you were.",
        "Going back to what you said about {topic} — you seemed {emotion}.",
        "I was thinking about what you said earlier, about {topic}.",
        "You mentioned {topic} earlier. That stayed with me.",
    ]

    def __init__(self, memory_bridge: "VoiceEventMemoryBridge"):
        """
        Args:
            memory_bridge: object providing
                ``recall_voice_events(speaker=..., limit=...)``.
        """
        self._memory = memory_bridge

    def generate_continuity_phrase(self, current_topic: str = "",
                                   current_emotion: str = "neutral") -> Optional[str]:
        """
        Generate a natural continuity phrase referencing a past voice event.

        The ten most recent user events are fetched; the newest one is
        skipped and the next-newest event whose text is longer than ten
        characters is referenced.

        Args:
            current_topic: accepted for interface compatibility; not
                currently used for selection.
            current_emotion: accepted for interface compatibility; not
                currently used for selection.

        Returns:
            The formatted phrase, or None if no suitable past event exists.
        """
        past_events = self._memory.recall_voice_events(speaker="user", limit=10)
        if not past_events:
            return None

        earlier = past_events[:-1]
        candidate = next(
            (ev for ev in reversed(earlier) if ev.text and len(ev.text) > 10),
            None,
        )
        if candidate is None:
            return None

        topic = self._extract_topic(candidate.text)
        emotion_word = self._emotion_to_word(candidate.emotion, candidate.valence)
        template = random.choice(self.CONTINUITY_TEMPLATES)
        return template.format(topic=topic, emotion=emotion_word)

    def _extract_topic(self, text: str) -> str:
        """Extract a short topic phrase from past text.

        Texts of up to five words are used whole; longer texts contribute a
        five-word window around the middle. In both cases surrounding
        ``.,!?`` is stripped, because the templates add their own
        punctuation directly after the topic.
        """
        words = text.split()
        if len(words) > 5:
            start = max(0, len(words) // 2 - 2)
            words = words[start:start + 5]
        return " ".join(words).strip(".,!?")

    def _emotion_to_word(self, emotion: str, valence: float) -> str:
        """Map an emotion label (refined by valence) to a descriptive word.

        Unknown labels map to "engaged".
        """
        if emotion == "joy":
            return "excited" if valence > 0.5 else "positive"
        if emotion == "sadness":
            return "down" if valence < -0.3 else "thoughtful"
        if emotion == "neutral":
            return "engaged" if valence > 0 else "reflective"
        fixed = {
            "anger": "frustrated",
            "fear": "worried",
            "surprise": "surprised",
        }
        return fixed.get(emotion, "engaged")
|
|
|
|
| |
|
|
class SingingInterjectionModule:
    """
    Short melodic phrases (humming, tonal affirmations) woven into speech.

    Interjection types:
      - affirmation_hum: a rising "mm-mm" confirming what was said
      - thinking_hum: a contemplative "hmmm" while processing
      - transition_tone: a brief melodic bridge between topics
      - warmth_chord: a soft harmonic when expressing empathy

    Every synth method returns mono float32 audio at ``self.sample_rate``.
    """

    def __init__(self, sample_rate: int = 22050):
        self.sample_rate = sample_rate
        # Only the low-pass filter of the non-verbal synth is used here.
        self._nonverbal = ProceduralNonVerbalSynth(sample_rate)

    def _fade_envelope(self, n: int, attack_s: float, release_s: float,
                       release_floor: float, cap_divisor: int) -> np.ndarray:
        """Build an ``n``-sample gain envelope with linear fade-in and fade-out.

        The attack ramps 0 -> 1 and the release ramps 1 -> ``release_floor``.
        Each ramp is limited to ``n // cap_divisor`` samples so the two can
        never overlap, and a ramp of zero samples is skipped altogether
        (``env[-0:]`` would otherwise address the whole array).
        """
        env = np.ones(n)
        cap = n // cap_divisor
        attack = min(int(attack_s * self.sample_rate), cap)
        release = min(int(release_s * self.sample_rate), cap)
        if attack > 0:
            env[:attack] = np.linspace(0, 1, attack)
        if release > 0:
            env[-release:] = np.linspace(1, release_floor, release)
        return env

    def synth_affirmation_hum(self, duration: float = 0.4) -> np.ndarray:
        """A rising 'mm-mm' that affirms what was said (120 Hz -> 180 Hz glide)."""
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)

        f0 = 120.0 + 60.0 * (t / duration)
        # Integrate the instantaneous frequency to obtain the phase.
        phase = 2.0 * np.pi * np.cumsum(f0) / self.sample_rate
        source = np.sin(phase) * 0.5

        audio = self._nonverbal._lowpass(source, 1500)
        env = self._fade_envelope(n, 0.05, 0.08, 0.3, 4)
        return (audio * env * 0.4).astype(np.float32)

    def synth_thinking_hum(self, duration: float = 0.6) -> np.ndarray:
        """A contemplative 'hmmm': 140 Hz with a slow 3 Hz, +/-10 Hz wobble."""
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)

        f0 = 140.0 + 10.0 * np.sin(2 * np.pi * 3.0 * t)
        phase = 2.0 * np.pi * np.cumsum(f0) / self.sample_rate
        source = np.sin(phase) * 0.4

        audio = self._nonverbal._lowpass(source, 1200)
        env = self._fade_envelope(n, 0.08, 0.12, 0.4, 4)
        return (audio * env * 0.35).astype(np.float32)

    def synth_transition_tone(self, duration: float = 0.5) -> np.ndarray:
        """A brief melodic bridge between topics: four rising notes.

        The duration is split evenly between the notes 220, 261, 293 and
        329 Hz, each with its own short fade-in and fade-out. Very short
        durations, where a note lasts under three samples, are rendered
        without fades instead of raising.
        """
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)

        notes = (220, 261, 293, 329)
        note_duration = duration / len(notes)
        audio = np.zeros(n)
        for i, freq in enumerate(notes):
            start = int(i * note_duration * self.sample_rate)
            end = min(n, int((i + 1) * note_duration * self.sample_rate))
            note_len = end - start
            if note_len <= 0:
                continue
            # Each note restarts its phase at zero.
            note_audio = np.sin(2 * np.pi * freq * t[:note_len]) * 0.3
            note_env = self._fade_envelope(note_len, 0.02, 0.02, 0.3, 3)
            audio[start:end] = note_audio * note_env
        return (audio * 0.3).astype(np.float32)

    def synth_warmth_chord(self, duration: float = 0.8) -> np.ndarray:
        """A soft three-partial chord (130/165/196 Hz) for expressing empathy."""
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)

        # Dividing by the summed partial weights (1 + 0.7 + 0.5) keeps the
        # chord within [-1, 1].
        chord = (np.sin(2 * np.pi * 130 * t) +
                 0.7 * np.sin(2 * np.pi * 165 * t) +
                 0.5 * np.sin(2 * np.pi * 196 * t)) / 2.2
        audio = self._nonverbal._lowpass(chord, 800)
        env = self._fade_envelope(n, 0.15, 0.25, 0.2, 3)
        return (audio * env * 0.25).astype(np.float32)
|
|
|
|
@dataclass
class MultimodalCue:
    """Metadata for one non-audio cue that accompanies a voice event.

    Only ``cue_type`` is required; every other field has a default.
    """

    # Output channel, e.g. "haptic" or "light".
    cue_type: str
    # Cue strength; defaults to a mid-level 0.5.
    intensity: float = 0.5
    # How long the cue lasts, in seconds.
    duration_s: float = 0.3
    # Shape of the cue, e.g. "pulse" or "wave".
    pattern: str = "pulse"
    # Wall-clock creation time, taken from time.time() when not supplied.
    timestamp: float = field(default_factory=time.time)
|
|
|
|
class MultimodalCueEmitter:
    """
    Pairs voice with subtle haptic or visual signals, e.g. a soft vibration
    or light pulse when nodding.

    The emitter produces no audio. It creates cue metadata, keeps the most
    recent 100 cues and hands each one to an optional callback so an
    external system (robotics, display, haptic actuator) can act on it.
    """

    def __init__(self):
        self._cue_history: Deque[MultimodalCue] = deque(maxlen=100)
        self._cue_callback: Optional[Callable[[MultimodalCue], None]] = None

    def set_callback(self, callback: "Callable[[MultimodalCue], None]"):
        """Set a callback to receive cues in real-time."""
        self._cue_callback = callback

    def emit_for_backchannel(self, is_verbal: bool, intensity: float = 0.5):
        """Emit a short haptic pulse when a backchannel is emitted.

        Args:
            is_verbal: accepted for interface compatibility; the cue is
                currently the same for verbal and non-verbal backchannels.
            intensity: backchannel intensity, mapped linearly to a cue
                intensity of ``0.3 + 0.3 * intensity``.
        """
        self._emit(MultimodalCue(
            cue_type="haptic",
            intensity=0.3 + intensity * 0.3,
            duration_s=0.2,
            pattern="pulse",
        ))

    def emit_for_empathy(self, emotion: str = "neutral"):
        """Emit a light wave when an empathy phrase is spoken.

        Sadness and fear get a slightly stronger cue (0.4) than any other
        emotion (0.3).
        """
        intensity = 0.4 if emotion in ("sadness", "fear") else 0.3
        self._emit(MultimodalCue(
            cue_type="light",
            intensity=intensity,
            duration_s=0.5,
            pattern="wave",
        ))

    def emit_for_laughter(self, intensity: float = 0.7):
        """Emit a haptic pulse of intensity ``0.4 + 0.4 * intensity`` for laughter."""
        self._emit(MultimodalCue(
            cue_type="haptic",
            intensity=0.4 + intensity * 0.4,
            duration_s=0.3,
            pattern="pulse",
        ))

    def _emit(self, cue: "MultimodalCue"):
        """Record ``cue`` and forward it to the callback, if one is set.

        A failing callback is logged and swallowed so that a broken consumer
        cannot interrupt the voice pipeline.
        """
        self._cue_history.append(cue)
        if self._cue_callback:
            try:
                self._cue_callback(cue)
            except Exception as e:
                logger.warning("[MultimodalCueEmitter] callback failed: %s", e)

    def get_recent_cues(self, n: int = 10) -> List["MultimodalCue"]:
        """Return up to ``n`` of the most recent cues, oldest first.

        Zero or a negative ``n`` yields an empty list.
        """
        # ``history[-0:]`` is the whole list, so guard explicitly.
        if n <= 0:
            return []
        return list(self._cue_history)[-n:]
|
|
|
|
class DynamicLaughterSynth:
    """
    Procedural laughter that adapts to intensity, from a chuckle (low
    intensity) to a full laugh (high intensity).

    Instead of fixed samples, the synth scales:
      - Number of "ha" bursts
      - Pitch (higher for chuckle, lower for full laugh)
      - Energy
      - A trailing breath, added above intensity 0.5
    """

    # One row per laughter style:
    # (exclusive upper intensity bound, min bursts, max bursts,
    #  burst period in seconds, centre pitch Hz, pitch jitter Hz, burst energy)
    _PRESETS = (
        (0.3, 2, 3, 0.12, 240.0, 20.0, 0.4),            # chuckle
        (0.6, 4, 6, 0.10, 180.0, 15.0, 0.6),            # normal laugh
        (float("inf"), 6, 9, 0.09, 150.0, 10.0, 0.8),   # full laugh
    )

    def __init__(self, sample_rate: int = 22050):
        self.sample_rate = sample_rate
        self._nonverbal = ProceduralNonVerbalSynth(sample_rate)

    def synth(self, intensity: float = 0.5,
              duration: Optional[float] = None) -> np.ndarray:
        """
        Synthesize adaptive laughter.

        Args:
            intensity: [0, 1]. 0.2 = chuckle, 0.5 = normal laugh,
                0.9 = full laugh. Values are clamped to [0.1, 1.0].
            duration: requested length in seconds. When given (and positive)
                the burst period is rescaled so the bursts span about
                ``duration / 1.3`` and the result is padded with silence or
                truncated (with a short fade-out) to exactly
                ``int(sample_rate * duration)`` samples. When None or not
                positive, the length follows from the randomly chosen
                number of bursts.

        Returns:
            Laughter audio (float32), peak-normalised to ``0.7 * intensity``.
        """
        intensity = float(max(0.1, min(1.0, intensity)))

        # The last preset's bound is infinite, so this loop always breaks.
        for bound, lo, hi, ha_period, centre, jitter, energy in self._PRESETS:
            if intensity < bound:
                break
        n_has = random.randint(lo, hi)
        pitch = centre + random.uniform(-jitter, jitter)

        has_override = duration is not None and duration > 0
        if has_override:
            # Inverse of the automatic estimate n_has * ha_period * 1.3.
            ha_period = duration / (n_has * 1.3)

        gap = np.zeros(int(self.sample_rate * ha_period * 0.3))
        chunks = []
        for i in range(n_has):
            # Later bursts are up to 20% quieter, as a laugh trails off.
            decay = 1.0 - 0.2 * (i / max(1, n_has - 1))
            chunks.append(self._gen_ha(ha_period * 0.7, energy, pitch) * decay)
            chunks.append(gap)

        if intensity > 0.5:
            # Catch-your-breath tail after a hearty laugh, at most 0.3 s.
            breath = self._nonverbal.synth(NonVerbalType.SIGH, intensity=0.3)
            chunks.append(breath[:int(self.sample_rate * 0.3)])

        audio = np.concatenate(chunks)
        if has_override:
            audio = self._fit_to_length(audio, int(self.sample_rate * duration))

        peak = float(np.max(np.abs(audio))) if len(audio) > 0 else 0.0
        if peak > 0:
            audio = audio / peak * 0.7 * intensity
        return audio.astype(np.float32)

    def _fit_to_length(self, audio: np.ndarray, target: int) -> np.ndarray:
        """Pad ``audio`` with silence or truncate it to exactly ``target`` samples.

        Truncation applies a fade-out of up to 10 ms to avoid a click.
        """
        audio = np.asarray(audio, dtype=np.float64)
        if len(audio) <= target:
            return np.concatenate([audio, np.zeros(target - len(audio))])
        trimmed = audio[:target].copy()
        fade = min(int(0.01 * self.sample_rate), target)
        if fade > 0:
            trimmed[-fade:] *= np.linspace(1, 0, fade)
        return trimmed

    def _gen_ha(self, duration: float, intensity: float, pitch: float) -> np.ndarray:
        """Generate a single 'ha' burst.

        The burst mixes a harmonic tone at ``pitch`` (fundamental plus
        harmonics 2-4) with Gaussian noise, band-passes the mix to
        400-3000 Hz, shapes it with a fast attack and a longer release and
        scales it by ``intensity``. Durations shorter than two samples
        yield two samples of silence.
        """
        n = int(self.sample_rate * duration)
        if n < 2:
            return np.zeros(max(2, n), dtype=np.float32)
        t = np.linspace(0, duration, n, dtype=np.float64)

        phase = 2.0 * np.pi * pitch * t
        source = np.sin(phase)
        for h in range(2, 5):
            source += (0.4 / h) * np.sin(phase * h)
        source /= 3.0

        noise = np.random.normal(0, 0.3, n)
        mixed = source * 0.6 + noise * 0.4
        mixed = self._nonverbal._bandpass(mixed, 400, 3000)

        env = np.ones(n)
        attack = min(int(0.01 * self.sample_rate), n // 4)
        release = min(int(0.04 * self.sample_rate), n // 4)
        if attack > 0:
            env[:attack] = np.linspace(0, 1, attack)
        if release > 0:
            env[-release:] = np.linspace(1, 0, release)
        return (mixed * env * intensity).astype(np.float32)
|
|
|
|
| |
|
|
class ContextAwareApologyGenerator:
    """
    Differentiates between casual and serious interruptions.

    Casual:  "Sorry, please go ahead"
    Serious: "I didn't mean to cut you off, please continue"
    Urgent:  a very short hand-over when the user is highly aroused

    Seriousness is determined from:
      - How far into the utterance the interrupt occurred (early = more serious)
      - User's arousal (high = more serious)
      - Frequency of interrupts in the last minute (repeated = more serious)
    """

    CASUAL_RESPONSES: List[str] = [
        "Sorry, please go ahead.",
        "Go right ahead.",
        "After you.",
        "Of course — go on.",
    ]

    SERIOUS_RESPONSES: List[str] = [
        "I'm sorry, I didn't mean to cut you off. Please continue.",
        "My apologies — please, go ahead, I'm listening.",
        "I'm sorry, were you saying something? Please, continue.",
        "Forgive me — I didn't mean to interrupt. What were you saying?",
    ]

    URGENT_RESPONSES: List[str] = [
        "Of course, go ahead.",
        "Please, go on.",
        "I'm listening.",
    ]

    # Minimum spacing between two spoken apologies, in seconds.
    COOLDOWN_S: float = 15.0

    def __init__(self):
        self._last_response_time: float = 0.0
        # Timestamps of the ten most recent generate() calls.
        self._interrupt_history: Deque[float] = deque(maxlen=10)

    def generate(self, nima_text_progress: float, user_arousal: float = 0.3,
                 interrupt_count: int = 0) -> str:
        """Generate a context-appropriate apology.

        Every call is recorded as an interrupt and restarts the cooldown
        checked by ``should_respond``.

        Args:
            nima_text_progress: fraction of the utterance already spoken;
                below 0.2 counts as serious.
            user_arousal: above 0.8 selects an urgent response, above 0.7 a
                serious one.
            interrupt_count: accepted for interface compatibility; repeat
                interrupts are tracked internally (more than two within 60
                seconds counts as serious).

        Returns:
            A randomly chosen phrase from the urgent, serious or casual pool.
        """
        now = time.time()
        self._interrupt_history.append(now)
        recent = sum(1 for t in self._interrupt_history if now - t < 60.0)
        self._last_response_time = now

        # Urgency wins over seriousness: an agitated user gets the floor
        # back with as few words as possible.
        if user_arousal > 0.8:
            pool = self.URGENT_RESPONSES
        elif nima_text_progress < 0.2 or user_arousal > 0.7 or recent > 2:
            pool = self.SERIOUS_RESPONSES
        else:
            pool = self.CASUAL_RESPONSES
        return random.choice(pool)

    def should_respond(self, interrupt_type: "InterruptType") -> bool:
        """Check if an apology should be emitted.

        Returns True only for a real interrupt that arrives at least
        ``COOLDOWN_S`` seconds after the previous apology.
        """
        if interrupt_type != InterruptType.REAL_INTERRUPT:
            return False
        return time.time() - self._last_response_time >= self.COOLDOWN_S
|
|
|
|
class NonBlockingContinuationManager:
    """
    Keeps the voice stream flowing even after acknowledging an interrupt.
    Instead of stopping entirely, the system:
      1. Pauses briefly (200ms)
      2. Speaks the apology ("Sorry, please go ahead")
      3. Yields the floor but remains ready to resume

    This class tracks step 3 only: where the interrupted utterance stopped
    and when it may be picked up again.
    """

    PAUSE_BEFORE_APOLOGY_S: float = 0.2
    # Minimum time since yielding before a resume is offered, in seconds.
    RESUME_THRESHOLD_S: float = 1.5

    def __init__(self):
        self._is_paused: bool = False
        self._pause_start: float = 0.0
        self._deferred_text: str = ""
        self._deferred_position: int = 0

    def yield_floor(self, deferred_text: str, position: int):
        """Yield the floor but remember where to resume from.

        Args:
            deferred_text: the full text of the interrupted utterance.
            position: character index into ``deferred_text`` at which
                speech stopped.
        """
        self._is_paused = True
        self._pause_start = time.time()
        self._deferred_text = deferred_text
        self._deferred_position = position

    def should_resume(self, user_speaking: bool) -> bool:
        """Check if the system should resume its deferred utterance.

        Returns True once, when the manager is paused, the user is not
        speaking and more than ``RESUME_THRESHOLD_S`` seconds have passed
        since the floor was yielded. Returning True also clears the paused
        state.
        """
        if not self._is_paused or user_speaking:
            return False
        elapsed = time.time() - self._pause_start
        if elapsed > self.RESUME_THRESHOLD_S:
            self._is_paused = False
            return True
        return False

    def get_resume_text(self) -> Optional[str]:
        """Get the text to resume (from the deferred position).

        Returns:
            "As I was saying, ..." followed by the unspoken remainder, or
            None when nothing (or only whitespace) is left to say.
        """
        if not self._deferred_text:
            return None
        remaining = self._deferred_text[self._deferred_position:].lstrip()
        if not remaining:
            return None
        return f"As I was saying, {self._soften_leading_capital(remaining)}"

    @staticmethod
    def _soften_leading_capital(text: str) -> str:
        """Lower-case only the first letter so ``text`` reads as a continuation.

        The rest of the text keeps its casing (names, acronyms, "I"). The
        first word is left alone when it is the pronoun "I" (or a
        contraction of it) or looks like an acronym, i.e. its second
        character is upper-case.
        """
        first_word = text.split(None, 1)[0]
        bare = first_word.rstrip(".,!?;:")
        keep_capital = (
            bare == "I"
            or bare.startswith(("I'", "I\u2019"))
            or (len(first_word) > 1 and first_word[1].isupper())
        )
        if keep_capital:
            return text
        return text[0].lower() + text[1:]

    @property
    def is_paused(self) -> bool:
        """True while the floor is yielded and no resume has been granted."""
        return self._is_paused
|
|
|
|
| |
| |
| |
|
|
class OmniVoiceEngine:
    """
    The main OmniVoice engine. Orchestrates ASR, TTS, non-verbal synthesis,
    backchannel emission, and interrupt handling into a unified real-time
    voice conversation system.

    Usage:
        engine = OmniVoiceEngine()
        async for audio_chunk in engine.stream("Hello, how are you?"):
            play(audio_chunk)
    """

    # Buffered user audio is handed to the interrupt detector as 16 kHz.
    _USER_SAMPLE_RATE = 16000
    # Rolling window of user audio kept for interrupt detection, in seconds.
    _USER_BUFFER_S = 2
    # Fewer samples than this (0.1 s at 16 kHz) are not worth classifying.
    _MIN_CLASSIFY_SAMPLES = 1600
    # Length of each yielded output chunk, in seconds.
    _CHUNK_S = 0.05

    def __init__(self,
                 whisper_model: str = "base",
                 coqui_model: str = "tts_models/multilingual/multi-dataset/xtts_v2",
                 speaker_wav: Optional[str] = None,
                 language: str = "en",
                 sample_rate: int = 22050,
                 palace: Any = None):
        """Build the engine and all of its sub-modules.

        Args:
            whisper_model: model name passed to ``WhisperASR``.
            coqui_model: model name passed to ``CoquiXTTSBackend``.
            speaker_wav: optional reference recording passed to the TTS
                backend.
            language: language code passed to the TTS backend.
            sample_rate: output sample rate in Hz, shared by the synthesis
                modules.
            palace: optional memory palace handed to
                ``VoiceEventMemoryBridge``.
        """
        logger.info("[OmniVoice] initializing v%s...", OMNIVOICE_VERSION)

        self.sample_rate = sample_rate

        # Core speech pipeline
        self.asr = WhisperASR(model_name=whisper_model)
        self.tts = CoquiXTTSBackend(model_name=coqui_model, speaker_wav=speaker_wav,
                                    language=language)
        self.nonverbal = ProceduralNonVerbalSynth(sample_rate=sample_rate)
        self.backchannel = BackchannelController(self.tts, self.nonverbal, sample_rate)
        self.interrupt_detector = InterruptDetector(self.asr)
        self.interrupt_response = InterruptionResponse()

        # v2.0.0 "mind through voice" modules
        self.prosody_shaper = AdaptiveProsodyShaper()
        self.micro_intonation = MicroIntonationInjector(sample_rate)
        self.turn_predictor = TurnTakingPredictor()
        self.affective_mirror = AffectiveMirror()
        self.somatic_integrator = SomaticFeedbackIntegrator()
        self.empathy_generator = EmpathyPhraseGenerator()
        self.voice_memory = VoiceEventMemoryBridge(palace=palace)
        self.narrative_engine = NarrativeContinuityEngine(self.voice_memory)
        self.singing = SingingInterjectionModule(sample_rate)
        self.multimodal = MultimodalCueEmitter()
        self.dynamic_laughter = DynamicLaughterSynth(sample_rate)
        self.apology_generator = ContextAwareApologyGenerator()
        self.continuation_manager = NonBlockingContinuationManager()

        # Conversation state and user-audio buffering
        self.state = ConversationState()
        self._nima_audio_queue: Deque[np.ndarray] = deque()
        self._user_audio_buffer: List[np.ndarray] = []
        self._lock = threading.Lock()

        logger.info("[OmniVoice] ready (ASR=%s, TTS=%s)",
                    self.asr.mode.value, self.tts.mode.value)

    def update_prosody_from_nima(self, snapshot: Any) -> "ProsodyParams":
        """
        Derive prosody parameters from a NIMA ConsciousnessSnapshot.

        Starts from default ``ProsodyParams`` and overrides, for each
        snapshot component that is present and truthy:
          - ``phi``: energy from ``phi_composite`` (clamped to [0.3, 1.0])
          - ``rho``: warmth from ``integrity`` (clamped to [0.2, 1.0])
          - ``emotion``: base pitch from arousal, tone from the label and
            pitch variance from valence (narrow when clearly negative, wide
            when clearly positive)
          - ``qualia``: breathiness falls as ``authenticity_index`` rises
            (never below 0.05)

        A None snapshot yields the defaults. Any error while reading the
        snapshot is logged and the partially updated params are returned.
        """
        prosody = ProsodyParams()
        if snapshot is None:
            return prosody
        try:
            phi = getattr(snapshot, "phi", None)
            if phi:
                prosody.energy = float(max(0.3, min(1.0, 0.5 + phi.phi_composite * 0.5)))

            rho = getattr(snapshot, "rho", None)
            if rho:
                prosody.warmth = float(max(0.2, min(1.0, rho.integrity)))

            emotion = getattr(snapshot, "emotion", None)
            if emotion:
                prosody.base_pitch_hz = 180.0 + (emotion.arousal - 0.3) * 60.0
                prosody.emotional_tone = getattr(emotion, "label", "neutral")
                if emotion.valence < -0.3:
                    prosody.pitch_variance = 0.08
                elif emotion.valence > 0.3:
                    prosody.pitch_variance = 0.25

            qualia = getattr(snapshot, "qualia", None)
            if qualia:
                prosody.breathiness = float(max(0.05, 0.3 - qualia.authenticity_index * 0.25))
        except Exception as e:
            logger.warning("[OmniVoice] NIMA snapshot mapping failed: %s", e)
        return prosody

    async def stream(self, text: str,
                     prosody: Optional["ProsodyParams"] = None,
                     user_audio_stream: Optional[AsyncGenerator[np.ndarray, None]] = None,
                     ) -> AsyncGenerator[np.ndarray, None]:
        """
        Stream synthesized speech for `text`, yielding audio chunks.
        If `user_audio_stream` is provided, simultaneously monitors for
        interrupts; on a real interrupt the current chunk and a spoken
        apology are yielded, the phase becomes YIELDING and the stream ends.

        Args:
            text: text to synthesize
            prosody: prosody parameters (if None, uses defaults)
            user_audio_stream: async generator of user audio frames
                (for real-time interrupt detection)

        Yields:
            Audio chunks (float32 numpy arrays at self.sample_rate Hz).
        """
        prosody = prosody or ProsodyParams()
        self.state.phase = ConversationPhase.NIMA_SPEAKING
        self.state.nima_speech_start = time.time()
        self.state.current_text = text
        self.state.current_text_position = 0.0

        # NOTE: synthesis is a blocking call; nothing else on the event
        # loop runs until it returns.
        full_audio = self.tts.synthesize(text, prosody)
        total_samples = len(full_audio)
        if total_samples == 0:
            self.state.phase = ConversationPhase.IDLE
            return

        chunk_size = max(1, int(self.sample_rate * self._CHUNK_S))
        # Ceiling division: a trailing partial chunk counts as a chunk, so
        # progress reaches 1.0 on the last chunk rather than one early.
        total_chunks = -(-total_samples // chunk_size)

        # Simple path: nobody to listen to, just play the audio out.
        if user_audio_stream is None:
            for offset in range(0, total_samples, chunk_size):
                self.state.current_text_position = min(
                    1.0, (offset + chunk_size) / total_samples)
                yield full_audio[offset:offset + chunk_size]
            self.state.phase = ConversationPhase.IDLE
            return

        # Monitored path. Drop frames left over from a previous call so old
        # speech is not mistaken for an interruption of this utterance.
        with self._lock:
            self._user_audio_buffer.clear()
        user_audio_task = asyncio.create_task(self._collect_user_audio(user_audio_stream))
        try:
            offsets = range(0, total_samples, chunk_size)
            for chunk_index, offset in enumerate(offsets, start=1):
                chunk = full_audio[offset:offset + chunk_size]
                self.state.current_text_position = min(1.0, chunk_index / total_chunks)
                self.state.nima_speech_duration = time.time() - self.state.nima_speech_start

                # Yielding a value from an async generator does not by
                # itself give the event loop a turn; without this the
                # collector task can starve when the consumer never awaits.
                await asyncio.sleep(0)

                remaining_s = (total_samples - offset) / self.sample_rate
                interrupt = self._check_for_interrupt(remaining_s)
                if interrupt and self.interrupt_response.should_respond(interrupt):
                    response_text = self.interrupt_response.generate_response(
                        interrupt, self.state.current_text_position,
                        self.state.user_emotion_arousal,
                    )
                    response_audio = self.tts.synthesize(response_text, ProsodyParams(
                        base_pitch_hz=200, energy=0.6, warmth=0.8,
                    ))
                    # Finish the chunk in flight, then speak the apology.
                    yield chunk
                    for j in range(0, len(response_audio), chunk_size):
                        yield response_audio[j:j + chunk_size]
                    self.state.phase = ConversationPhase.YIELDING
                    self.state.interrupt_count += 1
                    logger.info("[OmniVoice] interrupted at %.0f%%: '%s'",
                                self.state.current_text_position * 100, response_text)
                    return

                yield chunk

            self.state.phase = ConversationPhase.IDLE
        finally:
            # Always stop the collector, including when the consumer
            # abandons the generator early.
            user_audio_task.cancel()
            try:
                await user_audio_task
            except asyncio.CancelledError:
                pass

    async def _collect_user_audio(self, stream: AsyncGenerator[np.ndarray, None]):
        """Background task: collect user audio for interrupt detection.

        Frames are appended to the shared buffer; the oldest frames are
        dropped whenever more than two seconds' worth of samples is held.
        """
        max_samples = self._USER_SAMPLE_RATE * self._USER_BUFFER_S
        try:
            async for frame in stream:
                with self._lock:
                    self._user_audio_buffer.append(frame)
                    total = sum(len(f) for f in self._user_audio_buffer)
                    while total > max_samples and self._user_audio_buffer:
                        removed = self._user_audio_buffer.pop(0)
                        total -= len(removed)
        except asyncio.CancelledError:
            pass

    def _check_for_interrupt(self, remaining_s: float) -> Optional["InterruptClassification"]:
        """Check if there's an interrupt in the buffered user audio.

        The five most recent frames are classified; if those are too short
        the window widens to everything buffered. When even that is below
        the minimum the frames are kept for the next check, because
        discarding them would throw user speech away every time this is
        polled faster than audio arrives. The buffer is cleared only once
        audio has actually been taken for classification.

        Returns:
            The classification when it is a real interrupt, else None.
        """
        with self._lock:
            if not self._user_audio_buffer:
                return None
            audio = np.concatenate(self._user_audio_buffer[-5:])
            if len(audio) < self._MIN_CLASSIFY_SAMPLES:
                audio = np.concatenate(self._user_audio_buffer)
                if len(audio) < self._MIN_CLASSIFY_SAMPLES:
                    return None
            self._user_audio_buffer.clear()

        classification = self.interrupt_detector.classify(
            audio, sample_rate=self._USER_SAMPLE_RATE,
            nima_text_progress=self.state.current_text_position,
            nima_speech_remaining_s=remaining_s,
        )
        if classification.interrupt_type == InterruptType.REAL_INTERRUPT:
            return classification

        # Anything else (backchannel, non-verbal, ...) is deliberately
        # ignored; log it unless it was plain silence.
        if classification.interrupt_type != InterruptType.SILENCE:
            logger.debug("[OmniVoice] ignored %s: %s",
                         classification.interrupt_type.value, classification.reason)
        return None

    def emit_backchannel(self, user_audio: np.ndarray) -> Optional["BackchannelEvent"]:
        """
        Check if a backchannel should be emitted while the user is speaking.
        Call this with recent user audio frames.

        Returns a BackchannelEvent if one should fire, else None.
        """
        return self.backchannel.should_backchannel(self.state, user_audio)

    def synth_non_verbal(self, expr_type: "NonVerbalType", intensity: float = 0.7) -> np.ndarray:
        """Synthesize a non-verbal expression directly."""
        return self.nonverbal.synth(expr_type, intensity)

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot of engine configuration, conversation state and module status."""
        return {
            "version": OMNIVOICE_VERSION,
            "asr_mode": self.asr.mode.value,
            "tts_mode": self.tts.mode.value,
            "sample_rate": self.sample_rate,
            "conversation_state": {
                "phase": self.state.phase.value,
                "interrupt_count": self.state.interrupt_count,
                "backchannel_count": self.state.backchannel_count,
            },
            "v2_modules": {
                "prosody_shaper": "active",
                "micro_intonation": "active",
                "turn_predictor": "active",
                "affective_mirror": "active",
                "somatic_integrator": {
                    "strain": self.somatic_integrator.strain,
                    "energy": self.somatic_integrator.energy,
                },
                "empathy_generator": "active",
                "voice_memory": self.voice_memory.get_stats(),
                "narrative_engine": "active",
                "singing_interjections": "active",
                # Number of cues currently held in the emitter's history.
                "multimodal_cues": len(self.multimodal.get_recent_cues(1000)),
                "dynamic_laughter": "active",
                "apology_generator": "active",
                "continuation_manager": {
                    "is_paused": self.continuation_manager.is_paused,
                },
            },
        }
|
|
|
|
| |
| |
| |
|
|
| class NimaVoiceAdapter: |
| """ |
| Bridges NIMA's ConsciousnessSnapshot → OmniVoice prosody params. |
| Also bridges NIMA's CTM tournament + MemoryPalace episodes → voice context. |
| |
| v2.0.0: Now integrates ALL "mind through voice" modules: |
| - AdaptiveProsodyShaper (emotion → prosody dynamics) |
| - AffectiveMirror (mirrors user's emotional tone) |
| - SomaticFeedbackIntegrator (strain → voice fatigue) |
| - VoiceEventMemoryBridge (stores voice events in MemPalace) |
| - NarrativeContinuityEngine (references past conversations) |
| |
| Usage: |
| adapter = NimaVoiceAdapter(engine) |
| prosody = adapter.snapshot_to_prosody(nima_snapshot) |
| async for chunk in engine.stream(text, prosody=prosody): |
| ... |
| |
| Full NIMA + CTM + MemPalace integration: |
| # After NIMA's process_stimulus(): |
| adapter.update_from_snapshot(snapshot) |
| adapter.update_from_ctm_winner(ctm_winner) |
| adapter.update_somatic_from_nima(snapshot.phi, snapshot.rho) |
| prosody = adapter.get_contextual_prosody() |
| # After speaking: |
| adapter.store_voice_event(text, prosody, duration_s) |
| """ |
|
|
| def __init__(self, engine: OmniVoiceEngine): |
| self._engine = engine |
| self._last_snapshot: Any = None |
| self._last_ctm_winner: Optional[Dict[str, Any]] = None |
| self._last_episode_context: Optional[Dict[str, Any]] = None |
| self._user_emotion: str = "neutral" |
| self._user_valence: float = 0.0 |
| self._user_arousal: float = 0.3 |
|
|
def update_from_snapshot(self, snapshot: Any) -> "ProsodyParams":
    """Remember ``snapshot`` and return the prosody the engine derives from it.

    When the snapshot carries a truthy ``emotion``, its valence, arousal
    and label are cached as the user's affect (missing attributes fall back
    to 0.0, 0.3 and "neutral"). Otherwise the cached affect is left as is.
    """
    self._last_snapshot = snapshot

    emotion = getattr(snapshot, "emotion", None) if snapshot else None
    if emotion:
        self._user_valence = float(getattr(emotion, "valence", 0.0))
        self._user_arousal = float(getattr(emotion, "arousal", 0.3))
        self._user_emotion = getattr(emotion, "label", "neutral")

    engine = self._engine
    return engine.update_prosody_from_nima(snapshot)
|
|
def update_from_ctm_winner(self, ctm_winner: Optional[Dict[str, Any]]) -> None:
    """
    Remember the CTM tournament winner as voice context.

    The winning processor's character is meant to colour the voice style:
      - memory_palace    -> warmer, more nostalgic
      - somatic_registry -> more emotionally resonant
      - wernicke         -> clearer, more articulate
      - broca            -> faster, more fluent

    This method only stores the winner (None clears it) and logs its
    ``processor_name`` and ``score`` at debug level.
    """
    self._last_ctm_winner = ctm_winner
    if ctm_winner is None:
        return

    winner_name = ctm_winner.get("processor_name", "?")
    winner_score = ctm_winner.get("score", 0.0)
    logger.debug("[NimaVoiceAdapter] CTM winner: %s (score=%.3f)",
                 winner_name, winner_score)
|
|
def update_somatic_from_nima(self, phi: Any, rho: Any) -> None:
    """Forward NIMA's phi/rho readings to the engine's somatic integrator.

    Strain is taken from ``phi.phenomenological_strain`` and the allostatic
    value from ``rho.dissonance``. Either one falls back to 0.0 when its
    source object is falsy or lacks the attribute. Both are coerced to
    ``float`` before being handed to ``somatic_integrator.update_from_nima``.
    """
    strain = (
        float(phi.phenomenological_strain)
        if phi and hasattr(phi, "phenomenological_strain")
        else 0.0
    )
    allostatic = (
        float(rho.dissonance)
        if rho and hasattr(rho, "dissonance")
        else 0.0
    )
    self._engine.somatic_integrator.update_from_nima(strain, allostatic)
|
|
def update_from_episode(self, episode: Optional[Dict[str, Any]]) -> None:
    """Cache a MemoryPalace episode as context for later prosody shaping.

    A stored episode with high strain or negative valence is later
    reflected in the voice by ``get_contextual_prosody`` (lower pitch,
    more breathiness). Passing ``None`` clears the stored episode.
    Nothing is applied to the engine here beyond a debug log line.
    """
    self._last_episode_context = episode
    if episode is None:
        return

    valence = episode.get("valence", 0.0)
    logger.debug("[NimaVoiceAdapter] episode context updated: valence=%.2f",
                 valence)
|
|
def get_contextual_prosody(self) -> ProsodyParams:
    """Build prosody params from every cached context source.

    This is the full v2.0.0 integration. The stages run in this order,
    each receiving the previous stage's output:

      1. Baseline from ``engine.update_prosody_from_nima`` using the last
         cached snapshot.
      2. Affective mirroring of the stored user valence/arousal.
      3. Adaptive shaping, with empathy raised to 0.8 when user valence is
         below -0.3 (0.5 otherwise). The stored user emotion label is
         used unless it is "neutral", in which case the mirror's emotion
         is used instead.
      4. Somatic modulation from the engine's somatic integrator.
      5. CTM-winner styling keyed on ``processor_name``.
      6. Episode context: high strain lowers pitch and adds breathiness;
         negative valence adds warmth and slows the rate.

    Stages 5 and 6 mutate the prosody object in place.

    Returns:
        The prosody object produced by the final stage.
    """
    engine = self._engine

    # 1. Baseline prosody derived from the last NIMA snapshot.
    prosody = engine.update_prosody_from_nima(self._last_snapshot)

    # 2. Mirror the user's emotional tone.
    prosody, mirror_emotion = engine.affective_mirror.mirror(
        self._user_valence, self._user_arousal, prosody
    )

    # 3. Adaptive shaping; a clearly negative user gets more empathy.
    empathy_level = 0.8 if self._user_valence < -0.3 else 0.5
    emotion_for_shaping = (
        self._user_emotion if self._user_emotion != "neutral" else mirror_emotion
    )
    prosody = engine.prosody_shaper.shape(
        prosody, emotion=emotion_for_shaping,
        valence=self._user_valence, arousal=self._user_arousal,
        empathy_level=empathy_level,
    )

    # 4. Somatic (strain / fatigue) modulation.
    prosody = engine.somatic_integrator.apply_somatic_modulation(prosody)

    # 5. Style nudges from the winning CTM processor.
    if self._last_ctm_winner:
        processor = self._last_ctm_winner.get("processor_name", "")
        if processor == "memory_palace":
            prosody.warmth = float(min(1.0, prosody.warmth + 0.10))
            prosody.speech_rate_wpm *= 0.95
        elif processor == "somatic_registry":
            prosody.breathiness = float(min(0.3, prosody.breathiness + 0.05))
            prosody.pitch_variance = float(min(0.35, prosody.pitch_variance + 0.05))
        elif processor == "wernicke":
            prosody.speech_rate_wpm *= 1.05
        elif processor == "broca":
            prosody.speech_rate_wpm *= 1.08

    # 6. Episode context.
    if self._last_episode_context:
        ep = self._last_episode_context
        # A key that is present but None is treated like a missing key;
        # comparing None with a float would raise TypeError.
        # NOTE(review): strain is read from the "score" key — confirm this
        # matches the episode schema.
        raw_strain = ep.get("score")
        strain = 0.0 if raw_strain is None else raw_strain
        raw_valence = ep.get("valence")
        valence = 0.0 if raw_valence is None else raw_valence

        if strain > 0.5:
            prosody.base_pitch_hz -= 10.0
            prosody.breathiness = float(min(0.4, prosody.breathiness + 0.05))
        if valence < -0.3:
            prosody.warmth = float(min(1.0, prosody.warmth + 0.1))
            prosody.speech_rate_wpm -= 10.0

    return prosody
|
|
def store_voice_event(self, text: str, prosody: ProsodyParams,
                      duration_s: float, speaker: str = "nima") -> str:
    """Record one utterance as a voice event in the engine's voice memory.

    Intended to be called after each utterance so episodic voice memory
    accumulates. The event carries a five-field prosody snapshot, the
    prosody's emotional tone, the adapter's stored user valence/arousal,
    the somatic integrator's current strain, and the engine's current
    conversation phase and interrupt/backchannel counters.

    Returns:
        Whatever ``engine.voice_memory.store_voice_event`` returns.
    """
    engine = self._engine

    prosody_snapshot = {
        "pitch_hz": prosody.base_pitch_hz,
        "rate_wpm": prosody.speech_rate_wpm,
        "energy": prosody.energy,
        "warmth": prosody.warmth,
        "breathiness": prosody.breathiness,
    }
    tone = prosody.emotional_tone
    strain = engine.somatic_integrator.strain
    conversation = engine.state

    event = VoiceEvent(
        speaker=speaker,
        text=text,
        audio_duration_s=duration_s,
        prosody_snapshot=prosody_snapshot,
        emotion=tone,
        valence=self._user_valence,
        arousal=self._user_arousal,
        strain=strain,
        conversation_phase=conversation.phase.value,
        interrupt_count=conversation.interrupt_count,
        backchannel_count=conversation.backchannel_count,
    )
    return engine.voice_memory.store_voice_event(event)
|
|
def get_narrative_continuity(self, current_topic: str = "") -> Optional[str]:
    """Ask the narrative engine for a phrase that refers back to a past voice event.

    Args:
        current_topic: Topic hint passed through unchanged to the engine's
            ``narrative_engine.generate_continuity_phrase``.

    Returns:
        The generated phrase, or ``None`` when no suitable past event exists.
    """
    narrative_engine = self._engine.narrative_engine
    phrase = narrative_engine.generate_continuity_phrase(current_topic)
    return phrase
|
|
def get_empathy_phrase(self) -> str:
    """Return an empathy phrase for the adapter's currently stored user state.

    Delegates to ``engine.empathy_generator.generate`` with the stored
    emotion label, valence and arousal, in that positional order.
    """
    emotion = self._user_emotion
    valence = self._user_valence
    arousal = self._user_arousal
    return self._engine.empathy_generator.generate(emotion, valence, arousal)
|
|
|
|
| |
| |
| |
|
|
def save_wav(audio: np.ndarray, path: str, sample_rate: int = 22050) -> str:
    """Write *audio* to *path* as a mono 16-bit PCM WAV file.

    Samples are multiplied by 32767, clipped to the int16 range and cast to
    int16 before being written.

    Args:
        audio: Sample array to write.
        path: Destination file path.
        sample_rate: Frame rate stored in the WAV header.

    Returns:
        *path*, unchanged.
    """
    scaled = audio * 32767
    pcm = np.clip(scaled, -32768, 32767).astype(np.int16)
    payload = pcm.tobytes()

    with wave.open(path, "wb") as wav_out:
        # (nchannels, sampwidth, framerate, nframes, comptype, compname)
        wav_out.setparams((1, 2, sample_rate, 0, "NONE", "not compressed"))
        wav_out.writeframes(payload)
    return path
|
|
|
|
def load_wav(path: str) -> Tuple[np.ndarray, int]:
    """Load a PCM WAV file into a float32 numpy array.

    Supports 8-bit unsigned and 16-, 24- and 32-bit signed little-endian
    PCM. Samples are scaled by the format's full-scale value (128, 32768,
    8388608 or 2147483648 respectively). For multi-channel files only the
    first channel is returned; the channels are not averaged.

    Args:
        path: Path of the WAV file to read.

    Returns:
        ``(audio, sample_rate)`` where *audio* is a 1-D float32 array and
        *sample_rate* is the frame rate from the WAV header.

    Raises:
        ValueError: If the file's sample width is not 1, 2, 3 or 4 bytes.
    """
    with wave.open(path, "rb") as wf:
        n_channels = wf.getnchannels()
        sampwidth = wf.getsampwidth()
        sample_rate = wf.getframerate()
        frames = wf.readframes(wf.getnframes())

    # WAV sample data is little-endian; explicit "<" dtypes keep decoding
    # independent of the host byte order.
    if sampwidth == 1:
        # 8-bit WAV is unsigned with the zero level at 128.
        audio = (np.frombuffer(frames, dtype=np.uint8).astype(np.float32) - 128) / 128.0
    elif sampwidth == 2:
        audio = np.frombuffer(frames, dtype="<i2").astype(np.float32) / 32768.0
    elif sampwidth == 3:
        # No native 24-bit dtype: assemble each sample from its three bytes,
        # then sign-extend bit 23.
        octets = np.frombuffer(frames, dtype=np.uint8).reshape(-1, 3).astype(np.int32)
        packed = octets[:, 0] | (octets[:, 1] << 8) | (octets[:, 2] << 16)
        packed = (packed ^ 0x800000) - 0x800000
        audio = (packed / 8388608.0).astype(np.float32)
    elif sampwidth == 4:
        audio = (np.frombuffer(frames, dtype="<i4") / 2147483648.0).astype(np.float32)
    else:
        raise ValueError(f"Unsupported sample width: {sampwidth}")

    if n_channels > 1:
        # Interleaved frames: a stride of n_channels picks channel 0.
        audio = audio[::n_channels]
    return audio, sample_rate
|
|
|
|
async def demo(output_dir: str = "/home/z/my-project/download"):
    """Run the OmniVoice Engine smoke-test demo.

    Builds a default ``OmniVoiceEngine`` and exercises, in order: basic
    speech synthesis, non-verbal expressions, streaming synthesis,
    interrupt classification, interruption responses and the NIMA voice
    adapter. Progress is printed to stdout and three WAV files are written
    into *output_dir*.

    Args:
        output_dir: Directory for the generated WAV files. It is created
            if it does not exist. The default is the location the demo
            has always written to.
    """
    import os  # local import keeps this block self-contained

    os.makedirs(output_dir, exist_ok=True)

    def _out(filename: str) -> str:
        """Return the full path of *filename* inside the output directory."""
        return os.path.join(output_dir, filename)

    print("\n" + "=" * 70)
    print(f" OmniVoice Engine v{OMNIVOICE_VERSION} — Demo")
    print("=" * 70 + "\n")

    engine = OmniVoiceEngine()
    print(f"ASR mode: {engine.asr.mode.value}")
    print(f"TTS mode: {engine.tts.mode.value}")
    print()

    # Test 1: basic speech synthesis.
    print("[Test 1] Basic speech synthesis...")
    prosody = ProsodyParams(base_pitch_hz=180, energy=0.8, warmth=0.7)
    audio = engine.tts.synthesize("Hello, I am OmniVoice. Nice to meet you.", prosody)
    print(f" Audio: {len(audio)} samples, {len(audio)/engine.sample_rate:.2f}s")
    save_wav(audio, _out("omnivoice_test1_speech.wav"), engine.sample_rate)
    print(" Saved: omnivoice_test1_speech.wav")
    print()

    # Test 2: procedural non-verbal expressions.
    print("[Test 2] Non-verbal expressions...")
    for expr in [NonVerbalType.LAUGHTER, NonVerbalType.SIGH, NonVerbalType.GASP,
                 NonVerbalType.GROAN, NonVerbalType.AWW, NonVerbalType.MM]:
        audio = engine.synth_non_verbal(expr, intensity=0.7)
        print(f" {expr.value:12s}: {len(audio)} samples, {len(audio)/engine.sample_rate:.2f}s")

    laugh = engine.synth_non_verbal(NonVerbalType.LAUGHTER)
    save_wav(laugh, _out("omnivoice_test2_laughter.wav"), engine.sample_rate)
    print(" Saved: omnivoice_test2_laughter.wav")
    print()

    # Test 3: streaming synthesis, reassembled into one buffer.
    print("[Test 3] Streaming speech...")
    chunks = []
    async for chunk in engine.stream("This is a streaming test of the OmniVoice engine.", prosody=prosody):
        chunks.append(chunk)
    full = np.concatenate(chunks)
    print(f" Streamed {len(chunks)} chunks, total {len(full)} samples, {len(full)/engine.sample_rate:.2f}s")
    save_wav(full, _out("omnivoice_test3_stream.wav"), engine.sample_rate)
    print(" Saved: omnivoice_test3_stream.wav")
    print()

    # Test 4: interrupt classification on synthesized non-verbal clips.
    # NOTE(review): clips are trimmed and classified as 16 kHz audio even
    # though they are synthesized by the engine — confirm the engine's
    # sample rate matches, or resample.
    print("[Test 4] Interrupt classification...")
    test_cases = [
        ("Backchannel 'yeah'", engine.synth_non_verbal(NonVerbalType.MM, 0.3)[:int(16000*0.4)]),
        ("Laughter", engine.synth_non_verbal(NonVerbalType.LAUGHTER, 0.7)[:int(16000*0.8)]),
        ("Sigh", engine.synth_non_verbal(NonVerbalType.SIGH, 0.6)[:int(16000*0.5)]),
    ]
    for name, audio in test_cases:
        cls = engine.interrupt_detector.classify(audio, sample_rate=16000)
        print(f" {name:25s} → {cls.interrupt_type.value} (conf={cls.confidence:.2f}, reason='{cls.reason}')")
    print()

    # Test 5: interruption responses at different points in the utterance.
    print("[Test 5] Interruption responses...")
    for progress in [0.1, 0.5, 0.9]:
        fake_interrupt = InterruptClassification(
            interrupt_type=InterruptType.REAL_INTERRUPT,
            confidence=0.8,
            duration_s=1.5,
        )
        response = engine.interrupt_response.generate_response(
            fake_interrupt, nima_text_progress=progress, user_arousal=0.4,
        )
        print(f" Progress {progress:.0%}: '{response}'")
    print()

    # Test 6: NIMA voice adapter, with and without episode context.
    print("[Test 6] NIMA voice adapter...")
    adapter = NimaVoiceAdapter(engine)
    prosody = adapter.get_contextual_prosody()
    print(f" Default prosody: pitch={prosody.base_pitch_hz:.0f}Hz, energy={prosody.energy:.2f}, warmth={prosody.warmth:.2f}")

    adapter.update_from_episode({"valence": -0.5, "score": 0.7, "processor_name": "somatic_registry"})
    prosody2 = adapter.get_contextual_prosody()
    print(f" With episode (val=-0.5, strain=0.7): pitch={prosody2.base_pitch_hz:.0f}Hz, "
          f"energy={prosody2.energy:.2f}, warmth={prosody2.warmth:.2f}, breath={prosody2.breathiness:.2f}")
    print()

    print("=" * 70)
    print(f" OmniVoice v{OMNIVOICE_VERSION} Demo Complete")
    print("=" * 70)
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: run the async demo to completion on a new event loop.
    asyncio.run(demo())
|
|