来源:腾讯技术工程微信号
作者:calvinrzluo丨腾讯 IEG 后台开发工程师本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正。为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容。
Spark Job 执行流程分析
Job 阶段
下面我们通过一个 RDD 上的 Action 操作 count,查看 Spark 的 Job 是如何运行和调度的。特别注意的是,在 SparkSQL 中,Action 操作有不同的执行流程,所以宜对比着看。count
通过全局的SparkContext.runJob
启动一个 Job,这个函数转而调用DAGScheduler.runJob
。Utils.getIteratorSize
实际上就是遍历一遍迭代器,以便统计 count。
// RDD.scaladef count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum// Utils.scaladef getIteratorSize(iterator: Iterator[_]): Long = { var count = 0L while (iterator.hasNext) { count += 1L iterator.next() } count}
在参数列表里面的下划线_
的作用是将方法转为函数,Scala 中方法和函数之间有一些区别,在此不详细讨论。
下面查看runJob
函数。比较有趣的是clean
函数,它调用ClosureCleaner.clean
方法,这个方法用来清理$outer
域中未被引用的变量。因为我们要将闭包func
序列化,并从 Driver 发送到 Executor 上面。序列化闭包的过程就是为每一个闭包生成一个可序列化类,在生成时,会将这个闭包所引用的外部对象也序列化。容易发现,如果我们为了使用外部对象的某些字段,而序列化整个对象,那么开销是很大的,因此通过clean
来清除不需要的部分以减少序列化开销。
此外,getCallSite
用来生成诸如s"$lastSparkMethod at $firstUserFile:$firstUserLine"
这样的字符串,它实际上会回溯调用栈,找到第一个不是在 Spark 包中的函数,即$lastSparkMethod
,它是导致一个 RDD 创建的函数,比如各种 Transform 操作、sc.parallelize
等。
// SparkContext.scaladef runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = { if (stopped.get()) { throw new IllegalStateException("SparkContext has been shutdown") } val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) if (conf.getBoolean("spark.logLineage", false)) { logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) } dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) // CheckPoint机制 rdd.doCheckpoint()}private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true "spark] def clean[F <: AnyRef"): F = { ClosureCleaner.clean(f, checkSerializable) f}
我们发现,传入的 func 只接受一个Iterator[_]
参数,但是其形参声明却是接受TaskContext
和Iterator[T]
两个参数。这是为什么呢?这是因为runJob
有不少重载函数,例如下面的这个
def runJob[T, U: ClassTag]( rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]): Array[U] = { val cleanedFunc = clean(func) runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)}
下面我们查看DAGScheduler.runJob
函数,它实际上就是调用submitJob
,然后等待 Job 执行的结果。由于 Spark 的DAGScheduler
是基于事件循环的,它拥有一个DAGSchedulerEventProcessLoop
类型的变量eventProcessLoop
,不同的对象向它post
事件,然后在它的onReceive
循环中会依次对这些事件调用处理函数。
我们需要注意的是partitions
不同于我们传入的rdd.partitions
,前者是一个Array[Int]
,后者是一个Array[Partition]
。并且在逻辑意义上,前者表示需要计算的 partition,对于如 first 之类的 Action 操作来说,它只是 rdd 的所有 partition 的一个子集,我们将在稍后的submitMissingTasks
函数中继续看到这一点。
def runJob[T, U](... "T, U"): Unit = { val start = System.nanoTime val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) // 下面就是在等了 ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf) waiter.completionFuture.value.get match { case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) case scala.util.Failure(exception) => logInfo("Job %d failed: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. val callerStackTrace = Thread.currentThread().getStackTrace.tail exception.setStackTrace(exception.getStackTrace ++ callerStackTrace) throw exception }}def submitJob[T, U]( rdd: RDD[T], // target RDD to run tasks on,就是被执行count的RDD func: (TaskContext, Iterator[T]) => U, // 在RDD每一个partition上需要跑的函数 partitions: Seq[Int], callSite: CallSite, // 被调用的位置 resultHandler: (Int, U) => Unit, properties: Properties): JobWaiter[U] = { // 检查是否在一个不存在的分区上创建一个Task val maxPartitions = rdd.partitions.length partitions.find(p => p >= maxPartitions || p < 0).foreach { p => throw new IllegalArgumentException( "Attempting to access a non-existent partition: " + p + ". " + "Total number of partitions: " + maxPartitions)} // jobId是从后往前递增的 val jobId = nextJobId.getAndIncrement() if (partitions.isEmpty) { val time = clock.getTimeMillis() // listenerBus是一个LiveListenerBus对象,从DAGScheduler构造时得到,用来做event log // SparkListenerJobStart定义在SparkListener.scala文件中 listenerBus.post(SparkListenerJobStart(jobId, time, Seq[StageInfo]( "StageInfo"), SerializationUtils.clone(properties))) listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded)) // 如果partitions是空的,那么就直接返回 return new JobWaiter[U](this, jobId, 0, resultHandler "U") } assert(partitions.nonEmpty) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler "U") // 我们向eventProcessLoop提交一个JobSubmitted事件 eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) waiter}// DAGSchedulerEvent.scalaprivate[scheduler] case class JobSubmitted( jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties = null) extends DAGSchedulerEvent
下面我们具体看看对JobSubmitted
的响应
// DAGScheduler.scalaprivate[scheduler] def handleJobSubmitted(...) { var finalStage: ResultStage = null // 首先我们尝试创建一个`finalStage: ResultStage`,这是整个Job的最后一个Stage。 try { // func: (TaskContext, Iterator[_]) => _ // 下面的语句是可能抛BarrierJobSlotsNumberCheckFailed或者其他异常的, // 例如一个HadoopRDD所依赖的HDFS文件被删除了 finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { ...// DAGScheduler.scalaprivate def createResultStage(...): ResultStage = { checkBarrierStageWithDynamicAllocation(rdd) checkBarrierStageWithNumSlots(rdd) checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size) val parents = getOrCreateParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stage}
这里createResultStage
所返回的ResultStage
继承了Stage
类。Stage
类有个rdd
参数,对ResultStage
而言就是finalRDD
,对ShuffleMapStage
而言就是ShuffleDependency.rdd
// DAGScheduler.scaladef createShuffleMapStage[K, V, C]( shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = { val rdd = shuffleDep.rdd ...
下面我们来看看checkBarrierStageWithNumSlots
这个函数,因为它会抛出BarrierJobSlotsNumberCheckFailed
这个异常,被handleJobSubmitted
捕获。这个函数主要是为了检测是否有足够的 slots 去运行所有的 barrier task。屏障调度器是 Spark 为了支持深度学习在 2.4.0 版本所引入的一个特性。它要求在 barrier stage 中同时启动所有的 Task,当任意的 task 执行失败的时候,总是重启整个 barrier stage。这么麻烦是因为 Spark 希望能够在 Task 中提供一个 barrier 以供显式同步。
// DAGScheduler.scalaprivate def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = { val numPartitions = rdd.getNumPartitions val maxNumConcurrentTasks = sc.maxNumConcurrentTasks if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) { throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks) }}// DAGScheduler.scala ... case e: BarrierJobSlotsNumberCheckFailed => // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically. // barrierJobIdToNumTasksCheckFailures是一个ConcurrentHashMap,表示对每个BarrierJob上失败的Task数量 val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId, (_: Int, value: Int) => value + 1) ... if (numCheckFailures <= maxFailureNumTasksCheck) { messageScheduler.schedule( new Runnable { override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func, partitions, callSite, listener, properties)) }, timeIntervalNumTasksCheck, TimeUnit.SECONDS ) return } else { // Job failed, clear internal data. barrierJobIdToNumTasksCheckFailures.remove(jobId) listener.jobFailed(e) return } case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return } // Job submitted, clear internal data. barrierJobIdToNumTasksCheckFailures.remove(jobId) ...
下面开始创建 Job。ActiveJob
表示在DAGScheduler
里面运行的一个 Job。
Job 只负责向“叶子”Stage 要结果,而之前 Stage 的运行是由DAGScheduler
来调度的。这是因为若干 Job 可能共用同一个 Stage 的计算结果,我这样说的根据是在 Stage 类的定义中的jobIds
字段是一个HashSet
,也就是说它可以属于多个 Job。所以将某个 Stage 强行归属到某个 Job 是不符合 Spark 设计逻辑的。
// DAGScheduler.scala ... val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() // 在这里会打印四条日志,这个可以被用来在Spark.log里面定位事件 logInfo("Got job %s (%s) with %d output partitions".format( job.jobId, callSite.shortForm, partitions.length)) logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) ... val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) // 从最后一个stage开始调用submitStage submitStage(finalStage)}
Stage 阶段
Stage 是如何划分的呢?又是如何计算 Stage 之间的依赖的?我们继续查看submitStage
这个函数,对于一个 Stage,首先调用getMissingParentStages
看看它的父 Stage 能不能直接用,也就是说这个 Stage 的 rdd 所依赖的所有父 RDD 能不能直接用,如果不行的话,就要先算父 Stage 的。在前面的论述里,我们知道,若干 Job 可能共用同一个 Stage 的计算结果,而不同的 Stage 也可能依赖同一个 RDD。
private def submitStage(stage: Stage) { // 找到这个stage所属的job val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { // 如果依赖之前的Stage,先列出来,并且按照id排序 val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing.isEmpty) { // 运行这个Stage logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") submitMissingTasks(stage, jobId.get) } else { // 先提交所有的parent stage for (parent <- missing) { submitStage(parent) } waitingStages += stage } } } else { abortStage(stage, "No active job for stage " + stage.id, None) }}
下面具体查看getMissingParentStages
这个函数,可以看到,Stage 的计算链是以最后一个 RDD 为树根逆着向上遍历得到的,而这个链条的终点要么是一个ShuffleDependency
,要么是一个所有分区都被缓存了的 RDD。
private def getMissingParentStages(stage: Stage): List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] val waitingForVisit = new ListBuffer[RDD[_]] // 这里是个**DFS**,栈是手动维护的,主要是为了防止爆栈 waitingForVisit += stage.rdd def visit(rdd: RDD[_]): Unit = { if (!visited(rdd)) { visited += rdd val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) if (rddHasUncachedPartitions) { // 如果这个RDD有没有被缓存的Partition,那么它就需要被计算 for (dep <- rdd.dependencies) { // 我们检查这个RDD的所有依赖 dep match { case shufDep: ShuffleDependency[_, _, _] => // 我们发现一个宽依赖,因此我们创建一个新的Shuffle Stage,并加入到missing中(如果不存在) // 由于是宽依赖,所以我们不需要向上找了 val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId) if (!mapStage.isAvailable) { missing += mapStage } case narrowDep: NarrowDependency[_] => // 如果是一个窄依赖,就加入到waitingForVisit中 // prepend是在头部加,+=是在尾部加 waitingForVisit.prepend(narrowDep.rdd) } } } } } while (waitingForVisit.nonEmpty) { visit(waitingForVisit.remove(0)) } missing.toList}
Task 阶段
下面是重头戏submitMissingTasks
,这个方法负责生成 TaskSet,并且将它提交给 TaskScheduler 低层调度器。
partitionsToCompute
计算有哪些分区是待计算的。根据 Stage 类型的不同,findMissingPartitions
的计算方法也不同。
// DAGScheduler.scalaprivate def submitMissingTasks(stage: Stage, jobId: Int) { logDebug("submitMissingTasks(" + stage + ")") // First figure out the indexes of partition ids to compute. val partitionsToCompute: Seq[Int] = stage.findMissingPartitions() ...// ResultStage.scalaoverride def findMissingPartitions(): Seq[Int] = { val job = activeJob.get (0 until job.numPartitions).filter(id => !job.finished(id))}// ActiveJob.scalaval numPartitions = finalStage match { // 对于ResultStage,不一定得到当前rdd的所有分区,例如first()和lookup()的Action, // 因此这里是r.partitions而不是r.rdd.partitions case r: ResultStage => r.partitions.length case m: ShuffleMapStage => m.rdd.partitions.length}// ShuffleMapStage.scalaoverride def findMissingPartitions(): Seq[Int] = { mapOutputTrackerMaster .findMissingPartitions(shuffleDep.shuffleId) .getOrElse(0 until numPartitions)}// MapOutputTrackerMaster.scaladef findMissingPartitions(shuffleId: Int): Option[Seq[Int]] = { shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())}
这个outputCommitCoordinator
是由SparkEnv
维护的OutputCommitCoordinator
对象,它决定到底谁有权利向输出写数据。在 Executor 上的请求会通过他持有的 Driver 的OutputCommitCoordinatorEndpoint
的引用发送给 Driver 处理
// DAGScheduler.scala ... // Use the scheduling pool, job group, description, etc. from an ActiveJob associated // with this Stage val properties = jobIdToActiveJob(jobId).properties runningStages += stage // 在检测Tasks是否serializable之前,就要SparkListenerStageSubmitted, // 如果不能serializable,那就在这**之后**给一个SparkListenerStageCompleted stage match { case s: ShuffleMapStage => outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1) case s: ResultStage => outputCommitCoordinator.stageStart( stage = s.id, maxPartitionId = s.rdd.partitions.length - 1) } ...
用getPreferredLocs
计算每个分区的最佳计算位置,它实际上是调用getPreferredLocsInternal
这个函数。这个函数是一个关于visit: HashSet[(RDD[_], Int)]
的递归函数,visit 用(rdd, partition)
元组唯一描述一个分区。getPreferredLocs
的计算逻辑是这样的:
- 如果已经 visit 过了,就返回 Nil
- 如果是被 cached 的,通过
getCacheLocs
返回 cache 的位置 - 如果 RDD 有自己的偏好位置,例如输入 RDD,那么使用
rdd.preferredLocations
返回它的偏好位置 - 如果还没返回,但 RDD 有窄依赖,那么遍历它的所有依赖项,返回第一个具有位置偏好的依赖项的值
理论上,一个最优的位置选取应该尽可能靠近数据源以减少网络传输,但目前版本的 Spark 还没有实现
// DAGScheduler.scala ... val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try { stage match { case s: ShuffleMapStage => partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap case s: ResultStage => partitionsToCompute.map { id => val p = s.partitions(id) (id, getPreferredLocs(stage.rdd, p)) }.toMap } } catch { case NonFatal(e) => // 如果有非致命异常就创建一个新的Attempt,并且abortStage(这还不致命么) stage.makeNewStageAttempt(partitionsToCompute.size) listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e)) runningStages -= stage return } ...
下面,我们开始 attempt 这个 Stage,我们需要将 RDD 对象和依赖通过closureSerializer
序列化成taskBinaryBytes
,然后广播得到taskBinary
。当广播变量过大时,会产生一条Broadcasting large task binary with size
的 INFO。
// DAGScheduler.scala ... stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq) // 如果没有Task要执行,实际上就是skip了,那么就没有Submission Time这个字段 if (partitionsToCompute.nonEmpty) { stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) // TODO: 也许可以将`taskBinary`放到Stage里面以避免对它序列化多次。 // 一堆注释看不懂 var taskBinary: Broadcast[Array[Byte]] = null var partitions: Array[Partition] = null try { var taskBinaryBytes: Array[Byte] = null // taskBinaryBytes and partitions are both effected by the checkpoint status. We need // this synchronization in case another concurrent job is checkpointing this RDD, so we get a // consistent view of both variables. RDDCheckpointData.synchronized { taskBinaryBytes = stage match { case stage: ShuffleMapStage => JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) case stage: ResultStage => // 注意这里的stage.func已经被ClosureCleaner清理过了 JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) } partitions = stage.rdd.partitions } ... // 广播 taskBinary = sc.broadcast(taskBinaryBytes) } catch { // In the case of a failure during serialization, abort the stage. case e: NotSerializableException => abortStage(stage, "Task not serializable: " + e.toString, Some(e)) runningStages -= stage ... }
下面,我们根据 Stage 的类型生成 Task。
// DAGScheduler.scala ... val tasks: Seq[Task[_]] = try { val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array() stage match { case stage: ShuffleMapStage => stage.pendingPartitions.clear() partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = partitions(id) stage.pendingPartitions += id new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } case stage: ResultStage => partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } } } catch { ... }
我们将生成的tasks
包装成一个TaskSet
,并且提交给taskScheduler
。
// DAGScheduler.scala ... if (tasks.nonEmpty) { logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " + s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})") taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties)) } else {
如果 tasks 是空的,说明任务就已经完成了,打上 DEBUG 日志,并且调用submitWaitingChildStages
// Because we posted SparkListenerStageSubmitted earlier, we should mark // the stage as completed here in case there are no tasks to run markStageAsFinished(stage, None) stage match { case stage: ShuffleMapStage => logDebug(s"Stage ${stage} is actually done; " + s"(available: ${stage.isAvailable}," + s"available outputs: ${stage.numAvailableOutputs}," + s"partitions: ${stage.numPartitions})") markMapStageJobsAsFinished(stage) case stage : ResultStage => logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})") } submitWaitingChildStages(stage) }}
推荐阅读:
更多腾讯AI相关技术干货,请关注专栏腾讯技术工程