# mypy: 允许未类型化定义
# 版权(c)Meta Platforms,Inc.及其关联公司。
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。
导入
输入/输出
导入 json
导入
操作系统
导入
选择
导入
信号
导入
系统
导入
线程
导入
时间
来自
打字
导入
可调用,
可选
来自 torch.distributed.elastic.timer.api
导入
Timer 客户端, TimerRequest
来自 torch.distributed.elastic.timer.debug_info_logging
导入 (
记录已过期计时器的调试信息,
)
来自
torch.distributed.elastic.utils.日志
导入
获取日志记录器
__all__ = ["文件计时器客户端",
"文件计时器请求",
文件计时服务器]
日志记录器 =
获取日志记录器(__name__)
def 重试(
最大重试次数: int,
睡眠时间: float) ->
可调用:
""
一个简单的重试包装器。
参数:
max_retries: int, 重试的最大次数。
sleep_time: float, 重试之间的睡眠时间。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 包装器(
函数:
可调用) ->
可调用:
def 包装器(*
参数, **kwargs):
为 i
在
范围(max_retries):
try:
返回
函数(*
参数, **kwargs)
除了
异常:
记录器.
异常(
"运行错误"%s
.正在重试...,
函数.__name__)
如果 i <
最大重试次数 - 1:
时间.
睡眠(
睡眠时间)
否则:
提升
返回
包装器
返回
包装器
类
文件计时器请求(
定时器请求):
""
表示倒计时计时器获取和释放的数据对象
该对象用于`FileTimerClient`和`FileTimerServer`之间。
负的`expiration_time`应解释为“释放”。
请求。
``信号`` 是从服务器回收工作进程的信号。
进程。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
__slots__ = [版本, "worker_pid",
范围 ID,
过期时间,
信号]
def 初始化(
自身,
工作进程 ID: int,
范围 ID:
字符串,
过期时间: float,
信号:
整型 = 0
) -> 无:
自身.
版本 = 1
自身.
工作进程 ID =
工作进程 ID
自身.
范围 ID =
范围 ID
自身.
过期时间 =
过期时间
自身.
信号 =
信号
def __等于__(
自身,
其他) ->
布尔:
如果 isinstance(
其他,
文件计时请求):
返回 (
自身.
版本 ==
其他.
版本
和
自身.
工作进程 ID ==
其他.
工作进程 ID
和
自身.
范围 ID ==
其他.
范围 ID
和
自身.
过期时间 ==
其他.
过期时间
和
自身.
信号 ==
其他.
信号
)
返回
假
def 转换为 JSON(
自身) ->
字符串:
返回 json.
压缩包(
{
版本:
自身.
版本,
"进程 ID":
自身.worker_pid,
"scope_id": 自身.scope_id,
"expiration_time": 自身.
过期时间,
信号:
自身.
信号,
},
)
[文档]
类
文件计时器客户端(
Timer 客户端):
""
``文件计时器服务器`` 的客户端。此客户端旨在在同一主机上使用
运行 ``文件计时器服务器`` 的主机上,并使用
pid 来唯一标识一个工作进程。
该客户端使用命名管道发送计时器请求。
``FileTimerServer``。该客户端是生产者,而``FileTimerServer``是消费者。
多个客户端可以与同一个``FileTimerServer``一起工作。
``FileTimerServer``。
参数:
文件路径:str,FIFO 特殊文件的路径。``FileTimerServer``
必须通过调用 os.mkfifo()创建它。
信号:signal,用于杀死进程的信号。使用负数或零信号将不会杀死进程。
信号:signal,用于杀死进程的信号。使用负数或零信号将不会杀死进程。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 初始化(
自身,
文件路径:
字符串,
信号=(
信号.SIGKILL
如果
系统.
平台 !=
win32
否则
信号.CTRL_C_EVENT),
# 类型: 忽略[attr-defined]
) -> 无:
超级().
初始化()
自身.
文件路径 =
文件路径
自身.
信号 =
信号
@_retry(最大重试次数=10,
睡眠时间=0.1)
def _非阻塞打开(
自身) ->
可选[
输入/输出.TextIOWrapper
]:
# 服务器可能已崩溃或尚未启动。
# 在这种情况下,在阻塞模式下调用 open() 将阻塞客户端。
避免此类问题,请以非阻塞模式打开,如果服务器不存在,将引发 OSError
如果服务器不存在,将引发 OSError
fd = 操作系统.
打开(
自身.
文件路径,
操作系统.O_WRONLY |
操作系统.
非阻塞)
返回
操作系统.fdopen(fd, "wt")
def 发送请求(
自身,
请求:
文件计时器请求) ->
无:
try:
文件 =
自身.
非阻塞打开()
除了
异常
作为 e:
提升
破坏管道错误(
"无法发送文件计时器请求,因为文件计时器服务器不可用。"
) 来自 e
与
文件:
json 请求 =
请求.
转换为 JSON()
使用 select.PIPE_BUF 作为最大长度写入请求可保证原子性。
如果
长度(
json 请求) >
选择.PIPE_BUF:
提升
运行时错误(
f"文件定时请求大于"{
选择.PIPE_BUF}
字节
f不支持:{
json 请求}"
)
文件.
写(
json 请求 + "
输入文本翻译为简体中文为:\n")
def 获取(
自身,
范围 ID:
字符串,
过期时间: float) ->
无:
自身.
发送请求(
请求=
文件计时器请求(
工作进程 ID=
操作系统.
获取进程 ID(),
范围 ID=
范围 ID,
过期时间=
过期时间,
信号=
自身.
信号,
),
)
def 释放(
自身,
范围 ID:
字符串) ->
无:
自身.
发送请求(
请求=
文件计时器请求(
工作进程 ID=
操作系统.
获取进程 ID(),
范围 ID=
范围 ID,
过期时间=-1,
信号=0
),
)
[文档]
类
文件定时服务器:
""
与 `FileTimerClient` 一起工作的服务器。客户端应运行在与运行此服务器的进程相同的宿主机上。
每个作业中的宿主机都应启动自己的定时器服务器,并且每个服务器实例管理本地工作者(在本地运行的)的定时器。
每个作业中的宿主机都应启动自己的定时器服务器,并且每个服务器实例管理本地工作者(在本地运行的)的定时器。
并且每个服务器实例管理本地工作者(在本地运行的)的定时器。
同一主机上的进程)。
参数:
file_path: str, 要创建的 FIFO 特殊文件的路径。
max_interval: float, 每个看门狗循环的最大间隔(秒)。
daemon: bool, 是否以守护进程模式运行看门狗线程。
守护线程不会阻止进程停止。
log_event: 可调用函数[[Dict[str, str]], None],一个可选的回调用于
以 JSON 格式记录事件。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 初始化(
自身,
文件路径:
字符串,
run_id: 字符串,
最大间隔:
浮点数 = 10,
守护进程:
布尔类型 = True,
log_event: 可选[
可调用[[
字符串,
可选[
文件计时器请求]],
无]] =
无,
) -> 无:
自身.
_文件路径 =
文件路径
自身.
_运行 ID =
运行 ID
自身.
_最大间隔 =
最大间隔
自身._daemon =
守护进程
自身._timers:
字典[
元组[int,
字符串
]
文件计时器请求] = {}
自身.
_停车信号 =
假
自身.
_看门狗线程:
可选[
线程.Thread] = None
自身._is_client_started =
假
如果
操作系统.
路径.
存在(
自身.
文件路径):
操作系统.
删除(
自身.
文件路径)
操作系统.
创建命名管道(
自身.
文件路径)
仅作测试。统计接收到的请求数量。
自身._request_count = 0
仅作测试。处理所有请求并停止服务器。
自身._run_once =
假
自身.
记录事件 = (
log_event 如果 log_event
是 not None
否则 lambda
名称,
请求: None
)
自身.
最后一次进度时间 = int(
时间.
时间())
def 开始(
自身) ->
无:
记录器.
信息(
开始%s
...最大间隔=%s
,守护进程=%s
文件路径=%s",
类型(
自身).__name__,
自身.
最大间隔,
自身.
守护进程,
自身.
文件路径_,
)
自身.
看门狗线程 =
线程.Thread(
目标=
自身.
监视狗循环,
守护进程=
自身._daemon
)
记录器.
信息(
"启动看门狗线程...")
自身.
_看门狗线程.
开始()
自身._log_event(
"看门狗启动",
无)
def 停止(
自身) ->
无:
记录器.
信息(
停止%s",
类型(
自身).__name__)
自身.
_停车信号 =
真实
如果
自身.
_看门狗线程:
记录器.
信息(
"正在停止看门狗线程...")
自身.
_看门狗线程.
加入(
自身.
最大间隔)
自身.
看门狗线程 = None
否则:
记录器.
信息(
没有运行看门狗线程,什么也不做)
如果
操作系统.
路径.
存在(
自身.
文件路径):
操作系统.
删除(
自身.
文件路径)
自身.
日志事件(
监视器已停止,
无)
def 运行一次(
自身) ->
无:
自身.
__运行一次 =
真实
如果
自身.
_看门狗线程:
记录器.
信息(
"正在停止看门狗线程...")
自身.
_看门狗线程.
加入()
自身.
看门狗线程 = None
否则:
记录器.
信息(
没有运行看门狗线程,什么也不做)
如果
操作系统.
路径.
存在(
自身.
_文件路径):
操作系统.
删除(
自身.
_文件路径)
@staticmethod
def 进程是否正在运行(
进程 ID: int):
""
检查进程是否正在运行的函数
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
try:
# 检查进程是否存在并且我们可以向其发送信号
操作系统.
终止(
进程 ID, 0)
返回
真实
除了 OSError:
返回
假
def _看门狗循环(
自身) ->
无:
在阻塞模式下打开管道会阻塞服务器线程。
这是因为以下原因:
1. 客户端案例通常不会发生。
2. 我们正在单独的守护进程中运行看门狗循环。
# 线程,不会阻塞进程停止。
try:
fd = 打开(
自身.
_文件路径)
除了
异常:
记录器.
异常(
"无法打开 FileTimerServer 管道")
提升
与 fd:
自身.
_客户端是否已启动 =
真实
while not 自身.
停止信号:
try:
运行一次 =
自身.
_运行一次
自身.
_运行看门狗(fd)
如果
运行一次:
断开
自身.
上次进度时间 = int(
时间.
时间())
除了
异常:
记录器.
异常(
运行看门狗时出错)
def 运行看门狗(
自身, fd:
输入/输出.
文本 I/O 包装器) ->
无:
计时请求 =
自身.
_获取请求(fd,
自身.
最大间隔)
自身.
注册计时器(
计时器请求)
现在 =
时间.
时间()
收割的进程 ID =
集合()
杀死进程 =
假
收割信号 = 0
所有已过期的计时器 =
自身.
获取已过期的计时器(
现在)
记录过期计时器的调试信息(
自身.
_运行_id,
{
进程 ID: [
已过期计时器.to_json()
为 expired_timer
在 expired_timers]
为
进程 ID, expired_timers
在
所有已过期的定时器.
项目()
},
)
为
工作进程 ID,
已过期的定时器
在
所有已过期的定时器.
项目():
记录器.
信息(
"收割 worker_pid="%s
]. 过期定时器:%s",
worker_pid,
自身._get_scopes(
过期定时器),
)
收割的 worker 进程 ID.
添加(
worker 进程 ID)
# 如果存在多个过期定时器,我们找到第一个定时器
有效信号(>0)的到期时间顺序。
过期定时器.
排序(
键=lambda
定时器:
定时器.
过期时间)
信号 = 0
已过期计时器 = None
为
计时器
在
已过期计时器列表:
自身.
记录事件(
计时器已过期,
计时器)
如果
计时器.
信号 > 0:
信号 =
计时器.
信号
已过期的计时器 =
计时器
断开
如果
信号
≤ 0:
记录器.
信息(
未指定与 worker=[相关的信号%s
不要收割它。, worker_pid
)
继续
如果
自身._reap_worker(worker_pid,
信号):
记录器.
信息(
"成功收割了 worker="%s
]
with signal=%s", worker_pid, 信号
)
自身._log_event(
"终止工作进程",
过期定时器)
终止进程 =
真实
回收信号 =
信号
否则:
记录器.
错误(
"回收工作进程失败 worker=["%s
]. 将在下次看门狗中重试。,
worker 进程 ID,
)
如果
终止进程
和
收割信号 > 0:
记录器.
信息(
"终止服务器进程=["%s
] 因超时定时器,
操作系统.
获取进程 ID(),
)
自身._reap_worker(
操作系统.
获取进程 ID(),
收割信号)
自身.
清除定时器(
收割的进程 ID)
def 获取作用域(
自身,
计时器请求:
列表[
文件计时器请求]) ->
列表[
字符串
]:
返回 [r.
范围 ID
为 r
在
计时器请求]
def 获取请求(
自身, fd:
输入/输出.TextIOWrapper,
最大间隔:
浮点数
) -> 列表[
文件计时请求
]:
开始 =
时间.
时间()
请求 = []
while not 自身.
_停车信号
或
自身._run_once:
对于命名管道,当至少有一个写入者打开时,readline() 是阻塞的。
它仅在写入端调用 flush() 时返回。
注意,flush() 在 close() 内部自动调用。
在最后一个写入者关闭后,readline() 不会阻塞。
当到达文件末尾时将返回空字符串。
由于客户端始终打开管道,写入消息然后立即关闭管道,
因此下面的 readline()调用不会长时间阻塞。
json 请求 = fd.
读取行()
如果
长度(
JSON 请求) == 0:
如果
自身.
_只运行一次:
断开
时间.
睡眠(
最小(
最大间隔, 1))
否则:
请求 = json.loads(
JSON 请求)
进程 ID =
请求[
"进程 ID"]
范围 ID =
请求[
"范围 ID"]
过期时间 =
请求[
"过期时间"]
信号 =
请求[
信号]
请求.
追加(
文件定时请求(
工作进程 ID=
进程 ID,
范围 ID=
范围 ID,
过期时间=
过期时间,
信号=
信号,
)
)
现在 =
时间.
时间()
如果
现在 -
开始 >
最大间隔:
断开
返回
请求
def 注册计时器(
自身,
计时器请求:
列表[
文件计时器请求]) ->
无:
为
请求
在
计时请求:
进程 ID =
请求.
工作进程 ID
范围 ID =
请求.
范围 ID
过期时间 =
请求.
过期时间
自身.
请求次数 += 1
key = (进程 ID, scope_id)
# 负过期时间是一个发布调用的代理
如果
过期时间 < 0:
如果 key
在
自身.
_定时器:
删除
自身.
_计时器[
键]
否则:
自身.
_定时器[
键] =
请求
def 清除定时器(
自身,
工作进程 ID:
集合[int]) ->
无:
为
进程 ID,
范围 ID
在
列表(
自身.
_计时器.
键()):
如果
进程 ID
在
工作进程 ID
或 not
文件计时服务器.
进程是否正在运行(
进程 ID):
删除
自身.
_计时器
[
进程 ID,
范围 ID
]
def 获取已过期的计时器(
自身,
截止日期: float) ->
字典[int,
列表[
文件计时器请求
]]
# 进程 ID -> [计时器请求...]
过期计时器:
字典[int,
列表[
文件计时器请求]] = {}
为
请求
在
自身.
_计时器.
值():
如果
请求.
过期时间
≤
截止日期:
已过期作用域 =
过期定时器.setdefault(
请求.
工作进程 ID,
[]
过期作用域.
追加(
请求)
返回
过期定时器
def 收割工人(
自身, worker_pid: int,
信号: int) ->
布尔:
try:
操作系统.
终止(worker_pid,
信号)
返回
真实
除了
进程查找错误:
记录器.
信息(
"进程 pid="%s
不存在。跳过, worker_pid)
返回
真实
除了
异常:
记录器.
异常(
进程终止错误 pid=%s",
工作进程 pid)
返回
假
def 获取最后进度时间(
自身) -> int:
返回
自身.
_最后进度时间
如果
自身.
_is_client_started
是客户端启动了 否则 int(
时间.
时间())