带你看懂Spark2.x源码之Task分配算法

上篇博客中写了关于Stage划分(Stage划分源码),这次把我对Task分配算法的理解给大家分享一下。

点击上篇博客中最后那个代码的TaskScheduler.submitTasks,会发现这里定义了一个方法,但并没有实现

def submitTasks(taskSet: TaskSet): Unit

点击进去,并选择第三个
在这里插入图片描述
接下来继续点击super后面的submitTasks
在这里插入图片描述
查看代码会发现,在这里针对每一个TaskSet都生成了一个TaskSetManager,这里的TaskSetManager就如同一个保姆一样,它会去管理每个Task,并帮助Task去完成之后的Task分配,以及如果Task提交失败的话,它也会将Task重新提交。

override def submitTasks(taskSet: TaskSet) {
    //存到Task数组中
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      // 创建TaskSetManager,它会跟踪每个task,如果有失败的task就根据重试次数重新提交
      // 还包括计算数据本地化,构建TaskDescription等
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      //下面一系列操作就是判断Task是否有Task任务进来
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      // TaskSet有冲突情况抛错(也就是Task任务执行失败)
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      // 默认使用的是FIFO调度器(另一种是FAIR公平调度器)
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          //在这里会调用Task Run这个方法执行任务
          override def run() {
            // 如果没运行说明集群的资源被占用 还没空闲出来
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              // 如果提交的Task运行了 就关闭这个Timer线程
              // 释放资源
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    //开始匹配资源,注意这个Backend就是向Master提交Application的,它是SparkContext创建时,就创建出来的CoarseGrainedSchedulerBackend
    backend.reviveOffers()
  }

当我们点击reviveOffers时,会发现它也是个抽象方法,而且它是在SchedulerBackend类中

  def reviveOffers(): Unit

点击SchedulerBackend可以看到CoarseGrainedSchedulerBackend,正好是之前提及到的Backend
在这里插入图片描述
在CoarseGrainedSchedulerBackend按两个shift搜索makeoffers,这里makeOffers方法先过滤掉死亡的executor,然后利用resourceOffers的方法执行任务分配算法,将各个task分配到executor上。在分配好后,再执行launchTasks将分配好的task发送TaskLaunch消息到对应的executor上,由executor执行task。

  private def makeOffers() {
      // Make sure no executor is killed while some task is launching on it
      //确保没有executor节点死亡当有task传输到它上面时
      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
        // Filter out executors under killing
        //过滤掉死亡的executors,剩下的为活跃的executors
        val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
        val workOffers = activeExecutors.map { case (id, executorData) =>
          new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
        }.toIndexedSeq
        scheduler.resourceOffers(workOffers)
      }
      if (!taskDescs.isEmpty) {
        launchTasks(taskDescs)
      }
    }

接下来就是进行到最关键的Task分配了,因为spark就本着移动数据不如移动计算的概念,所以在对Task进行划分时,会根据一些Task的运行数据而分配Task到对应的executor上。这个方法传入的参数是WorkOffers,从上个方法中可以看到WorkOffers的参数是封装了executor的资源,所以它会根据task和executor的资源去分配。在这里由于该方法的代码太长了,我们可以分开来看,第一部分为计算它的本地级别,第二部分为给task找到对应的executor去运行。

 def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    // 这里会标记每个executor和host的关系,executor和正在运行的Task的关系
    var newExecAvail = false
    for (o <- offers) {
      // hostToExecutors里维护着每个节点上已经激活的所有的executor
      if (!hostToExecutors.contains(o.host)) {
        hostToExecutors(o.host) = new HashSet[String]()
      }
      // executorIdToRunningTaskIds里维护着每个executor中运行中的每个task
      if (!executorIdToRunningTaskIds.contains(o.executorId)) {
        hostToExecutors(o.host) += o.executorId
        executorAdded(o.executorId, o.host)
        executorIdToHost(o.executorId) = o.host
        executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
    // this here to avoid a separate thread and added synchronization overhead, and also because
    // updating the blacklist is only relevant when task offers are being made.
    // 在这里它会过滤掉黑名单中的过期节点
    blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

    val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
      offers.filter { offer =>
        !blacklistTracker.isNodeBlacklisted(offer.host) &&
          !blacklistTracker.isExecutorBlacklisted(offer.executorId)
      }
    }.getOrElse(offers)
	//在这里会对过滤出来的WorkOffers进行shuffle过程,为了是负载均衡
    val shuffledOffers = shuffleOffers(filteredOffers)
    // Build a list of tasks to assign to each worker.
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    // 这里会根据不同的调度模式对TaskSetManager匹配不同的调度算法
    //主要是从调度池中取出排好序的TaskSet,然后针对每个TaskSet去进行操作
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        // 里面会调用TaskSetManager本地级别的分配算法,
        // 为每个task分配计算本地级别的等级
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    // 把所有TaskSet按照本地级别的顺序开始分配到各个节点
    // 任务分配算法的核心!!!
    //这里会进行双重for循环,对每个taskSet以及它的所有本地级别
    for (taskSet <- sortedTaskSets) {
      var launchedAnyTask = false
      var launchedTaskAtCurrentMaxLocality = false
      // 遍历当前taskSet的所有本地级别
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        do {
        //在TaskSetManager拿到了所有本地级别后调用resourceOfferSingleTaskSet
          // 里面会为每个task创建TaskDescription,用来后面启动task用
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
            taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }
      if (!launchedAnyTask) {
        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
      }
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    // 返回封装着Task的数组
    return tasks
  }

第一部分可以点击 taskSet.executorAdded的方法,它调用了recomputeLocality方法

 def executorAdded() {
    recomputeLocality()
  }
}

点进去recomputeLocality,这里是对计算本地化的方法的封装,里面分别调用调用了computeValidLocalityLevels用来计算位置级别,getLocalityIndex来根据位置级别更新索引,最后返回位置的索引级别也就是本地化等级,getLocalityWait而这个方法正是规定了等待三秒如果获取不到资源就增加一个级别

 def recomputeLocality() {
    // currentLocalityIndex为当前本地级别,默认从0开始,
    // 如果不匹配或者等待资源时间(3s)超时的话级别+1
    // 这里返回的是当前级别的level加上ANY级别的数组,
    // 所以这里拿到的是索引为0的元素->当前级别的level
    val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
    myLocalityLevels = computeValidLocalityLevels()
    // 这里拿到的对应级别的等待时间
    // 若在规定时间内 资源还是被占用 就把本地级别+1
    localityWaits = myLocalityLevels.map(getLocalityWait)
    // 提取到当前level的索引,并更新进currentLocalityIndex
    currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
  }

根据computeValidLocalityLevels方法,可以看到这是计算本地化的算法,本地化会分为五个级别,从低到高为:1.进程本地化:也就是计算数据在同一个executor的内存中;2.节点本地化:计算数据在同一个节点中;3.无本地化,计算数据在关系型数据库中,所以无论哪个节点都可以;4.机架本地化:计算数据在同一机架的不同节点上;5.任何:就是在集群中的任何一个节点上都行,这个是最高级别,其他级别都不行的时候会使用这个。

private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
    // 分为五种级别,下面条件满足的都会添加到levels中
    import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
    val levels = new ArrayBuffer[TaskLocality.TaskLocality]
    if (!pendingTasksForExecutor.isEmpty &&
        pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
      levels += PROCESS_LOCAL
    }
    if (!pendingTasksForHost.isEmpty &&
        pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
      levels += NODE_LOCAL
    }
    if (!pendingTasksWithNoPrefs.isEmpty) {
      levels += NO_PREF
    }
    if (!pendingTasksForRack.isEmpty &&
        pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
      levels += RACK_LOCAL
    }
    // 最后还会加入ANY级别进去
    levels += ANY
    logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
    levels.toArray
  }

接下来我们回到第二部分,也就是去将拿到本地化级别的task分配到对应的executor中,以下就是resourceOffers中的那段代码,我们会进行双重for循环,这里外层for循环还是取出之前调度池排好顺序的TaskSet,然后针对每个TaskSet进行for循环遍历它其中的所有本地级别,拿到本地级别后会调用resourceOfferSingleTaskSet。

 for (taskSet <- sortedTaskSets) {
      var launchedAnyTask = false
      var launchedTaskAtCurrentMaxLocality = false
      // 遍历当前taskSet的所有本地级别
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        do {
        //在TaskSetManager拿到了所有本地级别后调用resourceOfferSingleTaskSet
          // 里面会为每个task创建TaskDescription,用来后面启动task用
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
            taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }
      if (!launchedAnyTask) {
        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
      }
    }

resourceOfferSingleTaskSet中,针对每个TaskSet又调用了resourceOffe方法,用于找到可以启动task的executor来启动task。

 private def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
      maxLocality: TaskLocality,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
    var launchedTask = false
    // nodes and executors that are blacklisted for the entire application have already been
    // filtered out by this point
    //遍历每个executor
    for (i <- 0 until shuffledOffers.size) {
      val execId = shuffledOffers(i).executorId
      val host = shuffledOffers(i).host
      //当前的cpu数量大于等于每个task需要的cpu数量,默认为1
      if (availableCpus(i) >= CPUS_PER_TASK) {
        try {
          // resourceOffer主要用来对每个task做标记,最后返回每个task的TaskDescription
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            tasks(i) += task
            val tid = task.taskId
            taskIdToTaskSetManager(tid) = taskSet
            taskIdToExecutorId(tid) = execId
            executorIdToRunningTaskIds(execId).add(tid)
            availableCpus(i) -= CPUS_PER_TASK
            assert(availableCpus(i) >= 0)
            launchedTask = true
          }
        } catch {
          case e: TaskNotSerializableException =>
            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
            // Do not offer resources for this task, but don't throw an error to allow other
            // task sets to be submitted.
            return launchedTask
        }
      }
    }
    return launchedTask
  }

调用resourceOffer的方法主要用来将task序列化,并封装成TaskDescription,用于最后的launchTask方法提交Task。

def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] =
  {
    val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
      blacklist.isNodeBlacklistedForTaskSet(host) ||
        blacklist.isExecutorBlacklistedForTaskSet(execId)
    }
    if (!isZombie && !offerBlacklisted) {
      // 获取当前时间
      val curTime = clock.getTimeMillis()
      // 当前最优的Task本地化级别
      var allowedLocality = maxLocality
      // 如果级别不是NO_PREF
      if (maxLocality != TaskLocality.NO_PREF) {
        // 这里会拿到这个task其他可用的本地级别
        allowedLocality = getAllowedLocalityLevel(curTime)
        if (allowedLocality > maxLocality) {
          // We're not allowed to search for farther-away tasks
          allowedLocality = maxLocality
        }
      }

      dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
        // Found a task; do some bookkeeping and return a task description
        // 获取task
        val task = tasks(index)
        val taskId = sched.newTaskId()
        // Do various bookkeeping
        copiesRunning(index) += 1
        val attemptNum = taskAttempts(index).size
        // 生成一个TaskInfo 里面注入了这个task的所有元数据
        val info = new TaskInfo(taskId, index, attemptNum, curTime,
          execId, host, taskLocality, speculative)
        // taskInfos维护的是每个task对应的元数据信息
        taskInfos(taskId) = info
        taskAttempts(index) = info :: taskAttempts(index)
        // Update our locality level for delay scheduling
        // NO_PREF will not affect the variables related to delay scheduling
        if (maxLocality != TaskLocality.NO_PREF) {
          currentLocalityIndex = getLocalityIndex(taskLocality)
          lastLaunchTime = curTime
        }
        // Serialize and return the task
        // 序列化这个Task
        val serializedTask: ByteBuffer = try {
          ser.serialize(task)
        } catch {
          // If the task cannot be serialized, then there's no point to re-attempt the task,
          // as it will always fail. So just abort the whole task-set.
          case NonFatal(e) =>
            val msg = s"Failed to serialize task $taskId, not attempting to retry it."
            logError(msg, e)
            abort(s"$msg Exception during serialization: $e")
            throw new TaskNotSerializableException(e)
        }
        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
          !emittedTaskSizeWarning) {
          emittedTaskSizeWarning = true
          logWarning(s"Stage ${task.stageId} contains a task of very large size " +
            s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
            s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
        }
        // 加入到调度池的runningTasks和TaskSetMananger的runningTasksSet
        addRunningTask(taskId)

        // We used to log the time it takes to serialize the task, but task size is already
        // a good proxy to task serialization time.
        // val timeTaken = clock.getTime() - startTime
        val taskName = s"task ${info.id} in stage ${taskSet.id}"
        logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
          s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")

        sched.dagScheduler.taskStarted(task, info)
        // 生成一个TaskDescription
        // 标记着这个task在那个host的哪个executor执行
        // 以及需要添加到executor的Classpath上的所有Jar包和File
        new TaskDescription(
          taskId,
          attemptNum,
          execId,
          taskName,
          index,
          sched.sc.addedFiles,
          sched.sc.addedJars,
          task.localProperties,
          serializedTask)
      }
    } else {
      None
    }
  }

最后会去执行CoarseGrainedSchedulerBackend类的makeOffers方法去提交task,这时会使用launchTasks,里面传的参数就是之前得到的TaskDescription,会根据之前分配好的情况,去executor上启动相应的task。

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
  //首先对每个executor需要执行的task消息序列化一下,可以在网络间进行传输
    val serializedTask = TaskDescription.encode(task)
    if (serializedTask.limit >= maxRpcMessageSize) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
            "spark.rpc.message.maxSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
    //根据task消息中的executorId找到运行的executor
      val executorData = executorDataMap(task.executorId)
      //并将executor空余的core数减去自身需要的core数
      executorData.freeCores -= scheduler.CPUS_PER_TASK

      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")
//向executor发送LaunchTask消息,用于在对应executor上启动task
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}

这样的话,Task分配算法就已经分析完了。我感觉在Spark源码中最重要的就是Stage划分算法和Task分配算法了,所以将我自己对源码的一些分析跟大家分享一下。由于本人也不是什么大牛,如果哪点分析错误,希望大家可以给我指出来,互相学习。
需要注意的是:我这次分析的是2.2的源码,与网上流传很多的1.6源码在Stage划分以及Task分配上有一些方法发生了改动。而我自己之前也对1.6源码有所学习,所以希望大家在学习过程中需要谨慎小心,不要带着1.6源码的思想来看2.2源码,重新学习,收获更多!

							compiled up by JiaMingcan
							转载请署名:JiaMingcan

版权声明:本文为qq_41571900原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。