• 文档 >
  • 多进程包 - torch.multiprocessing
快捷方式

多进程包 - torch.multiprocessing

torch.multiprocessing 是对原生 multiprocessing 模块的包装器。

它注册自定义的 reducer,这些 reducer 使用共享内存来提供对不同进程中相同数据的共享视图。一旦张量/存储被移动到共享内存(参见 share_memory_()),就可以将它发送到其他进程,而无需进行任何复制。

该 API 与原始模块 100% 兼容 - 只需将 import multiprocessing 更改为 import torch.multiprocessing,就可以通过队列发送所有张量,或通过其他机制共享所有张量,并将它们移到共享内存中。

由于 API 的相似性,我们不会记录此包的大部分内容,建议参考原始模块的优质文档。

警告

如果主进程突然退出(例如,由于传入信号),Python 的 multiprocessing 有时无法清理其子进程。这是一个已知的缺陷,因此如果您在中断解释器后看到任何资源泄漏,这可能意味着这种情况刚刚发生在您身上。

策略管理

torch.multiprocessing.get_all_sharing_strategies()[source]

返回当前系统支持的一组共享策略。

torch.multiprocessing.get_sharing_strategy()[source]

返回当前用于共享 CPU 张量的策略。

torch.multiprocessing.set_sharing_strategy(new_strategy)[source]

设置共享 CPU 张量的策略。

参数

new_strategy (str) – 所选策略的名称。应为 get_all_sharing_strategies() 返回的值之一。

共享 CUDA 张量

仅在 Python 3 中支持在进程之间共享 CUDA 张量,使用 spawnforkserver 启动方法。

与 CPU 张量不同,发送进程需要保留原始张量,只要接收进程保留了张量的副本。引用计数在后台实现,但要求用户遵循以下最佳实践。

警告

如果消费者进程因致命信号异常终止,共享张量可能会永远保留在内存中,只要发送进程正在运行。

  1. 尽快在消费者中释放内存。

## Good
x = queue.get()
# do somethings with x
del x
## Bad
x = queue.get()
# do somethings with x
# do everything else (producer have to keep x in memory)

2. 保持生产者进程运行,直到所有消费者退出。这将防止生产者进程释放消费者仍在使用的内存的情况。

## producer
# send tensors, do something
event.wait()
## consumer
# receive tensors and use them
event.set()
  1. 不要传递接收到的张量。

# not going to work
x = queue.get()
queue_2.put(x)
# you need to create a process-local copy
x = queue.get()
x_clone = x.clone()
queue_2.put(x_clone)
# putting and getting from the same queue in the same process will likely end up with segfault
queue.put(tensor)
x = queue.get()

共享策略

本节简要概述了不同共享策略的工作原理。请注意,它只适用于 CPU 张量 - CUDA 张量将始终使用 CUDA API,因为这是它们唯一可以共享的方式。

文件描述符 - file_descriptor

注意

这是默认策略(macOS 和 OS X 除外,它们不支持该策略)。

此策略将使用文件描述符作为共享内存句柄。每当存储被移动到共享内存时,从 shm_open 获取的文件描述符将与对象一起缓存,当它要发送到其他进程时,文件描述符将被传输(例如,通过 UNIX 套接字)到它。接收方也将缓存文件描述符并 mmap 它,以获得对存储数据的共享视图。

请注意,如果有很多张量共享,此策略将始终保持大量文件描述符打开。如果您的系统对打开的文件描述符数量有较低的限制,并且您无法提高它们,您应该使用 file_system 策略。

文件系统 - file_system

此策略将使用给定给 shm_open 的文件名来标识共享内存区域。这有一个好处,即不需要实现缓存从它获得的文件描述符,但同时容易发生共享内存泄漏。文件不能在创建后立即删除,因为其他进程需要访问它来打开它们的视图。如果进程致命崩溃或被杀死,并且没有调用存储析构函数,文件将保留在系统中。这非常严重,因为它们会一直占用内存,直到系统重新启动或手动释放它们。

为了解决共享内存文件泄漏的问题,torch.multiprocessing 会启动一个名为 torch_shm_manager 的守护进程,该进程会与当前进程组隔离,并跟踪所有共享内存分配。一旦所有连接到它的进程退出,它会等待一段时间以确保没有新的连接,然后会遍历该组分配的所有共享内存文件。如果发现其中任何文件仍然存在,它们将被释放。我们已经测试了这种方法,它被证明对各种故障具有鲁棒性。但是,如果您的系统具有足够高的限制,并且 file_descriptor 是支持的策略,我们不建议切换到这种方法。

启动子进程

注意

适用于 Python >= 3.4。

这取决于 Python 的 multiprocessing 包中的 spawn 启动方法。

可以通过创建 Process 实例并调用 join 来等待它们完成,从而启动多个子进程来执行某些功能。这种方法在处理单个子进程时效果很好,但在处理多个进程时会存在潜在问题。

也就是说,按顺序连接进程意味着它们将按顺序终止。如果它们没有,并且第一个进程没有终止,则进程终止将不会被注意到。此外,没有用于错误传播的原生工具。

下面的 spawn 函数解决了这些问题,并负责错误传播、乱序终止,并在检测到其中一个进程中的错误时主动终止进程。

torch.multiprocessing.spawn.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')[source]

生成 nprocs 个进程,这些进程运行 fn 并使用 args

如果其中一个进程以非零退出状态退出,则会杀死剩余的进程,并引发一个异常,其中包含终止原因。如果在子进程中捕获到异常,则会转发该异常,并且其回溯信息将包含在父进程中引发的异常中。

参数
  • fn (function) –

    该函数被调用作为生成进程的入口点。此函数必须在模块的顶层定义,以便可以将其腌制并生成。这是多处理强加的要求。

    该函数被调用为 fn(i, *args),其中 i 是进程索引,args 是传递的元组参数。

  • args (tuple) – 传递给 fn 的参数。

  • nprocs (int) – 要生成的进程数。

  • join (bool) – 对所有进程执行阻塞连接。

  • daemon (bool) – 生成的进程的守护进程标志。如果设置为 True,则会创建守护进程。

  • start_method (str) – (已弃用) 此方法将始终使用 spawn 作为启动方法。要使用其他启动方法,请使用 start_processes()

返回值

如果 joinTrue,则为 None;如果 joinFalse,则为 ProcessContext

class torch.multiprocessing.SpawnContext[source]

spawn() 使用 join=False 调用时返回。

join(timeout=None)

在 spawn 上下文中加入一个或多个进程。

尝试在 spawn 上下文中加入一个或多个进程。如果其中一个进程以非零退出状态退出,则此函数将杀死剩余的进程并引发一个包含第一个进程退出原因的异常。

如果所有进程都已成功加入,则返回 True;如果还有更多进程需要加入,则返回 False

参数

timeout (float) – 在放弃等待之前等待的时间。

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源