# mypy: 允许未类型化定义
导入
记录日志
导入
多进程
导入 multiprocessing.connection
导入
操作系统
导入
酸菜
导入
信号
导入
系统
导入 tempfile
导入
时间
导入
警告
from concurrent.futures 导入 as_completed, ThreadPoolExecutor
from 打字
导入
可选
from . 导入 _prctl_pr_set_pdeathsig
# 类型: 忽略[attr-defined]
环境变量并行启动 =
火炬并行启动
日志 =
记录.
获取日志记录器(__name__)
全部 = [
进程上下文,
进程异常,
"ProcessExitedException",
"ProcessRaisedException",
"spawn",
"SpawnContext",
启动进程,
]
类 ProcessException(
异常):
__slots__ = [错误索引,
错误进程 ID]
def __init__(我,
信息:
字符串,
错误索引: int,
进程 ID: int):
超级().__init__(
信息)
我.msg = msg
我.
错误索引 =
错误索引
我.
进程 ID =
进程 ID
def __reduce__(我):
返回
类型(
我), (
我.
信息,
我.
错误索引,
我.
进程 ID)
类
处理引发的异常(ProcessException):
代码抛出异常导致进程失败时引发的异常。
def __init__(
我,
信息:
字符串,
错误索引: int,
错误进程 ID: int,
):
超级().__init__(
信息,
错误索引,
错误进程 ID)
类
进程退出异常(ProcessException):
当进程因信号失败或以特定代码退出时引发的异常。
__slots__ = ["退出码"]
def __init__(
我,
信息:
字符串,
错误索引: int,
错误进程 ID: int,
退出码: int,
信号名称:
可选[
字符串] =
无,
):
超级().__init__(
信息,
错误索引,
错误进程 ID)
我.
退出代码 =
退出码
我.
信号名称 =
信号名称
def __reduce__(我):
返回 (
类型(
我),
(我.
信息,
我.
错误索引,
我.
进程 ID,
我.
退出码,
我.
信号名称),
)
def _包装(
函数, i,
参数,
错误文件):
# prctl(2) 是 Linux 特定的系统调用。
# 在其他系统中,此函数调用没有任何效果。
# 这是为了确保非守护进程子进程可以在
# 父进程在它们之前终止时终止。
prctl_pr_set_pdeathsig(
信号.SIGINT)
尝试:
函数(i, *
参数)
除了 KeyboardInterrupt:
通过
# SIGINT; 被父进程终止,不做任何操作
除了
异常:
# 将异常传播给父进程,保留原始跟踪信息
导入
跟踪回溯
与
打开(
错误文件,
wb)
是
文件句柄:
pickle.导出(
跟踪回溯.format_exc(), fh)
系统模块.
退出(1)
类
进程上下文:
def __init__(我,
进程,
错误文件):
我.
错误文件 =
错误文件
我.
进程 =
进程
我.
哨兵 = {
进程.
信号兵:
索引
为
索引,
处理
在
列举(
进程)
}
def 进程 ID(
我):
返回 [int(
进程.
进程 ID)
为
处理
在
我.
进程]
def _join_procs_with_timeout(我,
超时: float):
尝试使用共享超时连接所有进程。
末端 =
时间.
单调的() +
超时
为
流程
在
我.
流程:
等待时间 =
最大值(0,
末端 -
时间.
单调的())
流程.
连接(
等待时间)
def 连接(
我,
超时:
可选[float] =
无,
宽限期:
可选[float] =
无
):
r在 spawn 上下文中加入一个或多个进程。
尝试在此 spawn 上下文中加入一个或多个进程。
如果其中之一以非零退出状态退出,
则此函数将杀死剩余的进程(可选地带有宽限期)。
并且抛出一个异常,指出第一个进程退出的原因。
如果所有进程都成功连接,则返回 ``True``。
如果还有更多进程需要连接,则返回 ``False``。
参数:
timeout (浮点数): 在放弃等待之前等待这么长时间(以秒为单位)。
grace_period (浮点数):当任何进程失败时,等待这么长时间(以秒为单位)
让其他进程优雅地关闭,然后再终止它们。如果它们
仍然没有退出,等待另一个宽限期后再杀死它们。
"文档"
# 确保这个函数可以在我们完成时被调用。
如果
长度(
我.
星际迷航者) == 0:
返回
真实
等待任何进程失败或所有进程都成功。
准备就绪 =
多进程.
连接.
等待(
我.
哨兵.
键(),
超时=
超时,
)
错误索引 =
无
为
哨兵
在
准备就绪:
索引 =
我.
哨兵.
流行(
哨兵)
流程 =
我.
流程[
索引]
处理.
连接()
如果
处理.
返回码 != 0:
错误索引 =
索引
断开
# 如果没有错误则返回。
如果
错误索引 is
无:
返回是否所有进程都已加入。
返回
长度(
我.
哨兵) == 0
发生错误。在返回前清理所有进程。
首先,允许进程有自我关闭的宽限期。
如果
宽限期 is
不
无:
我._join_procs_with_timeout(
宽限期)
然后,终止仍然存活的过程。首先尝试发送 SIGTERM 信号。
为
进程
在
我.
进程们:
如果
进程.is_alive():
日志.
警告(
"终止进程"%s
通过 SIGTERM 信号,
进程.
进程 ID)
进程.
终止()
如果进程在另一个 grace_period 后仍未停止,则尝试使用 SIGKILL。
原因与 Python 信号处理限制有关
仅限于主线程,如果主线程处于 C/C++环境中且陷入停滞,则不会
处理它。我们见过进程因未处理而卡住。
因此发送 SIGTERM。
我._join_procs_with_timeout(30
如果
宽限期 is
无
否则
宽限期)
为
流程
在
我.
流程们:
如果
流程.
是否存活():
日志.
警告(
"无法关闭进程"%s
通过 SIGTERM,强制退出通过 SIGKILL,
进程.
进程 ID,
)
处理.
终止()
处理.
连接()
# 文件只有在进程崩溃时才会创建。
失败的过程 =
我.
进程[
错误索引]
如果
不 os.
访问(
我.
错误文件[
错误索引
] os.R_OK):
退出码 =
我.
进程[
错误索引].exitcode
如果 exitcode < 0:
尝试:
名称 =
信号.
信号(-exitcode).
名称
除了 ValueError:
名称 = f
"<未知信号"{-
退出码}
>
提升
进程退出异常(
f"进程"{
错误索引:d}
因信号终止{
名称}",
错误索引=
错误索引,
进程 ID=
失败的进程.
进程 ID,
退出码=
退出码,
信号名称=
名称,
)
else:
提升
进程退出异常(
f"进程"{
错误索引:d}
终止退出码{
退出码:d}",
错误索引=
错误索引,
错误进程 ID=
失败进程.
进程 ID,
退出码=
退出码,
)
与
打开(
我.
错误文件[
错误索引
]
rb)
是 fh:
原始跟踪 = pickle.
加载(fh)
msg = f"
\n\n
\n\n-- 处理{
错误索引:d}
以以下错误终止:
输入文本翻译为简体中文为:\n"
msg += 原始跟踪
提升
处理引发的异常(
信息,
错误索引,
失败进程.
进程 ID)
[文档]class SpawnContext(ProcessContext):
def __init__(self, processes, error_files):
warnings.warn("SpawnContext 已更名为 ProcessContext,自 1.4 版本起。")
super().__init__(processes, error_files)
# Note: [start_processes]
# mp.start_processes 同时处理 start_method='spawn'和'fork'。它应该是一个
# 比 mp.spawn 更通用的 API。目前我们只记录 mp.spawn,因为它目前是
# 与 CUDA 兼容的 start_method。然而,在像 Ipython 笔记本这样的环境中,
# 比 'spawn' 工作得更好。我们为 mp.spawn 创建的每个辅助函数确实
# 足够通用,并且像 XLA 这样的后端可以在 Colab 笔记本中重用它们。
# 目前我们首先添加这个 API,我们可以考虑将来将其添加到文档中。
# 在未来可能需要。
def 启动进程(
函数,
参数=(),
nprocs=1,
连接=
是,
守护进程=
错误,
启动方法=
"创建",
):
# 在某些情况下加速性能(见 https://github.com/pytorch/pytorch/issues/133010),
# 此函数将在 start_method 为 'forkserver' 时并行启动进程。
请通过设置环境变量(TORCH_MP_PARALLEL_START)为 1 来选择此性能优化。
todo:调查为什么 spawn 与 threadpool 不兼容并引发 SIGINT
如果 (
启动方法 == "forkserver"
和 os.
环境.
获取(
环境变量并行启动, "0") ==
1
):
日志.
信息(
并行启动进程)
开始并行 =
真实
else:
将环境变量 TORCH_MP_PARALLEL_START 设置为 0 以禁用并行启动
开始并行 =
假
mp = 多进程.
获取上下文(
开始方法)
错误文件 = [
无] *
进程数
进程 = [
无] *
进程数
def 启动进程(i):
# 每个进程都分配一个文件来写入跟踪信息。
使用文件非空来表示异常
发生错误(而非预期的关闭)。注意:之前
使用了一个多进程的队列,但这可能会容易出错
死锁,所以我们选择了一个更简单的单次解决方案
进程间消息
tf = 临时文件.
命名临时文件(
前缀=
"pytorch 错误文件-",
后缀=".pickle",
删除=
假
)
tf.关闭()
os.解链(tf.
名称)
处理 = mp.
流程(
目标=
_包装,
参数=(
函数, i,
参数, tf.
名称),
守护进程=
守护进程,
)
进程.
开始()
返回 i,
进程, tf.
名称
如果
不
启动并行:
为 i
在
范围(
进程数):
索引,
进程,
tf 名称 =
启动流程(i)
错误文件[
索引] =
tf 名称
流程[
索引] =
流程
else:
与
线程池执行器(max_workers=
进程数)
是
执行器:
期货 = [
执行器.
提交(
启动流程, i)
为 i
在
范围(
进程数)]
为 fut
在
当所有进程完成时(
期货):
索引,
进程, tf_name = fut.
结果()
索引和进程排名需要相同。
错误文件[
索引] = tf_name
流程[
索引] =
流程
上下文 =
进程上下文(
流程,
错误文件)
如果
不
连接:
返回
上下文
# 循环直到返回 True 或引发异常。
当
不
上下文.
连接():
通过
[文档]def spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method="spawn"):
r"""启动 ``nprocs`` 个进程运行 ``fn`` 并传递 ``args``。
如果其中一个进程以非零退出状态退出,则
剩余进程被终止,并抛出异常,异常原因为终止原因。如果在子进程中捕获到异常,则将其转发,并在父进程中抛出的异常中包含其堆栈跟踪。
在子进程中捕获到异常的情况下,它将被转发,并在父进程中抛出的异常中包含其堆栈跟踪。
如果在子进程中捕获到异常,则将其转发,并在父进程中抛出的异常中包含其堆栈跟踪。
如果在子进程中捕获到异常,则将其转发,并在父进程中抛出的异常中包含其堆栈跟踪。
参数:
fn (函数): 函数作为入口点被调用
启动进程。此函数必须在顶部定义
模块的级别,以便它可以被序列化和实例化。
这是多进程强制要求的。
函数调用方式为 `fn(i, *args)`,其中 `i` 是
进程索引,`args` 是传递的参数元组。
的参数列表。
args (元组): 传递给 ``fn`` 的参数。
nprocs (整数): 要生成的进程数。
join (布尔值): 对所有进程执行阻塞式 join。
daemon (布尔值): 生成的进程的守护进程标志。如果设置为 True,
守护进程将被创建。
start_method (str): (已弃用) 此方法始终使用 `spawn`
作为启动方法。要使用不同的启动方法
使用 `start_processes()`。
返回值:
如果 `join` 为 `True`,则返回 `None`
如果 `join` 为 `False`,则返回 `:class:`~ProcessContext` 对象
"""
如果 start_method 不等于"spawn":
msg = (
f"此方法仅支持 start_method=spawn(获取:{start_method})。\n"
"要使用不同的 start_method,请使用:\n\t\t"
"torch.multiprocessing.start_processes(...)"
)
警告:msg,未来警告,堆栈级别=2
返回 start_processes(fn, args, nprocs, join, daemon, start_method="spawn")