本文共 11152 字,大约阅读时间需要 37 分钟。
摘要
在开发中,我们常常会遇到(或者需要)把一些操作“绑定到数据库事务上”。也就是说,如果数据库事务成功提交,则执行这个操作;如果数据库事务回滚,则不执行这个操作(或者执行另一个操作)。
例如,《JMS与数据库事务》中介绍了一种JmsTemplate的配置方法,可以把“发送JMS消息”的操作绑定到数据库事务上。除此之外,更新缓存的操作也需要做类似的绑定处理。否则,数据库事务回滚了,而缓存中却完成了更新操作,可能导致一段时间内都会发生“脏读”。
那么,这种“绑定到数据库事务上”的功能,是如何实现的呢?spring-cache中就有一个很好的例子。
spring-cache简介
spring-cache本质上不是一个具体的缓存实现方案(例如EHCache 或者 OSCache),而是一个对缓存使用的抽象,通过在既有代码中添加少量它定义的各种 annotation,即能够简单而快捷地操作缓存。
spring-cache提供了一个CacheManager接口,用于抽象和管理缓存;缓存则抽象为Cache接口;而业务数据的CRUD操作,则由@CachePut/@Cacheable/@CacheEviet注解来进行配置后,由Cache接口下的各种实现类来处理。此外还有一些辅助类、配置类,由于这里是“简介”,按下不表。
基本机制
显然,spring-cache使用了基于注解的AOP机制。以@CachePut注解为例,它的基本操作流程是这样的:
其中,“获取缓存实例Cache”就是由CacheManager接口负责的。这里的“缓存实例”只是一个“逻辑”上的实例;在物理实现上,它可能是同一个缓存中的不同命名空间、也可能确实是不同的物理缓存。
“将返回结果写入缓存”,以及其它的缓存读、写操作,都由Cache接口来负责。
事务上下文中的问题
在事务上下文中,上面所说的“基本流程”是存在问题的:如果“写缓存”操作成功、而数据库事务回滚了,那么缓存中就会出现一笔脏数据。如下图所示:
这种场景下,我们就需要把缓存操作绑定到数据库事务上。
将操作绑定到数据库事务上
spring-cache的相关实现
与JmsTemplate类似,Spring-cache提供了一个“绑定数据库事务”的CacheManager实现类:AbstractTransactionSupportingCacheManager。不过,这个类只提供一个“是否绑定到数据库事务上”的配置项(transactionAware),自身并不处理“绑定数据库事务”这个操作。真正实现了“绑定”处理的,是AbstractTransactionSupportingCacheManager提供的Cache实现类:TransactionAwareCacheDecorator。这个类的put方法代码如下:
TransactionAwareCacheDecorator
publicclassTransactionAwareCacheDecoratorimplementsCache {
privatefinalCache targetCache;
@Override
publicvoidput(finalObject key,finalObject value) {
// 判断是否开启了事务
if(TransactionSynchronizationManager.isSynchronizationActive()) {
// 将操作注册到“afterCommit”阶段
TransactionSynchronizationManager.registerSynchronization(newTransactionSynchronizationAdapter() {
@Override
publicvoidafterCommit() {
targetCache.put(key, value);
}
});
}
else{
this.targetCache.put(key, value);
}
}
// 省略其它方法
}
AbstractTransactionSupportingCacheManager是基于“继承”来提供TransactionAwareCacheDecorator。除了它之外,spring-cache还提供了一个基于“组合”的CacheManager实现类:TransactionAwareCacheManagerProxy。不过,后者本质上也要通过TransactionAwareCacheDecorator来实现所需功能。
TransactionSynchronizationManager和TransactionSynchronizationAdapter
TransactionSynchronizationManager中的代码有点复杂。但是其功能可以“一言以蔽之”:维护事务状态。在这个类中有一系列的ThreadLocal类型的类变量,它们就负责存储当前线程中的事务数据。相关代码如下:
TransactionSynchronizationManager中的ThreadLocal
privatestaticfinalThreadLocal> resources =
newNamedThreadLocal>("Transactional resources");
// 关注点:事务相关操作的回调模板
privatestaticfinalThreadLocal> synchronizations =
newNamedThreadLocal>("Transaction synchronizations");
privatestaticfinalThreadLocal currentTransactionName =
newNamedThreadLocal("Current transaction name");
privatestaticfinalThreadLocal currentTransactionReadOnly =
newNamedThreadLocal("Current transaction read-only status");
privatestaticfinalThreadLocal currentTransactionIsolationLevel =
newNamedThreadLocal("Current transaction isolation level");
privatestaticfinalThreadLocal actualTransactionActive =
newNamedThreadLocal("Actual transaction active");
这些类变量中,我们需要关注的是synchronizations 。在TransactionAwareCacheDecorator中使用到的TransactionSynchronizationManager.isSynchronizationActive()、TransactionSynchronizationManager.registerSynchronization()和new TransactionSynchronizationAdapter(),都与它有关。
先看isSynchronizationActive()方法。它的代码实现非常简单,仅仅是判断了synchronizations中是否有数据(Set非null即可,并不要求其中有TransactionSynchronization实例)。之所以可以这样判断,是因为Spring在开启数据库事务(无论是使用@Transactional注解,还是用xml配置)时,都会向其中写入一个实例,用于自动处理Connection的获取、提交或回滚等操作。这个方法的代码如下:
isSynchronizationActive()
/**
* Return if transaction synchronization is active for the current thread.
* Can be called before register to avoid unnecessary instance creation.
* @see #registerSynchronization
*/
publicstaticbooleanisSynchronizationActive() {
return(synchronizations.get() !=null);
}
再看registerSynchronization()方法。它其实也非常简单:首先调用isSynchronizationActive()做一个校验;然后将入参synchronization添加到synchronizations 中。入参synchronization中的方法不会在这里执行,而是要等到事务执行到一定阶段时才会被调用。这个方法的代码如下:
registerSynchronization()
/**
* Register a new transaction synchronization for the current thread.
* Typically called by resource management code.
*
Note that synchronizations can implement the
* {@link org.springframework.core.Ordered} interface.
* They will be executed in an order according to their order value (if any).
* @param synchronization the synchronization object to register
* @throws IllegalStateException if transaction synchronization is not active
* @see org.springframework.core.Ordered
*/
publicstaticvoidregisterSynchronization(TransactionSynchronization synchronization)
throwsIllegalStateException {
Assert.notNull(synchronization,"TransactionSynchronization must not be null");
if(!isSynchronizationActive()) {
thrownewIllegalStateException("Transaction synchronization is not active");
}
synchronizations.get().add(synchronization);
}
比较复杂的是TransactionSynchronizationAdapter类。在进入这个类之前,我们得先看看TransactionSynchronization接口。
TransactionSynchronization接口定义了一系列的回调方法,对应一个事务执行的不同阶段:挂起、恢复、flush、提交(前、后)、完成(事务成功或失败)等。当事务运行到对应阶段时,事务管理器会从TransactionSynchronizationManager维护的synchronizations中拿出所有的回调器,逐个回调其中的对应方法。这个接口的代码如下:
TransactionSynchronization
/**
* Interface for transaction synchronization callbacks.
* Supported by AbstractPlatformTransactionManager.
*
*
TransactionSynchronization implementations can implement the Ordered interface
* to influence their execution order. A synchronization that does not implement the
* Ordered interface is appended to the end of the synchronization chain.
*
*
System synchronizations performed by Spring itself use specific order values,
* allowing for fine-grained interaction with their execution order (if necessary).
*
* @author Juergen Hoeller
* @since 02.06.2003
* @see TransactionSynchronizationManager
* @see AbstractPlatformTransactionManager
* @see org.springframework.jdbc.datasource.DataSourceUtils#CONNECTION_SYNCHRONIZATION_ORDER
*/
publicinterfaceTransactionSynchronizationextendsFlushable {
/** Completion status in case of proper commit */
intSTATUS_COMMITTED =0;
/** Completion status in case of proper rollback */
intSTATUS_ROLLED_BACK =1;
/** Completion status in case of heuristic mixed completion or system errors */
intSTATUS_UNKNOWN =2;
/**
* Suspend this synchronization.
* Supposed to unbind resources from TransactionSynchronizationManager if managing any.
* @see TransactionSynchronizationManager#unbindResource
*/
voidsuspend();
/**
* Resume this synchronization.
* Supposed to rebind resources to TransactionSynchronizationManager if managing any.
* @see TransactionSynchronizationManager#bindResource
*/
voidresume();
/**
* Flush the underlying session to the datastore, if applicable:
* for example, a Hibernate/JPA session.
* @see org.springframework.transaction.TransactionStatus#flush()
*/
@Override
voidflush();
/**
* Invoked before transaction commit (before "beforeCompletion").
* Can e.g. flush transactional O/R Mapping sessions to the database.
*
This callback does not mean that the transaction will actually be committed.
* A rollback decision can still occur after this method has been called. This callback
* is rather meant to perform work that's only relevant if a commit still has a chance
* to happen, such as flushing SQL statements to the database.
*
Note that exceptions will get propagated to the commit caller and cause a
* rollback of the transaction.
* @param readOnly whether the transaction is defined as read-only transaction
* @throws RuntimeException in case of errors; will be propagated to the caller
* (note: do not throw TransactionException subclasses here!)
* @see #beforeCompletion
*/
voidbeforeCommit(booleanreadOnly);
/**
* Invoked before transaction commit/rollback.
* Can perform resource cleanup before transaction completion.
*
This method will be invoked after {@code beforeCommit}, even when
* {@code beforeCommit} threw an exception. This callback allows for
* closing resources before transaction completion, for any outcome.
* @throws RuntimeException in case of errors; will be logged but not propagated
* (note: do not throw TransactionException subclasses here!)
* @see #beforeCommit
* @see #afterCompletion
*/
voidbeforeCompletion();
/**
* Invoked after transaction commit. Can perform further operations right
* after the main transaction has successfully committed.
*
Can e.g. commit further operations that are supposed to follow on a successful
* commit of the main transaction, like confirmation messages or emails.
*
NOTE: The transaction will have been committed already, but the
* transactional resources might still be active and accessible. As a consequence,
* any data access code triggered at this point will still "participate" in the
* original transaction, allowing to perform some cleanup (with no commit following
* anymore!), unless it explicitly declares that it needs to run in a separate
* transaction. Hence: Use {@code PROPAGATION_REQUIRES_NEW} for any
* transactional operation that is called from here.
* @throws RuntimeException in case of errors; will be propagated to the caller
* (note: do not throw TransactionException subclasses here!)
*/
voidafterCommit();
/**
* Invoked after transaction commit/rollback.
* Can perform resource cleanup after transaction completion.
*
NOTE: The transaction will have been committed or rolled back already,
* but the transactional resources might still be active and accessible. As a
* consequence, any data access code triggered at this point will still "participate"
* in the original transaction, allowing to perform some cleanup (with no commit
* following anymore!), unless it explicitly declares that it needs to run in a
* separate transaction. Hence: Use {@code PROPAGATION_REQUIRES_NEW}
* for any transactional operation that is called from here.
* @param status completion status according to the {@code STATUS_*} constants
* @throws RuntimeException in case of errors; will be logged but not propagated
* (note: do not throw TransactionException subclasses here!)
* @see #STATUS_COMMITTED
* @see #STATUS_ROLLED_BACK
* @see #STATUS_UNKNOWN
* @see #beforeCompletion
*/
voidafterCompletion(intstatus);
}
TransactionSynchronizationAdapter显然是一个适配器:它实现了TransactionSynchronization接口,并为每一个接口方法提供了一个空的实现。这类适配器的基本思想是:接口中定义了很多方法,然而业务代码往往只需要实现其中一小部分。利用这种“空实现”适配器,我们可以专注于业务上需要处理的回调方法,而不用在业务类中放大量而且重复的空方法。
TransactionSynchronizationAdapter类的代码如下:
TransactionSynchronizationAdapter
publicabstractclassTransactionSynchronizationAdapterimplementsTransactionSynchronization, Ordered {
@Override
publicintgetOrder() {
returnOrdered.LOWEST_PRECEDENCE;
}
@Override
publicvoidsuspend() {
}
@Override
publicvoidresume() {
}
@Override
publicvoidflush() {
}
@Override
publicvoidbeforeCommit(booleanreadOnly) {
}
@Override
publicvoidbeforeCompletion() {
}
@Override
publicvoidafterCommit() {
}
@Override
publicvoidafterCompletion(intstatus) {
}
}
事务相关操作注册与回调流程
说了这么多,都是静态的代码,抽象而费解。这里再提供一张流程图(省略了一些与缓存操作不太相关的事务相关操作),希望能帮助大家更好的理解相关代码和机制。
上图与事务上下文中的问题相比,所谓“写入”缓存操作实际并没有真正去操作缓存,而仅仅是注册了一个回调实例。直到数据库事务执行到afterCommit阶段时,这个回调实例才会被调用,并真正地向缓存中写入新的数据。
顺带一提,TransactionSynchronization中没有afterRollback()。如果需要在事务回滚后做某些处理,需要在afterCompletion(int)方法中判断入参的值,然后再做处理。
其它应用
“绑定到数据库事务上”这一功能,除了JmsTemplate、Cache操作中可以用到之外,在一些弱/最终一致性分布式事务中也有应用。如TCC模型中,业务代码中只调用Try服务,而在afterCommit或afterCompletion中处理Commit或Cancel服务。两阶段也是类似地在"afterRollback"中去调用第二阶段的回滚服务。
本文转自 斯然在天边 51CTO博客,原文链接:http://blog.51cto.com/winters1224/1967234,如需转载请自行联系原作者