快捷键

torch.distributed.elastic.rendezvous.api 的源代码

# mypy: 允许未类型化定义
版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。

导入 套接字
来自 abc 导入 ABC, 抽象方法
来自 dataclasses 导入 数据类
来自 打字 导入 任意, 可调用, 类变量, 可选

来自 torch.distributed 导入 店铺
来自 torch.distributed.elastic.utils.distributed 导入 获取空闲端口


__all__ = [
    " rendezvous 关闭错误 ",
    " rendezvous 连接错误 ",
    " rendezvous 错误 ",
    " rendezvous 优雅退出错误 ",
    " rendezvous 处理器 ",
    " rendezvous 处理器创建者 ",
    " rendezvous 处理器注册表 ",
    "会合信息",
    "会合参数",
    "会合状态错误",
    "会合存储信息",
    " rendezvous 超时错误 ",
    " rendezvous 处理器注册表 ",
]


[文档]class RendezvousError(Exception): """ 表示 rendezvous 错误的基类型。 """
[文档]class RendezvousClosedError(RendezvousError): """当 rendezvous 被关闭时引发。"""
[文档]class RendezvousTimeoutError(RendezvousError): """当 rendezvous 未按时完成时引发。"""
[文档]类 RendezvousConnectionError(RendezvousError): """当与 rendezvous 后端连接失败时引发。"""
[文档]类 RendezvousStateError(RendezvousError): """当 rendezvous 状态损坏时引发。"""
[文档]类 RendezvousGracefulExitError(RendezvousError): 当节点未包含在 rendezvous 中并优雅地退出时引发。 异常是退出堆栈的机制,但这并不意味着失败。 """
[文档]@dataclass class RendezvousStoreInfo: """存储地址和端口,可用于启动训练器分布式通信""" MASTER_ADDR_KEY: ClassVar[str] = "MASTER_ADDR" MASTER_PORT_KEY: 类变量[str] = "MASTER_PORT" master_addr: str master_port: int
[文档] @staticmethod def build( rank: int, store: Store, local_addr: Optional[str], server_port: 可选[int] = None, ) -> "RendezvousStoreInfo": """工厂方法,在 rank0 主机上查找未使用的端口,并获取所有 rank 的 addr/port 信息。 如果已知 master_addr/master_port(当共享现有的 tcp 存储服务器时很有用),请使用构造函数。 Args: rank: 当前节点的排名 store: 用于会合的存储 local_addr: 当前节点的地址,如未提供,将根据主机名解析 server_port: TCPStore 服务器的端口号,当 TCPStore 被共享时。 """ # TODO 使用集体通信 API 替换 if rank == 0: addr = local_addr 或 socket.getfqdn() 当 TCPStore 不共享时,我们回退到 get_free_port。 port = server_port 或 get_free_port() store.set( RendezvousStoreInfo.MASTER_ADDR_KEY, addr.encode(encoding="UTF-8"), # type: ignore[arg-type] ) store.set( RendezvousStoreInfo.MASTER_PORT_KEY, str(port).encode(encoding="UTF-8"), # type: ignore[arg-type] ) addr = store.get(RendezvousStoreInfo.MASTER_ADDR_KEY).decode(encoding="UTF-8") port = int( store.get(RendezvousStoreInfo.MASTER_PORT_KEY).decode(encoding="UTF-8") ) return RendezvousStoreInfo(master_addr=addr, master_port=port)
[文档]class RendezvousInfo: 保存关于会合的信息。 def __init__ self store: 商店, rank: 整数, world_size: 整数, bootstrap_store_info: 交会点商店信息, ): self._store = store self._rank = rank self._world_size = world_size self._bootstrap_store_info = bootstrap_store_info @property def store(self) -> Store: """Store used by torchelastic control plane""" 返回 self._store @property def rank(self) -> int: """组内排名""" 返回 self._rank @property def world_size(self) -> int: """全局组大小""" 返回 self._world_size @property def bootstrap_store_info(self) -> Optional[RendezvousStoreInfo]: """存储信息,供训练代码用于启动分布式通信。""" 返回自启动存储信息
[文档] 会合处理器(ABC): 主 rendezvous 接口。 注意: 分布式 Torch 用户通常 **不需要** 实现自己的 ``RendezvousHandler``。基于 C10d Store 的实现已经 提供的,适用于大多数用户。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ```
[文档] @abstractmethod def get_backend(self) -> str: """返回 rendezvous 后端名称。"""
@property def 使用代理存储
(自身) -> 布尔: 指示::py:meth:`next_rendezvous` 返回的存储引用可以与用户共享 应用程序将在应用程序生命周期内可用。 rendezvous 处理器实现将作为:py:class:`RendezvousStoreInfo`实例共享存储详情。 应用程序通常使用`MASTER_ADDR`/`MASTER_PORT`环境变量来查找存储。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 返回
[文档] @abstractmethod def next_rendezvous(self) -> RendezvousInfo: """主入口到 rendezvous 障碍。 阻塞,直到 rendezvous 完成,当前进程 包含在形成的工人组中,或者发生超时,或者 会合已被标记为关闭。 返回: 实例::py:class:`RendezvousInfo`。 Raises: RendezvousClosedError: The rendezvous is closed. RendezvousConnectionError: 与 rendezvous 后端连接失败。 RendezvousStateError: rendezvous 状态已损坏。 RendezvousTimeoutError: 约会没有按时完成。 ```python # 输入文本 input_text = '"""' # 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API) def translate_to_simplified_chinese(text): # 假设的翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ```
[docs] @abstractmethod def is_closed(self) -> bool: 是否已关闭 检查会合是否已关闭。 封闭会合意味着所有未来尝试重新会合 同一工作将失败。 `is_closed()` 和 :py:meth:`set_closed` 具有最终语义 传播且不应用于同步。意图是 如果至少有一个节点决定工作已完成,它将关闭 rendezvous,其他节点将很快观察到这一点并停止运行 好的。 """
[文档] @abstractmethod def 设置为关闭(self): """标记会合为已关闭。"""
[文档] @abstractmethod def num_nodes_waiting(self) -> int: """返回到达集合点晚的节点数量 障碍,因此未被包含在当前工作组中。""" 调用者应定期调用此方法以检查是否有新 节点正在等待加入作业,如果可以,通过调用它们来接纳它们 :py:meth:`next_rendezvous()`(重新会面)。 ```python # 输入文本 input_text = '"""' # 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API) def translate_to_simplified_chinese(text): # 假设的翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ```
[文档] @abstractmethod def get_run_id(self) -> str: """返回 rendezvous 的运行 ID。 运行 ID 是一个用户定义的 ID,用于唯一标识 rendezvous 的一个实例。 一个分布式应用程序。它通常映射到作业 ID,并用于 允许节点加入正确的分布式应用程序。 """
[文档] @abstractmethod def shutdown(self) -> bool: """关闭所有为会合而打开的资源。 示例:: rdzv_handler = ... try: store, rank, world_size = rdzv_handler.next_rendezvous() finally: rdzv_handler.shutdown() """
[文档] 会合参数: 保持构建 :py:class:`RendezvousHandler` 的参数。 参数: backend: 使用后端处理 rendezvous 的名称。 端点: 约会点的端点,通常为 [:] 的形式。 run_id: 约会点的 ID。 min_nodes: 约会中允许的最小节点数。 max_nodes: 允许加入会合的最大节点数。 local_addr: 本地节点的地址。 **kwargs: 指定后端额外的参数。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` def 初始化( 自身, 后端: 字符串, 端点: 字符串, run_id: 字符串, min_nodes: int, 最大节点数: int, 本地地址: 可选[字符串] = , **kwargs, ): 如果 not 后端: 提升 ValueError(会议后端名称必须是非空字符串。) 如果 最小节点数 < 1: 提升 ValueError( f"会合节点数的最小值("{min_nodes})必须大于零。 ) 如果 节点数最大值 < min_nodes: 提升 ValueError( f"会合节点的最大值("{最大节点数})必须大于或 " f等于最小 rendezvous 节点数 ({min_nodes}). ) 自身.后端 = 后端 自身.端点 = 端点 自身.运行 ID = 运行 ID 自身.最小节点数 = 最小节点数 自身.节点数最大值 = 节点数最大值 自身.配置 = kwargs 自身.本地地址 = 本地地址
[文档] def get(self, key: str, default: Any = None) -> Any: """返回 ``key`` 的值,如果 ``key`` 存在,否则返回 ``default``。” return self.config.get(key, default)
[文档] def get_as_bool(self, key: str, default: Optional[bool] = None) -> Optional[bool]: """将 ``key`` 的值作为 ``bool`` 返回。""" value = self.get(key, default) if value is None or isinstance(value, bool): 返回值 if isinstance(value, int): if value == 1: return True if value == 0: return False elif isinstance(value, str): if value.lower() in ["1", "true", "t", "yes", "y"]: return True if value.lower() in ["0", "false", "f", "no", "n"]: return False raise ValueError( "The rendezvous configuration option '{key}' does not represent a valid boolean value." 的简体中文翻译为: rendezvous 配置选项'{key}'不代表一个有效的布尔值。 )
[文档] def get_as_int(self, key: str, default: Optional[int] = None) -> Optional[int]: """返回 ``key`` 的整数值。""" value = self.get(key, default) if value is None: 返回值 try: 返回 int(value) except ValueError as e: raise ValueError( 会合配置选项 '{key}' 不表示一个有效的整数 "值。" ) from e
预约处理创建器 = 可调用[[会合参数] 会合处理器]
[文档] 预约处理程序注册表: 表示一个 :py:class:`RendezvousHandler` 后端的注册表。 _注册表: 字典[字符串, 预约处理器创建者] def 初始化(自身) -> : 自身._注册表 = {} def 注册(自身, 后端: 字符串, 创作者: 预约处理器创建者) -> : 注册新的会话后端。 参数: backend: 后端名称。 creator: 回调以构造 :py:class:` rendezvous 处理器 `. ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 如果 not 后端: 提升 ValueError(会议后端名称必须是非空字符串。) 当前创建者: 可选[预约处理器创建者] try: 当前创建者 = 自身._注册表[后端] 除了 键错误: 当前创建者 = None 如果 当前创建者 not None 当前创建者 != 创作者: 提升 ValueError( f"会合后端 '"{后端}' 无法与 ' 注册{创作者}"就像它“ f"已经注册为"{当前创建者}'。"' ) 自身._注册表[后端] = 创建者 def 创建处理器(自身, 参数: 会合参数) -> 会合处理器: 创建一个新的 :py:class:`RendezvousHandler`。 try: 创建者 = 自身._注册表[参数.后端] 除了 键错误 作为 e: 提升 ValueError( f"会合后端 '"{参数.后端}' 未注册。你是否忘记了 " f调用 `{自身.注册.__name__}`?` ) 来自 e 处理器 = 创作者(参数) 进行一些合理性检查。 如果 处理器.获取后端() != 参数.后端: 提升 运行时错误( f"会合后端 '"{处理器.获取后端()}'不匹配请求的 "' f"后端 '"{参数.后端}'。"' ) 返回 处理器
# 启动脚本使用的默认全局注册实例,用于实例化 预约处理程序。 预约处理程序注册表 = 预约处理程序注册表()

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源