spring-boot集成kafka(包含zookeeper和kafka的下载介绍)

目录

1:zookeeper和kafka的下载介绍:

2:zookeeper和kafka的启动

zookeeper的启动

kafka的启动:(以创建三个broker为例子)

3:使用spring-boot集成卡夫卡

创建一个kafka项目:

把项目文件创建成这个样子:

首先在pom.xml文件中引入依赖:

在appliciation.xml中配置:(xml的文件后缀需要自己手动更改)

创建生产者

 创建消费者

 Application


1:zookeeper和kafka的下载介绍:

关于zookeeper和卡夫卡的下载介绍,我在弄这些东西的时候也搞了好久,在网上也翻了很多文章,从中我强力推荐一篇文章来供大家来参考。注意:看这个教程的时候要注意zookeeper和kafka的版本号我建议和文章中的版本号一定要一致!!

文章链接:https://www.cnblogs.com/chenwolong/p/kafka.html

2:zookeeper和kafka的启动

配置好zookeeper和kafka之后,我们先启动zookeeper

zookeeper的启动

打开zookeeper所在的bin文件的cmd命令行,输入:

zkServer.cmd

kafka的启动:(以创建三个broker为例子)

在D:\kafka\kafka_2.12-2.7.0\bin\windows路径中启动cmd命令行

在命令行中输入

kafka-server-start.bat ..\..\config\server.properties

运行之后,如法炮制再在D:\kafka\kafka_2.12-2.7.0\bin\windows路径下再打开一个cmd窗口输入

kafka-server-start.bat ..\..\config\server1.properties

server2.properties也是如此。关于server.properties,server1.properties,server2.properties

中的配置,可以看我在《在windows环境下搭建kafka集群》的设置方式。

3:使用spring-boot集成卡夫卡

在使用spring-boot集成的卡夫卡之前,我们一定要启动zookeeper和三个kafka-broker的启动!!!!!!!我发现好多文章都没有写,这对新手来讲可能不太友好,所以在这里提醒一下。

创建一个kafka项目:

custom

http://start.springboot.io

把项目文件创建成这个样子:

首先在pom.xml文件中引入依赖:

<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
</dependency>

在appliciation.xml中配置:(xml的文件后缀需要自己手动更改)

spring:
  kafka:
    bootstrap-servers: 10.12.37.140:9092,10.12.37.140:9093,10.12.37.140:9094 # 集群的地址
    producer:
      retries: 3   #设置大于0的值,则客户端会将发送失败的记录重新发送。
      batch-size: 16384 #16KB
      buffer-memory: 33554432  #32MB
      acks: 1  #指定消息key和消息体的编码方式。
      key-serializer: org.apache.kafka.common.serialization.StringSerializer  #键的序列化
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化
    consumer:
      group-id: default-group
      enable-auto-commit: false  # 手动提交
      auto-offset-reset: earliest   #当默认的消费组启动的时候,会从默认的第一个消费组开始消费。
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500  #一次最多拉500条消息。

    listener:
      ack-mode: MANUAL_IMMEDIATE
  redis:
    host: 10.12.37.140

 

创建生产者

package com.qf.kafka.spring.boot.demo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/msg")
public class MyKafkaController {

  private final static String TOPIC_NAME = "my-replicated-topic";

  @Autowired
  private KafkaTemplate<String,String> kafkaTemplate;

  @RequestMapping("/send")
  public String sendMessage(){

    kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!");

    return "send success!";

  }




}

 创建消费者

package com.qf.kafka.spring.boot.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {

  @KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")
  public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String value = record.value();
    System.out.println(value);
    System.out.println(record);
    //手动提交offset
    ack.acknowledge();
  }

 Application

package com.qf.kafka.spring.boot.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaSpringBootDemoApplication {

  public static void main(String[] args) {
    SpringApplication.run(KafkaSpringBootDemoApplication.class, args);
  }

}

上面的代码都贴上去之后,点击运行KafkaspringBoottestApplication

然后在打开一个浏览器:输入

http://localhost:8080/msg/send

 网页端显示:

控制台显示:

 

 


 


版权声明:本文为m0_48339508原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。