PyTorch Deep Learning [14]
Batch Normalization
- Normalization
- The loss sits at the very end of the network, so the later layers (the high-level semantic layers) train quickly
- The data enters at the very bottom
- The bottom layers train slowly
- Whenever a bottom layer changes, all the high-level semantic layers have to change along with it
- Those last layers then have to be relearned many times, so convergence slows down
- Question: can we avoid disturbing the top layers while the bottom layers are still learning?
- Batch normalization
- Fix the mean and variance of each layer within a minibatch:
$$\mu_B = \frac{1}{|B|}\sum_{i\in B} x_i \qquad\text{and}\qquad \sigma_B^2 = \frac{1}{|B|}\sum_{i\in B}\left(x_i-\mu_B\right)^2 + \epsilon$$
- Then normalize with learnable parameters:
$$x_{i+1} = \gamma\,\frac{x_i-\mu_B}{\sigma_B} + \beta$$
- That is, first compute the minibatch mean and variance, then apply this transform to every input (see the small worked example below)
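As a tiny worked example of the two formulas above (my own numbers, purely illustrative):
import torch

X = torch.tensor([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])  # toy minibatch: 3 samples, 2 features
eps = 1e-5
mu_B = X.mean(dim=0)                             # per-feature mean: [3., 4.]
sigma2_B = ((X - mu_B) ** 2).mean(dim=0) + eps   # per-feature variance: [2.667, 2.667] (+ eps)
X_hat = (X - mu_B) / torch.sqrt(sigma2_B)        # normalized activations
gamma, beta = torch.ones(2), torch.zeros(2)      # learnable scale/shift, initialized to 1 and 0
Y = gamma * X_hat + beta
print(X_hat.mean(dim=0))                         # ~0 for each feature
print((X_hat ** 2).mean(dim=0))                  # ~1 for each feature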
- Batch normalization layer
- The learnable parameters are $\gamma$ and $\beta$
- Where the batch normalization layer is placed
- On the output of a fully connected or convolutional layer, before the activation function
- Or on the input of a fully connected or convolutional layer
- For fully connected layers it acts on the feature dimension
- For convolutional layers it acts on the channel dimension (see the sketch below)
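As a small illustration (my own example, not from the original notes), the built-in layers nn.BatchNorm1d and nn.BatchNorm2d normalize over exactly these dimensions:
import torch
from torch import nn

# Fully connected case: input is (batch, features); statistics are computed per feature
fc_bn = nn.BatchNorm1d(4)
x_fc = torch.randn(8, 4)
print(fc_bn(x_fc).shape)        # torch.Size([8, 4])

# Convolutional case: input is (batch, channels, H, W); statistics are computed per channel
conv_bn = nn.BatchNorm2d(3)
x_conv = torch.randn(8, 3, 32, 32)
print(conv_bn(x_conv).shape)    # torch.Size([8, 3, 32, 32])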
- What batch normalization does
- It effectively adds noise to control model complexity: in
$$x_{i+1} = \gamma\,\frac{x_i-\mu_B}{\sigma_B} + \beta$$
the statistics $\mu_B$ and $\sigma_B$ come from a randomly sampled minibatch, so they act as a random offset and a random scale
- For this reason there is no need to combine it with dropout
- Summary
- Batch normalization fixes the mean and variance within a minibatch, then learns a suitable shift and scale
- It speeds up convergence, so a larger learning rate can be used, but it generally does not change model accuracy
- Code implementation
- Implementation from scratch
import torch
from torch import nn

def batch_norm(X, gamma, beta, moving_mean, moving_var, eps, momentum):
    # In prediction mode, normalize with the moving-average statistics
    if not torch.is_grad_enabled():
        X_hat = (X - moving_mean) / torch.sqrt(moving_var + eps)
    else:
        assert len(X.shape) in (2, 4)
        if len(X.shape) == 2:
            # Fully connected layer: per-feature mean and variance
            mean = X.mean(dim=0)
            var = ((X - mean)**2).mean(dim=0)
        else:
            # Convolutional layer: per-channel statistics, keep dims for broadcasting
            mean = X.mean(dim=(0, 2, 3), keepdim=True)
            var = ((X - mean)**2).mean(dim=(0, 2, 3), keepdim=True)
        X_hat = (X - mean) / torch.sqrt(var + eps)
        # Update the moving averages used at prediction time
        moving_mean = momentum * moving_mean + (1.0 - momentum) * mean
        moving_var = momentum * moving_var + (1.0 - momentum) * var
    Y = gamma * X_hat + beta  # scale and shift
    return Y, moving_mean.data, moving_var.data

class BatchNorm(nn.Module):
    def __init__(self, num_features, num_dims):
        super().__init__()
        if num_dims == 2:
            shape = (1, num_features)
        else:
            shape = (1, num_features, 1, 1)
        # Learnable scale and shift parameters
        self.gamma = nn.Parameter(torch.ones(shape))
        self.beta = nn.Parameter(torch.zeros(shape))
        # Non-learnable moving-average statistics
        self.moving_mean = torch.zeros(shape)
        self.moving_var = torch.ones(shape)

    def forward(self, X):
        # Make sure the moving statistics live on the same device as X
        if self.moving_mean.device != X.device:
            self.moving_mean = self.moving_mean.to(X.device)
            self.moving_var = self.moving_var.to(X.device)
        Y, self.moving_mean, self.moving_var = batch_norm(
            X, self.gamma, self.beta, self.moving_mean, self.moving_var,
            eps=1e-5, momentum=0.9)
        return Y
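A quick shape check of the custom layer (a small sketch; the input sizes are arbitrary):
bn = BatchNorm(6, num_dims=4)           # for a 4-D conv feature map with 6 channels
X = torch.rand(2, 6, 12, 12)
print(bn(X).shape)                      # torch.Size([2, 6, 12, 12])
print(bn.gamma.shape, bn.beta.shape)    # torch.Size([1, 6, 1, 1]) for both gamma and beta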
- Concise implementation
net = nn.Sequential(
    nn.Conv2d(1, 6, kernel_size=5),
    nn.BatchNorm2d(6),  # 6 is the number of output channels of the previous layer
    nn.Sigmoid(),
    nn.MaxPool2d(kernel_size=2, stride=2)
)
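For reference, nn.BatchNorm2d exposes the learnable scale and shift as .weight and .bias; a quick forward pass (assuming a 28x28 Fashion-MNIST-sized input, my choice) confirms the shapes:
X = torch.rand(1, 1, 28, 28)
print(net(X).shape)                              # torch.Size([1, 6, 12, 12])
print(net[1].weight.shape, net[1].bias.shape)    # gamma and beta, each torch.Size([6])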
Residual Neural Networks
- Does making a neural network deeper always help?
- If the optimization heads in the wrong direction, a deeper model can actually end up worse
- Therefore we want the larger model to contain the previous, smaller model, so it is guaranteed to learn at least what it was supposed to learn
- Residual block
- Stacking on another layer changes the function class; what we want is to enlarge the function class so it still contains the original one
- The residual block adds a shortcut path (the right-hand branch) to obtain $f(x) = x + g(x)$
- Interpretation: even without going through $g(x)$, the input can skip this layer and the previous layer's features still pass straight through
- The problem residual networks address: vanishing gradients
- At every layer the gradient is the gradient through this layer's transformation plus the gradient carried by the shortcut, which keeps it from becoming so small that parameter updates stall (see the short derivation below)
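A one-line derivation (my notation, not from the original notes): with $y = x + g(x)$ and a loss $\ell$ further up the network,
$$\frac{\partial \ell}{\partial x} = \frac{\partial \ell}{\partial y}\left(I + \frac{\partial g(x)}{\partial x}\right) = \frac{\partial \ell}{\partial y} + \frac{\partial \ell}{\partial y}\,\frac{\partial g(x)}{\partial x}$$
Even if the second term (the path through $g$) becomes very small, the first term carries the upstream gradient through unchanged, so the lower layers still receive a usable gradient.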
- Architecture diagram
- Implementation detail: the 1×1 convolution is introduced to change the number of channels so the shortcut matches the main path (see the shape check after the Residual class below)
- Summary
- Residual blocks make very deep networks much easier to train
- Because the network contains the shallower model, training can proceed incrementally and remains stable
- Residual networks had a profound influence on the design of later deep neural networks
- Code implementation
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l
class Residual(nn.Module):
def __init__(self, input_channels, num_channels, use_1x1conv=False,
strides=1):
super().__init__()
self.conv1 = nn.Conv2d(input_channels, num_channels, kernel_size=3,
padding=1, stride=strides)
self.conv2 = nn.Conv2d(num_channels, num_channels, kernel_size=3,
padding=1)
if use_1x1conv:
self.conv3 = nn.Conv2d(input_channels, num_channels,
kernel_size=1, stride=strides)
else:
self.conv3 = None
self.bn1 = nn.BatchNorm2d(num_channels)
self.bn2 = nn.BatchNorm2d(num_channels)
self.relu = nn.ReLU(inplace=True)
    def forward(self, X):
        Y = F.relu(self.bn1(self.conv1(X)))
        Y = self.bn2(self.conv2(Y))
        if self.conv3:
            # Use the 1x1 convolution so the shortcut matches Y in channels and stride
            X = self.conv3(X)
        Y += X  # add the shortcut (identity or projected input)
        return F.relu(Y)
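A quick sanity check of the block (a small sketch; the 4x3x6x6 input is arbitrary): without the 1x1 convolution the shape is preserved; with it, the channel count changes and the spatial size is halved.
blk = Residual(3, 3)
X = torch.rand(4, 3, 6, 6)
print(blk(X).shape)    # torch.Size([4, 3, 6, 6])

blk = Residual(3, 6, use_1x1conv=True, strides=2)
print(blk(X).shape)    # torch.Size([4, 6, 3, 3])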
b1 = nn.Sequential(nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
nn.BatchNorm2d(64), nn.ReLU(),
nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
def resnet_block(input_channels, num_channels, num_residuals,
                 first_block=False):
    # Except in the first stage, the first residual block of each stage halves
    # the height and width and doubles the number of channels
    blk = []
    for i in range(num_residuals):
        if i == 0 and not first_block:
            blk.append(
                Residual(input_channels, num_channels, use_1x1conv=True,
                         strides=2))
        else:
            blk.append(Residual(num_channels, num_channels))
    return blk
b2 = nn.Sequential(*resnet_block(64, 64, 2, first_block=True))
b3 = nn.Sequential(*resnet_block(64, 128, 2))
b4 = nn.Sequential(*resnet_block(128, 256, 2))
b5 = nn.Sequential(*resnet_block(256, 512, 2))
net = nn.Sequential(b1, b2, b3, b4, b5, nn.AdaptiveAvgPool2d((1, 1)),
nn.Flatten(), nn.Linear(512, 10))
X = torch.rand(size=(1, 1, 224, 224))
for layer in net:
X = layer(X)
print(layer.__class__.__name__, 'output shape:\t', X.shape)
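To actually train this ResNet on Fashion-MNIST, one option is to reuse the d2l training loop; the hyperparameters below follow the usual d2l setup and are only a suggestion:
lr, num_epochs, batch_size = 0.05, 10, 256
train_iter, test_iter = d2l.load_data_fashion_mnist(batch_size, resize=96)
d2l.train_ch6(net, train_iter, test_iter, num_epochs, lr, d2l.try_gpu())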
Data Augmentation
- Augment an existing dataset to give it more diversity
- For example, add various kinds of background noise to speech data
- Or change the color and shape of images
- Pipeline
- Read the data, sample it randomly, then apply the augmentation on the fly
- Augmentation can be viewed as a regularization term
- Common techniques
- Flipping
- Horizontal (left-right) flip
- Vertical (up-down) flip
- Cropping
- Cut a patch out of the image and then resize it to a fixed shape
- Random aspect ratio
- Random size
- Random position
- Color
- Change hue, saturation, and brightness
- Summary
- Data augmentation deforms existing data to obtain more diversity, which makes the model generalize better
- Common image augmentations include flipping, cropping, and color changes
- Code implementation
#=============================== Basic operations ===============================#
!pip install d2l
!pip install matplotlib_inline
%matplotlib inline
import torch
import torchvision
from torch import nn
from d2l import torch as d2l
d2l.set_figsize() # set the default figure size for later plots
!wget -O ./pic/test.jpg "https://cdn.pixabay.com/photo/2017/02/20/18/03/cat-2083492__480.jpg" # download a sample photo
img = d2l.Image.open('./pic/test.jpg') # load the photo
d2l.plt.imshow(img) # display the photo
def apply(img, aug, num_rows=2, num_cols=4, scale=1.5):
    """Apply the augmentation `aug` to `img` num_rows*num_cols times and show the results in a grid."""
    Y = [aug(img) for _ in range(num_rows * num_cols)]
    d2l.show_images(Y, num_rows, num_cols, scale=scale)
# horizontal (left-right) flip
apply(img, torchvision.transforms.RandomHorizontalFlip())
# vertical (up-down) flip
apply(img, torchvision.transforms.RandomVerticalFlip())
# random crop
shape_aug = torchvision.transforms.RandomResizedCrop(
(200, 200), scale=(0.3, 0.8), ratio=(0.5, 2)
)
# (200, 200): whatever the crop size, the result is resized to 200x200
# scale: range of the crop area as a fraction of the original image
# ratio: range of aspect ratios
apply(img, shape_aug)
# randomly jitter brightness, contrast, saturation, and hue
apply(img, torchvision.transforms.ColorJitter(brightness=0.5, contrast=0.2, saturation=0.6, hue=0.4))
# combine several augmentations
color_aug = torchvision.transforms.ColorJitter(brightness=0.5, contrast=0.2, saturation=0.6, hue=0.4)
augs = torchvision.transforms.Compose([torchvision.transforms.RandomHorizontalFlip(), shape_aug, color_aug])
apply(img, augs)
- A complete training example with data augmentation
all_images = torchvision.datasets.CIFAR10(
train=True, root="./data", download=True
)
d2l.show_images([
all_images[i][0] for i in range(32)], 4, 8, scale=0.8);
train_augs = torchvision.transforms.Compose([
torchvision.transforms.RandomHorizontalFlip(),
torchvision.transforms.ToTensor()])
test_augs = torchvision.transforms.Compose([
torchvision.transforms.ToTensor()
])
# torchvision.transforms.ToTensor() converts the PIL image to a tensor so it can be fed to the model
# no random augmentation at test time
# Helper function to load the images and apply the augmentations
def load_cifar10(is_train, augs, batch_size):
dataset = torchvision.datasets.CIFAR10(
root='./data', train=is_train, transform=augs, download=True
)
dataloader = torch.utils.data.DataLoader(
dataset, batch_size=batch_size, shuffle=is_train, num_workers=4
)
return dataloader
# Define a function to train and evaluate the model with multiple GPUs
def train_batch_ch13(net, X, y, loss, trainer, devices):
if isinstance(X, list):
X = [x.to(devices[0]) for x in X]
else:
X = X.to(devices[0])
y = y.to(devices[0])
net.train()
trainer.zero_grad()
pred = net(X)
l = loss(pred, y)
l.sum().backward()
trainer.step()
train_loss_sum = l.sum()
train_acc_sum = d2l.accuracy(pred, y)
return train_loss_sum, train_acc_sum
def train_ch13(net, train_iter, test_iter, loss, trainer, num_epochs,
devices=d2l.try_all_gpus()):
timer, num_batches = d2l.Timer(), len(train_iter)
animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs], ylim=[0, 1],
legend=['train loss', 'train acc', 'test acc'])
net = nn.DataParallel(net, device_ids=devices).to(devices[0]) # 多GPU训练
for epoch in range(num_epochs):
metric = d2l.Accumulator(4)
        for i, (features, labels) in enumerate(train_iter):
            timer.start()
            l, acc = train_batch_ch13(net, features, labels, loss, trainer,
                                      devices)
            metric.add(l, acc, labels.shape[0], labels.numel())
            timer.stop()
            # Update the training curves a few times per epoch
            if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
                animator.add(epoch + (i + 1) / num_batches,
                             (metric[0] / metric[2], metric[1] / metric[3],
                              None))
        # Evaluate on the test set after each epoch
        test_acc = d2l.evaluate_accuracy_gpu(net, test_iter)
        animator.add(epoch + 1, (None, None, test_acc))
    print(f'loss {metric[0] / metric[2]:.3f}, train acc '
          f'{metric[1] / metric[3]:.3f}, test acc {test_acc:.3f}')
# Define train_with_data_aug to train the model with image augmentation
batch_size, devices, net = 256, d2l.try_all_gpus(), d2l.resnet18(10, 3)
def init_weights(m):
if type(m) in [nn.Linear, nn.Conv2d]:
nn.init.xavier_uniform_(m.weight)
net.apply(init_weights)
def train_with_data_aug(train_augs, test_augs, net, lr=0.001):
train_iter = load_cifar10(True, train_augs, batch_size)
test_iter = load_cifar10(False, test_augs, batch_size)
loss = nn.CrossEntropyLoss(reduction="none")
trainer = torch.optim.Adam(net.parameters(), lr=lr)
train_ch13(net, train_iter, test_iter, loss, trainer, 10, devices)
# train the model
train_with_data_aug(train_augs, test_augs, net)
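As an optional comparison (my suggestion, not part of the original notes), one could train a freshly initialized copy of the network without the random flip by passing test_augs for training as well; net_no_aug is a hypothetical name:
net_no_aug = d2l.resnet18(10, 3)      # a fresh copy so results are comparable
net_no_aug.apply(init_weights)
train_with_data_aug(test_augs, test_augs, net_no_aug)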