【归纳总结】微服务之RocketMQ

简介

1️⃣ 消息中间件是什么

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进
行分布式系统的集成。

2️⃣ RocketMQ是什么?

RocketMQ是阿⾥巴巴开源的⼀个消息中间件,是⼀个队列模型的消息中间件,具有高性能、高可靠、
⾼实时、分布式特点。目前已贡献给apache


功能

1️⃣ 异步化

将⼀些可以进行异步化的操作通过发送消息来进行异步化,提高效率

具体场景:⽤户为了使⽤某个应⽤,进⾏注册,系统需要发送注册邮件并验证短信。对这两个操作的处
理⽅式有两种:串⾏及并⾏。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2️⃣ 限流削峰

在⾼并发场景下把请求存⼊消息队列,利⽤排队思想降低系统瞬间峰值

具体场景:购物⽹站开展秒杀活动,⼀般由于瞬时访问量过⼤,服务器接收过⼤,会导致流量暴增,相
关系统⽆法处理请求甚⾄崩溃。⽽加⼊消息队列后,系统可以从消息队列中取数据,相当于消息队列做
了⼀次缓冲。
在这里插入图片描述
优点

  1. 请求先⼊消息队列,⽽不是由业务处理系统直接处理,做了⼀次缓冲,极⼤地减少了业务处理系统
    的压⼒;
  2. 队列⻓度可以做限制,事实上,秒杀时,后⼊队列的⽤户⽆法秒杀到商品,这些请求可以直接被抛
    弃,返回活动已结束或商品已售完信息;

模型

1️⃣ 相关概念

  • Producer: 消息⽣产者,负责消息的产⽣,由业务系统负责产⽣
  • Consumer:消息消费者,负责消息消费,由后台业务系统负责异步消费
  • Topic:消息的逻辑管理单位

这三者是RocketMq中最最基本的概念。Producer是消息的⽣产者。Consumer是消息的消费者。
消息通过Topic进⾏传递。Topic存放的是消息的逻辑地址。
在这里插入图片描述

2️⃣ 概念模型

在这里插入图片描述

  • Broker: 消息的中转⻆⾊,负责存储消息,转发消息,⼀般也称为server,可以理解为⼀个存放消
    息的服务,⾥⾯可以有多个Topic
  • MessageQueue: 消息的物理管理单位,⼀个Topic下有多个Queue,默认⼀个Topic创建时会创建
    四个MessageQueue
  • ConsumerGroup: 具有同样消费逻辑消费同样消息的Consumer,可以归并为⼀个group
  • ProducerGroup: 具有同样属性的⼀些Producer可以归并为同⼀个Group
    同样属性是指:发送同样Topic类型的消息
  • Nameserver : 注册中⼼
    作⽤:
    每个Broker启动的时候会向namesrv注册
    Producer发送消息的时候根据Topic获取路由到Broker⾥⾯Topic的信息
    Consumer根据Topic到Namesrv 获取topic的路由到Broker的信息

3️⃣ 部署模型

在这里插入图片描述

  1. 注册中⼼Nameserver启动
  2. 消息中转服务Broker启动
    启动的时候会去创建Topic并创建对应的MessageQueue
    启动的时候会去注册中⼼注册,把⾃⼰的地址以及负责的Topic告诉注册中⼼
    Broker和Nameserver之间通过⼼跳机制来检测对⽅是否存活
  3. 消息⽣产者Produer启动
    启动的时候会去注册中⼼注册
    注册那些内容呢?
    ①把⾃⼰的IP地址告诉注册中⼼
    ②把⾃⼰⽣产的Topic告诉注册中⼼
    运⾏时:
    单个⽣产者者和⼀台nameserver保持⻓连接,定时查询topic配置信息,如果该
    nameserver挂掉,⽣产者会⾃动连接下⼀个nameserver,直到有可⽤连接为⽌,并能
    ⾃动重连。
    单个⽣产者和该⽣产者关联的所有broker保持⻓连接。
    默认情况下,⽣产者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着
    某个broker如果宕机,⽣产者最多要30秒才能感知,在此期间,发往该broker的消息发
    送失败。该时间可⼿动配置
    默认情况下,⽣产者每隔30秒向所有broker发送⼼跳,该时间由DefaultMQProducer
    的heartbeatBrokerInterval参数决定,可⼿动配置。broker每隔10秒钟(此时间⽆法更
    改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超
    过2分钟,此时间⽆法更改)没有发送⼼跳数据,则关闭连接
  4. 消息消费者Consumer启动
    启动的时候会去注册中⼼注册
    注册哪些内容呢?
    ①把⾃⼰的IP地址告诉注册中⼼
    ②把⾃⼰可以消费的Topic告诉注册中⼼
    运⾏时:
    单个消费者和⼀台nameserver保持⻓连接,定时查询topic配置信息,如果该
    nameserver挂掉,消费者会⾃动连接下⼀个nameserver,直到有可⽤连接为⽌,并能
    ⾃动重连。
    单个消费者和该消费者关联的所有broker保持⻓连接。
    默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着
    某个broker如果宕机,客户端最多要30秒才能感知。该时间由
    DefaultMQPushConsumer的pollNameServerInteval参数决定,可⼿动配置。
    默认情况下,消费者每隔30秒向所有broker发送⼼跳,该时间由
    DefaultMQPushConsumer的heartbeatBrokerInterval参数决定,可⼿动配置。
    broker每隔10秒钟(此时间⽆法更改),扫描所有还存活的连接,若某个连接2分钟内
    (当前时间与最后更新时间差值超过2分钟,此时间⽆法更改)没有发送⼼跳数据,则
    关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续

使用

1️⃣ Producer发送消息

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class BasicProducer {

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {

        // 1. 创建一个消息生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("test_basic_producer");

        // 2. 设置producer连接那个nameserver的地址
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 3. 启动消息生产者
        producer.start();

        //4.  准备好待发送的消息
        String topic = "test_t";
        Message message = new Message();
        // 设置消息的Topic
        message.setTopic(topic);
        // 向消息对象中,放入我们实际的数据
        message.setBody("hello, rocketMq".getBytes());

        message.putUserProperty("sendTime", System.currentTimeMillis() + "");

        // 5. 利用消息生产者,将消息发送出去
        SendResult send = producer.send(message);
        System.out.println(send);

        if (send.getSendStatus().equals(SendStatus.SEND_OK)) {
            System.out.println("消息发送成功,消息id为" + send.getMsgId());
            return;
        }

        System.out.println("消息发送失败" + send.getMsgId());
    }
}

2️⃣ Consumer消费消息

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class BasicConsumer {

    public static void main(String[] args) throws MQClientException {

        // 创建一个消息消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_basic_consumer");

        // 设置消费者和nameserver建立连接的地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 向Rocket订阅指定主题的消息
        consumer.subscribe("test_t", "*");

        // 注册消息消费的监听器(实现消息的消费逻辑)
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt messageExt = msgs.get(0);
                String sendTime = messageExt.getUserProperty("sendTime");
                try {
                    String format = String.format("消息从发送到接收的时间延时是%dms"
                            ,(System.currentTimeMillis() - Long.parseLong(sendTime)));

                    System.out.println(format);
                } catch (Exception e) {
                    e.printStackTrace();
                }

                // 获取消息的msgId
                String msgId = messageExt.getMsgId();

                // 从获取到的消息对象中,取出我们消息中的数据
                byte[] body = messageExt.getBody();
                String s = new String(body);
                String formatStr = String.format("接收到消息,消息id为%s, 消息内容是%s", msgId, s);
                System.out.println(formatStr);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消息消费者
        consumer.start();



    }
}

版权声明:本文为Octavius_原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。