# mypy: 允许未类型化定义
导入
二分查找
导入 itertools
导入
数学
导入
警告
来自 collections.abc
导入
序列
# UP006 希望从 collections.abc 导入'Iterable',但因为它需要
# 由于向后兼容性问题暂时保留从 typing 导入。特别是几个内部
# 目标无法通过以下方式类型检查:
无法创建一个一致的 MRO(方法解析顺序)
基类 Iterable, 泛型
来自
打字
导入
角色,
通用,
迭代器,
可选,
类型变量,
联合 # noqa: UP035
来自 typing_extensions
导入
已弃用
torch/__init__.pyi 中没有 'default_generator'
来自
火炬
导入
默认生成器,
生成器,
随机排列,
张量
__all__ = [
数据集,
可迭代数据集,
张量数据集,
堆叠数据集,
"ConcatDataset",
"ChainDataset",
"Subset",
"random_split",
]
_T = 类型变量("_T")
_T_co = 类型变量(
_T_co,
协变=True)
_T_dict = 字典[
字符串, _T_co]
_T_tuple = 元组[_T_co, ...]
_T_stack = 类型变量("_T_stack", _T_tuple, _T_dict)
[文档]class Dataset(Generic[_T_co]):
r"""一个表示 :class:`Dataset` 的抽象类。"""
所有表示从键到数据样本映射的数据集都应该子类化。
所有子类都应该重写 :meth:`__getitem__` 方法,支持根据给定的键获取数据样本。子类也可以选择性地重写 :meth:`__len__` 方法,该方法通常期望返回数据集的大小。
子类还可以选择性地重写 :meth:`__len__` 方法,该方法通常期望返回数据集的大小。
子类还可以选择性地重写 :meth:`__len__` 方法,该方法通常期望返回数据集的大小。
PyTorch 的`:class:`~torch.utils.data.Sampler`实现和默认选项
class:`~torch.utils.data.DataLoader`的子类也可以
可选地实现`:meth:`__getitems__`,以加快批量样本加载的速度
此方法接受样本批次的索引列表,并返回
样本列表。
.. 注意::
默认情况下,`torch.utils.data.DataLoader` 构建一个索引
采样器,它产生整数索引。为了使其能够与映射式
数据集具有非整数索引/键,必须提供自定义采样器。
"""
def __getitem__(self, index) -> _T_co:
raise NotImplementedError("Dataset 的子类应实现__getitem__。")
# def __getitems__(self, indices: List) -> List[_T_co]:
# 未实现,以防止在 fetcher 检查中产生假阳性
# torch.utils.data._utils.fetch._MapDatasetFetcher
def __add__(self, other: "Dataset[_T_co]") -> "ConcatDataset[_T_co]":
返回 ConcatDataset([self, other])
没有 `def __len__(self)` 默认?
查看 NOTE [Python 抽象基类中缺少默认的`__len__`]
在 pytorch/torch/utils/data/sampler.py 中
[文档]
类
可迭代数据集(
数据集[_T_co
]
迭代器[_T_co
)]
r可迭代的数据集。
所有表示数据样本可迭代的集合都应该继承它。
当数据来自流时,这种形式的集合尤其有用。
所有子类都应该重写 :meth:`__iter__` 方法,该方法将返回一个
样本集的样本迭代器。
当使用 :class:`~torch.utils.data.DataLoader` 与子类一起使用时,每个
数据集中的项目将通过`:class:`~torch.utils.data.DataLoader`生成
迭代器。当 :attr:`num_workers > 0` 时,每个工作进程将拥有
不同数据集对象的副本,因此通常希望配置
每个副本独立进行,以避免返回重复数据
workers. :func:`~torch.utils.data.get_worker_info`,当在工作者中调用时
进程,返回有关工作者的信息。它可以在任一上下文中使用。
数据集的 :meth:`__iter__` 方法或 :class:`~torch.utils.data.DataLoader` 的
attr:`worker_init_fn` 选项来修改每个副本的行为。
示例 1:在 :meth:`__iter__` 中分配所有工作给所有工作进程:
>>> # xdoctest: +REQUIRES(env:TORCH_DOCTEST_DATALOADER)
>>> # xdoctest: +SKIP("在 MacOS12 上失败")
>>> class MyIterableDataset(torch.utils.data.IterableDataset):
... def __init__(self, start, end):
... super(MyIterableDataset).__init__()
... 断言 end > start,"此示例代码仅适用于 end >= start"
... self.start = start
... self.end = end
...
... def __iter__(self):
... worker_info = torch.utils.data.get_worker_info()
... if worker_info is None: # 单进程数据加载,返回完整迭代器
... iter_start = self.start
... iter_end = self.end
... else: # 在工作进程
... # 分割工作负载
... 每个工作器的数量 = int(math.ceil((self.end - self.start) / float(worker_info.num_workers)))
... 工作器 ID = worker_info.id
... 迭代开始位置 = self.start + worker_id * 每个 worker 的数量
... 迭代结束位置 = min(迭代开始位置 + 每个 worker 的数量, self.end)
... 返回 iter(range(迭代开始位置, 迭代结束位置))
...
>>> # 应该给出与 range(3, 7) 相同的数据集,即 [3, 4, 5, 6].
>>> ds = MyIterableDataset(start=3, end=7)
>>> # 单进程加载
>>> print(list(torch.utils.data.DataLoader(ds, num_workers=0)))
[tensor([3]), tensor([4]), tensor([5]), tensor([6])]
>>> # xdoctest: +REQUIRES(POSIX)
>>> # 多进程加载,使用两个工作进程
>>> # 工作进程 0 获取了[3, 4]。工作进程 1 获取了[5, 6]。
>>> # xdoctest: +IGNORE_WANT("非确定性")
>>> print(list(torch.utils.data.DataLoader(ds, num_workers=2)))
[tensor([3]), tensor([5]), tensor([4]), tensor([6])]
>>> # 使用更多的工作者
>>> # xdoctest: +IGNORE_WANT("非确定性")
>>> print(list(torch.utils.data.DataLoader(ds, num_workers=12)))
[tensor([3]), tensor([5]), tensor([4]), tensor([6])]
示例 2:使用 :attr:`worker_init_fn` 在所有工作者之间分配工作负载::
>>> # xdoctest: +REQUIRES(env:TORCH_DOCTEST_DATALOADER)
>>> class MyIterableDataset(torch.utils.data.IterableDataset):
... def __init__(self, start, end):
... super(MyIterableDataset).__init__()
... assert end >= start, "此示例代码仅适用于 end >= start"
... self.start = start
... self.end = end
...
... def __iter__(self):
... return iter(range(self.start, self.end))
...
>>> # 应该给出与 range(3, 7) 相同的数据集,即 [3, 4, 5, 6].
>>> ds = MyIterableDataset(start=3, end=7)
>>> # 单进程加载
>>> print(list(torch.utils.data.DataLoader(ds, num_workers=0)))
[3, 4, 5, 6]
...
>>> 直接进行多进程加载会导致数据重复
>>> 打印(list(torch.utils.data.DataLoader(ds, num_workers=2)))
[3, 3, 4, 4, 5, 5, 6, 6]
>>> 定义一个 `worker_init_fn`,该函数配置每个数据集副本的方式不同
>>> def worker_init_fn(worker_id):
... worker_info = torch.utils.data.get_worker_info()
... dataset = worker_info.dataset # the dataset copy in this worker process
... overall_start = dataset.start
... overall_end = dataset.end
... # configure the dataset to only process the split workload
... per_worker = int(math.ceil((overall_end - overall_start) / float(worker_info.num_workers)))
... worker_id = worker_info.id
... dataset.start = overall_start + worker_id * per_worker
... dataset.end = min(dataset.start + per_worker, overall_end)
...
>>> # 多进程加载使用自定义的 `worker_init_fn`
>>> # 工作器 0 获取了[3, 4]。工作器 1 获取了[5, 6]。
>>> print(list(torch.utils.data.DataLoader(ds, num_workers=2, worker_init_fn=worker_init_fn)))
[3, 5, 4, 6]
>>> # 使用更多的工作者
>>> print(list(torch.utils.data.DataLoader(ds, num_workers=12, worker_init_fn=worker_init_fn)))
[3, 4, 5, 6]
"源代码"
def __add__(self, 其他:
数据集[_T_co
)]
返回
链式数据集([self,
其他])
# 默认没有 `def __len__(self)`?当需要时,子类会引发 `TypeError`。
# 查看 NOTE [Python 抽象基类中缺少默认的 `__len__`]
[文档]class TensorDataset(Dataset[tuple[Tensor, ...]]):
r"""封装张量的数据集。
每个样本将通过沿第一个维度索引张量来检索。
Args:
索引张量(Tensor):具有相同第一维大小的张量。
"""
tensors: (Tensor, ...) 元组
def __init__(self, *tensors: Tensor) -> None:
assert all(
tensors[0].size(0) == tensor.size(0) for tensor in tensors
), "张量大小不匹配"
self.tensors = tensors
def __getitem__(self, index):
return tuple(tensor[index] for tensor in self.tensors)
def __len__(self):
return self.tensors[0].size(0)
[文档]
类
栈数据集(
数据集[_T_stack
)]
r数据集作为多个数据集的堆叠。
此类用于组装复杂输入数据的各个部分,输入数据以数据集的形式给出。
示例:
>>> # xdoctest: +SKIP
>>> images = ImageDataset()
>>> texts = TextDataset()
>>> tuple_stack = StackDataset(images, texts)
>>> tuple_stack[0] == (images[0], texts[0])
>>> dict_stack = StackDataset(image=images, text=texts)
>>> dict_stack[0] == {'image': images[0], 'text': texts[0]}
参数:
*args (Dataset): Datasets for stacking returned as tuple.
**kwargs (Dataset): Datasets for stacking returned as dict.
"源代码"
数据集:
联盟[
元组,
字典]
def 初始化(self, *
参数:
数据集[_T_co
] **kwargs:
数据集[_T_co]) ->
无:
如果
参数:
如果 kwargs:
提升 ValueError(
支持“元组”(通过“args”)或
`dict`- (通过 `kwargs`) 类似于输入/输出,但两种类型都给出。
)
self.长度 =
长度(
参数[0]) # type: ignore[arg-type]
如果
任何(self.
长度 !=
长度(
数据集)
为
数据集
在
参数): # type: ignore[arg-type]
提升 ValueError(
"数据集大小不匹配")
self.数据集 = args
elif kwargs:
临时 =
列表(kwargs.
值())
self.长度 =
长度(tmp[0]) # type: ignore[arg-type]
如果
任何(self.
长度 !=
长度(
数据集)
为
数据集
在 tmp): # type: ignore[arg-type]
提升 ValueError(
"数据集之间大小不匹配")
self.数据集 = kwargs
否则:
提升 ValueError(
"至少应传递一个数据集")
def __getitem__(self, 索引):
如果 isinstance(self.
数据集,
字典):
返回 {k:
数据集[
索引]
为 k,
数据集
在 self.
数据集.
项目()}
返回
元组(
数据集[
索引]
为
数据集
在 self.
数据集)
def __getitems__(self, 索引:
列表):
当父数据集支持时添加批量采样支持。
如果 isinstance(self.
数据集,
字典):
dict_batch: 列表[_T_dict] =
[{}]
为 _
在
索引]
为 k,
数据集
在 self.
数据集.
项目():
如果
可调用(getattr(
数据集, "__getitems__",
无)):
项目 =
数据集.__getitems__(
索引)
# 类型: 忽略[attr-defined]
如果
长度(
项目) !=
长度(
索引):
提升 ValueError(
"嵌套数据集的输出大小不匹配。"
f预期{
长度(
索引)}
,获得{
长度(
项目)}"
)
为
数据,
样本
在
压缩(
项目,
字典批量):
样本 d[k] =
数据
否则:
为
索引,
样本 d
在
压缩(
索引,
字典批量):
样本[k] =
数据集[
索引]
返回
批量字典
# 元组数据
批量列表:
列表[
列表] =
空列表
为 _
在
索引]
为
数据集
在 self.
数据集:
如果
可调用(getattr(
数据集, "__getitems__",
无)):
项目 =
数据集.__getitems__(
索引)
# 类型: 忽略[attr-defined]
如果
长度(
项目) !=
长度(
索引):
提升 ValueError(
"嵌套数据集的输出大小不匹配。"
f预期{
长度(
索引)}
,获得{
长度(
项目)}"
)
为
数据,
样本_t
在
压缩(
项目,
列批量):
样本_t.
追加(
数据)
否则:
为
索引,
样本
在
压缩(
索引,
批处理列表):
样本.
追加(
数据集[
索引])
批处理元组:
列表[
_T 元组] = [
元组(
样例)
为
样本
在
列批量]
返回
元组批次
def __len__(self):
返回 self.
_长度
[文档]
类
连接数据集(
数据集[_T_co
)]
r“数据集作为多个数据集的连接。”
这个类用于组装不同的现有数据集。
参数:
数据集(序列):要连接的数据集列表
"源代码"
数据集:
列表[
数据集[_T_co]]
累计大小:
列表[int]
@staticmethod
def 累加和(sequence):
r, s = [] 0
为 e
在 sequence:
l = 长度(e)
r.追加(l + s)
s += l
返回 r
def 初始化(self,
数据集:
迭代器[
数据集]) ->
无:
超级().
初始化()
self.数据集 =
列表(
数据集)
断言
长度(self.
数据集) > 0,
"数据集不应为空的可迭代对象" # type: ignore[arg-type]
为 d
在 self.
数据集:
断言 not isinstance(
d, 可迭代数据集
), "ConcatDataset 不支持 IterableDataset"
self.累积大小 = self.
累加和(self.
数据集)
def __len__(self):
返回 self.
累计大小[-1]
def __getitem__(self, 索引):
如果
索引 < 0:
如果 -
索引 >
长度(self):
提升 ValueError(
索引的绝对值不应超过数据集长度
)
索引 =
长度(self) +
索引
数据集索引 =
二分查找.
二分查找右边界(self.
累计大小,
索引)
如果
数据集索引 == 0:
样本索引 =
索引
否则:
样本索引 =
索引 - self.
累计大小[
数据集索引 - 1]
返回 self.
数据集[
数据集索引
]
[样本索引]
@property
@deprecated(
将 `cummulative_sizes` 属性重命名为 `cumulative_sizes`,
分类=
未来警告,
)
def cummulative_sizes(self):
返回 self.
累积大小
[文档]类 ChainDataset(IterableDataset):
r"""用于链式多个 :class:`IterableDataset` 的数据集。
此类可用于组装不同的现有数据集流。The
链接操作即时完成,因此使用此类连接大规模数据集将非常高效。
使用此类连接数据集将非常高效。
Args:
datasets (可迭代的 IterableDataset 迭代器): 要连接的数据集。
"""
def __init__(self, 数据集: Iterable[数据集]) -> None:
super().__init__()
self.datasets = 数据集
def __iter__(self):
for d in self.datasets:
assert isinstance(
d, IterableDataset
“仅支持 IterableDataset”
yield from d
def __len__(self):
total = 0
for d in self.datasets:
assert isinstance(
d, IterableDataset
), "ChainDataset 仅支持 IterableDataset"
total += len(d) # 忽略参数类型
返回 total
[文档]class Subset(Dataset[_T_co]):
r"""
数据集的指定索引子集
Args:
数据集(Dataset):整个数据集
索引(序列):在整体集中选定的索引
"""
"""
dataset: 数据集[_T_co]
indices: 整数序列[int]
def __init__(self, dataset: 数据集[_T_co], indices: 整数序列[int]) -> None:
self.dataset = 数据集
self.indices = 索引
def __getitem__(self, idx):
if isinstance(idx, 列表):
return self.dataset[[self.indices[i] for i in idx]]
return self.dataset[self.indices[idx]]
def __getitems__(self, indices: list[int]) -> list[_T_co]:
# add batched sampling support when parent dataset supports it.
# see torch.utils.data._utils.fetch._MapDatasetFetcher
如果是可调用的 getattr(self.dataset, "__getitems__", None):
return self.dataset.__getitems__([self.indices[idx] for idx in indices]) # type: ignore[attr-defined]
else:
return [self.dataset[self.indices[idx]] for idx in indices]
def __len__(self):
return len(self.indices)
[文档]def
随机分割(
数据集:
数据集[_T
]
长度:
序列[
联盟[int, float]],
生成器:
可选[
生成器] =
默认生成器,
) -> 列表[
子集[_T
]]
r""
将数据集随机分割成给定长度的非重叠新数据集。
如果给出一个总和为 1 的分数列表,
长度将自动计算
floor(frac * len(dataset)) 对于每个提供的分数。
计算长度后,如果有余数,则计数为 1
分布式轮询方式分配到长度
直至没有余数为止。
可选地修复生成器以获得可重复的结果,例如:
示例:
>>> # xdoctest: +SKIP
>>> generator1 = torch.Generator().manual_seed(42)
>>> generator2 = torch.Generator().manual_seed(42)
>>> random_split(range(10), [3, 7], generator=generator1)
>>> random_split(range(30), [0.3, 0.3, 0.4], generator=generator2)
参数:
数据集(Dataset):要分割的数据集
长度(sequence):生成的分割长度或分割比例
生成器(生成器):用于随机排列的生成器。
"源代码"
如果
数学.isclose(
总和(
长度), 1)
和
总和(
长度)
≤ 1:
子集长度:
列表[int] = []
为 i,
比率
在
列举(
长度):
如果
比率 < 0
或
分数 > 1:
提升 ValueError(f
"索引处的分数"{i}
不在 0 和 1 之间)
n_items_in_split = int(
数学.
向下取整(
长度(
数据集) *
分数部分) # type: ignore[arg-type]
)
子集长度.
追加(
每个分割中的项目数)
余数 =
长度(
数据集) -
总和(
子集长度) # type: ignore[arg-type]
以轮询方式将 1 加到所有长度上,直到余数为 0
为 i
在
范围(
剩余):
idx_to_add_at = i % 长度(subset_lengths)
subset_lengths[idx_to_add_at] += 1
lengths = 子集长度
为 i,
长度
在
列举(
长度):
如果
长度 == 0:
warnings.警告(
f"在索引处分割的长度"{i}
可能是 0。
f"这可能会导致数据集为空。"
)
无法验证数据集是否已分页
如果
总和(
长度) !=
长度(
数据集): # type: ignore[arg-type]
提升 ValueError(
输入长度之和不等于输入数据集的长度!
)
索引 =
随机排列(
总和(
长度),
生成器=
生成器).
转列表()
# 类型:忽略[参数类型,调用重载]
lengths = 角色(
序列[int
]
长度)
返回 [
子集(
数据集,
索引[
偏移 -
长度 :
偏移量])
为
偏移量,
长度
在
压缩(itertools.
累加(
长度),
长度)
]