RocketMQ代码实战(五):发送异步消息

maven依赖和配置参考RocketMQ代码实战(一):使用rocketmq-spring-boot-starter发送和消费消息

 

在前面的文章中,我们都是通过rocketMQTemplate.syncSend发送的消息都是同步消息,即需要发送消息成功后才可以继续往下执行。

然而在一些对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。那么我们就需要异步消息了。

上代码:

@RestController
@Slf4j
public class RocketMqController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    @GetMapping("sendMqAsync")
    public Object sendMqAsync() {
        MqMessage message = MqMessage.builder().name("异步消息").msg("这是异步消息").build();
        rocketMQTemplate.asyncSend(MqUtil.async_topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("发送成功:{}",JSON.toJSONString(sendResult));
                //可以处理相应的业务
            }

            @Override
            public void onException(Throwable throwable) {
                //可以处理相应的业务
            }
        });

        return null;
    }

}

常量如下:

public class MqUtil {

    public static final String  async_topic = "async_topic";
}

其中我们通过org.apache.rocketmq.spring.core.RocketMQTemplate#asyncSend(String destination, Object payload, SendCallback sendCallback)发送异步消息

其中SendCallback 用于异步处理消息发送的结果。

消费者如下:

@Slf4j
@Component
@RocketMQMessageListener(
        topic = MqUtil.async_topic,
        consumerGroup = "async_consumer_group")
public class AsyncListener implements RocketMQListener<MqMessage> {
    @Override
    public void onMessage(MqMessage message) {
        log.info("{}收到消息:{}", this.getClass().getSimpleName(), message);
    }
}

访问:http://127.0.0.1:8080/sendMqAsync 发送成功后能够正确打印发送结果


版权声明:本文为kusedexingfu原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。