怎么确保消息的可靠性
要确保消息的可靠性,主要从这两方面去确认
- 消息成功到达Exchange
- 消息成功到达Queue
如果这两步中任一步骤出现问题,那么消息就没有成功送达,此时我们可能要通过重试等方式去重新发送消息,多次重试之后,如果消息还是不能到达,则可能就需要人工介入了。
经过上面的分析,我们可以确认,要确保消息成功发送,我们只需要做好三件事就可以了:
- 确认消息到达 Exchange。
- 确认消息到达 Queue。
- 开启定时任务,定时投递那些发送失败的消息。
生产者保证消息可靠
先来看第一步,确认消息到达Exchange,而将消息投递到Exchange是生产者所干的事,在rabbitmq中,提供两种方式确保发送端的消息可靠,一种是事务,一种是confirm机制。事务模式会造成吞吐量下降,并且是同步的,所以实际应用中是不会用到的,主要还是confirm机制。
配置
首先在配置文件中配置:
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
第一行配置是消息到达交换机后的回调,它有3个取值
- none:表示禁用发布确认模式,默认即此。
- correlated:表示成功发布消息到交换器后会触发的回调方法。
- simple:类似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的调用。
第二行配置是消息到达队列后的回调
然后就需要在配置类中配置这两个回调
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("ConfirmCallback: " + "相关数据:" + correlationData);
log.info("ConfirmCallback: " + "确认情况:" + ack);
log.info("ConfirmCallback: " + "原因:" + cause);
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("ReturnCallback: " + "消息:" + message);
log.info("ReturnCallback: " + "回应码:" + replyCode);
log.info("ReturnCallback: " + "回应信息:" + replyText);
log.info("ReturnCallback: " + "交换机:" + exchange);
log.info("ReturnCallback: " + "路由键:" + routingKey);
});
return rabbitTemplate;
}
ConfirmCallback用来确定消息到达交换机,ReturnCallback会在消息路由到队列失败时被调用,因为设置了Mandatory为true,所以无论推送结果怎样都会触发回调。
下面是生产者的一个通用方法:
public void send(Object data) {
//org.springframework.amqp.core.MessageProperties
MessageProperties properties = new MessageProperties();
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
//import org.springframework.amqp.core.Message;
Message message = new Message(JSON.toJSONBytes(data), properties);
rabbitTemplate.send(exchange, routeKey, message);
}
消费者确保消息可靠
为了保证消息能够可靠的到达消费者,RabbitMQ提供了消息消费确认机制。
当 autoAck 为 false 的时候,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除。
当我们将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,消费分成了两个部分:
1、待消费的消息
2、已经投递给消费者,但是还没有被消费者确认的消息换句话说,当设置 autoAck 为 false 的时候,消费者就变得非常从容了,它将有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack,此时 RabbitMQ 才会认为这条消息消费成功了。如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。
当 autoAck 为 true 的时候,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除(从内存或者磁盘中),即使这些消息并没有到达消费者。
综上所述,确保消息被成功消费,无非就是手动 Ack 或者自动 Ack,无他。当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说我们还需要在处理消息时,解决幂等性问题。
配置
在spring boot中,消息默认就是自动确认的,所以需要在配置文件中关闭自动确认,开启手动确认:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
然后来看消费者的代码:
@RabbitListener(queues = "queue",
containerFactory = "配置的自定义工厂,如果有")
@RabbitHandler
public void consumer(Channel channel, @Payload 生产者发送过来的消息, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
log.info("rabbitmq tenant {}", message);
try {
// 业务处理
// 成功
//注意是这个包com.rabbitmq.client.Channel;
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("消息监听:异常", e);
try {
// 未成功消费
// requeue重发 fasle 不会重发,会把消息打入死信队列;
// true会进入死循环的重发(造成重复消费),建议true的情况下,不使用try catch 否则造成循环
channel.basicNack(tag, false, false);
} catch (IOException ee) {
log.error("rabbitmq nack", e);
}
}
}
消费者要做的事情就是将业务处理的逻辑放在一个try catch中,如果消息消费成功,则执行basicAck,如果消息消费失败,则执行basicNack方法,告诉RabbitMQ消息消费失败。
这里涉及到两个方法:
- basicAck:这个是手动确认消息已经成功消费,该方法有两个参数:第一个参数表示消息的投递序号;第二个参数 multiple 如果为 false,表示仅确认当前消息消费成功,如果为 true,则表示当前消息之前所有未被当前消费者确认的消息都消费成功。
- basicNack:这个是告诉 RabbitMQ 当前消息未被成功消费,该方法有三个参数:第一个参数表示消息的投递序号;第二个参数 multiple 如果为 false,表示仅拒绝当前消息的消费,如果为 true,则表示拒绝当前消息之前所有未被当前消费者确认的消息;第三个参数 requeue 含义和前面所说的一样,被拒绝的消息是否重新入队,true表示重新入队,false表示丢弃。
上面还有几个名称,这里解释下:
channel
网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可以建立多个Channel,每个Channel代表一个会话任务。如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
delivery_tag
注意到上面消费者的方法中有一个参数tag,它是Header中的deliveryTag:delivery_tag是消息投递序号,每个channel对应一个(long类型),从1开始到9223372036854775807范围,在手动消息确认时可以对指定delivery_tag的消息进行ack、nack、reject等操作。
每次消费或者重新投递requeue后,delivery_tag都会增加,理论上该正常业务范围内,该值永远不会达到最大范围上限。可以根据每个消费者对应channel的delivery_tag消费速率计算到达最大值需要的时间
在上面的方法上我们也指定了containerFactory,可以做如下配置:
@Bean(name = "systemRabbitListenerContainerFactory")
public RabbitListenerContainerFactory<?> systemRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 指定最小的消费者数量
factory.setConcurrentConsumers(8);
// 指定最大的消费者数量
factory.setMaxConcurrentConsumers(16);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//手动,和上面说的配置文件一样,如果配置文件配了,这里可以不配
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
队列保证消息可靠
交换机创建的时候默认是开启持久化的,但是队列不会,如果MQ宕机或异常重启后队列就会丢失,所以我们可以在创建队列的时候指定开启持久化,但是这样只会持久化队列的元数据,队列中的消息是不会持久化的。至于需不需要持久化消息,需要根据具体的业务,因为持久化消息后,会造成性能的下架,毕竟磁盘的读写速度远远小于内存。
如果需要配置,那么在发送消息的时候设置
MessageProperties.PERSISTENT_TEXT_PLAIN