Spark Streaming 集成 Kafka 详解

概述

Spark Streaming 支持多种输入源数据的读取,其中基本数据源有:File System、Socket connections;而高级数据源有:Kafka、Flume、Kinesis等。但是高级数据源需要额外依赖,而且不能在 Spark Shell 中测试这些高级数据源,如果想要在Spark Shell 中测试需要下载依赖到Spark 依赖库中。

关于读取Kafka 的方式,Spark Streaming 官方提供了两种方式:Receiver 和 Direct,此两种读取方式存在很大的不同,当然也各有优劣,接下来就让我们具体刨解这两种数据读取方式。此外,因为Kafka 在0.8 和0.10 之间引入了一个新的消费者API,因此有两个独立的Spark Streaming 包可用,根据功能来选择合适的包,0.8 版本兼容后来的0.9 和0.10,但是0.10 不向前兼容。官网地址:http://spark.apache.org/docs/2.1.2/streaming-kafka-integration.html

 

Receiver-based Approach

Spark 官方最先提供了基于 Receiver 的Kafka 数据消费模式。但会存在程序失败丢失数据的风险,在Spark 1.2 时引入了一个配置参数 spark.streaming.receiver.writeAheadLog.enable (WAL)以规避此风险。

 

Receiver-based 读取方式

Receiver-based 的Kafka 读取方式是基于Kafka 高阶API 来实现对Kafka 数据的消费。在提交Spark Streaming 任务后,Spark 集群会划出指定的Receivers 来专门、持续不断、异步读取Kafka 数据,读取时间间隔以及每次去读offsets 范围可以由参数来配置。读取的数据保存在Receiver 中,具体StorageLevel 方式由用户指定,入MEMORY_ONLY等。当driver 触发Batch 任务的时候,Receivers 中的数据会转移到剩余的Executors 中去执行。在执行完后,Receivers 会相应更新Zookeeper 的offsets。如要确保at least once 的读取方式,可以设置spark.streaming.receiver.writeAheadLog.enable为true。集体Receiver 执行流程如下图:

 

Receiver-based 读取实现

Kafka 的high-level 数据读取方式让用户可以专注于所读数据,而不用关注或维护consumer 的offsets,这减少用户的工作量以及代码量而且相对比较简单。因此,在刚开始引入Spark Streaming 计算引擎时,我们优先考虑采用此种方式来读取数据,具体代码如下:

# 添加依赖

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.1.2
 
# 调用方法
import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]) 

# 注意
1. Kafka中的主题分区与Spark Streaming中生成的RDD分区无关。因此,增加KafkaUtils.createStream()中特定于主题的分区的数量只会增加使用单个接收器中使用的主题的线程数。它不会增加Spark在处理数据时的并行性。
2. 可以使用不同的组和主题创建多个Kafka输入DStream,以使用多个接收器并行接收数据。
3. 如果开启了WAL,可以使用KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

# 部署
对于Scala和Java应用程序,如果您使用SBT或Maven进行项目管理,则将spark-streaming-kafka-0-8_2.11及其依赖项打包到应用程序JAR中。确保spark-core_2.11和spark-streaming_2.11被标记为provider的依赖项,因为它们已存在于Spark安装中。然后使用spark-submit启动您的应用程序.
./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.2 ...
或者
you can also download the JAR of the Maven artifact spark-streaming-kafka-0-8-assembly from the Maven repository and add it to spark-submit with --jars.    
 
# 参考资料
官方地址:http://spark.apache.org/docs/2.1.2/streaming-kafka-0-8-integration.html

 

Receiver-based 读取问题

为了防数据丢失,我们做了checkpoint 操作以及配置了spark.streaming.receiver.writeAheadLog.enable 参数,提高了receiver 的吞吐量。采用MEMORY_AND_DISK_SER 方式读取数据、提高单Receiver 的内存或事调大并行度,将数据分散到多个Receiver 中去,但是也是会出现各种情况的问题:

  • 配置spark.streaming.receiver.writeAheadLog.enable参数,每次处理之前需要将该batch内的日志备份到checkpoint目录中,这降低了数据处理效率,反过来又加重了Receiver端的压力;另外由于数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。
  • 采用MEMORY_AND_DISK_SER降低对内存的要求。但是在一定程度上影响计算的速度。
  • 单Receiver内存。由于receiver也是属于Executor的一部分,那么为了提高吞吐量,提高Receiver的内存。但是在每次batch计算中,参与计算的batch并不会使用到这么多的内存,导致资源严重浪费。
  • 提高并行度,采用多个Receiver来保存Kafka的数据。Receiver读取数据是异步的,并不参与计算。如果开较高的并行度来平衡吞吐量很不划算。
  • Receiver和计算的Executor的异步的,那么遇到网络等因素原因,导致计算出现延迟,计算队列一直在增加,而Receiver则在一直接收数据,这非常容易导致程序崩溃。
  • 在程序失败恢复时,有可能出现数据部分落地,但是程序失败,未更新offsets的情况,这导致数据重复消费。

 

Direct Approach(No Receivers)

区别与Receiver-based 的数据消费方法,Spark 官方在 1.3 时引入了Direct 方式的Kafka 消费方式。相对于Receiver-based 的方法,Direct 方式具有一下的优势:

  • 简化并行性
    • 无需创建多个输入Kafka流并将它们联合起来。使用directStream,Spark Streaming将创建与要使用的Kafka分区一样多的RDD分区,这些分区将并行地从Kafka读取数据。因此,Kafka和RDD分区之间存在一对一的映射,这更容易理解和调整。
  • 高效
    • 在第一种方法中实现零数据丢失需要将数据存储在Write Ahead Log中,这进一步复制了数据。这实际上是低效的,因为数据有效地被复制两次 - 一次由Kafka复制,第二次由Write Ahead Log复制。第二种方法消除了问题,因为没有接收器,因此不需要Write Ahead Logs。只要您有足够的Kafka保留,就可以从Kafka恢复消息。
  • 强一致性
    • 第一种方法使用Kafka的高级API在Zookeeper中存储消耗的偏移量。传统上,这是从Kafka使用数据的方式。虽然这种方法(与预写日志结合使用)可以确保零数据丢失(即至少一次语义),但某些记录在某些故障下可能会被消耗两次的可能性很小。这是因为Spark Streaming可靠接收的数据与Zookeeper跟踪的偏移之间存在不一致。因此,在第二种方法中,我们使用不使用Zookeeper的简单Kafka API。 Spark Streaming在其检查点内跟踪偏移量。这消除了Spark Streaming和Zookeeper / Kafka之间的不一致,因此尽管出现故障,Spark Streaming也会有效地接收每条记录一次。为了实现输出结果的一次性语义,将数据保存到外部数据存储的输出操作必须是幂等的,或者是保存结果和偏移的原子事务。

 

Direct 读取方式

Direct 方式采用Kafka 简单的 consumer API 方式来读取数据,无需经由Zookeeper,此种方式不再需要专门的Receiver 来持续不断的读取数据。当Batch 任务触发时,由Executor 读取数据,并参与到其他的Executor 的数据计算过程中去。driver 来决定读取多少offsets,并将offsets 交由checkpoint 来维护。当触发下次Batch 任务时,再由executor 读取Kafka 数据并计算。从此过程我们可以发现Direct 方式无需Receiver 读取数据,而是需要计算时在读取数据,所以Direct 方式的数据消费对内存的要求不高,只需要考虑批量计算所需要的内存即可;另外Batch 任务堆积时,也不会影响数据堆积。其具体读取方式如图:

 

Direct 去读实现

# 引入依赖

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.11
version = 2.1.2

# 调用方法

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

# 参考地址:
官方地址:http://spark.apache.org/docs/2.1.2/streaming-kafka-0-10-integration.html

 

Direct 读取问题

Direct 方式与Receiver-based 方式相比,具有以下优势:

  • 降低资源
    • Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。 降低内存。Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。
  • 鲁棒性更好
    • Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。
  • 加大了开发成本
    • Direct需要用户采用checkpoint或者第三方存储来维护offsets,而不像Receiver-based那样,通过ZooKeeper来维护Offsets,此提高了用户的开发成本。
  • 不方便可视化监控
    • Receiver-based方式指定topic指定consumer的消费情况均能通过ZooKeeper来监控,而Direct则没有这种便利,如果做到监控并可视化,则需要投入人力开发

 

kafka offsets 管理方式

Spark Streaming 集成了 Kafka 允许用户从 Kafka 中读取一个或多个 topic 的数据。一个 Kafka topic 包含多个存储消息的分区(partition)。每个分区中的消息都是顺序存储,并且用 offset 来标记消息。开发者可以在 Spark Streaming 应用中通过offset 来控制数据的读取位置,此时就需要好的 offset 的管理机制。

Offsets 管理对于保证流式应用在整个生命周期中数据的连贯性式非常有益的。比如,在应用停止或者报错推出之前没有将 offset 保存在持久化数据库中,那么 offset rangges 就会丢失。进一步说,如果没有保存每个分区已经读取的 offset,那么 Spark Streaming 就没有办法从上次断开的位置继续读取消息。

上图描述通常的 Spark Streaming 应用管理 offset 流程。Offsets 可以通过多种方式来管理,但是一般都遵循下面的步骤:

  • 在 Direct DStream 初始化的说话,需要指定一个包含每个 topic 的每个分区的 offset 用于让 Direct DStream 从指定位置读取数据
  • 读取并处理数据
  • 处理完之后存储结果数据
  • 最后,将 offsets 保存在外部持久化数据库如 HBase,Kafka,HDFS,Zookeeper等

 

Spark Streming checkpoints

使用 Spark Streaming 的 checkpoint 是最简单的存储方式,并且在 Spark 框架中很容易实现。Spark Streaming checkpoint 就是为了保存应用状态而设计的,我们将路径设置在 HDFS 上,所以能够从失败中恢复数据。

对 Kafka Stream 执行 checkpoint 操作使得 offset 保存在 checkpoint 中,如果是应用挂掉的话,那么 Spark Streaming 应用功能可以从保存的 offset 中开始读取消息。但是,如果对 Spark Streaming 应用进行升级的话,不能 checkpoint 的数据没有使用,所以这种机制并不可靠,因此我们不推荐这种方式。

 

将 offsets 存储在 HBase 中

HBase 可以作为一个可靠的外部数据库来持久化 offsets。通过将 offsets 存储在外部系统中,Spark Streaming 应用功能能够重读或者回放任何仍然存储在 Kafka 中的数据。

根据 HBase 的设计模式,允许应用能够以 rowkey 和 column 的结构将过个 Spark Streaming 应用和多个 Kafka topic 存放在一张表格中。在这个例子中,表格以 topic 名称、消费者 group ID 和 Spark Streaming 的 batchTime.milliSeconds 作为 rowkey 以做唯一标示。尽管 batchTime.milliSeconds 不是必须的,但是它能够更好的展示历史的每批次的 offsets。表格将存储30天的积累数据,如果超出30天则会被移除。下面是创建表格的 DDL 和结构

DDL
create 'stream_kafka_offsets', {NAME=>'offsets', TTL=>2592000}

RowKey Layout:
row:              <TOPIC_NAME>:<GROUP_ID>:<EPOCH_BATCHTIME_MS>
column family:    offsets
qualifier:        <PARTITION_ID>
value:            <OFFSET_ID>

 

具体代码实现如下:

import kafka.utils.ZkUtils
import org.apache.hadoop.hbase.filter.PrefixFilter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Scan, Put, ConnectionFactory}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010.{OffsetRange, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}



/**
 * Created by gmedasani on 6/10/17.
 */
object KafkaOffsetsBlogStreamingDriver {

  def main(args: Array[String]) {

    if (args.length < 6) {
      System.err.println("Usage: KafkaDirectStreamTest <batch-duration-in-seconds> <kafka-bootstrap-servers> " +
        "<kafka-topics> <kafka-consumer-group-id> <hbase-table-name> <kafka-zookeeper-quorum>")
      System.exit(1)
    }

    val batchDuration = args(0)
    val bootstrapServers = args(1).toString
    val topicsSet = args(2).toString.split(",").toSet
    val consumerGroupID = args(3)
    val hbaseTableName = args(4)
    val zkQuorum = args(5)
    val zkKafkaRootDir = "kafka"
    val zkSessionTimeOut = 10000
    val zkConnectionTimeOut = 10000

    val sparkConf = new SparkConf().setAppName("Kafka-Offset-Management-Blog")
                                  .setMaster("local[4]")//Uncomment this line to test while developing on a workstation
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
    val topics = topicsSet.toArray
    val topic = topics(0)

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> bootstrapServers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> consumerGroupID,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    /*
    Create a dummy process that simply returns the message as is.
     */
    def processMessage(message:ConsumerRecord[String,String]):ConsumerRecord[String,String]={
      message
    }

    /*
    Save Offsets into HBase
     */
    def saveOffsets(TOPIC_NAME:String,GROUP_ID:String,offsetRanges:Array[OffsetRange],hbaseTableName:String,
                    batchTime: org.apache.spark.streaming.Time) ={
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.addResource("src/main/resources/hbase-site.xml")
      val conn = ConnectionFactory.createConnection(hbaseConf)
      val table = conn.getTable(TableName.valueOf(hbaseTableName))
      val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(batchTime.milliseconds)
      val put = new Put(rowKey.getBytes)
      for(offset <- offsetRanges){
        put.addColumn(Bytes.toBytes("offsets"),Bytes.toBytes(offset.partition.toString),
          Bytes.toBytes(offset.untilOffset.toString))
      }
      table.put(put)
      conn.close()
    }

    /*
    Returns last committed offsets for all the partitions of a given topic from HBase in following cases.
      - CASE 1: SparkStreaming job is started for the first time. This function gets the number of topic partitions from
        Zookeeper and for each partition returns the last committed offset as 0
      - CASE 2: SparkStreaming is restarted and there are no changes to the number of partitions in a topic. Last
        committed offsets for each topic-partition is returned as is from HBase.
      - CASE 3: SparkStreaming is restarted and the number of partitions in a topic increased. For old partitions, last
        committed offsets for each topic-partition is returned as is from HBase as is. For newly added partitions,
        function returns last committed offsets as 0
     */
    def getLastCommittedOffsets(TOPIC_NAME:String,GROUP_ID:String,hbaseTableName:String,zkQuorum:String,
                                zkRootDir:String, sessionTimeout:Int,connectionTimeOut:Int):Map[TopicPartition,Long] ={

      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.addResource("src/main/resources/hbase-site.xml")
      val zkUrl = zkQuorum+"/"+zkRootDir
      val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl,sessionTimeout,connectionTimeOut)
      val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2,false)
      val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME)).get(TOPIC_NAME).toList.head.size

      //Connect to HBase to retrieve last committed offsets
      val conn = ConnectionFactory.createConnection(hbaseConf)
      val table = conn.getTable(TableName.valueOf(hbaseTableName))
      val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(System.currentTimeMillis())
      val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
      val scan = new Scan()
      val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(stopRow.getBytes).setReversed(true))
      val result = scanner.next()

      var hbaseNumberOfPartitionsForTopic = 0 //Set the number of partitions discovered for a topic in HBase to 0
      if (result != null){
        //If the result from hbase scanner is not null, set number of partitions from hbase to the number of cells
        hbaseNumberOfPartitionsForTopic = result.listCells().size()
      }

      val fromOffsets = collection.mutable.Map[TopicPartition,Long]()

      if(hbaseNumberOfPartitionsForTopic == 0){
        // initialize fromOffsets to beginning
          for (partition <- 0 to zKNumberOfPartitionsForTopic-1){
            fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> 0)}
      } else if(zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic){
        // handle scenario where new partitions have been added to existing kafka topic
          for (partition <- 0 to hbaseNumberOfPartitionsForTopic-1){
            val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),Bytes.toBytes(partition.toString)))
            fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> fromOffset.toLong)}
          for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic-1){
            fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> 0)}
      } else {
        //initialize fromOffsets from last run
          for (partition <- 0 to hbaseNumberOfPartitionsForTopic-1 ){
            val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),Bytes.toBytes(partition.toString)))
            fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> fromOffset.toLong)}
      }
      scanner.close()
      conn.close()
      fromOffsets.toMap
    }


    val fromOffsets= getLastCommittedOffsets(topic,consumerGroupID,hbaseTableName,zkQuorum,zkKafkaRootDir,
      zkSessionTimeOut,zkConnectionTimeOut)
    val inputDStream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Assign[String, String](
      fromOffsets.keys,kafkaParams,fromOffsets))

    /*
      For each RDD in a DStream apply a map transformation that processes the message.
    */
    inputDStream.foreachRDD((rdd,batchTime) => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(offset => println(offset.topic, offset.partition, offset.fromOffset,offset.untilOffset))
      val newRDD = rdd.map(message => processMessage(message))
      newRDD.count()
      saveOffsets(topic,consumerGroupID,offsetRanges,hbaseTableName,batchTime) //save the offsets to HBase
    })

    println("Number of messages processed " + inputDStream.count())
    ssc.start()
    ssc.awaitTermination()
  }
}

 

将 offsets 存储到 Zookeeper 中

在 SparkStreaming 连接 Kafka 应用中使用 Zookeeper 来存储 offsets 也是一种比较可靠的方式。

在这个方案中,Spark Streaming 任务在启动时回去 Zookeeper 中读取每个分区的 offsets。如果有新的分区出现,那么他的 offset 将会设置在最开始的位置。在每批数据处理完之后,用户需要存储已处理数据的一个 offset 。此外,新消费者将使用跟旧的 Kafka 消费者 API 一样的格式将 offset 保存在 Zookeeper 中。因此,任何追踪或监控 Zookeeper 中 Kafka offset 的工具仍然生效。

package com.baidu.hec.zk

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils}
import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import scala.util.Try

/**
  * Kafka的连接和Offset管理工具类
  *
  * @param zkHosts     Zookeeper地址
  * @param kafkaParams Kafka启动参数
  * @author Leibniz
  */
class KafkaManager(zkHosts: String, kafkaParams: Map[String, Object]) extends Serializable {

  //Logback日志对象,使用slf4j框架
  @transient private lazy val log = LoggerFactory.getLogger(getClass)

  //建立ZkUtils对象所需的参数
  val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(zkHosts, 3000, 3000)

  //ZkUtils对象,用于访问Zookeeper
  val zkUtils = new ZkUtils(zkClient, zkConnection, false)

  /**
    * 包装createDirectStream方法,支持Kafka Offset,用于创建Kafka Streaming流
    *
    * @param ssc    Spark Streaming Context
    * @param topics Kafka话题
    * @tparam K Kafka消息Key类型
    * @tparam V Kafka消息Value类型
    * @return Kafka Streaming流
    * @author Leibniz
    */
  def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] = {
    val groupId = kafkaParams("group.id").toString
    val storedOffsets = readOffsets(topics, groupId)
    log.info("Kafka消息偏移量汇总(格式:(话题,分区号,偏移量)):{}", storedOffsets.map(off => (off._1.topic, off._1.partition(), off._2)))
    val kafkaStream = KafkaUtils.createDirectStream[K, V](ssc, PreferConsistent, ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, storedOffsets))
    kafkaStream
  }

  /**
    * 从Zookeeper读取Kafka消息队列的Offset
    *
    * @param topics  Kafka话题
    * @param groupId Kafka Group ID
    * @return 返回一个Map[TopicPartition, Long],记录每个话题每个Partition上的offset,如果还没消费,则offset为0
    * @author Leibniz
    */
  def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {
    val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
    val partitionMap = zkUtils.getPartitionsForTopics(topics)

    // /consumers/<groupId>/offsets/<topic>/
    partitionMap.foreach(topicPartitions => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)

      topicPartitions._2.foreach(partition => {
        val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition

        val tryGetKafkaOffset = Try {
          val offsetStatTuple = zkUtils.readData(offsetPath)
          if (offsetStatTuple != null) {
            log.info("查询Kafka消息偏移量详情: 话题:{}, 分区:{}, 偏移量:{}, ZK节点路径:{}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)
            topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong)
          }
        }

        if(tryGetKafkaOffset.isFailure){
          //http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
          val consumer = new KafkaConsumer[String, Object](kafkaParams)
          val partitionList = List(new TopicPartition(topicPartitions._1, partition))
          consumer.assign(partitionList)
          val minAvailableOffset = consumer.beginningOffsets(partitionList).values.head
          consumer.close()
          log.warn("查询Kafka消息偏移量详情: 没有上一次的ZK节点:{}, 话题:{}, 分区:{}, ZK节点路径:{}, 使用最小可用偏移量:{}", Seq[AnyRef](tryGetKafkaOffset.failed.get.getMessage, topicPartitions._1, partition.toString, offsetPath, minAvailableOffset): _*)
          topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), minAvailableOffset)
        }
      })
    })

    topicPartOffsetMap.toMap
  }

  /**
    * 保存Kafka消息队列消费的Offset
    *
    * @param rdd            SparkStreaming的Kafka RDD,RDD[ConsumerRecord[K, V]]
    * @param storeEndOffset true=保存结束offset, false=保存起始offset
    * @author Leibniz
    */
  def persistOffsets[K, V](rdd: RDD[ConsumerRecord[K, V]], storeEndOffset: Boolean = true): Unit = {
    val groupId = kafkaParams("group.id").toString
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    offsetsList.foreach(or => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic)
      val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
      val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
      zkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition, offsetVal + "" /*, JavaConversions.bufferAsJavaList(acls)*/)
      log.info("保存Kafka消息偏移量详情: 话题:{}, 分区:{}, 偏移量:{}, ZK节点路径:{}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
    })
  }
}

 

package com.baidu.hec.zk

import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Test {

  def main(args: Array[String]): Unit = {
    val zkHosts = "0.0.0.0:1181"

    val kafkaParams = Map[String, Object](
      "auto.offset.reset" -> "latest",
      "bootstrap.servers" -> "0.0.0.0:9092",
      "group.id" -> "test_topic_group",
      "enable.auto.commit" -> (false: java.lang.Boolean), //禁用自动提交Offset,否则可能没正常消费完就提交了,造成数据错误
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer])

    val km = new KafkaManager(zkHosts, kafkaParams)

    val sparkConf = new SparkConf().setAppName("SearchContentAnalysis").setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(60))

    val topics = Array("test-topic")

    val stream: InputDStream[ConsumerRecord[String, String]] = km.createDirectStream(ssc, topics)

    stream.foreachRDD(rdd => {
      val message = rdd.map(recode => recode.value())

      if(!message.isEmpty()) {

        val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
        import spark.implicits._

        val dataFrame = message.map(data =>{
          val jsonData = JSON.parseObject(data)
          val content = jsonData.getString("content")
          val userId = jsonData.getLong("userId")
          val createTime = jsonData.getLong("createTime")
          Record(content, userId, createTime)
        }).toDF()

        // val dataFrame = message.toDF()
        dataFrame.createOrReplaceTempView("temp_search_table")
        spark.sql("select * from temp_search_table").show(100)

        km.persistOffsets(rdd)
        printf("******************************************")

      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  case class Record(content: String, userId: Long, createTime: Long)

  object SparkSessionSingleton {

    @transient private var instance: SparkSession = _

    def getInstance(sparkConf: SparkConf): SparkSession = {
      if (instance == null) {
        instance = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
      }
      instance
    }
  }

}

 

Kafka 本身

Apache Spark 2.1.x以及spark-streaming-kafka-0-10使用新的的消费者API即异步提交API。你可以在你确保你处理后的数据已经妥善保存之后使用commitAsync API(异步提交 API)来向Kafka提交offsets。新的消费者API会以消费者组id作为唯一标识来提交offsets。

将 offsets 提交到 Kafka 中:

stream.foreachRDD { rdd =>

  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed

  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

}

 

但是该中方式在测试中,无法根据 offset 来消费消息。

参考:https://juejin.im/entry/5acd7224f265da237c693f7d


版权声明:本文为pizicai007原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。