实战前言
前几篇文章中,我们介绍了SpringBoot整合RabbitMQ的配置以及实战了Spring的事件驱动模型,这两篇文章对于我们后续实战RabbitMQ其他知识要点将起到奠基的作用的。特别是Spring的事件驱动模型,当我们全篇实战完毕RabbitMQ并大概了解一下RabbitMQ相关组件的源码时,会发现其中的ApplicationEvent、ApplicationListener、ApplicationEventPublisher跟RabbitMQ的Message、Listener、RabbitTemplate有“异曲同工之妙”
实战概要
从本篇文章将开始采用SpringBoot整合RabbitMQ的方式来实战相关知识要点、企业级应用业务模块以及微服务项目一些典型的问题。
本篇文章将介绍实战RabbitMQ在SpringBoot项目中的基本应用,即如何创建队列、交换机、路由及其绑定以及如何发送接收消息!
实战历程
前几篇文章我们已经实现了如何采用IDEA开发工具实现SpringBoot整合RabbitMQ的配置,其中有一个相当重要的配置类 RabbitmqConfig.java ,我们将在这里创建队列、交换机、路由及其绑定,下面我们就创建一个简单的消息模型吧:DirectExchange+RoutingKey 。以下为创建队列、交换机、路由及其绑定的相关信息。
1.maven包导入
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com</groupId>
<artifactId>springboot-rabbitmq-01</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq-01</name>
<description>Demo project for Spring Boot</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<slf4j-version>1.7.13</slf4j-version>
<log4j.version>1.2.17</log4j.version>
<mybatis-spring-boot.version>1.1.1</mybatis-spring-boot.version>
<mybatis-pagehelper.version>4.1.2</mybatis-pagehelper.version>
<druid.version>1.0.16</druid.version>
<mysql.version>5.1.37</mysql.version>
<okhttp.version>3.1.2</okhttp.version>
<retrofit.version>2.1.0</retrofit.version>
<guava.version>19.0</guava.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- log start-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j-version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j-version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<!-- log end -->
<!-- spring-mybatis -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot</artifactId>
<version>${mybatis-spring-boot.version}</version>
</dependency>
<!-- for page -->
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper-spring-boot-starter</artifactId>
<version>1.2.10</version>
</dependency>
<!-- druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${parent.version}</version>
</dependency>
<!-- okhttp -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
</dependency>
<dependency>
<groupId>com.squareup.retrofit2</groupId>
<artifactId>retrofit</artifactId>
<version>${retrofit.version}</version>
</dependency>
<!-- <dependency>
<groupId>com.squareup.retrofit2</groupId>
<artifactId>converter-javkson</artifactId>
<version>${retrofit.version}</version>
</dependency>-->
<!-- guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- lombok简化java代码 如果没有安装,先安装这个插件-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.application.properties配置文件中配置的信息
#rabbitmq
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.simple.concurrency=10
spring.rabbitmq.listener.simple.max-concurrency=20
spring.rabbitmq.listener.direct.prefetch=50
mq.env=local
basic.info.mq.exchang.name=${mq.env}:basic:info:mq:exchang
basic.info.mq.routing.key.name=${mq.env}:basic:info:mq:routing:key
basic.info.mq.queue.name=${mq.env}:basic:info:mq:queue
3.响应请求头编写
package com.response;
import lombok.Data;
/**
* 相应请求
* @param <T>
*/
@Data
public class BaseResponse <T>{
private Integer code;
private String msg;
private T data;
public BaseResponse(StatusCode statusCode ,T data){
this.code = statusCode.getCode();
this.msg = statusCode.getMsg();
this.data = data;
}
public BaseResponse(StatusCode statusCode){
this.code = statusCode.getCode();
this.msg = statusCode.getMsg();
}
public BaseResponse(Integer code,String msg ,T data){
this.code = code;
this.msg = msg;
this.data = data;
}
public BaseResponse(Integer code,String msg){
this.code = code;
this.msg = msg;
}
}
package com.response;
/**
* 响应请求类型
*/
public enum StatusCode {
Success(0,"成功"),
Fail(-1,"失败");
private Integer code;
private String msg;
StatusCode(Integer code, String msg){
this.code = code;
this.msg = msg;
}
public Integer getCode() {
return code;
}
public void setCode(Integer code) {
this.code = code;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
4.RabbitmqConfig创建队列、交换机、路由及其绑定
package com.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
@Configuration
public class RabbitmqConfig {
private static final Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);
@Autowired
private Environment env;
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
/**
* 单一消费者
* @return
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setTxSize(1);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
/**
* 多个消费者
* @return
*/
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class));
factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class));
factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class));
return factory;
}
//TODO:基本信息模型构建
// direct交换机
@Bean
public DirectExchange basicExchange(){
return new DirectExchange(env.getProperty("basic.info.mq.exchang.name"),true,false);
}
//申明队列
@Bean(name = "basicQueue")
public Queue basicQueue(){
System.out.println("asdadasdad:"+env.getProperty("basic.info.mq.exchang.name"));
return new Queue(env.getProperty("basic.info.mq.queue.name"), true);
}
//绑定队列与交换机
@Bean
public Binding basicBinding(){
return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(env.getProperty("basic.info.mq.routing.key.name"));
}
}
5.当我们在上面创建好队列、交换机、路由及其绑定后,我们可以先把整个项目跑起来,然后打开http://localhost:15672/ 访问RabbitMQ后端控制台,点击 Queues、Exchanges 栏目,即可看到我们创建好的队列、交换机。如下所示
6.消息发送
“对象实体信息”!在这里,我们在Controller执行发送逻辑,其中充当发送消息的组件是RabbitTemplate,充当消息的组件为Message。如下所示RabbitController.java
package com.controller;
import com.entity.User;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.response.BaseResponse;
import com.response.StatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
@RestController
public class RabbitController {
private static final Logger log= LoggerFactory.getLogger(RabbitController.class);
private static final String Prefix="rabbit";
@Autowired
private Environment env;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ObjectMapper objectMapper;
/**
* 发送简单消息
* @param message
* @return
*/
@RequestMapping(value = Prefix+"/simple/message/send",method = RequestMethod.GET)
public BaseResponse sendSimpleMessage(@RequestParam String message){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
log.info("待发送的消息: {} ",message);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("basic.info.mq.exchang.name"));
rabbitTemplate.setRoutingKey(env.getProperty("basic.info.mq.routing.key.name"));
Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(message)).build();
rabbitTemplate.convertAndSend(msg);
}catch (Exception e){
log.error("发送简单消息发生异常: ",e.fillInStackTrace());
}
return response;
}
/**
* 发送对象消息
* @param user
* @return
*/
@RequestMapping(value = Prefix+"/object/message/send",method = RequestMethod.POST,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
public BaseResponse sendObjectMessage(@RequestBody User user){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
log.info("待发送的消息: {} ",user);
rabbitTemplate.setExchange(env.getProperty("basic.info.mq.exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("basic.info.mq.routing.key.name"));
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(user)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
rabbitTemplate.convertAndSend(msg);
}catch (Exception e){
log.error("发送对象消息发生异常: ",e.fillInStackTrace());
}
return response;
}
@RequestMapping(value = Prefix+"/map/message/send",method = RequestMethod.GET)
public BaseResponse sendMapMessage(){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
Integer id =120;
String name = "huangan";
Long longId =12000L;
Map<String,Object> dataMap = Maps.newHashMap();
dataMap.put("id",id);
dataMap.put("name",name);
dataMap.put("longId",longId);
log.info("待发送的消息: {} ",dataMap);
rabbitTemplate.setExchange(env.getProperty("basic.info.mq.exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("basic.info.mq.routing.key.name"));
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(dataMap)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
rabbitTemplate.convertAndSend(msg);
}catch (Exception e){
log.error("发送对象消息发生异常: ",e.fillInStackTrace());
}
return response;
}
}
在上面可以看到我们的发送代码逻辑其实并不复杂,其思路主要是来源于第一阶段的介绍消息模型中的其中一种,如下图所示。即我们是将消息发送到exchange,然后由于exchange与某个routingKey绑定路由到某个队列queue,故而当消息到达exchange后,将自然而然的被路由到指定的queue中,等待被监听消费。
7.消息监听
下面我们需要创建一个listener用于监听消费此队列中的消息。代码逻辑如下CommonListener.java
package com.listener;
import com.entity.User;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class CommonListener {
private static final Logger log= LoggerFactory.getLogger(CommonListener.class);
@Autowired
private Environment env;
@Autowired
private ObjectMapper objectMapper;
/**
* 监听消费消息
* @param message
*/
@RabbitListener(queues ="${basic.info.mq.queue.name}",containerFactory = "singleListenerContainer")
public void consumeMessage(@Payload byte[] message){
try {
//TODO:接收String
// String result=new String(message,"UTF-8");
//log.info("接收String消息: {} ",result);
//TODO:接收对象
//User user=objectMapper.readValue(message, User.class);
//log.info("接收对象消息: {} ",user);
//TODO: 接收map字段信息
Map<String,Object> resMap = objectMapper.readValue(message,Map.class);
log.info("接收对象消息: {} ",resMap);
}catch (Exception e){
log.error("监听消费消息 发生异常: ",e.fillInStackTrace());
}
}
}
我们将整个项目跑起来,然后首先访问 “http://127.0.0.1:9092/mq/rabbit/simple/message/send?message=简单消息模型2” ,然后即可看到listener接收到改消息,可以在控制台打印输出!