RabbitMQ在生产中的实际应用

怎么确保消息的可靠性

要确保消息的可靠性,主要从这两方面去确认

  • 消息成功到达Exchange
  • 消息成功到达Queue

如果这两步中任一步骤出现问题,那么消息就没有成功送达,此时我们可能要通过重试等方式去重新发送消息,多次重试之后,如果消息还是不能到达,则可能就需要人工介入了。

经过上面的分析,我们可以确认,要确保消息成功发送,我们只需要做好三件事就可以了:

  • 确认消息到达 Exchange。
  • 确认消息到达 Queue。
  • 开启定时任务,定时投递那些发送失败的消息。

生产者保证消息可靠

先来看第一步,确认消息到达Exchange,而将消息投递到Exchange是生产者所干的事,在rabbitmq中,提供两种方式确保发送端的消息可靠,一种是事务,一种是confirm机制。事务模式会造成吞吐量下降,并且是同步的,所以实际应用中是不会用到的,主要还是confirm机制。

配置

首先在配置文件中配置:

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

第一行配置是消息到达交换机后的回调,它有3个取值

  • none:表示禁用发布确认模式,默认即此。
  • correlated:表示成功发布消息到交换器后会触发的回调方法。
  • simple:类似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的调用。

第二行配置是消息到达队列后的回调
然后就需要在配置类中配置这两个回调

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        // 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("ConfirmCallback:     " + "相关数据:" + correlationData);
            log.info("ConfirmCallback:     " + "确认情况:" + ack);
            log.info("ConfirmCallback:     " + "原因:" + cause);
        });

        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("ReturnCallback:     " + "消息:" + message);
            log.info("ReturnCallback:     " + "回应码:" + replyCode);
            log.info("ReturnCallback:     " + "回应信息:" + replyText);
            log.info("ReturnCallback:     " + "交换机:" + exchange);
            log.info("ReturnCallback:     " + "路由键:" + routingKey);
        });
        return rabbitTemplate;
    }

ConfirmCallback用来确定消息到达交换机,ReturnCallback会在消息路由到队列失败时被调用,因为设置了Mandatory为true,所以无论推送结果怎样都会触发回调。

下面是生产者的一个通用方法:

    public void send(Object data) {
    	//org.springframework.amqp.core.MessageProperties
        MessageProperties properties = new MessageProperties();
        properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        //import org.springframework.amqp.core.Message;
        Message message = new Message(JSON.toJSONBytes(data), properties);
        rabbitTemplate.send(exchange, routeKey, message);
    }

消费者确保消息可靠

为了保证消息能够可靠的到达消费者,RabbitMQ提供了消息消费确认机制。

  • 当 autoAck 为 false 的时候,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除。

    当我们将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,消费分成了两个部分:
    1、待消费的消息
    2、已经投递给消费者,但是还没有被消费者确认的消息

    换句话说,当设置 autoAck 为 false 的时候,消费者就变得非常从容了,它将有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack,此时 RabbitMQ 才会认为这条消息消费成功了。如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。

  • 当 autoAck 为 true 的时候,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除(从内存或者磁盘中),即使这些消息并没有到达消费者。

综上所述,确保消息被成功消费,无非就是手动 Ack 或者自动 Ack,无他。当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说我们还需要在处理消息时,解决幂等性问题。

配置

在spring boot中,消息默认就是自动确认的,所以需要在配置文件中关闭自动确认,开启手动确认:

spring.rabbitmq.listener.simple.acknowledge-mode=manual

然后来看消费者的代码:

    @RabbitListener(queues = "queue",
            containerFactory = "配置的自定义工厂,如果有")
    @RabbitHandler
    public void consumer(Channel channel, @Payload 生产者发送过来的消息, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        log.info("rabbitmq tenant {}", message);

        try {
            // 业务处理
            // 成功
            //注意是这个包com.rabbitmq.client.Channel;
            channel.basicAck(tag, false);
        } catch (Exception e) {
            log.error("消息监听:异常", e);
            try {
				// 未成功消费
				// requeue重发  fasle 不会重发,会把消息打入死信队列;
                // true会进入死循环的重发(造成重复消费),建议true的情况下,不使用try  catch 否则造成循环
                channel.basicNack(tag, false, false);
            } catch (IOException ee) {
                log.error("rabbitmq nack", e);
            }
        }
    }

消费者要做的事情就是将业务处理的逻辑放在一个try catch中,如果消息消费成功,则执行basicAck,如果消息消费失败,则执行basicNack方法,告诉RabbitMQ消息消费失败。
这里涉及到两个方法:

  • basicAck:这个是手动确认消息已经成功消费,该方法有两个参数:第一个参数表示消息的投递序号;第二个参数 multiple 如果为 false,表示仅确认当前消息消费成功,如果为 true,则表示当前消息之前所有未被当前消费者确认的消息都消费成功。
  • basicNack:这个是告诉 RabbitMQ 当前消息未被成功消费,该方法有三个参数:第一个参数表示消息的投递序号;第二个参数 multiple 如果为 false,表示仅拒绝当前消息的消费,如果为 true,则表示拒绝当前消息之前所有未被当前消费者确认的消息;第三个参数 requeue 含义和前面所说的一样,被拒绝的消息是否重新入队,true表示重新入队,false表示丢弃。

上面还有几个名称,这里解释下:
channel
网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可以建立多个Channel,每个Channel代表一个会话任务。如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。

delivery_tag
注意到上面消费者的方法中有一个参数tag,它是Header中的deliveryTag:delivery_tag是消息投递序号,每个channel对应一个(long类型),从1开始到9223372036854775807范围,在手动消息确认时可以对指定delivery_tag的消息进行ack、nack、reject等操作。

每次消费或者重新投递requeue后,delivery_tag都会增加,理论上该正常业务范围内,该值永远不会达到最大范围上限。可以根据每个消费者对应channel的delivery_tag消费速率计算到达最大值需要的时间

在上面的方法上我们也指定了containerFactory,可以做如下配置:

    @Bean(name = "systemRabbitListenerContainerFactory")
    public RabbitListenerContainerFactory<?> systemRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 指定最小的消费者数量
        factory.setConcurrentConsumers(8);
        // 指定最大的消费者数量
        factory.setMaxConcurrentConsumers(16);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //手动,和上面说的配置文件一样,如果配置文件配了,这里可以不配
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

队列保证消息可靠

交换机创建的时候默认是开启持久化的,但是队列不会,如果MQ宕机或异常重启后队列就会丢失,所以我们可以在创建队列的时候指定开启持久化,但是这样只会持久化队列的元数据,队列中的消息是不会持久化的。至于需不需要持久化消息,需要根据具体的业务,因为持久化消息后,会造成性能的下架,毕竟磁盘的读写速度远远小于内存。

如果需要配置,那么在发送消息的时候设置
MessageProperties.PERSISTENT_TEXT_PLAIN


版权声明:本文为qq_44663150原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。