SpringBoot整合RabbitMQ之发送接收消息实战

实战前言
前几篇文章中,我们介绍了SpringBoot整合RabbitMQ的配置以及实战了Spring的事件驱动模型,这两篇文章对于我们后续实战RabbitMQ其他知识要点将起到奠基的作用的。特别是Spring的事件驱动模型,当我们全篇实战完毕RabbitMQ并大概了解一下RabbitMQ相关组件的源码时,会发现其中的ApplicationEvent、ApplicationListener、ApplicationEventPublisher跟RabbitMQ的Message、Listener、RabbitTemplate有“异曲同工之妙”

实战概要
从本篇文章将开始采用SpringBoot整合RabbitMQ的方式来实战相关知识要点、企业级应用业务模块以及微服务项目一些典型的问题。
本篇文章将介绍实战RabbitMQ在SpringBoot项目中的基本应用,即如何创建队列、交换机、路由及其绑定以及如何发送接收消息!

实战历程
前几篇文章我们已经实现了如何采用IDEA开发工具实现SpringBoot整合RabbitMQ的配置,其中有一个相当重要的配置类 RabbitmqConfig.java ,我们将在这里创建队列、交换机、路由及其绑定,下面我们就创建一个简单的消息模型吧:DirectExchange+RoutingKey 。以下为创建队列、交换机、路由及其绑定的相关信息。
 

1.maven包导入

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com</groupId>
    <artifactId>springboot-rabbitmq-01</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq-01</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>

        <slf4j-version>1.7.13</slf4j-version>
        <log4j.version>1.2.17</log4j.version>
        <mybatis-spring-boot.version>1.1.1</mybatis-spring-boot.version>
        <mybatis-pagehelper.version>4.1.2</mybatis-pagehelper.version>
        <druid.version>1.0.16</druid.version>
        <mysql.version>5.1.37</mysql.version>
        <okhttp.version>3.1.2</okhttp.version>
        <retrofit.version>2.1.0</retrofit.version>
        <guava.version>19.0</guava.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- log start-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j-version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j-version}</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- log end -->

        <!-- spring-mybatis -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot</artifactId>
            <version>${mybatis-spring-boot.version}</version>
        </dependency>

        <!-- for page -->
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper-spring-boot-starter</artifactId>
            <version>1.2.10</version>
        </dependency>

        <!-- druid -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>${druid.version}</version>
        </dependency>

        <!-- mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>

        <!-- rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>${parent.version}</version>
        </dependency>

        <!-- okhttp -->
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>${okhttp.version}</version>
        </dependency>

        <dependency>
            <groupId>com.squareup.retrofit2</groupId>
            <artifactId>retrofit</artifactId>
            <version>${retrofit.version}</version>
        </dependency>

       <!-- <dependency>
            <groupId>com.squareup.retrofit2</groupId>
            <artifactId>converter-javkson</artifactId>
            <version>${retrofit.version}</version>
        </dependency>-->

        <!-- guava -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>

        <!-- lombok简化java代码 如果没有安装,先安装这个插件-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2.application.properties配置文件中配置的信息

#rabbitmq
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.rabbitmq.listener.simple.concurrency=10
spring.rabbitmq.listener.simple.max-concurrency=20
spring.rabbitmq.listener.direct.prefetch=50

mq.env=local
basic.info.mq.exchang.name=${mq.env}:basic:info:mq:exchang
basic.info.mq.routing.key.name=${mq.env}:basic:info:mq:routing:key
basic.info.mq.queue.name=${mq.env}:basic:info:mq:queue

3.响应请求头编写

package com.response;

import lombok.Data;

/**
 * 相应请求
 * @param <T>
 */
@Data
public class BaseResponse <T>{
    private Integer code;
    private String msg;
    private T data;

    public BaseResponse(StatusCode statusCode ,T data){
        this.code = statusCode.getCode();
        this.msg = statusCode.getMsg();
        this.data = data;
    }

    public BaseResponse(StatusCode statusCode){
        this.code = statusCode.getCode();
        this.msg = statusCode.getMsg();
    }

    public BaseResponse(Integer code,String msg ,T data){
        this.code = code;
        this.msg = msg;
        this.data = data;
    }

    public BaseResponse(Integer code,String msg){
        this.code = code;
        this.msg = msg;
    }
}
package com.response;

/**
 * 响应请求类型
 */
public enum StatusCode {
    Success(0,"成功"),
    Fail(-1,"失败");

    private Integer code;
    private String msg;

    StatusCode(Integer code, String msg){
        this.code = code;
        this.msg = msg;
    }

    public Integer getCode() {
        return code;
    }

    public void setCode(Integer code) {
        this.code = code;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}

4.RabbitmqConfig创建队列、交换机、路由及其绑定

package com.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

@Configuration
public class RabbitmqConfig {

    private static final Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);

    @Autowired
    private Environment env;

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    /**
     * 单一消费者
     * @return
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }

    /**
     * 多个消费者
     * @return
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class));
        factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class));
        factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class));
        return factory;
    }

    //TODO:基本信息模型构建
    // direct交换机
    @Bean
    public DirectExchange basicExchange(){
        return new DirectExchange(env.getProperty("basic.info.mq.exchang.name"),true,false);
    }

    //申明队列
    @Bean(name = "basicQueue")
    public Queue basicQueue(){
        System.out.println("asdadasdad:"+env.getProperty("basic.info.mq.exchang.name"));
        return new Queue(env.getProperty("basic.info.mq.queue.name"), true);
    }

    //绑定队列与交换机
    @Bean
    public Binding basicBinding(){
        return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(env.getProperty("basic.info.mq.routing.key.name"));
    }
}

5.当我们在上面创建好队列、交换机、路由及其绑定后,我们可以先把整个项目跑起来,然后打开http://localhost:15672/  访问RabbitMQ后端控制台,点击 Queues、Exchanges 栏目,即可看到我们创建好的队列、交换机。如下所示

6.消息发送

“对象实体信息”!在这里,我们在Controller执行发送逻辑,其中充当发送消息的组件是RabbitTemplate,充当消息的组件为Message。如下所示RabbitController.java
 

package com.controller;

import com.entity.User;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.response.BaseResponse;
import com.response.StatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;

import java.util.Map;

@RestController
public class RabbitController {
    private static final Logger log= LoggerFactory.getLogger(RabbitController.class);
    private static final String Prefix="rabbit";

    @Autowired
    private Environment env;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ObjectMapper objectMapper;


    /**
     * 发送简单消息
     * @param message
     * @return
     */
    @RequestMapping(value = Prefix+"/simple/message/send",method = RequestMethod.GET)
    public BaseResponse sendSimpleMessage(@RequestParam String message){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            log.info("待发送的消息: {} ",message);

            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            rabbitTemplate.setExchange(env.getProperty("basic.info.mq.exchang.name"));
            rabbitTemplate.setRoutingKey(env.getProperty("basic.info.mq.routing.key.name"));

            Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(message)).build();
            rabbitTemplate.convertAndSend(msg);
        }catch (Exception e){
            log.error("发送简单消息发生异常: ",e.fillInStackTrace());
        }
        return response;
    }


    /**
     * 发送对象消息
     * @param user
     * @return
     */
    @RequestMapping(value = Prefix+"/object/message/send",method = RequestMethod.POST,consumes = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public BaseResponse sendObjectMessage(@RequestBody User user){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            log.info("待发送的消息: {} ",user);

            rabbitTemplate.setExchange(env.getProperty("basic.info.mq.exchange.name"));
            rabbitTemplate.setRoutingKey(env.getProperty("basic.info.mq.routing.key.name"));
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

            Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(user)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                    .build();
            rabbitTemplate.convertAndSend(msg);
        }catch (Exception e){
            log.error("发送对象消息发生异常: ",e.fillInStackTrace());
        }
        return response;
    }

    @RequestMapping(value = Prefix+"/map/message/send",method = RequestMethod.GET)
    public BaseResponse sendMapMessage(){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            Integer id =120;
            String name = "huangan";
            Long longId =12000L;
            Map<String,Object> dataMap = Maps.newHashMap();

            dataMap.put("id",id);
            dataMap.put("name",name);
            dataMap.put("longId",longId);
            log.info("待发送的消息: {} ",dataMap);

            rabbitTemplate.setExchange(env.getProperty("basic.info.mq.exchange.name"));
            rabbitTemplate.setRoutingKey(env.getProperty("basic.info.mq.routing.key.name"));
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

            Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(dataMap)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                    .build();
            rabbitTemplate.convertAndSend(msg);
        }catch (Exception e){
            log.error("发送对象消息发生异常: ",e.fillInStackTrace());
        }
        return response;
    }
}

在上面可以看到我们的发送代码逻辑其实并不复杂,其思路主要是来源于第一阶段的介绍消息模型中的其中一种,如下图所示。即我们是将消息发送到exchange,然后由于exchange与某个routingKey绑定路由到某个队列queue,故而当消息到达exchange后,将自然而然的被路由到指定的queue中,等待被监听消费。

7.消息监听

下面我们需要创建一个listener用于监听消费此队列中的消息。代码逻辑如下CommonListener.java

package com.listener;

import com.entity.User;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class CommonListener {

    private static final Logger log= LoggerFactory.getLogger(CommonListener.class);

    @Autowired
    private Environment env;
    @Autowired
    private ObjectMapper objectMapper;

    /**
     * 监听消费消息
     * @param message
     */
    @RabbitListener(queues ="${basic.info.mq.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeMessage(@Payload byte[] message){
        try {
            //TODO:接收String
            // String result=new String(message,"UTF-8");
            //log.info("接收String消息: {} ",result);

            //TODO:接收对象
            //User user=objectMapper.readValue(message, User.class);
            //log.info("接收对象消息: {} ",user);

            //TODO: 接收map字段信息
            Map<String,Object> resMap = objectMapper.readValue(message,Map.class);
            log.info("接收对象消息: {} ",resMap);
        }catch (Exception e){
            log.error("监听消费消息 发生异常: ",e.fillInStackTrace());
        }
    }
}

我们将整个项目跑起来,然后首先访问 “http://127.0.0.1:9092/mq/rabbit/simple/message/send?message=简单消息模型2”  ,然后即可看到listener接收到改消息,可以在控制台打印输出!

 

 

 

 

 

 

 

 

 

 

 

 


版权声明:本文为weixin_45863786原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。