全面解析JTA 深度历险
什么是事务处理
事务是计算机应用中不可或缺的组件模型,它保证了用户操作的原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durabilily)。关于事务最经典的示例莫过于信用卡转账:将用户A账户中的500元人民币转移到用户B的账户中,其操作流程如下:
1.将A账户中的金额减少500
2.将B账户中的金额增加500
这两个操作必须保正ACID的事务属性:即要么全部成功,要么全部失败;假若没有事务保障,用户的账号金额将可能发生问题:
假如第一步操作成功而第二步失败,那么用户A账户中的金额将就减少500元而用户B的账号却没有任何增加(不翼而飞);同样如果第一步出错而第二步成功,那么用户A的账户金额不变而用户B的账号将增加500元(凭空而生)。上述任何一种错误都会产生严重的数据不一致问题,事务的缺失对于一个稳定的生产系统是不可接受的。
J2EE事务处理方式
1.本地事务:紧密依赖于底层资源管理器(例如数据库连接),事务处理局限在当前事务资源内。此种事务处理方式不存在对应用服务器的依赖,因而部署灵活却无法支持多数据源的分布式事务。在数据库连接中使用本地事务示例如下:
清单1.本地事务处理实例
publicvoidtransferAccount(){
Connectionconn=null;
Statementstmt=null;
try{
conn=getDataSource().getConnection();
//将自动提交设置为false,
//若设置为true则数据库将会把每一次数据更新认定为一个事务并自动提交
conn.setAutoCommit(false);
stmt=conn.createStatement();
//将A账户中的金额减少500
stmt.execute("\
updatet_accountsetamount=amount-500whereaccount_id='A'");
//将B账户中的金额增加500
stmt.execute("\
updatet_accountsetamount=amount+500whereaccount_id='B'");
//提交事务
conn.commit();
//事务提交:转账的两步操作同时成功
}catch(SQLExceptionsqle){
try{
//发生异常,回滚在本事务中的操做
conn.rollback();
//事务回滚:转账的两步操作完全撤销
stmt.close();
conn.close();
}catch(Exceptionignore){
}
sqle.printStackTrace();
}
}
2.分布式事务处理:Java事务编程接口(JTA:JavaTransactionAPI)和Java事务服务(JTS;JavaTransactionService)为J2EE平台提供了分布式事务服务。分布式事务(DistributedTransaction)包括事务管理器(TransactionManager)和一个或多个支持XA协议的资源管理器(ResourceManager)。我们可以将资源管理器看做任意类型的持久化数据存储;事务管理器承担着所有事务参与单元的协调与控制。JTA事务有效的屏蔽了底层事务资源,使应用可以以透明的方式参入到事务处理中;但是与本地事务相比,XA协议的系统开销大,在系统开发过程中应慎重考虑是否确实需要分布式事务。若确实需要分布式事务以协调多个事务资源,则应实现和配置所支持XA协议的事务资源,如JMS、JDBC数据库连接池等。使用JTA处理事务的示例如下(注意:connA和connB是来自不同数据库的连接)
清单2.JTA事务处理
publicvoidtransferAccount(){
UserTransactionuserTx=null;
ConnectionconnA=null;
StatementstmtA=null;
ConnectionconnB=null;
StatementstmtB=null;
try{
//获得Transaction管理对象
userTx=(UserTransaction)getContext().lookup("\
java:comp/UserTransaction");
//从数据库A中取得数据库连接
connA=getDataSourceA().getConnection();
//从数据库B中取得数据库连接
connB=getDataSourceB().getConnection();
//启动事务
userTx.begin();
//将A账户中的金额减少500
stmtA=connA.createStatement();
stmtA.execute("
updatet_accountsetamount=amount-500whereaccount_id='A'");
//将B账户中的金额增加500
stmtB=connB.createStatement();
stmtB.execute("\
updatet_accountsetamount=amount+500whereaccount_id='B'");
//提交事务
userTx.commit();
//事务提交:转账的两步操作同时成功(数据库A和数据库B中的数据被同时更新)
}catch(SQLExceptionsqle){
try{
//发生异常,回滚在本事务中的操纵
userTx.rollback();
//事务回滚:转账的两步操作完全撤销
//(数据库A和数据库B中的数据更新被同时撤销)
stmt.close();
conn.close();
...
}catch(Exceptionignore){
}
sqle.printStackTrace();
}catch(Exceptionne){
e.printStackTrace();
}
}
JTA实现原理
很多开发人员都会对JTA的内部工作机制感兴趣:我编写的代码没有任何与事务资源(如数据库连接)互动的代码,但是我的操作(数据库更新)却实实在在的被包含在了事务中,那JTA究竟是通过何种方式来实现这种透明性的呢?要理解JTA的实现原理首先需要了解其架构:它包括事务管理器(TransactionManager)和一个或多个支持XA协议的资源管理器(ResourceManager)两部分,我们可以将资源管理器看做任意类型的持久化数据存储;事务管理器则承担着所有事务参与单元的协调与控制。根据所面向对象的不同,我们可以将JTA的事务管理器和资源管理器理解为两个方面:面向开发人员的使用接口(事务管理器)和面向服务提供商的实现接口(资源管理器)。其中开发接口的主要部分即为上述示例中引用的UserTransaction对象,开发人员通过此接口在信息系统中实现分布式事务;而实现接口则用来规范提供商(如数据库连接提供商)所提供的事务服务,它约定了事务的资源管理功能,使得JTA可以在异构事务资源之间执行协同沟通。以数据库为例,IBM公司提供了实现分布式事务的数据库驱动程序,Oracle也提供了实现分布式事务的数据库驱动程序,在同时使用DB2和Oracle两种数据库连接时,JTA即可以根据约定的接口协调者两种事务资源从而实现分布式事务。正是基于统一规范的不同实现使得JTA可以协调与控制不同数据库或者JMS厂商的事务资源。
开发人员使用开发人员接口,实现应用程序对全局事务的支持;各提供商(数据库,JMS等)依据提供商接口的规范提供事务资源管理功能;事务管理器(TransactionManager)将应用对分布式事务的使用映射到实际的事务资源并在事务资源间进行协调与控制。下面,本文将对包括UserTransaction、Transaction和TransactionManager在内的三个主要接口以及其定义的方法进行介绍。
面向开发人员的接口为UserTransaction(使用方法如上例所示),开发人员通常只使用此接口实现JTA事务管理,其定义了如下的方法:
begin()-开始一个分布式事务,(在后台TransactionManager会创建一个Transaction事务对象并把此对象通过ThreadLocale关联到当前线程上)
commit()-提交事务(在后台TransactionManager会从当前线程下取出事务对象并把此对象所代表的事务提交)
rollback()-回滚事务(在后台TransactionManager会从当前线程下取出事务对象并把此对象所代表的事务回滚)
getStatus()-返回关联到当前线程的分布式事务的状态(Status对象里边定义了所有的事务状态,感兴趣的读者可以参考API文档)
setRollbackOnly()-标识关联到当前线程的分布式事务将被回滚
面向提供商的实现接口主要涉及到TransactionManager和Transaction两个对象
Transaction代表了一个物理意义上的事务,在开发人员调用UserTransaction.begin()方法时TransactionManager会创建一个Transaction事务对象(标志着事务的开始)并把此对象通过ThreadLocale关联到当前线程。UserTransaction接口中的commit()、rollback(),getStatus()等方法都将最终委托给Transaction类的对应方法执行。Transaction接口定义了如下的方法:
commit()-协调不同的事务资源共同完成事务的提交
rollback()-协调不同的事务资源共同完成事务的回滚
setRollbackOnly()-标识关联到当前线程的分布式事务将被回滚
getStatus()-返回关联到当前线程的分布式事务的状态
enListResource(XAResourcexaRes,intflag)-将事务资源加入到当前的事务中(在上述示例中,在对数据库A操作时其所代表的事务资源将被关联到当前事务中,同样,在对数据库B操作时其所代表的事务资源也将被关联到当前事务中)
delistResourc(XAResourcexaRes,intflag)-将事务资源从当前事务中删除
registerSynchronization(Synchronizationsync)-回调接口,Hibernate等ORM工具都有自己的事务控制机制来保证事务,但同时它们还需要一种回调机制以便在事务完成时得到通知从而触发一些处理工作,如清除缓存等。这就涉及到了Transaction的回调接口registerSynchronization。工具可以通过此接口将回调程序注入到事务中,当事务成功提交后,回调程序将被激活。
TransactionManager本身并不承担实际的事务处理功能,它更多的是充当用户接口和实现接口之间的桥梁。下面列出了TransactionManager中定义的方法,可以看到此接口中的大部分事务方法与UserTransaction和Transaction相同。在开发人员调用UserTransaction.begin()方法时TransactionManager会创建一个Transaction事务对象(标志着事务的开始)并把此对象通过ThreadLocale关联到当前线程上;同样UserTransaction.commit()会调用TransactionManager.commit(),方法将从当前线程下取出事务对象Transaction并把此对象所代表的事务提交,即调用Transaction.commit()
begin()-开始事务
commit()-提交事务
rollback()-回滚事务
getStatus()-返回当前事务状态
setRollbackOnly()
getTransaction()-返回关联到当前线程的事务
setTransactionTimeout(intseconds)-设置事务超时时间
resume(Transactiontobj)-继续当前线程关联的事务
suspend()-挂起当前线程关联的事务
在系统开发过程中会遇到需要将事务资源暂时排除的操作,此时就需要调用suspend()方法将当前的事务挂起:在此方法后面所做的任何操作将不会被包括在事务中,在非事务性操作完成后调用resume()以继续事务(注:要进行此操作需要获得TransactionManager对象,其获得方式在不同的J2EE应用服务器上是不一样的)。
清单3.开始事务-UserTransactionImplimplenmentsUserTransaction
publicvoidbegin()throwsNotSupportedException,SystemException{
//将开始事务的操作委托给TransactionManagerImpl
TransactionManagerImpl.singleton().begin();
}
清单4.开始事务-TransactionManagerImplimplementsTransactionManager
//此处transactionHolder用于将Transaction所代表的事务对象关联到线程上
privatestaticThreadLocaltransactionHolder
=newThreadLocal();
//TransacationMananger必须维护一个全局对象,因此使用单实例模式实现
privatestaticTransactionManagerImplsingleton=newTransactionManagerImpl();
privateTransactionManagerImpl(){
}
publicstaticTransactionManagerImplsingleton(){
returnsingleton;
}
publicvoidbegin()throwsNotSupportedException,SystemException{
//XidImpl实现了Xid接口,其作用是唯一标识一个事务
XidImplxid=newXidImpl();
//创建事务对象,并将对象关联到线程
TransactionImpltx=newTransactionImpl(xid);
transactionHolder.set(tx);
}
现在我们就可以理解Transaction接口上没有定义begin方法的原因了:Transaction对象本身就代表了一个事务,在它被创建的时候就表明事务已经开始,因此也就不需要额外定义begin()方法了。
清单5.提交事务-UserTransactionImplimplenmentsUserTransaction
publicvoidcommit()throwsRollbackException,HeuristicMixedException,
HeuristicRollbackException,SecurityException,
IllegalStateException,SystemException{
//检查是否是Rollbackonly事务,如果是回滚事务
if(rollBackOnly){
rollback();
return;
}else{
//将提交事务的操作委托给TransactionManagerImpl
TransactionManagerImpl.singleton().commit();
}
}
清单6.提交事务-TransactionManagerImplimplenmentsTransactionManager
publicvoidcommit()throwsRollbackException,HeuristicMixedException,
HeuristicRollbackException,SecurityException,
IllegalStateException,SystemException{
//取得当前事务所关联的事务并通过其commit方法提交
TransactionImpltx=transactionHolder.get();
tx.commit();
}
同理,rollback、getStatus、setRollbackOnly等方法也采用了与commit()相同的方式实现。UserTransaction对象不会对事务进行任何控制,所有的事务方法都是通过TransactionManager传递到实际的事务资源即Transaction对象上。
上述示例演示了JTA事务的处理过程,下面将为您展示事务资源(数据库连接,JMS)是如何以透明的方式加入到JTA事务中的。首先需要明确的一点是,在JTA事务代码中获得的数据库源(DataSource)必须是支持分布式事务的。在如下的代码示例中,尽管所有的数据库操作都被包含在了JTA事务中,但是因为MySql的数据库连接是通过本地方式获得的,对MySql的任何更新将不会被自动包含在全局事务中。
清单7.JTA事务处理
publicvoidtransferAccount(){
UserTransactionuserTx=null;
ConnectionmySqlConnection=null;
StatementmySqlStat=null;
ConnectionconnB=null;
StatementstmtB=null;
try{
//获得Transaction管理对象
userTx=
(UserTransaction)getContext().lookup("java:comp/UserTransaction");
//以本地方式获得mySql数据库连接
mySqlConnection=DriverManager.getConnection("localhost:1111");
//从数据库B中取得数据库连接,getDataSourceB返回应用服务器的数据源
connB=getDataSourceB().getConnection();
//启动事务
userTx.begin();
//将A账户中的金额减少500
//mySqlConnection是从本地获得的数据库连接,不会被包含在全局事务中
mySqlStat=mySqlConnection.createStatement();
mySqlStat.execute("
updatet_accountsetamount=amount-500whereaccount_id='A'");
//connB是从应用服务器得的数据库连接,会被包含在全局事务中
stmtB=connB.createStatement();
stmtB.execute("
updatet_accountsetamount=amount+500whereaccount_id='B'");
//事务提交:connB的操作被提交,mySqlConnection的操作不会被提交
userTx.commit();
}catch(SQLExceptionsqle){
//处理异常代码
}catch(Exceptionne){
e.printStackTrace();
}
}
为什么必须从支持事务的数据源中获得的数据库连接才支持分布式事务呢?其实支持事务的数据源与普通的数据源是不同的,它实现了额外的XADataSource接口。我们可以简单的将XADataSource理解为普通的数据源(继承了java.sql.PooledConnection),只是它为支持分布式事务而增加了getXAResource方法。另外,由XADataSource返回的数据库连接与普通连接也是不同的,此连接除了实现java.sql.Connection定义的所有功能之外还实现了XAConnection接口。我们可以把XAConnection理解为普通的数据库连接,它支持所有JDBC规范的数据库操作,不同之处在于XAConnection增加了对分布式事务的支持。
应用程序从支持分布式事务的数据源获得的数据库连接是XAConnection接口的实现,而由此数据库连接创建的会话(Statement)也为了支持分布式事务而增加了功能,如下代码所示:
清单8.JTA事务资源处理
publicvoidtransferAccount(){
UserTransactionuserTx=null;
Connectionconn=null;
Statementstmt=null;
try{
//获得Transaction管理对象
userTx=(UserTransaction)getContext().lookup("
java:comp/UserTransaction");
//从数据库中取得数据库连接,getDataSourceB返回支持分布式事务的数据源
conn=getDataSourceB().getConnection();
//会话stmt已经为支持分布式事务进行了功能增强
stmt=conn.createStatement();
//启动事务
userTx.begin();
stmt.execute("updatet_account...whereaccount_id='A'");
userTx.commit();
}catch(SQLExceptionsqle){
//处理异常代码
}catch(Exceptionne){
e.printStackTrace();
}
}
我们来看一下由XAConnection数据库连接创建的会话(Statement)部分的代码实现(不同的JTA提供商会有不同的实现方式,此处代码示例只是向您演示事务资源是如何被自动加入到事务中)。我们以会话对象的execute方法为例,通过在方法开始部分增加对associateWithTransactionIfNecessary方法的调用,即可以保证在JTA事务期间,对任何数据库连接的操作都会被透明的加入到事务中。
清单9.将事务资源自动关联到事务对象-XAStatementimplementsStatement
publicvoidexecute(Stringsql){
//对于每次数据库操作都检查此会话所在的数据库连接是否已经被加入到事务中
associateWithTransactionIfNecessary();
try{
//处理数据库操作的代码
....
}catch(SQLExceptionsqle){
//处理异常代码
}catch(Exceptionne){
e.printStackTrace();
}
}
publicvoidassociateWithTransactionIfNecessary(){
//获得TransactionManager
TransactionManagertm=getTransactionManager();
Transactiontx=tm.getTransaction();
//检查当前线程是否有分布式事务
if(tx!=null){
//在分布式事务内,通过tx对象判断当前数据连接是否已经被包含在事务中,
//如果不是那么将此连接加入到事务中
Connectionconn=this.getConnection();
//tx.hasCurrentResource,xaConn.getDataSource()不是标准的JTA
//接口方法,是为了实现分布式事务而增加的自定义方法
if(!tx.hasCurrentResource(conn)){
XAConnectionxaConn=(XAConnection)conn;
XADataSourcexaSource=xaConn.getDataSource();
//调用Transaction的接口方法,将数据库事务资源加入到当前事务中
tx.enListResource(xaSource.getXAResource(),1);
}
}
}
XAResource与Xid:XAResource是DistributedTransactionProcessing:TheXASpecification标准的Java实现,它是对底层事务资源的抽象,定义了分布式事务处理过程中事务管理器和资源管理器之间的协议,各事务资源提供商(如JDBC驱动,JMS)将提供此接口的实现。使用此接口,开发人员可以通过自己的编程实现分布式事务处理,但这些通常都是由应用服务器实现的(服务器自带实现更加高效,稳定)为了说明,我们将举例说明他的使用方式。
在使用分布式事务之前,为了区分事务使之不发生混淆,必须实现一个Xid类用来标识事务,可以把Xid想象成事务的一个标志符,每次在新事务创建是都会为事务分配一个Xid,Xid包含三个元素:formatID、gtrid(全局事务标识符)和bqual(分支修饰词标识符)。formatID通常是零,这意味着你将使用OSICCR(OpenSystemsInterconnectionCommitment,Concurrency和Recovery标准)来命名;如果你要使用另外一种格式,那么formatID应该大于零,-1值意味着Xid为无效。
gtrid和bqual分别包含64个字节二进制码来分别标识全局事务和分支事务,唯一的要求是gtrid和bqual必须是全局唯一的。
XAResource接口中主要定义了如下方法:
commit()-提交事务
isSameRM(XAResourcexares)-检查当前的XAResource与参数是否同一事务资源
prepare()-通知资源管理器准备事务的提交工作
rollback()-通知资源管理器回滚事务
在事务被提交时,Transaction对象会收集所有被当前事务包含的XAResource资源,然后调用资源的提交方法,如下代码所示:
清单10.提交事务-TransactionImplimplementsTransaction
publicvoidcommit()throwsRollbackException,HeuristicMixedException,
HeuristicRollbackException,SecurityException,
IllegalStateException,SystemException{
//得到当前事务中的所有事务资源
Listlist=getAllEnlistedResouces();
//通知所有的事务资源管理器,准备提交事务
//对于生产级别的实现,此处需要进行额外处理以处理某些资源准备过程中出现的异常
for(XAResourcexa:list){
xa.prepare();
}
//所有事务性资源,提交事务
for(XAResourcexa:list){
xa.commit();
}
}
结束语
通过如上介绍相信读者对JTA的原理已经有所了解,本文中的示例代码都是理想情况下的假设实现。一款完善成熟的JTA事务实现需要考虑与处理的细节非常多,如性能(提交事务的时候使用多线程方式并发提交事务)、容错(网络,系统异常)等,其成熟也需要经过较长时间的积累。感兴趣的读者可以阅读一些开源JTA实现以进一步深入学习。希望本文对大家能有所帮助。