rabbitmq总结

什么是mq?

Message Queue 即消息队列,是基础数据结构中先进先出的一种数据结构。一般用来解决应用解耦,异步,削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构

什么是rabbitmq?

rabbitmq是实现AMQP(高级消息队列协议)的一个开源消息代理软件。使用erlang语言编写。

用来做什么的?

应用在系统应用的大概三个方向

  • 异步
  • 削峰
  • 解耦

概念

组成部分:

  • Broker::就是rabbitmq中间件本身.包含两部分Exchange和Queue
  • Exchange: 交换机,负责把接收到的消息转发给指定队列
  • Queue:队列,负责接收消息,并把消息发送给消费者
  • Producer: 生产者,生成消息
  • Consumer:消费者,消费消息

其他概念

  • RoutingKey: 路由键, 交换机在发送消息时可以指定路由键(这里可以拆分出另一个概念BindingKey),队列在接收消息时可以指定路由键

交换器4种类型

  • fanout:把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
  • direct:把消息路由到BindingKey和RoutingKey完全匹配的队列中。
  • topic:RoutingKey 为一个 点号'.': 分隔的字符串。比如: java.xiaoka.show

        BindingKey和RoutingKey一样也是点号“.“分隔的字符串。

        BindingKey可使用 * 和 # 用于做模糊匹配,*匹配一个单词,#匹配多个或者0个

  • headers:不依赖路由键匹配规则路由消息。是根据发送消息内容中的headers属性进行匹配。性能差,基本用不到

消息模型:

  • 基本消息模型:生产 -> 队列 -> 消费
  • work消费模型:一个生产 -> 队列 -> 多个消费
  • 发布订阅:交换机类型为Fanout(广播) 生产 -> 交换机 -> 队列  -> 多个消费
  • Routing路由模型: 交换机类型为Direct  生产 -> 交换机(根据路由键规则转发) -> 多个队列  -> 多个消费
  • Topics 通配符模型: 交换机类型为Topics 生产 -> 交换机(根据路由键规则转发可以用类似正则规则匹配) -> 多个队列  -> 多个消费

死信队列:

        条件:

  • 被拒绝的消息
    •  channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
      • 这里说明一下最后一个参数:如果设置为true,会一直重试。设置为false,如果有死信队列直接丢到死信队列里,如果没有就直接丢弃。 
  • 消息超时
    • 这里说的超时 是指没有消费者接收的情况下,消息投递超时
  • 队列达到最大长度

怎么用?

和springboot集成

1. maven依赖(springboot版本为2.6.6)

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring‐boot‐starter‐test</artifactId>
</dependency>

2. 在application.yml中添加Rabbitmq的配置:

server:
  port: 10006  
spring:
  application:
    name: mtc-rabbitmq-producer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtualHost: /
    listener:
      simple:
        acknowledge-mode: manual # 设置手动ack
    template:
      retry:
        enabled: true
        initial-interval: 10000ms
        max-interval: 300000ms
        multiplier: 2
        max-attempts: 3
      exchange: topic.exchange
    publisher-confirm-type: simple
template:

    retry:失败重试(捕获异常的重试,没有抛出异常此配置不生效)

        enabled:开启失败重试

        initial-interval:第一次重试的间隔时长

        max-interval:最长重试间隔,超过这个间隔将不再重试

        multiplier:下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍

        exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
        max-attempts: 尝试重试次数

publisher-confirm-type:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试

3、定义RabbitConifg配置类,配置Exchange、Queue、绑定交换机以及死信队列

package com.example.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {
    public static final String QUEUE_LOG = "queue_log";
    public static final String EXCHANGE_NAME="topic.exchange";
    public static final String ROUTING_KEY_LOG="topic.#.log.#";

    public static final String QUEUE_LOG_DEAD = "queue_log_dead";
    public static final String EXCHANGE_LOG_DEAD_NAME="topic.exchange.dead";
    public static final String ROUTING_KEY_LOG_DEAD="topic.dead";

    /**
     * 交换机
     * @return
     */
    @Bean(EXCHANGE_NAME)
    public Exchange exchange(){
        //durable(true) 持久化,mq重启之后交换机还在
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    /**
     * 日志队列
     * @return
     */
    @Bean(QUEUE_LOG)
    public Queue logQueue(){
        //这里设置私信队列,超过200s没有人消费/队列长度超过10 都会把消息投递到死信队列
        return QueueBuilder.durable(QUEUE_LOG)
                .ttl(200000)
                .maxLength(10)
                .deadLetterExchange(EXCHANGE_LOG_DEAD_NAME)
                .deadLetterRoutingKey(ROUTING_KEY_LOG_DEAD).build();
    }

    /**
     * 路由规则
     */
    @Bean
    public Binding bindingLog(@Qualifier(QUEUE_LOG) Queue queue,
                              @Qualifier(EXCHANGE_NAME) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_LOG).noargs();
    }

    //声明死信Exchange
    @Bean(EXCHANGE_LOG_DEAD_NAME)
    public TopicExchange deadLetterExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_LOG_DEAD_NAME).durable(true).build();
    }

    @Bean(QUEUE_LOG_DEAD)
    public Queue deadLetterQueue(){
        return QueueBuilder.durable(QUEUE_LOG_DEAD).build();
    }

    @Bean
    public Binding deadLetterQueueBinding(@Qualifier(QUEUE_LOG_DEAD)Queue deadLetterQueue, @Qualifier(EXCHANGE_LOG_DEAD_NAME) TopicExchange deadLetterExchange){
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(ROUTING_KEY_LOG_DEAD);
    }

}

4、生产者

package com.example.demo;

import com.example.demo.config.RabbitmqConfig;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
class DemoApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void sendMsgByTopics(){

        /**
         * 参数:
         * 1、交换机名称
         * 2、routingKey
         * 3、消息内容
         */
        for (int i=0;i<5;i++){
            String message = "日志来了"+i;
            rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"topic.log",message);
            System.out.println(" [x] Sent '" + message + "'");
        }

    }

}

 5、消费者

package com.example.demo.rabbitmq;

import com.example.demo.config.RabbitmqConfig;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ReceiveHandler {


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue_mtc", durable = "true"),
            exchange = @Exchange(
                    value = RabbitmqConfig.EXCHANGE_NAME,
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {RabbitmqConfig.ROUTING_KEY_LOG}))
    public void receiveLog(String msg){
        System.out.println(" log received : " + msg + "!");
    }

    @RabbitListener(queues = RabbitmqConfig.QUEUE_LOG_DEAD)
    public void deadLetterQueue(String msg, Channel channel, Message message) throws IOException {
        System.out.println("死信队列消费消息:" + msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

遇到的问题以及如何解决?

RabbitMQ 如何持久化

  • exchange 持久化,在声明时指定 durable 为 true
  • queue 持久化,在声明时指定 durable 为 true
  • message 持久化,在投递时指定 delivery_mode=2(1是非持久化)

代码中体现

//交换机持久化
channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);
//队列持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
//消息持久化 (第一种)
channel.basicPublish("", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
//消息持久化 (第二种)
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
properties.deliveryMode(2);  // 设置消息是否持久化,1: 非持久化 2:持久化
channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes("UTF-8"));

消息堆积怎么办?

        通过线程池,异步消费


版权声明:本文为lixiaoyi01原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。