【本文详细介绍了Spark On Yarn Client 和 Cluster 运行原理及区别,欢迎读者朋友们阅读、转发和收藏!】
1 Spark on Yarn Client 模式
整个程序也是通过 spark-submit 脚本提交的。但是 yarn-client 作业程序的运行不需要通过 Client 类来封装启动,而是直接通过反射机制调用作业的 main 函数。
下面就来分析:
1. 通过 SparkSubmit 类的 launch 的函数直接调用作业的 main 函数(通过反射机制实现),如果是集群模式就会调用 Client 的 main 函数。
2. 而应用程序的 main 函数一定都有个 SparkContent ,并对其进行初始化;
3. 在 SparkContent 初始化中将会依次做如下的事情:设置相关的配置、注册 MapOutputTracker 、 BlockManagerMaster 、 BlockManager ,创建 taskScheduler 和 dagScheduler ;其中比较重要的是创建 taskScheduler 和 dagScheduler 。在创建 taskScheduler 的时候会根据我们传进来的 master 来选择 Scheduler 和 SchedulerBackend 。由于我们选择的是 yarn-client 模式,程序会选择 YarnClientClusterScheduler 和 YarnClientSchedulerBackend ,并将 YarnClientSchedulerBackend 的实例初始化 YarnClientClusterScheduler ,上面两个实例的获取都是通过反射机制实现的, YarnClientSchedulerBackend 类是 CoarseGrainedSchedulerBackend 类的子类, YarnClientClusterScheduler 是 TaskSchedulerImpl 的子类,仅仅重写了 TaskSchedulerImpl 中的 getRackForHost 方法。
4. 初始化完 taskScheduler 后,将创建 dagScheduler ,然后通过 taskScheduler.start() 启动 taskScheduler ,而在 taskScheduler 启动的过程中也会调用 SchedulerBackend 的 start 方法。在 SchedulerBackend 启动的过程中将会初始化一些参数,封装在 ClientArguments 中,并将封装好的 ClientArguments 传进 Client 类中,并 client.runApp() 方法获取 Application ID 。
5. client.runApp 里面的做是和前面客户端进行操作那节类似,不同的是在里面启动是 ExecutorLauncher ( yarn-cluster 模式启动的是 ApplicationMaster )。
6. 在 ExecutorLauncher 里面会初始化并启动 amClient ,然后向 ApplicationMaster 注册该 Application 。注册完之后将会等待 driver 的启动,当 driver 启动完之后,会创建一个 MonitorActor 对象用于和 CoarseGrainedSchedulerBackend 进行通信(只有事件 AddWebUIFilter 他们之间才通信, Task 的运行状况不是通过它和 CoarseGrainedSchedulerBackend 通信的)。然后就是设置 addAmIpFilter ,当作业完成的时候, ExecutorLauncher 将通过 amClient 设置 Application 的状态为 FinalApplicationStatus.SUCCEEDED 。
7. 分配 Executors ,这里面的分配逻辑和 yarn-cluster 里面类似,就不再说了。
8. 最后, Task 将在 CoarseGrainedExecutorBackend 里面运行,然后运行状况会通过 Akka 通知 CoarseGrainedScheduler ,直到作业运行完成。
9. 在作业运行的时候, YarnClientSchedulerBackend 会每隔 1 秒通过 client 获取到作业的运行状况,并打印出相应的运行信息,当 Application 的状态是 FINISHED 、 FAILED 和 KILLED 中的一种,那么程序将退出等待。
10. 最后有个线程会再次确认 Application 的状态,当 Application 的状态是 FINISHED 、 FAILED 和 KILLED 中的一种,程序就运行完成,并停止 SparkContext 。整个过程就结束了。

2 Spark on Yarn Cluster 模式
整个程序也是通过 spark-submit 脚本提交的,作业程序的运行需要通过 Client 类来封装启动。
2.1 客户端操作
在 spark-submit 中 cluster 启动运行的主入口是 yarn 的 client 类,通过反射启动 client 的 main 方法。主要步骤:
1. 根据 yarnConf 来初始化 yarnClient ,并启动 yarnClient
2. 创建客户端 Application ,并获取 Application 的 ID ,进一步判断集群中的资源是否满足 executor 和 ApplicationMaster 申请的资源,如果不满足则抛出 IllegalArgumentException ;
3. 设置资源、环境变量:其中包括了设置 Application 的 Staging 目录、准备本地资源( jar 文件、 log4j.properties )、设置 Application 其中的环境变量、创建 Container 启动的 Context 等;
4. 设置 Application 提交的 Context ,包括设置应用的名字、队列、 AM 的申请的 Container 、标记该作业的类型为 Spark ;
5. 申请 Memory ,并最终通过 yarnClient.submitApplication 向 ResourceManager 提交该 Application 。
6. 当作业提交到 YARN 上之后,客户端就没事了,甚至在终端关掉那个进程也没事,因为整个作业运行在 YARN 集群上进行,运行的结果将会保存到 HDFS 或者日志中。
2.2 Yarn 集群的操作
1. 运行 ApplicationMaster 的 run 方法;
2. 设置好相关的环境变量。
3. 创建 amClient ,并启动;
4. 在 Spark UI 启动之前设置 Spark UI 的 AmIpFilter ;
5. 在 startUserClass 函数专门启动了一个线程(名称为 Driver 的线程)来启动用户提交的 Application ,也就是启动了 Driver 。在 Driver 中将会初始化 SparkContext ;
6. 等待 SparkContext 初始化完成,最多等待 spark.yarn.applicationMaster.waitTries 次数(默认为 10 ),如果等待了的次数超过了配置的,程序将会退出;否则用 SparkContext 初始化 yarnAllocator ;
a) 怎么知道 SparkContext 初始化完成?
b) 其实在 5 步骤中启动 Application 的过程中会初始化 SparkContext ,在初始化 SparkContext 的时候将会创建 YarnClusterScheduler ,在 SparkContext 初始化完成的时候,会调用 YarnClusterScheduler 类中的 postStartHook 方法,而该方法会通知 ApplicationMaster 已经初始化好了 SparkContext
7. 当 SparkContext 、 Driver 初始化完成的时候,通过 amClient 向 ResourceManager 注册 ApplicationMaster
8. 分配并启动 Executeors 。在启动 Executeors 之前,先要通过 yarnAllocator 获取到 numExecutors 个 Container ,然后在 Container 中启动 Executeors 。如果在启动 Executeors 的过程中失败的次数达到了 maxNumExecutorFailures 的次数, maxNumExecutorFailures 的计算规则如下 :
// Default to numExecutors * 2, with minimum of 3private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
那么这个 Application 将失败,将 Application Status 标明为 FAILED ,并将关闭 SparkContext 。其实,启动 Executeors 是通过 ExecutorRunnable 实现的,而 ExecutorRunnable 内部是启动 CoarseGrainedExecutorBackend 的。
9 、最后, Task 将在 CoarseGrainedExecutorBackend 里面运行,然后运行状况会通过 Akka 通知 CoarseGrainedScheduler ,直到作业运行完成。
3 Client 和 Cluster 区别
当在 YARN 上运行 Spark 作业,每个 Spark executor 作为一个 YARN 容器 (container) 运行。 Spark 可以使得多个 Tasks 在同一个容器 (container) 里面运行。和 Hadoop 的 MapReduce 作业不一样, MapReduce 作业为每个 Task 开启不同的 JVM 来运行。
从广义上讲, yarn-cluster 适用于生产环境;而 yarn-client 适用于交互和调试,也就是希望快速地看到 application 的输出。
在我们介绍 yarn-cluster 和 yarn-client 的深层次的区别之前,我们先明白一个概念: Application Master 。在 YARN 中,每个 Application 实例都有一个 Application Master 进程,它是 Application 启动的第一个容器。它负责和 ResourceManager 打交道,并请求资源。获取资源之后告诉 NodeManager 为其启动 container 。
从深层次的含义讲, yarn-cluster 和 yarn-client 模式的区别其实就是 Application Master 进程的区别, yarn-cluster 模式下, driver 运行在 AM(Application Master) 中,它负责向 YARN 申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉 Client ,作业会继续在 YARN 上运行。然而 yarn-cluster 模式不适合运行交互类型的作业。而 yarn-client 模式下, Application Master 仅仅向 YARN 请求 executor , client 会和请求的 container 通信来调度他们工作,也就是说 Client 不能离开。