简介
1️⃣ 消息中间件是什么
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进
行分布式系统的集成。
2️⃣ RocketMQ是什么?
RocketMQ是阿⾥巴巴开源的⼀个消息中间件,是⼀个队列模型的消息中间件,具有高性能、高可靠、
⾼实时、分布式特点。目前已贡献给apache
功能
1️⃣ 异步化
将⼀些可以进行异步化的操作通过发送消息来进行异步化,提高效率
具体场景:⽤户为了使⽤某个应⽤,进⾏注册,系统需要发送注册邮件并验证短信。对这两个操作的处
理⽅式有两种:串⾏及并⾏。



2️⃣ 限流削峰
在⾼并发场景下把请求存⼊消息队列,利⽤排队思想降低系统瞬间峰值
具体场景:购物⽹站开展秒杀活动,⼀般由于瞬时访问量过⼤,服务器接收过⼤,会导致流量暴增,相
关系统⽆法处理请求甚⾄崩溃。⽽加⼊消息队列后,系统可以从消息队列中取数据,相当于消息队列做
了⼀次缓冲。
优点:
- 请求先⼊消息队列,⽽不是由业务处理系统直接处理,做了⼀次缓冲,极⼤地减少了业务处理系统
的压⼒; - 队列⻓度可以做限制,事实上,秒杀时,后⼊队列的⽤户⽆法秒杀到商品,这些请求可以直接被抛
弃,返回活动已结束或商品已售完信息;
模型
1️⃣ 相关概念
- Producer: 消息⽣产者,负责消息的产⽣,由业务系统负责产⽣
- Consumer:消息消费者,负责消息消费,由后台业务系统负责异步消费
- Topic:消息的逻辑管理单位
这三者是RocketMq中最最基本的概念。Producer是消息的⽣产者。Consumer是消息的消费者。
消息通过Topic进⾏传递。Topic存放的是消息的逻辑地址。
2️⃣ 概念模型

- Broker: 消息的中转⻆⾊,负责存储消息,转发消息,⼀般也称为server,可以理解为⼀个存放消
息的服务,⾥⾯可以有多个Topic - MessageQueue: 消息的物理管理单位,⼀个Topic下有多个Queue,默认⼀个Topic创建时会创建
四个MessageQueue - ConsumerGroup: 具有同样消费逻辑消费同样消息的Consumer,可以归并为⼀个group
- ProducerGroup: 具有同样属性的⼀些Producer可以归并为同⼀个Group
同样属性是指:发送同样Topic类型的消息 - Nameserver : 注册中⼼
作⽤:
每个Broker启动的时候会向namesrv注册
Producer发送消息的时候根据Topic获取路由到Broker⾥⾯Topic的信息
Consumer根据Topic到Namesrv 获取topic的路由到Broker的信息
3️⃣ 部署模型

- 注册中⼼Nameserver启动
- 消息中转服务Broker启动
启动的时候会去创建Topic并创建对应的MessageQueue
启动的时候会去注册中⼼注册,把⾃⼰的地址以及负责的Topic告诉注册中⼼
Broker和Nameserver之间通过⼼跳机制来检测对⽅是否存活 - 消息⽣产者Produer启动
启动的时候会去注册中⼼注册
注册那些内容呢?
①把⾃⼰的IP地址告诉注册中⼼
②把⾃⼰⽣产的Topic告诉注册中⼼
运⾏时:
单个⽣产者者和⼀台nameserver保持⻓连接,定时查询topic配置信息,如果该
nameserver挂掉,⽣产者会⾃动连接下⼀个nameserver,直到有可⽤连接为⽌,并能
⾃动重连。
单个⽣产者和该⽣产者关联的所有broker保持⻓连接。
默认情况下,⽣产者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着
某个broker如果宕机,⽣产者最多要30秒才能感知,在此期间,发往该broker的消息发
送失败。该时间可⼿动配置
默认情况下,⽣产者每隔30秒向所有broker发送⼼跳,该时间由DefaultMQProducer
的heartbeatBrokerInterval参数决定,可⼿动配置。broker每隔10秒钟(此时间⽆法更
改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超
过2分钟,此时间⽆法更改)没有发送⼼跳数据,则关闭连接 - 消息消费者Consumer启动
启动的时候会去注册中⼼注册
注册哪些内容呢?
①把⾃⼰的IP地址告诉注册中⼼
②把⾃⼰可以消费的Topic告诉注册中⼼
运⾏时:
单个消费者和⼀台nameserver保持⻓连接,定时查询topic配置信息,如果该
nameserver挂掉,消费者会⾃动连接下⼀个nameserver,直到有可⽤连接为⽌,并能
⾃动重连。
单个消费者和该消费者关联的所有broker保持⻓连接。
默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着
某个broker如果宕机,客户端最多要30秒才能感知。该时间由
DefaultMQPushConsumer的pollNameServerInteval参数决定,可⼿动配置。
默认情况下,消费者每隔30秒向所有broker发送⼼跳,该时间由
DefaultMQPushConsumer的heartbeatBrokerInterval参数决定,可⼿动配置。
broker每隔10秒钟(此时间⽆法更改),扫描所有还存活的连接,若某个连接2分钟内
(当前时间与最后更新时间差值超过2分钟,此时间⽆法更改)没有发送⼼跳数据,则
关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续
使用
1️⃣ Producer发送消息
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class BasicProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 1. 创建一个消息生产者对象
DefaultMQProducer producer = new DefaultMQProducer("test_basic_producer");
// 2. 设置producer连接那个nameserver的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 3. 启动消息生产者
producer.start();
//4. 准备好待发送的消息
String topic = "test_t";
Message message = new Message();
// 设置消息的Topic
message.setTopic(topic);
// 向消息对象中,放入我们实际的数据
message.setBody("hello, rocketMq".getBytes());
message.putUserProperty("sendTime", System.currentTimeMillis() + "");
// 5. 利用消息生产者,将消息发送出去
SendResult send = producer.send(message);
System.out.println(send);
if (send.getSendStatus().equals(SendStatus.SEND_OK)) {
System.out.println("消息发送成功,消息id为" + send.getMsgId());
return;
}
System.out.println("消息发送失败" + send.getMsgId());
}
}
2️⃣ Consumer消费消息
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class BasicConsumer {
public static void main(String[] args) throws MQClientException {
// 创建一个消息消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_basic_consumer");
// 设置消费者和nameserver建立连接的地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 向Rocket订阅指定主题的消息
consumer.subscribe("test_t", "*");
// 注册消息消费的监听器(实现消息的消费逻辑)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0);
String sendTime = messageExt.getUserProperty("sendTime");
try {
String format = String.format("消息从发送到接收的时间延时是%dms"
,(System.currentTimeMillis() - Long.parseLong(sendTime)));
System.out.println(format);
} catch (Exception e) {
e.printStackTrace();
}
// 获取消息的msgId
String msgId = messageExt.getMsgId();
// 从获取到的消息对象中,取出我们消息中的数据
byte[] body = messageExt.getBody();
String s = new String(body);
String formatStr = String.format("接收到消息,消息id为%s, 消息内容是%s", msgId, s);
System.out.println(formatStr);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消息消费者
consumer.start();
}
}
版权声明:本文为Octavius_原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。