RocketMq使用教程

简介:

基于windows 下的使用教程

一、启动服务端

1-下载安装rocket

https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.0/rocketmq-all-4.9.0-bin-release.zip

2-配置环境变量:

ROCKETMQ_HOME =(解压位置) D:\rocket\rocketmq-all-4.9.0-bin-release

3-启动 mqnamesrv 服务

进入 D:\rocket\rocketmq-all-4.9.0-bin-release\bin 下 执行 mqnamesrv.cmd 启动服务 
或黑窗口执行 .\mqnamesrv 命令

正常启动显示:
在这里插入图片描述

异常处理:
1)如果一闪而过 重启电脑试试
2)启动后如果报错“无法加载类 xxx”的话

a)把java_home 的空格去掉 比如 program Files 或者直接改成没有空格的路径下
b)runbroker.cmd 和 runserver.cmd 文件下 classpath  添加英文双引号

在这里插入图片描述

4-启动 broker 服务 进入安装目录bin下执行以下命令

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=ture
或执行 .\mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true
正常显示:
在这里插入图片描述

5-启动rocketmq后台(可选)


```java
1)下载console代码
链接: https://pan.baidu.com/s/1ee80ppXmJ_IY8R7FefThjA 提取码: zhfy 
2)修改D:\rocket\rocketmq-externals-master\rocketmq-console\src\main\resources\application.properties 文件  

在这里插入图片描述

3)启动项目

   a-重新打包 
   在\rocketmq-console项目下  mvn clean package -Dmaven.test.skip=true
   b-启动项目
   在\rocketmq-console\target目录下 执行  java -jar rocketmq-console-ng-1.0.0.jar
4)访问后台
 localhost:serverPort 
 http://localhost:8090/

二、编写服务端代码

项目代码:
https://github.com/lixiao04/RocketMq.git

1-集成springmvc

1-pom引入
 		<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>
2-生产者注入
package com.example.demo.rocketmq;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
@Component
@Slf4j
public class Producer {
    @Value(value = "${rocketmq.namesrv}")
    private  String nameservAddr;
    @Value(value = "${rocketmq.group}")
    private  String group;
    @Value(value = "${rocketmq.topic}")
    private  String topic;
    @Value(value = "${rocketmq.tag}")
    private  String tag;
    private  DefaultMQProducer producer = null;
    @PostConstruct
    public  void Producer() throws MQClientException {
        producer = new DefaultMQProducer(group);
        producer.setNamesrvAddr(nameservAddr);
        producer.setInstanceName("producer");
        producer.setRetryTimesWhenSendFailed(3);
        producer.start();
        log.info("生产者启动成功!");
    }

    public DefaultMQProducer getProducer() {
        return producer;
    }
}

3-消费者注入
package com.example.demo.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;
@Component
@Slf4j
public class Consumer {
    @Value(value = "${rocketmq.namesrv}")
    private  String nameservAddr;
    @Value(value = "${rocketmq.group}")
    private  String group;
    @Value(value = "${rocketmq.topic}")
    private  String topic;
    @Value(value = "${rocketmq.tag}")
    private  String tag;
    DefaultMQPushConsumer consumer=null;
    @PostConstruct
    public  void consumer() throws MQClientException {
         consumer = new DefaultMQPushConsumer(group);
        consumer.setNamesrvAddr(nameservAddr);
        consumer.setConsumerGroup(group);

        consumer.setInstanceName("consumer");

        consumer.subscribe(topic, tag);
        //第一次是从对头开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //消费者一次最多可消费多少条
        consumer.setConsumeMessageBatchMaxSize(100);
        //集群模式消费失败默认重试16次,延迟等级为3~18。(messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h")
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //最多重试次数
        consumer.setMaxReconsumeTimes(6);


        consumer.registerMessageListener(new MessageListenerConcurrently()
        {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    //获取重试次数
                    int reconsumeTimes = msg.getReconsumeTimes();
                    //获取topic
                    String topic1 = msg.getTopic();
                    //获取tag
                    String tags = msg.getTags();
                    System.out.println("消息id:" + msg.getMsgId() + "---" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        log.info("消费者启动成功");
    }
}

4-代码中调用

在这里插入图片描述

注意rocketmq服务端的版本和客户端的版本不一致也会无法创建topic
也就是当初你下载的客户端是哪个版本的,需要在服务端引入相应版本的jar包,否则设置了 autoCreateTopicEnable=true 也会无法自动创建新的topic

在这里插入图片描述


版权声明:本文为weixin_41572449原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。