torch.nn.parallel.data_parallel 的源代码
# mypy: 允许未类型化定义
导入
操作符
导入
警告
来自 collections.abc
导入
序列
来自 itertools
导入
连接
来自
打字
导入
任意,
通用,
可选,
类型变量,
联合
导入
火炬
来自 torch._utils
导入 (
_get_all_device_indices,
获取可用设备类型,
获取设备索引,
_get_devices_properties,
)
来自 torch.nn.modules
导入
模块
来自 torch.nn.parallel.parallel_apply
导入 parallel_apply
来自 torch.nn.parallel.replicate
导入
复制
来自 torch.nn.parallel.scatter_gather
导入
收集, scatter_kwargs
__all__ = ["DataParallel", 数据并行]
def _检查余额(
设备 ID:
序列[
联盟[int,
火炬.
设备]]) ->
无:
不平衡警告 =
""
您的 GPU 之间存在不平衡。您可能需要排除具有低于 75%的 GPU{}
的
低于 75% o
如果是 GPU 的内存或核心{}
。您可以通过设置
DataParallel 的 device_ids 参数,或者通过设置 CUDA_VISIBLE_DEVICES 环境变量来实现。
""环境变量。""
设备 ID = [
获取设备索引(x, True)
为 x
在
设备 ID]
设备属性 =
获取设备属性(
设备 ID)
def 警告不平衡(
获取属性):
values = [获取属性(
属性)
为
属性
在
开发属性]
最小位置,
最小值 =
最小(
列举(
值),
键=
操作符.itemgetter(1))
最大位置, max_val =
最大值(
列举(
值),
键=
操作符.itemgetter(1))
如果
最小值 / max_val < 0.75:
warnings.警告(
不平衡警告.
格式(
设备 ID[
最小位置
]
设备 ID[
最大位置])
)
返回
真实
返回
假
如果
警告不平衡(lambda
属性:
属性.
总内存):
返回
如果
警告不平衡(lambda
属性:
属性.
多处理器数量):
返回
T = 类型变量(
T,
绑定=
模块)
[文档]
类
数据并行(
模块,
通用[T
)]
r实现在模块级别的数据并行。
该容器通过并行应用给定的 :attr:`module` 来实现。
将输入数据按块分批分配到指定的设备上
维度(其他对象将每个设备复制一次)。在正向
通过,该模块在每个设备上复制,每个副本处理
输入的部分。在反向传播过程中,每个副本的梯度
这些被汇总到原始模块中。
批处理大小应该大于使用的 GPU 数量。
.. 警告::
建议使用 :class:`~torch.nn.parallel.DistributedDataParallel`,
而不是这个类,来进行多 GPU 训练,即使只有一个 GPU。
node.参见::ref:`cuda-nn-ddp-instead` 和 :ref:`ddp`。
允许传递任意位置参数和关键字参数到 DataParallel 中。
DataParallel 处理某些类型时会有特殊处理。张量将被分散到指定的维度(默认为 0)。元组、列表和字典类型将被...
**分散**到指定的维度(默认为 0)。元组、列表和字典类型将被...
需要进行浅拷贝。其他类型将在不同的线程间共享
并且在模型的正向传播中写入时可能会被损坏。
并行化的 :attr:`module` 在运行此 :class:`~torch.nn.DataParallel` 之前必须将其参数和缓冲区放在
``device_ids[0]`` 上
模块。
.. 警告::
在每次前向传播中,:attr:`module` 将在每个设备上**复制**,因此对运行中的模块在 `forward` 中的任何更新都将丢失。
例如,如果 :attr:`module` 有一个在 `forward` 中递增的计数器属性,它将始终保持在初始值,因为更新不会持久化。
如果 :attr:`module` 有一个在 `forward` 中递增的计数器属性,它将始终保持在初始值,因为更新不会持久化。
因为更新不会持久化,所以如果 :attr:`module` 有一个在 `forward` 中递增的计数器属性,它将始终保持在初始值。
在副本上完成,这些副本在“forward”之后被销毁。然而,
`:class:`~torch.nn.DataParallel` 保证副本在
`device[0]` 将与其参数和缓冲区共享存储
基础并行化:attr:`模块`。因此,**就地**更新到
``device[0]`` 上的参数或缓冲区将被记录。例如,
class:`~torch.nn.BatchNorm2d` 和 :func:`~torch.nn.utils.spectral_norm`
依赖于此行为来更新缓冲区。
.. 警告::
在 :attr:`module` 及其子模块上定义的前向和反向钩子
将被调用 `len(device_ids)` 次,每次的输入位于
特定设备。特别是,挂钩仅保证为
正确顺序执行相关操作
设备。例如,通过钩子设置的
`torch.nn.Module.register_forward_pre_hook()` 在执行 `torch.nn.Module.forward()` 之前被调用
在所有 `len(device_ids)` 个 `torch.nn.Module.forward()` 调用之前
每个这样的钩子都在相应的 `torch.nn.Module.forward()` 调用之前执行
执行 `torch.nn.Module.forward()` 调用之前
.. 警告::
当`:module`返回一个标量(即 0 维张量)时,
在`:func:`forward`中,这个包装器将返回一个向量,
包含数据并行中使用的设备数量,并包含每个设备的结果。
。
.. 注意::
使用时存在细微差别
在“打包序列 -> 循环神经网络 -> 解包序列”模式中
在 `:class:`~torch.nn.Module` 包装的 `:class:`~torch.nn.DataParallel` 中
请参阅 FAQ 中的 `:ref:`pack-rnn-unpack-with-data-parallelism` 部分
详细信息。
Args:
模块(模块):要并行化的模块
device_ids(整数列表或 torch.device):CUDA 设备(默认:所有设备)
output_device(整数或 torch.device):输出设备位置(默认:device_ids[0])
属性:
模块(模块):要并行化的模块
示例::
>>> # xdoctest: +SKIP
>>> net = torch.nn.DataParallel(model, device_ids=[0, 1, 2])
>>> output = net(input_var) # input_var 可以在任何设备上,包括 CPU
"源代码"
# TODO: 当这个类能很好地处理 8+个 GPU 时,更新 notes/cuda.rst
def 初始化(
self,
模块: T,
设备 ID:
可选[
序列[
联盟[int,
火炬.
设备]]] =
无,
输出设备:
可选[
联盟[int,
火炬.
设备]] =
无,
暗淡:
整型 = 0,
) -> 无:
超级().
初始化()
火炬._C._log_api_usage_once(
torch.nn.parallel.DataParallel)
设备类型 =
获取可用设备类型()
如果
设备类型
是
无
或
设备类型 ==
国会议员:
self.模块 =
模块
self.设备 ID = []
返回
如果
设备 ID
是
无:
设备 ID =
获取所有设备索引()
如果
设备 ID
是
无:
提升
运行时错误(
没有找到可用设备)
如果
输出设备
是
无:
输出设备 =
设备 ID[0]
self.维度 =
维度
self.模块 =
模块
self.设备 ID = [
获取设备索引(x, True)
为 x
在
设备 ID]
self.输出设备 =
获取设备索引(
输出设备, True)
self.源设备对象 =
火炬.
设备(
设备类型, self.
设备 ID[0])
如果
设备类型 ==
cuda:
检查余额(self.
设备 ID)
如果
长度(self.
设备 ID) == 1:
self.模块.
到(self.
源设备对象)
def 前向(self, *
输入:
任意, **kwargs:
任意) ->
任意:
与
火炬.
自动微分.
分析器.
记录功能("DataParallel.forward"):
如果 not self.
设备 ID:
返回 self.
模块(*
输入, **kwargs)
为 t
在 chain(self.
模块.
参数(), self.
模块.
缓冲区()):
如果 t.
设备 != self.
源设备对象:
提升
运行时错误(
"模块必须具有其参数和缓冲区"
f"在设备上"{self.
源设备对象}
(设备 ID 列表[0])但找到其中一个"
f"在设备上找到它们:{t.
设备}"
)
输入,
模块参数 = self.
分散(
输入, kwargs, self.
设备 ID 列表)
# 对于没有输入的前向函数,将创建空列表和字典
# 因此模块可以在 device_ids 列表中的第一个设备上执行
如果 not
输入
和 not
模块参数:
输入 = ((),)
模块参数 = ({},)
如果
长度(self.
设备 ID) == 1:
返回 self.
模块(*
输入[0
] **
模块参数[0])
副本 = self.
复制(self.
模块, self.
设备 ID
[
长度(
输入
)]])
输出 = self.
并行应用(
副本,
输入,
模块参数)
返回 self.
收集(
输出, self.
输出设备)
def 复制(
self, 模块: T,
设备 ID:
序列[
联盟[int,
火炬.
设备]]
) -> 列表[T
]:
返回
复制(
模块,
设备 ID, not
火炬.
梯度是否启用())
def 分散(
self,
输入:
元组[
任意, ...
]
kwargs: 可选[
字典[
字符串,
任意]],
设备 ID:
序列[
联盟[int,
火炬.
设备]],
) -> 任意:
返回
散点图参数(
输入, kwargs,
设备 ID,
暗淡=self.
暗淡)
def 并行应用(
self, 副本:
序列[T
]
输入:
序列[
任意
] kwargs:
任何
) -> 列表[
任意
]:
返回
并行应用(
副本,
输入, kwargs, self.
设备 ID
[
长度(
副本
]
)
def 收集(self,
输出:
任意,
输出设备:
联盟[int,
火炬.
设备]) ->
任意:
返回
收集(
输出,
输出设备,
暗淡=self.
暗淡)
[文档]def
数据并行(
模块:
模块,
输入:
任意,
设备 ID 列表:
可选[
序列[
联盟[int,
火炬.
设备]]] =
无,
输出设备:
可选[
联盟[int,
火炬.
设备]] =
无,
暗淡:
整型 = 0,
模块参数:
可选[
任意] =
无,
) -> 火炬.
张量:
r在给定的 device_ids 的 GPU 上并行评估模块(input)。
这是 DataParallel 模块的功能版本。
Args:
模块 (Module):要并行评估的模块
输入 (Tensor):模块的输入
device_ids(列表,int 或 torch.device):复制的模块的 GPU ID
output_device(列表,int 或 torch.device):输出位置的 GPU 使用-1 表示 CPU。
(默认:device_ids[0])
返回:
包含模块(input)结果的 Tensor,位于
输出设备
"源代码"
如果 not isinstance(
输入,
元组):
输入 = (
输入,)
如果
输入
是 not
无
否则 ()
设备类型 =
获取可用设备类型()
如果
设备类型
是
无:
提升
运行时错误(
无法确定设备类型)
如果 device_ids
是
无:
device_ids = 获取所有设备索引()
如果
设备 ID
是
无:
提升
运行时错误(
未找到可用设备)
如果
输出设备
是
无:
输出设备 =
设备 ID[0]
设备 ID = [
获取设备索引(x, True)
为 x
在
设备 ID]
输出设备 =
获取设备索引(
输出设备, True)
源设备对象 =
火炬.
设备(
设备类型,
设备 ID[0])
为 t
在 chain(
模块.
参数(),
模块.
缓冲区()):
如果 t.
设备 !=
源设备对象:
提升
运行时错误(
"模块必须具有其参数和缓冲区"
f"在设备上"{
源设备对象}
(device_ids[0])但找到其中之一"
f"在设备上找到它们:"{t.
设备}"
)
输入,
模块参数 =
scatter 参数(
输入,
模块参数,
设备 ID,
暗淡)
# 对于没有输入的模块,将创建空列表和字典
# 可在 device_ids 中的第一个设备上执行模块
如果 not
输入
和 not
模块参数:
输入 = ((),)
模块参数 = ({},)
断言
模块参数
是 not
无
如果
长度(
设备 ID) == 1:
返回
模块(*
输入[0
] **
模块参数[0])
已用设备 ID =
设备 ID
[
长度(
输入
]
副本 =
复制(
模块,
已用设备 ID)
输出 =
并行应用(
副本,
输入,
模块参数,
使用设备 ID)
返回
收集(
输出,
输出设备,
暗淡)