【笔记】RabbitMq常见消息模型-SpringAMQP-helloWord-工作队列-发布订阅模式-消息幂等,消息堆积,顺序消费

常见消息模型

基本消息队列
工作消息队列

在这里插入图片描述

发布订阅
含有交换机,完整的消息驱动模型
在这里插入图片描述

案例示范

helloWord案例

基本的模型,只包含三个角色,生产者队列消费者
在这里插入图片描述
demo地址:
https://blog.csdn.net/m0_49194578/article/details/122247212
如果遇到不显示统计信息可以参考
https://www.freesion.com/article/20611339568/

SpringAMQP

在这里插入图片描述
SpringAMQP底层就是对RabiitMQ的封装
在这里插入图片描述

配置

导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

修改配置文件

server:
  port: 8080
spring:
  rabbitmq:
    host: 159.203.111.111
    port: 5672
    virtual-host: / #虚拟主机
    username: api
    password: 12312312

helloword模型

生产者

@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitTest {
    @Resource
    private RabbitTemplate rabbitTemplate;

	@Test
    public void test(){
    String queue = "helloQueue";
    String msg = "我是来自AMQP的消息";
    rabbitTemplate.convertAndSend(queue,msg);
    }
}

这是刚才的原api的消费者收到的
在这里插入图片描述

消费者

主动接收消息

 public String receive(String queueName){
        Message receive = rabbitTemplate.receive(queueName);
        if (receive==null||receive.getBody()==null)
            return "";
        return new String(receive.getBody());
    }

接收效果
在这里插入图片描述
这里有一个问题,由于这里只是主动接收,我们总不能死循环一直调用吧,所以我们需要使用另外一个方法,监听这这个队列;
方法如下
监听队列

@Component
@Slf4j
public class SpringRabbitListener {
    
    // 声明监听的队列
    @RabbitListener(queues = {"helloQueue","bagaQueue"})
    public void listenSimpleQueueMessage(String message){
        log.info("接收到了消息:【{}】",message);
    }
}

然后我使用原来api的方法进行发送消息,接收效果如下
在这里插入图片描述

工作队列模型

![在这里插入图片描述](https://img-blog.csdnimg.cn/9acfd0e52f27449fb9710abd931880
其实就是使用多个消费者同时监听一个队列
接收消息

消费者

@Component
@Slf4j
public class SpringRabbitListener {

    @RabbitListener(queues = "helloQueue")
    public void listenSimpleQueueMessage1(String message){
        log.info("消费者1接收到了消息:【{}】",message);
    }

    @RabbitListener(queues = "helloQueue")
    public void listenSimpleQueueMessage2(String message){
        log.info("消费者2接收到了消息:【{}】",message);
    }
}

消息发送

for (int i = 0; i < 20; i++) {
    channel.basicPublish("", queueName, null, (LocalTime.now() + ":【"+i+"】"+msg).getBytes());
    Thread.sleep(500);
}

接收效果
在这里插入图片描述

使用注解自动创建队列,有就加入,没有就新建

@RabbitListener(
            queuesToDeclare = @Queue(name = "helloQueue1",durable = "false",autoDelete = "true"))
    public void listenSimpleQueueMessage1(String message) throws InterruptedException {
        log.info("消费者1接收到了消息:【{}】",message);
    }

预取机制

可以看到默认是平均分配;这是默认的预取机制,这样有一个坏处,如果有某一台消费服务处理的比较慢,那么就会使得整体处理时间降低;

我们期望的效果是,处理快的服务器多处理几条处理慢的少处理几条
预取会导致消费者会预先取很多条消息,然后再处理,这里我们只需要修改最大能预取的上限就可以了,这里我们设置为1,也就是说处理完手头的工作才能处理下一条,默认是无限的

server:
  port: 8080
spring:
  rabbitmq:
    host: 159.203.91.216
    port: 5672
    virtual-host: / #虚拟主机
    username: api
    password: leavemealone
    listener:
      simple:
        prefetch: 1
logging:
  pattern:
    dateformat: yyyy-MM-dd HH:mm:ss:SSS

我们修改配置文件,并且给消费者设置一个休眠时间模拟处理快慢的服务器

@RabbitListener(queues = "helloQueue")
public void listenSimpleQueueMessage1(String message) throws InterruptedException {
    log.info("消费者1接收到了消息:【{}】",message);
    Thread.sleep(50);
}
@RabbitListener(queues = "helloQueue")
public void listenSimpleQueueMessage2(String message) throws InterruptedException {
    log.error("消费者2接收到了消息:【{}】",message);
    Thread.sleep(200);
}

执行效果
在这里插入图片描述

发布订阅模型

在这里插入图片描述
在这里插入图片描述

案例:FanoutExchange类型交换机

FanoutExchange会把消息分发给绑定在交换机上的每一个队列
在这里插入图片描述

在这里插入图片描述

创建配置类,声明FanoutExcahnge,Queue,Bindig,完成绑定

@Configuration
public class FanoutConfig {
	// 配置类执行完成以后rabbit就拥有了这个交换机和队列

    // 声明FanoutExchange交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("doria.fanout");
    }

    // 声明第一个队列
    @Bean(value = "doriaQueue1")
    public Queue fanoutQueue1() {
        return new Queue("doria.queue1");
    }

    // 声明第二个队列
    @Bean(value = "doriaQueue2")
    public Queue fanoutQueue2() {
        return new Queue("doria.queue2");
    }

   /**
 * 绑定队列和交换机的方法
 * 这里自动根据bean名称完成注入到方法参数并执行bean程序完成绑定,注意这里如果指定的bean名称出现异常
 * 就会默认按类型注入,然后发现两个一样类型的bean从而报错,方法名或者name和value值可以指定bean名称
 * Queue对象必须在Bena容器重否则会报错,根虚拟机"/"没有xxx队列
 * @param doriaQueue1   队列
 * @param fanoutExchange 交换机
 * @return
 */
    @Bean
    public Binding bindingQueue1(Queue doriaQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(doriaQueue1).to(fanoutExchange);
    }
    @Bean
    public Binding bindingQueue2(Queue doriaQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(doriaQueue2).to(fanoutExchange);
    }
}

启动服务
查看管理平台

队列
在这里插入图片描述
交换机
在这里插入图片描述
点击进入交换机可以看到
在这里插入图片描述

绑定完成以后进行消息的收发

消息接收(消费者)

@Component
@Slf4j
public class SpringRabbitListener {

    // 这里写的就是我们绑定到交换机的队列的名称.原来我们绑定这个,当消息发送给队列的时候就能收到。
    // 现在我们绑定了交换机,我们依然监听这个队列,需要改变的是我们发送时,这次我们发送给了交换机
    // 交换机收到了消息就会发送给我们这个队列,所以只需要改变发送到交换机就可以了
    @RabbitListener(queues = "doria.queue1")
    public void listenFanoutQueue1(String message){
        log.info("接收到了doria.queue1的消息:【{}】",message);
    }

    // 这里写的就是我们绑定到交换机的队列的名称
    @RabbitListener(queues = "doria.queue2")
    public void listenFanoutQueue2(String message){
        log.info("接收到了doria.queue2的消息:【{}】",message);
    }
}

消息发送(生产者)

@Test
public void sendToFanout() {
    String exchangeName = "doria.fanout"; // 交换机名称
    String message = "发送给所有人的消息!!!";
    // 路由名称,路由键,消息;;;路由键以后再解释,这里先不写
    rabbitTemplate.convertAndSend(exchangeName,"",message);
}

效果如图:
两个队列都收到了消息
在这里插入图片描述
在这里插入图片描述

发布订阅更简单的写法

https://blog.csdn.net/Va1mZzz/article/details/120834699

@RabbitListener( // 配置监听
        bindings = @QueueBinding(   // 配置队列绑定
                value = @Queue(   // 配置队列
                        value = "doria.queue.annotation",   // 队列名称
                        durable = "true",   // 是否持久化,重启rabbitmq后队列是否继续存在
                        autoDelete = "false"   // 当所有消费者客户端都断开链接后,是否自动删除队列.true是,false否(默认)
                ),
                exchange = @Exchange(   // 配置交换机
                        value = "doria.fanout",   // 配置交换机名称
                        type = ExchangeTypes.FANOUT,   // 配置交换机类型
                        durable = "true", // 是否持久化,重启rabbitmq后交换机是否继续存在 
                        autoDelete = "false" // 当所有绑定队列都不再使用时是否自动删除交换机.true是,false否(默认)
                ),
                key = "r"   // 路由键
        )
)
public void listenFanoutQueueAnnotation(String message){
    log.info("接收到了doria.queue注解的消息:【{}】",message);
}

能和前面两个队列一样从相同的交换机中读取消息
在这里插入图片描述

一个注解,内部声明了队列,并建立绑定关系,就是这么神奇!!!

注意@QueueBinding注解的三个属性:

value: @Queue 注解,用于声明队列,value 为 queueName, durable 表示队列是否持久化, autoDelete 表示没有消费者之后队列是否自动删除
exchange: @Exchange 注解,用于声明 exchange, type 指定消息投递策略,我们这里用的 topic 方式
key: 在 topic 方式下,这个就是我们熟知的 routingKey
以上,就是在队列不存在时的使用姿势,看起来也不复杂

相关报错

**1.**如果声明的交换机是一个现有的交换机,而现有的交换机类型,以及其他配置和这次在注解中声明的类型不一致时会爆出这个异常在这里插入图片描述
如果你是新建交换机,则不会出现,总之,这个功能将会在交换机没有时自动创建交换机,如果有同名交换机,就会直接使用他,如果同名交换机但是有配置不一样,就会抛出异常
**2.**同样的,队列也遵从这一点,如果出现同名队列但是配置不一样,也会出现异常
在这里插入图片描述

案例:DirectExchangelexicon交换机

在这里插入图片描述
描述:
消息在发布的时候可以指定一个routingKey,队列在声明绑定关系的时候也可以指定一个bindingKey,一个队列和交换机之间可以指定多个key,不同的队列之间也可以指定相同的key,也就是说它只要给所有队列指定相同的key就可以模范Fanout路由器了,只要key能命中就有资格路由,路由会把消息路由给所有key值匹配队列
为什么不给队列直接指定key?
因为一个队列可以绑定多个交换机,如果队列的key写死了,那他在和别的交换机绑定的时候怎么办,还是只能是这个key吗,所以这里叫做bindKey,一个交换机和一个队列有一个绑定Key交换机遵循key对消息进行路由

消息接收(消费者)

配置声明交换机队列绑定规则以及配置
这里我们就使用注解完成绑定了!:
不用去麻烦的配置了,直接在监听器上面描述,而且当监听器或者路由不存在的时候就会自动创建

@RabbitListener(
        // 一个监听器也可以绑定多个路由器和队列的关系
        bindings = {
                @QueueBinding(
                        value = @Queue(
                                name = "red.queue.red",
                                autoDelete = "false",
                                durable = "true"
                        ),
                        exchange = @Exchange(
                                name = "direct.exchange",
                                type = ExchangeTypes.DIRECT,
                                autoDelete = "false",
                                durable = "true"
                        ),
                        key = {"red","all"}
                )
        })
public void directListenerRed(String message) {
    log.info("[红方]收到了消息:【{}】",message);
}
@RabbitListener(
        // 一个监听器也可以绑定多个路由器和队列的关系
        bindings = {
                @QueueBinding(
                        value = @Queue(
                                name = "red.queue.blue",
                                autoDelete = "false",
                                durable = "true"
                        ),
                        exchange = @Exchange(
                                name = "direct.exchange",
                                type = ExchangeTypes.DIRECT,
                                autoDelete = "false",
                                durable = "true"
                        ),
                        key = {"blue","all"}
                )
        })
public void directListenerBlue(String message) {
    log.info("[蓝方]收到了消息:【{}】",message);
}

消息发送(生产者)

发送消息到交换机根据bindingKey进行路由

/**
 * 发送消息到direct路由
 *
 * @param routingKey 路由key,对应绑定时指定的bindingKey
 * @param message    消息内容
 * @return 发送结果
 */
public boolean directSender(String routingKey, String message) {
    String exchangeName = "direct.exchange";
    rabbitTemplate.convertAndSend(exchangeName,routingKey,message);
    return true;
}

发送测试
三次发送分别为

  • key:red ,msg:红方你好
  • key:blue ,msg:蓝方你好
  • key:all , msg:大家好

发送结果:
在这里插入图片描述

案例:TopicExchange类型交换机

在这里插入图片描述
这个模式其实就相当于direct的升级版,当我们绑定多个的时候可以使用topic模式进行简化;
在队列和交换机进行绑定时,指定的bindingKey,可以使用通配符进行匹配,匹配到的就会进行路由分发;
*#:0或者多个单词
:一个单词

虽然发送的api和direct都是一样的,但是如果你使用derect的路由绑定依然不会触发,,因为路由类型不一样,它依然会全词匹配,只有启用Topic模式的交换机才会触发正则匹配
在这里插入图片描述

消息接收(消费者)

@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(name = "listenerA",durable = "true",autoDelete = "false"),
                exchange = @Exchange(name = "topic.exchange",durable = "true",autoDelete = "false",type = ExchangeTypes.TOPIC),
                key = {"#.new","japan.weather"}
        ))
public void listenerA(String message){
    log.info("我是A听众,我的订阅规则[{\"#.new\",\"japan.weather\"}],收到的消息是:{}",message);
}
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(name = "listenerB",durable = "true",autoDelete = "false"),
                exchange = @Exchange(name = "topic.exchange",durable = "true",autoDelete = "false",type = ExchangeTypes.TOPIC),
                key = {"#.weather","china.#"}
        ))
public void listenerB(String message){
    log.info("我是B听众,我的订阅规则[{\"#.weather\",\"china.#\"}],收到的消息是:{}",message);
}

消息发送(生产者)

/**
 * 发送消息到topic路由
 * @param topicRoutingKey 匹配规则,格式为xxx.xxx.xxx
 * @param message 发送的消息
 */
public void topicSender(String topicRoutingKey, String message) {
    String exchangeName = "topic.exchange";
    rabbitTemplate.convertAndSend(exchangeName, topicRoutingKey, message);
}

效果:
测试1
在这里插入图片描述
在这里插入图片描述
发送japan.new日本新闻,监听所有新闻#.new的听众A收到新闻!
测试2
在这里插入图片描述
在这里插入图片描述
发送日本天气信息,订阅所有天气信息#.wearther的听众B和订阅日本天气信息japan.weather的听众A收到消息;
测试3
在这里插入图片描述
在这里插入图片描述

发送中国新闻,订阅中国所有信息china.#的B听众和订阅所有新闻#.new的A听众收到消息

消息转换器

注意不要使用自动创建队列同时监听的注解,那样一会发送出去就看不到了,会被秒接收然后报错,因为那边我们写的是String类型接收
在这里插入图片描述
可以看到,被jdk序列化了,jdk序列化长度太长而且不易阅读,而且性能低。我们引入自己的jackson
在这里插入图片描述
在这里插入图片描述
依赖

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

添加配置类

@Configuration
public class RabbitMQConfig {

    /**
     * 消息序列化转换
     *
     * @return
     */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

}

在这里插入图片描述可以看到消息已经变成Json格式了,消息转换回来同样也需要配置这个bean,注意
消息接收

@RabbitListener(
            queuesToDeclare = @Queue(name = "helloQueue1",durable = "false",autoDelete = "false"))
    public void listenSimpleQueueMessage1(Map<String,Object> message) throws InterruptedException {
        log.info("消费者1接收到了消息:【{}】",message);
    }

什么类型发的就什么类型接

补充:交换机和队列的操作

删除队列同时删除交换机绑定关系

// 删除队列,并删除绑定关系
rabbitAdmin.deleteQueue("helloQueue1");

删除交换机的队列的绑定关系(保留交换机和队列)

// 删除交换和队列的绑定关系,当然需要从spring重拿bean
        Queue queue1 = applicationContext.getBean("doriaQueue1", Queue.class);
        // 注意这里上下两个bean的名称如果没有设置name就是方法名
        FanoutExchange fanoutExchange = applicationContext.getBean("fanoutExchange", FanoutExchange.class);
        rabbitAdmin.removeBinding(
                BindingBuilder.bind(queue1).to(fanoutExchange)
        );

删除交换机(保留队列)

// 单独删除交换机[这里是交换机名称,不是交换机bean的名称]
rabbitAdmin.deleteExchange("doria.fanout");

消息的幂等性,顺序消费,消息堆积等问题

https://www.cnblogs.com/lyraHeartstrings/p/16403715.html


版权声明:本文为m0_49194578原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。