备注
点击此处下载完整示例代码
神经切线核 ¶
创建时间:2025 年 4 月 1 日 | 最后更新时间:2025 年 4 月 1 日 | 最后验证:未验证
神经切线核(NTK)是一种描述神经网络在训练过程中如何演化的核。近年来,围绕它进行了大量研究。本教程受 JAX 中 NTK 实现(见快速有限宽度神经切线核以获取详细信息)的启发,演示了如何使用 torch.func
、可组合函数转换轻松计算此量,适用于 PyTorch。
备注
本教程需要 PyTorch 2.0.0 或更高版本。
设置
首先,进行一些设置。让我们定义一个简单的 CNN,我们希望计算其 NTK。
import torch
import torch.nn as nn
from torch.func import functional_call, vmap, vjp, jvp, jacrev
device = 'cuda' if torch.cuda.device_count() > 0 else 'cpu'
class CNN(nn.Module):
def __init__(self):
super(CNN, self).__init__()
self.conv1 = nn.Conv2d(3, 32, (3, 3))
self.conv2 = nn.Conv2d(32, 32, (3, 3))
self.conv3 = nn.Conv2d(32, 32, (3, 3))
self.fc = nn.Linear(21632, 10)
def forward(self, x):
x = self.conv1(x)
x = x.relu()
x = self.conv2(x)
x = x.relu()
x = self.conv3(x)
x = x.flatten(1)
x = self.fc(x)
return x
生成一些随机数据吧
x_train = torch.randn(20, 3, 32, 32, device=device)
x_test = torch.randn(5, 3, 32, 32, device=device)
创建模型的函数版本 ¶
torch.func
转换操作在函数上执行。特别是为了计算 NTK,我们需要一个函数,该函数接受模型的参数和单个输入(而不是输入批次!)并返回单个输出。
我们将使用 torch.func.functional_call
,它允许我们使用不同的参数/缓冲区调用 nn.Module
,以帮助完成第一步。
请记住,该模型最初是为了接受一批输入数据点而编写的。在我们的 CNN 示例中,没有批间操作。也就是说,批中的每个数据点与其他数据点独立。基于这个假设,我们可以轻松地生成一个评估单个数据点的函数:
net = CNN().to(device)
# Detaching the parameters because we won't be calling Tensor.backward().
params = {k: v.detach() for k, v in net.named_parameters()}
def fnet_single(params, x):
return functional_call(net, params, (x.unsqueeze(0),)).squeeze(0)
计算 NTK:方法 1(雅可比收缩)
我们已经准备好计算经验 NTK。对于两个数据点 \(x_1\) 和 \(x_2\) 的经验 NTK 定义为模型在 \(x_1\) 处评估的雅可比矩阵与模型在 \(x_2\) 处评估的雅可比矩阵的矩阵乘积:
在批处理情况下,当 \(x_1\) 是数据点的批次且 \(x_2\) 是数据点的批次时,我们希望计算所有从 \(x_1\) 和 \(x_2\) 的数据点组合的雅可比矩阵的乘积。
第一种方法就是这样做——计算两个雅可比矩阵,并将它们收缩。下面是如何在批处理情况下计算 NTK 的方法:
def empirical_ntk_jacobian_contraction(fnet_single, params, x1, x2):
# Compute J(x1)
jac1 = vmap(jacrev(fnet_single), (None, 0))(params, x1)
jac1 = jac1.values()
jac1 = [j.flatten(2) for j in jac1]
# Compute J(x2)
jac2 = vmap(jacrev(fnet_single), (None, 0))(params, x2)
jac2 = jac2.values()
jac2 = [j.flatten(2) for j in jac2]
# Compute J(x1) @ J(x2).T
result = torch.stack([torch.einsum('Naf,Mbf->NMab', j1, j2) for j1, j2 in zip(jac1, jac2)])
result = result.sum(0)
return result
result = empirical_ntk_jacobian_contraction(fnet_single, params, x_train, x_test)
print(result.shape)
在某些情况下,你可能只想得到这个量的对角线或迹,特别是如果你事先知道网络架构会导致非对角元素可以近似为零的 NTK。很容易调整上述函数来实现这一点:
def empirical_ntk_jacobian_contraction(fnet_single, params, x1, x2, compute='full'):
# Compute J(x1)
jac1 = vmap(jacrev(fnet_single), (None, 0))(params, x1)
jac1 = jac1.values()
jac1 = [j.flatten(2) for j in jac1]
# Compute J(x2)
jac2 = vmap(jacrev(fnet_single), (None, 0))(params, x2)
jac2 = jac2.values()
jac2 = [j.flatten(2) for j in jac2]
# Compute J(x1) @ J(x2).T
einsum_expr = None
if compute == 'full':
einsum_expr = 'Naf,Mbf->NMab'
elif compute == 'trace':
einsum_expr = 'Naf,Maf->NM'
elif compute == 'diagonal':
einsum_expr = 'Naf,Maf->NMa'
else:
assert False
result = torch.stack([torch.einsum(einsum_expr, j1, j2) for j1, j2 in zip(jac1, jac2)])
result = result.sum(0)
return result
result = empirical_ntk_jacobian_contraction(fnet_single, params, x_train, x_test, 'trace')
print(result.shape)
这种方法的渐近时间复杂度是 \(N O [FP]\)(计算雅可比矩阵的时间)+ \(N^2 O^2 P\)(收缩雅可比矩阵的时间),其中 \(N\) 是 \(x_1\) 和 \(x_2\) 的批大小,\(O\) 是模型的输出大小,\(P\) 是总参数数,\([FP]\) 是模型单次前向传播的成本。有关详细信息,请参阅第 3.2 节“快速有限宽度神经切线核”。
计算 NTK:方法 2(NTK 向量积)¶
我们接下来要讨论的方法是使用 NTK 向量积来计算 NTK 的一种方式。
此方法将 NTK 重新表述为对单位矩阵 \(I_O\) 的列应用 NTK 向量积的堆栈(其中 \(O\) 是模型的输出大小):
\(e_o\)属于\(\mathbb{R}^O\)的列向量,是单位矩阵\(I_O\)的列向量。
设\(\textrm{vjp}_o = J_{net}^T(x_2) e_o\)。我们可以使用向量-雅可比乘积来计算这个值。
现在,考虑\(J_{net}(x_1) \textrm{vjp}_o\)。这是一个雅可比-向量乘积!
最后,我们可以使用
vmap
在\(I_O\)的所有列\(e_o\)上并行运行上述计算。
这表明我们可以使用反向模式 AD(用于计算向量-雅可比乘积)和正向模式 AD(用于计算雅可比-向量乘积)的组合来计算 NTK。
让我们来编写代码:
def empirical_ntk_ntk_vps(func, params, x1, x2, compute='full'):
def get_ntk(x1, x2):
def func_x1(params):
return func(params, x1)
def func_x2(params):
return func(params, x2)
output, vjp_fn = vjp(func_x1, params)
def get_ntk_slice(vec):
# This computes ``vec @ J(x2).T``
# `vec` is some unit vector (a single slice of the Identity matrix)
vjps = vjp_fn(vec)
# This computes ``J(X1) @ vjps``
_, jvps = jvp(func_x2, (params,), vjps)
return jvps
# Here's our identity matrix
basis = torch.eye(output.numel(), dtype=output.dtype, device=output.device).view(output.numel(), -1)
return vmap(get_ntk_slice)(basis)
# ``get_ntk(x1, x2)`` computes the NTK for a single data point x1, x2
# Since the x1, x2 inputs to ``empirical_ntk_ntk_vps`` are batched,
# we actually wish to compute the NTK between every pair of data points
# between {x1} and {x2}. That's what the ``vmaps`` here do.
result = vmap(vmap(get_ntk, (None, 0)), (0, None))(x1, x2)
if compute == 'full':
return result
if compute == 'trace':
return torch.einsum('NMKK->NM', result)
if compute == 'diagonal':
return torch.einsum('NMKK->NMK', result)
# Disable TensorFloat-32 for convolutions on Ampere+ GPUs to sacrifice performance in favor of accuracy
with torch.backends.cudnn.flags(allow_tf32=False):
result_from_jacobian_contraction = empirical_ntk_jacobian_contraction(fnet_single, params, x_test, x_train)
result_from_ntk_vps = empirical_ntk_ntk_vps(fnet_single, params, x_test, x_train)
assert torch.allclose(result_from_jacobian_contraction, result_from_ntk_vps, atol=1e-5)
我们对 empirical_ntk_ntk_vps
的代码看起来是直接从上面的数学公式翻译过来的!这展示了函数变换的强大功能:祝你好运,仅使用 torch.autograd.grad
来编写一个高效的版本吧。
该方法的渐近时间复杂度为\(N^2 O [FP]\),其中\(N\)是\(x_1\)和\(x_2\)的批量大小,\(O\)是模型的输出大小,\([FP]\)是模型单次正向传播的成本。因此,该方法比方法 1(雅可比收缩,\(N^2 O\)而不是\(N O\))执行更多的网络正向传播,但避免了收缩成本(没有\(N^2 O^2 P\)项,其中\(P\)是模型的总参数数)。因此,当\(O P\)相对于\([FP]\)较大时,例如具有许多输出\(O\)的完全连接(非卷积)模型,这种方法更可取。从内存角度来看,两种方法应该相当。有关详细信息,请参阅《快速有限宽度神经切线核》的第 3.3 节。
脚本总运行时间:(0 分钟 0.000 秒)