注意
点击 此处 下载完整的示例代码
单机模型并行最佳实践¶
作者:李申
模型并行广泛应用于分布式训练技术中。之前的文章解释了如何使用 DataParallel 在多个 GPU 上训练神经网络;此功能将相同的模型复制到所有 GPU,每个 GPU 使用输入数据的不同分区。虽然它可以显着加快训练过程,但它不适用于模型太大而无法放入单个 GPU 的某些用例。本文展示了如何通过使用**模型并行**来解决此问题,与 DataParallel
相比,它将单个模型拆分到不同的 GPU 上,而不是将整个模型复制到每个 GPU 上(具体来说,假设模型 m
包含 10 层:使用 DataParallel
时,每个 GPU 将拥有这 10 层中的每一层的副本,而当在两个 GPU 上使用模型并行时,每个 GPU 可以承载 5 层)。
模型并行的高级思想是将模型的不同子网络放置到不同的设备上,并相应地实现 forward
方法以在设备之间移动中间输出。由于只有模型的一部分在任何单个设备上运行,因此一组设备可以协同服务于更大的模型。在这篇文章中,我们不会尝试构建巨大的模型并将它们压缩到有限数量的 GPU 中。相反,本文重点展示模型并行的思想。读者可以将这些思想应用到实际应用中。
注意
对于跨越多个服务器的分布式模型并行训练,请参考 分布式 RPC 框架入门 以获取示例和详细信息。
基本用法¶
让我们从一个包含两个线性层的玩具模型开始。要在两个GPU上运行此模型,只需将每个线性层放在不同的GPU上,并相应地移动输入和中间输出以匹配层设备。
import torch
import torch.nn as nn
import torch.optim as optim
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = torch.nn.Linear(10, 10).to('cuda:0')
self.relu = torch.nn.ReLU()
self.net2 = torch.nn.Linear(10, 5).to('cuda:1')
def forward(self, x):
x = self.relu(self.net1(x.to('cuda:0')))
return self.net2(x.to('cuda:1'))
请注意,上面的 ToyModel
看起来与在单个GPU上实现它的方式非常相似,除了四个 to(device)
调用将线性层和张量放置在正确的设备上。这是模型中唯一需要更改的地方。 backward()
和 torch.optim
将自动处理梯度,就像模型在一个GPU上一样。您只需要确保标签与调用损失函数时的输出在同一设备上即可。
model = ToyModel()
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = model(torch.randn(20, 10))
labels = torch.randn(20, 5).to('cuda:1')
loss_fn(outputs, labels).backward()
optimizer.step()
将模型并行应用于现有模块¶
也可以通过几行代码更改来在多个GPU上运行现有的单GPU模块。以下代码展示了如何将 torchvision.models.resnet50()
分解到两个GPU上。其思想是从现有的 ResNet
模块继承,并在构造期间将层拆分到两个GPU上。然后,覆盖 forward
方法,通过相应地移动中间输出将两个子网络连接起来。
from torchvision.models.resnet import ResNet, Bottleneck
num_classes = 1000
class ModelParallelResNet50(ResNet):
def __init__(self, *args, **kwargs):
super(ModelParallelResNet50, self).__init__(
Bottleneck, [3, 4, 6, 3], num_classes=num_classes, *args, **kwargs)
self.seq1 = nn.Sequential(
self.conv1,
self.bn1,
self.relu,
self.maxpool,
self.layer1,
self.layer2
).to('cuda:0')
self.seq2 = nn.Sequential(
self.layer3,
self.layer4,
self.avgpool,
).to('cuda:1')
self.fc.to('cuda:1')
def forward(self, x):
x = self.seq2(self.seq1(x).to('cuda:1'))
return self.fc(x.view(x.size(0), -1))
上述实现解决了模型过大而无法容纳到单个GPU中的情况。但是,您可能已经注意到,如果您的模型适合,它将比在单个GPU上运行它慢。这是因为,在任何时间点,只有两个GPU中的一个在工作,而另一个则处于空闲状态。由于中间输出需要在 layer2
和 layer3
之间从 cuda:0
复制到 cuda:1
,因此性能进一步下降。
让我们进行一个实验,以更定量地了解执行时间。在这个实验中,我们通过运行随机输入和标签来训练 ModelParallelResNet50
和现有的 torchvision.models.resnet50()
。训练结束后,模型不会产生任何有用的预测,但我们可以对执行时间有一个合理的了解。
import torchvision.models as models
num_batches = 3
batch_size = 120
image_w = 128
image_h = 128
def train(model):
model.train(True)
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)
one_hot_indices = torch.LongTensor(batch_size) \
.random_(0, num_classes) \
.view(batch_size, 1)
for _ in range(num_batches):
# generate random inputs and labels
inputs = torch.randn(batch_size, 3, image_w, image_h)
labels = torch.zeros(batch_size, num_classes) \
.scatter_(1, one_hot_indices, 1)
# run forward pass
optimizer.zero_grad()
outputs = model(inputs.to('cuda:0'))
# run backward pass
labels = labels.to(outputs.device)
loss_fn(outputs, labels).backward()
optimizer.step()
上面的 train(model)
方法使用 nn.MSELoss
作为损失函数,使用 optim.SGD
作为优化器。它模拟在 128 X 128
图像上进行训练,这些图像被组织成3个批次,每个批次包含120张图像。然后,我们使用 timeit
运行 train(model)
方法10次,并绘制执行时间及其标准差。
import matplotlib.pyplot as plt
plt.switch_backend('Agg')
import numpy as np
import timeit
num_repeat = 10
stmt = "train(model)"
setup = "model = ModelParallelResNet50()"
mp_run_times = timeit.repeat(
stmt, setup, number=1, repeat=num_repeat, globals=globals())
mp_mean, mp_std = np.mean(mp_run_times), np.std(mp_run_times)
setup = "import torchvision.models as models;" + \
"model = models.resnet50(num_classes=num_classes).to('cuda:0')"
rn_run_times = timeit.repeat(
stmt, setup, number=1, repeat=num_repeat, globals=globals())
rn_mean, rn_std = np.mean(rn_run_times), np.std(rn_run_times)
def plot(means, stds, labels, fig_name):
fig, ax = plt.subplots()
ax.bar(np.arange(len(means)), means, yerr=stds,
align='center', alpha=0.5, ecolor='red', capsize=10, width=0.6)
ax.set_ylabel('ResNet50 Execution Time (Second)')
ax.set_xticks(np.arange(len(means)))
ax.set_xticklabels(labels)
ax.yaxis.grid(True)
plt.tight_layout()
plt.savefig(fig_name)
plt.close(fig)
plot([mp_mean, rn_mean],
[mp_std, rn_std],
['Model Parallel', 'Single GPU'],
'mp_vs_rn.png')
结果表明,模型并行实现的执行时间比现有的单GPU实现长 4.02/3.75-1=7%
。因此,我们可以得出结论,在GPU之间来回复制张量大约有7%的开销。还有改进的空间,因为我们知道两个GPU中的一个在整个执行过程中都处于空闲状态。一种选择是将每个批次进一步划分为一个流水线的分裂,这样当一个分裂到达第二个子网络时,后续的分裂就可以被送入第一个子网络。这样,两个连续的分裂就可以在两个GPU上并发运行。
通过流水线输入加速¶
在以下实验中,我们将每个120图像的批次进一步划分为20图像的分裂。由于PyTorch异步启动CUDA操作,因此实现不需要生成多个线程来实现并发。
class PipelineParallelResNet50(ModelParallelResNet50):
def __init__(self, split_size=20, *args, **kwargs):
super(PipelineParallelResNet50, self).__init__(*args, **kwargs)
self.split_size = split_size
def forward(self, x):
splits = iter(x.split(self.split_size, dim=0))
s_next = next(splits)
s_prev = self.seq1(s_next).to('cuda:1')
ret = []
for s_next in splits:
# A. ``s_prev`` runs on ``cuda:1``
s_prev = self.seq2(s_prev)
ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))
# B. ``s_next`` runs on ``cuda:0``, which can run concurrently with A
s_prev = self.seq1(s_next).to('cuda:1')
s_prev = self.seq2(s_prev)
ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))
return torch.cat(ret)
setup = "model = PipelineParallelResNet50()"
pp_run_times = timeit.repeat(
stmt, setup, number=1, repeat=num_repeat, globals=globals())
pp_mean, pp_std = np.mean(pp_run_times), np.std(pp_run_times)
plot([mp_mean, rn_mean, pp_mean],
[mp_std, rn_std, pp_std],
['Model Parallel', 'Single GPU', 'Pipelining Model Parallel'],
'mp_vs_rn_vs_pp.png')
请注意,设备到设备的张量复制操作在源设备和目标设备上的当前流上同步。如果您创建多个流,则必须确保复制操作正确同步。在完成复制操作之前写入源张量或读取/写入目标张量会导致未定义的行为。上述实现仅使用源设备和目标设备上的默认流,因此不需要强制执行其他同步。
实验结果表明,将输入流水线化到模型并行ResNet50可以将训练过程加速大约 3.75/2.51-1=49%
。这仍然离理想的100%加速相差甚远。由于我们在流水线并行实现中引入了一个新的参数 split_sizes
,因此不清楚新参数如何影响整体训练时间。直观地说,使用小的 split_size
会导致许多微小的CUDA内核启动,而使用大的 split_size
会导致在第一个和最后一个分裂期间出现相对较长的空闲时间。两者都不是最佳的。对于这个特定实验,可能存在一个最佳的 split_size
配置。让我们尝试通过使用几个不同的 split_size
值运行实验来找到它。
means = []
stds = []
split_sizes = [1, 3, 5, 8, 10, 12, 20, 40, 60]
for split_size in split_sizes:
setup = "model = PipelineParallelResNet50(split_size=%d)" % split_size
pp_run_times = timeit.repeat(
stmt, setup, number=1, repeat=num_repeat, globals=globals())
means.append(np.mean(pp_run_times))
stds.append(np.std(pp_run_times))
fig, ax = plt.subplots()
ax.plot(split_sizes, means)
ax.errorbar(split_sizes, means, yerr=stds, ecolor='red', fmt='ro')
ax.set_ylabel('ResNet50 Execution Time (Second)')
ax.set_xlabel('Pipeline Split Size')
ax.set_xticks(split_sizes)
ax.yaxis.grid(True)
plt.tight_layout()
plt.savefig("split_size_tradeoff.png")
plt.close(fig)
结果表明,将 split_size
设置为12可以达到最快的训练速度,这可以使速度提高 3.75/2.43-1=54%
。仍然有机会进一步加速训练过程。例如, cuda:0
上的所有操作都放置在其默认流上。这意味着下一个分裂的计算不能与 prev
分裂的复制操作重叠。但是,由于 prev
和下一个分裂是不同的张量,因此将一个的计算与另一个的复制重叠没有问题。实现需要在两个GPU上使用多个流,并且不同的子网络结构需要不同的流管理策略。由于没有通用的多流解决方案适用于所有模型并行用例,因此我们不会在本教程中讨论它。
注意
这篇文章展示了一些性能测量结果。当您在自己的机器上运行相同的代码时,可能会看到不同的数字,因为结果取决于底层硬件和软件。为了获得您环境的最佳性能,一种合适的方法是首先生成曲线以找出最佳的分割大小,然后使用该分割大小来流水线化输入。
脚本的总运行时间:(2分钟37.338秒)