本地启动RocketMQ5.0
下载RocketMQ5.0
https://rocketmq.apache.org/zh/docs/quickStart/01quickstart
下载完成后执行bin目录下cmd文件
启动NameServer
start mqnamesrv.cmd
启动Broker
start mqbroker.cmd
启动proxy
start mqproxy -n 127.0.0.1:9876
搭建服务
导入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.4</version>
</dependency>
配置
# 自定义key名称
rocketmq.proxy = 127.0.0.1:8081
配置生产者
package com.xxf.rocketmq.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class RocketMqConfig {
@Value("${rocketmq.proxy}")
private String proxy;
@Bean(name = "mqProducer")
public Producer mqProducer() {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(proxy);
ClientConfiguration configuration = builder.build();
// 初始化Producer时需要设置通信配置
try {
log.info("构造mq5.0生产者:proxy:{}", proxy);
Producer producer = provider.newProducerBuilder().setClientConfiguration(configuration).build();
log.info("构造mq5.0生产者成功:proxy:{}", proxy);
return producer;
} catch (ClientException e) {
log.error("初始化生产者异常:", e);
}
return null;
}
}
创建消费者
package com.xxf.rocketmq.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
@Slf4j
@Component
public class RocketMq5Consumer {
@Value("${rocketmq.proxy}")
private String proxy;
private final static String MY_GROUP = "myTestGroup";
private final static String MY_TOPIC = "myTest1Topic";
@Bean(name = "mqConsumer")
public void mqConsumer() {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(proxy);
ClientConfiguration configuration = builder.build();
// 初始化Producer时需要设置通信配置以及预绑定的Topic。
try {
log.info("构建mq5.0消费者:proxy:{}, topic:{}, group:{}", proxy, MY_TOPIC, MY_GROUP);
// 订阅消息的过滤规则,表示订阅所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
provider.newPushConsumerBuilder()
.setClientConfiguration(configuration)
// 设置消费者分组。
.setConsumerGroup(MY_GROUP)
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap("myTest1Topic", filterExpression))
// 设置消费监听器。
.setMessageListener(messageView -> {
log.info("消费消息:{}", messageView);
log.info("消息内容为:{}", StandardCharsets.UTF_8.decode(messageView.getBody()).toString());
return ConsumeResult.SUCCESS;
}).build();
log.info("构建mq5.0消费者成功:proxy:{}, topic:{}, group:{}", proxy, MY_TOPIC, MY_GROUP);
} catch (ClientException e) {
log.error("构建mq5.0消费者异常:proxy:{}, topic:{}, group:{}", proxy, MY_TOPIC, MY_GROUP, e);
}
}
}
编写测试接口
package com.xxf.rocketmq.controller;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
@Slf4j
@RestController
public class Mq5Controller {
@Resource(name = "mqProducer")
private Producer mqProducer;
/**
* 测试发送消息
* @return
* @throws ClientException
*/
@GetMapping("/5.0/sendMessage")
public String sendMessage5() throws ClientException {
String messageStr = "hello rocketMq5.0";
MessageBuilder messageBuilder = new MessageBuilderImpl();
//构建消息
Message message = messageBuilder.setTopic("myTest1Topic").setBody(messageStr.getBytes(StandardCharsets.UTF_8)).build();
SendReceipt send = mqProducer.send(message);
log.info("mq5.0消息发送成功:{}", send);
return "ok";
}
}
启动服务
发送消息
http://127.0.0.1:8080/5.0/sendMessage
控制台日志
17:30:31.861 [http-nio-8080-exec-1] INFO c.x.r.c.Mq5Controller - [sendMessage5,35] - mq5.0消息发送成功:SendReceiptImpl{messageId=011C697AF19CED4AAC0415FB3700000000}
17:30:31.864 [RocketmqMessageConsumption-1-44] INFO c.x.r.s.RocketMq5Consumer - [lambda$mqConsumer$0,50] - 消费消息:MessageViewImpl{messageId=011C697AF19CED4AAC0415FB3700000000, topic=myTest1Topic, bornHost=DESKTOP-VIJT4QM, bornTimestamp=1678008631821, endpoints=ipv4:127.0.0.1:8081, deliveryAttempt=1, tag=null, keys=[], messageGroup=null, deliveryTimestamp=null, properties={}}
17:30:31.864 [RocketmqMessageConsumption-1-44] INFO c.x.r.s.RocketMq5Consumer - [lambda$mqConsumer$0,51] - 消息内容为:hello rocketMq5.0
版权声明:本文为qq_28014809原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。