阿里开源一站式分布式事务框架seata源码分析(AT模式下TM与RM分析)

序言:

对于阿里开源分布式事务框架seata的详细了解可以参考官网,这里不会详细介绍。本章只会介绍seata中AT模式的源码分析(对阿seata有一定了解或者成功完成过demo)。

seata中一个事务的开启是由TM角色来完成,在整体事务发起方我们可以通过在执行方法中包含@GlobalTransactional来标示启用全局事务,并包含该全局事务的一定自定义设置。如下所示:

public @interface GlobalTransactional {

    /**
     * 设置该全局事务下执行的超时时间 默认毫秒
     *
     * @return timeoutMills in MILLISECONDS.
     */
    int timeoutMills() default TransactionInfo.DEFAULT_TIME_OUT;

    /**
     * 全局事务实例的的名称
     *
     * @return Given name.
     */
    String name() default "";

    /**
     * 设置哪些异常类发生需要进行rollback
     * @return
     */
    Class<? extends Throwable>[] rollbackFor() default {};

    /**
     *  设置哪些异常类名发生需要进行rollback
     * @return
     */
    String[] rollbackForClassName() default {};

    /**
     * 设置哪些异常类发生不需要进行rollback
     * @return
     */
    Class<? extends Throwable>[] noRollbackFor() default {};

    /**
     * 设置哪些异常类名发生不需要进行rollback
     * @return
     */
    String[] noRollbackForClassName() default {};

    /**
     * 事务传播级别 默认REQUIRED
     * 
     * @return
     */
    Propagation propagation() default Propagation.REQUIRED;
}

1:seata客户端-TM(基于springcloud项目分析)

1.0:GlobalTransactionScanner

使用过spring的@Transactional事务的实现知道,它通过动态代理的方式,将事务的创建,提交或回滚这些公干的动作封装到一套执行模版中。这种方式在很多开源框架都是如此构建的例如mybtis中的各执行注解(@Update,@Select等),Springcloud中的Feign调用啊等。通过idea我们可以查看@GlobalTransactional该注解在什么地方被使用到,如下所示:

如果对于springboot的一些开源start(例如mybatis中MapperScannerRegistrar等)项目有过源码走读的经验,从GlobalTransactionScanner名字可以看出该类负责扫描GlobalTransaction注解并构建其代理方法(比较@GlobalTransactionScanner作用在方法中)。继续通过ieda的Find Usages功能寻找GobalTransactionScanner的引用,发现在seata的spring starter项目中的SeataAutoConfiguration对其进行初始化。

我们镜头回到GobalTransactionScanner中(对于SeataAutoConfiguration的其它作用后续描述)。GobalTransactionScanner实现了InitializingBean(bean初始化完成后执行),AbstractAutoProxyCreator,ApplicationContextAware(获取ApplicationContext对象),DisposableBean(bean被消耗时执行),分别对应的spring中bean不同的生命周期。如下所示是spring bean初始化完成后执行

 这里会存在一个疑问,为何要开启RM与TM两个client,如果对于某一个服务它在分布式事务链路中只是作为一个分支即RM的角色而非TM,那么对于这TM的启动是否没有存在必要,毕竟需要开启TM与TC之间的连接通道,也是一个资源的浪费。

1.1:客户端TM client

在1.2与1.3对于TM与TC之间连接的有关的管理类有着不同的命名

1.2的时候命名为TmRpcClient

对于1.3的时候改命名为TmNettyRemotingClient如下所示:

 其实不论上述两个版本核心都是通过Netty作为服务之间远程网络通信基础架构,所以1.3的改为TmNettyRemotingClient更简单表达底层实现原理。后续都以1.3最新版作为讲解

1.1.1:TmNettyRemotingClient(核心类,TM远程调用client)

public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(TmNettyRemotingClient.class);
    private static volatile TmNettyRemotingClient instance;
    //长链接 keep-alive时间 
    private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;
    //常量 线程 等待队列长度
    private static final int MAX_QUEUE_SIZE = 2000;
    //是否初始化标示
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    //配置applicationId唯一id
    private String applicationId;
    private String transactionServiceGroup;


    @Override
    public void init() {
        // 注册返回response 消息处理器
        registerProcessor();
        //初始化
        if (initialized.compareAndSet(false, true)) {
            super.init();
        }
    }

    private TmNettyRemotingClient(NettyClientConfig nettyClientConfig,
                                  EventExecutorGroup eventExecutorGroup,
                                  ThreadPoolExecutor messageExecutor) {
        super(nettyClientConfig, eventExecutorGroup, messageExecutor, NettyPoolKey.TransactionRole.TMROLE);
    }

    /**
     * 获取一个TmNettyRemotingClient
     *
     * @param applicationId           the application id
     * @param transactionServiceGroup the transaction service group
     * @return the instance
     */
    public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {
        //作为一个单列的形式获取TmNettyRemotingClient
        TmNettyRemotingClient tmNettyRemotingClient = getInstance();
        tmNettyRemotingClient.setApplicationId(applicationId);
        tmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);
        return tmNettyRemotingClient;
    }

    /**
     * 单例获取 懒汉式获取
     * @return the instance
     */
    public static TmNettyRemotingClient getInstance() {
        if (instance == null) {
            synchronized (TmNettyRemotingClient.class) {
                if (instance == null) {
                    NettyClientConfig nettyClientConfig = new NettyClientConfig();
                    //定义线程pool
                    final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                        nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
                        KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
                        new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
                            nettyClientConfig.getClientWorkerThreads()),
                        RejectedPolicies.runsOldestTaskPolicy());
                    instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
                }
            }
        }
        return instance;
    }

    /**
     * Sets application id.
     *
     * @param applicationId the application id
     */
    public void setApplicationId(String applicationId) {
        this.applicationId = applicationId;
    }

    /**
     * Sets transaction service group.
     *
     * @param transactionServiceGroup the transaction service group
     */
    public void setTransactionServiceGroup(String transactionServiceGroup) {
        this.transactionServiceGroup = transactionServiceGroup;
    }

    @Override
    public String getTransactionServiceGroup() {
        return transactionServiceGroup;
    }

    /**
     *注册成功回调
     */
    @Override
    public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response,
                                     AbstractMessage requestMessage) {
        RegisterTMRequest registerTMRequest = (RegisterTMRequest)requestMessage;
        RegisterTMResponse registerTMResponse = (RegisterTMResponse)response;
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("register TM success. client version:{}, server version:{},channel:{}", registerTMRequest.getVersion(), registerTMResponse.getVersion(), channel);
        }
        getClientChannelManager().registerChannel(serverAddress, channel);
    }

    @Override
    public void onRegisterMsgFail(String serverAddress, Channel channel, Object response,
                                  AbstractMessage requestMessage) {
        RegisterTMRequest registerTMRequest = (RegisterTMRequest)requestMessage;
        RegisterTMResponse registerTMResponse = (RegisterTMResponse)response;
        String errMsg = String.format(
            "register TM failed. client version: %s,server version: %s, errorMsg: %s, " + "channel: %s", registerTMRequest.getVersion(), registerTMResponse.getVersion(), registerTMResponse.getMsg(), channel);
        throw new FrameworkException(errMsg);
    }

    /**
     * bean被销毁
     */
    @Override
    public void destroy() {
        super.destroy();
        initialized.getAndSet(false);
        instance = null;
    }

    @Override
    protected Function<String, NettyPoolKey> getPoolKeyFunction() {
        return (severAddress) -> {
            RegisterTMRequest message = new RegisterTMRequest(applicationId, transactionServiceGroup);
            return new NettyPoolKey(NettyPoolKey.TransactionRole.TMROLE, severAddress, message);
        };
    }

    /**
     * 注册 TC response 有关处理器
     */
    private void registerProcessor() {
         //注册 TC response netty返回信息解析器

        ClientOnResponseProcessor onResponseProcessor =
            new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
        // 2.注册 heartbeat netty返回信息解析器
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }
}

//父类中init
public void init() {
        //定义周期延时任务默认10s 该任务用于TM与TC的channel的连接检测 对于断的Channel进行重连
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                clientChannelManager.reconnect(getTransactionServiceGroup());
            }
        }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
        //是否配置开启transport.enableClientBatchSendRequest即客户端事务消息请求是否批量合并发送 默认为true
        if (NettyClientConfig.isEnableClientBatchSendRequest()) {
            //定义线程pool
            mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
            //运行MergedSendRunnable任务即合并发送请求
            mergeSendExecutorService.submit(new MergedSendRunnable());
        }
        super.init();
        //
        clientBootstrap.start();
    }

 NettyClientBootstrap是对于NettyClient的封装,对该类进行源码分析:

 

public class NettyClientBootstrap implements RemotingBootstrap {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class);
    //client有关配置
    private final NettyClientConfig nettyClientConfig;
    //netty 启动类
    private final Bootstrap bootstrap = new Bootstrap();
    //netty worker
    private final EventLoopGroup eventLoopGroupWorker;
    //事件调度
    private EventExecutorGroup defaultEventExecutorGroup;
    //是否初始化标示
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
    private final NettyPoolKey.TransactionRole transactionRole;
    //netty handler事件
    private ChannelHandler[] channelHandlers;

    public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup,
                                NettyPoolKey.TransactionRole transactionRole) {
        if (nettyClientConfig == null) {
            nettyClientConfig = new NettyClientConfig();
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("use default netty client config.");
            }
        }
        this.nettyClientConfig = nettyClientConfig;
        int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();
        this.transactionRole = transactionRole;
        //nio event group
        this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize,
            new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
                selectorThreadSizeThreadSize));
        this.defaultEventExecutorGroup = eventExecutorGroup;
    }

    /**
     * Sets channel handlers.
     *
     * @param handlers the handlers
     */
    protected void setChannelHandlers(final ChannelHandler... handlers) {
        if (handlers != null) {
            channelHandlers = handlers;
        }
    }

    /**
     * Add channel pipeline last.
     *
     * @param channel  the channel
     * @param handlers the handlers
     */
    private void addChannelPipelineLast(Channel channel, ChannelHandler... handlers) {
        if (channel != null && handlers != null) {
            channel.pipeline().addLast(handlers);
        }
    }

    @Override
    public void start() {

        if (this.defaultEventExecutorGroup == null) {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
                new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
                    nettyClientConfig.getClientWorkerThreads()));
        }
        //初始化 netty client 并设置option属性
        this.bootstrap.group(this.eventLoopGroupWorker).channel(
            nettyClientConfig.getClientChannelClazz()).option(
            ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(
            ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(
            ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF,
            nettyClientConfig.getClientSocketRcvBufSize());

        if (nettyClientConfig.enableNative()) {
            if (PlatformDependent.isOsx()) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("client run on macOS");
                }
            } else {
                bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
                    .option(EpollChannelOption.TCP_QUICKACK, true);
            }
        }

        //通过pipeline 绑定默认Handler 以及自定义handler
        bootstrap.handler(
            new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(
                            /**
                             * 设置channel 空闲状态处理器,是用来检测当前Handler的ChannelRead()的空闲时间
                             * int readerIdleTimeSeconds 为读超时时间(即多长时间没有接受到客户端发送数据)
                             * int writerIdleTimeSeconds, 为写超时时间(即多长时间没有向客户端发送数据)
                             * int allIdleTimeSeconds 所有类型(读或写)的超时时间
                             * 根据个参数IdleStateHandler会启动不同的定时任务,根据设定的时长去检测ChannelRead()方法是否被调用,
                             * 如果没有被调用。之后则会调用后续handler的userEventTriggered方法去执行一些事情(比如断开链接)
                             */
                        new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                            nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                            nettyClientConfig.getChannelMaxAllIdleSeconds()))
                        //设置编码解码器
                        .addLast(new ProtocolV1Decoder())
                        .addLast(new ProtocolV1Encoder());
                    if (channelHandlers != null) {
                        addChannelPipelineLast(ch, channelHandlers);
                    }
                }
            });

        if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
            LOGGER.info("NettyClientBootstrap has started");
        }
    }

    @Override
    public void shutdown() {
        try {
            //关闭netty网络资源
            this.eventLoopGroupWorker.shutdownGracefully();
            if (this.defaultEventExecutorGroup != null) {
                this.defaultEventExecutorGroup.shutdownGracefully();
            }
        } catch (Exception exx) {
            LOGGER.error("Failed to shutdown: {}", exx.getMessage());
        }
    }

    /**
     * 
     * 获取一个新的channel channel为与TC之间网络通道
     * @param address the address 网络地址
     * @return the new channel
     */
    public Channel getNewChannel(InetSocketAddress address) {
        Channel channel;
        //连接TC
        ChannelFuture f = this.bootstrap.connect(address);
        try {
            //等待超时时间内与Server端进行 若无法连接抛出异常
            f.await(this.nettyClientConfig.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);
            if (f.isCancelled()) {
                throw new FrameworkException(f.cause(), "connect cancelled, can not connect to services-server.");
            } else if (!f.isSuccess()) {
                throw new FrameworkException(f.cause(), "connect failed, can not connect to services-server.");
            } else {
                channel = f.channel();
            }
        } catch (Exception e) {
            throw new FrameworkException(e, "can not connect to services-server.");
        }
        return channel;
    }

    /**
     * Gets thread prefix.
     *
     * @param threadPrefix the thread prefix
     * @return the thread prefix
     */
    private String getThreadPrefix(String threadPrefix) {
        return threadPrefix + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();
    }
}

以上为TM大体的初始化过程,详细可自行研读源码


GlobalTransactionScanner中afterPropertiesSet解析完成之后,会执行AbstractAutoProxyCreator(该类用于为Bean生成代理对象,)中的wrapIfNecessary()方法,(AbstractAutoProxyCreator实际上实现了BeanPostProcessor接口,而wrapIfNecessary在postProcessAfterInitialization方法中被调用,因此它在afterPropertiesSet之后执行

wrapIfNecessary源码分析:

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        //是否开启GlobalTransaction
        if (disableGlobalTransaction) {
            return bean;
        }
        try {
            //PROXYED_SET 记录已被代理
            synchronized (PROXYED_SET) {
                //该bean 是否已被代理 若已被无需重复代理
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                //MethodInterceptor 定义方法拦截器
                interceptor = null;
                //检测是否是TCC 模式下代理
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                } else {
                    //非jdk代理 基于class方式
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    //如果bean是jdk代理(基于接口) 获取元Class
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                    //是否包含Annotation
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }

                    if (interceptor == null) {
                        if (globalTransactionalInterceptor == null) {
                            //构建globalTransactionalInterceptor
                            globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                            
                            ConfigurationCache.addConfigListener(
                                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener)globalTransactionalInterceptor);
                        }
                        interceptor = globalTransactionalInterceptor;
                    }
                }

                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                if (!AopUtils.isAopProxy(bean)) {
                    //如果gaibean不是aop代理类
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    // 执行包装目标对象到代理对象  

                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
                
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

    /**
     * 目标 classes的方法中是否包含GlobalTransactional 或GlobalLock 注解
     * @param classes
     * @return
     */
    private boolean existsAnnotation(Class<?>[] classes) {
        if (CollectionUtils.isNotEmpty(classes)) {
            for (Class<?> clazz : classes) {
                if (clazz == null) {
                    continue;
                }
                GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);
                if (trxAnno != null) {
                    return true;
                }
                Method[] methods = clazz.getMethods();

                for (Method method : methods) {
                    //是否包含GlobalTransactional注解
                    trxAnno = method.getAnnotation(GlobalTransactional.class);
                    if (trxAnno != null) {
                        return true;
                    }
                    //GlobalLock
                    GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
                    if (lockAnno != null) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

从上述代码中可看出,用GlobalTransactionalInterceptor 代替了GlobalTransactional 和 GlobalLock 注解的方法

1.3:GlobalTransactionalInterceptor(全局事务拦截器)

该类用于代理处理@GlobalTransactional被执行,如下源码所示:

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {

    private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);
    private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();
    //事务模版类
    private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
    private final GlobalLockTemplate<Object> globalLockTemplate = new GlobalLockTemplate<>();
    //失败处理器
    private final FailureHandler failureHandler;
    private volatile boolean disable;
    //服务自检周期	默认2000,单位ms.每2秒进行一次服务自检,来决定
    private static int degradeCheckPeriod;
    //降级检测开关 降级开关	默认false。业务侧根据连续错误数自动降级不走seata事务
    private static volatile boolean degradeCheck;
    //升降级达标阈值	默认10
    private static int degradeCheckAllowTimes;
    private static volatile Integer degradeNum = 0;
    private static volatile Integer reachNum = 0;
    private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);

    //用于周期检测是否降级 执行器 应该在degradeCheck =true时被初始化
    private static ScheduledThreadPoolExecutor executor =
        new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));

    /**
     * Instantiates a new Global transactional interceptor.
     *
     * @param failureHandler
     *            the failure handler
     */
    public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
        //初始化动作
        this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
        this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            DEFAULT_DISABLE_GLOBAL_TRANSACTION);
        degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
            DEFAULT_TM_DEGRADE_CHECK);
        //开启降级设置
        if (degradeCheck) {
            ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
            degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(
                ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
            degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(
                ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
            EVENT_BUS.register(this);
            if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
                startDegradeCheck();
            }
        }
    }

    /**
     * 代理方法调用逻辑
     * @param methodInvocation 被代理的原方法
     * @return
     * @throws Throwable
     */
    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {

        //获取方法所属类
        Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        //获取执行具体的Method对象
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            //获取GlobalTransactional注解对象 获取定义属性
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            //获取GlobalLock对象
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            //是否被降级或者开启全局事务
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            if (!localDisable) {
                //判定globalTransactional注解还是globalLock全局锁对象
                if (globalTransactionalAnnotation != null) {
                    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                } else if (globalLockAnnotation != null) {
                    return handleGlobalLock(methodInvocation);
                }
            }
        }
        //执行具体方法
        return methodInvocation.proceed();
    }

    /**
     *
     * @param methodInvocation
     * @return
     * @throws Exception
     */
    private Object handleGlobalLock(final MethodInvocation methodInvocation) throws Exception {
        //
        return globalLockTemplate.execute(() -> {
            try {
                return methodInvocation.proceed();
            } catch (Exception e) {
                throw e;
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * 核心代理本质通过transactionalTemplate来实现
     * @param methodInvocation
     * @param globalTrxAnno
     * @return
     * @throws Throwable
     */
    private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
        final GlobalTransactional globalTrxAnno) throws Throwable {
        boolean succeed = true;
        try {
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    //执行原方法
                    return methodInvocation.proceed();
                }

                public String name() {
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }

                @Override
                public TransactionInfo getTransactionInfo() {
                    //根据注解中设定信息构建TransactionInfo transactionalTemplate中需要使用
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(globalTrxAnno.propagation());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();

                    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                //在事务哪阶段发生了异常 根据不同异常分支走不同代码
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    //第一阶段发生异常
                    succeed = false;
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    succeed = false;
                    //commit发生异常
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    //回滚时发生异常
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                case RollbackRetrying:
                    //回滚重试异常
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                default:
                    //其它异常
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
            }
        } finally {
            if (degradeCheck) {
                EVENT_BUS.post(new DegradeCheckEvent(succeed));
            }
        }
    }

    public <T extends Annotation> T getAnnotation(Method method, Class<?> targetClass, Class<T> annotationClass) {
        return Optional.ofNullable(method).map(m -> m.getAnnotation(annotationClass))
            .orElse(Optional.ofNullable(targetClass).map(t -> t.getAnnotation(annotationClass)).orElse(null));
    }

    //构建该方法唯一名称 避免方法重载使用入参
    private String formatMethod(Method method) {
        StringBuilder sb = new StringBuilder(method.getName()).append("(");
        //方法执行参数类型
        Class<?>[] params = method.getParameterTypes();
        int in = 0;
        for (Class<?> clazz : params) {
            sb.append(clazz.getName());
            if (++in < params.length) {
                sb.append(", ");
            }
        }
        return sb.append(")").toString();
    }

    /**
     * 监听ConfigurationChangeEvent事件 只针对于disable_global_transaction与client_degrade_check变更
     * @param event the event
     */
    @Override
    public void onChangeEvent(ConfigurationChangeEvent event) {
        if (ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION.equals(event.getDataId())) {
            LOGGER.info("{} config changed, old value:{}, new value:{}", ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                disable, event.getNewValue());
            disable = Boolean.parseBoolean(event.getNewValue().trim());
        } else if (ConfigurationKeys.CLIENT_DEGRADE_CHECK.equals(event.getDataId())) {
            degradeCheck = Boolean.parseBoolean(event.getNewValue());
            if (!degradeCheck) {
                degradeNum = 0;
            }
        }
    }

    /**
     * auto upgrade service detection
     */
    private static void startDegradeCheck() {

        executor.scheduleAtFixedRate(() -> {
            if (degradeCheck) {
                try {
                    String xid = TransactionManagerHolder.get().begin(null, null, "degradeCheck", 60000);
                    TransactionManagerHolder.get().commit(xid);
                    EVENT_BUS.post(new DegradeCheckEvent(true));
                } catch (Exception e) {
                    EVENT_BUS.post(new DegradeCheckEvent(false));
                }
            }
        }, degradeCheckPeriod, degradeCheckPeriod, TimeUnit.MILLISECONDS);
    }

    @Subscribe
    public static void onDegradeCheck(DegradeCheckEvent event) {
        if (event.isRequestSuccess()) {
            if (degradeNum >= degradeCheckAllowTimes) {
                reachNum++;
                if (reachNum >= degradeCheckAllowTimes) {
                    reachNum = 0;
                    degradeNum = 0;
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("the current global transaction has been restored");
                    }
                }
            } else if (degradeNum != 0) {
                degradeNum = 0;
            }
        } else {
            if (degradeNum < degradeCheckAllowTimes) {
                degradeNum++;
                if (degradeNum >= degradeCheckAllowTimes) {
                    if (LOGGER.isWarnEnabled()) {
                        LOGGER.warn("the current global transaction has been automatically downgraded");
                    }
                }
            } else if (reachNum != 0) {
                reachNum = 0;
            }
        }
    }
}

从上述源码中invoke方法可知,本质通过TransactionalTemplate的execute来执行真正的流程,如下所示:

1.4:TransactionalTemplate(事务执行模版)

该类封装了事务执行的

public class TransactionalTemplate {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalTemplate.class);


    /**
     * Execute object.
     *
     * @param business the business
     * @return the object
     * @throws TransactionalExecutor.ExecutionException the execution exception
     */
    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1 获取GlobalTransactionalInterceptor中根据注解封装的TransactionInfo类
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1创建一个全局事务 默认为DefaultGlobalTransaction 感觉当前上下文中是否包含一个xid
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 1.2 处理不同的事务传播级别和branchType
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                //处理不同的事务传播级别
                case NOT_SUPPORTED:
                    suspendedResourcesHolder = tx.suspend(true);
                    return business.execute();
                case REQUIRES_NEW:
                    suspendedResourcesHolder = tx.suspend(true);
                    break;
                case SUPPORTS:
                    if (!existingTransaction()) {
                        //如果已经存在事务直接执行 不创建事务
                        return business.execute();
                    }
                    break;
                case REQUIRED:
                    break;
                case NEVER:
                    if (existingTransaction()) {
                        //如果已经存在事务
                        throw new TransactionException(
                                String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"
                                        ,RootContext.getXID()));
                    } else {
                        return business.execute();
                    }
                case MANDATORY:
                    if (!existingTransaction()) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }


            try {

                // 2. 开始 Transaction
                beginTransaction(txInfo, tx);

                Object rs = null;
                try {

                    // 执行业务代码
                    rs = business.execute();

                } catch (Throwable ex) {

                    //3 在业务代码执行若发生异常 判定抛出的异常是否需要被回滚
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4.所有方法都以准备完成 commit事务
                commitTransaction(tx);
                //返回结果集
                return rs;
            } finally {
                //5. 清除
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            tx.resume(suspendedResourcesHolder);
        }

    }

    public boolean existingTransaction() {
        return StringUtils.isNotEmpty(RootContext.getXID());

    }



    private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
        //roll back
        if (txInfo != null && txInfo.rollbackOn(originalException)) {
            try {
                //需要进行回滚
                rollbackTransaction(tx, originalException);
            } catch (TransactionException txe) {
                //回滚失败 抛出RollbackFailure类型异常 由GlobalTransactionalInterceptor处理
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                        TransactionalExecutor.Code.RollbackFailure, originalException);
            }
        } else {
            //这个异常不需要进行 回滚 直接提交
            commitTransaction(tx);
        }
    }

    /**
     * 提交事务
     * @param tx
     * @throws TransactionalExecutor.ExecutionException
     */
    private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            //执行commit的之前钩子函数
            triggerBeforeCommit();
            tx.commit();
            //执行commit的after钩子函数
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 事务提交失败
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);
        }
    }

    /**
     * 回滚一个事务
     * @param tx
     * @param originalException
     * @throws TransactionException
     * @throws TransactionalExecutor.ExecutionException
     */
    private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
        //执行rollback的之前钩子函数
        triggerBeforeRollback();
        tx.rollback();
        //执行rollback的之后钩子函数
        triggerAfterRollback();
        // 3.1 Successfully rolled back
        throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
            ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
    }

    /**
     * 开始一个事务
     * @param txInfo
     * @param tx
     * @throws TransactionalExecutor.ExecutionException
     */
    private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            //执行beginTransaction的before钩子函数
            triggerBeforeBegin();
            //底层是通过GlobalTransaction来执行
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            //执行beginTransaction的After钩子函数
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);

        }
    }

    private void triggerBeforeBegin() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.beforeBegin();
            } catch (Exception e) {
                LOGGER.error("Failed execute beforeBegin in hook {}",e.getMessage(),e);
            }
        }
    }

    private void triggerAfterBegin() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.afterBegin();
            } catch (Exception e) {
                LOGGER.error("Failed execute afterBegin in hook {}",e.getMessage(),e);
            }
        }
    }

    private void triggerBeforeRollback() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.beforeRollback();
            } catch (Exception e) {
                LOGGER.error("Failed execute beforeRollback in hook {}",e.getMessage(),e);
            }
        }
    }

    private void triggerAfterRollback() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.afterRollback();
            } catch (Exception e) {
                LOGGER.error("Failed execute afterRollback in hook {}",e.getMessage(),e);
            }
        }
    }

    private void triggerBeforeCommit() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.beforeCommit();
            } catch (Exception e) {
                LOGGER.error("Failed execute beforeCommit in hook {}",e.getMessage(),e);
            }
        }
    }

    private void triggerAfterCommit() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.afterCommit();
            } catch (Exception e) {
                LOGGER.error("Failed execute afterCommit in hook {}",e.getMessage(),e);
            }
        }
    }

    private void triggerAfterCompletion() {
        for (TransactionHook hook : getCurrentHooks()) {
            try {
                hook.afterCompletion();
            } catch (Exception e) {
                LOGGER.error("Failed execute afterCompletion in hook {}",e.getMessage(),e);
            }
        }
    }

    private void cleanUp() {
        TransactionHookManager.clear();
    }

    private List<TransactionHook> getCurrentHooks() {
        return TransactionHookManager.getHooks();
    }

}

从上述源码可以看出对于全局事务的提交回滚都是通过GlobalTransaction接口来实现的

1.4:GlobalTransaction

该接口提供了事务有关的所有方法,具体的实现为DefaultGlobalTransaction

public interface GlobalTransaction {

    /**
     * 使用默认超时和名称开始新的全局事务
     *
     */
    void begin() throws TransactionException;

    /**
     * 使用给定的超时和默认名称开始新的全局事务。
     *
     */
    void begin(int timeout) throws TransactionException;

    /**
     *使用给定的超时和给定的名称开始新的全局事务。
     *
     */
    void begin(int timeout, String name) throws TransactionException;

    /**
     * 提交全局事务。
     *
     */
    void commit() throws TransactionException;

    /**
     * 回滚全局事务。
     *
     */
    void rollback() throws TransactionException;

    /**
     * 暂停全局事务。
     *
     */
    SuspendedResourcesHolder suspend(boolean unbindXid) throws TransactionException;

    /**
     * 恢复全局事务。
     *
     */
    void resume(SuspendedResourcesHolder suspendedResourcesHolder) throws TransactionException;

    /**
     *向TC询问相应全局事务的当前状态。
     *
     */
    GlobalStatus getStatus() throws TransactionException;

    /**
     * 获取 XID.
     *
     * @return XID. xid
     */
    String getXid();

    /**
     *
     * 向tc报告全局事务状态
     *
     */
    void globalReport(GlobalStatus globalStatus) throws TransactionException;

    /**
     * 全局事务的本地状态。
     *
     */
    GlobalStatus getLocalStatus();
}

1.4:DefaultGlobalTransaction

public class DefaultGlobalTransaction implements GlobalTransaction {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultGlobalTransaction.class);

    //默认全局事务执行的超时事件
    private static final int DEFAULT_GLOBAL_TX_TIMEOUT = 60000;

    private static final String DEFAULT_GLOBAL_TX_NAME = "default";

    private TransactionManager transactionManager;

    //全局xid
    private String xid;

    //全局状态
    private GlobalStatus status;

    //当前执行流程在全局事务中的角色 Launcher 或Participant
    private GlobalTransactionRole role;

    private static final int COMMIT_RETRY_COUNT = ConfigurationFactory.getInstance().getInt(
        ConfigurationKeys.CLIENT_TM_COMMIT_RETRY_COUNT, DEFAULT_TM_COMMIT_RETRY_COUNT);

    private static final int ROLLBACK_RETRY_COUNT = ConfigurationFactory.getInstance().getInt(
        ConfigurationKeys.CLIENT_TM_ROLLBACK_RETRY_COUNT, DEFAULT_TM_ROLLBACK_RETRY_COUNT);

    /**
     * 实例化新的默认全局事务。
     */
    DefaultGlobalTransaction() {
        this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
    }

    /**
     * 实例化新的默认全局事务。
     *
     * @param xid    the xid
     * @param status the status
     * @param role   the role
     */
    DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {
        this.transactionManager = TransactionManagerHolder.get();
        this.xid = xid;
        this.status = status;
        this.role = role;
    }

    @Override
    public void begin() throws TransactionException {
        begin(DEFAULT_GLOBAL_TX_TIMEOUT);
    }

    @Override
    public void begin(int timeout) throws TransactionException {
        begin(timeout, DEFAULT_GLOBAL_TX_NAME);
    }

    @Override
    public void begin(int timeout, String name) throws TransactionException {
        //验证角色为Launcher 提交者
        if (role != GlobalTransactionRole.Launcher) {
            //作为一个参与者 上下文中肯定包含xid 若不包含抛出异常
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        //验证xid是否为null 不为 null说明当前事务状态不对
        assertXIDNull();
        if (RootContext.getXID() != null) {
            throw new IllegalStateException();
        }
        //获取一个新的xid
        xid = transactionManager.begin(null, null, name, timeout);
        //变更当前事务状态
        status = GlobalStatus.Begin;
        //将这个xid写入到全局事务上下文中
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }

    }

    @Override
    public void commit() throws TransactionException {
        //如果是Participant参与者无法进行事务提交
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
        //提交重试次数 默认5次
        int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
        try {
            //一直重试知道达到最大次数或commit成功
            while (retry > 0) {
                try {
                    //提交事务
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    //重试次数-1
                    retry--;
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global commit", ex);
                    }
                }
            }
        } finally {
            if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {
                //解绑xid
                suspend(true);
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] commit status: {}", xid, status);
        }

    }

    @Override
    public void rollback() throws TransactionException {

        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of rollback
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        //验证xid是否有效
        assertXIDNotNull();
        //回滚重试次数 默认5次
        int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    //获取回滚状态
                    status = transactionManager.rollback(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    retry--;
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global rollback", ex);
                    }
                }
            }
        } finally {
            if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {
                //暂停任务 解绑xid
                suspend(true);
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] rollback status: {}", xid, status);
        }
    }

    /**
     * 中断全局事务
     * @param unbindXid if true,suspend the global transaction.
     * @return
     * @throws TransactionException
     */
    @Override
    public SuspendedResourcesHolder suspend(boolean unbindXid) throws TransactionException {
        String xid = RootContext.getXID();
        if (StringUtils.isNotEmpty(xid) && unbindXid) {
            RootContext.unbind();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Suspending current transaction,xid = {}",xid);
            }
        } else {
            xid = null;
        }
        return new SuspendedResourcesHolder(xid);
    }

    /**
     * 继续全局事务
     * @param suspendedResourcesHolder the suspended resources to resume
     * @throws TransactionException
     */
    @Override
    public void resume(SuspendedResourcesHolder suspendedResourcesHolder) throws TransactionException {
        if (suspendedResourcesHolder == null) {
            return;
        }
        String xid = suspendedResourcesHolder.getXid();
        if (StringUtils.isNotEmpty(xid)) {
            RootContext.bind(xid);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Resumimg the transaction,xid = {}", xid);
            }
        }
    }

    /**
     * 获取TC中携有的全局事务状态
     * @return
     * @throws TransactionException
     */
    @Override
    public GlobalStatus getStatus() throws TransactionException {
        if (xid == null) {
            return GlobalStatus.UnKnown;
        }
        status = transactionManager.getStatus(xid);
        return status;
    }

    /**
     * 获取全局xid
     * @return
     */
    @Override
    public String getXid() {
        return xid;
    }

    @Override
    public void globalReport(GlobalStatus globalStatus) throws TransactionException {
        assertXIDNotNull();

        if (globalStatus == null) {
            throw new IllegalStateException();
        }

        //远程向tc报告当前事务状态
        status = transactionManager.globalReport(xid, globalStatus);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] report status: {}", xid, status);
        }

        if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {
            suspend(true);
        }
    }

    /**
     * 获取本地持有的全局事务状态
     * @return
     */
    @Override
    public GlobalStatus getLocalStatus() {
        return status;
    }

    private void assertXIDNotNull() {
        if (xid == null) {
            throw new IllegalStateException();
        }
    }

    private void assertXIDNull() {
        if (xid != null) {
            throw new IllegalStateException();
        }
    }

}

1.4:DefaultTransactionManager

用于管理TM与TC之间交互方法

public class DefaultTransactionManager implements TransactionManager {

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        //构建请求参数
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        //同步发送begin netty请求
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        //判断返回状态
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        //获取全局xid
        return response.getXid();
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        //构建请求参数
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        //同步发送commit netty请求
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        //返回当前全局事务状态
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        //构建请求参数
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setXid(xid);
        //同步发送rollback netty请求
        GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
        //返回当前全局事务状态
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus getStatus(String xid) throws TransactionException {
        GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
        queryGlobalStatus.setXid(xid);
        GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException {
        GlobalReportRequest globalReport = new GlobalReportRequest();
        globalReport.setXid(xid);
        globalReport.setGlobalStatus(globalStatus);
        //上报当前分支事务状态
        GlobalReportResponse response = (GlobalReportResponse) syncCall(globalReport);
        return response.getGlobalStatus();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
        } catch (TimeoutException toe) {
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
        }
    }
}

 

从上述代码可以看出通过sendSyncRequest()来发送netty请求,sendSyncRequest为TmNettyRemotingClient父类AbstractNettyRemotingClient方法。

1.5:AbstractNettyRemotingClient

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNettyRemotingClient.class);
    private static final String MSG_ID_PREFIX = "msgId:";
    private static final String FUTURES_PREFIX = "futures:";
    private static final String SINGLE_LOG_POSTFIX = ";";
    private static final int MAX_MERGE_SEND_MILLS = 1;
    private static final String THREAD_PREFIX_SPLIT_CHAR = "_";

    private static final int MAX_MERGE_SEND_THREAD = 1;
    private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;
    private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L;
    private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;
    private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend";
    protected final Object mergeLock = new Object();

    /**
     * When sending message type is {@link MergeMessage}, will be stored to mergeMsgMap.
     */
    protected final Map<Integer, MergeMessage> mergeMsgMap = new ConcurrentHashMap<>();

    /**
     * When batch sending is enabled, the message will be stored to basketMap
     * Send via asynchronous thread {@link MergedSendRunnable}
     * {@link NettyClientConfig#isEnableClientBatchSendRequest}
     */
    protected final ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>();

    private final NettyClientBootstrap clientBootstrap;
    private NettyClientChannelManager clientChannelManager;
    private final NettyPoolKey.TransactionRole transactionRole;
    private ExecutorService mergeSendExecutorService;
    private TransactionMessageHandler transactionMessageHandler;

    @Override
    public void init() {
        //定义周期延时任务默认10s 该任务用于TM与TC的channel的连接检测 对于断的Channel进行重连
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                clientChannelManager.reconnect(getTransactionServiceGroup());
            }
        }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);

        //是否配置开启transport.enableClientBatchSendRequest即客户端事务消息请求是否批量合并发送 默认为true
        //批量发送的原理使用 等待(汇集数据)唤醒(发送数据)机制 + CompletableFuture获取异步发送回调方法
        if (NettyClientConfig.isEnableClientBatchSendRequest()) {
            //定义线程pool 默认1个线程
            mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
            //运行MergedSendRunnable任务即合并发送请求
            mergeSendExecutorService.submit(new MergedSendRunnable());
        }
        super.init();
        //netty client 启动
        clientBootstrap.start();
    }

    public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
                                       ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
        super(messageExecutor);
        this.transactionRole = transactionRole;
        clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);
        //设置netty handler 接受netty 结果返回结果集并写入到 MessageFuture的rs中
        clientBootstrap.setChannelHandlers(new ClientHandler());
        clientChannelManager = new NettyClientChannelManager(
            new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
    }

    @Override
    public Object sendSyncRequest(Object msg) throws TimeoutException {

        //一般TC肯定为集群部署 通过负载均衡算法获取对应服务器地址
        String serverAddress = loadBalance(getTransactionServiceGroup());
        //获取请求超时时间 默认30s
        int timeoutMillis = NettyClientConfig.getRpcRequestTimeout();
        //构建请求信息
        RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);

        // send batch message
        // put message into basketMap, @see MergedSendRunnable
        //是否开启批量发送 默认开启 如果开启将信息缓存到basketMap 然后再统一发送
        // 在等待批量发送阶段时 这些线程对应任务都在阻塞等待 知道发送成功后才被唤醒
        if (NettyClientConfig.isEnableClientBatchSendRequest()) {

            // send batch message is sync request, needs to create messageFuture and put it in futures.
            //发送批处理消息是同步请求,需要创建messageFuture(本质通过CompletableFuture实现)并将其放入futures缓存中。
            MessageFuture messageFuture = new MessageFuture();
            messageFuture.setRequestMessage(rpcMessage);
            messageFuture.setTimeout(timeoutMillis);
            futures.put(rpcMessage.getId(), messageFuture);

            // 数据写入到basketMap 缓存中
            ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
            //获取请求TC地址对应的任务队列
            BlockingQueue<RpcMessage> basket = map.get(serverAddress);
            //为每一个TC地址创建一个发送任务队列
            if (basket == null) {
                //数据写入
                map.putIfAbsent(serverAddress, new LinkedBlockingQueue<>());
                basket = map.get(serverAddress);
            }
            //写入队列
            basket.offer(rpcMessage);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("offer message: {}", rpcMessage.getBody());
            }
            //判断是否是否可以发送
            if (!isSending) {
                //唤醒mergeLock 锁下阻塞等待数据的MergedSendRunnable(一次会将堆积的数据全部请求发送掉 堆积的数据大小为上次MergedSendRunnable发送时间)
                synchronized (mergeLock) {
                    mergeLock.notifyAll();
                }
            }

            try {
                //在超时时间内阻塞等待结果
                return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (Exception exx) {
                LOGGER.error("wait response error:{},ip:{},request:{}",
                    exx.getMessage(), serverAddress, rpcMessage.getBody());
                if (exx instanceof TimeoutException) {
                    throw (TimeoutException) exx;
                } else {
                    throw new RuntimeException(exx);
                }
            }

        } else {

            //获取连接通道Netty Channel
            Channel channel = clientChannelManager.acquireChannel(serverAddress);
            //直接将数据发送
            return super.sendSync(channel, rpcMessage, timeoutMillis);
        }

    }

    @Override
    public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException {
        if (channel == null) {
            LOGGER.warn("sendSyncRequest nothing, caused by null channel.");
            return null;
        }
        RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
        return super.sendSync(channel, rpcMessage, NettyClientConfig.getRpcRequestTimeout());
    }


    @Override
    public void sendAsyncRequest(Channel channel, Object msg) {
        if (channel == null) {
            LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");
            return;
        }
        //构建请求 RpcMessage
        RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage
            ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
            : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);

        if (rpcMessage.getBody() instanceof MergeMessage) {
            //记录merge后消息缓存体
            mergeMsgMap.put(rpcMessage.getId(), (MergeMessage) rpcMessage.getBody());
        }
        super.sendAsync(channel, rpcMessage);
    }

    @Override
    public void sendAsyncResponse(String serverAddress, RpcMessage rpcMessage, Object msg) {
        RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, ProtocolConstants.MSGTYPE_RESPONSE);
        Channel channel = clientChannelManager.acquireChannel(serverAddress);
        super.sendAsync(channel, rpcMsg);
    }

    /**
     * 为每个messagetype 注册processor数据解析执行其器
     * @param requestCode
     * @param processor   {@link RemotingProcessor}
     * @param executor    thread pool
     */
    @Override
    public void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {
        Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
        this.processorTable.put(requestCode, pair);
    }

    @Override
    public void destroyChannel(String serverAddress, Channel channel) {
        clientChannelManager.destroyChannel(serverAddress, channel);
    }

    @Override
    public void destroy() {
        clientBootstrap.shutdown();
        if (mergeSendExecutorService != null) {
            mergeSendExecutorService.shutdown();
        }
        super.destroy();
    }

    public void setTransactionMessageHandler(TransactionMessageHandler transactionMessageHandler) {
        this.transactionMessageHandler = transactionMessageHandler;
    }

    public TransactionMessageHandler getTransactionMessageHandler() {
        return transactionMessageHandler;
    }

    public NettyClientChannelManager getClientChannelManager() {
        return clientChannelManager;
    }

    /**
     * 根据注册中心获取tx对应实际地址  并根据负载均衡算法从集群地址中选取任意一个地址
     * @param transactionServiceGroup tx-service-group 事务分组
     * @return
     */
    private String loadBalance(String transactionServiceGroup) {
        InetSocketAddress address = null;
        try {
            @SuppressWarnings("unchecked")
            //根据使用不同的注册中心获取tx对应实际地址()
            List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().lookup(transactionServiceGroup);

            //负载均衡算法(支持两种线性roundRobin与随机)从集群地址中选取任意一个地址
            address = LoadBalanceFactory.getInstance().select(inetSocketAddressList);
        } catch (Exception ex) {
            LOGGER.error(ex.getMessage());
        }
        if (address == null) {
            throw new FrameworkException(NoAvailableService);
        }
        //转化为标准http://ip+port地址
        return NetUtil.toStringAddress(address);
    }

    private String getThreadPrefix() {
        return AbstractNettyRemotingClient.MERGE_THREAD_PREFIX + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();
    }

    /**
     * Get pool key function.
     *
     * @return lambda function
     */
    protected abstract Function<String, NettyPoolKey> getPoolKeyFunction();

    /**
     * Get transaction service group.
     *
     * @return transaction service group
     */
    protected abstract String getTransactionServiceGroup();

    /**
     * 合并数据发送任务
     */
    private class MergedSendRunnable implements Runnable {

        @Override
        public void run() {
            //死循环
            while (true) {
                //
                synchronized (mergeLock) {
                    try {
                        //阻塞等待1s
                        mergeLock.wait(MAX_MERGE_SEND_MILLS);
                    } catch (InterruptedException e) {
                    }
                }
                isSending = true;
                //循环发送缓存basketMap中存储数据
                for (String address : basketMap.keySet()) {
                    BlockingQueue<RpcMessage> basket = basketMap.get(address);
                    if (basket.isEmpty()) {
                        continue;
                    }

                    //message 包装类 封装批量的RpcMessage
                    MergedWarpMessage mergeMessage = new MergedWarpMessage();
                    while (!basket.isEmpty()) {
                        RpcMessage msg = basket.poll();
                        mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                        mergeMessage.msgIds.add(msg.getId());
                    }
                    if (mergeMessage.msgIds.size() > 1) {
                        //打印批量日志
                        printMergeMessageLog(mergeMessage);
                    }

                    Channel sendChannel = null;
                    try {
                        // send batch message is sync request, but there is no need to get the return value.
                        // Since the messageFuture has been created before the message is placed in basketMap,
                        // the return value will be obtained in ClientOnResponseProcessor.
                        //获取send channel
                        sendChannel = clientChannelManager.acquireChannel(address);
                        //发送批量同步请求 这里不需要获取返回值 而是通过messageFuture中的ClientOnResponseProcessor来返回
                        AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
                    } catch (FrameworkException e) {
                        //发送失败处理
                        if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                            destroyChannel(address, sendChannel);
                        }
                        // fast fail
                        //发送merge消息失败 将之前futures缓存的messageId与future关系异常并将future回调结果置为null
                        for (Integer msgId : mergeMessage.msgIds) {
                            MessageFuture messageFuture = futures.remove(msgId);
                            if (messageFuture != null) {
                                messageFuture.setResultMessage(null);
                            }
                        }
                        LOGGER.error("client merge call failed: {}", e.getMessage(), e);
                    }
                }
                isSending = false;
            }
        }

        private void printMergeMessageLog(MergedWarpMessage mergeMessage) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("merge msg size:{}", mergeMessage.msgIds.size());
                for (AbstractMessage cm : mergeMessage.msgs) {
                    LOGGER.debug(cm.toString());
                }
                StringBuilder sb = new StringBuilder();
                for (long l : mergeMessage.msgIds) {
                    sb.append(MSG_ID_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);
                }
                sb.append("\n");
                for (long l : futures.keySet()) {
                    sb.append(FUTURES_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);
                }
                LOGGER.debug(sb.toString());
            }
        }
    }

    /**
     * The type ClientHandler.
     */
    @Sharable
    class ClientHandler extends ChannelDuplexHandler {

        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            //处理TC返回的message数据
            processMessage(ctx, (RpcMessage) msg);
        }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) {
            synchronized (lock) {
                if (ctx.channel().isWritable()) {
                    lock.notifyAll();
                }
            }
            ctx.fireChannelWritabilityChanged();
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            if (messageExecutor.isShutdown()) {
                return;
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("channel inactive: {}", ctx.channel());
            }
            clientChannelManager.releaseChannel(ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress()));
            super.channelInactive(ctx);
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
                if (idleStateEvent.state() == IdleState.READER_IDLE) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("channel {} read idle.", ctx.channel());
                    }
                    try {
                        String serverAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
                        clientChannelManager.invalidateObject(serverAddress, ctx.channel());
                    } catch (Exception exx) {
                        LOGGER.error(exx.getMessage());
                    } finally {
                        clientChannelManager.releaseChannel(ctx.channel(), getAddressFromContext(ctx));
                    }
                }
                if (idleStateEvent == IdleStateEvent.WRITER_IDLE_STATE_EVENT) {
                    try {
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("will send ping msg,channel {}", ctx.channel());
                        }
                        AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(), HeartbeatMessage.PING);
                    } catch (Throwable throwable) {
                        LOGGER.error("send request error: {}", throwable.getMessage(), throwable);
                    }
                }
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            LOGGER.error(FrameworkErrorCode.ExceptionCaught.getErrCode(),
                NetUtil.toStringAddress(ctx.channel().remoteAddress()) + "connect exception. " + cause.getMessage(), cause);
            clientChannelManager.releaseChannel(ctx.channel(), getAddressFromChannel(ctx.channel()));
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("remove exception rm channel:{}", ctx.channel());
            }
            super.exceptionCaught(ctx, cause);
        }

        @Override
        public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(ctx + " will closed");
            }
            super.close(ctx, future);
        }
    }
}
//单条记录发送
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
        if (timeoutMillis <= 0) {
            throw new FrameworkException("timeout should more than 0ms");
        }
        if (channel == null) {
            LOGGER.warn("sendSync nothing, caused by null channel.");
            return null;
        }

        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        //写入缓存futures
        futures.put(rpcMessage.getId(), messageFuture);

        channelWritableCheck(channel, rpcMessage.getBody());

        //message数据写入netty channel 异步发送 注册回调listener
        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            //异步回调失败处理
            if (!future.isSuccess()) {
                //从futures结果集中移除
                MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
                if (messageFuture1 != null) {
                    //记录失败原因
                    messageFuture1.setResultMessage(future.cause());
                }
                //销毁关闭 channel
                destroyChannel(future.channel());
            }
        });

        try {
            //等待获取回调传回的
            return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception exx) {
            LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),
                rpcMessage.getBody());
            if (exx instanceof TimeoutException) {
                throw (TimeoutException) exx;
            } else {
                throw new RuntimeException(exx);
            }
        }
    }

从上文源码可以看到根据EnableClientBatchSendRequest配置,由client决定是否将数据合并发送,如果该设置会加大seata TM的整体吞吐量,但会损失响应时长。关于这个配置可感觉实际业务场景设定,如果是大量的分布式事务场景,可以设置为true(默认也为true),若是少量可以设置为false,加快响应时间。不论单条还是merge消息聚合发送,本质是通过netty异步发送message,注册对于的listener回调监听,注册ChannelHandlers(ClientHandler继承ChannelInboundHandlerAdapter,在client接受response时执行),ClientHandler中获取response的body,通过body消息类型获取初始化绑定消息解析器(默认为ClientOnResponseProcessor,对于解析器的绑定动作在TmNettyRemotingClient或RmNettyRemotingClient的初始化init中执行)。而在ClientOnResponseProcessor中处理消息后将数据写入到MessageFuture的setResult中,这个MessageFuture中包含CompletableFuture类(对于MessageFuture提供的get或者setResult本质都是对CompletableFuture进行操作,MessageFuture算是对CompletableFuture的包装类吧)。在发送线程发送message后,线程通过MessageFuture提供的get的进行阻塞等待异步回调结果,只有当ClientOnResponseProcessor中对于的message有消息到达即setResult被执行,发送线程才能获取最终值返回值。对于此处的方式就是一个netty异步发送的具体实现。对于此处功能详细的描述,可自行参考源码。 

class NettyClientChannelManager {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientChannelManager.class);

    //每一个channel对应的lock 保证每一个TC server对只会有一个channel 被创建
    private final ConcurrentMap<String, Object> channelLocks = new ConcurrentHashMap<>();

    //TC-server地址(ip+port)为key  与NettyPoolKey(reg信息)关系
    private final ConcurrentMap<String, NettyPoolKey> poolKeyMap = new ConcurrentHashMap<>();

    //缓存所有channel TC-server地址(ip+port)为key channel为value RM 或TM client与TC集群的任意节点只会缓存一个channel
    private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();

    //管理neety clinet key(reg信息)与channel 关系
    private final GenericKeyedObjectPool<NettyPoolKey, Channel> nettyClientKeyPool;

    //Tm 或RM中getPoolKeyFunction函数(reg信息) 映射 serverAddress 与NettyPoolKey关系
    private Function<String, NettyPoolKey> poolKeyFunction;
    
    NettyClientChannelManager(final NettyPoolableFactory keyPoolableFactory, final Function<String, NettyPoolKey> poolKeyFunction,
                                     final NettyClientConfig clientConfig) {
        //初始化 chnnel连接缓存pool 使用apache中连接pool
        nettyClientKeyPool = new GenericKeyedObjectPool<>(keyPoolableFactory);
        nettyClientKeyPool.setConfig(getNettyPoolConfig(clientConfig));
        this.poolKeyFunction = poolKeyFunction;
    }

    /**
     * 本地配置转化为GenericKeyedObjectPool 中连接pool设置
     * @param clientConfig
     * @return
     */
    private GenericKeyedObjectPool.Config getNettyPoolConfig(final NettyClientConfig clientConfig) {
        //设置chnnel pool
        GenericKeyedObjectPool.Config poolConfig = new GenericKeyedObjectPool.Config();
        //最大存活数
        poolConfig.maxActive = clientConfig.getMaxPoolActive();
        //最小空闲数
        poolConfig.minIdle = clientConfig.getMinPoolIdle();
        //最大等待连接时间
        poolConfig.maxWait = clientConfig.getMaxAcquireConnMills();
        //测试
        poolConfig.testOnBorrow = clientConfig.isPoolTestBorrow();

        poolConfig.testOnReturn = clientConfig.isPoolTestReturn();
        poolConfig.lifo = clientConfig.isPoolLifo();
        return poolConfig;
    }
    
    /**
     * 获取在当前Rpc客户端上注册的所有通道
     *
     * @return channels
     */
    ConcurrentMap<String, Channel> getChannels() {
        return channels;
    }
    
    /**
     * 获与TC的netty客户端channel
     *
     * @param serverAddress server address
     * @return netty channel
     */
    Channel acquireChannel(String serverAddress) {
        Channel channelToServer = channels.get(serverAddress);
        //已经存在
        if (channelToServer != null) {
            //获取alive的channel
            channelToServer = getExistAliveChannel(channelToServer, serverAddress);
            if (channelToServer != null) {
                return channelToServer;
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("will connect to " + serverAddress);
        }
        /*不存在channel 需要创建一个新的*/
        //创建一个chanel对应lock
        channelLocks.putIfAbsent(serverAddress, new Object());
        //加锁保证只会创建一个channel
        synchronized (channelLocks.get(serverAddress)) {
           //创建一个channel
            return doConnect(serverAddress);
        }
    }
    
    /**
     * Release channel to pool if necessary.
     * 从pool中释放channel
     * @param channel channel
     * @param serverAddress server address
     */
    void releaseChannel(Channel channel, String serverAddress) {
        if (channel == null || serverAddress == null) { return; }
        try {
            //加锁
            synchronized (channelLocks.get(serverAddress)) {
                Channel ch = channels.get(serverAddress);
                if (ch == null) {
                    nettyClientKeyPool.returnObject(poolKeyMap.get(serverAddress), channel);
                    return;
                }
                if (ch.compareTo(channel) == 0) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("return to pool, rm channel:{}", channel);
                    }
                    destroyChannel(serverAddress, channel);
                } else {
                    nettyClientKeyPool.returnObject(poolKeyMap.get(serverAddress), channel);
                }
            }
        } catch (Exception exx) {
            LOGGER.error(exx.getMessage());
        }
    }
    
    /**
     * 销毁 channel.
     *
     * @param serverAddress server address
     * @param channel channel
     */
    void destroyChannel(String serverAddress, Channel channel) {
        if (channel == null) { return; }
        try {
            //从缓存中移除
            if (channel.equals(channels.get(serverAddress))) {
                channels.remove(serverAddress);
            }

            nettyClientKeyPool.returnObject(poolKeyMap.get(serverAddress), channel);
        } catch (Exception exx) {
            LOGGER.error("return channel to rmPool error:{}", exx.getMessage());
        }
    }
    
    /**
     * Reconnect to remote server of current transaction service group.
     * 从新连接远程远程服务
     * @param transactionServiceGroup transaction service group
     */
    void reconnect(String transactionServiceGroup) {
        List<String> availList = null;
        try {
            //从注册中心获取有效服务地址
            availList = getAvailServerList(transactionServiceGroup);
        } catch (Exception e) {
            LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
            return;
        }
        if (CollectionUtils.isEmpty(availList)) {
            //error 日志打印
            String serviceGroup = RegistryFactory.getInstance()
                                                 .getServiceGroup(transactionServiceGroup);
            LOGGER.error("no available service '{}' found, please make sure registry config correct", serviceGroup);
            return;
        }
        for (String serverAddress : availList) {
            try {
                //重新构建channel
                acquireChannel(serverAddress);
            } catch (Exception e) {
                LOGGER.error("{} can not connect to {} cause:{}",FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);
            }
        }
    }

    /**
     * 从pool中作废channel
     * @param serverAddress tc地址
     * @param channel
     * @throws Exception
     */
    void invalidateObject(final String serverAddress, final Channel channel) throws Exception {
        nettyClientKeyPool.invalidateObject(poolKeyMap.get(serverAddress), channel);
    }

    /**
     * 注册一个channel到本地缓存
     * @param serverAddress tc地址
     * @param channel
     */
    void registerChannel(final String serverAddress, final Channel channel) {
        //判断channel有效写入缓存
        if (channels.get(serverAddress) != null && channels.get(serverAddress).isActive()) {
            return;
        }
        channels.put(serverAddress, channel);
    }

    /**
     * 创建一个新的channel
     * @param serverAddress
     * @return
     */
    private Channel doConnect(String serverAddress) {
        //再次校验
        Channel channelToServer = channels.get(serverAddress);
        if (channelToServer != null && channelToServer.isActive()) {
            return channelToServer;
        }
        Channel channelFromPool;
        try {
            //获取reg NettyPoolKey
            NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
            //写入缓存
            NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
            //校验是否为RM reg request请求 若是需要写入resourceIds
            if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
                RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
                ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
            }
            //写入pool中
            channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
            channels.put(serverAddress, channelFromPool);
        } catch (Exception exx) {
            LOGGER.error("{} register RM failed.",FrameworkErrorCode.RegisterRM.getErrCode(), exx);
            throw new FrameworkException("can not register RM,err:" + exx.getMessage());
        }
        return channelFromPool;
    }
    
    private List<String> getAvailServerList(String transactionServiceGroup) throws Exception {
        //根据服务名称从注册中心中获取有效的服务信息列表
        List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance()
                                                                            .lookup(transactionServiceGroup);
        if (CollectionUtils.isEmpty(availInetSocketAddressList)) {
            return Collections.emptyList();
        }
        //转化数据格式
        return availInetSocketAddressList.stream()
                                         .map(NetUtil::toStringAddress)
                                         .collect(Collectors.toList());
    }
    
    private Channel getExistAliveChannel(Channel rmChannel, String serverAddress) {
        if (rmChannel.isActive()) {
            return rmChannel;
        } else {
            int i = 0;
            //重新校验channel 是否alive(默认300次-共3s)
            for (; i < NettyClientConfig.getMaxCheckAliveRetry(); i++) {
                try {
                    //等待10ms
                    Thread.sleep(NettyClientConfig.getCheckAliveInternal());
                } catch (InterruptedException exx) {
                    LOGGER.error(exx.getMessage());
                }
                //重新校验
                rmChannel = channels.get(serverAddress);
                if (rmChannel != null && rmChannel.isActive()) {
                    return rmChannel;
                }
            }
            //警告 移除无效channel
            if (i == NettyClientConfig.getMaxCheckAliveRetry()) {
                LOGGER.warn("channel {} is not active after long wait, close it.", rmChannel);
                releaseChannel(rmChannel, serverAddress);
                return null;
            }
        }
        return null;
    }
}

管理与TC之间的channel的NettyClientChannelManager类

NettyPoolableFactory
public class NettyPoolableFactory implements KeyedPoolableObjectFactory<NettyPoolKey, Channel> {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyPoolableFactory.class);

    private final AbstractNettyRemotingClient rpcRemotingClient;

    private final NettyClientBootstrap clientBootstrap;

    /**
     * Instantiates a new Netty key poolable factory.
     *
     * @param rpcRemotingClient the rpc remoting client
     */
    public NettyPoolableFactory(AbstractNettyRemotingClient rpcRemotingClient, NettyClientBootstrap clientBootstrap) {
        this.rpcRemotingClient = rpcRemotingClient;
        this.clientBootstrap = clientBootstrap;
    }

    //构建一个新的channel
    @Override
    public Channel makeObject(NettyPoolKey key) {
        InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("NettyPool create channel to " + key);
        }
        Channel tmpChannel = clientBootstrap.getNewChannel(address);
        long start = System.currentTimeMillis();
        Object response;
        Channel channelToServer = null;
        if (key.getMessage() == null) {
            throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name());
        }
        try {
            response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage());

            if (!isRegisterSuccess(response, key.getTransactionRole())) {
                rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
            } else {
                channelToServer = tmpChannel;
                rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage());
            }
        } catch (Exception exx) {
            if (tmpChannel != null) {
                tmpChannel.close();
            }
            throw new FrameworkException(
                "register " + key.getTransactionRole().name() + " error, errMsg:" + exx.getMessage());
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("register success, cost " + (System.currentTimeMillis() - start) + " ms, version:" + getVersion(
                response, key.getTransactionRole()) + ",role:" + key.getTransactionRole().name() + ",channel:"
                + channelToServer);
        }
        return channelToServer;
    }

    //判断是否channel 是否reg成功
    private boolean isRegisterSuccess(Object response, NettyPoolKey.TransactionRole transactionRole) {
        if (response == null) {
            return false;
        }
        if (transactionRole.equals(NettyPoolKey.TransactionRole.TMROLE)) {
            if (!(response instanceof RegisterTMResponse)) {
                return false;
            }
            RegisterTMResponse registerTMResponse = (RegisterTMResponse)response;
            return registerTMResponse.isIdentified();
        } else if (transactionRole.equals(NettyPoolKey.TransactionRole.RMROLE)) {
            if (!(response instanceof RegisterRMResponse)) {
                return false;
            }
            RegisterRMResponse registerRMResponse = (RegisterRMResponse)response;
            return registerRMResponse.isIdentified();
        }
        return false;
    }

    private String getVersion(Object response, NettyPoolKey.TransactionRole transactionRole) {
        if (transactionRole.equals(NettyPoolKey.TransactionRole.TMROLE)) {
            return ((RegisterTMResponse) response).getVersion();
        } else {
            return ((RegisterRMResponse) response).getVersion();
        }
    }

    @Override
    public void destroyObject(NettyPoolKey key, Channel channel) throws Exception {
        if (channel != null) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("will destroy channel:" + channel);
            }
            channel.disconnect();
            channel.close();
        }
    }

    @Override
    public boolean validateObject(NettyPoolKey key, Channel obj) {
        if (obj != null && obj.isActive()) {
            return true;
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("channel valid false,channel:" + obj);
        }
        return false;
    }

    @Override
    public void activateObject(NettyPoolKey key, Channel obj) throws Exception {

    }

    @Override
    public void passivateObject(NettyPoolKey key, Channel obj) throws Exception {

    }
}

上述描述对channel管理,包含创建,重连,移除等动作

2.0:异常处理

在GlobalTransactionalInterceptor中描述handleGlobalTransaction()的正常流程,当transactionalTemplate.execute()发生了异常情况,根据同步的异常类型,seata有着不同的处理方式。处理异常类由FailureHandler接口体现,如下所示:

public interface FailureHandler {

    /**
     * On begin failure.
     *
     */
    void onBeginFailure(GlobalTransaction tx, Throwable cause);

    /**
     * On commit failure.
     *
     */
    void onCommitFailure(GlobalTransaction tx, Throwable cause);

    /**
     * On rollback failure.
     *
     */
    void onRollbackFailure(GlobalTransaction tx, Throwable originalException);

    /**
     * On rollback retrying
     *
     */
    void onRollbackRetrying(GlobalTransaction tx, Throwable originalException);
}

从上述源码可以看出,FailureHandler中封装了TM与TC交互中基本所有异常的异常处理流程,它的默认实现DefaultFailureHandlerImpl,在GlobalTransactionalInterceptor初始化时被指定,如下所示:

public class DefaultFailureHandlerImpl implements FailureHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultFailureHandlerImpl.class);

    /**
     * 重试最大时间默认1个小时 每次重试间隔时间10s 360次 共一个小时
     */
    private static final int RETRY_MAX_TIMES = 6 * 60;

    //计划间隔秒数 默认10s
    private static final long SCHEDULE_INTERVAL_SECONDS = 10;

    private static final long TICK_DURATION = 1;

    private static final int TICKS_PER_WHEEL = 8;

    //timer 定时时间轮用于定时检测TC中对于事务状态 
    private HashedWheelTimer timer = new HashedWheelTimer(
        new NamedThreadFactory("failedTransactionRetry", 1),
        TICK_DURATION, TimeUnit.SECONDS, TICKS_PER_WHEEL);

    @Override
    public void onBeginFailure(GlobalTransaction tx, Throwable cause) {
        //在begin阶段 无任何业务逻辑执行 无需重试
        LOGGER.warn("Failed to begin transaction. ", cause);
    }

    @Override
    public void onCommitFailure(GlobalTransaction tx, Throwable cause) {
        //在全局提交阶段 该阶段发生任何异常 不断测试TC中全局事务状态
        LOGGER.warn("Failed to commit transaction[" + tx.getXid() + "]", cause);
        timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.Committed), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    @Override
    public void onRollbackFailure(GlobalTransaction tx, Throwable originalException) {
        //在全局回滚阶段 该阶段发生任何异常 不断测试TC中全局事务状态
        LOGGER.warn("Failed to rollback transaction[" + tx.getXid() + "]", originalException);
        //定时器每隔10s进行Rollbacked状态检测
        timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.Rollbacked), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    @Override
    public void onRollbackRetrying(GlobalTransaction tx, Throwable originalException) {
        
        StackTraceLogger.warn(LOGGER, originalException, "Retrying to rollback transaction[{}]", new String[] {tx.getXid()});
        timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.RollbackRetrying), SCHEDULE_INTERVAL_SECONDS,
            TimeUnit.SECONDS);
    }

    /**
     * 异常重试
     */
    protected class CheckTimerTask implements TimerTask {

        private final GlobalTransaction tx;

        //确认状态
        private final GlobalStatus required;

        //记录重试次数
        private int count = 0;

        //重试标识 直到重试成功
        private boolean isStopped = false;

        protected CheckTimerTask(final GlobalTransaction tx, GlobalStatus required) {
            this.tx = tx;
            this.required = required;
        }

        @Override
        public void run(Timeout timeout) throws Exception {
            if (!isStopped) {
                //
                if (++count > RETRY_MAX_TIMES) {
                    //超过次数重新再来
                    LOGGER.error("transaction [{}] retry fetch status times exceed the limit [{} times]", tx.getXid(), RETRY_MAX_TIMES);
                    return;
                }
                //通过查询当前事务在TC中状态
                isStopped = shouldStop(tx, required);
                //不断重试
                timer.newTimeout(this, SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
            }
        }
    }

    private boolean shouldStop(final GlobalTransaction tx, GlobalStatus required) {
        try {
            //获取tc中该事务当前状态
            GlobalStatus status = tx.getStatus();
            LOGGER.info("transaction [{}] current status is [{}]", tx.getXid(), status);
            //当前全局事务状态为确认或终态时才能结束
            if (status == required || status == GlobalStatus.Finished) {
                return true;
            }
        } catch (TransactionException e) {
            LOGGER.error("fetch GlobalTransaction status error", e);
        }
        return false;
    }

}

2:seata客户端-RM(基于springcloud项目分析)

上述描述了TM大大致使用流程,在GlobalTransactionScanner初始化时一起被初始化。这里感觉有些服务可能不需要TC而只作为一个分支RM使用,所以这里个人感觉没必要两个都进行初始化,可以根据使用者的选择进行。

2.1:客户端RM client

rm与TM类似,在1.2与1.3版本中对于管理channel类有着不同的命名

public class RMClient {

    /**
     * 初始化
     *
     * @param applicationId           the application id
     * @param transactionServiceGroup the transaction service group
     */
    public static void init(String applicationId, String transactionServiceGroup) {
        //单列 获取RmNettyRemotingClient
        RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        //设置DefaultResourceManager 单例模式 该类使用策略模式 携有BranchType类型对应ResouceManager
        rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
        //设置DefaultRMHandler 单例模式 该类使用策略模式 携有BranchType类型对应RMHandler
        // 针对不同类型client的分布式事务实现具体使用不同策略
        rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
        //初始化
        rmNettyRemotingClient.init();
    }

}

RmNettyRemotingClient与TmNettyRemotingClient都继承AbstractNettyRemotingClient了,所以RmNettyRemotingClient的初始化过程与TmNettyRemotingClient基本差不多,所以这里也不再叙述。

2.1:DefaultRMHandler

携有BranchType类型对应RMHandler 针对不同类型client的分布式事务实现具体使用不同策略

public class DefaultRMHandler extends AbstractRMHandler {

    //记录BranchType(AT XA TCC SAGA) 与其对应4种RMHandler
    protected static Map<BranchType, AbstractRMHandler> allRMHandlersMap
        = new ConcurrentHashMap<BranchType, AbstractRMHandler>();

    protected DefaultRMHandler() {
        initRMHandlers();
    }

    protected void initRMHandlers() {
        List<AbstractRMHandler> allRMHandlers = EnhancedServiceLoader.loadAll(AbstractRMHandler.class);
        if (CollectionUtils.isNotEmpty(allRMHandlers)) {
            for (AbstractRMHandler rmHandler : allRMHandlers) {
                allRMHandlersMap.put(rmHandler.getBranchType(), rmHandler);
            }
        }
    }
    //针对 commit rollbakc undologdel 3种流程的handler方法
    
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        return getRMHandler(request.getBranchType()).handle(request);
    }

    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        return getRMHandler(request.getBranchType()).handle(request);
    }

    @Override
    public void handle(UndoLogDeleteRequest request) {
        getRMHandler(request.getBranchType()).handle(request);
    }

    protected AbstractRMHandler getRMHandler(BranchType branchType) {
        return allRMHandlersMap.get(branchType);
    }

    @Override
    protected ResourceManager getResourceManager() {
        throw new FrameworkException("DefaultRMHandler isn't a real AbstractRMHandler");
    }

    private static class SingletonHolder {
        private static AbstractRMHandler INSTANCE = new DefaultRMHandler();
    }

    /**
     *单例获取DefaultRMHandler
     *
     * @return the resource manager
     */
    public static AbstractRMHandler get() {
        return DefaultRMHandler.SingletonHolder.INSTANCE;
    }

    @Override
    public BranchType getBranchType() {
        throw new FrameworkException("DefaultRMHandler isn't a real AbstractRMHandler");
    }
}

父抽象类 AbstractRMHandler封装具体执行流程,并调用底层ResourceManage执行RM数据层逻辑

public abstract class AbstractRMHandler extends AbstractExceptionHandler
    implements RMInboundHandler, TransactionMessageHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRMHandler.class);

    //模版方法模式
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        //执行异常处理统一模版
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response)
                throws TransactionException {
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }

    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        BranchRollbackResponse response = new BranchRollbackResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
            @Override
            public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
                throws TransactionException {
                doBranchRollback(request, response);
            }
        }, request, response);
        return response;
    }

    /**
     * delete undo log 针对于AT下模式
     * @param request the request
     */
    @Override
    public void handle(UndoLogDeleteRequest request) {
        // https://github.com/seata/seata/issues/2226
    }

    /**
     * Do branch commit.
     *
     * @param request  the request
     * @param response the response
     * @throws TransactionException the transaction exception
     */
    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        }
        //根据不同ResourceManager 提交底层流程
        //获取该分支事务执行状态 后续上报TC

        BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch commit result: " + status);
        }

    }

    /**
     * Do branch rollback.
     *
     * @param request  the request
     * @param response the response
     * @throws TransactionException the transaction exception
     */
    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
        }
        //获取该分支事务执行状态 后续上报TC

        BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacked result: " + status);
        }
    }

    /**
     * get resource manager implement 
     * 对应4种分布式事务branch 模式 AT XA TCC SAGA 
     *
     * @return
     */
    protected abstract ResourceManager getResourceManager();

    @Override
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToRM)) {
            throw new IllegalArgumentException();
        }
        //
        AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
        transactionRequest.setRMInboundMessageHandler(this);

        return transactionRequest.handle(context);
    }

    @Override
    public void onResponse(AbstractResultMessage response, RpcContext context) {
        LOGGER.info("the rm client received response msg [{}] from tc server.", response.toString());
    }

    public abstract BranchType getBranchType();
}

 

2.2:DefaultResourceManager(ResourceManager)

该类的设计策略与DefaultRMHandler一样,也是使用策略设计模式,内部包含了4种(AT,XA,TCC,SAGA)分布式事务分支对应的具体ResourceManager,由handler中调用,具体执行数据层的分支事务执行。

public class DefaultResourceManager implements ResourceManager {

    /**
     * all resource managers
     */
    protected static Map<BranchType, ResourceManager> resourceManagers
        = new ConcurrentHashMap<>();

    private DefaultResourceManager() {
        initResourceManagers();
    }

    /**
     * Get resource manager.
     *
     * @return the resource manager
     */
    public static DefaultResourceManager get() {
        return SingletonHolder.INSTANCE;
    }

    /**
     * only for mock
     *
     * @param branchType
     * @param rm
     */
    public static void mockResourceManager(BranchType branchType, ResourceManager rm) {
        resourceManagers.put(branchType, rm);
    }

    protected void initResourceManagers() {
        //初始化所有ResourceManager 并写入缓存中
        List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
        if (CollectionUtils.isNotEmpty(allResourceManagers)) {
            for (ResourceManager rm : allResourceManagers) {
                resourceManagers.put(rm.getBranchType(), rm);
            }
        }
    }

    //分支 resource commit 本质就是删除undo日志
    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId,
                                     String resourceId, String applicationData)
        throws TransactionException {
        return getResourceManager(branchType).branchCommit(branchType, xid, branchId, resourceId, applicationData);
    }

    @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId,
                                       String resourceId, String applicationData)
        throws TransactionException {
        return getResourceManager(branchType).branchRollback(branchType, xid, branchId, resourceId, applicationData);
    }

    @Override
    public Long branchRegister(BranchType branchType, String resourceId,
                               String clientId, String xid, String applicationData, String lockKeys)
        throws TransactionException {
        return getResourceManager(branchType).branchRegister(branchType, resourceId, clientId, xid, applicationData,
            lockKeys);
    }

    @Override
    public void branchReport(BranchType branchType, String xid, long branchId, BranchStatus status,
                             String applicationData) throws TransactionException {
        getResourceManager(branchType).branchReport(branchType, xid, branchId, status, applicationData);
    }

    @Override
    public boolean lockQuery(BranchType branchType, String resourceId,
                             String xid, String lockKeys) throws TransactionException {
        return getResourceManager(branchType).lockQuery(branchType, resourceId, xid, lockKeys);
    }

    @Override
    public void registerResource(Resource resource) {
        getResourceManager(resource.getBranchType()).registerResource(resource);
    }

    @Override
    public void unregisterResource(Resource resource) {
        getResourceManager(resource.getBranchType()).unregisterResource(resource);
    }

    @Override
    public Map<String, Resource> getManagedResources() {
        Map<String, Resource> allResource = new HashMap<>();
        for (ResourceManager rm : resourceManagers.values()) {
            Map<String, Resource> tempResources = rm.getManagedResources();
            if (tempResources != null) {
                allResource.putAll(tempResources);
            }
        }
        return allResource;
    }

    /**
     * get ResourceManager by Resource Type
     *
     * @param branchType
     * @return
     */
    public ResourceManager getResourceManager(BranchType branchType) {
        ResourceManager rm = resourceManagers.get(branchType);
        if (rm == null) {
            throw new FrameworkException("No ResourceManager for BranchType:" + branchType.name());
        }
        return rm;
    }

    @Override
    public BranchType getBranchType() {
        throw new FrameworkException("DefaultResourceManager isn't a real ResourceManager");
    }

    private static class SingletonHolder {
        private static DefaultResourceManager INSTANCE = new DefaultResourceManager();
    }

}

这里以AT模式为主 介绍其DatasourceManager(它底层通过AsyncWorker进行异步事务执行)

public class DataSourceManager extends AbstractResourceManager implements Initialize {

    private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceManager.class);

    private ResourceManagerInbound asyncWorker;

    private Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();

    /**
     * Sets async worker.
     *
     * @param asyncWorker the async worker
     */
    public void setAsyncWorker(ResourceManagerInbound asyncWorker) {
        this.asyncWorker = asyncWorker;
    }

    /**
     * 锁查询 在拥有GlobalLock判断当前是否存在全局锁
     */
    @Override
    public boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys)
        throws TransactionException {
        try {
            //封装请求
            GlobalLockQueryRequest request = new GlobalLockQueryRequest();
            request.setXid(xid);
            request.setLockKey(lockKeys);
            request.setResourceId(resourceId);

            GlobalLockQueryResponse response = null;
            if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
                //请求TC 或者结果
                response = (GlobalLockQueryResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
            } else {
                throw new RuntimeException("unknow situation!");
            }

            if (response.getResultCode() == ResultCode.Failed) {
                throw new TransactionException(response.getTransactionExceptionCode(),
                    "Response[" + response.getMsg() + "]");
            }
            //是否被锁定
            return response.isLockable();
        } catch (TimeoutException toe) {
            throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
        } catch (RuntimeException rex) {
            throw new RmTransactionException(TransactionExceptionCode.LockableCheckFailed, "Runtime", rex);
        }

    }

    @Deprecated
    @SuppressWarnings("unchecked")
    private String loadBalance() {
        InetSocketAddress address = null;
        try {
            List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().lookup(
                TmNettyRemotingClient.getInstance().getTransactionServiceGroup());
            address = LoadBalanceFactory.getInstance().select(inetSocketAddressList);
        } catch (Exception ignore) {
            LOGGER.error(ignore.getMessage());
        }
        if (address == null) {
            throw new FrameworkException(NoAvailableService);
        }
        return NetUtil.toStringAddress(address);
    }

    /**
     * Init.
     *
     * @param asyncWorker the async worker
     */
    public synchronized void initAsyncWorker(ResourceManagerInbound asyncWorker) {
        setAsyncWorker(asyncWorker);
    }

    /**
     * Instantiates a new Data source manager.
     */
    public DataSourceManager() {
    }

    @Override
    public void init() {
        //创建AsyncWorker并初始化
        AsyncWorker asyncWorker = new AsyncWorker();
        asyncWorker.init();
        initAsyncWorker(asyncWorker);
    }

    //注册Resource代理对象 缓存在本地
    @Override
    public void registerResource(Resource resource) {
        DataSourceProxy dataSourceProxy = (DataSourceProxy) resource;
        //ResourceId 与DataSourceProxy映射关系
        dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy);
        super.registerResource(dataSourceProxy);
    }

    @Override
    public void unregisterResource(Resource resource) {
        throw new NotSupportYetException("unregister a resource");
    }

    /**
     *获取datasource代理对象
     *
     * @param resourceId the resource id
     * @return the data source proxy
     */
    public DataSourceProxy get(String resourceId) {
        return (DataSourceProxy) dataSourceCache.get(resourceId);
    }

    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
    }
    
    //二阶段 分支回滚
    @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        //获取resource下对应DataSource代理对象
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException();
        }
        try {
            //一般undo日志存在需要改变的事务数据源下 执行其undo数据
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            StackTraceLogger.info(LOGGER, te,
                "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
                new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        //返回二阶段回滚
        return BranchStatus.PhaseTwo_Rollbacked;

    }

    @Override
    public Map<String, Resource> getManagedResources() {
        return dataSourceCache;
    }

    @Override
    public BranchType getBranchType() {
        return BranchType.AT;
    }

}
AsyncWorker异步执行分支事务(主要执行RM端的二次提交-删除undo日志):
public class AsyncWorker implements ResourceManagerInbound {

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncWorker.class);

    //默认
    private static final int DEFAULT_RESOURCE_SIZE = 16;

    //避免在高并发下导致存在大量commit请求 一次删除过于庞大 所以定义一个循环下最大的undolog删除数量
    private static final int UNDOLOG_DELETE_LIMIT_SIZE = 1000;


    /**
     * 2阶段Context 包含commit所需要数据
     */
    private static class Phase2Context {

        /**
         * 实例化一个新的2阶段Context
         *
         * @param branchType      the branchType
         * @param xid             the xid
         * @param branchId        the branch id
         * @param resourceId      the resource id
         * @param applicationData the application data
         */
        public Phase2Context(BranchType branchType, String xid, long branchId, String resourceId,
                             String applicationData) {
            this.xid = xid;
            this.branchId = branchId;
            this.resourceId = resourceId;
            this.applicationData = applicationData;
            this.branchType = branchType;
        }

        /**
         * The Xid.
         */
        String xid;
        /**
         * The Branch id.
         */
        long branchId;
        /**
         * The Resource id.
         */
        String resourceId;
        /**
         * The Application data.
         */
        String applicationData;

        /**
         * the branch Type
         */
        BranchType branchType;
    }

    //异步提交 buffer数 默认10000
    private static int ASYNC_COMMIT_BUFFER_LIMIT = ConfigurationFactory.getInstance().getInt(
        CLIENT_ASYNC_COMMIT_BUFFER_LIMIT, DEFAULT_CLIENT_ASYNC_COMMIT_BUFFER_LIMIT);

    //异步提交阻塞队列存储需要提交Phase2Context
    private static final BlockingQueue<Phase2Context> ASYNC_COMMIT_BUFFER = new LinkedBlockingQueue<>(
        ASYNC_COMMIT_BUFFER_LIMIT);

    //分支提交
    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        //新建Phase2Context存储队列
        if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
            LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid);
        }
        //返回中间状态
        return BranchStatus.PhaseTwo_Committed;
    }

    /**
     * Init. 在DataSourceManager中被初始化
     */
    public synchronized void init() {

        LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
        //定义调度器
        ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
        //定时调度 没1s调度一次
        timerExecutor.scheduleAtFixedRate(() -> {
            try {
                //执行分支批量提交
                doBranchCommits();

            } catch (Throwable e) {
                LOGGER.info("Failed at async committing ... {}", e.getMessage());

            }
        }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
    }

    private void doBranchCommits() {
        if (ASYNC_COMMIT_BUFFER.isEmpty()) {
            return;
        }

        //映射上下文 缓存resourceId 下多次commit请求
        Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
        //从buffer队列中获取 上文提交的Phase2Context数据 直到空队列
        while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
            Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
            //存在多数据源 多resourceId
            List<Phase2Context> contextsGroupedByResourceId = mappedContexts.computeIfAbsent(commitContext.resourceId, k -> new ArrayList<>());
            contextsGroupedByResourceId.add(commitContext);
        }
        //单独处理每一个resource 数据源
        for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
            Connection conn = null;
            DataSourceProxy dataSourceProxy;
            try {
                try {
                    //获取resource对应 dataSourceProxy 数据源DataSource代理对象
                    DataSourceManager resourceManager = (DataSourceManager) DefaultResourceManager.get()
                        .getResourceManager(BranchType.AT);
                    dataSourceProxy = resourceManager.get(entry.getKey());
                    if (dataSourceProxy == null) {
                        throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
                    }
                    //获取数据库连接对象
                    conn = dataSourceProxy.getPlainConnection();
                } catch (SQLException sqle) {
                    LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
                    continue;
                }
                //获取需要执行的commit数据
                List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
                //封装 xids 与branchIds
                Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
                Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
                for (Phase2Context commitContext : contextsGroupedByResourceId) {
                    xids.add(commitContext.xid);
                    branchIds.add(commitContext.branchId);
                    int maxSize = Math.max(xids.size(), branchIds.size());
                    //避免一次性批量删除过多数据  所以这里每过1000条执行一次该数据源下UndoLog清理
                    if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
                        try {
                            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(
                                xids, branchIds, conn);
                        } catch (Exception ex) {
                            LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                        }
                        xids.clear();
                        branchIds.clear();
                    }
                }

                if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
                    return;
                }

                try {
                    //说明目前数据存量未达到1000条标准 直接删除undolog
                    UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids,
                        branchIds, conn);
                } catch (Exception ex) {
                    LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                }

                if (!conn.getAutoCommit()) {
                    conn.commit();
                }
            } catch (Throwable e) {
                LOGGER.error(e.getMessage(), e);
                try {
                    //执行失败回滚
                    conn.rollback();
                } catch (SQLException rollbackEx) {
                    LOGGER.warn("Failed to rollback JDBC resource while deleting undo_log ", rollbackEx);
                }
            } finally {
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (SQLException closeEx) {
                        LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                    }
                }
            }
        }
    }

    @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        throw new NotSupportYetException();

    }
}

这里可以看出如果在执行undo log批量删除的时候发送错误,导致数据回滚,从而导致undo日志无法删除。这样存储数据堆积风险。seta是通过TC定时发送undo log删除命令给RM做到这些数据的清除,详细参考RmUndoLogProcessor

 

2.3:处理TC传来的Message 的Processor

 

根据消息类型 (类型可详细参考常量类MessageType )的不同选择使用不同Processor与执行线程,对于这些Processor的注册与TM的描述一致。RmNettyRemotingClient中init中执行。初始化过程如下所示:

 private void registerProcessor() {
        // 1.registry rm client handle branch commit processor
        RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
        // 2.registry rm client handle branch rollback processor
        RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
        // 3.registry rm handler undo log processor
        RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
        // 4.registry TC response processor
        ClientOnResponseProcessor onResponseProcessor =
            new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
        // 5.registry heartbeat message processor
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }

2.3.1:处理commit的RmBranchCommitProcessor

处理分支commit的processor,实际就是接受TM发起的二阶段commit,本质就是删除undo日志

public class RmBranchCommitProcessor implements RemotingProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(RmBranchCommitProcessor.class);

    private TransactionMessageHandler handler;

    private RemotingClient remotingClient;

    public RmBranchCommitProcessor(TransactionMessageHandler handler, RemotingClient remotingClient) {
        this.handler = handler;
        this.remotingClient = remotingClient;
    }

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        //获取TC远程地址
        String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        //获取返回数据
        Object msg = rpcMessage.getBody();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm client handle branch commit process:" + msg);
        }
        //执行分支commit
        handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
    }

    private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
        BranchCommitResponse resultMessage;
        //通过本地branch事务manage执行本地事务
        resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch commit result:" + resultMessage);
        }
        try {
            //异步向TC汇报本地分支事务执行结果
            this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
        } catch (Throwable throwable) {
            LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
        }
    }
}

具体handler执行由上文 DefaultRMHandler执行。拿到最终分支执行事务执行结果上报给TC,由TC决定整体事务流程。

2.3.2:处理rollback的RmBranchRollbackProcessor

处理分支rollback的processor,实际就是接受TM发起的二阶段rollback,本质就是执行undo日志,达到回滚目的

public class RmBranchRollbackProcessor implements RemotingProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(RmBranchRollbackProcessor.class);

    private TransactionMessageHandler handler;

    private RemotingClient remotingClient;

    public RmBranchRollbackProcessor(TransactionMessageHandler handler, RemotingClient remotingClient) {
        this.handler = handler;
        this.remotingClient = remotingClient;
    }

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        //获取TC地址
        String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        Object msg = rpcMessage.getBody();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm handle branch rollback process:" + msg);
        }
        handleBranchRollback(rpcMessage, remoteAddress, (BranchRollbackRequest) msg);
    }

    private void handleBranchRollback(RpcMessage request, String serverAddress, BranchRollbackRequest branchRollbackRequest) {
        BranchRollbackResponse resultMessage;
        //本地执行undo 回滚 底层调用链路 handler-》resourceManager
        resultMessage = (BranchRollbackResponse) handler.onRequest(branchRollbackRequest, null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch rollback result:" + resultMessage);
        }
        try {
            //发送TC分支执行回滚结果
            this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
        } catch (Throwable throwable) {
            LOGGER.error("send response error: {}", throwable.getMessage(), throwable);
        }
    }
}
 

2.3.3:处理undoLog的RmUndoLogProcessor

处理TC发起的undo log删除命令

/**
 * 处理TC undo log delete命令
 * {@link UndoLogDeleteRequest}
 *
 */
public class RmUndoLogProcessor implements RemotingProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(RmUndoLogProcessor.class);

    private TransactionMessageHandler handler;

    public RmUndoLogProcessor(TransactionMessageHandler handler) {
        this.handler = handler;
    }

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        //获取tc发送的UndoLogDeleteRequest数据
        Object msg = rpcMessage.getBody();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm handle undo log process:" + msg);
        }
        //执行undo log删除命令
        handleUndoLogDelete((UndoLogDeleteRequest) msg);
    }

    private void handleUndoLogDelete(UndoLogDeleteRequest undoLogDeleteRequest) {
        try {
            //底层调用链路 直接在RMHandlerAT中执行
            handler.onRequest(undoLogDeleteRequest, null);
        } catch (Exception e) {
            LOGGER.error("Failed to delete undo log by undoLogDeleteRequest on" + undoLogDeleteRequest.getResourceId());
        }
    }
}
RMHandlerAT:处理AT模式下的handler
public class RMHandlerAT extends AbstractRMHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(RMHandlerAT.class);

    private static final int LIMIT_ROWS = 3000;

    /**
     * 处理UndoLogDeleteRequest 请求
     * @param request the request
     */
    @Override
    public void handle(UndoLogDeleteRequest request) {
        //获取需要处理的DataSource 代理对象
        DataSourceManager dataSourceManager = (DataSourceManager)getResourceManager();
        DataSourceProxy dataSourceProxy = dataSourceManager.get(request.getResourceId());
        if (dataSourceProxy == null) {
            LOGGER.warn("Failed to get dataSourceProxy for delete undolog on {}", request.getResourceId());
            return;
        }
        //获取当前时间对应前SaveDays天数(默认7天)
        Date logCreatedSave = getLogCreated(request.getSaveDays());
        Connection conn = null;
        try {
            conn = dataSourceProxy.getPlainConnection();
            //记录被删除rows
            int deleteRows = 0;
            do {
                try {
                    //删除undo表crate时间小于指定天数前3000条数据
                    deleteRows = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType())
                            .deleteUndoLogByLogCreated(logCreatedSave, LIMIT_ROWS, conn);

                    if (deleteRows > 0 && !conn.getAutoCommit()) {
                        //手动commit
                        conn.commit();
                    }
                } catch (SQLException exx) {
                    if (deleteRows > 0 && !conn.getAutoCommit()) {
                        conn.rollback();
                    }
                    throw exx;
                }
                //每次删除3000 直到数据被删除干净
            } while (deleteRows == LIMIT_ROWS);
        } catch (Exception e) {
            LOGGER.error("Failed to delete expired undo_log, error:{}", e.getMessage(), e);
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                }
            }
        }
    }

    //获取删除undo条件 时间
    private Date getLogCreated(int saveDays) {
        if (saveDays <= 0) {
            saveDays = UndoLogDeleteRequest.DEFAULT_SAVE_DAYS;
        }
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.DATE, -saveDays);
        return calendar.getTime();
    }

    /**
     * get AT resource managerDataSourceManager.java
     *
     * @return
     */
    @Override
    protected ResourceManager getResourceManager() {
        return DefaultResourceManager.get().getResourceManager(BranchType.AT);
    }

    @Override
    public BranchType getBranchType() {
        return BranchType.AT;
    }

}

这样就可以保证RM中undo表的体积不会因为异步删除的原因导致体量变大

2.3.4:处理RM的response消息的ClientOnResponseProcessor

与TM中的ClientOnResponseProcessor功能一致

public class ClientOnResponseProcessor implements RemotingProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClientOnResponseProcessor.class);

    /**
     * 缓存message Id 与merge消息之间映射关系 由AbstractNettyRemotingClient存储
     */
    private Map<Integer, MergeMessage> mergeMsgMap;

    /**
     * 缓存每一条message Id(如果是merge中是其中每一条消息) 与MessageFuture映射关系 由AbstractNettyRemoting存储
     */
    private ConcurrentMap<Integer, MessageFuture> futures;

    /**
     * To handle the received RPC message on upper level.
     *
     */
    private TransactionMessageHandler transactionMessageHandler;

    public ClientOnResponseProcessor(Map<Integer, MergeMessage> mergeMsgMap,
                                     ConcurrentHashMap<Integer, MessageFuture> futures,
                                     TransactionMessageHandler transactionMessageHandler) {
        this.mergeMsgMap = mergeMsgMap;
        this.futures = futures;
        this.transactionMessageHandler = transactionMessageHandler;
    }

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        //判断是否是聚合发送消息
        if (rpcMessage.getBody() instanceof MergeResultMessage) {
            //获取结果
            MergeResultMessage results = (MergeResultMessage) rpcMessage.getBody();
            //移除缓存MergeResultMessage集合
            MergedWarpMessage mergeMessage = (MergedWarpMessage) mergeMsgMap.remove(rpcMessage.getId());
            for (int i = 0; i < mergeMessage.msgs.size(); i++) {
                //处理每一条数据 结果写入future中
                int msgId = mergeMessage.msgIds.get(i);
                MessageFuture future = futures.remove(msgId);
                if (future == null) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("msg: {} is not found in futures.", msgId);
                    }
                } else {
                    //写回结果 发起请求方阻塞等待结果
                    future.setResultMessage(results.getMsgs()[i]);
                }
            }
        } else {
            //非聚合单条消息
            MessageFuture messageFuture = futures.remove(rpcMessage.getId());
            if (messageFuture != null) {
                messageFuture.setResultMessage(rpcMessage.getBody());
            } else {
                if (rpcMessage.getBody() instanceof AbstractResultMessage) {
                    if (transactionMessageHandler != null) {
                        transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);
                    }
                }
            }
        }
    }
}

2.3.5:处理RM的heartbeat消息的ClientHeartbeatProcessor

处理TC心跳检测的processor

public class ClientHeartbeatProcessor implements RemotingProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClientHeartbeatProcessor.class);

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        //接受TC PONG请求
        if (rpcMessage.getBody() == HeartbeatMessage.PONG) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("received PONG from {}", ctx.channel().remoteAddress());
            }
        }
    }
}

AT模式下整体执行流程图

 


版权声明:本文为F_Hello_World原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。