多进程最佳实践¶
torch.multiprocessing
是 Python multiprocessing
模块的替代品。它支持完全相同的操作,但对其进行了扩展,以便通过 multiprocessing.Queue
发送的所有张量都将其数据移动到共享内存中,并且只发送一个句柄到另一个进程。
注意
当一个 Tensor
被发送到另一个进程时,Tensor
数据是共享的。如果 torch.Tensor.grad
不是 None
,它也会被共享。当一个没有 torch.Tensor.grad
字段的 Tensor
被发送到另一个进程后,它会创建一个标准的进程特定的 .grad
Tensor
,该张量不会像 Tensor
的数据那样自动共享给所有进程。
这允许实现各种训练方法,如 Hogwild、A3C 或任何其他需要异步操作的方法。
多进程中的 CUDA¶
CUDA 运行时不支持 fork
启动方法;需要使用 spawn
或 forkserver
启动方法才能在子进程中使用 CUDA。
注意
可以通过使用 multiprocessing.get_context(...)
创建上下文或直接使用 multiprocessing.set_start_method(...)
来设置启动方法。
与 CPU 张量不同,只要接收进程保留了张量的副本,发送进程就需要保留原始张量。它是在后台实现的,但需要用户遵循最佳实践才能使程序正常运行。例如,只要消费者进程持有对该张量的引用,发送进程就必须保持活动状态,并且如果消费者进程通过致命信号异常退出,引用计数就无法保存。请参阅本节。
另请参阅:使用 nn.parallel.DistributedDataParallel 而不是 multiprocessing 或 nn.DataParallel
最佳实践和技巧¶
避免和解决死锁¶
生成新进程时,可能会出现很多问题,其中最常见的死锁原因是后台线程。如果有任何线程持有锁或导入了模块,并且调用了 fork
,则子进程很可能会处于损坏状态,并会死锁或以其他方式失败。请注意,即使您不这样做,Python 内置库也会这样做 - 无需比 multiprocessing
更进一步。 multiprocessing.Queue
实际上是一个非常复杂的类,它会生成多个线程来序列化、发送和接收对象,它们也会导致上述问题。如果您发现自己处于这种情况,请尝试使用 SimpleQueue
,它不使用任何额外的线程。
我们正在尽最大努力使您能够轻松地确保不会发生这些死锁,但有些事情是我们无法控制的。如果您遇到任何无法解决的问题,请尝试在论坛上寻求帮助,我们会看看这是否是我们可以解决的问题。
重用通过队列传递的缓冲区¶
请记住,每次将 Tensor
放入 multiprocessing.Queue
时,都必须将其移动到共享内存中。如果它已经被共享,那么这是一个空操作,否则它将导致额外的内存复制,从而降低整个进程的速度。即使您有一个进程池将数据发送到单个进程,也要让它将缓冲区发送回来 - 这几乎是免费的,并且可以让您在发送下一批数据时避免复制。
异步多进程训练(例如 Hogwild)¶
使用 torch.multiprocessing
,可以异步训练模型,参数可以一直共享,也可以定期同步。在前一种情况下,我们建议发送整个模型对象,而在后一种情况下,我们建议只发送 state_dict()
。
我们建议使用 multiprocessing.Queue
在进程之间传递各种 PyTorch 对象。例如,在使用 fork
启动方法时,可以继承已经在共享内存中的张量和存储,但这很容易出错,应该谨慎使用,并且只应该由高级用户使用。队列虽然有时不是一个优雅的解决方案,但在所有情况下都能正常工作。
警告
您应该小心使用全局语句,这些语句没有使用 if __name__ == '__main__'
进行保护。如果使用了 fork
以外的其他启动方法,那么它们将在所有子进程中执行。
Hogwild¶
您可以在 示例存储库 中找到具体的 Hogwild 实现,但为了展示代码的整体结构,下面还有一个最小示例
import torch.multiprocessing as mp
from model import MyModel
def train(model):
# Construct data_loader, optimizer, etc.
for data, labels in data_loader:
optimizer.zero_grad()
loss_fn(model(data), labels).backward()
optimizer.step() # This will update the shared parameters
if __name__ == '__main__':
num_processes = 4
model = MyModel()
# NOTE: this is required for the ``fork`` method to work
model.share_memory()
processes = []
for rank in range(num_processes):
p = mp.Process(target=train, args=(model,))
p.start()
processes.append(p)
for p in processes:
p.join()
多进程中的 CPU¶
不恰当的多进程处理会导致 CPU 过度订阅,导致不同进程争夺 CPU 资源,从而导致效率低下。
本教程将解释什么是 CPU 过度订阅以及如何避免它。
CPU 过度订阅¶
CPU 过度订阅是一个技术术语,指的是分配给系统的虚拟 CPU 总数超过硬件上可用的虚拟 CPU 总数的情况。
这会导致对 CPU 资源的严重争夺。在这种情况下,进程之间会频繁切换,这会增加进程切换开销并降低整体系统效率。
有关 CPU 过度订阅的代码示例,请参阅 示例存储库 中的 Hogwild 实现。
在 CPU 上使用 4 个进程运行训练示例时,使用以下命令
python main.py --num-processes 4
假设机器上有 N 个虚拟 CPU 可用,执行上述命令将生成 4 个子进程。每个子进程将为自己分配 N 个虚拟 CPU, resulting in a requirement of 4*N vCPUs. 然而,机器只有 N 个虚拟 CPU 可用。因此,不同的进程将争夺资源,导致频繁的进程切换。
以下观察结果表明存在 CPU 过度订阅
CPU 利用率高:通过使用
htop
命令,您可以观察到 CPU 利用率持续很高,通常达到或超过其最大容量。这表明对 CPU 资源的需求超过了可用的物理核心数量,导致进程之间争夺 CPU 时间。频繁的上下文切换和低系统效率:在 CPU 过度订阅的情况下,进程会争夺 CPU 时间,操作系统需要在不同进程之间快速切换以公平地分配资源。这种频繁的上下文切换会增加开销并降低整体系统效率。
避免 CPU 过度订阅¶
避免 CPU 过度订阅的一个好方法是适当的资源分配。确保并发运行的进程或线程数量不超过可用的 CPU 资源。
在这种情况下,一种解决方案是在子进程中指定适当的线程数。这可以通过使用子进程中的 torch.set_num_threads(int)
函数为每个进程设置线程数来实现。
假设机器上有 N 个虚拟 CPU,并且将生成 M 个进程,则每个进程使用的最大 num_threads
值为 floor(N/M)
。为了避免 mnist_hogwild 示例中的 CPU 过度订阅,需要对 示例存储库 中的 train.py
文件进行以下更改。
def train(rank, args, model, device, dataset, dataloader_kwargs):
torch.manual_seed(args.seed + rank)
#### define the num threads used in current sub-processes
torch.set_num_threads(floor(N/M))
train_loader = torch.utils.data.DataLoader(dataset, **dataloader_kwargs)
optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
for epoch in range(1, args.epochs + 1):
train_epoch(epoch, args, model, device, train_loader, optimizer)
使用 torch.set_num_threads(floor(N/M))
为每个进程设置 num_thread
。其中,将 N 替换为可用的虚拟 CPU 数量,将 M 替换为选择的进程数量。适当的 num_thread
值将根据具体任务而异。但是,作为一般准则,num_thread
的最大值应为 floor(N/M)
,以避免 CPU 过度订阅。在 mnist_hogwild 训练示例中,在避免 CPU 过度订阅后,您可以实现 30 倍的性能提升。