快捷键

torch.distributed.fsdp._fully_shard._fully_shard 的源代码

# mypy: 允许未类型化装饰器
# mypy: 允许未类型化定义

来自 未来 导入 注释

导入 functools
来自 打字 导入 (
    任意,
    可调用,
    角色,
    无返回值,
    可选,
    过载,
    类型检查,
    联盟,
)

导入 火炬
导入 torch.nn 作为 然后
来自 torch.distributed._composable 导入 合同
来自 torch.distributed.utils 导入 获取根模块

来自 .fsdp_api 导入 混合精度策略, 卸载策略
来自 .fsdp_common 导入 FSDPMeshInfo, HSDPMeshInfo
来自 __fsdp 初始化 导入 (
    _从网格获取设备,
    _获取管理模块,
    _获取管理状态,
    _获取后置网格信息,
    _初始化默认全分片网格,
    将状态移动到设备,
)
来自 ._fsdp_param_group 导入 FSDPParamGroup
来自 ._fsdp_state 导入 获取模块 FSDP 状态, FSDP 状态


如果 类型检查:
    来自 collections.abc 导入 迭代器

    来自 torch.distributed.tensor 导入 设备网状结构, 片段

__all__ = [
    "全分片",
    "FSDP 模块",
    Unshard 处理,
    注册 fsdp 前进方法,
]


类到 FSDP 类的转换: 字典[类型, 类型] = {}


@overload
def 全分片(
    模块: nn.模块,
    *,
    网格: 可选[设备网状结构] = ...,
    前置后重新分配资源: 联盟[布尔, int] = ...,
    分片放置函数: 可选[可调用[[nn.参数] 可选[片段]]] = ...,
    多精度策略: 混合精度策略 = ...,
    卸载策略: 卸载策略 = ...,
    忽略参数: 可选[集合[nn.参数]] = ...,
) -> FSDP 模块: ...


@overload
def 全分片(
    模块: 列表[nn.模块]
    *,
    网格: 可选[设备网状结构] = ...,
    转发后重新分片: 联盟[布尔, int] = ...,
    shard 放置函数: 可选[可调用[[nn.参数] 可选[片段]]] = ...,
    多精度策略: 混合精度策略 = ...,
    卸载策略: 卸载策略 = ...,
    忽略参数: 可选[集合[nn.参数]] = ...,
) -> 列表[FSDP 模块]: ...


# 装饰器向 `module` 添加一个状态对象,可以通过
# `fully_shard.state(module)` 访问。状态对象和模块是一对一的关系。
Python 运行时装饰器与静态类型检查不兼容
因此抑制一些类型检查以支持类型重载
这样调用者仍然可以根据输入类型获取正确的返回类型
@contract(state_cls=FSDP 状态)  # 类型:忽略[misc] # 参考[1]
def 全分片(
    模块,
    *,
    网格: 可选[设备网状结构] = ,
    前置后重新分配: 联盟[布尔, int] = True,
    分片放置函数: 可选[可调用[[nn.参数] 可选[片段]]] = ,
    多精度策略: 混合精度策略 = 混合精度策略(),
    转发策略: 转发策略 = 卸载策略(),
    忽略参数: 可选[集合[nn.参数]] = ,
):
    ""
将完全分片数据并行(FSDP)应用于 ``module``,其中 FSDP
分片模块参数、梯度和优化器状态到数据
并行工作进程以节省内存,但以通信为代价。

初始化时,FSDP 会将模块的参数分散到由`mesh`指定的并行工作进程中。
在前向计算之前,FSDP 会将分散的参数在数据并行工作进程中全部收集起来,以获得用于前向计算的非分散参数。
如果`reshard_after_forward`被设置为
,则在前向计算后重新分散参数。
“如果为真”,则 FSDP 在正向传播后释放未分片的参数,并在反向传播前重新聚合它们。在计算梯度后,FSDP 释放未分片的参数,并将未分片的梯度在数据并行工作之间进行 reduce-scatter。
计算梯度后,FSDP 释放未分片的参数,并将未分片的梯度在数据并行工作之间进行 reduce-scatter。
计算梯度后,FSDP 释放未分片的参数,并将未分片的梯度在数据并行工作之间进行 reduce-scatter。
计算梯度后,FSDP 释放未分片的参数,并将未分片的梯度在数据并行工作之间进行 reduce-scatter。

该实现将分片参数表示为 :class:`DTensor` 对象
在 dim-0 维度上进行分片,而未分片的参数将类似于原始的
参数(例如,如果原始参数是 :class:`torch.Tensor`)
模块(例如,:class:`torch.Tensor`)。一个模块
    `forward pre-hook <https://pytorch.org/docs/main/generated/torch.nn.Module.html#torch.nn.Module.register_forward_pre_hook>`_
在 `module` 上进行所有参数的全局聚合,并创建一个模块
    `forward hook <https://pytorch.org/docs/main/generated/torch.nn.Module.html#torch.nn.Module.register_forward_hook>`_
在 `module` 上释放它们(如果需要)。类似的反向钩子也会进行全局聚合
参数和后续的自由参数以及减少-散射梯度。

由于将多个张量分组在一起进行一次集体操作对于通信效率至关重要,因此该实现首先进行这种分组。
该类。在 `module` 上调用 :meth:`fully_shard` 构造一个组。
。在 `module` 上调用 :meth:`fully_shard` 构造一个组。
包含 `module.parameters()` 中的参数,但不包括那些已经
分配给早期通话中的一个子模块的组。这意味着
`:meth:`fully_shard` 应该从下往上调用您的模型。每个组的
参数全部汇集在一个集合中,其梯度是
将分散的合并成一个整体。将模型分割成多个组(“层叠式”)可以最大程度地节省内存并实现通信/计算重叠。用户通常不应仅在顶层根模块上调用::meth:`fully_shard`。
这允许实现峰值内存节省和通信/计算重叠。
用户通常不应仅在顶层根模块上调用:meth:`fully_shard`。


参数:
模块(nn.Module 或 nn.Module 列表):模块或模块列表
使用 FSDP 分片并将它们分组以进行通信。
网格(可选 DeviceMesh):此数据并行网格定义了分片和设备。
如果是 1D,则参数完全分片
在 1D 网格(FSDP)中通过`(Shard(0),)`放置。如果为 2D,
然后参数在第一维上分片并在副本中复制
跨过 0 维(HSDP)使用`(Replicate(), Shard(0))`
放置。网格的设备类型指定了使用的设备类型。
通信;如果是一个 CUDA 或 CUDA 类似设备类型,那么我们使用
当前设备。
reshard_after_forward(Union[bool, int]):这控制参数
行为在前进后可以权衡内存和通信:

如果为 ``True``,则在正向传播后重新分片参数并在反向传播中重新收集。
如果为 ``False``,则在正向传播后保持未分片的参数在内存中,并在反向传播中避免所有收集。
如果为 ``False``,则在正向传播后保持未分片的参数在内存中。
并在反向传播中避免所有收集。
如果是整数,则表示要重新分片的世界大小
在前向操作之后。它应该是“网格”的非平凡约数
分片维度大小(即不包括 1 和维度大小本身)。一个
选择可能是节点内大小(例如 ``torch.cuda.device_count()``)。
这允许向后全聚集覆盖的范围更小
以更高的内存使用量为代价来设置大小。
- 根 FSDP 状态的值被特别设置为 ``False``
启发式方法,因为其参数通常会被立即
向后汇总。
- 前向之后,注册到模块的参数取决于
这:注册的参数是分片参数如果
``True``;非分片参数如果``False``;以及参数
否则重新划分到较小的网格。要修改参数
在正向和反向之间,必须注册相同的参数
划分的参数。对于 ``False`` 或 ``int``,可以通过手动重新划分来实现::meth:`reshard`。
通过 :meth:`reshard` 手动重新划分。
shard_placement_fn(可选[Callable[[nn.Parameter],Optional[Shard]]]):
此可调用函数可用于覆盖分片放置
参数用于在除 dim-0 以外的维度上划分参数。
此可调用函数返回一个 :class:`Shard` 放置(非 ``None``)
然后 FSDP 将根据该放置进行分片(例如 ``Shard(1)``)。
如果在非零维度上进行分片,我们目前要求进行均匀分片,
即在该维度上张量的维度大小必须能被 FSDP
分片网格大小整除。
mp_policy(混合精度策略):控制该模块的参数/缩减混合精度
策略,为该模块提供参数/缩减混合精度
。请参阅:class:`MixedPrecisionPolicy` 获取详细信息。
offload_policy(卸载策略):控制卸载策略,
提供参数/梯度/优化器状态卸载的功能。请参阅
class:`OffloadPolicy` 及其子类以获取详细信息。
ignored_params: Optional(Set[nn.Parameter]): 我们不想与 FSDP 分片的一组参数。
参数。

返回:
FSDP 模块:应用了 FSDP 的模块(原地)。
```python
# 假设输入文本为:
input_text = '"""'

# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
    # 这里应该调用真实的翻译 API 进行翻译
    # 由于示例中不使用真实的 API,以下为模拟翻译结果
    return text

# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
    如果 isinstance(模块, (nn.模块列表, nn.ModuleDict)):
        提升 ValueError(
            f"fully_shard 不支持不实现 forward 的容器。"{模块}"
        )
    网格 = 网格  _init_default_fully_shard_mesh()
    如果 网格.维数 not  (1, 2):
        提升 ValueError(f"fully_shard 期望一个 1D 或 2D DeviceMesh,但得到了"{网格}")
    elif 网格.维数 == 1:
        网格信息 = FSDPMeshInfo(网格, 碎片网格维度=0)
    否则:
        如果 网格.网格维度名称列表  :
            提升 断言错误(
                请使用指定的 mesh_dim_names 初始化 HSDP 的 2D 网格
            )
        网格信息 = HSDPMeshInfo(网格, 片段网格维度=1, 复制网格维度=0)
    设备 = 从网格获取设备(网格)
    发布前网格信息 = 获取发布前网格信息(
        发布后重新分配, 网格信息
    )

    参数模块 = 模块
    模块 = (
        (模块,) 如果 isinstance(模块, nn.模块) 否则 元组(_获取根模块(模块))
    )
    状态 = 全分片.状态(模块[0])  # 类型:忽略[attr-defined] # 参考[1]
    状态.初始化(模块, 设备, mp 策略)

    管理模块 = _获取管理模块(模块, 忽略参数)
    参数, 缓冲区 = 获取管理状态(管理模块, 忽略的参数)

    将状态移动到设备(参数, 缓冲区, 设备)
    如果 参数:
        状态._fsdp_param_group = FSDP 参数组(
            参数,
            模块,
            网格信息,
            前向网格信息,
            设备,
            shard 放置函数,
            mp 策略,
            转发策略,
        )

    # 适用于 Dynamo
     管理模块  管理模块列表:
        管理模块._是否为 fsdp 管理模块 = 真实  # 类型:忽略[赋值]
        管理模块._fsdp_use_orig_params = 真实  # 类型:忽略[赋值]

    # 将 FSDP 置于方法解析顺序的最左侧以获得最高优先级
     模块  模块:
         = 模块.
        new_cls = 类到类状态深度复制.获取(, )
        如果 not 新类:
            dct = {"__deepcopy__": 未实现深度复制}
            新类 = 类型(fFSDP{.__name__}", (FSDP 模块, ), dct)
            类到 fsdp 类[] = 新类
        模块. = 新类
    返回 参数模块


def _未实现深拷贝(*参数: 任意, **kwargs: 任意) -> 无返回值:
    提升 断言错误(
        "FSDP 不支持深拷贝。请使用状态字典进行序列化。"
    )


 FSDP 模块:
    def __new__(, *参数, **kwargs):
        ""
覆盖 `__new__` 方法以移除 FSDP 类并直接构造原始类,用于索引容器模块等情况。
例如,使用索引 2,因为 0 是动态构造的 `FSDP<...>` 类。
```python
# 假设输入文本为:
input_text = '"""'

# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
    # 这里应该调用真实的翻译 API 进行翻译
    # 由于示例中不使用真实的 API,以下为模拟翻译结果
    return text

# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
        # 使用索引 2,因为 0 是动态构造的 `FSDP<...>` 类
        # 索引 1 是 `FSDPModule` 类本身
        原类 = .__mro__[2]
        self = orig_cls.__new__(orig_cls, *参数, **kwargs)
        自身.初始化(*参数, **kwargs)
        返回 self

[文档] def reshard(self) -> None: """ 重置模块的参数,如果未分配则释放未分片的参数,并将分片参数注册到模块中。此方法**不**是递归的。 如果它们已分配,则释放未分片的参数,并将分片参数注册到模块中。 此方法**不**是递归的。 """ state = self._get_fsdp_state() if fsdp_param_group := state._fsdp_param_group: fsdp_param_group.reshard()
[文档] def unshard(self, async_op: bool = False) -> Optional[UnshardHandle]: ```python # 输入文本 input_text = '"""' # 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API) def translate_to_simplified_chinese(text): # 假设的翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 解耦模块参数,通过分配内存和全部收集 参数。此方法不是递归的。未分片遵循 `:class:`混合精度策略`, 因此它将执行所有-gather 操作 如果设置,则为 ``param_dtype``。 参数: async_op (布尔值): 如果 ``True``,则返回一个 :class:`UnshardHandle`。 该对象具有一个 :meth:`wait` 方法,用于等待未分片操作。 如果为 `False`,则返回 `None` 并等待内部句柄 这个函数。 .. 注意:: 如果 ``async_op=True``,则 FSDP 将等待挂起的 在模块的预转发中解块。用户仅 需要显式调用 :meth:`wait` 方法,如果等待应该发生 在预转发之前。 """ state = self._get_fsdp_state() fsdp_param_group = state._fsdp_param_group if fsdp_param_group is not None: fsdp_param_group.lazy_init() fsdp_param_group.unshard(async_op=async_op) handle = _UnshardHandleImpl(fsdp_param_group) if async_op: return handle handle.wait() 返回 None
[文档] def set_is_last_backward(self, is_last_backward: bool) -> None: """ 设置是否下一个反向是最后一个。在最后一个反向, FSDP 等待待处理的梯度减少并清除内部数据 用于向后预取的数据结构。这可以用于微批处理。 微批处理。 """ state = self._get_fsdp_state() state._state_ctx.is_last_backward = is_last_backward
[文档] def set_requires_gradient_sync( self, requires_gradient_sync: bool, *, recurse: bool = True ) -> None: """ 设置模块是否同步梯度。这可以用来实现 梯度累积*不进行通信*。对于 HSDP,这控制着 同时使用 reduce-scatter 和 all-reduce。这是等价的。 `no_sync` 在 FSDP1. 参数: requires_gradient_sync (bool): 是否减少梯度 模块的参数。 递归(布尔值):是否为所有 FSDP 子模块设置,还是仅对传入的模块设置。 传入的模块。 """ self_module = cast(nn.Module, self) modules = list(self_module.modules()) if recurse else [self_module] for module in modules: if isinstance(module, FSDPModule): state = module._get_fsdp_state() if fsdp_param_group := state._fsdp_param_group: fsdp_param_group.reduce_grads = requires_gradient_sync fsdp_param_group.all_reduce_grads = requires_gradient_sync
[文档] def set_requires_all_reduce( self, requires_all_reduce: bool, *, recurse: bool = True ) -> None: """ 设置模块是否应该全量梯度归一化。这可以用于 实现仅使用 reduce-scatter 而不使用梯度累积 all-reduce for HSDP. ```python # 输入文本 input_text = '"""' # 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API) def translate_to_simplified_chinese(text): # 假设的翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` self_module = cast(nn.Module, self) modules = list(self_module.modules()) if recurse else [self_module] for module in modules: if isinstance(module, FSDPModule): state = module._get_fsdp_state() if fsdp_param_group := state._fsdp_param_group: fsdp_param_group.all_reduce_grads = requires_all_reduce
[文档] def set_reshard_after_backward( self, reshard_after_backward: bool, *, recurse: bool = True ) -> None: """ 设置模块是否在反向传播后重新分配参数。这可以 在梯度累积过程中可以使用,以牺牲更高的内存为代价来减少通信,因为未分片的参数在下一个前向传播之前不需要重新全局聚合。 减少了通信,因为未分片的参数在下一个前向传播之前不需要重新全局聚合。 在下一个前向传播之前不需要重新全局聚合。 参数: reshard_after_backward (bool): 是否在反向操作后重新分配参数 backward. recurse (bool): 是否为所有 FSDP 子模块设置,还是仅针对传入的模块 passed-in module. """ self_module = cast(nn.Module, self) modules = list(self_module.modules()) if recurse else [self_module] for module in modules: if isinstance(module, FSDPModule): state = module._get_fsdp_state() if fsdp_param_group := state._fsdp_param_group: fsdp_param_group.reshard_after_backward = reshard_after_backward
[文档] def set_modules_to_forward_prefetch(self, modules: 列表[FSDPModule]) -> None: ```python # 输入文本 input_text = '"""' # 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API) def translate_to_simplified_chinese(text): # 假设的翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 设置此 FSDP 模块应显式设置的 FSDP 模块 预取正向所有汇聚。预取操作在此之后运行。 模块的全局收集输出。 传递包含下一个 FSDP 模块的单例列表会产生相同的效果 所有汇聚重叠行为作为默认重叠行为,除了 预取的全局收集操作比 CPU 提前发出。传递一个列表 至少长度为两个的才需要更激进的覆盖 将使用更多保留内存。 Args: 模块(List[FSDPModule]):预取的 FSDP 模块。 """ _assert_all_fsdp_modules(模块) self._get_fsdp_state()._states_to_forward_prefetch = [ [module._get_fsdp_state() for module in 模块] ]
[文档] def set_modules_to_backward_prefetch(self, modules: list[FSDPModule]) -> None: """ 设置此 FSDP 模块应显式预取的 FSDP 模块 预取所有收集项的反向。这覆盖了默认的反向 预取实现,根据反向后前顺序预取下一个 FSDP 模块 通过传递包含前一个 FSDP 模块的单例列表来给出 传递一个包含前一个 FSDP 模块的单例列表 与默认重叠行为相同的所有-gather 重叠行为。 传递一个长度至少为二的列表是必需的,以实现更激进的 重叠并会使用更多预留内存。 参数: modules (List[FSDPModule]): 预取的 FSDP 模块。 """ _assert_all_fsdp_modules(modules) self._get_fsdp_state()._states_to_backward_prefetch = [ module._get_fsdp_state() for module in modules ]
[文档] def set_all_reduce_hook( self, hook: 可调用[torch.Tensor],None 类型的函数, *, stream: 可选的 torch.cuda.Stream 对象 = None, ): ```python # 输入文本 input_text = '"""' # 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API) def translate_to_simplified_chinese(text): # 假设的翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 参数: hook (Callable[[torch.Tensor], None]): 用户定义的全量减少钩子 使用预期的签名 `hook(reduce_output: torch.Tensor) -> None` 如果仅使用 FSDP,则`reduce_output`是 reduce-scatter 输出;如果使用原生 HSDP,则是 all-reduce 输出。 流(Optional[torch.cuda.Stream]):在其中运行 all-reduce 钩子。如果未使用原生 HSDP,则应设置此参数。 流(Optional[torch.cuda.Stream]):在其中运行 all-reduce 钩子。如果未使用原生 HSDP,则应设置此参数。 流(Optional[torch.cuda.Stream]):在其中运行 all-reduce 钩子。如果未使用原生 HSDP,则应设置此参数。 使用原生 HSDP,钩子将在内部定义的 all-reduce 流中运行 使用原生 HSDP 的 all-reduce 流 """ state = self._get_fsdp_state() if (fsdp_param_group := state._fsdp_param_group) is not None: fsdp_param_group._all_reduce_hook = hook if stream is not None: if fsdp_param_group._is_hsdp: 引发 ValueError("使用原生 HSDP 时无法设置流") fsdp_param_group._all_reduce_hook_stream = stream
[文档] def set_post_optim_event(self, event: torch.Event) -> None: 设置优化后事件 ```python # 输入文本 input_text = '"""' # 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API) def translate_to_simplified_chinese(text): # 假设的翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 为根 FSDP 模块设置后优化步骤事件,以便等待所有-gather 流开启。 默认情况下,根 FSDP 模块会在当前流上等待所有-gather 流开启,以确保优化步骤完成后再进行。 默认情况下,根 FSDP 模块会在当前流上等待所有-gather 流开启,以确保优化步骤完成后再进行。 默认情况下,根 FSDP 模块会在当前流上等待所有-gather 流开启,以确保优化步骤完成后再进行。 然而,这可能会引入虚假依赖。然而,如果在优化步骤之后有无关的计算,则可能引入虚假依赖。 此 API 允许用户提供他们自己的等待事件。在等待根事件后,事件将被丢弃,因此此 API 应该 允许用户等待他们自己的事件。在等待根事件后,该事件将被丢弃,因此此 API 应该 允许用户等待他们自己的事件。在等待根事件后,该事件将被丢弃,因此此 API 应该 每次迭代都会调用一个新的事件。 Args: 事件(event):在优化器步骤后记录的事件 用于等待所有-gather 流的。 """ self._get_fsdp_state()._state_ctx.post_optim_event = event
[文档] def set_reduce_scatter_divide_factor(self, factor: float) -> None: 设置减少分散除法因子(self, 因子: 浮点数) -> None: ```python # 输入文本 input_text = '"""' # 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API) def translate_to_simplified_chinese(text): # 假设的翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 设置 reduce-scatter 的自定义除数因子。这成为 自定义使用 NCCL 的 PreMulSum 的 reduce 操作,允许乘以 减少前的系数。 Args: factor (float): 自定义除数因子。 """ state = self._get_fsdp_state() if (fsdp_param_group := state._fsdp_param_group) is not None: mul_factor = 1.0 / float(factor) reduce_op = torch.distributed._make_nccl_premul_sum(mul_factor) fsdp_param_group.reduce_scatter_reduce_op = reduce_op
[文档] def set_unshard_in_backward(self, unshard_in_backward: bool) -> None: """ 设置 FSDP 模块的参数是否需要在反向传播中进行去分片 向后。这可以在用户知道此 FSDP 模块的参数组中所有参数都不用于反向计算(例如嵌入)的专家情况下使用。 在此 FSDP 模块的参数组中,这些参数都不用于反向计算(例如嵌入)。 用于反向计算(例如嵌入)。 """ state = self._get_fsdp_state() if (fsdp_param_group := state._fsdp_param_group) is not None: fsdp_param_group.unshard_in_backward = unshard_in_backward
def _set_unshard_async_op(自身
, async_op: 布尔): "" 设置是否使用 `async_op=True` 或 `False` 进行预转发 并且预向后未分片操作。此默认为“False”,但可以设置 使用此方法将值设置为“True”。 将此设置为 `True` 允许所有全局收集分配发生 默认流,避免跨流内存碎片化。 然而,您必须使用显式预取(例如通过:meth:`unshard`) 来获取重叠,并且像数据类型转换和复制-入这样的预所有-gather 操作 将不会与计算重叠。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 自定义模块 = 角色(nn.模块, 自身) 模块 自定义模块.模块(): 如果 isinstance(模块, FSDP 模块): 状态 = 模块._获取 Fsdp 状态() 如果 Fsdp 参数组 := 状态._fsdp 参数组: fsdp 参数组.解散异步操作 = async_op def _获取 fsdp 状态(自身) -> FSDP 状态: 如果 (状态 := 获取模块 FSDP 状态(角色(nn.模块, 自身))) : 提升 断言错误(f在此处未找到 FSDP 状态{自身}") 返回 状态 def 应用(自身, *参数: 任意, **kwargs: 任意) -> 任意: # 重新分片以确保分片参数被注册 自身.重新分片() 返回 = 超级().应用(*参数, **kwargs) # 类型:忽略[杂项] 状态 = 自身._get_fsdp_state() 如果 not (fsdp_param_group := 状态._fsdp 参数组): 返回 返回 # TODO: 一旦 DTensor 填充本地张量,则移除此填充逻辑 # https://github.com/pytorch/pytorch/issues/113045 火炬.不梯度(): fsdp 参数 fsdp 参数组.fsdp 参数列表: fsdp_param.重置分片参数() 返回 返回 UnshardHandle: "" 等待一个 :meth:`FSDPModule.unshard` 操作的句柄。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ```
[文档] def wait(self) -> None: "" 等待未分片操作。这确保当前流可以使用 未分片的参数,这些参数现在已注册到模块中。 "" 返回
_UnshardHandleImpl(UnshardHandle): def 初始化
(自身, fsdp_param_group: 可选[FSDPParamGroup)] 自身._fsdp 参数组 = fsdp 参数组 def 等待(自身): 如果 自身._fsdp 参数组 not : 自身._fsdp 参数组.等待解分片() 避免保留引用 自身._fsdp_param_group = None def 注册 fsdp 前向方法(模块: nn.模块, 方法名称: 字符串) -> : "" 在 ``module`` 上注册一个方法,使其被视为 FSDP 的前向方法 FSDP. FSDP 所有聚集参数预前向和可选释放参数 post-forward(取决于`reshard_after_forward`)。FSDP 只知道 默认情况下,为 :meth:`nn.Module.forward` 执行此操作。此函数修补 用户指定的方法在运行前后/前后的预/后钩子 方法,分别。如果 ``module`` 不是一个 :class:`FSDPModule`,则 这是一个空操作。 参数: module (nn.Module):注册前向方法的模块。 method_name (str):前向方法名称。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 如果 not isinstance(模块, FSDP 模块): # 禁用空操作以允许在使用/不使用 FSDP 时同时包含 返回 如果 not 有属性(模块, 方法名称): 提升 ValueError(f"{类型(模块)}没有方法{方法名称}") 原方法 = getattr(模块, 方法名称) @functools.包装(原始方法) def 包装方法(自身, *参数, **kwargs): fsdp 状态 = 自身._获取_fsdp 状态() 参数, kwargs = fsdp 状态._pre_forward(自身, 参数, kwargs) out = 原始方法(*参数, **kwargs) 返回 fsdp 状态._post_forward(自身, 参数, 输出) 使用 `__get__` 使 `wrapped_method` 成为实例方法 setattr( 模块, 方法名称, wrapped_method.__get__(模块, 类型(模块)), # 类型:忽略[未定义] ) def _assert_all_fsdp_modules(模块: 迭代器[任意]) -> : 模块 模块: 如果 not isinstance(模块, FSDP 模块): 提升 ValueError(f"期望是 FSDPModule 但得到{类型(模块)}: {模块}")

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源