文章目录
RabbitMQ
皆来自“狂神说Java”
消息队列应用场景
跨系统的数据传递
缓冲/消峰:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们
分布式事务
消息队列的两种模式
点对点模式:
- 消费者主动拉取数据,消息收到后删除消息

发布 / 订阅模式: - 可以有多个topic主题
- 消费者消费数据之后,不会删除数据
- 每个消费者相互独立,都可以消费到数据

消息中间件的本质
是一种提供接收数据,存储数据,发送数据等功能的技术服务
消息中间件的组成部分
- 协议(即通信的双方需要遵守的规则)
- 持久化机制
- 分发策略
- 高可用,高可靠(集群和备份)
- 容错机制
协议
定义
所谓协议是指:
1:计算机底层操作系统和应用程序通讯时共同遵守的一组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流。
2:和一般的网络应用程序的不同它主要负责数据的接受和传递,所以性能比较的高。
3:协议对数据格式和计算机之间交换数据都必须严格遵守规范。
三要素
1.语法:用户数据与控制信息的结构与格式,以及数据出现的顺序。
2.语义:解释控制信息每个部分的意义。它规定了需要发出何种控制信息,以及完成的动作与做出什么样的响应。
3.时序:对事件发生顺序的详细说明。
比如我MQ发送一个信息,是以什么数据格式发送到队列中,然后每个部分的含义是什么,发送完毕以后的执行的动作,以及消费者消费消息的动作,消费完毕的响应结果和反馈是什么,然后按照对应的执行顺序进行处理。
以http为例
语法:http规定了请求报文和响应报文的格式。
语义:客户端主动发起请求称之为请求。(这是一种定义,同时你发起的是post/get请求)
时序:一个请求对应一个响应。(一定先有请求在有响应,这个是时序)
而消息中间件采用的并不是http协议,而常见的消息中间件协议有:OpenWire、AMQP、MQTT、Kafka,OpenMessage协议。
面试题:为什么消息中间件不直接使用http协议呢?
1、(数据冗余)因为http请求报文头和响应报文头是比较复杂的,包含了cookie,数据的加密解密,状态码,响应码等附加的功能,但是对于一个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就行,一定要追求的是高性能。尽量简洁,快速。
2、(不可靠)大部分情况下http大部分都是短链接,在实际的交互过程中,一个请求到响应很有可能会中断,中断以后就不会就行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对数据或消息就行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行。
AMQP
AMQP( Advanced Message Queuing Protocol)是高级消息队列协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
特性:
1、分布式事务支持。
2、消息的持久化支持。
3、高性能和高可靠的消息处理优势。
支持者:RabbitMQ,ActiveMQ
MQTT
MQTT协议(Message Queueing Telemetry Transport)消息队列是IBM开放的一个即时通讯协议,物联网系统架构中的重要组成部分。
特点:
1、轻量
2、结构简单
3、传输快,不支持事务
4、没有持久化设计。
应用场景:
1、计算能力有限
2、低带宽
3、网络不稳定
支持者:RabbitMQ(需要手动支持),ActiveMQ
OpenMessage协议
由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。
特点:
1、结构简单
2、解析速度快
3、支持事务和持久化设计。
支持者:RocketMQ
Kafka协议
Kafka协议是基于TCP/IP的二进制协议。消息内部是通过长度来分割,由一些基本数据类型组成。
特点:
1、结构简单
2、解析速度快
3、无事务支持
4、有持久化设计
分发策略的机制和对比
| ActiveMQ | RabbitMQ | Kafka | RocketMQ | |
|---|---|---|---|---|
| 发布订阅 | 支持 | 支持 | 支持 | 支持 |
| 轮询分发 | 支持 | 支持 | 支持 | / |
| 公平分发(能者多劳) | / | 支持 | 支持 | / |
| 重发 | 支持 | 支持 | / | 支持 |
| 消息拉取 | / | 支持 | 支持 | 支持 |
高可用和高可靠
高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。(集群)
当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署。来达到高可用的目的。
反正终归三句话:
1:要么消息共享
2:要么消息同步
3:要么元数据共享
1、Master-slave主从共享数据的部署方式
2、Master-slave主从同步的部署方式
3、多主集群同步的部署方式
4、多主集群转发的部署方式
5、Master-slave与Breoker-cluster组合的部署方式

高可靠(考虑数据不丢失)
所谓高可用是指:是指系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠。
如何保证中间件消息的可靠性呢?可以从两个方面考虑:
1:消息的传输:通过协议来保证系统间数据解析的正确性。
2:消息的存储可靠:通过持久化来保证消息的可靠性。
下载安装
官网:https://www.rabbitmq.com/
下载教程:https://blog.csdn.net/m0_46093203/article/details/118642090
erlang下载镜像:https://mirrors.cloud.tencent.com/erlang-solutions/
安装erlang
rpm -ivh
yum -y install erlang
// 测试
erl -v
安装RabbitMQ
// 需要该工具
yum -y install socat
rpm -ivh
yum -y install rabbitmq-server
// 开启服务
systemctl start rabbitmq-server
// 查看服务状态
systemctl status rabbitmq-server
// 设置开机自启
systemctl enable rabbitmq-server
安装图形化界面
安装图形化插件
rabbitmq-plugins enable rabbitmq_management
重启rabbitmq服务,访问主机ip:15672
命令小结
rabbitmqctl add_user 账号 密码
rabbitmqctl set_user_tags 账号 权限
rabbitmqctl change_password 账号 新密码 //修改密码
rabbitmqctl delete_user 账号 // 删除用户
rabbitmqctl list_users // 查看用户清单
rabbitmqctl.bat set_ permissions -p / 用户名 ".*" ".*" ".*" //为用户设置权限
docker安装rabbitMQ
默认安装了图形化界面和分配了用户
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management
RabbitMQ角色分类
1: none:
- 不能访问management plugin(图形化界面)
2: management(查看自己相关节点信息)
- 列出自己可以通过AMQP登入的虚拟机
- 查看自己的虚拟机节点virtual hosts的queues,exchanges和bindings信息
- 查看和关闭自己的channels和connections
- 查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts中的活动信息。
3: Policymaker
- 包含management所有权限
- 查看和创建和删除自己的virtual hosts所属的policies和parameters信息。
4: Monitoring
- 包含management所有权限
- 罗列出所有的virtual hosts,包括不能登录的virtual hosts。
- 查看其他用户的connections和channels信息
- 查看节点级别的数据如clustering和memory使用情况
- 查看所有的virtual hosts的全局统计信息。
5: Administrator(最高权限)
- 可以创建和删除virtual hosts
- 可以查看,创建和删除users
- 查看创建permisssions
- 关闭所有用户的connections
AMQP介绍
AMQP生成者流转过程

AMQP消费者流转过程

RabbitMQ的组件和架构

核心概念:
Server(broker) : 接受客户端的连接,实现AMQP实体服务。需要安装rabbitmq-server
Connection(连接):应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手
Channel(网络信道 / 通道):几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message(消息):服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host(虚拟地址):用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange(交换机):接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力,交换机负责消息的投递,消息一定是通过交换机来传递给队列的)
Bindings : Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key :是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue(队列,也称为Message Queue 消息队列):保存消息并将它们转发给消费者。
运行流程
交换机exchange的模式
注意:
消息一定是通过交换机来传递给队列的
routerKey可以重复
一个队列可以绑定多个交换机
默认交换机为direct模式
simple模式-案例

不指定交换机,走的是默认交换机(direct模式)
java原生依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
生产者:
public class Producer {
public static void main(String[] args) {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("47.110.251.206");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 2.创建连接
connection = connectionFactory.
newConnection("myProducer");
// 3.通过连接得到通道 channel
channel = connection.createChannel();
// 4.通过通道得到(交换机exchange,声明队列queue,绑定关系binding,等)
// queueDeclare 声明队列
/*
* queueDeclare的参数
*
* @Params1 队列的名称
* @Params2 durable:是否要持久化,持久化:重启服务后,该队列是否存在
* @Params3 exclusive:排他性,是否为独占队列
* @Params4 autoDelete:是否删除,随着最后一个消费者消费完毕后,该队列是否删除
* @Params5 arguments:携带附加参数
* */
String queueName = "hello";
channel.queueDeclare("hello",false,false,false,null);
// 4.通过通道 发送消息给队列
String message = "hello,RabbitMQ";
// 交换机名,队列名,是否持久化(MessageProperties),要发送的消息
channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("发送成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 5.关闭通道
if (channel.isOpen() && channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 6.关闭连接
if (connection.isOpen() && connection != null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

消费者
public class Consumer {
public static void main(String[] args) {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("47.110.251.206");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 2.创建连接
connection = connectionFactory.
newConnection("myProducer");
// 3.通过连接得到通道 channel
channel = connection.createChannel();
// 4.通过通道 从队列中消费消息
String queueName = "hello";
// true为autoAck,
// 不设置则queue中的message会从ready变为Unacked,释放连接后又变回ready状态
channel.basicConsume(queueName,true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("收到的消息是:" + new String(delivery.getBody(), "UTF-8"));
}
}
// 出现异常的情况
, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("接收消息失败");
}
});
System.out.println("开始接收消息");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 5.关闭通道
if (channel.isOpen() && channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 6.关闭连接
if (connection.isOpen() && connection != null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
fanout模式
向交换机中发送消息时,所有绑定该交换机的队列都能收到消息
direct模式

条件匹配(routerkey),根据routerkey来确定发送给哪些队列(队列的routerkey可以重复)
topic模式

模糊匹配( * 为一级,# 为任意级)

headers模式
参数匹配
发现参数必须全部相等才能匹配
代码创建交换机
注意:
当发送和接收消息时,队列和交换机不存在,会报错
当交换机或队列被重复声明时,会报错
// 通过通道声明(交换机exchange,声明队列queue,绑定关系binding,等)
// queueDeclare 声明队列
/*
* queueDeclare的参数
*
* @Params1 队列的名称
* @Params2 durable:是否要持久化,持久化:重启服务后,该队列是否存在
* @Params3 exclusive:排他性,是否为独占队列
* @Params4 autoDelete:是否删除,随着最后一个消费者消费完毕后,该队列是否删除
* @Params5 arguments:携带附加参数
* */
String queueName = "hello";
channel.queueDeclare(queueName,false,false,false,null);
// 完整 同队列的声明,
// durable,exclusive,autoDelete,arguments:
String exchangeName = "fanout_test_exchange";
String exchangeType = "fanout";
channel.exchangeDeclare(exchangeName, exchangeType);
// 绑定交换机和队列
channel.queueBind(exchangeName, queueName, "routerKey");
实现公平分发(默认轮询分发)
默认为轮询分发
实现公平分发(能者多劳):
只有应答一个消息后,才能从队列中取
整合springboot
步骤:
- 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 编写配置文件
spring:
rabbitmq:
host: 主机地址
username: admin # 默认为guest
password: admin # 默认为guest
port: 5672
- 声明交换机,队列,绑定关系
- 编写配置类
- 注解(不推荐)
- 通过amqpAdmin
@Configuration
public class RabbitMQConfig {
// 声明交换机,direct 模式对应 DirectExchange类,
// 第二个参数,true :durable是否持久化
// 第三个参数,false:autoDelete是否自动删除
@Bean
public DirectExchange directTestExchange(){
return new DirectExchange("direct_test_exchange", true, false);
}
// 声明队列, true:durable是否持久化
@Bean
public Queue testQueue(){
return new Queue("direct_test_queue", true);
}
@Bean
public Queue otherQueue(){
return new Queue("direct_other_queue", true);
}
// 声明绑定关系
// 格式BindingBuilder.bing(队列).to(交换机).with(router_key)
@Bean
public Binding testBind(){
return BindingBuilder.bind(testQueue()).to(directTestExchange()).with("test");
}
@Bean
public Binding otherBind(){
return BindingBuilder.bind(otherQueue()).to(directTestExchange()).with("other");
}
}
在消费者监听时,声明
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
value = @Queue(value = "sms.fanout.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "fanout_order_exchange",
// 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
type = ExchangeTypes.FANOUT)
))
@Autowired
AmqpAdmin amqpAdmin;
@Test
void test(){
amqpAdmin.declareExchange();
amqpAdmin.declareBinding();
amqpAdmin.declareQueue();
}
- 生产者发送,消费者接收
发送消息(通过rabbitTemplate)
@Service
public class Producer {
@Autowired
RabbitTemplate rabbitTemplate;
public void publish(String msg, String routerKey){
String exchangeName = "direct_test_exchange";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, routerKey, msg);
}
}
监听并接收消息
(通过注解)
@Service
@RabbitListener(queues = "direct_test_queue")
public class TestConsume {
@RabbitHandler
public void consume(String msg){
System.out.println("TestConsume: " + msg);
}
}
// 测试后发现,下面的方式也可以
@Service
public class TestConsume {
@RabbitListener(queues = "direct_test_queue")
public void consume(String msg){
System.out.println("TestConsume: " + msg);
}
}
接收消息(通过rabbitTemplate)
rabbitTemplate.receiveAndConvert("");
设置序列化方式
新增配置类,或重写rabbitTemplate(作用是一样的替换了原来的SimpleMessageConverter()序列化方式,可以解析json)
@Configuration
public class MyMqconfig {
@Bean
public MessageConverter getMessageConverter(){
return new Jackson2JsonMessageConverter();
}
}
过期时间 ttl
time to live
有两种设置的方式
方式一:设置队列的过期时间(即队列中的所有消息的过期时间),可以设置过期的消息会进入死信队列

@Bean
public Queue ttlQueue(){
Map<String, Object> args = new HashMap<>();
// 设置过期队列的过期时间,5000ms,在创建队列时传入
args.put("x-message-ttl", 5000);
return new Queue("direct_ttl_queue", true, false, false, args);
}
在图形化界面查看效果
方式二:设置单个消息的过期时间(过期后不可复原)MessagePostProcessor
public void publishTtl(String msg, String routerKey){
String exchangeName = "direct_test_exchange";
// 可以自定义消息的一些属性
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置过期时间 5000ms
message.getMessageProperties().setExpiration("5000");
// 设置编码格式
message.getMessageProperties().setContentEncoding("utf-8");
return message;
}
};
// 发送消息,传入messagePostProcessor
rabbitTemplate.convertAndSend(exchangeName, routerKey, msg, messagePostProcessor);
}
死信队列
本质就是一个普通的队列,被设置了TTL(过期时间)的队列当做了【墓地】
死信交换机(DLX)

@Bean
public Queue ttlQueue(){
Map<String, Object> args = new HashMap<>();
// 设置过期队列的过期时间,5000ms,在创建队列时传入
args.put("x-message-ttl", 5000);
// 设置死信交换机,需要指定一个存在的交换机
args.put("x-dead-letter-exchange", "");
// 若死信交互机是direct模式,则需要指定router-key
args.put("x-dead-letter-routing-key", "");
return new Queue("direct_ttl_queue", true, false, false, args);
}

内存磁盘控制
原文:https://www.kuangstudy.com/zl/rabbitmq#1366722371607715841
内存控制
rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
内存换页
vm_memory_high_watermark.relative = 0.4
vm_memory_high_watermark_paging_ratio = 0.7(设置小于1的值)
磁盘控制
rabbitmqctl set_disk_free_limit <disk_limit>
rabbitmqctl set_disk_free_limit memory_limit <fraction>
disk_limit:固定单位 KB MB GB
fraction :是相对阈值,建议范围在:1.0~2.0之间。(相对于内存)
集群配置
原文:https://www.kuangstudy.com/zl/rabbitmq#1367869499746869249
# 启动第一个节点rabbit-1
RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start &
# 启动第二个节点rabbit-2
RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start &
# 把rabbit-1作为主节点的操作
## 停止应用
rabbitmqctl -n rabbit-1 stop_app
## 目的是清除节点上的历史数据(如果不清除,无法将节点加入到集群)
rabbitmqctl -n rabbit-1 reset
## 启动应用
rabbitmqctl -n rabbit-1 start_app
# 把rabbit-2作为从节点的操作
## 停止应用
rabbitmqctl -n rabbit-2 stop_app
## 目的是清除节点上的历史数据(如果不清除,无法将节点加入到集群)
rabbitmqctl -n rabbit-2 reset
## 将rabbit2节点加入到rabbit1(主节点)集群当中【Server-node服务器的主机名】
rabbitmqctl -n rabbit-2 join_cluster rabbit-1@'Server-node'
## 启动应用
rabbitmqctl -n rabbit-2 start_app
# 给添加账户,主从节点之间账户是共享的
rabbitmqctl -n rabbit-1 add_user admin admin
rabbitmqctl -n rabbit-1 set_user_tags admin administrator
Tips:
如果采用多机部署方式,需读取其中一个节点的cookie, 并复制到其他节点(节点之间通过cookie确定相互是否可通信)。cookie存放在/var/lib/rabbitmq/.erlang.cookie。
例如:主机名分别为rabbit-1、rabbit-2
1、逐个启动各节点
2、配置各节点的hosts文件( vim /etc/hosts)
ip1:rabbit-1
ip2:rabbit-2
其它步骤雷同单机部署方式
消息确认机制
原文:https://blog.csdn.net/pan_junbiao/article/details/112956537
消息发送确认
spring:
# RabbitMQ服务配置
rabbitmq:
# 消息确认(ACK)
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
使用
rabbitTemplate.setConfirmCallback();
// 可自定义rabbitTemplate时设置,也可以通过@PostConstruct注解设置
消息接收确认
消息确认模式有:
- AcknowledgeMode.NONE:自动确认。
- AcknowledgeMode.AUTO:根据情况确认。
- AcknowledgeMode.MANUAL:手动确认。
消费者收到消息后,手动调用Channel类下的 Basic.Ack 或 Basic.Nack 或 Basic.Reject 后,RabbitMQ 收到这些消息后,才认为本次投递完成。
- Basic.Ack 命令:用于确认当前消息。
- Basic.Nack 命令:用于否定当前消息(注意:这是AMQP 0-9-1的RabbitMQ扩展) 。
- Basic.Reject 命令:用于拒绝当前消息。
void basicAck(long deliveryTag, boolean multiple) throws IOException;
long deliveryTag:唯一标识 ID,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。
boolean multiple:是否批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
long deliveryTag:唯一标识 ID。
boolean multiple:上面已经解释。
boolean requeue:如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者。
void basicReject(long deliveryTag, boolean requeue) throws IOException;
分布式事务
可靠生产
创建冗余表记录 发送状态,利用发送确认机制改变状态码,再通过定时任务发送状态码为0的消息,形成闭环
server:
port: 9000
spring:
datasource:
url: jdbc:mysql://localhost:3306/producer-r?useUnicode=true&characterEncoding=utf-8&useSSL=true
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
rabbitmq:
host: 47.110.251.206
username: admin
password: admin
# 消息确认(ACK)
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
@Service
public class Producer {
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
JdbcTemplate jdbcTemplate;
// PostConstruct 很多人以为是spring提供的,其实是java自己的注解
// 被PostConstruct修饰的方法会在服务器加载servlet的时候运行,并且只会被服务器执行一次
// PostConstruct在构造函数之后执行,init()方法之前执行
// 简单来说就是在下面send方法调用rabbitTemplate之前,将rabbitTemplate的确认方法重载好
@PostConstruct
public void callBack(){
// 设置交换机回调方法
// 消息发送成功后,给予生产者的消息回执,来确保生产者的可靠性
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("错误原因: " + cause);
System.out.println("相关数据: " + correlationData);
System.out.println("是否应答(投递交换机是否成功): " + ack);
// 若在发送时未传入correlationData,则此时为空
int id = Integer.parseInt(correlationData.getId());
if (!ack){
System.out.println("投递交换机失败:id 为 " + id);
return;
}
System.out.println(correlationData);
// 冗余表状态置1,代表该消息投递成功(可以额外再做一个定时任务,投递状态为0的消息)
String sql = "update `producer-r`.info\n" +
"set state = 1\n" +
"where id = ?";
jdbcTemplate.update(sql, id);
System.out.println("投递交换机成功:id 为 " + id);
}
});
}
public void publish(String msg) throws JsonProcessingException, InterruptedException {
ObjectMapper mapper = new ObjectMapper();
Info info = mapper.readValue(msg, Info.class);
String exchangeName = "fanout_R_exchange";
rabbitTemplate.convertAndSend(exchangeName, "", msg, new CorrelationData(info.getId() + ""));
String sql = "insert into `producer-r`.info(id, msg) values (?,?)";
try {
jdbcTemplate.update(sql, info.getId(), info.getMsg());
} catch (DataAccessException e) {
System.out.println("数据以存在");
}
// 测试用,需要给交换机回调方法监听的时间
Thread.sleep(10000);
}
}
可靠消费
原文:https://blog.csdn.net/w15558056319/article/details/123577861
server:
port: 9001
spring:
datasource:
url: jdbc:mysql://localhost:3306/consumer-r?useUnicode=true&characterEncoding=utf-8&useSSL=true
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
rabbitmq:
host: 47.110.251.206
username: admin
password: admin
listener:
simple:
retry:
enabled: true # 开启重试,默认是false关闭状态
max-attempts: 3 # 最大重试次数,默认是3次
initial-interval: 2000ms # 每次重试间隔时间
acknowledge-mode: manual
方式一:重新发送 + 死信队列 + 监听死信队列
server:
port: 9001
spring:
datasource:
url: jdbc:mysql://localhost:3306/consumer-r?useUnicode=true&characterEncoding=utf-8&useSSL=true
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
rabbitmq:
host: 47.110.251.206
username: admin
password: admin
listener:
simple:
retry:
enabled: true # 开启重试,默认是false关闭状态
max-attempts: 3 # 最大重试次数,默认是3次
initial-interval: 2000ms # 每次重试间隔时间
try/catch会屏蔽掉重试策略
若配置了acknowledge-mode: manual(手动应答),重试策略会生效,但不会进入死信队列,如下图
@Service
public class Consumer1 {
@Autowired
ObjectMapper objectMapper;
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
JdbcTemplate jdbcTemplate;
@RabbitListener(queues = "fanout_R_queue")
public void consume(String msg) throws JsonProcessingException {
Info info = objectMapper.readValue(msg, Info.class);
System.out.println(info);
// 人为异常
System.out.println(1 / 0);
String sql = "insert into `consumer-r`.info(id, msg) VALUES (?,?)";
jdbcTemplate.update(sql, info.getId(), info.getMsg());
}
}

方式二:try/catch + 手动确认消息 + 死信队列 + 监听死信队列
server:
port: 9001
spring:
datasource:
url: jdbc:mysql://localhost:3306/consumer-r?useUnicode=true&characterEncoding=utf-8&useSSL=true
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
rabbitmq:
host: 47.110.251.206
username: admin
password: admin
listener:
simple:
# 参数说明:none 不确认 auto 自动确认 manual 手动确认
acknowledge-mode: manual
@RabbitListener(queues = "fanout_R_queue")
public void consume(String msg, Channel channel, CorrelationData correlationData
, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
System.out.println(tag);
try {
// 获取消息
Info info = objectMapper.readValue(msg, Info.class);
System.out.println(info);
// 手动制造异常
System.out.println(1 / 0);
// 业务代码
String sql = "insert into `consumer-r`.info(id, msg) VALUES (?,?)";
jdbcTemplate.update(sql, info.getId(), info.getMsg());
// 确认当前信息
channel.basicAck(tag, false);
} catch (Exception e) {
e.printStackTrace();
try {
// 如果出现异常的情况下,根据实际的情况去进行重发
/** @param1 : 传递标签,消息的tag
* @param2 : 确认一条消息还是多条, false:只确认e.DeliverTag这条消息 true:确认小于等于e.DeliverTag的所有消息
* @param3 : 消息失败了是否进行重发
* false:消息直接丢弃,不重发,如果绑定了死信队列,则消息打入死信队列
* true:重发,设置为true,就不要加到try/catch代码中,否则会进入重发死循环
*/
// 否定当前信息
channel.basicNack(tag, false, false);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
流程解析:
- 发生异常,流程进入到catch块
- channel.basicAck(tag,false)。 第一个参数是消息的标签,第二个参数是确认一条消息还是多条,我们设置的是false,表示只确认当前处理的这条消息,确认消费成功了
- catch块是针对消息处理的策略,准备如何处理这条消息?直接抛异常丢弃消息,还是触发消息的重新发送,具体需要根据业务进行处理
幂等性问题及设计思想
原文:https://zhuanlan.zhihu.com/p/74046140
小结
我们知道配置重试策略,当达到最大重试次数,消息会从队列中自动删除,如果同时也配置了手动ack,但实际代码没有进行ack的设置,则达到最大重试次数后,消息不会被删除,而是从ready就绪状态,变更为未应答状态
.
可靠生产中,需要确保消息正确投递到队列中去,由于外界因素,网络波动导致处理延迟等因素,而可能会造成消息的投递失败,或者是多次投递。
.
消费者在消费消息时,要保证数据的幂等性,不能重复消费同一个订单。
.
最好是使用单体架构去处理,避免分布式事务,而非必要同步的非核心业务做成异步,提高响应速度!
RabbitMQ配置文件
rabbitmq:
addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
# port:
##集群配置 addresses之间用逗号隔开
# addresses: ip:port,ip:port
password: admin
username: 123456
virtual-host: / # 连接到rabbitMQ的vhost
requested-heartbeat: #指定心跳超时,单位秒,0为不指定;默认60s
publisher-confirms: #是否启用 发布确认
publisher-reurns: # 是否启用发布返回
connection-timeout: #连接超时,单位毫秒,0表示无穷大,不超时
cache:
channel.size: # 缓存中保持的channel数量
channel.checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
connection.size: # 缓存的连接数,只有是CONNECTION模式时生效
connection.mode: # 连接工厂缓存模式:CHANNEL 和 CONNECTION
listener:
simple.auto-startup: # 是否启动时自动启动容器
simple.acknowledge-mode: # 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
simple.concurrency: # 最小的消费者数量
simple.max-concurrency: # 最大的消费者数量
simple.prefetch: # 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
simple.transaction-size: # 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
simple.default-requeue-rejected: # 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
simple.idle-event-interval: # 多少长时间发布空闲容器时间,单位毫秒
simple.retry.enabled: # 监听重试是否可用
simple.retry.max-attempts: # 最大重试次数
simple.retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔
simple.retry.multiplier: # 应用于上一重试间隔的乘数
simple.retry.max-interval: # 最大重试时间间隔
simple.retry.stateless: # 重试是有状态or无状态
template:
mandatory: # 启用强制信息;默认false
receive-timeout: # receive() 操作的超时时间
reply-timeout: # sendAndReceive() 操作的超时时间
retry.enabled: # 发送重试是否可用
retry.max-attempts: # 最大重试次数
retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔
retry.multiplier: # 应用于上一重试间隔的乘数
retry.max-interval: #最大重试时间间隔
集群监控
原文:https://www.kuangstudy.com/zl/rabbitmq#1368199762003718146