RabbitMq的最终一致性分布式事务

使用rabbitmq的步骤

在这里插入图片描述

1.运行安装在服务器上的rabbit服务

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Rs8LPaRN-1664352105453)(C:\Users\han\AppData\Roaming\Typora\typora-user-images\image-20220901201727164.png)]

或者在docker上运行

# 使用数据卷
docker volume rm rabbitmq-5672-data
docker volume create --name rabbitmq-5672-data
docker run -d --rm --name rabbitmq-5672 \
    -v /etc/localtime:/etc/localtime:ro \
    -v rabbitmq-5672-data:/var/lib/rabbitmq \
    -p 5672:5672 \
    -p 15672:15672 \
    rabbitmq:3.10-management
# 这个例子挂载「数据存储目录」
docker run -d --rm --name rabbitmq-5672 \
    -v /etc/localtime:/etc/localtime:ro \
    -v ~/docker/5672/data:/var/lib/rabbitmq \
    -p 5672:5672 \
    -p 15672:15672 \
    rabbitmq:3.10-management

2.在项目中安装依赖

<dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
</dependency>

3.编写对应的配置文件

## 连接rabbitmq服务器
spring.rabbitmq.host=192.168.12.12
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=hl

## 手动确认消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual

## 确认消息已发送到交换机( Exchange )
spring.rabbitmq.publisher-confirm-type=CORRELATED

# 确认消息已发送到队列(Queue)
spring.rabbitmq.publisher-returns=true

4.创建对应配置并加上启动注解

@Configuration
@EnableRabbit
@Slf4j
@Transactional
public class RabbitConfig {
    @Resource
    private MessageDao messageDao;

    public static final String EMPLOYEE_LIST = "employee-list";

    public static final String DEPARTMENT_DELETE = "department-delete";

    @Bean
    public Queue DepartmentDelete(){
        return new Queue(DEPARTMENT_DELETE);
    }

    @Bean
    public Queue employeeList(){
        return new Queue(EMPLOYEE_LIST);
    }

    @Bean("rabbitTemplate")
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String s) {
                //每次发送队列信息将触发此方法,需要添加配置属性
                System.out.println(correlationData.getId());
                Message message = messageDao.getOne(Long.parseLong(Objects.requireNonNull(correlationData.getId())));
                if (ack){
                    message.setStatus("B");
                }
                message.setRetryCount(message.getRetryCount()-1);
                log.info("剩余消息数:"+message.getRetryCount());
                messageDao.save(message);
            }
        });

//        rabbitTemplate.setMandatory(true);
//
//        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
//            @Override
//            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//                log.info("ReturnCallback 消息:{}", message);
//                log.info("ReturnCallback 回应码:{}", replyCode);
//                log.info("ReturnCallback 回应信息:{}", replyText);
//                log.info("ReturnCallback 交换机:{}", exchange);
//                log.info("ReturnCallback 路由键:{}", routingKey);
//            }
//        });
        return rabbitTemplate;
    }

}

5.创建message表记录发送次数及信息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WSNp3Mbr-1664352105454)(C:\Users\han\AppData\Roaming\Typora\typora-user-images\image-20220901202318282.png)]

drop table if exists message;
create table message
(
    id          bigint auto_increment,
    exchange    varchar(64) ,
    routing_key varchar(64)  not null,
    content     varchar(128) not null,
    retry_count int          not null,
    status      varchar(32)  not null,
    primary key (id)
);

创建对应的DAO类和实体类

6.发送请求时并创建message信息

public void deleteById(Long id) {
        departmentDao.deleteById(id);
        Message message = new Message(null, null, RabbitConfig.DEPARTMENT_DELETE, id+"", 5, "A");
        messageDao.save(message);
}

7.创建spring Task定时器并定时输出rabbitmq信息

@Component
@Slf4j
@Transactional
@EnableScheduling
public class RabbitTimer {


    @Resource
    private MessageMysqlDao messageDao;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Scheduled(fixedDelay = 6000)
    private void process(){
        //获取状态不等于C和次数大于0的信息
        QueryWrapper<Message> wrapper = new QueryWrapper<>();
        wrapper.ne("status", "C");
        wrapper.gt("retry_count", 0);
        List<Message> messageList = messageDao.selectList(wrapper);
        if (messageList.size()==0){
            log.info("暂无消息发送,请等待...");
        }else {
            //进行信息发送
            for (Message message : messageList) {
                String content = message.getId()+":"+message.getContent();
                CorrelationData correlationData = new CorrelationData(message.getId()+"");
                if (message.getExchange()==null) {
                    rabbitTemplate.convertAndSend(message.getRoutingKey(), (Object) content, correlationData);
                }
                else{
                    rabbitTemplate.convertAndSend(message.getExchange(), message.getRoutingKey(),  content, correlationData);
                }
                log.info("消息 {} 已发送",content);
            }
        }
    }
}

8.创建消息确定方法,确认接受方收到的了消息并进行了处理

@RestController
@RequestMapping("/message")
@RequiredArgsConstructor
public class MessageController implements IAMessageController{

    @Resource
    private MessageMysqlDao messageMysqlDao;

    @PostMapping("/update/{id}")
    public String messageUpdate(@PathVariable("id")Long id){
        QueryWrapper<Message> wrapper = new QueryWrapper<>();
        wrapper.eq("id", id);
        Message message = new Message();
        message.setStatus("C");
        messageMysqlDao.update(message,wrapper);
        return "success";
    }

}

9.消息接受者创建消息重复表进行消息去重

在这里插入图片描述

drop table if exists recived_message;
create table recived_message
(
    id  bigint auto_increment,
    recived_at datetime
);

10.接受方微服务创建监听器监听rabbitmq信息

消息接受者处理消息发送者发送的消息,在消息处理无误后进行发送openfeign请求,给消息提供者发送确认信息

@Configuration
@RequiredArgsConstructor
@Transactional
public class HarvestResultLister {
    private final HarvestPlanMysqlDao harvestPlanMysqlDao;
    private final ReceivedMessageMysqlDao receivedMessageMysqlDao;
    private final HarvestCheckClient harvestCheckClient;

    @RabbitListener(queues = RabbitConfig.HARVEST_CHECK)
    public void harvestUpdateByCheck(String msg, Channel channel,
                                     @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            System.out.println(msg);
            String[] split = msg.split(":");
            if (split.length != 3) {
                throw new RabbitDataError("发送的数据异常");
            }
            String mesId = split[0];
            //获取发送内容id
            String contentId = split[1];
            //获取被修改的采收id
            String harvestId = split[2];

            ReceivedMessage receivedMessage = receivedMessageMysqlDao.selectById(Long.parseLong(contentId ));
            if (receivedMessage != null){
                throw new RabbitDataError("发送重复数据");
            }

            //存入数据
            receivedMessageMysqlDao.insert( new ReceivedMessage(Long.parseLong(contentId ), new Date()));

            QueryWrapper<HarvestPlan> wrapper = new QueryWrapper<>();
            wrapper.eq("id", Long.parseLong(harvestId));
            HarvestPlan harvestPlan = new HarvestPlan();
            harvestPlan.setPurchaseStatusId(3L);

            String result = harvestCheckClient.messageUpdate(Long.parseLong(mesId));
            if (!"success".equals(result)){
                throw new RabbitDataError("确认消息未正常传回");
            }

        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }finally {
            try {
                channel.basicAck(tag, false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

版权声明:本文为qq_42652006原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。