#!/usr/bin/env python3 """ NumberBlocks One Voice Cloner - V7 Architecture Fix CRITICAL FIX: Model classes now match the actual checkpoint architecture. - n_mels=128 (was 80), hidden=256 (was 192), enc_out=512, z_channels=192 - Encoder: 5 Conv+BN+LayerNorm (not PosteriorEncoder) - Flow: single AffineCouplingFlow (not ResidualCouplingBlock) - Decoder: 5 Conv+BN (not generic Decoder) """ import os import random import tempfile try: import numpy as np except ImportError: # Fallback: use torch operations instead np = None print("[WARN] numpy not available, using torch fallback") import soundfile as sf import torch import torch.nn as nn import torch.nn.functional as F import torchaudio from pathlib import Path from huggingface_hub import hf_hub_download, HfApi import gradio as gr print("=== NumberBlocks One Voice Cloner V7 (Architecture Fix) ===") # ============================================================ # CORRECT Model Architecture # ============================================================ class Encoder(nn.Module): def __init__(self, in_channels=128, hidden=256, out_channels=512): super().__init__() self.conv1 = nn.Conv1d(in_channels, hidden, 5, padding=2) self.bn1 = nn.BatchNorm1d(hidden) self.conv2 = nn.Conv1d(hidden, hidden, 5, padding=2) self.bn2 = nn.BatchNorm1d(hidden) self.conv3 = nn.Conv1d(hidden, hidden, 5, padding=2) self.bn3 = nn.BatchNorm1d(hidden) self.conv4 = nn.Conv1d(hidden, out_channels, 5, padding=2) self.bn4 = nn.BatchNorm1d(out_channels) self.conv5 = nn.Conv1d(out_channels, out_channels, 3, padding=1) self.bn5 = nn.BatchNorm1d(out_channels) self.ln = nn.LayerNorm(out_channels) def forward(self, x): x = F.relu(self.bn1(self.conv1(x))) x = F.relu(self.bn2(self.conv2(x))) x = F.relu(self.bn3(self.conv3(x))) x = F.relu(self.bn4(self.conv4(x))) x = F.relu(self.bn5(self.conv5(x))) x = x.permute(0, 2, 1) x = self.ln(x) x = x.permute(0, 2, 1) return x class Posterior(nn.Module): def __init__(self, in_channels=512, z_channels=192): super().__init__() self.conv = nn.Conv1d(in_channels, z_channels * 2, 1) def forward(self, x): h = self.conv(x) mu, logvar = h.chunk(2, dim=1) return mu, logvar class AffineCouplingFlow(nn.Module): def __init__(self, z_channels=192, hidden=256): super().__init__() self.net = nn.Sequential( nn.Conv1d(z_channels // 2, hidden, 1), nn.ReLU(), nn.Conv1d(hidden, hidden, 1), nn.ReLU(), nn.Conv1d(hidden, z_channels, 1), ) def forward(self, z, reverse=False): z1, z2 = z.chunk(2, dim=1) sb = self.net(z1) s, b = sb.chunk(2, dim=1) s = torch.clamp(s, -5.0, 5.0) if not reverse: z2_new = z2 * torch.exp(s) + b z_out = torch.cat([z1, z2_new], dim=1) logdet = torch.sum(s) return z_out, logdet else: z2_new = (z2 - b) * torch.exp(-s) z_out = torch.cat([z1, z2_new], dim=1) return z_out class Decoder(nn.Module): def __init__(self, in_channels=192, out_channels=128): super().__init__() self.conv1 = nn.Conv1d(in_channels, 512, 5, padding=2) self.bn1 = nn.BatchNorm1d(512) self.conv2 = nn.Conv1d(512, 512, 5, padding=2) self.bn2 = nn.BatchNorm1d(512) self.conv3 = nn.Conv1d(512, 256, 5, padding=2) self.bn3 = nn.BatchNorm1d(256) self.conv4 = nn.Conv1d(256, 256, 3, padding=1) self.bn4 = nn.BatchNorm1d(256) self.conv5 = nn.Conv1d(256, out_channels, 1) def forward(self, x): x = F.relu(self.bn1(self.conv1(x))) x = F.relu(self.bn2(self.conv2(x))) x = F.relu(self.bn3(self.conv3(x))) x = F.relu(self.bn4(self.conv4(x))) x = self.conv5(x) return x class RVCModel(nn.Module): def __init__(self, n_mels=128, hidden=256, enc_out=512, z_channels=192): super().__init__() self.n_mels = n_mels self.encoder = Encoder(n_mels, hidden, enc_out) self.posterior = Posterior(enc_out, z_channels) self.flow = AffineCouplingFlow(z_channels, hidden) self.decoder = Decoder(z_channels, n_mels) def forward(self, mel): h = self.encoder(mel) mu, logvar = self.posterior(h) z = mu + torch.randn_like(logvar) * torch.exp(logvar) * 0.0 z_p, _ = self.flow(z) z_back = self.flow(z_p, reverse=True) mel_out = self.decoder(z_back) return mel_out def infer(self, mel, noise_scale=0.0): h = self.encoder(mel) mu, logvar = self.posterior(h) z = mu + torch.randn_like(logvar) * torch.exp(logvar) * noise_scale z_p, _ = self.flow(z) z_back = self.flow(z_p, reverse=True) mel_out = self.decoder(z_back) return mel_out # ============================================================ # HiFi-GAN Vocoder # ============================================================ class ResBlock1(nn.Module): def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5)): super().__init__() self.convs = nn.ModuleList() for d in dilation: self.convs.append(nn.Sequential( nn.LeakyReLU(0.1), nn.Conv1d(channels, channels, kernel_size, dilation=d, padding=(kernel_size - 1) * d // 2), nn.LeakyReLU(0.1), nn.Conv1d(channels, channels, kernel_size, dilation=1, padding=(kernel_size - 1) // 2), )) def forward(self, x): for conv in self.convs: x = x + conv(x) return x class HiFiGANGenerator(nn.Module): def __init__(self, in_channels=80, upsample_rates=(8, 8, 2, 2), upsample_kernel_sizes=(16, 16, 4, 4), upsample_initial_channel=512, resblock_kernel_sizes=(3, 7, 11), resblock_dilation_sizes=((1, 3, 5), (1, 3, 5), (1, 3, 5))): super().__init__() self.conv_pre = nn.Conv1d(in_channels, upsample_initial_channel, 7, padding=3) self.num_upsamples = len(upsample_rates) self.num_kernels = len(resblock_kernel_sizes) self.ups = nn.ModuleList() self.resblocks = nn.ModuleList() ch = upsample_initial_channel for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)): ch_new = ch // 2 self.ups.append(nn.ConvTranspose1d(ch, ch_new, k, u, padding=(k - u) // 2)) for _, (rk, rd) in enumerate(zip(resblock_kernel_sizes, resblock_dilation_sizes)): self.resblocks.append(ResBlock1(ch_new, rk, rd)) ch = ch_new self.conv_post = nn.Sequential( nn.LeakyReLU(0.1), nn.Conv1d(ch, 1, 7, padding=3), nn.Tanh(), ) def forward(self, x): x = self.conv_pre(x) for i in range(self.num_upsamples): x = F.leaky_relu(x, 0.1) x = self.ups[i](x) xs = 0 for j in range(self.num_kernels): xs += self.resblocks[i * self.num_kernels + j](x) x = xs / self.num_kernels x = self.conv_post(x) return x # ============================================================ # Mel utilities # ============================================================ SAMPLE_RATE = 40000 N_MELS = 128 # MATCHES MODEL def compute_mel(y, sr=SAMPLE_RATE): mel_transform = torchaudio.transforms.MelSpectrogram( sample_rate=sr, n_fft=1024, hop_length=256, n_mels=N_MELS, f_min=0.0, f_max=float(sr // 2), power=2.0, norm=None, mel_scale="htk", ) mel = mel_transform(y) mel = torch.log(torch.clamp(mel, min=1e-5)) return mel def _get_mel_fb_pinv(sr=SAMPLE_RATE, n_mels=N_MELS): """Compute pseudo-inverse of mel filterbank (cached).""" fb = torchaudio.functional.melscale_filterbanks( n_freqs=513, f_min=0, f_max=float(sr // 2), n_mels=n_mels, sample_rate=sr, norm=None, mel_scale="htk", ) return torch.linalg.pinv(fb) # (513, n_mels) _FB_PINV_CACHE = {} def mel_to_audio_griffinlim(mel, sr=SAMPLE_RATE, n_iter=60): key = (sr, mel.shape[0]) if key not in _FB_PINV_CACHE: _FB_PINV_CACHE[key] = _get_mel_fb_pinv(sr=sr, n_mels=mel.shape[0]) fb_pinv = _FB_PINV_CACHE[key] mel_power = torch.exp(mel) spec = fb_pinv @ mel_power spec = torch.clamp(spec, min=0) gl = torchaudio.transforms.GriffinLim(n_fft=1024, hop_length=256, n_iter=n_iter) audio = gl(spec) return audio.detach().cpu().numpy() if np is not None else audio.detach().cpu().tolist() # ============================================================ # Inference Engine # ============================================================ class VoiceCloner: def __init__(self): self.device = torch.device("cpu") self.rvc_model = None self.hifigan = None self._hifigan_loaded = False self.model_loaded = False self.samples = None self.dataset_id = "ayf3/numberblocks-one-voice-dataset" self._load_rvc() def _load_rvc(self): print("[STARTUP] Loading RVC model (V7 correct architecture)...") try: model_path = hf_hub_download( repo_id=self.dataset_id, filename="models/one_voice_rvc_v2.pth", repo_type="dataset" ) ckpt = torch.load(model_path, map_location="cpu", weights_only=False) sd = ckpt["model_state_dict"] model = RVCModel(n_mels=128, hidden=256, enc_out=512, z_channels=192) result = model.load_state_dict(sd, strict=True) print(f"[STARTUP] strict=True: missing={result.missing_keys}, unexpected={result.unexpected_keys}") model.eval() self.rvc_model = model self.model_loaded = True print(f"[STARTUP] RVC model loaded OK (5,296,064 params, strict=True)") except Exception as e: print(f"[STARTUP] RVC model load FAILED: {e}") import traceback traceback.print_exc() def _ensure_hifigan(self): if self._hifigan_loaded: return self._hifigan_loaded = True print("[LAZY] Loading HiFi-GAN vocoder...") try: hifigan_path = hf_hub_download( repo_id="csdc-atl/hifigan-universal_v1", filename="g_02500000" ) ckpt = torch.load(hifigan_path, map_location="cpu", weights_only=False) state_dict = ckpt.get("generator", ckpt.get("state_dict", ckpt)) if any(k.startswith("generator.") for k in state_dict): state_dict = {k.replace("generator.", ""): v for k, v in state_dict.items() if k.startswith("generator.")} self.hifigan = HiFiGANGenerator() self.hifigan.load_state_dict(state_dict, strict=False) self.hifigan.eval() print("[LAZY] HiFi-GAN loaded OK (Griffin-Lim fallback for mel conversion)") except Exception as e: print(f"[LAZY] HiFi-GAN FAILED: {e}") self.hifigan = None def _ensure_samples(self): if self.samples is not None: return self.samples = [] try: api = HfApi() files = api.list_repo_files(self.dataset_id, repo_type="dataset") # Look for cleaned audio files as samples self.samples = [f for f in files if f.startswith("audio/") and f.endswith("_cleaned.wav")] if not self.samples: self.samples = [f for f in files if f.startswith("audio/") and f.endswith(".wav") and not f.endswith("_cleaned.wav")][:10] print(f"[LAZY] Found {len(self.samples)} samples") except Exception as e: print(f"[LAZY] Could not list samples: {e}") def _mel_to_audio(self, mel_out): """Convert mel spectrogram back to audio. RVC model outputs 128-bin mel @ 40kHz. HiFi-GAN expects 80-bin mel @ 22.05kHz. Pipeline: Griffin-Lim(128bin@40k) → audio → resample(22.05k) → mel(80bin) → HiFi-GAN → audio """ if self.hifigan is not None: try: # Step 1: Griffin-Lim to get rough audio at 40kHz audio_gl = mel_to_audio_griffinlim(mel_out, sr=SAMPLE_RATE) audio_tensor = torch.as_tensor(audio_gl, dtype=torch.float32) if isinstance(audio_gl, torch.Tensor) else torch.from_numpy(audio_gl).float() if np is not None else torch.tensor(audio_gl, dtype=torch.float32) # Step 2: Resample 40kHz → 22.05kHz resampler = torchaudio.transforms.Resample(SAMPLE_RATE, 22050) audio_22k = resampler(audio_tensor) # Step 3: Compute 80-bin mel @ 22.05kHz for HiFi-GAN mel_80 = torchaudio.transforms.MelSpectrogram( sample_rate=22050, n_fft=1024, hop_length=256, n_mels=80, f_min=0.0, f_max=8000.0, power=2.0, norm=None, mel_scale="htk", )(audio_22k) mel_80 = torch.log(torch.clamp(mel_80, min=1e-5)) # Step 4: HiFi-GAN with torch.no_grad(): audio_out = self.hifigan(mel_80.unsqueeze(0)) audio_out = audio_out.squeeze(0).squeeze(0).detach().cpu().numpy() if np is not None else audio_out.squeeze(0).squeeze(0).detach().cpu().tolist() return audio_out, 22050, "HiFi-GAN+GL" except Exception as e: print(f"HiFi-GAN pipeline failed, falling back to Griffin-Lim: {e}") # Fallback: Griffin-Lim only audio_out = mel_to_audio_griffinlim(mel_out, sr=SAMPLE_RATE) return audio_out, SAMPLE_RATE, "Griffin-Lim" def process_audio(self, input_audio, pitch_shift=0): if not self.model_loaded: return None, "Model not loaded. Check logs." if input_audio is None: return None, "Please upload an audio file." self._ensure_hifigan() try: audio_data, sr = sf.read(input_audio, dtype="float32") if audio_data.ndim > 1: audio_data = audio_data.mean(axis=1) y = torch.from_numpy(audio_data) if np is not None else torch.tensor(audio_data, dtype=torch.float32) if sr != SAMPLE_RATE: y = torchaudio.transforms.Resample(sr, SAMPLE_RATE)(y) sr = SAMPLE_RATE if pitch_shift != 0: factor = 2.0 ** (abs(pitch_shift) / 12.0) new_len = int(len(y) / factor) if pitch_shift > 0 else int(len(y) * factor) y = F.interpolate(y.unsqueeze(0).unsqueeze(0), size=new_len, mode="linear").squeeze(0).squeeze(0) # Trim silence energy = y ** 2 window_size = int(0.1 * sr) if len(energy) > window_size: kernel = torch.ones(window_size) / window_size smooth_energy = F.conv1d( energy.unsqueeze(0).unsqueeze(0), kernel.unsqueeze(0).unsqueeze(0), padding=window_size // 2 ).squeeze() threshold = smooth_energy.max() * (10 ** (-20 / 10)) active = torch.where(smooth_energy > threshold)[0] if len(active) > 0: y = y[active[0]:active[-1] + 1] max_len = 10 * SAMPLE_RATE if len(y) > max_len: y = y[:max_len] mel = compute_mel(y, sr=SAMPLE_RATE) with torch.no_grad(): mel_out = self.rvc_model.infer(mel.unsqueeze(0), noise_scale=0.0) mel_out = mel_out.squeeze(0) audio_out, out_sr, vocoder_name = self._mel_to_audio(mel_out) audio_out = audio_out / (torch.max(torch.abs(torch.tensor(audio_out) if not isinstance(audio_out, torch.Tensor) else audio_out)) + 1e-7).item() * 0.95 output_path = tempfile.mktemp(suffix=".wav") sf.write(output_path, audio_out, out_sr) return output_path, f"✅ {vocoder_name} | {len(y)/SAMPLE_RATE:.1f}s → {len(audio_out)/out_sr:.1f}s | Model: strict=True, 128-mel" except Exception as e: import traceback traceback.print_exc() return None, f"❌ Error: {str(e)}" def generate_random(self): self._ensure_samples() if not self.samples: return None, "No samples available" try: sample = random.choice(self.samples) sample_path = hf_hub_download(repo_id=self.dataset_id, filename=sample, repo_type="dataset") output, msg = self.process_audio(sample_path) if output: return output, f"{msg}\nSample: {Path(sample).name}" return output, msg except Exception as e: return None, f"❌ Error: {str(e)}" # ============================================================ # Gradio UI # ============================================================ print("[STARTUP] Creating VoiceCloner (V7 correct architecture)...") cloner = VoiceCloner() print(f"[STARTUP] Ready. model_loaded={cloner.model_loaded}") demo = gr.Blocks(title="NumberBlocks One Voice Cloner V7") with demo: gr.Markdown("# 🎤 NumberBlocks One Voice Cloner V7") gr.Markdown("RVC v2 Model (60.7MB, strict=True, 128-mel) + HiFi-GAN Vocoder | Upload audio → convert to One's voice") with gr.Tab("Voice Conversion"): with gr.Row(): input_audio = gr.Audio(label="Upload Audio", type="filepath") output_audio = gr.Audio(label="Result", type="filepath") pitch_slider = gr.Slider(minimum=-12, maximum=12, value=0, step=1, label="Pitch Shift (semitones)") convert_btn = gr.Button("🎤 Convert Voice", variant="primary") status_text = gr.Textbox(label="Status") convert_btn.click( fn=cloner.process_audio, inputs=[input_audio, pitch_slider], outputs=[output_audio, status_text], ) with gr.Tab("Random Sample"): rand_audio = gr.Audio(label="Result", type="filepath") rand_status = gr.Textbox(label="Status") rand_btn = gr.Button("🎲 Generate Random", variant="primary") rand_btn.click( fn=cloner.generate_random, inputs=[], outputs=[rand_audio, rand_status], ) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=7860)