RocketMQ Message Replay in Practice

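The main use case for message replay is consuming, one more time, the messages of a given topic that were produced after a specified point in time. The approach in this post requires setting specific parameters on both the producer and the consumer, and enabling the message trace configuration on the broker, before it can be used.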

Add the following setting to broker.conf

# Enable message trace
traceTopicEnable=true

Registering the Producer bean

@Bean(name = "traceProducer")
public DefaultMQProducer traceProducer() throws MQClientException {
    // The second argument (true) enables message trace; the third is the topic that trace data is written to
    DefaultMQProducer producer = new DefaultMQProducer("trace-producer-group", true, "trace-topic");
    producer.setNamesrvAddr(nameServer);
    producer.start();
    return producer;
}

Producing messages

@PostMapping("/trace-messages")
public ResponseResult trace(@RequestBody AccountVo accountVo) {
    try {
        Message message = new Message();
        // Encode with an explicit charset so the body does not depend on the platform default
        message.setBody(accountVo.getUserId().getBytes(StandardCharsets.UTF_8));
        message.setTopic("topic-test");
        message.setTags("trace");
        try {
            // Asynchronous send: the real outcome is reported later through the callback
            traceProducer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("Send result: {}", sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    // Log the throwable itself; getCause() may be null
                    log.error("Failed to send message", throwable);
                }
            });
        } catch (MQClientException e) {
            log.error("Failed to send message", e);
            throw new BaseException("Failed to send message");
        }
        return ResultUtil.success("Message sent successfully", null);
    } catch (Exception e) {
        log.error("Unexpected error while sending message", e);
    }

    return ResultUtil.error("Failed to send message", null);
}

Regular consumer

@Bean
public DefaultMQPushConsumer traceConsumer() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("topic-test-consumer-group", true, "trace-topic");
    consumer.setNamesrvAddr(nameServer);
    consumer.subscribe("topic-test", "trace");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.setMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            if (!CollectionUtils.isEmpty(list)) {
                try {
                    for (MessageExt messageExt : list) {
                        log.info("Consumed message: {}", messageExt);
                    }
                } catch (Exception e) {
                    log.error("Failed to process message", e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    return consumer;
}
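
The listener above logs the whole MessageExt, so the payload shows up as raw bytes rather than the user ID that was sent. Below is a minimal sketch of turning the body back into text, assuming the producer encoded it as UTF-8. The class and method names (`BodyCodec`, `encode`, `decode`) are illustrative only and are not part of RocketMQ.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not a RocketMQ class.
final class BodyCodec {

    private BodyCodec() {
    }

    // Encode the payload with an explicit charset, as the producer side would.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode a message body; a null body is treated as an empty string.
    static String decode(byte[] body) {
        return body == null ? "" : new String(body, StandardCharsets.UTF_8);
    }
}
```

Inside `consumeMessage`, calling `BodyCodec.decode(messageExt.getBody())` would then give the original user ID, provided both sides agree on the charset.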

Consumer dedicated to replaying messages

@Bean
public DefaultMQPushConsumer traceTopicConsumer() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("trace-topic-consumer-group", true);
    consumer.setNamesrvAddr(nameServer);
    consumer.subscribe("trace-topic", "*");
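    // CONSUME_FROM_TIMESTAMP is required for the timestamp below to take effect;
    // it only applies when the broker has no stored offset for this consumer group
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
    // The timestamp must use the format yyyyMMddHHmmss, e.g. 20200722110701
    consumer.setConsumeTimestamp("20200722110701");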
    consumer.setMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            if (!CollectionUtils.isEmpty(list)) {
                try {
                    for (MessageExt messageExt : list) {
                        log.info("Replayed message: {}", messageExt);
                    }
                } catch (Exception e) {
                    log.error("Failed to process message", e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    return consumer;
}
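
The value passed to `setConsumeTimestamp` has to follow the year-month-day-hour-minute-second layout exactly, which is easy to get wrong when typed by hand. Here is a minimal sketch of building that string with `java.time`; the helper class `ConsumeTimestamps` and its methods are hypothetical names introduced for this example, not part of RocketMQ.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration; not a RocketMQ class.
final class ConsumeTimestamps {

    // Same layout as the hard-coded example: year, month, day, hour, minute, second.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    private ConsumeTimestamps() {
    }

    // Format a point in time as a 14-digit consume timestamp.
    static String format(LocalDateTime time) {
        return time.format(FORMAT);
    }

    // Timestamp for a point the given number of minutes before the reference time.
    static String minutesBefore(LocalDateTime reference, long minutes) {
        return format(reference.minusMinutes(minutes));
    }
}
```

For example, `consumer.setConsumeTimestamp(ConsumeTimestamps.minutesBefore(LocalDateTime.now(), 30))` would ask for messages from roughly the last half hour, instead of relying on a fixed string.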

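Copyright notice: This is an original article by Kevin_King1992, published under the CC 4.0 BY-SA license. When reposting, please include a link to the original source and this notice.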