1、GlobalTransactionalInterceptor seata事务管理器。
GlobalTransactionalInterceptor 作为seata事务管理器,他负责全局事务的开启及提交。
他解析 @GlobalTransactional 和 @GlobalLock。我们先分析 @GlobalTransactional。
由@GlobalTransactional 以及代理调用方法MethodInvocation构建TransactionalExecutor事务执行器。
执行器提供了获取事务信息TransactionInfo及继续调用业务逻辑execute方法。
/**
* Execute the business logic here.
*
* @return What the business logic returns.
* @throws Throwable Any throwable during executing.
*/
Object execute() throws Throwable;
/**
* transaction conf or other attr
* @return transaction info
*/
TransactionInfo getTransactionInfo();
将执行器作为参数给TransactionalTemplate事务执行模板执行,这里将模板方法设计模式变形为模板方法
中需要具体的执行器执行的部分逻辑抽离成为接口,通过参数的形式注入。
具体模板方法为:
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. 从执行器获取事务信息
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
//获取当前上下文中的事务xid,如果没有则表示是一个全新的事务开始,那么就需要构建一个
//Launcher(发起者)角色的事务。否则就通过xid新建一个事务对象,并且设置参与者为Participant角色
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 获取事务传播行为
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
//不支持事务,那么如果存在事务,就将其挂起。并且执行业务逻辑。
//而且在最后finally部分会将挂起的事务进行重新恢复。
//注意:如果在此业务中调用了远程请求,那么将不会传递xid。因为挂起将
//当前事务的xid进行解绑了。故xid不会被传递到远方。
// If transaction is existing, suspend it.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
}
// Execute without transaction and return.
//在没有事务的环境中执行业务逻辑
return business.execute();
case REQUIRES_NEW:
//需要一个新事物,如果存在,则将旧事物挂起,并且新建一个事务。
//这里需要注意一个事实,开启的是一个全新的分布式事务,会有一个新的xid。
//此事务和老事务没有任何关系。
// If transaction is existing, suspend it, and then begin new transaction.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
// Continue and execute with new transaction
break;
case SUPPORTS:
//如果存在事物则在当前事务环境下面执行,否则就直接执行。
// If transaction is not existing, execute without transaction.
if (notExistingTransaction(tx)) {
return business.execute();
}
// Continue and execute with new transaction
break;
case REQUIRED:
//默认的事务传播行为,如果有事物,就在当前事务下执行,否则新建一个事务执行。
// If current transaction is existing, execute with current transaction,
// else continue and execute with new transaction.
break;
case NEVER:
// If transaction is existing, throw exception.
if (existingTransaction(tx)) {
//存在事务就报错。
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
//在非事务环境下面执行
// Execute without transaction and return.
return business.execute();
}
case MANDATORY:
//在当前上下文中必须存在事务,否则报错。
// If transaction is not existing, throw exception.
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
if (tx == null) {
//第一次创建事务,并且上下文中没有xid。也就是作为全局事务的开始。Launcher
tx = GlobalTransactionContext.createNew();
}
// set current tx config to holder
//将当前事务信息替换已经存在于当前线程中的事务信息,后续再finally再进行恢复
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
//开始事务,如果是Launcher角色,那么会向tc发送开始事务的信息。否则什么都不做。但是会触发
//钩子回调TransactionHook。
beginTransaction(txInfo, tx);
Object rs;
try {
// Do Your Business
//执行实际的业务逻辑
rs = business.execute();
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
//没有任何错误的情况下面 提交。
commitTransaction(tx);
return rs;
} finally {
//5. clear
//对上面存在于上下文信息中的事务信息进行恢复
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}
} finally {
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
对于GlobalTransactionContext.getCurrent()获取事务信息方法实现为:
public static GlobalTransaction getCurrent() {
String xid = RootContext.getXID();
if (xid == null) {
return null;
}
return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
}
只有在存在事务的情况下面才有返回值,否则null。而且返回值的事务角色是参与者。而非发起者。
下面分析beginTransaction(txInfo, tx)。
这里的tx就是前面GlobalTransactionContext.getCurrent()或者GlobalTransactionContext.createNew()
构建返回的,每一个拦截都会构建一个新的,他们之间的联系是xid。
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
//触发钩子函数 TransactionHook.beforeBegin();
triggerBeforeBegin();
//向tc注册
tx.begin(txInfo.getTimeOut(), txInfo.getName());
//触发钩子函数 TransactionHook.afterBegin();
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
}
下面分析tx.begin(txInfo.getTimeOut(), txInfo.getName());
具体实现在DefaultGlobalTransaction。
这里事务名称的获取方法为:如果 @GlobalTransaction 指定了名称就用指定的,否则用方法+参数进行生成。
private String formatMethod(Method method) {
StringBuilder sb = new StringBuilder(method.getName()).append("(");
Class<?>[] params = method.getParameterTypes();
int in = 0;
for (Class<?> clazz : params) {
sb.append(clazz.getName());
if (++in < params.length) {
sb.append(", ");
}
}
return sb.append(")").toString();
}
public void begin(int timeout, String name) throws TransactionException {
if (role != GlobalTransactionRole.Launcher) {
//如果角色不是Launcher 发起者则直接返回。啥也不做,之前提过
assertXIDNotNull();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
}
return;
}
//自己的xid必须为null检验。
assertXIDNull();
//当前线程上下文中的xid必须为null
String currentXid = RootContext.getXID();
if (currentXid != null) {
throw new IllegalStateException("Global transaction already exists," +
" can't begin a new global transaction, currentXid = " + currentXid);
}
//向tc注册并且获取xid
xid = transactionManager.begin(null, null, name, timeout);
//绑定xid、标记事务开始
status = GlobalStatus.Begin;
RootContext.bind(xid);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [{}]", xid);
}
}
下面分析xid = transactionManager.begin(null, null, name, timeout);
这里使用了DefaultTransactionManager 事务管理器。
具体实现如下:
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
//构建一个全局事务开始请求。
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
//发起tc远程调用
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
//得到xid
return response.getXid();
}
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
//由tm客户端发起请求。
return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
} catch (TimeoutException toe) {
throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
}
}
接下来看服务端如何处理:
来自于rm/tm的信息都由ServerOnRequestProcessor进行处理。
具体信息分类为
- RM:
- {@link MergedWarpMessage}
- {@link BranchRegisterRequest}
- {@link BranchReportRequest}
- {@link GlobalLockQueryRequest}
- TM:
- {@link MergedWarpMessage}
- {@link GlobalBeginRequest} 请求开启事务
- {@link GlobalCommitRequest}
- {@link GlobalReportRequest}
- {@link GlobalRollbackRequest}
- {@link GlobalStatusRequest}
最终执行代码由transactionMessageHandler执行具体的请求信息。
AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
remotingServer进行请求结果的回送
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
TC的transactionMessageHandler由DefaultCoordinator实现
处理方法为
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
}
}
然后委托给DefaultCore进行处理core.begin
//applicationId null
//transactionServiceGroup null
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
timeout);
MDC.put(RootContext.MDC_KEY_XID, session.getXid());
//加入一个会话生命周期管理器。后很多种实现数据库 file redis,具体和我们选择的store mode有关
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
//开始会话
session.begin();
// transaction start event
//将事务启动事件发布出去,让订阅者可以得到通知
eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));
//返回此次的目的,构建一个xid
return session.getXid();
}
构建一个GlobalSession
public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout) {
this.transactionId = UUIDGenerator.generateUUID();
this.status = GlobalStatus.Begin;
this.applicationId = applicationId;
this.transactionServiceGroup = transactionServiceGroup;
this.transactionName = transactionName;
this.timeout = timeout;
this.xid = XID.generateXID(transactionId);
}
其中xid生成规则为:
public static String generateXID(long tranId) {
return ipAddress + ":" + port + ":" + tranId;
}
会话开始:
@Override
public void begin() throws TransactionException {
this.status = GlobalStatus.Begin;
this.beginTime = System.currentTimeMillis();
this.active = true;
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onBegin(this);
}
}
继续上面的sesseion.begin()
@Override
public void begin() throws TransactionException {
this.status = GlobalStatus.Begin;
this.beginTime = System.currentTimeMillis();
this.active = true;
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
//调用所有的生命周期函数的begin方法
lifecycleListener.onBegin(this);
}
}
@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
//最终执行全局表的插入。将会话插入到表中。
return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else {
throw new StoreException("Unknown LogOperation:" + logOperation.name());
}
}
最终执行如下sql:
insert into global_table (xid, transaction_id, status, application_id, transaction_service_group,
transaction_name, timeout, begin_time, application_data, gmt_create, gmt_modified) values
(?, ?, ?, ?, ?, ?, ?, ?, ?, now(), now())
到此全局事务算是开启了。
seata的事务也支持spring事务的几种传播行为。
还有一个疑问,at模式下面,如果第一阶段完成后,要求提交或者回滚,如果此时由于某些原因部分参与者无法提交或者回滚。那么会尝试多少次进行通知提交或者回滚。如果一直不成功。会将提交的转换为回滚执行吗?
答案: 如果第一阶段完成,TM向TC申请提交后,TC会按照注册顺序发送提交请求。如果在某个节点处得到了提交失败或者网络错误等等。只要不是PhaseTwo_CommitFailed_Unretryable 错误,则会进入到重试提交任务队列 retryCommitting,不断的进行重试,除非设置了最大重试时间,否则一致重试下去。
回滚亦是如此。
不存在提交状态转换到回滚态。如果分布式事务存在某个分支超时就改为回滚,那么还有一致性可言吗,还叫事务吗? 只可能会取消重试,让业务维护人员自己介入处理
还有一个定时调度器是任务超时调度,但是只有在第一阶段才会触发。
还有一个rm unlod删除的定时调度任务。