# mypy: 允许未类型化定义
r实现随机权重平均的实现。
导入 itertools
导入
数学
导入
警告
from collections.abc 导入
迭代器
from 复制
导入
深拷贝
from 打字
导入
任何,
可调用,
直接,
可选,
联合
导入
火炬
from 火炬
导入
张量
from torch.nn 导入
模块
from torch.optim.lr_scheduler 导入 _format_param, LRScheduler
from torch.utils._foreach_utils 导入
_get_foreach_kernels_supported_devices
获取每个内核支持的所有设备
from .优化器
导入
优化器
全部 = [
平均模型,
更新_bn,
SWALR,
获取_ema_多平均_fn,
获取_swa_多平均_fn,
get_ema_avg_fn,
get_swa_avg_fn,
]
from torch.utils._foreach_utils 导入 _group_tensors_by_device_and_dtype
PARAM_LIST = 联盟[
元组[
张量, ...],
列表[
张量]]
[docs]def get_ema_multi_avg_fn(decay=0.999):
"""获取应用于多个参数的指数移动平均(EMA)函数。"""
if decay < 0.0 or decay > 1.0:
raise ValueError(
f"提供的衰减值 {decay} 无效。请提供一个在 [0,1] 范围内的值。"
)
@torch.no_grad()
def ema_update(ema_param_list: PARAM_LIST, current_param_list: PARAM_LIST, _):
# foreach lerp 只处理浮点数和复数
if torch.is_floating_point(ema_param_list[0]) or torch.is_complex(
ema_param_list[0]
):
torch._foreach_lerp_(ema_param_list, current_param_list, 1 - decay)
else:
for p_ema, p_model in zip(ema_param_list, current_param_list):
p_ema.copy_(p_ema * decay + p_model * (1 - decay))
返回 ema_update
定义 get_swa_multi_avg_fn():
获取应用于多个参数的随机权重平均(SWA)函数
@torch.不梯度()
定义
软件更新(
平均参数列表:
参数列表,
当前参数列表:
参数列表,
平均数:
联盟[
张量,
整数
]
):
# foreach lerp 仅处理浮点数和复数
如果 torch.is_floating_point(
平均参数列表[0])
或者 torch.
是复杂的(
平均参数列表[0]
):
torch._foreach_lerp_(
平均参数列表,
当前参数列表, 1 / (
平均数 + 1)
)
else:
差分 = torch.
遍历子项(
当前参数列表,
平均参数列表)
如果 isinstance(
平均数,
张量):
torch._foreach_addcdiv_(
平均参数列表,
差分,
[平均数 + 1] *
长度(
平均参数列表),
)
else:
torch._foreach_add_(
平均参数列表,
差分,
阿尔法=1.0 / (
平均数 + 1)
)
返回
SWA 更新
定义 get_ema_avg_fn(
衰减=0.999):
获取应用指数移动平均(EMA)的单个参数的函数。
如果
衰减 < 0.0
或者
衰减 > 1.0:
raise ValueError(
f"无效的衰减值"{
衰减}
提供的值无效。请提供一个在[0,1]范围内的值。
)
@torch.不梯度()
定义
指数移动平均更新(
指数移动平均参数:
张量,
当前参数:
张量,
平均数个数):
返回
衰减 *
指数移动平均参数 + (1 -
衰减) *
当前参数
返回
ema 更新
定义
获取 swa 平均函数():
获取应用随机权重平均(SWA)的单个参数的函数
@torch.不梯度()
定义
swa 更新(
平均参数:
张量,
当前参数:
张量,
平均数:
联盟[
张量,
整数]
):
返回
平均参数 + (
当前参数 -
平均参数) / (
平均数 + 1)
返回
SWA 更新
[文档]
类
平均模型(
模块):
r实现了随机权重平均(SWA)和指数移动平均(EMA)的平均模型。
随机权重平均由 Pavel Izmailov 和 Dmitrii 在《平均权重导致更宽的优化范围和更好的泛化》_ 一文中提出。
_见《平均权重导致更宽的优化范围和更好的泛化》_。
波多普里欣,蒂穆尔·加里波夫,德米特里·维特罗夫和安德鲁·戈登·威尔逊
(UAI 2018)。
指数移动平均是`Polyak 平均`的一种变体,
但使用指数权重而不是迭代过程中的等权重。
AveragedModel 类创建了提供的模块:attr:`model`的副本
在设备:attr:`device`上,并允许计算模型的运行平均值
的参数。
参数:
模型(torch.nn.Module):用于 SWA/EMA 的模型
设备(torch.device,可选):如果提供,平均模型将
存储在`:attr:`设备`上
avg_fn(函数,可选):用于更新的平均函数
参数;函数必须接受当前值的参数
`AveragedModel` 参数,当前 :attr:`model` 的值
参数,以及已经平均的模型数量;如果为 None,则
使用等权平均(默认:None)
multi_avg_fn(函数,可选):用于更新平均的函数
参数就地;函数必须接受当前值
`:class:`AveragedModel` 参数作为列表,:attr:`model` 的当前值
参数作为列表,以及已平均的模型数量;如果为 None,
一个等权平均被使用(默认:无)
use_buffers (布尔值): 如果设置为 ``True``,则会对模型的参数和缓冲区计算运行平均值。
(默认:``False``)
示例:
>>> # xdoctest: +SKIP("未定义变量")
>>> loader, optimizer, model, loss_fn = ...
>>> swa_model = torch.optim.swa_utils.AveragedModel(model)
>>> 调度器 = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer,
>>> T_max=300)
>>> swa_start = 160
>>> swa_scheduler = SWALR(optimizer, swa_lr=0.05)
>>> for i in range(300):
>>> for input, target in loader:
>>> optimizer.zero_grad()
>>> loss_fn(model(input), target).backward()
>>> 优化器执行步骤()
>>> 如果 i > swa_start:
>>> swa_model 更新参数(model)
>>> swa_scheduler 执行步骤()
>>> else:
>>> scheduler.step()
...
>>> # 更新 swa_model 的 bn 统计信息
>>> torch.optim.swa_utils.update_bn(loader, swa_model)
您也可以使用自定义平均函数,通过`avg_fn`或`multi_avg_fn`参数。
如果没有提供平均函数,则默认计算
等权平均权重(SWA)。
示例:
>>> # xdoctest: +SKIP("未定义变量")
计算权重和缓冲区的指数移动平均值
>>> ema_model = torch.optim.swa_utils.AveragedModel(model,
>>> torch.optim.swa_utils.get_ema_multi_avg_fn(0.9), use_buffers=True)
.. 注意::
当使用 SWA/EMA 与包含批量归一化的模型时,您可能需要更新批量归一化的激活统计信息。
需要更新批量归一化的激活统计信息。
这可以通过使用 :meth:`torch.optim.swa_utils.update_bn` 来完成
或通过将`:attr:`use_buffers`设置为`True`。第一种方法更新了
在训练后步骤通过将数据传递给模型进行统计。
在参数更新阶段通过平均所有缓冲区来完成。
实证研究表明,更新归一化层的统计信息可以提高准确性,但您可能需要通过实验来测试哪种方法在您的问题中效果最佳。
`avg_fn` 和 `multi_avg_fn` 并未保存在模型的 `state_dict` 中。
在您的问题中,您可能需要通过实验来测试哪种方法效果最佳。
.. 注意::
`avg_fn` 和 `multi_avg_fn` 并未保存在模型的 `state_dict` 中。
.. 注意::
当第一次调用 `update_parameters` 方法时(即:
:n_averaged 为 `0`) `model` 的参数被复制
到 :class:`AveragedModel` 的参数。对于后续的
调用::meth:`update_parameters` 函数 `avg_fn` 被使用
更新参数。
.. _平均权重导致更宽的优化范围和更好的泛化能力:
https://arxiv.org/abs/1803.05407
.. _关于未标记数据的许多一致解释:为什么你应该
平均:
https://arxiv.org/abs/1806.05594
随机权重平均在低精度训练中的应用
https://arxiv.org/abs/1904.11943
并行随机权重平均:大批量训练,泛化良好
通用性良好:
https://arxiv.org/abs/2001.02312
.. _Polyak 平均:
https://paperswithcode.com/method/polyak-averaging
"文档"
平均化:
张量
定义 __init__(
我,
模型:
模块,
设备:
可选[
联盟[
整数, torch.
设备]] =
无,
平均函数:
可选[
可调用[[
张量,
张量,
联盟[
张量,
整数]],
张量]] =
无,
多重平均函数:
可选[
可调用[[
参数列表,
参数列表,
联盟[
张量,
整数]],
无]
] = 无,
使用缓冲区=
错误,
): # 无需注意:D107
超级().__init__()
断言 (
平均函数
是
无
或者
多平均函数
是
无
), "只能提供 avg_fn 和 multi_avg_fn 中的一个"
我.
模块 =
深拷贝(
模型)
如果
设备
是
不
无:
我.
模块 =
我.
模块.
到(
设备)
我.
注册缓冲区(
"平均数", torch.
张量(0,
数据类型=torch.
长,
设备=
设备)
)
我.
平均函数 =
平均函数
我.
多平均函数 =
多平均函数
我.
使用缓冲区 =
使用缓冲区
[文档] def forward(self, *args, **kwargs):
"""正向传播。”
return self.module(*args, **kwargs)
[文档]
定义
更新参数(
我,
模型:
模块):
更新模型参数。
自定义参数 = (
itertools.chain(我.
模块.
参数(),
我.
模块.
缓冲区())
如果
我.
使用缓冲区
否则
我.
参数()
)
模型参数 = (
itertools.chain(模型.
参数(),
模型.
缓冲区())
如果
我.
使用缓冲区
否则
模型.
参数()
)
自定义参数分离:
列表[
可选[
张量]] =
输入文本为空,请提供需要翻译的文本
模型参数分离:
列表[
可选[
张量]] =
输入文本为空,请提供需要翻译的文本
for 平均化_p,
模型_p
在 zip(
自定义参数,
模型参数):
p 模型_ =
p 模型.detach().
到(
p_平均.
设备)
自参数分离.append(
平均化_p.detach())
模型参数分离.append(
p_模型_)
如果
我.
n_平均化 == 0:
平均化_p.detach().
复制_(
模型_p_)
如果
我.
标准化_n > 0:
如果
我.
多重平均函数_multi_avg_fn
是
不
无
或者
我.avg_fn
是
无:
分组张量 =
按设备类型和数据类型分组张量(
[self_param_detached, model_param_detached]
)
for (设备, _), (
[self_params, 模型参数
]
_,
) 在
分组张量.
项目():
如果
我.
多平均函数:
我.
多平均函数(
自定义参数,
模型参数,
我.
平均数.
到(
设备) # type: ignore[arg-type]
)
elif (
设备
是
不
无
和
设备.
类型
在
获取支持的遍历内核设备()
):
多次平均函数 =
获取 SWA 多次平均函数()
多平均函数(
自定义参数,
模型参数,
我.
平均次数.
到(
设备)
)
else:
avg_fn = get_swa_avg_fn()
n_averaged = 我.n_averaged.
到(
设备)
for 平均化_p,
模型_p
在 zip(
自定义参数,
模型参数):
# 类型:忽略[赋值]
平均化.
复制_(
平均函数(
平均化,
模型化,
平均化))
else:
for 加权平均化,
p 模型
在 zip(
# 类型:忽略[赋值]
自参数分离,
模型参数分离
):
平均次数 =
我.
平均次数.
到(
平均概率.
设备)
平均化.detach().
复制_(
我.
平均函数(
平均化.detach(),
模型化,
平均化)
)
如果
不
我.
使用缓冲区:
# 如果不应用运行平均到缓冲区,
# 保持缓冲区与源模型同步。
for b_swa, b_model 在 zip(
我.
模块.
缓冲区(),
模型.
缓冲区()):
b_swa.detach().复制_(b_model.detach().
到(b_swa.
设备))
我.
平均化 += 1
[文档]@torch.
不梯度()
定义
更新 bn(
加载器:
迭代器[
任何
]
模型:
模块,
设备:
可选[
联盟[
整数, torch.
设备]] =
无,
):
r"""更新模型中 BatchNorm 的 running_mean 和 running_var 缓冲区。
它对`loader`中的数据进行一次遍历,以估计模型中 BatchNorm 层的激活统计信息。
loader (torch.utils.data.DataLoader):用于计算激活统计信息的数据集加载器。每个数据批次应该是以下之一。
参数:
loader (torch.utils.data.DataLoader):数据集加载器,用于在上述数据上计算激活统计信息。
每个数据批次应该是以下之一。
张量,或是一个列表/元组,其第一个元素是张量
包含数据。
模型(torch.nn.Module):我们想要更新 BatchNorm 统计信息的模型
的统计信息。
设备(torch.device,可选):如果设置,数据将被传输到
:attr:`device` 在传递给 :attr:`model` 之前。
示例:
>>> # xdoctest: +忽略("未定义变量")
>>> 加载器,模型 = ...
>>> torch.optim.swa_utils 更新 bn(loader, model)
.. 注意::
`update_bn` 工具假定每个数据批次在 :attr:`loader` 中
要么是张量,要么是张量列表或元组;在后一种情况下,假定应该对列表或元组中的第一个元素调用 :meth:`model.forward()`,以对应数据批次。
应该在列表或元组中的第一个元素上调用 :meth:`model.forward()`,以对应数据批次。
对应数据批次的列表或元组中的第一个元素。
"文档"
动量 = {}
for 模块
在
模型.
模块():
如果 isinstance(
模块, torch.
神经网络.
模块.
批标准化.
_批标准化):
模块.
重置运行统计量()
动量[
模块] =
模块.
动量
如果
不
动量:
返回
正在训练 =
模型.
训练
模型.
训练()
for 模块
在
动量.
键():
模块.
动量 =
无
for 输入
在
加载器:
如果 isinstance(
输入, (
列表,
元组)):
输入 =
输入[0]
如果
设备
是
不
无:
输入 =
输入.
到(
设备)
模型(
输入)
for 布尔模块
在
动量.
键():
布尔模块.
动量 =
动量[
bn 模块]
模型.
训练(
正在训练)
[文档]
类 SWALR(
学习率调度器):
r将每个参数组的学习率调整为固定值。
这种学习率调度器旨在与随机权重平均(SWA)方法(见 `torch.optim.swa_utils.AveragedModel`)一起使用。
SWA 方法(torch.optim.swa_utils.AveragedModel)的平均值。
参数:
优化器(torch.optim.Optimizer):包装的优化器
swa_lrs(浮点数或列表):所有参数组的学习率值
每组一起或分别。
annealing_epochs (int): 缓冷阶段中的 epoch 数量
(默认: 10)
annealing_strategy (str): "cos" 或 "linear"; 指定缓冷策略
策略:"cos" 表示余弦退火,"linear" 表示线性退火
(默认:"cos")
最后一个 epoch(int 类型):最后一个 epoch 的索引(默认:-1)
class:`SWALR` 调度器可以与其他
训练后期切换到恒定学习率的调度器
如以下示例所示。
示例:
>>> # xdoctest: +忽略("未定义变量")
>>> 加载器,优化器,模型 = ...
>>> lr_lambda = lambda epoch: 0.9
>>> scheduler = torch.optim.lr_scheduler.MultiplicativeLR(optimizer,
>>> lr_lambda=lr_lambda)
>>> swa_scheduler = torch.optim.swa_utils.SWALR(optimizer,
>>> anneal_strategy="linear", anneal_epochs=20, swa_lr=0.05)
>>> swa_start = 160
>>> for i in range(300):
>>> for input, target in loader:
>>> optimizer.zero_grad()
>>> 损失函数(model(input), target).backward()
>>> 优化器.step()
>>> 如果 i > swa_start:
>>> swa_scheduler.step()
>>> else:
>>> 调度器步骤执行()
.. _平均权重导致更宽的优化范围和更好的泛化能力:
https://arxiv.org/abs/1803.05407
"文档"
定义 __init__(
我,
优化器:
优化器,
swa_lr: float,
退火轮数=10,
退火策略:
直接[
余弦,
线性] =
余弦,
最后一个纪元=-1,
): # 无需注意:D107
SWA 学习率 = _format_param(
SWA 学习率,
优化器,
SWA 学习率)
for swa_lr, 组
在 zip(swa_lrs,
优化器.
参数组):
群组[
swa_lr] = swa_lr
如果 anneal_strategy
不
在 [
余弦,
线性]:
raise ValueError(
"anneal_strategy 必须为'cos'或'linear'之一,"
f"而不是"{
退火策略}"
)
elif anneal_strategy == 余弦:
我.anneal_func =
我._cosine_anneal
elif anneal_strategy == 线性:
我.anneal_func =
我.
_线性退火
如果
不 isinstance(anneal_epochs,
整数)
或者 anneal_epochs < 0:
raise ValueError(
f"anneal_epochs 必须等于或大于 0,实际得到"{anneal_epochs}"
)
我.anneal_epochs = anneal_epochs
超级().__init__(
优化器,
最后一个纪元)
@staticmethod
定义
_线性退火(t):
返回 t
@staticmethod
定义
_余弦退火(t):
返回 (1 -
数学.
余弦(
数学.
圆周率 * t)) / 2
@staticmethod
定义
_获取初始学习率(
学习率,
swa 学习率,
阿尔法):
如果 alpha == 1:
返回 swa_lr
返回 (
左右 - alpha * swa_lr) / (1 -
阿尔法)
[文档] def get_lr(self):
"""获取学习率。"""
`_get_lr_called_within_step` 仅在 `_enable_get_lr_call` 中可用,
因此在这里忽略类型错误。详见 `LRScheduler.step()`。
如果 not self._get_lr_called_within_step:
warnings.warn(
"获取调度器计算出的最后一个学习率,请使用 `get_last_lr()`。"
"请使用 `get_last_lr()` 获取。"
用户警告
)
在 `LRScheduler._initial_step()` 中设置
step = self._step_count - 1
if self.anneal_epochs == 0:
step = max(1, step)
prev_t = max(0, min(1, (step - 1) / max(1, self.anneal_epochs)))
prev_alpha = self.anneal_func(prev_t)
prev_lrs = [
self._get_initial_lr(group["lr"], group["swa_lr"], prev_alpha)
for group in self.optimizer.param_groups
]
t = max(0, min(1, step / max(1, self.anneal_epochs)))
alpha = self.anneal_func(t)
return [
group["swa_lr"] * alpha + lr * (1 - alpha)
for group, lr in zip(self.optimizer.param_groups, prev_lrs)
]