一、下载rocketmq,本文案例采用的是最新稳定版本rocketmq-all-4.9.3-bin-release,以下是下载路径
Downloading the Apache RocketMQ Releases - Apache RocketMQ
二、准备三台虚拟机安装centos7环境,root密码 root(此处可以根据实际情况设置),IP地址分别为:
192.168.164.128 hadoop01
192.168.164.129 hadoop02
192.168.164.130 hadoop03
使用root账户进入
vim /etc/hosts
三、创建用户
useradd hxroot
password hxroot root(这里根据自己的实际情况设置)
四、系统配置
4.1 设置免密登陆
使用命令ssh-keygen在hadoop01上面生成key。
ssh-keygen
进入/root/.ssh目录下查看是否生成成功
cd /root/.ssh/
出现如下
表示公私密钥生成成功
利用ssh-copy-id分发到其他几台机器
ssh-copy-id hadoop01
ssh-copy-id hadoop02
ssh-copy-id hadoop03
我环境已经分发过,这里就不再演示。主要做的目的是可以在hadoop01上直接ssh 或者scp到另外的机器,不需要输密码,注意:此设置非rocketmq官方要求
4.2 关闭防火墙
分别关闭三台机器的防火墙
systemctl stop firewalld.service
firewall-cmd --state
出现not running表示关闭完成
五、安装jdk
给hxroot用户创建/app目录
上传jdk的tar包
修改~/.bash_profile,配置环境变量。
export JAVA_HOME=/app/jdk1.8/
使用命令 source ~/.bash_profile
使环境生效
具体的jdk安装过程请阅读我的rocketmq单机环境中的安装步骤
上传zip包,再指定目录下解压。
unzip rocketmq-all-4.9.3-bin-release.zip
然后配置环境变量
export ROCKETMQ_HOME=/app/rocketmq/
具体的安装和配置过程请阅读我的另一篇rocketmq单机环境搭建,链接如下:
https://blog.csdn.net/he_xin2009/article/details/124312414?spm=1001.2014.3001.5502https://blog.csdn.net/he_xin2009/article/details/124312414?spm=1001.2014.3001.5502
温馨提示:
RocketMQ在4.5版本之前都不支持master宕机后slave自动切换。在4.5版本后,增加了基于
Dleger实现的主从切换。这里用的目前最新的4.9.3版本
七、配置RocketMQ集群
RocketMQ支持多种集群策略
2m-2s-async(本文采用模式)-2主2从异步刷盘(吞吐量较大,但是消息可能丢失)
2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全)
2m-noslave :2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置
dledger:用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader,
其他的就都是follower。通常正式环境都会采用这种方式来搭建集群。
此次搭建一个2主2从异步刷盘的集群,所以我们会使用conf/2m-2s-async下的配
置文件,实际项目中,为了达到高可用,一般会使用dleger。预备设计的集群情况如下:
机器名称 | nameServer节点部署 | broker节点部署 |
hadoop01 | nameserver | |
hadoop02 | nameserver | broker-a,broker-b-s |
hadoop03 | nameserver | broker-b,broker-a-s |
由于采用的是2m-2s-async,所以需要进入rocketmq的config目录下修改2m-2s-async的配置文件。--只需要配置broker.conf
7.1、配置第一组broker-a
在hadoop02上先配置borker-a的master节点。先配置2m-2s-async/broker-a.properties,cd到文件目录
执行vim broker-a.properties
配置内容如下
#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=hadoop01:9876;hadoop02:9876;hadoop03:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/app/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/app/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/app/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
:wq!保存并退出
该节点对应的从节点在hadoop03上。在hadoop03上cd到指定目录下/app/rocketmq/conf/2m-2s-async下修改2m-2s-async/broker-a-s.properties 只需要修改brokerId和brokerRole
执行
vim broker-a-s.properties
配置内容
#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=hadoop01:9876;hadoop02:9876;hadoop03:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketmq/storeSlave
#commitLog存储路径
storePathCommitLog=/app/rocketmq/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/storeSlave/index
#checkpoint文件存储路径
storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
#abort文件存储路径
abortFile=/app/rocketmq/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker的角色
#- ASYNC_MASTER异步复制Master
#- SYNC_MASTER同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH异步刷盘
#- SYNC_FLUSH同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
7.2、 配置第二组Broker-b
这一组broker的主节点在hadoop03上,所以需要配置hadoop03上的conf/2m-2s-async/broker-b.properties
同上进入到hadoop03的文件目录/app/rocketmq/conf/2m-2s-async/下,执行编辑文件命令
vim broker-b.properties
配置内容
#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=hadoop01:9876;hadoop02:9876;hadoop03:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/app/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/app/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/app/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
:wq!保存并退出
然后第二组hadoop03对应的slave在hadoop02上,修改hadoop02上的conf/2m-2s-async/broker-b-s.properties
同样地进入到hadoop02指定目录/app/rocketmq/conf/2m-2s-async/,执行修改broker-b-s.properties文件的命令
vim broker-b-s.properties
配置内容
#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=hadoop01:9876;hadoop02:9876;hadoop03:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据:wq!业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketmq/storeSlave
#commitLog存储路径
storePathCommitLog=/app/rocketmq/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/storeSlave/index
#checkpoint文件存储路径
storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
#abort文件存储路径
abortFile=/app/rocketmq/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker的角色
#- ASYNC_MASTER异步复制Master
#- SYNC_MASTER同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH异步刷盘
#- SYNC_FLUSH同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
7.3、启动nameServer。
由于本地内存有限,修改三个节点上的bin/runserver.sh,调整里面的jvm内存配置。找到下面这一行调整下内存
JAVA_OPT="${JAVA_OPT}-server -Xms512m -Xmx512m -Xmn256m -
XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
:wq!保存并退出
在hadoop01、hadoop02、hadoop03分别执行启动nameServer的命令
nohup ./mqnamesrv &
执行命令完成后,使用cat nohup.out查看nohup.out,看到下面的一条关键日志就是启动成功了。
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS
collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and
will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
执行命令后通过jps命令查看进程验证是否启动成功,
注意:如果jps命令不存在,这个与jdk有关
执行一下
source ~/.bash_profile
通过日记发现RocketMQ在runserver.sh中是使用的CMS垃圾回收期,而在runbroker.sh中使用
的是G1垃圾回收期
7.4、启动broker
启动broker是使用的mqbroker指令,只是注意启动broker时需要通过-c 指定对应的配置文件。在hadoop02上启动broker-a的master节点和在hadoop03上启动broker-b的slave节点
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties &
在hadoop03上启动broker-b的master节点和在hadoop02上启动broker-a的slave节点
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &
注意:启动slave时,如果遇到报错 Lock failed,MQ already started ,那是因为有多个实例共用了同一个storePath造成的,这时就需要调整store的路径
八、启动状态检查
使用jps指令,能看到一个NameSrvStartup进程和两个BrokerStartup进程。
nohup.out中也有启动成功的日志。
对应的日志文件:
# 查看nameServer日志
tail -500f ~/logs/rocketmqlogs/namesrv.log
# 查看broker日志
tail -500f ~/logs/rocketmqlogs/broker.log
踩坑:如果出现如下错误,检查防火墙是否关闭
九、命令行快速验证
在RocketMQ的安装包中,提供了一个tools.sh工具可以用来在命令行快速验证RocketMQ服务。我们在hadoop02上进入RocketMQ的安装目录:
发送消息:默认会发1000条消息
./bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
接收消息:
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
注意,这是官方提供的Demo,但是官方的源码中,这两个类都是没有指定nameServer的,所以
运行会有点问题。要指定NameServer地址,可以配置一个环境变量NAMESRV_ADDR,这样默认会读取这个NameServer地址。可以配到.bash_profile里或者直接临时指定。
export NAMESRV_ADDR='hadoop01:9876;hadoop02:9876;hadoop03:9876'
然后就可以正常执行了。
表示发送成功,如果在hadoop03机器正常消费,表示集群环境搭建完成