SparkStreaming updateStateByKey和mapWithState

SparkStreaming常见算子示例

两者的差异

  • updateStateByKey
    • UpdateStateBykey会统计全局的key的状态,不管有没有数据输入,它会在每一个批次间隔返回之前的key的状态。updateStateBykey会对已存在的key进行state的状态更新,同时还会对每个新出现的key执行相同的更新函数操作。如果通过更新函数对state更新后返回来为none,此时刻key对应的state状态会删除(state可以是任意类型的数据结构)
    • updataeStateByKey返回在指定的批次间隔内返回之前的全部历史数据
  • mapWithState
    • mapWithState也是用于对于全局统计key的状态,但是它如果没有数据输入,便不会返回之前的key的状态,类型于增量的感觉。
    • mapWithState只返回变化后的key的值

总结

  • 统计历史数据使用updateStateByKey
  • 统计实时数据尽量使用mapWithState,mapWithState的性能要优于updateStateByKey

updateStateByKey

源码注释

/**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of each key.
   * In every batch the updateFunc will be called for each state even if there are no new values.
   */

适用场景
UpdataStateBykey可以用来统计历史数据,每次输出所有的key值。列如统计不同时间段用户平均消费金额,消费次数,消费总额,网站的不同时间段的返回量等指标。

注意
由于UpdataStateBykey统计的是全量数据,因此要求必须做checkpoint。每次计算新的batch都从上个checkpoint拿取数据,从而避免从头全部计算

代码


import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object update {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf: SparkConf = new SparkConf().setAppName("upKey").setMaster("local[2]")
    val sc = new StreamingContext(conf,Seconds(2))
    //使用hdfs保存之前的计算结果
    sc.checkpoint("hdfs://node01:8020/ck")
    //接受socket数据
    val socketTextStream: ReceiverInputDStream[String] = sc.socketTextStream("node01",9999)
    //处理当前批次的数据
    val resultOne: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    //处理当前和之前所有批次数据的累加
    val resykt: DStream[(String, Int)] = resultOne.updateStateByKey(updateFunc)

    resultOne.print()
    sc.start()
    sc.awaitTermination()

  }
  //currentValue:当前批次中每一个单词出现的所有的1
  //(hive,1)(hive,1)(hive,1)--->List(1,1,1)即Seq[Int]
  //historyValues:之前批次中每个单词出现的总次数,Option类型表示存在或者不存在。 Some表示存在有值,None表示没有并消除当前key
  def updateFunc(currentValue:Seq[Int],historyValues: Option[Int]):Option[Int]={
    val newValue: Int = currentValue.sum + historyValues.getOrElse(0)
    Some(newValue)
  }

}

在这里插入图片描述

在这里插入图片描述

mapWithState

源码注释

/**
 * Return a [[MapWithStateDStream]] by applying a function to every key-value element of
   * `this` stream, while maintaining some state data for each unique key. The mapping function
   * and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this
   * transformation can be specified using `StateSpec` class. The state data is accessible in
   * as a parameter of type `State` in the mapping function.
   */

适用场景
mapWithState可以用于一些实时性较高,延迟较少的一些场景,例如你在某宝上下单买了个东西,付款之后返回你账户里余额信息

代码

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream, ReceiverInputDStream}
import org.apache.spark.streaming._

object map {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("mapKey").setMaster("local[2]")
    val sc = new StreamingContext(conf,Seconds(2))
    val initRDD: RDD[(String, Int)] = sc.sparkContext.parallelize((List(("hello",10),("world",20))))
    sc.checkpoint("hdfs://node01:8020/ck")
    val socketTextStream: ReceiverInputDStream[String] = sc.socketTextStream("node01",9999)
    //处理当前批次数据
    val resultOne: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_,1))

    // A mapping function that maintains an integer state and return a String
    val stateSpec=StateSpec.function((time:Time,key:String,currentValue:Option[Int],historyState:State[Int])=>{

      //当前批次结果与历史批次的结果累加
      val sumValue: Int = currentValue.getOrElse(0)+ historyState.getOption().getOrElse(0)
      val output=(key,sumValue)

      // Use state.exists(), state.get(), state.update() and state.remove()
      // to manage state, and return the necessary string
      if(!historyState.isTimingOut()){
        historyState.update(sumValue)
      }

      Some(output)
      //给一个初始的结果initRDD
      //timeout: 当一个key超过这个时间没有接收到数据的时候,这个key以及对应的状态会被移除掉
    }).initialState(initRDD).timeout(Durations.seconds(5))

    //使用mapWithState方法,实现累加
    val result: MapWithStateDStream[String, Int, Int, (String, Int)] = resultOne.mapWithState(stateSpec)

    result.stateSnapshots().print()

    //todo: 6、开启流式计算
    sc.start()
    sc.awaitTermination()

  }
}

在这里插入图片描述


版权声明:本文为a3125504x原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。