快捷键

torch.distributed.elastic.rendezvous.etcd_rendezvous 的源代码

#!/usr/bin/env python3
# mypy: 允许未类型化定义

版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。

导入 json
导入 记录日志
导入 系统
导入 线程
导入 时间
来自 打字 导入 可选


try:
    导入 etcd  # type: ignore[import]
除了 模块未找到错误:
    来自 . 导入 _etcd_stub 作为 etcd

来自 torch.distributed.elastic.rendezvous 导入 (
    RendezvousClosedError,
    预约错误,
    会合处理器,
    集合信息,
    会合参数,
    会合存储信息,
    RendezvousTimeoutError,
)

来自 .etcd 存储 导入 cas 延迟, Etcd 存储
来自 .工具 导入 解析预约端点


__all__ = [
    "Etcd rendezvous 重试性失败",
    "Etcd rendezvous 立即重试",
    "Etcd rendezvous 处理器",
    "Etcd rendezvous",
    "创建 rendezvous 处理器",
]

_日志格式 = 记录日志.格式化器("%(levelname)s %(asctime)s %(message)s")
_日志处理器 = 记录日志.流处理器(系统.标准错误输出)
_日志处理器.设置格式化器(_日志格式)

日志记录器 = 记录日志.获取日志记录器(__name__)
记录器.传播 = 
记录器.设置级别(记录日志.信息)
记录器.添加处理器(_日志处理器)


重试性失败异常意味着我们太晚了,无法完成
一个期望的状态转换(例如,由于竞争条件),
现在应该从开始重新启动。
建议稍作延迟以避免对 Etcd 进行垃圾邮件式操作。
 Etcd 重试 rendezvous 失败(异常):
    通过


# 与可重试失败类似,但我们观察到的新状态表明我们
# 可以立即重试,即无需“安全延迟”。
 Etcd 重试 rendezvous 立即(异常):
    通过


# 默认的会话超时时间。
_默认超时: 整型 = 600  # 10 分钟

# 达到最小节点数后的额外等待时间
# 如果会合是弹性的(最小值不等于最大值)。
_默认最后调用超时: 整型 = 30  # 30 秒

# 在 EtcdRendezvous 内部使用的各种常量
CONST_ETCD_SETUP_TTL = 5
CONST_ETCD_FROZEN_TTL = 10
CONST_ETCD_JOINABLE_EPHEMERAL_TTL = 10

# Ephemeral node TTL for worker's keep-alive key:
CONST_WORKER_KEEPALIVE_TTL = 10

# TTL for the ephemeral run_id-specific directory. All rendezvous state data
# for a specific run_id (job instance) is contained within directory.
# Its only role is to clean-up rendezvous data from old runs (for the case when
# etcd 服务器是持久的,并且对正确性没有影响,但应该
# 大于任何预期可以存活的 worker 进程的超时时间:
CONST_RUNID_SUBROOT_TTL = 7200  # 2 小时


[文档] Etcd rendezvous 处理器(会合处理器): "" 实现 `torch.distributed.elastic.rendezvous.RendezvousHandler` 接口 `torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous`. ``EtcdRendezvousHandler`` 使用 URL 来配置 rendezvous 的类型,并传递给 rendezvous 模块特定的配置。 使用并传递给 rendezvous 模块特定的配置。 基本的 etcd rendezvous 配置 URL 看起来如下所示 :: etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers> # noqa: W605 -- 示例 -- etcd://localhost:2379/1234?min_workers=1&max_workers=3 上面的 URL 解释如下: 1. 使用已注册到 ``etcd`` 的 rendezvous 处理器 方案 2. 要使用的 `etcd` 端点是 `localhost:2379` 3. 使用 `job_id == 1234` 作为 etcd 中的前缀(这允许多个作业共享同一个 etcd 服务器,只要) 4. 条件是它们使用相同的 etcd 服务器 `job_ids` 保证是唯一的。请注意,作业 ID 可以是 任何字符串(例如,不必是数字),只要它是 独特。 4. ``min_workers=1`` 和 ``max_workers=3`` 指定了一个范围 会员规模 - 当集群大小大于或等于 ``min_workers`` 时,Torch Distributed Elastic 开始运行作业 并且可以接受最多 ``max_workers`` 个工作节点加入集群。 以下是可以向 etcd 传递的所有参数的完整列表。 以下是可以传递给 etcd 的所有参数的完整列表。 rendezvous: 集合点 +--------------------------------------------+--------------------------+ | 参数 | 描述 | +============================================+==========================+ | min_workers | 最小工作进程数 | | | 工作人员为 | | | 预约有效 | +--------------------------------------------+--------------------------+ | max_workers | 最大工作线程数 | | | 工人承认 | +--------------------------------------------+--------------------------+ | 超时 | 总超时时间 | | | 下一个会面是哪个 | | | 预期成功 | | | (默认 600 秒) | +--------------------------------------------+--------------------------+ 最后调用超时 | 额外等待时间 在最小值后进行最后调用 | | 工作人员数量已 | 已达到(默认值) | | 到 30 秒) +--------------------------------------------+--------------------------+ | etcd_prefix | 路径前缀(从 etcd | | | 根),其中所有 | | | etcd 节点将被 | | | 创建(默认为 | | | ``/torchelastic/p2p``) | +--------------------------------------------+--------------------------+ ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` def 初始化(自身, rdzv 实现: Etcd rendezvous, 本地地址: 可选[字符串)] "" 参数: rdzv_impl: 遇见会合的实现 local_addr: 当前节点的本地地址 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 自身._rdzv_impl = rdzv_impl 自身._local_addr = local_addr def __del__(自身): # TODO:在此处考虑使用弱引用。 删除 自身._rdzv_impl def 获取后端(自身) -> 字符串: 返回 etcd def next_rendezvous(自身): rdzv_version, 排名, 世界大小 = 自身._rdzv_impl.集合点屏障() 记录器.信息(创建 EtcdStore 作为 c10d::Store 实现) 存储 = 自身._rdzv_impl.设置键值存储(Rdzv 版本) 引导存储信息 = 会合存储信息.构建( 排名, 店铺, 本地地址=自身.本地地址 ) 返回 集合信息(店铺, 排名, 世界大小, 引导存储信息) def 是否关闭(自身): try: _, 状态 = 自身._rdzv_impl.获取 rdzv 状态() 返回 状态[状态] == 关闭 除了 etcd.Etcd 键未找到: 无会合状态,因此无法关闭。 返回 def 设置为已关闭(自身): 自身._rdzv_impl.设置为已关闭() def 等待中的节点数(自身): try: _, 状态 = 自身._rdzv 实现.获取 rdzv 状态() 如果 状态[状态] == 最终: 返回 状态[等待中的工作者数] 除了 etcd.EtcdKeyNotFound: 通过 返回 0 def get_run_id(自身) -> 字符串: 返回 自身._rdzv_impl.运行_id def 关闭(自身) -> 布尔: try: 自身.设置为关闭() 返回 真实 除了 基础异常 作为 e: 记录器.警告("关闭失败。发生错误:"%s", 字符串(e)) 返回 错误
# TODO:我们可能还需要处理一些额外的错误, # 如 EtcdLeaderElectionInProgress 和 EtcdWatcherCleared。这些是 仅适用于多节点 Etcd 集群。简单的重试即可。 但是添加到每个地方都很冗长。考虑将客户端调用包装起来 对于这些错误是否自动重试? # EtcdRendezvous: 使用 `etcd `__ 作为后端存储的会合实现。 def 初始化( 自身, 客户端, 前缀, run_id, num_min_workers, num_max_workers, 超时, 最后一次调用超时时间, ): 自身.client = client 记录器.信息("Etcd 机器:"%s", 自身.客户端.机器) 自身._前缀 = 前缀 自身._运行 ID = 运行 ID 自身._num_min_workers = num_min_workers 自身._num_max_workers = num_max_workers 自身._超时 = 超时 自身.最后一次调用超时 = last_call_timeout # 用于清理 TTL 刷新线程(用于临时密钥) 自身._租赁运行 ID 停止 = None 自身._租赁此排名停止 = None 如果 not 自身._前缀.以...结尾(根目录): 自身._前缀 += 根目录 设置一个永久的前缀目录,如果不存在 如果 自身._prefix != 根目录: 自身.如果不存在则创建路径(自身._prefix) 为此作业实例(run_id)租赁一个特定的“子根”节点 自身.如果不存在则创建路径(自身.获取路径(""), 有效期=CONST_RUNID_SUBROOT_TTL) 自身._租约运行 ID 停止 = 自身.设置租约续订( 自身.获取路径(""), ttl=CONST_RUNID_SUBROOT_TTL ) 所有会合工作的子目录 自身.如果不存在则创建路径(自身.获取路径("/rdzv")) 创建一个 rendezvous 版本计数器,如果不存在 try: 自身.客户端.( =自身.获取路径("/rdzv/version_counter"), ="0", prevExist= ) 除了 etcd.Etcd 已存在: 通过 def __del__(自身): # TODO:在此处考虑使用弱引用。 如果 自身._租约运行 ID 停止 not : 自身._租赁运行 ID 停止.集合() 如果 自身._租赁此排名停止 not : 自身._租赁此排名停止.集合() def 集合屏障(自身): "" 下次会合的主入口点。 此方法将阻塞,直到会合成功或发生超时。 返回: ``(rdzv_version, rank, world_size)`` 抛出异常: RendezvousTimeoutError - 等待会合超时。 预约已关闭错误 - 在等待时预约已关闭或正在关闭 预约错误 - 其他持久性错误 使预约不可重试 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 自身._rendezvous_deadline = 时间.时间() + 自身._超时 while True: 如果 时间.时间() > 自身.约会截止日期: 提升 约会超时错误 记录器.信息(尝试加入下一个约会) try: # 如果存在,取消前一个约会的租约 如果 自身.租赁此排名停止 not : 自身.租赁此排名停止.集合() 返回 自身.初始化阶段() 除了 Etcd rendezvous 立即重试: 失败类型表明我们可以立即重试 通过 除了 EtcdRendezvousRetryableFailure: 如果是可重试的失败,请稍等片刻 以避免对 etcd 进行垃圾邮件式请求 时间.睡眠(1) 除了 RendezvousTimeoutError: 记录器.信息("EtcdRendezvousHandler 中发生了 rendezvous 超时") 提升 除了 RendezvousClosedError: 记录器.信息( "run_id= 的 rendezvous 已关闭"%s被观察到处于关闭状态", 自身._运行 ID ) 提升 除了 预约错误: 提升 除了 异常 作为 e: # 如果发生一般异常,等待一小段时间 # 以避免对 etcd 进行垃圾邮件式操作 # FIXME: 一些事情属于此类,例如 # etcd.EtcdKeyNotFound 等,可以更明确地处理。 记录器.信息(" rendezvous 尝试失败,将重试。原因:"%s", e) 时间.睡眠(1) def 初始化阶段(自身): "" 初始时,预计 rendezvous 状态将是以下之一: 1. 空的(不存在的)- 在这种情况下,我们尝试创建一个新的。 2. 可加入的 - 我们尝试加入它。 3. 最终状态 - 我们宣布自己处于等待状态,并进入监控模式。 任何其他状态都被认为是过渡状态,将在短暂延迟后重试 返回: ```(rdzv_version, rank, world_size)`` 抛出异常: RendezvousClosedError - 当前 rendezvous 已关闭或正在关闭 EtcdRendezvousRetryableFailure - 观察到一些中间状态,最好稍后重试处理 state, which is best handled by retrying later ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` try: active_version = 自身.try_create_rendezvous() 状态 = json.loads(活跃版本.) 记录器.信息("新会合状态已创建:"%s", 状态) 除了 etcd.Etcd 已存在: active_version, 状态 = 自身.获取 rd zv 状态() # 注意:上述查询可能失败(etcd.EtcdKeyNotFound), # 但对我们来说这没关系 - 只意味着我们将从头开始重启。 记录器.信息("观测到的现有会合状态:"%s", 状态) 如果 状态[状态] == "已关闭": 提升 会合已关闭错误 如果 状态[状态] == "可加入的": 返回 自身.加入阶段(状态[版本]) 如果 状态[状态] == 最终: 自身.处理现有约会(状态[版本]) 提升 EtcdRendezvousRetryImmediately 自身.尝试等待状态改变(etcd 索引=活跃版本.etcd 索引 + 1) 提升 Etcd rendezvous 重试失败 def 加入阶段(自身, 预期版本): "" 我们观察到 rendezvous 状态处于 '可加入' 状态,并尝试加入此状态 特定版本,然后等待所有其他节点加入。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 如果无法加入,将抛出异常,导致重新进入。 active_version, this_rank = 自身.加入会合(预期版本) 状态 = json.loads(活跃版本.) 记录器.信息( "已加入会合版本"%s作为排名%s. 全状态:%s", 状态[版本] this_rank, 状态, ) 如果这个工作进程是第一个达到 num_min_workers 要求的工作进程, # 和 rendezvous 仍然可加入(因此它是弹性的) 此工作者将负责等待“最后通话” 超时并关闭(即过渡到“冻结”)会合 之后。 作为防止此工作者(在最后调用超时期间)潜在失败的安全措施, 将 rendezvous 状态设置为短暂状态。 当达到 min_num_workers 时。 如果 这个排名 == 自身._num_min_workers - 1 状态[状态] == 可加入: 记录器.信息(排名%s负责加入最后的通话。, 此排名) 最后一次调用截止日期 = 时间.时间() + 自身.最后一次调用超时 自身.处理加入最后一次调用(预期版本, 最后调用截止日期) 记录器.信息(排名%s完成加入最后通话。, 这个排名) 等待 rendezvous 状态冻结,这意味着一组固定的对等方 记录器.信息("等待剩余节点。") active_version = 自身.wait_for_peers(expected_version) 状态 = json.loads(活跃版本.) 断言 状态[版本] == 预期版本, ( "逻辑错误:未能观察到版本不匹配" ) 返回 自身.确认阶段(预期版本, 此排名) def 确认阶段(自身, 预期版本, 这个排名): "" 一旦 rendezvous 状态从 '可加入' 转变为 '冻结', 我们要求每位参与者确认他们的会员资格并设置个人账户 保持存活 TTL 密钥,然后等待所有其他参与者确认, 这将成功结束这次会合。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 记录器.信息(所有节点已到达。确认成员资格。) 自身.确认会员资格(预期版本, 此排名) 记录器.信息(正在等待所有对等方的确认。) 活跃版本 = 自身.等待最终(预期版本) 状态 = json.loads(活跃版本.) 记录器.信息( 会合版本%s已完成。最终状态:%s", 状态[版本] 状态, ) 遇合版本号;我们在其中的排名;世界大小 返回 状态[版本] 这个排名, 长度(状态[参与者]) def 处理现有约会(自身, 预期版本): "" 处理已存在(状态为'final')的遇合的情况 在原地,我们必须宣布自己等待,并等待直到 下一次会合机会。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` # 如果状态是 'final' -> 增加等待的工人数 # 然后,观察状态变化: 如果不再是最终状态 -> 跳出并重试 如果缺少保活,则销毁并跳出 活跃状态 = 自身.宣布自身等待(预期版本) 记录器.信息( "已添加自身到等待列表。 rendezvous 全状态:"%s", 活跃状态.value ) 自身.等待 rendezvous 释放(预期版本) 记录器.信息( "之前存在的会合状态已更改。将重新尝试加入。" ) def 尝试创建会话(自身): "" 创建新的会合状态或引发表示意外状态的异常(例如,已存在)。 抛出异常: rendezvous 错误 - 在意外状态下 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 初始版本是临时的 - 这是为了处理 可能无法完成设置事务的可能性 即“setup”到“joinable”的转换。 活跃版本 = 自身.客户.( =自身.获取路径("/rdzv/active_version"), =json.压缩包({状态: 设置}), prevExist=错误, ttl=常量_ETCD 设置_TTL, ) try: 版本计数器 = 自身.客户端.获取(自身.获取路径(/rdzv/版本计数器)) 版本计数器.value = 字符串(int(版本计数器.) + 1) 自身.客户端.更新(版本计数器) 除了 (etcd.Etcd 键未找到, etcd.Etcd 比较失败) 作为 e: 提升 预约错误( "EtcdRendezvousHandler 意外状态,工作进程需要终止。" ) 来自 e 以下任何失败都将导致声明可重试的 rendezvous 失败。 临时/rdzv/active_version 将过期,然后某人可以... 重新尝试设置过程。 为参与者数据创建目录节点。 自身.客户端.( =自身.获取路径(f"/rdzv/v_"{版本计数器.}"), =, 目录=True, 前存在=错误, ) # 发布 rendezvous 版本并信号它已准备好加入。 # 如果 rendezvous 在此之前被设置为关闭,将发生重试, 处理闭合条件的地方。 返回 自身.客户端.测试与设置( =自身.获取路径("/rdzv/active_version"), =json.压缩包( { 状态: 可加入, 版本: 版本计数器., 参与者: [] } ), 前值=活跃版本., ) def 加入集结点(自身, 预期版本): “辅助方法,用于加入阶段。” 使用比较并交换来将自己添加到集结点状态: while True: cas 延迟() 活跃版本, 状态 = 自身.获取 rdzv 状态() 如果 状态[状态] != 可加入: 提升 Etcd rendezvous 重试性失败( 在我们加入之前,rendezvous 状态变成了不可加入。 必须加入下一个。 ) 如果 状态[版本] != 预期版本: 提升 Etcd rendezvous 立即重试( rendezvous 版本已更改。必须尝试加入新的。 ) 断言 长度(状态[参与者]) < 自身.最大工作线程数, ( 逻辑错误:可连接的 rendezvous 应始终有空间 ) 这个排名 = 长度(状态[参与者]) 状态[参与者].追加(此排名) 当达到最小工作者数量,或者将状态更改为冻结时,我们将设置 活跃版本节点为短暂的。 设置 TTL: 可选[int] = None 如果 长度(状态[参与者]) == 自身._最大工作线程数: 状态[状态] = 冻结 状态[保持活跃] = [] 设置 TTL = CONST_ETCD_FROZEN_TTL elif 长度(状态[参与者]) >= 自身._num_min_workers: 设置 TTL = CONST_ETCD_JOINABLE_EPHEMERAL_TTL try: 比较并交换 active_version = 自身.客户端.测试并设置( =自身.获取路径(/rdzv/active_version), =json.压缩包(状态), 上一个值=活跃版本., 有效期=设置 TTL, ) 我们成功加入。 返回 活跃版本, 这个排名 除了 etcd.Etcd 比较失败: 记录器.信息(加入 rendezvous CAS 失败,正在重试) def wait_for_peers(自身, 预期版本): "辅助方法,用于连接阶段。" 活跃版本, 状态 = 自身.获取_rdzv_state 状态() while True: 如果 状态[状态] == "冻结" 状态[版本] == 预期版本: # 成功,所有对等方已到达。 返回 活跃版本 elif 状态[状态] == "可加入的" 状态[版本] == 预期版本: # 继续等待任何有趣的事件。 活跃版本, 状态 = 自身.尝试等待状态改变( etcd 索引=活跃版本.etcd 索引 + 1 ) 否则: 当前无法进行有效转换 提升 Etcd rendezvous 重试失败( " rendezvous 状态转换不再可能。必须重新进入。" ) def 确认会员资格(自身, 预期版本, 此排名): 确认阶段的辅助方法。 比较并交换循环 while True: 卡死延迟() 活跃版本, 状态 = 自身.获取 rdzv 状态() 如果 状态[状态] != "frozen": 提升 EtcdRendezvousRetryImmediately( "Rendezvous no longer frozen, before we confirmed. " "必须加入下一个" ) 如果 状态[版本] != 预期版本: 提升 Etcd rendezvous 重试立即( 会合版本已更改。必须尝试加入新的一个。 ) 该租约键 = 自身.获取路径( f/rdzv/v_{预期版本}/排名_{此排名}" ) 自身.客户端.集合(this_lease_key, =, ttl=CONST_WORKER_KEEPALIVE_TTL) 状态[keep_alives].追加(this_lease_key) 如果 长度(状态[保持活跃]) == 长度(状态[参与者)] # Everyone confirmed (this rank is last to do so) 状态[状态] = 最终 状态[等待中的工人数] = 0 完成 = 真实 否则: 完成 = try: # 比较并交换。如果新状态仍然是冻结的,则保持其为临时状态。 活跃版本 = 自身.客户端.测试并设置( =自身.获取路径(/rdzv/active_version), =json.压缩包(状态), 前值=活跃版本., ttl=None 如果 finalize 否则 CONST_ETCD_FROZEN_TTL, ) 自身.租赁此排名_停止 = 自身.设置续租( 此租赁密钥, ttl=CONST_WORKER_KEEPALIVE_TTL ) 返回 活跃版本 除了 etcd.EtcdCompareFailed: 记录器.信息(确认成员资格 CAS 失败,正在重试) def 等待最终(自身, 预期版本): 辅助方法,用于确认阶段。 活跃版本, 状态 = 自身.获取_rdzv_state 状态() while True: 如果 状态[状态] == "final" 状态[版本] == 预期版本: # 成功。这次会合是最终的,我们接受它。 返回 活跃版本 elif 状态[状态] == "frozen" 状态[版本] == 预期版本: 继续等待任何有趣的事件。 活跃版本, 状态 = 自身.尝试等待状态改变( etcd 索引=活跃版本.etcd 索引 + 1 ) 否则: 当前无法进行有效转换。 提升 Etcd rendezvous 重试性失败。( " rendezvous 状态转换不再可能。必须重新进入。" ) def 自我宣布等待(自身, 预期版本): "" 通告此工作器正在等待(通过 num_workers_waiting 计数器)加入下一个 集合点,但仅当状态和版本匹配时。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` while True: cas 延迟() 活跃版本, 状态 = 自身.获取预约状态() 如果 状态[状态] != 最终 状态[版本] != 预期版本: 提升 EtcdRendezvousRetryImmediately # 增加计数器以通知额外的等待工作者。 状态["num_workers_waiting"] += 1 try: 活跃版本 = 自身.客户端.测试与设置( =自身.获取路径(/rdzv/活动版本), =json.压缩包(状态), 前值=活跃版本., ) 返回 活跃版本 除了 etcd.Etcd 比较失败: 记录器.信息(宣布自身为等待 CAS 失败,正在重试) def 等待 rendezvous 释放(自身, 预期版本): "" 当存在一个处于'final'状态的现有有效 rendezvous 时,我们必须等待下一个机会加入。 这样的机会可能来自: 1. rendezvous 状态被他人更改,在这种情况下,我们将解除阻塞并重试。 2. rendezvous 变得无效,因为至少有一位成员未能续订其 已租用的 keep_alive 节点。我们检测到这一点,并销毁 rendezvous。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 活跃版本, 状态 = 自身.获取 rendezvous 状态() while True: 如果 状态[状态] != "最终" 状态[版本] != 预期版本: 返回 # 检查当前 rendezvous 状态是否有效,即所有 # 成员都活着(正在续租其租约)。 如果不行,就尝试摧毁这次会面,以便可以创建一个新的。 活跃成员 = 自身.客户.获取( 自身.获取路径(f"/rdzv/v_"{预期版本}") ) 保持活跃密钥 = [ch.key ch 活跃成员.儿童] key 状态[保持活跃]: 如果 key not 保持活跃键: # 此参与者未续租其租约。我们将宣布此 # 会合版本为已死亡(但仅当它未发生变化时) 记录器.信息("保活密钥"%s未更新。, ) 记录器.信息( 会合版本%s不完整。, 预期版本 ) 记录器.信息("试图摧毁它。") # 比较并删除操作。如果比较失败,则抛出异常, # 这意味着会合点已经被销毁/重新创建/关闭, # 我们可以尝试重新进入屏障。 自身.客户.删除( =自身.获取路径("/rdzv/active_version"), prevValue=活跃版本., ) 记录器.信息( "毁坏了的会合版本"%s成功。, 预期版本, ) 我们可以立即返回(并重试) 返回 现有的会合似乎有效,没有理由去摧毁它。 我们只需等待事情发生变化,然后重新检查。 try: 总超时 = ( 最大值(自身._rendezvous_deadline - 时间.时间(), 0.0) + 1.0 ) 自身.客户.查看( =自身.获取路径("/rdzv"), 索引=活跃版本.etcd 索引 + 1, 递归=True, 超时=总超时, ) 除了 (etcd.Etcd 事件索引已清除, etcd.EtcdWatch 超时): 通过 如果 时间.时间() > 自身._rendezvous 截止时间: 提升 约会超时错误 活跃版本, 状态 = 自身.获取 rendezvous 状态() def 处理加入最后一次调用(自身, 预期版本, 截止日期): "" 在我们达到最小工作人数后,一个特定的工人开始承担 在关闭连接窗口之前等待额外超时的责任。 如果负责此任务的工人失败,由于 TTL 过期, rendezvous 将被销毁,其他参与者将重新 rendezvous。 在这里,我们期望看到状态 。 这里我们期望看到状态 。 优雅地退出,如果以下任一情况发生: 1. 状态变为 2. 超时发生(达到截止时间),在这种情况下 我们尝试过渡到 否则异常退出。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 活跃版本, 状态 = 自身.获取_rdzv_state() while True: 如果 状态[状态] == 冻结 状态[版本] == 预期版本: # 工作者集在最后调用超时之前已冻结。这是可能的 # 当在超时之前达到 num_max_workers 时。 返回 如果 状态[状态] != "可加入的" 状态[版本] != 预期版本: 提升 Etcd rendezvous 重试性失败( " rendezvous 状态转换不再可能。必须重新进入。" ) # 如果发生超时,尝试状态转换(可加入 -> 冻结) 如果 时间.时间() >= 截止日期: 状态[状态] = 冻结 状态[保持活跃] = [] try: 活跃版本 = 自身.客户.测试并设置( =自身.获取路径("/rdzv/active_version"), =json.压缩包(状态), 前值=active_version., ttl=CONST_ETCD_FROZEN_TTL, ) 我们成功使这次会合冻结。 返回 除了 etcd.Etcd 比较失败: 记录器.信息( "最后调用转换 CAS 失败。将重试" ) cas_delay() 活跃版本, 状态 = 自身.获取_rdzv_state() 继续 # 超时未发生,因此我们必须刷新 TTL,并等待 注意:我们只想在状态仍然是可加入的情况下刷新 TTL,因此在这里我们使用 CAS,即使我们并没有更改任何数据。 # 状态仍然是可加入的,因此在这里我们使用 CAS,即使我们并没有更改任何数据。 # 即使我们并没有更改任何数据。 try: 活跃版本 = 自身.客户.测试并设置( =自身.获取路径("/rdzv/active_version"), =活跃版本., 前值=活跃版本., 标题=`CONST_ETCD_JOINABLE_EPHEMERAL_TTL`, ) 最小化“睡眠过度” 超时 = 最小( CONST_ETCD_JOINABLE_EPHEMERAL_TTL / 2, 截止日期 - 时间.时间() + 1.0, 延时 1 秒是可以接受的。 ) 活跃版本, 状态 = 自身.尝试等待状态改变( etcd 索引=活跃版本.etcd 索引 + 1, 超时=超时 ) 除了 etcd.Etcd 比较失败: 记录器.信息("最后调用 TTL 刷新 CAS 失败,将重试") cas 延迟() active_version, 状态 = 自身.获取 rdzv 状态() def 关闭设置(自身): "" 将当前 run_id 的会合标记为'关闭',用于通知其他参与者不要尝试执行(重新)会合。 这在其中一个工作者决定工作已完成时很有用。 当一个工作者决定任务完成时,这样做很有用。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` while True: active_version, 状态 = 自身.获取 rdzv 状态() 如果 状态[状态] == "已关闭": # 已由他人关闭。 返回 状态[状态] = "已关闭" try: 自身.客户.测试和设置( =自身.获取路径("/rdzv/active_version"), =json.压缩包(状态), 前值=active_version., ) 返回 除了 etcd.Etcd 比较失败: 记录器.信息(设置关闭 CAS 失败,正在重试) 卡死延迟() def 获取 rdzv 状态(自身): 活跃版本 = 自身.客户.获取(=自身.获取路径("/rdzv/active_version")) 返回 active_version, json.loads(active_version.) def 尝试等待状态改变(自身, etcd 索引, 超时=): 不要超过整体截止时间(至少不要超过 1 秒) 总超时 = 最大值(自身.rendezvous_deadline rendezvous deadline 会合截止日期 - 时间.时间(), 0.0) + 1.0 超时 = 总体超时 如果 超时 None 否则 最小(超时, 总体超时) try: 自身.客户.观看( 自身.获取路径("/rdzv/active_version"), 索引=etcd 索引, 超时=超时 ) 除了 (etcd.Etcd 事件索引已清除, etcd.Etcd 监视超时): 通过 如果 时间.时间() > 自身._rendezvous_deadline: 提升 约会超时错误 很遗憾,我们必须再次获取以获取最后一个 etcd 索引。 返回 自身.获取 rdzv 状态() def 获取路径(自身, 路径): 如果 not 路径.以...开头(根目录): 路径 = 根目录 + 路径 返回 f"{自身.前缀}运行_{自身._运行_id}{路径}" def 如果不存在则创建路径(自身, 完整路径, ttl=): try: 自身.客户端.( =完整路径, =, 目录=True, prevExist=错误, ttl=ttl ) 除了 etcd.Etcd 已存在: 通过 def 设置租约续订(自身, 完整路径, ttl): # 注意:为了使临时密钥 TTL 续订(~租约)正确工作, 确保不要调用任何可能导致长时间阻塞的方法 释放 Python 的 GIL!这是一个调用 pybind11 的例子 扩展函数是阻塞的/长时间运行的,但不是 进行 GIL 的局部释放。 def 租赁工作器(客户端, 路径, ttl, 停止事件): while True: try: 客户端.刷新(路径, ttl=ttl) 除了 etcd.Etcd 键未找到: 断开 除了 连接被拒绝错误: # 此错误通常在测试期间发生,当服务器已经终止但 Python 垃圾回收器尚未调用__del__方法。 断开 如果 停止事件.等待(超时=ttl / 2): 断开 lease_stop_event = 线程.活动() lease_thread = 线程.Thread( 目标=lease_worker, 参数=(自身.客户端, 完整路径, ttl, 租约停止事件) ) 租约线程.守护进程 = 真实 租约线程.开始() 返回 租赁停止事件 def 存储额外数据(自身, rdzv 版本, , ): 节点 = 自身.获取路径(f"/rdzv/v_"{rdzv 版本}/extra_data") try: 第一次存储任何内容时: 额外数据 = 自身.客户端.( =节点, =json.压缩包({: }), prevExist= ) 返回 除了 etcd.Etcd 已存在: 通过 # CAS 循环,确保我们不会丢失并发存储。 while True: # 我们从不删除 extra_data。这里的失败应该是致命的,不需要特殊处理。 extra_data = 自身.客户端.获取(节点) new_extra_data_value = json.loads(额外数据.) 新的额外数据值[] = value try: 额外数据 = 自身.客户端.测试与设置( =节点, =json.压缩包(新的额外数据值), 前值=额外数据., ) 返回 除了 etcd.Etcd 比较失败: 记录器.信息(存储 extra_data CAS 失败,正在重试) 时间.睡眠(0.1) def 加载 extra_data(自身, rdzv 版本, , 超时=): 节点 'extra_data' 本身及其所在的目录: 节点 = 自身.获取路径(f/rdzv/v_{rdzv 版本}/extra_data") node 目录 = 自身.获取路径(f/rdzv/v_{rdzv 版本}") # TODO: 实现超时 # https://github.com/pytorch/elastic/issues/12 while True: # 结合等待节点本身及其内部键 = 自身.客户端.获取(node_dir) # 查找是否存在 extra_data 节点 extra_data = [n n .子代 如果 n.key == 节点] 断言 长度(extra_data) 1 # 节点中存在 extra_data 节点,检查所需键是否在其中。 如果 长度(extra_data) == 1: extra_data_dict = json.loads(extra_data[0].) 如果 key 额外数据字典: 返回 额外数据字典[] # 'extra_data' 节点不存在,或者键尚未发布。 # 等待 extra_data 节点上的有趣事件并重试。 try: 自身.客户端.查看(节点, 索引=.etcd 索引 + 1) 除了 (etcd.Etcd 事件索引已清除, etcd.Etcd 监视超时): 通过 def 设置键值存储(自身, rdzv 版本): 存储路径 = 自身.获取路径(f"/rdzv/v_"{rdzv 版本}/kv) 自身.如果不存在则创建路径(存储路径) 返回 Etcd 存储(etcd 客户端=自身.客户端, etcd 存储前缀=存储路径) def _创建 etcd 客户端(参数: 会合参数) -> etcd.客户端: 从指定的 ``RendezvousParameters`` 创建一个新的 ``etcd.Client``。 hostname, 端口 = 解析 rendezvous 端点(参数.端点, 2379) # 通信协议 协议 = 参数.配置.获取("协议") 如果 协议 : 协议 = http 否则: 如果 协议 != "http" 协议 != "https": 提升 ValueError("etcd 协议必须是 HTTP 或 HTTPS。") SSL 客户端证书 ssl_cert = 参数.配置.获取(证书) 如果 ssl_cert not : 证书密钥 = 参数.配置.获取(密钥) 如果 证书密钥 not : etcd 客户端期望证书密钥作为第二个元素 `cert` 元组的数量 ssl 证书 = (ssl_cert, 证书密钥) 根证书 证书 = 参数.配置.获取(cacert) 返回 etcd.客户端( hostname, 端口, 协议=协议, 证书=ssl_cert, ca 证书=ca 证书, 允许重连=True, ) torch.distributed "静态"注册的处理程序 def 创建_rdzv_handler 处理程序(参数: 会合参数) -> 会合处理器: "" 使用方法: :: rdzv_params = RendezvousParameters( backend="etcd", endpoint="192.168.0.42:2379", run_id="123", min_nodes=4, max_nodes=8, timeout=300, last_call_timeout=30, etcd_prefix="自定义前缀", protocol="https", cacert="/etc/kubernetes/certs/ca.crt", cert="/etc/kubernetes/certs/client.crt", key="/etc/kubernetes/certs/client.key" # -- or -- rdzv_params = RendezvousParameters( 后端="etcd", endpoint="192.168.0.42:2379", run_id="123", min_nodes=4, max_nodes=8) etcd_rdzv_handler = create_etcd_rendezvous_handler(rdzv_params) 哪里: run_id - 唯一标识符,用于表示本次训练作业实例 min_nodes - 预期加入 rendezvous 的最小工作节点数 max_nodes - 允许加入 rendezvous 的最大工作节点数 默认未指定 min_workers。 超时 - 期望 next_rendezvous 发生的总超时时间 成功;否则将引发 RendezvousTimeoutError 异常 默认值为 600(10 分钟)。 最后调用超时 - 在达到最小工作人数后额外的等待时间("最后调用") 最小工作人数已达到。 默认为 30 秒。 etcd_prefix - 从 etcd 根开始的路径前缀,其中包含所有 etcd 节点将被创建。 默认为"/torchelastic/p2p"。 协议 - http(默认)或 https 访问 etcd。 cacert - 访问 etcd 的 CA 证书,仅与 https 相关。 证书 - 用于访问 etcd 的客户端证书,仅与 https 一起使用有意义。 密钥 - 用于访问 etcd 的客户端密钥,仅与 https 一起使用有意义。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 客户端 = 创建 etcd 客户端(参数) etcd 前缀 = 参数.获取("etcd 前缀", /torchelastic/p2p) rdzv = Etcd rendezvous( 客户端=客户端, 前缀=etcd 前缀, run_id=参数.run_id, 最小工作节点数=参数.最小节点数, num_max_workers=参数.最大节点数, 超时=参数.get_as_int(超时, _DEFAULT_TIMEOUT), 最后一次调用超时时间=参数.get_as_int( "last_call_timeout", 默认最后调用超时 ), ) 返回 Etcd rendezvous 处理器( rdzv 实现=rdzv, 本地地址=参数.本地地址, )

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源