1-Intro

分布式事务

跟传统的事务中的 ACID 对比, 分布式事务没有 被普遍接受的 公认的 特别准确的定义, 下面是一个 个人理解.

  • 全局的原子性: 这些 分布式的操作, 是指 事务中的组件, 例如数据库组件,缓存组件的操作要不 都成功,要不都失败,无论有没有网络故障.

    • 这个好像没有明确时间要求, 失败的情况下,要求全部的马上回滚,还是容忍一定时间的 实现 最终的一致性
  • 全局持久性, 提交后也是全部持久化的

选型

这个时候要看 技术的方案

  • XA: 标准规范,需要 各种中间件改配置,一般生产中不用
  • AT: 通过 数据源代理,面向是的 SQL 语句 , 常用 ;
  • TCC: 面向的是 Rpc Service 接口的改造, 扩展性强, 常用 ;
  • 事务型 MQ: 实现 全局的最终一致性, 常用
    • 生产者补偿
    • 消费者幂等

2-Seata-AT

2-1 What

特点

  1. 侵入性小,原理上通过代理数据库, DataSourceProxy 来实现. 跟单机的 Mysql 事务原理一样, 锁和 UNDO_LOG ;
  2. 有一定的局限性, 比如说一个新的 组件, DataSource 需要在各个语言层面实现一个对应的 Proxy , 需要这个组件本身支持 ACID, 同时语言上优先 Java ;
  3. 可以利用本地事务的一些 特性实现复杂的 隔离特性, 类似 ACID 的各种级别 ;

QuickStart

@GlobalTransactional
public void purchase(String userId, String commodityCode, int count, int money) {
    jdbcTemplateA.update("update stock_tbl set count = count - ? where commodity_code = ?", new Object[] {count, commodityCode});
    jdbcTemplateB.update("update account_tbl set money = money - ? where user_id = ?", new Object[] {money, userId});
}

2-2 How

需要在所有参与的 数据库组件中 创建一个 UNDO_LOG_TABLE

FieldType
branch_idbigint PK
xidvarchar(100)
contextvarchar(128)
rollback_infolongblob
log_statustinyint
log_createddatetime
log_modifieddatetime

二阶段的演变

  • 一阶段(Prepare): 业务表 和回滚日志表 同一个本地事务中, 包括锁和连接资源, 是 本地的 ACID
  • 二阶段(Commit Or Rollback):
    • 提交: 异步
    • 回滚: 同时第一阶段的日志反向补偿

写隔离: 分布式全局锁

  1. 在第一个阶段 也就是本地事务提交前,确保拿到全局锁, 否则不能提交本地事务

    1. 一定次数和时间范围内的重试
  2. 这个锁的粒度是 行锁, 可以参考 SEATA-AT 模式中的行锁

例子

我们使用上面的 修改用户余额来分析这个问题

update account_tbl set money = money - ? where user_id = ?

第一个阶段: 准备阶段,在本地事务中

  • SQL 解析, 得到 WHERE 条件, 反向生成查询 SQL, 用来辅助生成 UNDO_LOG.
  • SELECT * FROM account_tbl WHERE user_id = ?* 得到 pksbeforeImage 修改的前置内容
  • 执行 update account_tbl set money = money - ? where user_id = ?
  • 再次查询得到 afterImage 得到修改的后置内容
  • beforeImageafterImage 插入到 undoLog 表中
  • 在本地事务提交之前,根据 pks 去像 TC 也就是 seata-server 叫做 事务协调组件 发送 rpc 申请注册需要的全局行锁 pks
  • 申请成功了,把事务的结果上报给 tc

有如下的注意点:

  • 这个时候获取了锁,但是没有释放
  • 这个时候其实本地事务已经提交了. 也就是其他的 代码 可能能读到
    • 读到是指 能读到本地事务提交的内容,但是这个内容没有全局的 commit
    • 可以代理这个读语句去判断锁 - 实现更高的隔离性?
  • 锁保证的是被 GlobalTransaction 注解的语句去走 写隔离, 因为这个时候其他的写 也需要去 TC 申请锁, 锁没有释放,其他的 本地事务就 不能提交

第二个阶段的-commit

  • TC 根据所有 分支事务的结果,一个失败则认定全部失败,全部成功就 通知对应的 RM 全员事务已经全部成功
  • RM 收到了 TC分支提交请求,会放到一个异步的队列中,马上返回成功给 TC, 异步队列的消费者会 异步和批量的删除对应的 UNDO LOG 记录

第二个阶段的-rollback

  • 同上,会收到 TC分支回滚请求 . 开启本地事务:
    • 根据 XIDBranchID 找到对应的 UNDO LOG 记录
    • 数据校验: 查询语句查询当前的数据 和 UNDO_LOG 中的 afterImage 对比, 如果不同, 说明有哥们走了之外的逻辑调用了 update, 这个异常情况,走不同的策略, 比如忽略?
    • 校验通过后,根据 beforeImage 来生成 UPDATE 语句

可以看到:

  • 第二个阶段是全部异步的过程,确实会很快, 服务端要释放锁了.
  • commit: 其实核心的业务都在第一个阶段做完了,这里可以马上就成功,核心是异步批量的清理回滚日志
  • rollback: 核心是 补偿, 基于回滚日志的补偿.

3-Seata-TCC

3-1 What

比较经典的方案.

  • 实现完全由 业务方控制, 比较自由,没有 数据库等等的限制 ;
  • 由于需要业务方的改造, 有一定的工程量, 而且 由于网络三态的原理,所有的 实现要求实现幂等;

Tcc: Trycommitcancel

  • Try: 检查,预留,以扣减用户余额 为例子, 就是 确保用户当前的钱够, 并且把钱冻结起来 确保后续 commit 能够成功扣减 ;
  • Commit: 扣钱 ;
  • Cancel: 取消冻结 ;

有三个角色

  • TM: 管理全局的事务, 包括开启全局的事务, 提交/回滚 全局的事务 ;
  • RM: 管理分支的事务 ;
  • TC: 管理全局实物和分支事务的状态 ;

下图来自 SEATA

用上面的例子. 创建订单的操作 (CreateOrderCommandHandler) 这个调用方 依赖三个服务提供方:

  1. 扣减库存服务: ReduceStockRpcService
  2. 用户余额扣减服务: DecrAccountRpcService
  3. 创建订单服务: CreateOrderRpcService.

服务方作为 RM, 都要实现 try, commit, cancel 方法.

public class CreateOrderCommandHandler {
 
    private final ReduceStockRpcService reduceStockRpcService;
    private final DecrAccountRpcService decrAccountRpcService;
    private final CreateOrderRpcService createOrderRpcService;
 
    @GlobalTransactional
	public Result createOrder (CreateOrderCommand command) {
        reduceStockRpcService.reduceStock();
        decrAccountRpcService.decrAccount();
        createOrderRpcService.createOrder();
    }
}

注意下面是没有优化的流程: 是可以优化的, 新版本的 Seata 中优化了流程,后续有空再说.

  1. GlobalTransactional 触发 TM 开始全局事务, 在 TC 上注册全局的事务信息 ;
  2. 调用 reduceStockRpcServicereduceStock 方法,其本质是一个 try, 冻结库存,
    • 触发了 try 方法
    • 成功之后 TM 告诉 TC 这个 try 成功了,
  3. 依次触发其他的 远程服务 ;
  4. TC 知道了所有的服务 try 是否成功还是失败, 作为 协调器触发每个 RMcommit 或者 cancel 方法.

3-2 Usage

定义一个 Provider

public interface TccActionOne {
    @TwoPhaseBusinessAction(name = "DubboTccActionOne", commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "a") String a);
    public boolean commit(BusinessActionContext actionContext);
    public boolean rollback(BusinessActionContext actionContext);
}
  • 这个服务提供方被称为 Tcc Resource , prepare 方法中指定:

    • name: 要求全局唯一
    • commitrollback 的方法名
  • 可以使用 BusinessActionContext 事务上下文 参数封装, 其中封装了:

    • xid: 全局的事务 id
    • branchId: 分支事务的 id
    • actionName: 分支的资源 id
    • actionContext 业务的参数, 需要注解 BusinessActionContextParameter
  • 一个带额外的 demo 如下:

@LocalTCC
public interface TccActionTwo {
    @TwoPhaseBusinessAction(name = "TccActionTwo", commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "a") String a);
    public boolean commit(BusinessActionContext actionContext);
    public boolean rollback(BusinessActionContext actionContext);
}

3-3 Tcc In Action

TCC 中有三个问题:

  • 幂等
  • 悬挂
  • 空回滚

Seata 1.5 之后的版本提供了 一张表 tcc_fence_log , 它的修改要和本地业务在同一个事务中, 之前的版本要业务侧自己用类似的思路处理.

CREATE TABLE IF NOT EXISTS `tcc_fence_log`
(
    `xid`           VARCHAR(128)  NOT NULL COMMENT 'global id',
    `branch_id`     BIGINT        NOT NULL COMMENT 'branch id',
    `action_name`   VARCHAR(64)   NOT NULL COMMENT 'action name',
    `status`        TINYINT       NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)',
    `gmt_create`    DATETIME(3)   NOT NULL COMMENT 'create time',
    `gmt_modified`  DATETIME(3)   NOT NULL COMMENT 'update time',
    PRIMARY KEY (`xid`, `branch_id`),
    KEY `idx_gmt_modified` (`gmt_modified`),
    KEY `idx_status` (`status`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

幂等问题

public static boolean commitFence(Method commitMethod, Object targetTCCBean,
                                  String xid, Long branchId, Object[] args) {
    return transactionTemplate.execute(status -> {
        try {
            Connection conn = DataSourceUtils.getConnection(dataSource);
            TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
            if (tccFenceDO == null) {
                throw new TCCFenceException(String.format("TCC fence record not exists, commit fence method failed. xid= %s, branchId= %s", xid, branchId),
                        FrameworkErrorCode.RecordAlreadyExists);
            }
            if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
                LOGGER.info("Branch transaction has already committed before. idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
                return true;
            }
            if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
                if (LOGGER.isWarnEnabled()) {
                    LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus()). 
                }
                return false;
            }
            return updateStatusAndInvokeTargetMethod(conn, commitMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_COMMITTED, status, args);
        } catch (Throwable t) {
            status.setRollbackOnly();
            throw new SkipCallbackWrapperException(t);
        }
    });
}
  • status: 状态机
  • xid + branch_id 组成了唯一 id
  • 本地事务

这三者 很容易实现幂等.

空回滚问题

Try 阶段, 可能会部分失败导致 Cancel, 这个时候会有没有走 Try 直接走 Cancel 的. 这个是可以预测的异常. 这个时候 应该允许,返回成功,但是什么都不做.

新版本中, 会和上面一样用状态机,来知道是否走到了 STATUS_TRIED . 来决定是否要真的回滚.

//TCCFenceHandler 类
public static boolean rollbackFence(Method rollbackMethod, Object targetTCCBean,
                                    String xid, Long branchId, Object[] args, String actionName) {
    return transactionTemplate.execute(status -> {
        try {
            Connection conn = DataSourceUtils.getConnection(dataSource);
            TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(conn, xid, branchId);
            // non_rollback
            if (tccFenceDO == null) {
                //不执行回滚逻辑
                return true;
            } else {
                if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) {
                    LOGGER.info("Branch transaction had already rollbacked before, idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
                    return true;
                }
                if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) {
                    if (LOGGER.isWarnEnabled()) {
                        LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus());
                    }
                    return false;
                }
            }
            return updateStatusAndInvokeTargetMethod(conn, rollbackMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_ROLLBACKED, status, args);
        } catch (Throwable t) {
            status.setRollbackOnly();
            throw new SkipCallbackWrapperException(t);
        }
    });
}

其中. updateStatussql 语句.

update tcc_fence_log set status = ?, gmt_modified = ?  
where xid = ? and branch_id = ? and status = ? ;

statusSTATUS_TRIED 改为 STATUS_ROLLBACKED . 如果改变成功,再在同一个事务中执行 回滚逻辑.

悬挂: 本质问题是乱序和超时

网络的可能性导致 收到 TryCancel 晚到.

  1. Cancel 先到, 这个时候 是 空回滚
  2. Try 晚到, 会锁定资源,由于 它的 Cancel 永远不会来了,所以会一直锁定资源, 也就是空悬挂问题

解决方案还是状态机,不过可能有幻读的问题. 最好用 SELECT FOR UPDATE .

首先. 第一个阶段 Cancel 的时候,由于这个时候没有 Try, 所以没有任何记录代表了 XidBranchId . 所以在 Cancel 的时候要处理这个问题, 我们这个时候插入一条记录代表这种特殊的状态: STATUS_SUSPENDED, 不用执行回滚操作 .

然后,第二个阶段收到了 Try 请求,会发现之前有一条 STATUS_SUSPENDED 的数据,我们尝试插入失败,会略掉这个 Try

Tips

这些问题 是由于 Try 阶段部分失败,导致走了 Cancel 的问题导致,Commit 一定是所有的 Try 之后再走 Commit 的保证.

4-Transactional MQ

有一种说法是 分布式事务追求的是柔性事务, BASE, 而 事务性 MQ + 幂等 也是做到 BASE 的策略.

  1. 基于一个数据库引擎(可以是 NOSQL 或者 RDB) 作为 MQ 的引擎. 使用支持 ACIDRDB 会简单不少.

  2. 原始的业务表基于 BINLOG 作为 MQ 作为生产者. BINLOG 大致如下的内容:

    • BINLOG_FILE_NO, BINLOG_FILE_OFFSET, BINLOG_XID, BINLOG_XOFFSET, BINLOG_COMMIT 可以作为一个 有序而且唯一ID
    • 事件类型: UPDATE, INSERT , DELETE
    • AfterImage: 修改后的值
    • Delta : 这次修改的字段 和 修改前的值, 因此可以很容易 推出前面的 Value
  3. 这个生产者不会像 TCC 基于 RPC 导致乱序的空悬挂, 数据也是可靠的, 吞吐量是可以的,因为全 异步化过程

  4. 消费者可以类似上面幂等思路,提供一个 SDK 解决幂等问题, 对于上面的每个 BINLOG_ID 仅仅消费一次. 成功了再继续滚动.

Tips

基于 MQ 的方法由于是全异步,所以会有最大的吞吐量,但是也要考虑到 异步编程的范式不适合复杂的业务,同步的编程更直观,逻辑更顺一些,而且异步的消费 也要考虑死信队列,尤其是事务消息出了 BUG 导致 消费者无法继续前进的问题,建议使用死信队列 + 人工处理 .

Tips

这里说一种 MongoDB 实现幂等方案,MongoDB 4.x 以上也支持 一致性的跨文档事务,所以类似即可,还有一种办法适合特殊的场景,就是 文档数据库直接嵌入字段,我们直接把 Doc 嵌入到原始表中, 再使用 单文档原子性事务 也是一种技巧.

5-Sega

TODO

Refer