快捷键

torch.multiprocessing.spawn 源代码

# mypy: 允许未类型化定义
导入 记录日志
导入 多进程
导入 multiprocessing.connection
导入 操作系统
导入 酸菜
导入 信号
导入 系统
导入 tempfile
导入 时间
导入 警告
from concurrent.futures 导入 as_completed, ThreadPoolExecutor
from 打字 导入 可选

from . 导入 _prctl_pr_set_pdeathsig  # 类型: 忽略[attr-defined]


环境变量并行启动 = 火炬并行启动

日志 = 记录.获取日志记录器(__name__)

全部 = [
    进程上下文,
    进程异常,
    "ProcessExitedException",
    "ProcessRaisedException",
    "spawn",
    "SpawnContext",
    启动进程,
]


 ProcessException(异常):
    __slots__ = [错误索引, 错误进程 ID]

    def __init__(, 信息: 字符串, 错误索引: int, 进程 ID: int):
        超级().__init__(信息)
        .msg = msg
        .错误索引 = 错误索引
        .进程 ID = 进程 ID

    def __reduce__():
        返回 类型(), (.信息, .错误索引, .进程 ID)


 处理引发的异常(ProcessException):
    代码抛出异常导致进程失败时引发的异常。

    def __init__(
        ,
        信息: 字符串,
        错误索引: int,
        错误进程 ID: int,
    ):
        超级().__init__(信息, 错误索引, 错误进程 ID)


 进程退出异常(ProcessException):
    当进程因信号失败或以特定代码退出时引发的异常。

    __slots__ = ["退出码"]

    def __init__(
        ,
        信息: 字符串,
        错误索引: int,
        错误进程 ID: int,
        退出码: int,
        信号名称: 可选[字符串] = ,
    ):
        超级().__init__(信息, 错误索引, 错误进程 ID)
        .退出代码 = 退出码
        .信号名称 = 信号名称

    def __reduce__():
        返回 (
            类型(),
            (.信息, .错误索引, .进程 ID, .退出码, .信号名称),
        )


def _包装(函数, i, 参数, 错误文件):
    # prctl(2) 是 Linux 特定的系统调用。
    # 在其他系统中,此函数调用没有任何效果。
    # 这是为了确保非守护进程子进程可以在
    # 父进程在它们之前终止时终止。
    prctl_pr_set_pdeathsig(信号.SIGINT)

    尝试:
        函数(i, *参数)
    除了 KeyboardInterrupt:
        通过  # SIGINT; 被父进程终止,不做任何操作
    除了 异常:
        # 将异常传播给父进程,保留原始跟踪信息
        导入 跟踪回溯

         打开(错误文件, wb)  文件句柄:
            pickle.导出(跟踪回溯.format_exc(), fh)
        系统模块.退出(1)


 进程上下文:
    def __init__(, 进程, 错误文件):
        .错误文件 = 错误文件
        .进程 = 进程
        .哨兵 = {
            进程.信号兵: 索引  索引, 处理  列举(进程)
        }

    def 进程 ID():
        返回 [int(进程.进程 ID)  处理  .进程]

    def _join_procs_with_timeout(, 超时: float):
        尝试使用共享超时连接所有进程。
        末端 = 时间.单调的() + 超时
         流程  .流程:
            等待时间 = 最大值(0, 末端 - 时间.单调的())
            流程.连接(等待时间)

    def 连接(
        , 超时: 可选[float] = , 宽限期: 可选[float] = 
    ):
        r在 spawn 上下文中加入一个或多个进程。

尝试在此 spawn 上下文中加入一个或多个进程。
如果其中之一以非零退出状态退出,
则此函数将杀死剩余的进程(可选地带有宽限期)。
并且抛出一个异常,指出第一个进程退出的原因。

如果所有进程都成功连接,则返回 ``True``。
如果还有更多进程需要连接,则返回 ``False``。

参数:
timeout (浮点数): 在放弃等待之前等待这么长时间(以秒为单位)。
grace_period (浮点数):当任何进程失败时,等待这么长时间(以秒为单位)
让其他进程优雅地关闭,然后再终止它们。如果它们
仍然没有退出,等待另一个宽限期后再杀死它们。
"文档"
        # 确保这个函数可以在我们完成时被调用。
        如果 长度(.星际迷航者) == 0:
            返回 真实

        等待任何进程失败或所有进程都成功。
        准备就绪 = 多进程.连接.等待(
            .哨兵.(),
            超时=超时,
        )

        错误索引 = 
         哨兵  准备就绪:
            索引 = .哨兵.流行(哨兵)
            流程 = .流程[索引]
            处理.连接()
            如果 处理.返回码 != 0:
                错误索引 = 索引
                断开

        # 如果没有错误则返回。
        如果 错误索引 is :
            返回是否所有进程都已加入。
            返回 长度(.哨兵) == 0
        发生错误。在返回前清理所有进程。
        首先,允许进程有自我关闭的宽限期。
        如果 宽限期 is  :
            ._join_procs_with_timeout(宽限期)
        然后,终止仍然存活的过程。首先尝试发送 SIGTERM 信号。
         进程  .进程们:
            如果 进程.is_alive():
                日志.警告("终止进程"%s通过 SIGTERM 信号, 进程.进程 ID)
                进程.终止()

        如果进程在另一个 grace_period 后仍未停止,则尝试使用 SIGKILL。
        原因与 Python 信号处理限制有关
        仅限于主线程,如果主线程处于 C/C++环境中且陷入停滞,则不会
        处理它。我们见过进程因未处理而卡住。
        因此发送 SIGTERM。
        ._join_procs_with_timeout(30 如果 宽限期 is  否则 宽限期)
         流程  .流程们:
            如果 流程.是否存活():
                日志.警告(
                    "无法关闭进程"%s通过 SIGTERM,强制退出通过 SIGKILL,
                    进程.进程 ID,
                )
                处理.终止()
            处理.连接()

        # 文件只有在进程崩溃时才会创建。
        失败的过程 = .进程[错误索引]
        如果  os.访问(.错误文件[错误索引] os.R_OK):
            退出码 = .进程[错误索引].exitcode
            如果 exitcode < 0:
                尝试:
                    名称 = 信号.信号(-exitcode).名称
                除了 ValueError:
                    名称 = f"<未知信号"{-退出码}>
                提升 进程退出异常(
                    f"进程"{错误索引:d}因信号终止{名称}",
                    错误索引=错误索引,
                    进程 ID=失败的进程.进程 ID,
                    退出码=退出码,
                    信号名称=名称,
                )
            else:
                提升 进程退出异常(
                    f"进程"{错误索引:d}终止退出码{退出码:d}",
                    错误索引=错误索引,
                    错误进程 ID=失败进程.进程 ID,
                    退出码=退出码,
                )

         打开(.错误文件[错误索引] rb)  fh:
            原始跟踪 = pickle.加载(fh)
        msg = f"\n\n
\n\n-- 处理{错误索引:d}以以下错误终止:输入文本翻译为简体中文为:\n"
        msg += 原始跟踪
        提升 处理引发的异常(信息, 错误索引, 失败进程.进程 ID)


[文档]class SpawnContext(ProcessContext): def __init__(self, processes, error_files): warnings.warn("SpawnContext 已更名为 ProcessContext,自 1.4 版本起。") super().__init__(processes, error_files)
# Note: [start_processes] # mp.start_processes 同时处理 start_method='spawn'和'fork'。它应该是一个 # 比 mp.spawn 更通用的 API。目前我们只记录 mp.spawn,因为它目前是 # 与 CUDA 兼容的 start_method。然而,在像 Ipython 笔记本这样的环境中, # 比 'spawn' 工作得更好。我们为 mp.spawn 创建的每个辅助函数确实 # 足够通用,并且像 XLA 这样的后端可以在 Colab 笔记本中重用它们。 # 目前我们首先添加这个 API,我们可以考虑将来将其添加到文档中。 # 在未来可能需要。 def 启动进程( 函数, 参数=(), nprocs=1, 连接
=, 守护进程=错误, 启动方法="创建", ): # 在某些情况下加速性能(见 https://github.com/pytorch/pytorch/issues/133010), # 此函数将在 start_method 为 'forkserver' 时并行启动进程。 请通过设置环境变量(TORCH_MP_PARALLEL_START)为 1 来选择此性能优化。 todo:调查为什么 spawn 与 threadpool 不兼容并引发 SIGINT 如果 ( 启动方法 == "forkserver" os.环境.获取(环境变量并行启动, "0") == 1 ): 日志.信息(并行启动进程) 开始并行 = 真实 else: 将环境变量 TORCH_MP_PARALLEL_START 设置为 0 以禁用并行启动 开始并行 = mp = 多进程.获取上下文(开始方法) 错误文件 = [] * 进程数 进程 = [] * 进程数 def 启动进程(i): # 每个进程都分配一个文件来写入跟踪信息。 使用文件非空来表示异常 发生错误(而非预期的关闭)。注意:之前 使用了一个多进程的队列,但这可能会容易出错 死锁,所以我们选择了一个更简单的单次解决方案 进程间消息 tf = 临时文件.命名临时文件( 前缀="pytorch 错误文件-", 后缀=".pickle", 删除= ) tf.关闭() os.解链(tf.名称) 处理 = mp.流程( 目标=_包装, 参数=(函数, i, 参数, tf.名称), 守护进程=守护进程, ) 进程.开始() 返回 i, 进程, tf.名称 如果 启动并行: i 范围(进程数): 索引, 进程, tf 名称 = 启动流程(i) 错误文件[索引] = tf 名称 流程[索引] = 流程 else: 线程池执行器(max_workers=进程数) 执行器: 期货 = [执行器.提交(启动流程, i) i 范围(进程数)] fut 当所有进程完成时(期货): 索引, 进程, tf_name = fut.结果() 索引和进程排名需要相同。 错误文件[索引] = tf_name 流程[索引] = 流程 上下文 = 进程上下文(流程, 错误文件) 如果 连接: 返回 上下文 # 循环直到返回 True 或引发异常。 上下文.连接(): 通过
[文档]def spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method="spawn"): r"""启动 ``nprocs`` 个进程运行 ``fn`` 并传递 ``args``。 如果其中一个进程以非零退出状态退出,则 剩余进程被终止,并抛出异常,异常原因为终止原因。如果在子进程中捕获到异常,则将其转发,并在父进程中抛出的异常中包含其堆栈跟踪。 在子进程中捕获到异常的情况下,它将被转发,并在父进程中抛出的异常中包含其堆栈跟踪。 如果在子进程中捕获到异常,则将其转发,并在父进程中抛出的异常中包含其堆栈跟踪。 如果在子进程中捕获到异常,则将其转发,并在父进程中抛出的异常中包含其堆栈跟踪。 参数: fn (函数): 函数作为入口点被调用 启动进程。此函数必须在顶部定义 模块的级别,以便它可以被序列化和实例化。 这是多进程强制要求的。 函数调用方式为 `fn(i, *args)`,其中 `i` 是 进程索引,`args` 是传递的参数元组。 的参数列表。 args (元组): 传递给 ``fn`` 的参数。 nprocs (整数): 要生成的进程数。 join (布尔值): 对所有进程执行阻塞式 join。 daemon (布尔值): 生成的进程的守护进程标志。如果设置为 True, 守护进程将被创建。 start_method (str): (已弃用) 此方法始终使用 `spawn` 作为启动方法。要使用不同的启动方法 使用 `start_processes()`。 返回值: 如果 `join` 为 `True`,则返回 `None` 如果 `join` 为 `False`,则返回 `:class:`~ProcessContext` 对象 """ 如果 start_method 不等于"spawn": msg = ( f"此方法仅支持 start_method=spawn(获取:{start_method})。\n" "要使用不同的 start_method,请使用:\n\t\t" "torch.multiprocessing.start_processes(...)" ) 警告:msg,未来警告,堆栈级别=2 返回 start_processes(fn, args, nprocs, join, daemon, start_method="spawn")

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源