【黑马-SpringCloudAlibaba】学习笔记10-Seata:实现分布式事务控制

Seata介绍

2019 年 1 月,阿里巴巴中间件团队发起了开源项目 Fescar(Fast & EaSy Commit And
Rollback),其愿景是让分布式事务的使用像本地事务的使用一样,简单和高效,并逐步解决开发者们
遇到的分布式事务方面的所有难题。后来更名为 Seata,意为:Simple Extensible Autonomous
Transaction Architecture,是一套分布式事务解决方案。
Seata的设计目标是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进。
它把一个分布式事务理解成一个包含了若干分支事务的全局事务。全局事务的职责是协调其下管辖的分
支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据
库的本地事务。
Seata主要由三个重要组件组成:
  • TC:Transaction Coordinator 事务协调器,管理全局的分支事务的状态,用于全局性事务的提交和回滚。
  • TM:Transaction Manager 事务管理器,用于开启、提交或者回滚全局事务。
  • RM:Resource Manager 资源管理器,用于分支事务上的资源管理,向TC注册分支事务,上报分支事务的状态,接受TC的命令来提交或者回滚分支事务。
Seata的执行流程如下:
  1. A服务的TM向TC申请开启一个全局事务,TC就会创建一个全局事务并返回一个唯一的XID
  2. A服务的RM向TC注册分支事务,并及其纳入XID对应全局事务的管辖
  3. A服务执行分支事务,向数据库做操作
  4. A服务开始远程调用B服务,此时XID会在微服务的调用链上传播
  5. B服务的RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖
  6. B服务执行分支事务,向数据库做操作
  7. 全局事务调用链处理完毕,TM根据有无异常向TC发起全局事务的提交或者回滚
  8. TC协调其管辖之下的所有分支事务, 决定是否回滚
Seata实现2PC与传统2PC的差别:
  1. 架构层次方面,传统2PC方案的 RM 实际上是在数据库层,RM本质上就是数据库自身,通过XA协
    议实现,而 Seata的RM是以jar包的形式作为中间件层部署在应用程序这一侧的。
  2. 两阶段提交方面,传统2PC无论第二阶段的决议是commit还是rollback,事务性资源的锁都要保
    持到Phase2完成才释放。而Seata的做法是在Phase1 就将本地事务提交,这样就可以省去Phase2
    持锁的时间,整体提高效率。
Seata实现分布式事务控制

本示例通过Seata中间件实现分布式事务,模拟电商中的下单和扣库存的过程
我们通过订单微服务执行下单操作,然后由订单微服务调用商品微服务扣除库存

修改order微服务
//orderController
@RestController
@Slf4j
public class OrderController5 {
  @Autowired
  private OrderServiceImpl5 orderService;
  //下单
  @RequestMapping("/order/prod/{pid}")
  public Order order(@PathVariable("pid") Integer pid) {
    log.info("接收到{}号商品的下单请求,接下来调用商品微服务查询此商品信息", pid);
    return orderService.createOrder(pid);
  }
}
//OrderService
@Service
@Slf4j
public class OrderServiceImpl5{
  @Autowired
  private OrderDao orderDao;
  @Autowired
  private ProductService productService;
  @Autowired
  private RocketMQTemplate rocketMQTemplate;
  @GlobalTransactional
  public Order createOrder(Integer pid) {
//1 调用商品微服务,查询商品信息
    Product product = productService.findByPid(pid);
    log.info("查询到{}号商品的信息,内容是:{}", pid, JSON.toJSONString(product));
//2 下单(创建订单)
    Order order = new Order();
    order.setUid(1);
    order.setUsername("测试用户");
    order.setPid(pid);
    order.setPname(product.getPname());
    order.setPprice(product.getPprice());
    order.setNumber(1);
    orderDao.save(order);
    log.info("创建订单成功,订单信息为{}", JSON.toJSONString(order));
    //3 扣库存
    productService.reduceInventory(pid, order.getNumber());
    //4 向mq中投递一个下单成功的消息
    rocketMQTemplate.convertAndSend("order-topic", order);
    return order;
  }
}
//ProductService
//value用于指定调用nacos下哪个微服务
//fallback 指定当调用出现问题之后,要进入到哪个类中的同名方法之下执行备用逻辑
@FeignClient(
        value = "service-product", //
        fallback = ProductServiceFallback.class ,
        fallbackFactory = ProductServiceFallbackFactory.class
)
public interface ProductService {
    //@FeignClient的value +  @RequestMapping的value值  其实就是完成的请求地址  "http://service-product/product/" + pid
    //指定请求的URI部分
    @RequestMapping("/product/{pid}")
    Product findByPid(@PathVariable Integer pid);



//    扣减库存
//    参数一: 商品标识
//    参数二:扣减数量
    @RequestMapping("/product/reduceInventory")
    void reduceInventory(@RequestParam("pid") Integer pid,
                         @RequestParam("number") Integer number);
}
修改Product微服务
//ProductController
@RestController
@Slf4j
public class ProductController {
  @Autowired
  private ProductService productService;
  //商品信息查询
  @RequestMapping("/product/{pid}")
  public Product product(@PathVariable("pid") Integer pid) {
    log.info("接下来要进行{}号商品信息的查询", pid);
    Product product = productService.findByPid(pid);
    log.info("商品信息查询成功,内容为{}", JSON.toJSONString(product));
    return product;
  }

  //扣减库存
  @RequestMapping("/product/reduceInventory")
  public void reduceInventory(Integer pid, Integer number) {
    productService.reduceInventory(pid, number);
  }
}
public interface ProductService {
  @Autowired
  public ProductDao productDao;

  //根据pid查询商品信息
  public Product findById(Integer pid);

//    public ProductDto getProductById(Integer id);
//    List<ProductDto> getAll();
//
//    ProductDto sailOut(Integer productId,int num);

  //扣减库存
//    void reduceInventory(Integer pid, Integer number);
  @Override
  public void reduceInventory(Integer pid, int number);
  
}

@Service
@Transactional
public class ProductServiceImpl implements ProductService {
    @Autowired
    ProductDao productDao;

    @Override
    public Product findById(Integer pid) {
        return productDao.findById(pid).get();
    }

    @Transactional
    @Override
    public void reduceInventory(Integer pid, Integer number) {
        //查询
        Product product = productDao.findById(pid).get();
        //省略校验

        //内存中扣减
        product.setStock(product.getStock() - number);

        //模拟异常
        //int i = 1 / 0;

        //保存
        productDao.save(product);
    }
}
异常模拟

在ProductServiceImpl的代码中模拟一个异常, 然后调用下单接口

@Override
public void reduceInventory(Integer pid, Integer number) {
  Product product = productDao.findById(pid).get();
  if (product.getStock() < number) {
    throw new RuntimeException("库存不足");
  }
      int i = 1 / 0;
      product.setStock(product.getStock() - number);
      productDao.save(product);
  }

启动Seata

下载seata
下载地址:https://github.com/seata/seata/releases/v0.9.0/
2 修改配置文件
将下载得到的压缩包进行解压,进入conf目录,调整下面的配置文件

registry.conf,将type改为nacos,只留下nacos就行

registry {
  type = "nacos"
  nacos {
    serverAddr = "localhost"
    namespace = "public"
    cluster = "default"
  }
}
config {
  type = "nacos"
  nacos {
    serverAddr = "localhost"
    namespace = "public"
    cluster = "default"
  }
}

nacos-config.txt,添加并修改

service.vgroup_mapping.service-product=default
service.vgroup_mapping.service-order=default

这里的语法为: service.vgroup_mapping.${your-service-gruop}=default ,中间的
${your-service-gruop} 为自己定义的服务组名称, 这里需要我们在程序的配置文件中配置。

3 初始化seata在nacos的配置
# 初始化seata 的nacos配置
# 注意: 这里要保证nacos是已经正常运行的
cd conf    # 在config文件夹下cmd
nacos-config.sh 127.0.0.1

执行成功后可以打开Nacos的控制台,在配置列表中,可以看到初始化了很多Group为SEATA_GROUP
的配置。

4 启动seata服务
cd bin
# seata-server.bat -p 9000 -m file   #我运行这个报错了,不能带 -m
seata-server.bat -p 9000

参数	全写	         作用	                    备注
-h	–host	指定在注册中心注册的ip	            不指定时获取当前ip,外部访问部署建议指定
-p	–port	指定server启动的端口	            默认8091
-m	–storeMode	事务日志存储方式	            支持file,db,redis,默认为file,注意:redis需seata-server1.3版本及以上
-n	–serverNode	用户指定seata-server节点id	如1,2,3默认为1
-e	–seataEnv	指定seata-server运行环境	    如dev,test,服务启动会使用registry-dev.conf这样的配置

启动后在 Nacos 的服务列表下面可以看到一个名为 serverAddr 的服务。

使用Seata实现事务控制

1 初始化数据表

在我们的数据库中加入一张undo_log表,这是Seata记录事务日志要用到的表

CREATE TABLE `undo_log`
(
`id` BIGINT(20) NOT NULL AUTO_INCREMENT,
`branch_id` BIGINT(20) NOT NULL,
`xid` VARCHAR(100) NOT NULL,
`context` VARCHAR(128) NOT NULL,
`rollback_info` LONGBLOB NOT NULL,
`log_status` INT(11) NOT NULL,
`log_created` DATETIME NOT NULL,
`log_modified` DATETIME NOT NULL,
`ext` VARCHAR(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = INNODB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
2 添加配置

在需要进行分布式控制的微服务中进行下面几项配置:
添加依赖

<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>   
DataSourceProxyConfig

Seata 是通过代理数据源实现事务分支的,所以需要配置 io.seata.rm.datasource.DataSourceProxy 的
Bean,且是 @Primary默认的数据源,否则事务不会回滚,无法实现分布式事务

//在order和product都要加DataSourceProxyConfig
@Configuration
public class DataSourceProxyConfig {
  @Bean
  @ConfigurationProperties(prefix = "spring.datasource")
  public DruidDataSource druidDataSource() {
    return new DruidDataSource();
  }

  @Primary
  @Bean
  public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
    return new DataSourceProxy(druidDataSource);
  }
}
registry.conf

在resources下添加Seata的配置文件 registry.conf

registry {
  type = "nacos"
  nacos {
    serverAddr = "localhost"
    namespace = "public"
    cluster = "default"
  }
}
config {
  type = "nacos"
  nacos {
    serverAddr = "localhost"
    namespace = "public"
    cluster = "default"
  }
}
bootstrap.yaml
spring:
  application:
    name: service-product
  cloud:
    nacos:
      config:
        server-addr: localhost:8848 # nacos的服务端地址
        namespace: public
        group: SEATA_GROUP
  alibaba:
    seata:
      tx-service-group: ${spring.application.name}
在order微服务开启全局事务,给下订单的方法开启
@GlobalTransactional//全局事务控制
public Order createOrder(Integer pid) {}
测试

再次下单测试

seata运行流程分析

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ILBNbr2D-1667891300778)(seata过程分析.png)]

要点说明:
  1. 每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连
    接代理的目的就是在第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务
    操作就一定有undo_log。
  2. 在第一阶段undo_log中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成
    就已经将分支事务提交,也就释放了锁资源。
  3. TM开启全局事务开始,将XID全局事务id放在事务上下文中,通过feign调用也将XID传入下游分支
    事务,每个分支事务将自己的Branch ID分支事务ID与XID关联。
  4. 第二阶段全局事务提交,TC会通知各各分支参与者提交分支事务,在第一阶段就已经提交了分支事
    务,这里各各参与者只需要删除undo_log即可,并且可以异步执行,第二阶段很快可以完成。
  5. 第二阶段全局事务回滚,TC会通知各各分支参与者回滚分支事务,通过 XID 和 Branch ID 找到相应
    的回滚日志,通过回滚日志生成反向的 SQL 并执行,以完成分支事务回滚到之前的状态,如果回滚失
    败则会重试回滚操作

版权声明:本文为qq_44897733原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。