消息队列 RocketMQ:(三)发送普通消息(三种方式)

文章目录

消息队列 RocketMQ:(一)概述

消息队列 RocketMQ:(二)系统架构


前提

创建一个 Maven 的 Java 工程。
引入 RocketMQ 的 Client 依赖,版本需要和服务端一致。

	<dependency>
	    <groupId>org.apache.rocketmq</groupId>
	    <artifactId>rocketmq-client</artifactId>
	    <version>4.9.0</version>
	</dependency>

一、普通消息

RocketMQ 提供三种方式来发送普通消息:同步(Sync)发送、异步(Async)发送和单向(Oneway)发送。

1、同步发送消息

Producer 发出⼀条消息后,会在收到 MQ 返回的 ACK 之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。

同步发送应用场景广泛,例如通知邮件、报名短信通知等。

在这里插入图片描述

同步消息发送生产者

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 创建一个producer,参数为Producer Group名称
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // 指定nameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 设置当发送失败时重试发送的次数,默认为2次
        producer.setRetryTimesWhenSendFailed(3);
        // 设置发送超时时限为5s,默认3s
        producer.setSendMsgTimeout(5000);

        // 开启生产者
        producer.start();

        // 生产并发送100条消息
        for (int i = 0; i < 100; i++) {
            // Message Body可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预。
            // 需要Producer与Consumer协商好一致的序列化和反序列化方式。
            byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET);
            Message msg = new Message("LearnTopic", "DonTag", body);
            // 为消息指定key
            // 设置代表消息的业务关键属性,请尽可能全局唯一。
            msg.setKeys("key-" + i);
            // 同步发送消息
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
        // 关闭producer
        producer.shutdown();
    }
}

消息发送的状态

package org.apache.rocketmq.client.producer;

public enum SendStatus {
    SEND_OK, // 发送成功
    FLUSH_DISK_TIMEOUT,// 刷盘超时。当Broker设置的刷盘策略为同步刷盘时才可能出 现这种异常状态。异步刷盘不会出现
    FLUSH_SLAVE_TIMEOUT,// Slave同步超时。当Broker集群设置的Master-Slave的复 制方式为同步复制时才可能出现这种异常状态。异步复制不会出现
    SLAVE_NOT_AVAILABLE;// 没有可用的Slave。当Broker集群设置为Master-Slave的 复制方式为同步复制时才可能出现这种异常状态。异步复制不会出现

    private SendStatus() {
    }
}

2、异步发送消息

Producer 发出消息后无需等待 MQ 返回 ACK,直接发送下⼀条消息。Producer 通过回调接口接收 MQ 响应,并处理响应结果。该方式的消息可靠性可以得到保障,消息发送效率也可以。

一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

在这里插入图片描述

异步消息发送生产者

package com.learn.rocketmq.simple;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // 创建一个producer,参数为Producer Group名称
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // 指定nameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 指定异步发送失败后不进行重试发送
        producer.setRetryTimesWhenSendAsyncFailed(0);
        // 指定新创建的Topic的Queue数量为2,默认为4
        producer.setDefaultTopicQueueNums(2);

        // 开启生产者
        producer.start();

        final CountDownLatch countDownLatch = new CountDownLatch(100);
        for (int i = 0; i < 100; i++) {
            // Message Body可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预。
            // 需要Producer与Consumer协商好一致的序列化和反序列化方式。
            byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET);
            try {
                final int index = i;
                Message msg = new Message("LearnTopic", "DonTag-Async", body);
                // 异步发送。指定回调
                producer.send(msg, new SendCallback() {
                    // 当producer接收到MQ发送来的ACK后就会触发该回调方法的执行
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });

            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}

3、单向发送消息

Producer 仅负责发送消息,不等待。该发送方式时 MQ 不返回 ACK。该方式的消息发送效率最高,但消息可靠性较差。此方式发送消息的过程耗时非常短,一般在微秒级别。

一般用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

在这里插入图片描述

单向消息发送生产者

package com.learn.rocketmq.simple;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++) {
            byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET);
            Message msg = new Message("LearnTopic", "DonTag-Oneway", body);
            msg.setKeys("key-" + i);
            // 单向发送
            // 由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。
            // 若数据不可丢,建议选用可靠同步或可靠异步发送方式。
            producer.sendOneway(msg);
        }
        producer.shutdown();
        System.out.println("producer shutdown");
    }
}

4、消息消费者

package com.learn.rocketmq.simple;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        // 定义一个push消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        // 指定nameServer
        consumer.setNamesrvAddr("localhost:9876");
        // 指定从第一条消息开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 指定消费topic与tag
        consumer.subscribe("LearnTopic", "*");
        // 指定采用“广播模式”进行消费,默认为“集群模式”
        // consumer.setMessageModel(MessageModel.BROADCASTING);

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            // 一旦broker中有了其订阅的消息就会触发该方法的执行,
            // 其返回值为当前consumer消费的状态
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // 逐条消费消息
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                }
                // 返回消费状态:消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 开启消费者消费
        consumer.start();
        System.out.println("Consumer Started");
    }
}

5、三种发送方式对比

发送方式速度结果反馈可靠性
同步发送不丢失
异步发送不丢失
单向发送最快可能丢失

RocketMQ Simple Message Example


版权声明:本文为weixin_46812727原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。