基于MQ实现分布式事务

问题场景

一笔订单支付成功后, 怎样才能保证这笔订单的支付状态一定会被修改?

依赖

        <!-- RocketMQ Java client (org.apache.rocketmq.client.*), pinned to version 4.5.2. -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.2</version>
        </dependency>

配置生产者和消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that exposes the RocketMQ producer and push consumer as beans.
 *
 * <p>Both bean definitions name {@code start} as the init method and {@code shutdown} as the
 * destroy method, so the clients are started and stopped by the Spring container.
 */
@Configuration
public class RocketMQConfig {

    /** Name server address used by both the producer and the consumer. */
    private static final String NAME_SERVER_ADDRESS = "localhost:9876";

    /** Producer group of the payment service. */
    private static final String PRODUCER_GROUP = "paymentGroup";

    /** Consumer group of the order-status updater. */
    private static final String CONSUMER_GROUP = "paymentConsumerGroup";

    /** Topic the consumer subscribes to. */
    private static final String PAYMENT_TOPIC = "payment";

    /**
     * Creates the producer for group {@code paymentGroup}, pointed at the local name server.
     *
     * @return the configured (not yet started) producer
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQProducer producer() {
        DefaultMQProducer paymentProducer = new DefaultMQProducer(PRODUCER_GROUP);
        paymentProducer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        return paymentProducer;
    }

    /**
     * Creates the push consumer for group {@code paymentConsumerGroup}. It subscribes to the
     * {@code payment} topic with the sub-expression {@code "*"} and hands messages to the
     * listener bean named {@code messageListener}, which holds the actual consumption logic.
     *
     * @param messageListener the listener bean qualified as {@code messageListener}
     * @return the configured (not yet started) consumer
     * @throws MQClientException declared by {@code subscribe}
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQPushConsumer consumer(
            @Qualifier("messageListener") MessageListenerConcurrently messageListener)
            throws MQClientException {
        DefaultMQPushConsumer paymentConsumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        paymentConsumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        paymentConsumer.subscribe(PAYMENT_TOPIC, "*");
        paymentConsumer.registerMessageListener(messageListener);
        return paymentConsumer;
    }

}

支付接口(生产者)

    /**
     * Payment endpoint: debits the user's balance and, on success, sends an MQ message whose key
     * is the paid order's id.
     *
     * <p>{@code transactionManager = "tm131"} selects which transaction manager is used, because
     * the project may define several. With {@code rollbackFor = Exception.class}, any exception
     * leaving this method (including a failed send) is meant to roll back the balance update.
     *
     * <p>NOTE(review): the message is sent before the surrounding transaction commits; if the
     * commit itself fails afterwards, the message has already been published. Confirm this is
     * acceptable or switch to a transactional-message / outbox approach.
     *
     * @param userId  id of the paying user (primary key of the account row)
     * @param orderId id of the order being paid; used as the message key
     * @param amount  amount to deduct from the balance
     * @return 0: success; 1: user does not exist; 2: insufficient balance
     * @throws Exception if sending the message fails or the send status is not {@code SEND_OK}
     */
    @Transactional(transactionManager = "tm131", rollbackFor = Exception.class)
    public int pamentMQ(int userId, int orderId, BigDecimal amount) throws Exception {
        // Payment: load the account, validate it, then deduct the amount.
        AccountA accountA = accountAMapper.selectByPrimaryKey(userId);
        if (accountA == null) {
            return 1;
        }
        if (accountA.getBalance().compareTo(amount) < 0) {
            return 2;
        }
        accountA.setBalance(accountA.getBalance().subtract(amount));
        accountAMapper.updateByPrimaryKey(accountA);

        // Build the message; the order id travels as the message key.
        Message message = new Message();
        message.setTopic("payment");
        message.setKeys(String.valueOf(orderId));
        // Encode explicitly as UTF-8 so the bytes do not depend on the JVM's default charset.
        message.setBody("订单已支付".getBytes(java.nio.charset.StandardCharsets.UTF_8));

        // Send the message. Exceptions propagate unchanged to the caller (and to the
        // transaction interceptor); anything other than SEND_OK is turned into a failure.
        SendResult result = producer.send(message);
        if (result.getSendStatus() != SendStatus.SEND_OK) {
            throw new Exception("消息发送失败!" + " sendStatus=" + result.getSendStatus());
        }
        return 0;
    }

transactionManager = "tm131"具体配置

/**
 * Spring/MyBatis wiring for the "131" database: its data source, SQL session factory and
 * transaction manager. Mappers in {@code com.example.tccdemo.db131.dao} are bound to the
 * {@code factoryBean131} session factory.
 */
@Configuration
@MapperScan(value = "com.example.tccdemo.db131.dao", sqlSessionFactoryRef = "factoryBean131")
public class ConfigDb131 {

    /**
     * Builds the MySQL data source for schema {@code xa_131}.
     *
     * <p>NOTE(review): URL, user and password are hard-coded here; consider externalizing them.
     *
     * @return the data source registered as bean {@code db131}
     */
    @Bean("db131")
    public DataSource db131() {
        MysqlDataSource mysql = new MysqlDataSource();
        mysql.setUrl("jdbc:mysql://192.168.73.131:3306/xa_131");
        mysql.setUser("imooc");
        mysql.setPassword("Imooc@123456");
        return mysql;
    }

    /**
     * Builds the MyBatis session factory on top of {@code db131}, loading mapper XML files
     * matching {@code mybatis/db131/*.xml}.
     *
     * @param dataSource the {@code db131} data source
     * @return the factory bean registered as {@code factoryBean131}
     * @throws IOException if resolving the mapper XML resources fails
     */
    @Bean("factoryBean131")
    public SqlSessionFactoryBean factoryBean(@Qualifier("db131") DataSource dataSource) throws IOException {
        ResourcePatternResolver mapperXmlResolver = new PathMatchingResourcePatternResolver();
        SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setMapperLocations(mapperXmlResolver.getResources("mybatis/db131/*.xml"));
        sessionFactory.setDataSource(dataSource);
        return sessionFactory;
    }

    /**
     * Transaction manager for {@code db131}; this is the bean that
     * {@code transactionManager = "tm131"} refers to.
     *
     * @param dataSource the {@code db131} data source
     * @return a transaction manager bound to that data source
     */
    @Bean("tm131")
    public PlatformTransactionManager transactionManager(@Qualifier("db131") DataSource dataSource) {
        PlatformTransactionManager tm131 = new DataSourceTransactionManager(dataSource);
        return tm131;
    }
}

订单状态修改(消费者)

/**
 * Message listener that marks an order as paid when a payment message arrives.
 *
 * <p>The bean name must stay in sync with the {@code @Qualifier("messageListener")} used for the
 * consumer bean in {@code RocketMQConfig}.
 */
@Component("messageListener")
public class ChangeOrderStatus implements MessageListenerConcurrently {
    @Resource
    private OrderMapper orderMapper;

    /**
     * Processes a batch of messages. The message key is read as the order id; the matching order
     * is loaded, its status set to 1 (paid), and the row updated.
     *
     * <p>Returns {@code RECONSUME_LATER} as soon as one message cannot be processed: the order is
     * not found, the key is not a valid integer, or the database access throws.
     *
     * @param list                       messages delivered by the consumer; may be null or empty
     * @param consumeConcurrentlyContext consumption context (unused)
     * @return {@code CONSUME_SUCCESS} when every message was applied, otherwise
     *         {@code RECONSUME_LATER}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (list == null || list.isEmpty()) {
            return CONSUME_SUCCESS; // nothing to do counts as consumed
        }

        for (MessageExt messageExt : list) {
            String orderId = messageExt.getKeys();
            // Decode explicitly as UTF-8 so the result does not depend on the JVM's default charset.
            String msg = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("msg="+msg);

            try {
                // Parsing and the lookup sit inside the try so that a malformed key or a failing
                // read yields an explicit RECONSUME_LATER instead of escaping the listener.
                Order order = orderMapper.selectByPrimaryKey(Integer.parseInt(orderId));
                if (order == null) {
                    return RECONSUME_LATER; // business decision: retry when the order is missing
                }

                order.setOrderStatus(1); // paid
                order.setUpdateTime(new Date());
                order.setUpdateUser(0); // updated by the system
                orderMapper.updateByPrimaryKey(order);
            } catch (Exception e) {
                e.printStackTrace();
                return RECONSUME_LATER;
            }
        }

        return CONSUME_SUCCESS;
    }
}

版权声明:本文为ZHUSHANGLIN原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。