什么是mq?
Message Queue 即消息队列,是基础数据结构中先进先出的一种数据结构。一般用来解决应用解耦,异步,削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构
什么是rabbitmq?
rabbitmq是实现AMQP(高级消息队列协议)的一个开源消息代理软件。使用erlang语言编写。
用来做什么的?
应用在系统应用的大概三个方向
- 异步
- 削峰
- 解耦
概念
组成部分:
- Broker::就是rabbitmq中间件本身.包含两部分Exchange和Queue
- Exchange: 交换机,负责把接收到的消息转发给指定队列
- Queue:队列,负责接收消息,并把消息发送给消费者
- Producer: 生产者,生成消息
- Consumer:消费者,消费消息
其他概念
- RoutingKey: 路由键, 交换机在发送消息时可以指定路由键(这里可以拆分出另一个概念BindingKey),队列在接收消息时可以指定路由键
交换器4种类型
- fanout:把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
- direct:把消息路由到BindingKey和RoutingKey完全匹配的队列中。
topic:RoutingKey 为一个 点号'.': 分隔的字符串。比如: java.xiaoka.show
BindingKey和RoutingKey一样也是点号“.“分隔的字符串。
BindingKey可使用 * 和 # 用于做模糊匹配,*匹配一个单词,#匹配多个或者0个
headers:不依赖路由键匹配规则路由消息。是根据发送消息内容中的headers属性进行匹配。性能差,基本用不到
消息模型:
- 基本消息模型:生产 -> 队列 -> 消费
- work消费模型:一个生产 -> 队列 -> 多个消费
- 发布订阅:交换机类型为Fanout(广播) 生产 -> 交换机 -> 队列 -> 多个消费
- Routing路由模型: 交换机类型为Direct 生产 -> 交换机(根据路由键规则转发) -> 多个队列 -> 多个消费
- Topics 通配符模型: 交换机类型为Topics 生产 -> 交换机(根据路由键规则转发可以用类似正则规则匹配) -> 多个队列 -> 多个消费
死信队列:
条件:
- 被拒绝的消息
- channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
- 这里说明一下最后一个参数:如果设置为true,会一直重试。设置为false,如果有死信队列直接丢到死信队列里,如果没有就直接丢弃。
- channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
- 消息超时
- 这里说的超时 是指没有消费者接收的情况下,消息投递超时
- 队列达到最大长度
怎么用?
和springboot集成
1. maven依赖(springboot版本为2.6.6)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐test</artifactId>
</dependency>2. 在application.yml中添加Rabbitmq的配置:
server:
port: 10006
spring:
application:
name: mtc-rabbitmq-producer
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtualHost: /
listener:
simple:
acknowledge-mode: manual # 设置手动ack
template:
retry:
enabled: true
initial-interval: 10000ms
max-interval: 300000ms
multiplier: 2
max-attempts: 3
exchange: topic.exchange
publisher-confirm-type: simpletemplate:
retry:失败重试(捕获异常的重试,没有抛出异常此配置不生效)
enabled:开启失败重试
initial-interval:第一次重试的间隔时长
max-interval:最长重试间隔,超过这个间隔将不再重试
multiplier:下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
max-attempts: 尝试重试次数
publisher-confirm-type:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试
3、定义RabbitConifg配置类,配置Exchange、Queue、绑定交换机以及死信队列
package com.example.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
public static final String QUEUE_LOG = "queue_log";
public static final String EXCHANGE_NAME="topic.exchange";
public static final String ROUTING_KEY_LOG="topic.#.log.#";
public static final String QUEUE_LOG_DEAD = "queue_log_dead";
public static final String EXCHANGE_LOG_DEAD_NAME="topic.exchange.dead";
public static final String ROUTING_KEY_LOG_DEAD="topic.dead";
/**
* 交换机
* @return
*/
@Bean(EXCHANGE_NAME)
public Exchange exchange(){
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
/**
* 日志队列
* @return
*/
@Bean(QUEUE_LOG)
public Queue logQueue(){
//这里设置私信队列,超过200s没有人消费/队列长度超过10 都会把消息投递到死信队列
return QueueBuilder.durable(QUEUE_LOG)
.ttl(200000)
.maxLength(10)
.deadLetterExchange(EXCHANGE_LOG_DEAD_NAME)
.deadLetterRoutingKey(ROUTING_KEY_LOG_DEAD).build();
}
/**
* 路由规则
*/
@Bean
public Binding bindingLog(@Qualifier(QUEUE_LOG) Queue queue,
@Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_LOG).noargs();
}
//声明死信Exchange
@Bean(EXCHANGE_LOG_DEAD_NAME)
public TopicExchange deadLetterExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_LOG_DEAD_NAME).durable(true).build();
}
@Bean(QUEUE_LOG_DEAD)
public Queue deadLetterQueue(){
return QueueBuilder.durable(QUEUE_LOG_DEAD).build();
}
@Bean
public Binding deadLetterQueueBinding(@Qualifier(QUEUE_LOG_DEAD)Queue deadLetterQueue, @Qualifier(EXCHANGE_LOG_DEAD_NAME) TopicExchange deadLetterExchange){
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(ROUTING_KEY_LOG_DEAD);
}
}
4、生产者
package com.example.demo;
import com.example.demo.config.RabbitmqConfig;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
class DemoApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMsgByTopics(){
/**
* 参数:
* 1、交换机名称
* 2、routingKey
* 3、消息内容
*/
for (int i=0;i<5;i++){
String message = "日志来了"+i;
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"topic.log",message);
System.out.println(" [x] Sent '" + message + "'");
}
}
}
5、消费者
package com.example.demo.rabbitmq;
import com.example.demo.config.RabbitmqConfig;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ReceiveHandler {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_mtc", durable = "true"),
exchange = @Exchange(
value = RabbitmqConfig.EXCHANGE_NAME,
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {RabbitmqConfig.ROUTING_KEY_LOG}))
public void receiveLog(String msg){
System.out.println(" log received : " + msg + "!");
}
@RabbitListener(queues = RabbitmqConfig.QUEUE_LOG_DEAD)
public void deadLetterQueue(String msg, Channel channel, Message message) throws IOException {
System.out.println("死信队列消费消息:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
遇到的问题以及如何解决?
RabbitMQ 如何持久化
- exchange 持久化,在声明时指定 durable 为 true
- queue 持久化,在声明时指定 durable 为 true
- message 持久化,在投递时指定 delivery_mode=2(1是非持久化)
代码中体现
//交换机持久化
channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);
//队列持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
//消息持久化 (第一种)
channel.basicPublish("", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
//消息持久化 (第二种)
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
properties.deliveryMode(2); // 设置消息是否持久化,1: 非持久化 2:持久化
channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes("UTF-8"));
消息堆积怎么办?
通过线程池,异步消费
版权声明:本文为lixiaoyi01原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。