1、代码结构目录
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.ue</groupId>
<artifactId>ue-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<!--<version>2.1.1.RELEASE</version>-->
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
实体类 Message:
@Data
public class Message {
private Long id; //id
private String msg; //消息
private Date sendTime; //时间戳
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Date getSendTime() {
return sendTime;
}
public void setSendTime(Date sendTime) {
this.sendTime = sendTime;
}
}
生产者 kafkaPProducer:
@Component
@Slf4j
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private Gson gson = new GsonBuilder().create();
//发送消息方法
public void send() {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMsg(UUID.randomUUID().toString());
message.setSendTime(new Date());
log.info("发送消息--->>>>>> message = {}", gson.toJson(message));
//topic-ideal为主题
kafkaTemplate.send("ttt", gson.toJson(message));
}
}
消费者 KafkaConsumer:
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = {"ttt"},groupId = "test-groupId-1")
public void consumer(ConsumerRecord<?, ?> record){
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("----------------- record =" + record);
log.info("------------------ message =" + message);
}
}
}
application.yml 文件:
server:
port: 9999
spring:
kafka:
bootstrap-servers: http://192.168.0.181:9092,http://192.168.0.182:9092,http://192.168.0.183:9092
producer:
retries: 0
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test-groupId-1
auto-commit-interval: 20000
enable-auto-commit: true
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
MesController:
@Controller
public class MesController {
@Autowired
private KafkaProducer kafkaProducer;
@RequestMapping("/testSendMsg")
@ResponseBody
public String testSendMsg(){
this.kafkaProducer.send();
return "success";
}
}
启动:
完成!代码
版权声明:本文为wx2320061619原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。