torch.distributed.elastic.timer.api 的源代码
# mypy: 允许未类型化定义
版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。
导入 abc
导入
记录日志
导入
线程
导入
时间
来自 contextlib
导入 contextmanager
来自
检查
导入
获取帧信息,
栈
来自
打字
导入
任意,
可选
__all__ = [
定时请求,
"定时客户端",
"请求队列",
"定时服务器",
"配置",
"过期",
]
日志记录器 =
记录日志.
获取日志记录器(__name__)
[文档]class TimerRequest:
"""
表示倒计时计时器获取和释放的数据对象
该对象用于 ``TimerClient`` 和 ``TimerServer`` 之间。
一个负的 `expiration_time` 应该被解释为“发布”
请求。
.. 注意:: `worker_id` 的类型是特定实现的。
它是 TimerServer 和 TimerClient 实现的任何内容
用于唯一标识一个工人的字段。
"""
__slots__ = ["worker_id", "scope_id", "expiration_time"]
def __init__(self, worker_id: Any, scope_id: str, expiration_time: float):
self.worker_id = worker_id
self.scope_id = scope_id
self.expiration_time = expiration_time
def __eq__(self, other):
如果 isinstance(other, TimerRequest):
返回 (
self.worker_id == other.worker_id
and self.scope_id == other.scope_id
并且 self.expiration_time 等于 other.expiration_time
)
返回 False
[文档]类 TimerClient(abc.ABC):
""
客户端库,通过通信获取和释放倒计时计时器。
与 TimerServer 通信。
""
[文档] @abc.abstractmethod
def acquire(self, scope_id: str, expiration_time: float) -> None:
"""
为持有此客户端对象的 worker 获取一个计时器
根据作用域 ID 和过期时间。通常与 TimerServer 注册定时器。
将定时器与 TimerServer 注册。
"""
[文档] @abc.abstractmethod
def release(self, scope_id: str): # 释放(self, 范围 ID: 字符串)
```python
# 输入文本
input_text = '"""'
# 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 假设的翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
释放工作器上 ``scope_id`` 的计时器
客户端表示。在此方法之后
范围内的倒计时计时器不再生效。
""
类
请求队列(abc.ABC):
""
持有计时器获取/释放请求的消费者队列
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
@abc.抽象方法
def 尺寸(
自身) -> int:
""
返回此方法调用时队列的大小。
请注意,在调用 `get` 方法时,队列的大小
可能已增加。队列的大小不应减少
直到调用“get”方法。也就是说,以下断言
应该包含:
大小 = q.size()
res = q.get(size, 超时=0)
断言大小 == res 的长度
-- 或 --
size = q.size()
res = q.get(size * 2, timeout=1)
assert size <= len(res) <= size * 2
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
@abc.抽象方法
def 获取(
自身,
尺寸: int,
超时: float) ->
列表[
定时器请求
]:
""
以阻塞方式获取最多 ``size`` 个定时器请求
(不超过 ``timeout`` 秒)。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
[文档]
类
定时器服务器(abc.ABC):
""
实体监控活动计时器并使它们过期
及时地。本服务器负责
收获已过期的计时器工作者。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 初始化(
自身,
请求队列:
请求队列,
最大间隔: float,
守护进程:
布尔类型 =
真实
):
""
param request_queue: 消费者 ``请求队列``
param max_interval: 等待最大时间(以秒为单位)
请求队列中的每个项目
param daemon: 是否以守护进程方式运行看门狗线程
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
超级().
初始化()
自身.
_请求队列 =
请求队列
自身.
最大间隔 =
最大间隔
自身.
_守护进程 =
守护进程
自身.
_看门狗线程:
可选[
线程.Thread] = None
自身.
_停车信号 =
假
[文档] @abc.抽象方法
def 注册计时器(self, timer_requests: list[TimerRequest]) -> None:
"""
处理传入的计时器请求并将它们注册到服务器。
计时器请求可以是获取计时器请求或释放计时器请求。
过期时间(expiration_time)为负的计时器请求应解释为释放计时器请求。
作为释放计时器请求处理。
"""
[文档] @abc.abstractmethod
def clear_timers(self, worker_ids: set[Any]) -> None:
"""
清除给定 `worker_ids` 的所有计时器。
"""
[文档] @abc.abstractmethod
def get_expired_timers(self, deadline: float) -> dict[str, list[TimerRequest]]:
"""
返回每个 worker_id 的所有已过期的定时器。
已过期的定时器是指其过期时间小于或等于
提供的截止日期的定时器。
"""
@abc.抽象方法
def _reap_worker(自身,
工作器 ID:
任意) ->
布尔:
""
重新回收指定的工作者。如果工作者成功回收,则返回 True,否则返回 False。如果发生任何未捕获的异常
则返回 False。如果发生任何未捕获的异常
该方法抛出异常,则认为工作进程已结束
并且所有相关定时器将被移除。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def _reap_worker_no_throw(自身,
工作器 ID:
任意) ->
布尔:
""
包装 `_reap_worker(worker_id)`,如果发生未捕获的异常
抛出后,它将工作进程视为已回收。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
try:
返回
自身._reap_worker(
工作器 ID)
除了
异常:
记录器.
异常(
"从 _reap_worker() 中抛出的未捕获异常,"
"检查实现是否正确捕获异常",
)
返回
真实
def _watchdog_loop(自身):
while not 自身._stop_signaled:
try:
自身._run_watchdog()
除了
异常:
记录器.
异常("Error running watchdog")
def _run_watchdog(自身):
批处理大小 =
最大值(1,
自身._request_queue.
尺寸())
timer_requests = 自身._request_queue.
获取(
批处理大小,
自身.
最大间隔)
自身.
注册计时器(
计时器请求)
现在 =
时间.
时间()
收割的 worker ID =
集合()
为
工作器 ID,
过期定时器
在
自身.
获取过期定时器(
现在).
项目():
记录器.
信息(
"回收工作进程_id=["%s
]. 过期定时器:%s",
工作器 ID,
自身.
获取作用域(
过期定时器),
)
如果
自身._reap_worker_no_throw(
工作器 ID):
记录器.
信息(
成功回收工作进程=[%s
]],
工作器 ID)
收割工人 ID.
添加(
工作器 ID)
否则:
记录器.
错误(
"错误:正在收割工人=["%s
]. 将在下次看门狗中重试。,
工作 ID
)
自身.
清除计时器(
收割工人 ID)
def _获取作用域(
自身,
计时请求):
返回 [r.
作用域 ID
为 r
在
定时请求]
def 开始(
自身) ->
无:
记录器.
信息(
开始%s
...最大间隔=%s
,守护进程=%s",
类型(
自身).__name__,
自身.
最大间隔,
自身.
守护进程,
)
自身.
监视线程 =
线程.Thread(
目标=
自身.
监视循环,
守护进程=
自身._daemon
)
记录器.
信息(
"启动看门狗线程...")
自身._watchdog_thread.
开始()
def 停止(
自身) ->
无:
记录器.
信息(
"停止"%s",
类型(
自身).__name__)
自身.
_停车信号 =
真实
如果
自身.
_看门狗线程:
记录器.
信息(
"正在停止看门狗线程...")
自身.
_看门狗线程.
加入(
自身.
最大间隔)
自身.
看门狗线程 = None
否则:
记录器.
信息(
没有运行看门狗线程,什么也不做)
计时器客户端:
可选[
Timer 客户端] = None
[文档]def configure(timer_client: Timer 客户端):
"""
配置定时客户端。必须在调用 ``expires`` 之前调用。
"""
global _timer_client
_timer_client = timer_client
logger.info("Timer client configured to: %s", type(_timer_client).__name__)
[文档]@contextmanager
def 过期(
after: float, 范围: Optional[str] = None, 客户端: Optional[TimerClient] = None
):
"""
从现在开始,获取一个将在 ``after`` 秒后到期的倒计时计时器,
除非它包裹的代码块在规定时间内完成。
当计时器到期时,这个工作进程有资格被回收。
“收割”的确切含义取决于客户端实现。在
大多数情况下,收割意味着终止工作进程。
请注意,工作进程并不保证在“time.now() + after”
“时间点”被收割,而是工作进程“有资格”被
收割并最终与客户端通信的 ``TimerServer``
决定何时以及如何收割已过期的计时器工人。
使用方法:
torch.distributed.elastic.timer.configure(LocalTimerClient())
with expires(after=10):
torch.distributed.all_reduce(...)
"""
if client is None:
如果_timer_client 为 None:
抛出 RuntimeError 异常("在使用倒计时计时器之前配置计时器客户端。")
client = _timer_client
如果 scope 为 None:
获取调用者文件和行号
caller = getframeinfo(stack()[1][0])
scope = f"{caller.filename}#{caller.lineno}"
expiration = time.time() + after
client.acquire(作用域, 过期时间)
try:
产生
finally:
客户端释放作用域