根据官网我们知道rabbitmq支持事务,所以我们能用事务来解决消息丢失的问题(由于事务的性能差所以不推荐使用)
使用事务会降低250倍的性能
推荐使用确认机制
确认机制
RabbitMQ结构
生产者,消费者,交换机Exchange,路由键,队列(queue),通道(channal)
RabbitM消息确认机制
P(消费者)----------->Broker【Exchange---------->Queue】--------------------->C(消费者)
消费者到达RabbitMQ的确认机制为ConfirmCallback
特点:只要消息到达服务器就会触发
服务器接收到消息持久化到硬盘的机制是returnCallback
特点:失败才触发
配置
@Configuration
public class MyRabbitmqConfig {
@Resource
RabbitTemplate rabbitTemplate;
/**
* 使用JSON序列化机制,进行消息转换
* @return
*/
@Bean
MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
* 1.服务收到消息就回调
* 1.spring.rabbitmq.publisher-confirms=true
* 2.设置确认回调ConfirmCallback
* 2.消息正确抵达队列进行回调
* # 开启发送端消息抵达队列确认
* publisher-returns: true
* # 只要抵达队列以异步方法优先回调returnconfirm确认
* template:
* mandatory: true
*
*
*
* 3.消费端确认(保证每一个消息被正确消费,此时才可以broker删除这个消息)
* 1.默认是自动确认的,只要消息接收到,服务端就会移除这个消息
* 问题:
* 收到很多消息,自动回复给服务器ack,只有一个消息处理成功,宕机了,发生消息丢失
* 手动确认
* 只要我们没有明确告诉MQ,货物被签收,没有ACK,消息就一直是unacked状态,即使Consumer宕机
* 消息不会丢失,会变为Ready状态,下一次有新的
* 2.如何签收
*
*/
@PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法
public void initRabbitTemplate(){
//设置确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id)
* @param b 消息是否成功收到
* @param s 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("confirm...."+s);
System.out.println(" ==========>"+b);
}
});
//设置消息抵达队列的确认回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
/**
* 只要消息没有投递给指定的队列就触发这个失败回调
* @param returnedMessage(
* 包含了
* message 投递失败的详细信息
* replyCode 回复的状态码
* replyText 回复的文本内容
* exchange 当时这个消息发给那个交换机
* routingKey 当时这个消息调用那个路由键
* )
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("失败的信息"+returnedMessage.getMessage()+
"replyCode===>"+returnedMessage.getReplyCode()+
"exchange=====>"+returnedMessage.getExchange()+
"routingKey=====>"+returnedMessage.getRoutingKey());
//将没有持久化的消息发送到数据库中
String xx= returnedMessage.getMessage().toString();
String encrypt = MD5.encrypt(xx);
try{
lostMapper.addmessage(encrypt,xx);
}catch (Exception e){
System.out.println("消息已经存入数据库中");
}
}
});
}
}
Service层
@Service
public class GjxServiceImpl implements GjxService {
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
LostService lostService;
@Override
public void sendmessage() {
try {
rabbitTemplate.convertAndSend("zlifeexchange", "zlife", "郭峻旭");
} catch (Exception exception) {
System.out.println("rabbitmq服务器宕机");
String gjx = MD5.encrypt("郭峻旭");
System.out.println(gjx);
try{
lostService.addmessage(gjx, "郭峻旭");
}catch (Exception e){
System.out.println("丢失的消息已经存入数据库当中");
}
}
}
}
主要的思想是如果消息没有发送到RabbitMQ服务器上就将发送的消息存入到数据库中通过定时任务定时将数据库中发送失败的消息重新发送
定时任务
springboot在主启动类上添加注解@EnableScheduling
通过注解@Scheduled完成定时任务
@Service
public class CsImpl implements Cs {
@Autowired
LostMapper lostMapper;
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
LostService lostService;
@Override
@Scheduled(cron ="0/5 * * * * ?")
public void CsText(){
List<Lost> losts = lostMapper.selectList(null);
for (int i = 0; i <losts.size() ; i++) {
System.out.println("====================>");
System.out.println(losts.get(i).getMessage());
String xx=losts.get(i).getMessage();
try {
rabbitTemplate.convertAndSend("zlifeexchange", "zlife", xx);
QueryWrapper queryWrapper=new QueryWrapper();
queryWrapper.like("message",xx);
lostMapper.delete(queryWrapper);
}catch (Exception e){
System.out.println("rabbitmq服务器宕机");
String gjx = MD5.encrypt(xx);
System.out.println(gjx);
try {
lostService.addmessage(gjx, xx);
}catch (Exception exception){
System.out.println("丢失的消息已经存入数据库当中");
}
}
}
}
}
配置文件
server:
port: 8084
spring:
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
username: root
password:
redis:
host: 127.0.0.1
port: 6379
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
# 开启发送端确认机制
publisher-confirm-type: correlated
# 开启发送端消息抵达队列确认
publisher-returns: true
# 只要抵达队列以异步方法优先回调returnconfirm确认
template:
mandatory: true
# 手动ACK 手动签收
listener:
simple:
acknowledge-mode: manual
main:
# 运行循环调用
allow-circular-references: true
#mybatis:
# mapper-locations: classpath:mapper/*.xml
mybatis:
mapper-locations: classpath:mapper/*.xml
logging:
level:
com.gjx: debug

同理加上手动ack机制确保消息不会丢失
消息丢失的问题解决了,又出现了新的问题消息重复消费怎么解决?
例子:比如当我们已经消费了消息还没来得及ack即中间的时间段 RabbitMQ服务器突然宕机这个时候我们明明已经消费了消息但是没来得及ack,这个消息就会重新回到队列当中RabbitMQ服务重新启动后这个消息又会再消费一次,重复消费了
版权声明:本文为GJX00原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。