消费者代码实例

一.创建配置,最小化配置如下

var props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");//broker地址
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_id_1");//group id
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//key序列化器
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化器

二.生成KafkaConsumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

三.订阅主题
1.订阅主题
consumer.subscribe(Arrays.asList("jc-kafka-1-topic-2"));
2.订阅主题分区
consumer.assign(Arrays.asList(new TopicPartition("jc-kafka-1-topic-2", 0)));

 

四.拉取消息

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

五.消费消息
for (ConsumerRecord<String, String> record : records) {
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

六.提交消费位移
1.自动提交,自动提交不需要配置,或配置如下:
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
2.手动提交
consumer.commitSync();//同步

consumer.commitAsync();//异步

七.指定位移消费

consumer.poll(Duration.ofMillis(100));//先调用poll分配到分许
consumer.seek(new TopicPartition("jc-kafka-1-topic-2", 0),10);//指定位移
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));//调用poll拉取消息

也可以根据时间戳查询时间戳对应的偏移量,之后指定位移消费

Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(new HashMap<TopicPartition, Long>());

版权声明:本文为idealemail原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。