数据在网络中由采集到最终处理结束时会存在一条数据在某一个点被重复接受处理的。比如,kafka支持的是至少一次写语义。因此,当写数据到kafka的时候,有些记录可能重复。比如,如果kafka重复发一个消息,该消息一经被broker接收并写入文件但是并没有应答,这种就可能重复。由于kafka的此种写语义,structured streaming不能阻止该种类型数据重复。因此,一旦写入成功,可以假设查询输出是以最少一次语义写入kafka的。一个可行去除重复记录的解决方案是数据中引入一个primary(unique)key,这样就可以在读取数据的时候实行去重。
可以使用事件中的唯一标识符对数据流中的记录进行重复数据删除。这与使用唯一标识符列的静态重复数据删除完全相同。该查询将存储来自先前记录的一定量的数据,以便可以过滤重复的记录。与聚合类似,您可以使用带有或不带有watermark 的重复数据删除功能。
,带watermark:如果重复记录可能到达的时间有上限,则可以在事件时间列上定义watermark ,并使用guid和事件时间列进行重复数据删除。
,不带watermark:由于重复记录可能到达时间没有界限,所以查询将来自所有过去记录的数据存储为状态。
package bigdata.spark.StructuredStreaming.KafkaSourceOperator
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
object KafkaDropDuplicate {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
.set("yarn.resourcemanager.hostname", "mt-mdh.local")
.set("spark.executor.instances","2")
.set("spark.default.parallelism","4")
.set("spark.sql.shuffle.partitions","4")
.setJars(List("/Users/meitu/Desktop/sparkjar/bigdata.jar"
,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
,"/opt/jars/kafka-clients-0.10.2.2.jar"
,"/opt/jars/kafka_2.11-0.10.2.2.jar"
,"/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar"))
val spark = SparkSession
.builder
.appName("StructuredKafkaWordCount")
.config(sparkConf)
.getOrCreate()
import spark.implicits._
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","mt-mdh.local:9093")
.option("subscribe", "jsontest")
.load()
val words = df.selectExpr("CAST(value AS STRING)")
val fruit = words.select(
get_json_object($"value", "$.time").alias("timestamp").cast("long")
, get_json_object($"value", "$.fruit").alias("fruit"))
val fruitCast = fruit
.select(fruit("timestamp")
.cast("timestamp"),fruit("fruit"))
// .withWatermark("timestamp", "10 minutes")
.dropDuplicates("fruit")
.groupBy("fruit").count()
fruitCast.writeStream
.outputMode(OutputMode.Complete())
.format("console")
.trigger(Trigger.ProcessingTime(5000))
.option("truncate","false")
.start()
.awaitTermination()
}
}
版权声明:本文为qq_18522601原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。