# mypy: 允许未类型化装饰器
# mypy: 允许未类型化定义
导入
集合
导入 contextlib
导入 functools
导入
检查
导入
记录日志
导入
线程
来自
打字
导入
任何,
通用,
类型检查,
类型变量
导入
火炬
来自 torch._C._distributed_rpc
导入 (
清理 Python RPC 处理器,
删除所有用户和未分叉所有者 rrefs,
销毁 rref 上下文,
获取当前 RPC 代理,
远程调用内置函数,
远程调用 Python UDF,
远程调用 TorchScript,
远程调用 RPC 内置函数,
_调用 Python UDF RPC,
_调用 TorchScript RPC,
当前 RPC 代理是否已设置,
_重置当前 RPC 代理,
设置并启动 RPC 代理,
获取 RPC 超时时间,
PyRRef,
远程性能分析管理器,
TensorPipe 代理,
工作信息,
)
来自 torch.futures
导入
未来
来自
_utils
导入
_组成员管理,
更新群组成员资格
来自
.常量
导入
默认关机超时,
未设置 RPC 超时
来自
.内部
导入 (
构建 RPC 性能分析键,
内部 RPC 序列化器,
Python 用户定义函数,
RPC 执行模式,
)
全部 = [
关闭,
获取工作信息,
远程,
RPC 同步,
"rpc 异步",
"RRef",
"AllGatherStates",
"method_factory",
"新方法",
]
记录器 =
记录.
获取日志记录器(__name__)
# NB: 在关闭期间忽略 RRef 泄漏。如果不这样做,应用程序必须
# 确保应用程序代码中没有对任何 RRef 的引用,并且
# Python GC 已经完成了其工作,删除了这些 RRef。这可能会导致不良
调试经验,尤其是在大型应用程序中。因此,通过
默认情况下,我们在关闭时将忽略 RRef 泄漏。这通常
# 关闭表示应用程序已完成训练且不再关心
关于状态。
#
启用 RRef 泄露检查,请将此_ignore_rref_leak 设置为 False
_ignore_rref_leak = 真实
_default_pickler = _internal_rpc_pickler
@contextlib.contextmanager
定义
使用 RPC Pickler(rpc_pickler):
r""
rpc_pickler: (.internal._InternalRPCPickler) 覆盖默认的 RPC Pickler
"文档"
全局 _default_pickler
_default_pickler = rpc_pickler
try:
产生
最后:
_default_pickler = _internal_rpc_pickler
定义
需要初始化(
函数):
@functools.包装(
函数)
定义
包装器(*
参数, **kwargs):
如果
非
当前 RPC 代理是否已设置():
raise 运行时错误(
"RPC 尚未初始化。请调用"
"torch.distributed.rpc.init_rpc 首先。"
)
返回
函数(*
参数, **kwargs)
返回
包装器
类 AllGatherStates:
定义
初始化(
我):
初始时,每个`gathered_objects`都是一个空的字典。
领导者工作员被选为排序后的第一个工作员。
名称列表。每当有工作员进入`_all_gather()`时,它会在领导者上运行`_gather_to_leader()`以添加其自己的名称。
然后将其自己的名称添加到领导者的名称列表中。
# 数据对象添加到字典中。领导者还将自己的名字添加到字典中
# 在调用 `_all_gather()` 时。
# 一旦 `set(gathered_objects.keys()) == _ALL_WORKER_NAMES`,领导者
# 将收集到的字典广播到所有跟随者工作者并设置它们的
# `收集对象`字段和`进行信号`字段。
我.
收集对象 = {}
# 所有工人都在等待这个信号,直到它收到所有收集到的
# 对象。
我.
进程信号 =
线程.
活动()
`def _all_gather()` 使用的状态。
`_ALL_WORKER_NAMES` 在初始化 RPC 层时初始化。
_ALL_WORKER_NAMES: 集合[
任何] =
集合()
全局收集字典锁 =
线程.RLock()
全局收集序列 ID:
字典[
字符串,
整数] = {}
全局收集序列 ID 到状态映射:
集合.defaultdict =
集合.
默认字典(
全局收集状态
)
定义
初始化 RPC 状态(
代理):
工作信息 =
代理.
获取工作信息()
全局
所有工人名称
所有工人名称 = {
工人信息.
名称
为
工人信息
在
工人信息列表}
# 注意:后端实现可能已经设置了 rpc_agent。
如果
非
当前 RPC 代理是否已设置():
设置并启动 RPC 代理(
代理)
定义
聚集到领导者(
序列 ID,
工作者名称,
对象,
工作者名称列表=
无):
与
全局收集字典锁:
如果
非
工作者名称:
工作者名称 =
_所有工作者名称_
断言
工人名称
在
工人名称列表, (
f"{工人名称}
这不是领导者所期望的。
)
州 = _all_gather_sequence_id_to_states[sequence_id]
断言 worker_name
非
在
州.
收集的对象, (
f"{工人姓名}
报告的意图序列 ID{
序列 ID}
两次。
)
状态.
收集的对象[
工人名称] =
对象
如果
工人名称列表 ==
集合(
州.
收集到的物品.
键()):
州.
进行信号.
集合()
定义
向关注者广播(
序列 ID,
对象映射):
与 _all_gather_dict_lock:
状态 = _all_gather_sequence_id_to_states[
序列 ID]
断言
非
状态.
进行信号.
已设置(), (
f"终止信号序列 ID"{
序列 ID}
被设置过两次。
)
状态.
收集的物品 =
物品地图
状态.
进行信号.
集合()
_thread_local_var = 线程.local()
@contextlib.contextmanager
定义 _wait_all():
r""
一个收集由 `rpc_async` 返回的所有 futures 的上下文管理器,
在上下文管理器退出时等待它们;减轻用户需要
明确调用 wait。
示例::
>>> # xdoctest: +SKIP("distributed")
>>> # 在工作进程 0 上:
>>> 导入 torch
>>> 导入 torch.distributed.rpc 作为 rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> with rpc._wait_all():
>>> fut_1 = rpc.rpc_async(dst, torch.add, (torch.ones(2, 2), 1))
>>> fut_2 = rpc.rpc_async(dst, torch.add, (torch.ones(2, 2), 1))
>>> #fut_1 和 fut_2 等待
"文档"
_thread_local_var.未来列表 = []
try:
产生
最后:
try:
火炬.
期货.
等待全部(
_线程局部变量.
未来列表)
最后:
删除 _thread_local_var.future_list
@_require_initialized
定义 _all_gather(
对象, worker_names=
无,
超时:
浮点数 = UNSET_RPC_TIMEOUT):
r""
这与 torch.distributed.all_gather()类似,但使用 RPC。它
选择名字最小的 worker(按字母顺序)作为领导者。
然后所有追随者将他们的数据 ``obj`` 发送到领导者。领导者
已接收全部,将结果广播给所有关注者。
函数会阻塞,直到所有工作者都接收到了收集到的结果。
"文档"
如果
非 worker_names:
断言 _ALL_WORKER_NAMES
是
非
无, (
`_ALL_WORKER_NAMES` 未初始化,无法用于 `def _all_gather`。
)
worker_names = _ALL_WORKER_NAMES
leader_name = 最小(
工人名称)
自身名称 =
获取当前 RPC 代理().
获取工人信息().
名称
与 _all_gather_dict_lock:
连接名称 =
输入文本翻译为简体中文为:"".
连接(
排序(
工作人员名称))
序列号 = _all_gather_sequence_id.
获取(
连接名称, 0)
_all_gather_sequence_id[连接名称] =
序列号 + 1
序列 ID =
连接名称 +
字符串(
序列号)
是否是领导者 =
领导者姓名 ==
自称
如果
超时 == UNSET_RPC_TIMEOUT:
# 超时由代理为 RPC 调用指定
rpc 超时 =
获取 RPC 超时时间()
信号无超时
信号超时 =
无
elif 超时 ==
默认关闭超时:
RPC 无超时
rpc_timeout = 超时
信号无超时
信号超时 =
无
else:
信号和 RPC 超时使用相同的超时
信号超时 =
RPC 超时 =
超时
# 第一阶段:追随者将其对象发送给领导者
如果
领导者:
_聚集到领导者处(
序列号, self_name,
对象,
工人名称)
else:
RPC 同步(
领导者名称,
聚集到领导者,
参数=(
序列 ID, self_name,
对象,
工作者名称),
超时=
RPC 超时,
)
与 _all_gather_dict_lock:
状态 = _all_gather_sequence_id_to_states[sequence_id]
# 超时时间由函数参数设置或为 None(表示无限期)
states.进行信号.
等待(
超时=
信号超时)
# 第二阶段:领导者将结果广播给所有跟随者
# 领导者的信号是第一个被解除阻塞的,在收到所有
# 跟随者的数据对象后。
如果
领导:
worker_name_to_response_future_dict = {}
为
追随者名称
在
工人名称 - {
领导者名称}:
fut = rpc 异步(
追随者名称,
向关注者广播,
参数=(
序列 ID,
状态.
收集到的对象),
超时=
RPC 超时,
)
worker_name_to_response_future_dict[follower_name] = fut
错误 = []
为 follower_name, fut
在 worker_name_to_response_future_dict.
项目():
try:
fut.等待()
除了
运行时错误
是
ext: ex:
错误.
追加((
粉丝名称,
ext: ex))
如果
错误:
raise 运行时错误(
f粉丝{[e[0]
为 e
在
错误]}
在_all_gather 中超时
f之后{
RPC 超时:.2f}
秒后。第一个异常是{
错误[0
]
[1]}"
)
使用 sequence_id 清理状态
与 _all_gather_dict_lock:
states = _all_gather_sequence_id_to_states.流行(sequence_id)
返回
州.
收集的物体
@_require_initialized
定义
_障碍(
工人姓名):
r""
同步本地和远程 RPC 进程。
这将阻塞,直到所有在 worker_names 下指定的本地和远程 RPC 进程
调用此方法以等待所有待处理工作完成。
参数:
worker_names (List[str]): 要同步的工作者集合。
"文档"
try:
全局收集(
无,
集合(
工作节点名称))
除了
运行时错误
是
ext: ex:
日志记录器.
错误(
"完成屏障失败,发生错误"%s",
ext: ex)
@_require_initialized
定义
等待所有工作进程(
超时=
默认关闭超时):
r""
阻塞,直到所有本地和远程 RPC 进程达到此方法并等待
所有未完成的工作完成。每个 RPC 进程都必须调用此方法
在退出前执行方法以进行优雅关闭。这应该被用来
终止 RPC 框架,无法保证 RPC
框架将在此方法返回后工作。
"文档"
try:
_all_gather(无,
超时=
超时)
除了
运行时错误
是
ext: ex:
日志记录器.
错误(
"未能及时响应 '关机进行',出现错误"%s",
示例
)
raise 示例
[文档]@_require_initialized
定义
关闭(
平滑的=True,
超时=DEFAULT_SHUTDOWN_TIMEOUT):
r""
执行 RPC 代理的关机操作,然后销毁 RPC 代理。
停止本地代理接受未完成的请求,并关闭
关闭 RPC 框架,通过终止所有 RPC 线程。如果 `graceful=True`,
这将阻塞,直到所有本地和远程 RPC 进程都达到此方法
等待所有未完成的工作完成。否则,如果
``graceful=False``,这是一个本地关闭操作,它不会等待其他
RPC 进程达到此方法。
.. 警告::
对于由 :class:`~torch.futures.Future` 对象返回的
`torch.distributed.rpc.rpc_async`,`future.wait()` 不应
在调用 `shutdown()` 之后被调用。
参数:
优雅(布尔值):是否进行优雅关闭。如果为 True,
这将等待直到没有挂起的系统消息,针对“UserRRefs”删除它们;2)阻塞,直到所有本地和远程 RPC 进程都达到此方法并等待所有待处理工作完成
消息;2)阻塞,直到所有本地和远程 RPC 进程都达到此方法并等待所有待处理工作完成
此方法并等待所有待处理工作完成
此方法并等待所有待处理工作完成
完成。
示例::
确保正确设置了 ``MASTER_ADDR`` 和 ``MASTER_PORT``。
在所有工作节点上。请参阅 :meth:`~torch.distributed.init_process_group` API 获取更多详细信息。
例如,
export MASTER_ADDR=localhost
export MASTER_PORT=5678
然后在两个不同的进程中运行以下代码:
>>> # xdoctest: +SKIP
>>> # 在工作进程 0 上:
>>> 导入 torch
>>> 导入 torch.distributed.rpc 作为 rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> # do some work
>>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1))
>>> # ready to shutdown
>>> rpc.shutdown()
>>> # 在工作器 1 上:
>>> 导入 torch.distributed.rpc 作为 rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> # 等待工作器 0 完成工作后关闭。
>>> rpc.shutdown()
"文档"
如果
平滑:
try:
代理 =
获取当前 RPC 代理()
如果
非 isinstance(
代理,
TensorPipe 代理)
或者
代理.
静态组:
_等待所有工作者(
超时)
删除所有用户和未分叉所有者 rrefs()
代理.
连接(
关闭=True,
超时=
超时)
else:
# 这是一个动态组,因此我们需要获取操作令牌
我的工人信息 =
代理.
获取工人信息()
我的名字 =
我的工种信息.
名称
与
_群组管理(
代理.
店铺,
我的名字,
错误):
所有工人信息 =
代理.
获取工作者信息()
为
工作者
在
所有工人信息:
如果
工人.
名称 !=
我的名字:
RPC 同步(
工人.
名称,
更新群组成员资格,
参数=(my_worker_info,
[] {},
错误),
)
代理.
连接(
关闭=True,
超时=
超时)
最后:
如果发生错误,继续完成本地关闭。
_完成关闭()
else:
_完成关机()
定义
_完成关机():
try:
# 当检测到 RRef 泄漏时引发`TORCH_CHECK()`异常。
销毁 rref 上下文(
_忽略 rref 泄漏)
最后:
获取当前 RPC 代理().
关闭()
# 在 shutdown()中清理 Python RPC 处理器,请参阅注释
# 在 Python API 中调用 PythonRpcHandler::cleanup(),因为 cleanup()函数有 Python 依赖,它假设存在 Python 解释器
# cleanup()函数有 Python 依赖,它假设存在 Python 解释器
# 解释器
无论是否引发 RRef 泄漏异常,此清理代码都必须运行,以避免在 Python 3.5 中发生破坏性段错误。
必须运行以避免在 Python 3.5 中发生破坏性段错误。
#
# future.wait() 不应在关闭后调用。
# pythonRpcHandler 在关闭()后清理。
# 关闭()后,从 rpc python 调用返回的 python 对象无法解析。
# resolved.
清理 Python RPC 处理器()
_重置当前 RPC 代理()
[文档]@_require_initialized
def get_worker_info(worker_name=None):
r"""
获取指定工作名称的 :class:`~torch.distributed.rpc.WorkerInfo`。
使用此 :class:`~torch.distributed.rpc.WorkerInfo` 以避免在每次调用时传递昂贵的字符串。
每次调用时避免传递昂贵的字符串。
参数:
worker_name (str):工作者的字符串名称。如果 ``None``,则返回。
当前工作者的 ID。(默认为 ``None``)
返回:
给定 ``worker_name`` 或 :class:`~torch.distributed.rpc.WorkerInfo` 实例
的 :class:`~torch.distributed.rpc.WorkerInfo`
当前工作进程,如果 `worker_name` 为 `None`。
"""
如果 worker_name 不为 None:
返回 `_get_current_rpc_agent().get_worker_info(worker_name)`。
else:
返回 _get_current_rpc_agent().get_worker_info()
定义 _to_worker_info(
到):
如果 isinstance(
到,
工作信息):
返回 to
elif isinstance(到, (
字符串,
整数)):
返回
获取工作者信息(
到)
else:
raise ValueError(f"无法从名称获取 WorkerInfo"{
到}")
定义 _rref_typeof_on_owner(rref,
阻塞:
布尔类型 = True):
rref 类型 =
类型(rref.
本地值())
如果
阻塞:
返回
rref 类型
else:
将结果包装成完成的 Future。这是为了确保如果 blocking=`False`
如果 # 被指定,无论此调用是否在用户端,我们都返回一个未来对象
# 或所有者。
未来 =
未来[
类型
]
未来.
设置结果(
rref 类型)
返回
未来
定义 _rref_typeof_on_user(
rref, 超时:
浮点数 = UNSET_RPC_TIMEOUT,
阻塞:
布尔类型 =
真实
):
fut = rpc 异步(rref.
拥有者(), _rref_typeof_on_owner,
参数=(rref,),
超时=
超时)
如果
阻塞:
返回 fut.
等待()
else:
返回 fut
T = 类型变量(
T)
泛型含有一个类型变量 =
通用[T]
如果
类型检查:
类 RRef(PyRRef[T
],
通用[T
)]
通过
else:
try:
将实现类和类型类合并。
类 RRef(PyRRef,
通用[T
)]
通过
除了
类型错误:
类型错误:派生类的元类冲突。
必须是所有基类的元类的(非严格)子类。
Mypy 不理解 __class__(mypy 错误 #4177)。
类 RRefMeta(PyRRef.
类,
具有一型变量的通用.
类): # type: ignore[name-defined, misc, valid-type]
通过
将实现类和类型类合并。
# 类期望特定泛型参数的类型(mypy 错误#7791)
类 RRef(PyRRef,
带有一个类型变量的泛型,
元类=RRefMeta):
# 类型:忽略[misc, no-redef, valid-type]
通过
从 `PyRRef` 到 `RRef` 安装文档字符串。
#
这是因为 pybind11 生成了参数 `self` 的类型为 `rpc.PyRRef`,所以 `:inherited-members:` 下的 `.. autoclass:: RRef` 不起作用。
`self` 作为类型 `rpc.PyRRef`,所以 `.. autoclass:: RRef` 下的 `:inherited-members:` 不起作用。
在 `.. autoclass:: RRef` 下的 `:inherited-members:` 不起作用。
我们需要执行以下过程以将`rpc.PyRRef`替换为`rpc.RRef`。
#
定义
方法工厂(
方法名称,
文档字符串):
定义
方法(
我, *
参数, **kwargs):
返回 getattr(
超级(RRef,
我),
方法名称)(*
参数, **kwargs)
如果
方法.__doc__:
方法.__doc__ =
文档字符串
返回
方法
为
方法名称,
方法
在
检查.getmembers(PyRRef):
忽略魔法方法,除了 "__str__"。
如果
方法名称.
以...开头(
“_”)
和
方法名 != "__str__":
continue
获取 pybind11 生成的文档字符串。
就像,
""
to_here(self: torch.distributed.rpc.PyRRef, timeout: float=-1.0) -> object
阻塞调用,从所有者复制 RRef 的值
将本地节点返回。如果当前节点是
owner, 返回对局部值的引用。
"文档"
文档字符串 = getattr(
方法,
__doc__,
无)
断言
文档字符串
是
非
无,
"所有面向用户的方法都应该有文档字符串。"
对 pybind11 生成的文档字符串进行手术。
文档字符串 =
文档字符串.
替换(
"torch.distributed.rpc.PyRRef", "torch.distributed.rpc.RRef"
)
将用户界面 RRef 方法与修改后的文档字符串关联。
新方法 =
方法工厂(
方法名称,
文档字符串)
setattr(RRef, 方法名称,
新方法)
[文档]@_require_initialized
定义
远程(
到,
函数,
参数=
无, kwargs=
无,
超时=UNSET_RPC_TIMEOUT):
r""
向工作器“to”远程调用“func”并立即返回结果值的引用。
class:`~torch.distributed.rpc.RRef` 到结果值。
工作器“to”将是返回的
class:`~torch.distributed.rpc.RRef` 的所有者,调用“remote”的工作器是
一个用户。所有者管理其全局引用计数。
`:class:`~torch.distributed.rpc.RRef`,及其所有者
`:class:`~torch.distributed.rpc.RRef` 仅在全局范围内被销毁时
没有关于它的生活参考。
参数:
目标工作者的名称/等级/``WorkerInfo``(str 或 WorkerInfo 或 int)
可调用函数(Callable),例如 Python 可调用函数、内置函数
操作符(例如::meth:`~torch.add`)和注解
TorchScript 函数
args(元组):调用`func`的参数元组。
kwargs(字典):是`func`调用时的关键字参数字典。
调用。
timeout(浮点数,可选):此远程调用的超时时间(秒)。如果未指定,则默认为 None。
这个的创建
在工作器上的 :class:`~torch.distributed.rpc.RRef`
``to`` 在这个工作器上未能成功处理
如果在这个超时时间内未能处理,那么下一次
尝试使用 RRef(例如
``to_here()``),将引发超时
表示此失败。0 表示
无限超时,即超时错误将
永远不会被提升。如果没有提供,则使用默认值
在初始化期间或使用 ``_set_rpc_timeout`` 设置的值
``_set_rpc_timeout`` 被使用。
返回:
一个指向结果的 :class:`~torch.distributed.rpc.RRef` 实例
使用阻塞 API::meth:`torch.distributed.rpc.RRef.to_here` 获取结果值。
在本地检索结果值。
..警告::
``remote`` API 在通过网络发送之前不会复制参数张量的存储,这可能会由不同的线程完成。
这可能导致在发送参数张量之前,存储它们的 API 不会复制。
根据 RPC 后端类型而定。调用者应确保那些张量的内容在返回 RRef 被所有者确认之前保持不变。
这些张量的内容在返回 RRef 被所有者确认之前保持不变。
可以使用:meth:`torch.distributed.rpc.RRef.confirmed_by_owner` API 来检查。
meth:`torch.distributed.rpc.RRef.confirmed_by_owner` API 进行确认。
..警告::
远程 API 的“超时”等错误以尽力而为的方式处理。
这意味着当由“远程”发起的远程调用失败,例如出现超时错误时,我们采取尽力而为的错误处理方式。
这意味着当“远程”调用失败,例如出现超时错误时,我们采取尽力而为的错误处理方法。
这意味着错误会被处理并设置为
在异步基础上对结果 RRef 进行操作。如果在此处理之前应用程序尚未使用该 RRef(例如“to_here”或 fork 调用),则未来对“RRef”的使用将适当地引发错误。
但是,用户应用程序可能会使用该 RRef。
如果在此处理之前应用程序尚未使用该 RRef(例如“to_here”或 fork 调用),则未来对“RRef”的使用将适当地引发错误。
然而,用户应用程序可能会使用该 RRef。
在处理错误之前,先处理 `RRef`。在这种情况下,错误可能不会被
尚未处理,因此被提升。
示例::
确保 ``MASTER_ADDR`` 和 ``MASTER_PORT`` 设置正确
在两个工作者上。参考::meth:`~torch.distributed.init_process_group`
查看更多详情的 API。例如,
导出 MASTER_ADDR=localhost
导出 MASTER_PORT=5678
然后在两个不同的进程中运行以下代码:
>>> # xdoctest: +SKIP
>>> # 在工作节点 0 上:
>>> 导入 torch
>>> 导入 torch.distributed.rpc 作为 rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
>>> x = rref1.to_here() + rref2.to_here()
>>> rpc.shutdown()
>>> # 在工作器 1 上:
>>> 导入 torch.distributed.rpc 作为 rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
下面是一个使用 RPC 运行 TorchScript 函数的示例。
>>> # 在两个工作者上:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>> return torch.add(tensor, scalar)
>>> # 在工作节点 0 上:
>>> 导入 torch.distributed.rpc 作为 rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rref.to_here()
>>> rpc.shutdown()
>>> # 在 worker 1 上:
>>> 导入 torch.distributed.rpc 作为 rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
"文档"
火炬._C._log_api_usage_once(
torch.distributed.rpc_remote)
qualified_name = 火炬.
算子._builtins.
内置查找(
函数)
目标工作器信息 =
转换为工作器信息(
到)
是否应该进行性能分析 = _get_should_profile()
ctx_manager = _enable_rpc_profiler(
should_profile, 合法名称,
函数,
RPC 执行模式.
远程,
目标工作信息
)
与
上下文管理器
是 rf:
args = args 如果 args
否则 ()
kwargs = kwargs 如果 kwargs
否则 {}
is_async_exec = 有属性(
函数,
_wrapped_async_rpc_function)
如果 is_async_exec:
包装 =
函数._wrapped_async_rpc_function
如果 isinstance(
包装,
火炬.
算子.
脚本函数):
函数 =
包装
如果 qualified_name
是
非
无:
rref = 远程调用内置函数(
目标工作信息,
合法名称,
超时, *
参数, **kwargs
)
elif isinstance(函数,
火炬.
算子.
脚本函数):
rref = 远程调用 TorchScript(
目标工作信息.
名称,
火炬._jit_internal._qualified_name(
函数),
超时,
是否异步执行,
*参数,
**kwargs,
)
else:
(序列化的 Python UDF,
张量) = _default_pickler.
序列化(
PythonUDF(函数,
参数, kwargs)
)
rref = 远程调用 Python UDF(
dst_worker_info, pickled_python_udf, 张量,
超时,
异步执行
)
#附加性能分析信息
如果
应该进行性能分析:
断言
火炬.
自动微分._profiler_enabled()
断言 rf
是
非
无
fut = rf.在 future 上调用结束回调(rref.
获取未来())
rref.设置未来分析(fut)
返回 rref
定义
调用 RPC(
到,
函数,
RPC 类型,
参数=
无, kwargs=
无,
RPC 超时:
浮点数 =
设置 RPC 超时
):
如果
非
可调用(
函数):
raise 类型错误(
函数应该是可调用的。)
qualified_name = 火炬.
算子.
内置模块.
查找内置函数(
函数)
目标工作器信息 =
转换为工作器信息(
到)
应该配置文件 =
_获取应该配置文件()
ctx 管理器 =
_启用 RPC 性能分析器(
应该配置,
合法名称,
函数,
RPC 类型,
目标工作者信息
)
与
上下文管理器
是 rf:
args = args 如果 args
否则 ()
kwargs = kwargs 如果 kwargs
否则 {}
is_async_exec = 有属性(
函数,
_wrapped_async_rpc_function)
如果 is_async_exec:
包装 =
函数._wrapped_async_rpc_function
如果 isinstance(
包装,
火炬.
算子.
脚本函数):
函数 =
包装
如果 qualified_name
是
非
无:
fut = 远程调用 RPC 内置函数(
目标工作信息,
合法名称,
RPC 超时, *
参数, **kwargs
)
elif isinstance(函数,
火炬.
算子.
脚本函数):
fut = _调用 TorchScript RPC(
目标工作信息.
名称,
火炬._jit_internal._qualified_name(
函数),
参数,
kwargs,
RPC 超时,
是否异步执行,
)
else:
(序列化的 Python UDF,
张量) = _default_pickler.
序列化(
PythonUDF(函数,
参数, kwargs)
)
fut = _调用 Python UDF RPC(
dst_worker_info, pickled_python_udf, 张量,
RPC 超时,
异步执行
)
如果
应该进行性能分析:
断言
火炬.
自动微分._profiler_enabled()
断言 rf
是
非
无
# 将性能分析回调计划在 future 完成时运行。
# 这将返回一个当原始 future 完成时以及配置文件回调也完成时的 future
# 完成并确保 fut.wait()完成配置文件。这个新的 future 将包含与原始 future 相同的值。
# 以保证 fut.wait()完成配置文件。这个新的 future 将包含与原始 future 相同的值。
# 这个新的 future 将包含与原始 future 相同的值。
fut = rf.在 future 上调用结束回调(fut)
返回 fut
[文档]@_require_initialized
定义
rpc 同步(
到,
函数,
参数=
无, kwargs=
无,
超时:
浮点数 =
未设置 RPC 超时):
r""
向工作器“to”运行函数“func”的阻塞 RPC 调用。
消息在 Python 代码执行过程中并行发送和接收。
此方法线程安全。
参数:
to(str 或 WorkerInfo 或 int):目标工作器的名称/排名/WorkerInfo。
可调用对象(Callable):一个可调用的函数,例如 Python 可调用对象、内置函数
操作符(例如::meth:`~torch.add`)和注解的
TorchScript 函数。
args(元组):`func`调用的参数元组。
kwargs(dict):是调用 ``func`` 的关键字参数字典。
调用。
timeout(浮点数,可选):用于此 RPC 的秒数超时。如果
RPC 在此时间内未完成。
时间,表示发生异常的时间
超时将被触发。值为 0
表示无限超时,即超时
错误永远不会引发。如果未提供,
初始化时设置的默认值
或者使用 `__set_rpc_timeout` 进行设置。
返回:
返回使用 `func`、`args` 和 `kwargs` 运行的结果。
示例::
确保已正确设置 `MASTER_ADDR` 和 `MASTER_PORT`。
在两个工作器上。请参阅 :meth:`~torch.distributed.init_process_group` 相关 API 获取更多详细信息。
例如,
导出 MASTER_ADDR=localhost
导出 MASTER_PORT=5678
然后在两个不同的进程中运行以下代码:
>>> # xdoctest: +SKIP
>>> # 在 worker 0 上:
>>> 导入 torch
>>> 导入 torch.distributed.rpc 作为 rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # 在工作进程 1:
>>> 导入 torch.distributed.rpc 作为 rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
下面是一个使用 RPC 运行 TorchScript 函数的示例。
>>> # 在两个工作者上:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>> return torch.add(tensor, scalar)
>>> # 在 worker 0 上:
>>> 导入 torch.distributed.rpc 作为 rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # 在工作进程 1:
>>> 导入 torch.distributed.rpc 作为 rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
"文档"
火炬._C._log_api_usage_once(
torch.distributed.rpc_sync)
fut = _invoke_rpc(到,
函数,
RPC 执行模式.
同步,
参数, kwargs,
超时)
返回 fut.
等待()
[文档]@_require_initialized
定义
rpc 异步(
到,
函数,
参数=
无, kwargs=
无,
超时=UNSET_RPC_TIMEOUT):
r""
向工作器“to”的非阻塞 RPC 调用运行函数“func”。消息在 Python 代码执行过程中并行发送和接收。
此方法线程安全。此方法将立即返回。
(此处原文为空,故不进行翻译)
可等待的 :class:`~torch.futures.Future` 对象。
参数:
目标工作者的名称/排名/``WorkerInfo``,类型为 (str 或 WorkerInfo 或 int)。
func (Callable):一个可调用的函数,例如 Python 可调用函数或内置函数。
操作符(例如 :meth:`~torch.add`)和注解。
TorchScript 函数。
args(元组):`func`调用的参数元组。
kwargs(字典):`func`调用时的关键字参数字典。
调用。
超时(浮点数,可选):用于此 RPC 的超时时间(秒)。如果
RPC 在此时间内未完成,将引发表示已超时的异常。
超时的异常将被抛出。0 表示不设置超时。
超时(浮点数,可选):用于此 RPC 的超时时间(秒)。如果
表示无限超时,即超时
错误永远不会被抛出。如果没有提供,
则使用初始化期间设置的默认值。
或者使用 ``_set_rpc_timeout`` 设置。
返回:
返回一个 :class:`~torch.futures.Future` 对象,可以等待
在. 完成后,`func` 在 `args` 上的返回值
``kwargs`` 可以从 :class:`~torch.futures.Future` 中获取
对象
..警告::
使用 GPU 张量作为`func`的参数或返回值是不支持的,因为我们不支持通过网络发送 GPU 张量。您需要在使用`func`的参数或返回值之前,显式地将 GPU 张量复制到 CPU。
您需要在使用`func`的参数或返回值之前,显式地将 GPU 张量复制到 CPU。您需要在使用`func`的参数或返回值之前,显式地将 GPU 张量复制到 CPU。
您需要在使用`func`的参数或返回值之前,显式地将 GPU 张量复制到 CPU。您需要在使用`func`的参数或返回值之前,显式地将 GPU 张量复制到 CPU。
您需要在使用`func`的参数或返回值之前,显式地将 GPU 张量复制到 CPU。您需要在使用`func`的参数或返回值之前,显式地将 GPU 张量复制到 CPU。
..警告::
``rpc_async`` API 不会复制参数张量的存储直到
将它们发送到线上,这可以通过不同的线程完成
根据 RPC 后端类型。调用者应确保
那些张量的内容保持不变,直到返回
`torch.futures.Future` 完成。
示例::
确保正确设置了 `MASTER_ADDR` 和 `MASTER_PORT`。
在所有工作节点上。请参阅 :meth:`~torch.distributed.init_process_group` API 获取更多详细信息。
例如,
export MASTER_ADDR=localhost
export MASTER_PORT=5678
然后在两个不同的进程中运行以下代码:
>>> # xdoctest: +SKIP
>>> # 在工作进程 0 上:
>>> 导入 torch
>>> 导入 torch.distributed.rpc 作为 rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
>>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
>>> result = fut1.wait() + fut2.wait()
>>> rpc.shutdown()
>>> # 在工作进程 1:
>>> 导入 torch.distributed.rpc 作为 rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
下面是一个使用 RPC 运行 TorchScript 函数的示例。
>>> # 在两个工作者上:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>> return torch.add(tensor, scalar)
>>> # 在 worker 0 上:
>>> 导入 torch.distributed.rpc 作为 rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3))
>>> ret = fut.wait()
>>> rpc.shutdown()
>>> # 在工作器 1 上:
>>> 导入 torch.distributed.rpc 作为 rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
"文档"
火炬._C._log_api_usage_once(
torch.distributed.rpc_async)
fut = _invoke_rpc(到,
函数,
RPC 执行模式.
异步,
参数, kwargs,
超时)
如果
有属性(
_线程局部变量,
future 列表):
_thread_local_var.future_list.追加(fut)
返回 fut
定义 _get_should_profile():
# 旧版分析器应启用。不支持与 RPC 分析一起使用
动态分析器
活动分析器类型 =
火炬._C.
_分析器.
活动分析器类型
返回 (
火炬.
自动微分._profiler_enabled()
和
火炬._C.
_自动微分._profiler_type() ==
活动分析器类型.
旧版
# 类型: 忽略[attr-defined]
)
定义
启用 RPC 分析器(
应该配置,
合法名称,
函数,
RPC 类型,
目标工作者信息
):
上下文管理器 = contextlib.
无上下文()
如果
应该配置:
# 创建基于函数类型的适当字符串表示
# (内置,脚本,Python)
如果 qualified_name
是
无:
函数名 = (
火炬._jit_internal._qualified_name(
函数)
如果 isinstance(
函数,
火炬.
算子.
脚本函数)
否则
函数.__qualname__
)
else:
函数名 = qualified_name
# 构建 RPC 性能分析键。
rpc 性能键 =
构建 rpc 性能键(
rpc 类型,
函数名,
获取工作者信息().
名称,
目标工作者信息.
名称,
)
远程性能分析管理器.
设置当前性能分析键(rpc_profiling_key)
# Mypy 不支持在同一块中重新定义不在同一块中的变量(#1174)
ctx_manager = 火炬.
自动微分.
分析器.
记录功能(rpc_profiling_key)
# 类型:忽略[赋值]
返回
上下文管理器