快捷键

torch.distributed.elastic.timer.api 的源代码

# mypy: 允许未类型化定义
版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。
导入 abc
导入 记录日志
导入 线程
导入 时间
来自 contextlib 导入 contextmanager
来自 检查 导入 获取帧信息, 
来自 打字 导入 任意, 可选


__all__ = [
    定时请求,
    "定时客户端",
    "请求队列",
    "定时服务器",
    "配置",
    "过期",
]

日志记录器 = 记录日志.获取日志记录器(__name__)


[文档]class TimerRequest: """ 表示倒计时计时器获取和释放的数据对象 该对象用于 ``TimerClient`` 和 ``TimerServer`` 之间。 一个负的 `expiration_time` 应该被解释为“发布” 请求。 .. 注意:: `worker_id` 的类型是特定实现的。 它是 TimerServer 和 TimerClient 实现的任何内容 用于唯一标识一个工人的字段。 """ __slots__ = ["worker_id", "scope_id", "expiration_time"] def __init__(self, worker_id: Any, scope_id: str, expiration_time: float): self.worker_id = worker_id self.scope_id = scope_id self.expiration_time = expiration_time def __eq__(self, other): 如果 isinstance(other, TimerRequest): 返回 ( self.worker_id == other.worker_id and self.scope_id == other.scope_id 并且 self.expiration_time 等于 other.expiration_time ) 返回 False
[文档]类 TimerClient(abc.ABC): "" 客户端库,通过通信获取和释放倒计时计时器。 与 TimerServer 通信。 ""
[文档] @abc.abstractmethod def acquire(self, scope_id: str, expiration_time: float) -> None: """ 为持有此客户端对象的 worker 获取一个计时器 根据作用域 ID 和过期时间。通常与 TimerServer 注册定时器。 将定时器与 TimerServer 注册。 """
[文档] @abc.abstractmethod def release(self, scope_id: str): # 释放(self, 范围 ID: 字符串) ```python # 输入文本 input_text = '"""' # 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API) def translate_to_simplified_chinese(text): # 假设的翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 释放工作器上 ``scope_id`` 的计时器 客户端表示。在此方法之后 范围内的倒计时计时器不再生效。 ""
请求队列(abc.ABC): "" 持有计时器获取/释放请求的消费者队列 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` @abc.抽象方法 def 尺寸(自身) -> int: "" 返回此方法调用时队列的大小。 请注意,在调用 `get` 方法时,队列的大小 可能已增加。队列的大小不应减少 直到调用“get”方法。也就是说,以下断言 应该包含: 大小 = q.size() res = q.get(size, 超时=0) 断言大小 == res 的长度 -- 或 -- size = q.size() res = q.get(size * 2, timeout=1) assert size <= len(res) <= size * 2 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` @abc.抽象方法 def 获取(自身, 尺寸: int, 超时: float) -> 列表[定时器请求]: "" 以阻塞方式获取最多 ``size`` 个定时器请求 (不超过 ``timeout`` 秒)。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ```
[文档] 定时器服务器(abc.ABC): "" 实体监控活动计时器并使它们过期 及时地。本服务器负责 收获已过期的计时器工作者。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` def 初始化( 自身, 请求队列: 请求队列, 最大间隔: float, 守护进程: 布尔类型 = 真实 ): "" param request_queue: 消费者 ``请求队列`` param max_interval: 等待最大时间(以秒为单位) 请求队列中的每个项目 param daemon: 是否以守护进程方式运行看门狗线程 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 超级().初始化() 自身._请求队列 = 请求队列 自身.最大间隔 = 最大间隔 自身._守护进程 = 守护进程 自身._看门狗线程: 可选[线程.Thread] = None 自身._停车信号 =
[文档] @abc.抽象方法 def 注册计时器(self, timer_requests: list[TimerRequest]) -> None: """ 处理传入的计时器请求并将它们注册到服务器。 计时器请求可以是获取计时器请求或释放计时器请求。 过期时间(expiration_time)为负的计时器请求应解释为释放计时器请求。 作为释放计时器请求处理。 """
[文档] @abc.abstractmethod def clear_timers(self, worker_ids: set[Any]) -> None: """ 清除给定 `worker_ids` 的所有计时器。 """
[文档] @abc.abstractmethod def get_expired_timers(self, deadline: float) -> dict[str, list[TimerRequest]]: """ 返回每个 worker_id 的所有已过期的定时器。 已过期的定时器是指其过期时间小于或等于 提供的截止日期的定时器。 """
@abc.抽象方法 def _reap_worker(自身
, 工作器 ID: 任意) -> 布尔: "" 重新回收指定的工作者。如果工作者成功回收,则返回 True,否则返回 False。如果发生任何未捕获的异常 则返回 False。如果发生任何未捕获的异常 该方法抛出异常,则认为工作进程已结束 并且所有相关定时器将被移除。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` def _reap_worker_no_throw(自身, 工作器 ID: 任意) -> 布尔: "" 包装 `_reap_worker(worker_id)`,如果发生未捕获的异常 抛出后,它将工作进程视为已回收。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` try: 返回 自身._reap_worker(工作器 ID) 除了 异常: 记录器.异常( "从 _reap_worker() 中抛出的未捕获异常," "检查实现是否正确捕获异常", ) 返回 真实 def _watchdog_loop(自身): while not 自身._stop_signaled: try: 自身._run_watchdog() 除了 异常: 记录器.异常("Error running watchdog") def _run_watchdog(自身): 批处理大小 = 最大值(1, 自身._request_queue.尺寸()) timer_requests = 自身._request_queue.获取(批处理大小, 自身.最大间隔) 自身.注册计时器(计时器请求) 现在 = 时间.时间() 收割的 worker ID = 集合() 工作器 ID, 过期定时器 自身.获取过期定时器(现在).项目(): 记录器.信息( "回收工作进程_id=["%s]. 过期定时器:%s", 工作器 ID, 自身.获取作用域(过期定时器), ) 如果 自身._reap_worker_no_throw(工作器 ID): 记录器.信息(成功回收工作进程=[%s]], 工作器 ID) 收割工人 ID.添加(工作器 ID) 否则: 记录器.错误( "错误:正在收割工人=["%s]. 将在下次看门狗中重试。, 工作 ID ) 自身.清除计时器(收割工人 ID) def _获取作用域(自身, 计时请求): 返回 [r.作用域 ID r 定时请求] def 开始(自身) -> : 记录器.信息( 开始%s...最大间隔=%s,守护进程=%s", 类型(自身).__name__, 自身.最大间隔, 自身.守护进程, ) 自身.监视线程 = 线程.Thread( 目标=自身.监视循环, 守护进程=自身._daemon ) 记录器.信息("启动看门狗线程...") 自身._watchdog_thread.开始() def 停止(自身) -> : 记录器.信息("停止"%s", 类型(自身).__name__) 自身._停车信号 = 真实 如果 自身._看门狗线程: 记录器.信息("正在停止看门狗线程...") 自身._看门狗线程.加入(自身.最大间隔) 自身.看门狗线程 = None 否则: 记录器.信息(没有运行看门狗线程,什么也不做)
计时器客户端: 可选[Timer 客户端] = None
[文档]def configure(timer_client: Timer 客户端): """ 配置定时客户端。必须在调用 ``expires`` 之前调用。 """ global _timer_client _timer_client = timer_client logger.info("Timer client configured to: %s", type(_timer_client).__name__)
[文档]@contextmanager def 过期( after: float, 范围: Optional[str] = None, 客户端: Optional[TimerClient] = None ): """ 从现在开始,获取一个将在 ``after`` 秒后到期的倒计时计时器, 除非它包裹的代码块在规定时间内完成。 当计时器到期时,这个工作进程有资格被回收。 “收割”的确切含义取决于客户端实现。在 大多数情况下,收割意味着终止工作进程。 请注意,工作进程并不保证在“time.now() + after” “时间点”被收割,而是工作进程“有资格”被 收割并最终与客户端通信的 ``TimerServer`` 决定何时以及如何收割已过期的计时器工人。 使用方法: torch.distributed.elastic.timer.configure(LocalTimerClient()) with expires(after=10): torch.distributed.all_reduce(...) """ if client is None: 如果_timer_client 为 None: 抛出 RuntimeError 异常("在使用倒计时计时器之前配置计时器客户端。") client = _timer_client 如果 scope 为 None: 获取调用者文件和行号 caller = getframeinfo(stack()[1][0]) scope = f"{caller.filename}#{caller.lineno}" expiration = time.time() + after client.acquire(作用域, 过期时间) try: 产生 finally: 客户端释放作用域

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源