Understanding Spark in Depth: transform, foreachRDD, updateStateByKey, and reduceByKeyAndWindow

package com.lyzx.day32

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

class T1 {
  /**
    * Transform Operation
    *
    * The transform operation (along with its variations like transformWith) allows arbitrary
    * RDD-to-RDD functions to be applied on a DStream.
    * It can be used to apply any RDD operation that is not exposed in the DStream API. For example,
    * the functionality of joining every batch in a data stream with another dataset is not directly exposed
    * in the DStream API. However, you can easily use transform to do this.
    * This enables very powerful possibilities.
    * For example, one can do real-time data cleaning by joining the input data stream with
    * precomputed spam information (maybe generated with Spark as well) and then filtering based on it.
    *
    * (In short: transform lets you apply arbitrary RDD operations, such as joining
    * each batch against a precomputed dataset, even though the DStream API does not
    * expose them directly.)
    *
    * Below is a simple blacklist filter.
    * A DStream is a sequence of RDDs; transform extracts each RDD so that ordinary
    * RDD operators can be applied to it.
    * transform is itself a transformation (evaluated lazily).
    */
  def f1(ssc:StreamingContext): Unit ={
    val line = ssc.socketTextStream("192.168.29.160",9999)

    val blackNameArr = Array("雄霸","绝无神","帝释天")
    val blackName = ssc.sparkContext.broadcast(blackNameArr)

    val newDStream = line.transform(rdd => {
      rdd.map(x => (x.split(" ")(1), x))
         .filter(x => { println("filter:" + x); !blackName.value.contains(x._1) })
    })

    newDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
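The per-batch logic passed to transform above is ordinary Scala, so it can be illustrated (and unit-tested) without a streaming context. The object name `BlacklistFilterSketch` and the sample data below are invented for this sketch; only the map/filter logic mirrors f1:

```scala
object BlacklistFilterSketch {
  // Same per-RDD logic as inside transform in f1: key each line by its second
  // space-separated token, then drop lines whose key is blacklisted.
  def filterBatch(lines: Seq[String], blacklist: Set[String]): Seq[(String, String)] =
    lines.map(l => (l.split(" ")(1), l))
         .filter { case (name, _) => !blacklist.contains(name) }

  def main(args: Array[String]): Unit = {
    val blacklist = Set("雄霸", "绝无神", "帝释天")
    val batch = Seq("001 雄霸 login", "002 聂风 login")
    // Only the non-blacklisted line survives.
    println(filterBatch(batch, blacklist))
  }
}
```

In the real job this function body is what runs on each micro-batch's RDD; broadcasting the blacklist (as f1 does) just avoids re-shipping it with every task.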


  /**
    * foreachRDD can be used much like transform, except that it is an
    * output operation: it runs eagerly on every batch and returns nothing.
    * @param ssc
    */
  def f2(ssc:StreamingContext): Unit ={
      val blackNameArr = Array("杨戬","哪吒")
      val blackName = ssc.sparkContext.broadcast(blackNameArr)

      val line = ssc.socketTextStream("192.168.29.160",9999)
      line.foreachRDD(rdd => {
        val r = rdd.map(x => (x.split(" ")(1), x))
                   .filter(x => { println("filter:" + x); !blackName.value.contains(x._1) })
        r.foreach(println)
      })

    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * updateStateByKey
    * @param ssc
    */
  def f3(ssc:StreamingContext): Unit = {
    ssc.checkpoint("D:\\1")
    val f = (values: Seq[Int], preValues: Option[Int]) => {
      /**
        * Before this function is called, flatMap and map have already turned the
        * input into (k, v) pairs, so this function is invoked once per key.
        * values:    Seq[Int]    the grouped values for this key in the current batch, e.g. CompactBuffer(1, 1, 1, 1, 1, 1)
        * preValues: Option[Int] the state of this key before this batch, e.g. Some(N)
        */
      val preValue = preValues.getOrElse(0)
      val currentValue = values.sum
      println("values:" + values + " currentValue:" + currentValue + " preValues:" + preValues + " preValue:" + preValue)
      Some(preValue + currentValue)
    }

    val line = ssc.socketTextStream("192.168.29.160", 9999)
    line.flatMap(_.split(" "))
        .map((_, 1))
        .updateStateByKey[Int](f)
        .print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * reduceByKeyAndWindow
    * a window operation
    * @param ssc
    */
  def f4(ssc:StreamingContext): Unit = {
    val line = ssc.socketTextStream("192.168.29.160", 9999)
    line.flatMap(_.split(" "))
        .map((_, 1))
        /**
          * reduceFunc:     the aggregation function/logic
          * windowDuration: window length
          * slideDuration:  slide interval of the window
          * numPartitions:  number of partitions
          * partitioner:    custom partitioner
          * Note: with the three-parameter overload you cannot write the function
          * as _ + _; the parameter types must be given explicitly.
          */
        // def reduceByKeyAndWindow(reduceFunc, windowDuration)
        // def reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
        // def reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions)
        // def reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner)
        .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(20), Seconds(30))
        .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

object T1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("day32").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    val t = new T1
//    t.f1(ssc)
//    t.f2(ssc)
//    t.f3(ssc)
    t.f4(ssc)
    ssc.stop(false)
  }
}
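To see what the update function in f3 computes without starting a streaming context, one can simulate two micro-batches in plain Scala. The object `StateUpdateSketch` and helper `step` are invented for this sketch; the real updateStateByKey additionally handles partitioning, checkpointing, and keys absent from a batch:

```scala
object StateUpdateSketch {
  // The same update function f3 passes to updateStateByKey, as a standalone value:
  // new running total = previous state (or 0) + sum of this batch's values.
  val update: (Seq[Int], Option[Int]) => Option[Int] =
    (values, prev) => Some(prev.getOrElse(0) + values.sum)

  // Apply the update function to every key of one batch against the running state,
  // mimicking one micro-batch of updateStateByKey (simplified: keys missing from
  // the batch simply keep their old state).
  def step(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
    val grouped = batch.groupBy(_._1).mapValues(_.map(_._2)).toMap
    state ++ grouped.map { case (k, vs) => k -> update(vs, state.get(k)).get }
  }

  def main(args: Array[String]): Unit = {
    val batch1 = Seq(("a", 1), ("a", 1), ("b", 1))
    val batch2 = Seq(("a", 1))
    val s1 = step(Map.empty, batch1) // a -> 2, b -> 1
    val s2 = step(s1, batch2)        // a -> 3, b -> 1 (b's state is carried forward)
    println(s2)
  }
}
```

This also shows why f3 must call ssc.checkpoint: the running state (s1, s2, ...) has to survive across batches, and Spark persists it via checkpointing.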

Copyright notice: this is an original article by lyzx_in_csdn, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original and this notice.