微服务整合Seata分布式事务处理

前言

官方文档: http://seata.io/zh-cn/docs/user/microservice.html

这几天开始做新项目,是基于RPC+Restful的微服务项目。前端用的Vue,后端用的比较冷门的小型轻量级框架zbus。前端和后端交互是通过restful,后端服务之间通过RPC调用,这就涉及到了分布式事务。

官方文档很简洁,网上其他资源都是讲的seata与spring cloud、dubbo等框架的整合,但是看到官方这一句:跨服务调用场景下的事务传播,本质上就是要把 XID 通过服务调用传递到服务提供方,并绑定到 RootContext 中去。只要能做到这点,理论上 Seata 可以支持任意的微服务框架。so,当然也可以支持zbus,只要能获取到全局事务的XID。

一、从 https://github.com/seata/seata/releases,下载服务器软件包,将其解压缩并启动。

Usage: sh seata-server.sh(for linux and mac) or cmd seata-server.bat(for windows) [options]
  Options:
    --host, -h
      The host to bind.
      Default: 0.0.0.0
    --port, -p
      The port to listen.
      Default: 8091
    --storeMode, -m
      log store mode : file、db
      Default: file
    --help

e.g.

sh seata-server.sh -p 8091 -h 127.0.0.1 -m file

PS:自定义命令:

cd /usr/local/seata-server-1.4.0-SNAPSHOT/seata/bin

nohup ./seata-server.sh -h 127.0.0.1 -p 8091 >seata.log 2>&1 & 

默认是file类型启动,可以在conf下的file.conf和registry.conf修改。我这里用的是redis

二、在各个服务里引入seata依赖:

<!-- https://mvnrepository.com/artifact/io.seata/seata-spring-boot-starter -->
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.3.0</version>
</dependency>

目前(2020-07)最新版本是1.3.0

三、在yml配置文件里配置seata

# seata配置
seata:
  enabled: false
  application-id: cargo
  tx-service-group: my_test_tx_group
  service:
    vgroup-mapping:
      my_test_tx_group: default
    grouplist:
      default: 172.31.1.191:8091

注意:客户端和服务端都要在同一个tx-service-group事务组里!application-id是区分每个服务的标识,如果为空,自动取服务名name。

四、在调用方的业务方法上加上全局事务注解@GlobalTransactional

/**
     * 保存订单,同时保存订单商品信息。
     *
     * @param order
     * @return com.yorma.entity.YmMsg<com.yorma.cargo.entity.Order>
     * @author zhangchao
     * @date 2020/7/16 15:16
     **/
@GlobalTransactional
@Override
public YmMsg<Order> saveOrder(Order order) {
    PubOperateHistory operateHistory = new PubOperateHistory();
    operateHistory.setLogTime(new Date());
    operateHistory.setUser("admin"); // TODO zhangchao 2020/7/8 先写死,以后根据token从redis里获取!
    //        operateHistory.setType(this.getClass().getName());
    operateHistory.setType("Order");
    operateHistory.setNote(Thread.currentThread().getStackTrace()[1].getMethodName());
    // 新增
    if (null == order.getId()) {
        if (isNotEmpty(order.getOrderDetails())) {
            orderDetailsService.saveBatch(order.getOrderDetails());
        }
        baseMapper.insert(order);
        operateHistory.setContent("新增订单,id:" + order.getId());
        operateHistory.setSourceId(order.getId().toString());
        // 编辑
    } else {
        List<OrderDetail> orderDetails = orderDetailsService.list(new QueryWrapper<OrderDetail>().lambda()
                                                                  .eq(OrderDetail::getOrderId, order.getId()));
        // 先清一遍子表
        if (isNotEmpty(orderDetails)) {
            orderDetails.forEach(orderDetail -> orderDetailsService.removeById(orderDetail.getId()));
        }
        if (isNotEmpty(order.getOrderDetails())) {
            orderDetailsService.saveBatch(order.getOrderDetails());
        }
        baseMapper.updateById(order);
        operateHistory.setContent("编辑订单,id:" + order.getId());
        operateHistory.setSourceId(order.getId().toString());
    }
    CommonApi commonApi = rpc.createProxy("/basic/common", CommonApi.class);
    YmMsg ymMsg = commonApi.saveHistory(operateHistory, RootContext.getXID());
    if (ymMsg.isSuccess()) {
        return YmMsg.ok("保存成功!", Order.class);
    } else {
        throw new RuntimeException("操作历史保存失败!");
    }
    //        int i = 10 / 0;
    //        return YmMsg.ok("保存成功!", Order.class);
}

注意:被调用方不需要加@GlobalTransactional!
被调用方不需要加@GlobalTransactional!
被调用方不需要加@GlobalTransactional!

坑点:

  • 如果本地事务包着seata全局事务,会导致seata获取不到全局锁而出现二阶段提交失败异常!!

本人做的事务测试:
在这里插入图片描述


版权声明:本文为qq_26030541原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。