快捷键

torch.distributed.rpc.api 源代码

# mypy: 允许未类型化装饰器
# mypy: 允许未类型化定义

导入 集合
导入 contextlib
导入 functools
导入 检查
导入 记录日志
导入 线程
来自 打字 导入 任何, 通用, 类型检查, 类型变量

导入 火炬
来自 torch._C._distributed_rpc 导入 (
    清理 Python RPC 处理器,
    删除所有用户和未分叉所有者 rrefs,
    销毁 rref 上下文,
    获取当前 RPC 代理,
    远程调用内置函数,
    远程调用 Python UDF,
    远程调用 TorchScript,
    远程调用 RPC 内置函数,
    _调用 Python UDF RPC,
    _调用 TorchScript RPC,
    当前 RPC 代理是否已设置,
    _重置当前 RPC 代理,
    设置并启动 RPC 代理,
    获取 RPC 超时时间,
    PyRRef,
    远程性能分析管理器,
    TensorPipe 代理,
    工作信息,
)
来自 torch.futures 导入 未来

来自 _utils 导入 _组成员管理, 更新群组成员资格
来自 .常量 导入 默认关机超时, 未设置 RPC 超时
来自 .内部 导入 (
    构建 RPC 性能分析键,
    内部 RPC 序列化器,
    Python 用户定义函数,
    RPC 执行模式,
)


全部 = [
    关闭,
    获取工作信息,
    远程,
    RPC 同步,
    "rpc 异步",
    "RRef",
    "AllGatherStates",
    "method_factory",
    "新方法",
]


记录器 = 记录.获取日志记录器(__name__)

# NB: 在关闭期间忽略 RRef 泄漏。如果不这样做,应用程序必须
# 确保应用程序代码中没有对任何 RRef 的引用,并且
# Python GC 已经完成了其工作,删除了这些 RRef。这可能会导致不良
调试经验,尤其是在大型应用程序中。因此,通过
默认情况下,我们在关闭时将忽略 RRef 泄漏。这通常
# 关闭表示应用程序已完成训练且不再关心
关于状态。
#
启用 RRef 泄露检查,请将此_ignore_rref_leak 设置为 False
_ignore_rref_leak = 真实
_default_pickler = _internal_rpc_pickler


@contextlib.contextmanager
定义 使用 RPC Pickler(rpc_pickler):
    r""
rpc_pickler: (.internal._InternalRPCPickler) 覆盖默认的 RPC Pickler
"文档"
    全局 _default_pickler
    _default_pickler = rpc_pickler
    try:
        产生
    最后:
        _default_pickler = _internal_rpc_pickler


定义 需要初始化(函数):
    @functools.包装(函数)
    定义 包装器(*参数, **kwargs):
        如果  当前 RPC 代理是否已设置():
            raise 运行时错误(
                "RPC 尚未初始化。请调用"
                "torch.distributed.rpc.init_rpc 首先。"
            )
        返回 函数(*参数, **kwargs)

    返回 包装器


 AllGatherStates:
    定义 初始化():
        初始时,每个`gathered_objects`都是一个空的字典。
        领导者工作员被选为排序后的第一个工作员。
        名称列表。每当有工作员进入`_all_gather()`时,它会在领导者上运行`_gather_to_leader()`以添加其自己的名称。
        然后将其自己的名称添加到领导者的名称列表中。
        # 数据对象添加到字典中。领导者还将自己的名字添加到字典中
        # 在调用 `_all_gather()` 时。
        # 一旦 `set(gathered_objects.keys()) == _ALL_WORKER_NAMES`,领导者
        # 将收集到的字典广播到所有跟随者工作者并设置它们的
        # `收集对象`字段和`进行信号`字段。
        .收集对象 = {}
        # 所有工人都在等待这个信号,直到它收到所有收集到的
        # 对象。
        .进程信号 = 线程.活动()


`def _all_gather()` 使用的状态。
`_ALL_WORKER_NAMES` 在初始化 RPC 层时初始化。
_ALL_WORKER_NAMES: 集合[任何] = 集合()
全局收集字典锁 = 线程.RLock()
全局收集序列 ID: 字典[字符串, 整数] = {}
全局收集序列 ID 到状态映射: 集合.defaultdict = 集合.默认字典(
    全局收集状态
)


定义 初始化 RPC 状态(代理):
    工作信息 = 代理.获取工作信息()
    全局 所有工人名称
    所有工人名称 = {工人信息.名称  工人信息  工人信息列表}

    # 注意:后端实现可能已经设置了 rpc_agent。
    如果  当前 RPC 代理是否已设置():
        设置并启动 RPC 代理(代理)


定义 聚集到领导者(序列 ID, 工作者名称, 对象, 工作者名称列表=):
     全局收集字典锁:
        如果  工作者名称:
            工作者名称 = _所有工作者名称_
            断言 工人名称  工人名称列表, (
                f"{工人名称}这不是领导者所期望的。
            )
         = _all_gather_sequence_id_to_states[sequence_id]
        断言 worker_name   .收集的对象, (
            f"{工人姓名}报告的意图序列 ID{序列 ID}两次。
        )
        状态.收集的对象[工人名称] = 对象
        如果 工人名称列表 == 集合(.收集到的物品.()):
            .进行信号.集合()


定义 向关注者广播(序列 ID, 对象映射):
     _all_gather_dict_lock:
        状态 = _all_gather_sequence_id_to_states[序列 ID]

    断言  状态.进行信号.已设置(), (
        f"终止信号序列 ID"{序列 ID}被设置过两次。
    )
    状态.收集的物品 = 物品地图
    状态.进行信号.集合()


_thread_local_var = 线程.local()


@contextlib.contextmanager
定义 _wait_all():
    r""
一个收集由 `rpc_async` 返回的所有 futures 的上下文管理器,
在上下文管理器退出时等待它们;减轻用户需要
明确调用 wait。


示例::
        >>> # xdoctest: +SKIP("distributed")
>>> # 在工作进程 0 上:
>>> 导入 torch
>>> 导入 torch.distributed.rpc 作为 rpc
        >>> rpc.init_rpc("worker0", rank=0, world_size=2)
        >>> with rpc._wait_all():
        >>>    fut_1 = rpc.rpc_async(dst, torch.add, (torch.ones(2, 2), 1))
        >>>    fut_2 = rpc.rpc_async(dst, torch.add, (torch.ones(2, 2), 1))
>>> #fut_1 和 fut_2 等待
"文档"
    _thread_local_var.未来列表 = []
    try:
        产生
    最后:
        try:
            火炬.期货.等待全部(_线程局部变量.未来列表)
        最后:
            删除 _thread_local_var.future_list


@_require_initialized
定义 _all_gather(对象, worker_names=, 超时: 浮点数 = UNSET_RPC_TIMEOUT):
    r""
这与 torch.distributed.all_gather()类似,但使用 RPC。它
选择名字最小的 worker(按字母顺序)作为领导者。
然后所有追随者将他们的数据 ``obj`` 发送到领导者。领导者
已接收全部,将结果广播给所有关注者。
函数会阻塞,直到所有工作者都接收到了收集到的结果。
"文档"
    如果  worker_names:
        断言 _ALL_WORKER_NAMES   , (
            `_ALL_WORKER_NAMES` 未初始化,无法用于 `def _all_gather`。
        )
        worker_names = _ALL_WORKER_NAMES
    leader_name = 最小(工人名称)

    自身名称 = 获取当前 RPC 代理().获取工人信息().名称

     _all_gather_dict_lock:
        连接名称 = 输入文本翻译为简体中文为:"".连接(排序(工作人员名称))
        序列号 = _all_gather_sequence_id.获取(连接名称, 0)
        _all_gather_sequence_id[连接名称] = 序列号 + 1
        序列 ID = 连接名称 + 字符串(序列号)

    是否是领导者 = 领导者姓名 == 自称

    如果 超时 == UNSET_RPC_TIMEOUT:
        # 超时由代理为 RPC 调用指定
        rpc 超时 = 获取 RPC 超时时间()
        信号无超时
        信号超时 = 
    elif 超时 == 默认关闭超时:
        RPC 无超时
        rpc_timeout = 超时
        信号无超时
        信号超时 = 
    else:
        信号和 RPC 超时使用相同的超时
        信号超时 = RPC 超时 = 超时

    # 第一阶段:追随者将其对象发送给领导者
    如果 领导者:
        _聚集到领导者处(序列号, self_name, 对象, 工人名称)
    else:
        RPC 同步(
            领导者名称,
            聚集到领导者,
            参数=(序列 ID, self_name, 对象, 工作者名称),
            超时=RPC 超时,
        )

     _all_gather_dict_lock:
        状态 = _all_gather_sequence_id_to_states[sequence_id]

    # 超时时间由函数参数设置或为 None(表示无限期)
    states.进行信号.等待(超时=信号超时)

    # 第二阶段:领导者将结果广播给所有跟随者
    # 领导者的信号是第一个被解除阻塞的,在收到所有
    # 跟随者的数据对象后。
    如果 领导:
        worker_name_to_response_future_dict = {}
         追随者名称  工人名称 - {领导者名称}:
            fut = rpc 异步(
                追随者名称,
                向关注者广播,
                参数=(序列 ID, 状态.收集到的对象),
                超时=RPC 超时,
            )
            worker_name_to_response_future_dict[follower_name] = fut

        错误 = []
         follower_name, fut  worker_name_to_response_future_dict.项目():
            try:
                fut.等待()
            除了 运行时错误  ext: ex:
                错误.追加((粉丝名称, ext: ex))

        如果 错误:
            raise 运行时错误(
                f粉丝{[e[0]  e  错误]}在_all_gather 中超时
                f之后{RPC 超时:.2f}秒后。第一个异常是{错误[0]
[1]}"
            )

    使用 sequence_id 清理状态
     _all_gather_dict_lock:
        states = _all_gather_sequence_id_to_states.流行(sequence_id)
    返回 .收集的物体


@_require_initialized
定义 _障碍(工人姓名):
    r""
同步本地和远程 RPC 进程。

这将阻塞,直到所有在 worker_names 下指定的本地和远程 RPC 进程
调用此方法以等待所有待处理工作完成。

参数:
worker_names (List[str]): 要同步的工作者集合。

"文档"
    try:
        全局收集(, 集合(工作节点名称))
    除了 运行时错误  ext: ex:
        日志记录器.错误("完成屏障失败,发生错误"%s", ext: ex)


@_require_initialized
定义 等待所有工作进程(超时=默认关闭超时):
    r""
阻塞,直到所有本地和远程 RPC 进程达到此方法并等待
所有未完成的工作完成。每个 RPC 进程都必须调用此方法
在退出前执行方法以进行优雅关闭。这应该被用来
终止 RPC 框架,无法保证 RPC
框架将在此方法返回后工作。
"文档"
    try:
        _all_gather(, 超时=超时)
    除了 运行时错误  ext: ex:
        日志记录器.错误(
            "未能及时响应 '关机进行',出现错误"%s", 示例
        )
        raise 示例


[文档]@_require_initialized 定义 关闭(平滑的=True, 超时=DEFAULT_SHUTDOWN_TIMEOUT): r"" 执行 RPC 代理的关机操作,然后销毁 RPC 代理。 停止本地代理接受未完成的请求,并关闭 关闭 RPC 框架,通过终止所有 RPC 线程。如果 `graceful=True`, 这将阻塞,直到所有本地和远程 RPC 进程都达到此方法 等待所有未完成的工作完成。否则,如果 ``graceful=False``,这是一个本地关闭操作,它不会等待其他 RPC 进程达到此方法。 .. 警告:: 对于由 :class:`~torch.futures.Future` 对象返回的 `torch.distributed.rpc.rpc_async`,`future.wait()` 不应 在调用 `shutdown()` 之后被调用。 参数: 优雅(布尔值):是否进行优雅关闭。如果为 True, 这将等待直到没有挂起的系统消息,针对“UserRRefs”删除它们;2)阻塞,直到所有本地和远程 RPC 进程都达到此方法并等待所有待处理工作完成 消息;2)阻塞,直到所有本地和远程 RPC 进程都达到此方法并等待所有待处理工作完成 此方法并等待所有待处理工作完成 此方法并等待所有待处理工作完成 完成。 示例:: 确保正确设置了 ``MASTER_ADDR`` 和 ``MASTER_PORT``。 在所有工作节点上。请参阅 :meth:`~torch.distributed.init_process_group` API 获取更多详细信息。 例如, export MASTER_ADDR=localhost export MASTER_PORT=5678 然后在两个不同的进程中运行以下代码: >>> # xdoctest: +SKIP >>> # 在工作进程 0 上: >>> 导入 torch >>> 导入 torch.distributed.rpc 作为 rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> # do some work >>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1)) >>> # ready to shutdown >>> rpc.shutdown() >>> # 在工作器 1 上: >>> 导入 torch.distributed.rpc 作为 rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> # 等待工作器 0 完成工作后关闭。 >>> rpc.shutdown() "文档" 如果 平滑: try: 代理 = 获取当前 RPC 代理() 如果 isinstance(代理, TensorPipe 代理) 或者 代理.静态组: _等待所有工作者(超时) 删除所有用户和未分叉所有者 rrefs() 代理.连接(关闭=True, 超时=超时) else: # 这是一个动态组,因此我们需要获取操作令牌 我的工人信息 = 代理.获取工人信息() 我的名字 = 我的工种信息.名称 _群组管理(代理.店铺, 我的名字, 错误): 所有工人信息 = 代理.获取工作者信息() 工作者 所有工人信息: 如果 工人.名称 != 我的名字: RPC 同步( 工人.名称, 更新群组成员资格, 参数=(my_worker_info, [] {}, 错误), ) 代理.连接(关闭=True, 超时=超时) 最后: 如果发生错误,继续完成本地关闭。 _完成关闭() else: _完成关机()
定义 _完成关机(): try: # 当检测到 RRef 泄漏时引发`TORCH_CHECK()`异常。 销毁 rref 上下文(_忽略 rref 泄漏) 最后: 获取当前 RPC 代理().关闭() # 在 shutdown()中清理 Python RPC 处理器,请参阅注释 # 在 Python API 中调用 PythonRpcHandler::cleanup(),因为 cleanup()函数有 Python 依赖,它假设存在 Python 解释器 # cleanup()函数有 Python 依赖,它假设存在 Python 解释器 # 解释器 无论是否引发 RRef 泄漏异常,此清理代码都必须运行,以避免在 Python 3.5 中发生破坏性段错误。 必须运行以避免在 Python 3.5 中发生破坏性段错误。 # # future.wait() 不应在关闭后调用。 # pythonRpcHandler 在关闭()后清理。 # 关闭()后,从 rpc python 调用返回的 python 对象无法解析。 # resolved. 清理 Python RPC 处理器() _重置当前 RPC 代理()
[文档]@_require_initialized def get_worker_info(worker_name=None): r""" 获取指定工作名称的 :class:`~torch.distributed.rpc.WorkerInfo`。 使用此 :class:`~torch.distributed.rpc.WorkerInfo` 以避免在每次调用时传递昂贵的字符串。 每次调用时避免传递昂贵的字符串。 参数: worker_name (str):工作者的字符串名称。如果 ``None``,则返回。 当前工作者的 ID。(默认为 ``None``) 返回: 给定 ``worker_name`` 或 :class:`~torch.distributed.rpc.WorkerInfo` 实例 的 :class:`~torch.distributed.rpc.WorkerInfo` 当前工作进程,如果 `worker_name` 为 `None`。 """ 如果 worker_name 不为 None: 返回 `_get_current_rpc_agent().get_worker_info(worker_name)`。 else: 返回 _get_current_rpc_agent().get_worker_info()
定义
_to_worker_info(): 如果 isinstance(, 工作信息): 返回 to elif isinstance(, (字符串, 整数)): 返回 获取工作者信息() else: raise ValueError(f"无法从名称获取 WorkerInfo"{}") 定义 _rref_typeof_on_owner(rref, 阻塞: 布尔类型 = True): rref 类型 = 类型(rref.本地值()) 如果 阻塞: 返回 rref 类型 else: 将结果包装成完成的 Future。这是为了确保如果 blocking=`False` 如果 # 被指定,无论此调用是否在用户端,我们都返回一个未来对象 # 或所有者。 未来 = 未来[类型] 未来.设置结果(rref 类型) 返回 未来 定义 _rref_typeof_on_user( rref, 超时: 浮点数 = UNSET_RPC_TIMEOUT, 阻塞: 布尔类型 = 真实 ): fut = rpc 异步(rref.拥有者(), _rref_typeof_on_owner, 参数=(rref,), 超时=超时) 如果 阻塞: 返回 fut.等待() else: 返回 fut T = 类型变量(T) 泛型含有一个类型变量 = 通用[T] 如果 类型检查: RRef(PyRRef[T], 通用[T)] 通过 else: try: 将实现类和类型类合并。 RRef(PyRRef, 通用[T)] 通过 除了 类型错误: 类型错误:派生类的元类冲突。 必须是所有基类的元类的(非严格)子类。 Mypy 不理解 __class__(mypy 错误 #4177)。 RRefMeta(PyRRef., 具有一型变量的通用.): # type: ignore[name-defined, misc, valid-type] 通过 将实现类和类型类合并。 # 类期望特定泛型参数的类型(mypy 错误#7791) RRef(PyRRef, 带有一个类型变量的泛型, 元类=RRefMeta): # 类型:忽略[misc, no-redef, valid-type] 通过 从 `PyRRef` 到 `RRef` 安装文档字符串。 # 这是因为 pybind11 生成了参数 `self` 的类型为 `rpc.PyRRef`,所以 `:inherited-members:` 下的 `.. autoclass:: RRef` 不起作用。 `self` 作为类型 `rpc.PyRRef`,所以 `.. autoclass:: RRef` 下的 `:inherited-members:` 不起作用。 在 `.. autoclass:: RRef` 下的 `:inherited-members:` 不起作用。 我们需要执行以下过程以将`rpc.PyRRef`替换为`rpc.RRef`。 # 定义 方法工厂(方法名称, 文档字符串): 定义 方法(, *参数, **kwargs): 返回 getattr(超级(RRef, ), 方法名称)(*参数, **kwargs) 如果 方法.__doc__: 方法.__doc__ = 文档字符串 返回 方法 方法名称, 方法 检查.getmembers(PyRRef): 忽略魔法方法,除了 "__str__"。 如果 方法名称.以...开头(“_”) 方法名 != "__str__": continue 获取 pybind11 生成的文档字符串。 就像, "" to_here(self: torch.distributed.rpc.PyRRef, timeout: float=-1.0) -> object 阻塞调用,从所有者复制 RRef 的值 将本地节点返回。如果当前节点是 owner, 返回对局部值的引用。 "文档" 文档字符串 = getattr(方法, __doc__, ) 断言 文档字符串 , "所有面向用户的方法都应该有文档字符串。" 对 pybind11 生成的文档字符串进行手术。 文档字符串 = 文档字符串.替换( "torch.distributed.rpc.PyRRef", "torch.distributed.rpc.RRef" ) 将用户界面 RRef 方法与修改后的文档字符串关联。 新方法 = 方法工厂(方法名称, 文档字符串) setattr(RRef, 方法名称, 新方法)
[文档]@_require_initialized 定义 远程(, 函数, 参数=, kwargs=, 超时=UNSET_RPC_TIMEOUT): r"" 向工作器“to”远程调用“func”并立即返回结果值的引用。 class:`~torch.distributed.rpc.RRef` 到结果值。 工作器“to”将是返回的 class:`~torch.distributed.rpc.RRef` 的所有者,调用“remote”的工作器是 一个用户。所有者管理其全局引用计数。 `:class:`~torch.distributed.rpc.RRef`,及其所有者 `:class:`~torch.distributed.rpc.RRef` 仅在全局范围内被销毁时 没有关于它的生活参考。 参数: 目标工作者的名称/等级/``WorkerInfo``(str 或 WorkerInfo 或 int) 可调用函数(Callable),例如 Python 可调用函数、内置函数 操作符(例如::meth:`~torch.add`)和注解 TorchScript 函数 args(元组):调用`func`的参数元组。 kwargs(字典):是`func`调用时的关键字参数字典。 调用。 timeout(浮点数,可选):此远程调用的超时时间(秒)。如果未指定,则默认为 None。 这个的创建 在工作器上的 :class:`~torch.distributed.rpc.RRef` ``to`` 在这个工作器上未能成功处理 如果在这个超时时间内未能处理,那么下一次 尝试使用 RRef(例如 ``to_here()``),将引发超时 表示此失败。0 表示 无限超时,即超时错误将 永远不会被提升。如果没有提供,则使用默认值 在初始化期间或使用 ``_set_rpc_timeout`` 设置的值 ``_set_rpc_timeout`` 被使用。 返回: 一个指向结果的 :class:`~torch.distributed.rpc.RRef` 实例 使用阻塞 API::meth:`torch.distributed.rpc.RRef.to_here` 获取结果值。 在本地检索结果值。 ..警告:: ``remote`` API 在通过网络发送之前不会复制参数张量的存储,这可能会由不同的线程完成。 这可能导致在发送参数张量之前,存储它们的 API 不会复制。 根据 RPC 后端类型而定。调用者应确保那些张量的内容在返回 RRef 被所有者确认之前保持不变。 这些张量的内容在返回 RRef 被所有者确认之前保持不变。 可以使用:meth:`torch.distributed.rpc.RRef.confirmed_by_owner` API 来检查。 meth:`torch.distributed.rpc.RRef.confirmed_by_owner` API 进行确认。 ..警告:: 远程 API 的“超时”等错误以尽力而为的方式处理。 这意味着当由“远程”发起的远程调用失败,例如出现超时错误时,我们采取尽力而为的错误处理方式。 这意味着当“远程”调用失败,例如出现超时错误时,我们采取尽力而为的错误处理方法。 这意味着错误会被处理并设置为 在异步基础上对结果 RRef 进行操作。如果在此处理之前应用程序尚未使用该 RRef(例如“to_here”或 fork 调用),则未来对“RRef”的使用将适当地引发错误。 但是,用户应用程序可能会使用该 RRef。 如果在此处理之前应用程序尚未使用该 RRef(例如“to_here”或 fork 调用),则未来对“RRef”的使用将适当地引发错误。 然而,用户应用程序可能会使用该 RRef。 在处理错误之前,先处理 `RRef`。在这种情况下,错误可能不会被 尚未处理,因此被提升。 示例:: 确保 ``MASTER_ADDR`` 和 ``MASTER_PORT`` 设置正确 在两个工作者上。参考::meth:`~torch.distributed.init_process_group` 查看更多详情的 API。例如, 导出 MASTER_ADDR=localhost 导出 MASTER_PORT=5678 然后在两个不同的进程中运行以下代码: >>> # xdoctest: +SKIP >>> # 在工作节点 0 上: >>> 导入 torch >>> 导入 torch.distributed.rpc 作为 rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1)) >>> x = rref1.to_here() + rref2.to_here() >>> rpc.shutdown() >>> # 在工作器 1 上: >>> 导入 torch.distributed.rpc 作为 rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() 下面是一个使用 RPC 运行 TorchScript 函数的示例。 >>> # 在两个工作者上: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar) >>> # 在工作节点 0 上: >>> 导入 torch.distributed.rpc 作为 rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rref.to_here() >>> rpc.shutdown() >>> # 在 worker 1 上: >>> 导入 torch.distributed.rpc 作为 rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() "文档" 火炬._C._log_api_usage_once(torch.distributed.rpc_remote) qualified_name = 火炬.算子._builtins.内置查找(函数) 目标工作器信息 = 转换为工作器信息() 是否应该进行性能分析 = _get_should_profile() ctx_manager = _enable_rpc_profiler( should_profile, 合法名称, 函数, RPC 执行模式.远程, 目标工作信息 ) 上下文管理器 rf: args = args 如果 args 否则 () kwargs = kwargs 如果 kwargs 否则 {} is_async_exec = 有属性(函数, _wrapped_async_rpc_function) 如果 is_async_exec: 包装 = 函数._wrapped_async_rpc_function 如果 isinstance(包装, 火炬.算子.脚本函数): 函数 = 包装 如果 qualified_name : rref = 远程调用内置函数( 目标工作信息, 合法名称, 超时, *参数, **kwargs ) elif isinstance(函数, 火炬.算子.脚本函数): rref = 远程调用 TorchScript( 目标工作信息.名称, 火炬._jit_internal._qualified_name(函数), 超时, 是否异步执行, *参数, **kwargs, ) else: (序列化的 Python UDF, 张量) = _default_pickler.序列化( PythonUDF(函数, 参数, kwargs) ) rref = 远程调用 Python UDF( dst_worker_info, pickled_python_udf, 张量, 超时, 异步执行 ) #附加性能分析信息 如果 应该进行性能分析: 断言 火炬.自动微分._profiler_enabled() 断言 rf fut = rf.在 future 上调用结束回调(rref.获取未来()) rref.设置未来分析(fut) 返回 rref
定义 调用 RPC( , 函数, RPC 类型, 参数=, kwargs=, RPC 超时: 浮点数 = 设置 RPC 超时 ): 如果 可调用(函数): raise 类型错误(函数应该是可调用的。) qualified_name = 火炬.算子.内置模块.查找内置函数(函数) 目标工作器信息 = 转换为工作器信息() 应该配置文件 = _获取应该配置文件() ctx 管理器 = _启用 RPC 性能分析器( 应该配置, 合法名称, 函数, RPC 类型, 目标工作者信息 ) 上下文管理器 rf: args = args 如果 args 否则 () kwargs = kwargs 如果 kwargs 否则 {} is_async_exec = 有属性(函数, _wrapped_async_rpc_function) 如果 is_async_exec: 包装 = 函数._wrapped_async_rpc_function 如果 isinstance(包装, 火炬.算子.脚本函数): 函数 = 包装 如果 qualified_name : fut = 远程调用 RPC 内置函数( 目标工作信息, 合法名称, RPC 超时, *参数, **kwargs ) elif isinstance(函数, 火炬.算子.脚本函数): fut = _调用 TorchScript RPC( 目标工作信息.名称, 火炬._jit_internal._qualified_name(函数), 参数, kwargs, RPC 超时, 是否异步执行, ) else: (序列化的 Python UDF, 张量) = _default_pickler.序列化( PythonUDF(函数, 参数, kwargs) ) fut = _调用 Python UDF RPC( dst_worker_info, pickled_python_udf, 张量, RPC 超时, 异步执行 ) 如果 应该进行性能分析: 断言 火炬.自动微分._profiler_enabled() 断言 rf # 将性能分析回调计划在 future 完成时运行。 # 这将返回一个当原始 future 完成时以及配置文件回调也完成时的 future # 完成并确保 fut.wait()完成配置文件。这个新的 future 将包含与原始 future 相同的值。 # 以保证 fut.wait()完成配置文件。这个新的 future 将包含与原始 future 相同的值。 # 这个新的 future 将包含与原始 future 相同的值。 fut = rf.在 future 上调用结束回调(fut) 返回 fut
[文档]@_require_initialized 定义 rpc 同步(, 函数, 参数=, kwargs=, 超时: 浮点数 = 未设置 RPC 超时): r"" 向工作器“to”运行函数“func”的阻塞 RPC 调用。 消息在 Python 代码执行过程中并行发送和接收。 此方法线程安全。 参数: to(str 或 WorkerInfo 或 int):目标工作器的名称/排名/WorkerInfo。 可调用对象(Callable):一个可调用的函数,例如 Python 可调用对象、内置函数 操作符(例如::meth:`~torch.add`)和注解的 TorchScript 函数。 args(元组):`func`调用的参数元组。 kwargs(dict):是调用 ``func`` 的关键字参数字典。 调用。 timeout(浮点数,可选):用于此 RPC 的秒数超时。如果 RPC 在此时间内未完成。 时间,表示发生异常的时间 超时将被触发。值为 0 表示无限超时,即超时 错误永远不会引发。如果未提供, 初始化时设置的默认值 或者使用 `__set_rpc_timeout` 进行设置。 返回: 返回使用 `func`、`args` 和 `kwargs` 运行的结果。 示例:: 确保已正确设置 `MASTER_ADDR` 和 `MASTER_PORT`。 在两个工作器上。请参阅 :meth:`~torch.distributed.init_process_group` 相关 API 获取更多详细信息。 例如, 导出 MASTER_ADDR=localhost 导出 MASTER_PORT=5678 然后在两个不同的进程中运行以下代码: >>> # xdoctest: +SKIP >>> # 在 worker 0 上: >>> 导入 torch >>> 导入 torch.distributed.rpc 作为 rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3)) >>> rpc.shutdown() >>> # 在工作进程 1: >>> 导入 torch.distributed.rpc 作为 rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() 下面是一个使用 RPC 运行 TorchScript 函数的示例。 >>> # 在两个工作者上: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar) >>> # 在 worker 0 上: >>> 导入 torch.distributed.rpc 作为 rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rpc.shutdown() >>> # 在工作进程 1: >>> 导入 torch.distributed.rpc 作为 rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() "文档" 火炬._C._log_api_usage_once(torch.distributed.rpc_sync) fut = _invoke_rpc(, 函数, RPC 执行模式.同步, 参数, kwargs, 超时) 返回 fut.等待()
[文档]@_require_initialized 定义 rpc 异步(, 函数, 参数=, kwargs=, 超时=UNSET_RPC_TIMEOUT): r"" 向工作器“to”的非阻塞 RPC 调用运行函数“func”。消息在 Python 代码执行过程中并行发送和接收。 此方法线程安全。此方法将立即返回。 (此处原文为空,故不进行翻译) 可等待的 :class:`~torch.futures.Future` 对象。 参数: 目标工作者的名称/排名/``WorkerInfo``,类型为 (str 或 WorkerInfo 或 int)。 func (Callable):一个可调用的函数,例如 Python 可调用函数或内置函数。 操作符(例如 :meth:`~torch.add`)和注解。 TorchScript 函数。 args(元组):`func`调用的参数元组。 kwargs(字典):`func`调用时的关键字参数字典。 调用。 超时(浮点数,可选):用于此 RPC 的超时时间(秒)。如果 RPC 在此时间内未完成,将引发表示已超时的异常。 超时的异常将被抛出。0 表示不设置超时。 超时(浮点数,可选):用于此 RPC 的超时时间(秒)。如果 表示无限超时,即超时 错误永远不会被抛出。如果没有提供, 则使用初始化期间设置的默认值。 或者使用 ``_set_rpc_timeout`` 设置。 返回: 返回一个 :class:`~torch.futures.Future` 对象,可以等待 在. 完成后,`func` 在 `args` 上的返回值 ``kwargs`` 可以从 :class:`~torch.futures.Future` 中获取 对象 ..警告:: 使用 GPU 张量作为`func`的参数或返回值是不支持的,因为我们不支持通过网络发送 GPU 张量。您需要在使用`func`的参数或返回值之前,显式地将 GPU 张量复制到 CPU。 您需要在使用`func`的参数或返回值之前,显式地将 GPU 张量复制到 CPU。您需要在使用`func`的参数或返回值之前,显式地将 GPU 张量复制到 CPU。 您需要在使用`func`的参数或返回值之前,显式地将 GPU 张量复制到 CPU。您需要在使用`func`的参数或返回值之前,显式地将 GPU 张量复制到 CPU。 您需要在使用`func`的参数或返回值之前,显式地将 GPU 张量复制到 CPU。您需要在使用`func`的参数或返回值之前,显式地将 GPU 张量复制到 CPU。 ..警告:: ``rpc_async`` API 不会复制参数张量的存储直到 将它们发送到线上,这可以通过不同的线程完成 根据 RPC 后端类型。调用者应确保 那些张量的内容保持不变,直到返回 `torch.futures.Future` 完成。 示例:: 确保正确设置了 `MASTER_ADDR` 和 `MASTER_PORT`。 在所有工作节点上。请参阅 :meth:`~torch.distributed.init_process_group` API 获取更多详细信息。 例如, export MASTER_ADDR=localhost export MASTER_PORT=5678 然后在两个不同的进程中运行以下代码: >>> # xdoctest: +SKIP >>> # 在工作进程 0 上: >>> 导入 torch >>> 导入 torch.distributed.rpc 作为 rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3)) >>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2)) >>> result = fut1.wait() + fut2.wait() >>> rpc.shutdown() >>> # 在工作进程 1: >>> 导入 torch.distributed.rpc 作为 rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() 下面是一个使用 RPC 运行 TorchScript 函数的示例。 >>> # 在两个工作者上: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar) >>> # 在 worker 0 上: >>> 导入 torch.distributed.rpc 作为 rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3)) >>> ret = fut.wait() >>> rpc.shutdown() >>> # 在工作器 1 上: >>> 导入 torch.distributed.rpc 作为 rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() "文档" 火炬._C._log_api_usage_once(torch.distributed.rpc_async) fut = _invoke_rpc(, 函数, RPC 执行模式.异步, 参数, kwargs, 超时) 如果 有属性(_线程局部变量, future 列表): _thread_local_var.future_list.追加(fut) 返回 fut
定义 _get_should_profile(): # 旧版分析器应启用。不支持与 RPC 分析一起使用 动态分析器 活动分析器类型 = 火炬._C._分析器.活动分析器类型 返回 ( 火炬.自动微分._profiler_enabled() 火炬._C._自动微分._profiler_type() == 活动分析器类型.旧版 # 类型: 忽略[attr-defined] ) 定义 启用 RPC 分析器( 应该配置, 合法名称, 函数, RPC 类型, 目标工作者信息 ): 上下文管理器 = contextlib.无上下文() 如果 应该配置: # 创建基于函数类型的适当字符串表示 # (内置,脚本,Python) 如果 qualified_name : 函数名 = ( 火炬._jit_internal._qualified_name(函数) 如果 isinstance(函数, 火炬.算子.脚本函数) 否则 函数.__qualname__ ) else: 函数名 = qualified_name # 构建 RPC 性能分析键。 rpc 性能键 = 构建 rpc 性能键( rpc 类型, 函数名, 获取工作者信息().名称, 目标工作者信息.名称, ) 远程性能分析管理器.设置当前性能分析键(rpc_profiling_key) # Mypy 不支持在同一块中重新定义不在同一块中的变量(#1174) ctx_manager = 火炬.自动微分.分析器.记录功能(rpc_profiling_key) # 类型:忽略[赋值] 返回 上下文管理器

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源