• 文档 >
  • 模块代码 >
  • torch >
  • torch.nn.parallel.distributed
快捷键

torch.nn.parallel.distributed 的源代码

# mypy: 允许未类型化定义
导入 复制
导入 functools
导入 检查
导入 itertools
导入 记录日志
导入 操作系统
导入 系统
导入 警告
导入 弱引用
来自 集合 导入 defaultdict, 双端队列
来自 contextlib 导入 contextmanager
来自 dataclasses 导入 数据类, 字段, 是数据类
来自 枚举 导入 自动, 枚举
来自 打字 导入 任意, 可调用, 可选, 类型检查

导入 火炬
导入 torch.distributed 作为 dist
来自 torch._utils 导入 _get_device_index
来自 torch 自动微分 导入 函数, 变量
来自 torch.distributed.algorithms.join 导入 加入, 可加入的, JoinHook
来自 torch.nn.modules 导入 模块
来自 torch.nn.parallel.scatter_gather 导入 收集, scatter_kwargs
来自 torch.utils._pytree 导入 树扁平化, tree_unflatten


RPC 可用 = 
如果 距离.是否可用():
    来自 torch.distributed.distributed_c10d 导入 (
        获取默认组,
        _rank_not_in_group,
        ReduceOp,
    )
    来自 torch.distributed.utils 导入 (
        _alloc_storage,
        _cast_forward_inputs,
        _free_storage,
        _sync_module_states,
        _to_kwargs,
        在各个流程中验证参数形状,
    )
如果 距离.rpc.是否可用():
    RPC 可用 = 真实
    来自 torch.distributed.rpc 导入 RRef

如果 类型检查:
    来自 torch.utils.hooks 导入 可移除句柄


__all__ = [分布式数据并行]

日志记录器 = 记录日志.获取日志记录器(__name__)


@dataclass
 混合精度:
    ""
配置 DDP 本地混合精度训练。

属性:
param_dtype (torch.dtype):这指定了模型的 dtype
参数,输入(当“cast_forward_inputs”被设置时)
``True``),因此用于计算的 dtype。
然而,在正向和反向传递之外,参数处于
全精度。模型检查点总是在全精度下发生
精度。
reduce_dtype (torch.dtype):这指定了梯度的数据类型
减少值,允许与 ``param_dtype`` 不同。
buffer_dtype (torch.dtype):指定缓冲区的数据类型。

.. 注意:: 此 API 为实验性,可能随时更改。

.. 注意:: 只有浮点张量会被转换为指定的数据类型。

.. note:: 将参数和缓冲区完整地保存到 `state_dict` 中
精度。

.. 注意:: 每个低精度数据类型必须明确指定。
示例,`_MixedPrecision(reduce_dtype=torch.float16)` 仅指定
降低数据类型的精度,DDP 不会进行类型转换
参数或缓冲区。

.. note:: 如果未指定 `reduce_dtype`,则梯度缩减
发生 `param_dtype` 中,如果指定或使用原始参数数据类型
否则。例如,``_MixedPrecision(param_dtype=torch.float16)``
将导致通信以 fp16 进行。
"源代码"

    param_dtype: 可选[火炬.数据类型] = 
    reduce_dtype: 可选[火炬.数据类型] = 
    缓冲区数据类型: 可选[火炬.数据类型] = 
    # TODO (rohan-varma): keep_low_precision_grads: bool = False
    # TODO (rohan-varma): 提供 API 以允许用户运行批归一化和层归一化
    # 在全精度下。对于 DDP,可以通过不执行以下操作来实现。
    # 参数类型转换用于 BN 和 LN 单元。


def _cast_buffers(混合精度配置, 根模块):
    """将缓冲区转换为指定的 ``buffer_dtype``。"""
     缓冲区  根模块.缓冲区():
        如果 有属性(缓冲区, _ddp_ignored)  缓冲区._ddp_ignored:
            继续

        缓冲区.数据 = 缓冲区.(数据类型=混合精度配置.缓冲区数据类型)


def _setup_mixed_precision_params(混合精度配置, 根模块):
    创建并释放混合精度参数的存储空间。
     参数  根模块.参数():
        不要为 DDP 忽略的参数设置混合精度。
        如果 有属性(参数, _ddp_忽略)  参数._ddp_忽略:
            继续

        如果 not 有属性(参数, _mp 参数):
            参数._mp 参数 = 火炬.与...相同形状的零(
                参数,
                设备=参数.设备,
                数据类型=混合精度配置.参数数据类型,
                需要梯度=参数.需要梯度,
            )
            _空闲存储(参数._mp 参数)
            # _fp_param 将指向全精度参数,以便可以切换
            返回到前后翻页的末尾。
            参数._fp_param = 参数.数据


def _tree_flatten_with_rref(输出):
    输出是 rref = RPC 可用  isinstance(输出, RRef)
    如果 输出是 rref:
        输出张量列表, treespec = 树扁平化(输出.本地值())
    否则:
        输出张量列表, 树木规范 = 树扁平化(输出)
    需要返回展平的张量,指定如何重新打包它们
    #就像返回类型实际上是 RRef 的引用来重建一样。
    返回 输出张量列表, 树木规范, 输出是 rref


def 使用 rref 展开树(输出, 树规范, 输出是 rref):
    输出 = 树形展开(输出, 树规范)
    如果 输出为 rref:
        输出 = RRef(输出)
    返回 输出


def 查找张量(对象):
    r递归查找指定对象中包含的所有张量。
    如果 RPC 可用  isinstance(对象, RRef):
        如果当前节点是 RRef 的所有者,则展开它并尝试
        查找张量。
        TODO:扩展到远程 RRef。
        如果 对象.is_owner():
            返回 查找张量(对象.本地值())
    如果 isinstance(对象, 火炬.张量):
        返回 [对象]
    如果 isinstance(对象, (列表, 元组)):
        返回 itertools.chain.from_iterable(地图(查找张量, 对象))
    如果 isinstance(对象, 字典):
        返回 itertools.chain.from_iterable(地图(查找张量, 对象.()))
    如果 是数据类(对象):
        返回 itertools.chain.from_iterable(
            地图(查找张量, (getattr(对象, f.名称)  f  字段(对象)))
        )

    返回 []


def 导出 DDP 相关环境变量():
    相关环境变量 = [
        RANK,
        本地排名,
        WORLD_SIZE,
        "主端口",
        "主地址",
        CUDA_VISIBLE_DEVICES,
        "Gloo 套接字接口名称",
        "Gloo 设备传输",
        "NCCL SOCKET IFNAME",
        "TORCH NCCL 阻塞等待",
        "NCCL 调试",
        "NCCL 调试子系统",
        "NCCL_IB_DISABLE",
        # 更多 NCCL 环境变量:
        "NCCL_P2P_DISABLE",
        "NCCL_P2P_LEVEL",
        "NCCL 禁用共享内存",
        "NCCL 套接字线程数",
        "NCCL 每个线程的套接字数",
        "NCCL 缓冲区大小",
        "NCCL_NTHREADS",
        "NCCL_RINGS",
        "NCCL_MAX_NCHANNELS",
        "NCCL_MIN_NCHANNELS",
        "NCCL_CHECKS_DISABLE",
        "NCCL_CHECK_POINTERS",
        "NCCL_LAUNCH_MODE",
        "NCCL_IB_HCA",
        "NCCL_IB_TIMEOUT",
        "NCCL_IB_RETRY_CNT",
        "NCCL_IB_GID_INDEX",
        "NCCL_IB_SL",
        "NCCL_IB_TC",
        "NCCL_IB_AR_THRESHOLD",
        "NCCL_IB_CUDA 支持",
        "NCCL_NET_GDR 级别",
        "NCCL_NET_GDR 读取",
        NCCL_SINGLE_RING_THRESHOLD,
        "NCCL_LL_THRESHOLD",
        "NCCL_TREE_THRESHOLD",
        "NCCL_ALGO",
        "NCCL_PROTO",
        "NCCL 忽略 CPU 亲和性",
        "NCCL 调试文件",
        "NCCL 启用网络通信",
        "NCCL 拓扑文件",
        NCCL 拓扑转储文件,
        PyTorch NCCL 异步错误处理,
    ]
    格式化输出 = 请提供需要翻译的文本
     变量  相关环境变量:
        value = 操作系统.环境[变量] 如果 变量  操作系统.环境 否则 无效
        格式化输出 += fenv:{变量}={}输入文本翻译为简体中文为:\n"
    打印(格式化输出)


 _缓冲通信钩子位置(枚举):
    预转发 = 自动()
    后转发 = 自动()


@dataclass
 缓冲区通信钩子:
    缓冲区通信钩子: 可调用
    缓冲区通信钩子状态: 任何
    缓冲区通信钩子位置: _缓冲通信钩子位置


# 添加 DDPSink 以在向后启动时运行各种功能,例如
# 队列最外层向后/图任务的回调调用,
# 这有助于在所有梯度计算完成后触发回调
# 已完成。
 _DDPSink(函数):
    @staticmethod
    def 前向(ctx, ddp_weakref, *输入):
        # 设置 materialize_grads(False) 将确保 None 梯度保持为
        无效且不为零填充。
        ctx.set_materialize_grads(错误)
        ctx.ddp_weakref = ddp_weakref
        返回 = 输入
        如果 ddp_weakref()._ddp_sink_clone:
            返回 = 元组(
                输入.克隆() 如果 isinstance(输入, 火炬.张量) 否则 输入  输入  输入
            )
        返回 返回

    @staticmethod
    def 反向(ctx, *梯度输出):
        # 将静态图训练中的所有 reduce 操作延迟入队
        迭代
        ddp_weakref = ctx.ddp 弱引用()
        减法器 = ddp 弱引用.减法器
        静态图 = ddp_weakref.静态图
        延迟入队 = (
            静态图  ddp_weakref._静态图延迟_allreduce_enqueued
        )
        如果 静态图  not 延迟入队:
            变量._execution_engine.队列回调(  # type: ignore[call-arg,misc]
                简化器._延迟_all_reduce
            )
            ddp_weakref._static_graph_delay_allreduce_enqueued = 真实

        返回 (, *梯度输出)


 _DDPJoinHook(JoinHook):
    def 初始化(self, ddp, divide_by_initial_world_size):
        设置内部使用的配置变量。
        断言 isinstance(ddp, 分布式数据并行), (
            DDP 加入钩子需要传递一个 DistributedDataParallel
            "实例作为状态"
        )
        断言 ddp.日志记录器  not 
        ddp.记录器._设置不均匀输入连接()
        self.ddp = ddp
        self.ddp.根据初始世界大小进行除法 = 根据初始世界大小进行除法
        超级().初始化()

    def 主钩子(self):
        在正向和反向传递中跟踪 DDP 集体通信操作。
        ddp = self.深度学习框架
        训练期间桶只重建一次
        ddp.简化器.重建桶()

        安排广播,如果我们正在同步模块缓冲区
        前向传递
        TODO:实现 DDP 不均匀输入上下文管理器支持缓冲区
        通信钩子(https://github.com/pytorch/pytorch/issues/65436)
        ddp._check_and_sync_module_buffers()

        # 需要检查在反向传播中是否需要同步
        应该同步反向 = ddp._检查全局是否需要反向梯度同步(
            是否已连接的 rank=真实
        )
        在下一次迭代中,如果跳过梯度同步,则禁用前向参数同步
        因此,相应地设置 `require_forward_param_sync`
        `require_forward_param_sync`
        ddp.require_forward_param_sync = 应该向后同步
        如果 not 应该向后同步:
            返回

        # 为每个梯度桶安排一次 allreduce 以匹配反向
        # 执行 allreduce
        ddp._match_all_reduce_for_bwd_pass()

        # 检查是否需要本地 allreduce 未使用参数
        如果 ddp.find_unused_parameters:
            ddp._match_unused_params_allreduce()

        训练期间重建的参数只推送一次
        ddp.简化器._push_all_rebuilt_params()

    def 后钩(self, is_last_joiner: 布尔):
        同步最终模型以确保所有进程中的模型相同
        self.ddp.同步最终模型(is_last_joiner)


[文档] 分布式数据并行(模块, 可加入的): r实现基于 ``torch.distributed`` 的模块级分布式数据并行。 此容器通过同步梯度提供数据并行性 在每个模型副本之间。要同步的设备 指定由输入 `process_group` 指定,即整个世界 默认情况下。请注意,`DistributedDataParallel` 不分块或 否则将输入数据分片分配给参与的计算单元;用户是 负责定义如何做到这一点,例如通过使用 关于 :class:`DistributedSampler` 的描述。 参见::ref:`分布式基础` 和 :ref:`cuda-nn-ddp-instead`。 与 :class:`torch.nn.DataParallel` 相同的输入约束适用。 创建此类需要 ``torch.distributed`` 已经存在。 通过调用 :func:`torch.distributed.init_process_group` 初始化。 ``DistributedDataParallel`` 已被证明在单节点多 GPU 数据并行训练方面比 :class:`torch.nn.DataParallel` 快得多。 class:`torch.nn.DataParallel` 用于单节点多 GPU 数据并行训练。 并行训练。 在具有 N 个 GPU 的主机上使用`DistributedDataParallel`,你应该启动 启动`N`个进程,确保每个进程仅独立工作于单一 GPU 从 0 到 N-1。这可以通过设置 CUDA_VISIBLE_DEVICES 对每个进程或通过调用: >>> # xdoctest: +SKIP("未定义变量") >>> torch.cuda.set_device(i) where i is from 0 to N-1. In each process, you should refer to the following to construct this module: >>> # xdoctest: +SKIP("未定义变量") >>> torch.distributed.init_process_group( >>> backend='nccl', world_size=N, init_method='...' >>> ) >>> model = DistributedDataParallel(model, device_ids=[i], output_device=i) 为了在每个节点上启动多个进程,您可以使用 `torch.distributed.launch` 或 `torch.multiprocessing.spawn`。 ``torch.distributed.launch`` 或 ``torch.multiprocessing.spawn``。 .. 注意:: 请参阅 `PyTorch 分布式概述 `__ 用于了解所有与分布式训练相关的功能简介。 .. 注意:: 分布式数据并行可用于与 `:class:`torch.distributed.optim.ZeroRedundancyOptimizer` 用于减少 每个 rank 的优化器状态内存占用。请参阅 `ZeroRedundancyOptimizer 配方 `__ 更多详情请查看。 .. 注意:: ``nccl`` 后端目前是最快且强烈推荐的 后端,当使用 GPU 时适用。这适用于单节点和多节点 多节点分布式训练。 .. 注意:: 此模块还支持混合精度分布式训练。 这意味着您的模型可以有不同的参数类型,例如 混合类型的 ``fp16`` 和 ``fp32``,在这些参数上的梯度下降 混合类型的参数将正常工作。 .. 注意:: 如果在一个进程中使用 `torch.save` 来保存模块的检查点, 确保在其它进程中使用 `torch.load` 来恢复它。 每个进程的 `map_location` 都已正确配置。如果没有 `map_location`, `torch.load` 将恢复到设备上的模块 模块被保存的位置。 .. note:: 当一个模型在 `M` 节点上使用 `batch=N` 进行训练时, 与相同模型相比,梯度将小 `M` 倍。 在单个节点上训练,如果损失总和(NOT) 平均通常)跨批次实例(因为梯度 不同节点之间的平均值)。您应该取这个平均值 考虑当你想要获得数学上等效的 与本地训练的对比,训练过程。但在大多数情况下,你可以将 DistributedDataParallel 包装的模型、DataParallel 包装的模型和单 GPU 上的普通模型视为相同(例如,使用等效批次的相同学习率)。 在大多数情况下,你可以将 DistributedDataParallel 包装的模型、DataParallel 包装的模型和单 GPU 上的普通模型视为相同(例如,使用等效批次的相同学习率)。 在大多数情况下,你可以将 DistributedDataParallel 包装的模型、DataParallel 包装的模型和单 GPU 上的普通模型视为相同(例如,使用等效批次的相同学习率)。 在大多数情况下,你可以将 DistributedDataParallel 包装的模型、DataParallel 包装的模型和单 GPU 上的普通模型视为相同(例如,使用等效批次的相同学习率)。 .. 注意:: 参数永远不会在进程之间广播。该模块执行 梯度上的全量减少步骤,并假设它们将被修改 由优化器以相同方式作用于所有进程。缓冲区 (例如,BatchNorm 状态)从正在处理的模块中广播出来 在每次迭代中,都向系统中的所有其他副本发送。 .. 注意:: 如果您正在与 DistributedDataParallel 一起使用 ref:`分布式 RPC 框架`,您应该始终使用 meth:`torch.distributed.autograd.backward` 来计算梯度 用于优化的 :class:`torch.distributed.optim.DistributedOptimizer` 参数 示例:: >>> # xdoctest: +SKIP("未定义变量") >>> import torch.distributed.autograd as dist_autograd >>> 从 torch.nn.parallel 导入 DistributedDataParallel 作为 DDP >>> 导入 torch >>> 从 torch 导入 optim >>> 从 torch.distributed.optim 导入 DistributedOptimizer >>> 导入 torch.distributed.rpc 作为 rpc >>> 从 torch.distributed.rpc 导入 RRef ... >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> rref = rpc.remote("worker1", torch.add, args=(t1, t2)) >>> ddp_model = DDP(my_model) ... >>> # 设置优化器 >>> optimizer_params = [rref] >>> for param in ddp_model.parameters(): >>> optimizer_params.append(RRef(param)) ... >>> dist_optim = DistributedOptimizer( >>> 优化器.SGD, >>> optimizer_params, >>> lr=0.05, >>> ) ... >>> 使用 dist_autograd.context() 上下文管理器 as context_id: >>> pred = ddp_model(rref.to_here()) >>> loss = loss_func(pred, target) >>> dist_autograd.backward(context_id, [loss]) >>> dist_optim.step(context_id) .. 注意:: DistributedDataParallel 目前对梯度 checkpointing 使用:meth:`torch.utils.checkpoint`提供了有限的支持。 如果使用 use_reentrant=False(推荐)进行 checkpoint,DDP 将按预期工作,没有任何限制。 然而,如果使用默认的 use_reentrant=True 进行检查点操作, DDP 在没有模型中未使用参数的情况下将按预期工作 并且每层最多只检查点一次(确保您没有传递) `find_unused_parameters=True` 用于 DDP。我们目前不支持 情况是一个层被多次检查点,或者当有未使用的 模型检查点中的参数。 .. 注意:: 要使非 DDP 模型加载 DDP 模型的 state dict :meth:`~torch.nn.modules.utils.consume_prefix_in_state_dict_if_present` 翻译为::meth:`~torch.nn.modules.utils.consume_prefix_in_state_dict_if_present` 在加载前需要应用去除 DDP 状态字典前缀 "module." 的操作。 .. 警告:: 构造函数、前向方法和输出(或该模块输出的函数)的微分是分布式同步点。考虑到不同进程可能的情况,请考虑这一点。 函数的输出(或该模块输出的函数)是分布式同步点。考虑到不同进程可能的情况,请考虑这一点。 考虑到不同进程可能的情况,请考虑这一点。 执行不同的代码。 .. 警告:: 此模块假定所有参数在创建时已注册到模型中。 时间创建后,不应添加或删除任何参数。 同样适用于缓冲区。 .. 警告:: 本模块假定所有参数都已注册在每个模型的模型中 分布式进程顺序相同。模块本身将 执行梯度 ``allreduce`` 按照反向顺序 模型注册的参数。换句话说,它是用户的 确保每个分布式进程具有相同的模型以及相同的参数注册顺序的责任 此模块允许具有非行主序连续步长的参数 .. 警告:: 例如,您的模型可能包含一些参数,这些参数的 例如,您的模型可能包含一些参数,这些参数的 `torch.memory_format` 是 `torch.contiguous_format` 以及其他格式为 `torch.channels_last` 的内容。然而, 不同进程中的对应参数必须具有相同的步长。 相同。 .. 警告:: 此模块不与 :func:`torch.autograd.grad` 兼容(即它将 仅当梯度要累积在 ``.grad`` 属性中时才工作 参数)。 .. 警告:: 如果您计划使用此模块与 ``nccl`` 后端或 ``gloo`` 结合使用 后端(使用 Infiniband),以及一个使用 DataLoader 的数据加载器 多个工作者,请将多进程启动方法改为 `forkserver`(仅限 Python 3)或 `spawn`。遗憾的是 Gloo(使用 Infiniband)和 NCCL2 不安全,并且你将 如果不对这个设置进行更改,可能会遇到死锁问题。 .. 警告:: 您不应该在封装模型后尝试更改模型的参数 提升您的模型使用 `DistributedDataParallel`。因为,当 将模型封装为 `DistributedDataParallel`,构造函数 `DistributedDataParallel` 将注册额外的梯度 对模型本身的全部参数进行降维函数处理。 如果之后更改模型的参数,则梯度降维函数将不再匹配正确的集合。 使用 ``DistributedDataParallel`` 与之结合。 参数 .. 警告:: 使用 ``DistributedDataParallel``。 `:ref:`分布式 RPC 框架`处于实验阶段,可能会发生变化。 Args: 模块(Module):要并行化的模块 device_ids(整数列表或 torch.device):CUDA 设备。 1) 对于单设备模块,`device_ids`可以 包含一个设备 ID,该 ID 代表唯一的 CUDA 设备,其中包含对应此进程的输入模块。 或者,`device_ids` 也可以为 `None`。 2) 对于多设备模块和 CPU 模块, ``device_ids``必须为``None``。 当两种情况下的``device_ids``均为``None``时, 前向传递的输入数据和实际模块 必须放置在正确的设备上。 (默认:`None`) 输出设备(int 或 torch.device):输出位置的设备 单设备 CUDA 模块。对于多设备模块和 CPU 模块,它必须为`None`,模块本身 决定输出位置。(默认:`device_ids[0]`) (适用于单设备模块) (广播缓冲区)(布尔值):启用同步(广播)的标志 模块在 ``forward`` 函数开始时的缓冲区 (默认:``True``) 初始化同步(布尔值):是否在初始化期间同步以验证参数 形状和广播参数以及缓冲区。 警告:如果设置为 False,则用户需要 确保它们自己权重相同 所有等级。 (默认:``True``) process_group:用于分布式数据的进程组 all-reduction。如果为 ``None``,则使用默认进程组, 由 :func:`torch.distributed.init_process_group` 创建, 将被使用。(默认:``None``) bucket_cap_mb: ``DistributedDataParallel`` 将参数分桶到 多个桶中,以便每个桶的梯度减少 桶可能与其他反向计算重叠。 attr:`bucket_cap_mb` 控制桶的大小。 兆字节(MiB)。如果为 ``None``,则使用默认大小 25 MiB。 (默认:``None``)。 find_unused_parameters (bool): 从所有返回值中包含的张量开始遍历 autograd 图。 tensors contained in the return value of the wrapped module's ``forward`` function. Parameters that don't receive gradients as part of this 图被预先标记为已准备好 将可能被减少。此外,参数 已被用于包装模块的 `forward` 函数但不是损失计算的一部分 因此也不会收到梯度,会被预先标记为准备减少。 预先标记为准备减少。 (默认:``False``) check_reduction: 此参数已弃用。 gradient_as_bucket_view (bool): 当设置为 ``True`` 时,梯度将变为视图 指向 ``allreduce`` 通信的不同偏移量 桶。这可以减少峰值内存使用,其中 保存的内存大小将与总梯度相等 大小。此外,它避免了复制之间的开销。 梯度与`allreduce`通信桶。当 梯度是视图,`detach_()` 不能在它们上调用 如果遇到此类错误,请通过参考 `torch.optim.Optimizer.zero_grad` 函数来解决。 在 `torch/optim/optimizer.py` 中查找该函数作为解决方案。 注意,梯度将在第一次迭代后变为视图。 请注意,梯度将在第一次迭代后变为视图。 第一次迭代后应检查峰值内存节省情况。 static_graph (bool): 当设置为 ``True`` 时,DDP 知道训练图是 静态的。静态图意味着 1) 使用的和未使用的 参数在整个训练循环中不会改变;在 在这种情况下,用户是否设置 ``find_unused_parameters = True`` 都无关紧要。2) 图的训练 在整个训练循环中不会改变(意味着没有 基于迭代的控制流)。 当 static_graph 设置为 ``True`` 时,DDP 将支持以下情况 无法在以前支持: 1) 可重入回退。 2) 多次激活检查点。 3) 当模型有未使用参数时的激活检查点。 4) 模型参数中存在不在前向函数之外的参数。 5) 当存在未使用参数时,可能提高性能, 因为 DDP 不会在每次迭代中搜索图来检测未使用的 当 static_graph 设置为 True 时的参数。 要检查是否可以将 static_graph 设置为 True,一种方法是在 您之前模型训练的结尾检查 ddp 日志数据, 如果 ddp_logging_data.get("can_set_static_graph") == True,通常情况下, 也可以将 `static_graph = True` 设置为 True。 示例:: >>> # xdoctest: +SKIP("未定义变量") >>> model_DDP = torch.nn.parallel.DistributedDataParallel(model) >>> # 训练循环 >>> ... >>> ddp_logging_data = model_DDP._get_ddp_logging_data() >>> static_graph = ddp_logging_data.get("can_set_static_graph") delay_all_reduce_named_params (list of tuple of str and torch.nn.Parameter): a list of named parameters whose all reduce will be delayed when the gradient of 指定的 `param_to_hook_all_reduce` 参数已准备好。其他 DDP 的参数不适用于此参数中指定的命名参数 这些命名参数将被 DDP reducer 忽略。 param_to_hook_all_reduce (torch.nn.Parameter):用于钩子延迟全量减少的参数 在 `delay_all_reduce_named_params` 中指定的参数。 属性: 模块(Module):要并行化的模块。 示例:: >>> # xdoctest: +SKIP("未定义变量") >>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...') >>> net = torch.nn.parallel.DistributedDataParallel(model) "源代码" 用于跟踪给定线程是否在 ddp forward 中用于 torchdynamo 目的 _active_ddp_module: 可选[分布式数据并行] = def 初始化( self, 模块, 设备 ID=, 输出设备=, 暗淡=0, 广播缓冲区=True, 初始化同步=True, 进程组=, 桶容量(MB)=, 查找未使用参数=错误, 检查缩减=错误, 渐进式桶视图=错误, 静态图=错误, 延迟所有 reduce 命名参数=, 将参数钩子到所有 reduce=, 混合精度: 可选[混合精度] = , 装置网状结构=, ): 超级().初始化() 可加入的.初始化(self) self.记录器: 可选[距离.记录器] = 如果 布尔(延迟所有 reduce 命名参数 not ) != 布尔( 将参数钩子到 all_reduce not ): self.记录并抛出异常( ValueError, "delay_all_reduce_named_params 和 param_to_hook_all_reduce 需要同时设置。" "不能同时指定 process_group 和 device_mesh 参数。", ) 如果 流程组 设备网状 not : 提升 运行时错误( "无法指定 process_group 和 device_mesh 参数。" ) elif 流程组 设备网状 : self.流程组 = 获取默认组() elif 设备网状 : self.流程组 = 流程组 否则: 如果 装置网状结构.维数 != 1: 提升 运行时错误( f"仅支持 1D 设备网格,但得到了"{装置网状结构} ) self.设备网状 = 设备网状 self.流程组 = 装置网状结构.get_group(网格维度=0) 来自 torch.distributed.device_mesh 导入 _mesh_resources 根网格 = _网状资源.获取根网格(装置网状结构) # 如果根网格与 device_mesh 不同, # 从根网格中切出 device_mesh。 如果 根网格 != 装置网状结构: # TODO:这是一个临时解决方案,以启用 DDP + TP。 # 我们应该在 DDP 中实现逻辑,以便 2D 实现能够 # 声音和 state_dict 能够直接工作。 在检查未初始化参数之前必须完成此操作。 来自 torch.distributed.tensor.parallel.ddp 导入 ( _pre_dp_module_transform, ) _pre_dp_module_transform(模块) self.全部延迟所有 reduce 参数 = [] 如果 有属性(模块, 忽略的_ddp 参数和缓冲区): self.忽略的参数 = 集合(模块.忽略的_ddp 参数和缓冲区) 否则: self.忽略的参数 = 集合() 如果 延迟所有 reduce 命名参数 not : 名称, 参数 延迟所有 reduce 命名参数: self.忽略的参数.添加(名称) self.全部延迟所有 reduce 参数.追加(参数) self.模块参数 = [ p n, p 模块.命名参数。() 如果 n not self.忽略的参数 ] 如果 not 任何(p.requires_grad p self.模块参数): 如果 长度(self.延迟所有全量 reduce 参数): 记录器.信息(延迟所有参数的 AllReduce 操作。) 否则: self._log_and_throw( 运行时错误, 分布式数据并行在模块 " "没有需要梯度的参数。", ) 如果 设备 ID not 长度(设备 ID) > 1: self._log_and_throw( ValueError, "device_ids 只能为 None 或包含单个元素。", ) self.是多设备模块 = ( 长度({p.设备 p self.模块参数}) > 1 ) 独特设备类型 = { p.设备.类型 p self.模块参数 如果 p.设备 not } 如果 长度(独特设备类型) != 1: self.记录并抛出( ValueError, "DistributedDataParallel 的输入模块必须在" f"同一类型的设备上,但输入模块的参数位于"{不同设备类型}.", ) self.设备类型 = 下一(迭代(独特设备类型)) 如果 ( 设备 ID 长度(设备 ID) == 0 # 为向后兼容 self.设备类型 == cpu self.是否为多设备模块 ): 如果 设备 ID 输出设备: self._log_and_throw( ValueError, "分布式数据并行 device_ids 和 output_device 参数" "仅与单设备/多设备 GPU 模块或 CPU 模块兼容," f"但获取了 device_ids"{设备 ID}输出设备{输出设备}," f模块参数{({p.设备 p self._模块参数})}.", ) self.设备 ID = self.输出设备 = 否则: self.设备 ID = [获取设备索引(x, True) x 设备 ID] 如果 输出设备 : 输出设备 = 设备 ID[0] self.输出设备 = 获取设备索引(输出设备, True) self.静态图 = self.维度 = 维度 self.模块 = 模块 self.设备 = 下一(迭代(self.模块参数)).设备 self.广播缓冲区 = 广播缓冲区 self.查找未使用参数 = 查找未使用参数 self.需要反向梯度同步 = 真实 self.需要正向参数同步 = 真实 self.渐进式桶视图 = 渐进式桶视图 self.混合精度 = 混合精度 如果 self.混合精度 not : 记录器.警告("收到混合精度配置"%s", self.混合精度) 如果 检查缩减: # 此参数已不再使用,因为缩减器 确保即使某些参数减少也能完成减少 # 不接收梯度。 warnings.警告( `DistributedDataParallel`中的`check_reduction`参数 模块已弃用。请避免使用。, 未来警告, 栈级别=2, ) 检查模块是否未初始化参数 参数 self._模块参数: 如果 isinstance(参数, 火炬.nn.参数.未初始化参数): self._记录并抛出( 运行时错误, "具有未初始化参数的模块不能与 `DistributedDataParallel` 一起使用。" 运行一个虚拟前向传递以正确初始化模块, ) 用于节点内参数同步和节点间同步 self.广播桶大小 = int(250 * 1024 * 1024) 减少桶大小 如果 桶容量(MB) : # 默认情况(桶容量为 25 MiB) 桶容量(MB) = 25 self.桶字节容量默认值 = 真实 否则: self.桶字节容量默认值 = self.桶字节容量 = int(桶容量 MB * 1024 * 1024) 是否在侧流上执行输入张量 CPU 到 GPU 的复制 self.使用侧边流进行张量复制 = ( 操作系统.环境.获取("使用 PYTORCH_DDP 侧边流", "1") == 1 ) 初始化梯度缓冲区并注册所有 reduce 钩子 self._延迟梯度缓冲区: 可选[火炬.张量] = self.延迟梯度视图: 列表[火炬.张量] = [] self.延迟所有 reduce 参数 = 如果 长度(self.延迟所有 reduce 参数) != 0: self.注册延迟所有 reduce 钩子( 桶容量(MB)=桶容量(MB), 钩子所有 reduce 的参数=钩子所有 reduce 的参数, 设备 ID=设备 ID, ) 如果 self._延迟所有 reduce 参数: 返回 # 构建 reducer 的参数。 参数, 预期稀疏梯度 = self._构建 reducer 参数() 初始化期间所有集体均由此标志控制。 如果 init_sync: 验证模型等价性。 _verify_param_shape_across_processes(self.进程组, 参数) 同步参数和缓冲区。确保所有 DDP 模型从相同值开始。 _同步模块状态( 模块=self.模块, 进程组=self.进程组, 广播桶大小=self.广播桶大小, =0, 要忽略的参数和缓冲区=self.要忽略的参数, 广播缓冲区=self.广播缓冲区, ) # 在调试模式下,构建参数索引 -> 参数的映射。 参数到名称映射 = self.构建调试参数到名称的映射(参数) 构建 reducer。 self._ddp_init_helper( 参数, 预期稀疏梯度, 参数到名称映射, 静态图, ) self._通信钩子: 列表[元组[可调用, 对象]] = [] 如果 self.混合精度 not : _设置混合精度参数(self.混合精度, self.模块) _转换缓冲区(self.混合精度, self.模块) # 用于异步低精度复制的流。 self._mp_stream = 火炬.() self._submodule_to_event = defaultdict(双端队列) # type: ignore[var-annotated] # 在根模块中添加前向预钩子以启动向低层复制 # 提高精度。 self.模块.注册前向钩子( self._root_copy_hook, 预先添加=错误, with_kwargs=真实 ) # 在所有子模块中添加前向预钩子以等待复制事件 # 在运行计算之前。 模块 self.模块.模块(): 模块.注册前向钩子( self._module_wait_for_copy_hook, 预先添加=错误, with_kwargs=True, ) # 在反向操作中设置回调以提升类型并使用完整精度 # params. TODO (rohan-varma): 使其与通用功能组合 # comm hooks 和 apply_optimizer_in_backward。将 inline 导入 # 避免循环导入问题。 来自 torch.distributed.algorithms.ddp_comm_hooks.mixed_precision_hooks 导入 ( _AllreduceUpcastHookState, _reducer_allreduce_and_upcast_hook, ) upcast_hook_state = _AllreduceUpcastHookState( ddp 弱引用=弱引用.ref(self), 提升流=火炬.(), ) self.注册通信钩子( 提升钩子状态, _reducer_allreduce_and_upcast_hook, ) # 通知减少精度参数数据类型的正确性给减少器 梯度与桶之间的类型检查次数。 self.简化器.设置混合精度参数数据类型( # 类型: 忽略[attr-defined] self.混合精度.参数数据类型 ) self._has_rebuilt_buckets = 如果 静态图: self._设置静态图() self._懒加载已执行 = # 注册 AccumulateGrad 后置钩子,如果 optimize_ddp 为 True # True。如果 compiled_autograd 未启用,则将取消注册钩子。 # self._accum_grad_hooks: 列表[可移除句柄] = [] 优化分布式数据并行 = 火炬._dynamo.工具.获取优化分布式数据并行模式() self.使用 Python 缩减器 = 优化分布式数据并行 == python_reducer 如果 self.使用 python_reducer: 火炬.电磁感抗.配置.合并 DDP 通信 = 真实 火炬.电磁感抗.配置.合并 DDP 桶大小 = 桶容量(MB) 直接将其添加到跟踪规则中将会干扰使用 DDPOptimizer 的用户 的用户。 火炬._dynamo.跟踪规则.LEGACY_MOD_INLINELIST.添加( torch.nn.parallel.distributed ) 火炬._dynamo.跟踪规则.get_legacy_mod_inlinelist.清除缓存() # NOTE: 我们应该延迟初始化这些 self._register_accum_grad_hook() 是否 DDPSink 执行克隆。 self._ddp_sink_clone = 真实 def _register_accum_grad_hook(self): 导入 torch.distributed._functional_collectives 作为 fcol def 编译累积梯度钩子( 参数, *, 参数索引: int, ): 如果 not self.需要反向梯度同步: 返回 如果 参数.梯度 : 返回 如果 self._comm_hooks: hook, 状态 self._comm_hooks: hook(状态, (参数.研究生, 参数)) 否则: gradient = 参数.梯度 / self.进程组.尺寸() gradient = fcol.all_reduce(渐变, "求和", self.进程组) 参数.研究生.复制_(渐变) 索引, 参数 列举(self.模块参数): 如果 not 参数.需要梯度: 继续 self.累积梯度钩子.追加( 参数.register_post_accumulate_grad_hook( functools.偏函数( 编译后的累积梯度钩子, 参数索引=索引, ) ) ) def 延迟_all_reduce 钩子(self, 研究生): 世界大小 = 距离.获取世界大小(self.进程组) self.延迟梯度缓冲区.div_(世界大小) # 类型:忽略[联合属性] _ = 距离.all_reduce( self.延迟梯度缓冲区, 群组=self.进程组, async_op=真实 ) 返回 梯度 def 注册延迟所有 reduce 钩子( self, 桶容量(MB), 将参数钩子到所有 reduce, 设备 ID, ): 创建渐变缓冲区 设备 = 火炬.设备("cpu") 如果 设备 ID 否则 设备 ID[0] self.延迟渐变缓冲区 = 火炬.( 总和(p.元素数量() p self._延迟_all_reduce 参数), 设备=设备, ) # 2. 广播参数 分离参数 = [p.detach() p self._延迟_all_reduce 参数] 距离._广播合并_(self.进程组, 分离参数, 桶容量(MB), 0) # 3. 将所有 reduce 钩子连接到指定的参数 钩子所有 reduce 的参数.注册钩子(self._延迟所有 reduce 钩子) # 4. 构建梯度张量视图 偏移 = 0 参数 self._delay_all_reduce_params: grad_view = self._delay_grad_buffer[偏移 : (偏移 + 参数.元素数量())].视图( 参数.形状 ) self._延迟梯度视图.追加(梯度视图) 偏移 = 偏移 + 参数.元素数量() # 5. 检查所有需要梯度的参数的全量梯度是否延迟。 模块名称, 模块 self.模块.命名模块(): ` 的类型为 List[torch.Tensor], 参数 模块.命名参数。(递归=错误): 如果 参数.需要梯度: 全名 = f"{模块名称}.{` 的类型为 List[torch.Tensor]}" 如果 全名 not self.要忽略的参数: # 至少有一个参数的所有 reduce 不会延迟。 # 在这种情况下,我们不应设置 self._delay_all_reduce_all_params # to True. 返回 self._delay_all_reduce_all_params = 真实 def _setup_in_backward_optimizers(self): # 检查用户是否已使用 apply_optim_in_backward 来重叠优化器 # 步 + DDP 反向. 当前限制: # 1. 目前只支持 allreduce,不支持自定义通信。 # 2. 对于 DDP 管理的参数,如果它们的优化器在 # 反向中运行,它们的梯度将被设置为 ``None``。如果您的用例 # 在 DDP 参数 grad 设置不为 None 之后,请 ping # 后向优化运行中,请 ping # https://github.com/pytorch/pytorch/issues/90052. # 注意:我们使用 self._module_parameters 而不是.parameters(),因为 前者排除忽略的(非 DDP 管理的)参数。 如果 任何(有属性(p, _in_backward_optimizers) p self._module_parameters): 火炬._C._log_api_usage_once(ddp.optimizer_in_backward) 移除因为 apply_optim_in_backward 注册的钩子 DDP 根据 allreduce 自定义优化器与反向传播的叠加方式 param_to_handle_map = ( 距离.优化.在反向传播中应用优化器.参数到优化钩子句柄映射 ) p self._模块参数: handle 参数到句柄映射.获取(p, []): 处理.删除() 需要将 DDP 实例的弱引用传递给 all_reduce(来自 reducer) # 获取管理的 DDP 参数。 ddp_weakref = 弱引用.ref(self) # 注意:在函数中导入,否则这将导致循环 导入。 来自 torch.distributed.algorithms.ddp_comm_hooks.optimizer_overlap_hooks 导入 ( _apply_optim_in_backward_hook, ) self.注册通信钩子( ddp_weakref, _apply_optim_in_backward_hook( gradient_is_bucket_view=self.渐进式桶视图 ), ) self.简化器.在反向过程中设置优化器() # 类型: 忽略[attr-defined] def 触发减少器的自动微分钩子(self, 索引, *未使用): "" 触发减少器的自动微分钩子以 allreduce Reducer 桶中的参数。 请注意,这仅在混合精度训练期间使用 构造时安装的 Reducer 钩子不会在低精度参数设置时被调用 因为我们在进行低精度参数设置操作。 "源代码" self.简化器._autograd_hook(索引) # 类型: 忽略[attr-defined] def _root_copy_hook(self, *参数: 任意, **kwargs: 任意) -> : "" 对于 DDP 混合精度,将低精度副本放在单独的流上,并创建事件等待它们。 当使用 DDP 混合精度进行训练时,此根预前向钩子关闭 在单独的流上关闭低精度副本,并创建相应的事件等待它们。 的事件等待它们。 "源代码" 清除之前迭代子模块到事件。这是因为我们 # 可能为一些最终没有完成的模块填充了一些事件 # 已使用。 self._submodule_to_event = defaultdict(双端队列) # type: ignore[var-annotated] self._mp_stream: 子模块 self.模块.模块(): 参数 子模块.参数(递归=错误): 不要将 DDP 忽略的参数进行类型转换。 如果 有属性(参数, _ddp_ignored) 参数._ddp_忽略: 继续 _分配存储(参数._mp_param, 参数.尺寸()) # 模拟拷贝()隐式转换为低精度 火炬.不梯度(): 参数._mp_param.复制_(参数.数据) # TODO: 当 zero_grad(set_to_none=False)或处于 grad 中 # 累加情况,累加的梯度可以是 fp32 # 运行 DDP 反向传播时可能会因为 # 进入和累积梯度类型不匹配而引发错误。 # 因此我们现在手动将累积梯度向下转换, # 未来我们可能转向 FSDP 风格的梯度 累积管理,其中累积梯度 # 已保存且 .grad 字段设置为 None,绕过 # 这个问题。 如果 参数.梯度 not : 参数.研究生.数据 = 参数.研究生.( self.混合精度.参数数据类型 # 类型:忽略[联合属性] ) 参数.数据 = 参数._mp_param 复制事件 = 火炬.活动() 复制事件.记录() self._submodule_to_event[子模块].追加(复制事件) def _模块等待复制钩子( self, 模块, *参数: 任意, **kwargs: 任意, ) -> : 在执行计算之前,等待适当的事件以确保低精度复制已完成。 try: 事件 = self._子模块到事件[模块].popleft() 除了 索引错误: # 复制事件已被等待 返回 事件.等待(=火炬.加速器.current_stream()) p 模块.参数(递归=错误): 如果参数不需要梯度,则不要注册钩子 如果 not p.requires_grad (有属性(p, _ddp_ignored) p._ddp_ignored): 继续 我们需要在这里注册自动微分钩子,而不是 DDP 的构造函数中 由于我们正在使用低精度参数,请注册它们 通过获取梯度累加器 临时 = p.展开为(p) grad_acc = tmp.grad_fn.下一个函数[0] [0] 钩子 = grad_acc.注册钩子( functools.偏函数(self._fire_reducer_autograd_hook, p._idx) ) p._ddp_mp_hook_state = (grad_acc, hook) def _log_and_throw(self, 错误类型, 错误信息): 如果 self.日志记录器 not : self.记录器.设置错误并记录(f"{字符串(错误类型)}: {错误信息}") 提升 错误类型(错误信息) def _ddp 初始化助手( self, 参数, 预期稀疏梯度, 参数到名称映射, 静态图, ): "" DDP 初始化辅助函数,用于管理参数、梯度钩子、日志和同步 BatchNorm。 初始化辅助函数,执行以下操作: (1)对参数进行桶化以进行缩减 (2) 重置桶化状态 (3) 注册梯度钩子 (4) 记录构造时 DDP 的日志数据 (5) 将 DDP 句柄传递给 SyncBatchNorm 层 "源代码" 注意,参数的顺序并不是它们被使用的顺序, 尤其是在有控制流的模型中。 # 与参数并列的并不是它们在实际执行中的顺序, 如果某个模型恰好也 # 1) 在其反向图中具有其他集体通信操作的节点。 # 2) 在全球子集排名中存在未使用的参数。 # 桶化操作可能会过早地在具有未使用参数的排名上插入 ALL-REDUCE 通信操作, # 与其他排名上的其他集体通信操作意外匹配。 # 为了处理这种特殊情况,当参数不是实际执行顺序时 我们不进行桶划分,因此所有梯度之后只插入一个 ALL-REDUCE 图中所有节点数都被计算了。 # 注意,这里我们仅在第 1 次迭代中禁用分桶。 第一次迭代后,可以重建桶, 因为“桶重建”根据反向图中的实际执行顺序对参数进行桶化。 一旦#73732 合并,可以移除这个分支。 如果 静态图 真实 self.find_unused_parameters 错误: 桶大小限制 = [系统.最大尺寸] 否则: 如果 self.桶字节容量默认值: 桶大小限制 = [ 距离.默认第一个桶的字节数, self.桶字节容量, ] 否则: 桶大小限制 = [self.桶字节上限] ( 桶索引, 每桶大小限制, ) = 距离.根据大小计算桶分配( 参数, 桶大小限制, 预期稀疏梯度, ) # 记录索引以供混合精度使用,因为我们 # 需要通过 Python 将索引传递给 Reducer 的 autograd 钩子。 如果 self.混合精度 not : i, p 列举(参数): p._idx = i # 注意:反转存储桶列表,因为我们想近似 生成梯度的顺序,并假设它们 按照定义的顺序在正向传播中使用。 self.减法器 = 距离.减法器( 参数, 列表(反转(桶索引)), 列表(反转(每个桶的大小限制)), self.进程组, 期待稀疏梯度, 桶大小限制在构造函数中指定。 此外,我们允许有一个单独的小桶用于参数 首先定义的,这样它们的梯度不会溢出 # 添加梯度后,增加一个更大的桶,导致不必要的延迟 # 计算完成后。实验表明 1MB 是一个合理的值。 self.bucket_bytes_cap, self.find_unused_parameters, self.gradient_as_bucket_view, 参数到名称映射, 用户可以将 dist._DEFAULT_FIRST_BUCKET_BYTES 设置为调整 DDP 首个桶的大小 桶。 ( 距离._DEFAULT_FIRST_BUCKET_BYTES 如果 self.桶字节容量默认值 否则 self.桶字节容量 ), ) self.日志记录器 = 距离.记录器(self.简化器) # 设置为弱引用以避免日志记录器和减少器之间的引用循环 # 日志记录器和减少器 self.简化器.设置日志记录器(self.记录器) 具有同步 BN = 子模块 self.模块.模块(): 如果 isinstance(子模块, 火炬.nn.同步批归一化): 具有同步 BN = 真实 断开 在构造时设置可获取的日志数据。 self.记录器.设置构造数据并记录日志( self.模块..__name__, [] 如果 self.设备 ID 否则 self.设备 ID, -1 如果 self.输出设备 否则 self.输出设备, self.广播缓冲区, 具有同步 BN, 静态图, ) # 将句柄传递给 torch.nn.SyncBatchNorm 层 self._传递同步批归一化句柄(self.模块) def __getstate__(self): self.检查默认组() attrs = 复制.复制(self.字典) 删除 属性["进程组"] 删除 属性[减法器] 删除 属性[记录器] 返回 attrs def __setstate__(self, 状态): 如果可序列化,则进程组应为默认组 self.流程组 = 获取默认组() 超级().__setstate__(状态) self.字典.setdefault(require_forward_param_sync, True) self.字典.setdefault(require_backward_grad_sync, True) 参数, expect_sparse_gradient = self._build_params_for_reducer() # 在调试模式下,构建参数索引 -> 参数的映射。 参数到名称映射 = self.构建调试参数到名称的映射(参数) 构建 reducer。 self._ddp_init_helper( 参数, 预期稀疏梯度, 参数到名称映射, self.静态图, ) 如果 self.静态图: self.简化器._设置静态图() 断言 self.日志记录器 not self.记录器._设置静态图() def 构建用于 reducer 的参数(self): # 为所有需要梯度的参数构建(模块,参数)元组。 模块和参数 = [ (模块, 参数) 模块名称, 模块 self.模块.命名模块() 参数 [ 参数 请注意,我们访问 module.named_parameters 而不是 # 参数(module). 仅在需要使用模块参数时才需要 parameters(module) 单进程多设备案例,其中它访问复制的 通过 _former_parameters 参数。 ` 的类型为 List[torch.Tensor], 参数 模块.命名参数。(递归=错误) 如果 参数.requires_grad f"{模块名称}.{` 的类型为 List[torch.Tensor]}" not self.忽略的参数 ] ] # 在子模块之间删除任何共享的参数。 备忘录 = 集合() 模块和参数 = [ # "p 不在 memo 中" 是去重检查。 "not memo.add(p)" 总是 True,只是为了在需要时执行 "add(p)"。 (m, p) m, p 模块与参数 如果 p not 备忘录 not 描述.添加(p) # type: ignore[func-returns-value] ] # 构建参数列表。 参数 = [参数 _, 参数 模块和参数] # 检查一个模块是否会生成稀疏梯度。 def 生成稀疏梯度(模块): 如果 isinstance(模块, (火炬.nn.嵌入, 火炬.nn.EmbeddingBag)): 返回 模块.稀疏的 返回 # 构建一个布尔值列表,表示是否期望稀疏 # 对应参数的梯度。 期待稀疏梯度 = [ 生成稀疏梯度(模块) 模块, _ 模块与参数 ] self._分配模块缓冲区() 返回 参数, 预期稀疏梯度 def _分配模块缓冲区(self): "" 将 self.module.named_buffers 分配给 self.modules_buffers。 将模块缓冲区分配给 self.modules_buffers,然后当 broadcast_buffers=True 时广播到各个进程。注意,这 些缓冲区将被广播到各个进程。注意,这 每次需要同步缓冲区时都必须调用,因为缓冲区可能会被用户模块重新分配。 请参阅 https://github.com/pytorch/pytorch/issues/63916。 收集模块的缓冲区,过滤掉应该被忽略的缓冲区。 "源代码" # 收集模块的缓冲区,过滤掉应该被忽略的缓冲区。 命名模块缓冲区 = [ (缓冲区, 缓冲区名称) 缓冲区名称, 缓冲区 self.模块.命名缓冲区() 如果 缓冲区名称 not self.要忽略的参数 ] self.模块缓冲区 = [ 缓冲区 (缓冲区, 缓冲区名称) 命名模块缓冲区 ] 表示未由 DDP 忽略的模块缓冲区的字典[str, tensor] self.命名模块缓冲区 = { 缓冲区名称: 缓冲区 (缓冲区, 缓冲区名称) 命名模块缓冲区 } def 构建调试参数到名称映射的_build_debug_param_to_name_mapping(self, 参数): 参数到参数索引 = {参数[i]: i i 范围(长度(参数))} 参数集 = 集合(参数) 参数索引到参数全称 = {} 模块名称, 模块 self.模块.命名模块(): ` 的类型为 List[torch.Tensor], 参数 模块.命名参数。(递归=错误): 完全限定名 = f"{模块名称}.{` 的类型为 List[torch.Tensor]}" # 跳过忽略的参数,因为那些参数不会被 DDP 减少 # 首先。 如果 完全限定名 not self.# 要忽略的参数 参数.需要梯度: 如果 参数 not 参数集: self._log_and_throw( ValueError, f"参数名为{完全限定名}在模块参数中找到,但不在 DDP 参数中。 "这表明 DDP 中存在一个错误,请向 PyTorch 报告问题。", ) 参数索引 = 参数到参数索引[参数] 参数索引到参数全称[参数索引] = 完全限定名 确保我们涵盖了所有参数 如果 长度(参数集) != 长度(param_index_to_param_fqn): self._log_and_throw( ValueError, ( 预期参数到名称映射应涵盖所有参数,但实际上 f出现了冲突的长度:{长度(参数集)}与 " 相比 f"{长度(param_index_to_param_fqn)}这表明 DDP 中存在一个 bug ,请向 PyTorch 报告一个问题。 ), ) 返回 param_index_to_param_fqn def 获取参数(self, m, 递归=True): 返回模块参数的生成器。 def 模型参数(m): ps = ( m._former_parameters.() 如果 有属性(m, _前参数) 否则 m.参数(递归=错误) ) yield from ps 修饰 m.模块() 如果 递归 否则 [m]: yield from 模型参数(模块) def _检查默认组(self): 不支持 pickle = try: 如果 self.流程组 != 获取默认组(): 不支持 pickle = 真实 除了 运行时错误: pickle 不支持 = 真实 如果 pickle 不支持: self._log_and_throw( 运行时错误, "DDP Pickling/Unpickling 仅支持" "当使用默认进程的 DDP 时" "组。也就是说,当你调用" "init_process_group 并且没有传递" "process_group 参数给 DDP 构造函数", )
[文档] @contextmanager def no_sync(self): r""" 禁用 DDP 进程间梯度同步的上下文管理器。 在此上下文中,梯度将在模块上累积 变量,稍后将与第一个同步 前向-后向传递退出上下文。 示例: >>> # xdoctest: +忽略("未定义变量") >>> ddp = torch.nn.parallel.DistributedDataParallel(model, pg) >>> with ddp.no_sync(): >>> for input in inputs: >>> ddp(input).backward() # 无同步,累加梯度 >>> ddp(another_input).backward() # 同步梯度 .. 警告: 前向传播应包含在上下文管理器内 否则梯度仍然会同步。 """ old_require_backward_grad_sync = self.require_backward_grad_sync self.require_backward_grad_sync = False try: yield finally: self.require_backward_grad_sync = old_require_backward_grad_sync
@classmethod def 获取活动 DDP 模块
(): `TorchDynamo`需要 DDP 的状态和模块以进行协同优化。 返回 ._active_ddp_module # 注意,此 ctxmgr 函数在 torchdynamo 中被标记为'skip',因此 dynamo 不会启动 '用于“module_to_run”下' '请参阅 torch._dynamo/eval_frame.py 中的 TorchPatcher.patch 以获取更多详细信息' @contextmanager @torch._禁用_dynamo(递归=错误) def '_inside_ddp_forward'(self): 分布式数据并行.'_active_ddp_module' = self try: 产生 最后: 分布式数据并行._active_ddp_module = def _run_ddp_forward(self, *输入, **kwargs): 如果 self._use_python_reducer: 返回 self.模块(*输入, **kwargs) 忽略索引 否则: self._inside_ddp_forward(): 返回 self.模块(*输入, **kwargs) 忽略索引 def 清除梯度缓冲区(self): # 假设梯度累积是在 autograd 引擎中就地进行的,基于此,在反向传播之前,param.grad 指向的 grad buffers 是 # 假设梯度累积是在 autograd 引擎中就地进行的,基于此,在反向传播之前,param.grad 指向的 grad buffers 是 # 对于某些边缘情况,如果 autograd 引擎中的梯度累积不是就地进行的, # 然后将 param.grad 和 grad 缓冲区断开连接。 如果 self._延迟梯度缓冲区 not : # 我们通过重置整个梯度缓冲区来批量执行所有 params 的 zero_grad。 # 当所有 params 的梯度设置为 None 时。 所有参数梯度为空 = 所有( 参数.梯度 参数 self.延迟所有 reduce 参数 ) 索引, 参数 列举(self._延迟_all_reduce 参数): 如果 参数.梯度 : 参数.梯度 = self.延迟梯度视图[索引] 如果 not 所有参数梯度无: 参数.研究生.零_() 如果 所有参数梯度无: self.延迟梯度缓冲区.零_() def _lazy_init(self): # 构造后但延迟初始化的 DDP 初始化 在第一次前向传递之前。 self.在反向优化器中设置。() self._懒加载已执行 = 真实 def _pre_forward(self, *输入, **kwargs): 如果 self.使用 Python 缩减器。: 返回 输入, kwargs 如果 not self._懒加载已执行 not 火炬.编译器.is_compiling(): self._lazy_init() 如果 self.延迟所有 reduce_all_params: 返回 输入, kwargs 如果 火炬.梯度是否启用() self.需要反向梯度同步: 断言 self.日志记录器 not self.记录器.设置运行时统计和日志() self.简化器.准备前向() # 通知加入上下文,此进程尚未加入 # 如果需要 工作 = 加入.通知加入上下文(self) 如果 工作: self.简化器._set_forward_pass_work_handle( 工作, self.根据初始世界大小进行除法 # type: ignore[arg-type] ) # 在前向计算之前调用 _rebuild_buckets, 在释放旧桶之前可能分配新的桶 在_rebuild_buckets 中。为了节省峰值内存使用, 在峰值内存使用增加之前调用_rebuild_buckets 在前向计算期间。 在整个训练期间只能调用一次。 如果 火炬.梯度是否启用() self.简化器._重建桶(): 记录器.信息("在本迭代中已重建减少器桶。") self._has_rebuilt_buckets = 真实 根据位置(前向/后向)同步参数(用户) # 指定作为钩子的一部分,如果指定了钩子。 如果 self._check_sync_bufs_pre_fwd(): self._sync_buffers() 如果 self._join_config.启用: # 通知加入的队列是否应该在反向传递中进行同步。 self.检查全局反向梯度同步需求(是否已连接的 rank=错误) 如果 self.设备 ID: 移动的输入, 移动的关键字参数 = _to_kwargs( 输入, kwargs, 火炬.设备(self.设备类型, self.设备 ID[0)] self.使用侧流进行张量复制, ) 参数, kwargs = 移动输入[0] 移动参数[0] 如有必要,将输入转换为低精度。 如果 self.混合精度 not : 参数, kwargs = _将前向输入转换为低精度( self.混合精度.参数数据类型, *参数, **kwargs, ) 返回 参数, kwargs 否则: 如有必要,将输入转换为低精度。 # TODO (rohan-varma) 测试此代码路径。 如果 self.混合精度 not : 输入, kwargs = _向前传递输入( self.混合精度.参数数据类型, *输入, **kwargs, ) 返回 输入, kwargs def _post_forward(self, 输出): 如果 self._使用 Python 聚合器: 返回 输出 如果 self._delay_all_reduce_all_params: self._clear_grad_buffer() 返回 输出 # 根据位置(前/后 forward)同步参数(用户指定) # 如果指定了 hook,则作为 hook 的一部分指定。 如果 self._check_sync_bufs_post_fwd(): self._sync_buffers() 如果 火炬.梯度是否启用() self.require_backward_grad_sync: self.require_forward_param_sync = 真实 # 我们将原封不动地返回输出对象,因为它是自由形式的 # 因为我们需要找出这个对象中的任何张量, # 因为我们需要确定在这次前向传递中使用了哪些参数, # 以确保我们为任何需要短路减少的情况 # 未使用参数。仅在 `find_unused_parameters` 设置时使用。 如果 self.find_unused_parameters not self.静态图: # 对于静态图不需要填充此信息。 self.简化器.准备反向操作(列表(查找张量(输出))) 否则: self.简化器.准备反向操作([]) 否则: self.require_forward_param_sync = DDPSink 目前启用检测未使用参数 静态图训练第一次迭代 如果 (self.查找未使用参数 not self.静态图) ( self.静态图 not self._静态图延迟_allreduce_enqueued ): ( 输出张量列表, 树规范, 输出为 rref, ) = 使用_rref 进行树扁平化(输出) 输出占位符: 列表[可选[火炬.张量]] = [ _ 范围(长度(输出张量列表)) ] 不要触摸没有 grad_fn 的 tensor,这可能会引发问题 例如:https://github.com/pytorch/pytorch/issues/60733 i, 输出 列举(输出张量列表): 如果 火炬.is_tensor(输出) 输出.grad_fn : 输出占位符[i] = 输出 当 find_unused_parameters=True 时,生成需要梯度的张量 # 通过 DDPSink 反向传播。当不是所有输出都 # 用于损失时,这会使相应的张量接收 未定义的梯度,然后由 reducer 处理以确保 不修改 param.grad 字段,我们也不会出错。 透传张量列表 = _DDPSink.应用( 弱引用.ref(self), *输出张量列表, ) i 范围(长度(输出占位符)): 如果 输出占位符[i] : 输出占位符[i] = passthrough_tensor_list[i] # 重建输出数据结构。 输出 = 使用 rref 展开树( output_placeholders, 树规范, 输出是 rref ) # 前向传播结束后,重置梯度缓冲区和梯度视图 self.清除梯度缓冲区() 返回 输出 def 前向(self, *输入, **kwargs): 火炬.自动微分.分析器.记录功能(DistributedDataParallel.forward): 输入, kwargs = self._pre_forward(*输入, **kwargs) 输出 = ( self.模块.前向(*输入, **kwargs) 如果 self._delay_all_reduce_all_params 否则 self._run_ddp_forward(*输入, **kwargs) ) 返回 self._post_forward(输出) def 分散(self, 输入, kwargs, 设备 ID): 返回 scatter 参数(输入, kwargs, 设备 ID, 暗淡=self.暗淡) def to_kwargs(self, 输入, kwargs, 设备 ID): 为向后兼容保留 返回 _to_kwargs( 输入, kwargs, 火炬.设备(self.设备类型, 设备 ID), self.使用侧流进行张量复制, ) def 收集(self, 输出, 输出设备): 返回 收集(输出, 输出设备, 暗淡=self.暗淡) def 训练(self, 模式=True): 超级().训练(模式) 返回 self 当以连接模式运行时,安排一个 allreduce 来通知已连接的进程 是否将在本次迭代运行反向传播同步。 def _check_global_requires_backward_grad_sync(self, is_joined_rank): 如果 not 已加入排名 self.需要反向梯度同步: 需要同步张量 = 火炬.(1, 设备=self.设备) 否则: 需要同步张量 = 火炬.(1, 设备=self.设备) 工作 = 距离.all_reduce( 需要同步张量, 群组=self.进程组, async_op=真实 ) # (kwen2501) 这个 if 条件是之前内容的直接翻译 # behavior,即当`is_joined_rank=False`时,`work.wait()` # 没有被调用,它也不关心结果。我在猜测 它只想触发一个匹配的全量减少操作,而不想 主流等待。 如果 is_joined_rank 是已加入排名: 工作.等待() 应该向后同步 = 需要同步张量.项目() != 0 返回 应该向后同步 否则: 返回 返回值不应/不应该被使用。 当以连接模式运行时,如果模型有在正向传播中应该同步的缓冲区,则检查并执行模块缓冲区的同步。 # 在正向传播中应该同步的缓冲区。 def _check_and_sync_module_buffers(self): 如果 self._check_sync_bufs_pre_fwd(): 权威排名 = self._find_common_rank(self._distributed_rank, 错误) self._sync_module_buffers(权威排名) # 当在联合模式下运行时,就共同排名和广播模式达成一致 # 将参数广播到所有其他排名 def _sync_final_model(self, is_last_joiner): 达成一致,确定将作为权威模型副本的过程。 当前排名是权威副本的候选者,如果 is_last_joiner=True。我们通过选择较大的排名来打破平局。 self.权威排名 = self.查找共同排名( self.分布式排名, 是否为最后一个加入者 ) 同步模块状态( 模块=self.模块, 进程组=self.进程组, 广播桶大小=self.广播桶大小, =self.权威排名, 要忽略的参数和缓冲区=self.要忽略的参数, 广播缓冲区=self.广播缓冲区, ) 将通信操作调度与在 reducer 的逆向调度中安排的调度相匹配 通过。 def _match_all_reduce_for_bwd_pass(self): 通信工作 = [] 以与 Reducer 安排相同的顺序调度通信,即 从 reducer 中检索桶的顺序 确保在连接模式下保持相同的顺序,例如在动态重建顺序时 # 顺序 返回按顺序排列的 grad_buckets,但用实际的张量替换了 同形状的零张量 梯度桶 = self.简化器._get_zeros_like_grad_buckets() 梯度桶 grad_buckets 梯度桶: 进程加入不贡献梯度。在情况下 # divide_by_initial_world_size=True, 我们将梯度除以初始世界大小 世界大小,如果没有,则除数减少 参与进程的数量。 工作 = self.简化器.运行通信钩子。(累加桶。) 通信工作。.追加(工作) 工作 comm_work: 工作.等待() 所有进程都减少了跨 rank 使用的参数映射。 def _match_unused_params_allreduce(self): 本地使用的参数映射 = self.简化器.获取本地已用地图() self.进程组.allreduce(本地已用参数地图)
[文档] def 加入( self, 除以初始世界大小: 布尔类型 = True, 启用: 布尔类型 = True, 抛出早期终止: 布尔类型 = 错误, ): r"" DDP 中处理进程间输入不均匀的上下文管理器。 此上下文管理器将跟踪已加入的 DDP 进程, 并通过插入集体通信操作来“阴影”前向和反向传递,以匹配非加入进程创建的操作。 这样可以确保每个集体调用都有一个相应的操作。 这将确保每个集体调用都有一个相应的操作。 通过已连接的 DDP 进程调用,防止因进程间输入不均匀而导致的挂起或错误 在训练过程中,否则可能会发生此类错误 如果将标志`throw_on_early_termination`指定为`True`,一旦有一个 rank 抛出错误,所有训练器都将抛出错误 输入用尽,允许捕获和处理这些错误 根据应用逻辑。 一旦所有 DDP 进程都已加入,上下文管理器将广播 模型对应于最后加入的所有进程 确保所有进程中的模型相同 (由 DDP 保证)。 要使用此方法启用进程间输入不均匀的训练, 简单地将此上下文管理器包装在您的训练循环中。无需进一步 模型或数据加载需要修改。 .. 警告:: 如果该上下文管理器包装的模型或训练循环 具有额外的分布式集体操作,例如 模型前向传播中的 `SyncBatchNorm`,然后设置标志 必须启用 `throw_on_early_termination`。这是因为这 上下文管理器不了解非 DDP 集体通信。 此标志将导致所有等级在任何一个等级抛出异常 耗尽输入,允许捕获并恢复这些错误 来自所有等级。 Args: divide_by_initial_world_size (bool): 如果 ``True``,将根据初始 ``world_size`` 分割梯度。DDP 训练是以该 ``world_size`` 启动的。如果 ``False``,将计算有效世界大小。 梯度将根据初始 ``world_size`` 进行分割。DDP 训练是以该 ``world_size`` 启动的。 如果 ``False``,将计算有效世界大小。 (尚未耗尽输入的排名数量)和 将梯度在整个 allreduce 过程中除以那个值。设置 将输入的文本翻译为简体中文如下: ``按初始世界大小除=True`` 以确保每个输入 样本中不均匀的输入在权重上相等 他们对全球梯度贡献了多少。这是 通过始终将梯度除以初始值来实现的 即使遇到不均匀的输入,也会这样做。如果您将其设置为“False”,则将梯度除以剩余的 这将导致梯度被除以剩余的值 节点数量。这确保了与较小数据集训练的均衡性。 ``world_size`` 虽然这也意味着不均匀的输入会更多地影响全局梯度。 通常情况下,您希望将此设置为 ``True``,以便在最后几个样本对全局梯度影响更大的情况下。 您可能希望将其设置为 ``True``,以应对此类情况。 您的训练作业输入不均匀。在极端情况下,当输入数量差异很大时,将其设置为 ``False`` 可能会提供更好的结果。 启用(布尔值):是否启用不均匀输入检测。传递 该参数为 ``False`` 时,将关闭不均匀输入检测。 enable(布尔值):是否启用不均匀输入检测。 在 ``enable=False`` 中禁用,在你知道的情况下 输入在参与进程中均匀分布。默认为 ``True``,则进行交换。 抛出早期终止错误(布尔值):是否抛出错误 或至少有一个等级耗尽时继续训练 输入。如果为 ``True``,则在第一个排名到达数据末尾时抛出异常 。如果为 ``False``,则使用更小的有效世界大小继续训练,直到所有排名都加入 。注意,如果指定了此标志,则该标志 ... ``divide_by_initial_world_size`` 将被忽略。默认 是 ``False``。 示例:: >>> # xdoctest: +SKIP("分布式") >>> 导入 torch >>> 导入 torch.distributed 作为 dist >>> 导入 os >>> 导入 torch.multiprocessing 模块 >>> 从 torch.nn 导入 nn >>> # 在每个创建的进程中 >>> def worker(rank): >>> dist.init_process_group("nccl", rank=rank, world_size=2) >>> torch.cuda.set_device(rank) >>> model = nn.Linear(1, 1, bias=False).to(rank) >>> model = torch.nn.parallel.DistributedDataParallel( >>> model, device_ids=[rank], output_device=rank >>> ) >>> # 排名 1 比排名 0 多一个输入。 >>> inputs = [torch.tensor([1]).float() for _ in range(10 + rank)] >>> with model.join(): >>> for _ in range(5): >>> for inp in inputs: >>> 损失 = 模型(inp).sum() >>> 损失.backward() >>> # 没有使用 join() API,下面的同步将会挂起 >>> # 等待 rank 1 的 allreduce 完成。 >>> torch.cuda.synchronize(device=rank) "源代码" 返回 加入( [self] 启用, 抛出早期终止, divide_by_initial_world_size=divide_by_initial_world_size, )
[文档] def join_hook( self, **kwargs, ): r""" DDP 加入钩子通过在正向和反向传递中镜像通信,使训练能够在不均匀的输入上进行。 参数: kwargs (dict): 一个包含任何关键字段的 :class:`dict` 修改运行时连接钩子的行为;所有 :类:`Joinable` 实例共享相同的连接上下文 管理者将相同的值转发给 `kwargs`。 钩子支持以下关键字参数: divide_by_initial_world_size (bool, 可选): 如果为 ``True``,则梯度将除以 DDP 启动时的初始世界大小。 如果为 ``False``,则梯度将除以有效世界大小。 如果为 ``False``,则梯度将除以有效世界大小。 大小(即非连接进程的数量),意味着 不均匀的输入对全局梯度贡献更大。 通常,如果程度为“True” 不均匀性很小,但在极端情况下可以设置为 ``False`` 可能获得更好结果的案例。 默认为 ``True``。 """ divide_by_initial_world_size = kwargs.get("divide_by_initial_world_size", True) return _DDPJoinHook( self, divide_by_initial_world_size=divide_by_initial_world_size )
@property def 连接设备(self): 返回
self.设备 @property def 加入处理组(self): 返回 self.流程组 def _register_buffer_comm_hook( self, 状态, hook: 可调用, 沉浸式翻译位置=_缓冲通信钩子位置.POST_FORWARD, ): r"" 允许自定义注册钩子,该钩子定义了如何在各个进程间同步缓冲区。 钩子接收一个可选的状态,并以 Dict[str, Tensor] 的形式传递。 对应缓冲区名称和缓冲区,可以运行任意 reductions 相比于 DDP 的默认从 rank 0 广播,这在以下情况下很有用: 例如,如果计数器需要在每次迭代中跨 rank 求和或平均。 Args: 状态(Any):传递给钩子的可选状态。 hook (Callable): 具有以下签名的可调用对象: ``hook(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]`` comm_hook_location (_BufferCommHookLocation): 表示运行钩子位置的枚举值。 where to run the hook. `_BufferCommHookLocation.PRE_FORWARD` 表示在转发之前执行钩子, 钩子将在转发之前运行,并且 `_BufferCommHookLocation.POST_FORWARD` 表示在转发之后执行钩子, 钩子将在转发之后运行。 注意:为了最大化性能,用户可以返回 从他们的钩子中获取 List[torch.futures.Future],并且 DDP 将 安装并适当在末尾等待这些钩子 反向传播。这将确保所有缓冲区都 同步至反向遍历结束。如果此 设置被使用时,建议传递 comm_hook_location = _BufferCommHookLocation.POST_FORWARD, 这将触发前向传递后的钩子。 如果使用 _BufferCommHookLocation.PRE_FORWARD,用户必须 确保在正向传递中操作 GPU 缓冲区时的适当同步。 在正向传递中操作 GPU 缓冲区时。 "源代码" 断言 可调用(hook) self.缓冲区钩子 = 缓冲区通信钩子( 缓冲区通信钩子=hook, 缓冲区通信钩子状态=状态, 缓冲区通信钩子位置=comm_hook_location, )
[文档] def 注册通信钩子(self, 状态: 对象, hook: 可调用): r"" 注册用户定义的跨多个工作进程的 DDP 梯度聚合的通信钩子。这将非常有用,研究人员可以借此尝试新想法。例如,此钩子可以用来实现诸如 GossipGrad 等算法。 此钩子对于研究人员尝试新想法非常有用。例如,此钩子可以用来实现诸如 GossipGrad 等算法。 此钩子可以用来实现诸如 GossipGrad 等算法。 并且涉及不同的通信策略的梯度压缩, 在运行分布式数据并行训练时的参数同步。 Args: 状态(对象):传递给钩子以在训练过程中维护任何状态信息。 例如,在梯度压缩中的错误反馈等。 在 GossipGrad 中与下一个通信的节点等。 每个工作节点本地存储 并由工作节点上的所有梯度张量共享。 hook(可调用函数):具有以下签名的可调用函数: ``hook(state: 对象, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]``: 此函数在桶准备好后被调用。该 hook 可以执行所需的任何处理并返回 一个表示任何异步工作(例如:allreduce)完成的未来 如果钩子不执行任何通信,它仍然 必须返回一个完成的 Future。Future 应包含 新的 grad bucket 张量的值。一旦桶准备好, c10d reducer 会调用此钩子并使用返回的张量 通过 Future 并将梯度复制到各个参数中。 注意,Future 的返回类型必须是一个单独的张量。 我们还提供了一个名为 ``get_future`` 的 API 来检索一个 与`c10d.ProcessGroup.Work`完成相关的未来。 `get_future`目前支持 NCCL,也支持 GLOO 和 MPI 上的大多数操作。 除了点对点操作(发送/接收)之外。 ..警告:: Grad bucket 的 tensor 不会由 world_size 预先划分。用户负责。 在进行 allreduce 等操作时,需要除以 world_size。 ..警告:: DDP 通信钩子只能注册一次,并且应该在调用 backward 之前注册。 钩子返回的 Future 对象应包含单个张量。 ..警告:: 钩子返回的 Future 对象应包含单个张量。 与 grad bucket 内部的张量具有相同形状。 ..警告:: ``get_future`` API 支持 NCCL,部分支持 GLOO 和 MPI 后端(不支持像 send/recv 这样的对等操作)并将返回一个``torch.futures.Future``。 下面是一个返回相同张量的空操作 hook 示例。 示例:: 以下是一个返回相同张量的空操作 hook 示例。 >>> # xdoctest: +SKIP('undefined name') >>> def 无操作(state: object, 桶: dist.GradBucket) -> torch.futures.Future[torch.Tensor]: >>> fut = torch.futures.Future() >>> fut.set_result(桶.buffer()) >>> 返回 fut >>> ddp.register_comm_hook(state=None, hook=noop) 示例:: 下面是一个并行 SGD 算法的示例,其中梯度在 allreduce 之前进行编码,然后在 allreduce 之后进行解码。 所有梯度在 allreduce 之前进行编码,然后在 allreduce 之后进行解码。 >>> # xdoctest: +SKIP('undefined name') >>> 定义 encode_and_decode 函数,参数为 state 对象和 bucket 类型的 GradBucket,返回 torch.futures.Future[torch.Tensor] 类型的 Future 对象 >>> 对 bucket.buffer() 编码后的梯度进行编码 >>> fut = torch.distributed.all_reduce(encoded_tensor).get_future() >>> # 定义解码的回调函数。 >>> def decode(fut): >>> decoded_tensor = decode(fut.value()[0]) # 解码梯度 >>> return decoded_tensor >>> 返回 fut.then(decode) >>> ddp.register_comm_hook(state=None, hook=encode_and_decode) "源代码" self._check_comm_hook(hook) 断言 self.日志记录器 not self.记录器._set_comm_hook_name(hook.__qualname__) self._comm_hooks.追加((hook, 状态)) 距离.注册通信钩子(self.简化器, 状态, hook)
def 注册内置通信钩子(self, comm_hook_type): r"" 注册一个内置的通信钩子,该钩子指定了 DDP 如何在多个工作者之间聚合梯度。 内置钩子的目的是提供某些钩子的高效 C++实现。 如果使用 Python 通信钩子实现,可能不会那么高效。 Args: comm_hook_type (dist.BuiltinCommHookType):通信钩子类型,例如 ALLREDUCE、FP16_COMPRESS 等。 ..警告:: DDP 通信钩子只能注册一次,应该在调用 backward 之前注册。 在调用 backward 之前。 示例:: 以下是 FP16 压缩的一个示例,其中梯度是 压缩为 16 位浮点数后再进行 allreduce 操作,并 然后在对所有 reduce 操作完成后进行解压缩。 >>> # xdoctest: +SKIP('未定义的名称') >>> ddp._register_builtin_comm_hook(dist.BuiltinCommHookType.FP16_COMPRESS) "源代码" 断言 self.日志记录器 not self.记录器._set_comm_hook_name(字符串(comm_hook_type)) 距离.注册内置通信钩子(self.简化器, comm_hook_type) def 注册融合优化器(self, 优化: 类型, *参数, 优化器参数=, **kwargs): r"" 在 DDP 中注册一个优化器,以便在梯度缩减后立即优化参数。 将优化器与 DDP 注册,以便对 参数将在该参数的梯度运行立即 完成缩减,而不是等待所有参数 梯度完成缩减。这可能导致训练速度加快 根据您的工作负载,因为优化器可以在梯度计算时运行 其他参数的优化仍在进行中。此外,它还有潜力在训练期间降低峰值内存消耗,因为它只需要一次性加载单个参数的优化器状态,而不是加载所有参数的优化器状态。 这样可以降低训练过程中的峰值内存消耗,因为它只需要一次性加载单个参数的优化器状态,而不是加载所有参数的优化器状态。 只需一次性加载单个参数的优化器状态,而不是加载所有参数的优化器状态,因此具有降低峰值内存消耗的潜力。 只需一次性加载单个参数的优化器状态,而不是加载所有参数的优化器状态,从而降低了训练过程中的峰值内存消耗。 同时翻译多个状态。 Args: optim(类型):一个用于注册的 ``torch.optim.Optimizer`` 类。 作为融合优化器。 *args(序列[任何类型]):传递给 `optim` 的参数。 optim_params (Optional[Iterable[torch.Tensor]]): 需要优化的参数集合,类似于传统 `torch.optim` 的 `params` 参数 要优化的,类似于传统 `torch.optim` 的 `params` 参数 优化器。如果省略,则所有 DDP 模型参数将被 优化。 **kwargs: (字典[str, 任意]): 将关键字参数传递给 `optim`。 ..警告:: _register_fused_optim 应仅在 DDP 实例上调用一次 注册多个针对同一 DDP 模型的融合优化器 当前不支持。请 ping https://github.com/pytorch/pytorch/issues/71595 如果这是必要的 适用于您的用例。 ..警告:: _register_fused_optim 和 register_comm_hook 目前无法 组合在一起,这意味着自定义 DDP 通信钩子 不支持重叠优化器。请提醒 如果这是必要的,请访问 https://github.com/pytorch/pytorch/issues/71595 以满足您的用例。 ..警告:: 目前不支持梯度累积和 DDP 的`no_sync` 与重叠的优化器一起。请 ping 如果这是必要的,请访问 https://github.com/pytorch/pytorch/issues/71595 以满足您的用例。 示例:: >>> # xdoctest: +SKIP("没有 rendezvous 处理器") >>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...') >>> net = torch.nn.parallel.DistributedDataParallel(model, pg) >>> lr = 1e-2 >>> betas = (0.9, 0.99) >>> eps = 1e-6 >>> net._register_fused_optim(torch.optim.Adam, lr, betas=beta, eps=eps) >>> # 示例使用参数子集 >>> params_to_opt = [list(net.parameters())[0]] >>> net._register_fused_optim( ... torch.optim.Adam, lr, optim_params=params_to_opt, betas=betas, eps=eps ... ) "源代码" # 注意:在函数中导入,否则这将会导致循环引用 # 导入作为优化器重叠模块需要导入 DistributedDataParallel。 来自 torch.distributed.algorithms._optimizer_overlap 导入 _as_overlapped_optim overlapped_optim = _重叠优化(优化, 优化参数, *参数, **kwargs) try: 重叠优化.注册分布式数据并行(self) 除了 未实现异常 作为 e: 提升 运行时错误( f"{优化}不支持重叠的 DDP。请向 PyTorch 或相应所有者提交问题。{优化} ) 来自 e def _分布式广播合并( self, 张量, 缓冲区大小, 权威性 rank=0 ): 距离._广播合并_( self.进程组, 张量, 缓冲区大小, 权威排名 ) def _检查同步缓冲区后转发(self): 返回 ( self.将同步模块缓冲区() 有属性(self, 缓冲区钩子) self.缓冲区钩子.缓冲区通信钩子位置 == _缓冲通信钩子位置.后转发 ) def _check_sync_bufs_pre_fwd(self): 返回 self.将同步模块缓冲区() ( not 有属性(self, 缓冲区钩子) self.缓冲区钩子.缓冲区通信钩子位置 == _缓冲通信钩子位置.预转发 ) def 将同步模块缓冲区(self): 返回 ( self.require_forward_param_sync self.广播缓冲区 长度(self.modules_buffers) > 0 ) def _find_common_rank(self, input_rank, 排名条件): # -1 表示此排名不在考虑范围内 # 常见排名 rank_to_use 排名使用 = 火炬.张量( [输入排名 如果 排名条件 否则 -1] 设备=self.设备, ) 距离.all_reduce(使用的排名, 操作=ReduceOp.MAX, 群组=self.进程组) 如果 使用排名.项目() == -1: self._log_and_throw( ValueError, "BUG!期望至少有一个进程的 rank_cond 为 true。" "这表明 PyTorch 中存在一个 bug,请报告问题。", ) 返回 使用排名.项目() def 同步缓冲区(self): 火炬.不梯度(): # 模块缓冲同步 # 在进程间同步缓冲区。 # 如果我们在使用 DDP 与 join manager,我们必须同意 # 从哪个 rank 同步模块缓冲区,因为 rank 0 可能 已连接并具有过时的模块缓冲区。 如果 self._join_config.启用: 权威排名 = self._find_common_rank( self.分布式排名, 真实 ) 否则: # 排名 0 的过程被认为是权威副本。 权威排名 = 0 # 更新 self.modules_buffers,以防有任何缓冲区被 #重新分配。 self._分配模块缓冲区() self._同步模块缓冲区(权威排名) def 同步模块缓冲区(self, 权威排名): 如果 not 有属性(self, 缓冲区钩子): self.默认广播合并(权威排名=权威排名) 否则: 钩子 = self.缓冲区钩子.缓冲区通信钩子 状态 = self.缓冲区钩子.缓冲区通信钩子状态 期货 = hook(状态, self.命名模块缓冲区) 如果 期货 not : self.简化器.安装后向未来(futs) def _default_broadcast_coalesced( self, 缓冲区=, 桶大小=, authoritative_rank=0 ): "" 从 rank 0 广播缓冲区到其他工作进程。 如果 bufs、bucket_size 为 None,则使用默认值 self.modules_buffers 和 self.broadcast_bucket_size 代替。 "源代码" 如果 缓冲区 : 缓冲区 = self.modules_buffers 如果 桶大小 : 桶大小 = self.广播桶大小 self._分布式广播合并(缓冲区, 桶大小, 权威排名) def _传递同步批归一化处理(self, 模块): 模块.模块(): 如果 isinstance(, 火炬.nn.模块.同步批量归一化): 如果 self.设备类型 == "cpu": self._log_and_throw( ValueError, "同步批量归一化层仅与 GPU 模块一起工作", ) def _check_comm_hook(self, hook): 如果 not 可调用(hook): self._log_and_throw(类型错误, "通信钩子必须可调用。") sig = 检查.签名(hook) 如果 ( 签名.参数[].注释 != 检查._空_ 签名.参数[].注释 != 距离.毕业桶 ): self._log_and_throw( ValueError, "通信钩子:桶注释应为 dist.GradBucket。", ) 如果 ( 签名.返回注解 != 检查. 签名.返回注解 != 火炬.期货.未来[火炬.张量] ): self._log_and_throw( ValueError, 通信钩子:返回注释应为 torch.futures.Future[torch.Tensor]。, ) 如果 hook.__name__ ["bf16 压缩钩子", "bf16 压缩包装钩子"]: 支持 CUDA = ( 火炬.版本.cuda not ) 火炬.版本.hip not 支持 NCCL = ( 距离.是否可用() 距离.检查 NCCL 是否可用() 火炬.cuda.nccl.版本() >= (2, 10) ) xpu_xccl 支持 = ( 距离.是否可用() 距离.是否支持 xccl() 火炬.xpu.是否可用() ) 如果 not ((支持 CUDA 支持 NCCL) 支持 XPU 和 XCCL): self._log_and_throw( 类型错误, 需要 BF16 all reduce 通信钩子,要求 CUDA 11+和 NCCL 2.10+或 XPU 和 XCCL, ) @property def 分布式排名(self): 返回 距离.获取排名(self.进程组) @staticmethod def 获取数据并行参数(模块, 命名参数=错误): 返回由给定 DDP 单元管理的参数生成器。 参数 ( 模块.参数() 如果 not 命名参数 否则 模块.命名参数。() ): 如果 not 有属性(参数, _ddp_ignored): 产生 参数 @staticmethod def _设置模型忽略的参数和缓冲区( 模块, 忽略的参数和缓冲区 ): "" 设置要由 DDP 忽略的参数和缓冲区。 参数的预期格式为完全限定名:{module_name}.{param_name},并且 类似地,{module_name}.{buffer_name}用于缓冲区。例如: params_to_ignore = [] # NB: 模型此处为纯 PyTorch 模块,尚未使用 DDP 进行封装。 for module_name, module in model.named_modules(): for param_name, param in module.named_parameters(recurse=False): if should_ignore(param): 创建预期格式 fqn = f"{module_name}.{param_name}" params_to_ignore.append(fqn) torch.nn.parallel.DistributedDataParallel._set_params_and_buffers_to_ignore_for_model( 模型, 忽略的参数 ) "源代码" 这是一个绕过方法,用于设置 DDP 应忽略的参数和缓冲区 在同步期间。当 API 最终确定后,它将被删除 作为解决 https://github.com/pytorch/pytorch/issues/43690 的一部分 模块.忽略的_ddp 参数和缓冲区 = 忽略的参数和缓冲区 名称, 参数 模块.命名参数。(): 如果 名称 要忽略的参数和缓冲区: 参数._ddp_忽略的 = 真实 名称, 缓冲区 模块.命名缓冲区(): 如果 名称 要忽略的参数和缓冲区: 缓冲区._ddp_ignored = 真实 def _get_ddp_logging_data(self): r"" 返回用于调试和分析的日志数据字典。 在调用 DistributedDataParallel() 之后,可以调用此接口。 构造函数返回一个日志数据字典。这有助于 调试和分析。日志数据包括 DistributedDataParallel 构造函数的输入参数,DistributedDataParallel 的一些内部状态 以及性能指标。简单打印字典,看看 这些指标是。 这是一个原型界面,未来可能会有所变化。 "源代码" 断言 self.日志记录器 not ddp_logging_data = self.记录器._get_ddp_logging_data() 返回 {**ddp 日志数据.strs 映射, **ddp 日志数据.ints 映射} def 设置 DDP 运行时日志采样率(self, 采样率): r"" 设置收集运行时统计信息的采样率。 此接口允许用户设置收集采样率。 运行时统计信息。运行时统计信息将在前 10 次迭代中记录, 之后每 10 次迭代记录一次。默认情况下,运行时统计信息将记录前 10 次迭代, 每"sample_rate"次训练迭代记录一次。在默认情况下, 运行时统计信息将记录前 10 次迭代, 经过 10 次迭代后,运行时统计信息每进行一次记录。 "kDDPRuntimeLoggingSampleRate=100" 训练迭代次数。 这是一个原型界面,未来可能会有所变化。 "源代码" 如果 采样率 < 1: self._log_and_throw( ValueError, DDP 运行时日志采样率应等于或大于 1, ) self.简化器.设置 DDP 运行时日志采样率(采样率) def _设置静态图(self): "" 为 DDP 设置静态图。 建议在 DDP 构造函数中设置静态图, 在内部调用此私有 API。 "源代码" 如果已设置 self.static_graph,则无需再次设置。 如果 self.静态图: warnings.警告( 您已将 static_graph 设置为 True,无需再次设置。 ) 返回 self.静态图 = 真实 self._static_graph_delay_allreduce_enqueued = self.简化器._设置静态图() 断言 self.日志记录器 not self.记录器._设置静态图() 如果 self.find_unused_parameters: warnings.警告( 您已将 find_unused_parameters=true 传递给 DistributedDataParallel。 "`_set_static_graph` 将自动检测未使用的参数,因此 " "您不需要设置 find_unused_parameters=true,只需确保这些 " "未使用的参数在调用 `_set_static_graph` 的训练循环期间不会改变 " "`_set_static_graph`。" ) def 移除自动求导钩子(self): "移除由 reducer 注册到模型参数上的 autograd 钩子。" self.简化器._移除 autograd 钩子() def _检查 reducer 是否已最终化(self): "" 检查 reducer 是否已处理所有桶并适当地最终化反向操作。 在训练循环中调用 .backward() 方法之后调用此方法是有用的 以避免由于 reducer 未最终化而导致的后续难以调试的错误 在未来的路上 "源代码" self.简化器._check_reducer_finalized() def 设置稀疏元数据(self, 全局唯一标识符): self.简化器.设置稀疏元数据(全局唯一标识符) def 更新进程组(self, 新进程组): "" 动态更新 DDP 的过程组,以便我们可以缩小/扩展 DDP 无需重新初始化 DDP 的全局大小 注意:如果您正在使用通过 register_comm_hook 注册的自定义通信钩子, 您需要分别更新那些钩子的进程组。 "源代码" 强制重建新进程组的存储桶。这确保了所有进程的编号 # 在重建桶的时间上保持同步 # 重新评估基于世界大小可能产生的桶的先前假设 # 已更改。 self._has_rebuilt_buckets = self.简化器.重置状态() 如果 not _rank_not_in_group(新建进程组): self.流程组 = 新建进程组 self.简化器.更新进程组(新进程组) def _设置 ddp_sink_clone(self, val: 布尔): "" 设置 DDPSink 是否应该克隆输出张量。 默认为 True,因为如果损失在原地修改,我们运行 视图中的修改是在原地发生的错误。 尽管克隆张量可以增加显著的内存和 性能下降,如果张量的数量和大小很大。 结果,这可以设置为 False,如果您不是在原地修改 损失。 "源代码" self._ddp_sink_clone = val

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源