问题场景
支付成功后一笔订单, 怎么能保证这笔订单的支付状态就一定修改了?
依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
配置生产者和消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RocketMQConfig {
@Bean(initMethod = "start",destroyMethod = "shutdown")
public DefaultMQProducer producer() {
DefaultMQProducer producer = new
DefaultMQProducer("paymentGroup");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
return producer;
}
// 消费者的具体逻辑在Bean名为messageListener中
@Bean(initMethod = "start",destroyMethod = "shutdown")
public DefaultMQPushConsumer consumer(@Qualifier("messageListener") MessageListenerConcurrently messageListener) throws MQClientException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("paymentConsumerGroup");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("payment", "*");
consumer.registerMessageListener(messageListener);
return consumer;
}
}
支付接口(生产者)
/**
* 支付接口(支付成功,MQ发送支付订单orderId消息)
* transactionManager = "tm131"表示使用哪个事务管理器, 因为项目中可能有多个 具体配置在下面有
* @param userId
* @param orderId
* @param amount
* @return 0:成功;1:用户不存在;2:余额不足
*/
@Transactional(transactionManager = "tm131",rollbackFor = Exception.class)
public int pamentMQ(int userId, int orderId, BigDecimal amount) throws Exception {
//支付操作
AccountA accountA = accountAMapper.selectByPrimaryKey(userId);
if (accountA == null) return 1;
if (accountA.getBalance().compareTo(amount) < 0) return 2;
accountA.setBalance(accountA.getBalance().subtract(amount));
accountAMapper.updateByPrimaryKey(accountA);
// 封装消息对象
Message message = new Message();
message.setTopic("payment");
message.setKeys(orderId+"");
message.setBody("订单已支付".getBytes());
try {
// 发送消息
SendResult result = producer.send(message);
if (result.getSendStatus() == SendStatus.SEND_OK){
return 0;
}else {
throw new Exception("消息发送失败!");
}
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
transactionManager = "tm131"具体配置
@Configuration
@MapperScan(value = "com.example.tccdemo.db131.dao",sqlSessionFactoryRef = "factoryBean131")
public class ConfigDb131 {
@Bean("db131")
public DataSource db131() {
MysqlDataSource dataSource = new MysqlDataSource();
dataSource.setUser("imooc");
dataSource.setPassword("Imooc@123456");
dataSource.setUrl("jdbc:mysql://192.168.73.131:3306/xa_131");
return dataSource;
}
@Bean("factoryBean131")
public SqlSessionFactoryBean factoryBean(@Qualifier("db131") DataSource dataSource) throws IOException {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(dataSource);
ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
factoryBean.setMapperLocations(resourceResolver.getResources("mybatis/db131/*.xml"));
return factoryBean;
}
@Bean("tm131") // transactionManager = "tm131"指的就是这个事务管理器
public PlatformTransactionManager transactionManager(@Qualifier("db131") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}
订单状态修改(消费者)
// Bean名字要跟RocketMQConfig类中消费者@Qualifier("messageListener")保持一致
@Component("messageListener")
public class ChangeOrderStatus implements MessageListenerConcurrently {
@Resource
private OrderMapper orderMapper;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (list == null || list.size()==0) return CONSUME_SUCCESS; // 消费成功
for (MessageExt messageExt : list) {
String orderId = messageExt.getKeys();
String msg = new String(messageExt.getBody());
System.out.println("msg="+msg);
Order order = orderMapper.selectByPrimaryKey(Integer.parseInt(orderId));
if (order==null) return RECONSUME_LATER; // 根据具体业务来, 这里再次消费
try {
order.setOrderStatus(1);//已支付
order.setUpdateTime(new Date());
order.setUpdateUser(0);//系统更新
orderMapper.updateByPrimaryKey(order);
}catch (Exception e){
e.printStackTrace();
return RECONSUME_LATER;
}
}
return CONSUME_SUCCESS;
}
}
版权声明:本文为ZHUSHANGLIN原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。