如何在springboot中集成kafka收消息。
1.pom.xml引入依赖的jar包
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
注意:此处的kafka版本应与服务器端的版本号完全一致。
2.application.properties文件中配置消费者
采用Kafka提供的StringSerializer和StringDeserializer进行序列化和反序列化
#=============== consumer =======================
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=test
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit:true --> 设置自动提交offset
spring.kafka.consumer.enable-auto-commit=true
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
3.消费者监听器
消费者监听topic=testTopic的消息
@Component
public class ConsumerListener {
@KafkaListener(topics = "testTopic")
public void onMessage(String message){
System.out.println(message);
}
}
版权声明:本文为Shero__Y原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。