SpringbBoot2.0整合kafka

在配置项目配置整合kafka时遇到了坑,要进行整合的小伙伴们要注意版本一致的问题哦,运行不成功一定是版本的问题。

1 kafka安装

1.1 Windows

查看安装https://blog.csdn.net/github_38482082/article/details/82112641

2 启动kafka

  1. 启动kafka内置的zookeeper

    .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

    出现binding to port …表示zookeeper启动成功,不关闭页面

  2. kafka服务启动 ,成功不关闭页面

    .\bin\windows\kafka-server-start.bat .\config\server.properties

  3. 创建topic测试主题kafka,成功不关闭页面

    .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

  4. 创建生产者产生消息,不关闭页面

    .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

  5. 创建消费者接收消息,不关闭页面

    .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

3 springboot整合kafka

3.1 生产者

创建空的springboot项目

3.1.1 pom.xml引包

注意版本号,引不对会报错的,我就是一直卡在这springboot和kafka的包会有问题,目前这个配置是没有问题的。

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <!--<version>2.2.6.RELEASE</version>-->
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--kafka-clients发送消息所需jar包-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
3.1.2 application.properties配置文件
#============== kafka ===================
# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=127.0.0.1:9092

#=============== provider  =======================
# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
spring.kafka.producer.retries=0
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
spring.kafka.producer.batch-size=16384
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
spring.kafka.producer.buffer-memory=33554432

#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
spring.kafka.producer.acks=1


server.servlet.context-path=/kafka


# 指定生产者消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

server.port=8090
# 配置kafka的主题
kafka.topis=testTopic
3.1.3 测试

配置好了,我们开始测试吧!,开启消费者窗口

注意:修改为自己代码配置的topic 我的是testTopic

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic testTopic --from-beginning

测试代码

package com.dml.kafka;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = {KafkaApplication.class})
public class ProducerTest {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Test
    public void send() {
        String message = "hello world!";
        String topic = "testTopic";
        kafkaTemplate.send(topic,message);
        System.out.println("发送消息===》" + message);
    }
}

生产者运行结果:
在这里插入图片描述

让我们看看消费者的结果吧:可能会发送完了,没看到结果,回车看看,我的就是回车就看到了:

在这里插入图片描述

3.2 消费者

创建一个新的springboot

3.2.1 pom.xml文件
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

我这里引入了数据库的包了,为后面场景使用

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<!--jdbc -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!-- mysql 依赖 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
3.2.2 application.properties配置文件
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
spring.datasource.username=root
spring.datasource.password=123456

pagehelper.helperDialect=mysql
pagehelper.reasonable=false
pagehelper.supportMethodsArguments=true
pagehelper.params=count=countSql
pagehelper.page-size-zero=true

#============== kafka ===================
# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=localhost:9092



#=============== consumer  =======================
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=test-group
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit:true --> 设置自动提交offset
spring.kafka.consumer.enable-auto-commit=true
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=100

# 指定消费者消息key和消息体的编解码方式
#spring.kafka.consumer.=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.listener.concurrency=3
spring.kafka.consumer.max-poll-records=
server.port=8091

kafka.topis=testTopic
3.2.3 测试
package com.fiberhome.consumer;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * kafka消费者
 * @author mld
 * @date 2020-03-03 22:30:56
 */
@Component
public class ConsumerListener {
    Logger logger = LoggerFactory.getLogger(ConsumerListener.class);
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @KafkaListener(topics = "${kafka.topis}")
    public void onMessage(ConsumerRecord<?, ?> record){
        System.out.println("消息:" + record.value());
	}

}

启动就可以了,如果按照上面的步骤进行操作,你没有挺点之前打开的消费者窗口。窗口和你运行的程序都是会有进行消费的。

如果你希望一个消息只被消费一次,那么你需要设置spring.kafka.consumer.group-id值一直,主题一直,就只会被消费一次了。在同一个组,消息被消费一次,不同的组可以消费多次。

运行结果:

在这里插入图片描述

3.3 使用场景

我这个使用场景是在数据入库前已经入到redis缓存了,所以数据入库不用实时去操作,通过kafka进行消费,减少数据库的压力。所以发送的消息是sql。我这封装了消息对象,可以根据自己的需要进行封装。

注意:消费者和生产者不是同一个项目

3.3.1 生产者
3.3.1.1 封装消息对象
/**
 * kafka发送的消息
 * @author mld
 * @date 2020-03-04 18:22:27
 */
public class Message {
    public Message(String sql, Object[] param, String key) {
        this.sql = sql;
        this.param = param;
    }

    public Message(String sql) {
        this.sql = sql;
    }

    public Message(String sql, Object[] param) {
        this.sql = sql;
        this.param = param;
    }

    // 要给kafka执行的sql
    private String sql;
    // sql的参数
    private Object[] param;
    // key
    private String key;

    public String getSql() {
        return sql;
    }

    public void setSql(String sql) {
        this.sql = sql;
    }

    public Object[] getParam() {
        return param;
    }

    public void setParam(Object[] param) {
        this.param = param;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }
}

3.3.1.2 工具类
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * kafka生产者工具类
 */
@Component
public class KafkaProducerUtils {
    Logger logger = LoggerFactory.getLogger(KafkaProducerUtils.class);

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
    @Value("${kafka.topis}")
    private String topic;


    /**
     * 没有参数的sql
     * @param msg sql
     */
    public void sendSqlMessage(Message msg) {
        if (StringUtils.isEmpty(msg.getKey())) {
            kafkaTemplate.send(topic , JSON.toJSONString(msg));
        } else {
            kafkaTemplate.send(topic , JSON.toJSONString(msg),msg.getKey());
        }

        logger.info("发送消息:{}",msg);
    }
}
3.3.1.3 controller
package com.dml.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * 使用kafka发送sql消息
 */
@RestController
@RequestMapping("/send")
public class KafkaSenderController {
    @Autowired
    private KafkaProducerUtils kafkaProducerUtils;

    @Value("${kafka.topis}")
    private String topic;


    /**
     * 更新数据
     * @param name 姓名
     * @param idcard 身份证
     */
    @RequestMapping("/sendUpdateMsg")
    public void sendUpdateMsg(String name,String idcard) {
        List<Object> param = new ArrayList<>();
        param.add(name);
        param.add(idcard);
        String sql = "update user set name = ? where idcard = ?";
        Message msg = new Message(sql,param.toArray());
        kafkaProducerUtils.sendSqlMessage(msg);
    }


    /**
     * 插入数据
     * @param name 姓名
     * @param idcard 身份证
     */
    @RequestMapping("/sendInsertMsg")
    public void sendInsertMsg(String name,String idcard) {
        List<Object> param = new ArrayList<>();
        param.add(UUID.randomUUID().toString());
        param.add(name);
        param.add(idcard);
        String sql = "insert into user (id,name,idcard) values(?,?,?)";
        Message msg = new Message(sql,param.toArray());
        kafkaProducerUtils.sendSqlMessage(msg);
    }
}

3.3.2 消费者
package com.fiberhome.consumer;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * kafka消费者
 * @author mld
 * @date 2020-03-03 22:30:56
 */
@Component
public class ConsumerListener {
    Logger logger = LoggerFactory.getLogger(ConsumerListener.class);
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @KafkaListener(topics = "${kafka.topis}")
    public void onMessage(ConsumerRecord<?, ?> record){
        if (record.value() != null) {
            String jsonStr = String.valueOf(record.value());
            Message msg = JSON.parseObject(jsonStr, Message.class);
            int result;
            if (!StringUtils.isEmpty(msg.getSql())) {
                if (msg.getParam() != null && msg.getParam().length > 0) {
                    result = jdbcTemplate.update(msg.getSql(), msg.getParam());
                } else {
                    result = jdbcTemplate.update(msg.getSql());
                }
                logger.info("更新数据:{}条,执行sql:{},参数:{}",result,msg.getSql(),msg.getParam());
            } else {
                logger.info("执行sql为null:{}",record);
            }
        } else {
            logger.info("消息为null:{}",record);
        }
    }

}
3.3.3 运行结果

在这里插入图片描述

好的,你已经学会使用kafka了

4 可视化工具

下载地址:添加链接描述

安装后运行,配置kafka的zookeeper地址,我这是本地的:

在这里插入图片描述

配置主题的信息类型,不然你会看到一串数字而已:

在这里插入图片描述

配置好了,我们可以查看数据了:

在这里插入图片描述
感谢阅读~


版权声明:本文为qq_35638837原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。