torch.distributed.elastic.rendezvous.api 的源代码
# mypy: 允许未类型化定义
版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。
导入
套接字
来自 abc
导入 ABC,
抽象方法
来自 dataclasses
导入
数据类
来自
打字
导入
任意,
可调用,
类变量,
可选
来自 torch.distributed
导入
店铺
来自 torch.distributed.elastic.utils.distributed
导入
获取空闲端口
__all__ = [
" rendezvous 关闭错误 ",
" rendezvous 连接错误 ",
" rendezvous 错误 ",
" rendezvous 优雅退出错误 ",
" rendezvous 处理器 ",
" rendezvous 处理器创建者 ",
" rendezvous 处理器注册表 ",
"会合信息",
"会合参数",
"会合状态错误",
"会合存储信息",
" rendezvous 超时错误 ",
" rendezvous 处理器注册表 ",
]
[文档]class RendezvousError(Exception):
""" 表示 rendezvous 错误的基类型。 """
[文档]class RendezvousClosedError(RendezvousError):
"""当 rendezvous 被关闭时引发。"""
[文档]class RendezvousTimeoutError(RendezvousError):
"""当 rendezvous 未按时完成时引发。"""
[文档]类 RendezvousConnectionError(RendezvousError):
"""当与 rendezvous 后端连接失败时引发。"""
[文档]类 RendezvousStateError(RendezvousError):
"""当 rendezvous 状态损坏时引发。"""
[文档]类 RendezvousGracefulExitError(RendezvousError):
当节点未包含在 rendezvous 中并优雅地退出时引发。
异常是退出堆栈的机制,但这并不意味着失败。
"""
[文档]@dataclass
class RendezvousStoreInfo:
"""存储地址和端口,可用于启动训练器分布式通信"""
MASTER_ADDR_KEY: ClassVar[str] = "MASTER_ADDR"
MASTER_PORT_KEY: 类变量[str] = "MASTER_PORT"
master_addr: str
master_port: int
[文档] @staticmethod
def build(
rank: int,
store: Store,
local_addr: Optional[str],
server_port: 可选[int] = None,
) -> "RendezvousStoreInfo":
"""工厂方法,在 rank0 主机上查找未使用的端口,并获取所有 rank 的 addr/port 信息。
如果已知 master_addr/master_port(当共享现有的 tcp 存储服务器时很有用),请使用构造函数。
Args:
rank: 当前节点的排名
store: 用于会合的存储
local_addr: 当前节点的地址,如未提供,将根据主机名解析
server_port: TCPStore 服务器的端口号,当 TCPStore 被共享时。
"""
# TODO 使用集体通信 API 替换
if rank == 0:
addr = local_addr 或 socket.getfqdn()
当 TCPStore 不共享时,我们回退到 get_free_port。
port = server_port 或 get_free_port()
store.set(
RendezvousStoreInfo.MASTER_ADDR_KEY,
addr.encode(encoding="UTF-8"), # type: ignore[arg-type]
)
store.set(
RendezvousStoreInfo.MASTER_PORT_KEY,
str(port).encode(encoding="UTF-8"), # type: ignore[arg-type]
)
addr = store.get(RendezvousStoreInfo.MASTER_ADDR_KEY).decode(encoding="UTF-8")
port = int(
store.get(RendezvousStoreInfo.MASTER_PORT_KEY).decode(encoding="UTF-8")
)
return RendezvousStoreInfo(master_addr=addr, master_port=port)
[文档]class RendezvousInfo:
保存关于会合的信息。
def __init__
self
store: 商店,
rank: 整数,
world_size: 整数,
bootstrap_store_info: 交会点商店信息,
):
self._store = store
self._rank = rank
self._world_size = world_size
self._bootstrap_store_info = bootstrap_store_info
@property
def store(self) -> Store:
"""Store used by torchelastic control plane"""
返回 self._store
@property
def rank(self) -> int:
"""组内排名"""
返回 self._rank
@property
def world_size(self) -> int:
"""全局组大小"""
返回 self._world_size
@property
def bootstrap_store_info(self) -> Optional[RendezvousStoreInfo]:
"""存储信息,供训练代码用于启动分布式通信。"""
返回自启动存储信息
[文档]
类
会合处理器(ABC):
主 rendezvous 接口。
注意:
分布式 Torch 用户通常 **不需要** 实现自己的
``RendezvousHandler``。基于 C10d Store 的实现已经
提供的,适用于大多数用户。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
[文档] @abstractmethod
def get_backend(self) -> str:
"""返回 rendezvous 后端名称。"""
@property
def 使用代理存储(
自身) ->
布尔:
指示::py:meth:`next_rendezvous` 返回的存储引用可以与用户共享
应用程序将在应用程序生命周期内可用。
rendezvous 处理器实现将作为:py:class:`RendezvousStoreInfo`实例共享存储详情。
应用程序通常使用`MASTER_ADDR`/`MASTER_PORT`环境变量来查找存储。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
返回
假
[文档] @abstractmethod
def next_rendezvous(self) -> RendezvousInfo:
"""主入口到 rendezvous 障碍。
阻塞,直到 rendezvous 完成,当前进程
包含在形成的工人组中,或者发生超时,或者
会合已被标记为关闭。
返回:
实例::py:class:`RendezvousInfo`。
Raises:
RendezvousClosedError:
The rendezvous is closed.
RendezvousConnectionError:
与 rendezvous 后端连接失败。
RendezvousStateError:
rendezvous 状态已损坏。
RendezvousTimeoutError:
约会没有按时完成。
```python
# 输入文本
input_text = '"""'
# 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 假设的翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
[docs] @abstractmethod
def is_closed(self) -> bool: 是否已关闭
检查会合是否已关闭。
封闭会合意味着所有未来尝试重新会合
同一工作将失败。
`is_closed()` 和 :py:meth:`set_closed` 具有最终语义
传播且不应用于同步。意图是
如果至少有一个节点决定工作已完成,它将关闭
rendezvous,其他节点将很快观察到这一点并停止运行
好的。
"""
[文档] @abstractmethod
def 设置为关闭(self):
"""标记会合为已关闭。"""
[文档] @abstractmethod
def num_nodes_waiting(self) -> int:
"""返回到达集合点晚的节点数量
障碍,因此未被包含在当前工作组中。"""
调用者应定期调用此方法以检查是否有新
节点正在等待加入作业,如果可以,通过调用它们来接纳它们
:py:meth:`next_rendezvous()`(重新会面)。
```python
# 输入文本
input_text = '"""'
# 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 假设的翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
[文档] @abstractmethod
def get_run_id(self) -> str:
"""返回 rendezvous 的运行 ID。
运行 ID 是一个用户定义的 ID,用于唯一标识 rendezvous 的一个实例。
一个分布式应用程序。它通常映射到作业 ID,并用于
允许节点加入正确的分布式应用程序。
"""
[文档] @abstractmethod
def shutdown(self) -> bool:
"""关闭所有为会合而打开的资源。
示例::
rdzv_handler = ...
try:
store, rank, world_size = rdzv_handler.next_rendezvous()
finally:
rdzv_handler.shutdown()
"""
[文档]
类
会合参数:
保持构建 :py:class:`RendezvousHandler` 的参数。
参数:
backend:
使用后端处理 rendezvous 的名称。
端点:
约会点的端点,通常为 [:] 的形式。
run_id:
约会点的 ID。
min_nodes:
约会中允许的最小节点数。
max_nodes:
允许加入会合的最大节点数。
local_addr:
本地节点的地址。
**kwargs:
指定后端额外的参数。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 初始化(
自身,
后端:
字符串,
端点:
字符串,
run_id: 字符串,
min_nodes: int,
最大节点数: int,
本地地址:
可选[
字符串] =
无,
**kwargs,
):
如果 not
后端:
提升 ValueError(
会议后端名称必须是非空字符串。)
如果
最小节点数 < 1:
提升 ValueError(
f"会合节点数的最小值("{min_nodes}
)必须大于零。
)
如果
节点数最大值 < min_nodes:
提升 ValueError(
f"会合节点的最大值("{
最大节点数}
)必须大于或 "
f等于最小 rendezvous 节点数 ({min_nodes}
).
)
自身.
后端 =
后端
自身.
端点 =
端点
自身.
运行 ID =
运行 ID
自身.
最小节点数 =
最小节点数
自身.
节点数最大值 =
节点数最大值
自身.
配置 = kwargs
自身.
本地地址 =
本地地址
[文档] def get(self, key: str, default: Any = None) -> Any:
"""返回 ``key`` 的值,如果 ``key`` 存在,否则返回 ``default``。”
return self.config.get(key, default)
[文档] def get_as_bool(self, key: str, default: Optional[bool] = None) -> Optional[bool]:
"""将 ``key`` 的值作为 ``bool`` 返回。"""
value = self.get(key, default)
if value is None or isinstance(value, bool):
返回值
if isinstance(value, int):
if value == 1:
return True
if value == 0:
return False
elif isinstance(value, str):
if value.lower() in ["1", "true", "t", "yes", "y"]:
return True
if value.lower() in ["0", "false", "f", "no", "n"]:
return False
raise ValueError(
"The rendezvous configuration option '{key}' does not represent a valid boolean value." 的简体中文翻译为: rendezvous 配置选项'{key}'不代表一个有效的布尔值。
)
[文档] def get_as_int(self, key: str, default: Optional[int] = None) -> Optional[int]:
"""返回 ``key`` 的整数值。"""
value = self.get(key, default)
if value is None:
返回值
try:
返回 int(value)
except ValueError as e:
raise ValueError(
会合配置选项 '{key}' 不表示一个有效的整数
"值。"
) from e
预约处理创建器 =
可调用[[
会合参数
]
会合处理器]
[文档]
类
预约处理程序注册表:
表示一个 :py:class:`RendezvousHandler` 后端的注册表。
_注册表:
字典[
字符串,
预约处理器创建者]
def 初始化(
自身) ->
无:
自身.
_注册表 = {}
def 注册(
自身,
后端:
字符串,
创作者:
预约处理器创建者) ->
无:
注册新的会话后端。
参数:
backend:
后端名称。
creator:
回调以构造
:py:class:` rendezvous 处理器 `.
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
如果 not
后端:
提升 ValueError(
会议后端名称必须是非空字符串。)
当前创建者:
可选[
预约处理器创建者]
try:
当前创建者 =
自身.
_注册表[
后端]
除了
键错误:
当前创建者 = None
如果
当前创建者
是 not None
和
当前创建者 !=
创作者:
提升 ValueError(
f"会合后端 '"{
后端}
' 无法与 ' 注册{
创作者}
"就像它“
f"已经注册为"{
当前创建者}
'。"'
)
自身.
_注册表[
后端] =
创建者
def 创建处理器(
自身,
参数:
会合参数) ->
会合处理器:
创建一个新的 :py:class:`RendezvousHandler`。
try:
创建者 =
自身.
_注册表[
参数.
后端]
除了
键错误
作为 e:
提升 ValueError(
f"会合后端 '"{
参数.
后端}
' 未注册。你是否忘记了 "
f调用 `{
自身.
注册.__name__}
`?`
) 来自 e
处理器 =
创作者(
参数)
进行一些合理性检查。
如果
处理器.
获取后端() !=
参数.
后端:
提升
运行时错误(
f"会合后端 '"{
处理器.
获取后端()}
'不匹配请求的 "'
f"后端 '"{
参数.
后端}
'。"'
)
返回
处理器
# 启动脚本使用的默认全局注册实例,用于实例化
预约处理程序。
预约处理程序注册表 =
预约处理程序注册表()