torch.distributed.algorithms.join 的源代码
# mypy: 允许未类型化定义
导入
警告
from abc 导入 ABC,
抽象方法
from 类型
导入 TracebackType
from 打字
导入
任何,
命名元组,
可选
导入
火炬
导入 torch.distributed
是 dist
全部 = ["JoinHook",
"可加入的",
"加入"]
[文档]class JoinHook:
这是一个连接钩子定义,它为连接上下文管理器提供了两个入口点。
入口点:一个主钩子,在存在非连接进程时被反复调用,以及一个后钩子,在所有进程都连接后调用一次。
入口点:一个主钩子,在存在非连接进程时被反复调用,以及一个后钩子,在所有进程都连接后调用一次。
入口点:一个主钩子,在存在非连接进程时被反复调用,以及一个后钩子,在所有进程都连接后调用一次。
实现一个通用连接上下文管理器的连接钩子,定义一个
类继承自 :class:`JoinHook` 并重写 ``main_hook()`` 方法
根据适当情况调用 `post_hook()`。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
[文档] def main_hook(self) -> None:
r"""在训练迭代中存在未连接的进程以阴影集体通信时调用此钩子。
训练迭代,即在一次正向传递、反向传递和优化器步骤中。
"""
[文档] def post_hook(self, is_last_joiner: bool) -> None:
r"""
在所有进程都加入后调用钩子。
它传递一个额外的 ``bool`` 参数 ``is_last_joiner``,表示该排名是否是最后加入的之一。
参数:
is_last_joiner (bool): ``True`` 如果排名是最后加入的之一;否则 ``False``。
is_last_joiner (bool): ``True`` 如果排名是最后加入的之一;否则 ``False``。
"""
[文档]class Joinable(ABC):
r"""
这定义了一个用于可连接类的抽象基类。
一个可连接类
(继承自 :class:`Joinable`)应实现 :meth:`join_hook`,
返回一个 :class:`JoinHook` 实例,此外
:meth:`join_device` 和 :meth:`join_process_group` 返回设备
处理组信息,分别。
"""
@abstractmethod
def __init__(self) -> None:
super().__init__()
self._join_config = _JoinConfig.construct_disabled_join_config()
[文档] @abstractmethod
def join_hook(self, **kwargs) -> JoinHook:
r"""
为给定的 :class:`Joinable` 返回一个 :class:`JoinHook` 实例
参数:
kwargs (dict): 一个 :class:`dict`,包含任何关键字参数
以在运行时修改连接钩子的行为;所有
具有相同连接上下文的 :class:`Joinable` 实例
管理员被转发相同的 `kwargs` 值。
"""
...
@property @abstractmethod def join_device(self) -> torch.device: r"""从 join 上下文管理器所需的集体通信中返回设备。""" ... @property @abstractmethod def join_process_group(self) -> Any: r"""返回 join 上下文管理器自身所需的集体通信的进程组。""" ...
类 _JoinConfig(
命名元组):
r"""这包括 join 上下文管理器端所需的从 :class:`Joinable` 实例中需要的所有字段。"""
启用:
布尔
throw_on_early_termination: 布尔
是否可加入:
布尔
@staticmethod
def 构建禁用加入配置():
r返回一个指示应禁用加入相关逻辑的 :class:`_JoinConfig` 实例。
例如,如果调用者不在加入上下文管理器中。
"文档"
返回
加入配置(
启用=
假的,
在早期终止时抛出异常=
假的,
是否可加入=
假
)
[文档]
类
加入:
r""
这个类定义了通用的连接上下文管理器,它允许在进程连接后调用自定义钩子。
这些钩子应该作为影子
集体通信以防止未加入进程的挂起
错误处理并确保算法正确性。参考::class:`JoinHook`
关于钩子定义的详细信息。
.. 警告::
上下文管理器要求每个参与的 :class:`Joinable` 在其自己的迭代集体通信之前调用 :meth:`notify_join_context()` 方法,以确保正确性。
调用方法 :meth:`notify_join_context()`,以确保正确性。
调用方法 :meth:`notify_join_context()`,以确保正确性。
.. 警告::
上下文管理器要求所有 `process_group` 属性
这些 :class:`JoinHook` 对象是相同的。如果有多个
:类:`JoinHook` 对象,然后使用第一个的 ``device``。
进程组与设备信息用于检查非
进程合并,并通知进程抛出异常
启用`throw_on_early_termination`,两者都使用全部-
减少。
参数:
参与项(列表[Joinable]):一个参与项列表
`Joinable` 类的 s;它们的钩子按给定顺序迭代
的顺序。
enable(布尔值):一个启用不均匀输入检测的标志;设置为
``False`` 将禁用上下文管理器的功能,并且应该
仅当用户知道输入不会不均匀时才设置
(默认:``True``)。
throw_on_early_termination(布尔值):一个控制是否在检测到不均匀输入时抛出异常的标志(默认:``False``)。
在检测到不均匀输入时抛出异常(默认:``False``)。
示例::
>>> 导入 os
>>> 导入 torch
>>> 导入 torch.distributed 作为 dist
>>> 导入 torch.multiprocessing 模块
>>> # xdoctest: +SKIP
>>> 导入 torch.nn.parallel.DistributedDataParallel 模块
>>> 导入 torch.distributed.optim.ZeroRedundancyOptimizer 模块
>>> 从 torch.distributed.algorithms.join 导入 Join
...
>>> # 在每个创建的进程中
>>> def worker(rank):
>>> dist.init_process_group("nccl", rank=rank, world_size=2)
>>> model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
>>> optim = ZeRO(model.parameters(), torch.optim.Adam, lr=0.01)
>>> # Rank 1 获取比 rank 0 多一个输入
>>> inputs = [torch.tensor([1.]).to(rank) for _ in range(10 + rank)]
>>> with Join([model, optim]):
>>> for 输入 in 输入列表:
>>> 损失 = 模型(输入).求和()
>>> 损失.反向传播()
>>> 优化器.步进()
>>> # 所有等级都到达这里而不会挂起/出错
"文档"
def __init__(
我,
可加入的:
列表[
可加入的
],
启用:
布尔 = True,
在提前终止时抛出:
布尔 =
假的,
**kwargs,
):
if 长度(
可连接项) == 0:
raise ValueError("该连接上下文管理器至少需要一个可连接项")
我._joinables =
可连接项
我.
加入钩子 = [
可加入的.
加入钩(**kwargs)
为
可加入的
在
我.
_可连接项
]
我.
_启用 =
启用
我.
_在提前终止时抛出 =
在提前终止时抛出
我.
_设置可连接配置()
我.
_提取距离信息()
def _设置可连接配置(
我) ->
无:
r设置每个参与 :class:`Joinable` 的 :class:`_JoinConfig`。
断言
长度(
我.
可连接项) > 0
是否为第一个可连接项 =
真实
为
可连接项
在
我.
可连接项列表:
可加入的.
_加入配置 = _JoinConfig(
启用=
我.
_启用,
抛出早期终止=
我.
抛出早期终止 _,
是否是第一个可加入的=
是否是第一个可加入的,
)
是否可加入第一个 =
假
def _提取距离信息(
我) ->
无:
r""
从可加入项中提取进程组和设备信息。
如果有多个可加入项,那么上下文管理器将使用
首次指定的设备。
预设条件:
``self._joinables`` 不为 ``None`` 且不为空。
抛出异常:
ValueError
如果存在多个冲突的 ``process_group`` 属性
在 ``Joinable`` 对象之间。
"文档"
process_group = 无
设备 =
无
为 joinable
在
我.
可连接项:
if 流程组
是
无:
流程组 =
可连接的.
加入进程组
elif 处理组 !=
可加入.
加入处理组:
raise ValueError(
使用多个进程组与 join 上下文管理器
)
如果
设备
是
无:
设备 =
可加入的.
加入设备
我.
_进程组 =
进程组
我.
_排名 =
距离.
获取排名(
我.
处理组)
我._device =
设备
def __进入__(
我): ...
def __退出__(
我,
类型:
可选[
类型[
基础异常]],
值:
可选[
基础异常
],
跟踪回溯:
可选[
跟踪回溯类型
],
):
r""
重复运行主钩子,直到所有进程都加入;然后运行后钩子。
抛出异常:
运行时错误
如果 `throw_on_early_termination=True`。
"文档"
如果
不
我.
启用
或者
类型:
返回
直接传播抛出的异常(如果有的话)
所有进程已连接 =
假
是最后加入的 =
真实
i = 0
警告阈值 = 1000
警告.
简单过滤器(
一次)
当
不
所有进程连接:
如果 i >
警告阈值:
警告.
警告(
"检测到输入倾斜不均大于"
f"{警告阈值}
。这意味着排名"
f"{我._rank}
至少有{
警告阈值} "
f"比其他当前活跃的排名少一些输入。"
"这种偏差可能导致性能问题。"
"训练过程中的退化。"
)
# 遮蔽非连接进程中的 all-reduce
num_nonjoined_procs = 我._get_num_nonjoined_procs()
如果
未加入的进程数 == 0:
所有进程已加入 =
真实
否则:
如果
我.
在提前终止时抛出异常:
我.
通知进程终止()
运行主钩子
for join_hook 在
我.
_加入钩子:
join_hook.主钩子()
是否为最后一个加入者 =
假
i += 1
# 运行后钩子
for 加入钩子
在
我._join_hooks:
join_hook.后钩(is_last_joiner)
def _get_num_nonjoined_procs(我):
r追踪非加入进程中的所有 reduce 操作,以返回非加入进程的数量。
非加入进程数 =
火把.
零值(1,
设备=
我.
设备)
距离.
全量归约(
非加入进程数,
群组=
我.
_进程组)
返回 num_nonjoined_procs.
项目()
def _notify_procs_to_terminate(我):
r计划一次 all-reduce 操作,以通知未加入进程终止。
同时抛出一个``RuntimeError``异常,表示当前进程已耗尽其输入。
"文档"
ones = torch.一(1,
设备=
我.
设备)
距离.
全量归约(
一,
群组=
我.
处理组)
raise 运行时错误(f
排名{
我.
排名}
已耗尽所有输入。)
[文档] @staticmethod
def 通知加入上下文(
可加入的:
可加入的):
r""
通知加入上下文管理器,调用进程尚未加入。
然后,如果 `throw_on_early_termination=True`,则检查是否检测到不均匀的输入
(即如果某个进程已经加入)并抛出异常。
该方法应在从 :class:`Joinable` 对象中调用之前
它的每迭代集体通信。例如,这应该
前向传递开始时被调用
class:`DistributedDataParallel`
只有第一个传入上下文的 :class:`Joinable` 对象
管理器在此方法中执行集体通信,并且
对于其他人来说,这种方法是空洞的。
参数:
可连接的(Joinable):调用此的 :class:`Joinable` 对象
方法。
返回:
一个用于全量减少的异步工作句柄,用于通知上下文
管理器如果“joinable”是“尚未加入”的进程
第一个传递给上下文管理器;否则为“None”。
"文档"
断言
有属性(
可加入的, "_join_config"), (
f检查是否{
类型(
可加入的)}
构造函数调用 "
"``Joinable`` 构造函数"
)
加入配置 =
可加入.
_加入配置_
# 第一个可加入负责集体通讯
if 不
加入配置.
是否可加入第一个
或者
不
加入配置.
启用:
返回
无
设备 =
可加入的.
加入设备
处理组 =
可加入的.
加入进程组
# 安排一个 all-reduce 操作,表示调用者尚未加入
ones = 火把.
一(1,
设备=
设备)
工作 =
距离.
全量归约(
一,
群组=
进程组, async_op=
是)
如果
加入配置.
在早期终止时抛出异常:
检查是否检测到不均匀的输入
零值 =
火炬.
零(1,
设备=
设备)
距离.
全量归约(
零值,
群组=
进程组)
应该抛出异常 =
零值.
项目()
如果
应该抛出异常:
raise 运行时错误(
"检测到至少有一个等级耗尽了输入。"
"跨所有等级抛出异常。"
)
返回
工作