• 文档 >
  • 分布式通信包 - torch.distributed
快捷键

分布式通信包 - torch.distributed

注意

请参阅 PyTorch 分布式概述以了解有关分布式训练的所有相关功能的简要介绍。

后端

支持三个内置后端,每个后端都有不同的功能。下表显示了与 CPU / CUDA 张量一起可用的函数。如果用于构建 PyTorch 的实现支持,则 MPI 仅支持 CUDA。

后端

gloo

mpi

nccl

设备

CPU

GPU

CPU

GPU

CPU

GPU

发送

?

接收

?

广播

?

全量归约

?

归约

?

全局聚合

?

收集

?

分散

?

减少分散

全部到全部

?

障碍

?

随 PyTorch 一起提供的后端

PyTorch 分布式包支持 Linux(稳定)、MacOS(稳定)和 Windows(原型)。默认情况下,对于 Linux,Gloo 和 NCCL 后端已构建并包含在 PyTorch 分布式中(仅当使用 CUDA 构建时为 NCCL)。MPI 是一个可选后端,只能在从源代码构建 PyTorch 时包含。(例如,在已安装 MPI 的主机上构建 PyTorch。)

注意

截至 PyTorch v1.8 版本,Windows 支持所有集体通信后端,除了 NCCL,如果 init_process_group() 的 init_method 参数指向一个文件,它必须遵循以下模式:

  • 本地文件系统, init_method="file:///d:/tmp/some_file"

  • 共享文件系统, init_method="file://////{machine_name}/{share_folder_name}/some_file"

与 Linux 平台相同,您可以通过设置环境变量 MASTER_ADDR 和 MASTER_PORT 来启用 TcpStore。

使用哪个后端?

过去我们经常被问到:“我应该使用哪个后端?”

  • 经验法则

    • 使用 NCCL 后端进行分布式 GPU 训练

    • 使用 Gloo 后端进行分布式 CPU 训练。

  • 使用 InfiniBand 互连的 GPU 主机

    • 使用 NCCL,因为它是目前唯一支持 InfiniBand 和 GPUDirect 的后端。

  • 使用以太网互连的 GPU 主机

    • 使用 NCCL,因为它目前提供了最佳的分布式 GPU 训练性能,尤其是对于多进程单节点或多节点分布式训练。如果您在使用 NCCL 时遇到任何问题,请使用 Gloo 作为后备选项。(请注意,Gloo 目前对于 GPU 的运行速度比 NCCL 慢。)

  • 带有 InfiniBand 互连的 CPU 主机

    • 如果您的 InfiniBand 已启用 IP over IB,请使用 Gloo,否则请使用 MPI。我们计划在即将发布的版本中为 Gloo 添加 InfiniBand 支持。

  • 带有以太网互连的 CPU 主机

    • 使用 Gloo,除非你有特定理由使用 MPI。

常见环境变量 ¶

选择要使用的网络接口 ¶

默认情况下,NCCL 和 Gloo 后端都会尝试找到正确的网络接口来使用。如果自动检测到的接口不正确,您可以使用以下环境变量来覆盖它(适用于相应后端):

  • NCCL_SOCKET_IFNAME,例如 export NCCL_SOCKET_IFNAME=eth0

  • GLOO_SOCKET_IFNAME,例如 export GLOO_SOCKET_IFNAME=eth0

如果您使用 Gloo 后端,可以通过逗号分隔指定多个接口,例如: export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3 。后端将以轮询方式在这些接口上调度操作。所有进程必须在此变量中指定相同数量的接口,这是强制性的。

其他 NCCL 环境变量

调试 - 如果出现 NCCL 失败,您可以将 NCCL_DEBUG=INFO 设置为打印明确的警告信息以及基本的 NCCL 初始化信息。

您还可以使用 NCCL_DEBUG_SUBSYS 来获取关于 NCCL 特定方面的更多详细信息。例如, NCCL_DEBUG_SUBSYS=COLL 会打印集体调用的日志,这在调试挂起时可能很有帮助,尤其是由于集体类型或消息大小不匹配引起的挂起。在拓扑检测失败的情况下,将 NCCL_DEBUG_SUBSYS=GRAPH 设置为检查详细的检测结果并将其保存为参考,如果需要 NCCL 团队进一步的帮助,这将很有帮助。

性能调优 - NCCL 根据其拓扑检测自动进行调优,以节省用户的调优工作量。在某些基于套接字的系统上,用户仍然可以尝试调整 NCCL_SOCKET_NTHREADSNCCL_NSOCKS_PERTHREAD 以增加套接字网络带宽。这两个环境变量已经被 NCCL 为一些云提供商(如 AWS 或 GCP)预先调优。

有关 NCCL 环境变量的完整列表,请参阅 NVIDIA NCCL 的官方文档。

基础 §

torch.distributed 包提供了 PyTorch 的支持和跨多个计算节点(运行在一台或多台机器上)的通信原语,以支持多进程并行。类 torch.nn.parallel.DistributedDataParallel() 基于此功能,为任何 PyTorch 模型提供同步分布式训练作为包装器。这与 Multiprocessing 包提供的并行方式不同,即 torch.multiprocessing 和 torch.nn.DataParallel() ,因为它支持多个网络连接的机器,并且用户必须为每个进程显式启动主训练脚本的单独副本。

在单机同步的情况下,torch.distributed 或 torch.nn.parallel.DistributedDataParallel() 包装器可能仍然比其他数据并行方法具有优势,包括 torch.nn.DataParallel()

  • 每个进程维护自己的优化器,并在每次迭代中执行完整的优化步骤。虽然这看起来可能是多余的,因为梯度已经被收集并平均到各个进程中,因此对于每个进程都是相同的,但这意味着不需要参数广播步骤,从而减少了在节点之间传输张量所花费的时间。

  • 每个进程都包含一个独立的 Python 解释器,消除了从单个 Python 进程驱动多个执行线程、模型副本或 GPU 所带来的额外解释器开销和“GIL-thrashing”。这对于大量使用 Python 运行时的模型尤为重要,包括具有循环层的模型或许多小型组件的模型。

初始化

在调用任何其他方法之前,需要使用 torch.distributed.init_process_group()torch.distributed.device_mesh.init_device_mesh() 函数初始化该包。这两个函数都会阻塞,直到所有进程都加入。

警告

初始化不是线程安全的。进程组创建应从单个线程执行,以防止跨 rank 的“UUID”分配不一致,并防止初始化过程中可能导致的挂起。

torch.distributed.is_available()[source][source]

返回 True 如果分布式包可用。

否则, torch.distributed 不公开其他任何 API。目前, torch.distributed 在 Linux、MacOS 和 Windows 上可用。在从源代码构建 PyTorch 时设置 USE_DISTRIBUTED=1 以启用它。目前,默认值为 Linux 和 Windows 的 USE_DISTRIBUTED=1 ,MacOS 的 USE_DISTRIBUTED=0

返回类型:

布尔型

torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=- 1, rank=- 1, store=None, group_name='', pg_options=None, device_id=None)[source][source]

初始化默认的分布式进程组。

这也将初始化分布式包。

初始化进程组主要有两种方式:
  1. 明确指定 storerankworld_size

  2. 指定 init_method (一个 URL 字符串),表示如何发现对等节点。可选指定 rankworld_size ,或者将所有必需参数编码到 URL 中并省略它们。

如果两者都没有指定,则假定 init_method 为“env://”。

参数:
  • 后端(str 或 Backend,可选)- 要使用的后端。根据构建时配置,有效值包括 mpiglooncclucc 或由第三方插件注册的后端。自 2.6 版本起,如果未提供 backend ,c10d 将使用为设备类型指定的后端注册(如果提供了 device_id 关键字参数)。目前已知的默认注册包括: nccl 用于 cudagloo 用于 cpu 。如果未提供 backenddevice_id ,c10d 将在运行时机器上检测加速器并使用为该检测到的加速器注册的后端(或 cpu )。此字段可以作为小写字符串提供(例如, "gloo" ),也可以通过 Backend 属性访问(例如, Backend.GLOO )。如果使用 nccl 后端在每台机器上使用多个进程,则每个进程必须对每个使用的 GPU 具有独占访问权限,因为进程间共享 GPU 可能会导致死锁或 NCCL 无效使用。 ucc 后端是实验性的。

  • 初始化方法(字符串,可选)- 指定如何初始化进程组的 URL。如果没有指定 init_methodstore ,则默认为“env://”。与 store 互斥。

  • world_size(整数,可选)- 参与作业的进程数量。如果指定了 store ,则为必需。

  • rank(整数,可选)- 当前进程的排名(它应该是一个介于 0 和 world_size -1 之间的数字)。如果指定了 store ,则为必需。

  • store(存储,可选)- 可供所有工作者访问的键/值存储,用于交换连接/地址信息。与 init_method 互斥。

  • 超时(timedelta,可选)- 对进程组执行的操作的超时时间。默认值为 NCCL 的 10 分钟和其他后端的 30 分钟。这是集体操作将被异步中止并进程崩溃的持续时间。这是由于 CUDA 执行是异步的,继续执行用户代码不再安全,因为失败的异步 NCCL 操作可能会导致后续在损坏数据上运行的 CUDA 操作。当设置 TORCH_NCCL_BLOCKING_WAIT 时,进程将阻塞并等待此超时。

  • group_name(str,可选,已弃用)- 组名。此参数被忽略

  • pg_options(ProcessGroupOptions,可选)- 指定在构建特定进程组时需要传递的附加选项的进程组选项。目前,我们支持的是 ProcessGroupNCCL.Options 用于 nccl 后端, is_high_priority_stream 可以指定,以便 nccl 后端可以在计算内核等待时选择高优先级的 CUDA 流。有关配置 nccl 的其他可用选项,请参阅 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t

  • device_id (torch.device, optional) – 将此进程“绑定”到单个、特定的设备上,允许进行后端特定的优化。目前这有两个效果,仅在 NCCL 下:通信器立即形成(立即调用 ncclCommInit* 而不是正常的延迟调用)并且子组将尽可能使用 ncclCommSplit 以避免创建组的额外开销。如果您想尽早了解 NCCL 初始化错误,也可以使用此字段。

注意

要启用 backend == Backend.MPI ,PyTorch 需要在支持 MPI 的系统上从源代码构建。

注意

多个后端的支持是实验性的。目前如果没有指定后端,将创建 gloonccl 后端。 gloo 后端将用于与 CPU 张量相关的集体操作,而 nccl 后端将用于与 CUDA 张量相关的集体操作。可以通过传递格式为“:,:”的字符串来指定自定义后端,例如“cpu:gloo,cuda:custom_backend”。

torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None)[source][source]

根据 device_type、mesh_shape 和 mesh_dim_names 参数初始化 DeviceMesh。

这将创建一个具有 n 维数组布局的 DeviceMesh,其中 n 是 mesh_shape 的长度。如果提供了 mesh_dim_names,则每个维度将标记为 mesh_dim_names[i]。

注意

init_device_mesh 遵循 SPMD 编程模型,意味着相同的 PyTorch Python 程序在集群的所有进程/排名上运行。确保 mesh_shape(描述设备布局的 nD 数组的维度)在所有排名上相同。不一致的 mesh_shape 可能导致挂起。

注意

如果找不到进程组,init_device_mesh 将在幕后初始化所需的分布式进程组/组以进行分布式通信。

参数:
  • device_type (str) – 网状结构的设备类型。目前支持:“cpu”,“cuda/cuda-like”。不允许传入带有 GPU 索引的设备类型,如“cuda:0”。

  • mesh_shape (Tuple[int]) – 定义描述设备布局的多维数组维度的元组。

  • mesh_dim_names (Tuple[str], optional) – 一个元组,包含要分配给描述设备布局的多维数组每个维度的网状维度名称。其长度必须与 mesh_shape 的长度匹配。mesh_dim_names 中的每个字符串必须是唯一的。

返回:

A DeviceMesh 代表设备布局的对象。

返回类型:

设备网格

示例::
>>> from torch.distributed.device_mesh import init_device_mesh
>>>
>>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,))
>>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))
torch.distributed.is_initialized()[source][source]

检查默认进程组是否已初始化。

返回类型:

布尔型

torch.distributed.is_mpi_available()[source][source]

检查 MPI 后端是否可用。

返回类型:

布尔型

torch.distributed.is_nccl_available()[source][source]

检查 NCCL 后端是否可用。

返回类型:

布尔型

torch.distributed.is_gloo_available()[source][source]

检查 Gloo 后端是否可用。

返回类型:

布尔型

torch.distributed.distributed_c10d.is_xccl_available()[source][source]

检查 XCCL 后端是否可用。

返回类型:

布尔型

torch.distributed.is_torchelastic_launched()[source][source]

检查此进程是否以 torch.distributed.elastic (即 torchelastic)启动。

TORCHELASTIC_RUN_ID 环境变量的存在被用作判断当前进程是否以 torchelastic 启动的代理。这是一个合理的代理,因为 TORCHELASTIC_RUN_ID 映射到会合 ID,它始终是一个非空值,表示用于对等发现的作业 ID。

返回类型:

布尔型


目前支持三种初始化方法:

TCP 初始化 §

使用 TCP 初始化有两种方式,都需要一个所有进程都能访问的网络地址以及一个期望的 world_size 。第一种方式需要指定属于 rank 0 进程的地址。这种初始化方法要求所有进程都手动指定 rank。

注意,在最新的分布式包中不再支持多播地址。 group_name 也已弃用。

import torch.distributed as dist

# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
                        rank=args.rank, world_size=4)

共享文件系统初始化 §

另一种初始化方法利用的是组内所有机器都能访问的共享文件系统,以及一个期望的 world_size 。URL 应该以 file:// 开头,并包含共享文件系统上(在现有目录中)一个不存在的文件的路径。文件系统初始化将自动创建该文件(如果不存在),但不会删除该文件。因此,您需要确保在下次 init_process_group() 调用相同的文件路径/名称之前清理该文件。

注意,在最新发布的分布式包中不再支持自动排名分配,并且 group_name 也已弃用。

警告

此方法假设文件系统支持使用 fcntl 进行锁定 - 大多数本地系统和 NFS 都支持。

警告

此方法始终会创建文件,并尽力在程序结束时清理并删除文件。换句话说,每次使用文件初始化方法进行初始化时,都需要一个全新的空文件才能成功。如果再次使用之前初始化(未清理)的同一文件,这将是意外的行为,通常会导致死锁和失败。因此,尽管此方法会尽力清理文件,但如果自动删除失败,您有责任确保在训练结束时删除文件,以防止在下次使用时再次使用同一文件。如果您计划多次在同一个文件名上调用 init_process_group() ,这尤为重要。换句话说,如果文件未被删除/清理,并且您再次在文件上调用 init_process_group() ,则预期会出现失败。这里的经验法则是,确保每次调用 init_process_group() 时文件不存在或为空。

import torch.distributed as dist

# rank should always be specified
dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile',
                        world_size=4, rank=args.rank)

环境变量初始化

此方法将从环境变量中读取配置,允许用户完全自定义信息获取方式。需要设置的变量包括:

  • MASTER_PORT - 必需;必须在机器上具有排名 0 的空闲端口

  • MASTER_ADDR - 必需(排名 0 除外);排名 0 节点的地址

  • WORLD_SIZE - 必需;可以在此处设置,也可以在 init 函数调用中设置

  • RANK - 必需项;可以在此处设置,也可以在 init 函数调用时设置

将使用排名为 0 的机器来设置所有连接。

这是默认方法,意味着 init_method 不需要指定(或可以是 env:// )。

初始化后 ¶

运行 torch.distributed.init_process_group() 后,可以使用以下功能。要检查进程组是否已经初始化,请使用 torch.distributed.is_initialized()

class torch.distributed.Backend(name)[source][source]

一个类似于枚举的后端类。

可用后端:GLOO、NCCL、UCC、MPI、XCCL 以及其他已注册后端。

该类的值是全小写的字符串,例如, "gloo" 。它们可以作为属性访问,例如, Backend.NCCL

该类可以直接调用以解析字符串,例如, Backend(backend_str) 将检查 backend_str 是否有效,如果是,则返回解析后的全小写字符串。它也接受大写字符串,例如, Backend("GLOO") 返回 "gloo"

注意

条目 Backend.UNDEFINED 存在,但仅用作某些字段的初始值。用户不应直接使用它或假设其存在。

classmethod register_backend(name, func, extended_api=False, devices=None)[source][source]

使用给定的名称和实例化函数注册新的后端。

此类方法由第三方 ProcessGroup 扩展用于注册新的后端。

参数:
  • name (str) – ProcessGroup 扩展的后端名称。它应与 init_process_group() 中的名称匹配。

  • func (函数) – 实例化后端的函数处理器。该函数应在后端扩展中实现,并接受四个参数,包括 storerankworld_sizetimeout

  • extended_api (bool, 可选) – 后端是否支持扩展参数结构。默认: False 。如果设置为 True ,后端将获取 c10d::DistributedBackendOptions 的实例,以及后端实现定义的进程组选项对象。

  • device (str 或 str 列表,可选) – 此后端支持的设备类型,例如“cpu”,“cuda”等。如果为 None,则假设同时支持“cpu”和“cuda”。

注意

对第三方后端的支持是实验性的,可能随时更改。

torch.distributed.get_backend(group=None)[source][source]

返回给定进程组的后端。

参数:

group (ProcessGroup, 可选) – 要工作的进程组。默认为通用主进程组。如果指定了另一个特定组,调用进程必须是 group 的成员。

返回:

给定进程组的后端作为小写字符串。

返回类型:

后端

torch.distributed.get_rank(group=None)[source][source]

返回当前进程在提供的 group 中的排名,默认情况下返回。

排名是分配给分布式进程组中每个进程的唯一标识符。它们始终是连续的整数,范围从 0 到 world_size

参数:

group (进程组,可选) – 要工作的进程组。如果为 None,则使用默认进程组。

返回:

进程组的等级为-1,如果不是该组的一部分

返回类型:

int

torch.distributed.get_world_size(group=None)[source][source]

返回当前进程组中的进程数量。

参数:

group (进程组,可选) – 要工作的进程组。如果为 None,则使用默认进程组。

返回:

进程组的世界大小为-1,如果不属于该组

返回类型:

int

关闭

在退出时通过调用 destroy_process_group() 来清理资源是很重要的。

最简单的模式是在训练脚本中不再需要通信的地方,通常在 main()函数的末尾,通过调用 destroy_process_group() 来销毁每个进程组和后端,对于组参数使用默认值 None。这个调用应该在每个训练进程中只执行一次,而不是在外部进程启动器级别执行。

如果在超时时间内不是所有进程在进程组中调用 destroy_process_group() ,尤其是在应用程序中有多个进程组时(例如,对于 N-D 并行),则可能在退出时挂起。这是因为 ProcessGroupNCCL 的析构函数调用 ncclCommAbort,它必须集体调用,但如果由 Python 的 GC 调用,则调用 ProcessGroupNCCL 的析构函数的顺序是不确定的。调用 destroy_process_group() 有助于确保 ncclCommAbort 在各个进程中的调用顺序一致,并避免在 ProcessGroupNCCL 的析构函数中调用 ncclCommAbort。

重初始化

可以使用 destroy_process_group 来销毁单个进程组。一个用例可能是容错训练,其中进程组可能在运行时被销毁然后重新初始化。在这种情况下,在调用 destroy 之后和随后初始化之前,使用除 torch.distributed primitives 之外的其他方式同步训练进程至关重要。由于实现这种同步的难度,这种行为目前不受支持/未经验证,被视为已知问题。如果这是阻碍您的用例,请提交 github 问题或 RFC。


默认情况下,集体操作在默认组(也称为世界)上运行,并要求所有进程进入分布式函数调用。然而,某些工作负载可以从更细粒度的通信中受益。这时分布式组就派上用场了。可以使用 new_group() 函数创建新组,包含所有进程的任意子集。它返回一个不可见的组句柄,可以作为 group 参数传递给所有集体(集体是在某些已知编程模式中交换信息的分布式函数)。

torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None, device_id=None)[source][source]

创建一个新的分布式组。

此函数要求主组中的所有进程(即所有属于分布式作业的进程)进入此函数,即使它们不是组的成员。此外,所有进程应按相同顺序创建组。

警告

安全并发使用:在使用带有 NCCL 后端的多个进程组时,用户必须确保跨节点集体操作的执行顺序全局一致。

如果进程内的多个线程发出集体操作,则需要显式同步以确保顺序一致。

当使用 torch.distributed 通信 API 的异步变体时,会返回一个工作对象,并将通信内核入队到单独的 CUDA 流中,允许通信和计算的重叠。一旦在一个进程组中发出一个或多个异步操作,它们必须通过调用 work.wait() 与其他 CUDA 流同步,然后再使用另一个进程组。

更多详情请参阅《同时使用多个 NCCL 通信器的使用方法》。

参数:
  • ranks (list[int]) – 群组成员的等级列表。如果为 None ,则将设置为所有等级。默认为 None

  • timeout (timedelta, optional) – 详细信息和默认值请参阅 init_process_group。

  • backend (str or Backend, optional) – 要使用的后端。根据构建时配置,有效值是 gloonccl 。默认使用与全局组相同的后端。此字段应作为小写字符串提供(例如, "gloo" ),也可以通过 Backend 属性访问(例如, Backend.GLOO )。如果传入 None ,则将使用对应于默认进程组的后端。默认为 None

  • pg_options (ProcessGroupOptions, optional) – 指定在构建特定进程组时需要传递的附加选项的进程组选项。例如,对于 nccl 后端,可以指定 is_high_priority_stream 以使进程组能够获取高优先级的 CUDA 流。有关配置 nccl 的其他可用选项,请参阅 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t

  • 使用本地同步(bool,可选)- 在进程组创建结束时执行组内屏障。与之前不同,非成员 rank 无需调用 API 也不需要加入屏障。

  • group_desc(str,可选)- 描述进程组的字符串。

  • device_id(torch.device,可选)- 将此进程“绑定”到单个、特定的设备上,如果提供此字段,new_group 调用将尝试立即为该设备初始化通信后端。

返回:

可用于集体调用或 GroupMember.NON_GROUP_MEMBER 的分布式组的句柄,如果 rank 不是 ranks 的一部分。

注意:use_local_synchronization 与 MPI 不兼容。

注意:当 use_local_synchronization=True 时,在较大的集群和较小的进程组中可能会显著提高速度,但必须小心,因为它会改变集群的行为,因为非成员排名不会加入 group barrier()。

注意:当每个排名创建多个重叠的进程组时,use_local_synchronization=True 可能会导致死锁。为了避免这种情况,请确保所有排名遵循相同的全局创建顺序。

torch.distributed.get_group_rank(group, global_rank)[source][source]

将全局排名转换为组排名。

global_rank 必须是 group 的部分,否则会引发 RuntimeError。

参数:
  • 组(ProcessGroup)- 用于查找相对排名的 ProcessGroup。

  • global_rank(int)- 要查询的全局排名。

返回:

global_rank 相对于 group 的组别排名

返回类型:

int

注意。在默认进程组上调用此函数返回恒等值

torch.distributed.get_global_rank(group, group_rank)[source][source]

将组别排名转换为全局排名。

group_rank 必须是组的一部分,否则会引发 RuntimeError。

参数:
  • group (ProcessGroup) – 要查找全局排名的 ProcessGroup。

  • group_rank (int) – 要查询的组排名。

返回:

group_rank 相对于 group 的全局排名。

返回类型:

int

注意:在默认进程组上调用此函数返回恒等映射

torch.distributed.get_process_group_ranks(group)[source][source]

获取与 group 关联的所有 rank。

参数:

group (进程组) – 获取所有 rank 的进程组。

返回:

全球排名列表,按组排名排序。

返回类型:

list[int]

设备网格

设备网格是一种高级抽象,用于管理进程组(或 NCCL 通信器)。它允许用户轻松创建跨节点和节点内进程组,无需担心如何为不同的子进程组正确设置排名,并有助于轻松管理这些分布式进程组。可以使用 init_device_mesh() 函数创建新的设备网格,其中网格形状描述了设备拓扑。

class torch.distributed.device_mesh.DeviceMesh(device_type, mesh, *, mesh_dim_names=None, _init_backend=True)[source][source]

DeviceMesh 表示设备网格,其中设备的布局可以用 n 维维度数组表示,n 维数组的每个值都是默认进程组 rank 的全局 ID。

DeviceMesh 可用于描述集群中设备的布局,并在集群内部设备列表之间作为通信的代理。

DeviceMesh 可以用作上下文管理器。

注意

DeviceMesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序在集群中的所有进程/排名上运行。因此,用户需要确保网格数组(描述设备布局)在所有排名上应保持一致。不一致的网格将导致静默挂起。

参数:
  • device_type(字符串)- 网格的设备类型。目前支持:“cpu”,“cuda/cuda-like”。

  • mesh(ndarray)- 一个多维数组或整数张量,描述设备的布局,其中 ID 是默认进程组的全局 ID。

返回:

代表设备布局的 DeviceMesh 对象。

返回类型:

设备网格

以下程序以 SPMD 方式在每个进程/排名上运行。在这个例子中,我们有 2 个主机,每个主机有 4 个 GPU。对网格的第一个维度的归约将跨越列(0, 4)..和(3, 7),对网格的第二个维度的归约将跨越行(0, 1, 2, 3)和(4, 5, 6, 7)。

示例::
>>> from torch.distributed.device_mesh import DeviceMesh
>>>
>>> # Initialize device mesh as (2, 4) to represent the topology
>>> # of cross-host(dim 0), and within-host (dim 1).
>>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
static from_group(group, device_type, mesh=None, *, mesh_dim_names=None)[source][source]

从现有的 ProcessGroup 或现有 ProcessGroup 的列表中构建 DeviceMeshdevice_type

构建的设备网格的维度数等于传入的组数。例如,如果传入一个进程组,则生成的 DeviceMesh 是一个一维网格。如果传入 2 个进程组列表,则生成的 DeviceMesh 是一个二维网格。

如果传入多个组,则必须提供 meshmesh_dim_names 参数。传入的进程组的顺序决定了网格的拓扑结构。例如,第一个进程组将是 DeviceMesh 的 0 维。传入的网格张量必须具有与传入的进程组数相同的维度数,并且网格张量中维度的顺序必须与传入的进程组顺序匹配。

参数:
  • group(进程组或进程组列表)- 已存在的 ProcessGroup 或已存在的 ProcessGroup 列表。

  • device_type(字符串)- 网格的设备类型。目前支持:“cpu”,“cuda/cuda-like”。不允许传入带有 GPU 索引的设备类型,例如“cuda:0”。

  • mesh(torch.Tensor 或 ArrayLike,可选)- 描述设备布局的多维数组或整数张量,其中 ID 是默认进程组的全局 ID。默认值为 None。

  • mesh_dim_names(tuple[str],可选)- 将分配给描述设备布局的多维数组每个维度的网格维度名称的元组。其长度必须与 mesh_shape 的长度匹配。mesh_dim_names 中的每个字符串必须是唯一的。默认值为 None。

返回:

代表设备布局的 DeviceMesh 对象。

返回类型:

DeviceMesh

get_all_groups()[source][source]

返回所有网格维度的 ProcessGroups 列表。

返回:

一个包含 ProcessGroup 对象的列表。

返回类型:

list[torch.distributed.distributed_c10d.ProcessGroup]

get_coordinate()[source][source]

返回此秩相对于网格所有维度的相对索引。如果此秩不属于网格,则返回 None。

返回类型:

Optional[list[int]]

get_group(mesh_dim=None)[来源][来源] ¶

返回由 mesh_dim 指定的单个 ProcessGroup,如果未指定 mesh_dim 且 DeviceMesh 为 1 维,则返回网格中的唯一 ProcessGroup。

参数:
  • mesh_dim (str/python:int, 可选) – 它可以是网格维度的名称或索引

  • None.(网格维度的。默认为) –

返回:

一个 ProcessGroup 对象。

返回类型:

流程组

get_local_rank(mesh_dim=None)[来源][来源] ¶

返回给定 DeviceMesh 的 mesh_dim 的本地排名。

参数:
  • mesh_dim (str/python:int, 可选) – 它可以是网格维度的名称或索引

  • None. (网格维度的。默认是) –

返回:

整数表示局部排名。

返回类型:

int

以下程序以 SPMD 方式在每个进程/排名上运行。在这个例子中,我们有 2 个主机,每个主机有 4 个 GPU。在排名 0、1、2、3 上调用 mesh_2d.get_local_rank(mesh_dim=0)将返回 0。在排名 4、5、6、7 上调用 mesh_2d.get_local_rank(mesh_dim=0)将返回 1。在排名 0、4 上调用 mesh_2d.get_local_rank(mesh_dim=1)将返回 0。在排名 1、5 上调用 mesh_2d.get_local_rank(mesh_dim=1)将返回 1。在排名 2、6 上调用 mesh_2d.get_local_rank(mesh_dim=1)将返回 2。在排名 3、7 上调用 mesh_2d.get_local_rank(mesh_dim=1)将返回 3。

示例::
>>> from torch.distributed.device_mesh import DeviceMesh
>>>
>>> # Initialize device mesh as (2, 4) to represent the topology
>>> # of cross-host(dim 0), and within-host (dim 1).
>>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
get_rank()[source][source]

返回当前的全局排名。

返回类型:

int

点对点通信

torch.distributed.send(tensor, dst=None, group=None, tag=0, group_dst=None)[source][source]

同步发送张量。

警告

NCCL 后端不支持 tag

参数:
  • 张量(Tensor)- 要发送的张量。

  • dst(整数)- 全局进程组上的目标 rank(无论 group 参数)。目标 rank 不应与当前进程的 rank 相同。

  • group(进程组,可选)- 要工作的进程组。如果为 None,则使用默认进程组。

  • tag(整数,可选)- 用于匹配发送与远程接收的标签

  • group_dst(整数,可选)- 目标 rank 在 group 上。不能同时指定 dstgroup_dst

torch.distributed.recv(tensor, src=None, group=None, tag=0, group_src=None)[source][source]

同步接收张量。

警告

NCCL 后端不支持 tag

参数:
  • 张量(Tensor)- 用接收到的数据填充的张量。

  • src(int,可选)- 全局进程组上的源 rank(无论 group 参数)。如果未指定,将从任何进程接收。

  • group(进程组,可选)- 要工作的进程组。如果为 None,则使用默认进程组。

  • tag(整数,可选)- 用于匹配接收与远程发送的标签

  • group_src(整数,可选)- 目标 rank 在 group 上。同时指定 srcgroup_src 是无效的。

返回:

发送者 rank 为-1,如果不在组内

返回类型:

int

isend()irecv() 使用时返回分布式请求对象。通常,此对象类型未指定,因为它们不应手动创建,但它们保证支持两种方法:

  • is_completed() - 如果操作已完成则返回 True

  • wait() - 将阻塞进程直到操作完成。 is_completed() 一旦返回将保证返回 True。

torch.distributed.isend(tensor, dst=None, group=None, tag=0, group_dst=None)[source][source]

异步发送张量。

警告

在请求完成之前修改 tensor 会导致未定义的行为。

警告

tag 不支持 NCCL 后端。

与阻塞的 send 不同,isend 允许 src == dst rank,即发送到自身。

参数:
  • 张量(Tensor)- 要发送的张量。

  • dst(整数)- 在全局进程组上的目标 rank(无论 group 参数)

  • group(进程组,可选)- 要工作的进程组。如果为 None,则使用默认进程组。

  • tag(整数,可选)- 标签以匹配发送与远程接收。

  • group_dst (int, 可选) – 目标秩在 group 上。同时指定 dstgroup_dst 是无效的

返回:

分布式请求对象。如果不是组的一部分则为 None

返回类型:

可选[工作]

torch.distributed.irecv(tensor, src=None, group=None, tag=0, group_src=None)[源代码][源代码]

异步接收张量。

警告

NCCL 后端不支持 tag

与 recv 不同,recv 是阻塞的,irecv 允许 src == dst rank,即从自身接收。

参数:
  • 张量(Tensor)- 用接收到的数据填充的张量。

  • 源(int,可选)- 全局进程组上的源排名(无论是否指定 group 参数)。如果未指定,将接收来自任何进程的数据。

  • 组(ProcessGroup,可选)- 要工作的进程组。如果为 None,将使用默认进程组。

  • 标签(int,可选)- 用于匹配接收与远程发送的标签

  • 目标_rank(int,可选)- 在 group 上的目标排名。不能同时指定 srcgroup_src

返回:

分布式请求对象。如果不是组的一部分则为空

返回类型:

可选[工作]

torch.distributed.send_object_list(object_list, dst=None, group=None, device=None, group_dst=None)[source][source]

以同步方式发送可序列化的对象列表。

send() 类似,但可以传递 Python 对象。注意, object_list 中的所有对象都必须是可序列化的,才能发送。

参数:
  • object_list (List[Any]) – 要发送的输入对象列表。每个对象都必须是可序列化的。接收方必须提供大小相等的列表。

  • dst (int) – 发送 object_list 的目标排名。目标排名基于全局进程组(不考虑 group 参数)

  • group (Optional[ProcessGroup]) – (ProcessGroup, 可选): 要工作的进程组。如果为 None,则使用默认进程组。默认为 None

  • device ( torch.device , 可选) – 如果不为 None,则对象将被序列化并转换为张量,在发送前移动到 device 。默认为 None

  • group_dst (int, 可选) – 在 group 上的目标排名。必须指定 dstgroup_dst 中的一个,但不能同时指定两个

返回:

None.

注意

对于基于 NCCL 的过程组,对象内部张量表示必须在通信之前移动到 GPU 设备上。在这种情况下,使用的设备由 torch.cuda.current_device() 指定,并且用户有责任确保这一点,以便每个 rank 都有一个单独的 GPU,通过 torch.cuda.set_device()

警告

send_object_list() 隐式使用 pickle 模块,已知此模块不安全。可以构造恶意的 pickle 数据,在反序列化过程中执行任意代码。仅在使用您信任的数据时调用此函数。

警告

使用 GPU 张量调用 send_object_list() 不受良好支持且效率低下,因为它会涉及 GPU 到 CPU 的传输,因为张量将被序列化。请考虑使用 send() 代替。

示例::
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes backend is not NCCL
>>> device = torch.device("cpu")
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 2.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>>     dist.send_object_list(objects, dst=1, device=device)
>>> else:
>>>     objects = [None, None, None]
>>>     dist.recv_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]
torch.distributed.recv_object_list(object_list, src=None, group=None, device=None, group_src=None)[source][source]

同步接收可序列化的对象。

recv() 类似,但可以接收 Python 对象。

参数:
  • object_list (List[Any]) – 要接收的对象列表。必须提供与发送列表大小相等的尺寸列表。

  • src (int, 可选) – 接收 object_list 的源 rank。源 rank 基于全局进程组(无论 group 参数如何)。如果设置为 None,则从任何 rank 接收。默认为 None

  • group (Optional[ProcessGroup]) – (ProcessGroup, optional): 要工作的进程组。如果为 None,则使用默认进程组。默认为 None

  • device ( torch.device , optional) – 如果不为 None,则在设备上接收。默认为 None

  • group_src (int, optional) – 在 group 上的目标排名。如果指定了 srcgroup_src ,则无效。

返回:

发送者排名。如果排名不是组的一部分,则为 -1。如果排名是组的一部分, object_list 将包含来自 src 排名的发送对象。

注意

对于基于 NCCL 的过程组,对象内部张量表示必须在通信之前移动到 GPU 设备上。在这种情况下,使用的设备由 torch.cuda.current_device() 指定,并且用户有责任确保这一点,以便每个 rank 都有一个单独的 GPU,通过 torch.cuda.set_device()

警告

recv_object_list() 隐式使用 pickle 模块,已知此模块不安全。可以构造恶意的 pickle 数据,在反序列化过程中执行任意代码。仅在使用您信任的数据时调用此函数。

警告

使用 GPU 张量调用 recv_object_list() 不受良好支持且效率低下,因为它会涉及 GPU 到 CPU 的传输,因为张量会被序列化。请考虑使用 recv()

示例::
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes backend is not NCCL
>>> device = torch.device("cpu")
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 2.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>>     dist.send_object_list(objects, dst=1, device=device)
>>> else:
>>>     objects = [None, None, None]
>>>     dist.recv_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]
torch.distributed.batch_isend_irecv(p2p_op_list)[source][source]

异步发送或接收一批张量并返回请求列表。

处理 p2p_op_list 中的每个操作并返回相应的请求。目前支持 NCCL、Gloo 和 UCC 后端。

参数:

p2p_op_list (list[torch.distributed.distributed_c10d.P2POp]) – 一个点对点操作列表(每个操作器的类型为 torch.distributed.P2POp )。列表中 isend/irecv 的顺序很重要,需要与远程端的相应 isend/irecv 匹配。

返回:

由调用 op_list 中相应操作返回的分布式请求对象列表。

返回类型:

list[torch.distributed.distributed_c10d.Work]

示例

>>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank
>>> recv_tensor = torch.randn(2, dtype=torch.float32)
>>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1) % world_size)
>>> recv_op = dist.P2POp(
...     dist.irecv, recv_tensor, (rank - 1 + world_size) % world_size
... )
>>> reqs = batch_isend_irecv([send_op, recv_op])
>>> for req in reqs:
>>>     req.wait()
>>> recv_tensor
tensor([2, 3])     # Rank 0
tensor([0, 1])     # Rank 1

注意

注意,当使用 NCCL PG 后端调用此 API 时,用户必须使用 torch.cuda.set_device 设置当前 GPU 设备,否则可能会导致意外的挂起问题。

此外,如果此 API 是传递给 dist.P2POpgroup 中的第一个集体调用,则 group 的所有等级必须参与此 API 调用;否则,行为未定义。如果此 API 调用不是 group 中的第一个集体调用,则允许仅涉及 group 子集等级的批处理 P2P 操作。

class torch.distributed.P2POp(op, tensor, peer=None, group=None, tag=0, group_peer=None)[source][source]

一个用于构建 batch_isend_irecv 点对点操作的类。

此类构建 P2P 操作类型、通信缓冲区、对等方等级、进程组和标签。此类实例将被传递给 batch_isend_irecv 以进行点对点通信。

参数:
  • op (Callable) – 一个用于向或从对等进程发送或接收数据的函数。 op 的类型为 torch.distributed.isendtorch.distributed.irecv 之一。

  • tensor (Tensor) – 要发送或接收的张量。

  • peer (int, optional) – 目标或源排名。

  • group (ProcessGroup, optional) – 要工作的进程组。如果为 None,则使用默认进程组。

  • 标签(int,可选)- 用于匹配发送与接收的标签。

  • group_peer(int,可选)- 目标或源排名。

同步和异步集体操作 ¶

每个集体操作函数都支持以下两种操作,这取决于传递给集体操作的 async_op 标志的设置:

同步操作 - 默认模式,当 async_op 设置为 False 时。当函数返回时,保证执行集体操作。在 CUDA 操作的情况下,不能保证 CUDA 操作完成,因为 CUDA 操作是异步的。对于 CPU 集体操作,任何进一步调用集体操作输出的函数都将按预期行为。对于 CUDA 集体操作,在同一 CUDA 流上使用输出的函数调用将按预期行为。用户必须注意在不同流下运行时的同步问题。有关 CUDA 语义(如流同步)的详细信息,请参阅 CUDA 语义。请参阅下面的脚本,以查看 CPU 和 CUDA 操作在这些语义上的差异示例。

异步操作 - 当 async_op 设置为 True 时。集体操作函数返回一个分布式请求对象。通常,您不需要手动创建它,并且它保证支持两种方法:

  • is_completed() - 在 CPU 集体操作的情况下,如果完成则返回 True 。在 CUDA 操作的情况下,如果操作已成功入队到 CUDA 流中,并且输出可以在默认流上使用而无需进一步同步,则返回 True

  • wait() - 在 CPU 集体操作的情况下,将阻塞进程直到操作完成。在 CUDA 集体操作的情况下,将阻塞当前活动的 CUDA 流直到操作完成(但不会阻塞 CPU)。

  • get_future() - 返回 torch._C.Future 对象。支持 NCCL,也支持 GLOO 和 MPI 上的大多数操作,但除了对等操作外。注意:随着我们继续采用 Futures 和合并 API, get_future() 调用可能变得不再必要。

示例

以下代码可以作为使用分布式集体操作时 CUDA 操作的语义参考。它显示了在使用不同 CUDA 流上的集体输出时显式同步的必要性:

# Code runs on each rank.
dist.init_process_group("nccl", rank=rank, world_size=2)
output = torch.tensor([rank]).cuda(rank)
s = torch.cuda.Stream()
handle = dist.all_reduce(output, async_op=True)
# Wait ensures the operation is enqueued, but not necessarily complete.
handle.wait()
# Using result on non-default stream.
with torch.cuda.stream(s):
    s.wait_stream(torch.cuda.default_stream())
    output.add_(100)
if rank == 0:
    # if the explicit call to wait_stream was omitted, the output below will be
    # non-deterministically 1 or 101, depending on whether the allreduce overwrote
    # the value after the add completed.
    print(output)

集合函数 ¶

torch.distributed.broadcast(tensor, src=None, group=None, async_op=False, group_src=None)[source][source]

将张量广播到整个组。

tensor 所有参与集体操作的所有进程中必须具有相同数量的元素。

参数:
  • 张量(Tensor)- 如果 src 是当前进程的秩,则发送的数据,否则用于保存接收数据的张量。

  • src(整数)- 在全局进程组上的源秩(无论 group 参数如何)。

  • group(ProcessGroup,可选)- 要工作的进程组。如果为 None,则使用默认进程组。

  • async_op(布尔值,可选)- 此操作是否应为异步操作。

  • group_src(int)- 在 group 上的源排名。必须指定 group_srcsrc 中的一个,但不能同时指定两个。

返回:

异步工作处理,如果 async_op 设置为 True。如果没有 async_op 或者不是组的一部分,则为 None。

torch.distributed.broadcast_object_list(object_list, src=None, group=None, device=None, group_src=None)[source][source]

object_list 中的可序列化对象广播到整个组。

broadcast() 相似,但可以传入 Python 对象。注意, object_list 中的所有对象都必须是可序列化的,以便进行广播。

参数:
  • object_list (List[Any]) – 要广播的输入对象列表。每个对象都必须是可序列化的。只有 src 排名上的对象将被广播,但每个排名必须提供大小相等的列表。

  • src (int) – 要从哪个源排名广播 object_list 。源排名基于全局进程组(不考虑 group 参数)。

  • group (Optional[ProcessGroup]) – (ProcessGroup, 可选):要工作的进程组。如果为 None,则使用默认进程组。默认为 None

  • 设备( torch.device ,可选)- 如果不为空,则将对象序列化并转换为张量,然后移动到 device 进行广播。默认为 None

  • group_src(int)- group 上的源秩。不得同时指定 group_srcsrc 中的一个,但不能同时都不指定。

返回:

None 。如果秩是组的一部分, object_list 将包含来自 src 秩的广播对象。

注意

对于基于 NCCL 的进程组,在通信之前必须将对象的内部张量表示移动到 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,并且用户有责任确保这样设置,以便每个秩都有一个单独的 GPU,通过 torch.cuda.set_device()

注意

注意,此 API 与 broadcast() 集体略有不同,因为它不提供 async_op 句柄,因此将是一个阻塞调用。

警告

broadcast_object_list() 隐式使用 pickle 模块,已知此模块不安全。可以构造恶意 pickle 数据,在反序列化时执行任意代码。请仅使用您信任的数据调用此函数。

警告

使用 GPU 张量调用 broadcast_object_list() 不支持且效率低下,因为它会涉及 GPU 到 CPU 的传输,因为张量将被序列化。请考虑使用 broadcast() 代替。

示例::
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 3.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>> else:
>>>     objects = [None, None, None]
>>> # Assumes backend is not NCCL
>>> device = torch.device("cpu")
>>> dist.broadcast_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]
torch.distributed.all_reduce(tensor, op=, group=None, async_op=False)[source][source]

在所有机器上以相同的方式减少张量数据,使所有机器都获得最终结果。

调用 tensor 后,在所有进程中将进行位运算相同。

支持复杂张量。

参数:
  • 张量(Tensor)- 集体的输入和输出。该函数在原地操作。

  • op(可选)- torch.distributed.ReduceOp 枚举中的值之一。指定用于逐元素减少的操作。

  • group(ProcessGroup,可选)- 要工作的进程组。如果为 None,则使用默认进程组。

  • async_op(bool,可选)- 此操作是否应为异步操作

返回:

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不是组的一部分,则为 None

示例

>>> # All tensors below are of torch.int64 type.
>>> # We have 2 process groups, 2 ranks.
>>> device = torch.device(f"cuda:{rank}")
>>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4, 6], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1
>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 process groups, 2 ranks.
>>> tensor = torch.tensor(
...     [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device
... ) + 2 * rank * (1 + 1j)
>>> tensor
tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0
tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1
torch.distributed.reduce(tensor, dst=None, op=<RedOpType.SUM: 0>, group=None, async_op=False, group_dst=None)[source][source]

在所有机器上对张量数据进行聚合。

只有进程编号为 dst 的进程将接收最终结果。

参数:
  • tensor (Tensor) – 集合的输入和输出。该函数在原地操作。

  • dst (int) – 全局进程组上的目标排名(无论是否有 group 参数)

  • op (可选) – torch.distributed.ReduceOp 枚举值之一。指定用于逐元素减少的操作。

  • group (ProcessGroup,可选) – 要工作的进程组。如果为 None,则使用默认进程组。

  • async_op (bool,可选) – 此操作是否应为异步操作

  • group_dst (int) – 目标排名在 group 上。必须指定 group_dstdst 中的一个,但不能同时指定两个。

返回:

异步工作处理,如果 async_op 设置为 True。如果没有 async_op 或不属于该组,则为 None。

torch.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[source][source]

将整个组中的张量收集到一个列表中。

支持复杂且尺寸不均匀的张量。

参数:
  • tensor_list(列表[Tensor])- 输出列表。它应包含正确尺寸的张量,用于集体输出。支持尺寸不均匀的张量。

  • tensor(Tensor)- 要从当前进程广播的张量。

  • group(进程组,可选)- 要工作的进程组。如果为 None,则使用默认进程组。

  • async_op (bool, 可选) – 是否将此操作设置为异步操作

返回:

如果 async_op 设置为 True,则处理异步工作。如果没有 async_op 或不属于该组,则为 None

示例

>>> # All tensors below are of torch.int64 dtype.
>>> # We have 2 process groups, 2 ranks.
>>> device = torch.device(f"cuda:{rank}")
>>> tensor_list = [
...     torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2)
... ]
>>> tensor_list
[tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0
[tensor([0, 0], device='cuda:1'), tensor([0, 0], device='cuda:1')] # Rank 1
>>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0
[tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1
>>> # All tensors below are of torch.cfloat dtype.
>>> # We have 2 process groups, 2 ranks.
>>> tensor_list = [
...     torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2)
... ]
>>> tensor_list
[tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0
[tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1
>>> tensor = torch.tensor(
...     [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device
... ) + 2 * rank * (1 + 1j)
>>> tensor
tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0
[tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1
torch.distributed.all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False)[source][source]

从所有排名收集张量并将它们放入单个输出张量中。

此函数要求每个进程上的张量大小必须相同。

参数:
  • output_tensor(张量)- 用于容纳来自所有进程的张量元素的输出张量。它必须具有以下形式之一:(i)所有输入张量沿主维度的连接;有关“连接”的定义,请参阅 torch.cat() ;(ii)所有输入张量沿主维度的堆叠;有关“堆叠”的定义,请参阅 torch.stack() 。以下示例可能更好地解释所支持的输出形式。

  • input_tensor(张量)- 从当前进程收集的张量。与 all_gather API 不同,此 API 中的输入张量必须在所有进程中具有相同的大小。

  • group(进程组,可选)- 要工作的进程组。如果为 None,则使用默认进程组。

  • async_op (布尔值,可选) – 是否将此操作设置为异步操作

返回:

如果 async_op 设置为 True,则处理异步工作。如果没有 async_op 或不属于该组,则为 None

示例

>>> # All tensors below are of torch.int64 dtype and on CUDA devices.
>>> # We have two ranks.
>>> device = torch.device(f"cuda:{rank}")
>>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor_in
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> # Output in concatenation form
>>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device)
>>> dist.all_gather_into_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([1, 2, 3, 4], device='cuda:0') # Rank 0
tensor([1, 2, 3, 4], device='cuda:1') # Rank 1
>>> # Output in stack form
>>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device)
>>> dist.all_gather_into_tensor(tensor_out2, tensor_in)
>>> tensor_out2
tensor([[1, 2],
        [3, 4]], device='cuda:0') # Rank 0
tensor([[1, 2],
        [3, 4]], device='cuda:1') # Rank 1

警告

Gloo 后端不支持此 API。

torch.distributed.all_gather_object(object_list, obj, group=None)[source][source]

从整个组中收集可序列化的对象到一个列表中。

all_gather() 类似,但可以传入 Python 对象。注意,对象必须是可序列化的才能被收集。

参数:
  • object_list (列表[任意]) – 输出列表。它应该正确地调整大小以适应该集体的大小,并将包含输出。

  • obj (任意) – 从当前进程广播的可序列化 Python 对象。

  • group(进程组,可选)- 要工作的进程组。如果为 None,则使用默认进程组。默认为 None

返回:

如果调用方进程属于此组,则集体操作的输出将被填充到输入 object_list 。如果调用方进程不属于该组,则传入的 object_list 将保持不变。

注意

注意,此 API 与 all_gather() 集体操作略有不同,因为它不提供 async_op 句柄,因此将是一个阻塞调用。

注意

对于基于 NCCL 的进程组,在通信之前必须将对象的内部张量表示移动到 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,并且用户有责任确保这样设置,以便每个进程都有一个单独的 GPU,通过 torch.cuda.set_device()

警告

all_gather_object() 隐式使用 pickle 模块,已知存在安全风险。可能构造恶意 pickle 数据,在反序列化时执行任意代码。请仅使用您信任的数据调用此函数。

警告

使用 all_gather_object() 与 GPU 张量不兼容,效率低下,因为它会引发 GPU -> CPU 的数据传输,因为张量将被序列化。请考虑使用 all_gather() 代替。

示例::
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes world_size of 3.
>>> gather_objects = ["foo", 12, {1: 2}] # any picklable object
>>> output = [None for _ in gather_objects]
>>> dist.all_gather_object(output, gather_objects[dist.get_rank()])
>>> output
['foo', 12, {1: 2}]
torch.distributed.gather(tensor, gather_list=None, dst=None, group=None, async_op=False, group_dst=None)[source][source]

在单个进程中收集张量列表。

此函数要求每个进程上的张量大小必须相同。

参数:
  • 张量(Tensor)- 输入张量。

  • gather_list(可选列表[Tensor])- 用于收集数据的适当大小、相同大小的张量列表(默认为 None,必须在目标排名上指定)

  • dst(可选 int)- 全局进程组上的目标排名(无论 group 参数)。(如果 dstgroup_dst 都为 None,则默认为全局排名 0)

  • group(进程组,可选)- 要工作的进程组。如果为 None,则使用默认进程组。

  • async_op(布尔值,可选)- 此操作是否应为异步操作

  • group_dst(整数,可选)- 目标 rank 在 group 上。同时指定 dstgroup_dst 无效

返回:

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不是组的一部分,则为 None

注意

注意,gather_list 中的所有张量必须具有相同的大小。

示例::
>>> # We have 2 process groups, 2 ranks.
>>> tensor_size = 2
>>> device = torch.device(f'cuda:{rank}')
>>> tensor = torch.ones(tensor_size, device=device) + rank
>>> if dist.get_rank() == 0:
>>>     gather_list = [torch.zeros_like(tensor, device=device) for i in range(2)]
>>> else:
>>>     gather_list = None
>>> dist.gather(tensor, gather_list, dst=0)
>>> # Rank 0 gets gathered data.
>>> gather_list
[tensor([1., 1.], device='cuda:0'), tensor([2., 2.], device='cuda:0')] # Rank 0
None                                                                   # Rank 1
torch.distributed.gather_object(obj, object_gather_list=None, dst=None, group=None, group_dst=None)[source][source]

从整个组中收集可序列化的对象到单个进程中。

gather() 类似,但可以传递 Python 对象。请注意,对象必须可序列化才能被收集。

参数:
  • obj (Any) – 输入对象。必须是可序列化的。

  • object_gather_list (list[Any]) – 输出列表。在 dst 排序上,它应该与该集体的大小正确匹配,并将包含输出。在非目标节点上必须是 None 。(默认为 None

  • dst (int, optional) – 全局进程组上的目标节点(无论 group 参数)。(如果 dstgroup_dst 都为 None,则默认为全局节点 0)

  • group (Optional[ProcessGroup]) – (进程组,可选):要工作的进程组。如果为 None,则使用默认进程组。默认为 None

  • group_dst (int, 可选) – 目标秩在 group 上。同时指定 dstgroup_dst 是无效的。

返回:

无。在 dst 秩上, object_gather_list 将包含集体的输出。

注意

注意,此 API 与 gather 集体略有不同,因为它不提供 async_op 处理句柄,因此将是一个阻塞调用。

注意

对于基于 NCCL 的处理组,在通信之前,对象的内部张量表示必须移动到 GPU 设备上。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,并且确保每个秩有一个单独的 GPU 是用户的责任,通过 torch.cuda.set_device() 实现。

警告

gather_object() 隐式使用 pickle 模块,已知存在安全风险。可能构造恶意 pickle 数据,在反序列化时执行任意代码。请仅使用您信任的数据调用此函数。

警告

使用 gather_object() 与 GPU 张量不兼容,效率低下,因为它会引发 GPU -> CPU 的数据传输,因为张量将被序列化。请考虑使用 gather() 代替。

示例::
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes world_size of 3.
>>> gather_objects = ["foo", 12, {1: 2}] # any picklable object
>>> output = [None for _ in gather_objects]
>>> dist.gather_object(
...     gather_objects[dist.get_rank()],
...     output if dist.get_rank() == 0 else None,
...     dst=0
... )
>>> # On rank 0
>>> output
['foo', 12, {1: 2}]
torch.distributed.scatter(tensor, scatter_list=None, src=None, group=None, async_op=False, group_src=None)[source][source]

将张量列表分散到组中的所有进程中。

每个进程将恰好接收一个张量,并将数据存储在 tensor 参数中。

支持复杂张量。

参数:
  • 张量(Tensor)- 输出张量。

  • scatter_list(列表[Tensor])- 要分散的张量列表(默认为 None,必须在源 rank 上指定)。

  • src(int)- 全局进程组上的源排名(无论是否有 group 参数)。(如果 srcgroup_src 都为 None,则默认为全局排名 0)

  • group(ProcessGroup,可选)- 要工作的进程组。如果为 None,则使用默认进程组。

  • async_op(bool,可选)- 此操作是否应为异步操作

  • group_src(int,可选)- 在 group 上的源排名。不能同时指定 srcgroup_src

返回:

异步工作处理,如果 async_op 设置为 True。如果没有 async_op 或者不属于该组,则为 None

注意

注意 scatter_list 中的所有张量必须具有相同的大小。

示例::
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> tensor_size = 2
>>> device = torch.device(f'cuda:{rank}')
>>> output_tensor = torch.zeros(tensor_size, device=device)
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 2.
>>>     # Only tensors, all of which must be the same size.
>>>     t_ones = torch.ones(tensor_size, device=device)
>>>     t_fives = torch.ones(tensor_size, device=device) * 5
>>>     scatter_list = [t_ones, t_fives]
>>> else:
>>>     scatter_list = None
>>> dist.scatter(output_tensor, scatter_list, src=0)
>>> # Rank i gets scatter_list[i].
>>> output_tensor
tensor([1., 1.], device='cuda:0') # Rank 0
tensor([5., 5.], device='cuda:1') # Rank 1
torch.distributed.scatter_object_list(scatter_object_output_list, scatter_object_input_list=None, src=None, group=None, group_src=None)[source][source]

将可序列化的对象在 scatter_object_input_list 中分散到整个组。

scatter() 类似,但可以传入 Python 对象。在每个 rank 上,分散的对象将存储在 scatter_object_output_list 的第一个元素中。请注意, scatter_object_input_list 中的所有对象都必须是可序列化的,以便进行分散。

参数:
  • scatter_object_output_list(Any 类型的列表)- 非空列表,其第一个元素将存储分散到该 rank 的对象。

  • scatter_object_input_list(Any 类型的列表,可选)- 要分散的输入对象列表。每个对象都必须是可序列化的。只有 src rank 上的对象将被分散,对于非 src rank,参数可以是 None

  • src(整数)- 分散 scatter_object_input_list 的源 rank。源 rank 基于全局进程组(不考虑 group 参数)。(如果 srcgroup_src 都为 None,则默认为全局 rank 0)

  • group (Optional[ProcessGroup]) – (ProcessGroup, optional): 要工作的进程组。如果为 None,则使用默认进程组。默认为 None

  • group_src (int, optional) – 在 group 上的源排名。无效同时指定 srcgroup_src

返回:

None 。如果排名属于该组, scatter_object_output_list 将将其第一个元素设置为该排名的散列对象。

注意

注意,此 API 与 scatter 集合体略有不同,因为它不提供 async_op 处理器,因此将是一个阻塞调用。

警告

scatter_object_list() 隐式使用 pickle 模块,已知存在安全风险。可能构造恶意 pickle 数据,在反序列化时执行任意代码。请仅使用您信任的数据调用此函数。

警告

使用 scatter_object_list() 与 GPU 张量不兼容,效率低下,因为它会引发 GPU -> CPU 的数据传输,因为张量将被序列化。请考虑使用 scatter() 代替。

示例::
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 3.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>> else:
>>>     # Can be any list on non-src ranks, elements are not used.
>>>     objects = [None, None, None]
>>> output_list = [None]
>>> dist.scatter_object_list(output_list, objects, src=0)
>>> # Rank i gets objects[i]. For example, on rank 2:
>>> output_list
[{1: 2}]
torch.distributed.reduce_scatter(output, input_list, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source][source]

将张量列表聚合后分散到组中的所有进程。

参数:
  • 输出(张量)- 输出张量。

  • 输入列表(list[Tensor])- 要进行归约和散列的张量列表。

  • op(可选)- 来自 torch.distributed.ReduceOp 枚举值之一。指定用于逐元素归约的操作。

  • group(ProcessGroup,可选)- 要工作的进程组。如果为 None,则使用默认进程组。

  • async_op (布尔值,可选) – 是否将此操作设置为异步操作。

返回:

如果 async_op 设置为 True,则处理异步工作。如果不是 async_op 或不属于该组,则为 None。

torch.distributed.reduce_scatter_tensor(output, input, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source][source]

将张量减少后分散到组内所有进程。

参数:
  • 输出(张量)- 输出张量。它应该在所有进程中具有相同的大小。

  • 输入(张量)- 需要减少和分散的输入张量。其大小应为输出张量大小乘以全局大小。输入张量可以具有以下形状之一:(i)输出张量在主维度上的连接,或(ii)输出张量在主维度上的堆叠。有关“连接”的定义,请参阅 torch.cat() 。有关“堆叠”的定义,请参阅 torch.stack()

  • 组(ProcessGroup,可选)- 要工作的进程组。如果为 None,则使用默认进程组。

  • async_op (布尔值,可选) – 是否将此操作设置为异步操作。

返回:

如果 async_op 设置为 True,则处理异步工作。如果不是 async_op 或不属于该组,则为 None。

示例

>>> # All tensors below are of torch.int64 dtype and on CUDA devices.
>>> # We have two ranks.
>>> device = torch.device(f"cuda:{rank}")
>>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device)
>>> # Input in concatenation form
>>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device)
>>> tensor_in
tensor([0, 1, 2, 3], device='cuda:0') # Rank 0
tensor([0, 1, 2, 3], device='cuda:1') # Rank 1
>>> dist.reduce_scatter_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([0, 2], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1
>>> # Input in stack form
>>> tensor_in = torch.reshape(tensor_in, (world_size, 2))
>>> tensor_in
tensor([[0, 1],
        [2, 3]], device='cuda:0') # Rank 0
tensor([[0, 1],
        [2, 3]], device='cuda:1') # Rank 1
>>> dist.reduce_scatter_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([0, 2], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1

警告

Gloo 后端不支持此 API。

torch.distributed.all_to_all_single(output, input, output_split_sizes=None, input_split_sizes=None, group=None, async_op=False)[source][source]

将输入张量分割,然后将分割后的列表分散到组中的所有进程中。

之后,从组中的所有进程中拼接接收到的张量,并返回单个输出张量。

支持复杂张量。

参数:
  • 输出(张量)- 收集拼接的输出张量。

  • 输入(张量)- 要散列的输入张量。

  • output_split_sizes - (可选的 Int 列表):如果指定,则为 dim 0 的输出分割大小,如果没有指定或为空,则 output 张量的 dim 0 必须能被 world_size 整除。

  • input_split_sizes - (可选的 Int 列表):如果指定,则为 dim 0 的输入分割大小,如果没有指定或为空,则 input 张量的 dim 0 必须能被 world_size 整除。

  • group(进程组,可选)- 要工作的进程组。如果没有指定,将使用默认进程组。

  • async_op (布尔值,可选) – 是否将此操作设置为异步操作。

返回:

如果 async_op 设置为 True,则处理异步工作。如果不是 async_op 或不属于该组,则为 None。

警告

all_to_all_single 是实验性的,可能会更改。

示例

>>> input = torch.arange(4) + rank * 4
>>> input
tensor([0, 1, 2, 3])     # Rank 0
tensor([4, 5, 6, 7])     # Rank 1
tensor([8, 9, 10, 11])   # Rank 2
tensor([12, 13, 14, 15]) # Rank 3
>>> output = torch.empty([4], dtype=torch.int64)
>>> dist.all_to_all_single(output, input)
>>> output
tensor([0, 4, 8, 12])    # Rank 0
tensor([1, 5, 9, 13])    # Rank 1
tensor([2, 6, 10, 14])   # Rank 2
tensor([3, 7, 11, 15])   # Rank 3
>>> # Essentially, it is similar to following operation:
>>> scatter_list = list(input.chunk(world_size))
>>> gather_list = list(output.chunk(world_size))
>>> for i in range(world_size):
>>>     dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i)
>>> # Another example with uneven split
>>> input
tensor([0, 1, 2, 3, 4, 5])                                       # Rank 0
tensor([10, 11, 12, 13, 14, 15, 16, 17, 18])                     # Rank 1
tensor([20, 21, 22, 23, 24])                                     # Rank 2
tensor([30, 31, 32, 33, 34, 35, 36])                             # Rank 3
>>> input_splits
[2, 2, 1, 1]                                                     # Rank 0
[3, 2, 2, 2]                                                     # Rank 1
[2, 1, 1, 1]                                                     # Rank 2
[2, 2, 2, 1]                                                     # Rank 3
>>> output_splits
[2, 3, 2, 2]                                                     # Rank 0
[2, 2, 1, 2]                                                     # Rank 1
[1, 2, 1, 2]                                                     # Rank 2
[1, 2, 1, 1]                                                     # Rank 3
>>> output = ...
>>> dist.all_to_all_single(output, input, output_splits, input_splits)
>>> output
tensor([ 0,  1, 10, 11, 12, 20, 21, 30, 31])                     # Rank 0
tensor([ 2,  3, 13, 14, 22, 32, 33])                             # Rank 1
tensor([ 4, 15, 16, 23, 34, 35])                                 # Rank 2
tensor([ 5, 17, 18, 24, 36])                                     # Rank 3
>>> # Another example with tensors of torch.cfloat type.
>>> input = torch.tensor(
...     [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j], dtype=torch.cfloat
... ) + 4 * rank * (1 + 1j)
>>> input
tensor([1+1j, 2+2j, 3+3j, 4+4j])                                # Rank 0
tensor([5+5j, 6+6j, 7+7j, 8+8j])                                # Rank 1
tensor([9+9j, 10+10j, 11+11j, 12+12j])                          # Rank 2
tensor([13+13j, 14+14j, 15+15j, 16+16j])                        # Rank 3
>>> output = torch.empty([4], dtype=torch.int64)
>>> dist.all_to_all_single(output, input)
>>> output
tensor([1+1j, 5+5j, 9+9j, 13+13j])                              # Rank 0
tensor([2+2j, 6+6j, 10+10j, 14+14j])                            # Rank 1
tensor([3+3j, 7+7j, 11+11j, 15+15j])                            # Rank 2
tensor([4+4j, 8+8j, 12+12j, 16+16j])                            # Rank 3
torch.distributed.all_to_all(output_tensor_list, input_tensor_list, group=None, async_op=False)[source][source]

将输入张量列表分散到组中的所有进程中,并返回输出列表中收集的张量列表。

支持复杂张量。

参数:
  • output_tensor_list(列表[Tensor])- 每个 rank 收集一个张量的张量列表。

  • input_tensor_list(列表[Tensor])- 每个 rank 分散一个张量的张量列表。

  • group(进程组,可选)- 要工作的进程组。如果为 None,则使用默认进程组。

  • async_op(布尔值,可选)- 此操作是否应为异步操作。

返回:

如果 async_op 设置为 True,则为异步工作处理。如果不是 async_op 或不是组的一部分,则为 None。

警告

all_to_all 是实验性的,可能会更改。

示例

>>> input = torch.arange(4) + rank * 4
>>> input = list(input.chunk(4))
>>> input
[tensor([0]), tensor([1]), tensor([2]), tensor([3])]     # Rank 0
[tensor([4]), tensor([5]), tensor([6]), tensor([7])]     # Rank 1
[tensor([8]), tensor([9]), tensor([10]), tensor([11])]   # Rank 2
[tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3
>>> output = list(torch.empty([4], dtype=torch.int64).chunk(4))
>>> dist.all_to_all(output, input)
>>> output
[tensor([0]), tensor([4]), tensor([8]), tensor([12])]    # Rank 0
[tensor([1]), tensor([5]), tensor([9]), tensor([13])]    # Rank 1
[tensor([2]), tensor([6]), tensor([10]), tensor([14])]   # Rank 2
[tensor([3]), tensor([7]), tensor([11]), tensor([15])]   # Rank 3
>>> # Essentially, it is similar to following operation:
>>> scatter_list = input
>>> gather_list = output
>>> for i in range(world_size):
>>>     dist.scatter(gather_list[i], scatter_list if i == rank else [], src=i)
>>> input
tensor([0, 1, 2, 3, 4, 5])                                       # Rank 0
tensor([10, 11, 12, 13, 14, 15, 16, 17, 18])                     # Rank 1
tensor([20, 21, 22, 23, 24])                                     # Rank 2
tensor([30, 31, 32, 33, 34, 35, 36])                             # Rank 3
>>> input_splits
[2, 2, 1, 1]                                                     # Rank 0
[3, 2, 2, 2]                                                     # Rank 1
[2, 1, 1, 1]                                                     # Rank 2
[2, 2, 2, 1]                                                     # Rank 3
>>> output_splits
[2, 3, 2, 2]                                                     # Rank 0
[2, 2, 1, 2]                                                     # Rank 1
[1, 2, 1, 2]                                                     # Rank 2
[1, 2, 1, 1]                                                     # Rank 3
>>> input = list(input.split(input_splits))
>>> input
[tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])]                   # Rank 0
[tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1
[tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])]                 # Rank 2
[tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])]         # Rank 3
>>> output = ...
>>> dist.all_to_all(output, input)
>>> output
[tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])]   # Rank 0
[tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])]           # Rank 1
[tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])]              # Rank 2
[tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])]                  # Rank 3
>>> # Another example with tensors of torch.cfloat type.
>>> input = torch.tensor(
...     [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j], dtype=torch.cfloat
... ) + 4 * rank * (1 + 1j)
>>> input = list(input.chunk(4))
>>> input
[tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])]            # Rank 0
[tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])]            # Rank 1
[tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])]      # Rank 2
[tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])]    # Rank 3
>>> output = list(torch.empty([4], dtype=torch.int64).chunk(4))
>>> dist.all_to_all(output, input)
>>> output
[tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])]          # Rank 0
[tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])]        # Rank 1
[tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])]        # Rank 2
[tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])]        # Rank 3
torch.distributed.barrier(group=None, async_op=False, device_ids=None)[source][source]

同步所有进程。

如果 async_op 为 False,或者如果在 wait()上调用异步工作句柄,则此集体操作将阻塞进程,直到整个组进入此函数。

参数:
  • group (进程组,可选) – 要工作的进程组。如果为 None,则使用默认进程组。

  • async_op (bool, 可选) – 是否将此操作设置为异步操作

  • device_ids ([int], 可选) – 设备/GPU ID 列表。

返回:

如果 async_op 设置为 True,则为异步工作句柄。如果没有 async_op 或不属于该组,则为 None

注意

ProcessGroupNCCL 现在会阻塞 CPU 线程,直到屏障集体操作完成。

torch.distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False)[source][source]

同步进程类似于 torch.distributed.barrier ,但考虑可配置的超时时间。

能够报告在提供超时时间内未通过此屏障的进程编号。具体来说,对于非零编号,将阻塞直到从编号 0 的进程发送/接收操作完成。编号 0 将阻塞直到其他所有编号的发送/接收操作完成,并将报告未及时响应的编号。请注意,如果某个编号没有达到 monitored_barrier(例如由于挂起),则所有其他编号都会在 monitored_barrier 中失败。

此集体操作将阻塞组中的所有进程/编号,直到整个组成功退出函数,这使得它在调试和同步方面非常有用。然而,它可能会影响性能,并且仅应用于调试或需要主机端完全同步点的场景。为了调试目的,可以在应用程序的集体调用之前插入此屏障,以检查是否有编号不同步。

注意

注意,此集合仅支持使用 GLOO 后端。

参数:
  • group(进程组,可选)- 要工作的进程组。如果为 None ,则使用默认进程组。

  • timeout(datetime.timedelta,可选)- 监控屏障的超时时间。如果为 None ,则使用默认进程组超时时间。

  • wait_all_ranks(bool,可选)- 是否收集所有失败的进程。默认情况下,这是 Falsemonitored_barrier 在 rank 0 上将抛出第一个遇到的失败进程以快速失败。通过设置 wait_all_ranks=True monitored_barrier 将收集所有失败的进程并抛出一个包含所有失败进程信息的错误。

返回:

None.

示例::
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> if dist.get_rank() != 1:
>>>     dist.monitored_barrier() # Raises exception indicating that
>>> # rank 1 did not call into monitored_barrier.
>>> # Example with wait_all_ranks=True
>>> if dist.get_rank() == 0:
>>>     dist.monitored_barrier(wait_all_ranks=True) # Raises exception
>>> # indicating that ranks 1, 2, ... world_size - 1 did not call into
>>> # monitored_barrier.
class torch.distributed.Work

一个 Work 对象代表 PyTorch 分布式包中挂起的异步操作的句柄。它由非阻塞的集体操作返回,例如 dist.all_reduce(tensor, async_op=True)。

boxed(self: torch._C._distributed_c10d.Work) object
exception(self: torch._C._distributed_c10d.Work) std::__exception_ptr::exception_ptr
get_future(self:torch._C._distributed_c10d.Work) → torch.Future
返回:

一个与 Work 完成相关的 torch.futures.Future 对象。例如,可以通过 fut = process_group.allreduce(tensors).get_future() 获取 future 对象。

示例::

下面是一个使用 get_future` API to retrieve a Future associated with the completion of ``allreduce 的简单 allreduce DDP 通信钩子的示例。

>>> def allreduce(process_group: dist.ProcessGroup, bucket: dist.GradBucket): -> torch.futures.Future
>>>     group_to_use = process_group if process_group is not None else torch.distributed.group.WORLD
>>>     tensor = bucket.buffer().div_(group_to_use.size())
>>>     return torch.distributed.all_reduce(tensor, group=group_to_use, async_op=True).get_future()
>>> ddp_model.register_comm_hook(state=None, hook=allreduce)

警告

get_future API 支持 NCCL,部分支持 GLOO 和 MPI 后端(不支持像 send/recv 这样的对等操作)并将返回一个 torch.futures.Future

在上面的示例中, allreduce 将使用 NCCL 后端在 GPU 上执行工作, fut.wait() 将在将适当的 NCCL 流与 PyTorch 当前设备流同步后返回,以确保我们可以进行异步 CUDA 执行,并且它不会等待整个操作在 GPU 上完成。请注意, CUDAFuture 不支持 TORCH_NCCL_BLOCKING_WAIT 标志或 NCCL 的 barrier() 。此外,如果通过 fut.then() 添加了回调函数,它将等待 WorkNCCL 的 NCCL 流与 ProcessGroupNCCL 的专用回调流同步,并在回调流上运行回调后立即调用回调。 fut.then() 将返回另一个 CUDAFuture ,该 CUDAFuture 包含回调的返回值和一个记录回调流的 CUDAEvent

  1. 对于 CPU 工作, fut.done() 当工作完成且 value() 张量准备就绪时返回 true。

  2. 对于 GPU 工作, fut.done() 只有在操作已入队时才返回 true。

  3. 对于混合 CPU-GPU 工作(例如,使用 GLOO 发送 GPU 张量), fut.done() 当张量到达各自的节点时返回 true,但还不一定在各自的 GPU 上同步(类似于 GPU 工作)。

get_future_result(self:torch._C._distributed_c10d.Work) → torch.Future
返回:

A torch.futures.Future object of int type which maps to the enum type of WorkResult. As an example, a future object can be retrieved by fut = process_group.allreduce(tensor).get_future_result() .

示例::

用户可以使用 fut.wait() 阻塞等待工作完成,并通过 fut.value() 获取 WorkResult。此外,用户还可以使用 fut.then(call_back_func) 注册一个回调函数,在工作完成时调用,而不会阻塞当前线程。

警告

get_future_result API 支持 NCCL

is_completed(self: torch._C._distributed_c10d.Work) bool
is_success(self: torch._C._distributed_c10d.Work) bool
result(self: torch._C._distributed_c10d.Work) list[torch.Tensor]
source_rank(self: torch._C._distributed_c10d.Work) int
synchronize(self:torch._C._distributed_c10d.Work) → None
静态解包(arg0object) → torch._C._distributed_c10d.Work
wait(self:torch._C._distributed_c10d.Work, timeoutdatetime.timedelta=datetime.timedelta(0)) → bool
返回:

true/false.

示例::
尝试:

work.wait(timeout)

except:

# 处理中

警告

在正常情况下,用户无需设置超时。调用 wait() 与调用 synchronize() 相同:让当前流在 NCCL 工作完成时阻塞。然而,如果设置了超时,它将阻塞 CPU 线程,直到 NCCL 工作完成或超时。如果超时,将抛出异常。

class torch.distributed.ReduceOp

用于可用归约操作的枚举类: SUMPRODUCTMINMAXBANDBORBXOR ,以及 PREMUL_SUM

使用 NCCL 后端时, BANDBORBXOR 的缩减不可用。

AVG 在求和前将值除以世界大小。 AVG 仅在 NCCL 后端可用,并且仅适用于 NCCL 2.10 或更高版本。

PREMUL_SUM 在本地将输入乘以给定的标量后再进行缩减。 PREMUL_SUM 仅在 NCCL 后端可用,并且仅适用于 NCCL 2.11 或更高版本。用户应使用 torch.distributed._make_nccl_premul_sum

此外, MAXMINPRODUCT 不支持复数张量。

这个类的值可以作为属性访问,例如, ReduceOp.SUM 。它们用于指定减少集体策略,例如, reduce()

此类不支持 __members__ 属性。

torch.distributed.reduce_op 类

已废弃的枚举类,用于减少操作: SUMPRODUCTMIN ,和 MAX

建议使用 ReduceOp 代替。

分布式键值存储 ¶

分布式包自带分布式键值存储,可用于在组内进程间共享信息,以及初始化分布式包在 torch.distributed.init_process_group() (通过显式创建存储作为指定 init_method 的替代。)键值存储有 3 种选择: TCPStoreFileStore ,和 HashStore

class torch.distributed.Store

所有存储实现的基类,例如 PyTorch 分布式提供的 3 个:( TCPStoreFileStoreHashStore )。

__init__(self:torch._C._distributed_c10d.Store) → None
add(self:torch._C._distributed_c10d.Store, arg0str, arg1int) → int

对于给定的 key ,第一次调用 add 将在存储中创建与 key 关联的计数器,初始化为 amount 。随后的调用 add 与相同的 key 将计数器增加指定的 amount 。如果使用已通过 set() 在存储中设置的键调用 add() ,将导致异常。

参数:
  • key (str) – 存储中的键,其计数将被增加。

  • amount (int) – 增加计数器的数量。

示例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.add("first_key", 1)
>>> store.add("first_key", 6)
>>> # Should return 7
>>> store.get("first_key")
append(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None

将键值对根据提供的 keyvalue 追加到存储中。如果 key 不存在于存储中,则将其创建。

参数:
  • key (str) – 要附加到存储中的键。

  • value (str) – 与 key 关联的值,要添加到存储中。

示例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.append("first_key", "po")
>>> store.append("first_key", "tato")
>>> # Should return "potato"
>>> store.get("first_key")
check(self: torch._C._distributed_c10d.Store, arg0: list[str]) bool

检查调用,以确定给定的 keys 列表中是否有值存储在存储中。在正常情况下,此调用将立即返回,但仍可能遇到一些边缘死锁情况,例如在 TCPStore 被销毁后调用 check。使用 check() 调用要检查是否存储在存储中的键列表。

参数:

keys (lisr[str]) – 查询是否存储在存储中的键。

示例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.add("first_key", 1)
>>> # Should return 7
>>> store.check(["first_key"])
compare_set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str, arg2: str) bytes

根据提供的 key 将键值对插入存储中,并在插入前对 expected_valuedesired_value 进行比较。如果 keyexpected_value 已在存储中存在或如果 expected_value 为空字符串,则 desired_value 将被设置。

参数:
  • key (str) – 要在存储中检查的键。

  • expected_value (str) – 要在插入前检查的与 key 关联的值。

  • desired_value (str) – 要添加到存储中的与 key 关联的值。

示例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("key", "first_value")
>>> store.compare_set("key", "first_value", "second_value")
>>> # Should return "second_value"
>>> store.get("key")
delete_key(self: torch._C._distributed_c10d.Store, arg0: str) bool

从存储中删除与 key 关联的键值对。如果键成功删除,则返回 true,否则返回 false。

警告

delete_key API 仅支持 TCPStoreHashStore 。使用此 API 与 FileStore 将导致异常。

参数:

key(字符串)- 要从存储中删除的键

返回:

如果 key 已被删除,则为 True,否则为 False。

示例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, HashStore can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key")
>>> # This should return true
>>> store.delete_key("first_key")
>>> # This should return false
>>> store.delete_key("bad_key")
get(self: torch._C._distributed_c10d.Store, arg0: str) bytes

获取与给定 key 在存储中关联的值。如果 key 不在存储中,函数将在抛出异常之前等待 timeout ,该值在初始化存储时定义。

参数:

key (str) – 函数将返回与此键关联的值。

返回:

如果 key 在存储中,则与 key 关联的值。

示例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "first_value")
>>> # Should return "first_value"
>>> store.get("first_key")
has_extended_api(self: torch._C._distributed_c10d.Store) bool

返回 true 如果存储支持扩展操作。

multi_get(self: torch._C._distributed_c10d.Store, arg0: list[str]) list[bytes]

获取 keys 中的所有值。如果 keys 中的任何键不在存储中,函数将等待 timeout

参数:

keys (List[str]) – 要从存储中检索的键。

示例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "po")
>>> store.set("second_key", "tato")
>>> # Should return [b"po", b"tato"]
>>> store.multi_get(["first_key", "second_key"])
multi_set(self: torch._C._distributed_c10d.Store, arg0: list[str], arg1: list[str]) None

根据提供的 keysvalues 将键值对列表插入存储中。

参数:
  • keys (List[str]) – 要插入的键。

  • values (List[str]) – 要插入的值列表。

示例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.multi_set(["first_key", "second_key"], ["po", "tato"])
>>> # Should return b"po"
>>> store.get("first_key")
num_keys(self: torch._C._distributed_c10d.Store) int

返回存储中设置的键的数量。请注意,这个数字通常比通过 set()add() 添加的键的数量多一个,因为有一个键用于协调使用存储的所有工作者。

警告

当与 TCPStore 结合使用时, num_keys 返回写入底层文件的键的数量。如果销毁存储并使用同一文件创建另一个存储,则原始键将被保留。

返回:

当前存储中键的数量。

示例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "first_value")
>>> # This should return 2
>>> store.num_keys()
设置(self:torch._C._distributed_c10d.Store, arg0str, arg1str) → None ¶

将键值对插入到根据提供的 keyvalue 指定的存储中。如果 key 已在存储中存在,则将使用新提供的 value 覆盖旧值。

参数:
  • 键(字符串)- 要添加到存储的键。

  • value (str) – 与 key 关联要添加到存储中的值。

示例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "first_value")
>>> # Should return "first_value"
>>> store.get("first_key")
set_timeout(self: torch._C._distributed_c10d.Store, arg0: datetime.timedelta) None

设置存储的默认超时。此超时在初始化以及 wait()get() 中使用。

参数:

timeout (timedelta) – 要在存储中设置的超时。

示例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set_timeout(timedelta(seconds=10))
>>> # This will throw an exception after 10 seconds
>>> store.wait(["bad_key"])
属性超时 ¶

获取存储的超时时间。

wait(*args, **kwargs)

重载函数。

  1. wait(self: torch._C._distributed_c10d.Store, arg0: list[str]) -> None

等待 keys 中的每个键被添加到存储中。如果 timeout (在存储初始化期间设置)之前没有设置所有键,则 wait 将抛出异常。

参数:

keys (list) – 等待直到这些键在存储中设置好的键列表。

示例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> # This will throw an exception after 30 seconds
>>> store.wait(["bad_key"])
  1. wait(self: torch._C._distributed_c10d.Store, arg0: list[str], arg1: datetime.timedelta) -> None

等待每个键在 keys 中被添加到存储中,如果键未被通过 timeout 设置,则抛出异常。

参数:
  • keys(列表)- 等待直到这些键在存储中设置。

  • timeout(timedelta)- 在抛出异常之前等待键被添加的时间。

示例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> # This will throw an exception after 10 seconds
>>> store.wait(["bad_key"], timedelta(seconds=10))
类 torch.distributed.TCPStore

基于 TCP 的分布式键值存储实现。服务器存储持有数据,而客户端存储可以通过 TCP 连接到服务器存储,执行插入键值对、检索键值对等操作。应始终初始化一个服务器存储,因为客户端存储将等待服务器建立连接。

参数:
  • host_name (str) – 服务器存储应运行的计算机名或 IP 地址。

  • port (int) – 服务器存储应监听请求的端口号。

  • world_size (int, optional) – 存储用户总数(客户端数量加 1 为服务器)。默认为 None(None 表示非固定数量的存储用户)。

  • is_master (bool, optional) – 当初始化服务器存储时为 True,客户端存储时为 False。默认为 False。

  • timeout (timedelta, optional) – 存储在初始化期间以及用于 get()wait() 等方法时的超时时间。默认为 timedelta(seconds=300)。

  • wait_for_workers (bool, optional) – 是否等待所有工作进程连接到服务器存储。仅在 world_size 为固定值时适用。默认为 True。

  • multi_tenant (bool, optional) – 如果为 True,当前进程中的所有具有相同主机/端口的 TCPStore 实例将使用相同的底层 TCPServer 。默认为 False。

  • master_listen_fd (int, 可选) – 如果指定,底层 TCPServer 将监听此文件描述符,它必须是一个已绑定到 port 的套接字。在某些场景中,这有助于避免端口分配竞争。默认值为 None(表示服务器创建一个新的套接字并尝试将其绑定到 port )。

  • use_libuv (bool, 可选) – 如果为 True,则使用 libuv 作为 TCPServer 后端。默认值为 True。

示例::
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Run on process 1 (server)
>>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30))
>>> # Run on process 2 (client)
>>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False)
>>> # Use any of the store methods from either the client or server after initialization
>>> server_store.set("first_key", "first_value")
>>> client_store.get("first_key")
__init__(self: torch._C._distributed_c10d.TCPStore, host_name: str, port: int, world_size: Optional[int] = None, is_master: bool = False, timeout: datetime.timedelta = datetime.timedelta(seconds=300), wait_for_workers: bool = True, multi_tenant: bool = False, master_listen_fd: Optional[int] = None, use_libuv: bool = True) None

创建一个新的 TCPStore。

属性主机

获取商店监听请求的主机名。

属性 libuvBackend

如果使用 libuv 后端,则返回 True。

属性端口 ¶

获取商店监听请求的端口号。

类 torch.distributed.HashStore ¶

基于底层哈希表的线程安全存储实现。此存储可以在同一进程内使用(例如,由其他线程使用),但不能跨进程使用。

示例::
>>> import torch.distributed as dist
>>> store = dist.HashStore()
>>> # store can be used from other threads
>>> # Use any of the store methods after initialization
>>> store.set("first_key", "first_value")
__init__(self: torch._C._distributed_c10d.HashStore) None

创建一个新的 HashStore。

class torch.distributed.FileStore

使用文件存储底层键值对的存储实现。

参数:
  • 文件名(str)- 存储键值对的文件路径

  • world_size(int,可选)- 使用存储的总进程数。默认为-1(负值表示非固定数量的存储用户)

示例::
>>> import torch.distributed as dist
>>> store1 = dist.FileStore("/tmp/filestore", 2)
>>> store2 = dist.FileStore("/tmp/filestore", 2)
>>> # Use any of the store methods from either the client or server after initialization
>>> store1.set("first_key", "first_value")
>>> store2.get("first_key")
__init__(self:torch._C._distributed_c10d.FileStore, file_name: str, world_size: int = -1) → None ¶

创建一个新的 FileStore。

属性路径

获取 FileStore 存储键值对所使用的文件路径。

类 torch.distributed.PrefixStore

对 3 个键值存储( TCPStoreFileStoreHashStore )中的任何一个进行包装,为存储中插入的每个键添加前缀。

参数:
  • prefix (str) – 在每个键插入存储之前附加的前缀字符串。

  • store (torch.distributed.store) – 构成底层键值存储的存储对象。

__init__(self: torch._C._distributed_c10d.PrefixStore, prefix: str, store: torch._C._distributed_c10d.Store) None

创建一个新的 PrefixStore。

属性 underlying_store ¶

获取 PrefixStore 封装的底层存储对象。

集体通信分析 ¶

注意,您可以使用 torch.profiler (推荐,仅在 1.8.1 之后可用)或 torch.autograd.profiler 来分析此处提到的集体通信和点对点通信 API。所有开箱即用的后端( glooncclmpi )都受支持,并且集体通信的使用将在分析输出/跟踪中按预期呈现。分析您的代码与任何常规 torch 操作相同:

import torch
import torch.distributed as dist
with torch.profiler():
    tensor = torch.randn(20, 10)
    dist.all_reduce(tensor)

请参阅分析器文档以全面了解分析器功能。

多 GPU 集体函数 ¶

警告

多 GPU 函数(代表每个 CPU 线程有多个 GPU)已被弃用。截至今天,PyTorch 分布式首选的编程模型是每个线程一个设备,如本文档中的 API 所示。如果您是后端开发者并希望支持每个线程多个设备,请联系 PyTorch 分布式维护者。

第三方后端 ¶

除了内置的 GLOO/MPI/NCCL 后端,PyTorch 分布式还通过运行时注册机制支持第三方后端。有关如何通过 C++ 扩展开发第三方后端的参考,请参阅教程 - 自定义 C++ 和 CUDA 扩展和 test/cpp_extensions/cpp_c10d_extension.cpp 。第三方后端的能力由它们自己的实现决定。

新后端继承自 c10d::ProcessGroup ,并在导入时通过 torch.distributed.Backend.register_backend() 注册后端名称和实例化接口。

当手动导入此后端并使用相应的后端名称调用 torch.distributed.init_process_group() 时, torch.distributed 包将在此新后端上运行。

警告

第三方后端的支持是实验性的,可能随时更改。

启动实用程序

torch.distributed 包还提供了 torch.distributed.launch 的启动实用程序。这个辅助实用程序可以用于在每个节点上启动多个进程以进行分布式训练。

模块 torch.distributed.launch

torch.distributed.launch 是一个模块,可以在每个训练节点上启动多个分布式训练进程。

警告

此模块将因 torchrun 而弃用。

该实用程序可用于单节点分布式训练,其中每个节点将启动一个或多个进程。该实用程序可用于 CPU 训练或 GPU 训练。如果用于 GPU 训练,每个分布式进程将在单个 GPU 上运行。这可以实现单节点训练性能的显著提升。它还可以用于多节点分布式训练,通过在每个节点上启动多个进程,以实现多节点分布式训练性能的显著提升。对于具有多个 Infiniband 接口且支持直接 GPU 的系统,这将特别有益,因为所有这些接口都可以用于聚合通信带宽。

在单节点分布式训练或多节点分布式训练的两种情况下,此实用程序将启动每个节点上的给定数量的进程( --nproc-per-node )。如果用于 GPU 训练,此数量需要小于或等于当前系统上的 GPU 数量( nproc_per_node ),并且每个进程将运行在从 GPU 0 到 GPU(nproc_per_node - 1)的单个 GPU 上。

如何使用此模块:

  1. 单节点多进程分布式训练

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
           arguments of your training script)
  1. 多节点多进程分布式训练:(例如两个节点)

节点 1:(IP:192.168.1.1,并有一个空闲端口:1234)

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node-rank=0 --master-addr="192.168.1.1"
           --master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)

节点 2:

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node-rank=1 --master-addr="192.168.1.1"
           --master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
  1. 查看此模块提供的可选参数:

python -m torch.distributed.launch --help

重要通知:

1. 此实用程序和多进程分布式(单节点或多节点)GPU 训练目前仅通过 NCCL 分布式后端实现最佳性能。因此,NCCL 后端是推荐用于 GPU 训练的后端。

2. 在您的训练程序中,您必须解析命令行参数: --local-rank=LOCAL_PROCESS_RANK ,该参数将由本模块提供。如果您的训练程序使用 GPU,您应确保您的代码仅在 LOCAL_PROCESS_RANK 的 GPU 设备上运行。这可以通过以下方式实现:

解析本地 rank 参数

>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local-rank", "--local_rank", type=int)
>>> args = parser.parse_args()

使用以下任一方式设置您的设备为本地 rank

>>> torch.cuda.set_device(args.local_rank)  # before your code runs

或者

>>> with torch.cuda.device(args.local_rank):
>>>    # your code to run
>>>    ...

版本 2.0.0 变更:启动器将传递 --local-rank=<rank> 参数给您的脚本。从 PyTorch 2.0.0 版本开始,推荐使用破折号 --local-rank ,而不是之前使用的下划线 --local_rank

为了向后兼容,用户可能需要在他们的参数解析代码中处理这两种情况。这意味着在参数解析器中包括 "--local-rank""--local_rank" 。如果只提供 "--local_rank" ,启动器将触发错误:“错误:未识别的参数:–local-rank=”。对于仅支持 PyTorch 2.0.0+的训练代码,包括 "--local-rank" 应该足够。

3. 在您的训练程序中,您应该在开始时调用以下函数以启动分布式后端。强烈建议使用 init_method=env:// 。其他初始化方法(例如 tcp:// )可能可行,但 env:// 是本模块官方支持的方法。

>>> torch.distributed.init_process_group(backend='YOUR BACKEND',
>>>                                      init_method='env://')

4. 在您的训练程序中,您可以使用常规的分布式函数或使用 torch.nn.parallel.DistributedDataParallel() 模块。如果您的训练程序使用 GPU 进行训练并希望使用 torch.nn.parallel.DistributedDataParallel() 模块,以下是配置方法。

>>> model = torch.nn.parallel.DistributedDataParallel(model,
>>>                                                   device_ids=[args.local_rank],
>>>                                                   output_device=args.local_rank)

请确保将 device_ids 参数设置为您的代码将操作的唯一 GPU 设备 ID。这通常是进程的本地 rank。换句话说, device_ids 需要是 [args.local_rank]output_device 需要是 args.local_rank ,以便使用此实用程序。

5. 另一种通过环境变量 LOCAL_RANKlocal_rank 传递给子进程的方法。当您使用 --use-env=True 启动脚本时,此行为会被启用。您必须调整上述子进程示例,将 args.local_rank 替换为 os.environ['LOCAL_RANK'] ;当您指定此标志时,启动器不会传递 --local-rank

警告

local_rank 并非全局唯一:它仅在机器上的每个进程中是唯一的。因此,不要用它来决定是否,例如,写入网络文件系统。有关如何正确执行此操作的示例,请参阅 https://github.com/pytorch/pytorch/issues/12042。

生成工具 ¶

多进程包 - torch.multiprocessing 包也提供了一个 spawn 函数在 torch.multiprocessing.spawn() 中。此辅助函数可以用来生成多个进程。它通过传入要运行的函数并生成 N 个进程来运行它。这也可以用于多进程分布式训练。

有关如何使用它的参考,请参阅 PyTorch 示例 - ImageNet 实现

注意,此功能需要 Python 3.4 或更高版本。

调试 torch.distributed 应用程序

调试分布式应用程序可能具有挑战性,因为难以理解的挂起、崩溃或跨排名不一致的行为。 torch.distributed 提供了一套工具,以自助方式帮助调试训练应用程序:

Python 断点

在分布式环境中使用 Python 的调试器非常方便,但因为它默认不工作,所以很多人根本不使用它。PyTorch 提供了一个围绕 pdb 的定制包装器,简化了这一过程。

torch.distributed.breakpoint 使得这个过程变得简单。内部,它以两种方式定制了 pdb 的断点行为,但其他方面则与正常的 pdb 相同。1. 仅在用户指定的 rank 上附加调试器。2. 通过使用 torch.distributed.barrier() 确保所有其他 rank 停止,一旦调试 rank 发出 continue 命令,该 barrier 将释放。3. 将子进程的 stdin 重定向,使其连接到您的终端。

要使用它,只需在所有 rank 上发出 torch.distributed.breakpoint(rank),每个情况下的 rank 值都相同。

监控屏障

截至 v1.10 版本, torch.distributed.monitored_barrier() 作为 torch.distributed.barrier() 的替代方案存在,当崩溃时,torch.distributed.barrier() 会提供有关可能出错的排名的有用信息,即不是所有排名在提供的超时时间内调用 torch.distributed.monitored_barrier()torch.distributed.monitored_barrier() 通过使用 send / recv 通信原语实现主机端屏障,其过程类似于确认,允许排名 0 报告哪些排名未能及时确认屏障。例如,考虑以下函数,其中排名 1 未调用 torch.distributed.monitored_barrier() (在实际中,这可能是由于应用程序错误或前一个集体操作中的挂起):

import os
from datetime import timedelta

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    # monitored barrier requires gloo process group to perform host-side sync.
    group_gloo = dist.new_group(backend="gloo")
    if rank not in [1]:
        dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))


if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    mp.spawn(worker, nprocs=2, args=())

在排名 0 上产生以下错误信息,使用户能够确定哪些排名可能存在问题,并进一步调查:

RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms
 Original exception:
[gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594

TORCH_DISTRIBUTED_DEBUG

使用 TORCH_CPP_LOG_LEVEL=INFO ,可以通过环境变量 TORCH_DISTRIBUTED_DEBUG 触发额外的有用日志记录和集体同步检查,以确保所有排名都适当同步。 TORCH_DISTRIBUTED_DEBUG 可以设置为 OFF (默认值)、 INFODETAIL ,具体取决于所需的调试级别。请注意,最详细的选项 DETAIL 可能会影响应用程序性能,因此仅在调试问题时使用。

设置 TORCH_DISTRIBUTED_DEBUG=INFO 将在使用 torch.nn.parallel.DistributedDataParallel() 训练的模型初始化时产生额外的调试日志,并且 TORCH_DISTRIBUTED_DEBUG=DETAIL 将额外记录选定迭代次数的运行时性能统计信息。这些运行时统计信息包括前向时间、反向时间、梯度通信时间等数据。例如,给定以下应用程序:

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


class TwoLinLayerNet(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.a = torch.nn.Linear(10, 10, bias=False)
        self.b = torch.nn.Linear(10, 1, bias=False)

    def forward(self, x):
        a = self.a(x)
        b = self.b(x)
        return (a, b)


def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    torch.cuda.set_device(rank)
    print("init model")
    model = TwoLinLayerNet().cuda()
    print("init ddp")
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])

    inp = torch.randn(10, 10).cuda()
    print("train")

    for _ in range(20):
        output = ddp_model(inp)
        loss = output[0] + output[1]
        loss.sum().backward()


if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
    os.environ[
        "TORCH_DISTRIBUTED_DEBUG"
    ] = "DETAIL"  # set to DETAIL for runtime logging.
    mp.spawn(worker, nprocs=2, args=())

初始化时渲染以下日志:

I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO

在运行时(当设置 TORCH_DISTRIBUTED_DEBUG=DETAIL 时)渲染以下日志:

I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] Training TwoLinLayerNet unused_parameter_size=0
 Avg forward compute time: 40838608
 Avg backward compute time: 5983335
Avg backward comm. time: 4326421
 Avg backward comm/comp overlap time: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] Training TwoLinLayerNet unused_parameter_size=0
 Avg forward compute time: 42850427
 Avg backward compute time: 3885553
Avg backward comm. time: 2357981
 Avg backward comm/comp overlap time: 2234674

此外, TORCH_DISTRIBUTED_DEBUG=INFO 由于模型中存在未使用的参数,增强了 torch.nn.parallel.DistributedDataParallel() 的崩溃日志功能。目前,如果正向传递中可能存在未使用的参数,则必须将 find_unused_parameters=True 传递给 torch.nn.parallel.DistributedDataParallel() 的初始化,并且从 v1.10 版本开始,所有模型输出都必须用于损失计算,因为 torch.nn.parallel.DistributedDataParallel() 不支持反向传递中的未使用参数。这些限制对于大型模型尤其具有挑战性,因此当发生错误崩溃时, torch.nn.parallel.DistributedDataParallel() 将记录所有未使用的参数的完全限定名称。例如,在上面的应用程序中,如果我们修改 loss 以计算为 loss = output[1] ,那么 TwoLinLayerNet.a 在反向传递中不会接收到梯度,从而导致 DDP 失败。在崩溃时,用户会收到有关未使用参数的信息,对于大型模型来说,手动查找这些信息可能具有挑战性:

RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing
 the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by
making sure all `forward` function outputs participate in calculating loss.
If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return va
lue of `forward` of your module when reporting this issue (e.g. list, dict, iterable).
Parameters which did not receive grad for rank 0: a.weight
Parameter indices which did not receive grad for rank 0: 0

设置 TORCH_DISTRIBUTED_DEBUG=DETAIL 将触发用户发出的每个集体调用(无论是直接还是间接,如 DDP allreduce )的额外一致性和同步检查。这是通过创建一个包装进程组来完成的,该进程组包装了 torch.distributed.init_process_group()torch.distributed.new_group() API 返回的所有进程组。因此,这些 API 将返回一个包装进程组,它可以像常规进程组一样使用,但在将集体调度到底层进程组之前执行一致性检查。目前,这些检查包括一个 torch.distributed.monitored_barrier() ,确保所有进程完成其未完成的集体调用,并报告卡住的进程。然后,通过确保所有集体函数匹配并且使用一致的张量形状调用,对集体本身进行一致性检查。如果不一致,则在应用程序崩溃时包含详细的错误报告,而不是挂起或无信息的错误消息。例如,考虑以下具有与 torch.distributed.all_reduce() 不匹配输入形状的函数:

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    torch.cuda.set_device(rank)
    tensor = torch.randn(10 if rank == 0 else 20).cuda()
    dist.all_reduce(tensor)
    torch.cuda.synchronize(device=rank)


if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
    os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
    mp.spawn(worker, nprocs=2, args=())

使用 NCCL 后端,这样的应用程序可能会挂起,这在非平凡场景中很难找到根本原因。如果用户启用 TORCH_DISTRIBUTED_DEBUG=DETAIL 并重新运行应用程序,以下错误消息将揭示根本原因:

work = default_pg.allreduce([tensor], opts)
RuntimeError: Error when verifying shape tensors for collective ALLREDUCE on rank 0. This likely indicates that input shapes into the collective are mismatched across ranks. Got shapes:  10
[ torch.LongTensor{1} ]

注意

在运行时对调试级别进行细粒度控制时,还可以使用 torch.distributed.set_debug_level()torch.distributed.set_debug_level_from_env()torch.distributed.get_debug_level() 函数。

此外,可以使用 TORCH_DISTRIBUTED_DEBUG=DETAIL 与 TORCH_SHOW_CPP_STACKTRACES=1 结合使用,以记录检测到集体不同步时的整个调用栈。这些集体不同步检查将适用于所有使用 c10d 集体调用并由 torch.distributed.init_process_group()torch.distributed.new_group() API 创建的进程组支持的应用程序。

记录日志

除了通过 torch.distributed.monitored_barrier()TORCH_DISTRIBUTED_DEBUG 的显式调试支持外, torch.distributed 的底层 C++ 库还会以各种级别输出日志消息。这些消息有助于理解分布式训练作业的执行状态,以及排查网络连接故障等问题。以下矩阵显示了如何通过组合 TORCH_CPP_LOG_LEVELTORCH_DISTRIBUTED_DEBUG 环境变量来调整日志级别。

TORCH_CPP_LOG_LEVEL

TORCH_DISTRIBUTED_DEBUG

有效日志级别

ERROR

忽略

错误

WARNING

忽略

警告

INFO

忽略

信息

INFO

INFO

调试

INFO

DETAIL

跟踪(又称全部)

分布式组件引发自定义的从 RuntimeError 派生的异常类型:

  • torch.distributed.DistError:这是所有分布式异常的基类型。

  • torch.distributed.DistBackendError:当发生特定后端错误时抛出此异常。例如,如果使用 NCCL 后端,并且用户尝试使用 NCCL 库不可用的 GPU。

  • torch.distributed.DistNetworkError:当网络库遇到错误时抛出此异常(例如:连接由对方重置)

  • torch.distributed.DistStoreError:当存储遇到错误时抛出此异常(例如:TCPStore 超时)

类 torch.distributed.DistError

分布式库发生错误时引发的异常

torch.distributed.DistBackendError 类

在分布式计算中发生后端错误时引发的异常

class torch.distributed.DistNetworkError

在分布式计算中发生网络错误时引发的异常

class torch.distributed.DistStoreError

在分布式存储中发生错误时引发的异常

如果您正在运行单节点训练,那么可能方便地交互式地中断您的脚本。我们提供了一种方便地中断单个 rank 的方法:

torch.distributed.breakpoint(rank=0, skip=0)[source][source]

设置断点,但仅限于单个 rank。其他 rank 将等待您完成断点后再继续。

参数:
  • rank(整数)- 要在哪个 rank 上断点。默认: 0

  • skip(整数)- 跳过第一个 skip 次对该断点的调用。默认: 0


© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,主题由 Read the Docs 提供。

文档

PyTorch 开发者文档全面访问

查看文档

教程

获取初学者和高级开发者的深入教程

查看教程

资源

查找开发资源并获得您的疑问解答

查看资源