我们知道,kafka数据分区与spark数据分区一一对应,如果kafka数据倾斜,势必造成spark数据倾斜,在spark-streming源码类
DirectKafkaInputDStream中,compute方法中有对每批次数据的任务切分,在这里修改一下源码,限制生成的每个task消费的数据量,也就是说,将kafka中数据量偏多的分区的数据切分成多个task进行消费,实现逻辑如下:主要的改动就是增加一个参数,控制每个task消费的最大的消息条数,如果超过则切分成多个任务。不过修改后有一个问题,就是由于kafka consumer多线程下消费有问题,因此需要关闭spark-streaming的consumer缓存功能。
val offsetRanges = untilOffsets.flatMap { case (tp, uo) =>
var currentFo = currentOffsets(tp)
var currentUo = currentFo + maxMessagePerTask
val offsetRangeArray = new ju.LinkedList[OffsetRange]()
while (uo > currentUo) {
val or = OffsetRange(tp.topic(), tp.partition(), currentFo, currentUo)
offsetRangeArray.add(or)
currentFo = currentUo
currentUo = currentFo + maxMessagePerTask
}
val or = OffsetRange(tp.topic, tp.partition, currentFo, uo)
offsetRangeArray.add(or)
offsetRangeArray.asScala
}版权声明:本文为TNT_wang原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。