torch.distributed.elastic.timer.local_timer 的源代码
# mypy: 允许未类型化定义
版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。
导入
记录日志
导入
多进程
作为 mp
导入
操作系统
导入
信号
导入
时间
来自
队列
导入
空的
来自
打字
导入
任何
来自 .api
导入
请求队列,
Timer 客户端,
定时器请求,
定时服务器
__all__ = ["LocalTimerClient", "多进程请求队列",
"本地定时器服务器"]
日志记录器 =
记录日志.
获取日志记录器(__name__)
[文档]类 LocalTimerClient(TimerClient):
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
客户端为 `LocalTimerServer`。此客户端旨在使用
在同一台运行“LocalTimerServer”的主机上并使用
pid 用于唯一标识一个工作者。这在某些情况下特别有用。
在主机上为每个 GPU 启动一个子进程(训练器)的地方
GPU 设备。
"""
def __init__(self, mp_queue):
super().__init__()
self._mp_queue = mp_queue
def acquire(self, scope_id, expiration_time):
pid = os.getpid()
acquire_request = TimerRequest(pid, scope_id, expiration_time)
self._mp_queue.put(acquire_request)
def release(self, scope_id):
pid = os.getpid()
release_request = TimerRequest(pid, scope_id, -1)
self._mp_queue.put(release_request)
类 MultiprocessingRequestQueue(
请求队列):
""
基于 python 的 ``multiprocessing.Queue`` 的 ``RequestQueue``
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 初始化(
自身, mp_queue: mp.
队列):
超级().
初始化()
自身._mp_queue = mp_queue
def 尺寸(
自身) -> int:
返回
自身._mp_queue.qsize()
def 获取(
自身,
尺寸,
超时: float) ->
列表[
定时器请求
]:
请求 = []
等待 =
超时
为 _
在
范围(0,
尺寸):
开始 =
时间.
时间()
try:
r = 自身._mp_queue.
获取(
块=True,
超时=
等待)
除了
空的:
断开
请求.
追加(r)
等待 =
等待 - (
时间.
时间() -
开始)
如果
等待
≤ 0:
断开
返回
请求
[文档]
类
本地定时器服务器(
定时器服务器):
""
与 `LocalTimerClient` 一起工作的服务器。客户端应被
子进程到运行此服务器的父进程。每个主机
在任务中预期将本地启动自己的计时器服务器,每个
服务器实例管理本地工作进程的计时器(运行在进程上)
在同一主机上)。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 初始化(
自身,
多进程队列: mp.
队列,
最大间隔:
浮点数 = 60,
守护进程:
布尔类型 =
真实
):
超级().
初始化(
多进程请求队列(
多进程队列),
最大间隔,
守护进程)
自身.
_定时器:
字典[
元组[
任意,
字符串
]
定时器请求] = {}
def 注册计时器(
自身,
计时器请求:
列表[
定时器请求]) ->
无:
为
请求
在
计时器请求:
进程 ID =
请求.
工作 ID
范围 ID =
请求.
范围 ID
过期时间 =
请求.
过期时间
# 负数过期时间是一个发布调用的代理
如果
过期时间 < 0:
自身.
_计时器.
弹出((
进程 ID,
范围 ID),
无)
否则:
自身.
_定时器
[
进程 ID,
范围 ID
] =
请求
def 清除定时器(
自身, worker_ids:
集合[int]) ->
无:
为
进程 ID, scope_id
在
列表(
自身._timers.
键()):
如果
进程 ID
在 worker_ids:
自身.
_定时器.
弹出((
进程 ID,
范围 ID))
def 获取已过期的定时器(
自身,
截止日期: float) ->
字典[
任意,
列表[
定时器请求
]]
# 进程 ID -> [定时器请求...]
过期定时器:
字典[
任意,
列表[
定时器请求]] = {}
为
请求
在
自身.
_定时器.
值():
如果
请求.
过期时间
≤
截止日期:
过期作用域 =
过期计时器.setdefault(
请求.
工作器 ID,
[]
过期作用域.
追加(
请求)
返回
过期计时器
def 收割者(
自身,
工作器 ID: int) ->
布尔:
try:
操作系统.
终止(
工作器 ID,
信号.SIGKILL)
返回
真实
除了
进程查找错误:
记录器.
信息(
"进程 pid="%s
该进程不存在。跳过,
工作器 ID)
返回
真实
除了
异常:
记录器.
异常(
错误终止 pid=%s",
工作器 ID)
返回
错误