文章目录
前提
创建一个 Maven 的 Java 工程。
引入 RocketMQ 的 Client 依赖,版本需要和服务端一致。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
</dependency>
一、普通消息
RocketMQ 提供三种方式来发送普通消息:同步(Sync)发送、异步(Async)发送和单向(Oneway)发送。
1、同步发送消息
Producer 发出⼀条消息后,会在收到 MQ 返回的 ACK 之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。
同步发送应用场景广泛,例如通知邮件、报名短信通知等。

同步消息发送生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 创建一个producer,参数为Producer Group名称
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 设置当发送失败时重试发送的次数,默认为2次
producer.setRetryTimesWhenSendFailed(3);
// 设置发送超时时限为5s,默认3s
producer.setSendMsgTimeout(5000);
// 开启生产者
producer.start();
// 生产并发送100条消息
for (int i = 0; i < 100; i++) {
// Message Body可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预。
// 需要Producer与Consumer协商好一致的序列化和反序列化方式。
byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET);
Message msg = new Message("LearnTopic", "DonTag", body);
// 为消息指定key
// 设置代表消息的业务关键属性,请尽可能全局唯一。
msg.setKeys("key-" + i);
// 同步发送消息
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
// 关闭producer
producer.shutdown();
}
}
消息发送的状态
package org.apache.rocketmq.client.producer;
public enum SendStatus {
SEND_OK, // 发送成功
FLUSH_DISK_TIMEOUT,// 刷盘超时。当Broker设置的刷盘策略为同步刷盘时才可能出 现这种异常状态。异步刷盘不会出现
FLUSH_SLAVE_TIMEOUT,// Slave同步超时。当Broker集群设置的Master-Slave的复 制方式为同步复制时才可能出现这种异常状态。异步复制不会出现
SLAVE_NOT_AVAILABLE;// 没有可用的Slave。当Broker集群设置为Master-Slave的 复制方式为同步复制时才可能出现这种异常状态。异步复制不会出现
private SendStatus() {
}
}
2、异步发送消息
Producer 发出消息后无需等待 MQ 返回 ACK,直接发送下⼀条消息。Producer 通过回调接口接收 MQ 响应,并处理响应结果。该方式的消息可靠性可以得到保障,消息发送效率也可以。
一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

异步消息发送生产者
package com.learn.rocketmq.simple;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 创建一个producer,参数为Producer Group名称
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 指定异步发送失败后不进行重试发送
producer.setRetryTimesWhenSendAsyncFailed(0);
// 指定新创建的Topic的Queue数量为2,默认为4
producer.setDefaultTopicQueueNums(2);
// 开启生产者
producer.start();
final CountDownLatch countDownLatch = new CountDownLatch(100);
for (int i = 0; i < 100; i++) {
// Message Body可以是任何二进制形式的数据,消息队列RocketMQ版不做任何干预。
// 需要Producer与Consumer协商好一致的序列化和反序列化方式。
byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET);
try {
final int index = i;
Message msg = new Message("LearnTopic", "DonTag-Async", body);
// 异步发送。指定回调
producer.send(msg, new SendCallback() {
// 当producer接收到MQ发送来的ACK后就会触发该回调方法的执行
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
3、单向发送消息
Producer 仅负责发送消息,不等待。该发送方式时 MQ 不返回 ACK。该方式的消息发送效率最高,但消息可靠性较差。此方式发送消息的过程耗时非常短,一般在微秒级别。
一般用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

单向消息发送生产者
package com.learn.rocketmq.simple;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class OnewayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET);
Message msg = new Message("LearnTopic", "DonTag-Oneway", body);
msg.setKeys("key-" + i);
// 单向发送
// 由于在oneway方式发送消息时没有请求应答处理,如果出现消息发送失败,则会因为没有重试而导致数据丢失。
// 若数据不可丢,建议选用可靠同步或可靠异步发送方式。
producer.sendOneway(msg);
}
producer.shutdown();
System.out.println("producer shutdown");
}
}
4、消息消费者
package com.learn.rocketmq.simple;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 定义一个push消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 指定nameServer
consumer.setNamesrvAddr("localhost:9876");
// 指定从第一条消息开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 指定消费topic与tag
consumer.subscribe("LearnTopic", "*");
// 指定采用“广播模式”进行消费,默认为“集群模式”
// consumer.setMessageModel(MessageModel.BROADCASTING);
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
// 一旦broker中有了其订阅的消息就会触发该方法的执行,
// 其返回值为当前consumer消费的状态
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 逐条消费消息
for (MessageExt msg : msgs) {
System.out.println(msg);
}
// 返回消费状态:消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 开启消费者消费
consumer.start();
System.out.println("Consumer Started");
}
}
5、三种发送方式对比
| 发送方式 | 速度 | 结果反馈 | 可靠性 |
|---|---|---|---|
| 同步发送 | 快 | 有 | 不丢失 |
| 异步发送 | 快 | 有 | 不丢失 |
| 单向发送 | 最快 | 无 | 可能丢失 |
版权声明:本文为weixin_46812727原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。