一、基本概念
Seata 是一款开源的分布式事务解决方案,是一个一加三组件模型(详见术语表),致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
一次业务操作需要垮多个数据源或需要垮多个系统进行远程调用,就会产生分布式事务问题.
官方中文文档:http://seata.io/zh-cn/docs/overview/what-is-seata.html
1、术语表
Transaction ID XID:全局唯一的事务ID
三组件:
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,协调全局事务提交或回滚。
TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源(数据库),与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。简单来说 RM 就是数据库
一个典型的分布式事务的处理过程
1、TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成全局唯一 XID
2、XID 在微服务调用链路的上下文中传播
3、RM 向 TC 发起针对 XID 的全局提交或回滚决议
4、TC 调度 XID 下管辖的全部分支完成提交或回滚请求
在上图中可以知道,TM和RM其实是对业务方法的代理和管理,而TC是脱离业务之外的一个服务,由它去协调TM和RM,协调全局事务和分支事务。
在分布式事务中,会有一个入口方法去调用各个微服务,每一个微服务都有一个分支事务,因此调用了多少个微服务,全局事务就有多少个分支事务,TM代理这个入口方法,因此就定义了全局事务的范围。
当入口方法被执行时,TM会先拦截这个方法的执行,会先想TC发送一个请求,注册这个全局事务,然后既可以开始执行这个入口的业务逻辑了,开始调用每一个微服务。到了微服务里面,每个分支事务就开始执行,这个时候RM就会代理分支业务,在分支业务执行之前向TC注册分支事务,然后开始执行分支业务。在执行完成后,有RM向TC报告分支事务的状态。
因此,TC就知道了所有的分支事务的状态,然后等到全部分支业务执行完成,TM向TC发送全局事务状态的时候,TC就会检查分支事务的状态,如果都成功,就让各个分支事务都去提交,如果失败就让它们都回滚。
二、事务模式
分布式事务模式 | 介绍 | |
---|---|---|
AT 模式 | 无侵入的分布式事务解决方案,适用于不希望对业务进行改造的场景,几乎0学习成本(sql都由框架托管统一执行,会存在脏写问题) | 最终一致性分阶段事务模式,无业务侵入,默认模式 |
TCC 模式 | 高性能分布式事务解决方案,适用于核心系统等对性能有很高要求的场景(第一阶段会产生行锁,事务执行太久会锁行很久) | 最终一致性分阶段事务模式,有业务侵入 |
Saga 模式 | 长事务解决方案,适用于业务流程长且需要保证事务最终一致性的业务系统(第一阶段就操作DB,会存在脏读问题) | 长事务模式,有业务侵入 |
XA模式 | 分布式强一致性的解决方案,但性能低而使用较少。无业务侵入 | 强一致性分阶段事务模式,无业务侵入 |
Saga和TCC模式区别不大,TCC就是多了个锁行的步骤(避免了脏读,但事务执行太久会导致锁行很久,不适用于长事务) |
1、XA 模式
XA规范是分布式事务处理标准,它描述了全局的TM和局部的RM之间的接口,几乎所有的主流的数据库都对XA规范提供了支持;
(1)执行原理
XA将分布式事务分为两个阶段,一个是准备阶段,一个是执行阶段。
准备阶段: 事务协调者会向事务参与者RM发送一个请求,这里的RM其实是由数据库实现的,所以可以认为RM就是数据库。让数据库去执行事务,但执行完不要提交,而是把结果告知事务协调者。
执行阶段: 事务协调者根据结果,通知RM回滚或者提交事务。
简单来说就是一阶段执行事务,暂时挂起,不提交,通知 TM ,等到该全局事务所有的子事务全部完成,才会同一进行回滚或提交
(2)优缺点
优点:
这是一种强一致性的解决方案,因为每一个微服务都是基于各自的事务的,各自的事务是满足ACID的,而且等到大家都执行完了且都成功了才提交,所以全局事务是满足ACID的。
实现比较简单,因为很多数据库都实现了这种模式,使用Seata的XA模式只需要简单的封装上TM。
缺点:
第一阶段不提交,等到第二阶段再提交,但是等的过程中要占用数据库锁,如果一个分布式事务中跨越了很多个分支事务,则可能造成很多资源的浪费,使得别的请求无法访问,降低了可用性;
依赖于数据库,对于如果有的数据库没有实现这种模式,则无法使用这个模式来实现分布式事务。
(3)实现
application.yml
配置文件配置
seata:
data-source-proxy-mode: XA
在全局事务的入口方法上加 @GlobalTransactional
2、TCC 模式
(1)原理
TCC 模式与 AT 模式非常相似,每个阶段都是独立的事务,不同的是TCC通过人工编码来实现数据恢复。
TCC是 Try-Confirm-Cancel
的简称。TCC要求每个分支事务实现三个操作:预处理Try、确认 Confirm、撤销Cancel。
- Try:操作做业务检查及资源预留,
- Confirm:做业务确认操作,要求 try 成功 confirm 一定能成功
- Cancel:实现一个与Try相反的操作即回滚操作,用于释放预留资源
TM首先发起所有的分支事务的try操作,任何一个分支事务的try操作执行失败,TM将会发起所有分支事务的Cancel操作,若try操作全部成功,TM将会发起所有分支事务的Confirm操作,其中Confirm/Cancel 操作若执行失败,TM会进行重试。
根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction Mode 和 TCC (Branch) Transaction Mode.
AT 模式(参考链接 TBD)基于 支持本地 ACID 事务 的 关系型数据库:
- 一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
- 二阶段 commit 行为:马上成功结束,自动 异步批量清理回滚日志。
- 二阶段 rollback 行为:通过回滚日志,自动 生成补偿操作,完成数据回滚。
相应的,TCC 模式,不依赖于底层数据资源的事务支持:
- 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
- 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
- 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。
所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。
所以,使用TCC模式时,需要自定义编码,这时seata将对代码产生侵入性,另外不是所有业务都适合TCC模式,一般是需要进行扣减或者新增数量时使用。
(2)优缺点
优点:
- 一阶段完成直接提交事务,释放数据库资源,性能好
- 相比 AT 模型,无需生成快照,无需使用全局锁,性能最强
- 不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库
缺点:
- 有代码侵入,需要人为编写 try、Confirm、Cancel 接口,太麻烦了
- 软状态、事务是最终一致性
- 需要考虑做好 Confirm、Cancel 失败的情况,做好幂等处理
(3)实现
基本流程为,先执行扣减操作,然后记录一个日志(表结构如下,可自定义),该日志以事物ID为区分,如果整体事物成功,则执行confirm,如果失败则执行cancel。
这时还需要注意的问题是,可能这边还没有执行业务,但是seata认定整体失败,要开始调动cancel了,这时要处理空回滚和幂等性问题。
也可能调用了cancel,此时堵塞的程序又开始执行正常业务逻辑,这时需要注意不能重复执行。
DROP TABLE IF EXISTS `account_freeze_tbl`;
CREATE TABLE `account_freeze_tbl` (
`xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`user_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`freeze_money` int(11) UNSIGNED NULL DEFAULT 0,
`state` int(1) NULL DEFAULT NULL COMMENT '事务状态,0:try,1:confirm,2:cancel',
PRIMARY KEY (`xid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = COMPACT;
接口
- 定义两阶段提交 name = 该tcc的bean名称,全局唯一
commitMethod = confirm
为二阶段确认方法rollbackMethod = cancel
为二阶段取消方法 useTCCFence=true
为开启防悬挂BusinessActionContextParameter
注解 传递参数到二阶段中
@LocalTCC
public interface AccountTccService {
@TwoPhaseBusinessAction(name = "deduct", commitMethod = "confirm", rollbackMethod = "cancel",useTCCFence=true)
void deduct(@BusinessActionContextParameter(paramName = "userId") String userId,
@BusinessActionContextParameter(paramName = "money") int money);
boolean confirm(BusinessActionContext ctx);
boolean cancel(BusinessActionContext ctx);
}
实现类
@Slf4j
@Service
public class AccountTccServiceImpl implements AccountTccService {
@Autowired
private AccountMapper accountMapper;
@Autowired
private AccountFreezeMapper freezeMapper;
@Override
public void deduct(String userId, int money) {
String xid = RootContext.getXID();
// 查询冻结记录,如果有,就是cancel执行过,不能继续执行
AccountFreeze oldfreeze = freezeMapper.selectById(xid);
if (oldfreeze != null){
return;
}
// 扣除
accountMapper.deduct(userId, money);
// 记录
AccountFreeze freeze = new AccountFreeze();
freeze.setXid(xid);
freeze.setUserId(userId);
freeze.setFreezeMoney(money);
freeze.setState(AccountFreeze.State.TRY);
freezeMapper.insert(freeze);
}
@Override
public boolean confirm(BusinessActionContext ctx) {
String xid = ctx.getXid();
int count = freezeMapper.deleteById(xid);
return count == 1;
}
@Override
public boolean cancel(BusinessActionContext ctx) {
String xid = ctx.getXid();
// 查询冻结记录
AccountFreeze freeze = freezeMapper.selectById(xid);
if(null == freeze){
// try没有执行,需要空回滚
freeze = new AccountFreeze();
freeze.setXid(xid);
freeze.setUserId(ctx.getActionContext("userId").toString());
freeze.setFreezeMoney(0);
freeze.setState(AccountFreeze.State.CANCEL);
freezeMapper.insert(freeze);
return true;
}
// 幂等判断
if(freeze.getState() == AccountFreeze.State.CANCEL){
return true;
}
// 恢复金额
accountMapper.refund(freeze.getUserId(), freeze.getFreezeMoney());
freeze.setFreezeMoney(0);
freeze.setState(AccountFreeze.State.CANCEL);
int count = freezeMapper.updateById(freeze);
return count == 1;
}
}
如果你调用一个失败的请求,这时在account_freeze_tbl表会产生一条记录。另外,各个模块中,AT和TCC模式是可以混用。
(4)常见异常
最常见的主要是这三种异常,空回滚、幂等、悬挂。
空回滚
- 描述:
空回滚就是对于一个分布式事务,在没有调用 TCC 资源 Try 方法的情况下,调用了二阶段的 Cancel 方法,Cancel 方法需要识别出这是一个空回滚,然后直接返回成功。
也就是说,金额还没扣减就进入了 Cancel 方法,所以在 Cancel 方法中不需要任何处理,直接返回即可。
- 出现情况:
非入口方法的服务 Try 块未执行,机器宕机或者其他原因导致 try 块无法执行
- 解决:
需要一张额外的事务控制表,其中有分布式事务 ID 和分支事务 ID,第一阶段 Try 方法里会插入一条记录,
表示一阶段执行了。Cancel 接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存在,则是空回滚。
幂等
- 描述
幂等就是对于同一个分布式事务的同一个分支事务,重复去调用该分支事务的第二阶段接口,因此,要求 TCC 的二阶段 Confirm 和 Cancel 接口保证幂等,不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致资损等严重问题。
- 出现情况
什么样的情形会造成重复提交或回滚?提交或回滚是一次 TC 到参与者的网络调用。因此,网络故障、参与者宕机等都有可能造成参与者 TCC 资源实际执行了二阶段防范,但是 TC 没有收到返回结果的情况,这时,TC 就会重复调用,直至调用成功,整个分布式事务结束。
- 解决方式
事务控制表的每条记录关联一个分支事务,那我们完全可以在这张事务控制表上加一个状态字段,用来记录每个分支事务的执行状态。
状态字段有三个值,分别是初始化、已提交、已回滚。Try 方法插入时,是初始化状态。二阶段 Confirm 和 Cancel 方法执行后修改为已提交或已回滚状态。当重复调用二阶段接口时,先获取该事务控制表对应记录,检查状态,如果已执行,则直接返回成功;否则正常执行。
悬挂
- 描述
悬挂就是对于一个分布式事务,其二阶段 Cancel 接口比 Try 接口先执行。
因为允许空回滚的原因,Cancel 接口认为 Try 接口没执行,空回滚直接返回成功,对于 Seata 框架来说,认为分布式事务的二阶段接口已经执行成功,整个分布式事务就结束了。但是这之后 Try 方法才真正开始执行。
- 出现情况
按照前面所讲,在 RPC 调用时,先注册分支事务,再执行 RPC 调用,如果此时 RPC 调用的网络发生拥堵,通常 RPC 调用是有超时时间的,RPC 超时以后,发起方就会通知 TC 回滚该分布式事务,可能回滚完成后,RPC 请求才到达参与者,真正执行,从而造成悬挂。
- 解决方法
可以在二阶段执行时插入一条事务控制记录,状态为已回滚,这样当一阶段执行时,先读取该记录,如果记录存在,就认为二阶段已经执行,进行 空 try ,否则二阶段没执行,正常执行 try。
3、AT 模式(重要)
(1)原理
AT 模式的前提是:
- 基于支持本地 ACID 事务的关系型数据库。
- Java 应用,通过 JDBC 访问数据库。
AT模式同样是分阶段提交的事务模型,不过弥补了XA模型中资源锁定周期过长的缺陷。
- 阶段一 RM 的工作:
- 注册分支事务
- 记录 undo-log 数据快照
- 执行业务 sql 并提交
- 报告事务状态
- 阶段二 RM 工作
- 事务成功提交时,删除 undo-log 即可
- 事务失败回滚时,根据 undo-log 回复数据
与 XA 模式的区别
- XA 模式一阶段不提交事务,锁定资源;AT 一阶段提交事务,不锁定资源;
- XA 模式依赖数据库机制实现回滚;AT 模式利用数据快照实现回滚
- XA 强一致性;AT 最终一致性
(2)脏读、脏写
我们知道 Seata 的事务是一个全局事务,它包含了若干个分支本地事务,在全局事务执行过程中(全局事务还没执行完),某个本地事务提交了,如果 Seata 没有采取任务措施,则会导致已提交的本地事务被读取,造成脏读,如果数据在全局事务提交前已提交的本地事务被修改,则会造成脏写。
由此可以看出,传统意义的脏读是读到了未提交的数据,Seata 脏读是读到了全局事务下未提交的数据,全局事务可能包含多个本地事务,某个本地事务提交了不代表全局事务提交了。
在极端场景下,应用如果需要达到全局的读已提交,Seata 也提供了全局锁机制实现全局事务读已提交。但是默认情况下,Seata 的全局事务是工作在读未提交隔离级别的,保证绝大多数场景的高效性。
案例如下:
事务 2 在修改事务 1 修改过的数据时,出现了脏读和脏写问题,即读取并修改了 事务 1 中理应是未提交的数据(实际数据库事务已提交,但在seata全局事务中属于未提交)。
如果此时事务 2 在 1.2 执行完业务出现异常,由于事务 1 的数据已被修改过,导致无法进行全局回滚。
在 seata 中有全局锁用于解决如上问题
(3)写隔离
Seata 写操作的工作流程:
- 一阶段本地事务提交前,需要确保先拿到 全局锁 。
- 拿不到 全局锁 ,不能提交本地事务。
- 拿 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。
以一个示例来说明:
两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。
tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。
tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。
(4)读隔离
在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。
如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。
SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。
出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
(5)工作流程
以一个示例来说明整个 AT 分支的工作过程。
业务表:product
Field | Type | Key |
---|---|---|
id | bigint(20) | PRI |
name | varchar(100) | |
since | varchar(100) |
AT 分支事务的业务逻辑:
update product set name = 'GTS' where name = 'TXC';
一阶段
过程:
- 解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = ‘TXC’)等相关的信息。
- 查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
select id, name, since from product where name = 'TXC';
得到前镜像:
id | name | since |
---|---|---|
1 | TXC | 2014 |
- 执行业务 SQL:更新这条记录的 name 为 ‘GTS’。
- 查询后镜像:根据前镜像的结果,通过 主键 定位数据。
select id, name, since from product where id = 1`;
得到后镜像:
id | name | since |
---|---|---|
1 | GTS | 2014 |
- 插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到
UNDO_LOG
表中。
{
"branchId": 641789253,
"undoItems": [{
"afterImage": {
"rows": [{
"fields": [{
"name": "id",
"type": 4,
"value": 1
}, {
"name": "name",
"type": 12,
"value": "GTS"
}, {
"name": "since",
"type": 12,
"value": "2014"
}]
}],
"tableName": "product"
},
"beforeImage": {
"rows": [{
"fields": [{
"name": "id",
"type": 4,
"value": 1
}, {
"name": "name",
"type": 12,
"value": "TXC"
}, {
"name": "since",
"type": 12,
"value": "2014"
}]
}],
"tableName": "product"
},
"sqlType": "UPDATE"
}],
"xid": "xid:xxx"
}
- 提交前,向 TC 注册分支:申请
product
表中,主键值等于 1 的记录的 全局锁 。 - 本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
- 将本地事务提交的结果上报给 TC。
二阶段-回滚
- 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
- 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
- 数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。
- 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
update product set name = 'TXC' where id = 1;
- 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。
二阶段-提交
- 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
- 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。
(6)回滚日志表
UNDO_LOG Table:不同数据库在类型上会略有差别。
以 MySQL 为例:
Field | Type |
---|---|
branch_id | bigint PK |
xid | varchar(100) |
context | varchar(128) |
rollback_info | longblob |
log_status | tinyint |
log_created | datetime |
log_modified | datetime |
-- 注意此处0.7.0+ 增加字段 context
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
(7)实现
默认 seata 就是这个模式
seata:
data-source-proxy-mode: AT
(8)事务隔离
https://seata.io/zh-cn/docs/user/appendix/isolation.html
4、SAGA 模式
Saga模式是SEATA提供的长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。
两个阶段:
- 一阶段:直接提交本地事务;
- 二阶段:如果成功:什么都不做;如果失败:通过编写补偿业务来回滚;
适用场景:
- 业务流程长、业务流程多
- 参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口
优势:
- 一阶段提交本地事务,无锁,高性能
- 事件驱动架构,参与者可异步执行,高吞吐
- 补偿服务易于实现
缺点:
- 不保证隔离性(应对方案见后面文档)
实现方式详见 Seata 官方文档:https://seata.io/zh-cn/docs/user/saga.html
5、四种模式对比
XA | AT | TCC | SAGA | |
---|---|---|---|---|
一致性 | 强一致 | 弱一致 | 弱一致 | 最终一致性 |
隔离性 | 完全隔离 | 基于全局锁隔离 | 基于资源预留隔离 | 无隔离 |
代码侵入 | 无 | 无 | 要是先三个接口 | 要编写状态机和补偿机制 |
性能 | 差 | 好 | 非常好 | 非常好 |
适用场景:
- XA
对一致性、隔离性要求较高的业务
- AT
基于关系型数据库的大多数分布式事务场景
- TCC
对性能要求较高的事务;有非关系性数据库参与的事务
- SAGA
业务流程长,业务流程多;
参与者包含其他公司或遗留系统服务,无法提供 TCC 模式要求的三个接口
三、下载安装
Seata分TC、TM和RM三个角色,TC(Server端)为单独服务端部署,TM和RM(Client端)由业务系统集成。
1、原生安装
Server端官网下载,本文使用1.3.0
修改file.conf
里的store的mode = “db”,并修改数据库相关配置。(应先备份file.conf
)
Server端存储模式(store.mode)现有file、db、redis三种(后续将引入raft,mongodb):
file模式无需改动,直接启动即可,下面专门讲下db和redis启动步骤。
注: file模式为单机模式,全局事务会话信息内存中读写并持久化本地文件root.data,性能较高;db模式为高可用模式,全局事务会话信息通过db共享,相应性能差些;
redis模式Seata-Server 1.3及以上版本支持,性能较高,存在事务信息丢失风险,请提前配置合适当前场景的redis持久化配置。
## transaction log store, only used in seata-server
store {
## store mode: file、db、redis
# 修改模式为 db 数据库
mode = "db"
## file store property
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
maxBranchSessionSize = 16384
# globe session size , if exceeded throws exceptions
maxGlobalSessionSize = 512
# file buffer size , if exceeded allocate new buffer
fileWriteBufferCacheSize = 16384
# when recover batch read size
sessionReloadReadSize = 100
# async, sync
flushDiskMode = async
}
## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
datasource = "druid"
## mysql/oracle/postgresql/h2/oceanbase etc.
dbType = "mysql"
driverClassName = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata"
user = "mysql"
password = "mysql"
minConn = 5
maxConn = 30
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
maxWait = 5000
}
## redis store property
redis {
host = "127.0.0.1"
port = "6379"
password = ""
database = "0"
minConn = 1
maxConn = 10
queryLimit = 100
}
}
registry.conf修改registry的type = “nacos”,并修改相关信息
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = ""
password = ""
}
}
如果是使用 mysql ,则需要创建seata数据库,执行conf下的db_store.sql文件,如果没有的话,看readme
全局事务会话信息由3块内容构成,全局事务–>分支事务–>全局锁,对应表global_table、branch_table、lock_table
运行bin下的 seata-server.bat
或 seata-server.sh
2、docker-compose
http://seata.io/zh-cn/docs/ops/deploy-by-docker-compose.html
(1)server端
创建挂载文件 registry.conf
registry {
type = "nacos"
nacos {
# seata服务注册在nacos上的别名,客户端通过该别名调用服务
application = "seata-server"
# 请根据实际生产环境配置nacos服务的ip和端口
serverAddr = "127.0.0.1:8848"
# nacos上指定的namespace
namespace = "38efc944-744e-45bb-a814-e356fbd883c3"
cluster = "default"
username = "nacos"
password = "nacos"
}
}
config {
type = "nacos"
nacos {
# 请根据实际生产环境配置nacos服务的ip和端口
serverAddr = "127.0.0.1:8848"
# nacos上指定的namespace
namespace = "38efc944-744e-45bb-a814-e356fbd883c3"
group = "DEFAULT_GROUP"
username = "nacos"
password = "nacos"
# 从v1.4.2版本开始,已支持从一个Nacos dataId中获取所有配置信息,你只需要额外添加一个dataId配置项
dataId: "seataServer.properties"
}
}
创建 docker-compose.yml
文件
version: "3.1"
services:
seata-server:
image: seataio/seata-server:1.4.2
container_name: seata-server
hostname: seata-server
ports:
- "8091:8091"
environment:
# 指定seata服务启动端口
- SEATA_PORT=8091
# 注册到nacos上的ip。客户端将通过该ip访问seata服务。
# 注意公网ip和内网ip的差异。
- SEATA_IP=业务服务可访问到seata server的ip
- SEATA_CONFIG_NAME=file:/root/seata-config/registry
volumes:
# 因为registry.conf中是nacos配置中心,只需要把registry.conf放到./seata-server/config文件夹中
- "/data/docker/seata/config:/root/seata-config"
(2)Nacos新增配置
Data ID:seataServer.properties
Group:DEFAULT_GROUP
以及 namespace 这三个都要与上面的registry.conf
对应
内容如下
# 存储模式
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
# 需要根据mysql的版本调整driverClassName
# mysql8及以上版本对应的driver:com.mysql.cj.jdbc.Driver
# mysql8以下版本的driver:com.mysql.jdbc.Driver
store.db.driverClassName=com.mysql.cj.jdbc.Driver
# 注意根据生产实际情况调整参数host和port
store.db.url=jdbc:mysql://127.0.0.1:3306/seata-server?useUnicode=true&characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false
store.db.user= 用户名
store.db.password=密码
# 不加该配置业务服务会报错
service.vgroupMapping.default_tx_group=default
其他可选配置
更多配置项参考官方文档
store.mode=db
#-----db-----
store.db.datasource=druid
store.db.dbType=mysql
# 需要根据mysql的版本调整driverClassName
# mysql8及以上版本对应的driver:com.mysql.cj.jdbc.Driver
# mysql8以下版本的driver:com.mysql.jdbc.Driver
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata-server?useUnicode=true&characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useSSL=false
store.db.user= 用户名
store.db.password=密码
# 数据库初始连接数
store.db.minConn=1
# 数据库最大连接数
store.db.maxConn=20
# 获取连接时最大等待时间 默认5000,单位毫秒
store.db.maxWait=5000
# 全局事务表名 默认global_table
store.db.globalTable=global_table
# 分支事务表名 默认branch_table
store.db.branchTable=branch_table
# 全局锁表名 默认lock_table
store.db.lockTable=lock_table
# 查询全局事务一次的最大条数 默认100
store.db.queryLimit=100
# undo保留天数 默认7天,log_status=1(附录3)和未正常清理的undo
server.undo.logSaveDays=7
# undo清理线程间隔时间 默认86400000,单位毫秒
server.undo.logDeletePeriod=86400000
# 二阶段提交重试超时时长 单位ms,s,m,h,d,对应毫秒,秒,分,小时,天,默认毫秒。默认值-1表示无限重试
# 公式: timeout>=now-globalTransactionBeginTime,true表示超时则不再重试
# 注: 达到超时时间后将不会做任何重试,有数据不一致风险,除非业务自行可校准数据,否者慎用
server.maxCommitRetryTimeout=-1
# 二阶段回滚重试超时时长
server.maxRollbackRetryTimeout=-1
# 二阶段提交未完成状态全局事务重试提交线程间隔时间 默认1000,单位毫秒
server.recovery.committingRetryPeriod=1000
# 二阶段异步提交状态重试提交线程间隔时间 默认1000,单位毫秒
server.recovery.asynCommittingRetryPeriod=1000
# 二阶段回滚状态重试回滚线程间隔时间 默认1000,单位毫秒
server.recovery.rollbackingRetryPeriod=1000
# 超时状态检测重试线程间隔时间 默认1000,单位毫秒,检测出超时将全局事务置入回滚会话管理器
server.recovery.timeoutRetryPeriod=1000
(3)启动seata
在 docker-compose.yml
文件同级目录下
docker-compose up -d
四、项目集成
1、依赖
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2021.1</version>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
2、配置
注:数据源一定要使用阿里的druid
使用自动代理数据源时,如果使用XA模式还需要调整配置文件
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
...
seata:
data-source-proxy-mode: AT
enable-auto-data-source-proxy: true # 数据源自动代理
tx-service-group: default_tx_group
registry:
type: nacos
nacos:
server-addr: ${spring.cloud.nacos.discovery.server-addr}
namespace: ${spring.cloud.nacos.discovery.namespace}
application: seata-server
group: DEFAULT_GROUP
username: nacos
password: nacos
config:
type: nacos
nacos:
server-addr: ${spring.cloud.nacos.discovery.server-addr}
namespace: ${spring.cloud.nacos.discovery.namespace}
group: DEFAULT_GROUP
dataId: seataServer.properties
3、创建 undo_log 表
如果使用 AT 模式,需要在所有的业务数据库中添加回滚日志表,如下
-- 注意此处0.7.0+ 增加字段 context
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
4、注解拦截
全局事务
注:只需要在调用顶层Service加注解即可,其他feign服务不需要加任何注解,包括本地事务注解
@GlobalTransactional(rollbackFor = Exception.class)
public String createOrder() {
System.out.println("xid:" + RootContext.getXID());
String s = stockFeignService.create();
if(!"success".equals(s)){
// 如果其他服务也配置了全局异常处理的配置,需要手动回滚
GlobalTransactionContext.reload(RootContext.getXID()).rollback();
}
return "s";
}
TCC模式
/**
* 定义两阶段提交 name = 该tcc的bean名称,全局唯一 commitMethod = commit 为二阶段确认方法 rollbackMethod = rollback 为二阶段取消方法
* useTCCFence=true 为开启防悬挂
* BusinessActionContextParameter注解 传递参数到二阶段中
*
* @param params -入参
* @return String
*/
@TwoPhaseBusinessAction(name = "beanName", commitMethod = "commit", rollbackMethod = "rollback", useTCCFence = true)
public void insert(@BusinessActionContextParameter(paramName = "params") Map<String, String> params) {
logger.info("此处可以预留资源,或者利用tcc的特点,与AT混用,二阶段时利用一阶段在此处存放的消息,通过二阶段发出,比如redis,mq等操作");
}
/**
* 确认方法、可以另命名,但要保证与commitMethod一致 context可以传递try方法的参数
*
* @param context 上下文
* @return boolean
*/
public void commit(BusinessActionContext context) {
logger.info("预留资源真正处理,或者发出mq消息和redis入库");
}
/**
* 二阶段取消方法
*
* @param context 上下文
* @return boolean
*/
public void rollback(BusinessActionContext context) {
logger.info("预留资源释放,或清除一阶段准备让二阶段提交时发出的消息缓存");
}
5、切点表达式
全局事务
@Bean
public AspectTransactionalInterceptor aspectTransactionalInterceptor () {
return new AspectTransactionalInterceptor();
}
@Bean
public Advisor txAdviceAdvisor(AspectTransactionalInterceptor aspectTransactionalInterceptor ) {
AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
pointcut.setExpression("配置切点表达式使全局事务拦截器生效");
return new DefaultPointcutAdvisor(pointcut, aspectTransactionalInterceptor);
}
五、API
Seata API 分为两大类:High-Level API 和 Low-Level API :
- High-Level API :用于事务边界定义、控制及事务状态查询。
- Low-Level API :用于控制事务上下文的传播。
1、High-Level API
(1)GlobalTransaction
全局事务:包括开启事务、提交、回滚、获取当前状态等方法。
public interface GlobalTransaction {
/**
* 开启一个全局事务(使用默认的事务名和超时时间)
*/
void begin() throws TransactionException;
/**
* 开启一个全局事务,并指定超时时间(使用默认的事务名)
*/
void begin(int timeout) throws TransactionException;
/**
* 开启一个全局事务,并指定事务名和超时时间
*/
void begin(int timeout, String name) throws TransactionException;
/**
* 全局提交
*/
void commit() throws TransactionException;
/**
* 全局回滚
*/
void rollback() throws TransactionException;
/**
* 获取事务的当前状态
*/
GlobalStatus getStatus() throws TransactionException;
/**
* 获取事务的 XID
*/
String getXid();
}
(2)GlobalTransactionContext
GlobalTransaction 实例的获取需要通过 GlobalTransactionContext:
/**
* 获取当前的全局事务实例,如果没有则创建一个新的实例。
*/
public static GlobalTransaction getCurrentOrCreate() {
GlobalTransaction tx = getCurrent();
if (tx == null) {
return createNew();
}
return tx;
}
/**
* 重新载入给定 XID 的全局事务实例,这个实例不允许执行开启事务的操作。
* 这个 API 通常用于失败的事务的后续集中处理。
* 比如:全局提交超时,后续集中处理通过重新载入该实例,通过实例方法获取事务当前状态,并根据状态判断是否需要重试全局提交操作。
*/
public static GlobalTransaction reload(String xid) throws TransactionException {
GlobalTransaction tx = new DefaultGlobalTransaction(xid, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher) {
@Override
public void begin(int timeout, String name) throws TransactionException {
throw new IllegalStateException("Never BEGIN on a RELOADED GlobalTransaction. ");
}
};
return tx;
}
(3)TransactionalTemplate
事务化模板:通过上述 GlobalTransaction 和 GlobalTransactionContext API 把一个业务服务的调用包装成带有分布式事务支持的服务。
public class TransactionalTemplate {
public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
// 1. 获取当前全局事务实例或创建新的实例
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 2. 开启全局事务
try {
tx.begin(business.timeout(), business.name());
} catch (TransactionException txe) {
// 2.1 开启失败
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
Object rs = null;
try {
// 3. 调用业务服务
rs = business.execute();
} catch (Throwable ex) {
// 业务调用本身的异常
try {
// 全局回滚
tx.rollback();
// 3.1 全局回滚成功:抛出原始业务异常
throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
} catch (TransactionException txe) {
// 3.2 全局回滚失败:
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, ex);
}
}
// 4. 全局提交
try {
tx.commit();
} catch (TransactionException txe) {
// 4.1 全局提交失败:
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
return rs;
}
}
模板方法执行的异常:ExecutionException
class ExecutionException extends Exception {
// 发生异常的事务实例
private GlobalTransaction transaction;
// 异常编码:
// BeginFailure(开启事务失败)
// CommitFailure(全局提交失败)
// RollbackFailure(全局回滚失败)
// RollbackDone(全局回滚成功)
private Code code;
// 触发回滚的业务原始异常
private Throwable originalException;
}
外层调用逻辑 try-catch 这个异常,根据异常编码进行处理:
- BeginFailure (开启事务失败):getCause() 得到开启事务失败的框架异常,getOriginalException() 为空。
- CommitFailure (全局提交失败):getCause() 得到全局提交失败的框架异常,getOriginalException() 为空。
- RollbackFailure (全局回滚失败):getCause() 得到全局回滚失败的框架异常,getOriginalException() 业务应用的原始异常。
- RollbackDone (全局回滚成功):getCause() 为空,getOriginalException() 业务应用的原始异常。
2、Low-Level API
(1)RootContext
事务的根上下文:负责在应用的运行时,维护 XID 。
/**
* 得到当前应用运行时的全局事务 XID
*/
public static String getXID() {
return CONTEXT_HOLDER.get(KEY_XID);
}
/**
* 将全局事务 XID 绑定到当前应用的运行时中
*/
public static void bind(String xid) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind " + xid);
}
CONTEXT_HOLDER.put(KEY_XID, xid);
}
/**
* 将全局事务 XID 从当前应用的运行时中解除绑定,同时将 XID 返回
*/
public static String unbind() {
String xid = CONTEXT_HOLDER.remove(KEY_XID);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind " + xid);
}
return xid;
}
/**
* 判断当前应用的运行时是否处于全局事务的上下文中
*/
public static boolean inGlobalTransaction() {
return CONTEXT_HOLDER.get(KEY_XID) != null;
}
High-Level API 的实现都是基于 RootContext 中维护的 XID 来做的。
应用的当前运行的操作是否在一个全局事务的上下文中,就是看 RootContext 中是否有 XID。
RootContext 的默认实现是基于 ThreadLocal 的,即 XID 保存在当前线程上下文中。
Low-Level API 的两个典型的应用场景:
远程调用事务上下文的传播
远程调用前获取当前 XID:
String xid = RootContext.getXID();
远程调用过程把 XID 也传递到服务提供方,在执行服务提供方的业务逻辑前,把 XID 绑定到当前应用的运行时:
RootContext.bind(rpcXid);
事务的暂停和恢复
在一个全局事务中,如果需要某些业务逻辑不在全局事务的管辖范围内,则在调用前,把 XID 解绑:
String unbindXid = RootContext.unbind();
待相关业务逻辑执行完成,再把 XID 绑定回去,即可实现全局事务的恢复:
RootContext.bind(unbindXid);
3、常用API
(1)获取xid
同一个分布式事务下,所有服务的 xid 一致
RootContext.getXID();
(2)手动回滚
Fegin调用使用了Fallback降级或抛出的异常被全局处理(比如全局异常处理GlobalExceptionHandler),seata会认为你已经手动处理了异常,此时就需要手动回滚。
GlobalTransactionContext.reload(RootContext.getXID()).rollback();