# mypy: 允许未类型化定义
分布式集体通信(c10d)
导入 collections.abc
导入 contextlib
导入 ctypes
导入 hashlib
导入
输入/输出
导入 itertools
导入
记录日志
导入
操作系统
导入
酸菜
导入
系统
导入
时间
导入
警告
来自
集合
导入
命名元组
来自 datetime
导入
时间差
来自
打字
导入
任何,
可调用,
可选,
类型检查,
联合
来自 typing_extensions
导入
已弃用
导入
火炬
来自 torch._C
导入
_DistStore 错误_
是
DistStore 错误
来自 torch._C._distributed_c10d
导入 (
_分布式后端选项,
_注册进程组,
_resolve_process_group,
_注销所有进程组,
_注销进程组,
Allgather 选项,
Allreduce 合并选项,
Allreduce 选项,
AllToAll 选项,
障碍选项,
广播选项,
调试等级,
收集选项,
获取调试级别,
前缀存储,
流程组,
ReduceOp,
减少选项,
减少散布选项,
散点选项,
存储,
工作,
)
来自 torch._utils_internal
导入
从 justknobs 设置 pytorch 分布式环境
from torch.monitor 导入
_等待计数器
from torch.overrides 导入 handle_torch_function,
有 torch 功能
from torch.utils._typing_utils 导入 not_none
from .c10d_logger 导入 _exception_logger,
时间记录器
from .常量
导入
默认 PG NCCL 超时,
默认 PG 超时
from .会合点
导入
注册会合处理程序,
约会 # noqa: F401
全部 = [
后端,
后端配置,
群成员,
"P2P 广播",
"全局聚合",
all_gather_coalesced,
"all_gather_object",
全局归约,
"all_reduce_coalesced",
全对全,
全向单播,
障碍,
batch_isend_irecv,
广播,
发送对象列表,
"接收对象列表",
"广播对象列表",
"销毁进程组",
"聚集",
收集对象,
"获取后端配置",
"获取后端",
"获取设备默认后端",
"获取排名",
"获取世界大小",
"获取进程组数量",
"分组",
"初始化进程组",
"irecv",
"gloo 是否可用",
"已初始化",
"mpi 是否可用",
"是否支持后端",
"是否支持 NCCL",
"是否已启动 torchelastic",
"是否支持 UCC",
"is_xccl_available",
"isend",
"monitored_barrier",
"new_group",
"新子组",
"按枚举创建新子组",
"接收",
"减少",
"减少散射",
"散射",
"散射对象列表",
"发送",
支持复杂,
AllreduceCoalescedOptions,
AllreduceOptions,
AllToAllOptions,
"屏障选项",
"广播选项",
"收集选项",
"前缀存储",
"流程组",
"减少操作",
"减少选项",
"减少分散选项",
"散点选项",
"存储",
"调试级别",
"获取调试级别",
工作,
default_pg_timeout,
获取组排名,
获取全局排名,
"获取进程组排名",
"归约操作",
"所有聚合到张量中",
"归约散射张量",
get_node_local_rank,
split_group,
]
_MPI_AVAILABLE = 真实
_NCCL_AVAILABLE = 真实
_GLOO 可用 =
真实
_UCC_AVAILABLE = 真实
_XCCL_AVAILABLE = 真实
_pickler = pickle.挑选器
_反挑选器 = pickle.
反挑选器
# 将所有从 torch._C._distributed_c10d 导入且为公开的类型的 __module__ 改为
定义 _export_c_types()
翻译
无:
可更改模块的公共类型 = [
Allreduce 合并选项,
Allreduce 选项,
AllToAll 选项,
障碍选项,
广播选项,
收集选项,
前缀存储,
流程组,
ReduceOp,
减少选项,
减少散布选项,
散点选项,
存储,
调试等级,
获取调试级别,
工作,
]
for 类型
在 _public_types_to_change_module:
类型.__module__ =
torch.distributed.distributed_c10d
_export_c_types()
尝试:
from torch._C._distributed_c10d 导入 ProcessGroupMPI
ProcessGroupMPI.__module__ = torch.distributed.distributed_c10d
全部 += ["ProcessGroupMPI"]
除了
导入错误:
_MPI_AVAILABLE = 假
尝试:
来自 torch._C._distributed_c10d
导入 ProcessGroupNCCL
ProcessGroupNCCL.__module__ = torch.distributed.distributed_c10d
全部 += [
ProcessGroupNCCL]
除了
导入错误:
_NCCL_AVAILABLE = 假
尝试:
来自 torch._C._distributed_c10d
导入
流程组包装器,
网格流程组 Gloo
网格流程组 Gloo.__module__ =
torch.distributed.distributed_c10d
全部 += [
"Gloo 进程组"]
除了
导入错误:
_GLOO 可用 =
假
尝试:
来自 torch._C._distributed_c10d
导入
UCC 进程组
UCC 进程组.__module__ =
torch.distributed.distributed_c10d
全部 += [
ProcessGroupUCC]
除了
导入错误:
_UCC_AVAILABLE = 假
尝试:
来自 torch._C._distributed_c10d
导入 ProcessGroupXCCL
ProcessGroupXCCL.__module__ = torch.distributed.distributed_c10d
全部 += [
ProcessGroupXCCL]
除了
导入错误:
_XCCL_AVAILABLE = 假
记录器 =
记录.
获取日志记录器(__name__)
PG 包装器存储前缀 =
pg_wrapper
# 一些不支持复数的 reduce 操作将导致错误。
我们目前通过查看为分布式 API 提供复杂支持
将复数张量视为实数(torch.view_as_real),意味着调用
这些不受支持的运算将返回垃圾值而不是错误退出。
# (例如:max(2+3i, 3+2i) = 3+3i)
我们希望对不支持的操作调用错误输出,而不是返回垃圾值。
而不是返回垃圾值。
def 支持复数(
简化运算符: ReduceOp) -> bool:
如果支持减少操作,则返回 true。否则返回 false。
禁用列表 = [
ReduceOp.MAX,
ReduceOp.MIN,
ReduceOp.产品,
ReduceOp.带宽,
ReduceOp.逻辑或,
ReduceOp.逻辑异或,
]
返回
减少操作
不是
在
禁用列表
# TODO 将其重构为枚举/强枚举
[文档]
类
后端(str): # noqa: SLOT000
""
后端枚举类
可用后端:GLOO、NCCL、UCC、MPI、XCCL 以及其他已注册后端。
该类的值是小写字符串,例如:"gloo"。它们可以作为属性访问,例如:`Backend.NCCL`。
可以作为属性访问,例如:`Backend.NCCL`。
该类可以直接调用以解析字符串,例如,
``Backend(backend_str)`` 将检查 ``backend_str`` 是否有效,
如果有效则返回解析后的小写字符串。它也接受大写字符串,
例如,``Backend("GLOO")`` 返回 ``"gloo"``。
.. note:: 后端项 ``Backend.UNDEFINED`` 存在,但仅用作
某些字段的初始值。用户不应直接使用
不假设其存在。
"沉浸式翻译"
未定义 =
"未定义"
GLOO = "水凝胶"
NCCL = "nccl"
UCC = "ucc"
MPI = "mpi"
XCCL = "xccl"
_BackendPlugin = 命名元组(
"_后端插件", [
"创建者函数",
"扩展 API"])
_插件:
字典[str,
后端插件] = {}
后端列表 = [
未定义, GLOO, NCCL, XCCL, UCC, MPI]
第三方设备可以在此处注册默认后端支持
默认设备后端映射:
字典[
字符串,
字符串] = {
cpu: GLOO,
cuda: NCCL,
XPU: XCCL,
}
后端能力:
字典[str,
列表[str]] = {
GLOO: ["cpu", cuda
]
NCCL: [cuda
]
XCCL: [XPU
]
UCC: ["cpu", cuda
]
MPI: ["cpu", cuda
]
}
后端类型映射:
字典[str,
流程组.BackendType] = {
未定义:
流程组.BackendType.
未定义,
GLOO: 流程组.BackendType.GLOO,
NCCL: 流程组.BackendType.NCCL,
XCCL: 流程组.BackendType.XCCL,
UCC: 流程组.BackendType.UCC,
MPI: 流程组.BackendType.MPI,
}
定义 __new__(
类,
名称: str):
创建并返回该类的新实例。
if 不是 isinstance(
名称,
字符串):
抛出
值错误(
后端构造函数参数必须是字符串类型)
值 = getattr(
后端,
名称.
上(),
后端.
未定义)
if 值 ==
后端.
未定义:
值 =
名称.
小写()
返回
值
[文档] @classmethod
定义
注册后端(
类,
名称,
函数,
扩展 API=
错误,
设备:
可选[
联合[
字符串,
列表[
字符串]]] = None,
) -> None:
"""
使用给定的名称和实例化函数注册新的后端。
此类方法由第三方 `ProcessGroup` 扩展使用。
注册新的后端。
参数:
name (str): `ProcessGroup` 扩展的后端名称。它
应与 `init_process_group()` 中的匹配。
func (函数): 实例化后端的函数处理器。
函数应在后端扩展中实现
并接受四个参数,包括
``store``、``rank``、``world_size`` 和 ``timeout``。
extended_api (bool, optional): 后端是否支持扩展参数结构。
默认:``False``。如果设置为 ``True``,后端
将获取一个 ``c10d::DistributedBackendOptions`` 的实例,
一个由后端实现定义的过程组选项对象。
设备(str 或 str 列表,可选):此后端支持的设备类型
支持,例如:"cpu"、"cuda"等。如果`None`,
假设“cpu”和“cuda”
.. 注意:对第三方后端的支持是实验性的,可能会发生变化。
"""
# 这负责处理树外后端类型,后端列表中的更新表示可用性
if 不是
有属性(
后端,
名称.
上()):
setattr(后端,
名称.
上(),
名称.
小写())
if 名称.
小写()
不是
在
后端.
后端列表:
后端.
后端列表.append(
名称.
小写())
if 设备 is
不是
无:
for 设备
在
设备:
if 设备 !=
cpu
和
设备 !=
cuda:
后端.
默认设备后端映射[
设备] =
名称.
小写()
后端.
后端类型映射[
名称.
小写()] =
流程组.BackendType.
自定义
# 在后端类中更新设备能力矩阵
if 设备 is
无:
# 这更多的是对像 `threaded` 这样的组的向后兼容支持
# 假设默认设备为 "cpu" 和 "cuda",但会发出警告
警告.
警告(
f"设备能力为"{
名称}
未指定,假设为 `cpu` 和
"`cuda`。请通过 `devices` 参数指定它。"
"注册后端。"
)
后端.
后端能力[
名称.
小写()] = [
cpu,
cuda]
如果...否则 isinstance(
设备,
字符串):
# 单个设备字符串指定。直接转换为列表。
后端.
后端能力[
名称.
小写()] = [
设备]
else:
后端.
后端能力[
名称.
小写()] =
设备
后端.
_插件[
名称.
上()] =
后端.
后端插件(
函数,
扩展 API)
类
后端配置:
"""后端配置类。"""
def __init__(self, 后端:
后端):
初始化。
self.设备后端映射:
字典[str,
后端] = {}
后端 = str(
后端)
如果
后端 ==
后端.
未定义:
# 检测机器上的加速器。如果没有加速器,则返回 CPU。
# 如果没有可用,则返回 CPU。
设备类型 = torch._C.
获取加速器().
类型
尝试:
后端字符串 =
后端.
默认设备后端映射[
设备类型]
self.设备后端映射[
设备类型] =
后端(
后端字符串)
除了
键错误:
抛出
值错误(
f"我们检测到加速器"{
设备类型}
在您的机器上。
f但我们不知道该为这个加速器使用哪个通信后端。
f请在`init_process_group`调用中指定`backend`参数。
) 来自
无
如果...否则
后端.
小写()
在
后端.
后端列表:
当后端是单个字符串(不包含设备类型)时的案例
例如 "nccl","gloo","ucc","mpi"
支持的设备 =
后端.
后端能力[
后端.
小写()]
后端值 =
后端(
后端)
self.设备后端映射 =
字典.fromkeys(
支持的设备,
后端值)
如果...否则 ":"
在
后端.
小写():
# 后端指定在 "device:backend" 格式
# 确保后端字符串格式正确
# "{设备类型 1}:{后端 1},{设备类型 2}:{后端 2}"
cpu:gloo,cuda:nccl
后端错误信息字符串 = f
"自定义后端字符串参数无效:"{
后端}.
自定义后端字符串是一个实验性功能,后端字符串必须遵循以下格式:
"<设备类型 1>:<后端 1>,<设备类型 2>:<后端 2>...". 例如:'cpu:gloo,cuda:nccl'"
解析后端字符串并填充设备后端映射
for 设备后端对字符串
在
后端.
小写().
分割(","):
设备后端配对 =
设备后端配对字符串.
分割(
“:”)
如果
长度(
设备后端对) != 2:
抛出
值错误(
f"无效设备:后端配对失败:"\
{设备后端配对字符串}. {
后端错误信息}"
)
设备,
后端 =
设备后端配对
如果
设备
在 self.
设备后端映射:
抛出
值错误(
f"重复设备类型"{
设备} \
在后端字符串中:{
后端}. {
后端错误信息}"
)
self.设备后端映射[
设备] =
后端(
后端)
else:
用户指定了一个后端名称,其设备功能是
未知,假设它可以支持 PyTorch 的默认设备
(cpu 和 cuda)
警告.
警告(
f"设备能力为"{
后端}
未知,假设`cpu`和"
"`cuda`。您可以在`device:backend`格式中指定它"
"调用 `init_process_group`。"
)
后端值 =
后端(
后端)
我.
设备后端映射 = {
cpu:
后端值,
cuda:
后端值,
XPU:
后端值,
}
日志记录器.
信息(
使用后端配置:%s", self.
设备后端映射)
def __repr__(self):
返回所有以逗号分隔的设备:后端对。
返回 ",".
连接(
f"{设备}:{
后端}" for
设备,
后端
在
自身.
设备后端映射.
项目()
)
定义
获取设备后端映射(
自身) ->
字典[str,
后端
]
返回设备的后端映射
返回 self.
设备后端映射
类 _reduce_op:
r""
已废弃的枚举类。
对于求和、乘积、最小值和最大值等求约简操作:
建议使用 :class:`~torch.distributed.ReduceOp`。
"沉浸式翻译"
def __init__(self) -> 无:
# __members__ 是一个存储枚举类键值对的字典
for k, v 在 ReduceOp.
红操作类型.__members__.
项目():
setattr(self, k, v)
self.成员 = ReduceOp.
红操作类型.
成员
@deprecated(
"torch.distributed.reduce_op 已弃用,"
"请使用 torch.distributed.ReduceOp 代替",
分类=
未来警告,
)
def __getattribute__(self, 键):
返回
对象.__getattribute__(self,
键)
reduce_op = _reduce_op()
[文档]
类
P2P 流:
""
一个用于构建针对 ``batch_isend_irecv`` 的点对点操作的类。
这个类构建了 P2P 操作类型、通信缓冲区、节点排名
进程组,并标记。此类实例将被传递到
用于点对点通信的 `batch_isend_irecv`。
参数:
op(可调用对象):一个用于向或从对等进程发送或接收数据的函数。
``op``的类型可以是``torch.distributed.isend``或
``torch.distributed.irecv``。
tensor(张量,Tensor):要发送或接收的张量。
peer(int,可选):目标或源 rank。
group(ProcessGroup,可选):要工作的进程组。如果为 None,
则使用默认进程组。
tag(整型,可选):用于匹配发送与接收的标签。
group_peer(整型,可选):目标或源排名。
"沉浸式翻译"
def __init__(
self,
操作:
可调用,
张量: torch.
张量,
同行:
可选[int] =
无,
组:
可选[
流程组] =
无,
标签:
整型 = 0,
小组对等:
可选[int] =
无,
):
初始化。
self.操作符 =
操作符
self.张量 =
张量
self.群组 =
群组或默认群组(
组)
self.对等方 =
标准化群组排名(
self.组,
同行,
小组对等,
返回全局=
真实
)
self.标签 =
标签
self.群组对等 =
标准化群组排名(self.
组,
同行,
小组对等)
def __new__(
类,
操作:
可调用,
张量: torch.
张量,
同行:
可选[int] =
无,
组:
可选[
流程组] =
无,
标签:
整型 = 0,
小组对等:
可选[int] =
无,
):
创建并返回该类的新实例。
_检查操作(
操作)
_检查单个张量(
张量,
"张量")
返回
对象.__new__(
类)
def __repr__(self):
我的群组排名 =
获取排名(self.
组)
操作名称 = self.
操作.__name__
group_name = 我.
组.group_name
如果
自身.
群组
否则
默认分页
如果
发送
在
操作符名称:
s = 我的群组排名
d = 自身.
群组对等
如果...否则
接收
在
操作符名称:
s = 自身.
群组对等
d = 我的群组排名
否则:
返回
超级().__repr__()
返回 f
P2POp({
操作符名称} pg={
群组名称}, group_src={s}, group_dst={d}, {self.
张量.
形状}, {self.
张量.
数据类型})"
类
_集合操作:
""
捕获集体操作的类。
参数:
op (Callable):集体函数,例如 ``torch.distributed.all_reduce``。
tensor (Tensor):要操作的张量。
dst_tensor (Tensor, 可选):当源张量和目标张量不同时提供。
redop(ReduceOp,可选):减少操作。
root(int,可选):广播或减少的根。
"沉浸式翻译"
def __init__(
self,
操作:
可调用,
张量: torch.
张量,
目标张量:
可选[torch.
张量] =
无,
红操:
可选[ReduceOp] =
无,
根:
可选[int] =
无,
):
self.操作符 =
操作符
self.张量 =
张量
self.目标张量 =
目标张量
self.红色 =
红色
self.根 =
根
# DO NOT USE THESE FIELDS DIRECTLY.
# Use them through the _world object to make sure the _world override mechanism
_pg_map: 字典[
流程组,
元组[str,
存储]] = {}
_pg_names: 字典[
流程组, str] = {}
_pg_group_ranks: 字典[
流程组,
字典[int, int]] = {}
# 对于 pg,它是一个从 ProcessGroup 到 BackendConfig 的映射
_pg 后端配置:
字典[
流程组, str] = {}
_组计数 = 0
_标签到 pg:
字典[str,
列表[
流程组]] = {}
_pg 到标签:
字典[
流程组, str] = {}
后端:
可选[str] =
无
类
_世界:
""
c10d 进程组状态的容器类。
在注册和查找 PG 状态时使用。
..警告:: 这是一个实验性 API,旨在暴露内部工作原理。
c10d 的版本,可能随时更改。
"沉浸式翻译"
def __init__(self) -> 无:
自我.
_默认_pg =
无
self._合并状态_postgresql:
字典[
流程组,
列表[
_集合操作]] = {}
@property
def 默认页面(self)
翻译
可选[
流程组
]
""
包含集群所有 rank 的进程组。
当需要进程组但未提供时,c10d API 将使用此默认进程组。
但未提供。
"沉浸式翻译"
返回 self.
_默认_pg
@default_pg.setter
def 默认页面(self,
值) ->
无:
我.
_默认_pg =
值
@property
定义 pg_map(
自身) ->
字典[
流程组,
元组[str,
存储
]]
"""
提供从进程组到后端名称和存储的映射并存储。
对于 NCCL 和 GLOO 进程组,它是一个从进程组到(后端,存储)的映射。
对于 MPI 进程组,它是一个从进程组到(后端,无)的映射。
TODO 不要暴露地图,暴露细粒度操作
"沉浸式翻译"
全局 _pg_map
返回 _pg_map
@property
定义
数据库名称(
我) ->
字典[
流程组, str
]
""
处理组名称,从 ProcessGroup 到字符串的映射。
TODO 不要暴露地图,暴露细粒度操作
"沉浸式翻译"
全局 _pg_names
返回 _pg_names
@property
def pg_group_ranks(self) -> 字典[
流程组,
字典[int, int
]]
""
进程组的全局排名到本地排名映射。
TODO 不要暴露地图,暴露细粒度操作
"沉浸式翻译"
全局 _pg_group_ranks
返回 _pg_group_ranks
@property
def PostgreSQL 后端配置(self) ->
字典[
流程组, str
]
""
处理组后端配置。
TODO 不要暴露地图,暴露细粒度操作
"沉浸式翻译"
全局 _pg_backend_config
返回 _pg_backend_config
@property
def 群组计数(self) -> int:
""
默认命名下的进程组数量。
TODO 不要暴露群组数量,使用其他代替。
"沉浸式翻译"
全局
_组计数
返回
_组计数
@group_count.setter
def 群组计数(self, value: int) ->
无:
用于在全局同步时计算 ProcessGroups 的名称。
全局
_组计数
_组计数 =
值
@property
def 标签转 PostgreSQL(self) ->
字典[str,
列表[
流程组
]]
全局
_标签转 PostgreSQL
返回
_标签转 PostgreSQL
@property
def PostgreSQL 转标签(self) ->
字典[
流程组, str
]
全局
_ pg_to_tag
返回
_ pg_to_tag
@property
def pg_coalesce_state_合并状态(self) ->
字典[
流程组,
列表[
_集合操作
]]
返回 self._pg_coalesce_state
@property
def pg_config_info(我) ->
列表[
字典[str, Any
]]
""
返回包含进程组和后端及其唯一 ID 和配置(类型和排名)的字典列表。
以及它们的配置信息(类型和排名)。
"沉浸式翻译"
配置信息:
列表[
字典[
字符串,
任何]] =
输入文本为空,请提供需要翻译的文本
默认 pg 大小 =
获取组大小(
无)
for pg 在 self.pg_map.
键():
排名 = self.pg_group_ranks[pg]
配置信息.append(
{
pg 名称: self.
数据库名称[pg
]
"pg_desc": pg.群组描述,
"后端配置": self.
PostgreSQL 后端配置[pg
]
排名: (
列表(
排名.
键())
如果
长度(
排名) !=
默认 pg 大小
否则
输入文本为空,请提供需要翻译的文本
), # '排名'是一个空列表,当所有排名都参与 pg 时
"小组大小":
长度(
排名),
"组数":
自身.
群组计数,
}
)
返回
配置信息
_世界 =
_世界()
保存了 c10 使用的``_世界``单例实例。实验性扩展点,用于覆盖它
类
世界元数据(
类型):
"""
元类,用于“group”和“GroupMember”。
允许它们具有类属性“WORLD”。
"沉浸式翻译"
# 初始化后指向默认的 PG。
@property
def 世界(
类) ->
可选[
流程组
]
返回
_世界.
默认页面
@WORLD.setter
def 世界(
类, pg:
可选[
流程组
)]
_世界.
默认页面 = pg
类
组(
元类=
世界元数据):
“分组类。占位符。”
类
组成员(
元类=
世界元数据):
"小组成员类。"
非群组成员 = -100
def _获取默认超时(
后端:
后端) ->
时间差:
# 关于 nccl 与其他后端超时(constants.py)的说明
如果
后端 ==
后端.NCCL:
如果
不是 isinstance(
默认 PG NCCL 超时,
时间差):
# TODO:今天在 CPU 上初始化 pgnccl 后端时,触发了 CI 中的这个断言
# 警告:应修复 moco 模型。
警告.
警告(
尝试获取 nccl 后端的默认超时时间,但未编译 NCCL 支持
)
返回
默认 PG 超时
返回
默认 PG nccl 超时
否则:
返回
默认 PG 超时
def _检查有效超时(
超时时间: Any) ->
无:
if 不是 isinstance(
超时时间,
时间差):
抛出
类型错误(
f预期超时参数应为 datetime.timedelta 类型,实际得到{
超时时间}"
)
默认进程组状态
默认 PG 初始化方法:
可选[
字符串] =
无
店铺基础屏障前缀 =
store_based_barrier_key
定义
_获取对象集合设备(
组:
可选[
流程组] =
无) ->
字符串:
"""
.. 注意:: 这是一个内部辅助工具,不具有回溯功能
兼容性,请谨慎使用。
返回与“group”一起用于对象集合的设备类型
屏障
选择规则如下:
1. 如果用户在 `init_process_group` 调用中指定了恰好一个后端:
使用该后端
2. 否则如果用户在 `init_process_group` 中指定了多个 "device:backend" 对:
如果“cpu”在这些对中,则使用“cpu”(因为对象位于 cpu 内存中);
否则,使用第一个后端(有点像随机选择)。
参数:
group(ProcessGroup,可选):要工作的进程组。如果为 None,
则使用默认进程组。
返回:
使用对象集合的 ``group`` 的设备类型。
"""
群组 =
群组
或者
获取默认组()
if 不是 isinstance(
组,
流程组):
警告.
警告(
f您正在使用一个后端{
类型(
组)}
作为进程组。
自 PyTorch 2.0 起,此用法已弃用。请使用公共 API。
PyTorch 分布式代替。,
)
向后兼容,以应对传入的 `group` 实际上是后端(如 `ProcessGroupGloo`)而非 PT 2.0 中的 `ProcessGroup` 的情况
实际上是一个后端(如 `ProcessGroupGloo`)而不是 PT 2.0 意义上的 `ProcessGroup`
RPC 使用 Gloo 进行对象集合
if isinstance(组,
网格流程组 Gloo):
# RPC 使用 Gloo 进行对象集合
返回
cpu
else:
抛出
值错误(f
"期望得到一个流程组,但得到了一个"{
类型(
组)}.")
"""
``group._device_types`` 是一个 pybind 属性,返回设备
("cpu", "cuda" 等) 由 ``group`` 支持的设备。如果 ``group`` 支持多个设备,则可以有多个
``group`` 支持多个设备。
"""
设备 =
组._device_types
if 长度(
设备) == 1:
# 用户在`init_process_group`中恰好修复了一个后端
返回
设备[0].
类型
如果...否则
长度(
设备) == 0:
# 没有与此 PG 注册任何后端(可能是因为没有)
集体已经运行了吗?我们选择 CPU 作为默认,希望
# 这将懒加载初始化 Gloo 或其他可用的 CPU 后端。
返回
cpu
如果...否则 torch.
设备(
cpu)
在
设备:
该 PG 中有多个后端,CPU 就是其中之一。
由于对象位于 CPU 内存中,因此优先选择 CPU,无需使用设备。
复制。
返回
cpu
else:
后端列表中没有 CPU,随机选择第一个后端。
返回
设备[0].
类型
定义 _get_pg_default_device(
组:
可选[
流程组] =
无)
翻译 torch.
设备:
"""
.. 注意:: 此方法将被弃用,它仅保留用于
向后兼容的原因。替代方案:
- 如果您需要为对象集合查找设备,请使用
`_get_object_coll_device(group)`.
如果您需要查询组支持的设备类型,请使用
`_device_capability(group)`.
返回注册的 ``group`` 设备类型。
例如,如果调用了 `init_process_group("nccl", ...)`,则返回的
值将是 `torch.device("cuda")`。
如果没有注册设备,则出错。
参数:
group(ProcessGroup,可选):要工作的进程组。如果为 None,
则使用默认进程组。
返回:
torch.device:与`group`注册的设备类型。
"""
警告.
警告(
`_get_pg_default_device` 将会被弃用,它仅保留用于
向后兼容性原因。如果您需要为对象“
集体,请使用 `_get_object_coll_device`。如果您需要查询“
支持该组设备的类型,请使用“
"设备能力组(group)。"
)
群组 =
群组
或者
获取默认组()
if 不是 isinstance(
组,
流程组):
向后兼容,以应对传入的 `group` 实际上是后端(如 `ProcessGroupGloo`)而非 PT 2.0 中的 `ProcessGroup` 的情况
实际上是一个后端(如 `ProcessGroupGloo`)而不是 PT 2.0 意义上的 `ProcessGroup`
RPC 使用 Gloo 进行对象集合
警告.
警告(
f您正在使用一个后端{
类型(
组)}
作为进程组。
自 PyTorch 2.0 起,此用法已弃用。请使用公共 API。
PyTorch 分布式代替。,
未来警告,
栈级别=3,
)
大多数用户使用私有 API 创建 Gloo 以进行对象聚合。
返回 torch.
设备(
cpu)
"""
``group._device_types`` 是一个 pybind 属性,返回设备
("cpu", "cuda" 等) 由 ``group`` 支持的设备。如果 ``group`` 支持多个设备,则可以有多个
``group`` 支持多个设备。
"""
设备 =
组._device_types
if 长度(
设备) == 1:
# 用户在`init_process_group`中恰好修复了一个后端
返回
设备[0]
如果...否则
长度(
设备) == 0:
抛出
运行时错误(
"默认设备未找到,因为没有注册后端。"
"与此流程组相关。"
)
else:
# 此流程组中有多个后端。
if torch.设备(
cpu)
在
设备:
rv = torch.设备(
cpu)
else:
rv = 设备[0]
警告.
警告(
"已在此进程组中注册了多个后端。我们无法"
f确定哪个是默认的。返回{rv}
.
请考虑使用其他 API。
)
返回 rv
定义
设备能力(
组:
可选[
流程组] =
无)
翻译
列表[
字符串
]
"""
返回由 `group` 支持的设备类型。
参数:
group(进程组,可选):要查询的进程组。如果为 None,则使用默认进程组。
则使用默认进程组。
返回:
List[str]:由 `group` 支持的设备类型列表。
"""
群组 =
群组
或者
获取默认组()
返回 [
设备.
类型 for
设备
在
组.
设备类型]
@_time_logger
定义
基于商店的屏障(
排名,
店铺,
群组名称,
集合计数,
超时时间,
日志记录间隔=
时间差(
秒数=10),
) 翻译
无:
"""
基于存储的同步进程同步屏障。
基于存储的屏障,用于在进程同步后使用
``init_process_group`` 或 ``new_group``。仅适用于这两种方法,不是 ``barrier()`` 的通用替代方案。
这些方法,不是 ``barrier()`` 的通用替代方案。
"""
store_key = f"{店铺基础前缀}:{
群组名称}"
店铺.
添加(
存储键, 1)
日志记录器.
调试(
添加键:%s
保存用于排名:%s",
存储键,
排名)
现在等待所有工作者与商店确认。
世界大小 =
集合计数
工作人员数量 =
店铺.
添加(
存储键, 0)
最后一个工作键 = f"{
存储键}
:最后一个工作
if 工作人员数量 ==
世界大小:
店铺.
设置(
最后一个工作人员键, "1")
# 将超时时间调整为至少 10 秒 + 每千个排名加 1 秒,以降低超时的可能性
在规模测试中经验性地找到的值
日志记录间隔 =
最大值(
日志记录间隔,
时间差(
秒数=10 +
世界大小 / 1000))
开始 =
时间.
时间()
当
是:
尝试:
这将在我们打印日志的日志间隔之后抛出异常
群组状态或超时正式抛出运行时错误
店铺.
等待
[
最后一个工作人员键
]
日志记录间隔)
断开
除了
运行时错误
是 e:
工作人员数量 =
店铺.
添加(
存储键, 0)
# 定期打印状态以保持跟踪。
日志记录器.
调试(
"在商店基于屏障等待初始化进程组"%s
秒
"rank: "%s
,键:%s
world_size=%s
,num_workers_joined=%s
, 超时=%s error=%s)",
时间.
时间() -
开始,
排名,
存储键,
世界大小,
工作者数量,
超时时间,
e,
)
if 时间差(
秒数=(
时间.
时间() -
开始)) >
超时时间:
抛出 DistStoreError( # noqa: B904
"在“store based barrier”上初始化进程组超时"
f"排名"{
排名}
,对于键:{
存储键}
world_size={
世界大小}
,"
fnum_workers_joined={
工作者数量}
, 超时={
超时时间} error={e})"
)
日志记录器.
信息(
排名%s
完成基于键的存储屏障%s
的%s
节点。,
排名,
存储键,
世界大小,
)
定义 _rank_not_in_group(
组:
可选[
流程组])
翻译
布尔:
检查当前进程的 rank 是否不在给定组中。
if 群组 is
无:
返回
假
返回
群组 ==
组成员.
非群组成员
定义 _warn_not_in_group(
操作符名称)
翻译
无:
全球排名 = -1 if
组成员.WORLD is
无
否则
组成员.
世界.
排名()
警告.
警告(
f运行{
操作符名称}
全球排名{
全球排名}
“不属于”
“的给定组。”
)
[文档]def get_group_rank(group: ProcessGroup, global_rank: int) -> int:
"""
将全局排名转换为组排名。
``global_rank`` 必须是 ``group`` 的一个部分,否则会引发 RuntimeError。
Args:
群组 (ProcessGroup):查找相对排名的群组。
global_rank (int):查询的全局排名。
Returns:
``global_rank`` 相对于 ``group`` 的全局排名
注意。在默认进程组上调用此函数将返回恒等值
"""
如果 group 是 GroupMember.WORLD:
返回全局排名
如果 group 不在 _world.pg_group_ranks 中:
raise ValueError(
f"组 {group} 未注册,请使用 torch.distributed.new_group API 创建组"
)
group_ranks = _world.pg_group_ranks[group]
if global_rank not in group_ranks:
raise ValueError(f"全局排名 {global_rank} 不属于组 {group}")
return group_ranks[global_rank]
[文档]def get_global_rank(group: ProcessGroup, group_rank: int) -> int:
"""
将组排名转换为全局排名。
``group_rank`` 必须是 `group` 的一部分,否则会引发 RuntimeError。
Args:
group (ProcessGroup): 要查找全局排名的 ProcessGroup。
group_rank (int): 要查询的组排名。
返回值:
``group_rank`` 相对于 ``group`` 的全局排名
注意:在默认进程组上调用此函数返回恒等值
"""
如果组是 GroupMember.WORLD:
返回组等级
如果组不在_world.pg_group_ranks 中:
抛出 ValueError 异常(
组 {group} 未注册,请使用 torch.distributed.new_group API 创建组
)
对于 rank 和 grp_rank 在 _world.pg_group_ranks[group].items() 中:
如果 grp_rank 等于 group_rank:
返回排名
抛出 ValueError 异常(f"组排名 {group_rank} 不属于组 {group}")
# TODO: 在生态系统迁移后删除此条
@deprecated(
`torch.distributed.distributed_c10d._get_global_rank` 已弃用,
"请使用 `torch.distributed.distributed_c10d.get_global_rank` 代替",
分类=
未来警告,
)
定义 _get_global_rank(
组,
排名)
翻译 int:
使用 get_global_rank 作为此方法已弃用。
返回
获取全球排名(
组,
排名)
[文档]def get_process_group_ranks(group: ProcessGroup) -> list[int]:
"""
获取与 ``group`` 相关的所有排名。
Args:
group (ProcessGroup):获取所有排名的 ProcessGroup。
返回:
按组排名顺序排列的全局排名列表。
"""
return list(_world.pg_group_ranks[group].keys())
定义
获取组大小(
组)
翻译 int:
获取指定组的全球大小。
if 群组 is
组成员.WORLD
或者
群组 is
无:
默认页面 =
获取默认组()
返回
默认页面.
大小()
返回
组.
大小()
定义
获取按名称获取组大小(
群组名称:
字符串)
翻译 int:
群组 = _resolve_process_group(
群组名称)
返回
组.
大小()
定义 _resolve_group_name_by_ranks_and_tag(
排名:
列表[int
]
标签:
字符串)
翻译
字符串:
# TODO(yifu): 在 ranks + tag 不再受支持后删除此函数
# 功能性集体进程组的标识符
群组 = _find_pg_by_ranks_and_tag(
标签,
排名)
if 群组 is
无:
抛出
值错误(
输入文本翻译为简体中文为:"")
返回
组.group_name
定义
_检查单个张量(
参数,
` 的类型为 List[torch.Tensor])
翻译
无:
检查参数 `param_name` 是否为单个张量。
if 不是 isinstance(
参数,
火炬.
张量):
抛出
类型错误(
f无效的函数参数。期望参数 `{
` 的类型为 List[torch.Tensor]}
`of type torch.Tensor`
但是得到了{
类型(
参数)}
`而不是。`
)
定义
`_check_tensor_list`(
参数,
` 的类型为 List[torch.Tensor])
翻译
无:
检查参数 `param_name` 是否为张量列表。
if 不是 isinstance(
参数,
列表):
抛出
类型错误(
f无效的函数参数。期望参数 `{
` 的类型为 List[torch.Tensor]}
` 的类型为 List[torch.Tensor]
但是得到了{
类型(
参数)}
`而不是。`
)
如果...否则
不是
所有(isinstance(p,
火炬.
张量) for p
在
参数):
抛出
类型错误(
f无效的函数参数。期望参数 `{
` 的类型为 List[torch.Tensor]}
` 的类型为 List[torch.Tensor]
但是得到了{
类型(
参数)}
具有类型元素的{[
类型(p) for p
在
参数]}."""
)
定义
群组或默认群组(
组:
可选[
流程组] =
无)
翻译
流程组:
if 群组 is
无
或者
群组 is
组成员.
世界:
群组 =
获取默认组()
返回
群组
定义
标准化群组排名(
组:
流程组,
全球排名:
可选[int] =
无,
群组排名:
可选[int] =
无,
返回全局:
布尔值 =
错误,
) 翻译 int:
"""
辅助方法,用于接受全局排名或组排名,并生成组排名。
如果 'return_global' 为 true,则生成全局排名而不是组排名。
"""
if 组排名 is
不是
无:
if 全球排名 is
不是
无:
抛出
值错误(
不能同时指定 group_rank 和 global_rank)
全球排名 =
获取全球排名(
组,
群组排名)
else:
if 全球排名 is
无:
抛出
值错误(
"必须指定全球排名或群组排名")
组排名 =
获取群组排名(
组,
全球排名)
返回
全球排名 if
返回全局
否则
组排名
定义
_检查非自身排名(
组:
流程组,
排名: int,
排名类型:
字符串):
if 组.
排名() ==
排名:
抛出
值错误(
f无效{
排名类型}
排名:{
排名类型}
排名不应与 "
"当前进程的排名。"
)
定义
作为可迭代对象(
对象)
翻译
集合.abc.
迭代器:
返回
对象 if isinstance(
对象,
列表)
否则 (
对象,)
定义
确保所有张量具有相同的数据类型(*
张量)
翻译
无:
最后的数据类型 =
无
for 张量
在 itertools.chain.from_iterable(
地图(
作为可迭代对象,
张量)):
张量数据类型 =
张量.dtype
复杂类型及其元素类型可以混合
if 张量数据类型.
是复杂的:
张量数据类型 = (
火炬.float32 if
张量数据类型 ==
火炬.complex64
否则
火炬.complex128
)
if 最后的数据类型 is
无:
最后的数据类型 =
张量数据类型
else:
if 最后的数据类型 !=
张量数据类型:
抛出
值错误(
无效使用不同数据类型的张量
f"找到"{last_dtype}
和{
张量.
数据类型}"
)
定义
_检查操作(
操作)
翻译
无:
"检查 ``op`` 是否为 isend 或 irecv。"
if 操作符
不是
在 [isend,
接收
]
抛出
值错误(
"无效的 ``op``。期望 ``op`` "
"为 ``torch.distributed.isend`` 类型或 "
"``torch.distributed.irecv``。"
)
定义
_检查 P2P 操作列表(p2p_op_list)
翻译
无:
"""
检查`p2p_op_list`是否为 P2POp 实例的列表。
此外,还需检查所有操作使用的是同一个组。
"""
if 不是 isinstance(p2p_op_list,
列表)
或者
不是
所有(
isinstance(p2p 操作,
P2P 流) for
p2p 操作
在
p2p 操作列表
):
抛出
值错误(
"无效的 ``p2p_op_list``。每个操作应 "
"为 ``torch.distributed.P2POp`` 类型。"
)
群组 = p2p_op_list[0].
群组
if 不是
所有(
群组 ==
p2p 操作.
群组 for
p2p 操作
在 p2p_op_list):
抛出
值错误(
所有操作都需要使用相同的组。)
[文档]def is_mpi_available() -> bool:
"""检查 MPI 后端是否可用。"""
return _MPI_AVAILABLE
[文档]def is_nccl_available() -> bool:
检查 NCCL 后端是否可用。
返回_NCCL_AVAILABLE
[文档]def is_gloo_available() -> bool:
检查 Gloo 后端是否可用。
return _GLOO_AVAILABLE
定义
is_ucc_available
不可用() 翻译
布尔:
检查 UCC 后端是否可用。
返回 _UCC_AVAILABLE
[文档]def is_xccl_available() -> bool:
检查 XCCL 后端是否可用。
return _XCCL_AVAILABLE
定义
是后端可用(
后端:
字符串)
翻译
布尔:
"""
检查后端可用性。
检查指定的后端是否可用以及是否支持内置后端。
通过函数 `Backend.register_backend` 注册第三方后端。
参数:
后端 (str):后端名称。
返回:
bool:如果后端可用则返回 true,否则返回 false。
"""
如果后端有 `is_backend_available` 函数,则直接返回该函数的结果。
可用功能 = getattr(
火炬.
分布式, f
"is_"{
后端.
小写()}
_可用",
无)
if 可用功能:
返回
可用功能()
返回
后端.
小写()
在
后端.
后端列表
[文档]def is_initialized() -> bool:
"""检查默认进程组是否已初始化。"""
return GroupMember.WORLD 不为 None
[文档]def is_torchelastic_launched() -> bool:
"""
检查此进程是否使用 ``torch.distributed.elastic`` (即 torchelastic) 启动
存在 ``TORCHELASTIC_RUN_ID`` 环境变量
变量用作代理以确定当前进程
启动了 torchelastic。这是一个合理的代理,因为
``TORCHELASTIC_RUN_ID`` 映射到会合 ID,该 ID 始终是
表示用于对等发现目的的工作 ID 的非空值。
"""
返回 os.getenv("TORCHELASTIC_RUN_ID") 是否不为空。
定义 _is_barrier_after_init()
翻译 int:
控制是否在进程组初始化后执行
障碍的环境变量。默认值是 0,即没有障碍。如果您
遇到这个问题设置,您可以设置
`TORCH_DIST_INIT_BARRIER=1` 以添加障碍。
返回 int(os.
获取环境变量("TORCH_DIST_INIT_BARRIER", "0"))
定义
获取默认组()
翻译
流程组:
"获取由 init_process_group 创建的默认进程组。"
if 不是
已初始化():
抛出
值错误(
"默认进程组尚未初始化,"
"请确保已调用 init_process_group。"
)
if 类型检查:
返回
非空(
组成员.
世界)
else:
返回
组成员.WORLD
定义 _get_default_store()
翻译
存储:
"获取由 init_process_group 创建的默认存储。"
if 不是
已初始化():
抛出
值错误(
"默认进程组尚未初始化,"
"请确保已调用 init_process_group。"
)
默认页面 =
获取默认组()
_, 默认商店 =
_世界.pg_map[
默认页面]
返回
默认商店
定义
_更新默认数据库(pg)
翻译
无:
_世界.
默认页面 = pg
排名 = pg.
排名() if pg is
不是
无
和 pg !=
组成员.
非群组成员
否则 -1
火炬._C.
_分布式_c10d.
设置全局排名(
排名)
定义
获取后端配置(
组:
可选[
流程组] =
无)
翻译
字符串:
"""
返回指定进程组的后端配置。
参数:
组(ProcessGroup,可选):要工作的进程组。
默认是通用主进程组。如果另一个特定组
指定后,调用进程必须是:attr:`group`的一部分。
返回:
给定进程组的后端配置,作为小写字符串。
"""
pg = 群组
或者
获取默认组()
if _rank_not_in_group(pg):
抛出
值错误(
无效的过程组指定)
backend_config = _世界.
PostgreSQL 后端配置.
获取(pg)
返回
字符串(
非空(
后端配置))
[文档]def get_backend(group: Optional[ProcessGroup] = None) -> Backend:
"""
返回给定进程组的后端。
参数:
组(ProcessGroup,可选):要工作的进程组。
默认为主流程组。如果另一个特定组
指定后,调用进程必须是:attr:`group`的一部分。
返回:
给定进程组的后端作为小写字符串。
"""
pg = group or _get_default_group()
如果 _rank_not_in_group(pg):
指定的进程组无效,请抛出 ValueError 异常
如果 pg 在 _world.pg_map 中,则 pg_store = _world.pg_map[pg],否则 pg_store = None
返回 Backend(not_none(pg_store)[0])
定义
获取设备的默认后端(
设备:
联合[
字符串,
火炬.
设备])
翻译
字符串:
"""
返回给定设备的默认后端。
参数:
Union[str, torch.device]: 获取默认后端所需的设备。
返回:
给定设备的默认后端,以小写字符串形式表示。
"""
if isinstance(设备,
火炬.
设备):
设备字符串 =
设备.
类型
else:
设备字符串 =
火炬.
设备(
设备).
类型
后端 =
后端.
默认设备后端映射.
获取(
设备字符串)
if 后端 is
无:
抛出
值错误(f
"默认后端未为设备注册:"{
设备}")
返回
后端
定义
获取进程组 UID(pg:
流程组)
翻译 int:
后端 =
无
尝试:
后端 = pg.
获取后端(
火炬.
设备(
cuda))
除了
运行时错误:
通过
if 检查 NCCL 是否可用()
和 isinstance(
后端, ProcessGroupNCCL):
返回
后端.
用户 ID
返回 -1
定义 _get_pg_config(
组:
可选[
流程组] =
无)
翻译
字典[
字符串,
任何
]
"""
返回给定进程组的 pg 配置。
"""
pg = 群组
或者
获取默认组()
返回 {
pg 名称:
获取进程组名称(pg),
"pg_desc": pg.群组描述,
"后端配置":
获取后端配置(pg),
pg_size:
获取组大小(pg),
排名:
获取进程组排名(pg),
}
定义
_获取所有 pg 配置()
翻译
列表[
字典[
字符串,
任何
]]
"""
返回所有进程组的 pg 配置。
"""
配置信息:
列表[
字典[
字符串,
任何]] = [
_get_pg_config(pg) for pg 在
_世界.pg_map.
键()
]
返回
配置信息
定义 get_pg_count()
翻译 int:
"""
返回进程组的数量。
"""
返回
_世界.group_count
定义 get_node_local_rank(fallback_rank:
可选[int] =
无)
翻译 int:
"""
返回当前进程相对于节点的本地排名。
从语义上讲,这是一个将进程映射到设备的有用概念。
例如,在一个有 8 个加速器的节点上,您可以使用节点本地排名来决定
将进程绑定到哪个加速器设备。
实际上,节点本地排名的分配由 PyTorch 之外的进程启动器处理,
并且通过`LOCAL_RANK`环境变量进行通信。
Torchrun 会自动填充`LOCAL_RANK`,但其他启动器可能不会。如果未指定`LOCAL_RANK`,
此 API 将回退到提供的`fallback_rank`参数(如果指定),否则将引发错误。
意图是允许编写一个可以在单设备或多设备环境中运行而不会出现错误的应用程序。
"""
if 本地排名
在 os.
环境:
返回 int(os.
环境[
本地排名])
如果...否则
回退排名 is
不是
无:
返回 int(fallback_rank)
抛出
运行时错误(
"LOCAL_RANK 不在环境中。请考虑传递 fallback_rank 以允许`get_node_local_rank`工作,"
"假设您不在多设备环境中运行,并且希望代码在本地运行。"
)
定义
_为所有 pgs 添加临时超时(
超时时间:
时间差)
翻译
无:
"""
该 API 为所有本地 PG 添加了短暂的超时扩展
在一个 rank 上。当第一个集体操作发出后,超时将被重置
在 API 调用完成后。
注意:目前我们仅支持为 cuda 后端设置超时。
注意:虽然此功能
在特定场景中提供灵活性,但它引入了有状态性
超时设置。因此,建议谨慎使用此 API
考虑其他方法,例如直接设置超时
或使用屏障集体(可以为屏障设置任何超时)
尽可能。
参数:
超时(timedelta):扩展超时的增量。
返回:
无。
"""
for pg 在
_世界.pg_map.
键():
设备 = pg._device_types
if 火炬.
设备(
cuda)
在
设备:
后端 = pg.
获取后端(
火炬.
设备(
cuda))
if 检查 NCCL 是否可用()
和 isinstance(
后端, ProcessGroupNCCL):
后端.
添加临时超时(
超时时间)
定义
设置 PG 超时(
超时时间:
时间差,
组:
可选[
流程组] =
无)
翻译
无:
"""
设置用户希望使用不同于默认值的超时时,给定进程组的超时时间。
默认值。
参数:
超时(timedelta):用户想要设置的针对进程组操作的超时时间。默认值为 NCCL 的 10 分钟和其他后端的 30 分钟。
超时(timedelta):用户想要设置的针对进程组操作的超时时间。默认值为 NCCL 的 10 分钟和其他后端的 30 分钟。
这是集体异步终止后的持续时间,之后进程将崩溃。
这是因为 CUDA 执行是异步的,继续执行用户代码不再安全。
失败的异步 NCCL 操作可能会导致后续 CUDA 操作在损坏的数据上运行。
当设置 TORCH_NCCL_BLOCKING_WAIT 时,进程将阻塞并等待此超时。
组(ProcessGroup,可选):要工作的进程组。
默认是通用主进程组。如果另一个特定组
指定后,调用进程必须是:attr:`group`的一部分。
返回:
无
"""
如果
群组 is
无:
群组 =
获取默认组()
如果 _rank_not_in_group(
组):
抛出
值错误(
无效的过程组指定)
断言 isinstance(
组,
流程组)
设备 =
组._device_types
后端 =
设置()
如果
火炬.
设备("cpu")
在
设备
和
gloo 是否可用():
后端 =
组.
获取后端(
火炬.
设备("cpu"))
如果 isinstance(
后端,
网格流程组 Gloo):
后端.
添加(
后端)
if 火炬.
设备(
cuda)
在
设备:
后端 =
组.
获取后端(
火炬.
设备(
cuda))
if 检查 NCCL 是否可用()
和 isinstance(
后端, ProcessGroupNCCL):
后端.
添加(
后端) # type: ignore[arg-type]
如果...否则
gloo 是否可用()
和 isinstance(
后端,
网格流程组 Gloo):
后端.
添加(
后端) # type: ignore[arg-type]
如果
长度(
后端) == 0:
警告.
警告(
"设置超时现在仅支持 nccl 或 gloo。")
for 后端
在
后端:
后端.
_设置默认超时(
超时时间)
[文档]@_exception_logger
@_time_logger
定义
初始化进程组(
后端:
可选[
字符串] =
无,
初始化方法:
可选[
字符串] =
无,
超时时间:
可选[
时间差] =
无,
世界大小:
整型 = -1,
排名:
整型 = -1,
店铺:
可选[
存储] =
无,
群组名称:
字符串 =
输入文本翻译为简体中文为:"",
PG 选项:
可选[
任何] =
无,
设备 ID:
可选[
火炬.
设备] =
无,
) 翻译
无:
"""
初始化默认分布式进程组。
这也将初始化分布式包。
初始化进程组主要有两种方式:
1. 明确指定 `store`、`rank` 和 `world_size`。
指定 ``init_method``(一个 URL 字符串),表示如何/在哪里发现对等节点。
可选指定 ``rank`` 和 ``world_size``,或者将所有必需参数编码在 URL 中并省略它们。
如果两者都没有指定,则假定 ``init_method`` 为 "env://"。
如果两者都没有指定,则假定 ``init_method`` 为 "env://"。
参数:
后端(str 或 Backend,可选):要使用的后端。根据构建时配置,有效值包括`mpi`、`gloo`、`nccl`、`ucc`或由第三方插件注册的值。
构建时配置,有效值包括 `mpi`、`gloo`、`nccl`、`ucc` 或一个由第三方插件注册的值。
`nccl`、`ucc` 或一个由第三方插件注册的值。
。
自 2.6 版本起,如果未提供 ``backend``,c10d 将使用默认后端
为由 `device_id` 关键字参数指定的设备类型注册
(如果提供)。目前已知的默认注册包括:``nccl``
对于 ``cuda``,对于 ``cpu`` 使用 ``gloo``。
如果没有提供 ``backend`` 或 ``device_id``,c10d 将在运行时机器上检测加速器并使用为该检测到的加速器注册的后端(或 ``cpu``)。
此字段可以提供一个小写字符串(例如,``"gloo"``)。
该字段可以提供一个小写字符串(例如,``"gloo"``)。
此字段可以提供一个小写字符串(例如,``"gloo"``)。
也可以通过 :class:`Backend` 属性访问(例如,
``Backend.GLOO``).
如果使用每台机器多个进程与 `nccl` 后端,每个
进程必须独占访问它所使用的每个 GPU,因为共享
进程间使用 GPU 可能导致死锁或 NCCL 无效使用。
``ucc``后端是实验性的。
init_method (str, 可选): 指定如何初始化进程组的 URL。默认为"env://"如果没有指定。
process group. 默认是 "env://" 如果没有指定。
``init_method`` 或 ``store`` 被指定。
与 ``store`` 互斥。
world_size (int, 可选): 参与作业的进程数。
如果指定了 ``store``,则必须提供。
rank(int,可选):当前进程的排名(它应该是一个介于 0 和 ``world_size``-1 之间的数字)。
number between 0 and ``world_size``-1)。
Required if ``store`` is specified.
store(Store,可选):所有工作者可访问的键/值存储,用于
交换连接/地址信息。
与 `init_method` 互斥。
超时(timedelta,可选):对操作执行的超时时间
进程组。默认值为 NCCL 的 10 分钟和其他后端的 30 分钟。
这是集体异步终止后的持续时间,之后进程将崩溃。
这是因为 CUDA 执行是异步的,继续执行用户代码不再安全。
失败的异步 NCCL 操作可能会导致后续 CUDA 操作在损坏的数据上运行。
当设置 TORCH_NCCL_BLOCKING_WAIT 时,进程将阻塞并等待此超时。
group_name(str,可选,已弃用):组名。此参数将被忽略
pg_options(ProcessGroupOptions,可选):进程组选项
在构建特定进程组时需要指定哪些附加选项
目前,我们支持的是 ``ProcessGroupNCCL.Options`` 选项,用于 ``nccl`` 后端,可以指定 ``is_high_priority_stream`` 以确保
我们支持的是 ``ProcessGroupNCCL.Options`` 选项,用于 ``nccl`` 后端,可以指定 ``is_high_priority_stream`` 以确保
可以指定 ``is_high_priority_stream`` 以确保
nccl 后端可以抓取高优先级的 cuda 流,当有计算内核等待时。
对于配置 nccl 的其他可用选项,请参阅。
请见 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t
device_id(torch.device,可选):一个单独的、特定的设备。
将此进程绑定到,允许针对后端特定
优化。目前这有两个效果,仅在
NCCL:通信器立即形成(调用)
立即调用 `ncclCommInit*` 而不是正常的延迟
调用)和子组将使用`ncclCommSplit`以避免创建组的额外开销。如果您想尽早了解 NCCL 初始化错误,也可以使用此字段。
如果可能,则可以使用此方法避免创建组的额外开销。如果您想尽早了解 NCCL 初始化错误,也可以使用此字段。
如果想尽早了解 NCCL 初始化错误,您也可以使用此字段。
。
.. 注意:: 要启用 ``backend == Backend.MPI``,PyTorch 需要从源代码构建
在支持 MPI 的系统上。
.. 注意:: 多个后端支持是实验性的。当前如果没有指定后端,
则将创建 ``gloo`` 和 ``nccl`` 后端。``gloo`` 后端
将用于具有 CPU 张量的集体,而 ``nccl`` 后端将被使用
用于具有 CUDA 张量的集体。可以通过传递一个字符串来指定自定义后端,格式为 ":,:",例如
"cpu:gloo,cuda:custom_backend"。
"cpu:gloo,cuda:custom_backend"。
"""
全局
_世界
全局
后端
全局
默认 PostgreSQL 初始化方法
如果
组成员.WORLD is
不是
无:
抛出
值错误(
尝试两次初始化默认进程组!)
从 justknobs 设置 PyTorch 分布式环境()
# 在导入顺序中,某些 trace_rules 函数可能会被评估
# 在导入阶段,这些函数可能不会正确地
# 添加与分布式相关的规则,因为存在导入循环依赖。
我们需要在运行时清除 lru_cache 以确保正确性
这些 trace_rules 的数量。
#
# 由于此 API 必须在编译所有分布式代码之前调用,
清除这里的缓存应该是安全的。
如果
torch._dynamo
在
系统模块.
模块:
火炬._dynamo.
跟踪规则.
清除 lru 缓存()
断言 (
存储 is
无)
或者 (
初始化方法 is
无), (
"不能同时指定初始化方法和存储。"
)
如果
存储 is
不是
无:
断言
世界大小 > 0,
"使用 store 时,world_size 必须为正数"
断言
排名
≥ 0,
"使用 store 时,rank 必须为非负数"
如果...否则
初始化方法 is
无:
初始化方法 = "env://"
# 如果用户没有提供后端字符串,但提供了设备 ID,例如
# >>> 初始化进程组(device_id=device)
# 我们尝试根据设备类型确定后端名称。
如果
后端 is
无
和
设备 ID is
不是
无:
# 第三方设备可以通过以下默认后端进行注册。
# 默认映射如下。
后端 =
后端.
默认设备后端映射.
获取(
设备 ID.
类型)
如果我们仍然无法弄清楚,例如
>>> 初始化进程组()
我们将其设置为 `undefined` 并依赖懒加载初始化。
如果
后端 is
无:
后端 =
"未定义"
# 转换字符串为 `Backend` 类型
后端 =
后端(
后端)
如果
超时 is
无:
超时 =
_获取默认超时(
后端)
_检查有效超时(
超时时间)
"""
用户除非访问 c10d 内部,否则无法看到组名。
这意味着我们可以忽略他们提供的值,因为这些值没有以公开的方式暴露。
因为它们没有以公开的方式暴露,所以我们忽略他们提供的值。
"""
group_name = _process_group_name([], 使用哈希名称=
错误)
如果
后端 ==
后端.MPI:
如果
世界大小 != -1
或者
排名 != -1:
警告.
警告(
f对于 MPI 后端,world_size({
世界大小}
) 和排名 ({
排名}
)
"被忽略,因为它们是由 "
"MPI 运行时分配的。"
)
默认页面, _ =
新流程组助手(
-1,
-1,
[]
后端,
存储(),
占位符值,因为存储不能为空
群组名称,
超时时间=
超时时间,
群组描述=
默认 pg,
)
_更新默认数据库(
默认页面)
else:
兼容旧版 API
如果
存储 is
无:
集合迭代器 =
预约(
非空(
初始化方法),
排名,
世界大小,
超时时间=
超时
)
店铺,
排名,
世界大小 =
下一(
集合迭代器)
店铺.
设置超时(
超时时间)
使用 PrefixStore 以避免意外覆盖由键
多租户存储的情况下,不同的系统(例如 RPC)。
存储 =
前缀存储(
默认 pg,
店铺)
默认页面, _ =
新流程组助手(
世界大小,
排名,
[]
后端,
店铺,
群组名称,
后端选项=
PG 选项,
超时时间=
超时时间,
设备 ID=
设备 ID,
群组描述=
默认 pg,
)
_更新默认数据库(
默认页面)
_世界.pg_group_ranks[
组成员.
世界] = {
忽略索引
i: i
for i 在
范围(
组成员.
世界.
大小())
# 类型:忽略[已定义]
}
后端 =
_世界.pg_map[
非空(
组成员.
世界)][0]
默认 PostgreSQL 初始化方法 =
初始化方法
旧钩子 =
系统模块.
异常处理钩子
异常钩子前缀 = f
"[排名"{
获取排名()}]"
定义
_分布式异常处理钩子(*
参数):
旧的 stderr =
系统模块.
标准错误
系统模块.
标准错误 =
缓冲区 =
输入/输出.StringIO()
尝试:
旧钩子(*
参数)
最后:
系统模块.
标准错误 =
旧的 stderr
msg = 缓冲区.
获取值()
msg = "输入文本翻译为简体中文为:\n".
连接(
f"{异常处理钩子前缀}: {s}"
如果 s !=
请提供需要翻译的文本
否则
请提供需要翻译的文本 for s
在
信息.
分割("
输入文本翻译为简体中文为:\n")
)
系统模块.
标准错误输出.
写(
信息)
系统模块.
标准错误输出.
清空()
系统模块.
异常处理钩子 =
分布式异常处理钩子
如果 _is_barrier_after_init() == 1:
# 防止在从该方法返回时出现问题的障碍
处理包括全局变量(如有)在内的进程组
正确地在所有等级上。
# 更新 2023 年 4 月:对于大规模运行,这个障碍(尤其是基于存储的)
# 障碍可能代价高昂且/或无法扩展。此外,在许多情况下,
这些障碍可能是不必要的,因为绿色 CI 已经证明
移除后。已添加环境变量 `TORCH_DIST_INIT_BARRIER`,当设置为 1 时才启用此障碍。
仅在初始化 ProcessGroup 后执行屏障。
日志记录器.
调试(
由于 "Performing barrier after ProcessGroup initialization since "
"TORCH_DIST_INIT_BARRIER = 1"
)
如果
后端 ==
后端.MPI:
# MPI 后端不使用 store。
障碍()
else:
# 由于 barrier()使用了多个默认设备,这里使用基于 store 的 barrier()。
# 默认设备会导致 NCCL 内部状态混乱。
基于商店的屏障(
排名,
店铺,
群组名称,
世界大小,
超时时间)
定义
获取分割源(pg):
分割从 =
无
如果 pg.
绑定设备 ID:
分割从 = pg.
获取后端(pg.
绑定设备 ID)
如果...否则 pg is
_世界.
默认页面:
尝试:
分割从 = pg.
获取后端(
火炬.
设备(
cuda))
除了
运行时错误:
与此后端无关联的 CUDA 设备
通过
如果
不是
分割从
或者
不是
分割从.
支持分割:
返回
无
如有必要,通过剥皮工艺找到可分割的后端
将我们可能包装的过程组中的包装器分组
当
_GLOO 可用
和 isinstance(
分割从,
流程组包装器):
分割从 =
分割从.
包裹的页面
返回
分割从
定义
新流程组助手(
组大小,
群组排名,
全局组内排名,
后端,
店铺,
群组名称,
后端选项=
无,
超时时间=
无,
pg_tag=无,
设备 ID=
无,
群组描述=
无,
):
"""
创建一个新的分布式进程组。
此函数必须由全局组中的所有进程调用,即使
调用进程不属于新创建的组。在这种情况下,
该函数返回 GroupMember.NON_GROUP_MEMBER。
当默认组中的 global_ranks_in_group 为空列表时,将调用此函数。
"""
全局
_世界
如果 group_name
在
_世界.
数据库名称.
值():
抛出
值错误(
"指定的群组名称已存在,请使用不同的群组名称"
"已创建,请使用不同的群组名称"
)
如果
设备 ID is
不是
无
和 (
设备 ID.
索引 is
无
或者
设备 ID.
类型 == "cpu"):
抛出
值错误(
"init_process_group 设备 ID 参数必须是一个具有索引的加速器"
)
# 注意:_new_process_group_helper 只从 init_process_group 调用,该函数始终提供超时值
_检查有效超时(
超时时间)
如果 pg_tag
不是
在 [
无,
输入文本翻译为简体中文为:""
]
使用相同的标签和排名设置将产生相同的底层 PG
已存在的组 = _find_pg_by_ranks_and_tag(pg_tag,
全局组内排名)
如果
现有组:
_, 前缀存储 =
_世界.pg_map[
现有组]
返回
现有组,
前缀存储
群组描述 =
"未定义"
如果
群组描述 is
无
否则
群组描述
如果我们正在创建默认组,则组等级列表为空。
是否为默认组 =
长度(
全局组内排名) == 0
# nccl 及可能的其他后端允许创建
基于现有沟通者,可节省
初始化时间。由于惰性初始化,
在某些后端中,我们必须要小心,只有当我们知道默认的 PG 已经开始了通信者初始化时,我们才进行拆分。
我们知道这一点,如果我们已经将设备 ID 绑定到了默认的 pg(急切初始化)。
我们知道这一点,如果我们已经将设备 ID 绑定到了默认的 pg(急切初始化)。
如果
已初始化()
和
获取默认组().
绑定设备 ID:
分割从 =
获取分割源(
获取默认组())
else:
分割从 =
无
如果这是一个子组(即已指定 group_ranks),
我们检查当前进程是否是新的组成员。
如果
不是
是否为默认组:
全球排名 =
获取默认组().
排名()
如果
全球排名
不是
在
全局组内排名:
如果我们使用 `ncclCommSplit`(或类似的分割)来创建通信器,我们需要在新的组父组中的所有等级上调用 `ncclCommSplit`,即使那些不在新组中的等级也是如此。
# other APIs) 来创建通信器,我们将会在新的组父组中的所有等级上调用 `ncclCommSplit`,即使那些不在新组中的等级也是如此。
# call `ncclCommSplit` on *all* ranks in this new group's parent group, even those not in the new group. This is
# necessary to ensure that the communicator is properly initialized for all ranks.
NCCL API 的一个要求,否则我们会得到
不同步。
如果
分割从:
分割从.
执行无颜色分割(
获取默认组().
绑定设备 ID)
返回
组成员.
非群组成员,
无
前缀存储 =
前缀存储(f"{
群组名称}
“”,
店铺)
PG 的后端将在稍后根据 BackendConfig 中的内容进行设置。
并且超时设置在每个后端选项中。
pg: 进程组 =
流程组(
prefix_store,
群组排名,
组大小,
)
backend_config = 后端配置(
后端)
# 设置当传入单个后端时的默认后端。
如果
逗号
不是
在
字符串(
后端)
和 ":"
不是
在
字符串(
后端):
断言
后端
在
后端.
后端类型映射, f
"未知后端类型"{
后端}"
如果
后端 ==
后端.
未定义:
# 当前当后端为 UNDEFINED 时,同时使用 ``gloo`` 和 ``nccl`` 后端
如果有 CUDA 可用,我们将使用 nccl 或默认的 gloo
后端,这样我们就可以在 ProcessGroup 中正确调用 getDefaultBackend
如果
后端.NCCL
在
后端配置.
获取设备后端映射().
值():
pg._设置默认后端(
流程组.BackendType.NCCL)
else:
pg._设置默认后端(
流程组.BackendType.GLOO)
else:
pg._设置默认后端(
后端.
后端类型映射[
后端])
为了正确调用 pg._has_hooks(),我们应该设置默认后端
当传入多个后端时
else:
如果
后端.NCCL
在
后端配置.
设备后端映射.
值():
pg._设置默认后端(
流程组.BackendType.NCCL)
如果...否则
后端.
_插件.
键():
自定义后端 =
下一(
迭代(
后端.
_插件.
键()))
如果
自定义后端
在
后端配置.
设备后端映射.
值():
pg._设置默认后端(
流程组.BackendType.
自定义)
else:
pg._设置默认后端(
流程组.BackendType.GLOO)
如果
设备 ID:
pg.绑定设备 ID =
设备 ID
后端类:
火炬._C.
_分布式_c10d.
后端
for 设备,
后端字符串
在
后端配置.
获取设备后端映射().
项目():
在默认存储中,使用组名作为前缀
一个门店可以被多个组重复使用。
后端前缀存储 =
前缀存储(f"{
设备}
“”, prefix_store)
如果
后端字符串 ==
后端.MPI:
如果
不是
是否支持 MPI():
抛出
运行时错误(
“分布式包未内置 MPI。”
"MPI 仅在你从"
"已安装 MPI 的主机源代码构建 PyTorch 时包含。"
)
backend_class = ProcessGroupMPI.创建(
全局组内排名)
后端类型 =
流程组.BackendType.MPI
如果
不是
后端类:
返回
组成员.
非群组成员,
无
创建具有准确排名和大小的新的进程组
如果 pg.
排名() == -1
和 pg.
大小() == -1:
pg = 流程组(
后端前缀存储,
后端类.
排名(),
后端类.
大小(),
)
pg._设置默认后端(
后端类型)
如果...否则
后端字符串 ==
后端.GLOO:
# TODO: 在延迟初始化支持后删除此检查
# 如果 pg_options 不为 None:
# raise RuntimeError("GLOO 选项不受支持")
backend_class = 网格流程组 Gloo(
后端前缀存储,
群组排名,
组大小,
超时时间=
超时
)
后端类型 =
流程组.BackendType.GLOO
如果...否则
后端字符串 ==
后端.NCCL:
如果
不是
检查 NCCL 是否可用():
抛出
运行时错误(
分布式包未内置 NCCL)
如果
后端选项 is
不是
无:
断言 isinstance(
后端选项, ProcessGroupNCCL.
选项), (
"期望 backend_options 参数为 ProcessGroupNCCL.Options 类型"
)
如果
后端选项.
_超时 !=
超时时间:
警告.
警告(
"已指定 backend_options._timeout,"
"但是 timeout 参数有一个默认值,它将始终覆盖它。"
)
else:
# NCCL 的默认 backend_options
后端选项 = ProcessGroupNCCL.
选项()
后端选项.
高优先级流 =
假
后端选项.
_超时 =
超时
如果
分割从:
后端选项.
分割从 =
分割从
后端选项.
分割颜色 =
_处理组颜色(
global_ranks_in_group
)
后端选项.global_ranks_in_group = global_ranks_in_group
后端选项.group_name = group_name
backend_class = ProcessGroupNCCL(
后端前缀存储,
群组排名,
组大小,
后端选项
)
后端类型 =
流程组.BackendType.NCCL
如果...否则
后端字符串 ==
后端.UCC
和
is_ucc_available
不可用():
# TODO: 一旦 UCC 插件完全弃用,请删除
# 从上面的 elif 条件中删除 is_ucc_available()并抛出异常
如果 is_ucc_available()返回 false 则抛出 RuntimeError。
backend_class = UCC 进程组(
后端前缀存储,
群组排名,
组大小,
超时时间=
超时
)
后端类型 =
流程组.BackendType.UCC
如果...否则
后端字符串 ==
后端.XCCL:
如果
不是
是否支持 xccl():
抛出
运行时错误(
分布式包未内置 XCCL)
backend_class = ProcessGroupXCCL(
后端前缀存储,
群组排名,
组大小
)
后端类型 =
流程组.BackendType.XCCL
else:
断言
后端字符串.
上()
在
后端.
_插件, (
f未知 c10d 后端类型{
后端字符串.
上()}"
)
后端插件 =
后端.
_插件[
后端字符串.
上()]
创建者函数 =
后端插件.
创建者函数
扩展 API =
后端插件.
扩展 API
后端类型 =
流程组.BackendType.
自定义
如果
不是
扩展 API:
backend_class = 创建者函数(
后端前缀存储,
群组排名,
组大小,
超时
)
else:
分布式后端选项 =
_分布式后端选项()
dist_backend_opts.存储 =
后端前缀存储
dist_backend_opts.组排名 =
组排名
dist_backend_opts.组大小 =
组大小
dist_backend_opts.超时 =
超时
dist_backend_opts.群组 ID = group_name
dist_backend_opts.global_ranks_in_group = global_ranks_in_group
backend_class = 创建者函数(dist_backend_opts,
后端选项)
# 为 gloo 和 nccl 后端设置序列号。
如果
后端字符串 ==
后端.GLOO:
断言 isinstance(
后端类,
网格流程组 Gloo)
后端类.
为组设置序列号()
如果...否则
后端字符串 ==
后端.NCCL:
断言 isinstance(
后端类, ProcessGroupNCCL)
后端类.
为组设置序列号()
如果类型是 ProcessGroup 的子类,则立即返回此进程组
# TODO: 对于 PythonProcessGroups,这默认为旧行为,会覆盖
# ProcessGroup 实例
如果
派生类(
类型(
后端类),
流程组):
pg = backend_class # 类型:忽略[赋值]
断开
当设置 TORCH_DISTRIBUTED_DEBUG 时,为支持的 PGs 处理组包装初始化
如果 (
后端字符串
在 [
后端.GLOO,
后端.NCCL,
后端.UCC]
或者
后端字符串.
上()
在
后端.
_插件
):
# 在调试模式且 GLOO 可用时,用 PG 包装器包裹
# 启用增强的集体检查以提高调试性。
如果
获取调试级别() ==
调试等级.
详细信息:
如果
不是
_GLOO 可用:
日志记录器.
信息(
"将 TORCH_DISTRIBUTED_DEBUG 设置为 DETAIL,但
GLOO 不可用。请使用 GLOO 进行构建,
以在调试模式下创建包装进程组,
以帮助集体不同步调试。"
)
else:
backend_class = 创建进程组包装器(
包装的 PG=
后端类,
存储前缀=
群组名称,
店铺=
后端前缀存储,
排名=
群组排名,
世界大小=
组大小,
超时时间=
超时时间,
)
当所有 get_device_backend_map 值都相同时,仅注册单个后端
如果
长度(
设置(
后端配置.
获取设备后端映射().
值())) == 1:
for 设备
在
后端配置.
获取设备后端映射().
键():
pg.注册后端(
火炬.
设备(
设备),
后端类型,
后端类)
# 跳出外部循环以不创建更多后端
断开
pg.注册后端(
火炬.
设备(
设备),
后端类型,
后端类)
设置 group_name 和 group_dsec 为后端
断言 group_name is
不是
无
断言
群组描述 is
不是
无
pg.设置群组名称(
群组名称)
pg.设置群组描述(
群组描述)
如果
设备 ID
和 pg.
获取后端(
设备 ID).
支持分割:
贪婪后端 = pg.
获取后端(
设备 ID)
贪婪后端.
主动连接单个设备(
设备 ID)
更新全局状态
_世界.pg_map[pg] = (
后端, prefix_store)
_世界.
数据库名称[pg] = group_name
_注册进程组(
群组名称, pg)
_世界.
PostgreSQL 后端配置[pg] =
字符串(
后端配置)
# "" 是用户 PG 的默认标签
如果 pg_tag
在 [
无,
输入文本翻译为简体中文为:""
]
pg_tag = fptd:{
群组名称}"
_世界.
标签转 PostgreSQL.setdefault(
输入文本翻译为简体中文为:"",
空列表.append(pg)
else:
pg_tag = f"用户:"{pg_tag}"
_世界.
标签转 PostgreSQL.setdefault(pg_tag,
空列表.append(pg)
_世界.
PostgreSQL 转标签[pg] = pg_tag
返回 pg,
前缀存储
定义
销毁进程组(
组:
可选[
流程组] =
无):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
销毁指定的进程组,并反初始化分布式包。
参数:
group(进程组,可选):要销毁的进程组,如果
group.WORLD 被指定,则销毁所有进程
默认组以及其它组将被销毁
将被销毁。
"""
全局
_世界
如果
群组 ==
组成员.
非群组成员:
返回
如果
群组 is
无:
pg = 组成员.WORLD
else:
pg = 群组
断言 pg is
不是
无
如果
_世界.pg_map.
获取(pg,
无) is
无:
抛出
值错误(
无效的过程组指定)
当用户在 Python 上注册 onCompletion 钩子时,这些钩子将在主线程之外的线程上运行。
然而,ProcessGroup 析构函数会等待该线程。但是,析构函数可能在 Python 解释器退出后完成。
之后,为了获取 Python 钩子的 GIL,可能会导致崩溃。
因此,在 Python 钩子中获取 GIL 可能会导致崩溃。
我们可以运行钩子时恢复解释器,或者保持主解释器不变
# 存活直到所有工作和钩子完成。当前实现这样做。
因此,我们在此显式调用 _wait_for_pending_works() 以等待
等待挂起的钩子完成。
如果
类型(pg) ==
进程组
和 pg._has_hooks():
pg._wait_for_pending_works()
如果
群组 is
无
或者
群组 ==
组成员.
世界:
# 关闭所有后端,按照 pg 名称的顺序。按顺序关闭是因为
# 在某些版本的 NCCL 中,ncclCommAbort() 是一个 '集体' 调用。
for 停止 PostgreSQL
在
排序(
_世界.
数据库名称,
键=lambda x:
_世界.
数据库名称[x
] reverse=
真实
):
停止 PostgreSQL.
关闭()
_更新默认数据库(
无)
_世界.pg_map.
清晰()
_世界.
数据库名称.
清晰()
_世界.pg_group_ranks.
清晰()
_世界.
PostgreSQL 后端配置.
清晰()
_世界.
PostgreSQL 转标签.
清晰()
_世界.
标签转 PostgreSQL.
清晰()
_世界.
pg_coalesce_state_合并状态.
清晰()
_注销所有进程组()
当进程组没有显式名称(只有 WORLD(默认))
进程组可以有一个显式名称),我们使用全局 _world.group_count
# 生成名称。在销毁时我们需要重置计数器。
允许在重新创建进程时生成一致的价值
# 一些训练器从故障中恢复后,我们才重置组
#
# 只有在 WORLD 被销毁时才重置,因为如果我们在 WORLD
处理组状态良好,我们目前没有处理故障。
_世界.group_count = 0
else:
pg.关闭()
删除
_世界.pg_map[pg]
删除
_世界.
数据库名称[pg]
删除
_世界.pg_group_ranks[pg]
删除
_世界.
PostgreSQL 后端配置[pg]
如果 pg
在
_世界.
pg_coalesce_state_合并状态.
键():
警告.
警告(
"一些合并的集体尚未启动,当 "
"进程组被销毁。它们将被清理。"
)
删除
_世界.
pg_coalesce_state_合并状态[pg]
标签 =
_世界.
PostgreSQL 转标签.
获取(pg)
删除
_世界.
PostgreSQL 转标签[pg]
如果
标签 is
不是
无:
尝试:
_世界.
标签转 PostgreSQL[
标签].
删除(pg)
如果
标签.
以...开头("ptd:"):
_世界.
标签转 PostgreSQL[
输入文本翻译为简体中文为:""].
删除(pg)
除了
异常:
通过
_注销进程组(pg.
群组名称)
定义 _abort_process_group(
组:
可选[
流程组] =
无):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
终止指定的进程组。如果给定 group.WORLD(即`None`),则包括默认进程组在内的所有进程组都将被终止。
终止包括默认进程组在内的所有进程组。
参数:
要终止的进程组(ProcessGroup,可选):
.. 注意:此 API 为实验性 API,目前仅与 NCCL 后端兼容
后端。
.. 注意:应使用 `TORCH_NCCL_ASYNC_ERROR_HANDLING` 使用此 API
关闭(即设置为 0)。否则,ProcessGroupNCCL 的看门狗可能会
自动为您处理错误或超时,包括终止进程组。
。
"""
全局
_世界
如果
群组 ==
组成员.
非群组成员:
返回
pg = 群组
或者
组成员.WORLD
断言 pg is
不是
无
如果
_世界.pg_map.
获取(pg,
无) is
无:
抛出
值错误(
"指定的进程组无效或已被销毁。")
尝试:
后端 = pg.
获取后端(
火炬.
设备(
cuda))
除了
运行时错误:
后端 =
无
如果
群组 is
无
或者
群组 ==
组成员.
世界:
在 ncclGroupStart|End 语义中中止所有后端。
这确保了不同的 NCCL 通信器的中止调用不会
互相死锁。
https://github.com/pytorch/pytorch/issues/119797
如果
检查 NCCL 是否可用()
和 isinstance(
后端, ProcessGroupNCCL):
后端._group_start()
for pg_to_abort 在
排序(
_世界.
数据库名称,
键=lambda x:
_世界.
数据库名称[x
] reverse=
真实
):
pg_to_abort.终止()
如果
检查 NCCL 是否可用()
和 isinstance(
后端, ProcessGroupNCCL):
后端._group_end()
_更新默认数据库(
无)
_世界.pg_map.
清晰()
_世界.
数据库名称.
清晰()
_世界.pg_group_ranks.
清晰()
_世界.
PostgreSQL 后端配置.
清晰()
_世界.
PostgreSQL 转标签.
清晰()
_世界.
标签转 PostgreSQL.
清晰()
_世界.
pg_coalesce_state_合并状态.
清晰()
_注销所有进程组()
当进程组没有显式名称(只有 WORLD(默认))
进程组可以有一个显式名称),我们使用全局 _world.group_count
# 生成名称。在销毁时我们需要重置计数器。
允许在重新创建进程时生成一致的价值
# 一些训练器从故障中恢复后,我们才重置组
#
# 只有在 WORLD 被销毁时才重置,因为如果我们在 WORLD
处理组状态良好,我们目前没有处理故障。
_世界.group_count = 0
else:
pg.终止()
删除
_世界.pg_map[pg]
删除
_世界.
数据库名称[pg]
删除
_世界.pg_group_ranks[pg]
删除
_世界.
PostgreSQL 后端配置[pg]
如果 pg
在
_世界.
pg_coalesce_state_合并状态.
键():
警告.
警告(
"一些合并的集体尚未启动,当 "
"进程组被中断。它们将被清理。"
)
删除
_世界.
pg_coalesce_state_合并状态[pg]
标签 =
_世界.
PostgreSQL 转标签.
获取(pg)
删除
_世界.
PostgreSQL 转标签[pg]
如果
标签 is
不是
无:
尝试:
_世界.
标签转 PostgreSQL[
标签].
删除(pg)
如果
标签.
以...开头("ptd:"):
_世界.
标签转 PostgreSQL[
输入文本翻译为简体中文为:""].
删除(pg)
除了
异常:
通过
_注销进程组(pg.
群组名称)
[文档]def get_rank(group: Optional[ProcessGroup] = None) -> int:
"""
返回当前进程在提供的 ``group`` 中的排名,默认情况下返回。
每个进程在分布式系统中都有一个唯一的标识符,称为 Rank。
它们总是连续的整数,范围从 0 到 world_size。
Args:
参数:
group(进程组,可选):要工作的进程组。如果为 None,
则使用默认进程组。
返回:
进程组的排名
不是该组成员时,返回-1
"""
如果 _rank_not_in_group(group):
返回-1
default_pg = _get_default_group()
if group is None or group is GroupMember.WORLD:
return default_pg.rank()
return get_group_rank(group, default_pg.rank())
[文档]def get_world_size(group: Optional[ProcessGroup] = None) -> int:
"""
返回当前进程组中的进程数量。
参数:
group (ProcessGroup, 可选): 要工作的进程组。如果为 None,
默认将使用进程组。
返回:
进程组的世界大小
不是该组成员时,返回-1
"""
如果 _rank_not_in_group(group):
返回-1
返回 _get_group_size(group)
[文档]def isend(
tensor: torch.Tensor,
dst: Optional[int] = None,
group: 可选[ProcessGroup] = None,
tag: int = 0,
group_dst: 可选[int] = None,
) -> 可选[Work]:
"""
异步发送张量。
.. 警告::
在请求完成之前修改 ``tensor`` 将导致未定义的行为。
行为。
.. 警告::
``tag`` 在 NCCL 后端中不受支持。
与 send 不同,send 是阻塞的,而 isend 允许 src == dst rank,即发送到自身。
Args:
tensor (Tensor): 要发送的张量。
dst (int): 在全局进程组中的目标 rank(无论``group``参数如何)。
group (ProcessGroup, optional): 要工作的进程组。如果为 None,
默认将使用进程组。
标签(int,可选):用于匹配发送与远程接收的标签
group_dst(int,可选):在 ``group`` 上的目标 rank。不能同时指定 ``dst`` 和 ``group_dst``
返回值:
分布式请求对象。
如果不是组的一部分,则为空。
"""
group = _group_or_default_group(group)
group_dst = _canonicalize_group_rank(group, dst, group_dst)
_check_single_tensor(tensor, "tensor")
if _rank_not_in_group(group):
_warn_not_in_group("isend")
返回 None
如果 tensor.is_complex():
tensor = torch.view_as_real(tensor)
return group.send([tensor], group_dst, tag)
[文档]def irecv(
tensor: torch.Tensor,
src: Optional[int] = None,
group: Optional[ProcessGroup] = None,
tag: 整数 = 0,
group_src: 可选整数 = None,
) -> 可选[工作]:
"""
异步接收张量。
.. 警告::
NCCL 后端不支持 ``tag``。
与 recv 不同,recv 是阻塞的,irecv 允许 src == dst rank,即从自身接收。
Args:
tensor (Tensor): 要填充接收到的数据的张量。
src (int, 可选): 在全局进程组上的源排名(无论 ``group`` 参数如何)。
如果未指定,将从任何进程接收。
group(进程组,可选):要工作的进程组。如果为 None,则使用默认进程组。
默认进程组将被使用。
tag(整数,可选):用于匹配接收与远程发送的标签
group_src(整数,可选):在``group``上的目标 rank。不能同时指定``src``和``group_src``。
返回值:
分布式请求对象。
无,如果不是组的一部分。
"""
_检查单个张量(tensor, "tensor")
如果_rank_not_in_group(group)
_警告不在组("irecv")
返回 None
if tensor.is_complex():
tensor = torch.view_as_real(tensor)
group = _group_or_default_group(group)
if src is None and group_src is None:
return group.recv_anysource([tensor], tag)
else:
group_src = _canonicalize_group_rank(group, src, group_src)
return group.recv([tensor], group_src, tag)
[文档]@_异常记录器
def 发送(
tensor: torch.Tensor,
dst: Optional[int] = None,
group: 可选[ProcessGroup] = None,
tag: int = 0,
group_dst: 可选[int] = None,
) -> None:
"""
同步发送张量。
.. 警告::
``tag`` 在 NCCL 后端中不受支持。
Args:
tensor (Tensor): 要发送的张量。
dst (int): 全局进程组中的目标 rank(无论``group``参数如何)。
目标 rank 不应与当前进程的 rank 相同。
group(进程组,可选):要工作的进程组。如果为 None,则使用默认进程组。
默认进程组将被使用。
tag(整数,可选):匹配远程接收的标签。
group_dst(整数,可选):在``group``上的目标 rank。同时指定``dst``和``group_dst``是无效的。
"""
group = _group_or_default_group(group)
group_dst = _canonicalize_group_rank(group, dst, group_dst)
_check_not_self_rank(group, group_dst, "destination")
work = isend(tensor, group=group, tag=tag, group_dst=group_dst)
if work is not None:
work.wait()
[文档]@_exception_logger
def recv(
tensor: torch.Tensor,
src: 可选[int] = None,
group: 可选[ProcessGroup] = None,
tag: 整数 = 0,
group_src: 可选[int] = None,
) -> 整数:
"""
同步接收张量。
.. 警告::
NCCL 后端不支持 ``tag``。
参数:
张量(Tensor):用于填充接收到的数据的张量。
src(int,可选):全局进程组上的源排名(无论是否指定 group 参数)。
如果未指定,将从任何进程接收。
group(ProcessGroup,可选):要工作的进程组。如果为 None,
默认将使用进程组。
标签(int,可选):用于匹配接收与远程发送的标签。
group_src(int,可选):在 ``group`` 上的目标 rank。不能同时指定 ``src`` 和 ``group_src``。
返回值:
发送者排名
-1,如果不在组内
"""
work = irecv(tensor, src=src, group=group, tag=tag, group_src=group_src)
如果工作为空:
返回-1
work.wait()
如果 src 为空:
如果 group_src 为 None:
group_src = work._source_rank()
group = _group_or_default_group(group)
_check_not_self_rank(group, group_src, "source")
src = 获取全局排名(group, group_src)
返回 src
类
非法工作(
工作):
定义 __getattribute__(
自我,
名称):
如果
名称
在 [
is_success,
"异常",
等待,
"源排名",
"_源排名",
"结果",
"同步",
]
抛出
值错误(f
非法调用{
名称}
在 IllegalWork 对象上)
类
合并管理器:
定义 __init__(
自我)
翻译
无:
自我.
工作:
列表[
工作] =
输入文本为空,请提供需要翻译的文本
定义 append(
自我,
工作:
工作):
如果
工作:
自我.
工作.append(
工作)
定义
等待(
自我):
for 工作
在
自我.
工作:
工作.
等待()
@contextlib.contextmanager
定义
合并管理器(
组:
可选[
流程组] =
无,
设备:
可选[
火炬.
设备] =
无,
异步操作:
可选[
布尔] =
错误,
):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
尝试合并集体或 P2P 操作时使用的上下文管理器。
参数:
组(`ProcessGroup`,可选):要工作的进程组。如果为 None,则
则使用默认进程组。
设备(`torch.device`,可选):默认为 None,如果后端没有`**_coalesced`实现,则设置到设备上。
没有后端提供的`**_coalesced`实现。
async_ops (`bool`, 可选): 是否合并操作为异步操作。
示例:
>>> # xdoctest: +SKIP("no rank")
>>> # 同步操作
>>> with _coalescing_manager():
>>> for i in range(num_colls):
>>> dist.all_reduce(tensors[i])
>>> # 异步操作
>>> with _coalescing_manager(async_ops=True) as cm:
>>> for i in range(num_colls):
>>> dist.all_reduce(tensors[i])
>>> cm.wait()
.. 警告::
func:`_coalescing_manager` 目前不支持合并
使用不同归约算子的 all-reduce,例如 `ReduceOp.SUM` 混合
使用 `ReduceOp.PRODUCT`。
"""
群组 =
群组
或者
获取默认组()
操作列表 =
_世界.
pg_coalesce_state_合并状态.setdefault(
组,
[]
如果
操作列表:
抛出
值错误(
"合并操作开始时,ProcessGroup 具有非空的操作列表"
)
如果
设备:
组.
开始合并(
设备)
厘米 =
合并管理器()
产生
厘米
操作列表 =
_世界.
pg_coalesce_state_合并状态.
流行(
组)
如果
操作列表:
支持快速路径合并的集体被捕获。
请参阅相应集体 API 中的实现。
当前支持:
# - 合并 `all_reduce`
# - 合并 `all_gather_into_tensor`
# - 合并 `reduce_scatter_tensor`
op0 = 操作列表[0].
操作符
如果 op0 ==
全量归约:
张量 = [
操作.
张量 for
操作符
在
操作列表]
全局减少选项 =
Allreduce 合并选项()
all_reduce_opts.减少操作 =
非空(
操作列表[0].
红操)
工作 =
组.allreduce_coalesced(
张量, all_reduce_opts)
如果...否则 op0 == all_gather_into_tensor:
输入 =
输入文本为空,请提供需要翻译的文本
输出 =
输入文本为空,请提供需要翻译的文本
for 操作符
在
操作列表:
输入.append(
操作.
张量)
输出.append(
非空(
操作.
目标张量))
工作 =
组.allgather_into_tensor_coalesced(
输出,
输入)
如果...否则 op0 == reduce_scatter_tensor:
输入 =
输入文本为空,请提供需要翻译的文本
输出 =
输入文本为空,请提供需要翻译的文本
for 操作符
在
操作列表:
输入.append(
操作.
张量)
输出.append(
非空(
操作.
目标张量))
reduce_opts = 减少散布选项()
简化选项.
减少操作 =
非空(
操作列表[0].
红操)
工作 =
组.
简化散列张量(
输出,
输入,
简化选项)
else:
抛出
断言错误(
f合并管理器不支持快速路径合并{op0}
,"
fyet{op0}
仍然记录在操作列表中。这是 c10d 的内部错误。
)
如果
设备:
# 旧式允许每个 coll 在上下文管理器中通过 Python 绑定调用 C++对应部分
工作 =
组.
结合并发(
设备)
如果
异步操作:
cm.append(工作) # type: ignore[possibly-undefined]
else:
工作.
等待() # type: ignore[possibly-undefined]
[文档]
定义
批量异步发送或接收张量并返回请求列表(p2p_op_list:
列表[
P2P 流])
翻译
列表[
工作
]
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
处理 `p2p_op_list` 中的每个操作并返回相应的请求。目前支持 NCCL、Gloo 和 UCC 后端。
处理 `p2p_op_list` 中的每个操作并返回相应的请求。
目前支持 NCCL、Gloo 和 UCC 后端。
参数:
p2p_op_list: 点对点操作列表(每个操作员的类型为)
``torch.distributed.P2POp``). 列表中 isend/irecv 的顺序很重要,需要与远程端的对应 isend/irecv 相匹配。
matters and it needs to match with corresponding isend/irecv on the
remote end.
返回:
调用相应方法返回的分布式请求对象列表
op 在 op_list 中。
示例:
>>> # xdoctest: +SKIP("no rank")
>>> send_tensor = torch.arange(2, 数据类型=torch.float32) + 2 * 级别
>>> recv_tensor = torch.randn(2, 数据类型=torch.float32)
>>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1) % world_size)
>>> recv_op = dist.P2POp(
... dist.irecv, recv_tensor, (rank - 1 + world_size) % world_size
... )
>>> reqs = batch_isend_irecv([send_op, recv_op])
>>> for req in reqs:
>>> req.wait()
>>> recv_tensor
>>> tensor([2, 3]) # Rank 0
tensor([0, 1]) # Rank 1
.. note:: 注意,当使用 NCCL PG 后端调用此 API 时,用户必须使用 `torch.cuda.set_device` 设置当前 GPU 设备,否则将
lead to unexpected hang issues.
导致意外的挂起问题。
此外,如果这是该 API 在“group”中的第一次集体调用
传递给 `dist.P2POp`,`group` 的所有等级都必须参与
此 API 调用;否则,行为未定义。如果此 API 调用是
不是该“组”第一次集体呼吁,批量 P2P 操作
仅允许使用“group”的子集等级。
"""
_检查 P2P 操作列表(p2p_op_list)
群组 = p2p_op_list[0].
群组
如果
群组 is
无:
群组 =
获取默认组()
设备 = p2p_op_list[0].
张量.
设备
定义
同义词参数(
操作:
P2P 流)
翻译
字典[
字符串, int
]
key = "组目标"
如果
操作.
操作符 ==
发送
否则
"组源"
返回 {
键:
操作.
小组对等}
如果
类型(
组) ==
进程组
和
组.
获取后端(
设备).
支持合并:
# NCCL 风格合并
与
合并管理器(
组,
设备,
异步操作=
是)
是 cm:
for p2p 操作
在 p2p_op_list:
p2p 操作.
操作(
p2p 操作.
张量,
组=
p2p 操作.
组,
标签=
p2p 操作.
标签,
**同义词参数(
p2p 操作),
)
返回 cm.
工作
else:
# 后端不支持合并
需求 =
输入文本为空,请提供需要翻译的文本
for p2p 操作
在 p2p_op_list:
工作 =
p2p 操作.
操作(
p2p 操作.
张量,
组=
p2p 操作.
组,
标签=
p2p 操作.
标签,
**同义词参数(
p2p 操作),
)
如果
工作:
需求.append(
工作)
返回
需求
[文档]@_异常记录器
def 广播(
tensor: torch.Tensor,
src: 可选[int] = None,
group: 可选[进程组] = None,
async_op: 布尔 = False,
group_src: 可选[int] = None,
):
"""
向整个组广播张量。
所有进程中的 ``tensor`` 必须具有相同数量的元素
参与集体
Args:
tensor (Tensor): 如果 ``src`` 是当前进程的 rank,则发送的数据
处理,并保存接收到的数据所使用的张量。
src (int):全局进程组上的源排名(无论``group``参数如何)。
group (ProcessGroup,可选):要工作的进程组。如果为 None,
则使用默认进程组。
async_op (bool, 可选): 是否将此操作设置为异步操作
group_src (int): 在 ``group`` 上的源排名。必须指定 ``group_src`` 和 ``src`` 中的一个,但不能同时指定两个。
并且 ``src`` 但不能同时指定两个。
返回值:
异步工作处理,如果 async_op 设置为 True。
如果不是 async_op 或者不是组的一部分,则为 None。
"""
group = _group_or_default_group(group)
group_src = _canonicalize_group_rank(group, src, group_src, return_global=False)
_check_single_tensor(tensor, "tensor")
if _rank_not_in_group(group):
_warn_not_in_group("broadcast")
返回
opts = 广播选项()
opts.rootRank = 群组源
opts.rootTensor = 0
opts.asyncOp = 异步操作
work = 群组广播([tensor],opts)
if 异步操作:
返回 work
else:
work.wait()
[文档]@_exception_logger
定义
全量归约(
张量,
操作=ReduceOp.SUM,
组=
无, async_op=
错误):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
在所有机器上以相同的方式减少张量数据,使所有机器都获得最终结果。
调用 `tensor` 之后,所有进程中的 `tensor` 将是位运算上完全相同的。
支持复杂张量。
参数:
tensor(张量):集体的输入和输出。该函数
在原地操作。
op(可选):从以下值中选择
``torch.distributed.ReduceOp``
枚举。指定用于逐元素减少的操作。
group(ProcessGroup,可选):要工作的进程组。如果为 None,
则使用默认进程组。
async_op (bool, 可选): 是否将此操作设置为异步操作
返回:
异步工作处理,如果 async_op 设置为 True。
如果不是 async_op 或不是组的一部分,则为 None。
示例:
>>> # xdoctest: +SKIP("no rank")
>>> # 以下所有张量均为 torch.int64 类型。
>>> # 我们有 2 个进程组,2 个 rank。
>>> device = torch.device(f"cuda:{rank}")
>>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4, 6], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1
>>> # 所有下面的张量都是 torch.cfloat 类型。
>>> # 我们有 2 个进程组,2 个 rank。
>>> tensor = torch.tensor(
... [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device
... ) + 2 * rank * (1 + 1j)
>>> tensor
tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0
tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1
"""
Dynamo 内置逻辑将旧版分布式操作映射到功能集体。
让我们重定向到一个可以模拟此逻辑的 torch 函数模式,该模式在 Dynamo 之外。
(例如,非严格导出实现了这样的 torch 函数模式)。
相关参数 = (
张量,)
如果
有 torch 功能(relevant_args):
返回 handle_torch_function(
全量归约,
relevant_args,
张量,
操作=
操作,
组=
组,
async_op=async_op,
)
_检查单个张量(
张量,
"张量")
如果 _rank_not_in_group(
组):
_warn_not_in_group(全局归约)
返回
如果
张量.
是复杂的():
如果
不是
支持复数(
操作):
抛出
值错误(f
"all_reduce 不支持"{
操作}
在复数张量上)
张量 =
火炬.
真实查看(
张量)
选项 =
Allreduce 选项()
选项.
减少操作 =
操作符
如果
群组 is
无:
群组 =
获取默认组()
如果
群组
在
_世界.
pg_coalesce_state_合并状态.
键():
我们处于合并上下文,不要执行单个操作,只需追加集体表示
合并 =
_集合操作(
全量归约,
张量,
无,
操作,
无)
_世界.
pg_coalesce_state_合并状态[
组].append(
集合)
如果 async_op:
返回
非法工作()
else:
返回
无
工作 =
组.allreduce
[
张量
]
选项)
如果 async_op:
返回
工作
else:
工作.
等待()
@_exception_logger
@deprecated(
`torch.distributed.all_reduce_coalesced` 将会被弃用。如果必须
使用它,请稍后重新查看我们的文档
https://pytorch.org/docs/main/分布式.html#集体函数,
分类=
未来警告,
)
定义
全局归约合并(
张量,
操作=ReduceOp.SUM,
组=
无, async_op=
错误):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
警告:目前尚未在节点间实现单个形状检查。
例如,如果 0 阶节点传递[torch.rand(4), torch.rand(2)]和
这种缺乏形状检查导致性能显著提升,但此功能的使用者需要注意。
这种缺乏形状检查导致性能显著提升,但此功能的使用者需要注意。
这种缺乏形状检查导致性能显著提升,但此功能的使用者需要注意。
函数应格外小心,确保每个节点传递的
张量在节点之间形状匹配。
在所有机器上对同一设备上的张量(tensors)进行归约
以便所有节点都得到最终结果。
电话结束后,所有进程中的张量都将进行位运算相同
。
支持复杂张量。
参数:
张量(Union[List[Tensor], Tensor]):集体的输入和输出。
函数在原地操作。
op (可选[ReduceOp]):来自以下值之一
``torch.distributed.ReduceOp`` 枚举。指定用于
元素级归约的操作。
group(ProcessGroup,可选):要工作的进程组。如果为 None,
则使用默认进程组。
async_op(可选[bool]):此操作是否应为异步操作。
返回:
异步工作处理,如果 async_op 设置为 True。
如果不是 async_op 或不是组的一部分,则为 None。
"""
如果 isinstance(
张量,
火炬.
张量):
张量 = [
张量]
`_check_tensor_list`(
张量,
"张量")
确保所有张量具有相同的数据类型(
张量)
如果 _rank_not_in_group(
组):
_warn_not_in_group("all_reduce_coalesced")
返回
如果
任何(t.
是复杂的() for t
在
张量)
和
不是
支持复数(
操作):
抛出
值错误(f
"all_reduce 不支持"{
操作}
在复数张量上)
张量 = [t
如果
不是 t.
是复杂的()
否则
火炬.
真实查看(t) for t
在
张量]
选项 =
Allreduce 合并选项()
选项.
减少操作 =
操作符
群组 =
群组
或者
获取默认组()
工作 =
组.allreduce_coalesced(
张量,
选项)
如果 async_op:
返回
工作.
获取未来()
else:
工作.
等待()
[文档]@_异常记录器
def 减少(
tensor: torch.Tensor,
dst: 可选[int] = None,
op=ReduceOp.SUM,
group: 可选[ProcessGroup] = None,
async_op: 布尔型 = False,
group_dst: 可选[int] = None,
):
"""
在所有机器上减少张量数据。
只有具有 ``dst`` 等级的进程将接收最终结果。
参数:
tensor (Tensor): 集合的输入和输出。该函数
在原地操作。
dst (int): 全局进程组上的目标排名(无论 ``group`` 参数如何)
op (可选): ``torch.distributed.ReduceOp`` 中的一个值
``torch.distributed.ReduceOp``
枚举。指定用于逐元素减少的操作。
group(ProcessGroup,可选):要工作的进程组。如果为 None,
则将使用默认进程组。
async_op(bool,可选):此操作是否应为异步操作
group_dst (int): 目标 ``group`` 的排名。必须指定 ``group_dst`` 或 ``dst`` 中的一个,但不能同时指定。
和 ``dst`` 但不能同时指定。
返回:
如果 async_op 设置为 True,则返回异步工作句柄。
如果不是异步操作或不是组的一部分,则为空
"""
group = _group_or_default_group(group)
group_dst = _canonicalize_group_rank(group, dst, group_dst, return_global=False)
_检查单个张量(tensor, "tensor")
如果_rank_not_in_group(group):
_警告不在组("reduce")中
返回
opts = ReduceOptions()
opts.reduceOp = op
opts.rootRank = group_dst
work = group.reduce([tensor], opts)
if async_op:
返回工作
else:
work.wait()
定义 _object_to_tensor(
对象,
设备,
组):
与
等待计数器("pytorch.wait_counter.c10d._object_to_tensor").
守护者():
f = 输入/输出.BytesIO()
pickler(f).
导出(
对象)
字节存储 =
火炬.
字节存储.
从缓冲区读取(f.
获取值())
# 类型:忽略[已定义]
# 不要用 torch.tensor 指定 dtype 来替换 `torch.ByteTensor` 或 `torch.LongTensor`。
# 否则会导致速度降低 100 倍。
# 查看:https://github.com/pytorch/pytorch/issues/65696
字节张量 =
火炬.ByteTensor(
字节存储).
到(
设备)
如果
获取调试级别() ==
调试等级.
详细信息
和
检查 NCCL 是否可用():
后端 =
获取后端(
组)
如果
后端 ==
后端.NCCL:
哈希 =
火炬._C.
_分布式_c10d._hash_tensors
[
字节张量])
日志记录器.
警告(
"_object_to_tensor 大小:"%s
哈希值:%s",
字节张量.
元素数量(),
哈希,
)
local_size = 火炬.LongTensor
[
字节张量.
元素数量()]).
到(
设备)
返回
字节张量, local_size
定义 _tensor_to_object(
张量,
张量大小,
组):
与
等待计数器("pytorch.wait_counter.c10d._tensor_to_object").
守护者():
如果
获取调试级别() ==
调试等级.
详细信息
和
检查 NCCL 是否可用():
后端 =
获取后端(
组)
如果
后端 ==
后端.NCCL:
哈希 =
火炬._C.
_分布式_c10d._hash_tensors
[
张量])
日志记录器.
警告(
"_tensor_to_object 大小:"%s
哈希值:%s",
张量.
元素数量(),
哈希
)
张量 =
张量.cpu()
缓冲区 =
张量.numpy().tobytes()[:
张量大小]
返回
_反序列化器(
输入/输出.BytesIO(
缓冲区)).
加载()
[文档]@_exception_logger
定义
全局收集对象(
对象列表,
对象,
组=
无):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
从整个组中收集可序列化的对象到列表中。
与 :func:`all_gather` 类似,但可以传递 Python 对象。
请注意,对象必须可序列化才能被收集。
参数:
object_list(列表[任何类型]):输出列表。它的大小应该正确。
组的大小为此集体并将包含输出。
obj(任何类型):从当前进程广播的可选择 Python 对象。
group(ProcessGroup,可选):要工作的进程组。如果为 None,
默认将使用默认进程组。默认为 ``None``。
返回:
``None``。如果调用方排名属于该组,则集体输出将被填充到输入 ``object_list`` 中。如果调用方排名不属于该组,则传入的 ``object_list`` 将
被填充到输入 ``object_list`` 中。如果调用方排名不属于该组,则传入的 ``object_list`` 将
被填充到输入 ``object_list`` 中。如果调用方排名不属于该组,则传入的 ``object_list`` 将
保持不变。
注意,此 API 与 :func:`all_gather` 有细微差别
因为它不提供 ``async_op`` 处理句柄,因此
将是一个阻塞调用。
.. 注意:: 对于基于 NCCL 处理的组,内部张量表示
对象必须在通信之前移动到 GPU 设备上
地点。在这种情况下,所使用的设备由
torch.cuda.current_device() 和它是用户的责任
确保每个等级都分配了独立的 GPU
torch.cuda.set_device()
.. 警告::
func:`all_gather_object` 隐式使用 ``pickle`` 模块,这
已知不安全。可能构造恶意 pickle 数据
在反序列化过程中将执行任意代码。仅调用此
使用您信任的数据。
.. 警告::
使用 GPU 张量调用:func:`all_gather_object`不受良好支持
并且效率低下,因为它会引发 GPU -> CPU 的传输,因为张量将被
请考虑使用:func:`all_gather`。
示例::
>>> # xdoctest: +SKIP("需要进程组初始化")
>>> # 注意:每个进程的进程组初始化被省略。
>>> 导入 torch.distributed 作为 dist
>>> 假设 world_size 为 3。
>>> gather_objects = ["foo", 12, {1: 2}] # 任何可序列化的对象
>>> output = [None for _ in gather_objects]
>>> dist.all_gather_object(output, gather_objects[dist.get_rank()])
>>> output
['foo', 12, {1: 2}]
"""
如果 _rank_not_in_group(
组):
_warn_not_in_group("all_gather_object")
返回
当前设备 =
_获取对象集合设备(
组)
输入张量, local_size = _object_to_tensor(
对象,
当前设备,
组)
# 收集所有局部大小。这是为了找到最大大小,并索引
直到反序列化张量达到正确大小时。
组大小 =
获取世界大小(
组=
组)
对象尺寸张量 =
火炬.
零值(
组大小,
数据类型=
火炬.
长,
设备=
当前设备
)
物体尺寸列表 = [
物体尺寸张量[i].
展平(
暗=0) for i
在
范围(
组大小)
]
张量大小汇总
全局聚合(
对象大小列表,
本地大小,
组=
组)
最大对象大小 = int(
最大值(
对象大小列表).
项目()) # type: ignore[type-var]
# 将张量调整到所有 rank 中的最大尺寸。
输入张量.
调整大小(
最大对象大小)
合并输出张量 =
火炬.
空的(
最大对象大小 *
组大小,
数据类型=
火炬.uint8,
设备=
当前设备
)
输出张量是非重叠的合并输出张量视图
output_tensors = [
合并输出张量[
最大对象大小 * i :
最大对象大小 * (i + 1)]
for i 在
范围(
组大小)
]
全局聚合(output_tensors,
输入张量,
组=
组)
反序列化输出回对象
for i, 张量
在
列举(output_tensors):
张量 =
张量.
类型(
火炬.uint8)
张量大小 =
对象大小列表[i]
对象列表[i] = _tensor_to_object(
张量,
张量大小,
组)
[文档]@_exception_logger
定义
收集对象(
对象:
任何,
物体收集列表:
可选[
列表[
任何]] =
无,
目标:
可选[int] =
无,
组:
可选[
流程组] =
无,
群组目标:
可选[int] =
无,
):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
从整个组中收集可序列化的对象。
与 :func:`gather` 类似,但可以传入 Python 对象。注意,对象必须是可序列化的才能被收集。
对象必须是可序列化的才能被收集。
参数:
obj (任何类型): 输入对象。必须是可序列化的。
object_gather_list (list[Any]): 输出列表。在 ``dst`` 节点上,它
应该正确地调整大小以匹配该集合的大小,并将包含输出。在非 dst
节点上必须为 ``None``。默认为 ``None``。
(默认为 ``None``)
dst (int, 可选): 目标全局进程组中的进程排名(无论 ``group`` 参数如何)。
(如果 ``dst`` 和 ``group_dst`` 都为 None,则默认为全局排名 0)
group: (ProcessGroup, 可选): 要工作的进程组。如果为 None,则
默认将使用默认进程组。默认为 ``None``。
group_dst(int,可选):在 ``group`` 上的目标排名。不能同时指定 ``dst`` 和 ``group_dst``。
返回:
无。在 `dst` 端,`object_gather_list` 将包含集体操作的输出。
集体操作的输出。
.. 注意:: 注意,此 API 与`gather`集体操作略有不同,
因为它不提供`async_op`句柄,因此将是阻塞的。
调用。
.. 注意:: 对于基于 NCCL 处理的组,内部张量表示
对象必须在通信之前移动到 GPU 设备上
地点。在这种情况下,所使用的设备由
torch.cuda.current_device() 和它是用户的责任
确保每个等级都分配了独立的 GPU
torch.cuda.set_device()
.. 警告::
`:func:`gather_object` 隐式使用 `pickle` 模块,其中是
已知不安全。可能构造恶意 pickle 数据
在反序列化过程中将执行任意代码。仅调用此
使用您信任的数据。
.. 警告::
调用:func:`gather_object`使用 GPU 张量支持不佳
并且效率低下,因为它会引发 GPU -> CPU 的传输,因为张量将被
请考虑使用 :func:`gather` 代替。
示例::
>>> # xdoctest: +SKIP("需要进程组初始化")
>>> # 注意:每个进程的进程组初始化被省略。
>>> 导入 torch.distributed 作为 dist
>>> 假设 world_size 为 3。
>>> gather_objects = ["foo", 12, {1: 2}] # 任何可序列化的对象
>>> output = [None for _ in gather_objects]
>>> dist.gather_object(
... gather_objects[dist.get_rank()],
... output if dist.get_rank() == 0 else None,
... dst=0
... )
>>> # 在排名 0
>>> output
['foo', 12, {1: 2}]
"""
群组 =
群组或默认群组(
组)
如果 dst is
无
和 group_dst is
无:
dst = 0
global_dst = 标准化群组排名(
组,
目标,
群组目标,
返回全局=
是)
如果 _rank_not_in_group(
组):
_warn_not_in_group(收集对象)
返回
确保已适当指定对象收集列表。
我的全球排名 =
获取排名()
验证输出列表以排名(
我的全球排名,
全球目标,
物体收集列表)
当前设备 =
_获取对象集合设备(
组)
输入张量, local_size = _object_to_tensor(
对象,
当前设备,
组)
# 收集所有局部大小。这是为了找到最大大小,并索引
直到反序列化张量达到正确大小时。
组大小 =
获取世界大小(
组=
组)
对象尺寸张量 =
火炬.
零值(
组大小,
数据类型=
火炬.
长,
设备=
当前设备
)
物体尺寸列表 = [
物体尺寸张量[i].
展平(
暗=0) for i
在
范围(
组大小)
]
# 所有聚集张量的大小。尽管这里不需要聚集,但由于每个 rank 都需要广播一个相同(最大)的张量,所以这里需要进行全聚集。
# 聚集,因为每个 rank 都需要广播一个相同(最大)的张量。
尺寸
全局聚合(
对象大小列表,
本地大小,
组=
组)
最大对象大小 = int(
最大值(
对象大小列表).
项目()) # type: ignore[type-var]
# 将张量调整到所有 rank 中的最大尺寸。
输入张量.
调整大小(
最大对象大小)
如果在此 rank 上不会收集结果,则避免填充输出张量。
如果
我的全球排名 ==
全球目标:
合并输出张量 =
火炬.
空的(
最大对象大小 *
组大小,
数据类型=
火炬.uint8,
设备=
当前设备
)
输出张量是非重叠的合并输出张量视图
output_tensors = [
合并输出张量[
最大对象大小 * i :
最大对象大小 * (i + 1)]
for i 在
范围(
组大小)
]
所有排名调用 gather,使用等大小的张量。
收集(
输入张量,
gather 列表=output_tensors
如果
我的全球排名 == global_dst
否则
无, # type: ignore[possibly-undefined]
目标=
全球目标,
组=
组,
)
如果
我的全球排名 !=
全球目标:
返回
断言
对象收集列表 is
不是
无,
"必须在目标 rank 上提供对象收集列表"
for i, 张量
在
列举(output_tensors):
张量 =
张量.
类型(
火炬.uint8)
张量大小 =
对象大小列表[i]
物体收集列表[i] = _tensor_to_object(
张量,
张量大小,
组)
[文档]@_exception_logger
定义
发送对象列表(
对象列表:
列表[
任何
]
目标:
可选[int] =
无,
组:
可选[
流程组] =
无,
设备:
可选[
火炬.
设备] =
无,
群组目标:
可选[int] =
无,
):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
同步发送 `object_list` 中的可序列化对象。
与 :func:`send` 类似,但可以传递 Python 对象。
注意,为了发送,`object_list` 中的所有对象都必须是可序列化的。
才能发送。
参数:
输入对象列表(List[Any]):要发送的输入对象列表。
每个对象必须是可序列化的。接收方必须提供大小相等的列表。
dst(int):发送“object_list”的目标 rank。
目标 rank 基于全局进程组(无论“group”参数如何)。
group: (ProcessGroup, 可选): 要工作的进程组。如果为 None,则
默认将使用默认进程组。默认为 ``None``。
device (``torch.device``,可选): 如果不为 None,则将对象
序列化并转换为张量,然后将这些张量移动到
在发送之前 ``device``。默认为 ``None``。
group_dst(int,可选):在 ``group`` 上的目标 rank。
必须指定 ``dst`` 和 ``group_dst`` 中的一个,但不能同时指定两个。
返回:
``None``。
.. 注意:对于基于 NCCL 的进程组,内部张量表示
对象必须在通信之前移动到 GPU 设备上
地点。在这种情况下,所使用的设备由
``torch.cuda.current_device()``给出,并且用户有责任确保
确保每个等级都分配了独立的 GPU
torch.cuda.set_device()
.. 警告::
`:func:`send_object_list` 隐式使用 `pickle` 模块,其中
已知不安全。可能构建恶意的 pickle
数据将在反序列化期间执行任意代码。仅调用此
使用您信任的数据。
.. 警告::
使用 GPU 张量调用:func:`send_object_list`不受良好支持
并且效率低下,因为它会引发 GPU -> CPU 的传输,因为张量将被
请考虑使用:func:`send`代替。
示例::
>>> # xdoctest: +SKIP("需要进程组初始化")
>>> # 注意:每个进程的进程组初始化被省略。
>>> 导入 torch.distributed 作为 dist
>>> # 假设后端不是 NCCL
>>> 设备 = torch.device("cpu")
>>> 如果 dist.get_rank() 等于 0:
>>> # 假设 world_size 为 2.
>>> 对象列表 = ["foo", 12, {1: 2}] # 任何可序列化的对象
>>> dist.send_object_list(objects, dst=1, device=device)
>>> else:
>>> 对象列表 = [None, None, None]
>>> dist.recv_object_list(objects, src=0, device=device)
>>> 对象
['foo', 12, {1: 2}]
"""
群组 =
群组或默认群组(
组)
group_dst = 标准化群组排名(
组,
目标,
群组目标)
_检查非自身排名(
组,
群组目标,
目的地)
如果 _rank_not_in_group(
组):
_warn_not_in_group(发送对象列表)
返回
当前设备选择。
为了保持向后兼容性,`device` 默认为 `None`
在这种情况下,我们运行设备选择的当前逻辑,即
当前设备是 CUDA,如果后端是 NCCL,否则是 CPU 设备。
如果它不是 `None`,我们将大小和对象张量移动
发送到该设备。
当前设备 =
设备
或者
_获取对象集合设备(
组)
在源 rank 上序列化 object_list 元素为张量
张量列表,
大小列表 = zip(
*[_object_to_tensor(对象,
当前设备,
组) for
对象
在
对象列表]
)
对象尺寸张量 =
火炬.
猫(
尺寸列表)
# 发送对象尺寸
发送(
物体尺寸张量,
群组目标=
群组目标,
组=
组)
将序列化对象张量连接并发送
# 注意:如果 tensor_list 为空,torch.cat 将不会执行任何操作
# 只有一个元素,我们可以跳过复制。
如果
长度(
张量列表) == 1: # type: ignore[possibly-undefined]
对象张量 =
张量列表[0]
else:
对象张量 =
火炬.
猫(
张量列表)
发送(
对象张量,
群组目标=
群组目标,
组=
组)
[文档]@_exception_logger
定义
接收对象列表(
对象列表:
列表[
任何
]
源:
可选[int] =
无,
组:
可选[
流程组] =
无,
设备:
可选[
火炬.
设备] =
无,
群组源:
可选[int] =
无,
):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
同步接收可序列化的对象,存储在 `object_list` 中。
类似于 :func:`recv`,但可以接收 Python 对象。
参数:
object_list (List[Any]): 要接收的对象列表。
必须提供与发送的列表大小相等的尺寸列表。
src(int,可选):从其中接收 ``object_list`` 的源排名。
源排名基于全局进程组(无论 ``group`` 参数如何)
如果设置为 None,则将从任何排名接收。默认为 ``None``。
group: (ProcessGroup, 可选): 要工作的进程组。如果为 None,则
默认将使用默认进程组。默认为 ``None``。
device (``torch.device``,可选): 如果不为 None,则在此设备上接收。
默认为 ``None``。
group_src(int,可选):在 ``group`` 上的目标排名。不能同时指定 ``src`` 和 ``group_src``。
返回:
发送者排名。如果排名不属于该组,则为 -1。如果排名属于该组,
``object_list`` 将包含来自 ``src`` 排名的发送对象。
.. 注意:对于基于 NCCL 的进程组,内部张量表示
对象必须在通信之前移动到 GPU 设备上
地点。在这种情况下,所使用的设备由
``torch.cuda.current_device()``给出,并且用户有责任确保
确保每个等级都分配了独立的 GPU
torch.cuda.set_device()
.. 警告::
`:func:`recv_object_list` 隐式使用 `pickle` 模块,其中
已知不安全。可能构建恶意的 pickle
数据将在反序列化期间执行任意代码。仅调用此
使用您信任的数据。
.. 警告::
使用 GPU 张量调用:func:`recv_object_list`不受良好支持
并且效率低下,因为它会引发 GPU -> CPU 的传输,因为张量将被
请考虑使用:func:`recv`代替。
示例::
>>> # xdoctest: +SKIP("需要进程组初始化")
>>> # 注意:每个进程的进程组初始化被省略。
>>> 导入 torch.distributed 作为 dist
>>> # 假设后端不是 NCCL
>>> 设备 = torch.device("cpu")
>>> 如果 dist.get_rank() 等于 0:
>>> # 假设 world_size 为 2.
>>> 对象列表 = ["foo", 12, {1: 2}] # 任何可序列化的对象
>>> dist.send_object_list(objects, dst=1, device=device)
>>> else:
>>> 对象列表 = [None, None, None]
>>> dist.recv_object_list(objects, src=0, device=device)
>>> 对象
['foo', 12, {1: 2}]
"""
如果 _rank_not_in_group(
组):
_warn_not_in_group("接收对象列表")
返回 -1
当前设备选择。
为了保持向后兼容性,`device` 默认为 `None`
在这种情况下,我们运行设备选择的当前逻辑,即
当前设备是 CUDA,如果后端是 NCCL,否则是 CPU 设备。
如果它不是 `None`,我们将大小和对象张量移动
接收到此设备。
当前设备 =
设备
或者
_获取对象集合设备(
组)
对象尺寸张量 =
火炬.
空的(
长度(
对象列表),
数据类型=
火炬.
长,
设备=
当前设备
)
# 接收对象大小
排序大小 =
接收(
物体尺寸张量,
源=
源,
组=
组,
群组源=
群组源)
# 接收序列化对象进入的张量。
对象张量 =
火炬.
空的( # type: ignore[call-overload]
火炬.
总和(
物体尺寸张量).
项目(), # type: ignore[arg-type]
数据类型=
火炬.uint8,
设备=
当前设备,
)
对象等级 =
接收(
对象张量,
源=
源,
组=
组,
群组源=
群组源)
断言
排序大小 ==
排序对象, (
"返回排名中对象大小和对象不匹配。"
)
使用存储的大小反序列化对象。
偏移 = 0
for i, obj_size 在
列举(
物体尺寸张量):
对象视图 =
对象张量[
偏移 :
偏移 +
对象大小]
对象视图 =
对象视图.
类型(
火炬.uint8)
偏移 += obj_size
对象列表[i] = _tensor_to_object(
对象视图,
对象大小,
组)
返回
排序对象
[文档]@_exception_logger
定义
广播对象列表(
对象列表:
列表[
任何
]
源:
可选[int] =
无,
组:
可选[
流程组] =
无,
设备:
可选[
火炬.
设备] =
无,
群组源:
可选[int] =
无,
):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
将 `object_list` 中的可序列化对象广播到整个组。
与 :func:`broadcast` 类似,但可以传递 Python 对象。
注意,为了发送,`object_list` 中的所有对象都必须是可序列化的。
广播
参数:
object_list (List[Any]): 要广播的输入对象列表。
每个对象都必须是可序列化的。只有 ``src`` 级别的对象将被分散,
但每个端点都必须提供大小相等的列表。
源 (int): 从哪个源排名广播 ``object_list``。
源排名基于全局进程组(无论 ``group`` 参数如何)
group: (ProcessGroup, 可选): 要工作的进程组。如果为 None,则
默认将使用默认进程组。默认为 ``None``。
device (``torch.device``,可选): 如果不为 None,则将对象
序列化并转换为张量,然后将这些张量移动到
``device``。默认为``None``。
group_src(整型):在``group``上的源 rank。不得指定``group_src``中的一个。
且 ``src`` 但不同时。
返回:
``None``。如果排名是组的一部分,``object_list`` 将包含
从 ``src`` 排名广播的对象。
.. 注意:对于基于 NCCL 的进程组,内部张量表示
对象必须在通信之前移动到 GPU 设备上
地点。在这种情况下,所使用的设备由
``torch.cuda.current_device()``给出,并且用户有责任确保
确保每个等级都分配了独立的 GPU
torch.cuda.set_device()
注意,此 API 与 :func:`broadcast` 略有不同,
因为它不提供 ``async_op`` 处理句柄,因此
将是一个阻塞调用。
.. 警告::
`broadcast_object_list`函数隐式使用`pickle`模块,这
已知不安全。可能构建恶意的 pickle
数据将在反序列化期间执行任意代码。仅调用此
使用您信任的数据。
.. 警告::
使用 GPU 张量调用 :func:`broadcast_object_list` 不受良好支持
并且效率低下,因为它会引发 GPU -> CPU 的传输,因为张量将被
序列化。请考虑使用 :func:`broadcast` 代替。
示例::
>>> # xdoctest: +SKIP("需要进程组初始化")
>>> # 注意:每个进程的进程组初始化被省略。
>>> 导入 torch.distributed 作为 dist
>>> 如果 dist.get_rank() 等于 0:
>>> # 假设 world_size 为 3。
>>> 对象列表 = ["foo", 12, {1: 2}] # 任何可序列化的对象
>>> else:
>>> 对象列表 = [None, None, None]
>>> # 假设后端不是 NCCL
>>> 设备 = torch.device("cpu")
>>> dist.broadcast_object_list(objects, src=0, device=device)
>>> 对象
['foo', 12, {1: 2}]
"""
群组 =
群组或默认群组(
组)
如果
源 is
无
和
群组源 is
无:
源 = 0
全局源 =
标准化群组排名(
组,
源,
群组源,
返回全局=
是)
如果 _rank_not_in_group(
组):
_warn_not_in_group("广播对象列表")
返回
当前设备选择。
为了保持向后兼容性,`device` 默认为 `None`
在这种情况下,我们运行设备选择的当前逻辑,即
当前设备是 CUDA,如果后端是 NCCL,否则是 CPU 设备。
如果它不是 `None`,我们将大小和对象张量移动
发送到此设备
当前设备 =
设备
或者
_获取对象集合设备(
组)
我的全球排名 =
获取排名()
在源 rank 上序列化 object_list 元素为张量
如果
我的全球排名 ==
全球源:
张量列表,
大小列表 = zip(
*[_object_to_tensor(对象,
当前设备,
组) for
对象
在
对象列表]
)
对象尺寸张量 =
火炬.
猫(
尺寸列表)
else:
对象尺寸张量 =
火炬.
空的(
长度(
对象列表),
数据类型=
火炬.
长,
设备=
当前设备
)
# 广播对象大小
广播(
物体尺寸张量,
源=
全球源,
组=
组)
# 连接并广播序列化对象张量
# 注意:如果 tensor_list 为空,torch.cat 将不会执行任何操作
# 只有一个元素,我们可以跳过复制。
如果
我的全球排名 ==
全球源:
如果
长度(
张量列表) == 1: # type: ignore[possibly-undefined]
对象张量 =
张量列表[0]
else:
对象张量 =
火炬.
猫(
张量列表)
else:
对象张量 =
火炬.
空的( # type: ignore[call-overload]
火炬.
总和(
物体尺寸张量).
项目(), # type: ignore[arg-type]
数据类型=
火炬.uint8,
设备=
当前设备,
)
广播(
对象张量,
源=
全球源,
组=
组)
使用存储的大小反序列化对象。
偏移 = 0
如果
我的全球排名 !=
全球源:
for i, obj_size 在
列举(
物体尺寸张量):
对象视图 =
对象张量[
偏移 :
偏移 +
对象大小]
对象视图 =
对象视图.
类型(
火炬.uint8)
偏移 += obj_size
对象列表[i] = _tensor_to_object(
对象视图,
对象大小,
组)
[文档]@_exception_logger
定义
散列对象列表(
散射对象输出列表:
列表[
任何
]
散列对象输入列表:
可选[
列表[
任何]] =
无,
源:
可选[int] =
无,
组:
可选[
流程组] =
无,
群组源:
可选[int] =
无,
):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
将 `scatter_object_input_list` 中的可散列对象散列到整个组中。
类似于 :func:`scatter`,但可以传递 Python 对象。在
每个等级,分散的对象将被存储为第一个元素
``scatter_object_output_list``。注意,所有在
``scatter_object_input_list``中的对象都必须是可序列化的,以便进行分散。
参数:
scatter_object_output_list(Any 类型的列表):非空列表,其第一个
该元素将存储分配到此级别的对象。
scatter_object_input_list (List[Any], optional): 要分散的输入对象列表。
每个对象都必须是可序列化的。只有 ``src`` 级别的对象将被分散,
并且对于非-src 级别,参数可以是 ``None``。
源 (int): 从哪个源排名散布 ``scatter_object_input_list``。
源排名基于全局进程组(无论 ``group`` 参数如何)。
(如果 ``src`` 和 ``group_src`` 都为 None,则默认为全局排名 0)
group: (ProcessGroup, 可选): 要工作的进程组。如果为 None,则
默认将使用默认进程组。默认为 ``None``。
group_src (int, 可选): ``group`` 上的源排名。不能同时指定 ``src`` 和 ``group_src``。
返回:
``None``。如果排名是组的一部分,``scatter_object_output_list``
将具有此等级的散列对象设置为第一个元素。
.. 注意:: 注意,此 API 与 scatter collective 略有不同
由于它不提供 `async_op` 处理句柄,因此将不会是
阻塞调用。
.. 警告::
`:func:`scatter_object_list` 隐式使用 `pickle` 模块,其中
已知不安全。可能构建恶意的 pickle
数据将在反序列化期间执行任意代码。仅调用此
使用您信任的数据。
.. 警告::
使用 GPU 张量调用:func:`scatter_object_list`不受良好支持
并且效率低下,因为它会引发 GPU -> CPU 的传输,因为张量将被
请考虑使用:func:`scatter`代替。
示例::
>>> # xdoctest: +SKIP("需要进程组初始化")
>>> # 注意:每个进程的进程组初始化被省略。
>>> 导入 torch.distributed 作为 dist
>>> 如果 dist.get_rank() 等于 0:
>>> # 假设 world_size 为 3。
>>> 对象列表 = ["foo", 12, {1: 2}] # 任何可序列化的对象
>>> else:
>>> # 在非源 rank 上可以是任何列表,元素不使用。
>>> 对象列表 = [None, None, None]
>>> 输出列表 = [None]
>>> dist.scatter_object_list(output_list, objects, src=0)
>>> # 排名 i 获取 objects[i]。例如,在排名 2:
>>> 输出列表
[{1: 2}]
"""
群组 =
群组或默认群组(
组)
如果
源 is
无
和
群组源 is
无:
源 = 0
全局源 =
标准化群组排名(
组,
源,
群组源,
返回全局=
是)
如果 _rank_not_in_group(
组):
_warn_not_in_group("散射对象列表")
返回
如果 (
不是 isinstance(
散射对象输出列表,
列表)
或者
长度(
散射对象输出列表) < 1
):
抛出
值错误(
预期参数 scatter_object_output_list 至少为大小为 1 的列表。
)
我的全球排名 =
获取排名()
PG 设备 =
_获取对象集合设备(
组)
如果
我的全球排名 ==
全球源:
如果
散射对象输入列表 is
无:
抛出
值错误(
"源排名必须提供非 None 的散射对象输入列表"
)
张量列表,
张量大小 = zip(
*[
_object_to_tensor(对象,
pg 设备,
组)
for 对象
在
散射对象输入列表
]
)
张量列表,
张量大小 =
列表(
张量列表),
列表(
张量大小)
# 源秩广播最大张量大小。这是因为所有秩都
# 预期以大小相等的张量调用 scatter()。
最大张量大小 =
最大值(
张量大小) # type: ignore[possibly-undefined]
for 张量
在
张量列表: # type: ignore[possibly-undefined]
张量.
调整大小(
最大张量大小)
else:
最大张量大小 =
火炬.
张量
[0
]
数据类型=
火炬.
长,
设备=
pg 设备)
广播(
最大张量大小,
源=
全球源,
组=
组)
# 散列实际序列化对象
输出张量 =
火炬.
空的(
最大张量大小.
项目(),
数据类型=
火炬.uint8,
设备=
PG 设备
)
分散(
输出张量,
scatter_list=无
如果
我的全球排名 !=
全局源
否则
张量列表, # type: ignore[possibly-undefined]
源=
全球源,
组=
组,
)
在反序列化回对象时分散对象大小以修剪张量
obj_tensor_size = 火炬.
张量
[0
]
数据类型=
火炬.
长,
设备=
pg 设备)
分散(
对象张量大小,
scatter_list=无
如果
我的全球排名 !=
全局源
否则
张量大小, # type: ignore[possibly-undefined]
源=
全球源,
组=
组,
)
反序列化回对象
散射对象输出列表[0] = _tensor_to_object(
输出张量,
对象张量大小,
群组
)
[文档]@_exception_logger
定义
全局聚合(
张量列表,
张量,
组=
无, async_op=
错误):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
从整个组中收集张量到一个列表中。
支持复杂和不均匀大小的张量。
参数:
tensor_list (list[Tensor]): 输出列表。它应该包含
正确大小的张量,用于集体输出。
支持不同大小的张量。
张量(Tensor):当前进程要广播的张量。
group(ProcessGroup,可选):要工作的进程组。如果为 None,
则使用默认进程组。
async_op (bool, 可选): 是否将此操作设置为异步操作
返回:
异步工作处理,如果 async_op 设置为 True。
如果不是 async_op 或不是组的一部分,则为 None。
示例:
>>> # xdoctest: +SKIP("需要进程组初始化")
>>> # 以下所有张量均为 torch.int64 数据类型。
>>> # 我们有 2 个进程组,2 个 rank。
>>> device = torch.device(f"cuda:{rank}")
>>> tensor_list = [
... torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2)
... ]
>>> tensor_list
[tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0
[tensor([0, 0], device='cuda:1'), tensor([0, 0], device='cuda:1')] # Rank 1
>>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0
[tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1
>>> # 所有下面的张量都是 torch.cfloat 类型。
>>> # 我们有 2 个进程组,2 个 rank。
>>> tensor_list = [
... torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2)
... ]
>>> tensor_list
[tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0
[tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1
>>> tensor = torch.tensor(
... [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device
... ) + 2 * rank * (1 + 1j)
>>> tensor
tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0
[tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1
"""
Dynamo 内置逻辑将旧版分布式操作映射到功能集体。
让我们重定向到一个可以模拟此逻辑的 torch 函数模式,该模式在 Dynamo 之外。
(例如,非严格导出实现了这样的 torch 函数模式)。
相关参数 = (
张量,)
如果
有 torch 功能(relevant_args):
返回 handle_torch_function(
全局聚合,
relevant_args,
张量列表,
张量,
组=
组,
async_op=async_op,
)
`_check_tensor_list`(
张量列表,
"tensor 列表")
_检查单个张量(
张量,
"张量")
确保所有张量具有相同的数据类型(
张量列表,
张量)
如果 _rank_not_in_group(
组):
_warn_not_in_group("全局聚合")
返回
张量列表 = [
t 如果
不是 t.
是复杂的()
否则
火炬.
真实查看(t) for t
在
张量列表
]
张量 =
张量
如果
不是
张量.
是复杂的()
否则
火炬.
真实查看(
张量)
群组 =
群组
或者
获取默认组()
工作 =
组.
全局收集
[
张量列表
] [
张量])
如果 async_op:
返回
工作
else:
工作.
等待()
[文档]@_exception_logger
定义 all_gather_into_tensor(
输出张量,
输入张量,
组=
无, async_op=
错误):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
从所有进程收集张量并将它们放入单个输出张量中。
此函数要求每个进程上的所有张量大小必须相同。
参数:
输出张量(Tensor):用于容纳张量元素的输出张量
从所有阶层。它必须正确尺寸,以便有一个
以下形式:
(i) 所有输入张量的连接
维度;关于“连接”的定义,请参阅 ``torch.cat()``;
(ii)所有输入张量沿主维度的堆叠;
关于“堆叠”的定义,请参阅 ``torch.stack()``。
下面的示例可能更能说明支持的输出形式。
输入张量(Tensor):从当前 rank 收集的张量。
与`all_gather` API 不同,此 API 的输入张量在所有 rank 上必须具有相同的大小。
API 必须具有相同的大小。
group(ProcessGroup,可选):要工作的进程组。如果为 None,
则使用默认进程组。
async_op (bool, 可选): 是否将此操作设置为异步操作
返回:
异步工作处理,如果 async_op 设置为 True。
如果不是 async_op 或不是组的一部分,则为 None。
示例:
>>> # xdoctest: +SKIP("需要进程组初始化")
>>> # 以下所有张量均为 torch.int64 数据类型,且位于 CUDA 设备上。
>>> 我们有两个等级。
>>> device = torch.device(f"cuda:{rank}")
>>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor_in
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> # 输出拼接形式
>>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device)
>>> dist.all_gather_into_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([1, 2, 3, 4], device='cuda:0') # Rank 0
tensor([1, 2, 3, 4], device='cuda:1') # Rank 1
>>> # 输出为栈形式
>>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device)
>>> dist.all_gather_into_tensor(tensor_out2, tensor_in)
>>> tensor_out2
tensor([[1, 2],
[3, 4]], device='cuda:0') # Rank 0
tensor([[1, 2],
[3, 4]], device='cuda:1') # Rank 1
.. 警告::
Gloo 后端不支持此 API。
"""
Dynamo 内置逻辑将旧版分布式操作映射到功能集体。
让我们重定向到一个可以模拟此逻辑的 torch 函数模式,该模式在 Dynamo 之外。
(例如,非严格导出实现了这样的 torch 函数模式)。
相关参数 = (
输入张量,)
如果
有 torch 功能(relevant_args):
返回 handle_torch_function(
all_gather_into_tensor,
relevant_args,
输出张量,
输入张量,
组=
组,
async_op=async_op,
)
_检查单个张量(
输入张量,
输入张量)
_检查单个张量(
输出张量,
输出张量)
如果 _rank_not_in_group(
组):
_warn_not_in_group("所有聚合到张量中")
返回
输出张量 = (
输出张量
如果
不是
输出张量.
是复杂的()
否则
火炬.
真实查看(
输出张量)
)
输入张量 = (
输入张量
如果
不是
输入张量.
是复杂的()
否则
火炬.
真实查看(
输入张量)
)
选项 =
Allgather 选项()
选项.
异步操作 = async_op
群组 =
群组
或者
获取默认组()
如果
群组
在
_世界.
pg_coalesce_state_合并状态.
键():
我们处于合并上下文,不要执行单个操作,只需追加集体表示
合并 =
_集合操作(all_gather_into_tensor,
输入张量,
输出张量)
_世界.
pg_coalesce_state_合并状态[
组].append(
集合)
如果 async_op:
返回
非法工作()
else:
返回
无
工作 =
组._allgather_base(
输出张量,
输入张量,
选项)
如果 async_op:
返回
工作
else:
工作.
等待()
@_exception_logger
@deprecated(
"`torch.distributed._all_gather_base` 是一个私有函数,并将被弃用。"
"请使用 `torch.distributed.all_gather_into_tensor` 代替。",
分类=
未来警告,
)
定义 _all_gather_base(
输出张量,
输入张量,
组=
无, async_op=
错误):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
单个张量全聚合。从所有进程收集单个张量,并将它们放入单个输出张量中。
参数:
输出张量(Tensor):输出张量。它应包含
正确大小的张量,用于集体输出。
输入张量(Tensor):当前进程要广播的张量。
group(ProcessGroup,可选):要工作的进程组。如果为 None,
则使用默认进程组。
async_op (bool, 可选): 是否将此操作设置为异步操作
返回:
异步工作处理,如果 async_op 设置为 True。
如果不是 async_op 或不是组的一部分,则为 None。
.. 警告::
`_all_gather_base`是一个私有函数。用户应使用
`all_gather_into_tensor`代替。
"""
返回 all_gather_into_tensor(
输出张量,
输入张量,
组, async_op)
@_exception_logger
@deprecated(
"`torch.distributed.all_gather_coalesced` 将被弃用。如果必须使用,请稍后重新查阅我们的文档。"
"请稍后在我们的文档中查阅。"
https://pytorch.org/docs/main/分布式.html#集体函数,
分类=
未来警告,
)
定义
全局聚合合并(
输出张量列表,
输入张量列表,
组=
无, async_op=
假
):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
以合并方式从整个组收集输入张量到列表中。
支持复杂张量。
参数:
output_tensor_lists (list[list[Tensor]]): 输出列表。它应包含
正确大小的张量,用于集体输出。
input_tensor_list (list[Tensor]): 要从当前进程广播的张量。
至少有一个张量不能为空。
group(ProcessGroup,可选):要工作的进程组。如果为 None,
则使用默认进程组。
async_op (bool, 可选): 是否此操作应为异步操作。
返回:
异步工作处理,如果 async_op 设置为 True。
如果不是 async_op 或不是组的一部分,则为 None。
示例:
我们有 2 个进程组,2 个进程等级。
rank 0 passes:
input_tensor_list = [[[1, 1], [1, 1]], [2], [3, 3]]
output_tensor_lists =
[[[-1, -1], [-1, -1]], [-1], [-1, -1]]
[[[-1, -1], [-1, -1]], [-1], [-1, -1]]
排名 1 通过数:
input_tensor_list = [[[3, 3], [3, 3]], [5], [1, 1]]
output_tensor_lists =
[[[-1, -1], [-1, -1]], [-1], [-1, -1]]
[[[-1, -1], [-1, -1]], [-1], [-1, -1]]
两个都获得排名 0 和 1:
output_tensor_lists =
[[[1, 1], [1, 1]], [2], [3, 3]],
[[3, 3], [3, 3]], [5], [1, 1]]].
警告:目前尚未在节点间实现单个形状检查。
例如,如果 0 阶节点传递[torch.rand(4), torch.rand(2)]和
排名 1 的节点通过 [torch.rand(2), torch.rand(2), torch.rand(2)]
all_gather_coalesced 操作将无怨无悔地进行并返回
错误的输出。这种缺乏形状检查导致性能显著提升,但使用此函数的用户应格外小心
这将带来显著的性能提升,但使用此函数的用户应格外小心
确保每个节点传递的张量形状在节点间匹配。
"""
在这里我们仅检查与 C++参数的基本兼容性,C++代码将
执行形状和类型检查。
如果 _rank_not_in_group(
组):
_warn_not_in_group(all_gather_coalesced)
返回
`_check_tensor_list`(
输入张量列表,
输入张量列表)
确保所有张量具有相同的数据类型(
输入张量列表)
如果
不是 isinstance(
输出张量列表,
列表):
抛出
类型错误(
"无效的函数参数:output_tensor_lists 应该是一个列表"
)
for 输出张量列表
在
输出张量列表:
`_check_tensor_list`(
输出张量列表,
输出张量列表)
确保所有张量具有相同的数据类型(
输出张量列表)
输出张量列表 = [
[t 如果
不是 t.
是复杂的()
否则
火炬.
真实查看(t) for t
在 l]
for l 在
输出张量列表
]
输入张量列表 = [
t 如果
不是 t.
是复杂的()
否则
火炬.
真实查看(t) for t
在
输入张量列表
]
群组 =
群组
或者
获取默认组()
工作 =
组.
全局聚合合并(
输出张量列表,
输入张量列表)
如果 async_op:
返回
工作.
获取未来()
else:
工作.
等待()
定义
验证输出列表以排名(
我的排名,
目标,
gather 列表):
如果 dst ==
我的排名:
如果
不是
gather 列表:
抛出
值错误(
"必须在目标排名上指定参数 ``收集列表``。"
)
如果...否则
gather 列表:
抛出
值错误(
参数 `gather_list` 在非目标节点上不得指定。
)
[文档]@_exception_logger
定义
收集(
张量:
火炬.
张量,
gather 列表:
可选[
列表[
火炬.
张量]] =
无,
目标:
可选[int] =
无,
组:
可选[
流程组] =
无,
async_op: 布尔值 =
错误,
群组目标:
可选[int] =
无,
):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
在单个进程中收集张量列表。
此函数要求每个进程上的所有张量大小必须相同。
参数:
张量(Tensor):输入张量。
收集列表(list[Tensor],可选):适当列表
相同大小的张量用于收集数据
(默认为 None,必须在目标排名上指定)
dst (int, 可选): 目标全局进程组中的进程排名(无论 ``group`` 参数如何)。
(如果 ``dst`` 和 ``group_dst`` 都为 None,则默认为全局排名 0)
group(ProcessGroup,可选):要工作的进程组。如果为 None,
则使用默认进程组。
async_op (bool, 可选): 是否将此操作设置为异步操作
group_dst(int,可选):在 ``group`` 上的目标排名。不能同时指定 ``dst`` 和 ``group_dst``。
返回:
异步工作处理,如果 async_op 设置为 True。
如果不是 async_op 或不是组的一部分,则为 None。
.. 注意:: 注意,gather_list 中的所有张量必须具有相同的大小。
示例::
>>> # xdoctest: +SKIP("no rank")
>>> # 我们有 2 个进程组,2 个 rank。
>>> tensor_size = 2
>>> device = torch.device(f'cuda:{rank}')
>>> tensor = torch.ones(tensor_size, device=device) + rank
>>> 如果 dist.get_rank() 等于 0:
>>> gather_list = [torch.zeros_like(tensor, device=device) for i in range(2)]
>>> else:
>>> gather_list = None
>>> dist.gather(tensor, gather_list, dst=0)
>>> # Rank 0 获取数据。
>>> gather_list
[tensor([1., 1.], device='cuda:0'), tensor([2., 2.], device='cuda:0')] # Rank 0
None # Rank 1
"""
_检查单个张量(
张量,
"张量")
参数 ``gather_list`` 在非目标 rank 上可以不指定。
如果
gather 列表:
`_check_tensor_list`(
gather 列表, "gather_list")
else:
收集列表 =
输入文本为空,请提供需要翻译的文本
确保所有张量具有相同的数据类型(
张量,
gather 列表)
群组 =
群组或默认群组(
组)
如果 _rank_not_in_group(
组):
_warn_not_in_group("聚集")
返回
如果 dst is
无
和 group_dst is
无:
dst = 0
global_dst = 标准化群组排名(
组,
目标,
群组目标,
返回全局=
是)
group_dst = 标准化群组排名(
组,
目标,
群组目标,
返回全局=
错误)
我的全球排名 =
获取排名()
验证输出列表以排名(
我的全球排名,
全球目标,
gather 列表)
output_tensors = [gather 列表]
如果 global_dst ==
我的全球排名
否则
输入文本为空,请提供需要翻译的文本
input_tensors = [张量]
选项 =
收集选项()
选项.
根等级 = group_dst
工作 =
组.
收集(output_tensors,
输入张量,
选项)
如果 async_op:
返回
工作
else:
工作.
等待()
[文档]@_exception_logger
定义
分散(
张量:
火炬.
张量,
scatter_list: 可选[
列表[
火炬.
张量]] =
无,
源:
可选[int] =
无,
组:
可选[
流程组] =
无,
async_op: 布尔值 =
错误,
群组源:
可选[int] =
无,
):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
将张量列表分散到组中的所有进程。
每个进程将恰好接收一个张量并将数据存储在
``tensor``参数中。
支持复杂张量。
参数:
索引张量(Tensor):输出张量。
散列列表(list[Tensor]):要散列的张量列表(默认为空,必须在源 rank 上指定)。
None,必须在全局进程组上的源 rank 上指定。
src(int):全局进程组上的源 rank(无论``group``参数如何)。
(如果 ``src`` 和 ``group_src`` 都为 None,则默认为全局排名 0)
group(ProcessGroup,可选):要工作的进程组。如果为 None,
则使用默认进程组。
async_op (bool, 可选): 是否将此操作设置为异步操作
group_src (int, 可选): ``group`` 上的源排名。不能同时指定 ``src`` 和 ``group_src``。
返回:
异步工作处理,如果 async_op 设置为 True。
如果不是 async_op 或不是组的一部分,则为 None。
.. 注意:: 注意 scatter_list 中的所有张量必须具有相同的大小。
示例::
>>> # xdoctest: +SKIP("需要进程组初始化")
>>> # 注意:每个进程的进程组初始化被省略。
>>> 导入 torch.distributed 作为 dist
>>> tensor_size = 2
>>> device = torch.device(f'cuda:{rank}')
>>> output_tensor = torch.zeros(tensor_size, device=device)
>>> 如果 dist.get_rank() 等于 0:
>>> # 假设 world_size 为 2.
>>> # 只需张量,所有张量的大小必须相同。
>>> t_ones = torch.ones(tensor_size, device=device)
>>> t_fives = torch.ones(tensor_size, device=device) * 5
>>> scatter_list = [t_ones, t_fives]
>>> else:
>>> scatter_list = None
>>> dist.scatter(output_tensor, scatter_list, src=0)
>>> # Rank i 获取 scatter_list[i].
>>> output_tensor
tensor([1., 1.], device='cuda:0') # Rank 0
tensor([5., 5.], device='cuda:1') # Rank 1
"""
_检查单个张量(
张量,
"张量")
# 参数 `scatter_list` 在非源 rank 上可以不指定。
如果 scatter_list:
`_check_tensor_list`(scatter_list,
散点列表)
else:
散点列表 =
输入文本为空,请提供需要翻译的文本
确保所有张量具有相同的数据类型(
张量, scatter_list)
群组 =
群组或默认群组(
组)
如果
源 is
无
和
群组源 is
无:
源 = 0
全局源 =
标准化群组排名(
组,
源,
群组源,
返回全局=
是)
群组源 =
标准化群组排名(
组,
源,
群组源,
返回全局=
错误)
如果 _rank_not_in_group(
组):
_warn_not_in_group("散射")
返回
散点列表 = [
t 如果
不是 t.
是复杂的()
否则
火炬.
真实查看(t) for t
在
散点列表
]
张量 =
张量
如果
不是
张量.
是复杂的()
否则
火炬.
真实查看(
张量)
我的全球排名 =
获取排名()
如果
全局源 ==
我的全球排名:
如果
不是 scatter_list:
抛出
值错误(
"必须在源端指定参数 ``散列列表``。"
)
input_tensors = [scatter_list]
output_tensors = [张量]
else:
如果 scatter_list:
抛出
值错误(
"``scatter_list`` 参数必须在非源节点上不指定。"
)
input_tensors = 输入文本为空,请提供需要翻译的文本
output_tensors = [张量]
选项 =
散点选项()
选项.
根等级 =
群组源
选项.
异步操作 = async_op
工作 =
组.
分散(output_tensors,
输入张量,
选项)
如果 async_op:
返回
工作
else:
工作.
等待()
[文档]@_exception_logger
def reduce_scatter(output, input_list, op=ReduceOp.SUM, group=None, async_op=False):
"""
将张量列表减少并分散到组中的所有进程。
Args:
输出(张量):输出张量。
输入列表(列表[张量]):要减少和散布的张量列表。
op(可选):以下值之一
torch.distributed.ReduceOp
枚举。指定用于逐元素减少的操作。
group(进程组,可选):要工作的进程组。如果为 None,
则将使用默认进程组。
async_op (bool, 可选): 是否将此操作设置为异步操作。
返回:
异步工作句柄,如果 async_op 设置为 True。
如果不是 async_op 或不是组的一部分,则为 None。
"""
_check_single_tensor(output, "output")
_check_tensor_list(input_list, "input_list")
_ensure_all_tensors_same_dtype(output, input_list)
如果_rank_not_in_group(group):
_warn_not_in_group("reduce_scatter")
返回
opts = ReduceScatterOptions()
opts.reduceOp = op
group = group or _get_default_group()
work = group.reduce_scatter([output], [input_list], opts)
if async_op:
返回工作
else:
work.wait()
[文档]@_exception_logger
定义 reduce_scatter_tensor(
输出,
输入,
操作=ReduceOp.SUM,
组=
无, async_op=
错误):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
减少后,将张量分散到组中的所有等级。
参数:
输出(张量):输出张量。它应该在所有方面都具有相同的大小。
排名。
输入(张量):要减少和散布的输入张量。其大小
应该是输出张量大小乘以全局大小。输入张量
可以具有以下形状之一:
(i)输出张量沿主轴的连接
维度,或
(ii)沿主维度输出的张量堆栈。
关于“连接”的定义,请参阅 ``torch.cat()``。
关于“堆叠”的定义,请参阅 ``torch.stack()``。
group(ProcessGroup,可选):要工作的进程组。如果为 None,
则使用默认进程组。
async_op (bool, 可选): 是否此操作应为异步操作。
返回:
异步工作处理,如果 async_op 设置为 True。
如果不是 async_op 或不是组的一部分,则为 None。
示例:
>>> # xdoctest: +SKIP("需要进程组初始化")
>>> # 以下所有张量均为 torch.int64 数据类型,且位于 CUDA 设备上。
>>> 我们有两个等级。
>>> device = torch.device(f"cuda:{rank}")
>>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device)
>>> # 输入拼接形式
>>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device)
>>> tensor_in
tensor([0, 1, 2, 3], device='cuda:0') # 级别 0
tensor([0, 1, 2, 3], device='cuda:1') # 1 级
>>> dist.reduce_scatter_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([0, 2], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1
>>> # 栈输入
>>> tensor_in = torch.reshape(tensor_in, (world_size, 2))
>>> tensor_in
tensor([[0, 1],
[[2, 3]], device='cuda:0' # Rank 0
tensor([[0, 1],
[[2, 3]], device='cuda:1' # Rank 1
>>> dist.reduce_scatter_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([0, 2], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1
.. 警告::
Gloo 后端不支持此 API。
"""
Dynamo 内置逻辑将旧版分布式操作映射到功能集体。
让我们重定向到一个可以模拟此逻辑的 torch 函数模式,该模式在 Dynamo 之外。
(例如,非严格导出实现了这样的 torch 函数模式)。
相关参数 = (
输入,)
如果
有 torch 功能(relevant_args):
返回 handle_torch_function(
reduce_scatter_tensor,
relevant_args,
输出,
输入,
操作=
操作,
组=
组,
async_op=async_op,
)
_检查单个张量(
输出,
输出)
_检查单个张量(
输入,
输入)
如果 _rank_not_in_group(
群组):
_warn_not_in_group("reduce_scatter_tensor")
返回
选项 =
减少散布选项()
选项.
减少操作 =
操作符
选项.
异步操作 = async_op
群组 =
群组
或者
获取默认组()
# 检查我们是否处于合并上下文
# 如果我们处于合并上下文,则不要发出单个操作,只需附加一个集体表示
如果
群组
在
_世界.
pg_coalesce_state_合并状态.
键():
合并 =
_集合操作(reduce_scatter_tensor,
输入,
输出,
操作,
无)
_世界.
pg_coalesce_state_合并状态[
组].append(
集合)
如果 async_op:
返回
非法工作()
else:
返回
无
工作 =
组._reduce_scatter_base(
输出,
输入,
选项)
如果 async_op:
return 工作
else:
工作.
等待()
@deprecated(
"`torch.distributed._reduce_scatter_base` 是一个私有函数,并将被弃用。"
"请使用 `torch.distributed.reduce_scatter_tensor` 代替。",
分类=
未来警告,
)
定义 _reduce_scatter_base(
输出,
输入,
操作=ReduceOp.SUM,
组=
无, async_op=
错误):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
将展平的 tensor 分散到组中的所有进程中。
参数:
输出(Tensor):输出 tensor。
输入(Tensor):输入 tensor 的大小为输出 tensor 大小乘以全局大小。
group(ProcessGroup,可选):要工作的进程组。如果为 None,
则使用默认进程组。
async_op (bool, 可选): 是否此操作应为异步操作。
返回:
异步工作处理,如果 async_op 设置为 True。
如果不是 async_op 或不是组的一部分,则为 None。
.. 警告::
`_reduce_scatter_base` 是一个私有函数。用户应使用
`reduce_scatter_tensor` 代替。
"""
返回 reduce_scatter_tensor(
输出,
输入,
操作,
组, async_op)
[文档]@_exception_logger
定义
全向单播(
输出,
输入,
输出分割大小=
无,
输入分割大小=
无,
组=
无,
async_op=错误,
):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
将输入张量分割,然后将分割后的列表分散到组中的所有进程中。
然后将所有进程中接收到的张量连接起来。
并将其作为单个输出张量返回。
支持复杂张量。
参数:
输出(张量):收集的连接输出张量。
输入(张量):散列的输入张量。
output_split_sizes:(可选的[int]列表):dim 0 的输出分割大小
如果指定为 None 或空,则 ``output`` 张量的 dim 0 必须能被分割
由 `world_size` 均等分配。
输入分割大小:(list[Int], 可选):维度 0 的输入分割大小
如果指定为 None 或空,则 `input` 张量的维度 0 必须能被 `world_size` 整除
由 `world_size` 均等分配。
group(ProcessGroup,可选):要工作的进程组。如果为 None,
则使用默认进程组。
async_op (bool, 可选): 是否此操作应为异步操作。
返回:
异步工作处理,如果 async_op 设置为 True。
如果不是 async_op 或不是组的一部分,则为 None。
.. 警告::
`all_to_all_single`是实验性的,可能会更改。
示例:
>>> # xdoctest: +SKIP("未定义的 rank")
>>> input = torch.arange(4) + rank * 4
>>> input
tensor([0, 1, 2, 3]) # Rank 0
tensor([4, 5, 6, 7]) # Rank 1
tensor([8, 9, 10, 11]) # Rank 2
tensor([12, 13, 14, 15]) # Rank 3
>>> output = torch.empty([4], dtype=torch.int64)
>>> dist.all_to_all_single(output, input)
>>> output
tensor([0, 4, 8, 12]) # Rank 0
tensor([1, 5, 9, 13]) # Rank 1
tensor([2, 6, 10, 14]) # Rank 2
tensor([3, 7, 11, 15]) # Rank 3
>>> # 类似于以下操作:
>>> scatter_list = list(input.chunk(world_size))
>>> gather_list = list(output.chunk(world_size))
>>> for i in range(world_size):
>>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i)
>>> # 另一个不均匀分割的例子
>>> input
tensor([0, 1, 2, 3, 4, 5]) # Rank 0
tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1
tensor([20, 21, 22, 23, 24]) # Rank 2
tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3
>>> 输入分割
[2, 2, 1, 1] # 排名 0
[3, 2, 2, 2] # 排名 1
[2, 1, 1, 1] # 排名 2
[2, 2, 2, 1] # 排名 3
>>> 输出分割
[2, 3, 2, 2] # 排名 0
[2, 2, 1, 2] # 排名 1
[1, 2, 1, 2] # 排名 2
[1, 2, 1, 1] # 排名 3
>>> output = ...
>>> dist.all_to_all_single(output, input, output_splits, input_splits)
>>> output
tensor([ 0, 1, 10, 11, 12, 20, 21, 30, 31]) # Rank 0
tensor([ 2, 3, 13, 14, 22, 32, 33]) # Rank 1
tensor([ 4, 15, 16, 23, 34, 35]) # Rank 2
tensor([ 5, 17, 18, 24, 36]) # Rank 3
>>> # 另一个示例,使用 torch.cfloat 类型的张量。
>>> input = torch.tensor(
[..., [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j], dtype=torch.cfloat]
... ) + 4 * rank * (1 + 1j)
>>> input
tensor([1+i, 2+2i, 3+3i, 4+4i]) # Rank 0
tensor([5+5i, 6+6i, 7+7i, 8+8i]) # Rank 1
tensor([9+9j, 10+10j, 11+11j, 12+12j]) # Rank 2
tensor([13+13j, 14+14j, 15+15j, 16+16j]) # Rank 3
>>> output = torch.empty([4], dtype=torch.int64)
>>> dist.all_to_all_single(output, input)
>>> output
tensor([1+1j, 5+5j, 9+9j, 13+13j]) # Rank 0
tensor([2+2j, 6+6j, 10+10j, 14+14j]) # Rank 1
tensor([3+3j, 7+7j, 11+11j, 15+15j]) # Rank 2
tensor([4+4j, 8+8j, 12+12j, 16+16j]) # Rank 3
"""
Dynamo 内置逻辑将旧版分布式操作映射到功能集体。
让我们重定向到一个可以模拟此逻辑的 torch 函数模式,该模式在 Dynamo 之外。
(例如,非严格导出实现了这样的 torch 函数模式)。
相关参数 = (
输入,)
如果
有 torch 功能(relevant_args):
返回 handle_torch_function(
全向单播,
relevant_args,
输出,
输入,
输出分割大小=
输出分割大小,
输入分割大小=
输入分割大小,
组=
组,
async_op=async_op,
)
如果 _rank_not_in_group(
组):
_warn_not_in_group(全向单播)
返回
选项 =
AllToAll 选项()
_检查单个张量(
输出,
输出)
_检查单个张量(
输入,
输入)
确保所有张量具有相同的数据类型(
输出,
输入)
如果
输入.
是复杂的():
输入 =
火炬.
真实查看(
输入)
如果
输出.
是复杂的():
输出 =
火炬.
真实查看(
输出)
输出分割大小 =
输入文本为空,请提供需要翻译的文本
如果
输出分割大小 is
无
否则
输出分割大小
输入分割大小 =
输入文本为空,请提供需要翻译的文本
如果
输入分割大小 is
无
否则
输入分割大小
群组 =
群组
或者
获取默认组()
工作 =
组.alltoall_base(
输出,
输入,
输出分割大小,
输入分割大小,
选项
)
如果 async_op:
返回
工作
else:
工作.
等待()
[文档]@_exception_logger
定义
全部到全部(
输出张量列表,
输入张量列表,
组=
无, async_op=
错误):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
将输入张量的散列列表分配到组中的所有进程,并返回输出列表中收集的张量列表。
支持复杂张量。
参数:
output_tensor_list (list[Tensor]): 需要收集的每个 rank 的张量列表
per rank.
input_tensor_list (list[Tensor]): 每个 rank 散列的张量列表
group(ProcessGroup,可选):要工作的进程组。如果为 None,
则使用默认进程组。
async_op (bool, 可选): 是否此操作应为异步操作。
返回:
异步工作处理,如果 async_op 设置为 True。
如果不是 async_op 或不是组的一部分,则为 None。
.. 警告::
`all_to_all`是实验性的,可能会更改。
示例:
>>> # xdoctest: +SKIP("未定义的 rank")
>>> input = torch.arange(4) + rank * 4
>>> input = list(input.chunk(4))
>>> input
[tensor([0]), tensor([1]), tensor([2]), tensor([3])] # Rank 0
[tensor([4]), tensor([5]), tensor([6]), tensor([7])] # Rank 1
[tensor([8]), tensor([9]), tensor([10]), tensor([11])] # Rank 2
[tensor([12]), tensor([13]), tensor([14]), tensor([15])] # 程序秩为 3
>>> output = list(torch.empty([4], dtype=torch.int64).chunk(4))
>>> dist.all_to_all(output, input)
>>> output
[tensor([0]), tensor([4]), tensor([8]), tensor([12])] # 零阶
[tensor([1]), tensor([5]), tensor([9]), tensor([13])] # Rank 1
[tensor([2]), tensor([6]), tensor([10]), tensor([14])] # Rank 2
[tensor([3]), tensor([7]), tensor([11]), tensor([15])] # Rank 3
>>> # 类似于以下操作:
>>> scatter_list = input
>>> gather_list = output
>>> for i in range(world_size):
>>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src=i)
>>> input
tensor([0, 1, 2, 3, 4, 5]) # Rank 0
tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1
tensor([20, 21, 22, 23, 24]) # Rank 2
tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3
>>> 输入分割
[2, 2, 1, 1] # 排名 0
[3, 2, 2, 2] # 排名 1
[2, 1, 1, 1] # 排名 2
[2, 2, 2, 1] # 排名 3
>>> 输出分割
[2, 3, 2, 2] # 排名 0
[2, 2, 1, 2] # 排名 1
[1, 2, 1, 2] # 排名 2
[1, 2, 1, 1] # 排名 3
>>> 输入 = 列表(输入.split(输入分割))
>>> input
[tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])] # Rank 0
[tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1
[tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])] # Rank 2
[tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])] # Rank 3
>>> output = ...
>>> dist.all_to_all(output, input)
>>> output
[tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])] # Rank 0
[tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])] # Rank 1
[tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])] # Rank 2
[tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])] # Rank 3
>>> # 另一个示例,使用 torch.cfloat 类型的张量。
>>> input = torch.tensor(
[..., [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j], dtype=torch.cfloat]
... ) + 4 * rank * (1 + 1j)
>>> input = list(input.chunk(4))
>>> input
[tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])] # Rank 0
[tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])] # Rank 1
[tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])] # Rank 2
[tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])] # Rank 3
>>> output = list(torch.empty([4], dtype=torch.int64).chunk(4))
>>> dist.all_to_all(output, input)
>>> output
[tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])] # Rank 0
[tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])] # Rank 1
[tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])] # Rank 2
[tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])] # Rank 3
"""
如果 _rank_not_in_group(
组):
_warn_not_in_group(全对全)
return
选项 =
AllToAll 选项()
`_check_tensor_list`(
输出张量列表,
输出张量列表)
`_check_tensor_list`(
输入张量列表,
输入张量列表)
确保所有张量具有相同的数据类型(
输出张量列表,
输入张量列表)
输入张量列表 = [
t 如果
不是 t.
是复杂的()
否则
火炬.
真实查看(t)
为 t
在
输入张量列表
]
输出张量列表 = [
t 如果
不是 t.
是复杂的()
否则
火炬.
真实查看(t) for t
在
输出张量列表
]
群组 =
群组
或者
获取默认组()
工作 =
组.alltoall(
输出张量列表,
输入张量列表,
选项)
如果 async_op:
返回
工作
else:
工作.
等待()
[文档]@_异常记录器
def 障碍(
group: Optional[ProcessGroup] = GroupMember.WORLD, 异步操作=False, 设备 ID=None
):
"""
同步所有进程。
此集体块将阻止进程,直到整个组进入此函数,
如果 async_op 为 False,或者如果在 wait() 上调用异步工作处理程序。
Args:
进程组(ProcessGroup,可选):要工作的进程组。如果为 None,
将使用默认进程组。
async_op(布尔值,可选):此操作是否应为异步操作
设备 ID 列表 ([int],可选):设备/GPU ID 列表。
返回:
异步工作句柄,如果 async_op 设置为 True。
如果不是 async_op 或不是组的一部分,则为 None。
.. 注意:: `ProcessGroupNCCL` 现在会阻塞 CPU 线程直到屏障集体操作完成。
"""
if _rank_not_in_group(group):
_warn_not_in_group("barrier")
返回
opts = BarrierOptions()
opts.device = torch.device(_get_object_coll_device(group))
如果 device_ids 不为 None:
如果 device_ids 是列表类型:
opts.device_ids = device_ids
else:
raise TypeError(
"无效的函数参数:device_ids 类型应为 List[int]"
)
group = group or _get_default_group()
work = group.barrier(opts=opts)
if async_op:
返回工作
else:
work.wait()
[文档]
定义
监控屏障(
组:
可选[
流程组] =
组成员.
世界,
超时=
无,
等待所有 rank=
错误,
):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
类似于 `torch.distributed.barrier` 的进程同步,但考虑可配置的超时时间。
能够报告在提供超时时间内未通过此屏障的等级。
特别是对于非零等级,将阻塞直到从等级 0 处理完发送/接收操作。
Rank 0 将阻塞,直到所有来自其他 rank 的发送/接收操作处理完毕,并将报告
失败信息给未能及时响应的 rank。注意,如果一个 rank 没有达到监控屏障(例如由于挂起),则所有其他 rank 都会在监控屏障中失败。
此集体操作将阻塞组中所有进程/rank,直到
所有进程/rank 都完成操作。
整个组成功退出函数,使其对调试很有用
然而,它可能会影响性能,并且仅应
用于调试或需要主机端完整同步点的场景
为了调试目的,可以在该屏障处插入
在应用程序的集体调用检查是否有任何排名
不同步。
.. 注意:: 注意,此集体仅支持 GLOO 后端。
参数:
群组(ProcessGroup,可选):要工作的进程组。如果
如果为 ``None``,将使用默认进程组。
超时(datetime.timedelta,可选):监控屏障的超时时间。
如果为 ``None``,将使用默认进程组超时时间。
wait_all_ranks(布尔值,可选):是否收集所有失败的进程。
默认情况下,这是 ``False``,并且 ``monitored_barrier`` 在 0 级别
将在遇到第一个失败的级别时抛出异常,以失败结束
快速。通过设置 ``wait_all_ranks=True``,``monitored_barrier`` 将
收集所有失败的级别并抛出一个包含信息的错误
关于所有失败的排名。
返回:
``None``。
示例::
>>> # xdoctest: +SKIP("需要进程组初始化")
>>> # 注意:每个进程的进程组初始化被省略。
>>> 导入 torch.distributed 作为 dist
>>> if dist.get_rank() != 1:
>>> dist.monitored_barrier() # 抛出异常,指示 rank 1 未调用 monitored_barrier。
>>> # rank 1 未调用 monitored_barrier。
>>> # 示例:wait_all_ranks=True
>>> 如果 dist.get_rank() 等于 0:
>>> dist.monitored_barrier(wait_all_ranks=True) # 抛出异常
>>> # 表示 rank 1, 2, ... world_size - 1 没有调用
>>> # 监控屏障.
"""
# 在使用组之前需要调用不在组中的 rank,否则
# 会抛出“无效进程组”错误。
如果 _rank_not_in_group(
组):
_warn_not_in_group("monitored_barrier")
返回
如果
获取后端(
组) !=
后端.GLOO:
抛出
值错误(
监控屏障仅对 GLOO 后端进行了实现。)
如果
超时 is
无:
超时 =
_获取默认超时(
获取后端(
组))
如果...否则 isinstance(
超时时间, float):
显然,某些现有的测试用例对于 monitored_barrier 在浮点格式下超时通过?
警告.
警告(
请指定超时参数为 timedelta。
f转换当前值为{
超时时间}
假设它代表秒",
)
超时 =
时间差(
秒数=
超时时间)
_检查有效超时(
超时时间)
要使用的组 =
获取默认组()
如果
群组 is
无
否则
群组
返回
使用组.
监控屏障(
# 类型:忽略[未定义]
超时时间,
等待所有 rank=
等待所有等级
)
定义
创建进程组包装器(
包装的 PG:
火炬._C.
_分布式_c10d.
后端,
存储前缀:
字符串,
店铺:
存储,
排名: int,
世界大小: int,
超时时间:
时间差 =
默认 PG 超时,
):
断言
_GLOO 可用,
"ProcessGroupWrapper 不支持 GLOO 后端。"
# (whc) 这似乎仅适用于 gloo 后端?如果是这样,`default_pg_timeout`是合适的...
为辅助进程组创建一个独立的存储前缀。
前缀 = f"{
PG 包装器存储前缀}:{
存储前缀}"
存储 =
前缀存储(
前缀,
店铺)
辅助 PG =
网格流程组 Gloo(
店铺,
排名,
世界大小,
超时时间=
超时时间)
将底层 pg 包装为 ProcessGroupWrapper。
包裹的页面 =
流程组包装器(
包装的 PG, helper_pg)
返回
包裹的页面
# 确定性散列一系列排名到唯一
# 字符串
定义 _hash_ranks_to_str(
排名:
列表[int])
翻译
字符串:
排名连接:
字符串 =
“_”.
连接(
地图(
字符串,
排名))
# 如果已经存在具有相同排名组合的 PG
unique_str = “_”.
连接
[
排名连接,
字符串(
长度(
_世界.
数据库名称))])
return hashlib.sha1(字节(unique_str,
utf-8), usedforsecurity=
错误).
摘要()
# 计算一个整数颜色,输入一个排名列表
定义
_处理组颜色(
排名:
列表[int])
翻译 int:
# 将列表转换为元组以使其可哈希
排名 =
元组(
排名)
哈希值 =
哈希(
排名)
分割颜色必须为:
非负整数;
与 C 语言的 int 类型兼容,因为我们绑定到后者。
因此,我们将哈希值限制在 c_int 的最大值内。
最大整型 = 2 ** (ctypes.sizeof(ctypes.c_int) * 8 - 1)
颜色 =
绝对值(
哈希值) %
最大整型
返回
颜色
定义 _process_group_name(
排名,
使用哈希名称):
为进程组创建名称。
全局
_世界
如果
使用哈希名称:
pg_name = _hash_ranks_to_str(排名)
else:
pg_name = 字符串(
_世界.
群组计数)
_世界.group_count += 1
# TODO: 为什么只在 else 路径中增加组计数?
return pg_name
定义 _get_backend_from_str(
后端:
可选[
字符串] =
无)
翻译
后端:
默认使用与全局进程组相同的后端
如果未指定后端。
如果
不是
后端:
后端 =
获取后端(
获取默认组())
return 后端(
后端)
定义 _is_safe_to_split()
翻译
布尔:
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
检查在世界上是否可以安全地拆分任何进程组。
这只在默认的 pg 有绑定设备 ID 时才安全,否则
用户必须意识到,在第一个集体发布后,pg 才能分割。
才可以发行。
"""
return 假
如果
获取默认组().
绑定设备 ID is
无
否则
真实
@_time_logger
定义
分割组(
父页面:
可选[
流程组] =
无,
分割等级:
可选[
列表] =
无,
超时:
可选[
时间差] =
无,
pg 选项:
可选[
任何] =
无,
群组描述:
可选[
字符串] =
无,
) 翻译
可选[
流程组
]
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
从给定的父进程组创建一个新的进程组。
警告:这是一个实验性 API,仅支持`NCCL`后端。
其他后端将引发错误。
使用此 API 的用户必须保证父组中的所有 rank 都进入此 API 调用。
子组的分割在父组中所有等级都是相同的。
参数:
parent_pg (ProcessGroup, 可选): 父进程组。如果为 None,则
将使用默认进程组。用户需要保证
父组已完全初始化(例如,通信器已初始化)
split_ranks (list[list[int]]): 分割等级,即等级的列表列表。
用户需要确保分割排名的有效性,以便一个
分割(由一个整数内部列表表示)不与其他分割重叠。
请注意,每个分组的排名是组内排名(而不是全局排名)
在父页。例如,如果父组有 4 个等级,并且 split_ranks 可以为
[[0, 1], [2, 3]]. 注意 [[0,1]] 也是一个有效的分割,在这种情况下,排名 2、3 的返回值将
返回一个非组成员。
timeout (timedelta, 可选): 请参阅 `init_process_group` 了解详细信息及默认值。
pg_options(ProcessGroupOptions,可选):目前仅支持 ProcessGroupNCCLOptions。
在构建特定进程组时需要指定哪些附加选项
特定进程组的构建。例如:``is_high_priority_stream``
可以指定,以便进程组可以拾取高优先级的 CUDA 流。
对于配置 NCCL 的其他可用选项,
请见 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t
group_desc (str, 可选):用于描述进程组的字符串。
返回:
当前排名在 split_ranks 指定的一个 split/subgroup 内时,使用 ProcessGroup;如果当前排名不属于任何 split_ranks,则使用 None。
或 None,如果当前排名不属于任何 split_ranks。
"""
# 检查输入
如果 split_ranks is
无:
提升 ValueError(
"split_ranks 不能为空")
全局
_世界
默认页面 =
获取默认组()
设备 ID =
默认页面.
绑定设备 ID
如果
不是
设备 ID:
提升
运行时错误(
"默认 pg 没有关联设备,拆分任何进程组不安全"
)
_default_backend, 默认商店 =
_世界.pg_map[
默认页面]
全球排名 =
默认页面.
排名()
全球世界大小 =
默认页面.
大小()
如果
不是
父页面:
父级页面 =
默认页面
如果
父级页面
不是
在
_世界.pg_group_ranks:
抛出
值错误(f
"组"{
父页面}
未注册)
父全局到组排名 =
_世界.pg_group_ranks[
父页面]
父组到全局排名 = {
群组排名:
全球排名
for 全球排名,
组排名
在
父级全局到组排名.
项目()
}
如果
全球排名
不是
在
父级全局到组排名:
抛出
值错误(
f全球排名{
全球排名}
不属于父组{
父页面}"
)
父级组排名 =
父级全局到组排名[
全球排名]
父级后端 =
父页面.
获取后端(
火炬.
设备(
cuda))
# 如果父级后端不支持拆分,则抛出错误
当前此 API 仅支持 NCCL 后端
如果 (
不是
父级后端
或者
不是
父后端.
支持拆分
或者
不是 isinstance(
父后端, ProcessGroupNCCL)
):
抛出
运行时错误(
"父进程组没有后端,或者其后端不支持分割"
)
在设置分组描述之前设置颜色或无颜色分割
群组描述 = (
f"{父页面.
群组描述}
分割:{
父后端.
分区数量()}"
如果
群组描述 is
无
否则
群组描述
)
父后端字符串, _ =
_世界.pg_map[
父页面]
与父进程组相同的后端类型
后端 =
后端(
父后端字符串)
backend_config = 后端配置(
后端)
如果
pg 选项 is
不是
无:
断言 isinstance(
PG 选项, ProcessGroupNCCL.
选项), (
期望 pg_options 参数为 ProcessGroupNCCL.Options 类型
)
else:
# 默认的 pg_options 与父进程组相同
pg 选项 =
父后端.
选项
# 此超时默认/验证用于所有 new_groups/new_subgroups 变体
# 可能只是传递了它们的超时值(或 None)
如果
超时 is
无:
超时 =
_获取默认超时(
后端)
_检查有效超时(
超时)
在 split_ranks 中查找我的组等级和我的组本地等级
我的组 =
无
组排名 = -1
为 split_group
在
分割等级:
如果
长度(
分割组) == 0:
提升 ValueError(
"split 组不能为空")
如果
长度(
分割组) >
全球世界大小:
提升 ValueError(
"分割组的数量应小于或等于由 init_process_group 设置的 world_size"
)
如果
长度(
分割组) !=
长度(
设置(
分割组)):
提升 ValueError(
分组不能有重复的排名)
split_group = 排序(
分割组)
如果
父级组排名
在
分割组:
我的组 = split_group
组排名 =
分割组.
索引(
父级组排名)
断开
如果我的排名不属于任何子组,
应该调用 no_color 分割
如果
我的组 is
无
或者
组排名 == -1:
父后端.
执行无颜色分割(
设备 ID)
返回
无
group_name = _process_group_name(我的组,
使用哈希名称=
错误)
我组中的全局排名 = [
父组到全局排名[
排名]
为
排名
在
我的组]
前缀存储 =
前缀存储(f"{
群组名称}
“”,
默认商店)
我们在初始化后注册后端,并在 pg_options 中设置超时。
pg: 进程组 =
流程组(
prefix_store,
群组排名,
长度(
我的组),
)
后端类型 =
流程组.BackendType.NCCL
pg.绑定设备 ID =
设备 ID
pg._设置默认后端(
后端类型)
PG 选项.
_超时 =
超时
PG 选项.
分割从 =
父级后端
PG 选项.
分割颜色 =
_处理组颜色(
我的组)
PG 选项.global_ranks_in_group =
我组中的全局排名
PG 选项.group_name = group_name
backend_class = ProcessGroupNCCL(
prefix_store, 群组排名,
长度(
我的组),
pg 选项
)
后端类.
为组设置序列号()
pg.注册后端(
火炬.
设备(
cuda),
后端类型,
后端类)
将 group_name 和 group_desc 设置为后端
断言 group_name is
不是
无
断言
群组描述 is
不是
无
pg.设置群组名称(
群组名称)
pg.设置群组描述(
群组描述)
在 split_group 中始终积极初始化后端
贪婪后端 = pg.
获取后端(
设备 ID)
贪婪后端.
主动连接单个设备(
设备 ID)
更新全局状态
_世界.pg_map[pg] = (
后端, prefix_store)
_世界.
数据库名称[pg] = group_name
_注册进程组(
群组名称, pg)
_世界.
PostgreSQL 后端配置[pg] =
字符串(
后端配置)
pg_tag = fptd:{
群组名称}"
_世界.
标签转 PostgreSQL.setdefault(pg_tag,
空列表.append(pg)
_世界.
PostgreSQL 转标签[pg] = pg_tag
创建全局排名到组排名的映射
_世界.pg_group_ranks[pg] = {
全球排名:
组排名
为
群组排名,
全球排名
在
列举(
全局排名在我组中)
}
return pg
[文档]@_time_logger
def 创建新组(
排名=
无,
超时=
无,
后端=
无,
PG 选项=
无,
使用本地同步=
错误,
群组描述=
无,
设备 ID:
可选[
火炬.
设备] =
无,
):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
创建一个新的分布式组。
此功能要求主组(即所有)中的所有进程(即所有
分布式作业中的进程进入此函数,即使
如果他们不是该组成员。此外,群组
应在所有进程中以相同的顺序创建。
.. 警告::
安全的并发使用:
当使用具有 ``NCCL`` 后端的多个进程组时,用户
必须确保集体操作的执行顺序在全球范围内保持一致。
排名。
如果一个进程内的多个线程发出集体操作,显式
同步是确保一致顺序的必要条件。
当使用 torch.distributed 通信 API 的异步变体时
返回一个工作对象,并将通信内核
放入单独的 CUDA 流中,允许通信
和计算的并行执行。一旦在一个进程组上
发起了一个或多个异步操作,它们必须通过调用
在使用另一个进程组之前。
查看《同时使用多个 NCCL 通信器 `__》
ia.com/deeplearning/nccl/user-guide/docs/使用/通信器.html#使用
-同时使用多个 NCCL 通信器,更多详情请参阅。
参数:
ranks (list[int]): 群成员的等级列表。如果为 ``None``,则将所有等级设置为。默认为 ``None``。
将所有等级设置为。默认为 ``None``。
timeout (timedelta, 可选): 请参阅 `init_process_group` 了解详细信息及默认值。
后端(str 或 Backend,可选):要使用的后端。根据构建时配置,有效值包括`mpi`、`gloo`、`nccl`、`ucc`或由第三方插件注册的值。
构建时配置,有效值为 ``gloo`` 和 ``nccl``。
默认使用与全局组相同的后端。此字段
应该以小写字符串的形式提供(例如,``"gloo"``),也可以
通过:class:`Backend`属性访问(例如,
``Backend.GLOO``)。如果传入``None``,则后端
对应默认进程组将被使用。默认为
``None``。
pg_options(ProcessGroupOptions,可选):进程组选项
在构建特定进程组时需要指定哪些附加选项
特定过程组的构建。例如,对于 ``nccl``
可以指定 ``is_high_priority_stream`` 以确保
进程组可以拾取高优先级 CUDA 流。对于配置 NCCL 的其他可用选项,
请见 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t
使用本地同步(bool,可选):执行组内本地同步
过程组创建末尾的障碍。这不同
那些非成员等级无需调用 API,也不需要加入屏障。
。
group_desc (str, 可选):用于描述进程组的字符串。
device_id(torch.device,可选):一个单独的、特定的设备。
绑定此进程的,`new_group` 调用将尝试初始化
设备立即提供通信后端,如果此字段给出。
返回:
可以分配给集体通话的分布式组句柄。
如果等级不是 ``等级`` 的一部分,则为 GroupMember.NON_GROUP_MEMBER。
注意:use_local_synchronization 与 MPI 不兼容。
请注意。当 use_local_synchronization=True 时,对于较大的
集群和小型进程组,必须小心,因为它会改变集群行为
作为非成员等级不加入群组屏障()
请注意,将 use_local_synchronization=True 设置为真可能导致每个进程创建时出现死锁。
多个重叠的过程组。为了避免这种情况,请确保所有进程都遵循相同的全局创建顺序。
相同的全局创建顺序。
"""
返回
_带有标签的新组(
排名,
超时,
后端,
PG 选项,
无,
使用本地同步=
使用本地同步,
群组描述=
群组描述,
设备 ID=
设备 ID,
)
def _带有标签的新组(
排名=
无,
超时=
无,
后端=
无,
后端选项=
无,
pg_tag=无,
使用本地同步=
错误,
群组描述=
无,
设备 ID:
可选[
火炬.
设备] =
无,
):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
``new_group`` 的变体,用于暴露标签创建。
注意。该机制是实验性的,并与功能集体努力相关联,请参阅
``torch.distributed._functional_collectives`` 用于了解如何使用它的参考。
"""
全局
_世界
默认页面 =
获取默认组()
如果
设备 ID is
无:
设备 ID =
默认页面.
绑定设备 ID
如果...否则
默认页面.
绑定设备 ID is
不是
无:
断言
设备 ID ==
默认页面.
绑定设备 ID, (
"新 pg 与默认 pg 之间存在不匹配的绑定设备。"
)
默认后端,
默认商店 =
_世界.pg_map[
默认页面]
全球排名 =
默认页面.
排名()
全球世界大小 =
默认页面.
大小()
默认使用与全局进程组相同的后端
如果后端未指定。
如果
不是
后端:
后端 =
默认后端
后端 =
后端(
后端)
# 此超时默认/验证用于所有 new_groups/new_subgroups 变体
# 可能只是传递了它们的超时值(或 None)
如果
超时 is
无:
超时 =
_获取默认超时(
后端)
_检查有效超时(
超时)
如果
使用本地同步:
# MPI 后端没有执行部分同步的方法
如果
后端 ==
后端.MPI:
提升 ValueError(
MPI 后端不支持 use_local_synchronization=True
)
如果
排名 is
不是
无
和
获取排名()
不是
在
排名:
返回
无
检查输入排名
如果
排名 is
不是
无:
排名 =
排序(
排名)
群组世界大小 =
长度(
排名)
如果
群组世界大小 >
全球世界大小:
提升 ValueError(
"新组的全球大小应该小于或等于"
"由 init_process_group 设置的全球大小"
"检查 rank 的合理性"
)
#检查 rank 的合理性
为
排名
在
排名:
如果
排名 < 0
或者
排名
≥
全球世界大小:
提升 ValueError(
"新组的排名应在 "
"由 init_process_group 设置的世界大小 "
)
如果
全球排名
在
排名:
组排名 =
排名.
索引(
全球排名)
else:
组排名 =
无
else:
排名 =
列表(
范围(
全球世界大小))
群组世界大小 =
全球世界大小
组排名 =
全球排名
group_name = _process_group_name(排名,
使用哈希名称=
使用本地同步)
pg, pg 存储 =
新流程组助手(
世界组大小,
群组排名,
排名,
后端,
默认商店,
群组名称,
后端选项=
后端选项,
超时=
超时,
pg_tag=pg_tag,
设备 ID=
设备 ID,
群组描述=
群组描述,
)
创建全局排名到组排名的映射
_世界.pg_group_ranks[pg] = {
全球排名:
组排名
为
群组排名,
全球排名
在
列举(
排名)
}
如果 _is_barrier_after_init() == 1:
# 防止在从该方法返回时出现问题的障碍
处理包括全局变量(如有)在内的进程组
正确地在所有等级上。
# 更新 2023 年 4 月:对于大规模运行,这个障碍(尤其是基于存储的)
# 障碍可能代价高昂且/或无法扩展。此外,在许多情况下,
这些障碍可能是不必要的,因为绿色 CI 已经证明
移除后。已添加环境变量 `TORCH_DIST_INIT_BARRIER`,当设置为 1 时才启用此障碍。
仅在初始化 ProcessGroup 后执行屏障。
日志记录器.
信息(
由于 "Performing barrier after ProcessGroup initialization since "
"TORCH_DIST_INIT_BARRIER = 1"
)
如果
后端 ==
后端.MPI:
MPI 没有存储。
障碍()
else:
障碍存储 =
pg 存储
如果
使用本地同步
否则
默认商店
世界大小 =
长度(
排名)
如果
使用本地同步
否则
获取世界大小()
# 由于 barrier()使用了多个默认设备,这里使用基于 store 的 barrier()。
# 默认设备会导致 NCCL 内部状态混乱。
基于商店的屏障(
全球排名,
障碍存储,
群组名称,
世界大小,
超时
)
返回 pg
def 新子组(
组大小=
无,
群组=
无,
超时=
无,
后端=
无,
PG 选项=
无,
群组描述=
无,
):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
创建大小相等的子组。
默认情况下,它创建机器内部的子组,
其中每个子组都包含机器的所有等级,基于每个机器具有相同数量的设备的假设,
这是一个便利的 API,它调用 `new_group` 来生成多个子组。
这是一个便捷的 API,用于调用`new_group`生成多个子组。
它要求主组中的所有进程(即所有
分布式作业中的进程进入此函数,即使
如果他们不是该组成员。
.. 警告::
如果传入 `group_size`,则世界大小必须能被 `group_size` 整除。
如果没有传入 `group_size`,则认为您正在基于 CUDA 创建组,并通过 CUDA 设备数量确定组大小,如果所有机器的设备数量不同,则节点之间的子组划分将不同,可能会引起意外的行为。因此,如果您正在
基于 CUDA 创建组,并通过 CUDA 设备数量确定组大小,如果所有机器的设备数量不同,则节点之间的子组划分将不同,可能会引起意外的行为。因此,如果您正在
基于 CUDA 创建组,并通过 CUDA 设备数量确定组大小,如果所有机器的设备数量不同,则节点之间的子组划分将不同,可能会引起意外的行为。因此,如果您正在
基于 CUDA 创建组,并通过 CUDA 设备数量确定组大小,如果所有机器的设备数量不同,则节点之间的子组划分将不同,可能会引起意外的行为。因此,如果您正在
创建一个不依赖于 CUDA 的子组(例如 CPU 上的 Gloo),请正确传入`group_size`。
请正确传入`group_size`。
.. 警告::
请参阅关于`安全并发使用`的警告 `new_group` API,以获取关于以安全方式同时使用多个进程组的重要细节。
使用多个进程组的同时进行安全操作。
参数:
group_size (int, 可选): 每个子组的尺寸。如果为 ``None``,则默认子组大小等于每台机器上的设备数量。
默认子组大小等于每台机器上的设备数量。
基于每个机器完全相同的假设
设备数量。默认为 ``None``。
timeout (timedelta, 可选): 请参阅 `init_process_group` 了解详细信息及默认值。
后端(str 或 Backend,可选):要使用的后端。根据构建时配置,有效值包括`mpi`、`gloo`、`nccl`、`ucc`或由第三方插件注册的值。
构建时配置,有效值为 ``gloo`` 和 ``nccl``。
默认使用与全局组相同的后端。此字段
应该以小写字符串的形式提供(例如,``"gloo"``),也可以
通过:class:`Backend`属性访问(例如,
``Backend.GLOO``)。如果传入``None``,则后端
对应默认进程组将被使用。默认为
``None``。
pg_options(ProcessGroupOptions,可选):进程组选项
在构建特定进程组时需要指定哪些附加选项
特定过程组的构建。例如,对于 ``nccl``
可以指定 ``is_high_priority_stream`` 以确保
进程组可以拾取高优先级 CUDA 流。
group_desc (str, optional): 描述组的字符串。每个子组将
继承其分组描述
返回:
包含当前等级的所有子群组以及用于清理的所有子群组。
示例:
>>> # 创建机器内部子组。
>>> # xdoctest: +SKIP("需要进程组初始化")
>>> cur_subgroup, subgroups = dist.new_subgroups()
>>> # 在机器内进行 Allreduce 操作。
>>> rank = dist.get_rank()
>>> tensor = torch.ones(1, device=rank) * rank
>>> dist.all_reduce(tensor, group=cur_subgroup)
>>> tensor
tensor([28]) # 假设每台机器有 8 个 CUDA 设备。28 是 range(8)的和。
>>> # 清理。
>>> for subgroup in subgroups:
>>> dist.destroy_process_group(subgroup)
"""
如果
组大小 is
无:
如果
不是
火炬.cuda.
是否可用():
提升 ValueError(
"默认组大小仅在 CUDA 可用时生效。"
"如果您的子组使用不依赖 CUDA 的后端,"
"请正确传入 'group_size'。"
)
组大小 =
火炬.cuda.
设备数量()
如果
组大小 <= 0:
提升 ValueError(f
"参数 'group_size' ("{
组大小}
) 必须为正数)
世界大小 =
获取世界大小()
如果
世界大小 <
组大小:
提升 ValueError(
f"参数 'group_size' ("{
组大小}
)必须不大于世界大小({
世界大小})"
)
如果
世界大小 %
组大小 != 0:
提升 ValueError(
"世界大小必须能被 '群组大小' 整除")
子组 =
输入文本为空,请提供需要翻译的文本
当前子组 =
无
为
子组 ID
在
范围(
世界大小 //
组大小):
开始排名 =
子组 ID *
组大小
结束排名 =
开始排名 +
组大小
子群排名 =
列表(
范围(
开始排名,
结束排名))
子组 =
创建新组(
排名=
在子组中的排名,
超时=
超时,
后端=
后端,
pg 选项=
PG 选项,
群组描述=
群组描述,
)
子组.append(
子分组)
排名 =
获取排名()
如果
排名
在
在子组中的排名:
当前子组 =
子组
日志记录器.
信息(
排名%s
被分配到子组%s",
排名,
在子组中的排名)
返回
当前子组,
子组
def 新子组按枚举(
ranks_per_subgroup_list,
超时=
无,
后端=
无,
pg 选项=
无,
群组描述=
无,
):
```python
# 假设输入文本为:
input_text = """Immersive Translate"""
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text # 假设翻译结果与原文相同
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
输出:
```
Immersive Translate
```
通过划分全球世界创建子组。
该分区由嵌套的等级列表指定。子组不能有
重叠,并且某些排名可能不需要在任何子组中。
这是一个便捷的 API,用于调用`new_group`生成多个子组。
它要求主组中的所有进程(即所有
分布式作业中的进程进入此函数,即使
如果他们不是该组成员。
.. 警告::
请参阅关于`安全并发使用`的警告 `new_group` API,以获取关于以安全方式同时使用多个进程组的重要细节。
使用多个进程组的同时进行安全操作。
参数:
ranks_per_subgroup_list(列表[列表[int]]):一个嵌套列表,包含子组的秩。
群组成员。
timeout (timedelta, 可选): 请参阅 `init_process_group` 了解详细信息及默认值。
后端(str 或 Backend,可选):要使用的后端。根据构建时配置,有效值包括`mpi`、`gloo`、`nccl`、`ucc`或由第三方插件注册的值。
构建时配置,有效值为 ``gloo`` 和 ``nccl``。
默认使用与全局组相同的后端。此字段
应该以小写字符串的形式提供(例如,``"gloo"``),也可以
通过:class:`Backend`属性访问(例如,
``Backend.GLOO``)。如果传入``None``,则后端
对应默认进程组将被使用。默认为
``None``。
pg_options(ProcessGroupOptions,可选):进程组选项
在构建特定进程组时需要指定哪些附加选项
特定过程组的构建。例如,对于 ``nccl``
可以指定 ``is_high_priority_stream`` 以确保
进程组可以拾取高优先级 CUDA 流。
group_desc (str, optional): 描述组的字符串。每个子组将
继承其 group_desc。
返回:
包含当前等级的所有子群组以及用于清理的所有子群组。
示例:
>>> 创建两个子群组,每个子群组包含 2 个进程。
>>> # xdoctest: +SKIP("需要进程组初始化")
>>> cur_subgroup, subgroups = dist.new_subgroups(ranks=[[0, 2], [1, 3]])
>>> rank = dist.get_rank()
>>> tensor = torch.ones(1, device=rank) * rank
>>> dist.all_reduce(tensor, group=cur_subgroup)
>>> tensor
tensor([2]) # Subgroup 0: ranks 0 and 2
tensor([4]) # Subgroup 1: ranks 1 and 3
"""
如果
排名_子组列表 is
无
或者
长度(ranks_per_subgroup_list) == 0:
提升 ValueError(
"参数 'ranks_per_subgroup_list' 不能为空")
子组 =
输入文本为空,请提供需要翻译的文本
当前子组 =
无
# 从排名到子组的映射创建,以检查是否存在子组重叠。
rank_to_ranks_dict = {} # type: ignore[var-annotated]
为
排名
在 ranks_per_subgroup_list:
子组 =
创建新组(
排名=
排名,
超时=
超时,
后端=
后端,
pg 选项=
pg 选项,
群组描述=
群组描述,
)
子组.append(
子分组)
我的排名 =
获取排名()
为
排名
在
排名:
如果
排名
在 rank_to_ranks_dict:
提升 ValueError(
f排名{
排名}
已出现在子组中{rank_to_ranks_dict[
排名]}
和{
排名}"
)
rank_to_ranks_dict[排名] =
排名
如果
我的排名 ==
排名:
当前子组 =
子组
日志记录器.
信息(
排名%s
被分配到子组%s",
排名,
排名)
返回
当前子组,
子组
def _find_pg_by_ranks_and_tag(标签:
字符串,
排名:
列表[int])
翻译
可选[
流程组
]
如果
长度(
标签) > 0
和
不是
标签.
以...开头("ptd:")
和
不是
标签.
以...开头(
"用户:"):
标签 = f
"用户:"{
标签}"
为
群组
在
_世界.
标签转 PostgreSQL.
获取(
标签, []):
如果
群组.
大小() !=
长度(
排名):
continue
群组等级 =
获取进程组排名(
群组)
好的 =
所有(r
在
群组等级
为 r
在
排名)
如果
良好:
返回
群组
返回
无
def 根据排名和标签查找或创建 PG(
标签:
字符串,
排名:
列表[int
]
步长:
整型
) 翻译
流程组:
断言
长度(
排名) %
步长 == 0, (
f排名长度({
长度(
排名)}
) 必须能被步长整除({
步长})"
)
我的排名 =
获取排名()
我的排名 =
无
如果
步长 ==
长度(
排名):
我的排名 =
排名.
复制()
断言
我的排名
在
我的排名,
"rankset 不包含当前节点"
else:
为 i
在
范围(0,
长度(
排名),
步长):
等级集合 =
排名[i : i +
步长]
如果
我的排名
在
等级集合:
我的排名 =
等级集合
断言
我的排名 is
不是
无,
"rankset 不包含当前节点"
我的排名 =
排序(
我的排名)
pg = _find_pg_by_ranks_and_tag(标签,
我的排名)
如果 pg is
不是
无:
返回 pg
如果
标签 ==
输入文本翻译为简体中文为:"":
提升 ValueError(
无法自动创建带有空标签的 PG)
# TODO 从默认 PG 复制设置和超时
返回
_带有标签的新组(
我的排名, pg_tag=
标签)
def 获取组标签(pg:
流程组)
翻译
字符串:
"返回与 ``pg`` 相关的标签。"
标签 =
_世界.
PostgreSQL 转标签[pg]
标签 =
标签.
去前缀(
"用户:")
返回
标签
def 获取进程组名称(pg:
流程组)
翻译
字符串:
返回
_世界.
数据库名称.
获取(pg,
无)
def _获取进程组存储(pg:
流程组)
翻译
存储:
返回
_世界.pg_map[pg
]
[1]