序言:
关于阿里开源的分布式事务框架seata,详细介绍可以参考官网,这里不再赘述。本章只介绍seata中AT模式的源码分析(需要读者对seata有一定了解,或者成功运行过demo)。
seata中一个全局事务的开启由TM角色来完成。在事务发起方,我们可以在执行方法上添加@GlobalTransactional注解来标示启用全局事务,并通过注解属性对该全局事务进行一些自定义设置,如下所示:
/**
 * Declares a global transaction and carries its per-transaction settings:
 * timeout, name, rollback / no-rollback rules and propagation behavior.
 */
public @interface GlobalTransactional {
/**
 * Timeout applied to execution under this global transaction, in milliseconds.
 *
 * @return timeoutMills in MILLISECONDS; defaults to {@code TransactionInfo.DEFAULT_TIME_OUT}.
 */
int timeoutMills() default TransactionInfo.DEFAULT_TIME_OUT;
/**
 * Name of the global transaction instance.
 *
 * @return Given name; empty by default.
 */
String name() default "";
/**
 * Exception classes whose occurrence requires a rollback.
 *
 * @return the exception classes to roll back for; empty by default.
 */
Class<? extends Throwable>[] rollbackFor() default {};
/**
 * Exception class names whose occurrence requires a rollback.
 *
 * @return the exception class names to roll back for; empty by default.
 */
String[] rollbackForClassName() default {};
/**
 * Exception classes whose occurrence must NOT cause a rollback.
 *
 * @return the exception classes excluded from rollback; empty by default.
 */
Class<? extends Throwable>[] noRollbackFor() default {};
/**
 * Exception class names whose occurrence must NOT cause a rollback.
 *
 * @return the exception class names excluded from rollback; empty by default.
 */
String[] noRollbackForClassName() default {};
/**
 * Transaction propagation level.
 *
 * @return the propagation level; defaults to {@code Propagation.REQUIRED}.
 */
Propagation propagation() default Propagation.REQUIRED;
}
1:seata客户端-TM(基于springcloud项目分析)
1.0:GlobalTransactionScanner
使用过spring的@Transactional事务的读者都知道,它通过动态代理的方式,将事务的创建、提交或回滚这些公共的动作封装到一套执行模版中。很多开源框架都是以这种方式构建的,例如mybatis中的各执行注解(@Update、@Select等)、Springcloud中的Feign调用等。通过idea我们可以查看@GlobalTransactional注解在什么地方被使用到,如下所示:
如果对springboot的一些开源starter(例如mybatis中的MapperScannerRegistrar等)项目有过源码走读的经验,从GlobalTransactionScanner的名字就可以看出,该类负责扫描@GlobalTransactional注解并为其所在的方法构建代理(毕竟@GlobalTransactional作用在方法上)。继续通过idea的Find Usages功能寻找GlobalTransactionScanner的引用,发现seata的spring starter项目中的SeataAutoConfiguration对其进行了初始化。
我们把镜头回到GlobalTransactionScanner中(SeataAutoConfiguration的其它作用后续再描述)。GlobalTransactionScanner继承了AbstractAutoProxyCreator,并实现了InitializingBean(bean初始化完成后执行)、ApplicationContextAware(获取ApplicationContext对象)、DisposableBean(bean被销毁时执行),分别对应spring中bean的不同生命周期阶段。如下所示是spring bean初始化完成后执行的逻辑:
这里会存在一个疑问,为何要开启RM与TM两个client,如果对于某一个服务它在分布式事务链路中只是作为一个分支即RM的角色而非TM,那么对于这TM的启动是否没有存在必要,毕竟需要开启TM与TC之间的连接通道,也是一个资源的浪费。
1.1:客户端TM client
在1.2与1.3对于TM与TC之间连接的有关的管理类有着不同的命名
1.2的时候命名为TmRpcClient
对于1.3的时候改命名为TmNettyRemotingClient如下所示:
其实上述两个版本的核心都是以Netty作为服务之间远程网络通信的基础架构,所以1.3改名为TmNettyRemotingClient,更直观地表达了底层实现原理。后续都以1.3最新版进行讲解。
1.1.1:TmNettyRemotingClient(核心类,TM远程调用client)
/**
 * Remoting client for the TM role, built on {@code AbstractNettyRemotingClient}.
 * One shared instance is created lazily by {@link #getInstance()} and dropped
 * again by {@link #destroy()}.
 */
public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(TmNettyRemotingClient.class);

    /** Shared instance; {@code null} until first requested and after {@link #destroy()}. */
    private static volatile TmNettyRemotingClient instance;

    /** Keep-alive value handed to the message executor (used there with {@code TimeUnit.SECONDS}). */
    private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;

    /** Capacity of the message executor's waiting queue. */
    private static final int MAX_QUEUE_SIZE = 2000;

    /** Set once {@code super.init()} has been run; reset by {@link #destroy()}. */
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    /** Application id sent in the TM registration request. */
    private String applicationId;

    /** Transaction service group sent in the TM registration request. */
    private String transactionServiceGroup;

    private TmNettyRemotingClient(NettyClientConfig nettyClientConfig,
                                  EventExecutorGroup eventExecutorGroup,
                                  ThreadPoolExecutor messageExecutor) {
        super(nettyClientConfig, eventExecutorGroup, messageExecutor, NettyPoolKey.TransactionRole.TMROLE);
    }

    /**
     * Registers the response processors, then runs the parent initialization
     * unless it has already been run for this instance.
     */
    @Override
    public void init() {
        // Processors are (re-)registered on every call, before the one-shot guard.
        registerProcessor();
        if (!initialized.compareAndSet(false, true)) {
            return;
        }
        super.init();
    }

    /**
     * Returns the shared client after stamping it with the given identifiers.
     *
     * @param applicationId           the application id
     * @param transactionServiceGroup the transaction service group
     * @return the shared instance
     */
    public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {
        TmNettyRemotingClient client = getInstance();
        client.setApplicationId(applicationId);
        client.setTransactionServiceGroup(transactionServiceGroup);
        return client;
    }

    /**
     * Returns the shared client, creating it on first use while holding the
     * class lock.
     *
     * @return the shared instance
     */
    public static TmNettyRemotingClient getInstance() {
        if (instance != null) {
            return instance;
        }
        synchronized (TmNettyRemotingClient.class) {
            if (instance == null) {
                NettyClientConfig config = new NettyClientConfig();
                // Fixed-size pool (core == max) with a bounded queue for message handling.
                final ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    config.getClientWorkerThreads(),
                    config.getClientWorkerThreads(),
                    KEEP_ALIVE_TIME,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
                    new NamedThreadFactory(config.getTmDispatchThreadPrefix(), config.getClientWorkerThreads()),
                    RejectedPolicies.runsOldestTaskPolicy());
                instance = new TmNettyRemotingClient(config, null, executor);
            }
        }
        return instance;
    }

    /**
     * Sets application id.
     *
     * @param applicationId the application id
     */
    public void setApplicationId(String applicationId) {
        this.applicationId = applicationId;
    }

    /**
     * Sets transaction service group.
     *
     * @param transactionServiceGroup the transaction service group
     */
    public void setTransactionServiceGroup(String transactionServiceGroup) {
        this.transactionServiceGroup = transactionServiceGroup;
    }

    @Override
    public String getTransactionServiceGroup() {
        return transactionServiceGroup;
    }

    /**
     * Callback for a successful TM registration: logs both versions and hands
     * the channel to the channel manager.
     */
    @Override
    public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response,
                                     AbstractMessage requestMessage) {
        RegisterTMRequest request = (RegisterTMRequest)requestMessage;
        RegisterTMResponse reply = (RegisterTMResponse)response;
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("register TM success. client version:{}, server version:{},channel:{}",
                request.getVersion(), reply.getVersion(), channel);
        }
        getClientChannelManager().registerChannel(serverAddress, channel);
    }

    /**
     * Callback for a failed TM registration: always throws a
     * {@code FrameworkException} describing both versions, the server message
     * and the channel.
     */
    @Override
    public void onRegisterMsgFail(String serverAddress, Channel channel, Object response,
                                  AbstractMessage requestMessage) {
        RegisterTMRequest request = (RegisterTMRequest)requestMessage;
        RegisterTMResponse reply = (RegisterTMResponse)response;
        throw new FrameworkException(String.format(
            "register TM failed. client version: %s,server version: %s, errorMsg: %s, channel: %s",
            request.getVersion(), reply.getVersion(), reply.getMsg(), channel));
    }

    /**
     * Tears the client down: parent cleanup first, then the init flag and the
     * shared instance reference are reset.
     */
    @Override
    public void destroy() {
        super.destroy();
        initialized.set(false);
        instance = null;
    }

    @Override
    protected Function<String, NettyPoolKey> getPoolKeyFunction() {
        // Each pool key carries a fresh registration request for the given server address.
        return serverAddress -> new NettyPoolKey(
            NettyPoolKey.TransactionRole.TMROLE,
            serverAddress,
            new RegisterTMRequest(applicationId, transactionServiceGroup));
    }

    /**
     * Registers the processors for the message types handled here: one shared
     * response processor for the result messages, and a heartbeat processor.
     */
    private void registerProcessor() {
        // 1. Response processor shared by all result message types below.
        ClientOnResponseProcessor responseProcessor =
            new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, responseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, responseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, responseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, responseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, responseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, responseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, responseProcessor, null);
        // 2. Heartbeat processor.
        ClientHeartbeatProcessor heartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatProcessor, null);
    }
}
// init() of the parent class (AbstractNettyRemotingClient)
public void init() {
    // Periodic task that asks the channel manager to reconnect the channels of this
    // client's transaction service group; delay and interval come from the
    // SCHEDULE_* constants (milliseconds).
    timerExecutor.scheduleAtFixedRate(
        () -> clientChannelManager.reconnect(getTransactionServiceGroup()),
        SCHEDULE_DELAY_MILLS,
        SCHEDULE_INTERVAL_MILLS,
        TimeUnit.MILLISECONDS);

    // Only when client-side batch sending of requests is enabled.
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {
        // Fixed-size pool (core == max) with an unbounded queue.
        mergeSendExecutorService = new ThreadPoolExecutor(
            MAX_MERGE_SEND_THREAD,
            MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
        // Start the task that sends the merged requests.
        mergeSendExecutorService.submit(new MergedSendRunnable());
    }

    super.init();
    // Finally configure and start the Netty client bootstrap.
    clientBootstrap.start();
}
NettyClientBootstrap是对于NettyClient的封装,对该类进行源码分析:
/**
 * Wraps the Netty client {@code Bootstrap}: builds the event loop group,
 * configures channel options and the handler pipeline, and opens new channels
 * on request.
 */
public class NettyClientBootstrap implements RemotingBootstrap {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class);

    /** Client configuration (thread sizes, socket options, idle timeouts, ...). */
    private final NettyClientConfig nettyClientConfig;

    /** The Netty bootstrap being configured and used to connect. */
    private final Bootstrap bootstrap = new Bootstrap();

    /** NIO event loop group handed to the bootstrap. */
    private final EventLoopGroup eventLoopGroupWorker;

    /** Event executor group; created in {@link #start()} when the caller supplied none. */
    private EventExecutorGroup defaultEventExecutorGroup;

    /** Flipped by the first {@link #start()}; here it only gates the "started" log line. */
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    private static final String THREAD_PREFIX_SPLIT_CHAR = "_";

    private final NettyPoolKey.TransactionRole transactionRole;

    /** Extra handlers appended to every channel pipeline after the built-in ones. */
    private ChannelHandler[] channelHandlers;

    /**
     * Creates the bootstrap wrapper and its NIO event loop group.
     *
     * @param nettyClientConfig  client configuration; a default one is created when {@code null}
     * @param eventExecutorGroup executor group to use, or {@code null} to create one in {@link #start()}
     * @param transactionRole    role whose name is appended to thread-name prefixes
     */
    public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup,
                                NettyPoolKey.TransactionRole transactionRole) {
        if (nettyClientConfig == null) {
            nettyClientConfig = new NettyClientConfig();
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("use default netty client config.");
            }
        }
        this.nettyClientConfig = nettyClientConfig;
        int selectorThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();
        // Must be assigned before getThreadPrefix(...) is called below.
        this.transactionRole = transactionRole;
        this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSize,
            new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
                selectorThreadSize));
        this.defaultEventExecutorGroup = eventExecutorGroup;
    }

    /**
     * Sets channel handlers.
     *
     * @param handlers the handlers; a {@code null} array leaves the current handlers untouched
     */
    protected void setChannelHandlers(final ChannelHandler... handlers) {
        if (handlers != null) {
            channelHandlers = handlers;
        }
    }

    /**
     * Add channel pipeline last.
     *
     * @param channel  the channel; nothing happens when {@code null}
     * @param handlers the handlers; nothing happens when {@code null}
     */
    private void addChannelPipelineLast(Channel channel, ChannelHandler... handlers) {
        if (channel != null && handlers != null) {
            channel.pipeline().addLast(handlers);
        }
    }

    /**
     * Configures the bootstrap: event loop group, channel class, socket
     * options and the channel initializer. No connection is opened here; see
     * {@link #getNewChannel(InetSocketAddress)}.
     */
    @Override
    public void start() {
        if (this.defaultEventExecutorGroup == null) {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
                new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
                    nettyClientConfig.getClientWorkerThreads()));
        }
        // Base client setup and socket options.
        this.bootstrap.group(this.eventLoopGroupWorker)
            .channel(nettyClientConfig.getClientChannelClazz())
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());

        if (nettyClientConfig.enableNative()) {
            if (PlatformDependent.isOsx()) {
                // On macOS the epoll-specific options below are not applied.
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("client run on macOS");
                }
            } else {
                bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
                    .option(EpollChannelOption.TCP_QUICKACK, true);
            }
        }

        // Pipeline: idle detection, protocol codec, then any custom handlers.
        bootstrap.handler(
            new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(
                        // Idle-state detection, configured with three timeouts in seconds:
                        // reader idle (nothing read), writer idle (nothing written) and
                        // all idle (neither). When one elapses, an idle event is delivered
                        // to the userEventTriggered method of the handlers that follow,
                        // which can then react (for example by closing the connection).
                        new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                            nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                            nettyClientConfig.getChannelMaxAllIdleSeconds()))
                        // Protocol decoder and encoder.
                        .addLast(new ProtocolV1Decoder())
                        .addLast(new ProtocolV1Encoder());
                    if (channelHandlers != null) {
                        addChannelPipelineLast(ch, channelHandlers);
                    }
                }
            });

        if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
            LOGGER.info("NettyClientBootstrap has started");
        }
    }

    /**
     * Requests a graceful shutdown of the event loop group and, if present,
     * the event executor group. Failures are logged, not rethrown.
     */
    @Override
    public void shutdown() {
        try {
            this.eventLoopGroupWorker.shutdownGracefully();
            if (this.defaultEventExecutorGroup != null) {
                this.defaultEventExecutorGroup.shutdownGracefully();
            }
        } catch (Exception exx) {
            // Pass the exception itself as well so the stack trace is not lost.
            LOGGER.error("Failed to shutdown: {}", exx.getMessage(), exx);
        }
    }

    /**
     * Opens a new channel to the given address (the network channel to the TC),
     * waiting at most the configured connect timeout.
     *
     * @param address the address to connect to
     * @return the new channel
     * @throws FrameworkException when the connect attempt is cancelled, does not
     *                            succeed within the timeout, or the wait is interrupted
     */
    public Channel getNewChannel(InetSocketAddress address) {
        Channel channel;
        ChannelFuture f = this.bootstrap.connect(address);
        try {
            // Bounded wait; the outcome is inspected below rather than via the return value.
            f.await(this.nettyClientConfig.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);
            if (f.isCancelled()) {
                throw new FrameworkException(f.cause(), "connect cancelled, can not connect to services-server.");
            } else if (!f.isSuccess()) {
                throw new FrameworkException(f.cause(), "connect failed, can not connect to services-server.");
            } else {
                channel = f.channel();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new FrameworkException(e, "can not connect to services-server.");
        } catch (Exception e) {
            // Also wraps the FrameworkExceptions thrown above (unchanged from before).
            throw new FrameworkException(e, "can not connect to services-server.");
        }
        return channel;
    }

    /**
     * Gets thread prefix.
     *
     * @param threadPrefix the thread prefix
     * @return the given prefix followed by the split char and the transaction role name
     */
    private String getThreadPrefix(String threadPrefix) {
        return threadPrefix + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();
    }
}以上为TM大体的初始化过程,详细可自行研读源码
GlobalTransactionScanner中afterPropertiesSet执行完成之后,会执行AbstractAutoProxyCreator(该类用于为Bean生成代理对象)中的wrapIfNecessary()方法(AbstractAutoProxyCreator实际上实现了BeanPostProcessor接口,而wrapIfNecessary在postProcessAfterInitialization方法中被调用,因此它在afterPropertiesSet之后执行)。
wrapIfNecessary源码分析:
/**
 * Decides whether the given bean needs a transaction interceptor and, if so,
 * wraps it (or adds advisors to its existing AOP proxy).
 *
 * @param bean     the bean instance being post-processed
 * @param beanName the bean name, also used as the "already processed" key
 * @param cacheKey passed through to the parent implementation
 * @return the bean itself when nothing applies, otherwise the wrapped / advised bean
 * @throws RuntimeException wrapping any exception raised while processing
 */
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// Global transactions are disabled: hand the bean back untouched.
if (disableGlobalTransaction) {
return bean;
}
try {
// PROXYED_SET holds the names of beans already handled here (added at the end of this block).
synchronized (PROXYED_SET) {
// Already handled: do not proxy the same bean twice.
if (PROXYED_SET.contains(beanName)) {
return bean;
}
// Reset the interceptor field; it is chosen afresh for this bean below.
interceptor = null;
// TCC mode check.
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
// Target class of the bean (class-based, non-JDK-proxy case) — presumably unwraps proxies; verify in SpringProxyUtils.
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
// Interfaces of the bean when it is a JDK (interface-based) proxy — presumably; verify in SpringProxyUtils.
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
// Neither the target class nor the interfaces carry the annotations: nothing to do.
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (interceptor == null) {
if (globalTransactionalInterceptor == null) {
// Create the GlobalTransactionalInterceptor on first need and register it as a
// listener for changes of the DISABLE_GLOBAL_TRANSACTION configuration key.
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
if (!AopUtils.isAopProxy(bean)) {
// The bean is not an AOP proxy yet: let the parent implementation wrap it.
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
// The bean is already an AOP proxy: insert the advisors built for it at
// index 0 of its existing advisor list instead of wrapping it again.
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
// Remember that this bean name has been handled.
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
/**
 * Tells whether any of the given classes carries {@code GlobalTransactional}
 * at type level, or has a method (as returned by {@code getMethods()})
 * annotated with {@code GlobalTransactional} or {@code GlobalLock}.
 *
 * @param classes the candidate classes; {@code null} entries are skipped
 * @return {@code true} as soon as one such annotation is found, otherwise {@code false}
 */
private boolean existsAnnotation(Class<?>[] classes) {
    if (!CollectionUtils.isNotEmpty(classes)) {
        return false;
    }
    for (Class<?> candidate : classes) {
        if (candidate == null) {
            continue;
        }
        // Type-level annotation.
        if (candidate.getAnnotation(GlobalTransactional.class) != null) {
            return true;
        }
        // Method-level annotations: GlobalTransactional first, then GlobalLock.
        for (Method m : candidate.getMethods()) {
            if (m.getAnnotation(GlobalTransactional.class) != null
                || m.getAnnotation(GlobalLock.class) != null) {
                return true;
            }
        }
    }
    return false;
}从上述代码中可看出,用GlobalTransactionalInterceptor 代替了GlobalTransactional 和 GlobalLock 注解的方法
1.3:GlobalTransactionalInterceptor(全局事务拦截器)
该类用于拦截并代理执行标注了@GlobalTransactional的方法,如下源码所示:
/**
 * Method interceptor behind {@code @GlobalTransactional} and
 * {@code @GlobalLock}: routes annotated invocations through the transaction
 * or lock template, and keeps a failure counter that can temporarily switch
 * interception off (degrade) and back on.
 */
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {

    private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);
    private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();

    /** Template that drives begin / commit / rollback around the business call. */
    private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
    private final GlobalLockTemplate<Object> globalLockTemplate = new GlobalLockTemplate<>();

    /** Callback notified about begin / commit / rollback failures. */
    private final FailureHandler failureHandler;

    /** Mirrors the DISABLE_GLOBAL_TRANSACTION configuration value. */
    private volatile boolean disable;

    /** Period of the degrade self-check, in milliseconds (read from configuration). */
    private static int degradeCheckPeriod;

    /** Degrade-check switch: when on, consecutive failures can bypass global transactions. */
    private static volatile boolean degradeCheck;

    /** Threshold used both for degrading and for recovering (read from configuration). */
    private static int degradeCheckAllowTimes;

    /** Failure counter; interception is bypassed once it reaches the threshold. */
    private static volatile int degradeNum = 0;

    /** Success counter while degraded; recovery happens once it reaches the threshold. */
    private static volatile int reachNum = 0;

    private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);

    /**
     * Executor for the periodic degrade check. It is created eagerly here,
     * although it is only used when degradeCheck is enabled.
     */
    private static ScheduledThreadPoolExecutor executor =
        new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));

    /**
     * Instantiates a new Global transactional interceptor.
     *
     * @param failureHandler the failure handler; the default handler is used when {@code null}
     */
    public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
        this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
        this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            DEFAULT_DISABLE_GLOBAL_TRANSACTION);
        degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
            DEFAULT_TM_DEGRADE_CHECK);
        // Degrade checking is configured: listen for changes, load its settings, start the probe.
        if (degradeCheck) {
            ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
            degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(
                ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
            degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(
                ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
            EVENT_BUS.register(this);
            if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
                startDegradeCheck();
            }
        }
    }

    /**
     * Intercepts the proxied call and, unless disabled or degraded, routes it
     * through the global-transaction or global-lock handling.
     *
     * @param methodInvocation the intercepted invocation of the original method
     * @return the result of the (possibly transactional) invocation
     * @throws Throwable whatever the invocation or the transaction handling throws
     */
    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        // Class that owns the invoked method (null when there is no target object).
        Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        // Most specific method for that target class.
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            // Bypass when disabled by configuration, or when degrade checking is on
            // and the failure counter has reached the threshold.
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            if (!localDisable) {
                // GlobalTransactional takes precedence over GlobalLock.
                if (globalTransactionalAnnotation != null) {
                    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                } else if (globalLockAnnotation != null) {
                    return handleGlobalLock(methodInvocation);
                }
            }
        }
        // No interception applies: run the original method directly.
        return methodInvocation.proceed();
    }

    /**
     * Runs the invocation through the global-lock template.
     *
     * @param methodInvocation the intercepted invocation
     * @return the invocation result
     * @throws Exception exceptions from the invocation; other throwables are wrapped in a RuntimeException
     */
    private Object handleGlobalLock(final MethodInvocation methodInvocation) throws Exception {
        return globalLockTemplate.execute(() -> {
            try {
                return methodInvocation.proceed();
            } catch (Exception e) {
                throw e;
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Runs the invocation through {@code transactionalTemplate} and translates
     * template failures into failure-handler callbacks plus the exception to rethrow.
     *
     * @param methodInvocation the intercepted invocation
     * @param globalTrxAnno    the annotation supplying timeout, name, propagation and rollback rules
     * @return the invocation result
     * @throws Throwable the original business exception or the cause of the transaction failure
     */
    private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                           final GlobalTransactional globalTrxAnno) throws Throwable {
        // Reported to the degrade check; only begin and commit failures count as failures.
        boolean succeed = true;
        try {
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    // Run the original method.
                    return methodInvocation.proceed();
                }

                public String name() {
                    // Annotation name when given, otherwise a signature-based name.
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }

                @Override
                public TransactionInfo getTransactionInfo() {
                    // Build the TransactionInfo consumed by the template from the annotation values.
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(globalTrxAnno.propagation());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            // The code tells in which phase the transaction failed.
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    // Failure while beginning the transaction.
                    succeed = false;
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    // Failure while committing.
                    succeed = false;
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    // Failure while rolling back.
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                case RollbackRetrying:
                    // Rollback is being retried.
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
            }
        } finally {
            if (degradeCheck) {
                EVENT_BUS.post(new DegradeCheckEvent(succeed));
            }
        }
    }

    /**
     * Looks the annotation up on the method first and falls back to the target class.
     *
     * @return the annotation, or {@code null} when neither carries it
     */
    public <T extends Annotation> T getAnnotation(Method method, Class<?> targetClass, Class<T> annotationClass) {
        return Optional.ofNullable(method).map(m -> m.getAnnotation(annotationClass))
            .orElse(Optional.ofNullable(targetClass).map(t -> t.getAnnotation(annotationClass)).orElse(null));
    }

    /**
     * Builds a name of the form {@code name(paramType, paramType)}; the
     * parameter types are included so that overloads get distinct names.
     */
    private String formatMethod(Method method) {
        StringBuilder sb = new StringBuilder(method.getName()).append("(");
        Class<?>[] params = method.getParameterTypes();
        for (int i = 0; i < params.length; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(params[i].getName());
        }
        return sb.append(")").toString();
    }

    /**
     * Reacts to configuration changes of DISABLE_GLOBAL_TRANSACTION and
     * CLIENT_DEGRADE_CHECK; other keys are ignored.
     *
     * @param event the event
     */
    @Override
    public void onChangeEvent(ConfigurationChangeEvent event) {
        if (ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION.equals(event.getDataId())) {
            LOGGER.info("{} config changed, old value:{}, new value:{}", ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                disable, event.getNewValue());
            disable = Boolean.parseBoolean(event.getNewValue().trim());
        } else if (ConfigurationKeys.CLIENT_DEGRADE_CHECK.equals(event.getDataId())) {
            // Trim like the branch above: Boolean.parseBoolean does not ignore surrounding
            // whitespace, so "true " would otherwise switch the check off. A null value
            // still yields false, as before.
            String newValue = event.getNewValue();
            degradeCheck = newValue != null && Boolean.parseBoolean(newValue.trim());
            if (!degradeCheck) {
                degradeNum = 0;
            }
        }
    }

    /**
     * auto upgrade service detection: while degrade checking is on, periodically
     * begins and commits a probe transaction named "degradeCheck" and posts the
     * outcome as a {@code DegradeCheckEvent}.
     */
    private static void startDegradeCheck() {
        executor.scheduleAtFixedRate(() -> {
            if (degradeCheck) {
                try {
                    String xid = TransactionManagerHolder.get().begin(null, null, "degradeCheck", 60000);
                    TransactionManagerHolder.get().commit(xid);
                    EVENT_BUS.post(new DegradeCheckEvent(true));
                } catch (Exception e) {
                    EVENT_BUS.post(new DegradeCheckEvent(false));
                }
            }
        }, degradeCheckPeriod, degradeCheckPeriod, TimeUnit.MILLISECONDS);
    }

    /**
     * Updates the degrade / recovery counters from one check result.
     *
     * @param event the outcome of a transaction or of a probe
     */
    @Subscribe
    public static void onDegradeCheck(DegradeCheckEvent event) {
        if (event.isRequestSuccess()) {
            if (degradeNum >= degradeCheckAllowTimes) {
                // Currently degraded: count successes until the threshold restores service.
                reachNum++;
                if (reachNum >= degradeCheckAllowTimes) {
                    reachNum = 0;
                    degradeNum = 0;
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("the current global transaction has been restored");
                    }
                }
            } else if (degradeNum != 0) {
                // Not degraded yet: a success clears the failure streak.
                degradeNum = 0;
            }
        } else {
            if (degradeNum < degradeCheckAllowTimes) {
                degradeNum++;
                if (degradeNum >= degradeCheckAllowTimes) {
                    if (LOGGER.isWarnEnabled()) {
                        LOGGER.warn("the current global transaction has been automatically downgraded");
                    }
                }
            } else if (reachNum != 0) {
                // Already degraded: a failure clears the recovery streak.
                reachNum = 0;
            }
        }
    }
}
从上述源码中invoke方法可知,本质通过TransactionalTemplate的execute来执行真正的流程,如下所示:
1.4:TransactionalTemplate(事务执行模版)
该类封装了事务执行的模版流程:开启事务、执行业务代码、提交或回滚、最后清理,如下所示:
/**
 * Template that runs a business callback inside a global transaction:
 * resolves propagation, begins the transaction, executes the callback,
 * commits or rolls back, and fires the registered transaction hooks
 * around each step.
 */
public class TransactionalTemplate {
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalTemplate.class);
/**
 * Executes the business callback under the template described on the class.
 *
 * @param business the business callback; it also supplies the TransactionInfo
 * @return the value returned by the business callback
 * @throws TransactionalExecutor.ExecutionException when begin, commit or rollback handling
 *         reports an outcome (a completed rollback is reported this way as well)
 * @throws Throwable anything thrown by the business callback or by the propagation checks
 */
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. Fetch the TransactionInfo that the interceptor built from the annotation.
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 Get the current global transaction or create one (presumably depending on
// whether the current context already holds an xid — verify in GlobalTransactionContext).
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.2 Handle the propagation level.
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
// Suspend whatever is current and run the business code without a transaction.
suspendedResourcesHolder = tx.suspend(true);
return business.execute();
case REQUIRES_NEW:
// Suspend whatever is current, then continue to begin below.
// NOTE(review): tx was obtained before the suspension; confirm that begin() on
// this same object really starts a new transaction when one already existed.
suspendedResourcesHolder = tx.suspend(true);
break;
case SUPPORTS:
if (!existingTransaction()) {
// No transaction exists: run directly, without creating one.
return business.execute();
}
break;
case REQUIRED:
break;
case NEVER:
if (existingTransaction()) {
// A transaction already exists, which NEVER forbids.
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"
,RootContext.getXID()));
} else {
return business.execute();
}
case MANDATORY:
if (!existingTransaction()) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
try {
// 2. Begin the transaction.
beginTransaction(txInfo, tx);
Object rs = null;
try {
// Run the business code.
rs = business.execute();
} catch (Throwable ex) {
// 3. The business code failed: roll back or commit depending on the rollback rules.
// The rollback path always leaves via an ExecutionException, so the rethrow
// below is only reached when the exception did not require a rollback.
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. Everything went fine: commit.
commitTransaction(tx);
// Hand back the business result.
return rs;
} finally {
//5. Completion hooks, then clear the hooks.
triggerAfterCompletion();
cleanUp();
}
} finally {
// Always resume what was suspended above (the holder may be null).
tx.resume(suspendedResourcesHolder);
}
}
/**
 * @return {@code true} when the root context currently holds a non-empty XID
 */
public boolean existingTransaction() {
return StringUtils.isNotEmpty(RootContext.getXID());
}
/**
 * Finishes the transaction after the business code threw: rolls back when the
 * rollback rules match the exception, otherwise commits.
 *
 * @throws TransactionalExecutor.ExecutionException always on the rollback path
 *         (done, retrying or failed), and on the commit path when the commit fails
 */
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
//roll back
if (txInfo != null && txInfo.rollbackOn(originalException)) {
try {
// The exception requires a rollback.
rollbackTransaction(tx, originalException);
} catch (TransactionException txe) {
// Rollback failed: report RollbackFailure; GlobalTransactionalInterceptor handles it.
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, originalException);
}
} else {
// This exception does not require a rollback: commit right away.
commitTransaction(tx);
}
}
/**
 * Commits the transaction, firing the before/after commit hooks around it.
 *
 * @param tx the transaction to commit
 * @throws TransactionalExecutor.ExecutionException with code CommitFailure when the commit fails
 */
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
// Hooks that run before the commit.
triggerBeforeCommit();
tx.commit();
// Hooks that run after the commit.
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
}
/**
 * Rolls the transaction back, firing the before/after rollback hooks around it.
 * This method never returns normally.
 *
 * @param tx                the transaction to roll back
 * @param originalException the business exception that triggered the rollback
 * @throws TransactionException when the rollback itself fails
 * @throws TransactionalExecutor.ExecutionException after the rollback call returned, with code
 *         RollbackRetrying or RollbackDone depending on the transaction's local status
 */
private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
// Hooks that run before the rollback.
triggerBeforeRollback();
tx.rollback();
// Hooks that run after the rollback.
triggerAfterRollback();
// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
}
/**
 * Begins the transaction, firing the before/after begin hooks around it.
 *
 * @param txInfo supplies the timeout and the name
 * @param tx     the transaction to begin
 * @throws TransactionalExecutor.ExecutionException with code BeginFailure when beginning fails
 */
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
// Hooks that run before begin.
triggerBeforeBegin();
// The actual work is delegated to the GlobalTransaction.
tx.begin(txInfo.getTimeOut(), txInfo.getName());
// Hooks that run after begin.
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
}
// The trigger* methods below all follow one pattern: call one callback on every
// current hook, logging (not propagating) any Exception a hook throws.
private void triggerBeforeBegin() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeBegin();
} catch (Exception e) {
LOGGER.error("Failed execute beforeBegin in hook {}",e.getMessage(),e);
}
}
}
private void triggerAfterBegin() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterBegin();
} catch (Exception e) {
LOGGER.error("Failed execute afterBegin in hook {}",e.getMessage(),e);
}
}
}
private void triggerBeforeRollback() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeRollback();
} catch (Exception e) {
LOGGER.error("Failed execute beforeRollback in hook {}",e.getMessage(),e);
}
}
}
private void triggerAfterRollback() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterRollback();
} catch (Exception e) {
LOGGER.error("Failed execute afterRollback in hook {}",e.getMessage(),e);
}
}
}
private void triggerBeforeCommit() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeCommit();
} catch (Exception e) {
LOGGER.error("Failed execute beforeCommit in hook {}",e.getMessage(),e);
}
}
}
private void triggerAfterCommit() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCommit();
} catch (Exception e) {
LOGGER.error("Failed execute afterCommit in hook {}",e.getMessage(),e);
}
}
}
private void triggerAfterCompletion() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCompletion();
} catch (Exception e) {
LOGGER.error("Failed execute afterCompletion in hook {}",e.getMessage(),e);
}
}
}
// Clears the hooks held by the TransactionHookManager.
private void cleanUp() {
TransactionHookManager.clear();
}
// Hooks currently held by the TransactionHookManager.
private List<TransactionHook> getCurrentHooks() {
return TransactionHookManager.getHooks();
}
}
从上述源码可以看出对于全局事务的提交回滚都是通过GlobalTransaction接口来实现的
1.4:GlobalTransaction
该接口提供了事务有关的所有方法,具体的实现为DefaultGlobalTransaction
/**
 * Operations available on a global transaction: begin, commit, rollback,
 * suspend / resume, status queries and status reporting.
 */
public interface GlobalTransaction {
/**
 * Begins a new global transaction with the default timeout and name.
 *
 * @throws TransactionException declared for failures while beginning
 */
void begin() throws TransactionException;
/**
 * Begins a new global transaction with the given timeout and the default name.
 *
 * @param timeout the transaction timeout
 * @throws TransactionException declared for failures while beginning
 */
void begin(int timeout) throws TransactionException;
/**
 * Begins a new global transaction with the given timeout and the given name.
 *
 * @param timeout the transaction timeout
 * @param name    the transaction name
 * @throws TransactionException declared for failures while beginning
 */
void begin(int timeout, String name) throws TransactionException;
/**
 * Commits the global transaction.
 *
 * @throws TransactionException declared for failures while committing
 */
void commit() throws TransactionException;
/**
 * Rolls the global transaction back.
 *
 * @throws TransactionException declared for failures while rolling back
 */
void rollback() throws TransactionException;
/**
 * Suspends the global transaction.
 *
 * @param unbindXid presumably whether the XID is unbound from the current context — verify in the implementation
 * @return a holder for the suspended resources, to be passed to {@code resume}
 * @throws TransactionException declared for failures while suspending
 */
SuspendedResourcesHolder suspend(boolean unbindXid) throws TransactionException;
/**
 * Resumes the global transaction.
 *
 * @param suspendedResourcesHolder the holder previously returned by {@code suspend}
 * @throws TransactionException declared for failures while resuming
 */
void resume(SuspendedResourcesHolder suspendedResourcesHolder) throws TransactionException;
/**
 * Asks the TC for the current status of the corresponding global transaction.
 *
 * @return the status of the global transaction
 * @throws TransactionException declared for failures while querying
 */
GlobalStatus getStatus() throws TransactionException;
/**
 * Gets the XID.
 *
 * @return XID. xid
 */
String getXid();
/**
 * Reports the global transaction status to the TC.
 *
 * @param globalStatus the status to report
 * @throws TransactionException declared for failures while reporting
 */
void globalReport(GlobalStatus globalStatus) throws TransactionException;
/**
 * Local status of the global transaction.
 *
 * @return the locally held status
 */
GlobalStatus getLocalStatus();
}1.4:DefaultGlobalTransaction
/**
 * Default {@link GlobalTransaction} implementation.
 *
 * <p>Holds the XID, the locally known status and the role of the current execution flow,
 * and delegates the remote work to the {@code TransactionManager} obtained from
 * {@code TransactionManagerHolder}. Only a {@code Launcher} begins, commits or rolls back;
 * any other role skips those operations.
 */
public class DefaultGlobalTransaction implements GlobalTransaction {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultGlobalTransaction.class);

    /** Timeout used by {@link #begin()}; presumably milliseconds — TODO confirm against TransactionManager. */
    private static final int DEFAULT_GLOBAL_TX_TIMEOUT = 60000;

    /** Transaction name used when the caller does not supply one. */
    private static final String DEFAULT_GLOBAL_TX_NAME = "default";

    /** Configured commit retry count; a non-positive value falls back to the default at use site. */
    private static final int COMMIT_RETRY_COUNT = ConfigurationFactory.getInstance().getInt(
        ConfigurationKeys.CLIENT_TM_COMMIT_RETRY_COUNT, DEFAULT_TM_COMMIT_RETRY_COUNT);

    /** Configured rollback retry count; a non-positive value falls back to the default at use site. */
    private static final int ROLLBACK_RETRY_COUNT = ConfigurationFactory.getInstance().getInt(
        ConfigurationKeys.CLIENT_TM_ROLLBACK_RETRY_COUNT, DEFAULT_TM_ROLLBACK_RETRY_COUNT);

    private TransactionManager transactionManager;

    /** Global transaction id; {@code null} until {@link #begin(int, String)} succeeds for a launcher. */
    private String xid;

    /** Locally held global status. */
    private GlobalStatus status;

    /** Role of the current execution flow in the global transaction (Launcher or Participant). */
    private GlobalTransactionRole role;

    /**
     * Creates a launcher transaction with no XID and status {@code UnKnown}.
     */
    DefaultGlobalTransaction() {
        this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
    }

    /**
     * Creates a transaction with the given state.
     *
     * @param xid    the xid
     * @param status the status
     * @param role   the role
     */
    DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {
        this.transactionManager = TransactionManagerHolder.get();
        this.xid = xid;
        this.status = status;
        this.role = role;
    }

    @Override
    public void begin() throws TransactionException {
        begin(DEFAULT_GLOBAL_TX_TIMEOUT);
    }

    @Override
    public void begin(int timeout) throws TransactionException {
        begin(timeout, DEFAULT_GLOBAL_TX_NAME);
    }

    /**
     * Begins the global transaction when this instance is the launcher; otherwise only
     * verifies that an XID is already present and returns.
     *
     * @param timeout the transaction timeout passed to the transaction manager
     * @param name    the transaction name passed to the transaction manager
     * @throws IllegalStateException if a non-launcher has no XID, if a launcher already has an XID,
     *                               or if an XID is already bound in {@code RootContext}
     * @throws TransactionException  if the transaction manager fails to begin the transaction
     */
    @Override
    public void begin(int timeout, String name) throws TransactionException {
        if (role != GlobalTransactionRole.Launcher) {
            // A non-launcher joins an existing transaction, so an XID must already be set.
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        // A launcher must start from a clean state: no XID on this object and none in the context.
        assertXIDNull();
        String currentXid = RootContext.getXID();
        if (currentXid != null) {
            throw new IllegalStateException("Global transaction already exists,"
                + " can't begin a new global transaction, currentXid = " + currentXid);
        }
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        // Publish the new XID to the context.
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }
    }

    /**
     * Commits the global transaction through the transaction manager, retrying on failure.
     * A participant does nothing. The XID is unbound from the context afterwards if it is
     * still the one bound there, whether or not the commit succeeded.
     *
     * @throws IllegalStateException if this launcher has no XID
     * @throws TransactionException  if every attempt fails; the last failure is the cause
     */
    @Override
    public void commit() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of committing
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
        int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
        try {
            // Retry until the commit succeeds or the attempts are used up.
            while (retry > 0) {
                try {
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    retry--;
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global commit", ex);
                    }
                }
            }
        } finally {
            unbindIfBoundToThisTransaction();
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] commit status: {}", xid, status);
        }
    }

    /**
     * Rolls back the global transaction through the transaction manager, retrying on failure.
     * A participant does nothing. The XID is unbound from the context afterwards if it is
     * still the one bound there, whether or not the rollback succeeded.
     *
     * @throws IllegalStateException if this launcher has no XID
     * @throws TransactionException  if every attempt fails; the last failure is the cause
     */
    @Override
    public void rollback() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            // Participant has no responsibility of rollback
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
        int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    status = transactionManager.rollback(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    retry--;
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global rollback", ex);
                    }
                }
            }
        } finally {
            unbindIfBoundToThisTransaction();
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] rollback status: {}", xid, status);
        }
    }

    /**
     * Suspends the transaction bound in the context.
     *
     * @param unbindXid when {@code true} and an XID is bound, the XID is unbound from the context
     * @return a holder carrying the unbound XID, or carrying {@code null} when nothing was unbound
     * @throws TransactionException declared by the interface; not thrown by the visible code
     */
    @Override
    public SuspendedResourcesHolder suspend(boolean unbindXid) throws TransactionException {
        String boundXid = RootContext.getXID();
        if (StringUtils.isNotEmpty(boundXid) && unbindXid) {
            RootContext.unbind();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Suspending current transaction,xid = {}", boundXid);
            }
        } else {
            boundXid = null;
        }
        return new SuspendedResourcesHolder(boundXid);
    }

    /**
     * Re-binds the XID carried by the holder, if any.
     *
     * @param suspendedResourcesHolder the suspended resources to resume; {@code null} is a no-op
     * @throws TransactionException declared by the interface; not thrown by the visible code
     */
    @Override
    public void resume(SuspendedResourcesHolder suspendedResourcesHolder) throws TransactionException {
        if (suspendedResourcesHolder == null) {
            return;
        }
        String suspendedXid = suspendedResourcesHolder.getXid();
        if (StringUtils.isNotEmpty(suspendedXid)) {
            RootContext.bind(suspendedXid);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Resumimg the transaction,xid = {}", suspendedXid);
            }
        }
    }

    /**
     * Queries the transaction manager for the global status and caches it locally.
     *
     * @return {@code UnKnown} when there is no XID, otherwise the status returned by the manager
     * @throws TransactionException if the query fails
     */
    @Override
    public GlobalStatus getStatus() throws TransactionException {
        if (xid == null) {
            return GlobalStatus.UnKnown;
        }
        status = transactionManager.getStatus(xid);
        return status;
    }

    /**
     * @return the global XID, possibly {@code null}
     */
    @Override
    public String getXid() {
        return xid;
    }

    /**
     * Reports the given status through the transaction manager, stores the returned status,
     * then unbinds the XID from the context if it is still the one bound there.
     *
     * @param globalStatus the status to report; must not be {@code null}
     * @throws IllegalStateException if there is no XID or {@code globalStatus} is {@code null}
     * @throws TransactionException  if the report fails
     */
    @Override
    public void globalReport(GlobalStatus globalStatus) throws TransactionException {
        assertXIDNotNull();
        if (globalStatus == null) {
            throw new IllegalStateException("globalStatus must not be null, xid = " + xid);
        }
        status = transactionManager.globalReport(xid, globalStatus);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] report status: {}", xid, status);
        }
        unbindIfBoundToThisTransaction();
    }

    /**
     * @return the locally held global status, without contacting the transaction manager
     */
    @Override
    public GlobalStatus getLocalStatus() {
        return status;
    }

    /**
     * Unbinds the context XID when it equals this transaction's XID.
     * Callers have already asserted that {@code xid} is non-null.
     */
    private void unbindIfBoundToThisTransaction() throws TransactionException {
        if (xid.equals(RootContext.getXID())) {
            suspend(true);
        }
    }

    private void assertXIDNotNull() {
        if (xid == null) {
            throw new IllegalStateException("xid must not be null for this operation, role = " + role);
        }
    }

    private void assertXIDNull() {
        if (xid != null) {
            throw new IllegalStateException("xid must be null for this operation, but was " + xid);
        }
    }
}
1.4:DefaultTransactionManager
用于管理TM与TC之间交互方法
public class DefaultTransactionManager implements TransactionManager {
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
//构建请求参数
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
//同步发送begin netty请求
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
//判断返回状态
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
//获取全局xid
return response.getXid();
}
@Override
public GlobalStatus commit(String xid) throws TransactionException {
//构建请求参数
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
//同步发送commit netty请求
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
//返回当前全局事务状态
return response.getGlobalStatus();
}
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
//构建请求参数
GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
globalRollback.setXid(xid);
//同步发送rollback netty请求
GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
//返回当前全局事务状态
return response.getGlobalStatus();
}
@Override
public GlobalStatus getStatus(String xid) throws TransactionException {
GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
queryGlobalStatus.setXid(xid);
GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
return response.getGlobalStatus();
}
@Override
public GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException {
GlobalReportRequest globalReport = new GlobalReportRequest();
globalReport.setXid(xid);
globalReport.setGlobalStatus(globalStatus);
//上报当前分支事务状态
GlobalReportResponse response = (GlobalReportResponse) syncCall(globalReport);
return response.getGlobalStatus();
}
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
} catch (TimeoutException toe) {
throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
}
}
}
从上述代码可以看出通过sendSyncRequest()来发送netty请求,sendSyncRequest为TmNettyRemotingClient父类AbstractNettyRemotingClient方法。
1.5:AbstractNettyRemotingClient
/**
 * Base class for the Netty-backed remoting clients.
 *
 * <p>Provides synchronous and asynchronous sending, optional request batching (messages are
 * queued per server address and flushed by {@link MergedSendRunnable}), channel management
 * through {@code NettyClientChannelManager}, and the inbound {@link ClientHandler}.
 */
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNettyRemotingClient.class);
    private static final String MSG_ID_PREFIX = "msgId:";
    private static final String FUTURES_PREFIX = "futures:";
    private static final String SINGLE_LOG_POSTFIX = ";";
    /** Maximum time, in milliseconds, the merge-send thread waits on {@link #mergeLock} per cycle. */
    private static final int MAX_MERGE_SEND_MILLS = 1;
    private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
    private static final int MAX_MERGE_SEND_THREAD = 1;
    private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;
    /** Initial delay, in milliseconds, before the first reconnect task runs. */
    private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L;
    /** Period, in milliseconds, between reconnect task runs. */
    private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;
    private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend";
    protected final Object mergeLock = new Object();

    /**
     * When sending message type is {@link MergeMessage}, will be stored to mergeMsgMap.
     */
    protected final Map<Integer, MergeMessage> mergeMsgMap = new ConcurrentHashMap<>();

    /**
     * When batch sending is enabled, the message will be stored to basketMap
     * Send via asynchronous thread {@link MergedSendRunnable}
     * {@link NettyClientConfig#isEnableClientBatchSendRequest}
     */
    protected final ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>();

    private final NettyClientBootstrap clientBootstrap;
    private final NettyClientChannelManager clientChannelManager;
    private final NettyPoolKey.TransactionRole transactionRole;
    private ExecutorService mergeSendExecutorService;
    private TransactionMessageHandler transactionMessageHandler;

    /**
     * Wires the bootstrap, installs {@link ClientHandler} as the channel handler, and creates the
     * channel manager.
     */
    public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
                                       ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
        super(messageExecutor);
        this.transactionRole = transactionRole;
        clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);
        // Inbound responses are handled by ClientHandler, which forwards them to processMessage.
        clientBootstrap.setChannelHandlers(new ClientHandler());
        clientChannelManager = new NettyClientChannelManager(
            new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
    }

    /**
     * Schedules the periodic reconnect task, starts the merge-send thread when batch sending is
     * enabled, then initializes the superclass and starts the Netty bootstrap.
     */
    @Override
    public void init() {
        // Periodically (re)acquire a channel to every available server of the service group.
        timerExecutor.scheduleAtFixedRate(
            () -> clientChannelManager.reconnect(getTransactionServiceGroup()),
            SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
        if (NettyClientConfig.isEnableClientBatchSendRequest()) {
            mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
            mergeSendExecutorService.submit(new MergedSendRunnable());
        }
        super.init();
        clientBootstrap.start();
    }

    /**
     * Sends a request to a load-balanced server and blocks until the response arrives or the
     * configured RPC timeout elapses.
     *
     * <p>With batch sending enabled the message is queued in {@link #basketMap} and the calling
     * thread waits on a {@code MessageFuture}; otherwise the message is sent directly on an
     * acquired channel.
     *
     * @param msg the request body
     * @return the response delivered through the message future
     * @throws TimeoutException if no response arrives within the timeout
     */
    @Override
    public Object sendSyncRequest(Object msg) throws TimeoutException {
        String serverAddress = loadBalance(getTransactionServiceGroup());
        int timeoutMillis = NettyClientConfig.getRpcRequestTimeout();
        RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);

        if (!NettyClientConfig.isEnableClientBatchSendRequest()) {
            Channel channel = clientChannelManager.acquireChannel(serverAddress);
            return super.sendSync(channel, rpcMessage, timeoutMillis);
        }

        // send batch message
        // put message into basketMap, @see MergedSendRunnable
        // send batch message is sync request, needs to create messageFuture and put it in futures.
        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);

        // One queue per server address. The plain get() first keeps the common, already-present
        // case free of the locking computeIfAbsent may perform.
        BlockingQueue<RpcMessage> basket = basketMap.get(serverAddress);
        if (basket == null) {
            basket = basketMap.computeIfAbsent(serverAddress, key -> new LinkedBlockingQueue<>());
        }
        basket.offer(rpcMessage);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("offer message: {}", rpcMessage.getBody());
        }
        if (!isSending) {
            // Wake the merge-send thread so it does not sit out the rest of its wait.
            synchronized (mergeLock) {
                mergeLock.notifyAll();
            }
        }

        try {
            return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception exx) {
            LOGGER.error("wait response error:{},ip:{},request:{}",
                exx.getMessage(), serverAddress, rpcMessage.getBody());
            if (exx instanceof TimeoutException) {
                throw (TimeoutException) exx;
            } else {
                throw new RuntimeException(exx);
            }
        }
    }

    /**
     * Sends a request on the given channel and waits for the response.
     *
     * @return the response, or {@code null} when {@code channel} is {@code null}
     */
    @Override
    public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException {
        if (channel == null) {
            LOGGER.warn("sendSyncRequest nothing, caused by null channel.");
            return null;
        }
        RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
        return super.sendSync(channel, rpcMessage, NettyClientConfig.getRpcRequestTimeout());
    }

    /**
     * Sends a message on the given channel without waiting for a response. Heartbeats are sent as
     * heartbeat requests, everything else as one-way requests. A {@link MergeMessage} body is
     * recorded in {@link #mergeMsgMap} under the message id before sending.
     */
    @Override
    public void sendAsyncRequest(Channel channel, Object msg) {
        if (channel == null) {
            LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");
            return;
        }
        RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage
            ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
            : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
        if (rpcMessage.getBody() instanceof MergeMessage) {
            mergeMsgMap.put(rpcMessage.getId(), (MergeMessage) rpcMessage.getBody());
        }
        super.sendAsync(channel, rpcMessage);
    }

    /**
     * Builds a response for {@code rpcMessage} and sends it asynchronously to {@code serverAddress}.
     */
    @Override
    public void sendAsyncResponse(String serverAddress, RpcMessage rpcMessage, Object msg) {
        RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, ProtocolConstants.MSGTYPE_RESPONSE);
        Channel channel = clientChannelManager.acquireChannel(serverAddress);
        super.sendAsync(channel, rpcMsg);
    }

    /**
     * Registers the processor and executor to use for a message type.
     *
     * @param requestCode the message type code
     * @param processor   {@link RemotingProcessor}
     * @param executor    thread pool
     */
    @Override
    public void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {
        Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
        this.processorTable.put(requestCode, pair);
    }

    @Override
    public void destroyChannel(String serverAddress, Channel channel) {
        clientChannelManager.destroyChannel(serverAddress, channel);
    }

    /**
     * Shuts down the bootstrap and the merge-send executor (if one was created), then the superclass.
     */
    @Override
    public void destroy() {
        clientBootstrap.shutdown();
        if (mergeSendExecutorService != null) {
            mergeSendExecutorService.shutdown();
        }
        super.destroy();
    }

    public void setTransactionMessageHandler(TransactionMessageHandler transactionMessageHandler) {
        this.transactionMessageHandler = transactionMessageHandler;
    }

    public TransactionMessageHandler getTransactionMessageHandler() {
        return transactionMessageHandler;
    }

    public NettyClientChannelManager getClientChannelManager() {
        return clientChannelManager;
    }

    /**
     * Looks up the servers of the service group in the registry and lets the load balancer pick one.
     *
     * @param transactionServiceGroup tx-service-group
     * @return the chosen address as formatted by {@code NetUtil.toStringAddress}
     * @throws FrameworkException with {@code NoAvailableService} when no address could be chosen
     */
    private String loadBalance(String transactionServiceGroup) {
        InetSocketAddress address = null;
        try {
            @SuppressWarnings("unchecked")
            List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().lookup(transactionServiceGroup);
            address = LoadBalanceFactory.getInstance().select(inetSocketAddressList);
        } catch (Exception ex) {
            LOGGER.error(ex.getMessage());
        }
        if (address == null) {
            throw new FrameworkException(NoAvailableService);
        }
        return NetUtil.toStringAddress(address);
    }

    private String getThreadPrefix() {
        return AbstractNettyRemotingClient.MERGE_THREAD_PREFIX + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();
    }

    /**
     * Get pool key function.
     *
     * @return lambda function
     */
    protected abstract Function<String, NettyPoolKey> getPoolKeyFunction();

    /**
     * Get transaction service group.
     *
     * @return transaction service group
     */
    protected abstract String getTransactionServiceGroup();

    /**
     * Background task that drains every per-address queue in {@link #basketMap} and sends the
     * drained messages as a single {@code MergedWarpMessage}.
     */
    private class MergedSendRunnable implements Runnable {

        @Override
        public void run() {
            while (true) {
                synchronized (mergeLock) {
                    try {
                        // Waits at most MAX_MERGE_SEND_MILLS (1 ms) or until a sender notifies.
                        mergeLock.wait(MAX_MERGE_SEND_MILLS);
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: the loop keeps serving queued requests.
                        // The interrupt flag is not restored, since the next wait() would then
                        // throw immediately on every cycle.
                    }
                }
                isSending = true;
                for (String address : basketMap.keySet()) {
                    BlockingQueue<RpcMessage> basket = basketMap.get(address);
                    if (basket.isEmpty()) {
                        continue;
                    }
                    // Wrap everything currently queued for this address into one merged message.
                    MergedWarpMessage mergeMessage = new MergedWarpMessage();
                    while (!basket.isEmpty()) {
                        RpcMessage msg = basket.poll();
                        mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                        mergeMessage.msgIds.add(msg.getId());
                    }
                    if (mergeMessage.msgIds.size() > 1) {
                        printMergeMessageLog(mergeMessage);
                    }
                    Channel sendChannel = null;
                    try {
                        // send batch message is sync request, but there is no need to get the return value.
                        // Since the messageFuture has been created before the message is placed in basketMap,
                        // the return value will be obtained in ClientOnResponseProcessor.
                        sendChannel = clientChannelManager.acquireChannel(address);
                        AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
                    } catch (FrameworkException e) {
                        if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                            destroyChannel(address, sendChannel);
                        }
                        // fast fail: complete every waiting future of this batch with a null result
                        for (Integer msgId : mergeMessage.msgIds) {
                            MessageFuture messageFuture = futures.remove(msgId);
                            if (messageFuture != null) {
                                messageFuture.setResultMessage(null);
                            }
                        }
                        LOGGER.error("client merge call failed: {}", e.getMessage(), e);
                    }
                }
                isSending = false;
            }
        }

        /** Debug-logs the merged messages, their ids, and the ids of all pending futures. */
        private void printMergeMessageLog(MergedWarpMessage mergeMessage) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("merge msg size:{}", mergeMessage.msgIds.size());
                for (AbstractMessage cm : mergeMessage.msgs) {
                    LOGGER.debug(cm.toString());
                }
                StringBuilder sb = new StringBuilder();
                for (long l : mergeMessage.msgIds) {
                    sb.append(MSG_ID_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);
                }
                sb.append("\n");
                for (long l : futures.keySet()) {
                    sb.append(FUTURES_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);
                }
                LOGGER.debug(sb.toString());
            }
        }
    }

    /**
     * The type ClientHandler.
     */
    @Sharable
    class ClientHandler extends ChannelDuplexHandler {

        /** Forwards inbound {@code RpcMessage}s to {@code processMessage}; other objects are ignored. */
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            processMessage(ctx, (RpcMessage) msg);
        }

        /** Notifies waiters on {@code lock} once the channel is writable again. */
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) {
            synchronized (lock) {
                if (ctx.channel().isWritable()) {
                    lock.notifyAll();
                }
            }
            ctx.fireChannelWritabilityChanged();
        }

        /** Releases the channel through the channel manager unless the message executor is shut down. */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            if (messageExecutor.isShutdown()) {
                return;
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("channel inactive: {}", ctx.channel());
            }
            clientChannelManager.releaseChannel(ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress()));
            super.channelInactive(ctx);
        }

        /**
         * On reader idle, invalidates and releases the channel; on the writer-idle event, sends a
         * heartbeat ping.
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
                if (idleStateEvent.state() == IdleState.READER_IDLE) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("channel {} read idle.", ctx.channel());
                    }
                    try {
                        String serverAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
                        clientChannelManager.invalidateObject(serverAddress, ctx.channel());
                    } catch (Exception exx) {
                        LOGGER.error(exx.getMessage());
                    } finally {
                        clientChannelManager.releaseChannel(ctx.channel(), getAddressFromContext(ctx));
                    }
                }
                // NOTE(review): identity comparison against one specific event constant — confirm
                // this is intended rather than checking idleStateEvent.state().
                if (idleStateEvent == IdleStateEvent.WRITER_IDLE_STATE_EVENT) {
                    try {
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("will send ping msg,channel {}", ctx.channel());
                        }
                        AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(), HeartbeatMessage.PING);
                    } catch (Throwable throwable) {
                        LOGGER.error("send request error: {}", throwable.getMessage(), throwable);
                    }
                }
            }
        }

        /** Logs the failure, releases the channel, then lets the superclass handle the exception. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // The error code goes in as an argument, not as the format string; otherwise the
            // address/message argument has no placeholder and is dropped from the log line.
            LOGGER.error("{} {} connect exception. {}", FrameworkErrorCode.ExceptionCaught.getErrCode(),
                NetUtil.toStringAddress(ctx.channel().remoteAddress()), cause.getMessage(), cause);
            clientChannelManager.releaseChannel(ctx.channel(), getAddressFromChannel(ctx.channel()));
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("remove exception rm channel:{}", ctx.channel());
            }
            super.exceptionCaught(ctx, cause);
        }

        @Override
        public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(ctx + " will closed");
            }
            super.close(ctx, future);
        }
    }
}
/**
 * Sends a single message on the channel and blocks until its response arrives.
 *
 * <p>A {@code MessageFuture} is stored in {@code futures} under the message id before the write.
 * If the write fails, the future is removed, completed with the write's cause, and the channel
 * is destroyed.
 *
 * @param channel       the channel to write to; {@code null} results in a {@code null} return
 * @param rpcMessage    the message to send
 * @param timeoutMillis how long to wait for the response, in milliseconds; must be positive
 * @return the value obtained from the message future, or {@code null} when {@code channel} is null
 * @throws TimeoutException   if waiting on the future times out
 * @throws FrameworkException if {@code timeoutMillis} is not positive
 * @throws RuntimeException   wrapping any other exception raised while waiting
 */
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
    if (timeoutMillis <= 0) {
        throw new FrameworkException("timeout should more than 0ms");
    }
    if (channel == null) {
        LOGGER.warn("sendSync nothing, caused by null channel.");
        return null;
    }

    final MessageFuture pending = new MessageFuture();
    pending.setRequestMessage(rpcMessage);
    pending.setTimeout(timeoutMillis);
    // Register before writing so the response handler can find the future by message id.
    futures.put(rpcMessage.getId(), pending);

    channelWritableCheck(channel, rpcMessage.getBody());

    // Write-completion callback: only a failed write needs handling here.
    final ChannelFutureListener onWriteComplete = writeResult -> {
        if (writeResult.isSuccess()) {
            return;
        }
        MessageFuture failed = futures.remove(rpcMessage.getId());
        if (failed != null) {
            failed.setResultMessage(writeResult.cause());
        }
        destroyChannel(writeResult.channel());
    };
    channel.writeAndFlush(rpcMessage).addListener(onWriteComplete);

    try {
        return pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (Exception exx) {
        LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),
            rpcMessage.getBody());
        if (!(exx instanceof TimeoutException)) {
            throw new RuntimeException(exx);
        }
        throw (TimeoutException) exx;
    }
}
从上文源码可以看到,client会根据EnableClientBatchSendRequest配置决定是否将数据合并发送。开启该设置会加大seata TM的整体吞吐量,但会损失响应时长。关于这个配置可根据实际业务场景设定:如果是大量的分布式事务场景,可以设置为true(默认也为true);若是少量,可以设置为false,加快响应时间。不论单条还是merge消息聚合发送,本质都是通过netty异步发送message,注册对应的listener回调监听,并注册ChannelHandlers(ClientHandler继承ChannelDuplexHandler,在client接收response时执行)。ClientHandler中获取response的body,通过body消息类型获取初始化时绑定的消息解析器(默认为ClientOnResponseProcessor,对于解析器的绑定动作在TmNettyRemotingClient或RmNettyRemotingClient的初始化init中执行)。而在ClientOnResponseProcessor中处理消息后,通过MessageFuture的setResultMessage将数据写入,这个MessageFuture中包含CompletableFuture类(MessageFuture提供的get或者setResultMessage本质都是对CompletableFuture进行操作,MessageFuture算是对CompletableFuture的包装类)。在发送线程发送message后,线程通过MessageFuture提供的get进行阻塞等待异步回调结果,只有当ClientOnResponseProcessor中对应的message有消息到达即setResultMessage被执行,发送线程才能获取最终返回值。此处的方式就是netty异步发送的一个具体实现。对于此处功能详细的描述,可自行参考源码。
class NettyClientChannelManager {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientChannelManager.class);
//每一个channel对应的lock 保证每一个TC server对只会有一个channel 被创建
private final ConcurrentMap<String, Object> channelLocks = new ConcurrentHashMap<>();
//TC-server地址(ip+port)为key 与NettyPoolKey(reg信息)关系
private final ConcurrentMap<String, NettyPoolKey> poolKeyMap = new ConcurrentHashMap<>();
//缓存所有channel TC-server地址(ip+port)为key channel为value RM 或TM client与TC集群的任意节点只会缓存一个channel
private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();
//管理neety clinet key(reg信息)与channel 关系
private final GenericKeyedObjectPool<NettyPoolKey, Channel> nettyClientKeyPool;
//Tm 或RM中getPoolKeyFunction函数(reg信息) 映射 serverAddress 与NettyPoolKey关系
private Function<String, NettyPoolKey> poolKeyFunction;
NettyClientChannelManager(final NettyPoolableFactory keyPoolableFactory, final Function<String, NettyPoolKey> poolKeyFunction,
final NettyClientConfig clientConfig) {
//初始化 chnnel连接缓存pool 使用apache中连接pool
nettyClientKeyPool = new GenericKeyedObjectPool<>(keyPoolableFactory);
nettyClientKeyPool.setConfig(getNettyPoolConfig(clientConfig));
this.poolKeyFunction = poolKeyFunction;
}
/**
* 本地配置转化为GenericKeyedObjectPool 中连接pool设置
* @param clientConfig
* @return
*/
private GenericKeyedObjectPool.Config getNettyPoolConfig(final NettyClientConfig clientConfig) {
//设置chnnel pool
GenericKeyedObjectPool.Config poolConfig = new GenericKeyedObjectPool.Config();
//最大存活数
poolConfig.maxActive = clientConfig.getMaxPoolActive();
//最小空闲数
poolConfig.minIdle = clientConfig.getMinPoolIdle();
//最大等待连接时间
poolConfig.maxWait = clientConfig.getMaxAcquireConnMills();
//测试
poolConfig.testOnBorrow = clientConfig.isPoolTestBorrow();
poolConfig.testOnReturn = clientConfig.isPoolTestReturn();
poolConfig.lifo = clientConfig.isPoolLifo();
return poolConfig;
}
/**
* 获取在当前Rpc客户端上注册的所有通道
*
* @return channels
*/
ConcurrentMap<String, Channel> getChannels() {
return channels;
}
/**
* 获与TC的netty客户端channel
*
* @param serverAddress server address
* @return netty channel
*/
Channel acquireChannel(String serverAddress) {
Channel channelToServer = channels.get(serverAddress);
//已经存在
if (channelToServer != null) {
//获取alive的channel
channelToServer = getExistAliveChannel(channelToServer, serverAddress);
if (channelToServer != null) {
return channelToServer;
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("will connect to " + serverAddress);
}
/*不存在channel 需要创建一个新的*/
//创建一个chanel对应lock
channelLocks.putIfAbsent(serverAddress, new Object());
//加锁保证只会创建一个channel
synchronized (channelLocks.get(serverAddress)) {
//创建一个channel
return doConnect(serverAddress);
}
}
/**
* Release channel to pool if necessary.
* 从pool中释放channel
* @param channel channel
* @param serverAddress server address
*/
void releaseChannel(Channel channel, String serverAddress) {
if (channel == null || serverAddress == null) { return; }
try {
//加锁
synchronized (channelLocks.get(serverAddress)) {
Channel ch = channels.get(serverAddress);
if (ch == null) {
nettyClientKeyPool.returnObject(poolKeyMap.get(serverAddress), channel);
return;
}
if (ch.compareTo(channel) == 0) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("return to pool, rm channel:{}", channel);
}
destroyChannel(serverAddress, channel);
} else {
nettyClientKeyPool.returnObject(poolKeyMap.get(serverAddress), channel);
}
}
} catch (Exception exx) {
LOGGER.error(exx.getMessage());
}
}
/**
* 销毁 channel.
*
* @param serverAddress server address
* @param channel channel
*/
void destroyChannel(String serverAddress, Channel channel) {
if (channel == null) { return; }
try {
//从缓存中移除
if (channel.equals(channels.get(serverAddress))) {
channels.remove(serverAddress);
}
nettyClientKeyPool.returnObject(poolKeyMap.get(serverAddress), channel);
} catch (Exception exx) {
LOGGER.error("return channel to rmPool error:{}", exx.getMessage());
}
}
/**
* Reconnect to remote server of current transaction service group.
* 从新连接远程远程服务
* @param transactionServiceGroup transaction service group
*/
void reconnect(String transactionServiceGroup) {
List<String> availList = null;
try {
//从注册中心获取有效服务地址
availList = getAvailServerList(transactionServiceGroup);
} catch (Exception e) {
LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
return;
}
if (CollectionUtils.isEmpty(availList)) {
//error 日志打印
String serviceGroup = RegistryFactory.getInstance()
.getServiceGroup(transactionServiceGroup);
LOGGER.error("no available service '{}' found, please make sure registry config correct", serviceGroup);
return;
}
for (String serverAddress : availList) {
try {
//重新构建channel
acquireChannel(serverAddress);
} catch (Exception e) {
LOGGER.error("{} can not connect to {} cause:{}",FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);
}
}
}
/**
* 从pool中作废channel
* @param serverAddress tc地址
* @param channel
* @throws Exception
*/
void invalidateObject(final String serverAddress, final Channel channel) throws Exception {
nettyClientKeyPool.invalidateObject(poolKeyMap.get(serverAddress), channel);
}
/**
* 注册一个channel到本地缓存
* @param serverAddress tc地址
* @param channel
*/
void registerChannel(final String serverAddress, final Channel channel) {
//判断channel有效写入缓存
if (channels.get(serverAddress) != null && channels.get(serverAddress).isActive()) {
return;
}
channels.put(serverAddress, channel);
}
/**
* 创建一个新的channel
* @param serverAddress
* @return
*/
private Channel doConnect(String serverAddress) {
//再次校验
Channel channelToServer = channels.get(serverAddress);
if (channelToServer != null && channelToServer.isActive()) {
return channelToServer;
}
Channel channelFromPool;
try {
//获取reg NettyPoolKey
NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
//写入缓存
NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
//校验是否为RM reg request请求 若是需要写入resourceIds
if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
}
//写入pool中
channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
channels.put(serverAddress, channelFromPool);
} catch (Exception exx) {
LOGGER.error("{} register RM failed.",FrameworkErrorCode.RegisterRM.getErrCode(), exx);
throw new FrameworkException("can not register RM,err:" + exx.getMessage());
}
return channelFromPool;
}
private List<String> getAvailServerList(String transactionServiceGroup) throws Exception {
//根据服务名称从注册中心中获取有效的服务信息列表
List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance()
.lookup(transactionServiceGroup);
if (CollectionUtils.isEmpty(availInetSocketAddressList)) {
return Collections.emptyList();
}
//转化数据格式
return availInetSocketAddressList.stream()
.map(NetUtil::toStringAddress)
.collect(Collectors.toList());
}
/**
 * Returns an active channel for the address, waiting for a cached channel to become active.
 *
 * <p>If {@code rmChannel} is inactive, the method sleeps for
 * {@code NettyClientConfig.getCheckAliveInternal()} up to
 * {@code NettyClientConfig.getMaxCheckAliveRetry()} times, re-reading the channel cache after
 * each sleep. When every retry is used up, the last channel seen is released.
 *
 * <p>If the thread is interrupted while waiting, the interrupt flag is restored and the wait
 * ends immediately, so callers can observe the interruption.
 *
 * @param rmChannel     the channel currently cached for the address; must not be null
 * @param serverAddress the server address the channel belongs to
 * @return an active channel, or {@code null} when none became active
 */
private Channel getExistAliveChannel(Channel rmChannel, String serverAddress) {
    if (rmChannel.isActive()) {
        return rmChannel;
    }
    int i = 0;
    for (; i < NettyClientConfig.getMaxCheckAliveRetry(); i++) {
        try {
            Thread.sleep(NettyClientConfig.getCheckAliveInternal());
        } catch (InterruptedException exx) {
            LOGGER.error(exx.getMessage());
            // Restore the flag and stop waiting; otherwise the interruption is lost.
            Thread.currentThread().interrupt();
            break;
        }
        // Another thread may have replaced the cached channel in the meantime.
        rmChannel = channels.get(serverAddress);
        if (rmChannel != null && rmChannel.isActive()) {
            return rmChannel;
        }
    }
    // Release the channel only after the full wait; an interrupted wait leaves it untouched.
    if (i == NettyClientConfig.getMaxCheckAliveRetry()) {
        LOGGER.warn("channel {} is not active after long wait, close it.", rmChannel);
        releaseChannel(rmChannel, serverAddress);
    }
    return null;
}
}
以上是负责管理与TC之间channel的NettyClientChannelManager类。
下面是负责创建、校验与销毁channel的池化工厂类NettyPoolableFactory:
public class NettyPoolableFactory implements KeyedPoolableObjectFactory<NettyPoolKey, Channel> {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyPoolableFactory.class);
private final AbstractNettyRemotingClient rpcRemotingClient;
private final NettyClientBootstrap clientBootstrap;
/**
 * Creates a poolable channel factory.
 *
 * @param rpcRemotingClient the remoting client used to send the register message
 * @param clientBootstrap   the bootstrap used to open new channels
 */
public NettyPoolableFactory(AbstractNettyRemotingClient rpcRemotingClient, NettyClientBootstrap clientBootstrap) {
    this.clientBootstrap = clientBootstrap;
    this.rpcRemotingClient = rpcRemotingClient;
}
/**
 * Opens a new channel to the address in {@code key} and registers on it.
 *
 * <p>The register message carried by the key is sent synchronously. On a successful response
 * {@code onRegisterMsgSuccess} is invoked and the channel is returned; otherwise
 * {@code onRegisterMsgFail} is invoked. Any exception closes the new channel.
 *
 * @param key pool key holding the server address, the role and the register message
 * @return the registered channel, or {@code null} if registration failed without an exception
 * @throws FrameworkException if the key has no register message or registration throws
 */
@Override
public Channel makeObject(NettyPoolKey key) {
    // Validate first so that a missing register message cannot leak a freshly opened channel.
    if (key.getMessage() == null) {
        throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name());
    }
    InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("NettyPool create channel to " + key);
    }
    Channel tmpChannel = clientBootstrap.getNewChannel(address);
    long start = System.currentTimeMillis();
    Object response;
    Channel channelToServer = null;
    try {
        response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage());
        if (!isRegisterSuccess(response, key.getTransactionRole())) {
            rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
        } else {
            channelToServer = tmpChannel;
            rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage());
        }
    } catch (Exception exx) {
        if (tmpChannel != null) {
            tmpChannel.close();
        }
        throw new FrameworkException(
            "register " + key.getTransactionRole().name() + " error, errMsg:" + exx.getMessage());
    }
    // Report success only when registration succeeded; getVersion casts the response and
    // would fail on a null or unexpected response.
    if (channelToServer != null && LOGGER.isInfoEnabled()) {
        LOGGER.info("register success, cost " + (System.currentTimeMillis() - start) + " ms, version:" + getVersion(
            response, key.getTransactionRole()) + ",role:" + key.getTransactionRole().name() + ",channel:"
            + channelToServer);
    }
    return channelToServer;
}
/**
 * Tells whether a register response means the registration was accepted.
 *
 * @param response        the response received for the register message; may be null
 * @param transactionRole the role the registration was made for
 * @return {@code true} only if the response has the type matching the role and is identified
 */
private boolean isRegisterSuccess(Object response, NettyPoolKey.TransactionRole transactionRole) {
    if (response == null) {
        return false;
    }
    if (transactionRole.equals(NettyPoolKey.TransactionRole.TMROLE)) {
        return response instanceof RegisterTMResponse
            && ((RegisterTMResponse) response).isIdentified();
    }
    if (transactionRole.equals(NettyPoolKey.TransactionRole.RMROLE)) {
        return response instanceof RegisterRMResponse
            && ((RegisterRMResponse) response).isIdentified();
    }
    // Any other role is never treated as registered.
    return false;
}
/**
 * Reads the version from a register response.
 *
 * @param response        the register response; cast according to the role
 * @param transactionRole the role the registration was made for
 * @return the version from the TM response for the TM role, otherwise from the RM response
 */
private String getVersion(Object response, NettyPoolKey.TransactionRole transactionRole) {
    boolean tmRole = transactionRole.equals(NettyPoolKey.TransactionRole.TMROLE);
    if (!tmRole) {
        return ((RegisterRMResponse) response).getVersion();
    }
    return ((RegisterTMResponse) response).getVersion();
}
/**
 * Disconnects and closes a pooled channel; does nothing for a null channel.
 *
 * @param key     the pool key the channel belongs to (unused)
 * @param channel the channel to destroy; may be null
 */
@Override
public void destroyObject(NettyPoolKey key, Channel channel) throws Exception {
    if (channel == null) {
        return;
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("will destroy channel:" + channel);
    }
    channel.disconnect();
    channel.close();
}
/**
 * Checks whether a pooled channel can still be used.
 *
 * @param key the pool key the channel belongs to (unused)
 * @param obj the channel to check; may be null
 * @return {@code true} if the channel is non-null and active
 */
@Override
public boolean validateObject(NettyPoolKey key, Channel obj) {
    boolean usable = obj != null && obj.isActive();
    if (!usable && LOGGER.isInfoEnabled()) {
        LOGGER.info("channel valid false,channel:" + obj);
    }
    return usable;
}
/**
 * Intentionally empty: nothing is done when a channel is activated.
 */
@Override
public void activateObject(NettyPoolKey key, Channel obj) throws Exception {
}
/**
 * Intentionally empty: nothing is done when a channel is passivated.
 */
@Override
public void passivateObject(NettyPoolKey key, Channel obj) throws Exception {
}
}
上述代码描述了对channel的管理,包含创建、重连、移除等动作。
2.0:异常处理
前文在GlobalTransactionalInterceptor中描述了handleGlobalTransaction()的正常流程。当transactionalTemplate.execute()发生异常时,seata会根据不同的异常类型采用不同的处理方式。异常处理由FailureHandler接口定义,如下所示:
public interface FailureHandler {
/**
* Callback for a failure while beginning a global transaction.
*
* @param tx the global transaction involved
* @param cause the throwable describing the failure
*/
void onBeginFailure(GlobalTransaction tx, Throwable cause);
/**
* Callback for a failure while committing a global transaction.
*
* @param tx the global transaction involved
* @param cause the throwable describing the failure
*/
void onCommitFailure(GlobalTransaction tx, Throwable cause);
/**
* Callback for a failure while rolling back a global transaction.
*
* @param tx the global transaction involved
* @param originalException the exception passed along with the rollback failure
*/
void onRollbackFailure(GlobalTransaction tx, Throwable originalException);
/**
* Callback for a global transaction whose rollback is being retried.
*
* @param tx the global transaction involved
* @param originalException the exception passed along with the retry notification
*/
void onRollbackRetrying(GlobalTransaction tx, Throwable originalException);
}
从上述源码可以看出,FailureHandler封装了TM与TC交互过程中几乎所有异常情况的处理流程。它的默认实现是DefaultFailureHandlerImpl,在GlobalTransactionalInterceptor初始化时被指定,如下所示:
public class DefaultFailureHandlerImpl implements FailureHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultFailureHandlerImpl.class);
/**
* 重试最大时间默认1个小时 每次重试间隔时间10s 360次 共一个小时
*/
private static final int RETRY_MAX_TIMES = 6 * 60;
//计划间隔秒数 默认10s
private static final long SCHEDULE_INTERVAL_SECONDS = 10;
private static final long TICK_DURATION = 1;
private static final int TICKS_PER_WHEEL = 8;
//timer 定时时间轮用于定时检测TC中对于事务状态
private HashedWheelTimer timer = new HashedWheelTimer(
new NamedThreadFactory("failedTransactionRetry", 1),
TICK_DURATION, TimeUnit.SECONDS, TICKS_PER_WHEEL);
/**
 * Logs a warning for a failed begin; no status check is scheduled.
 *
 * @param tx    the global transaction that failed to begin (unused)
 * @param cause the throwable describing the failure
 */
@Override
public void onBeginFailure(GlobalTransaction tx, Throwable cause) {
// No business logic has run in the begin phase, so no retry is needed.
LOGGER.warn("Failed to begin transaction. ", cause);
}
/**
 * Logs the commit failure and schedules a task that checks for {@code GlobalStatus.Committed}.
 *
 * @param tx    the global transaction whose commit failed
 * @param cause the throwable describing the failure
 */
@Override
public void onCommitFailure(GlobalTransaction tx, Throwable cause) {
    LOGGER.warn("Failed to commit transaction[" + tx.getXid() + "]", cause);
    // First status check runs after SCHEDULE_INTERVAL_SECONDS.
    CheckTimerTask statusProbe = new CheckTimerTask(tx, GlobalStatus.Committed);
    timer.newTimeout(statusProbe, SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
/**
 * Logs the rollback failure and schedules a task that checks for {@code GlobalStatus.Rollbacked}.
 *
 * @param tx                the global transaction whose rollback failed
 * @param originalException the exception passed along with the failure
 */
@Override
public void onRollbackFailure(GlobalTransaction tx, Throwable originalException) {
    LOGGER.warn("Failed to rollback transaction[" + tx.getXid() + "]", originalException);
    // First status check runs after SCHEDULE_INTERVAL_SECONDS.
    CheckTimerTask statusProbe = new CheckTimerTask(tx, GlobalStatus.Rollbacked);
    timer.newTimeout(statusProbe, SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
/**
 * Logs the rollback retry and schedules a task that checks for
 * {@code GlobalStatus.RollbackRetrying}.
 *
 * @param tx                the global transaction being retried
 * @param originalException the exception passed along with the retry notification
 */
@Override
public void onRollbackRetrying(GlobalTransaction tx, Throwable originalException) {
    String[] logArgs = new String[] {tx.getXid()};
    StackTraceLogger.warn(LOGGER, originalException, "Retrying to rollback transaction[{}]", logArgs);
    CheckTimerTask statusProbe = new CheckTimerTask(tx, GlobalStatus.RollbackRetrying);
    timer.newTimeout(statusProbe, SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
/**
 * Timer task that repeatedly checks the status of a global transaction.
 *
 * <p>Each run asks {@code shouldStop} whether the transaction has reached the required
 * status. The task re-schedules itself every {@code SCHEDULE_INTERVAL_SECONDS} until that
 * happens or {@code RETRY_MAX_TIMES} runs have been used.
 */
protected class CheckTimerTask implements TimerTask {
    private final GlobalTransaction tx;
    // Status that ends the checking once it is observed.
    private final GlobalStatus required;
    // Number of runs performed so far.
    private int count = 0;
    // Set once the required status has been observed.
    private boolean isStopped = false;

    protected CheckTimerTask(final GlobalTransaction tx, GlobalStatus required) {
        this.tx = tx;
        this.required = required;
    }

    @Override
    public void run(Timeout timeout) throws Exception {
        if (isStopped) {
            return;
        }
        if (++count > RETRY_MAX_TIMES) {
            // Give up: the run limit is exhausted, so the task is not scheduled again.
            LOGGER.error("transaction [{}] retry fetch status times exceed the limit [{} times]", tx.getXid(), RETRY_MAX_TIMES);
            return;
        }
        isStopped = shouldStop(tx, required);
        // Re-schedule only while the required status is still outstanding; scheduling
        // after success would only queue one more no-op run.
        if (!isStopped) {
            timer.newTimeout(this, SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
        }
    }
}
/**
 * Fetches the transaction status and decides whether checking can end.
 *
 * @param tx       the global transaction to query
 * @param required the status that ends the checking
 * @return {@code true} if the status equals {@code required} or {@code GlobalStatus.Finished};
 *         {@code false} otherwise, including when the status cannot be fetched
 */
private boolean shouldStop(final GlobalTransaction tx, GlobalStatus required) {
    GlobalStatus current;
    try {
        current = tx.getStatus();
        LOGGER.info("transaction [{}] current status is [{}]", tx.getXid(), current);
    } catch (TransactionException e) {
        LOGGER.error("fetch GlobalTransaction status error", e);
        return false;
    }
    return current == required || current == GlobalStatus.Finished;
}
}
2:seata客户端-RM(基于springcloud项目分析)
上述描述了TM的大致使用流程。RM与TM在GlobalTransactionScanner初始化时一起被初始化。有些服务可能不需要TM而只作为一个分支RM使用,所以个人感觉没必要两个都进行初始化,可以根据使用者的选择进行。
2.1:客户端RM client
rm与TM类似,在1.2与1.3版本中对于管理channel类有着不同的命名
public class RMClient {
/**
 * Initializes the RM client.
 *
 * <p>Obtains the {@code RmNettyRemotingClient} instance for the application and service
 * group, attaches the default resource manager and the default RM handler to it, then
 * initializes it.
 *
 * @param applicationId           the application id
 * @param transactionServiceGroup the transaction service group
 */
public static void init(String applicationId, String transactionServiceGroup) {
    RmNettyRemotingClient client = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
    // Resource manager that dispatches to the manager registered for each BranchType.
    client.setResourceManager(DefaultResourceManager.get());
    // Message handler that dispatches to the RM handler registered for each BranchType.
    client.setTransactionMessageHandler(DefaultRMHandler.get());
    client.init();
}
}
RmNettyRemotingClient与TmNettyRemotingClient都继承了AbstractNettyRemotingClient,所以RmNettyRemotingClient的初始化过程与TmNettyRemotingClient基本一致,这里不再赘述。
2.1:DefaultRMHandler
携有BranchType类型对应RMHandler 针对不同类型client的分布式事务实现具体使用不同策略
public class DefaultRMHandler extends AbstractRMHandler {
//记录BranchType(AT XA TCC SAGA) 与其对应4种RMHandler
protected static Map<BranchType, AbstractRMHandler> allRMHandlersMap
= new ConcurrentHashMap<BranchType, AbstractRMHandler>();
/**
 * Creates the handler and fills the branch-type-to-handler map via {@code initRMHandlers()}.
 */
protected DefaultRMHandler() {
initRMHandlers();
}
/**
 * Loads every {@code AbstractRMHandler} through {@code EnhancedServiceLoader} and registers
 * each one in {@code allRMHandlersMap} under its branch type.
 */
protected void initRMHandlers() {
    List<AbstractRMHandler> discovered = EnhancedServiceLoader.loadAll(AbstractRMHandler.class);
    if (!CollectionUtils.isNotEmpty(discovered)) {
        return;
    }
    for (AbstractRMHandler candidate : discovered) {
        allRMHandlersMap.put(candidate.getBranchType(), candidate);
    }
}
/**
 * Forwards a branch commit request to the handler registered for the request's branch type.
 *
 * @param request the branch commit request
 * @return the response produced by the branch-type-specific handler
 */
@Override
public BranchCommitResponse handle(BranchCommitRequest request) {
    AbstractRMHandler delegate = getRMHandler(request.getBranchType());
    return delegate.handle(request);
}
/**
 * Forwards a branch rollback request to the handler registered for the request's branch type.
 *
 * @param request the branch rollback request
 * @return the response produced by the branch-type-specific handler
 */
@Override
public BranchRollbackResponse handle(BranchRollbackRequest request) {
    AbstractRMHandler delegate = getRMHandler(request.getBranchType());
    return delegate.handle(request);
}
/**
 * Forwards an undo-log delete request to the handler registered for the request's branch type.
 *
 * @param request the undo-log delete request
 */
@Override
public void handle(UndoLogDeleteRequest request) {
    AbstractRMHandler delegate = getRMHandler(request.getBranchType());
    delegate.handle(request);
}
/**
 * Looks up the handler registered for a branch type.
 *
 * <p>An unknown branch type raises a descriptive {@code FrameworkException} instead of
 * returning {@code null}, which every caller here would dereference immediately.
 *
 * @param branchType the branch type to look up
 * @return the handler registered for {@code branchType}; never null
 * @throws FrameworkException if no handler is registered for the branch type
 */
protected AbstractRMHandler getRMHandler(BranchType branchType) {
    AbstractRMHandler rmHandler = allRMHandlersMap.get(branchType);
    if (rmHandler == null) {
        throw new FrameworkException("No RMHandler for BranchType:" + branchType.name());
    }
    return rmHandler;
}
/**
 * Always throws: this class only dispatches to other handlers and has no resource manager.
 *
 * @throws FrameworkException always
 */
@Override
protected ResourceManager getResourceManager() {
throw new FrameworkException("DefaultRMHandler isn't a real AbstractRMHandler");
}
/**
 * Holder class for the single {@code DefaultRMHandler} instance; the instance is created
 * when this class is initialized.
 */
private static class SingletonHolder {
    // final: the singleton reference is assigned once and never replaced.
    private static final AbstractRMHandler INSTANCE = new DefaultRMHandler();
}
/**
 * Returns the singleton {@code DefaultRMHandler}.
 *
 * @return the shared RM handler instance
 */
public static AbstractRMHandler get() {
    return SingletonHolder.INSTANCE;
}
/**
 * Always throws: this class only dispatches to other handlers and has no branch type.
 *
 * @throws FrameworkException always
 */
@Override
public BranchType getBranchType() {
throw new FrameworkException("DefaultRMHandler isn't a real AbstractRMHandler");
}
}
父抽象类AbstractRMHandler封装了具体的执行流程,并调用底层ResourceManager执行RM数据层逻辑:
/**
 * Base class for RM-side handlers of branch commit, branch rollback and undo-log delete
 * requests.
 *
 * <p>The {@code handle} methods wrap the work in {@code exceptionHandleTemplate}; the actual
 * commit/rollback is delegated to the {@link ResourceManager} supplied by the subclass.
 */
public abstract class AbstractRMHandler extends AbstractExceptionHandler
    implements RMInboundHandler, TransactionMessageHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRMHandler.class);

    /**
     * Handles a branch commit request inside the shared exception-handling template.
     *
     * @param request the branch commit request
     * @return the response filled in by {@link #doBranchCommit}
     */
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response)
                throws TransactionException {
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }

    /**
     * Handles a branch rollback request inside the shared exception-handling template.
     *
     * @param request the branch rollback request
     * @return the response filled in by {@link #doBranchRollback}
     */
    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        BranchRollbackResponse response = new BranchRollbackResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
            @Override
            public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
                throws TransactionException {
                doBranchRollback(request, response);
            }
        }, request, response);
        return response;
    }

    /**
     * Handles an undo-log delete request; a no-op in this base class.
     *
     * @param request the request
     */
    @Override
    public void handle(UndoLogDeleteRequest request) {
        // https://github.com/seata/seata/issues/2226
    }

    /**
     * Commits a branch through the resource manager and copies the outcome into the response.
     *
     * @param request  the request
     * @param response the response
     * @throws TransactionException the transaction exception
     */
    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        }
        // The returned branch status is placed on the response below.
        BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch commit result: " + status);
        }
    }

    /**
     * Rolls back a branch through the resource manager and copies the outcome into the response.
     *
     * @param request  the request
     * @param response the response
     * @throws TransactionException the transaction exception
     */
    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
        throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
        }
        // The returned branch status is placed on the response below.
        BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacked result: " + status);
        }
    }

    /**
     * Supplies the resource manager that performs the branch commit/rollback.
     *
     * @return the resource manager used by this handler
     */
    protected abstract ResourceManager getResourceManager();

    /**
     * Dispatches an incoming request back to this handler's matching {@code handle} method.
     *
     * @param request the incoming message; must be an {@code AbstractTransactionRequestToRM}
     * @param context the RPC context passed through to the request
     * @return the result produced by the request's {@code handle} call
     * @throws IllegalArgumentException if the message is not an RM transaction request
     */
    @Override
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToRM)) {
            // Name the offending type so the failure can be diagnosed from the message.
            throw new IllegalArgumentException("Unsupported request for RM handler: "
                + (request == null ? null : request.getClass().getName()));
        }
        AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
        transactionRequest.setRMInboundMessageHandler(this);
        return transactionRequest.handle(context);
    }

    /**
     * Logs a response received from the TC server.
     *
     * @param response the response message; may be null
     * @param context  the RPC context (unused)
     */
    @Override
    public void onResponse(AbstractResultMessage response, RpcContext context) {
        // Pass the object itself so a null response is logged as "null" instead of throwing.
        LOGGER.info("the rm client received response msg [{}] from tc server.", response);
    }

    /**
     * Returns the branch type this handler serves.
     *
     * @return the branch type
     */
    public abstract BranchType getBranchType();
}
2.2:DefaultResourceManager(ResourceManager)
该类的设计策略与DefaultRMHandler一样,也是使用策略设计模式,内部包含了4种(AT,XA,TCC,SAGA)分布式事务分支对应的具体ResourceManager,由handler中调用,具体执行数据层的分支事务执行。
public class DefaultResourceManager implements ResourceManager {
/**
* all resource managers
*/
protected static Map<BranchType, ResourceManager> resourceManagers
= new ConcurrentHashMap<>();
/**
 * Creates the manager and fills the branch-type-to-manager map via
 * {@code initResourceManagers()}.
 */
private DefaultResourceManager() {
initResourceManagers();
}
/**
* Returns the singleton instance held by {@code SingletonHolder}.
*
* @return the shared default resource manager
*/
public static DefaultResourceManager get() {
return SingletonHolder.INSTANCE;
}
/**
* Registers (or replaces) the resource manager for a branch type; intended for mocking only.
*
* @param branchType the branch type to register the manager under
* @param rm the resource manager to store
*/
public static void mockResourceManager(BranchType branchType, ResourceManager rm) {
resourceManagers.put(branchType, rm);
}
/**
 * Loads every {@code ResourceManager} through {@code EnhancedServiceLoader} and registers
 * each one in {@code resourceManagers} under its branch type.
 */
protected void initResourceManagers() {
    List<ResourceManager> discovered = EnhancedServiceLoader.loadAll(ResourceManager.class);
    if (!CollectionUtils.isNotEmpty(discovered)) {
        return;
    }
    for (ResourceManager candidate : discovered) {
        resourceManagers.put(candidate.getBranchType(), candidate);
    }
}
/**
 * Delegates a branch commit to the resource manager registered for the branch type.
 *
 * @return the branch status reported by the delegate
 * @throws TransactionException if the delegate fails
 */
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId,
                                 String resourceId, String applicationData)
    throws TransactionException {
    ResourceManager delegate = getResourceManager(branchType);
    return delegate.branchCommit(branchType, xid, branchId, resourceId, applicationData);
}
/**
 * Delegates a branch rollback to the resource manager registered for the branch type.
 *
 * @return the branch status reported by the delegate
 * @throws TransactionException if the delegate fails
 */
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId,
                                   String resourceId, String applicationData)
    throws TransactionException {
    ResourceManager delegate = getResourceManager(branchType);
    return delegate.branchRollback(branchType, xid, branchId, resourceId, applicationData);
}
/**
 * Delegates a branch registration to the resource manager registered for the branch type.
 *
 * @return the value returned by the delegate's {@code branchRegister}
 * @throws TransactionException if the delegate fails
 */
@Override
public Long branchRegister(BranchType branchType, String resourceId,
                           String clientId, String xid, String applicationData, String lockKeys)
    throws TransactionException {
    ResourceManager delegate = getResourceManager(branchType);
    return delegate.branchRegister(branchType, resourceId, clientId, xid, applicationData, lockKeys);
}
/**
 * Delegates a branch status report to the resource manager registered for the branch type.
 *
 * @throws TransactionException if the delegate fails
 */
@Override
public void branchReport(BranchType branchType, String xid, long branchId, BranchStatus status,
                         String applicationData) throws TransactionException {
    ResourceManager delegate = getResourceManager(branchType);
    delegate.branchReport(branchType, xid, branchId, status, applicationData);
}
/**
 * Delegates a lock query to the resource manager registered for the branch type.
 *
 * @return the result of the delegate's {@code lockQuery}
 * @throws TransactionException if the delegate fails
 */
@Override
public boolean lockQuery(BranchType branchType, String resourceId,
                         String xid, String lockKeys) throws TransactionException {
    ResourceManager delegate = getResourceManager(branchType);
    return delegate.lockQuery(branchType, resourceId, xid, lockKeys);
}
/**
 * Registers a resource with the resource manager matching the resource's branch type.
 *
 * @param resource the resource to register
 */
@Override
public void registerResource(Resource resource) {
    ResourceManager delegate = getResourceManager(resource.getBranchType());
    delegate.registerResource(resource);
}
/**
 * Unregisters a resource from the resource manager matching the resource's branch type.
 *
 * @param resource the resource to unregister
 */
@Override
public void unregisterResource(Resource resource) {
    ResourceManager delegate = getResourceManager(resource.getBranchType());
    delegate.unregisterResource(resource);
}
/**
 * Collects the managed resources of every registered resource manager into one new map.
 *
 * @return a new map holding the entries of all managers; managers returning null are skipped
 */
@Override
public Map<String, Resource> getManagedResources() {
    Map<String, Resource> merged = new HashMap<>();
    for (ResourceManager manager : resourceManagers.values()) {
        Map<String, Resource> owned = manager.getManagedResources();
        if (owned == null) {
            continue;
        }
        merged.putAll(owned);
    }
    return merged;
}
/**
 * Looks up the resource manager registered for a branch type.
 *
 * @param branchType the branch type to look up
 * @return the registered resource manager; never null
 * @throws FrameworkException if no resource manager is registered for the branch type
 */
public ResourceManager getResourceManager(BranchType branchType) {
    ResourceManager found = resourceManagers.get(branchType);
    if (found != null) {
        return found;
    }
    throw new FrameworkException("No ResourceManager for BranchType:" + branchType.name());
}
/**
 * Always throws: this class only dispatches to other managers and has no branch type.
 *
 * @throws FrameworkException always
 */
@Override
public BranchType getBranchType() {
throw new FrameworkException("DefaultResourceManager isn't a real ResourceManager");
}
/**
 * Holder class for the single {@code DefaultResourceManager} instance; the instance is
 * created when this class is initialized.
 */
private static class SingletonHolder {
    // final: the singleton reference is assigned once and never replaced.
    private static final DefaultResourceManager INSTANCE = new DefaultResourceManager();
}
}
这里以AT模式为主,介绍其DataSourceManager(它底层通过AsyncWorker异步执行分支提交):
public class DataSourceManager extends AbstractResourceManager implements Initialize {
private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceManager.class);
private ResourceManagerInbound asyncWorker;
private Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();
/**
* Sets the worker that {@code branchCommit} delegates to.
*
* @param asyncWorker the async worker
*/
public void setAsyncWorker(ResourceManagerInbound asyncWorker) {
this.asyncWorker = asyncWorker;
}
/**
 * Asks the server whether the given lock keys are lockable.
 *
 * <p>The query is sent only when the current context is in a global transaction or requires
 * the global lock; any other situation is reported as a failed lock check.
 *
 * @param branchType the branch type (unused here)
 * @param resourceId the resource id the lock keys belong to
 * @param xid        the global transaction id
 * @param lockKeys   the lock keys to query
 * @return the {@code isLockable()} value of the server response
 * @throws TransactionException if the response code is {@code Failed}, the RPC times out,
 *                              or a runtime exception occurs
 */
@Override
public boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys)
    throws TransactionException {
    try {
        GlobalLockQueryRequest query = new GlobalLockQueryRequest();
        query.setXid(xid);
        query.setLockKey(lockKeys);
        query.setResourceId(resourceId);
        if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
            // Caught below and rethrown as a LockableCheckFailed transaction exception.
            throw new RuntimeException("unknow situation!");
        }
        GlobalLockQueryResponse reply =
            (GlobalLockQueryResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(query);
        if (reply.getResultCode() == ResultCode.Failed) {
            throw new TransactionException(reply.getTransactionExceptionCode(),
                "Response[" + reply.getMsg() + "]");
        }
        return reply.isLockable();
    } catch (TimeoutException toe) {
        throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
    } catch (RuntimeException rex) {
        throw new RmTransactionException(TransactionExceptionCode.LockableCheckFailed, "Runtime", rex);
    }
}
/**
 * Picks one server address for the TM client's transaction service group.
 *
 * <p>Lookup or selection failures are logged and treated as "no address found".
 *
 * @return the string form of the selected address
 * @throws FrameworkException with {@code NoAvailableService} if no address could be selected
 */
@Deprecated
@SuppressWarnings("unchecked")
private String loadBalance() {
    InetSocketAddress chosen = null;
    try {
        List<InetSocketAddress> candidates = RegistryFactory.getInstance().lookup(
            TmNettyRemotingClient.getInstance().getTransactionServiceGroup());
        chosen = LoadBalanceFactory.getInstance().select(candidates);
    } catch (Exception ignore) {
        LOGGER.error(ignore.getMessage());
    }
    if (chosen != null) {
        return NetUtil.toStringAddress(chosen);
    }
    throw new FrameworkException(NoAvailableService);
}
/**
* Installs the async worker under this instance's monitor by calling {@code setAsyncWorker}.
*
* @param asyncWorker the async worker
*/
public synchronized void initAsyncWorker(ResourceManagerInbound asyncWorker) {
setAsyncWorker(asyncWorker);
}
/**
* Instantiates a new data source manager; the async worker is created later in {@code init()}.
*/
public DataSourceManager() {
}
/**
 * Creates an {@code AsyncWorker}, initializes it and installs it as this manager's worker.
 */
@Override
public void init() {
    AsyncWorker worker = new AsyncWorker();
    worker.init();
    initAsyncWorker(worker);
}
/**
 * Caches the data source proxy under its resource id, then registers it through the
 * superclass.
 *
 * @param resource the resource to register; must be a {@code DataSourceProxy}
 */
@Override
public void registerResource(Resource resource) {
    DataSourceProxy proxy = (DataSourceProxy) resource;
    String resourceId = proxy.getResourceId();
    dataSourceCache.put(resourceId, proxy);
    super.registerResource(proxy);
}
/**
 * Not supported: always throws.
 *
 * @throws NotSupportYetException always
 */
@Override
public void unregisterResource(Resource resource) {
throw new NotSupportYetException("unregister a resource");
}
/**
 * Returns the cached data source proxy for a resource id.
 *
 * @param resourceId the resource id
 * @return the cached data source proxy, or {@code null} if none is cached under that id
 */
public DataSourceProxy get(String resourceId) {
    Resource cached = dataSourceCache.get(resourceId);
    return (DataSourceProxy) cached;
}
/**
 * Delegates the branch commit to the installed async worker.
 *
 * @return the branch status returned by the async worker
 * @throws TransactionException if the async worker throws it
 */
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
}
/**
 * Rolls back a branch in phase two by running the undo for the branch on its data source.
 *
 * @param branchType      the branch type (used for logging only)
 * @param xid             the global transaction id
 * @param branchId        the branch id
 * @param resourceId      the resource id used to find the data source proxy
 * @param applicationData the application data (used for logging only)
 * @return {@code PhaseTwo_Rollbacked} on success; {@code PhaseTwo_RollbackFailed_Unretryable}
 *         when the undo fails with {@code BranchRollbackFailed_Unretriable}; otherwise
 *         {@code PhaseTwo_RollbackFailed_Retryable}
 * @throws ShouldNeverHappenException if no data source proxy is cached for the resource id
 */
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        // Name the missing resource so the failure is diagnosable from the message.
        throw new ShouldNeverHappenException("Failed to find resource on " + resourceId);
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        StackTraceLogger.info(LOGGER, te,
            "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
            new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}
/**
 * Returns the resource-id-to-resource cache itself (not a copy).
 *
 * @return the internal data source cache
 */
@Override
public Map<String, Resource> getManagedResources() {
return dataSourceCache;
}
/**
 * Identifies this manager as the one for AT branches.
 *
 * @return {@code BranchType.AT}
 */
@Override
public BranchType getBranchType() {
return BranchType.AT;
}
}
AsyncWorker异步执行分支事务(主要执行RM端的二阶段提交,即删除undo日志):
public class AsyncWorker implements ResourceManagerInbound {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncWorker.class);
//默认
private static final int DEFAULT_RESOURCE_SIZE = 16;
//避免在高并发下导致存在大量commit请求 一次删除过于庞大 所以定义一个循环下最大的undolog删除数量
private static final int UNDOLOG_DELETE_LIMIT_SIZE = 1000;
/**
 * Immutable holder for the data of one queued phase-two commit.
 */
private static class Phase2Context {

    /**
     * The Xid.
     */
    final String xid;
    /**
     * The Branch id.
     */
    final long branchId;
    /**
     * The Resource id.
     */
    final String resourceId;
    /**
     * The Application data.
     */
    final String applicationData;
    /**
     * The branch type.
     */
    final BranchType branchType;

    /**
     * Instantiates a new phase-two context.
     *
     * @param branchType      the branch type
     * @param xid             the xid
     * @param branchId        the branch id
     * @param resourceId      the resource id
     * @param applicationData the application data
     */
    public Phase2Context(BranchType branchType, String xid, long branchId, String resourceId,
                         String applicationData) {
        this.xid = xid;
        this.branchId = branchId;
        this.resourceId = resourceId;
        this.applicationData = applicationData;
        this.branchType = branchType;
    }
}
//异步提交 buffer数 默认10000
private static int ASYNC_COMMIT_BUFFER_LIMIT = ConfigurationFactory.getInstance().getInt(
CLIENT_ASYNC_COMMIT_BUFFER_LIMIT, DEFAULT_CLIENT_ASYNC_COMMIT_BUFFER_LIMIT);
//异步提交阻塞队列存储需要提交Phase2Context
private static final BlockingQueue<Phase2Context> ASYNC_COMMIT_BUFFER = new LinkedBlockingQueue<>(
ASYNC_COMMIT_BUFFER_LIMIT);
/**
 * Queues the branch commit for asynchronous processing.
 *
 * <p>The commit data is offered to {@code ASYNC_COMMIT_BUFFER}; a full buffer only produces a
 * warning. The method reports {@code PhaseTwo_Committed} in both cases.
 *
 * @return always {@code BranchStatus.PhaseTwo_Committed}
 */
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    Phase2Context pending = new Phase2Context(branchType, xid, branchId, resourceId, applicationData);
    boolean queued = ASYNC_COMMIT_BUFFER.offer(pending);
    if (!queued) {
        LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid);
    }
    return BranchStatus.PhaseTwo_Committed;
}
/**
 * Starts the scheduler that processes queued branch commits.
 *
 * <p>A scheduled executor with one core thread runs {@code doBranchCommits()} at a fixed
 * rate: first after 10 ms, then every 1000 ms. Failures of a run are logged and do not stop
 * later runs.
 */
public synchronized void init() {
    LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
    ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
    timerExecutor.scheduleAtFixedRate(() -> {
        try {
            doBranchCommits();
        } catch (Throwable e) {
            // Catch everything so the periodic task keeps running; the throwable is passed
            // as the last argument so the stack trace is logged and not only the message.
            LOGGER.info("Failed at async committing ... {}", e.getMessage(), e);
        }
    }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
}
/**
 * Drains the commit buffer and deletes the matching undo logs, one resource at a time.
 *
 * <p>Queued contexts are grouped by resource id. For each resource a plain connection is
 * taken from its data source proxy and the undo logs are deleted in batches of at most
 * {@code UNDOLOG_DELETE_LIMIT_SIZE} ids. The connection is committed when it is not in
 * auto-commit mode and is always closed. A failure on one resource never prevents the
 * remaining resources from being processed.
 */
private void doBranchCommits() {
    if (ASYNC_COMMIT_BUFFER.isEmpty()) {
        return;
    }
    // Group the queued contexts by resource id.
    Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
    Phase2Context polled;
    // poll() returns null once the queue is drained, so its result is never dereferenced blindly.
    while ((polled = ASYNC_COMMIT_BUFFER.poll()) != null) {
        mappedContexts.computeIfAbsent(polled.resourceId, k -> new ArrayList<>()).add(polled);
    }
    for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
        Connection conn = null;
        DataSourceProxy dataSourceProxy;
        try {
            try {
                DataSourceManager resourceManager = (DataSourceManager) DefaultResourceManager.get()
                    .getResourceManager(BranchType.AT);
                dataSourceProxy = resourceManager.get(entry.getKey());
                if (dataSourceProxy == null) {
                    throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
                }
                conn = dataSourceProxy.getPlainConnection();
            } catch (SQLException sqle) {
                LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
                continue;
            }
            List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
            Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
            Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
            for (Phase2Context commitContext : contextsGroupedByResourceId) {
                xids.add(commitContext.xid);
                branchIds.add(commitContext.branchId);
                int maxSize = Math.max(xids.size(), branchIds.size());
                // Flush a full batch so a single delete never grows beyond the limit.
                if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
                    try {
                        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(
                            xids, branchIds, conn);
                    } catch (Exception ex) {
                        LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                    }
                    xids.clear();
                    branchIds.clear();
                }
            }
            // Delete what is left after the last full batch. When nothing is left, only the
            // delete is skipped: the commit below and the remaining resources must still run.
            if (!(CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds))) {
                try {
                    UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids,
                        branchIds, conn);
                } catch (Exception ex) {
                    LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
                }
            }
            if (!conn.getAutoCommit()) {
                conn.commit();
            }
        } catch (Throwable e) {
            LOGGER.error(e.getMessage(), e);
            // The connection is still null when the failure happened before it was obtained.
            if (conn != null) {
                try {
                    conn.rollback();
                } catch (SQLException rollbackEx) {
                    LOGGER.warn("Failed to rollback JDBC resource while deleting undo_log ", rollbackEx);
                }
            }
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                }
            }
        }
    }
}
/**
 * Not supported by the async worker: always throws.
 *
 * @throws NotSupportYetException always
 */
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
throw new NotSupportYetException();
}
}
这里可以看出,如果在批量删除undo log的过程中发生错误,会导致数据回滚,undo日志无法被删除,从而存在数据堆积的风险。seata通过TC定时向RM发送undo log删除命令来清除这些数据,详细参考RmUndoLogProcessor。
2.3:处理TC传来的Message 的Processor
根据消息类型 (类型可详细参考常量类MessageType )的不同选择使用不同Processor与执行线程,对于这些Processor的注册与TM的描述一致。RmNettyRemotingClient中init中执行。初始化过程如下所示:
/**
 * Registers the message processors of the RM client, one registration per message type.
 *
 * <p>Branch commit, branch rollback and undo-log delete processors are registered with
 * {@code messageExecutor}; the response and heartbeat processors are registered with a
 * {@code null} executor.
 */
private void registerProcessor() {
// 1. Register the processor for branch commit messages.
RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
// 2. Register the processor for branch rollback messages.
RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
// 3. Register the processor for undo-log delete messages.
RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
// 4. Register one shared processor for all response message types.
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
// 5. Register the processor for heartbeat messages.
ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}2.3.1:处理commit的RmBranchCommitProcessor
处理分支commit的processor,实际就是接收TC下发的二阶段commit请求,本质就是删除undo日志。
public class RmBranchCommitProcessor implements RemotingProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(RmBranchCommitProcessor.class);
private TransactionMessageHandler handler;
private RemotingClient remotingClient;
/**
 * Creates the processor.
 *
 * @param handler        the handler that executes the branch commit request
 * @param remotingClient the client used to send the result back
 */
public RmBranchCommitProcessor(TransactionMessageHandler handler, RemotingClient remotingClient) {
    this.remotingClient = remotingClient;
    this.handler = handler;
}
/**
 * Processes a branch commit message.
 *
 * @param ctx        the channel context; its remote address is where the result is sent
 * @param rpcMessage the message whose body is the {@code BranchCommitRequest}
 */
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    String senderAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
    Object body = rpcMessage.getBody();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("rm client handle branch commit process:" + body);
    }
    BranchCommitRequest commitRequest = (BranchCommitRequest) body;
    handleBranchCommit(rpcMessage, senderAddress, commitRequest);
}
/**
 * Runs the branch commit through the handler and sends the result back asynchronously.
 *
 * <p>A failure while sending the response is logged and not propagated.
 *
 * @param request             the original RPC message being answered
 * @param serverAddress       the address the response is sent to
 * @param branchCommitRequest the branch commit request to execute
 */
private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
    BranchCommitResponse outcome = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("branch commit result:" + outcome);
    }
    try {
        this.remotingClient.sendAsyncResponse(serverAddress, request, outcome);
    } catch (Throwable throwable) {
        LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
    }
}
}
具体的handler逻辑由上文的DefaultRMHandler执行。拿到分支事务的最终执行结果后上报给TC,由TC决定整体事务流程。
2.3.2:处理rollback的RmBranchRollbackProcessor
处理分支rollback的processor,实际就是接收TC下发的二阶段rollback请求,本质就是执行undo日志,达到回滚目的。
/**
 * Processor for branch rollback messages: runs the rollback through the transaction message
 * handler and sends the result back to the sender.
 */
public class RmBranchRollbackProcessor implements RemotingProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(RmBranchRollbackProcessor.class);

    private TransactionMessageHandler handler;
    private RemotingClient remotingClient;

    /**
     * Creates the processor.
     *
     * @param handler        the handler that executes the branch rollback request
     * @param remotingClient the client used to send the result back
     */
    public RmBranchRollbackProcessor(TransactionMessageHandler handler, RemotingClient remotingClient) {
        this.remotingClient = remotingClient;
        this.handler = handler;
    }

    /**
     * Processes a branch rollback message.
     *
     * @param ctx        the channel context; its remote address is where the result is sent
     * @param rpcMessage the message whose body is the {@code BranchRollbackRequest}
     */
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        String senderAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        Object body = rpcMessage.getBody();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm handle branch rollback process:" + body);
        }
        BranchRollbackRequest rollbackRequest = (BranchRollbackRequest) body;
        handleBranchRollback(rpcMessage, senderAddress, rollbackRequest);
    }

    /**
     * Runs the branch rollback through the handler and sends the result back asynchronously.
     *
     * <p>A failure while sending the response is logged and not propagated.
     */
    private void handleBranchRollback(RpcMessage request, String serverAddress, BranchRollbackRequest branchRollbackRequest) {
        BranchRollbackResponse outcome = (BranchRollbackResponse) handler.onRequest(branchRollbackRequest, null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch rollback result:" + outcome);
        }
        try {
            this.remotingClient.sendAsyncResponse(serverAddress, request, outcome);
        } catch (Throwable throwable) {
            LOGGER.error("send response error: {}", throwable.getMessage(), throwable);
        }
    }
}
2.3.3:处理undoLog的RmUndoLogProcessor
处理TC发起的undo log删除命令
/**
* 处理TC undo log delete命令
* {@link UndoLogDeleteRequest}
*
*/
public class RmUndoLogProcessor implements RemotingProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(RmUndoLogProcessor.class);
private TransactionMessageHandler handler;
/**
 * Creates the processor.
 *
 * @param handler handler that undo-log delete requests are delegated to
 */
public RmUndoLogProcessor(TransactionMessageHandler handler) {
this.handler = handler;
}
/**
 * Handles an incoming undo-log delete message.
 *
 * <p>Logs the message body at INFO level, then passes it, cast to
 * {@link UndoLogDeleteRequest}, to {@code handleUndoLogDelete}. No response is sent.
 *
 * @param ctx        channel context the message arrived on (not used here)
 * @param rpcMessage incoming RPC message whose body is cast to {@link UndoLogDeleteRequest}
 * @throws Exception as declared by the overridden method
 */
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    final Object body = rpcMessage.getBody();
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("rm handle undo log process:" + body);
    }
    final UndoLogDeleteRequest deleteRequest = (UndoLogDeleteRequest) body;
    handleUndoLogDelete(deleteRequest);
}
/**
 * Delegates the undo-log delete request to the handler.
 *
 * <p>Any exception from the handler is caught and logged together with its stack trace; it is
 * not rethrown, so a failed delete does not propagate to the caller.
 *
 * @param undoLogDeleteRequest the delete request to execute
 */
private void handleUndoLogDelete(UndoLogDeleteRequest undoLogDeleteRequest) {
    try {
        handler.onRequest(undoLogDeleteRequest, null);
    } catch (Exception e) {
        // Pass the exception as the last argument so the cause and stack trace are logged.
        LOGGER.error("Failed to delete undo log by undoLogDeleteRequest on {}",
            undoLogDeleteRequest.getResourceId(), e);
    }
}
}
RMHandlerAT:处理AT模式下的handler
public class RMHandlerAT extends AbstractRMHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(RMHandlerAT.class);
private static final int LIMIT_ROWS = 3000;
/**
 * Handles an {@link UndoLogDeleteRequest} by deleting undo-log rows in batches.
 *
 * <p>Looks up the {@link DataSourceProxy} for the request's resource id and returns after a
 * warning when none is found. Otherwise it obtains a plain connection and repeatedly calls
 * {@code deleteUndoLogByLogCreated} with the cut-off date from {@code getLogCreated} and a
 * limit of {@code LIMIT_ROWS}, until a call reports a count other than {@code LIMIT_ROWS}.
 * Failures are logged and not rethrown; the connection is closed in all cases.
 *
 * @param request the request
 */
@Override
public void handle(UndoLogDeleteRequest request) {
    final DataSourceManager manager = (DataSourceManager) getResourceManager();
    final DataSourceProxy proxy = manager.get(request.getResourceId());
    if (proxy == null) {
        LOGGER.warn("Failed to get dataSourceProxy for delete undolog on {}", request.getResourceId());
        return;
    }

    // Cut-off date derived from the request's save-days setting.
    final Date cutoff = getLogCreated(request.getSaveDays());
    Connection connection = null;
    try {
        connection = proxy.getPlainConnection();
        int removed = 0;
        boolean fullBatch = true;
        // A batch that comes back exactly full suggests more rows may remain, so go again.
        while (fullBatch) {
            try {
                removed = UndoLogManagerFactory.getUndoLogManager(proxy.getDbType())
                    .deleteUndoLogByLogCreated(cutoff, LIMIT_ROWS, connection);
                final boolean needsManualCommit = removed > 0 && !connection.getAutoCommit();
                if (needsManualCommit) {
                    connection.commit();
                }
            } catch (SQLException batchError) {
                // If the delete call itself threw, 'removed' still holds the previous batch's count.
                final boolean needsRollback = removed > 0 && !connection.getAutoCommit();
                if (needsRollback) {
                    connection.rollback();
                }
                throw batchError;
            }
            fullBatch = removed == LIMIT_ROWS;
        }
    } catch (Exception e) {
        LOGGER.error("Failed to delete expired undo_log, error:{}", e.getMessage(), e);
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException closeEx) {
                LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
            }
        }
    }
}
/**
 * Computes the cut-off date used as the undo-log delete condition.
 *
 * @param saveDays number of days to go back from now; a value of zero or less is replaced by
 *                 {@code UndoLogDeleteRequest.DEFAULT_SAVE_DAYS}
 * @return the current time moved back by the effective number of days
 */
private Date getLogCreated(int saveDays) {
    final int effectiveDays = saveDays > 0 ? saveDays : UndoLogDeleteRequest.DEFAULT_SAVE_DAYS;
    final Calendar cutoff = Calendar.getInstance();
    cutoff.add(Calendar.DATE, -effectiveDays);
    return cutoff.getTime();
}
/**
 * Looks up the resource manager registered for the AT branch type.
 *
 * @return the result of {@code DefaultResourceManager.get().getResourceManager(BranchType.AT)}
 */
@Override
protected ResourceManager getResourceManager() {
    final BranchType branchType = BranchType.AT;
    return DefaultResourceManager.get().getResourceManager(branchType);
}
/**
 * Reports the branch type served by this handler.
 *
 * @return always {@code BranchType.AT}
 */
@Override
public BranchType getBranchType() {
return BranchType.AT;
}
}
这样就可以保证RM中undo表的体积不会因为异步删除的原因而不断变大。
2.3.4:处理RM的response消息的ClientOnResponseProcessor
与TM中的ClientOnResponseProcessor功能一致
public class ClientOnResponseProcessor implements RemotingProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientOnResponseProcessor.class);
/**
* 缓存message Id 与merge消息之间映射关系 由AbstractNettyRemotingClient存储
*/
private Map<Integer, MergeMessage> mergeMsgMap;
/**
* 缓存每一条message Id(如果是merge中是其中每一条消息) 与MessageFuture映射关系 由AbstractNettyRemoting存储
*/
private ConcurrentMap<Integer, MessageFuture> futures;
/**
* To handle the received RPC message on upper level.
*
*/
private TransactionMessageHandler transactionMessageHandler;
/**
 * Creates the processor around the caller-supplied lookup maps.
 *
 * <p>The maps are stored by reference, not copied.
 *
 * @param mergeMsgMap               message id to merged-message mapping
 * @param futures                   message id to pending {@link MessageFuture} mapping
 * @param transactionMessageHandler handler for responses that match no pending future; checked
 *                                  for {@code null} before use
 */
public ClientOnResponseProcessor(Map<Integer, MergeMessage> mergeMsgMap,
                                 ConcurrentHashMap<Integer, MessageFuture> futures,
                                 TransactionMessageHandler transactionMessageHandler) {
    this.transactionMessageHandler = transactionMessageHandler;
    this.futures = futures;
    this.mergeMsgMap = mergeMsgMap;
}
/**
 * Routes a response message to the future that is waiting for it.
 *
 * <p>For a {@link MergeResultMessage}, the merged request stored under the RPC message id is
 * removed from {@code mergeMsgMap}, and each contained message id has its future removed and
 * completed with the result at the same index. If no merged request is stored under that id,
 * the response is logged and dropped instead of failing with a NullPointerException.
 *
 * <p>For any other body, the future stored under the RPC message id is completed. When there is
 * no such future and the body is an {@link AbstractResultMessage}, it is passed to the
 * transaction message handler if one is set.
 *
 * @param ctx        channel context the message arrived on (not used here)
 * @param rpcMessage incoming response message
 * @throws Exception as declared by the overridden method
 */
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (rpcMessage.getBody() instanceof MergeResultMessage) {
        MergeResultMessage results = (MergeResultMessage) rpcMessage.getBody();
        // Take the merged request out of the cache; remove() yields null if it is absent.
        MergedWarpMessage mergeMessage = (MergedWarpMessage) mergeMsgMap.remove(rpcMessage.getId());
        if (mergeMessage == null) {
            // Nothing is registered under this id, so there are no futures to complete.
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("merge msg: {} is not found in mergeMsgMap.", rpcMessage.getId());
            }
            return;
        }
        for (int i = 0; i < mergeMessage.msgs.size(); i++) {
            int msgId = mergeMessage.msgIds.get(i);
            MessageFuture future = futures.remove(msgId);
            if (future == null) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("msg: {} is not found in futures.", msgId);
                }
            } else {
                // Hand the i-th result to the future registered for the i-th message id.
                future.setResultMessage(results.getMsgs()[i]);
            }
        }
    } else {
        // Single, non-merged response.
        MessageFuture messageFuture = futures.remove(rpcMessage.getId());
        if (messageFuture != null) {
            messageFuture.setResultMessage(rpcMessage.getBody());
        } else {
            if (rpcMessage.getBody() instanceof AbstractResultMessage) {
                if (transactionMessageHandler != null) {
                    transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);
                }
            }
        }
    }
}
}
2.3.5:处理RM的heartbeat消息的ClientHeartbeatProcessor
处理TC心跳检测的processor
public class ClientHeartbeatProcessor implements RemotingProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientHeartbeatProcessor.class);
/**
 * Handles a heartbeat message.
 *
 * <p>When the body is the {@code HeartbeatMessage.PONG} instance (identity comparison) and
 * DEBUG logging is enabled, the remote address of the channel is logged. Nothing else is done.
 *
 * @param ctx        channel context the message arrived on
 * @param rpcMessage incoming heartbeat message
 * @throws Exception as declared by the overridden method
 */
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    final boolean isPong = rpcMessage.getBody() == HeartbeatMessage.PONG;
    if (isPong && LOGGER.isDebugEnabled()) {
        LOGGER.debug("received PONG from {}", ctx.channel().remoteAddress());
    }
}
}
AT模式下整体执行流程图