消息中间件MQ rocketMQ学习笔记

中文文档在此https://github.com/apache/rocketmq/tree/master/docs/cn
rocket与其他产品结合的一些外部应用:https://github.com/apache/rocketmq-externals

在什么场景下使用了MQ,为什么要使用MQ

我只知道是分布式事务的解决……
异步:之前要同步执行的代码可以变成异步;要同步等待的代码会变成异步,可以去做其他的事情
解耦:同步变成异步之后,自然就解耦了,producer和consumer分开,producer只负责向broker生产消息,consumer订阅消息,并且处理
削峰填谷:这producer如果生产大量的请求直接打向系统,系统也扛不住,但是打向的是消息中间件,用消息中间件去抗高并发
同时也是为了避免微服务中的服务雪崩问题,以及解决微服务之间系统和系统接口耦合度高问题
所以目前解决服务雪崩问题的方法:1.熔断降级2.消息队列

中间件名词解释

还是参照文档理解的
producer,consumer,broker 三大基本组件
name server 相当于是broker的路由(注册中心)
拉取式消费/推动式消费 就是consumer自己去要数据还是broker主动给consumer推数据
生产者组/消费者组 生产者消费者集群
集群消费/广播消费 消费者组中的每一个消费者,是获得等额的分配的消息,还是获得全部消息
普通顺序消息/严格顺序消息 在接受不同消息队列中的消息时,是有序的还是无序的

RocketMQ架构

在这里插入图片描述
hhh从github盗的图,从上面可以看出来主要四个部分 producer,consumer,name server,broker
然后producer consumer都是有集群的 broker里也是有主从分布的

架构设计

先略……

服务端架设rocketmq nameserver和broker

先下载bin-realease这个压缩包,然后解压缩,到bin目录下,开启nameserver和broker

开启nameserver 遇到的问题

使用的是学生白嫖的云服务器嘛,这个错误是在报无法分配该大小的堆,所以去到rocket mq的配置这里,修改一下配置即可
bin目录下runbroker.sh runserver.sh
在这里插入图片描述
这谁顶得住啊

在这里插入图片描述

rocketmq 注册topic ,topic分片的含义

跑这个helloworld样例的时候,被告知,因此要在broker那里去创建该topic
(为啥文档里面没有讲服务器端broker的配置啊……)
哦哦,找到了,http://rocketmq.apache.org/docs/quick-start/,讲服务器端的配置
在这里插入图片描述
在一个topic分片又会有很多个queue,然后会将这些queue均分给consumer (集群消费)
Queue是Topic在一个Broker上的分片等分为指定份数后的其中一份,是负载均衡过程中资源分配的基本单元。
在一个Consumer Group内,Queue和Consumer之间的对应关系是一对多的关系:一个Queue最多只能分配给一个Consumer,一个Cosumer可以分配得到多个Queue。这样的分配规则,每个Queue只有一个消费者,可以避免消费过程中的多线程处理和资源锁定,有效提高各Consumer消费的并行度和处理效率。

No route info of this topic

这个问题我开了broker的autoCreateTopicEnable=true之后再启动,也报错了,然后去查broker日志

2021-04-09 10:48:40 ERROR DiskCheckScheduledThread1 - Error when measuring disk space usage, file doesn’t exist on this path: /root/store/commitlog

storePathCommitLog这个文件路径不存在?
github上也有人提出这个 https://github.com/apache/rocketmq/issues/2663
这个bug在github上的版本已经更新了,但是在Apache官网上下载的版本尚未更新

也可以进行紧急处理如直接mkdir 对应的文件夹

还没解决……

查了一下name server的日志,也没啥问题啊
云服务器的9876端口是不是在安全组里没有开放……我去查查
但是在本机内开启producer测试也报错了……
在这里插入图片描述
又是容量问题……真的折磨,继续改配置
算了,开2Gswap试试看(所以又有一个问题:swap文件和内存到底区别在哪)
好了,开了2Gswap,问题解决……本机开producer测试通过
但是端口还是得改,这个telnet连接不上去,
我去,我这个不是云服务器,是轻量应用服务器,这个不配置安全组,直接改防火墙就行了……
好了,解决完这个问题,出现了第二个bug

sendDefaultImpl call timeout

这个使用网上说的关防火墙并没有什么用……
最后是参考https://blog.csdn.net/mefly521/article/details/84394483
把name server和broker的ip全部改成公网ip,然后成功了,解决了这个bug……
所以为什么,必须要修改ip,换成公网ip,这个bug……
终于启动起来了

可视化工具

https://github.com/apache/rocketmq-externals
clone这个,然后build这里面的rocketmq-console(跳过test)

mvn clean package -Dmaven.test.skip=true

然后上传jar包即可,执行jar包即可运行
默认密码 admin/admin

SyncProducer和AsyncProducer

SyncProducer向broker发送消息会等待broker响应返回信息sendResult,然后这一条消息的发送才结束。重要数据,必须要即时处理的用这种方式会比较好
AsyncProducer异步,这种是对于时间敏感的消息,发送端不会等待broker长时间的响应,而是发送完结束,然后等待回调函数去处理返回的结果

注意这个返回信息代表的是消息已经被正确持久化到磁盘,不是消息已经被消费!!
这两个都是使用的DefaultMQProducer这个producer,但是只是处理消息的函数不同,是使用返回sendResult的send函数,还是使用注册callback的send函数
DefaultMQProducer类中具体代码如下


	//同步处理消息
    public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.defaultMQProducerImpl.send(msg);
    }
    //异步处理消息
	public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
        this.defaultMQProducerImpl.send(msg, sendCallback);
    }	

对于要使用分布式事务的Producer,要使用TransactionMQProducer

consumer

这里面的消费者DefaultMQPushConsumer,消费者是没有事务的要求的
在这里Consumer类中我没找到持久监听的方法,只找到了注册监听函数的方法
注:Consumer里面找到具体实现,发现里面是有线程池的,所以每声明一个consumer,实际上都是创建一个consumer线程池

所以你在执行文档上提供给你的代码的时候会发现,consumer线程名字是不同的

Java消息服务JMS

这个是不是有点过时了啊……
目前rocketmq使用producer不需要获取connection,直接新建DefaultMQProducer或者TransactionMQProducer即可
这个还在这创建connection session呢hhh

rocketmq和Activemq

activemq的provider生产者是可以选择使用主题topic还是使用队列queue的,消费者监听特定主题,或者监听特定队列,主题是广播消费模式的(所有都广播),队列是负载均衡(集群消费)
rocketmq是基于主题topic的订阅模式,强制使用topic,生产者无法直接连接queue,consumer订阅topic,而每一个topic都会内含多个queue,queue如何使用由用户指定的消息顺序决定
rocketmq如何保证严格顺序消息:每一个topic只使用一个queue
这两个的区别
activemq还是基于JMS的,需要自己手动建立和关闭连接,rocketmq避免了这种问题而是变成了类似启动线程的写法,start和shutdown producer和consumer即可
以及topic的不同

rocketmq通信机制

具体在rocketmq文档的设计部分,我就不复制粘贴了
Name server要与所有人通信,所以设计了新的通信机制
rocketmq消息协议规则 主要介绍一条消息的完整构成
rocketmq消息的通信方式 同步(sync)、异步(async)、单向(oneway) 三种
同步:会一直等待响应返回
异步:client在netty上注册回调函数,然后将消息发送给server,server返回消息给client后,由异步的handler处理消息
单向:不需要response,使用较少

rocketmq消息存储和持久化

存储架构三大部分构成
CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。这个就是储存所有topic消息的,无论是什么都会被放到这里。
ConsumeQueue:ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
consumequeue文件可以看成是基于topic的commitlog索引文件,它实际上相当于保存了该topic的消息在哪个commitLog中,偏移量和长度,使得队列能够直接获取到该消息的具体信息
IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。
IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。
刷盘方式:同步刷盘和异步刷盘 在消息真正持久化至磁盘后才会返回ACK/只要消息写入PageCache即可将成功的ACK返回

rocketmq事务消息

采取的是两阶段法:commit or rollback

Activemq自动签收与手动签收

在事务性会话中,消息的签收与commit一致;在非事务性会话中,是看应答模式是自动签收还是手动签收,手动签收的需要自己调用ack()方法。

Active底层调优之nio协议,auto协议

Activemq消息存储和持久化

Activemq持久化主要是借助外部数据库来进行的……
leveldb+zookeeper是比较推荐的,leveldb这个是可复制的
默认是使用kahadb
还可以使用JDBC或者AMQ(不推荐)

rocketmq在本身的文档中没有介绍这些,但是有一个rocketmq-externals项目,在这里我看到了熟悉的存储支持如redis,mysql,hbase等
https://github.com/apache/rocketmq-externals

Activemq集群:zookeeper+leveldb

延时投递和定时投递


版权声明:本文为weixin_38857307原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。