|
|
| """
|
| 性能基准测试脚本
|
| 测试模型训练和推理性能
|
| """
|
|
|
| import os
|
| import sys
|
| import time
|
| import argparse
|
| import yaml
|
| import torch
|
| import torch.nn as nn
|
| from torch.utils.data import DataLoader
|
| import numpy as np
|
| from tqdm import tqdm
|
|
|
|
|
| sys.path.append(os.path.dirname(os.path.dirname(__file__)))
|
|
|
| from src.models.unet_light import UNetLight
|
| from src.models.diffusion import DiffusionProcess
|
| from src.data.dataset import create_data_loaders
|
| from src.inference.optimization import InferenceBenchmark
|
| from src.inference.sampler import DDIMSampler
|
|
|
|
|
def load_config(config_path: str) -> dict:
    """Load a YAML configuration file.

    Args:
        config_path: Path of the YAML file to read.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file contents.

    Exceptions raised by ``open`` or ``yaml.safe_load`` propagate unchanged.
    """
    # Pin the encoding: the locale default is platform dependent and can
    # fail on configs that contain non-ASCII text.
    with open(config_path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)
|
|
|
|
|
def _training_step(model, diffusion, optimizer, images, text_embeddings):
    """Run one optimization step: loss, backward pass, parameter update."""
    loss = diffusion.compute_loss(model, images, text_embeddings)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()


def benchmark_training(config: dict):
    """Benchmark the wall-clock time of a full training step.

    Builds the model, diffusion process, data loaders and an AdamW optimizer
    from the YAML files under ``configs/``, runs a few untimed warm-up steps,
    then times up to 20 optimization steps and prints summary statistics.

    Args:
        config: Configuration loaded by the caller. It is not read here; the
            model, diffusion and data configs come from fixed paths.

    Returns:
        Mean time of one training step in seconds.

    Raises:
        RuntimeError: If the training loader yields no batches, so there is
            nothing to measure.
    """
    print("=" * 60)
    print("训练性能基准测试")
    print("=" * 60)

    use_cuda = torch.cuda.is_available()
    device = torch.device('cuda' if use_cuda else 'cpu')

    model_config = load_config('configs/model/unet_light.yaml')
    model = UNetLight(model_config).to(device)

    diffusion_config = load_config('configs/model/diffusion.yaml')
    diffusion = DiffusionProcess(diffusion_config)

    data_config = load_config('configs/data/laion_filtered.yaml')
    train_loader, _ = create_data_loaders(data_config)

    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

    # Switch to training mode before the warm-up so warm-up and timed steps
    # exercise the same code path.
    model.train()

    print("预热...")
    warmup_batches = 5
    for i, batch in enumerate(train_loader):
        if i >= warmup_batches:
            break

        images = batch['images'].to(device)
        text_embeddings = batch['text_embeddings'].to(device)

        # The warm-up runs the complete step, including optimizer.step(), so
        # one-time costs of the first parameter update are not charged to the
        # timed batches below.
        _training_step(model, diffusion, optimizer, images, text_embeddings)

        if use_cuda:
            torch.cuda.synchronize()

    print("运行训练基准测试...")
    num_batches = 20
    batch_times = []
    memory_usage = []

    for i, batch in enumerate(train_loader):
        if i >= num_batches:
            break

        # Host-to-device transfer is deliberately kept outside the timer.
        images = batch['images'].to(device)
        text_embeddings = batch['text_embeddings'].to(device)

        # Drain any queued GPU work so the timer covers only this step.
        if use_cuda:
            torch.cuda.synchronize()
        start_time = time.perf_counter()

        _training_step(model, diffusion, optimizer, images, text_embeddings)

        # CUDA kernels run asynchronously; wait for them before stopping
        # the clock.
        if use_cuda:
            torch.cuda.synchronize()

        batch_time = time.perf_counter() - start_time
        batch_times.append(batch_time)

        if use_cuda:
            memory_usage.append(torch.cuda.memory_allocated() / 1024**3)

        print(f"批次 {i+1}/{num_batches}: {batch_time:.3f}s")

    if not batch_times:
        # Fail with a clear message instead of an opaque NumPy error on an
        # empty array.
        raise RuntimeError("训练数据加载器没有产生任何批次, 无法进行基准测试")

    batch_times = np.array(batch_times)
    mean_time = batch_times.mean()

    print("\n" + "=" * 60)
    print("训练基准测试结果:")
    print(f" 平均批次时间: {mean_time:.3f} ± {batch_times.std():.3f} s")
    print(f" 最小批次时间: {batch_times.min():.3f} s")
    print(f" 最大批次时间: {batch_times.max():.3f} s")
    print(f" 吞吐量: {1 / mean_time:.2f} batches/s")

    if memory_usage:
        memory_usage = np.array(memory_usage)
        print(f" 平均GPU内存使用: {memory_usage.mean():.2f} ± {memory_usage.std():.2f} GB")

    print("=" * 60)

    return mean_time
|
|
|
|
|
def benchmark_inference(config: dict):
    """Benchmark model forward-pass latency at several image resolutions.

    Builds a UNetLight model in eval mode and, for each resolution, asks
    ``InferenceBenchmark.benchmark`` to time inputs of shape
    ``(1, model.in_channels, height // 8, width // 8)`` with 10 measured and
    3 warm-up iterations.

    Args:
        config: Configuration loaded by the caller. It is not read here; the
            model config comes from a fixed path.

    Returns:
        Dict mapping ``"WIDTHxHEIGHT"`` to the stats object returned by
        ``InferenceBenchmark.benchmark``. The summary reads its ``'mean_ms'``
        and ``'fps'`` entries.
    """
    print("\n" + "=" * 60)
    print("推理性能基准测试")
    print("=" * 60)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    model_config = load_config('configs/model/unet_light.yaml')
    model = UNetLight(model_config).to(device)
    model.eval()

    # No DiffusionProcess is built here: this benchmark only times the bare
    # model, and the previously constructed instance was never used.
    benchmark = InferenceBenchmark(model, device)

    resolutions = [(256, 256), (512, 512), (768, 768)]
    results = {}

    for height, width in resolutions:
        print(f"\n测试分辨率: {width}x{height}")

        # The model input is sized at 1/8 of the pixel resolution
        # (presumably the latent downsampling factor — confirm against the
        # autoencoder in use).
        latent_height = height // 8
        latent_width = width // 8

        stats = benchmark.benchmark(
            input_shape=(1, model.in_channels, latent_height, latent_width),
            num_iterations=10,
            warmup_iterations=3
        )

        results[f"{width}x{height}"] = stats

    print("\n" + "=" * 60)
    print("推理基准测试总结:")

    for resolution, stats in results.items():
        print(f"\n 分辨率 {resolution}:")
        print(f" 平均时间: {stats['mean_ms']:.1f} ms")
        print(f" FPS: {stats['fps']:.1f}")

    print("=" * 60)

    return results
|
|
|
|
|
def benchmark_sampling(config: dict):
    """Benchmark end-to-end DDIM sampling time for several step counts.

    For each step count (20, 30, 50) the sampler is reconfigured, warmed up
    with 3 untimed runs and then timed over 5 runs at 512x512 with a random
    prompt embedding of shape ``(1, 77, 768)``.

    Args:
        config: Configuration loaded by the caller. It is not read here; the
            model and diffusion configs come from fixed paths.

    Returns:
        Dict mapping the step count to a dict with ``'mean_time'`` and
        ``'std_time'`` (seconds per sample call) and ``'fps'``
        (``1 / mean_time``).
    """
    print("\n" + "=" * 60)
    print("采样性能基准测试")
    print("=" * 60)

    use_cuda = torch.cuda.is_available()
    device = torch.device('cuda' if use_cuda else 'cpu')

    model_config = load_config('configs/model/unet_light.yaml')
    model = UNetLight(model_config).to(device)
    model.eval()

    diffusion_config = load_config('configs/model/diffusion.yaml')
    diffusion = DiffusionProcess(diffusion_config)

    sampler = DDIMSampler(model, diffusion, num_inference_steps=50)

    step_configs = [20, 30, 50]
    results = {}

    for num_steps in step_configs:
        print(f"\n测试采样步数: {num_steps}")

        sampler.set_timesteps(num_steps)

        prompt_embeds = torch.randn(1, 77, 768, device=device)

        print(" 预热...")
        with torch.no_grad():
            for _ in range(3):
                sampler.sample(
                    prompt_embeds=prompt_embeds,
                    height=512,
                    width=512,
                    progress_bar=False
                )

        print(" 运行基准测试...")
        times = []

        for i in range(5):
            # Wait for outstanding GPU work (warm-up or the previous run)
            # so it is not attributed to this iteration.
            if use_cuda:
                torch.cuda.synchronize()
            start_time = time.perf_counter()

            with torch.no_grad():
                sampler.sample(
                    prompt_embeds=prompt_embeds,
                    height=512,
                    width=512,
                    progress_bar=False
                )

            # CUDA kernels run asynchronously; wait before stopping the clock.
            if use_cuda:
                torch.cuda.synchronize()

            times.append(time.perf_counter() - start_time)

            print(f" 迭代 {i+1}: {times[-1]:.2f}s")

        times = np.array(times)

        results[num_steps] = {
            'mean_time': times.mean(),
            'std_time': times.std(),
            'fps': 1 / times.mean()
        }

    print("\n" + "=" * 60)
    print("采样基准测试总结:")

    for num_steps, stats in results.items():
        print(f"\n 采样步数 {num_steps}:")
        print(f" 平均时间: {stats['mean_time']:.2f} ± {stats['std_time']:.2f} s")
        print(f" FPS: {stats['fps']:.2f}")

    print("=" * 60)

    return results
|
|
|
|
|
def benchmark_memory(config: dict):
    """Measure GPU memory used by one forward pass at several batch sizes.

    For each batch size (1, 2, 4, 8) a fresh UNetLight model is built and a
    single no-grad forward pass is run on random inputs of shape
    ``(batch_size, model.in_channels, 64, 64)``. Allocated memory is sampled
    before and after the pass, together with the peak during it.

    Args:
        config: Configuration loaded by the caller. It is not read here; the
            model config comes from a fixed path.

    Returns:
        Dict mapping batch size to a dict with ``'initial_memory_gb'``,
        ``'peak_memory_gb'``, ``'current_memory_gb'``, ``'memory_used_gb'``
        (peak minus initial) and ``'memory_per_sample_gb'``. Returns an empty
        dict when CUDA is not available.
    """
    print("\n" + "=" * 60)
    print("内存使用基准测试")
    print("=" * 60)

    if not torch.cuda.is_available():
        print("GPU不可用,跳过内存基准测试")
        return {}

    device = torch.device('cuda')

    model_config = load_config('configs/model/unet_light.yaml')

    batch_sizes = [1, 2, 4, 8]
    results = {}

    for batch_size in batch_sizes:
        print(f"\n测试批次大小: {batch_size}")

        model = UNetLight(model_config).to(device)
        model.eval()

        input_shape = (batch_size, model.in_channels, 64, 64)
        x = torch.randn(*input_shape, device=device)
        t = torch.tensor([500] * batch_size, device=device)
        context = torch.randn(batch_size, 77, 768, device=device)

        torch.cuda.empty_cache()

        # max_memory_allocated() reports the peak since the last reset, so
        # without this call it would reflect earlier benchmarks or earlier
        # batch sizes instead of the forward pass measured here.
        torch.cuda.reset_peak_memory_stats()

        initial_memory = torch.cuda.memory_allocated()

        with torch.no_grad():
            output = model(x, t, context)

        peak_memory = torch.cuda.max_memory_allocated()
        current_memory = torch.cuda.memory_allocated()

        memory_used = peak_memory - initial_memory

        results[batch_size] = {
            'initial_memory_gb': initial_memory / 1024**3,
            'peak_memory_gb': peak_memory / 1024**3,
            'current_memory_gb': current_memory / 1024**3,
            'memory_used_gb': memory_used / 1024**3,
            'memory_per_sample_gb': memory_used / (batch_size * 1024**3)
        }

        print(f" 初始内存: {initial_memory / 1024**3:.2f} GB")
        print(f" 峰值内存: {peak_memory / 1024**3:.2f} GB")
        print(f" 当前内存: {current_memory / 1024**3:.2f} GB")
        print(f" 内存使用: {memory_used / 1024**3:.2f} GB")
        print(f" 每样本内存: {memory_used / (batch_size * 1024**3):.2f} GB")

        # Drop every tensor from this round, not only the model, so they do
        # not inflate the baseline of the next batch size.
        del model, x, t, context, output
        torch.cuda.empty_cache()

    print("\n" + "=" * 60)
    print("内存基准测试总结:")

    for batch_size, stats in results.items():
        print(f"\n 批次大小 {batch_size}:")
        print(f" 总内存使用: {stats['memory_used_gb']:.2f} GB")
        print(f" 每样本内存: {stats['memory_per_sample_gb']:.2f} GB")

    print("=" * 60)

    return results
|
|
|
|
|
def _system_memory_gb():
    """Return total physical memory in GiB, or None if it cannot be queried."""
    try:
        return os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') / 1024**3
    except (AttributeError, ValueError, OSError):
        # os.sysconf does not exist on every platform and may reject these
        # names; a missing figure must not abort the whole report.
        return None


def generate_report(results: dict, output_file: str = "benchmark_report.md"):
    """Write the collected benchmark results to a Markdown report.

    Args:
        results: Benchmark results keyed by suite name. Recognised keys are
            ``'training'`` (mean step time in seconds), ``'inference'``
            (resolution -> stats with ``'mean_ms'`` and ``'fps'``),
            ``'sampling'`` (step count -> stats with ``'mean_time'`` and
            ``'fps'``) and ``'memory'`` (batch size -> stats with
            ``'memory_used_gb'`` and ``'memory_per_sample_gb'``). Missing
            keys simply omit the corresponding section.
        output_file: Path of the Markdown file to create or overwrite.
    """
    print(f"\n生成报告: {output_file}")

    # The report contains non-ASCII text, so pin the encoding instead of
    # relying on the platform-dependent locale default.
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("# Lumina 性能基准测试报告\n\n")
        f.write(f"生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n")

        f.write("## 系统信息\n")
        f.write(f"- PyTorch版本: {torch.__version__}\n")
        f.write(f"- CUDA可用: {torch.cuda.is_available()}\n")
        if torch.cuda.is_available():
            f.write(f"- GPU: {torch.cuda.get_device_name(0)}\n")
            f.write(f"- CUDA版本: {torch.version.cuda}\n")

        system_memory = _system_memory_gb()
        if system_memory is None:
            f.write("- 系统内存: 未知\n\n")
        else:
            f.write(f"- 系统内存: {system_memory:.1f} GB\n\n")

        if 'training' in results:
            f.write("## 训练性能\n")
            f.write(f"- 平均批次时间: {results['training']:.3f} s\n")
            f.write(f"- 吞吐量: {1/results['training']:.2f} batches/s\n\n")

        if 'inference' in results:
            f.write("## 推理性能\n")
            for resolution, stats in results['inference'].items():
                f.write(f"### 分辨率 {resolution}\n")
                f.write(f"- 平均推理时间: {stats['mean_ms']:.1f} ms\n")
                f.write(f"- FPS: {stats['fps']:.1f}\n\n")

        if 'sampling' in results:
            f.write("## 采样性能\n")
            for num_steps, stats in results['sampling'].items():
                f.write(f"### 采样步数 {num_steps}\n")
                f.write(f"- 平均采样时间: {stats['mean_time']:.2f} s\n")
                f.write(f"- FPS: {stats['fps']:.2f}\n\n")

        if 'memory' in results:
            f.write("## 内存使用\n")
            for batch_size, stats in results['memory'].items():
                f.write(f"### 批次大小 {batch_size}\n")
                f.write(f"- 总内存使用: {stats['memory_used_gb']:.2f} GB\n")
                f.write(f"- 每样本内存: {stats['memory_per_sample_gb']:.2f} GB\n\n")

        f.write("## 建议\n")
        f.write("1. 根据GPU内存选择适当的批次大小\n")
        f.write("2. 推理时使用适当的采样步数平衡质量和速度\n")
        f.write("3. 训练时使用梯度累积来模拟大批次训练\n")

    print(f"报告已保存: {output_file}")
|
|
|
|
|
def main():
    """Command-line entry point.

    Parses ``--config``, ``--test`` and ``--output``, loads the config file,
    runs each selected benchmark (or all of them when ``all`` is given) and
    writes the report. A failing benchmark is reported on stdout and skipped;
    the remaining benchmarks and the report still run.
    """
    parser = argparse.ArgumentParser(description="Lumina性能基准测试")
    parser.add_argument(
        "--config",
        type=str,
        default="configs/training/p4_optimized.yaml",
        help="配置文件路径",
    )
    parser.add_argument(
        "--test",
        type=str,
        nargs="+",
        default=['all'],
        choices=['training', 'inference', 'sampling', 'memory', 'all'],
        help="测试项目",
    )
    parser.add_argument(
        "--output",
        type=str,
        default="benchmark_report.md",
        help="输出报告文件",
    )
    args = parser.parse_args()

    config = load_config(args.config)

    # (result key, benchmark function, failure message prefix), in run order.
    suites = (
        ('training', benchmark_training, "训练基准测试失败"),
        ('inference', benchmark_inference, "推理基准测试失败"),
        ('sampling', benchmark_sampling, "采样基准测试失败"),
        ('memory', benchmark_memory, "内存基准测试失败"),
    )

    run_everything = 'all' in args.test
    results = {}

    for key, runner, failure_prefix in suites:
        if not (run_everything or key in args.test):
            continue
        try:
            results[key] = runner(config)
        except Exception as exc:
            # Best effort: one failing suite must not prevent the others
            # from running or the report from being written.
            print(f"{failure_prefix}: {exc}")

    generate_report(results, args.output)
|
|
|
|
|
# Run the benchmark CLI only when executed as a script, not on import.
if __name__ == "__main__":
    main()