springboot 整合RabbitMQ

一.简介

AMQP(advanced message queuing protocol,高级消息队列协议) 是一个线路层的协议规范,类似于SMTP、HTTP 协议。

RabbitMQ 是一个实现了AMQP 的开源消息中间件,使用Erlang编写,故需要Erlang环境。

架构

在这里插入图片描述

常用命令

启动rabbit服务:service rabbitmq-server start

停止rabbit服务:service rabbitmq-server stop

后台启动:rabbitmq-server -detached

运行状态:rabbitmqctl status

查看所有用户:rabbitmqctl list_users

添加用户:rabbitmqctl add_user username password

删除用户:rabbitmqctl delete_user username

修改密码:rabbitmqctl change_password username newpassword

查看已经安装的插件:rabbitmq-plugins list

开启网页版控制台:rabbitmq-plugins enable rabbitmq_management

访问网页地址:http://localhost:15672/ 默认账号:guest/guest

二.流程

1.安装好RabbitMQ 也有类似Active MQ 的web 管理后台,非docker 安装需要主动开启:

rabbitmq-plugins enable rabbitmq_management

docker 环境安装已经有相关默认配置:

docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management 

rabbitmq 启动成功后有一个默认的guset/guest 用户。访问localhost:15672即可进入登录页面:
在这里插入图片描述

在这里插入图片描述

2.pom.xml 引入spring-boot-starter-amqp:

<dependency>
  <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.application.yml 配置rabbitmq 基本信息:

spring:
  rabbitmq:
    host: localhost

其实springboot 已经有了默认配置:
在这里插入图片描述

4.Exchange
rabbitmq 中所有消息生产者提交的消息都会交由Exchange 进行在分配,Exchange 根据不同的策略将消息发布到不同的Queue中。RabbitMQ 中有4中Exchange策略:Direct、Fanout、Topic、Header。Direct、Topic 在发送消息时都需要指定routing key,Exchange 收到消息后会根据routing key 把消息复制到相应的Queue。每个Queue中的消息都只会被一个消费者消费一次(多个消费者消费同一个Queue中的消息时只有一个消费者能接受到消息)。

(1)DirectExchange策略是将消息队列绑定到一个DirectExchange上,当一条消息到达DirectExchange 时会根据routing key 路由到Queue,如果有多个Queue都符合路由规则,则把消息都复制到多个Queue。

RabbitmqDirectConfig.java

package com.vincent.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqDirectConfig {
    public static final String DIRECT_EXCHANGE_NAME = "direct-exchange-name";

    @Bean
    Queue directQueue(){
        Queue queue = new Queue("com.vincent.direct-queue",true,false);
        return queue;
    }

    @Bean
    DirectExchange directExchange(){
        DirectExchange exchange = new DirectExchange(DIRECT_EXCHANGE_NAME);
        return exchange;
    }

    @Bean
    Binding bindQueue(){
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("com.vincent.direct-queue");
    }
}

配置DirectExchange策略:
new Queue(“com.vincent.direct-queue”,true,false) ,第一个参数配置队列的名称;第2个参数标记队列是否永久,即重启后队列依然存在;第3个参数标识队列长久未使用时是否删除队列。

在这里插入图片描述

消息发送和接受

package com.vincent.controller;


import com.vincent.config.RabbitmqDirectConfig;
import com.vincent.model.R;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;




@RestController
public class HelloController {

    @Autowired
    private RabbitTemplate template;



    @RequestMapping("/test")
    public R test(String name){
        R r = R.ok(name);
        template.convertAndSend(RabbitmqDirectConfig.DIRECT_EXCHANGE_NAME,"com.vincent.direct-queue",r);
        return r;
    }

    @RabbitListener(queues = "com.vincent.direct-queue")
    public void directListener(R r){
        System.out.println(r);
    }
}

RabbitMQ 提供了RabbitTemplate 模板发送消息,convertAndSend 方法有许多重载的方法,常用的有指定Exchange名称、routing key、消息对象作为参数发送消息。@RabbitListener 注解方法接受队列消息,方法参数直接为发送的消息对象。

运行结果:
访问: http://localhost:8080/test?name=vincent
在这里插入图片描述

(2)FanoutExchange 策略是把所有到达FanoutExchange 的消息转发给所有与它绑定的Queue,这种策略中routing key 将不起作用。

RabbitmqFanoutConfig.java

package com.vincent.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqFanoutConfig {
    public static final String FANTOU_EXCHANGE_NAME = "fanout-exchange-name";

    @Bean
    Queue fanoutQueue(){
        Queue queue = new Queue("com.vincent.fanout-queue");
        return queue;
    }

    @Bean
    Queue fanoutQueue2(){
        Queue queue = new Queue("com.vincent.fanout-queue2");
        return queue;
    }

    @Bean
    FanoutExchange fanoutExchange(){
        FanoutExchange exchange = new FanoutExchange(FANTOU_EXCHANGE_NAME);
        return exchange;
    }

    @Bean
    Binding bindQueue(){
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
    }

    @Bean
    Binding bindQueue2(){
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }
}

提供了2个Banding对象使2个Queue 和FanoutExchange 对象绑定,即在FanoutExchange 接受到消息后会把消息复制到绑定的2个Queue上。

HelloController.java

package com.vincent.controller;


import com.vincent.config.RabbitmqDirectConfig;
import com.vincent.config.RabbitmqFanoutConfig;
import com.vincent.model.R;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;




@RestController
public class HelloController {

    @Autowired
    private RabbitTemplate template;



    @RequestMapping("/test")
    public R test(String name){
        R r = R.ok(name);
        template.convertAndSend(RabbitmqDirectConfig.DIRECT_EXCHANGE_NAME,"com.vincent.direct-queue",r);
        template.convertAndSend(RabbitmqFanoutConfig.FANTOU_EXCHANGE_NAME,null,r);
        return r;
    }

    @RabbitListener(queues = "com.vincent.direct-queue")
    public void directListener(R r){
        System.out.println(r);
    }

    @RabbitListener(queues = "com.vincent.fanout-queue")
    public void fanoutListener(R r){
        System.out.println("1---------"+r);
    }

    @RabbitListener(queues = "com.vincent.fanout-queue2")
    public void fanoutListener2(R r){
        System.out.println("2---------"+r);
    }
}

运行效果:
在这里插入图片描述

(3)TopicExchange 策略是通过routing key 绑定到TopicExchange,当消息到达TopicExchange时根据routing key 把消息路由到一个或多个Queue上。Banding 对象需要指定一个banding key,单词之间使用"." 作为分隔。

官网说明如下:

在这里插入图片描述
在这里插入图片描述

RabbitmqTopicConfig.java

package com.vincent.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqTopicConfig {
    public static final String TOPIC_EXCHANGE_NAME = "topic-exchange-name";

    @Bean
    Queue topicQueue(){
        Queue queue = new Queue("com.vincent.topic-queue-1");
        return queue;
    }

    @Bean
    Queue topicQueue2(){
        Queue queue = new Queue("com.vincent.topic-queue-2");
        return queue;
    }

    @Bean
    Queue topicQueue3(){
        Queue queue = new Queue("com.vincent.topic-queue-3");
        return queue;
    }


    @Bean
    TopicExchange topicExchange(){
        TopicExchange exchange = new TopicExchange(TOPIC_EXCHANGE_NAME);
        return exchange;
    }


    @Bean
    Binding bindQueue(){
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("phone.#");
    }


    @Bean
    Binding bindQueue2(){
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("#.phone");
    }

    @Bean
    Binding bindQueue3(){
        return BindingBuilder.bind(topicQueue3()).to(topicExchange()).with("#.phone.#");
    }
}

HelloController.java

package com.vincent.controller;


import com.vincent.config.RabbitmqDirectConfig;
import com.vincent.config.RabbitmqFanoutConfig;
import com.vincent.config.RabbitmqTopicConfig;
import com.vincent.model.R;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;




@RestController
public class HelloController {

    @Autowired
    private RabbitTemplate template;



    @RequestMapping("/test")
    public R test(String name){
        R r = R.ok(name);
        template.convertAndSend(RabbitmqTopicConfig.TOPIC_EXCHANGE_NAME,"phone.小米","小米发布新手机了");
        template.convertAndSend(RabbitmqTopicConfig.TOPIC_EXCHANGE_NAME,"苹果.phone","苹果手机");
        template.convertAndSend(RabbitmqTopicConfig.TOPIC_EXCHANGE_NAME,"product.phone.test","product.phone.test");

        return r;
    }

    @RabbitListener(queues = "com.vincent.topic-queue-1")
    public void topicListener1(Object r){
        System.out.println("topic---------"+r);
    }

    @RabbitListener(queues = "com.vincent.topic-queue-2")
    public void topicListener2(Object r){
        System.out.println("topic-1---------"+r);
    }

    @RabbitListener(queues = "com.vincent.topic-queue-3")
    public void topicListener3(Object r){
        System.out.println("topic-3---------"+r);
    }
}

运行效果:

在这里插入图片描述

(4)HeadersExchange 策略根据消息的Header 将消息路由到不同的Queue 上,这种策略与routing key 无关。

RabbitmqHeadersConfig.java

package com.vincent.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;




@Configuration
public class RabbitmqHeadersConfig {
    public static final String HEADERS_EXCHANGE_NAME = "header-exchange-name";

    @Bean
    Queue headerQueue(){
        Queue queue = new Queue("com.vincent.header-queue");
        return queue;
    }

    @Bean
    Queue headerQueue2(){
        Queue queue = new Queue("com.vincent.header-queue2");
        return queue;
    }

    @Bean
    HeadersExchange headerExchange(){
        HeadersExchange exchange = new HeadersExchange(HEADERS_EXCHANGE_NAME);
        return exchange;
    }

    @Bean
    Binding bindQueue(){
        return BindingBuilder.bind(headerQueue()).to(headerExchange()).where("name").exists();
    }

    @Bean
    Binding bindQueue2(){
        return BindingBuilder.bind(headerQueue2()).to(headerExchange()).whereAny("age").exist();
    }
}

HelloController.java

package com.vincent.controller;


import com.vincent.config.RabbitmqDirectConfig;
import com.vincent.config.RabbitmqFanoutConfig;
import com.vincent.config.RabbitmqHeadersConfig;
import com.vincent.config.RabbitmqTopicConfig;
import com.vincent.model.R;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;



@RestController
public class HelloController {

    @Autowired
    private RabbitTemplate template;



    @RequestMapping("/test")
    public R test(String name){
        R r = R.ok(name);
     	template.convertAndSend(RabbitmqHeadersConfig.HEADERS_EXCHANGE_NAME,null, MessageBuilder.withBody("hello amqp".getBytes()).setHeader("name","vincent").build());
        template.convertAndSend(RabbitmqHeadersConfig.HEADERS_EXCHANGE_NAME,null, MessageBuilder.withBody("hello amqp".getBytes()).setHeader("age",27).build());

        return r;
    }

    @RabbitListener(queues = "com.vincent.header-queue")
    public void headerListener(Message r){
        System.out.println("header---------"+r);
    }

    @RabbitListener(queues = "com.vincent.header-queue2")
    public void headerListener2(Message r){
        System.out.println("header2---------"+r);
    }
}

运行效果:
在这里插入图片描述

三.消息投递可靠性

为确保消息投递可靠性需要先为消息设置状态(投递中、消息已确认、已消费)并保存到数据库;收到消息投递确认信息修改数据库数据状态;消费者拿到消息后和数据库数据对比数据状态,只消费状态已确认的消息。

1.application.yml 开启消息确认机制:

spring.rabbitmq.publisher-confirms: true

2.设置RabbitTemplate消息确认回调:

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
      	@Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if(ack){//消息确认成功
                //CorrelationData为消息生产者投递消息时携带的额外数据,作为消息确认的回调参数,其不经过网络。从CorrelationData获取消息主键,修改数据库数据状态为已确认

             }else{
                 //对确认失败消息做失败处理
             }
        }
});

3.发送消息时携带上关联数据,

//发送消息时CorrelationData带上消息主键,方便回调时修改数据状态。CorrelationData不会经过网络传输,在生产者消息确认时会带上CorrelationData
String msg = System.currentTimeMillis()+"";
 rabbitTemplate.convertAndSend("exchange","test",msg,new CorrelationData(msg));

四.总结

1.相关镜像使用手册(内部端口、配置文件、数据目录等)可以到docker 搜索:
在这里插入图片描述

2.整合RabbitMQ 时对MQ 的配置需要提供Queue、不同策略的Exchange、Binding 对象。每个Queue 上的消息只能被消费一次,故FanoutExchange 示例中提供了2个Queue 和 FanoutExchange绑定。


版权声明:本文为Zllvincent原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。