张新栋 · 2020年04月17日

基于mxnet设计小型人脸关键点检测器

首发:https://zhuanlan.zhihu.com/p/79701540
作者:张新栋

最近打通了mxnet在research线和产品线上的应用流程,关于产品线的流程可以参考这篇文章简介 整合mxnet和MNN的嵌入式部署流程。mxnet这几天的使用下来,最大的感受就是灵活、灵活、灵活,无论是AI论文的算法复现还是产品线上的算法预研,mxnet都能大大加速你的工作,强烈推荐。

关于本文中想跟大家介绍的人脸关键点检测,专栏中有一篇文章对其背景进行过介绍,即《PFLD:一个实用的人脸关键点检测器》。目前基于关键点的检测一般有如下两种主流方法,一种是基于regression的方式,直接对关键点的(x,y)坐标进行值回归;另外一种是基于heatmap的方式,在N个heatmap中寻找argmax的位置来代替关键点的坐标(其中N为关键点的个数)。本文主要介绍第一种方式,直接进行关键点的值回归。

制作mxnet的训练数据

这里我们采用mxnet中的MXRecordIO作为储存人脸关键点训练数据的结构,该方式比直接一次性载入内存的方式要灵活一些,内存开销较小,数据加载的速度还不错(只要你的电脑和硬盘给力)。另外训练数据我们采用的是WFLW,大家可以在上述网页中下载训练数据。mxrecord的制作很简单,图片数据我们用pack_img进行编码,关键点的ground truth我们存储在IRHeader中一起写入mxrecord文件中。

import os
import cv2
import mxnet as mx
import numpy as np

def make_pfld_record(output=None, listName=None, imageFolder=None, dest_size=98, num_of_pts=98):
    """Pack an annotation list and its images into an MXNet RecordIO file.

    Every non-blank line of ``listName`` is split on whitespace. Token 0 is
    read as an image path of which only the basename is kept and looked up
    inside ``imageFolder``. The following ``2 * num_of_pts`` tokens are the
    landmark values, the next 6 the category values and the next 3 the angle
    values. All of them are converted to float and stored, in that order, as
    the label of the record.

    Args:
        output: Path of the RecordIO file to write.
        listName: Path of the annotation list file.
        imageFolder: Directory that contains the images named in the list.
        dest_size: Side length, in pixels, each image is resized to.
        num_of_pts: Number of landmark points per line (two values each).

    Raises:
        ValueError: If a line has fewer tokens than the label layout needs.
        IOError: If an image cannot be read by OpenCV.
    """
    num_categories = 6
    num_angles = 3
    label_len = 2 * num_of_pts + num_categories + num_angles

    record = mx.recordio.MXRecordIO(output, 'w')
    try:
        with open(listName, 'r') as list_file:
            idx = 0
            for line in list_file:
                info = line.split()
                if not info:
                    # Tolerate blank lines (e.g. a trailing newline at EOF).
                    continue
                idx += 1
                print(idx)

                if len(info) < 1 + label_len:
                    raise ValueError(
                        'entry %d of %s has %d tokens, expected at least %d'
                        % (idx, listName, len(info), 1 + label_len)
                    )

                filename = info[0].split('/')[-1]
                image_path = os.path.join(imageFolder, filename)
                image = cv2.imread(image_path)
                if image is None:
                    # cv2.imread signals failure by returning None; report the
                    # offending path instead of failing later inside resize.
                    raise IOError('failed to read image: %s' % image_path)
                image = cv2.resize(image, (dest_size, dest_size))

                # Landmarks, categories and angles are contiguous in the line,
                # so one slice reproduces the lmks + categories + angles order.
                label = [float(value) for value in info[1:1 + label_len]]

                # Use the running entry index as the record id; the previous
                # code passed a stale inner-loop variable, giving every record
                # the same id.
                header = mx.recordio.IRHeader(0, label, idx, 0)
                record.write(mx.recordio.pack_img(header, image))
    finally:
        # Always flush and close the record file, even when an entry fails.
        record.close()

if __name__ == '__main__':
    # Side length (pixels) every image is resized to before being packed.
    image_size = 96

    # One (record file, annotation list, image folder) triple per split,
    # processed in the listed order: training split first, then validation.
    jobs = (
        (
            './datas/pfld_train_data.rec',
            './datas/train_data/list.txt',
            './datas/train_data/imgs/',
        ),
        (
            './datas/pfld_valid_data.rec',
            './datas/test_data/list.txt',
            './datas/test_data/imgs/',
        ),
    )
    for record_name, list_file, image_folder in jobs:
        make_pfld_record(record_name, list_file, image_folder, image_size)

设计网络模型

这里我们采用直接回归的方式来进行人脸关键点的预测,所以为了举例方便,这里采用简单传统的CNN特征提取器,在进行完特征提取后直接flatten后接dense layer进行人脸关键点的预测。使用mxnet的gluon进行网络结构的定义,十分方便,代码如下:

import mxnet as mx
from mxnet.gluon import nn
from mxnet.gluon.model_zoo import vision
from mxnet import gluon

class BASE(mx.gluon.HybridBlock):
    """Plain CNN that regresses landmark values directly from an image.

    The network is a stack of eight 3x3 convolution stages, each followed by
    batch normalisation and ReLU, then a 2x2 max-pool, a flatten and two
    dense layers. The last dense layer has ``2 * num_of_pts`` units, i.e. two
    output values per landmark point.

    Args:
        num_of_pts: Number of landmark points to predict.
        **kwargs: Forwarded to ``mx.gluon.HybridBlock``.
    """

    def __init__(self, num_of_pts=98, **kwargs):
        super(BASE, self).__init__(**kwargs)
        self.pts_num = num_of_pts
        self.lmks_net = mx.gluon.nn.HybridSequential()

        # (output channels, stride) of each convolution stage, in order.
        # Stride-2 stages halve the spatial resolution.
        conv_stages = (
            (16, 2),
            (32, 1),
            (32, 2),
            (32, 1),
            (32, 2),
            (64, 1),
            (64, 2),
            (128, 1),
        )
        for out_channels, stride in conv_stages:
            self.lmks_net.add(
                nn.Conv2D(
                    channels=out_channels,
                    kernel_size=(3, 3),
                    strides=(stride, stride),
                    padding=(1, 1),
                ),
                nn.BatchNorm(),
                nn.Activation('relu'),
            )

        # Regression head: pool, flatten, then two dense layers with no
        # activation in between.
        self.lmks_net.add(
            nn.MaxPool2D(pool_size=(2, 2), strides=(2, 2)),
            nn.Flatten(),
            nn.Dense(units=512, activation=None),
            nn.Dense(units=self.pts_num * 2, activation=None),
        )

    def hybrid_forward(self, F, x):
        """Run ``x`` through the sequential network and return its output."""
        return self.lmks_net(x)

if __name__ == '__main__':
    # Smoke test: print a layer-by-layer summary of the network for a single
    # random 3-channel 96x96 input.
    dummy_input = mx.nd.random.uniform(0.0, 1.0, shape=(1, 3, 96, 96))
    model = BASE(num_of_pts=98)
    model.initialize(init=mx.initializer.Xavier())
    model.summary(dummy_input)

网络训练

网络训练我们采用固定学习率0.0001,adam优化算法进行优化,loss采用的是MSE。mxnet的gluon也提供了非常方便的工具和接口进行调用,代码如下:

import mxnet as mx
from models.NPFLD import NPFLD
from models.CPFLD import CPFLD
from models.BASE import BASE
from models.MSBASE import MSBASE
import numpy as np
from mxnet import nd
from mxnet import autograd
import os
import sys
import math
import cv2
import argparse

def preprocess(data):
    """Normalise ``data`` by subtracting 123.0 and dividing by 58.0."""
    offset, scale = 123.0, 58.0
    shifted = data - offset
    return shifted / scale

if __name__ == '__main__':
    # Training script: parses the command line, builds the BASE network,
    # streams batches from a RecordIO file and optimises a squared-error
    # landmark loss with Adam, exporting the model after every epoch.

    parser = argparse.ArgumentParser(description="pfld landmarks detector")
    # output_dir and train_data_path are used unconditionally below, so let
    # argparse reject a missing value instead of crashing later on None.
    parser.add_argument("--output_dir", type = str, required = True)
    parser.add_argument("--pretrain_param", type = str, default = None)
    parser.add_argument("--train_data_path", type = str, required = True)
    parser.add_argument("--valid_data_path", type = str, default = None)
    parser.add_argument("--learning_rate", type = float, default = 0.0001)
    parser.add_argument("--batch_size", type = int, default = 128)
    parser.add_argument("--epoches", type = int, default = 1000)
    parser.add_argument("--gpu_ids", type = str, default = "0,1")
    # NOTE(review): the record iterator below requests image_size x image_size
    # data; presumably the records must be packed at least that large - confirm.
    parser.add_argument("--image_size", type = int, default = 112)
    parser.add_argument("--num_of_pts", type = int, default = 98)
    parser.add_argument("--model_type", type = str, default = 'NPFLD')
    parser.add_argument("--logfile_name", type = str, default = 'log.txt')
    # Integer flag like --with_category_loss; as a str, "0" would be truthy.
    parser.add_argument("--with_angle_loss", type = int, default = 1)
    parser.add_argument("--with_category_loss", type = int, default = 0)
    parser.add_argument("--alpha", type = float, default = 1.0)
    args = parser.parse_args()


    train_data_file = args.train_data_path   
    valid_data_file = args.valid_data_path
    output_dir = args.output_dir

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Build the device list: the literal string 'None' selects the CPU,
    # otherwise gpu_ids is a comma-separated list of GPU indices.
    use_gpu = None
    devices = []
    if 'None' in args.gpu_ids:
        use_gpu = False
        devices.append(mx.cpu())
    else:
        use_gpu = True
        gpu_infos = args.gpu_ids.split(',')
        for gi in gpu_infos:
            devices.append(mx.gpu(int(gi)))
    
    image_size    = args.image_size
    batch_size    = args.batch_size
    epoches       = args.epoches
    base_lr       = args.learning_rate
    pts_num       = args.num_of_pts
    alpha         = args.alpha
    model_type    = args.model_type
    with_category = args.with_category_loss
    with_angle    = args.with_angle_loss
    logF_name     = os.path.join(output_dir, args.logfile_name)

    # Label layout per record: 2 * pts_num landmark values, then 6 category
    # values, then 3 angle values (205 values for the default 98 points).
    lmks_len    = 2 * pts_num
    label_width = lmks_len + 6 + 3

    net = BASE(num_of_pts=pts_num)

    net.initialize(mx.init.Normal(sigma=0.001), ctx=devices, force_reinit=True)

    net.hybridize()
    if args.pretrain_param is not None:
        net.load_parameters(args.pretrain_param)

    huber_loss = mx.gluon.loss.HuberLoss(rho=5)
    mse_loss   = mx.gluon.loss.L2Loss()
    lmks_metric  = mx.metric.MAE()
    angs_metric  = mx.metric.MAE()


    lr_epoch   = []
    train_iter = mx.io.ImageRecordIter(
        path_imgrec=train_data_file, 
        data_shape=(3, image_size, image_size), 
        batch_size=batch_size,
        label_width=label_width,
        shuffle = True,
        shuffle_chunk_size = 1024,
        seed = 1234, 
        prefetch_buffer = 10, 
        preprocess_threads = 16
    )

       
    ## training
    trainer = mx.gluon.Trainer(
        params=net.collect_params(),
        optimizer='adam',
        optimizer_params={'learning_rate': base_lr}
    )

    for epoch in range(0, epoches):
        # reset data iterator
        train_iter.reset()
        batch_idx = 0
        for batch in train_iter:
            batch_idx += 1
            batch_size = batch.data[0].shape[0]
            data   = batch.data[0]
            data   = preprocess(data)
            labels = batch.label[0]
            # Scale the stored landmark values by the image side length
            # (assumes they are stored normalised to the image - TODO confirm).
            lmks = labels[:, 0:lmks_len] * image_size
            # NOTE(review): this takes 5 columns starting one past the end of
            # the landmarks, i.e. the first category column is skipped;
            # presumably intentional - confirm.
            cate = labels[:, lmks_len+1:lmks_len+6]
            # Multiply by pi/180 (presumably degrees to radians - confirm).
            angs = labels[:, -3:] * np.pi / 180.0

            # Per-sample weight from the category flags: each column is
            # weighted by the inverse of its mean over the batch (columns
            # with a zero mean get weight 0), the weighted columns are summed
            # per sample, and samples whose sum is ~0 get weight 1.
            cat_ratios = nd.mean(cate, axis=0)
            cat_ratios = (cat_ratios > 0.0)  * (1.0 / (cat_ratios+0.00001))
            cate       = cate * cat_ratios
            cate       = nd.sum(cate, axis=1) 
            cate       = (cate <= 0.0001) * 1 + cate

            # Split the batch across the configured devices.
            data_list = mx.gluon.utils.split_and_load(data, ctx_list=devices, even_split=False)
            lmks_list = mx.gluon.utils.split_and_load(lmks, ctx_list=devices, even_split=False)
            angs_list = mx.gluon.utils.split_and_load(angs, ctx_list=devices, even_split=False)
            cate_list = mx.gluon.utils.split_and_load(cate, ctx_list=devices, even_split=False)
            loss_list = []

            with mx.autograd.record():
                for dev_data, dev_lmks, dev_angs, dev_cate in zip(data_list, lmks_list, angs_list, cate_list):
                    lmks_regs = net(dev_data)
                    lmks_regs = nd.Flatten(lmks_regs)

                    # Sum of squared errors over all landmark values, one
                    # loss value per sample.
                    lmks_loss = nd.square(lmks_regs - dev_lmks)
                    lmks_loss = nd.sum(lmks_loss, axis=1)

                    loss = lmks_loss
                    
                    if with_category:
                        loss = loss * dev_cate

                    loss_list.append(loss)
                    lmks_metric.update(dev_lmks, lmks_regs)
            for loss in loss_list:
                loss.backward()
            trainer.step(batch_size=batch_size, ignore_stale_grad=True)
            # NOTE(review): batch_loss and lmks_metric are computed but never
            # printed or written to the log file in this script.
            batch_loss = sum([l.sum().asscalar() for l in loss_list]) / batch_size
        # Export the hybridized network (symbol + parameters) once per epoch.
        net.export(os.path.join(output_dir, 'lmks_detector'), epoch=epoch+1)
        lmks_metric.reset()

  

最后

本文介绍了基于简单小网络的人脸关键点检测,采用mxnet实现起来相当方便,在WFLW上面的NME在0.07,FR在0.17左右,如果采用更合理的data augmentation策略,还可以取得更好的结果。后续的文章我们将会介绍如何采用 MNN 等inference框架来进行端上的人脸检测器部署。另外欢迎大家留言讨论、关注本专栏及公众号,谢谢大家!


参考阅读

专注嵌入式端的AI算法实现,欢迎关注作者微信公众号和知乎嵌入式AI算法实现专栏

WX20200305-192544.png
更多嵌入式AI相关的技术文章请关注极术嵌入式AI专栏

推荐阅读
关注数
18808
内容数
1351
嵌入式端AI,包括AI算法在推理框架Tengine,MNN,NCNN,PaddlePaddle及相关芯片上的实现。欢迎加入微信交流群,微信号:aijishu20(备注:嵌入式)
目录
极术微信服务号
关注极术微信号
实时接收点赞提醒和评论通知
安谋科技学堂公众号
关注安谋科技学堂
实时获取安谋科技及 Arm 教学资源
安谋科技招聘公众号
关注安谋科技招聘
实时获取安谋科技中国职位信息