# mypy: 允许未类型化定义
r学习率调度器。
导入
数学
导入
类型
导入
警告
from 二分查找
导入 bisect_right
from 集合
导入
计数器
from collections.abc 导入
迭代器,
序列
from functools 导入
偏函数,
包装
from 打字
导入 (
任何,
可调用,
角色,
直接,
可选,
支持浮点数,
类型字典,
并集,
)
from 弱引用
导入
分支
from 火炬
导入
无穷,
张量
from .优化器
导入
优化器
全部 = [
"LambdaLR",
"乘性学习率",
"步长学习率",
"多步长学习率",
"恒定学习率",
线性学习率调整,
指数学习率调整,
顺序学习率调整,
余弦退火学习率调整,
"链式调度器",
"学习率在平台期减少",
"循环学习率",
"余弦退火重启",
"OneCycleLR",
"多项式 LR",
"学习率调度器",
]
EPOCH 弃用警告 = (
"在 `scheduler.step()` 中的 epoch 参数是不必要的,现在已被弃用。请使用 `scheduler.step()` 来执行调度。"
"尽可能使用 `scheduler.step()`。在弃用过程中,如果 epoch 与 None 不同,将使用封闭形式而不是新的链式形式,如果可用。"
"如果 epoch 与 None 不同,在弃用期间将使用封闭形式而不是新的链式形式,如果可用。"
"如果可用,在弃用期间将使用封闭形式而不是新的链式形式,如果 epoch 与 None 不同。"
"请创建一个工单,如果您无法复现您的情况:"
"https://github.com/pytorch/pytorch/issues/new/choose."
)
定义 _format_param(
名称:
字符串,
优化器:
优化器,
参数):
"返回每个参数组的正确格式 lr/momentum。"
定义
_复制(_param):
返回 _param.
克隆()
如果 isinstance(_param,
张量)
否则
_参数
如果 isinstance(
参数, (
列表,
元组)):
如果
长度(
参数) !=
长度(
优化器.
参数组):
提升 ValueError(
f"{名称}
必须与 optimizer.param_groups 的长度相同。
f"{名称}
具有{
长度(
参数)}
values, param_groups 具有{
长度(
优化器.
参数组)}
。
)
else:
参数 = [
参数] *
长度(
优化器.
参数组)
返回
列表(
地图(
_副本,
参数))
[文档]
类 LRScheduler:
r调整优化过程中的学习率。
_在步骤内调用_get_lr_called:
布尔值 =
假
定义 __init__(
自身,
优化器:
优化器,
最后一个 epoch:
整型 = -1,
): # 无需注意:D107
# 附带优化器
如果
不 isinstance(
优化器,
优化器):
提升
类型错误(f"{
类型(
优化器).__name__}
这不是一个优化器")
自身.
优化器 =
优化器
初始化纪元和基础学习率
如果
上一个纪元 == -1:
for 群组
在
优化器.
参数组:
初始学习率 =
群组["lr"]
如果 isinstance(
初始学习率,
张量):
初始学习率 =
初始学习率.
克隆()
群组.setdefault(
"初始学习率",
初始学习率)
else:
for i, 群组
在
列举(
优化器.
参数组):
如果
"初始学习率"
不
在
群组:
提升
键错误(
"未指定 '初始学习率' 参数"
f"在恢复优化器时,在 param_groups["{i}
] 中
)
自身.
基础学习率:
列表[float] = [
群组[
初始学习率] for
群组
在
优化器.
参数组
]
自身.
上一个纪元 =
上一个纪元
# 请参考 https://github.com/pytorch/pytorch/issues/20124
# 我们希望确保在调用 `lr_scheduler.step()` 之后
`optimizer.step()`
定义 patch_track_step_called(opt:
优化器):
如果
有属性(opt.
步长,
"由 lr_sched 包装"):
我们已经修复了
返回
选项.
步骤
定义
包装步骤(step_fn):
opt_ref = ref(自身.optimizer)
函数 = step_fn.__func__
@wraps(函数)
定义
包装器(*
参数, **kwargs):
选项 =
选项引用()
选项.
_选项调用_ =
真实
# 类型:忽略[联合属性]
返回
函数.__get__(
选择,
选择.
类)(*
参数, **kwargs)
包装器.
由 lr_sched 包装 =
真实
# 类型: 忽略[attr-defined]
返回
包装器
优化.
步骤 =
包装步长(
选项.
步长)
# 类型:忽略[方法分配]
调用补丁跟踪步骤(
自身.
优化器)
自身.
_初始步骤()
定义
_初始步骤(
自身):
"""初始化步骤计数并执行一个步骤。"""
自身.
_步骤计数 = 0
自身.
步长()
[文档] def state_dict(self):
返回调度器的状态作为一个 :class:`dict`。
它包含 self.__dict__中每个变量的条目
不是优化器。
```python
# 输入文本
input_text = '"""'
# 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 假设的翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
return {
key: value for key, value in self.__dict__.items() if key != "optimizer"
}
[文档] def load_state_dict(self, state_dict: dict[str, Any]):
加载调度器的状态。
参数:
state_dict (dict): 调度器状态。应该是一个从调用 :meth:`state_dict` 返回的对象。
应该从调用 :meth:`state_dict` 获取。
"""
self.__dict__.update(state_dict)
[文档] def get_last_lr(self) -> list[float]:
"""返回当前调度器计算的最后学习率。"""
return self._last_lr
[文档] def get_lr(self) -> list[float]:
使用调度器的链式形式计算学习率。
抛出未实现异常。
[文档] def step(self, epoch: Optional[int] = None):
"""执行一个步骤。”
# 如果检测到旧模式,则发出警告
# https://github.com/pytorch/pytorch/issues/20124
if self._step_count == 1:
if not hasattr(self.optimizer.step, "_wrapped_by_lr_sched"):
warnings.warn(
"看起来 `optimizer.step()` 在学习率调度器之后被覆盖了"
初始化。请确保在调用 `optimizer.step()` 之前
调用 `lr_scheduler.step()`。更多详细信息请参阅
https://maskerprc.github.io/docs/stable/optim.html#how-to-adjust-learning-rate
UserWarning
)
# 在两次调用 optimizer.step()之前检查是否有两个 first lr_scheduler.step()调用
elif not getattr(self.optimizer, "_opt_called", False):
warnings.warn(
"检测到在调用 `lr_scheduler.step()` 之前调用了 `optimizer.step()`。"
"在 PyTorch 1.1.0 及以后的版本中,您应该按照相反的顺序调用它们:"
"在 `lr_scheduler.step()` 之前调用 `optimizer.step()`。未能这样做将导致 PyTorch 跳过学习率调度中的第一个值。"
"这将导致 PyTorch 跳过学习率调度中的第一个值。"
查看更多详情请点击
https://maskerprc.github.io/docs/stable/optim.html#如何调整学习率
用户警告
)
self._step_count += 1
with _enable_get_lr_call(self):
if epoch is None:
self.last_epoch += 1
values = self.get_lr()
else:
warnings.warn(EPOCH_DEPRECATION_WARNING, UserWarning)
self.last_epoch = epoch
如果有属性 self._get_closed_form_lr
values = cast(list[float], self._get_closed_form_lr())
else:
values = self.get_lr()
for param_group, lr in zip(self.optimizer.param_groups, values):
if isinstance(param_group["lr"], Tensor):
param_group["lr"].fill_(lr)
else:
param_group["lr"] = lr
self._last_lr: list[float] = [
group["lr"] for group in self.optimizer.param_groups
]
定义
在步骤内部调用获取学习率警告(
学习率调度器:
学习率调度器):
如果
不
学习率调度器.
在步骤内部调用获取学习率:
警告.
警告(
"要获取调度器计算的最后学习率,"
"请使用 `get_last_lr()`。",
用户警告,
栈级别=2,
)
# 包括 _LRScheduler 以实现向后兼容
# 继承而不是赋值,因为我们希望 _LRScheduler 的 __name__ 仍然是 _LRScheduler(赋值会使其变为 LRScheduler)。
类 _LRScheduler(LRScheduler):
通过
类 _enable_get_lr_call:
定义 __init__(
自身, o: LRScheduler):
自身.o = o
定义
__进入__(
自身):
自身.o._get_lr_called_within_step =
真实
返回 self
定义
__退出__(
自身,
类型,
值,
跟踪回溯):
自身.o._get_lr_called_within_step =
假
[文档]
类 LambdaLR(LRScheduler):
设置初始学习率。
将每个参数组的初始学习率设置为初始 lr。
乘以给定的函数。当 last_epoch=-1 时,将初始 lr 设置为 lr。
参数:
优化器(Optimizer):包装的优化器。
lr_lambda(函数或列表):一个计算乘法系数的函数
给定一个整数参数 epoch,或此类列表
函数,每个优化器 param_groups 中的组一个。
上一个纪元的索引。默认值:-1。
示例:
>>> # xdoctest: +SKIP
>>> 假设优化器有两个组。
>>> lambda1 = lambda epoch: epoch // 30
>>> lambda2 = lambda epoch: 0.95 ** epoch
>>> scheduler = LambdaLR(optimizer, lr_lambda=[lambda1, lambda2])
>>> for epoch in range(100):
>>> 训练(...)
>>> 验证(...)
>>> 调度器.step()
"文档"
定义 __init__(
自身,
优化器:
优化器,
学习率 lambda:
并集[
可调用[[int
] float
]
列表[
可调用[[int
] float
]],
上一个 epoch:
整型 = -1,
): # 无需注意:D107
自身.
优化器 =
优化器
自身.
学习率 lambdas:
列表[
可调用[[int
] float]]
如果
不 isinstance(lr_lambda,
列表)
和
不 isinstance(lr_lambda,
元组):
自身.lr_lambdas = [lr_lambda] *
长度(
优化器.
参数组)
else:
如果
长度(
学习率 lambda) !=
长度(
优化器.
参数组):
提升 ValueError(
f预期{
长度(
优化器.
参数组)}
lr_lambdas,但得到了{
长度(lr_lambda)}"
)
自身.lr_lambdas =
列表(lr_lambda)
超级().__init__(
优化器,
上一个纪元)
[文档] def 状态字典(self):
"""返回调度器的状态作为 :class:`dict`。
它包含 self.__dict__ 中每个变量的条目
这不是一个优化器。
学习率 lambda 函数只有在它们是可调用对象时才会被保存。
而不是它们是函数或 lambda 时。
保存或加载调度器时,请确保同时保存或加载优化器的状态。
"""
state_dict = {
键: 值
for key, value in self.__dict__.items()
如果键不在("optimizer", "lr_lambdas")中
}
state_dict["lr_lambdas"] = [None] * len(self.lr_lambdas)
for idx, fn in enumerate(self.lr_lambdas):
如果不是 types.FunctionType 类型:
state_dict["lr_lambdas"][idx] = fn.__dict__.copy()
返回 state_dict
[文档] def load_state_dict(self, state_dict):
加载调度器的状态。
当保存或加载调度器时,请确保同时保存或加载优化器的状态。
参数:
state_dict (dict): 调度器状态。应为返回的对象。
从对 :meth:`state_dict` 的调用中
"""
lr_lambdas = state_dict.pop("lr_lambdas")
self.__dict__.update(state_dict)
恢复 state_dict 键的顺序以防止副作用
https://github.com/pytorch/pytorch/issues/32756
state_dict["lr_lambdas"] = lr_lambdas
for idx, fn in enumerate(lr_lambdas):
if fn is not None:
self.lr_lambdas[idx].__dict__.update(fn)
[文档] def get_lr(self):
计算学习率。
_warn_get_lr_called_within_step(self)
return [
base_lr * lmbda(self.last_epoch)
for lmbda, base_lr in zip(self.lr_lambdas, self.base_lrs)
]
[文档]
类
乘性学习率调整器(
学习率调度器):
将每个参数组的 学习率乘以指定函数中给出的因子。
当 last_epoch=-1 时,设置初始 lr 为 lr。
参数:
优化器(Optimizer):包装后的优化器。
lr_lambda(函数或列表):一个函数,用于根据整数参数 epoch 计算一个乘法
因子,或者是一个这样的列表。
优化器中每个组对应的函数。
last_epoch(整数):最后一个 epoch 的索引。默认值:-1。
示例:
>>> # xdoctest: +SKIP
>>> lmbda = lambda epoch: 0.95
>>> scheduler = MultiplicativeLR(optimizer, lr_lambda=lmbda)
>>> for epoch in range(100):
>>> 训练(...)
>>> 验证(...)
>>> 调度器.step()
"文档"
定义 __init__(
自身,
优化器:
优化器,
学习率 lambda:
并集[
可调用[[int
] float
]
列表[
可调用[[int
] float
]],
上一个 epoch:
整型 = -1,
): # 无需注意:D107
自身.
优化器 =
优化器
自身.
学习率 lambdas:
列表[
可调用[[int
] float]]
如果
不 isinstance(lr_lambda,
列表)
和
不 isinstance(lr_lambda,
元组):
自身.lr_lambdas = [lr_lambda] *
长度(
优化器.
参数组)
else:
如果
长度(
学习率 lambda) !=
长度(
优化器.
参数组):
提升 ValueError(
f预期{
长度(
优化器.
参数组)}
lr_lambdas,但得到{
长度(lr_lambda)}"
)
自身.lr_lambdas =
列表(lr_lambda)
超级().__init__(
优化器,
最后一个 epoch)
[文档] def 状态字典(self):
返回调度器的状态作为一个 :class:`dict`。
它包含 self.__dict__中每个变量的条目
不是优化器。
学习率 lambda 函数只有在它们是可调用对象时才会被保存
并且不是如果它们是函数或 lambda。
"""
state_dict = {
键: 值
for key, value in self.__dict__.items()
if key not in ("optimizer", "lr_lambdas")
}
state_dict["lr_lambdas"] = [None] * len(self.lr_lambdas)
for idx, fn in enumerate(self.lr_lambdas):
if not isinstance(fn, types.FunctionType):
state_dict["lr_lambdas"][idx] = fn.__dict__.copy()
return state_dict
[文档] def 加载状态字典(self, state_dict):
"""加载调度器的状态。
参数:
state_dict (dict): 调度器状态。应为返回的对象
从对 :meth:`state_dict` 的调用中
"""
lr_lambdas = state_dict.pop("lr_lambdas")
self.__dict__.update(state_dict)
恢复 state_dict 键的顺序以防止副作用
https://github.com/pytorch/pytorch/issues/32756
state_dict["lr_lambdas"] = lr_lambdas
for idx, fn in enumerate(lr_lambdas):
如果 fn 不为 None:
self.lr_lambdas[idx].__dict__.update(fn)
[文档] def get_lr(self):
"""计算每个参数组的学习率。"""
_warn_get_lr_called_within_step(self)
if self.last_epoch > 0:
return [
group["lr"] * lmbda(self.last_epoch)
for lmbda, group in zip(self.lr_lambdas, self.optimizer.param_groups)
]
else:
return [group["lr"] for group in self.optimizer.param_groups]
[文档]
类
步长学习率(
学习率调度器):
"每步_size 个 epoch 后,将每个参数组的学习率衰减为 gamma。"
请注意,这种衰减可以与其他外部调度器的学习率变化同时发生。
当 last_epoch=-1 时,将初始 lr 设置为 lr。
参数:
优化器(Optimizer):包装的优化器。
步长(int):学习率衰减的周期。
gamma(float):学习率衰减的乘法因子。
默认:0.1。
last_epoch(int):最后一个纪元的索引。默认:-1。
示例:
>>> # xdoctest: +SKIP
>>> # 假设优化器对所有组使用 lr = 0.05
>>> # lr = 0.05 如果 epoch < 30
>>> # lr = 0.005 如果 30 <= epoch < 60
>>> # lr = 0.0005 如果 60 <= epoch < 90
>>> # ...
>>> 调度器 = StepLR(优化器, 步长=30, gamma=0.1)
>>> for epoch in range(100):
>>> 训练(...)
>>> 验证(...)
>>> 调度器.step()
"文档"
定义 __init__(
自身,
优化器:
优化器,
步长: int,
伽马:
浮点数 = 0.1,
最后一个纪元:
整型 = -1,
): # 无需注意:D107
自身.
步长 =
步长
自身.
伽马 =
伽马
超级().__init__(
优化器,
最后一个纪元)
[文档] def get_lr(self):
"""计算每个参数组的学习率。"""
_warn_get_lr_called_within_step(self)
if (self.last_epoch == 0) or (self.last_epoch % self.step_size != 0):
return [group["lr"] for group in self.optimizer.param_groups]
return [group["lr"] * self.gamma for group in self.optimizer.param_groups]
定义
获取封闭形式的学习率(
自身):
返回 [
基础学习率 *
自身.
γ值 ** (
自身.
最后一个 epoch //
自身.
步长)
for 基础学习率
在
自身.
基础学习率列表
]
[文档]
类
多步学习率调整器(
学习率调度器):
当训练轮数达到里程碑之一时,将每个参数组的学习率衰减为 gamma 倍。
注意,这种衰减可以与来自外部调度器的其他学习率变化同时发生。
当 last_epoch=-1 时,将初始 lr 设置为 lr。
参数:
优化器(Optimizer):包装的优化器。
里程碑(列表):时代索引列表。必须递增。
伽马(浮点数):学习率衰减的乘法因子。
默认:0.1。
最后一个 epoch 索引(整数):最后一个时代的索引。默认:-1。
示例:
>>> # xdoctest: +SKIP
>>> # 假设优化器对所有组使用 lr = 0.05
>>> # lr = 0.05 如果 epoch < 30
>>> # lr = 0.005 如果 30 <= epoch < 80
>>> # lr = 0.0005 如果 epoch >= 80
>>> 调度器 = MultiStepLR(optimizer, milestones=[30,80], gamma=0.1)
>>> for epoch in range(100):
>>> train(...)
>>> validate(...)
>>> 调度器步骤()
"文档"
定义 __init__(
自身,
优化器:
优化器,
里程碑:
迭代器[int
]
γ射线:
浮点数 = 0.1,
最后一个纪元:
整型 = -1,
): # 无需注意:D107
自身.
里程碑 =
计数器(
里程碑)
自身.
玻璃 =
玻璃
超级().__init__(
优化器,
最后一个纪元)
[文档] def get_lr(self):
"""计算每个参数组的学习率。"""
_warn_get_lr_called_within_step(self)
如果 self.last_epoch 不在 self.milestones 中:
返回 [group["lr"] for group in self.optimizer.param_groups]
返回 [
group["lr"] * self.gamma ** self.milestones[self.last_epoch]
for group in self.optimizer.param_groups
]
定义 _get_closed_form_lr(
自身):
里程碑 =
排序(
自身.
里程碑.
元素())
返回 [
基础学习率 *
自身.
γ值 **
二分查找右边界(
阶段列表,
自身.
上一个纪元)
for 基础学习率
在
自身.
基础学习率列表
]
[文档]
类
恒定学习率(LRScheduler):
将每个参数组的学习率乘以一个很小的常数因子。
乘法操作会持续到达到预定义的里程碑:total_iters(总迭代次数)。
注意,这种很小的常数因子的乘法操作可以
发生在其他调度器之外的学习率变化的同时。
当 last_epoch=-1 时,将初始 lr 设置为 lr。
参数:
优化器(Optimizer):包装的优化器。
因子(float):我们乘以学习率的数字,直到达到里程碑。默认:1./3.。
总迭代次数(int):调度器将学习率乘以因子的步数。
默认:5。
last_epoch(整数):最后一个纪元的索引。默认:-1。
示例:
>>> # xdoctest: +SKIP
>>> 假设优化器对所有组使用 lr = 0.05
>>> lr = 0.025 如果纪元 == 0
>>> # lr = 0.025 如果 epoch == 1
>>> # lr = 0.025 如果 epoch == 2
>>> # lr = 0.025 如果 epoch == 3
>>> # lr = 0.05 如果 epoch >= 4
>>> 调度器 = ConstantLR(优化器, factor=0.5, total_iters=4)
>>> for epoch in range(100):
>>> train(...)
>>> validate(...)
>>> 调度器步骤()
"文档"
定义 __init__(
自身,
优化器:
优化器,
因子:
浮点数 = 1.0 / 3,
总迭代次数:
整型 = 5,
上次纪元:
整型 = -1,
): # 无需注意:D107
如果
因子 > 1.0
或者
因子 < 0:
提升 ValueError(
预期常数乘法因子应在 0 到 1 之间。
)
自身.
因子 =
因子
自身.
总迭代次数 =
总迭代次数
超级().__init__(
优化器,
上一个纪元)
[文档] def get_lr(self):
计算每个参数组的学习率。
_warn_get_lr_called_within_step(self)
if self.last_epoch == 0:
return [group["lr"] * self.factor for group in self.optimizer.param_groups]
if self.last_epoch != self.total_iters:
return [group["lr"] for group in self.optimizer.param_groups]
return [
group["lr"] * (1.0 / self.factor) for group in self.optimizer.param_groups
]
定义 _get_closed_form_lr(
自身):
返回 [
基础学习率
* (自身.
因子 + (
自身.
最后的周期 >=
自身.
总迭代次数) * (1 -
自身.
因子))
for 基础学习率
在
自身.
基础学习率调整器
]
[文档]
类
线性学习率调整器(
学习率调度器):
"通过线性改变小的乘性因子来衰减每个参数组的学习率。"
迭代乘法直到达到预定义的里程碑:total_iters。
注意,这种衰减可以与学习率的其他变化同时发生。
从外部调度器设置初始 lr 为 lr。当 last_epoch=-1 时。
参数:
优化器(Optimizer):包装的优化器。
start_factor (浮点数): 第一个 epoch 中乘以学习率的数值。
乘数因子在后续 epoch 中逐渐变为 end_factor。
默认:1./3.
end_factor (浮点数): 线性变化结束时乘以学习率的数值。
处理过程。默认:1.0。
total_iters(整数):乘法因子达到 1 的迭代次数。
默认:5。
last_epoch(整数):最后一个纪元的索引。默认:-1。
示例:
>>> # xdoctest: +SKIP
>>> # 假设优化器对所有组使用 lr = 0.05
>>> # 如果 epoch == 0 则 lr = 0.025
>>> # 如果 epoch == 1 则 lr = 0.03125
>>> # 如果 epoch == 2 则 lr = 0.0375
>>> # lr = 0.04375 如果 epoch == 3
>>> # lr = 0.05 如果 epoch >= 4
>>> scheduler = LinearLR(optimizer, start_factor=0.5, total_iters=4)
>>> for epoch in range(100):
>>> 训练(...)
>>> 验证(...)
>>> 调度器.step()
"文档"
定义 __init__(
自身,
优化器:
优化器,
起始因子:
浮点数 = 1.0 / 3,
结束因子:
浮点数 = 1.0,
总迭代次数:
整型 = 5,
最后一个纪元:
整型 = -1,
): # 无需注意:D107
如果
起始因子 > 1.0
或者
起始因子 <= 0:
提升 ValueError(
"起始乘数预期应大于 0 且小于或等于 1。"
)
如果
结束因子 > 1.0
或者
结束因子 < 0:
提升 ValueError(
"结束乘数预期应在 0 到 1 之间。"
)
自身.
起始因子 =
起始因子
自身.
结束因子 =
结束因子
自身.
总迭代次数 =
总迭代次数
超级().__init__(
优化器,
上次迭代轮数)
[文档] def get_lr(self):
"""计算学习率。"""
_warn_get_lr_called_within_step(self)
if self.last_epoch == 0:
return [
group["lr"] * self.start_factor for group in self.optimizer.param_groups
]
if self.last_epoch > self.total_iters:
return [group["lr"] for group in self.optimizer.param_groups]
return [
group["lr"]
* (
1.0
+ (自身结束因子 - 自身开始因子)
/ (
自身总迭代次数 * 自身开始因子
(self.last_epoch - 1) * (self.end_factor - self.start_factor)
)
)
for group in self.optimizer.param_groups
]
定义
获取闭合形式的学习率(
自身):
返回 [
基础学习率
* (
自身.
起始因子
+ (自身.
结束因子 -
自身.
起始因子)
* 最小(
自身.
总迭代次数,
自身.
上一个纪元)
/ 自身.
总迭代次数
)
for 基础学习率
在
自身.
基础学习率列表
]
[文档]class ExponentialLR(LRScheduler):
"""每轮衰减每个参数组的学习率 gamma。
当 last_epoch=-1 时,将初始 lr 设置为 lr。
Args:
优化器(Optimizer):包装的优化器。
gamma(浮点数):学习率衰减的乘法因子。
last_epoch(整数):最后一个 epoch 的索引。默认:-1。
"""
def __init__(
self,
optimizer: Optimizer,
gamma: float,
last_epoch: int = -1,
): # noqa: D107
self.gamma = gamma
super().__init__(optimizer, last_epoch)
[文档] def get_lr(self):
"""计算每个参数组的学习率。"""
_warn_get_lr_called_within_step(self)
if self.last_epoch == 0:
return [group["lr"] for group in self.optimizer.param_groups]
return [group["lr"] * self.gamma for group in self.optimizer.param_groups]
def _get_closed_form_lr(self):
return [base_lr * self.gamma**self.last_epoch for base_lr in self.base_lrs]
[文档]
类 SequentialLR(LRScheduler):
"""包含在优化过程中预期按顺序调用的调度器列表。
具体来说,调度器将根据里程碑点被调用,这应该提供每个调度器在特定时间点被调用的确切间隔。
调度器(列表):链式调度器的列表。
参数:
优化器(Optimizer):包装后的优化器。
调度器(列表):链式调度器的列表。
里程碑(列表):反映里程碑点的整数列表。
last_epoch(整数):最后一个纪元的索引。默认值:-1。
示例:
>>> # xdoctest: +SKIP
>>> # 假设优化器对所有组使用 lr = 1.
>>> # lr = 0.1 如果纪元 == 0
>>> # lr = 0.1 如果 epoch == 1
>>> # lr = 0.9 如果 epoch == 2
>>> # lr = 0.81 如果 epoch == 3
>>> # lr = 0.729 如果 epoch == 4
>>> scheduler1 = ConstantLR(optimizer, factor=0.1, total_iters=2)
>>> scheduler2 = ExponentialLR(optimizer, gamma=0.9)
>>> scheduler = SequentialLR(optimizer, schedulers=[scheduler1, scheduler2], milestones=[2])
>>> for epoch in range(100):
>>> 训练(...)
>>> 验证(...)
>>> 调度器.step()
"文档"
定义 __init__(
自身,
优化器:
优化器,
调度器:
列表[
学习率调度器
]
里程碑:
列表[int
]
最后一个纪元:
整型 = -1,
): # 无需注意:D107
如果
长度(
调度器) < 1:
提升 ValueError(
f"{自身.
类.__name__}
预期至少有一个调度器,但未获取到调度器。
)
for 调度器索引,
调度器
在
列举(
调度器):
如果
不
有属性(
调度程序,
优化器):
提升
类型错误(
f"{自身.
类.__name__}
在索引处{scheduler_idx}
应具有`optimizer`属性。"
)
如果 isinstance(
调度器, ReduceLROnPlateau):
提升 ValueError(
f"{自身.
类.__name__}
不支持 `ReduceLROnPlateau` 调度器,因为它 "
调用 `step` 时需要指定额外的 kwargs,但只指定了一个在索引
f但获取了一个在索引{scheduler_idx}
在给定的调度器序列中。"
)
如果
优化器 !=
调度器.
优化器:
提升 ValueError(
f"{自身.
类.__name__}
预期所有调度器都属于同一个优化器,但 "
f"获取调度器"{
调度器.
类.__name__}
在索引{
调度器_idx}
有{
调度器.
优化器}
,"
f"这与...不同"{
优化器.
类.__name__}
。
)
如果
长度(
里程碑) !=
长度(
调度器) - 1:
提升 ValueError(
"顺序调度器期望提供的调度器数量比一个多"
f"比里程碑点的数量多,但获得了调度器的数量"{
长度(
调度器)}
和“
f"里程碑的数量要等于"{
长度(
里程碑)}"
)
自身.
_调度器 =
调度器
自身.
_里程碑 =
里程碑
自身.
最后一个纪元 =
最后一个纪元 + 1
自身.
优化器 =
优化器
将学习率重置回初始值
for 群组
在
自身.
优化器.
参数组:
群组["lr"] =
群组[
初始学习率]
# 撤销其他调度器执行的步骤
自身.
递归撤销()
# 仅对第一个调度器执行初始步骤
自身.
_调度器[0].
_初始步骤()
自身.
_最后学习率 =
调度器[0].
获取最后一个学习率()
[文档] def 递归撤销(self, sched=None):
"""
递归撤销由初始化步骤执行的任何步骤
调度器。
"""
scheds = self if sched is None else sched
如果 scheds 具有"_schedulers"属性:
for s in scheds._schedulers:
self.recursive_undo(s)
elif hasattr(scheds, "last_epoch"):
scheds.last_epoch -= 1
[文档] def step(self): # type: ignore[override]
"""执行一个步骤。”
self.last_epoch += 1
idx = bisect_right(self._milestones, self.last_epoch)
scheduler = self._schedulers[idx]
if idx > 0 and self._milestones[idx - 1] == self.last_epoch:
scheduler.step(0)
else:
scheduler.step()
self._last_lr = scheduler.get_last_lr()
[文档] def state_dict(self):
"""返回调度器的状态为 :class:`dict`。
它包含 self.__dict__中每个变量的条目
不是优化器。
包装的调度器状态也将被保存。
```python
# 输入文本
input_text = '"""'
# 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 假设的翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
state_dict = {
键: 值
for key, value in self.__dict__.items()
if key not in ("optimizer", "_schedulers")
}
state_dict["_schedulers"] = [None] * len(self._schedulers)
for idx, s in enumerate(self._schedulers):
state_dict["_schedulers"][idx] = s.state_dict()
返回状态字典
[文档] def load_state_dict(self, state_dict):
"""加载调度器的状态。
参数:
state_dict (dict): 状态字典。应该是一个从调用 :meth:`state_dict` 返回的对象
从一个对 :meth:`state_dict` 的调用中获取
"""
_schedulers = state_dict.pop("_schedulers")
self.__dict__.update(state_dict)
# 恢复 state_dict 键以防止副作用
# https://github.com/pytorch/pytorch/issues/32756
state_dict["_schedulers"] = _schedulers
for idx, s in enumerate(_schedulers):
self._schedulers[idx].load_state_dict(s)
[文档]
类
多项式学习率(
学习率调度器):
使用多项式函数在给定的 total_iters 中衰减每个参数组的学习率。
当 last_epoch=-1 时,将初始 lr 设置为 lr。
参数:
优化器(Optimizer):包装的优化器。
total_iters(int):调度器衰减学习率的步数。默认:5。
power (浮点数): 多项式的幂。默认值:1.0。
示例:
>>> # xdoctest: +SKIP("undefined vars")
>>> 假设优化器对所有组使用 lr = 0.001
>>> lr = 0.001 如果 epoch == 0
>>> lr = 0.00075 如果 epoch == 1
>>> # lr = 0.00050 如果 epoch == 2
>>> # lr = 0.00025 如果 epoch == 3
>>> # lr = 0.0 如果 epoch >= 4
>>> scheduler = PolynomialLR(optimizer, total_iters=4, power=1.0)
>>> for epoch in range(100):
>>> 训练(...)
>>> 验证(...)
>>> 调度器.step()
"文档"
定义 __init__(
自身,
优化器:
优化器,
总迭代次数:
整型 = 5,
功率:
浮点数 = 1.0,
最后一个纪元:
整型 = -1,
): # 无需注意:D107
自身.
总迭代次数 =
总迭代次数
自身.
功率 =
功率
超级().__init__(
优化器,
最后一个纪元)
[文档] def get_lr(self):
"""计算学习率。"""
_warn_get_lr_called_within_step(self)
if self.last_epoch == 0 or self.last_epoch > self.total_iters:
return [group["lr"] for group in self.optimizer.param_groups]
decay_factor = (
(1.0 - self.last_epoch / self.total_iters)
/ (1.0 - (self.last_epoch - 1) / self.total_iters)
) ** self.power
return [group["lr"] * decay_factor for group in self.optimizer.param_groups]
定义 _get_closed_form_lr(
自身):
返回 [
(
base_lr
* (1.0 - 最小(
自身.
总迭代次数,
自身.
上次纪元) /
自身.
总迭代次数)
** 自身.
功率
)
for 基础学习率
在
自身.
基础学习率列表
]
[文档]
类
余弦退火学习率(
学习率调度器):
r使用余弦退火计划设置每个参数组的学习率。
将 :math:`\eta_{max}` 设置为初始 lr。
math:`T_{cur}` 是 SGDR 中自上次重启以来的 epoch 数。
.. math::
\begin{aligned}
\eta_t & = \eta_{min} + \frac{1}{2}(\eta_{max} - \eta_{min})\left(1
\( \cos\left(\frac{T_{cur}}{T_{max}}\pi\right)\right),
\( T_{cur} \neq (2k+1)T_{max}; \)
\( \eta_{t+1} = \eta_{t} + \frac{1}{2}(\eta_{max} - \eta_{min}) \)
\( \left(1 - \cos\left(\frac{1}{T_{max}}\pi\right)\right), \)
& T_{cur} = (2k+1)T_{max}
\end{aligned}
当 last_epoch=-1 时,将初始学习率设置为 lr。注意,由于调度
递归定义,学习率可同时修改
外部由其他操作员通过此调度器设置。如果学习率被设置
仅由这个调度器控制,每个步骤的学习率变为:
.. math::
\eta_t = \eta_{min} + \frac{1}{2}(\eta_{max} - \eta_{min})\left(1 + \cos\left(\frac{T_{cur}}{T_{max}}\pi\right)\right)
\cos\left(\frac{T_{cur}}{T_{max}}\pi\right)\right)
已有提议在
`SGDR: 带有 Warm Restarts 的随机梯度下降`_. 注意,这仅
实现了 SGDR 的余弦退火部分,而不是重启。
参数:
优化器(Optimizer):包装的优化器。
T_max(整型):最大迭代次数。
eta_min (浮点数): 最小学习率。默认值:0。
last_epoch (整数): 上一个训练周期的索引。默认值:-1。
.. _SGDR:随机梯度下降带重启:
https://arxiv.org/abs/1608.03983
"文档"
定义 __init__(
自身,
优化器:
优化器,
T_max: int,
估计最小值:
浮点数 = 0.0,
上一个纪元:
整型 = -1,
): # 无需注意:D107
自身.
最大温度 =
最大温度
自身.
估计最小值 = eta_min
超级().__init__(
优化器,
上一个纪元)
[文档] def get_lr(self):
"""获取每个参数组的学习率。"""
_warn_get_lr_called_within_step(self)
if self.last_epoch == 0:
return [group["lr"] for group in self.optimizer.param_groups]
elif self._step_count == 1 and self.last_epoch > 0:
return [
self.eta_min
+ (基础学习率 - 自定义最小学习率)
* (1 + math.cos((self.last_epoch) * math.pi / self.T_max))
/ 2
for base_lr, group in zip(self.base_lrs, self.optimizer.param_groups)
]
elif (self.last_epoch - 1 - self.T_max) % (2 * self.T_max) == 0:
return [
group["lr"]
+ (基础学习率 - 最小学习率) * (1 - cos(π / 最大迭代次数)) / 2
for 基础学习率, 群组 in zip(self.base_lrs, self.optimizer.param_groups)
]
返回 [
(1 + cos(π * self.last_epoch / self.T_max))
/ (1 + cos(π * (self.last_epoch - 1) / self.T_max))
* (group["lr"] - self.eta_min)
+ self.eta_min
for group in self.optimizer.param_groups
]
定义 _get_closed_form_lr(
自身):
返回 [
自身.eta_min
+ (基础学习率 -
自身.
最小学习率)
* (1 + 数学.
余弦(
数学.
圆周率 *
自身.
最后的周期 /
自身.
最大时间))
/ 2
for 基础学习率
在
自身.
基础学习率列表
]
[文档]
类
链式调度器(
学习率调度器):
链式连接一系列学习率调度器。
接受一系列可链式学习率调度器并调用它们
step() 函数在单个 step() 调用中连续执行。
参数:
调度器(序列):链式调度器的序列。
优化器(Optimizer,可选):包装优化器。默认:无。
示例:
>>> # xdoctest: +SKIP
>>> 假设优化器对所有组使用 lr = 1.。
>>> lr = 0.09 如果 epoch == 0
>>> lr = 0.081 如果 epoch == 1
>>> # lr = 0.729 如果 epoch == 2
>>> # lr = 0.6561 如果 epoch == 3
>>> # lr = 0.59049 如果 epoch >= 4
>>> scheduler1 = ConstantLR(optimizer, factor=0.1, total_iters=2)
>>> scheduler2 = ExponentialLR(optimizer, gamma=0.9)
>>> scheduler = ChainedScheduler([scheduler1, scheduler2], optimizer=optimizer)
>>> for epoch in range(100):
>>> train(...)
>>> 验证(...)
>>> 调度器.step()
"文档"
定义 __init__(
自身,
调度器:
序列[LRScheduler
]
优化器:
可选[
优化器] =
无
): # 无需注意:D107
如果
长度(
调度器) < 1:
提升 ValueError(
f"{自身.
类.__name__}
至少需要一个调度器进行链式调用,但未获取到任何调度器。
)
优化器 =
优化器
或者
调度器[0].
优化器
for 调度器索引,
调度器
在
列举(
调度器):
如果
不
有属性(
调度程序,
优化器):
提升
类型错误(
f"{自身.
类.__name__}
在索引处{scheduler_idx}
应具有`optimizer`属性。"
)
如果 isinstance(
调度器, ReduceLROnPlateau):
提升 ValueError(
f"{自身.
类.__name__}
不支持 `ReduceLROnPlateau` 调度器,因为它 "
调用 `step` 时需要指定额外的 kwargs,但只指定了一个在索引
f但获取了一个在索引{scheduler_idx}
在给定的调度程序序列中。"
)
如果
优化器 !=
调度程序.
优化器:
提升 ValueError(
f"{自身.
类.__name__}
预期所有调度程序都属于同一个优化器,但 "
f"得到了调度程序{
调度器.
类.__name__}
在索引{
调度器_idx}
有{
调度器.
优化器}
,"
f"这与{
优化器.
类.__name__}
。
)
自身.
_调度器 =
调度器
自身.
优化器 =
优化器
自身.
_最后学习率 = [
群组["lr"] for
群组
在
自身.
_调度器[-1].
优化器.
参数组
]
[文档] def step(self): # type: ignore[override]
"""执行一步。”
for scheduler in self._schedulers:
scheduler.step()
self._last_lr = [
group["lr"] for group in self._schedulers[-1].optimizer.param_groups
]
[文档] def state_dict(self):
"""返回调度器的状态作为 :class:`dict`。
它包含 self.__dict__ 中每个变量的条目
这不是一个优化器。
被包装的调度器状态也将被保存。
"""
state_dict = {
key: value
for key, value in self.__dict__.items()
if key not in ("optimizer", "_schedulers")
}
state_dict["_schedulers"] = [None] * len(self._schedulers)
for idx, s in enumerate(self._schedulers):
state_dict["_schedulers"][idx] = s.state_dict()
return state_dict
[文档] def 加载状态字典(self, state_dict):
"""加载调度器的状态。
参数:
state_dict (dict): 调度器状态。应为返回的对象
从对 :meth:`state_dict` 的调用中
"""
_schedulers = state_dict.pop("_schedulers")
self.__dict__.update(state_dict)
恢复状态字典键的顺序以防止副作用
https://github.com/pytorch/pytorch/issues/32756
state_dict["_schedulers"] = _schedulers
for idx, s in enumerate(_schedulers):
self._schedulers[idx].load_state_dict(s)
[文档]
类 ReduceLROnPlateau(LRScheduler):
"""当指标停止改进时减少学习率。
模型通常从减少学习率中受益,
2-10 次一旦学习停滞。此调度器读取指标
数量,如果对于“耐心”数字没有看到改进
随着 epoch 数的增加,学习率降低。
参数:
优化器(Optimizer):包装优化器。
模式(字符串):`min` 或 `max` 之一。在 `min` 模式下,lr 将
当监控的数量停止时,将减少
减少;在`max`模式下将减少
监控的数量已停止增加。默认值: 'min'。
因子(浮点数):学习率将按此因子降低。new_lr = lr * factor。默认:0.1。
降低。new_lr = lr * factor。默认:0.1。
忍耐(整数):在降低学习率之前允许没有改进的 epoch 数。
之后学习率将降低。
例如,考虑没有耐心(`patience = 0`)的情况。
在第一个时期,建立了一个基线,由于没有之前的基线,所以总是被认为是好的。
在第二个时期,如果性能比基线差,
我们就认为这是一个无法容忍的时期。
由于无法容忍的时期(1)的数量大于耐心水平(0),
在本时期结束时降低学习率。
从第三个时期开始,学习率在每个时期结束时继续降低,
如果性能比基线差。如果性能有所提高或保持不变,
学习率未调整。
默认:10。
阈值(浮点数):测量新最优值的阈值,
仅关注显著变化。默认:1e-4。
threshold_mode (字符串): `rel` 或 `abs` 之一。在 `rel` 模式下,
dynamic_threshold = best * ( 1 + threshold ) 在 'max' 模式下,
mode 或 best * ( 1 - threshold ) 在 `min` 模式下。
在 `abs` 模式下,dynamic_threshold = best + threshold 在
`max`模式或最佳阈值 - `min`模式下的阈值。默认:'rel'。
冷却时间(整数):在降低学习率后等待的 epoch 数,之后恢复正常操作。默认:0。
min_lr(浮点数或列表):一个标量或标量列表。
A scalar or a list of scalars.
学习率所有参数组的下限
或分别对每个组。默认:0。
eps (float): 最小衰减应用于学习率。如果差异
新旧 lr 之间的差距小于 eps,更新是
忽略。默认:1e-8。
示例:
>>> # xdoctest: +SKIP
>>> optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)
>>> scheduler = ReduceLROnPlateau(optimizer, 'min')
>>> for epoch in range(10):
>>> 训练(...)
>>> 验证损失 = 验证(...)
>>> # 注意:step 应在 validate() 之后调用
>>> 调度器.step(val_loss)
"文档"
定义 __init__(
自身,
优化器:
优化器,
模式:
直接[
min,
最大] =
min,
因子:
浮点数 = 0.1,
耐心:
整型 = 10,
阈值:
浮点数 =
0.0001,
阈值模式:
直接[
相对, "abs"] = "rel",
冷却时间:
整型 = 0,
最小学习率:
并集[
列表[float
] float] = 0,
eps: 浮点数 = 1e-8,
): # 无需注意:D107
如果
因子 >= 1.0:
提升 ValueError(
因子应小于 1.0。)
自身.
因子 =
因子
# 添加优化器
如果
不 isinstance(
优化器,
优化器):
提升
类型错误(f"{
类型(
优化器).__name__}
不是一个优化器)
自身.
优化器 =
优化器
如果 isinstance(
最小学习率, (
列表,
元组)):
如果
长度(
最小学习率) !=
长度(
优化器.
参数组):
提升 ValueError(
f预期{
长度(
优化器.
参数组)}
最小学习率列表,获取{
长度(
最小学习率)}"
)
自身.
默认最小学习率 =
无
自身.
最小学习率列表 =
列表(
最小学习率)
else:
自身.
默认最小学习率 =
最小学习率
自身.
最小学习率列表 = [
最小学习率] *
长度(
优化器.
参数组)
自身.
耐心 =
耐心
自身.
冷却 =
冷却
自身.
冷却计数器 = 0
自身.
模式 =
模式
自身.
阈值 =
阈值
自身.
阈值模式 =
阈值模式
自身.
最佳:
浮点数
自身.
坏周期数:
整型
自身.
模式更差:
浮点数
# 选择模式的更差值
自身.eps = eps
自身.
最后一个纪元 = 0
自身.
_最后学习率 = [
群组["lr"] for
群组
在
自身.
优化器.
参数组]
自身.
初始化更好(
模式=
模式,
阈值=
阈值,
阈值模式=
阈值模式
)
自身.
重置()
定义
重置(
自身):
"重置 num_bad_epochs 计数器和冷却计数器。"
自身.
最佳 =
自身.
模式更差
自身.
凉却计数器 = 0
自身.
恶劣时代数量 = 0
[文档] def step(self, metrics: SupportsFloat, epoch=None): # type: ignore[override]
"""执行一步。”
将 `metrics` 转换为浮点数,如果它是一个零维张量
current = float(metrics)
如果 epoch 为 None:
epoch = self.last_epoch + 1
else:
warnings.warn(EPOCH_DEPRECATION_WARNING, UserWarning)
self.last_epoch = epoch
if self.is_better(current, self.best):
self.best = 当前
self.num_bad_epochs = 0
else:
self.num_bad_epochs += 1
if self.in_cooldown:
self.cooldown_counter -= 1
self.num_bad_epochs = 0 # 忽略冷却期间任何不良的 epoch
if self.num_bad_epochs > self.patience:
self._reduce_lr(epoch)
self.cooldown_counter = self.cooldown
self.num_bad_epochs = 0
self._last_lr = [group["lr"] for group in self.optimizer.param_groups]
定义
减少学习率(
自身,
训练轮数):
如果
长度(
自身.
优化器.
参数组) !=
长度(
自身.
最小学习率):
如果
自身.
默认最小学习率
是
无:
提升
运行时错误(
`optimizer`中的参数组数量
f“(”{
长度(
自身.
优化器.
参数组)}
) 区别于
f从`ReduceLROnPlateau`初始化时起
f“(”{
长度(
自身.min_lrs)}
), 通常是由于添加了新的 "
"参数组到优化器中。请 "
"修改 `min_lrs` 字段以匹配长度 "
`优化器`参数组。
)
else:
自身.
最小学习率。 = [
自身.
默认最小学习率。] *
长度(
自身.
优化器.
参数组)
for i, 参数组。
在
列举(
自身.
优化器.
参数组):
旧学习率 = float(
参数组["lr"])
新学习率 =
最大值(
旧学习率 *
自身.
因子,
自身.
最小学习率[i])
如果
旧学习率 -
新学习率 >
自身.eps:
参数组["lr"] =
新_lr
@property
定义
在冷却中(
自身): # noqa: D102
返回
自身.
冷却倒计时 > 0
定义
更好(
自身, a,
最佳): # noqa: D102
如果
自身.
模式 ==
最小
和
自身.
阈值模式 ==
"相关":
相关 epsilon = 1.0 -
自身.
阈值
返回 a <
最好的 *
相对误差
elif 自身.
模式 ==
"最小"
和
自身.
阈值模式 == "abs":
返回 a <
最好的 -
自身.
阈值
elif 自身.
模式 ==
"最大"
和
自身.
阈值模式 ==
相对:
相对容差 =
自身.
阈值 + 1.0
返回 a >
最佳 *
相对容差
else: # 模式 == 'max' 且 epsilon_mode == 'abs':
返回 a >
最佳 +
自身.
阈值
定义
_初始化更好(
自身,
模式,
阈值,
阈值模式):
如果
模式
不
在 {
min,
最大}:
提升 ValueError(
"模式 " +
模式 +
"是未知的!")
如果
阈值模式
不
在 {"rel",
绝对}:
提升 ValueError(
"阈值模式 " +
阈值模式 +
"是未知的!")
如果
模式 ==
min:
自身.
模式更差 =
无
else: # 模式 == 'max':
自身.
模式更差 = -
无
自身.
模式 =
模式
自身.
阈值 =
阈值
自身.
阈值模式 =
阈值模式
定义
状态字典(
自身):
# 无需注意:D102
返回 {
键:
值 for
键,
值
在
自身.
字典.
项目()
如果 key !=
优化器
}
[docs] def 加载状态字典(self, 状态字典):
加载调度器的状态。
self.__dict__.update(state_dict)
self._init_is_better(
mode=self.mode, threshold=self.threshold, threshold_mode=self.threshold_mode
)
[文档]
类
循环学习率(LRScheduler):
r设置每个参数组的学习率,根据循环学习率策略(CLR)。
该策略以恒定频率在两个边界之间循环学习率
如论文《循环学习率训练神经网络》中详细描述的_。
两次边界之间的距离可以在每次迭代或每个周期的基础上进行缩放。
周期性学习率策略在每个批次之后改变学习率。
在使用批次进行训练后应调用`step`。
`step`应在每个批次使用后调用。
本类具有三种内置策略,如论文所述:
* "三角形": 基本三角形循环,不进行振幅缩放。
* "三角形 2": 基本三角形循环,每次循环将初始振幅缩小一半。
* "指数范围": 一个循环,每次循环将初始振幅缩放为 :math:`\text{gamma}^{\text{循环迭代次数}}`。
在每个循环迭代中。
此实现改编自 github 仓库:`bckenstler/CLR`_。
参数:
优化器(Optimizer):包装优化器。
base_lr(浮点数或列表):初始学习率,即
周期中每个参数组的下限边界。
max_lr(浮点数或列表):周期中每个参数组的上限学习率边界。
功能上,它定义了周期振幅(max_lr - base_lr)。
它定义了周期振幅(max_lr - base_lr)。
任何周期的 lr 都是基础 lr 的总和
以及振幅的一些缩放;因此
最大 lr 可能实际上并未达到,这取决于
缩放函数。
step_size_up (int): 每次训练迭代的步长。默认值:2000
increasing half of a cycle. Default: 2000
step_size_down (int): 周期减少一半的训练迭代次数。如果 step_size_down 为 None,
decreasing half of a cycle. If step_size_down is None,
它被设置为 step_size_up。默认:无
模式(str):{三角形,三角形 2,exp_range}之一。
值对应于上面详细说明的策略。
如果 scale_fn 不为 None,则忽略此参数。
默认: '三角形'
gamma (浮点数): 'exp_range' 缩放函数中的常数:
gamma**(循环迭代次数)
默认: 1.0
scale_fn(函数):自定义缩放策略,由单个参数 lambda 函数定义
其中
对于所有 x >= 0,0 <= scale_fn(x) <= 1
如果指定,则忽略'mode'
默认:None
scale_mode (str): {'cycle', 'iterations'}.
定义 scale_fn 是否在周期数或周期迭代(训练
开始以来的迭代次数)上评估。
周期迭代次数。
默认:'周期'
cycle_momentum (bool): 如果 ``True``,动量将反向循环
将学习率设置为介于 'base_momentum' 和 'max_momentum' 之间。
默认: True
基础动量(浮点数或列表):循环中的低动量边界
对于每个参数组。注意动量是反向循环的
学习率;在周期峰值时,动量是
'base_momentum' 和学习率是 'max_lr'。
默认:0.8
max_momentum(浮点数或列表):循环的上限动量
对于每个参数组。功能上,
它定义了循环振幅(max_momentum - base_momentum)。
任何循环的动量是 max_momentum 与 base_momentum 之差
并且对振幅进行一些缩放;因此
基础动量可能实际上无法达到,这取决于
缩放函数。注意动量是反向循环的
学习率;在周期开始时,动量是 'max_momentum'
并且学习率是 'base_lr'
默认:0.9
last_epoch(整型):最后一个批次的索引。此参数在
恢复训练作业。由于`step()`应该在每次批量之后调用,而不是在每个 epoch 之后调用,因此这个数字代表计算的总批次数,而不是计算的总 epoch 数。
当 last_epoch=-1 时,调度从开始处启动。
这个数字代表计算的总批次数,而不是计算的总 epoch 数。
当 last_epoch=-1 时,调度从开始处启动。
默认:-1
示例:
>>> # xdoctest: +SKIP
>>> 优化器 = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)
>>> 调度器 = torch.optim.lr_scheduler.CyclicLR(优化器, base_lr=0.01, max_lr=0.1)
>>> 数据加载器 = torch.utils.data.DataLoader(...)
>>> for epoch in range(10):
>>> for batch in data_loader:
>>> train_batch(...)
>>> scheduler.step()
.. _循环学习率训练神经网络: https://arxiv.org/abs/1506.01186
.. _bckenstler/CLR: https://github.com/bckenstler/CLR
"文档"
定义 __init__(
自身,
优化器:
优化器,
基础学习率:
并集[float,
列表[float]],
最大学习率:
并集[float,
列表[float]],
步长增加:
整型 = 2000,
步长减少:
可选[int] =
无,
模式:
直接[
三角形,
三角形 2, "exp_range"] = "triangular",
γ射线:
浮点数 = 1.0,
scale_fn: 可选[
可调用[[float
] float]] =
无,
scale_mode: 直接[
"周期",
"迭代"] =
"周期",
周期动力:
布尔 =
是,
基础动量:
浮点数 = 0.8,
最大动量:
浮点数 = 0.9,
最后一个 epoch:
整型 = -1,
): # 无需注意:D107
添加优化器
如果
不 isinstance(
优化器,
优化器):
提升
类型错误(f"{
类型(
优化器).__name__}
不是一个优化器)
自身.
优化器 =
优化器
基础学习率 =
_格式参数(
基础学习率,
优化器,
基础学习率)
如果
最后一个 epoch == -1:
for 学习率,
群组
在 zip(
基础学习率列表,
优化器.
参数组):
如果 isinstance(
群组["lr"
]
张量):
lr_val = 学习率.
项目()
如果 isinstance(
学习率,
张量)
否则
左右
群组["lr"].
填充_(lr_val)
else:
群组["lr"] =
左右
自身.
最大学习率 =
_格式参数(
"最大学习率",
优化器,
最大学习率)
步长增加 = float(
步长增加)
步长减小 = (
float(步长减小)
如果
步长减小
是
不
无
否则
步长增加
)
自身.
总大小 =
步长增加 +
步长减少
自身.
步长比 =
步长增加 /
自身.
总大小
如果
模式
不
在 [
三角形,
三角形 2, "exp_range"]
和 scale_fn
是
无:
提升 ValueError(
"模式无效且 scale_fn 为空")
自身.
模式 =
模式
自身.
伽玛 = gamma
自身._scale_fn_ref:
可调用[[float
] float]
自身._scale_fn_custom = scale_fn
自身.
缩放模式 =
缩放模式
自身.
_初始化缩放函数()
自身.
循环动量 =
循环动量
如果
循环动量:
如果 (
动量
不
在
优化器.
默认
和
贝塔值
不
在
优化器.
默认
):
提升 ValueError(
"优化器必须支持启用 `cycle_momentum` 选项的动量或 beta1"
)
自身.use_beta1 =
"beta 值"
在
自身.
优化器.
默认
自身.
基础动量 =
格式参数(
基础动量,
优化器,
基础动量
)
自身.
最大动量 =
_格式参数(
"最大动量",
优化器,
最大动量)
如果
上一个纪元 == -1:
for m 动量,
b 动量,
组
在 zip(
自身.
最大动量,
自身.
基础动量,
优化器.
参数组
):
如果
自身.
使用 beta1:
群组[
贝塔] = (
动量_m, *
群组[
贝塔
]
[1])
else:
群组[
动量] =
动量_m
群组[
最大动量] =
动量_m
群组[
基础动量] =
b 动量
超级().__init__(
优化器,
最后一个纪元)
自身.
基础_lrs =
基础_lrs
定义
_初始化缩放函数(
自身):
如果
自身.
_自定义缩放函数
是
不
无:
返回
如果
自身.
模式 ==
三角形:
自身._scale_fn_ref =
自身._triangular_scale_fn
自身.
缩放模式 =
"循环"
elif 自身.
模式 ==
"三角形 2":
自身._scale_fn_ref =
自身._triangular2_scale_fn
自身.
缩放模式 =
循环
elif 自身.
模式 ==
指数范围:
自身.
_缩放函数引用_ =
偏函数(
自身._exp_range_scale_fn,
自身.
伽马)
自身.
缩放模式 =
"迭代次数"
[文档] def scale_fn(self, x) -> float:
获取缩放策略。
if self._scale_fn_custom is not None:
返回 self._scale_fn_custom(x)
else:
return self._scale_fn_ref(x) # 静态方法
@staticmethod
定义 _triangular_scale_fn(x: float)
输入文本:
->
翻译:
-> float:
返回 1.0
@staticmethod
定义 _triangular2_scale_fn(x: float)
输入文本:
->
翻译:
-> float:
返回 1 / (2.0 ** (x - 1))
@staticmethod
定义 _exp_range_scale_fn(
伽马: float, x: float)
输入文本:
->
翻译:
-> float:
返回
伽马**x
[文档] def get_lr(self):
"""计算批处理索引处的学习率。
该函数将 `self.last_epoch` 视为最后一个批处理索引。
如果 `self.cycle_momentum` 为 ``True``,则此函数具有副作用,
更新优化器的动量。
"""
_warn_get_lr_called_within_step(self)
cycle = math.floor(1 + self.last_epoch / self.total_size)
x = 1.0 + self.last_epoch / self.total_size - cycle
if x <= self.step_ratio:
scale_factor = x / self.step_ratio
else:
scale_factor = (x - 1) / (self.step_ratio - 1)
lrs = []
for base_lr, max_lr in zip(self.base_lrs, self.max_lrs):
base_height = (max_lr - base_lr) * scale_factor
如果 self.scale_mode 等于 "cycle":
lr = base_lr + base_height * self.scale_fn(cycle)
否则:
lr = base_lr + base_height * self.scale_fn(self.last_epoch)
lrs.append(lr)
if self.cycle_momentum:
momentums = []
for base_momentum, max_momentum in zip(
self.base_momentums, self.max_momentums
):
base_height = (max_momentum - base_momentum) * scale_factor
if self.scale_mode == "cycle":
动量 = 最大动量 - 基础高度 * self.scale_fn(cycle)
else:
动量 = 最大动量 - 基础高度 * self.scale_fn(
self.last_epoch
)
momentums.append(momentum)
for param_group, momentum in zip(self.optimizer.param_groups, momentums):
if self.use_beta1:
param_group["betas"] = (momentum, *param_group["betas"][1:])
else:
param_group["momentum"] = momentum
return lrs
定义
状态字典(
自身):
# 无需注意:D102
状态 =
超级().
状态字典()
我们正在删除 `_scale_fn_ref` 属性,因为它是一个
`weakref.WeakMethod` 不能被序列化。
状态.
流行(
_scale_fn_ref,
无)
fn = 状态.
流行(
_scale_fn_custom)
状态[
_scale_fn_custom] =
无
如果 fn
是
不
无
和
不 isinstance(
函数,
类型.
函数类型):
# _scale_fn_custom 仅当其为可调用对象时才会保存
# 如果它是一个函数或 lambda,则不会保存
状态["_scale_fn_custom"] =
函数.
字典.
复制()
返回
状态
[文档] def load_state_dict(self, state_dict):
"""加载调度器的状态。"""
fn = state_dict.pop("_scale_fn_custom")
super().load_state_dict(state_dict)
if fn is not None:
self._scale_fn_custom.__dict__.update(fn)
self._init_scale_fn()
[文档]
类
余弦退火重启(
学习率调度器):
r使用余弦退火计划设置每个参数组的学习率。
其中 :math:`\eta_{max}` 设置为初始 lr,:math:`T_{cur}`
是自上次重启以来的 epoch 数,:math:`T_{i}` 是第
两次热重启之间的 SGDR 时代数:
.. math::
\eta_t = \eta_{min} + \frac{1}{2}(\eta_{max} - \eta_{min})\left(1 +
\cos\left(\frac{T_{cur}}{T_{i}}\pi\right)\right)
当 :math:`T_{cur}=T_{i}` 时,设置 :math:`\eta_t = \eta_{min}`。
当 :math:`T_{cur}=0` 后重启时,设置 :math:`:eta_t=:eta_{max}`。
已有提议在
`SGDR:带有 Warm Restarts 的随机梯度下降`_。
参数:
优化器(Optimizer):包装的优化器。
T_0 (int): 首次重启前的迭代次数。
T_mult (int, optional): 重启后 :math:`T_{i}` 增加的倍数。默认:1。
eta_min (float, optional): 最小学习率。默认:0。
last_epoch (int, optional): 最后一个纪元的索引。默认:-1。
SGDR:随机梯度下降与预热重启
https://arxiv.org/abs/1608.03983
"文档"
定义 __init__(
自身,
优化器:
优化器,
T_0: int,
T_mult: 整型 = 1,
eta_min: 浮点数 = 0.0,
最后一个纪元:
整型 = -1,
): # 无需注意:D107
如果 T_0 <= 0
或者
不 isinstance(T_0, int):
提升 ValueError(f
预期得到正整数 T_0,但得到了{T_0}")
如果 T_mult < 1
或者
不 isinstance(T_mult, int):
提升 ValueError(f
预期整数 T_mult >= 1,但得到{T_mult}")
如果
不 isinstance(eta_min, (float, int)):
提升 ValueError(
f预期输入浮点数或整数 eta_min,但得到{eta_min}
的类型{
类型(eta_min)}"
)
自身.T_0 = T_0
自身.T_i = T_0
自身.
T_多 =
T_多
自身.
eta_最小 =
eta_最小
自身.T_cur =
最后一个纪元
超级().__init__(
优化器,
最后一个纪元)
[文档] def get_lr(self):
"""计算初始学习率。"""
_warn_get_lr_called_within_step(self)
return [
self.eta_min
+ (base_lr - self.eta_min)
* (1 + cos(π * self.T_cur / self.T_i))
/ 2
for base_lr in self.base_lrs
]
[文档]
定义
步长(
自身,
埃波克=
无):
步骤可以在每次批量更新后调用。
示例:
>>> # xdoctest: +SKIP("未定义的变量")
>>> 调度器 = CosineAnnealingWarmRestarts(optimizer, T_0, T_mult)
>>> iters = len(dataloader)
>>> for epoch in range(20):
>>> for i, sample in enumerate(dataloader):
>>> inputs, labels = sample['inputs'], sample['labels']
>>> optimizer.zero_grad()
>>> outputs = net(inputs)
>>> loss = criterion(outputs, labels)
>>> loss.backward()
>>> 优化器步骤()
>>> 调度器步骤(epoch + i / iters)
此函数可以交错调用。
示例:
>>> # xdoctest: +SKIP("未定义变量")
>>> 调度器 = CosineAnnealingWarmRestarts(optimizer, T_0, T_mult)
>>> for epoch in range(20):
>>> 调度器.step()
>>> 调度器.step(26)
>>> 调度器步骤() # 调度器步骤(27),而不是调度器(20)
"文档"
如果
训练轮次
是
无
和
自身.
最后的周期 < 0:
训练轮次 = 0
如果
训练轮次
是
无:
埃波克 =
自身.
最后的周期 + 1
自身.
当前温度 =
自身.
当前温度 + 1
如果
自身.
当前温度 >=
自身.T_i:
自身.T_cur =
自身.T_cur -
自身.T_i
自身.T_i =
自身.T_i *
自身.T_mult
else:
如果 epoch < 0:
提升 ValueError(f
预期得到非负的纪元,但得到了{
纪元}")
如果
纪元 >=
自身.T_0:
如果
自身.T_mult == 1:
自身.T_cur =
埃波克 %
自身.T_0
else:
n = int(
数学.
日志(
(埃波克 /
自身.T_0 * (
自身.
T_多 - 1) + 1),
自身.
T_多
)
)
自身.
T_当前 =
埃波克 -
自身.T_0 * (
自身.
T_多**n - 1) / (
自身.
T_多 - 1
)
自身.T_i =
自身.T_0 *
自身.T_mult ** (n)
else:
自身.T_i =
自身.T_0
自身.T_cur =
时间纪元
自身.
最后的周期 =
数学.
向下取整(
时间纪元)
与
启用获取学习率调用(
自身):
for 参数组,
左右
在 zip(
自身.
优化器.
参数组,
自身.
获取学习率()):
参数组["lr"] =
左右
自身.
_最后学习率 = [
群组["lr"] for
群组
在
自身.
优化器.
参数组]
类
_调度阶段(
类型字典):
结束步骤:
浮点数
开始学习率:
字符串
结束学习率:
字符串
启动动量:
字符串
结束动量:
字符串
[文档]
类 OneCycleLR(
学习率调度器):
r根据一周期学习率策略设置每个参数组的 学习率。
1cycle 策略将学习率从初始学习率调整到某个最大值
学习率然后从那个最大学习率到某个最小学习率
低于初始学习率。
此策略最初在论文《超快速收敛:》中描述
使用大学习率快速训练神经网络
1cycle 学习率策略在每个批次后改变学习率
在使用批次进行训练后应调用`step`
此调度器不可链式调用
注意,循环中的总步数可以通过两种方式确定(按优先级列出):
#. 明确提供 total_steps 的值。
#. 提供一个 epoch 数和一个每个 epoch 的步数。
#. 提供一个 epoch 数和一个每个 epoch 的步数。
步骤数(steps_per_epoch)已提供。
在这种情况下,总步数是通过推断得出的。
total_steps = epochs * steps_per_epoch
您必须提供 total_steps 的值或提供 steps_per_epoch 和 epochs 两个值。
时期和每个时期的步数。
此调度器的默认行为遵循 fastai 的 1cycle 实现,声称“未发表的工作通过仅使用两个阶段就取得了更好的结果”。为了模仿原始论文的行为,请设置`three_phase=True`。
有关“未发表的工作通过仅使用两个阶段就取得了更好的结果”的说法。为了模仿原始论文的行为,请设置`three_phase=True`。
代替,请设置`three_phase=True`。
参数:
优化器(Optimizer):包装优化器。
max_lr (浮点数或列表):循环中的上限学习率边界。
对于每个参数组。
total_steps (整数):循环中的总步数。注意,
如果此处未提供值,则必须通过提供来推断
一个用于 epochs 和 steps_per_epoch 的值。
默认:None
训练的轮数(int):用于训练的轮数。这用于
使用 steps_per_epoch 来推断周期中的总步数
如果未提供 total_steps 的值。
默认:None
每轮训练的步数(整数):每轮训练的步数。
与 epochs 一起使用,以推断总步数
循环,如果未提供 total_steps 的值。
默认:None
周期中(以步数为单位)花费在增加学习率的百分比。
默认值:0.3
默认:0.3
anneal_strategy (str): {'cos', 'linear'}
指定退火策略:“cos”代表余弦退火,“linear”代表线性退火。
线性退火。
默认:'cos'
cycle_momentum (bool): 如果 ``True``,动量将反向循环
将学习率设置为介于 'base_momentum' 和 'max_momentum' 之间。
默认: True
基础动量(浮点数或列表):循环中的低动量边界
对于每个参数组。注意动量是反向循环的
学习率;在周期的峰值,动量是
'基础动量'和'最大学习率'。
默认:0.85
max_momentum(浮点数或列表):周期中的上限动量边界
对于每个参数组。功能上,
它定义了周期振幅(最大动量 - 基础动量)。
请注意,动量是反向循环的
学习率;在周期开始时,动量是 'max_momentum'
并且学习率是 'base_lr'
默认:0.95
div_factor(浮点数):通过 div_factor 确定初始学习率
initial_lr = max_lr/div_factor
默认:25
final_div_factor(浮点数):通过 final_div_factor 确定最小学习率
min_lr = initial_lr/final_div_factor
默认:1e4
三相(bool):如果为 ``True``,则使用调度第三个阶段来消除
学习率根据 'final_div_factor' 而不是修改第二个阶段
(前两个阶段将关于 'pct_start' 指示的步长对称)
。
last_epoch (int): 最后一个批次的索引。当恢复训练任务时,此参数被使用。由于`step()`应该在每次批次之后调用,而不是在每个 epoch 之后调用,因此这个数字代表计算的总批次数,而不是计算的总 epoch 数。
resuming a training job. Since `step()` should be invoked after each
batch instead of after each epoch, this number represents the total
number of *batches* computed, not the total number of epochs computed.
恢复训练任务时。由于`step()`应该在每次批次之后调用,而不是在每个 epoch 之后调用,因此这个数字代表计算的总批次数,而不是计算的总 epoch 数。
batch instead of after each epoch, this number represents the total
number of *batches* computed, not the total number of epochs computed.
而不是在每个 epoch 之后调用,因此这个数字代表计算的总批次数,而不是计算的总 epoch 数。
number of *batches* computed, not the total number of epochs computed.
计算的总批次数,而不是计算的总 epoch 数。
当 last_epoch=-1 时,计划将从开始处启动。
默认:-1
示例:
>>> # xdoctest: +SKIP
>>> data_loader = torch.utils.data.DataLoader(...)
>>> optimizer = torch.optim.SGD(model.parameters(), lr=1e-4, momentum=0.9)
>>> 调度器 = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.01, steps_per_epoch=len(data_loader), epochs=10)
>>> for epoch in range(10):
>>> for batch in data_loader:
>>> train_batch(...)
>>> 优化器步骤()
>>> 调度器步骤()
.. _超级收敛:使用大学习率的神经网络快速训练:
https://arxiv.org/abs/1708.07120
"文档"
定义 __init__(
自身,
优化器:
优化器,
最大学习率:
并集[float,
列表[float]],
总步数:
可选[int] =
无,
迭代次数:
可选[int] =
无,
每个迭代的步数:
可选[int] =
无,
开始百分比:
浮点数 = 0.3,
退火策略:
直接[
余弦,
线性] =
余弦,
循环动量:
布尔值 =
是,
基础动量:
并集[float,
列表[float]] = 0.85,
最大动量:
并集[float,
列表[float]] = 0.95,
除数因子:
浮点数 = 25.0,
最终除数因子:
浮点数 =
10000,
三相:
布尔值 =
错误,
最后一个 epoch:
整型 = -1,
): # 无需注意:D107
# 验证优化器
如果
不 isinstance(
优化器,
优化器):
提升
类型错误(f"{
类型(
优化器).__name__}
不是一个优化器")
自身.
优化器 =
优化器
# 验证总步数
如果
总步数
是
不
无:
如果
总步数 <= 0
或者
不 isinstance(
总步数, int):
提升 ValueError(
f预期得到正整数总步数,但得到{
总步数}"
)
自身.
总步数 =
总步数
elif 迭代次数
是
不
无
和
每个 epoch 的步数
是
不
无:
如果
不 isinstance(
轮数, int)
或者
周期 <= 0:
提升 ValueError(f
预期得到正整数周期,但得到了{
周期}")
如果
不 isinstance(
每个周期的步数, int)
或者 steps_per_epoch <= 0:
提升 ValueError(
f预期得到正整数 steps_per_epoch,但得到了{steps_per_epoch}"
)
自身.total_steps =
周期 *
每个周期的步数
else:
提升 ValueError(
"您必须定义 total_steps 或 (epochs AND steps_per_epoch)"
)
自身._schedule_phases:
列表[
_阶段安排]
如果
三相:
自身.
_安排阶段 = [
{
结束步骤: float(pct_start *
自身.total_steps) - 1,
"start_lr": "initial_lr",
"end_lr": "max_lr",
"start_momentum": "max_momentum",
"末动量":
"基动量",
},
{
"末步": float(2 *
起始百分比 *
自身.
总步数) - 2,
"起始学习率":
"最大学习率",
"结束学习率":
"初始学习率",
"起始动量":
"基础动量",
"末动量":
"最大动量",
},
{
"末步":
自身.
总步数 - 1,
"起始学习率":
"初始学习率",
"结束学习率": "min_lr",
"start_momentum": "max_momentum",
"end_momentum": 最大动量,
},
]
else:
自身._schedule_phases = [
{
结束步骤: float(
开始百分比 *
自身.
总步骤数) - 1,
"开始学习率":
初始学习率,
"结束学习率":
"最大学习率",
"开始动量":
"最大动量",
"结束动量":
"基础动量",
},
{
"结束步长":
自身.
总步数 - 1,
"开始学习率":
"最大学习率",
"结束学习率":
"最小学习率",
"启动动量":
"基础动量",
"结束动量":
"最大动量",
},
]
验证 pct_start
如果 pct_start < 0
或者 pct_start > 1
或者
不 isinstance(pct_start, float):
提升 ValueError(
f预期输入介于 0 到 1 之间的浮点数 pct_start,但得到{pct_start}"
)
# 验证 anneal_strategy
如果 anneal_strategy
不
在 [
余弦,
线性]:
提升 ValueError(
f"anneal_strategy 必须为'cos'或'linear'之一,而不是得到了"{anneal_strategy}"
)
else:
自身._anneal_func_type =
热处理策略
# 初始化学习率变量
最大学习率 =
_格式参数(
"最大学习率",
自身.
优化器,
最大学习率)
如果
最后的周期 == -1:
for 索引,
群组
在
列举(
自身.
优化器.
参数组):
群组[
初始学习率] =
最大学习率 s[
索引] /
分解因子
群组[
"最大学习率"] =
最大学习率列表[
索引]
群组[
"最小学习率"] =
群组[
初始学习率] /
最终除数因子
初始化动量变量
自身.
循环动量 =
循环动量
如果
自身.
循环动量:
如果 (
动量
不
在
自身.
优化器.
默认
和
贝塔值
不
在
自身.
优化器.
默认
):
提升 ValueError(
"优化器必须支持启用 `cycle_momentum` 选项的动量或 beta1"
)
自身.use_beta1 =
"beta 值"
在
自身.
优化器.
默认
最大动量值 =
格式参数(
最大动量,
优化器,
最大动量)
基础动量 =
_格式参数(
基础动量,
优化器,
基础动量)
如果
最后的周期 == -1:
for m_动量,
动量,
群组
在 zip(
最大动量,
基础动量,
优化器.
参数组
):
如果
我.
使用_beta1:
群组[
贝塔] = (m_momentum, *
群组[
贝塔
]
[1])
else:
群组[
动量] =
动量_m
群组[
最大动量] =
动量_m
群组[
基础动量] =
b_momentum
动量
超级().__init__(
优化器,
上一个纪元)
定义 _anneal_func(
自身, *
参数, **kwargs):
如果
有属性(
自身, "_anneal_func_type"):
如果
自身._anneal_func_type ==
余弦:
返回
自身._annealing_cos(*
参数, **kwargs)
elif 自身._anneal_func_type ==
线性:
返回
自身._annealing_linear(*
参数, **kwargs)
else:
提升 ValueError(f
"未知 _anneal_func_type: "{
自身._anneal_func_type}")
else:
# 为 BC
返回
自身.
热处理函数(*
参数, **kwargs)
# 类型: 忽略[attr-defined]
@staticmethod
定义
_余弦退火(
开始,
结束,
百分比):
随着百分比从 0.0 增加到 1.0,余弦衰减从`start`到`end`。
cos_out = 数学.
余弦(
数学.
圆周率 *
百分比) + 1
返回
末端 + (
开始 -
结束) / 2.0 * cos_out
@staticmethod
定义 _annealing_linear(
开始,
结束,
百分比):
"从 0.0 到 1.0 按百分比线性退火至`start`到`end`。"
返回 (
末端 -
开始) *
百分比 +
开始
[文档] def get_lr(self):
"""计算每个参数组的学习率。"""
_warn_get_lr_called_within_step(self)
lrs = []
step_num = self.last_epoch
if step_num > self.total_steps:
raise ValueError(
f"尝试步进 {step_num} 次。指定的总步数为 {self.total_steps}" # noqa: UP032
)
for group in self.optimizer.param_groups:
start_step = 0.0
for i, phase in enumerate(self._schedule_phases):
end_step = phase["end_step"]
if step_num <= end_step or i == len(self._schedule_phases) - 1:
pct = (step_num - start_step) / (end_step - start_step)
computed_lr = self._anneal_func(
group[phase["start_lr"]], group[phase["end_lr"]], pct
)
如果 self.cycle_momentum:
computed_momentum = self._anneal_func(
group[阶段["起始动力"]]
group[阶段["结束动力"]]
百分比
)
break
start_step = phase["end_step"]
lrs.append(computed_lr) # type: ignore[possibly-undefined]
if self.cycle_momentum:
if self.use_beta1:
group["betas"] = (computed_momentum, *group["betas"][1:]) # type: ignore[possibly-undefined]
else:
group[
动量
] = 计算动量 # type: ignore[possibly-undefined]
返回 lrs