Last time we saw that, at submit time, a different class is invoked via reflection to schedule the job depending on the resource platform. Let's start with the YARN entry point.
Open org.apache.spark.deploy.yarn.Client.
In short: the Client initializes the parameters, packages up everything the RM needs to create the Application, and finally sends the request to launch the ApplicationMaster to YARN over an RPC channel.
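As a reminder of what that reflective dispatch looks like, here is a minimal sketch (not the actual SparkSubmit code; the argument values are hypothetical): the main class chosen for the cluster manager is loaded by name and its static main method is invoked.

// Minimal sketch of reflective dispatch; the argument values are hypothetical.
val mainClass = Class.forName("org.apache.spark.deploy.yarn.Client")
val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
val args = Array("--class", "com.example.MyApp", "--jar", "app.jar")
mainMethod.invoke(null, args.asInstanceOf[AnyRef])  // static method, so the receiver is null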
// Start from the main method
def main(argStrings: Array[String]) {
  if (!sys.props.contains("SPARK_SUBMIT")) {
    logWarning("WARNING: This client is deprecated and will be removed in a " +
      "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
  }

  // Mark YARN mode in the system properties
  System.setProperty("SPARK_YARN_MODE", "true")
  // Instantiate SparkConf
  val sparkConf = new SparkConf
  val args = new ClientArguments(argStrings, sparkConf)
  // to maintain backwards-compatibility
  if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
    sparkConf.setIfMissing("spark.executor.instances", args.numExecutors.toString)
  }
  // Instantiate the Client and call its run method
  new Client(args, sparkConf).run()
}
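Incidentally, the launcherBackend used further down in submitApplication exists so the Client can report state back to a parent process that started it through the org.apache.spark.launcher API. A minimal sketch of that programmatic submission path (the app resource and main class here are hypothetical):

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

// Submit the same kind of YARN cluster-mode application programmatically;
// the returned handle receives state updates over the launcher backend connection.
val handle: SparkAppHandle = new SparkLauncher()
  .setAppResource("/path/to/app.jar")        // hypothetical jar
  .setMainClass("com.example.MyApp")         // hypothetical main class
  .setMaster("yarn")
  .setDeployMode("cluster")
  .setConf("spark.executor.instances", "2")
  .startApplication()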
// Submit the Application to YARN's ResourceManager
def run(): Unit = {
  // Submit the Application to YARN and get back the appId
  this.appId = submitApplication()
  if (!launcherBackend.isConnected() && fireAndForget) {
    val report = getApplicationReport(appId)
    val state = report.getYarnApplicationState
    logInfo(s"Application report for $appId (state: $state)")
    logInfo(formatReportDetails(report))
    if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
      throw new SparkException(s"Application $appId finished with status: $state")
    }
  } else {
    val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
    if (yarnApplicationState == YarnApplicationState.FAILED ||
        finalApplicationStatus == FinalApplicationStatus.FAILED) {
      throw new SparkException(s"Application $appId finished with failed status")
    }
    if (yarnApplicationState == YarnApplicationState.KILLED ||
        finalApplicationStatus == FinalApplicationStatus.KILLED) {
      throw new SparkException(s"Application $appId is killed")
    }
    if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
      throw new SparkException(s"The final status of application $appId is undefined")
    }
  }
}
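The first branch is the fire-and-forget path. In the versions of Client.scala I have checked, fireAndForget is driven by spark.yarn.submit.waitAppCompletion and is defined roughly as below (the exact form varies between releases), so setting that flag to false in cluster mode makes run() return right after submission instead of blocking in monitorApplication:

// Rough definition, may differ slightly across Spark versions:
// true only in cluster mode when the user asked not to wait for completion.
private val fireAndForget = isClusterMode &&
  !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)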
def submitApplication(): ApplicationId = {
  var appId: ApplicationId = null
  try {
    // Open the socket connection back to the Spark launcher server
    launcherBackend.connect()
    // Setup the credentials before doing anything else,
    // so we don't have issues at any point.
    // Set up YARN security credentials
    setupCredentials()
    // Initialize the YARN client with the configuration
    yarnClient.init(yarnConf)
    // Start the YARN client
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    appId = newAppResponse.getApplicationId()
    reportLauncherState(SparkAppHandle.State.SUBMITTED)
    launcherBackend.setAppId(appId.toString())

    // Check whether the cluster has enough resources to launch the AM
    verifyClusterResources(newAppResponse)

    // Set up the ContainerLaunchContext
    val containerContext = createContainerLaunchContext(newAppResponse)
    // Create the ApplicationSubmissionContext, which packages all the
    // information the RM needs to launch the AM.
    // There is an important detail here; the relevant code is shown below.
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application ${appId.getId} to ResourceManager")
    // Calls the submitApplication method of YarnClientImpl
    yarnClient.submitApplication(appContext)
    appId
  } catch {
    case e: Throwable =>
      if (appId != null) {
        cleanupStagingDir(appId)
      }
      throw e
  }
}
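Stripped of the Spark-specific pieces, the YARN side of this handshake is just the stock org.apache.hadoop.yarn.client.api.YarnClient API. A bare-bones sketch (the application name and AM command are placeholders, and a real AM command line is far more involved than the echo used here):

import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, Resource}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

// Same handshake against the stock YARN client API, without Spark.
val yarnConf   = new YarnConfiguration()
val yarnClient = YarnClient.createYarnClient()
yarnClient.init(yarnConf)
yarnClient.start()

val newApp         = yarnClient.createApplication()        // ask the RM for a new application
val newAppResponse = newApp.getNewApplicationResponse()
val appId          = newAppResponse.getApplicationId()     // ApplicationId handed out by the RM

val appContext = newApp.getApplicationSubmissionContext()
appContext.setApplicationName("yarn-client-demo")          // hypothetical name

// Placeholder AM container spec; a real AM registers with the RM and requests containers.
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setCommands(List("echo hello").asJava)
appContext.setAMContainerSpec(amContainer)
appContext.setResource(Resource.newInstance(512, 1))       // 512 MB, 1 vcore for the AM

yarnClient.submitApplication(appContext)                   // the same call the Spark Client ends with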
// An important point inside createContainerLaunchContext:
// if the user submitted in cluster mode, the AM runs
// org.apache.spark.deploy.yarn.ApplicationMaster;
// in client mode, the AM runs
// org.apache.spark.deploy.yarn.ExecutorLauncher instead.
val amClass =
  if (isClusterMode) {
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }
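For what it's worth, in the Spark sources I have looked at, ExecutorLauncher is only a thin wrapper whose main delegates straight to ApplicationMaster; the separate class name mainly makes client-mode AMs easy to tell apart in ps/jps output. Roughly:

// Thin wrapper, paraphrased from the Spark source; behaviour is identical to ApplicationMaster.
object ExecutorLauncher {
  def main(args: Array[String]): Unit = {
    ApplicationMaster.main(args)
  }
}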
// The entry point for submitting the Application (YarnClientImpl)
@Override
public ApplicationId
    submitApplication(ApplicationSubmissionContext appContext)
        throws YarnException, IOException {
  ApplicationId applicationId = appContext.getApplicationId();
  appContext.setApplicationId(applicationId);
  // Instantiate the SubmitApplicationRequest object, which carries
  // the client's application-submission request to the RM
  SubmitApplicationRequest request =
      Records.newRecord(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);
  // Ultimately this goes out over a Google Protobuf RPC channel;
  // too many layers to paste here.
  rmClient.submitApplication(request);

  int pollCount = 0;
  while (true) {
    YarnApplicationState state =
        getApplicationReport(applicationId).getYarnApplicationState();
    if (!state.equals(YarnApplicationState.NEW) &&
        !state.equals(YarnApplicationState.NEW_SAVING)) {
      break;
    }
    // Notify the client through the log every 10 polls, in case the client
    // is blocked here too long.
    if (++pollCount % 10 == 0) {
      LOG.info("Application submission is not finished, " +
          "submitted application " + applicationId +
          " is still in " + state);
    }
    try {
      Thread.sleep(statePollIntervalMillis);
    } catch (InterruptedException ie) {
    }
  }
  LOG.info("Submitted application " + applicationId + " to ResourceManager"
      + " at " + rmAddress);
  return applicationId;
}
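Back on the Spark side, the monitorApplication call used in run() keeps polling the same kind of report until the application reaches a terminal state. A simplified sketch of that loop, reusing the yarnClient from earlier (poll interval and cleanup handling are trimmed; this is not the exact Spark implementation):

import org.apache.hadoop.yarn.api.records.{ApplicationId, FinalApplicationStatus, YarnApplicationState}

// Poll the RM until the application leaves the non-terminal states,
// then return its final status.
def waitForCompletion(appId: ApplicationId): FinalApplicationStatus = {
  while (true) {
    val report = yarnClient.getApplicationReport(appId)
    report.getYarnApplicationState match {
      case YarnApplicationState.FINISHED |
           YarnApplicationState.FAILED |
           YarnApplicationState.KILLED =>
        return report.getFinalApplicationStatus
      case state =>
        println(s"Application $appId is still in $state")
        Thread.sleep(1000)
    }
  }
  throw new IllegalStateException("unreachable")  // keeps the compiler happy
}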