Masoud Koleini 2021年11月16日
介绍
ApacheSpark是一个通用框架,用于在机器集群上进行分布式计算。数据可以存储在分布式数据库(如ApacheHadoop分布式文件系统)中,也可以通过流模块收集。然后,数据可以由运行在一组worker上的Spark执行器进行处理。Spark可以使用自己的集群管理器独立运行,也可以在ApacheMesos、Hadoop Thread或Kubernetes上运行。
AWS Graviton2处理器使用64位Arm Neoverse内核。与基于x86的实例相比,由Graviron2处理器提供支持的Amazon EC2实例提供了更好的性价比。AWS最近的一份报告显示,与Graviton2工人一起在EKS上运行Spark时,性能提高了15%,价格降低了30%。本博客解释了在Kubernetes集群上使用Spark Streaming和基于Graviton2的worker实例实现两个案例研究。共享技术细节和最佳实践,以帮助用户克服在实现类似应用程序时遇到的问题。
Kubernetes:一个容器编排系统
Kubernetes是一个流行的开源框架,用于部署、扩展和管理容器化应用程序。Kubernetes集群由以下部分组成:
控制平面:通过调度应用程序、保留其所需状态以及管理其扩展和更新来管理集群。这是通过接收提交的用户请求的API服务器来处理的。此外,集群节点通过与控制平面通信来同步其状态。
工作节点:由在每个工作节点上运行的名为Kubelet的代理管理的主机吊舱(应用程序)。Kubelet处理与API服务器的控制平面通信,并通过CRI(容器运行时接口)根据所需状态管理容器生命周期。
Kubernetes附带了一个名为kubectl的便捷工具,这是一个与API服务器通信的命令行接口实用程序。kubectl命令行参数允许用户直接描述他们想要部署的资源。此外,用户可以在yaml文件中表达所有需求,并通过kubectl运行该文件进行部署。
图1。使用Docker作为容器运行时的Kubernetes群集
Kubernetes是Spark支持的集群管理器之一。Spark在Kubernetes集群上运行作业的方式是首先在一个工作节点上创建一个驱动程序吊舱。然后,驱动程序pod根据请求的资源生成执行程序pod。作业完成后,所有executor pod终止,驱动程序pod移动到完成状态,允许用户访问日志以进行调试。
Spark二进制文件附带一个名为Spark submit的工具。它可以与多个集群管理器对话以运行作业。传递给spark submit的参数定义了群集管理器的类型、其API服务器的地址、执行器的数量、驱动程序和执行器资源、容器映像地址、应用程序JAR文件以及其他功能参数。在接下来的部分中,我们将使用spark submit在Kubernetes集群上部署spark应用程序。
亚马逊弹性Kubernetes服务
Amazon EKS是AWS提供的一项托管Kubernetes服务。本节介绍如何使用Terraform以编程方式创建基于Graviton2的EKS集群。我们建议读者在继续阅读本文档的其余部分之前对Terraform有所了解。
使用Terraform的EKS群集资源调配
HashiCorp提供了关于如何在AWS上提供EKS集群的文档。Terraform脚本在github存储库中可用。但是,它可能需要一些调整才能在具有不同配置的AWS帐户上运行。
当前工作组参数默认为启动x86实例。运行基于Gravion2的实例需要进行一些更改:
确定要运行集群的EKS支持的Kubernetes版本。您可以在eks-cluster.tf中设置版本。
确定Kubernetes版本的最佳Amazon Linux AMI
使用以下信息更新工作组参数(以下是使用内存优化实例发布博客时Kubernetes 1.21.2的示例)
name = "eks-nodes-aarch64"
ami_id = "ami-08899b9102b960c9c"
instance_type = "r6g.4xlarge"
Spark驱动服务帐户
Spark驱动程序应具有在工作实例上生成执行器的正确权限,同时在其中一个Kubernetes节点上作为pod运行。用户可以为Spark创建Kubernetes服务帐户和ClusterRoleBinding,可以在运行Spark作业时将其分配给驱动程序。使用Spark的文档创建帐户并使用kubectl进行绑定。
微调集群
要在保持资源优化的同时管理成本,微调集群至关重要。以下是一些调整和最佳实践,您可以遵循这些调整和最佳实践来充分利用Kubernetes群集:
标记工作组节点,并在具有特定标签的实例集上运行Spark驱动程序和执行器。此外,监视和度量服务器最好在单独的机器上运行,而不是与执行者共享资源。
动态生成工作节点,并在作业完成时销毁它们。这可能会增加工作流的复杂性,并需要正确的节点配置才能加入集群。但它不允许空闲实例在集群内运行,从而节省了大量资金。
在生产环境中运行之前,应根据具体情况计算每个Spark作业所需的资源。为作业分配太多的资源将导致更高的成本,而资源不足可能导致作业在运行数小时后失败。
经过良好调优的Spark作业应该在UI中显示很少的失败任务。请参阅Tuning Spark文档和AWS最佳实践,以了解有关如何优化群集的指导原则。我们遵循类似的原则来配置集群中的资源、每个执行器的VCPU以及执行器内存计算。
案例研究1:最流行的hashtag
本案例研究解释了如何在EKS集群中使用Spark Streaming部署Twitter分析作业。本节演示如何在Scala中编写作业,将其打包为JAR文件,创建包含应用程序的Spark docker映像,并在EKS集群中部署容器。
我们演示了部署两种常用的tweet分析场景:查找最流行的hashtag和tweet的情绪分析。
要求
Twitter开发者帐户:实现流媒体应用程序的第一步是创建一个Twitter开发者帐户。您必须首先使用开发者门户创建项目和应用程序。然后,创建API密钥、API密钥、访问令牌和访问令牌密钥,以验证应用程序并读取推文。
请注意,Twitter对应用程序应用了速率限制,以提供可靠的服务。因此,一个执行器就足以运行两个流式用例。
使用Spark流处理推文
Spark Streaming是Spark中的一个API,用于可靠地处理高通量数据流,如Kafka、AWS Kinesis、HDFS/S3、Flume和Twitter。它将输入流拆分为小批量,并通过Spark引擎运行它们,从而创建一个处理数据的批量流。数据流的高级抽象称为Dstream(离散化流),它在内部被映射为一系列RDD(弹性分布式数据集)。
Spark在Spark SQL之上附带了另一个流API,称为结构化流。它允许数据以数据集/数据帧(RDD之上的API)的形式呈现,并允许在流数据上优化Spark SQL引擎处理。但是,它不包括Twitter连接器,并且在通过窗口处理聚合数据时有一些限制。因此,为了简单起见,我们在本案例研究中使用Spark Streaming API。
下图显示了实时Twitter分析应用程序中的组件如何交互:
图2。实时Twitter分析应用程序中的组件
Spark Streaming API以窗口方式从Twitter API读取推特流。Spark引擎按窗口运行作业,并将结果发送到Kinesis流,该流由Kinesis流的订户使用。
AWS组件和权限
本案例研究需要以下AWS组件和权限:
1.将输出流写入Kinesis数据流的权限
2.执行器写入输出流的写入权限
3.用于检查点的S3 bucket
4.出于检查点目的对S3进行读/写的权限
如果使用HashiCorp Terraform脚本创建EKS集群,请采取以下步骤添加所需资源:
首先,将工作人员角色名称添加到eks模块:
workers_role_name = "eksWorkerRole"
然后,在同一文件夹中创建一个文件twitter.tf,添加以下内容,并运行terraform apply在AWS环境中创建所需的组件:
resource "aws_kinesis_stream" "spark-analysis-stream" {
name = "spark_analysis_stream"
shard_count = 1
tags = {
// required tags
}
}
resource "aws_iam_role_policy_attachment" "spark-kinesis-access" {
role = "eksWorkerRole"
policy_arn = "arn:aws:iam::aws:policy/AmazonKinesisFullAccess"
}
resource "aws_s3_bucket" " spark-streaming-checkpoints" {
bucket = "spark-streaming-checkpoints"
}
resource "aws_iam_role_policy_attachment" "s3-full-access" {
role = "eksWorkerRole"
policy_arn = "arn:aws:iam::aws:policy/AmazonS3FullAccess"
}
确保terraform在没有抛出任何错误的情况下完成请求。
Spark实施
要求:读者应熟悉Scala编程和sbt项目。使用IDE(如IntelliJ)或命令行创建sbt项目。
创建Scala sbt项目,并更新build.sbt以将以下依赖项添加到项目中(使用Scala版本2.12.12进行测试):
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.1.2",
"org.apache.spark" %% "spark-sql" % "3.1.2",
"org.apache.spark" %% "spark-streaming" % "3.1.2",
"org.apache.spark" %% "spark-streaming-kinesis-asl" % "3.1.2",
"org.apache.bahir" %% "spark-streaming-twitter" % "2.4.0",
"org.twitter4j" % "twitter4j-core" % "4.0.4",
"org.twitter4j" % "twitter4j-stream" % "4.0.4",
"edu.stanford.nlp" % "stanford-corenlp" % "4.2.1",
"edu.stanford.nlp" % "stanford-corenlp" % "4.2.1" classifier "models",
"software.amazon.awssdk" % "kinesis" % "2.17.31",
"org.apache.hadoop" % "hadoop-common" % "3.2.0",
"org.apache.hadoop" % "hadoop-aws" % "3.2.0",
)
使用下面的示例scala代码在bigdata包(src/Main/scala/bigdata)中创建Main.scala。代码在1800秒大小的滑动窗口上,每10秒移动一次,找到5个最流行的英文推文标签。它将结果作为JSON对象写入Kinesis流,供订阅者使用。
package bigdata
import java.nio.charset.StandardCharsets.UTF_8
// AWS imports
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
// Log imports
import org.apache.log4j._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
// Twitter API imports
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
// Spark imports
import org.apache.spark._
object Main extends App {
// Create output JSON to send to output
def createJSON(tuples: List[(String, Int)]): String = tuples match {
case Nil => "{}"
case (x, y) :: Nil => "\"" + x + "\": " + y.toString
case (x, y) :: tail => "\"" + x + "\": " + y.toString + ", " + createJSON(tail)
}
def createRecordEntry(record: String, partition_key: String, stream_name: String): PutRecordsRequest = {
val recordEntry = PutRecordsRequestEntry
.builder()
.data(SdkBytes.fromString(record, UTF_8))
.partitionKey(partition_key)
.build()
val putRecordsRequest = PutRecordsRequest
.builder()
.streamName(stream_name)
.records(recordEntry)
.build();
putRecordsRequest
}
// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
// Read Twitter credentials from the environment variables
System.setProperty("twitter4j.oauth.consumerKey", sys.env("TWITTER_CONSUMER_KEY"))
System.setProperty("twitter4j.oauth.consumerSecret", sys.env("TWITTER_CONSUMER_SECRET"))
System.setProperty("twitter4j.oauth.accessToken", sys.env("TWITTER_ACCESS_TOKEN"))
System.setProperty("twitter4j.oauth.accessTokenSecret", sys.env("TWITTER_TOKEN_SECRET"))
// Kinesis streams parameters
val kinesisStream = "spark_analysis_stream"
val partitionKey = "data"
val kinesisRegion = "us-east-1"
// Sliding window parameters
val slideIntervalSec = 10
val windowLengthSec = 1800
// Directory to output top hashtags
val outputDirectory = "./twitter"
val slideInterval = Seconds(slideIntervalSec)
val windowLength = Seconds(windowLengthSec)
// set AWS kinesis client
val kinesisClient = KinesisAsyncClient
.builder()
.region(Region.of(kinesisRegion))
.build()
// Setup the SparkConfig and StreamingContext
val conf = new SparkConf().setAppName("twitterAnalysis")
val ssc = new StreamingContext(conf, Seconds(1))
val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
val twitterStream = TwitterUtils.createStream(ssc, auth)
// Analyse English tweets only
val tweet = twitterStream.filter(_.getLang == "en").map(_.getText)
// Finding the topmost popular hashtags
val hashTagStream = tweet.flatMap(_.split(" ")).filter(_.startsWith("#")).map(w => (w, 1))
val windowedHashTagCount = hashTagStream.reduceByKeyAndWindow((x: Int, y: Int) => (x + y), (x: Int, y: Int) => (x - y), windowLength, slideInterval)
val orderedHashTags = windowedHashTagCount.transform(rdd => {
val list = rdd.sortBy(_._2, false).take(5)
kinesisClient.putRecords(createRecordEntry("{" + createJSON(list.toList) + "}", partitionKey, kinesisStream))
rdd
})
orderedHashTags.print()
ssc.checkpoint("s3://spark-streaming-checkpoints/checkpoints")
ssc.start()
ssc.awaitTermination()
}
在本地测试程序
您可以在本地运行该程序,以确认结果是否符合预期。但是,您必须对上面的Scala程序进行轻微修改(并在创建包之前将其还原)
1.将Spark配置中的master设置为本地运行
val conf = new SparkConf().setMaster("local[*]").setAppName("twitterAnalysis")
2.向Kinesis客户端添加AWS凭据(profileName是您要使用的AWS配置文件)
val kinesisClient = KinesisAsyncClient
.builder()
.region(Region.of(kinesisRegion))
.credentialsProvider(ProfileCredentialsProvider.create(profileName))
.build()
3.将S3上的检查点地址替换为本地目录,如/tmp
现在应该可以在本地机器上运行基准测试了。使用以下Python脚本从Kinesis流中读取最流行的hashtags:
import boto3
import json
from datetime import datetime
import calendar
import random
import time
my_stream_name = 'spark_analysis_stream'
kinesis_client = boto3.client('kinesis', region_name='us-east-1')
response = kinesis_client.describe_stream(StreamName=my_stream_name)
my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']
shard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name,
ShardId=my_shard_id,
ShardIteratorType='LATEST')
my_shard_iterator = shard_iterator['ShardIterator']
record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator)
record_num = 0
while 'NextShardIterator' in record_response:
record_response = kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'])
for record in record_response['Records']:
print(record_num, record['Data'].decode('utf-8'))
record_num = record_num + 1
假设上述脚本存储为kinesis_read.py,则运行此命令以获取5条最流行的推文:
AWS_PROFILE=<AWS_PROFILE_NAME> python ./kinesis_read.py
打包代码和依赖项
由于您要构建一个在Gravion2机器上运行的容器,因此需要在基于AWS Gravion2的实例上构建该容器。
首先,使用sbt assembly创建一个包含包及其依赖项的JAR文件,不包括特定于Spark的JAR文件(IntelliJ用户可以按照本文对其应用程序进行打包)。接下来,下载Spark 3.1.2并将应用程序JAR文件复制到'jars'目录中。
接下来,构建Spark容器映像并将其推送到AmazonElasticContainerRegistry,以便工作节点可以拉取它来创建Spark容器。
提交Spark
现在,是将作业提交到EKS集群的时候了。应用程序由一个执行器提交,并使用默认配置。这不是一个需要大量资源的计算,因为进入流的tweet数量是有限的。在运行spark submit之前,请更换spark submit命令中的以下参数:
1.KUBERNETES_主地址:配置文件中的KUBERNETES主地址(默认配置文件位置为~/.kube/config)
2.DOCKER_IMAGE_ADDRESS:推送到Amazon ECR的Spark DOCKER映像的地址
3.Twitter API keys
假设应用程序JAR文件名为bigdata-assembly-0.1.JAR,在替换所需参数后,在下载的Spark文件夹中运行以下命令。请注意,RDD复制至少需要两个执行器来保持容错性:
bin/spark-submit \
--class bigdata.Main \
--master k8s://<KUBERNETES_MASTER_ADDRESS> \
--deploy-mode cluster \
--conf spark.executor.instances=2 \
--conf <DOCKER_IMAGE_ADDRESS> \
--conf spark.kubernetes.driver.pod.name="twitter" \
--conf spark.kubernetes.namespace=default \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.driverEnv.TWITTER_CONSUMER_KEY=<TWITTER_CONSUMER_KEY> \
--conf spark.kubernetes.driverEnv.TWITTER_CONSUMER_SECRET=<TWITTER_CONSUMER_SECRET> \
--conf spark.kubernetes.driverEnv.TWITTER_ACCESS_TOKEN=<TWITTER_ACCESS_TOKEN> \
--conf spark.kubernetes.driverEnv.TWITTER_TOKEN_SECRET=<TWITTER_TOKEN_SECRET> \
--conf spark.executorEnv.TWITTER_CONSUMER_KEY=<TWITTER_CONSUMER_SECRET> \
--conf spark.executorEnv.TWITTER_CONSUMER_SECRET=<TWITTER_CONSUMER_SECRET> \
--conf spark.executorEnv.TWITTER_ACCESS_TOKEN=<TWITTER_ACCESS_TOKEN> \
--conf spark.executorEnv.TWITTER_TOKEN_SECRET=<TWITTER_TOKEN_SECRET> \
--name sparkTwitter \
local:///opt/spark/jars/bigdata-assembly-0.1.jar
它应该在没有错误中断的情况下开始运行。
运行以下命令以确保驱动程序和执行器吊舱正常工作:
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
spark-twitter 1/1 Running 0 6m1s
twitteranalysis-c435bf7c50edfbbe-exec-1 1/1 Running 0 5m54s
Python脚本的输出(在上一节中介绍)类似于以下内容:
$ AWS_PROFILE=default python ./kinesis_read.py
0 {"#AdBor": 3, "#TAMED_DASHED": 2, "#etsy": 2, "#KingdomHearts": 2, "#GOT7": 2}
1 {"#etsy": 3, "#MACLovesLisa": 3, "#Airforce_Result_Do": 3, "#sora": 3, "#AdBor": 3}
2 {"#Airforce_Result_Do": 4, "#sora": 4, "#etsy": 3, "#MACLovesLisa": 3, "#kingdom": 3}
3 {"#sora": 5, "#MACLovesLisa": 4, "#Airforce_Result_Do": 4, "#etsy": 3, "#kingdom": 3}
案例研究2:推特的情绪分析
另一个有趣的例子是使用已经训练过的模型对Spark Streaming connector接收到的推文进行情感分析。这一次,不是寻找最流行的hashtag,而是计算滑动窗口上不同情绪的数量并发送到Kinesis流。
该存储库使用StanfordNLP库实现了Twitter情绪分析。只需将此scala文件添加到包的目录下,并替换文件中的包名。核心依赖项已经添加到sbt文件中,因此它们应该已经被提取到依赖项目录中。
更改主Scala对象很简单。此代码段替换Main.scala中特定于哈希标记的计算代码:
// transform sentiments into string
import bigdata.SentimentAnalysisUtils._
def sentimentToString(sentiment: SENTIMENT_TYPE): String = {
sentiment match {
case NOT_UNDERSTOOD => "NOT_UNDERSTOOD"
case VERY_NEGATIVE => "VERY_NEGATIVE"
case NEGATIVE => "NEGATIVE"
case NEUTRAL => "NEUTRAL"
case POSITIVE => "POSITIVE"
case VERY_POSITIVE => "VERY_POSITIVE"
}
}
val sentiments = tweet.map(x => (SentimentAnalysisUtils.detectSentiment(x), 1))
val windowedSentimentCount = sentiments.reduceByKeyAndWindow((x: Int, y: Int) => (x + y), (x: Int, y: Int) => (x - y), windowLength, slideInterval)
val sentimentsCount = windowedSentimentCount.transform( rdd => {
val list = rdd.sortBy(_._2, false).take(5).map(x => (sentimentToString(x._1), x._2))
kinesisClient.putRecords(createRecordEntry("{" + createJSON(list.toList) + "}", partitionKey, kinesisStream))
rdd
})
sentimentsCount.print()
按照与前面案例研究相同的步骤构建JAR文件、创建Spark容器和提交作业。由于情感分析由于更高的计算要求而使用更多的资源,因此在本案例研究中使用了5个工作节点。以下是在群集上运行作业的spark submit命令:
bin/spark-submit \
--class bigdata.Main \
--master k8s://<KUBERNETES_MASTER_ADDRESS> \
--deploy-mode cluster \
--conf spark.executor.instances=14 \
--conf <DOCKER_IMAGE_ADDRESS> \
--conf spark.kubernetes.driver.pod.name="twitter" \
--conf spark.kubernetes.namespace=default \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.driverEnv.TWITTER_CONSUMER_KEY=<TWITTER_CONSUMER_KEY> \
--conf spark.kubernetes.driverEnv.TWITTER_CONSUMER_SECRET=<TWITTER_CONSUMER_SECRET> \
--conf spark.kubernetes.driverEnv.TWITTER_ACCESS_TOKEN=<TWITTER_ACCESS_TOKEN> \
--conf spark.kubernetes.driverEnv.TWITTER_TOKEN_SECRET=<TWITTER_TOKEN_SECRET> \
--conf spark.executorEnv.TWITTER_CONSUMER_KEY=<TWITTER_CONSUMER_SECRET> \
--conf spark.executorEnv.TWITTER_CONSUMER_SECRET=<TWITTER_CONSUMER_SECRET> \
--conf spark.executorEnv.TWITTER_ACCESS_TOKEN=<TWITTER_ACCESS_TOKEN> \
--conf spark.executorEnv.TWITTER_TOKEN_SECRET=<TWITTER_TOKEN_SECRET> \
--conf spark.executor.cores=5 \
--conf spark.driver.cores=5 \
--conf spark.driver.memory=34g \
--conf spark.executor.memory=34g \
--conf spark.memory.fraction=0.8 \
--name sparkTwitter \
local:///opt/spark/jars/bigdata-assembly-0.1.jar
运行同一个Python脚本,通过从Kinesis流读取数据,显示如下输出:
$ AWS_PROFILE=default python ./kinesis_read.py
0 {"NEUTRAL": 552, "NEGATIVE": 179, "POSITIVE": 70, "NOT_UNDERSTOOD": 1}
1 {"NEUTRAL": 1563, "NEGATIVE": 537, "POSITIVE": 200, "VERY_POSITIVE": 2, "NOT_UNDERSTOOD": 2}
2 {"NEUTRAL": 3982, "NEGATIVE": 1356, "POSITIVE": 536, "NOT_UNDERSTOOD": 4, "VERY_POSITIVE": 3}
在前面的代码中,我们只使用数字特征进行训练和推理。读者可以在输入列中添加分类功能,如ocean_邻近性,并验证结果。
结论
在这个博客中,我们回顾了两个实时Twitter分析案例研究,使用EKS上的Spark Streaming和基于Graviron2的实例:查找最流行的hashtag和对推特的情绪分析。与基于x86的实例相比,基于AWS Gravion2的实例提供更低的每小时价格,同时提供有竞争力的性能。通常,Spark作业可能需要在集群中的大量工作节点上运行。因此,在存在大量数据集和需要大量资源的计算的情况下,降低每个实例的成本可以显著降低运行Spark作业的成本。
有关采用基于Arm处理器的客户案例,请访问AWS Graviton页面。有关在Arm Neoverse平台上运行的软件工作负载的任何查询,请随时联系sw-ecosystem@arm.com.