Spring源码(10)-事务


一、事务管理

1、核心接口

image-20230421171413788

Spring事务管理高层抽象主要有3个:

  • PlatformTransactionManager:事务管理器(用来管理事务,包含事务的提交,回滚)

  • TransactionDefinition:事务定义信息(隔离,传播,超时,只读)

  • TransactionStatus:事务具体运行状态

(1)PlatformTransactionManager

是Spring的事务管理器核心接口。

Spring本身并不支持事务实现,只是负责包装底层事务,应用底层支持什么样的事务策略,Spring就支持什么样的事务策略。

里面提供了常用的操作事务的方法:

  • TransactionStatus getTransaction(TransactionDefinition definition):获取事务状态信息

  • void commit(TransactionStatus status):提交事务

  • void rollback(TransactionStatus status):回滚事务

Public interface PlatformTransactionManager extends TransactionManager{

    // 由TransactionDefinition得到TransactionStatus对象
    TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;

    // 提交
    Void commit(TransactionStatus status) throws TransactionException; 

    // 回滚
    Void rollback(TransactionStatus status) throws TransactionException; 
}

(2)TransactionDefinition

该接口定义了一些基本事务属性

public interface TransactionDefinition {

    // 返回事务的传播行为
    int getPropagationBehavior(); 

    // 返回事务的隔离级别
    int getIsolationLevel(); 

    // 超时时间
    int getTimeout();  

    // 事务是否只读,事务管理器能够根据这个返回值进行优化,确保事务是只读的
    boolean isReadOnly(); 
} 

(3)TransactionStatus

该接口定义了事务具体的运行状态

public interface TransactionStatus 
    extends TransactionExecution, SavepointManager, Flushable {

    /**
    * 是否有恢复点
    */
    boolean hasSavepoint();

    @Override
    void flush();

}
public interface TransactionExecution {

    /**
     * Return whether the present transaction is new; otherwise participating
     * in an existing transaction, or potentially not running in an actual
     * transaction in the first place.
     */
    boolean isNewTransaction();

    /**
     * Set the transaction rollback-only. This instructs the transaction manager
     * that the only possible outcome of the transaction may be a rollback, as
     * alternative to throwing an exception which would in turn trigger a rollback.
     */
    void setRollbackOnly();

    /**
     * Return whether the transaction has been marked as rollback-only
     * (either by the application or by the transaction infrastructure).
     */
    boolean isRollbackOnly();

    /**
     * Return whether this transaction is completed, that is,
     * whether it has already been committed or rolled back.
     */
    boolean isCompleted();

}
public interface TransactionStatus{

    boolean hasSavepoint(); // 

    boolean isNewTransaction(); // 是否是新的事物

    void setRollbackOnly();  // 设置为只回滚
    boolean isRollbackOnly(); // 是否为只回滚
    boolean isCompleted; // 是否已完成
} 

2、TransactionSynchronizationManager

(1)官方介绍

管理每个线程的资源和事务同步的中心委托。由资源管理代码使用,而不是由典型的应用程序代码使用。

支持每个键一个资源,不覆盖,即在为同一键设置新资源之前需要删除一个资源。如果同步是活动的,则支持事务同步列表。

资源管理代码应该通过getResource检查线程绑定的资源,例如JDBC连接或Hibernate会话。这样的代码通常不应该将资源绑定到线程,因为这是事务管理器的责任。另一个选项是,如果事务同步是活动的,则在第一次使用时惰性绑定,用于执行跨越任意数量资源的事务。

事务同步必须由事务管理器通过initSynchronization()和clearSynchronization()激活和取消激活。AbstractPlatformTransactionManager自动支持此功能,因此所有标准的Spring事务管理器都支持此功能,例如org.springframework.transaction.jta. jtattransactionmanager和org.springframework.jdbc.datasource.DataSourceTransactionManager。

资源管理代码应该只在这个管理器处于活动状态时注册同步,这可以通过isSynchronizationActive来检查;它应该立即执行资源清理。如果事务同步不是活动的,则要么没有当前事务,要么事务管理器不支持事务同步。

例如,同步用于在JTA事务中始终返回相同的资源,例如,对于任何给定的数据源或SessionFactory,分别使用JDBC连接或Hibernate会话。

(2)源码

Spring 提供了事务同步管理器让我们能监听Spring的事务操作。

不同平台的事务管理实现方式不同,DB 连接资源对象也可能不同,所以,通过 TransactionSynchronizationManager 将事务资源相关的信息保存在各个 ThreadLocal 对象中,可以隔离不同平台带来的差异。

public abstract class TransactionSynchronizationManager {

    // 保存DB连接资源。
    // 使用 Object 来保存是因为每种平台的DB连接资源对象可能不一样,
    // 比如:JDBC,Hibernate,EJB 等使用的 DB 连接对象是不一样的。 
    private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<>("Transactional resources");

    // 事务同步回调。每个线程可以注册多个事务同步回调
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<>("Transaction synchronizations");

    // 当前事务的名称  
    private static final ThreadLocal<String> currentTransactionName =
            new NamedThreadLocal<>("Current transaction name");

    // 当前事务是否只读
    private static final ThreadLocal<Boolean> currentTransactionReadOnly =
            new NamedThreadLocal<>("Current transaction read-only status");

    // 当前事务的隔离级别
    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
            new NamedThreadLocal<>("Current transaction isolation level");

    // 事务是否开启
    private static final ThreadLocal<Boolean> actualTransactionActive =
            new NamedThreadLocal<>("Actual transaction active");

    //...
}

(3)API

public abstract class TransactionSynchronizationManager {
    // ...

    //-------------------------------------------------------------------------
    // 管理事务连接资源
    //-------------------------------------------------------------------------

    // 获取当前线程所有绑定的连接
    public static Map<Object, Object> getResourceMap(){}

    // 判断当前线程是否绑定指定类型的连接,key一般为资源工厂
    public static boolean hasResource(Object key){}

    // 获取当前线程绑定的连接中的指定类型,key一般为资源工厂
    public static Object getResource(Object key){}

    // 绑定连接,key一般为资源工厂
    public static void bindResource(Object key, Object value){}

    // 解绑连接,key一般为资源工厂
    public static Object unbindResource(Object key){}

    //-------------------------------------------------------------------------
    // 管理事务同步器
    //-------------------------------------------------------------------------

    // 如果当前线程的事务是活动的,则返回true。可以在注册之前调用,以避免不必要的实例创建。
    public static boolean isSynchronizationActive(){}

    // 激活当前线程的事务同步。由事务管理器在事务开始时调用。
    public static void initSynchronization(){}

    // 为当前线程注册一个新的事务同步。通常由资源管理代码调用。
    public static void registerSynchronization(TransactionSynchronization synchronization){}

    // 返回当前线程所有已注册同步的不可修改快照列表。
    public static List<TransactionSynchronization> getSynchronizations(){}

    // 停用当前线程的事务同步。由事务管理器在事务清理时调用。
    public static void clearSynchronization(){}

    //-------------------------------------------------------------------------
    // 操作当前的事务名称
    //-------------------------------------------------------------------------

    // 设置当前的事务名称
    public static void setCurrentTransactionName(@Nullable String name){}

    // 获取当前的事务名称
    public static String getCurrentTransactionName(){}

    // 为当前事务公开一个只读标志。由事务管理器在事务开始和清理时调用。
    public static void setCurrentTransactionReadOnly(boolean readOnly){}

    // 当前事务是否只读
    public static boolean isCurrentTransactionReadOnly(){}

    // 公开当前事务的隔离级别。由事务管理器在事务开始和清理时调用。
    public static void setCurrentTransactionIsolationLevel(@Nullable Integer isolationLevel){}

    // 返回当前事务的隔离级别(如果有)。
    // 在准备新创建的资源(例如JDBC Connection)时由资源管理代码调用。
    public static Integer getCurrentTransactionIsolationLevel(){}

    // 公开当前是否有实际的活动事务。由事务管理器在事务开始和清理时调用。
    public static void setActualTransactionActive(boolean active){}

    //返回当前是否有实际的活动事务。这表明当前线程是否与实际事务相关联,而不仅仅是与活动事务同步相关联。
    //由资源管理代码调用,以区分活动事务同步(有或没有后台资源事务;
    //也在PROPAGATION_SUPPORTS上),并且实际事务处于活动状态(具有支持资源事务;PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW等)。
    public static boolean isActualTransactionActive(){}

    // 清除当前线程的整个事务同步状态:已注册的同步以及各种事务特征。
    public static void clear(){}
}

(4)案例

//当前事务提交后方可进行异步任务,防止异步任务先于未提交的事务执行
private void callBack(Invoice invoice){
    // 判断当前是否存在事务
    boolean synchronizationActive = 
        TransactionSynchronizationManager.isSynchronizationActive();

    if (synchronizationActive) {
        // 当前存在事务,在事务提交后执行
        TransactionSynchronizationManager.registerSynchronization(
            new TransactionSynchronizationAdapter() {
                // 监听事务提交完成
                @Override
                public void afterCommit() { 
                    doCall(invoice);
                }

                // 事务完成后执行(不会区分他是提交还是回滚操作)
                public void afterCompletion(int status) {

                }
            }
        );
    } else {
        // 当前不存在事务,直接执行
        doCall(invoice);
    }
}

3、TransactionAspectSupport

事务性切面的基类,例如 TransactionInterceptorAspectJ 切面。

这使得底层Spring事务基础设施可以很容易地用于实现任何方面系统的方面。

子类负责以正确的顺序调用该类中的方法。

如果 TransactionAttribute 中没有指定事务名,那么公开的名称将是完全限定类名+“.”+方法名(默认)。

使用策略设计模式。PlatformTransactionManager或ReactiveTransactionManager实现将执行实际的事务管理,而TransactionAttributeSource(例如基于注释的)用于确定特定类或方法的事务定义。

4、TransactionInterceptor

该类是 AOP 结合 MethodInterceptor,用于声明性事务管理,使用通用的Spring事务基础设施(PlatformTransactionManager / ReactiveTransactionManager)。

派生自 TransactionAspectSupport 类,该类包含了与Spring底层事务API的集成。TransactionInterceptor 只是按照正确的顺序调用相关的超类方法,比如 invokeWithinTransaction

TransactionInterceptors 是线程安全的。

spring tx的入口就是在TxAdviceBeanDefinitionParser这里将解析tx的配置,生成TransactionInterceptor对象,这个也就是一个普通的切面类,只要符合AOP规则的调用都会进入此切面。

二、事务执行流程

Spring事务,其实现原理源自于自定义标签,而开启自定义标签之后会注册三个关于Advisor的Bean,和一个BeanPostProcessor ,当Spring中有BeanPostProcessor 时,会在每个Bean实例化与依赖注入之后执行BeanPostProcessorpostProcessAfterInitialization方法,在这个方法中,会对Bean进行验证是否需要进行代理,如果需要则将上面的Advisor与此Bean一起去交给动态代理工厂做一个代理,返回代理类给IOC容器,如果不需要进行代理直接返回原Bean,达到了事务的效果。Advisor中的advice是事务功能实现的关键类,也就是自定义标签注册的Bean中叫做TransactionInterceptor的这个类。

image-20230427171046893

spring tx的入口就是在TxAdviceBeanDefinitionParser这里将解析tx的配置,生成TransactionInterceptor对象,这个也就是一个普通的切面类,只要符合AOP规则的调用都会进入此切面。

spring 会把 Adivsor 中的 Advice 转换成拦截器链,然后去调用。spring事务会创建一个BeanFactoryTransactionAttributeSourceAdvisor,并把 TransactionInterceptor 注入进去,而 TransactionInterceptor 实现了 Advice 接口。

实现MethodInterceptor 接口,在调用目标对象的方法时,就可以实现在调用方法之前、调用方法过程中、调用方法之后对其进行控制。

MethodInterceptor 接口可以实现MethodBeforeAdvice接口、AfterReturningAdvice接口、ThrowsAdvice接口这三个接口能够所能够实现的功能,但是应该谨慎使用MethodInterceptor 接口,很可能因为一时的疏忽忘记最重要的MethodInvocation而造成对目标对象方法调用失效,或者不能达到预期的设想。

image-20230426173158867

由类图看出,TransactionInterceptor实现了MethodInterceptor接口,那么逻辑处理就会放在invoke方法中。

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
    @Override
    //实现了MethodInterceptor的invoke方法
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        //获取目标类
     Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
     //父类TransactionAspectSupport的模板方法
        return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
            @Override
       //InvocationCallback接口的回调方法
            public Object proceedWithInvocation() throws Throwable {
          //执行目标方法
                return invocation.proceed();
            }
        });
    }
}

img

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {

   //protected修饰,不允许其他包和无关类调用
    protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
        // 获取对应事务属性.如果事务属性为空(则目标方法不存在事务)
        final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
     // 根据事务的属性获取beanFactory中的PlatformTransactionManager(spring事务管理器的顶级接口),一般这里或者的是DataSourceTransactiuonManager
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
     // 目标方法唯一标识(类.方法,如service.UserServiceImpl.save)
        final String joinpointIdentification = methodIdentification(method, targetClass);
     //如果txAttr为空或者tm 属于非CallbackPreferringPlatformTransactionManager,执行目标增强
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            //看是否有必要创建一个事务,根据事务传播行为,做出相应的判断
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal = null;
            try {
          //回调方法执行,执行目标方法(原有的业务逻辑)
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // 异常回滚
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
          //清除信息
                cleanupTransactionInfo(txInfo);
            }
        //提交事务
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }
     //编程式事务处理(CallbackPreferringPlatformTransactionManager) 不做重点分析
        else {
            try {
                Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
                        new TransactionCallback<Object>() {
                            @Override
                            public Object doInTransaction(TransactionStatus status) {
                                TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                                try {
                                    return invocation.proceedWithInvocation();
                                }
                                catch (Throwable ex) {
                                    if (txAttr.rollbackOn(ex)) {
                                        // A RuntimeException: will lead to a rollback.
                                        if (ex instanceof RuntimeException) {
                                            throw (RuntimeException) ex;
                                        }
                                        else {
                                            throw new ThrowableHolderException(ex);
                                        }
                                    }
                                    else {
                                        // A normal return value: will lead to a commit.
                                        return new ThrowableHolder(ex);
                                    }
                                }
                                finally {
                                    cleanupTransactionInfo(txInfo);
                                }
                            }
                        });

                // Check result: It might indicate a Throwable to rethrow.
                if (result instanceof ThrowableHolder) {
                    throw ((ThrowableHolder) result).getThrowable();
                }
                else {
                    return result;
                }
            }
            catch (ThrowableHolderException ex) {
                throw ex.getCause();
            }
        }
    }
}

https://www.cnblogs.com/chihirotan/p/6739748.html

https://www.jianshu.com/p/c56bdc7f1d1d

https://blog.csdn.net/qq_39002724/article/details/113060584

https://blog.csdn.net/D812359/article/details/97675279

https://blog.csdn.net/D812359/article/details/97675279


  目录