SpringBoot整合kafka(实现consumer)

如何在springboot中集成kafka收消息。

1.pom.xml引入依赖的jar包

<dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.0.0.RELEASE</version>
</dependency>         
<dependency>
       <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
</dependency>  

注意:此处的kafka版本应与服务器端的版本号完全一致。

2.application.properties文件中配置消费者

采用Kafka提供的StringSerializer和StringDeserializer进行序列化和反序列化

#=============== consumer  =======================
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=test
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit:true --> 设置自动提交offset
spring.kafka.consumer.enable-auto-commit=true
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3.消费者监听器

消费者监听topic=testTopic的消息

@Component
public class ConsumerListener {

    @KafkaListener(topics = "testTopic")
    public void onMessage(String message){
        System.out.println(message);
    }
}

版权声明:本文为Shero__Y原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。