多进程包 - torch.multiprocessing¶
torch.multiprocessing 是对原生 multiprocessing
模块的包装器。
它注册自定义化简器,这些化简器使用共享内存来提供对不同进程中相同数据的共享视图。一旦张量/存储器被移动到共享内存(参见 share_memory_()
),就可以将其发送到其他进程,而无需进行任何复制。
该 API 与原始模块 100% 兼容 - 只需将 import multiprocessing
更改为 import torch.multiprocessing
,即可将所有通过队列发送或通过其他机制共享的张量移动到共享内存。
由于 API 的相似性,我们不对此包的大部分内容进行文档化,建议参考原始模块的详细文档。
警告
如果主进程意外退出(例如,由于接收到的信号),Python 的 multiprocessing
有时无法清理其子进程。这是一个已知问题,因此如果您在中断解释器后看到任何资源泄漏,这可能意味着您遇到了这种情况。
策略管理¶
- torch.multiprocessing.set_sharing_strategy(new_strategy)[source]¶
设置共享 CPU 张量的策略。
- 参数
new_strategy (str) – 所选策略的名称。应为
get_all_sharing_strategies()
返回的值之一。
共享 CUDA 张量¶
仅在 Python 3 中支持在进程之间共享 CUDA 张量,使用 spawn
或 forkserver
启动方法。
与 CPU 张量不同,发送进程需要保留原始张量,只要接收进程保留张量的副本即可。引用计数是在幕后实现的,但要求用户遵循以下最佳实践。
警告
如果消费者进程因致命信号而异常退出,共享张量可能会永远保留在内存中,只要发送进程正在运行。
尽快在消费者中释放内存。
## Good
x = queue.get()
# do somethings with x
del x
## Bad
x = queue.get()
# do somethings with x
# do everything else (producer have to keep x in memory)
2. 让生产者进程一直运行,直到所有消费者都退出。这将防止生产者进程释放仍在被消费者使用的内存的情况。
## producer
# send tensors, do something
event.wait()
## consumer
# receive tensors and use them
event.set()
不要传递接收到的张量。
# not going to work
x = queue.get()
queue_2.put(x)
# you need to create a process-local copy
x = queue.get()
x_clone = x.clone()
queue_2.put(x_clone)
# putting and getting from the same queue in the same process will likely end up with segfault
queue.put(tensor)
x = queue.get()
共享策略¶
本节简要概述了不同共享策略的工作原理。请注意,它仅适用于 CPU 张量 - CUDA 张量将始终使用 CUDA API,因为这是它们唯一可以共享的方式。
文件描述符 - file_descriptor
¶
注意
这是默认策略(除了 macOS 和 OS X,因为它们不支持该策略)。
此策略将使用文件描述符作为共享内存句柄。每当存储被移动到共享内存时,从 shm_open
获取的文件描述符将与对象一起缓存,当它要发送到其他进程时,文件描述符将被传递(例如通过 UNIX 套接字)给它。接收器也会缓存文件描述符并 mmap
它,以获得对存储数据的共享视图。
请注意,如果有很多张量被共享,此策略将一直保持大量的文件描述符处于打开状态。如果您的系统对打开文件描述符的数量有限制,并且您无法提高限制,则应使用 file_system
策略。
文件系统 - file_system
¶
此策略将使用给出给 shm_open
的文件名来标识共享内存区域。这有一个好处,即不需要实现缓存从它获取的文件描述符,但同时容易造成共享内存泄漏。文件不能在创建后立即删除,因为其他进程需要访问它来打开自己的视图。如果进程致命崩溃或被杀死,并且没有调用存储析构函数,文件将保留在系统中。这非常严重,因为它们会一直占用内存,直到系统重新启动或手动释放它们。
为了解决共享内存文件泄漏的问题,torch.multiprocessing
将产生一个名为 torch_shm_manager
的守护进程,它将与当前进程组隔离,并跟踪所有共享内存分配。一旦所有连接到它的进程退出,它将等待一段时间以确保不会有新的连接,并将迭代该组分配的所有共享内存文件。如果它发现其中任何一个仍然存在,它们将被释放。我们已经测试了这种方法,它被证明对各种故障是健壮的。尽管如此,如果您的系统有足够高的限制,并且 file_descriptor
是一个受支持的策略,我们不建议切换到这个策略。
生成子进程¶
注意
适用于 Python >= 3.4。
这取决于 Python 的 multiprocessing
包中的 spawn
启动方法。
生成多个子进程来执行某些函数可以通过创建 Process
实例并调用 join
来等待它们完成。当处理单个子进程时,这种方法可以很好地工作,但当处理多个进程时,会存在潜在的问题。
即,顺序地加入进程意味着它们将顺序地终止。如果它们没有,并且第一个进程没有终止,那么进程终止将不会被注意到。此外,没有用于错误传播的原生工具。
下面的 spawn
函数解决了这些问题,并处理了错误传播、无序终止,并且将在检测到其中一个进程中的错误时主动终止进程。
- torch.multiprocessing.spawn.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')[source]¶
生成
nprocs
个进程,这些进程运行使用args
的fn
。如果其中一个进程以非零退出状态退出,则其余进程将被杀死,并抛出一个包含终止原因的异常。如果在子进程中捕获到异常,它将被转发,并且其回溯将包含在父进程中抛出的异常中。
- 参数
fn (function) –
函数作为生成进程的入口点被调用。此函数必须在模块的顶层定义,以便它可以被腌制和生成。这是由 multiprocessing 强加的要求。
该函数被调用为
fn(i, *args)
,其中i
是进程索引,args
是传递的元组参数。args (tuple) – 传递给
fn
的参数。nprocs (int) – 要生成的进程数量。
join (bool) – 对所有进程执行阻塞加入。
daemon (bool) – 生成的进程的守护进程标志。如果设置为 True,则将创建守护进程。
start_method (str) – (已弃用)此方法将始终使用
spawn
作为启动方法。若要使用其他启动方法,请使用start_processes()
。
- 返回值
如果
join
为True
,则为 None;如果join
为False
,则为ProcessContext