#!/usr/bin/env python3
"""
Hugging Face Space - Numberblocks One Voice Extraction

Bug-fix release v2 - fixes NumPy compatibility and startup errors.

Main improvements:
1. Improved NumPy compatibility handling
2. Stronger startup error handling
3. Added a diagnostics endpoint
4. Flask still starts even if model loading fails
"""

import json
import os
import sys
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
from datetime import datetime, timedelta
from pathlib import Path
from threading import Event, Thread

import numpy as np

# NumPy 2.x removed the ``np.NaN`` alias; restore it so that dependencies
# still written against NumPy 1.x keep importing.
if not hasattr(np, 'NaN'):
    np.NaN = getattr(np, 'nan', float('nan'))

# Verify that the compatibility patch took effect.
try:
    test_nan = np.NaN
    print(f"✓ NumPy 兼容性检查通过: np.NaN = {test_nan}")
except Exception as e:
    print(f"✗ NumPy 兼容性检查失败: {e}")
    sys.exit(1)

# Capture import errors instead of crashing, so the web server can still
# start and report them. Flask is imported first: if it succeeds, a later
# failure (torch, pyannote, ...) leaves ``Flask``/``jsonify`` usable.
import_error = None
try:
    from flask import Flask, jsonify
    from huggingface_hub import HfApi, upload_file, hf_hub_download
    import torch
    from pyannote.audio import Pipeline
    import librosa
    import soundfile as sf
    print("✓ 所有依赖导入成功")
except Exception as e:
    import_error = str(e)
    print(f"✗ 依赖导入失败: {import_error}")

# ========== Configuration ==========
TARGET_DATASET_ID = "ayf3/numberblocks-one-voice-dataset"
SOURCE_DATASET_ID = "ayf3/numberblocks-audio"
OUTPUT_DIR = Path("/data/output")
ONE_AUDIO_DIR = OUTPUT_DIR / "one_audio"
CACHE_DIR = Path("/data/hf_cache")
DOWNLOAD_DIR = Path("/data/download")
REPORT_FILE = OUTPUT_DIR / "processing_report.json"
MODEL_ID = "pyannote/speaker-diarization-3.1"
MAX_AUDIO_DURATION = 310  # seconds; longer files are skipped
PROCESSING_TIMEOUT = 1800  # seconds allowed per file
NUM_PROCESSES = 4  # only reported in logs/status; processing is sequential

# Create working directories.
for dir_path in [CACHE_DIR, OUTPUT_DIR, ONE_AUDIO_DIR, DOWNLOAD_DIR]:
    dir_path.mkdir(parents=True, exist_ok=True)

# Environment variables.
HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    print("⚠️ 警告: HF_TOKEN 环境变量未设置", flush=True)

# Point the Hugging Face caches at the configured cache directory.
os.environ['HF_HOME'] = str(CACHE_DIR)
os.environ['HUGGINGFACE_HUB_CACHE'] = str(CACHE_DIR / 'hub')

# ========== Global state ==========
pipeline_model = None
processing_status = {
    'state': 'initializing',
    'total_files': 0,
    'processed_files': 0,
    'skipped_files': 0,
    'current_file': None,
    'output_files': [],
    'uploaded_files': [],
    'start_time': None,
    'error': None,
    'model_loaded': False,
    'last_update': None,
    'estimated_completion': None,
    'current_file_duration': None,
    'average_processing_time': None,
    'total_skipped_duration': 0,
    'resumed_files': 0,
    'startup_error': import_error,  # import failure captured at startup
}


# ========== Helpers ==========
def log(message, level="INFO"):
    """Print a timestamped, level-tagged log line and flush stdout."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # flush=True already flushes sys.stdout, so no separate flush is needed.
    print(f"[{timestamp}] [{level}] {message}", flush=True)


def update_status(key, value):
    """Set ``processing_status[key]`` and refresh its ``last_update`` stamp."""
    processing_status[key] = value
    processing_status['last_update'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def format_duration(seconds):
    """Format a number of seconds as ``HH:MM:SS`` (fractions truncated)."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"


def upload_single_file(file_path, retry_count=3):
    """Upload one file to the target dataset under ``audio/``.

    Retries up to ``retry_count`` times, sleeping 5 seconds between
    attempts. Returns True on success and False otherwise; never raises.
    """
    try:
        file_name = Path(file_path).name
        for attempt in range(retry_count):
            try:
                log(f" 📤 上传: {file_name} (尝试 {attempt+1}/{retry_count})")
                upload_file(
                    path_or_fileobj=str(file_path),
                    repo_id=TARGET_DATASET_ID,
                    path_in_repo=f"audio/{file_name}",
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
                update_status('uploaded_files',
                              processing_status['uploaded_files'] + [file_name])
                log(" ✅ 上传成功")
                return True
            except Exception as e:
                log(f" ⚠️ 上传失败: {str(e)[:100]}", "WARNING")
                if attempt < retry_count - 1:
                    time.sleep(5)
                else:
                    log(f" ❌ 上传失败(已重试{retry_count}次)", "ERROR")
                    return False
        # Only reached when retry_count <= 0: nothing was attempted.
        return False
    except Exception as e:
        log(f" ❌ 上传异常: {str(e)[:100]}", "ERROR")
        return False


def download_audio_file(audio_file):
    """Download one audio file from the source dataset.

    ``audio_file`` is the repo-relative filename. Returns the local path as
    a string, or None when the download fails.
    """
    try:
        # hf_hub_download replicates the repo-relative path below local_dir,
        # so a file inside a repo subfolder does NOT land directly in
        # DOWNLOAD_DIR. For flat filenames this equals DOWNLOAD_DIR / name.
        audio_path = DOWNLOAD_DIR / audio_file
        if audio_path.exists():
            log(f" ✅ 已存在: {audio_file}")
            return str(audio_path)

        log(f" 📥 下载: {audio_file}")
        downloaded_path = hf_hub_download(
            repo_id=SOURCE_DATASET_ID,
            filename=audio_file,
            repo_type="dataset",
            local_dir=str(DOWNLOAD_DIR),
            token=HF_TOKEN,
            cache_dir=str(CACHE_DIR),
        )
        log(" ✅ 下载完成")
        # Trust the path reported by the hub client rather than a guess.
        return str(downloaded_path)
    except Exception as e:
        log(f" ❌ 下载失败: {str(e)[:100]}", "ERROR")
        return None


def load_model():
    """Load the speaker-diarization pipeline into ``pipeline_model``.

    Moves the pipeline to CUDA when available, otherwise to CPU. Returns
    True on success; on failure logs the traceback, stores the message in
    ``processing_status['error']`` and returns False.
    """
    global pipeline_model
    try:
        log("🔄 加载说话人分离模型...")
        log(f" 模型: {MODEL_ID}")
        use_cuda = torch.cuda.is_available()
        log(f" 设备: {'cuda' if use_cuda else 'cpu'}")

        pipeline_model = Pipeline.from_pretrained(
            MODEL_ID,
            use_auth_token=HF_TOKEN,
        )
        # from_pretrained may return None instead of raising (e.g. gated
        # model whose conditions were not accepted, or an invalid token);
        # fail with a clear message rather than an AttributeError on .to().
        if pipeline_model is None:
            raise RuntimeError(
                f"Pipeline.from_pretrained returned None for {MODEL_ID}; "
                "check HF_TOKEN and that the model's user conditions are accepted"
            )

        if use_cuda:
            pipeline_model.to(torch.device("cuda"))
            log(" ✅ 模型已加载 (GPU)")
        else:
            pipeline_model.to(torch.device("cpu"))
            log(" ✅ 模型已加载 (CPU)")

        update_status('model_loaded', True)
        return True
    except Exception as e:
        error_msg = str(e)
        log(f"❌ 模型加载失败: {error_msg}", "ERROR")
        log(traceback.format_exc(), "ERROR")
        update_status('error', error_msg)
        return False


def get_audio_files_from_dataset():
    """Return the sorted list of ``.wav`` files in the source dataset.

    Returns an empty list when the listing fails.
    """
    try:
        api = HfApi(token=HF_TOKEN)
        files = api.list_repo_files(
            repo_id=SOURCE_DATASET_ID,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        audio_files = [f for f in files if f.endswith('.wav')]
        log(f"📁 找到 {len(audio_files)} 个音频文件")
        return sorted(audio_files)
    except Exception as e:
        log(f"❌ 获取文件列表失败: {str(e)}", "ERROR")
        return []


def process_with_timeout(func, *args, timeout=PROCESSING_TIMEOUT, **kwargs):
    """Run ``func(*args, **kwargs)`` in a worker thread with a time limit.

    Returns the function's result. Raises ``TimeoutError`` if no result is
    available within ``timeout`` seconds; exceptions raised by ``func``
    propagate unchanged.
    """
    # Deliberately not a ``with`` block: the context manager calls
    # shutdown(wait=True) on exit, which would block until the worker
    # finishes and make the timeout ineffective.
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args, **kwargs)
    try:
        return future.result(timeout=timeout)
    except FutureTimeoutError as exc:
        log(f" ❌ 处理超时(超过 {timeout} 秒)", "ERROR")
        future.cancel()
        raise TimeoutError(f"Processing timeout after {timeout} seconds") from exc
    finally:
        # NOTE: a running thread cannot be killed; after a timeout the
        # worker keeps running in the background until ``func`` returns.
        executor.shutdown(wait=False)


def _run_diarization_with_heartbeat(audio_path, audio_file):
    """Run the diarization pipeline, logging a heartbeat every 60 seconds."""
    stop_heartbeat = Event()

    def heartbeat():
        # Event.wait returns False on timeout and True once the event is
        # set, so the loop ends as soon as diarization finishes.
        while not stop_heartbeat.wait(60):
            log(f" 💓 处理中... ({audio_file})")

    Thread(target=heartbeat, daemon=True).start()
    try:
        return pipeline_model(audio_path)
    finally:
        # Always stop the heartbeat; otherwise every processed file would
        # leave behind a thread that logs forever.
        stop_heartbeat.set()


def process_single_file_internal(audio_file, file_idx, total_files):
    """Extract the dominant speaker of one audio file and upload the result.

    Returns a tuple ``(success, output_filename, duration, skip_reason)``.
    Never raises: any exception is logged and reported as a failure tuple.
    """
    try:
        output_filename = f"{Path(audio_file).stem}_one.wav"
        output_path = ONE_AUDIO_DIR / output_filename

        # Resume support: reuse an existing non-empty output file.
        if output_path.exists():
            log(f"\n{'='*70}")
            log(f"[{file_idx}/{total_files}] ⏭️ 跳过(已存在): {audio_file}")
            update_status('current_file', audio_file)
            file_size = output_path.stat().st_size
            if file_size > 0:
                # Count the file as resumed only when it is actually reused.
                update_status('resumed_files', processing_status['resumed_files'] + 1)
                log(f" ✅ 文件已存在: {output_filename} ({file_size/1024/1024:.2f} MB)")
                return (True, output_filename, 0, "已存在")
            log(" ⚠️ 文件损坏,重新处理", "WARNING")
            output_path.unlink()

        log(f"\n{'='*70}")
        log(f"[{file_idx}/{total_files}] 🎵 处理: {audio_file}")
        update_status('current_file', audio_file)

        audio_path = download_audio_file(audio_file)
        if not audio_path:
            log(" ❌ 下载失败", "ERROR")
            return (False, None, 0, None)

        log(" 📊 分析音频...")
        audio, sr = librosa.load(audio_path, sr=16000)
        audio_duration = len(audio) / sr
        update_status('current_file_duration', audio_duration)
        log(f" ⏱️ 音频时长: {format_duration(audio_duration)}")

        if audio_duration > MAX_AUDIO_DURATION:
            log(f" ⏭️ 跳过:超过时长限制 ({MAX_AUDIO_DURATION}秒)", "WARNING")
            update_status('skipped_files', processing_status['skipped_files'] + 1)
            update_status('total_skipped_duration',
                          processing_status['total_skipped_duration'] + audio_duration)
            if Path(audio_path).exists():
                Path(audio_path).unlink()
            return (False, None, audio_duration,
                    f"超过时长限制 ({format_duration(audio_duration)})")

        log(" 🎯 执行说话人分离...")
        log(f" ⏰ 开始时间: {datetime.now().strftime('%H:%M:%S')}")
        start_time = time.time()
        diarization = _run_diarization_with_heartbeat(audio_path, audio_file)
        elapsed = time.time() - start_time
        log(f" ✅ 分离完成,耗时: {format_duration(elapsed)}")

        if diarization is None:
            log(" ❌ 说话人分离失败", "ERROR")
            return (False, None, audio_duration, "分离失败")

        log(" 👥 分析说话人...")
        # Materialise the speaker turns once; they are needed both for the
        # per-speaker totals and for the extraction below.
        turns = [
            (turn.start, turn.end, speaker)
            for turn, _, speaker in diarization.itertracks(yield_label=True)
        ]
        speaker_durations = {}
        for turn_start, turn_end, speaker in turns:
            speaker_durations[speaker] = (
                speaker_durations.get(speaker, 0.0) + (turn_end - turn_start)
            )

        if not speaker_durations:
            log(" ⚠️ 未检测到任何说话人", "WARNING")
            return (False, None, audio_duration, "未检测到说话人")

        # The speaker with the longest total speaking time is kept.
        main_speaker = max(speaker_durations, key=speaker_durations.get)
        main_speaker_duration = speaker_durations[main_speaker]

        log(f" 👤 检测到 {len(speaker_durations)} 个说话人")
        for spk, dur in sorted(speaker_durations.items(), key=lambda x: -x[1]):
            log(f" - {spk}: {format_duration(dur)}")
        log(f" ✅ 选择主要说话人: {main_speaker} ({format_duration(main_speaker_duration)})")

        # Output has the same length as the input; everything outside the
        # main speaker's turns stays silent (zeros).
        one_audio = np.zeros_like(audio)
        one_segments = 0
        one_duration = 0.0
        for turn_start, turn_end, speaker in turns:
            if speaker != main_speaker:
                continue
            start_sample = int(turn_start * sr)
            end_sample = int(turn_end * sr)
            one_audio[start_sample:end_sample] = audio[start_sample:end_sample]
            one_segments += 1
            one_duration += turn_end - turn_start

        sf.write(output_path, one_audio, sr)
        # Guard against zero-length audio to avoid a ZeroDivisionError.
        coverage = one_duration * 100 / audio_duration if audio_duration else 0.0
        log(f" ✅ 提取成功: {one_segments} 个片段, {one_duration:.1f}秒 ({coverage:.1f}% of audio)")

        # Upload is best-effort; upload_single_file logs its own failures.
        upload_single_file(output_path)

        update_status('processed_files', file_idx)
        update_status('output_files', processing_status['output_files'] + [output_filename])

        if processing_status['start_time']:
            started_at = datetime.strptime(processing_status['start_time'],
                                           "%Y-%m-%d %H:%M:%S")
            elapsed_total = (datetime.now() - started_at).total_seconds()
            avg_time = elapsed_total / file_idx
            update_status('average_processing_time', avg_time)

            remaining_files = total_files - file_idx
            eta_seconds = avg_time * remaining_files
            eta_datetime = datetime.now() + timedelta(seconds=eta_seconds)
            update_status('estimated_completion',
                          eta_datetime.strftime("%Y-%m-%d %H:%M:%S"))

            log(f" 📊 进度: {file_idx}/{total_files} ({file_idx*100//total_files}%)")
            log(f" ⏱️ 平均处理时间: {format_duration(avg_time)}/文件")
            log(f" 🎯 预计完成: {processing_status['estimated_completion']}")

        # The downloaded source is no longer needed once the output exists.
        if audio_path and Path(audio_path).exists():
            Path(audio_path).unlink()

        return (True, output_filename, one_duration, None)

    except Exception as e:
        log(f" ❌ 处理失败: {str(e)[:100]}", "ERROR")
        log(traceback.format_exc(), "ERROR")
        return (False, None, 0, str(e)[:100])


def process_single_file(audio_file, file_idx, total_files):
    """Process one audio file under the ``PROCESSING_TIMEOUT`` limit.

    Returns the same ``(success, output_filename, duration, skip_reason)``
    tuple as ``process_single_file_internal``; a timeout is reported as a
    failure and counted as a skipped file.
    """
    try:
        return process_with_timeout(
            process_single_file_internal,
            audio_file,
            file_idx,
            total_files,
            timeout=PROCESSING_TIMEOUT,
        )
    except TimeoutError as e:
        log(f" ❌ 处理超时: {str(e)}", "ERROR")
        update_status('skipped_files', processing_status['skipped_files'] + 1)
        return (False, None, 0, "处理超时")
    except Exception as e:
        log(f" ❌ 处理异常: {str(e)[:100]}", "ERROR")
        log(traceback.format_exc(), "ERROR")
        return (False, None, 0, str(e)[:100])


def process_all_files():
    """Process every source audio file sequentially and publish a report.

    Loads the model, lists the source dataset, processes each file, writes
    ``REPORT_FILE`` and uploads it. Progress and errors are reflected in
    ``processing_status``; the function itself never raises.
    """
    try:
        update_status('state', 'running')
        update_status('start_time', datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

        log("="*70)
        log("🚀 开始处理音频文件(修复版 v2 + NumPy 兼容性修复)")
        log(f"⚡ 音频时长限制: {format_duration(MAX_AUDIO_DURATION)}")
        log(f"⏰ 处理超时: {format_duration(PROCESSING_TIMEOUT)}")
        log(f"🔧 并行进程数: {NUM_PROCESSES}")
        log("="*70)

        if not load_model():
            update_status('state', 'error')
            # Keep the detailed message recorded by load_model(); fall back
            # to the generic one only when none was stored.
            if not processing_status.get('error'):
                update_status('error', 'Failed to load model')
            return

        audio_files = get_audio_files_from_dataset()
        update_status('total_files', len(audio_files))

        if not audio_files:
            log("❌ 没有找到音频文件", "ERROR")
            update_status('state', 'error')
            update_status('error', 'No audio files found')
            return

        existing_files = {f.name for f in ONE_AUDIO_DIR.glob("*.wav")}
        if existing_files:
            log(f"🔄 断点续传: 发现 {len(existing_files)} 个已处理文件")

        results = []
        skipped = []
        total_one_duration = 0.0

        for idx, audio_file in enumerate(audio_files, 1):
            success, filename, duration, skip_reason = process_single_file(
                audio_file, idx, len(audio_files)
            )
            if success:
                results.append({
                    'source': audio_file,
                    'output': filename,
                    'duration': duration,
                })
                total_one_duration += duration
            elif skip_reason:
                skipped.append({
                    'source': audio_file,
                    'reason': skip_reason,
                    'duration': duration,
                })

        # Failures are the files that neither succeeded nor carry a reason.
        failed_count = len(audio_files) - len(results) - len(skipped)

        log("\n" + "="*70)
        log("📊 处理完成")
        log("="*70)
        log(f"总文件数: {len(audio_files)}")
        log(f"成功提取: {len(results)}")
        log(f"断点续传: {processing_status['resumed_files']}")
        log(f"跳过文件: {len(skipped)}")
        log(f"失败: {failed_count}")
        log(f"One 音频总时长: {total_one_duration / 3600:.2f} 小时")

        if skipped:
            log("\n⏭️ 跳过的文件:")
            for s in skipped[:10]:
                log(f" - {s['source']}: {s['reason']}")
            if len(skipped) > 10:
                log(f" ... 还有 {len(skipped) - 10} 个文件")

        log("="*70)

        report = {
            'total_files': len(audio_files),
            'successful': len(results),
            'resumed': processing_status['resumed_files'],
            'skipped': len(skipped),
            'failed': failed_count,
            'total_one_audio_hours': total_one_duration / 3600,
            'output_files': [r['output'] for r in results],
            'skipped_files': skipped,
            'details': results,
            'start_time': processing_status['start_time'],
            'end_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'configuration': {
                'max_audio_duration': MAX_AUDIO_DURATION,
                'processing_timeout': PROCESSING_TIMEOUT,
                'num_processes': NUM_PROCESSES,
            },
        }

        with open(REPORT_FILE, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2)

        # Report upload is best-effort: a failure is logged, not fatal.
        try:
            upload_file(
                path_or_fileobj=str(REPORT_FILE),
                repo_id=TARGET_DATASET_ID,
                path_in_repo="processing_report.json",
                repo_type="dataset",
                token=HF_TOKEN,
            )
            log("✅ 处理报告已上传")
        except Exception as e:
            log(f"处理报告上传失败: {e}", "ERROR")

        update_status('state', 'completed')

    except Exception as e:
        log(f"❌ 处理异常: {e}", "ERROR")
        log(traceback.format_exc(), "ERROR")
        update_status('state', 'error')
        update_status('error', str(e))


# ========== Flask application ==========
app = Flask(__name__)


@app.route('/')
def index():
    """Home page: configuration, current status and endpoint list."""
    return jsonify({
        'message': 'Numberblocks One Voice Extraction (修复版 v2)',
        'fix_version': 'v2 - NumPy 兼容性修复',
        'configuration': {
            'max_audio_duration': MAX_AUDIO_DURATION,
            'processing_timeout': PROCESSING_TIMEOUT,
            'num_processes': NUM_PROCESSES,
        },
        'status': processing_status,
        'endpoints': {
            '/status': 'Get processing status',
            '/files': 'List output files',
            '/report': 'Get processing report',
            '/diagnose': '🔧 新增:诊断信息',
        },
    })


@app.route('/status')
def status():
    """Return the current processing status."""
    return jsonify(processing_status)


@app.route('/files')
def list_files():
    """List the generated output ``.wav`` files."""
    try:
        files = sorted(f.name for f in ONE_AUDIO_DIR.glob("*.wav"))
        return jsonify({
            'count': len(files),
            'files': files,
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/report')
def get_report():
    """Return the processing report, or 404 when none has been written."""
    try:
        if not REPORT_FILE.exists():
            return jsonify({'error': 'Report not found'}), 404
        with open(REPORT_FILE, 'r', encoding='utf-8') as f:
            report = json.load(f)
        return jsonify(report)
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/health')
def health():
    """Health check."""
    return jsonify({'status': 'ok'}), 200


@app.route('/diagnose')
def diagnose():
    """Return diagnostic information about the runtime environment."""
    # Look torch up in sys.modules instead of using the bare name: when the
    # torch import failed at startup the name ``torch`` is undefined, and
    # this endpoint must keep working in exactly that situation.
    torch_module = sys.modules.get('torch')
    if torch_module is None:
        torch_version = 'not_imported'
        cuda_available = False
    else:
        torch_version = getattr(torch_module, '__version__', 'unknown')
        cuda_available = torch_module.cuda.is_available()

    diagnostics = {
        'python_version': sys.version,
        'numpy_version': getattr(np, '__version__', 'unknown'),
        'numpy_has_NaN': hasattr(np, 'NaN'),
        'numpy_has_nan': hasattr(np, 'nan'),
        'torch_version': torch_version,
        'cuda_available': cuda_available,
        'hf_token_set': bool(HF_TOKEN),
        'hf_token_length': len(HF_TOKEN) if HF_TOKEN else 0,
        'startup_error': processing_status.get('startup_error'),
        'model_loaded': processing_status.get('model_loaded'),
        'current_state': processing_status.get('state'),
        'directories': {
            'OUTPUT_DIR': str(OUTPUT_DIR),
            'ONE_AUDIO_DIR': str(ONE_AUDIO_DIR),
            'CACHE_DIR': str(CACHE_DIR),
            'DOWNLOAD_DIR': str(DOWNLOAD_DIR),
        },
        'permissions': {
            'OUTPUT_DIR_writable': os.access(OUTPUT_DIR, os.W_OK),
            'CACHE_DIR_writable': os.access(CACHE_DIR, os.W_OK),
        },
    }
    return jsonify(diagnostics)


def start_background_processing():
    """Start the processing job in a background daemon thread."""
    def run_processing():
        try:
            process_all_files()
        except Exception as e:
            log(f"❌ 后台处理异常: {e}", "ERROR")
            update_status('state', 'error')
            update_status('error', str(e))

    thread = Thread(target=run_processing, daemon=True)
    thread.start()
    log("✅ 后台处理线程已启动")


# ========== Entry point ==========
if __name__ == '__main__':
    log("="*70)
    log("Numberblocks One Voice Extraction - 修复版 v2")
    log("="*70)
    log(f"源数据集: {SOURCE_DATASET_ID}")
    log(f"目标数据集: {TARGET_DATASET_ID}")
    log(f"输出目录: {ONE_AUDIO_DIR}")
    log("🔧 主要修复: NumPy 2.x 兼容性 + 更好的错误处理")
    log("="*70)

    # Report any import error captured at startup.
    if import_error:
        log(f"⚠️ 启动时发现导入错误: {import_error}", "ERROR")
        log("⚠️ 模型加载可能会失败", "WARNING")

    # Begin processing automatically on startup.
    start_background_processing()

    # Start the Flask server.
    log("✅ Flask 服务器启动中...")
    app.run(host='0.0.0.0', port=7860)