package com.lyzx.day32
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
class T1 {
/**
 * Transform Operation
 *
 * The transform operation (along with its variations like transformWith) allows an
 * arbitrary RDD-to-RDD function to be applied on a DStream. It can be used to apply
 * any RDD operation that is not exposed in the DStream API — for example, joining
 * every batch of a data stream with another dataset. This enables powerful use cases
 * such as real-time data cleaning: join the input stream with precomputed spam
 * information and filter based on it.
 *
 * A DStream is a sequence of RDDs; transform pulls out each RDD so plain RDD
 * operators can be applied to it.
 *
 * Below: a simple blacklist filter implemented with transform.
 *
 * @param ssc the streaming context driving this job
 */
def f1(ssc: StreamingContext): Unit = {
  val input = ssc.socketTextStream("192.168.29.160", 9999)
  // Blacklisted names, broadcast once so every executor task shares one copy.
  val blocked = ssc.sparkContext.broadcast(Array("雄霸", "绝无神", "帝释天"))
  val filtered = input.transform { rdd =>
    // NOTE(review): assumes each line contains at least two space-separated tokens;
    // a single-token line would throw ArrayIndexOutOfBoundsException — confirm input format.
    val keyed = rdd.map(record => (record.split(" ")(1), record))
    keyed.filter { pair =>
      println("filter:" + pair)
      !blocked.value.contains(pair._1)
    }
    // .foreach(println)
  }
  filtered.print()
  ssc.start()
  ssc.awaitTermination()
}
/**
 * foreachRDD offers a Transform-like capability: it hands each batch's RDD to an
 * arbitrary function, here used to apply the same blacklist filter as [[f1]] and
 * print the surviving records.
 *
 * @param ssc the streaming context driving this job
 */
def f2(ssc: StreamingContext): Unit = {
  // Blacklisted names, broadcast once so every executor task shares one copy.
  val blocked = ssc.sparkContext.broadcast(Array("杨戬", "哪吒"))
  val input = ssc.socketTextStream("192.168.29.160", 9999)
  input.foreachRDD { rdd =>
    rdd
      .map(record => (record.split(" ")(1), record))
      .filter { pair =>
        println("filter:" + pair)
        !blocked.value.contains(pair._1)
      }
      .foreach(println)
  }
  ssc.start()
  ssc.awaitTermination()
}
/**
 * updateStateByKey: a stateful word count that accumulates counts across batches.
 *
 * @param ssc the streaming context; a checkpoint directory is mandatory for stateful ops
 */
def f3(ssc: StreamingContext): Unit = {
  // updateStateByKey requires checkpointing so per-key state survives between batches.
  // NOTE(review): Windows-specific path — parameterize for portability if needed.
  ssc.checkpoint("D:\\1")
  // Invoked once per key per batch. The input is already mapped to (word, 1) pairs below.
  //   values:    all values for this key in the current batch, e.g. CompactBuffer(1, 1, 1)
  //   preValues: this key's accumulated state from earlier batches (None on first sight)
  val updateFn = (values: Seq[Int], preValues: Option[Int]) => {
    val previous = preValues.getOrElse(0)
    val current = values.sum
    println("values:" + values + " currentValue:" + current + " preValues:" + preValues + " preValue:" + previous)
    Some(previous + current)
  }
  val line = ssc.socketTextStream("192.168.29.160", 9999)
  line.flatMap(_.split(" "))
    .map((_, 1))
    .updateStateByKey[Int](updateFn)
    .print()
  ssc.start()
  ssc.awaitTermination()
}

/**
 * reduceByKeyAndWindow: a windowed word count.
 *
 * @param ssc the streaming context driving this job
 */
def f4(ssc: StreamingContext): Unit = {
  val line = ssc.socketTextStream("192.168.29.160", 9999)
  line.flatMap(_.split(" "))
    .map((_, 1))
    // reduceFunc:     aggregation logic
    // windowDuration: window length
    // slideDuration:  how often the window slides
    // Both durations must be multiples of the batch interval (5s here).
    // Note: the multi-argument overloads need an explicit (a: Int, b: Int) => a + b;
    // the _ + _ shorthand does not resolve.
    // Other overloads:
    //   reduceByKeyAndWindow(reduceFunc, windowDuration)
    //   reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
    //   reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions)
    //   reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner)
    // FIX: original passed window=Seconds(20), slide=Seconds(30); a slide interval
    // longer than the window leaves 10s of every cycle outside all windows, so that
    // data was silently never counted. Window must be >= slide: 30s window, 20s slide.
    .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(20))
    .print()
  ssc.start()
  ssc.awaitTermination()
}
}

object T1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("day32").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val t = new T1
    // t.f1(ssc)
    // t.f2(ssc)
    // t.f3(ssc)
    t.f4(ssc)
    // NOTE(review): awaitTermination inside f4 blocks forever, so this stop(false)
    // is only reached if the context terminates abnormally.
    ssc.stop(false)
  }
}
// FIX: the following attribution was raw text fused after the closing brace in the
// original source (a copy-paste artifact), which is a compile error; preserved here
// as a comment:
// 版权声明:本文为lyzx_in_csdn原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。