maven依赖和配置参考RocketMQ代码实战(一):使用rocketmq-spring-boot-starter发送和消费消息
在前面的文章中,我们都是通过rocketMQTemplate.syncSend发送的消息都是同步消息,即需要发送消息成功后才可以继续往下执行。
然而在一些对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。那么我们就需要异步消息了。
上代码:
@RestController
@Slf4j
public class RocketMqController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("sendMqAsync")
public Object sendMqAsync() {
MqMessage message = MqMessage.builder().name("异步消息").msg("这是异步消息").build();
rocketMQTemplate.asyncSend(MqUtil.async_topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送成功:{}",JSON.toJSONString(sendResult));
//可以处理相应的业务
}
@Override
public void onException(Throwable throwable) {
//可以处理相应的业务
}
});
return null;
}
}常量如下:
public class MqUtil {
public static final String async_topic = "async_topic";
}其中我们通过org.apache.rocketmq.spring.core.RocketMQTemplate#asyncSend(String destination, Object payload, SendCallback sendCallback)发送异步消息
其中SendCallback 用于异步处理消息发送的结果。
消费者如下:
@Slf4j
@Component
@RocketMQMessageListener(
topic = MqUtil.async_topic,
consumerGroup = "async_consumer_group")
public class AsyncListener implements RocketMQListener<MqMessage> {
@Override
public void onMessage(MqMessage message) {
log.info("{}收到消息:{}", this.getClass().getSimpleName(), message);
}
}访问:http://127.0.0.1:8080/sendMqAsync 发送成功后能够正确打印发送结果
版权声明:本文为kusedexingfu原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。