文章目录
RocketMQ整合SpringBoot
pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<rocketmq.version>2.0.4</rocketmq.version>
application.yml配置文件
rocketmq:
name-server: 192.168.1.8:9876 # 访问地址
producer:
group: Pro_Group # 必须指定group
send-message-timeout: 3000 # 消息发送超时时长,默认3s
retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2
RocketMQ结构

MQConsumerService
package com.base.framework.iot.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
@Slf4j
@Component
public class MQConsumerService {
// topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
// selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
// @Service
// @RocketMQMessageListener(topic = “RLT_TEST_TOPIC”, selectorExpression = “tag1”, consumerGroup = “Con_Group_One”)
// public class ConsumerSend implements RocketMQListener {
// // 监听到消息就会执行此方法
// @Override
// public void onMessage(User user) {
// log.info(“监听到消息:user={}”, JSON.toJSONString(user));
// }
// }
// 注意:这个ConsumerSend2和上面ConsumerSend在没有添加tag做区分时,不能共存,不然生产者发送一条消息,这两个都会去消费,类型不同会有一个报错,所以实际运用中最好加上tag,写这只是让你看知道就行
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", consumerGroup = "Con_Group_Two")
public class ConsumerSend2 implements RocketMQListener<String> {
@Override
public void onMessage(String str) {
log.info("监听到消息:str={}", str);
}
}
// MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag2", consumerGroup = "Con_Group_Three")
public class Consumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("监听到消息:msg={}", msg);
}
}
}
MQProducerService
package com.base.framework.iot.rocketmq;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
mq生产者
*/
@Slf4j
@Component
public class MQProducerService {@Value("${rocketmq.producer.send-message-timeout}")
private Integer messageTimeOut;// 建议正常规模项目统一用一个TOPIC
private static final String topic = “RLT_TEST_TOPIC”;// 直接注入使用,用于发送消息到broker服务器
@Autowired
private RocketMQTemplate rocketMQTemplate;/**
- 普通发送(这里的参数对象User可以随意定义,可以发送个对象,也可以是字符串等)
*/
// public void send(User user) {
// rocketMQTemplate.convertAndSend(topic + “:tag1”, user);
rocketMQTemplate.send(topic + “:tag1”, MessageBuilder.withPayload(user).build()); // 等价于上面一行
// }
/**
- 发送同步消息(sendResult为返回的发送结果)这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
*/
public SendResult sendMsg(String msgBody) {
SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());
log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
return sendResult;
}
/**
发送异步消息(适合对响应时间敏感的业务场景即发送端不能容忍长时间地等待Broker的响应)
*/
public void sendAsyncMsg(String msgBody) {
rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 处理消息发送成功逻辑
}@Override public void onException(Throwable throwable) { // 处理消息发送异常逻辑 }});
}
/**
- 发送延时消息
- 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
public void sendDelayMsg(String msgBody, int delayLevel) {
rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);
}
/**
- 发送单向消息(不关心发送结果,如日志)
*/
public void sendOneWayMsg(String msgBody) {
rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build());
}
/**
- 发送带tag的消息,直接在topic后面加上":tag"
*/
public SendResult sendTagMsg(String msgBody) {
return rocketMQTemplate.syncSend(topic + “:tag2”, MessageBuilder.withPayload(msgBody).build());
}
- 普通发送(这里的参数对象User可以随意定义,可以发送个对象,也可以是字符串等)
}
RocketMQController
package com.base.framework.iot.rocketmq;
import com.base.framework.common.response.Result;
import com.github.xiaoymin.knife4j.annotations.ApiSort;
import io.swagger.annotations.Api;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
rocketmq 前端控制层
@author lingchen
@since 2021-01-26
*/
@Api(tags = “rocketmq相关”)
@ApiSort(10)
@Validated
@RestController
@RequestMapping("/rocketmq")
public class RocketMQController {@Autowired
private MQProducerService mqProducerService;
// @GetMapping("/send")
// public void send() {
// User user = User.getUser();
// mqProducerService.send(user);
// }
@GetMapping("/sendTag")
public Result<SendResult> sendTag() {
SendResult sendResult = mqProducerService.sendTagMsg("带有tag的字符消息");
return Result.success(sendResult);
}
}