首发:https://zhuanlan.zhihu.com/p/79701540
作者:张新栋最近打通了mxnet在research线和产品线上的应用流程,关于产品线的流程可以参考这篇文章简介 整合mxnet和MNN的嵌入式部署流程。mxnet这几天的使用下来,最大的感受就是灵活、灵活、灵活,无论是AI论文的算法复现还是产品线上的算法预研,mxnet都能大大加速你的工作,强烈推荐。
关于本文中想跟大家介绍的人脸关键点检测,专栏中有一篇文章对其背景进行过详细的介绍,PFLD:一个实用的人脸关键点检测器。目前基于关键点的检测一般有如下两种主流方法,一种是基于regression的方式,直接对关键点的(x,y)坐标进行值回归;另外一种是基于heatmap的方式,在N个heatmap中寻找argmax的位置来代替关键点的坐标(其中N为关键点的个数)。本文主要介绍第一种方式,直接进行关键点的值回归。
制作mxnet的训练数据
这里我们采用mxnet中的MXRecordIO作为储存人脸关键点训练数据的结构,该方式比直接一次性载入内存的方式要灵活一些,内存开销较小,数据加载的速度还不错(只要你的电脑和硬盘给力)。另外训练数据我们采用的是WFLW,大家可以在上述网页中下载训练数据。mxrecord的制作很简单,图片数据我们用pack_img进行编码,关键点的groundtruth我们存储在IRHeader中一起写入mxrecord文件中。
import os
import cv2
import mxnet as mx
import numpy as np
def make_pfld_record(output=None, listName=None, imageFolder=None, dest_size=98):
    """Pack WFLW-style landmark annotations into an MXNet .rec file.

    Each line of ``listName`` is expected to contain, space-separated:
    an image path, 98 (x, y) landmark pairs, 6 category/attribute flags,
    and 3 head-pose angles (205 label values total per image).

    Parameters
    ----------
    output : str
        Path of the record file to create.
    listName : str
        Path of the annotation list file.
    imageFolder : str
        Directory containing the images referenced by the list.
    dest_size : int
        Side length every image is resized to before packing.
    """
    record = mx.recordio.MXRecordIO(output, 'w')
    try:
        with open(listName, 'r') as list_file:
            for idx, line in enumerate(list_file, start=1):
                print(idx)
                info = line.split(' ')
                filename = info[0].split('/')[-1]
                image = cv2.imread(os.path.join(imageFolder, filename))
                image = cv2.resize(image, (dest_size, dest_size))
                # 98 landmark (x, y) pairs stored flat: [x0, y0, x1, y1, ...]
                lmks = []
                for i in range(0, 98):
                    lmks.append(float(info[i * 2 + 1]))
                    lmks.append(float(info[i * 2 + 2]))
                # 6 category / attribute flags follow the landmark block.
                categories = [float(info[1 + 98 * 2 + i]) for i in range(6)]
                # 3 head-pose angles follow the categories.
                angles = [float(info[1 + 98 * 2 + 6 + i]) for i in range(3)]
                label = lmks + categories + angles
                # BUG FIX: the original passed the leftover loop variable `i`
                # (always 2 after the angles loop) as the record id; use the
                # running line index instead.
                header = mx.recordio.IRHeader(0, label, idx, 0)
                packed_s = mx.recordio.pack_img(header, image)
                record.write(packed_s)
    finally:
        record.close()
if __name__ == '__main__':
    # (record file, annotation list, image folder) for train and valid splits.
    image_size = 96
    splits = [
        ('./datas/pfld_train_data.rec',
         './datas/train_data/list.txt',
         './datas/train_data/imgs/'),
        ('./datas/pfld_valid_data.rec',
         './datas/test_data/list.txt',
         './datas/test_data/imgs/'),
    ]
    for record_name, list_file, image_folder in splits:
        make_pfld_record(record_name, list_file, image_folder, image_size)
设计网络模型
这里我们采用直接回归的方式来进行人脸关键点的预测,所以为了举例方便,这里采用简单传统的CNN特征提取器,在进行完特征提取后直接flatten后接dense layer进行人脸关键点的预测。使用mxnet的gluon进行网络结构的定义,十分方便,代码如下:
import mxnet as mx
from mxnet.gluon import nn
from mxnet.gluon.model_zoo import vision
from mxnet import gluon
class BASE(mx.gluon.HybridBlock):
    """Plain CNN landmark regressor.

    A stack of 3x3 conv/BatchNorm/ReLU layers followed by a flatten and
    two dense layers that directly regress 2 * num_of_pts coordinates.
    """

    def __init__(self, num_of_pts=98, **kwargs):
        super(BASE, self).__init__(**kwargs)
        self.pts_num = num_of_pts
        self.lmks_net = mx.gluon.nn.HybridSequential()
        # (channels, stride) per conv layer; every conv is 3x3 with padding 1
        # and is followed by BatchNorm + ReLU. Stride-2 layers downsample.
        conv_cfg = [
            (16, 2), (32, 1), (32, 2), (32, 1),
            (32, 2), (64, 1), (64, 2), (128, 1),
        ]
        for channels, stride in conv_cfg:
            self.lmks_net.add(
                nn.Conv2D(channels=channels, kernel_size=(3, 3),
                          strides=(stride, stride), padding=(1, 1)),
                nn.BatchNorm(),
                nn.Activation('relu'),
            )
        # Regression head: pool, flatten, then two dense layers.
        self.lmks_net.add(
            nn.MaxPool2D(pool_size=(2, 2), strides=(2, 2)),
            nn.Flatten(),
            nn.Dense(units=512, activation=None),
            nn.Dense(units=self.pts_num * 2, activation=None),
        )

    def hybrid_forward(self, F, x):
        # The entire network lives in one sequential container.
        return self.lmks_net(x)
if __name__ == '__main__':
    # Smoke test: push one dummy 96x96 RGB image through the network
    # and print the per-layer summary.
    dummy = mx.nd.random.uniform(0.0, 1.0, shape=(1, 3, 96, 96))
    model = BASE(num_of_pts=98)
    model.initialize(init=mx.initializer.Xavier())
    model.summary(dummy)
网络训练
网络训练我们采用固定学习率0.0001,adam优化算法进行优化,loss采用的是MSE。mxnet的gluon也提供了非常方便的工具和接口进行调用,代码如下:
import mxnet as mx
from models.NPFLD import NPFLD
from models.CPFLD import CPFLD
from models.BASE import BASE
from models.MSBASE import MSBASE
import numpy as np
from mxnet import nd
from mxnet import autograd
import os
import sys
import math
import cv2
import argparse
def preprocess(data):
    """Normalize image data: subtract the mean (123.0), divide by the std (58.0)."""
    return (data - 123.0) / 58.0
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="pfld landmarks detector")
    parser.add_argument("--output_dir", type=str, default=None)
    parser.add_argument("--pretrain_param", type=str, default=None)
    parser.add_argument("--train_data_path", type=str, default=None)
    parser.add_argument("--valid_data_path", type=str, default=None)
    parser.add_argument("--learning_rate", type=float, default=0.0001)
    parser.add_argument("--batch_size", type=int, default=128)
    parser.add_argument("--epoches", type=int, default=1000)
    parser.add_argument("--gpu_ids", type=str, default="0,1")
    parser.add_argument("--image_size", type=int, default=112)
    parser.add_argument("--num_of_pts", type=int, default=98)
    parser.add_argument("--model_type", type=str, default='NPFLD')
    parser.add_argument("--logfile_name", type=str, default='log.txt')
    # BUG FIX: was type=str with default=1, which made the flag truthy even
    # for "--with_angle_loss 0"; use int like --with_category_loss.
    parser.add_argument("--with_angle_loss", type=int, default=1)
    parser.add_argument("--with_category_loss", type=int, default=0)
    parser.add_argument("--alpha", type=float, default=1.0)
    args = parser.parse_args()

    train_data_file = args.train_data_path
    valid_data_file = args.valid_data_path
    output_dir = args.output_dir
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Device selection: "--gpu_ids None" forces CPU, otherwise one context
    # per comma-separated GPU id.
    use_gpu = None
    devices = []
    if 'None' in args.gpu_ids:
        use_gpu = False
        devices.append(mx.cpu())
    else:
        use_gpu = True
        for gi in args.gpu_ids.split(','):
            devices.append(mx.gpu(int(gi)))

    image_size = args.image_size
    batch_size = args.batch_size
    epoches = args.epoches
    base_lr = args.learning_rate
    pts_num = args.num_of_pts
    alpha = args.alpha
    model_type = args.model_type
    with_category = args.with_category_loss
    with_angle = args.with_angle_loss
    logF_name = os.path.join(output_dir, args.logfile_name)

    net = BASE(num_of_pts=pts_num)
    net.initialize(mx.init.Normal(sigma=0.001), ctx=devices, force_reinit=True)
    net.hybridize()
    if args.pretrain_param is not None:
        net.load_parameters(args.pretrain_param)

    # NOTE(review): huber/mse losses and the angle metric are declared but
    # never used below — they look like hooks for the PFLD variants
    # (NPFLD/CPFLD); kept for parity with the original script.
    huber_loss = mx.gluon.loss.HuberLoss(rho=5)
    mse_loss = mx.gluon.loss.L2Loss()
    lmks_metric = mx.metric.MAE()
    angs_metric = mx.metric.MAE()
    lr_epoch = []

    # Label layout per sample: 196 landmark coords | 6 categories | 3 angles.
    train_iter = mx.io.ImageRecordIter(
        path_imgrec=train_data_file,
        data_shape=(3, image_size, image_size),
        batch_size=batch_size,
        label_width=205,
        shuffle=True,
        shuffle_chunk_size=1024,
        seed=1234,
        prefetch_buffer=10,
        preprocess_threads=16
    )

    ## training
    trainer = mx.gluon.Trainer(
        params=net.collect_params(),
        optimizer='adam',
        optimizer_params={'learning_rate': base_lr}
    )

    for epoch in range(0, epoches):
        # Reset the data iterator at the start of every epoch.
        train_iter.reset()
        batch_idx = 0
        for batch in train_iter:
            batch_idx += 1
            batch_size = batch.data[0].shape[0]
            data = preprocess(batch.data[0])
            labels = batch.label[0]
            # Landmarks are stored normalized to [0, 1]; scale to pixels.
            lmks = labels[:, 0:98*2] * image_size
            # BUG FIX: the original sliced [2*98+1 : 2*98+6], skipping the
            # first of the 6 category flags and taking only 5; the flags
            # occupy label columns [196, 202).
            cate = labels[:, 2*98:2*98+6]
            # Angles are stored in degrees; convert to radians.
            angs = labels[:, -3:] * np.pi / 180.0

            # Re-weight each category inversely to its batch frequency so
            # rare attributes contribute more to the loss.
            cat_ratios = nd.mean(cate, axis=0)
            cat_ratios = (cat_ratios > 0.0) * (1.0 / (cat_ratios + 0.00001))
            cate = cate * cat_ratios
            cate = nd.sum(cate, axis=1)
            # Samples with no active category get weight 1 instead of 0.
            cate = (cate <= 0.0001) * 1 + cate

            data_list = mx.gluon.utils.split_and_load(data, ctx_list=devices, even_split=False)
            lmks_list = mx.gluon.utils.split_and_load(lmks, ctx_list=devices, even_split=False)
            angs_list = mx.gluon.utils.split_and_load(angs, ctx_list=devices, even_split=False)
            cate_list = mx.gluon.utils.split_and_load(cate, ctx_list=devices, even_split=False)

            loss_list = []
            with mx.autograd.record():
                for data, lmks, angs, cate in zip(data_list, lmks_list, angs_list, cate_list):
                    lmks_regs = net(data)
                    lmks_regs = nd.Flatten(lmks_regs)
                    # Per-sample sum of squared coordinate errors.
                    lmks_loss = nd.square(lmks_regs - lmks)
                    lmks_loss = nd.sum(lmks_loss, axis=1)
                    loss = lmks_loss
                    if with_category:
                        loss = loss * cate
                    loss_list.append(loss)
                    lmks_metric.update(lmks, lmks_regs)
            for loss in loss_list:
                loss.backward()
            trainer.step(batch_size=batch_size, ignore_stale_grad=True)
            batch_loss = sum([l.sum().asscalar() for l in loss_list]) / batch_size
        # Export a symbol + params snapshot after every epoch.
        net.export(os.path.join(output_dir, 'lmks_detector'), epoch=epoch+1)
        lmks_metric.reset()
最后
本文介绍了基于简单小网络的人脸关键点检测,采用mxnet实现起来相当方便,在WFLW上面的NME在0.07,FR在0.17左右,如果采用更合理的data augmentation策略,还可以取得更好的结果。后续的文章我们将会介绍如何采用 MNN 等inference框架来进行端上的人脸检测器部署。另外欢迎大家留言讨论、关注本专栏及公众号,谢谢大家!
参考阅读
专注嵌入式端的AI算法实现,欢迎关注作者微信公众号和知乎嵌入式AI算法实现专栏。
更多嵌入式AI相关的技术文章请关注极术嵌入式AI专栏