快捷键

torch.utils.data._utils.worker 源代码

# mypy: 允许未类型化定义
r包含由 _BaseDataLoaderIter 工作者使用的方法的定义。

这些 **需求** 必须在全局范围内,因为 Py2 不支持序列化
静态方法。
""

导入 操作系统
导入 队列
导入 随机
来自 dataclasses 导入 数据类
来自 打字 导入 可选, 类型检查, 联合

导入 火炬
来自 torch._utils 导入 异常包装器

来自 . 导入 HAS_NUMPY, IS_WINDOWS, MP 状态检查间隔, 信号处理


如果 类型检查:
    来自 torch.utils.data 导入 数据集

如果 IS_WINDOWS:
    导入 ctypes
    来自 ctypes.wintypes 导入 BOOL, DWORD, HANDLE

    在 Windows 中,工作进程的父 ID 在管理进程保持不变时
    # 已消失,唯一通过操作系统检查它的方法就是让工作进程拥有进程句柄
    # 向管理员询问进程状态是否已改变。
     管理员看门狗:
        def 初始化(自身) -> :
            自身.manager_pid = 操作系统.getppid()

            # mypy 无法检测此代码仅适用于 Windows
            自身.核心库 32 = ctypes.WinDLL("kernel32", 使用最后错误=True)  # 类型: 忽略[attr-defined]
            自身.kernel32.OpenProcess.参数类型 = (DWORD, BOOL, DWORD)
            自身.kernel32.打开进程.restype = 处理
            自身.kernel32.等待单个对象.参数类型 = (处理, DWORD)
            自身.kernel32.WaitForSingleObject.restype = DWORD

            # 从 https://msdn.microsoft.com/en-us/library/ms684880.aspx 获取的值
            同步 = 1048576
            自身.管理器句柄 = 自身.kernel32.打开进程(
                同步, 0, 自身.管理进程 ID
            )

            如果 not 自身.管理员处理:
                抛出异常 ctypes.Win 错误(ctypes.获取最后错误())  # 类型: 忽略[attr-defined]

            自身.管理进程已死 = 

        def 是否存活(自身):
            如果 not 自身.管理进程已死:
                # 从 https://msdn.microsoft.com/en-us/library/windows/desktop/ms687032.aspx 获取的值
                自身.经理已故 = (
                    自身.kernel32.等待单个对象(自身.管理员处理, 0) == 0
                )
            返回 not 自身.经理已故

否则:

     管理员看门狗:  # 类型:忽略[重新定义]
        def 初始化(自身) -> :
            自身.管理员进程 ID = 操作系统.获取父进程 ID()
            自身.管理员已死 = 

        def 是否存活(自身):
            如果 not 自身.经理已死:
                自身.经理已死 = 操作系统.getppid() != 自身.manager_pid
            返回 not 自身.经理已故


_工人信息: 可选[工人信息] = None


 工作信息:
    id: 整型
    num_workers: 整型
    种子: 整型
    数据集: 数据集
    __已初始化 = 

    def 初始化(自身, **kwargs):
         k, v  kwargs.项目():
            setattr(自身, k, v)
        自身.__keys = 元组(kwargs.())
        自身.__已初始化 = 真实

    def __setattr__(自身, , val):
        如果 自身.已初始化:
            抛出异常 运行时错误(
                f"无法将属性分配给"{自身..__name__}对象"
            )
        返回 超级().__setattr__(, val)

    def __repr__(自身):
        项目 = [f"{k}={getattr(自身, k)}"  k  自身.__keys]
        返回 f"{自身..__name__}({“,”.加入(项目)})"


[文档]def get_worker_info() -> Optional[WorkerInfo]: 返回当前信息 `:class:`~torch.utils.data.DataLoader` 迭代器工作进程。 当在工作者中调用时,这返回一个保证具有的对象 以下属性: * :attr:`id`: 当前工作进程的 ID。 * :attr:`num_workers`: 工作进程总数。 * :attr:`seed`: 为当前工作进程设置的随机种子。此值是 由主进程的 RNG 和工作 ID 共同决定。详见 `:class:`~torch.utils.data.DataLoader``的文档以获取更多详细信息。 * :attr:`dataset`:此进程中的数据集对象的副本。请注意 这将在不同进程中是不同的对象 在主进程中。 当在主进程中调用时,此函数返回 `None`。 .. 注意:: 当用于传递给 `:attr:`worker_init_fn` 的函数时, `torch.utils.data.DataLoader`,这个方法可以用来设置每个工作进程不同,例如,使用`worker_id`来配置`dataset`对象,使其只读取分片数据集的一部分,或者使用`seed`来初始化其他在数据集中使用的库 设置每个工作进程不同,例如,使用`worker_id`来配置`dataset`对象,使其只读取分片数据集的一部分,或者使用`seed`来初始化其他在数据集中使用的库 使用`seed`来初始化其他在数据集中使用的库 使用`seed`来初始化其他在数据集中使用的库 代码。 """ 返回_worker_info
r用于表示 IterableDataset 结束的占位类。 @dataclass(冻结=True)
_可迭代数据集停止迭代: 工作器 ID: 整型 r“用于在启用工作器重用时恢复获取的示例类” @dataclass(冻结=True) _恢复迭代: 种子: 可选[int] = None # `_generate_state` 函数改编自 `numpy.random.SeedSequence` # 来自 https://github.com/numpy/numpy/blob/main/numpy/random/bit_generator.pyx # 它是 MIT 许可协议,以下是版权信息: # 版权所有 (c) 2015 梅丽莎·E·奥尼尔 # 版权所有 (c) 2019 NumPy 开发者 # # 以下是对任何获得本软件及其相关文档副本(以下简称“软件”)的人的授权,免费授权 # 任何人可以复制、使用、修改和分发软件 在不限制软件的情况下,包括但不限于其权利 # 使用、复制、修改、合并、发布、分发、再许可和/或出售 软件的副本,并允许软件的接收者 提供此服务,但须遵守以下条件: # 上述版权声明和本许可声明应包含在内 所有软件的副本或大部分内容。 # 软件按“原样”提供,不提供任何形式的保证,无论是明示的还是隐示的 所隐含的,包括但不限于商销性的保证, # 适用于特定目的和不侵犯版权。在任何情况下,均不得承担因侵权或适用特定目的而产生的任何责任。 # 作者或版权所有者应对任何索赔、损害或其他责任负责,无论该责任是基于合同、侵权或其他原因,无论该责任是否源于、因或与软件的使用或其他方式有关,或者与软件的任何其他使用有关。 # 不论该责任是否源于、因或与软件的使用或其他方式有关,或者与软件的任何其他使用有关。 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. 此函数生成一个 int32 类型的数组作为`numpy.random`的种子 以防止由于相同的种子和算法导致`numpy.random`和`random`模块的状态冲突 为`numpy.random`和`random`模块实现类似于`SeedSequence`的对象 TODO:为`torch.random`实现类似的对象 def 生成状态(基础种子, 工作器 ID): 初始化_A = 1135663077 多语言 = 2468251765 初始化_B = 2337405405 多倍数_B = 1492356589 混合乘_L = 3389127133 MIX_MULT_R = 1232336661 XSHIFT = 4 * 8 // 2 MASK32 = 4294967295 = [工作器 ID, 基础种子 & MASK32, 基础种子 >> 32, 0] 泳池 = [0] * 4 哈希常量_A = 初始化_A def 哈希(): 非局部 哈希常量_A value = (value ^ hash_const_A) & MASK32 hash_const_A = (hash_const_A * 多语言_A) & 面具 32 value = (value * 哈希常量_A) & 面具 32 value = (value ^ (value 空白 XSHIFT)) & MASK32 返回 value def 混合(x, y): 结果_x = (混合乘_L * x) & 面具 32 结果_y = (MIX_MULT_R * y) & MASK32 结果 = (result_x - result_y) & MASK32 结果 = (结果 ^ (结果 空格 X 位移)) & MASK32 返回 结果 向池中添加熵 i 范围(长度(泳池)): 泳池[i] = 哈希([i]) 将所有比特混合在一起,以便晚些时候的比特可以影响早些时候的比特 i_src 范围(长度(泳池)): i_dst 范围(长度(泳池)): 如果 != 目标: 泳池[目标] = 混合(泳池[目标] 哈希(泳池[])) 哈希常量 B = 初始化 B 状态 = [] i_dst 范围(4): data_val = 泳池[i_dst] data_val = (data_val ^ 哈希常量_B) & MASK32 哈希常量_B = (hash 常量 B * 多倍 B) & 马斯克 32 数据值 = (data_val * 哈希常量_B) & MASK32 data_val = (数据值 ^ (数据值 >> XSHIFT)) & MASK32 状态.追加(data_val) 返回 状态 def 工作循环( 数据集类型, 数据集, 索引队列, 数据队列, 完成事件, auto_collation, collate_fn, drop_last, 基础种子, 初始化函数, 工作器 ID, num_workers, 持久化工作者, shared_seed, ): # 请参阅[数据加载器多进程关闭逻辑]以获取详细信息。 # 此函数的逻辑。 try: # 初始化 C 端信号处理程序以处理 SIGBUS 和 SIGSEGV 信号。 # 模块的处理器在 Python 从 C 底层返回后执行 # 处理器,可能是在相同的致命信号已经发生的情况下 又一次。 # https://docs.python.org/3/library/signal.html#execution-of-python-signal-handlers 信号处理._set_worker_signal_handlers() 火炬.多进程.设置线程名称("pt 数据工作者") 火炬.设置线程数(1) 种子 = 基础种子 + 工作 ID 随机.种子(种子) 火炬.手动播种(种子) 如果 HAS_NUMPY: np 种子 = _生成状态(基础种子, 工作器 ID) 导入 numpy 作为 np numpy.随机.种子(随机种子) 来自 torch.utils.data 导入 迭代数据处理管道 来自 torch.utils.data.graph_settings 导入 应用随机种子 共享随机数生成器 = 火炬.生成器() 如果 isinstance(数据集, 迭代数据管道): 断言 shared_seed not None 共享随机数生成器.手动播种(shared_seed) 数据集 = 应用随机种子(数据集, 共享随机数生成器) 全局 _worker_info _worker_info = 工作信息( id=工作器 ID, num_workers=num_workers, 种子=种子, 数据集=数据集 ) 来自 torch.utils.data 导入 数据集类型 初始化异常 = None try: 如果 初始化函数 not : 初始化函数(工作器 ID) 拉取器 = _DatasetKind.创建获取器( 数据集类型, 数据集, auto_collation, collate_fn, drop_last ) 除了 异常: 初始化异常 = ExceptionWrapper( 哪里=f在 DataLoader 工作进程内{工作器 ID}" ) 当使用可迭代模式时,一些工作进程可能比其他工作进程提前退出。 这是因为可迭代数据集(IterableDataset)对不同工作进程的行为不同。 当发生这种情况时,会发送一个包含此工作进程 ID 的 `_IterableDatasetStopIteration` 对象到主进程。 以便主进程知道哪个工作进程已经停止迭代。 主进程不会向此工作者发送更多任务,并将发送 `None` 到此工作者以正确退出它。 # 注意,我们无法从工作者设置 `done_event`,因为它是在所有进程间共享的。 因此,我们设置 `iteration_end` 标志来 表示迭代器已耗尽。当设置`done_event`或`iteration_end`时 我们将跳过所有处理步骤,只等待`None`。 `None`。 iteration_end = 看门狗 = 管理看门狗() while 看门狗.是否存活(): try: r = 索引队列.获取(超时=MP 状态检查间隔) 除了 队列.空的: 继续 如果 isinstance(r, _恢复迭代): # 承认主进程 数据队列.放置((r, )) 迭代结束 = 如果 isinstance(数据集, 迭代数据管道): 断言 r.种子 not None 共享随机数生成器.手动播种(r.种子) 数据集 = 应用随机种子(数据集, 共享随机数生成器) 重新创建用于工作器重用策略的获取器 获取器 = _DatasetKind.创建获取器( 数据集类型, 数据集, auto_collation, collate_fn, drop_last ) 继续 elif r : 收到最终信号 断言 已完成事件.已设置() 迭代结束 断开 elif 已完成事件.已设置() 迭代结束: # `done_event` 已设置。但我还没有收到最终信号 # (None)尚未。我会继续等待,直到收到它,并跳过处理步骤。 # 处理步骤。 继续 索引, 索引 = r 数据: 联盟[_可迭代数据集停止迭代, ExceptionWrapper] 如果 init_exception not : 数据 = 初始化异常 初始化异常 = None 否则: try: 数据 = 检索器.获取(索引) # type: ignore[possibly-undefined] 除了 异常 作为 e: 如果 ( isinstance(e, StopIteration 异常) 数据集类型 == _DatasetKind.迭代器 ): 数据 = _可迭代数据集停止迭代(工作器 ID) # 设置 `iteration_end` # (1) 保存未来的 `next(...)` 调用,和 避免发送多个 `_IterableDatasetStopIteration`。 迭代结束 = 真实 否则: 我们不应该将 exc_info 存储在变量中。 `ExceptionWrapper` 会正确处理。 # 查看 [Python 跟踪回环问题] 数据 = ExceptionWrapper( 哪里=f在 DataLoader 工作进程{工作器 ID}" ) 数据队列.放置((索引, 数据)) 删除 数据, 索引, 索引, r # 保存内存 除了 KeyboardInterrupt: # 主进程无论如何都会引发 KeyboardInterrupt。 通过 如果 done_event.已设置(): data_queue.取消加入线程() 数据队列.关闭()

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源