#!/usr/bin/python3
# mypy: 允许未类型化定义
导入
集合
导入
输入/输出
导入
系统
导入
类型
来自 collections.abc
导入
迭代器,
映射
来自
打字
导入
任意,
可调用,
可选,
类型变量,
联合
导入
火炬
导入 torch.distributed.rpc
作为 rpc
来自
火炬
导入
设备,
数据类型, nn,
张量
来自 torch.distributed
导入
远程设备
来自 torch.distributed.nn.jit
导入
实例化器
来自 torch.distributed.rpc.internal
导入 _internal_rpc_pickler
来自 torch.nn
导入
模块
来自
torch.nn 参数
导入
参数
来自 torch.utils.hooks
导入
可移除句柄
__all__ = [远程模块]
_grad_t = 联盟[
元组[
张量, ...
]
张量]
# 请参阅 https://mypy.readthedocs.io/en/latest/generics.html#generic-methods-and-generic-self 了解用法
要注解`self`的`T`的数量。许多`Module`的方法返回`self`,我们希望那些返回值是
子类的类型,而不是`Module`的更宽松类型。
T = 类型变量(
T,
绑定=
"模块")
_非脚本远程模块模块 = (
实例化器.
实例化不可脚本化远程模块模板()
)
_远程模块已序列化属性 = (
开启,
"设备",
是否已设置设备映射,
"可脚本化",
"生成方法",
"模块_rref",
)
_序列化远程模块 =
集合.
命名元组(
# 类型:忽略[杂项]
序列化远程模块,
_REMOTE_MODULE_PICKLED_ATTRIBUTES,
)
这些属性主要来自 RemoteModule 的父类,并且有意不进行序列化。
RemoteModule 的新属性应包含在_REMOTE_MODULE_PICKLED_ATTRIBUTES 中。
# 或 _REMOTE_MODULE_ATTRIBUTES_IGNORE_FOR_PICKLING。
# 否则,它将不会被序列化。
_REMOTE_MODULE_ATTRIBUTES_IGNORE_FOR_PICKLING = (
"训练",
_参数,
_缓冲区,
非持久缓冲区设置,
"_backward_hooks",
向后预处理钩子,
"_is_full_backward_hook",
"_forward_hooks",
"_forward_hooks_with_kwargs",
"_forward_hooks_always_called",
"_forward_pre_hooks",
"_forward_pre_hooks_with_kwargs",
"_状态字典挂钩",
"_state_dict_pre_hooks",
"_加载状态字典前挂钩",
"_load_state_dict_post_hooks",
"_state_dict_pre_hooks",
_模块,
# 以下两个属性是生成方法,在序列化时不可用。
forward_async,
前进,
)
RPC 处理器。
def _instantiate_template(模块接口类,
启用将 CPU 张量移动到 CUDA):
实例化器.
实例化可脚本化远程模块模板(
模块接口类,
启用将 CPU 张量移动到 CUDA
)
def 创建模块(
模块类,
参数, kwargs,
设备):
模块 =
模块类(*
参数, **kwargs)
如果 not isinstance(
模块, nn.
模块):
提升 ValueError(
"期望 `module_cls(*args, **kwargs)` 返回一个 类型的实例,"
f"但它返回了一个实例的 "{
类型(
模块)}
。
)
模块.
到(
设备)
返回
模块
def _create_module_with_interface(
module_cls, 参数, kwargs,
设备,
模块接口类
):
模块 =
_创建模块(
模块类,
参数, kwargs,
设备)
如果
模块接口类
是 not
无:
模块 =
火炬.
算子.
脚本(
模块)
返回 rpc.RRef(
模块,
模块接口类)
def _param_rrefs(模块_rref,
递归) ->
列表[rpc.RRef[
参数
]]
返回:
列表[rpc.RRef[
参数]] = [
rpc.RRef(参数)
为
参数
在
模块_rref.
本地值().
参数(
递归)
]
返回
返回
def 不支持则抛出异常(
名称:
字符串) ->
无:
提升 ValueError(f
方法 ``{
名称}
`` 不支持远程模块)
类
远程模块(nn.
模块):
def __new__(类, *
参数, **kwargs):
使用 __new__ 进行日志记录。
火炬._C._log_api_usage_once("torch.distributed.nn.api.remote_module")
返回
超级().__new__(
类)
def 初始化(
自身,
远程设备:
字符串,
模块类:
类型[nn.
模块
]
参数:
可选[
元组] =
无,
kwargs: 可选[
字典[
字符串,
任意]] =
无,
_模块接口类:
任何 =
无,
):
""
远程模块实例只能在 RPC 初始化后创建。
它在指定的远程节点上创建用户指定的模块。
它的行为类似于一个常规的 `nn.Module`,除了 `forward` 方法是在远程节点上执行。
它负责自动记录梯度,以确保反向传播将梯度传播回相应的远程模块。
它负责自动记录梯度,以确保反向传播将梯度传播回相应的远程模块。
它负责自动记录梯度,以确保反向传播将梯度传播回相应的远程模块。
可以使用 `RPC 框架 `__ 在处理器之间共享,
而不会产生任何复制实际模块的开销,
这相当于一个指向远程模块的 :class:`~torch.distributed.rpc.RRef`,
指针。
``forward_async`` 和 ``forward`` 的参数与模块返回的 ``forward`` 方法的参数相同。
除了 ``forward_async`` 和 ``forward``,目前不支持从 nn.Module 中使用其他方法。
尤其是创建混合模型时,通常应该使用本地模块。
特别地,为了创建混合模型,通常应该使用本地模块。
在远程模块外部创建,而不是作为任何远程模块的子模块(通过调用 `add_module`)。
混合示例:
>>> class HybridModel(nn.Module):
>>> def __init__(self) -> None:
>>> nn.Module.__init__(self)
>>> self.remote_embedding = RemoteModule(...)
>>> self.local_linear = nn.Linear(...)
例如,如果 ``module_cls`` 返回 ``nn.Linear`` 的实例,
具有``forward``方法签名的,``def forward(input: Tensor) -> Tensor:``
生成的``RemoteModule``将具有 2 个方法签名的
``def forward(input: Tensor) -> Tensor:``和
``def forward_async(input: Tensor) -> Future[Tensor]:``。
.. 注意::
如果远程模块放置在 CUDA 设备上,
任何输入的 CPU 张量将被自动移动到相同的 CUDA 设备,
并且通过 TensorPipe RPC 后端,GPU 张量将根据远程工作者的设备映射返回。
参数:
remote_device (str): 我们希望放置此模块的目标工作者上的设备。
设备可以是本地设备或由以下之一指定的远程设备
格式:
1. "rank:/"(例如:"rank:0/cuda:0")。
2. "/"(例如:"trainer0/cuda:0")。
此外,设备字段可以是可选的,默认值为 "cpu"。
模块类(nn.Module):例如,
>>> 类 MyModule(nn.Module):
>>> def forward(input):
>>> 返回 input + 1
...
>>> module_cls = MyModule
args (Sequence, optional): 要传递给 ``module_cls`` 的参数。
kwargs (Dict, optional): 要传递给 ``module_cls`` 的关键字参数。
`_module_interface_cls (类型, 可选): 创建模块的 TorchScript 接口类型。类型对象应由@torch.jit.interface 装饰器装饰。`
要创建的模块。类型对象应由@torch.jit.interface 装饰器装饰。
如果未提供,生成的 RemoteModule 不支持 TorchScript。
警告:这是一个实验性 API,可能会频繁更改。
返回:
一个远程模块实例,它封装了由 :class:`~nn.Module` 创建的
用户提供的 `module_cls`,它有一个阻塞的 `forward` 方法
异步的 `forward_async` 方法返回 `forward` 调用的未来
在用户提供的远程模块上。
示例::
在两个不同的进程中运行以下代码:
>>> # xdoctest: +SKIP("distributed")
>>> # 在工作节点 0 上:
>>> 导入 torch
>>> 导入 torch.distributed.rpc 作为 rpc
>>> 从 torch 导入 nn 和 Tensor
>>> 从 torch.distributed.nn.api.remote_module 导入 RemoteModule
...
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> remote_linear_module = RemoteModule(
>>> "worker1/cpu", nn.Linear, args=(20, 30),
>>> )
>>> input = torch.randn(128, 20)
>>> ret_fut = remote_linear_module.forward_async(input)
>>> ret = ret_fut.wait()
>>> rpc.shutdown()
>>> # 在 worker 1 上:
>>> 导入 torch
>>> 导入 torch.distributed.rpc 作为 rpc
...
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
超级().
初始化()
enable_moving_cpu_tensors_to_cuda = 自身.
_准备初始化(
远程设备)
# 默认参数准备。
args = args 如果 args
是 not None
否则 ()
kwargs = kwargs 如果 kwargs
是 not None
否则 {}
如果
_模块接口类
是 not
无:
# 用户通过此字段了解生成的 RemoteModule 是否支持 TorchScript。
自身.is_scriptable =
真实
# 在远程端实例化模板。
fut = rpc.rpc 异步(
自身.
开启,
实例化模板,
(模块接口类,
启用将 CPU 张量移动到 CUDA),
)
自身._init_template(
_module_interface_cls, 启用将 CPU 张量移动到 CUDA
)
在远程端实例化模板。
fut = rpc.rpc 异步(
自身.
开启,
实例化模板,
(模块接口类,
启用将 CPU 张量移动到 CUDA),
)
在远程端创建模块。
fut.等待()
确保远程端有 remote_module_cls。
# TODO: 我们需要将其更改为 rpc.remote,并使其异步(参见下面的 else 分支)。
为了做到这一点,我们需要能够将 _module_interface_cls 应用到 rpc.remote 返回的 RRef 上。
请参阅 https://github.com/pytorch/pytorch/issues/58098 以获取更多上下文信息。
自身.module_rref = rpc.
rpc 同步(
自身.
开启,
_create_module_with_interface,
(模块类,
参数, kwargs,
自身.
设备,
_模块接口类),
)
否则:
自身.
可脚本化 =
假
自身.
生成方法 = (
_不可脚本化远程模块模块.
_生成方法
)
# 在远程端创建模块。
自身.module_rref = rpc.
远程(
自身.
开启,
创建模块,
(模块类,
参数, kwargs,
自身.
设备),
)
自身.
安装生成的方法()
自身.
检查属性的可序列化性()
def 远程参数(
自身,
递归:
布尔类型 = True) ->
列表[rpc.RRef[
参数
]]
""
返回指向远程模块参数的 :class:`~torch.distributed.rpc.RRef` 列表。
这通常可以与 ...
使用 :class:`~torch.distributed.optim.DistributedOptimizer`.
参数:
递归(布尔值):如果为 True,则返回远程参数
模块及其所有子模块。否则,
仅返回直接成员参数
远程模块。
返回:
class:`~torch.distributed.rpc.RRef`(`List[RRef[nn.Parameter]]`)的列表。
用于远程模块的参数。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
返回 rpc.
rpc 同步(
自身.
在, _param_rrefs,
参数=(
自身.module_rref,
递归))
def get_module_rref(自身) -> rpc.RRef[nn.
模块
]:
"""返回一个指向远程模块的 :class:`~torch.distributed.rpc.RRef` (``RRef[nn.Module]``) 对象。"""
返回
自身.
模块_rref
@torch.算子.
导出
def __getstate__(自身):
提升
运行时错误(
"无法在 Python pickler 中序列化 RemoteModule。RemoteModule 只能在使用 RPC 时进行序列化"
)
@torch.算子.
导出
def __setstate__(自身,
状态):
提升
运行时错误(
"无法在 Python pickler 中反序列化 RemoteModule。RemoteModule 只能在使用 RPC 时反序列化"
)
def 注册缓冲区(
自身,
名称:
字符串,
张量:
可选[
张量
]
持久性:
布尔类型 =
真实
) -> 无:
_不支持抛出异常(
自身.
注册缓冲区.__name__)
def 注册参数(
自身,
名称:
字符串,
参数:
可选[
参数]) ->
无:
_不支持抛出异常(
自身.
注册参数.__name__)
def 添加模块(
自身,
名称:
字符串,
模块:
可选[
模块]) ->
无:
_不支持抛出异常(
自身.
添加模块.__name__)
def 应用(
自身: T,
函数:
可调用[[
模块
]
无]) -> T: # type: ignore[return]
不支持时抛出异常(
自身.
应用.__name__)
def cuda(自身: T,
设备:
可选[
联盟[int,
设备]] =
无) -> T: # type: ignore[return]
不支持时抛出异常(
自身.cuda.__name__)
def ipu(自身: T,
设备:
可选[
联盟[int,
设备]] =
无) -> T: # type: ignore[return]
不支持的操作(
自身.ipu.__name__)
def xpu(自身: T,
设备:
可选[
联盟[int,
设备]] =
无) -> T: # type: ignore[return]
不支持时抛出异常(
自身.xpu.__name__)
def cpu(自身: T) -> T: # type: ignore[return]
不支持时抛出异常(
自身.cpu.__name__)
def 类型(
自身: T,
目标类型:
联盟[
数据类型,
字符串]) -> T: # type: ignore[return]
不支持时抛出异常(
自身.
类型.__name__)
def float(自身: T) -> T: # type: ignore[return]
不支持时抛出异常(
自身.float.__name__)
def double(自身: T) -> T: # type: ignore[return]
不支持时抛出异常(
自身.double.__name__)
def 半(
自身: T) -> T: # type: ignore[return]
不支持则抛出异常(
自身.
半.__name__)
def bfloat16(自身: T) -> T: # type: ignore[return]
不支持则抛出异常(
自身.bfloat16.__name__)
def 到(
自身, *
参数, **kwargs) -> T:
# 类型忽略[misc, return, type-var]
不支持则抛出异常(
自身.
到.__name__)
def 注册反向钩子( # type: ignore[return]
自身, hook:
可调用[[
模块, _grad_t, _grad_t
]
联盟[
无, _grad_t]]
) -> 可移除句柄:
_引发不支持(
自身.
注册反向钩子.__name__)
def 注册前向钩子( # type: ignore[return]
自身,
hook: 联盟[
可调用[[T,
元组[
任意, ...]],
可选[
任意]],
可调用[
[T, 元组[
任意, ...
]
字典[
字符串,
任意]],
可选[
元组[
任意,
字典[
字符串,
任意
]],
]
]
预先添加:
布尔类型 =
错误,
with_kwargs: 布尔类型 =
错误,
) -> 可移除句柄:
不支持时抛出异常(
自身.
注册前向钩子.__name__)
def 注册前向钩子(
# 类型:忽略[返回,覆盖]
自身,
hook: 联盟[
可调用[[T,
元组[
任意, ...
]
任意
]
可选[
任意]],
可调用[[T,
元组[
任意, ...
]
字典[
字符串,
任意
]
任意
]
可选[
任意]],
]
预先添加:
布尔类型 =
错误,
with_kwargs: 布尔类型 =
错误,
) -> 可移除句柄:
不支持的操作(
自身.
注册前向钩子.__name__)
def state_dict(自身, *
参数, **kwargs):
不支持此功能(
自身.state_dict.__name__)
def 加载状态字典(
自身,
state_dict: 映射[
字符串,
任意
]
严格的:
布尔类型 = True,
分配:
布尔类型 =
错误,
):
不支持的操作(
自身.
加载状态字典.__name__)
def 参数(
自身,
递归:
布尔类型 = True) ->
迭代器[
参数
]:
提升 ValueError(
"RemoteModule 不支持``parameters``方法。请使用``remote_parameters``代替。"
)
def 命名参数。( # type: ignore[return]
自身,
前缀:
字符串 = "",
递归:
布尔类型 = True,
删除重复项:
布尔类型 =
真实
) -> 迭代器[
元组[
字符串,
参数
]]
不支持的操作(
自身.
命名参数。.__name__)
def 缓冲区(
自身,
递归:
布尔类型 = True) ->
迭代器[
张量
]: # type: ignore[return]
不支持的操作(
自身.
缓冲区.__name__)
def 命名缓冲区( # type: ignore[return]
自身,
前缀:
字符串 = "",
递归:
布尔类型 = True,
删除重复项:
布尔类型 =
真实
) -> 迭代器[
元组[
字符串,
张量
]]
不支持的操作(
自身.
命名缓冲区.__name__)
def 儿童(
自身) ->
迭代器[
模块
]: # type: ignore[return]
不支持的操作(
自身.
儿童.__name__)
def 命名子项(
自身) ->
迭代器[
元组[
字符串,
模块
]] # type: ignore[return]
不支持时抛出异常(
自身.
命名子项.__name__)
def 模块(
自身) ->
迭代器[
模块
]: # type: ignore[return]
不支持的操作(
自身.
模块.__name__)
def 命名模块(
自身,
描述:
可选[
集合[
模块]] =
无,
前缀:
字符串 = "",
删除重复项:
布尔类型 = True,
):
不支持时抛出异常(
自身.
命名模块.__name__)
def 训练(
自身: T,
模式:
布尔类型 = True) -> T:
返回
自身.
模块_rref.
rpc 同步().
训练()
# 类型:忽略[运算符,联合属性]
def 评估(
自身: T) -> T:
返回
自身.
模块_rref.
rpc 同步().
评估()
# 类型:忽略[运算符,联合属性]
def 需要梯度_(
自身: T,
需要梯度:
布尔类型 = True) -> T: # type: ignore[return]
不支持的操作(
自身.
需要梯度_.__name__)
def 零梯度(
自身,
设置为 None:
布尔类型 = True) ->
无:
不支持的操作(
自身.
零梯度.__name__)
def 共享内存(
自身: T) -> T: # type: ignore[return]
不支持的操作(
自身.
共享内存.__name__)
def 额外表示(
自身) ->
字符串: # type: ignore[return]
不支持的操作(
自身.
额外表示.__name__)
def _准备初始化(
自身,
远程设备字符串:
字符串) ->
布尔:
"""准备初始化并返回是否自动将 CPU 张量移动到 CUDA 设备。"""
# 疑难杂症检查。
断言 rpc.
当前 RPC 代理是否已设置(),
RPC 模块仅适用于远程调用。
远程设备 =
_远程设备(
远程设备字符串)
自身.
开启 = (
远程设备.
工人名称()
如果
远程设备.
工人名称()
是 not None
否则
远程设备.
排名()
)
自身.
设备 =
字符串(
远程设备.
设备())
代理 = rpc.
获取当前 RPC 代理()
# 如果设置了远程工作者的设备映射,
# 则启用将任何输入 CPU 张量移动到相同的 CUDA 设备。
自身.is_device_map_set =
布尔(
代理._get_device_map(
代理.
获取工人信息(
自身.
开)) # type: ignore[arg-type]
)
``enable_moving_cpu_tensors_to_cuda`` 的限制比 ``is_device_map_set`` 更宽松:
如果 ``enable_moving_cpu_tensors_to_cuda`` 为真,但设备映射未设置,
则任何 CPU 张量仍然可以被移动到 CUDA 设备上运行前向操作,
但输出必须在通过网络发送之前被移动回 CPU。
启用将 CPU 张量移动到 CUDA =
火炬.
设备(
自身.
设备).
类型 == "cuda"
返回
启用将 CPU 张量移动到 CUDA
def _初始化模板(
自身,
模块接口类,
启用将 CPU 张量移动到 CUDA):
在本地端实例化模板。
生成模块 =
实例化器.
实例化可脚本化远程模块模板(
模块接口类,
启用将 CPU 张量移动到 CUDA
)
自身.
生成的方法 =
生成模块.
_生成方法
def _检查属性可序列化性(
自身):
检查所有属性是否已明确定义是否可序列化(即,可序列化性)。
为 k
在
自身.
字典.
键():
如果 (
k not 在
远程模块已选择属性
和 k not
在
远程模块忽略序列化属性
):
抛出异常
属性错误(
f属性{k}
必须在 ``_REMOTE_MODULE_PICKLED_ATTRIBUTES`` 或 "
"``_REMOTE_MODULE_ATTRIBUTES_IGNORE_FOR_PICKLING``."
)
def _install_generated_methods(自身):
为
方法
在
自身.
生成方法:
方法名 =
方法.__name__
方法 =
火炬.
算子.
导出(
方法)
setattr(自身,
方法名称,
类型.
方法类型(
方法,
自身))
@staticmethod
def 从模块初始化_rref(
远程设备:
字符串,
模块_rref: rpc.RRef[nn.
模块
]
_模块接口类:
任何 =
无,
):
""
除了构造函数之外,还可以使用模块 RRef 来初始化 RemoteModule 实例。
这种备选初始化方法在我们要创建多个时特别有用
远程模块实例共享相同的底层模块并减少内存消耗。
此外,这也提供了一个通过 RPC 传递脚本 RemoteModule 的解决方案
不支持。推荐的做法如下:
1. 发送方创建一个 RemoteModule;
2. 发送方通过 RPC 发送其`module_rref`;
3. 接收方调用此方法使用相同的`module_rref`初始化另一个 RemoteModule。
示例::
在两个不同的进程中运行以下代码:
>>> # xdoctest: +SKIP("distributed")
>>> # 在工作节点 0 上:
>>> 导入 torch
>>> 导入 torch.distributed.rpc 作为 rpc
>>> 从 torch 导入 nn 和 Tensor
>>> 从 torch.distributed.nn.api.remote_module 导入 RemoteModule
...
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> remote_module = RemoteModule(
>>> "worker1/cpu", nn.Linear, 参数=(20, 30),
>>> )
...
>>> remote_module1 = rpc.rpc_sync(
>>> "worker1/cpu",
>>> RemoteModule.init_from_module_rref,
>>> ("worker1/cpu", remote_module1.get_module_rref()),
>>> )
>>> rpc.shutdown()
>>> # 在 worker 1 上:
>>> 导入 torch
>>> 导入 torch.distributed.rpc 作为 rpc
...
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
参数:
remote_device (str): 目标工作器上的设备,我们希望在此放置此模块。
设备可以是本地设备或由以下之一指定的远程设备
格式:
1. "rank:/<设备>"(例如:"rank:0/cuda:0")。
2. "<工作名称>/<设备>"(例如:"trainer0/cuda:0")。
此外,设备字段是可选的,默认值为 "cpu"。
module_rref (RRef[nn.Module]): 调用者和创建的远程模块共享的模块引用
创建的远程模块。
_module_interface_cls (type, optional): 要创建的模块的 TorchScript 接口类型。类型对象应由@torch.jit.interface 装饰。
类型对象应由@torch.jit.interface 装饰。
如果未提供,生成的 RemoteModule 无法进行 torchscript 化。
警告,这是一个实验性 API,可能频繁变更。
返回:
一个远程模块实例,它封装了用户提供的:class:`~nn.Module`。
它具有阻塞的`forward`方法和一个
异步的 `forward_async` 方法返回 `forward` 调用的未来
在远程端提供的用户模块上。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
# 注意:如果此类中添加了新属性,也需要将其添加到 `_REMOTE_MODULE_PICKLED_ATTRIBUTES` 中,以便进行序列化和反序列化。
# 到 ``_REMOTE_MODULE_PICKLED_ATTRIBUTES`` 以进行序列化和反序列化。
远程模块 =
对象.__new__(RemoteModule)
启用将 CPU 张量移动到 CUDA =
远程模块.
准备初始化(
远程设备)
如果
_模块接口类
是 not
无:
# 用户通过此字段了解生成的 RemoteModule 是否支持 TorchScript。
远程模块.
可脚本化 =
真实
远程模块.
_初始化模板(
_模块接口类,
启用将 CPU 张量移动到 CUDA
)
否则:
远程模块.
可脚本化 =
假
远程模块.
生成方法 = (
_不可脚本化远程模块模块.
_生成方法
)
远程模块.
模块_rref =
模块_rref
远程模块.
安装生成方法()
远程模块.
_检查属性可序列化性()
返回
远程模块
[文档]
类 RemoteModule(
_远程模块):
""
一个远程模块实例只能在 RPC 初始化之后创建。
它在指定的远程节点上创建一个用户指定的模块。
它的行为类似于常规的 `nn.Module`,除了 `forward` 方法是
在远程节点上执行。
它负责自动微分记录,以确保反向传播
梯度返回相应的远程模块。
它基于此生成了两个方法 `forward_async` 和 `forward`
`forward` 方法的 `module_cls` 签名。`forward_async`
异步运行并返回一个 Future。`forward_async` 和 `forward` 的参数与模块 `module_cls` 返回的 `forward` 方法的参数相同
和 `forward` 相同
由 `module_cls` 返回的模块 `forward` 方法
例如,如果 `module_cls` 返回 `nn.Linear` 的实例,
那么它将具有 `forward` 方法签名:`def forward(input: Tensor) -> Tensor:`,
生成的 `RemoteModule` 将具有以下两个方法签名:
|
def forward(input: Tensor) -> Tensor:
``def forward_async(input: Tensor) -> Future[Tensor]:``
参数:
remote_device (str): 目标工作者上的设备,我们希望放置此模块。
格式应为 "/",其中设备字段可以解析为 torch.device 类型。
例如,"trainer0/cpu","trainer0","ps0/cuda:0"。
此外,设备字段是可选的,默认值为 "cpu"。
module_cls (nn.Module):要创建的远程模块的类。例如,
>>> class MyModule(nn.Module):
>>> def forward(input):
>>> 返回 input + 1
...
>>> module_cls = MyModule
args (Sequence, optional): 要传递给 ``module_cls`` 的参数。
kwargs (Dict, optional): 要传递给 ``module_cls`` 的关键字参数。
返回:
一个远程模块实例,它封装了由 :class:`~nn.Module` 创建的
用户提供的 `module_cls`,它有一个阻塞的 `forward` 方法
异步的 `forward_async` 方法返回 `forward` 调用的未来
在用户提供的远程模块上。
示例::
在两个不同的进程中运行以下代码:
>>> # xdoctest: +SKIP("distributed")
>>> # 在工作节点 0 上:
>>> 导入 torch
>>> 导入 torch.distributed.rpc 作为 rpc
>>> 从 torch 导入 nn 和 Tensor
>>> 从 torch.distributed.nn.api.remote_module 导入 RemoteModule
...
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> remote_linear_module = RemoteModule(
>>> "worker1/cpu", nn.Linear, args=(20, 30),
>>> )
>>> input = torch.randn(128, 20)
>>> ret_fut = remote_linear_module.forward_async(input)
>>> ret = ret_fut.wait()
>>> rpc.shutdown()
>>> # 在 worker 1 上:
>>> 导入 torch
>>> 导入 torch.distributed.rpc 作为 rpc
...
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
此外,一个更实用的例子,它结合了
`DistributedDataParallel <https://maskerprc.github.io/docs/stable/nn.html#torch.nn.parallel.DistributedDataParallel>`__ (DDP)
可以在这个 `教程 `__ 中找到。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 初始化(
自身,
远程设备:
字符串,
模块类:
类型[nn.
模块
]
参数:
可选[
元组] =
无,
kwargs: 可选[
字典[
字符串,
任意]] =
无,
):
超级().
初始化(
远程设备,
模块类,
参数, kwargs)
def 远程模块接收器(
*远程模块序列化属性,
):
反序列化 RemoteModule。
序列化远程模块 =
序列化远程模块.
_创建(
远程模块序列化属性
)
m = 对象.__new__(RemoteModule)
m.字典.
更新(
序列化远程模块._asdict())
# 在反序列化属性 `module_rref` 时必须调用 RRef 的 `_deserialize()` 方法。
m.module_rref = rpc.PyRRef.反序列化(m.
模块_rref)
# 在反序列化时安装生成的函数。
为
方法
在 m.generated_methods:
方法名 =
方法.__name__
方法 =
火炬.
算子.
导出(
方法)
setattr(m, 方法名称,
类型.
方法类型(
方法, m))
返回 m
def _远程模块还原器(
远程模块):
序列化一个远程模块。
序列化属性 = {}
为 k, v
在
远程模块.
字典.
项目():
将属性 `module_rref` 序列化必须调用 RRef 的 `_serialize()` 方法。
如果 k ==
module_rref:
pickled_attrs[k] = v.序列化()
elif k 在 _REMOTE_MODULE_PICKLED_ATTRIBUTES:
pickled_attrs[k] = v
# 检查反序列化后的属性是否都在_REMOTE_MODULE_ATTRIBUTES_IGNORE_FOR_PICKLING 中。
elif k not 在 _REMOTE_MODULE_ATTRIBUTES_IGNORE_FOR_PICKLING:
打印(
f"新的属性 ``{k}
`` 在 RPC 序列化过程中被忽略。"
"要序列化此属性,请将其添加到 ``_REMOTE_MODULE_PICKLED_ATTRIBUTES``。"
"否则,请明确将其添加到 ``_REMOTE_MODULE_ATTRIBUTES_IGNORE_FOR_PICKLING`` 中。",
文件=
系统.
标准错误输出,
)
返回 (
_远程模块接收器,
元组(pickled_attrs.
值()),
)
def _递归脚本模块接收器(
递归脚本模块序列化,
):
反序列化一个不包含脚本远程模块的递归脚本模块。
f = 输入/输出.BytesIO(
递归脚本模块序列化)
m = 火炬.
算子.
加载(f)
返回 m
def _递归脚本模块减少器(
递归脚本模块):
序列化一个不包含脚本远程模块的 RecursiveScriptModule,否则引发错误。
如果
有属性(
递归脚本模块._c, "module_rref"):
提升
运行时错误(
"不支持通过 RPC 传递脚本 RemoteModule。请创建一个发送方的 RemoteModule,"
"将`module_rref`发送到接收方,并在接收方通过此`module_rref`创建一个新的实例。"
)
f = 输入/输出.BytesIO()
火炬.
算子.
保存(
递归脚本模块, f)
返回 (
_递归脚本模块接收器, (f.
获取值(),))
内部 RPC 序列化器.
_注册 reducer(
远程模块,
_远程模块 reducer)
内部 RPC 序列化器.
注册 reducer(
火炬.
算子.
递归脚本模块,
递归脚本模块 reducer
)