# mypy: 允许未类型化定义
来自
打字
导入
任意,
可调用,
角色
导入
火炬
导入 torch.distributed
作为 dist
__all__ = [
allreduce_hook,
fp16_compress_hook,
"bf16 压缩钩子",
"fp16 压缩包装器",
"bf16 压缩包装器",
]
def _allreduce_fut(
进程组:
距离.
流程组,
张量:
火炬.
张量
) -> 火炬.
期货.
未来[
火炬.
张量
]:
"通过 allreduce 平均输入梯度张量并返回一个 future。"
要使用的组 =
流程组
如果
流程组
是 not None
否则
距离.
群组.WORLD
首先应用除法以避免溢出,尤其是对于 FP16。
张量.div_(
使用组.
尺寸())
返回 (
距离.all_reduce(
张量,
群组=
使用组, async_op=True)
.获取未来()
.然后(lambda fut: fut.
值()[0])
)
[文档]def allreduce_hook(
进程组:dist.ProcessGroup,桶:dist.GradBucket
) -> torch.futures.Future[torch.Tensor]:
""
使用 `GradBucket` 张量调用 `allreduce`。
当梯度张量在所有工作者之间聚合后,其 `then`
回调函数取平均值并返回结果。
如果用户注册此 DDP 通信钩子,
预期 DDP 结果将与未注册钩子的情况相同。
因此,这不会改变 DDP 的行为,用户可以将此作为参考
或修改此钩子以记录有用的信息或任何其他目的,同时
不影响 DDP 行为。
示例::
>>> # xdoctest: +SKIP
>>> ddp_model.register_comm_hook(process_group, allreduce_hook)
"``"
返回_allreduce_fut(process_group, bucket.buffer())
def _compress_hook(
数据类型:
火炬.
数据类型,
进程组:
距离.
流程组,
桶:
距离.
梯度桶,
) -> 火炬.
期货.
未来[
火炬.
张量
]:
要使用的组 =
流程组
如果
流程组
是 not None
否则
距离.
群组.WORLD
世界大小 =
使用组.
尺寸()
缓冲区 = (
角色(
元组[
火炬.
张量, ...
]
桶
)0]
如果 isinstance(
桶,
元组)
否则
桶.
缓冲区()
)
compressed_tensor = 缓冲区.
到(
数据类型).div_(
世界大小)
def 解压缩(fut):
解压缩张量 =
缓冲区
# 在原地解压缩以减少峰值内存。
# 参考:https://github.com/pytorch/pytorch/issues/45968
value = fut 如果 isinstance(fut,
火炬.
张量)
否则 fut.
值()[0]
解压缩张量.
复制_(
值)
返回
解压缩张量
如果
火炬.
编译器.is_compiling():
梯度 =
距离.
_功能性集体.all_reduce(
压缩张量,
"求和",
要使用的组
)
返回
解压缩(
研究生)
否则:
fut = 距离.all_reduce(
压缩张量,
群组=
使用组, async_op=
真实
).获取未来()
返回 fut.
然后(
解压缩)
[文档]def fp16_compress_hook(
process_group: dist.ProcessGroup,
bucket: dist.GradBucket,
) -> torch.futures.Future[torch.Tensor]:
"""
通过将 `GradBucket` 转换为 `torch.float16` 并除以进程组大小来压缩。
此 DDP 通信钩子实现了一种简单的梯度压缩方法
该方法将 `GradBucket` 张量转换为半精度浮点格式(`torch.float16`)
然后除以进程组大小。
所有这些 `float16` 梯度张量都被压缩了。
所有压缩后的张量都被 allreduced 后,链式回调 `decompress` 会将其转换回输入数据类型(例如 `float32`)。
示例::
>>> # xdoctest: +SKIP
>>> ddp_model.register_comm_hook(process_group, fp16_compress_hook)
"""
return _compress_hook(torch.float16, process_group, bucket)
[文档]def bf16_compress_hook(
process_group: dist.ProcessGroup,
bucket: dist.GradBucket,
) -> torch.futures.Future[torch.Tensor]:
"""
警告:此 API 为实验性,需要 NCCL 版本晚于 2.9.6。
此 DDP 通信钩子实现了一种简单的梯度压缩方法
该方法将`GradBucket`张量转换为半精度
`Brain 浮点格式 `_(`torch.bfloat16`)
然后将其除以进程组大小。
所有这些 `bfloat16` 梯度张量都会被压缩。一旦所有梯度张量都被压缩,链式回调 `decompress` 就会将它转换回输入数据类型(例如 `float32`)。
张量被 allreduced 后,链式回调`decompress`将其转换回输入数据类型(如`float32`)。
示例::
>>> # xdoctest: +SKIP
>>> ddp_model.register_comm_hook(process_group, bf16_compress_hook)
"""
return _compress_hook(torch.bfloat16, process_group, bucket)
[文档]def fp16_compress_wrapper(
hook: 可调用[[Any, dist.GradBucket], torch.futures.Future[torch.Tensor]],
) -> 可调用[[Any, dist.GradBucket], torch.futures.Future[torch.Tensor]]:
"""
将输入张量转换为 `torch.float16`,将钩子结果的张量转换回输入数据类型。
此包装器将给定 DDP 通信钩子的输入梯度张量转换为半精度浮点格式(`torch.float16`),并将给定钩子的结果张量转换回输入数据类型,例如 `float32`。
浮点格式(`torch.float16`),并将给定钩子的结果张量转换回输入数据类型,例如 `float32`。
类型,例如 `float32`。
因此,`fp16_compress_hook` 等同于 `fp16_compress_wrapper(allreduce_hook)`。
示例::
>>> # xdoctest: +SKIP
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10)
>>> ddp_model.register_comm_hook(state, fp16_compress_wrapper(powerSGD_hook))
"""
def fp16_compress_wrapper_hook(
hook_state, bucket: dist.GradBucket
() -> torch.futures.Future[torch.Tensor]
# 将桶张量转换为 FP16。
bucket.set_buffer(bucket.buffer().to(torch.float16))
fut = hook(hook_state, bucket)
def decompress(fut):
decompressed_tensor = bucket.buffer()
# 在原地解压缩以减少峰值内存。
# 参考:https://github.com/pytorch/pytorch/issues/45968
decompressed_tensor.copy_(fut.value())
return decompressed_tensor
# Decompress after hook has run.
return fut.then(decompress)
返回 fp16_compress_wrapper_hook
[文档]def bf16_compress_wrapper(
hook: 可调用[[Any, dist.GradBucket], torch.futures.Future[torch.Tensor]]
) -> 可调用[[Any, dist.GradBucket], torch.futures.Future[torch.Tensor]]:
"""
警告:此 API 为实验性,需要 NCCL 版本晚于 2.9.6。
此包装器将给定 DDP 通信钩子的输入梯度张量转换为半精度
`脑浮点格式 `_ (``torch.bfloat16``),
将给定钩子的结果张量转换回输入数据类型,例如 `float32`。
因此,`bf16_compress_hook` 等同于 `bf16_compress_wrapper(allreduce_hook)`。
示例::
>>> # xdoctest: +SKIP
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10)
>>> ddp_model.register_comm_hook(state, bf16_compress_wrapper(powerSGD_hook))
"""
def bf16_compress_wrapper_hook(
hook_state, 桶: dist.GradBucket
) -> torch.futures.Future[torch.Tensor]:
# 将桶张量转换为 BF16。
桶.set_buffer(桶.buffer().to(torch.bfloat16))
fut = hook(hook_state, bucket)
def decompress(fut):
decompressed_tensor = bucket.buffer()
# Decompress in place to reduce the peak memory.
# See: https://github.com/pytorch/pytorch/issues/45968
解压缩 tensor.copy_(fut.value())
返回解压缩后的 tensor
# 解压缩在钩子运行之后
返回 fut.then(decompress)
返回 bf16_compress_wrapper_hook