• 文档 >
  • 模块代码 >
  • torch >
  • torch.nn.parallel.data_parallel
快捷键

torch.nn.parallel.data_parallel 的源代码

# mypy: 允许未类型化定义
导入 操作符
导入 警告
来自 collections.abc 导入 序列
来自 itertools 导入 连接
来自 打字 导入 任意, 通用, 可选, 类型变量, 联合

导入 火炬
来自 torch._utils 导入 (
    _get_all_device_indices,
    获取可用设备类型,
    获取设备索引,
    _get_devices_properties,
)
来自 torch.nn.modules 导入 模块
来自 torch.nn.parallel.parallel_apply 导入 parallel_apply
来自 torch.nn.parallel.replicate 导入 复制
来自 torch.nn.parallel.scatter_gather 导入 收集, scatter_kwargs


__all__ = ["DataParallel", 数据并行]


def _检查余额(设备 ID: 序列[联盟[int, 火炬.设备]]) -> :
    不平衡警告 = ""
您的 GPU 之间存在不平衡。您可能需要排除具有低于 75%的 GPU{}
低于 75% o如果是 GPU 的内存或核心{}。您可以通过设置
DataParallel 的 device_ids 参数,或者通过设置 CUDA_VISIBLE_DEVICES 环境变量来实现。
""环境变量。""
    设备 ID = [获取设备索引(x, True)  x  设备 ID]
    设备属性 = 获取设备属性(设备 ID)

    def 警告不平衡(获取属性):
        values = [获取属性(属性)  属性  开发属性]
        最小位置, 最小值 = 最小(列举(), =操作符.itemgetter(1))
        最大位置, max_val = 最大值(列举(), =操作符.itemgetter(1))
        如果 最小值 / max_val < 0.75:
            warnings.警告(
                不平衡警告.格式(设备 ID[最小位置] 设备 ID[最大位置])
            )
            返回 真实
        返回 

    如果 警告不平衡(lambda 属性: 属性.总内存):
        返回
    如果 警告不平衡(lambda 属性: 属性.多处理器数量):
        返回


T = 类型变量(T, 绑定=模块)


[文档] 数据并行(模块, 通用[T)] r实现在模块级别的数据并行。 该容器通过并行应用给定的 :attr:`module` 来实现。 将输入数据按块分批分配到指定的设备上 维度(其他对象将每个设备复制一次)。在正向 通过,该模块在每个设备上复制,每个副本处理 输入的部分。在反向传播过程中,每个副本的梯度 这些被汇总到原始模块中。 批处理大小应该大于使用的 GPU 数量。 .. 警告:: 建议使用 :class:`~torch.nn.parallel.DistributedDataParallel`, 而不是这个类,来进行多 GPU 训练,即使只有一个 GPU。 node.参见::ref:`cuda-nn-ddp-instead` 和 :ref:`ddp`。 允许传递任意位置参数和关键字参数到 DataParallel 中。 DataParallel 处理某些类型时会有特殊处理。张量将被分散到指定的维度(默认为 0)。元组、列表和字典类型将被... **分散**到指定的维度(默认为 0)。元组、列表和字典类型将被... 需要进行浅拷贝。其他类型将在不同的线程间共享 并且在模型的正向传播中写入时可能会被损坏。 并行化的 :attr:`module` 在运行此 :class:`~torch.nn.DataParallel` 之前必须将其参数和缓冲区放在 ``device_ids[0]`` 上 模块。 .. 警告:: 在每次前向传播中,:attr:`module` 将在每个设备上**复制**,因此对运行中的模块在 `forward` 中的任何更新都将丢失。 例如,如果 :attr:`module` 有一个在 `forward` 中递增的计数器属性,它将始终保持在初始值,因为更新不会持久化。 如果 :attr:`module` 有一个在 `forward` 中递增的计数器属性,它将始终保持在初始值,因为更新不会持久化。 因为更新不会持久化,所以如果 :attr:`module` 有一个在 `forward` 中递增的计数器属性,它将始终保持在初始值。 在副本上完成,这些副本在“forward”之后被销毁。然而, `:class:`~torch.nn.DataParallel` 保证副本在 `device[0]` 将与其参数和缓冲区共享存储 基础并行化:attr:`模块`。因此,**就地**更新到 ``device[0]`` 上的参数或缓冲区将被记录。例如, class:`~torch.nn.BatchNorm2d` 和 :func:`~torch.nn.utils.spectral_norm` 依赖于此行为来更新缓冲区。 .. 警告:: 在 :attr:`module` 及其子模块上定义的前向和反向钩子 将被调用 `len(device_ids)` 次,每次的输入位于 特定设备。特别是,挂钩仅保证为 正确顺序执行相关操作 设备。例如,通过钩子设置的 `torch.nn.Module.register_forward_pre_hook()` 在执行 `torch.nn.Module.forward()` 之前被调用 在所有 `len(device_ids)` 个 `torch.nn.Module.forward()` 调用之前 每个这样的钩子都在相应的 `torch.nn.Module.forward()` 调用之前执行 执行 `torch.nn.Module.forward()` 调用之前 .. 警告:: 当`:module`返回一个标量(即 0 维张量)时, 在`:func:`forward`中,这个包装器将返回一个向量, 包含数据并行中使用的设备数量,并包含每个设备的结果。 .. 注意:: 使用时存在细微差别 在“打包序列 -> 循环神经网络 -> 解包序列”模式中 在 `:class:`~torch.nn.Module` 包装的 `:class:`~torch.nn.DataParallel` 中 请参阅 FAQ 中的 `:ref:`pack-rnn-unpack-with-data-parallelism` 部分 详细信息。 Args: 模块(模块):要并行化的模块 device_ids(整数列表或 torch.device):CUDA 设备(默认:所有设备) output_device(整数或 torch.device):输出设备位置(默认:device_ids[0]) 属性: 模块(模块):要并行化的模块 示例:: >>> # xdoctest: +SKIP >>> net = torch.nn.DataParallel(model, device_ids=[0, 1, 2]) >>> output = net(input_var) # input_var 可以在任何设备上,包括 CPU "源代码" # TODO: 当这个类能很好地处理 8+个 GPU 时,更新 notes/cuda.rst def 初始化( self, 模块: T, 设备 ID: 可选[序列[联盟[int, 火炬.设备]]] = , 输出设备: 可选[联盟[int, 火炬.设备]] = , 暗淡: 整型 = 0, ) -> : 超级().初始化() 火炬._C._log_api_usage_once(torch.nn.parallel.DataParallel) 设备类型 = 获取可用设备类型() 如果 设备类型 设备类型 == 国会议员: self.模块 = 模块 self.设备 ID = [] 返回 如果 设备 ID : 设备 ID = 获取所有设备索引() 如果 设备 ID : 提升 运行时错误(没有找到可用设备) 如果 输出设备 : 输出设备 = 设备 ID[0] self.维度 = 维度 self.模块 = 模块 self.设备 ID = [获取设备索引(x, True) x 设备 ID] self.输出设备 = 获取设备索引(输出设备, True) self.源设备对象 = 火炬.设备(设备类型, self.设备 ID[0]) 如果 设备类型 == cuda: 检查余额(self.设备 ID) 如果 长度(self.设备 ID) == 1: self.模块.(self.源设备对象) def 前向(self, *输入: 任意, **kwargs: 任意) -> 任意: 火炬.自动微分.分析器.记录功能("DataParallel.forward"): 如果 not self.设备 ID: 返回 self.模块(*输入, **kwargs) t chain(self.模块.参数(), self.模块.缓冲区()): 如果 t.设备 != self.源设备对象: 提升 运行时错误( "模块必须具有其参数和缓冲区" f"在设备上"{self.源设备对象}(设备 ID 列表[0])但找到其中一个" f"在设备上找到它们:{t.设备}" ) 输入, 模块参数 = self.分散(输入, kwargs, self.设备 ID 列表) # 对于没有输入的前向函数,将创建空列表和字典 # 因此模块可以在 device_ids 列表中的第一个设备上执行 如果 not 输入 not 模块参数: 输入 = ((),) 模块参数 = ({},) 如果 长度(self.设备 ID) == 1: 返回 self.模块(*输入[0] **模块参数[0]) 副本 = self.复制(self.模块, self.设备 ID[ 长度(输入)]]) 输出 = self.并行应用(副本, 输入, 模块参数) 返回 self.收集(输出, self.输出设备) def 复制( self, 模块: T, 设备 ID: 序列[联盟[int, 火炬.设备]] ) -> 列表[T]: 返回 复制(模块, 设备 ID, not 火炬.梯度是否启用()) def 分散( self, 输入: 元组[任意, ...] kwargs: 可选[字典[字符串, 任意]], 设备 ID: 序列[联盟[int, 火炬.设备]], ) -> 任意: 返回 散点图参数(输入, kwargs, 设备 ID, 暗淡=self.暗淡) def 并行应用( self, 副本: 序列[T] 输入: 序列[任意] kwargs: 任何 ) -> 列表[任意]: 返回 并行应用( 副本, 输入, kwargs, self.设备 ID[ 长度(副本] ) def 收集(self, 输出: 任意, 输出设备: 联盟[int, 火炬.设备]) -> 任意: 返回 收集(输出, 输出设备, 暗淡=self.暗淡)
[文档]def 数据并行( 模块: 模块, 输入: 任意, 设备 ID 列表: 可选[序列[联盟[int, 火炬.设备]]] = , 输出设备: 可选[联盟[int, 火炬.设备]] = , 暗淡: 整型 = 0, 模块参数: 可选[任意] = , ) -> 火炬.张量: r在给定的 device_ids 的 GPU 上并行评估模块(input)。 这是 DataParallel 模块的功能版本。 Args: 模块 (Module):要并行评估的模块 输入 (Tensor):模块的输入 device_ids(列表,int 或 torch.device):复制的模块的 GPU ID output_device(列表,int 或 torch.device):输出位置的 GPU 使用-1 表示 CPU。 (默认:device_ids[0]) 返回: 包含模块(input)结果的 Tensor,位于 输出设备 "源代码" 如果 not isinstance(输入, 元组): 输入 = (输入,) 如果 输入 not 否则 () 设备类型 = 获取可用设备类型() 如果 设备类型 : 提升 运行时错误(无法确定设备类型) 如果 device_ids : device_ids = 获取所有设备索引() 如果 设备 ID : 提升 运行时错误(未找到可用设备) 如果 输出设备 : 输出设备 = 设备 ID[0] 设备 ID = [获取设备索引(x, True) x 设备 ID] 输出设备 = 获取设备索引(输出设备, True) 源设备对象 = 火炬.设备(设备类型, 设备 ID[0]) t chain(模块.参数(), 模块.缓冲区()): 如果 t.设备 != 源设备对象: 提升 运行时错误( "模块必须具有其参数和缓冲区" f"在设备上"{源设备对象}(device_ids[0])但找到其中之一" f"在设备上找到它们:"{t.设备}" ) 输入, 模块参数 = scatter 参数(输入, 模块参数, 设备 ID, 暗淡) # 对于没有输入的模块,将创建空列表和字典 # 可在 device_ids 中的第一个设备上执行模块 如果 not 输入 not 模块参数: 输入 = ((),) 模块参数 = ({},) 断言 模块参数 not 如果 长度(设备 ID) == 1: 返回 模块(*输入[0] **模块参数[0]) 已用设备 ID = 设备 ID[ 长度(输入] 副本 = 复制(模块, 已用设备 ID) 输出 = 并行应用(副本, 输入, 模块参数, 使用设备 ID) 返回 收集(输出, 输出设备, 暗淡)

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源