torch.utils.data._utils.worker 源代码
# mypy: 允许未类型化定义
r包含由 _BaseDataLoaderIter 工作者使用的方法的定义。
这些 **需求** 必须在全局范围内,因为 Py2 不支持序列化
静态方法。
""
导入
操作系统
导入
队列
导入
随机
来自 dataclasses
导入
数据类
来自
打字
导入
可选,
类型检查,
联合
导入
火炬
来自 torch._utils
导入
异常包装器
来自 .
导入 HAS_NUMPY, IS_WINDOWS,
MP 状态检查间隔,
信号处理
如果
类型检查:
来自 torch.utils.data
导入
数据集
如果 IS_WINDOWS:
导入 ctypes
来自 ctypes.wintypes
导入 BOOL, DWORD, HANDLE
在 Windows 中,工作进程的父 ID 在管理进程保持不变时
# 已消失,唯一通过操作系统检查它的方法就是让工作进程拥有进程句柄
# 向管理员询问进程状态是否已改变。
类
管理员看门狗:
def 初始化(
自身) ->
无:
自身.manager_pid =
操作系统.getppid()
# mypy 无法检测此代码仅适用于 Windows
自身.
核心库 32 = ctypes.WinDLL("kernel32",
使用最后错误=True)
# 类型: 忽略[attr-defined]
自身.kernel32.OpenProcess.
参数类型 = (DWORD, BOOL, DWORD)
自身.kernel32.
打开进程.restype =
处理
自身.kernel32.
等待单个对象.
参数类型 = (
处理, DWORD)
自身.kernel32.WaitForSingleObject.restype = DWORD
# 从 https://msdn.microsoft.com/en-us/library/ms684880.aspx 获取的值
同步 =
1048576
自身.
管理器句柄 =
自身.kernel32.
打开进程(
同步, 0,
自身.
管理进程 ID
)
如果 not
自身.
管理员处理:
抛出异常 ctypes.
Win 错误(ctypes.
获取最后错误())
# 类型: 忽略[attr-defined]
自身.
管理进程已死 =
假
def 是否存活(
自身):
如果 not
自身.
管理进程已死:
# 从 https://msdn.microsoft.com/en-us/library/windows/desktop/ms687032.aspx 获取的值
自身.
经理已故 = (
自身.kernel32.
等待单个对象(
自身.
管理员处理, 0) == 0
)
返回 not
自身.
经理已故
否则:
类
管理员看门狗:
# 类型:忽略[重新定义]
def 初始化(
自身) ->
无:
自身.
管理员进程 ID =
操作系统.
获取父进程 ID()
自身.
管理员已死 =
假
def 是否存活(
自身):
如果 not
自身.
经理已死:
自身.
经理已死 =
操作系统.getppid() !=
自身.manager_pid
返回 not
自身.
经理已故
_工人信息:
可选[
工人信息] = None
类
工作信息:
id: 整型
num_workers: 整型
种子:
整型
数据集:
数据集
__已初始化 =
假
def 初始化(
自身, **kwargs):
为 k, v
在 kwargs.
项目():
setattr(自身, k, v)
自身.__keys =
元组(kwargs.
键())
自身.
__已初始化 =
真实
def __setattr__(自身,
键, val):
如果
自身.
已初始化:
抛出异常
运行时错误(
f"无法将属性分配给"{
自身.
类.__name__}
对象"
)
返回
超级().__setattr__(
键, val)
def __repr__(自身):
项目 = [f"{k}={getattr(
自身, k)}"
为 k
在
自身.__keys]
返回 f"{
自身.
类.__name__}({
“,”.
加入(
项目)})"
[文档]def get_worker_info() -> Optional[WorkerInfo]:
返回当前信息
`:class:`~torch.utils.data.DataLoader` 迭代器工作进程。
当在工作者中调用时,这返回一个保证具有的对象
以下属性:
* :attr:`id`: 当前工作进程的 ID。
* :attr:`num_workers`: 工作进程总数。
* :attr:`seed`: 为当前工作进程设置的随机种子。此值是
由主进程的 RNG 和工作 ID 共同决定。详见
`:class:`~torch.utils.data.DataLoader``的文档以获取更多详细信息。
* :attr:`dataset`:此进程中的数据集对象的副本。请注意
这将在不同进程中是不同的对象
在主进程中。
当在主进程中调用时,此函数返回 `None`。
.. 注意::
当用于传递给 `:attr:`worker_init_fn` 的函数时,
`torch.utils.data.DataLoader`,这个方法可以用来设置每个工作进程不同,例如,使用`worker_id`来配置`dataset`对象,使其只读取分片数据集的一部分,或者使用`seed`来初始化其他在数据集中使用的库
设置每个工作进程不同,例如,使用`worker_id`来配置`dataset`对象,使其只读取分片数据集的一部分,或者使用`seed`来初始化其他在数据集中使用的库
使用`seed`来初始化其他在数据集中使用的库
使用`seed`来初始化其他在数据集中使用的库
代码。
"""
返回_worker_info
r用于表示 IterableDataset 结束的占位类。
@dataclass(冻结=True)
类
_可迭代数据集停止迭代:
工作器 ID:
整型
r“用于在启用工作器重用时恢复获取的示例类”
@dataclass(冻结=True)
类
_恢复迭代:
种子:
可选[int] = None
# `_generate_state` 函数改编自 `numpy.random.SeedSequence`
# 来自 https://github.com/numpy/numpy/blob/main/numpy/random/bit_generator.pyx
# 它是 MIT 许可协议,以下是版权信息:
# 版权所有 (c) 2015 梅丽莎·E·奥尼尔
# 版权所有 (c) 2019 NumPy 开发者
#
# 以下是对任何获得本软件及其相关文档副本(以下简称“软件”)的人的授权,免费授权
# 任何人可以复制、使用、修改和分发软件
在不限制软件的情况下,包括但不限于其权利
# 使用、复制、修改、合并、发布、分发、再许可和/或出售
软件的副本,并允许软件的接收者
提供此服务,但须遵守以下条件:
#
上述版权声明和本许可声明应包含在内
所有软件的副本或大部分内容。
#
软件按“原样”提供,不提供任何形式的保证,无论是明示的还是隐示的
所隐含的,包括但不限于商销性的保证,
# 适用于特定目的和不侵犯版权。在任何情况下,均不得承担因侵权或适用特定目的而产生的任何责任。
# 作者或版权所有者应对任何索赔、损害或其他责任负责,无论该责任是基于合同、侵权或其他原因,无论该责任是否源于、因或与软件的使用或其他方式有关,或者与软件的任何其他使用有关。
# 不论该责任是否源于、因或与软件的使用或其他方式有关,或者与软件的任何其他使用有关。
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
此函数生成一个 int32 类型的数组作为`numpy.random`的种子
以防止由于相同的种子和算法导致`numpy.random`和`random`模块的状态冲突
为`numpy.random`和`random`模块实现类似于`SeedSequence`的对象
TODO:为`torch.random`实现类似的对象
def 生成状态(
基础种子,
工作器 ID):
初始化_A =
1135663077
多语言 =
2468251765
初始化_B =
2337405405
多倍数_B =
1492356589
混合乘_L =
3389127133
MIX_MULT_R = 1232336661
XSHIFT = 4 * 8 // 2
MASK32 = 4294967295
熵 = [
工作器 ID,
基础种子 & MASK32,
基础种子 >> 32, 0]
泳池 = [0] * 4
哈希常量_A =
初始化_A
def 哈希(
值):
非局部
哈希常量_A
value = (value ^ hash_const_A) & MASK32
hash_const_A = (hash_const_A * 多语言_A) &
面具 32
value = (value * 哈希常量_A) &
面具 32
value = (value ^ (value 空白 XSHIFT)) & MASK32
返回 value
def 混合(x, y):
结果_x = (
混合乘_L * x) &
面具 32
结果_y = (MIX_MULT_R * y) & MASK32
结果 = (result_x - result_y) & MASK32
结果 = (
结果 ^ (
结果
空格
X 位移)) & MASK32
返回
结果
向池中添加熵
为 i
在
范围(
长度(
泳池)):
泳池[i] =
哈希(
熵[i])
将所有比特混合在一起,以便晚些时候的比特可以影响早些时候的比特
为 i_src
在
范围(
长度(
泳池)):
为 i_dst
在
范围(
长度(
泳池)):
如果
源 !=
目标:
泳池[
目标] =
混合(
泳池[
目标
]
哈希(
泳池[
源]))
哈希常量 B =
初始化 B
状态 = []
为 i_dst
在
范围(4):
data_val = 泳池[i_dst]
data_val = (data_val ^ 哈希常量_B) & MASK32
哈希常量_B = (
hash 常量 B *
多倍 B) &
马斯克 32
数据值 = (data_val *
哈希常量_B) & MASK32
data_val = (数据值 ^ (
数据值 >> XSHIFT)) & MASK32
状态.
追加(data_val)
返回
状态
def 工作循环(
数据集类型,
数据集,
索引队列,
数据队列,
完成事件,
auto_collation,
collate_fn,
drop_last,
基础种子,
初始化函数,
工作器 ID,
num_workers,
持久化工作者,
shared_seed,
):
# 请参阅[数据加载器多进程关闭逻辑]以获取详细信息。
# 此函数的逻辑。
try:
# 初始化 C 端信号处理程序以处理 SIGBUS 和 SIGSEGV 信号。
# 模块的处理器在 Python 从 C 底层返回后执行
# 处理器,可能是在相同的致命信号已经发生的情况下
又一次。
# https://docs.python.org/3/library/signal.html#execution-of-python-signal-handlers
信号处理._set_worker_signal_handlers()
火炬.
多进程.
设置线程名称(
"pt 数据工作者")
火炬.
设置线程数(1)
种子 =
基础种子 +
工作 ID
随机.
种子(
种子)
火炬.
手动播种(
种子)
如果 HAS_NUMPY:
np 种子 =
_生成状态(
基础种子,
工作器 ID)
导入 numpy
作为 np
numpy.
随机.
种子(
随机种子)
来自 torch.utils.data
导入
迭代数据处理管道
来自 torch.utils.data.graph_settings
导入
应用随机种子
共享随机数生成器 =
火炬.
生成器()
如果 isinstance(
数据集,
迭代数据管道):
断言 shared_seed
是 not None
共享随机数生成器.
手动播种(shared_seed)
数据集 =
应用随机种子(
数据集,
共享随机数生成器)
全局 _worker_info
_worker_info = 工作信息(
id=工作器 ID, num_workers=num_workers,
种子=
种子,
数据集=
数据集
)
来自 torch.utils.data
导入
数据集类型
初始化异常 = None
try:
如果
初始化函数
是 not
无:
初始化函数(
工作器 ID)
拉取器 = _DatasetKind.
创建获取器(
数据集类型,
数据集, auto_collation, collate_fn, drop_last
)
除了
异常:
初始化异常 = ExceptionWrapper(
哪里=f
在 DataLoader 工作进程内{
工作器 ID}"
)
当使用可迭代模式时,一些工作进程可能比其他工作进程提前退出。
这是因为可迭代数据集(IterableDataset)对不同工作进程的行为不同。
当发生这种情况时,会发送一个包含此工作进程 ID 的 `_IterableDatasetStopIteration` 对象到主进程。
以便主进程知道哪个工作进程已经停止迭代。
主进程不会向此工作者发送更多任务,并将发送
`None` 到此工作者以正确退出它。
#
注意,我们无法从工作者设置 `done_event`,因为它是在所有进程间共享的。
因此,我们设置 `iteration_end` 标志来
表示迭代器已耗尽。当设置`done_event`或`iteration_end`时
我们将跳过所有处理步骤,只等待`None`。
`None`。
iteration_end = 假
看门狗 =
管理看门狗()
while 看门狗.
是否存活():
try:
r = 索引队列.
获取(
超时=
MP 状态检查间隔)
除了
队列.
空的:
继续
如果 isinstance(r,
_恢复迭代):
# 承认主进程
数据队列.
放置((r,
无))
迭代结束 =
假
如果 isinstance(
数据集,
迭代数据管道):
断言 r.
种子
是 not None
共享随机数生成器.
手动播种(r.
种子)
数据集 =
应用随机种子(
数据集,
共享随机数生成器)
重新创建用于工作器重用策略的获取器
获取器 = _DatasetKind.
创建获取器(
数据集类型,
数据集, auto_collation, collate_fn, drop_last
)
继续
elif r 是
无:
收到最终信号
断言
已完成事件.
已设置()
或
迭代结束
断开
elif 已完成事件.
已设置()
或
迭代结束:
# `done_event` 已设置。但我还没有收到最终信号
# (None)尚未。我会继续等待,直到收到它,并跳过处理步骤。
# 处理步骤。
继续
索引,
索引 = r
数据:
联盟[
_可迭代数据集停止迭代, ExceptionWrapper]
如果 init_exception
是 not
无:
数据 =
初始化异常
初始化异常 = None
否则:
try:
数据 =
检索器.
获取(
索引) # type: ignore[possibly-undefined]
除了
异常
作为 e:
如果 (
isinstance(e, StopIteration 异常)
和
数据集类型 == _DatasetKind.
迭代器
):
数据 =
_可迭代数据集停止迭代(
工作器 ID)
# 设置 `iteration_end`
# (1) 保存未来的 `next(...)` 调用,和
避免发送多个 `_IterableDatasetStopIteration`。
迭代结束 =
真实
否则:
我们不应该将 exc_info 存储在变量中。
`ExceptionWrapper` 会正确处理。
# 查看 [Python 跟踪回环问题]
数据 = ExceptionWrapper(
哪里=f
在 DataLoader 工作进程{
工作器 ID}"
)
数据队列.
放置((
索引,
数据))
删除
数据,
索引,
索引, r
# 保存内存
除了 KeyboardInterrupt:
# 主进程无论如何都会引发 KeyboardInterrupt。
通过
如果 done_event.
已设置():
data_queue.取消加入线程()
数据队列.
关闭()