使用PyTorch搭建模型
本笔记将记录PyTorch开发搭建模型的主要方法。区别于手动实现底层计算,PyTorch的模块化设计让开发深度学习架构非常容易,并于2026年已经成为事实上的标准开发方法。
模块化开发
如今深度学习的底层计算原理已经被封装在各类抽象库中,因此开发者们在开发时往往不再考虑单个人工神经元,而是从层的角度构思网络,并在设计时考虑更粗糙的块(Block)。
层与块 (Layers & Blocks)是代码组织的基石。为了处理成百上千层的网络,工程师引入了面向对象的思想。
- 层 (Layer) :最小计算单元(如
nn.Linear,nn.Conv2d)。它封装了权重 \(W\)、偏置 \(b\) 以及对应的数学算子。 - 块 (Block/Module) :容器。一个块可以包含多个层(如一个残差块 ResNet Block)。
块可以嵌套块,形成树状结构。当你调用 model.backward() 时,它是通过递归遍历这棵树来计算所有梯度的。块会自动识别并搜集它内部所有层的参数来进行参数管理。
Tensor
Tensor是纯数据数组,是PyTorch中最为基本的结构。
我们可以不使用Layer,而是直接用Tensor和函数来搭建模型(这在研究极度新颖的算法时很有用):
# 纯手写,不使用 nn.Linear Layer
import torch.nn.functional as F
weight = torch.randn(10, 5, requires_grad=True)
bias = torch.randn(5, requires_grad=True)
def manual_linear(x):
return F.linear(x, weight, bias) # 或者直接 x @ weight.t() + bias
Tensor也是Layer容器内部的数据实体,比如说nn.Linear是一个Layer,在其内部weight就是一个Tensor,其计算逻辑 x @ weight + bias 是由基础的矩阵乘法和加法算子组成的。
这里要注意
nn.Parameter
Parameter是被标记为需要训练的变量,换句话说,nn.Parameter就是要训练的参数。
对于一个要训练的参数,其本质也是一个tensor,但是如果我们直接用tensor记录,他就不会被optimizer更新:
self.W = torch.empty(input_dim, output_dim)
因此我们需要用 nn.Parameter来写:
self.W = nn.Parameter(torch.empty(input_dim, output_dim))
这里的dim指的是维度。
Layer
层(Layer)是PyTorch中的最基本结构,定义了神经网络中处理数据的原子单元。层接收输入,进行特定的数学运算,然后输出结果。
简单来说,一个层 = 参数 + 计算规则,一般是权重W和b,以及forward规则。
常见的抽象层包括:
- 全连接层 Linear/Dense
- 卷积层 Conv2d
- 激活函数 ReLU
- 归一化层 BatchNorm
我们可以使用预定义的层,也可以自定义层。
预定义好的层可以通过 torch.nn来快速创建(torch.nn几乎提供了所有标准的深度学习层):
import torch
import torch.nn as nn
linear_layer = nn.Linear(in_features=10, out_features=5)
relu_layer = nn.ReLU()
我们也可以自定义层:
class MyCustomLayer(nn.Module):
def __init__(self, size):
super().__init__()
# 初始化可学习的参数 (权重)
# nn.Parameter 会告诉 PyTorch:"这是一个需要被训练更新的变量"
self.weights = nn.Parameter(torch.randn(size, size))
def forward(self, x):
# 定义层的前向计算逻辑
return x @ self.weights + 1 # 比如:矩阵乘法后加1
自定义层属于较高端的操作,在我们熟悉一般开发方法后再看。
当我们调用 nn.Linear(10,5)这一行简单的代码的时候,PyTorch帮助我们封装管理了大量的细节,包括:
- 参数管理,weight, bias;自动初始化参数
- 计算逻辑,矩阵乘法、卷积运算、加法等
- 梯度追踪,autograd,自动维护计算图,反向传播的时候自动计算参数梯度等,不需要手写求导公式
- 状态管理,某些层如dropout或batchnorm在训练和预测时的行为不同,层内封装了.train(), .eval()等开关来自动切换行为
- 设备管理,如CPU和GPU计算,移动参数张量到GPU上等。
nn.Module
Block由多个层组合而成,相当于封装了多个Layer的复合结构,方便复用。ResNet中的残差块(Residual Block)、Transformer中的编码器块(Encoder Block)、Inception块等都可以轻松实现。
在PyToch中,层、块、整个Model本质上都是 nn.Module。
nn.Module是PyTorch中可组合的计算模块的标准格式。
Model
我们把Layer和Block组装起来,就构成了Model。
同样的,PyTorch预定义了 nn.Sequential,可以让我们实现简单的封装。比方说,我们拿上面Layer章节的例子来说明:
import torch
import torch.nn as nn
linear_layer = nn.Linear(in_features=10, out_features=5)
relu_layer = nn.ReLU()
我们可以直接用 nn.Sequential把上面定义好的 linear_layer和 relu_layer来封装到model中:
# 直接把实例化好的层放进去
model = nn.Sequential(
linear_layer,
relu_layer,
# 还可以继续加...
nn.Linear(5, 1)
)
# 现在的 model 就是一个可以直接使用的神经网络了
# input_data = torch.randn(1, 10)
# output = model(input_data)
我们当然也可以自己封装。一般来说,在论文代码和GitHub中,通过继承 nn.Module定义一个类,是最为常见的方法,比如:
class MyMLP(nn.Module):
def __init__(self):
super().__init__()
# 在这里定义你有多少个零件
self.hidden = linear_layer # 你之前创建的线性层
self.act = relu_layer # 你之前创建的激活层
self.output = nn.Linear(5, 1) # 假设最后输出一个值
def forward(self, x):
# 在这里定义数据流动的路径(拼装逻辑)
x = self.hidden(x)
x = self.act(x)
x = self.output(x)
return x
# 实例化模型
model = MyMLP()
这里的MyMLP类,或者说 nn.Sequential对象,既是Module,也是Model。从概念上来说,这里是Model。至此,我们便通过PyTorch构建好了一个深度学习模型。
MLP开发示例
我们来看一个完整的MLP示例。MLP是最简单的神经网络,我们用这个例子来熟悉PyTorch的用法。
Layer
首先我们要实现全连接层,也就是线性变换+激活函数,即:
这里要注意,虽然理论上Linear是一层、ReLU是一层,但是因为他们通常都作为一个整体出现,因此一般就叫做一个Dense Layer。
我们知道一个神经网络是由很多个隐藏层组合起来的:
x -> layer1 -> layer2 -> layer3 -> output
我们可以把层级结构abstraction为一个class,这样的话我们想组装什么样的层级结构的model就都可以了。其大致形状如下:
layer(inputs X):
z = XW + b
y = activation(z)
return y
假如我们要拼一个三层的MLP,那么拼法就是:
y1 = layer1(X)
y2 = layer2(y1)
y3 = layer3(y2)
return y3
我们可以把这个DenseLayer写出来:
class DenseLayer(nn.Module):
"""
自定义全连接层(Dense Layer)
数学形式:
z = XW + b
y = activation(z)
参数:
- W:形状 (input_dim, output_dim)
- b:形状 (output_dim,)
- activation:指定激活函数类型
"""
def __init__(self, input_dim, output_dim, activation):
super(DenseLayer, self).__init__()
# 权重 W:使用 Xavier 初始化,通常能让训练更稳定
self.W = nn.Parameter(torch.empty(input_dim, output_dim))
nn.init.xavier_uniform_(self.W)
# 偏置 b:初始化为 0
self.b = nn.Parameter(torch.zeros(output_dim))
# 记录该层使用的激活函数类型
self.activation = activation
def forward(self, inputs):
"""
前向传播:
1) 线性变换:inputs @ W + b
2) 激活函数:根据 activation 选择不同非线性
"""
z = inputs @ self.W + self.b
if self.activation == 'relu':
outputs = torch.relu(z)
elif self.activation == 'sigmoid':
outputs = torch.sigmoid(z)
elif self.activation == 'tanh':
outputs = torch.tanh(z)
elif self.activation == 'softmax':
# softmax 按类别维度归一化(dim=1)
outputs = torch.softmax(z, dim=1)
else:
# linear:不做激活,直接输出线性结果
outputs = z
return outputs
用的时候,假设我们要拼一个三层的MLP:
- 第一层:输入10维向量 → 输出32维向量 (relu)
- 第二层:32 → 16 (relu)
- 第三层:16 → 3 (分类输出)
我们就可以把3个DenseLayer串起来:
layer1 = DenseLayer(10, 32, "relu")
layer2 = DenseLayer(32, 16, "relu")
layer3 = DenseLayer(16, 3, "softmax") # 你现在的写法:输出概率
我们在用的时候,根据 nn.Module的规定,我们可以像函数一样直接调用,也就是说:Y = layer(X)等价于 Y = layer.forward(X)。假设我们现在有4条数据(batch_size = 4),每条数据10个特征,那么X就是一个 (4, 10)的矩阵。这个时候,由于第一层layer1输入向量维度是10,我们就可以把X输入进去,进行矩阵乘法 X@W。然后我们就可以得到一个batch_size = 4, output_dim = 32的数据。然后加上b后,进行ReLU激活,得到输出,进而继续导入到layer2中。
Model
我们把刚才写好的DenseLayer依次堆叠,即可得到MLP(也叫做Feedforward Network):
class Feedforward(nn.Module):
"""
前馈神经网络(MLP)
由若干个 DenseLayer 依次堆叠组成。
构建方式:
- 使用 depth 指定“连接层”的总数
- hidden_sizes 指定每个隐藏层的宽度(长度应为 depth-1)
- output_size 决定输出层维度(分类为类别数,回归通常为 1)
"""
def __init__(self, input_size, depth, hidden_sizes, output_size):
super(Feedforward, self).__init__()
# 结构合法性检查:隐藏层个数必须比 depth 少 1
if not (depth - len(hidden_sizes)) == 1:
raise Exception(
"The depth (%d) of the network should be 1 larger than `hidden_sizes` (%d)." %
(depth, len(hidden_sizes))
)
# sizes 用来描述每层的输入/输出维度序列
# 例如:input=10, hidden=[32,16], output=3 => sizes=[10,32,16,3]
sizes = [input_size] + hidden_sizes + [output_size]
layers = []
# 构建前 depth-1 层:作为隐藏层,这里统一用 ReLU 激活
for i in range(depth - 1):
layers.append(DenseLayer(sizes[i], sizes[i + 1], 'relu'))
# 构建最后一层:根据任务(output_size)决定激活
# - output_size == 1:回归任务,输出连续值,用 linear
# - output_size > 1:分类任务,用 softmax 输出各类别概率
if output_size == 1:
layers.append(DenseLayer(sizes[-2], sizes[-1], 'linear'))
else:
layers.append(DenseLayer(sizes[-2], sizes[-1], 'softmax'))
# 用 ModuleList 保存层,确保参数能被 PyTorch 正确注册
self.layers = nn.ModuleList(layers)
def forward(self, inputs):
"""
前向传播:把输入依次送入每一层
"""
outputs = inputs
for layer in self.layers:
outputs = layer(outputs)
return outputs
.
数据封装
.
class MyDataset(Dataset):
"""
用于把 (x, y) 数据封装成 PyTorch Dataset,方便 DataLoader 按 batch 读取。
- x:统一转换为 float32 Tensor
- y:分类任务转换为 long(类别索引),回归任务转换为 float32(连续值)
"""
def __init__(self, x, y, pr_type):
self.x = torch.tensor(x, dtype=torch.float32)
self.y = torch.tensor(y, dtype=(torch.long if pr_type == "classification"
else torch.float32))
def __len__(self):
# 返回数据集样本数
return self.x.size()[0]
def __getitem__(self, idx):
# 返回单个样本 (特征, 标签)
return self.x[idx], self.y[idx]
.
训练函数
.
def train(x_train, y_train, x_val, y_val, loss_type, model, num_train_epochs, batch_size, lr, weight_decay):
"""
训练函数:使用小批量训练优化模型,并在每个 epoch 后在验证集上评估与记录历史。
参数说明:
- x_train, y_train:训练数据与标签(numpy)
- x_val, y_val:验证数据与标签(numpy)
- loss_type:
'CrossEntropy' -> 分类任务
'SquaredError' -> 回归任务(这里用 MSELoss)
- model:待训练的 PyTorch 模型(Feedforward 实例或兼容模块)
- num_train_epochs:训练轮数
- batch_size:批大小
- lr:学习率
- weight_decay:权重衰减(L2 正则项)
"""
# 1) 判断任务类型(影响标签 dtype 与评估方式)
if loss_type == 'CrossEntropy':
pr_type = 'classification'
else:
pr_type = 'regression'
# 2) 构造训练/验证 Dataset
train_dataset = MyDataset(x_train, y_train, pr_type)
val_dataset = MyDataset(x_val, y_val, pr_type)
# 3) 构造训练 DataLoader(shuffle=True 打乱更利于泛化)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# 4) 构造优化器(Adam),并支持 weight_decay 正则
optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
# 5) 选择损失函数
# 分类:CrossEntropyLoss
# 回归:MSELoss
if loss_type == 'CrossEntropy':
criterion = nn.CrossEntropyLoss()
else:
criterion = nn.MSELoss()
# 6) history:记录训练过程
# - loss:训练集平均损失
# - val_loss:验证集损失
# - accuracy:分类为准确率;回归这里用 val_loss(MSE)代替指标
history = {"loss": [], "val_loss": [], "accuracy": []}
# 7) 训练循环
for epoch in range(num_train_epochs):
# ---- 训练阶段 ----
model.train()
total_loss = 0
count = 0
for x_batch, y_batch in train_loader:
# 前向预测
pred = model(x_batch)
# 计算损失
loss = criterion(pred, y_batch)
# 反向传播与参数更新
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 统计本 epoch 的总损失(按样本数加权),用于算平均
total_loss += loss.item() * x_batch.size(0)
count += x_batch.size(0)
avg_train_loss = total_loss / count
# ---- 验证阶段 ----
model.eval()
with torch.no_grad():
# 在验证集上前向预测
val_pred = model(val_dataset.x)
# 计算验证损失
val_loss = criterion(val_pred, val_dataset.y).item()
# 计算指标:
# 分类:accuracy(用 argmax 得到类别预测)
# 回归:这里直接把 val_loss 当作 MSE 指标
if loss_type == 'CrossEntropy':
acc = accuracy_score(y_val, val_pred.argmax(dim=1).numpy())
else:
acc = val_loss
# 记录历史
history["loss"].append(avg_train_loss)
history["val_loss"].append(val_loss)
history["accuracy"].append(acc)
# 输出训练过程信息:第 1 个 epoch 与每 10 个 epoch 输出一次
if (epoch + 1) % 10 == 0 or epoch == 0:
if loss_type == 'CrossEntropy':
print(f"Epoch [{epoch+1}/{num_train_epochs}], "
f"Train Loss: {avg_train_loss:.4f}, "
f"Val Loss: {val_loss:.4f}, "
f"Val Accuracy: {acc:.4f}")
else:
print(f"Epoch [{epoch+1}/{num_train_epochs}], "
f"Train Loss: {avg_train_loss:.4f}, "
f"Val Loss: {val_loss:.4f}, "
f"Val MSE: {acc:.4f}")
# 返回训练好的模型与训练记录
return model, history
.
回归任务应用
.
# =========================
# Toy Regression: y = sin(1/x)
# =========================
import numpy as np
import torch
import matplotlib.pyplot as plt
# 从你实现的文件中导入训练函数与模型结构
from implementation import train, Feedforward
# 为可复现实验结果固定随机种子
torch.manual_seed(137)
np.random.seed(137)
# -------------------------
# 1) 构造数据:x 在 (0.05, 1.05) 附近,但更偏向小数(power=4 会让分布更偏小)
# -------------------------
def target_func(x):
"""目标函数:y = sin(1/x),x 越小振荡越强"""
return np.sin(1 / x)
# 训练集与验证集:各 2000 个点
x_train = np.power(np.random.random_sample([2000, 1]), 4) + 0.05
y_train = target_func(x_train)
x_val = np.power(np.random.random_sample([2000, 1]), 4) + 0.05
y_val = target_func(x_val)
# -------------------------
# 2) 可视化训练数据:看目标函数形态与采样点分布
# -------------------------
sort_ind = np.argsort(x_train[:, 0])
plt.figure()
plt.plot(x_train[sort_ind, 0], y_train[sort_ind, 0], label="target curve")
plt.plot(x_train[sort_ind, 0], y_train[sort_ind, 0], '.', alpha=0.5, label="train samples")
plt.title("Toy regression data: y = sin(1/x)")
plt.xlabel("x")
plt.ylabel("y")
plt.legend()
plt.show()
# -------------------------
# 3) 定义模型超参数并创建模型
# - input_size=1:因为 x 是一维
# - output_size=1:回归输出一个连续值
# - depth=4, hidden_sizes=[64,64,64]:3 个隐藏层 + 1 个输出层
# -------------------------
input_size = x_train.shape[-1]
output_size = 1
depth = 4
hidden_sizes = [64, 64, 64]
num_train_epochs = 2000
batch_size = 64
learning_rate = 0.01
weight_decay = 1e-5
model = Feedforward(input_size, depth, hidden_sizes, output_size)
# -------------------------
# 4) 训练模型(回归任务 -> SquaredError -> MSELoss)
# -------------------------
model, history = train(
x_train, y_train,
x_val, y_val,
loss_type="SquaredError",
model=model,
num_train_epochs=num_train_epochs,
batch_size=batch_size,
lr=learning_rate,
weight_decay=weight_decay
)
# 输出最终与最优验证集 MSE
print("final MSE: ", f"{history['val_loss'][-1]:.6f}")
print("best MSE: ", f"{min(history['val_loss']):.6f}")
# -------------------------
# 5) 绘制训练曲线:训练 loss 与验证 loss
# -------------------------
plt.figure()
plt.plot(history['loss'], label='train loss')
plt.plot(history['val_loss'], label='val loss')
plt.title('Training curve (MSE)')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()
# -------------------------
# 6) 保存模型并验证能否正确加载
# 作业要求:torch.save(model, 'sin_inv_x.sav') 并能 torch.load 回来
# -------------------------
torch.save(model, 'sin_inv_x.sav')
model = torch.load('sin_inv_x.sav', weights_only=False)
# -------------------------
# 7) 可视化模型在验证集上的拟合效果
# -------------------------
model.eval()
with torch.no_grad():
y_pred = model(torch.tensor(x_val, dtype=torch.float32))
plt.figure()
plt.plot(x_val[:, 0], y_pred.numpy()[:, 0], '.', alpha=0.5)
plt.title("Predictions on validation set")
plt.xlabel("x")
plt.ylabel("predicted y")
plt.show()
.
MNIST分类应用
.
# =========================
# MNIST Classification (Fully Connected NN)
# =========================
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from torchvision import datasets as dts
from torchvision.transforms import ToTensor
from sklearn.model_selection import train_test_split
from math import sqrt, ceil
# 从你的实现中导入训练函数与前馈网络
from implementation import train, Feedforward
# -------------------------
# 1) 数据加载与展开
# MNIST 原图是 28x28,这里展开成 784 维向量
# -------------------------
def transform(x):
"""torchvision 会把图片转 tensor;这里额外 flatten 成 784 向量"""
return ToTensor()(x).flatten()
traindt = dts.MNIST(
root='data',
train=True,
transform=transform,
download=True
)
testdt = dts.MNIST(
root='data',
train=False,
transform=transform
)
# torchvision 的 data/targets 是 torch 张量,这里转成 numpy 方便后续 split
x_tr = traindt.data.numpy().reshape(-1, 28 * 28)
x_test = testdt.data.numpy().reshape(-1, 28 * 28)
y_tr = traindt.targets.numpy()
y_test = testdt.targets.numpy()
# 划分训练/验证集(按类别分层抽样,保证比例一致)
x_train, x_val, y_train, y_val = train_test_split(
x_tr, y_tr,
train_size=0.8,
stratify=y_tr,
random_state=137
)
print('Shape of training input: ', x_train.shape)
print('Shape of training labels: ', y_train.shape)
print('Shape of validation input: ', x_val.shape)
print('Shape of validation labels: ', y_val.shape)
print('Shape of test input: ', x_test.shape)
print('Shape of test labels: ', y_test.shape)
print('Number of channels: ', np.max(y_train) + 1)
print('Data range:', np.max(x_train), np.min(x_train))
# -------------------------
# 2) 归一化模块:把输入从 [0,255] -> [0,1] -> 标准化
# 注意:这里做的是“按 MNIST 全局均值/方差”的标准化
# -------------------------
MNIST_MEAN = 0.1307
MNIST_STD = 0.3081
class MNISTNormalizer(nn.Module):
"""可插入 nn.Sequential 的标准化层(把预处理写进模型结构里)"""
def __init__(self):
super().__init__()
def forward(self, x):
# 1) 缩放到 [0,1]
x = x / 255.0
# 2) 标准化
x = (x - MNIST_MEAN) / MNIST_STD
return x
mnist_normalizer = MNISTNormalizer()
# -------------------------
# 3) 搭建网络结构
# - input_size=784
# - output_size=10(0~9)
# - depth=3, hidden_sizes=[256,128]:2 个隐藏层 + 1 个输出层
# -------------------------
input_size = x_train.shape[-1]
output_size = 10
depth = 3
hidden_sizes = [256, 128]
ff_net = Feedforward(input_size, depth, hidden_sizes, output_size)
# 把预处理(normalizer)和网络(ff_net)串起来变成一个模型
# 好处:训练/推理时都自动做归一化,不容易忘记
model = nn.Sequential(mnist_normalizer, ff_net)
# -------------------------
# 4) 训练分类模型(CrossEntropy)
# -------------------------
model, history = train(
x_train, y_train,
x_val, y_val,
loss_type="CrossEntropy",
model=model,
batch_size=64,
num_train_epochs=50,
lr=0.001,
weight_decay=1e-4
)
# -------------------------
# 5) 绘制训练曲线:loss 与验证准确率
# -------------------------
plt.figure(figsize=(10, 8))
plt.subplot(2, 1, 1)
plt.plot(history['loss'], label='train loss')
plt.plot(history['val_loss'], label='val loss')
plt.legend()
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.subplot(2, 1, 2)
plt.plot(history['accuracy'], label='val accuracy')
plt.ylim(0.0, 1.0)
plt.xlabel('Epoch')
plt.ylabel('Classification accuracy')
plt.legend()
plt.show()
# -------------------------
# 6) 可视化第一层权重(观察是否学到“笔画结构”)
# 这里取 model[1] 是 Feedforward(因为 model[0] 是 normalizer)
# -------------------------
def visualize_grid(Xs, ubound=255.0, padding=1):
"""
Reshape a 4D tensor of image data to a grid for easy visualization.
Inputs:
- Xs: Data of shape (N, H, W)
- ubound: Output grid will have values scaled to the range [0, ubound]
- padding: The number of blank pixels between elements of the grid
"""
(N, H, W) = Xs.shape
#grid_size = int(ceil(sqrt(N)))
num_grid_h = 2
num_grid_w = int(ceil(N / 2))
grid_height = H * num_grid_h + padding * (num_grid_h - 1)
grid_width = W * num_grid_w + padding * (num_grid_w - 1)
grid = np.zeros((grid_height, grid_width))
next_idx = 0
y0, y1 = 0, H
for y in range(num_grid_h):
x0, x1 = 0, W
for x in range(num_grid_w):
if next_idx < N:
img = Xs[next_idx]
low, high = np.min(img), np.max(img)
grid[y0:y1, x0:x1] = ubound * (img - low) / (high - low)
# grid[y0:y1, x0:x1] = Xs[next_idx]
next_idx += 1
x0 += W + padding
x1 += W + padding
y0 += H + padding
y1 += H + padding
# grid_max = np.max(grid)
# grid_min = np.min(grid)
# grid = ubound * (grid - grid_min) / (grid_max - grid_min)
return grid
with torch.no_grad():
W1 = model[1].layers[0].W.numpy() # 第一层权重:形状 (784, 256)
W1 = W1.transpose() # 转成 (256, 784),每个神经元一张“权重图”
W1 = np.reshape(W1, [W1.shape[0], 28, 28])
plt.figure()
plt.imshow(visualize_grid(W1))
plt.title("First-layer weights visualization")
plt.axis('off')
plt.show()
# -------------------------
# 7) 保存模型并验证能否正确加载
# -------------------------
torch.save(model, 'mnist_cls.sav')
model = torch.load('mnist_cls.sav', weights_only=False)
# -------------------------
# 8) 在测试集上评估准确率
# -------------------------
model.eval()
with torch.no_grad():
y_pred = model(torch.tensor(x_test.astype(np.float32)))
acc = np.mean(y_test == np.argmax(y_pred.numpy(), axis=1))
print('The test accuracy is ', acc)
.
CNN开发示例
LeNet
LeNet-5是深度学习的开山鼻祖,其结构非常简单:
Input (32×32×1) —— 灰度图,不分RGB三色通道
↓
Conv C1 (6@5×5) —— 6个5x5卷积核,stride=1,输出28x28x6,提取低级特征
↓
AvgPool S2 —— 平均池化,用于降采样
↓
Conv C3 (16@5×5) —— 卷积层,16个5x5卷积核,输出10x10x16,提取复杂特征
↓
AvgPool S4 —— 2x2平均池化,用于降采样
↓
Conv C5 (120@5×5) —— 120个5x5的卷积核,相当于全连接,输出1x1x120
↓
FC F6 (84) —— 全连接层,84个神经元
↓
FC Output (10) —— 输出层,10个分类结果
我们用PyTorch可以快速搭建:
import torch
import torch.nn as nn
import torch.nn.functional as F
class LeNet5(nn.Module):
def __init__(self):
super(LeNet5, self).__init__()
# C1: 1 → 6, kernel=5
self.conv1 = nn.Conv2d(
in_channels=1,
out_channels=6,
kernel_size=5,
stride=1
)
# S2: AvgPool
self.pool1 = nn.AvgPool2d(kernel_size=2, stride=2)
# C3: 6 → 16
self.conv2 = nn.Conv2d(
in_channels=6,
out_channels=16,
kernel_size=5
)
# S4
self.pool2 = nn.AvgPool2d(2, 2)
# C5 (等价FC)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
# F6
self.fc2 = nn.Linear(120, 84)
# Output
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
# Input: (batch, 1, 32, 32)
x = self.conv1(x) # → (batch, 6, 28, 28)
x = torch.tanh(x)
x = self.pool1(x) # → (batch, 6, 14, 14)
x = self.conv2(x) # → (batch, 16, 10, 10)
x = torch.tanh(x)
x = self.pool2(x) # → (batch, 16, 5, 5)
x = x.view(x.size(0), -1) # flatten → (batch, 400)
x = self.fc1(x) # → 120
x = torch.tanh(x)
x = self.fc2(x) # → 84
x = torch.tanh(x)
x = self.fc3(x) # → 10
return x
也可以直接用nn.Sequential定义:
model = nn.Sequential(
nn.Conv2d(1,6,5),
nn.Tanh(),
nn.AvgPool2d(2),
nn.Conv2d(6,16,5),
nn.Tanh(),
nn.AvgPool2d(2),
nn.Flatten(),
nn.Linear(400,120),
nn.Tanh(),
nn.Linear(120,84),
nn.Tanh(),
nn.Linear(84,10)
)
最适合作业、面试的写法:
class LeNet5(nn.Module):
def __init__(self):
super().__init__()
self.features = nn.Sequential(
nn.Conv2d(1,6,5),
nn.Tanh(),
nn.AvgPool2d(2),
nn.Conv2d(6,16,5),
nn.Tanh(),
nn.AvgPool2d(2)
)
self.classifier = nn.Sequential(
nn.Flatten(),
nn.Linear(400,120),
nn.Tanh(),
nn.Linear(120,84),
nn.Tanh(),
nn.Linear(84,10)
)
def forward(self,x):
x = self.features(x)
x = self.classifier(x)
return x
上述写法最适合CNN,因为几乎所有的经典CNN都按照这个模式组织:
features → 提取特征(Conv + Pool)
classifier → 分类(FC)
。
AlexNet
VGG
ResNet
以CIPHAR-10作为数据集,介绍一下ResNet的实现与训练等。
首先我们加载核心PyTorch组件:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
import pandas as pd
然后加载数据集:
from torch.utils.data import Subset
# === 数据增强(Data Augmentation)===
# 注意:增强只能加在训练集的 transform 里,不能加在 val/test 上。
# 原因:val 和 test 需要稳定、可复现的评估结果;
# 如果对它们随机裁剪/翻转,每次评估结果都不一样,失去了参考意义。
# 因此需要为训练集和评估集定义两套不同的 transform。
# 训练集 transform:加入随机增强,人为扩充训练数据的多样性
train_transform = transforms.Compose([
# RandomCrop: 先在图片四周各填充 4 个像素(padding=4),再随机裁剪回 32x32
# 模拟图片平移,让模型学会识别不同位置的目标
transforms.RandomCrop(32, padding=4),
# RandomHorizontalFlip: 以 50% 概率随机水平翻转图片
# 对 CIFAR-10 的大多数类别(车、鸟、船等)来说,翻转后语义不变
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
# 评估集 transform:只做必要的归一化,不做任何随机变换
eval_transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
# 加载两份原始数据集(数据文件相同,只是 transform 不同)
# full_trainset 用于探查 .classes 等属性
full_trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
download=True, transform=train_transform)
full_trainset_eval = torchvision.datasets.CIFAR10(root='./data', train=True,
download=True, transform=eval_transform)
# 用固定随机种子切分索引,保证 train/val 每次划分一致
val_size = 5000
train_size = len(full_trainset) - val_size
generator = torch.Generator().manual_seed(42) # 固定种子,结果可复现
indices = torch.randperm(len(full_trainset), generator=generator).tolist()
# train split:使用带增强的 full_trainset
# val split:使用不带增强的 full_trainset_eval(相同索引,不同 transform)
trainset = Subset(full_trainset, indices[val_size:])
valset = Subset(full_trainset_eval, indices[:val_size])
trainloader = DataLoader(trainset, batch_size=64, shuffle=True)
valloader = DataLoader(valset, batch_size=64, shuffle=False)
# 测试集:只用 eval_transform,不做增强
testset = torchvision.datasets.CIFAR10(root='./data', train=False,
download=True, transform=eval_transform)
testloader = DataLoader(testset, batch_size=64, shuffle=False)
检验数据集:
import numpy as np
import matplotlib.pyplot as plt
# full_trainset 保留了 .classes 属性,trainset/valset 是 Subset 没有该属性
classes = full_trainset.classes
# === 基本信息 ===
print(f"Full train set : {len(full_trainset)}")
print(f" -> Train split: {len(trainset)}")
print(f" -> Val split: {len(valset)}")
print(f"Test set : {len(testset)}")
# 取一张图看格式(从 full_trainset 取,Subset 索引机制不同)
sample_img, sample_label = full_trainset[0]
print(f"\nSingle image tensor shape : {sample_img.shape}")
print(f"Pixel range : [{sample_img.min():.3f}, {sample_img.max():.3f}]")
print(f"Label example : {sample_label} -> '{classes[sample_label]}'")
# === 查看一个 batch 的 shape ===
sample_batch_imgs, sample_batch_labels = next(iter(trainloader))
print(f"\nOne batch shape : {sample_batch_imgs.shape}")
print(f"Label shape : {sample_batch_labels.shape}")
# === 可视化每个类别的样例图片 ===
fig, axes = plt.subplots(2, 5, figsize=(12, 5))
shown = {i: False for i in range(10)}
for img, label in full_trainset:
label = int(label)
if not shown[label]:
ax = axes[label // 5][label % 5]
npimg = (img.numpy() * 0.5 + 0.5).transpose(1, 2, 0)
ax.imshow(np.clip(npimg, 0, 1))
ax.set_title(classes[label])
ax.axis('off')
shown[label] = True
if all(shown.values()):
break
plt.suptitle("CIFAR-10 Sample Images (32x32)", fontsize=13)
plt.tight_layout()
plt.show()
# === 各类别样本数量分布 ===
from collections import Counter
label_counts = Counter([label for _, label in full_trainset])
print("\nFull training set class distribution:")
for i, cls in enumerate(classes):
print(f" {cls:>10}: {label_counts[i]}")
接着我们便可以实现一个基本的ResNet:
import torch
from torch import nn
class BasicBlock(nn.Module):
"""
ResNet 的基本构建块(Basic Block)。
结构:Conv -> BN -> ReLU -> Conv -> BN -> (+shortcut) -> ReLU
核心思想:残差连接(skip connection)
让输出 H(x) = F(x) + x,模型只需学习残差 F(x) = H(x) - x。
这样梯度可以直接通过 shortcut 回传,解决了深层网络的梯度消失问题。
当 stride != 1 或通道数变化时,shortcut 需要用 1x1 Conv 匹配维度。
"""
def __init__(self, in_channels, out_channels, stride=1):
super().__init__()
# 主路径:两个 3x3 卷积,Conv -> BN -> ReLU -> Conv -> BN
self.conv1 = nn.Conv2d(in_channels, out_channels,
kernel_size=3, stride=stride, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU(inplace=True)
self.conv2 = nn.Conv2d(out_channels, out_channels,
kernel_size=3, stride=1, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(out_channels)
# shortcut 路径:当 stride != 1 或通道数变化时,用 1x1 Conv 对齐维度
self.shortcut = nn.Sequential()
if stride != 1 or in_channels != out_channels:
self.shortcut = nn.Sequential(
nn.Conv2d(in_channels, out_channels,
kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(out_channels)
)
def forward(self, x):
out = self.relu(self.bn1(self.conv1(x))) # Conv -> BN -> ReLU
out = self.bn2(self.conv2(out)) # Conv -> BN
out += self.shortcut(x) # 残差加法:F(x) + x
out = self.relu(out)
return out
class ResNet18(nn.Module):
"""
ResNet-18 for CIFAR-10.
结构:
stem: Conv3x3(3->64, stride=1) + BN + ReLU [不降采样,保留 32x32]
layer1: 2x BasicBlock(64->64, stride=1) [32x32]
layer2: 2x BasicBlock(64->128, stride=2) [16x16]
layer3: 2x BasicBlock(128->256, stride=2) [ 8x8]
layer4: 2x BasicBlock(256->512, stride=2) [ 4x4]
head: AdaptiveAvgPool2d(1) + Flatten + Linear(512->10)
"""
def __init__(self, num_classes=10):
super().__init__()
# stem: 用 3x3 conv(stride=1) 替代原版 7x7 conv(stride=2),适配 32x32 输入
self.stem = nn.Sequential(
nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False),
nn.BatchNorm2d(64),
nn.ReLU(inplace=True)
)
self.layer1 = self._make_layer(64, 64, num_blocks=2, stride=1)
self.layer2 = self._make_layer(64, 128, num_blocks=2, stride=2)
self.layer3 = self._make_layer(128, 256, num_blocks=2, stride=2)
self.layer4 = self._make_layer(256, 512, num_blocks=2, stride=2)
# 全局平均池化后 flatten,接一个全连接层输出 10 类
self.head = nn.Sequential(
nn.AdaptiveAvgPool2d(1), # 任意空间尺寸 -> 1x1
nn.Flatten(),
nn.Linear(512, num_classes)
)
def _make_layer(self, in_channels, out_channels, num_blocks, stride):
# 第一个 block 负责下采样(stride),后续 block stride=1
layers = [BasicBlock(in_channels, out_channels, stride)]
for _ in range(1, num_blocks):
layers.append(BasicBlock(out_channels, out_channels, stride=1))
return nn.Sequential(*layers)
def forward(self, x):
x = self.stem(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.head(x)
return x
net = ResNet18(num_classes=10)
print(net)
# 验证 forward pass 维度正确
dummy = torch.zeros(1, 3, 32, 32)
print(f"\nOutput shape: {net(dummy).shape}") # 应为 torch.Size([1, 10])
训练:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
## Code Starting Here
# The training code references the training code I used in CS130 and CS137.
# 首先将数据集分成若干份,其数量=总数据集数量/批次大小.
# 我们设置了batch_size = 64,这个的意思就是每次喂给模型64张图片。
# iterations = len(trainset) / 64 = 781
# 这个长度的意思是每一个epoch我们要跑781个batch,才能把整个数据集跑完
# 一次iteration,就是跑完了一个batch,也就是看了64张图片。
# 一次epoch,就是跑完了所有的图片,也就是跑完781次iteration。
model = net.to(device)
criterion = nn.CrossEntropyLoss()
# 优化器:SGD + Momentum + Weight Decay(ResNet 原论文配置,泛化性优于 AdamW)
# - lr=0.1:SGD 的初始学习率通常比 Adam 大得多
# - momentum=0.9:动量,让梯度更新方向带有"惯性",有助于冲出局部最优、加速收敛
# - weight_decay=5e-4:L2 正则化系数,让权重保持小值,防止过拟合
optimizer = torch.optim.SGD(model.parameters(),
lr=0.1,
momentum=0.9,
weight_decay=5e-4)
# 学习率调度器:MultiStepLR
# 在指定的 epoch 到达时,将学习率乘以 gamma(即缩小为原来的 1/10)
# 训练前期用大 lr 快速下降,后期用小 lr 精细收敛:
# epoch 1-59: lr = 0.1
# epoch 60-79: lr = 0.01
# epoch 80+: lr = 0.001
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[60, 80], gamma=0.1)
import os
os.makedirs('./resnet_checkpoints', exist_ok=True)
train_losses = []
test_accuracies = []
best_acc = 0.0
# 100 epoch:配合 MultiStepLR 在第 60、80 轮衰减学习率
num_epochs = 100
for epoch in range(num_epochs):
# 使用nn.Module类,切换.train()开关
# 这里的.train()是开关,主要是dropout和batchNorm这种特殊层在训练和测试时动作不一样
# dropout:评估时不随机关闭神经元
# BatchNorm,停止计算,这个后面再说,这里暂时不问
model.train()
# running loss是用来手动记录训练误差的变量
running_loss = 0.0
# for image, label in trainloader:
for images, labels in trainloader:
images, labels = images.to(device), labels.to(device)
# 清除上一个batch留下的梯度,因为PyTorch默认梯度是累积的。
optimizer.zero_grad()
# 前向传播,得到预测结果
outputs = model(images)
# 对比预测结果和实际结果,算出loss(交叉熵损失函数)
loss = criterion(outputs, labels)
# 反向传播,算出每一层的梯度。
loss.backward()
# 更新参数
optimizer.step()
# 在PyTorch中,loss是一个带有计算图的tensor,直接累加是不合适的
# .item()可以把tsnor中具体的数字抠出来,变成轻量级的python浮点数
running_loss += loss.item()
avg_loss = running_loss / len(trainloader)
train_losses.append(avg_loss)
# 在验证集上评估(val set 从训练集切出,用于训练过程监控)
model.eval()
correct = 0
total = 0
with torch.no_grad():
for images, labels in valloader:
images, labels = images.to(device), labels.to(device)
outputs = model(images)
_, predicted = torch.max(outputs, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
val_acc = 100 * correct / total
test_accuracies.append(val_acc)
# scheduler.step() 必须在每个 epoch 结束后调用,触发学习率按计划衰减
scheduler.step()
current_lr = scheduler.get_last_lr()[0]
print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {avg_loss:.4f} Val Acc: {val_acc:.2f}% LR: {current_lr:.5f}")
# 每次验证集准确率创新高时,覆盖保存 best_model(用于部署)
if val_acc > best_acc:
best_acc = val_acc
torch.save(model.state_dict(), './best_model_resnet.pth')
print(f" -> New best val accuracy: {best_acc:.2f}%, best model saved.")
# 每5轮保存一次checkpoint(包含 scheduler 状态,方便从中断处恢复训练)
if (epoch + 1) % 5 == 0:
torch.save({
'epoch': epoch + 1,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'scheduler_state_dict': scheduler.state_dict(),
'train_losses': train_losses,
'test_accuracies': test_accuracies,
}, f'./resnet_checkpoints/checkpoint_epoch{epoch+1}.pth')
print(f" -> Checkpoint saved at epoch {epoch+1}")
# 跑完后保存 final 权重
torch.save(model.state_dict(), './resnet_final_weights.pth')
print("Training complete. Final weights saved to resnet_final_weights.pth")
测试评估:
# Final evaluation on test set (with data augmentation)
model.eval()
correct = 0
total = 0
with torch.no_grad():
for images, labels in testloader:
images, labels = images.to(device), labels.to(device)
outputs = model(images)
_, predicted = torch.max(outputs, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
test_acc = 100 * correct / total
print(f"Final Test Accuracy (ResNet-18 on CIFAR-10): {test_acc:.2f}%")
print(f"Best Val Accuracy during training: {best_acc:.2f}%")
得到结果:
Final Test Accuracy (ResNet-18 on CIFAR-10): 93.69%
Best Val Accuracy during training: 94.46%
我们也一般会查看training loss and test accuracy over epochs:
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 5))
# plot loss
plt.subplot(1, 2, 1)
plt.plot(train_losses, marker='o', label="Training Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training Loss over Epochs")
plt.legend()
# plot accuracy
plt.subplot(1, 2, 2)
plt.plot(test_accuracies, marker='o', color='orange', label="Test Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy (%)")
plt.title("Test Accuracy over Epochs")
plt.legend()
plt.tight_layout()
plt.show()
这些训练日志也可以保存下来,留给未来分析用:
torch.save(model.state_dict(), './model_architecture_1.pth')
## Save your log file and upload to Canvas
# Your Code Here
# csv log
import pandas as pd
df = pd.DataFrame({
'epoch': range(1, len(train_losses) + 1),
'train_loss': train_losses,
'val_acc': test_accuracies
})
df.to_csv('./training_log_model1.csv', index=False)
# json log
import json
log = {'train_losses': train_losses, 'val_accuracies': test_accuracies}
with open('./training_log_model1.json', 'w') as f:
json.dump(log, f)
###
。
自定义CNN模型
我们可以像搭积木那样搭建一个CNN模型。
比如说,我们设计了如下架构:
Input → Conv Layer 1 → ReLU → Max Pooling →
→ Conv Layer 2 → ReLU → Max Pooling →
→ Conv Layer 3 → ReLU → Max Pooling →
→ Flatten → FC1 → ReLU → FC2 → ReLU → Output
我们就可以用PyTorch直接像搭积木一样把他搭建起来:
# ============================================================
# 知识点:nn.Conv2d(in_channels, out_channels, kernel_size, padding)
# - in_channels : 输入通道数
# 第一层:CIFAR-10 是 RGB 图像,固定为 3
# 之后每层:必须等于上一层的 out_channels(维度必须对齐)
# - out_channels: 该层输出的特征图数量,是超参数,你自己决定
# 常见选择:32, 64, 128 …… 通常越深通道数越多
# - kernel_size=3 : 3×3 卷积核,CIFAR-10 常用
# - padding=1 : kernel_size=3 时加 padding=1 可保持空间尺寸不变
#
# 知识点:nn.MaxPool2d(kernel_size)
# - kernel_size=2 : 2×2 窗口,每次将空间尺寸减半
# - CIFAR-10 输入 32×32,经过 3 次 MaxPool2d(2):32 → 16 → 8 → 4
#
# 知识点:nn.Linear(in_features, out_features)
# - in_features : Flatten 后的向量长度 = 最后一层 Conv 的 out_channels × 4 × 4
# 因为 32×32 经过 3 次 /2 后剩 4×4,与通道数相乘就是展平后的维度
# - out_features : FC2 的输出固定为 10(CIFAR-10 有 10 个类别)
#
# 知识点:forward 方法签名
# def forward(self, x): ← self 和 x 是两个独立参数,中间用逗号,不是 self.x
#
# 要求的架构流程(来自作业说明):
# Input → Conv1 → ReLU → MaxPool
# → Conv2 → ReLU → MaxPool
# → Conv3 → ReLU → MaxPool
# → Flatten → FC1 → ReLU → FC2 → ReLU → Output (10类)
# ============================================================
class MyCNN(nn.Module):
def __init__(self):
super(MyCNN, self).__init__()
# Your Code Here
# --- 卷积部分:3 个 Conv→ReLU→MaxPool 块 ---
self.features = nn.Sequential(
# Block 1
nn.Conv2d(3, 64, kernel_size=3, padding=1), # in=3(RGB固定), out=? ← 你来填
nn.ReLU(),
nn.MaxPool2d(2), # kernel_size=? ← 填写池化窗口大小
# Block 2
# 注意:in_channels 必须等于上一层的 out_channels
nn.Conv2d(64, 128, kernel_size=3, padding=1), # in=? out=? ← 你来填
nn.ReLU(),
nn.MaxPool2d(2),
# Block 3
nn.Conv2d(128, 256, kernel_size=3, padding=1), # in=? out=? ← 你来填
nn.ReLU(),
nn.MaxPool2d(2),
)
# --- 全连接部分:Flatten 在 forward 中做,这里只放 Linear 层 ---
# FC1 的 in_features = Conv3 的 out_channels × 4 × 4
# (32×32 经过 3 次 MaxPool(2) 后空间尺寸变为 4×4)
self.classifier = nn.Sequential(
# 卷积层flatten后得到的向量包含了非常丰富的空间特征
# 但是里头有很多冗余的信息
# 我们用FC层把这些特征压缩提炼:
# 4096维(底层空间特征)→ 512维(中层语义特征)→ 128维(高层抽象特征)→ 10维(类别分数)
nn.Flatten(), # 这里把256展平为4096
nn.Linear(4096, 512), # in=? (Conv3_out * 4 * 4), out=? ← 你来填
nn.ReLU(),
nn.Linear(512, 128), # in=? (上一层的 out), out=10(固定,10类)
nn.ReLU(),
nn.Linear(128, 10)
)
def forward(self, x): # 注意:(self, x) 不是 (self.x)
x = self.features(x) # 经过 3 个卷积块,shape: (B, C3, 4, 4)
x = self.classifier(x) # 经过全连接层,shape: (B, 10)
return x
###
model = MyCNN()
print(model)
####