回溯消息主要使用场景就是重复消费一次指定时间后产生的相关topic的消息,需要在生产、消费时设置指定的参数,开启消息轨迹相关的配置,才能够使用。
broker.conf 添加配置项
# 开启消息轨迹
traceTopicEnable=true
注入Producer
@Bean(name = "traceProducer")
public DefaultMQProducer traceProducer() throws MQClientException {
// 主要是第二个参数true是开启消息轨迹回溯,第三个参数是指定回溯的消息topic
DefaultMQProducer producer = new DefaultMQProducer("trace-producer-group", true, "trace-topic");
producer.setNamesrvAddr(nameServer);
producer.start();
return producer;
}
生产消息代码
@PostMapping("/trace-messages")
public ResponseResult trace(@RequestBody AccountVo accountVo) {
try {
Message message = new Message();
message.setBody(accountVo.getUserId().getBytes());
message.setTopic("topic-test");
message.setTags("trace");
try {
traceProducer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息结果:{}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("生产消息失败:{}", throwable.getCause());
}
});
} catch (MQClientException e) {
log.error("{}", e);
throw new BaseException("生产消息失败");
}
return ResultUtil.success("生产消息成功", null);
} catch (Exception e) {
e.printStackTrace();
}
return ResultUtil.error("生产消息成功", null);
}
正常消费者
@Bean
public DefaultMQPushConsumer traceConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("topic-test-consumer-group", true, "trace-topic");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("topic-test", "trace");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (!CollectionUtils.isEmpty(list)) {
try {
for (MessageExt messageExt : list) {
log.info("消费消息:{}", messageExt);
}
} catch (Exception e) {
log.error("处理异常:{}", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
return consumer;
}
专门用于回溯消息的消费者
@Bean
public DefaultMQPushConsumer traceTopicConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("trace-topic-consumer-group", true);
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("trace-topic", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 必须是按照下面格式 年月日时分秒 如:20200722110701
consumer.setConsumeTimestamp("20200722110701");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (!CollectionUtils.isEmpty(list)) {
try {
for (MessageExt messageExt : list) {
log.info("回溯消息:{}", messageExt);
}
} catch (Exception e) {
log.error("处理异常:{}", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
return consumer;
}
版权声明:本文为Kevin_King1992原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。