# mypy: 允许未类型化定义
导入
记录日志
导入
操作系统
导入
线程
导入
警告
来自 collections.abc
导入
生成器
来自 datetime
导入
时间差
来自 urllib.parse
导入 urlparse
导入
火炬
导入 torch.distributed
是 dist
全部 = [
"是否可用"]
记录器 =
记录.
获取日志记录器(__name__)
_init_counter = 0
_init_counter_lock = 线程.
锁()
def 是否可用() -> bool:
返回
有属性(torch._C, "_rpc_init")
如果
是否可用()
和
不是 torch._C._rpc_init():
抛出
运行时错误(
"初始化 torch.distributed.rpc 失败")
如果
是否可用():
导入
数字
导入 torch.distributed.autograd
是 dist_autograd
来自 torch._C._distributed_c10d
导入
店铺
来自 torch._C._distributed_rpc
导入 ( # noqa: F401
清理 Python RPC 处理器,
默认初始化方法,
默认工作线程数,
默认 RPC 超时秒数,
删除所有用户和未分叉所有者 rrefs,
销毁 rref 上下文,
禁用 jit_rref 反序列化,
禁用服务器进程全局分析器,
启用 JIT RRef Pickle,
启用服务器进程全局分析器,
获取当前 RPC 代理,
远程调用内置函数,
远程调用 Python UDF,
远程调用 TorchScript,
远程调用 RPC 内置函数,
_调用 Python UDF RPC,
_调用 TorchScript RPC,
_检查当前 RPC 代理是否已设置,
_重置当前 RPC 代理,
获取调试信息上下文引用,
设置并启动 RPC 代理,
设置分析器节点 ID,
设置 RPC 超时时间,
_TensorPipeRpcBackendOptionsBase,
_UNSET_RPC_TIMEOUT,
启用 GIL 性能分析,
获取 RPC 超时时间,
PyRRef,
远程性能分析管理器,
Rpc 代理,
Rpc 后端选项,
TensorPipe 代理,
工作信息,
)
来自 .
导入 api,
后端注册,
函数
来自 .api
导入 *
# 无需检查:F401,F403
来自
后端注册表
导入
后端类型
来自
.选项
导入 TensorPipeRpcBackendOptions # noqa: F401
来自
.服务器进程全局分析器
导入
_服务器进程全局配置
集合迭代器:
生成器[
元组[
存储, int, int
]
无,
无]
全部 += [
"初始化 RPC",
后端类型,
TensorPipeRpc 后端选项]
全部 =
全部 + api.
全部 +
后端注册表.
全部 # noqa: PLE0605
[文档] def
初始化 RPC(
名称,
后端=
无,
排名=-1,
世界大小=
无,
RPC 后端选项=
无,
):
r""
初始化 RPC 原语,例如本地 RPC 代理
并且分布式自动微分,这立即使得当前
进程准备发送和接收 RPC。
参数:
name (字符串): 该节点的全局唯一名称。(例如,)
``Trainer3``, ``ParameterServer2``, ``Master``, ``Worker1``)
名称只能包含数字、字母、下划线、冒号,
以及斜杠,且长度必须小于 128 个字符。
后端(BackendType,可选):RPC 后端实现类型。
支持的值是
``BackendType.TENSORPIPE``(默认)。
更多信息请参阅::ref:`rpc-backends`。
rank(整型):该节点的全局唯一 ID/排名。
world_size(整型):组中工作进程的数量。
rpc_backend_options (RpcBackendOptions, 可选): 选项
传递给 RpcAgent 构造函数。它必须是一个特定于代理的
子类::class:`~torch.distributed.rpc.RpcBackendOptions`
包含特定代理的初始化配置。
默认值,对所有代理,将默认超时设置为 60
秒,并执行与底层进程的会合
使用 `init_method = "env://"` 初始化的进程组,
意味着环境变量 `MASTER_ADDR` 和
``MASTER_PORT`` 需要设置正确。参见
ref:`rpc-backends` 获取更多信息并查找可用的选项
。
"沉浸式翻译"
torch._C._log_api_usage_once("torch.distributed.init_rpc")
如果
后端
是
不是
无
和
不是 isinstance(
后端,
后端注册.
后端类型
):
抛出
类型错误(
"参数后端必须是 BackendType 的成员")
如果
RPC 后端选项
是
不是
无
和
不是 isinstance(
rpc 后端选项, RpcBackendOptions
):
抛出
类型错误(
"参数 rpc_backend_options 必须是一个 RpcBackendOptions 实例"
)
尝试从选项中检测后端
如果
后端
是
无
和
rpc 后端选项
是
不是
无:
for 候选后端
在 BackendType:
如果 isinstance(
rpc 后端选项,
类型(
后端注册表.
构建 RPC 后端选项(
候选后端
)
),
):
后端 =
候选后端
断开
否则:
抛出
类型错误(
f无法推断选项的后端{
rpc 后端选项}"
)
忽略类型错误,因为 mypy 无法处理动态生成的类型对象(#4865)
如果
后端 != BackendType.TENSORPIPE:
# 类型:忽略[已定义]
日志记录器.
警告(
"RPC 已初始化,没有指定显式后端,但带有选项"
# 类型:忽略[已定义]
对应
输入文本翻译为简体中文为:
%(后端)s因此,将使用该后端
而不是默认的 BackendType.TENSORPIPE。为了静音此
"警告传递 `backend= "%(backend)s
显式地。,
{"backend": 后端},
)
如果
后端
是
无:
后端 = BackendType.TENSORPIPE
# 类型:忽略[已定义]
如果
RPC 后端选项
是
无:
# 默认构造一组 RPC 后端选项。
RPC 后端选项 =
后端注册.
构建 RPC 后端选项(
后端
)
# 创建存储,为静态 RPC 组执行 rendezvous。
如果
不是
世界大小:
# 如果在构建时未设置 world_size,且环境变量中也没有设置
# 动态分组设置将创建此商店
存储 =
距离.
_从选项创建存储(
rpc 后端选项,
排名)
否则:
# 此 rendezvous 状态有时在所有进程完成握手之前就被销毁
# 为避免该问题,我们将其设置为全局
保持活跃
全局
集合迭代器
集合迭代器 =
距离.
预约(
RPC 后端选项.
初始化方法,
排名=
排名,
世界大小=
世界大小
)
店铺, _, _ =
下一(
集合迭代器)
使用与 RPC 相同的超时时间。
店铺.
设置超时(timedelta(
秒=
RPC 后端选项.
RPC 超时))
使用 PrefixStore 来区分多个调用。
与
_初始化计数器锁:
全局
_初始化计数器
存储 =
距离.
前缀存储(str(f
"rpc 前缀_"{
_初始化计数器}"),
店铺)
初始化计数器 += 1
在 RPC 之前初始化 autograd,因为_init_rpc_backend 保证了所有
# processes 通过 store 同步。如果我们初始化 autograd 在 RPC 之后,
# 可能会出现竞态条件,有些节点可能已经初始化了 autograd
# 其他节点可能尚未初始化。
# torch.distributed.autograd.backward() 可能会运行出错,因为
# 其他节点可能没有初始化。
分布式自动微分.
_初始化(
排名)
设置分析器节点 ID(
排名)
初始化 RPC。
初始化 RPC 后端(
后端,
店铺,
名称,
排名,
世界大小,
RPC 后端选项)
def 验证 RPC 参数(
后端,
店铺,
名称,
排名,
世界大小,
RPC 后端选项):
类型映射 = {
后端:
后端注册表.BackendType,
店铺:
距离.
存储,
名称: str,
排名:
数字.
积分,
# world_size 可以是 None 以实现动态组
世界大小: (
数字.
积分,
类型(
无)),
rpc 后端选项: RpcBackendOptions,
}
for arg, 参数类型
在
类型映射.
项目():
如果
不是 isinstance(arg,
参考类型): # type: ignore[arg-type]
抛出
运行时错误(
f"论点"{arg}
必须为类型{
参考类型}
但得到类型{
类型(arg)}"
)
def _初始化 RPC 后端(
后端=BackendType.TENSORPIPE,
# 类型:忽略[已定义]
店铺=
无,
名称=
无,
排名=-1,
世界大小=
无,
rpc 后端选项=
无,
):
_验证 rpc 参数(
后端,
店铺,
名称,
排名,
世界大小,
rpc 后端选项)
如果
当前 RPC 代理是否已设置():
抛出
运行时错误(
RPC 已初始化)
初始化 RPC。
rpc 代理 =
后端注册.
初始化后端(
后端,
店铺=
店铺,
名称=
名称,
排名=
排名,
世界大小=
世界大小,
RPC 后端选项=
RPC 后端选项,
)
api.初始化 RPC 状态(
RPC 代理)
@api.需要初始化
def _获取调试信息():
信息 =
获取调试信息上下文()
信息.
更新(api.
获取当前 RPC 代理().
获取调试信息())
信息.
更新(
分布式自动微分.
_获取调试信息())
返回
信息