• 文档 >
  • 模块代码 >
  • torch >
  • torch.distributed >
  • torch.distributed.elastic.agent.server.本地弹性代理
快捷键

torch.distributed.elastic.agent.server.local_elastic_agent 的源代码

#!/usr/bin/env python3
# mypy: 允许未类型化定义

版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。


导入 json
导入 操作系统
导入 信号
导入 套接字
导入 时间
导入 uuid
来自 字符串 导入 模板
来自 打字 导入 任意, 可选, 类型检查

导入 torch.distributed.elastic 定时器 作为 计时器
来自 torch.distributed.elastic 导入 事件
来自 torch.distributed.elastic.agent.server.api 导入 (
    运行结果,
    简单弹性代理,
    工作组,
    工作规范,
    工作状态,
)
来自 torch.distributed.elastic.agent.server.健康检查服务器 导入 (
    创建健康检查服务器,
    健康检查服务器,
)
来自 torch.distributed.elastic.metrics.api 导入 专业
来自 torch.distributed.elastic.multiprocessing 导入 (
    日志规范,
    P 上下文,
    启动进程,
)
来自 torch.distributed.elastic.utils 导入 
来自 torch.distributed.elastic.utils.日志 导入 获取日志记录器


如果 类型检查:
    来自 torch.distributed.elastic.events.api 导入 事件元数据值

日志记录器 = 获取日志记录器(__name__)

__all__ = [
    本地弹性代理,
    启用文件计时器,
    弹性计时器文件,
    "火炬弹性健康检查端口",
]

火炬弹性启用文件计时器 = "火炬弹性启用文件计时器"
火炬弹性健康检查端口 = "火炬弹性健康检查端口"
火炬弹性定时文件 = "火炬弹性定时文件"


[文档] 本地弹性代理(SimpleElasticAgent): torchelastic.agent.server.ElasticAgent 的实现,用于处理主机本地工作者。 此代理在每个主机上部署,并配置为启动 ``n`` 个工作者。 当使用 GPU 时,``n`` 对应主机上可用的 GPU 数量。 本地代理不与其他主机上部署的其他本地代理通信,即使工作进程可能进行跨主机通信。工作进程 ID 被解释为本地进程。代理作为一个整体启动和停止所有工作进程。 。工作进程 ID 被解释为本地进程。代理作为一个整体启动和停止所有工作进程。 。代理作为一个整体启动和停止所有工作进程。 工人函数及其传递给工人函数的参数必须 Python 多进程兼容。要传递多进程数据结构 对工作者,你可以创建相同的多进程数据结构 上下文作为指定的 `start_method` 并将其作为函数参数传递。 ``exit_barrier_timeout``指定等待其他代理完成的时间(以秒为单位) 这作为一种安全网,用于处理工作者在不同时间完成的情况,以防止代理将提前完成的工作者视为缩容事件。强烈建议这样做 它是处理工作者在不同时间完成的情况的安全网,以防止代理将提前完成的工作者视为缩容事件。强烈建议这样做 它是处理工作者在不同时间完成的情况的安全网,以防止代理将提前完成的工作者视为缩容事件。强烈建议这样做 用户代码处理确保工作者以同步方式终止 方式而不是依赖于 exit_barrier_timeout。 基于命名管道的看门狗可以在```LocalElasticAgent```中启用,如果 环境变量 `TORCHELASTIC_ENABLE_FILE_TIMER` 的值为 1 在```LocalElasticAgent```进程中已定义。 可选地,还可以设置另一个环境变量```TORCHELASTIC_TIMER_FILE```。 可以使用唯一的文件名来设置命名管道。如果未设置环境变量```TORCHELASTIC_TIMER_FILE```, 则```LocalElasticAgent```将使用默认设置。 将内部创建一个唯一的文件名并将其设置为环境 变量 `TORCHELASTIC_TIMER_FILE`,以及这个环境变量将 传播到工作进程以允许它们连接到相同的 命名管道,LocalElasticAgent 使用。 日志被写入指定的日志目录。默认情况下,每条日志行都会被 前缀为 ``[${role_name}${local_rank}]:`` (例如 ``[trainer0]: foobar``)。 可以通过传递一个模板字符串来自定义日志前缀 ``log_line_prefix_template`` 参数。 以下宏(标识符)在运行时会被替换: ``${role_name}, ${local_rank}, ${rank}``。例如,要将每条日志行前缀设置为全局排名而不是本地排名,请设置 ``log_line_prefix_template = "[${rank}]:```。 全局排名而不是本地排名,设置 ``log_line_prefix_template = "[${rank}]:```。 示例启动函数 :: def 训练器(args) -> str: return "进行训练" def 主函数(): start_method="spawn" shared_queue = multiprocessing.get_context(start_method).Queue() spec = WorkerSpec( role="trainer", local_world_size=nproc_per_process, entrypoint=trainer, args=("foobar",) ...<OTHER_PARAMS...>) agent = LocalElasticAgent(spec, start_method) results = agent.run() if results.is_failed(): print("训练器失败") 否则: print(f"rank 0 返回值: {results.return_values[0]}") # 打印 -> rank 0 返回值: do train 例子启动二进制 :: def 主函数(): spec = WorkerSpec( 角色="训练器", local_world_size=nproc_per_process, entrypoint="/usr/local/bin/trainer" args=("--trainer-args", "foobar") ...<OTHER_PARAMS...>) agent = LocalElasticAgent(spec) results = 代理运行() 如果 results.is_failed() 不为真: 打印("二进制启动没有返回值") ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 定义 初始化( 自身, 规格: 工作器规范, 日志规范: 日志规范, 开始方法="spawn", 退出屏障超时: 浮点数 = 300, 日志行前缀模板: 可选[字符串] = , ): 超级().初始化(规格, 退出屏障超时) 自身._启动方法 = 启动方法 自身._pcontext: 可选[P 上下文] = None 自身._rdzv_handler = 规格.rdzv_handler 自身._log_line_prefix_template = 日志行前缀模板 自身._工作守护进程: 可选[计时器.文件定时服务器] = None 自身._日志规范 = 日志规范 自身._健康检查服务器: 可选[健康检查服务器] = None 定义 _设置本地看门狗(自身, 环境变量: 字典[int, 字典[字符串, 字符串]]) -> : 启用看门狗环境变量名 = TORCHELASTIC_ENABLE_FILE_TIMER 看门狗启用 = 操作系统.获取环境变量(启用看门狗环境变量名) 监视犬文件环境变量名 = TORCHELASTIC 定时器文件 监视犬文件路径 = 操作系统.获取环境变量(监视犬文件环境变量名) 如果 看门狗启用 not None 字符串(看门狗启用) == "1": 如果 看门狗文件路径 : 看门狗文件路径 = /tmp/watchdog_timer_ + 字符串(uuid.uuid4()) 记录器.信息(启动一个 FileTimerServer%s..., watchdog_file_path) 如果 not 环境变量: 记录器.警告( 环境变量为空,使用空 run_id 作为 FileTimerServer 的运行 ID ) 运行 ID = 请提供需要翻译的文本 否则: 运行 ID = 环境变量[0] [TORCHELASTIC 运行 ID] 自身._worker 看门狗 = 计时器.文件定时服务器( 文件路径=看门狗文件路径, run_id=run_id, 最大间隔=0.1, 守护进程=True, log_event=自身._记录看门狗事件, ) 自身._worker_watchdog.开始() 记录器.信息("文件计时服务器已启动") 否则: 记录器.信息( "环境变量 '"%s'未找到。不要启动文件计时服务器。', 启用看门狗环境变量名, ) # 将看门狗文件环境变量传播到工作进程 如果 看门狗文件路径 not : 工作进程环境 环境变量.(): 工作环境[监视文件环境名称] = 监视文件路径 @staticmethod 定义 获取当前时间秒数() -> int: 返回 int(时间.时间()) 定义 设置健康检查(自身) -> : 健康检查端口环境名称 = TORCHELASTIC 健康检查端口 健康检查端口 = 操作系统.获取环境变量(健康检查端口环境名称) 如果 健康检查端口 not : 记录器.信息( 找到健康检查端口%s: %s", 健康检查端口环境名称, 健康检查端口, ) 如果 自身._worker_watchdog : 记录器.信息( "文件计时服务器不存在,使用当前时间作为虚拟回调" ) 活着回调 = 本地 Elastic 代理._获取当前时间秒数 否则: 活着回调 = 自身.工作犬监控.获取最后进度时间 try: 健康检查端口号(整数) = int(健康检查端口号) 自身.健康检查服务器 = 创建健康检查服务器( 生存回调=生存回调, 端口=健康检查端口整数, 超时=60, ) 自身._健康检查服务器.开始() 除了 ValueError: 记录器.信息( "无效的健康检查端口值: '"%s,期望整数。不启动健康检查服务器。, 健康检查端口, ) 否则: 记录器.信息( "环境变量 '"%s'未找到。不要启动健康检查。', healthcheck_port_env_name, ) 定义 _获取完全限定主机名(自身) -> 字符串: 返回 套接字.获取完全限定域名(套接字.获取主机名()) 定义 _log_watchdog_event( 自身, 名称: 字符串, 请求: 可选[计时器.文件计时器请求] ) -> : wg = 自身.工作组 规范 = wg.规范 md = {看门狗事件: 名称} 如果 请求 not : md["worker_pid"] = 字符串(请求.worker 进程 ID) md["scope_id"] = 请求.scope_id md["expiration_time"] = 字符串(请求.过期时间) md[信号] = 字符串(请求.信号) md_str = json.压缩包(md) 状态 = 运行中 元数据: 字典[字符串, 事件元数据值] = { 运行_id: 规格.rdzv 处理器.get_run_id(), 全局_rank: , "分组排名": wg.群组排名, "工作者 ID": , "角色": 规格.角色, 主机名: 自身._获取完全限定主机名(), 状态: 状态, 总运行时间: 自身.总执行时间, rdzv 后端: 规格.rdzv 处理器.获取后端(), 原始错误: , "元数据": md 字符串, "agent 重启": 规格.最大重启次数 - 自身._剩余重启次数, } # 注意:事件的'metadata'字段稍后转换为 TorchelasticStatusLogEntry。 # 事件的'name'字段在 TorchelasticStatusLogEntry 中不被使用。 事件 = 活动.活动( 名称=名称, =活动.事件源.代理, 元数据=元数据 ) 活动.记录(事件) # pyre-fixme[56]: Pyre 无法推断装饰器的类型 `torch.distributed.elastic.metrics.prof` @prof 定义 停止工作线程( 自身, 工作者组: 工作组, 是重启: 布尔类型 = ) -> : 自身.关闭(is_restart=is_restart) # pyre-fixme[56]: Pyre 无法推断装饰器的类型 # `torch.distributed.elastic.metrics.prof`. @prof 定义 开始工作进程(自身, 工作组: 工作组) -> 字典[int, 任意]: 规范 = 工作组.规范 存储 = 工作组.存储 断言 存储 not None 重启次数 = 规格.最大重启次数 - 自身._剩余重启次数 使用代理存储: 布尔类型 = 规格.rdzv 处理器.使用代理存储 记录器.信息(使用代理存储:%s", 使用代理存储) 参数: 字典[int, 元组] = {} 环境变量: 字典[int, 字典[字符串, 字符串]] = {} 日志行前缀: 可选[字典[int, 字符串]] = ( {} 如果 自身._日志行前缀模板 否则 None ) 工作者 工作组.工人: 本地排名 = 工人.本地排名 工作环境 = { 本地排名: 字符串(本地排名), RANK: 字符串(工人.全球排名), "GROUP_RANK": 字符串(工作组.群组排名), "角色等级": 字符串(工人.角色等级), "角色名称": 规格.角色, "本地世界大小": 字符串(规格.本地世界大小), WORLD_SIZE: 字符串(工人.世界大小), "GROUP_WORLD_SIZE": 字符串(工作组.世界组大小), "ROLE_WORLD_SIZE": 字符串(工人.角色世界大小), "主地址": 工作组.主机地址, "主端口": 字符串(工作组.主机端口), "TORCHELASTIC_RESTART_COUNT": 字符串(重启次数), "TORCHELASTIC_MAX_RESTARTS": 字符串(规格.最大重启次数), TORCHELASTIC 运行 ID: 规格.rdzv 处理器.get_run_id(), "TORCHELASTIC_USE_AGENT_STORE": 字符串(使用代理存储), PyTorch NCCL 异步错误处理: 操作系统.获取环境变量( PyTorch NCCL 异步错误处理, 字符串(1) ), } 如果 OMP_NUM_THREADS 操作系统.环境: worker_env[OMP_NUM_THREADS] = 操作系统.环境[OMP_NUM_THREADS] 如果 自身.日志行前缀模板: 日志行前缀 = 模板( 自身.日志行前缀模板 ).安全替换( 角色名称=规格.角色, 排名=工人.全球排名, 本地排名=本地排名, ) 日志行前缀[本地排名] = 日志行前缀 环境变量[本地排名] = 工作环境 工作参数 = 列表(规格.参数) 工作参数 = 宏定义.替代(工作参数, 字符串(本地排名)) 参数[本地排名] = 元组(工作参数) 自身._设置本地看门狗(环境变量=环境变量) 自身._设置健康检查() 断言 规格.入口 not None 断言 自身._logs_specs not None 自身._pcontext = 启动进程( 名称=规格.角色, 入口=规格.入口, 参数=参数, 环境变量=环境变量, 日志规范=自身._logs_specs, 日志行前缀=日志行前缀, 开始方法=自身._start_method, ) 返回 自身._pcontext.进程 ID() 定义 关闭( 自身, 死亡信号: 信号.信号 = 信号.SIGTERM, is_restart: 布尔类型 = ) -> : 如果 自身._worker_watchdog not : 自身._worker_watchdog.停止() 自身._worker_watchdog = None 如果 自身._health_check_server not : 自身._health_check_server.停止() 自身.健康检查服务器 = None 如果 自身._pcontext: 自身._pcontext.关闭(死亡信号) 如果 not 是否重启 自身._rdzv_handler: 自身._rdzv_handler.关闭() # pyre-fixme[56]: Pyre 无法推断装饰器的类型 # `torch.distributed.elastic.metrics.prof`. @prof 定义 监控工作者(自身, 工作组: 工作组) -> 运行结果: 角色 = 工作组.规格.角色 工作进程 ID = {w.标识符 w 工作组.工人} 断言 自身._上下文 not None 进程 ID = 集合(自身._上下文.进程 ID().()) 如果 工作进程 ID != pc_pids: 记录器.错误( "[%s工作进程的 PID 与进程上下文 PID 不匹配。 "期望:"%s,实际:%s", 角色, 工作进程 ID, pc_pids, ) 返回 运行结果(状态=工作状态.未知) 结果 = 自身._上下文.等待(0) 如果 结果: 如果 结果.是否失败(): # 将本地排名失败映射到全局排名 工作失败 = {} 本地排名, 失败 结果.失败次数.项目(): 工作者 = 工作组.工人[本地排名] 工人故障[工人.全球排名] = 失败 返回 运行结果( 状态=工作状态.失败, 失败次数=工人故障, ) 否则: # 将 ret_val_queue 复制到一个具有全局 ranks 的映射中 工人返回值 = {} 本地排名, 返回值 结果.返回值.项目(): 工作者 = 工作组.工人[本地排名] 工人返回值[工人.全球排名] = 返回值 返回 运行结果( 状态=工作状态.成功, 返回值=工人返回值, ) 否则: 返回 运行结果(状态=工作状态.健康的)

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源