| import importlib |
| import os |
| import numpy as np |
| import cv2 |
| import torch |
| import torch.distributed as dist |
| import torchvision |
|
|
|
|
| def count_params(model, verbose=False): |
| total_params = sum(p.numel() for p in model.parameters()) |
| if verbose: |
| print(f"{model.__class__.__name__} has {total_params*1.e-6:.2f} M params.") |
| return total_params |
|
|
|
|
| def check_istarget(name, para_list): |
| """ |
| name: full name of source para |
| para_list: partial name of target para |
| """ |
| istarget = False |
| for para in para_list: |
| if para in name: |
| return True |
| return istarget |
|
|
|
|
| def instantiate_from_config(config): |
| if not "target" in config: |
| if config == "__is_first_stage__": |
| return None |
| elif config == "__is_unconditional__": |
| return None |
| raise KeyError("Expected key `target` to instantiate.") |
| return get_obj_from_str(config["target"])(**config.get("params", dict())) |
|
|
|
|
| def get_obj_from_str(string, reload=False): |
| module, cls = string.rsplit(".", 1) |
| if reload: |
| module_imp = importlib.import_module(module) |
| importlib.reload(module_imp) |
| return getattr(importlib.import_module(module, package=None), cls) |
|
|
|
|
| def load_npz_from_dir(data_dir): |
| data = [ |
| np.load(os.path.join(data_dir, data_name))["arr_0"] |
| for data_name in os.listdir(data_dir) |
| ] |
| data = np.concatenate(data, axis=0) |
| return data |
|
|
|
|
| def load_npz_from_paths(data_paths): |
| data = [np.load(data_path)["arr_0"] for data_path in data_paths] |
| data = np.concatenate(data, axis=0) |
| return data |
|
|
|
|
| def resize_numpy_image(image, max_resolution=512 * 512, resize_short_edge=None): |
| h, w = image.shape[:2] |
| if resize_short_edge is not None: |
| k = resize_short_edge / min(h, w) |
| else: |
| k = max_resolution / (h * w) |
| k = k**0.5 |
| h = int(np.round(h * k / 64)) * 64 |
| w = int(np.round(w * k / 64)) * 64 |
| image = cv2.resize(image, (w, h), interpolation=cv2.INTER_LANCZOS4) |
| return image |
|
|
|
|
| def setup_dist(args): |
| if dist.is_initialized(): |
| return |
| torch.cuda.set_device(args.local_rank) |
| torch.distributed.init_process_group("nccl", init_method="env://") |
|
|
|
|
| def save_videos(batch_tensors, savedir, filenames, fps=16): |
| |
| n_samples = batch_tensors.shape[1] |
| for idx, vid_tensor in enumerate(batch_tensors): |
| video = vid_tensor.detach().cpu() |
| video = torch.clamp(video.float(), -1.0, 1.0) |
| video = video.permute(2, 0, 1, 3, 4) |
| frame_grids = [ |
| torchvision.utils.make_grid(framesheet, nrow=int(n_samples)) |
| for framesheet in video |
| ] |
| grid = torch.stack(frame_grids, dim=0) |
| grid = (grid + 1.0) / 2.0 |
| grid = (grid * 255).to(torch.uint8).permute(0, 2, 3, 1) |
| savepath = os.path.join(savedir, f"{filenames[idx]}.mp4") |
| torchvision.io.write_video( |
| savepath, grid, fps=fps, video_codec="h264", options={"crf": "10"} |
| ) |