# NumberBlocks Bot
# 🔧 修复 HTTP 500 错误 - NumPy 兼容性处理 (fix HTTP 500 error: NumPy compatibility handling)
# commit a2b53be
#!/usr/bin/env python3
"""
Hugging Face Space - Numberblocks One Voice Extraction
🔧 故障修复版 v2 - 修复 NumPy 兼容性和启动错误
主要改进:
1. 改进 NumPy 兼容性处理
2. 增强启动错误处理
3. 添加诊断端点
4. 确保 Flask 能启动即使模型加载失败
"""
import os
import sys
from pathlib import Path
from threading import Thread
import time
import json
import traceback
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
# 🔧 Robust NumPy compatibility shim.
# NumPy 2.x removed the ``np.NaN`` alias. Re-create it from ``np.nan`` (or a
# plain float NaN if even that is missing), presumably for dependencies
# imported below that still reference ``np.NaN`` -- TODO confirm which ones.
import numpy as np

if not hasattr(np, 'NaN'):
    np.NaN = np.nan if hasattr(np, 'nan') else float('nan')

# Verify the alias is reachable now; abort start-up (exit code 1) otherwise.
# The success message stays inside the try so that a failure to print it is
# handled exactly like a failed lookup.
try:
    test_nan = np.NaN
    print(f"✓ NumPy 兼容性检查通过: np.NaN = {test_nan}")
except Exception as e:
    print(f"✗ NumPy 兼容性检查失败: {e}")
    sys.exit(1)
# Guarded third-party imports: a failure is recorded in ``import_error``
# instead of crashing, so the web app can still report it (it is copied into
# processing_status['startup_error'] further down).
# NOTE: the imports run in order, so when one fails every name imported
# after it stays unbound. For example, a failing ``import torch`` leaves
# ``torch``, ``Pipeline``, ``librosa`` and ``sf`` undefined. If ``flask``
# itself fails, ``Flask``/``jsonify`` are unbound and creating the app later
# in this file raises NameError.
import_error = None
try:
    from flask import Flask, jsonify
    from huggingface_hub import HfApi, upload_file, hf_hub_download
    import torch
    from pyannote.audio import Pipeline
    import librosa
    import soundfile as sf
    print("✓ 所有依赖导入成功")
except Exception as e:
    # Only the message is kept; the traceback is not printed here.
    import_error = str(e)
    print(f"✗ 依赖导入失败: {import_error}")
# ========== Configuration ==========
TARGET_DATASET_ID = "ayf3/numberblocks-one-voice-dataset"  # extracted audio + report are uploaded here
SOURCE_DATASET_ID = "ayf3/numberblocks-audio"  # source .wav files are listed/downloaded from here
OUTPUT_DIR = Path("/data/output")
ONE_AUDIO_DIR = OUTPUT_DIR / "one_audio"
CACHE_DIR = Path("/data/hf_cache")
DOWNLOAD_DIR = Path("/data/download")
REPORT_FILE = OUTPUT_DIR / "processing_report.json"
MODEL_ID = "pyannote/speaker-diarization-3.1"
MAX_AUDIO_DURATION = 310  # seconds; longer source files are skipped
PROCESSING_TIMEOUT = 1800  # seconds allowed per file
NUM_PROCESSES = 4  # only reported/logged; files are processed sequentially

# Every working directory must exist before anything writes into it.
for _directory in (CACHE_DIR, OUTPUT_DIR, ONE_AUDIO_DIR, DOWNLOAD_DIR):
    _directory.mkdir(parents=True, exist_ok=True)

# Hugging Face credentials (needed for the gated model and dataset access).
HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    print("⚠️ 警告: HF_TOKEN 环境变量未设置", flush=True)

# Cache locations.
# NOTE(review): huggingface_hub was already imported above and most likely
# reads these variables at import time, so setting them here probably does not
# change its defaults (hf_hub_download gets cache_dir explicitly, the model
# load does not). Consider exporting them before the imports -- TODO confirm.
os.environ.update({
    'HF_HOME': str(CACHE_DIR),
    'HUGGINGFACE_HUB_CACHE': str(CACHE_DIR / 'hub'),
})
# ========== Global state ==========
# Speaker-diarization pipeline; assigned by load_model().
pipeline_model = None

# Shared progress dictionary. The background worker writes it through
# update_status(); the Flask endpoints serialize it as-is.
processing_status = dict(
    state='initializing',
    total_files=0,
    processed_files=0,
    skipped_files=0,
    current_file=None,
    output_files=[],
    uploaded_files=[],
    start_time=None,
    error=None,
    model_loaded=False,
    last_update=None,
    estimated_completion=None,
    current_file_duration=None,
    average_processing_time=None,
    total_skipped_duration=0,
    resumed_files=0,
    startup_error=import_error,  # message from the guarded imports, None if they all succeeded
)
# ========== Helpers ==========
def log(message, level="INFO"):
    """Print one ``[YYYY-mm-dd HH:MM:SS] [LEVEL] message`` line and flush stdout."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print("[{}] [{}] {}".format(stamp, level, message), flush=True)
    # Extra explicit flush; print(flush=True) has already flushed the stream.
    sys.stdout.flush()
def update_status(key, value):
    """Set ``processing_status[key] = value`` and refresh ``last_update``."""
    now_text = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    processing_status[key] = value
    processing_status['last_update'] = now_text
def format_duration(seconds):
    """Render *seconds* as a zero-padded ``HH:MM:SS`` string.

    Fractional seconds are dropped and hours are not wrapped at 24.
    """
    hours, within_hour = divmod(seconds, 3600)
    return "{:02d}:{:02d}:{:02d}".format(
        int(hours), int(within_hour // 60), int(seconds % 60)
    )
def upload_single_file(file_path, retry_count=3):
    """Upload one local file to ``audio/<file name>`` in TARGET_DATASET_ID.

    Makes up to *retry_count* attempts, sleeping 5 seconds between them. On
    success the file name is appended to ``processing_status['uploaded_files']``.

    Args:
        file_path: Local path (str or Path) of the file to upload.
        retry_count: Maximum number of attempts; below 1 means no attempt.

    Returns:
        True if an attempt succeeded, otherwise False. Errors are logged, not raised.
    """
    try:
        name = Path(file_path).name
        for attempt in range(1, retry_count + 1):
            log(f" 📤 上传: {name} (尝试 {attempt}/{retry_count})")
            try:
                upload_file(
                    path_or_fileobj=str(file_path),
                    repo_id=TARGET_DATASET_ID,
                    path_in_repo=f"audio/{name}",
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
            except Exception as e:
                log(f" ⚠️ 上传失败: {str(e)[:100]}", "WARNING")
                if attempt < retry_count:
                    time.sleep(5)  # fixed back-off before the next attempt
            else:
                update_status('uploaded_files', processing_status['uploaded_files'] + [name])
                log(" ✅ 上传成功")
                return True
        # Every attempt failed (or none was made): report False explicitly
        # instead of falling off the end of the function with None.
        log(f" ❌ 上传失败(已重试{retry_count}次)", "ERROR")
        return False
    except Exception as e:
        log(f" ❌ 上传异常: {str(e)[:100]}", "ERROR")
        return False
def download_audio_file(audio_file):
    """Fetch one file of SOURCE_DATASET_ID into DOWNLOAD_DIR, reusing a local copy.

    ``hf_hub_download(local_dir=...)`` mirrors the repository layout, so a repo
    path like ``episodes/ep01.wav`` is stored at ``DOWNLOAD_DIR/episodes/ep01.wav``.
    The local path is therefore built from the full repo-relative path, not
    just the basename, and after a download the path reported by
    hf_hub_download is returned.

    Args:
        audio_file: Path of the file inside SOURCE_DATASET_ID.

    Returns:
        The local file path as ``str``, or None if the download failed.
    """
    try:
        audio_path = DOWNLOAD_DIR / audio_file
        if audio_path.exists():
            log(f" ✅ 已存在: {audio_file}")
            return str(audio_path)
        log(f" 📥 下载: {audio_file}")
        local_path = hf_hub_download(
            repo_id=SOURCE_DATASET_ID,
            filename=audio_file,
            repo_type="dataset",
            local_dir=str(DOWNLOAD_DIR),
            token=HF_TOKEN,
            cache_dir=str(CACHE_DIR),
        )
        log(" ✅ 下载完成")
        return str(local_path)
    except Exception as e:
        log(f" ❌ 下载失败: {str(e)[:100]}", "ERROR")
        return None
def load_model():
    """Load the pyannote speaker-diarization pipeline into ``pipeline_model``.

    The pipeline is moved to CUDA when available, otherwise to CPU, and
    ``processing_status['model_loaded']`` is set on success.

    Returns:
        True on success. False on any failure; the reason is then logged and
        stored in ``processing_status['error']``.
    """
    global pipeline_model
    try:
        log("🔄 加载说话人分离模型...")
        log(f" 模型: {MODEL_ID}")
        use_cuda = torch.cuda.is_available()
        log(f" 设备: {'cuda' if use_cuda else 'cpu'}")
        pipeline = Pipeline.from_pretrained(
            MODEL_ID,
            use_auth_token=HF_TOKEN
        )
        if pipeline is None:
            # pyannote.audio 3.x prints a hint and returns None (rather than
            # raising) when the checkpoint cannot be fetched, e.g. a gated
            # model whose conditions were not accepted or a bad/missing token.
            raise RuntimeError(
                f"Pipeline.from_pretrained('{MODEL_ID}') returned None - "
                "check HF_TOKEN and that the model's access conditions were accepted"
            )
        pipeline.to(torch.device("cuda" if use_cuda else "cpu"))
        # Publish the pipeline only once it is fully set up on its device.
        pipeline_model = pipeline
        log(" ✅ 模型已加载 (GPU)" if use_cuda else " ✅ 模型已加载 (CPU)")
        update_status('model_loaded', True)
        return True
    except Exception as e:
        error_msg = str(e)
        log(f"❌ 模型加载失败: {error_msg}", "ERROR")
        log(traceback.format_exc(), "ERROR")
        update_status('error', error_msg)
        return False
def get_audio_files_from_dataset():
    """Return the sorted ``.wav`` paths found in SOURCE_DATASET_ID.

    Any failure (network, auth, ...) is logged and an empty list is returned.
    """
    try:
        hub = HfApi(token=HF_TOKEN)
        listing = hub.list_repo_files(
            repo_id=SOURCE_DATASET_ID,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        wav_paths = sorted(path for path in listing if path.endswith('.wav'))
        log(f"📁 找到 {len(wav_paths)} 个音频文件")
        return wav_paths
    except Exception as e:
        log(f"❌ 获取文件列表失败: {str(e)}", "ERROR")
        return []
def process_with_timeout(func, *args, timeout=PROCESSING_TIMEOUT, **kwargs):
    """Run ``func(*args, **kwargs)`` in a worker thread and wait at most *timeout* seconds.

    Returns:
        Whatever *func* returns; exceptions raised by *func* propagate.

    Raises:
        TimeoutError: if *func* has not finished within *timeout* seconds.

    Note:
        Python threads cannot be killed. On timeout the worker is abandoned:
        the executor is shut down without waiting so the caller can move on,
        but *func* keeps running in the background until it returns. A
        ``with ThreadPoolExecutor(...)`` block must not be used here. Leaving
        it calls ``shutdown(wait=True)``, which would block until *func*
        finished and make the timeout ineffective.
    """
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(func, *args, **kwargs)
        try:
            return future.result(timeout=timeout)
        except FutureTimeoutError:
            log(f" ❌ 处理超时(超过 {timeout} 秒)", "ERROR")
            future.cancel()  # only effective if the call has not started yet
            raise TimeoutError(f"Processing timeout after {timeout} seconds")
    finally:
        # wait=False: never block on a worker that overran the timeout.
        executor.shutdown(wait=False)
def _start_heartbeat(audio_file, interval=60):
    """Start a daemon thread that logs a heartbeat every *interval* seconds; set the returned Event to stop it."""
    from threading import Event

    stop_event = Event()

    def _beat():
        # Event.wait() returns True as soon as the event is set, ending the loop.
        while not stop_event.wait(interval):
            log(f" 💓 处理中... ({audio_file})")

    Thread(target=_beat, daemon=True).start()
    return stop_event


def _speaker_durations(diarization):
    """Return ``{speaker_label: total_seconds}`` summed over all diarization turns."""
    totals = {}
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        totals[speaker] = totals.get(speaker, 0.0) + (turn.end - turn.start)
    return totals


def _keep_only_speaker(diarization, audio, sr, target_speaker):
    """Zero every sample of *audio* outside *target_speaker*'s turns.

    Returns ``(masked_audio, segment_count, kept_seconds)``. ``masked_audio``
    has the same shape and dtype as *audio*.
    """
    masked = np.zeros_like(audio)
    segments = 0
    kept_seconds = 0.0
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        if speaker != target_speaker:
            continue
        start_sample = int(turn.start * sr)
        end_sample = int(turn.end * sr)
        masked[start_sample:end_sample] = audio[start_sample:end_sample]
        segments += 1
        kept_seconds += turn.end - turn.start
    return masked, segments, kept_seconds


def _update_progress_eta(file_idx, total_files):
    """Update the average time per file and the ETA in processing_status, then log progress (no-op before start_time is set)."""
    if not processing_status['start_time']:
        return
    started = datetime.strptime(processing_status['start_time'], "%Y-%m-%d %H:%M:%S")
    elapsed_total = (datetime.now() - started).total_seconds()
    avg_time = elapsed_total / file_idx
    update_status('average_processing_time', avg_time)
    eta_seconds = avg_time * (total_files - file_idx)
    eta_datetime = datetime.now() + timedelta(seconds=eta_seconds)
    update_status('estimated_completion', eta_datetime.strftime("%Y-%m-%d %H:%M:%S"))
    log(f" 📊 进度: {file_idx}/{total_files} ({file_idx*100//total_files}%)")
    log(f" ⏱️ 平均处理时间: {format_duration(avg_time)}/文件")
    log(f" 🎯 预计完成: {processing_status['estimated_completion']}")


def process_single_file_internal(audio_file, file_idx, total_files):
    """Extract the dominant speaker's audio from one source file and upload it.

    Steps:
      1. Skip the file if a non-empty output already exists (resume support).
      2. Download the source and reject it if longer than MAX_AUDIO_DURATION.
      3. Run speaker diarization.
      4. Keep only the speaker with the most total speech. Other regions are
         zeroed, so the output is as long as the input.
      5. Write ``<stem>_one.wav`` to ONE_AUDIO_DIR and upload it.

    Args:
        audio_file: Path of the source file inside SOURCE_DATASET_ID.
        file_idx: 1-based position of this file in the run (progress/ETA).
        total_files: Number of files in the run.

    Returns:
        ``(success, output_filename, duration_seconds, reason)``. *reason* is
        None for a fresh success and for failures without a specific reason.
        Exceptions are logged and turned into a failure tuple.
    """
    try:
        output_filename = f"{Path(audio_file).stem}_one.wav"
        output_path = ONE_AUDIO_DIR / output_filename

        if output_path.exists():
            log(f"\n{'='*70}")
            log(f"[{file_idx}/{total_files}] ⏭️ 跳过(已存在): {audio_file}")
            update_status('current_file', audio_file)
            file_size = output_path.stat().st_size
            if file_size > 0:
                # Count as resumed only when the existing output is actually reused.
                update_status('resumed_files', processing_status['resumed_files'] + 1)
                log(f" ✅ 文件已存在: {output_filename} ({file_size/1024/1024:.2f} MB)")
                return (True, output_filename, 0, "已存在")
            log(" ⚠️ 文件损坏,重新处理", "WARNING")
            output_path.unlink()

        log(f"\n{'='*70}")
        log(f"[{file_idx}/{total_files}] 🎵 处理: {audio_file}")
        update_status('current_file', audio_file)

        audio_path = download_audio_file(audio_file)
        if not audio_path:
            log(" ❌ 下载失败", "ERROR")
            return (False, None, 0, None)

        log(" 📊 分析音频...")
        audio, sr = librosa.load(audio_path, sr=16000)
        audio_duration = len(audio) / sr
        update_status('current_file_duration', audio_duration)
        log(f" ⏱️ 音频时长: {format_duration(audio_duration)}")

        if audio_duration > MAX_AUDIO_DURATION:
            log(f" ⏭️ 跳过:超过时长限制 ({MAX_AUDIO_DURATION}秒)", "WARNING")
            update_status('skipped_files', processing_status['skipped_files'] + 1)
            update_status('total_skipped_duration', processing_status['total_skipped_duration'] + audio_duration)
            if Path(audio_path).exists():
                Path(audio_path).unlink()
            return (False, None, audio_duration, f"超过时长限制 ({format_duration(audio_duration)})")

        log(" 🎯 执行说话人分离...")
        log(f" ⏰ 开始时间: {datetime.now().strftime('%H:%M:%S')}")
        start_time = time.time()
        stop_heartbeat = _start_heartbeat(audio_file)
        try:
            diarization = pipeline_model(audio_path)
        finally:
            # Stop the heartbeat even if diarization raises; otherwise the
            # logging thread would outlive this file.
            stop_heartbeat.set()
        elapsed = time.time() - start_time
        log(f" ✅ 分离完成,耗时: {format_duration(elapsed)}")

        if diarization is None:
            log(" ❌ 说话人分离失败", "ERROR")
            return (False, None, audio_duration, "分离失败")

        log(" 👥 分析说话人...")
        speaker_durations = _speaker_durations(diarization)
        if not speaker_durations:
            log(" ⚠️ 未检测到任何说话人", "WARNING")
            return (False, None, audio_duration, "未检测到说话人")

        main_speaker = max(speaker_durations, key=speaker_durations.get)
        main_speaker_duration = speaker_durations[main_speaker]
        log(f" 👤 检测到 {len(speaker_durations)} 个说话人")
        for spk, dur in sorted(speaker_durations.items(), key=lambda x: -x[1]):
            log(f" - {spk}: {format_duration(dur)}")
        log(f" ✅ 选择主要说话人: {main_speaker} ({format_duration(main_speaker_duration)})")

        one_audio, one_segments, one_duration = _keep_only_speaker(diarization, audio, sr, main_speaker)
        sf.write(str(output_path), one_audio, sr)
        # Guard against zero-length audio, which would otherwise raise ZeroDivisionError.
        share = one_duration * 100 / audio_duration if audio_duration else 0.0
        log(f" ✅ 提取成功: {one_segments} 个片段, {one_duration:.1f}秒 ({share:.1f}% of audio)")

        # Upload failures are logged by upload_single_file; the local output is kept either way.
        upload_single_file(output_path)
        update_status('processed_files', file_idx)
        update_status('output_files', processing_status['output_files'] + [output_filename])
        _update_progress_eta(file_idx, total_files)

        if audio_path and Path(audio_path).exists():
            Path(audio_path).unlink()
        return (True, output_filename, one_duration, None)
    except Exception as e:
        log(f" ❌ 处理失败: {str(e)[:100]}", "ERROR")
        log(traceback.format_exc(), "ERROR")
        return (False, None, 0, str(e)[:100])
def process_single_file(audio_file, file_idx, total_files):
    """Run process_single_file_internal() for one file under PROCESSING_TIMEOUT.

    A timeout is counted as a skipped file and reported with reason "处理超时".
    Any other exception is logged, and its message (truncated to 100 chars) is
    returned as the reason.
    """
    try:
        return process_with_timeout(
            process_single_file_internal,
            audio_file,
            file_idx,
            total_files,
            timeout=PROCESSING_TIMEOUT,
        )
    except TimeoutError as timeout_exc:
        log(f" ❌ 处理超时: {str(timeout_exc)}", "ERROR")
        update_status('skipped_files', processing_status['skipped_files'] + 1)
        return (False, None, 0, "处理超时")
    except Exception as exc:
        short_message = str(exc)[:100]
        log(f" ❌ 处理异常: {short_message}", "ERROR")
        log(traceback.format_exc(), "ERROR")
        return (False, None, 0, short_message)
def _write_and_upload_report(report):
    """Write *report* as JSON to REPORT_FILE, then upload it to TARGET_DATASET_ID (upload is best-effort)."""
    with open(REPORT_FILE, 'w') as f:
        json.dump(report, f, indent=2)
    try:
        upload_file(
            path_or_fileobj=str(REPORT_FILE),
            repo_id=TARGET_DATASET_ID,
            path_in_repo="processing_report.json",
            repo_type="dataset",
            token=HF_TOKEN,
        )
        log("✅ 处理报告已上传")
    except Exception as e:
        log(f"处理报告上传失败: {e}", "ERROR")


def process_all_files():
    """Run the whole extraction job sequentially and publish a summary report.

    Steps:
      1. Load the model and list the source dataset.
      2. Process every ``.wav`` file in order via process_single_file().
      3. Log a summary, then write and upload REPORT_FILE.
      4. Set ``processing_status['state']`` to ``'completed'``.

    On failure the state becomes ``'error'`` and the reason is stored in
    ``processing_status['error']``. Exceptions are caught and logged here.
    """
    try:
        update_status('state', 'running')
        update_status('start_time', datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        log("="*70)
        log("🚀 开始处理音频文件(修复版 v2 + NumPy 兼容性修复)")
        log(f"⚡ 音频时长限制: {format_duration(MAX_AUDIO_DURATION)}")
        log(f"⏰ 处理超时: {format_duration(PROCESSING_TIMEOUT)}")
        # NOTE: NUM_PROCESSES is only reported; the loop below is sequential.
        log(f"🔧 并行进程数: {NUM_PROCESSES}")
        log("="*70)

        if not load_model():
            update_status('state', 'error')
            # Keep the specific reason load_model() recorded; fall back to a generic one.
            update_status('error', processing_status.get('error') or 'Failed to load model')
            return

        audio_files = get_audio_files_from_dataset()
        total = len(audio_files)
        update_status('total_files', total)
        if not audio_files:
            log("❌ 没有找到音频文件", "ERROR")
            update_status('state', 'error')
            update_status('error', 'No audio files found')
            return

        existing_count = sum(1 for _ in ONE_AUDIO_DIR.glob("*.wav"))
        if existing_count:
            log(f"🔄 断点续传: 发现 {existing_count} 个已处理文件")

        results = []
        skipped = []
        total_one_duration = 0.0
        for idx, audio_file in enumerate(audio_files, 1):
            success, filename, duration, skip_reason = process_single_file(audio_file, idx, total)
            if success:
                results.append({'source': audio_file, 'output': filename, 'duration': duration})
                total_one_duration += duration
            elif skip_reason:
                skipped.append({'source': audio_file, 'reason': skip_reason, 'duration': duration})
        # Failures without a reason (e.g. download errors) are neither results nor skips.
        failed = total - len(results) - len(skipped)

        log("\n" + "="*70)
        log("📊 处理完成")
        log("="*70)
        log(f"总文件数: {total}")
        log(f"成功提取: {len(results)}")
        log(f"断点续传: {processing_status['resumed_files']}")
        log(f"跳过文件: {len(skipped)}")
        log(f"失败: {failed}")
        log(f"One 音频总时长: {total_one_duration / 3600:.2f} 小时")
        if skipped:
            log("\n⏭️ 跳过的文件:")
            for s in skipped[:10]:
                log(f" - {s['source']}: {s['reason']}")
            if len(skipped) > 10:
                log(f" ... 还有 {len(skipped) - 10} 个文件")
        log("="*70)

        report = {
            'total_files': total,
            'successful': len(results),
            'resumed': processing_status['resumed_files'],
            'skipped': len(skipped),
            'failed': failed,
            'total_one_audio_hours': total_one_duration / 3600,
            'output_files': [r['output'] for r in results],
            'skipped_files': skipped,
            'details': results,
            'start_time': processing_status['start_time'],
            'end_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'configuration': {
                'max_audio_duration': MAX_AUDIO_DURATION,
                'processing_timeout': PROCESSING_TIMEOUT,
                'num_processes': NUM_PROCESSES,
            },
        }
        _write_and_upload_report(report)
        update_status('state', 'completed')
    except Exception as e:
        log(f"❌ 处理异常: {e}", "ERROR")
        log(traceback.format_exc(), "ERROR")
        update_status('state', 'error')
        update_status('error', str(e))
# ========== Flask app ==========
# NOTE: ``Flask`` comes from the guarded import block near the top of the file.
# If that block failed on the ``flask`` import itself, this line raises
# NameError and the service cannot start. Failures of the later imports
# (huggingface_hub, torch, pyannote, ...) leave ``Flask`` bound, so the app
# still comes up.
app = Flask(__name__)
@app.route('/')
def index():
    """Landing page: service banner, active configuration, live status and endpoint list."""
    configuration = {
        'max_audio_duration': MAX_AUDIO_DURATION,
        'processing_timeout': PROCESSING_TIMEOUT,
        'num_processes': NUM_PROCESSES,
    }
    endpoints = {
        '/status': 'Get processing status',
        '/files': 'List output files',
        '/report': 'Get processing report',
        '/diagnose': '🔧 新增:诊断信息',
    }
    payload = {
        'message': 'Numberblocks One Voice Extraction (修复版 v2)',
        'fix_version': 'v2 - NumPy 兼容性修复',
        'configuration': configuration,
        'status': processing_status,
        'endpoints': endpoints,
    }
    return jsonify(payload)
@app.route('/status')
def status():
    """Return the current processing_status dictionary as JSON."""
    # Shallow snapshot so serialization works on a stable dict while the
    # background worker keeps updating the shared one.
    snapshot = dict(processing_status)
    return jsonify(snapshot)
@app.route('/files')
def list_files():
    """List the ``.wav`` files in ONE_AUDIO_DIR (sorted by name); 500 on errors."""
    try:
        names = sorted(entry.name for entry in ONE_AUDIO_DIR.glob("*.wav"))
        return jsonify(dict(count=len(names), files=names))
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@app.route('/report')
def get_report():
    """Serve the saved processing report: 404 if none exists yet, 500 on read errors."""
    try:
        if not REPORT_FILE.exists():
            return jsonify({'error': 'Report not found'}), 404
        with open(REPORT_FILE, 'r') as fh:
            return jsonify(json.load(fh))
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@app.route('/health')
def health():
    """Liveness probe: always answers HTTP 200 with ``{"status": "ok"}``."""
    body = jsonify(status='ok')
    return body, 200
# 🔧 New: diagnostics endpoint
@app.route('/diagnose')
def diagnose():
    """Return environment and runtime facts useful for debugging start-up problems.

    This endpoint must keep working when the guarded start-up imports failed.
    If ``import torch`` (or an import before it) raised, the module-level name
    ``torch`` is unbound. Referencing it directly would raise NameError, which
    Flask turns into an HTTP 500, so ``torch`` is looked up via ``globals()``.
    """
    torch_mod = globals().get('torch')
    if torch_mod is None:
        torch_version = 'not_imported'
        cuda_available = False
    else:
        torch_version = getattr(torch_mod, '__version__', 'unknown')
        try:
            cuda_available = torch_mod.cuda.is_available()
        except Exception:
            # Deliberately best-effort: a broken CUDA setup must not 500 this endpoint.
            cuda_available = False

    diagnostics = {
        'python_version': sys.version,
        'numpy_version': getattr(np, '__version__', 'unknown'),
        'numpy_has_NaN': hasattr(np, 'NaN'),
        'numpy_has_nan': hasattr(np, 'nan'),
        'torch_version': torch_version,
        'cuda_available': cuda_available,
        'hf_token_set': bool(HF_TOKEN),
        'hf_token_length': len(HF_TOKEN) if HF_TOKEN else 0,
        'startup_error': processing_status.get('startup_error'),
        'model_loaded': processing_status.get('model_loaded'),
        'current_state': processing_status.get('state'),
        'directories': {
            'OUTPUT_DIR': str(OUTPUT_DIR),
            'ONE_AUDIO_DIR': str(ONE_AUDIO_DIR),
            'CACHE_DIR': str(CACHE_DIR),
            'DOWNLOAD_DIR': str(DOWNLOAD_DIR),
        },
        'permissions': {
            'OUTPUT_DIR_writable': os.access(OUTPUT_DIR, os.W_OK),
            'CACHE_DIR_writable': os.access(CACHE_DIR, os.W_OK),
        },
    }
    return jsonify(diagnostics)
def start_background_processing():
    """Launch process_all_files() on a daemon thread and return immediately.

    Any exception that escapes the job is logged and reflected in
    processing_status (state 'error' plus the message).
    """
    def _worker():
        try:
            process_all_files()
        except Exception as exc:
            log(f"❌ 后台处理异常: {exc}", "ERROR")
            update_status('state', 'error')
            update_status('error', str(exc))

    Thread(target=_worker, daemon=True).start()
    log("✅ 后台处理线程已启动")
# ========== Entry point ==========
if __name__ == '__main__':
    separator = "=" * 70
    for banner_line in (
        separator,
        "Numberblocks One Voice Extraction - 修复版 v2",
        separator,
        f"源数据集: {SOURCE_DATASET_ID}",
        f"目标数据集: {TARGET_DATASET_ID}",
        f"输出目录: {ONE_AUDIO_DIR}",
        "🔧 主要修复: NumPy 2.x 兼容性 + 更好的错误处理",
        separator,
    ):
        log(banner_line)

    # Surface start-up import problems early; the web server is started regardless.
    if import_error:
        log(f"⚠️ 启动时发现导入错误: {import_error}", "ERROR")
        log("⚠️ 模型加载可能会失败", "WARNING")

    # Kick off the extraction job in the background, then serve the HTTP API (blocking).
    start_background_processing()
    log("✅ Flask 服务器启动中...")
    app.run(host='0.0.0.0', port=7860)