• 文档 >
  • 模块代码 >
  • torch >
  • torch.distributed >
  • torch.distributed.elastic.rendezvous.c10d_rendezvous_backend
快捷键

torch.distributed.elastic.rendezvous.c10d_rendezvous_backend 源代码

# mypy: 允许未类型化定义
版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。

导入 binascii
导入 记录日志
导入 操作系统
导入 tempfile
来自 base64 导入 base64 解码, base64 编码
来自 datetime 导入 时间差
来自 打字 导入 任意, 角色, 可选

来自 torch.distributed 导入 文件存储, 存储, TCP 存储
来自 torch.distributed.elastic.events 导入 构建并记录会话事件, 节点状态

来自 .api 导入 (
    约会连接错误,
    预约错误,
    会合参数,
    预约状态错误,
)
来自 .动态 rendezvous 导入 rendezvous 后端, 令牌
来自 .工具 导入 _matches_machine_hostname, 解析预约端点


日志记录器 = 记录日志.获取日志记录器(__name__)

TCP 存储的默认端口
默认端口 = 29400


[文档] C10d rendezvous 后端(rendezvous 后端): 表示基于 C10d 的会合后端。 参数: 存储: 使用 :py:class:`torch.distributed.Store` 实例。 与 C10d 存储进行通信。 run_id: 约会运行的 ID。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` # 在__init__方法中查看解释。 _NULL_SENTINEL = "Y2FuaW1hZGFt" _store: 店铺 _键: 字符串 def 初始化(自身, 店铺: 存储, run_id: 字符串) -> : 如果 not run_id: 提升 ValueError("运行 ID 必须是非空字符串。") 自身._store = 存储 自身._键 = "torch.rendezvous." + 运行 ID # 存储的读取操作会阻塞调用者,直到指定的 # 可用性成为关键。这种行为使得它难以作为一个常规的键值字典使用。 # 作为一种解决方案,我们最初设置了一个哨兵值作为汇合状态。 # # 当这个值被返回时,我们将其视为 None。 # 作为一种解决方案,我们最初设置了一个哨兵值作为汇合状态。 自身._call_store("compare_set", 自身._键, "", 自身._NULL_SENTINEL) @property def 名称(自身) -> 字符串: "查看基类。" 返回 "c10d"
[文档] 获取状态(self) -> Optional[tuple[bytes, Token]]: """查看基类。""" base64_state: bytes = self._call_store("get", self._key) return self._decode_state(base64_state)
[文档] def 设置状态( self, 状态: bytes, 令牌: Optional[Token] = None ) -> Optional[tuple[bytes, Token, bool]]: """查看基类。""" base64_state_str: str = b64encode(state).decode() if token: # 确定 token 无效时的快捷方式。 if not isinstance(token, bytes): result = self.get_state() if result is not None: tmp = *result, False # Python 3.6 不支持返回时的元组解包 #语句。 返回 tmp 返回 None token = token.decode() else: token = self._NULL_SENTINEL base64_state: bytes = self._call_store( "compare_set", self._key, token, base64_state_str ) state_token_pair = self._decode_state(base64_state) if state_token_pair is None: return None 新状态,新令牌 = 状态令牌对 # C10d Store 的 compare_set 方法没有提供一种简单的方式来找出 # 我们的写入尝试是否成功。作为一个暴力解决方案,我们 # 对我们的本地状态和远程状态进行位运算比较。 返回新状态、新令牌、新状态等于状态
def _call_store(自身
, 存储操作: 字符串, *参数, **kwargs) -> 任意: try: 返回 getattr(自身._store, 存储操作)(*参数, **kwargs) 除了 (ValueError, 运行时错误, 超时错误) 作为 异常: 提升 约会连接错误( "连接到 C10d 存储失败。请查看内部异常以获取详细信息。" ) 来自 exc def _解码状态(自身, base64 状态: 字节) -> 可选[元组[字节, 令牌]] 如果 base64 状态 == 自身._NULL_SENTINEL.编码(): 返回 None try: 状态 = base64 解码(base64 状态) 除了 binascii.错误 作为 异常: 提升 预约状态错误( 状态对象已损坏。请查看内部异常以获取详细信息。 ) 来自 exc 返回 状态, base64 状态
def 创建 TCP 存储(参数: 会合参数) -> TCP 存储: 主机, 端口 = 解析 rendezvous 端点(参数.端点, 默认端口=DEFAULT_PORT) cfg_is_host = 参数.get_as_bool("is_host") 如果用户明确指定了我们的进程是否应该托管存储,请尊重它。 # the store, respect it. 如果 cfg_is_host not : is_host = cfg_is_host 否则尝试根据我们的主机名确定我们是否是主机 IP 地址。 否则: 是主机。 = _matches_machine_hostname(host) 超时 读取超时 = 角色(int, 参数.获取整数值("读取超时", 60)) 如果 read_timeout 0: 提升 ValueError(读取超时必须是一个正整数。) 在特定情况下,我们尝试两次实例化存储。详情 请参阅下面的异常子句中的解释。 是否为服务器 [is_host, 错误]: try: 存储 = TCP 存储( 主机, 端口, 是否为主机=是否为服务器, 多租户=True, 超时=时间差(秒数=read_timeout), ) 如果 是否为服务器: msg = f"进程"{操作系统.获取进程 ID()}承载 C10d rendezvous 后端的 TCP 存储。 构建并记录会话事件( run_id=参数.run_id, 消息=信息, 节点状态=节点状态.初始化 ) 记录器.信息(信息) 断开 除了 (ValueError, 运行时错误, 超时错误) 作为 异常: 如果我们启发式地推断 is_host 的值为 True,并且我们第一次尝试实例化 TCP 存储失败,再试一次。 如果第一次尝试实例化 TCP 存储失败,再试一次。 # more time with is_host 设置为 False。作为一个边缘情况,可以有 同一会合中的多个进程 # 机器中只有一个最终会托管商店。 如果 not 是服务器 cfg_is_host not : 提升 约会连接错误( "C10d 存储连接失败。请查看内部异常以获取详细信息。" ) 来自 exc 返回 存储 # type: ignore[possibly-undefined] def _create_file_store(参数: 会合参数) -> 文件存储: 如果用户指定了端点,我们将其视为文件的路径。 如果 参数.端点: 路径 = 参数.端点 否则: try: # 临时文件只对用户可读可写 # 此进程。 _, 路径 = 临时文件.mkstemp() 除了 OSError 作为 异常: 提升 预约错误( "C10d 存储的文件创建失败。请查看内部异常详情。" ) 来自 exc try: 存储 = 文件存储(路径) 除了 (ValueError, 运行时错误) 作为 异常: 提升 约会连接错误( "连接到 C10d 存储失败。请查看内部异常详情。" ) 来自 exc 返回 存储
[文档]def create_backend(参数: 会合参数) -> 元组[C10d rendezvous 后端, 存储]: 从指定的参数创建一个新的 :py:class:`C10dRendezvousBackend`。 +--------------+-----------------------------------------------------------+ | Parameter | 描述 | +==============+===========================================================+ | store_type | C10d 存储的类型。目前支持的类型 | | | "tcp" 和 "file" 对应于 | | | :py:class:`torch.distributed.TCPStore` 和 | | | :py:class:`torch.distributed.FileStore`,分别。| 默认为 "tcp"。 +--------------+-----------------------------------------------------------+ 读取超时 | 读取超时,以秒为单位,用于存储操作。 默认为 60 秒。 | | | 请注意,这仅适用于 `torch.distributed.TCPStore` 不相关。 `torch.distributed.FileStore` 不接受超时参数。 `is_host` 是一个布尔值,表示是否为该后端实例。 +--------------+-----------------------------------------------------------+ `is_host` 是一个布尔值,表示该后端实例是否为主机。 | | 将托管 C10d 存储。如未指定,则默认使用 | 通过匹配主机名或 IP 推断的启发式方法 | | 本机地址与指定的会合点地址进行比对 | | | 端点。默认为 `None`。 | | | | 请注意,此配置选项仅适用于 | | :py:class:`torch.distributed.TCPStore`. 在正常情况下 | | | 在您可安全跳过的环境下;唯一需要的时候 | | | 如果其值无法正确显示则需要 | | | 确定地(例如,会合端点有一个 CNAME 作为 | | | 主机名或与机器的 FQDN 不匹配)。 | +--------------+-----------------------------------------------------------+ ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 截至今天,我们仅支持 TCPStore 和 FileStore。其他存储类型暂不支持。 还没有实现所需的功能(例如 compare_set)。 店铺类型 = 参数.获取("店铺类型", "tcp").strip().小写() 店铺: 店铺 try: 如果 店铺类型 == 文件: 存储 = 创建文件存储(参数) elif 店铺类型 == "tcp": 存储 = _创建_tcp 存储(参数) 否则: 提升 ValueError( "提供的存储类型无效。目前仅支持文件和 tcp。" ) 后端 = C10d rendezvous 后端(店铺, 参数.run_id) 除了 异常 作为 e: 构建并记录会话事件( 消息=f"{类型(e).__name__}: {字符串(e)}", run_id=参数.run_id, 节点状态=节点状态.失败, ) 提升 返回 后端, 店铺

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源