过期计时器 ¶
过期计时器与代理的同一进程设置,并在您的脚本中用于处理卡住的工人。当您进入可能卡住的代码块时,您可以获取一个过期计时器,该计时器指示计时器服务器在未在自设的过期截止日期前释放计时器时终止进程。
使用方法:
import torchelastic.timer as timer
import torchelastic.agent.server as agent
def main():
start_method = "spawn"
message_queue = mp.get_context(start_method).Queue()
server = timer.LocalTimerServer(message, max_interval=0.01)
server.start() # non-blocking
spec = WorkerSpec(
fn=trainer_func,
args=(message_queue,),
...<OTHER_PARAMS...>)
agent = agent.LocalElasticAgent(spec, start_method)
agent.run()
def trainer_func(message_queue):
timer.configure(timer.LocalTimerClient(message_queue))
with timer.expires(after=60): # 60 second expiry
# do some work
在上述示例中,如果 trainer_func
耗时超过 60 秒,则工作进程将被终止,并且代理会重试工作组。
客户端方法
- torch.distributed.elastic.timer.configure(timer_client)[source][source]¶
配置计时器客户端。必须在使用
expires
之前调用。
- torch.distributed.elastic.timer.expires(after, scope=None, client=None)[source][source]¶
获取一个倒计时计时器,从现在起
after
秒后过期,除非它包裹的代码块在规定时间内完成。当计时器过期时,此工作进程有资格被回收。"回收"的确切含义取决于客户端实现。在大多数情况下,回收意味着终止工作进程。请注意,工作进程不一定会在time.now() + after
时被回收,而是有资格被回收,并且客户端与TimerServer
的交互将最终决定何时以及如何回收过期计时器的工作进程。用法:
torch.distributed.elastic.timer.configure(LocalTimerClient()) with expires(after=10): torch.distributed.all_reduce(...)
服务器/客户端实现
以下是 torchelastic 提供的计时服务器和客户端对。
注意
计时服务器和客户端总是需要成对实现和使用,因为服务器和客户端之间存在消息协议。
以下是基于 multiprocess.Queue
实现的计时服务器和客户端对。
- class torch.distributed.elastic.timer.LocalTimerServer(mp_queue, max_interval=60, daemon=True)[source][source]¶
服务器与
LocalTimerClient
协同工作。客户端应作为运行此服务器的父进程的子进程。作业中的每个主机都应本地启动自己的计时器服务器,每个服务器实例管理本地工作者的计时器(在相同主机上的进程上运行)。
- class torch.distributed.elastic.timer.LocalTimerClient(mp_queue)[source][source]
LocalTimerServer
的客户端。此客户端旨在在同一主机上运行LocalTimerServer
,并使用 pid 唯一标识一个工作者。这在在具有多个 GPU 设备的主机上为每个 GPU 设备启动子进程(训练器)的情况下特别有用。
下面是基于命名管道实现的另一个计时器服务器和客户端对。
- class torch.distributed.elastic.timer.FileTimerServer(file_path, run_id, max_interval=10, daemon=True, log_event=None)[source][source]¶
与
FileTimerClient
.客户端预期运行在与运行此服务器的进程相同的宿主机上。作业中的每个宿主机都预期本地启动自己的定时器服务器,每个服务器实例管理本地工作者的定时器(运行在相同宿主机上的进程上)。- 参数:
file_path (str) – 字符串,要创建的 FIFO 特殊文件的路径。
max_interval (float) – float,每个看门狗循环的最大间隔(秒)。
daemon (bool) – bool,运行守护线程在守护模式或否。守护线程不会阻止进程停止。
log_event (Optional[Callable[[str, Optional[FileTimerRequest]], None]]) – Callable[[Dict[str, str]], None],一个可选的回调函数,用于以 JSON 格式记录事件。
- class torch.distributed.elastic.timer.FileTimerClient(file_path, signal=Signals.SIGKILL)[source][source]¶
客户端侧为
FileTimerServer
。此客户端旨在在运行FileTimerServer
的同一主机上使用,并使用 pid 唯一标识一个工作进程。此客户端使用命名管道将计时器请求发送到FileTimerServer
。此客户端是生产者,而FileTimerServer
是消费者。多个客户端可以与同一FileTimerServer
一起工作。- 参数:
file_path (str) – 字符串,FIFO 特殊文件的路径。
FileTimerServer
必须通过调用 os.mkfifo() 创建。signal – 信号,用于杀死进程的信号。使用负数或零信号将不会杀死进程。
编写自定义定时服务器/客户端
要编写自己的定时服务器和客户端,请扩展 torch.distributed.elastic.timer.TimerServer
用于服务器和 torch.distributed.elastic.timer.TimerClient
用于客户端。 TimerRequest
对象用于在服务器和客户端之间传递消息。
- class torch.distributed.elastic.timer.TimerRequest(worker_id, scope_id, expiration_time)[source][source]¶
数据对象表示倒计时计时器获取和释放,用于
TimerClient
和TimerServer
之间。负的expiration_time
应解释为“释放”请求。注意
worker_id
的类型是特定实现的。它就是 TimerServer 和 TimerClient 实现用来唯一标识工作者的标识。
- class torch.distributed.elastic.timer.TimerServer(request_queue, max_interval, daemon=True)[source][source]¶
监控活动计时器并在及时的方式下使它们过期。该服务器负责回收已过期的工人。
- abstract get_expired_timers(deadline)[source][source]¶
返回每个 worker_id 的所有已过期的定时器。已过期的定时器是指其过期时间小于或等于提供的截止时间的定时器。
- 返回类型:
dict[str, list[torch.distributed.elastic.timer.api.TimerRequest]]
- abstract register_timers(timer_requests)[source][source]
处理传入的定时器请求并将它们注册到服务器。定时器请求可以是获取定时器请求或释放定时器请求。具有负过期时间的定时器请求应解释为释放定时器请求。
- class torch.distributed.elastic.timer.TimerClient[source][source]¶
通过与 TimerServer 通信来获取和释放倒计时定时器的客户端库。
- abstract acquire(scope_id, expiration_time)[source][source]¶
根据 scope_id 和过期时间获取持有此客户端对象的 worker 的定时器。通常将定时器注册到 TimerServer。
- 抽象发布(scope_id)[来源][来源] ¶
释放该客户端所代表的 worker 上的
scope_id
计时器。调用此方法后,作用域上的倒计时计时器将不再生效。
调试信息记录 ¶
- torch.distributed.elastic.timer.debug_info_logging.log_debug_info_for_expired_timers(运行 ID, 过期计时器)[来源][来源] ¶