快捷键

torch.distributed.elastic.timer.file_based_local_timer 的源代码

# mypy: 允许未类型化定义
# 版权(c)Meta Platforms,Inc.及其关联公司。
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。

导入 输入/输出
导入 json
导入 操作系统
导入 选择
导入 信号
导入 系统
导入 线程
导入 时间
来自 打字 导入 可调用, 可选

来自 torch.distributed.elastic.timer.api 导入 Timer 客户端, TimerRequest
来自 torch.distributed.elastic.timer.debug_info_logging 导入 (
    记录已过期计时器的调试信息,
)
来自 torch.distributed.elastic.utils.日志 导入 获取日志记录器


__all__ = ["文件计时器客户端", "文件计时器请求", 文件计时服务器]

日志记录器 = 获取日志记录器(__name__)


def 重试(最大重试次数: int, 睡眠时间: float) -> 可调用:
    ""
一个简单的重试包装器。

参数:
max_retries: int, 重试的最大次数。
sleep_time: float, 重试之间的睡眠时间。
```python
# 假设输入文本为:
input_text = '"""'

# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
    # 这里应该调用真实的翻译 API 进行翻译
    # 由于示例中不使用真实的 API,以下为模拟翻译结果
    return text

# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```

    def 包装器(函数: 可调用) -> 可调用:
        def 包装器(*参数, **kwargs):
             i  范围(max_retries):
                try:
                    返回 函数(*参数, **kwargs)
                除了 异常:
                    记录器.异常("运行错误"%s.正在重试..., 函数.__name__)
                    如果 i < 最大重试次数 - 1:
                        时间.睡眠(睡眠时间)
                    否则:
                        提升

        返回 包装器

    返回 包装器


 文件计时器请求(定时器请求):
    ""
表示倒计时计时器获取和释放的数据对象
该对象用于`FileTimerClient`和`FileTimerServer`之间。
负的`expiration_time`应解释为“释放”。
请求。
``信号`` 是从服务器回收工作进程的信号。
进程。
```python
# 假设输入文本为:
input_text = '"""'

# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
    # 这里应该调用真实的翻译 API 进行翻译
    # 由于示例中不使用真实的 API,以下为模拟翻译结果
    return text

# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```

    __slots__ = [版本, "worker_pid", 范围 ID, 过期时间, 信号]

    def 初始化(
        自身, 工作进程 ID: int, 范围 ID: 字符串, 过期时间: float, 信号: 整型 = 0
    ) -> :
        自身.版本 = 1
        自身.工作进程 ID = 工作进程 ID
        自身.范围 ID = 范围 ID
        自身.过期时间 = 过期时间
        自身.信号 = 信号

    def __等于__(自身, 其他) -> 布尔:
        如果 isinstance(其他, 文件计时请求):
            返回 (
                自身.版本 == 其他.版本
                 自身.工作进程 ID == 其他.工作进程 ID
                 自身.范围 ID == 其他.范围 ID
                 自身.过期时间 == 其他.过期时间
                 自身.信号 == 其他.信号
            )
        返回 

    def 转换为 JSON(自身) -> 字符串:
        返回 json.压缩包(
            {
                版本: 自身.版本,
                "进程 ID": 自身.worker_pid,
                "scope_id": 自身.scope_id,
                "expiration_time": 自身.过期时间,
                信号: 自身.信号,
            },
        )


[文档] 文件计时器客户端(Timer 客户端): "" ``文件计时器服务器`` 的客户端。此客户端旨在在同一主机上使用 运行 ``文件计时器服务器`` 的主机上,并使用 pid 来唯一标识一个工作进程。 该客户端使用命名管道发送计时器请求。 ``FileTimerServer``。该客户端是生产者,而``FileTimerServer``是消费者。 多个客户端可以与同一个``FileTimerServer``一起工作。 ``FileTimerServer``。 参数: 文件路径:str,FIFO 特殊文件的路径。``FileTimerServer`` 必须通过调用 os.mkfifo()创建它。 信号:signal,用于杀死进程的信号。使用负数或零信号将不会杀死进程。 信号:signal,用于杀死进程的信号。使用负数或零信号将不会杀死进程。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` def 初始化( 自身, 文件路径: 字符串, 信号=(信号.SIGKILL 如果 系统.平台 != win32 否则 信号.CTRL_C_EVENT), # 类型: 忽略[attr-defined] ) -> : 超级().初始化() 自身.文件路径 = 文件路径 自身.信号 = 信号 @_retry(最大重试次数=10, 睡眠时间=0.1) def _非阻塞打开(自身) -> 可选[输入/输出.TextIOWrapper]: # 服务器可能已崩溃或尚未启动。 # 在这种情况下,在阻塞模式下调用 open() 将阻塞客户端。 避免此类问题,请以非阻塞模式打开,如果服务器不存在,将引发 OSError 如果服务器不存在,将引发 OSError fd = 操作系统.打开(自身.文件路径, 操作系统.O_WRONLY | 操作系统.非阻塞) 返回 操作系统.fdopen(fd, "wt") def 发送请求(自身, 请求: 文件计时器请求) -> : try: 文件 = 自身.非阻塞打开() 除了 异常 作为 e: 提升 破坏管道错误( "无法发送文件计时器请求,因为文件计时器服务器不可用。" ) 来自 e 文件: json 请求 = 请求.转换为 JSON() 使用 select.PIPE_BUF 作为最大长度写入请求可保证原子性。 如果 长度(json 请求) > 选择.PIPE_BUF: 提升 运行时错误( f"文件定时请求大于"{选择.PIPE_BUF}字节 f不支持:{json 请求}" ) 文件.(json 请求 + "输入文本翻译为简体中文为:\n") def 获取(自身, 范围 ID: 字符串, 过期时间: float) -> : 自身.发送请求( 请求=文件计时器请求( 工作进程 ID=操作系统.获取进程 ID(), 范围 ID=范围 ID, 过期时间=过期时间, 信号=自身.信号, ), ) def 释放(自身, 范围 ID: 字符串) -> : 自身.发送请求( 请求=文件计时器请求( 工作进程 ID=操作系统.获取进程 ID(), 范围 ID=范围 ID, 过期时间=-1, 信号=0 ), )
[文档] 文件定时服务器: "" 与 `FileTimerClient` 一起工作的服务器。客户端应运行在与运行此服务器的进程相同的宿主机上。 每个作业中的宿主机都应启动自己的定时器服务器,并且每个服务器实例管理本地工作者(在本地运行的)的定时器。 每个作业中的宿主机都应启动自己的定时器服务器,并且每个服务器实例管理本地工作者(在本地运行的)的定时器。 并且每个服务器实例管理本地工作者(在本地运行的)的定时器。 同一主机上的进程)。 参数: file_path: str, 要创建的 FIFO 特殊文件的路径。 max_interval: float, 每个看门狗循环的最大间隔(秒)。 daemon: bool, 是否以守护进程模式运行看门狗线程。 守护线程不会阻止进程停止。 log_event: 可调用函数[[Dict[str, str]], None],一个可选的回调用于 以 JSON 格式记录事件。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` def 初始化( 自身, 文件路径: 字符串, run_id: 字符串, 最大间隔: 浮点数 = 10, 守护进程: 布尔类型 = True, log_event: 可选[可调用[[字符串, 可选[文件计时器请求]], ]] = , ) -> : 自身._文件路径 = 文件路径 自身._运行 ID = 运行 ID 自身._最大间隔 = 最大间隔 自身._daemon = 守护进程 自身._timers: 字典[元组[int, 字符串] 文件计时器请求] = {} 自身._停车信号 = 自身._看门狗线程: 可选[线程.Thread] = None 自身._is_client_started = 如果 操作系统.路径.存在(自身.文件路径): 操作系统.删除(自身.文件路径) 操作系统.创建命名管道(自身.文件路径) 仅作测试。统计接收到的请求数量。 自身._request_count = 0 仅作测试。处理所有请求并停止服务器。 自身._run_once = 自身.记录事件 = ( log_event 如果 log_event not None 否则 lambda 名称, 请求: None ) 自身.最后一次进度时间 = int(时间.时间()) def 开始(自身) -> : 记录器.信息( 开始%s...最大间隔=%s,守护进程=%s文件路径=%s", 类型(自身).__name__, 自身.最大间隔, 自身.守护进程, 自身.文件路径_, ) 自身.看门狗线程 = 线程.Thread( 目标=自身.监视狗循环, 守护进程=自身._daemon ) 记录器.信息("启动看门狗线程...") 自身._看门狗线程.开始() 自身._log_event("看门狗启动", ) def 停止(自身) -> : 记录器.信息(停止%s", 类型(自身).__name__) 自身._停车信号 = 真实 如果 自身._看门狗线程: 记录器.信息("正在停止看门狗线程...") 自身._看门狗线程.加入(自身.最大间隔) 自身.看门狗线程 = None 否则: 记录器.信息(没有运行看门狗线程,什么也不做) 如果 操作系统.路径.存在(自身.文件路径): 操作系统.删除(自身.文件路径) 自身.日志事件(监视器已停止, ) def 运行一次(自身) -> : 自身.__运行一次 = 真实 如果 自身._看门狗线程: 记录器.信息("正在停止看门狗线程...") 自身._看门狗线程.加入() 自身.看门狗线程 = None 否则: 记录器.信息(没有运行看门狗线程,什么也不做) 如果 操作系统.路径.存在(自身._文件路径): 操作系统.删除(自身._文件路径) @staticmethod def 进程是否正在运行(进程 ID: int): "" 检查进程是否正在运行的函数 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` try: # 检查进程是否存在并且我们可以向其发送信号 操作系统.终止(进程 ID, 0) 返回 真实 除了 OSError: 返回 def _看门狗循环(自身) -> : 在阻塞模式下打开管道会阻塞服务器线程。 这是因为以下原因: 1. 客户端案例通常不会发生。 2. 我们正在单独的守护进程中运行看门狗循环。 # 线程,不会阻塞进程停止。 try: fd = 打开(自身._文件路径) 除了 异常: 记录器.异常("无法打开 FileTimerServer 管道") 提升 fd: 自身._客户端是否已启动 = 真实 while not 自身.停止信号: try: 运行一次 = 自身._运行一次 自身._运行看门狗(fd) 如果 运行一次: 断开 自身.上次进度时间 = int(时间.时间()) 除了 异常: 记录器.异常(运行看门狗时出错) def 运行看门狗(自身, fd: 输入/输出.文本 I/O 包装器) -> : 计时请求 = 自身._获取请求(fd, 自身.最大间隔) 自身.注册计时器(计时器请求) 现在 = 时间.时间() 收割的进程 ID = 集合() 杀死进程 = 收割信号 = 0 所有已过期的计时器 = 自身.获取已过期的计时器(现在) 记录过期计时器的调试信息( 自身._运行_id, { 进程 ID: [已过期计时器.to_json() expired_timer expired_timers] 进程 ID, expired_timers 所有已过期的定时器.项目() }, ) 工作进程 ID, 已过期的定时器 所有已过期的定时器.项目(): 记录器.信息( "收割 worker_pid="%s]. 过期定时器:%s", worker_pid, 自身._get_scopes(过期定时器), ) 收割的 worker 进程 ID.添加(worker 进程 ID) # 如果存在多个过期定时器,我们找到第一个定时器 有效信号(>0)的到期时间顺序。 过期定时器.排序(=lambda 定时器: 定时器.过期时间) 信号 = 0 已过期计时器 = None 计时器 已过期计时器列表: 自身.记录事件(计时器已过期, 计时器) 如果 计时器.信号 > 0: 信号 = 计时器.信号 已过期的计时器 = 计时器 断开 如果 信号 0: 记录器.信息( 未指定与 worker=[相关的信号%s不要收割它。, worker_pid ) 继续 如果 自身._reap_worker(worker_pid, 信号): 记录器.信息( "成功收割了 worker="%s] with signal=%s", worker_pid, 信号 ) 自身._log_event("终止工作进程", 过期定时器) 终止进程 = 真实 回收信号 = 信号 否则: 记录器.错误( "回收工作进程失败 worker=["%s]. 将在下次看门狗中重试。, worker 进程 ID, ) 如果 终止进程 收割信号 > 0: 记录器.信息( "终止服务器进程=["%s] 因超时定时器, 操作系统.获取进程 ID(), ) 自身._reap_worker(操作系统.获取进程 ID(), 收割信号) 自身.清除定时器(收割的进程 ID) def 获取作用域(自身, 计时器请求: 列表[文件计时器请求]) -> 列表[字符串]: 返回 [r.范围 ID r 计时器请求] def 获取请求( 自身, fd: 输入/输出.TextIOWrapper, 最大间隔: 浮点数 ) -> 列表[文件计时请求]: 开始 = 时间.时间() 请求 = [] while not 自身._停车信号 自身._run_once: 对于命名管道,当至少有一个写入者打开时,readline() 是阻塞的。 它仅在写入端调用 flush() 时返回。 注意,flush() 在 close() 内部自动调用。 在最后一个写入者关闭后,readline() 不会阻塞。 当到达文件末尾时将返回空字符串。 由于客户端始终打开管道,写入消息然后立即关闭管道, 因此下面的 readline()调用不会长时间阻塞。 json 请求 = fd.读取行() 如果 长度(JSON 请求) == 0: 如果 自身._只运行一次: 断开 时间.睡眠(最小(最大间隔, 1)) 否则: 请求 = json.loads(JSON 请求) 进程 ID = 请求["进程 ID"] 范围 ID = 请求["范围 ID"] 过期时间 = 请求["过期时间"] 信号 = 请求[信号] 请求.追加( 文件定时请求( 工作进程 ID=进程 ID, 范围 ID=范围 ID, 过期时间=过期时间, 信号=信号, ) ) 现在 = 时间.时间() 如果 现在 - 开始 > 最大间隔: 断开 返回 请求 def 注册计时器(自身, 计时器请求: 列表[文件计时器请求]) -> : 请求 计时请求: 进程 ID = 请求.工作进程 ID 范围 ID = 请求.范围 ID 过期时间 = 请求.过期时间 自身.请求次数 += 1 key = (进程 ID, scope_id) # 负过期时间是一个发布调用的代理 如果 过期时间 < 0: 如果 key 自身._定时器: 删除 自身._计时器[] 否则: 自身._定时器[] = 请求 def 清除定时器(自身, 工作进程 ID: 集合[int]) -> : 进程 ID, 范围 ID 列表(自身._计时器.()): 如果 进程 ID 工作进程 ID not 文件计时服务器.进程是否正在运行(进程 ID): 删除 自身._计时器[进程 ID, 范围 ID] def 获取已过期的计时器(自身, 截止日期: float) -> 字典[int, 列表[文件计时器请求]] # 进程 ID -> [计时器请求...] 过期计时器: 字典[int, 列表[文件计时器请求]] = {} 请求 自身._计时器.(): 如果 请求.过期时间 截止日期: 已过期作用域 = 过期定时器.setdefault(请求.工作进程 ID, [] 过期作用域.追加(请求) 返回 过期定时器 def 收割工人(自身, worker_pid: int, 信号: int) -> 布尔: try: 操作系统.终止(worker_pid, 信号) 返回 真实 除了 进程查找错误: 记录器.信息("进程 pid="%s不存在。跳过, worker_pid) 返回 真实 除了 异常: 记录器.异常(进程终止错误 pid=%s", 工作进程 pid) 返回 def 获取最后进度时间(自身) -> int: 返回 自身._最后进度时间 如果 自身._is_client_started 是客户端启动了 否则 int(时间.时间())

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源