# mypy: 忽略错误
版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。
导入 abc
导入 json
导入
操作系统
导入
信号
导入
套接字
导入
时间
导入
跟踪回溯
导入
警告
来自
集合
导入 defaultdict
来自 contextlib
导入 contextmanager
来自 dataclasses
导入
数据类,
字段
来自
枚举
导入
枚举
来自
打字
导入
任意,
可调用,
可选,
联合
导入 torch.distributed.elastic.rendezvous
作为 rdzv
导入 torch.distributed.elastic.utils.store
作为 store_util
来自 torch.distributed.elastic.events
导入
活动,
事件源,
记录
来自 torch.distributed.elastic.metrics
导入
专业, put_metric
来自 torch.distributed.elastic.multiprocessing
导入
进程失败, SignalException
来自 torch.distributed.elastic.rendezvous
导入 RendezvousGracefulExitError
来自
torch.distributed.elastic.utils.日志
导入
获取日志记录器
__all__ = [
"WorkerSpec",
"Worker",
"WorkerState",
"WorkerGroup",
运行结果,
弹性代理,
简单弹性代理,
]
_终端状态同步 ID = "torchelastic/agent/terminal_state"
默认角色 =
"默认"
日志记录器 =
获取日志记录器(__name__)
[文档]@dataclass
类 WorkerSpec:
"""特定类型工作者的蓝图信息。
对于给定的角色,必须只存在一个工作规范。
工作规范应跨所有节点(机器)保持一致,
即每个节点为特定规范运行相同数量的工作进程。
参数:
角色定义:为具有此规范的工作进程定义的用户角色
本地世界大小:运行本地工作进程的数量
fn: (已弃用,请使用 entrypoint 代替)
entrypoint:工作函数或命令
args:传递给 ``entrypoint`` 的参数
rdzv_handler: 处理该组工作进程的 rdzv
max_restarts: 工作进程的最大重试次数
monitor_interval: 每 ``n`` 秒监控工作进程的状态
master_port: 在 rank 0 上运行 c10d 存储的固定端口
如果未指定,则将选择一个随机的空闲端口
master_addr:将 master_addr 固定为在 rank 0 上运行 c10d 存储
如果未指定,则将选择 agent rank 0 上的主机名
redirects:将标准流重定向到文件,
有选择性地进行重定向
通过传递映射来对本地排名进行排序
tee:将指定的标准流(s)输出到控制台和文件,
有选择性地将本地排名的 tee 输出到控制台,通过传递映射,
优先于“重定向”设置。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
角色:
字符串
本地世界大小:
整型
rdzv 处理器: rdzv.
rendezvous 处理器
函数:
可选[
可调用] = None
# TODO @kiuk - 将入口点设为必填字段
入口:
联盟[
可调用,
字符串,
无] = None
参数:
元组 = ()
最大重启次数:
整型 = 3
监控间隔:
浮点数 = 0.1
主机端口:
可选[int] = None
主机地址:
可选[
字符串] = None
本地地址:
可选[
字符串] = None
定义 __post_init__(
自身):
断言
自身.
本地世界大小 > 0
断言
自身.
监控间隔 > 0
如果
自身.
函数:
warnings.警告(
"WorkerSpec.fn 将被弃用,"
"请使用 WorkerSpec.entrypoint 代替",
分类=
弃用警告,
)
自身.
入口 =
自身.fn
断言
自身.
入口
[文档] def get_entrypoint_name(self):
获取入口点名称。
如果入口点是函数(例如 `Callable`),则返回其 `__qualname__`。
否则如果入口点是二进制(例如 `str`),则返回二进制名称。
"""
如果 isinstance(self.entrypoint, str):
返回 os.path.basename(self.entrypoint)
else:
assert self.entrypoint is not None
return self.entrypoint.__qualname__
[文档]
类
工作者:
"""一个工作实例。
将其与表示工作规格的 ``WorkerSpec`` 进行对比,
工作者。一个“工作者”由一个“工作者规范”创建。一个“工作者”是用来
一个 `WorkerSpec` 对象相当于一个类。
工人的“id”被解释
通过“ElasticAgent”的具体实现。对于本地
代理,可能是工作进程的 ``pid (整数)``,对于远程
代理,可以编码为 ``主机:端口 (字符串)``。
参数:
id (任何类型): 唯一标识一个工作进程(由代理解释)
local_rank (整数): 工作进程的本地排名
global_rank (int):全局排名(整数)
role_rank (int):角色排名(整数)
world_size (int):全局工作节点数(整数)
role_world_size (int):同一角色工作节点数(整数)
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
__slots__ = [
id,
本地排名,
全局排名,
角色排名,
"世界大小",
角色世界大小,
]
定义
初始化(
自身,
本地排名: int,
全球排名:
整型 = -1,
角色等级:
整型 = -1,
世界大小:
整型 = -1,
角色世界大小:
整型 = -1,
):
# 此工作者的唯一标识符
自身.id:
任何 = None
# 在被监控的具有相同角色的工作者中,该工作者的排名
由相同的“agent”实例执行。
自身.
本地排名:
整型 =
本地排名
在所有角色中的所有工作者中,该工作者的排名。
在所有“agent”实例中。
重新 rendezvous 之间,全局排名不稳定。
自身.
全球排名:
整型 =
全球排名
# 工作员在其角色所有工作员中的排名
# 在所有 ``agent`` 实例中。
# # 角色排名在重新 rendezvous 之间不稳定。
自身.
角色排名:
整型 =
角色等级
# 全球工人总数(由于弹性)
# 世界大小可能在重新会合之间发生变化。
自身.
世界大小:
整型 =
世界大小
# 具有相同角色的工人总数(由于弹性)
重新会面时,角色世界大小可能会改变。
自身.
角色世界大小:
整型 =
角色世界大小
定义 __str__(
自身):
返回 (
flocal_rank={
自身.
本地排名}
global_rank={
自身.
全球排名}"
frole_rank={
自身.
角色排名}
world_size={
自身.
世界大小}"
f",角色世界大小="{
自身.
角色世界大小}"
)
定义 __repr__(
自身):
返回
字符串(
自身)
[文档]
类 WorkerState(
字符串,
枚举):
"""一个“WorkerGroup”的状态。
工人组中的工人在状态上作为一个单元进行更改。如果一个单独的工人
在一个工作组中失败,整个集合被视为失败
未知 - 代理丢失了工作组状态,无法恢复
初始化 - 工作组对象已创建,尚未启动
健康 - 工人跑步且健康
不健康 - 工人跑步且不健康
停止 - 工人被代理中断停止
成功 - 工人完成跑步(退出码 0)
失败 - 工作者未能成功完成(退出码!0)
工作组从初始的 ``INIT`` 状态开始,
然后过渡到 ``HEALTHY`` 或 ``UNHEALTHY`` 状态,
最后达到终端的 ``SUCCEEDED`` 或 ``FAILED`` 状态。
工人组可以被中断并临时置于“停止”状态
由代理操作。处于“停止”状态的工人将被安排重启
在不久的将来由代理完成。一些将工人投入的例子
停止状态是:
1. 工作组故障|观察到不健康
2. 检测到成员变更
当对工作组执行操作(启动、停止、rdzv、重试等)失败时
并且导致操作仅部分应用于工作组
状态将被设置为“未知”。通常这种情况发生在代理在状态变化事件中未捕获/未处理的异常时。
代理在状态变化事件中未捕获/未处理的异常。代理不期望从“未知”状态恢复工作组,最好是自行终止并允许作业管理器重试该节点。
未知状态下的工作组。最好是自行终止并允许作业管理器重试该节点。
自行终止并允许作业管理器重试该节点。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
未知 =
未知
初始化 =
初始化
健康 =
健康
不健康 =
"不健康"
停止 =
"停止"
成功 =
成功
失败 =
失败
[文档] @staticmethod
def is_running(state: "WorkerState") -> bool:
"""返回 Worker 的状态。
返回:
如果工作状态表示工作仍在运行,则返回 True
(例如,该进程存在但不一定健康)。
"""
返回状态是否在 WorkerState.HEALTHY 或 WorkerState.UNHEALTHY 中
[文档]class WorkerGroup:
一组 ``Worker`` 实例。
该类定义了由 ``ElasticAgent`` 管理的 ``WorkerSpec`` 对应的一组 ``Worker`` 实例。该工作组是否包含跨实例的工人取决于代理的实现。
工作组是否包含跨实例的工人取决于代理的实现。
"""
__slots__ = [
"spec"
"workers"
"store"
"group_rank"
"group_world_size"
"state"
"master_addr"
"master_port"
]
def __init__(self, spec: WorkerSpec):
self.spec = spec
self.workers = [Worker(local_rank=i) for i in range(self.spec.local_world_size)]
# assigned after rdzv
self.store = None
self.group_rank = None
self.group_world_size = None
self.master_addr = None
self.master_port = None
self.state = WorkerState.INIT
类
_角色实例信息:
该类由智能体用于与其他智能体交换信息。
这些信息用于确定智能体在异构环境中管理的工人排名,
其中不同的智能体可以拥有不同的信息。
不同数量的工人。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
__slots__ = [角色,
"排名",
本地世界大小]
定义
初始化(
自身,
角色:
字符串,
排名: int,
本地世界大小: int):
r初始化代理类实例。
参数:
role (str): 用户定义的具有此规范的工人角色
rank (int): 代理的排名
local_world_size (int): 运行的本地工作进程数量
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
自身.
角色 =
角色
自身.
排名 =
排名
自身.
本地世界大小 =
本地世界大小
定义
序列化(
自身) ->
字节:
字典数据 = {
角色:
自身.
角色,
"排名":
自身.
排名,
本地世界大小:
自身.
本地世界大小,
}
返回 json.
压缩包(
字典数据).
编码(
编码="UTF-8")
@staticmethod
定义
反序列化(
数据:
字节):
字典数据 = json.loads(
数据.
解码(
编码="UTF-8"))
返回
角色实例信息(
字典数据[
角色
]
字典数据[
"排名"
]
字典数据[
本地世界大小]
)
@staticmethod
定义
比较(obj1, obj2) -> int:
如果 obj1.
角色 == obj2.
角色:
返回 obj1.
排名 - obj2.
排名
elif obj1.角色 > obj2.
角色:
返回 1
否则:
返回 -1
@staticmethod
定义
查找角色边界(
角色信息:
列表,
角色:
字符串) ->
元组[int, int
]:
开始索引,
结束索引 = -1, -1
为
索引,
角色信息
在
列举(
角色信息列表):
如果
角色信息.
角色 ==
角色:
如果
起始索引 == -1:
起始索引 =
索引
结束索引 =
索引
返回 (
开始索引,
结束索引)
[文档]@数据类
class RunResult:
"""返回工作执行的成果。
运行结果遵循“全有或全无”策略,只有当此代理管理的所有本地工作都成功完成时,运行才成功。
只有当此代理管理的所有本地工作都成功完成时,运行才成功。
如果结果是成功的(例如 ``is_failed() = False``)那么 ``return_values``
字段包含由本代理管理的工人产生的输出(返回值)
根据他们的全球排名。即 ``result.return_values[0]`` 是返回值
全局排名 0
.. 注意:: ``return_values`` 仅在 worker 入口点是一个函数时才有意义。指定为二进制入口点的 worker 并没有规范化的返回值,``return_values`` 字段没有意义,可能为空。
作为函数。指定为二进制入口点的 worker 并没有规范化的返回值,``return_values`` 字段没有意义,可能为空。
并没有规范化的返回值和 ``return_values`` 字段没有意义,可能为空。
可能为空。
如果 `is_failed()` 返回 `True`,则 `failures` 字段包含失败信息,再次强调,这是通过失败工作者的全局排名映射的。
`return_values` 和 `failures` 中的键是互斥的,也就是说,一个工作者的最终状态只能是以下之一:成功、失败。工作者故意
`return_values` 和 `failures` 中的键是互斥的,也就是说,一个工作者的最终状态只能是以下之一:成功、失败。工作者故意
`return_values` 和 `failures` 中的键是互斥的,也就是说,一个工作者的最终状态只能是以下之一:成功、失败。工作者故意
根据代理的重启策略终止的,不表示
在 ``return_values`` 或 ``failures`` 中。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
状态:WorkerState
return_values: dict[int, Any] = field(default_factory=dict)
failures: dict[int, ProcessFailure] = field(default_factory=dict)
def is_failed(self) -> bool:
return self.state == WorkerState.FAILED
定义
获取完全限定主机名() ->
字符串:
返回
套接字.
获取完全限定域名(
套接字.
获取主机名())
[文档]
类 ElasticAgent(abc.ABC):
一个负责管理一个或多个工作进程的代理进程。
工作进程假定是常规的分布式 PyTorch 脚本。
当代理创建工作进程时,代理提供必要信息,以便工作进程正确初始化 torch 进程组。
代理与工作进程之间的确切部署拓扑和比例取决于具体应用。
代理与工作进程之间的确切部署拓扑和比例取决于具体应用。
代理与工作进程之间的确切部署拓扑和比例取决于具体应用。
关于代理的具体实现和用户的职位安置
偏好。例如,要在 GPU 上运行分布式训练作业,使用 8 个训练师(每个 GPU 一个)可以:
1. 使用 8 个单 GPU 实例,每个实例放置一个代理,管理
4. 使用 8 个单 GPU 实例,每个实例放置一个代理,管理
每个代理 1 个工人。
2. 使用 4 个双 GPU 实例,每个实例放置一个代理,进行管理。
2 个工人每个代理。
3. 使用 2 个四 GPU 实例,每个实例放置一个代理,进行管理。
每个代理 4 个工人。
使用 1 个 x8 GPU 实例,每个实例放置一个代理,进行管理。
每个代理 8 个工人。
使用
::
group_result = agent.run()
如果 group_result.is_failed():
# 工作进程失败
failure = group_result.failures[0]
logger.exception("worker 0 失败,退出代码为:%s", failure.exit_code)
否则:
返回 group_result.return_values[0] # 返回排名 0 的结果
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
[文档] @abc.abstractmethod
def run(self, role: str = DEFAULT_ROLE) -> RunResult:
"""运行代理。
支持在失败时重试工作组,最多重试 `max_restarts` 次。
返回:
执行结果,包含返回值或
每个工作者的全局排名映射的失败详情。
Raises:
异常 - 与工作进程无关的其他失败
"""
raise NotImplementedError
[文档] @abc.abstractmethod
def get_worker_group(self, role: str = DEFAULT_ROLE) -> WorkerGroup:
"""返回给定 ``role`` 的 ``WorkerGroup``。
注意,工作组是一个可变对象,因此在
多线程/进程环境可能会改变状态。
实现者被鼓励(但不是必须)返回
防御性的只读副本。
"""
抛出未实现异常
[文档]
类 SimpleElasticAgent(ElasticAgent):
“一个管理特定类型工作角色的 ``ElasticAgent``。
管理单个 ``WorkerSpec`` 的 ``WorkerGroup`` 工作者的 ``ElasticAgent``
例如,一种特定的工作者角色。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
定义
初始化(
自身,
规格: WorkerSpec,
退出屏障超时:
浮点数 = 300):
自身.
工作组 =
工作组(
规格)
自身.
剩余重启次数 =
自身.
工作组.
规格.
最大重启次数
自身._store = None
自身.
_退出屏障超时 = exit_barrier_timeout
自身.
_总执行时间 = 0
定义
获取工作组(
自身,
角色:
字符串 =
默认角色) ->
工作组:
返回
自身.
_工人组
[文档] @abc.抽象方法
def _start_workers(self, worker_group: WorkerGroup) -> dict[int, Any]:
r"""启动 ``worker_group.spec.local_world_size`` 个工作者。
这是根据工作者组的工作者规范进行的。
返回一个将 ``local_rank`` 映射到工作者 ``id`` 的映射。
"""
抛出未实现异常
[文档] @abc.抽象方法
def _stop_workers(
self, 工作组: WorkerGroup, 是否重启: bool = False
) -> None:
停止给定工作组中的所有工作者。
实现者必须处理所有由定义的状态的工作者。
``WorkerState``。也就是说,它必须优雅地处理停止
不存在的工人、不健康的(卡住)工人等情况
"""
抛出未实现异常
[文档] @abc.abstractmethod
def _monitor_workers(self, worker_group: WorkerGroup) -> RunResult:
r"""检查 ``worker_group`` 中的工作者。
此函数还返回工作者组的新状态。
"""
抛出未实现异常
[文档] @abc.抽象方法
def _shutdown(
self, death_sig: signal.Signals = signal.SIGTERM, is_restart: bool = False
) -> None:
清理代理工作期间分配的任何资源。
Args:
死亡信号:发送给子进程的信号,默认为 SIGTERM
"""
抛出未实现异常
[文档] @prof
定义 _rendezvous(
自身,
工作组:
工作组) ->
无:
r运行指定工作规范的工作者的 rendezvous。
为工作者分配新的全局排名和世界大小。
更新工作组 rendezvous 存储。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
规范 =
工作组.
规范
与
自身.
记录时长(
约会):
约会信息 =
规格.
约会处理程序.next_rendezvous()
存储 =
约会信息.
存储
组排名 = rdzv_info.
排名
群组世界大小 = rdzv_info.
世界大小
# 主地址/端口号可以被显式覆盖
# TODO:BC - 仅针对静态 rdzv,可以进一步简化
主机地址 =
规格.
主机地址
或
路由信息.
引导存储信息.
主机地址
主端口 =
规格.
主端口
或
rdzv 信息.
引导存储信息.
主端口
自身._store =
存储
与
自身.
录制时长(
"分配工人排名"):
工人 =
自身.
_分配工人排名(
店铺,
群组排名,
世界组大小,
规范
)
工作组.
工人 =
工人
工作组.
存储 =
存储
工作组.
组排名 =
组排名
工作组.
群组世界大小 =
群组世界大小
工作组.
主机地址 =
主机地址
工作组.
主机端口 =
主机端口
重启次数 =
规格.
最大重启次数 -
自身.
剩余重启次数
记录器.
信息(
"[%(角色)s
] 工作者 rendezvous 完成。结果:
输入文本翻译为简体中文为:\n"
" 重启次数="
%(重启次数)s
输入文本翻译为简体中文为:\n"
" 主机地址="%(master_addr)s
输入文本翻译为简体中文为:\n"
" master_port="%(master_port)s
输入文本翻译为简体中文为:\n"
" group_rank="%(group_rank)s
输入文本翻译为简体中文为:\n"
" group_world_size="%(group_world_size)s
输入文本翻译为简体中文为:\n"
" local_ranks="%(local_ranks)s
输入文本翻译为简体中文为:\n"
" 角色排名="%(role_ranks)s
输入文本翻译为简体中文为:\n"
" 全球排名="%(global_ranks)s
输入文本翻译为简体中文为:\n"
" 角色世界大小=%(role_world_sizes)s
输入文本翻译为简体中文为:\n"
" 全球世界大小="
全球世界大小
输入文本翻译为简体中文为:\n",
{
角色:
规格.
角色,
重启次数:
重启次数,
"master_addr": 主机地址,
"master_port": 主机端口,
"group_rank": 群组排名,
"group_world_size": 世界组大小,
本地排名: [
工人.
本地排名
为
工作者
在
工作者
]
角色排名: [
工人.
角色排名
为
工作者
在
工人
]
全球排名: [
工人.
全球排名
为
工作者
在
工人
]
角色世界大小: [
工人.
世界角色大小
为
工作者
在
工作者
]
全局世界大小: [
工人.
世界大小
为
工作者
在
工作者
]
},
)
# pyre-fixme[56]: Pyre 无法推断装饰器的类型
`torch.distributed.elastic.metrics.prof`
[文档] @prof
定义
分配工人等级(
自身,
店铺,
群组排名: int,
世界组大小: int,
规格: WorkerSpec
) -> 列表[
工人
]:
确定工作进程的正确排名。
快速路径:当所有工作进程具有相同的角色和世界大小时。我们计算全局排名为 group_rank * group_world_size + local_rank。并且 role_world_size 与 global_world_size 相同。在此情况下不使用 TCP 存储。
的全局排名为 group_rank * group_world_size + local_rank。并且 role_world_size 与 global_world_size 相同。在此情况下不使用 TCP 存储。
的 global_world_size 相同。在此情况下不使用 TCP 存储。
在这种情况下。这仅在用户设置环境变量
`TORCH_ELASTIC_WORKER_IDENTICAL` 为 1 时启用。
时间复杂度:每个工作器 O(1),总体 O(1)
慢路径:当工作器具有不同的角色和世界大小时。我们使用
以下算法:
1. 每个代理将其配置(group_rank、group_world_size、num_workers)写入公共存储。
,num_workers)到公共存储。
2. 排名 0 的代理从存储中读取所有 role_info。
确定每个代理的工人排名。
3. 确定全局排名:全局排名通过计算它前面所有工人的 local_world_size 的累积和来计算。
由于效率原因,每个工人被分配一个基本的全局排名。
为了效率,每个工人被分配一个基本的全局排名。
使得其工作者在范围 [base_global_rank,] 内
base_global_rank + local_world_size).
4. 确定角色等级:角色等级是通过算法确定的
在第 3 点中,除了排名是计算得出的之外
关于角色名称的尊重。
5. 等级 0 的代理将分配的等级写入存储。
6. 每个代理从存储中读取分配的等级。
时间复杂度:每个工作器 O(1),等级 0 O(n),总体 O(n)
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
如果
操作系统.
环境.
获取(
火炬弹性工作器相同标识, "0") == "1":
全球世界大小 =
群组世界大小 *
规格.
本地世界大小
基础全局排名 =
组排名 *
规格.
本地世界大小
基础角色排名 =
基础全球排名
角色世界大小 =
全球世界大小
否则:
角色信息前缀 =
torchelastic/角色信息/
分配的排名前缀 =
torchelastic/分配的排名/
代理角色信息 =
角色实例信息(
规格.
角色,
群组排名,
规格.
本地世界大小
)
店铺.
集合(f"{
角色信息前缀}{
群组排名}",
代理角色信息.
序列化())
# tcp 存储与 rank 0 协同,因此我们可以利用它进行额外的计算以减少整体操作次数。
如果
组排名 == 0:
角色信息字节 =
店铺.
多次获取(
[ftorchelastic/角色信息/{i}"
为 i
在
范围(
世界组大小
]
)
角色信息 = [
_角色实例信息.
反序列化(
信息字节)
为 info_bytes
在 role_infos_bytes
]
role_sizes = defaultdict(lambda: 0)
全局大小 = 0
为
角色信息
在
角色信息列表:
角色大小[
角色信息.
角色] +=
角色信息.
本地世界大小
全局大小 +=
角色信息.
本地世界大小
基础全局排名 = 0
角色等级 = defaultdict(lambda: 0)
键 = []
values = []
为 i,
角色信息
在
列举(
角色信息列表):
键.
追加(f"{
分配的等级前缀}{i}")
值.
追加(
json.压缩包(
[
基础全球排名,
全球规模,
角色排名[
角色信息.
角色
]
角色尺寸[
角色信息.
角色
]
]
)
)
基础全球排名 +=
角色信息.
本地世界大小
角色排名[
角色信息.
角色] +=
角色信息.
本地世界大小
店铺.
多集(
键,
值)
# 获取将阻塞,直到数据在存储中可用。
(
基础全局排名,
全球世界大小,
基础角色排名,
世界角色大小,
) = json.loads(店铺.
获取(f"{
分配的排名前缀}{
群组排名}"))
工作者 = []
为
本地排名
在
范围(
规格.
本地世界大小):
工作者 =
工作员(
本地排名=
本地排名,
全球排名=
基础全球排名 +
本地排名,
角色排名=
基础角色排名 +
本地排名,
世界大小=
全球世界大小,
角色世界大小=
世界角色大小,
)
工作者.
追加(
工人)
返回
工作者
# pyre-fixme[56]: Pyre 无法推断装饰器的类型
`torch.distributed.elastic.metrics.prof`
[文档] @prof
def _initialize_workers(self, worker_group: WorkerGroup) -> None:
r"""为 worker_group 启动一组全新的工作者。"""
本质上,这是一个随后调用 `start_workers` 的会合。
调用者应首先调用 `_stop_workers()` 来停止运行中的工作进程。
在调用此方法之前。
乐观地设置工作进程组的状态。
刚刚开始作为“HEALTHY”并委托实际监控
国家到 ``_monitor_workers()`` 方法
```python
# 输入文本
input_text = '"""'
# 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 假设的翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
role = 工作组规范的角色
logger.info("[%s] 正在 rendezvous 工作组", 角色)
# TODO 在停止工作者后,至少等待 monitor_interval*2,以便不同节点上的工作者在等待 rdzv 障碍前集体操作失败
# 在 rdzv 障碍上等待之前,确保节点进入 rdzv
# 的方式,这样我们就能确保节点进入 rdzv
大约在同一时间减少 rdzv 超时错误
self._rendezvous(worker_group)
logger.info("[%s] 开始工作组", role)
worker_ids = self._start_workers(worker_group)
for local_rank, w_id in worker_ids.items():
worker = worker_group.workers[local_rank]
worker.id = w_id
worker_group.state = WorkerState.HEALTHY
# pyre-fixme[56]: Pyre 无法推断装饰器的类型
`torch.distributed.elastic.metrics.prof`
[文档] @prof
def _restart_workers(self, worker_group: WorkerGroup) -> None:
重启(停止、会合、启动)组内所有本地工作者。
role = worker_group.spec.role
logger.info("[%s] 停止工作者组", role)
self._stop_workers(worker_group, is_restart=True)
worker_group.state = WorkerState.STOPPED
self._initialize_workers(worker_group)
# pyre-fixme[56]: Pyre 无法推断装饰器的类型
# `torch.distributed.elastic.metrics.prof`
@prof
定义 run(
自身,
角色:
字符串 =
默认角色) ->
运行结果:
开始时间 =
时间.
单调的()
调用了关机:
布尔类型 =
假
try:
结果 =
自身.
_调用运行(
角色)
自身.
_总执行时间 = int(
时间.
单调的() -
开始时间)
自身.
_记录指标(
结果)
自身.
_记录工作事件(
结果)
返回
结果
除了
遇见优雅退出错误
作为 e:
记录器.
信息(
"Rendezvous 优雅退出:"%s", e)
除了
信号异常
作为 e:
记录器.
警告(
"收到:"%s
死亡信号,关闭工作者", e.sigval)
自身.
关闭(e.sigval)
调用关闭 =
真实
抛出异常
最后:
如果 not shutdown_called:
自身.
关闭()
记录执行时间,以防运行过程中出现任何异常。
自身._total_execution_time = int(
时间.
单调的() -
开始时间)
定义
获取事件失败(
自身) ->
活动:
返回
自身.
构建事件(
状态=
失败,
源=
事件源.
代理,
原始错误=
跟踪回溯.format_exc(),
)
定义
事件成功获取(
自身) ->
活动:
返回
自身.
构建事件(
状态=
成功,
源=
事件源.
代理,
)
定义
记录工作者事件(
自身,
结果:
运行结果) ->
无:
为
工作者
在
自身.
工作者组.
工作者:
失败 =
结果.
失败次数.
获取(
工人.
全球排名)
状态:
字符串 =
自身.
获取工作者状态(
工人,
结果)
原始错误 = json.
压缩包(
失败.
错误文件数据)
如果
失败
否则 None
记录(
自身.
构造事件(
状态,
事件源.
工作者,
工人,
原始错误))
定义
获取工作状态(
自身,
工人:
工作者,
结果:
运行结果) ->
字符串:
失败 =
结果.
失败次数.
获取(
工人.
全球排名)
如果
结果.
状态
在 {
工作状态.
不健康,
工作状态.
失败}
和 not
失败:
# 工作员被 torchelastic 代理通过 SIGTERM 信号终止
返回
"已终止"
elif 失败
或
工人.
全球排名
在
结果.
返回值:
返回
结果.
状态.value
否则:
抛出异常 ValueError(f
"未知工作员:"{
工人.
全球排名}")
@contextmanager
定义
录制时长(
自身,
状态:
字符串):
开始时间 =
时间.
性能计数器()
try:
产生
最后:
结束时间 =
时间.
性能计数器()
持续时间(毫秒) = (
结束时间 -
开始时间) * 1000
记录(
自身.
构建事件(
状态=
状态,
源=
事件源.
代理,
持续时间(毫秒)=
持续时间(毫秒)
)
)
定义
构建事件(
自身,
状态:
字符串,
源:
事件源,
工人:
可选[
工人] =
无,
原始错误:
可选[
字符串] =
无,
持续时间(毫秒):
可选[float] =
无,
) -> 活动:
wg = 自身.
工作组
规范 = wg.
规范
md = {
"组世界大小": wg.
世界组大小,
入口点:
规格.
获取入口点名称(),
}
如果
工人:
md[本地排名] = (
工人.
本地排名,)
md["角色排名"] = (
工人.
角色排名,)
md["角色世界大小"] = (
工人.
角色世界大小,)
全球排名 =
工人.
全球排名
工作 ID =
字符串(
工人.id)
否则:
全球排名 = None
工作 ID = None
md_str = json.压缩包(md)
元数据 = {
运行_id:
规格.
rdzv 处理器.get_run_id(),
全局_rank:
全球排名,
"分组排名": wg.
群组排名,
"工作者 ID":
工作器 ID,
"角色":
规格.
角色,
主机名:
_获取完全限定主机名(),
状态:
状态,
总运行时间:
自身.
总执行时间,
rdzv 后端:
规格.
rdzv 处理器.
获取后端(),
原始错误:
原始错误,
"元数据":
md 字符串,
代理重启:
规格.
最大重启次数 -
自身.
剩余重启次数,
持续时间(毫秒):
持续时间(毫秒),
}
返回
活动(
ftorchelastic.worker.status.{
状态}",
源=
源,
元数据=
元数据
)
定义
记录指标(
自身,
分组结果:
运行结果):
是否失败 =
分组结果.
是否失败()
自身.
记录故障率指标(
是否失败)
规范 =
自身.
工作组.
规范
发生了重启 =
自身.
剩余重启次数 !=
规格.
最大重启次数
设置指标(f
"工作者."{
规格.
角色}
.运行总数", 1)
自身.
_根据条件记录指标(
"带重试的运行成功", not is_failed
和
重启发生
)
自身.
条件记录指标(
"运行成功无重试", not
失败
和 not
重启发生
)
自身.
条件记录指标(
重试后运行失败,
失败
和
发生了重启
)
自身.
条件记录指标(
运行失败无重试,
失败
和 not
发生了重启
)
定义
条件记录指标(
自身,
指标名称, condition):
规范 =
自身.
工作组.
规范
如果 condition:
上传指标(f
工作者。{
规格.
角色}.{
指标名称}", 1)
否则:
上传指标(f
工人{
规格.
角色}.{
指标名称}", 0)
定义
_记录故障性指标(
自身,
是否失败:
布尔类型 =
错误):
如果
是否失败:
故障性 = 100.0
否则:
规范 =
自身.
工作组.
规范
不稳定性 = 100.0 - 100.0 * (
自身.
剩余重启次数 + 1) / (
规格.
最大重启次数 + 1
)
规范 =
自身.
工作组.
规范
发布指标(f
"工作者。{
规格.
角色}
.不稳定性", int(
不稳定性))
定义
_调用运行(
自身,
角色:
字符串 =
默认角色) ->
运行结果:
# 目前仅适用于单个角色
规范 =
自身.
_工作组.
规范
角色 =
规格.
角色
记录器.
信息(
"[%s] 开始启动工作进程:entrypoint:%s",
角色,
规格.
获取 entrypoint 名称()
)
自身.
初始化工作者(
自身.
工作者组)
监控间隔 =
规格.
监控间隔
rdzv 处理器 =
规格.
rdzv 处理器
while True:
断言
自身.
_工作组.
状态 !=
工作状态.
初始化
时间.
睡眠(
监控间隔)
运行结果 =
自身.
_监控工作者(
自身.
_工作者组)
状态 =
运行结果.
状态
自身.
_工作组.
状态 =
状态
设置指标(f
工作者。{
角色}
.剩余重启次数,
自身.
_剩余重启次数)
设置指标(f
工人。{
角色}.{
状态.
名称.
小写()}", 1)
如果
状态 ==
工人状态.
成功:
记录器.
信息(
"[%s] 工作组成功完成。
"等待"%s
其他代理完成需要秒数。,
角色,
自身.
_退出屏障超时,
)
自身.
退出屏障()
返回
运行结果
elif 状态
在 {
工作状态.
不健康,
工作状态.
失败}:
如果
自身.
_剩余重启次数 > 0:
记录器.
信息(
"[%s] 工作组%s
.
"%s/%s剩余尝试次数;"
将重新启动工作组,
角色,
状态.
名称,
自身.
_剩余重启次数,
规格.
最大重启次数,
)
自身.
_剩余重启次数 -= 1
自身.
_重启工作进程(
自身.
_工作进程组)
否则:
自身.
_停止工作进程(
自身.
工作组)
自身.
工作组.
状态 =
工作状态.
失败
返回
运行结果
elif 状态 ==
工作状态.
健康:
# 成员变更不计入重试次数
等待节点数量 =
rdzv 处理器.
等待中的节点数()
组排名 =
自身.
_工作组.
组排名
如果
等待节点数 > 0:
记录器.
信息(
"[%s] 检测到%s "
"从 group_rank=新节点"%s
输入文本翻译为简体中文为:"; "
"将重启工作进程",
角色,
等待中的节点数,
群组排名,
)
自身.
_重启工作进程(
自身.
工作组)
否则:
抛出异常
异常(
# 无需注意:TRY002
f"[{角色}
] 工作组在{
状态.
名称}
状态"
)
[文档] def _exit_barrier(self):
"""
定义一个屏障,使代理进程在所有工作进程完成之前保持活动状态。
等待 `exit_barrier_timeout` 秒,直到所有代理完成
执行他们的本地工作者(无论成功与否)。这
作为一种安全防护,防止用户脚本在不同时间终止。
。
"""
logger.info(
"本地工作组完成(%s)。"
"等待%s 秒,其他代理完成。"
self._worker_group.state,
self._exit_barrier_timeout,
)
start = time.time()
try:
store_util.barrier(
store=self._store,
world_size=self._worker_group.group_world_size,
key_prefix=_TERMINAL_STATE_SYNC_ID,
barrier_timeout=self._exit_barrier_timeout,
)
logger.info(
"已完成等待其他代理。耗时:%s 秒",
time.time() - start,
)
except SignalException as e:
logger.warning("收到终止信号:%s", e.sigval)
raise
except Exception:
logger.exception(
"错误等待退出屏障。耗时:%s 秒"
当前时间与开始时间的差值
)