SparkStreaming常见算子示例
两者的差异
- updateStateByKey:
- UpdateStateBykey会统计全局的key的状态,不管有没有数据输入,它会在每一个批次间隔返回之前的key的状态。updateStateBykey会对已存在的key进行state的状态更新,同时还会对每个新出现的key执行相同的更新函数操作。如果通过更新函数对state更新后返回来为none,此时刻key对应的state状态会删除(state可以是任意类型的数据结构)
- updataeStateByKey返回在指定的批次间隔内返回之前的全部历史数据
- mapWithState:
- mapWithState也是用于对于全局统计key的状态,但是它如果没有数据输入,便不会返回之前的key的状态,类型于增量的感觉。
- mapWithState只返回变化后的key的值
总结
- 统计历史数据使用updateStateByKey
- 统计实时数据尽量使用mapWithState,
mapWithState
的性能要优于updateStateByKey
updateStateByKey
源码注释
/**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* In every batch the updateFunc will be called for each state even if there are no new values.
*/
适用场景:
UpdataStateBykey可以用来统计历史数据
,每次输出所有的key值。列如统计不同时间段用户平均消费金额,消费次数,消费总额,网站的不同时间段的返回量等指标。
注意
由于UpdataStateBykey
统计的是全量数据,因此要求必须做checkpoint
。每次计算新的batch都从上个checkpoint
拿取数据,从而避免从头全部计算
代码
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object update {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf: SparkConf = new SparkConf().setAppName("upKey").setMaster("local[2]")
val sc = new StreamingContext(conf,Seconds(2))
//使用hdfs保存之前的计算结果
sc.checkpoint("hdfs://node01:8020/ck")
//接受socket数据
val socketTextStream: ReceiverInputDStream[String] = sc.socketTextStream("node01",9999)
//处理当前批次的数据
val resultOne: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//处理当前和之前所有批次数据的累加
val resykt: DStream[(String, Int)] = resultOne.updateStateByKey(updateFunc)
resultOne.print()
sc.start()
sc.awaitTermination()
}
//currentValue:当前批次中每一个单词出现的所有的1
//(hive,1)(hive,1)(hive,1)--->List(1,1,1)即Seq[Int]
//historyValues:之前批次中每个单词出现的总次数,Option类型表示存在或者不存在。 Some表示存在有值,None表示没有并消除当前key
def updateFunc(currentValue:Seq[Int],historyValues: Option[Int]):Option[Int]={
val newValue: Int = currentValue.sum + historyValues.getOrElse(0)
Some(newValue)
}
}
mapWithState
源码注释
/**
* Return a [[MapWithStateDStream]] by applying a function to every key-value element of
* `this` stream, while maintaining some state data for each unique key. The mapping function
* and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this
* transformation can be specified using `StateSpec` class. The state data is accessible in
* as a parameter of type `State` in the mapping function.
*/
适用场景
mapWithState可以用于一些实时性较高
,延迟较少的一些场景,例如你在某宝上下单买了个东西,付款之后返回你账户里余额信息
代码
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream, ReceiverInputDStream}
import org.apache.spark.streaming._
object map {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("mapKey").setMaster("local[2]")
val sc = new StreamingContext(conf,Seconds(2))
val initRDD: RDD[(String, Int)] = sc.sparkContext.parallelize((List(("hello",10),("world",20))))
sc.checkpoint("hdfs://node01:8020/ck")
val socketTextStream: ReceiverInputDStream[String] = sc.socketTextStream("node01",9999)
//处理当前批次数据
val resultOne: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_,1))
// A mapping function that maintains an integer state and return a String
val stateSpec=StateSpec.function((time:Time,key:String,currentValue:Option[Int],historyState:State[Int])=>{
//当前批次结果与历史批次的结果累加
val sumValue: Int = currentValue.getOrElse(0)+ historyState.getOption().getOrElse(0)
val output=(key,sumValue)
// Use state.exists(), state.get(), state.update() and state.remove()
// to manage state, and return the necessary string
if(!historyState.isTimingOut()){
historyState.update(sumValue)
}
Some(output)
//给一个初始的结果initRDD
//timeout: 当一个key超过这个时间没有接收到数据的时候,这个key以及对应的状态会被移除掉
}).initialState(initRDD).timeout(Durations.seconds(5))
//使用mapWithState方法,实现累加
val result: MapWithStateDStream[String, Int, Int, (String, Int)] = resultOne.mapWithState(stateSpec)
result.stateSnapshots().print()
//todo: 6、开启流式计算
sc.start()
sc.awaitTermination()
}
}
版权声明:本文为a3125504x原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。