SpringBoot + RocketMQ 5.0 实战—普通消息

本地启动RocketMQ5.0

下载RocketMQ5.0

https://rocketmq.apache.org/zh/docs/quickStart/01quickstart

下载完成后执行bin目录下cmd文件

启动NameServer

start mqnamesrv.cmd

在这里插入图片描述

启动Broker

start mqbroker.cmd

在这里插入图片描述

启动proxy

start mqproxy -n 127.0.0.1:9876

在这里插入图片描述

搭建服务

在这里插入图片描述

导入依赖

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.4</version>
        </dependency>

配置

# 自定义key名称
rocketmq.proxy = 127.0.0.1:8081

配置生产者

package com.xxf.rocketmq.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that exposes a RocketMQ 5.x {@link Producer} as the
 * {@code mqProducer} bean.
 */
@Slf4j
@Configuration
public class RocketMqConfig {

    /** Proxy endpoint(s), injected from the {@code rocketmq.proxy} property. */
    @Value("${rocketmq.proxy}")
    private String proxy;

    /**
     * Builds the producer against the configured proxy endpoint.
     *
     * @return the producer built by the client library; never {@code null}
     * @throws IllegalStateException if the client library fails to build the producer
     */
    @Bean(name = "mqProducer")
    public Producer mqProducer() {
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        // The producer needs the client (connection) configuration at build time.
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(proxy)
                .build();
        try {
            log.info("构造mq5.0生产者:proxy:{}", proxy);
            Producer producer = provider.newProducerBuilder()
                    .setClientConfiguration(configuration)
                    .build();
            log.info("构造mq5.0生产者成功:proxy:{}", proxy);
            return producer;
        } catch (ClientException e) {
            // Fail fast instead of returning null: a null bean would only surface
            // later as a NullPointerException at the first send call.
            throw new IllegalStateException("Failed to build RocketMQ producer, proxy=" + proxy, e);
        }
    }

}

创建消费者

package com.xxf.rocketmq.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Collections;

/**
 * Registers a RocketMQ 5.x push consumer that logs every message received on
 * {@code myTest1Topic} for consumer group {@code myTestGroup}.
 */
@Slf4j
@Component
public class RocketMq5Consumer {

    /** Proxy endpoint(s), injected from the {@code rocketmq.proxy} property. */
    @Value("${rocketmq.proxy}")
    private String proxy;

    private static final String MY_GROUP = "myTestGroup";

    private static final String MY_TOPIC = "myTest1Topic";

    /**
     * Builds the push consumer and hands it to the Spring container.
     *
     * <p>Returning the consumer (rather than discarding it) keeps a reference to it
     * in the application context, so the container can apply its bean destroy
     * handling to it on shutdown.
     *
     * @return the push consumer built by the client library; never {@code null}
     * @throws IllegalStateException if the client library fails to build the consumer
     */
    @Bean(name = "mqConsumer")
    public PushConsumer mqConsumer() {
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        // The consumer needs the client (connection) configuration and its
        // subscriptions at build time.
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(proxy)
                .build();
        try {
            log.info("构建mq5.0消费者:proxy:{}, topic:{}, group:{}", proxy, MY_TOPIC, MY_GROUP);
            // Tag filter "*" subscribes to messages with any tag.
            String tag = "*";
            FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
            PushConsumer consumer = provider.newPushConsumerBuilder()
                    .setClientConfiguration(configuration)
                    // Consumer group.
                    .setConsumerGroup(MY_GROUP)
                    // Pre-bound subscription; uses the same constant that is logged.
                    .setSubscriptionExpressions(Collections.singletonMap(MY_TOPIC, filterExpression))
                    // Listener invoked for each delivered message.
                    .setMessageListener(messageView -> {
                        log.info("消费消息:{}", messageView);
                        log.info("消息内容为:{}",  StandardCharsets.UTF_8.decode(messageView.getBody()).toString());
                        return ConsumeResult.SUCCESS;
                    })
                    .build();
            log.info("构建mq5.0消费者成功:proxy:{}, topic:{}, group:{}", proxy, MY_TOPIC, MY_GROUP);
            return consumer;
        } catch (ClientException e) {
            // Fail fast: without this the application would start normally but
            // never consume anything.
            throw new IllegalStateException(
                    "Failed to build RocketMQ push consumer, proxy=" + proxy
                            + ", topic=" + MY_TOPIC + ", group=" + MY_GROUP, e);
        }
    }
}

编写测试接口

package com.xxf.rocketmq.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * REST controller with a single endpoint for exercising the {@code mqProducer} bean.
 */
@Slf4j
@RestController
public class Mq5Controller {

    @Resource(name = "mqProducer")
    private Producer mqProducer;

    /**
     * Sends one fixed test message to {@code myTest1Topic} and logs the receipt.
     *
     * @return the literal {@code "ok"} after the send call has returned
     * @throws ClientException if the underlying client call fails
     */
    @GetMapping("/5.0/sendMessage")
    public String sendMessage5() throws ClientException {
        byte[] payload = "hello rocketMq5.0".getBytes(StandardCharsets.UTF_8);
        // Assemble the message: topic plus UTF-8 encoded body.
        Message outgoing = new MessageBuilderImpl()
                .setTopic("myTest1Topic")
                .setBody(payload)
                .build();
        SendReceipt receipt = mqProducer.send(outgoing);
        log.info("mq5.0消息发送成功:{}", receipt);
        return "ok";
    }
}

启动服务

在这里插入图片描述

发送消息

http://127.0.0.1:8080/5.0/sendMessage

控制台日志

17:30:31.861 [http-nio-8080-exec-1] INFO  c.x.r.c.Mq5Controller - [sendMessage5,35] - mq5.0消息发送成功:SendReceiptImpl{messageId=011C697AF19CED4AAC0415FB3700000000}
17:30:31.864 [RocketmqMessageConsumption-1-44] INFO  c.x.r.s.RocketMq5Consumer - [lambda$mqConsumer$0,50] - 消费消息:MessageViewImpl{messageId=011C697AF19CED4AAC0415FB3700000000, topic=myTest1Topic, bornHost=DESKTOP-VIJT4QM, bornTimestamp=1678008631821, endpoints=ipv4:127.0.0.1:8081, deliveryAttempt=1, tag=null, keys=[], messageGroup=null, deliveryTimestamp=null, properties={}}
17:30:31.864 [RocketmqMessageConsumption-1-44] INFO  c.x.r.s.RocketMq5Consumer - [lambda$mqConsumer$0,51] - 消息内容为:hello rocketMq5.0

版权声明:本文为qq_28014809原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。