• 文档 >
  • 多进程包 - torch.multiprocessing
快捷键

多进程包 - torch.multiprocessing ¶

torch.multiprocessing 是对原生 multiprocessing 模块的封装。

它注册了自定义的减少器,这些减少器使用共享内存来提供对同一数据的共享视图。一旦张量/存储被移动到共享内存(见 share_memory_() ),就可以将其发送到其他进程而无需复制。

API 与原始模块 100%兼容 - 只需将 import multiprocessing 更改为 import torch.multiprocessing ,即可使所有张量通过队列发送或通过其他机制共享,移动到共享内存中。

由于 API 的相似性,我们没有记录这个包的大部分内容,并建议参考原始模块的优秀文档。

警告

如果主进程突然退出(例如,由于接收到的信号),Python 的 multiprocessing 有时无法清理其子进程。这是一个已知的缺陷,所以如果您在中断解释器后看到任何资源泄漏,这很可能意味着这刚刚发生在您身上。

策略管理 ¶

torch.multiprocessing.get_all_sharing_strategies()[source][source]

返回当前系统支持的共享策略集合。

torch.multiprocessing.get_sharing_strategy()[source][source]

返回当前共享 CPU 张量的策略。

torch.multiprocessing.set_sharing_strategy(new_strategy)[source][source]

设置 CPU 张量的共享策略。

参数:

new_strategy (str) – 选择策略的名称。应该是 get_all_sharing_strategies() 返回的值之一。

共享 CUDA 张量

在 Python 3 中,仅支持使用 spawnforkserver 启动方法在进程间共享 CUDA 张量。

与 CPU 张量不同,发送进程需要保留原始张量,直到接收进程保留张量的副本。底层的引用计数已实现,但需要用户遵循以下最佳实践。

警告

如果消费者进程因致命信号异常死亡,只要发送进程正在运行,共享张量可能会永远保留在内存中。

  1. 消费者应尽快释放内存。

## Good
x = queue.get()
# do somethings with x
del x
## Bad
x = queue.get()
# do somethings with x
# do everything else (producer have to keep x in memory)

保持生产者进程运行,直到所有消费者退出。这将防止生产者进程释放消费者仍在使用的内存的情况。

## producer
# send tensors, do something
event.wait()
## consumer
# receive tensors and use them
event.set()
  1. 不要传递接收到的张量。

# not going to work
x = queue.get()
queue_2.put(x)
# you need to create a process-local copy
x = queue.get()
x_clone = x.clone()
queue_2.put(x_clone)
# putting and getting from the same queue in the same process will likely end up with segfault
queue.put(tensor)
x = queue.get()

共享策略 §

本节简要介绍了不同的共享策略是如何工作的。请注意,这仅适用于 CPU 张量 - CUDA 张量始终使用 CUDA API,因为这是它们唯一可以共享的方式。

文件描述符 - file_descriptor

注意

这是默认策略(除了 macOS 和 OS X,因为不支持)。

此策略将使用文件描述符作为共享内存句柄。每当存储被移动到共享内存时,从 shm_open 获得的文件描述符将与对象一起缓存,当它要发送到其他进程时,文件描述符将通过 UNIX 套接字等方式传递给它。接收方也将缓存文件描述符并 mmap 它,以获得对存储数据的共享视图。

注意,如果有很多张量需要共享,此策略将大部分时间保持大量文件描述符打开。如果您的系统对打开文件描述符的数量有低限制,并且您无法提高它们,则应使用 file_system 策略。

文件系统 - file_system

此策略将使用分配给 shm_open 的文件名来识别共享内存区域。这有一个好处,即不需要实现缓存从它获得的文件描述符,但同时也容易发生共享内存泄漏。文件在创建后不能立即删除,因为其他进程需要访问它以打开它们的视图。如果进程意外崩溃或被终止,并且没有调用存储析构函数,这些文件将保留在系统中。这非常严重,因为它们会持续占用内存,直到系统重启或手动释放。

为了解决共享内存文件泄漏的问题, torch.multiprocessing 将会启动一个名为 torch_shm_manager 的守护进程,该守护进程会将自己从当前进程组中隔离出来,并跟踪所有共享内存分配。一旦所有连接到它的进程退出,它将等待一段时间以确保不会有新的连接,然后遍历由该组分配的所有共享内存文件。如果它发现其中任何一个仍然存在,它们将被释放。我们已经测试了这种方法,并且它对各种故障都表现出鲁棒性。尽管如此,如果您的系统有足够的限制,并且 file_descriptor 是一种受支持的策略,我们不建议切换到这种方法。

启动子进程

注意

需要 Python >= 3.4

这取决于 Python 的 multiprocessing 包中的 spawn 启动方法。

通过创建 Process 实例并调用 join 等待其完成来启动多个子进程以执行某些功能。这种方法在处理单个子进程时效果良好,但在处理多个进程时可能会出现潜在问题。

具体来说,按顺序连接进程意味着它们将按顺序终止。如果它们没有这样做,并且第一个进程没有终止,进程终止将不会被注意到。此外,没有原生的错误传播机制。

下面的 spawn 函数解决了这些问题,并负责错误传播、非顺序终止,并在检测到其中一个进程出现错误时主动终止进程。

torch.multiprocessing.spawn.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')[source][source]

启动 nprocs 个进程,这些进程以 fn 运行 args

如果其中一个进程以非零退出状态退出,则剩余进程将被终止,并抛出一个异常,异常原因即为终止原因。如果在子进程中捕获到异常,则将其转发,并在父进程中抛出的异常中包含其堆栈跟踪。

参数:
  • fn (函数) –

    函数作为启动进程的入口点被调用。此函数必须在模块的顶层定义,以便可以被序列化并启动。这是由多进程模块强制要求的。

    函数调用格式为 fn(i, *args) ,其中 i 是进程索引, args 是通过的参数元组。

  • args(元组)- 传递给 fn 的参数。

  • nprocs(整数)- 要生成的进程数。

  • join(布尔值)- 对所有进程执行阻塞式 join 操作。

  • daemon(布尔值)- 生成的进程的守护进程标志。如果设置为 True,则将创建守护进程。

  • start_method(字符串)- (已弃用)此方法始终使用 spawn 作为启动方法。要使用不同的启动方法,请使用 start_processes()

返回:

如果 joinTrue ,则为 ProcessContext ,如果 joinFalse ,则为 ProcessContext

class torch.multiprocessing.SpawnContext[source][source]

spawn() 调用时返回 join=False

join(timeout=None, grace_period=None)[源码] ¶

在 spawn 上下文中连接一个或多个进程。

尝试在此 spawn 上下文中连接一个或多个进程。如果其中之一以非零退出状态退出,此函数将杀死剩余的进程(可选地带有宽限期)并引发异常,异常原因是最先退出的进程。

如果所有进程成功连接,则返回 True ,如果还有更多进程需要连接,则返回 False

参数:
  • 超时(浮点数)- 等待这么长时间(以秒为单位)后放弃等待。

  • 宽限期(浮点数)- 当任何进程失败时,等待这么长时间(以秒为单位)让其他进程优雅地关闭,然后终止它们。如果它们仍然没有退出,则等待另一个宽限期后再杀死它们。


© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,主题由 Read the Docs 提供。

文档

PyTorch 开发者文档全面访问

查看文档

教程

获取初学者和高级开发者的深入教程

查看教程

资源

查找开发资源并获得您的疑问解答

查看资源