RocketMQ Installation and Deployment Tutorial
Preparation
Environment
Java 8 installed
Linux system
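As a quick check of the Java prerequisite above, the sketch below reads the installed version and reduces it to a major number. The helper name java_major_version is hypothetical, and the parsing assumes the usual `java -version` output, where the version string appears in double quotes.

```shell
# Hypothetical helper: turn a version string such as 1.8.0_171 or 11.0.2
# into its major number (8 or 11).
java_major_version() {
  case "$1" in
    1.*) echo "$1" | cut -d. -f2 ;;  # legacy scheme: 1.8.x means Java 8
    *)   echo "$1" | cut -d. -f1 ;;  # newer scheme: 11.x means Java 11
  esac
}

# `java -version` prints to stderr, so redirect it before parsing.
if command -v java >/dev/null 2>&1; then
  ver=$(java -version 2>&1 | awk -F'"' '/version/ {print $2; exit}')
  echo "Java major version: $(java_major_version "$ver")"
else
  echo "java not found on PATH"
fi
```

If the reported major version is not 8, the JVM options used later in this tutorial may need adjusting.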
Installation package
rocketmq-4.2.0.zip(http://rocketmq.apache.org/dowloading/releases/)
- Binary: rocketmq-all-4.2.0-bin-release.zip [PGP] [MD5] [SHA1]
Installation
Unzip the package into a target folder
unzip rocketmq-all-4.2.0-bin-release.zip -d rocketmq-4.2.0
Deployment
See the official RocketMQ quick start guide: http://rocketmq.apache.org/docs/quick-start/
Start the Name Server
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
Note: startup may fail with an out-of-memory error like this:
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 33554432 bytes for committing reserved memory.
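To fix this, edit runserver.sh in the bin folder and change the JAVA_OPT line to
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=320m"
Xms: initial heap size; Xmx: maximum heap size; Xmn: young generation size; PermSize: initial permanent generation size; MaxPermSize: maximum permanent generation size. Java 8 no longer has a permanent generation, so it ignores the two PermSize options with a warning.
In general it is better to keep the officially recommended memory settings where the machine allows; with a reduced heap, RocketMQ may not reach its full performance.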
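Before choosing between the recommended and the reduced heap size, it can help to see how much memory the machine actually has free. The sketch below is one way to do that on Linux. The helper name available_mb is hypothetical, it assumes a kernel that reports MemAvailable in /proc/meminfo, and the 256 MB threshold simply mirrors the -Xmx value above (the JVM needs somewhat more than its heap).

```shell
# Hypothetical helper: print available memory in MB from a meminfo-style file.
# $1: file to read (defaults to /proc/meminfo)
available_mb() {
  awk '/^MemAvailable:/ {print int($2 / 1024)}' "${1:-/proc/meminfo}"
}

# Warn if less memory is available than the reduced heap size used above.
have_mb=$(available_mb 2>/dev/null)
if [ -n "$have_mb" ] && [ "$have_mb" -lt 256 ]; then
  echo "Only ${have_mb} MB available; a 256 MB heap may still fail to start"
fi
```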
Likewise, edit runbroker.sh and change the line to
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
And again, edit tools.sh and change the line to
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m"
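The same heap change is made in three scripts, so it can be scripted rather than done by hand. The following is a minimal sketch, not part of RocketMQ: shrink_heap is a hypothetical helper, and it assumes each script contains -Xms, -Xmx and -Xmn options with a numeric size and a unit suffix. It leaves every other option on the line untouched.

```shell
# Hypothetical helper: rewrite the heap options in a start script.
# $1: script to patch, $2: heap size (e.g. 256m), $3: young generation size (e.g. 128m)
shrink_heap() {
  _tmp=$(mktemp)
  sed -E "s/-Xms[0-9]+[kKmMgG]/-Xms$2/; s/-Xmx[0-9]+[kKmMgG]/-Xmx$2/; s/-Xmn[0-9]+[kKmMgG]/-Xmn$3/" \
    "$1" > "$_tmp" && cat "$_tmp" > "$1"   # cat keeps the script's permissions
  rm -f "$_tmp"
}

# Example usage from the RocketMQ install directory (back up the scripts first):
#   for f in bin/runserver.sh bin/runbroker.sh bin/tools.sh; do
#     shrink_heap "$f" 256m 128m
#   done
```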
Start the Broker
> nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...
Note: the official site starts the Broker with just nohup sh bin/mqbroker -n localhost:9876 &. Started that way, running the Producer later fails with
com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest1
See http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist for further details.
    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:542)
    ...
In this setup, autoCreateTopicEnable appears to default to false in the broker configuration file. Once it is set to true in the configuration file, the broker can be started directly with nohup sh bin/mqbroker -n localhost:9876 &.
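Setting a key such as autoCreateTopicEnable in the broker configuration file can be scripted. The sketch below assumes a properties-style file with one key=value pair per line, as in conf/broker.conf. The helper name set_conf is hypothetical. It replaces an existing entry or appends a new one.

```shell
# Hypothetical helper: set key=value in a properties-style config file.
# $1: config file, $2: key, $3: value
set_conf() {
  if grep -Eq "^$2[[:space:]]*=" "$1" 2>/dev/null; then
    # Key already present: rewrite that line in place.
    _tmp=$(mktemp)
    sed -E "s|^$2[[:space:]]*=.*|$2=$3|" "$1" > "$_tmp" && cat "$_tmp" > "$1"
    rm -f "$_tmp"
  else
    # Key absent: append it (assumes the file ends with a newline).
    printf '%s=%s\n' "$2" "$3" >> "$1"
  fi
}

# Example usage from the RocketMQ install directory:
#   set_conf conf/broker.conf autoCreateTopicEnable true
```

A broker only reads the file when it is started with the -c option pointing at it.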
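Send & Receive Messages
Before sending or receiving messages, we need to tell the clients the address of the name server. RocketMQ provides several ways to do this. For simplicity, we use the environment variable NAMESRV_ADDR
> export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
Once the log output starts scrolling rapidly, everything has started successfully.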
Shut Down the Servers
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
Local Test Classes
ProducerTest: producer
// Imports assume the org.apache.rocketmq 4.x client that matches the 4.2.0 broker.
import java.util.UUID;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class ProducerTest {
    private static DefaultMQProducer producer = null;

    public static void main(String[] args) {
        System.out.println("[----------]Start");
        try {
            // Only send if the producer actually started.
            if (producerStart()) {
                System.out.println("producer started successfully");
                sendMessage("qch_20170706", "hello rocketmq!");
            }
        } finally {
            if (producer != null) {
                producer.shutdown();
            }
        }
        System.out.println("[----------]Succeed");
    }

    private static boolean producerStart() {
        producer = new DefaultMQProducer("pro_qch_test");
        producer.setNamesrvAddr("119.23.211.22:9876");
        producer.setInstanceName(UUID.randomUUID().toString());
        producer.setVipChannelEnabled(false);
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    private static boolean sendMessage(String topic, String str) {
        Message msg = new Message(topic, str.getBytes());
        try {
            producer.send(msg);
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}
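ConsumerTest: consumer
// Imports assume the org.apache.rocketmq 4.x client that matches the 4.2.0 broker.
import java.util.List;
import java.util.UUID;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class ConsumerTest {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("con_qch_test");
        consumer.setInstanceName(UUID.randomUUID().toString());
        consumer.setConsumeMessageBatchMaxSize(32);
        consumer.setNamesrvAddr("119.23.211.22:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> list,
                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // Print the body of every message in the batch.
                for (MessageExt me : list) {
                    System.out.println(new String(me.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        try {
            // Subscribe to all tags of the topic before starting the consumer.
            consumer.subscribe("qch_20170706", "*");
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}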
Note: an error may occur when running these classes locally.
To fix it, add brokerIP1=<your server IP> to conf/broker.conf, then start the broker with
nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true -c ~/software/rocketmq-4.2.0/conf/broker.conf &
Take a look at the broker.log file
2018-05-09 19:43:45 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=CID_ONS-HTTP-PROXY, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2018-05-09 19:43:45 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=FILTERSRV_CONSUMER, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2018-05-09 19:43:45 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=CID_ONSAPI_PULL, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2018-05-09 19:43:45 INFO main - load /root/store/config/subscriptionGroup.json OK
2018-05-09 19:43:45 INFO main - load /root/store/config/consumerFilter.json OK
2018-05-09 19:43:45 INFO main - load /root/store/config/delayOffset.json OK
2018-05-09 19:43:46 INFO main - Set user specified name server address: localhost:9876
2018-05-09 19:43:46 INFO PullRequestHoldService - PullRequestHoldService service started
2018-05-09 19:43:47 INFO main - register broker to name server localhost:9876 OK
2018-05-09 19:43:47 INFO main - The broker[broker-a, 119.23.211.22:10911] boot success. serializeType=JSON and name server is localhost:9876
2018-05-09 19:43:56 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2018-05-09 19:43:56 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 974140 bytes
2018-05-09 19:43:57 INFO BrokerControllerScheduledThread1 - register broker to name server localhost:9876 OK
Now compare the log of a broker started with nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
2018-05-09 19:39:02 INFO main - register broker to name server localhost:9876 OK
2018-05-09 19:39:02 INFO main - The broker[izwz9f1jdvkslcnjfonvbiz, 172.18.213.113:10911] boot success. serializeType=JSON and name server is localhost:9876
2018-05-09 19:39:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2018-05-09 19:39:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 974140 bytes
2018-05-09 19:39:12 INFO BrokerControllerScheduledThread1 - register broker to name server localhost:9876 OK
With the -c option, the broker seems to load several extra configuration files from the config folder.
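Another visible difference between the two logs is the address in the "boot success" line: 119.23.211.22:10911 when broker.conf is passed, and 172.18.213.113:10911 without it. The sketch below pulls that address out of a broker log so the two start-up modes can be compared quickly. The helper name broker_addr is hypothetical, and it assumes the log line format shown in the excerpts above.

```shell
# Hypothetical helper: print the address from the most recent
# "The broker[name, host:port] boot success" line in a broker log.
# $1: path to the broker log file
broker_addr() {
  sed -n 's/.*The broker\[[^,]*, \([^]]*\)\] boot success.*/\1/p' "$1" | tail -n 1
}

# Example usage:
#   broker_addr ~/logs/rocketmqlogs/broker.log
```

If the printed address is not one the clients can reach, the producer and consumer cannot connect to the broker even though the name server lookup succeeds.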
What a bottomless pit of a gotcha...
END
Reference: https://yestermorrow.github.io/2018/05/08/RocketMQ-安装部署教程