内容主要参考自《Spring源码深度解析》一书,算是读书笔记或是原书的补充。进入正文后可能会引来各种不适,毕竟阅读源码是件极其痛苦的事情。
本文主要涉及书中第十章的部分,依照书中内容以及个人理解对Spring源码进行了注释,详见Github仓库:https://github.com/MrSorrow/spring-framework
之前我们已经研究了Spring中关于 JDBC 和 MyBatis相关源码,主要涉及到数据库相关操作。但是我们没有分析数据库最重要的部分——事务,本文主要就Spring中的事务来进行相关的源码研究。
Spring声明式事务让我们从复杂的事务处理中得到解脱,使我们再也不需要去自己处理获得数据库连接、关闭连接、事务提交和回滚等操作,再也不需要在与事务相关的方法中处理大量的 try…catch…finally 代码。
声明式事务其实是建立在AOP之上的,本质是对方法前后进行拦截,然后在目标方法开始之前创建或者加入一个事务,在执行完目标方法之后根据执行情况提交或者回滚事务。编程式事务使用 TransactionTemplate
或者直接使用底层的 PlatformTransactionManager
手动通过代码为方法添加事务。
声明式事务最大的优点就是不需要通过编程的方式管理事务,这样就不需要在业务逻辑代码中掺杂事务管理的代码,只需在配置文件中做相关的事务规则声明(或通过基于 @Transactional 注解的方式),便可以将事务规则应用到业务逻辑中。声明式事务管理也有两种常用的方式,一种是基于 tx 和 aop 命名空间的 xml 配置文件,另一种就是基于 @Transactional 注解。
我们以注解的声明式事务示例,简单的演示Spring中如何使用事务。我们继续使用之前 JDBC 文章使用的数据库、 actor 表、表对应的实体类 Actor
以及 表与实体类之间的映射类 ActorMapper
。
首先,需要创建 dao 接口。
@Transactional(propagation = Propagation.REQUIRED)public interface IActorService { void save(Actor actor) throws RuntimeException; List<Actor> getUsers(); List<Actor> getAllUsers(); Integer getActorsCount();}
这里的 IActorService
接口用了 @Transactional 注解进行标注。@Transactional 可以配置的参数如下。
参数名称 | 功能描述 |
---|---|
readOnly | 用于设置当前事务是否为只读事务,设置为true表示只读,false则表示可读写,默认值为false。例如:@Transactional(readOnly=true) |
rollbackFor | 用于设置需要进行回滚的异常类数组,当方法中抛出指定异常数组中的异常时,则进行事务回滚。 例如: · 指定单一异常类: @Transactional(rollbackFor=RuntimeException.class) · 指定多个异常类: @Transactional(rollbackFor={RuntimeException.class, Exception.class}) |
rollbackForClassName | 用于设置需要进行回滚的异常类名称数组,当方法中抛出指定异常名称数组中的异常时,则进行事务回滚。例如: · 指定单一异常类名称: @Transactional(rollbackForClassName=“RuntimeException”) · 指定多个异常类名称: @Transactional(rollbackForClassName={“RuntimeException”,“Exception”}) |
noRollbackFor | 用于设置不需要进行回滚的异常类数组,当方法中抛出指定异常数组中的异常时,不进行事务回滚。例如: · 指定单一异常类: @Transactional(noRollbackFor=RuntimeException.class) · 指定多个异常类: @Transactional(noRollbackFor={RuntimeException.class, Exception.class}) |
noRollbackForClassName | 用于设置不需要进行回滚的异常类名称数组,当方法中抛出指定异常名称数组中的异常时,不进行事务回滚。 |
propagation | 用于设置事务的传播行为。 |
isolation | 用于设置底层数据库的事务隔离级别,事务隔离级别用于处理多事务并发的情况,通常使用数据库的默认隔离级别即可,基本不需要进行设置。 |
timeout | 用于设置事务的超时秒数,默认值为-1表示永不超时。 |
使用 @Transactional 注解时需要注意:
我们经常会遇到一种情况,比如有一个大方法,批量进行调用小方法,也就是小方法只是大方法的一个步骤。那么当小方法出现异常时,正常情况下我们不希望整个大方法回滚,哪怕其他成功的小方法也要回滚,而是仅仅回滚出错的那次小方法,其它正常提交。例如,一个用户调用了批量为其所有信用卡进行还款,如果其中一个还款失败,不会回滚其他正常还款的信用卡。关于这一功能,Spring的事务传播机制,能够帮我们实现。
关于Spring中的事务传播行为,主要有7种配置。
传播行为 | 含义 | 备注 |
---|---|---|
REQUIRED | 当方法调用时,如果不存在当前事务,那么就创建事务;如果之前的方法已经存在事务了,那么就沿用之前的事务 | Spring默认的事务传播行为 |
SUPPORTS | 当方法调用时,如果不存在当前事务,那么不启用事务;如果存在当前事务,那么就沿用之前的事务 | |
MANDATORY | 方法必须在事务内运行 | 如果当前不存在事务,抛出异常 |
REQUIRES_NEW | 无论是否存在当前事务,方法都会在新事务中运行。新事务遇到错误,仍然全部回滚。 | 也就是事务管理器会打开新的事务运行该方法 |
NOT_SUPPORTED | 不支持事务,如果不存在当前事务也不会创建事务;如果存在当前事务,则挂起它,直至该方法结束后恢复当前事务 | 适用于那些不需要事务的SQL |
NEVER | 不支持事务,只有在没有事务的环境中才能运行它 | 如果当前存在事务,抛出异常 |
NESTED | 嵌套事务,也就是调用方法如果抛出异常只回滚自己内部执行的SQL,不会回滚主方法的SQL。 | 实现存在两种情况:一种是数据库保存点;另一种是创建新的事务。 |
上述的其中配置对于理解源码具有很强的帮助,可以仔细看看。至于另一个重要的配置项——设置底层数据库的事务隔离级别,主要和数据库相关概念有联系,可以参考Spring-事物隔离级别。
创建 dao 接口的实现类,其中在 save()
方法中我们抛出了一个运行时异常,这将会导致保存数据失败,用于后面Spring事务功能的测试与分析。
public class ActorServiceImpl implements IActorService { private JdbcTemplate jdbcTemplate; public void setDataSource(DataSource dataSource) { jdbcTemplate = new JdbcTemplate(dataSource); } @Override public void save(Actor actor) throws RuntimeException { jdbcTemplate.update("insert into actor(first_name, last_name, last_update) values (?, ?, ?)", new Object[]{actor.getFirstName(), actor.getLastName(), actor.getLastDate()}, new int[]{Types.VARCHAR, Types.VARCHAR, Types.DATE}); throw new RuntimeException("发生异常...."); } @Override public List<Actor> getUsers() { return jdbcTemplate.query("select * from actor where actor_id < ?", new Object[]{10}, new int[]{Types.INTEGER}, new ActorRowMapper()); } @Override public List<Actor> getAllUsers() { return jdbcTemplate.query("select * from actor", new ActorRowMapper()); } @Override public Integer getActorsCount() { return jdbcTemplate.queryForObject("select count(*) from actor", Integer.class); }}
在之前 JDBC 的测试示例配置文件基础上,添加上事务管理器 transactionManager 这个 bean 以及自定义的事务标签 <tx:annotation-driven />。
<?xml version='1.0' encoding='UTF-8' ?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd"> <tx:annotation-driven transaction-manager="transactionManager" /> <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="dataSource" /> </bean> <bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver" /> <property name="url" value="jdbc:mysql://127.0.0.1:3306/sakila" /> <property name="username" value="root" /> <property name="password" value="1234" /> <!--连接池启动时的初始值--> <property name="initialSize" value="1" /> <!--连接池的最大活动连接数--> <property name="maxTotal" value="5" /> <!--最大空闲值,当经过一个高峰时间后,连接池可以慢慢将已经用不到的连接慢慢释放掉一部分,一致减少到maxIdel为止--> <property name="maxIdle" value="2" /> <!--最小空闲值,当空闲的连接数少于阈值时,连接池就会预申请取一些连接,以免洪峰来时来不及申请--> <property name="minIdle" value="1" /> </bean> <!--配置业务bean--> <bean id="actorService" class="guo.ping.transaction.ActorServiceImpl"> <!--向属性中注入数据源--> <property name="dataSource" ref="dataSource" /> </bean></beans>
添加单元测试代码:
public class TransactionTest { @Test public void testTransaction() { ApplicationContext context = new ClassPathXmlApplicationContext("transaction-Test.xml"); IActorService actorService = (IActorService) context.getBean("actorService"); Actor actor = new Actor(null, "事务", "测试", new Timestamp(System.currentTimeMillis())); actorService.save(actor); }}
测试结果如下图,由于出现了运行时异常,所以插入数据失败。
对于Spring中事务功能的代码分析,我们首先从配置文件开始入手,在配置文件中配置了 <tx:annotation-driven />。这个配置是事务的开关,如果没有此处配置,那么Spring光靠 @Transactional 注解不能开启事务的功能。那么我们就从这个配置开始分析。
分析自定义标签的解析流程我们已经不再陌生,下面是解析事务标签的部分 debug 图。
解析事务标签的 namespaceHandler 可以看到是 TxNamespaceHandler
。
进入 TxNamespaceHandler
查看其 init()
方法。我们可以看到 annotation-driven 的解析类是 AnnotationDrivenBeanDefinitionParser
。
public class TxNamespaceHandler extends NamespaceHandlerSupport { static final String TRANSACTION_MANAGER_ATTRIBUTE = "transaction-manager"; static final String DEFAULT_TRANSACTION_MANAGER_BEAN_NAME = "transactionManager"; static String getTransactionManagerName(Element element) { return (element.hasAttribute(TRANSACTION_MANAGER_ATTRIBUTE) ? element.getAttribute(TRANSACTION_MANAGER_ATTRIBUTE) : DEFAULT_TRANSACTION_MANAGER_BEAN_NAME); } @Override public void init() { registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser()); registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser()); registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser()); }}
我们进入 AnnotationDrivenBeanDefinitionParser
类中查看其 parse()
方法。
/** * 事务的annotation-driven标签的的解析 * Parses the {@code <tx:annotation-driven/>} tag. Will * {@link AopNamespaceUtils#registerAutoProxyCreatorIfNecessary register an AutoProxyCreator} * with the container as necessary. */@Override@Nullablepublic BeanDefinition parse(Element element, ParserContext parserContext) { registerTransactionalEventListenerFactory(parserContext); // 获取mode属性设置 String mode = element.getAttribute("mode"); if ("aspectj".equals(mode)) { // mode="aspectj" registerTransactionAspect(element, parserContext); if (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader())) { registerJtaTransactionAspect(element, parserContext); } } else { // mode="proxy" AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext); } return null;}
<tx:annotation-driven> 一共有四个属性如下:
PlatformTransactionManager
bean 的引用,通知会使用该引用。parse()
方法主要获取自定义 annotation-driven 标签的 mode 属性值,属性值默认为 proxy,除此之外可以配置成 aspectj。proxy 是代理模式,仅在外部方法调用才会被代理截获,自身方法调用,即使配置了 @Transactional 注解事务也无法生效,此外代理模式也不能应用在非 public 方法上;而 aspectj 模式与代理模式不同,aspectj 模式可以自身方法调用,也可以应用在非 public 方法上。 当然关于自调用的事务失效问题,之前在介绍AOP时,已经接触过,就是关于配置 expose-proxy 属性的作用。
我们以第一部分的示例为例,着重分析代理模式的事务。进入 AopAutoProxyConfigurer
中的 configureAutoProxyCreator()
方法。AopAutoProxyConfigurer
是定义在 AnnotationDrivenBeanDefinitionParser
类中的一个内部类,其内部就一个静态方法。整个方法的目录结构还是较为清晰的,主要就是在容器中注册了四个 bean。
/** * Inner class to just introduce an AOP framework dependency when actually in proxy mode. */private static class AopAutoProxyConfigurer { public static void configureAutoProxyCreator(Element element, ParserContext parserContext) { // 注册代理类(InfrastructureAdvisorAutoProxyCreator类型的bean) AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element); // TRANSACTION_ADVISOR_BEAN_NAME=org.springframework.transaction.config.internalTransactionAdvisor String txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME; if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) { Object eleSource = parserContext.extractSource(element); // =============创建TransactionAttributeSource类型的bean.============== RootBeanDefinition sourceDef = new RootBeanDefinition( "org.springframework.transaction.annotation.AnnotationTransactionAttributeSource"); sourceDef.setSource(eleSource); sourceDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); // 注册bean,并使用Spring中的定义规则生成beanName,返回beanName String sourceName = parserContext.getReaderContext().registerWithGeneratedName(sourceDef); // =====================创建TransactionInterceptor类型的bean.===================== RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class); interceptorDef.setSource(eleSource); interceptorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); registerTransactionManager(element, interceptorDef); interceptorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName)); // 注册bean,并使用Spring中的定义规则生成beanName,返回beanName String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef); // ==============创建TransactionAttributeSourceAdvisor类型的bean.================ RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class); advisorDef.setSource(eleSource); advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); // 将sourceName的bean注入advisorDef的transactionAttributeSource属性中 advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName)); // 将interceptorName的bean注入advisorDef的adviceBeanName属性中 advisorDef.getPropertyValues().add("adviceBeanName", interceptorName); // 如果配置了order属性,则加入到bean中 if (element.hasAttribute("order")) { advisorDef.getPropertyValues().add("order", element.getAttribute("order")); } // 注册TransactionAttributeSourceAdvisor类型的bean parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef); // 创建CompositeComponentDefinition CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource); compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName)); compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName)); compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName)); parserContext.registerComponent(compositeDef); } }}
四个 bean 分别是:
invoke()
方法进行目标方法增强等。Advisor
类型,其中可以定义切点表达式相关的内容。首先,先来查看注册 InfrastructureAdvisorAutoProxyCreator
类型的 bean 的逻辑。可以看到,configureAutoProxyCreator()
函数中只用了一行代码完成这项工作,主要委托 AopNamespaceUtils
中的方法。
// 注册代理类(InfrastructureAdvisorAutoProxyCreator类型的bean)AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);
看到这个函数,其实感觉有些眼熟,我们在解析AOP的自定义标签 <aop:aspectj-autoproxy />
中调用的方法是
// 注册AnnotationAwareAspectJAutoProxyCreatorAopNamespaceUtils.registerAspectJAnnotationAutoProxyCreatorIfNecessary(parserContext, element);
两个方法参数相同,只是具体的自定义标签元素不一样,方法从命名上也有一定的共同性,我们具体看看。进入 AopNamespaceUtils
的 registerAutoProxyCreatorIfNecessary()
方法,传入的参数 registry 其实就是Spring容器,source 为 null。
/** * 注册InfrastructureAdvisorAutoProxyCreator类型的bean * @param parserContext * @param sourceElement */public static void registerAutoProxyCreatorIfNecessary( ParserContext parserContext, Element sourceElement) { // 注册或升级AutoProxyCreator定义beanName为org.Springframework.aop.config.internalAutoProxyCreator的BeanDefinition BeanDefinition beanDefinition = AopConfigUtils.registerAutoProxyCreatorIfNecessary( parserContext.getRegistry(), parserContext.extractSource(sourceElement)); // 对于proxy-target-class以及expose-proxy属性的处理 useClassProxyingIfNecessary(parserContext.getRegistry(), sourceElement); // 注册组件并通知,便于监听器进一步处理 // 其中beanDefinition的className为InfrastructureAdvisorAutoProxyCreator registerComponentIfNecessary(beanDefinition, parserContext);}
方法中主要有三步,相信这三步应该非常的熟悉,基本和AOP的一样。区别就在于第一步调用 AopConfigUtils
中函数的不同,我们进入 registerAutoProxyCreatorIfNecessary()
方法中。
/** * 注册或升级AutoProxyCreator定义beanName为org.Springframework.aop.config.internalAutoProxyCreator的BeanDefinition * bean的class为InfrastructureAdvisorAutoProxyCreator类型 * @param registry * @param source * @return */@Nullablepublic static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry, @Nullable Object source) { return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);}
和AOP的 registerAspectJAnnotationAutoProxyCreatorIfNecessary()
方法内容相比,内部都是继续调用同一个方法 registerOrEscalateApcAsRequired()
,注册或升级 xxxCreator。只不过,事务注册或升级的 bean 的 class 是 InfrastructureAdvisorAutoProxyCreator
。
/** * 注册或升级AutoProxyCreator定义beanName为org.Springframework.aop.config.internalAutoProxyCreator的BeanDefinition * @param registry * @param source * @return */@Nullablepublic static BeanDefinition registerAspectJAnnotationAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry, @Nullable Object source) { return registerOrEscalateApcAsRequired(AnnotationAwareAspectJAutoProxyCreator.class, registry, source);}
具体继续深入的分析就不再展开了,基本和AOP的一致,前前后后只是注册的 bean 类型不同。除此之外,在对 proxy-target-class 和 expose-proxy 属性进行设置时,由于事务的自定义标签中没有 expose-proxy 配置项,所以自然就设置为 false。
同样的问题我们会再次问一遍,parse()
方法的第一步就是为了注册了一个 InfrastructureAdvisorAutoProxyCreator
类型的 bean,那么这个自动代理创建器的 bean 有什么用呢?
我们先来看一下 InfrastructureAdvisorAutoProxyCreator
的类继承结构,发现它和AOP的 AnnotationAwareAspectJAutoProxyCreator
都继承自 AbstractAutoProxyCreator
。
之前已经说过,因为 AbstractAutoProxyCreator
实现了 BeanPostProcessor
接口的 postProcessAfterInitialization()
方法,能够对容器中每一个 bean 创建过程时判断是否需要进行动态代理返回代理对象。关于后处理器的调用,我们之前在 ApplicationContext 分析文章中着重分析过,这里建议读者亲自 debug,相信会对Spring的 bean 加载流程更加清晰。下面简单的说一下后处理器调用的流程。
在 ApplicationContext 的执行流程中,先会调用 registryBeanPostProcessor(beanFactory)
方法进行所有后处理器的注册,还未调用。注册的结果就是将所有的后处理器实例添加到 beanFactory 的 beanPostProcessors 集合属性中,如下图所示。
真正调用后处理器的方法是在 ApplicationContext 调用 preInstantiateSingletons()
方法提前实例化所有的非惰性 bean 开始的。在每个 bean 的实例化时,都会获取上面所有的 BeanPostProcessors
进行后处理器方法的调用。具体实现之一例如在 initializeBean()
方法。
/** * 初始化bean-->用户配置的init-method * Initialize the given bean instance, applying factory callbacks * as well as init methods and bean post processors. * <p>Called from {@link #createBean} for traditionally defined beans, * and from {@link #initializeBean} for existing bean instances. * @param beanName the bean name in the factory (for debugging purposes) * @param bean the new bean instance we may need to initialize * @param mbd the bean definition that the bean was created with * (can also be {@code null}, if given an existing bean instance) * @return the initialized bean instance (potentially wrapped) * @see BeanNameAware * @see BeanClassLoaderAware * @see BeanFactoryAware * @see #applyBeanPostProcessorsBeforeInitialization * @see #invokeInitMethods * @see #applyBeanPostProcessorsAfterInitialization */protected Object initializeBean(final String beanName, final Object bean, @Nullable RootBeanDefinition mbd) { if (System.getSecurityManager() != null) { AccessController.doPrivileged((PrivilegedAction<Object>) () -> { invokeAwareMethods(beanName, bean); return null; }, getAccessControlContext()); } else { // 对特殊的bean的处理:BeanNameAware、BeanClassLoaderAware、BeanFactoryAware invokeAwareMethods(beanName, bean); } Object wrappedBean = bean; if (mbd == null || !mbd.isSynthetic()) { // 应用后处理器 wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName); } try { // 激活用户自定义的init方法 invokeInitMethods(beanName, wrappedBean, mbd); } catch (Throwable ex) { throw new BeanCreationException( (mbd != null ? mbd.getResourceDescription() : null), beanName, "Invocation of init method failed", ex); } if (mbd == null || !mbd.isSynthetic()) { // 后处理应用 wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName); } return wrappedBean;}
Spring创建了一个 TransactionAttributeSource 类型的 bean,实际实现类的类型为 AnnotationTransactionAttributeSource
。先创建了 beanDefinition,后面 applicationContext 会自动实例化。
RootBeanDefinition sourceDef = new RootBeanDefinition("org.springframework.transaction.annotation.AnnotationTransactionAttributeSource");sourceDef.setSource(eleSource);sourceDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);// 注册bean,并使用Spring中的定义规则生成beanName,返回beanNameString sourceName = parserContext.getReaderContext().registerWithGeneratedName(sourceDef);
Spring利用其默认的命名规则,为该 bean 进行命名。
/** * 给beanDefinition起名,并注册到容器中 * Call the bean name generator for the given bean definition * and register the bean definition under the generated name. * @see XmlBeanDefinitionReader#getBeanNameGenerator() * @see org.springframework.beans.factory.support.BeanNameGenerator#generateBeanName * @see BeanDefinitionRegistry#registerBeanDefinition */public String registerWithGeneratedName(BeanDefinition beanDefinition) { String generatedName = generateBeanName(beanDefinition); getRegistry().registerBeanDefinition(generatedName, beanDefinition); return generatedName;}/** * 给beanDefinition起名 * Call the bean name generator for the given bean definition. * @see XmlBeanDefinitionReader#getBeanNameGenerator() * @see org.springframework.beans.factory.support.BeanNameGenerator#generateBeanName */public String generateBeanName(BeanDefinition beanDefinition) { return this.reader.getBeanNameGenerator().generateBeanName(beanDefinition, getRegistry());}
在容器中注册一个类型为 TransactionInterceptor
的 beanDefinition 和上面没有什么不同,主要在于不仅仅创建了该 bean,还向其注入了上一步我们注册的 TransactionAttributeSource
的实例。
RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);interceptorDef.setSource(eleSource);interceptorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);registerTransactionManager(element, interceptorDef);interceptorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));// 注册bean,并使用Spring中的定义规则生成beanName,返回beanNameString interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef);
我们仔细验证一下,发现 TransactionInterceptor
类的父类 TransactionAspectSupport
中含有该属性。
@Nullableprivate TransactionAttributeSource transactionAttributeSource;
再次在容器中注册一个类型为 BeanFactoryTransactionAttributeSourceAdvisor
的 bean,将前面两个 bean 注入到该 bean 的依赖中。除此之外,对于自定义标签 <tx:annotation-driven> 的 order 属性进行解析并添加该 bean 的定义中。
RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);advisorDef.setSource(eleSource);advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);// 将sourceName的bean注入advisorDef的transactionAttributeSource属性中advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));// 将interceptorName的bean注入advisorDef的adviceBeanName属性中advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);// 如果配置了order属性,则加入到bean中if (element.hasAttribute("order")) { advisorDef.getPropertyValues().add("order", element.getAttribute("order"));}// 注册TransactionAttributeSourceAdvisor类型的beanparserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);
上面的代码主要就是注册了自动代理创建器及三个 bean,这三个bean支撑了整个的事务功能。
上一部分我们主要工作就是注册了四个 beanDefinition,最终都会实例化成对应的 bean。我们简单的推测Spring实现事务的原理,其实是通过AOP对我们的 service 方法进行增强,添加一些固定的流程代码,而实现这个逻辑基本和AOP没有太大的差异,AOP会在 bean 实例化的时候决定是否进行增强返回代理对象,实现这个功能主要靠的就是 BeanPostProcessor
后处理器接口方法。所以,事务也相似,我们先从 actorService 的实例化代理对象创建着手,去分析事务的功能实现。
我们从容器实例化 actorService 的 bean 实例化开始分析。
通过 getBean()
方法创建 actorService 的 bean,我们着重查看其初始化函数中调用后处理器的逻辑。图中的两个框分别是调用后处理器接口的初始化前方法和初始化后方法。
从代码中可以看出调用后处理器的方法其实是对所有的处理器都调用一遍,对我们构造出的 actorService 的 bean 进行挨个后处理。
/** * 初始化前应用BeanPostProcessors后处理器 */@Overridepublic Object applyBeanPostProcessorsBeforeInitialization(Object existingBean, String beanName) throws BeansException { Object result = existingBean; for (BeanPostProcessor beanProcessor : getBeanPostProcessors()) { Object current = beanProcessor.postProcessBeforeInitialization(result, beanName); if (current == null) { return result; } result = current; } return result;}/** * 初始化后应用BeanPostProcessors后处理器 */@Overridepublic Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName) throws BeansException { Object result = existingBean; for (BeanPostProcessor beanProcessor : getBeanPostProcessors()) { Object current = beanProcessor.postProcessAfterInitialization(result, beanName); if (current == null) { return result; } result = current; } return result;}
第一部分的我们注册了一个 bean 后处理器 InfrastructureAdvisorAutoProxyCreator
就在这派上用场了。debug 代码截图如下,此时遍历的 beanProcessor 就是 InfrastructureAdvisorAutoProxyCreator
类型的实例。
那么看一下,InfrastructureAdvisorAutoProxyCreator
后处理器实现接口的方法内容。首先,先看 postProcessBeforeInitialization()
接口方法,实现该方法是在 InfrastructureAdvisorAutoProxyCreator
的父类 AbstractAutoProxyCreator
中。可以看到,方法直接返回 bean,没做任何处理。
@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName) { return bean;}
转向后处理器的 postProcessAfterInitialization()
方法调用,该方法的实现其实还是在 AbstractAutoProxyCreator
中。
/** * 实现BeanPostProcessor接口方法,可能返回代理对象 * Create a proxy with the configured interceptors if the bean is * identified as one to proxy by the subclass. * @see #getAdvicesAndAdvisorsForBean */@Overridepublic Object postProcessAfterInitialization(@Nullable Object bean, String beanName) { if (bean != null) { // 根据给定的bean的class和name构建出个key Object cacheKey = getCacheKey(bean.getClass(), beanName); if (!this.earlyProxyReferences.contains(cacheKey)) { // 如果它适合被代理,则需要封装指定bean return wrapIfNecessary(bean, beanName, cacheKey); } } return bean;}
看到这里,应该很熟悉了,之前在分析AOP流程的时候,也是这个方法。方法内容也无需多说,主要就是先寻找合适增强器,然后进行增强返回代理类对象。
/** * 包装bean,增强成功返回代理bean * Wrap the given bean if necessary, i.e. if it is eligible for being proxied. * @param bean the raw bean instance * @param beanName the name of the bean * @param cacheKey the cache key for metadata access * @return a proxy wrapping the bean, or the raw bean instance as-is */protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) { // 如果已经处理过直接返回 if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) { return bean; } // 如果无需增强也直接返回 if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) { return bean; } // 给定的bean类是否代表一个基础设施类,基础设施类不应代理,或者配置了指定bean不需要自动代理 if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) { this.advisedBeans.put(cacheKey, Boolean.FALSE); return bean; } // 如果存在增强方法或增强器则创建代理 Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null); // 如果获取到了增强则需要针对增强创建代理 if (specificInterceptors != DO_NOT_PROXY) { this.advisedBeans.put(cacheKey, Boolean.TRUE); // 创建代理bean,传入用SingletonTargetSource包装的原始bean Object proxy = createProxy( bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean)); this.proxyTypes.put(cacheKey, proxy.getClass()); return proxy; } // 没获取到增强方法或增强器也直接返回 this.advisedBeans.put(cacheKey, Boolean.FALSE); return bean;}
Spring通过 getAdvicesAndAdvisorsForBean()
获取指定 bean 对应的增强器,包含:增强器与对应。也就是说在 getAdvicesAndAdvisorsForBean()
函数中,不但要找出增强器,而且还需要判断增强器是否满足要求。
/** * 获取增强方法或增强器 * @param beanClass the class of the bean to advise * @param beanName the name of the bean * @param targetSource * @return */@Override@Nullableprotected Object[] getAdvicesAndAdvisorsForBean( Class<?> beanClass, String beanName, @Nullable TargetSource targetSource) { List<Advisor> advisors = findEligibleAdvisors(beanClass, beanName); if (advisors.isEmpty()) { return DO_NOT_PROXY; } return advisors.toArray();}/** * 寻找所有合适的增强(增强器并不一定都适用于当前bean,要选出满足我们通配符的增强器) * Find all eligible Advisors for auto-proxying this class. * @param beanClass the clazz to find advisors for * @param beanName the name of the currently proxied bean * @return the empty List, not {@code null}, * if there are no pointcuts or interceptors * @see #findCandidateAdvisors * @see #sortAdvisors * @see #extendAdvisors */protected List<Advisor> findEligibleAdvisors(Class<?> beanClass, String beanName) { // 获取所有的增强 List<Advisor> candidateAdvisors = findCandidateAdvisors(); // 寻找所有增强中适用于bean的增强并应用 List<Advisor> eligibleAdvisors = findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName); // 在advice链的开始添加ExposeInvocationInterceptor extendAdvisors(eligibleAdvisors); if (!eligibleAdvisors.isEmpty()) { eligibleAdvisors = sortAdvisors(eligibleAdvisors); } return eligibleAdvisors;}
其实我们也渐渐地体会到了Spring中代码的优秀,即使是一个很复杂的逻辑,在Spring中也会被拆分成若干个小的逻辑,然后在每个函数中实现,使得每个函数的逻辑简单到我们能快速地理解。同样,通过上面的函数,Spring又将任务进行了拆分,分成了获取所有增强器与匹配增强器两个功能点。
在 findCandidateAdvisors()
函数中完成的就是获取增强器的功能。
/** * 从XML配置文件获取所有配置的增强 * Find all candidate Advisors to use in auto-proxying. * @return the List of candidate Advisors */protected List<Advisor> findCandidateAdvisors() { Assert.state(this.advisorRetrievalHelper != null, "No BeanFactoryAdvisorRetrievalHelper available"); return this.advisorRetrievalHelper.findAdvisorBeans();}/** * 从容器中获取增强(class为Advisor类型的beanDefinition,然后通过beanFactory的getBean方法转换成对象返回) * Find all eligible Advisor beans in the current bean factory, * ignoring FactoryBeans and excluding beans that are currently in creation. * @return the list of {@link org.springframework.aop.Advisor} beans * @see #isEligibleBean */public List<Advisor> findAdvisorBeans() { // Determine list of advisor bean names, if not cached already. String[] advisorNames = null; synchronized (this) { // 从beanFactory中获取增强器 advisorNames = this.cachedAdvisorBeanNames; if (advisorNames == null) { // Do not initialize FactoryBeans here: We need to leave all regular beans // uninitialized to let the auto-proxy creator apply to them! advisorNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors( this.beanFactory, Advisor.class, true, false); this.cachedAdvisorBeanNames = advisorNames; } } if (advisorNames.length == 0) { return new ArrayList<>(); } List<Advisor> advisors = new ArrayList<>(); for (String name : advisorNames) { if (isEligibleBean(name)) { if (this.beanFactory.isCurrentlyInCreation(name)) { if (logger.isDebugEnabled()) { logger.debug("Skipping currently created advisor '" + name + "'"); } } else { try { // 获取到所有增强器名称后将其对应的实例添加至list advisors.add(this.beanFactory.getBean(name, Advisor.class)); } catch (BeanCreationException ex) { Throwable rootCause = ex.getMostSpecificCause(); if (rootCause instanceof BeanCurrentlyInCreationException) { BeanCreationException bce = (BeanCreationException) rootCause; String bceBeanName = bce.getBeanName(); if (bceBeanName != null && this.beanFactory.isCurrentlyInCreation(bceBeanName)) { if (logger.isDebugEnabled()) { logger.debug("Skipping advisor '" + name + "' with dependency on currently created bean: " + ex.getMessage()); } // Ignore: indicates a reference back to the bean we're trying to advise. // We want to find advisors other than the currently created bean itself. continue; } } throw ex; } } } } return advisors;}
其中最为关键的代码是 BeanFactoryUtils.beanNamesForTypeIncludingAncestors(this.beanFactory, Advisor.class, true, false)
,通过 BeanFactoryUtils
类提供的工具方法获取所有对应 Advisor.class
的类,获取办法无非是使用 ListableBeanFactory
中提供的方法。
/** * 根据类型获取所有符合的beanNames */public static String[] beanNamesForTypeIncludingAncestors( ListableBeanFactory lbf, Class<?> type, boolean includeNonSingletons, boolean allowEagerInit) { Assert.notNull(lbf, "ListableBeanFactory must not be null"); // 调用ListableBeanFactory的方法 String[] result = lbf.getBeanNamesForType(type, includeNonSingletons, allowEagerInit); if (lbf instanceof HierarchicalBeanFactory) { HierarchicalBeanFactory hbf = (HierarchicalBeanFactory) lbf; if (hbf.getParentBeanFactory() instanceof ListableBeanFactory) { // 递归调用 String[] parentResult = beanNamesForTypeIncludingAncestors( (ListableBeanFactory) hbf.getParentBeanFactory(), type, includeNonSingletons, allowEagerInit); result = mergeNamesWithParent(result, parentResult, hbf); } } return result;}
当我们知道所有增强器实例在容器中的 beanName 时,根据 beanName 获取增强器实例也不是问题。
那么这里我们获到哪些增强器呢?
回顾我们上一部分注册的 beanDefinition 中,其中一个是 TransactionAttributeSourceAdvisor
,不用想这肯定是一个增强器,在获取所有增强器时自然也会将此 bean 提取出来,并随着其他增强器一起在后续的步骤中被织入代理。这个 bean 类型其实是 BeanFactoryTransactionAttributeSourceAdvisor
, 其中还包含了注册的其他两个 bean。
当找出所有的增强器后,接来的任务就是看这些增强器是否与 bean (比如 actorService)对应的 class 匹配了,当然不只是 class,class 内部的方法如果能够匹配也可以。
/** * 寻找所有增强中适用于目标bean的增强并应用 */protected List<Advisor> findAdvisorsThatCanApply( List<Advisor> candidateAdvisors, Class<?> beanClass, String beanName) { // 设置当前要代理的beanName ProxyCreationContext.setCurrentProxiedBeanName(beanName); try { // 过滤已经得到的advisors return AopUtils.findAdvisorsThatCanApply(candidateAdvisors, beanClass); } finally { // 清除标记 ProxyCreationContext.setCurrentProxiedBeanName(null); }}/** * 主要功能是寻找所有增强器中适用于当前class的增强器 */public static List<Advisor> findAdvisorsThatCanApply(List<Advisor> candidateAdvisors, Class<?> clazz) { if (candidateAdvisors.isEmpty()) { return candidateAdvisors; } // 保存筛选的合适增强器 List<Advisor> eligibleAdvisors = new ArrayList<>(); // 首先处理引介增强 for (Advisor candidate : candidateAdvisors) { // 核心匹配实现:canApply() if (candidate instanceof IntroductionAdvisor && canApply(candidate, clazz)) { eligibleAdvisors.add(candidate); } } boolean hasIntroductions = !eligibleAdvisors.isEmpty(); for (Advisor candidate : candidateAdvisors) { if (candidate instanceof IntroductionAdvisor) { // 引介增强已经处理 continue; } // 如果不是IntroductionAdvisor实例,对于普通的增强,调用canApply()重载方法 if (canApply(candidate, clazz, hasIntroductions)) { eligibleAdvisors.add(candidate); } } return eligibleAdvisors;}
当前我们分析的是对于 actorService 是否适用于此增强方法,那么当前的 advisor 就是之前查找出来的类型为 BeanFactoryTransactionAttributeSourceAdvisor
的 bean 实例。匹配合适的增强器核心步骤就是调用 canApply()
方法。
public static boolean canApply(Advisor advisor, Class<?> targetClass) { return canApply(advisor, targetClass, false);}public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) { // 如果增强是引介增强,那么直接获取切点表达式去匹配targetClass即可 if (advisor instanceof IntroductionAdvisor) { return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass); } // 如果属于PointcutAdvisor,继续调用重载方法 else if (advisor instanceof PointcutAdvisor) { PointcutAdvisor pca = (PointcutAdvisor) advisor; return canApply(pca.getPointcut(), targetClass, hasIntroductions); } // 其余的默认为符合条件 else { // It doesn't have a pointcut so we assume it applies. return true; }}
通过类的层次结构我们又知道:BeanFactoryTransactionAttributeSourceAdvisor
间接实现了 PointcutAdvisor
。因此,在 canApply()
函数中的第二个 if 判断时就会通过判断,将 BeanFactoryTransactionAttributeSourceAdvisor
中的 getPointcut()
方法返回值作为参数继续调用 canApply()
重载方法。getPointcut()
方法返回的是 TransactionAttributeSourcePointcut
类型的实例,可以通过它的 getTransactionAttributeSource()
返回 TransactionAttributeSource
实例。TransactionAttributeSource
实例是我们上一部分注册的三个 bean 之一。
public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor { @Nullable private TransactionAttributeSource transactionAttributeSource; // 创建BeanFactoryTransactionAttributeSourceAdvisor时,默认就创建出TransactionAttributeSourcePointcut类型的pointcut private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() { @Override @Nullable protected TransactionAttributeSource getTransactionAttributeSource() { return transactionAttributeSource; } }; @Override public Pointcut getPointcut() { return this.pointcut; }}
那么,使用 TransactionAttributeSourcePointcut
类型的实例作为函数参数继续跟踪 canApply()
方法,targetClass 参数为 ActorServiceImpl
。
public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) { Assert.notNull(pc, "Pointcut must not be null"); // 获取切点表达式,并判断是否能够匹配上目标类中的方法 if (!pc.getClassFilter().matches(targetClass)) { return false; } // 然后继续寻找匹配类中哪个方法 // 此时的pc表示TransactionAttributeSourcePointcut,pc.getMethodMatcher()返回的正是自身(this) MethodMatcher methodMatcher = pc.getMethodMatcher(); if (methodMatcher == MethodMatcher.TRUE) { // No need to iterate the methods if we're matching any method anyway... return true; } IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null; if (methodMatcher instanceof IntroductionAwareMethodMatcher) { introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher; } Set<Class<?>> classes = new LinkedHashSet<>(); // 如果是代理类,则需要返回原始类 if (!Proxy.isProxyClass(targetClass)) { classes.add(ClassUtils.getUserClass(targetClass)); } classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass)); for (Class<?> clazz : classes) { Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz); // 遍历类中的所有方法 for (Method method : methods) { if (introductionAwareMethodMatcher != null ? introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) : methodMatcher.matches(method, targetClass)) { return true; } } } return false;}
通过上面函数大致可以理清大体脉络,首先获取对类本身进行遍历,如果不能确认是否匹配成功,那么就尝试获取类的所有接口,再连通类本身获取他们所有的方法,遍历每一个方法查看是否匹配,方法一旦匹配成功便认为这个类适用于当前增强器。匹配是通过 methodMatcher.matches()
方法实现的。methodMatcher 是通过 pc.getMethodMatcher()
获得,而 pc.getMethodMatcher()
返回的其实是 this,也就还是 pc。所以我们查看 TransactionAttributeSourcePointcut
的 matches()
方法。
@Overridepublic boolean matches(Method method, Class<?> targetClass) { // 判断targetClass是否是TransactionalProxy的子类或者子接口,是的话直接返回false if (TransactionalProxy.class.isAssignableFrom(targetClass)) { return false; } // 调用子类的getTransactionAttributeSource()方法 // 其实在构造BeanFactoryTransactionAttributeSourceAdvisor时进行了重写,返回BeanFactoryTransactionAttributeSourceAdvisor的annotationTransactionAttributeSource TransactionAttributeSource tas = getTransactionAttributeSource(); return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);}/** * 模板方法,由BeanFactoryTransactionAttributeSourceAdvisor实现 * Obtain the underlying TransactionAttributeSource (may be {@code null}). * To be implemented by subclasses. */@Nullableprotected abstract TransactionAttributeSource getTransactionAttributeSource();
matches()
方法中定义了一个抽象模板函数,用于返回 TransactionAttributeSource
实例。我们能够猜到返回的就是最初我们注册的三个 beanDefinition 之一,那么究竟怎么返回的呢。其实我们在创建 BeanFactoryTransactionAttributeSourceAdvisor
中创建 TransactionAttributeSourcePointcut
实例的时候就重写了 getTransactionAttributeSource()
方法,方法内返回的就是我们注册的 AnnotationTransactionAttributeSource
实例。
@Nullableprivate TransactionAttributeSource transactionAttributeSource;// 创建BeanFactoryTransactionAttributeSourceAdvisor时,默认就创建出TransactionAttributeSourcePointcut类型的pointcutprivate final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() { @Override @Nullable protected TransactionAttributeSource getTransactionAttributeSource() { return transactionAttributeSource; }};
所以,matches()
方法中返回的 tas 就是 AnnotationTransactionAttributeSource
实例。调用 getTransactionAttribute(method, targetClass)
方法来返回一个 TransactionAttribute
事务属性对象,如果返回结果不为空,那么说明这是一个匹配的增强器。
/** * Determine the transaction attribute for this method invocation. * <p>Defaults to the class's transaction attribute if no method attribute is found. * @param method the method for the current invocation (never {@code null}) * @param targetClass the target class for this invocation (may be {@code null}) * @return a TransactionAttribute for this method, or {@code null} if the method * is not transactional */@Override@Nullablepublic TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) { // 如果是Object中定义的方法,返回null if (method.getDeclaringClass() == Object.class) { return null; } // 首先查看缓存中是否有对应的处理结果 Object cacheKey = getCacheKey(method, targetClass); Object cached = this.attributeCache.get(cacheKey); if (cached != null) { // Value will either be canonical value indicating there is no transaction attribute, // or an actual transaction attribute. if (cached == NULL_TRANSACTION_ATTRIBUTE) { return null; } else { return (TransactionAttribute) cached; } } else { // We need to work it out. TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass); // Put it in the cache. if (txAttr == null) { // 添加null缓存 this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE); } else { String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass); if (txAttr instanceof DefaultTransactionAttribute) { ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification); } if (logger.isDebugEnabled()) { logger.debug("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr); } // 添加缓存 this.attributeCache.put(cacheKey, txAttr); } return txAttr; }}
getTransactionAttribute()
的函数逻辑还是非常清晰的,主要目的是返回和 method 和 targetClass (cacheKey)对应的 TransactionAttribute
事务属性。方法中包含着缓存逻辑,Spring先尝试从缓存加载,如果不存在缓存,则通过 computeTransactionAttribute()
获取,获取到添加缓存。所以,重点研究 computeTransactionAttribute()
方法。当然,需要注意的是,getTransactionAttribute()
的函数最终返回 txAttr,txAttr 可能为空,空的时候表明至少该方法没有事务,说明增强器对该方法不匹配。如果类中所有的方法都不匹配,那就表明增强器不匹配该类了。
继续深入 computeTransactionAttribute()
方法。该方法中其实就是解析事务注解配置信息的过程,看来事务注解的信息对应在Spring中的数据结构是 TransactionAttribute
。解析的规则查看代码注释相信可以很清楚的理解,如果方法存在事务属性,则使用方法上的属性,否则使用方法所在的类上的属性;如果方法所在类的属性上还是没有搜寻到对应的事务属性,那么再搜寻接口中的方法,再没有的话,最后尝试搜寻接口的类上面的声明。
当然,在开始对方法进行事务属性搜寻之前,Spring对方法进行了判断,判断是否是 public 方法,如果不是直接返回空,也就是跳过该方法了。这也就解释我们之前介绍Spring配置的事务如果配置到非 public 方法上就不起效果了。
/** * 事务标签的解析提取 * Same signature as {@link #getTransactionAttribute}, but doesn't cache the result. * {@link #getTransactionAttribute} is effectively a caching decorator for this method. * <p>As of 4.1.8, this method can be overridden. * @since 4.1.8 * @see #getTransactionAttribute */@Nullableprotected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) { // Don't allow no-public methods as required. if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) { return null; } // The method may be on an interface, but we need attributes from the target class. // If the target class is null, the method will be unchanged. // method代表接口中的方法,specificMethod代表实现类中的方法 Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass); // 首先查看方法是否存在事务声明 TransactionAttribute txAttr = findTransactionAttribute(specificMethod); if (txAttr != null) { return txAttr; } // 其次查看方法所在类是否存在事务声明 txAttr = findTransactionAttribute(specificMethod.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } // 如果存在接口,则到接口中找 if (specificMethod != method) { // 首先查看接口方法是否存在事务声明 txAttr = findTransactionAttribute(method); if (txAttr != null) { return txAttr; } // 其次查看方法所在接口是否存在事务声明 txAttr = findTransactionAttribute(method.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } } return null;}
computeTransactionAttribute()
函数中并没有真正的去做搜寻事务属性的逻辑,而是搭建了一个执行框架,将搜寻事务属性的任务委托给了 findTransactionAttribute()
方法去执行。
/** * 寻找方法上是否存在事务声明 * @param method the method to retrieve the attribute for * @return */@Override@Nullableprotected TransactionAttribute findTransactionAttribute(Method method) { return determineTransactionAttribute(method);}/** * 寻找类或接口是否存在事务声明 * @param clazz the class to retrieve the attribute for * @return */@Override@Nullableprotected TransactionAttribute findTransactionAttribute(Class<?> clazz) { return determineTransactionAttribute(clazz);}
可以看到,无论是对方法还是类/ 接口,最终都交由 determineTransactionAttribute(AnnotatedElement ae)
方法去完成解析。
/** * Determine the transaction attribute for the given method or class. * <p>This implementation delegates to configured * {@link TransactionAnnotationParser TransactionAnnotationParsers} * for parsing known annotations into Spring's metadata attribute class. * Returns {@code null} if it's not transactional. * <p>Can be overridden to support custom annotations that carry transaction metadata. * @param ae the annotated method or class * @return the configured transaction attribute, * or {@code null} if none was found */@Nullableprotected TransactionAttribute determineTransactionAttribute(AnnotatedElement ae) { for (TransactionAnnotationParser annotationParser : this.annotationParsers) { TransactionAttribute attr = annotationParser.parseTransactionAnnotation(ae); if (attr != null) { return attr; } } return null;}
在 determineTransactionAttribute()
方法中,解析事务注解配置依靠 parse 解析类来帮助完成,这些 parse 解析类从代码上看应该是存放在 this.annotationParsers 中,而 this.annotationParsers 是当前类 AnnotationTransactionAttributeSource
中的一个 Set 集合成员。
public class AnnotationTransactionAttributeSource extends AbstractFallbackTransactionAttributeSource implements Serializable { ···· private final boolean publicMethodsOnly; private final Set<TransactionAnnotationParser> annotationParsers; public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) { this.publicMethodsOnly = publicMethodsOnly; this.annotationParsers = new LinkedHashSet<>(2); this.annotationParsers.add(new SpringTransactionAnnotationParser()); ···· }
可以看出在 AnnotationTransactionAttributeSource
初始化时,会给集合中添加一个解析类 SpringTransactionAnnotationParser
的实例。所以在 determineTransactionAttribute()
方法中其实委托了 SpringTransactionAnnotationParser
中的 parseTransactionAnnotation()
方法进行解析。
/** * 解析事务注解 * @param ae the annotated method or class * @return */@Override@Nullablepublic TransactionAttribute parseTransactionAnnotation(AnnotatedElement ae) { AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes( ae, Transactional.class, false, false); if (attributes != null) { return parseTransactionAnnotation(attributes); } else { return null; }}
首先,先通过 AnnotatedElementUtils
的 findMergedAnnotationAttributes()
方法解析 @Transactional 注解,返回 AnnotationAttributes
,然后再利用 parseTransactionAnnotation()
对注解信息进行解析返回最终的事务属性实例。
通过 findMergedAnnotationAttributes()
方法返回的结果如下,可以看到返回回来的是事务注解中的所有可配置属性。
细心的读者可能发现了,这个 parseTransactionAnnotation()
传入的参数 AnnotatedElement ae 其实是 ActorServiceImpl
类型,我们的测试示例并没有在 ActorServiceImpl
类上加上 @Transactional 注解,而是选择在 IActorService
接口上添加了注解,为什么依然能够解析的 @Transactional 注解信息呢?其实答案很简单, AnnotatedElementUtils
的 findMergedAnnotationAttributes()
方法中不仅仅去寻找实现类上的注解,同时还去其所有的接口中进行寻找。这里就简单的截取 AnnotatedElementUtils
中的代码片段,读者可以自行 debug。
好了,获取注解信息其实并不是我们的重点,将注解信息封装成Spring的事务属性 TransactionAttribute
数据结构才是我们最该关注的事情。调用 parseTransactionAnnotation(attributes)
方法进行封装。
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) { RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute(); // 解析propagation Propagation propagation = attributes.getEnum("propagation"); rbta.setPropagationBehavior(propagation.value()); // 解析isolation Isolation isolation = attributes.getEnum("isolation"); rbta.setIsolationLevel(isolation.value()); // 解析timeout rbta.setTimeout(attributes.getNumber("timeout").intValue()); // 解析readOnly rbta.setReadOnly(attributes.getBoolean("readOnly")); // 解析value rbta.setQualifier(attributes.getString("value")); ArrayList<RollbackRuleAttribute> rollBackRules = new ArrayList<>(); // 解析rollbackFor Class<?>[] rbf = attributes.getClassArray("rollbackFor"); for (Class<?> rbRule : rbf) { RollbackRuleAttribute rule = new RollbackRuleAttribute(rbRule); rollBackRules.add(rule); } // 解析rollbackForClassName String[] rbfc = attributes.getStringArray("rollbackForClassName"); for (String rbRule : rbfc) { RollbackRuleAttribute rule = new RollbackRuleAttribute(rbRule); rollBackRules.add(rule); } // 解析noRollbackFor Class<?>[] nrbf = attributes.getClassArray("noRollbackFor"); for (Class<?> rbRule : nrbf) { NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(rbRule); rollBackRules.add(rule); } // 解析noRollbackForClassName String[] nrbfc = attributes.getStringArray("noRollbackForClassName"); for (String rbRule : nrbfc) { NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(rbRule); rollBackRules.add(rule); } rbta.getRollbackRules().addAll(rollBackRules); return rbta;}
上面方法中实现了对对应类或者方法的事务注解的解析,你会在这个类中看到任何你常用或者不常用的属性提取。我们实际将事务注解信息转换成了 RuleBasedTransactionAttribute
实例进行了返回。
至此,我们终于完成了事务标签的解析。我们是不是分析的太远了,似乎已经忘了从哪里开始了。再回顾一下,我们的任务是找出某个增强器是否适合于对应的类,而是否匹配的关键则在于是否从指定的类或类中的方法中找到对应的事务属性。
现在,我们是以 ActorServiceImpl 为例的,已经在它的接口上找到了事务注解,那么自然就返回了不为空的 TransactionAttribute
实例,所以它是与事务增强器匹配的,也就是它会被事务功能修饰。
至此,事务功能的初始化工作便结束了,当判断某个 bean 适用于事务增强时,也就是适用于增强器 BeanFactoryTransactionAttributeSourceAdvisor
,所以说,在自定义标签解析时,注入的四个 bean 成为了整个事务功能的基础。
最开始解析自定义标签的时候我们向容器中添加了四个 bean,所有的 bean 我们都已经看到它们的出现,其中三个我们已经重点接触过了,还剩下 TransactionInterceptor
。BeanFactoryTransactionAttributeSourceAdvisor
作为 Advisor
的实现类,自然要遵从 Advisor
的处理方式,当代理对象被调用时会调用这个类的增强方法,也就是此 bean 的 Advise
,又因为在解析事务定义标签时我们把 TransactionInterceptor
类型的 bean 注入到了 BeanFactoryTransactionAttributeSourceAdvisor
中,所以,在调用事务增强器增强的代理类时会首先执行 TransactionInterceptor
进行增强,同时,也就是在 TransactionInterceptor
类中的 invoke()
方法中完成了整个事务的逻辑。
我们从调用 actorService 的 save()
方法开始分析,先确认一下,我们从容器中获取的 actorService 是什么类型。下图显示的是其真实类型是一个代理类类型,说明事务相关方法已经织入目标对象,接下来在调用目标对象的方法时便能够体现增强逻辑。
因为是JDK动态代理,当调用目标方法时,直接执行 JdkDynamicAopProxy
(实现 InvocationHandler
接口) 的 invoke() 方法。
/** * 实现InvocationHandler的invoke方法 */@Override@Nullablepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable { MethodInvocation invocation; Object oldProxy = null; boolean setProxyContext = false; // 包含了原始类对象信息 TargetSource targetSource = this.advised.targetSource; Object target = null; try { // 对equals方法的处理 if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) { // The target does not implement the equals(Object) method itself. return equals(args[0]); } // 对hashcode方法的处理 else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) { // The target does not implement the hashCode() method itself. return hashCode(); } else if (method.getDeclaringClass() == DecoratingProxy.class) { // There is only getDecoratedClass() declared -> dispatch to proxy config. return AopProxyUtils.ultimateTargetClass(this.advised); } // Class类的isAssignableFrom(Class cls)方法:如果调用这个方法的class或接口 与 参数cls表示的类或接口相同, // 或者是参数cls表示的类或接口的父类,则返回true。例如: // System.out.println(ArrayList.class.isAssignableFrom(Object.class)); --> false // System.out.println(Object.class.isAssignableFrom(ArrayList.class)); --> true // 如果method所在类是Advised父类,则直接调用切点方法 else if (!this.advised.opaque && method.getDeclaringClass().isInterface() && method.getDeclaringClass().isAssignableFrom(Advised.class)) { // Service invocations on ProxyConfig with the proxy config... return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args); } Object retVal; // 有时候目标对象内部的自我调用将无法实施切面中的增强则需要通过此属性暴露代理至ThreadLocal中 if (this.advised.exposeProxy) { // Make invocation available if necessary. // 将代理类对象proxy保存到ThreadLocal中,同时获取之前存储的oldProxy oldProxy = AopContext.setCurrentProxy(proxy); setProxyContext = true; } // Get as late as possible to minimize the time we "own" the target, // in case it comes from a pool. // 获取目标对象及类型 target = targetSource.getTarget(); Class<?> targetClass = (target != null ? target.getClass() : null); // 获取当前方法的拦截器链(之前我们找的增强器统一封装成了拦截器链) List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass); // Check whether we have any advice. If we don't, we can fallback on direct // reflective invocation of the target, and avoid creating a MethodInvocation. if (chain.isEmpty()) { // We can skip creating a MethodInvocation: just invoke the target directly // Note that the final invoker must be an InvokerInterceptor so we know it does // nothing but a reflective operation on the target, and no hot swapping or fancy proxying. // 如果没有发现任何拦截器那么直接调用切点方法 Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args); retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse); } else { // We need to create a method invocation... // 将拦截器封装在ReflectiveMethodInvocation,以便于使用其proceed进行链接调用拦截器 invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain); // Proceed to the joinpoint through the interceptor chain. // 执行拦截器链中每个拦截器的invoke方法 retVal = invocation.proceed(); } // Massage return value if necessary. // 返回结果 Class<?> returnType = method.getReturnType(); if (retVal != null && retVal == target && returnType != Object.class && returnType.isInstance(proxy) && !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) { // Special case: it returned "this" and the return type of the method // is type-compatible. Note that we can't help if the target sets // a reference to itself in another returned object. retVal = proxy; } else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) { throw new AopInvocationException( "Null return value from advice does not match primitive return type for: " + method); } return retVal; } finally { if (target != null && !targetSource.isStatic()) { // Must have come from TargetSource. targetSource.releaseTarget(target); } if (setProxyContext) { // Restore old proxy. AopContext.setCurrentProxy(oldProxy); } }}
getInterceptorsAndDynamicInterceptionAdvice()
方法获得的拦截器调用链 chain 内容其实只有 TransactionInterceptor
一个,TransactionInterceptor
类继承自 MethodInterceptor
。
随后进行拦截器的逐一调用,调用的其实是拦截器中的 invoke()
方法。TransactionInterceptor
支撑着整个事务功能的架构,逻辑还是相对复杂的。
/** * 调用该事务拦截器的方法入口 * @param invocation the method invocation joinpoint * @return * @throws Throwable */@Override@Nullablepublic Object invoke(MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);}
targetClass 就是 ActorServiceImpl
的全限定类名,主要还是看 invokeWithinTransaction()
方法。
/** * General delegate for around-advice-based subclasses, delegating to several other template * methods on this class. Able to handle {@link CallbackPreferringPlatformTransactionManager} * as well as regular {@link PlatformTransactionManager} implementations. * @param method the Method being invoked * @param targetClass the target class that we're invoking the method on * @param invocation the callback to use for proceeding with the target invocation * @return the return value of the method, if any * @throws Throwable propagated from the target invocation */@Nullableprotected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. // 1. 获取对应配置的事务属性,事务属性保存在AnnotationTransactionAttributeSource中 TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); // 2. 获取beanFactory中的transactionManager final PlatformTransactionManager tm = determineTransactionManager(txAttr); // 构造方法唯一标识(类.方法,如xxx.xxx.service.ActorServiceImpl.save) final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); // 3. 对不同的事务处理方式使用不同的逻辑,包括声明式和编程式 // 声明式事务处理 if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. // 4. 创建TransactionInfo,在目标方法执行前获取事务并收集事务信息 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. // 5. 执行被增强方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception // 6. 异常回滚 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // 7. 清除信息 cleanupTransactionInfo(txInfo); } // 8. 提交事务 commitTransactionAfterReturning(txInfo); return retVal; } // 编程式事务处理 else { final ThrowableHolder throwableHolder = new ThrowableHolder(); // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. try { Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> { TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); try { return invocation.proceedWithInvocation(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. throwableHolder.throwable = ex; return null; } } finally { cleanupTransactionInfo(txInfo); } }); // Check result state: It might indicate a Throwable to rethrow. if (throwableHolder.throwable != null) { throw throwableHolder.throwable; } return result; } catch (ThrowableHolderException ex) { throw ex.getCause(); } catch (TransactionSystemException ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); ex2.initApplicationException(throwableHolder.throwable); } throw ex2; } catch (Throwable ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); } throw ex2; } }}
该函数就是本部分我们需要着重研究的对象,我们尝试整理下事务处理的脉络,在Spring中支持两种事务处理的方式,分别是声明式事务处理与编程式事务处理,两者相对于开发人员来讲差别很大,考虑到对事务的应用比声明式的事务处理使用起来方便,也相对流行些,我们就分析声明式事务。
对于声明式的事务处理主要有以下几个步骤:
AnnotationTransactionAttributeSource
。TransactionManager
。TransactionInfo
,在目标方法执行前获取事务并收集事务信息。RuntimeException
回滚。下面我们就来这八个步骤进行细致分析。
在注册 TransactionInterceptor
的时候我们就已经将 TransactionAttributeSource
注入进去,所以直接利用 getter 方法即可轻易的获得 TransactionAttributeSource
实例。
/** * Return the transaction attribute source. */@Nullablepublic TransactionAttributeSource getTransactionAttributeSource() { return this.transactionAttributeSource;}
我们从事务注解中解析的数据Spring用 TransactionAttribute
进行包装,而 TransactionAttributeSource
又包装了 TransactionAttribute
。通过 tas.getTransactionAttribute(method, targetClass)
方法即可获得事务属性 TransactionAttribute
。可以看到这个方法我们之前已经接触过,再次调用直接从缓存中即可取出 TransactionAttribute
。
/** * Determine the transaction attribute for this method invocation. * <p>Defaults to the class's transaction attribute if no method attribute is found. * @param method the method for the current invocation (never {@code null}) * @param targetClass the target class for this invocation (may be {@code null}) * @return a TransactionAttribute for this method, or {@code null} if the method * is not transactional */@Override@Nullablepublic TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) { // 如果是Object中定义的方法,返回null if (method.getDeclaringClass() == Object.class) { return null; } // 首先查看缓存中是否有对应的处理结果 Object cacheKey = getCacheKey(method, targetClass); Object cached = this.attributeCache.get(cacheKey); if (cached != null) { // Value will either be canonical value indicating there is no transaction attribute, // or an actual transaction attribute. if (cached == NULL_TRANSACTION_ATTRIBUTE) { return null; } else { return (TransactionAttribute) cached; } } else { // We need to work it out. TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass); // Put it in the cache. if (txAttr == null) { // 添加null缓存 this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE); } else { String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass); if (txAttr instanceof DefaultTransactionAttribute) { ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification); } if (logger.isDebugEnabled()) { logger.debug("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr); } // 添加缓存 this.attributeCache.put(cacheKey, txAttr); } return txAttr; }}
根据提取的事务属性通过 determineTransactionManager()
方法返回配置文件中配置的 TransactionManager
实例。
/** * 根据事务属性获取容器中的transactionManager * Determine the specific transaction manager to use for the given transaction. */@Nullableprotected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) { // Do not attempt to lookup tx manager if no tx attributes are set if (txAttr == null || this.beanFactory == null) { return getTransactionManager(); } String qualifier = txAttr.getQualifier(); if (StringUtils.hasText(qualifier)) { return determineQualifiedTransactionManager(this.beanFactory, qualifier); } else if (StringUtils.hasText(this.transactionManagerBeanName)) { return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName); } else { PlatformTransactionManager defaultTransactionManager = getTransactionManager(); if (defaultTransactionManager == null) { defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY); if (defaultTransactionManager == null) { defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class); this.transactionManagerCache.putIfAbsent( DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager); } } return defaultTransactionManager; }}
我们在配置事务属性的时候没有指定 qualifier,而是指定了 transactionManagerBeanName 为 transactionManager。所以根据函数逻辑继续调用 determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName)
。
private PlatformTransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) { PlatformTransactionManager txManager = this.transactionManagerCache.get(qualifier); if (txManager == null) { txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType( beanFactory, PlatformTransactionManager.class, qualifier); this.transactionManagerCache.putIfAbsent(qualifier, txManager); } return txManager;}/** * Obtain a bean of type {@code T} from the given {@code BeanFactory} declaring a * qualifier (e.g. via {@code <qualifier>} or {@code @Qualifier}) matching the given * qualifier, or having a bean name matching the given qualifier. * @param beanFactory the BeanFactory to get the target bean from * @param beanType the type of bean to retrieve * @param qualifier the qualifier for selecting between multiple bean matches * @return the matching bean of type {@code T} (never {@code null}) * @throws NoUniqueBeanDefinitionException if multiple matching beans of type {@code T} found * @throws NoSuchBeanDefinitionException if no matching bean of type {@code T} found * @throws BeansException if the bean could not be created * @see BeanFactory#getBean(Class) */public static <T> T qualifiedBeanOfType(BeanFactory beanFactory, Class<T> beanType, String qualifier) throws BeansException { Assert.notNull(beanFactory, "BeanFactory must not be null"); if (beanFactory instanceof ConfigurableListableBeanFactory) { // Full qualifier matching supported. return qualifiedBeanOfType((ConfigurableListableBeanFactory) beanFactory, beanType, qualifier); } else if (beanFactory.containsBean(qualifier)) { // Fallback: target bean at least found by bean name. return beanFactory.getBean(qualifier, beanType); } else { throw new NoSuchBeanDefinitionException(qualifier, "No matching " + beanType.getSimpleName() + " bean found for bean name '" + qualifier + "'! (Note: Qualifier matching not supported because given " + "BeanFactory does not implement ConfigurableListableBeanFactory.)"); }}/** * Obtain a bean of type {@code T} from the given {@code BeanFactory} declaring a qualifier * (e.g. {@code <qualifier>} or {@code @Qualifier}) matching the given qualifier). * @param bf the BeanFactory to get the target bean from * @param beanType the type of bean to retrieve * @param qualifier the qualifier for selecting between multiple bean matches * @return the matching bean of type {@code T} (never {@code null}) */private static <T> T qualifiedBeanOfType(ConfigurableListableBeanFactory bf, Class<T> beanType, String qualifier) { String[] candidateBeans = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(bf, beanType); String matchingBean = null; for (String beanName : candidateBeans) { if (isQualifierMatch(qualifier::equals, beanName, bf)) { if (matchingBean != null) { throw new NoUniqueBeanDefinitionException(beanType, matchingBean, beanName); } matchingBean = beanName; } } if (matchingBean != null) { return bf.getBean(matchingBean, beanType); } else if (bf.containsBean(qualifier)) { // Fallback: target bean at least found by bean name - probably a manually registered singleton. return bf.getBean(qualifier, beanType); } else { throw new NoSuchBeanDefinitionException(qualifier, "No matching " + beanType.getSimpleName() + " bean found for qualifier '" + qualifier + "' - neither qualifier match nor bean name match!"); }}
经过层层调用,根据 PlatformTransactionManager.class
类型去容器中寻找对应的所有 beanNames,然后根据这些 beanNames,调用 isQualifierMatch(qualifier::equals, beanName, bf)
遍历检测 beanNames 是否是我们想要返回的 TransactionManager
实例的 bean 名称。如果是,直接从容器中返回实例即可。我们配置的 TransactionManager
其实类型是 DataSourceTransactionManager
。DataSourceTransactionManager
实现了 PlatformTransactionManager
接口。
通过配置文件我们就能看出,TransactionManager
主要包含了数据库连接信息等,所以有关数据库的操作等都和它有关。
判断声明式事务还是编程式事务主要有两个依据。
TransactionManager
是否属于 CallbackPreferringPlatformTransactionManager
。txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)
对于声明式事务的处理与编程式事务的处理,第一点区别在于事务属性上,因为编程式的事务处理是不需要有事务属性的,第二点区别就是在 TransactionManager
上,CallbackPreferringPlatformTransactionManager
实现 PlatformTransactionManager
接口,暴露出一个方法用于执行事务处理中的回调。所以,这两种方式都可以用作事务处理方式的判断。
我们示例中定义的 transactionManager 是 DataSourceTransactionManager
类型,从上面的类继承结构可以看出并不是 CallbackPreferringPlatformTransactionManager
的实例,所以Spring判断为是声明式事务。
TransactionInfo
与 TransactionAttribute
并不相同,也就是事务信息与事务属性并不相同。TransactionInfo
中包含 TransactionAttribute
信息,但是除了 TransactionAttribute
外还有其他事务信息,例如 PlatformTransactionManager
以及 TransactionStatus
相关信息。
/** * 创建事务信息TransactionInfo * Create a transaction if necessary based on the given TransactionAttribute. * <p>Allows callers to perform custom TransactionAttribute lookups through * the TransactionAttributeSource. * @param txAttr the TransactionAttribute (may be {@code null}) * @param joinpointIdentification the fully qualified method name * (used for monitoring and logging purposes) * @return a TransactionInfo object, whether or not a transaction was created. * The {@code hasTransaction()} method on TransactionInfo can be used to * tell if there was a transaction created. * @see #getTransactionAttributeSource() */@SuppressWarnings("serial")protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { // If no name specified, apply method identification as transaction name. // 如果没有名称指定则使用方法唯一标识,并使用DelegatingTransactionAttribute封装txAttr if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { // 获取TransactionStatus status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } // 根据指定的属性与status准备一个TransactionInfo return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);}
createTransactionIfNecessar()
函数主要流程如下:
DelegatingTransactionAttribute
封装传入的 TransactionAttribute
实例。TransactionStatus
。事务处理是以事务为核心,那么获取事务就是最重要的事情。TransactionInfo
并返回。对于传入的 TransactionAttribute
类型的参数 txAttr,当前的实际类型是 RuleBasedTransactionAttribute
,是由上一部分提取事务注解信息时生成的,主要用于数据承载,而这里之所以使用 DelegatingTransactionAttribute
进行封装,主要是提供了更多的功能。
Spring中通过 AbstractPlatformTransactionManager
的 getTransaction()
方法处理事务的准备工作,包括事务获取以及信息的构建。TransactionAttribute
实现了 TransactionDefinition
接口,所以方法参数直接传入事务属性。
/** * 获取事务状态 * This implementation handles propagation behavior. Delegates to * {@code doGetTransaction}, {@code isExistingTransaction} * and {@code doBegin}. * @see #doGetTransaction * @see #isExistingTransaction * @see #doBegin */@Overridepublic final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // 1. 获取事务 Object transaction = doGetTransaction(); // Cache debug flag to avoid repeated checks. boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); } // 2. 判断当前线程是否存在事务,判读依据为当前线程记录的连接不为空且连接中(connectionHolder)中的transactionAdvice属性不为空 if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. // 当前线程已经存在事务,则转向嵌套事务的处理 return handleExistingTransaction(definition, transaction, debugEnabled); } // Check definition settings for new transaction. // 3. 事务超时设置验证 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // No existing transaction found -> check propagation behavior to find out how to proceed. // 4. 执行到这说明当前线程不存在事务,但是propagationBehavior却被声明为PROPAGATION_MANDATORY 抛出异常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED都需要新建事务 // 空挂起 SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // 5. 构造DefaultTransactionStatus实例 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 6. 完善transaction,包括设置ConnectionHolder、隔离级别、timeout。如果是新连接,绑定到当前线程 doBegin(transaction, definition); // 7. 新同步事务的设置,针对于当前线程的设置,将事务信息记录在当前线程中 prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); }}
可以看到,获取事务的环节也是非常繁琐,从注释上看就需要很多步骤。事务的准备工作包括:
获取事务。
我们具体查看 DataSourceTransactionManager
的 doGetTransaction()
方法。
/** * 获取事务,实现自AbstractPlatformTransactionManager中的模板方法 * @return */@Overrideprotected Object doGetTransaction() { DataSourceTransactionObject txObject = new DataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed()); // 如果当前线程已经记录数据库连接则使用原连接 ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource()); // false表示非新创建的连接 txObject.setConnectionHolder(conHolder, false); return txObject;}
方法中创建了一个 DataSourceTransactionObject
实例,并设置相关属性包含数据库连接等。这里有一个对保存点的设置,是否开启允许保存点取决于是否设置了允许嵌入式事务。
判断当前线程是否存在事务。
/** * 判断当前线程是否存在事务,重写自AbstractPlatformTransactionManager中的方法 * 判断依据为当前线程记录的connectionHolder不为空且connectionHolder中的transactionActive属性不为空,则为存在事务 * @param transaction transaction object returned by doGetTransaction * @return */@Overrideprotected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());}
判断当前线程是否存在事务,判断依据为:当前线程记录的 connectionHolder 不为空且 connectionHolder中的 transactionActive 属性不为空,则为存在事务。从这一点说明,上面一步获取创建 DataSourceTransactionObject
实例设置的 ConnectionHolder
有可能为 null。如果存在事务,则转向嵌套事务的处理。
/** * 已经存在的事务的基础上进行处理 * Create a TransactionStatus for an existing transaction. */private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { // NEVER不支持事务,只有在没有事务的环境中才能运行(运行到这里说明有事务,抛出异常) if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } // NOT_SUPPORTED不支持事务,当前事务已经存在,会挂起存在的事务,直至该方法结束后恢复存在的事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } Object suspendedResources = suspend(transaction); // 挂起 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } // REQUIRES_NEW开启新的事务中运行,出错了全部回滚 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } SuspendedResourcesHolder suspendedResources = suspend(transaction); // 新事务的建立 try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } // NESTED和REQUIRES_NEW有点像,但是如果出错只会回滚新开启的小事务,别的不影响 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } if (useSavepointForNestedTransaction()) { // Create savepoint within existing Spring-managed transaction, // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); // 如果没有可以使用保存点的方式控制事务回滚,那么在嵌入事务的建立初始新建保存点 status.createAndHoldSavepoint(); return status; } else { // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. // 有些情况是不能使用保存点操作,比如JTA,那么就建立新事务 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. if (debugEnabled) { logger.debug("Participating in existing transaction"); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);}
对于已经存在事务的处理过程中,我们看到了很多熟悉的操作,其实就是实现事务传播行为的含义。但是,也有些不同的地方,函数中对已经存在的事务处理考虑两种情况。
PROPAGATION_REQUIRES_NEW 表示当前方法必须在它自己的事务里运行,一个新的事务将被启动,而如果有一个事务正在运行的话,则在这个方法运行期间被挂起。而Spring中对于此种传播方式的处理与新事务建立最大的不同点在于使用 suspend()
方法将原事务挂起。将信息挂起的目的当然是为了记录原有事务的状态,在当前事务执行完毕后在将原事务还原。
PROPAGATION_NESTED 表示如果当前正有一个事务在运行中,则该方法应该运行在一个嵌套的事务中,被嵌套的事务可以独立于封装事务进行提交或者回滚,如果封装事务不存在,行为就像 PROPAGATION_REQUIRES_NEW。对于嵌入式事务的处理,Spring 中主要考虑了两种方式的处理。Spring中允许嵌入事务的时候,则首选设置保存点的方式作为异常处理的回滚。对于其他方式,比如 JTA 无法使用保存点的方式,那么处理方式与 PROPAGATION_REQUIRES_NEW 相同,而一旦出现异常,则由Spring的事务异常处理机制去完成后续操作。
事务超时设置验证。
TransactionDefinition.TIMEOUT_DEFAULT
的值为 -1,而我们在事务注解中默认配置的也是 -1,当我们在设置超时属性时,设置的值小于 -1,则认为是无效的值,抛出异常。
事务传播行为属性的设置验证。
Spring会依据具体的事务传播行为进行不同逻辑的处理。在第二小步已经对当前线程是否存在事务进行了判断,如果不存在才会执行到第三步往后的操作。这里有个简单的判断,如果我们配置的事务传播行为是 PROPAGATION_MANDATORY,意味着方法必须在事务内运行,然而当前没有事务,所有抛出异常。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'");}
如果传播行为是 REQUIRED、REQUIRES_NEW 或 NESTED,则需要创建新事务执行方法;如果传播行为是 SUPPORTS、NOT_SUPPORTED 和 NEVER,方法都不需要额外创建事务去执行。所以最终分成了两类逻辑进行执行。我们以需要创建新事务逻辑为例进行分析。
构建DefaultTransactionStatus。
/** * Create a TransactionStatus instance for the given arguments. */protected DefaultTransactionStatus newTransactionStatus( TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) { boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive(); return new DefaultTransactionStatus( transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources);}
参数分别是事务属性、DataSourceTransactionManager
实例、是否是开启新事务标记、是否新同步等,利用这些信息创建出 DefaultTransactionStatus
实例。
完善DataSourceTransactionObject。
完善 DataSourceTransactionObject
实例,包括设置 ConnectionHolder
、隔离级别、timeout。如果是新连接,绑定到当前线程。查看 DataSourceTransactionManager
中的 doBegin()
方法。
/** * 构造transaction,包括设置ConnectionHolder、隔离级别、timeout * 如果是新连接,绑定到当前线程 * This implementation sets the isolation level but ignores the timeout. */@Overrideprotected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { // 尝试获取数据库连接,如果当前线程存在就不需要从连接池获取,或者对于事务同步表示设置为true的需要重新获取连接 if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { // 获取新的数据库连接并保存到事务中 Connection newCon = obtainDataSource().getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); // 设置隔离级别(和只读) Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); // Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). // 更改自动提交设置,由Spring控制提交 if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } con.setAutoCommit(false); } // 只读事务 prepareTransactionalConnection(con, definition); // 设置判断当前线程是否存在事务的依据,标识当前连接已经被事务激活 txObject.getConnectionHolder().setTransactionActive(true); // 设置过期时间 int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } // 将当前数据库连接绑定到当前线程 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, obtainDataSource()); txObject.setConnectionHolder(null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); }}
对于一些隔离级别、timeout 等功能的设置并不是由Spring来完成的,而是委托给底层的数据库连接去做。可以说事务是从这个函数开始的,因为在这个函数中已经开始尝试对数据库连接进行获取。当然,在获取数据库连接的同时,一些必要的设置也是需要同步设置的。关于 ConnectionHolder
类型之前在介绍JDBC的时候已经接触过,主要封装了数据库连接。所以,将 ConnectionHolder
实例注入设置给 DataSourceTransactionObject
实例可能需要先从连接池中获取到连接。当然并不是每次都会获取新的连接,如果当前线程中的 connectionHolder
已经存在,则没有必要再次获取,或者,对于事务同步表示设置为 true 的需要重新获取连接。
设置隔离级别以及只读标识是依靠 prepareConnectionForTransaction()
方法完成的。Spring中对只读事务操作做了一些处理,但是核心的实现是设置 connection 上的 readOnly 属性。同样,对于隔离级别的控制也是交由 connection 去控制的。关于只读事务的含义,可以参考对于Spring中事务的readonly属性理解。
/** * Prepare the given Connection with the given transaction semantics. * @param con the Connection to prepare * @param definition the transaction definition to apply * @return the previous isolation level, if any * @throws SQLException if thrown by JDBC methods * @see #resetConnectionAfterTransaction */@Nullablepublic static Integer prepareConnectionForTransaction(Connection con, @Nullable TransactionDefinition definition) throws SQLException { Assert.notNull(con, "No Connection specified"); // 如果事务属性中是只读,那么利用connection设置设置只读 if (definition != null && definition.isReadOnly()) { try { if (logger.isDebugEnabled()) { logger.debug("Setting JDBC Connection [" + con + "] read-only"); } // 设置数据库连接的readOnly为true con.setReadOnly(true); } catch (SQLException | RuntimeException ex) { Throwable exToCheck = ex; while (exToCheck != null) { if (exToCheck.getClass().getSimpleName().contains("Timeout")) { // Assume it's a connection timeout that would otherwise get lost: e.g. from JDBC 4.0 throw ex; } exToCheck = exToCheck.getCause(); } // "read-only not supported" SQLException -> ignore, it's just a hint anyway logger.debug("Could not set JDBC Connection read-only", ex); } } // 根据事务属性配置隔离级别 Integer previousIsolationLevel = null; if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { if (logger.isDebugEnabled()) { logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " + definition.getIsolationLevel()); } int currentIsolation = con.getTransactionIsolation(); if (currentIsolation != definition.getIsolationLevel()) { previousIsolationLevel = currentIsolation; con.setTransactionIsolation(definition.getIsolationLevel()); } } return previousIsolationLevel;}
设置完只读事务与隔离级别后,随后Spring更改了数据库连接的默认提交方式为不自动提交以及过期时间等。Spring还设置标记,当前数据库连接已经被事务激活,用来作为判断当前线程是否存在事务的依据。并且将数据库连接绑定到当前线程。
将事务信息记录在当前线程中。
下面所有的 setter 方法会将信息都将保存到 TransactionSynchronizationManager
的 ThreadLocal
成员变量中,相当于与线程绑定。
/** * 将事务信息记录在当前线程中,主要设置在TransactionSynchronizationManager中 * Initialize transaction synchronization as appropriate. */protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) { if (status.isNewSynchronization()) { TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel( definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null); TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); TransactionSynchronizationManager.initSynchronization(); }}
经过上面的 7 步,最终返回了 TransactionStatus
实例,将留作下一步构建 TransactionInfo
所用。
当已经建立事务连接并完成了事务信息的提取后,我们需要将所有的事务信息统一封装在 TransactionInfo
类型的实例中,这个实例包含了目标方法开始前的所有状态信息,一旦事务执行失败,Spring会通过 TransactionInfo
类型的实例中的信息来进行回滚等后续工作。
/** * 准备事务信息。当已经建立事务连接并完成了事务信息的提取后,我们需要将所有的事务信息统一记录在TransactionInfo类型的实例中 * Prepare a TransactionInfo for the given attribute and status object. * @param txAttr the TransactionAttribute (may be {@code null}) * @param joinpointIdentification the fully qualified method name * (used for monitoring and logging purposes) * @param status the TransactionStatus for the current transaction * @return the prepared TransactionInfo object */protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, String joinpointIdentification, @Nullable TransactionStatus status) { TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification); if (txAttr != null) { // We need a transaction for this method... if (logger.isTraceEnabled()) { logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]"); } // The transaction manager will flag an error if an incompatible tx already exists. // 记录事务状态 txInfo.newTransactionStatus(status); } else { // The TransactionInfo.hasTransaction() method will return false. We created it only // to preserve the integrity of the ThreadLocal stack maintained in this class. if (logger.isTraceEnabled()) { logger.trace("Don't need to create transaction for [" + joinpointIdentification + "]: This method isn't transactional."); } } // We always bind the TransactionInfo to the thread, even if we didn't create // a new transaction here. This guarantees that the TransactionInfo stack // will be managed correctly even if no transaction was created by this aspect. txInfo.bindToThread(); return txInfo;}
可以看到,TransactionInfo
包含了之前的 PlatformTransactionManager
实例信息、TransactionAttribute
实例信息以及 TransactionStatus
实例信息。
这一部分在AOP中已经分析过,主要是进行事务拦截器调用完就执行目标本身方法。
已经完成了目标方法运行前的事务准备工作并且执行了目标方法,那么,当出现错误的时候,Spring是怎么对数据进行恢复的呢?
从执行目标方法的逻辑可以看出,一旦出现 Throwable
就会被引导至 completeTransactionAfterThrowing()
方法处理。
try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. // 5. 执行被增强方法 retVal = invocation.proceedWithInvocation();}catch (Throwable ex) { // target invocation exception // 6. 异常回滚 completeTransactionAfterThrowing(txInfo, ex); throw ex;}
进入 completeTransactionAfterThrowing()
方法一探究竟。处理异常不代表所有的 Throwable
都会被回滚处理,比如我们最常用的 Exception
默认是不会被处理的。默认情况下,即使出现异常,数据也会被正常提交,而是否回滚,关键就是在 txInfo.transactionAttribute.rollbackOn(ex)
这个函数。
/** * Spring对于异常的处理 * Handle a throwable, completing the transaction. * We may commit or roll back, depending on the configuration. * @param txInfo information about the current transaction * @param ex throwable encountered */protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) { // 当抛出异常时首先判断当前是否存在事务,这是基础依据 if (txInfo != null && txInfo.getTransactionStatus() != null) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex); } // 这里判断是否回滚默认的依据是抛出的异常是否是RuntimeException或者是Error的类型 if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { try { // 根据TransactionStatus信息进行回滚处理 txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by rollback exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException | Error ex2) { logger.error("Application exception overridden by rollback exception", ex); throw ex2; } } else { // 如果不满足回滚条件即使抛出异常也同样会提交 // We don't roll back on this exception. // Will still roll back if TransactionStatus.isRollbackOnly() is true. try { txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { logger.error("Application exception overridden by commit exception", ex); ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException | Error ex2) { logger.error("Application exception overridden by commit exception", ex); throw ex2; } } }}
@Overridepublic boolean rollbackOn(Throwable ex) { // 根据配置的回滚规则 return this.targetAttribute.rollbackOn(ex);}
调用 RuleBasedTransactionAttribute
事务属性的 rollbackOn()
方法,可以看到对事务注解中配置的回滚规则进行遍历,如果符合规则,则返回 true,即需要回滚。
/** * Winning rule is the shallowest rule (that is, the closest in the * inheritance hierarchy to the exception). If no rule applies (-1), * return false. * @see TransactionAttribute#rollbackOn(java.lang.Throwable) */@Overridepublic boolean rollbackOn(Throwable ex) { if (logger.isTraceEnabled()) { logger.trace("Applying rules to determine whether transaction should rollback on " + ex); } RollbackRuleAttribute winner = null; int deepest = Integer.MAX_VALUE; // 对事务注解中配置的回滚规则进行遍历,如果符合规则,则返回true if (this.rollbackRules != null) { for (RollbackRuleAttribute rule : this.rollbackRules) { int depth = rule.getDepth(ex); if (depth >= 0 && depth < deepest) { deepest = depth; winner = rule; } } } if (logger.isTraceEnabled()) { logger.trace("Winning rollback rule is: " + winner); } // User superclass behavior (rollback on unchecked) if no rule matches. if (winner == null) { logger.trace("No relevant rollback rule found: applying default rules"); // 执行父类的默认回滚规则 return super.rollbackOn(ex); } return !(winner instanceof NoRollbackRuleAttribute);}
可以看到,如果我们没有在Spring中配置回滚规则,默认情况下Spring中的事务异常处理机制只对 RuntimeException
和 Error
两种情况会进行回滚处理。
/** * Spring默认仅对于发生RuntimeException或Error的事务进行回滚 * The default behavior is as with EJB: rollback on unchecked exception * ({@link RuntimeException}), assuming an unexpected outcome outside of any * business rules. Additionally, we also attempt to rollback on {@link Error} which * is clearly an unexpected outcome as well. By contrast, a checked exception is * considered a business exception and therefore a regular expected outcome of the * transactional business method, i.e. a kind of alternative return value which * still allows for regular completion of resource operations. * <p>This is largely consistent with TransactionTemplate's default behavior, * except that TransactionTemplate also rolls back on undeclared checked exceptions * (a corner case). For declarative transactions, we expect checked exceptions to be * intentionally declared as business exceptions, leading to a commit by default. * @see org.springframework.transaction.support.TransactionTemplate#execute */@Overridepublic boolean rollbackOn(Throwable ex) { return (ex instanceof RuntimeException || ex instanceof Error);}
一旦符合回滚条件,Spring就会将程序引导至回滚处理函数中。
// 根据TransactionStatus信息进行回滚处理txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
回滚操作交给了 DataSourceTransactionManager
的 rollback()
方法,参数为之前保存着事务状态的 TransactionStatus
实例。
/** * 事务的回滚操作 * This implementation of rollback handles participating in existing * transactions. Delegates to {@code doRollback} and * {@code doSetRollbackOnly}. * @see #doRollback * @see #doSetRollbackOnly */@Overridepublic final void rollback(TransactionStatus status) throws TransactionException { // 如果事务已经完成,那么回滚会抛出异常 if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; processRollback(defStatus, false);}/** * 真正处理回滚操作的函数 * Process an actual rollback. * The completed flag has already been checked. * @param status object representing the transaction * @throws TransactionException in case of rollback failure */private void processRollback(DefaultTransactionStatus status, boolean unexpected) { try { boolean unexpectedRollback = unexpected; try { // 自定义触发器调用,激活所有TransactionSynchronization中对应的方法 triggerBeforeCompletion(status); if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Rolling back transaction to savepoint"); } // 如果有保存点,也就是当前事务为单独的线程则会退回到保存点。常用于嵌入式事务回滚,不会引起外部事务回滚。 status.rollbackToHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction rollback"); } // 如果当前事务为独立的新事务,则直接退回。常用于单独事务的处理,对于没有保存点的回滚。 doRollback(status); } else { // Participating in larger transaction if (status.hasTransaction()) { if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } // 如果当前事务不是独立的事务,那么只能标记状态,等到事务链执行完毕后统一回滚。多数用于JTA,等到提交的时候不提交。 doSetRollbackOnly(status); } else { if (status.isDebug()) { logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); } } } else { logger.debug("Should roll back transaction but cannot - no transaction available"); } // Unexpected rollback only matters here if we're asked to fail early if (!isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = false; } } } catch (RuntimeException | Error ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } // 激活所有TransactionSynchronization中对应的方法 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); // Raise UnexpectedRollbackException if we had a global rollback-only marker if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } } finally { // 清空记录的资源并将挂起的资源恢复 cleanupAfterCompletion(status); }}
同样,对于在Spring中的复杂的逻辑处理过程,在入口函数一般都会给出个整体的处理脉络,而把实现细节委托给其他函数去执行。Spring对于回滚处理大致流程如下。
首先是自定义触发器的调用,包括在回滚前、完成回滚后的调用。当然完成回滚包括正常回滚与回滚过程中出现异常,自定义的触发器会根据这些信息作进一步处理。
// 自定义触发器调用,激活所有TransactionSynchronization中对应的方法triggerBeforeCompletion(status);triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);// 激活所有TransactionSynchronization中对应的方法triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
而对于触发器的注册,常见是在回调过程中通过 TransactionSynchronizationManager
类中的静态方法直接注册。
/** * synchronizations也是一个ThreadLocal,其中保存的是Set集合,Set集合存放TransactionSynchronization, * 例如ConnectionSynchronization,ConnectionSynchronization包装了ConnectionHolder和DataSource信息 * Register a new transaction synchronization for the current thread. * Typically called by resource management code. * <p>Note that synchronizations can implement the * {@link org.springframework.core.Ordered} interface. * They will be executed in an order according to their order value (if any). * @param synchronization the synchronization object to register * @throws IllegalStateException if transaction synchronization is not active * @see org.springframework.core.Ordered */public static void registerSynchronization(TransactionSynchronization synchronization) throws IllegalStateException { Assert.notNull(synchronization, "TransactionSynchronization must not be null"); if (!isSynchronizationActive()) { throw new IllegalStateException("Transaction synchronization is not active"); } synchronizations.get().add(synchronization);}
然后,进行真正的回滚逻辑处理。 当之前已经保存的事务信息中有保存点信息的时候,使用保存点信息进行回滚。常用于嵌入式事务,对于嵌入式的事务的处理,内嵌的事务异常并不会引起外部事务的回滚。根据保存点回滚的实现方式其实是根据底层的数据库连接进行的。
/** * 回滚事务到保存点 * Roll back to the savepoint that is held for the transaction * and release the savepoint right afterwards. */public void rollbackToHeldSavepoint() throws TransactionException { Object savepoint = getSavepoint(); if (savepoint == null) { throw new TransactionUsageException( "Cannot roll back to savepoint - no savepoint associated with current transaction"); } getSavepointManager().rollbackToSavepoint(savepoint); getSavepointManager().releaseSavepoint(savepoint); setSavepoint(null);}
这里使用的是JDBC的方式进行数据库连接,那么 getSavepointManager()
函数返回的是 JdbcTransactionObjectSupport
,也就是说上面函数会调用 JdbcTransactionObjectSupport
中的 rollbackToSavepoint()
方法。可以看到是基于数据库连接的 rollback
方法进行回滚的。
/** * This implementation rolls back to the given JDBC 3.0 Savepoint. * @see java.sql.Connection#rollback(java.sql.Savepoint) */@Overridepublic void rollbackToSavepoint(Object savepoint) throws TransactionException { ConnectionHolder conHolder = getConnectionHolderForSavepoint(); try { conHolder.getConnection().rollback((Savepoint) savepoint); conHolder.resetRollbackOnly(); } catch (Throwable ex) { throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex); }}
如果没有保存点,之前已经保存的事务信息中的事务为新事物,那么直接回滚。常用于单独事务的处理。对于没有保存点的回滚,Spring同样是使用底层数据库连接提供的API来操作的。由于我们使用的是 DataSourceTransactionManager
,那么 doRollback()
函数会使用此类中的实现。
/** * 如果当前事务为独立的新事务,则直接退回。常用于单独事务的处理,对于没有保存点的回滚。 * 利用数据库连接connection.rollback() API进行实现 * @param status the status representation of the transaction */@Overrideprotected void doRollback(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Rolling back JDBC transaction on Connection [" + con + "]"); } try { con.rollback(); } catch (SQLException ex) { throw new TransactionSystemException("Could not roll back JDBC transaction", ex); }}
当前事务信息中表明是存在事务的,又不属于以上两种情况,多数用于JTA,只做回滚标识,等到提交的时候统一不提交。
@Overrideprotected void doSetRollbackOnly(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); if (status.isDebug()) { logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() + "] rollback-only"); } txObject.setRollbackOnly();}
最后,对于回滚逻辑执行结束后,无论回滚是否成功,都必须要做的事情就是事务结束后的收尾工作。 包括设置事务已经完成,避免重复调用;如果当前事务是新的同步状态,需要将绑定到当前线程的事务信息清除;如果是新事务,需要做些清除数据库连接等等资源的工作;如果在事务执行前有事务挂起,那么当前事务执行结束后需要将挂起事务恢复。
/** * 回滚后的信息清除 * Clean up after completion, clearing synchronization if necessary, * and invoking doCleanupAfterCompletion. * @param status object representing the transaction * @see #doCleanupAfterCompletion */private void cleanupAfterCompletion(DefaultTransactionStatus status) { // 设置完成状态,对事务信息作完成标识以避免重复调用 status.setCompleted(); // 如果当前事务是新的同步状态,需要将绑定到当前线程的事务信息清除 if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); } // 如果是新事务,需要做些清除资源的工作 if (status.isNewTransaction()) { doCleanupAfterCompletion(status.getTransaction()); } if (status.getSuspendedResources() != null) { if (status.isDebug()) { logger.debug("Resuming suspended transaction after completion of inner transaction"); } Object transaction = (status.hasTransaction() ? status.getTransaction() : null); // 结束之前事务的挂起状态 resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources()); }}
如果事务的执行并没有出现任何的异常,也就意味着事务可以走正常事务提交的流程了。
/** * 提交事务 * Execute after successful completion of call, but not after an exception was handled. * Do nothing if we didn't create a transaction. * @param txInfo information about the current transaction */protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) { if (txInfo != null && txInfo.getTransactionStatus() != null) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]"); } txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); }}
在真正的数据提交之前,还需要做个判断。之前在我们分析事务异常处理规则的时候,当某个事务既没有保存点又不是新事务,Spring对它的处理方式只是设置一个回滚标识。这个回滚标识在这里就会派上用场了,主要的应用场景如下。
某个事务是另一个事务的嵌入事务,但是,这些事务又不在Spring的管理范围内,或者无法设置保存点,那么 Spring 会通过设置回滚标识的方式来禁止提交。首先当某个嵌入事务发生回滚的时候会设置回滚标识,而等到外部事务提交时,一旦判断出当前事务流被设置了回滚标识,则由外部事务来统一进行整体事务的回滚。
所以,当事务没有被异常捕获的时候也并不意味着一定会执行提交的过程。
/** * 提交事务(不一定真的提交,如果有回滚标识则整体回滚) * This implementation of commit handles participating in existing * transactions and programmatic rollback requests. * Delegates to {@code isRollbackOnly}, {@code doCommit} * and {@code rollback}. * @see org.springframework.transaction.TransactionStatus#isRollbackOnly() * @see #doCommit * @see #rollback */@Overridepublic final void commit(TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; // 如果事务链中已经被标记回滚,那么不会尝试提交事务,直接回滚 if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus, false); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus, true); return; } // 处理事务提交 processCommit(defStatus);}
而当事务执行一切都正常的时候,便可以真正地进入提交流程了。
/** * 真正的提交事务 * Process an actual commit. * Rollback-only flags have already been checked and applied. * @param status object representing the transaction * @throws TransactionException in case of commit failure */private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { boolean unexpectedRollback = false; // 预留 prepareForCommit(status); // 添加的TransactionSynchronization中的对应方法的调用 triggerBeforeCommit(status); // 添加的TransactionSynchronization中的对应方法的调用 triggerBeforeCompletion(status); beforeCompletionInvoked = true; // 如果存在保存点则清除保存点信息,并且不会提交 if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } unexpectedRollback = status.isGlobalRollbackOnly(); // 如果是独立的事务则直接提交 doCommit(status); } else if (isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = status.isGlobalRollbackOnly(); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException | Error ex) { if (!beforeCompletionInvoked) { // 添加的TransactionSynchronization中的对应方法的调用 triggerBeforeCompletion(status); } // 提交过程中出现异常则回滚 doRollbackOnCommitException(status, ex); throw ex; } // Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { // 添加的TransactionSynchronization中的对应方法的调用 triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); }}
在提交过程中也并不是直接提交的,而是考虑了诸多的方面,符合提交的条件如下:
此条件主要考虑内嵌事务的情况。对于内嵌事务,在Spring中正常的处理方式是将内嵌事务开始之前设置保存点,一旦内嵌事务出现异常便根据保存点信息进行回滚,但是如果没有出现异常,内嵌事务并不会单独提交,而是根据事务流由最外层事务负责提交,所以如果当前存在保存点信息便不是最外层事务,不做保存操作,对于是否是新事务的判断也是基于此考虑。
如果程序流通过了事务的层层把关,最后顺利地进入了提交流程,那么同样,Spring会将事务提交的操作引导至底层数据库连接的API,进行事务提交。
/** * 从DefaultTransactionStatus中获取数据库连接并提交事务 * @param status the status representation of the transaction */@Overrideprotected void doCommit(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Committing JDBC transaction on Connection [" + con + "]"); } try { con.commit(); } catch (SQLException ex) { throw new TransactionSystemException("Could not commit JDBC transaction", ex); }}
Spring中的声明式依靠AOP完成了很多套路代码的编写,让我们可以着重考虑业务的实现。Spring提供了很多事务的传播行为,对应源码的实现也分别实现了对应的逻辑。总体来说,Spring的声明式事务给我们带来了极大的便利。
Spring数据库方面的源码告一段落,下面就准备研究Spring MVC部分。
联系客服