在配置项目配置整合kafka时遇到了坑,要进行整合的小伙伴们要注意版本一致的问题哦,运行不成功一定是版本的问题。
1 kafka安装
1.1 Windows
查看安装https://blog.csdn.net/github_38482082/article/details/82112641
2 启动kafka
启动kafka内置的zookeeper
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
出现binding to port …表示zookeeper启动成功,不关闭页面
kafka服务启动 ,成功不关闭页面
.\bin\windows\kafka-server-start.bat .\config\server.properties
创建topic测试主题kafka,成功不关闭页面
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
创建生产者产生消息,不关闭页面
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
创建消费者接收消息,不关闭页面
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
3 springboot整合kafka
3.1 生产者
创建空的springboot项目
3.1.1 pom.xml引包
注意版本号,引不对会报错的,我就是一直卡在这springboot和kafka的包会有问题,目前这个配置是没有问题的。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<!--<version>2.2.6.RELEASE</version>-->
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<!--kafka-clients发送消息所需jar包-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
3.1.2 application.properties配置文件
#============== kafka ===================
# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=127.0.0.1:9092
#=============== provider =======================
# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
spring.kafka.producer.retries=0
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
spring.kafka.producer.batch-size=16384
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
spring.kafka.producer.buffer-memory=33554432
#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
spring.kafka.producer.acks=1
server.servlet.context-path=/kafka
# 指定生产者消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
server.port=8090
# 配置kafka的主题
kafka.topis=testTopic
3.1.3 测试
配置好了,我们开始测试吧!,开启消费者窗口
注意:修改为自己代码配置的topic 我的是testTopic
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic testTopic --from-beginning
测试代码
package com.dml.kafka;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = {KafkaApplication.class})
public class ProducerTest {
@Autowired
private KafkaTemplate kafkaTemplate;
@Test
public void send() {
String message = "hello world!";
String topic = "testTopic";
kafkaTemplate.send(topic,message);
System.out.println("发送消息===》" + message);
}
}
生产者运行结果:
让我们看看消费者的结果吧:可能会发送完了,没看到结果,回车看看,我的就是回车就看到了:

3.2 消费者
创建一个新的springboot
3.2.1 pom.xml文件
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
我这里引入了数据库的包了,为后面场景使用
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!--jdbc -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- mysql 依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
3.2.2 application.properties配置文件
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
spring.datasource.username=root
spring.datasource.password=123456
pagehelper.helperDialect=mysql
pagehelper.reasonable=false
pagehelper.supportMethodsArguments=true
pagehelper.params=count=countSql
pagehelper.page-size-zero=true
#============== kafka ===================
# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=localhost:9092
#=============== consumer =======================
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=test-group
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit:true --> 设置自动提交offset
spring.kafka.consumer.enable-auto-commit=true
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=100
# 指定消费者消息key和消息体的编解码方式
#spring.kafka.consumer.=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.listener.concurrency=3
spring.kafka.consumer.max-poll-records=
server.port=8091
kafka.topis=testTopic
3.2.3 测试
package com.fiberhome.consumer;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* kafka消费者
* @author mld
* @date 2020-03-03 22:30:56
*/
@Component
public class ConsumerListener {
Logger logger = LoggerFactory.getLogger(ConsumerListener.class);
@Autowired
private JdbcTemplate jdbcTemplate;
@KafkaListener(topics = "${kafka.topis}")
public void onMessage(ConsumerRecord<?, ?> record){
System.out.println("消息:" + record.value());
}
}
启动就可以了,如果按照上面的步骤进行操作,你没有挺点之前打开的消费者窗口。窗口和你运行的程序都是会有进行消费的。
如果你希望一个消息只被消费一次,那么你需要设置spring.kafka.consumer.group-id值一直,主题一直,就只会被消费一次了。在同一个组,消息被消费一次,不同的组可以消费多次。
运行结果:

3.3 使用场景
我这个使用场景是在数据入库前已经入到redis缓存了,所以数据入库不用实时去操作,通过kafka进行消费,减少数据库的压力。所以发送的消息是sql。我这封装了消息对象,可以根据自己的需要进行封装。
注意:消费者和生产者不是同一个项目
3.3.1 生产者
3.3.1.1 封装消息对象
/**
* kafka发送的消息
* @author mld
* @date 2020-03-04 18:22:27
*/
public class Message {
public Message(String sql, Object[] param, String key) {
this.sql = sql;
this.param = param;
}
public Message(String sql) {
this.sql = sql;
}
public Message(String sql, Object[] param) {
this.sql = sql;
this.param = param;
}
// 要给kafka执行的sql
private String sql;
// sql的参数
private Object[] param;
// key
private String key;
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
public Object[] getParam() {
return param;
}
public void setParam(Object[] param) {
this.param = param;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
}
3.3.1.2 工具类
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
/**
* kafka生产者工具类
*/
@Component
public class KafkaProducerUtils {
Logger logger = LoggerFactory.getLogger(KafkaProducerUtils.class);
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Value("${kafka.topis}")
private String topic;
/**
* 没有参数的sql
* @param msg sql
*/
public void sendSqlMessage(Message msg) {
if (StringUtils.isEmpty(msg.getKey())) {
kafkaTemplate.send(topic , JSON.toJSONString(msg));
} else {
kafkaTemplate.send(topic , JSON.toJSONString(msg),msg.getKey());
}
logger.info("发送消息:{}",msg);
}
}
3.3.1.3 controller
package com.dml.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
* 使用kafka发送sql消息
*/
@RestController
@RequestMapping("/send")
public class KafkaSenderController {
@Autowired
private KafkaProducerUtils kafkaProducerUtils;
@Value("${kafka.topis}")
private String topic;
/**
* 更新数据
* @param name 姓名
* @param idcard 身份证
*/
@RequestMapping("/sendUpdateMsg")
public void sendUpdateMsg(String name,String idcard) {
List<Object> param = new ArrayList<>();
param.add(name);
param.add(idcard);
String sql = "update user set name = ? where idcard = ?";
Message msg = new Message(sql,param.toArray());
kafkaProducerUtils.sendSqlMessage(msg);
}
/**
* 插入数据
* @param name 姓名
* @param idcard 身份证
*/
@RequestMapping("/sendInsertMsg")
public void sendInsertMsg(String name,String idcard) {
List<Object> param = new ArrayList<>();
param.add(UUID.randomUUID().toString());
param.add(name);
param.add(idcard);
String sql = "insert into user (id,name,idcard) values(?,?,?)";
Message msg = new Message(sql,param.toArray());
kafkaProducerUtils.sendSqlMessage(msg);
}
}
3.3.2 消费者
package com.fiberhome.consumer;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
/**
* kafka消费者
* @author mld
* @date 2020-03-03 22:30:56
*/
@Component
public class ConsumerListener {
Logger logger = LoggerFactory.getLogger(ConsumerListener.class);
@Autowired
private JdbcTemplate jdbcTemplate;
@KafkaListener(topics = "${kafka.topis}")
public void onMessage(ConsumerRecord<?, ?> record){
if (record.value() != null) {
String jsonStr = String.valueOf(record.value());
Message msg = JSON.parseObject(jsonStr, Message.class);
int result;
if (!StringUtils.isEmpty(msg.getSql())) {
if (msg.getParam() != null && msg.getParam().length > 0) {
result = jdbcTemplate.update(msg.getSql(), msg.getParam());
} else {
result = jdbcTemplate.update(msg.getSql());
}
logger.info("更新数据:{}条,执行sql:{},参数:{}",result,msg.getSql(),msg.getParam());
} else {
logger.info("执行sql为null:{}",record);
}
} else {
logger.info("消息为null:{}",record);
}
}
}
3.3.3 运行结果

好的,你已经学会使用kafka了
4 可视化工具
下载地址:添加链接描述
安装后运行,配置kafka的zookeeper地址,我这是本地的:

配置主题的信息类型,不然你会看到一串数字而已:

配置好了,我们可以查看数据了:

感谢阅读~