seata源码解读

1、GlobalTransactionalInterceptor seata事务管理器。
GlobalTransactionalInterceptor 作为seata事务管理器,他负责全局事务的开启及提交。
他解析 @GlobalTransactional 和 @GlobalLock。我们先分析 @GlobalTransactional。
由@GlobalTransactional 以及代理调用方法MethodInvocation构建TransactionalExecutor事务执行器。
执行器提供了获取事务信息TransactionInfo及继续调用业务逻辑execute方法。

 /**
     * Execute the business logic here.
     *
     * @return What the business logic returns.
     * @throws Throwable Any throwable during executing.
     */
    Object execute() throws Throwable;

    /**
     * transaction conf or other attr
     * @return transaction info
     */
    TransactionInfo getTransactionInfo();

将执行器作为参数给TransactionalTemplate事务执行模板执行,这里将模板方法设计模式变形为模板方法
中需要具体的执行器执行的部分逻辑抽离成为接口,通过参数的形式注入。
具体模板方法为:

     public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. 从执行器获取事务信息
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
	//获取当前上下文中的事务xid,如果没有则表示是一个全新的事务开始,那么就需要构建一个
	//Launcher(发起者)角色的事务。否则就通过xid新建一个事务对象,并且设置参与者为Participant角色
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();

        // 1.2 获取事务传播行为
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                case NOT_SUPPORTED:
		    //不支持事务,那么如果存在事务,就将其挂起。并且执行业务逻辑。
		    //而且在最后finally部分会将挂起的事务进行重新恢复。
		    //注意:如果在此业务中调用了远程请求,那么将不会传递xid。因为挂起将
		    //当前事务的xid进行解绑了。故xid不会被传递到远方。
                    // If transaction is existing, suspend it.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                    }
                    // Execute without transaction and return.
		    //在没有事务的环境中执行业务逻辑
                    return business.execute();
                case REQUIRES_NEW:
		    //需要一个新事物,如果存在,则将旧事物挂起,并且新建一个事务。
		    //这里需要注意一个事实,开启的是一个全新的分布式事务,会有一个新的xid。
		    //此事务和老事务没有任何关系。
                    // If transaction is existing, suspend it, and then begin new transaction.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    }
                    // Continue and execute with new transaction
                    break;
                case SUPPORTS:
		    //如果存在事物则在当前事务环境下面执行,否则就直接执行。
                    // If transaction is not existing, execute without transaction.
                    if (notExistingTransaction(tx)) {
                        return business.execute();
                    }
                    // Continue and execute with new transaction
                    break;
                case REQUIRED:
		    //默认的事务传播行为,如果有事物,就在当前事务下执行,否则新建一个事务执行。
                    // If current transaction is existing, execute with current transaction,
                    // else continue and execute with new transaction.
                    break;
                case NEVER:
		    
                    // If transaction is existing, throw exception.
                    if (existingTransaction(tx)) {
		        //存在事务就报错。
                        throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                    , tx.getXid()));
                    } else {
		        //在非事务环境下面执行
                        // Execute without transaction and return.
                        return business.execute();
                    }
                case MANDATORY:
		    //在当前上下文中必须存在事务,否则报错。
                    // If transaction is not existing, throw exception.
                    if (notExistingTransaction(tx)) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    // Continue and execute with current transaction.
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }

            // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            if (tx == null) {
	        //第一次创建事务,并且上下文中没有xid。也就是作为全局事务的开始。Launcher
                tx = GlobalTransactionContext.createNew();
            }

            // set current tx config to holder
	    //将当前事务信息替换已经存在于当前线程中的事务信息,后续再finally再进行恢复
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
                // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
                //    else do nothing. Of course, the hooks will still be triggered.
                //开始事务,如果是Launcher角色,那么会向tc发送开始事务的信息。否则什么都不做。但是会触发
		//钩子回调TransactionHook。
		beginTransaction(txInfo, tx);

                Object rs;
                try {
                    // Do Your Business
		    //执行实际的业务逻辑
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. The needed business exception to rollback.
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4. everything is fine, commit.
		//没有任何错误的情况下面 提交。
                commitTransaction(tx);

                return rs;
            } finally {
                //5. clear
		//对上面存在于上下文信息中的事务信息进行恢复
                resumeGlobalLockConfig(previousConfig);
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            // If the transaction is suspended, resume it.
            if (suspendedResourcesHolder != null) {
                tx.resume(suspendedResourcesHolder);
            }
        }
    }

对于GlobalTransactionContext.getCurrent()获取事务信息方法实现为:

public static GlobalTransaction getCurrent() {
        String xid = RootContext.getXID();
        if (xid == null) {
            return null;
        }
        return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
}

只有在存在事务的情况下面才有返回值,否则null。而且返回值的事务角色是参与者。而非发起者。

下面分析beginTransaction(txInfo, tx)。
这里的tx就是前面GlobalTransactionContext.getCurrent()或者GlobalTransactionContext.createNew()
构建返回的,每一个拦截都会构建一个新的,他们之间的联系是xid。

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            //触发钩子函数 TransactionHook.beforeBegin();
	    triggerBeforeBegin();
	    //向tc注册
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
	    //触发钩子函数 TransactionHook.afterBegin();
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);

        }
}

下面分析tx.begin(txInfo.getTimeOut(), txInfo.getName());
具体实现在DefaultGlobalTransaction。
这里事务名称的获取方法为:如果 @GlobalTransaction 指定了名称就用指定的,否则用方法+参数进行生成。

private String formatMethod(Method method) {
        StringBuilder sb = new StringBuilder(method.getName()).append("(");

        Class<?>[] params = method.getParameterTypes();
        int in = 0;
        for (Class<?> clazz : params) {
            sb.append(clazz.getName());
            if (++in < params.length) {
                sb.append(", ");
            }
        }
        return sb.append(")").toString();
}

public void begin(int timeout, String name) throws TransactionException {
        if (role != GlobalTransactionRole.Launcher) {
	    //如果角色不是Launcher 发起者则直接返回。啥也不做,之前提过
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
	//自己的xid必须为null检验。
        assertXIDNull();
	//当前线程上下文中的xid必须为null
        String currentXid = RootContext.getXID();
        if (currentXid != null) {
            throw new IllegalStateException("Global transaction already exists," +
                " can't begin a new global transaction, currentXid = " + currentXid);
        }
	//向tc注册并且获取xid
        xid = transactionManager.begin(null, null, name, timeout);
	//绑定xid、标记事务开始
        status = GlobalStatus.Begin;
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }
}

下面分析xid = transactionManager.begin(null, null, name, timeout);
这里使用了DefaultTransactionManager 事务管理器。
具体实现如下:

 @Override
 public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
     throws TransactionException {
 //构建一个全局事务开始请求。
     GlobalBeginRequest request = new GlobalBeginRequest();
     request.setTransactionName(name);
     request.setTimeout(timeout);
 //发起tc远程调用
     GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
     if (response.getResultCode() == ResultCode.Failed) {
         throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
     }
 //得到xid
     return response.getXid();
 }

 private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
     try {
     //由tm客户端发起请求。
         return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
     } catch (TimeoutException toe) {
         throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
     }
 }

接下来看服务端如何处理:
来自于rm/tm的信息都由ServerOnRequestProcessor进行处理。
具体信息分类为

  • RM:
    1. {@link MergedWarpMessage}
    1. {@link BranchRegisterRequest}
    1. {@link BranchReportRequest}
    1. {@link GlobalLockQueryRequest}
  • TM:
    1. {@link MergedWarpMessage}
    1. {@link GlobalBeginRequest} 请求开启事务
    1. {@link GlobalCommitRequest}
    1. {@link GlobalReportRequest}
    1. {@link GlobalRollbackRequest}
    1. {@link GlobalStatusRequest}

最终执行代码由transactionMessageHandler执行具体的请求信息。

 AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
 remotingServer进行请求结果的回送
 remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);

TC的transactionMessageHandler由DefaultCoordinator实现
处理方法为
    @Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
        throws TransactionException {
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
            request.getTransactionName(), request.getTimeout()));
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
                rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
        }
    }
然后委托给DefaultCore进行处理core.begin
    //applicationId null 
    //transactionServiceGroup null
    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
            timeout);
        MDC.put(RootContext.MDC_KEY_XID, session.getXid());
	//加入一个会话生命周期管理器。后很多种实现数据库 file redis,具体和我们选择的store mode有关
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
	//开始会话
        session.begin();

        // transaction start event
	//将事务启动事件发布出去,让订阅者可以得到通知
        eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));
        //返回此次的目的,构建一个xid
        return session.getXid();
    }

构建一个GlobalSession

public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout) {
        this.transactionId = UUIDGenerator.generateUUID();
        this.status = GlobalStatus.Begin;

        this.applicationId = applicationId;
        this.transactionServiceGroup = transactionServiceGroup;
        this.transactionName = transactionName;
        this.timeout = timeout;
        this.xid = XID.generateXID(transactionId);
}

其中xid生成规则为:

 public static String generateXID(long tranId) {
        return ipAddress + ":" + port + ":" + tranId;
}

会话开始:

 @Override
 public void begin() throws TransactionException {
     this.status = GlobalStatus.Begin;
     this.beginTime = System.currentTimeMillis();
     this.active = true;
     for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
         lifecycleListener.onBegin(this);
     }
 }

继续上面的sesseion.begin()

    @Override
public void begin() throws TransactionException {
        this.status = GlobalStatus.Begin;
        this.beginTime = System.currentTimeMillis();
        this.active = true;
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
	    //调用所有的生命周期函数的begin方法
            lifecycleListener.onBegin(this);
        }
}
@Override
    public boolean writeSession(LogOperation logOperation, SessionStorable session) {
        if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
	    //最终执行全局表的插入。将会话插入到表中。
            return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
            return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
            return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
            return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
            return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
            return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else {
            throw new StoreException("Unknown LogOperation:" + logOperation.name());
        }
    }

最终执行如下sql:
insert into global_table (xid, transaction_id, status, application_id, transaction_service_group,
transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified) values
(?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())

到此全局事务算是开启了。
seata的事务也支持spring事务的几种传播行为。

还有一个疑问,at模式下面,如果第一阶段完成后,要求提交或者回滚,如果此时由于某些原因部分参与者无法提交或者回滚。那么会尝试多少次进行通知提交或者回滚。如果一直不成功。会将提交的转换为回滚执行吗?

答案: 如果第一阶段完成,TM向TC申请提交后,TC会按照注册顺序发送提交请求。如果在某个节点处得到了提交失败或者网络错误等等。只要不是PhaseTwo_CommitFailed_Unretryable 错误,则会进入到重试提交任务队列 retryCommitting,不断的进行重试,除非设置了最大重试时间,否则一致重试下去。

回滚亦是如此。

不存在提交状态转换到回滚态。如果分布式事务存在某个分支超时就改为回滚,那么还有一致性可言吗,还叫事务吗? 只可能会取消重试,让业务维护人员自己介入处理

还有一个定时调度器是任务超时调度,但是只有在第一阶段才会触发。

还有一个rm unlod删除的定时调度任务。


版权声明:本文为connection_原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。