之前我们有跟大家讨论过如何DIY一个轻型的Mobilenet SSD的物体检测器,本专栏的其他文章亦有介绍如何部署该类轻型MSSD的物体检测器于嵌入式设备中,如RK3288、RK3399等。本文我们继续跟大家讨论轻型的神经网络,今天想跟大家介绍的是回归器(Regressor)。回归器不同于分类器,回归器旨在利用CNN学习如何预测连续的值,分类器对应的输出非离散的id(二分类、多分类)。我们不展开去介绍回归器的理论知识,下面我们介绍一些回归器的应用场景。
首发:https://zhuanlan.zhihu.com/p/71648953
作者:张新栋
我们在实际的计算机视觉项目中,其实很多场景应用到了回归器。比如人脸的关键点检测,就是一个特殊的回归器应用场景。以68个关键点的检测任务为例,其实质就是转化成一个输出[1x136]向量的回归器;再比如人脸姿态估计,也是一个典型的回归器模型。人脸姿态估计目前主流的做法分两种,一种的2D->3D的方式,即先检测人脸关键点,然后计算关键点和平均正脸关键点的一个投影矩阵,最后迭代优化以估算出人脸的姿态pitch、roll、yaw;另外一种是转化成一个直接回归的模型,即输出一个1x3的向量,分别代表roll、yaw、pitch,随后以数据驱动的方式去训练该模型。
介绍了回归器和其常见的应用以后,我们回到本文的主题:如何DIY轻型的Mobilenet回归器。Mobilenet已经是一个对嵌入式设备非常友好的网络结构,在许多嵌入式设备中都能高效的运行。本文中,我们将跟大家介绍如何基于Mobilenet,DIY一个轻型的回归器。以人脸姿态估计这个任务为例,我将其分为如下几个阶段(这里不考虑数据准备工作),模型设计、数据导入、训练参数。我们以tensorflow-keras作为示例的框架。
模型设计
我们的最终目的是要将训练好的模型部署到嵌入式设备中,所以尽量避免一些对嵌入式设备计算不友好的Op。阅读过本专栏其他文章的朋友也许会比较熟悉,一般需要注意的Op有Padding、Squeeze、Reshape、GlobalAveragePooing等,不过具体实现需要参考你部署所借助的inference框架。Mobilenet的设计可参考如下代码:
def build_model(depth_multiptiler = 1.0, image_size = 64, channels = 3):
input = keras.Input(shape=(image_size,image_size,channels), name = "input")
mobilenet_feature = MobileNetBase(
img_input = input,
alpha = depth_multiptiler,
)
x0 = keras.layers.Dense(128, activation=tf.nn.relu)(mobilenet_feature)
x0 = keras.layers.Dropout(0.2)(x0)
pitch = keras.layers.Dense(1)(x0)
x1 = keras.layers.Dense(128, activation=tf.nn.relu)(mobilenet_feature)
x1 = keras.layers.Dropout(0.2)(x1)
yaw = keras.layers.Dense(1)(x1)
x2 = keras.layers.Dense(128, activation=tf.nn.relu)(mobilenet_feature)
x2 = keras.layers.Dropout(0.2)(x2)
roll = keras.layers.Dense(1)(x2)
output = keras.layers.concatenate([
pitch, yaw, roll,
], axis=-1)
model = keras.Model(inputs = input, outputs = output)
print (model.summary())
return model
def MobileNetBase(
img_input=None,
alpha=1.0,
depth_multiplier=1,
dropout=1e-3,
include_top=True,
input_tensor=None,
pooling=None,
name = "input"):
x = _conv_block(img_input, 32, alpha, strides=(2, 2))
x = _depthwise_conv_block(x, 64, alpha, depth_multiplier, block_id=1)
x = _depthwise_conv_block(x, 128, alpha, depth_multiplier,
strides=(2, 2), block_id=2)
x = _depthwise_conv_block(x, 128, alpha, depth_multiplier, block_id=3)
x = _depthwise_conv_block(x, 256, alpha, depth_multiplier,
strides=(2, 2), block_id=4)
x = _depthwise_conv_block(x, 256, alpha, depth_multiplier, block_id=5)
x = _depthwise_conv_block(x, 512, alpha, depth_multiplier,
strides=(2, 2), block_id=6)
x = _depthwise_conv_block(x, 512, alpha, depth_multiplier, block_id=7)
x = _depthwise_conv_block(x, 512, alpha, depth_multiplier, block_id=8)
x = _depthwise_conv_block(x, 512, alpha, depth_multiplier, block_id=9)
x = _depthwise_conv_block(x, 512, alpha, depth_multiplier, block_id=10)
x = _depthwise_conv_block(x, 512, alpha, depth_multiplier, block_id=11)
x = _depthwise_conv_block(x, 1024, alpha, depth_multiplier, strides=(2, 2), block_id=12)
x = _depthwise_conv_block(x, 1024, alpha, depth_multiplier, block_id=13)
x = layers.AveragePooling2D(pool_size=(2, 2), padding='valid')(x)
x = layers.Reshape((int(1024*alpha),), name='reshape_1')(x)
return x
def _conv_block(inputs, filters, alpha, kernel=(3, 3), strides=(1, 1)):
channel_axis = -1
filters = int(filters * alpha)
x = layers.ZeroPadding2D(padding=((0, 1), (0, 1)), name='conv1_pad')(inputs)
x = layers.Conv2D(filters, kernel,
padding='valid',
use_bias=False,
strides=strides,
name='conv1')(x)
x = layers.BatchNormalization(axis=channel_axis, name='conv1_bn')(x)
return layers.ReLU(6., name='conv1_relu')(x)
def _depthwise_conv_block(inputs, pointwise_conv_filters, alpha,
depth_multiplier=1, strides=(1, 1), block_id=1):
# channel_axis = 1 if backend.image_data_format() == 'channels_first' else -1
channel_axis = -1
pointwise_conv_filters = int(pointwise_conv_filters * alpha)
if strides == (1, 1):
x = inputs
else:
x = layers.ZeroPadding2D(((0, 1), (0, 1)),name='conv_pad_%d' % block_id)(inputs)
x = layers.DepthwiseConv2D((3, 3),
padding='same' if strides == (1, 1) else 'valid',
depth_multiplier=depth_multiplier,
strides=strides,
use_bias=False,
name='conv_dw_%d' % block_id)(x)
x = layers.BatchNormalization(
axis=channel_axis, name='conv_dw_%d_bn' % block_id)(x)
x = layers.ReLU(6., name='conv_dw_%d_relu' % block_id)(x)
x = layers.Conv2D(pointwise_conv_filters, (1, 1),
padding='same',
use_bias=False,
strides=(1, 1),
name='conv_pw_%d' % block_id)(x)
x = layers.BatchNormalization(axis=channel_axis,
name='conv_pw_%d_bn' % block_id)(x)
return layers.ReLU(6., name='conv_pw_%d_relu' % block_id)(x)
其中,为了对该回归器进行加速,默认的输入尺寸设定为64x64,depth\_multiplier可根据实际情况选取0.5~1.0。这里我们回归器输出的是三个1x1的向量,该类回归器的设计方式在实际应用中可调整的效果要优于直接输出一个1x3的向量,不过引入的代价是增加了全连接层的数量。
数据导入
基于tensorflow-keras训练的数据导入大致分如下几种,一种是直接加载仅RAM中,该类方式的有点是训练加载数据块,缺点是在面对较大的数据量时不适用;另一种是采用文件形式按照特定格式加载训练数据,训练数据的标注文件先载入RAM中,然后每次按批次的加载训练数据及标注数据,此类方式的优点是可处理数据量较大的训练数据,缺点是加载数据的时效性较差(可以开启多线程加速数据的加载过程)。本文介绍的是以文件形式进行加载。先看一下文件的标注格式,
filename,pitch,yaw,roll
LFPW/LFPW_image_train_0005_5.jpg,-0.248872,-0.503261,-0.082886
首先第一行为标注文件的格式,第二行开始往下都是标注数据。第一列元素为图片文件名,第二列元素为pitch角度,第三个为yaw角度,第四个为roll角度。准备好标注文件以后,我们需要建立标注文件到数据加载的流程,可参考如下代码。
datagen = tf.keras.preprocessing.image.ImageDataGenerator(
featurewise_center=False,
samplewise_center=False,
featurewise_std_normalization=False,
samplewise_std_normalization=False,
zca_whitening=False,
zca_epsilon=1e-06,
rotation_range=10,
width_shift_range=0.1,
height_shift_range=0.1,
brightness_range=(1, 1.3),
shear_range=0.1,
zoom_range=(0.85, 1.15),
channel_shift_range=0.0,
fill_mode='nearest',
cval=0.0,
horizontal_flip=False,
vertical_flip=False,
rescale=None,
preprocessing_function=headpose_preprocess,
data_format=None,
validation_split=0.15,
dtype=None)
data_frame = pd.read_csv(annotation_file)
train_data_generator = datagen.flow_from_dataframe(
dataframe = data_frame,
directory = data_dir,
x_col = "filename",
y_col = ["pitch", "yaw", "roll"],
class_mode = "other",
target_size = (image_size, image_size),
batch_size = batch_size,
shuffle = True,
subset = "training"
)
valid_data_generator = datagen.flow_from_dataframe(
dataframe = data_frame,
directory = data_dir,
x_col = "filename",
y_col = ["pitch", "yaw", "roll"],
class_mode = "other",
target_size = (image_size, image_size),
batch_size = batch_size,
shuffle = True,
subset = "validation"
)
以上代码我们还需要注意的一点是,数据的预处理函数,这里面的可操作性比较灵活,以如下代码为例,我们在预处理做了数据增广、数据白化的处理:
seq = iaa.Sequential([
iaa.OneOf([
iaa.GaussianBlur(sigma=(0, 0.5)), # blur images with a sigma of 0 to 1.0
iaa.Dropout((0.0, 0.03), name="Dropout"),
iaa.AdditiveGaussianNoise(scale=0.03*255, name="MyLittleNoise"),
iaa.AdditiveGaussianNoise(loc=32, scale=0.001*255, name="SomeOtherNoise"),
])
])
def headpose_preprocess(image):
if np.random.random_sample() > 0.5:
image = seq.augment_image(image)
image = np.array(image)
converted_img = image - 123.0
converted_img /= 58.0
return converted_img
训练参数
训练参数里我们需要注意如下几个方面,一个是采用什么优化算法(SGD、ADAM、Momentum等)、learning rate(影响模型收敛、发散)、损失函数(回归器一般采用Mean-Square-Error,MSE)、回调函数(可处理模型保存、日志打印等等)。在使用tensorflow-keras处理以上任务非常简单方便,参考如下代码:
# Create checkpoint callback
callback = tf.keras.callbacks.ModelCheckpoint(
checkpoint_path,
save_weights_only=True,
verbose=1,
period=1
)
# build model
model = build_model(depth_multiptiler, image_size, channels)
optimizer = keras.optimizers.Adam(lr)
model.compile(
loss='mean_squared_error',
optimizer=optimizer,
metrics=['mean_absolute_error', 'mean_squared_error']
)
# load check point
latest_ck_path = tf.train.latest_checkpoint(checkpoint_dir)
if latest_ck_path is not None:
model.load_weights(latest_ck_path)
# fit model with data generator
model.fit_generator(
generator = train_data_generator,
steps_per_epoch = train_data_generator.samples,
epochs = EPOCHS,
validation_data = valid_data_generator,
validation_steps = valid_data_generator.samples,
verbose = 1,
use_multiprocessing = False,
callbacks=[callback]
)
最后
在进行完以上步骤后,我们就可以进行模型的训练。回过头来再看看模型本身,基于Mobilenet、输入size为64x64、depth\_multiplier为0.75,该模型在RK3399中一次inference仅需要10ms,运行效率相当可观。在后续专栏内容里,我们还会跟大学继续讨论,如何去训练该类小型网络以得到好的inference效果,如采用数据蒸馏、知识迁移、半监督学习等等。欢迎大家留言讨论、关注专栏,谢谢大家!
推荐阅读
专注嵌入式端的AI算法实现,欢迎关注作者微信公众号和知乎嵌入式AI算法实现专栏。
更多嵌入式AI相关的技术文章请关注极术嵌入式AI专栏