一.简介
AMQP(advanced message queuing protocol,高级消息队列协议) 是一个线路层的协议规范,类似于SMTP、HTTP 协议。
RabbitMQ 是一个实现了AMQP 的开源消息中间件,使用Erlang编写,故需要Erlang环境。
架构
常用命令
启动rabbit服务:service rabbitmq-server start
停止rabbit服务:service rabbitmq-server stop
后台启动:rabbitmq-server -detached
运行状态:rabbitmqctl status
查看所有用户:rabbitmqctl list_users
添加用户:rabbitmqctl add_user username password
删除用户:rabbitmqctl delete_user username
修改密码:rabbitmqctl change_password username newpassword
查看已经安装的插件:rabbitmq-plugins list
开启网页版控制台:rabbitmq-plugins enable rabbitmq_management
访问网页地址:http://localhost:15672/ 默认账号:guest/guest
二.流程
1.安装好RabbitMQ 也有类似Active MQ 的web 管理后台,非docker 安装需要主动开启:
rabbitmq-plugins enable rabbitmq_management
docker 环境安装已经有相关默认配置:
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management
rabbitmq 启动成功后有一个默认的guset/guest 用户。访问localhost:15672即可进入登录页面:
2.pom.xml 引入spring-boot-starter-amqp:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.application.yml 配置rabbitmq 基本信息:
spring:
rabbitmq:
host: localhost
其实springboot 已经有了默认配置:
4.Exchange
rabbitmq 中所有消息生产者提交的消息都会交由Exchange 进行在分配,Exchange 根据不同的策略将消息发布到不同的Queue中。RabbitMQ 中有4中Exchange策略:Direct、Fanout、Topic、Header。Direct、Topic 在发送消息时都需要指定routing key,Exchange 收到消息后会根据routing key 把消息复制到相应的Queue。每个Queue中的消息都只会被一个消费者消费一次(多个消费者消费同一个Queue中的消息时只有一个消费者能接受到消息)。
(1)DirectExchange策略是将消息队列绑定到一个DirectExchange上,当一条消息到达DirectExchange 时会根据routing key 路由到Queue,如果有多个Queue都符合路由规则,则把消息都复制到多个Queue。
RabbitmqDirectConfig.java
package com.vincent.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqDirectConfig {
public static final String DIRECT_EXCHANGE_NAME = "direct-exchange-name";
@Bean
Queue directQueue(){
Queue queue = new Queue("com.vincent.direct-queue",true,false);
return queue;
}
@Bean
DirectExchange directExchange(){
DirectExchange exchange = new DirectExchange(DIRECT_EXCHANGE_NAME);
return exchange;
}
@Bean
Binding bindQueue(){
return BindingBuilder.bind(directQueue()).to(directExchange()).with("com.vincent.direct-queue");
}
}
配置DirectExchange策略:
new Queue(“com.vincent.direct-queue”,true,false) ,第一个参数配置队列的名称;第2个参数标记队列是否永久,即重启后队列依然存在;第3个参数标识队列长久未使用时是否删除队列。
消息发送和接受
package com.vincent.controller;
import com.vincent.config.RabbitmqDirectConfig;
import com.vincent.model.R;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
public class HelloController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/test")
public R test(String name){
R r = R.ok(name);
template.convertAndSend(RabbitmqDirectConfig.DIRECT_EXCHANGE_NAME,"com.vincent.direct-queue",r);
return r;
}
@RabbitListener(queues = "com.vincent.direct-queue")
public void directListener(R r){
System.out.println(r);
}
}
RabbitMQ 提供了RabbitTemplate 模板发送消息,convertAndSend 方法有许多重载的方法,常用的有指定Exchange名称、routing key、消息对象作为参数发送消息。@RabbitListener 注解方法接受队列消息,方法参数直接为发送的消息对象。
运行结果:
访问: http://localhost:8080/test?name=vincent
(2)FanoutExchange 策略是把所有到达FanoutExchange 的消息转发给所有与它绑定的Queue,这种策略中routing key 将不起作用。
RabbitmqFanoutConfig.java
package com.vincent.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqFanoutConfig {
public static final String FANTOU_EXCHANGE_NAME = "fanout-exchange-name";
@Bean
Queue fanoutQueue(){
Queue queue = new Queue("com.vincent.fanout-queue");
return queue;
}
@Bean
Queue fanoutQueue2(){
Queue queue = new Queue("com.vincent.fanout-queue2");
return queue;
}
@Bean
FanoutExchange fanoutExchange(){
FanoutExchange exchange = new FanoutExchange(FANTOU_EXCHANGE_NAME);
return exchange;
}
@Bean
Binding bindQueue(){
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
@Bean
Binding bindQueue2(){
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
}
提供了2个Banding对象使2个Queue 和FanoutExchange 对象绑定,即在FanoutExchange 接受到消息后会把消息复制到绑定的2个Queue上。
HelloController.java
package com.vincent.controller;
import com.vincent.config.RabbitmqDirectConfig;
import com.vincent.config.RabbitmqFanoutConfig;
import com.vincent.model.R;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
public class HelloController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/test")
public R test(String name){
R r = R.ok(name);
template.convertAndSend(RabbitmqDirectConfig.DIRECT_EXCHANGE_NAME,"com.vincent.direct-queue",r);
template.convertAndSend(RabbitmqFanoutConfig.FANTOU_EXCHANGE_NAME,null,r);
return r;
}
@RabbitListener(queues = "com.vincent.direct-queue")
public void directListener(R r){
System.out.println(r);
}
@RabbitListener(queues = "com.vincent.fanout-queue")
public void fanoutListener(R r){
System.out.println("1---------"+r);
}
@RabbitListener(queues = "com.vincent.fanout-queue2")
public void fanoutListener2(R r){
System.out.println("2---------"+r);
}
}
运行效果:
(3)TopicExchange 策略是通过routing key 绑定到TopicExchange,当消息到达TopicExchange时根据routing key 把消息路由到一个或多个Queue上。Banding 对象需要指定一个banding key,单词之间使用"." 作为分隔。
官网说明如下:
RabbitmqTopicConfig.java
package com.vincent.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqTopicConfig {
public static final String TOPIC_EXCHANGE_NAME = "topic-exchange-name";
@Bean
Queue topicQueue(){
Queue queue = new Queue("com.vincent.topic-queue-1");
return queue;
}
@Bean
Queue topicQueue2(){
Queue queue = new Queue("com.vincent.topic-queue-2");
return queue;
}
@Bean
Queue topicQueue3(){
Queue queue = new Queue("com.vincent.topic-queue-3");
return queue;
}
@Bean
TopicExchange topicExchange(){
TopicExchange exchange = new TopicExchange(TOPIC_EXCHANGE_NAME);
return exchange;
}
@Bean
Binding bindQueue(){
return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("phone.#");
}
@Bean
Binding bindQueue2(){
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("#.phone");
}
@Bean
Binding bindQueue3(){
return BindingBuilder.bind(topicQueue3()).to(topicExchange()).with("#.phone.#");
}
}
HelloController.java
package com.vincent.controller;
import com.vincent.config.RabbitmqDirectConfig;
import com.vincent.config.RabbitmqFanoutConfig;
import com.vincent.config.RabbitmqTopicConfig;
import com.vincent.model.R;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
public class HelloController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/test")
public R test(String name){
R r = R.ok(name);
template.convertAndSend(RabbitmqTopicConfig.TOPIC_EXCHANGE_NAME,"phone.小米","小米发布新手机了");
template.convertAndSend(RabbitmqTopicConfig.TOPIC_EXCHANGE_NAME,"苹果.phone","苹果手机");
template.convertAndSend(RabbitmqTopicConfig.TOPIC_EXCHANGE_NAME,"product.phone.test","product.phone.test");
return r;
}
@RabbitListener(queues = "com.vincent.topic-queue-1")
public void topicListener1(Object r){
System.out.println("topic---------"+r);
}
@RabbitListener(queues = "com.vincent.topic-queue-2")
public void topicListener2(Object r){
System.out.println("topic-1---------"+r);
}
@RabbitListener(queues = "com.vincent.topic-queue-3")
public void topicListener3(Object r){
System.out.println("topic-3---------"+r);
}
}
运行效果:
(4)HeadersExchange 策略根据消息的Header 将消息路由到不同的Queue 上,这种策略与routing key 无关。
RabbitmqHeadersConfig.java
package com.vincent.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqHeadersConfig {
public static final String HEADERS_EXCHANGE_NAME = "header-exchange-name";
@Bean
Queue headerQueue(){
Queue queue = new Queue("com.vincent.header-queue");
return queue;
}
@Bean
Queue headerQueue2(){
Queue queue = new Queue("com.vincent.header-queue2");
return queue;
}
@Bean
HeadersExchange headerExchange(){
HeadersExchange exchange = new HeadersExchange(HEADERS_EXCHANGE_NAME);
return exchange;
}
@Bean
Binding bindQueue(){
return BindingBuilder.bind(headerQueue()).to(headerExchange()).where("name").exists();
}
@Bean
Binding bindQueue2(){
return BindingBuilder.bind(headerQueue2()).to(headerExchange()).whereAny("age").exist();
}
}
HelloController.java
package com.vincent.controller;
import com.vincent.config.RabbitmqDirectConfig;
import com.vincent.config.RabbitmqFanoutConfig;
import com.vincent.config.RabbitmqHeadersConfig;
import com.vincent.config.RabbitmqTopicConfig;
import com.vincent.model.R;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
public class HelloController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/test")
public R test(String name){
R r = R.ok(name);
template.convertAndSend(RabbitmqHeadersConfig.HEADERS_EXCHANGE_NAME,null, MessageBuilder.withBody("hello amqp".getBytes()).setHeader("name","vincent").build());
template.convertAndSend(RabbitmqHeadersConfig.HEADERS_EXCHANGE_NAME,null, MessageBuilder.withBody("hello amqp".getBytes()).setHeader("age",27).build());
return r;
}
@RabbitListener(queues = "com.vincent.header-queue")
public void headerListener(Message r){
System.out.println("header---------"+r);
}
@RabbitListener(queues = "com.vincent.header-queue2")
public void headerListener2(Message r){
System.out.println("header2---------"+r);
}
}
运行效果:
三.消息投递可靠性
为确保消息投递可靠性需要先为消息设置状态(投递中、消息已确认、已消费)并保存到数据库;收到消息投递确认信息修改数据库数据状态;消费者拿到消息后和数据库数据对比数据状态,只消费状态已确认的消息。
1.application.yml 开启消息确认机制:
spring.rabbitmq.publisher-confirms: true
2.设置RabbitTemplate消息确认回调:
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){//消息确认成功
//CorrelationData为消息生产者投递消息时携带的额外数据,作为消息确认的回调参数,其不经过网络。从CorrelationData获取消息主键,修改数据库数据状态为已确认
}else{
//对确认失败消息做失败处理
}
}
});
3.发送消息时携带上关联数据,
//发送消息时CorrelationData带上消息主键,方便回调时修改数据状态。CorrelationData不会经过网络传输,在生产者消息确认时会带上CorrelationData
String msg = System.currentTimeMillis()+"";
rabbitTemplate.convertAndSend("exchange","test",msg,new CorrelationData(msg));
四.总结
1.相关镜像使用手册(内部端口、配置文件、数据目录等)可以到docker 搜索:
2.整合RabbitMQ 时对MQ 的配置需要提供Queue、不同策略的Exchange、Binding 对象。每个Queue 上的消息只能被消费一次
,故FanoutExchange 示例中提供了2个Queue 和 FanoutExchange绑定。