傻孩子(GorgonMeducer) · 2022年04月19日

实时性迷思(6)——如何进行跨任务性能分析

b69bade0481d2a66f565030f7fac01b8.jpg

【说在前面的话】

在前一篇文章《实时性迷思(5)——实战RTOS多任务性能分析》中,我们介绍了如何在多任务环境下利用perf_counter“排除多任务穿插的影响”——精确测量某一任务中指定代码片消耗CPU周期数的方法。还没有阅读过这篇文章的小伙伴可以单击这里,今天的内容将在这一基础上继续深入。

在实际应用中,很多数据处理的过程(或者是算法)基本都是由多个步骤构成,假设我们把每个步骤都“简单粗暴”的看做(或者放入)一个函数中的话,为了方便今天的讨论,它们可以被简化为如下的形式:

step_1();
step_2();
...
step_n();

如果这些步骤总是

我们说好不分离~要一直一直在一起~

1fd1228a1da521b906d1c958b1614f7d.jpg

那么测量起来就非常简单:

  • 在裸机中,我们可以使用__cycleof__()
__cycleof__() {
    step_1();
    step_2();
    ...
    step_n();
}

也可以简单的使用系统提供的API函数:

start_cycle_counter();
do {
    step_1();
    step_2();
    ...
    step_n();
} while(0);
int32_t iCycleUsed = stop_cycle_counter();
  • 在RTOS环境下,我们可以使用上一篇文章介绍过的专用函数start_task_cycle_counter()stop_task_cycle_counter()来获取周期数信息:
void example_task (void *argument) 
{
    init_task_cycle_counter();
    ...
    start_task_cycle_counter();
    do {
        step_1();
        step_2();
        ...
        step_n();
    } while(0);
    int64_t lCycleUsed = stop_task_cyclee_counter();
    ...
} 

如果你还不清楚问题的全貌,不妨看下面几张图:

一开始,在裸机中所有的步骤都是在一起的:

48b843227c5721ac76ea8a133688dbdf.png

(理论上中断一关)我们就可以轻松的测量多个步骤所使用的CPU周期数,此时测量是连续进行的。当我们进入多任务环境时,虽然多个步骤仍然集中在同一个任务里,但由于“任务调度的存在”,实际上的情况就变得复杂起来:

d13e0f1f3c9bf7630963a1065061efe7.png

借助每个任务独立的start_task_cycle_counter()stop_task_cycle_counter() 我们得以“无视”任务调度带来的影响——只专注于本任务内指定范围的代码块所消耗的CPU时间

可是,现实是残酷的,发誓要“一直一直在一起”的“好朋友”也难免要各奔东西,更何况是同一个数据处理中的不同步骤呢?——各种各样的原因都会促使多任务应用设计时将不同的步骤分散到不同的任务中,比如:

  • 不同的步骤拥有不同的实时性要求
  • 不同的步骤处于不同的模块中
  • 不同的步骤处于不同的安全域中
  • 考虑到未来扩展的需要,认为的需要将步骤拆散并放置到不同的任务中
  • 不同的步骤处于数据流的不同位置
  • ……

此时,我们又该如何简单的测量这些分散在不同任务中的步骤所消耗的总CPU周期数呢?
33723631975b00b974cd04bda7d1ed1e.jpg

【“活干完了告诉我,我先睡会儿”】

在简单的多任务合作模式中,如下图所示的“主从合作模式”是最常见的异步工作模式:
a5fdf941ae4d67f15c9564baa737fba2.png

这里:

  1. 从任务的目的是从主任务那里分担一部分的工作;
  2. 从任务会在任务启动后完成必要的准备工作后就开始等候来自于主任务的信号量,此时,从任务是处于挂起状态;
  3. 一般来说,从任务应该在主任务发送信号量之前就完成所有的准备工作——否则,主任务就有“触发了个寂寞”的风险;
  4. 当从任务接收到来自主任务的信号量后,将从挂起状态中唤醒,开始正常工作;
  5. 当从任务完成本职工作后,会向主任务发送一个信号量——告诉它“你交代的事情我已经做完啦,我先睡会儿,有事您说话”——然后等待下一次主任务的信号量,并由此进入挂起状态。
  6. 主任务一般会在对应的阶段向从任务发送信号量,以启动异步处理,这就好比是对从任务说:“来活儿了,快醒醒,我先睡会儿”,然后就进入挂起状态——等待来自从任务的完成信号。

如果觉得上述步骤比较抽象,不妨来看一个实际的例子。假设一个数据处理可以被拆分成三个步骤——为了简化讨论,分别由三个函数step_1()step_2()step_3()表示:

void step_1(void)
{
    delay_ms(1);
}

void step_2(void)
{
    delay_ms(2);
}

void step_3(void)
{
    delay_ms(3);
}

这里,每个步骤都是用由 _perf_counter_提供的 _delay_ms() _函数来模拟一个任务负载(注意:该_delay_ms()_函数不会引发RTOS的任务调度)。

假设三个步骤需要在一个任务中以10ms为间隔周期性的进行执行(以CMSIS-RTOS2的API为范例):


osThreadId_t s_tidTaskA;
osThreadId_t s_tidTaskB;

void task_a (void *argument) 
{
    ...
    
    while (1) {
        // 获取本次循环开始时的系统毫秒数
        uint32_t wTick = osKernelGetTickCount();
    
        step_1();
        step_2(); 
        step_3();
    
        //!< 100Hz 的周期性任务
        osDelayUntil(wTick + 10); 
    }
}

根据上一篇文章的内容,我们可以很容易的测量出三个步骤的CPU占用率:


osThreadId_t s_tidTaskA;
osThreadId_t s_tidTaskB;

void task_a (void *argument) 
{
    init_task_cycle_counter();
    ...
    __super_loop_monitor__(100) {
        // 获取本次循环开始时的系统毫秒数
        uint32_t wTick = osKernelGetTickCount();
    
        step_1();
        step_2(); 
        step_3();
    
        //!< 100Hz 的周期性任务
        osDelayUntil(wTick + 10); 
    }
}

由于三个步骤中负载所占用的时间分别为1ms、2ms、3ms,因此在10ms的循环周期中,容易计算出这里的CPU占用率为60%,而perf_counter的测量结果也应征了这一结论(100次循环的平均结果):

1d685694fedaf8ebb8c6ad538194be4b.png

假设处于某种原因,步骤2必须要放置到一个独立的从任务中执行,根据前面的描述,对应的代码为:


osThreadId_t s_tidTaskA;
osThreadId_t s_tidTaskB;

void task_a (void *argument) 
{
    init_task_cycle_counter();
    ...
    __super_loop_monitor__(100) {
    
        uint32_t wTick = osKernelGetTickCount();
    
        step_1();
        
        //! 向从任务发送信号量,催其起床
        osThreadFlagsSet(s_tidTaskB, 0x0001);
        //! 等待从任务完成,挂起当前任务
        osThreadFlagsWait(0x0002, osFlagsWaitAll, osWaitForever);
        
        step_3();
    
        //!< 100Hz 的周期性任务
        osDelayUntil(wTick + 10); 
    }
}

void task_b (void *argument) 
{
    init_task_cycle_counter();
    ...
    
    while(1) {
        //! 等待来自主任务的信号量
        osThreadFlagsWait(0x0001, osFlagsWaitAll, osWaitForever);
        
        //! 干活
        step_2();
        
        //! 向主任务发送完成信号
        osThreadFlagsSet(s_tidTaskA, 0x0002);
    }
}

这里有两点需要注意:

1、为了保证主任务不会“触发了个寂寞”,task_b()需要先于task_a()启动,(或者最保险的方式是将从任务的优先级设置的“大于等于”主任务。)比如:


int main (void) {

  // System Initialization
  SystemCoreClockUpdate();

  osKernelInitialize();                 // Initialize CMSIS-RTOS
    
  init_cycle_counter(true);
  
  s_tidTaskB = osThreadNew(task_b, NULL, NULL);  
  s_tidTaskA = osThreadNew(task_a, NULL, NULL);
  
  ...
  
  if (osKernelGetState() == osKernelReady) {
    osKernelStart();                    // Start thread execution
  }

  while(1);
}

2、别忘在两个任务的一开始使用_ init_task_cycle_counter()_初始化对应任务的 cycle counter。

然而,经过上述修改后,我们发现实际测量到的 CPU 占用率为40%
e288a81ef25bdf859a9555b78e6205a8.png

显然,该值由主任务中step_1()1msstep_3()3ms构成,而从任务中 step_2()所消耗的时间则没有比计算在内——这就是跨任务周期计数的问题所在

为了应对这一问题,perf_counter 专门引入了可以跨越多个任务进行计数的计数器类 task_cycle_info_t,配合对应的方法(API函数)使用:

  • 构造函数(初始化函数):_init_task_cycle_info()_
/*! \brief intialize a given task_cycle_info_t object and enable it
 */
extern 
task_cycle_info_t *
init_task_cycle_info(task_cycle_info_t *ptInfo);
  • 在“涉事”任务内使用的注册和反注册:_register_task_cycle_agent()_和 unregister_task_cycle_agent()

/*! \brief register a global virtual cycle counter agent to the current task
 *! 
 *! \note the ptAgent it is better to be allocated as a static variable, global
 *!       variable or comes from heap or pool
 */
extern
task_cycle_info_agent_t *
register_task_cycle_agent(
    task_cycle_info_t *ptInfo,
    task_cycle_info_agent_t *ptAgent);

/*! \brief remove a global virtual cycle counter agent from the current task
 */
extern
task_cycle_info_agent_t *
unregister_task_cycle_agent(task_cycle_info_agent_t *ptAgent);
  • 在_perf_counter _的头文件中还能找到其它更为精细控制的方法,比如“使能开关相关的函数”等等,这里就不再赘述。

task_cycle_info_t类的使用也非常简单:

1、以静态(或者堆、池)分配的方式获得一个 task_cycle_info_t 类的实例。比如定义一个静态变量:

static task_cycel_info_t s_tMyCycleInfo;

2、在所有相关的任务启动前,对其进行初始化(完成构造):


int main (void) 
{

  // System Initialization
  SystemCoreClockUpdate();

  osKernelInitialize();                 // Initialize CMSIS-RTOS
    
  init_cycle_counter(true);
    
  init_task_cycle_info(&s_tMyCycleInfo);
  
  s_tidTaskB = osThreadNew(task_b, NULL, NULL);  
  s_tidTaskA = osThreadNew(task_a, NULL, NULL);
  
  
  if (osKernelGetState() == osKernelReady) {
    osKernelStart();                    // Start thread execution
  }

  while(1);
}

3、在所有“涉事”任务中,调用函数register_task_cycle_agent()注册我们的计数器实例,比如:


void task_a (void *argument) 
{
    int64_t lTimeElapsed;
    
    init_task_cycle_counter();
    
    task_cycle_info_agent_t tCycleInfoAgent;
    register_task_cycle_agent(&s_tMyCycleInfo, &tCycleInfoAgent);
    start_task_cycle_counter(&s_tMyCycleInfo);
    
    ...
}


void task_b (void *argument) 
{
    init_task_cycle_counter();
    
    task_cycle_info_agent_t tCycleInfoAgent;
    register_task_cycle_agent(&s_tMyCycleInfo, &tCycleInfoAgent);
    
    while(1) {
        osThreadFlagsWait(0x0001, osFlagsWaitAll, osWaitForever);
        step_2();
        osThreadFlagsSet(s_tidTaskA, 0x0002);
    }
}

这里需要注意:

  • 前面的例子中,涉事的任务是task_a()task_b()因此,这两个任务函数在完成了init_task_cycle_counter()后,都要调用register_task_cycle_agent()函数来注册s_tMyCycleInfo
  • 注册时,需要借助一个task_cycle_info_agent_t的链表容器,帮助我们将task_cycle_info_t的实例加入到 每个任务自己的计数器链表中。这实际上也告诉我们,一个任务可以同时挂载多个不同的task_cycle_info_t实例——换句话说:每个任务都能同时服务多个不同目的的跨任务计数器,是不是很强大?
task_cycle_info_agent_t tCycleInfoAgent;
register_task_cycle_agent(&s_tMyCycleInfo, &tCycleInfoAgent);

4、在开始计数时,通过start_task_cycle_counter()来启动我们的计数器:

start_task_cycle_counter(&s_tMyCycleInfo);

同理,在需要获得计数结果的时候,调用stop_task_cycle_counter()来获取计数结果:

int64_t lCycleUsed = stop_task_cycle_counter(&s_tMyCycleInfo);

这里,细心的小伙伴多半会注意到:这两个函数之前使用的时候不是不需要传递参数么?为什么又可以传递task_cycle_info_t类型的指针作为参数呢?其实这里使用了一个此前介绍过的技巧,还不太了解的小伙伴,可以参考这篇文章《【为宏正名】99%人都不知道的"##"里用法》,这里就不再赘述:


extern 
void __start_task_cycle_counter(task_cycle_info_t *ptInfo);

extern 
int64_t __stop_task_cycle_counter(task_cycle_info_t *ptInfo);


#define start_task_cycle_counter(...)                                           
            __start_task_cycle_counter((NULL,##__VA_ARGS__))

#define stop_task_cycle_counter(...)                                            
            __stop_task_cycle_counter((NULL,##__VA_ARGS__)) 

对应前面的例子,一个完整的示例代码如下:

void step_1(void)
{
    delay_ms(1);
}

void step_2(void)
{
    delay_ms(2);
}

void step_3(void)
{
    delay_ms(3);
}


osThreadId_t s_tidTaskA;
osThreadId_t s_tidTaskB;

task_cycle_info_t s_tMyCycleInfo;


void task_a (void *argument) 
{
    int64_t lTimeElapsed;
    
    init_task_cycle_counter();
    task_cycle_info_agent_t tCycleInfoAgent;
    register_task_cycle_agent(&s_tMyCycleInfo, &tCycleInfoAgent);
    start_task_cycle_counter(&s_tMyCycleInfo);
    
    __super_loop_monitor__(100, 
    {
        lTimeElapsed = __cpu_usage__.lTimeElapsed;
        int64_t lCycleUsed = stop_task_cycle_counter(&s_tMyCycleInfo);
        
        printf("s_tMyCycleInfo CPU Usage %2.3f%%\r\n",                    
                        (float)((double)lCycleUsed* 100.0 / 
                                (double)__cpu_usage__.lTimeElapsed));
        start_task_cycle_counter(&s_tMyCycleInfo);
    }) {
    
        uint32_t wTick = osKernelGetTickCount();
    
        step_1();
        
        //! 向从任务发送信号量,催其起床
        osThreadFlagsSet(s_tidTaskB, 0x0001);
        //! 等待从任务完成,挂起当前任务
        osThreadFlagsWait(0x0002, osFlagsWaitAll, osWaitForever);
        
        step_3();
    
        osDelayUntil(wTick + 10);           //!< 50Hz
    }
}

void task_b (void *argument) 
{
    init_task_cycle_counter();
    
    task_cycle_info_agent_t tCycleInfoAgent;
    register_task_cycle_agent(&s_tMyCycleInfo, &tCycleInfoAgent);
    
    while(1) {
        //! 等待来自主任务的信号量
        osThreadFlagsWait(0x0001, osFlagsWaitAll, osWaitForever);
        
        //! 干活
        step_2();
        
        //! 向主任务发送完成信号
        osThreadFlagsSet(s_tidTaskA, 0x0002);
    }
}

int main (void) 
{

    // System Initialization
    SystemCoreClockUpdate();
    
    osKernelInitialize();                 // Initialize CMSIS-RTOS
      
    init_cycle_counter(true);
      
    init_task_cycle_info(&s_tMyCycleInfo);
    
    s_tidTaskB = osThreadNew(task_b, NULL, NULL);  
    s_tidTaskA = osThreadNew(task_a, NULL, NULL);
    
    
    if (osKernelGetState() == osKernelReady) {
      osKernelStart();                    // Start thread execution
    }
    
    while(1);
}

运行结果如下:

8c44da40dfb33828def4d9b33dfbdb10.png

可以看到,三个步骤的任务负载(1+2+3=6ms)都被计算在内。
10f1cff3c26e61e336f6515bff9084dd.jpg

友情提示:如果你对__super_loop_monitor__()结构的用法感到迷惑,可以单击这里

【流水线模式下的性能测量】

除了前面介绍的简单“主从”模式外,多任务环境下往往还存在另外一种类似流水线的多任务“接力”模式:

9411dd0c6c95920778ebd03655a4acda.png

这种任务模式在“数据流图”上往往呈现“百川汇海”模式,有时候,我们需要沿着其中一条线索完成从源头到末端的性能分析,而它的“事件触发图”大体如下:

e1bc3cdf5a1e31eb8cfad066304006af.png

这种情况下,我们仍然可以使用 task_cycle_info_t来测量整个工序的耗时(周期数),只不过需要注意的是:

  • 我们要在工序开始的地方调用start_task_cycle_counter()来开始计数;在工序结束的地方调用stop_task_cycle_counter()来获取测量结果。
  • 由于start_task_cycle_counter()会清零计数器,因此要在源头处作必要的保护——防止在一次完整的测量结束前,过早的调用start_task_cycle_counter()。利用RTOS所提供的互斥量,我们可以轻松的实现这一功能,这里就不再赘述。
  • 测量的结果可以通过SystemCoreClock中保存的CPU工作频率换算成物理时间(ms或者us):
int64_t lCycleUsed = stop_task_cycle_counter(&s_tMyCycleInfo);

printf("Pipeline used %d ms", lCycleUsed / (SystemCoreClock / 1000) );
  • 有时候,进行性能分析需要暂时性的(或者有条件的)关闭某一计数器,此时灵活使用 使能开关函数 对task_cycle_info_t对象进行操作就成为了关键。

【说在后面的话】

跨任务性能测量是perf_counter所提供的“拳头功能”,可以说目前在市面上针对Cortex-M的开源工具中,还鲜有类似的功能。虽然关注【裸机思维】公众号后,在后台发送关键字 "perf_counter" 就可以获得对应 CMSIS-Pack 的网盘链接和相关的教程,但作为一个Github上的开源项目,我还是希望喜欢该工具的小伙伴能给我一个宝贵的Star。复制下面的链接到浏览器就可以找到该项目。

https://github.com/GorgonMedu...

24267ecedf56b8e861019c7af29fd7dd.png

谢谢啦。

原文:裸机思维
作者:GorgonMeducer 傻孩子

专栏推荐文章

如果你喜欢我的思维,欢迎订阅裸机思维
推荐阅读
关注数
1480
内容数
118
探讨嵌入式系统开发的相关思维、方法、技巧。
目录
极术微信服务号
关注极术微信号
实时接收点赞提醒和评论通知
安谋科技学堂公众号
关注安谋科技学堂
实时获取安谋科技及 Arm 教学资源
安谋科技招聘公众号
关注安谋科技招聘
实时获取安谋科技中国职位信息