分布式 RPC 框架¶
分布式 RPC 框架提供了一套原语,用于多机模型训练的远程通信机制,以及一个高级 API,用于自动区分跨多台机器分割的模型。
警告
RPC 包中的 API 是稳定的。目前有多个正在进行的工作项旨在提高性能和错误处理,这些改进将在未来的版本中发布。
警告
CUDA 支持在 PyTorch 1.9 版本中引入,目前仍为测试功能。RPC 包的并非所有功能都与 CUDA 支持兼容,因此不建议使用这些功能。这些不兼容的功能包括:RRefs、JIT 兼容性、分布式自动微分和分布式优化器,以及性能分析。这些不足将在未来的版本中得到解决。
注意
请参阅 PyTorch 分布式概述以了解有关分布式训练的所有相关功能的简要介绍。
基础 §
分布式 RPC 框架使得远程运行函数变得简单,支持引用远程对象而不需要复制实际数据,并提供自动微分和优化器 API,以透明地跨 RPC 边界运行反向操作和更新参数。这些功能可以分为四组 API。
远程过程调用(RPC)支持在指定的目标工作器上运行带有给定参数的函数,并获取返回值或创建返回值的引用。主要有三个 RPC API:
rpc_sync()
(同步)、rpc_async()
(异步)和remote()
(异步并返回远程返回值的引用)。如果用户代码没有返回值就无法继续,请使用同步 API。否则,使用异步 API 获取未来值,并在需要返回值时等待未来值。当需要远程创建某物但永远不会将其检索到调用者处时,remote()
API 很有用。想象一下,一个驱动进程正在设置参数服务器和训练器。驱动器可以在参数服务器上创建嵌入表,然后与训练器共享嵌入表的引用,但自己永远不会在本地使用嵌入表。在这种情况下,rpc_sync()
和rpc_async()
不再适用,因为它们总是意味着返回值将立即或在未来返回给调用者。远程引用(RRef)是一个指向本地或远程对象的分布式共享指针。它可以与其他工作者共享,引用计数将透明处理。每个 RRef 只有一个所有者,对象只存在于该所有者。持有 RRef 的非所有者工作者可以通过显式请求从所有者处获取对象的副本。当工作者需要访问某些数据对象,但自身既不是创建者(
remote()
的调用者)也不是对象的所有者时,这很有用。下面我们将讨论的分布式优化器就是此类用例的一个例子。分布式 Autograd 将所有参与前向传播的本地 Autograd 引擎缝合在一起,并在反向传播期间自动向它们发出请求以计算梯度。这在需要跨多台机器进行,例如分布式模型并行训练、参数服务器训练等操作时特别有用。有了这个特性,用户代码就无需担心如何发送梯度跨越 RPC 边界以及本地 Autograd 引擎的启动顺序,这在存在嵌套和相互依赖的 RPC 调用时可能会变得相当复杂。
分布式优化器的构造函数接受一个
Optimizer()
(例如,SGD()
,Adagrad()
等)和一个参数 RRefs 列表,在每个不同的 RRef 所有者上创建一个Optimizer()
实例,并在运行step()
时相应地更新参数。当你有分布式的前向和反向传播时,参数和梯度将分散在多个工作者之间,因此需要每个参与工作者上的优化器。分布式优化器将这些本地优化器包装在一个中,并提供简洁的构造函数和step()
API。
RPC 协议
在使用 RPC 和分布式自动微分原语之前,必须进行初始化。为了初始化 RPC 框架,我们需要使用 init_rpc()
,这将初始化 RPC 框架、RRef 框架和分布式自动微分。
- torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None)[source][source]
初始化 RPC 原语,如本地 RPC 代理和分布式自动微分,这立即使当前进程准备好发送和接收 RPC。
- 参数:
name (str) – 该节点的全局唯一名称。(例如,
Trainer3
,ParameterServer2
,Master
,Worker1
)名称只能包含数字、字母、下划线、冒号和/或破折号,并且长度必须小于 128 个字符。后端(BackendType,可选)- RPC 后端实现的类型。支持值为
BackendType.TENSORPIPE
(默认)。有关更多信息,请参阅后端。rank(整数)- 该节点的全局唯一 ID/排名。
world_size(整数)- 组中的工作者数量。
rpc_backend_options(RpcBackendOptions,可选)- 传递给 RpcAgent 构造函数的选项。它必须是
RpcBackendOptions
的特定子类,并包含特定于代理的初始化配置。默认情况下,对于所有代理,它将默认超时设置为 60 秒,并使用init_method = "env://"
初始化的底层进程组进行 rendezvous,这意味着需要正确设置环境变量MASTER_ADDR
和MASTER_PORT
。有关更多信息,请参阅后端,并查找可用的选项。
以下 API 允许用户远程执行函数以及创建对远程数据对象的引用(RRefs)。在这些 API 中,当传递 Tensor
作为参数或返回值时,目标工作进程将尝试创建具有相同元信息(即形状、步长等)的 Tensor
。我们有意禁止传输 CUDA 张量,因为如果源和目标工作进程的设备列表不匹配,可能会崩溃。在这种情况下,应用程序始终可以将输入张量显式地从调用者移动到 CPU,如果必要,再将其移动到被调用者所需的设备。
警告
RPC 中的 TorchScript 支持是一个原型功能,可能会发生变化。自 v1.5.0 版本以来, torch.distributed.rpc
支持将 TorchScript 函数作为 RPC 目标函数调用,这将有助于提高被调用方的并行性,因为执行 TorchScript 函数不需要 GIL。
- torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0)[source][source] ¶
向工作进程
to
发送阻塞 RPC 调用以运行函数func
。RPC 消息在 Python 代码执行并行发送和接收。此方法是线程安全的。- 参数:
to (str 或 WorkerInfo 或 int) – 目标工作者的名称/等级/
WorkerInfo
。func (Callable) – 可调用函数,例如 Python 可调用、内置运算符(例如
add()
)和注解的 TorchScript 函数。args (tuple) –
func
调用的参数元组。kwargs (dict) – 是
func
调用的关键字参数字典。超时(浮点数,可选)- 用于此 RPC 的超时时间(秒)。如果 RPC 在此时间内未完成,将引发表示超时的异常。0 值表示无限超时,即不会引发超时错误。如果未提供,则使用初始化或通过
_set_rpc_timeout
设置的默认值。
- 返回:
返回使用
func
、args
和kwargs
运行的结果。
- 示例::
确保在两个工作者上正确设置了MASTER_ADDR
和MASTER_PORT
。有关详细信息,请参阅init_process_group()
API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
在两个不同的进程中运行以下代码:
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3)) >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
下面是使用 RPC 运行 TorchScript 函数的示例。
>>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar)
>>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0)[source][source] ¶
向运行在
to
工作器上的函数func
发起非阻塞 RPC 调用。RPC 消息在 Python 代码执行并行发送和接收。此方法线程安全。此方法将立即返回一个Future
,可以等待其完成。- 参数:
to (str 或 WorkerInfo 或 int) – 目标工作者的名称/等级/
WorkerInfo
。func (Callable) – 可调用函数,例如 Python 可调用、内置运算符(例如
add()
)和注解的 TorchScript 函数。args (tuple) –
func
调用的参数元组。kwargs (dict) – 是
func
调用的关键字参数字典。超时(浮点数,可选)- 用于此 RPC 的超时时间(秒)。如果 RPC 在此时间内未完成,将引发表示超时的异常。0 值表示无限超时,即不会引发超时错误。如果没有提供,则使用初始化期间或通过
_set_rpc_timeout
设置的默认值。
- 返回:
返回一个可等待的
Future
对象。当完成时,可以从Future
对象中检索func
在args
和kwargs
上的返回值。
警告
由于我们不支持通过网络发送 GPU 张量,因此不支持将 GPU 张量用作
func
的参数或返回值。您需要显式地将 GPU 张量复制到 CPU,然后再将其用作func
的参数或返回值。警告
rpc_async
API 在通过网络发送参数张量之前不会复制存储的参数张量,这可能由不同线程根据 RPC 后端类型完成。调用者应确保这些张量的内容在返回的Future
完成之前保持完整。- 示例::
确保在两个工作器上都正确设置了MASTER_ADDR
和MASTER_PORT
。有关详细信息,请参阅init_process_group()
API。例如,导出 MASTER_ADDR=localhost export MASTER_PORT=5678
然后在两个不同的进程中运行以下代码:
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3)) >>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2)) >>> result = fut1.wait() + fut2.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
下面是使用 RPC 运行 TorchScript 函数的示例。
>>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar)
>>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3)) >>> ret = fut.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.remote(to, func, args=None, kwargs=None, timeout=-1.0)[source][source] ¶
向工作节点
to
调用func
并立即返回结果值RRef
。工作节点to
将是返回值RRef
的所有者,调用remote
的工作节点是用户。所有者管理其RRef
的全局引用计数,所有者RRef
仅在全局没有活跃引用时才会被销毁。- 参数:
to (str 或 WorkerInfo 或 int) – 目标工作节点的名称/排名/
WorkerInfo
。func (Callable) – 可调用函数,例如 Python 可调用函数、内置运算符(例如
add()
)和注解的 TorchScript 函数。args(元组)-
func
调用的参数元组。kwargs(字典)- 是
func
调用的关键字参数字典。timeout(浮点数,可选)- 此远程调用的超时时间(秒)。如果在此超时时间内,在工作者
to
上创建此RRef
未成功处理,则在下一次尝试使用 RRef(如to_here()
)时,将引发超时错误,表示此失败。0 的值表示无限超时,即永远不会引发超时错误。如果没有提供,则使用初始化期间或通过_set_rpc_timeout
设置的默认值。
- 返回:
将用户
RRef
实例到结果值。使用阻塞 APItorch.distributed.rpc.RRef.to_here()
在本地检索结果值。
警告
The
remote
API does not copy storages of argument tensors until sending them over the wire, which could be done by a different thread depending on the RPC backend type. The caller should make sure that the contents of those tensors stay intact until the returned RRef is confirmed by the owner, which can be checked using thetorch.distributed.rpc.RRef.confirmed_by_owner()
API.警告
Errors such as timeouts for the
remote
API are handled on a best-effort basis. This means that when remote calls initiated byremote
fail, such as with a timeout error, we take a best-effort approach to error handling. This means that errors are handled and set on the resulting RRef on an asynchronous basis. If the RRef has not been used by the application before this handling (such asto_here
or fork call), then future uses of theRRef
will appropriately raise errors. However, it is possible that the user application will use theRRef
before the errors are handled. In this case, errors may not be raised as they have not yet been handled.示例:
Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly on both workers. Refer to :meth:`~torch.distributed.init_process_group` API for more details. For example, export MASTER_ADDR=localhost export MASTER_PORT=5678 Then run the following code in two different processes: >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1)) >>> x = rref1.to_here() + rref2.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() Below is an example of running a TorchScript function using RPC. >>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar) >>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rref.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.get_worker_info(worker_name=None)[source][source]¶
Get
WorkerInfo
of a given worker name. Use thisWorkerInfo
to avoid passing an expensive string on every invocation.- 参数:
worker_name (str) – 当前工作者的字符串名称。如果为
None
,则返回当前工作者的 id。 (默认None
)- 返回:
WorkerInfo
实例,对于当前工作者的worker_name
或WorkerInfo
,如果worker_name
为None
。
- torch.distributed.rpc.shutdown(graceful=True, timeout=0)[source][source]¶
执行 RPC 代理的关闭操作,并销毁 RPC 代理。这将停止本地代理接受未完成的请求,并通过终止所有 RPC 线程来关闭 RPC 框架。如果
graceful=True
,则此操作将阻塞,直到所有本地和远程 RPC 进程都调用此方法并等待所有未完成的工作完成。否则,如果graceful=False
,这是一个本地关闭操作,它不会等待其他 RPC 进程调用此方法。警告
对于由
rpc_async()
返回的Future
个对象,future.wait()
在shutdown()
之后不应被调用。- 参数:
平滑关闭(布尔值)- 是否进行平滑关闭。如果为 True,则 1) 等待直到
UserRRefs
没有挂起的系统消息并删除它们;2) 阻塞直到所有本地和远程 RPC 进程都调用此方法,并等待所有待办工作完成。
- 示例::
确保在两个工作器上正确设置了MASTER_ADDR
和MASTER_PORT
。有关详细信息,请参阅init_process_group()
API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
在两个不同的进程中运行以下代码:
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> # do some work >>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1)) >>> # ready to shutdown >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> # wait for worker 0 to finish work, and then shutdown. >>> rpc.shutdown()
- class torch.distributed.rpc.WorkerInfo
一个封装系统中工作器信息的结构。包含工作器的名称和 ID。此类不应用于直接构造,而是可以通过
get_worker_info()
获取其实例,并将结果传递给rpc_sync()
、rpc_async()
、remote()
等函数,以避免每次调用时都复制字符串。- 属性 id
全局唯一标识符,用于识别工作者。
- 属性名称¶
工作者名称。
RPC 包还提供了装饰器,允许应用程序指定在调用方如何处理给定的函数。
- torch.distributed.rpc.functions.async_execution(fn)[source][source]
装饰器用于表示函数的返回值保证为
Future
对象,并且此函数可以在 RPC 调用方异步运行。更具体地说,调用方从包装函数中提取Future
,并将后续处理步骤作为回调安装到那个Future
上。安装的回调将在完成时从Future
读取值,并将该值作为 RPC 响应发送回去。这也意味着返回的Future
仅在调用方存在,永远不会通过 RPC 发送。当包装函数的(fn
)执行需要暂停和恢复,例如包含rpc_async()
或等待其他信号时,此装饰器非常有用。注意
要启用异步执行,应用程序必须将此装饰器返回的函数对象传递给 RPC API。如果 RPC 检测到由该装饰器安装的属性,它知道该函数返回一个
Future
对象,并将相应地处理。但这并不意味着在定义函数时该装饰器必须是外层的。例如,当与@staticmethod
或@classmethod
结合使用时,@rpc.functions.async_execution
需要是内部装饰器,以便目标函数被识别为静态或类函数。此目标函数仍然可以异步执行,因为当访问时,静态或类方法会保留@rpc.functions.async_execution
安装的属性。- 示例::
返回的Future
对象可以来自rpc_async()
、then()
或Future
构造函数。以下示例显示了直接使用由then()
返回的Future
。>>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @rpc.functions.async_execution >>> def async_add_chained(to, x, y, z): >>> # This function runs on "worker1" and returns immediately when >>> # the callback is installed through the `then(cb)` API. In the >>> # mean time, the `rpc_async` to "worker2" can run concurrently. >>> # When the return value of that `rpc_async` arrives at >>> # "worker1", "worker1" will run the lambda function accordingly >>> # and set the value for the previously returned `Future`, which >>> # will then trigger RPC to send the result back to "worker0". >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add_chained, >>> args=("worker2", torch.ones(2), 1, 1) >>> ) >>> print(ret) # prints tensor([3., 3.])
当与 TorchScript 装饰器结合使用时,此装饰器必须是外层的。
>>> from torch import Tensor >>> from torch.futures import Future >>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @torch.jit.script >>> def script_add(x: Tensor, y: Tensor) -> Tensor: >>> return x + y >>> >>> @rpc.functions.async_execution >>> @torch.jit.script >>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]: >>> return rpc.rpc_async(to, script_add, (x, y)) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add, >>> args=("worker2", torch.ones(2), 1) >>> ) >>> print(ret) # prints tensor([2., 2.])
当与静态或类方法结合使用时,此装饰器必须是内层的。
>>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> class AsyncExecutionClass: >>> >>> @staticmethod >>> @rpc.functions.async_execution >>> def static_async_add(to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> @classmethod >>> @rpc.functions.async_execution >>> def class_async_add(cls, to, x, y, z): >>> ret_fut = torch.futures.Future() >>> rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: ret_fut.set_result(fut.wait() + z) >>> ) >>> return ret_fut >>> >>> @rpc.functions.async_execution >>> def bound_async_add(self, to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.static_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.]) >>> >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.class_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.])
此装饰器也适用于 RRef 辅助工具,即 .
torch.distributed.rpc.RRef.rpc_sync()
,torch.distributed.rpc.RRef.rpc_async()
和torch.distributed.rpc.RRef.remote()
。>>> from torch.distributed import rpc >>> >>> # reuse the AsyncExecutionClass class above >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2) >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait() >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here() >>> print(ret) # prints tensor([4., 4.])
后端
RPC 模块可以利用不同的后端来执行节点之间的通信。要使用哪个后端可以在 init_rpc()
函数中指定,通过传递 BackendType
枚举的某个值。无论使用哪个后端,RPC API 的其余部分都不会改变。每个后端还定义了自己的 RpcBackendOptions
类的子类,该子类的一个实例也可以传递给 init_rpc()
来配置后端的行为。
- class torch.distributed.rpc.BackendType(value, names=<未提供>, *values, module=None, qualname=None, type=None, start=1, boundary=None) ¶
- class torch.distributed.rpc.RpcBackendOptions¶
封装传入 RPC 后端选项的抽象结构。可以通过此类的实例传递给
init_rpc()
以使用特定配置初始化 RPC,例如 RPC 超时和init_method
。- 属性 init_method
指定如何初始化进程组的 URL。默认为
env://
- 属性 rpc_timeout
表示所有 RPC 使用的超时时间。如果 RPC 在此时间范围内未完成,它将抛出异常,表示已超时。
TensorPipe 后端
TensorPipe 代理,默认情况下,利用 TensorPipe 库,该库提供了一种原生的点对点通信原语,特别适合机器学习,从根本上解决了 Gloo 的一些局限性。与 Gloo 相比,它具有异步的优势,允许大量传输同时发生,每个传输以自己的速度进行,而不会相互阻塞。它仅在需要时才在节点对之间打开管道,当一个节点失败时,只有其故障管道会关闭,而其他管道将继续正常工作。此外,它能够支持多种不同的传输方式(当然包括 TCP,还包括共享内存、NVLink、InfiniBand 等),并且可以自动检测它们的可用性,并为每个管道协商最佳传输方式。
TensorPipe 后端自 PyTorch v1.6 版本引入,目前正在积极开发中。目前它仅支持 CPU 张量,GPU 支持即将推出。它采用基于 TCP 的传输,就像 Gloo 一样。它还能够自动将大张量分块并多路复用到多个套接字和线程中,以实现非常高的带宽。代理将能够自动选择最佳的传输方式,无需人工干预。
示例:
>>> import os
>>> from torch.distributed import rpc
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>>
>>> rpc.init_rpc(
>>> "worker1",
>>> rank=0,
>>> world_size=2,
>>> rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
>>> num_worker_threads=8,
>>> rpc_timeout=20 # 20 second timeout
>>> )
>>> )
>>>
>>> # omitting init_rpc invocation on worker2
- class torch.distributed.rpc.TensorPipeRpcBackendOptions(*, num_worker_threads=16, rpc_timeout=60.0, init_method='env://', device_maps=None, devices=None, _transports=None, _channels=None)[source][source]¶
TensorPipeAgent
的后端选项,由RpcBackendOptions
衍生。- 参数:
num_worker_threads (int, optional) –
TensorPipeAgent
用于执行请求的线程池中的线程数(默认:16)。rpc_timeout (浮点数,可选) – RPC 请求的默认超时时间,单位为秒(默认:60 秒)。如果 RPC 在此时间范围内未完成,将抛出表示此情况的异常。调用者可以在必要时通过
rpc_sync()
和rpc_async()
覆盖单个 RPC 的此超时。init_method (字符串,可选) – 用于 rendezvous 的分布式存储初始化的 URL。它接受
init_process_group()
相同参数的任何值(默认:env://
)。device_maps (Dict[str, Dict],可选) – 从此工作器到调用者的设备放置映射。键是调用者工作器名称,值是从
Dict
的int
、str
或torch.device
映射此工作器设备到调用者工作器设备的字典。(默认:None
)devices (List[int, str, 或
torch.device
],可选) – RPC 代理使用的所有本地 CUDA 设备。默认情况下,它将初始化为其自身的device_maps
和其同伴的device_maps
对应的设备。在处理 CUDA RPC 请求时,代理将为List
中的所有设备正确同步 CUDA 流。
- 属性设备映射表
设备映射位置。
- 属性设备
本地代理使用的所有设备。
- 属性初始化方法 ¶
指定如何初始化进程组的 URL。默认为
env://
- 属性 num_worker_threads ¶
TensorPipeAgent
用于执行请求的线程池中的线程数。
- 属性 rpc_timeout ¶
一个浮点数,表示所有 RPC 使用的超时时间。如果 RPC 在此时间段内未完成,它将异常完成,表示已超时。
- set_device_map(to, device_map)[source][source]¶
设置每个 RPC 调用者和被调用者对之间的设备映射。此函数可以多次调用,以逐步添加设备放置配置。
- 参数:
to (str) – 被调用者名称。
device_map (Dict of python:int, str, or torch.device) – 从本工作者到被调用者的设备放置映射。此映射必须是可逆的。
示例
>>> # both workers >>> def add(x, y): >>> print(x) # tensor([1., 1.], device='cuda:1') >>> return x + y, (x + y).to(2) >>> >>> # on worker 0 >>> options = TensorPipeRpcBackendOptions( >>> num_worker_threads=8, >>> device_maps={"worker1": {0: 1}} >>> # maps worker0's cuda:0 to worker1's cuda:1 >>> ) >>> options.set_device_map("worker1", {1: 2}) >>> # maps worker0's cuda:1 to worker1's cuda:2 >>> >>> rpc.init_rpc( >>> "worker0", >>> rank=0, >>> world_size=2, >>> backend=rpc.BackendType.TENSORPIPE, >>> rpc_backend_options=options >>> ) >>> >>> x = torch.ones(2) >>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1)) >>> # The first argument will be moved to cuda:1 on worker1. When >>> # sending the return value back, it will follow the invert of >>> # the device map, and hence will be moved back to cuda:0 and >>> # cuda:1 on worker0 >>> print(rets[0]) # tensor([2., 2.], device='cuda:0') >>> print(rets[1]) # tensor([2., 2.], device='cuda:1')
注意
RPC 框架不会自动重试任何 rpc_sync()
、 rpc_async()
和 remote()
调用。原因是 RPC 框架无法确定操作是否幂等以及是否可以安全重试。因此,处理失败和必要时重试是应用程序的责任。RPC 通信基于 TCP,因此可能会因为网络故障或间歇性网络连接问题而出现故障。在这种情况下,应用程序需要适当地进行重试,并使用合理的退避策略,以确保网络不会被激进的重试所淹没。
RRef¶
警告
当使用 CUDA 张量时,目前不支持 RRef。
远程引用(Remote REFerence)是指对远程工作者上某种类型(例如)的值的引用。此句柄在所有者上保持引用的远程值存活,但并不意味着该值将来会转移到本地工作者。RRefs 可以在多机训练中使用,通过持有其他工作者上存在的 nn.Modules 的引用,并在训练期间调用适当的函数来检索或修改它们的参数。有关详细信息,请参阅远程引用协议。
- class torch.distributed.rpc.PyRRef(RRef)
封装远程工作者上某种类型值的引用的类。此句柄将在工作者上保持引用的远程值存活。当 1) 应用程序代码和本地 RRef 上下文中没有对该引用的引用,或 2) 应用程序已调用优雅的关闭时,将删除
UserRRef
。在已删除的 RRef 上调用方法会导致未定义的行为。RRef 实现仅提供尽力错误检测,应用程序不应在UserRRefs
之后使用rpc.shutdown()
。警告
RRefs 只能由 RPC 模块进行序列化和反序列化。在没有 RPC 的情况下序列化和反序列化 RRefs(例如 Python pickle、torch
save()
/load()
、JITsave()
/load()
等)将导致错误。- 参数:
value(对象)- 要由本 RRef 包装的值。
type_hint(Type,可选)- 应传递给
TorchScript
编译器的 Python 类型,作为value
的类型提示。
- 示例::
以下示例为了简洁省略了 RPC 初始化和关闭代码。有关这些细节,请参阅 RPC 文档。使用 rpc.remote 创建一个 RRef
>>> import torch >>> import torch.distributed.rpc as rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> # get a copy of value from the RRef >>> x = rref.to_here()
从本地对象创建一个 RRef
>>> import torch >>> from torch.distributed.rpc import RRef >>> x = torch.zeros(2, 2) >>> rref = RRef(x)
将 RRef 与其他工作进程共享
>>> # On both worker0 and worker1: >>> def f(rref): >>> return rref.to_here() + 1
>>> # On worker0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch.distributed.rpc import RRef >>> rref = RRef(torch.zeros(2, 2)) >>> # the following RPC shares the rref with worker1, reference >>> # count is automatically updated. >>> rpc.rpc_sync("worker1", f, args=(rref,))
- backward(self:torch._C._distributed_rpc.PyRRef, dist_autograd_ctx_idint=-1, retain_graphbool=False) → None
使用 RRef 作为反向传播的根节点执行反向传播。如果提供了
dist_autograd_ctx_id
,我们将使用提供的 ctx_id 从 RRef 的所有者处执行分布式反向传播。在这种情况下,应使用get_gradients()
来检索梯度。如果dist_autograd_ctx_id
等于None
,则假定这是一个本地 autograd 图,我们只执行本地反向传播。在本地情况下,调用此 API 的节点必须是 RRef 的所有者。RRef 的值预期是一个标量 Tensor。- 参数:
dist_autograd_ctx_id (int, 可选) – 应该检索梯度的分布式 autograd 上下文 id(默认:-1)。
retain_graph (bool, 可选) – 如果
False
,用于计算梯度的图将被释放。请注意,在几乎所有情况下,将此选项设置为True
通常是不必要的,并且通常可以通过更有效的方式解决。通常,您需要将其设置为True
以多次运行反向传播(默认:False)。
- 示例::
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> rref.backward(context_id)
- confirmed_by_owner(self: torch._C._distributed_rpc.PyRRef) bool ¶
返回此
RRef
是否已被所有者确认。OwnerRRef
始终返回 true,而UserRRef
仅在所有者知道此UserRRef
时才返回 true。
- is_owner(self: torch._C._distributed_rpc.PyRRef) bool ¶
返回当前节点是否为此
RRef
的所有者。
- local_value(self: torch._C._distributed_rpc.PyRRef) object ¶
如果当前节点是所有者,则返回本地值的引用。否则,抛出异常。
- owner(self: torch._C._distributed_rpc.PyRRef) torch._C._distributed_rpc.WorkerInfo ¶
返回拥有此
RRef
节点的 worker 信息。
- owner_name(self: torch._C._distributed_rpc.PyRRef) str ¶
返回拥有此
RRef
节点的工人名称。
- remote(self:torch._C._distributed_rpc.PyRRef, timeoutfloat=- 1.0) → 对象
创建一个辅助代理,以便轻松使用 RRef 的所有者作为运行此 RRef 引用的对象上的函数的目标。更具体地说,
rref.remote().func_name(*args, **kwargs)
等同于以下内容:>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 参数:
超时(浮点数,可选)-
rref.remote()
的超时时间。如果在此超时时间内没有成功创建此RRef
,则在下一次尝试使用 RRef(例如to_here
)时,将引发超时异常。如果没有提供,则使用默认的 RPC 超时时间。请参阅rpc.remote()
了解RRef
的具体超时语义。
- 示例::
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.remote().size().to_here() # returns torch.Size([2, 2]) >>> rref.remote().view(1, 4).to_here() # returns tensor([[1., 1., 1., 1.]])
- rpc_async(self: torch._C._distributed_rpc.PyRRef, timeout: float = - 1.0) object ¶
创建一个辅助代理,以便轻松使用 RRef 的所有者作为运行此 RRef 引用的对象上的函数的目标。更具体地说,
rref.rpc_async().func_name(*args, **kwargs)
等同于以下内容:>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 参数:
超时(浮点数,可选)-
rref.rpc_async()
的超时时间。如果调用在此时间范围内未完成,将引发表示此情况的异常。如果未提供此参数,将使用默认的 RPC 超时。
- 示例::
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_async().size().wait() # returns torch.Size([2, 2]) >>> rref.rpc_async().view(1, 4).wait() # returns tensor([[1., 1., 1., 1.]])
- rpc_sync(self:torch._C._distributed_rpc.PyRRef, timeoutfloat=- 1.0) → 对象
创建一个辅助代理,以便轻松启动
rpc_sync
,使用 RRef 的所有者作为运行此 RRef 引用的对象上函数的目标。更具体地说,rref.rpc_sync().func_name(*args, **kwargs)
与以下内容相同:>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 参数:
超时(浮点数,可选)-
rref.rpc_sync()
的超时时间。如果调用在此时间范围内未完成,将引发表示此情况的异常。如果未提供此参数,将使用默认的 RPC 超时。
- 示例::
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_sync().size() # returns torch.Size([2, 2]) >>> rref.rpc_sync().view(1, 4) # returns tensor([[1., 1., 1., 1.]])
- to_here(self: torch._C._distributed_rpc.PyRRef, timeout: float = - 1.0) object ¶
阻塞调用,将 RRef 的值从拥有者复制到本地节点并返回。如果当前节点是拥有者,则返回对本地值的引用。
- 参数:
超时(float,可选)-
to_here
的超时时间。如果在指定时间内调用未完成,将抛出异常。如果未提供此参数,则使用默认 RPC 超时时间(60 秒)。
关于 RRef 的更多信息
远程模块 ¶
警告
当使用 CUDA 张量时,当前不支持远程模块
RemoteModule
是在另一个进程中远程创建 nn.Module 的简单方法。实际的模块位于远程主机上,但本地主机拥有此模块的句柄,可以像常规 nn.Module 一样调用此模块。然而,调用会涉及对远程端的 RPC 调用,如果需要,可以通过 RemoteModule 支持的额外 API 异步执行。
- class torch.distributed.nn.api.remote_module.RemoteModule(*args, **kwargs)[source][source]¶
A RemoteModule 实例只能在 RPC 初始化之后创建。
它在指定的远程节点上创建一个用户指定的模块。它表现得像常规的
nn.Module
,除了forward
方法是在远程节点上执行。它负责自动记录梯度,以确保反向传播将梯度传播回相应的远程模块。它根据
module_cls
的forward
方法的签名生成两个方法forward_async
和forward
。forward_async
异步运行并返回一个 Future。forward_async
和forward
的参数与模块返回的forward
方法的参数相同,该模块由module_cls
返回。例如,如果
module_cls
返回nn.Linear
的实例,该实例具有forward
方法签名:def forward(input: Tensor) -> Tensor:
,则生成的RemoteModule
将包含以下两个方法签名:def forward(input: Tensor) -> Tensor:
def forward_async(input: Tensor) -> Future[Tensor]:
- 参数:
remote_device (str) – 我们希望在目标工作器上放置此模块的设备。格式应为“/”,其中设备字段可以解析为 torch.device 类型。例如,“trainer0/cpu”,“trainer0”,“ps0/cuda:0”。此外,设备字段是可选的,默认值为“cpu”。
module_cls (nn.Module) –
要创建的远程模块的类。例如,
>>> class MyModule(nn.Module): >>> def forward(input): >>> return input + 1 >>> >>> module_cls = MyModule
args(序列,可选)- 要传递给
module_cls
的参数。kwargs(字典,可选)- 要传递给
module_cls
的键值对。
- 返回:
用户提供的
module_cls
创建的Module
的远程模块实例,它有一个阻塞的forward
方法和一个异步的forward_async
方法,该方法返回用户提供的远程模块上forward
调用的 future。
- 示例::
在两个不同的进程中运行以下代码:>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch import nn, Tensor >>> from torch.distributed.nn.api.remote_module import RemoteModule >>> >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> remote_linear_module = RemoteModule( >>> "worker1/cpu", nn.Linear, args=(20, 30), >>> ) >>> input = torch.randn(128, 20) >>> ret_fut = remote_linear_module.forward_async(input) >>> ret = ret_fut.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch >>> import torch.distributed.rpc as rpc >>> >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
此外,本教程中可以找到一个结合了 DistributedDataParallel(DDP)的更实用的示例。
- get_module_rref()[source]
返回一个指向远程模块的
RRef
(RRef[nn.Module]
)。- 返回类型:
RRef[模块]
分布式自动微分框架
警告
使用 CUDA 张量时,目前不支持分布式自动微分。
本模块提供了一个基于 RPC 的分布式自动微分框架,可用于模型并行训练等应用。简而言之,应用可以通过 RPC 发送和接收梯度记录张量。在正向传播过程中,我们记录梯度记录张量通过 RPC 发送的时间,在反向传播过程中,我们使用这些信息通过 RPC 执行分布式反向传播。有关更多详细信息,请参阅分布式自动微分设计。
- torch.distributed.autograd.backward(context_idint, rootsList[Tensor], retain_graph=False) → None
使用提供的 roots 启动基于该 roots 的分布式反向传播。目前实现的是 FAST 模式算法,该算法假设在同一分布式自动微分上下文中发送的所有 RPC 消息在反向传播期间都是自动微分图的一部分。
我们使用提供的 roots 来发现自动微分图并计算适当的依赖关系。此方法会阻塞,直到整个自动微分计算完成。
我们在每个节点上适当的
torch.distributed.autograd.context
中累积梯度。当调用torch.distributed.autograd.backward()
时,将查找要使用的 autograd 上下文对应的context_id
。如果没有与给定 ID 对应的有效 autograd 上下文,我们将抛出错误。您可以使用get_gradients()
API 检索累积的梯度。- 参数:
context_id(整数)- 要检索梯度的 autograd 上下文 ID。
roots(列表)- 表示 autograd 计算根的张量。所有张量都应该是标量。
retain_graph(布尔值,可选)- 如果为 False,用于计算梯度的图将被释放。请注意,在几乎所有情况下,将此选项设置为 True 通常是不必要的,并且通常可以通过更有效的方式解决。通常,您需要将此设置为 True 才能多次运行反向操作。
- 示例::
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> pred = model.forward() >>> loss = loss_func(pred, loss) >>> dist_autograd.backward(context_id, loss)
- class torch.distributed.autograd.context[source][source]¶
上下文对象,用于包装使用分布式自动微分时的前向和反向传播。在
with
语句中生成的context_id
是必需的,用于在所有工作者上唯一标识分布式反向传播。每个工作者存储与该context_id
相关的元数据,这是正确执行分布式自动微分传播所必需的。- 示例::
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum() >>> dist_autograd.backward(context_id, [loss])
- torch.distributed.autograd.get_gradients(context_idint) → Dict[TensorTensor]
获取从 Tensor 到相应梯度的映射,该梯度累积在提供的与给定
context_id
对应的上下文中,作为分布式自动微分反向传播的一部分。- 参数:
context_id(int)- 我们需要检索其梯度的自动微分上下文 ID。
- 返回:
键为张量,值为该张量相关联的梯度的映射。
- 示例::
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = t1 + t2 >>> dist_autograd.backward(context_id, [loss.sum()]) >>> grads = dist_autograd.get_gradients(context_id) >>> print(grads[t1]) >>> print(grads[t2])
关于 RPC Autograd 的更多信息
分布式优化器 ¶
请参阅 torch.distributed.optim 页面以获取分布式优化器的文档。
设计笔记
分布式自动微分设计笔记涵盖了基于 RPC 的分布式自动微分框架的设计,该框架对模型并行训练等应用很有用。
RRef 设计笔记涵盖了 RRef(远程引用)协议的设计,该协议用于框架通过远程工作者引用值。
教程
RPC 教程向用户介绍 RPC 框架,提供使用 torch.distributed.rpc API 的几个示例应用程序,并演示如何使用分析器来分析基于 RPC 的工作负载。
结合 Distributed DataParallel 与 Distributed RPC 框架(也涵盖 RemoteModule)