简介 || 什么是 DDP || 单节点多 GPU 训练 || 容错 || 多节点训练 || minGPT 训练
使用 DDP 进行多 GPU 训练¶
按照以下视频进行操作,或在 youtube 上进行操作。
在 之前的教程 中,我们对 DDP 的工作原理进行了高级概述;现在我们来看看如何在代码中使用 DDP。在本教程中,我们从一个单 GPU 训练脚本开始,并将该脚本迁移到在单个节点上的 4 个 GPU 上运行。在此过程中,我们将讨论分布式训练中的重要概念,并在我们的代码中实现这些概念。
注意
如果您的模型包含任何 BatchNorm
层,则需要将其转换为 SyncBatchNorm
以同步 BatchNorm
层在副本之间的运行统计信息。
使用辅助函数 torch.nn.SyncBatchNorm.convert_sync_batchnorm(model) 将模型中的所有 BatchNorm
层转换为 SyncBatchNorm
。
single_gpu.py 与 multigpu.py 的差异
这些是您通常对单 GPU 训练脚本进行的更改,以启用 DDP。
导入¶
torch.multiprocessing
是围绕 Python 原生多处理的 PyTorch 包装器。分布式进程组包含所有可以相互通信和同步的进程。
import torch
import torch.nn.functional as F
from utils import MyTrainDataset
+ import torch.multiprocessing as mp
+ from torch.utils.data.distributed import DistributedSampler
+ from torch.nn.parallel import DistributedDataParallel as DDP
+ from torch.distributed import init_process_group, destroy_process_group
+ import os
构建进程组¶
首先,在初始化进程组之前,调用 set_device,它设置每个进程的默认 GPU。这对于防止 GPU:0 上的挂起或过度的内存使用至关重要。
进程组可以通过 TCP(默认)或从共享文件系统初始化。有关更多信息,请阅读 进程组初始化
init_process_group 初始化分布式进程组。
有关更多信息,请阅读 选择 DDP 后端
+ def ddp_setup(rank: int, world_size: int):
+ """
+ Args:
+ rank: Unique identifier of each process
+ world_size: Total number of processes
+ """
+ os.environ["MASTER_ADDR"] = "localhost"
+ os.environ["MASTER_PORT"] = "12355"
+ torch.cuda.set_device(rank)
+ init_process_group(backend="nccl", rank=rank, world_size=world_size)
构建 DDP 模型¶
- self.model = model.to(gpu_id)
+ self.model = DDP(model, device_ids=[gpu_id])
分配输入数据¶
DistributedSampler 将输入数据分成块,分布到所有分布式进程。
每个进程将接收一个包含 32 个样本的输入批次;有效批次大小为
32 * nprocs
,或者在使用 4 个 GPU 时为 128。
train_data = torch.utils.data.DataLoader(
dataset=train_dataset,
batch_size=32,
- shuffle=True,
+ shuffle=False,
+ sampler=DistributedSampler(train_dataset),
)
在每个 epoch 的开始调用
DistributedSampler
上的set_epoch()
方法对于使跨多个 epoch 的混洗正常工作是必要的。否则,在每个 epoch 中将使用相同的排序。
def _run_epoch(self, epoch):
b_sz = len(next(iter(self.train_data))[0])
+ self.train_data.sampler.set_epoch(epoch)
for source, targets in self.train_data:
...
self._run_batch(source, targets)
保存模型检查点¶
我们只需要从一个进程保存模型检查点。如果没有这个条件,每个进程都会保存其相同模式的副本。有关使用 DDP 保存和加载模型的更多信息,请阅读 此处
- ckp = self.model.state_dict()
+ ckp = self.model.module.state_dict()
...
...
- if epoch % self.save_every == 0:
+ if self.gpu_id == 0 and epoch % self.save_every == 0:
self._save_checkpoint(epoch)
警告
集体调用 是在所有分布式进程上运行的函数,它们用于将某些状态或值收集到特定进程。集体调用要求所有等级都运行集体代码。在本例中,_save_checkpoint 不应包含任何集体调用,因为它仅在 rank:0
进程上运行。如果您需要进行任何集体调用,它应该在 if self.gpu_id == 0
检查之前。
运行分布式训练作业¶
包括新的参数
rank
(替换device
)和world_size
。rank
在调用 mp.spawn 时由 DDP 自动分配。world_size
是整个训练作业中的进程数量。对于 GPU 训练,这对应于正在使用的 GPU 数量,每个进程都在专用的 GPU 上工作。
- def main(device, total_epochs, save_every):
+ def main(rank, world_size, total_epochs, save_every):
+ ddp_setup(rank, world_size)
dataset, model, optimizer = load_train_objs()
train_data = prepare_dataloader(dataset, batch_size=32)
- trainer = Trainer(model, train_data, optimizer, device, save_every)
+ trainer = Trainer(model, train_data, optimizer, rank, save_every)
trainer.train(total_epochs)
+ destroy_process_group()
if __name__ == "__main__":
import sys
total_epochs = int(sys.argv[1])
save_every = int(sys.argv[2])
- device = 0 # shorthand for cuda:0
- main(device, total_epochs, save_every)
+ world_size = torch.cuda.device_count()
+ mp.spawn(main, args=(world_size, total_epochs, save_every,), nprocs=world_size)