1-Intro
分布式事务
跟传统的事务中的 ACID 对比, 分布式事务没有 被普遍接受的 公认的 特别准确的定义, 下面是一个 个人理解.
-
全局的原子性: 这些 分布式的操作, 是指 事务中的组件, 例如数据库组件,缓存组件的操作要不 都成功,要不都失败,无论有没有网络故障.
- 这个好像没有明确时间要求, 失败的情况下,要求全部的马上回滚,还是容忍一定时间的 实现 最终的一致性
-
全局持久性, 提交后也是全部持久化的
选型
这个时候要看 技术的方案
XA: 标准规范,需要 各种中间件改配置,一般生产中不用AT: 通过 数据源代理,面向是的SQL语句 , 常用 ;TCC: 面向的是Rpc Service接口的改造, 扩展性强, 常用 ;事务型 MQ: 实现 全局的最终一致性, 常用- 生产者补偿
- 消费者幂等
2-Seata-AT
2-1 What
特点
- 侵入性小,原理上通过代理数据库,
DataSourceProxy来实现. 跟单机的Mysql事务原理一样, 锁和UNDO_LOG; - 有一定的局限性, 比如说一个新的 组件,
DataSource需要在各个语言层面实现一个对应的Proxy, 需要这个组件本身支持 ACID, 同时语言上优先Java; - 可以利用本地事务的一些 特性实现复杂的 隔离特性, 类似
ACID的各种级别 ;
QuickStart
@GlobalTransactional
public void purchase(String userId, String commodityCode, int count, int money) {
jdbcTemplateA.update("update stock_tbl set count = count - ? where commodity_code = ?", new Object[] {count, commodityCode});
jdbcTemplateB.update("update account_tbl set money = money - ? where user_id = ?", new Object[] {money, userId});
}2-2 How
需要在所有参与的 数据库组件中 创建一个
UNDO_LOG_TABLE
| Field | Type |
|---|---|
| branch_id | bigint PK |
| xid | varchar(100) |
| context | varchar(128) |
| rollback_info | longblob |
| log_status | tinyint |
| log_created | datetime |
| log_modified | datetime |
二阶段的演变
- 一阶段(Prepare): 业务表 和回滚日志表 同一个本地事务中, 包括锁和连接资源, 是 本地的
ACID - 二阶段(Commit Or Rollback):
- 提交: 异步
- 回滚: 同时第一阶段的日志反向补偿
写隔离: 分布式全局锁
-
在第一个阶段 也就是本地事务提交前,确保拿到全局锁, 否则不能提交本地事务
- 一定次数和时间范围内的重试
-
这个锁的粒度是 行锁, 可以参考 SEATA-AT 模式中的行锁
例子
我们使用上面的 修改用户余额来分析这个问题
update account_tbl set money = money - ? where user_id = ?第一个阶段: 准备阶段,在本地事务中
SQL解析, 得到WHERE条件, 反向生成查询SQL, 用来辅助生成UNDO_LOG.SELECT * FROM account_tbl WHERE user_id = ?*得到pks和beforeImage修改的前置内容- 执行
update account_tbl set money = money - ? where user_id = ? - 再次查询得到
afterImage得到修改的后置内容 - 把
beforeImage和afterImage插入到undoLog表中 - 在本地事务提交之前,根据
pks去像TC也就是seata-server叫做 事务协调组件 发送rpc申请注册需要的全局行锁pks - 申请成功了,把事务的结果上报给
tc
有如下的注意点:
- 这个时候获取了锁,但是没有释放
- 这个时候其实本地事务已经提交了. 也就是其他的 代码 可能能读到
- 读到是指 能读到本地事务提交的内容,但是这个内容没有全局的
commit - 可以代理这个读语句去判断锁 - 实现更高的隔离性?
- 读到是指 能读到本地事务提交的内容,但是这个内容没有全局的
- 锁保证的是被
GlobalTransaction注解的语句去走 写隔离, 因为这个时候其他的写 也需要去TC申请锁, 锁没有释放,其他的 本地事务就 不能提交
第二个阶段的-commit
TC根据所有 分支事务的结果,一个失败则认定全部失败,全部成功就 通知对应的RM全员事务已经全部成功RM收到了TC的 分支提交请求,会放到一个异步的队列中,马上返回成功给TC, 异步队列的消费者会 异步和批量的删除对应的UNDO LOG记录
第二个阶段的-rollback
- 同上,会收到
TC的 分支回滚请求 . 开启本地事务:- 根据
XID和BranchID找到对应的UNDO LOG记录 - 数据校验: 查询语句查询当前的数据 和
UNDO_LOG中的afterImage对比, 如果不同, 说明有哥们走了之外的逻辑调用了 update, 这个异常情况,走不同的策略, 比如忽略? - 校验通过后,根据
beforeImage来生成UPDATE语句
- 根据
可以看到:
- 第二个阶段是全部异步的过程,确实会很快, 服务端要释放锁了.
commit: 其实核心的业务都在第一个阶段做完了,这里可以马上就成功,核心是异步批量的清理回滚日志rollback: 核心是 补偿, 基于回滚日志的补偿.
3-Seata-TCC
3-1 What
比较经典的方案.
- 实现完全由 业务方控制, 比较自由,没有 数据库等等的限制 ;
- 由于需要业务方的改造, 有一定的工程量, 而且 由于网络三态的原理,所有的 实现要求实现幂等;
Tcc: Try→commit→cancel
Try: 检查,预留,以扣减用户余额 为例子, 就是 确保用户当前的钱够, 并且把钱冻结起来 确保后续 commit 能够成功扣减 ;Commit: 扣钱 ;Cancel: 取消冻结 ;
有三个角色
TM: 管理全局的事务, 包括开启全局的事务, 提交/回滚 全局的事务 ;RM: 管理分支的事务 ;TC: 管理全局实物和分支事务的状态 ;
下图来自 SEATA

用上面的例子. 创建订单的操作 (CreateOrderCommandHandler) 这个调用方 依赖三个服务提供方:
- 扣减库存服务:
ReduceStockRpcService - 用户余额扣减服务:
DecrAccountRpcService - 创建订单服务:
CreateOrderRpcService.
服务方作为 RM, 都要实现 try, commit, cancel 方法.
public class CreateOrderCommandHandler {
private final ReduceStockRpcService reduceStockRpcService;
private final DecrAccountRpcService decrAccountRpcService;
private final CreateOrderRpcService createOrderRpcService;
@GlobalTransactional
public Result createOrder (CreateOrderCommand command) {
reduceStockRpcService.reduceStock();
decrAccountRpcService.decrAccount();
createOrderRpcService.createOrder();
}
}注意下面是没有优化的流程: 是可以优化的, 新版本的 Seata 中优化了流程,后续有空再说.
GlobalTransactional触发TM开始全局事务, 在TC上注册全局的事务信息 ;- 调用
reduceStockRpcService的reduceStock方法,其本质是一个try, 冻结库存,- 触发了
try方法 - 成功之后
TM告诉TC这个try成功了,
- 触发了
- 依次触发其他的 远程服务 ;
TC知道了所有的服务try是否成功还是失败, 作为 协调器触发每个RM的commit或者cancel方法.
3-2 Usage
定义一个
Provider
public interface TccActionOne {
@TwoPhaseBusinessAction(name = "DubboTccActionOne", commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "a") String a);
public boolean commit(BusinessActionContext actionContext);
public boolean rollback(BusinessActionContext actionContext);
}-
这个服务提供方被称为
Tcc Resource,prepare 方法中指定:name: 要求全局唯一commit和rollback的方法名
-
可以使用
BusinessActionContext事务上下文 参数封装, 其中封装了:xid: 全局的事务idbranchId: 分支事务的idactionName: 分支的资源idactionContext业务的参数, 需要注解BusinessActionContextParameter
-
一个带额外的
demo如下:
@LocalTCC
public interface TccActionTwo {
@TwoPhaseBusinessAction(name = "TccActionTwo", commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "a") String a);
public boolean commit(BusinessActionContext actionContext);
public boolean rollback(BusinessActionContext actionContext);
}3-3 Tcc In Action
TCC 中有三个问题:
- 幂等
- 悬挂
- 空回滚
Seata 1.5 之后的版本提供了 一张表 tcc_fence_log , 它的修改要和本地业务在同一个事务中, 之前的版本要业务侧自己用类似的思路处理.
CREATE TABLE IF NOT EXISTS `tcc_fence_log`
(
`xid` VARCHAR(128) NOT NULL COMMENT 'global id',
`branch_id` BIGINT NOT NULL COMMENT 'branch id',
`action_name` VARCHAR(64) NOT NULL COMMENT 'action name',
`status` TINYINT NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)',
`gmt_create` DATETIME(3) NOT NULL COMMENT 'create time',
`gmt_modified` DATETIME(3) NOT NULL COMMENT 'update time',
PRIMARY KEY (`xid`, `branch_id`),
KEY `idx_gmt_modified` (`gmt_modified`),
KEY `idx_status` (`status`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;幂等问题
public static boolean commitFence(Method commitMethod, Object targetTCCBean,
String xid, Long branchId, Object[] args) {
return transactionTemplate.execute(status -> {
try {
Connection conn = DataSourceUtils.getConnection(dataSource);
TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
if (tccFenceDO == null) {
throw new TCCFenceException(String.format("TCC fence record not exists, commit fence method failed. xid= %s, branchId= %s", xid, branchId),
FrameworkErrorCode.RecordAlreadyExists);
}
if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
LOGGER.info("Branch transaction has already committed before. idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
return true;
}
if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus()).
}
return false;
}
return updateStatusAndInvokeTargetMethod(conn, commitMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_COMMITTED, status, args);
} catch (Throwable t) {
status.setRollbackOnly();
throw new SkipCallbackWrapperException(t);
}
});
}status: 状态机xid+branch_id组成了唯一id- 本地事务
这三者 很容易实现幂等.
空回滚问题
Try 阶段, 可能会部分失败导致 Cancel, 这个时候会有没有走 Try 直接走 Cancel 的. 这个是可以预测的异常. 这个时候 应该允许,返回成功,但是什么都不做.
新版本中, 会和上面一样用状态机,来知道是否走到了 STATUS_TRIED . 来决定是否要真的回滚.
//TCCFenceHandler 类
public static boolean rollbackFence(Method rollbackMethod, Object targetTCCBean,
String xid, Long branchId, Object[] args, String actionName) {
return transactionTemplate.execute(status -> {
try {
Connection conn = DataSourceUtils.getConnection(dataSource);
TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
// non_rollback
if (tccFenceDO == null) {
//不执行回滚逻辑
return true;
} else {
if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
LOGGER.info("Branch transaction had already rollbacked before, idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
return true;
}
if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
}
return false;
}
}
return updateStatusAndInvokeTargetMethod(conn, rollbackMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_ROLLBACKED, status, args);
} catch (Throwable t) {
status.setRollbackOnly();
throw new SkipCallbackWrapperException(t);
}
});
}其中. updateStatus 的 sql 语句.
update tcc_fence_log set status = ?, gmt_modified = ?
where xid = ? and branch_id = ? and status = ? ;把 status 从 STATUS_TRIED 改为 STATUS_ROLLBACKED . 如果改变成功,再在同一个事务中执行 回滚逻辑.
悬挂: 本质问题是乱序和超时
网络的可能性导致 收到 Try 比 Cancel 晚到.
Cancel先到, 这个时候 是 空回滚Try晚到, 会锁定资源,由于 它的Cancel永远不会来了,所以会一直锁定资源, 也就是空悬挂问题
解决方案还是状态机,不过可能有幻读的问题. 最好用 SELECT FOR UPDATE .
首先. 第一个阶段 Cancel 的时候,由于这个时候没有 Try, 所以没有任何记录代表了 Xid 和 BranchId . 所以在 Cancel 的时候要处理这个问题, 我们这个时候插入一条记录代表这种特殊的状态: STATUS_SUSPENDED, 不用执行回滚操作 .
然后,第二个阶段收到了 Try 请求,会发现之前有一条 STATUS_SUSPENDED 的数据,我们尝试插入失败,会略掉这个 Try
Tips
这些问题 是由于
Try阶段部分失败,导致走了 Cancel 的问题导致,Commit一定是所有的Try之后再走Commit的保证.
4-Transactional MQ
有一种说法是 分布式事务追求的是柔性事务, BASE, 而 事务性 MQ + 幂等 也是做到 BASE 的策略.
-
基于一个数据库引擎(可以是
NOSQL或者RDB) 作为MQ的引擎. 使用支持ACID的RDB会简单不少. -
原始的业务表基于
BINLOG作为MQ作为生产者.BINLOG大致如下的内容:BINLOG_FILE_NO,BINLOG_FILE_OFFSET,BINLOG_XID,BINLOG_XOFFSET,BINLOG_COMMIT可以作为一个 有序而且唯一 的ID- 事件类型:
UPDATE,INSERT,DELETE AfterImage: 修改后的值Delta: 这次修改的字段 和 修改前的值, 因此可以很容易 推出前面的Value
-
这个生产者不会像
TCC基于RPC导致乱序的空悬挂, 数据也是可靠的, 吞吐量是可以的,因为全 异步化过程 -
消费者可以类似上面幂等思路,提供一个
SDK解决幂等问题, 对于上面的每个BINLOG_ID仅仅消费一次. 成功了再继续滚动.
Tips
基于 MQ 的方法由于是全异步,所以会有最大的吞吐量,但是也要考虑到 异步编程的范式不适合复杂的业务,同步的编程更直观,逻辑更顺一些,而且异步的消费 也要考虑死信队列,尤其是事务消息出了
BUG导致 消费者无法继续前进的问题,建议使用死信队列 + 人工处理 .
Tips
这里说一种
MongoDB实现幂等方案,MongoDB 4.x以上也支持 一致性的跨文档事务,所以类似即可,还有一种办法适合特殊的场景,就是 文档数据库直接嵌入字段,我们直接把Doc嵌入到原始表中, 再使用 单文档原子性事务 也是一种技巧.
5-Sega
TODO
Refer
- SEATA
- SEATA-AT 模式中的行锁
- 详解 AT 模式中的事务隔离和全局锁
- hmily: 一个纯
TCC的实现 - tcc 的理论设计
- tcc 的常见问题解决