快捷键

torch.distributed.algorithms.join 的源代码

# mypy: 允许未类型化定义
导入 警告
from abc 导入 ABC, 抽象方法
from 类型 导入 TracebackType
from 打字 导入 任何, 命名元组, 可选

导入 火炬
导入 torch.distributed  dist


全部 = ["JoinHook", "可加入的", "加入"]


[文档]class JoinHook: 这是一个连接钩子定义,它为连接上下文管理器提供了两个入口点。 入口点:一个主钩子,在存在非连接进程时被反复调用,以及一个后钩子,在所有进程都连接后调用一次。 入口点:一个主钩子,在存在非连接进程时被反复调用,以及一个后钩子,在所有进程都连接后调用一次。 入口点:一个主钩子,在存在非连接进程时被反复调用,以及一个后钩子,在所有进程都连接后调用一次。 实现一个通用连接上下文管理器的连接钩子,定义一个 类继承自 :class:`JoinHook` 并重写 ``main_hook()`` 方法 根据适当情况调用 `post_hook()`。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ```
[文档] def main_hook(self) -> None: r"""在训练迭代中存在未连接的进程以阴影集体通信时调用此钩子。 训练迭代,即在一次正向传递、反向传递和优化器步骤中。 """
[文档] def post_hook(self, is_last_joiner: bool) -> None: r""" 在所有进程都加入后调用钩子。 它传递一个额外的 ``bool`` 参数 ``is_last_joiner``,表示该排名是否是最后加入的之一。 参数: is_last_joiner (bool): ``True`` 如果排名是最后加入的之一;否则 ``False``。 is_last_joiner (bool): ``True`` 如果排名是最后加入的之一;否则 ``False``。 """
[文档]class Joinable(ABC): r""" 这定义了一个用于可连接类的抽象基类。 一个可连接类 (继承自 :class:`Joinable`)应实现 :meth:`join_hook`, 返回一个 :class:`JoinHook` 实例,此外 :meth:`join_device` 和 :meth:`join_process_group` 返回设备 处理组信息,分别。 """ @abstractmethod def __init__(self) -> None: super().__init__() self._join_config = _JoinConfig.construct_disabled_join_config()
[文档] @abstractmethod def join_hook(self, **kwargs) -> JoinHook: r""" 为给定的 :class:`Joinable` 返回一个 :class:`JoinHook` 实例 参数: kwargs (dict): 一个 :class:`dict`,包含任何关键字参数 以在运行时修改连接钩子的行为;所有 具有相同连接上下文的 :class:`Joinable` 实例 管理员被转发相同的 `kwargs` 值。 """ ...
@property @abstractmethod def join_device(self) -> torch.device: r"""从 join 上下文管理器所需的集体通信中返回设备。""" ... @property @abstractmethod def join_process_group(self) -> Any: r"""返回 join 上下文管理器自身所需的集体通信的进程组。""" ...
_JoinConfig(命名元组): r"""这包括 join 上下文管理器端所需的从 :class:`Joinable` 实例中需要的所有字段。""" 启用: 布尔 throw_on_early_termination: 布尔 是否可加入: 布尔 @staticmethod def 构建禁用加入配置(): r返回一个指示应禁用加入相关逻辑的 :class:`_JoinConfig` 实例。 例如,如果调用者不在加入上下文管理器中。 "文档" 返回 加入配置( 启用=假的, 在早期终止时抛出异常=假的, 是否可加入= )
[文档] 加入: r"" 这个类定义了通用的连接上下文管理器,它允许在进程连接后调用自定义钩子。 这些钩子应该作为影子 集体通信以防止未加入进程的挂起 错误处理并确保算法正确性。参考::class:`JoinHook` 关于钩子定义的详细信息。 .. 警告:: 上下文管理器要求每个参与的 :class:`Joinable` 在其自己的迭代集体通信之前调用 :meth:`notify_join_context()` 方法,以确保正确性。 调用方法 :meth:`notify_join_context()`,以确保正确性。 调用方法 :meth:`notify_join_context()`,以确保正确性。 .. 警告:: 上下文管理器要求所有 `process_group` 属性 这些 :class:`JoinHook` 对象是相同的。如果有多个 :类:`JoinHook` 对象,然后使用第一个的 ``device``。 进程组与设备信息用于检查非 进程合并,并通知进程抛出异常 启用`throw_on_early_termination`,两者都使用全部- 减少。 参数: 参与项(列表[Joinable]):一个参与项列表 `Joinable` 类的 s;它们的钩子按给定顺序迭代 的顺序。 enable(布尔值):一个启用不均匀输入检测的标志;设置为 ``False`` 将禁用上下文管理器的功能,并且应该 仅当用户知道输入不会不均匀时才设置 (默认:``True``)。 throw_on_early_termination(布尔值):一个控制是否在检测到不均匀输入时抛出异常的标志(默认:``False``)。 在检测到不均匀输入时抛出异常(默认:``False``)。 示例:: >>> 导入 os >>> 导入 torch >>> 导入 torch.distributed 作为 dist >>> 导入 torch.multiprocessing 模块 >>> # xdoctest: +SKIP >>> 导入 torch.nn.parallel.DistributedDataParallel 模块 >>> 导入 torch.distributed.optim.ZeroRedundancyOptimizer 模块 >>> 从 torch.distributed.algorithms.join 导入 Join ... >>> # 在每个创建的进程中 >>> def worker(rank): >>> dist.init_process_group("nccl", rank=rank, world_size=2) >>> model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank]) >>> optim = ZeRO(model.parameters(), torch.optim.Adam, lr=0.01) >>> # Rank 1 获取比 rank 0 多一个输入 >>> inputs = [torch.tensor([1.]).to(rank) for _ in range(10 + rank)] >>> with Join([model, optim]): >>> for 输入 in 输入列表: >>> 损失 = 模型(输入).求和() >>> 损失.反向传播() >>> 优化器.步进() >>> # 所有等级都到达这里而不会挂起/出错 "文档" def __init__( , 可加入的: 列表[可加入的], 启用: 布尔 = True, 在提前终止时抛出: 布尔 = 假的, **kwargs, ): if 长度(可连接项) == 0: raise ValueError("该连接上下文管理器至少需要一个可连接项") ._joinables = 可连接项 .加入钩子 = [ 可加入的.加入钩(**kwargs) 可加入的 ._可连接项 ] ._启用 = 启用 ._在提前终止时抛出 = 在提前终止时抛出 ._设置可连接配置() ._提取距离信息() def _设置可连接配置() -> : r设置每个参与 :class:`Joinable` 的 :class:`_JoinConfig`。 断言 长度(.可连接项) > 0 是否为第一个可连接项 = 真实 可连接项 .可连接项列表: 可加入的._加入配置 = _JoinConfig( 启用=._启用, 抛出早期终止=.抛出早期终止 _, 是否是第一个可加入的=是否是第一个可加入的, ) 是否可加入第一个 = def _提取距离信息() -> : r"" 从可加入项中提取进程组和设备信息。 如果有多个可加入项,那么上下文管理器将使用 首次指定的设备。 预设条件: ``self._joinables`` 不为 ``None`` 且不为空。 抛出异常: ValueError 如果存在多个冲突的 ``process_group`` 属性 在 ``Joinable`` 对象之间。 "文档" process_group = 设备 = joinable .可连接项: if 流程组 : 流程组 = 可连接的.加入进程组 elif 处理组 != 可加入.加入处理组: raise ValueError( 使用多个进程组与 join 上下文管理器 ) 如果 设备 : 设备 = 可加入的.加入设备 ._进程组 = 进程组 ._排名 = 距离.获取排名(.处理组) ._device = 设备 def __进入__(): ... def __退出__( , 类型: 可选[类型[基础异常]], : 可选[基础异常], 跟踪回溯: 可选[跟踪回溯类型], ): r"" 重复运行主钩子,直到所有进程都加入;然后运行后钩子。 抛出异常: 运行时错误 如果 `throw_on_early_termination=True`。 "文档" 如果 .启用 或者 类型: 返回 直接传播抛出的异常(如果有的话) 所有进程已连接 = 是最后加入的 = 真实 i = 0 警告阈值 = 1000 警告.简单过滤器(一次) 所有进程连接: 如果 i > 警告阈值: 警告.警告( "检测到输入倾斜不均大于" f"{警告阈值}。这意味着排名" f"{._rank}至少有{警告阈值} " f"比其他当前活跃的排名少一些输入。" "这种偏差可能导致性能问题。" "训练过程中的退化。" ) # 遮蔽非连接进程中的 all-reduce num_nonjoined_procs = ._get_num_nonjoined_procs() 如果 未加入的进程数 == 0: 所有进程已加入 = 真实 否则: 如果 .在提前终止时抛出异常: .通知进程终止() 运行主钩子 for join_hook ._加入钩子: join_hook.主钩子() 是否为最后一个加入者 = i += 1 # 运行后钩子 for 加入钩子 ._join_hooks: join_hook.后钩(is_last_joiner) def _get_num_nonjoined_procs(): r追踪非加入进程中的所有 reduce 操作,以返回非加入进程的数量。 非加入进程数 = 火把.零值(1, 设备=.设备) 距离.全量归约(非加入进程数, 群组=._进程组) 返回 num_nonjoined_procs.项目() def _notify_procs_to_terminate(): r计划一次 all-reduce 操作,以通知未加入进程终止。 同时抛出一个``RuntimeError``异常,表示当前进程已耗尽其输入。 "文档" ones = torch.(1, 设备=.设备) 距离.全量归约(, 群组=.处理组) raise 运行时错误(f排名{.排名}已耗尽所有输入。)
[文档] @staticmethod def 通知加入上下文(可加入的: 可加入的): r"" 通知加入上下文管理器,调用进程尚未加入。 然后,如果 `throw_on_early_termination=True`,则检查是否检测到不均匀的输入 (即如果某个进程已经加入)并抛出异常。 该方法应在从 :class:`Joinable` 对象中调用之前 它的每迭代集体通信。例如,这应该 前向传递开始时被调用 class:`DistributedDataParallel` 只有第一个传入上下文的 :class:`Joinable` 对象 管理器在此方法中执行集体通信,并且 对于其他人来说,这种方法是空洞的。 参数: 可连接的(Joinable):调用此的 :class:`Joinable` 对象 方法。 返回: 一个用于全量减少的异步工作句柄,用于通知上下文 管理器如果“joinable”是“尚未加入”的进程 第一个传递给上下文管理器;否则为“None”。 "文档" 断言 有属性(可加入的, "_join_config"), ( f检查是否{类型(可加入的)}构造函数调用 " "``Joinable`` 构造函数" ) 加入配置 = 可加入._加入配置_ # 第一个可加入负责集体通讯 if 加入配置.是否可加入第一个 或者 加入配置.启用: 返回 设备 = 可加入的.加入设备 处理组 = 可加入的.加入进程组 # 安排一个 all-reduce 操作,表示调用者尚未加入 ones = 火把.(1, 设备=设备) 工作 = 距离.全量归约(, 群组=进程组, async_op=) 如果 加入配置.在早期终止时抛出异常: 检查是否检测到不均匀的输入 零值 = 火炬.(1, 设备=设备) 距离.全量归约(零值, 群组=进程组) 应该抛出异常 = 零值.项目() 如果 应该抛出异常: raise 运行时错误( "检测到至少有一个等级耗尽了输入。" "跨所有等级抛出异常。" ) 返回 工作

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源