# mypy: 允许未类型化定义
导入
集合
导入
枚举
来自
打字
导入
角色
导入
火炬
导入 torch.distributed
是 dist
来自 .
导入 api,
常量
是 rpc_constants
来自
_utils
导入
_群组成员管理,
_更新群组成员
全部 = [
"后端已注册",
"注册后端",
"构建 RPC 后端选项",
"初始化后端",
"后端值",
后端类型,
]
后端值 =
集合.
命名元组(
后端值, [
构建 RPC 后端选项处理器,
初始化后端处理器]
)
定义
_后端类型表示(
我):
返回 "BackendType." +
我.
名称
_backend_type_doc = ""
后端枚举类。
PyTorch 自带内置的 ``BackendType.TENSORPIPE`` 后端。
其他可以通过注册使用
`:func:`~torch.distributed.rpc.backend_registry.register_backend` 函数。
""
创建一个具有空成员的枚举类型 `BackendType`。
无法处理函数枚举 API(mypy 错误#9079)
后端类型 =
枚举.
枚举(
值=
后端类型, names={})
# 类型:忽略[杂项]
无法分配函数一个方法(mypy 错误#2427)
BackendType.__repr__ = _backend_type_repr # 类型:忽略[赋值]
如果 BackendType.__doc__:
BackendType.__doc__ = _backend_type_doc
定义
后端已注册(
后端名称):
""
检查 backend_name 是否已注册为 RPC 后端。
参数:
backend_name (str):用于标识 RPC 后端的字符串。
返回:
如果后端已通过 ``register_backend`` 注册,则为 True,否则
错误。
"文档"
返回
后端名称
在 BackendType.__members__.
键()
定义
注册后端(
后端名称,
构建 RPC 后端选项处理器,
初始化后端处理器
):
注册一个新的 RPC 后端。
参数:
后端名称(str):用于标识处理器的后端字符串。
construct_rpc_backend_options_handler(函数):
当调用 rpc_backend.construct_rpc_backend_options(**dict) 时被调用的处理器。
rpc_backend.construct_rpc_backend_options(**dict) 被调用时触发的处理器。
init_backend_handler(函数):当调用带有后端的 `_init_rpc_backend()` 函数时被调用的处理器
`_init_rpc_backend()` 函数带后端调用时返回代理
这将返回代理
"文档"
全局
后端类型
如果 backend_registered(
后端名称):
raise 运行时错误(f
RPC 后端{
后端名称}
已注册)
创建一个新的枚举类型,`BackendType`,并扩展其成员。
现有枚举字典 = {
成员.
名称:
成员.value
为
成员
在 BackendType}
扩展枚举字典 =
字典(
{
后端名称:
后端值(
构建 RPC 后端选项处理器=
构建 RPC 后端选项处理器,
初始化后端处理器=
初始化后端处理器,
)
},
**现有枚举字典,
)
无法处理函数枚举 API(mypy 错误#9079)
后端类型 =
枚举.
枚举(
值=
后端类型, names=
扩展枚举字典)
# 类型:忽略[杂项]
无法将函数分配为方法(mypy 错误#2427)
BackendType.__repr__ = _backend_type_repr # 类型:忽略[赋值]
如果 BackendType.__doc__:
BackendType.__doc__ = 后端类型文档
返回 BackendType[
后端名称]
定义
构建 RPC 后端选项(
后端,
RPC 超时=
RPC 常量.
默认 RPC 超时秒数,
初始化方法=
RPC 常量.
默认初始化方法,
**kwargs,
):
返回
后端.
值.
构建 RPC 后端选项处理器(
RPC 超时,
初始化方法, **kwargs
)
定义
初始化后端(
后端, *
参数, **kwargs):
返回
后端.
值.
初始化后端处理器(*
参数, **kwargs)
定义
_初始化进程组(
店铺,
排名,
世界大小):
初始化进程组。
进程组超时 =
RPC 常量.
默认进程组超时
我们在这里使用了大量的私有 API,因为`new_group`需要初始化默认组。
默认组需要被初始化。
组 =
距离.
网格流程组 Gloo(
店铺,
排名,
世界大小, process_group_timeout)
断言
组
是
非
无,
"初始化默认进程组失败。"
如果 (
排名 != -1)
和 (
排名 !=
群组.
排名()):
raise 运行时错误(f
排序参数{
排名}
不匹配 pg 排序{
群组.
排名()}")
如果 (
世界大小 != -1)
和 (
世界大小 !=
群组.
尺寸()):
raise 运行时错误(
fworld_size 参数{
世界大小}
与 pg size 不匹配{
群组.
尺寸()}"
)
返回
组
定义 _tensorpipe_construct_rpc_backend_options_handler(
RPC 超时,
初始化方法,
num_worker_threads=rpc 常量.
默认工作线程数,
_传输=
无,
_通道=
无,
**kwargs,
):
来自 .
导入 TensorPipeRpcBackendOptions
返回 TensorPipeRpcBackendOptions(
RPC 超时=
RPC 超时,
初始化方法=
初始化方法,
num_worker_threads=num_worker_threads,
_transports=交通方式,
通道=
通道,
)
定义
tensorpipe 验证设备(
设备,
设备数量):
返回
所有(
d.类型 ==
cpu
或者 (d.
类型 == "cuda"
和 0
≤ d.
索引 <
设备数量)
为 d
在
设备
)
检测是否有任何工作进程具有无效的 device_map 配置,并返回
反转设备映射
定义
_tensorpipe 交换并检查所有设备映射(
我的名字,
我的设备数量,
我的设备地图,
我的设备,
组
):
收集的:
列表[
元组[
字符串,
整数,
字典[
字符串,
字典[
火炬.
设备,
火炬.
设备]],
列表[
火炬.
设备]]
] = [
输入文本翻译为简体中文为:"", 0, {},
[]
为 _
在
范围(
群组.
尺寸
]]())
距离.
全局收集对象(
汇集, (
我的名字,
我的设备数量,
我的设备地图,
我的设备),
组
)
所有名称 = [
名称
为
名称, _, _, _
在
汇总]
所有设备数量 = {
名称:
计算
为
名称,
数量, _, _
在
汇集}
所有设备图 = {
名称:
地图_
为
名称, _,
地图_, _
在
汇集}
所有设备 = {
名称:
设备
为
名称, _, _,
设备
在
汇集}
_验证设备映射(
所有名称,
所有设备计数,
所有设备映射,
所有设备)
所有检查通过,构建反向映射并获取由该代理处理的设备列表
反向设备映射 =
_创建反向映射(
我的名字,
所有名字,
所有设备映射)
我的设备 =
创建设备列表(
我的设备,
我的设备映射,
反向设备映射)
返回
翻转设备映射,
我的设备
定义
验证设备映射(
所有名称,
所有设备数量,
所有设备地图,
所有设备,
是否为静态组=
真实
):
为
节点
在
所有名称:
设备 =
所有设备[node]
如果 len(
集合(
设备)) != len(
设备):
raise ValueError(f"节点"{node}
设备重复
输入文本翻译为简体中文为:\ndevices = {
设备}")
如果
非 _tensorpipe_validate_devices(
设备,
所有设备计数[node
)]
raise ValueError(
f"节点"{node}
具有无效索引的设备
输入文本翻译为简体中文为:\n"
f"设备列表 ={
设备}
输入文本翻译为简体中文为:\n"
f"设备数量 ={
所有设备数量[node]}"
)
为
源节点
在
所有名称:
# 对于动态组(非静态)由于它可能尚未加入,因此不要检查目标节点名称
如果
是否为静态组
和
非
集合(
所有设备映射[
源节点].
键()).
是否是子集(
所有名称
):
raise ValueError(
f"节点"{
源节点}
其设备映射中存在无效的目标节点名称
输入文本翻译为简体中文为:\n"
f"设备映射 ="{
所有设备映射[
源节点].
键()}
输入文本翻译为简体中文为:\n"
f"节点名称 ="{
所有名称}"
)
为
目标节点,
地图
在
所有设备地图[
源节点].
项目():
如果 len(
集合(
地图_.
值())) != len(
地图_):
raise ValueError(
f"节点"{
源节点}
在其设备映射中存在重复的目标设备 "
f"的 "{
目标节点}
输入文本翻译为简体中文为:\n"
f"设备映射 ="{
地图_}"
)
如果
所有设备[
源节点
]
如果
非
集合(
地图_.
键()).
是否是子集(
所有设备[
源节点
)]
raise ValueError(
f"节点"{
源节点}
出现意外的源设备 "
f在其设备映射中{
目标节点}
输入文本翻译为简体中文为:\n"
f"设备映射 ="{
地图_}
输入文本翻译为简体中文为:\n"
f"设备 ="{
所有设备[
源节点]}"
)
elif 非
_tensorpipe 验证设备(
地图_.
键(),
所有设备数量[
源节点]
):
raise ValueError(
f"节点"{
源节点}
具有无效索引的源设备
f在其设备映射中{
目标节点}
输入文本翻译为简体中文为:\n"
f设备映射 ={
地图_}
输入文本翻译为简体中文为:\n"
f"设备数量 ="{
所有设备数量[
源节点]}"
)
如果
所有设备.
获取(
目标节点, []):
如果
非
集合(
地图_.
值()).
是否是子集(
所有设备[
目标节点
)]
raise ValueError(
f"节点"{
源节点}
在其设备映射中存在意外的目标设备 "
f"的 "{
目标节点}
输入文本翻译为简体中文为:\n"
f设备映射 ={
地图_}
输入文本翻译为简体中文为:\n"
f设备 ={
所有设备[
目标节点]}"
)
elif 目标节点
在
所有设备数量
和
非
_tensorpipe 验证设备(
地图_.
值(),
所有设备数量[
目标节点]
):
raise ValueError(
f"节点"{
源节点}
具有目标设备,其设备映射中存在无效索引
f在其设备映射中为{
目标节点}
输入文本翻译为简体中文为:\n"
f"设备映射 ="{
地图_}
输入文本翻译为简体中文为:\n"
f"设备数量 ="{
所有设备数量[
目标节点]}"
)
定义
创建设备列表(
我的设备,
我的设备映射,
设备映射反转):
如果
非
我的设备:
设备集:
集合[
火炬.
设备] =
集合()
为
地图
在
我的设备地图.
值():
设备集.
更新(
地图_.
键())
为
地图
在
反向设备映射.
值():
设备集.
更新(
地图_.
键())
设备集.
丢弃(
火炬.
设备("cpu"))
我的设备 =
列表(
设备集合)
我的设备 =
排序(
我的设备,
键=lambda d: d.
索引)
返回
我的设备
定义
创建反向映射(
我的名字,
所有名字,
所有设备映射):
反向设备映射:
字典[
字符串,
字典[
火炬.
设备,
火炬.
设备]] = {}
为
节点
在
所有名称:
如果
我的名字
在
所有设备映射[node
]
反向设备映射[node] = {
v: k 为 k, v
在
所有设备映射[node
]
[我的名字].
项目()
}
返回
反向设备映射
定义
_获取设备信息():
来自 .
导入
TensorPipe 代理
代理 =
角色(
TensorPipe 代理, api.
获取当前 RPC 代理())
选项 =
代理.
_获取后端选项()
设备数量 =
火炬.cuda.
设备数量()
如果
火炬.cuda.
是否可用()
和
选项.
设备:
火炬.cuda.
初始化()
返回
设备数量,
选项.
设备映射,
选项.
设备
定义
_设置设备和反向设备映射(
代理):
来自 .
导入
TensorPipe 代理
代理 =
角色(
TensorPipe 代理,
代理)
从本地代理检索组状态
初始化时,tensorpipe 代理从所有现有工作者检索信息,因此组状态有效
工作者信息 =
代理.
获取工作者信息()
我的名字 =
我的工作者信息.
名称
所有工作者信息 =
代理.
获取工作者信息()
# 一轮获取所有工作者的 device_maps 并构建反向 device_maps
所有设备数量,
所有设备映射,
所有设备,
所有名称 = {}, {}, {}, []
为
工人信息
在
所有工人信息:
工人姓名 =
工人信息.
名称
如果
工人名称 !=
我的名字:
# TODO: 使其为异步?
设备数量,
设备映射表,
设备 = api.
rpc 同步(
工作名称,
_获取设备信息
)
else:
选项 =
代理.
获取后端选项()
设备数量,
设备映射,
设备 = (
火炬.cuda.
设备数量(),
选项.
设备映射表,
选项.
设备,
)
所有设备数量[
工作人员名称] =
设备数量
所有设备图[
工作名称] =
设备映射
所有设备[
工作名称] =
设备
所有名称.
追加(
工人名称)
验证设备映射(
所有名称,
所有设备数量,
所有设备映射,
所有设备,
静态组=
错误,
)
反向设备映射 =
创建反向映射(
我的名字,
所有名字,
所有设备映射)
向所有工作者(包括自身)执行 RPC 调用,包括新加入的工作者信息和设备映射
为
工作者名称
在
所有名称:
为每个工作者设置设备列表
所有设备[
工作名称] =
_创建设备列表(
所有设备[
工人名称
],
所有设备映射[
工人名称
],
反向设备映射
)
api.rpc 同步(
工作器名称,
_更新组成员资格,
参数=(
我的工人信息,
所有设备[
工作名称
],
反向设备映射, True),
)
定义
_tensorpipe 初始化后端处理器(
店铺,
名称,
排名,
世界大小,
RPC 后端选项
):
来自 .
导入
TensorPipe 代理, TensorPipeRpcBackendOptions
如果
非 isinstance(
店铺,
距离.
存储):
raise 类型错误(f
"`store` 必须是一个 c10d::Store。{
店铺}")
如果
非 isinstance(
RPC 后端选项, TensorPipeRpcBackendOptions):
raise 类型错误(
f"`rpc_backend_options` 必须是一个 `TensorPipeRpcBackendOptions`。{
RPC 后端选项}"
)
设备数量 =
火炬.cuda.
设备数量()
是静态组 =
真实
如果
世界大小
否则
假
# world_size 被指定,因此这是一个静态组(排名不能加入和离开)
如果
是静态组:
# 代理的加入方法需要像屏障一样行为,并执行
# 需要集体操作,它依赖于进程组,而不是
在 RPCs 之上重新实现此功能。
组 =
初始化进程组(
店铺,
排名,
世界大小)
设备映射反转,
设备 =
_tensorpipe 交换并检查所有设备映射(
名称,
设备数量,
RPC 后端选项.
设备映射,
RPC 后端选项.
设备,
群组,
)
如果
火炬.cuda.
是否可用()
和
设备:
初始化 PyTorch CUDA 状态在这里是必要的(例如)
# CUDACachingAllocator). 如果缺少此内容,我们可能会遇到错误
"分配器未初始化",因为其他进程可能会发送
# CUDA 相关的 RPC 请求在此进程中的用户代码之前
# 处理初始化其 PyTorch CUDA 状态。
火炬.cuda.
初始化()
在所有进程中添加 try-except 和在失败时销毁 _agent。
代理 =
TensorPipe 代理(
店铺,
名称,
排名,
世界大小,
RPC 后端选项,
反向设备映射,
设备,
)
api.初始化 RPC 状态(
代理)
# 运行一次模拟的 RPC 轮次以初始化通道/传输。如果没有
# 这,当没有其他 RPC 时,在 rpc.shutdown()中很容易遇到超时
在 rpc.shutdown()之前处理该过程,因为代理初始化可以
# 耗时超过 5 秒。
api._all_gather(无,
超时=
RPC 后端选项.
RPC 超时)
# 需要一个屏障以确保在 rank0 完成之前没有其他节点离开
# 全局聚合
群组.
障碍().
等待()
返回
代理
# 动态 RPC 的初始化(节点可以加入和离开)
else:
与
_群组成员管理(
店铺,
名称, True):
构建带有空 reverse_device_map 和设备的 TPAgent
这些属性将在初始化后更新
代理 =
TensorPipe 代理(
店铺,
名称,
排名,
世界大小,
RPC 后端选项,
{},
[]
)
api.初始化 RPC 状态(agent)
try:
# 通知本组所有工人该 rank 已加入并设置设备和反向设备映射
# 这是一个同步操作,一旦所有现有 rank 更新完成即完成
_设置设备和反向设备映射(
代理)
除了
异常:
api.关闭()
raise
返回
代理
注册后端(
TENSORPIPE,
_tensorpipe 构造 rpc 后端选项处理程序,
_tensorpipe_init_backend_handler,
)