#!/usr/bin/env python3 """ NumberBlocks One Voice Cloner - V6 Robust Fixes: user creation in Dockerfile, lazy-load HiFi-GAN, startup-timeout protection """ import os import random import tempfile import numpy as np import soundfile as sf import torch import torch.nn as nn import torch.nn.functional as F import torchaudio from pathlib import Path from huggingface_hub import hf_hub_download, HfApi import gradio as gr print("=== NumberBlocks One Voice Cloner V6 ===") # ============================================================ # Model Definitions # ============================================================ class PosteriorEncoder(nn.Module): def __init__(self, in_channels, hidden_channels, kernel_size=5, dilation_rate=1, n_layers=4): super().__init__() self.pre = nn.Conv1d(in_channels, hidden_channels, 1) self.enc = nn.ModuleList() for _ in range(n_layers): self.enc.append(nn.Sequential( nn.Conv1d(hidden_channels, hidden_channels, kernel_size, padding=(kernel_size - 1) * dilation_rate // 2, dilation=dilation_rate), nn.GLU(dim=1), )) self.proj = nn.Conv1d(hidden_channels, hidden_channels * 2, 1) def forward(self, x): x = self.pre(x) for layer in self.enc: x = x + layer(x) stats = self.proj(x) m, logs = stats.chunk(2, dim=1) return m, logs class ResidualCouplingLayer(nn.Module): def __init__(self, channels, hidden_channels, kernel_size=5, dilation_rate=1, n_layers=4): super().__init__() self.pre = nn.Conv1d(channels, hidden_channels, 1) self.enc = nn.ModuleList() for _ in range(n_layers): self.enc.append(nn.Sequential( nn.Conv1d(hidden_channels, hidden_channels, kernel_size, padding=(kernel_size - 1) * dilation_rate // 2, dilation=dilation_rate), nn.GLU(dim=1), )) self.post = nn.Conv1d(hidden_channels, channels * 2, 1) self.post.weight.data.zero_() self.post.bias.data.zero_() def forward(self, x, reverse=False): h = self.pre(x) for layer in self.enc: h = h + layer(h) stats = self.post(h) m, logs = stats.chunk(2, dim=1) if not reverse: log_s = torch.clamp(logs, -5.0, 5.0) y = m + x * torch.exp(log_s) logdet = torch.sum(log_s) return y, logdet else: log_s = torch.clamp(logs, -5.0, 5.0) y = (x - m) * torch.exp(-log_s) return y class Flip(nn.Module): def forward(self, x, reverse=False): if not reverse: return torch.flip(x, [1]), 0 else: return torch.flip(x, [1]) class ResidualCouplingBlock(nn.Module): def __init__(self, channels, hidden_channels, kernel_size=5, dilation_rate=1, n_flows=4, n_layers=4): super().__init__() self.flows = nn.ModuleList() for _ in range(n_flows): self.flows.append(ResidualCouplingLayer(channels, hidden_channels, kernel_size, dilation_rate, n_layers)) self.flows.append(Flip()) def forward(self, x, reverse=False): if not reverse: for flow in self.flows: x, _ = flow(x, reverse=reverse) else: for flow in reversed(self.flows): x = flow(x, reverse=reverse) return x class Decoder(nn.Module): def __init__(self, hidden_channels, out_channels, kernel_size=5, dilation_rate=1, n_layers=4): super().__init__() self.pre = nn.Conv1d(hidden_channels, hidden_channels, 1) self.dec = nn.ModuleList() for _ in range(n_layers): self.dec.append(nn.Sequential( nn.Conv1d(hidden_channels, hidden_channels, kernel_size, padding=(kernel_size - 1) * dilation_rate // 2, dilation=dilation_rate), nn.GLU(dim=1), )) self.proj = nn.Conv1d(hidden_channels, out_channels, 1) def forward(self, x): x = self.pre(x) for layer in self.dec: x = x + layer(x) return self.proj(x) class RVCModel(nn.Module): def __init__(self, n_mels=80, hidden_channels=192): super().__init__() self.enc_p = PosteriorEncoder(n_mels, hidden_channels) self.flow = ResidualCouplingBlock(hidden_channels, hidden_channels) self.dec = Decoder(hidden_channels, n_mels) self.n_mels = n_mels def forward(self, mel): m, logs = self.enc_p(mel) z = m + torch.randn_like(logs) * torch.exp(logs) * 0.0 z_p = self.flow(z) z_back = self.flow(z_p, reverse=True) mel_out = self.dec(z_back) return mel_out def infer(self, mel, noise_scale=0.0): m, logs = self.enc_p(mel) z = m + torch.randn_like(logs) * torch.exp(logs) * noise_scale z_p = self.flow(z) z_back = self.flow(z_p, reverse=True) mel_out = self.dec(z_back) return mel_out # ============================================================ # HiFi-GAN Vocoder # ============================================================ class ResBlock1(nn.Module): def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5)): super().__init__() self.convs = nn.ModuleList() for d in dilation: self.convs.append(nn.Sequential( nn.LeakyReLU(0.1), nn.Conv1d(channels, channels, kernel_size, dilation=d, padding=(kernel_size - 1) * d // 2), nn.LeakyReLU(0.1), nn.Conv1d(channels, channels, kernel_size, dilation=1, padding=(kernel_size - 1) // 2), )) def forward(self, x): for conv in self.convs: x = x + conv(x) return x class HiFiGANGenerator(nn.Module): def __init__(self, in_channels=80, upsample_rates=(8, 8, 2, 2), upsample_kernel_sizes=(16, 16, 4, 4), upsample_initial_channel=512, resblock_kernel_sizes=(3, 7, 11), resblock_dilation_sizes=((1, 3, 5), (1, 3, 5), (1, 3, 5))): super().__init__() self.conv_pre = nn.Conv1d(in_channels, upsample_initial_channel, 7, padding=3) self.num_upsamples = len(upsample_rates) self.num_kernels = len(resblock_kernel_sizes) self.ups = nn.ModuleList() self.resblocks = nn.ModuleList() ch = upsample_initial_channel for i, (u, k) in enumerate(zip(upsample_rates, upsample_kernel_sizes)): ch_new = ch // 2 self.ups.append(nn.ConvTranspose1d(ch, ch_new, k, u, padding=(k - u) // 2)) for _, (rk, rd) in enumerate(zip(resblock_kernel_sizes, resblock_dilation_sizes)): self.resblocks.append(ResBlock1(ch_new, rk, rd)) ch = ch_new self.conv_post = nn.Sequential( nn.LeakyReLU(0.1), nn.Conv1d(ch, 1, 7, padding=3), nn.Tanh(), ) def forward(self, x): x = self.conv_pre(x) for i in range(self.num_upsamples): x = F.leaky_relu(x, 0.1) x = self.ups[i](x) xs = 0 for j in range(self.num_kernels): xs += self.resblocks[i * self.num_kernels + j](x) x = xs / self.num_kernels x = self.conv_post(x) return x # ============================================================ # Mel utilities (no librosa) # ============================================================ def compute_mel(y, sample_rate=40000, n_fft=1024, hop_length=256, n_mels=80): mel_transform = torchaudio.transforms.MelSpectrogram( sample_rate=sample_rate, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels, f_min=0.0, f_max=float(sample_rate // 2), power=2.0, norm=None, mel_scale="htk", ) mel = mel_transform(y) mel = torch.log(torch.clamp(mel, min=1e-5)) return mel def mel_to_audio_griffinlim(mel, sample_rate=40000, n_fft=1024, hop_length=256, n_iter=32): inverse_mel = torchaudio.transforms.InverseMelScale( n_stft=n_fft // 2 + 1, n_mels=mel.shape[0], sample_rate=sample_rate, f_min=0, f_max=float(sample_rate // 2), mel_scale="htk", ) mel_power = torch.exp(mel) spec = inverse_mel(mel_power) griffin_lim = torchaudio.transforms.GriffinLim(n_fft=n_fft, hop_length=hop_length, n_iter=n_iter) audio = griffin_lim(spec) return audio.numpy() # ============================================================ # Inference Engine - with lazy loading # ============================================================ class VoiceCloner: def __init__(self): self.device = torch.device('cpu') self.rvc_model = None self.hifigan = None # lazy loaded self._hifigan_loaded = False self.sample_rate = 40000 self.dataset_id = "ayf3/numberblocks-one-voice-dataset" self.model_loaded = False self.samples = None # lazy loaded self._load_rvc_only() def _load_rvc_only(self): """Load only the RVC model at startup (fast)""" print("[STARTUP] Loading RVC model...") try: model_path = hf_hub_download( repo_id=self.dataset_id, filename="models/one_voice_rvc_v2.pth", repo_type="dataset" ) ckpt = torch.load(model_path, map_location='cpu', weights_only=False) state_dict = ckpt.get('model', ckpt.get('state_dict', ckpt)) hidden_ch = 192 for k, v in state_dict.items(): if 'enc_p.pre.weight' in k: hidden_ch = v.shape[0] break self.rvc_model = RVCModel(n_mels=80, hidden_channels=hidden_ch) self.rvc_model.load_state_dict(state_dict, strict=False) self.rvc_model.eval() self.model_loaded = True print(f"[STARTUP] RVC model loaded OK (hidden={hidden_ch})") except Exception as e: print(f"[STARTUP] RVC model load FAILED: {e}") def _ensure_hifigan(self): """Lazy-load HiFi-GAN on first inference request""" if self._hifigan_loaded: return self._hifigan_loaded = True print("[LAZY] Loading HiFi-GAN vocoder...") try: hifigan_path = hf_hub_download( repo_id="jik876/hifi-gan", filename="UNIVERSAL_V1/g_02500000" ) ckpt = torch.load(hifigan_path, map_location='cpu', weights_only=False) state_dict = ckpt.get('generator', ckpt.get('state_dict', ckpt)) if any(k.startswith('generator.') for k in state_dict): state_dict = {k.replace('generator.', ''): v for k, v in state_dict.items() if k.startswith('generator.')} self.hifigan = HiFiGANGenerator() self.hifigan.load_state_dict(state_dict, strict=False) self.hifigan.eval() print("[LAZY] HiFi-GAN loaded OK") except Exception as e: print(f"[LAZY] HiFi-GAN FAILED (Griffin-Lim fallback): {e}") self.hifigan = None def _ensure_samples(self): """Lazy-load sample list""" if self.samples is not None: return self.samples = [] try: api = HfApi() files = api.list_repo_files(self.dataset_id, repo_type="dataset") self.samples = [f for f in files if f.startswith('models/top_') and f.endswith('.wav') and '_p+' not in f and '_p-' not in f and '_s+' not in f] print(f"[LAZY] Found {len(self.samples)} samples") except Exception as e: print(f"[LAZY] Could not list samples: {e}") def process_audio(self, input_audio, pitch_shift=0): if not self.model_loaded: return None, "Model not loaded. Check logs." if input_audio is None: return None, "Please upload an audio file." # Lazy load vocoder on first real request self._ensure_hifigan() try: y, sr = torchaudio.load(input_audio) if y.shape[0] > 1: y = y.mean(dim=0) else: y = y.squeeze(0) if sr != self.sample_rate: y = torchaudio.transforms.Resample(sr, self.sample_rate)(y) sr = self.sample_rate if pitch_shift != 0: factor = 2.0 ** (abs(pitch_shift) / 12.0) new_len = int(len(y) / factor) if pitch_shift > 0 else int(len(y) * factor) y = F.interpolate(y.unsqueeze(0).unsqueeze(0), size=new_len, mode='linear').squeeze(0).squeeze(0) # Trim silence energy = y ** 2 window_size = int(0.1 * sr) if len(energy) > window_size: kernel = torch.ones(window_size) / window_size smooth_energy = F.conv1d( energy.unsqueeze(0).unsqueeze(0), kernel.unsqueeze(0).unsqueeze(0), padding=window_size // 2 ).squeeze() threshold = smooth_energy.max() * (10 ** (-20 / 10)) active = torch.where(smooth_energy > threshold)[0] if len(active) > 0: y = y[active[0]:active[-1] + 1] max_len = 10 * self.sample_rate if len(y) > max_len: y = y[:max_len] mel = compute_mel(y, sample_rate=self.sample_rate, n_mels=80) with torch.no_grad(): mel_out = self.rvc_model.infer(mel.unsqueeze(0), noise_scale=0.0) mel_out = mel_out.squeeze(0) if self.hifigan is not None: with torch.no_grad(): audio_out = self.hifigan(mel_out.unsqueeze(0)) audio_out = audio_out.squeeze(0).squeeze(0).cpu().numpy() vocoder_name = "HiFi-GAN" else: audio_out = mel_to_audio_griffinlim(mel_out, sr=self.sample_rate) vocoder_name = "Griffin-Lim" audio_out = audio_out / (np.max(np.abs(audio_out)) + 1e-7) * 0.95 output_path = tempfile.mktemp(suffix='.wav') sf.write(output_path, audio_out, self.sample_rate) return output_path, f"✅ {vocoder_name} | {len(y)/sr:.1f}s → {len(audio_out)/self.sample_rate:.1f}s" except Exception as e: import traceback traceback.print_exc() return None, f"❌ Error: {str(e)}" def generate_random(self): self._ensure_samples() if not self.samples: return None, "No samples available" try: sample = random.choice(self.samples) sample_path = hf_hub_download(repo_id=self.dataset_id, filename=sample, repo_type="dataset") output, msg = self.process_audio(sample_path) if output: return output, f"{msg}\nSample: {Path(sample).name}" return output, msg except Exception as e: return None, f"❌ Error: {str(e)}" # ============================================================ # Gradio UI # ============================================================ print("[STARTUP] Creating VoiceCloner (RVC only, HiFi-GAN lazy)...") cloner = VoiceCloner() print(f"[STARTUP] Ready. model_loaded={cloner.model_loaded}") demo = gr.Blocks(title="NumberBlocks One Voice Cloner") with demo: gr.Markdown("# 🎤 NumberBlocks One Voice Cloner") gr.Markdown("RVC v2 Model (60.7MB) + HiFi-GAN Vocoder | Upload audio → convert to One's voice") with gr.Tab("Voice Conversion"): with gr.Row(): input_audio = gr.Audio(label="Upload Audio", type="filepath") output_audio = gr.Audio(label="Result", type="filepath") pitch_slider = gr.Slider(minimum=-12, maximum=12, value=0, step=1, label="Pitch Shift (semitones)") convert_btn = gr.Button("🎤 Convert Voice", variant="primary") status_text = gr.Textbox(label="Status") convert_btn.click( fn=cloner.process_audio, inputs=[input_audio, pitch_slider], outputs=[output_audio, status_text], ) with gr.Tab("Random Sample"): rand_audio = gr.Audio(label="Result", type="filepath") rand_status = gr.Textbox(label="Status") rand_btn = gr.Button("🎲 Generate Random", variant="primary") rand_btn.click( fn=cloner.generate_random, inputs=[], outputs=[rand_audio, rand_status], ) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=7860)