In the previous post I walked through Stage division (see the Stage division source-code post); this time I would like to share my understanding of the Task allocation algorithm.
If you click into TaskScheduler.submitTasks from the last code snippet of the previous post, you will find that the method is only declared here, with no implementation:
def submitTasks(taskSet: TaskSet): Unit
Click into it, choose the third implementation, and then continue into the submitTasks that follows super.
Reading the code, you can see that a TaskSetManager is created for every TaskSet. The TaskSetManager acts like a caretaker for its TaskSet: it manages each task, drives the subsequent task allocation, and resubmits tasks that fail.
override def submitTasks(taskSet: TaskSet) {
// The tasks contained in this TaskSet
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
// Create a TaskSetManager; it tracks each task and resubmits failed ones up to maxTaskFailures times.
// It also computes data locality and builds the TaskDescriptions.
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
// The checks below detect whether another active TaskSet already exists for this stage
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
// Throw if there is a conflicting TaskSet, i.e. more than one active TaskSet for the same stage
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
}
// The default scheduling mode is FIFO (the other option is the FAIR scheduler)
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
// This TimerTask periodically checks whether any task has been launched
override def run() {
// If nothing has launched yet, the cluster's resources are still occupied and not yet free
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
// Once a submitted task has launched, cancel this timer
// and release it
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
// Start matching resources. Note that this backend is the CoarseGrainedSchedulerBackend created when the SparkContext was constructed; it is the same one that submits the Application to the Master.
backend.reviveOffers()
}
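Before moving on to reviveOffers, a quick aside on the scheduling comment above: the default mode is FIFO, and switching to FAIR is purely a configuration change. The snippet below is a minimal sketch of how that is usually done; the pool name "production" is a made-up example, and the pools themselves would be defined in the file pointed to by spark.scheduler.allocation.file.

import org.apache.spark.sql.SparkSession

// Minimal sketch: run the TaskScheduler in FAIR mode instead of the default FIFO.
val spark = SparkSession.builder()
  .appName("fair-scheduling-demo")
  .config("spark.scheduler.mode", "FAIR")
  .getOrCreate()

// Jobs submitted from this thread will go to the hypothetical "production" pool.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")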
Clicking reviveOffers, you will find that it too is an abstract method, declared in SchedulerBackend:
def reviveOffers(): Unit
Click into SchedulerBackend and you will see CoarseGrainedSchedulerBackend, which is exactly the backend mentioned above.
In CoarseGrainedSchedulerBackend, press Shift twice and search for makeOffers. The makeOffers method first filters out dead executors, then calls resourceOffers to run the task allocation algorithm and assign the tasks to executors. Once the assignment is done, launchTasks sends a LaunchTask message for each assigned task to its executor, and the executor runs the task.
private def makeOffers() {
// Make sure no executor is killed while some task is launching on it
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
// Filter out executors under killing; what remains are the alive executors
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toIndexedSeq
scheduler.resourceOffers(workOffers)
}
if (!taskDescs.isEmpty) {
launchTasks(taskDescs)
}
}
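For reference, the WorkerOffer built in makeOffers is just a small value object bundling what the allocation algorithm needs to know about one executor. It looks roughly like the sketch below; treat the exact definition as an approximation.

// Rough sketch of the resource offer handed to the TaskScheduler's resourceOffers:
// one offer per alive executor, carrying its id, host and number of free cores.
case class WorkerOffer(executorId: String, host: String, cores: Int)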
Now we get to the key part, the actual Task allocation. Spark follows the principle that moving computation is cheaper than moving data, so when assigning tasks it takes each task's data location into account and places the task on a matching executor. The parameter of this method is the sequence of WorkerOffers; as seen in the previous method, each WorkerOffer wraps an executor's resources, so the assignment is made by matching tasks against those resources. The method is fairly long, so we will look at it in two parts: the first computes the locality levels, the second finds an executor on which each task can run.
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname
// Also track if new executor is added
// Record which executors live on which host, and which tasks are running on which executor
var newExecAvail = false
for (o <- offers) {
// hostToExecutors tracks all the active executors on each host
if (!hostToExecutors.contains(o.host)) {
hostToExecutors(o.host) = new HashSet[String]()
}
// executorIdToRunningTaskIds tracks the tasks currently running on each executor
if (!executorIdToRunningTaskIds.contains(o.executorId)) {
hostToExecutors(o.host) += o.executorId
executorAdded(o.executorId, o.host)
executorIdToHost(o.executorId) = o.host
executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
newExecAvail = true
}
for (rack <- getRackForHost(o.host)) {
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}
// Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
// this here to avoid a separate thread and added synchronization overhead, and also because
// updating the blacklist is only relevant when task offers are being made.
// Remove nodes whose blacklist period has expired before making any offers
blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
offers.filter { offer =>
!blacklistTracker.isNodeBlacklisted(offer.host) &&
!blacklistTracker.isExecutorBlacklisted(offer.executorId)
}
}.getOrElse(offers)
// Randomly shuffle the filtered WorkerOffers to balance load across executors
val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
// Sort the TaskSetManagers according to the configured scheduling mode (FIFO or FAIR):
// take the ordered TaskSets out of the scheduling pool and process them one by one
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
// This triggers the TaskSetManager's locality computation,
// recalculating the valid locality levels for its tasks
taskSet.executorAdded()
}
}
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
// Assign every TaskSet to the nodes, walking the locality levels in order
// -- this is the core of the task allocation algorithm!
// A nested for loop: over each taskSet and over all of its locality levels
for (taskSet <- sortedTaskSets) {
var launchedAnyTask = false
var launchedTaskAtCurrentMaxLocality = false
// Iterate over all locality levels of the current taskSet
for (currentMaxLocality <- taskSet.myLocalityLevels) {
do {
// Once the TaskSetManager has its locality levels, call resourceOfferSingleTaskSet,
// which creates a TaskDescription for each task to be launched later
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}
if (tasks.size > 0) {
hasLaunchedTask = true
}
// Return the per-executor arrays of TaskDescriptions
return tasks
}
For the first part, click into taskSet.executorAdded; it simply delegates to recomputeLocality:
def executorAdded() {
recomputeLocality()
}
Step into recomputeLocality. It wraps the locality computation: computeValidLocalityLevels computes the valid locality levels, getLocalityIndex updates the index to the current level (i.e. the locality level we are currently at), and getLocalityWait supplies the per-level wait time, which is exactly the rule that if no resource is obtained within (by default) three seconds, the scheduler drops down one locality level.
def recomputeLocality() {
// currentLocalityIndex is the index of the current locality level; it starts at 0
// and is bumped by one when no match is found or the (default 3s) wait expires.
// myLocalityLevels is the array of valid levels (always ending with ANY),
// so indexing it with currentLocalityIndex yields the level scheduling is currently at
val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
myLocalityLevels = computeValidLocalityLevels()
// Get the wait time configured for each level;
// if resources are still occupied after that time, the level is relaxed by one
localityWaits = myLocalityLevels.map(getLocalityWait)
// Find the index of the previous level in the new array and store it in currentLocalityIndex
currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
}
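The roughly three-second wait mentioned above is not hard-coded: getLocalityWait reads it from the spark.locality.wait family of settings, which default to 3s. The sketch below shows how those knobs could be tuned; the concrete values are only illustrative.

import org.apache.spark.SparkConf

// Illustrative values only: the delay-scheduling timeouts consulted by getLocalityWait.
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // global fallback (default 3s)
  .set("spark.locality.wait.process", "2s")  // how long to insist on PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")     // how long to insist on NODE_LOCAL
  .set("spark.locality.wait.rack", "1s")     // how long to insist on RACK_LOCAL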
Looking at computeValidLocalityLevels, this is where the locality levels are computed. There are five levels, from most local to least local: 1. PROCESS_LOCAL (process locality): the data is in the memory of the same executor that will run the task; 2. NODE_LOCAL (node locality): the data is on the same node; 3. NO_PREF (no preference): the data has no locality preference, for example when it is read from an external store such as a relational database, so any node is equally good; 4. RACK_LOCAL (rack locality): the data is on a different node within the same rack; 5. ANY: the data may be anywhere in the cluster; this is the last resort, used only when none of the other levels applies.
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
// There are five levels; every level whose condition below is met is added to levels
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
if (!pendingTasksForExecutor.isEmpty &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
levels += PROCESS_LOCAL
}
if (!pendingTasksForHost.isEmpty &&
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
if (!pendingTasksWithNoPrefs.isEmpty) {
levels += NO_PREF
}
if (!pendingTasksForRack.isEmpty &&
pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
levels += RACK_LOCAL
}
// ANY is always added at the end
levels += ANY
logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
levels.toArray
}
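The five levels are values of the TaskLocality enumeration, and their declaration order is what makes level comparisons (such as the allowedLocality > maxLocality check we will meet in resourceOffer) meaningful. Roughly sketched:

// Simplified sketch of the enumeration behind the locality levels.
// Declaration order defines the ordering: PROCESS_LOCAL < NODE_LOCAL < NO_PREF < RACK_LOCAL < ANY,
// so a "greater" level means a less local, more permissive placement.
object TaskLocality extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
}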
Now back to the second part: assigning the tasks, now annotated with locality levels, to the matching executors. Below is the relevant fragment of resourceOffers again. It is a nested loop: the outer loop takes the TaskSets in the order produced by the scheduling pool, and for each TaskSet the inner loop walks through all of its locality levels; with a level in hand it calls resourceOfferSingleTaskSet.
for (taskSet <- sortedTaskSets) {
var launchedAnyTask = false
var launchedTaskAtCurrentMaxLocality = false
// Iterate over all locality levels of the current taskSet
for (currentMaxLocality <- taskSet.myLocalityLevels) {
do {
// Once the TaskSetManager has its locality levels, call resourceOfferSingleTaskSet,
// which creates a TaskDescription for each task to be launched later
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}
Inside resourceOfferSingleTaskSet, resourceOffer is called in turn for the TaskSet, in order to find an executor on which each task can be launched.
private def resourceOfferSingleTaskSet(
taskSet: TaskSetManager,
maxLocality: TaskLocality,
shuffledOffers: Seq[WorkerOffer],
availableCpus: Array[Int],
tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
// nodes and executors that are blacklisted for the entire application have already been
// filtered out by this point
// Iterate over each (shuffled) executor offer
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
// Only proceed if the executor's available CPUs cover the per-task requirement (CPUS_PER_TASK, default 1)
if (availableCpus(i) >= CPUS_PER_TASK) {
try {
// resourceOffer does the per-task bookkeeping and returns a TaskDescription for each launched task
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetManager(tid) = taskSet
taskIdToExecutorId(tid) = execId
executorIdToRunningTaskIds(execId).add(tid)
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
launchedTask = true
}
} catch {
case e: TaskNotSerializableException =>
logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
// Do not offer resources for this task, but don't throw an error to allow other
// task sets to be submitted.
return launchedTask
}
}
}
return launchedTask
}
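The CPUS_PER_TASK used above is simply the value of spark.task.cpus, which defaults to 1. If your tasks each need more than one core, it can be raised; a minimal sketch:

import org.apache.spark.SparkConf

// Minimal sketch: with spark.task.cpus = 4, an executor offering freeCores = 8
// can host at most two concurrent tasks in resourceOfferSingleTaskSet.
val conf = new SparkConf().set("spark.task.cpus", "4")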
The resourceOffer method mainly serializes the task and wraps it into a TaskDescription, which is what the final launchTasks step uses to ship the task.
def resourceOffer(
execId: String,
host: String,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
blacklist.isNodeBlacklistedForTaskSet(host) ||
blacklist.isExecutorBlacklistedForTaskSet(execId)
}
if (!isZombie && !offerBlacklisted) {
// Current time
val curTime = clock.getTimeMillis()
// The best locality level this offer is allowed to use
var allowedLocality = maxLocality
// If the level is not NO_PREF
if (maxLocality != TaskLocality.NO_PREF) {
// Get the locality level currently permitted by delay scheduling
allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
// We're not allowed to search for farther-away tasks
allowedLocality = maxLocality
}
}
dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
// Found a task; do some bookkeeping and return a task description
// Fetch the task
val task = tasks(index)
val taskId = sched.newTaskId()
// Do various bookkeeping
copiesRunning(index) += 1
val attemptNum = taskAttempts(index).size
// Create a TaskInfo holding all the metadata of this task attempt
val info = new TaskInfo(taskId, index, attemptNum, curTime,
execId, host, taskLocality, speculative)
// taskInfos maps each task id to its metadata
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
// NO_PREF will not affect the variables related to delay scheduling
if (maxLocality != TaskLocality.NO_PREF) {
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
}
// Serialize and return the task
// Serialize the task
val serializedTask: ByteBuffer = try {
ser.serialize(task)
} catch {
// If the task cannot be serialized, then there's no point to re-attempt the task,
// as it will always fail. So just abort the whole task-set.
case NonFatal(e) =>
val msg = s"Failed to serialize task $taskId, not attempting to retry it."
logError(msg, e)
abort(s"$msg Exception during serialization: $e")
throw new TaskNotSerializableException(e)
}
if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
!emittedTaskSizeWarning) {
emittedTaskSizeWarning = true
logWarning(s"Stage ${task.stageId} contains a task of very large size " +
s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
}
// Add the task to the pool's runningTasks and the TaskSetManager's runningTasksSet
addRunningTask(taskId)
// We used to log the time it takes to serialize the task, but task size is already
// a good proxy to task serialization time.
// val timeTaken = clock.getTime() - startTime
val taskName = s"task ${info.id} in stage ${taskSet.id}"
logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")
sched.dagScheduler.taskStarted(task, info)
// Build a TaskDescription recording on which host and executor this task will run,
// together with all the jars and files that must be added to the executor's classpath
new TaskDescription(
taskId,
attemptNum,
execId,
taskName,
index,
sched.sc.addedFiles,
sched.sc.addedJars,
task.localProperties,
serializedTask)
}
} else {
None
}
}
Finally, execution returns to CoarseGrainedSchedulerBackend's makeOffers, which submits the tasks via launchTasks. Its argument is the TaskDescriptions obtained above; following the assignment, it launches each task on its designated executor.
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
// First serialize each task message so it can be transferred over the network
val serializedTask = TaskDescription.encode(task)
if (serializedTask.limit >= maxRpcMessageSize) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.rpc.message.maxSize (%d bytes). Consider increasing " +
"spark.rpc.message.maxSize or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
// Look up the running executor by the executorId in the task description
val executorData = executorDataMap(task.executorId)
// Subtract the cores this task needs from the executor's free cores
executorData.freeCores -= scheduler.CPUS_PER_TASK
logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
s"${executorData.executorHost}.")
// Send a LaunchTask message to the executor, which will start the task there
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}
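The error branch above suggests broadcast variables when a serialized task exceeds spark.rpc.message.maxSize (128 MB by default). Below is a minimal sketch of that pattern, keeping a large lookup table out of the task closure; the data and variable names are made up for illustration.

// sc is an existing SparkContext. Shipping the table as a broadcast variable keeps
// it out of each task's closure, so the serialized TaskDescription stays small.
val bigLookup: Map[Int, String] = (1 to 1000000).map(i => i -> s"value-$i").toMap
val lookupBc = sc.broadcast(bigLookup)

val result = sc.parallelize(1 to 100)
  .map(i => lookupBc.value.getOrElse(i, "missing"))
  .collect()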
That concludes the analysis of the Task allocation algorithm. In my view, the Stage division algorithm and the Task allocation algorithm are the most important parts of the Spark source code, which is why I wanted to share my reading of them. I am by no means an expert, so if you spot any mistakes in the analysis, please point them out so we can learn from each other.
One thing to note: this analysis is based on the 2.2 source code. Compared with the widely circulated 1.6 source code, several of the methods involved in Stage division and Task allocation have changed. I studied the 1.6 code myself before this, so be careful not to read the 2.2 code with 1.6 assumptions in mind; approaching it fresh will teach you more.
Compiled by JiaMingcan
Please credit JiaMingcan when reposting.