# mypy: 允许未类型化定义
© Meta Platforms, Inc. 及其关联公司
导入 contextlib
来自
打字
导入
角色,
可选
导入
火炬
导入 torch._prims_common
作为
工具
导入 torch.distributed._functional_collectives
作为 funcol
导入 torch.distributed.distributed_c10d
作为 c10d
来自
火炬
导入
张量
来自 torch.distributed.device_mesh
导入
设备网格
来自 torch.distributed.tensor
导入 DTensor,
复制,
片段
来自 torch.distributed.tensor._dtensor_spec
导入 DTensorSpec,
张量元
来自 torch.distributed.tensor._ops._embedding_ops
导入 _MaskPartial
来自 torch.distributed.tensor._ops._math_ops
导入 (
_skip_dim,
Reduction,
复制降维维度,
)
来自 torch.distributed.tensor.placement_types
导入
安置
aten = 火炬.
操作.aten
__all__ = ["损失并行"]
[文档]@contextlib.contextmanager
def 损失并行():
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
一个启用损失并行性的上下文管理器,其中高效的并行化损失计算
可以在输入在类别维度上分片时执行。目前仅支持交叉熵
损失函数是支持的。
在这个上下文管理器中,可以使用 :func:`~torch.nn.functional.cross_entropy` 或
class:`~torch.nn.CrossEntropyLoss` 如常,对输入参数有以下假设。
如果有,相应的 ``backward()`` 调用也需要在这个上下文管理器下进行。
Args:
输入 (:class:`DTensor`):
输入 logits。假设在类别维度上分片。
目标 (Union[:class:`torch.Tensor`, :class:`DTensor`]):
必须是真实类索引(当前不支持类概率)。
假设已在 ``DeviceMesh`` 上复制。
重量(联合[:class:`torch.Tensor`,:class:`DTensor`],可选):
如果提供,假设已在 ``DeviceMesh`` 上复制。
label_smoothing:
目前不支持。
返回:
一个复制的 :class:`DTensor`。
示例:
在这里手动创建一个分片 DTensor 以展示其用法。
实际上,它通常是 TP 模块的输出。
>>> # xdoctest: +SKIP("分布式")
>>> 从 torch.distributed.tensor.parallel 导入 loss_parallel
>>> 从 torch.distributed.device_mesh 导入 init_device_mesh
>>> ...
>>> 初始化 device_mesh = init_device_mesh("cuda", (8,))
>>> input = torch.randn(4, 16, device="cuda", requires_grad=True)
>>> dist_input = distribute_tensor(input, device_mesh, placements=[Shard(1)])
>>> target = torch.randint(16, (4,), device="cuda")
>>> with loss_parallel():
>>> 损失 = F.cross_entropy(dist_input, target, reduction="mean")
>>> 损失.backward()
>>> ...
"""
_启用自定义损失操作()
yield
_禁用自定义损失操作()
目前仅需要支持一维 DeviceMesh;一般情况下返回
# 网格维度 mesh_dim 与 placements[mesh_dim].is_shard(dim)相关
def _find_all_reduce_mesh_dim(安排:
元组[
安置, ...
]
暗淡: int) -> int:
如果 not
长度(
安排) == 1:
提升 ValueError(
"目前 loss_parallel()仅支持一维 DeviceMesh 上的输入。"
)
如果 not
安排[0].is_shard(
暗淡):
提升 ValueError(
f"只有当输入张量在维度上分片时,才应启用 loss_parallel()。"{
暗淡}
。
)
返回 0
def 转换为_dtensor(
张量,
安排:
元组[
安置, ...
]
网格:
设备网格
) -> DTensor:
如果 isinstance(
张量, DTensor):
如果
张量.
布局 ==
安排:
返回
张量
否则:
提升
运行时错误(f
预期{
安排}
但是得到了{
张量.
安排}.")
elif isinstance(张量,
火炬.
张量):
返回 DTensor.
从本地(
张量,
装置网状结构=
网格,
安排=
安排,
运行检查=
假
)
否则:
提升
类型错误(f
"不支持类型"{
类型(
张量)}")
def 传播张量元信息(
操作调用:
火炬.
操作符.
操作重载,
参数:
元组[
对象, ...
]
kwargs: 字典[
字符串,
对象
]
) -> 张量元:
操作信息 = DTensor._op_dispatcher.
解包到操作信息(
操作调用,
参数, kwargs)
张量元数据 = DTensor._op_dispatcher.
分片传播器.
_传播张量元信息(
op_info.架构
)
如果 isinstance(
张量元,
张量元):
返回
张量元数据
elif isinstance(张量元,
元组):
返回
张量元[0]
否则:
提升
运行时错误(f
"意外张量元类型:"{
类型(
张量元)}.")
# 注意:实现遵循 torch._decomp.decomposition._log_softmax,
# 手动插入 all_reduce 以执行分布式计算。
def log_softmax(x,
暗淡, half_to_float,
网格,
网格维度):
如果 half_to_float:
断言 x.dtype ==
火炬.
半精度
computation_dtype, 结果数据类型 =
工具.
元素级数据类型(
x, 类型提升类型=
工具.
元素级类型提升类型.
默认
)
x = x.到(
数据类型=
计算数据类型,
内存格式=
火炬.
连续格式)
如果 x.
元素数量() == 0:
平移 = x
否则:
x_max = 火炬.amax(x,
暗淡,
保持维度=True)
x_max = funcol.all_reduce(
x_max, 简化运算符=c10d.ReduceOp.MAX.
名称,
群组=(
网格,
网格维度)
)
平移 = x - x_max
平移求和指数 =
火炬.
总和(
火炬.exp(
位移),
暗淡,
保持维度=True)
位移和指数 = funcol.all_reduce(
位移和指数,
简化运算符=c10d.ReduceOp.SUM.
名称,
群组=(
网格,
网格维度)
)
位移对数和指数 =
火炬.
日志(
平移求和指数)
结果 =
平移 -
平移对数求和指数
如果 not
半浮点数转换:
结果 =
结果.
到(
结果数据类型)
返回
结果
def _对数软阈值处理函数(
操作调用:
火炬.
操作符.
操作重载,
参数:
元组[
对象, ...
]
kwargs: 字典[
字符串,
对象
]
) -> 对象:
x = 角色(DTensor,
参数[0])
维度 =
角色(int,
参数[1])
半浮点 =
角色(
布尔,
参数[2])
规范 = x._spec
网格维度 = _find_all_reduce_mesh_dim(
规格.
安排,
暗淡)
输出张量元信息 = _propagate_tensor_meta(op_call,
参数, kwargs)
res = _log_softmax(x._本地张量,
暗淡, half_to_float,
规格.
网格,
网格维度)
res_spec = DTensorSpec(
规格.
网格,
规格.
安排,
张量元=
输出张量元数据,
)
返回 DTensor(
资源,
资源规范,
需要梯度=
资源.
需要梯度,
)
# 注意:如下文所述在_nll_loss_and_log_softmax_backward 中,_log_softmax_backward_handler 实际上并没有进行任何计算。
# _log_softmax_backward_handler 并没有进行任何计算。
def _log_softmax_backward_handler(
op_call: 火炬.
操作符.
操作重载,
参数:
元组[
对象, ...
]
kwargs: 字典[
字符串,
对象
]
) -> 对象:
梯度输出 =
角色(DTensor,
参数[0])
输入数据类型 =
角色(
火炬.
数据类型,
参数[3])
返回 grad_output.
到(
输入数据类型)
# 注意:实现遵循 torch._decomp.decomposition._nll_loss_forward,
# 并插入自定义通信以执行分布式计算。
def _nll_loss_forward(
x: 张量,
目标:
张量,
重量:
可选[
张量
]
local_weight: 可选[
张量
]
缩减: int,
忽略索引: int,
input_shape: 火炬.
尺寸,
channel_dim: int,
网格:
设备网状结构,
网格维度: int,
) -> 元组[
张量,
张量
]:
n_dims = x.暗淡()
通道维度 = 1
如果 n_dims < 2:
通道维度 = 0
def 权重视图(
重量:
张量) ->
张量:
如果 n_dims > 1:
形状 = [
1,
] * n_dims
shape[通道维度] =
重量.shape[0]
w = 重量.
视图(shape)
否则:
w = 权重
返回 w
如果
权重
是 not
无:
w = 权重视图(
重量)
断言
本地权重
是 not None
本地 w =
权重视图(
本地权重)
x = x * 本地_w
安全目标 =
火炬.
哪里(
目标 !=
忽略索引,
目标, 0)
安全目标_ =
安全目标.
展平(
通道维度)
# 以下代码块是一个分布式版本
# result = -torch.gather(self, channel_dim, safe_target_).squeeze(channel_dim)
部分放置 =
部分遮罩(
偏移形状=
输入形状,
偏移维度=
通道维度)
安全目标部分_ =
部分放置._partition_value(
安全目标_,
网格,
网格维度
)
结果部分 =
火炬.
收集(x,
通道维度,
安全目标部分_)
全局归约发生在这里
归约结果 =
部分放置._reduce_value(
部分结果,
网格,
网格维度)
结果 = -
结果减少.
挤压(
通道维度)
结果 =
火炬.
哪里(
目标 !=
忽略索引,
结果, 0)
如果
减少 ==
减少.
无.value
和 n_dims > 1:
总重量 = x.
新满((), 0.0)
返回
结果,
总重量
如果
权重
是 not
无:
新形状 =
列表(x.shape)
新形状[
通道维度] = -1
w = w.展开(
新形状)
加权求和 =
火炬.
收集(w,
频道维度,
安全目标).
挤压(
通道维度)
加权求和 =
火炬.
哪里(
目标 !=
忽略索引,
加权求和, 0)
总权重 =
无总结.
总和()
否则:
总重量 = (
目标 !=
忽略索引).
总和().
到(x)
# 注意:仅在 1D DeviceMesh 上正确;否则需要结果和总重量进行 all-reduce
# 需要 all-reduce 操作
如果
减少 ==
聚合.SUM.
值:
结果 =
结果.
总和()
elif 减少 ==
减少.
平均值.
值:
结果 =
结果.
总和() /
总重量
返回
结果,
总重量
def _nll_loss_forward_handler(
op_call: 火炬.
操作符.
操作重载,
参数:
元组[
对象, ...
]
kwargs: 字典[
字符串,
对象
]
) -> 对象:
x = 角色(DTensor,
参数[0])
目标 =
参数[1]
权重 =
参数[2]
减少 =
角色(int,
参数[3])
ignore_index = 角色(int,
参数[4])
通道维度 = 1
如果 x.
暗淡() >= 2
否则 0
规范 = x._spec
网格维度 =
_查找所有 reduce 网格维度(
规格.
安排,
通道维度)
# 检查用户输入:如果目标和权重不是 DTensors,则将它们转换为 DTensors;
检查它们是否为 DTensors,并确认它们具有所需的放置。
目标放置 =
_跳过维度(
复制降维维度(
规格.
安排, [
通道维度
)]
通道维度
)
所有复制放置 = (
复制(),) *
规格.
网格.
维数
目标 =
转换为数据张量(
目标,
目标位置,
规格.
网格)
本地权重 = None
如果
权重
是 not
无:
权重 =
转换为_dtensor(
重量,
所有复制位置,
规格.
网格)
# 用于本地计算,需要(复制的)权重和(分片的)本地权重
# 在_nll_loss_forward()中都需要。本地权重在这里使用
# DTensor API 生成,无需任何通信。
分片放置 = [
片段(0)
如果 i ==
网格维度
否则
复制()
为 i
在
范围(
规格.
网格.ndim)
]
本地权重 =
重量.
重新分配(
规格.
网格,
分片放置)._local_tensor
断言
本地权重.shape[0] == x.
_本地张量.shape[
通道维度]
如果
减少 ==
减少.
无.
值:
输出位置 =
目标位置
否则:
输出位置 =
所有复制位置
# _propagate_tensor_meta 需要的张量输入必须是 DTensors
args = 列表(
参数)
参数[1
]
参数[2] =
目标,
权重
输出张量元数据 = _propagate_tensor_meta(
操作调用,
元组(
参数), kwargs)
结果,
总重量 = _nll_loss_forward(
x._本地张量,
目标.
_本地张量,
重量._local_tensor
如果
权重
是 not None
否则
无,
本地权重,
缩减,
忽略索引,
x.shape,
通道维度,
规格.
网格,
网格维度,
)
输出规范 = DTensorSpec(
规格.
网格,
输出位置,
张量元=
输出张量元数据)
返回 (
DTensor(
结果,
输出规范,
需要梯度=
结果.
需要梯度,
),
总权重,
)
# 备注:交叉熵的反向计算分为两步:
# 对 nll_loss 进行反向传播,然后对 log_softmax 进行反向传播。在损失并行中,
# 这两个步骤被融合为以下函数(由_nll_loss_backward_handler 调用)
# 以避免当目标包含类别索引而不是类别概率时的通信。
# 此外,_log_softmax_backward_handler 不执行计算。
实现类似于 _nll_loss_backward 和 _log_softmax_backward_data
来自 torch._decomp.decomposition.
def _nll_loss_and_log_softmax_backward(
grad_output: 张量,
x: 张量,
目标:
张量,
重量:
可选[
张量
]
缩减: int,
忽略索引: int,
总权重:
张量,
输入形状:
火炬.
尺寸,
通道维度: int,
网格:
设备网状结构,
网格维度: int,
) -> 张量:
通道维度 = 0
如果 x.
暗淡() < 2
否则 1
如果
减少 ==
减少.MEAN.
值:
梯度输出 =
梯度输出 /
总重量
目标 =
目标.
展平(
通道维度)
安全目标 =
火炬.
哪里(
目标 !=
忽略索引,
目标, 0)
grad_input = 火炬.
与...相同形状的零(x)
# 以下代码块是一个分布式版本
# grad_input = torch.scatter(grad_input, channel_dim, safe_target, -1.0)
部分放置 =
部分蒙版(
偏移形状=
输入形状,
偏移维度=
频道维度)
安全目标 =
安全目标.
挤压(
频道维度).
展平()
隐藏安全目标 =
部分放置._partition_value(
安全目标,
网格,
网格维度)
只有当未标记时才更新 grad_input 为-1
断言
部分放置.
遮罩缓冲区.
数据
是 not None
梯度更新 =
部分放置.
遮罩缓冲区.
数据.
到(
梯度输入.
数据类型) - 1.0
一维 arange =
火炬.arange(
遮罩安全目标.shape[0
]
设备=
隐藏的安全目标.
设备
)
前两个案例中 x.dim() <= 2 的情况是针对 aten.nll_loss_backward.default 的;
最后一个案例是针对 aten.nll_loss2d_backward.default 的。
如果 x.
暗淡() == 1:
grad_input[隐藏安全目标] =
梯度更新
elif x.暗淡() == 2:
梯度输入[
一维数组生成,
隐藏安全目标] =
梯度更新
否则:
梯度输入_t =
梯度输入.
转置(
通道维度, -1)
中间形状 =
输入梯度.
形状
输入梯度_2d = grad_input_t.
重塑(-1, x.shape[channel_dim])
grad_input_2d[arange_1d, 隐藏安全目标] =
梯度更新
梯度输入 =
梯度输入_2d.
视图(
中间形状).
转置(
通道维度, -1)
如果
输入梯度.
暗淡() > grad_output.
暗淡() > 0:
梯度输出 = grad_output.
展平(
通道维度)
如果
权重
是 not
无:
新形状 = [1
为 _
在
范围(x.
暗淡
]]())
新形状[
通道维度] =
重量.shape[0]
权重 =
重量.
重塑(
新形状)
为了使融合计算工作,以下行被重写。
# grad_output = grad_output * weight 的翻译
新形状 =
列表(x.shape)
新形状[
通道维度] = -1
w = 重量.
展开(
新形状)
目标 =
火炬.
收集(w,
通道维度,
目标)
梯度输出 =
梯度输出 *
目标
梯度输出 =
火炬.
哪里(
目标 !=
忽略索引, grad_output, 0)
# 备注:对于 log_softmax,不应直接将 grad_input 作为 grad_output 返回
我们对整个 log_softmax 进行反向计算,以避免额外的 all_gather 通信。
否则会有额外的 all_gather 通信。
返回 grad_input * grad_output。
返回 (
grad_input。 +
火炬.exp(x)) *
梯度输出
def _nll_loss_backward_handler(
op_call: 火炬.
操作符.
操作重载,
参数:
元组[
对象, ...
]
kwargs: 字典[
字符串,
对象
]
) -> 对象:
梯度输出 =
角色(DTensor,
参数[0])
x = 角色(DTensor,
参数[1])
目标 =
参数[2]
权重 =
参数[3]
减少 =
角色(int,
参数[4])
ignore_index = 角色(int,
参数[5])
total_weight = 角色(
张量,
参数[6])
channel_dim = 1 如果 x.
暗淡() >= 2
否则 0
规范 = x._spec
网格维度 = _find_all_reduce_mesh_dim(
规格.
安排, channel_dim)
# 如果目标和权重不是 DTensors,则将它们转换为 DTensors
target_placements = 跳过维度(
复制减少维度(
规格.
安排, [
通道维度
)]
通道维度
)
所有复制位置 = (
复制(),) *
规格.
网格.
维数
目标 =
转换为 dtensor(
目标,
目标位置,
规格.
网格)
如果
权重
是 not
无:
权重 =
转换为 dtensor(
重量,
所有复制位置,
规格.
网格)
# _propagate_tensor_meta 需要的张量输入必须是 DTensors
args = 列表(
参数)
参数[2
]
参数[3] =
目标,
权重
参数[6] = _cast_to_dtensor(
总权重,
所有复制放置,
规格.
网格)
输出张量元信息 =
_传播张量元信息(
操作调用,
元组(
参数), kwargs)
结果 = _nll_loss_and_log_softmax_backward(
grad_output._本地张量,
x._本地张量,
目标.
_本地张量,
重量._local_tensor
如果
权重
是 not None
否则
无,
缩减,
忽略索引,
总权重,
x.shape,
通道维度,
规格.
网格,
网格维度,
)
输出分片与输入分片相同:在 mesh_dim 上 Shard(通道维度)
输出规范 = DTensorSpec(
规格.
网格,
规格.
安排,
张量元=
输出张量元数据,
)
返回 DTensor(
结果,
输出规范,
需要梯度=
结果.
需要梯度,
)
定制损失操作 = {
aten._log_softmax.默认: _log_softmax_handler,
aten._log_softmax 反向数据.
默认:
_log_softmax 反向处理程序,
aten.nll_loss 正向.
默认:
_nll_loss 正向处理程序,
aten.nll_loss2d_forward.默认: _nll_loss_forward_handler,
aten.nll_loss_backward.默认: _nll_loss_backward_handler,
aten.nll_loss2d_backward.默认: _nll_loss_backward_handler,
}
def _enable_custom_loss_ops():
DTensor._op_dispatcher._custom_op_handlers.更新(
自定义损失操作)
def 禁用自定义损失操作():
为
自定义操作
在
自定义损失操作:
DTensor._op_dispatcher.自定义操作处理器.
弹出(
自定义操作)