vesperW · 3月25日

分享一个嵌入式开发的轻量级消息库

做嵌入式开发,很多时候都会用到“消息通信”。今天分享一个嵌入式开发的轻量级消息库:ZeroMQ(简称 ZMQ)

作为分布式系统通信的“隐藏王者”,凭借其独特设计理念在物联网、微服务、金融交易等领域广泛应用。本文将从核心特性剖析入手,带你掌握 Linux C 环境下的 ZMQ 开发全流程。

一、ZeroMQ 是什么?

ZeroMQ 不是传统意义上的消息队列(如 RabbitMQ),而是一个嵌入式网络通信库。它通过 TCP/IPC 等传输协议,提供类似 Socket 的 API 接口,却实现了异步消息传递、灵活路由、负载均衡等高级特性,被誉为“网络通信的瑞士军刀”。

二、ZeroMQ 的七大核心特性

  1. 异步非阻塞 IO
    基于事件驱动的消息处理,单线程即可处理数万并发连接。
  2. 多模式通信
/* 支持9种通信模式 */
ZMQ_REQ  // 同步请求
ZMQ_REP   // 同步响应
ZMQ_PUB   // 消息发布
ZMQ_SUB   // 消息订阅
ZMQ_PUSH  // 负载分发
ZMQ_PULL  // 负载收集
// ...其他模式
  1. 去中心化架构
    无需中间代理服务器,节点间直接通信。
  2. 多传输协议支持
tcp://192.168.1.1:5555   # TCP网络传输
ipc:///tmp/feeds.sock     # 进程间通信
inproc://worker_pool      # 线程间通信
  1. 智能消息路由
    自动处理连接重试、队列缓冲、负载均衡。
  2. 多语言支持
    C/C++、Python、Java、Go 等 40+语言绑定。
  3. 轻量高效
    核心库仅 300KB,吞吐量可达百万级消息/秒。

三、环境搭建(Linux C)

下面手把手教大家在 Linux C 下使用 ZMQ 库:

3.1 安装开发包

# Ubuntu/Debian
sudo apt install libzmq3-dev build-essential

# 验证安装
pkg-config --modversion libzmq  # 输出类似4.3.4

3.2 基础编译命令

gcc -o demo demo.c -lzmq -lpthread

四、三大经典模式

下面正对 ZMQ 库非常经典的三种模式进行一个简单的使用介绍:

请求-响应模式(REQ-REP)

服务端(rep_server.c)

#include <zmq.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>

#define CHECK_STATUS(call) if (call == -1) { printf("Error at %s:%d\n", __FILE__, __LINE__); exit(1); }

int main() {
    // 创建上下文
    void *context = zmq_ctx_new();
    assert(context);

    // 创建REP套接字
    void *responder = zmq_socket(context, ZMQ_REP);
    assert(responder);
    
    // 绑定端口
    int rc = zmq_bind(responder, "tcp://*:5555");
    CHECK_STATUS(rc);

    printf("服务端启动,等待请求...\n");
    while (1) {
        char buffer[256];
        
        // 接收请求(阻塞模式)
        int size = zmq_recv(responder, buffer, 255, 0);
        CHECK_STATUS(size);
        buffer[size] = '\0';
        printf("收到请求: %s\n", buffer);

        // 处理业务逻辑
        sleep(1); // 模拟处理耗时
        
        // 发送响应
        constchar *response = "处理结果:OK";
        size = zmq_send(responder, response, strlen(response), 0);
        CHECK_STATUS(size);
    }

    // 清理资源(实际不会执行到此处)
    zmq_close(responder);
    zmq_ctx_destroy(context);
    return0;
}

客户端(req_client.c)

#include <zmq.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>

int main() {
    void *context = zmq_ctx_new();
    assert(context);

    void *requester = zmq_socket(context, ZMQ_REQ);
    assert(requester);
    
    printf("连接服务端...\n");
    int rc = zmq_connect(requester, "tcp://localhost:5555");
    assert(rc == 0);

    for (int i = 0; i < 5; i++) {
        char request[50];
        snprintf(request, 50, "请求#%d", i+1);
        
        // 发送请求
        printf("发送: %s\n", request);
        rc = zmq_send(requester, request, strlen(request), 0);
        assert(rc != -1);

        // 接收响应
        char buffer[256];
        rc = zmq_recv(requester, buffer, 255, 0);
        assert(rc != -1);
        buffer[rc] = '\0';
        printf("收到响应: %s\n\n", buffer);
    }

    zmq_close(requester);
    zmq_ctx_destroy(context);
    return0;
}

编译运行

# 服务端
gcc rep_server.c -o server -lzmq
./server

# 客户端
gcc req_client.c -o client -lzmq
./client

发布-订阅模式(PUB-SUB)

发布者(pub_server.c)

#include <zmq.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <time.h>
#include <unistd.h>

int main() {
    void *context = zmq_ctx_new();
    void *publisher = zmq_socket(context, ZMQ_PUB);
    assert(publisher);

    int rc = zmq_bind(publisher, "tcp://*:6000");
    assert(rc == 0);

    printf("数据发布中...\n");
    while (1) {
        // 生成两种消息类型
        time_t now = time(NULL);
        
        // 温度消息
        char temp_msg[50];
        snprintf(temp_msg, 50, "TEMPERATURE %.2f", 25.0 + (rand()%100)/100.0);
        zmq_send(publisher, temp_msg, strlen(temp_msg), 0);

        // 湿度消息
        char humi_msg[50];
        snprintf(humi_msg, 50, "HUMIDITY %.2f", 60.0 + (rand()%100)/100.0);
        zmq_send(publisher, humi_msg, strlen(humi_msg), 0);

        sleep(1); // 每秒更新
    }

    zmq_close(publisher);
    zmq_ctx_destroy(context);
    return0;
}

订阅者(sub_client.c)

#include <zmq.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>

int main(int argc, char *argv[]) {
    if (argc < 2) {
        printf("Usage: %s [TEMPERATURE|HUMIDITY]\n", argv[0]);
        return1;
    }

    void *context = zmq_ctx_new();
    void *subscriber = zmq_socket(context, ZMQ_SUB);
    assert(subscriber);

    int rc = zmq_connect(subscriber, "tcp://localhost:6000");
    assert(rc == 0);

    // 设置订阅过滤器
    rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, argv[1], strlen(argv[1]));
    assert(rc == 0);

    printf("订阅 [%s] 数据...\n", argv[1]);
    while (1) {
        char buffer[256];
        int size = zmq_recv(subscriber, buffer, 255, 0);
        buffer[size] = '\0';
        printf("收到数据: %s\n", buffer);
    }

    zmq_close(subscriber);
    zmq_ctx_destroy(context);
    return0;
}

运行示例

# 发布者
gcc pub_server.c -o publisher -lzmq
./publisher

# 温度订阅者
gcc sub_client.c -o subscriber -lzmq
./subscriber TEMPERATURE

# 湿度订阅者
./subscriber HUMIDITY

流水线模式(PUSH-PULL)

任务分发器(ventilator.c)

#include <zmq.h>
#include <stdio.h>
#include <assert.h>
#include <unistd.h>

int main() {
    void *context = zmq_ctx_new();
    void *sender = zmq_socket(context, ZMQ_PUSH);
    assert(sender);

    // 绑定推送端口
    int rc = zmq_bind(sender, "tcp://*:7000");
    assert(rc == 0);

    printf("任务分发准备就绪\n");
    sleep(1); // 等待worker连接

    // 生成100个计算任务
    for (int task_id = 0; task_id < 100; task_id++) {
        char msg[20];
        snprintf(msg, 20, "TASK:%04d", task_id);
        
        rc = zmq_send(sender, msg, strlen(msg), 0);
        assert(rc != -1);
        
        usleep(100000); // 0.1秒间隔
    }

    // 发送结束信号
    rc = zmq_send(sender, "EXIT", 4, 0);
    assert(rc != -1);

    zmq_close(sender);
    zmq_ctx_destroy(context);
    return0;
}

工作者(worker.c)

#include <zmq.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>

int main() {
    void *context = zmq_ctx_new();
    void *receiver = zmq_socket(context, ZMQ_PULL);
    assert(receiver);

    int rc = zmq_connect(receiver, "tcp://localhost:7000");
    assert(rc == 0);

    printf("工作者就绪\n");
    while (1) {
        char buffer[256];
        int size = zmq_recv(receiver, buffer, 255, 0);
        buffer[size] = '\0';

        if (strcmp(buffer, "EXIT") == 0) break;

        printf("处理任务: %s\n", buffer);
        usleep(500000); // 模拟处理耗时
    }

    printf("收到退出指令\n");
    zmq_close(receiver);
    zmq_ctx_destroy(context);
    return0;
}

运行步骤

# 启动分发器
gcc ventilator.c -o ventilator -lzmq
./ventilator

# 启动多个工作者(另开终端)
gcc worker.c -o worker -lzmq
./worker

这三种经典的通信模型,也是我们在实际项目中经常使用,还有其他相关的特点,大家可以自行摸索~

END

来源:嵌入式专栏

推荐阅读

欢迎大家点赞留言,更多 Arm 技术文章动态请关注极术社区嵌入式客栈专栏欢迎添加极术小姐姐微信(id:aijishu20)加入技术交流群,请备注研究方向。

推荐阅读
关注数
2914
内容数
341
分享一些在嵌入式应用开发方面的浅见,广交朋友
目录
极术微信服务号
关注极术微信号
实时接收点赞提醒和评论通知
安谋科技学堂公众号
关注安谋科技学堂
实时获取安谋科技及 Arm 教学资源
安谋科技招聘公众号
关注安谋科技招聘
实时获取安谋科技中国职位信息