Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| NumberBlocks One Voice Cloner - V5 Fixed | |
| Pinned gradio version to avoid jinja2 schema bugs | |
| """ | |
| import os | |
| import random | |
| import tempfile | |
| import numpy as np | |
| import soundfile as sf | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| import torchaudio | |
| from pathlib import Path | |
| from huggingface_hub import hf_hub_download, HfApi | |
| import gradio as gr | |
# ============================================================
# Model definition - VITS-like RVC Model
# ============================================================
class PosteriorEncoder(nn.Module):
    """Gated residual 1-D conv encoder that emits two per-frame statistics.

    The input goes through a 1x1 projection, ``n_layers`` residual gated
    convolutions, and a final 1x1 projection whose output is split in half
    along the channel dimension.

    Args:
        in_channels: Channel count of the input (dim 1, as ``nn.Conv1d`` expects).
        hidden_channels: Width of the residual stream and of each returned half.
        kernel_size: Kernel size of the gated convolutions.
        dilation_rate: Dilation of the gated convolutions.
        n_layers: Number of residual gated convolution layers.
    """

    def __init__(self, in_channels, hidden_channels, kernel_size=5, dilation_rate=1, n_layers=4):
        super().__init__()
        self.pre = nn.Conv1d(in_channels, hidden_channels, 1)
        # "Same" padding for odd kernels so the residual add lines up in time.
        same_padding = (kernel_size - 1) * dilation_rate // 2
        self.enc = nn.ModuleList()
        for _ in range(n_layers):
            self.enc.append(nn.Sequential(
                # nn.GLU(dim=1) halves the channel count, so the convolution
                # must emit 2 * hidden_channels for the residual `x + layer(x)`
                # to have matching shapes. The previous hidden -> hidden
                # convolution made forward() fail with a size mismatch.
                # NOTE(review): checkpoints saved with the old conv shape will
                # not load into this layer; confirm against the training code.
                nn.Conv1d(hidden_channels, hidden_channels * 2, kernel_size,
                          padding=same_padding,
                          dilation=dilation_rate),
                nn.GLU(dim=1),
            ))
        self.proj = nn.Conv1d(hidden_channels, hidden_channels * 2, 1)

    def forward(self, x):
        """Return ``(m, logs)``, the two channel halves of the final projection."""
        x = self.pre(x)
        for layer in self.enc:
            x = x + layer(x)
        m, logs = self.proj(x).chunk(2, dim=1)
        return m, logs
class ResidualCouplingLayer(nn.Module):
    """Affine transform whose shift and log-scale are predicted by a gated conv stack.

    Forward computes ``y = m + x * exp(log_s)``; reverse computes
    ``(x - m) * exp(-log_s)``. ``post`` is zero-initialised, so a freshly
    constructed layer is the identity in both directions.

    NOTE(review): ``m`` and ``log_s`` are predicted from the very tensor being
    transformed (there is no channel split), so ``reverse=True`` recomputes
    them from its own input rather than from the original ``x``. It is an
    exact inverse while ``post`` is still zero; confirm whether a true
    split-channel coupling was intended.

    Args:
        channels: Channel count of the transformed tensor (dim 1).
        hidden_channels: Width of the internal residual stream.
        kernel_size: Kernel size of the gated convolutions.
        dilation_rate: Dilation of the gated convolutions.
        n_layers: Number of residual gated convolution layers.
    """

    def __init__(self, channels, hidden_channels, kernel_size=5, dilation_rate=1, n_layers=4):
        super().__init__()
        self.pre = nn.Conv1d(channels, hidden_channels, 1)
        same_padding = (kernel_size - 1) * dilation_rate // 2
        self.enc = nn.ModuleList()
        for _ in range(n_layers):
            self.enc.append(nn.Sequential(
                # nn.GLU(dim=1) halves the channels, so emit 2 * hidden_channels;
                # with a hidden -> hidden conv the residual add below raised a
                # size-mismatch error.
                # NOTE(review): checkpoints saved with the old conv shape will
                # not load into this layer; confirm against the training code.
                nn.Conv1d(hidden_channels, hidden_channels * 2, kernel_size,
                          padding=same_padding,
                          dilation=dilation_rate),
                nn.GLU(dim=1),
            ))
        self.post = nn.Conv1d(hidden_channels, channels * 2, 1)
        # Start as the identity transform: zero shift and zero log-scale.
        self.post.weight.data.zero_()
        self.post.bias.data.zero_()

    def forward(self, x, reverse=False):
        """Apply the transform.

        Returns ``(y, logdet)`` when ``reverse`` is false, where ``logdet`` is
        the sum of the clamped log-scales over every element (batch included),
        and just ``y`` when ``reverse`` is true.
        """
        h = self.pre(x)
        for layer in self.enc:
            h = h + layer(h)
        m, logs = self.post(h).chunk(2, dim=1)
        # Clamp so exp() cannot overflow or underflow to a degenerate scale.
        log_s = torch.clamp(logs, -5.0, 5.0)
        if reverse:
            return (x - m) * torch.exp(-log_s)
        y = m + x * torch.exp(log_s)
        logdet = torch.sum(log_s)
        return y, logdet
class Flip(nn.Module):
    """Reverse the element order along dim 1.

    The forward direction also returns a constant ``0`` as the second item so
    it has the same ``(output, logdet)`` shape as the coupling layers; the
    reverse direction returns only the tensor.
    """

    def forward(self, x, reverse=False):
        flipped = torch.flip(x, dims=[1])
        if reverse:
            return flipped
        return flipped, 0
class ResidualCouplingBlock(nn.Module):
    """Chain of ``n_flows`` (ResidualCouplingLayer, Flip) pairs.

    In the forward direction the stages run in construction order and the
    second item each stage returns is discarded; with ``reverse=True`` the
    stages run in the opposite order.
    """

    def __init__(self, channels, hidden_channels, kernel_size=5, dilation_rate=1, n_flows=4, n_layers=4):
        super().__init__()
        stages = []
        for _ in range(n_flows):
            stages.append(
                ResidualCouplingLayer(channels, hidden_channels, kernel_size, dilation_rate, n_layers)
            )
            stages.append(Flip())
        self.flows = nn.ModuleList(stages)

    def forward(self, x, reverse=False):
        if reverse:
            for stage in reversed(self.flows):
                x = stage(x, reverse=reverse)
            return x
        for stage in self.flows:
            # Each stage returns (output, logdet); only the output is kept.
            x, _ = stage(x, reverse=reverse)
        return x
class Decoder(nn.Module):
    """Gated residual 1-D conv stack mapping hidden features to ``out_channels``.

    Args:
        hidden_channels: Channel count of the input and of the residual stream.
        out_channels: Channel count produced by the final 1x1 projection.
        kernel_size: Kernel size of the gated convolutions.
        dilation_rate: Dilation of the gated convolutions.
        n_layers: Number of residual gated convolution layers.
    """

    def __init__(self, hidden_channels, out_channels, kernel_size=5, dilation_rate=1, n_layers=4):
        super().__init__()
        self.pre = nn.Conv1d(hidden_channels, hidden_channels, 1)
        # "Same" padding for odd kernels so the residual add lines up in time.
        same_padding = (kernel_size - 1) * dilation_rate // 2
        self.dec = nn.ModuleList()
        for _ in range(n_layers):
            self.dec.append(nn.Sequential(
                # nn.GLU(dim=1) halves the channels, so emit 2 * hidden_channels;
                # with a hidden -> hidden conv the residual add in forward()
                # raised a size-mismatch error.
                # NOTE(review): checkpoints saved with the old conv shape will
                # not load into this layer; confirm against the training code.
                nn.Conv1d(hidden_channels, hidden_channels * 2, kernel_size,
                          padding=same_padding,
                          dilation=dilation_rate),
                nn.GLU(dim=1),
            ))
        self.proj = nn.Conv1d(hidden_channels, out_channels, 1)

    def forward(self, x):
        """Run the residual stack and project to ``out_channels``."""
        x = self.pre(x)
        for layer in self.dec:
            x = x + layer(x)
        return self.proj(x)
class RVCModel(nn.Module):
    """VITS-like RVC v3.0 Model.

    Pipeline: encode the mel input to ``(m, logs)``, optionally add noise
    scaled by ``exp(logs)``, pass the latent through the flow and straight
    back through its reverse, then decode to ``n_mels`` channels.

    Args:
        n_mels: Channel count of the mel input and output.
        hidden_channels: Width of the latent representation.
    """

    def __init__(self, n_mels=80, hidden_channels=192):
        super().__init__()
        self.enc_p = PosteriorEncoder(n_mels, hidden_channels)
        self.flow = ResidualCouplingBlock(hidden_channels, hidden_channels)
        self.dec = Decoder(hidden_channels, n_mels)
        self.n_mels = n_mels

    def forward(self, mel):
        """Deterministic pass; equivalent to ``infer(mel, noise_scale=0.0)``."""
        return self.infer(mel, noise_scale=0.0)

    def infer(self, mel, noise_scale=0.0):
        """Convert ``mel`` and return the decoded mel tensor.

        Args:
            mel: Input passed to the encoder (channels on dim 1).
            noise_scale: Multiplier for the sampled noise; ``0`` disables it.
        """
        m, logs = self.enc_p(mel)
        if noise_scale == 0:
            # Skip the noise term: `randn * exp(logs) * 0.0` becomes NaN when
            # exp(logs) overflows to inf, and it wastes a random draw.
            z = m
        else:
            z = m + torch.randn_like(logs) * torch.exp(logs) * noise_scale
        z_p = self.flow(z)
        z_back = self.flow(z_p, reverse=True)
        return self.dec(z_back)
| # ============================================================ | |
| # HiFi-GAN Vocoder | |
| # ============================================================ | |
class ResBlock1(nn.Module):
    """Stack of residual branches, one per dilation rate.

    Each branch is LeakyReLU(0.1) -> dilated conv -> LeakyReLU(0.1) ->
    undilated conv, and its output is added back onto its input.
    """

    def __init__(self, channels, kernel_size=3, dilation=(1, 3, 5)):
        super().__init__()
        plain_padding = (kernel_size - 1) // 2
        branches = []
        for rate in dilation:
            dilated_conv = nn.Conv1d(
                channels, channels, kernel_size,
                dilation=rate, padding=(kernel_size - 1) * rate // 2,
            )
            plain_conv = nn.Conv1d(
                channels, channels, kernel_size,
                dilation=1, padding=plain_padding,
            )
            branches.append(nn.Sequential(
                nn.LeakyReLU(0.1),
                dilated_conv,
                nn.LeakyReLU(0.1),
                plain_conv,
            ))
        self.convs = nn.ModuleList(branches)

    def forward(self, x):
        out = x
        for branch in self.convs:
            out = out + branch(out)
        return out
class HiFiGANGenerator(nn.Module):
    """HiFi-GAN style generator: mel channels in, single-channel waveform out.

    Each upsampling stage halves the channel count with a transposed
    convolution and then averages the outputs of one ResBlock1 per entry in
    ``resblock_kernel_sizes``. The final stage is LeakyReLU -> conv -> tanh.
    """

    def __init__(self, in_channels=80, upsample_rates=(8, 8, 2, 2),
                 upsample_kernel_sizes=(16, 16, 4, 4),
                 upsample_initial_channel=512,
                 resblock_kernel_sizes=(3, 7, 11),
                 resblock_dilation_sizes=((1, 3, 5), (1, 3, 5), (1, 3, 5))):
        super().__init__()
        self.conv_pre = nn.Conv1d(in_channels, upsample_initial_channel, 7, padding=3)
        self.num_upsamples = len(upsample_rates)
        self.num_kernels = len(resblock_kernel_sizes)
        self.ups = nn.ModuleList()
        self.resblocks = nn.ModuleList()

        width = upsample_initial_channel
        for rate, kernel in zip(upsample_rates, upsample_kernel_sizes):
            narrowed = width // 2
            self.ups.append(
                nn.ConvTranspose1d(width, narrowed, kernel, rate, padding=(kernel - rate) // 2)
            )
            # Resblocks are stored flat: stage i owns indices
            # [i * num_kernels, (i + 1) * num_kernels).
            for res_kernel, res_dilation in zip(resblock_kernel_sizes, resblock_dilation_sizes):
                self.resblocks.append(ResBlock1(narrowed, res_kernel, res_dilation))
            width = narrowed

        self.conv_post = nn.Sequential(
            nn.LeakyReLU(0.1),
            nn.Conv1d(width, 1, 7, padding=3),
            nn.Tanh(),
        )

    def forward(self, x):
        x = self.conv_pre(x)
        for stage in range(self.num_upsamples):
            x = self.ups[stage](F.leaky_relu(x, 0.1))
            first = stage * self.num_kernels
            total = 0
            for offset in range(self.num_kernels):
                total = total + self.resblocks[first + offset](x)
            x = total / self.num_kernels
        return self.conv_post(x)
| # ============================================================ | |
| # Mel utilities (no librosa) | |
| # ============================================================ | |
def compute_mel(y, sample_rate=40000, n_fft=1024, hop_length=256, n_mels=80):
    """Return the natural-log power mel spectrogram of waveform ``y``.

    The mel filterbank is HTK-scaled, unnormalised, and spans 0 Hz to
    ``sample_rate // 2``. Power values are floored at 1e-5 before the log.
    """
    nyquist = float(sample_rate // 2)
    to_mel = torchaudio.transforms.MelSpectrogram(
        sample_rate=sample_rate,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels,
        f_min=0.0,
        f_max=nyquist,
        power=2.0,
        norm=None,
        mel_scale="htk",
    )
    power_mel = to_mel(y)
    floored = torch.clamp(power_mel, min=1e-5)
    return torch.log(floored)
def mel_to_audio_griffinlim(mel, sample_rate=40000, n_fft=1024, hop_length=256, n_iter=32):
    """Reconstruct a waveform (NumPy array) from a log-mel spectrogram.

    ``mel`` is read as ``(n_mels, frames)``, since the mel-bin count comes
    from ``mel.shape[0]``. The log is undone with ``exp``, the mel bins are
    mapped back to linear-frequency bins, and Griffin-Lim estimates the phase.
    """
    n_mel_bins = mel.shape[0]
    nyquist = float(sample_rate // 2)
    mel_to_linear = torchaudio.transforms.InverseMelScale(
        n_stft=n_fft // 2 + 1,
        n_mels=n_mel_bins,
        sample_rate=sample_rate,
        f_min=0,
        f_max=nyquist,
        mel_scale="htk",
    )
    linear_spec = mel_to_linear(torch.exp(mel))
    phase_estimator = torchaudio.transforms.GriffinLim(
        n_fft=n_fft, hop_length=hop_length, n_iter=n_iter
    )
    waveform = phase_estimator(linear_spec)
    return waveform.numpy()
| # ============================================================ | |
| # Inference Engine | |
| # ============================================================ | |
class VoiceCloner:
    """Inference engine: loads the models once and converts audio files.

    Attributes set in ``__init__``:
        rvc_model: The conversion model, or ``None`` when loading failed.
        hifigan: The neural vocoder, or ``None`` to use the Griffin-Lim fallback.
        sample_rate: Working sample rate; all input is resampled to it.
        dataset_id: Hugging Face dataset repo holding the model and samples.
        model_loaded: True only when the conversion model loaded successfully.
        samples: Repo paths of sample WAV files usable by ``generate_random``.
    """

    def __init__(self):
        self.device = torch.device('cpu')
        self.rvc_model = None
        self.hifigan = None
        self.sample_rate = 40000
        self.dataset_id = "ayf3/numberblocks-one-voice-dataset"
        self.model_loaded = False
        self.samples = []
        self.load_models()

    # ------------------------------------------------------------------
    # Model loading
    # ------------------------------------------------------------------
    @staticmethod
    def _load_weights(module, state_dict, label):
        """Load ``state_dict`` non-strictly, report mismatches, reject empty loads.

        ``strict=False`` silently ignores missing and unexpected keys, so a
        checkpoint from a different architecture would otherwise leave the
        module at its random initialisation while looking "loaded".

        Raises:
            RuntimeError: If none of the module's parameters were in the checkpoint.
        """
        result = module.load_state_dict(state_dict, strict=False)
        n_missing = len(result.missing_keys)
        n_unexpected = len(result.unexpected_keys)
        if n_missing or n_unexpected:
            print(f"{label}: {n_missing} missing / {n_unexpected} unexpected checkpoint keys")
        n_expected = len(module.state_dict())
        if n_expected and n_missing >= n_expected:
            raise RuntimeError(f"{label}: checkpoint contains no parameters matching the model")

    def _load_rvc(self):
        """Download and load the conversion model; leaves ``rvc_model`` None on failure."""
        print("Loading RVC model...")
        try:
            model_path = hf_hub_download(
                repo_id=self.dataset_id, filename="models/one_voice_rvc_v2.pth", repo_type="dataset"
            )
            # SECURITY NOTE: weights_only=False unpickles arbitrary objects, so
            # this trusts the repository contents. Switch to weights_only=True
            # if the checkpoint holds only tensors.
            ckpt = torch.load(model_path, map_location='cpu', weights_only=False)
            state_dict = ckpt.get('model', ckpt.get('state_dict', ckpt))
            # Infer the hidden width from the checkpoint; fall back to 192.
            hidden_ch = next(
                (v.shape[0] for k, v in state_dict.items() if 'enc_p.pre.weight' in k),
                192,
            )
            model = RVCModel(n_mels=80, hidden_channels=hidden_ch)
            self._load_weights(model, state_dict, "RVC model")
            model.eval()
            # Publish only after a successful load, so a failure cannot leave a
            # randomly initialised model flagged as loaded.
            self.rvc_model = model
            print(f"RVC model loaded (hidden={hidden_ch})")
        except Exception as e:
            self.rvc_model = None
            print(f"RVC model load failed: {e}")

    def _load_vocoder(self):
        """Download and load HiFi-GAN; leaves ``hifigan`` None (Griffin-Lim) on failure."""
        print("Loading HiFi-GAN vocoder...")
        try:
            hifigan_path = hf_hub_download(repo_id="jik876/hifi-gan", filename="UNIVERSAL_V1/g_02500000")
            # SECURITY NOTE: see _load_rvc; weights_only=False trusts the checkpoint.
            ckpt = torch.load(hifigan_path, map_location='cpu', weights_only=False)
            state_dict = ckpt.get('generator', ckpt.get('state_dict', ckpt))
            prefix = 'generator.'
            if any(k.startswith(prefix) for k in state_dict):
                # Strip only the leading prefix, not later occurrences in the key.
                state_dict = {k[len(prefix):]: v for k, v in state_dict.items() if k.startswith(prefix)}
            vocoder = HiFiGANGenerator()
            self._load_weights(vocoder, state_dict, "HiFi-GAN")
            vocoder.eval()
            # NOTE(review): confirm this checkpoint was trained on mels that
            # match compute_mel (sample rate, hop length, log floor).
            self.hifigan = vocoder
            print("HiFi-GAN vocoder loaded")
        except Exception as e:
            print(f"HiFi-GAN load failed: {e}, using Griffin-Lim fallback")
            self.hifigan = None

    def _list_samples(self):
        """Collect sample WAVs from the dataset repo, skipping augmented variants."""
        try:
            api = HfApi()
            files = api.list_repo_files(self.dataset_id, repo_type="dataset")
            self.samples = [f for f in files if f.startswith('models/top_')
                            and f.endswith('.wav')
                            and '_p+' not in f and '_p-' not in f and '_s+' not in f]
            print(f"Found {len(self.samples)} sample audio files")
        except Exception as e:
            print(f"Could not list samples: {e}")

    def load_models(self):
        """Load the conversion model, the vocoder and the sample list (best effort)."""
        self._load_rvc()
        self._load_vocoder()
        self._list_samples()
        self.model_loaded = self.rvc_model is not None

    # ------------------------------------------------------------------
    # Audio helpers
    # ------------------------------------------------------------------
    def _load_waveform(self, path):
        """Read ``path`` as a mono 1-D tensor at ``self.sample_rate``."""
        y, sr = torchaudio.load(path)
        y = y.mean(dim=0) if y.shape[0] > 1 else y.squeeze(0)
        if sr != self.sample_rate:
            y = torchaudio.transforms.Resample(sr, self.sample_rate)(y)
        return y

    @staticmethod
    def _shift_pitch(y, semitones):
        """Shift pitch by linear resampling; this also changes the duration."""
        if semitones == 0:
            return y
        factor = 2.0 ** (abs(semitones) / 12.0)
        new_len = int(len(y) / factor) if semitones > 0 else int(len(y) * factor)
        new_len = max(new_len, 1)  # interpolate() rejects a zero-length target
        stretched = F.interpolate(y.unsqueeze(0).unsqueeze(0), size=new_len, mode='linear')
        return stretched.squeeze(0).squeeze(0)

    @staticmethod
    def _trim_silence(y, sr):
        """Drop leading and trailing audio more than 20 dB below the peak energy.

        Energy is smoothed with a 100 ms moving average. Clips no longer than
        the window are returned unchanged.
        """
        energy = y ** 2
        window_size = int(0.1 * sr)
        if len(energy) <= window_size:
            return y
        kernel = torch.ones(window_size) / window_size
        smooth_energy = F.conv1d(
            energy.unsqueeze(0).unsqueeze(0), kernel.unsqueeze(0).unsqueeze(0), padding=window_size // 2
        ).squeeze()
        threshold = smooth_energy.max() * (10 ** (-20 / 10))
        active = torch.where(smooth_energy > threshold)[0]
        if len(active) == 0:
            return y
        return y[active[0]:active[-1] + 1]

    def _vocode(self, mel_out):
        """Turn a ``(n_mels, frames)`` log-mel into ``(waveform ndarray, vocoder name)``."""
        if self.hifigan is not None:
            with torch.no_grad():
                audio = self.hifigan(mel_out.unsqueeze(0))
            return audio.squeeze(0).squeeze(0).cpu().numpy(), "HiFi-GAN"
        # The keyword is `sample_rate`; the previous `sr=` raised TypeError, so
        # the fallback path never worked.
        return mel_to_audio_griffinlim(mel_out, sample_rate=self.sample_rate), "Griffin-Lim"

    # ------------------------------------------------------------------
    # Public entry points (bound to the Gradio interfaces)
    # ------------------------------------------------------------------
    def process_audio(self, input_audio, pitch_shift=0):
        """Convert an audio file and return ``(output_wav_path, status_message)``.

        Args:
            input_audio: Path of the audio file to convert, or ``None``.
            pitch_shift: Pitch shift in semitones applied before conversion.

        Returns:
            ``(path, message)`` on success, ``(None, message)`` otherwise.
            Exceptions are caught and reported in the message, not raised.
        """
        if not self.model_loaded:
            return None, "Model not loaded"
        if input_audio is None:
            return None, "Please upload an audio file"
        try:
            sr = self.sample_rate
            y = self._load_waveform(input_audio)
            if len(y) == 0:
                return None, "Error: audio is empty"
            y = self._shift_pitch(y, pitch_shift)
            y = self._trim_silence(y, sr)
            y = y[:10 * sr]  # cap the input at 10 seconds

            mel = compute_mel(y, sample_rate=sr, n_mels=80)
            with torch.no_grad():
                mel_out = self.rvc_model.infer(mel.unsqueeze(0), noise_scale=0.0)
            mel_out = mel_out.squeeze(0)

            audio_out, vocoder_name = self._vocode(mel_out)
            # Peak-normalise to 0.95; the epsilon avoids dividing by zero on silence.
            audio_out = audio_out / (np.max(np.abs(audio_out)) + 1e-7) * 0.95

            # mktemp() is deprecated and race-prone; reserve the file atomically.
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
                output_path = tmp.name
            sf.write(output_path, audio_out, sr)
            return output_path, f"Success ({vocoder_name}) | {len(y)/sr:.1f}s -> {len(audio_out)/sr:.1f}s"
        except Exception as e:
            import traceback
            traceback.print_exc()
            return None, f"Error: {str(e)}"

    def generate_random(self):
        """Convert a randomly chosen dataset sample; returns ``(path, message)``."""
        if not self.samples:
            return None, "No samples available"
        try:
            sample = random.choice(self.samples)
            sample_path = hf_hub_download(repo_id=self.dataset_id, filename=sample, repo_type="dataset")
            output, msg = self.process_audio(sample_path)
            if output:
                return output, f"{msg}\nSample: {Path(sample).name}"
            return output, msg
        except Exception as e:
            return None, f"Error: {str(e)}"
| # ============================================================ | |
| # Gradio UI | |
| # ============================================================ | |
# Build the engine once at import time; both tabs share this instance.
print("Initializing NumberBlocks One Voice Cloner...")
cloner = VoiceCloner()


def _result_outputs():
    """Create a new (result audio, status text) output component pair."""
    return [
        gr.Audio(label="Result", type="filepath"),
        gr.Textbox(label="Status"),
    ]


# Tab 1: convert an uploaded file, with an optional pitch shift.
vc_interface = gr.Interface(
    fn=cloner.process_audio,
    inputs=[
        gr.Audio(label="Upload Audio", type="filepath"),
        gr.Slider(minimum=-12, maximum=12, value=0, step=1, label="Pitch Shift (semitones)"),
    ],
    outputs=_result_outputs(),
    title="NumberBlocks One Voice Cloner",
    description="RVC v2 Model (60.7MB) + HiFi-GAN Vocoder | Upload audio to convert to One's voice",
    allow_flagging="never",
)

# Tab 2: convert a randomly chosen sample from the dataset repo.
rand_interface = gr.Interface(
    fn=cloner.generate_random,
    inputs=[],
    outputs=_result_outputs(),
    title="Random Sample Generation",
    description="Generate from random dataset sample + RVC conversion",
    allow_flagging="never",
)

demo = gr.TabbedInterface(
    [vc_interface, rand_interface],
    ["Voice Conversion", "Random Sample"],
)

if __name__ == "__main__":
    # Listen on all interfaces on port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)