Spark Streaming subscribed to multiple Kafka topics consumes only one partition of one topic

Code:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaTest {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.ERROR)
    val spark = SparkSession
      .builder()
      .appName("Spark Jason")
      .master("local[6]")
      .getOrCreate()
    val sc = spark.sparkContext
    val scc = new StreamingContext(sc, Seconds(2))
    // Consumer configuration; "xxx:9092" is a placeholder for the broker address
    val kafkaParams = Map[String, Object](
      "auto.offset.reset" -> "latest",
      "value.deserializer" -> classOf[StringDeserializer],
      "key.deserializer" -> classOf[StringDeserializer],
      "bootstrap.servers" -> "xxx:9092",
      "group.id" -> "aaaa",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    // Subscribe to both topics with a single direct stream
    val topics = Array("fact", "drools")
    val stream = KafkaUtils.createDirectStream[String, String](
      scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    // Print the value of every record in each micro-batch
    stream.foreachRDD { rdd =>
      rdd.foreach(line => println(line.value()))
    }

    scc.start()
    scc.awaitTermination()
  }

}

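Symptom:

The code subscribes to two topics, but every run consumes data from only one partition of one of them.
Checking the consumer group shows that the other topic keeps piling up and is never consumed.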
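
To confirm from inside the job which topic-partitions each batch actually reads, the offset ranges of every micro-batch can be printed. The following is only a sketch: `OffsetRangeLogger` and `logOffsetRanges` are hypothetical names that are not part of the original code, and it assumes the stream is the one returned directly by `KafkaUtils.createDirectStream`.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.HasOffsetRanges

// Hypothetical helper, not part of the original job
object OffsetRangeLogger {

  // Prints, for each micro-batch, the topic-partitions that were read
  // and how many records each one contributed.
  def logOffsetRanges(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
    stream.foreachRDD { rdd =>
      // The cast is only valid on the RDD produced directly by the direct
      // stream, before any transformation is applied to it.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { r =>
        println(s"topic=${r.topic} partition=${r.partition} " +
          s"from=${r.fromOffset} until=${r.untilOffset} count=${r.count()}")
      }
    }
  }
}
```

Calling `OffsetRangeLogger.logOffsetRanges(stream)` before `scc.start()` should make the symptom visible: partitions that are not being consumed would either be missing from the output or keep showing a count of 0.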

Solution:

Checking the pom file shows that the kafka-clients dependency is at version 0.10.1.0:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.0</version>
            <scope>${mvn.scop.pro}</scope>
        </dependency>

Changing the version fixes the problem (either upgrading, or downgrading to 0.10.0.1, works):

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.1</version>
            <scope>${mvn.scop.pro}</scope>
        </dependency>

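Further reading shows that 0.10.1.0 is not the only affected version: 0.10.1.1 and 0.10.0.2 have the same problem.
Moreover, the problem only appears in local mode; in cluster and client mode the topics are consumed normally.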
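
Because the Spark Kafka integration also brings in kafka-clients as a transitive dependency, the version that ends up on the classpath may not be the one declared in the pom. A quick way to check is to print the version the client library reports about itself. This is a minimal sketch: `KafkaClientVersionCheck` is a hypothetical object name, and it assumes it is run with the same classpath as the streaming job.

```scala
import org.apache.kafka.common.utils.AppInfoParser

// Hypothetical standalone check, not part of the original job
object KafkaClientVersionCheck {
  def main(args: Array[String]): Unit = {
    // AppInfoParser reads the version bundled inside the kafka-clients jar
    println(s"kafka-clients version on classpath: ${AppInfoParser.getVersion}")
  }
}
```

If the printed version is still one of the affected ones after editing the pom, another dependency is probably overriding it.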

Reference: https://issues.apache.org/jira/browse/KAFKA-4547


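Copyright notice: This is an original article by Mr_ye931, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.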