分布式事务实现 —— Seata

2023/2/16

官方示例 (opens new window)

# 快速开始

# 业务场景

# 用例

用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:

  • 仓储服务:对给定的商品扣除仓储数量
  • 订单服务:根据采购需求创建订单
  • 帐户服务:从用户帐户中扣除余额

# 架构图

image.png

# 仓储服务

public interface StorageService {

    /**
     * 扣除存储数量
     */
    void deduct(String commodityCode, int count);
}

# 订单服务

public interface OrderService {
    /**
     * 创建订单
     */
    Order create(String userId, String commodityCode, int orderCount);
}

# 帐户服务

public interface AccountService {
    /**
     * 从用户账户中借出
     */
    void debit(String userId, int money);
}

# 主要业务逻辑

public class BusinessServiceImpl implements BusinessService {
    private StorageService storageService;
    private OrderService orderService;
    /**
     * 用户购买
     */
    public void purchase(String userId, String commodityCode, int orderCount) {
        storageService.deduct(commodityCode, orderCount);
        orderService.create(userId, commodityCode, orderCount);
    }
}
public class OrderServiceImpl implements OrderService {
    private OrderDAO orderDAO;
    private AccountService accountService;
    public Order create(String userId, String commodityCode, int orderCount) {
        int orderMoney = calculate(commodityCode, orderCount);
        accountService.debit(userId, orderMoney);
        Order order = new Order();
        order.userId = userId;
        order.commodityCode = commodityCode;
        order.count = orderCount;
        order.money = orderMoney;
        // INSERT INTO orders ...
        return orderDAO.insert(order);
    }
}

# SEATA 的分布式交易解决方案

image.png 我们只需要使用一个 @GlobalTransactional 注解在业务方法上:

@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
    ......
}

# 具体示例

# 客户端

# 建立数据库

要求:具有InnoDB引擎的MySQL。 注意: 实际上,在示例用例中,这3个服务应该有3个数据库。 但是,为了简单起见,我们只能创建一个数据库

  1. dubbo-account-service.xml
  2. dubbo-order-service.xml
  3. dubbo-storage-service.xml
<property name="url" value="jdbc:mysql://x.x.x.x:3306/xxx" />
<property name="username" value="xxx" />
<property name="password" value="xxx" />

# 创建 UNDO_LOG 表

SEATA AT 模式需要 UNDO_LOG 表

-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

# 为示例业务创建表

DROP TABLE IF EXISTS `storage_tbl`;
CREATE TABLE `storage_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `commodity_code` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT 0,
  PRIMARY KEY (`id`),
  UNIQUE KEY (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(255) DEFAULT NULL,
  `commodity_code` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT 0,
  `money` int(11) DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `account_tbl`;
CREATE TABLE `account_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(255) DEFAULT NULL,
  `money` int(11) DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

# 服务端

https://github.com/seata/seata/releases (opens new window),下载服务器软件包,将其解压缩

# 服务端配置

修改配置文件:seata/conf/file.conf,mode改成“db”模式:

store {
  #store修改点5个  其他不要动
  ## store mode: file、db
  #db储存   修改单1
  mode = "db"
  
  ## file store
  /**file {
    dir = "sessionStore"
    max-branch-session-size = 16384
    # globe session size , if exceeded throws exceptions
    max-global-session-size = 512
    # file buffer size , if exceeded allocate new buffer
    file-write-buffer-cache-size = 16384
    # when recover batch read size
    session.reload.read_size = 100
    # async, sync
    flush-disk-mode = async
  }**/
  
  ## database store
  db {
        #修改点2
        datasource = "druid"
        ## mysql/oracle/h2/oceanbase etc.
        db-type = "mysql"
        driver-class-name = "com.mysql.jdbc.Driver"
        #修改点3   上面创建数据库的路径地址
        url = "jdbc:mysql://127.0.0.1:3306/seata"   
        #修改单4
        user = "root"
        #修改点5
        password = "123456"
        min-conn = 1
        max-conn = 3
        global.table = "global_table"
        branch.table = "branch_table"
        lock-table = "lock_table"
        query-limit = 100
  }
}

使用db模式存储事务日志,所以,我们要创建三张表:global_table,branch_table,lock_table

CREATE TABLE `global_table` (
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint(20) DEFAULT NULL,
  `status` tinyint(4) NOT NULL,
  `application_id` varchar(64) DEFAULT NULL,
  `transaction_service_group` varchar(64) DEFAULT NULL,
  `transaction_name` varchar(64) DEFAULT NULL,
  `timeout` int(11) DEFAULT NULL,
  `begin_time` bigint(20) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`xid`),
  KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
  KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `branch_table` (
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint(20) DEFAULT NULL,
  `resource_group_id` varchar(32) DEFAULT NULL,
  `resource_id` varchar(256) DEFAULT NULL,
  `lock_key` varchar(128) DEFAULT NULL,
  `branch_type` varchar(8) DEFAULT NULL,
  `status` tinyint(4) DEFAULT NULL,
  `client_id` varchar(64) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`branch_id`),
  KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

create table `lock_table` (
  `row_key` varchar(128) not null,
  `xid` varchar(96),
  `transaction_id` long ,
  `branch_id` long,
  `resource_id` varchar(256) ,
  `table_name` varchar(32) ,
  `pk` varchar(32) ,
  `gmt_create` datetime ,
  `gmt_modified` datetime,
  primary key(`row_key`)
);

# 启动Seata服务端

seata-server.sh

# 配置注册中心与配置中心

# 注册中心配置
registry {
	  # file nacos eureka redis zk consul etcd3 sofa
  	type = "nacos"
  
  	nacos {
			application = "seata-server"
  		serverAddr = "localhost"
  		namespace = ""
  		cluster = "default"
  		username = ""
  		password = ""
		}
}

# 其他配置
config {
		type = "nacos"
  	
  	nacos {
			serverAddr = "localhost"
  		namespace = ""
  		group = "SEATA_GROUP"
  		username = ""
  		password = ""
		}
}

注册中心支持:file、nacos、eureka、redis、zk等等 其中nacos也可以用做配置中心,当注册中心使用nacos时,可以把file.conf的配置放到nacos中,就不需要file.conf了 然后将其他全部的配置也导入nacos,例如:

store.db.datasource=druid
store.db.db-type=mysql
store.db.driver-class-name=com.mysql.jdbc.Driver
...

# AT 模式

@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
    // 库存扣减
    storageService.deduct(commodityCode, orderCount);
    // 金额扣减
    ...
    // 生成订单
    orderService.create(userId, commodityCode, orderCount);
}

# 服务端3张表的作用

  • global_table:当执行到有@GlobalTransactional注解的方法时,会向global_table插入一条数据
  • branch_table:库存扣减、金额扣减、生成订单这些参与者,会分别插入一条记录到branch_table
  • lock_table:如果id为1的数据正在参与分布式事务,还没有结束,会存在该表中;别的事务就不能操作这一条数据

# AT和TCC的区别

都是基于两阶段提交,AT模式

  • 一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录
  • 二阶段 commit 行为:马上成功结束,自动异步批量清理回滚日志
  • 二阶段 rollback 行为:通过回滚日志,自动生成补偿操作,完成数据回滚

相应的,TCC 模式,不依赖于底层数据资源的事务支持:

  • 一阶段 prepare 行为:调用自定义的prepare逻辑

  • 二阶段 commit 行为:调用自定义的commit逻辑

  • 二阶段 rollback 行为:调用自定义的rollback逻辑

    AT TCC
    全局锁 需要 不需要
    回滚日志 需要 不需要
    commit/cancel阶段代码实现 不需要 需要
    是否需要开发者解决悬挂和空回滚问题 不需要 需要
    性能 低(需要全局锁导致) 高(无锁)

# 空回滚与事务悬挂

空回滚:pre阶段超时,此时rollback

稿定设计-1.jpg

事务悬挂:pre延时,rollback执行后,pre到达,然后执行了pre

稿定设计-2.jpg

# 第一阶段流程

# 第二阶段流程

# 提交流程

向seata服务器发送全局提交请求之后,如果发现有一个TCC类型分支事务,同步提交; branchTable表里用branch_type字段区分不同类型的事务

# 回滚流程

回滚流程没有异步

# 前置镜像和后置镜像的使用

  • 修改前镜像等于修改后镜像,说明数据没有变更,无须回滚
  • 当前镜像等于修改后镜像,则说明当前数据需要进行回滚
  • 当前镜像不等于修改后镜像且当前镜像等于修改前镜像,说明数据无变更,无须回滚
  • 当前镜像不等于修改后镜像,且不等于修改前镜像,脏数据,这种情况要看用户配置要不要回滚
前置镜像 后置镜像 反向sql
insert delete
delete insert
update update

# UndoLog

# 角色

  • TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚
  • TM (Transaction Manager) - 事务管理器:定义全局事务的范围:开始全局事务、提交或回滚全局事务
  • RM (Resource Manager) - 资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚

# 事务隔离问题

# 写隔离

  • 一阶段本地事务提交前,需要确保先拿到全局锁(@GlobalLock)
  • 拿不到全局锁,不能提交本地事务
  • 拿全局锁的尝试被限制在一定范围内(例如次数),超出范围将放弃,并回滚本地事务,释放本地锁

@GlobalTransactional 标注的方法执行时,会先判断全局锁lock_table中是否存在,如果存在则失败,如果成功会把要修改数据记录到lock_table中,相当于这个数据的全局锁

比如商品增加库存,它不是一个分布式事务,不需要一个全局的事务,就可以在方法上加上@GlobalLock注解,那么在方法执行时就会判断数据是否在lock_table中被锁着

# 读隔离

Seata(AT 模式)的默认全局隔离级别是读未提交(Read Uncommitted):第一阶段分支事务提交的数据别人都可以看到 如果应用在特定场景下,必需要求全局的读已提交,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理:SELECT FOR UPDATE语句的执行会申请全局锁,如果全局锁被其他事务持有,查询是被 block 住的,直到全局锁拿到

出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句

实现读已提交的方式:使用 SELECT FOR UPDATE 语句的同时,方法上要加上 @GlobalLock 注解

# XA 模式

# 传统 XA

XA 是数据库厂商实现的两阶段提交的一个强一致的协议(也就是对 2PC 规范的落地);换句话说,XA模式也就是基于事务资源(数据库)本身提供的XA规范和协议的支持,它的核心价值:

  • 从场景上看,满足全局一致性的需求
  • 从应用上看,保持与AT模式一样的业务无侵入
  • 基于数据库,对语言无要求
    问题 解释 表现
    数据锁定 数据在整个事务处理过程结束前,都被锁定,读写都按隔离级别的定义约束起来(不会有AT的读隔离问题) 增加单个事务 RT(RequestTime请求耗时) 并发事务数据的锁冲突概率高
    协议阻塞 XA prepare 后,分支事务进入阻塞阶段,收到 XA commit 或 XA rollback 前必须阻塞等待 数据库连接长时间占用,可能有些情况会导致数据库连接无法释放

# Seata XA 模式

该模式的前提是:需要支持 XA 事务的数据库 它分成以下两个阶段

  • 执行阶段(Execute):XA start/XA end/XA prepare + 注册分支事务
  • 完成阶段(Finish):XA commit/XA rollback

处理流程就非常简单了:

  • TM 向 TC 开启全局事务
  • 每个分支事务注册到 TC
  • 各个 RM 中的分支事务自己去处理(执行SQL),向 TC 上报事务状态
  • 最后 TC 再统一调用 commit 或 rollback 就行了

# TCC 模式

主事务和AT模式一样,加@GlobalTransactional注解 分支事务要加@TwoPhaseBusinessAction,注解被拦截到之后,注册分支事务


public interface TccAction {
    @TwoPhaseBusinessAction(name = "yourTccActionName", 
                            commitMethod = "confirm", 
                            rollbackMethod = "cancel")
    public boolean try(BusinessActionContext businessActionContext, int a, int b);
    public boolean confirm(BusinessActionContext businessActionContext);
    public boolean cancel(BusinessActionContext businessActionContext);

# SAGA [ˈsɑːɡə] 模式

官方例子 (opens new window)