快捷键

torch.distributed.rpc.backend_registry 源代码

# mypy: 允许未类型化定义


导入 集合
导入 枚举
来自 打字 导入 角色

导入 火炬
导入 torch.distributed  dist

来自 . 导入 api, 常量  rpc_constants
来自 _utils 导入 _群组成员管理, _更新群组成员


全部 = [
    "后端已注册",
    "注册后端",
    "构建 RPC 后端选项",
    "初始化后端",
    "后端值",
    后端类型,
]

后端值 = 集合.命名元组(
    后端值, [构建 RPC 后端选项处理器, 初始化后端处理器]
)


定义 _后端类型表示():
    返回 "BackendType." + .名称


_backend_type_doc = ""
后端枚举类。

PyTorch 自带内置的 ``BackendType.TENSORPIPE`` 后端。
其他可以通过注册使用
`:func:`~torch.distributed.rpc.backend_registry.register_backend` 函数。
""

创建一个具有空成员的枚举类型 `BackendType`。
无法处理函数枚举 API(mypy 错误#9079)
后端类型 = 枚举.枚举(=后端类型, names={})  # 类型:忽略[杂项]
无法分配函数一个方法(mypy 错误#2427)
BackendType.__repr__ = _backend_type_repr  # 类型:忽略[赋值]

如果 BackendType.__doc__:
    BackendType.__doc__ = _backend_type_doc


定义 后端已注册(后端名称):
    ""
检查 backend_name 是否已注册为 RPC 后端。

参数:
backend_name (str):用于标识 RPC 后端的字符串。
返回:
如果后端已通过 ``register_backend`` 注册,则为 True,否则
错误。
"文档"
    返回 后端名称  BackendType.__members__.()


定义 注册后端(
    后端名称, 构建 RPC 后端选项处理器, 初始化后端处理器
):
    注册一个新的 RPC 后端。

参数:
后端名称(str):用于标识处理器的后端字符串。
construct_rpc_backend_options_handler(函数):
当调用 rpc_backend.construct_rpc_backend_options(**dict) 时被调用的处理器。
rpc_backend.construct_rpc_backend_options(**dict) 被调用时触发的处理器。
init_backend_handler(函数):当调用带有后端的 `_init_rpc_backend()` 函数时被调用的处理器
`_init_rpc_backend()` 函数带后端调用时返回代理
这将返回代理
"文档"
    全局 后端类型
    如果 backend_registered(后端名称):
        raise 运行时错误(fRPC 后端{后端名称}已注册)
    创建一个新的枚举类型,`BackendType`,并扩展其成员。
    现有枚举字典 = {成员.名称: 成员.value  成员  BackendType}
    扩展枚举字典 = 字典(
        {
            后端名称: 后端值(
                构建 RPC 后端选项处理器=构建 RPC 后端选项处理器,
                初始化后端处理器=初始化后端处理器,
            )
        },
        **现有枚举字典,
    )
    无法处理函数枚举 API(mypy 错误#9079)
    后端类型 = 枚举.枚举(=后端类型, names=扩展枚举字典)  # 类型:忽略[杂项]
    无法将函数分配为方法(mypy 错误#2427)
    BackendType.__repr__ = _backend_type_repr  # 类型:忽略[赋值]
    如果 BackendType.__doc__:
        BackendType.__doc__ = 后端类型文档
    返回 BackendType[后端名称]


定义 构建 RPC 后端选项(
    后端,
    RPC 超时=RPC 常量.默认 RPC 超时秒数,
    初始化方法=RPC 常量.默认初始化方法,
    **kwargs,
):
    返回 后端..构建 RPC 后端选项处理器(
        RPC 超时, 初始化方法, **kwargs
    )


定义 初始化后端(后端, *参数, **kwargs):
    返回 后端..初始化后端处理器(*参数, **kwargs)


定义 _初始化进程组(店铺, 排名, 世界大小):
    初始化进程组。
    进程组超时 = RPC 常量.默认进程组超时

    我们在这里使用了大量的私有 API,因为`new_group`需要初始化默认组。
    默认组需要被初始化。
     = 距离.网格流程组 Gloo(店铺, 排名, 世界大小, process_group_timeout)

    断言    , "初始化默认进程组失败。"

    如果 (排名 != -1)  (排名 != 群组.排名()):
        raise 运行时错误(f排序参数{排名}不匹配 pg 排序{群组.排名()}")
    如果 (世界大小 != -1)  (世界大小 != 群组.尺寸()):
        raise 运行时错误(
            fworld_size 参数{世界大小}与 pg size 不匹配{群组.尺寸()}"
        )
    返回 


定义 _tensorpipe_construct_rpc_backend_options_handler(
    RPC 超时,
    初始化方法,
    num_worker_threads=rpc 常量.默认工作线程数,
    _传输=,
    _通道=,
    **kwargs,
):
    来自 . 导入 TensorPipeRpcBackendOptions

    返回 TensorPipeRpcBackendOptions(
        RPC 超时=RPC 超时,
        初始化方法=初始化方法,
        num_worker_threads=num_worker_threads,
        _transports=交通方式,
        通道=通道,
    )


定义 tensorpipe 验证设备(设备, 设备数量):
    返回 所有(
        d.类型 == cpu 或者 (d.类型 == "cuda"  0  d.索引 < 设备数量)
         d  设备
    )


检测是否有任何工作进程具有无效的 device_map 配置,并返回
反转设备映射
定义 _tensorpipe 交换并检查所有设备映射(
    我的名字, 我的设备数量, 我的设备地图, 我的设备, 
):
    收集的: 列表[
        元组[字符串, 整数, 字典[字符串, 字典[火炬.设备, 火炬.设备]], 列表[火炬.设备]]
    ] = [输入文本翻译为简体中文为:"", 0, {}, []  _  范围(群组.尺寸]]())
    距离.全局收集对象(
        汇集, (我的名字, 我的设备数量, 我的设备地图, 我的设备), 
    )
    所有名称 = [名称  名称, _, _, _  汇总]
    所有设备数量 = {名称: 计算  名称, 数量, _, _  汇集}
    所有设备图 = {名称: 地图_  名称, _, 地图_, _  汇集}
    所有设备 = {名称: 设备  名称, _, _, 设备  汇集}

    _验证设备映射(所有名称, 所有设备计数, 所有设备映射, 所有设备)

    所有检查通过,构建反向映射并获取由该代理处理的设备列表
    反向设备映射 = _创建反向映射(我的名字, 所有名字, 所有设备映射)
    我的设备 = 创建设备列表(我的设备, 我的设备映射, 反向设备映射)
    返回 翻转设备映射, 我的设备


定义 验证设备映射(
    所有名称, 所有设备数量, 所有设备地图, 所有设备, 是否为静态组=真实
):
     节点  所有名称:
        设备 = 所有设备[node]
        如果 len(集合(设备)) != len(设备):
            raise ValueError(f"节点"{node}设备重复输入文本翻译为简体中文为:\ndevices = {设备}")
        如果  _tensorpipe_validate_devices(设备, 所有设备计数[node)]
            raise ValueError(
                f"节点"{node}具有无效索引的设备输入文本翻译为简体中文为:\n"
                f"设备列表 ={设备}输入文本翻译为简体中文为:\n"
                f"设备数量 ={所有设备数量[node]}"
            )

     源节点  所有名称:
        # 对于动态组(非静态)由于它可能尚未加入,因此不要检查目标节点名称
        如果 是否为静态组   集合(所有设备映射[源节点].()).是否是子集(
            所有名称
        ):
            raise ValueError(
                f"节点"{源节点}其设备映射中存在无效的目标节点名称输入文本翻译为简体中文为:\n"
                f"设备映射 ="{所有设备映射[源节点].()}输入文本翻译为简体中文为:\n"
                f"节点名称 ="{所有名称}"
            )
         目标节点, 地图  所有设备地图[源节点].项目():
            如果 len(集合(地图_.())) != len(地图_):
                raise ValueError(
                    f"节点"{源节点}在其设备映射中存在重复的目标设备 "
                    f"的 "{目标节点}输入文本翻译为简体中文为:\n"
                    f"设备映射 ="{地图_}"
                )
            如果 所有设备[源节点]
                如果  集合(地图_.()).是否是子集(所有设备[源节点)]
                    raise ValueError(
                        f"节点"{源节点}出现意外的源设备 "
                        f在其设备映射中{目标节点}输入文本翻译为简体中文为:\n"
                        f"设备映射 ="{地图_}输入文本翻译为简体中文为:\n"
                        f"设备 ="{所有设备[源节点]}"
                    )
            elif  _tensorpipe 验证设备(
                地图_.(), 所有设备数量[源节点]
            ):
                raise ValueError(
                    f"节点"{源节点}具有无效索引的源设备
                    f在其设备映射中{目标节点}输入文本翻译为简体中文为:\n"
                    f设备映射 ={地图_}输入文本翻译为简体中文为:\n"
                    f"设备数量 ="{所有设备数量[源节点]}"
                )
            如果 所有设备.获取(目标节点, []):
                如果  集合(地图_.()).是否是子集(所有设备[目标节点)]
                    raise ValueError(
                        f"节点"{源节点}在其设备映射中存在意外的目标设备 "
                        f"的 "{目标节点}输入文本翻译为简体中文为:\n"
                        f设备映射 ={地图_}输入文本翻译为简体中文为:\n"
                        f设备 ={所有设备[目标节点]}"
                    )
            elif 目标节点  所有设备数量   _tensorpipe 验证设备(
                地图_.(), 所有设备数量[目标节点]
            ):
                raise ValueError(
                    f"节点"{源节点}具有目标设备,其设备映射中存在无效索引
                    f在其设备映射中为{目标节点}输入文本翻译为简体中文为:\n"
                    f"设备映射 ="{地图_}输入文本翻译为简体中文为:\n"
                    f"设备数量 ="{所有设备数量[目标节点]}"
                )


定义 创建设备列表(我的设备, 我的设备映射, 设备映射反转):
    如果  我的设备:
        设备集: 集合[火炬.设备] = 集合()
         地图  我的设备地图.():
            设备集.更新(地图_.())
         地图  反向设备映射.():
            设备集.更新(地图_.())
        设备集.丢弃(火炬.设备("cpu"))
        我的设备 = 列表(设备集合)
    我的设备 = 排序(我的设备, =lambda d: d.索引)
    返回 我的设备


定义 创建反向映射(我的名字, 所有名字, 所有设备映射):
    反向设备映射: 字典[字符串, 字典[火炬.设备, 火炬.设备]] = {}
     节点  所有名称:
        如果 我的名字  所有设备映射[node]
            反向设备映射[node] = {
                v: k  k, v  所有设备映射[node]
[我的名字].项目()
            }
    返回 反向设备映射


定义 _获取设备信息():
    来自 . 导入 TensorPipe 代理

    代理 = 角色(TensorPipe 代理, api.获取当前 RPC 代理())
    选项 = 代理._获取后端选项()
    设备数量 = 火炬.cuda.设备数量()
    如果 火炬.cuda.是否可用()  选项.设备:
        火炬.cuda.初始化()
    返回 设备数量, 选项.设备映射, 选项.设备


定义 _设置设备和反向设备映射(代理):
    来自 . 导入 TensorPipe 代理

    代理 = 角色(TensorPipe 代理, 代理)
    从本地代理检索组状态
    初始化时,tensorpipe 代理从所有现有工作者检索信息,因此组状态有效
    工作者信息 = 代理.获取工作者信息()
    我的名字 = 我的工作者信息.名称
    所有工作者信息 = 代理.获取工作者信息()
    # 一轮获取所有工作者的 device_maps 并构建反向 device_maps
    所有设备数量, 所有设备映射, 所有设备, 所有名称 = {}, {}, {}, []
     工人信息  所有工人信息:
        工人姓名 = 工人信息.名称
        如果 工人名称 != 我的名字:
            # TODO: 使其为异步?
            设备数量, 设备映射表, 设备 = api.rpc 同步(
                工作名称, _获取设备信息
            )
        else:
            选项 = 代理.获取后端选项()
            设备数量, 设备映射, 设备 = (
                火炬.cuda.设备数量(),
                选项.设备映射表,
                选项.设备,
            )
        所有设备数量[工作人员名称] = 设备数量
        所有设备图[工作名称] = 设备映射
        所有设备[工作名称] = 设备
        所有名称.追加(工人名称)

    验证设备映射(
        所有名称,
        所有设备数量,
        所有设备映射,
        所有设备,
        静态组=错误,
    )
    反向设备映射 = 创建反向映射(我的名字, 所有名字, 所有设备映射)

    向所有工作者(包括自身)执行 RPC 调用,包括新加入的工作者信息和设备映射
     工作者名称  所有名称:
        为每个工作者设置设备列表
        所有设备[工作名称] = _创建设备列表(
            所有设备[工人名称], 所有设备映射[工人名称], 反向设备映射
        )
        api.rpc 同步(
            工作器名称,
            _更新组成员资格,
            参数=(我的工人信息, 所有设备[工作名称], 反向设备映射, True),
        )


定义 _tensorpipe 初始化后端处理器(
    店铺, 名称, 排名, 世界大小, RPC 后端选项
):
    来自 . 导入 TensorPipe 代理, TensorPipeRpcBackendOptions

    如果  isinstance(店铺, 距离.存储):
        raise 类型错误(f"`store` 必须是一个 c10d::Store。{店铺}")

    如果  isinstance(RPC 后端选项, TensorPipeRpcBackendOptions):
        raise 类型错误(
            f"`rpc_backend_options` 必须是一个 `TensorPipeRpcBackendOptions`。{RPC 后端选项}"
        )

    设备数量 = 火炬.cuda.设备数量()

    是静态组 = 真实 如果 世界大小 否则 
    # world_size 被指定,因此这是一个静态组(排名不能加入和离开)
    如果 是静态组:
        # 代理的加入方法需要像屏障一样行为,并执行
        # 需要集体操作,它依赖于进程组,而不是
        在 RPCs 之上重新实现此功能。
         = 初始化进程组(店铺, 排名, 世界大小)

        设备映射反转, 设备 = _tensorpipe 交换并检查所有设备映射(
            名称,
            设备数量,
            RPC 后端选项.设备映射,
            RPC 后端选项.设备,
            群组,
        )

        如果 火炬.cuda.是否可用()  设备:
            初始化 PyTorch CUDA 状态在这里是必要的(例如)
            # CUDACachingAllocator). 如果缺少此内容,我们可能会遇到错误
            "分配器未初始化",因为其他进程可能会发送
            # CUDA 相关的 RPC 请求在此进程中的用户代码之前
            # 处理初始化其 PyTorch CUDA 状态。
            火炬.cuda.初始化()

        在所有进程中添加 try-except 和在失败时销毁 _agent。
        代理 = TensorPipe 代理(
            店铺,
            名称,
            排名,
            世界大小,
            RPC 后端选项,
            反向设备映射,
            设备,
        )

        api.初始化 RPC 状态(代理)

        # 运行一次模拟的 RPC 轮次以初始化通道/传输。如果没有
        # 这,当没有其他 RPC 时,在 rpc.shutdown()中很容易遇到超时
        在 rpc.shutdown()之前处理该过程,因为代理初始化可以
        # 耗时超过 5 秒。
        api._all_gather(, 超时=RPC 后端选项.RPC 超时)
        # 需要一个屏障以确保在 rank0 完成之前没有其他节点离开
        # 全局聚合
        群组.障碍().等待()

        返回 代理
    # 动态 RPC 的初始化(节点可以加入和离开)
    else:
         _群组成员管理(店铺, 名称, True):
            构建带有空 reverse_device_map 和设备的 TPAgent
            这些属性将在初始化后更新
            代理 = TensorPipe 代理(
                店铺,
                名称,
                排名,
                世界大小,
                RPC 后端选项,
                {},
                []
            )
            api.初始化 RPC 状态(agent)

            try:
                # 通知本组所有工人该 rank 已加入并设置设备和反向设备映射
                # 这是一个同步操作,一旦所有现有 rank 更新完成即完成
                _设置设备和反向设备映射(代理)
            除了 异常:
                api.关闭()
                raise
            返回 代理


注册后端(
    TENSORPIPE,
    _tensorpipe 构造 rpc 后端选项处理程序,
    _tensorpipe_init_backend_handler,
)

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源