RabbitMQ的最终一致性分布式事务
使用rabbitmq的步骤

1.运行安装在服务器上的rabbit服务
![截图](https://img-blog.csdnimg.cn/ceee81e03c7d4656bcdbe72f07519a9e.png)
或者在docker上运行
# Variant 1: keep broker data in a named Docker volume.
# Remove any existing volume of that name, then create it afresh.
docker volume rm rabbitmq-5672-data
docker volume create --name rabbitmq-5672-data
# Run the broker detached; --rm deletes the container once it stops.
# The host clock file is mounted read-only; 5672 is the port the application
# connects to, 15672 is presumably the management UI of the -management image.
docker run --detach --rm --name rabbitmq-5672 \
  --volume /etc/localtime:/etc/localtime:ro \
  --volume rabbitmq-5672-data:/var/lib/rabbitmq \
  --publish 5672:5672 \
  --publish 15672:15672 \
  rabbitmq:3.10-management
# Variant 2: same container, but the data directory is bind-mounted from the host
# (~/docker/5672/data) instead of using a named volume.
docker run --detach --rm --name rabbitmq-5672 \
  --volume /etc/localtime:/etc/localtime:ro \
  --volume ~/docker/5672/data:/var/lib/rabbitmq \
  --publish 5672:5672 \
  --publish 15672:15672 \
  rabbitmq:3.10-management
2.在项目中安装依赖
<!-- Runtime dependency: provides RabbitTemplate, @EnableRabbit and the
     spring.rabbitmq.* auto-configuration used by the code in this article.
     The version is expected to come from the Spring Boot dependency management. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Test-only helpers for Spring AMQP; not available on the main classpath. -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>
3.编写对应的配置文件
## Connection settings for the RabbitMQ broker
spring.rabbitmq.host=192.168.12.12
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=hl
## Manual acknowledgement for both listener container types
## (the listener code acknowledges deliveries itself via channel.basicAck)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
## Publisher confirms: confirm that a message reached the exchange;
## CORRELATED ties each confirm to the CorrelationData passed when sending
spring.rabbitmq.publisher-confirm-type=CORRELATED
# Publisher returns: confirm that a message was routed to a queue.
# NOTE(review): the RabbitTemplate built in RabbitConfig does not register a
# returns callback, so returned messages are presumably not handled - confirm.
spring.rabbitmq.publisher-returns=true
4.创建对应配置并加上启动注解
/**
 * RabbitMQ configuration of the sending service.
 *
 * <p>Declares the queues this service publishes to and builds the
 * {@link RabbitTemplate} whose publisher-confirm callback updates the matching
 * row of the message table (status and remaining retries).
 */
// NOTE(review): the confirm callback is invoked directly by the template, not through a
// Spring proxy, so the class-level @Transactional presumably does not cover it - confirm
// whether the annotation is needed here.
@Configuration
@EnableRabbit
@Slf4j
@Transactional
public class RabbitConfig {
    /** Name of the queue carrying employee-list messages. */
    public static final String EMPLOYEE_LIST = "employee-list";
    /** Name of the queue carrying department-delete messages. */
    public static final String DEPARTMENT_DELETE = "department-delete";

    @Resource
    private MessageDao messageDao;

    /** Declares the {@link #DEPARTMENT_DELETE} queue. */
    @Bean
    public Queue DepartmentDelete() {
        return new Queue(DEPARTMENT_DELETE);
    }

    /** Declares the {@link #EMPLOYEE_LIST} queue. */
    @Bean
    public Queue employeeList() {
        return new Queue(EMPLOYEE_LIST);
    }

    /**
     * Builds the template used for publishing and registers the confirm callback.
     *
     * <p>The callback only fires when publisher confirms are enabled
     * (spring.rabbitmq.publisher-confirm-type). No returns callback is registered,
     * so unroutable messages are not reported by this template.
     *
     * @param connectionFactory connection factory the template publishes through
     * @return the configured template, registered under the bean name "rabbitTemplate"
     */
    @Bean("rabbitTemplate")
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        rabbitTemplate.setConfirmCallback(
                (correlationData, ack, cause) -> recordConfirm(correlationData, ack, cause));
        return rabbitTemplate;
    }

    /**
     * Applies one publisher confirm to the message row identified by the correlation id.
     *
     * <p>On ack the status becomes "B"; in every case the retry counter is decremented,
     * so a message that is never confirmed by the consumer eventually stops being resent.
     * Confirms that carry no usable numeric id are logged and ignored instead of throwing
     * inside the callback.
     *
     * @param correlationData correlation data given at send time; may be null
     * @param ack             true if the broker accepted the message
     * @param cause           broker-supplied reason for a nack, may be null
     */
    private void recordConfirm(CorrelationData correlationData, boolean ack, String cause) {
        if (correlationData == null || correlationData.getId() == null) {
            log.warn("收到不带消息id的发送确认, ack={}, cause={}", ack, cause);
            return;
        }
        String rawId = correlationData.getId();
        log.debug("收到发送确认, id={}, ack={}", rawId, ack);

        long messageId;
        try {
            messageId = Long.parseLong(rawId);
        } catch (NumberFormatException e) {
            // e.g. a send that used a default CorrelationData with a generated non-numeric id
            log.warn("发送确认的消息id不是数字: {}", rawId);
            return;
        }

        Message message = messageDao.getOne(messageId);
        if (message == null) {
            log.warn("未找到发送确认对应的消息记录, id={}", messageId);
            return;
        }
        if (ack) {
            message.setStatus("B");
        } else {
            log.warn("消息未被确认, id={}, cause={}", messageId, cause);
        }
        message.setRetryCount(message.getRetryCount() - 1);
        log.info("剩余消息数:" + message.getRetryCount());
        messageDao.save(message);
    }
}
5.创建message表记录发送次数及信息
![截图](https://img-blog.csdnimg.cn/3d02d08d334f43a19f482e37dd2406dc.png)
-- Table of outgoing messages kept by the sending service.
-- status: rows are created with 'A', set to 'B' when the broker acks the publish,
--         and set to 'C' when the consumer calls the confirmation endpoint.
-- retry_count: created with 5 and decremented on every publisher confirm;
--         rows are only resent while it is greater than 0.
DROP TABLE IF EXISTS message;
CREATE TABLE message
(
    id          BIGINT AUTO_INCREMENT,
    exchange    VARCHAR(64),
    routing_key VARCHAR(64)  NOT NULL,
    content     VARCHAR(128) NOT NULL,
    retry_count INT          NOT NULL,
    status      VARCHAR(32)  NOT NULL,
    PRIMARY KEY (id)
);
创建对应的DAO类和实体类
6.处理请求时同时创建message记录
/**
 * Deletes the department with the given id and stores a message row announcing
 * the deletion, addressed to the {@code RabbitConfig.DEPARTMENT_DELETE} routing key.
 *
 * <p>NOTE(review): both writes should commit together; confirm the enclosing
 * class or this method runs in a transaction.
 *
 * @param id id of the department to delete; also used as the message content
 */
public void deleteById(Long id) {
    departmentDao.deleteById(id);

    // Presumably (id, exchange, routingKey, content, retryCount, status), matching the
    // message table's column order - verify against the Message entity.
    final String content = String.valueOf(id);
    final Message pending = new Message(null, null, RabbitConfig.DEPARTMENT_DELETE, content, 5, "A");
    messageDao.save(pending);
}
7.创建Spring Task定时器,定时将message表中的消息发送到RabbitMQ
/**
 * Scheduled job that publishes pending rows of the message table to RabbitMQ.
 *
 * <p>A row is pending while its status is not "C" and its retry counter is above
 * zero. Each send carries the row id as correlation id so the publisher-confirm
 * callback can find the row again.
 */
@Component
@Slf4j
@Transactional
@EnableScheduling
public class RabbitTimer {
    @Resource
    private MessageMysqlDao messageDao;
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends every pending message; runs again 6000 ms after the previous run finished.
     *
     * <p>Declared public (it was private) because proxy-based {@code @Transactional}
     * cannot intercept a private method, and a scheduled method on a proxied bean
     * has to be invocable through the proxy.
     */
    @Scheduled(fixedDelay = 6000)
    public void process() {
        // Select rows whose status is not "C" and which still have retries left.
        QueryWrapper<Message> wrapper = new QueryWrapper<>();
        wrapper.ne("status", "C");
        wrapper.gt("retry_count", 0);
        List<Message> messageList = messageDao.selectList(wrapper);

        if (messageList.isEmpty()) {
            log.info("暂无消息发送,请等待...");
            return;
        }

        for (Message message : messageList) {
            // Payload format is "<message id>:<content>"; the consumer splits on ':'.
            String content = message.getId() + ":" + message.getContent();
            CorrelationData correlationData = new CorrelationData(message.getId() + "");
            if (message.getExchange() == null) {
                // The cast selects the (routingKey, message, correlationData) overload
                // rather than (exchange, routingKey, message).
                rabbitTemplate.convertAndSend(message.getRoutingKey(), (Object) content, correlationData);
            } else {
                rabbitTemplate.convertAndSend(message.getExchange(), message.getRoutingKey(), content, correlationData);
            }
            log.info("消息 {} 已发送", content);
        }
    }
}
8.创建消息确认接口,用于确认接收方已收到消息并完成处理
/**
 * Endpoint the consuming service calls after it has processed a message, so the
 * sender can mark the message row as finished (status "C").
 */
@RestController
@RequestMapping("/message")
@RequiredArgsConstructor
public class MessageController implements IAMessageController {
    // Injected through the Lombok-generated constructor; the field is final so that
    // @RequiredArgsConstructor actually includes it.
    private final MessageMysqlDao messageMysqlDao;

    /**
     * Sets the status of the message with the given id to "C".
     *
     * @param id id of the message row to mark as processed
     * @return "success" if a row with that id was updated, otherwise "fail"
     */
    @PostMapping("/update/{id}")
    public String messageUpdate(@PathVariable("id") Long id) {
        QueryWrapper<Message> wrapper = new QueryWrapper<>();
        wrapper.eq("id", id);

        // Only the status is set on the update entity.
        Message message = new Message();
        message.setStatus("C");

        // Report failure when no row matched instead of always answering "success",
        // so the caller does not treat an unknown id as confirmed.
        int updatedRows = messageMysqlDao.update(message, wrapper);
        return updatedRows > 0 ? "success" : "fail";
    }
}
9.消息接收者创建消息去重表,用于消息去重

-- Consumer-side deduplication table: one row per message already handled.
-- NOTE(review): the names are spelled "recived"; they are kept unchanged because the
-- entity mapping is not visible here - confirm before renaming to "received".
drop table if exists recived_message;
create table recived_message
(
    id         bigint auto_increment,
    recived_at datetime,
    -- MySQL requires an auto_increment column to be a key; the primary key also
    -- guarantees that the same id cannot be stored twice.
    primary key (id)
);
10.接收方微服务创建监听器,监听RabbitMQ消息
消息接收者处理消息发送者发送的消息,在处理无误后通过OpenFeign向消息发送者发送确认请求
/**
 * Consumer-side listener for the {@code RabbitConfig.HARVEST_CHECK} queue.
 *
 * <p>For each delivery it rejects malformed or already-seen payloads, records the
 * payload in the received-message table, updates the harvest plan, and then asks
 * the sending service to mark the message as processed.
 */
@Configuration
@RequiredArgsConstructor
@Transactional
public class HarvestResultLister {
    private final HarvestPlanMysqlDao harvestPlanMysqlDao;
    private final ReceivedMessageMysqlDao receivedMessageMysqlDao;
    private final HarvestCheckClient harvestCheckClient;

    /**
     * Handles one message of the form {@code "<messageId>:<contentId>:<harvestId>"}.
     *
     * <p>The delivery is acknowledged in the {@code finally} block whether or not
     * processing succeeded; any exception is rethrown after being printed.
     *
     * @param msg     raw payload, three ':'-separated numeric ids
     * @param channel channel the delivery arrived on, used for the manual ack
     * @param tag     delivery tag of this message
     * @throws RabbitDataError if the payload is malformed, was already received, or the
     *                         sender does not answer "success" to the confirmation call
     */
    @RabbitListener(queues = RabbitConfig.HARVEST_CHECK)
    public void harvestUpdateByCheck(String msg, Channel channel,
                                     @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            System.out.println(msg);
            String[] split = msg.split(":");
            if (split.length != 3) {
                throw new RabbitDataError("发送的数据异常");
            }
            // Id of the sender's message row, passed back in the confirmation call.
            String mesId = split[0];
            // Id of the sent content, used as the deduplication key.
            String contentId = split[1];
            // Id of the harvest plan to modify.
            String harvestId = split[2];

            ReceivedMessage receivedMessage = receivedMessageMysqlDao.selectById(Long.parseLong(contentId));
            if (receivedMessage != null) {
                throw new RabbitDataError("发送重复数据");
            }
            // Remember this content id so a redelivery is detected as a duplicate.
            receivedMessageMysqlDao.insert(new ReceivedMessage(Long.parseLong(contentId), new Date()));

            QueryWrapper<HarvestPlan> wrapper = new QueryWrapper<>();
            wrapper.eq("id", Long.parseLong(harvestId));
            HarvestPlan harvestPlan = new HarvestPlan();
            harvestPlan.setPurchaseStatusId(3L);
            // Persist the status change; previously the entity and wrapper were built
            // but never written, so the harvest plan was left untouched.
            harvestPlanMysqlDao.update(harvestPlan, wrapper);

            // Tell the sender the message has been processed.
            String result = harvestCheckClient.messageUpdate(Long.parseLong(mesId));
            if (!"success".equals(result)) {
                throw new RabbitDataError("确认消息未正常传回");
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        } finally {
            try {
                // Manual ack mode: acknowledge this single delivery in every case.
                channel.basicAck(tag, false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
版权声明:本文为qq_42652006原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。