KafkaConsumer

KafkaConsumer

消费者群组和分区再平衡

一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将有群组里的其他消费者来读取。在主题发生变化时,比如加入了新的分区,也将发生重新分配。

分区的所有权从一个消费者转移到另一个消费者,称为再均衡。再均衡期间,消费者无法读取消息,造成整个群组一小段时间不可用。当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,有可能还需要去刷新缓存。这样会拖慢整个应用程序。

消费者通过向被指派为群组(消费者群组)协调器的broker发送心跳来维持他们和群组的从属关系以及它们对分区的所有权关系。消费者以正常时间间隔发送心跳,就认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。如果消费者停止心跳时间足够长,会话就会过期,群组协调器认为它已经死亡,会触发一次再均衡。

如果消费者是崩溃停止的,那么群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。

max.poll.interval.ms

轮询消息的时间间隔,如果超过,会导致broker认为消费者已经崩溃了。这个用来设置可以多长时间不去轮询数据。

分配分区的过程

当消费者要加入群组时,首先它会向群组协调器发送一个JoinGroup请求。第一个加入群组的消费者将成为"群主",群主从协调器那获取群组成员列表,并负责给每一个消费者分配分区。使用一个PartitionAssignor接口的类决定哪些分区应该被分配给哪个消费者。这里有两种分配算法,分配完毕后,群主把分配情况列表发送到群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道群里所有消费者的分配信息。

创建消费者
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
订阅主题
consumer.subscribe(Collections.singletonList("customerCountries"));
consumer.subscribe("test.*");
轮询
try {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        
    }
} finally {
    consumer.close();
}
线程安全

在同一个群组里,无法让一个线程运行多个消费者,也无法让多个线程安全地共享一个消费者。按照规则,一个消费者使用一个线程

fetch.min.bytes

指定消费者从服务器获取记录的最小字节数。当broker可用的数据小于fetch.min.bytes指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者,这样可以降低消费者和broker的工作负载。

fetch.max.wait.ms

用于指定broker的等待时间,默认是500ms。

max.partition.fetch.bytes

该属性指定了服务器从每个分区里返回给消费者的最大字节数。根据这个属性,估算消费者的分区拥有数量,计算需要的内存。

session.timeout.ms

该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间。这个参数要比heartbeat.interval.ms大,hearbeat.interval.ms是指消费者应该多久发一次心跳。

auto.offset.reset

指定了消费者在读取一个没有偏移量的分区或者便宜量无效的情况下,该如何处理。方式一是latest,意思是从最新的开始读。方式二是earliest,意思是从起始位置开始读。

enable.auto.commit

设置消费者是否自动提交偏移量,默认为true。

partition.assignment.strategy

Range策略和RoundRobin策略,将分区分给消费者的策略。

max.poll.records

控制单次调用poll()方法能够返回的记录数量。

receive.buffer.bytes和send.buffer.bytes

读写时用到的TCP缓冲区可以设置大小,如果跨数据中心可以稍微设置大一些。

提交偏移量

更新分区当前位置的操作叫做提交。

消费者往一个叫作_consumer_offset的特殊主题发送消息,消息包含每个分区的偏移量。

自动提交

enable.auto.commit设置为true,那么每过5s,消费者就会自动把接收到的最大偏移量提交上去。

提交当前偏移量

auto.commit.offset设为false,再使用commitSync()提交偏移量,这个API会提交由poll()方法返回的最新偏移量。

异步提交偏移量

commitAsync();异步提交的API,不足之处是提交失败没有重试,原因是异步重试可能小序号比大序号慢,导致自动偏移量变小。当分区重新平衡,导致消息重复。但是该API提供了一种回调,我们可以这样做:在每次提交偏移量之后或在回调里提交偏移量时递增序列号。在进行重试前,先检查回调的序列号和即将提交的偏移量是否相等,相等,说明没新的提交,可以安全重试。

同步和异步组合提交

因为异步提交通常没办法保证确定提交,但是我们要保证分区再均衡和关闭消费者前的提交一定成功。因此要用同步和异步结合。

try {
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("topic=%s",record.topic());
        }
        consumer.commitAsync();
    }
} catch(Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}
提交特定的偏移量
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = 
    new HashMap<>();
int count = 0;
while (true) {
    currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(reocrd.offset()+1, "no metadata"));
    if (count % 1000 == 0) 
        consumer.commitAsync(currentOffsets, null);
    count++;
}
再均衡监听器

在消费者分配新分区或移除旧分区时,可以通过消费者API执行一些应用程序代码,在调用subscribe()方法时,传入一个ConsumerRebalanceListener实际例就可以了。

(1)public void onPartitionRevoked(Collection partitions)方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道从哪里开始读取分区。

(2)public void onPartitionsAssigned(Collection partitions)方法会在重新分配分区之后和消费者开始读取消息之前被调用。

从特定偏移量处开始处理记录

保证数据的严格唯一性方式的方法之一,假如我们要将记录存在数据库里,但是不希望有数据重复,也不希望有数据丢失。那么可以这么做: 将记录和偏移量以原子的方式入库,这样子就能保证记录和偏移量一致,然后消费者通过ConsumerRebalanceListener和seek()方法能够及时保存偏移量并保证消费者总是能够从正确的位置开始读取消息。

如何退出

consumer.wakeup()方法是唯一一个可以从其他线程里安全调用的方法。调用consumer.wakeup()可以退出poll(),并抛出WakeupException异常。在退出线程之前调用consumer.close()是很有必要的,它会提交任何还没有提交的东西,并向群组协调器发送消息,告知自己要离开群组,接下来会发生再均衡,而不需要等待会话超时。

反序列化器

建议使用标准的序列化与反序列化器。

独立消费者——为什么以及怎么样使用没有群组的消费者

有时候你只需要一个消费者从一个主题的所有分区或者某个特定分区读取数据,这个时候就不需要消费者群组和再均衡了,只需要把主题或者分区分配给消费者,然后开始读取消息并提交偏移量。一个消费者要分订阅主题要么为自己分配分区,但是不可同时进行这两样。


版权声明:本文为Xjzzon原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。