目录
配置seata
搭建工程
运行结果
原理简介
总结
seata是阿里推出的分布式事务解决方案,今天我来分享一下springboot多数据源怎么整合seata解决分布式事务的问题。它有3中模式:AT模式,TCC模式和Saga模式,本文主要介绍一下AT模式。
首先说一下本文使用的实验环境
springboot:2.1.6.RELEASE
orm框架:mybatis
数据库:mysql
数据库连接池:HikariCP
seata server:1.3.0
配置seata 【spring-boot|springboot研究十(springboot多数据源整合分布式事务中间件seata)】首先下载seata server安装包,本文使用版本是1.3.0,下载地址如下:
https://github.com/seata/seata/releases
下载完成后,解压后在seata目录下建一个目录logs,在里面建一个文件seata_gc.log,如果不创建这个log文件,启动会报找不到文件的错误。
启动server,我本地使用windows环境的启动命令如下:
seata-server.bat -p 8091 -h 127.0.0.1 -m file
关于启动命令的说明,我摘自官网(http://seata.io/en-us/docs/user/quickstart.html)
Usage: sh seata-server.sh(for linux and mac) or cmd seata-server.bat(for windows) [options]
Options:
--host, -h
The host to bind.
Default: 0.0.0.0
--port, -p
The port to listen.
Default: 8091
--storeMode, -m
log store mode : file、db
Default: file
--helpe.g.sh seata-server.sh -p 8091 -h 127.0.0.1 -m file
下面是启动成功后的日志:
00:21:10,892 |-INFO in ch.qos.logback.core.rolling.RollingFileAppender[ERROR] - Active log file name: /root/logs/seata/txc.8091.error.log
00:21:10,892 |-INFO in ch.qos.logback.core.rolling.RollingFileAppender[ERROR] - File property is set to [/root/logs/seata/txc.8091.error.log]
00:21:10,892 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO
00:21:10,892 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [ALL] to Logger[ROOT]
00:21:10,893 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [WARN] to Logger[ROOT]
00:21:10,893 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [ERROR] to Logger[ROOT]
00:21:10,893 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [CONSOLE] to Logger[ROOT]
00:21:10,893 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
00:21:10,894 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@2038ae61 - Registering current configuration as safe fallback point2020-08-06 00:21:11.194INFO --- [main] io.seata.config.FileConfiguration: The configuration file used is registry.conf
2020-08-06 00:21:11.264INFO --- [main] io.seata.config.FileConfiguration: The configuration file used is file.conf
2020-08-06 00:21:11.959INFO --- [main] i.s.core.rpc.netty.NettyServerBootstrap: Server started, listen port: 8091
搭建工程连接seata server的方式有多种,这儿我们采用文件的方式:
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "file"
file {
name = "file.conf"
}
}
关于springboot多数据源的配置,如果你还不太熟悉,看一下我写的这篇文章《springboot研究三:springboot多数据源配置+mybatis+mysql》,本文不再介绍。
本文的示例来自seata官方示例,我做了修改,需要3个数据库:seata_pay、seata_order和seata_storage,每个数据库都有一张undo_log表记录回滚日志。sql语句如下:
# Order
DROP DATABASE IF EXISTS seata_order;
CREATE DATABASE seata_order;
CREATE TABLE seata_order.orders
(
idINT(11) NOT NULL AUTO_INCREMENT,
user_idINT(11)DEFAULT NULL,
product_idINT(11)DEFAULT NULL,
pay_amountDECIMAL(10, 0) DEFAULT NULL,
statusVARCHAR(100)DEFAULT NULL,
add_timeDATETIMEDEFAULT CURRENT_TIMESTAMP,
last_update_time DATETIMEDEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
CREATE TABLE seata_order.undo_log
(
idBIGINT(20)NOT NULL AUTO_INCREMENT,
branch_idBIGINT(20)NOT NULL,
xidVARCHAR(100) NOT NULL,
contextVARCHAR(128) NOT NULL,
rollback_info LONGBLOBNOT NULL,
log_statusINT(11)NOT NULL,
log_createdDATETIMENOT NULL,
log_modifiedDATETIMENOT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8
;
# Storage
DROP DATABASE IF EXISTS seata_storage;
CREATE DATABASE seata_storage;
CREATE TABLE seata_storage.product
(
idINT(11) NOT NULL AUTO_INCREMENT,
priceDOUBLEDEFAULT NULL,
stockINT(11)DEFAULT NULL,
last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
INSERT INTO seata_storage.product (id, price, stock) VALUES (1, 5, 10);
CREATE TABLE seata_storage.undo_log
(
idBIGINT(20)NOT NULL AUTO_INCREMENT,
branch_idBIGINT(20)NOT NULL,
xidVARCHAR(100) NOT NULL,
contextVARCHAR(128) NOT NULL,
rollback_info LONGBLOBNOT NULL,
log_statusINT(11)NOT NULL,
log_createdDATETIMENOT NULL,
log_modifiedDATETIMENOT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
# Pay
DROP DATABASE IF EXISTS seata_pay;
CREATE DATABASE seata_pay;
CREATE TABLE seata_pay.account
(
idINT(11) NOT NULL AUTO_INCREMENT,
balanceDOUBLEDEFAULT NULL,
last_update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
CREATE TABLE seata_pay.undo_log
(
idBIGINT(20)NOT NULL AUTO_INCREMENT,
branch_idBIGINT(20)NOT NULL,
xidVARCHAR(100) NOT NULL,
contextVARCHAR(128) NOT NULL,
rollback_info LONGBLOBNOT NULL,
log_statusINT(11)NOT NULL,
log_createdDATETIMENOT NULL,
log_modifiedDATETIMENOT NULL,
PRIMARY KEY (id),
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8;
INSERT INTO seata_pay.account (id, balance) VALUES (1, 1);
SELECT auto_increment
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = 'seata_order'
AND TABLE_NAME = 'undo_log'
整个springboot的目录如下
文章图片
这里用的是动态数据源,跟我之前文章讲的mybatis多数据源配置不同,这里我主要讲一下这一块
首先看一下mybatis config文件mybatis.xml,所有数据库的xml映射文件都写入一个xml里面,代码如下:
再看一下application.properties中数据源的配置,这儿我们创建了3个数据库,所以需要配置3个数据源:
spring.application.name=springboot-seata
######seata_pay#############
datasource.pay.jdbc-url=jdbc:mysql://192.168.59.1:3306/seata_pay?useAffectedRows=true&serverTimezone=UTC&characterEncoding=utf-8
datasource.pay.username=root
datasource.pay.password=123456
datasource.pay.driver-class-name=com.mysql.cj.jdbc.Driver######seata_storage#############
datasource.storage.jdbc-url=jdbc:mysql://192.168.59.1:3306/seata_storage?useAffectedRows=true&serverTimezone=UTC&characterEncoding=utf-8
datasource.storage.username=root
datasource.storage.password=123456
datasource.storage.driver-class-name=com.mysql.cj.jdbc.Driver######seata_order#############
datasource.order.jdbc-url=jdbc:mysql://192.168.59.1:3306/seata_order?useAffectedRows=true&serverTimezone=UTC&characterEncoding=utf-8
datasource.order.username=root
datasource.order.password=123456
datasource.order.driver-class-name=com.mysql.cj.jdbc.Driverspring.cloud.alibaba.seata.tx-service-group=springboot-seata
mybatis动态数据源配置类如下:
@Configuration
@MapperScan(basePackages={"boot.mapper"}, sqlSessionFactoryRef = "sqlSessionFactoryBean")
public class DataSourceProxyConfig {@Bean("originOrder")
@ConfigurationProperties(prefix = "datasource.order")
public DataSource dataSourceMaster() {
return DataSourceBuilder.create().build();
}@Bean("originStorage")
@ConfigurationProperties(prefix = "datasource.storage")
public DataSource dataSourceStorage() {
return DataSourceBuilder.create().build();
}@Bean("originPay")
@ConfigurationProperties(prefix = "datasource.pay")
public DataSource dataSourcePay() {
return DataSourceBuilder.create().build();
}@Bean(name = "order")
public DataSourceProxy masterDataSourceProxy(@Qualifier("originOrder") DataSource dataSource) {
return new DataSourceProxy(dataSource);
}@Bean(name = "storage")
public DataSourceProxy storageDataSourceProxy(@Qualifier("originStorage") DataSource dataSource) {
return new DataSourceProxy(dataSource);
}@Bean(name = "pay")
public DataSourceProxy payDataSourceProxy(@Qualifier("originPay") DataSource dataSource) {
return new DataSourceProxy(dataSource);
}@Bean("dynamicDataSource")
public DataSource dynamicDataSource(@Qualifier("order") DataSource dataSourceOrder,
@Qualifier("storage") DataSource dataSourceStorage,
@Qualifier("pay") DataSource dataSourcePay) {//这儿是动态数据源配置的关键,3个数据源放在了一个map里面DynamicRoutingDataSource dynamicRoutingDataSource = new DynamicRoutingDataSource();
Map
在service调用dao时,需要切换数据源,代码如下:
DynamicDataSourceContextHolder.setDataSourceKey(DataSourceKey.ORDER);
//切换到seata_order数据库
这里面的切换本质上是将数据库名称放到一个ThreadLocal上
public class DynamicDataSourceContextHolder {private static final ThreadLocal CONTEXT_HOLDER = ThreadLocal.withInitial(DataSourceKey.ORDER::name);
private static List
而ThreadLocal上面的数据库为动态数据源使用
public class DynamicRoutingDataSource extends AbstractRoutingDataSource {@Override
protected Object determineCurrentLookupKey() {
log.info("当前数据源 [{}]", DynamicDataSourceContextHolder.getDataSourceKey());
return DynamicDataSourceContextHolder.getDataSourceKey();
}
}
这样seata在获取连接的时候,就可以取到当前的数据库连接,因为每个库里面都有一个undo_log表需要写回滚日志,所以必须能够保证seata能够动态获取当前的数据库。
运行结果执行Application的main函数启动工程。
在上面的建表语句中,我们建了订单seata_order、支付seata_pay、库存seata_storage3个数据库,支付数据库中的account表插入了一条记录,余额是1,库存数据库的product表插入了1条记录,商品数量(stock)是10。
我们用postman模拟发送一个请求:
url:http://localhost:8083/order/placeOrder,content:
{
"userId":1,
"productId":1,
"price":1
}
执行成功,这时orders表插入了1条记录,account表余额减为0,product表商品数量减为9。如下3个图,
文章图片
文章图片
文章图片
这是我们再发一次上面的http请求,会失败,因为账户余额不足。这次我们采用debug方式,看一下undo_log的数据。程序执行到下图中的断点时,会产生undo_log,如下图:
文章图片
文章图片
我们看下undo_log中字段rollback_info数据:
{
"@class":"io.seata.rm.datasource.undo.BranchUndoLog",
"xid":"192.168.59.132:8091:34937248742391808",
"branchId":34937257391046656,
"sqlUndoLogs":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType":"INSERT",
"tableName":"orders",
"beforeImage":{
"@class":"io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords",
"tableName":"orders",
"rows":[
"java.util.ArrayList",
[]
]
},
"afterImage":{
"@class":"io.seata.rm.datasource.sql.struct.TableRecords",
"tableName":"orders",
"rows":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.sql.struct.Row",
"fields":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"id",
"keyType":"PRIMARY_KEY",
"type":4,
"value":2
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"user_id",
"keyType":"NULL",
"type":4,
"value":1
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"product_id",
"keyType":"NULL",
"type":4,
"value":1
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"pay_amount",
"keyType":"NULL",
"type":3,
"value":[
"java.math.BigDecimal",
1
]
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"status",
"keyType":"NULL",
"type":12,
"value":"INIT"
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"add_time",
"keyType":"NULL",
"type":93,
"value":[
"java.sql.Timestamp",
[
1596793692000,
0
]
]
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"last_update_time",
"keyType":"NULL",
"type":93,
"value":[
"java.sql.Timestamp",
[
1596793692000,
0
]
]
}
]
]
}
]
]
}
}
]
]
}
可以看到undo_log记录了要回滚的表中的每个字段的值,比如id=2,回滚时使用。
这时我们看seata server的日志,如下:
2020-08-06 21:49:08.740INFO --- [Thread_1_12_500] io.seata.server.coordinator.DefaultCore: Rollback global transaction successfully, xid = 192.168.59.132:8091:34937248742391808.
原理简介 其实分布式事务的原理还是使用了单数据库的事务原理,我们可以把分布式事务中每个数据库看做是单数据库的表。首先每个事务有一个全局的事务id,叫做xid,上面的第二个例子中值是192.168.59.132:8091:34937248742391808。
有了这个xid后,我们就可以记录undo_log了,undo_log中记录了这个xid的,每次提交事务前都要先写undo_log,后提交事务,这时你一定恍然大悟,这不就是mysql中的wal机制吗?
而rollback_info字段记录了要回滚的表的记录中的每个字段和对应值,这样就方便的回滚了。这时你肯定又恍然大悟,与其说是回滚,难道这不就是交易补偿啊?
理解了这个,在理解seata官方的解释就容易多了,如下图:
文章图片
seata中有3个角色,TC其实就是seata server,RM是单个数据库的事务管理器,TM是定义开启和提交回滚全局事务的组件。官方定义如下:
Transaction Coordinator(TC): Maintain status of global and branch transactions, drive the global commit or rollback.
Transaction Manager(TM): Define the scope of global transaction: begin a global transaction, commit or rollback a global transaction.
Resource Manager(RM): Manage resources that branch transactions working on, talk to TC for registering branch transactions and reporting status of branch transactions, and drive the branch transaction commit or rollback.
总结 本文主要介绍了springboot多数据源整合seata的使用,也简单介绍了一些原理。seata对分布式事务的管理思想其实还是单个数据库事务的思想。后面有时间再详细介绍seata的原理。
源代码地址:
https://github.com/jinjunzhu/springboot-seata.git
参考:
http://seata.io/en-us/docs/user/quickstart.html
https://github.com/seata/seata
https://github.com/seata/seata-samples
欢迎关注个人公众号,共同学习,共同成长
文章图片
推荐阅读
- JAVA框架之路|spring-boot笔记-工程搭建(一)
- spring|Spring AOP从入门到放弃之概念以及Spring Boot AOP demo
- springcloud+eureka+seata集成
- spring-boot|springboot研究十一(springcloud+eureka整合分布式事务中间件seata)
- spring-boot|zookeeper与grpc集成实现服务注册与发现
- java学习总结|springboot 完整企业项目搭建实记
- java|redis jwt spring boot spring security 实现api token 验证