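The project uses read/write splitting together with database and table sharding. Within one business operation, a single transaction may switch between several data sources, so the data across those data sources must stay consistent within that transaction. This article uses Atomikos to achieve that; the source code is linked at the end.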
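1: Since Spring 3.0, JOTM is no longer supported as a JTA implementation, so the third-party open-source Atomikos (http://www.atomikos.com/) is used here instead.
2: The class org.springframework.transaction.jta.JotmFactoryBean is present in spring-tx-2.5.6.jar but absent from spring-tx-3.0.0.RELEASE.jar and later.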
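3: The Atomikos transaction framework offers three kinds of data source: SimpleDataSourceBean, AtomikosDataSourceBean, and AtomikosNonXADataSourceBean.
a: SimpleDataSourceBean: the simplest data source configuration; it requires an XA driver.
b: AtomikosDataSourceBean: a distributed data source implemented by Atomikos; it requires an XA driver and allows connection pool settings. This is the recommended choice.
c: AtomikosNonXADataSourceBean: a non-distributed data source; it uses a regular JDBC driver and also allows connection pool settings.
4: Atomikos supports both XA (global) and non-XA (non-global) transactions. A non-XA data source [nonxadatasource] is more efficient than an XA one, because XA adds two-phase commit coordination. An XA transaction is typically a global transaction spanning several data sources, whereas a non-XA transaction covers a single data source.
5: An XA connection is a participant in a JTA transaction and does not support JDBC auto-commit. The application therefore should not call java.sql.Connection.commit() or java.sql.Connection.rollback() on an XA [xadatasource] connection; it should use UserTransaction.begin(), UserTransaction.commit(), and UserTransaction.rollback() instead.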
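To make points 4 and 5 more concrete, here is a minimal sketch of the two-phase commit idea behind an XA (global) transaction. It is plain Java with no Atomikos or JTA dependency: `InMemoryResource` and `commitAll` are hypothetical names invented for this illustration, and a real transaction manager also writes durable logs and handles recovery, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TwoPhaseCommitSketch {

    /** Hypothetical stand-in for one XA resource: an in-memory "database". */
    static class InMemoryResource {
        final Map<Integer, String> committed = new HashMap<>();
        private final Map<Integer, String> pending = new HashMap<>();

        /** Buffers a write; nothing is visible until commit(). */
        void insert(int id, String value) {
            pending.put(id, value);
        }

        /** Phase 1: vote yes only if no pending key collides with committed data. */
        boolean prepare() {
            for (Integer id : pending.keySet()) {
                if (committed.containsKey(id)) {
                    return false; // duplicate primary key
                }
            }
            return true;
        }

        /** Phase 2, success path: make the buffered writes visible. */
        void commit() {
            committed.putAll(pending);
            pending.clear();
        }

        /** Phase 2, failure path: discard the buffered writes. */
        void rollback() {
            pending.clear();
        }
    }

    /** Runs both phases over all enlisted resources; returns true if everything committed. */
    static boolean commitAll(List<InMemoryResource> resources) {
        boolean allPrepared = true;
        for (InMemoryResource r : resources) {
            if (!r.prepare()) {
                allPrepared = false;
                break;
            }
        }
        for (InMemoryResource r : resources) {
            if (allPrepared) {
                r.commit();
            } else {
                r.rollback();
            }
        }
        return allPrepared;
    }

    public static void main(String[] args) {
        InMemoryResource master = new InMemoryResource();
        InMemoryResource slave = new InMemoryResource();
        List<InMemoryResource> both = new ArrayList<>();
        both.add(master);
        both.add(slave);

        master.insert(2, "member");
        slave.insert(2, "member_info");
        System.out.println("first attempt committed: " + commitAll(both));   // true

        master.insert(3, "member");
        slave.insert(2, "member_info"); // duplicate key on the slave only
        System.out.println("second attempt committed: " + commitAll(both));  // false
        System.out.println("master has id 3: " + master.committed.containsKey(3)); // false
    }
}
```

The second attempt shows why the application must not commit each connection on its own: the master's write is valid in isolation, but it is discarded because the slave voted no. The extra prepare round is also where the overhead of XA compared with a non-XA data source comes from.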
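The pom.xml dependencies:
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>javax.transaction</groupId>
    <artifactId>jta</artifactId>
    <version>1.1</version>
</dependency>
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>atomikos-util</artifactId>
    <version>4.0.2</version>
</dependency>
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions</artifactId>
    <version>4.0.2</version>
</dependency>
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-jta</artifactId>
    <version>4.0.2</version>
</dependency>
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-jdbc</artifactId>
    <version>4.0.2</version>
</dependency>
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-api</artifactId>
    <version>4.0.2</version>
</dependency>
<dependency>
    <groupId>cglib</groupId>
    <artifactId>cglib-nodep</artifactId>
    <version>3.2.2</version>
</dependency>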
1: AtomikosDataSourceBean [XA (global transaction)] data source configuration, datasource-context.xml:
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd" default-lazy-init="true">
Configure the master and slave data sources
${jdbc.driverClassName}
${master.jdbc.url}
${jdbc.password}
${jdbc.username}
0
20
0
60000
${validationQuery}
false
false
true
true
1800
true
mergeStat
${jdbc.driverClassName}
${slave.jdbc.url}
${jdbc.password}
${jdbc.username}
0
20
0
60000
${validationQuery}
false
false
true
true
1800
true
mergeStat
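The `${...}` entries in the data source configuration above are placeholders that Spring fills in from database.properties (step 3). As a rough illustration of that substitution, and not Spring's actual implementation, the sketch below resolves placeholders with plain `java.util.Properties`. The class `PlaceholderSketch`, its methods, and the `localhost` host in the sample URL are assumptions made for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderSketch {
    // Matches ${some.key} and captures the key name.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Parses properties-file text into a Properties object. */
    static Properties load(String content) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Replaces each ${key} with its property value; unknown keys are left untouched. */
    static String resolve(String text, Properties props) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = props.getProperty(m.group(1));
            String replacement = (value != null) ? value : m.group(0);
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // "localhost" is a stand-in for the "your ip" placeholder in database.properties.
        Properties props = load(
              "validationQuery=SELECT 1\n"
            + "master.jdbc.url=jdbc:mysql://localhost:3306/master?useUnicode=true&characterEncoding=UTF-8\n"
            + "jdbc.username=username\n");
        System.out.println(resolve("${master.jdbc.url}", props));
        System.out.println(resolve("${validationQuery}", props));
        System.out.println(resolve("${jdbc.missing}", props)); // stays as ${jdbc.missing}
    }
}
```

Note that a raw `&` is fine inside a .properties file, as in the JDBC URLs of step 3; if the same URL were written directly into an XML file it would have to be escaped as `&amp;`.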
2: Spring main configuration file, spring-context.xml:
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd">
3: Data source parameters, database.properties:
#mysql-Used to verify the effectiveness of the database connection
validationQuery=SELECT 1
jdbc.initialSize=5
jdbc.maxActive=20
jdbc.maxWait=60000
jdbc.poolPreparedStatements=false
jdbc.poolMaximumIdleConnections=0
jdbc.driverClassName=org.gjt.mm.mysql.Driver
jdbc.xaDataSourceClassName=com.alibaba.druid.pool.xa.DruidXADataSource
#jdbc.xaDataSourceClassName=com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
#1.tms business. 2.The db level optimization,data concurrency,desirable.
master.jdbc.url=jdbc:mysql://your ip:3306/master?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
slave.jdbc.url=jdbc:mysql://your ip:3306/slave?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
jdbc.username=username
jdbc.password=password
4: MyBatis configuration, mybatis-context.xml:
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd" default-lazy-init="true">
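MyBatis persistence-layer configuration for the master and slave data sources
MyBatis config files (this code only exercises distributed transactions and involves no real business logic): mybatis-config-master.xml: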
mybatis-config-slave.xml
5: Mapper management and injection (mapper-context.xml): bind a sqlSessionFactory to each MyBatis DAO-layer mapper interface:
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd" default-lazy-init="true">
MyBatis injects a sqlSessionFactory into each mapper
6: Atomikos transaction configuration, transaction-context.xml:
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd" default-lazy-init="true">
Transaction configuration
true
7: Configure the JTA startup parameters in jta.properties under src (the full sample file is appended at the end):
com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
com.atomikos.icatch.console_file_name = /home/logs/tx/tx.out.log
com.atomikos.icatch.log_base_name = txlog
com.atomikos.icatch.tm_unique_name = com.atomikos.spring.jdbc.tm
com.atomikos.icatch.console_log_level=DEBUG
8: Code (parts not relevant here have been removed):
a: MyBatis mapper/DAO interfaces [MemberMapper/MemberInfoMapper]:
package com.tx.dao;
import com.tx.entity.Member;
public interface MemberMapper {
int insert(Member record);
}
package com.tx.dao;
import com.tx.entity.MemberInfo;
public interface MemberInfoMapper {
int insert(MemberInfo record);
}
id, username, password, status
insert into member (id, username, password,
status)
values (#{id,jdbcType=INTEGER}, #{username,jdbcType=VARCHAR}, #{password,jdbcType=VARCHAR},
#{status,jdbcType=TINYINT})
id, nickname, realname, age
insert into member_info (id, nickname, realname,
age)
values (#{id,jdbcType=INTEGER}, #{nickname,jdbcType=VARCHAR}, #{realname,jdbcType=VARCHAR},
#{age,jdbcType=TINYINT})
b: Service-layer interface and implementation:
package com.tx.sevice;
import com.tx.entity.Member;
import com.tx.entity.MemberInfo;
public interface MemberService {
boolean registerMember(Member member, MemberInfo memberInfo);
}
package com.tx.sevice.impl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.tx.dao.MemberInfoMapper;
import com.tx.dao.MemberMapper;
import com.tx.entity.Member;
import com.tx.entity.MemberInfo;
import com.tx.sevice.MemberService;
@Service("memberService")
public class MemberServiceImpl implements MemberService {
//log
private static final Logger LOG = LoggerFactory.getLogger(MemberServiceImpl.class);
@Autowired
private MemberMapper memberMapper;
@Autowired
private MemberInfoMapper memberInfoMapper;
@Override
public boolean registerMember(Member member, MemberInfo memberInfo) {
boolean resRegister = false;
try {
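if(memberMapper.insert(member) != 1){
throw new RuntimeException("Register user: unexpected row count inserting into the member table.");
}
if(memberInfoMapper.insert(memberInfo) != 1){
throw new RuntimeException("Register user: unexpected row count inserting into the member_info table.");
}
resRegister = true;
} catch (Exception e) {
// Log at error level and keep the original exception as the cause
LOG.error("Register user: failed to save to the database. " + e.getMessage(), e);
throw new RuntimeException("Register user: failed to save to the database", e);
}
return resRegister;
}
}
c: JUnit test code: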
package com.tx.test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.tx.entity.Member;
import com.tx.entity.MemberInfo;
import com.tx.sevice.MemberService;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:spring-context.xml","classpath:datasource-context.xml",
"classpath:mybatis-context.xml","classpath:mapper-context.xml","classpath:transaction-context.xml"})
public class JTATest {
//log
private static final Logger LOG = LoggerFactory.getLogger(JTATest.class);
@Autowired
private MemberService memberService;
@Test
public void testRegister(){
Member member = new Member();
member.setId(2);
member.setUsername("童可可");
member.setPassword("12345678");
member.setStatus((byte)0);
MemberInfo memberInfo = new MemberInfo();
memberInfo.setId(2);
memberInfo.setAge((byte)25);
memberInfo.setNickname("keke");
memberInfo.setRealname("童可可");
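if(memberService.registerMember(member, memberInfo)){
LOG.info("##User registration succeeded");
}else{
LOG.info("##User registration failed");
}
}
}
Note: rollback can be tested by inserting a duplicate primary key. The data stays consistent, and a normal run commits to both databases. For log4j, web.xml, the SQL, and the remaining code, see: Implementing database read/write splitting with Spring / Spring transaction configuration explained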

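Screenshots:
Before execution, master database:

Before execution, slave database:

After execution, master database:

After execution, slave database:

Contents of spring-tx-2.5.6.jar and spring-tx-3.0.0.RELEASE.jar: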


jta.properties startup parameters:
# SAMPLE PROPERTIES FILE FOR THE TRANSACTION SERVICE
# THIS FILE ILLUSTRATES THE DIFFERENT SETTINGS FOR THE TRANSACTION MANAGER
# UNCOMMENT THE ASSIGNMENTS TO OVERRIDE DEFAULT VALUES;
# Required: factory implementation class of the transaction core.
# NOTE: there is no default for this, so it MUST be specified!
#
com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
# Set base name of file where messages are output
# (also known as the 'console file').
#
com.atomikos.icatch.console_file_name = tm.out
# Size limit (in bytes) for the console file;
# negative means unlimited.
#
# com.atomikos.icatch.console_file_limit=-1
# For size-limited console files, this option
# specifies a number of rotating files to
# maintain.
#
# com.atomikos.icatch.console_file_count=1
# Set the number of log writes between checkpoints
#
# com.atomikos.icatch.checkpoint_interval=500
# Set output directory where console file and other files are to be put
# make sure this directory exists!
#
# com.atomikos.icatch.output_dir = ./
# Set directory of log files; make sure this directory exists!
#
# com.atomikos.icatch.log_base_dir = ./
# Set base name of log file
# this name will be used as the first part of
# the system-generated log file name
#
com.atomikos.icatch.log_base_name = tmlog
# Set the max number of active local transactions
# or -1 for unlimited.
#
# com.atomikos.icatch.max_actives = 50
# Set the default timeout (in milliseconds) for local transactions
#
# com.atomikos.icatch.default_jta_timeout = 10000
# Set the max timeout (in milliseconds) for local transactions
#
# com.atomikos.icatch.max_timeout = 300000
# The globally unique name of this transaction manager process
# override this value with a globally unique name
#
com.atomikos.icatch.tm_unique_name = tm
# Do we want to use parallel subtransactions? JTA's default
# is NO for J2EE compatibility
#
# com.atomikos.icatch.serial_jta_transactions=true
# If you want to do explicit resource registration then
# you need to set this value to false.
#
# com.atomikos.icatch.automatic_resource_registration=true
# Set this to WARN, INFO or DEBUG to control the granularity
# of output to the console file.
#
com.atomikos.icatch.console_log_level=INFO
# Do you want transaction logging to be enabled or not?
# If set to false, then no logging overhead will be done
# at the risk of losing data after restart or crash.
#
# com.atomikos.icatch.enable_logging=true
# Should two-phase commit be done in (multi-)threaded mode or not?
# Set this to false if you want commits to be ordered according
# to the order in which resources are added to the transaction.
#
# NOTE: threads are reused on JDK 1.5 or higher.
# For JDK 1.4, thread reuse is enabled as soon as the
# concurrent backport is in the classpath - see
# http://mirrors.ibiblio.org/pub/mirrors/maven2/backport-util-concurrent/backport-util-concurrent/
#
# com.atomikos.icatch.threaded_2pc=false
# Should shutdown of the VM trigger shutdown of the transaction core too?
#
# com.atomikos.icatch.force_shutdown_on_vm_exit=false
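To compare the values set in step 7 with the ones in the sample file above, the following sketch layers one set of properties over another using `java.util.Properties` defaults. It only illustrates which value wins for each key; it does not reproduce how Atomikos itself locates or loads jta.properties, and the class and method names are assumptions made for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class JtaPropertiesSketch {

    /** Parses the override text on top of the default text; overrides win on key clashes. */
    static Properties layered(String defaultsText, String overridesText) {
        try {
            Properties defaults = new Properties();
            defaults.load(new StringReader(defaultsText));
            Properties effective = new Properties(defaults); // falls back to defaults
            effective.load(new StringReader(overridesText));
            return effective;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Uncommented entries from the sample file.
        String sample =
              "com.atomikos.icatch.console_file_name = tm.out\n"
            + "com.atomikos.icatch.log_base_name = tmlog\n"
            + "com.atomikos.icatch.tm_unique_name = tm\n"
            + "com.atomikos.icatch.console_log_level=INFO\n";
        // Entries from step 7 of this article.
        String project =
              "com.atomikos.icatch.console_file_name = /home/logs/tx/tx.out.log\n"
            + "com.atomikos.icatch.log_base_name = txlog\n"
            + "com.atomikos.icatch.tm_unique_name = com.atomikos.spring.jdbc.tm\n"
            + "com.atomikos.icatch.console_log_level=DEBUG\n";

        Properties effective = layered(sample, project);
        System.out.println(effective.getProperty("com.atomikos.icatch.log_base_name"));     // txlog
        System.out.println(effective.getProperty("com.atomikos.icatch.console_log_level")); // DEBUG
    }
}
```

The spaces around `=` in some entries are harmless: `Properties.load` trims whitespace around the separator, so `tm_unique_name = tm` and `console_log_level=INFO` are parsed the same way.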
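For distributed transactions with Spring + JTA, see: http://www..com/wangyong/p/4174326.html
Atomikos documentation in Chinese: http://blog..net/sun8288/article/details/8674016
Source code: Spring multi-data-source distributed transaction management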