备注
点击此处下载完整示例代码
使用 Ray Tune 进行超参数调优 ¶
创建于:2025 年 4 月 1 日 | 最后更新:2025 年 4 月 1 日 | 最后验证:2024 年 11 月 5 日
超参数调优可以使一个普通模型与一个高度准确的模型之间产生差异。通常,像选择不同的学习率或更改网络层大小这样的简单事情,可以对您的模型性能产生重大影响。
幸运的是,有一些工具可以帮助找到最佳参数组合。Ray Tune 是分布式超参数调优的行业标准工具。Ray Tune 包括最新的超参数搜索算法,与各种分析库集成,并原生支持通过 Ray 的分布式机器学习引擎进行分布式训练。
在本教程中,我们将向您展示如何将 Ray Tune 集成到您的 PyTorch 训练流程中。我们将从 PyTorch 文档中关于训练 CIFAR10 图像分类器的教程扩展本教程。
正如您将看到的,我们只需要进行一些微小的修改。特别是,我们需要
将数据加载和训练封装在函数中,
使一些网络参数可配置,
添加检查点(可选),
并定义模型调优的搜索空间
运行本教程之前,请确保已安装以下包:
ray[tune]
: 分布式超参数调优库torchvision
: 用于数据转换器
设置 / 导入
让我们从导入开始:
from functools import partial
import os
import tempfile
from pathlib import Path
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import random_split
import torchvision
import torchvision.transforms as transforms
from ray import tune
from ray import train
from ray.train import Checkpoint, get_checkpoint
from ray.tune.schedulers import ASHAScheduler
import ray.cloudpickle as pickle
大多数导入是为了构建 PyTorch 模型所需的。只有最后的导入是为了 Ray Tune。
数据加载器
我们将数据加载器封装在自己的函数中,并传递一个全局数据目录。这样我们就可以在不同试验之间共享数据目录。
def load_data(data_dir="./data"):
transform = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
)
trainset = torchvision.datasets.CIFAR10(
root=data_dir, train=True, download=True, transform=transform
)
testset = torchvision.datasets.CIFAR10(
root=data_dir, train=False, download=True, transform=transform
)
return trainset, testset
可配置的神经网络
我们只能调整那些可配置的参数。在这个例子中,我们可以指定全连接层的层大小:
class Net(nn.Module):
def __init__(self, l1=120, l2=84):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, l1)
self.fc2 = nn.Linear(l1, l2)
self.fc3 = nn.Linear(l2, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = torch.flatten(x, 1) # flatten all dimensions except batch
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
训练函数
现在变得有趣了,因为我们将对 PyTorch 文档中的示例进行一些修改。
我们将训练脚本封装在一个函数中 train_cifar(config, data_dir=None)
。 config
参数将接收我们想要训练的超参数。 data_dir
指定了我们加载数据和存储数据的目录,以便多个运行可以共享相同的数据源。我们还在运行开始时加载模型和优化器状态,如果提供了检查点。在教程的后面部分,您将找到有关如何保存检查点和它用法的说明。
net = Net(config["l1"], config["l2"])
checkpoint = get_checkpoint()
if checkpoint:
with checkpoint.as_directory() as checkpoint_dir:
data_path = Path(checkpoint_dir) / "data.pkl"
with open(data_path, "rb") as fp:
checkpoint_state = pickle.load(fp)
start_epoch = checkpoint_state["epoch"]
net.load_state_dict(checkpoint_state["net_state_dict"])
optimizer.load_state_dict(checkpoint_state["optimizer_state_dict"])
else:
start_epoch = 0
优化器的学习率也设置为可配置的:
optimizer = optim.SGD(net.parameters(), lr=config["lr"], momentum=0.9)
我们还将训练数据分为训练集和验证集。因此,我们在 80%的数据上训练,并在剩余的 20%上计算验证损失。我们迭代训练集和测试集的批量大小也是可配置的。
添加(多)GPU 支持使用 DataParallel
图像分类在很大程度上受益于 GPU。幸运的是,我们可以在 Ray Tune 中继续使用 PyTorch 的抽象。因此,我们可以将我们的模型包装在 nn.DataParallel
中以支持在多个 GPU 上的数据并行训练:
device = "cpu"
if torch.cuda.is_available():
device = "cuda:0"
if torch.cuda.device_count() > 1:
net = nn.DataParallel(net)
net.to(device)
通过使用 device
变量,我们确保在没有 GPU 可用的情况下也能进行训练。PyTorch 要求我们显式地将我们的数据发送到 GPU 内存,如下所示:
for i, data in enumerate(trainloader, 0):
inputs, labels = data
inputs, labels = inputs.to(device), labels.to(device)
现在的代码支持在 CPU 上、单个 GPU 上以及多个 GPU 上进行训练。值得注意的是,Ray 还支持分数 GPU,这样我们可以在试验之间共享 GPU,只要模型仍然适合 GPU 内存。我们稍后再来讨论这个问题。
与 Ray Tune 通信
最有趣的部分是与 Ray Tune 的通信:
checkpoint_data = {
"epoch": epoch,
"net_state_dict": net.state_dict(),
"optimizer_state_dict": optimizer.state_dict(),
}
with tempfile.TemporaryDirectory() as checkpoint_dir:
data_path = Path(checkpoint_dir) / "data.pkl"
with open(data_path, "wb") as fp:
pickle.dump(checkpoint_data, fp)
checkpoint = Checkpoint.from_directory(checkpoint_dir)
train.report(
{"loss": val_loss / val_steps, "accuracy": correct / total},
checkpoint=checkpoint,
)
在这里,我们首先保存一个检查点,然后将一些指标反馈给 Ray Tune。具体来说,我们将验证损失和准确率发送回 Ray Tune。Ray Tune 可以使用这些指标来决定哪个超参数配置导致了最佳结果。这些指标还可以用来在早期停止表现不佳的试验,以避免在这些试验上浪费资源。
检查点保存是可选的,但是如果我们想使用像基于群体的训练这样的高级调度器,则是必要的。此外,通过保存检查点,我们可以在以后加载训练好的模型并在测试集上验证它们。最后,保存检查点对于容错很有用,它允许我们在中断训练后继续训练。
全部训练功能 ¶
完整的代码示例看起来是这样的:
def train_cifar(config, data_dir=None):
net = Net(config["l1"], config["l2"])
device = "cpu"
if torch.cuda.is_available():
device = "cuda:0"
if torch.cuda.device_count() > 1:
net = nn.DataParallel(net)
net.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=config["lr"], momentum=0.9)
checkpoint = get_checkpoint()
if checkpoint:
with checkpoint.as_directory() as checkpoint_dir:
data_path = Path(checkpoint_dir) / "data.pkl"
with open(data_path, "rb") as fp:
checkpoint_state = pickle.load(fp)
start_epoch = checkpoint_state["epoch"]
net.load_state_dict(checkpoint_state["net_state_dict"])
optimizer.load_state_dict(checkpoint_state["optimizer_state_dict"])
else:
start_epoch = 0
trainset, testset = load_data(data_dir)
test_abs = int(len(trainset) * 0.8)
train_subset, val_subset = random_split(
trainset, [test_abs, len(trainset) - test_abs]
)
trainloader = torch.utils.data.DataLoader(
train_subset, batch_size=int(config["batch_size"]), shuffle=True, num_workers=8
)
valloader = torch.utils.data.DataLoader(
val_subset, batch_size=int(config["batch_size"]), shuffle=True, num_workers=8
)
for epoch in range(start_epoch, 10): # loop over the dataset multiple times
running_loss = 0.0
epoch_steps = 0
for i, data in enumerate(trainloader, 0):
# get the inputs; data is a list of [inputs, labels]
inputs, labels = data
inputs, labels = inputs.to(device), labels.to(device)
# zero the parameter gradients
optimizer.zero_grad()
# forward + backward + optimize
outputs = net(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
# print statistics
running_loss += loss.item()
epoch_steps += 1
if i % 2000 == 1999: # print every 2000 mini-batches
print(
"[%d, %5d] loss: %.3f"
% (epoch + 1, i + 1, running_loss / epoch_steps)
)
running_loss = 0.0
# Validation loss
val_loss = 0.0
val_steps = 0
total = 0
correct = 0
for i, data in enumerate(valloader, 0):
with torch.no_grad():
inputs, labels = data
inputs, labels = inputs.to(device), labels.to(device)
outputs = net(inputs)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
loss = criterion(outputs, labels)
val_loss += loss.cpu().numpy()
val_steps += 1
checkpoint_data = {
"epoch": epoch,
"net_state_dict": net.state_dict(),
"optimizer_state_dict": optimizer.state_dict(),
}
with tempfile.TemporaryDirectory() as checkpoint_dir:
data_path = Path(checkpoint_dir) / "data.pkl"
with open(data_path, "wb") as fp:
pickle.dump(checkpoint_data, fp)
checkpoint = Checkpoint.from_directory(checkpoint_dir)
train.report(
{"loss": val_loss / val_steps, "accuracy": correct / total},
checkpoint=checkpoint,
)
print("Finished Training")
如您所见,大部分代码直接改编自原始示例。
测试集准确率 ¶
通常,机器学习模型的性能会在一个未用于训练模型的保留测试集上进行测试。我们也将这个操作封装成一个函数:
def test_accuracy(net, device="cpu"):
trainset, testset = load_data()
testloader = torch.utils.data.DataLoader(
testset, batch_size=4, shuffle=False, num_workers=2
)
correct = 0
total = 0
with torch.no_grad():
for data in testloader:
images, labels = data
images, labels = images.to(device), labels.to(device)
outputs = net(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
return correct / total
函数还期望一个 device
参数,这样我们就可以在 GPU 上对测试集进行验证。
配置搜索空间
最后,我们需要定义 Ray Tune 的搜索空间。以下是一个示例:
config = {
"l1": tune.choice([2 ** i for i in range(9)]),
"l2": tune.choice([2 ** i for i in range(9)]),
"lr": tune.loguniform(1e-4, 1e-1),
"batch_size": tune.choice([2, 4, 8, 16])
}
该 tune.choice()
接受从其中均匀采样的值列表。在本例中, l1
和 l2
参数应该是介于 4 和 256 之间的 2 的幂,因此可以是 4、8、16、32、64、128 或 256。 lr
(学习率)应在 0.0001 和 0.1 之间均匀采样。最后,批处理大小可以选择 2、4、8 或 16。
在每次试验中,Ray Tune 现在将随机从这些搜索空间中采样参数组合。然后它将并行训练多个模型,并从中找到表现最好的一个。我们还使用了 ASHAScheduler
,这将提前终止表现不佳的试验。
我们将 train_cifar
函数包装在 functools.partial
中以设置常量 data_dir
参数。我们还可以告诉 Ray Tune 每个试验应可用的资源。
gpus_per_trial = 2
# ...
result = tune.run(
partial(train_cifar, data_dir=data_dir),
resources_per_trial={"cpu": 8, "gpu": gpus_per_trial},
config=config,
num_samples=num_samples,
scheduler=scheduler,
checkpoint_at_end=True)
您可以指定 CPU 的数量,然后它们将可用,例如,以增加 PyTorch DataLoader
实例的 num_workers
。选定的 GPU 数量在每个试验中都对 PyTorch 可见。试验无法访问未为其请求的 GPU - 因此您不必担心两个试验使用相同的资源集。
我们也可以指定分数 GPU,所以像 gpus_per_trial=0.5
这样的格式是完全有效的。试验将相互共享 GPU。你只需确保模型仍然适合 GPU 内存。
训练模型后,我们将找到表现最好的一个,并从检查点文件中加载训练好的网络。然后我们获取测试集准确率,并通过打印报告一切。
完整的主函数看起来是这样的:
def main(num_samples=10, max_num_epochs=10, gpus_per_trial=2):
data_dir = os.path.abspath("./data")
load_data(data_dir)
config = {
"l1": tune.choice([2**i for i in range(9)]),
"l2": tune.choice([2**i for i in range(9)]),
"lr": tune.loguniform(1e-4, 1e-1),
"batch_size": tune.choice([2, 4, 8, 16]),
}
scheduler = ASHAScheduler(
metric="loss",
mode="min",
max_t=max_num_epochs,
grace_period=1,
reduction_factor=2,
)
result = tune.run(
partial(train_cifar, data_dir=data_dir),
resources_per_trial={"cpu": 2, "gpu": gpus_per_trial},
config=config,
num_samples=num_samples,
scheduler=scheduler,
)
best_trial = result.get_best_trial("loss", "min", "last")
print(f"Best trial config: {best_trial.config}")
print(f"Best trial final validation loss: {best_trial.last_result['loss']}")
print(f"Best trial final validation accuracy: {best_trial.last_result['accuracy']}")
best_trained_model = Net(best_trial.config["l1"], best_trial.config["l2"])
device = "cpu"
if torch.cuda.is_available():
device = "cuda:0"
if gpus_per_trial > 1:
best_trained_model = nn.DataParallel(best_trained_model)
best_trained_model.to(device)
best_checkpoint = result.get_best_checkpoint(trial=best_trial, metric="accuracy", mode="max")
with best_checkpoint.as_directory() as checkpoint_dir:
data_path = Path(checkpoint_dir) / "data.pkl"
with open(data_path, "rb") as fp:
best_checkpoint_data = pickle.load(fp)
best_trained_model.load_state_dict(best_checkpoint_data["net_state_dict"])
test_acc = test_accuracy(best_trained_model, device)
print("Best trial test set accuracy: {}".format(test_acc))
if __name__ == "__main__":
# You can change the number of GPUs per trial here:
main(num_samples=10, max_num_epochs=10, gpus_per_trial=0)
如果你运行代码,一个示例输出可能看起来像这样:
Number of trials: 10/10 (10 TERMINATED)
+-----+--------------+------+------+-------------+--------+---------+------------+
| ... | batch_size | l1 | l2 | lr | iter | loss | accuracy |
|-----+--------------+------+------+-------------+--------+---------+------------|
| ... | 2 | 1 | 256 | 0.000668163 | 1 | 2.31479 | 0.0977 |
| ... | 4 | 64 | 8 | 0.0331514 | 1 | 2.31605 | 0.0983 |
| ... | 4 | 2 | 1 | 0.000150295 | 1 | 2.30755 | 0.1023 |
| ... | 16 | 32 | 32 | 0.0128248 | 10 | 1.66912 | 0.4391 |
| ... | 4 | 8 | 128 | 0.00464561 | 2 | 1.7316 | 0.3463 |
| ... | 8 | 256 | 8 | 0.00031556 | 1 | 2.19409 | 0.1736 |
| ... | 4 | 16 | 256 | 0.00574329 | 2 | 1.85679 | 0.3368 |
| ... | 8 | 2 | 2 | 0.00325652 | 1 | 2.30272 | 0.0984 |
| ... | 2 | 2 | 2 | 0.000342987 | 2 | 1.76044 | 0.292 |
| ... | 4 | 64 | 32 | 0.003734 | 8 | 1.53101 | 0.4761 |
+-----+--------------+------+------+-------------+--------+---------+------------+
Best trial config: {'l1': 64, 'l2': 32, 'lr': 0.0037339984519545164, 'batch_size': 4}
Best trial final validation loss: 1.5310075663924216
Best trial final validation accuracy: 0.4761
Best trial test set accuracy: 0.4737
大多数试验都提前停止,以避免浪费资源。表现最好的试验在验证集上达到了约 47%的准确率,这在测试集上也能得到证实。
好了,现在你可以调整你的 PyTorch 模型的参数了。
脚本总运行时间:(0 分钟 0.000 秒)