先介绍官网提交的例子,我用的是spark 0.9.0 hadoop2.2.0
一.使用脚本提交
1.使用spark脚本提交到yarn,首先需要将spark所在的主机和hadoop集群之间hosts相互配置(也就是把spark主机的ip和主机名配置到hadoop所有节点的/etc/hosts里面,再把集群所有节点的ip和主机名配置到spark所在主机的/etc/hosts里面)。
2.然后需要把hadoop目录etc/hadoop下面的*-sit.xml复制到${SPARK_HOME}的conf下面.
3.确保hadoop集群配置了 HADOOP_CONF_DIR or YARN_CONF_DIR
1.yarn-standalone方式提交到yarn
在${SPARK_HOME}下面执行:
1 2 3 4 5 6 7 8 9 | SPARK_JAR=. /assembly/target/scala-2 .10.4 /spark-assembly-0 .9.0-incubating-hadoop2.2.0.jar \ . /bin/spark-class org.apache.spark.deploy.yarn.Client \ --jar . /examples/target/scala-2 .10 /spark-examples_2 .10-assembly-0.9.0-incubating.jar \ --class org.apache.spark.examples.SparkPi \ --args yarn-standalone \ --num-workers 3 \ --master-memory 2g \ --worker-memory 2g \ --worker-cores 1 |
在${SPARK_HOME}下面执行:
1 2 3 | SPARK_JAR=. /assembly/target/scala-2 .10.4 /spark-assembly-0 .9.0-incubating-hadoop2.2.0.jar \SPARK_YARN_APP_JAR=examples /target/scala-2 .10 /spark-examples_2 .10-assembly-0.9.0-incubating.jar \. /bin/run-example org.apache.spark.examples.SparkPi yarn-client |
1.必须使用linux主机提交任务,使用windows提交到linux hadoop集群会报
1 | org.apache.hadoop.util.Shell$ExitCodeException: /bin/bash: 第 0 行: fg: 无任务控制 |
错误。hadoop2.2.0不支持windows提交到linux hadoop集群,网上搜索发现这是hadoop的bug。
2.提交任务的主机和hadoop集群主机名需要在hosts相互配置。
3.因为使用程序提交是使用yarn-client方式,所以必须像上面脚本那样设置环境变量SPARK_JAR 和 SPARK_YARN_APP_JAR
比如我的设置为向提交任务主机~/.bashrc里面添加:
1 2 | export SPARK_JAR= file : ///home/ndyc/software/sparkTest/lib/spark-assembly-0 .9.0-incubating-hadoop2.2.0.jar export SPARK_YARN_APP_JAR= file : ///home/ndyc/software/sparkTest/ndspark-0 .0.1.jar |
其中SPARK_JAR是${SPARK_HOME}/assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar
SPARK_YARN_APP_JAR是自己程序打的jar包,包含自己的测试程序。
4.程序中加入hadoop、yarn、依赖。
注意,如果引入了hbase依赖,需要这样配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-thrift</artifactId> <version>${hbase.version}</version> <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> </exclusion> <exclusion> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> </exclusion> </exclusions> </dependency> |
然后再加入
1 2 3 4 5 | <dependency> <groupId>org.ow2.asm</groupId> <artifactId>asm-all</artifactId> <version> 4.0 </version> </dependency> |
1 | IncompatibleClassChangeError has interface org.objectweb.asm.ClassVisitor as super class |
异常是因为Hbase jar hadoop-mapreduce-client-jobclient.jar里面使用到了asm3.1 而spark需要的是asm-all-4.0.jar
5. hadoop conf下的*-site.xml需要复制到提交主机的classpath下,或者说maven项目resources下面。
6.编写程序
代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | package com.sdyc.ndspark.sys;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;import java.util.ArrayList;import java.util.List;/** * Created with IntelliJ IDEA. * User: zarchary * Date: 14-1-19 * Time: 下午6:23 * To change this template use File | Settings | File Templates. */public class ListTest { public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf(); sparkConf.setAppName( "listTest" ); //使用yarn模式提交 sparkConf.setMaster( "yarn-client" ); JavaSparkContext sc = new JavaSparkContext(sparkConf); List<String> listA = new ArrayList<String>(); listA.add( "a" ); listA.add( "a" ); listA.add( "b" ); listA.add( "b" ); listA.add( "b" ); listA.add( "c" ); listA.add( "d" ); JavaRDD<String> letterA = sc.parallelize(listA); JavaPairRDD<String, Integer> letterB = letterA.map( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1 ); } }); letterB = letterB.reduceByKey( new Function2<Integer, Integer, Integer>() { public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); //颠倒顺序 JavaPairRDD<Integer, String> letterC = letterB.map( new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { return new Tuple2<Integer, String>(stringIntegerTuple2._2, stringIntegerTuple2._1); } }); JavaPairRDD<Integer, List<String>> letterD = letterC.groupByKey();// //false说明是降序 JavaPairRDD<Integer, List<String>> letterE = letterD.sortByKey( false ); System.out.println( "========" + letterE.collect()); System.exit( 0 ); }} |
关于spark需要依赖的jar的配置可以参考我的博客spark安装和远程调用。
以上弄完之后就可以运行程序了。
运行后会看到yarn的ui界面出现:

正在执行的过程中会发现hadoop yarn 有的nodemanage会有下面这个进程:
1 | 13247 org.apache.spark.deploy.yarn.WorkerLauncher |
这是spark的工作进程。
如果接收到异常为:
1 | WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory |
出现这个错误是因为提交任务的节点不能和spark工作节点交互,因为提交完任务后提交任务节点上会起一个进程,展示任务进度,大多端口为4044,工作节点需要反馈进度给该该端口,所以如果主机名或者IP在hosts中配置不正确,就会报
WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory错误。
所以请检查主机名和IP是否配置正确。
我自己的理解为,程序提交任务到yarn后,会上传SPARK_JAR和SPARK_YARN_APP_JAR到hadoop节点, yarn根据任务情况来分配资源,在nodemanage节点上来启动org.apache.spark.deploy.yarn.WorkerLauncher工作节点来执行spark任务,执行完成后退出。