Source Code Series (8): Seata 1.5.0 Source Code Analysis

Preface

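  1. I used the Seata middleware for distributed transactions on an earlier project, and I am finally taking some spare time to write it up. Partly this records the pitfalls I hit, so that readers and I can troubleshoot faster; partly it is a chance to learn something new by going back over old ground. Enough talk, let's get started.

  2. For introductory material, refer to the official Seata website.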

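    Seata is an open-source distributed transaction solution dedicated to providing high-performance, easy-to-use distributed transaction services. It offers the AT, TCC, SAGA and XA transaction modes, giving users a one-stop distributed transaction solution.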

Fundamentals

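  1. Glossary
    1. TC (Transaction Coordinator): maintains the state of global and branch transactions and drives global commit or rollback. This is the Seata server that we start, a separate JVM process.
    2. TM (Transaction Manager): defines the scope of a global transaction: it begins, commits or rolls back the global transaction. This is the service that uses @GlobalTransactional, and it can act as an RM at the same time.
    3. RM (Resource Manager): manages the resources that branch transactions work on, talks to the TC to register branch transactions and report their status, and drives branch commit or rollback. This is where failures have to be handled, and it is usually the service being called.
  2. Diagram (all of this is also covered on the official website)
    [Figure: schematic diagram]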
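  3. Source download: the Seata repository, hosted on GitHub
    1. Fork it to your own account first
    2. Directory layout after download
      [Figure: screenshot of the source directory]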
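    3. What each module does:
      1. common module: shared exceptions, utility classes, thread utilities, counters, SPI, constants and so on
      2. serializer module: encoding and decoding, with serializers such as kryo and protobuf
      3. config module: in Seata the configuration center and the registry are separate; this module implements the mainstream configuration centers (file mode by default, plus apollo, nacos, etcd3 and others)
      4. core module: the packaged RPC layer, data model, communication protocol, events, authentication, serialization, storage, locks and more
      5. discovery module: seata-server plays the transaction coordinator role and manages global transactions; RMs and TMs commit and roll back by talking to it over the network (yes, via Netty), much like microservices, so a registry is needed. Currently supported: consul, etcd3, eureka, redis and others
      6. metrics module: statistics, with export support
      7. rm module: the core abstractions for the resource manager
        [Figure: rm module diagram]
      8. rm-datasource module: an extension of JDBC that implements the database-related operations
      9. saga module: Seata supports several modes, as noted above; this is the implementation of saga mode
      10. tcc module: support for tcc mode
      11. tm module: the implementation of the transaction manager in a distributed transaction, covered in detail later
      12. spring module: the implementation behind the Spring annotations
      13. server module: the TC implementation, including the handling of the different transaction modes
      14. test module: tests
      15. compressor module: support for the mainstream compression formats such as 7z, zip and gzip
      16. console module: the web console, which integrates Spring Security and JWT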

Server Startup

  1. Startup class: ServerApplication
    It only configures the package scan

    @SpringBootApplication(scanBasePackages = {"io.seata"})
    public class ServerApplication {
        public static void main(String[] args) throws IOException {
            // run the spring-boot application
            SpringApplication.run(ServerApplication.class, args);
        }
    }
    
  2. Startup and shutdown hooks provided by Spring Boot

    @Component
    public class ServerRunner implements CommandLineRunner, DisposableBean {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServerRunner.class);
        // whether the server has started
        private boolean started = Boolean.FALSE;
        // resources to release on shutdown
        private static final List<Disposable> DISPOSABLE_LIST = new CopyOnWriteArrayList<>();
        // register a resource to be released on shutdown
        public static void addDisposable(Disposable disposable) {
            DISPOSABLE_LIST.add(disposable);
        }
        // logic executed after Spring Boot has started
        @Override
        public void run(String... args) {
            try {
                long start = System.currentTimeMillis();
                Server.start(args);
                started = true;
    
                long cost = System.currentTimeMillis() - start;
                LOGGER.info("seata server started in {} millSeconds", cost);
            } catch (Throwable e) {
                started = Boolean.FALSE;
                LOGGER.error("seata server start error: {} ", e.getMessage(), e);
                System.exit(-1);
            }
        }
    
        public boolean started() {
            return started;
        }
        // logic executed when Spring Boot shuts down
        @Override
        public void destroy() throws Exception {
    
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("destoryAll starting");
            }
    
            for (Disposable disposable : DISPOSABLE_LIST) {
                disposable.destroy();
            }
    
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("destoryAll finish");
            }
        }
    }
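
The register-then-destroy pattern in ServerRunner can be sketched with the JDK alone. This is a minimal illustration rather than Seata's code: the `Disposable` interface here is a local stand-in, and the class name and log messages are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class DisposableRegistryDemo {

    /** Local stand-in for a resource that must be released on shutdown. */
    public interface Disposable {
        void destroy();
    }

    // Thread-safe list, so components may register from any thread during startup.
    private static final List<Disposable> DISPOSABLES = new CopyOnWriteArrayList<>();

    static void addDisposable(Disposable disposable) {
        DISPOSABLES.add(disposable);
    }

    /** Releases every registered resource in registration order. */
    static void destroyAll() {
        for (Disposable disposable : DISPOSABLES) {
            disposable.destroy();
        }
    }

    /** Registers two fake resources, destroys them and returns what happened. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        addDisposable(() -> log.add("coordinator destroyed"));
        addDisposable(() -> log.add("netty server destroyed"));
        destroyAll();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the real server the container calls `destroy()` on the runner at shutdown, so anything registered during startup is released without a separate JVM shutdown hook.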
    
  3. The class that starts the actual business logic is Server

    public class Server {
        /**
         * The entry point of application.
         *
         * @param args the input arguments
         */
        public static void start(String[] args) {
            // create logger
            final Logger logger = LoggerFactory.getLogger(Server.class);
    
            //initialize the parameter parser
            //Note that the parameter parser should always be the first line to execute.
            //Because, here we need to parse the parameters needed for startup.
            // parse the startup arguments
            ParameterParser parameterParser = new ParameterParser(args);
    
            //initialize the metrics
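            // obtain the metrics Registry instance through the SPI mechanism
            MetricsManager.get().init();
            // take the store mode resolved by the parameter parser and set it as a system property
            System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());
            // custom thread pool with a custom thread factory; the rejection policy is CallerRunsPolicy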
            ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
                    NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
                    new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
            // initialize the remote RPC server
            NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
            // initialize UUIDGenerator (snowflake algorithm)
            UUIDGenerator.init(parameterParser.getServerNode());
            //log store mode : file, db, redis
            // initialize session storage (persistence of the transaction logs)
            SessionHolder.init(parameterParser.getSessionStoreMode());
            // initialize the lock manager for the configured lock store mode (file by default)
            LockerManagerFactory.init(parameterParser.getLockStoreMode());
            // core TC transaction handler
            DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
            coordinator.init();
            nettyRemotingServer.setHandler(coordinator);
    
            // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
            ServerRunner.addDisposable(coordinator);
    
            //127.0.0.1 and 0.0.0.0 are not valid here.
            if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
                XID.setIpAddress(parameterParser.getHost());
            } else {
                String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS);
                if (StringUtils.isNotBlank(preferredNetworks)) {
                    XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR)));
                } else {
                    XID.setIpAddress(NetUtil.getLocalIp());
                }
            }
            // initialize Netty and start listening on the port
            nettyRemotingServer.init();
        }
    }
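
To see what choosing CallerRunsPolicy means for the working thread pool, here is a small JDK-only sketch. The pool sizes (one worker, queue capacity one) and all names are illustrative assumptions, not Seata's defaults; they are chosen so that the third task is guaranteed to be rejected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {

    /** Submits three tasks and returns the name of the thread each one ran on. */
    static String[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),
                task -> new Thread(task, "worker"),
                new ThreadPoolExecutor.CallerRunsPolicy());
        String[] ranOn = new String[3];
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(3);
        try {
            // Task 1 occupies the only worker until it is released.
            pool.execute(() -> {
                ranOn[0] = Thread.currentThread().getName();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.countDown();
            });
            // Task 2 fills the single queue slot.
            pool.execute(() -> {
                ranOn[1] = Thread.currentThread().getName();
                done.countDown();
            });
            // Task 3 is rejected, so CallerRunsPolicy runs it on the submitting thread.
            pool.execute(() -> {
                ranOn[2] = Thread.currentThread().getName();
                done.countDown();
            });
            release.countDown();
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return ranOn;
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", run()));
    }
}
```

The design consequence is back-pressure: when the pool and its queue are saturated, the submitting thread does the work itself instead of dropping the request.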
    
  4. Parameter parsing
    The main things to read are the init method of ParameterParser and ConfigurationFactory

    private void init(String[] args) {
        try {
            // parse the startup arguments with JCommander
            getCommandParameters(args);
            // read parameters from the container environment
            getEnvParameters();
            // apply the parameters
            if (StringUtils.isNotBlank(seataEnv)) {
                System.setProperty(ENV_PROPERTY_KEY, seataEnv);
            }
            if (StringUtils.isBlank(storeMode)) {
                storeMode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE);
            }
            if (StringUtils.isBlank(sessionStoreMode)) {
                sessionStoreMode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE, storeMode);
            }
            if (StringUtils.isBlank(lockStoreMode)) {
                lockStoreMode = CONFIG.getConfig(ConfigurationKeys.STORE_LOCK_MODE, storeMode);
            }
        } catch (ParameterException e) {
            printError(e);
        }
    }
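
The fallback order in init (explicit value first, then configuration, then a default, with the session and lock modes falling back to the general store mode) can be sketched as a pure function. The configuration keys `store.mode`, `store.session.mode` and `store.lock.mode` and the default `file` are assumptions about what the constants resolve to; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class StoreModeResolution {

    static final String DEFAULT_STORE_MODE = "file"; // assumed default

    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    /** Returns {storeMode, sessionStoreMode, lockStoreMode} after applying the fallbacks. */
    static String[] resolve(String storeMode, String sessionStoreMode, String lockStoreMode,
                            Map<String, String> config) {
        if (isBlank(storeMode)) {
            storeMode = config.getOrDefault("store.mode", DEFAULT_STORE_MODE);
        }
        if (isBlank(sessionStoreMode)) {
            // the session mode falls back to the general store mode
            sessionStoreMode = config.getOrDefault("store.session.mode", storeMode);
        }
        if (isBlank(lockStoreMode)) {
            // the lock mode falls back to the general store mode as well
            lockStoreMode = config.getOrDefault("store.lock.mode", storeMode);
        }
        return new String[] {storeMode, sessionStoreMode, lockStoreMode};
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("store.mode", "db");
        config.put("store.lock.mode", "redis");
        System.out.println(String.join(",", resolve(null, null, null, config)));
    }
}
```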
    

    The static initializer of ConfigurationFactory reads registry.conf by default

    static {
        load();
    }
    
    private static void load() {
        // read seata.config.name from the system properties
        String seataConfigName = System.getProperty(SYSTEM_PROPERTY_SEATA_CONFIG_NAME);
        if (seataConfigName == null) {
            // fall back to the SEATA_CONFIG_NAME environment variable
            seataConfigName = System.getenv(ENV_SEATA_CONFIG_NAME);
        }
        if (seataConfigName == null) {
            // if neither is set, default to registry
            seataConfigName = REGISTRY_CONF_DEFAULT;
        }
        String envValue = System.getProperty(ENV_PROPERTY_KEY);
        if (envValue == null) {
            envValue = System.getenv(ENV_SYSTEM_KEY);
        }
        // build the Configuration object from the registry.conf file
        Configuration configuration = (envValue == null) ? new FileConfiguration(seataConfigName,
                false) : new FileConfiguration(seataConfigName + "-" + envValue, false);
        Configuration extConfiguration = null;
        try {
            extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("load Configuration from :{}", extConfiguration == null ?
                    configuration.getClass().getSimpleName() : "Spring Configuration");
            }
        } catch (EnhancedServiceNotFoundException ignore) {
    
        } catch (Exception e) {
            LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
        }
        CURRENT_FILE_INSTANCE = extConfiguration == null ? configuration : extConfiguration;
    }
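
The name-resolution part of load can be reduced to a few lines. In this sketch the three inputs are passed in as parameters instead of being read from `System.getProperty` and `System.getenv`, which keeps it testable; the class and method names are hypothetical, and `registry` as the default name is taken from the comment in the code above.

```java
public class ConfigNameResolution {

    /**
     * @param systemProperty value of the system property, or null if unset
     * @param envVariable    value of the environment variable, or null if unset
     * @param envName        optional environment suffix, or null if unset
     * @return the configuration file name without extension
     */
    static String resolve(String systemProperty, String envVariable, String envName) {
        // the system property wins over the environment variable
        String name = systemProperty != null ? systemProperty : envVariable;
        if (name == null) {
            name = "registry";
        }
        // an environment name, when present, is appended as a suffix
        return envName == null ? name : name + "-" + envName;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, null, null));
        System.out.println(resolve(null, null, "test"));
        System.out.println(resolve("custom", "ignored", null));
    }
}
```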
    

    The buildConfiguration method of ConfigurationFactory loads the remaining configuration items

     private static Configuration buildConfiguration() {
         // read the config.type setting
         String configTypeName = CURRENT_FILE_INSTANCE.getConfig(
                 ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
                         + ConfigurationKeys.FILE_ROOT_TYPE);
    
         if (StringUtils.isBlank(configTypeName)) {
             throw new NotSupportYetException("config type can not be null");
         }
         ConfigType configType = ConfigType.getType(configTypeName);
    
         Configuration extConfiguration = null;
         Configuration configuration;
         if (ConfigType.File == configType) {
             // defaults to file.conf
             String pathDataId = String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR,
                     ConfigurationKeys.FILE_ROOT_CONFIG, FILE_TYPE, NAME_KEY);
             String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId);
             // create the Configuration object from that setting
             configuration = new FileConfiguration(name);
             try {
                 extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration);
                 if (LOGGER.isInfoEnabled()) {
                     LOGGER.info("load Configuration from :{}", extConfiguration == null ?
                         configuration.getClass().getSimpleName() : "Spring Configuration");
                 }
             } catch (EnhancedServiceNotFoundException ignore) {
    
             } catch (Exception e) {
                 LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e);
             }
         } else {
             // for any other type, such as nacos, obtain the ConfigurationProvider through SPI
             configuration = EnhancedServiceLoader
                     .load(ConfigurationProvider.class, Objects.requireNonNull(configType).name()).provide();
         }
         try {
             Configuration configurationCache;
             if (null != extConfiguration) {
                 configurationCache = ConfigurationCache.getInstance().proxy(extConfiguration);
             } else {
                 // CGLIB-based proxy that caches the configuration object in memory for better performance
                 configurationCache = ConfigurationCache.getInstance().proxy(configuration);
             }
             if (null != configurationCache) {
                 extConfiguration = configurationCache;
             }
         } catch (EnhancedServiceNotFoundException ignore) {
    
         } catch (Exception e) {
             LOGGER.error("failed to load configurationCacheProvider:{}", e.getMessage(), e);
         }
         return null == extConfiguration ? configuration : extConfiguration;
     }
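
The idea behind the caching proxy can be shown with a JDK dynamic proxy. Note the swap: the source comment says Seata uses CGLIB, while this sketch uses `java.lang.reflect.Proxy` to stay dependency-free. The `Config` interface and all names are hypothetical stand-ins.

```java
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class CachingConfigProxyDemo {

    /** Hypothetical, minimal stand-in for a configuration interface. */
    public interface Config {
        String getConfig(String key);
    }

    /** Wraps the target so that each key is read from it at most once. */
    static Config cached(Config target) {
        Map<String, String> cache = new ConcurrentHashMap<>();
        return (Config) Proxy.newProxyInstance(
                Config.class.getClassLoader(),
                new Class<?>[] {Config.class},
                (proxy, method, args) -> {
                    if ("getConfig".equals(method.getName()) && args != null && args.length == 1) {
                        // serve from memory; only a cache miss reaches the real configuration
                        return cache.computeIfAbsent((String) args[0], target::getConfig);
                    }
                    return method.invoke(target, args);
                });
    }

    /** Reads the same key twice through the proxy and returns how often the target was hit. */
    static int run() {
        AtomicInteger reads = new AtomicInteger();
        Config slow = key -> {
            reads.incrementAndGet();
            return "file";
        };
        Config fast = cached(slow);
        fast.getConfig("store.mode");
        fast.getConfig("store.mode");
        return reads.get();
    }

    public static void main(String[] args) {
        System.out.println("reads that reached the target: " + run());
    }
}
```

A real cache also has to be invalidated when the configuration changes, which this sketch leaves out.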
    
  5. Initializing the remote RPC server: this goes straight to the constructor of AbstractNettyRemotingServer

    public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {
        // set the thread pool
        super(messageExecutor);
        // build NettyServerBootstrap from the configuration
        serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
        // set the ChannelHandler; ServerHandler extends ChannelDuplexHandler and holds the message-handling logic
        serverBootstrap.setChannelHandlers(new ServerHandler());
    }
    

    How the thread pool is used (a highlight): a custom thread factory

    public class NamedThreadFactory implements ThreadFactory {
        // caches the factory counter for each distinct prefix
        private final static Map<String, AtomicInteger> PREFIX_COUNTER = new ConcurrentHashMap<>();
        // thread group the threads belong to
        private final ThreadGroup group;
        // per-factory thread counter
        private final AtomicInteger counter = new AtomicInteger(0);
        // thread name prefix
        private final String prefix;
        // total number of threads
        private final int totalSize;
        // whether to create daemon threads
        private final boolean makeDaemons;
    
        /**
         * Instantiates a new Named thread factory.
         *
         * @param prefix      the prefix
         * @param totalSize   the total size
         * @param makeDaemons the make daemons
         */
        public NamedThreadFactory(String prefix, int totalSize, boolean makeDaemons) {
            // per-prefix factory count: with prefix test, the first factory gets 1 and the second gets 2; every new prefix starts at 1
            int prefixCounter = CollectionUtils.computeIfAbsent(PREFIX_COUNTER, prefix, key -> new AtomicInteger(0))
                    .incrementAndGet();
            SecurityManager securityManager = System.getSecurityManager();
            group = (securityManager != null) ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
            this.prefix = prefix + "_" + prefixCounter;
            this.makeDaemons = makeDaemons;
            this.totalSize = totalSize;
        }
    
        /**
         * Instantiates a new Named thread factory.
         *
         * @param prefix      the prefix
         * @param makeDaemons the make daemons
         */
        public NamedThreadFactory(String prefix, boolean makeDaemons) {
            this(prefix, 0, makeDaemons);
        }
    
        /**
         * Instantiates a new Named thread factory.
         *
         * @param prefix    the prefix
         * @param totalSize the total size
         */
        public NamedThreadFactory(String prefix, int totalSize) {
            this(prefix, totalSize, true);
        }
    
        @Override
        public Thread newThread(Runnable r) {
            String name = prefix + "_" + counter.incrementAndGet();
            if (totalSize > 1) {
                name += "_" + totalSize;
            }
            Thread thread = new FastThreadLocalThread(group, r, name);
    
            thread.setDaemon(makeDaemons);
            if (thread.getPriority() != Thread.NORM_PRIORITY) {
                thread.setPriority(Thread.NORM_PRIORITY);
            }
            return thread;
        }
    }
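
The naming scheme is easier to see in a stripped-down version. The sketch below keeps only the naming logic of the factory above and swaps Netty's FastThreadLocalThread for a plain `Thread`; the class name is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class SimpleNamedThreadFactory implements ThreadFactory {

    // one counter per prefix, shared by all factories
    private static final Map<String, AtomicInteger> PREFIX_COUNTER = new ConcurrentHashMap<>();

    private final AtomicInteger counter = new AtomicInteger(0);
    private final String prefix;
    private final int totalSize;

    public SimpleNamedThreadFactory(String prefix, int totalSize) {
        int factoryIndex = PREFIX_COUNTER
                .computeIfAbsent(prefix, key -> new AtomicInteger(0))
                .incrementAndGet();
        this.prefix = prefix + "_" + factoryIndex;
        this.totalSize = totalSize;
    }

    @Override
    public Thread newThread(Runnable r) {
        // name format: prefix_factoryIndex_threadIndex[_totalSize]
        String name = prefix + "_" + counter.incrementAndGet();
        if (totalSize > 1) {
            name += "_" + totalSize;
        }
        Thread thread = new Thread(r, name);
        thread.setDaemon(true);
        return thread;
    }

    public static void main(String[] args) {
        Runnable noop = () -> { };
        ThreadFactory first = new SimpleNamedThreadFactory("ServerHandlerThread", 4);
        ThreadFactory second = new SimpleNamedThreadFactory("ServerHandlerThread", 4);
        System.out.println(first.newThread(noop).getName());
        System.out.println(first.newThread(noop).getName());
        System.out.println(second.newThread(noop).getName());
    }
}
```

A thread name therefore tells you which factory created the thread, which thread of that factory it is, and how large the pool was meant to be, which is handy when reading thread dumps.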
    
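  6. Initializing UUIDGenerator

    1. A snowflake implementation: the ID is built from a leading 0 bit, a 41-bit timestamp (shifted into place), a worker ID derived from the MAC address (bytes 4 and 5, combined with an OR), and a 12-bit sequence number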
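
A generic snowflake-style ID can be packed as in the sketch below. Several things here are assumptions made for illustration: the 10-bit worker ID width, the masking of the MAC bytes, and the field order (which simply follows the order of the list above). Seata's own generator may arrange the fields differently.

```java
public class SnowflakeSketch {

    static final int WORKER_ID_BITS = 10; // assumed width
    static final int SEQUENCE_BITS = 12;
    static final long TIMESTAMP_MASK = (1L << 41) - 1;
    static final long WORKER_MASK = (1L << WORKER_ID_BITS) - 1;
    static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1;

    /** Derives a 10-bit worker ID from MAC bytes 4 and 5: two bits of byte 4 OR-ed with all of byte 5. */
    static long workerIdFromMac(byte[] mac) {
        return ((mac[4] & 0b11) << 8) | (mac[5] & 0xFF);
    }

    /** Packs the parts as: 0 | timestamp (41 bits) | workerId (10 bits) | sequence (12 bits). */
    static long compose(long timestamp, long workerId, long sequence) {
        return ((timestamp & TIMESTAMP_MASK) << (WORKER_ID_BITS + SEQUENCE_BITS))
                | ((workerId & WORKER_MASK) << SEQUENCE_BITS)
                | (sequence & SEQUENCE_MASK);
    }

    static long timestampOf(long id) {
        return id >>> (WORKER_ID_BITS + SEQUENCE_BITS);
    }

    static long workerIdOf(long id) {
        return (id >>> SEQUENCE_BITS) & WORKER_MASK;
    }

    static long sequenceOf(long id) {
        return id & SEQUENCE_MASK;
    }

    public static void main(String[] args) {
        byte[] mac = {0x00, 0x1A, 0x2B, 0x3C, 0x4D, 0x5E};
        long id = compose(System.currentTimeMillis() & TIMESTAMP_MASK, workerIdFromMac(mac), 0);
        System.out.println(id + " -> worker " + workerIdOf(id) + ", sequence " + sequenceOf(id));
    }
}
```

Because the leading bit stays 0, the resulting `long` is always non-negative.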
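  7. Initializing SessionHolder
    Each session corresponds to a transaction, and transactions are either branch or global transactions. The main thing to read is the init method of SessionHolder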

    public static void init(String mode) {
        // defaults to file
        if (StringUtils.isBlank(mode)) {
            mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE,
                    CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));
        }
        StoreMode storeMode = StoreMode.get(mode);
        if (StoreMode.DB.equals(storeMode)) {
            // load the ROOT_SESSION_MANAGER, ASYNC_COMMITTING_SESSION_MANAGER, RETRY_COMMITTING_SESSION_MANAGER and
            // RETRY_ROLLBACKING_SESSION_MANAGER instances through the SPI mechanism
            ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
            ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
            RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
            RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
    
            DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.DB.getName());
        } else if (StoreMode.FILE.equals(storeMode)) {
            String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,
                    DEFAULT_SESSION_STORE_FILE_DIR);
            if (StringUtils.isBlank(sessionStorePath)) {
                throw new StoreException("the {store.file.dir} is empty.");
            }
            ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
                new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath});
            ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
                new Class[]{String.class, String.class}, new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME, null});
            RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
                new Class[]{String.class, String.class}, new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME, null});
            RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(),
                new Class[]{String.class, String.class}, new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME, null});
    
            DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.FILE.getName());
        } else if (StoreMode.REDIS.equals(storeMode)) {
            ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName());
            ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
                StoreMode.REDIS.getName(), new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME});
            RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
                StoreMode.REDIS.getName(), new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME});
            RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class,
                StoreMode.REDIS.getName(), new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
    
            DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.REDIS.getName());
        } else {
            // unknown store
            throw new IllegalArgumentException("unknown store mode:" + mode);
        }
        reload(storeMode);
    }
    
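  8. Initializing DefaultCoordinator (important)
    It handles communication with the TM and RM: beginning, committing and rolling back global transactions, and registering, committing and rolling back branch transactions, all go through it. First, the singleton is obtained with double-checked locking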

    public static DefaultCoordinator getInstance(RemotingServer remotingServer) {
        if (null == instance) {
            synchronized (DefaultCoordinator.class) {
                if (null == instance) {
                    instance = new DefaultCoordinator(remotingServer);
                }
            }
        }
        return instance;
    }
    
    private DefaultCoordinator(RemotingServer remotingServer) {
        String mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
        // file mode requires no delay in processing
        this.delayHandleSession = !StringUtils.equalsIgnoreCase(mode, StoreMode.FILE.getName());
        if (remotingServer == null) {
            throw new IllegalArgumentException("RemotingServer not allowed be null.");
        }
        // the implementation behind this interface is the Netty server mentioned earlier
        this.remotingServer = remotingServer;
        // wraps ATCore, TCCCore and SAGACore
        this.core = new DefaultCore(remotingServer);
    }
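
Double-checked locking only works safely when the shared field is declared `volatile`, which the excerpt above does not show. The sketch below is a generic illustration of the idiom under that assumption; the class and field names are hypothetical.

```java
public class LazyCoordinator {

    // volatile ensures a fully constructed object is visible to other threads
    private static volatile LazyCoordinator instance;

    private final String serverName;

    private LazyCoordinator(String serverName) {
        this.serverName = serverName;
    }

    static LazyCoordinator getInstance(String serverName) {
        LazyCoordinator local = instance;       // first check, no lock
        if (local == null) {
            synchronized (LazyCoordinator.class) {
                local = instance;               // second check, under the lock
                if (local == null) {
                    local = new LazyCoordinator(serverName);
                    instance = local;
                }
            }
        }
        return local;
    }

    String serverName() {
        return serverName;
    }

    public static void main(String[] args) {
        LazyCoordinator first = getInstance("tc-1");
        LazyCoordinator second = getInstance("tc-2");
        // the second call returns the instance created by the first one
        System.out.println(first == second);
    }
}
```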
    

    Next, the init method

    public void init() {
        // periodically handle the retry-related work of distributed transactions; the logic lives in handleAllSession
        handleAllSession.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(HANDLE_ALL_SESSION, this::handleAllSession), 0,
                HANDLE_ALL_SESSION_PERIOD, TimeUnit.MILLISECONDS);
        // periodically delete undo logs, which hold the data state before and after the business operation (covered in detail later)
        undoLogDelete.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
                UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
    }
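
Both scheduled tasks are wrapped in a "take the lock, run, release" guard, so that only one server instance at a time does the work. The sketch below shows just that guard, with an in-memory map standing in for the distributed lock store; every name in it is hypothetical and it is not Seata's implementation.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class LockAndExecuteDemo {

    // local stand-in for a distributed lock store: lock key -> current owner
    private static final ConcurrentMap<String, String> LOCKS = new ConcurrentHashMap<>();

    /**
     * Runs the task only if the lock is free.
     *
     * @return true if the task ran, false if another owner held the lock
     */
    static boolean lockAndExecute(String lockKey, String owner, Runnable task) {
        if (LOCKS.putIfAbsent(lockKey, owner) != null) {
            return false; // someone else is already handling this job
        }
        try {
            task.run();
            return true;
        } finally {
            // release only the lock this owner holds
            LOCKS.remove(lockKey, owner);
        }
    }

    public static void main(String[] args) {
        boolean[] inner = new boolean[1];
        boolean outer = lockAndExecute("HandleAllSession", "tc-1",
                () -> inner[0] = lockAndExecute("HandleAllSession", "tc-2", () -> { }));
        System.out.println("tc-1 ran: " + outer + ", tc-2 ran while tc-1 held the lock: " + inner[0]);
    }
}
```

A scheduler would call such a guard at a fixed rate; an instance that fails to get the lock skips that round and tries again on the next tick.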
    

    The main flow of handleAllSession

    protected void handleAllSession() {
        SessionCondition sessionCondition = new SessionCondition(GlobalStatus.values());
        sessionCondition.setLazyLoadBranch(true);
        Collection<GlobalSession> allSessions =
            SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition);
        if (CollectionUtils.isEmpty(allSessions)) {
            return;
        }
        // sessions whose rollback must be retried
        List<GlobalSession> retryRollbackingSessions = new ArrayList<>();
        // global sessions that have begun
        List<GlobalSession> beginGlobalSessions = new ArrayList<>();
        // sessions whose commit must be retried
        List<GlobalSession> retryCommittingSessions = new ArrayList<>();
        // sessions that are committing asynchronously
        List<GlobalSession> asyncCommittingSessions = new ArrayList<>();
        // sort the global sessions into the lists above
        for (GlobalSession session : allSessions) {
            if (rollbackingStatuses.contains(session.getStatus())) {
                retryRollbackingSessions.add(session);
            } else if (retryCommittingStatuses.contains(session.getStatus())) {
                retryCommittingSessions.add(session);
            } else if (GlobalStatus.AsyncCommitting.equals(session.getStatus())) {
                asyncCommittingSessions.add(session);
            } else if (GlobalStatus.Begin.equals(session.getStatus())) {
                beginGlobalSessions.add(session);
            }
        }
        // holds the futures of the async tasks
        List<CompletableFuture<Void>> futures = new ArrayList<>(4);
        // run asynchronously
        if (!retryRollbackingSessions.isEmpty()) {
            futures.add(
                CompletableFuture.runAsync(() -> handleRetryRollbacking(retryRollbackingSessions), retryRollbacking));
        }
        if (!beginGlobalSessions.isEmpty()) {
            futures.add(CompletableFuture.runAsync(() -> timeoutCheck(beginGlobalSessions), timeoutCheck));
        }
        if (!retryCommittingSessions.isEmpty()) {
            futures.add(
                CompletableFuture.runAsync(() -> handleRetryCommitting(retryCommittingSessions), retryCommitting));
        }
        if (!asyncCommittingSessions.isEmpty()) {
            futures.add(
                CompletableFuture.runAsync(() -> handleAsyncCommitting(asyncCommittingSessions), asyncCommitting));
        }
        if (CollectionUtils.isNotEmpty(futures)) {
            try {
                // wait for all tasks to finish
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
            } catch (InterruptedException e) {
                LOGGER.error("transaction task thread ran abnormally: {}", e.getMessage(), e);
            } catch (ExecutionException e) {
                Throwable throwable = e.getCause() != null ? e.getCause() : e;
                LOGGER.error("task execution exception: {}", throwable.getMessage(), throwable);
            }
        }
    }
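
The shape of handleAllSession (group by status, hand each group to an async task, wait for all of them) can be reproduced with plain JDK types. This is a simplified sketch: the `Status` enum is a hypothetical stand-in for GlobalStatus, a single shared pool replaces the dedicated executors, and "handling" a group only records its size.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SessionDispatchDemo {

    /** Hypothetical, reduced set of session states. */
    public enum Status { BEGIN, ASYNC_COMMITTING, COMMIT_RETRYING, ROLLBACK_RETRYING, FINISHED }

    /** Groups sessions by status, handles each group asynchronously and returns the group sizes. */
    static Map<Status, Integer> dispatch(List<Status> sessions) {
        Map<Status, List<Status>> groups = new EnumMap<>(Status.class);
        for (Status session : sessions) {
            if (session != Status.FINISHED) { // finished sessions need no further work
                groups.computeIfAbsent(session, key -> new ArrayList<>()).add(session);
            }
        }
        Map<Status, Integer> handled = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>(groups.size());
            for (Map.Entry<Status, List<Status>> group : groups.entrySet()) {
                futures.add(CompletableFuture.runAsync(
                        () -> handled.put(group.getKey(), group.getValue().size()), pool));
            }
            // block until every group has been handled
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(Arrays.asList(
                Status.BEGIN, Status.BEGIN, Status.ROLLBACK_RETRYING, Status.FINISHED)));
    }
}
```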
    

The GlobalTransactional Annotation

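  1. Everything starts with @GlobalTransactional. The two classes tied to it are GlobalTransactionScanner and GlobalTransactionalInterceptor
  2. GlobalTransactionScanner:
    1. UML diagram
      [Figure: UML class diagram of GlobalTransactionScanner]
    2. What each interface or class contributes:
      1. AbstractAutoProxyCreator class: applies the Advices/Advisors returned by getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource customTargetSource) to enhance the bean
      2. ApplicationContextAware interface: once Spring starts, it passes the application context to the implementing class through setApplicationContext(ApplicationContext applicationContext)
      3. ConfigurationChangeListener interface: when the configuration changes, the business logic runs in onChangeEvent(ConfigurationChangeEvent event)
      4. DisposableBean interface: when the container is destroyed, the destroy() method of this interface is called
      5. InitializingBean interface: during bean initialization, afterPropertiesSet() runs once all properties have been set
    3. Implementation details of the key methods
      1. First, the constructor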

        /**
         * Instantiates a new Global transaction scanner.
         *
         * @param applicationId      usually set to ${spring.application.name}
         * @param txServiceGroup     usually set to ${spring.application.name}-group
         * @param mode               defaults to AT_MODE + MT_MODE
         * @param failureHandlerHook the failure handler hook
         */
        public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode,
                                            FailureHandler failureHandlerHook) {
                setOrder(ORDER_NUM);
                setProxyTargetClass(true);
                this.applicationId = applicationId;
                this.txServiceGroup = txServiceGroup;
                this.mode = mode;
                this.failureHandlerHook = failureHandlerHook;
        }
        
      2. The wrapIfNecessary() method, which GlobalTransactionScanner overrides from AbstractAutoProxyCreator

        protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
                // return the bean unchanged if it is already proxied or fails the scanner checks
                if (!doCheckers(bean, beanName)) {
                    return bean;
                }
        
                try {
                    synchronized (PROXYED_SET) {
                        if (PROXYED_SET.contains(beanName)) {
                            return bean;
                        }
                        interceptor = null;
                        // TCC mode
                        if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                            // init tcc fence clean task if enable useTccFence
                            TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
                            //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                            // use the TCC interceptor as the advice/advisor
                            interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                            ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                    (ConfigurationChangeListener)interceptor);
                        } else {
                            Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                            Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
        
                            if (!existsAnnotation(new Class[]{serviceInterface})
                                && !existsAnnotation(interfacesIfJdk)) {
                                return bean;
                            }
        
                            if (globalTransactionalInterceptor == null) {
                                globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                                ConfigurationCache.addConfigListener(
                                        ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                        (ConfigurationChangeListener)globalTransactionalInterceptor);
                            }
                            // use globalTransactionalInterceptor as the advice/advisor
                            interceptor = globalTransactionalInterceptor;
                        }
        
                        LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                        // not an AOP proxy yet: follow the default flow
                        if (!AopUtils.isAopProxy(bean)) {
                            bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                        } else {
                            AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                            Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                            int pos;
                            for (Advisor avr : advisor) {
                                // Find the position based on the advisor's order, and add to advisors by pos
                                pos = findAddSeataAdvisorPosition(advised, avr);
                                advised.addAdvisor(pos, avr);
                            }
                        }
                        PROXYED_SET.add(beanName);
                        return bean;
                    }
                } catch (Exception exx) {
                    throw new RuntimeException(exx);
                }
        }
        
      3. The initClient() method, which runs after the properties have been set

        private void initClient() {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Initializing Global Transaction Clients ... ");
                }
                if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
                    LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
                            "please change your default configuration as soon as possible " +
                            "and we don't recommend you to use default tx-service-group's value provided by seata",
                            DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
                }
                if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
                    throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
                }
                // initialize the TM
                TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
                }
                // initialize the RM
                RMClient.init(applicationId, txServiceGroup);
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
                }
        
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Global Transaction Clients are initialized. ");
                }
                // register the shutdown hook
                registerSpringShutdownHook();
        
            }
        

      4. The invoke method of GlobalTransactionalInterceptor

        @Override
        public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
            Class<?> targetClass =
                methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
            Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
            if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
                final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
                // look up the GlobalTransactional annotation, if any
                final GlobalTransactional globalTransactionalAnnotation =
                    getAnnotation(method, targetClass, GlobalTransactional.class);
                // look up the GlobalLock annotation, if any
                final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
                boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
                if (!localDisable) {
                    if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                        AspectTransactional transactional;
                        if (globalTransactionalAnnotation != null) {
                            transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                                globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                                globalTransactionalAnnotation.noRollbackForClassName(),
                                globalTransactionalAnnotation.noRollbackFor(),
                                globalTransactionalAnnotation.noRollbackForClassName(),
                                globalTransactionalAnnotation.propagation(),
                                globalTransactionalAnnotation.lockRetryInterval(),
                                globalTransactionalAnnotation.lockRetryTimes());
                        } else {
                            transactional = this.aspectTransactional;
                        }
                        // handleGlobalTransaction handles methods annotated with GlobalTransactional
                        return handleGlobalTransaction(methodInvocation, transactional);
                    } else if (globalLockAnnotation != null) {
                        // handleGlobalLock handles methods annotated with GlobalLock
                        return handleGlobalLock(methodInvocation, globalLockAnnotation);
                    }
                }
            }
            return methodInvocation.proceed();
        }
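
The decision the interceptor makes (wrap annotated methods in begin/commit/rollback, pass everything else straight through) can be demonstrated with a JDK dynamic proxy. This is a toy model, not Seata's interceptor: `DemoGlobalTransactional`, `OrderService` and the event strings are all hypothetical, and "begin", "commit" and "rollback" are only recorded, not performed.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class AnnotationInterceptorDemo {

    /** Hypothetical marker annotation playing the role of a global-transaction annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface DemoGlobalTransactional { }

    public interface OrderService {
        @DemoGlobalTransactional
        void create(boolean fail);

        void query();
    }

    @SuppressWarnings("unchecked")
    static <T> T wrap(T target, Class<T> type, List<String> events) {
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type},
                (proxy, method, args) -> {
                    boolean transactional = method.isAnnotationPresent(DemoGlobalTransactional.class);
                    if (transactional) {
                        events.add("begin");
                    }
                    try {
                        Object result = method.invoke(target, args);
                        if (transactional) {
                            events.add("commit");
                        }
                        return result;
                    } catch (InvocationTargetException e) {
                        if (transactional) {
                            events.add("rollback");
                        }
                        throw e.getCause(); // rethrow the business exception itself
                    }
                });
    }

    /** Calls an unannotated method, a successful annotated one and a failing annotated one. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        OrderService service = wrap(new OrderService() {
            @Override
            public void create(boolean fail) {
                events.add("create");
                if (fail) {
                    throw new IllegalStateException("boom");
                }
            }

            @Override
            public void query() {
                events.add("query");
            }
        }, OrderService.class, events);

        service.query();
        service.create(false);
        try {
            service.create(true);
        } catch (IllegalStateException expected) {
            events.add("caught");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```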
        

Global Lock

  1. For this part, see objcoding's blog

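Copyright notice: this is an original article by zwq56693, licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.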