腾讯技术工程 · 2020年04月20日

Spark源码和调优简介 Spark Core(中)

来源:腾讯技术工程微信号
作者:calvinrzluo丨腾讯 IEG 后台开发工程师

本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正。为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容。

Spark Job 执行流程分析

Job 阶段

下面我们通过一个 RDD 上的 Action 操作 count,查看 Spark 的 Job 是如何运行和调度的。特别注意的是,在 SparkSQL 中,Action 操作有不同的执行流程,所以宜对比着看。count通过全局的SparkContext.runJob启动一个 Job,这个函数转而调用DAGScheduler.runJobUtils.getIteratorSize实际上就是遍历一遍迭代器,以便统计 count。

// RDD.scaladef count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum// Utils.scaladef getIteratorSize(iterator: Iterator[_]): Long = {  var count = 0L  while (iterator.hasNext) {    count += 1L    iterator.next()  }  count}

在参数列表里面的下划线_的作用是将方法转为函数,Scala 中方法和函数之间有一些区别,在此不详细讨论。

下面查看runJob函数。比较有趣的是clean函数,它调用ClosureCleaner.clean方法,这个方法用来清理$outer域中未被引用的变量。因为我们要将闭包func序列化,并从 Driver 发送到 Executor 上面。序列化闭包的过程就是为每一个闭包生成一个可序列化类,在生成时,会将这个闭包所引用的外部对象也序列化。容易发现,如果我们为了使用外部对象的某些字段,而序列化整个对象,那么开销是很大的,因此通过clean来清除不需要的部分以减少序列化开销。

此外,getCallSite用来生成诸如s"$lastSparkMethod at $firstUserFile:$firstUserLine"这样的字符串,它实际上会回溯调用栈,找到第一个不是在 Spark 包中的函数,即$lastSparkMethod,它是导致一个 RDD 创建的函数,比如各种 Transform 操作、sc.parallelize等。

// SparkContext.scaladef runJob[T, U: ClassTag](    rdd: RDD[T],    func: (TaskContext, Iterator[T]) => U,    partitions: Seq[Int],    resultHandler: (Int, U) => Unit): Unit = {  if (stopped.get()) {    throw new IllegalStateException("SparkContext has been shutdown")  }  val callSite = getCallSite  val cleanedFunc = clean(func)  logInfo("Starting job: " + callSite.shortForm)  if (conf.getBoolean("spark.logLineage", false)) {    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)  }  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)  progressBar.foreach(_.finishAll())  // CheckPoint机制  rdd.doCheckpoint()}private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true "spark] def clean[F <: AnyRef"): F = {  ClosureCleaner.clean(f, checkSerializable)  f}

我们发现,传入的 func 只接受一个Iterator[_]参数,但是其形参声明却是接受TaskContextIterator[T]两个参数。这是为什么呢?这是因为runJob有不少重载函数,例如下面的这个

def runJob[T, U: ClassTag](    rdd: RDD[T],    func: Iterator[T] => U,    partitions: Seq[Int]): Array[U] = {  val cleanedFunc = clean(func)  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)}

下面我们查看DAGScheduler.runJob函数,它实际上就是调用submitJob,然后等待 Job 执行的结果。由于 Spark 的DAGScheduler是基于事件循环的,它拥有一个DAGSchedulerEventProcessLoop类型的变量eventProcessLoop,不同的对象向它post事件,然后在它的onReceive循环中会依次对这些事件调用处理函数。

我们需要注意的是partitions不同于我们传入的rdd.partitions,前者是一个Array[Int],后者是一个Array[Partition]。并且在逻辑意义上,前者表示需要计算的 partition,对于如 first 之类的 Action 操作来说,它只是 rdd 的所有 partition 的一个子集,我们将在稍后的submitMissingTasks函数中继续看到这一点。

def runJob[T, U](... "T, U"): Unit = {  val start = System.nanoTime  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)  // 下面就是在等了  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)  waiter.completionFuture.value.get match {    case scala.util.Success(_) =>      logInfo("Job %d finished: %s, took %f s".format        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))    case scala.util.Failure(exception) =>      logInfo("Job %d failed: %s, took %f s".format        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.      val callerStackTrace = Thread.currentThread().getStackTrace.tail      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)      throw exception  }}def submitJob[T, U](    rdd: RDD[T], // target RDD to run tasks on,就是被执行count的RDD    func: (TaskContext, Iterator[T]) => U, // 在RDD每一个partition上需要跑的函数    partitions: Seq[Int],    callSite: CallSite, // 被调用的位置    resultHandler: (Int, U) => Unit,    properties: Properties): JobWaiter[U] = {  // 检查是否在一个不存在的分区上创建一个Task  val maxPartitions = rdd.partitions.length  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>    throw new IllegalArgumentException( "Attempting to access a non-existent partition: " + p + ". " + "Total number of partitions: " + maxPartitions)}  // jobId是从后往前递增的  val jobId = nextJobId.getAndIncrement()  if (partitions.isEmpty) {    val time = clock.getTimeMillis()    // listenerBus是一个LiveListenerBus对象,从DAGScheduler构造时得到,用来做event log    // SparkListenerJobStart定义在SparkListener.scala文件中    listenerBus.post(SparkListenerJobStart(jobId, time, Seq[StageInfo]( "StageInfo"), SerializationUtils.clone(properties)))    listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded))    // 如果partitions是空的,那么就直接返回    return new JobWaiter[U](this, jobId, 0, resultHandler "U")  }  assert(partitions.nonEmpty)  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]  val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler "U")  // 我们向eventProcessLoop提交一个JobSubmitted事件  eventProcessLoop.post(JobSubmitted(    jobId, rdd, func2, partitions.toArray, callSite, waiter,    SerializationUtils.clone(properties)))  waiter}// DAGSchedulerEvent.scalaprivate[scheduler] case class JobSubmitted(    jobId: Int,    finalRDD: RDD[_],    func: (TaskContext, Iterator[_]) => _,    partitions: Array[Int],    callSite: CallSite,    listener: JobListener,    properties: Properties = null)  extends DAGSchedulerEvent

下面我们具体看看对JobSubmitted的响应

// DAGScheduler.scalaprivate[scheduler] def handleJobSubmitted(...) {  var finalStage: ResultStage = null  // 首先我们尝试创建一个`finalStage: ResultStage`,这是整个Job的最后一个Stage。  try {    // func: (TaskContext, Iterator[_]) => _    // 下面的语句是可能抛BarrierJobSlotsNumberCheckFailed或者其他异常的,    // 例如一个HadoopRDD所依赖的HDFS文件被删除了    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)  } catch {  ...// DAGScheduler.scalaprivate def createResultStage(...): ResultStage = {  checkBarrierStageWithDynamicAllocation(rdd)  checkBarrierStageWithNumSlots(rdd)  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)  val parents = getOrCreateParentStages(rdd, jobId)  val id = nextStageId.getAndIncrement()  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)  stageIdToStage(id) = stage  updateJobIdStageIdMaps(jobId, stage)  stage}

这里createResultStage所返回的ResultStage继承了Stage类。Stage类有个rdd参数,对ResultStage而言就是finalRDD,对ShuffleMapStage而言就是ShuffleDependency.rdd

// DAGScheduler.scaladef createShuffleMapStage[K, V, C](    shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {  val rdd = shuffleDep.rdd  ...

下面我们来看看checkBarrierStageWithNumSlots这个函数,因为它会抛出BarrierJobSlotsNumberCheckFailed这个异常,被handleJobSubmitted捕获。这个函数主要是为了检测是否有足够的 slots 去运行所有的 barrier task。屏障调度器是 Spark 为了支持深度学习在 2.4.0 版本所引入的一个特性。它要求在 barrier stage 中同时启动所有的 Task,当任意的 task 执行失败的时候,总是重启整个 barrier stage。这么麻烦是因为 Spark 希望能够在 Task 中提供一个 barrier 以供显式同步。

// DAGScheduler.scalaprivate def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {  val numPartitions = rdd.getNumPartitions  val maxNumConcurrentTasks = sc.maxNumConcurrentTasks  if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) {    throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks)  }}// DAGScheduler.scala  ...    case e: BarrierJobSlotsNumberCheckFailed =>      // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.      // barrierJobIdToNumTasksCheckFailures是一个ConcurrentHashMap,表示对每个BarrierJob上失败的Task数量      val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,        (_: Int, value: Int) => value + 1)      ...      if (numCheckFailures <= maxFailureNumTasksCheck) {        messageScheduler.schedule(          new Runnable {            override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,              partitions, callSite, listener, properties))          },          timeIntervalNumTasksCheck,          TimeUnit.SECONDS        )        return      } else {        // Job failed, clear internal data.        barrierJobIdToNumTasksCheckFailures.remove(jobId)        listener.jobFailed(e)        return      }    case e: Exception =>      logWarning("Creating new stage failed due to exception - job: " + jobId, e)      listener.jobFailed(e)      return  }  // Job submitted, clear internal data.  barrierJobIdToNumTasksCheckFailures.remove(jobId)  ...

下面开始创建 Job。ActiveJob表示在DAGScheduler里面运行的一个 Job。

Job 只负责向“叶子”Stage 要结果,而之前 Stage 的运行是由DAGScheduler来调度的。这是因为若干 Job 可能共用同一个 Stage 的计算结果,我这样说的根据是在 Stage 类的定义中的jobIds字段是一个HashSet,也就是说它可以属于多个 Job。所以将某个 Stage 强行归属到某个 Job 是不符合 Spark 设计逻辑的。

// DAGScheduler.scala  ...  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)  clearCacheLocs()  // 在这里会打印四条日志,这个可以被用来在Spark.log里面定位事件  logInfo("Got job %s (%s) with %d output partitions".format(    job.jobId, callSite.shortForm, partitions.length))  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")  logInfo("Parents of final stage: " + finalStage.parents)  logInfo("Missing parents: " + getMissingParentStages(finalStage))  ...  val stageIds = jobIdToStageIds(jobId).toArray  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))  listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))  // 从最后一个stage开始调用submitStage  submitStage(finalStage)}

Stage 阶段

Stage 是如何划分的呢?又是如何计算 Stage 之间的依赖的?我们继续查看submitStage这个函数,对于一个 Stage,首先调用getMissingParentStages看看它的父 Stage 能不能直接用,也就是说这个 Stage 的 rdd 所依赖的所有父 RDD 能不能直接用,如果不行的话,就要先算父 Stage 的。在前面的论述里,我们知道,若干 Job 可能共用同一个 Stage 的计算结果,而不同的 Stage 也可能依赖同一个 RDD。

private def submitStage(stage: Stage) {    // 找到这个stage所属的job  val jobId = activeJobForStage(stage)  if (jobId.isDefined) {    logDebug("submitStage(" + stage + ")")    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {      // 如果依赖之前的Stage,先列出来,并且按照id排序      val missing = getMissingParentStages(stage).sortBy(_.id)      logDebug("missing: " + missing)      if (missing.isEmpty) {          // 运行这个Stage        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")        submitMissingTasks(stage, jobId.get)      } else {          // 先提交所有的parent stage        for (parent <- missing) {          submitStage(parent)        }        waitingStages += stage      }    }  } else {    abortStage(stage, "No active job for stage " + stage.id, None)  }}

下面具体查看getMissingParentStages这个函数,可以看到,Stage 的计算链是以最后一个 RDD 为树根逆着向上遍历得到的,而这个链条的终点要么是一个ShuffleDependency,要么是一个所有分区都被缓存了的 RDD。

private def getMissingParentStages(stage: Stage): List[Stage] = {  val missing = new HashSet[Stage]  val visited = new HashSet[RDD[_]]  val waitingForVisit = new ListBuffer[RDD[_]]  // 这里是个**DFS**,栈是手动维护的,主要是为了防止爆栈  waitingForVisit += stage.rdd  def visit(rdd: RDD[_]): Unit = {    if (!visited(rdd)) {      visited += rdd      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)      if (rddHasUncachedPartitions) {        // 如果这个RDD有没有被缓存的Partition,那么它就需要被计算        for (dep <- rdd.dependencies) {          // 我们检查这个RDD的所有依赖          dep match {            case shufDep: ShuffleDependency[_, _, _] =>              // 我们发现一个宽依赖,因此我们创建一个新的Shuffle Stage,并加入到missing中(如果不存在)              // 由于是宽依赖,所以我们不需要向上找了              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)              if (!mapStage.isAvailable) {                missing += mapStage              }            case narrowDep: NarrowDependency[_] =>              // 如果是一个窄依赖,就加入到waitingForVisit中              // prepend是在头部加,+=是在尾部加              waitingForVisit.prepend(narrowDep.rdd)          }        }      }    }  }  while (waitingForVisit.nonEmpty) {    visit(waitingForVisit.remove(0))  }  missing.toList}

Task 阶段

下面是重头戏submitMissingTasks,这个方法负责生成 TaskSet,并且将它提交给 TaskScheduler 低层调度器。

partitionsToCompute计算有哪些分区是待计算的。根据 Stage 类型的不同,findMissingPartitions的计算方法也不同。

// DAGScheduler.scalaprivate def submitMissingTasks(stage: Stage, jobId: Int) {  logDebug("submitMissingTasks(" + stage + ")")  // First figure out the indexes of partition ids to compute.  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()  ...// ResultStage.scalaoverride def findMissingPartitions(): Seq[Int] = {  val job = activeJob.get  (0 until job.numPartitions).filter(id => !job.finished(id))}// ActiveJob.scalaval numPartitions = finalStage match {  // 对于ResultStage,不一定得到当前rdd的所有分区,例如first()和lookup()的Action,  // 因此这里是r.partitions而不是r.rdd.partitions  case r: ResultStage => r.partitions.length  case m: ShuffleMapStage => m.rdd.partitions.length}// ShuffleMapStage.scalaoverride def findMissingPartitions(): Seq[Int] = {  mapOutputTrackerMaster    .findMissingPartitions(shuffleDep.shuffleId)    .getOrElse(0 until numPartitions)}// MapOutputTrackerMaster.scaladef findMissingPartitions(shuffleId: Int): Option[Seq[Int]] = {  shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())}

这个outputCommitCoordinator是由SparkEnv维护的OutputCommitCoordinator对象,它决定到底谁有权利向输出写数据。在 Executor 上的请求会通过他持有的 Driver 的OutputCommitCoordinatorEndpoint的引用发送给 Driver 处理

// DAGScheduler.scala  ...  // Use the scheduling pool, job group, description, etc. from an ActiveJob associated  // with this Stage  val properties = jobIdToActiveJob(jobId).properties  runningStages += stage  // 在检测Tasks是否serializable之前,就要SparkListenerStageSubmitted,  // 如果不能serializable,那就在这**之后**给一个SparkListenerStageCompleted  stage match {    case s: ShuffleMapStage =>      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)    case s: ResultStage =>      outputCommitCoordinator.stageStart(        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)  }  ...

getPreferredLocs计算每个分区的最佳计算位置,它实际上是调用getPreferredLocsInternal这个函数。这个函数是一个关于visit: HashSet[(RDD[_], Int)]的递归函数,visit 用(rdd, partition)元组唯一描述一个分区。getPreferredLocs的计算逻辑是这样的:

  1. 如果已经 visit 过了,就返回 Nil
  2. 如果是被 cached 的,通过getCacheLocs返回 cache 的位置
  3. 如果 RDD 有自己的偏好位置,例如输入 RDD,那么使用rdd.preferredLocations返回它的偏好位置
  4. 如果还没返回,但 RDD 有窄依赖,那么遍历它的所有依赖项,返回第一个具有位置偏好的依赖项的值

理论上,一个最优的位置选取应该尽可能靠近数据源以减少网络传输,但目前版本的 Spark 还没有实现

// DAGScheduler.scala  ...  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {    stage match {      case s: ShuffleMapStage =>        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap      case s: ResultStage =>        partitionsToCompute.map { id =>          val p = s.partitions(id)          (id, getPreferredLocs(stage.rdd, p))        }.toMap    }  } catch {    case NonFatal(e) =>      // 如果有非致命异常就创建一个新的Attempt,并且abortStage(这还不致命么)      stage.makeNewStageAttempt(partitionsToCompute.size)      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))      runningStages -= stage      return  }  ...

下面,我们开始 attempt 这个 Stage,我们需要将 RDD 对象和依赖通过closureSerializer序列化成taskBinaryBytes,然后广播得到taskBinary。当广播变量过大时,会产生一条Broadcasting large task binary with size的 INFO。

// DAGScheduler.scala  ...  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)  // 如果没有Task要执行,实际上就是skip了,那么就没有Submission Time这个字段  if (partitionsToCompute.nonEmpty) {    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())  }  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))  // TODO: 也许可以将`taskBinary`放到Stage里面以避免对它序列化多次。  // 一堆注释看不懂  var taskBinary: Broadcast[Array[Byte]] = null  var partitions: Array[Partition] = null  try {    var taskBinaryBytes: Array[Byte] = null    // taskBinaryBytes and partitions are both effected by the checkpoint status. We need    // this synchronization in case another concurrent job is checkpointing this RDD, so we get a    // consistent view of both variables.    RDDCheckpointData.synchronized {      taskBinaryBytes = stage match {        case stage: ShuffleMapStage =>          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))        case stage: ResultStage =>          // 注意这里的stage.func已经被ClosureCleaner清理过了          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))      }      partitions = stage.rdd.partitions    }    ...    // 广播    taskBinary = sc.broadcast(taskBinaryBytes)  } catch {    // In the case of a failure during serialization, abort the stage.    case e: NotSerializableException =>      abortStage(stage, "Task not serializable: " + e.toString, Some(e))      runningStages -= stage    ...  }

下面,我们根据 Stage 的类型生成 Task。

// DAGScheduler.scala  ...  val tasks: Seq[Task[_]] = try {    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()    stage match {      case stage: ShuffleMapStage =>        stage.pendingPartitions.clear()        partitionsToCompute.map { id =>          val locs = taskIdToLocations(id)          val part = partitions(id)          stage.pendingPartitions += id          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),            Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())        }      case stage: ResultStage =>        partitionsToCompute.map { id =>          val p: Int = stage.partitions(id)          val part = partitions(p)          val locs = taskIdToLocations(id)          new ResultTask(stage.id, stage.latestInfo.attemptNumber,            taskBinary, part, locs, id, properties, serializedTaskMetrics,            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,            stage.rdd.isBarrier())        }    }  } catch {    ...  }

我们将生成的tasks包装成一个TaskSet,并且提交给taskScheduler

// DAGScheduler.scala  ...  if (tasks.nonEmpty) {    logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")    taskScheduler.submitTasks(new TaskSet(      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))  } else {

如果 tasks 是空的,说明任务就已经完成了,打上 DEBUG 日志,并且调用submitWaitingChildStages

    // Because we posted SparkListenerStageSubmitted earlier, we should mark    // the stage as completed here in case there are no tasks to run    markStageAsFinished(stage, None)    stage match {      case stage: ShuffleMapStage =>        logDebug(s"Stage ${stage} is actually done; " +            s"(available: ${stage.isAvailable}," +            s"available outputs: ${stage.numAvailableOutputs}," +            s"partitions: ${stage.numPartitions})")        markMapStageJobsAsFinished(stage)      case stage : ResultStage =>        logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")    }    submitWaitingChildStages(stage)  }}
    



推荐阅读:


更多腾讯AI相关技术干货,请关注专栏腾讯技术工程
推荐阅读
关注数
8146
内容数
225
腾讯AI,物联网等相关技术干货,欢迎关注
目录
极术微信服务号
关注极术微信号
实时接收点赞提醒和评论通知
安谋科技学堂公众号
关注安谋科技学堂
实时获取安谋科技及 Arm 教学资源
安谋科技招聘公众号
关注安谋科技招聘
实时获取安谋科技中国职位信息