rocketmq源码分析目录
- 一、基本与使用
- 二、rocketmq源码分析
- 总结
一、基本概念与使用

补充:消息类型还包括批量消息
技术架构:
rocketmq核心组成:nameserver、broker、producer、consumer组成。
部署架构:

集群工作流程:
- 1.启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- 2.Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 3.收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- 4.Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
rocketmq server搭建
1.从github clone 源码,导入idea
2.打开idea 项目的mvn终端 输入如下命令:
mvn -Prelease-all -DskipTests clean install -U
3.进入/Users/xx/Documents/github_opensource/rocketmq/distribution/target/rocketmq-4.9.1-SNAPSHOT/rocketmq-4.9.1-SNAPSHOT 目录如下:
4.进入/Users/xx/Documents/github_opensource/rocketmq/distribution/target/rocketmq-4.9.1-SNAPSHOT/rocketmq-4.9.1-SNAPSHOT/bin目录
5.启动 nameserver与broker
nohup sh bin/mqnamesrv &nohup sh bin/mqbroker -n localhost:9876 &报错如下:
ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!
ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!
解决方法:修改如下3个文件
修改[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=“你的jdk安装路径”
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_271.jdk/Contents/Home
注释掉如下
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
#[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
再次启动nameserver 和broker成功如下提示:
6.停止server
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
7.mqadmin:是RocketMQ 提供的控制台及一系列控制台命令,用于管理员对主题,集群,broker 等信息的管理
sh mqadmin 查看帮助
sh mqadmin help updateTopic :查看updateTopic 的使用
8.使用 rocketmq-console 开源项目添加MQ监控告警
利用rocketmq-console做如下的监控:
- RocketMQ 消费者下线
- RocketMQ 消息出现长时间或者大量堆积
rocketmq-console的监控告警功能:
作为mqadmin的GUI封装,具备了mqadmin的功能,也提供了一些额外的功能,如dashboard面板统计
rocketmq-console监控原理:
如何开启告警功能:
从码云中获取源码,rocketmq-externals 地址:https://gitee.com/mirrors/RocketMQ-Externals.git
导入idea ,修改application.properties文件,增加nameserver地址 localhost:9876
server.address=0.0.0.0
server.port=8080
### SSL setting
#server.ssl.key-store=classpath:rmqcngkeystore.jks
#server.ssl.key-store-password=rocketmq
#server.ssl.keyStoreType=PKCS12
#server.ssl.keyAlias=rmqcngkey
#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.level.root=INFO
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=localhost:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
#Dashbord文件目录,登录用户配置文件所在目录
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
#set the message track trace topic if you don't want use the default one
rocketmq.config.msgTrackTopicName=
rocketmq.config.ticketKey=ticket
#Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
#开启登录功能
rocketmq.config.loginRequired=false
#set the accessKey and secretKey if you used acl
#rocketmq.config.accessKey=
#rocketmq.config.secretKey=将项目打成jar包,并运行jar文件
执行:
mvn clean package -Dmaven.test.skip=truejava -jar target/rocketmq-console-ng-2.0.0.jar#如果配置文件没有填写Name Server的话,可以在启动项目时指定namesrvAddr
$ java -jar target/rocrocketmq-console-ng-2.0.0.jar --rocketmq.config.namesrvAddr='localhost:9876'
执行成功,访问localhost:8080,成功如下:

开启监控告警:

重新打包,执行
mvn clean package -Dmaven.test.skip=true出现如下错误:
Failed to execute goal org.apache.maven.plugins:maven-checkstyle-plugin:2.17 错误
解决办法:
1.确定pom.xml 中的rocketmq版本
2.删除 maven-checkstyle-plugin 中标红部分该部分
org.apache.maven.pluginsgroupId>
maven-checkstyle-plugin
2.17version>
validateid>
validatephase>
src/main/resourcesexcludes>
style/rmq_checkstyle.xmlconfigLocation>
UTF-8encoding>
trueconsoleOutput>
truefailsOnError>
configuration>
checkgoal>
goals>
execution>
executions>
plugin>
仅剩如下:
重新打包成功
java -jar rocketmq-console-ng-2.0.0.jar
访问成功!
开启定时任务监控,扫描实时数据,做阈值判断,告警提示
当此项功能被放开后,在Consumer菜单下,为每一个consumer-group 的operation 会增加MONITOR CONFIG 选项,如下图所示:
minCount 当前消费分组的机器数量最小阈值,低于此值将会告警
minCount 当前消费分组允许的最大消息堆积量,高于辞职将会告警
默认情况下,rocketmq-console只定义了定时任务入口,具体的策略没有任何处理,我们需要根据自己的需求加入自身的告警方式,比如:邮箱,钉钉,短信,微信等等。
其预留的定时任务实现类为:
org.apache.rocketmq.console.task.MonitorTask
定时任务的扫描频率可根据自身系统要求考量设置
修改MonitorTask代码,增加自己的告警系统:钉钉、短信、、、

重新编译打包。

rocketmq与activemq、kafka对比:

rocketmq生态

二、rocketmq源码分析
待续。。。。
参考资料:https://github.com/apache/rocketmq/tree/master/docs/cn
https://gitee.com/mirrors/RocketMQ-Externals.git
https://rocketmq-1.gitbook.io/rocketmq-connector/
https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md
