torch.utils.data.distributed 的源代码
导入
数学
来自 collections.abc
导入
迭代器
来自
打字
导入
可选,
类型变量
导入
火炬
导入 torch.distributed
作为 dist
来自 torch.utils.data.dataset
导入
数据集
来自 torch.utils.data.sampler
导入
样本器
__all__ = [分布式采样器]
_T_co = 类型变量(
_T_co,
协变=True)
[文档]
类
分布式采样器(
采样器[_T_co
)]
r采样器,限制数据加载到数据集的子集。
尤其适用于与
`:class:`torch.nn.parallel.DistributedDataParallel`. 在这种情况下,每个
过程可以将一个 :class:`~torch.utils.data.DistributedSampler` 实例传递
class:`~torch.utils.data.DataLoader` 样本器,并加载原始数据集的一个子集
数据集被假定为固定大小,并且它的任何实例始终
.. 注意::
假设数据集的大小是恒定的,并且它的任何实例始终
返回相同元素且顺序相同。
参数:
数据集:用于采样的数据集。
num_replicas(int,可选):参与分布式训练的进程数。
默认情况下,从 :attr:`world_size` 获取。
当前分布式组。
排名(int,可选):当前进程在 :attr:`num_replicas` 中的排名。
默认情况下,:attr:`rank` 从当前分布式组中获取。
组。
shuffle(bool,可选):如果为 ``True``(默认),采样器将打乱
索引。
seed(int,可选):用于打乱采样器的随机种子,如果
attr:`shuffle=True`。此数字应在所有
分布式组中的进程。默认值:``0``。
drop_last(布尔值,可选):如果为 ``True``,则采样器将丢弃
数据的尾部,使其能够被副本数量整除。
如果为 ``False``,采样器将添加额外的索引以保持
数据应均匀分配到副本上。默认:``False``。
.. 警告::
在分布式模式下,在每个 epoch 开始时调用 :meth:`set_epoch` 方法
在创建 :class:`DataLoader` 迭代器之前
是使多个 epoch 之间正确打乱顺序所必需的。否则,
始终使用相同的顺序。
示例::
>>> # xdoctest: +SKIP
>>> sampler = DistributedSampler(dataset) if is_distributed else None
>>> loader = DataLoader(dataset, shuffle=(sampler is None),
... sampler=sampler)
>>> for epoch in range(start_epoch, n_epochs):
... 如果是分布式:
... sampler.set_epoch(epoch)
... train(loader)
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 初始化(
自身,
数据集:
数据集,
副本数量:
可选[int] =
无,
排名:
可选[int] =
无,
打乱:
布尔类型 = True,
种子:
整型 = 0,
drop_last: 布尔类型 =
错误,
) -> 无:
如果
副本数量
是
无:
如果 not
距离.
是否可用():
提升
运行时错误(
需要分布式包可用)
副本数量 =
距离.
获取世界大小()
如果
排名
是
无:
如果 not
距离.
是否可用():
提升
运行时错误(
"需要分布式包可用")
排名 =
距离.
获取排名()
如果
排名 >= num_replicas
或
排名 < 0:
提升 ValueError(
f"无效的 rank"{
排名}
, rank 应在区间[0, {num_replicas - 1}]"
)
自身.
数据集 =
数据集
自身.num_replicas = num_replicas
自身.
排名 =
排名
自身.
纪元 = 0
自身.drop_last = drop_last
如果数据集长度能被副本数整除,那么
无需删除任何数据,因为数据集将平均分割。
如果
自身.drop_last
和
长度(
自身.
数据集) %
自身.num_replicas != 0: # type: ignore[arg-type]
拆分到最近的可用长度,该长度是平均可分割的。
这是为了确保每个 rank 接收相同数量的数据。
使用此采样器。
自身.
样本数量 =
数学.
向上取整(
(长度(
自身.
数据集) -
自身.
副本数量) /
自身.
副本数量 # type: ignore[arg-type]
)
否则:
自身.
样本数量 =
数学.
向上取整(
长度(
自身.
数据集) /
自身.
副本数量) # type: ignore[arg-type]
自身.
总大小 =
自身.
样本数量 *
自身.
副本数量
自身.
打乱 =
打乱
自身.
种子 =
种子
def __iter__(自身) ->
迭代器[_T_co
]:
如果
自身.
打乱:
基于 epoch 和 seed 进行确定性打乱
g = 火炬.
生成器()
g.手动播种(
自身.
种子 +
自身.
训练轮数)
索引 =
火炬.
随机排列(
长度(
自身.
数据集),
生成器=g).
转列表() # type: ignore[arg-type]
否则:
索引 =
列表(
范围(
长度(
自身.
数据集))) # type: ignore[arg-type]
如果 not
自身.drop_last:
添加额外样本以使其均匀可除
填充大小 =
自身.
总大小 -
长度(
索引)
如果
填充大小
≤
长度(
索引):
索引 +=
索引
[
填充大小]
否则:
索引 += (
索引 *
数学.
向上取整(
填充大小 /
长度(
索引)))[
:填充大小
]
否则:
删除数据尾部以使其均匀可除
索引 =
索引
[
自身.
总大小]
断言
长度(
索引) ==
自身.
总大小
子采样
索引 =
索引[
自身.
排名 :
自身.
总大小 :
自身.
副本数量]
断言
长度(
索引) ==
自身.
样本数量
返回
迭代(
索引)
def __len__(自身) -> int:
返回
自身.
样本数量
def 设置 epoch(
自身,
训练轮数: int) ->
无:
r""
设置此采样器的 epoch。
当:attr:`shuffle=True`时,这确保了所有副本
每个 epoch 使用不同的随机排序。否则,下一个迭代的采样器将产生相同的排序。
epoch(int):epoch 编号。
参数:
epoch(int):epoch 编号。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
自身.
纪元 =
训练轮数