引入RocketMQ会带来的问题,消息重复问题,数据一致性问题,消息顺序问题,消息堆积问题

引入RocketMQ会有哪些问题?

重复消息问题

  • 消息生产者产生了重复的消息
  • 消息消费者确认失败
    • 当消费失败的时候,mq可能会重新发送消息
  • 消息消费者确认超时
    • 超时也会使mq重新发送消息
  • 业务系统主动发起重试

解决方法:最重要的一点是 幂等性

可以使用自带的messageid做唯一索引,在数据库添加一个唯一索引,从数据库层面会报错,避免消息重复。

mysql唯一索引保证表中字段唯一,若存在id则之前操作过,若不存在则执行业务方法。

高并发可能会有同时两个线程进入不存在的逻辑中,同时判断数据库中没有这个数据,同时做业务,由于数据库是有行锁,在java代码中是同时的,但是在MySQL中是排队执行的,从而还是能保证唯一性。

状态机:也是在数据库中加入一个状态字段,在执行业务的时候,要确保状态是正确的,同样是数据库的行锁来保证逻辑的幂等性

也可以使用redis中的setnx key value,在spring集成redis中是new StringRedisTmplete().opsForValue.getAndSet(key,value),如果redis中没有值则返回null,如果有值则返回value,因为redis也是单线程操作,所以也可以解决这个问题,

**ACK:**rocketmq中自动实现了,其他的消息中间件需要手动ACK,业务逻辑完成之后,返回一个消息给mq,告诉mq这次消费已经完成了,这个过程就叫ACK,其实就是在消费者端应答mq。

数据一致性问题

mq消费者处理异常的话就会出现数据一致性问题,可以用全局事务,比如转账扣钱(生产者)和加钱(消费者)

,要么一起失败,要么一起成功

如果是消费失败的情况,可以根据上一个问题解决,消费失败mq会再次发送消息,直到成功,保证幂等性即可

最大重试次数为16,超过则被投递到死信队列中,

投递第一次失败会进入retry队列,超过16次进入DLQ队列,死信队列中可以手动取出来执行

如果是消费成功,但是生产者那边回滚了,可以用事务消息解决这个问题,下图就是事务消息的流程

在这里插入图片描述

消息顺序问题

平时发送消息,是在队列里面,认为队列就是先进先出,其实一个topic不止一个队列,有4个写队列和四个读队列,可能后投入队列的消息被其他队列先取出来。

集群模式消费则有多个线程,所以可能其他队列的消息先出来,

发送端为了维护index的变量,每次通过求模的方式决定消息发送到哪个队列中。

如何保证顺序消息:RocketMQ本身实现了这个功能,生产者发送时,根据订单号求模计算发送到哪个队列,然后所有该订单的消息发送到同一队列(重写计算队列索引),消费者单线程监听,就可以实现统一订单的先进先出。但是相比于之前多线程效率会下降。

消息堆积

消费速度比生产速度慢就会产生堆积。

消费者业务逻辑问题或者硬件问题都可能导致这个问题,或者业务高峰期,数据库过慢也可能

**解决:**首先定位时什么问题导致的堆积,如果之前消费正常,由于消费者内部异常,导致消息堆积,当我们解决消费异常之后,消费速度应该时大于生产者速度的,但是由于积压太多消息,导致新的消息无法即使更新,此时我们需要判断消息的重要程度,如果消息不怎么重要,我们可以删除,如果不能删,则增加消费者集群。如果是业务高峰的话,可以限流,性价比最高。总结:加机器或者删消息或者限流

我们可以删除,如果不能删,则增加消费者集群。如果是业务高峰的话,可以限流,性价比最高。总结:加机器或者删消息或者限流


版权声明:本文为qq_35032539原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。