作者简介
Janakiram MSV是Janakiram & Associates的首席分析师,也是国际信息技术学院的兼职教师。他也是Google Qualified Developer、亚马逊认证解决方案架构师、亚马逊认证开发者、亚马逊认证SysOps管理员和微软认证Azure专业人员。
Janakiram是云原生计算基金会(CNCF)的大使,也是首批Kubernetes认证管理员和Kubernetes认证应用开发者之一。他曾在微软、AWS、Gigaom Research等知名公司工作。
图片
在之前的文章中,我讨论了Rancher的轻量级Kubernetes发行版K3s、Calico和Portworx如何成为运行在边缘的现代AI和物联网系统的基础。在本文中我们将设计和部署一个解决方案以运行在这一基础设施上。
我们将基于监控涡轮机的风扇的假设场景来构建一个预测性维护解决方案,以检测风扇的异常情况。该架构利用了各种开源的云原生技术,可以作为设计和构建IoT/边缘解决方案的参考架构。
问题陈述
我们将设计和部署一个解决方案,该解决方案可以从多个风扇摄取遥测数据,并在故障发生之前使用实时流(real-time stream)来预测故障。该解决方案运行在低端机器(如英特尔NUC)的边缘基础设施上。在本文中,我们将使用前文搭建的基础设施(基于K3s、Calico以及Portwox),它们提供了Kubernetes集群的核心组件。
解决方案架构
连接到风扇的传感器提供了当前的转速、振动、温度和噪音水平等数据。这些遥测数据流和每个风扇的设备ID一起作为预测性维护解决方案的输入。
Mosquitto是一款使用广泛的开源MQTT broker,它将作为传感器的网关以及平台的集中式消息broker。传感器将遥测数据摄入Mosquitto broker的fan/messages类别下方。
以下是每个风扇发布到MQTT主题的有效载荷:
预测器微服务和风扇发布的数据在同一个遥测频道,它会从中读取数据。对于每个入站数据点,它都会调用异常检测服务,并将结果发布到一个单独的MQTT主题中,即fan/anomaly。
import time
import requests
import random
import datetime
import json
import os
import paho.mqtt.client as mqtt
broker_address = os.getenv('MQTT_HOST')
dev_topic = os.getenv('MQTT_DEV_TOPIC')
pred_topic = os.getenv('MQTT_PREDICT_TOPIC')
scoring_url=os.getenv('SCORING_URL')
d={}
client = mqtt.Client("pdm")
client.connect(broker_address)
def on_message(mosq, obj, msg):
rotation=json.loads(msg.payload)["rotation"]
temperature=json.loads(msg.payload)["temperature"]
vibration=json.loads(msg.payload)["vibration"]
sound=json.loads(msg.payload)["sound"]
telemetry=[rotation,temperature,vibration,sound]
data={"params":telemetry}
response = requests.post(scoring_url, json=data)
fault=json.loads(response.text)["fault"]
d["deviceID"]=json.loads(msg.payload)["deviceID"]
d["fault"]=fault
payload = json.dumps(d, ensure_ascii=False)
print(payload)
client.publish(pred_topic,payload)
def on_subscribe(mosq, obj, mid, granted_qos):
print("Subscribed: " + str(mid) + " " + str(granted_qos))
client.on_message = on_message
client.on_subscribe = on_subscribe
client.connect(broker_address)
client.subscribe(dev_topic, 0)
while True:
client.loop()
SCORING_URL是异常检测推理服务的一个端点。通过Flask网络服务暴露了一个在TensorFlow中训练的深度学习模型。
下面是预测服务发布到MQTT主题的有效载荷:
训练异常检测模型
用一个超过20000个数据点的历史数据集来训练异常检测模型。
从数据集中观察到,在故障发生前的几个小时,风扇的转速会降低并伴随着振动、声音、温度值的增加。
转速数据的散点图直观地显示了这一点。风扇的转速从正常的平均600转下降到400转。
基于此,我们可以轻松地训练一个简单的TensorFlow逻辑回归模型来预测故障风扇。我们先去掉时间戳和设备ID列。
dataframe = pandas.read_csv("../data/fan.csv", header=None,skiprows=1)
del dataframe[0]
del dataframe[1]
分离特征和标签后,再将数据集分为训练数据和测试数据。
dataset = dataframe.values
X = dataset[:,0:4].astype(float)
y = dataset[:,4]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33)
然后我们创建一个4层神经网络,做逻辑回归。
model = Sequential()
model.add(Dense(60, input_dim=4, activation='relu'))
model.add(Dense(30, activation='relu'))
model.add(Dense(10, activation='relu'))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=250, batch_size=32, verbose=0)
最后,该模型被保存和评估。
model.save("../model")
loss, acc = model.evaluate(X_test, y_test, verbose=0)
print('Test Accuracy: %.3f' % acc)
保存到磁盘上的TensorFlow模型被推理服务加载,然后对预测器微服务发送的数据进行预测。
时间序列数据和可视化
InfluxDB的实例通过Telegraf与Mosquitto连接。这种配置为我们提供了一个优雅的机制,可以在不写代码的情况下将时间序列数据摄入InfluxDB。
下面是连接Mosquitto和InfluxDB的Telegraf配置:
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 10000
collection_jitter = "0s"
flush_jitter = "0s"
debug = false
quiet = false
hostname = ""
omit_hostname = true
[[outputs.influxdb]]
urls = ["http://influxdb:8086"]
database = "fan"
retention_policy = "autogen"
precision = "s"
timeout = "5s"
[[outputs.file]]
files = ["stdout"]
data_format = "influx"
[[inputs.mqtt_consumer]]
servers = ["tcp://mosquitto:1883"]
qos = 0
topics = [
"fan/#"
]
insecure_skip_verify = true
client_id = ""
data_format = "json"
name_override = "fan"
tag_keys = ["deviceID"]
json_string_fields = ["rotation","temperature","vibration","sound","fault"]
现在可以从InfluxDB查询时间序列数据。
最后,我们将Grafana仪表盘连接到InfluxDB,为我们的AIoT解决方案构建一个直观的可视化面板。
在本教程的下一部分,我将讨论部署架构以及基于K3s、Calico和Portworx的存储和网络。保持关注哟!