目录
2.1 生产者微服务服务搭建(user_service_9001)
2.1 消费者微服务服务搭建(order_service_9002)
一 MQ介绍
消息队列是一种“先进先出”的数据结构
其应用场景主要包含以下三个方面:应用解耦 流量削峰 数据分发
其主要功能有:
1.灵活可扩展性、
2.海量消息堆积能力、
3.支持顺序消息、
4.多种消息过滤方式、
5.支持事务消息、
6.回溯消费等常用功能。
RocketMQ 核心的四大组件:Name Server、Broker、Producer、Consumer ,每个组件都可以部署成集群模式进行水平扩展。
二 整合springcloud,实现生产和消费

微服务分析:
user_service_9001:生产者
order_service_9002:消费者
2.1 生产者微服务服务搭建(user_service_9001)
1 pom文件导入:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>2 配置文件中添加如下配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: producer_group
retry-times-when-send-failed: 5
send-message-timeout: 5000
topic: ACCOUNT_INFO_TOPIC
txGroup: producer_group_bank1
3 添加Producer.java类,文件信息如下
package com.yty.system.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.lang.reflect.Type;
/**
* @Description: 发送消息:同步消息、异步消息和单向消息。其中前两种消息是可靠的,因为会有发送是否成功的应答
* @author: yan
* @date: 2021.11.08
*/
@Service
@Slf4j
public class Producer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送消息并返回
* @param messageData 请求参数
* @param topic 主题名称
*/
public void sendAndReceive(String topic,String messageData) throws RuntimeException{
try {
Message<String> message = MessageBuilder.withPayload(messageData).build();
String result = rocketMQTemplate.sendAndReceive(topic,message, String.class);
log.info("Send message result :",result);
if (!result.equals("true")){
throw new RuntimeException("Send message error");
}
}catch (Exception e){
log.error("Send message error:",e);
throw new RuntimeException("Send message error:",e);
}
}
/**
* 普通字符串消息
* @param messageData
* @param topic
*/
public void sendMessage(String topic,String messageData) {
rocketMQTemplate.convertAndSend(topic,messageData);
}
/**
* @description: 同步消息
* 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
* @param:
* @return: void
* @author xiaojie
* @date: 2021/11/10 22:25
*/
public SendResult syncSend(String topic,String messageData) {
SendResult sendResult = rocketMQTemplate.syncSend(topic, messageData);
log.info("发送结果{}", sendResult);
return sendResult;
}
4 编写controller,如下
package com.yty.system.controller;
import com.alibaba.fastjson.JSONObject;
import com.yty.system.config.Producer;
import com.yty.system.entity.AccountInfo;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* <p>
* 前端控制器
* </p>
*
* @author damotou
* @since 2022-06-21
*/
@RestController
@RequestMapping("/accountInfo")
public class AccountInfoController {
@Autowired
private Producer producer;
@GetMapping(value = "/sendMsg")
public SendResult sendMsg(){
try {
// 发送消息体内容
JSONObject jsonObject = new JSONObject();
jsonObject.put("username","zhangsan");
jsonObject.put("sex","男");
jsonObject.put("realName","张三");
// 发送主题
String topic = "send_message_topic"; // 这个主题很重要,消费者端需要按照这个主题获取消息,所以建议配置在配置文件中
SendResult result = producer.syncSend(topic,jsonObject.toJSONString());
return result;
}catch (Exception e){
e.printStackTrace();
return null;
}
}
}
5 测试,需要提前启动rocketMQ和可视化页面,这个在前面章节中已涉及,请参考如下连接
https://blog.csdn.net/ytyDaMoTou/article/details/125343297?spm=1001.2014.3001.5501
启动生产者服务

postman测试

ok,信息发送成功,我们去可视化页面查看


注意,如果发送消息时需要携带Tag和Key,则如下修改
public SendResult syncSend(String topic,String messageData) {
String tag = "messageData";
String keys = UUID.randomUUID().toString().replace("-","");
Message message = MessageBuilder.withPayload(messageData)
.setHeader("KEYS", keys).build();
String destination = topic + ":" + tag;
SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
log.info("发送结果{}", sendResult);
return sendResult;
}
说明消息发送成功,至此,生产者搭建成功,并也成功发送消息,接下来搭建消费者服务
2.1 消费者微服务服务搭建(order_service_9002)
1 导包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>2 配置文件中编写
# RocketMQ配置 rocketmq: name-server: 127.0.0.1:9876 producer: group: order_service retry-times-when-send-failed: 6
3 编写配置类TxmsgConsumer.java,内容如下
package com.yty.order.config; import com.yty.order.service.IAccountInfoService; import com.yty.order.service.IDeDuplicationService; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQReplyListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; // 特别注意,这里topic必须和生产者的一致 @Component @RocketMQMessageListener(topic = "send_message_topic",consumerGroup = "consumer_txmsg_group_bank2") @Slf4j public class TxmsgConsumer implements RocketMQReplyListener { @Autowired IAccountInfoService accountInfoService; @Autowired IDeDuplicationService deDuplicationService; @Override public Object onMessage(Object message) { log.info("开始消费消息,消费信息为:{}",message); return true; } }
4 启动消费者服务,可看到获取的信息,如下

至此,说明消费者已获取信息,服务搭建完毕
