快捷键

torch.distributed.elastic.timer.local_timer 的源代码

# mypy: 允许未类型化定义
版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。
导入 记录日志
导入 多进程 作为 mp
导入 操作系统
导入 信号
导入 时间
来自 队列 导入 空的
来自 打字 导入 任何

来自 .api 导入 请求队列, Timer 客户端, 定时器请求, 定时服务器


__all__ = ["LocalTimerClient", "多进程请求队列", "本地定时器服务器"]

日志记录器 = 记录日志.获取日志记录器(__name__)


[文档]类 LocalTimerClient(TimerClient): ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 客户端为 `LocalTimerServer`。此客户端旨在使用 在同一台运行“LocalTimerServer”的主机上并使用 pid 用于唯一标识一个工作者。这在某些情况下特别有用。 在主机上为每个 GPU 启动一个子进程(训练器)的地方 GPU 设备。 """ def __init__(self, mp_queue): super().__init__() self._mp_queue = mp_queue def acquire(self, scope_id, expiration_time): pid = os.getpid() acquire_request = TimerRequest(pid, scope_id, expiration_time) self._mp_queue.put(acquire_request) def release(self, scope_id): pid = os.getpid() release_request = TimerRequest(pid, scope_id, -1) self._mp_queue.put(release_request)
MultiprocessingRequestQueue(请求队列): "" 基于 python 的 ``multiprocessing.Queue`` 的 ``RequestQueue`` ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` def 初始化(自身, mp_queue: mp.队列): 超级().初始化() 自身._mp_queue = mp_queue def 尺寸(自身) -> int: 返回 自身._mp_queue.qsize() def 获取(自身, 尺寸, 超时: float) -> 列表[定时器请求]: 请求 = [] 等待 = 超时 _ 范围(0, 尺寸): 开始 = 时间.时间() try: r = 自身._mp_queue.获取(=True, 超时=等待) 除了 空的: 断开 请求.追加(r) 等待 = 等待 - (时间.时间() - 开始) 如果 等待 0: 断开 返回 请求
[文档] 本地定时器服务器(定时器服务器): "" 与 `LocalTimerClient` 一起工作的服务器。客户端应被 子进程到运行此服务器的父进程。每个主机 在任务中预期将本地启动自己的计时器服务器,每个 服务器实例管理本地工作进程的计时器(运行在进程上) 在同一主机上)。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` def 初始化( 自身, 多进程队列: mp.队列, 最大间隔: 浮点数 = 60, 守护进程: 布尔类型 = 真实 ): 超级().初始化(多进程请求队列(多进程队列), 最大间隔, 守护进程) 自身._定时器: 字典[元组[任意, 字符串] 定时器请求] = {} def 注册计时器(自身, 计时器请求: 列表[定时器请求]) -> : 请求 计时器请求: 进程 ID = 请求.工作 ID 范围 ID = 请求.范围 ID 过期时间 = 请求.过期时间 # 负数过期时间是一个发布调用的代理 如果 过期时间 < 0: 自身._计时器.弹出((进程 ID, 范围 ID), ) 否则: 自身._定时器[进程 ID, 范围 ID] = 请求 def 清除定时器(自身, worker_ids: 集合[int]) -> : 进程 ID, scope_id 列表(自身._timers.()): 如果 进程 ID worker_ids: 自身._定时器.弹出((进程 ID, 范围 ID)) def 获取已过期的定时器(自身, 截止日期: float) -> 字典[任意, 列表[定时器请求]] # 进程 ID -> [定时器请求...] 过期定时器: 字典[任意, 列表[定时器请求]] = {} 请求 自身._定时器.(): 如果 请求.过期时间 截止日期: 过期作用域 = 过期计时器.setdefault(请求.工作器 ID, [] 过期作用域.追加(请求) 返回 过期计时器 def 收割者(自身, 工作器 ID: int) -> 布尔: try: 操作系统.终止(工作器 ID, 信号.SIGKILL) 返回 真实 除了 进程查找错误: 记录器.信息("进程 pid="%s该进程不存在。跳过, 工作器 ID) 返回 真实 除了 异常: 记录器.异常(错误终止 pid=%s", 工作器 ID) 返回 错误

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源