# mypy: 允许未类型化定义
r随机梯度下降优化器的实现
from 打字
导入
角色,
可选,
联合
导入
火炬
from 火炬
导入
张量
from .优化器
导入 (
默认为融合或遍历,
_检查融合的设备数据类型,
_可区分文档,
_foreach_doc,
_融合文档,
_maximize_doc,
_params_doc,
_use_grad_for_differentiable,
设备字典,
优化器,
参数 T,
)
全部 = [
SGD,
sgd]
[文档]
类 SGD(
优化器): # noqa: D101
定义 __init__(
我,
参数:
参数 T,
学习率:
联盟[float,
张量] =
0.001,
动量:
浮点数 = 0,
阻尼:
浮点数 = 0,
权重衰减:
联盟[float,
张量] = 0,
内斯特罗夫:
布尔值 =
错误,
*,
最大化:
布尔值 =
错误,
foreach: 可选[
布尔] =
无,
可微分的:
布尔值 =
错误,
融合:
可选[
布尔] =
无,
): # 无需注意:D107
如果 isinstance(
学习率,
张量)
和
学习率.
元素数量() != 1:
提升 ValueError(
"Tensor lr 必须是 1 个元素")
如果
左右 < 0.0:
提升 ValueError(f
"无效的学习率:"{
学习率}")
如果
动量 < 0.0:
提升 ValueError(f
"无效的动量值:"{
动量}")
如果
权重衰减 < 0.0:
提升 ValueError(f
"无效的 weight_decay 值:"{
权重衰减}")
默认 =
字典(
学习率=
学习率,
动量=
动量,
阻尼=
衰减,
权重衰减=
权重衰减,
内斯特罗夫=
内斯特罗夫,
最大化=
最大化,
foreach=foreach,
可微分的=
可微分的,
融合=
融合,
)
如果
内斯特罗夫
和 (
动量 <= 0
或者
阻尼 != 0):
提升 ValueError(
"Nesterov 动量需要动量和零阻尼")
超级().__init__(
参数,
默认值)
如果
融合:
我.
_支持_step_supports_amp_scaling =
真实
我.
需要为融合进行设备数据类型检查 =
真实
如果
可微分的:
提升
运行时错误(
"融合不支持可微分")
如果 foreach:
提升
运行时错误(
"融合和 foreach 不能同时为 True。")
定义 __setstate__(
我,
状态):
# 无需注意:D105
超级().__setstate__(
状态)
for 组
在
我.
参数组:
群组.setdefault(
Nesterov,
错误)
群组.setdefault(
最大化,
错误)
群组.setdefault(
foreach,
无)
群组.setdefault(
可微分的,
错误)
群组.setdefault(
融合,
错误)
定义
初始化组(
我,
群组,
参数,
梯度,
动量缓冲区列表):
有稀疏梯度 =
假
for p 在
群组[
参数]:
如果 p.
梯度
是
不
无:
如果
群组[
融合]
和 getattr(
我,
_需要检查融合的设备数据类型,
真实
):
_检查融合的设备数据类型(p)
我.
需要为融合进行设备数据类型检查 =
假
参数.append(p)
梯度.append(p.
研究生)
如果 p.
研究生.is_sparse:
有稀疏梯度 =
真实
如果
群组[
动量] != 0:
状态 =
我.
状态[p]
动量缓冲区列表.append(
状态.
获取(
动量缓冲区))
返回
有稀疏梯度
[文档] @使用梯度进行可微分
def step(self, closure=None):
"""执行单个优化步骤。
Args:
closure (Callable, 可选): 一个重新评估模型的闭包
返回损失。
"""
loss = None
如果 closure 不为 None:
with torch.enable_grad():
loss = closure()
for group in self.param_groups:
params: list[Tensor] = []
grads: list[Tensor] = []
momentum_buffer_list: list[Optional[Tensor]] = []
has_sparse_grad = self._init_group(
group, params, grads, momentum_buffer_list
)
sgd(
params,
grads,
动量缓冲区列表,
weight_decay=group["weight_decay"],
momentum=group["momentum"],
lr=group["lr"],
dampening=group["dampening"]
nesterov=group["nesterov"]
maximize=group["maximize"]
has_sparse_grad=has_sparse_grad
foreach=group["foreach"],
fused=group["fused"],
grad_scale=getattr(self, "grad_scale", None),
found_inf=getattr(self, "found_inf", None),
)
if group["momentum"] != 0:
# 更新状态中的 momentum_buffers
for p, momentum_buffer in zip(params, momentum_buffer_list):
state = self.state[p]
state["momentum_buffer"] = momentum_buffer
return loss
SGD.__doc__ = (
r实现随机梯度下降(可选带有动量)。
.. math::
\begin{aligned}
&\rule{110mm}{0.4pt} \\
&\textbf{输入} : \gamma \text{ (lr)}, \: \theta_0 \text{ (params)}, \: f(\theta)
目标函数,λ(权重衰减)
&\hspace{13mm} \:\mu \text{(动量)}, \:\tau \text{(阻尼)},
\:\textit{ nesterov,}\:\textit{最大化} \\[-1.ex]
&\rule{110mm}{0.4pt} \\
for t=1 到 ... do
g_t ← ∇_θ f_t (θ_{t-1}) \\
&\hspace{5mm}\textbf{if} \: \lambda \neq 0 \\
\(g_t \leftarrow g_t + \lambda \theta_{t-1}\)
&\hspace{5mm}\textbf{如果} \: \mu \neq 0 \\
&\hspace{10mm}\textbf{if} \: t > 1
&\hspace{15mm} \textbf{b}_t \leftarrow \mu \textbf{b}_{t-1} + (1-\tau) g_t
&\hspace{10mm}\textbf{else}
&\hspace{15mm} \textbf{b}_t \leftarrow g_t
&\hspace{10mm}\textbf{if} \: \textit{Nesterov}
&\hspace{15mm} g_t \leftarrow g_{t} + \mu \textbf{b}_t
&\hspace{10mm}\textbf{else}
&\hspace{15mm} g_t \leftarrow \textbf{b}_t
&\hspace{5mm}\textbf{如果} \: \textit{最大化} \\
&\hspace{10mm}\theta_t \leftarrow \theta_{t-1} + \gamma g_t \\[-1.ex]
&\hspace{5mm}\textbf{否则} \\[-1.ex]
&\hspace{10mm}\theta_t \leftarrow \theta_{t-1} - \gamma g_t \\[-1.ex]
&\规则{110 毫米}{0.4 点} \\[-1.ex]
&\bf{return} \: \theta_t \\[-1.ex]
&\规则{110 毫米}{0.4 点} \\[-1.ex]
\end{aligned}
Nesterov 动量基于以下公式
《深度学习中初始化和动量的重要性》__
"文档"
+ rf""
参数:
{_params_doc}
lr (float, Tensor, 可选): 学习率 (默认: 1e-3)
momentum (float, 可选): 动量因子 (默认: 0)
阻尼(浮点数,可选):动量的阻尼(默认:0)
权重衰减(float,可选):权重衰减(L2 惩罚)(默认:0)
nesterov(布尔值,可选):启用 Nesterov 动量。仅当动量非零时适用。
(默认:False)
{_maximize_doc}
{_foreach_doc}
{_可区分文档}
{_融合文档}
"文档"
+ r""
示例:
>>> # xdoctest: +SKIP
>>> 优化器 = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)
>>> optimizer.zero_grad()
>>> loss_fn(model(input), target).backward()
>>> optimizer.step()
__ http://www.cs.toronto.edu/~hinton/absps/momentum.pdf
.. 注意::
SGD 与动量/Neisterov 的实现与 Sutskever 等人以及某些其他框架中的实现略有不同。
与 Sutskever 等人以及某些其他框架中的实现略有不同。
考虑动量这个特定案例,更新可以写成
.. math::
\begin{aligned}
v_{t+1} & = \mu * v_{t} + g_{t+1}, \\
p_{t+1} & = p_{t}文本
左右
v_{t+1},
\end{对齐}
其中 :math:`p`、:math:`g`、:math:`v` 和 :math:`:mu` 分别表示
参数、梯度、速度和动量。
这与 Sutskever 等人的方法形成对比。
其他采用形式更新的框架
.. math::
\begin{对齐}
v_{t+1} & = \mu * v_{t} + \text{lr}* g_{t+1}, \\\
p_{t+1} & = p_{t} - v_{t+1}.
\end{对齐}
涅斯特罗夫版本进行了类似修改。
此外,动量缓冲区的初始值设置为
梯度值在第一步。这与某些其他
初始化为全零的框架。
"文档"
)
定义
优化梯度下降法。(
参数:
列表[
张量
]
d_p_list: 列表[
张量
]
动量缓冲区列表。:
列表[
可选[
张量]],
使用 torchscript 编译的函数不支持带默认值的只写关键字参数问题 #70627
现由 torch/distributed/optim 编译的函数 API 参数
有稀疏梯度:
布尔值 =
错误,
foreach: 可选[
布尔] =
无,
融合:
可选[
布尔] =
无,
梯度缩放:
可选[
张量] =
无,
找到无穷大:
可选[
张量] =
无,
*,
权重衰减: float,
动量: float,
学习率: float,
衰减: float,
内斯特罗夫:
布尔,
最大化:
布尔,
):
r执行 SGD 算法计算的函数式 API。
详细信息请参阅 :class:`~torch.optim.SGD`。
"文档"
当用户输入 False/True 用于 foreach 或 fused 时,请尊重。我们只想更改
# 默认值,当两个都没有被指定时。注意,我们默认为 foreach
# 并传递 False 以使用_fused。这不是错误——我们希望提供融合实现
# 在将其作为默认值之前先进行内嵌时间,即使它通常更快。
如果 foreach
是
无
和 fused
是
无:
# 为什么在 torch.jit.is_scripting 这里必须显式地处理 if 语句?
# 因为 JIT 在脚本化时无法处理可选类型以及复杂的条件语句。
如果
不 torch.
算子.
是否正在脚本化():
融合, foreach =
默认为融合或遍历(
参数,
可微分的=
错误,
使用融合的=
假
)
else:
foreach = 假
fused = 假
如果 foreach
是
无:
foreach = 假
如果 fused
是
无:
fused = 假
如果 foreach
和 torch.
算子.
是否正在脚本化():
提升
运行时错误(
"torch.jit.script 不支持 foreach 优化器")
如果 fused
和 torch.
算子.
是否正在脚本化():
提升
运行时错误(
"torch.jit.script 不支持与融合优化器一起使用")
如果 foreach
和
不 torch.
算子.
是否正在脚本化():
函数 =
多张量 SGD
elif fused 和
不 torch.
算子.
是否正在脚本化():
函数 =
熔合 SGD
else:
函数 =
单张量 SGD
函数(
参数,
d_p_list,
动量缓冲区列表,
权重衰减=
权重衰减,
动量=
动量,
学习率=
学习率,
阻尼=
阻尼,
nesterov=内斯特罗夫,
有稀疏梯度=
有稀疏梯度,
最大化=
最大化,
梯度缩放=
梯度缩放,
找到无穷大=
找到无穷大,
)
定义
单张张量 SGD(
参数:
列表[
张量
]
梯度:
列表[
张量
]
动量缓冲区列表:
列表[
可选[
张量]],
梯度缩放:
可选[
张量
]
找到无穷大:
可选[
张量
]
*,
权重衰减: float,
动量: float,
学习率: float,
衰减: float,
内斯特罗夫:
布尔,
最大化:
布尔,
有稀疏梯度:
布尔,
):
断言
梯度缩放
是
无
和
发现信息
是
无
for i, 参数
在
列举(
参数):
梯度 =
梯度[i]
如果
不
最大化
否则 -
梯度[i]
如果
权重衰减 != 0:
# 嵌套 if 是必要的,以绕过 jitscript 规则
如果 isinstance(
权重衰减,
张量):
如果
权重衰减.
需要梯度:
# 通常这是可微路径,这就是为什么需要 param.clone()的原因
梯度 =
研究生.addcmul_(
参数.
克隆(),
权重衰减)
else:
梯度 =
研究生.
添加(
参数,
阿尔法=
权重衰减)
else:
梯度 =
研究生.
添加(
参数,
阿尔法=
权重衰减)
如果
动量 != 0:
缓冲区 =
动量缓冲列表[i]
如果
缓冲区
是
无:
缓冲区 = torch.
克隆(
研究生).detach()
动量缓冲列表[i] =
缓冲区
else:
缓冲区.mul_(
动量).
加_(
研究生,
阿尔法=1 -
阻尼)
如果
内斯特罗夫:
梯度 =
研究生.
添加(
缓冲区,
阿尔法=
动量)
else:
梯度 =
缓冲区
# 嵌套 if 是必要的,以绕过 jitscript 规则
如果 isinstance(
学习率,
张量):
如果
学习率.
需要梯度:
参数.addcmul_(
研究生,
学习率,
值=-1)
else:
参数.
加_(
研究生,
阿尔法=-
学习率)
else:
参数.
加_(
研究生,
阿尔法=-
学习率)
定义
多张量随机梯度下降(
参数:
列表[
张量
]
梯度:
列表[
张量
]
动量缓冲区列表:
列表[
可选[
张量]],
梯度缩放:
可选[
张量
]
找到无穷大:
可选[
张量
]
*,
权重衰减: float,
动量: float,
学习率: float,
衰减: float,
内斯特罗夫:
布尔,
最大化:
布尔,
有稀疏梯度:
布尔,
):
断言
梯度缩放
是
无
和
发现信息
是
无
如果
长度(
参数) == 0:
返回
分组张量 =
优化器.
按设备类型和数据类型分组张量(
[参数,
梯度,
动量缓冲区列表
]
带索引=
真实 # type: ignore[list-item]
)
for (
设备参数_,
device_grads_,
设备动量缓冲区列表,
), 索引
在
分组张量.
值():
设备参数:
列表[
张量] =
角色(
列表[
张量
]
设备参数_)
设备梯度:
列表[
张量] =
角色(
列表[
张量
] device_grads_)
设备是否有稀疏梯度 =
有稀疏梯度
和
任何(
研究生.is_sparse for
梯度
在
设备梯度
)
如果
最大化:
设备梯度 = torch._foreach_neg(
设备梯度)
# 类型:忽略[赋值]
如果
权重衰减 != 0:
重新使用为最大化分配的中间内存(device_grads)
如果
最大化:
torch._foreach_add_(设备梯度,
设备参数,
阿尔法=
权重衰减)
else:
设备梯度 = torch.
_foreach_add_(
# 类型:忽略[赋值]
设备梯度,
设备参数,
阿尔法=
权重衰减
)
如果
动量 != 0:
缓冲区:
列表[
张量] =
输入文本为空,请提供需要翻译的文本
带动量缓冲区的所有状态 =
真实
for i 在
范围(
长度(
设备动量缓冲区列表)):
如果
设备动量缓冲区列表[i]
是
无:
带动量缓冲区的所有状态 =
假
断开
else:
缓冲区.append(
角色(
张量,
设备动量缓冲区列表[i]))
如果
所有具有动量缓冲区的状态:
torch._foreach_mul_(缓冲区,
动量)
torch._foreach_add_(缓冲区,
设备梯度,
阿尔法=1 -
阻尼)
else:
缓冲区 =
输入文本为空,请提供需要翻译的文本
for i 在
范围(
长度(
设备动量缓冲区列表)):
如果
设备动量缓冲区列表[i]
是
无:
缓冲区 =
设备动量缓冲区列表[i] =
动量缓冲区列表[
索引[i]
] = torch.克隆(
设备梯度[i]).detach()
else:
缓冲区 =
角色(
张量,
设备动量缓冲区列表[i])
缓冲区.mul_(
动量).
加_(
设备梯度[i
]
阿尔法=1 -
阻尼)
缓冲区.append(
缓冲区)
如果 nesterov:
torch._foreach_add_(设备梯度,
缓冲区,
阿尔法=
动量)
else:
设备梯度 =
缓冲区
如果
不
设备具有稀疏梯度:
# 处理内部 item()调用,如果 lr 是一个张量
如果 isinstance(
学习率, torch.
张量)
和 torch.
编译器.is_compiling():
梯度_x_lr = torch._foreach_mul(
设备梯度, -
学习率)
torch._foreach_add_(设备参数,
梯度_x_lr)
else:
torch._foreach_add_(设备参数,
设备梯度,
阿尔法=-
学习率)
else:
# foreach API 不支持稀疏
for i 在
范围(
长度(
设备参数)):
设备参数[i].
加_(
设备梯度[i
]
阿尔法=-
学习率)
定义
混合 SGD(
参数:
列表[
张量
]
梯度:
列表[
张量
]
动量缓冲区列表:
列表[
可选[
张量]],
梯度缩放:
可选[
张量
]
找到无穷大:
可选[
张量
]
*,
权重衰减: float,
动量: float,
学习率: float,
阻尼: float,
nesterov: 布尔,
最大化:
布尔,
有稀疏梯度:
布尔,
)
输入文本:
->
翻译:
-> 无:
如果
不
参数:
返回
如果
有稀疏梯度:
提升
运行时错误(
`_fused_sgd` 不支持稀疏梯度)
梯度缩放字典:
设备字典 = (
{梯度缩放.
设备:
梯度缩放}
如果
梯度缩放
是
不
无
否则 {}
)
发现信息字典:
设备字典 = (
{找到无穷大.
设备:
找到无穷大}
如果
发现信息
是
不
无
否则 {}
)
无动量缓冲区 =
动量 == 0
是否为第一步 = (
所有(t
是
无 for t
在
动量缓冲区列表)
和
不
无动量缓冲区
)
如果
是否为第一步:
for i, g 在
列举(
梯度):
动量缓冲区列表[i] = torch.
空值类似(g)
分组张量 =
优化器.
按设备类型和数据类型分组张量(
[参数,
梯度,
动量缓冲区列表
]
带索引=
假 # type: ignore[list-item]
)
for (设备, _), (
(设备参数_, device_grads_,
设备动量缓冲区列表),
_,
) 在
分组张量.
项目():
设备参数:
列表[
张量] =
角色(
列表[
张量
]
设备参数_)
设备梯度:
列表[
张量] =
角色(
列表[
张量
] device_grads_)
设备梯度学习率,
设备发现信息 =
无,
无
如果
梯度缩放
是
不
无:
设备梯度缩放 =
梯度缩放字典.setdefault(
设备,
梯度缩放.
到(
设备)
)
如果
找到无穷字典
是
不
无
和
发现信息
是
不
无:
设备发现信息 =
发现信息字典.setdefault(
设备,
找到无穷大.
到(
设备))
torch.混合 SGD(
设备参数,
设备梯度,
输入文本为空,请提供需要翻译的文本
如果
无动量缓冲区
否则
角色(
列表[
张量
]
设备动量缓冲区列表),
权重衰减=
权重衰减,
动量=
动量,
学习率=
学习率,
阻尼=
阻尼,
内斯特罗夫=
内斯特罗夫,
最大化=
最大化,
是第一步=
是第一步,
梯度缩放=
设备梯度学习率,
找到无穷大=
设备已找到,
)