13-Seata分布式事务管理


一、基本概念

Seata 是一款开源的分布式事务解决方案,是一个一加三组件模型(详见术语表),致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

一次业务操作需要垮多个数据源或需要垮多个系统进行远程调用,就会产生分布式事务问题.

官方中文文档:http://seata.io/zh-cn/docs/overview/what-is-seata.html

1、术语表

Transaction ID XID:全局唯一的事务ID

三组件:

TC (Transaction Coordinator) - 事务协调者

维护全局和分支事务的状态,协调全局事务提交或回滚。

TM (Transaction Manager) - 事务管理器

定义全局事务的范围:开始全局事务、提交或回滚全局事务。

RM (Resource Manager) - 资源管理器

管理分支事务处理的资源(数据库),与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。简单来说 RM 就是数据库

一个典型的分布式事务的处理过程

1、TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成全局唯一 XID

2、XID 在微服务调用链路的上下文中传播

3、RM 向 TC 发起针对 XID 的全局提交或回滚决议

4、TC 调度 XID 下管辖的全部分支完成提交或回滚请求

image-20220620144958068

在上图中可以知道,TM和RM其实是对业务方法的代理和管理,而TC是脱离业务之外的一个服务,由它去协调TM和RM,协调全局事务和分支事务。

在分布式事务中,会有一个入口方法去调用各个微服务,每一个微服务都有一个分支事务,因此调用了多少个微服务,全局事务就有多少个分支事务,TM代理这个入口方法,因此就定义了全局事务的范围。

当入口方法被执行时,TM会先拦截这个方法的执行,会先想TC发送一个请求,注册这个全局事务,然后既可以开始执行这个入口的业务逻辑了,开始调用每一个微服务。到了微服务里面,每个分支事务就开始执行,这个时候RM就会代理分支业务,在分支业务执行之前向TC注册分支事务,然后开始执行分支业务。在执行完成后,有RM向TC报告分支事务的状态。

因此,TC就知道了所有的分支事务的状态,然后等到全部分支业务执行完成,TM向TC发送全局事务状态的时候,TC就会检查分支事务的状态,如果都成功,就让各个分支事务都去提交,如果失败就让它们都回滚。

二、事务模式

分布式事务模式 介绍
AT 模式 无侵入的分布式事务解决方案,适用于不希望对业务进行改造的场景,几乎0学习成本(sql都由框架托管统一执行,会存在脏写问题) 最终一致性分阶段事务模式,无业务侵入,默认模式
TCC 模式 高性能分布式事务解决方案,适用于核心系统等对性能有很高要求的场景(第一阶段会产生行锁,事务执行太久会锁行很久 最终一致性分阶段事务模式,有业务侵入
Saga 模式 长事务解决方案,适用于业务流程长且需要保证事务最终一致性的业务系统(第一阶段就操作DB,会存在脏读问题) 长事务模式,有业务侵入
XA模式 分布式强一致性的解决方案,但性能低而使用较少。无业务侵入 强一致性分阶段事务模式,无业务侵入
Saga和TCC模式区别不大,TCC就是多了个锁行的步骤(避免了脏读,但事务执行太久会导致锁行很久,不适用于长事务)

1、XA 模式

XA规范是分布式事务处理标准,它描述了全局的TM和局部的RM之间的接口,几乎所有的主流的数据库都对XA规范提供了支持;

(1)执行原理

XA将分布式事务分为两个阶段,一个是准备阶段,一个是执行阶段。

准备阶段: 事务协调者会向事务参与者RM发送一个请求,这里的RM其实是由数据库实现的,所以可以认为RM就是数据库。让数据库去执行事务,但执行完不要提交,而是把结果告知事务协调者。

执行阶段: 事务协调者根据结果,通知RM回滚或者提交事务。

image-20220620173926214

简单来说就是一阶段执行事务,暂时挂起,不提交,通知 TM ,等到该全局事务所有的子事务全部完成,才会同一进行回滚或提交

(2)优缺点

优点:

这是一种强一致性的解决方案,因为每一个微服务都是基于各自的事务的,各自的事务是满足ACID的,而且等到大家都执行完了且都成功了才提交,所以全局事务是满足ACID的。

实现比较简单,因为很多数据库都实现了这种模式,使用Seata的XA模式只需要简单的封装上TM。

缺点:

第一阶段不提交,等到第二阶段再提交,但是等的过程中要占用数据库锁,如果一个分布式事务中跨越了很多个分支事务,则可能造成很多资源的浪费,使得别的请求无法访问,降低了可用性;

依赖于数据库,对于如果有的数据库没有实现这种模式,则无法使用这个模式来实现分布式事务。

(3)实现

application.yml 配置文件配置

seata:
  data-source-proxy-mode: XA

在全局事务的入口方法上加 @GlobalTransactional

2、TCC 模式

(1)原理

TCC 模式与 AT 模式非常相似,每个阶段都是独立的事务,不同的是TCC通过人工编码来实现数据恢复。

TCC是 Try-Confirm-Cancel 的简称。TCC要求每个分支事务实现三个操作:预处理Try、确认 Confirm、撤销Cancel。

  • Try:操作做业务检查及资源预留,
  • Confirm:做业务确认操作,要求 try 成功 confirm 一定能成功
  • Cancel:实现一个与Try相反的操作即回滚操作,用于释放预留资源

TM首先发起所有的分支事务的try操作,任何一个分支事务的try操作执行失败,TM将会发起所有分支事务的Cancel操作,若try操作全部成功,TM将会发起所有分支事务的Confirm操作,其中Confirm/Cancel 操作若执行失败,TM会进行重试。

image-20220620175717527

根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction ModeTCC (Branch) Transaction Mode.

AT 模式(参考链接 TBD)基于 支持本地 ACID 事务关系型数据库

  • 一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
  • 二阶段 commit 行为:马上成功结束,自动 异步批量清理回滚日志。
  • 二阶段 rollback 行为:通过回滚日志,自动 生成补偿操作,完成数据回滚。

相应的,TCC 模式,不依赖于底层数据资源的事务支持:

  • 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
  • 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
  • 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。

所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。

image-20220620202716963

所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中

所以,使用TCC模式时,需要自定义编码,这时seata将对代码产生侵入性,另外不是所有业务都适合TCC模式,一般是需要进行扣减或者新增数量时使用。

(2)优缺点

优点:

  • 一阶段完成直接提交事务,释放数据库资源,性能好
  • 相比 AT 模型,无需生成快照,无需使用全局锁,性能最强
  • 不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库

缺点:

  • 有代码侵入,需要人为编写 try、Confirm、Cancel 接口,太麻烦了
  • 软状态、事务是最终一致性
  • 需要考虑做好 Confirm、Cancel 失败的情况,做好幂等处理

(3)实现

基本流程为,先执行扣减操作,然后记录一个日志(表结构如下,可自定义),该日志以事物ID为区分,如果整体事物成功,则执行confirm,如果失败则执行cancel。

这时还需要注意的问题是,可能这边还没有执行业务,但是seata认定整体失败,要开始调动cancel了,这时要处理空回滚幂等性问题。

也可能调用了cancel,此时堵塞的程序又开始执行正常业务逻辑,这时需要注意不能重复执行

DROP TABLE IF EXISTS `account_freeze_tbl`;
CREATE TABLE `account_freeze_tbl`  (
  `xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `user_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `freeze_money` int(11) UNSIGNED NULL DEFAULT 0,
  `state` int(1) NULL DEFAULT NULL COMMENT '事务状态,0:try,1:confirm,2:cancel',
  PRIMARY KEY (`xid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = COMPACT;

接口

  • 定义两阶段提交 name = 该tcc的bean名称,全局唯一 commitMethod = confirm 为二阶段确认方法 rollbackMethod = cancel 为二阶段取消方法
  • useTCCFence=true 为开启防悬挂
  • BusinessActionContextParameter注解 传递参数到二阶段中
@LocalTCC
public interface AccountTccService {

    @TwoPhaseBusinessAction(name = "deduct", commitMethod = "confirm", rollbackMethod = "cancel",useTCCFence=true)
    void deduct(@BusinessActionContextParameter(paramName = "userId") String userId,
                @BusinessActionContextParameter(paramName = "money") int money);

    boolean confirm(BusinessActionContext ctx);

    boolean cancel(BusinessActionContext ctx);
}

实现类

@Slf4j
@Service
public class AccountTccServiceImpl implements AccountTccService {

    @Autowired
    private AccountMapper accountMapper;
    @Autowired
    private AccountFreezeMapper freezeMapper;

    @Override
    public void deduct(String userId, int money) {
        String xid = RootContext.getXID();
        // 查询冻结记录,如果有,就是cancel执行过,不能继续执行
        AccountFreeze oldfreeze = freezeMapper.selectById(xid);
        if (oldfreeze != null){
            return;
        }
        // 扣除
        accountMapper.deduct(userId, money);
        // 记录
        AccountFreeze freeze = new AccountFreeze();
        freeze.setXid(xid);
        freeze.setUserId(userId);
        freeze.setFreezeMoney(money);
        freeze.setState(AccountFreeze.State.TRY);
        freezeMapper.insert(freeze);
    }

    @Override
    public boolean confirm(BusinessActionContext ctx) {
        String xid = ctx.getXid();
        int count = freezeMapper.deleteById(xid);
        return count == 1;
    }

    @Override
    public boolean cancel(BusinessActionContext ctx) {
        String xid = ctx.getXid();
        // 查询冻结记录
        AccountFreeze freeze = freezeMapper.selectById(xid);
        if(null == freeze){
            // try没有执行,需要空回滚
            freeze = new AccountFreeze();
            freeze.setXid(xid);
            freeze.setUserId(ctx.getActionContext("userId").toString());
            freeze.setFreezeMoney(0);
            freeze.setState(AccountFreeze.State.CANCEL);
            freezeMapper.insert(freeze);
            return true;
        }

        // 幂等判断
        if(freeze.getState() == AccountFreeze.State.CANCEL){
            return true;
        }

        // 恢复金额
        accountMapper.refund(freeze.getUserId(), freeze.getFreezeMoney());
        freeze.setFreezeMoney(0);
        freeze.setState(AccountFreeze.State.CANCEL);
        int count = freezeMapper.updateById(freeze);
        return count == 1;
    }
}

如果你调用一个失败的请求,这时在account_freeze_tbl表会产生一条记录。另外,各个模块中,AT和TCC模式是可以混用。

(4)常见异常

最常见的主要是这三种异常,空回滚、幂等、悬挂。

空回滚

  • 描述:

空回滚就是对于一个分布式事务,在没有调用 TCC 资源 Try 方法的情况下,调用了二阶段的 Cancel 方法,Cancel 方法需要识别出这是一个空回滚,然后直接返回成功。

也就是说,金额还没扣减就进入了 Cancel 方法,所以在 Cancel 方法中不需要任何处理,直接返回即可。

  • 出现情况:

非入口方法的服务 Try 块未执行,机器宕机或者其他原因导致 try 块无法执行

  • 解决:

需要一张额外的事务控制表,其中有分布式事务 ID 和分支事务 ID,第一阶段 Try 方法里会插入一条记录,
表示一阶段执行了。Cancel 接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存在,则是空回滚。

幂等

  • 描述

幂等就是对于同一个分布式事务的同一个分支事务,重复去调用该分支事务的第二阶段接口,因此,要求 TCC 的二阶段 Confirm 和 Cancel 接口保证幂等,不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致资损等严重问题。

  • 出现情况

什么样的情形会造成重复提交或回滚?提交或回滚是一次 TC 到参与者的网络调用。因此,网络故障、参与者宕机等都有可能造成参与者 TCC 资源实际执行了二阶段防范,但是 TC 没有收到返回结果的情况,这时,TC 就会重复调用,直至调用成功,整个分布式事务结束。

  • 解决方式

事务控制表的每条记录关联一个分支事务,那我们完全可以在这张事务控制表上加一个状态字段,用来记录每个分支事务的执行状态。

状态字段有三个值,分别是初始化、已提交、已回滚。Try 方法插入时,是初始化状态。二阶段 Confirm 和 Cancel 方法执行后修改为已提交或已回滚状态。当重复调用二阶段接口时,先获取该事务控制表对应记录,检查状态,如果已执行,则直接返回成功;否则正常执行。

悬挂

  • 描述

悬挂就是对于一个分布式事务,其二阶段 Cancel 接口比 Try 接口先执行。

因为允许空回滚的原因,Cancel 接口认为 Try 接口没执行,空回滚直接返回成功,对于 Seata 框架来说,认为分布式事务的二阶段接口已经执行成功,整个分布式事务就结束了。但是这之后 Try 方法才真正开始执行。

  • 出现情况

按照前面所讲,在 RPC 调用时,先注册分支事务,再执行 RPC 调用,如果此时 RPC 调用的网络发生拥堵,通常 RPC 调用是有超时时间的,RPC 超时以后,发起方就会通知 TC 回滚该分布式事务,可能回滚完成后,RPC 请求才到达参与者,真正执行,从而造成悬挂。

  • 解决方法

可以在二阶段执行时插入一条事务控制记录,状态为已回滚,这样当一阶段执行时,先读取该记录,如果记录存在,就认为二阶段已经执行,进行 空 try ,否则二阶段没执行,正常执行 try。

3、AT 模式(重要)

(1)原理

AT 模式的前提是:

  • 基于支持本地 ACID 事务的关系型数据库。
  • Java 应用,通过 JDBC 访问数据库。

AT模式同样是分阶段提交的事务模型,不过弥补了XA模型中资源锁定周期过长的缺陷。

  • 阶段一 RM 的工作:
    • 注册分支事务
    • 记录 undo-log 数据快照
    • 执行业务 sql 并提交
    • 报告事务状态
  • 阶段二 RM 工作
    • 事务成功提交时,删除 undo-log 即可
    • 事务失败回滚时,根据 undo-log 回复数据

image-20220621134813559

与 XA 模式的区别

  • XA 模式一阶段不提交事务,锁定资源;AT 一阶段提交事务,不锁定资源;
  • XA 模式依赖数据库机制实现回滚;AT 模式利用数据快照实现回滚
  • XA 强一致性;AT 最终一致性

(2)脏读、脏写

我们知道 Seata 的事务是一个全局事务,它包含了若干个分支本地事务,在全局事务执行过程中(全局事务还没执行完),某个本地事务提交了,如果 Seata 没有采取任务措施,则会导致已提交的本地事务被读取,造成脏读,如果数据在全局事务提交前已提交的本地事务被修改,则会造成脏写。

由此可以看出,传统意义的脏读是读到了未提交的数据,Seata 脏读是读到了全局事务下未提交的数据,全局事务可能包含多个本地事务,某个本地事务提交了不代表全局事务提交了。

在极端场景下,应用如果需要达到全局的读已提交,Seata 也提供了全局锁机制实现全局事务读已提交。但是默认情况下,Seata 的全局事务是工作在读未提交隔离级别的,保证绝大多数场景的高效性。

案例如下:

image-20220621142024724

事务 2 在修改事务 1 修改过的数据时,出现了脏读和脏写问题,即读取并修改了 事务 1 中理应是未提交的数据(实际数据库事务已提交,但在seata全局事务中属于未提交)。

如果此时事务 2 在 1.2 执行完业务出现异常,由于事务 1 的数据已被修改过,导致无法进行全局回滚。

在 seata 中有全局锁用于解决如上问题

(3)写隔离

Seata 写操作的工作流程:

  • 一阶段本地事务提交前,需要确保先拿到 全局锁
  • 拿不到 全局锁 ,不能提交本地事务。
  • 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。

以一个示例来说明:

两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。

tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁

image-20220621162610503

tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。

image-20220621163029636

如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。

此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。

因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。

(4)读隔离

在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted)

如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。

image-20220621163434414

SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。

出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。

(5)工作流程

以一个示例来说明整个 AT 分支的工作过程。

业务表:product

Field Type Key
id bigint(20) PRI
name varchar(100)
since varchar(100)

AT 分支事务的业务逻辑:

update product set name = 'GTS' where name = 'TXC';

一阶段

过程:

  1. 解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = ‘TXC’)等相关的信息。
  2. 查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
select id, name, since from product where name = 'TXC';

得到前镜像:

id name since
1 TXC 2014
  1. 执行业务 SQL:更新这条记录的 name 为 ‘GTS’。
  2. 查询后镜像:根据前镜像的结果,通过 主键 定位数据。
select id, name, since from product where id = 1`;

得到后镜像:

id name since
1 GTS 2014
  1. 插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中。
{
    "branchId": 641789253,
    "undoItems": [{
        "afterImage": {
            "rows": [{
                "fields": [{
                    "name": "id",
                    "type": 4,
                    "value": 1
                }, {
                    "name": "name",
                    "type": 12,
                    "value": "GTS"
                }, {
                    "name": "since",
                    "type": 12,
                    "value": "2014"
                }]
            }],
            "tableName": "product"
        },
        "beforeImage": {
            "rows": [{
                "fields": [{
                    "name": "id",
                    "type": 4,
                    "value": 1
                }, {
                    "name": "name",
                    "type": 12,
                    "value": "TXC"
                }, {
                    "name": "since",
                    "type": 12,
                    "value": "2014"
                }]
            }],
            "tableName": "product"
        },
        "sqlType": "UPDATE"
    }],
    "xid": "xid:xxx"
}
  1. 提交前,向 TC 注册分支:申请 product 表中,主键值等于 1 的记录的 全局锁
  2. 本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
  3. 将本地事务提交的结果上报给 TC。

二阶段-回滚

  1. 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
  2. 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
  3. 数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。
  4. 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
update product set name = 'TXC' where id = 1;
  1. 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。

二阶段-提交

  1. 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
  2. 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。

(6)回滚日志表

UNDO_LOG Table:不同数据库在类型上会略有差别。

以 MySQL 为例:

Field Type
branch_id bigint PK
xid varchar(100)
context varchar(128)
rollback_info longblob
log_status tinyint
log_created datetime
log_modified datetime
-- 注意此处0.7.0+ 增加字段 context
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

(7)实现

默认 seata 就是这个模式

seata:
  data-source-proxy-mode: AT

(8)事务隔离

https://seata.io/zh-cn/docs/user/appendix/isolation.html

4、SAGA 模式

Saga模式是SEATA提供的长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。

两个阶段:

  • 一阶段:直接提交本地事务;
  • 二阶段:如果成功:什么都不做;如果失败:通过编写补偿业务来回滚;

适用场景:

  • 业务流程长、业务流程多
  • 参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口

优势:

  • 一阶段提交本地事务,无锁,高性能
  • 事件驱动架构,参与者可异步执行,高吞吐
  • 补偿服务易于实现

缺点:

  • 不保证隔离性(应对方案见后面文档)

实现方式详见 Seata 官方文档:https://seata.io/zh-cn/docs/user/saga.html

5、四种模式对比

XA AT TCC SAGA
一致性 强一致 弱一致 弱一致 最终一致性
隔离性 完全隔离 基于全局锁隔离 基于资源预留隔离 无隔离
代码侵入 要是先三个接口 要编写状态机和补偿机制
性能 非常好 非常好

适用场景:

  • XA

对一致性、隔离性要求较高的业务

  • AT

基于关系型数据库的大多数分布式事务场景

  • TCC

对性能要求较高的事务;有非关系性数据库参与的事务

  • SAGA

业务流程长,业务流程多;

参与者包含其他公司或遗留系统服务,无法提供 TCC 模式要求的三个接口

三、下载安装

Seata分TC、TM和RM三个角色,TC(Server端)为单独服务端部署,TM和RM(Client端)由业务系统集成。

1、原生安装

Server端官网下载,本文使用1.3.0

修改file.conf里的store的mode = “db”,并修改数据库相关配置。(应先备份file.conf

Server端存储模式(store.mode)现有file、db、redis三种(后续将引入raft,mongodb):

  • file模式无需改动,直接启动即可,下面专门讲下db和redis启动步骤。
    注: file模式为单机模式,全局事务会话信息内存中读写并持久化本地文件root.data,性能较高;

  • db模式为高可用模式,全局事务会话信息通过db共享,相应性能差些;

  • redis模式Seata-Server 1.3及以上版本支持,性能较高,存在事务信息丢失风险,请提前配置合适当前场景的redis持久化配置。

## transaction log store, only used in seata-server
store {
  ## store mode: file、db、redis
  # 修改模式为 db 数据库
  mode = "db"

  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://127.0.0.1:3306/seata"
    user = "mysql"
    password = "mysql"
    minConn = 5
    maxConn = 30
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }

  ## redis store property
  redis {
    host = "127.0.0.1"
    port = "6379"
    password = ""
    database = "0"
    minConn = 1
    maxConn = 10
    queryLimit = 100
  }

}

registry.conf修改registry的type = “nacos”,并修改相关信息

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"
    nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }
}

如果是使用 mysql ,则需要创建seata数据库,执行conf下的db_store.sql文件,如果没有的话,看readme

全局事务会话信息由3块内容构成,全局事务–>分支事务–>全局锁,对应表global_table、branch_table、lock_table

运行bin下的 seata-server.batseata-server.sh

2、docker-compose

http://seata.io/zh-cn/docs/ops/deploy-by-docker-compose.html

(1)server端

创建挂载文件 registry.conf

registry {
  type = "nacos"

  nacos {
  # seata服务注册在nacos上的别名,客户端通过该别名调用服务
    application = "seata-server"
  # 请根据实际生产环境配置nacos服务的ip和端口
    serverAddr = "127.0.0.1:8848"
  # nacos上指定的namespace
    namespace = "38efc944-744e-45bb-a814-e356fbd883c3"
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}

config {
  type = "nacos"

  nacos {
    # 请根据实际生产环境配置nacos服务的ip和端口
    serverAddr = "127.0.0.1:8848"
    # nacos上指定的namespace
    namespace = "38efc944-744e-45bb-a814-e356fbd883c3"
    group = "DEFAULT_GROUP"
    username = "nacos"
    password = "nacos"
  # 从v1.4.2版本开始,已支持从一个Nacos dataId中获取所有配置信息,你只需要额外添加一个dataId配置项
    dataId: "seataServer.properties"
  }
}

创建 docker-compose.yml 文件

version: "3.1"
services:
  seata-server:
    image: seataio/seata-server:1.4.2
    container_name: seata-server
    hostname: seata-server
    ports:
      - "8091:8091"
    environment:
      # 指定seata服务启动端口
      - SEATA_PORT=8091
      # 注册到nacos上的ip。客户端将通过该ip访问seata服务。
      # 注意公网ip和内网ip的差异。
      - SEATA_IP=业务服务可访问到seata server的ip
      - SEATA_CONFIG_NAME=file:/root/seata-config/registry
    volumes:
    # 因为registry.conf中是nacos配置中心,只需要把registry.conf放到./seata-server/config文件夹中
      - "/data/docker/seata/config:/root/seata-config"

(2)Nacos新增配置

Data ID:seataServer.properties

Group:DEFAULT_GROUP

以及 namespace 这三个都要与上面的registry.conf对应

内容如下

# 存储模式
store.mode=db

store.db.datasource=druid
store.db.dbType=mysql
# 需要根据mysql的版本调整driverClassName
# mysql8及以上版本对应的driver:com.mysql.cj.jdbc.Driver
# mysql8以下版本的driver:com.mysql.jdbc.Driver
store.db.driverClassName=com.mysql.cj.jdbc.Driver
# 注意根据生产实际情况调整参数host和port
store.db.url=jdbc:mysql://127.0.0.1:3306/seata-server?useUnicode=true&characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false
store.db.user= 用户名
store.db.password=密码

# 不加该配置业务服务会报错
service.vgroupMapping.default_tx_group=default
其他可选配置

更多配置项参考官方文档

store.mode=db
#-----db-----
store.db.datasource=druid
store.db.dbType=mysql
# 需要根据mysql的版本调整driverClassName
# mysql8及以上版本对应的driver:com.mysql.cj.jdbc.Driver
# mysql8以下版本的driver:com.mysql.jdbc.Driver
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata-server?useUnicode=true&characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false
store.db.user= 用户名
store.db.password=密码
# 数据库初始连接数
store.db.minConn=1
# 数据库最大连接数
store.db.maxConn=20
# 获取连接时最大等待时间 默认5000,单位毫秒
store.db.maxWait=5000
# 全局事务表名 默认global_table
store.db.globalTable=global_table
# 分支事务表名 默认branch_table
store.db.branchTable=branch_table
# 全局锁表名 默认lock_table
store.db.lockTable=lock_table
# 查询全局事务一次的最大条数 默认100
store.db.queryLimit=100


# undo保留天数 默认7天,log_status=1(附录3)和未正常清理的undo
server.undo.logSaveDays=7
# undo清理线程间隔时间 默认86400000,单位毫秒
server.undo.logDeletePeriod=86400000
# 二阶段提交重试超时时长 单位ms,s,m,h,d,对应毫秒,秒,分,小时,天,默认毫秒。默认值-1表示无限重试
# 公式: timeout>=now-globalTransactionBeginTime,true表示超时则不再重试
# 注: 达到超时时间后将不会做任何重试,有数据不一致风险,除非业务自行可校准数据,否者慎用
server.maxCommitRetryTimeout=-1
# 二阶段回滚重试超时时长
server.maxRollbackRetryTimeout=-1
# 二阶段提交未完成状态全局事务重试提交线程间隔时间 默认1000,单位毫秒
server.recovery.committingRetryPeriod=1000
# 二阶段异步提交状态重试提交线程间隔时间 默认1000,单位毫秒
server.recovery.asynCommittingRetryPeriod=1000
# 二阶段回滚状态重试回滚线程间隔时间  默认1000,单位毫秒
server.recovery.rollbackingRetryPeriod=1000
# 超时状态检测重试线程间隔时间 默认1000,单位毫秒,检测出超时将全局事务置入回滚会话管理器
server.recovery.timeoutRetryPeriod=1000

(3)启动seata

docker-compose.yml 文件同级目录下

docker-compose up -d

四、项目集成

1、依赖

        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>1.4.2</version>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
            <version>1.4.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <version>2021.1</version>
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-all</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

2、配置

注:数据源一定要使用阿里的druid

使用自动代理数据源时,如果使用XA模式还需要调整配置文件

spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    ...

seata:
  data-source-proxy-mode: AT
  enable-auto-data-source-proxy: true # 数据源自动代理
  tx-service-group: default_tx_group

  registry:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.discovery.server-addr}
      namespace: ${spring.cloud.nacos.discovery.namespace}
      application: seata-server
      group: DEFAULT_GROUP
      username: nacos
      password: nacos
  config:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.discovery.server-addr}
      namespace: ${spring.cloud.nacos.discovery.namespace}
      group: DEFAULT_GROUP
      dataId: seataServer.properties

3、创建 undo_log 表

如果使用 AT 模式,需要在所有的业务数据库中添加回滚日志表,如下

-- 注意此处0.7.0+ 增加字段 context
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

4、注解拦截

全局事务

注:只需要在调用顶层Service加注解即可,其他feign服务不需要加任何注解,包括本地事务注解

@GlobalTransactional(rollbackFor = Exception.class)
public String createOrder() {

    System.out.println("xid:" + RootContext.getXID());

    String s = stockFeignService.create();
    if(!"success".equals(s)){
        // 如果其他服务也配置了全局异常处理的配置,需要手动回滚
        GlobalTransactionContext.reload(RootContext.getXID()).rollback();
    }

    return "s";
}

TCC模式

/**
 * 定义两阶段提交 name = 该tcc的bean名称,全局唯一 commitMethod = commit 为二阶段确认方法 rollbackMethod = rollback 为二阶段取消方法
 * useTCCFence=true 为开启防悬挂
 * BusinessActionContextParameter注解 传递参数到二阶段中
 *
 * @param params  -入参
 * @return String
 */
@TwoPhaseBusinessAction(name = "beanName", commitMethod = "commit", rollbackMethod = "rollback", useTCCFence = true)
public void insert(@BusinessActionContextParameter(paramName = "params") Map<String, String> params) {
    logger.info("此处可以预留资源,或者利用tcc的特点,与AT混用,二阶段时利用一阶段在此处存放的消息,通过二阶段发出,比如redis,mq等操作");
}

/**
 * 确认方法、可以另命名,但要保证与commitMethod一致 context可以传递try方法的参数
 *
 * @param context 上下文
 * @return boolean
 */
public void commit(BusinessActionContext context) {
    logger.info("预留资源真正处理,或者发出mq消息和redis入库");
}

/**
 * 二阶段取消方法
 *
 * @param context 上下文
 * @return boolean
 */
public void rollback(BusinessActionContext context) {
    logger.info("预留资源释放,或清除一阶段准备让二阶段提交时发出的消息缓存");
}

5、切点表达式

全局事务

    @Bean
    public AspectTransactionalInterceptor aspectTransactionalInterceptor () {
        return new AspectTransactionalInterceptor();
    }

    @Bean
    public Advisor txAdviceAdvisor(AspectTransactionalInterceptor aspectTransactionalInterceptor ) {
        AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
        pointcut.setExpression("配置切点表达式使全局事务拦截器生效");
        return new DefaultPointcutAdvisor(pointcut, aspectTransactionalInterceptor);
    }

五、API

Seata API 分为两大类:High-Level API 和 Low-Level API :

  • High-Level API :用于事务边界定义、控制及事务状态查询。
  • Low-Level API :用于控制事务上下文的传播。

1、High-Level API

(1)GlobalTransaction

全局事务:包括开启事务、提交、回滚、获取当前状态等方法。

public interface GlobalTransaction {

    /**
     * 开启一个全局事务(使用默认的事务名和超时时间)
     */
    void begin() throws TransactionException;

    /**
     * 开启一个全局事务,并指定超时时间(使用默认的事务名)
     */
    void begin(int timeout) throws TransactionException;

    /**
     * 开启一个全局事务,并指定事务名和超时时间
     */
    void begin(int timeout, String name) throws TransactionException;

    /**
     * 全局提交
     */
    void commit() throws TransactionException;

    /**
     * 全局回滚
     */
    void rollback() throws TransactionException;

    /**
     * 获取事务的当前状态
     */
    GlobalStatus getStatus() throws TransactionException;

    /**
     * 获取事务的 XID
     */
    String getXid();

}

(2)GlobalTransactionContext

GlobalTransaction 实例的获取需要通过 GlobalTransactionContext:

    /**
     * 获取当前的全局事务实例,如果没有则创建一个新的实例。
     */
    public static GlobalTransaction getCurrentOrCreate() {
        GlobalTransaction tx = getCurrent();
        if (tx == null) {
            return createNew();
        }
        return tx;
    }

    /**
     * 重新载入给定 XID 的全局事务实例,这个实例不允许执行开启事务的操作。
     * 这个 API 通常用于失败的事务的后续集中处理。
     * 比如:全局提交超时,后续集中处理通过重新载入该实例,通过实例方法获取事务当前状态,并根据状态判断是否需要重试全局提交操作。
     */
    public static GlobalTransaction reload(String xid) throws TransactionException {
        GlobalTransaction tx = new DefaultGlobalTransaction(xid, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher) {
            @Override
            public void begin(int timeout, String name) throws TransactionException {
                throw new IllegalStateException("Never BEGIN on a RELOADED GlobalTransaction. ");
            }
        };
        return tx;
    }

(3)TransactionalTemplate

事务化模板:通过上述 GlobalTransaction 和 GlobalTransactionContext API 把一个业务服务的调用包装成带有分布式事务支持的服务。

public class TransactionalTemplate {

    public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {

        // 1. 获取当前全局事务实例或创建新的实例
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 2. 开启全局事务
        try {
            tx.begin(business.timeout(), business.name());

        } catch (TransactionException txe) {
            // 2.1 开启失败
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);

        }

        Object rs = null;
        try {
            // 3. 调用业务服务
            rs = business.execute();

        } catch (Throwable ex) {

            // 业务调用本身的异常
            try {
                // 全局回滚
                tx.rollback();

                // 3.1 全局回滚成功:抛出原始业务异常
                throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);

            } catch (TransactionException txe) {
                // 3.2 全局回滚失败:
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.RollbackFailure, ex);

            }

        }

        // 4. 全局提交
        try {
            tx.commit();

        } catch (TransactionException txe) {
            // 4.1 全局提交失败:
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);

        }
        return rs;
    }

}

模板方法执行的异常:ExecutionException

class ExecutionException extends Exception {

    // 发生异常的事务实例
    private GlobalTransaction transaction;

    // 异常编码:
    // BeginFailure(开启事务失败)
    // CommitFailure(全局提交失败)
    // RollbackFailure(全局回滚失败)
    // RollbackDone(全局回滚成功)
    private Code code;

    // 触发回滚的业务原始异常
    private Throwable originalException;
}

外层调用逻辑 try-catch 这个异常,根据异常编码进行处理:

  • BeginFailure (开启事务失败):getCause() 得到开启事务失败的框架异常,getOriginalException() 为空。
  • CommitFailure (全局提交失败):getCause() 得到全局提交失败的框架异常,getOriginalException() 为空。
  • RollbackFailure (全局回滚失败):getCause() 得到全局回滚失败的框架异常,getOriginalException() 业务应用的原始异常。
  • RollbackDone (全局回滚成功):getCause() 为空,getOriginalException() 业务应用的原始异常。

2、Low-Level API

(1)RootContext

事务的根上下文:负责在应用的运行时,维护 XID 。

    /**
     * 得到当前应用运行时的全局事务 XID
     */
    public static String getXID() {
        return CONTEXT_HOLDER.get(KEY_XID);
    }

    /**
     * 将全局事务 XID 绑定到当前应用的运行时中
     */
    public static void bind(String xid) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("bind " + xid);
        }
        CONTEXT_HOLDER.put(KEY_XID, xid);
    }

    /**
     * 将全局事务 XID 从当前应用的运行时中解除绑定,同时将 XID 返回
     */
    public static String unbind() {
        String xid = CONTEXT_HOLDER.remove(KEY_XID);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("unbind " + xid);
        }
        return xid;
    }

    /**
     * 判断当前应用的运行时是否处于全局事务的上下文中
     */
    public static boolean inGlobalTransaction() {
        return CONTEXT_HOLDER.get(KEY_XID) != null;
    }

High-Level API 的实现都是基于 RootContext 中维护的 XID 来做的。

应用的当前运行的操作是否在一个全局事务的上下文中,就是看 RootContext 中是否有 XID。

RootContext 的默认实现是基于 ThreadLocal 的,即 XID 保存在当前线程上下文中。

Low-Level API 的两个典型的应用场景:

远程调用事务上下文的传播

远程调用前获取当前 XID:

String xid = RootContext.getXID();

远程调用过程把 XID 也传递到服务提供方,在执行服务提供方的业务逻辑前,把 XID 绑定到当前应用的运行时:

RootContext.bind(rpcXid);

事务的暂停和恢复

在一个全局事务中,如果需要某些业务逻辑不在全局事务的管辖范围内,则在调用前,把 XID 解绑:

String unbindXid = RootContext.unbind();

待相关业务逻辑执行完成,再把 XID 绑定回去,即可实现全局事务的恢复:

RootContext.bind(unbindXid);

3、常用API

(1)获取xid

同一个分布式事务下,所有服务的 xid 一致

RootContext.getXID();

(2)手动回滚

Fegin调用使用了Fallback降级或抛出的异常被全局处理(比如全局异常处理GlobalExceptionHandler),seata会认为你已经手动处理了异常,此时就需要手动回滚。

GlobalTransactionContext.reload(RootContext.getXID()).rollback();

  目录