Spark Streaming之基础知识
一、概叙
- Spark Streaming是spark的核心API的扩展,用于构建弹性、高吞吐量、容错的在线数据流的流式处理程序
- 数据源多种,可来自kafka、flume、HDFS等
- 数据输出到HDFS、数据库、可视化界面
- 处理的数据是一批,属于微批处理
- 批处理间隔是 Spark Streaming 的核心概念和关键参数,它决定了 Spark Streaming 提交作业的频率和数据处理的延迟,同时也影响着数据处理的吞吐量和性能
- 它提供了一个高级抽象DStream,表示一个连续的数据流
- DStream可以通过输入数据流或者是其他DStream来创建
- 在内部,DStream是由一个个RDD序列来表示
二、特点
- 优点:
- 易用性:通过一些高阶函数来构建应用
- 容错性
- 易整合性:易整合到spark体系中
- 缺点:相对于“一次处理一条数据”架构的系统来说,它的延迟相对会高一些
三、DStream的创建
RDD队列创建(queueStream关键字)
val conf = new SparkConf().setAppName("RDDQueueDemo").setMaster("local[*]") val scc = new StreamingContext(conf, Seconds(5)) val queue: mutable.Queue[RDD[Int]] = mutable.Queue[RDD[Int]]() val rddDS: InputDStream[Int] = scc.queueStream(queue, true)自定义数据源(需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集)
class MySource(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){ /* 接收器启动的时候调用该方法. This function must initialize all resources (threads, buffers, etc.) necessary for receiving data. 这个函数内部必须初始化一些读取数据必须的资源 该方法不能阻塞, 所以 读取数据要在一个新的线程中进行. */ override def onStart(): Unit = { // 启动一个新的线程来接收数据 new Thread("Socket Receiver"){ override def run(): Unit = { receive() } }.start() } // 此方法用来接收数据 def receive()={ val socket = new Socket(host, port) val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8)) var line: String = null // 当 receiver没有关闭, 且reader读取到了数据则循环发送给spark while (!isStopped && (line = reader.readLine()) != null ){ // 发送给spark store(line) } // 循环结束, 则关闭资源 reader.close() socket.close() // 重启任务 restart("Trying to connect again") } override def onStop(): Unit = { } } //使用自定义数据源 object MySourceDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[*]") // 1. 创建SparkStreaming的入口对象: StreamingContext 参数2: 表示事件间隔 val ssc = new StreamingContext(conf, Seconds(5)) // 2. 创建一个DStream val lines: ReceiverInputDStream[String] = ssc.receiverStream[String](new MySource("hadoop101", 9999)) // 3. 一个个的单词 val words: DStream[String] = lines.flatMap(_.split("""\s+""")) // 4. 单词形成元组 val wordAndOne: DStream[(String, Int)] = words.map((_, 1)) // 5. 统计单词的个数 val count: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _) //6. 显示 count.print //7. 启动流式任务开始计算 ssc.start() //8. 等待计算结束才退出主程序 ssc.awaitTermination() ssc.stop(false) } }kafka数据源(包内提供的 KafkaUtils 对象可以在 StreamingContext和JavaStreamingContext中以你的 Kafka 消息创建出 DStream)(重要)
导入依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>2.1.1</version> </dependency>高级API (没有缓存,每次只能消费kafka中最新产生的消息)
import kafka.serializer.StringDecoder import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} object HighKafka { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka") val ssc = new StreamingContext(conf, Seconds(3)) // kafka 参数 //kafka参数声明 val brokers = "hadoop101:9092,hadoop102:9092,hadoop103:9092" val topic = "first" val group = "bigdata" val deserialization = "org.apache.kafka.common.serialization.StringDeserializer" val kafkaParams = Map( ConsumerConfig.GROUP_ID_CONFIG -> group, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers, ) val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, Set(topic)) dStream.print() ssc.start() ssc.awaitTermination() } }高级API (设置缓存,可以从上次的位置接着消费)
import kafka.serializer.StringDecoder import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} object HighKafka2 { def createSSC(): StreamingContext = { val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka") val ssc = new StreamingContext(conf, Seconds(3)) // 偏移量保存在 checkpoint 中, 可以从上次的位置接着消费 ssc.checkpoint("./ck1") // kafka 参数 //kafka参数声明 val brokers = "hadoop101:9092,hadoop102:9092,hadoop103:9092" val topic = "first" val group = "bigdata" val deserialization = "org.apache.kafka.common.serialization.StringDeserializer" val kafkaParams = Map( "zookeeper.connect" -> "hadoop101:2181,hadoop102:2181,hadoop103:2181", ConsumerConfig.GROUP_ID_CONFIG -> group, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization ) val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, Set(topic)) dStream.print() ssc } def main(args: Array[String]): Unit = { val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck1", () => createSSC()) ssc.start() ssc.awaitTermination() } }低级API (可以自己设置偏移量,保证下次消费kafka中的数据是自己上次停止消费的位置开始)
import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka.KafkaCluster.Err import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange} import org.apache.spark.streaming.{Seconds, StreamingContext} object LowKafka { // 获取 offset def getOffset(kafkaCluster: KafkaCluster, group: String, topic: String): Map[TopicAndPartition, Long] = { // 最终要返回的 Map var topicAndPartition2Long: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]() // 根据指定的主体获取分区信息 val topicMetadataEither: Either[Err, Set[TopicAndPartition]] = kafkaCluster.getPartitions(Set(topic)) // 判断分区是否存在 if (topicMetadataEither.isRight) { // 不为空, 则取出分区信息 val topicAndPartitions: Set[TopicAndPartition] = topicMetadataEither.right.get // 获取消费消费数据的进度 val topicAndPartition2LongEither: Either[Err, Map[TopicAndPartition, Long]] = kafkaCluster.getConsumerOffsets(group, topicAndPartitions) // 如果没有消费进度, 表示第一次消费 if (topicAndPartition2LongEither.isLeft) { // 遍历每个分区, 都从 0 开始消费 topicAndPartitions.foreach { topicAndPartition => topicAndPartition2Long = topicAndPartition2Long + (topicAndPartition -> 0) } } else { // 如果分区有消费进度 // 取出消费进度 val current: Map[TopicAndPartition, Long] = topicAndPartition2LongEither.right.get topicAndPartition2Long ++= current } } // 返回分区的消费进度 topicAndPartition2Long } // 保存消费信息 def saveOffset(kafkaCluster: KafkaCluster, group: String, dStream: InputDStream[String]) = { dStream.foreachRDD(rdd => { var map: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]() // 把 RDD 转换成HasOffsetRanges对 val hasOffsetRangs: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges] // 得到 offsetRangs val ranges: Array[OffsetRange] = hasOffsetRangs.offsetRanges ranges.foreach(range => { // 每个分区的最新的 offset map += range.topicAndPartition() -> range.untilOffset }) kafkaCluster.setConsumerOffsets(group,map) }) } def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka") val ssc = new StreamingContext(conf, Seconds(3)) // kafka 参数 //kafka参数声明 val brokers = "hadoop101:9092,hadoop102:9092,hadoop103:9092" val topic = "first" val group = "bigdata" val deserialization = "org.apache.kafka.common.serialization.StringDeserializer" val kafkaParams = Map( "zookeeper.connect" -> "hadoop101:2181,hadoop102:2181,hadoop103:2181", ConsumerConfig.GROUP_ID_CONFIG -> group, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization ) // 读取 offset val kafkaCluster = new KafkaCluster(kafkaParams) val fromOffset: Map[TopicAndPartition, Long] = getOffset(kafkaCluster, group, topic) val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String]( ssc, kafkaParams, fromOffset, (message: MessageAndMetadata[String, String]) => message.message() ) dStream.print() // 保存 offset saveOffset(kafkaCluster, group, dStream) ssc.start() ssc.awaitTermination() } }
四、DStream的转换
| Transformation | Meaning |
|---|---|
| map(func) | 通过函数func传递源数据流的每个元素,返回新的数据流。 |
| flatMap(func) | 与map类似,但是每个输入项都可以映射到0个或更多个输出项。 |
| filter(func) | 通过仅选择func返回true的源数据流的记录来返回新的数据流。 |
| repartition(numPartitions) | 通过创建更多或更少的分区来更改此数据流中的并行级别。 |
| union(otherStream) | 返回包含源数据流和其他数据流中元素并集的新数据流。 |
| count() | 通过计算源数据流中每个RDD中的元素数,返回单个元素RDD的新数据流。 |
| reduce(func) | 通过使用函数func(接受两个参数并返回一个)聚合源数据流中每个RDD中的元素,返回一个新的单元素RDD数据流。这个函数应该是结合的和可交换的,这样就可以并行计算了。 |
| countByValue() | 在K类型元素的数据流上调用时,返回(K,Long)对的新数据流,其中每个键的值是其在源数据流的每个RDD中的频率。 |
| reduceByKey(func, [numTasks]) | 在(K,V)对的数据流上调用时,返回一个新的(K,V)对数据流,其中每个键的值使用给定的reduce进行聚合功能注释:默认情况下,使用Spark的默认并行任务数(2表示本地模式,在集群模式下,这个数字由config属性决定spark.default.parallelism)进行分组。可以传递可选的numTasks参数来设置不同的任务数。 |
| join(otherStream, [numTasks]) | 当在(K,V)和(K,W)对的两个数据流上调用时,返回一个新的(K,(V,W))对的数据流,其中包含每个键的所有元素对。 |
| cogroup(otherStream, [numTasks]) | 在(K,V)和(K,W)对的数据流上调用时,返回(K,Seq[V],Seq[W])元组的新数据流。 |
| transform(func) | 通过对源数据流的每个RDD应用RDD to RDD函数,返回新的数据流。这可用于在数据流上执行任意RDD操作。 |
| updateStateByKey(func) | 返回一个新的“state”数据流,在该数据流中,通过将给定函数应用于键的前一个状态和该键的新值来更新每个键的状态。这可以用来维护每个键的任意状态数据。 |
无状态转换:无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD
transform函数
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} object TransformDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Practice").setMaster("local[2]") val sctx = new StreamingContext(conf, Seconds(3)) val dstream: ReceiverInputDStream[String] = sctx.socketTextStream("hadoop101", 10000) val resultDStream = dstream.transform(rdd => { rdd.flatMap(_.split("\\W")).map((_, 1)).reduceByKey(_ + _) }) resultDStream.print sctx.start sctx.awaitTermination() } }
有状态转换
updateStateByKey函数:操作允许在使用新信息不断更新状态的同时能够保留他的状态.
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object StreamingWordCount2 { def main(args: Array[String]): Unit = { // 设置将来访问 hdfs 的使用的用户名, 否则会出现权限不够 System.setProperty("HADOOP_USER_NAME", "kgg") val conf = new SparkConf().setAppName("StreamingWordCount2").setMaster("local[*]") // 1. 创建SparkStreaming的入口对象: StreamingContext 参数2: 表示事件间隔 val ssc = new StreamingContext(conf, Seconds(5)) // 2. 创建一个DStream val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101", 9999) // 3. 一个个的单词 val words: DStream[String] = lines.flatMap(_.split("""\s+""")) // 4. 单词形成元组 val wordAndOne: DStream[(String, Int)] = words.map((_, 1)) // 开始 /* 1. 定义状态: 每个单词的个数就是我们需要更新的状态 2. 状态更新函数. 每个key(word)上使用一次更新新函数 参数1: 在当前阶段 一个新的key对应的value组成的序列 在我们这个案例中是: 1,1,1,1... 参数2: 上一个阶段 这个key对应的value */ def updateFunction(newValue: Seq[Int], runningCount: Option[Int]): Option[Int] = { // 新的总数和状态进行求和操作 val newCount: Int = (0 /: newValue) (_ + _) + runningCount.getOrElse(0) Some(newCount) } // 设置检查点: 使用updateStateByKey必须设置检查点 ssc.sparkContext.setCheckpointDir("hdfs://hadoop101:9000/checkpoint") val stateDS: DStream[(String, Int)] = wordAndOne.updateStateByKey[Int](updateFunction _) //结束 //6. 显示 stateDS.print //7. 启动流失任务开始计算 ssc.start() //8. 等待计算结束才推出主程序 ssc.awaitTermination() ssc.stop(false) } }window操作:允许执行转换操作作用在一个窗口内的数据。(所有窗口函数的窗口大小和步长都是批处理间隔的整数倍)
reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration)
val wordAndOne: DStream[(String, Int)] = words.map((_, 1)) /* 参数1: reduce 计算规则 参数2: 窗口长度 参数3: 窗口滑动步长. 每隔这么长时间计算一次. */ val count: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y,Seconds(15), Seconds(10))reduceByKeyAndWindow(reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration):比没有invReduceFunc参数的高效,会利用旧值进行计算
ssc.sparkContext.setCheckpointDir("hdfs://hadoop101:9000/checkpoint") val count: DStream[(String, Int)] =wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y,(x: Int, y: Int) => x - y,Seconds(15), Seconds(10))window(windowLength, slideInterval) :基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream
countByWindow(windowLength, slideInterval):返回一个滑动窗口计数流中的元素的个数
countByValueAndWindow(windowLength, slideInterval, [numTasks]):对**(K,V)对的DStream调用,返回(K,Long)对的新DStream**,其中每个key的的对象的v是其在滑动窗口中频率。如上,可配置reduce任务数量
五、DStream的输出
| Output Operation | Meaning |
|---|---|
| print() | 在运行流式应用程序的驱动程序节点上,打印数据流中每批数据的前十个元素。这对于开发和调试非常有用。Python API在Python API中称为pprint()。 |
| saveAsTextFiles(prefix, [suffix]) | 将此数据流的内容另存为文本文件。每个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS[.suffix]”。 |
| saveAsObjectFiles(prefix, [suffix]) | 将此数据流的内容另存为序列化Java对象的序列文件。每个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS[.suffix]”。Python API这在Python API中不可用。 |
| saveAsHadoopFiles(prefix, [suffix]) | 将此数据流的内容另存为Hadoop文件。每个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME_IN_MS[.suffix]”。Python API这在Python API中不可用。 |
| foreachRDD(func) | 对从流生成的每个RDD应用函数func的最通用的输出运算符。这个函数应该将每个RDD中的数据推送到外部系统,例如将RDD保存到文件中,或者通过网络将其写入数据库。请注意,函数func是在运行流式应用程序的驱动程序进程中执行的,它通常包含RDD操作,这将强制计算流式RDD。 |
注意:
- 连接不能写在driver层面(序列化)
- 如果写在foreach则每个RDD中的每一条数据都创建,得不偿失
- 增加foreachPartition,在分区创建(获取)
六、累加器和广播变量
和RDD中的累加器和广播变量的用法完全一样. RDD中怎么用, 这里就怎么用
七、DataFrame ans SQL Operations
val spark = SparkSession.builder.config(conf).getOrCreate()
import spark.implicits._
count.foreachRDD(rdd =>{
val df: DataFrame = rdd.toDF("word", "count")
df.createOrReplaceTempView("words")
spark.sql("select * from words").show
})
八、Caching / Persistence
- 和 RDDs 类似,DStreams 同样允许开发者将流数据保存在内存中。也就是说,在DStream 上使用 persist()方法将会自动把DStreams中的每个RDD保存在内存中
- 当DStream中的数据要被多次计算时,这个非常有用(如在同样数据上的多次操作)。对于像reduceByWindow和reduceByKeyAndWindow以及基于状态的**(updateStateByKey)**这种操作,保存是隐含默认的
- 因此,即使开发者没有调用persist(),由基于窗操作产生的DStreams会自动保存在内存中