#!/usr/bin/env python3 """ 性能基准测试脚本 测试模型训练和推理性能 """ import os import sys import time import argparse import yaml import torch import torch.nn as nn from torch.utils.data import DataLoader import numpy as np from tqdm import tqdm # 添加项目根目录到Python路径 sys.path.append(os.path.dirname(os.path.dirname(__file__))) from src.models.unet_light import UNetLight from src.models.diffusion import DiffusionProcess from src.data.dataset import create_data_loaders from src.inference.optimization import InferenceBenchmark from src.inference.sampler import DDIMSampler def load_config(config_path: str) -> dict: """加载配置文件""" with open(config_path, 'r') as f: config = yaml.safe_load(f) return config def benchmark_training(config: dict): """训练性能基准测试""" print("=" * 60) print("训练性能基准测试") print("=" * 60) # 设备 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 加载模型配置 model_config = load_config('configs/model/unet_light.yaml') # 创建模型 model = UNetLight(model_config).to(device) # 创建扩散过程 diffusion_config = load_config('configs/model/diffusion.yaml') diffusion = DiffusionProcess(diffusion_config) # 创建数据加载器 data_config = load_config('configs/data/laion_filtered.yaml') train_loader, _ = create_data_loaders(data_config) # 优化器 optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4) # 预热 print("预热...") warmup_batches = 5 for i, batch in enumerate(train_loader): if i >= warmup_batches: break images = batch['images'].to(device) text_embeddings = batch['text_embeddings'].to(device) # 前向传播 loss = diffusion.compute_loss(model, images, text_embeddings) # 反向传播 loss.backward() optimizer.zero_grad() # 同步 if torch.cuda.is_available(): torch.cuda.synchronize() # 基准测试 print("运行训练基准测试...") num_batches = 20 batch_times = [] memory_usage = [] model.train() for i, batch in enumerate(train_loader): if i >= num_batches: break images = batch['images'].to(device) text_embeddings = batch['text_embeddings'].to(device) # 开始计时 start_time = time.time() # 前向传播 loss = diffusion.compute_loss(model, images, text_embeddings) # 反向传播 loss.backward() optimizer.step() optimizer.zero_grad() # 同步 if torch.cuda.is_available(): torch.cuda.synchronize() # 结束计时 end_time = time.time() batch_time = end_time - start_time batch_times.append(batch_time) # 记录内存使用 if torch.cuda.is_available(): memory_allocated = torch.cuda.memory_allocated() / 1024**3 memory_usage.append(memory_allocated) # 进度 print(f"批次 {i+1}/{num_batches}: {batch_time:.3f}s") # 统计 batch_times = np.array(batch_times) print("\n" + "=" * 60) print("训练基准测试结果:") print(f" 平均批次时间: {batch_times.mean():.3f} ± {batch_times.std():.3f} s") print(f" 最小批次时间: {batch_times.min():.3f} s") print(f" 最大批次时间: {batch_times.max():.3f} s") print(f" 吞吐量: {1 / batch_times.mean():.2f} batches/s") if memory_usage: memory_usage = np.array(memory_usage) print(f" 平均GPU内存使用: {memory_usage.mean():.2f} ± {memory_usage.std():.2f} GB") print("=" * 60) return batch_times.mean() def benchmark_inference(config: dict): """推理性能基准测试""" print("\n" + "=" * 60) print("推理性能基准测试") print("=" * 60) # 设备 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 加载模型配置 model_config = load_config('configs/model/unet_light.yaml') # 创建模型 model = UNetLight(model_config).to(device) model.eval() # 创建扩散过程 diffusion_config = load_config('configs/model/diffusion.yaml') diffusion = DiffusionProcess(diffusion_config) # 创建基准测试器 benchmark = InferenceBenchmark(model, device) # 测试不同分辨率 resolutions = [(256, 256), (512, 512), (768, 768)] results = {} for height, width in resolutions: print(f"\n测试分辨率: {width}x{height}") # 潜在空间大小 latent_height = height // 8 latent_width = width // 8 # 运行基准测试 stats = benchmark.benchmark( input_shape=(1, model.in_channels, latent_height, latent_width), num_iterations=10, warmup_iterations=3 ) results[f"{width}x{height}"] = stats # 打印总结 print("\n" + "=" * 60) print("推理基准测试总结:") for resolution, stats in results.items(): print(f"\n 分辨率 {resolution}:") print(f" 平均时间: {stats['mean_ms']:.1f} ms") print(f" FPS: {stats['fps']:.1f}") print("=" * 60) return results def benchmark_sampling(config: dict): """采样性能基准测试""" print("\n" + "=" * 60) print("采样性能基准测试") print("=" * 60) # 设备 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 加载模型配置 model_config = load_config('configs/model/unet_light.yaml') # 创建模型 model = UNetLight(model_config).to(device) model.eval() # 创建扩散过程 diffusion_config = load_config('configs/model/diffusion.yaml') diffusion = DiffusionProcess(diffusion_config) # 创建采样器 sampler = DDIMSampler(model, diffusion, num_inference_steps=50) # 测试不同采样步数 step_configs = [20, 30, 50] results = {} for num_steps in step_configs: print(f"\n测试采样步数: {num_steps}") # 设置采样步数 sampler.set_timesteps(num_steps) # 准备输入 prompt_embeds = torch.randn(1, 77, 768, device=device) # 预热 print(" 预热...") with torch.no_grad(): for _ in range(3): _ = sampler.sample( prompt_embeds=prompt_embeds, height=512, width=512, progress_bar=False ) # 基准测试 print(" 运行基准测试...") times = [] for i in range(5): start_time = time.time() with torch.no_grad(): _ = sampler.sample( prompt_embeds=prompt_embeds, height=512, width=512, progress_bar=False ) if torch.cuda.is_available(): torch.cuda.synchronize() end_time = time.time() times.append(end_time - start_time) print(f" 迭代 {i+1}: {times[-1]:.2f}s") # 统计 times = np.array(times) results[num_steps] = { 'mean_time': times.mean(), 'std_time': times.std(), 'fps': 1 / times.mean() } # 打印总结 print("\n" + "=" * 60) print("采样基准测试总结:") for num_steps, stats in results.items(): print(f"\n 采样步数 {num_steps}:") print(f" 平均时间: {stats['mean_time']:.2f} ± {stats['std_time']:.2f} s") print(f" FPS: {stats['fps']:.2f}") print("=" * 60) return results def benchmark_memory(config: dict): """内存使用基准测试""" print("\n" + "=" * 60) print("内存使用基准测试") print("=" * 60) if not torch.cuda.is_available(): print("GPU不可用,跳过内存基准测试") return {} # 设备 device = torch.device('cuda') # 加载模型配置 model_config = load_config('configs/model/unet_light.yaml') # 测试不同批次大小 batch_sizes = [1, 2, 4, 8] results = {} for batch_size in batch_sizes: print(f"\n测试批次大小: {batch_size}") # 创建模型 model = UNetLight(model_config).to(device) model.eval() # 准备输入 input_shape = (batch_size, model.in_channels, 64, 64) x = torch.randn(*input_shape, device=device) t = torch.tensor([500] * batch_size, device=device) context = torch.randn(batch_size, 77, 768, device=device) # 清空缓存 torch.cuda.empty_cache() # 记录初始内存 initial_memory = torch.cuda.memory_allocated() # 前向传播 with torch.no_grad(): _ = model(x, t, context) # 记录峰值内存 peak_memory = torch.cuda.max_memory_allocated() current_memory = torch.cuda.memory_allocated() # 计算内存使用 memory_used = peak_memory - initial_memory results[batch_size] = { 'initial_memory_gb': initial_memory / 1024**3, 'peak_memory_gb': peak_memory / 1024**3, 'current_memory_gb': current_memory / 1024**3, 'memory_used_gb': memory_used / 1024**3, 'memory_per_sample_gb': memory_used / (batch_size * 1024**3) } print(f" 初始内存: {initial_memory / 1024**3:.2f} GB") print(f" 峰值内存: {peak_memory / 1024**3:.2f} GB") print(f" 当前内存: {current_memory / 1024**3:.2f} GB") print(f" 内存使用: {memory_used / 1024**3:.2f} GB") print(f" 每样本内存: {memory_used / (batch_size * 1024**3):.2f} GB") # 清理 del model torch.cuda.empty_cache() # 打印总结 print("\n" + "=" * 60) print("内存基准测试总结:") for batch_size, stats in results.items(): print(f"\n 批次大小 {batch_size}:") print(f" 总内存使用: {stats['memory_used_gb']:.2f} GB") print(f" 每样本内存: {stats['memory_per_sample_gb']:.2f} GB") print("=" * 60) return results def generate_report(results: dict, output_file: str = "benchmark_report.md"): """生成基准测试报告""" print(f"\n生成报告: {output_file}") with open(output_file, 'w') as f: f.write("# Lumina 性能基准测试报告\n\n") f.write(f"生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n") f.write("## 系统信息\n") f.write(f"- PyTorch版本: {torch.__version__}\n") f.write(f"- CUDA可用: {torch.cuda.is_available()}\n") if torch.cuda.is_available(): f.write(f"- GPU: {torch.cuda.get_device_name(0)}\n") f.write(f"- CUDA版本: {torch.version.cuda}\n") f.write(f"- 系统内存: {os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') / 1024**3:.1f} GB\n\n") if 'training' in results: f.write("## 训练性能\n") f.write(f"- 平均批次时间: {results['training']:.3f} s\n") f.write(f"- 吞吐量: {1/results['training']:.2f} batches/s\n\n") if 'inference' in results: f.write("## 推理性能\n") for resolution, stats in results['inference'].items(): f.write(f"### 分辨率 {resolution}\n") f.write(f"- 平均推理时间: {stats['mean_ms']:.1f} ms\n") f.write(f"- FPS: {stats['fps']:.1f}\n\n") if 'sampling' in results: f.write("## 采样性能\n") for num_steps, stats in results['sampling'].items(): f.write(f"### 采样步数 {num_steps}\n") f.write(f"- 平均采样时间: {stats['mean_time']:.2f} s\n") f.write(f"- FPS: {stats['fps']:.2f}\n\n") if 'memory' in results: f.write("## 内存使用\n") for batch_size, stats in results['memory'].items(): f.write(f"### 批次大小 {batch_size}\n") f.write(f"- 总内存使用: {stats['memory_used_gb']:.2f} GB\n") f.write(f"- 每样本内存: {stats['memory_per_sample_gb']:.2f} GB\n\n") f.write("## 建议\n") f.write("1. 根据GPU内存选择适当的批次大小\n") f.write("2. 推理时使用适当的采样步数平衡质量和速度\n") f.write("3. 训练时使用梯度累积来模拟大批次训练\n") print(f"报告已保存: {output_file}") def main(): """主函数""" parser = argparse.ArgumentParser(description="Lumina性能基准测试") parser.add_argument( "--config", type=str, default="configs/training/p4_optimized.yaml", help="配置文件路径" ) parser.add_argument( "--test", type=str, nargs="+", default=['all'], choices=['training', 'inference', 'sampling', 'memory', 'all'], help="测试项目" ) parser.add_argument( "--output", type=str, default="benchmark_report.md", help="输出报告文件" ) args = parser.parse_args() # 加载配置 config = load_config(args.config) # 运行基准测试 results = {} if 'all' in args.test or 'training' in args.test: try: results['training'] = benchmark_training(config) except Exception as e: print(f"训练基准测试失败: {e}") if 'all' in args.test or 'inference' in args.test: try: results['inference'] = benchmark_inference(config) except Exception as e: print(f"推理基准测试失败: {e}") if 'all' in args.test or 'sampling' in args.test: try: results['sampling'] = benchmark_sampling(config) except Exception as e: print(f"采样基准测试失败: {e}") if 'all' in args.test or 'memory' in args.test: try: results['memory'] = benchmark_memory(config) except Exception as e: print(f"内存基准测试失败: {e}") # 生成报告 generate_report(results, args.output) if __name__ == "__main__": main()