一、项目背景
- AiStudio是一个很好的学习平台,我相信无时无刻都有很多像我一样的小白出于对人工智能的兴趣,而汇聚在这里。这一次,我想做一个入门级的项目来和各位同学一起学习图像分割领域的基础任务——语义分割任务。
- 本次项目将基于Culane车道线数据集,搭建ERFnet网络,来实现简单的车道线检测。
二、分割任务简介
简单来说,其实图像分割任务可以分为以下三大类:
- 语义分割
- 实例分割
- 全景分割
在本项目中咱们将基于基础的语义分割任务进行展开,如果同学们对进阶的实例分割、全景分割感兴趣的话,在以后我也会有相关的项目分享,当然您也可以自行在aistudio上搜索,相信有很多优秀的开发者早已分享了他们的精选项目。
那么语义分割任务是什么呢?
总的来说,语义分割其实就是对一张图片中的物体按类别进行分割提取,本质是像素点级别上进行分类。
还是有点懵?那就再简单一点。
其实分割的概念和抠图有点像,只不过咱们抠图更多的时候是人为的手工操作,但是分割的话可以交给机器去进行实现,通俗的理解为抠图其实也并无不妥。
但值得注意的是!分割的结果并不是漂亮小姐姐的图像哦!我们语义分割模型输出的结果其实是类似图二的图像。并且当分割任务是对多个类别进行分割时,为了便于可视化,我们会对分割结果按照不同类别进行上色,因而上面的讲解中才得到了这样的结果。
“抠图”完成后,咱们按类别进行了上色。
那么接下来,就让咱们进入项目的实操部分吧,冲冲冲。
三、CULane车道线数据集简介
CULane是一个大规模且具有挑战性的车道线数据集,主要应用于行车道检测的学术研究。 它是由安装在六辆由北京不同驾驶员驾驶的不同车辆上的摄像机收集的。 收集了超过55小时的视频,并提取了133,235帧。 数据集分为88880张训练集图像,9675张验证集图像和34680张测试集图像。 测试集分为正常类别和8个具有挑战性的类别,分别对应于下面的9个示例。
在每一帧数据图像中,作者都使用三次样条曲线手动注释行车道。 对于车道被车辆遮挡或看不见的情况,作者仍会根据上下文对车道进行注释,作者还希望算法能够区分道路上的障碍。 因此,对于在障碍物另一侧的车道没有进行标注。 在此数据集中,作者将注意力集中在四个车道标记的检测上,这在实际应用中最受关注,其他车道标记未标注。
光看介绍有点懵?那咱们找组图片深♂入一下?
观察Label图像不难发现,数据集对原图中的4条车道线进行了标注,其他东西通通当作背景,而我们这次项目想要达到的效果也是如此,输入一张图片,模型输出一张类似Label图像一样的车道线分割结果。
四、ERFnet网络结构
具体来讲, ERFnet是一种基于残差连接和可分离卷积的语义分割网络,旨在在不降低准确率的同时提高模型处理帧数的效率,可以很好的满足在自动驾驶中对于实时性的要求,且由于其轻量级的特性,在进行硬件部署时也具有很好的契合度,因而在自动驾驶及车道线检测领域具有不俗的参考意义。ERFnet网络结构中所用到的Non-bottleneck-1D残差连接块,对经典残差网络开山之作Resnet中提出的Non-bottleneck残差块作出了改进,将二维卷积拆分为了两个一维卷积,增加了非线性化的层数,以及在整体上减少参数数量。改进后的Non-bottleneck-1D残差块可以在保持相同的参数数量的情况下,扩大卷积的尺寸,增强感受的能力,很好的保证了ERFnet网络的精确度。从总体结构来看,ERFnet沿用了U-net中经典的Encoder-Decoder结构,通过Encoder编码器进行下采样(提取特征),通过Decoder解码器进行上采样(还原图像),并在其中穿插了特征融合和空洞卷积,因而在模型轻量的情况下还能保证不俗的精度。
核心模块
具体实现细节可参考郑佬的项目
ERFNet:用于实时语义分割的高效残差分解卷积神经网络
当然啦,学习网络结构对于刚接触的同学来说可能比较吃力,学习是一个持续的过程,现在您可能一知半解,但随着知识的积累,我相信很快您就能理解其中的奥妙,但在此之前,难道因为不熟悉网络结构,咱们就不能进行下去了嘛? 当然不是~!
以下是我的一些简单的理解:
- 神经网络更多讲究搭建端到端网络,什么意思呢?其实就是您给网络一个输入,然后网络给您一个输出,而数据在网络中的运算过程对一般用户来说其实是不可见过程,说这个有什么意义呢?其实是这样的,现在开源的网络结构有很多,虽然咱们可能不太清楚其中的一些高深的原理,但其实咱们可以站在用户的角度来看待这个问题,虽然咱们不懂细节,但我们会用就行!
- 想要“走捷径”快速的使用一个陌生的网络结构,我们更多的是需要关注网络的输入和输出部分的构建,对于不同的任务,输出部分的构建往往也是不一样的。举个简单的例子,在图像分类任务中,网络的结尾会接全连接层,最后网络的输出是多个类别对应的概率值,但是在语义分割的网络中,咱们输出结果是与原图大小相等的多通道图像!其中通道数等于类别数!这是分割任务的一大特点,大家千万不要弄混了。所以结论是,对于咱们这个车道线检测项目,咱们输入的结果是形如[1,3,576,1640]的多维图像矩阵(图像是3通道图像所以通道数为3),咱们的输出结果是形如[1,2,576,1640]的多维图像矩阵,其中1是图像数量(咱们输入网络1张图像,所以输出也为1),其中2为类别数(咱们这个车道线检测任务,主要是2分类,一类是背景一类是车道线,所以这里输出结果通道数为2(分割任务的结果分类类别数等于通道数)),其中(576,1640)是输出图像的大小尺寸这与输入保持一致。
五、代码实操部分
该走的流程还是要走滴,该导的库还是要导滴。
import random
import paddle
import paddle.nn as nn
from paddle.nn import functional as F
from paddle.io import Dataset
from paddle.vision.transforms import transforms as T
import os
import io
import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt
from PIL import Image as PilImage
%matplotlib inline
# 第一步解压数据集
!unzip -oq /home/aistudio/data/data112899/CULane.zip #第一次运行记得解压一下数据集(只运行一次就可以啦)
# 第二步 数据集划分,在这里咱们按8:1:1的比例划分训练集、验证集、测试集
path_origin = 'CULane/JPEGImages/'
path_seg = 'CULane/Annotations/'
pic_dir = os.listdir(path_origin)
f_train = open('CULane/train_list.txt', 'w')
f_val = open('CULane/val_list.txt', 'w')
f_test = open('CULane/test_list.txt','w')
for i in range(len(pic_dir)):
if i % 9 == 0:
f_val.write(path_origin + pic_dir[i] + '\t' + path_seg + pic_dir[i].split('.')[0] + '.png' + '\n')
elif i % 10 == 0 :
f_test.write(path_origin + pic_dir[i] + '\t' + path_seg + pic_dir[i].split('.')[0] + '.png' + '\n')
else:
f_train.write(path_origin + pic_dir[i] + '\t' + path_seg + pic_dir[i].split('.')[0] + '.png' + '\n')
f_train.close()
f_val.close()
f_test.close()
# 第三步 测试一下生成的train_list.txt中的路径是否设置正确,根据路径读取图片进行展示。
with open('CULane/train_list.txt', 'r') as f:
i = 0
for line in f.readlines():
image_path, label_path = line.strip().split('\t')
image = np.array(PilImage.open(image_path))
label = np.array(PilImage.open(label_path))
if i > 2:
break
# 进行图片的展示
plt.figure()
plt.subplot(1,2,1),
plt.title('Train Image')
plt.imshow(image.astype('uint8'))
plt.axis('off')
plt.subplot(1,2,2),
plt.title('Label')
plt.imshow(label.astype('uint8'), cmap='gray')
plt.axis('off')
plt.show()
i = i + 1
# 第四步 构建数据读取器
IMAGE_SIZE = (576,1640)
class MyDateset(Dataset):
"""
数据集定义
"""
def __init__(self, mode='train'):
"""
构造函数
"""
self.image_size = IMAGE_SIZE
self.mode = mode.lower()
self.train_images = []
self.label_images = []
with open('CULane/{}_list.txt'.format(self.mode), 'r') as f:
for line in f.readlines():
image, label = line.strip().split('\t')
self.train_images.append(image)
self.label_images.append(label)
def _load_img(self, path, color_mode='rgb', transforms=[]):
"""
统一的图像处理接口封装,用于规整图像大小和通道
"""
with open(path, 'rb') as f:
img = PilImage.open(io.BytesIO(f.read()))
if color_mode == 'grayscale':
if img.mode not in ('L', 'I;16', 'I'):
img = img.convert('L')
elif color_mode == 'rgba':
if img.mode != 'RGBA':
img = img.convert('RGBA')
elif color_mode == 'rgb':
if img.mode != 'RGB':
img = img.convert('RGB')
else:
raise ValueError('color_mode must be "grayscale", "rgb", or "rgba"')
return T.Compose([T.Resize(self.image_size)] + transforms)(img)
def __getitem__(self, idx):
"""
返回 image, label
"""
train_image = self._load_img(self.train_images[idx],
transforms=[
T.Transpose(),
T.Normalize(mean=127.5, std=127.5)
]) # 加载原始图像
label_image = self._load_img(self.label_images[idx],
color_mode='grayscale',
transforms=[T.Grayscale()]) # 加载Label图像
# 返回image, label
train_image = np.array(train_image, dtype='float32')
label_image = np.array(label_image, dtype='int64')
return train_image, label_image
def __len__(self):
"""
返回数据集总数
"""
return len(self.train_images)
# 第五步 模型网络搭建,这里采用了ERFnet网络,当然啦实在看不懂也可以先暂时跳过,您只需要知道输入和输出的结果是什么就行。
class non_bottleneck_1d(paddle.nn.Layer):
def __init__(self, chann, dropprob, dilated):
super().__init__()
self.conv3x1_1 = paddle.nn.Conv2D(in_channels=chann, out_channels=chann, kernel_size=(3, 1), stride=1, padding=(1, 0), bias_attr=True)
self.conv1x3_1 = paddle.nn.Conv2D(in_channels=chann, out_channels=chann, kernel_size=(1, 3), stride=1, padding=(0, 1), bias_attr=True)
self.bn1 = paddle.nn.BatchNorm(chann, epsilon=1e-03)
self.conv3x1_2 = paddle.nn.Conv2D(in_channels=chann, out_channels=chann, kernel_size=(3, 1), stride=1, padding=(1 * dilated, 0), bias_attr=True,
dilation=(dilated, 1))
self.conv1x3_2 = paddle.nn.Conv2D(in_channels=chann, out_channels=chann, kernel_size=(1, 3), stride=1, padding=(0, 1 * dilated), bias_attr=True,
dilation=(1, dilated))
self.bn2 = paddle.nn.BatchNorm(chann, epsilon=1e-03)
self.dropout = paddle.nn.Dropout(dropprob)
self.p = dropprob
def forward(self, input):
output = self.conv3x1_1(input)
output = paddle.nn.functional.relu(output)
output = self.conv1x3_1(output)
output = self.bn1(output)
output = paddle.nn.functional.relu(output)
output = self.conv3x1_2(output)
output = paddle.nn.functional.relu(output)
output = self.conv1x3_2(output)
output = self.bn2(output)
if self.p != 0:
output = self.dropout(output)
return paddle.nn.functional.relu(output + input)
import paddle
class non_bottleneck_1d(paddle.nn.Layer):
def __init__(self, chann, dropprob, dilated):
super().__init__()
self.conv3x1_1 = paddle.nn.Conv2D(in_channels=chann, out_channels=chann, kernel_size=(3, 1), stride=1, padding=(1, 0), bias_attr=True)
self.conv1x3_1 = paddle.nn.Conv2D(in_channels=chann, out_channels=chann, kernel_size=(1, 3), stride=1, padding=(0, 1), bias_attr=True)
self.bn1 = paddle.nn.BatchNorm(chann, epsilon=1e-03)
self.conv3x1_2 = paddle.nn.Conv2D(in_channels=chann, out_channels=chann, kernel_size=(3, 1), stride=1, padding=(1 * dilated, 0), bias_attr=True,
dilation=(dilated, 1))
self.conv1x3_2 = paddle.nn.Conv2D(in_channels=chann, out_channels=chann, kernel_size=(1, 3), stride=1, padding=(0, 1 * dilated), bias_attr=True,
dilation=(1, dilated))
self.bn2 = paddle.nn.BatchNorm(chann, epsilon=1e-03)
self.dropout = paddle.nn.Dropout(dropprob)
self.p = dropprob
def forward(self, input):
output = self.conv3x1_1(input)
output = paddle.nn.functional.relu(output)
output = self.conv1x3_1(output)
output = self.bn1(output)
output = paddle.nn.functional.relu(output)
output = self.conv3x1_2(output)
output = paddle.nn.functional.relu(output)
output = self.conv1x3_2(output)
output = self.bn2(output)
if self.p != 0:
output = self.dropout(output)
return paddle.nn.functional.relu(output + input)
class DownsamplerBlock(paddle.nn.Layer):
def __init__(self, ninput, noutput):
super().__init__()
self.conv = paddle.nn.Conv2D(in_channels=ninput, out_channels=noutput-ninput, kernel_size=3,
stride=2, padding=1, bias_attr=True)
self.pool = paddle.nn.MaxPool2D(kernel_size=2, stride=2)
self.bn = paddle.nn.BatchNorm(noutput, epsilon=1e-3)
def forward(self, input):
output = paddle.concat(x=[self.conv(input), self.pool(input)], axis=1)
output = self.bn(output)
return paddle.nn.functional.relu(output)
class Encoder(paddle.nn.Layer):
def __init__(self, num_classes):
super().__init__()
self.initial_block = DownsamplerBlock(3, 16)
self.layers = paddle.nn.LayerList()
self.layers.append(DownsamplerBlock(16, 64))
for x in range(0, 5): # 5 times
self.layers.append(non_bottleneck_1d(64, 1, 1))
self.layers.append(DownsamplerBlock(64, 128))
for x in range(0, 2): # 2 times
self.layers.append(non_bottleneck_1d(128, 1, 2))
self.layers.append(non_bottleneck_1d(128, 1, 4))
self.layers.append(non_bottleneck_1d(128, 1, 8))
self.layers.append(non_bottleneck_1d(128, 1, 16))
self.output_conv = paddle.nn.Conv2D(in_channels=128, out_channels=num_classes, kernel_size=1, stride=1, padding=0, bias_attr=True)
def forward(self, input, predict=False):
output = self.initial_block(input)
for layer in self.layers:
output = layer(output)
if predict:
output = self.output_conv(output)
return output
class UpsamplerBlock(paddle.nn.Layer):
def __init__(self, ninput, noutput, output_size=[16, 16]):
super().__init__()
self.conv = paddle.nn.Conv2DTranspose(ninput, noutput, kernel_size=3, stride=2, padding=1, bias_attr=True)
self.bn = paddle.nn.BatchNorm(noutput, epsilon=1e-3)
self.output_size = output_size
def forward(self, input):
output = self.conv(input, output_size=self.output_size)
output = self.bn(output)
return paddle.nn.functional.relu(output)
class Decoder(paddle.nn.Layer):
def __init__(self, num_classes, raw_size=[576, 1640]):
super().__init__()
self.layers = paddle.nn.LayerList()
self.raw_size = raw_size
self.layers.append(UpsamplerBlock(128, 64, output_size=[raw_size[0] // 4, raw_size[1] // 4]))
self.layers.append(non_bottleneck_1d(64, 0, 1))
self.layers.append(non_bottleneck_1d(64, 0, 1))
self.layers.append(UpsamplerBlock(64, 16, output_size=[raw_size[0] // 2, raw_size[1] // 2]))
self.layers.append(non_bottleneck_1d(16, 0, 1))
self.layers.append(non_bottleneck_1d(16, 0, 1))
self.output_conv = paddle.nn.Conv2DTranspose(16, num_classes, kernel_size=2, stride=2, padding=0, bias_attr=True)
def forward(self, input):
output = input
for layer in self.layers:
output = layer(output)
output = self.output_conv(output, output_size=[self.raw_size[0], self.raw_size[1]])
return output
class ERFNet(paddle.nn.Layer):
def __init__(self, num_classes, raw_size=[576, 1640]):
super().__init__()
self.encoder = Encoder(num_classes)
self.decoder = Decoder(num_classes, raw_size=raw_size)
def forward(self, input):
output = self.encoder(input)
return self.decoder.forward(output)
# 第六步 查看一下网络结构,测试一下能不能跑通。
paddle.summary(ERFNet(2),(1,3,576,1640))
# 第七步 实例化训练集、验证集、测试集
train_dataset = MyDateset(mode='train') # 训练数据集
val_dataset = MyDateset(mode='val') # 验证数据集
test_dataset = MyDateset(mode='test') # 测试数据集
train_dataloader = paddle.io.DataLoader(
train_dataset,
batch_size=16,
shuffle=True,
drop_last=False)
val_dataloader = paddle.io.DataLoader(
val_dataset,
batch_size=1,
shuffle=True,
drop_last=False)
test_dataloader = paddle.io.DataLoader(
test_dataset,
batch_size=1,
shuffle=True,
drop_last=False)
# 配置模型、loss函数、优化器
model = ERFNet(num_classes=2)
model.train()
loss_fn = paddle.nn.CrossEntropyLoss(axis=1)
max_epoch=20
scheduler = paddle.optimizer.lr.CosineAnnealingDecay(learning_rate=0.001, T_max=max_epoch)
opt = paddle.optimizer.Adam(learning_rate=scheduler, parameters=model.parameters())
# 可用于加载之前训练的模型文件
# model_state_dict = paddle.load("save_model/epoch_3.pdparams")
# opt_state_dict = paddle.load("save_model/epoch_3.pdopt")
# model.set_state_dict(model_state_dict)
# opt.set_state_dict(opt_state_dict)
# 模型训练并记录
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
f_log = open('log.txt', 'a')
for epoch in range(0,max_epoch):
for step, data in enumerate(train_dataloader):
img, label = data
pre = model(img)
loss = loss_fn(pre, label)
predicts = paddle.argmax(pre, axis=1)
# 计算miou
miou,wrong,correct=paddle.fluid.layers.mean_iou(predicts,label,2)
loss.backward()
opt.step()
opt.clear_gradients()
if step % 100 == 0 and step!=0:
temp ="epoch: {}, step : {}, loss is: {}, miou is: {}".format(epoch, step, loss.numpy(),miou.numpy())
print(temp)
f_log.write(temp+'\n')
paddle.save(model.state_dict(),"save_model/epoch_{}.pdparams".format(epoch))
paddle.save(opt.state_dict(),"save_model/epoch_{}.pdopt".format(epoch))
f_log.close()
# 模型训练完后,咱们可以用动态图的形式保存,这样一来的话,下次加载模型就不需要重新加载网络结构啦
from paddle.static import InputSpec
path = "./export_model/MyERFnet"
paddle.jit.save(
layer=model,
path=path,
input_spec=[InputSpec(shape=[1,3,576,1640])])
六、模型推理部分
# 加载模型
path = "./export_model/MyERFnet"
loaded_layer = paddle.jit.load(path)
loaded_layer.eval()
for i in range(0,5):
img = cv.imread("/home/aistudio/CULane/JPEGImages/{}.jpg".format(i))
b, g, r = cv.split(img)
img = cv.merge([r, g, b])
orign = img
orign = cv.resize(orign,(1640,576))
transforms =T.Compose([
T.Resize((576,1640)),
T.Transpose(),
T.Normalize(mean=127.5, std=127.5)
])
img = transforms(img)
img = paddle.to_tensor(img)
img = paddle.unsqueeze(img, axis=0)
result = loaded_layer(img)
result = paddle.squeeze(result, axis=0)
result = paddle.transpose(result, perm=[1, 2, 0])
result = np.array(result)
result = np.argmax(result,-1)
plt.figure()
plt.subplot(1,2,1),
plt.title('img')
plt.imshow(orign.astype('uint8'))
plt.axis('off')
plt.subplot(1,2,2),
plt.title('predict')
plt.imshow(result.astype('uint8'), cmap='gray')
plt.axis('off')
plt.show()
七、本地部署效果演示
作者:萧泽锋
文章来源:AI Studio
推荐阅读
更多芯擎AI开发板干货请关注芯擎AI开发板专栏。欢迎添加极术小姐姐微信(id:aijishu20)加入技术交流群,请备注研究方向。