1.0 Preface
The current Spark Streaming programming guide is available at:
http://spark.apache.org/docs/latest/streaming-programming-guide.html
1.1 Creating a StreamingContext Object
1.1.1 Creating from a SparkContext
The source code is as follows:
def this(sparkContext: SparkContext, batchDuration: Duration) = {
  this(sparkContext, null, batchDuration)
}
The first parameter is an existing SparkContext; the second is the batch duration, i.e. the interval at which incoming data is divided into batches.
Example:
val ssc = new StreamingContext(sc, Seconds(5))
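For reference, a minimal self-contained sketch of this pattern (the setMaster("local[2]") value and app name are assumptions for local testing, not part of the quoted source):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Build (or reuse) a SparkContext, then wrap it in a StreamingContext.
// local[2]: at least two threads, so a receiver and batch processing can run concurrently.
val sc = new SparkContext(new SparkConf().setAppName("StreamTest").setMaster("local[2]"))
// Every DStream created from this context is divided into 5-second batches.
val ssc = new StreamingContext(sc, Seconds(5))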
1.1.2 Creating from a SparkConf
The source code is as follows:
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
The first parameter is a SparkConf object (a new SparkContext is created from it internally); the second is the batch duration.
Example:
val conf = new SparkConf().setAppName("StreamTest")
val ssc = new StreamingContext(conf, Seconds(5))
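Putting this together, a sketch of a complete entry point (hedged: the setMaster("local[2]") call, the socket source on localhost:9999, and the start/awaitTermination lifecycle are the usual local-testing pattern, not part of the quoted source):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamTest {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the receiver, one for batch processing.
    val conf = new SparkConf().setAppName("StreamTest").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder source and output operation; replace with your real pipeline.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.print()

    ssc.start()            // begin receiving and processing data
    ssc.awaitTermination() // block until the job is stopped or fails
  }
}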
1.1.3 Creating from individual configuration parameters
The source code is as follows:
def this(
    master: String,
    appName: String,
    batchDuration: Duration,
    sparkHome: String = null,
    jars: Seq[String] = Nil,
    environment: Map[String, String] = Map()) = {
  this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment),
       null, batchDuration)
}
The leading parameters supply everything needed to build the underlying SparkConf: master is the Spark master URL, appName the application name, sparkHome the SPARK_HOME path, plus jars to distribute and environment variables for the executors; batchDuration (the third parameter) is the batch interval.
Example:
val ssc = new StreamingContext("spark://host:port", "StreamTest", Seconds(5),
  System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
1.1.4 Creating from a checkpoint file
The source code is as follows:
def this(path: String, hadoopConf: Configuration) =
  this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).get, null)
The first parameter is the path to the checkpoint file; the second is the Hadoop configuration.
A single-argument overload is also provided:
def this(path: String) = this(path, new Configuration)
The only parameter is the path to the checkpoint file; a default Hadoop Configuration is used.
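In practice, checkpoint-based construction is usually done through StreamingContext.getOrCreate, which restores the context from the checkpoint directory if one exists and otherwise builds a fresh one. A sketch (the checkpoint path is a placeholder, and the factory body must define the actual DStream pipeline):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://namenode:8020/user/checkpoint" // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("StreamTest")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir)
  // ... define input DStreams and output operations here ...
  ssc
}

// Restores from the checkpoint when present; otherwise calls the factory.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)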
1.2 Creating Input DStreams
1.2.1 fileStream
The source code is as follows:
/**
 * Create an input stream that monitors a Hadoop-compatible filesystem
 * for new files and reads them using the given key-value types and input format.
 * Files must be written to the monitored directory by "moving" them from another
 * location within the same file system. File names starting with . are ignored.
 * @param directory HDFS directory to monitor for new files