#!/usr/bin/env python3
# mypy: 允许未类型化定义
版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。
导入 json
导入
记录日志
导入
系统
导入
线程
导入
时间
来自
打字
导入
可选
try:
导入 etcd # type: ignore[import]
除了
模块未找到错误:
来自 .
导入 _etcd_stub
作为 etcd
来自 torch.distributed.elastic.rendezvous
导入 (
RendezvousClosedError,
预约错误,
会合处理器,
集合信息,
会合参数,
会合存储信息,
RendezvousTimeoutError,
)
来自
.etcd 存储
导入
cas 延迟,
Etcd 存储
来自
.工具
导入
解析预约端点
__all__ = [
"Etcd rendezvous 重试性失败",
"Etcd rendezvous 立即重试",
"Etcd rendezvous 处理器",
"Etcd rendezvous",
"创建 rendezvous 处理器",
]
_日志格式 =
记录日志.
格式化器("%(levelname)s %(asctime)s %(message)s")
_日志处理器 =
记录日志.
流处理器(
系统.
标准错误输出)
_日志处理器.
设置格式化器(
_日志格式)
日志记录器 =
记录日志.
获取日志记录器(__name__)
记录器.
传播 =
假
记录器.
设置级别(
记录日志.
信息)
记录器.
添加处理器(
_日志处理器)
重试性失败异常意味着我们太晚了,无法完成
一个期望的状态转换(例如,由于竞争条件),
现在应该从开始重新启动。
建议稍作延迟以避免对 Etcd 进行垃圾邮件式操作。
类
Etcd 重试 rendezvous 失败(
异常):
通过
# 与可重试失败类似,但我们观察到的新状态表明我们
# 可以立即重试,即无需“安全延迟”。
类
Etcd 重试 rendezvous 立即(
异常):
通过
# 默认的会话超时时间。
_默认超时:
整型 = 600
# 10 分钟
# 达到最小节点数后的额外等待时间
# 如果会合是弹性的(最小值不等于最大值)。
_默认最后调用超时:
整型 = 30
# 30 秒
# 在 EtcdRendezvous 内部使用的各种常量
CONST_ETCD_SETUP_TTL = 5
CONST_ETCD_FROZEN_TTL = 10
CONST_ETCD_JOINABLE_EPHEMERAL_TTL = 10
# Ephemeral node TTL for worker's keep-alive key:
CONST_WORKER_KEEPALIVE_TTL = 10
# TTL for the ephemeral run_id-specific directory. All rendezvous state data
# for a specific run_id (job instance) is contained within directory.
# Its only role is to clean-up rendezvous data from old runs (for the case when
# etcd 服务器是持久的,并且对正确性没有影响,但应该
# 大于任何预期可以存活的 worker 进程的超时时间:
CONST_RUNID_SUBROOT_TTL = 7200 # 2 小时
[文档]
类
Etcd rendezvous 处理器(
会合处理器):
""
实现
`torch.distributed.elastic.rendezvous.RendezvousHandler` 接口
由
`torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous`.
``EtcdRendezvousHandler`` 使用 URL 来配置 rendezvous 的类型,并传递给 rendezvous 模块特定的配置。
使用并传递给 rendezvous 模块特定的配置。
基本的 etcd rendezvous 配置 URL 看起来如下所示
::
etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers> # noqa: W605
-- 示例 --
etcd://localhost:2379/1234?min_workers=1&max_workers=3
上面的 URL 解释如下:
1. 使用已注册到 ``etcd`` 的 rendezvous 处理器
方案
2. 要使用的 `etcd` 端点是 `localhost:2379`
3. 使用 `job_id == 1234` 作为 etcd 中的前缀(这允许多个作业共享同一个 etcd 服务器,只要)
4. 条件是它们使用相同的 etcd 服务器
`job_ids` 保证是唯一的。请注意,作业 ID 可以是
任何字符串(例如,不必是数字),只要它是
独特。
4. ``min_workers=1`` 和 ``max_workers=3`` 指定了一个范围
会员规模 - 当集群大小大于或等于 ``min_workers`` 时,Torch Distributed Elastic 开始运行作业
并且可以接受最多 ``max_workers`` 个工作节点加入集群。
以下是可以向 etcd 传递的所有参数的完整列表。
以下是可以传递给 etcd 的所有参数的完整列表。
rendezvous: 集合点
+--------------------------------------------+--------------------------+
| 参数 | 描述 |
+============================================+==========================+
| min_workers | 最小工作进程数 |
| | 工作人员为 |
| | 预约有效 |
+--------------------------------------------+--------------------------+
| max_workers | 最大工作线程数 |
| | 工人承认 |
+--------------------------------------------+--------------------------+
| 超时 | 总超时时间 |
| | 下一个会面是哪个 |
| | 预期成功 |
| | (默认 600 秒) |
+--------------------------------------------+--------------------------+
最后调用超时 | 额外等待时间
在最小值后进行最后调用
| | 工作人员数量已 |
已达到(默认值)
| | 到 30 秒)
+--------------------------------------------+--------------------------+
| etcd_prefix | 路径前缀(从 etcd |
| | 根),其中所有 |
| | etcd 节点将被 |
| | 创建(默认为 |
| | ``/torchelastic/p2p``) |
+--------------------------------------------+--------------------------+
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 初始化(
自身,
rdzv 实现:
Etcd rendezvous,
本地地址:
可选[
字符串
)]
""
参数:
rdzv_impl: 遇见会合的实现
local_addr: 当前节点的本地地址
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
自身._rdzv_impl = rdzv_impl
自身._local_addr = local_addr
def __del__(自身):
# TODO:在此处考虑使用弱引用。
删除
自身._rdzv_impl
def 获取后端(
自身) ->
字符串:
返回
etcd
def next_rendezvous(自身):
rdzv_version, 排名,
世界大小 =
自身._rdzv_impl.
集合点屏障()
记录器.
信息(
创建 EtcdStore 作为 c10d::Store 实现)
存储 =
自身._rdzv_impl.
设置键值存储(
Rdzv 版本)
引导存储信息 =
会合存储信息.
构建(
排名,
店铺,
本地地址=
自身.
本地地址
)
返回
集合信息(
店铺,
排名,
世界大小,
引导存储信息)
def 是否关闭(
自身):
try:
_, 状态 =
自身._rdzv_impl.
获取 rdzv 状态()
返回
状态[
状态] ==
关闭
除了 etcd.
Etcd 键未找到:
无会合状态,因此无法关闭。
返回
假
def 设置为已关闭(
自身):
自身._rdzv_impl.
设置为已关闭()
def 等待中的节点数(
自身):
try:
_, 状态 =
自身.
_rdzv 实现.
获取 rdzv 状态()
如果
状态[
状态] ==
最终:
返回
状态[
等待中的工作者数]
除了 etcd.EtcdKeyNotFound:
通过
返回 0
def get_run_id(自身) ->
字符串:
返回
自身._rdzv_impl.
运行_id
def 关闭(
自身) ->
布尔:
try:
自身.
设置为关闭()
返回
真实
除了
基础异常
作为 e:
记录器.
警告(
"关闭失败。发生错误:"%s",
字符串(e))
返回
错误
# TODO:我们可能还需要处理一些额外的错误,
# 如 EtcdLeaderElectionInProgress 和 EtcdWatcherCleared。这些是
仅适用于多节点 Etcd 集群。简单的重试即可。
但是添加到每个地方都很冗长。考虑将客户端调用包装起来
对于这些错误是否自动重试?
#
类 EtcdRendezvous:
使用 `etcd `__ 作为后端存储的会合实现。
def 初始化(
自身,
客户端,
前缀,
run_id,
num_min_workers,
num_max_workers,
超时,
最后一次调用超时时间,
):
自身.client = client
记录器.
信息(
"Etcd 机器:"%s",
自身.
客户端.
机器)
自身.
_前缀 =
前缀
自身.
_运行 ID =
运行 ID
自身._num_min_workers = num_min_workers
自身._num_max_workers = num_max_workers
自身.
_超时 =
超时
自身.
最后一次调用超时 = last_call_timeout
# 用于清理 TTL 刷新线程(用于临时密钥)
自身.
_租赁运行 ID 停止 = None
自身.
_租赁此排名停止 = None
如果 not
自身.
_前缀.
以...结尾(
根目录):
自身.
_前缀 +=
根目录
设置一个永久的前缀目录,如果不存在
如果
自身._prefix !=
根目录:
自身.
如果不存在则创建路径(
自身._prefix)
为此作业实例(run_id)租赁一个特定的“子根”节点
自身.
如果不存在则创建路径(
自身.
获取路径(""),
有效期=CONST_RUNID_SUBROOT_TTL)
自身.
_租约运行 ID 停止 =
自身.
设置租约续订(
自身.
获取路径(""), ttl=CONST_RUNID_SUBROOT_TTL
)
所有会合工作的子目录
自身.
如果不存在则创建路径(
自身.
获取路径("/rdzv"))
创建一个 rendezvous 版本计数器,如果不存在
try:
自身.
客户端.
写(
键=
自身.
获取路径("/rdzv/version_counter"),
值="0", prevExist=
假
)
除了 etcd.
Etcd 已存在:
通过
def __del__(自身):
# TODO:在此处考虑使用弱引用。
如果
自身.
_租约运行 ID 停止
是 not
无:
自身.
_租赁运行 ID 停止.
集合()
如果
自身.
_租赁此排名停止
是 not
无:
自身.
_租赁此排名停止.
集合()
def 集合屏障(
自身):
""
下次会合的主入口点。
此方法将阻塞,直到会合成功或发生超时。
返回:
``(rdzv_version, rank, world_size)``
抛出异常:
RendezvousTimeoutError - 等待会合超时。
预约已关闭错误 - 在等待时预约已关闭或正在关闭
预约错误 - 其他持久性错误
使预约不可重试
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
自身._rendezvous_deadline =
时间.
时间() +
自身.
_超时
while True:
如果
时间.
时间() >
自身.
约会截止日期:
提升
约会超时错误
记录器.
信息(
尝试加入下一个约会)
try:
# 如果存在,取消前一个约会的租约
如果
自身.
租赁此排名停止
是 not
无:
自身.
租赁此排名停止.
集合()
返回
自身.
初始化阶段()
除了
Etcd rendezvous 立即重试:
失败类型表明我们可以立即重试
通过
除了 EtcdRendezvousRetryableFailure:
如果是可重试的失败,请稍等片刻
以避免对 etcd 进行垃圾邮件式请求
时间.
睡眠(1)
除了 RendezvousTimeoutError:
记录器.
信息(
"EtcdRendezvousHandler 中发生了 rendezvous 超时")
提升
除了 RendezvousClosedError:
记录器.
信息(
"run_id= 的 rendezvous 已关闭"%s
被观察到处于关闭状态",
自身.
_运行 ID
)
提升
除了
预约错误:
提升
除了
异常
作为 e:
# 如果发生一般异常,等待一小段时间
# 以避免对 etcd 进行垃圾邮件式操作
# FIXME: 一些事情属于此类,例如
# etcd.EtcdKeyNotFound 等,可以更明确地处理。
记录器.
信息(
" rendezvous 尝试失败,将重试。原因:"%s", e)
时间.
睡眠(1)
def 初始化阶段(
自身):
""
初始时,预计 rendezvous 状态将是以下之一:
1. 空的(不存在的)- 在这种情况下,我们尝试创建一个新的。
2. 可加入的 - 我们尝试加入它。
3. 最终状态 - 我们宣布自己处于等待状态,并进入监控模式。
任何其他状态都被认为是过渡状态,将在短暂延迟后重试
。
返回:
```(rdzv_version, rank, world_size)``
抛出异常:
RendezvousClosedError - 当前 rendezvous 已关闭或正在关闭
EtcdRendezvousRetryableFailure - 观察到一些中间状态,最好稍后重试处理
state, which is best handled by retrying later
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
try:
active_version = 自身.try_create_rendezvous()
状态 = json.loads(
活跃版本.
值)
记录器.
信息(
"新会合状态已创建:"%s",
状态)
除了 etcd.
Etcd 已存在:
active_version, 状态 =
自身.
获取 rd zv 状态()
# 注意:上述查询可能失败(etcd.EtcdKeyNotFound),
# 但对我们来说这没关系 - 只意味着我们将从头开始重启。
记录器.
信息(
"观测到的现有会合状态:"%s",
状态)
如果
状态[
状态] ==
"已关闭":
提升
会合已关闭错误
如果
状态[
状态] ==
"可加入的":
返回
自身.
加入阶段(
状态[
版本])
如果
状态[
状态] ==
最终:
自身.
处理现有约会(
状态[
版本])
提升 EtcdRendezvousRetryImmediately
自身.
尝试等待状态改变(
etcd 索引=
活跃版本.
etcd 索引 + 1)
提升
Etcd rendezvous 重试失败
def 加入阶段(
自身,
预期版本):
""
我们观察到 rendezvous 状态处于 '可加入' 状态,并尝试加入此状态
特定版本,然后等待所有其他节点加入。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
如果无法加入,将抛出异常,导致重新进入。
active_version, this_rank = 自身.
加入会合(
预期版本)
状态 = json.loads(
活跃版本.
值)
记录器.
信息(
"已加入会合版本"%s
作为排名%s
. 全状态:%s",
状态[
版本
]
this_rank,
状态,
)
如果这个工作进程是第一个达到 num_min_workers 要求的工作进程,
# 和 rendezvous 仍然可加入(因此它是弹性的)
此工作者将负责等待“最后通话”
超时并关闭(即过渡到“冻结”)会合
之后。
作为防止此工作者(在最后调用超时期间)潜在失败的安全措施,
将 rendezvous 状态设置为短暂状态。
当达到 min_num_workers 时。
如果
这个排名 ==
自身._num_min_workers - 1
和
状态[
状态] ==
可加入:
记录器.
信息(
排名%s
负责加入最后的通话。,
此排名)
最后一次调用截止日期 =
时间.
时间() +
自身.
最后一次调用超时
自身.
处理加入最后一次调用(
预期版本,
最后调用截止日期)
记录器.
信息(
排名%s
完成加入最后通话。,
这个排名)
等待 rendezvous 状态冻结,这意味着一组固定的对等方
记录器.
信息(
"等待剩余节点。")
active_version = 自身.wait_for_peers(expected_version)
状态 = json.loads(
活跃版本.
值)
断言
状态[
版本] ==
预期版本, (
"逻辑错误:未能观察到版本不匹配"
)
返回
自身.
确认阶段(
预期版本,
此排名)
def 确认阶段(
自身,
预期版本,
这个排名):
""
一旦 rendezvous 状态从 '可加入' 转变为 '冻结',
我们要求每位参与者确认他们的会员资格并设置个人账户
保持存活 TTL 密钥,然后等待所有其他参与者确认,
这将成功结束这次会合。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
记录器.
信息(
所有节点已到达。确认成员资格。)
自身.
确认会员资格(
预期版本,
此排名)
记录器.
信息(
正在等待所有对等方的确认。)
活跃版本 =
自身.
等待最终(
预期版本)
状态 = json.loads(
活跃版本.
值)
记录器.
信息(
会合版本%s
已完成。最终状态:%s",
状态[
版本
]
状态,
)
遇合版本号;我们在其中的排名;世界大小
返回
状态[
版本
]
这个排名,
长度(
状态[
参与者])
def 处理现有约会(
自身,
预期版本):
""
处理已存在(状态为'final')的遇合的情况
在原地,我们必须宣布自己等待,并等待直到
下一次会合机会。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
# 如果状态是 'final' -> 增加等待的工人数
# 然后,观察状态变化:
如果不再是最终状态 -> 跳出并重试
如果缺少保活,则销毁并跳出
活跃状态 =
自身.
宣布自身等待(
预期版本)
记录器.
信息(
"已添加自身到等待列表。 rendezvous 全状态:"%s",
活跃状态.value
)
自身.
等待 rendezvous 释放(
预期版本)
记录器.
信息(
"之前存在的会合状态已更改。将重新尝试加入。"
)
def 尝试创建会话(
自身):
""
创建新的会合状态或引发表示意外状态的异常(例如,已存在)。
抛出异常:
rendezvous 错误 - 在意外状态下
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
初始版本是临时的 - 这是为了处理
可能无法完成设置事务的可能性
即“setup”到“joinable”的转换。
活跃版本 =
自身.
客户.
写(
键=
自身.
获取路径("/rdzv/active_version"),
值=json.
压缩包({
状态:
设置}),
prevExist=错误,
ttl=常量_ETCD 设置_TTL,
)
try:
版本计数器 =
自身.
客户端.
获取(
自身.
获取路径(
/rdzv/版本计数器))
版本计数器.value =
字符串(int(
版本计数器.
值) + 1)
自身.
客户端.
更新(
版本计数器)
除了 (etcd.
Etcd 键未找到, etcd.
Etcd 比较失败)
作为 e:
提升
预约错误(
"EtcdRendezvousHandler 意外状态,工作进程需要终止。"
) 来自 e
以下任何失败都将导致声明可重试的 rendezvous 失败。
临时/rdzv/active_version 将过期,然后某人可以...
重新尝试设置过程。
为参与者数据创建目录节点。
自身.
客户端.
写(
键=
自身.
获取路径(f
"/rdzv/v_"{
版本计数器.
值}"),
值=
无,
目录=True,
前存在=
错误,
)
# 发布 rendezvous 版本并信号它已准备好加入。
# 如果 rendezvous 在此之前被设置为关闭,将发生重试,
处理闭合条件的地方。
返回
自身.
客户端.
测试与设置(
键=
自身.
获取路径("/rdzv/active_version"),
值=json.
压缩包(
{
状态:
可加入,
版本:
版本计数器.
值,
参与者:
[]
}
),
前值=
活跃版本.
值,
)
def 加入集结点(
自身,
预期版本):
“辅助方法,用于加入阶段。”
使用比较并交换来将自己添加到集结点状态:
while True:
cas 延迟()
活跃版本,
状态 =
自身.
获取 rdzv 状态()
如果
状态[
状态] !=
可加入:
提升
Etcd rendezvous 重试性失败(
在我们加入之前,rendezvous 状态变成了不可加入。
必须加入下一个。
)
如果
状态[
版本] !=
预期版本:
提升
Etcd rendezvous 立即重试(
rendezvous 版本已更改。必须尝试加入新的。
)
断言
长度(
状态[
参与者]) <
自身.
最大工作线程数, (
逻辑错误:可连接的 rendezvous 应始终有空间
)
这个排名 =
长度(
状态[
参与者])
状态[
参与者].
追加(
此排名)
当达到最小工作者数量,或者将状态更改为冻结时,我们将设置
活跃版本节点为短暂的。
设置 TTL:
可选[int] = None
如果
长度(
状态[
参与者]) ==
自身.
_最大工作线程数:
状态[
状态] =
冻结
状态[
保持活跃] = []
设置 TTL = CONST_ETCD_FROZEN_TTL
elif 长度(
状态[
参与者]) >=
自身._num_min_workers:
设置 TTL = CONST_ETCD_JOINABLE_EPHEMERAL_TTL
try:
比较并交换
active_version = 自身.
客户端.
测试并设置(
键=
自身.
获取路径(
/rdzv/active_version),
值=json.
压缩包(
状态),
上一个值=
活跃版本.
值,
有效期=
设置 TTL,
)
我们成功加入。
返回
活跃版本,
这个排名
除了 etcd.
Etcd 比较失败:
记录器.
信息(
加入 rendezvous CAS 失败,正在重试)
def wait_for_peers(自身,
预期版本):
"辅助方法,用于连接阶段。"
活跃版本,
状态 =
自身.
获取_rdzv_state 状态()
while True:
如果
状态[
状态] ==
"冻结"
和
状态[
版本] ==
预期版本:
# 成功,所有对等方已到达。
返回
活跃版本
elif 状态[
状态] ==
"可加入的"
和
状态[
版本] ==
预期版本:
# 继续等待任何有趣的事件。
活跃版本,
状态 =
自身.
尝试等待状态改变(
etcd 索引=
活跃版本.
etcd 索引 + 1
)
否则:
当前无法进行有效转换
提升
Etcd rendezvous 重试失败(
" rendezvous 状态转换不再可能。必须重新进入。"
)
def 确认会员资格(
自身,
预期版本,
此排名):
确认阶段的辅助方法。
比较并交换循环
while True:
卡死延迟()
活跃版本,
状态 =
自身.
获取 rdzv 状态()
如果
状态[
状态] != "frozen":
提升 EtcdRendezvousRetryImmediately(
"Rendezvous no longer frozen, before we confirmed. "
"必须加入下一个"
)
如果
状态[
版本] !=
预期版本:
提升
Etcd rendezvous 重试立即(
会合版本已更改。必须尝试加入新的一个。
)
该租约键 =
自身.
获取路径(
f/rdzv/v_{
预期版本}
/排名_{
此排名}"
)
自身.
客户端.
集合(this_lease_key,
值=
无, ttl=CONST_WORKER_KEEPALIVE_TTL)
状态[
keep_alives].
追加(this_lease_key)
如果
长度(
状态[
保持活跃]) ==
长度(
状态[
参与者
)]
# Everyone confirmed (this rank is last to do so)
状态[
状态] =
最终
状态[
等待中的工人数] = 0
完成 =
真实
否则:
完成 =
假
try:
# 比较并交换。如果新状态仍然是冻结的,则保持其为临时状态。
活跃版本 =
自身.
客户端.
测试并设置(
键=
自身.
获取路径(
/rdzv/active_version),
值=json.
压缩包(
状态),
前值=
活跃版本.
值,
ttl=None 如果 finalize
否则 CONST_ETCD_FROZEN_TTL,
)
自身.
租赁此排名_停止 =
自身.
设置续租(
此租赁密钥, ttl=CONST_WORKER_KEEPALIVE_TTL
)
返回
活跃版本
除了 etcd.EtcdCompareFailed:
记录器.
信息(
确认成员资格 CAS 失败,正在重试)
def 等待最终(
自身,
预期版本):
辅助方法,用于确认阶段。
活跃版本,
状态 =
自身.
获取_rdzv_state 状态()
while True:
如果
状态[
状态] == "final"
和
状态[
版本] ==
预期版本:
# 成功。这次会合是最终的,我们接受它。
返回
活跃版本
elif 状态[
状态] == "frozen"
和
状态[
版本] ==
预期版本:
继续等待任何有趣的事件。
活跃版本,
状态 =
自身.
尝试等待状态改变(
etcd 索引=
活跃版本.
etcd 索引 + 1
)
否则:
当前无法进行有效转换。
提升
Etcd rendezvous 重试性失败。(
" rendezvous 状态转换不再可能。必须重新进入。"
)
def 自我宣布等待(
自身,
预期版本):
""
通告此工作器正在等待(通过 num_workers_waiting 计数器)加入下一个
集合点,但仅当状态和版本匹配时。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
while True:
cas 延迟()
活跃版本,
状态 =
自身.
获取预约状态()
如果
状态[
状态] !=
最终
或
状态[
版本] !=
预期版本:
提升 EtcdRendezvousRetryImmediately
# 增加计数器以通知额外的等待工作者。
状态["num_workers_waiting"] += 1
try:
活跃版本 =
自身.
客户端.
测试与设置(
键=
自身.
获取路径(
/rdzv/活动版本),
值=json.
压缩包(
状态),
前值=
活跃版本.
值,
)
返回
活跃版本
除了 etcd.
Etcd 比较失败:
记录器.
信息(
宣布自身为等待 CAS 失败,正在重试)
def 等待 rendezvous 释放(
自身,
预期版本):
""
当存在一个处于'final'状态的现有有效 rendezvous 时,我们必须等待下一个机会加入。
这样的机会可能来自:
1. rendezvous 状态被他人更改,在这种情况下,我们将解除阻塞并重试。
2. rendezvous 变得无效,因为至少有一位成员未能续订其
已租用的 keep_alive 节点。我们检测到这一点,并销毁 rendezvous。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
活跃版本,
状态 =
自身.
获取 rendezvous 状态()
while True:
如果
状态[
状态] !=
"最终"
或
状态[
版本] !=
预期版本:
返回
# 检查当前 rendezvous 状态是否有效,即所有
# 成员都活着(正在续租其租约)。
如果不行,就尝试摧毁这次会面,以便可以创建一个新的。
活跃成员 =
自身.
客户.
获取(
自身.
获取路径(f
"/rdzv/v_"{
预期版本}")
)
保持活跃密钥 = [ch.key
为 ch
在
活跃成员.
儿童]
为 key
在
状态[
保持活跃
]:
如果 key not
在
保持活跃键:
# 此参与者未续租其租约。我们将宣布此
# 会合版本为已死亡(但仅当它未发生变化时)
记录器.
信息(
"保活密钥"%s
未更新。,
键)
记录器.
信息(
会合版本%s
不完整。,
预期版本
)
记录器.
信息(
"试图摧毁它。")
# 比较并删除操作。如果比较失败,则抛出异常,
# 这意味着会合点已经被销毁/重新创建/关闭,
# 我们可以尝试重新进入屏障。
自身.
客户.
删除(
键=
自身.
获取路径("/rdzv/active_version"),
prevValue=活跃版本.
值,
)
记录器.
信息(
"毁坏了的会合版本"%s
成功。,
预期版本,
)
我们可以立即返回(并重试)
返回
现有的会合似乎有效,没有理由去摧毁它。
我们只需等待事情发生变化,然后重新检查。
try:
总超时 = (
最大值(
自身._rendezvous_deadline -
时间.
时间(), 0.0) + 1.0
)
自身.
客户.
查看(
键=
自身.
获取路径("/rdzv"),
索引=
活跃版本.
etcd 索引 + 1,
递归=True,
超时=
总超时,
)
除了 (etcd.
Etcd 事件索引已清除, etcd.
EtcdWatch 超时):
通过
如果
时间.
时间() >
自身.
_rendezvous 截止时间:
提升
约会超时错误
活跃版本,
状态 =
自身.
获取 rendezvous 状态()
def 处理加入最后一次调用(
自身,
预期版本,
截止日期):
""
在我们达到最小工作人数后,一个特定的工人开始承担
在关闭连接窗口之前等待额外超时的责任。
如果负责此任务的工人失败,由于 TTL 过期, rendezvous 将被销毁,其他参与者将重新 rendezvous。
在这里,我们期望看到状态 。
这里我们期望看到状态 。
优雅地退出,如果以下任一情况发生:
1. 状态变为
2. 超时发生(达到截止时间),在这种情况下
我们尝试过渡到
否则异常退出。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
活跃版本,
状态 =
自身.
获取_rdzv_state()
while True:
如果
状态[
状态] ==
冻结
和
状态[
版本] ==
预期版本:
# 工作者集在最后调用超时之前已冻结。这是可能的
# 当在超时之前达到 num_max_workers 时。
返回
如果
状态[
状态] !=
"可加入的"
或
状态[
版本] !=
预期版本:
提升
Etcd rendezvous 重试性失败(
" rendezvous 状态转换不再可能。必须重新进入。"
)
# 如果发生超时,尝试状态转换(可加入 -> 冻结)
如果
时间.
时间() >=
截止日期:
状态[
状态] =
冻结
状态[
保持活跃] = []
try:
活跃版本 =
自身.
客户.
测试并设置(
键=
自身.
获取路径("/rdzv/active_version"),
值=json.
压缩包(
状态),
前值=active_version.
值,
ttl=CONST_ETCD_FROZEN_TTL,
)
我们成功使这次会合冻结。
返回
除了 etcd.
Etcd 比较失败:
记录器.
信息(
"最后调用转换 CAS 失败。将重试"
)
cas_delay()
活跃版本,
状态 =
自身.
获取_rdzv_state()
继续
# 超时未发生,因此我们必须刷新 TTL,并等待
注意:我们只想在状态仍然是可加入的情况下刷新 TTL,因此在这里我们使用 CAS,即使我们并没有更改任何数据。
# 状态仍然是可加入的,因此在这里我们使用 CAS,即使我们并没有更改任何数据。
# 即使我们并没有更改任何数据。
try:
活跃版本 =
自身.
客户.
测试并设置(
键=
自身.
获取路径("/rdzv/active_version"),
值=
活跃版本.
值,
前值=
活跃版本.
值,
标题=
`CONST_ETCD_JOINABLE_EPHEMERAL_TTL`,
)
最小化“睡眠过度”
超时 =
最小(
CONST_ETCD_JOINABLE_EPHEMERAL_TTL / 2,
截止日期 -
时间.
时间() + 1.0,
延时 1 秒是可以接受的。
)
活跃版本,
状态 =
自身.
尝试等待状态改变(
etcd 索引=
活跃版本.
etcd 索引 + 1,
超时=
超时
)
除了 etcd.
Etcd 比较失败:
记录器.
信息(
"最后调用 TTL 刷新 CAS 失败,将重试")
cas 延迟()
active_version, 状态 =
自身.
获取 rdzv 状态()
def 关闭设置(
自身):
""
将当前 run_id 的会合标记为'关闭',用于通知其他参与者不要尝试执行(重新)会合。
这在其中一个工作者决定工作已完成时很有用。
当一个工作者决定任务完成时,这样做很有用。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
while True:
active_version, 状态 =
自身.
获取 rdzv 状态()
如果
状态[
状态] ==
"已关闭":
# 已由他人关闭。
返回
状态[
状态] =
"已关闭"
try:
自身.
客户.
测试和设置(
键=
自身.
获取路径("/rdzv/active_version"),
值=json.
压缩包(
状态),
前值=active_version.
值,
)
返回
除了 etcd.
Etcd 比较失败:
记录器.
信息(
设置关闭 CAS 失败,正在重试)
卡死延迟()
def 获取 rdzv 状态(
自身):
活跃版本 =
自身.
客户.
获取(
键=
自身.
获取路径("/rdzv/active_version"))
返回 active_version, json.loads(active_version.
值)
def 尝试等待状态改变(
自身,
etcd 索引,
超时=
无):
不要超过整体截止时间(至少不要超过 1 秒)
总超时 =
最大值(
自身.
rendezvous_deadline
rendezvous deadline
会合截止日期 - 时间.
时间(), 0.0) + 1.0
超时 =
总体超时
如果
超时
是 None
否则
最小(
超时,
总体超时)
try:
自身.
客户.
观看(
自身.
获取路径("/rdzv/active_version"),
索引=
etcd 索引,
超时=
超时
)
除了 (etcd.
Etcd 事件索引已清除, etcd.
Etcd 监视超时):
通过
如果
时间.
时间() >
自身._rendezvous_deadline:
提升
约会超时错误
很遗憾,我们必须再次获取以获取最后一个 etcd 索引。
返回
自身.
获取 rdzv 状态()
def 获取路径(
自身,
路径):
如果 not
路径.
以...开头(
根目录):
路径 =
根目录 +
路径
返回 f"{
自身.
前缀}
运行_{
自身.
_运行_id}{
路径}"
def 如果不存在则创建路径(
自身,
完整路径, ttl=
无):
try:
自身.
客户端.
写(
键=
完整路径,
值=
无,
目录=True, prevExist=
错误, ttl=ttl
)
除了 etcd.
Etcd 已存在:
通过
def 设置租约续订(
自身,
完整路径, ttl):
# 注意:为了使临时密钥 TTL 续订(~租约)正确工作,
确保不要调用任何可能导致长时间阻塞的方法
释放 Python 的 GIL!这是一个调用 pybind11 的例子
扩展函数是阻塞的/长时间运行的,但不是
进行 GIL 的局部释放。
def 租赁工作器(
客户端,
路径, ttl,
停止事件):
while True:
try:
客户端.
刷新(
路径, ttl=ttl)
除了 etcd.
Etcd 键未找到:
断开
除了
连接被拒绝错误:
# 此错误通常在测试期间发生,当服务器已经终止但
Python 垃圾回收器尚未调用__del__方法。
断开
如果
停止事件.
等待(
超时=ttl / 2):
断开
lease_stop_event = 线程.
活动()
lease_thread = 线程.Thread(
目标=lease_worker,
参数=(
自身.
客户端,
完整路径, ttl,
租约停止事件)
)
租约线程.
守护进程 =
真实
租约线程.
开始()
返回
租赁停止事件
def 存储额外数据(
自身,
rdzv 版本,
键,
值):
节点 =
自身.
获取路径(f
"/rdzv/v_"{
rdzv 版本}/extra_data")
try:
第一次存储任何内容时:
额外数据 =
自身.
客户端.
写(
键=
节点,
值=json.
压缩包({
键:
值}), prevExist=
假
)
返回
除了 etcd.
Etcd 已存在:
通过
# CAS 循环,确保我们不会丢失并发存储。
while True:
# 我们从不删除 extra_data。这里的失败应该是致命的,不需要特殊处理。
extra_data = 自身.
客户端.
获取(
节点)
new_extra_data_value = json.loads(额外数据.
值)
新的额外数据值[
键] = value
try:
额外数据 =
自身.
客户端.
测试与设置(
键=
节点,
值=json.
压缩包(
新的额外数据值),
前值=
额外数据.
值,
)
返回
除了 etcd.
Etcd 比较失败:
记录器.
信息(
存储 extra_data CAS 失败,正在重试)
时间.
睡眠(0.1)
def 加载 extra_data(
自身,
rdzv 版本,
键,
超时=
无):
节点 'extra_data' 本身及其所在的目录:
节点 =
自身.
获取路径(f
/rdzv/v_{
rdzv 版本}/extra_data")
node 目录 =
自身.
获取路径(f
/rdzv/v_{
rdzv 版本}")
# TODO: 实现超时
# https://github.com/pytorch/elastic/issues/12
while True:
# 结合等待节点本身及其内部键
根 =
自身.
客户端.
获取(node_dir)
# 查找是否存在 extra_data 节点
extra_data = [n 为 n
在
根.
子代
如果 n.key ==
节点]
断言
长度(extra_data)
≤ 1
# 节点中存在 extra_data 节点,检查所需键是否在其中。
如果
长度(extra_data) == 1:
extra_data_dict = json.loads(extra_data[0].值)
如果 key
在
额外数据字典:
返回
额外数据字典[
键]
# 'extra_data' 节点不存在,或者键尚未发布。
# 等待 extra_data 节点上的有趣事件并重试。
try:
自身.
客户端.
查看(
节点,
索引=
根.
etcd 索引 + 1)
除了 (etcd.
Etcd 事件索引已清除, etcd.
Etcd 监视超时):
通过
def 设置键值存储(
自身,
rdzv 版本):
存储路径 =
自身.
获取路径(f
"/rdzv/v_"{
rdzv 版本}
/kv)
自身.
如果不存在则创建路径(
存储路径)
返回
Etcd 存储(
etcd 客户端=
自身.
客户端,
etcd 存储前缀=
存储路径)
def _创建 etcd 客户端(
参数:
会合参数) -> etcd.
客户端:
从指定的 ``RendezvousParameters`` 创建一个新的 ``etcd.Client``。
hostname, 端口 =
解析 rendezvous 端点(
参数.
端点, 2379)
# 通信协议
协议 =
参数.
配置.
获取(
"协议")
如果
协议
是
无:
协议 =
http
否则:
如果
协议 != "http"
和
协议 != "https":
提升 ValueError(
"etcd 协议必须是 HTTP 或 HTTPS。")
SSL 客户端证书
ssl_cert = 参数.
配置.
获取(
证书)
如果 ssl_cert
是 not
无:
证书密钥 =
参数.
配置.
获取(
密钥)
如果
证书密钥
是 not
无:
etcd 客户端期望证书密钥作为第二个元素
`cert` 元组的数量
ssl 证书 = (ssl_cert,
证书密钥)
根证书
证书 =
参数.
配置.
获取(
cacert)
返回 etcd.
客户端(
hostname,
端口,
协议=
协议,
证书=ssl_cert,
ca 证书=
ca 证书,
允许重连=True,
)
torch.distributed "静态"注册的处理程序
def 创建_rdzv_handler 处理程序(
参数:
会合参数) ->
会合处理器:
""
使用方法:
::
rdzv_params = RendezvousParameters(
backend="etcd",
endpoint="192.168.0.42:2379",
run_id="123",
min_nodes=4,
max_nodes=8,
timeout=300,
last_call_timeout=30,
etcd_prefix="自定义前缀",
protocol="https",
cacert="/etc/kubernetes/certs/ca.crt",
cert="/etc/kubernetes/certs/client.crt",
key="/etc/kubernetes/certs/client.key"
# -- or --
rdzv_params = RendezvousParameters(
后端="etcd",
endpoint="192.168.0.42:2379",
run_id="123",
min_nodes=4,
max_nodes=8)
etcd_rdzv_handler = create_etcd_rendezvous_handler(rdzv_params)
哪里:
run_id - 唯一标识符,用于表示本次训练作业实例
min_nodes - 预期加入 rendezvous 的最小工作节点数
max_nodes - 允许加入 rendezvous 的最大工作节点数
默认未指定 min_workers。
超时 - 期望 next_rendezvous 发生的总超时时间
成功;否则将引发 RendezvousTimeoutError 异常
默认值为 600(10 分钟)。
最后调用超时 - 在达到最小工作人数后额外的等待时间("最后调用")
最小工作人数已达到。
默认为 30 秒。
etcd_prefix - 从 etcd 根开始的路径前缀,其中包含所有
etcd 节点将被创建。
默认为"/torchelastic/p2p"。
协议 - http(默认)或 https 访问 etcd。
cacert - 访问 etcd 的 CA 证书,仅与 https 相关。
证书 - 用于访问 etcd 的客户端证书,仅与 https 一起使用有意义。
密钥 - 用于访问 etcd 的客户端密钥,仅与 https 一起使用有意义。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
客户端 =
创建 etcd 客户端(
参数)
etcd 前缀 =
参数.
获取(
"etcd 前缀",
/torchelastic/p2p)
rdzv = Etcd rendezvous(
客户端=
客户端,
前缀=
etcd 前缀,
run_id=参数.run_id,
最小工作节点数=
参数.
最小节点数,
num_max_workers=参数.
最大节点数,
超时=
参数.get_as_int(
超时, _DEFAULT_TIMEOUT),
最后一次调用超时时间=
参数.get_as_int(
"last_call_timeout", 默认最后调用超时
),
)
返回
Etcd rendezvous 处理器(
rdzv 实现=rdzv,
本地地址=
参数.
本地地址,
)