快捷键

torch.distributed.tensor.parallel.loss 的源代码

# mypy: 允许未类型化定义
© Meta Platforms, Inc. 及其关联公司
导入 contextlib
来自 打字 导入 角色, 可选

导入 火炬
导入 torch._prims_common 作为 工具
导入 torch.distributed._functional_collectives 作为 funcol
导入 torch.distributed.distributed_c10d 作为 c10d
来自 火炬 导入 张量
来自 torch.distributed.device_mesh 导入 设备网格
来自 torch.distributed.tensor 导入 DTensor, 复制, 片段
来自 torch.distributed.tensor._dtensor_spec 导入 DTensorSpec, 张量元
来自 torch.distributed.tensor._ops._embedding_ops 导入 _MaskPartial
来自 torch.distributed.tensor._ops._math_ops 导入 (
    _skip_dim,
    Reduction,
    复制降维维度,
)
来自 torch.distributed.tensor.placement_types 导入 安置


aten = 火炬.操作.aten


__all__ = ["损失并行"]


[文档]@contextlib.contextmanager def 损失并行(): ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 一个启用损失并行性的上下文管理器,其中高效的并行化损失计算 可以在输入在类别维度上分片时执行。目前仅支持交叉熵 损失函数是支持的。 在这个上下文管理器中,可以使用 :func:`~torch.nn.functional.cross_entropy` 或 class:`~torch.nn.CrossEntropyLoss` 如常,对输入参数有以下假设。 如果有,相应的 ``backward()`` 调用也需要在这个上下文管理器下进行。 Args: 输入 (:class:`DTensor`): 输入 logits。假设在类别维度上分片。 目标 (Union[:class:`torch.Tensor`, :class:`DTensor`]): 必须是真实类索引(当前不支持类概率)。 假设已在 ``DeviceMesh`` 上复制。 重量(联合[:class:`torch.Tensor`,:class:`DTensor`],可选): 如果提供,假设已在 ``DeviceMesh`` 上复制。 label_smoothing: 目前不支持。 返回: 一个复制的 :class:`DTensor`。 示例: 在这里手动创建一个分片 DTensor 以展示其用法。 实际上,它通常是 TP 模块的输出。 >>> # xdoctest: +SKIP("分布式") >>> 从 torch.distributed.tensor.parallel 导入 loss_parallel >>> 从 torch.distributed.device_mesh 导入 init_device_mesh >>> ... >>> 初始化 device_mesh = init_device_mesh("cuda", (8,)) >>> input = torch.randn(4, 16, device="cuda", requires_grad=True) >>> dist_input = distribute_tensor(input, device_mesh, placements=[Shard(1)]) >>> target = torch.randint(16, (4,), device="cuda") >>> with loss_parallel(): >>> 损失 = F.cross_entropy(dist_input, target, reduction="mean") >>> 损失.backward() >>> ... """ _启用自定义损失操作() yield _禁用自定义损失操作()
目前仅需要支持一维 DeviceMesh;一般情况下返回 # 网格维度 mesh_dim 与 placements[mesh_dim].is_shard(dim)相关 def _find_all_reduce_mesh_dim(安排
: 元组[安置, ...] 暗淡: int) -> int: 如果 not 长度(安排) == 1: 提升 ValueError( "目前 loss_parallel()仅支持一维 DeviceMesh 上的输入。" ) 如果 not 安排[0].is_shard(暗淡): 提升 ValueError( f"只有当输入张量在维度上分片时,才应启用 loss_parallel()。"{暗淡} ) 返回 0 def 转换为_dtensor( 张量, 安排: 元组[安置, ...] 网格: 设备网格 ) -> DTensor: 如果 isinstance(张量, DTensor): 如果 张量.布局 == 安排: 返回 张量 否则: 提升 运行时错误(f预期{安排}但是得到了{张量.安排}.") elif isinstance(张量, 火炬.张量): 返回 DTensor.从本地( 张量, 装置网状结构=网格, 安排=安排, 运行检查= ) 否则: 提升 类型错误(f"不支持类型"{类型(张量)}") def 传播张量元信息( 操作调用: 火炬.操作符.操作重载, 参数: 元组[对象, ...] kwargs: 字典[字符串, 对象] ) -> 张量元: 操作信息 = DTensor._op_dispatcher.解包到操作信息(操作调用, 参数, kwargs) 张量元数据 = DTensor._op_dispatcher.分片传播器._传播张量元信息( op_info.架构 ) 如果 isinstance(张量元, 张量元): 返回 张量元数据 elif isinstance(张量元, 元组): 返回 张量元[0] 否则: 提升 运行时错误(f"意外张量元类型:"{类型(张量元)}.") # 注意:实现遵循 torch._decomp.decomposition._log_softmax, # 手动插入 all_reduce 以执行分布式计算。 def log_softmax(x, 暗淡, half_to_float, 网格, 网格维度): 如果 half_to_float: 断言 x.dtype == 火炬.半精度 computation_dtype, 结果数据类型 = 工具.元素级数据类型( x, 类型提升类型=工具.元素级类型提升类型.默认 ) x = x.(数据类型=计算数据类型, 内存格式=火炬.连续格式) 如果 x.元素数量() == 0: 平移 = x 否则: x_max = 火炬.amax(x, 暗淡, 保持维度=True) x_max = funcol.all_reduce( x_max, 简化运算符=c10d.ReduceOp.MAX.名称, 群组=(网格, 网格维度) ) 平移 = x - x_max 平移求和指数 = 火炬.总和(火炬.exp(位移), 暗淡, 保持维度=True) 位移和指数 = funcol.all_reduce( 位移和指数, 简化运算符=c10d.ReduceOp.SUM.名称, 群组=(网格, 网格维度) ) 位移对数和指数 = 火炬.日志(平移求和指数) 结果 = 平移 - 平移对数求和指数 如果 not 半浮点数转换: 结果 = 结果.(结果数据类型) 返回 结果 def _对数软阈值处理函数( 操作调用: 火炬.操作符.操作重载, 参数: 元组[对象, ...] kwargs: 字典[字符串, 对象] ) -> 对象: x = 角色(DTensor, 参数[0]) 维度 = 角色(int, 参数[1]) 半浮点 = 角色(布尔, 参数[2]) 规范 = x._spec 网格维度 = _find_all_reduce_mesh_dim(规格.安排, 暗淡) 输出张量元信息 = _propagate_tensor_meta(op_call, 参数, kwargs) res = _log_softmax(x._本地张量, 暗淡, half_to_float, 规格.网格, 网格维度) res_spec = DTensorSpec( 规格.网格, 规格.安排, 张量元=输出张量元数据, ) 返回 DTensor( 资源, 资源规范, 需要梯度=资源.需要梯度, ) # 注意:如下文所述在_nll_loss_and_log_softmax_backward 中,_log_softmax_backward_handler 实际上并没有进行任何计算。 # _log_softmax_backward_handler 并没有进行任何计算。 def _log_softmax_backward_handler( op_call: 火炬.操作符.操作重载, 参数: 元组[对象, ...] kwargs: 字典[字符串, 对象] ) -> 对象: 梯度输出 = 角色(DTensor, 参数[0]) 输入数据类型 = 角色(火炬.数据类型, 参数[3]) 返回 grad_output.(输入数据类型) # 注意:实现遵循 torch._decomp.decomposition._nll_loss_forward, # 并插入自定义通信以执行分布式计算。 def _nll_loss_forward( x: 张量, 目标: 张量, 重量: 可选[张量] local_weight: 可选[张量] 缩减: int, 忽略索引: int, input_shape: 火炬.尺寸, channel_dim: int, 网格: 设备网状结构, 网格维度: int, ) -> 元组[张量, 张量]: n_dims = x.暗淡() 通道维度 = 1 如果 n_dims < 2: 通道维度 = 0 def 权重视图(重量: 张量) -> 张量: 如果 n_dims > 1: 形状 = [ 1, ] * n_dims shape[通道维度] = 重量.shape[0] w = 重量.视图(shape) 否则: w = 权重 返回 w 如果 权重 not : w = 权重视图(重量) 断言 本地权重 not None 本地 w = 权重视图(本地权重) x = x * 本地_w 安全目标 = 火炬.哪里(目标 != 忽略索引, 目标, 0) 安全目标_ = 安全目标.展平(通道维度) # 以下代码块是一个分布式版本 # result = -torch.gather(self, channel_dim, safe_target_).squeeze(channel_dim) 部分放置 = 部分遮罩(偏移形状=输入形状, 偏移维度=通道维度) 安全目标部分_ = 部分放置._partition_value( 安全目标_, 网格, 网格维度 ) 结果部分 = 火炬.收集(x, 通道维度, 安全目标部分_) 全局归约发生在这里 归约结果 = 部分放置._reduce_value(部分结果, 网格, 网格维度) 结果 = -结果减少.挤压(通道维度) 结果 = 火炬.哪里(目标 != 忽略索引, 结果, 0) 如果 减少 == 减少..value n_dims > 1: 总重量 = x.新满((), 0.0) 返回 结果, 总重量 如果 权重 not : 新形状 = 列表(x.shape) 新形状[通道维度] = -1 w = w.展开(新形状) 加权求和 = 火炬.收集(w, 频道维度, 安全目标).挤压(通道维度) 加权求和 = 火炬.哪里(目标 != 忽略索引, 加权求和, 0) 总权重 = 无总结.总和() 否则: 总重量 = (目标 != 忽略索引).总和().(x) # 注意:仅在 1D DeviceMesh 上正确;否则需要结果和总重量进行 all-reduce # 需要 all-reduce 操作 如果 减少 == 聚合.SUM.: 结果 = 结果.总和() elif 减少 == 减少.平均值.: 结果 = 结果.总和() / 总重量 返回 结果, 总重量 def _nll_loss_forward_handler( op_call: 火炬.操作符.操作重载, 参数: 元组[对象, ...] kwargs: 字典[字符串, 对象] ) -> 对象: x = 角色(DTensor, 参数[0]) 目标 = 参数[1] 权重 = 参数[2] 减少 = 角色(int, 参数[3]) ignore_index = 角色(int, 参数[4]) 通道维度 = 1 如果 x.暗淡() >= 2 否则 0 规范 = x._spec 网格维度 = _查找所有 reduce 网格维度(规格.安排, 通道维度) # 检查用户输入:如果目标和权重不是 DTensors,则将它们转换为 DTensors; 检查它们是否为 DTensors,并确认它们具有所需的放置。 目标放置 = _跳过维度( 复制降维维度(规格.安排, [通道维度)] 通道维度 ) 所有复制放置 = (复制(),) * 规格.网格.维数 目标 = 转换为数据张量(目标, 目标位置, 规格.网格) 本地权重 = None 如果 权重 not : 权重 = 转换为_dtensor(重量, 所有复制位置, 规格.网格) # 用于本地计算,需要(复制的)权重和(分片的)本地权重 # 在_nll_loss_forward()中都需要。本地权重在这里使用 # DTensor API 生成,无需任何通信。 分片放置 = [ 片段(0) 如果 i == 网格维度 否则 复制() i 范围(规格.网格.ndim) ] 本地权重 = 重量.重新分配(规格.网格, 分片放置)._local_tensor 断言 本地权重.shape[0] == x._本地张量.shape[通道维度] 如果 减少 == 减少..: 输出位置 = 目标位置 否则: 输出位置 = 所有复制位置 # _propagate_tensor_meta 需要的张量输入必须是 DTensors args = 列表(参数) 参数[1] 参数[2] = 目标, 权重 输出张量元数据 = _propagate_tensor_meta(操作调用, 元组(参数), kwargs) 结果, 总重量 = _nll_loss_forward( x._本地张量, 目标._本地张量, 重量._local_tensor 如果 权重 not None 否则 , 本地权重, 缩减, 忽略索引, x.shape, 通道维度, 规格.网格, 网格维度, ) 输出规范 = DTensorSpec(规格.网格, 输出位置, 张量元=输出张量元数据) 返回 ( DTensor( 结果, 输出规范, 需要梯度=结果.需要梯度, ), 总权重, ) # 备注:交叉熵的反向计算分为两步: # 对 nll_loss 进行反向传播,然后对 log_softmax 进行反向传播。在损失并行中, # 这两个步骤被融合为以下函数(由_nll_loss_backward_handler 调用) # 以避免当目标包含类别索引而不是类别概率时的通信。 # 此外,_log_softmax_backward_handler 不执行计算。 实现类似于 _nll_loss_backward 和 _log_softmax_backward_data 来自 torch._decomp.decomposition. def _nll_loss_and_log_softmax_backward( grad_output: 张量, x: 张量, 目标: 张量, 重量: 可选[张量] 缩减: int, 忽略索引: int, 总权重: 张量, 输入形状: 火炬.尺寸, 通道维度: int, 网格: 设备网状结构, 网格维度: int, ) -> 张量: 通道维度 = 0 如果 x.暗淡() < 2 否则 1 如果 减少 == 减少.MEAN.: 梯度输出 = 梯度输出 / 总重量 目标 = 目标.展平(通道维度) 安全目标 = 火炬.哪里(目标 != 忽略索引, 目标, 0) grad_input = 火炬.与...相同形状的零(x) # 以下代码块是一个分布式版本 # grad_input = torch.scatter(grad_input, channel_dim, safe_target, -1.0) 部分放置 = 部分蒙版(偏移形状=输入形状, 偏移维度=频道维度) 安全目标 = 安全目标.挤压(频道维度).展平() 隐藏安全目标 = 部分放置._partition_value(安全目标, 网格, 网格维度) 只有当未标记时才更新 grad_input 为-1 断言 部分放置.遮罩缓冲区.数据 not None 梯度更新 = 部分放置.遮罩缓冲区.数据.(梯度输入.数据类型) - 1.0 一维 arange = 火炬.arange( 遮罩安全目标.shape[0] 设备=隐藏的安全目标.设备 ) 前两个案例中 x.dim() <= 2 的情况是针对 aten.nll_loss_backward.default 的; 最后一个案例是针对 aten.nll_loss2d_backward.default 的。 如果 x.暗淡() == 1: grad_input[隐藏安全目标] = 梯度更新 elif x.暗淡() == 2: 梯度输入[一维数组生成, 隐藏安全目标] = 梯度更新 否则: 梯度输入_t = 梯度输入.转置(通道维度, -1) 中间形状 = 输入梯度.形状 输入梯度_2d = grad_input_t.重塑(-1, x.shape[channel_dim]) grad_input_2d[arange_1d, 隐藏安全目标] = 梯度更新 梯度输入 = 梯度输入_2d.视图(中间形状).转置(通道维度, -1) 如果 输入梯度.暗淡() > grad_output.暗淡() > 0: 梯度输出 = grad_output.展平(通道维度) 如果 权重 not : 新形状 = [1 _ 范围(x.暗淡]]()) 新形状[通道维度] = 重量.shape[0] 权重 = 重量.重塑(新形状) 为了使融合计算工作,以下行被重写。 # grad_output = grad_output * weight 的翻译 新形状 = 列表(x.shape) 新形状[通道维度] = -1 w = 重量.展开(新形状) 目标 = 火炬.收集(w, 通道维度, 目标) 梯度输出 = 梯度输出 * 目标 梯度输出 = 火炬.哪里(目标 != 忽略索引, grad_output, 0) # 备注:对于 log_softmax,不应直接将 grad_input 作为 grad_output 返回 我们对整个 log_softmax 进行反向计算,以避免额外的 all_gather 通信。 否则会有额外的 all_gather 通信。 返回 grad_input * grad_output。 返回 (grad_input。 + 火炬.exp(x)) * 梯度输出 def _nll_loss_backward_handler( op_call: 火炬.操作符.操作重载, 参数: 元组[对象, ...] kwargs: 字典[字符串, 对象] ) -> 对象: 梯度输出 = 角色(DTensor, 参数[0]) x = 角色(DTensor, 参数[1]) 目标 = 参数[2] 权重 = 参数[3] 减少 = 角色(int, 参数[4]) ignore_index = 角色(int, 参数[5]) total_weight = 角色(张量, 参数[6]) channel_dim = 1 如果 x.暗淡() >= 2 否则 0 规范 = x._spec 网格维度 = _find_all_reduce_mesh_dim(规格.安排, channel_dim) # 如果目标和权重不是 DTensors,则将它们转换为 DTensors target_placements = 跳过维度( 复制减少维度(规格.安排, [通道维度)] 通道维度 ) 所有复制位置 = (复制(),) * 规格.网格.维数 目标 = 转换为 dtensor(目标, 目标位置, 规格.网格) 如果 权重 not : 权重 = 转换为 dtensor(重量, 所有复制位置, 规格.网格) # _propagate_tensor_meta 需要的张量输入必须是 DTensors args = 列表(参数) 参数[2] 参数[3] = 目标, 权重 参数[6] = _cast_to_dtensor(总权重, 所有复制放置, 规格.网格) 输出张量元信息 = _传播张量元信息(操作调用, 元组(参数), kwargs) 结果 = _nll_loss_and_log_softmax_backward( grad_output._本地张量, x._本地张量, 目标._本地张量, 重量._local_tensor 如果 权重 not None 否则 , 缩减, 忽略索引, 总权重, x.shape, 通道维度, 规格.网格, 网格维度, ) 输出分片与输入分片相同:在 mesh_dim 上 Shard(通道维度) 输出规范 = DTensorSpec( 规格.网格, 规格.安排, 张量元=输出张量元数据, ) 返回 DTensor( 结果, 输出规范, 需要梯度=结果.需要梯度, ) 定制损失操作 = { aten._log_softmax.默认: _log_softmax_handler, aten._log_softmax 反向数据.默认: _log_softmax 反向处理程序, aten.nll_loss 正向.默认: _nll_loss 正向处理程序, aten.nll_loss2d_forward.默认: _nll_loss_forward_handler, aten.nll_loss_backward.默认: _nll_loss_backward_handler, aten.nll_loss2d_backward.默认: _nll_loss_backward_handler, } def _enable_custom_loss_ops(): DTensor._op_dispatcher._custom_op_handlers.更新(自定义损失操作) def 禁用自定义损失操作(): 自定义操作 自定义损失操作: DTensor._op_dispatcher.自定义操作处理器.弹出(自定义操作)

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源