#!/usr/bin/env python3 """ OmniVoice Engine v2.0.0 — "Mind Through Voice" ================================================ A consciousness-aware real-time voice conversation engine that feels like a mind speaking through a voice, not a synthesizer. ARCHITECTURE: - Whisper ASR (local) for real speech-to-text + interrupt detection - Coqui XTTS for neural text-to-speech with voice cloning - Procedural numpy DSP for non-verbal expressions (laughter, sighs, gasps, etc.) - Smart interrupt awareness that ignores non-verbals, backchannels, and collaborative turn-sharing - Graceful interruption responses ("I'm sorry, were you saying something?") - NIMA-integrated adapter (reads ConsciousnessSnapshot to drive prosody) v2.0.0 NEW MODULES — the "mind through voice" layer: CONVERSATIONAL FLOW: - AdaptiveProsodyShaper: emotion → pitch/rhythm/timbre dynamics (softer when empathetic, brighter when excited) - MicroIntonationInjector: hesitations, breaths, emphasis shifts that signal thoughtfulness or uncertainty - TurnTakingPredictor: predicts when user will finish, smoothly takes the floor instead of waiting for silence EMOTIONAL & COGNITIVE GROUNDING: - AffectiveMirror: matches user's emotional tone (calm, energetic, concerned) with subtle vocal adjustments - SomaticFeedbackIntegrator: ties voice modulation to system strain or energy states (biological fatigue signals) - EmpathyPhraseGenerator: contextual empathy inserts ("That must feel tough") instead of generic nods MEMORY & CONTINUITY: - VoiceEventMemoryBridge: stores every utterance as an episodic voice event in MemPalace with affective tags - NarrativeContinuityEngine: references past conversations naturally ("As you mentioned yesterday, you sounded excited about...") EXPRESSIVE EXTENSIONS: - SingingInterjectionModule: short melodic phrases (humming, tonal affirmations) woven into speech - MultimodalCueEmitter: pairs voice with haptic/visual signals (soft vibration or light pulse when nodding) - DynamicLaughterSynth: 
adaptive laughter (chuckle → full laugh) scaled by intensity instead of fixed samples INTERRUPT HANDLING REFINEMENT: - ContextAwareApologyGenerator: casual vs serious apologies ("Sorry, please go ahead" vs "I didn't mean to cut you off") - NonBlockingContinuationManager: keeps voice flowing after acknowledging an interrupt, so it feels conversational Author: Norman de la Paz-Tabora """ from __future__ import annotations import asyncio import json import logging import math import os import random import struct import sys import threading import time import uuid import wave from collections import deque from dataclasses import dataclass, field from enum import Enum from typing import ( Any, AsyncGenerator, Callable, Deque, Dict, Generator, List, Optional, Tuple, Union, ) import numpy as np # ── Optional dependencies (all gracefully degrade) ── # ASR: try openai-whisper first, then faster-whisper try: import whisper WHISPER_AVAILABLE = True WHISPER_BACKEND = "openai-whisper" except ImportError: WHISPER_AVAILABLE = False whisper = None # type: ignore[assignment] if not WHISPER_AVAILABLE: try: from faster_whisper import WhisperModel WHISPER_AVAILABLE = True WHISPER_BACKEND = "faster-whisper" except ImportError: WhisperModel = None # type: ignore[assignment, misc] WHISPER_BACKEND = None # TTS: try coqui-tts try: from TTS.api import TTS as CoquiTTS COQUI_TTS_AVAILABLE = True except ImportError: try: from TTS.api import TTS as CoquiTTS # older package name COQUI_TTS_AVAILABLE = True except ImportError: COQUI_TTS_AVAILABLE = False CoquiTTS = None # type: ignore[assignment, misc] # ── Logging ── logger = logging.getLogger("OmniVoice") if not logger.handlers: _h = logging.StreamHandler(sys.stdout) _h.setFormatter(logging.Formatter( "%(asctime)s [%(levelname)s] %(name)s :: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", )) logger.addHandler(_h) logger.setLevel(logging.INFO) OMNIVOICE_VERSION = "2.0.0-MIND-THROUGH-VOICE" # 
═══════════════════════════════════════════════════════════════════════════ # SECTION 1 — Enums & Data Structures # ═══════════════════════════════════════════════════════════════════════════ class NonVerbalType(Enum): """Categories of non-verbal vocal expressions.""" LAUGHTER = "laughter" GIGGLE = "giggle" GASp = "gasp" GROAN = "groan" MOAN = "moan" SIGH = "sigh" CLUCK = "cluck" CLICK = "click" AWW = "aww" OH = "oh" MM = "mm" WOW = "wow" class ConversationPhase(Enum): """Which phase of the conversation we're in.""" IDLE = "idle" # No one speaking USER_SPEAKING = "user_speaking" # User has the floor NIMA_SPEAKING = "nima_speaking" # Nima has the floor OVERLAP = "overlap" # Both speaking (potential interrupt) YIELDING = "yielding" # Nima yielding the floor after interrupt class InterruptType(Enum): """Classification of detected user speech during Nima's turn.""" REAL_INTERRUPT = "real_interrupt" # User is taking the turn NON_VERBAL = "non_verbal" # Laughter, sigh, etc. — IGNORE BACKCHANNEL = "backchannel" # "yeah", "mm-hmm" — IGNORE COLLABORATIVE_TURN_SHARING = "collaborative" # User finishing sentence — IGNORE SILENCE = "silence" # No speech detected class BackchannelTrigger(Enum): """Why a backchannel was emitted.""" PERIODIC = "periodic" # Every N seconds of user speech ON_PAUSE = "on_pause" # User paused 0.3-0.8s mid-utterance ON_EMOTION_SHIFT = "emotion" # User's prosody shifted (arousal spike) class TTSMode(Enum): """Which TTS backend is active.""" COQUI_XTTS = "coqui_xtts" PROCEDURAL = "procedural" # Fallback class ASRMode(Enum): """Which ASR backend is active.""" WHISPER = "whisper" VAD_ONLY = "vad_only" # Fallback: detects speech but no transcription @dataclass class AudioFrame: """A chunk of audio with metadata.""" samples: np.ndarray sample_rate: int = 16000 timestamp: float = field(default_factory=time.time) is_speech: bool = False energy: float = 0.0 @property def duration(self) -> float: return len(self.samples) / self.sample_rate @dataclass class 
TranscriptSegment: """A transcribed segment of user speech.""" text: str start_time: float end_time: float confidence: float = 0.0 is_backchannel: bool = False is_non_verbal: bool = False @dataclass class BackchannelEvent: """A backchannel emission (verbal nod or non-verbal expression).""" trigger: BackchannelTrigger audio: np.ndarray sample_rate: int = 22050 is_verbal: bool = True # True = "mm-hmm", False = laughter/sigh label: str = "" # "mm-hmm", "laughter", "sigh", etc. timestamp: float = field(default_factory=time.time) @dataclass class InterruptClassification: """Result of classifying detected user speech during Nima's turn.""" interrupt_type: InterruptType confidence: float = 0.0 reason: str = "" transcript: str = "" duration_s: float = 0.0 spectral_features: Dict[str, float] = field(default_factory=dict) @dataclass class ProsodyParams: """Prosody parameters driven by consciousness state.""" base_pitch_hz: float = 180.0 speech_rate_wpm: float = 140.0 energy: float = 0.8 breathiness: float = 0.1 vibrato_depth: float = 0.0 warmth: float = 0.7 # 0 = cold/clinical, 1 = warm/intimate pitch_variance: float = 0.15 emotional_tone: str = "neutral" @dataclass class ConversationState: """Tracks the current state of the conversation.""" phase: ConversationPhase = ConversationPhase.IDLE user_speech_start: float = 0.0 user_speech_duration: float = 0.0 nima_speech_start: float = 0.0 nima_speech_duration: float = 0.0 last_backchannel_time: float = 0.0 last_user_pause_time: float = 0.0 user_emotion_arousal: float = 0.3 user_emotion_valence: float = 0.0 last_arousal_sample: float = 0.3 interrupt_count: int = 0 backchannel_count: int = 0 current_text: str = "" current_text_position: float = 0.0 # 0.0 = just started, 1.0 = finished # ═══════════════════════════════════════════════════════════════════════════ # SECTION 2 — Procedural Non-Verbal Synthesizer (numpy DSP) # ═══════════════════════════════════════════════════════════════════════════ class ProceduralNonVerbalSynth: 
""" Synthesizes non-verbal vocal expressions using pure numpy DSP. Each expression type has a hand-crafted signal model: LAUGHTER: Periodic bursts of filtered noise with 80-120ms "ha" cycles GIGGLE: Faster, higher-pitched laughter (160-200ms cycles, f0 upshift) GASp: Short (200ms) inverse-filtered impulse, sharp onset, quick decay GROAN: Low-pitched (80Hz) descending sawtooth, 600ms, with noise MOAN: Mid-pitched (140Hz) sustained sine with vibrato, 800ms SIGH: Downward-filtered noise, 500ms, lowpass sweep 2000→400Hz CLUCK: Short (60ms) plosive burst + click, dual impulse CLICK: Single impulse (20ms) with quick decay — "tsk" sound AWW: Low-pitched (150Hz) "aw" vowel, 400ms, with warmth OH: Mid-pitched (200Hz) "oh" vowel, 300ms MM: Humming (120Hz), nasal-filtered, 400ms WOW: Rising pitch (180→260Hz) "wow" vowel, 500ms """ SAMPLE_RATE: int = 22050 # ── Vowel formants for vowel-based expressions (aww, oh, mm, wow) ── VOWEL_FORMANTS: Dict[str, Dict] = { "aw": {"F1": 570, "F2": 840, "F3": 2410, "bw": [55, 75, 115]}, "oh": {"F1": 480, "F2": 760, "F3": 2300, "bw": [50, 70, 110]}, "mm": {"F1": 280, "F2": 900, "F3": 2200, "bw": [45, 65, 105]}, "ah": {"F1": 730, "F2": 1090, "F3": 2440, "bw": [60, 80, 120]}, } def __init__(self, sample_rate: int = 22050): self.sample_rate = sample_rate def synth(self, expr_type: NonVerbalType, intensity: float = 0.7, duration_override: Optional[float] = None) -> np.ndarray: """Synthesize a non-verbal expression. 
Returns float32 audio.""" intensity = float(max(0.1, min(1.0, intensity))) method = getattr(self, f"_synth_{expr_type.value}", None) if method is None: logger.warning("Unknown non-verbal type: %s, falling back to sigh", expr_type) method = self._synth_sigh audio = method(intensity, duration_override) # Normalize to target amplitude max_val = float(np.max(np.abs(audio))) if len(audio) > 0 else 0.0 if max_val > 0: audio = audio / max_val * 0.7 * intensity return audio.astype(np.float32) # ── Laughter: periodic "ha-ha-ha" bursts ── def _synth_laughter(self, intensity: float, dur: Optional[float]) -> np.ndarray: total_dur = dur or 1.2 ha_period = 0.10 # 100ms per "ha" n_has = int(total_dur / ha_period) chunks = [] for i in range(n_has): ha = self._gen_ha_burst(ha_period * 0.7, intensity, pitch=180 + random.uniform(-20, 20)) # Add inter-ha gap gap = np.zeros(int(self.sample_rate * ha_period * 0.3)) # Decay across the laughter decay = 1.0 - 0.3 * (i / max(1, n_has - 1)) chunks.append(ha * decay) chunks.append(gap) return np.concatenate(chunks) if chunks else np.zeros(0) def _gen_ha_burst(self, duration: float, intensity: float, pitch: float = 180) -> np.ndarray: """Generate a single 'ha' burst — voiced segment with fast onset/offset.""" n = int(self.sample_rate * duration) t = np.linspace(0, duration, n, dtype=np.float64) # Glottal source (sawtooth-like via harmonics) phase = 2.0 * np.pi * pitch * t source = np.sin(phase) for h in range(2, 5): source += (0.4 / h) * np.sin(phase * h) source /= 3.0 # Add breathy noise noise = np.random.normal(0, 0.3, n) mixed = source * 0.6 + noise * 0.4 # Bandpass filter around vowel region (rough "ah" formant) mixed = self._bandpass(mixed, 400, 3000) # Envelope: fast attack, fast decay (ha-ha character) env = np.ones(n) attack = min(int(0.01 * self.sample_rate), n // 4) release = min(int(0.04 * self.sample_rate), n // 4) if attack > 0: env[:attack] = np.linspace(0, 1, attack) if release > 0: env[-release:] = np.linspace(1, 0, release) 
return mixed * env * intensity # ── Giggle: faster, higher-pitched laughter ── def _synth_giggle(self, intensity: float, dur: Optional[float]) -> np.ndarray: total_dur = dur or 0.8 hee_period = 0.07 n_hees = int(total_dur / hee_period) chunks = [] for i in range(n_hees): hee = self._gen_ha_burst(hee_period * 0.6, intensity * 0.8, pitch=260 + random.uniform(-30, 30)) gap = np.zeros(int(self.sample_rate * hee_period * 0.4)) chunks.append(hee) chunks.append(gap) return np.concatenate(chunks) if chunks else np.zeros(0) # ── Gasp: short sharp intake ── def _synth_gasp(self, intensity: float, dur: Optional[float]) -> np.ndarray: duration = dur or 0.25 n = int(self.sample_rate * duration) t = np.linspace(0, duration, n, dtype=np.float64) # Sharp onset noise burst (intake) noise = np.random.normal(0, 1, n) # High-pass to make it breathy/sharp filtered = self._highpass(noise, 800) # Quick attack, exponential decay env = np.exp(-t * 15.0) attack = min(int(0.005 * self.sample_rate), n // 10) if attack > 0: env[:attack] = np.linspace(0, 1, attack) * env[:attack] / max(env[:attack].max(), 1e-6) # Add a faint glottal pulse pulse = 0.2 * np.sin(2 * np.pi * 200 * t) * np.exp(-t * 10) return (filtered * 0.7 + pulse * 0.3) * env * intensity # ── Groan: low-pitched descending ── def _synth_groan(self, intensity: float, dur: Optional[float]) -> np.ndarray: duration = dur or 0.6 n = int(self.sample_rate * duration) t = np.linspace(0, duration, n, dtype=np.float64) # Descending pitch 90 → 60 Hz f0 = 90.0 - 30.0 * (t / duration) phase = 2.0 * np.pi * np.cumsum(f0) / self.sample_rate source = np.sin(phase) for h in range(2, 4): source += (0.3 / h) * np.sin(phase * h) source /= 2.0 # Add low-frequency noise noise = np.random.normal(0, 0.2, n) mixed = source * 0.7 + noise * 0.3 mixed = self._lowpass(mixed, 600) # Envelope: slow attack, sustain, slow release env = np.ones(n) attack = min(int(0.08 * self.sample_rate), n // 4) release = min(int(0.15 * self.sample_rate), n // 4) if attack > 0: 
env[:attack] = np.linspace(0, 1, attack) if release > 0: env[-release:] = np.linspace(1, 0.3, release) return mixed * env * intensity # ── Moan: sustained mid-pitch with vibrato ── def _synth_moan(self, intensity: float, dur: Optional[float]) -> np.ndarray: duration = dur or 0.8 n = int(self.sample_rate * duration) t = np.linspace(0, duration, n, dtype=np.float64) f0 = 140.0 vibrato = 4.0 * np.sin(2 * np.pi * 5.5 * t) # 5.5 Hz vibrato phase = 2.0 * np.pi * np.cumsum(f0 + vibrato) / self.sample_rate source = np.sin(phase) for h in range(2, 5): source += (0.4 / h) * np.sin(phase * h) source /= 3.0 # Add breathiness noise = np.random.normal(0, 0.15, n) mixed = source * 0.85 + noise * 0.15 mixed = self._bandpass(mixed, 200, 2000) # Envelope: slow attack, sustain, slow release env = np.ones(n) attack = min(int(0.1 * self.sample_rate), n // 4) release = min(int(0.2 * self.sample_rate), n // 4) if attack > 0: env[:attack] = np.linspace(0, 1, attack) if release > 0: env[-release:] = np.linspace(1, 0.4, release) return mixed * env * intensity # ── Sigh: downward-filtered noise ── def _synth_sigh(self, intensity: float, dur: Optional[float]) -> np.ndarray: duration = dur or 0.5 n = int(self.sample_rate * duration) t = np.linspace(0, duration, n, dtype=np.float64) noise = np.random.normal(0, 1, n) # Lowpass sweep from 2000 → 400 Hz (exhale character) # Approximate by filtering in chunks chunk_size = max(1, n // 10) filtered = np.zeros(n) for i in range(0, n, chunk_size): end = min(i + chunk_size, n) cutoff = 2000.0 - 1600.0 * (i / max(1, n)) filtered[i:end] = self._lowpass(noise[i:end], cutoff) # Add faint glottal pulse for voicing pulse = 0.15 * np.sin(2 * np.pi * 120 * t) * np.exp(-t * 2) mixed = filtered * 0.8 + pulse * 0.2 # Envelope: medium attack, long decay (exhale) env = np.ones(n) attack = min(int(0.05 * self.sample_rate), n // 4) if attack > 0: env[:attack] = np.linspace(0, 1, attack) env *= np.exp(-t * 2.5) # gradual decay return mixed * env * intensity # ── Cluck: 
plosive + click (tongue sound) ── def _synth_cluck(self, intensity: float, dur: Optional[float]) -> np.ndarray: duration = dur or 0.08 n = int(self.sample_rate * duration) # Short plosive burst burst_len = min(int(0.02 * self.sample_rate), n) burst = np.zeros(n) if burst_len > 0: burst[:burst_len] = np.random.normal(0, 1, burst_len) * np.hanning(burst_len) # Click component (shorter than total) click_len = min(int(0.005 * self.sample_rate), n) click = np.zeros(n) if click_len > 0: click[:click_len] = np.random.normal(0, 1, click_len) * 0.6 # Combine with offset audio = burst.copy() offset = min(int(0.03 * self.sample_rate), n - click_len) if offset + click_len <= n: audio[offset:offset + click_len] += click[:click_len] * 0.6 # Lowpass audio = self._lowpass(audio, 3000) return audio * intensity # ── Click: single "tsk" impulse ── def _synth_click(self, intensity: float, dur: Optional[float]) -> np.ndarray: duration = dur or 0.03 n = int(self.sample_rate * duration) # Short impulse with quick decay impulse = np.zeros(n) impulse_len = min(int(0.003 * self.sample_rate), n) if impulse_len > 0: impulse[:impulse_len] = np.random.normal(0, 1, impulse_len) # Quick exponential decay t = np.linspace(0, duration, n) env = np.exp(-t * 100) audio = impulse * env # Highpass to make it sharp audio = self._highpass(audio, 1500) return audio * intensity # ── Aww: low-pitched warm vowel ── def _synth_aww(self, intensity: float, dur: Optional[float]) -> np.ndarray: return self._synth_vowel_expr("aw", 150, 0.4, intensity, dur) # ── Oh: mid-pitched vowel ── def _synth_oh(self, intensity: float, dur: Optional[float]) -> np.ndarray: return self._synth_vowel_expr("oh", 200, 0.3, intensity, dur) # ── Mm: humming ── def _synth_mm(self, intensity: float, dur: Optional[float]) -> np.ndarray: return self._synth_vowel_expr("mm", 120, 0.4, intensity, dur, nasal=True) # ── Wow: rising pitch vowel ── def _synth_wow(self, intensity: float, dur: Optional[float]) -> np.ndarray: duration = dur or 0.5 n 
= int(self.sample_rate * duration) t = np.linspace(0, duration, n, dtype=np.float64) # Rising pitch 180 → 260 Hz f0 = 180.0 + 80.0 * (t / duration) phase = 2.0 * np.pi * np.cumsum(f0) / self.sample_rate audio = self._formant_filter(np.sin(phase), "aw", n) # Envelope env = np.ones(n) attack = min(int(0.05 * self.sample_rate), n // 4) release = min(int(0.1 * self.sample_rate), n // 4) if attack > 0: env[:attack] = np.linspace(0, 1, attack) if release > 0: env[-release:] = np.linspace(1, 0.5, release) return audio * env * intensity # ── Helper: vowel-based expression with formant filtering ── def _synth_vowel_expr(self, vowel: str, f0: float, duration: float, intensity: float, dur_override: Optional[float], nasal: bool = False) -> np.ndarray: duration = dur_override or duration n = int(self.sample_rate * duration) t = np.linspace(0, duration, n, dtype=np.float64) # Glottal source phase = 2.0 * np.pi * f0 * t source = np.sin(phase) for h in range(2, 6): source += (0.4 / h) * np.sin(phase * h) source /= 3.0 # Formant filter audio = self._formant_filter(source, vowel, n) if nasal: # Nasal: reduce high frequencies, add low resonance audio = self._lowpass(audio, 1500) audio += 0.2 * np.sin(2 * np.pi * 250 * t) # Envelope env = np.ones(n) attack = min(int(0.05 * self.sample_rate), n // 4) release = min(int(0.1 * self.sample_rate), n // 4) if attack > 0: env[:attack] = np.linspace(0, 1, attack) if release > 0: env[-release:] = np.linspace(1, 0.5, release) return audio * env * intensity # ── DSP helpers ── def _formant_filter(self, signal: np.ndarray, vowel: str, n: int) -> np.ndarray: """Apply 3-formant resonator filter for vowel synthesis.""" formants = self.VOWEL_FORMANTS.get(vowel, self.VOWEL_FORMANTS["ah"]) output = np.zeros(n) for fi, (fn, bw) in enumerate(zip( [formants["F1"], formants["F2"], formants["F3"]], formants["bw"] )): r = float(np.exp(-np.pi * bw / self.sample_rate)) a1 = -2 * r * math.cos(2 * math.pi * fn / self.sample_rate) a2 = r * r gain = (1 - r) * 
math.sqrt(max(0, 1 - 2 * r * math.cos(2 * math.pi * fn / self.sample_rate) + r * r)) filtered = np.zeros(n) for i in range(2, n): filtered[i] = gain * signal[i] - a1 * filtered[i - 1] - a2 * filtered[i - 2] formant_gains = [1.0, 0.6, 0.3] output += filtered * formant_gains[fi] return output def _lowpass(self, signal: np.ndarray, cutoff_hz: float) -> np.ndarray: """Simple one-pole lowpass filter.""" if len(signal) == 0: return signal rc = 1.0 / (2 * math.pi * cutoff_hz) dt = 1.0 / self.sample_rate alpha = dt / (rc + dt) output = np.zeros_like(signal) output[0] = signal[0] * alpha for i in range(1, len(signal)): output[i] = output[i - 1] + alpha * (signal[i] - output[i - 1]) return output def _highpass(self, signal: np.ndarray, cutoff_hz: float) -> np.ndarray: """Simple one-pole highpass filter.""" if len(signal) == 0: return signal rc = 1.0 / (2 * math.pi * cutoff_hz) dt = 1.0 / self.sample_rate alpha = rc / (rc + dt) output = np.zeros_like(signal) output[0] = signal[0] for i in range(1, len(signal)): output[i] = alpha * (output[i - 1] + signal[i] - signal[i - 1]) return output def _bandpass(self, signal: np.ndarray, low_hz: float, high_hz: float) -> np.ndarray: """Bandpass = lowpass + highpass in series.""" return self._highpass(self._lowpass(signal, high_hz), low_hz) # ═══════════════════════════════════════════════════════════════════════════ # SECTION 3 — ASR Layer (Whisper + Energy VAD fallback) # ═══════════════════════════════════════════════════════════════════════════ class EnergyVAD: """ Energy-based Voice Activity Detection. Detects WHEN speech occurs but not WHAT is said. Used as a fallback when Whisper is unavailable, and as a fast pre-filter even when Whisper is active. 
""" def __init__(self, sample_rate: int = 16000, frame_duration_ms: int = 20, energy_threshold: float = 0.005): self.sample_rate = sample_rate self.frame_duration_ms = frame_duration_ms self.frame_size = int(sample_rate * frame_duration_ms / 1000) self.energy_threshold = energy_threshold self._noise_floor = 0.001 self._adaptation_rate = 0.01 def detect_speech(self, audio: np.ndarray) -> bool: """Return True if the audio frame contains speech.""" if len(audio) == 0: return False # Normalize to float32 in [-1, 1] range if audio.dtype == np.int16: audio = audio.astype(np.float32) / 32768.0 elif audio.dtype == np.int32: audio = audio.astype(np.float32) / 2147483648.0 elif audio.dtype == np.uint8: audio = (audio.astype(np.float32) - 128) / 128.0 elif audio.dtype != np.float32: audio = audio.astype(np.float32) # Compute RMS energy rms = float(np.sqrt(np.mean(audio ** 2))) # Adapt noise floor (only for low-energy frames) if rms < self.energy_threshold * 0.5: self._noise_floor = (1 - self._adaptation_rate) * self._noise_floor + self._adaptation_rate * rms # Speech if energy exceeds max of fixed threshold or 3x noise floor threshold = max(self.energy_threshold, self._noise_floor * 3) return rms > threshold def compute_energy(self, audio: np.ndarray) -> float: if len(audio) == 0: return 0.0 # Normalize to float32 in [-1, 1] range if audio.dtype == np.int16: audio = audio.astype(np.float32) / 32768.0 elif audio.dtype == np.int32: audio = audio.astype(np.float32) / 2147483648.0 elif audio.dtype == np.uint8: audio = (audio.astype(np.float32) - 128) / 128.0 elif audio.dtype != np.float32: audio = audio.astype(np.float32) return float(np.sqrt(np.mean(audio ** 2))) def detect_pause(self, audio: np.ndarray, min_pause_s: float = 0.3, max_pause_s: float = 0.8) -> Tuple[bool, float]: """ Detect if the audio contains a mid-utterance pause (0.3-0.8s of silence). Returns (is_pause, pause_duration). 
""" if len(audio) == 0: return False, 0.0 n_frames = len(audio) // self.frame_size if n_frames < 2: return False, 0.0 # Check each frame for speech silence_start = None max_silence = 0.0 for i in range(n_frames): frame = audio[i * self.frame_size:(i + 1) * self.frame_size] is_speech = self.detect_speech(frame) frame_dur = self.frame_duration_ms / 1000.0 if not is_speech: if silence_start is None: silence_start = i * frame_dur current_silence = (i + 1) * frame_dur - silence_start max_silence = max(max_silence, current_silence) else: silence_start = None is_pause = min_pause_s <= max_silence <= max_pause_s return is_pause, max_silence class WhisperASR: """ OpenAI Whisper ASR backend. Transcribes user speech to text. Falls back to VAD-only mode if Whisper is not installed. """ def __init__(self, model_name: str = "base", device: Optional[str] = None): self.model_name = model_name self.mode = ASRMode.WHISPER if WHISPER_AVAILABLE else ASRMode.VAD_ONLY self._model = None self._backend = WHISPER_BACKEND if WHISPER_AVAILABLE else None self.vad = EnergyVAD() if self.mode == ASRMode.WHISPER: try: logger.info("[WhisperASR] loading model '%s' via %s...", model_name, self._backend) if self._backend == "openai-whisper": device = device or ("cuda" if _torch_cuda_available() else "cpu") self._model = whisper.load_model(model_name, device=device) elif self._backend == "faster-whisper": # faster-whisper uses model size names like "base", "small", etc. 
# and downloads automatically from HuggingFace compute_type = "int8" if device != "cuda" else "float16" self._model = WhisperModel(model_name, compute_type=compute_type) logger.info("[WhisperASR] model loaded (backend=%s)", self._backend) except Exception as e: logger.warning("[WhisperASR] failed to load Whisper (%s); falling back to VAD-only", e) self.mode = ASRMode.VAD_ONLY self._model = None else: logger.warning("[WhisperASR] whisper not installed; using VAD-only mode") def transcribe(self, audio: np.ndarray, sample_rate: int = 16000) -> TranscriptSegment: """ Transcribe audio to text. Returns a TranscriptSegment. In VAD-only mode, text is empty but is_speech/is_non_verbal are still set. """ # Ensure float32, mono if audio.dtype != np.float32: audio = audio.astype(np.float32) if audio.size == 0: return TranscriptSegment(text="", start_time=time.time(), end_time=time.time(), confidence=0.0) if self.mode == ASRMode.WHISPER and self._model is not None: try: if self._backend == "openai-whisper": result = self._model.transcribe(audio, fp16=False, language="en") text = result.get("text", "").strip() segments = result.get("segments", []) confidence = float(np.mean([s.get("avg_logprob", -1) for s in segments])) if segments else 0.0 confidence = max(0.0, min(1.0, (confidence + 1.0) / 1.0)) elif self._backend == "faster-whisper": segments_iter, info = self._model.transcribe(audio, language="en", beam_size=1) segments_list = list(segments_iter) text = " ".join(s.text.strip() for s in segments_list).strip() confidence = 0.0 if segments_list: avg_logprob = float(np.mean([s.avg_log_prob for s in segments_list])) confidence = max(0.0, min(1.0, (avg_logprob + 1.0) / 1.0)) return TranscriptSegment( text=text, start_time=time.time() - len(audio) / sample_rate, end_time=time.time(), confidence=confidence, ) except Exception as e: logger.warning("[WhisperASR] transcription failed: %s", e) # VAD-only fallback is_speech = self.vad.detect_speech(audio) return TranscriptSegment( 
text="" if not is_speech else "[speech detected]", start_time=time.time() - len(audio) / sample_rate, end_time=time.time(), confidence=0.0, ) def is_backchannel_text(self, text: str) -> bool: """Check if transcribed text is a backchannel ('yeah', 'mm-hmm', etc.).""" if not text: return False text_lower = text.lower().strip().strip(".?!,") backchannel_vocab = { "yeah", "yes", "yep", "yup", "mhm", "mm-hmm", "mm", "hmm", "uh-huh", "right", "sure", "ok", "okay", "i see", "got it", "makes sense", "true", "exactly", "wow", "oh", "ah", } return text_lower in backchannel_vocab def _torch_cuda_available() -> bool: """Check if torch + CUDA are available.""" try: import torch return torch.cuda.is_available() except ImportError: return False # ═══════════════════════════════════════════════════════════════════════════ # SECTION 4 — TTS Layer (Coqui XTTS + Procedural fallback) # ═══════════════════════════════════════════════════════════════════════════ class ProceduralFormantTTS: """ Fallback TTS using formant synthesis. Produces understandable but robotic speech. Used when Coqui XTTS is not available. 
""" SAMPLE_RATE: int = 22050 # Phoneme → (type, duration, formant_vowel_or_noise) PHONEME_MAP: Dict[str, Tuple[str, float, str]] = { "a": ("vowel", 0.10, "ah"), "e": ("vowel", 0.10, "eh"), "i": ("vowel", 0.10, "ee"), "o": ("vowel", 0.10, "oh"), "u": ("vowel", 0.10, "oo"), "b": ("plosive", 0.05, ""), "p": ("plosive", 0.05, ""), "t": ("plosive", 0.05, ""), "d": ("plosive", 0.05, ""), "k": ("plosive", 0.05, ""), "g": ("plosive", 0.05, ""), "s": ("fricative", 0.12, ""), "z": ("fricative", 0.12, ""), "f": ("fricative", 0.10, ""), "v": ("fricative", 0.10, ""), "h": ("fricative", 0.08, ""), "m": ("nasal", 0.08, ""), "n": ("nasal", 0.08, ""), "l": ("approximant", 0.07, ""), "r": ("approximant", 0.07, ""), "w": ("approximant", 0.07, ""), "y": ("approximant", 0.07, ""), } VOWEL_FORMANTS: Dict[str, Dict] = { "ah": {"F1": 730, "F2": 1090, "F3": 2440, "bw": [60, 80, 120]}, "eh": {"F1": 530, "F2": 1840, "F3": 2480, "bw": [50, 70, 110]}, "ee": {"F1": 270, "F2": 2290, "F3": 3010, "bw": [40, 60, 100]}, "oh": {"F1": 570, "F2": 840, "F3": 2410, "bw": [55, 75, 115]}, "oo": {"F1": 300, "F2": 870, "F3": 2240, "bw": [45, 65, 105]}, } def __init__(self, sample_rate: int = 22050): self.sample_rate = sample_rate self._nonverbal = ProceduralNonVerbalSynth(sample_rate) def synthesize(self, text: str, prosody: ProsodyParams) -> np.ndarray: """Synthesize text to speech using formant synthesis.""" if not text.strip(): return np.zeros(0, dtype=np.float32) # Decompose text into phonemes frames = self._text_to_phonemes(text) if not frames: return np.zeros(0, dtype=np.float32) # Synthesize each phoneme chunks = [] for ptype, duration, vowel_or_noise in frames: if ptype == "pause": chunks.append(np.zeros(int(self.sample_rate * duration), dtype=np.float32)) elif ptype == "vowel": chunks.append(self._synth_vowel(vowel_or_noise, duration, prosody)) elif ptype == "plosive": chunks.append(self._synth_plosive(duration, prosody)) elif ptype == "fricative": chunks.append(self._synth_fricative(duration, 
prosody)) elif ptype == "nasal": chunks.append(self._synth_nasal(duration, prosody)) elif ptype == "approximant": chunks.append(self._synth_approximant(duration, prosody)) audio = np.concatenate(chunks) if chunks else np.zeros(0, dtype=np.float32) # Apply prosody modifications audio = self._apply_prosody(audio, prosody) return audio.astype(np.float32) def _text_to_phonemes(self, text: str) -> List[Tuple[str, float, str]]: """Simple grapheme-to-phoneme: one char → one phoneme.""" frames = [] text = text.lower() i = 0 while i < len(text): char = text[i] if char in self.PHONEME_MAP: ptype, dur, vowel = self.PHONEME_MAP[char] frames.append((ptype, dur, vowel)) elif char == " ": frames.append(("pause", 0.08, "")) elif char in ".,!?;:": frames.append(("pause", 0.20, "")) i += 1 return frames def _synth_vowel(self, vowel: str, duration: float, prosody: ProsodyParams) -> np.ndarray: n = int(self.sample_rate * duration) if n < 2: return np.zeros(max(2, n), dtype=np.float32) t = np.linspace(0, duration, n, dtype=np.float64) # Glottal source with pitch + vibrato f0 = prosody.base_pitch_hz vibrato = prosody.vibrato_depth * np.sin(2 * np.pi * 5.5 * t) phase = 2.0 * np.pi * np.cumsum(f0 + vibrato) / self.sample_rate source = np.sin(phase) for h in range(2, 6): source += (0.4 / h) * np.sin(phase * h) source /= 3.0 # Formant filter formants = self.VOWEL_FORMANTS.get(vowel, self.VOWEL_FORMANTS["ah"]) output = np.zeros(n) for fi, (fn, bw) in enumerate(zip( [formants["F1"], formants["F2"], formants["F3"]], formants["bw"] )): r = float(np.exp(-np.pi * bw / self.sample_rate)) a1 = -2 * r * math.cos(2 * math.pi * fn / self.sample_rate) a2 = r * r gain = (1 - r) * math.sqrt(max(0, 1 - 2 * r * math.cos(2 * math.pi * fn / self.sample_rate) + r * r)) filtered = np.zeros(n) for i in range(2, n): filtered[i] = gain * source[i] - a1 * filtered[i - 1] - a2 * filtered[i - 2] formant_gains = [1.0, 0.6, 0.3] output += filtered * formant_gains[fi] # Add breathiness noise = np.random.normal(0, 
prosody.breathiness, n) output += noise * 0.3 # Envelope attack = min(int(0.015 * self.sample_rate), n // 4) release = min(int(0.025 * self.sample_rate), n // 4) env = np.ones(n) if attack > 0: env[:attack] = np.linspace(0, 1, attack) if release > 0: env[-release:] = np.linspace(1, 0, release) return (output * env * prosody.energy).astype(np.float32) def _synth_plosive(self, duration: float, prosody: ProsodyParams) -> np.ndarray: n = int(self.sample_rate * duration) burst_len = max(2, min(int(0.008 * self.sample_rate), n)) audio = np.zeros(n, dtype=np.float32) audio[:burst_len] = np.random.normal(0, 1, burst_len) * np.hanning(burst_len) return audio * prosody.energy * 0.5 def _synth_fricative(self, duration: float, prosody: ProsodyParams) -> np.ndarray: n = int(self.sample_rate * duration) noise = np.random.normal(0, 1, n) # Bandpass 3000-7000 Hz audio = self._bandpass_simple(noise, 3000, 7000) return (audio * prosody.energy * 0.3).astype(np.float32) def _synth_nasal(self, duration: float, prosody: ProsodyParams) -> np.ndarray: n = int(self.sample_rate * duration) t = np.linspace(0, duration, n, dtype=np.float64) f0 = prosody.base_pitch_hz * 0.8 source = np.sin(2 * np.pi * f0 * t) # Lowpass for nasal character audio = self._lowpass_simple(source, 1500) return (audio * prosody.energy * 0.4).astype(np.float32) def _synth_approximant(self, duration: float, prosody: ProsodyParams) -> np.ndarray: n = int(self.sample_rate * duration) t = np.linspace(0, duration, n, dtype=np.float64) f0 = prosody.base_pitch_hz source = np.sin(2 * np.pi * f0 * t) * 0.7 noise = np.random.normal(0, 0.2, n) audio = source + noise * 0.3 return (audio * prosody.energy * 0.35).astype(np.float32) def _apply_prosody(self, audio: np.ndarray, prosody: ProsodyParams) -> np.ndarray: """Apply global prosody modifications (energy, warmth).""" if len(audio) == 0: return audio # Warmth: boost low frequencies if prosody.warmth > 0.5: low_boost = self._lowpass_simple(audio, 800) audio = audio + low_boost * 
class CoquiXTTSBackend:
    """
    Coqui XTTS neural TTS backend.

    Produces neural speech with optional voice cloning (``speaker_wav``).
    Falls back to ProceduralFormantTTS if Coqui is not installed, if model
    loading fails, or if an individual synthesis call raises.
    """

    # Built-in speaker requested when no reference wav is supplied.
    # NOTE(review): confirm this name exists in the loaded model's speaker
    # list — if it does not, every call without speaker_wav will raise inside
    # the model and silently take the procedural fallback path.
    DEFAULT_SPEAKER: str = "Ana NeP"

    # Peak level of the normalized signal before energy scaling.
    PEAK_LEVEL: float = 0.9

    def __init__(self, model_name: str = "tts_models/multilingual/multi-dataset/xtts_v2",
                 speaker_wav: Optional[str] = None,
                 language: str = "en"):
        """
        Args:
            model_name: Coqui model identifier passed to the TTS constructor.
            speaker_wav: optional path to a reference wav for voice cloning.
            language: language code forwarded to every synthesis call.
        """
        self.model_name = model_name
        self.speaker_wav = speaker_wav
        self.language = language
        self.mode = TTSMode.COQUI_XTTS if COQUI_TTS_AVAILABLE else TTSMode.PROCEDURAL
        self._model = None
        self._fallback = ProceduralFormantTTS()

        if self.mode != TTSMode.COQUI_XTTS:
            logger.warning("[CoquiXTTS] TTS package not installed; using procedural formant fallback")
            return

        try:
            logger.info("[CoquiXTTS] loading model '%s'...", model_name)
            self._model = CoquiTTS(model_name)
            logger.info("[CoquiXTTS] model loaded")
        except Exception as e:
            # Third-party boundary: any load failure degrades to the
            # procedural backend instead of aborting construction.
            logger.warning("[CoquiXTTS] failed to load (%s); falling back to procedural", e)
            self.mode = TTSMode.PROCEDURAL
            self._model = None

    def synthesize(self, text: str, prosody: ProsodyParams) -> np.ndarray:
        """Synthesize text to speech and return float32 audio.

        Blank text yields an empty array. When the neural model is unavailable
        or raises, the procedural fallback synthesizes this utterance instead.

        NOTE(review): this method was documented as returning 22050 Hz audio,
        but the rate produced by the loaded model is not checked or resampled
        here — confirm it matches what callers assume.
        """
        if not text.strip():
            return np.zeros(0, dtype=np.float32)

        if self.mode == TTSMode.COQUI_XTTS and self._model is not None:
            kwargs: Dict[str, Any] = {"text": text, "language": self.language}
            if self.speaker_wav:
                kwargs["speaker_wav"] = self.speaker_wav
            else:
                kwargs["speaker"] = self.DEFAULT_SPEAKER
            try:
                wav = self._model.tts(**kwargs)
                audio = np.array(wav, dtype=np.float32)
                return self._apply_prosody(audio, prosody)
            except Exception as e:
                logger.warning("[CoquiXTTS] synthesis failed (%s); using fallback for this utterance", e)

        return self._fallback.synthesize(text, prosody)

    def _apply_prosody(self, audio: np.ndarray, prosody: ProsodyParams) -> np.ndarray:
        """Peak-normalize Coqui output, then scale it by ``prosody.energy``.

        The previous implementation scaled by energy *before* normalizing,
        which cancelled the scaling: every utterance came out at the same peak
        regardless of energy. Normalizing first makes energy audible
        (peak = PEAK_LEVEL * energy).

        Returns:
            float32 audio clipped to [-1, 1]; empty input is returned as-is.
        """
        if len(audio) == 0:
            return audio

        peak = float(np.max(np.abs(audio)))
        if peak <= 0.0:
            # All-zero signal: nothing to normalize.
            return audio.astype(np.float32)

        # Negative energy is treated as silence rather than a phase flip.
        gain = self.PEAK_LEVEL * max(0.0, float(prosody.energy)) / peak
        # Clip guards against energy values above 1 / PEAK_LEVEL.
        return np.clip(audio * gain, -1.0, 1.0).astype(np.float32)
class InterruptDetector:
    """
    Classifies detected user speech during Nima's turn into:
    - REAL_INTERRUPT: user is taking the turn (long speech, starts mid-Nima)
    - NON_VERBAL: laughter/sigh/gasp/etc. — IGNORE (not an interrupt)
    - BACKCHANNEL: "yeah", "mm-hmm" — IGNORE (not an interrupt)
    - COLLABORATIVE_TURN_SHARING: user finishing Nima's sentence — IGNORE
    - SILENCE: no speech detected

    This is the KEY DIFFERENTIATOR: the system doesn't treat all user speech
    as an interrupt. Backchannels and non-verbal expressions are natural parts
    of conversation and should NOT trigger Nima to stop.
    """

    # Backchannel vocabulary — short utterances that signal "I'm listening".
    # NOTE(review): confirm `Set` is imported from typing at the top of the
    # file; it is harmless at runtime under lazy annotations but breaks
    # typing.get_type_hints().
    BACKCHANNEL_VOCAB: Set[str] = {
        "yeah", "yes", "yep", "yup", "mhm", "mm-hmm", "mm", "hmm",
        "uh-huh", "right", "sure", "ok", "okay", "i see", "got it",
        "makes sense", "true", "exactly", "wow", "oh", "ah", "ha",
    }

    # Duration thresholds. Only BACKCHANNEL_MAX_DURATION and
    # COLLABORATIVE_WINDOW_S are read inside this class.
    BACKCHANNEL_MAX_DURATION: float = 0.8      # <0.8s = likely backchannel
    NON_VERBAL_MAX_DURATION: float = 1.5       # <1.5s with spectral signature = non-verbal
    REAL_INTERRUPT_MIN_DURATION: float = 1.0   # >1.0s = likely real interrupt

    # Collaborative turn-sharing: user speech in the last 300ms of Nima's utterance
    COLLABORATIVE_WINDOW_S: float = 0.3

    def __init__(self, asr: WhisperASR):
        self._asr = asr
        self._vad = asr.vad

    def classify(self, audio: np.ndarray, sample_rate: int = 16000,
                 nima_text_progress: float = 1.0,
                 nima_speech_remaining_s: float = 0.0) -> InterruptClassification:
        """
        Classify a segment of user speech detected during Nima's turn.

        Checks run in priority order: empty/silent audio, backchannel,
        non-verbal expression, collaborative turn-sharing, and finally a real
        interrupt whose confidence grows with duration (capped at 2 s).

        Args:
            audio: user audio (float32, mono)
            sample_rate: audio sample rate
            nima_text_progress: 0.0 = Nima just started, 1.0 = Nima finished
            nima_speech_remaining_s: seconds left in Nima's current utterance

        Returns:
            InterruptClassification with the verdict.
        """
        if len(audio) == 0:
            return InterruptClassification(
                interrupt_type=InterruptType.SILENCE,
                reason="no audio",
            )

        # Basic features
        duration = len(audio) / sample_rate
        spectral = self._compute_spectral_features(audio, sample_rate)

        # Check if there's actually speech
        if not self._vad.detect_speech(audio):
            return InterruptClassification(
                interrupt_type=InterruptType.SILENCE,
                reason="below VAD threshold",
                duration_s=duration,
                spectral_features=spectral,
            )

        # Transcribe
        segment = self._asr.transcribe(audio, sample_rate)
        transcript = segment.text.strip().lower()

        # 1. Backchannel: short AND (vocabulary match OR spectral match)
        if duration < self.BACKCHANNEL_MAX_DURATION:
            if self._asr.is_backchannel_text(transcript) or self._is_backchannel_spectral(spectral):
                return InterruptClassification(
                    interrupt_type=InterruptType.BACKCHANNEL,
                    confidence=0.85,
                    reason=f"short ({duration:.2f}s) + backchannel vocab/spectral",
                    transcript=transcript,
                    duration_s=duration,
                    spectral_features=spectral,
                )

        # 2. Non-verbal expression (spectral signature)
        non_verbal_match = self._classify_non_verbal(spectral, duration)
        if non_verbal_match:
            return InterruptClassification(
                interrupt_type=InterruptType.NON_VERBAL,
                confidence=non_verbal_match[1],
                reason=f"non-verbal spectral match: {non_verbal_match[0]}",
                transcript=transcript,
                duration_s=duration,
                spectral_features=spectral,
            )

        # 3. Collaborative turn-sharing (short speech at the end of Nima's turn)
        if nima_text_progress > 0.7 and nima_speech_remaining_s < self.COLLABORATIVE_WINDOW_S:
            if duration < 1.5:
                return InterruptClassification(
                    interrupt_type=InterruptType.COLLABORATIVE_TURN_SHARING,
                    confidence=0.70,
                    reason=f"speech at end of Nima's turn (progress={nima_text_progress:.2f})",
                    transcript=transcript,
                    duration_s=duration,
                    spectral_features=spectral,
                )

        # 4. Otherwise: real interrupt (longer = more confident)
        confidence = min(1.0, duration / 2.0)
        return InterruptClassification(
            interrupt_type=InterruptType.REAL_INTERRUPT,
            confidence=confidence,
            reason=f"real speech ({duration:.2f}s, progress={nima_text_progress:.2f})",
            transcript=transcript,
            duration_s=duration,
            spectral_features=spectral,
        )

    def _compute_spectral_features(self, audio: np.ndarray, sr: int) -> Dict[str, float]:
        """Compute spectral features for non-verbal classification.

        Returns an empty dict for segments shorter than 256 samples; otherwise
        a dict with keys ``centroid_hz``, ``rolloff_hz``, ``zcr``, ``energy``,
        ``low_freq_ratio``, ``periodicity``, ``duration_s`` and ``energy_cv``.
        """
        if len(audio) < 256:
            return {}

        # Magnitude spectrum
        magnitude = np.abs(np.fft.rfft(audio.astype(np.float32)))
        freqs = np.fft.rfftfreq(len(audio), 1.0 / sr)
        total_mag = np.sum(magnitude)

        # Spectral centroid (brightness)
        centroid = float(np.sum(freqs * magnitude) / total_mag) if total_mag > 0 else 0.0

        # Spectral rolloff (frequency below which 85% of the magnitude lies)
        cumsum = np.cumsum(magnitude)
        if cumsum[-1] > 0:
            rolloff_idx = np.searchsorted(cumsum, 0.85 * cumsum[-1])
            rolloff = float(freqs[min(rolloff_idx, len(freqs) - 1)])
        else:
            rolloff = 0.0

        # Zero crossing rate (voicing indicator)
        zcr = float(np.mean(np.abs(np.diff(np.sign(audio))) > 0))

        # RMS energy
        energy = float(np.sqrt(np.mean(audio ** 2)))

        # Low-frequency (<500 Hz) magnitude ratio (voicing)
        low_energy = float(np.sum(magnitude[freqs < 500]) / max(1e-10, total_mag))

        # Periodicity (for laughter detection)
        periodicity = self._estimate_periodicity(audio, sr)

        # Energy variance across 20 ms frames distinguishes burst-like laughter
        # (high variance: bursts + gaps) from sustained speech (lower variance).
        # max(1, ...) keeps the frame size usable for very low sample rates,
        # where int(sr * 0.02) would be 0 and the frame count would divide by zero.
        frame_size = max(1, int(sr * 0.02))
        n_frames = max(1, len(audio) // frame_size)
        frames = (audio[i * frame_size:(i + 1) * frame_size] for i in range(n_frames))
        frame_energies = [float(np.sqrt(np.mean(f ** 2))) for f in frames if len(f) > 0]

        if len(frame_energies) >= 3:
            energy_mean = float(np.mean(frame_energies))
            energy_std = float(np.std(frame_energies))
            # Coefficient of variation (normalized std)
            energy_cv = energy_std / max(1e-6, energy_mean)
        else:
            energy_cv = 0.0

        return {
            "centroid_hz": centroid,
            "rolloff_hz": rolloff,
            "zcr": zcr,
            "energy": energy,
            "low_freq_ratio": low_energy,
            "periodicity": periodicity,
            "duration_s": len(audio) / sr,
            "energy_cv": energy_cv,  # burst-like vs sustained
        }

    def _estimate_periodicity(self, audio: np.ndarray, sr: int) -> float:
        """Estimate periodicity (0=aperiodic/noise, 1=strongly periodic).

        Returns the largest normalized autocorrelation value for lags between
        50 ms and 200 ms (the "ha-ha" repetition rate of laughter), clamped to
        [0, 1]. Segments too short to contain a 200 ms lag return 0.0.
        """
        n = len(audio)
        if n < sr * 0.05:
            return 0.0

        centered = np.asarray(audio, dtype=np.float64)
        centered = centered - centered.mean()
        if np.std(centered) < 1e-6:
            return 0.0

        min_lag = int(sr * 0.05)   # 50ms
        max_lag = int(sr * 0.20)   # 200ms
        if max_lag >= n:
            return 0.0

        # Linear autocorrelation via FFT (zero-padded to >= 2n-1 so it does
        # not wrap around). This yields the same values as
        # np.correlate(x, x, "full")[n-1:], but in O(n log n) instead of
        # O(n^2), which matters on the real-time interrupt path.
        nfft = 1 << (2 * n - 1).bit_length()
        spectrum = np.fft.rfft(centered, nfft)
        autocorr = np.fft.irfft(spectrum * np.conj(spectrum), nfft)[:n]

        zero_lag = autocorr[0]
        if zero_lag <= 0:
            return 0.0

        region = autocorr[min_lag:max_lag] / zero_lag
        if len(region) == 0:
            return 0.0
        peak = float(np.max(region))
        return max(0.0, min(1.0, peak))

    def _is_backchannel_spectral(self, spectral: Dict[str, float]) -> bool:
        """Check if spectral features match a backchannel (short, voiced, soft-ish)."""
        if not spectral:
            return False

        energy = spectral.get("energy", 0.0)
        low_ratio = spectral.get("low_freq_ratio", 0.0)
        zcr = spectral.get("zcr", 0.5)
        periodicity = spectral.get("periodicity", 0.0)
        centroid = spectral.get("centroid_hz", 0.0)
        duration = spectral.get("duration_s", 1.0)

        # Backchannels are short; use the shared class threshold rather than
        # a duplicated literal.
        if duration > self.BACKCHANNEL_MAX_DURATION:
            return False

        is_voiced = periodicity > 0.2 or low_ratio > 0.25
        is_smooth = zcr < 0.35
        # "mm-hmm" pattern: voiced, low centroid (not breathy), smooth
        is_low_centroid = centroid < 1500
        return is_voiced and is_smooth and is_low_centroid and energy > 0.01

    def _classify_non_verbal(self, spectral: Dict[str, float],
                             duration: float) -> Optional[Tuple[str, float]]:
        """
        Classify non-verbal expression from spectral features.
        Returns (expression_name, confidence) or None.

        Rules are tried in a fixed order (laughter/giggle, sigh, gasp, groan,
        moan, click); the first match wins. Sustained audio longer than 1.0 s
        with low energy variance is treated as real speech and returns None,
        even if periodicity is high.
        """
        if not spectral:
            return None

        periodicity = spectral.get("periodicity", 0.0)
        centroid = spectral.get("centroid_hz", 0.0)
        energy = spectral.get("energy", 0.0)
        zcr = spectral.get("zcr", 0.0)
        low_ratio = spectral.get("low_freq_ratio", 0.0)
        energy_cv = spectral.get("energy_cv", 0.0)  # burst-like vs sustained

        # Guard: sustained audio >1.0s with low energy variance is likely
        # real speech; let it fall through to REAL_INTERRUPT.
        if duration > 1.0 and energy_cv < 0.3:
            return None

        # Laughter: burst-like (high energy_cv), periodic, moderate energy.
        # The energy_cv check is key — laughter has ha-ha-ha gaps.
        if energy_cv > 0.3 and periodicity > 0.1 and 0.3 < duration < 2.0 and energy > 0.03:
            if centroid > 1500:
                return ("laughter", 0.8)
            return ("giggle", 0.7)

        # Sigh: low periodicity, breathy (high centroid), short
        if periodicity < 0.2 and 0.3 < duration < 1.0 and centroid > 1500 and energy > 0.02:
            return ("sigh", 0.65)

        # Gasp: very short, high centroid (breathy), moderate energy
        if duration < 0.35 and centroid > 1500 and energy > 0.03:
            return ("gasp", 0.75)

        # Groan: low centroid, voiced, short
        if 0.4 < duration < 0.8 and centroid < 1200 and periodicity > 0.2:
            return ("groan", 0.65)

        # Moan: mid centroid, voiced, medium duration
        if 0.5 < duration < 1.0 and 1000 < centroid < 2000 and low_ratio > 0.35:
            return ("moan", 0.6)

        # Cluck/click: very short, high ZCR
        if duration < 0.12 and zcr > 0.3:
            return ("click", 0.5)

        return None
def should_backchannel(self, state: ConversationState,
                       audio: Optional[np.ndarray] = None) -> Optional[BackchannelEvent]:
    """
    Decide whether a backchannel should fire for the current state.

    Nothing is emitted while throttled or when the user is not speaking.
    Otherwise a detected pause in ``audio`` produces a verbal nod; failing
    that, an arousal spike relative to the recent history produces a
    non-verbal reaction.

    Returns a BackchannelEvent if one should fire, else None.
    """
    ts = time.time()

    # Throttle first, then require that the user currently holds the floor.
    throttled = ts - self._last_backchannel_time < self.MIN_BACKCHANNEL_INTERVAL_S
    if throttled or state.phase != ConversationPhase.USER_SPEAKING:
        return None

    # ── Trigger 1: ON_PAUSE ──
    if audio is not None and len(audio) > 0:
        paused, pause_len = EnergyVAD(sample_rate=16000).detect_pause(
            audio, self.PAUSE_MIN_S, self.PAUSE_MAX_S
        )
        if paused:
            nod = random.choice(self.VERBAL_NODS)
            nod_prosody = ProsodyParams(
                base_pitch_hz=160,
                energy=0.4,
                warmth=0.8,
                breathiness=0.2,
            )
            nod_audio = self._tts.synthesize(nod, nod_prosody)
            nod_event = BackchannelEvent(
                trigger=BackchannelTrigger.ON_PAUSE,
                audio=nod_audio,
                is_verbal=True,
                label=nod,
            )
            self._last_backchannel_time = ts
            logger.debug("[Backchannel] ON_PAUSE nod: '%s' (pause=%.2fs)", nod, pause_len)
            return nod_event

    # ── Trigger 2: ON_EMOTION_SHIFT ──
    arousal_now = state.user_emotion_arousal
    history = self._arousal_history
    history.append(arousal_now)
    if len(history) < 3:
        return None

    # Baseline excludes the two most recent samples.
    baseline = float(np.mean(list(history)[:-2]))
    if not (arousal_now - baseline > self.AROUSAL_SPIKE_THRESHOLD):
        return None

    # Map valence + arousal to an emotion label, then to a reaction sound.
    label = self._classify_emotion_shift(state.user_emotion_valence, arousal_now)
    reaction = self.EMOTION_REACTIONS.get(label, NonVerbalType.MM)
    reaction_audio = self._nonverbal.synth(reaction, intensity=0.6)
    reaction_event = BackchannelEvent(
        trigger=BackchannelTrigger.ON_EMOTION_SHIFT,
        audio=reaction_audio,
        sample_rate=self._nonverbal.sample_rate,
        is_verbal=False,
        label=reaction.value,
    )
    self._last_backchannel_time = ts
    logger.debug("[Backchannel] ON_EMOTION_SHIFT: %s (arousal %.2f→%.2f)",
                 reaction.value, baseline, arousal_now)
    return reaction_event
class InterruptionResponse:
    """
    Picks what Nima says when a real interrupt is detected.

    Rather than simply going silent, Nima yields the floor with a phrase
    chosen from one of three pools:
    - early in her utterance: "I'm sorry, were you saying something?"
    - mid/late in her utterance: "Sorry, please go ahead."
    - urgent (high user arousal): a minimal "go ahead"

    A cooldown keeps her from apologizing on every interrupt.
    """

    EARLY_RESPONSES: List[str] = [
        "I'm sorry, were you saying something?",
        "Oh, sorry — please, go ahead.",
        "My apologies, you were saying?",
    ]

    LATE_RESPONSES: List[str] = [
        "Sorry, please go ahead.",
        "Go right ahead — I can wait.",
        "Of course, after you.",
    ]

    URGENT_RESPONSES: List[str] = [
        "Of course, go ahead.",
        "Please, go on.",
        "I'm listening — go ahead.",
    ]

    # Minimum spacing between two spoken responses, in seconds
    COOLDOWN_S: float = 30.0

    def __init__(self):
        self._last_response_time: float = 0.0
        self._response_count: int = 0

    def should_respond(self, classification: InterruptClassification) -> bool:
        """Return True only for a real interrupt that falls outside the cooldown."""
        if classification.interrupt_type != InterruptType.REAL_INTERRUPT:
            return False

        elapsed = time.time() - self._last_response_time
        return not (elapsed < self.COOLDOWN_S)

    def generate_response(self, classification: InterruptClassification,
                          nima_text_progress: float,
                          user_arousal: float = 0.3) -> str:
        """
        Choose the response text and record that a response was given.

        Args:
            classification: the interrupt classification (not consulted when
                choosing the phrase)
            nima_text_progress: 0.0 = Nima just started, 1.0 = Nima almost done
            user_arousal: detected arousal level of the user's interrupt

        Returns:
            One phrase drawn at random from the matching pool.
        """
        # Book-keeping happens first: this starts the cooldown window.
        self._last_response_time = time.time()
        self._response_count += 1

        if user_arousal > 0.7:
            # Urgent interrupt → minimal apology
            pool = self.URGENT_RESPONSES
        elif nima_text_progress < 0.3:
            # Less than 30% spoken → "were you saying something?"
            pool = self.EARLY_RESPONSES
        else:
            # 30% or more spoken → "please go ahead"
            pool = self.LATE_RESPONSES

        return random.choice(pool)
def shape(self, base_prosody: ProsodyParams, emotion: str = "neutral",
          valence: float = 0.0, arousal: float = 0.3,
          empathy_level: float = 0.5) -> ProsodyParams:
    """
    Build a new ProsodyParams from ``base_prosody`` with adaptive shaping.

    Layers are applied in this order: emotion profile deltas, valence-driven
    pitch variance, arousal-driven energy and rate, then an empathy boost
    when ``empathy_level`` exceeds 0.5. The input object is not modified.

    Args:
        base_prosody: the starting prosody params
        emotion: emotion label (empathetic, excited, contemplative, etc.);
            labels without a profile skip the profile layer
        valence: [-1, 1] emotional valence
        arousal: [0, 1] emotional arousal
        empathy_level: [0, 1] how empathetic the response should be

    Returns:
        New ProsodyParams with adaptive modifications applied.
    """
    def _cap(value: float, upper: float) -> float:
        # Upper bound only.
        return float(min(upper, value))

    def _clamp(value: float, lower: float, upper: float) -> float:
        return float(min(upper, max(lower, value)))

    out = ProsodyParams(
        base_pitch_hz=base_prosody.base_pitch_hz,
        speech_rate_wpm=base_prosody.speech_rate_wpm,
        energy=base_prosody.energy,
        breathiness=base_prosody.breathiness,
        warmth=base_prosody.warmth,
        vibrato_depth=base_prosody.vibrato_depth,
        pitch_variance=base_prosody.pitch_variance,
        emotional_tone=emotion,
    )

    # Layer 1: emotion profile (multipliers for pitch/rate/energy, additive
    # offsets for warmth/breathiness)
    deltas = self.EMOTION_PROFILES.get(emotion, {})
    if deltas:
        out.base_pitch_hz *= deltas.get("pitch_mult", 1.0)
        out.speech_rate_wpm *= deltas.get("rate_mult", 1.0)
        out.energy *= deltas.get("energy_mult", 1.0)
        out.warmth = _clamp(out.warmth + deltas.get("warmth_add", 0.0), 0.0, 1.0)
        out.breathiness = _clamp(out.breathiness + deltas.get("breathiness_add", 0.0), 0.0, 0.5)

    # Layer 2: valence sets pitch variance outright (positive = more
    # expressive); the base value is replaced, not adjusted.
    out.pitch_variance = _clamp(0.15 + valence * 0.10, 0.05, 0.4)

    # Layer 3: arousal scales energy (capped at 1.0) and speech rate
    out.energy = _cap(out.energy * (0.7 + arousal * 0.6), 1.0)
    out.speech_rate_wpm *= (0.9 + arousal * 0.3)

    # Layer 4: empathy above the midpoint → warmer, breathier, slightly lower
    if empathy_level > 0.5:
        boost = (empathy_level - 0.5) * 2.0  # [0, 1]
        out.warmth = _cap(out.warmth + 0.15 * boost, 1.0)
        out.breathiness = _cap(out.breathiness + 0.05 * boost, 0.5)
        out.base_pitch_hz *= (1.0 - 0.03 * boost)

    return out
hesitation (50-150ms pause + subtle pitch drop) - inhale breath (80ms) - emphasis shift (pitch bump on the emphasized word) """ # Words that tend to receive emphasis EMPHASIS_WORDS: Set[str] = { "really", "truly", "actually", "honestly", "important", "never", "always", "exactly", "absolutely", "indeed", } # Hesitation markers (fillers) HESITATIONS: List[str] = ["...", "um", "hmm", "well"] def __init__(self, sample_rate: int = 22050): self.sample_rate = sample_rate self._breath_synth = ProceduralNonVerbalSynth(sample_rate) def inject(self, text: str, prosody: ProsodyParams, thoughtfulness: float = 0.3, uncertainty: float = 0.2) -> Tuple[str, List[Dict[str, Any]]]: """ Analyze text and inject micro-intonation events. Args: text: the input text prosody: current prosody params thoughtfulness: [0, 1] how thoughtful/reflective (more hesitations) uncertainty: [0, 1] how uncertain (more fillers + pitch drops) Returns: (modified_text, events) where events is a list of dicts: {"type": "hesitation"|"breath"|"emphasis", "position": float, "audio": np.ndarray} """ modified = text events: List[Dict[str, Any]] = [] # 1. Add hesitation at sentence start if thoughtful if thoughtfulness > 0.4 and random.random() < thoughtfulness: hesitation = random.choice(self.HESITATIONS[:2]) # "..." or "um" modified = f"{hesitation} {modified}" events.append({ "type": "hesitation", "position": 0.0, "duration_s": 0.1 + thoughtfulness * 0.15, "audio": self._gen_hesitation_audio(0.1 + thoughtfulness * 0.15, prosody), }) # 2. Add breath before commas/periods if thoughtful if thoughtfulness > 0.3: breath_chance = thoughtfulness * 0.6 words = modified.split() new_words = [] for i, word in enumerate(words): new_words.append(word) if word.endswith(",") or word.endswith("."): if random.random() < breath_chance: events.append({ "type": "breath", "position": (i + 1) / len(words), "duration_s": 0.08, "audio": self._breath_synth.synth(NonVerbalType.SIGH, intensity=0.2), }) modified = " ".join(new_words) # 3. 
class TurnTakingPredictor:
    """
    Predicts when the user is about to finish speaking, so the system can
    smoothly take the floor instead of waiting for silence.

    Uses a combination of:
    - Speech rate deceleration (users slow down at turn ends)
    - Pitch declination (pitch drops at sentence ends)
    - Pause lengthening (longer pauses near turn end)
    - Filler detection ("you know", "so yeah")
    """

    # Turn-end indicators (lowercase, matched as whole words/phrases).
    # NOTE(review): confirm `Set` is imported from typing at the top of the
    # file; it is harmless at runtime under lazy annotations but breaks
    # typing.get_type_hints().
    TURN_END_FILLERS: Set[str] = {
        "you know", "so yeah", "i think", "something like that",
        "that's about it", "yeah", "right", "anyway",
    }

    def __init__(self):
        self._speech_rate_history: Deque[float] = deque(maxlen=10)
        self._pitch_history: Deque[float] = deque(maxlen=10)
        self._pause_history: Deque[float] = deque(maxlen=5)

    def update(self, speech_rate: float, pitch: float, pause_duration: float) -> None:
        """Update the predictor with recent observations."""
        self._speech_rate_history.append(speech_rate)
        self._pitch_history.append(pitch)
        self._pause_history.append(pause_duration)

    @staticmethod
    def _normalize_transcript(transcript: str) -> str:
        """Lowercase, strip punctuation, and pad with spaces for word matching.

        Apostrophes are kept (and typographic ones straightened) so that
        fillers such as "that's about it" still match.
        """
        lowered = transcript.lower().replace("\u2019", "'")
        cleaned = "".join(ch if ch.isalnum() or ch == "'" else " " for ch in lowered)
        return " " + " ".join(cleaned.split()) + " "

    def predict_turn_end_probability(self, transcript: str = "") -> float:
        """
        Predict the probability [0, 1] that the user is about to finish.

        Each cue that fires adds a fixed weight: rate deceleration 0.30,
        pitch declination 0.25, long last pause 0.20, turn-end filler 0.25.

        Args:
            transcript: text of the user's speech so far; fillers are matched
                as whole words or phrases, so "right" does not fire on
                "brightness".
        """
        prob = 0.0

        # 1. Speech rate deceleration: slowed by 20%+ over the last 3 samples
        if len(self._speech_rate_history) >= 3:
            first, _, last = list(self._speech_rate_history)[-3:]
            if last < first * 0.8:
                prob += 0.3

        # 2. Pitch declination: dropped by 10%+ over the last 3 samples
        if len(self._pitch_history) >= 3:
            first, _, last = list(self._pitch_history)[-3:]
            if last < first * 0.9:
                prob += 0.25

        # 3. Pause lengthening: most recent pause exceeds 500 ms
        #    (requires at least two recorded pauses)
        if len(self._pause_history) >= 2 and self._pause_history[-1] > 0.5:
            prob += 0.2

        # 4. Turn-end fillers in transcript (whole-word match)
        if transcript:
            padded = self._normalize_transcript(transcript)
            if any(f" {filler} " in padded for filler in self.TURN_END_FILLERS):
                prob += 0.25

        return float(min(1.0, prob))

    def should_take_floor(self, transcript: str = "") -> bool:
        """Returns True if the system should start speaking now."""
        return self.predict_turn_end_probability(transcript) > 0.6
class SomaticFeedbackIntegrator:
    """
    Couples voice modulation to the system's "strain" and "energy" state.

    Like a biological fatigue signal: under strain the voice becomes slightly
    slower, breathier and lower-pitched, so the voice itself reports the
    system's internal condition. The state is fed in from NIMA's strain and
    allostatic-load metrics via ``update_from_nima``.
    """

    def __init__(self):
        self._current_strain: float = 0.0
        self._current_energy: float = 1.0
        self._allostatic_load: float = 0.0

    def update_from_nima(self, strain: float, allostatic_load: float = 0.0):
        """Store NIMA's metrics (clamped) and derive the energy level."""
        strain_clamped = float(max(0.0, min(2.0, strain)))
        load_clamped = float(max(0.0, min(1.0, allostatic_load)))

        self._current_strain = strain_clamped
        self._allostatic_load = load_clamped
        # Energy falls as strain and load rise, with a floor of 0.3
        self._current_energy = float(
            max(0.3, 1.0 - 0.3 * strain_clamped - 0.2 * load_clamped)
        )

    def apply_somatic_modulation(self, prosody: ProsodyParams) -> ProsodyParams:
        """Return prosody adjusted for fatigue/strain.

        When both strain and load are below 0.1 the input object is returned
        unchanged; otherwise a modified copy is returned.
        """
        load = self._allostatic_load
        if self._current_strain < 0.1 and load < 0.1:
            return prosody

        out = ProsodyParams(
            base_pitch_hz=prosody.base_pitch_hz,
            speech_rate_wpm=prosody.speech_rate_wpm,
            energy=prosody.energy,
            breathiness=prosody.breathiness,
            warmth=prosody.warmth,
            vibrato_depth=prosody.vibrato_depth,
            pitch_variance=prosody.pitch_variance,
            emotional_tone=prosody.emotional_tone,
        )

        # Strain (effect saturates at 1.0) → lower pitch, slower, breathier
        strain_factor = min(1.0, self._current_strain)
        out.base_pitch_hz *= (1.0 - 0.05 * strain_factor)
        out.speech_rate_wpm *= (1.0 - 0.10 * strain_factor)
        out.breathiness = float(min(0.4, out.breathiness + 0.08 * strain_factor))

        # Allostatic load → reduced energy, more warmth (self-soothing)
        out.energy *= (1.0 - 0.15 * load)
        out.warmth = float(min(1.0, out.warmth + 0.05 * load))

        return out

    @property
    def strain(self) -> float:
        return self._current_strain

    @property
    def energy(self) -> float:
        return self._current_energy


class EmpathyPhraseGenerator:
    """
    Produces short contextual empathy inserts in place of generic nods.

    Where a plain backchannel would say "mm-hmm", this returns something like
    "That must feel tough" or "I get what you mean." The phrase pool is chosen
    from the emotion label, or inferred from valence/arousal when the label
    is "neutral".
    """

    # Empathy phrase templates by emotion
    EMPATHY_PHRASES: Dict[str, List[str]] = {
        "sadness": [
            "That sounds really hard.",
            "I can hear how much this weighs on you.",
            "That must feel tough.",
            "I'm sorry you're going through this.",
        ],
        "joy": [
            "That's wonderful to hear.",
            "I can feel your excitement.",
            "That sounds amazing.",
            "I love that for you.",
        ],
        "anger": [
            "That sounds frustrating.",
            "I can see why that would upset you.",
            "That would make me angry too.",
            "You have every right to feel that way.",
        ],
        "fear": [
            "That sounds scary.",
            "I can understand why you'd be worried.",
            "It makes sense that you're concerned.",
            "That's a lot to sit with.",
        ],
        "surprise": [
            "Oh wow.",
            "That's unexpected.",
            "I didn't see that coming either.",
            "Hmm, that's something.",
        ],
        "neutral": [
            "I hear you.",
            "I get what you mean.",
            "That makes sense.",
            "I'm following you.",
            "Go on, I'm listening.",
        ],
    }

    def generate(self, user_emotion: str = "neutral",
                 user_valence: float = 0.0,
                 user_arousal: float = 0.3) -> str:
        """Return one randomly chosen empathy phrase for the user's state.

        An explicit non-neutral ``user_emotion`` is used as-is; unknown labels
        fall back to the neutral pool.
        """
        label = user_emotion

        # Infer a label from valence/arousal only when none was supplied
        if label == "neutral":
            negative = user_valence < -0.3
            if negative and user_arousal > 0.5:
                label = "anger"
            elif negative:
                label = "sadness"
            elif user_valence > 0.4 and user_arousal > 0.6:
                label = "joy"
            elif user_arousal > 0.6:
                label = "surprise"

        pool = self.EMPATHY_PHRASES.get(label, self.EMPATHY_PHRASES["neutral"])
        return random.choice(pool)
def _new_voice_event_id() -> str:
    """Build a fresh id of the form ``ve_`` + 12 hex characters."""
    return f"ve_{uuid.uuid4().hex[:12]}"


@dataclass
class VoiceEvent:
    """One spoken utterance together with how it was delivered."""

    # Identity and timing
    event_id: str = field(default_factory=_new_voice_event_id)
    timestamp: float = field(default_factory=time.time)

    # What was said, and by whom ("nima" or "user")
    speaker: str = "nima"
    text: str = ""
    audio_duration_s: float = 0.0

    # How it was said
    prosody_snapshot: Dict[str, float] = field(default_factory=dict)
    emotion: str = "neutral"
    valence: float = 0.0
    arousal: float = 0.3
    strain: float = 0.0

    # Conversation bookkeeping at the time of the event
    conversation_phase: str = "nima_speaking"
    interrupt_count: int = 0
    backchannel_count: int = 0
class VoiceEventMemoryBridge:
    """
    Stores every utterance as an episodic voice event with affective tags.

    Events always go into a bounded in-memory buffer (last 500). When a
    palace object is supplied, each event is additionally forwarded to its
    ``store_episode`` method so that prosody and affect are kept alongside
    the text.
    """

    def __init__(self, palace: Any = None):
        """
        Args:
            palace: object exposing ``store_episode(...)`` (a NIMA
                MemoryPalace). If None, events live only in the in-memory
                buffer (no persistence).
        """
        self._palace = palace
        self._local_events: Deque[VoiceEvent] = deque(maxlen=500)
        self._event_count = 0

    def store_voice_event(self, event: VoiceEvent) -> str:
        """Buffer ``event`` locally and forward it to the palace if present.

        A failing palace call is logged as a warning and otherwise ignored,
        so the local buffer is always updated.

        Returns:
            The event's ``event_id``.
        """
        self._local_events.append(event)
        self._event_count += 1

        if self._palace is not None:
            try:
                self._palace.store_episode(
                    processor_name=f"voice_{event.speaker}",
                    sensory_intensity=event.arousal,
                    affective_weight=abs(event.valence) * 0.5 + event.arousal * 0.5,
                    score=event.strain,
                    valence=event.valence,
                    arousal=event.arousal,
                    novelty=0.3,  # could be computed from text novelty
                    input_text=event.text[:500],
                    content={
                        "speaker": event.speaker,
                        "audio_duration_s": event.audio_duration_s,
                        "prosody_snapshot": event.prosody_snapshot,
                        "emotion": event.emotion,
                        "conversation_phase": event.conversation_phase,
                        "interrupt_count": event.interrupt_count,
                        "backchannel_count": event.backchannel_count,
                        "event_type": "voice_event",
                    },
                )
            except Exception as e:
                # Best-effort persistence: never let storage break the voice loop.
                logger.warning("[VoiceEventMemoryBridge] MemPalace store failed: %s", e)

        return event.event_id

    def recall_voice_events(self, speaker: Optional[str] = None,
                            emotion: Optional[str] = None,
                            limit: int = 5) -> List[VoiceEvent]:
        """Return up to ``limit`` most recent buffered events, oldest first.

        Args:
            speaker: keep only events from this speaker (falsy = no filter).
            emotion: keep only events with this emotion (falsy = no filter).
            limit: maximum number of events; zero or negative yields ``[]``.
        """
        # A bare ``[-limit:]`` slice would return everything for limit == 0
        # (because -0 == 0) and a wrong window for negative limits.
        if limit <= 0:
            return []

        matches = [
            e for e in self._local_events
            if (not speaker or e.speaker == speaker)
            and (not emotion or e.emotion == emotion)
        ]
        return matches[-limit:]

    def get_stats(self) -> Dict[str, Any]:
        """Report total stored count, current buffer size and palace presence."""
        return {
            "total_events": self._event_count,
            "buffered_events": len(self._local_events),
            "palace_connected": self._palace is not None,
        }
class NarrativeContinuityEngine:
    """
    References past conversations naturally.

    Queries a VoiceEventMemoryBridge for earlier user utterances and wraps a
    short fragment of one of them in a template such as
    "Earlier you mentioned {topic}. You sounded {emotion} about it."
    """

    # Continuity phrase templates
    CONTINUITY_TEMPLATES: List[str] = [
        "Earlier you mentioned {topic}. You sounded {emotion} about it.",
        "As you said before, {topic}. I remember how {emotion} you were.",
        "Going back to what you said about {topic} — you seemed {emotion}.",
        "I was thinking about what you said earlier, about {topic}.",
        "You mentioned {topic} earlier. That stayed with me.",
    ]

    # Punctuation removed from both ends of an extracted topic so that the
    # templates (which add their own punctuation) do not render "cats..".
    _TOPIC_STRIP_CHARS = ".,!?"

    def __init__(self, memory_bridge: VoiceEventMemoryBridge):
        self._memory = memory_bridge

    def generate_continuity_phrase(self, current_topic: str = "",
                                   current_emotion: str = "neutral") -> Optional[str]:
        """
        Generate a natural continuity phrase referencing a past voice event.

        Looks at the ten most recent user events, skips the latest one and
        picks the newest remaining event whose text is longer than 10
        characters.

        Args:
            current_topic: accepted for API compatibility; not used yet.
            current_emotion: accepted for API compatibility; not used yet.

        Returns:
            The formatted phrase, or None if no suitable past event exists.
        """
        past_events = self._memory.recall_voice_events(speaker="user", limit=10)
        if not past_events:
            return None

        # Skip the most recent event: it is what the user is saying right now.
        candidate = next(
            (ev for ev in reversed(past_events[:-1]) if ev.text and len(ev.text) > 10),
            None,
        )
        if candidate is None:
            return None

        topic = self._extract_topic(candidate.text)
        emotion_word = self._emotion_to_word(candidate.emotion, candidate.valence)
        template = random.choice(self.CONTINUITY_TEMPLATES)
        # Templates without an {emotion} slot simply ignore the extra keyword.
        return template.format(topic=topic, emotion=emotion_word)

    def _extract_topic(self, text: str) -> str:
        """Extract a short topic phrase (at most five words) from past text.

        Texts of five words or fewer are used whole; longer texts contribute
        a five-word window around the middle. Leading/trailing ``.,!?`` is
        stripped in both cases.
        """
        words = text.split()
        if len(words) > 5:
            start = max(0, len(words) // 2 - 2)
            end = min(len(words), start + 5)
            words = words[start:end]
        return " ".join(words).strip(self._TOPIC_STRIP_CHARS)

    def _emotion_to_word(self, emotion: str, valence: float) -> str:
        """Map an emotion label (plus valence) to a descriptive word.

        Unknown labels map to "engaged".
        """
        mapping = {
            "joy": "excited" if valence > 0.5 else "positive",
            "sadness": "down" if valence < -0.3 else "thoughtful",
            "anger": "frustrated",
            "fear": "worried",
            "surprise": "surprised",
            "neutral": "engaged" if valence > 0 else "reflective",
        }
        return mapping.get(emotion, "engaged")
class SingingInterjectionModule:
    """
    Short melodic phrases (humming, tonal affirmations) woven into speech.

    Interjection types:
      - affirmation_hum: a rising "mm-mm" confirming what was said
      - thinking_hum: a contemplative "hmmm" while processing
      - transition_tone: a brief melodic bridge between topics
      - warmth_chord: a soft harmonic when expressing empathy

    All synth methods return mono float32 arrays at ``self.sample_rate``.
    """

    def __init__(self, sample_rate: int = 22050):
        self.sample_rate = sample_rate
        # Only the filter helper (_lowpass) of the non-verbal synth is used here.
        self._nonverbal = ProceduralNonVerbalSynth(sample_rate)

    def _fade_envelope(self, n: int, attack_s: float, release_s: float,
                       release_floor: float, max_fraction: int) -> np.ndarray:
        """Build an amplitude envelope: linear fade-in, fade-out to a floor.

        Each ramp is limited to ``n // max_fraction`` samples so that short
        sounds still get a sustained middle section.
        """
        env = np.ones(n)
        attack = min(int(attack_s * self.sample_rate), n // max_fraction)
        release = min(int(release_s * self.sample_rate), n // max_fraction)
        if attack > 0:
            env[:attack] = np.linspace(0, 1, attack)
        if release > 0:
            env[-release:] = np.linspace(1, release_floor, release)
        return env

    def synth_affirmation_hum(self, duration: float = 0.4) -> np.ndarray:
        """A rising 'mm-mm' that affirms what was said (120 Hz -> 180 Hz glide)."""
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        f0 = 120.0 + 60.0 * (t / duration)
        # Integrate the instantaneous frequency to obtain a continuous phase.
        phase = 2.0 * np.pi * np.cumsum(f0) / self.sample_rate
        source = np.sin(phase) * 0.5
        audio = self._nonverbal._lowpass(source, 1500)  # nasal-ish colouring
        env = self._fade_envelope(n, 0.05, 0.08, 0.3, 4)
        return (audio * env * 0.4).astype(np.float32)

    def synth_thinking_hum(self, duration: float = 0.6) -> np.ndarray:
        """A contemplative 'hmmm': 140 Hz with a slow 3 Hz pitch waver."""
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        f0 = 140.0 + 10.0 * np.sin(2 * np.pi * 3.0 * t)
        phase = 2.0 * np.pi * np.cumsum(f0) / self.sample_rate
        source = np.sin(phase) * 0.4
        audio = self._nonverbal._lowpass(source, 1200)
        env = self._fade_envelope(n, 0.08, 0.12, 0.4, 4)
        return (audio * env * 0.35).astype(np.float32)

    def synth_transition_tone(self, duration: float = 0.5) -> np.ndarray:
        """A brief melodic bridge: four equal-length rising notes.

        Very short durations (notes shorter than three samples) are rendered
        without per-note ramps instead of raising.
        """
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        notes = [220, 261, 293, 329]  # A-C-D-E
        note_duration = duration / len(notes)
        edge = int(0.02 * self.sample_rate)  # nominal 20 ms ramp per note edge

        audio = np.zeros(n)
        for i, freq in enumerate(notes):
            start = int(i * note_duration * self.sample_rate)
            end = min(n, int((i + 1) * note_duration * self.sample_rate))
            note_len = end - start
            if note_len <= 0:
                continue

            # Every note restarts at phase zero.
            note_audio = np.sin(2 * np.pi * freq * t[:note_len]) * 0.3

            note_env = np.ones(note_len)
            ramp = min(edge, note_len // 3)
            # Guard: with ramp == 0, ``note_env[-0:]`` would select the whole
            # note and fail to broadcast against an empty linspace.
            if ramp > 0:
                note_env[:ramp] = np.linspace(0, 1, ramp)
                note_env[-ramp:] *= np.linspace(1, 0.3, ramp)

            audio[start:end] = note_audio * note_env

        return (audio * 0.3).astype(np.float32)

    def synth_warmth_chord(self, duration: float = 0.8) -> np.ndarray:
        """A soft three-tone chord (130 / 165 / 196 Hz) for empathy moments."""
        n = int(self.sample_rate * duration)
        t = np.linspace(0, duration, n, dtype=np.float64)
        # Weighted sum of the three tones, divided by the weight total (2.2).
        chord = (np.sin(2 * np.pi * 130 * t)
                 + 0.7 * np.sin(2 * np.pi * 165 * t)
                 + 0.5 * np.sin(2 * np.pi * 196 * t)) / 2.2
        audio = self._nonverbal._lowpass(chord, 800)
        env = self._fade_envelope(n, 0.15, 0.25, 0.2, 3)
        return (audio * env * 0.25).astype(np.float32)
@dataclass
class MultimodalCue:
    """A non-audio cue paired with a voice event."""
    cue_type: str  # "haptic" | "visual" | "light"
    intensity: float = 0.5
    duration_s: float = 0.3
    pattern: str = "pulse"  # "pulse" | "wave" | "steady"
    timestamp: float = field(default_factory=time.time)


class MultimodalCueEmitter:
    """
    Pairs voice with subtle haptic or visual signals.

    The emitter produces no audio. It builds ``MultimodalCue`` records, keeps
    the last 100 of them, and hands each one to an optional callback so an
    external consumer can act on it.
    """

    def __init__(self):
        self._cue_history: Deque[MultimodalCue] = deque(maxlen=100)
        self._cue_callback: Optional[Callable[[MultimodalCue], None]] = None

    def set_callback(self, callback: Callable[[MultimodalCue], None]):
        """Register the callable that receives every emitted cue."""
        self._cue_callback = callback

    def emit_for_backchannel(self, is_verbal: bool, intensity: float = 0.5):
        """Emit a short haptic pulse for a backchannel.

        Args:
            is_verbal: accepted for API compatibility; currently unused.
            intensity: scaled into a cue intensity of ``0.3 + 0.3 * intensity``.
        """
        self._emit(MultimodalCue(
            cue_type="haptic",
            intensity=0.3 + intensity * 0.3,
            duration_s=0.2,
            pattern="pulse",
        ))

    def emit_for_empathy(self, emotion: str = "neutral"):
        """Emit a light wave for an empathy phrase (stronger for sadness/fear)."""
        intensity = 0.4 if emotion in ("sadness", "fear") else 0.3
        self._emit(MultimodalCue(
            cue_type="light",
            intensity=intensity,
            duration_s=0.5,
            pattern="wave",
        ))

    def emit_for_laughter(self, intensity: float = 0.7):
        """Emit a haptic pulse for laughter, ``0.4 + 0.4 * intensity`` strong."""
        self._emit(MultimodalCue(
            cue_type="haptic",
            intensity=0.4 + intensity * 0.4,
            duration_s=0.3,
            pattern="pulse",
        ))

    def _emit(self, cue: MultimodalCue):
        """Record the cue and notify the callback; callback errors are logged."""
        self._cue_history.append(cue)
        if self._cue_callback:
            try:
                self._cue_callback(cue)
            except Exception as e:
                # A faulty consumer must not interrupt the voice pipeline.
                logger.warning("[MultimodalCueEmitter] callback failed: %s", e)

    def get_recent_cues(self, n: int = 10) -> List[MultimodalCue]:
        """Return the ``n`` most recent cues, oldest first.

        ``n <= 0`` yields an empty list (a plain ``[-n:]`` slice would return
        the full history for ``n == 0``).
        """
        if n <= 0:
            return []
        return list(self._cue_history)[-n:]
class DynamicLaughterSynth:
    """
    Procedural laughter that adapts to intensity.

    Chuckle (low intensity) -> full laugh (high intensity). Instead of fixed
    samples, scales:
      - Number of "ha" bursts
      - Pitch (higher for chuckle, lower for full laugh)
      - Energy
      - A trailing breath for stronger laughs
    """

    def __init__(self, sample_rate: int = 22050):
        self.sample_rate = sample_rate
        self._nonverbal = ProceduralNonVerbalSynth(sample_rate)

    def synth(self, intensity: float = 0.5, duration: Optional[float] = None) -> np.ndarray:
        """
        Synthesize adaptive laughter.

        Args:
            intensity: clamped to [0.1, 1.0]; below 0.3 gives a chuckle,
                below 0.6 a normal laugh, otherwise a full laugh.
            duration: if given and positive, the result is trimmed (with a
                short fade-out) or zero-padded to exactly this many seconds.
                If None (or 0), the length follows from the burst count.

        Returns:
            Laughter audio (float32), peak-normalized to ``0.7 * intensity``.
        """
        intensity = float(max(0.1, min(1.0, intensity)))

        # Scale parameters by intensity
        if intensity < 0.3:
            # Chuckle: 2-3 "ha"s, higher pitch, quiet
            n_has = random.randint(2, 3)
            ha_period = 0.12
            pitch = 240 + random.uniform(-20, 20)
            energy = 0.4
        elif intensity < 0.6:
            # Normal laugh: 4-6 "ha"s
            n_has = random.randint(4, 6)
            ha_period = 0.10
            pitch = 180 + random.uniform(-15, 15)
            energy = 0.6
        else:
            # Full laugh: 6-9 "ha"s, lower pitch, loud
            n_has = random.randint(6, 9)
            ha_period = 0.09
            pitch = 150 + random.uniform(-10, 10)
            energy = 0.8

        # Each period is 70% voiced burst followed by 30% silence.
        gap = np.zeros(int(self.sample_rate * ha_period * 0.3))
        chunks = []
        for i in range(n_has):
            ha = self._gen_ha(ha_period * 0.7, energy, pitch)
            # Decay slightly across the laugh (down to 80% on the last burst)
            decay = 1.0 - 0.2 * (i / max(1, n_has - 1))
            chunks.append(ha * decay)
            chunks.append(gap)

        # Add trailing breath (at most 0.3 s of a soft sigh)
        if intensity > 0.5:
            breath = self._nonverbal.synth(NonVerbalType.SIGH, intensity=0.3)
            chunks.append(breath[:int(self.sample_rate * 0.3)])

        audio = np.concatenate(chunks) if chunks else np.zeros(0)

        # Normalize
        max_val = float(np.max(np.abs(audio))) if len(audio) > 0 else 0.0
        if max_val > 0:
            audio = audio / max_val * 0.7 * intensity

        if duration is not None and duration > 0:
            audio = self._fit_to_duration(audio, duration)

        return audio.astype(np.float32)

    def _fit_to_duration(self, audio: np.ndarray, duration: float) -> np.ndarray:
        """Trim (with a ~10 ms fade-out) or zero-pad audio to ``duration`` s."""
        target = int(self.sample_rate * duration)
        if len(audio) > target:
            audio = audio[:target].copy()
            fade = min(int(0.01 * self.sample_rate), target)
            if fade > 0:
                # Avoid an audible click where the laugh is cut off.
                audio[-fade:] *= np.linspace(1, 0, fade)
        elif len(audio) < target:
            audio = np.concatenate([audio, np.zeros(target - len(audio))])
        return audio

    def _gen_ha(self, duration: float, intensity: float, pitch: float) -> np.ndarray:
        """Generate a single 'ha' burst (harmonic source mixed with noise)."""
        n = int(self.sample_rate * duration)
        if n < 2:
            return np.zeros(max(2, n), dtype=np.float32)
        t = np.linspace(0, duration, n, dtype=np.float64)

        # Glottal source: fundamental plus harmonics 2-4 at decreasing level
        phase = 2.0 * np.pi * pitch * t
        source = np.sin(phase)
        for h in range(2, 5):
            source += (0.4 / h) * np.sin(phase * h)
        source /= 3.0

        # Breathy noise
        noise = np.random.normal(0, 0.3, n)
        mixed = source * 0.6 + noise * 0.4
        mixed = self._nonverbal._bandpass(mixed, 400, 3000)

        # Envelope: fast attack, longer release to silence
        env = np.ones(n)
        attack = min(int(0.01 * self.sample_rate), n // 4)
        release = min(int(0.04 * self.sample_rate), n // 4)
        if attack > 0:
            env[:attack] = np.linspace(0, 1, attack)
        if release > 0:
            env[-release:] = np.linspace(1, 0, release)

        return (mixed * env * intensity).astype(np.float32)
class ContextAwareApologyGenerator:
    """
    Chooses between casual, serious and urgent replies to an interruption.

    Casual:  "Sorry, please go ahead."
    Serious: "I'm sorry, I didn't mean to cut you off. Please continue."

    An interruption counts as serious when it comes very early in the
    utterance, when the user's arousal is high, or when more than two
    interruptions were recorded by this object in the last 60 seconds.
    Very high arousal selects the short "urgent" replies instead.
    """

    CASUAL_RESPONSES: List[str] = [
        "Sorry, please go ahead.",
        "Go right ahead.",
        "After you.",
        "Of course — go on.",
    ]

    SERIOUS_RESPONSES: List[str] = [
        "I'm sorry, I didn't mean to cut you off. Please continue.",
        "My apologies — please, go ahead, I'm listening.",
        "I'm sorry, were you saying something? Please, continue.",
        "Forgive me — I didn't mean to interrupt. What were you saying?",
    ]

    URGENT_RESPONSES: List[str] = [
        "Of course, go ahead.",
        "Please, go on.",
        "I'm listening.",
    ]

    COOLDOWN_S: float = 15.0

    def __init__(self):
        self._last_response_time: float = 0.0
        self._interrupt_history: Deque[float] = deque(maxlen=10)

    def generate(self, nima_text_progress: float,
                 user_arousal: float = 0.3,
                 interrupt_count: int = 0) -> str:
        """Return a randomly chosen reply matching the interruption's weight.

        Also records the interruption time and restarts the cooldown used by
        ``should_respond``.

        Args:
            nima_text_progress: fraction of the utterance already spoken.
            user_arousal: user's arousal level.
            interrupt_count: accepted for API compatibility; the object's own
                timestamp history is used instead.
        """
        timestamp = time.time()
        self._interrupt_history.append(timestamp)

        # Interruptions seen in the last minute, including this one.
        recent_count = len([t for t in self._interrupt_history if timestamp - t < 60.0])

        came_early = nima_text_progress < 0.2
        serious = came_early or user_arousal > 0.7 or recent_count > 2
        urgent = user_arousal > 0.8

        self._last_response_time = timestamp

        if urgent:
            pool = self.URGENT_RESPONSES
        elif serious:
            pool = self.SERIOUS_RESPONSES
        else:
            pool = self.CASUAL_RESPONSES
        return random.choice(pool)

    def should_respond(self, interrupt_type: InterruptType) -> bool:
        """Tell whether a reply is due: real interrupt and cooldown elapsed."""
        if interrupt_type != InterruptType.REAL_INTERRUPT:
            return False
        since_last = time.time() - self._last_response_time
        return not (since_last < self.COOLDOWN_S)
class NonBlockingContinuationManager:
    """
    Keeps the voice stream flowing even after acknowledging an interrupt.

    Instead of stopping entirely, the system:
      1. Pauses briefly (200ms)
      2. Speaks the apology ("Sorry, please go ahead")
      3. Yields the floor but remains ready to resume

    This class only tracks the paused state and the deferred text; pausing
    and speaking are done by the caller.
    """

    PAUSE_BEFORE_APOLOGY_S: float = 0.2
    RESUME_THRESHOLD_S: float = 1.5  # if user doesn't speak for 1.5s, resume

    def __init__(self):
        self._is_paused: bool = False
        self._pause_start: float = 0.0
        # Last moment the user was known to be speaking (or the yield time).
        self._last_user_activity: float = 0.0
        self._deferred_text: str = ""
        self._deferred_position: int = 0  # character position to resume from

    def yield_floor(self, deferred_text: str, position: int):
        """Yield the floor but remember where to resume from.

        Args:
            deferred_text: the full utterance that was interrupted.
            position: character index at which speaking should continue.
        """
        now = time.time()
        self._is_paused = True
        self._pause_start = now
        self._last_user_activity = now
        self._deferred_text = deferred_text
        self._deferred_position = position

    def should_resume(self, user_speaking: bool) -> bool:
        """Check if the system should resume its deferred utterance.

        Intended to be polled. Returns True exactly once, when the floor was
        yielded and the user has been silent for more than
        ``RESUME_THRESHOLD_S`` seconds; the paused flag is cleared then.
        """
        if not self._is_paused:
            return False

        now = time.time()
        if user_speaking:
            # Silence is measured from the end of the user's speech, not from
            # the moment the floor was yielded; otherwise a long user turn
            # would be followed by an immediate resume.
            self._last_user_activity = now
            return False

        if now - self._last_user_activity > self.RESUME_THRESHOLD_S:
            self._is_paused = False
            return True
        return False

    def get_resume_text(self) -> Optional[str]:
        """Get the text to resume with, prefixed by "As I was saying, ".

        Returns None when nothing (or only whitespace) is left to say.
        """
        if not self._deferred_text:
            return None

        # A negative position would silently slice from the end of the text.
        start = max(0, self._deferred_position)
        remaining = self._deferred_text[start:].lstrip()
        if not remaining:
            return None
        return f"As I was saying, {self._decapitalize(remaining)}"

    @staticmethod
    def _decapitalize(text: str) -> str:
        """Lower-case only the leading letter so the text reads mid-sentence.

        The rest of the text keeps its casing. A leading "I" / "I'm" or an
        all-caps word is left alone.
        """
        first_word = text.split(None, 1)[0]
        keep_case = (
            first_word == "I"
            or first_word.startswith("I'")
            or (len(first_word) > 1 and first_word.isupper())
        )
        if keep_case:
            return text
        return text[0].lower() + text[1:]

    @property
    def is_paused(self) -> bool:
        """True while the floor is yielded and no resume has been signalled."""
        return self._is_paused
class OmniVoiceEngine:
    """
    The main OmniVoice engine.

    Orchestrates ASR, TTS, non-verbal synthesis, backchannel emission, and
    interrupt handling into a unified real-time voice conversation system.

    Usage:
        engine = OmniVoiceEngine()
        async for audio_chunk in engine.stream("Hello, how are you?"):
            play(audio_chunk)
    """

    def __init__(self, whisper_model: str = "base",
                 coqui_model: str = "tts_models/multilingual/multi-dataset/xtts_v2",
                 speaker_wav: Optional[str] = None,
                 language: str = "en",
                 sample_rate: int = 22050,
                 palace: Any = None):
        """Build all backends and helper modules.

        Args:
            whisper_model: model name handed to the ASR backend.
            coqui_model: model name handed to the TTS backend.
            speaker_wav: optional reference recording passed to the TTS backend.
            language: language code passed to the TTS backend.
            sample_rate: output sample rate (Hz) used for streaming and synths.
            palace: optional memory store forwarded to VoiceEventMemoryBridge.
        """
        logger.info("[OmniVoice] initializing v%s...", OMNIVOICE_VERSION)
        self.sample_rate = sample_rate

        # Initialize backends
        self.asr = WhisperASR(model_name=whisper_model)
        self.tts = CoquiXTTSBackend(model_name=coqui_model,
                                    speaker_wav=speaker_wav,
                                    language=language)
        self.nonverbal = ProceduralNonVerbalSynth(sample_rate=sample_rate)
        self.backchannel = BackchannelController(self.tts, self.nonverbal, sample_rate)
        self.interrupt_detector = InterruptDetector(self.asr)
        self.interrupt_response = InterruptionResponse()

        # ── v2.0.0 "Mind Through Voice" modules ──
        self.prosody_shaper = AdaptiveProsodyShaper()
        self.micro_intonation = MicroIntonationInjector(sample_rate)
        self.turn_predictor = TurnTakingPredictor()
        self.affective_mirror = AffectiveMirror()
        self.somatic_integrator = SomaticFeedbackIntegrator()
        self.empathy_generator = EmpathyPhraseGenerator()
        self.voice_memory = VoiceEventMemoryBridge(palace=palace)
        self.narrative_engine = NarrativeContinuityEngine(self.voice_memory)
        self.singing = SingingInterjectionModule(sample_rate)
        self.multimodal = MultimodalCueEmitter()
        self.dynamic_laughter = DynamicLaughterSynth(sample_rate)
        self.apology_generator = ContextAwareApologyGenerator()
        self.continuation_manager = NonBlockingContinuationManager()

        # State
        self.state = ConversationState()
        self._nima_audio_queue: Deque[np.ndarray] = deque()
        self._user_audio_buffer: List[np.ndarray] = []
        self._lock = threading.Lock()

        logger.info("[OmniVoice] ready (ASR=%s, TTS=%s)",
                    self.asr.mode.value, self.tts.mode.value)

    def update_prosody_from_nima(self, snapshot: Any) -> ProsodyParams:
        """
        Build prosody parameters from a NIMA ConsciousnessSnapshot.

        Each snapshot attribute is optional; missing or falsy attributes
        leave the corresponding ProsodyParams default untouched. Any error
        while reading the snapshot is logged and the parameters gathered so
        far are returned.

        Args:
            snapshot: object with optional ``phi``, ``rho``, ``emotion`` and
                ``qualia`` attributes, or None.

        Returns:
            A fresh ProsodyParams instance.
        """
        prosody = ProsodyParams()
        if snapshot is None:
            return prosody

        try:
            # Map NIMA phi → energy
            if hasattr(snapshot, "phi") and snapshot.phi:
                prosody.energy = float(max(0.3, min(1.0, 0.5 + snapshot.phi.phi_composite * 0.5)))
            # Map NIMA rho → warmth
            if hasattr(snapshot, "rho") and snapshot.rho:
                prosody.warmth = float(max(0.2, min(1.0, snapshot.rho.integrity)))
            # Map NIMA emotion → pitch + tone
            if hasattr(snapshot, "emotion") and snapshot.emotion:
                prosody.base_pitch_hz = 180.0 + (snapshot.emotion.arousal - 0.3) * 60.0
                prosody.emotional_tone = getattr(snapshot.emotion, "label", "neutral")
                if snapshot.emotion.valence < -0.3:
                    prosody.pitch_variance = 0.08  # flat for sad
                elif snapshot.emotion.valence > 0.3:
                    prosody.pitch_variance = 0.25  # expressive for happy
            # Map qualia authenticity → breathiness
            if hasattr(snapshot, "qualia") and snapshot.qualia:
                prosody.breathiness = float(max(0.05, 0.3 - snapshot.qualia.authenticity_index * 0.25))
        except Exception as e:
            logger.warning("[OmniVoice] NIMA snapshot mapping failed: %s", e)

        return prosody

    async def stream(self, text: str,
                     prosody: Optional[ProsodyParams] = None,
                     user_audio_stream: Optional[AsyncGenerator[np.ndarray, None]] = None,
                     ) -> AsyncGenerator[np.ndarray, None]:
        """
        Stream synthesized speech for `text`, yielding audio chunks.

        The whole utterance is synthesized first and then yielded in 50 ms
        chunks. If `user_audio_stream` is provided, user audio is collected
        in a background task and checked for interrupts before every chunk;
        on a real interrupt the current chunk and a spoken interruption
        response are yielded and streaming stops.

        Args:
            text: text to synthesize
            prosody: prosody parameters (if None, uses defaults)
            user_audio_stream: async generator of user audio frames
                (for real-time interrupt detection)

        Yields:
            Audio chunks (slices of the TTS output).
        """
        prosody = prosody or ProsodyParams()
        self.state.phase = ConversationPhase.NIMA_SPEAKING
        self.state.nima_speech_start = time.time()
        self.state.current_text = text
        self.state.current_text_position = 0.0

        # Synthesize the full utterance
        full_audio = self.tts.synthesize(text, prosody)
        if len(full_audio) == 0:
            self.state.phase = ConversationPhase.IDLE
            return

        # 50ms chunks; never 0, which would make range() raise.
        chunk_size = max(1, int(self.sample_rate * 0.05))
        chunks_yielded = 0
        total_chunks = max(1, len(full_audio) // chunk_size)

        # If no user audio stream, just stream the audio
        if user_audio_stream is None:
            for i in range(0, len(full_audio), chunk_size):
                chunk = full_audio[i:i + chunk_size]
                self.state.current_text_position = min(1.0, (i + chunk_size) / len(full_audio))
                yield chunk
            self.state.phase = ConversationPhase.IDLE
            return

        # ── Real-time mode: stream audio + monitor user ──
        with self._lock:
            # Drop leftovers from a previous utterance.
            self._user_audio_buffer.clear()
        user_audio_task = asyncio.create_task(self._collect_user_audio(user_audio_stream))

        try:
            for i in range(0, len(full_audio), chunk_size):
                chunk = full_audio[i:i + chunk_size]
                chunks_yielded += 1
                self.state.current_text_position = min(1.0, chunks_yielded / total_chunks)
                self.state.nima_speech_duration = time.time() - self.state.nima_speech_start

                # Give the collector task a turn on the event loop; without an
                # await here it only runs if the consumer happens to await.
                await asyncio.sleep(0)

                # Check for interrupts
                remaining_s = (len(full_audio) - i) / self.sample_rate
                interrupt = self._check_for_interrupt(remaining_s)

                if interrupt and self.interrupt_response.should_respond(interrupt):
                    # Yield remaining chunk + interruption response
                    response_text = self.interrupt_response.generate_response(
                        interrupt,
                        self.state.current_text_position,
                        self.state.user_emotion_arousal,
                    )
                    response_audio = self.tts.synthesize(response_text, ProsodyParams(
                        base_pitch_hz=200, energy=0.6, warmth=0.8,
                    ))
                    yield chunk  # yield current chunk
                    # Yield response in smaller chunks
                    for j in range(0, len(response_audio), chunk_size):
                        yield response_audio[j:j + chunk_size]

                    self.state.phase = ConversationPhase.YIELDING
                    self.state.interrupt_count += 1
                    logger.info("[OmniVoice] interrupted at %.0f%%: '%s'",
                                self.state.current_text_position * 100, response_text)
                    return  # Stop streaming Nima's audio

                yield chunk

            # Finished speaking without interruption
            self.state.phase = ConversationPhase.IDLE

        finally:
            # Always stop the collector, also when the consumer closes us early.
            user_audio_task.cancel()
            try:
                await user_audio_task
            except asyncio.CancelledError:
                pass

    async def _collect_user_audio(self, stream: AsyncGenerator[np.ndarray, None]):
        """Background task: collect user audio for interrupt detection.

        Keeps at most 32000 samples buffered (2 s, assuming 16 kHz frames).
        """
        try:
            async for frame in stream:
                with self._lock:
                    self._user_audio_buffer.append(frame)
                    # Keep only last 2 seconds
                    max_samples = 16000 * 2  # 2s at 16kHz
                    total = sum(len(f) for f in self._user_audio_buffer)
                    while total > max_samples and self._user_audio_buffer:
                        removed = self._user_audio_buffer.pop(0)
                        total -= len(removed)
        except asyncio.CancelledError:
            pass

    def _check_for_interrupt(self, remaining_s: float) -> Optional[InterruptClassification]:
        """Check if there's an interrupt in the buffered user audio.

        Audio is classified (and the buffer consumed) only once at least
        1600 samples are available; shorter audio stays buffered so that
        small frames can accumulate across calls.

        Args:
            remaining_s: seconds of the current utterance not yet streamed.

        Returns:
            The classification for a real interrupt, otherwise None.
        """
        with self._lock:
            if not self._user_audio_buffer:
                return None
            audio = np.concatenate(self._user_audio_buffer[-5:])  # last few frames
            if len(audio) < 1600 and len(self._user_audio_buffer) > 5:
                # Frames are tiny: five of them are not enough, use them all.
                audio = np.concatenate(self._user_audio_buffer)
            if len(audio) < 1600:  # <100ms at 16 kHz: keep buffering
                return None
            self._user_audio_buffer.clear()

        classification = self.interrupt_detector.classify(
            audio,
            sample_rate=16000,
            nima_text_progress=self.state.current_text_position,
            nima_speech_remaining_s=remaining_s,
        )

        if classification.interrupt_type == InterruptType.REAL_INTERRUPT:
            return classification

        # Log ignored interrupts (backchannels, non-verbals)
        if classification.interrupt_type != InterruptType.SILENCE:
            logger.debug("[OmniVoice] ignored %s: %s",
                         classification.interrupt_type.value, classification.reason)
        return None

    def emit_backchannel(self, user_audio: np.ndarray) -> Optional[BackchannelEvent]:
        """
        Check if a backchannel should be emitted while the user is speaking.

        Delegates to the backchannel controller with the current
        conversation state. Returns a BackchannelEvent if one should fire,
        else None.
        """
        return self.backchannel.should_backchannel(self.state, user_audio)

    def synth_non_verbal(self, expr_type: NonVerbalType,
                         intensity: float = 0.7) -> np.ndarray:
        """Synthesize a non-verbal expression directly."""
        return self.nonverbal.synth(expr_type, intensity)

    def get_stats(self) -> Dict[str, Any]:
        """Return a nested dict describing engine configuration and state."""
        return {
            "version": OMNIVOICE_VERSION,
            "asr_mode": self.asr.mode.value,
            "tts_mode": self.tts.mode.value,
            "sample_rate": self.sample_rate,
            "conversation_state": {
                "phase": self.state.phase.value,
                "interrupt_count": self.state.interrupt_count,
                "backchannel_count": self.state.backchannel_count,
            },
            # v2.0.0 module stats
            "v2_modules": {
                "prosody_shaper": "active",
                "micro_intonation": "active",
                "turn_predictor": "active",
                "affective_mirror": "active",
                "somatic_integrator": {
                    "strain": self.somatic_integrator.strain,
                    "energy": self.somatic_integrator.energy,
                },
                "empathy_generator": "active",
                "voice_memory": self.voice_memory.get_stats(),
                "narrative_engine": "active",
                "singing_interjections": "active",
                "multimodal_cues": len(self.multimodal.get_recent_cues(1000)),
                "dynamic_laughter": "active",
                "apology_generator": "active",
                "continuation_manager": {
                    "is_paused": self.continuation_manager.is_paused,
                },
            },
        }
class NimaVoiceAdapter:
    """
    Bridges NIMA's ConsciousnessSnapshot → OmniVoice prosody params.
    Also bridges NIMA's CTM tournament + MemoryPalace episodes → voice context.

    The adapter caches the latest snapshot, CTM winner, episode and emotion
    readings, and combines them on demand through the engine's modules:
      - AffectiveMirror (mirrors the cached valence/arousal)
      - AdaptiveProsodyShaper (emotion → prosody dynamics)
      - SomaticFeedbackIntegrator (strain → voice fatigue)
      - VoiceEventMemoryBridge (stores voice events)
      - NarrativeContinuityEngine (references past conversations)

    Usage:
        adapter = NimaVoiceAdapter(engine)
        prosody = adapter.update_from_snapshot(nima_snapshot)
        async for chunk in engine.stream(text, prosody=prosody):
            ...

    Full NIMA + CTM + MemPalace integration:
        # After NIMA's process_stimulus():
        adapter.update_from_snapshot(snapshot)
        adapter.update_from_ctm_winner(ctm_winner)
        adapter.update_somatic_from_nima(snapshot.phi, snapshot.rho)
        prosody = adapter.get_contextual_prosody()
        # After speaking:
        adapter.store_voice_event(text, prosody, duration_s)
    """

    def __init__(self, engine: OmniVoiceEngine):
        self._engine = engine
        # Most recent inputs; None until the matching update_* call is made.
        self._last_snapshot: Any = None
        self._last_ctm_winner: Optional[Dict[str, Any]] = None
        self._last_episode_context: Optional[Dict[str, Any]] = None
        # Emotion readings copied from snapshot.emotion (defaults until then).
        self._user_emotion: str = "neutral"
        self._user_valence: float = 0.0
        self._user_arousal: float = 0.3

    def update_from_snapshot(self, snapshot: Any) -> ProsodyParams:
        """Cache a NIMA ConsciousnessSnapshot and map it to prosody.

        Returns the ProsodyParams produced by the engine's
        ``update_prosody_from_nima`` for this snapshot.
        """
        self._last_snapshot = snapshot

        # Copy valence / arousal / label from snapshot.emotion (if available).
        # NOTE(review): these are stored as the *user's* emotion — confirm that
        # snapshot.emotion describes the user rather than the system.
        if snapshot and hasattr(snapshot, "emotion") and snapshot.emotion:
            self._user_valence = float(getattr(snapshot.emotion, "valence", 0.0))
            self._user_arousal = float(getattr(snapshot.emotion, "arousal", 0.3))
            self._user_emotion = getattr(snapshot.emotion, "label", "neutral")

        return self._engine.update_prosody_from_nima(snapshot)

    def update_from_ctm_winner(self, ctm_winner: Optional[Dict[str, Any]]) -> None:
        """
        Cache the CTM tournament winner (or clear it when None).

        The winner's "processor_name" is applied later, in
        ``get_contextual_prosody``:
          - memory_palace → warmer, slightly slower
          - somatic_registry → breathier, more pitch variance
          - wernicke → slightly faster
          - broca → faster
        """
        if ctm_winner is None:
            self._last_ctm_winner = None
            return

        self._last_ctm_winner = ctm_winner
        logger.debug("[NimaVoiceAdapter] CTM winner: %s (score=%.3f)",
                     ctm_winner.get("processor_name", "?"),
                     ctm_winner.get("score", 0.0))

    def update_somatic_from_nima(self, phi: Any, rho: Any) -> None:
        """
        Update the somatic feedback integrator from NIMA's phi + rho.

        Strain is read from ``phi.phenomenological_strain`` and the allostatic
        load from ``rho.dissonance``; either defaults to 0.0 when the object
        is falsy or lacks the attribute.
        """
        strain = 0.0
        allostatic = 0.0

        if phi and hasattr(phi, "phenomenological_strain"):
            strain = float(phi.phenomenological_strain)

        # Allostatic load approximation from rho dissonance
        if rho and hasattr(rho, "dissonance"):
            allostatic = float(rho.dissonance)

        self._engine.somatic_integrator.update_from_nima(strain, allostatic)

    def update_from_episode(self, episode: Optional[Dict[str, Any]]) -> None:
        """
        Cache a MemoryPalace episode (or clear it when None).

        The episode's "score" and "valence" entries are applied later, in
        ``get_contextual_prosody`` (lower pitch and more breathiness for a
        high score, more warmth and a slower rate for negative valence).
        """
        if episode is None:
            self._last_episode_context = None
            return

        self._last_episode_context = episode
        logger.debug("[NimaVoiceAdapter] episode context updated: valence=%.2f",
                     episode.get("valence", 0.0))

    def get_contextual_prosody(self) -> ProsodyParams:
        """
        Get prosody params that reflect NIMA state + CTM winner + episode
        context + somatic feedback + affective mirroring + adaptive shaping.

        The six steps below run in a fixed order; each one receives the
        output of the previous step.
        """
        # 1. Start with NIMA snapshot → base prosody
        prosody = self._engine.update_prosody_from_nima(self._last_snapshot)

        # 2. Apply affective mirroring (match user's emotional tone)
        prosody, mirror_emotion = self._engine.affective_mirror.mirror(
            self._user_valence, self._user_arousal, prosody
        )

        # 3. Apply adaptive prosody shaping (emotion → pitch/rhythm/timbre)
        empathy_level = 0.5
        if self._user_valence < -0.3:
            empathy_level = 0.8  # more empathetic when user is negative
        # Prefer the explicit label; fall back to the mirror's inferred emotion.
        emotion_for_shaping = self._user_emotion if self._user_emotion != "neutral" else mirror_emotion
        prosody = self._engine.prosody_shaper.shape(
            prosody,
            emotion=emotion_for_shaping,
            valence=self._user_valence,
            arousal=self._user_arousal,
            empathy_level=empathy_level,
        )

        # 4. Apply somatic feedback (strain → voice fatigue)
        prosody = self._engine.somatic_integrator.apply_somatic_modulation(prosody)

        # 5. Apply CTM winner influence on voice character
        if self._last_ctm_winner:
            processor = self._last_ctm_winner.get("processor_name", "")
            if processor == "memory_palace":
                prosody.warmth = float(min(1.0, prosody.warmth + 0.10))
                prosody.speech_rate_wpm *= 0.95  # more measured, nostalgic
            elif processor == "somatic_registry":
                prosody.breathiness = float(min(0.3, prosody.breathiness + 0.05))
                prosody.pitch_variance = float(min(0.35, prosody.pitch_variance + 0.05))
            elif processor == "wernicke":
                prosody.speech_rate_wpm *= 1.05  # clearer, more articulate
            elif processor == "broca":
                prosody.speech_rate_wpm *= 1.08  # faster, more fluent

        # 6. Apply episode context modifications
        if self._last_episode_context:
            ep = self._last_episode_context
            # The episode's "score" entry is treated as its strain level.
            strain = ep.get("score", 0.0)
            if strain > 0.5:
                prosody.base_pitch_hz -= 10.0
                prosody.breathiness = float(min(0.4, prosody.breathiness + 0.05))
            if ep.get("valence", 0.0) < -0.3:
                prosody.warmth = float(min(1.0, prosody.warmth + 0.1))
                prosody.speech_rate_wpm -= 10.0

        return prosody

    def store_voice_event(self, text: str, prosody: ProsodyParams,
                          duration_s: float, speaker: str = "nima") -> str:
        """
        Store a voice event with affective tags via the engine's voice memory.

        The event combines the given text/prosody/duration with the cached
        valence and arousal, the integrator's current strain and the engine's
        conversation counters.

        Returns:
            The id returned by ``voice_memory.store_voice_event``.
        """
        event = VoiceEvent(
            speaker=speaker,
            text=text,
            audio_duration_s=duration_s,
            prosody_snapshot={
                "pitch_hz": prosody.base_pitch_hz,
                "rate_wpm": prosody.speech_rate_wpm,
                "energy": prosody.energy,
                "warmth": prosody.warmth,
                "breathiness": prosody.breathiness,
            },
            emotion=prosody.emotional_tone,
            valence=self._user_valence,
            arousal=self._user_arousal,
            strain=self._engine.somatic_integrator.strain,
            conversation_phase=self._engine.state.phase.value,
            interrupt_count=self._engine.state.interrupt_count,
            backchannel_count=self._engine.state.backchannel_count,
        )
        return self._engine.voice_memory.store_voice_event(event)

    def get_narrative_continuity(self, current_topic: str = "") -> Optional[str]:
        """
        Generate a narrative continuity phrase referencing a past voice event.

        Delegates to the engine's narrative engine. Returns None if no
        suitable past event exists.
        """
        return self._engine.narrative_engine.generate_continuity_phrase(current_topic)

    def get_empathy_phrase(self) -> str:
        """Generate an empathy phrase from the cached emotion, valence and arousal."""
        return self._engine.empathy_generator.generate(
            self._user_emotion, self._user_valence, self._user_arousal
        )
""" event = VoiceEvent( speaker=speaker, text=text, audio_duration_s=duration_s, prosody_snapshot={ "pitch_hz": prosody.base_pitch_hz, "rate_wpm": prosody.speech_rate_wpm, "energy": prosody.energy, "warmth": prosody.warmth, "breathiness": prosody.breathiness, }, emotion=prosody.emotional_tone, valence=self._user_valence, arousal=self._user_arousal, strain=self._engine.somatic_integrator.strain, conversation_phase=self._engine.state.phase.value, interrupt_count=self._engine.state.interrupt_count, backchannel_count=self._engine.state.backchannel_count, ) return self._engine.voice_memory.store_voice_event(event) def get_narrative_continuity(self, current_topic: str = "") -> Optional[str]: """ Generate a narrative continuity phrase referencing a past voice event. Returns None if no suitable past event exists. """ return self._engine.narrative_engine.generate_continuity_phrase(current_topic) def get_empathy_phrase(self) -> str: """Generate a contextual empathy phrase based on current user state.""" return self._engine.empathy_generator.generate( self._user_emotion, self._user_valence, self._user_arousal ) # ═══════════════════════════════════════════════════════════════════════════ # SECTION 10 — Utility functions # ═══════════════════════════════════════════════════════════════════════════ def save_wav(audio: np.ndarray, path: str, sample_rate: int = 22050) -> str: """Save audio array to a WAV file.""" audio_int16 = np.clip(audio * 32767, -32768, 32767).astype(np.int16) with wave.open(path, "wb") as wf: wf.setnchannels(1) wf.setsampwidth(2) wf.setframerate(sample_rate) wf.writeframes(audio_int16.tobytes()) return path def load_wav(path: str) -> Tuple[np.ndarray, int]: """Load a WAV file into a float32 numpy array.""" with wave.open(path, "rb") as wf: n_channels = wf.getnchannels() sampwidth = wf.getsampwidth() sample_rate = wf.getframerate() frames = wf.readframes(wf.getnframes()) if sampwidth == 2: audio = np.frombuffer(frames, dtype=np.int16).astype(np.float32) / 
32768.0 elif sampwidth == 1: audio = (np.frombuffer(frames, dtype=np.uint8).astype(np.float32) - 128) / 128.0 else: raise ValueError(f"Unsupported sample width: {sampwidth}") if n_channels > 1: audio = audio[::n_channels] # mono downmix (take first channel) return audio, sample_rate async def demo(): """OmniVoice Engine demo.""" print("\n" + "=" * 70) print(f" OmniVoice Engine v{OMNIVOICE_VERSION} — Demo") print("=" * 70 + "\n") engine = OmniVoiceEngine() print(f"ASR mode: {engine.asr.mode.value}") print(f"TTS mode: {engine.tts.mode.value}") print() # Test 1: Basic TTS synthesis print("[Test 1] Basic speech synthesis...") prosody = ProsodyParams(base_pitch_hz=180, energy=0.8, warmth=0.7) audio = engine.tts.synthesize("Hello, I am OmniVoice. Nice to meet you.", prosody) print(f" Audio: {len(audio)} samples, {len(audio)/engine.sample_rate:.2f}s") save_wav(audio, "/home/z/my-project/download/omnivoice_test1_speech.wav", engine.sample_rate) print(f" Saved: omnivoice_test1_speech.wav") print() # Test 2: Non-verbal expressions print("[Test 2] Non-verbal expressions...") for expr in [NonVerbalType.LAUGHTER, NonVerbalType.SIGH, NonVerbalType.GASp, NonVerbalType.GROAN, NonVerbalType.AWW, NonVerbalType.MM]: audio = engine.synth_non_verbal(expr, intensity=0.7) print(f" {expr.value:12s}: {len(audio)} samples, {len(audio)/engine.sample_rate:.2f}s") # Save laughter for verification laugh = engine.synth_non_verbal(NonVerbalType.LAUGHTER) save_wav(laugh, "/home/z/my-project/download/omnivoice_test2_laughter.wav", engine.sample_rate) print(f" Saved: omnivoice_test2_laughter.wav") print() # Test 3: Streaming print("[Test 3] Streaming speech...") chunks = [] async for chunk in engine.stream("This is a streaming test of the OmniVoice engine.", prosody=prosody): chunks.append(chunk) full = np.concatenate(chunks) print(f" Streamed {len(chunks)} chunks, total {len(full)} samples, {len(full)/engine.sample_rate:.2f}s") save_wav(full, 
"/home/z/my-project/download/omnivoice_test3_stream.wav", engine.sample_rate) print(f" Saved: omnivoice_test3_stream.wav") print() # Test 4: Interrupt classification print("[Test 4] Interrupt classification...") # Simulate different types of user speech test_cases = [ ("Backchannel 'yeah'", engine.synth_non_verbal(NonVerbalType.MM, 0.3)[:int(16000*0.4)]), ("Laughter", engine.synth_non_verbal(NonVerbalType.LAUGHTER, 0.7)[:int(16000*0.8)]), ("Sigh", engine.synth_non_verbal(NonVerbalType.SIGH, 0.6)[:int(16000*0.5)]), ] for name, audio in test_cases: cls = engine.interrupt_detector.classify(audio, sample_rate=16000) print(f" {name:25s} → {cls.interrupt_type.value} (conf={cls.confidence:.2f}, reason='{cls.reason}')") print() # Test 5: Interruption response print("[Test 5] Interruption responses...") for progress in [0.1, 0.5, 0.9]: fake_interrupt = InterruptClassification( interrupt_type=InterruptType.REAL_INTERRUPT, confidence=0.8, duration_s=1.5, ) response = engine.interrupt_response.generate_response( fake_interrupt, nima_text_progress=progress, user_arousal=0.4, ) print(f" Progress {progress:.0%}: '{response}'") print() # Test 6: NIMA adapter print("[Test 6] NIMA voice adapter...") adapter = NimaVoiceAdapter(engine) prosody = adapter.get_contextual_prosody() print(f" Default prosody: pitch={prosody.base_pitch_hz:.0f}Hz, energy={prosody.energy:.2f}, warmth={prosody.warmth:.2f}") # Simulate episode context adapter.update_from_episode({"valence": -0.5, "score": 0.7, "processor_name": "somatic_registry"}) prosody2 = adapter.get_contextual_prosody() print(f" With episode (val=-0.5, strain=0.7): pitch={prosody2.base_pitch_hz:.0f}Hz, " f"energy={prosody2.energy:.2f}, warmth={prosody2.warmth:.2f}, breath={prosody2.breathiness:.2f}") print() print("=" * 70) print(f" OmniVoice v{OMNIVOICE_VERSION} Demo Complete") print("=" * 70) if __name__ == "__main__": asyncio.run(demo())