• 文档 >
  • 模块代码 >
  • torch >
  • torch.distributed >
  • torch.distributed.algorithms.ddp_comm_hooks.default_hooks
快捷键

torch.distributed.algorithms.ddp_comm_hooks.default_hooks 的源代码

# mypy: 允许未类型化定义
来自 打字 导入 任意, 可调用, 角色

导入 火炬
导入 torch.distributed 作为 dist


__all__ = [
    allreduce_hook,
    fp16_compress_hook,
    "bf16 压缩钩子",
    "fp16 压缩包装器",
    "bf16 压缩包装器",
]


def _allreduce_fut(
    进程组: 距离.流程组, 张量: 火炬.张量
) -> 火炬.期货.未来[火炬.张量]:
    "通过 allreduce 平均输入梯度张量并返回一个 future。"
    要使用的组 = 流程组 如果 流程组  not None 否则 距离.群组.WORLD

    首先应用除法以避免溢出,尤其是对于 FP16。
    张量.div_(使用组.尺寸())

    返回 (
        距离.all_reduce(张量, 群组=使用组, async_op=True)
        .获取未来()
        .然后(lambda fut: fut.()[0])
    )


[文档]def allreduce_hook( 进程组:dist.ProcessGroup,桶:dist.GradBucket ) -> torch.futures.Future[torch.Tensor]: "" 使用 `GradBucket` 张量调用 `allreduce`。 当梯度张量在所有工作者之间聚合后,其 `then` 回调函数取平均值并返回结果。 如果用户注册此 DDP 通信钩子, 预期 DDP 结果将与未注册钩子的情况相同。 因此,这不会改变 DDP 的行为,用户可以将此作为参考 或修改此钩子以记录有用的信息或任何其他目的,同时 不影响 DDP 行为。 示例:: >>> # xdoctest: +SKIP >>> ddp_model.register_comm_hook(process_group, allreduce_hook) "``" 返回_allreduce_fut(process_group, bucket.buffer())
def _compress_hook( 数据类型
: 火炬.数据类型, 进程组: 距离.流程组, : 距离.梯度桶, ) -> 火炬.期货.未来[火炬.张量]: 要使用的组 = 流程组 如果 流程组 not None 否则 距离.群组.WORLD 世界大小 = 使用组.尺寸() 缓冲区 = ( 角色(元组[火炬.张量, ...] )0] 如果 isinstance(, 元组) 否则 .缓冲区() ) compressed_tensor = 缓冲区.(数据类型).div_(世界大小) def 解压缩(fut): 解压缩张量 = 缓冲区 # 在原地解压缩以减少峰值内存。 # 参考:https://github.com/pytorch/pytorch/issues/45968 value = fut 如果 isinstance(fut, 火炬.张量) 否则 fut.()[0] 解压缩张量.复制_() 返回 解压缩张量 如果 火炬.编译器.is_compiling(): 梯度 = 距离._功能性集体.all_reduce( 压缩张量, "求和", 要使用的组 ) 返回 解压缩(研究生) 否则: fut = 距离.all_reduce( 压缩张量, 群组=使用组, async_op=真实 ).获取未来() 返回 fut.然后(解压缩)
[文档]def fp16_compress_hook( process_group: dist.ProcessGroup, bucket: dist.GradBucket, ) -> torch.futures.Future[torch.Tensor]: """ 通过将 `GradBucket` 转换为 `torch.float16` 并除以进程组大小来压缩。 此 DDP 通信钩子实现了一种简单的梯度压缩方法 该方法将 `GradBucket` 张量转换为半精度浮点格式(`torch.float16`) 然后除以进程组大小。 所有这些 `float16` 梯度张量都被压缩了。 所有压缩后的张量都被 allreduced 后,链式回调 `decompress` 会将其转换回输入数据类型(例如 `float32`)。 示例:: >>> # xdoctest: +SKIP >>> ddp_model.register_comm_hook(process_group, fp16_compress_hook) """ return _compress_hook(torch.float16, process_group, bucket)
[文档]def bf16_compress_hook( process_group: dist.ProcessGroup, bucket: dist.GradBucket, ) -> torch.futures.Future[torch.Tensor]: """ 警告:此 API 为实验性,需要 NCCL 版本晚于 2.9.6。 此 DDP 通信钩子实现了一种简单的梯度压缩方法 该方法将`GradBucket`张量转换为半精度 `Brain 浮点格式 `_(`torch.bfloat16`) 然后将其除以进程组大小。 所有这些 `bfloat16` 梯度张量都会被压缩。一旦所有梯度张量都被压缩,链式回调 `decompress` 就会将它转换回输入数据类型(例如 `float32`)。 张量被 allreduced 后,链式回调`decompress`将其转换回输入数据类型(如`float32`)。 示例:: >>> # xdoctest: +SKIP >>> ddp_model.register_comm_hook(process_group, bf16_compress_hook) """ return _compress_hook(torch.bfloat16, process_group, bucket)
[文档]def fp16_compress_wrapper( hook: 可调用[[Any, dist.GradBucket], torch.futures.Future[torch.Tensor]], ) -> 可调用[[Any, dist.GradBucket], torch.futures.Future[torch.Tensor]]: """ 将输入张量转换为 `torch.float16`,将钩子结果的张量转换回输入数据类型。 此包装器将给定 DDP 通信钩子的输入梯度张量转换为半精度浮点格式(`torch.float16`),并将给定钩子的结果张量转换回输入数据类型,例如 `float32`。 浮点格式(`torch.float16`),并将给定钩子的结果张量转换回输入数据类型,例如 `float32`。 类型,例如 `float32`。 因此,`fp16_compress_hook` 等同于 `fp16_compress_wrapper(allreduce_hook)`。 示例:: >>> # xdoctest: +SKIP >>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10) >>> ddp_model.register_comm_hook(state, fp16_compress_wrapper(powerSGD_hook)) """ def fp16_compress_wrapper_hook( hook_state, bucket: dist.GradBucket () -> torch.futures.Future[torch.Tensor] # 将桶张量转换为 FP16。 bucket.set_buffer(bucket.buffer().to(torch.float16)) fut = hook(hook_state, bucket) def decompress(fut): decompressed_tensor = bucket.buffer() # 在原地解压缩以减少峰值内存。 # 参考:https://github.com/pytorch/pytorch/issues/45968 decompressed_tensor.copy_(fut.value()) return decompressed_tensor # Decompress after hook has run. return fut.then(decompress) 返回 fp16_compress_wrapper_hook
[文档]def bf16_compress_wrapper( hook: 可调用[[Any, dist.GradBucket], torch.futures.Future[torch.Tensor]] ) -> 可调用[[Any, dist.GradBucket], torch.futures.Future[torch.Tensor]]: """ 警告:此 API 为实验性,需要 NCCL 版本晚于 2.9.6。 此包装器将给定 DDP 通信钩子的输入梯度张量转换为半精度 `脑浮点格式 `_ (``torch.bfloat16``), 将给定钩子的结果张量转换回输入数据类型,例如 `float32`。 因此,`bf16_compress_hook` 等同于 `bf16_compress_wrapper(allreduce_hook)`。 示例:: >>> # xdoctest: +SKIP >>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10) >>> ddp_model.register_comm_hook(state, bf16_compress_wrapper(powerSGD_hook)) """ def bf16_compress_wrapper_hook( hook_state, 桶: dist.GradBucket ) -> torch.futures.Future[torch.Tensor]: # 将桶张量转换为 BF16。 桶.set_buffer(桶.buffer().to(torch.bfloat16)) fut = hook(hook_state, bucket) def decompress(fut): decompressed_tensor = bucket.buffer() # Decompress in place to reduce the peak memory. # See: https://github.com/pytorch/pytorch/issues/45968 解压缩 tensor.copy_(fut.value()) 返回解压缩后的 tensor # 解压缩在钩子运行之后 返回 fut.then(decompress) 返回 bf16_compress_wrapper_hook

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源