Seata1.5.0源码解析
写在前面
之前项目有使用Seata中间件实现分布式事务, 最近忙里偷闲做一次总结,一方面把一些踩坑记录下来,方便自己和读者快速排错,还有就是争取做到温故而知新, 话不多说, 搞起,搞起.
入门知识建议参考官网: Seata官网地址
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
基础知识
- 术语表
- TC (Transaction Coordinator) - 事务协调者: 维护全局和分支事务的状态,驱动全局事务提交或回滚, 就是我们启动的Seata服务端,一个单独的jvm进程
- TM (Transaction Manager) - 事务管理器: 定义全局事务的范围:开始全局事务、提交或回滚全局事务, 就是我们使用@GlobalTransactional的那个服务, 它同时也可以是RM
- RM (Resource Manager) - 资源管理器: 管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚, 有异常情况需要处理的地方, 一般是被调用的服务
- 示意图, 这些知识官网上都有
- 源码下载: Seata地址,来源于全球最大同性交友网站github
- 建议先fork到自己的仓库
- 下载后的目录
- 模块作用:
- common模块: 通用的异常、工具类、线程工具、计数器、SPI、常量等
- serializer模块: 编解码相关,实现了kryo、protobuf等编解码器
- config模块: Seata中的配置中心和注册中心是分离的,实现了主流的配置中心,默认file模式,apollo,nacos,etcd3等
- core模块: 包括封装好的RPC、数据模型、通信协议、事件、认证、序列化、存储、锁等等
- discovery模块: seata-server充当的事务协调者的角色,用于管理全局事务, RM,TM通过网络通信(没错就是netty)进行事务的提交与回滚,类似与微服务这样, 所以需要一个注册中心,现在支持consul、etcd3、eureka、redis等等
- metrics模块: 统计相关的功能, 支持导出
- rm模块: resource manager相关功能的核心抽象, 上个图看一下
- rm-datasource模块: 对JDBC的扩展, 实现操作数据库相关的功能
- saga模块: 前面提到了seata是支持多种模式的, saga模式的实现
- tcc模块: 对tcc模式的支持
- tm模块: 分布式事务中事务管理者的实现, 稍后会详细介绍
- spring模块: 使用Spring注解类的功能实现
- server模块: TC的实现模块。包含不同模式的事务实现
- test模块: 单元测试
- compressor模块: 支持主流压缩模式, 像7z,zip,gzip等
- console模块: 可视化相关,整合了SpringSecurity,JWT相关
服务端启动
启动类: ServerApplication
配置了一下包扫描@SpringBootApplication(scanBasePackages = {"io.seata"}) public class ServerApplication { public static void main(String[] args) throws IOException { // run the spring-boot application SpringApplication.run(ServerApplication.class, args); } }
使用SpringBoot启动和关闭的钩子函数
@Component public class ServerRunner implements CommandLineRunner, DisposableBean { private static final Logger LOGGER = LoggerFactory.getLogger(ServerRunner.class); // 项目是否启动标识 private boolean started = Boolean.FALSE; // 释放资源 private static final List<Disposable> DISPOSABLE_LIST = new CopyOnWriteArrayList<>(); // 维护释放资源列表 public static void addDisposable(Disposable disposable) { DISPOSABLE_LIST.add(disposable); } // SpringBoot启动后的执行逻辑 @Override public void run(String... args) { try { long start = System.currentTimeMillis(); Server.start(args); started = true; long cost = System.currentTimeMillis() - start; LOGGER.info("seata server started in {} millSeconds", cost); } catch (Throwable e) { started = Boolean.FALSE; LOGGER.error("seata server start error: {} ", e.getMessage(), e); System.exit(-1); } } public boolean started() { return started; } // SpringBoot关闭的执行逻辑 @Override public void destroy() throws Exception { if (LOGGER.isDebugEnabled()) { LOGGER.debug("destoryAll starting"); } for (Disposable disposable : DISPOSABLE_LIST) { disposable.destroy(); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("destoryAll finish"); } } }
启动业务的类是Server
public class Server { /** * The entry point of application. * * @param args the input arguments */ public static void start(String[] args) { // create logger final Logger logger = LoggerFactory.getLogger(Server.class); //initialize the parameter parser //Note that the parameter parser should always be the first line to execute. //Because, here we need to parse the parameters needed for startup. // 解析启动命令 ParameterParser parameterParser = new ParameterParser(args); //initialize the metrics // 使用SPI机制获取registry实例对象 MetricsManager.get().init(); // 从配置中读取存储模式,设置到系统变量中 System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode()); // 自定义线程池, 自定义线程工厂, 拒绝策略使用CallerRunsPolicy ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(), NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()), new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy()); // 初始化PRC远程服务器 NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads); // UUIDGenerator初始化,雪花算法 UUIDGenerator.init(parameterParser.getServerNode()); //log store mode : file, db, redis // log的持久化 SessionHolder.init(parameterParser.getSessionStoreMode()); // 事务日志的持久化, 默认file LockerManagerFactory.init(parameterParser.getLockStoreMode()); // TC核心事务处理类 DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer); coordinator.init(); nettyRemotingServer.setHandler(coordinator); // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028 ServerRunner.addDisposable(coordinator); //127.0.0.1 and 0.0.0.0 are not valid here. if (NetUtil.isValidIp(parameterParser.getHost(), false)) { XID.setIpAddress(parameterParser.getHost()); } else { String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS); if (StringUtils.isNotBlank(preferredNetworks)) { XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR))); } else { XID.setIpAddress(NetUtil.getLocalIp()); } } // 初始化Netty,开始监听端口 nettyRemotingServer.init(); } }
参数解析
我们主要看ParameterParser的init方法, 还有ConfigurationFactoryprivate void init(String[] args) { try { // 基于JCommander解析启动时的参数 getCommandParameters(args); // 基于容器获取参数 getEnvParameters(); // 参数设置 if (StringUtils.isNotBlank(seataEnv)) { System.setProperty(ENV_PROPERTY_KEY, seataEnv); } if (StringUtils.isBlank(storeMode)) { storeMode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE); } if (StringUtils.isBlank(sessionStoreMode)) { sessionStoreMode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE, storeMode); } if (StringUtils.isBlank(lockStoreMode)) { lockStoreMode = CONFIG.getConfig(ConfigurationKeys.STORE_LOCK_MODE, storeMode); } } catch (ParameterException e) { printError(e); } }
ConfigurationFactory的静态方法默认读取registry.conf
static { load(); } private static void load() { // 从系统参数中获取seata.config.name的值 String seataConfigName = System.getProperty(SYSTEM_PROPERTY_SEATA_CONFIG_NAME); if (seataConfigName == null) { // 从系统参数中获取SEATA_CONFIG_NAME的值 seataConfigName = System.getenv(ENV_SEATA_CONFIG_NAME); } if (seataConfigName == null) { // 如果没有的话赋值registry seataConfigName = REGISTRY_CONF_DEFAULT; } String envValue = System.getProperty(ENV_PROPERTY_KEY); if (envValue == null) { envValue = System.getenv(ENV_SYSTEM_KEY); } // 通过registry.conf文件的配置,构建Configuration对象 Configuration configuration = (envValue == null) ? new FileConfiguration(seataConfigName, false) : new FileConfiguration(seataConfigName + "-" + envValue, false); Configuration extConfiguration = null; try { extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration); if (LOGGER.isInfoEnabled()) { LOGGER.info("load Configuration from :{}", extConfiguration == null ? configuration.getClass().getSimpleName() : "Spring Configuration"); } } catch (EnhancedServiceNotFoundException ignore) { } catch (Exception e) { LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e); } CURRENT_FILE_INSTANCE = extConfiguration == null ? configuration : extConfiguration; }
ConfigurationFactory的buildConfiguration方法,加载更多配置项
private static Configuration buildConfiguration() { // 获取config.type信息 String configTypeName = CURRENT_FILE_INSTANCE.getConfig( ConfigurationKeys.FILE_ROOT_CONFIG + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR + ConfigurationKeys.FILE_ROOT_TYPE); if (StringUtils.isBlank(configTypeName)) { throw new NotSupportYetException("config type can not be null"); } ConfigType configType = ConfigType.getType(configTypeName); Configuration extConfiguration = null; Configuration configuration; if (ConfigType.File == configType) { // 默认为file.conf String pathDataId = String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_CONFIG, FILE_TYPE, NAME_KEY); String name = CURRENT_FILE_INSTANCE.getConfig(pathDataId); // 根据配置信息创建Configuration对象 configuration = new FileConfiguration(name); try { extConfiguration = EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration); if (LOGGER.isInfoEnabled()) { LOGGER.info("load Configuration from :{}", extConfiguration == null ? configuration.getClass().getSimpleName() : "Spring Configuration"); } } catch (EnhancedServiceNotFoundException ignore) { } catch (Exception e) { LOGGER.error("failed to load extConfiguration:{}", e.getMessage(), e); } } else { // 如果是其他,比如nacos,通过SPI生成ConfigurationProvider对象 configuration = EnhancedServiceLoader .load(ConfigurationProvider.class, Objects.requireNonNull(configType).name()).provide(); } try { Configuration configurationCache; if (null != extConfiguration) { configurationCache = ConfigurationCache.getInstance().proxy(extConfiguration); } else { // 基于CGLIB对configuration对象做了内存缓存,提升性能 configurationCache = ConfigurationCache.getInstance().proxy(configuration); } if (null != configurationCache) { extConfiguration = configurationCache; } } catch (EnhancedServiceNotFoundException ignore) { } catch (Exception e) { LOGGER.error("failed to load configurationCacheProvider:{}", e.getMessage(), e); } return null == extConfiguration ? configuration : extConfiguration; }
初始化PRC远程服务器, 直接调用了AbstractNettyRemotingServer的构造方法,
public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) { // 设置线程池 super(messageExecutor); // 参数化构建NettyServerBootstrap serverBootstrap = new NettyServerBootstrap(nettyServerConfig); // 设置ChannelHandler, ServerHandler继承了ChannelDuplexHandler,包含了处理消息的逻辑 serverBootstrap.setChannelHandlers(new ServerHandler()); }
线程池的使用(亮点),自定义了线程工厂
public class NamedThreadFactory implements ThreadFactory { // 对前缀不同的线程索引进行缓存 private final static Map<String, AtomicInteger> PREFIX_COUNTER = new ConcurrentHashMap<>(); // 所属群组 private final ThreadGroup group; // 原子类 private final AtomicInteger counter = new AtomicInteger(0); // 线程名称前缀 private final String prefix; // 线程总数量 private final int totalSize; // 是否为守护线程 private final boolean makeDaemons; /** * Instantiates a new Named thread factory. * * @param prefix the prefix * @param totalSize the total size * @param makeDaemons the make daemons */ public NamedThreadFactory(String prefix, int totalSize, boolean makeDaemons) { // 同一前缀的数量, 例如前缀都是test,第一次创建时是1,第二次创建时是2,不同前缀得到的都是1 int prefixCounter = CollectionUtils.computeIfAbsent(PREFIX_COUNTER, prefix, key -> new AtomicInteger(0)) .incrementAndGet(); SecurityManager securityManager = System.getSecurityManager(); group = (securityManager != null) ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup(); this.prefix = prefix + "_" + prefixCounter; this.makeDaemons = makeDaemons; this.totalSize = totalSize; } /** * Instantiates a new Named thread factory. * * @param prefix the prefix * @param makeDaemons the make daemons */ public NamedThreadFactory(String prefix, boolean makeDaemons) { this(prefix, 0, makeDaemons); } /** * Instantiates a new Named thread factory. * * @param prefix the prefix * @param totalSize the total size */ public NamedThreadFactory(String prefix, int totalSize) { this(prefix, totalSize, true); } @Override public Thread newThread(Runnable r) { String name = prefix + "_" + counter.incrementAndGet(); if (totalSize > 1) { name += "_" + totalSize; } Thread thread = new FastThreadLocalThread(group, r, name); thread.setDaemon(makeDaemons); if (thread.getPriority() != Thread.NORM_PRIORITY) { thread.setPriority(Thread.NORM_PRIORITY); } return thread; } }
初始化UUIDGenerator
- 雪花算法的实现: 0 + 41位时间戳(时间还包含移位操作) + Mac地址(4 和 5 位的值还包含或操作) + 12位序列号
SessionHolder的初始化
每一个都对应一个事务, 事务又分为分支事务和全局事务,主要看SessionHoder的init方法public static void init(String mode) { // 默认为file if (StringUtils.isBlank(mode)) { mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE, CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE)); } StoreMode storeMode = StoreMode.get(mode); if (StoreMode.DB.equals(storeMode)) { // 通过SPI机制加载ROOT_SESSION_MANAGER,ASYNC_COMMITTING_SESSION_MANAGER,RETRY_COMMITTING_SESSION_MANAGER, // RETRY_ROLLBACKING_SESSION_MANAGER实例 ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName()); ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(), new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME}); RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(), new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME}); RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(), new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME}); DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.DB.getName()); } else if (StoreMode.FILE.equals(storeMode)) { String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR, DEFAULT_SESSION_STORE_FILE_DIR); if (StringUtils.isBlank(sessionStorePath)) { throw new StoreException("the {store.file.dir} is empty."); } ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(), new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath}); ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(), new Class[]{String.class, String.class}, new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME, null}); RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(), new Class[]{String.class, String.class}, new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME, null}); RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.FILE.getName(), new Class[]{String.class, String.class}, new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME, null}); DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.FILE.getName()); } else if (StoreMode.REDIS.equals(storeMode)) { ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName()); ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName(), new Object[]{ASYNC_COMMITTING_SESSION_MANAGER_NAME}); RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName(), new Object[]{RETRY_COMMITTING_SESSION_MANAGER_NAME}); RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.REDIS.getName(), new Object[]{RETRY_ROLLBACKING_SESSION_MANAGER_NAME}); DISTRIBUTED_LOCKER = DistributedLockerFactory.getDistributedLocker(StoreMode.REDIS.getName()); } else { // unknown store throw new IllegalArgumentException("unknown store mode:" + mode); } reload(storeMode); }
初始化DefaultCoordinator(重点)
负责与TM, RM通信, 像全局事务的开启,提交,回滚, 分支事务的注册, 提交, 回滚都和它有关. 首先双重检测锁获取单例对象public static DefaultCoordinator getInstance(RemotingServer remotingServer) { if (null == instance) { synchronized (DefaultCoordinator.class) { if (null == instance) { instance = new DefaultCoordinator(remotingServer); } } } return instance; } private DefaultCoordinator(RemotingServer remotingServer) { String mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE); // file mode requires no delay in processing this.delayHandleSession = !StringUtils.equalsIgnoreCase(mode, StoreMode.FILE.getName()); if (remotingServer == null) { throw new IllegalArgumentException("RemotingServer not allowed be null."); } // 接口的实现类就是之前提到过的Netty Server this.remotingServer = remotingServer; // 封装了ATCore,TCCCore,SAGACore this.core = new DefaultCore(remotingServer); }
然后看一下初始化方法
public void init() { // 定时处理分布式事务中事务重试相关的逻辑,具体的逻辑在handleAllSession中 handleAllSession.scheduleAtFixedRate( () -> SessionHolder.distributedLockAndExecute(HANDLE_ALL_SESSION, this::handleAllSession), 0, HANDLE_ALL_SESSION_PERIOD, TimeUnit.MILLISECONDS); // 定时删除回滚日志, 这个日志包含了执行业务之前和之后的数据的状态,后边会详细介绍 undoLogDelete.scheduleAtFixedRate(() -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete), UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS); }
看一下handleAllSession的主线
protected void handleAllSession() { SessionCondition sessionCondition = new SessionCondition(GlobalStatus.values()); sessionCondition.setLazyLoadBranch(true); Collection<GlobalSession> allSessions = SessionHolder.getRootSessionManager().findGlobalSessions(sessionCondition); if (CollectionUtils.isEmpty(allSessions)) { return; } // 回滚状态重试事务 List<GlobalSession> retryRollbackingSessions = new ArrayList<>(); // 全局事务开启 List<GlobalSession> beginGlobalSessions = new ArrayList<>(); // 重试提交事务 List<GlobalSession> retryCommittingSessions = new ArrayList<>(); // 异步提交事务 List<GlobalSession> asyncCommittingSessions = new ArrayList<>(); // 把全局事务填充不同的list中 for (GlobalSession session : allSessions) { if (rollbackingStatuses.contains(session.getStatus())) { retryRollbackingSessions.add(session); } else if (retryCommittingStatuses.contains(session.getStatus())) { retryCommittingSessions.add(session); } else if (GlobalStatus.AsyncCommitting.equals(session.getStatus())) { asyncCommittingSessions.add(session); } else if (GlobalStatus.Begin.equals(session.getStatus())) { beginGlobalSessions.add(session); } } // 负责保存执行的结果 List<CompletableFuture<Void>> futures = new ArrayList<>(4); // 异步执行 if (!retryRollbackingSessions.isEmpty()) { futures.add( CompletableFuture.runAsync(() -> handleRetryRollbacking(retryRollbackingSessions), retryRollbacking)); } if (!beginGlobalSessions.isEmpty()) { futures.add(CompletableFuture.runAsync(() -> timeoutCheck(beginGlobalSessions), timeoutCheck)); } if (!retryCommittingSessions.isEmpty()) { futures.add( CompletableFuture.runAsync(() -> handleRetryCommitting(retryCommittingSessions), retryCommitting)); } if (!asyncCommittingSessions.isEmpty()) { futures.add( CompletableFuture.runAsync(() -> handleAsyncCommitting(asyncCommittingSessions), asyncCommitting)); } if (CollectionUtils.isNotEmpty(futures)) { try { // 获取执行结果 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); } catch (InterruptedException e) { LOGGER.error("transaction task thread ran abnormally: {}", e.getMessage(), e); } catch (ExecutionException e) { Throwable throwable = e.getCause() != null ? e.getCause() : e; LOGGER.error("task execution exception: {}", throwable.getMessage(), throwable); } } }
GlobalTransactional注解
- 一切的开始都源于@GlobalTransactional, 与之相关联的是GlobalTransactionScanner、GlobalTransactionalInterceptor两个类
- GlobalTransactionScanner:
- UML图
- 接口或类的作用:
- AbstractAutoProxyCreator类: 选取getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource customTargetSource)方法获取的Advices/Advisors做增强
- ApplicationContextAware接口: Spring启动后会通过setApplicationContext(ApplicationContext applicationContext)方法, 把上下文环境设置到实现类
- ConfigurationChangeListener接口: 当配置发生改变时,通过onChangeEvent(ConfigurationChangeEvent event)方法,实现业务逻辑
- DisposableBean接口: 当容器销毁时会调用该接口下的destroy()方法
- InitializingBean接口: 初始化bean的时候,当所有properties都设置完成后,会执行afterPropertiesSet() 方法
- 重点方法细节实现
先看一下构造函数
/** * Instantiates a new Global transaction scanner. * * @param applicationId 一般设置为 ${spring.application.name} * @param txServiceGroup 一般设置为 ${spring.application.name}-group * @param mode 默认AT_MODE + MT_MODE * @param failureHandlerHook the failure handler hook 失败处理类 */ public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode, FailureHandler failureHandlerHook) { setOrder(ORDER_NUM); setProxyTargetClass(true); this.applicationId = applicationId; this.txServiceGroup = txServiceGroup; this.mode = mode; this.failureHandlerHook = failureHandlerHook; }
AbstractAutoProxyCreator类中的wrapIfNecessary()方法
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) { // 缓存中已经存在的话直接返回 if (!doCheckers(bean, beanName)) { return bean; } try { synchronized (PROXYED_SET) { if (PROXYED_SET.contains(beanName)) { return bean; } interceptor = null; // 如果是TCC模式 if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { // init tcc fence clean task if enable useTccFence TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext); //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC // 使用TCC interceptor作为Advices/Advisors interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor); } else { Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean); Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean); if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) { return bean; } if (globalTransactionalInterceptor == null) { globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook); ConfigurationCache.addConfigListener( ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor); } // 使用globalTransactionalInterceptor作为Advices/Advisors interceptor = globalTransactionalInterceptor; } LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName()); // 不是代理类,则走默认流程 if (!AopUtils.isAopProxy(bean)) { bean = super.wrapIfNecessary(bean, beanName, cacheKey); } else { AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean); Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null)); int pos; for (Advisor avr : advisor) { // Find the position based on the advisor's order, and add to advisors by pos pos = findAddSeataAdvisorPosition(advised, avr); advised.addAdvisor(pos, avr); } } PROXYED_SET.add(beanName); return bean; } } catch (Exception exx) { throw new RuntimeException(exx); } }
属性设置完的initClient()方法
private void initClient() { if (LOGGER.isInfoEnabled()) { LOGGER.info("Initializing Global Transaction Clients ... "); } if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) { LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " + "please change your default configuration as soon as possible " + "and we don't recommend you to use default tx-service-group's value provided by seata", DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP); } if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) { throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup)); } // 初始化TM TMClient.init(applicationId, txServiceGroup, accessKey, secretKey); if (LOGGER.isInfoEnabled()) { LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup); } // 初始化RM RMClient.init(applicationId, txServiceGroup); if (LOGGER.isInfoEnabled()) { LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup); } if (LOGGER.isInfoEnabled()) { LOGGER.info("Global Transaction Clients are initialized. "); } // 添加到关闭的钩子函数 registerSpringShutdownHook(); }
4.GlobalTransactionalInterceptor的invoke方法
@Override public Object invoke(final MethodInvocation methodInvocation) throws Throwable { Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null; Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) { final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); // 是否带GlobalTransactional注解 final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class); // 是否带GlobalLock注解 final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class); boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes); if (!localDisable) { if (globalTransactionalAnnotation != null || this.aspectTransactional != null) { AspectTransactional transactional; if (globalTransactionalAnnotation != null) { transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(), globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.noRollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.propagation(), globalTransactionalAnnotation.lockRetryInterval(), globalTransactionalAnnotation.lockRetryTimes()); } else { transactional = this.aspectTransactional; } // handleGlobalTransaction进行处理带GlobalTransactional注解的方法 return handleGlobalTransaction(methodInvocation, transactional); } else if (globalLockAnnotation != null) { // handleGlobalLock进行处理带GlobalTransactional注解的方法 return handleGlobalLock(methodInvocation, globalLockAnnotation); } } } return methodInvocation.proceed(); }
- UML图