#!/usr/bin/env python3
"""
NumberBlocks One Voice Cloner - V5 Fixed
Pinned gradio version to avoid jinja2 schema bugs
"""

import os
import random
import tempfile
import numpy as np
import soundfile as sf
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
from pathlib import Path
from huggingface_hub import hf_hub_download, HfApi
import gradio as gr

# ============================================================
# Model definitions - VITS-like RVC Model
# ============================================================


def _gated_conv_stack(hidden_channels, kernel_size, dilation_rate, n_layers):
    """Build ``n_layers`` gated (Conv1d -> GLU) layers for residual stacks.

    ``nn.GLU(dim=1)`` halves the channel axis, so each convolution emits
    ``2 * hidden_channels`` channels. The gated output then has
    ``hidden_channels`` channels again and can be added back onto the
    residual stream (``x + layer(x)``).

    NOTE(review): the convolution weights are therefore shaped
    ``(2 * hidden, hidden, kernel)``. A checkpoint saved with a different
    layer shape will be rejected by ``load_state_dict`` - confirm against the
    training code.
    """
    # "Same" padding for odd kernel sizes, so the time axis keeps its length.
    padding = (kernel_size - 1) * dilation_rate // 2
    return nn.ModuleList([
        nn.Sequential(
            nn.Conv1d(hidden_channels, hidden_channels * 2, kernel_size,
                      padding=padding, dilation=dilation_rate),
            nn.GLU(dim=1),
        )
        for _ in range(n_layers)
    ])


class PosteriorEncoder(nn.Module):
    """Encode an input feature map into a mean and log-scale pair.

    A 1x1 convolution projects ``in_channels`` to ``hidden_channels``, a stack
    of residual gated convolutions refines it, and a final 1x1 convolution
    emits ``2 * hidden_channels`` channels that are split into ``(m, logs)``.
    """

    def __init__(self, in_channels, hidden_channels, kernel_size=5, dilation_rate=1, n_layers=4):
        super().__init__()
        self.pre = nn.Conv1d(in_channels, hidden_channels, 1)
        self.enc = _gated_conv_stack(hidden_channels, kernel_size, dilation_rate, n_layers)
        self.proj = nn.Conv1d(hidden_channels, hidden_channels * 2, 1)

    def forward(self, x):
        """Return ``(m, logs)``, each with ``hidden_channels`` channels on dim 1."""
        x = self.pre(x)
        for layer in self.enc:
            x = x + layer(x)
        stats = self.proj(x)
        m, logs = stats.chunk(2, dim=1)
        return m, logs


class ResidualCouplingLayer(nn.Module):
    """Affine transform whose shift and log-scale are predicted from the input.

    The output projection is zero-initialised, so a freshly constructed layer
    predicts ``m = 0`` and ``logs = 0`` and behaves as the identity.
    """

    def __init__(self, channels, hidden_channels, kernel_size=5, dilation_rate=1, n_layers=4):
        super().__init__()
        self.pre = nn.Conv1d(channels, hidden_channels, 1)
        self.enc = _gated_conv_stack(hidden_channels, kernel_size, dilation_rate, n_layers)
        self.post = nn.Conv1d(hidden_channels, channels * 2, 1)
        self.post.weight.data.zero_()
        self.post.bias.data.zero_()

    def forward(self, x, reverse=False):
        """Apply the transform.

        Forward (``reverse=False``) returns ``(y, logdet)`` with
        ``y = m + x * exp(log_s)`` and ``logdet`` the sum of ``log_s`` over
        every element. Reverse returns only ``y = (x - m) * exp(-log_s)``.
        In both directions ``m`` and ``log_s`` are computed from the tensor
        passed in as ``x``.
        """
        h = self.pre(x)
        for layer in self.enc:
            h = h + layer(h)
        m, logs = self.post(h).chunk(2, dim=1)
        # Bound the log-scale so exp() cannot overflow or collapse to zero.
        log_s = torch.clamp(logs, -5.0, 5.0)
        if reverse:
            return (x - m) * torch.exp(-log_s)
        y = m + x * torch.exp(log_s)
        logdet = torch.sum(log_s)
        return y, logdet


class Flip(nn.Module):
    """Reverse the channel order (dim 1); contributes zero log-determinant."""

    def forward(self, x, reverse=False):
        """Return ``(flipped, 0)`` in forward mode, or just ``flipped`` in reverse."""
        flipped = torch.flip(x, [1])
        if reverse:
            return flipped
        return flipped, 0


class ResidualCouplingBlock(nn.Module):
    """Chain of ``n_flows`` (ResidualCouplingLayer, Flip) pairs."""

    def __init__(self, channels, hidden_channels, kernel_size=5, dilation_rate=1, n_flows=4, n_layers=4):
        super().__init__()
        self.flows = nn.ModuleList()
        for _ in range(n_flows):
            self.flows.append(ResidualCouplingLayer(channels, hidden_channels, kernel_size, dilation_rate, n_layers))
            self.flows.append(Flip())

    def forward(self, x, reverse=False):
        """Run the flows in order, or in reversed order when ``reverse`` is set.

        Only the transformed tensor is returned; per-flow log-determinants
        are discarded.
        """
        if reverse:
            for flow in reversed(self.flows):
                x = flow(x, reverse=True)
            return x
        for flow in self.flows:
            x, _ = flow(x, reverse=False)
        return x


class Decoder(nn.Module):
    """Map latent features back to ``out_channels`` with residual gated convs."""

    def __init__(self, hidden_channels, out_channels, kernel_size=5, dilation_rate=1, n_layers=4):
        super().__init__()
        self.pre = nn.Conv1d(hidden_channels, hidden_channels, 1)
        self.dec = _gated_conv_stack(hidden_channels, kernel_size, dilation_rate, n_layers)
        self.proj = nn.Conv1d(hidden_channels, out_channels, 1)

    def forward(self, x):
        """Return the decoded feature map with ``out_channels`` channels."""
        x = self.pre(x)
        for layer in self.dec:
            x = x + layer(x)
        return self.proj(x)


class RVCModel(nn.Module):
    """VITS-like RVC v3.0 Model"""

    def __init__(self, n_mels=80, hidden_channels=192):
        super().__init__()
        self.enc_p = PosteriorEncoder(n_mels, hidden_channels)
        self.flow = ResidualCouplingBlock(hidden_channels, hidden_channels)
        self.dec = Decoder(hidden_channels, n_mels)
        self.n_mels = n_mels

    def forward(self, mel):
        """Deterministic pass; equivalent to ``infer(mel, noise_scale=0.0)``."""
        return self.infer(mel, noise_scale=0.0)

    def infer(self, mel, noise_scale=0.0):
        """Encode ``mel``, run the flow forward then in reverse, and decode.

        Args:
            mel: input passed to the posterior encoder; dim 1 must have
                ``n_mels`` channels.
            noise_scale: multiplier on the sampled noise
                ``randn * exp(logs)``. With ``0`` the latent is exactly the
                encoder mean and no noise is sampled.

        Returns:
            Decoder output with ``n_mels`` channels on dim 1.
        """
        m, logs = self.enc_p(mel)
        if noise_scale == 0:
            # Skip sampling entirely: multiplying by zero would turn an
            # overflowing exp(logs) into NaN (inf * 0).
            z = m
        else:
            z = m + torch.randn_like(logs) * torch.exp(logs) * noise_scale
        z_p = self.flow(z)
        z_back = self.flow(z_p, reverse=True)
        return self.dec(z_back)


# ============================================================
# HiFi-GAN Vocoder
# ============================================================

class ResBlock1(nn.Module):
    """Residual block of (LeakyReLU, dilated conv, LeakyReLU, conv) units."""

    def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5)):
        super().__init__()
        self.convs = nn.ModuleList()
        for d in dilation:
            self.convs.append(nn.Sequential(
                nn.LeakyReLU(0.1),
                nn.Conv1d(channels, channels, kernel_size, dilation=d,
                          padding=(kernel_size - 1) * d // 2),
                nn.LeakyReLU(0.1),
                nn.Conv1d(channels, channels, kernel_size, dilation=1,
                          padding=(kernel_size - 1) // 2),
            ))

    def forward(self, x):
        """Add each unit's output onto the running residual and return it."""
        for conv in self.convs:
            x = x + conv(x)
        return x


class HiFiGANGenerator(nn.Module):
    """HiFi-GAN style generator: upsample mel frames into a waveform.

    Each stage halves the channel count, upsamples time by the matching entry
    of ``upsample_rates`` and averages the outputs of one ``ResBlock1`` per
    entry of ``resblock_kernel_sizes``. The final layer emits a single
    ``tanh``-bounded channel.
    """

    def __init__(self, in_channels=80, upsample_rates=(8, 8, 2, 2),
                 upsample_kernel_sizes=(16, 16, 4, 4),
                 upsample_initial_channel=512,
                 resblock_kernel_sizes=(3, 7, 11),
                 resblock_dilation_sizes=((1, 3, 5), (1, 3, 5), (1, 3, 5))):
        super().__init__()
        self.conv_pre = nn.Conv1d(in_channels, upsample_initial_channel, 7, padding=3)
        self.num_upsamples = len(upsample_rates)
        self.num_kernels = len(resblock_kernel_sizes)
        self.ups = nn.ModuleList()
        self.resblocks = nn.ModuleList()
        ch = upsample_initial_channel
        for u, k in zip(upsample_rates, upsample_kernel_sizes):
            ch_new = ch // 2
            self.ups.append(nn.ConvTranspose1d(ch, ch_new, k, u, padding=(k - u) // 2))
            for rk, rd in zip(resblock_kernel_sizes, resblock_dilation_sizes):
                self.resblocks.append(ResBlock1(ch_new, rk, rd))
            ch = ch_new
        self.conv_post = nn.Sequential(
            nn.LeakyReLU(0.1),
            nn.Conv1d(ch, 1, 7, padding=3),
            nn.Tanh(),
        )

    def forward(self, x):
        """Return the generated waveform tensor (one channel on dim 1)."""
        x = self.conv_pre(x)
        for i in range(self.num_upsamples):
            x = F.leaky_relu(x, 0.1)
            x = self.ups[i](x)
            # resblocks is stored flat: stage i owns the slice
            # [i * num_kernels, (i + 1) * num_kernels).
            first = i * self.num_kernels
            xs = 0
            for j in range(self.num_kernels):
                xs = xs + self.resblocks[first + j](x)
            x = xs / self.num_kernels
        x = self.conv_post(x)
        return x


# ============================================================
# Mel utilities (no librosa)
# ============================================================

def compute_mel(y, sample_rate=40000, n_fft=1024, hop_length=256, n_mels=80):
    """Return the natural-log mel power spectrogram of waveform ``y``.

    Uses ``torchaudio.transforms.MelSpectrogram`` with HTK mel scale, no
    filterbank normalisation, power 2.0 and a frequency range of
    ``0 .. sample_rate // 2``. Values are clamped to ``1e-5`` before the log
    so that silence does not produce ``-inf``.
    """
    mel_transform = torchaudio.transforms.MelSpectrogram(
        sample_rate=sample_rate,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels,
        f_min=0.0,
        f_max=float(sample_rate // 2),
        power=2.0,
        norm=None,
        mel_scale="htk",
    )
    mel = mel_transform(y)
    return torch.log(torch.clamp(mel, min=1e-5))


def mel_to_audio_griffinlim(mel, sample_rate=40000, n_fft=1024, hop_length=256, n_iter=32):
    """Invert a log-mel spectrogram to a waveform with Griffin-Lim.

    Args:
        mel: log-mel tensor whose first dimension is the number of mel bins
            (it is read as ``mel.shape[0]``).
        sample_rate, n_fft, hop_length: must match the values used to compute
            the mel spectrogram.
        n_iter: number of Griffin-Lim iterations.

    Returns:
        The reconstructed waveform as a NumPy array.
    """
    # Detach and move to CPU so the final .numpy() call is always legal.
    mel = mel.detach().cpu()
    inverse_mel = torchaudio.transforms.InverseMelScale(
        n_stft=n_fft // 2 + 1,
        n_mels=mel.shape[0],
        sample_rate=sample_rate,
        f_min=0,
        f_max=float(sample_rate // 2),
        mel_scale="htk",
    )
    mel_power = torch.exp(mel)  # undo the log applied in compute_mel
    spec = inverse_mel(mel_power)
    griffin_lim = torchaudio.transforms.GriffinLim(n_fft=n_fft, hop_length=hop_length, n_iter=n_iter)
    audio = griffin_lim(spec)
    return audio.numpy()


# ============================================================
# Inference Engine
# ============================================================

class VoiceCloner:
    """Load the RVC model and vocoder, then convert audio files on CPU.

    Attributes set in ``__init__``:
        rvc_model: loaded ``RVCModel`` or ``None`` when loading failed.
        hifigan: loaded ``HiFiGANGenerator`` or ``None`` (Griffin-Lim is used
            instead).
        samples: dataset file names usable by ``generate_random``.
        model_loaded: ``True`` only when ``rvc_model`` was loaded.
    """

    def __init__(self):
        self.device = torch.device('cpu')
        self.rvc_model = None
        self.hifigan = None
        self.sample_rate = 40000
        self.dataset_id = "ayf3/numberblocks-one-voice-dataset"
        self.model_loaded = False
        self.samples = []
        self.load_models()

    @staticmethod
    def _load_weights(module, state_dict, label):
        """Load ``state_dict`` non-strictly and report key mismatches.

        Returns the list of parameter names the checkpoint did not provide.
        Those parameters keep their random initialisation, so callers decide
        whether the result is usable.
        """
        result = module.load_state_dict(state_dict, strict=False)
        missing = list(result.missing_keys)
        unexpected = list(result.unexpected_keys)
        if missing or unexpected:
            print(f"{label}: {len(missing)} missing / {len(unexpected)} unexpected checkpoint keys")
        return missing

    def _load_rvc(self):
        """Download and load the RVC checkpoint; leave ``rvc_model`` None on failure."""
        print("Loading RVC model...")
        self.rvc_model = None
        try:
            model_path = hf_hub_download(
                repo_id=self.dataset_id,
                filename="models/one_voice_rvc_v2.pth",
                repo_type="dataset"
            )
            # SECURITY NOTE: weights_only=False unpickles arbitrary objects;
            # only safe while the hosting repository is trusted.
            ckpt = torch.load(model_path, map_location='cpu', weights_only=False)
            state_dict = ckpt.get('model', ckpt.get('state_dict', ckpt))

            # Infer the hidden width from the first encoder convolution.
            hidden_ch = 192
            for k, v in state_dict.items():
                if 'enc_p.pre.weight' in k:
                    hidden_ch = v.shape[0]
                    break

            model = RVCModel(n_mels=80, hidden_channels=hidden_ch)
            missing = self._load_weights(model, state_dict, "RVC model")
            if len(missing) == len(model.state_dict()):
                # Nothing was loaded: the model would run on random weights.
                raise RuntimeError("checkpoint contains no weights matching the RVC model")
            model.eval()
            # Publish the model only after the weights loaded successfully.
            self.rvc_model = model
            print(f"RVC model loaded (hidden={hidden_ch})")
        except Exception as e:
            print(f"RVC model load failed: {e}")

    def _load_vocoder(self):
        """Download and load HiFi-GAN; leave ``hifigan`` None to use Griffin-Lim."""
        print("Loading HiFi-GAN vocoder...")
        self.hifigan = None
        try:
            hifigan_path = hf_hub_download(repo_id="jik876/hifi-gan", filename="UNIVERSAL_V1/g_02500000")
            # SECURITY NOTE: see _load_rvc - this unpickles arbitrary objects.
            ckpt = torch.load(hifigan_path, map_location='cpu', weights_only=False)
            state_dict = ckpt.get('generator', ckpt.get('state_dict', ckpt))
            prefix = 'generator.'
            if any(k.startswith(prefix) for k in state_dict):
                # Strip only the leading prefix, never inner occurrences.
                state_dict = {k[len(prefix):]: v for k, v in state_dict.items() if k.startswith(prefix)}
            vocoder = HiFiGANGenerator()
            missing = self._load_weights(vocoder, state_dict, "HiFi-GAN")
            if missing:
                # A partially initialised vocoder produces noise; prefer the
                # Griffin-Lim fallback over silently using random weights.
                raise RuntimeError(f"{len(missing)} generator weights missing from checkpoint")
            vocoder.eval()
            self.hifigan = vocoder
            print("HiFi-GAN vocoder loaded")
        except Exception as e:
            print(f"HiFi-GAN load failed: {e}, using Griffin-Lim fallback")
            self.hifigan = None

    def _list_samples(self):
        """Collect un-augmented ``models/top_*.wav`` files from the dataset repo."""
        try:
            api = HfApi()
            files = api.list_repo_files(self.dataset_id, repo_type="dataset")
            self.samples = [f for f in files if f.startswith('models/top_') and f.endswith('.wav')
                            and '_p+' not in f and '_p-' not in f and '_s+' not in f]
            print(f"Found {len(self.samples)} sample audio files")
        except Exception as e:
            print(f"Could not list samples: {e}")

    def load_models(self):
        """Load the RVC model, the vocoder and the sample list (best effort).

        Each step logs its own failure and does not raise. ``model_loaded``
        ends up ``True`` only if the RVC weights were actually loaded.
        """
        self._load_rvc()
        self._load_vocoder()
        self._list_samples()
        self.model_loaded = self.rvc_model is not None

    @staticmethod
    def _pitch_shift(y, pitch_shift):
        """Shift pitch by linear resampling (this also changes the duration).

        A positive shift shortens the signal by ``2 ** (semitones / 12)``;
        a negative shift lengthens it by the same factor.
        """
        factor = 2.0 ** (abs(pitch_shift) / 12.0)
        new_len = int(len(y) / factor) if pitch_shift > 0 else int(len(y) * factor)
        stretched = F.interpolate(y.unsqueeze(0).unsqueeze(0), size=new_len, mode='linear')
        return stretched.squeeze(0).squeeze(0)

    @staticmethod
    def _trim_silence(y, sr):
        """Crop leading/trailing audio whose smoothed energy is below 1% of peak.

        Energy is smoothed with a 0.1 s moving average. Signals no longer
        than that window are returned unchanged.
        """
        energy = y ** 2
        window_size = int(0.1 * sr)
        if len(energy) <= window_size:
            return y
        kernel = torch.ones(window_size) / window_size
        smooth_energy = F.conv1d(
            energy.unsqueeze(0).unsqueeze(0),
            kernel.unsqueeze(0).unsqueeze(0),
            padding=window_size // 2
        ).squeeze()
        threshold = smooth_energy.max() * (10 ** (-20 / 10))
        active = torch.where(smooth_energy > threshold)[0]
        if len(active) > 0:
            y = y[active[0]:active[-1] + 1]
        return y

    def _vocode(self, mel_out):
        """Turn a converted mel into ``(waveform ndarray, vocoder name)``."""
        if self.hifigan is not None:
            with torch.no_grad():
                audio_out = self.hifigan(mel_out.unsqueeze(0))
            return audio_out.squeeze(0).squeeze(0).cpu().numpy(), "HiFi-GAN"
        # The keyword must be sample_rate: the function has no `sr` parameter.
        return mel_to_audio_griffinlim(mel_out, sample_rate=self.sample_rate), "Griffin-Lim"

    def process_audio(self, input_audio, pitch_shift=0):
        """Convert an audio file and return ``(output_wav_path, status_message)``.

        Args:
            input_audio: path of the audio file to convert, or ``None``.
            pitch_shift: shift in semitones applied before conversion.

        Returns:
            ``(path, message)`` on success or ``(None, message)`` on failure;
            exceptions are reported through the message, not raised.
        """
        if not self.model_loaded:
            return None, "Model not loaded"
        if input_audio is None:
            return None, "Please upload an audio file"

        try:
            y, sr = torchaudio.load(input_audio)
            # Down-mix to mono.
            y = y.mean(dim=0) if y.shape[0] > 1 else y.squeeze(0)

            if sr != self.sample_rate:
                y = torchaudio.transforms.Resample(sr, self.sample_rate)(y)
                sr = self.sample_rate

            if pitch_shift != 0:
                y = self._pitch_shift(y, pitch_shift)

            # Trim silence
            y = self._trim_silence(y, sr)

            # Cap the input at 10 seconds.
            max_len = 10 * self.sample_rate
            if len(y) > max_len:
                y = y[:max_len]

            mel = compute_mel(y, sample_rate=self.sample_rate, n_mels=80)

            with torch.no_grad():
                mel_out = self.rvc_model.infer(mel.unsqueeze(0), noise_scale=0.0)
                mel_out = mel_out.squeeze(0)

            audio_out, vocoder_name = self._vocode(mel_out)

            # Peak-normalise to 0.95; the epsilon guards against all-zero output.
            audio_out = audio_out / (np.max(np.abs(audio_out)) + 1e-7) * 0.95

            # mktemp() is deprecated and race-prone; reserve the file atomically.
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
                output_path = tmp.name
            sf.write(output_path, audio_out, self.sample_rate)

            return output_path, f"Success ({vocoder_name}) | {len(y)/sr:.1f}s -> {len(audio_out)/self.sample_rate:.1f}s"
        except Exception as e:
            # UI boundary: report the failure instead of crashing the app.
            import traceback
            traceback.print_exc()
            return None, f"Error: {str(e)}"

    def generate_random(self):
        """Convert a randomly chosen dataset sample; same return shape as ``process_audio``."""
        if not self.samples:
            return None, "No samples available"
        try:
            sample = random.choice(self.samples)
            sample_path = hf_hub_download(repo_id=self.dataset_id, filename=sample, repo_type="dataset")
            output, msg = self.process_audio(sample_path)
            if output:
                return output, f"{msg}\nSample: {Path(sample).name}"
            return output, msg
        except Exception as e:
            return None, f"Error: {str(e)}"


# ============================================================
# Gradio UI
# ============================================================

print("Initializing NumberBlocks One Voice Cloner...")
cloner = VoiceCloner()

vc_interface = gr.Interface(
    fn=cloner.process_audio,
    inputs=[
        gr.Audio(label="Upload Audio", type="filepath"),
        gr.Slider(minimum=-12, maximum=12, value=0, step=1, label="Pitch Shift (semitones)"),
    ],
    outputs=[
        gr.Audio(label="Result", type="filepath"),
        gr.Textbox(label="Status"),
    ],
    title="NumberBlocks One Voice Cloner",
    description="RVC v2 Model (60.7MB) + HiFi-GAN Vocoder | Upload audio to convert to One's voice",
    allow_flagging="never",
)

rand_interface = gr.Interface(
    fn=cloner.generate_random,
    inputs=[],
    outputs=[
        gr.Audio(label="Result", type="filepath"),
        gr.Textbox(label="Status"),
    ],
    title="Random Sample Generation",
    description="Generate from random dataset sample + RVC conversion",
    allow_flagging="never",
)

demo = gr.TabbedInterface(
    [vc_interface, rand_interface],
    ["Voice Conversion", "Random Sample"],
)

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)