# mypy: 允许未类型化定义
版权所有(C)Meta Platforms,Inc. 及其关联公司
导入
记录日志
导入
数学
导入
线程
来自 functools
导入 reduce
来自 itertools
导入
连接
来自
打字
导入
可选,
类型检查,
联合
导入
火炬
来自 torch.distributed
导入
是否可用
来自 torch.utils._typing_utils
导入
非空
全部 = [
初始化设备网格, "DeviceMesh"]
如果
不是
是否可用():
导入
系统
# 需要在分布式不可用的情况下创建占位符。
# 否则,我们将失败文档测试(```./.ci/pytorch/docs-test.sh```),
# 因为它会尝试导入 ``torch.distributed.device_mesh`` 或
找不到 ``torch.distributed.init_device_mesh``。
类 _DeviceMeshStub:
通过
def _init_device_mesh_stub():
通过
系统模块.
模块["torch.distributed.device_mesh"].
设备网格 =
_设备网格 Stub
# 类型:忽略[已定义]
系统模块.
模块[
"torch.distributed.device_mesh"
].初始化设备网格 = _init_device_mesh_stub
# 类型:忽略[已定义]
否则:
来自 torch._C._distributed_c10d
导入
后端
是
C10d 后端
来自 torch.distributed.distributed_c10d
导入 (
_find_pg_by_ranks_and_tag,
_get_default_group,
获取组标签,
获取后端,
获取进程组排名,
获取排名,
获取世界大小,
初始化进程组,
已初始化,
创建新组,
流程组,
分割组,
)
记录器 =
记录.
获取日志记录器(__name__)
仅在类型检查时导入 numpy 类型提示
如果
类型检查:
尝试:
来自 numpy.typing
导入 ArrayLike
除了
导入错误:
记录器.
警告(
"DeviceMesh 需要安装 numpy >= 1.21 以进行类型检查"
)
类
网格环境(
线程.local):
def __init__(self) -> 无:
self.网格堆栈:
列表[
设备网格] =
输入文本为空,请提供需要翻译的文本
self.子到根映射:
字典[
设备网格,
设备网格] = {}
self.网格维度组选项:
字典[
int, 元组[str,
可选[
C10d 后端.
选项]]
] = {}
self.root_to_flatten_mapping: 字典[
设备网格,
字典[str,
设备网格]] = {}
# 将记录展开网格名称到根网格中的网格维度索引。
self.将名称扁平化到根维度:
字典[
设备网格,
字典[str,
元组[int, ...]]
] = {}
def 获取当前网格(self) ->
设备网格:
如果
长度(self.
网格堆栈) == 0:
抛出
运行时错误(
当前没有活动设备网格!)
返回 self.
网格堆叠[-1]
def 创建子网格(
self,
设备网格:
设备网格,
子网格维度名称:
元组[str, ...
]
子网格维度:
列表[
元组[int, ...]],
) -> DeviceMesh:
从 submesh_dims 中获取子网格维度大小。
例如,如果我们有一个 mesh_shape 为(2, 2, 2)、mesh_dim_names 为("dp", "cp", "tp")的 3D 网格,并且我们想要
切片出 mesh["dp_cp"],那么 submesh_dims = [(0, 1), (2,)],submesh_dim_size = [2 * 2, 2] = [4, 2]。
如果我们想要切片出 mesh["dp", "cp"],那么 submesh_dims = [(0,), (1,)],submesh_dim_size = [2, 2]。
切片维度大小 = [
归约(
lambda x, y: x * 设备网格.
网格.
尺寸(y),
网格维度,
1,
)
for 网格维度
在
子网格维度
]
网格张量 =
设备网格.
网格
slice_dim_idx 可能与 submesh_dims 不同,因为我们可能需要展平一些维度。
slice_dim_idx = 输入文本为空,请提供需要翻译的文本
切片维度组信息 =
输入文本为空,请提供需要翻译的文本
# 记录已展平的维度数量,以便正确获取 slice_dim_idx
# 展平的网格张量。
num_dims_flatten = 0
for 网格维度索引,
网格维度名称
在 zip(
子网格维度,
子网格维度名称):
目前这仅允许切割出一个连续的扁平维度。
TODO:我们需要处理非连续扁平维度的重建。
如果
长度(
网格维度索引) > 1:
我们需要将 start_dim 和 end_dim 向左移动,如果某些维度已经被扁平化。
网格张量 =
网格张量.
展平(
start_dim=网格维度索引[0] -
展平维度数量,
end_dim=网格维度索引[-1] -
展平维度数量,
)
如果某些维度已经展开,则需要相应地调整 slice_dim_idx。
例如,如果 submesh_dims = [(0, 1), (2,), (3, 4)],其中 0-1 展开,3-4 展开,
则最终的 slice_dim_idx 应该是[0, 1, 2]。
slice_dim_idx.append(网格维度索引[0] -
展平维度数量)
展平维度数量 +=
长度(
网格维度索引) - 1
切片维度组信息.append(
self.根到扁平映射[
设备网格
]
网格维度名称
]._维度组信息[0]
)
否则:
切片维度索引.append(
网格维度索引[0] -
展平维度数量)
切片维度组信息.append(
设备网格.
_维度组信息[
网格维度索引[0]]
)
# 网格张量在需要时已被展平。因此,mesh_tensor.ndim <= device_mesh.mesh.ndim 现在成立。
网格维度保留索引 =
列表(
范围(
网格张量.
维数))
for 索引
在
切片维度索引:
mesh_dims_remained_idx.删除(
索引)
# pg_ranks_by_dim 是 [最外层子网格维度的本地进程数,*slice_dim_idx] 的大小
# 这意味着在每个最外层切片网格维度的本地进程上,我们都有一个子网格大小的张量
# 该子网格的 pg 排序。从这些中,我们可以提取包含当前排序的子网格网格张量。
pg_ranks_by_dim = 网格张量.
排列(
*网格维度剩余索引, *
切片维度索引
).重塑(-1, *
切片维度大小)
当前排名 =
设备网格.
获取排名()
for 网格节点
在
按维度划分的进程组:
子网格 =
设备网格(
设备网格.
设备类型,
网格节点,
网格维度名称=
子网格维度名称,
初始化后端=
错误,
)
如果
当前排名
在
网格节点数:
子网格分辨率 =
子网格
着色子网格.
_维度组信息 =
切片维度组信息 # type: ignore[possibly-undefined]
self.子到根映射[
资源子网格] =
设备网格
返回
资源子网格
def 创建展平网格(
self, 设备网状:
设备网状,
网状维度名称:
可选[str] =
无
) -> 设备网状:
根网格 =
网格资源.
获取根网格(
设备网格)
展平根维度 = [
非空(
根网格.
网格维度名称).
索引(
展平的网格维度名称)
for 展平网格维度名称
在
非空(
设备网格.
网格维度名称列表)
]
如果
不是
网格维度名称:
网格维度名称 =
“_”.
连接(
[
非空(
根网格.
网格维度名称
)]
暗淡]
for 维度
在
在根中展平维度
]
)
# 检查展平网格的 mesh_dim_name 是否有效。
self.展平名称到根维度.setdefault(
根网格, {})
无效的维度名称 = chain(
*列表(
非空(
根网格.
网格维度名称)),
*self.将名称扁平化到根维度[
根网格].
键(),
)
如果
网格维度名称
在
无效的维度名称:
抛出
运行时错误(
f"{网格维度名称}
已经为子网格的{
根网格}
.,
f"子网格和平滑网格的 mesh_dim_names 无效。"{
无效的 dim_names}
.
f"请指定另一个有效的 mesh_dim_name。",
)
# 如果已经创建了展平网格,则快速返回。
# TODO:如果我们决定限制一次展平初始化,则应删除此检查,并在展平网格已创建之前抛出错误。
# 此检查,并在展平网格已创建之前抛出错误。
如果 (
root_mesh 在 self.
根到扁平映射
和
网格维度名称
在 self.
根到扁平映射[
根网格]
):
返回 self.
根转扁平映射[
根网格
]
[网格维度名称]
扁平化网格维度大小 =
数学.
生产(
设备网状.
网状.
尺寸())
根维度 =
列表(
范围(
根网格.
网格.
维数))
for 在根中展平维度
在
在根中展平多个维度:
根中剩余维度.
删除(
根中展平维度)
按维度排序的 pg_ranks =
根网格.
网格.
排列(
*根节点中剩余的维度, *
根节点中展平的维度
).重塑(-1,
展平网格维度大小)
当前排名 =
根网格.
获取排名()
for 网格维度
在 pg_ranks_by_dim:
# 需要在扁平化的 pg 不存在于根网格中时初始化后端。
flattened_mesh = 设备网格(
root_mesh.设备类型,
网格节点,
网格维度名称=(
网格维度名称项,),
)
如果
当前进程号
在
网格节点:
展平后的网格 =
展平网格
self.子节点到根节点的映射[
展平的网格] =
根网格 # type: ignore[possibly-undefined]
self.根到展平映射.setdefault(
根网格,
空括号数组
网格维度名称] = (
展平网格 # type: ignore[possibly-undefined]
)
self.展平名称到根维度[
根网格
]
[网格维度名称] =
元组(
在根中展平维度
) # type: ignore[possibly-undefined]
返回
展平后的网格
def 获取根网格(self,
设备网格:
设备网格) ->
设备网格:
如果子到根映射中找不到网格,则它本身就是一个根网格。
根网格不是通过切片创建的。
我们认为根网格的根网格本身就是。
root_mesh = self.根到子映射.
获取(
设备网状,
无)
返回
设备网状
如果
不是
根网状
否则
根网状
def 获取根网格维度(self,
设备网格:
DeviceMesh) ->
可选[int
]
""
返回网格维度在根网格中的索引。
传入的 device_mesh 需要从根网格中切分出来。
或根网格的子网格。
"沉浸式翻译"
根网格 = self.
获取根网格(
设备网格)
儿童网格维度名称 =
设备网格.
网格维度名称
如果
根网格
和
子网格维度名称:
断言
长度(
子网格维度名称) == 1, (
子网格只能是一个一维网格。
)
子网格维度名称 =
子网格维度名称[0]
返回 self.
通过名称获取网格维度(
根网格,
子网格维度名称)
返回
无
@staticmethod
def 每个主机上的设备数量(
设备类型: str) -> int:
返回
获取设备句柄(
设备类型).
设备数量()
@staticmethod
def 主机数量(
设备类型: str) -> int:
# ProcessGroup 无法告诉我们这个信息,所以我们必须推断它,假设
# 目前假设为同构硬件
返回
获取世界大小() //
_网格环境.
每个主机上的设备数量(
设备类型)
def 通过名称获取网格维度(
self, 设备网状:
设备网状,
网格维度名称:
字符串
) -> int:
如果 (
设备网状.
网格维度名称
是
无
或者
长度(
设备网格.
网格维度名称) == 0
):
抛出
键错误(
未找到 `网格维度名称`。,
)
如果
网格维度名称
不是
在
设备网格.
网格维度名称列表:
抛出
键错误(
f网格维度 '{
网格维度名称}
不存在。,
f"可用的网格维度有:mesh_dim_names="{
设备网格.
网格维度名称}",
)
返回
非空(
设备网格.
网格维度名称.
索引(
网格维度名称))
def 设置网格维度组选项(
self,
暗淡: int,
后端: str,
pg 选项:
可选[
C10d 后端.
选项] =
无,
) -> 无:
self.网格维度组选项[
暗淡] = (
后端,
PG 选项)
def _获取切片网格维度(
self, 网状设备,
网格维度名称
) -> 列表[
元组[int, ...
]]
""
验证网格维度名称是否适用于给定网状设备的切片。
如果有效,则返回切片网格在设备网格中的维度索引。
"沉浸式翻译"
如果
设备网状 != self.
获取根网状(
设备网状):
抛出
运行时错误(
无法从子网状创建子网状。)
# 切片网格维度名称应包含 device_mesh 的 mesh_dim_names
# 或者其展开网格的 mesh_dim_names。
self.flatten_name_to_root_dims.setdefault(device_mesh, {})
展平名称到根维度 = self.
展平名称到根维度[
设备网格]
有效的网格维度名称 = [
*设备网格.
网格维度名称,
*展平名称到根维度,
]
如果
不是
所有(
网格维度名称
在
有效的网格维度名称
for 网格维度名称
在
网格维度名称列表
):
抛出
键错误(
f"无效的 mesh_dim_names"{mesh_dim_names}
指定的 mesh_dim_names 无效。
f"有效的 mesh_dim_names 包括"{valid_mesh_dim_names}
。
)
# 验证切片网格维度索引的顺序。
# 需要按升序排列。
curr_idx = -1
切片网格维度 =
输入文本为空,请提供需要翻译的文本
for 网格维度名称
在
网格维度名称列表:
如果
网格维度名称
在
展平名称到根维度:
网格索引 =
展平名称到根维度[
网格维度名称]
这还不允许使用 flatten dim 进行非连续切片。next_idx
一旦我们支持使用 flatten dim 进行非连续切片,should be mesh_indices[0]。
next_idx = mesh_indices[-1]
切片网格维度.append(
网格索引)
否则:
下一个索引 =
设备网格.
网格维度名称.
索引(
网格维度名)
切片网格维度.append((
下一个索引,))
如果 next_idx <= curr_idx:
抛出
键错误(
f"无效的 mesh_dim_names"{mesh_dim_names}
指定。,
f找到切片网格维度索引:{slice_mesh_dims}
。,
"网格维度索引应按升序排列。",
)
当前索引 =
下一个索引
返回
切片网格维度
def 获取所有子网格(
self, 设备网格:
设备网格,
网格维度名称:
字符串
) -> 列表[
设备网格
]
""
返回设备网格给定网格维度的所有子网格。
"沉浸式翻译"
网格维 = self.
通过名称获取网格维(
设备网状,
网状维度名称)
pg_ranks_by_dim = 设备网格.
网格.
交换维度(-1,
网格维度).
重塑(
-1, 设备网格.
网格.
尺寸(
网格维度)
)
当前排名 =
设备网格.
获取排名()
着色子网格 =
输入文本为空,请提供需要翻译的文本
for 一维网格
在
按维度排序的进程组秩:
子网格 =
设备网格(
设备网格.
设备类型,
网格_1D,
网格维度名称=(
网格维度名称,),
初始化后端=
错误,
)
子网格.
_维度组信息 = (
[设备网格.
_维度组信息[
网格维度]]
如果
当前排名
在
网格_1D
否则
输入文本为空,请提供需要翻译的文本
)
子网格分辨率.append(
子网格)
返回
累积子网格
网格资源:
网格环境 =
网格环境()
def 获取设备句柄(
设备类型:
字符串 =
cuda):
""
获取对应于 device_type 为 cuda 或 cuda-like 设备的模块。
例如,当 device_type 为 cuda 时,返回模块`torch.cuda`。
当没有对应 device_type 的模块时返回 None,否则
返回相应的模块。
"沉浸式翻译"
返回 getattr(torch,
设备类型,
无)
[文档]
类
设备网格:
""
DeviceMesh 表示设备网格,其中设备的布局可以表示为一个 n 维数组,且 n 维数组中的每个值都是默认进程组 rank 的全局 ID。
n 维数组中的每个值代表默认进程组 rank 的全局 ID。
每个 n 维数组值代表默认进程组 rank 的全局 ID。
DeviceMesh 可用于描述集群中设备的布局,
并作为集群内设备列表之间通信的代理。
DeviceMesh 可以用作上下文管理器。
.. 注意::
DeviceMesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序
在集群的所有进程/节点上运行。因此,用户需要确保
`mesh`数组(描述设备布局)在所有节点上应保持一致。
不一致的`mesh`将导致无声挂起。
参数:
device_type (str): 网格的设备类型。目前支持:"cpu","cuda/cuda-like"。
mesh (ndarray):一个多维数组或描述布局的整数张量
设备,其中 ID 是默认进程组的全局 ID。
返回:
DeviceMesh:表示设备布局的:class:`DeviceMesh`对象。
以下程序以 SPMD 方式在每个进程/排名上运行。在这个例子中,我们有 2
个主机,每个主机有 4 个 GPU。
网格的第一维度的缩减将跨维度减少
列(0,4),..以及(3,7),对第二维度的缩减
网格在行(0,1,2,3)和(4,5,6,7)之间减少。
示例::
>>> # xdoctest: +SKIP("无等级")
>>> 从 torch.distributed.device_mesh 导入 DeviceMesh
...
>>> # 初始化设备网格为 (2, 4) 以表示拓扑
>>> # 跨主机(维度 0),以及主机内(维度 1)。
>>> 网格 = DeviceMesh(device_type="cuda", 网格=[[0, 1, 2, 3],[4, 5, 6, 7]])
"沉浸式翻译"
设备类型:
字符串
网格: torch.
张量
网格维度名称:
可选[
元组[str, ...]]
def __init__(
self,
设备类型: str,
网格:
联合[torch.
张量,
"数组类似"
]
*,
网格维度名称:
可选[
元组[str, ...]] =
无,
_初始化后端:
布尔值 =
是,
) -> 无:
self.设备类型 =
设备类型
如果 isinstance(
网格, torch.
张量)
和
网格.
设备.
类型 != "cpu":
抛出
值错误(f
"mesh 必须是一个 CPU 张量,得到"{
网格}")
self.网格 = (
网格.detach().
到(
数据类型=torch.int)
如果 isinstance(
网格, torch.
张量)
否则 torch.
张量(
网格,
设备="cpu",
数据类型=torch.int)
)
self.网格维度名称 =
元组(
网格维度名称)
如果
网格维度名称
否则
无
# 私有字段,用于预生成 DeviceMesh 的哈希
self._flatten_mesh_list = 元组(self.
网格.
展平().
转列表())
self._thread_id = 无
跳过如果 xla 设备或初始化后端为 False 的过程组初始化
TODO(yeounoh)实现 DeviceMesh 后端并注册 XLA 后端。
如果
设备类型 != "xla":
尝试创建默认(世界)进程组,即使它尚未初始化
已经。世界进程组用于每个设备网格的身份(排名)
的进程(我们需要知道当前的全局排名是否在网格中)。
如果
初始化后端:
self.获取或创建默认组()
self.初始化进程组()
如果
已初始化()
和
获取后端() ==
线程化:
self._线程 ID =
线程.
获取标识符()
# 计算当前全局秩在网格上的坐标
排名坐标 = (self.
网格 ==
获取排名()).
非零()
断言
排名坐标.
尺寸(0)
在 (0, 1)
self._coordinate_on_dim: 可选[
列表[int]] = (
rank_coords[0].转列表()
如果 rank_coords.
尺寸(0) > 0
否则
无
)
def _get_or_create_default_group(self):
默认初始化 =
已初始化()
如果
不是
默认初始化:
初始化进程组()
世界大小 =
获取世界大小()
如果 self.
网格.
元素数量() >
世界大小:
抛出
运行时错误(
f"网格不应大于默认世界大小"{
世界大小}
,但找到{self.
网格.
元素数量()}
排名!”
)
设备句柄 =
_获取设备句柄(self.
设备类型)
# TODO:如果用户想要传递 pg_options,提供一种方法来做
如果
不是
默认已初始化
和
设备句柄:
# 根据每个主机中可用的 GPU 设备数量自动设置当前 CUDA/类似 CUDA 的设备
# 注意:此设备选择仅适用于同构硬件。
每个主机上的设备数量 =
设备句柄.
设备数量()
如果 (
世界大小 >
每个主机上的设备数量
和
世界大小 %
每个主机上的设备数量 != 0
):
抛出
运行时错误(
f"DeviceMesh 仅支持同构硬件,但发现"
f"{世界大小}
排名和{
每个主机上的设备数} {self.
设备类型}
设备!
)
设备句柄.
设置设备(
获取排名() %
每个主机上的设备数量)
返回 _get_default_group()
def _init_process_groups(self):
# 每个网格维度关联的标签/排名/组名,每个网格维度应该有一个子组对应每个排名
# 网格维度应该为每个排名有一个子组
#
# TODO(yifu): 在完全迁移到本地后删除标签和排名
# 功能性集体。详细信息请见:
# https://github.com/pytorch/pytorch/issues/93173#issuecomment-1907095208
dim_group_infos: 列表[
元组[str,
列表[int
] str]] =
输入文本为空,请提供需要翻译的文本
默认组 =
_获取默认组()
如果 self.
网格.
维数 == 1
和 self.
网格.
元素数量() ==
获取世界大小():
仅当默认页与 `self.device_type` 兼容时,才将默认页追加到第一维组中。
否则,创建新页。
排名 =
列表(
范围(
获取世界大小()))
线性组 = (
新组(
后端="cpu:gloo,cuda:nccl",
排名=
排名,
群组描述=
默认网格,
)
如果 torch.cuda.
是否可用()
和
获取后端(
默认组) ==
gluu
否则
默认组
)
维度组信息.append(
(
_获取组标签(
维度组),
排名,
维度组.
群组名称,
)
)
否则:
根据网格参数创建子页面
for 维度
在
范围(self.
网格.
维数):
将当前维度交换到最后维度
然后重塑以展平其他维度
pg_ranks_by_dim = self.网格.
交换维度(-1,
暗淡).
重塑(
-1, self.网格.
尺寸(
暗淡)
)
通过 _MeshEnv.set_dim_group_options() 指定的维度组选项。
如果未指定组选项,则从父组继承。
如果
维度
在
网格资源.
网格维度组选项:
(
后端,
索引选项,
) = 网格资源.
网格维度组选项[
暗淡]
否则:
后端,
pg 选项 =
无,
无
如果我们有一个具有 mesh_dim_names("dp", "tp")的 2D 网格,子组的描述将是`mesh_dim_dp`和`mesh_name_tp`。
如果网格没有 mesh_dim_names,那么子组的描述将是
# 如果网格没有 mesh_dim_names,那么子组的描述将是
# 子组将是 `mesh_dim_0` 和 `mesh_dim_1`。
群组描述 = (
f"网格_"{self.
网格维度名称列表[
暗淡]}"
如果 self.
网格维度名称
否则 f
"网格维度_"{
暗淡}"
)
如果存在 bound_device_id,则表示 nccl 通信器已被积极初始化
因此我们可以使用 `split_group` 通过 `ncclCommSplit` 创建子组
在这种情况下,我们只需要对子组创建进行一次 API 调用(`split_group`)
对于每个网格维度。在一个 2 * 4 的网格中,我们只需要对每个 rank 进行 2 次 API 调用以创建
所有子组。
否则,我们需要对子组创建进行多次 API 调用(`new_group`)。
# 网格的每个维度子组的数量等于 API 调用的数量。在一个 2 * 4 的网格中
# 我们需要为每个进程调用 2 + 4 = 6 次 API 来创建所有子组。
dim_group = 无
has_split_group = 假
如果 (
绑定设备 ID := getattr(
默认组,
"绑定设备 ID",
无
)
) 是
不是
无
和 torch.cuda.
是否可用():
调光组 =
分组(
父页面=
默认组,
数据库选项=
数据库选项,
分割等级=pg_ranks_by_dim.
转列表(),
group_desc=group_desc,
)
已分割组 =
真实
如果子组已经通过 `split_group` 创建,我们只需遍历 `pg_ranks_by_dim`
并将 `(group_tag, subgroup_ranks, 和 group_name)` 元组添加到 `dim_group_infos` 列表中,当
当前排名在子组中时。
否则,我们使用 `new_group` 而不是 `split_group` 通过遍历 `pg_ranks_by_dim` 创建子组。
随时必要地向 `dim_group_infos` 列表追加信息。
for dim_mesh 在 pg_ranks_by_dim:
subgroup_ranks = dim_mesh.转列表()
我们暂时恢复重用子组,因为它破坏了两个内部测试。
暂时回滚以解决测试超时问题,并定位原因。
# TODO: 添加两个测试以覆盖内部测试场景,如果存在则重新启用重用子组。
如果
绑定设备 ID
是
无
或者
不是
是否有分割组:
维度组 =
新组(
等级=
子组等级,
后端=
后端,
PG 选项=
pg 选项,
群组描述=
群组描述,
)
# 仅当当前在子组中的排名时添加到 dim_groups
如果 self.
获取排名()
在
子群等级:
如果
长度(
维度组信息) >
暗淡:
抛出
运行时错误(
f"每个设备网格维度应只获得一个进程组,但实际上却得到了{self.
获取排名()} "
f"在"{
子群排名}
!
)
维度群信息.append(
(
获取组标签(
非空(dim_group)),
子组等级,
dim_group.群组名称,
)
)
self._dim_group_infos = dim_group_infos
def __进入__(self) ->
设备网格:
# 将此网格设置为网格环境中的当前网格
_mesh_resources.网格堆栈.append(self)
返回 self
# pyre-fixme[2]: 参数必须进行注解。
def __退出__(self,
异常类型, exc_value,
异常跟踪回溯) ->
无:
# 从网格环境中弹出此网格
网格资源.
网格堆栈.
流行()
def __repr__(self) -> str:
设备网格表示 = (
f"DeviceMesh('{self.设备类型}', {self.
网格.
转列表()})"
如果
不是 self.
网格维度名称
否则 f
"设备网格('{self.
设备类型}', {self.
网格.
转列表()}
, 网格维度名称={self.
网格维度名称列表})"
)
返回
设备网格表示
def __哈希__(self):
懒加载计算哈希
self._哈希 = getattr(self,
"_哈希",
无)
如果
不是 self.
_哈希:
self.哈希 =
哈希(
(
self._flatten_mesh_list,
self.网格.
形状,
self.设备类型,
self.网格维度名称,
self._线程 ID,
)
)
返回 self._hash
def __等于__(self,
其他:
对象) -> bool:
如果
不是 isinstance(
其他,
设备网格):
返回
假
如果 id(self) == id(
其他):
返回
真实
否则:
返回 (
self._flatten_mesh_list == 其他._flatten_mesh_list
和 self.
网格.
形状 ==
其他.
网格.
形状
和 self.
设备类型 ==
其他.
设备类型
和 self.mesh_dim_names ==
其他.mesh_dim_names
和 self._thread_id ==
其他._thread_id
)
def __getitem__(
self, 网格维度名称列表:
联合[str,
元组[str, ...]]
) -> 设备网格:
""
根据给定的 mesh_dim_names 对当前 DeviceMesh 进行切片,创建子网格。
创建的子网格包含由`mesh_dim_names`指示的维度和通信器。
``mesh_dim_names``
参数:
mesh_dim_names (Union[str, Tuple[str]]): 创建子网格的 DeviceMesh 的网格维度名称或名称元组。
为创建子网格而指定的 DeviceMesh 的网格维度。
返回:
一个 :class:`DeviceMesh` 对象。
以下程序以 SPMD 方式在每个进程/排名上运行,世界大小为 8。
在第一个例子中:
在 rank 0、1、2、3 上调用 mesh_2d["tp"]返回 DeviceMesh 的 1D 子网格:([0, 1, 2, 3])。
在 rank 4、5、6、7 上调用 mesh_2d["tp"]返回 DeviceMesh 的 1D 子网格:([4, 5, 6, 7])。
在 rank 0、4 上调用 mesh_2d["dp"]返回 DeviceMesh 的 1D 子网格:([0, 4])。
在 rank 1,5 上调用 mesh_2d["dp"]返回一个 DeviceMesh:([1, 5])的 1D 子网格。
在 rank 2,6 上调用 mesh_2d["dp"]返回一个 DeviceMesh:([2, 6])的 1D 子网格。
在 rank 3,7 上调用 mesh_2d["dp"]返回一个 DeviceMesh:([3, 7])的 1D 子网格。
在第二个例子中:
在 rank 0, 1, 4, 5 上调用 mesh_3d["dp", "cp"]返回 DeviceMesh 的 2D 子网格:([[0, 1], [4, 5]])。
在 rank 2, 3, 6, 7 上调用 mesh_3d["dp", "cp"]返回 DeviceMesh 的 2D 子网格:([[2, 3], [6, 7]])。
在 rank 0, 1, 4, 5 上调用 mesh_3d["cp", "dp"]返回 DeviceMesh 的 2D 子网格:([[0, 4], [1, 5]])。
在 rank 2, 3, 6, 7 上调用 mesh_3d["cp", "dp"]返回 DeviceMesh 的 2D 子网格:([[2, 6], [3, 7]])。
示例::
>>> # xdoctest: +SKIP("no rank")
>>> from torch.distributed.device_mesh import DeviceMesh
...
>>> # 初始化一个表示拓扑的 2D 设备网格(2, 4)
>>> # 跨主机(维度 0),以及主机内(维度 1)。
>>> mesh_2d = init_device_mesh(device_type="cuda", (2,4), mesh_dim_names=("dp", "tp"))
>>> tp_mesh = mesh_2d["tp"]
>>> dp_mesh = mesh_2d["dp"]
...
>>> # 初始化一个 3D 网格。
>>> mesh_3d = init_device_mesh(device_type="cuda", (2,2,2), mesh_dim_names=("dp", "pp", "cp"))
>>> # 提供的 mesh_dim_names 的顺序决定了子网格中维度的顺序。
>>> dp_cp_mesh = mesh_3d["dp", "cp"]
>>> cp_dp_mesh = mesh_3d["cp", "dp"]
"沉浸式翻译"
如果
不是 self.
网格维度名称:
抛出
运行时错误(
"无法在没有 mesh_dim_names 的情况下切片 DeviceMesh!")
网格维度名称 = (
(网格维度名称,)
如果 isinstance(
网格维度名称, str)
否则
网格维度名称
)
如果
网格维度名称 == self.
网格维度名称:
返回 self
否则:
切片网格维度 =
_网格资源.
_获取切片网格维度(
self, 网格维度名称
)
当使用 FakeTensorMode 进行模型跟踪时,`create_sub_mesh()`将
# 失败,因为它将需要一个真正的张量来操作。
# `unset_fake_temporarily()` 将允许我们实例化张量
# 在 `_mesh_resources` 中,这不应该影响建模。
#
# 注意,这应该与 torch.compile() 正交。但无论
# 我们可以编译设备网格的 `切片`(无图断开)尚未验证
# 但需要后续跟进,
# TODO:编译器 + 设备网格切片。
与 torch.
子类.fake_tensor.
临时取消 fake 设置():
子网格 =
网格资源.
创建子网格(
self, 网格维度名称列表,
切割网格维度
)
返回
子网格
[文档] def get_group(self, mesh_dim: Optional[Union[int, str]] = None) -> ProcessGroup:
```python
# 假设输入文本为:
input_text = ' """'
# 翻译函数(此处为示例,实际翻译功能需要调用真实的翻译 API 或库)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 或库进行翻译
# 由于这里没有实际的翻译功能,以下为模拟输出
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
返回由 mesh_dim 指定的单个 ProcessGroup,或者,如果未指定 mesh_dim,则返回
设备网格是 1 维的,返回网格中唯一的 ProcessGroup。
参数:
网格维度(str/int,可选):可以是网格维度的名称或索引
网格维度的索引。默认为 None。
返回:
一个 :class:`ProcessGroup` 对象。
"""
如果没有 hasattr(self, "_dim_group_infos"):
抛出 RuntimeError("DeviceMesh 处理组未初始化!")
如果 self.mesh.ndim > 1 并且 mesh_dim 是 None:
raise RuntimeError(
f"找到 DeviceMesh 具有{self.mesh.ndim}个维度",
当 device_mesh.ndim 大于 1 时,必须指定可选参数`mesh_dim`。
如果您想获取 DeviceMesh 中所有 ProcessGroups 的列表,
"请使用 `get_all_groups()` 代替。"
)
# 快速返回,如果当前 device_mesh 是 1D 网格。
if self.mesh.ndim == 1 and mesh_dim is None:
return not_none(
_find_pg_by_ranks_and_tag(*self._dim_group_infos[0][:2]) # type: ignore[index]
)
root_mesh = _mesh_resources.get_root_mesh(self)
root_to_flatten_mapping = _mesh_resources.root_to_flatten_mapping.get(
root_mesh, None
)
if root_to_flatten_mapping and mesh_dim in root_to_flatten_mapping.keys():
dim_group_infos = root_to_flatten_mapping[
mesh_dim # type: ignore[index]
]._dim_group_infos[0][:2]
return not_none(_find_pg_by_ranks_and_tag(*dim_group_infos))
else:
mesh_dim = (
_mesh_resources.get_mesh_dim_by_name(self, mesh_dim)
if isinstance(mesh_dim, str)
else 网格维度
)
return not_none(
_find_pg_by_ranks_and_tag(*self._dim_group_infos[网格维度][:2]) # type: ignore[index]
)
[文档] def get_all_groups(self) -> list[ProcessGroup]:
"""
返回所有网格尺寸的 ProcessGroup 列表。
返回:
一个 :class:`ProcessGroup` 对象的列表。
"""
return [self.get_group(i) for i in range(self.mesh.ndim)]
[文档] @staticmethod
def 来自群组(
组:
联合[
流程组,
列表[
流程组]],
设备类型: str,
网格:
可选[
联合[torch.
张量,
类数组]] =
无,
*,
网格维度名称:
可选[
元组[str, ...]] =
无,
) -> 设备网格:
""
使用 ``device_type`` 从现有的 :class:`ProcessGroup` 或一组现有的 :class:`ProcessGroup` 构建一个 :class:`DeviceMesh`。
构建出的设备网格维度数等于传入的组数。例如,如果传入一个进程组,则维度数为 1。
构建出的设备网格的维度数等于传入的组数。例如,如果传入一个进程组,则维度数为 1。
例如,如果传入一个进程组,则维度数为 1。
结果的 DeviceMesh 是一个一维网格。如果传入一个包含 2 个进程组的列表,
结果的 DeviceMesh 是一个二维网格。
如果传入多个组,则必须提供“mesh”和“mesh_dim_names”参数。
传入的进程组的顺序决定了拓扑结构。
网格。例如,第一个进程组将是 DeviceMesh 的 0 维。
传入的`mesh`张量必须具有与传入的进程组数量相同的维度,并且`mesh`张量中维度的顺序必须与传入的进程组中维度的顺序匹配。
维度顺序必须与传入的进程组中维度的顺序匹配。
的顺序匹配。
参数:
group(ProcessGroup 或 list[ProcessGroup]):现有的 ProcessGroup
或现有 ProcessGroup 的列表。
device_type(str):网格的设备类型。目前支持:"cpu","cuda/cuda-like"。
通过 GPU 索引传递的设备类型,例如 "cuda:0"。
不允许。
mesh (torch.Tensor 或 ArrayLike,可选):一个多维数组或一个
描述设备布局的整数张量,其中 ID 是全局 ID
默认进程组的 ID。默认为 None。
mesh_dim_names (tuple[str], optional): 一个用于分配给描述设备布局的多维数组每个维度的网格维度名称的元组(可选)。
to each dimension of the multi-dimensional array describing the layout of devices.
其长度必须与 `mesh_shape` 的长度匹配。`mesh_dim_names` 中的每个字符串都必须唯一。默认值为 None。
默认为 None。
返回:
DeviceMesh:表示设备布局的:class:`DeviceMesh`对象。
"沉浸式翻译"
# 1D 场景
如果 isinstance(
组,
流程组):
群组排名 =
获取进程组排名(
组)
如果 (
isinstance(网格, torch.
张量)
和
网格.
转列表() !=
组排名
) 或者 (
网格
是
不是
无
和
不是 isinstance(
网格, torch.
张量)
和
网格 !=
群组等级
):
抛出
值错误(
f"无效的网格"{str(
网格)}
对于具有等级的 ProcessGroup{
群组等级}"
)
网格 = torch.
张量(
群组等级,
设备="cpu",
数据类型=torch.int)
设备网状 =
设备网格(
设备类型,
网格,
网格维度名称=
网格维度名称,
_初始化后端=
错误,
)
设备网格.
_维度群组信息 = [
(_获取群组标签(
组),
群组等级,
组.
群组名称)
]
返回
设备网状
# nD 场景
组 =
列表(
组)
如果
长度(
群组) == 0:
抛出
值错误(
"至少需要一个流程组传入")
如果
网格
是
无:
抛出
值错误(
"必须传递网格,如果传递多个 ProcessGroups")
如果
网格维度名称
是
无:
抛出
值错误(
"必须传递 mesh_dim_names,如果传递多个 ProcessGroups"
)
网格 = (
网格.detach().
到(
数据类型=torch.int,
设备="cpu")
如果 isinstance(
网格, torch.
张量)
否则 torch.
张量(
网格,
设备="cpu",
数据类型=torch.int)
)
如果
网格.
维数 !=
长度(
群组):
抛出
值错误(
"期望网格的维度与 ProcessGroups 的数量相等,但得到了 "
f"网格"{
网格.
转列表()}
和{
长度(
群组)}
ProcessGroups
)
设备网状 =
设备网格(
设备类型,
网格,
网格维度名称=
网格维度名称,
_初始化后端=
假
)
设备网格._dim_group_infos = [
(
获取组标签(
组),
获取进程组排名(
组),
组.
群组名称,
)
for 组
在
组们
]
返回
设备网格
def 尺寸(self,
网格维度:
可选[int] =
无) -> int:
返回 self.
网格.
元素数量()
如果
网格维度
是
无
否则 self.
网格.
尺寸(
网格维度)
@property
def 维数(self) -> int:
返回 self.
网格.
维数
@property
def 形状(self) ->
元组[int, ...
]
返回
元组(self.
网格.
形状)
[文档] def get_rank(self) -> int:
"""
返回当前的全局排名。
"""
返回获取排名()
[文档] def get_local_rank(self, mesh_dim: Optional[Union[int, str]] = None) -> int:
"""
返回给定 mesh_dim 的 DeviceMesh 的本地排名。
Args:
mesh_dim (str/int, 可选): 它可以是网格维度的名称或索引
of the mesh dimension. Default is None.
Returns:
一个整数表示局部排名。
以下程序在每个进程/排名上以 SPMD 方式运行。在这个例子中,我们有 2
每个主机有 4 个 GPU。
在 rank 0, 1, 2, 3 上调用 mesh_2d.get_local_rank(mesh_dim=0)将返回 0。
在 rank 4, 5, 6, 7 上调用 mesh_2d.get_local_rank(mesh_dim=0)将返回 1。
在 rank 0, 4 上调用 mesh_2d.get_local_rank(mesh_dim=1)将返回 0。
在 rank 1, 5 上调用 mesh_2d.get_local_rank(mesh_dim=1)将返回 1。
在 rank 2, 6 上调用 mesh_2d.get_local_rank(mesh_dim=1)将返回 2。
在 rank 3, 7 上调用 mesh_2d.get_local_rank(mesh_dim=1)将返回 3。
示例::
>>> # xdoctest: +SKIP("no rank")
>>> 从 torch.distributed.device_mesh 导入 DeviceMesh
>>>
# 初始化设备网格为 (2, 4) 以表示拓扑
# 跨主机(维度 0),和主机内(维度 1)。
# mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
"""
如果 self.ndim 大于 1 且 mesh_dim 为 None:
raise RuntimeError(
f"发现 DeviceMesh 有 {self.mesh.ndim} 维",
"当 device_mesh.ndim 大于 1 时,必须指定可选参数`mesh_dim`。",
)
elif mesh_dim 为 None:
mesh_dim = 0
mesh_dim_group = not_none(self.get_group(mesh_dim))
assert isinstance(mesh_dim_group, ProcessGroup), (
"我们在调用 `get_rank` 之前期望得到 ProcessGroup!"
)
返回获取网格维度组的排名,确保非空
[文档] 定义一个方法获取坐标,返回类型为 Optional[list[int]]
"""
返回相对于所有排名的相对索引
网格的维度。如果此等级不是网格的一部分,则返回 None。
"""
如果 self._coordinate_on_dim 存在,则返回 self._coordinate_on_dim,否则返回 None。
def _flatten(self, 网格维度名称:
可选[str] =
无) ->
设备网格:
""
返回一个通过展平当前 DeviceMesh 得到的 1D DeviceMesh。
如果未提供 mesh_dim_name,则默认为将 mesh_dim_names 连接成的字符串
给定由“_”分隔的子网格,每个网格_dim_name。例如,如果我们有一个 3D 网格
设备网格([[[0, 1], [2, 3]], [[4, 5], [6, 7]]], 网格维度名称=("dp", "cp", "tp")),调用
mesh_3d["dp", "cp"]._flatten() 将创建一个 1D 子网格 DeviceMesh([0, 1, 2, 3], mesh_dim_names=("dp_cp",))
在 rank 0, 1, 2, 3 上,以及 rank 4, 5, 6, 7 上的 1D 子网格 DeviceMesh([4, 5, 6, 7], mesh_dim_names=("dp_cp",))。
创建扁平化维度之后,要访问 mesh_3d 中的扁平化维度,可以使用现有的切片方法通过调用 mesh_3d["dp_cp"] 来获取扁平化网格。
可以使用现有的切片方法通过调用 mesh_3d["dp_cp"] 来获取扁平化网格。
"沉浸式翻译"
如果
不是 self.
网格维度名称:
抛出
运行时错误(
"无法对没有 mesh_dim_names 的 DeviceMesh 进行展平!"
)
返回
_网格资源.
创建展平网格(self,
网格维度名称)
[文档] def
初始化设备网格(
设备类型: str,
网格形状:
元组[int, ...
]
*,
网格维度名称:
可选[
元组[str, ...]] =
无,
) -> 设备网格:
""
根据参数 `device_type`、`mesh_shape` 和 `mesh_dim_names` 初始化一个 `DeviceMesh`。
这将创建一个具有 n 维数组布局的 DeviceMesh,其中 `n` 是 `mesh_shape` 的长度。
如果提供了 `mesh_dim_names`,则每个维度将标记为 `mesh_dim_names[i]`。
.. 注意::
`init_device_mesh`遵循 SPMD 编程模型,意味着相同的 PyTorch Python 程序在集群中的所有进程/排名上运行。确保`mesh_shape`(描述设备布局的 nD 数组的维度)在所有排名中相同。不一致的`mesh_shape`可能导致挂起。
如果没有找到进程组,init_device_mesh 将初始化分布式进程组/组。
请确保`mesh_shape`(描述设备布局的 nD 数组的维度)在所有排名中相同。不一致的`mesh_shape`可能导致挂起。
.. 注意::
如果没有找到进程组,init_device_mesh 将初始化分布式进程组/组。
在幕后分布式通信所必需的。
参数:
device_type (str): 网状结构的设备类型。目前支持:"cpu","cuda/cuda-like"。
不允许传递带有 GPU 索引的设备类型,例如 "cuda:0"。
mesh_shape (Tuple[int]): 定义多维数组维度的元组。
描述设备布局。
mesh_dim_names(可选的字符串元组[str]):为每个维度分配的网格维度名称元组。
描述设备布局的多维数组的长度必须与 mesh_shape 的长度匹配。
`mesh_dim_names`中的每个字符串都必须是唯一的。
返回:
DeviceMesh:表示设备布局的:class:`DeviceMesh`对象。
示例::
>>> # xdoctest: +SKIP("no rank")
>>> 从 torch.distributed.device_mesh 导入 init_device_mesh
...
>>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,))
>>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))
"沉浸式翻译"
如果
网格维度名称
是
不是
无:
如果
长度(
设置(
网格维度名称)) !=
长度(
网格维度名称):
抛出
运行时错误(
"每个 mesh_dim_name 必须唯一。",
f"在 mesh_dim_names 中找到重复的 mesh_dim_name"{
网格维度名称}",
)
如果
长度(
网格形状) !=
长度(
网格维度名称):
抛出
运行时错误(
"mesh_shape 和 mesh_dim_names 应该具有相同的长度!",
f"找到 len(mesh_dim_names):"{
长度(
网格维度名称)}
和 len(mesh_shape):{
长度(
网格形状)}.",
)
# 假设有效的设备类型都是字母
如果
设备类型
和
不是
设备类型.isalpha():
抛出
运行时错误(
f"不支持索引的设备类型,但得到了"{
设备类型}
。",
"如果你维护了一个 'torch.device' 对象,建议传入 'device.type'。",
)
# 无论外部设备类型被设置为(例如元数据),
# 总是在 CPU 上初始化网格的张量,
与 torch.
设备("cpu"):
网格 = torch.arange(
数学.
生产(
网格形状),
数据类型=torch.int).
视图(
网格形状)
设备网状 =
设备网格(
设备类型=
设备类型,
网格=
网格,
网格维度名称=
网格维度名称,
)
返回
设备网格