Spring Boot整合RabbitMQ

消息队列在项目中的运用是越来越广泛,上篇博客 初探消息队列RabbitMQ 安装上了RabbitMQ,实现了消息队列的生产和消费,那如何与Spring Boot进行整合呢,查阅些资料,记录下来练习的过程,方便备查。

知识学习与回顾

RabbitMQ有6种消息模型,但第6种属于RPC,因此只需要学习五种。其中第3、4、5种都属于是订阅模型,区别在于他们选择的路由方式不同。下图对应的是这几种消息模型。
在这里插入图片描述
运用Spring Boot进行RabbitMQ的整合,主要通过以下几个步骤。

1、新建项目并引入RabbitMQ依赖

方法一、直接在pom.xml文件中添加以下配置信息

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

方法二、项目构建阶段输入 spring for RabbitMQ,即可自动生成依赖。
在这里插入图片描述
在application.yml文件中添加配置

spring:
  application:
    name : rabbitmq-springboot
  rabbitmq:
    host : 192.168.30.130
    port : 5672
    username : admin
    password : admin
    virtual : /
2、新建测试类

新建测试类SpringbootApplicationTests.java,作为生产者来生产消息。
1) 第一种模型 hello word
生产者模型需要注入 RabbitTemplate 进行消息的生产,hello word 模型的生产者生产消息的程序。

@SpringBootTest(classes = SpringbootApplication.class)
@RunWith(SpringRunner.class)
public class SpringbootApplicationTests {
  // 注入rabbitmqTemplate,来进行消息的发送
  @Autowired
  private RabbitTemplate rabbitTemplate;
  @Test
    public void testHello(){
        // 参数1 队列的名称
        // 参数2 队列的内容
        rabbitTemplate.convertAndSend("hello","hello word 模型");
        System.out.println("hello");
    }
}

消费者进行消息的消费用到 RabbitListener 来监听指定的消息队列。以下是第一种消费者消费的程序。

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// 保证被工厂扫描到,默认是持久化,非独占
@Component
// 添加监听 消费者
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class Consumer {
    @RabbitHandler
    public void receirve (String message){
        System.out.println("message "+ message);
    }
}

hello word 模型 针对的是一对一的生产者和消费者的关系,但在实际的生产中,可能存在多个消费者都要进行消息的消费。

2)第二种模型 work模型
work 模型是对第一种消息模型的改进,多个消费者可以对消息进行消费。生产者的程序和第一种类似,绑定消息队列,代码如下所示。

@SpringBootTest(classes = SpringbootApplication.class)
@RunWith(SpringRunner.class)
public class SpringbootApplicationTests {

    // 注入rabbitmqTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // work 模型
   @Test
    public void testWork(){
        rabbitTemplate.convertAndSend("work","work模型");
    }
}

消费者进行消息的消费,代码如下所示。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerWork {
    // 消费者1
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receiveOne(String message){
        for (int i = 0; i < 10; i++) {
            System.out.println("message one 消息 >>>> "+ message + i);
        }
    }

    // 消费者2
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receiveTwo(String message){
        System.out.println("message two 消息 >>>> "+ message);
    }
}

消费者对消息进行消费,结果各不相同,实现了任务的分发。
由此可以发现个问题,消费者处理消息的时间是不一样的,平均分发可能导致消费快的在等待,应该是将消费快的尽快消费,完成消息任务。接下来引出第三种消息模型。

3)第三种模型 订阅模型
第三种消息模型对上一种模型的改进,对于生产者的程序基本相同,只是更改了消息队列的名称。以

@SpringBootTest(classes = SpringbootApplication.class)
@RunWith(SpringRunner.class)
public class SpringbootApplicationTests {

    // 注入rabbitmqTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testTopic(){
        rabbitTemplate.convertAndSend("topics","user.save","user.save");
    }
}

消费者对消息进行消费,设置绑定消息队列的名称,交换机的名称,指定交换机代码如下所示。

@Component
public class FanoutConsumer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,  //创建临时队列
                    exchange = @Exchange(value = "logs",type = "fanout") //绑定交换机
            )
    })
    public void receivedOne(String message){
        System.out.println("message one 消息 >>>>> " + message);
    }

    // 消费者2
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,  //创建临时队列
                    exchange = @Exchange(value = "logs",type = "fanout") //绑定交换机
            )
    })
    public void receivedtwo(String message){
        System.out.println("message two 消息 >>>>> " + message);
    }
}

4)第四种消息模型 Route模型
第四种消费类型是路由转发的原理,对消息进行转发处理,满足规则的进行消费的处理,不满足的可以不进行操作。生产者进行消息生产的代码如下所示。

@SpringBootTest(classes = SpringbootApplication.class)
@RunWith(SpringRunner.class)
public class SpringbootApplicationTests {

    // 注入rabbitmqTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 路由模型
    @Test
    public void testRoute(){
        rabbitTemplate.convertAndSend("directs","info","发送info的key的路由信息");
    }
}

消费者在绑定消息队列、交换机时,需要定义好key的范围,满足条件进行消费。

@Component
public class RouteConsumer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "directs",type = "direct"),
                    key = {"info","error","warn"}
            )
    })
    public void receiverOne(String message){
        System.out.println("message one >>>> "+message);
    }

    // 消费者2
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "directs",type = "direct"),
                    key = {"info","error","warn"}
            )
    })
    public void receiverTwo(String message){
        System.out.println("message two >>>> "+ message);
    }
}

5)第五种消息模型 Fanout 模型
第五种消息类型是对上一种的改进,绑定交换机时定义的key的范围是有限的,要进行范围的划定,这里需要进行匹配。

@SpringBootTest(classes = SpringbootApplication.class)
@RunWith(SpringRunner.class)
public class SpringbootApplicationTests {

    // 注入rabbitmqTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // Fanout 广播消息
    @Test
    public void testFanout(){
        rabbitTemplate.convertAndSend("logs","Fanous模型发送的数据");
    }
}

消费者的匹配规则需要进行提前定义,满足规则的都是可以进行消费。

@Component
public class TopicConsumer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(type = "topics",name = "topic"),
                    key = {"user.save","user.*"}
            )
    })
    public void receivedOne(String message){
        System.out.println("message one >>>>>> "+message);
    }

    // 消费者2
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(type = "topics",name = "topic"),
                    key = {"user.save","order.*"}
            )
    })
    public void receivedTwo(String message){
        System.out.println("message two >>>>>> "+message);
    }
}

* 代码全部,满足order开头的消息都是可以进行处理。

参考博客

【1】https://www.cnblogs.com/ifme/p/12024064.html