快捷键

torch.optim.lr_scheduler 源代码

# mypy: 允许未类型化定义
r学习率调度器。
导入 数学
导入 类型
导入 警告
from 二分查找 导入 bisect_right
from 集合 导入 计数器
from collections.abc 导入 迭代器, 序列
from functools 导入 偏函数, 包装
from 打字 导入 (
    任何,
    可调用,
    角色,
    直接,
    可选,
    支持浮点数,
    类型字典,
    并集,
)
from 弱引用 导入 分支

from 火炬 导入 无穷, 张量

from .优化器 导入 优化器


全部 = [
    "LambdaLR",
    "乘性学习率",
    "步长学习率",
    "多步长学习率",
    "恒定学习率",
    线性学习率调整,
    指数学习率调整,
    顺序学习率调整,
    余弦退火学习率调整,
    "链式调度器",
    "学习率在平台期减少",
    "循环学习率",
    "余弦退火重启",
    "OneCycleLR",
    "多项式 LR",
    "学习率调度器",
]

EPOCH 弃用警告 = (
    "在 `scheduler.step()` 中的 epoch 参数是不必要的,现在已被弃用。请使用 `scheduler.step()` 来执行调度。"
    "尽可能使用 `scheduler.step()`。在弃用过程中,如果 epoch 与 None 不同,将使用封闭形式而不是新的链式形式,如果可用。"
    "如果 epoch 与 None 不同,在弃用期间将使用封闭形式而不是新的链式形式,如果可用。"
    "如果可用,在弃用期间将使用封闭形式而不是新的链式形式,如果 epoch 与 None 不同。"
    "请创建一个工单,如果您无法复现您的情况:"
    "https://github.com/pytorch/pytorch/issues/new/choose."
)


定义 _format_param(名称: 字符串, 优化器: 优化器, 参数):
    "返回每个参数组的正确格式 lr/momentum。"

    定义 _复制(_param):
        返回 _param.克隆() 如果 isinstance(_param, 张量) 否则 _参数

    如果 isinstance(参数, (列表, 元组)):
        如果 长度(参数) != 长度(优化器.参数组):
            提升 ValueError(
                f"{名称}必须与 optimizer.param_groups 的长度相同。
                f"{名称}具有{长度(参数)}values, param_groups 具有{长度(优化器.参数组)}
            )
    else:
        参数 = [参数] * 长度(优化器.参数组)

    返回 列表(地图(_副本, 参数))


[文档] LRScheduler: r调整优化过程中的学习率。 _在步骤内调用_get_lr_called: 布尔值 = 定义 __init__( 自身, 优化器: 优化器, 最后一个 epoch: 整型 = -1, ): # 无需注意:D107 # 附带优化器 如果 isinstance(优化器, 优化器): 提升 类型错误(f"{类型(优化器).__name__}这不是一个优化器") 自身.优化器 = 优化器 初始化纪元和基础学习率 如果 上一个纪元 == -1: for 群组 优化器.参数组: 初始学习率 = 群组["lr"] 如果 isinstance(初始学习率, 张量): 初始学习率 = 初始学习率.克隆() 群组.setdefault("初始学习率", 初始学习率) else: for i, 群组 列举(优化器.参数组): 如果 "初始学习率" 群组: 提升 键错误( "未指定 '初始学习率' 参数" f"在恢复优化器时,在 param_groups["{i}] 中 ) 自身.基础学习率: 列表[float] = [ 群组[初始学习率] for 群组 优化器.参数组 ] 自身.上一个纪元 = 上一个纪元 # 请参考 https://github.com/pytorch/pytorch/issues/20124 # 我们希望确保在调用 `lr_scheduler.step()` 之后 `optimizer.step()` 定义 patch_track_step_called(opt: 优化器): 如果 有属性(opt.步长, "由 lr_sched 包装"): 我们已经修复了 返回 选项.步骤 定义 包装步骤(step_fn): opt_ref = ref(自身.optimizer) 函数 = step_fn.__func__ @wraps(函数) 定义 包装器(*参数, **kwargs): 选项 = 选项引用() 选项._选项调用_ = 真实 # 类型:忽略[联合属性] 返回 函数.__get__(选择, 选择.)(*参数, **kwargs) 包装器.由 lr_sched 包装 = 真实 # 类型: 忽略[attr-defined] 返回 包装器 优化.步骤 = 包装步长(选项.步长) # 类型:忽略[方法分配] 调用补丁跟踪步骤(自身.优化器) 自身._初始步骤() 定义 _初始步骤(自身): """初始化步骤计数并执行一个步骤。""" 自身._步骤计数 = 0 自身.步长()
[文档] def state_dict(self): 返回调度器的状态作为一个 :class:`dict`。 它包含 self.__dict__中每个变量的条目 不是优化器。 ```python # 输入文本 input_text = '"""' # 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API) def translate_to_simplified_chinese(text): # 假设的翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` return { key: value for key, value in self.__dict__.items() if key != "optimizer" }
[文档] def load_state_dict(self, state_dict: dict[str, Any]): 加载调度器的状态。 参数: state_dict (dict): 调度器状态。应该是一个从调用 :meth:`state_dict` 返回的对象。 应该从调用 :meth:`state_dict` 获取。 """ self.__dict__.update(state_dict)
[文档] def get_last_lr(self) -> list[float]: """返回当前调度器计算的最后学习率。""" return self._last_lr
[文档] def get_lr(self) -> list[float]: 使用调度器的链式形式计算学习率。 抛出未实现异常。
[文档] def step(self, epoch: Optional[int] = None): """执行一个步骤。” # 如果检测到旧模式,则发出警告 # https://github.com/pytorch/pytorch/issues/20124 if self._step_count == 1: if not hasattr(self.optimizer.step, "_wrapped_by_lr_sched"): warnings.warn( "看起来 `optimizer.step()` 在学习率调度器之后被覆盖了" 初始化。请确保在调用 `optimizer.step()` 之前 调用 `lr_scheduler.step()`。更多详细信息请参阅 https://maskerprc.github.io/docs/stable/optim.html#how-to-adjust-learning-rate UserWarning ) # 在两次调用 optimizer.step()之前检查是否有两个 first lr_scheduler.step()调用 elif not getattr(self.optimizer, "_opt_called", False): warnings.warn( "检测到在调用 `lr_scheduler.step()` 之前调用了 `optimizer.step()`。" "在 PyTorch 1.1.0 及以后的版本中,您应该按照相反的顺序调用它们:" "在 `lr_scheduler.step()` 之前调用 `optimizer.step()`。未能这样做将导致 PyTorch 跳过学习率调度中的第一个值。" "这将导致 PyTorch 跳过学习率调度中的第一个值。" 查看更多详情请点击 https://maskerprc.github.io/docs/stable/optim.html#如何调整学习率 用户警告 ) self._step_count += 1 with _enable_get_lr_call(self): if epoch is None: self.last_epoch += 1 values = self.get_lr() else: warnings.warn(EPOCH_DEPRECATION_WARNING, UserWarning) self.last_epoch = epoch 如果有属性 self._get_closed_form_lr values = cast(list[float], self._get_closed_form_lr()) else: values = self.get_lr() for param_group, lr in zip(self.optimizer.param_groups, values): if isinstance(param_group["lr"], Tensor): param_group["lr"].fill_(lr) else: param_group["lr"] = lr self._last_lr: list[float] = [ group["lr"] for group in self.optimizer.param_groups ]
定义 在步骤内部调用获取学习率警告(学习率调度器: 学习率调度器): 如果 学习率调度器.在步骤内部调用获取学习率: 警告.警告( "要获取调度器计算的最后学习率," "请使用 `get_last_lr()`。", 用户警告, 栈级别=2, ) # 包括 _LRScheduler 以实现向后兼容 # 继承而不是赋值,因为我们希望 _LRScheduler 的 __name__ 仍然是 _LRScheduler(赋值会使其变为 LRScheduler)。 _LRScheduler(LRScheduler): 通过 _enable_get_lr_call: 定义 __init__(自身, o: LRScheduler): 自身.o = o 定义 __进入__(自身): 自身.o._get_lr_called_within_step = 真实 返回 self 定义 __退出__(自身, 类型, , 跟踪回溯): 自身.o._get_lr_called_within_step =
[文档] LambdaLR(LRScheduler): 设置初始学习率。 将每个参数组的初始学习率设置为初始 lr。 乘以给定的函数。当 last_epoch=-1 时,将初始 lr 设置为 lr。 参数: 优化器(Optimizer):包装的优化器。 lr_lambda(函数或列表):一个计算乘法系数的函数 给定一个整数参数 epoch,或此类列表 函数,每个优化器 param_groups 中的组一个。 上一个纪元的索引。默认值:-1。 示例: >>> # xdoctest: +SKIP >>> 假设优化器有两个组。 >>> lambda1 = lambda epoch: epoch // 30 >>> lambda2 = lambda epoch: 0.95 ** epoch >>> scheduler = LambdaLR(optimizer, lr_lambda=[lambda1, lambda2]) >>> for epoch in range(100): >>> 训练(...) >>> 验证(...) >>> 调度器.step() "文档" 定义 __init__( 自身, 优化器: 优化器, 学习率 lambda: 并集[可调用[[int] float] 列表[可调用[[int] float]], 上一个 epoch: 整型 = -1, ): # 无需注意:D107 自身.优化器 = 优化器 自身.学习率 lambdas: 列表[可调用[[int] float]] 如果 isinstance(lr_lambda, 列表) isinstance(lr_lambda, 元组): 自身.lr_lambdas = [lr_lambda] * 长度(优化器.参数组) else: 如果 长度(学习率 lambda) != 长度(优化器.参数组): 提升 ValueError( f预期{长度(优化器.参数组)}lr_lambdas,但得到了{长度(lr_lambda)}" ) 自身.lr_lambdas = 列表(lr_lambda) 超级().__init__(优化器, 上一个纪元)
[文档] def 状态字典(self): """返回调度器的状态作为 :class:`dict`。 它包含 self.__dict__ 中每个变量的条目 这不是一个优化器。 学习率 lambda 函数只有在它们是可调用对象时才会被保存。 而不是它们是函数或 lambda 时。 保存或加载调度器时,请确保同时保存或加载优化器的状态。 """ state_dict = { 键: 值 for key, value in self.__dict__.items() 如果键不在("optimizer", "lr_lambdas")中 } state_dict["lr_lambdas"] = [None] * len(self.lr_lambdas) for idx, fn in enumerate(self.lr_lambdas): 如果不是 types.FunctionType 类型: state_dict["lr_lambdas"][idx] = fn.__dict__.copy() 返回 state_dict
[文档] def load_state_dict(self, state_dict): 加载调度器的状态。 当保存或加载调度器时,请确保同时保存或加载优化器的状态。 参数: state_dict (dict): 调度器状态。应为返回的对象。 从对 :meth:`state_dict` 的调用中 """ lr_lambdas = state_dict.pop("lr_lambdas") self.__dict__.update(state_dict) 恢复 state_dict 键的顺序以防止副作用 https://github.com/pytorch/pytorch/issues/32756 state_dict["lr_lambdas"] = lr_lambdas for idx, fn in enumerate(lr_lambdas): if fn is not None: self.lr_lambdas[idx].__dict__.update(fn)
[文档] def get_lr(self): 计算学习率。 _warn_get_lr_called_within_step(self) return [ base_lr * lmbda(self.last_epoch) for lmbda, base_lr in zip(self.lr_lambdas, self.base_lrs) ]
[文档] 乘性学习率调整器(学习率调度器): 将每个参数组的 学习率乘以指定函数中给出的因子。 当 last_epoch=-1 时,设置初始 lr 为 lr。 参数: 优化器(Optimizer):包装后的优化器。 lr_lambda(函数或列表):一个函数,用于根据整数参数 epoch 计算一个乘法 因子,或者是一个这样的列表。 优化器中每个组对应的函数。 last_epoch(整数):最后一个 epoch 的索引。默认值:-1。 示例: >>> # xdoctest: +SKIP >>> lmbda = lambda epoch: 0.95 >>> scheduler = MultiplicativeLR(optimizer, lr_lambda=lmbda) >>> for epoch in range(100): >>> 训练(...) >>> 验证(...) >>> 调度器.step() "文档" 定义 __init__( 自身, 优化器: 优化器, 学习率 lambda: 并集[可调用[[int] float] 列表[可调用[[int] float]], 上一个 epoch: 整型 = -1, ): # 无需注意:D107 自身.优化器 = 优化器 自身.学习率 lambdas: 列表[可调用[[int] float]] 如果 isinstance(lr_lambda, 列表) isinstance(lr_lambda, 元组): 自身.lr_lambdas = [lr_lambda] * 长度(优化器.参数组) else: 如果 长度(学习率 lambda) != 长度(优化器.参数组): 提升 ValueError( f预期{长度(优化器.参数组)}lr_lambdas,但得到{长度(lr_lambda)}" ) 自身.lr_lambdas = 列表(lr_lambda) 超级().__init__(优化器, 最后一个 epoch)
[文档] def 状态字典(self): 返回调度器的状态作为一个 :class:`dict`。 它包含 self.__dict__中每个变量的条目 不是优化器。 学习率 lambda 函数只有在它们是可调用对象时才会被保存 并且不是如果它们是函数或 lambda。 """ state_dict = { 键: 值 for key, value in self.__dict__.items() if key not in ("optimizer", "lr_lambdas") } state_dict["lr_lambdas"] = [None] * len(self.lr_lambdas) for idx, fn in enumerate(self.lr_lambdas): if not isinstance(fn, types.FunctionType): state_dict["lr_lambdas"][idx] = fn.__dict__.copy() return state_dict
[文档] def 加载状态字典(self, state_dict): """加载调度器的状态。 参数: state_dict (dict): 调度器状态。应为返回的对象 从对 :meth:`state_dict` 的调用中 """ lr_lambdas = state_dict.pop("lr_lambdas") self.__dict__.update(state_dict) 恢复 state_dict 键的顺序以防止副作用 https://github.com/pytorch/pytorch/issues/32756 state_dict["lr_lambdas"] = lr_lambdas for idx, fn in enumerate(lr_lambdas): 如果 fn 不为 None: self.lr_lambdas[idx].__dict__.update(fn)
[文档] def get_lr(self): """计算每个参数组的学习率。""" _warn_get_lr_called_within_step(self) if self.last_epoch > 0: return [ group["lr"] * lmbda(self.last_epoch) for lmbda, group in zip(self.lr_lambdas, self.optimizer.param_groups) ] else: return [group["lr"] for group in self.optimizer.param_groups]
[文档] 步长学习率(学习率调度器): "每步_size 个 epoch 后,将每个参数组的学习率衰减为 gamma。" 请注意,这种衰减可以与其他外部调度器的学习率变化同时发生。 当 last_epoch=-1 时,将初始 lr 设置为 lr。 参数: 优化器(Optimizer):包装的优化器。 步长(int):学习率衰减的周期。 gamma(float):学习率衰减的乘法因子。 默认:0.1。 last_epoch(int):最后一个纪元的索引。默认:-1。 示例: >>> # xdoctest: +SKIP >>> # 假设优化器对所有组使用 lr = 0.05 >>> # lr = 0.05 如果 epoch < 30 >>> # lr = 0.005 如果 30 <= epoch < 60 >>> # lr = 0.0005 如果 60 <= epoch < 90 >>> # ... >>> 调度器 = StepLR(优化器, 步长=30, gamma=0.1) >>> for epoch in range(100): >>> 训练(...) >>> 验证(...) >>> 调度器.step() "文档" 定义 __init__( 自身, 优化器: 优化器, 步长: int, 伽马: 浮点数 = 0.1, 最后一个纪元: 整型 = -1, ): # 无需注意:D107 自身.步长 = 步长 自身.伽马 = 伽马 超级().__init__(优化器, 最后一个纪元)
[文档] def get_lr(self): """计算每个参数组的学习率。""" _warn_get_lr_called_within_step(self) if (self.last_epoch == 0) or (self.last_epoch % self.step_size != 0): return [group["lr"] for group in self.optimizer.param_groups] return [group["lr"] * self.gamma for group in self.optimizer.param_groups]
定义
获取封闭形式的学习率(自身): 返回 [ 基础学习率 * 自身.γ值 ** (自身.最后一个 epoch // 自身.步长) for 基础学习率 自身.基础学习率列表 ]
[文档] 多步学习率调整器(学习率调度器): 当训练轮数达到里程碑之一时,将每个参数组的学习率衰减为 gamma 倍。 注意,这种衰减可以与来自外部调度器的其他学习率变化同时发生。 当 last_epoch=-1 时,将初始 lr 设置为 lr。 参数: 优化器(Optimizer):包装的优化器。 里程碑(列表):时代索引列表。必须递增。 伽马(浮点数):学习率衰减的乘法因子。 默认:0.1。 最后一个 epoch 索引(整数):最后一个时代的索引。默认:-1。 示例: >>> # xdoctest: +SKIP >>> # 假设优化器对所有组使用 lr = 0.05 >>> # lr = 0.05 如果 epoch < 30 >>> # lr = 0.005 如果 30 <= epoch < 80 >>> # lr = 0.0005 如果 epoch >= 80 >>> 调度器 = MultiStepLR(optimizer, milestones=[30,80], gamma=0.1) >>> for epoch in range(100): >>> train(...) >>> validate(...) >>> 调度器步骤() "文档" 定义 __init__( 自身, 优化器: 优化器, 里程碑: 迭代器[int] γ射线: 浮点数 = 0.1, 最后一个纪元: 整型 = -1, ): # 无需注意:D107 自身.里程碑 = 计数器(里程碑) 自身.玻璃 = 玻璃 超级().__init__(优化器, 最后一个纪元)
[文档] def get_lr(self): """计算每个参数组的学习率。""" _warn_get_lr_called_within_step(self) 如果 self.last_epoch 不在 self.milestones 中: 返回 [group["lr"] for group in self.optimizer.param_groups] 返回 [ group["lr"] * self.gamma ** self.milestones[self.last_epoch] for group in self.optimizer.param_groups ]
定义
_get_closed_form_lr(自身): 里程碑 = 排序(自身.里程碑.元素()) 返回 [ 基础学习率 * 自身.γ值 ** 二分查找右边界(阶段列表, 自身.上一个纪元) for 基础学习率 自身.基础学习率列表 ]
[文档] 恒定学习率(LRScheduler): 将每个参数组的学习率乘以一个很小的常数因子。 乘法操作会持续到达到预定义的里程碑:total_iters(总迭代次数)。 注意,这种很小的常数因子的乘法操作可以 发生在其他调度器之外的学习率变化的同时。 当 last_epoch=-1 时,将初始 lr 设置为 lr。 参数: 优化器(Optimizer):包装的优化器。 因子(float):我们乘以学习率的数字,直到达到里程碑。默认:1./3.。 总迭代次数(int):调度器将学习率乘以因子的步数。 默认:5。 last_epoch(整数):最后一个纪元的索引。默认:-1。 示例: >>> # xdoctest: +SKIP >>> 假设优化器对所有组使用 lr = 0.05 >>> lr = 0.025 如果纪元 == 0 >>> # lr = 0.025 如果 epoch == 1 >>> # lr = 0.025 如果 epoch == 2 >>> # lr = 0.025 如果 epoch == 3 >>> # lr = 0.05 如果 epoch >= 4 >>> 调度器 = ConstantLR(优化器, factor=0.5, total_iters=4) >>> for epoch in range(100): >>> train(...) >>> validate(...) >>> 调度器步骤() "文档" 定义 __init__( 自身, 优化器: 优化器, 因子: 浮点数 = 1.0 / 3, 总迭代次数: 整型 = 5, 上次纪元: 整型 = -1, ): # 无需注意:D107 如果 因子 > 1.0 或者 因子 < 0: 提升 ValueError( 预期常数乘法因子应在 0 到 1 之间。 ) 自身.因子 = 因子 自身.总迭代次数 = 总迭代次数 超级().__init__(优化器, 上一个纪元)
[文档] def get_lr(self): 计算每个参数组的学习率。 _warn_get_lr_called_within_step(self) if self.last_epoch == 0: return [group["lr"] * self.factor for group in self.optimizer.param_groups] if self.last_epoch != self.total_iters: return [group["lr"] for group in self.optimizer.param_groups] return [ group["lr"] * (1.0 / self.factor) for group in self.optimizer.param_groups ]
定义
_get_closed_form_lr(自身): 返回 [ 基础学习率 * (自身.因子 + (自身.最后的周期 >= 自身.总迭代次数) * (1 - 自身.因子)) for 基础学习率 自身.基础学习率调整器 ]
[文档] 线性学习率调整器(学习率调度器): "通过线性改变小的乘性因子来衰减每个参数组的学习率。" 迭代乘法直到达到预定义的里程碑:total_iters。 注意,这种衰减可以与学习率的其他变化同时发生。 从外部调度器设置初始 lr 为 lr。当 last_epoch=-1 时。 参数: 优化器(Optimizer):包装的优化器。 start_factor (浮点数): 第一个 epoch 中乘以学习率的数值。 乘数因子在后续 epoch 中逐渐变为 end_factor。 默认:1./3. end_factor (浮点数): 线性变化结束时乘以学习率的数值。 处理过程。默认:1.0。 total_iters(整数):乘法因子达到 1 的迭代次数。 默认:5。 last_epoch(整数):最后一个纪元的索引。默认:-1。 示例: >>> # xdoctest: +SKIP >>> # 假设优化器对所有组使用 lr = 0.05 >>> # 如果 epoch == 0 则 lr = 0.025 >>> # 如果 epoch == 1 则 lr = 0.03125 >>> # 如果 epoch == 2 则 lr = 0.0375 >>> # lr = 0.04375 如果 epoch == 3 >>> # lr = 0.05 如果 epoch >= 4 >>> scheduler = LinearLR(optimizer, start_factor=0.5, total_iters=4) >>> for epoch in range(100): >>> 训练(...) >>> 验证(...) >>> 调度器.step() "文档" 定义 __init__( 自身, 优化器: 优化器, 起始因子: 浮点数 = 1.0 / 3, 结束因子: 浮点数 = 1.0, 总迭代次数: 整型 = 5, 最后一个纪元: 整型 = -1, ): # 无需注意:D107 如果 起始因子 > 1.0 或者 起始因子 <= 0: 提升 ValueError( "起始乘数预期应大于 0 且小于或等于 1。" ) 如果 结束因子 > 1.0 或者 结束因子 < 0: 提升 ValueError( "结束乘数预期应在 0 到 1 之间。" ) 自身.起始因子 = 起始因子 自身.结束因子 = 结束因子 自身.总迭代次数 = 总迭代次数 超级().__init__(优化器, 上次迭代轮数)
[文档] def get_lr(self): """计算学习率。""" _warn_get_lr_called_within_step(self) if self.last_epoch == 0: return [ group["lr"] * self.start_factor for group in self.optimizer.param_groups ] if self.last_epoch > self.total_iters: return [group["lr"] for group in self.optimizer.param_groups] return [ group["lr"] * ( 1.0 + (自身结束因子 - 自身开始因子) / ( 自身总迭代次数 * 自身开始因子 (self.last_epoch - 1) * (self.end_factor - self.start_factor) ) ) for group in self.optimizer.param_groups ]
定义
获取闭合形式的学习率(自身): 返回 [ 基础学习率 * ( 自身.起始因子 + (自身.结束因子 - 自身.起始因子) * 最小(自身.总迭代次数, 自身.上一个纪元) / 自身.总迭代次数 ) for 基础学习率 自身.基础学习率列表 ]
[文档]class ExponentialLR(LRScheduler): """每轮衰减每个参数组的学习率 gamma。 当 last_epoch=-1 时,将初始 lr 设置为 lr。 Args: 优化器(Optimizer):包装的优化器。 gamma(浮点数):学习率衰减的乘法因子。 last_epoch(整数):最后一个 epoch 的索引。默认:-1。 """ def __init__( self, optimizer: Optimizer, gamma: float, last_epoch: int = -1, ): # noqa: D107 self.gamma = gamma super().__init__(optimizer, last_epoch)
[文档] def get_lr(self): """计算每个参数组的学习率。""" _warn_get_lr_called_within_step(self) if self.last_epoch == 0: return [group["lr"] for group in self.optimizer.param_groups] return [group["lr"] * self.gamma for group in self.optimizer.param_groups]
def _get_closed_form_lr(self): return [base_lr * self.gamma**self.last_epoch for base_lr in self.base_lrs]
[文档] SequentialLR(LRScheduler): """包含在优化过程中预期按顺序调用的调度器列表。 具体来说,调度器将根据里程碑点被调用,这应该提供每个调度器在特定时间点被调用的确切间隔。 调度器(列表):链式调度器的列表。 参数: 优化器(Optimizer):包装后的优化器。 调度器(列表):链式调度器的列表。 里程碑(列表):反映里程碑点的整数列表。 last_epoch(整数):最后一个纪元的索引。默认值:-1。 示例: >>> # xdoctest: +SKIP >>> # 假设优化器对所有组使用 lr = 1. >>> # lr = 0.1 如果纪元 == 0 >>> # lr = 0.1 如果 epoch == 1 >>> # lr = 0.9 如果 epoch == 2 >>> # lr = 0.81 如果 epoch == 3 >>> # lr = 0.729 如果 epoch == 4 >>> scheduler1 = ConstantLR(optimizer, factor=0.1, total_iters=2) >>> scheduler2 = ExponentialLR(optimizer, gamma=0.9) >>> scheduler = SequentialLR(optimizer, schedulers=[scheduler1, scheduler2], milestones=[2]) >>> for epoch in range(100): >>> 训练(...) >>> 验证(...) >>> 调度器.step() "文档" 定义 __init__( 自身, 优化器: 优化器, 调度器: 列表[学习率调度器] 里程碑: 列表[int] 最后一个纪元: 整型 = -1, ): # 无需注意:D107 如果 长度(调度器) < 1: 提升 ValueError( f"{自身..__name__}预期至少有一个调度器,但未获取到调度器。 ) for 调度器索引, 调度器 列举(调度器): 如果 有属性(调度程序, 优化器): 提升 类型错误( f"{自身..__name__}在索引处{scheduler_idx}应具有`optimizer`属性。" ) 如果 isinstance(调度器, ReduceLROnPlateau): 提升 ValueError( f"{自身..__name__}不支持 `ReduceLROnPlateau` 调度器,因为它 " 调用 `step` 时需要指定额外的 kwargs,但只指定了一个在索引 f但获取了一个在索引{scheduler_idx}在给定的调度器序列中。" ) 如果 优化器 != 调度器.优化器: 提升 ValueError( f"{自身..__name__}预期所有调度器都属于同一个优化器,但 " f"获取调度器"{调度器..__name__}在索引{调度器_idx}{调度器.优化器}," f"这与...不同"{优化器..__name__} ) 如果 长度(里程碑) != 长度(调度器) - 1: 提升 ValueError( "顺序调度器期望提供的调度器数量比一个多" f"比里程碑点的数量多,但获得了调度器的数量"{长度(调度器)}和“ f"里程碑的数量要等于"{长度(里程碑)}" ) 自身._调度器 = 调度器 自身._里程碑 = 里程碑 自身.最后一个纪元 = 最后一个纪元 + 1 自身.优化器 = 优化器 将学习率重置回初始值 for 群组 自身.优化器.参数组: 群组["lr"] = 群组[初始学习率] # 撤销其他调度器执行的步骤 自身.递归撤销() # 仅对第一个调度器执行初始步骤 自身._调度器[0]._初始步骤() 自身._最后学习率 = 调度器[0].获取最后一个学习率()
[文档] def 递归撤销(self, sched=None): """ 递归撤销由初始化步骤执行的任何步骤 调度器。 """ scheds = self if sched is None else sched 如果 scheds 具有"_schedulers"属性: for s in scheds._schedulers: self.recursive_undo(s) elif hasattr(scheds, "last_epoch"): scheds.last_epoch -= 1
[文档] def step(self): # type: ignore[override] """执行一个步骤。” self.last_epoch += 1 idx = bisect_right(self._milestones, self.last_epoch) scheduler = self._schedulers[idx] if idx > 0 and self._milestones[idx - 1] == self.last_epoch: scheduler.step(0) else: scheduler.step() self._last_lr = scheduler.get_last_lr()
[文档] def state_dict(self): """返回调度器的状态为 :class:`dict`。 它包含 self.__dict__中每个变量的条目 不是优化器。 包装的调度器状态也将被保存。 ```python # 输入文本 input_text = '"""' # 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API) def translate_to_simplified_chinese(text): # 假设的翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` state_dict = { 键: 值 for key, value in self.__dict__.items() if key not in ("optimizer", "_schedulers") } state_dict["_schedulers"] = [None] * len(self._schedulers) for idx, s in enumerate(self._schedulers): state_dict["_schedulers"][idx] = s.state_dict() 返回状态字典
[文档] def load_state_dict(self, state_dict): """加载调度器的状态。 参数: state_dict (dict): 状态字典。应该是一个从调用 :meth:`state_dict` 返回的对象 从一个对 :meth:`state_dict` 的调用中获取 """ _schedulers = state_dict.pop("_schedulers") self.__dict__.update(state_dict) # 恢复 state_dict 键以防止副作用 # https://github.com/pytorch/pytorch/issues/32756 state_dict["_schedulers"] = _schedulers for idx, s in enumerate(_schedulers): self._schedulers[idx].load_state_dict(s)
[文档] 多项式学习率(学习率调度器): 使用多项式函数在给定的 total_iters 中衰减每个参数组的学习率。 当 last_epoch=-1 时,将初始 lr 设置为 lr。 参数: 优化器(Optimizer):包装的优化器。 total_iters(int):调度器衰减学习率的步数。默认:5。 power (浮点数): 多项式的幂。默认值:1.0。 示例: >>> # xdoctest: +SKIP("undefined vars") >>> 假设优化器对所有组使用 lr = 0.001 >>> lr = 0.001 如果 epoch == 0 >>> lr = 0.00075 如果 epoch == 1 >>> # lr = 0.00050 如果 epoch == 2 >>> # lr = 0.00025 如果 epoch == 3 >>> # lr = 0.0 如果 epoch >= 4 >>> scheduler = PolynomialLR(optimizer, total_iters=4, power=1.0) >>> for epoch in range(100): >>> 训练(...) >>> 验证(...) >>> 调度器.step() "文档" 定义 __init__( 自身, 优化器: 优化器, 总迭代次数: 整型 = 5, 功率: 浮点数 = 1.0, 最后一个纪元: 整型 = -1, ): # 无需注意:D107 自身.总迭代次数 = 总迭代次数 自身.功率 = 功率 超级().__init__(优化器, 最后一个纪元)
[文档] def get_lr(self): """计算学习率。""" _warn_get_lr_called_within_step(self) if self.last_epoch == 0 or self.last_epoch > self.total_iters: return [group["lr"] for group in self.optimizer.param_groups] decay_factor = ( (1.0 - self.last_epoch / self.total_iters) / (1.0 - (self.last_epoch - 1) / self.total_iters) ) ** self.power return [group["lr"] * decay_factor for group in self.optimizer.param_groups]
定义
_get_closed_form_lr(自身): 返回 [ ( base_lr * (1.0 - 最小(自身.总迭代次数, 自身.上次纪元) / 自身.总迭代次数) ** 自身.功率 ) for 基础学习率 自身.基础学习率列表 ]
[文档] 余弦退火学习率(学习率调度器): r使用余弦退火计划设置每个参数组的学习率。 将 :math:`\eta_{max}` 设置为初始 lr。 math:`T_{cur}` 是 SGDR 中自上次重启以来的 epoch 数。 .. math:: \begin{aligned} \eta_t & = \eta_{min} + \frac{1}{2}(\eta_{max} - \eta_{min})\left(1 \( \cos\left(\frac{T_{cur}}{T_{max}}\pi\right)\right), \( T_{cur} \neq (2k+1)T_{max}; \) \( \eta_{t+1} = \eta_{t} + \frac{1}{2}(\eta_{max} - \eta_{min}) \) \( \left(1 - \cos\left(\frac{1}{T_{max}}\pi\right)\right), \) & T_{cur} = (2k+1)T_{max} \end{aligned} 当 last_epoch=-1 时,将初始学习率设置为 lr。注意,由于调度 递归定义,学习率可同时修改 外部由其他操作员通过此调度器设置。如果学习率被设置 仅由这个调度器控制,每个步骤的学习率变为: .. math:: \eta_t = \eta_{min} + \frac{1}{2}(\eta_{max} - \eta_{min})\left(1 + \cos\left(\frac{T_{cur}}{T_{max}}\pi\right)\right) \cos\left(\frac{T_{cur}}{T_{max}}\pi\right)\right) 已有提议在 `SGDR: 带有 Warm Restarts 的随机梯度下降`_. 注意,这仅 实现了 SGDR 的余弦退火部分,而不是重启。 参数: 优化器(Optimizer):包装的优化器。 T_max(整型):最大迭代次数。 eta_min (浮点数): 最小学习率。默认值:0。 last_epoch (整数): 上一个训练周期的索引。默认值:-1。 .. _SGDR:随机梯度下降带重启: https://arxiv.org/abs/1608.03983 "文档" 定义 __init__( 自身, 优化器: 优化器, T_max: int, 估计最小值: 浮点数 = 0.0, 上一个纪元: 整型 = -1, ): # 无需注意:D107 自身.最大温度 = 最大温度 自身.估计最小值 = eta_min 超级().__init__(优化器, 上一个纪元)
[文档] def get_lr(self): """获取每个参数组的学习率。""" _warn_get_lr_called_within_step(self) if self.last_epoch == 0: return [group["lr"] for group in self.optimizer.param_groups] elif self._step_count == 1 and self.last_epoch > 0: return [ self.eta_min + (基础学习率 - 自定义最小学习率) * (1 + math.cos((self.last_epoch) * math.pi / self.T_max)) / 2 for base_lr, group in zip(self.base_lrs, self.optimizer.param_groups) ] elif (self.last_epoch - 1 - self.T_max) % (2 * self.T_max) == 0: return [ group["lr"] + (基础学习率 - 最小学习率) * (1 - cos(π / 最大迭代次数)) / 2 for 基础学习率, 群组 in zip(self.base_lrs, self.optimizer.param_groups) ] 返回 [ (1 + cos(π * self.last_epoch / self.T_max)) / (1 + cos(π * (self.last_epoch - 1) / self.T_max)) * (group["lr"] - self.eta_min) + self.eta_min for group in self.optimizer.param_groups ]
定义
_get_closed_form_lr(自身): 返回 [ 自身.eta_min + (基础学习率 - 自身.最小学习率) * (1 + 数学.余弦(数学.圆周率 * 自身.最后的周期 / 自身.最大时间)) / 2 for 基础学习率 自身.基础学习率列表 ]
[文档] 链式调度器(学习率调度器): 链式连接一系列学习率调度器。 接受一系列可链式学习率调度器并调用它们 step() 函数在单个 step() 调用中连续执行。 参数: 调度器(序列):链式调度器的序列。 优化器(Optimizer,可选):包装优化器。默认:无。 示例: >>> # xdoctest: +SKIP >>> 假设优化器对所有组使用 lr = 1.。 >>> lr = 0.09 如果 epoch == 0 >>> lr = 0.081 如果 epoch == 1 >>> # lr = 0.729 如果 epoch == 2 >>> # lr = 0.6561 如果 epoch == 3 >>> # lr = 0.59049 如果 epoch >= 4 >>> scheduler1 = ConstantLR(optimizer, factor=0.1, total_iters=2) >>> scheduler2 = ExponentialLR(optimizer, gamma=0.9) >>> scheduler = ChainedScheduler([scheduler1, scheduler2], optimizer=optimizer) >>> for epoch in range(100): >>> train(...) >>> 验证(...) >>> 调度器.step() "文档" 定义 __init__( 自身, 调度器: 序列[LRScheduler] 优化器: 可选[优化器] = ): # 无需注意:D107 如果 长度(调度器) < 1: 提升 ValueError( f"{自身..__name__}至少需要一个调度器进行链式调用,但未获取到任何调度器。 ) 优化器 = 优化器 或者 调度器[0].优化器 for 调度器索引, 调度器 列举(调度器): 如果 有属性(调度程序, 优化器): 提升 类型错误( f"{自身..__name__}在索引处{scheduler_idx}应具有`optimizer`属性。" ) 如果 isinstance(调度器, ReduceLROnPlateau): 提升 ValueError( f"{自身..__name__}不支持 `ReduceLROnPlateau` 调度器,因为它 " 调用 `step` 时需要指定额外的 kwargs,但只指定了一个在索引 f但获取了一个在索引{scheduler_idx}在给定的调度程序序列中。" ) 如果 优化器 != 调度程序.优化器: 提升 ValueError( f"{自身..__name__}预期所有调度程序都属于同一个优化器,但 " f"得到了调度程序{调度器..__name__}在索引{调度器_idx}{调度器.优化器}," f"这与{优化器..__name__} ) 自身._调度器 = 调度器 自身.优化器 = 优化器 自身._最后学习率 = [ 群组["lr"] for 群组 自身._调度器[-1].优化器.参数组 ]
[文档] def step(self): # type: ignore[override] """执行一步。” for scheduler in self._schedulers: scheduler.step() self._last_lr = [ group["lr"] for group in self._schedulers[-1].optimizer.param_groups ]
[文档] def state_dict(self): """返回调度器的状态作为 :class:`dict`。 它包含 self.__dict__ 中每个变量的条目 这不是一个优化器。 被包装的调度器状态也将被保存。 """ state_dict = { key: value for key, value in self.__dict__.items() if key not in ("optimizer", "_schedulers") } state_dict["_schedulers"] = [None] * len(self._schedulers) for idx, s in enumerate(self._schedulers): state_dict["_schedulers"][idx] = s.state_dict() return state_dict
[文档] def 加载状态字典(self, state_dict): """加载调度器的状态。 参数: state_dict (dict): 调度器状态。应为返回的对象 从对 :meth:`state_dict` 的调用中 """ _schedulers = state_dict.pop("_schedulers") self.__dict__.update(state_dict) 恢复状态字典键的顺序以防止副作用 https://github.com/pytorch/pytorch/issues/32756 state_dict["_schedulers"] = _schedulers for idx, s in enumerate(_schedulers): self._schedulers[idx].load_state_dict(s)
[文档] ReduceLROnPlateau(LRScheduler): """当指标停止改进时减少学习率。 模型通常从减少学习率中受益, 2-10 次一旦学习停滞。此调度器读取指标 数量,如果对于“耐心”数字没有看到改进 随着 epoch 数的增加,学习率降低。 参数: 优化器(Optimizer):包装优化器。 模式(字符串):`min` 或 `max` 之一。在 `min` 模式下,lr 将 当监控的数量停止时,将减少 减少;在`max`模式下将减少 监控的数量已停止增加。默认值: 'min'。 因子(浮点数):学习率将按此因子降低。new_lr = lr * factor。默认:0.1。 降低。new_lr = lr * factor。默认:0.1。 忍耐(整数):在降低学习率之前允许没有改进的 epoch 数。 之后学习率将降低。 例如,考虑没有耐心(`patience = 0`)的情况。 在第一个时期,建立了一个基线,由于没有之前的基线,所以总是被认为是好的。 在第二个时期,如果性能比基线差, 我们就认为这是一个无法容忍的时期。 由于无法容忍的时期(1)的数量大于耐心水平(0), 在本时期结束时降低学习率。 从第三个时期开始,学习率在每个时期结束时继续降低, 如果性能比基线差。如果性能有所提高或保持不变, 学习率未调整。 默认:10。 阈值(浮点数):测量新最优值的阈值, 仅关注显著变化。默认:1e-4。 threshold_mode (字符串): `rel` 或 `abs` 之一。在 `rel` 模式下, dynamic_threshold = best * ( 1 + threshold ) 在 'max' 模式下, mode 或 best * ( 1 - threshold ) 在 `min` 模式下。 在 `abs` 模式下,dynamic_threshold = best + threshold 在 `max`模式或最佳阈值 - `min`模式下的阈值。默认:'rel'。 冷却时间(整数):在降低学习率后等待的 epoch 数,之后恢复正常操作。默认:0。 min_lr(浮点数或列表):一个标量或标量列表。 A scalar or a list of scalars. 学习率所有参数组的下限 或分别对每个组。默认:0。 eps (float): 最小衰减应用于学习率。如果差异 新旧 lr 之间的差距小于 eps,更新是 忽略。默认:1e-8。 示例: >>> # xdoctest: +SKIP >>> optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9) >>> scheduler = ReduceLROnPlateau(optimizer, 'min') >>> for epoch in range(10): >>> 训练(...) >>> 验证损失 = 验证(...) >>> # 注意:step 应在 validate() 之后调用 >>> 调度器.step(val_loss) "文档" 定义 __init__( 自身, 优化器: 优化器, 模式: 直接[min, 最大] = min, 因子: 浮点数 = 0.1, 耐心: 整型 = 10, 阈值: 浮点数 = 0.0001, 阈值模式: 直接[相对, "abs"] = "rel", 冷却时间: 整型 = 0, 最小学习率: 并集[列表[float] float] = 0, eps: 浮点数 = 1e-8, ): # 无需注意:D107 如果 因子 >= 1.0: 提升 ValueError(因子应小于 1.0。) 自身.因子 = 因子 # 添加优化器 如果 isinstance(优化器, 优化器): 提升 类型错误(f"{类型(优化器).__name__}不是一个优化器) 自身.优化器 = 优化器 如果 isinstance(最小学习率, (列表, 元组)): 如果 长度(最小学习率) != 长度(优化器.参数组): 提升 ValueError( f预期{长度(优化器.参数组)}最小学习率列表,获取{长度(最小学习率)}" ) 自身.默认最小学习率 = 自身.最小学习率列表 = 列表(最小学习率) else: 自身.默认最小学习率 = 最小学习率 自身.最小学习率列表 = [最小学习率] * 长度(优化器.参数组) 自身.耐心 = 耐心 自身.冷却 = 冷却 自身.冷却计数器 = 0 自身.模式 = 模式 自身.阈值 = 阈值 自身.阈值模式 = 阈值模式 自身.最佳: 浮点数 自身.坏周期数: 整型 自身.模式更差: 浮点数 # 选择模式的更差值 自身.eps = eps 自身.最后一个纪元 = 0 自身._最后学习率 = [群组["lr"] for 群组 自身.优化器.参数组] 自身.初始化更好( 模式=模式, 阈值=阈值, 阈值模式=阈值模式 ) 自身.重置() 定义 重置(自身): "重置 num_bad_epochs 计数器和冷却计数器。" 自身.最佳 = 自身.模式更差 自身.凉却计数器 = 0 自身.恶劣时代数量 = 0
[文档] def step(self, metrics: SupportsFloat, epoch=None): # type: ignore[override] """执行一步。” 将 `metrics` 转换为浮点数,如果它是一个零维张量 current = float(metrics) 如果 epoch 为 None: epoch = self.last_epoch + 1 else: warnings.warn(EPOCH_DEPRECATION_WARNING, UserWarning) self.last_epoch = epoch if self.is_better(current, self.best): self.best = 当前 self.num_bad_epochs = 0 else: self.num_bad_epochs += 1 if self.in_cooldown: self.cooldown_counter -= 1 self.num_bad_epochs = 0 # 忽略冷却期间任何不良的 epoch if self.num_bad_epochs > self.patience: self._reduce_lr(epoch) self.cooldown_counter = self.cooldown self.num_bad_epochs = 0 self._last_lr = [group["lr"] for group in self.optimizer.param_groups]
定义
减少学习率(自身, 训练轮数): 如果 长度(自身.优化器.参数组) != 长度(自身.最小学习率): 如果 自身.默认最小学习率 : 提升 运行时错误( `optimizer`中的参数组数量 f“(”{长度(自身.优化器.参数组)}) 区别于 f从`ReduceLROnPlateau`初始化时起 f“(”{长度(自身.min_lrs)}), 通常是由于添加了新的 " "参数组到优化器中。请 " "修改 `min_lrs` 字段以匹配长度 " `优化器`参数组。 ) else: 自身.最小学习率。 = [自身.默认最小学习率。] * 长度(自身.优化器.参数组) for i, 参数组。 列举(自身.优化器.参数组): 旧学习率 = float(参数组["lr"]) 新学习率 = 最大值(旧学习率 * 自身.因子, 自身.最小学习率[i]) 如果 旧学习率 - 新学习率 > 自身.eps: 参数组["lr"] = 新_lr @property 定义 在冷却中(自身): # noqa: D102 返回 自身.冷却倒计时 > 0 定义 更好(自身, a, 最佳): # noqa: D102 如果 自身.模式 == 最小 自身.阈值模式 == "相关": 相关 epsilon = 1.0 - 自身.阈值 返回 a < 最好的 * 相对误差 elif 自身.模式 == "最小" 自身.阈值模式 == "abs": 返回 a < 最好的 - 自身.阈值 elif 自身.模式 == "最大" 自身.阈值模式 == 相对: 相对容差 = 自身.阈值 + 1.0 返回 a > 最佳 * 相对容差 else: # 模式 == 'max' 且 epsilon_mode == 'abs': 返回 a > 最佳 + 自身.阈值 定义 _初始化更好(自身, 模式, 阈值, 阈值模式): 如果 模式 {min, 最大}: 提升 ValueError("模式 " + 模式 + "是未知的!") 如果 阈值模式 {"rel", 绝对}: 提升 ValueError("阈值模式 " + 阈值模式 + "是未知的!") 如果 模式 == min: 自身.模式更差 = else: # 模式 == 'max': 自身.模式更差 = - 自身.模式 = 模式 自身.阈值 = 阈值 自身.阈值模式 = 阈值模式 定义 状态字典(自身): # 无需注意:D102 返回 { : for , 自身.字典.项目() 如果 key != 优化器 }
[docs] def 加载状态字典(self, 状态字典): 加载调度器的状态。 self.__dict__.update(state_dict) self._init_is_better( mode=self.mode, threshold=self.threshold, threshold_mode=self.threshold_mode )
[文档] 循环学习率(LRScheduler): r设置每个参数组的学习率,根据循环学习率策略(CLR)。 该策略以恒定频率在两个边界之间循环学习率 如论文《循环学习率训练神经网络》中详细描述的_。 两次边界之间的距离可以在每次迭代或每个周期的基础上进行缩放。 周期性学习率策略在每个批次之后改变学习率。 在使用批次进行训练后应调用`step`。 `step`应在每个批次使用后调用。 本类具有三种内置策略,如论文所述: * "三角形": 基本三角形循环,不进行振幅缩放。 * "三角形 2": 基本三角形循环,每次循环将初始振幅缩小一半。 * "指数范围": 一个循环,每次循环将初始振幅缩放为 :math:`\text{gamma}^{\text{循环迭代次数}}`。 在每个循环迭代中。 此实现改编自 github 仓库:`bckenstler/CLR`_。 参数: 优化器(Optimizer):包装优化器。 base_lr(浮点数或列表):初始学习率,即 周期中每个参数组的下限边界。 max_lr(浮点数或列表):周期中每个参数组的上限学习率边界。 功能上,它定义了周期振幅(max_lr - base_lr)。 它定义了周期振幅(max_lr - base_lr)。 任何周期的 lr 都是基础 lr 的总和 以及振幅的一些缩放;因此 最大 lr 可能实际上并未达到,这取决于 缩放函数。 step_size_up (int): 每次训练迭代的步长。默认值:2000 increasing half of a cycle. Default: 2000 step_size_down (int): 周期减少一半的训练迭代次数。如果 step_size_down 为 None, decreasing half of a cycle. If step_size_down is None, 它被设置为 step_size_up。默认:无 模式(str):{三角形,三角形 2,exp_range}之一。 值对应于上面详细说明的策略。 如果 scale_fn 不为 None,则忽略此参数。 默认: '三角形' gamma (浮点数): 'exp_range' 缩放函数中的常数: gamma**(循环迭代次数) 默认: 1.0 scale_fn(函数):自定义缩放策略,由单个参数 lambda 函数定义 其中 对于所有 x >= 0,0 <= scale_fn(x) <= 1 如果指定,则忽略'mode' 默认:None scale_mode (str): {'cycle', 'iterations'}. 定义 scale_fn 是否在周期数或周期迭代(训练 开始以来的迭代次数)上评估。 周期迭代次数。 默认:'周期' cycle_momentum (bool): 如果 ``True``,动量将反向循环 将学习率设置为介于 'base_momentum' 和 'max_momentum' 之间。 默认: True 基础动量(浮点数或列表):循环中的低动量边界 对于每个参数组。注意动量是反向循环的 学习率;在周期峰值时,动量是 'base_momentum' 和学习率是 'max_lr'。 默认:0.8 max_momentum(浮点数或列表):循环的上限动量 对于每个参数组。功能上, 它定义了循环振幅(max_momentum - base_momentum)。 任何循环的动量是 max_momentum 与 base_momentum 之差 并且对振幅进行一些缩放;因此 基础动量可能实际上无法达到,这取决于 缩放函数。注意动量是反向循环的 学习率;在周期开始时,动量是 'max_momentum' 并且学习率是 'base_lr' 默认:0.9 last_epoch(整型):最后一个批次的索引。此参数在 恢复训练作业。由于`step()`应该在每次批量之后调用,而不是在每个 epoch 之后调用,因此这个数字代表计算的总批次数,而不是计算的总 epoch 数。 当 last_epoch=-1 时,调度从开始处启动。 这个数字代表计算的总批次数,而不是计算的总 epoch 数。 当 last_epoch=-1 时,调度从开始处启动。 默认:-1 示例: >>> # xdoctest: +SKIP >>> 优化器 = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9) >>> 调度器 = torch.optim.lr_scheduler.CyclicLR(优化器, base_lr=0.01, max_lr=0.1) >>> 数据加载器 = torch.utils.data.DataLoader(...) >>> for epoch in range(10): >>> for batch in data_loader: >>> train_batch(...) >>> scheduler.step() .. _循环学习率训练神经网络: https://arxiv.org/abs/1506.01186 .. _bckenstler/CLR: https://github.com/bckenstler/CLR "文档" 定义 __init__( 自身, 优化器: 优化器, 基础学习率: 并集[float, 列表[float]], 最大学习率: 并集[float, 列表[float]], 步长增加: 整型 = 2000, 步长减少: 可选[int] = , 模式: 直接[三角形, 三角形 2, "exp_range"] = "triangular", γ射线: 浮点数 = 1.0, scale_fn: 可选[可调用[[float] float]] = , scale_mode: 直接["周期", "迭代"] = "周期", 周期动力: 布尔 = , 基础动量: 浮点数 = 0.8, 最大动量: 浮点数 = 0.9, 最后一个 epoch: 整型 = -1, ): # 无需注意:D107 添加优化器 如果 isinstance(优化器, 优化器): 提升 类型错误(f"{类型(优化器).__name__}不是一个优化器) 自身.优化器 = 优化器 基础学习率 = _格式参数(基础学习率, 优化器, 基础学习率) 如果 最后一个 epoch == -1: for 学习率, 群组 zip(基础学习率列表, 优化器.参数组): 如果 isinstance(群组["lr"] 张量): lr_val = 学习率.项目() 如果 isinstance(学习率, 张量) 否则 左右 群组["lr"].填充_(lr_val) else: 群组["lr"] = 左右 自身.最大学习率 = _格式参数("最大学习率", 优化器, 最大学习率) 步长增加 = float(步长增加) 步长减小 = ( float(步长减小) 如果 步长减小 否则 步长增加 ) 自身.总大小 = 步长增加 + 步长减少 自身.步长比 = 步长增加 / 自身.总大小 如果 模式 [三角形, 三角形 2, "exp_range"] scale_fn : 提升 ValueError("模式无效且 scale_fn 为空") 自身.模式 = 模式 自身.伽玛 = gamma 自身._scale_fn_ref: 可调用[[float] float] 自身._scale_fn_custom = scale_fn 自身.缩放模式 = 缩放模式 自身._初始化缩放函数() 自身.循环动量 = 循环动量 如果 循环动量: 如果 ( 动量 优化器.默认 贝塔值 优化器.默认 ): 提升 ValueError( "优化器必须支持启用 `cycle_momentum` 选项的动量或 beta1" ) 自身.use_beta1 = "beta 值" 自身.优化器.默认 自身.基础动量 = 格式参数( 基础动量, 优化器, 基础动量 ) 自身.最大动量 = _格式参数("最大动量", 优化器, 最大动量) 如果 上一个纪元 == -1: for m 动量, b 动量, zip( 自身.最大动量, 自身.基础动量, 优化器.参数组 ): 如果 自身.使用 beta1: 群组[贝塔] = (动量_m, *群组[贝塔] [1]) else: 群组[动量] = 动量_m 群组[最大动量] = 动量_m 群组[基础动量] = b 动量 超级().__init__(优化器, 最后一个纪元) 自身.基础_lrs = 基础_lrs 定义 _初始化缩放函数(自身): 如果 自身._自定义缩放函数 : 返回 如果 自身.模式 == 三角形: 自身._scale_fn_ref = 自身._triangular_scale_fn 自身.缩放模式 = "循环" elif 自身.模式 == "三角形 2": 自身._scale_fn_ref = 自身._triangular2_scale_fn 自身.缩放模式 = 循环 elif 自身.模式 == 指数范围: 自身._缩放函数引用_ = 偏函数(自身._exp_range_scale_fn, 自身.伽马) 自身.缩放模式 = "迭代次数"
[文档] def scale_fn(self, x) -> float: 获取缩放策略。 if self._scale_fn_custom is not None: 返回 self._scale_fn_custom(x) else: return self._scale_fn_ref(x) # 静态方法
@staticmethod 定义 _triangular_scale_fn(x: float) 输入文本: -> 翻译: -> float: 返回 1.0 @staticmethod 定义 _triangular2_scale_fn(x: float) 输入文本: -> 翻译: -> float: 返回 1 / (2.0 ** (x - 1)) @staticmethod 定义
_exp_range_scale_fn(伽马: float, x: float) 输入文本: -> 翻译: -> float: 返回 伽马**x
[文档] def get_lr(self): """计算批处理索引处的学习率。 该函数将 `self.last_epoch` 视为最后一个批处理索引。 如果 `self.cycle_momentum` 为 ``True``,则此函数具有副作用, 更新优化器的动量。 """ _warn_get_lr_called_within_step(self) cycle = math.floor(1 + self.last_epoch / self.total_size) x = 1.0 + self.last_epoch / self.total_size - cycle if x <= self.step_ratio: scale_factor = x / self.step_ratio else: scale_factor = (x - 1) / (self.step_ratio - 1) lrs = [] for base_lr, max_lr in zip(self.base_lrs, self.max_lrs): base_height = (max_lr - base_lr) * scale_factor 如果 self.scale_mode 等于 "cycle": lr = base_lr + base_height * self.scale_fn(cycle) 否则: lr = base_lr + base_height * self.scale_fn(self.last_epoch) lrs.append(lr) if self.cycle_momentum: momentums = [] for base_momentum, max_momentum in zip( self.base_momentums, self.max_momentums ): base_height = (max_momentum - base_momentum) * scale_factor if self.scale_mode == "cycle": 动量 = 最大动量 - 基础高度 * self.scale_fn(cycle) else: 动量 = 最大动量 - 基础高度 * self.scale_fn( self.last_epoch ) momentums.append(momentum) for param_group, momentum in zip(self.optimizer.param_groups, momentums): if self.use_beta1: param_group["betas"] = (momentum, *param_group["betas"][1:]) else: param_group["momentum"] = momentum return lrs
定义
状态字典(自身): # 无需注意:D102 状态 = 超级().状态字典() 我们正在删除 `_scale_fn_ref` 属性,因为它是一个 `weakref.WeakMethod` 不能被序列化。 状态.流行(_scale_fn_ref, ) fn = 状态.流行(_scale_fn_custom) 状态[_scale_fn_custom] = 如果 fn isinstance(函数, 类型.函数类型): # _scale_fn_custom 仅当其为可调用对象时才会保存 # 如果它是一个函数或 lambda,则不会保存 状态["_scale_fn_custom"] = 函数.字典.复制() 返回 状态
[文档] def load_state_dict(self, state_dict): """加载调度器的状态。""" fn = state_dict.pop("_scale_fn_custom") super().load_state_dict(state_dict) if fn is not None: self._scale_fn_custom.__dict__.update(fn) self._init_scale_fn()
[文档] 余弦退火重启(学习率调度器): r使用余弦退火计划设置每个参数组的学习率。 其中 :math:`\eta_{max}` 设置为初始 lr,:math:`T_{cur}` 是自上次重启以来的 epoch 数,:math:`T_{i}` 是第 两次热重启之间的 SGDR 时代数: .. math:: \eta_t = \eta_{min} + \frac{1}{2}(\eta_{max} - \eta_{min})\left(1 + \cos\left(\frac{T_{cur}}{T_{i}}\pi\right)\right) 当 :math:`T_{cur}=T_{i}` 时,设置 :math:`\eta_t = \eta_{min}`。 当 :math:`T_{cur}=0` 后重启时,设置 :math:`:eta_t=:eta_{max}`。 已有提议在 `SGDR:带有 Warm Restarts 的随机梯度下降`_。 参数: 优化器(Optimizer):包装的优化器。 T_0 (int): 首次重启前的迭代次数。 T_mult (int, optional): 重启后 :math:`T_{i}` 增加的倍数。默认:1。 eta_min (float, optional): 最小学习率。默认:0。 last_epoch (int, optional): 最后一个纪元的索引。默认:-1。 SGDR:随机梯度下降与预热重启 https://arxiv.org/abs/1608.03983 "文档" 定义 __init__( 自身, 优化器: 优化器, T_0: int, T_mult: 整型 = 1, eta_min: 浮点数 = 0.0, 最后一个纪元: 整型 = -1, ): # 无需注意:D107 如果 T_0 <= 0 或者 isinstance(T_0, int): 提升 ValueError(f预期得到正整数 T_0,但得到了{T_0}") 如果 T_mult < 1 或者 isinstance(T_mult, int): 提升 ValueError(f预期整数 T_mult >= 1,但得到{T_mult}") 如果 isinstance(eta_min, (float, int)): 提升 ValueError( f预期输入浮点数或整数 eta_min,但得到{eta_min}的类型{类型(eta_min)}" ) 自身.T_0 = T_0 自身.T_i = T_0 自身.T_多 = T_多 自身.eta_最小 = eta_最小 自身.T_cur = 最后一个纪元 超级().__init__(优化器, 最后一个纪元)
[文档] def get_lr(self): """计算初始学习率。""" _warn_get_lr_called_within_step(self) return [ self.eta_min + (base_lr - self.eta_min) * (1 + cos(π * self.T_cur / self.T_i)) / 2 for base_lr in self.base_lrs ]
[文档] 定义 步长(自身, 埃波克=): 步骤可以在每次批量更新后调用。 示例: >>> # xdoctest: +SKIP("未定义的变量") >>> 调度器 = CosineAnnealingWarmRestarts(optimizer, T_0, T_mult) >>> iters = len(dataloader) >>> for epoch in range(20): >>> for i, sample in enumerate(dataloader): >>> inputs, labels = sample['inputs'], sample['labels'] >>> optimizer.zero_grad() >>> outputs = net(inputs) >>> loss = criterion(outputs, labels) >>> loss.backward() >>> 优化器步骤() >>> 调度器步骤(epoch + i / iters) 此函数可以交错调用。 示例: >>> # xdoctest: +SKIP("未定义变量") >>> 调度器 = CosineAnnealingWarmRestarts(optimizer, T_0, T_mult) >>> for epoch in range(20): >>> 调度器.step() >>> 调度器.step(26) >>> 调度器步骤() # 调度器步骤(27),而不是调度器(20) "文档" 如果 训练轮次 自身.最后的周期 < 0: 训练轮次 = 0 如果 训练轮次 : 埃波克 = 自身.最后的周期 + 1 自身.当前温度 = 自身.当前温度 + 1 如果 自身.当前温度 >= 自身.T_i: 自身.T_cur = 自身.T_cur - 自身.T_i 自身.T_i = 自身.T_i * 自身.T_mult else: 如果 epoch < 0: 提升 ValueError(f预期得到非负的纪元,但得到了{纪元}") 如果 纪元 >= 自身.T_0: 如果 自身.T_mult == 1: 自身.T_cur = 埃波克 % 自身.T_0 else: n = int( 数学.日志( (埃波克 / 自身.T_0 * (自身.T_多 - 1) + 1), 自身.T_多 ) ) 自身.T_当前 = 埃波克 - 自身.T_0 * (自身.T_多**n - 1) / ( 自身.T_多 - 1 ) 自身.T_i = 自身.T_0 * 自身.T_mult ** (n) else: 自身.T_i = 自身.T_0 自身.T_cur = 时间纪元 自身.最后的周期 = 数学.向下取整(时间纪元) 启用获取学习率调用(自身): for 参数组, 左右 zip(自身.优化器.参数组, 自身.获取学习率()): 参数组["lr"] = 左右 自身._最后学习率 = [群组["lr"] for 群组 自身.优化器.参数组]
_调度阶段(类型字典): 结束步骤: 浮点数 开始学习率: 字符串 结束学习率: 字符串 启动动量: 字符串 结束动量: 字符串
[文档] OneCycleLR(学习率调度器): r根据一周期学习率策略设置每个参数组的 学习率。 1cycle 策略将学习率从初始学习率调整到某个最大值 学习率然后从那个最大学习率到某个最小学习率 低于初始学习率。 此策略最初在论文《超快速收敛:》中描述 使用大学习率快速训练神经网络 1cycle 学习率策略在每个批次后改变学习率 在使用批次进行训练后应调用`step` 此调度器不可链式调用 注意,循环中的总步数可以通过两种方式确定(按优先级列出): #. 明确提供 total_steps 的值。 #. 提供一个 epoch 数和一个每个 epoch 的步数。 #. 提供一个 epoch 数和一个每个 epoch 的步数。 步骤数(steps_per_epoch)已提供。 在这种情况下,总步数是通过推断得出的。 total_steps = epochs * steps_per_epoch 您必须提供 total_steps 的值或提供 steps_per_epoch 和 epochs 两个值。 时期和每个时期的步数。 此调度器的默认行为遵循 fastai 的 1cycle 实现,声称“未发表的工作通过仅使用两个阶段就取得了更好的结果”。为了模仿原始论文的行为,请设置`three_phase=True`。 有关“未发表的工作通过仅使用两个阶段就取得了更好的结果”的说法。为了模仿原始论文的行为,请设置`three_phase=True`。 代替,请设置`three_phase=True`。 参数: 优化器(Optimizer):包装优化器。 max_lr (浮点数或列表):循环中的上限学习率边界。 对于每个参数组。 total_steps (整数):循环中的总步数。注意, 如果此处未提供值,则必须通过提供来推断 一个用于 epochs 和 steps_per_epoch 的值。 默认:None 训练的轮数(int):用于训练的轮数。这用于 使用 steps_per_epoch 来推断周期中的总步数 如果未提供 total_steps 的值。 默认:None 每轮训练的步数(整数):每轮训练的步数。 与 epochs 一起使用,以推断总步数 循环,如果未提供 total_steps 的值。 默认:None 周期中(以步数为单位)花费在增加学习率的百分比。 默认值:0.3 默认:0.3 anneal_strategy (str): {'cos', 'linear'} 指定退火策略:“cos”代表余弦退火,“linear”代表线性退火。 线性退火。 默认:'cos' cycle_momentum (bool): 如果 ``True``,动量将反向循环 将学习率设置为介于 'base_momentum' 和 'max_momentum' 之间。 默认: True 基础动量(浮点数或列表):循环中的低动量边界 对于每个参数组。注意动量是反向循环的 学习率;在周期的峰值,动量是 '基础动量'和'最大学习率'。 默认:0.85 max_momentum(浮点数或列表):周期中的上限动量边界 对于每个参数组。功能上, 它定义了周期振幅(最大动量 - 基础动量)。 请注意,动量是反向循环的 学习率;在周期开始时,动量是 'max_momentum' 并且学习率是 'base_lr' 默认:0.95 div_factor(浮点数):通过 div_factor 确定初始学习率 initial_lr = max_lr/div_factor 默认:25 final_div_factor(浮点数):通过 final_div_factor 确定最小学习率 min_lr = initial_lr/final_div_factor 默认:1e4 三相(bool):如果为 ``True``,则使用调度第三个阶段来消除 学习率根据 'final_div_factor' 而不是修改第二个阶段 (前两个阶段将关于 'pct_start' 指示的步长对称) last_epoch (int): 最后一个批次的索引。当恢复训练任务时,此参数被使用。由于`step()`应该在每次批次之后调用,而不是在每个 epoch 之后调用,因此这个数字代表计算的总批次数,而不是计算的总 epoch 数。 resuming a training job. Since `step()` should be invoked after each batch instead of after each epoch, this number represents the total number of *batches* computed, not the total number of epochs computed. 恢复训练任务时。由于`step()`应该在每次批次之后调用,而不是在每个 epoch 之后调用,因此这个数字代表计算的总批次数,而不是计算的总 epoch 数。 batch instead of after each epoch, this number represents the total number of *batches* computed, not the total number of epochs computed. 而不是在每个 epoch 之后调用,因此这个数字代表计算的总批次数,而不是计算的总 epoch 数。 number of *batches* computed, not the total number of epochs computed. 计算的总批次数,而不是计算的总 epoch 数。 当 last_epoch=-1 时,计划将从开始处启动。 默认:-1 示例: >>> # xdoctest: +SKIP >>> data_loader = torch.utils.data.DataLoader(...) >>> optimizer = torch.optim.SGD(model.parameters(), lr=1e-4, momentum=0.9) >>> 调度器 = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.01, steps_per_epoch=len(data_loader), epochs=10) >>> for epoch in range(10): >>> for batch in data_loader: >>> train_batch(...) >>> 优化器步骤() >>> 调度器步骤() .. _超级收敛:使用大学习率的神经网络快速训练: https://arxiv.org/abs/1708.07120 "文档" 定义 __init__( 自身, 优化器: 优化器, 最大学习率: 并集[float, 列表[float]], 总步数: 可选[int] = , 迭代次数: 可选[int] = , 每个迭代的步数: 可选[int] = , 开始百分比: 浮点数 = 0.3, 退火策略: 直接[余弦, 线性] = 余弦, 循环动量: 布尔值 = , 基础动量: 并集[float, 列表[float]] = 0.85, 最大动量: 并集[float, 列表[float]] = 0.95, 除数因子: 浮点数 = 25.0, 最终除数因子: 浮点数 = 10000, 三相: 布尔值 = 错误, 最后一个 epoch: 整型 = -1, ): # 无需注意:D107 # 验证优化器 如果 isinstance(优化器, 优化器): 提升 类型错误(f"{类型(优化器).__name__}不是一个优化器") 自身.优化器 = 优化器 # 验证总步数 如果 总步数 : 如果 总步数 <= 0 或者 isinstance(总步数, int): 提升 ValueError( f预期得到正整数总步数,但得到{总步数}" ) 自身.总步数 = 总步数 elif 迭代次数 每个 epoch 的步数 : 如果 isinstance(轮数, int) 或者 周期 <= 0: 提升 ValueError(f预期得到正整数周期,但得到了{周期}") 如果 isinstance(每个周期的步数, int) 或者 steps_per_epoch <= 0: 提升 ValueError( f预期得到正整数 steps_per_epoch,但得到了{steps_per_epoch}" ) 自身.total_steps = 周期 * 每个周期的步数 else: 提升 ValueError( "您必须定义 total_steps 或 (epochs AND steps_per_epoch)" ) 自身._schedule_phases: 列表[_阶段安排] 如果 三相: 自身._安排阶段 = [ { 结束步骤: float(pct_start * 自身.total_steps) - 1, "start_lr": "initial_lr", "end_lr": "max_lr", "start_momentum": "max_momentum", "末动量": "基动量", }, { "末步": float(2 * 起始百分比 * 自身.总步数) - 2, "起始学习率": "最大学习率", "结束学习率": "初始学习率", "起始动量": "基础动量", "末动量": "最大动量", }, { "末步": 自身.总步数 - 1, "起始学习率": "初始学习率", "结束学习率": "min_lr", "start_momentum": "max_momentum", "end_momentum": 最大动量, }, ] else: 自身._schedule_phases = [ { 结束步骤: float(开始百分比 * 自身.总步骤数) - 1, "开始学习率": 初始学习率, "结束学习率": "最大学习率", "开始动量": "最大动量", "结束动量": "基础动量", }, { "结束步长": 自身.总步数 - 1, "开始学习率": "最大学习率", "结束学习率": "最小学习率", "启动动量": "基础动量", "结束动量": "最大动量", }, ] 验证 pct_start 如果 pct_start < 0 或者 pct_start > 1 或者 isinstance(pct_start, float): 提升 ValueError( f预期输入介于 0 到 1 之间的浮点数 pct_start,但得到{pct_start}" ) # 验证 anneal_strategy 如果 anneal_strategy [余弦, 线性]: 提升 ValueError( f"anneal_strategy 必须为'cos'或'linear'之一,而不是得到了"{anneal_strategy}" ) else: 自身._anneal_func_type = 热处理策略 # 初始化学习率变量 最大学习率 = _格式参数("最大学习率", 自身.优化器, 最大学习率) 如果 最后的周期 == -1: for 索引, 群组 列举(自身.优化器.参数组): 群组[初始学习率] = 最大学习率 s[索引] / 分解因子 群组["最大学习率"] = 最大学习率列表[索引] 群组["最小学习率"] = 群组[初始学习率] / 最终除数因子 初始化动量变量 自身.循环动量 = 循环动量 如果 自身.循环动量: 如果 ( 动量 自身.优化器.默认 贝塔值 自身.优化器.默认 ): 提升 ValueError( "优化器必须支持启用 `cycle_momentum` 选项的动量或 beta1" ) 自身.use_beta1 = "beta 值" 自身.优化器.默认 最大动量值 = 格式参数(最大动量, 优化器, 最大动量) 基础动量 = _格式参数(基础动量, 优化器, 基础动量) 如果 最后的周期 == -1: for m_动量, 动量, 群组 zip( 最大动量, 基础动量, 优化器.参数组 ): 如果 .使用_beta1: 群组[贝塔] = (m_momentum, *群组[贝塔] [1]) else: 群组[动量] = 动量_m 群组[最大动量] = 动量_m 群组[基础动量] = b_momentum 动量 超级().__init__(优化器, 上一个纪元) 定义 _anneal_func(自身, *参数, **kwargs): 如果 有属性(自身, "_anneal_func_type"): 如果 自身._anneal_func_type == 余弦: 返回 自身._annealing_cos(*参数, **kwargs) elif 自身._anneal_func_type == 线性: 返回 自身._annealing_linear(*参数, **kwargs) else: 提升 ValueError(f"未知 _anneal_func_type: "{自身._anneal_func_type}") else: # 为 BC 返回 自身.热处理函数(*参数, **kwargs) # 类型: 忽略[attr-defined] @staticmethod 定义 _余弦退火(开始, 结束, 百分比): 随着百分比从 0.0 增加到 1.0,余弦衰减从`start`到`end`。 cos_out = 数学.余弦(数学.圆周率 * 百分比) + 1 返回 末端 + (开始 - 结束) / 2.0 * cos_out @staticmethod 定义 _annealing_linear(开始, 结束, 百分比): "从 0.0 到 1.0 按百分比线性退火至`start`到`end`。" 返回 (末端 - 开始) * 百分比 + 开始
[文档] def get_lr(self): """计算每个参数组的学习率。""" _warn_get_lr_called_within_step(self) lrs = [] step_num = self.last_epoch if step_num > self.total_steps: raise ValueError( f"尝试步进 {step_num} 次。指定的总步数为 {self.total_steps}" # noqa: UP032 ) for group in self.optimizer.param_groups: start_step = 0.0 for i, phase in enumerate(self._schedule_phases): end_step = phase["end_step"] if step_num <= end_step or i == len(self._schedule_phases) - 1: pct = (step_num - start_step) / (end_step - start_step) computed_lr = self._anneal_func( group[phase["start_lr"]], group[phase["end_lr"]], pct ) 如果 self.cycle_momentum: computed_momentum = self._anneal_func( group[阶段["起始动力"]] group[阶段["结束动力"]] 百分比 ) break start_step = phase["end_step"] lrs.append(computed_lr) # type: ignore[possibly-undefined] if self.cycle_momentum: if self.use_beta1: group["betas"] = (computed_momentum, *group["betas"][1:]) # type: ignore[possibly-undefined] else: group[ 动量 ] = 计算动量 # type: ignore[possibly-undefined] 返回 lrs

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源