快捷键

torch.utils.data.dataset 的源代码

# mypy: 允许未类型化定义
导入 二分查找
导入 itertools
导入 数学
导入 警告
来自 collections.abc 导入 序列

# UP006 希望从 collections.abc 导入'Iterable',但因为它需要
# 由于向后兼容性问题暂时保留从 typing 导入。特别是几个内部
# 目标无法通过以下方式类型检查:
无法创建一个一致的 MRO(方法解析顺序)
基类 Iterable, 泛型
来自 打字 导入 角色, 通用, 迭代器, 可选, 类型变量, 联合  # noqa: UP035
来自 typing_extensions 导入 已弃用

torch/__init__.pyi 中没有 'default_generator'
来自 火炬 导入 默认生成器, 生成器, 随机排列, 张量


__all__ = [
    数据集,
    可迭代数据集,
    张量数据集,
    堆叠数据集,
    "ConcatDataset",
    "ChainDataset",
    "Subset",
    "random_split",
]


_T = 类型变量("_T")
_T_co = 类型变量(_T_co, 协变=True)
_T_dict = 字典[字符串, _T_co]
_T_tuple = 元组[_T_co, ...]
_T_stack = 类型变量("_T_stack", _T_tuple, _T_dict)


[文档]class Dataset(Generic[_T_co]): r"""一个表示 :class:`Dataset` 的抽象类。""" 所有表示从键到数据样本映射的数据集都应该子类化。 所有子类都应该重写 :meth:`__getitem__` 方法,支持根据给定的键获取数据样本。子类也可以选择性地重写 :meth:`__len__` 方法,该方法通常期望返回数据集的大小。 子类还可以选择性地重写 :meth:`__len__` 方法,该方法通常期望返回数据集的大小。 子类还可以选择性地重写 :meth:`__len__` 方法,该方法通常期望返回数据集的大小。 PyTorch 的`:class:`~torch.utils.data.Sampler`实现和默认选项 class:`~torch.utils.data.DataLoader`的子类也可以 可选地实现`:meth:`__getitems__`,以加快批量样本加载的速度 此方法接受样本批次的索引列表,并返回 样本列表。 .. 注意:: 默认情况下,`torch.utils.data.DataLoader` 构建一个索引 采样器,它产生整数索引。为了使其能够与映射式 数据集具有非整数索引/键,必须提供自定义采样器。 """ def __getitem__(self, index) -> _T_co: raise NotImplementedError("Dataset 的子类应实现__getitem__。") # def __getitems__(self, indices: List) -> List[_T_co]: # 未实现,以防止在 fetcher 检查中产生假阳性 # torch.utils.data._utils.fetch._MapDatasetFetcher def __add__(self, other: "Dataset[_T_co]") -> "ConcatDataset[_T_co]": 返回 ConcatDataset([self, other])
没有 `def __len__(self)` 默认? 查看 NOTE [Python 抽象基类中缺少默认的`__len__`] 在 pytorch/torch/utils/data/sampler.py 中
[文档] 可迭代数据集(数据集[_T_co] 迭代器[_T_co)] r可迭代的数据集。 所有表示数据样本可迭代的集合都应该继承它。 当数据来自流时,这种形式的集合尤其有用。 所有子类都应该重写 :meth:`__iter__` 方法,该方法将返回一个 样本集的样本迭代器。 当使用 :class:`~torch.utils.data.DataLoader` 与子类一起使用时,每个 数据集中的项目将通过`:class:`~torch.utils.data.DataLoader`生成 迭代器。当 :attr:`num_workers > 0` 时,每个工作进程将拥有 不同数据集对象的副本,因此通常希望配置 每个副本独立进行,以避免返回重复数据 workers. :func:`~torch.utils.data.get_worker_info`,当在工作者中调用时 进程,返回有关工作者的信息。它可以在任一上下文中使用。 数据集的 :meth:`__iter__` 方法或 :class:`~torch.utils.data.DataLoader` 的 attr:`worker_init_fn` 选项来修改每个副本的行为。 示例 1:在 :meth:`__iter__` 中分配所有工作给所有工作进程: >>> # xdoctest: +REQUIRES(env:TORCH_DOCTEST_DATALOADER) >>> # xdoctest: +SKIP("在 MacOS12 上失败") >>> class MyIterableDataset(torch.utils.data.IterableDataset): ... def __init__(self, start, end): ... super(MyIterableDataset).__init__() ... 断言 end > start,"此示例代码仅适用于 end >= start" ... self.start = start ... self.end = end ... ... def __iter__(self): ... worker_info = torch.utils.data.get_worker_info() ... if worker_info is None: # 单进程数据加载,返回完整迭代器 ... iter_start = self.start ... iter_end = self.end ... else: # 在工作进程 ... # 分割工作负载 ... 每个工作器的数量 = int(math.ceil((self.end - self.start) / float(worker_info.num_workers))) ... 工作器 ID = worker_info.id ... 迭代开始位置 = self.start + worker_id * 每个 worker 的数量 ... 迭代结束位置 = min(迭代开始位置 + 每个 worker 的数量, self.end) ... 返回 iter(range(迭代开始位置, 迭代结束位置)) ... >>> # 应该给出与 range(3, 7) 相同的数据集,即 [3, 4, 5, 6]. >>> ds = MyIterableDataset(start=3, end=7) >>> # 单进程加载 >>> print(list(torch.utils.data.DataLoader(ds, num_workers=0))) [tensor([3]), tensor([4]), tensor([5]), tensor([6])] >>> # xdoctest: +REQUIRES(POSIX) >>> # 多进程加载,使用两个工作进程 >>> # 工作进程 0 获取了[3, 4]。工作进程 1 获取了[5, 6]。 >>> # xdoctest: +IGNORE_WANT("非确定性") >>> print(list(torch.utils.data.DataLoader(ds, num_workers=2))) [tensor([3]), tensor([5]), tensor([4]), tensor([6])] >>> # 使用更多的工作者 >>> # xdoctest: +IGNORE_WANT("非确定性") >>> print(list(torch.utils.data.DataLoader(ds, num_workers=12))) [tensor([3]), tensor([5]), tensor([4]), tensor([6])] 示例 2:使用 :attr:`worker_init_fn` 在所有工作者之间分配工作负载:: >>> # xdoctest: +REQUIRES(env:TORCH_DOCTEST_DATALOADER) >>> class MyIterableDataset(torch.utils.data.IterableDataset): ... def __init__(self, start, end): ... super(MyIterableDataset).__init__() ... assert end >= start, "此示例代码仅适用于 end >= start" ... self.start = start ... self.end = end ... ... def __iter__(self): ... return iter(range(self.start, self.end)) ... >>> # 应该给出与 range(3, 7) 相同的数据集,即 [3, 4, 5, 6]. >>> ds = MyIterableDataset(start=3, end=7) >>> # 单进程加载 >>> print(list(torch.utils.data.DataLoader(ds, num_workers=0))) [3, 4, 5, 6] ... >>> 直接进行多进程加载会导致数据重复 >>> 打印(list(torch.utils.data.DataLoader(ds, num_workers=2))) [3, 3, 4, 4, 5, 5, 6, 6] >>> 定义一个 `worker_init_fn`,该函数配置每个数据集副本的方式不同 >>> def worker_init_fn(worker_id): ... worker_info = torch.utils.data.get_worker_info() ... dataset = worker_info.dataset # the dataset copy in this worker process ... overall_start = dataset.start ... overall_end = dataset.end ... # configure the dataset to only process the split workload ... per_worker = int(math.ceil((overall_end - overall_start) / float(worker_info.num_workers))) ... worker_id = worker_info.id ... dataset.start = overall_start + worker_id * per_worker ... dataset.end = min(dataset.start + per_worker, overall_end) ... >>> # 多进程加载使用自定义的 `worker_init_fn` >>> # 工作器 0 获取了[3, 4]。工作器 1 获取了[5, 6]。 >>> print(list(torch.utils.data.DataLoader(ds, num_workers=2, worker_init_fn=worker_init_fn))) [3, 5, 4, 6] >>> # 使用更多的工作者 >>> print(list(torch.utils.data.DataLoader(ds, num_workers=12, worker_init_fn=worker_init_fn))) [3, 4, 5, 6] "源代码" def __add__(self, 其他: 数据集[_T_co)] 返回 链式数据集([self, 其他])
# 默认没有 `def __len__(self)`?当需要时,子类会引发 `TypeError`。 # 查看 NOTE [Python 抽象基类中缺少默认的 `__len__`]
[文档]class TensorDataset(Dataset[tuple[Tensor, ...]]): r"""封装张量的数据集。 每个样本将通过沿第一个维度索引张量来检索。 Args: 索引张量(Tensor):具有相同第一维大小的张量。 """ tensors: (Tensor, ...) 元组 def __init__(self, *tensors: Tensor) -> None: assert all( tensors[0].size(0) == tensor.size(0) for tensor in tensors ), "张量大小不匹配" self.tensors = tensors def __getitem__(self, index): return tuple(tensor[index] for tensor in self.tensors) def __len__(self): return self.tensors[0].size(0)
[文档] 栈数据集(数据集[_T_stack)] r数据集作为多个数据集的堆叠。 此类用于组装复杂输入数据的各个部分,输入数据以数据集的形式给出。 示例: >>> # xdoctest: +SKIP >>> images = ImageDataset() >>> texts = TextDataset() >>> tuple_stack = StackDataset(images, texts) >>> tuple_stack[0] == (images[0], texts[0]) >>> dict_stack = StackDataset(image=images, text=texts) >>> dict_stack[0] == {'image': images[0], 'text': texts[0]} 参数: *args (Dataset): Datasets for stacking returned as tuple. **kwargs (Dataset): Datasets for stacking returned as dict. "源代码" 数据集: 联盟[元组, 字典] def 初始化(self, *参数: 数据集[_T_co] **kwargs: 数据集[_T_co]) -> : 如果 参数: 如果 kwargs: 提升 ValueError( 支持“元组”(通过“args”)或 `dict`- (通过 `kwargs`) 类似于输入/输出,但两种类型都给出。 ) self.长度 = 长度(参数[0]) # type: ignore[arg-type] 如果 任何(self.长度 != 长度(数据集) 数据集 参数): # type: ignore[arg-type] 提升 ValueError("数据集大小不匹配") self.数据集 = args elif kwargs: 临时 = 列表(kwargs.()) self.长度 = 长度(tmp[0]) # type: ignore[arg-type] 如果 任何(self.长度 != 长度(数据集) 数据集 tmp): # type: ignore[arg-type] 提升 ValueError("数据集之间大小不匹配") self.数据集 = kwargs 否则: 提升 ValueError("至少应传递一个数据集") def __getitem__(self, 索引): 如果 isinstance(self.数据集, 字典): 返回 {k: 数据集[索引] k, 数据集 self.数据集.项目()} 返回 元组(数据集[索引] 数据集 self.数据集) def __getitems__(self, 索引: 列表): 当父数据集支持时添加批量采样支持。 如果 isinstance(self.数据集, 字典): dict_batch: 列表[_T_dict] = [{}] _ 索引] k, 数据集 self.数据集.项目(): 如果 可调用(getattr(数据集, "__getitems__", )): 项目 = 数据集.__getitems__(索引) # 类型: 忽略[attr-defined] 如果 长度(项目) != 长度(索引): 提升 ValueError( "嵌套数据集的输出大小不匹配。" f预期{长度(索引)},获得{长度(项目)}" ) 数据, 样本 压缩(项目, 字典批量): 样本 d[k] = 数据 否则: 索引, 样本 d 压缩(索引, 字典批量): 样本[k] = 数据集[索引] 返回 批量字典 # 元组数据 批量列表: 列表[列表] = 空列表 _ 索引] 数据集 self.数据集: 如果 可调用(getattr(数据集, "__getitems__", )): 项目 = 数据集.__getitems__(索引) # 类型: 忽略[attr-defined] 如果 长度(项目) != 长度(索引): 提升 ValueError( "嵌套数据集的输出大小不匹配。" f预期{长度(索引)},获得{长度(项目)}" ) 数据, 样本_t 压缩(项目, 列批量): 样本_t.追加(数据) 否则: 索引, 样本 压缩(索引, 批处理列表): 样本.追加(数据集[索引]) 批处理元组: 列表[_T 元组] = [元组(样例) 样本 列批量] 返回 元组批次 def __len__(self): 返回 self._长度
[文档] 连接数据集(数据集[_T_co)] r“数据集作为多个数据集的连接。” 这个类用于组装不同的现有数据集。 参数: 数据集(序列):要连接的数据集列表 "源代码" 数据集: 列表[数据集[_T_co]] 累计大小: 列表[int] @staticmethod def 累加和(sequence): r, s = [] 0 e sequence: l = 长度(e) r.追加(l + s) s += l 返回 r def 初始化(self, 数据集: 迭代器[数据集]) -> : 超级().初始化() self.数据集 = 列表(数据集) 断言 长度(self.数据集) > 0, "数据集不应为空的可迭代对象" # type: ignore[arg-type] d self.数据集: 断言 not isinstance( d, 可迭代数据集 ), "ConcatDataset 不支持 IterableDataset" self.累积大小 = self.累加和(self.数据集) def __len__(self): 返回 self.累计大小[-1] def __getitem__(self, 索引): 如果 索引 < 0: 如果 -索引 > 长度(self): 提升 ValueError( 索引的绝对值不应超过数据集长度 ) 索引 = 长度(self) + 索引 数据集索引 = 二分查找.二分查找右边界(self.累计大小, 索引) 如果 数据集索引 == 0: 样本索引 = 索引 否则: 样本索引 = 索引 - self.累计大小[数据集索引 - 1] 返回 self.数据集[数据集索引] [样本索引] @property @deprecated( 将 `cummulative_sizes` 属性重命名为 `cumulative_sizes`, 分类=未来警告, ) def cummulative_sizes(self): 返回 self.累积大小
[文档]类 ChainDataset(IterableDataset): r"""用于链式多个 :class:`IterableDataset` 的数据集。 此类可用于组装不同的现有数据集流。The 链接操作即时完成,因此使用此类连接大规模数据集将非常高效。 使用此类连接数据集将非常高效。 Args: datasets (可迭代的 IterableDataset 迭代器): 要连接的数据集。 """ def __init__(self, 数据集: Iterable[数据集]) -> None: super().__init__() self.datasets = 数据集 def __iter__(self): for d in self.datasets: assert isinstance( d, IterableDataset “仅支持 IterableDataset” yield from d def __len__(self): total = 0 for d in self.datasets: assert isinstance( d, IterableDataset ), "ChainDataset 仅支持 IterableDataset" total += len(d) # 忽略参数类型 返回 total
[文档]class Subset(Dataset[_T_co]): r""" 数据集的指定索引子集 Args: 数据集(Dataset):整个数据集 索引(序列):在整体集中选定的索引 """ """ dataset: 数据集[_T_co] indices: 整数序列[int] def __init__(self, dataset: 数据集[_T_co], indices: 整数序列[int]) -> None: self.dataset = 数据集 self.indices = 索引 def __getitem__(self, idx): if isinstance(idx, 列表): return self.dataset[[self.indices[i] for i in idx]] return self.dataset[self.indices[idx]] def __getitems__(self, indices: list[int]) -> list[_T_co]: # add batched sampling support when parent dataset supports it. # see torch.utils.data._utils.fetch._MapDatasetFetcher 如果是可调用的 getattr(self.dataset, "__getitems__", None): return self.dataset.__getitems__([self.indices[idx] for idx in indices]) # type: ignore[attr-defined] else: return [self.dataset[self.indices[idx]] for idx in indices] def __len__(self): return len(self.indices)
[文档]def 随机分割( 数据集: 数据集[_T] 长度: 序列[联盟[int, float]], 生成器: 可选[生成器] = 默认生成器, ) -> 列表[子集[_T]] r"" 将数据集随机分割成给定长度的非重叠新数据集。 如果给出一个总和为 1 的分数列表, 长度将自动计算 floor(frac * len(dataset)) 对于每个提供的分数。 计算长度后,如果有余数,则计数为 1 分布式轮询方式分配到长度 直至没有余数为止。 可选地修复生成器以获得可重复的结果,例如: 示例: >>> # xdoctest: +SKIP >>> generator1 = torch.Generator().manual_seed(42) >>> generator2 = torch.Generator().manual_seed(42) >>> random_split(range(10), [3, 7], generator=generator1) >>> random_split(range(30), [0.3, 0.3, 0.4], generator=generator2) 参数: 数据集(Dataset):要分割的数据集 长度(sequence):生成的分割长度或分割比例 生成器(生成器):用于随机排列的生成器。 "源代码" 如果 数学.isclose(总和(长度), 1) 总和(长度) 1: 子集长度: 列表[int] = [] i, 比率 列举(长度): 如果 比率 < 0 分数 > 1: 提升 ValueError(f"索引处的分数"{i}不在 0 和 1 之间) n_items_in_split = int( 数学.向下取整(长度(数据集) * 分数部分) # type: ignore[arg-type] ) 子集长度.追加(每个分割中的项目数) 余数 = 长度(数据集) - 总和(子集长度) # type: ignore[arg-type] 以轮询方式将 1 加到所有长度上,直到余数为 0 i 范围(剩余): idx_to_add_at = i % 长度(subset_lengths) subset_lengths[idx_to_add_at] += 1 lengths = 子集长度 i, 长度 列举(长度): 如果 长度 == 0: warnings.警告( f"在索引处分割的长度"{i}可能是 0。 f"这可能会导致数据集为空。" ) 无法验证数据集是否已分页 如果 总和(长度) != 长度(数据集): # type: ignore[arg-type] 提升 ValueError( 输入长度之和不等于输入数据集的长度! ) 索引 = 随机排列(总和(长度), 生成器=生成器).转列表() # 类型:忽略[参数类型,调用重载] lengths = 角色(序列[int] 长度) 返回 [ 子集(数据集, 索引[偏移 - 长度 : 偏移量]) 偏移量, 长度 压缩(itertools.累加(长度), 长度) ]

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源