# mypy: 允许未类型化定义
© Meta Platforms, Inc. 及其关联公司
来自 abc
导入 ABC,
抽象方法
来自 functools
导入
部分信息
来自
打字
导入
任意,
可选,
联合
导入
火炬
导入 torch.nn
作为
然后
来自 torch.distributed.tensor
导入 (
设备网状结构,
分布式模块,
分配张量,
DTensor,
复制,
片段,
)
来自 torch.distributed.tensor.placement_types
导入
布置
__all__ = [
"并行样式",
"行并行",
"序列并行",
"列并行",
"准备模块输入",
"准备模块输出",
]
类
并行风格(ABC):
""
并行风格合约定义了模块或子模块应该如何并行化。
它仅定义了`parallelize_module`使用的`apply`方法,这为不同类型的风格实现提供了最大灵活性。
text: text: 它为不同类型的风格实现提供了最大灵活性。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
src_data_rank: 可选[int] = 0
@abstractmethod
def 应用(
自身,
模块: nn.
模块,
装置网状结构:
设备网状结构) -> nn.
模块: ...
[文档]
类
逐列并行(
并行风格):
""
以列式方式分割兼容的 nn.Module。目前支持 nn.Linear 和 nn.Embedding。
用户可以将其与 RowwiseParallel 组合使用,以实现更复杂模块的划分。
(即 MLP、Attention)
关键字参数:
input_layouts(放置,可选):
nn.Module 的输入张量的 DTensor 布局,这用于注释输入张量以
成为 DTensor。如果未指定,我们假设输入张量是复制的。
output_layouts(放置,可选):
nn.Module 的输出 DTensor 布局,这用于确保 nn.Module 的输出
使用用户期望的布局。如果未指定,输出张量将在最后一个维度上进行分片。
use_local_output (布尔值,可选):
是否使用本地 :class:`torch.Tensor` 而不是 :class:`DTensor` 作为模块输出,默认:True。
返回:
表示 nn.Module 的 Colwise 分片的一个 :class:`ParallelStyle` 对象。
示例::
>>> # xdoctest: +SKIP(失败)
>>> 从 torch.distributed.tensor.parallel 导入 parallelize_module 和 ColwiseParallel
>>> 从 torch.distributed.device_mesh 导入 init_device_mesh
>>> ...
>>> m = Model(...) # m 是一个包含“w1”nn.Linear 子模块的 nn.Module
>>> tp_mesh = init_device_mesh("cuda", (8,))
...
>>> # 默认情况下,“w1”Linear 的输入将被转换为 Replicated DTensor
>>> # "w1"的输出将返回:class:`torch.Tensor`,该 Tensor 在最后一个维度上进行分片。
...
>>> sharded_mod = parallelize_module(m, tp_mesh, {"w1": ColwiseParallel()})
>>> ...
.. note:: 默认情况下,如果未指定`output_layouts`,`ColwiseParallel`的输出将在最后一个维度上进行分片,
如果存在需要特定张量形状的操作(即在对的`RowwiseParallel`之前),则
请注意,如果输出被分片,操作员可能需要调整到分片的大小。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 初始化(
自身,
*,
输入布局:
可选[
安置] =
无,
输出布局:
可选[
安置] =
无,
使用本地输出:
布尔类型 = True,
):
超级().
初始化()
自身.
输入布局 = (
输入布局
或
复制(),)
自身.
输出布局 = (
输出布局
或
片段(-1),)
# 按列线性运行时分片(期望的分片):
# 1. 需要复制输入
# 2. 在最后一个维度上的分片输出
自身.
需要的输入布局 = (
复制(),)
自身.
使用本地输出 =
使用本地输出
@staticmethod
def _准备输入函数(
输入布局,
需要的输入布局,
模块,
输入,
设备网状
):
# TODO: 解决 Dynamo 对实例方法的支持问题,并将此切换为实例方法
# 使用 input_layouts 注释模块输入位置/分片
输入张量 =
输入[0]
如果 not isinstance(
输入张量, DTensor):
输入张量 = DTensor.
从本地(
输入张量,
装置网状结构, input_layouts,
运行检查=
假
)
# 将输入布局转换为 ColwiseParallel 所需的布局
如果
输入布局 != desired_input_layouts:
输入张量 =
输入张量.
重新分配(
安排=desired_input_layouts, async_op=
真实
)
返回
输入张量
def _partition_linear_fn(自身,
名称,
模块,
装置网状结构):
# 将列向分片权重/偏差分片到 Shard(0),权重为 Shard(0)
# means Colwise as Linear is input * weight^T + bias, 其中
# weight 会变成 Shard(1)
为
名称,
参数
在
模块.
命名参数。():
dist_param = nn.参数(
分配张量(
参数,
装置网状结构, [
片段(0
)], src_data_rank=
自身.
src_data_rank
源数据排名
)
)
模块.
注册参数(
名称,
分布参数)
def _partition_embedding_fn(自身,
名称,
模块,
装置网状结构):
# colwise shard embedding.weight 是直接的,因为 Shard(1)
为
名称,
参数
在
模块.
命名参数。():
dist_param = nn.参数(
分配张量(
参数,
装置网状结构, [
片段(1
)], src_data_rank=
自身.
src_data_rank
源数据排名
)
)
模块.
注册参数(
名称,
分布参数)
@staticmethod
def _prepare_output_fn(输出布局,
使用本地输出,
模块,
输出,
装置网状结构):
# 输出是沿最后一个维度 DTensor 的分片,即 Shard(-1)
如果
输出.
布局 !=
输出布局:
输出 =
输出.
重新分配(
安排=
输出布局, async_op=True)
# 返回本地张量
返回
输出.
本地化()
如果
使用本地输出
否则
输出
def 应用(
自身,
模块: nn.
模块,
装置网状结构:
设备网状结构) -> nn.
模块:
如果 isinstance(
模块, nn.
线性):
分区函数 =
自身._partition_linear_fn
elif isinstance(模块, nn.
嵌入):
分区函数 =
自身.
分区嵌入函数
否则:
提升
不支持的操作异常(
ColwiseParallel 目前仅支持 nn.Linear 和 nn.Embedding!
)
返回
分布式模块(
模块,
装置网状结构,
分区函数,
偏函数(
自身._prepare_input_fn,
自身.
输入布局,
自身.
需要的输入布局
),
偏函数(
自身.
准备输出函数,
自身.
输出布局,
自身.
使用本地输出
),
)
[文档]
类
行并行(
并行风格):
""
以行方式对兼容的 nn.Module 进行分区。目前支持 nn.Linear 和 nn.Embedding。
用户可以使用 ColwiseParallel 与之组合,以实现更复杂模块的划分。
(即 MLP、Attention)
关键字参数:
输入布局(放置,可选):
DTensor 布局的输入张量,用于 nn.Module,这用于注释输入张量以
成为 DTensor。如果未指定,我们假设输入张量在最后一个维度上分片。
输出布局(放置,可选):
nn.Module 输出 DTensor 布局,用于确保 nn.Module 的输出符合用户期望的布局。如果未指定,输出张量将被复制。
使用局部输出(bool,可选):是否使用局部:class:`torch.Tensor`代替:class:`DTensor`作为模块输出,默认:True。
use_local_output(bool,可选):
是否使用局部:class:`torch.Tensor`代替:class:`DTensor`进行模块输出,默认:True。
返回:
表示 nn.Module 行分片并行风格的:class:`ParallelStyle`对象。
示例::
>>> # xdoctest: +SKIP(失败)
>>> 从 torch.distributed.tensor.parallel 导入 parallelize_module 和 RowwiseParallel
>>> 从 torch.distributed.device_mesh 导入 init_device_mesh
>>> ...
>>> m = Model(...) # m 是一个包含“w2”nn.Linear 子模块的 nn.Module
>>> tp_mesh = init_device_mesh("cuda", (8,))
...
>>> 默认情况下,"w2" 线性模块的输入将被转换为在最后一个维度上分片的 DTensor
>>> "w2" 的输出将返回一个复制的 :class:`torch.Tensor`
...
>>> sharded_mod = parallelize_module(m, tp_mesh, {"w2": RowwiseParallel()}),
>>> ...
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 初始化(
自身,
*,
输入布局:
可选[
安置] =
无,
输出布局:
可选[
安置] =
无,
使用本地输出:
布尔类型 = True,
):
超级().
初始化()
自身.
输入布局 = (
输入布局
或
片段(-1),)
自身.
输出布局 = (
输出布局
或
复制(),)
自身.
使用本地输出 =
使用本地输出
@staticmethod
def 准备输入函数(
输入布局,
期望的输入布局,
模块,
输入,
设备网状
):
输入张量 =
输入[0]
如果 not isinstance(
输入张量, DTensor):
输入张量 = DTensor.
从本地(
输入张量,
装置网状结构,
输入布局,
运行检查=
假
)
如果
输入布局 !=
期望的输入布局:
输入张量 =
输入张量.
重新分配(
安排=
期望的输入布局, async_op=
真实
)
返回
输入张量
def _分区线性函数(
自身,
名称,
模块,
装置网状结构):
# 行级分片权重到 Shard(1),偏差到 Replicate(),权重为 Shard(1)
# 表示行级,因为 nn.Linear 是输入 * 权重^T + 偏差,其中
# 重量将成为 Shard(0)
模块.
注册参数(
权重,
nn.参数(
分配张量(
模块.
重量,
装置网状结构,
[片段(1
)],
src_data_rank=自身.src_data_rank,
)
),
)
如果 getattr(
模块,
偏置,
无)
是 not
无:
# 线性模块有偏置
模块.
注册参数(
偏置,
nn.参数(
分配张量(
模块.
偏差,
装置网状结构,
[复制
]
src_data_rank=自身.src_data_rank,
)
),
)
def _partition_embedding_fn(自身,
名称,
模块,
装置网状结构):
# 行向 Shard embedding.weight 是 Shard(0)
为
名称,
参数
在
模块.
命名参数。():
距离参数 = nn.
参数(
分配张量(
参数,
装置网状结构, [
片段(0
)], src_data_rank=
自身.
src_data_rank
源数据排名
)
)
模块.
注册参数(
名称,
分布参数)
@staticmethod
def _准备输出函数(
输出布局,
使用本地输出,
模块,
输出,
装置网状结构):
行级分片会产生部分输出,具体取决于输出布局:
1. 要复制 -> allreduce
2. 要分片 -> reduce_scatter
如果
输出.
布局 != output_layouts:
输出 =
输出.
重新分配(
安排=
输出布局, async_op=True)
如果 use_local_output 为 True,则返回本地张量
返回
输出.
本地化()
如果
使用本地输出
否则
输出
def 应用(
自身,
模块: nn.
模块,
装置网状结构:
设备网状结构) -> nn.
模块:
如果 isinstance(
模块, nn.
线性):
分区函数 =
自身._partition_linear_fn
# 行列式线性运行时分区需要输入张量在最后一个维度上的分区
自身.desired_input_layouts:
元组[
安置, ...] = (
片段(-1),)
elif isinstance(模块, nn.
嵌入):
分区函数 =
自身._partition_embedding_fn
行列式嵌入运行时分片需要输入张量复制
自身.
所需输入布局 = (
复制(),)
否则:
提升
不支持的操作异常(
"RowwiseParallel 目前仅支持 nn.Linear 和 nn.Embedding!"
)
返回
分布式模块(
模块,
装置网状结构,
分区函数,
偏函数(
自身._prepare_input_fn,
自身.
输入布局,
自身.
期望输入布局
),
偏函数(
自身._prepare_output_fn,
自身.
输出布局,
自身.
使用本地输出
),
)
[文档]
类
序列并行(
并行风格):
""
SequenceParallel 复制兼容的 ``nn.Module`` 参数并运行分片计算
在序列维度上分片输入。目前支持 ``nn.LayerNorm``、``nn.Dropout`` 等。
`RMSNorm Python 实现 `__
此风格实现了论文中描述的操作。
`减少大型 Transformer 模型中的激活重计算 `__
如果传递给此 ``nn.Module`` 的输入是 :class:`torch.Tensor`,则假定输入已在序列维度上分片,并将其转换为在序列维度上分片的 :class:`DTensor`。如果传递给此 ``nn.Module`` 的输入已经是 :class:`DTensor` 但未在序列维度上分片,则将其重新分配以在序列维度上分片。
如果传递给此 ``nn.Module`` 的输入已经是 :class:`DTensor` 但未在序列维度上分片,则将其重新分配以在序列维度上分片。
如果传递给此 ``nn.Module`` 的输入已经是 :class:`DTensor` 但未在序列维度上分片,则将其重新分配以在序列维度上分片。
如果传递给此 ``nn.Module`` 的输入已经是 :class:`DTensor` 但未在序列维度上分片,则将其重新分配以在序列维度上分片。
``nn.Module``的输出将在序列维度上进行分片。
关键字参数:
sequence_dim (int, 可选):
``nn.Module``的输入张量的序列维度,用于注释输入张量以
成为在序列维度上进行分片的 DTensor,默认:1。
use_local_output (bool, 可选):
是否使用本地 :class:`torch.Tensor` 而不是 :class:`DTensor` 作为模块输出,默认:False。
返回:
代表 ``nn.Module`` 序列并行的 :class:`ParallelStyle` 对象。
示例::
>>> # xdoctest: +SKIP(失败)
>>> from torch.distributed.tensor.parallel import parallelize_module, SequenceParallel
>>> 从 torch.distributed.device_mesh 导入 init_device_mesh
>>> ...
>>> m = Model(...) # m 是一个包含 "norm" nn.LayerNorm 子模块的 nn.Module
>>> tp_mesh = init_device_mesh("cuda", (8,))
...
>>> # 默认情况下,"norm" 的输入将被转换为在序列维度上分片的 DTensor
>>> # "norm" 的输出将返回一个在序列维度上分片的 :class:`DTensor`
...
>>> sharded_mod = parallelize_module(m, tp_mesh, {"norm": SequenceParallel()})
>>> ...
.. note:: SequenceParallel 样式假定如果 nn.Module 中有权重,则使用一个初始化
`nn.LayerNorm` 或 `RMSNorm`,它们默认具有一初始化)。如果您有自定义
初始化这些模块的权重,您需要在并行化前后广播权重
确保它们被复制。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 初始化(
自身, *, sequence_dim:
整型 = 1,
使用本地输出:
布尔类型 =
错误):
超级().
初始化()
自身.sequence_sharding = (
片段(
序列维度),)
自身.
使用本地输出 =
使用本地输出
def _复制模块函数(
自身,
名称:
字符串,
模块: nn.
模块,
装置网状结构:
设备网格
):
为 p_name,
参数
在
模块.
命名参数。():
# 使用固定值从 LayerNorm/RMSNorm 初始化简单复制,允许我们直接使用 from_local
# 我们简单地使用 from_local
复制的参数 =
火炬.nn.
参数(
DTensor.从本地(
参数,
装置网状结构, [
复制
]
运行检查=
错误)
)
模块.
注册参数(p_name,
复制参数)
@staticmethod
def _准备输入函数(
序列分片,
模块,
输入,
装置网状结构):
输入张量 =
输入[0]
如果 isinstance(
输入张量, DTensor):
如果传入的输入 DTensor 在序列维度上未分片,我们需要重新分配它
如果
输入张量.
布局 !=
序列分片:
输入张量 =
输入张量.
重新分配(
安排=
序列分片, async_op=
真实
)
返回
输入张量
elif isinstance(输入张量,
火炬.
张量):
假设传入的输入已经在序列维度上分片,创建 DTensor
返回 DTensor.
从本地(
输入张量,
装置网状结构,
sequence 分片,
运行检查=
假
)
否则:
提升 ValueError(
f"期望输入为{
模块}
需要的是一个 torch.Tensor 或 DTensor,但得到了{
输入张量}"
)
@staticmethod
def 准备输出函数(
使用本地输出,
模块,
输出,
装置网状结构):
返回
输出.
本地化()
如果
使用本地输出
否则
输出
def 应用(
自身,
模块: nn.
模块,
装置网状结构:
设备网状结构) -> nn.
模块:
返回
分布式模块(
模块,
装置网状结构,
自身.
复制模块函数,
偏函数(
自身._prepare_input_fn,
自身.sequence_sharding),
偏函数(
自身._prepare_output_fn,
自身.
使用本地输出),
)
[文档]
类
准备模块输出(
并行样式):
""
配置 nn.Module 的输出,以便在运行时将 nn.Module 的输出张量转换为 DTensors,并根据`output_layouts`进行布局重分配。
和根据`desired_output_layouts`进行布局重分配。
关键字参数:
output_layouts (Union[Placement, Tuple[Placement]]):
nn.Module 的输出张量的 DTensor 布局,用于将输出张量转换为
如果它们是 :class:`torch.Tensor` 的 DTensors,如果某些输出不是 torch.Tensor 或不需要转换为 DTensors,则需要指定 ``None`` 作为占位符。
``None`` 需要被指定为占位符。
desired_output_layouts (Union[Placement, Tuple[Placement]]):
nn.Module 的期望 DTensor 布局输出张量,这用于确保 nn.Module 的输出
拥有所需的 DTensor 布局。
use_local_output (bool, 可选):
是否使用本地 :class:`torch.Tensor` 而不是 :class:`DTensor` 作为模块输出,默认:True。
返回:
一个用于准备 nn.Module 输出分片布局的 ParallelStyle 对象。
示例::
>>> # xdoctest: +SKIP(失败)
>>> from torch.distributed.tensor.parallel import parallelize_module, PrepareModuleOutput
>>> 从 torch.distributed.device_mesh 导入 init_device_mesh
>>> ...
>>> block = TransformerBlock(...) # block 是一个包含 "attn" 注意力子模块的 nn.Module
>>> tp_mesh = init_device_mesh("cuda", (8,))
...
>>> # 根据以下指定的样式,TransformerBlock 的输出将被转换为 Replicated DTensor
>>> # 然后重新分配到 Sharded DTensor
>>> parallelize_module(
>>> block, # 这可以是一个子模块或模块
>>> tp_mesh,
>>> parallelize_plan = PrepareModuleOutput(
>>> output_layouts=Replicate(),
>>> desired_output_layouts=Shard(0)
>>> )
>>> )
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 初始化(
自身,
*,
输出布局:
联盟[
安置,
元组[
安置]],
期望输出布局:
联盟[
安置,
元组[
安置]],
使用本地输出:
布尔类型 = True,
):
自身.
输出布局 = (
(输出布局,)
如果 isinstance(
输出布局,
安置)
否则
输出布局
)
自身.
所需输出布局 = (
(所需输出布局,)
如果 isinstance(
所需输出布局,
安置)
否则
期望输出布局
)
自身.
使用本地输出 =
使用本地输出
断言
长度(
自身.
输出布局) ==
长度(
自身.
预期输出布局), (
"输出布局和预期输出布局长度应相同!"
)
def _prepare_out_fn(自身,
输出,
装置网状结构):
准备好的输出 = []
如果 not isinstance(
输出,
元组):
输出 = (
输出,)
如果
长度(
输出) !=
长度(
自身.
输出布局):
抛出异常 ValueError(
"模块输出和输出布局长度应相同!"
)
为
输出,
输出布局,
所需输出布局
在
压缩(
输出,
自身.
输出布局,
自身.
所需输出布局列表
):
如果 out_layout
是 not
无:
如果 isinstance(
输出, DTensor):
# TODO: 修复编译路径后重新启用检查
# 断言 out.placements[0] == out_layout
dt_out = out
否则:
dt_out = DTensor.从本地(
输出,
装置网状结构, (
输出布局,),
运行检查=
假
)
如果
输出布局 !=
所需输出布局:
dt 输出 = dt_out.
重新分配(
安排=(
预期输出布局,))
准备好的输出.
追加(
dt_out.本地化()
如果
自身.
使用本地输出
否则 dt_out
)
否则:
准备好的输出.
追加(
输出)
如果
长度(
准备好的输出) == 1:
返回
准备好的输出[0]
否则:
返回
元组(
准备输出)
def 应用(
自身,
模块: nn.
模块,
装置网状结构:
设备网状结构) -> nn.
模块:
模块.
注册前向钩子(
lambda _, 输入,
输出:
自身.
准备输出函数名(
输出,
装置网状结构)
) # 类型:忽略[misc, call-arg]
返回
模块