Masoud Koleini 2021年11月22日
ApacheSpark是一个在机器集群上运行大数据计算的框架。Spark的一个重要用例是机器学习的大数据分析。训练数据集的大小可以是TB。这样的数据集不适合单个机器的内存,必须分布在要处理的机器集群上。
机器学习管道有多个阶段,包括数据提取、转换和训练。根据转换过程和中间数据大小,所需内存可能与原始数据的大小不同。另外,不同的机器学习算法有不同的资源需求。因此,在将其部署到生产环境之前,用户应该评估其ML管道以分配适当的集群资源。
AWS Graviton2处理器使用64位Arm Neoverse内核。与基于x86的实例相比,由Graviron2处理器支持的实例提供了更好的性价比。这个博客展示了如何在AmazonEKS上运行的Spark集群上使用基于Graviton2的实例来训练一个和K-means集群模型。
Spark机器学习库
Spark的机器学习库称为MLlib。它以分布式方式实现ML算法、数据转换和管道。它允许用户保存经过训练的模型,并在预测阶段重新加载它们。新的MLlib库(也称为Spark ML)基于Spark的Dataframe API,允许Spark在数据管道上应用优化。这个博客演示了两个ML案例研究(决策树回归和K-均值聚类),使用EKS集群上运行的大型数据集和Graviton2实例。
要求
对于本文中的案例研究,读者必须遵循我们之前Twitter分析博客中的指导原则。本博客展示了如何设置EKS集群、微调集群以及创建Spark Scala项目。请注意,您可能必须根据正在实施的案例研究更改作为工作节点运行的Graviton2计算机的数量
案例研究1:决策树回归-房价
决策树是监督学习中常用的方法之一。决策树用于解决回归和分类问题。决策树的内部节点表示关于输入特征的问题,分支表示决策,叶子表示结果。
Spark使用Random Forest 和Gradient-Boosted Trees作为两种流行的树集成算法,以减少决策树可能的过度拟合。在Spark上运行决策树的一个重要好处是,实现可以将训练有效地分布到多个节点上。
加州房价数据集
本案例研究中使用的数据集包含不同地理位置的加利福尼亚房价、房产年龄、卧室数量以及其他一些特征。我们利用这个数据集训练了一个决策树回归模型,这样我们就可以预测市场上其他房屋的价格。
原始数据集的大小很小(1.2MB)。因此,本案例研究将复制相同的数据,以创建大小为55.7GB的数据集(分解为多个CSV文件)。90%的数据用于训练,其余10%通过模型进行预测。数据集存储在AmazonS3 bucket中。
简图
此图显示了ML案例研究中的不同组件是如何相互作用的。
图1。决策树ML应用中的组件
AWS组件和权限
在运行Spark作业之前,应创建已上载数据集的bucket。ML管道还应该具有对AWS S3存储桶的读写访问权限。
要使用Terraform部署所需资源,请在EKS Terraform文件夹中创建一个名为ml.tf的文件(从前面的案例研究中删除twitter.tf)。在文件中放置以下内容。这将创建一个名为spark ml demo的S3 bucket,并为角色添加正确的权限:
resource "aws_s3_bucket" "spark-ml-demo" {
bucket = "spark-ml-demo"
}
resource "aws_iam_role_policy_attachment" "s3-full-access" {
role = "eksWorkerRole"
policy_arn = "arn:aws:iam::aws:policy/AmazonS3FullAccess"
}
运行terraform apply来创建资源,然后将数据集上传到spark ml demo bucket中。
Spark实施
与Twitter类似,读者需要创建sbt项目,并将以下依赖项添加到build.sbt文件中:
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.1.2",
"org.apache.spark" %% "spark-sql" % "3.1.2",
"org.apache.spark" %% "spark-mllib" % "3.1.2",
"software.amazon.awssdk" % "s3" % "2.17.31",
"org.apache.hadoop" % "hadoop-common" % "3.2.0",
"org.apache.hadoop" % "hadoop-aws" % "3.2.0",
)
下面的代码将spark ml demo S3 bucket中的数据集(包含_1.csv,…)读取到数据帧中(假设数据集在bucket中被分解为多个csv文件)。输入被分成训练和测试数据集。训练数据集被矢量化并传递给决策树回归器拟合函数,以创建一个模型,该模型是Spark ML上下文中的转换器。
代码中的下一步将介绍ML推理。它使用相同的向量汇编程序对测试数据集进行向量化,以进行训练数据转换。预测数据帧是通过在矢量化测试数据集上调用模型的转换函数生成的。结果将写回Predicts文件夹下的同一个S3 bucket。
请注意,在培训中仅使用特征的子集。
package sparkml
import org.apache.spark.sql._
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.log4j._
object RealEstate {
// this is for California dataset:
// https://developers.google.com/machine-learning/crash-course/california-housing-data-description
case class Property(longitude: Double, latitude: Double, housing_median_age: Double, total_rooms: Double,
total_bedrooms: Double, population: Double, households: Double, median_income: Double,
median_house_value: Double, ocean_proximity: String)
def main(array: Array[String]) {
// set the log level
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.appName("realEstate")
.getOrCreate()
import spark.implicits._
val realEstate = spark
.read
.option("header", "true")
.option("inferSchema", "true")
.csv("s3a://spark-ml-demo/housing_*.csv")
.as[Property]
val Array(trainingData, testData) = realEstate.randomSplit(Array(0.9, 0.1))
val assembler = new VectorAssembler()
.setInputCols(Array("housing_median_age", "total_rooms", "total_bedrooms", "population", "households", "median_income"))
.setOutputCol("features")
.setHandleInvalid("skip")
val trainDf = assembler
.transform(trainingData)
.select("features", "median_house_value")
val dt = new DecisionTreeRegressor()
.setLabelCol("median_house_value")
.setFeaturesCol("features")
val model = dt.fit(trainDf)
val testDF = assembler
.transform(testData)
.select("features", "median_house_value")
val predictions = model.transform(testDF)
.select("median_house_value", "prediction")
predictions.write.csv("s3a://spark-ml-demo/predictions")
spark.stop()
}
}
在上面的代码中,我们只使用数字特征进行训练和推理。读者可以在输入列中添加分类功能,如ocean_邻近性,并验证结果。
在本地测试程序
将原始数据集存储在计算机上,并对Scala脚本应用以下修改:
1.将Spark配置中的master设置为本地运行
val spark = SparkSession
.builder()
.appName("realEstate")
.master("local[*]")
.getOrCreate()
2.将S3数据集位置替换为计算机上csv文件的地址
3.将S3上的输出地址替换为本地目录,如/tmp
现在,在本地运行代码将创建一个包含预测的CSV文件。在打包软件之前,请确保已还原更改。
打包代码和依赖项
遵循Twitter分析案例研究中的指导原则打包代码和依赖项。
提交Spark
这个过程类似于Twitter分析案例研究,除了这里,我们正在处理更大的数据集和更详细的资源规划。
本案例研究在7个内存优化的r6g.4X大型实例上运行。这些实例具有16个vCPU和128GiB的内存。spark submit命令在EKS集群上运行作业,假设应用程序JAR文件的名称为spark_ml.JAR(用相应的信息替换KUBERNETES_MASTER_地址和DOCKER_IMAGE_地址)。分配给驱动程序和每个执行器的VCPU数量设置为5,这允许3个POD在一台机器上运行。相应地计算执行器的数量和内存分配。
bin/spark-submit \
--class sparkml.RealEstate \
--master k8s://<KUBERNETES_MASTER_ADDRESS> \
--deploy-mode cluster \
--conf <DOCKER_IMAGE_ADDRESS> \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.driver.pod.name="spark-ml" \
--conf spark.kubernetes.namespace=default \
--conf spark.executor.instances=20 \
--conf spark.driver.cores=5 \
--conf spark.executor.cores=5 \
--conf spark.executor.memory=34g \
--conf spark.driver.memory=34g \
--conf spark.memory.fraction=0.8 \
--name sparkML \
local:///opt/spark/jars/spark_ml.jar
要监视应用程序在Kubernetes群集上是否无错误运行,可以使用Spark UI监视Spark作业的不同统计信息。通过提交以下命令运行Spark UI,并在浏览器(localhost:4040)上打开该UI:
kubectl port-forward spark-ml 4040:4040
预测结果
案例研究运行6分钟,并将预测结果写入多个CSV文件上的S3存储桶中。CSV文件有两列:第一列是实际房屋中值,第二列是预测值(您可以修改机器学习脚本以在CSV文件中包含其他相关功能)。
95800,132367.88804380846
113800,190347.262329421
113800,190347.262329421
67500,104197.72693265839
67500,104197.72693265839
65800,104197.72693265839
68600,104197.72693265839
71100,190347.262329421
53500,104197.72693265839
71300,158820.92941131844
59200,104197.72693265839
59200,104197.72693265839
案例研究2:K-均值聚类:Uber驱动程序位置
本节介绍如何在Amazon EKS上运行的Spark群集上运行K-means群集训练和推理。
K-均值聚类算法
K-means是无监督学习领域中最流行的聚类算法。假设k为预期的簇数,该算法随机初始化称为簇质心的k个簇中心,并迭代:
1.根据数据点到质心的欧几里德距离,将数据点指定给簇
2.将簇质心移动到每个簇中数据点的平均值
迭代将继续,直到收敛。Spark使用K-means算法的一种变体,可以在集群上并行化。
Uber驱动程序位置数据集
K-means案例研究使用Uber司机位置数据集,其中包括司机在不同日期和时间的纬度和经度。CSV格式的数据集大小为209.7MB。因此,数据被复制以在Amazon S3上创建大小为195.5GB的数据集(分解为多个CSV文件)。它将90%的数据用于培训,其余用于测试。
图、需求和依赖关系与前面的决策树示例相同。
Spark实施
以下是Spark上K-means集群的实现,其中K设置为16。它将结果存储回uber predictions文件夹下的S3 bucket中。
package spark_ml
import org.apache.spark.sql._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.log4j._
import org.apache.spark.ml.clustering.KMeans
object Uber {
def main(array: Array[String]) {
// set the log level
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.appName("uber")
.getOrCreate()
val uber = spark
.read
.option("header", "true")
.option("inferSchema", "true")
.csv("s3a://spark-ml-demo/uber-raw-data*.csv")
val Array(trainingData, testData) = uber.select("Lat", "Lon").randomSplit(Array(0.90, 0.1))
val assembler = new VectorAssembler()
.setInputCols(Array("Lat", "Lon"))
.setOutputCol("features")
.setHandleInvalid("skip")
val trainDf = assembler
.transform(trainingData)
val dt = new KMeans()
.setK(16)
.setFeaturesCol("features")
.setPredictionCol("prediction")
val model = dt.fit(trainDf)
model.clusterCenters.foreach(println)
val testDF = assembler
.transform(testData)
val predictions = model.transform(testDF)
.select("Lat", "Lon", "prediction")
predictions.write.csv("s3a://spark-ml-demo/uber-predictions")
spark.stop()
}
}
在本地测试程序
使用本地存储的Uber驱动程序位置数据集在您的计算机上运行代码。遵循与决策树回归相同的步骤。
打包代码和依赖项
遵循与Twitter分析案例研究中相同的准则打包代码和依赖项。
提交Spark
K-means案例研究在9个内存优化的r6g.4X大型实例上运行。因此,spark submit参数中的执行器数量会相应更改:
bin/spark-submit \
--class sparkml.Uber \
--master k8s://<KUBERNETES_MASTER_ADDRESS> \
--deploy-mode cluster \
--conf <DOCKER_IMAGE_ADDRESS> \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.driver.pod.name="spark-ml" \
--conf spark.kubernetes.namespace=default \
--conf spark.executor.instances=26 \
--conf spark.driver.cores=5 \
--conf spark.executor.cores=5 \
--conf spark.executor.memory=34g \
--conf spark.driver.memory=34g \
--conf spark.memory.fraction=0.8 \
--name sparkML \
local:///opt/spark/jars/spark_ml.jar
聚类结果
该案例研究在34分钟内完成,覆盖了Gravion2工人集群。当Spark作业成功终止时,结果应位于uber prediction文件夹中,并分解为多个CSV文件。CSV文件中有三列:前两列是纬度和经度,第三列是簇编号。您还可以通过在创建模型的行之后调用model.clusterCenters.foreach(println)来打印cluster centroids。
40.8971,-73.8652,4
40.8972,-73.8653,4
40.8972,-73.8653,4
40.8973,-74.4901,15
40.8973,-73.8635,4
40.8977,-73.947,4
40.8977,-73.9469,4
40.8978,-73.9139,4
40.8981,-73.9717,3
40.8982,-74.1262,0
40.8982,-73.8673,4
40.8985,-73.901,4
40.8985,-73.901,4
40.8985,-73.9009,4
结论
在这个博客中,我们演示了两个ML案例研究:决策树回归和K-均值聚类,使用大量数据集,并在EKS Kubernetes集群上运行Graviton2实例。Spark的MLlib帮助行业以分布式方式在TB数据上运行机器学习管道(转换、存储、学习和预测)。当数据大小变大并且ML算法需要更多资源时,工作人员的数量会增加。AWS的研究另外,较低的每小时Gravion2实例为用户提供了高达30%的成本降低。当数十个或数百个工作节点长时间运行以执行ML计算时,这将为用户带来巨大的好处。
有关采用基于Arm处理器的客户案例,请访问AWS Graviton页面。有关在Arm Neoverse平台上运行的软件工作负载的任何查询,请随时联系sw-ecosystem@arm.com.