本文主要介绍实时流处理框架Apache Edgent。
作者:与子同袍
首发:物联网前沿技术观察
Edgent简介
实时流数据处理框架Apache Edgent项目前身是Apache Quarks,是IBM于2016年捐赠给Apache软件基金会的开源项目。
为什么叫Edgent?我猜是Edge + Agent合体而成。
它跟一般的实时流数据处理框架的区别,在于运行在资源有限的边缘计算节点内,并且能与多种后端服务对接。
边缘计算节点的计算资源有限,很多都是单片机或嵌入式网关。
Spark、Storm、Kafka、Flink 等实时流数据分析框架主要运行在分布式服务器集群上,在边缘计算的受限资源环境下无法运行。
而Apache Edgent,则可以轻松运行于资源有限的计算节点。
为什么需要在边缘层进行实时流数据处理?
把所有数据都上传到云端分析,时效性差、传输成本高。
万一断网,靠离线缓存也存不了多少数据。
那怎么破?
著名计算机科学家David John Wheeler说过这样的名言:
计算机科学领域的任何问题都可以通过增加一个间接的中间层来解决。(All problems in computer science can be solved by another level of indirection.)
既然云端处理不方便,那就在云和设备中间加一层中间层——贴近设备侧对采集的设备数据进行实时分析处理,并在必要的时候把数据上传到云端。
Apache Edgent的设计就是按照这个思路来实现的。
从下图我们可以看出,Edgent位于网关层,向下连接物联网设备,向上通过网络连接云端服务。处于承上启下的中间位置。
用Edgent在边缘计算层实时流处理带来的价值
- 降低通讯成本和传输到服务器端的数据量。
- 本地及时响应,提高时效性。
- 边缘计算节点在断网情况下,也能处理数据。
- 减轻服务器端的处理和存储的压力。
Edgent的主要功能
Edgent让我们可以从原来的持续不断发送数据到服务器,变成只在问题发生时发送重要的有意义的数据。
基于Edgent框架开发的应用程序,对实时数据进行流分析。并在需要的时候,发送数据到后端系统用于进一步的分析或存储。
例如,我们可以使用Edgent的实时流分析,来确定设备是不是在合法的参数范围内运行——如发动机是否过热。并把发动机过热时的温度数据上传的云端用于存储和分析。
如果系统运行正常,我们就不需要把设备数据发送到后端系统。因为这对于后端系统的处理和存储来说,是额外的成本和压力。
但是,一旦Edgent检测到问题,就会把有问题的数据传输到后端系统,用于分析问题为什么发生,如何解决问题。
Edgent 主要应用场景
- IoT:对边缘设备和移动设备的数据的实时流分析,从而减少传输数据成本,对设备提供及时的本地的反馈。
- 嵌入到应用服务器实例:实时分析应用服务器的错误日志,而不会影响网络流量。
- 机器健康:实时分析机器的健康,不会影响网络流量。
Edgent 测试过的部署环境
- Java 8,包括树莓派B和Pi2 B
- Java 7
- Android
Edgent与后端系统的通讯方式
- MQTT
- IBM Watson IoT Platform
- Apache Kafka
- HTTP, WebSocket client
- 串口
- JDBC,文件
- 自定义消息总线
Edgent的基本概念
Edgent中重要的抽象有:
- **流(Stream)**Edgent应用程序的基础构件块是流(stream)。流是一个连续的Tuple序列(消息、事件、传感器读数等)。
- Source 是用于分析的数据源,如设备的温度传感器的温度值。使用sink函数可以终止流。
- Sink函数 能够执行本地的设备控制,或者发送信息到外部服务(如通过消息总线发送到后端的分析系统)。
- 拓扑(Topology) 它是流的处理转换的拓扑图。
- 提供者(Provider) 它是创建和执行Topology的工厂。
Edgent的这几个概念跟Apache Flume的很像。相同点是两者都是用来做数据聚合和移动的。不同点是,前者是用来在边缘计算节点下采集设备数据进行聚合和上传;后者是用来在分布式环境下采集,聚合和移动大量日志数据到一个中央仓库。
下图为Flume的架构。
Edgent的API是函数式写法,用Java 8 的lambda表达式表示。开发人员通过调用Edgent的API,可以处理和分析流中的每个 Tuple。
基本的Edgent应用程序遵循一个相同的结构:
- 获取一个Provider
- 生成一个Topology
- 提交这个Topology用于执行
更复杂的应用程序会由多个Topology组成,并且是由外部应用通过命令动态创建和启停。
温度传感器应用案例
如果是第一次使用Edgent,最好的入门方式就是写一个简单的程序。
我们来创建一个简单的温度传感器的应用程序。这个应用从温度传感器读数,每毫秒1次。
采集的温度不必每次都上报到服务器端。我们在本地处理数据。只在有感兴趣的情况或者异常情况才上传数据。这种方式更为高效。
首先我们定义一个温度传感器类。
import java.util.Random;
import org.apache.edgent.function.Supplier;
public class TempSensor implements Supplier<Double> {
double currentTemp = 65.0;
Random rand;
TempSensor(){
rand = new Random();
}
@Override
public Double get() {
double newTemp = rand.nextGaussian() + currentTemp;
currentTemp = newTemp;
return currentTemp;
}
}
每次调用TempSensor.get()时,都会返回一个新的温度读数。
我们然后定义一个TempSensorApplication 类。在这个类里创建一个Topology,用于读取温度值并处理这个流,过滤数据并打印结果。
import java.util.concurrent.TimeUnit;
import org.apache.edgent.providers.direct.DirectProvider;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
public class TempSensorApplication {
public static void main(String[] args) throws Exception {
TempSensor sensor = new TempSensor();
DirectProvider dp = new DirectProvider();
Topology topology = dp.newTopology();
TStream<Double> tempReadings = topology.poll(sensor, 1, TimeUnit.MILLISECONDS);
TStream<Double> filteredReadings = tempReadings.filter(reading -> reading < 50 || reading > 80);
filteredReadings.print();
dp.submit(topology);
}
}
最后,进入Edgent的样例程序目录,执行如下命令:
cd topology; ./run-sample.sh TempSensorApplication
就会在终端打印出低于50度或者高于80度的温度值了。
在后续的文章中,我将给大家继续介绍跟Edgent类似的Esper和Apama等边缘计算层的实时流分析引擎。
v0.1 初稿, 2019-04-18
推荐阅读:
更多物联网,边缘计算相关技术干货请关注我的专栏物联网前沿技术观察
申请加入物联网技术研讨大佬微信群,请加微信号:iot1999。