RabbitMQ(七)【SpringBoot案例】

七、RabbitMQ - SpringBoot案例


上一篇文章基础入门案例

整体核心

在这里插入图片描述

7.1 fanout 模式

生产者:交换机绑定队列

在这里插入图片描述

  1. 创建一个 springboot 项目springboot-order-rabbitmq-producer

在这里插入图片描述

  1. 配置文件application.yml
# 服务端口
server:
  port: 8080

# 配置rabbitmq
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 192.168.159.100
    port: 5672
  1. 添加依赖
    <dependencies>
        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
  1. 编写一个OrderServiceImpl.java
package com.vinjcent.rabbitmq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@SuppressWarnings("all")
@Service
public class OrderServiceImpl {

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void createOrderFanout(){
        // 1.根据商品id查询库存是否充足
        // 2.保存订单
        String orderId = UUID.randomUUID().toString();
        // 3.通过MQ来完成消息的分发
        // 参数1: 交换机   参数2: 路由key/queue队列名称   参数3: 消息内容
        String exchangeName = "fanout_order_exchange";
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);

    }


}
  1. 编写FanoutRabbitMQConfiguration.java
package com.vinjcent.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutRabbitMQConfiguration {

    // 1.声明注册fanout模式交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout_order_exchange",true,false);
    }


    // 2.声明队列 account.fanout.queue、express.fanout.queue、sms.fanout.queue
    @Bean
    public Queue fanout_accountQueue(){
        return new Queue("account.fanout.queue",true);
    }
    @Bean
    public Queue fanout_expressQueue(){
        return new Queue("express.fanout.queue",true);
    }
    @Bean
    public Queue fanout_smsQueue(){
        return new Queue("sms.fanout.queue",true);
    }
    // 3.完成绑定关系(队列)
    @Bean
    public Binding fanout_accountBinding(){
        return BindingBuilder.bind(fanout_accountQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding fanout_expressBinding(){
        return BindingBuilder.bind(fanout_expressQueue()).to(fanoutExchange());
    }
    @Bean
    public Binding fanout_smsBinding(){
        return BindingBuilder.bind(fanout_smsQueue()).to(fanoutExchange());
    }

}

  1. 在web界面中将队列 queue 和交换机 exchange 清空

在这里插入图片描述

在这里插入图片描述

  1. 运行测试
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {

    @Autowired
    OrderServiceImpl orderService;

    @Test
    void contextLoads() {
        orderService.createOrderFanout("1","1",12);
    }

}
  1. 观察web界面中的消息队列和交换机

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

消费者接收消息

  1. 创建一个 springboot 项目springboot-order-rabbitmq-consumer

在这里插入图片描述

  1. 配置文件
# 应用服务 WEB 访问端口
server:
  port: 80

# 配置rabbitmq
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 192.168.159.100
    port: 5672
  application:
    # 应用名称
    name: springboot-order-rabbitmq-consumer
  1. service包下添加一个fanout包,添加以下类,注入Spring容器中

(1)FanoutAccountConsumer.java监听队列account.fanout.queueexpress.fanout.queuesms.fanout.queue

package com.vinjcent.rabbitmq.service.fanout;


import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(queues = {"account.fanout.queue"})
public class FanoutAccountConsumer {

    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("account.fanout.queue===>" + message);
    }

}

(2)FanoutExpressConsumer.java

package com.vinjcent.rabbitmq.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(queues = {"express.fanout.queue"})
public class FanoutExpressConsumer {

    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("express.fanout.queue===>" + message);
    }

}

(3)FanoutSMSConsumer.java

package com.vinjcent.rabbitmq.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(queues = {"sms.fanout.queue"})
public class FanoutSMSConsumer {

    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("sms.fanout.queue===>" + message);
    }

}
  1. 运行springboot-order-rabbitmq-consumer工程

  2. 查看控制台打印结果

在这里插入图片描述

在这里插入图片描述

7.2 direct 模式

1)生产者:交换机绑定队列

  1. springboot-order-rabbitmq-producer工程的service包下OrderServiceImpl.java添加如下;并配置一个DirectRabbitMQConfiguration.java

OrderServiceImpl.java

package com.vinjcent.rabbitmq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@SuppressWarnings("all")
@Service
public class OrderServiceImpl {

    @Autowired
    RabbitTemplate rabbitTemplate;

    // fanout模式...


    // direct模式
    public void createOrderDirect(String userId, String productId,int num){
        // 1.根据商品id查询库存是否充足
        // 2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功: " + orderId);
        // 3.通过MQ来完成消息的分发
        // 参数1: 交换机   参数2: 路由key/queue队列名称   参数3: 消息内容
        String exchangeName = "direct_order_exchange";
        String routingKey = "";
        // 根据路由推送消息
        rabbitTemplate.convertAndSend(exchangeName, "account", orderId);
        rabbitTemplate.convertAndSend(exchangeName, "express", orderId);

    }


}

DirectRabbitMQConfiguration.java

package com.vinjcent.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitMQConfiguration {


    // 1.声明注册fanout模式交换机
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("direct_order_exchange",true,false);
    }


    // 2.声明队列 account.direct.queue、express.direct.queue、sms.direct.queue
    @Bean
    public Queue direct_accountQueue(){
        return new Queue("account.direct.queue",true);
    }
    @Bean
    public Queue direct_expressQueue(){
        return new Queue("express.direct.queue",true);
    }
    @Bean
    public Queue direct_smsQueue(){
        return new Queue("sms.direct.queue",true);
    }
    // 3.完成绑定关系(队列)
    // direct模式比fanout模式多了一个路由key
    @Bean
    public Binding direct_accountBinding(){
        return BindingBuilder.bind(direct_accountQueue()).to(directExchange()).with("account");
    }
    @Bean
    public Binding direct_expressBinding(){
        return BindingBuilder.bind(direct_expressQueue()).to(directExchange()).with("express");
    }
    @Bean
    public Binding direct_smsBinding(){
        return BindingBuilder.bind(direct_smsQueue()).to(directExchange()).with("sms");
    }

}
  1. 测试类
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {

    @Autowired
    OrderServiceImpl orderService;

    @Test
    void testFanout() {
        orderService.createOrderFanout("1","1",12);
    }

    // 运行该测试用例
    @Test
    void testDirect() {
        orderService.createOrderDirect("1","1",12);
    }

}
  1. 在web界面观察交换机与队列情况

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

先要启动生产者再自动消费者,不然交换机不存在会报错

2)消费者接收消息

  1. 在项目springboot-order-rabbitmq-consumer工程中的service包下添加一个direct包,添加以下类,注入Spring容器中

(1)DirectAccountConsumer.java监听队列account.direct.queue

@Service
@RabbitListener(queues = {"account.direct.queue"})
public class DirectAccountConsumer {

    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("account.direct.queue===>" + message);
    }

}

(2)DirectExpressConsumer.java监听队列express.direct.queue

@Service
@RabbitListener(queues = {"express.direct.queue"})
public class DirectExpressConsumer {

    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("express.direct.queue===>" + message);
    }

}

(3)DirectSMSConsumer.java监听队列sms.direct.queue

@Service
@RabbitListener(queues = {"sms.direct.queue"})
public class DirectSMSConsumer {

    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("sms.direct.queue===>" + message);
    }

}
  1. 运行springboot-order-rabbitmq-consumer

  2. 查看控制台打印结果

在这里插入图片描述

7.3 topic 模式

使用注解方式实现绑定

消费者

  1. springboot-order-rabbitmq-consumer工程下的service包下添加一个topic包,添加以下类,注入Spring容器中

(1)TopicAccountConsumer.java

package com.vinjcent.rabbitmq.service.topic;


import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "account.topic.queue", durable = "true", autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
        key = "#.account.#"
))
public class TopicAccountConsumer {

    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("account.topic.queue===>" + message);
    }

}

(2)TopicExpressConsumer.java

package com.vinjcent.rabbitmq.service.topic;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "express.topic.queue", durable = "true", autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
        key = "*.express.#"
))
public class TopicExpressConsumer {

    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("express.topic.queue===>" + message);
    }

}

(3)TopicSMSConsumer.java

package com.vinjcent.rabbitmq.service.topic;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "sms.topic.queue", durable = "true", autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
        key = "sms.#"
))public class TopicSMSConsumer {

    @RabbitHandler
    public void receiveMessage(String message){
        System.out.println("sms.topic.queue===>" + message);
    }

}
  1. 启动springboot-order-rabbitmq-consumer
  2. 查看web界面

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

推荐使用配置类的方式实现交换机与队列的绑定,以及路由key规则

生产者

  1. springboot-order-rabbitmq-producer工程下,service包下添加以下内容
package com.vinjcent.rabbitmq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

@SuppressWarnings("all")
@Service
public class OrderServiceImpl {

    @Autowired
    RabbitTemplate rabbitTemplate;

    // fanout模式
    public void createOrderFanout(String userId, String productId,int num){
        //......
    }

    // direct模式
    public void createOrderDirect(String userId, String productId,int num){
        //......
    }

    // topic
    public void createOrderTopic(String userId, String productId,int num){
        // 1.根据商品id查询库存是否充足
        // 2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生产成功: " + orderId);
        // 3.通过MQ来完成消息的分发
        // 参数1: 交换机   参数2: 路由key/queue队列名称   参数3: 消息内容
        String exchangeName = "topic_order_exchange";
        // 路由给三个消息队列推送消息
        String routingKey = "sms.express.account.xxxxx";

        /*
         *  #.account.#
         *  *.express.#
         *  sms.#
         */
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);

    }


}
  1. 运行测试用例,生产消息
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {

    @Autowired
    OrderServiceImpl orderService;

    @Test
    void contextLoads() {
        orderService.createOrderFanout("1","1",12);
    }

    @Test
    void testDirect() {
        orderService.createOrderDirect("1","1",12);
    }

    // 运行该测试用例
    @Test
    void testTopic() {
        orderService.createOrderTopic("1","1",12);
    }
}

在这里插入图片描述

下一篇文章RabbitMQ高级 - 过期时间 TTL


版权声明:本文为Wei_Naijia原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。