快捷键

torch.distributed.rpc 的源代码

# mypy: 允许未类型化定义
导入 记录日志
导入 操作系统
导入 线程
导入 警告
来自 collections.abc 导入 生成器
来自 datetime 导入 时间差
来自 urllib.parse 导入 urlparse

导入 火炬
导入 torch.distributed  dist


全部 = ["是否可用"]


记录器 = 记录.获取日志记录器(__name__)


_init_counter = 0
_init_counter_lock = 线程.()


def 是否可用() -> bool:
    返回 有属性(torch._C, "_rpc_init")


如果 是否可用()  不是 torch._C._rpc_init():
    抛出 运行时错误("初始化 torch.distributed.rpc 失败")


如果 是否可用():
    导入 数字

    导入 torch.distributed.autograd  dist_autograd
    来自 torch._C._distributed_c10d 导入 店铺
    来自 torch._C._distributed_rpc 导入 (  # noqa: F401
        清理 Python RPC 处理器,
        默认初始化方法,
        默认工作线程数,
        默认 RPC 超时秒数,
        删除所有用户和未分叉所有者 rrefs,
        销毁 rref 上下文,
        禁用 jit_rref 反序列化,
        禁用服务器进程全局分析器,
        启用 JIT RRef Pickle,
        启用服务器进程全局分析器,
        获取当前 RPC 代理,
        远程调用内置函数,
        远程调用 Python UDF,
        远程调用 TorchScript,
        远程调用 RPC 内置函数,
        _调用 Python UDF RPC,
        _调用 TorchScript RPC,
        _检查当前 RPC 代理是否已设置,
        _重置当前 RPC 代理,
        获取调试信息上下文引用,
        设置并启动 RPC 代理,
        设置分析器节点 ID,
        设置 RPC 超时时间,
        _TensorPipeRpcBackendOptionsBase,
        _UNSET_RPC_TIMEOUT,
        启用 GIL 性能分析,
        获取 RPC 超时时间,
        PyRRef,
        远程性能分析管理器,
        Rpc 代理,
        Rpc 后端选项,
        TensorPipe 代理,
        工作信息,
    )

    来自 . 导入 api, 后端注册, 函数
    来自 .api 导入 *  # 无需检查:F401,F403
    来自 后端注册表 导入 后端类型
    来自 .选项 导入 TensorPipeRpcBackendOptions  # noqa: F401
    来自 .服务器进程全局分析器 导入 _服务器进程全局配置

    集合迭代器: 生成器[元组[存储, int, int] , ]

    全部 += ["初始化 RPC", 后端类型, TensorPipeRpc 后端选项]
    全部 = 全部 + api.全部 + 后端注册表.全部  # noqa: PLE0605

[文档] def 初始化 RPC( 名称, 后端=, 排名=-1, 世界大小=, RPC 后端选项=, ): r"" 初始化 RPC 原语,例如本地 RPC 代理 并且分布式自动微分,这立即使得当前 进程准备发送和接收 RPC。 参数: name (字符串): 该节点的全局唯一名称。(例如,) ``Trainer3``, ``ParameterServer2``, ``Master``, ``Worker1``) 名称只能包含数字、字母、下划线、冒号, 以及斜杠,且长度必须小于 128 个字符。 后端(BackendType,可选):RPC 后端实现类型。 支持的值是 ``BackendType.TENSORPIPE``(默认)。 更多信息请参阅::ref:`rpc-backends`。 rank(整型):该节点的全局唯一 ID/排名。 world_size(整型):组中工作进程的数量。 rpc_backend_options (RpcBackendOptions, 可选): 选项 传递给 RpcAgent 构造函数。它必须是一个特定于代理的 子类::class:`~torch.distributed.rpc.RpcBackendOptions` 包含特定代理的初始化配置。 默认值,对所有代理,将默认超时设置为 60 秒,并执行与底层进程的会合 使用 `init_method = "env://"` 初始化的进程组, 意味着环境变量 `MASTER_ADDR` 和 ``MASTER_PORT`` 需要设置正确。参见 ref:`rpc-backends` 获取更多信息并查找可用的选项 "沉浸式翻译" torch._C._log_api_usage_once("torch.distributed.init_rpc") 如果 后端 不是 不是 isinstance( 后端, 后端注册.后端类型 ): 抛出 类型错误("参数后端必须是 BackendType 的成员") 如果 RPC 后端选项 不是 不是 isinstance( rpc 后端选项, RpcBackendOptions ): 抛出 类型错误( "参数 rpc_backend_options 必须是一个 RpcBackendOptions 实例" ) 尝试从选项中检测后端 如果 后端 rpc 后端选项 不是 : for 候选后端 BackendType: 如果 isinstance( rpc 后端选项, 类型( 后端注册表.构建 RPC 后端选项( 候选后端 ) ), ): 后端 = 候选后端 断开 否则: 抛出 类型错误( f无法推断选项的后端{rpc 后端选项}" ) 忽略类型错误,因为 mypy 无法处理动态生成的类型对象(#4865) 如果 后端 != BackendType.TENSORPIPE: # 类型:忽略[已定义] 日志记录器.警告( "RPC 已初始化,没有指定显式后端,但带有选项" # 类型:忽略[已定义] 对应输入文本翻译为简体中文为: %(后端)s因此,将使用该后端 而不是默认的 BackendType.TENSORPIPE。为了静音此 "警告传递 `backend= "%(backend)s显式地。, {"backend": 后端}, ) 如果 后端 : 后端 = BackendType.TENSORPIPE # 类型:忽略[已定义] 如果 RPC 后端选项 : # 默认构造一组 RPC 后端选项。 RPC 后端选项 = 后端注册.构建 RPC 后端选项( 后端 ) # 创建存储,为静态 RPC 组执行 rendezvous。 如果 不是 世界大小: # 如果在构建时未设置 world_size,且环境变量中也没有设置 # 动态分组设置将创建此商店 存储 = 距离._从选项创建存储(rpc 后端选项, 排名) 否则: # 此 rendezvous 状态有时在所有进程完成握手之前就被销毁 # 为避免该问题,我们将其设置为全局 保持活跃 全局 集合迭代器 集合迭代器 = 距离.预约( RPC 后端选项.初始化方法, 排名=排名, 世界大小=世界大小 ) 店铺, _, _ = 下一(集合迭代器) 使用与 RPC 相同的超时时间。 店铺.设置超时(timedelta(=RPC 后端选项.RPC 超时)) 使用 PrefixStore 来区分多个调用。 _初始化计数器锁: 全局 _初始化计数器 存储 = 距离.前缀存储(str(f"rpc 前缀_"{_初始化计数器}"), 店铺) 初始化计数器 += 1 在 RPC 之前初始化 autograd,因为_init_rpc_backend 保证了所有 # processes 通过 store 同步。如果我们初始化 autograd 在 RPC 之后, # 可能会出现竞态条件,有些节点可能已经初始化了 autograd # 其他节点可能尚未初始化。 # torch.distributed.autograd.backward() 可能会运行出错,因为 # 其他节点可能没有初始化。 分布式自动微分._初始化(排名) 设置分析器节点 ID(排名) 初始化 RPC。 初始化 RPC 后端(后端, 店铺, 名称, 排名, 世界大小, RPC 后端选项)
def 验证 RPC 参数(后端, 店铺, 名称, 排名, 世界大小, RPC 后端选项): 类型映射 = { 后端: 后端注册表.BackendType, 店铺: 距离.存储, 名称: str, 排名: 数字.积分, # world_size 可以是 None 以实现动态组 世界大小: (数字.积分, 类型()), rpc 后端选项: RpcBackendOptions, } for arg, 参数类型 类型映射.项目(): 如果 不是 isinstance(arg, 参考类型): # type: ignore[arg-type] 抛出 运行时错误( f"论点"{arg}必须为类型{参考类型}但得到类型{类型(arg)}" ) def _初始化 RPC 后端( 后端=BackendType.TENSORPIPE, # 类型:忽略[已定义] 店铺=, 名称=, 排名=-1, 世界大小=, rpc 后端选项=, ): _验证 rpc 参数(后端, 店铺, 名称, 排名, 世界大小, rpc 后端选项) 如果 当前 RPC 代理是否已设置(): 抛出 运行时错误(RPC 已初始化) 初始化 RPC。 rpc 代理 = 后端注册.初始化后端( 后端, 店铺=店铺, 名称=名称, 排名=排名, 世界大小=世界大小, RPC 后端选项=RPC 后端选项, ) api.初始化 RPC 状态(RPC 代理) @api.需要初始化 def _获取调试信息(): 信息 = 获取调试信息上下文() 信息.更新(api.获取当前 RPC 代理().获取调试信息()) 信息.更新(分布式自动微分._获取调试信息()) 返回 信息

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源