# 快速开始
# 业务场景
# 用例
用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:
- 仓储服务:对给定的商品扣除仓储数量
- 订单服务:根据采购需求创建订单
- 帐户服务:从用户帐户中扣除余额
# 架构图
# 仓储服务
public interface StorageService {
/**
* 扣除存储数量
*/
void deduct(String commodityCode, int count);
}
# 订单服务
public interface OrderService {
/**
* 创建订单
*/
Order create(String userId, String commodityCode, int orderCount);
}
# 帐户服务
public interface AccountService {
/**
* 从用户账户中借出
*/
void debit(String userId, int money);
}
# 主要业务逻辑
public class BusinessServiceImpl implements BusinessService {
private StorageService storageService;
private OrderService orderService;
/**
* 用户购买
*/
public void purchase(String userId, String commodityCode, int orderCount) {
storageService.deduct(commodityCode, orderCount);
orderService.create(userId, commodityCode, orderCount);
}
}
public class OrderServiceImpl implements OrderService {
private OrderDAO orderDAO;
private AccountService accountService;
public Order create(String userId, String commodityCode, int orderCount) {
int orderMoney = calculate(commodityCode, orderCount);
accountService.debit(userId, orderMoney);
Order order = new Order();
order.userId = userId;
order.commodityCode = commodityCode;
order.count = orderCount;
order.money = orderMoney;
// INSERT INTO orders ...
return orderDAO.insert(order);
}
}
# SEATA 的分布式交易解决方案
我们只需要使用一个 @GlobalTransactional 注解在业务方法上:
@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
......
}
# 具体示例
# 客户端
# 建立数据库
要求:具有InnoDB引擎的MySQL。 注意: 实际上,在示例用例中,这3个服务应该有3个数据库。 但是,为了简单起见,我们只能创建一个数据库
- dubbo-account-service.xml
- dubbo-order-service.xml
- dubbo-storage-service.xml
<property name="url" value="jdbc:mysql://x.x.x.x:3306/xxx" />
<property name="username" value="xxx" />
<property name="password" value="xxx" />
# 创建 UNDO_LOG 表
SEATA AT 模式需要 UNDO_LOG 表
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
# 为示例业务创建表
DROP TABLE IF EXISTS `storage_tbl`;
CREATE TABLE `storage_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
PRIMARY KEY (`id`),
UNIQUE KEY (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `account_tbl`;
CREATE TABLE `account_tbl` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
# 服务端
从 https://github.com/seata/seata/releases (opens new window),下载服务器软件包,将其解压缩
# 服务端配置
修改配置文件:seata/conf/file.conf,mode改成“db”模式:
store {
#store修改点5个 其他不要动
## store mode: file、db
#db储存 修改单1
mode = "db"
## file store
/**file {
dir = "sessionStore"
max-branch-session-size = 16384
# globe session size , if exceeded throws exceptions
max-global-session-size = 512
# file buffer size , if exceeded allocate new buffer
file-write-buffer-cache-size = 16384
# when recover batch read size
session.reload.read_size = 100
# async, sync
flush-disk-mode = async
}**/
## database store
db {
#修改点2
datasource = "druid"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
driver-class-name = "com.mysql.jdbc.Driver"
#修改点3 上面创建数据库的路径地址
url = "jdbc:mysql://127.0.0.1:3306/seata"
#修改单4
user = "root"
#修改点5
password = "123456"
min-conn = 1
max-conn = 3
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}
}
使用db模式存储事务日志,所以,我们要创建三张表:global_table,branch_table,lock_table
CREATE TABLE `global_table` (
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`status` tinyint(4) NOT NULL,
`application_id` varchar(64) DEFAULT NULL,
`transaction_service_group` varchar(64) DEFAULT NULL,
`transaction_name` varchar(64) DEFAULT NULL,
`timeout` int(11) DEFAULT NULL,
`begin_time` bigint(20) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `branch_table` (
`branch_id` bigint(20) NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`lock_key` varchar(128) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint(4) DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
create table `lock_table` (
`row_key` varchar(128) not null,
`xid` varchar(96),
`transaction_id` long ,
`branch_id` long,
`resource_id` varchar(256) ,
`table_name` varchar(32) ,
`pk` varchar(32) ,
`gmt_create` datetime ,
`gmt_modified` datetime,
primary key(`row_key`)
);
# 启动Seata服务端
seata-server.sh
# 配置注册中心与配置中心
# 注册中心配置
registry {
# file nacos eureka redis zk consul etcd3 sofa
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "localhost"
namespace = ""
cluster = "default"
username = ""
password = ""
}
}
# 其他配置
config {
type = "nacos"
nacos {
serverAddr = "localhost"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
}
}
注册中心支持:file、nacos、eureka、redis、zk等等 其中nacos也可以用做配置中心,当注册中心使用nacos时,可以把file.conf的配置放到nacos中,就不需要file.conf了 然后将其他全部的配置也导入nacos,例如:
store.db.datasource=druid
store.db.db-type=mysql
store.db.driver-class-name=com.mysql.jdbc.Driver
...
# AT 模式
@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
// 库存扣减
storageService.deduct(commodityCode, orderCount);
// 金额扣减
...
// 生成订单
orderService.create(userId, commodityCode, orderCount);
}
# 服务端3张表的作用
- global_table:当执行到有@GlobalTransactional注解的方法时,会向global_table插入一条数据
- branch_table:库存扣减、金额扣减、生成订单这些参与者,会分别插入一条记录到branch_table
- lock_table:如果id为1的数据正在参与分布式事务,还没有结束,会存在该表中;别的事务就不能操作这一条数据
# AT和TCC的区别
都是基于两阶段提交,AT模式
- 一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录
- 二阶段 commit 行为:马上成功结束,自动异步批量清理回滚日志
- 二阶段 rollback 行为:通过回滚日志,自动生成补偿操作,完成数据回滚
相应的,TCC 模式,不依赖于底层数据资源的事务支持:
一阶段 prepare 行为:调用自定义的prepare逻辑
二阶段 commit 行为:调用自定义的commit逻辑
二阶段 rollback 行为:调用自定义的rollback逻辑
AT TCC 全局锁 需要 不需要 回滚日志 需要 不需要 commit/cancel阶段代码实现 不需要 需要 是否需要开发者解决悬挂和空回滚问题 不需要 需要 性能 低(需要全局锁导致) 高(无锁)
# 空回滚与事务悬挂
空回滚:pre阶段超时,此时rollback
事务悬挂:pre延时,rollback执行后,pre到达,然后执行了pre
# 第一阶段流程
# 第二阶段流程
# 提交流程
向seata服务器发送全局提交请求之后,如果发现有一个TCC类型分支事务,同步提交; branchTable表里用branch_type字段区分不同类型的事务
# 回滚流程
回滚流程没有异步
# 前置镜像和后置镜像的使用
- 修改前镜像等于修改后镜像,说明数据没有变更,无须回滚
- 当前镜像等于修改后镜像,则说明当前数据需要进行回滚
- 当前镜像不等于修改后镜像且当前镜像等于修改前镜像,说明数据无变更,无须回滚
- 当前镜像不等于修改后镜像,且不等于修改前镜像,脏数据,这种情况要看用户配置要不要回滚
前置镜像 | 后置镜像 | 反向sql | |
---|---|---|---|
insert | 无 | 有 | delete |
delete | 有 | 无 | insert |
update | 有 | 有 | update |
# UndoLog
# 角色
- TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚
- TM (Transaction Manager) - 事务管理器:定义全局事务的范围:开始全局事务、提交或回滚全局事务
- RM (Resource Manager) - 资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚
# 事务隔离问题
# 写隔离
- 一阶段本地事务提交前,需要确保先拿到全局锁(@GlobalLock)
- 拿不到全局锁,不能提交本地事务
- 拿全局锁的尝试被限制在一定范围内(例如次数),超出范围将放弃,并回滚本地事务,释放本地锁
@GlobalTransactional 标注的方法执行时,会先判断全局锁lock_table中是否存在,如果存在则失败,如果成功会把要修改数据记录到lock_table中,相当于这个数据的全局锁
比如商品增加库存,它不是一个分布式事务,不需要一个全局的事务,就可以在方法上加上@GlobalLock注解,那么在方法执行时就会判断数据是否在lock_table中被锁着
# 读隔离
Seata(AT 模式)的默认全局隔离级别是读未提交(Read Uncommitted):第一阶段分支事务提交的数据别人都可以看到 如果应用在特定场景下,必需要求全局的读已提交,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理:SELECT FOR UPDATE语句的执行会申请全局锁,如果全局锁被其他事务持有,查询是被 block 住的,直到全局锁拿到
出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句
实现读已提交的方式:使用 SELECT FOR UPDATE 语句的同时,方法上要加上 @GlobalLock 注解
# XA 模式
# 传统 XA
XA 是数据库厂商实现的两阶段提交的一个强一致的协议(也就是对 2PC 规范的落地);换句话说,XA模式也就是基于事务资源(数据库)本身提供的XA规范和协议的支持,它的核心价值:
- 从场景上看,满足全局一致性的需求
- 从应用上看,保持与AT模式一样的业务无侵入
- 基于数据库,对语言无要求
问题 解释 表现 数据锁定 数据在整个事务处理过程结束前,都被锁定,读写都按隔离级别的定义约束起来(不会有AT的读隔离问题) 增加单个事务 RT(RequestTime请求耗时) 并发事务数据的锁冲突概率高 协议阻塞 XA prepare 后,分支事务进入阻塞阶段,收到 XA commit 或 XA rollback 前必须阻塞等待 数据库连接长时间占用,可能有些情况会导致数据库连接无法释放
# Seata XA 模式
该模式的前提是:需要支持 XA 事务的数据库
它分成以下两个阶段
- 执行阶段(Execute):XA start/XA end/XA prepare + 注册分支事务
- 完成阶段(Finish):XA commit/XA rollback
处理流程就非常简单了:
- TM 向 TC 开启全局事务
- 每个分支事务注册到 TC
- 各个 RM 中的分支事务自己去处理(执行SQL),向 TC 上报事务状态
- 最后 TC 再统一调用 commit 或 rollback 就行了
# TCC 模式
主事务和AT模式一样,加@GlobalTransactional注解 分支事务要加@TwoPhaseBusinessAction,注解被拦截到之后,注册分支事务
public interface TccAction {
@TwoPhaseBusinessAction(name = "yourTccActionName",
commitMethod = "confirm",
rollbackMethod = "cancel")
public boolean try(BusinessActionContext businessActionContext, int a, int b);
public boolean confirm(BusinessActionContext businessActionContext);
public boolean cancel(BusinessActionContext businessActionContext);