五陵年少金市东,银鞍白马渡春风。这篇文章主要讲述盘点 Seata : Server 端事务的 Session 如何处理?相关的知识,希望能为你提供帮助。
一 .前言之前提及过 Seata Client 的请求流程 , 这一篇从 Session 的处理来看一下 Seata Server 端的处理 .
每一次 Seata 的全局操作都会创建一个 Session , 并且往表中插入事务数据.
二 . global_table 表先来看一下 global_table 的表结构
CREATE TABLE `global_table` (
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`status` tinyint(4) NOT NULL,
`application_id` varchar(32) DEFAULT NULL,
`transaction_service_group` varchar(32) DEFAULT NULL,
`transaction_name` varchar(128) DEFAULT NULL,
`timeout` int(11) DEFAULT NULL,
`begin_time` bigint(20) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime DEFAULT NULL,
`gmt_modified` datetime DEFAULT NULL,
PRIMARY KEY (`xid`),
KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `branch_table` (
`branch_id` bigint(20) NOT NULL,
`xid` varchar(128) NOT NULL,
`transaction_id` bigint(20) DEFAULT NULL,
`resource_group_id` varchar(32) DEFAULT NULL,
`resource_id` varchar(256) DEFAULT NULL,
`branch_type` varchar(8) DEFAULT NULL,
`status` tinyint(4) DEFAULT NULL,
`client_id` varchar(64) DEFAULT NULL,
`application_data` varchar(2000) DEFAULT NULL,
`gmt_create` datetime(6) DEFAULT NULL,
`gmt_modified` datetime(6) DEFAULT NULL,
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
三 . Server Session 处理一览我们通过启动参数 -m 对请求 STORE_MODE 进行配置 : seata-server.bat -m db
整个 Session 的处理会分别对2个操作进行处理 , 一个为 global_table , 一个为 branch_table , 依次来说 :
Pro 1 : global_table 的作用
global_table 用于持久化全局事务 , 可以通过 store.db.global.table 进行配置
Pro 2 : branch_table 的作用
branch_table 用于标识分支事务 , 可以通过 store.db.branch.table 进行配置
# C- LogStoreDataBaseDAO # insertGlobalTransactionDO : 插入 global_table
INSERT INTO `seata`.`global_table`
( `xid`,
`transaction_id`,
`status`,
`application_id`,
`transaction_service_group`,
`transaction_name`,
`timeout`,
`begin_time`,
`application_data`,
`gmt_create`,
`gmt_modified` )
VALUES
( 192.168.181.2:8091:8466916507467911205,
8466916507467911205,
1,
business-seata-example,
business-service-seata-service-group,
dubbo-gts-seata-example,
300000,
1624863673423,
NULL,
2021-06-28 15:01:28,
2021-06-28 15:01:28 );
# C- LogStoreDataBaseDAO # insertBranchTransactionDO
INSERT INTO `seata`.`branch_table`
(`branch_id`,
`xid`,
`transaction_id`,
`resource_group_id`,
`resource_id`,
`branch_type`,
`status`,
`client_id`,
`application_data`,
`gmt_create`,
`gmt_modified`)
VALUES
(8466916507467911829,
192.168.181.2:8091:8466916507467911205,
8466916507467911205,
NULL,
jdbc:mysql://127.0.0.1:3306/seata,
AT,
0,
storage-seata-example:192.168.181.2:51964,
NULL,
2021-06-28 15:35:18.534107,
2021-06-28 15:35:18.534107);
3.1 global_table 的处理流程配置 STORE_MODE 为 db 后 , 会使用 DataBaseSessionManager 和 DataBaseTransactionStoreManager 进行业务的处理
// 创建的调用入口 (此处忽略前置逻辑 , 但从 Session 的创建开始)
C- AbstractSessionManager # onBegin
C- DataBaseSessionManager # addGlobalSession
C- DataBaseTransactionStoreManager # writeSession (此处类型为 GLOBAL_ADD((byte)1))
从 Step 1 中可以看到 , 添加时会调用 writeSession , 这是个很重要的方法 , 基本上所有的编辑session 操作都会经历该类 , 可以通过 Debug 该部分
/**
* DataBaseTransactionStoreManager 该方法中进行了全局事务和分支事务的管理
**/
public boolean writeSession(LogOperation logOperation, SessionStorable session)
if (LogOperation.GLOBAL_ADD.equals(logOperation))
// 插入全局事务
return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
else if (LogOperation.GLOBAL_UPDATE.equals(logOperation))
// 更新全局事务
return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
else if (LogOperation.GLOBAL_REMOVE.equals(logOperation))
// 删除全局事务
return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
else if (LogOperation.BRANCH_ADD.equals(logOperation))
// 插入分支事务
return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
else if (LogOperation.BRANCH_UPDATE.equals(logOperation))
// 更新分支事务
return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
else if (LogOperation.BRANCH_REMOVE.equals(logOperation))
// 删除分支事务
return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
else
throw new StoreException("Unknown LogOperation:" + logOperation.name());
[Pro31001] : logOperation 的作用和来源
LogOperation 作用 :
LogOperation 是一个枚举类 , 用于表示操作的类型
enum LogOperation
GLOBAL_ADD((byte)1),
GLOBAL_UPDATE((byte)2),
GLOBAL_REMOVE((byte)3),BRANCH_ADD((byte)4),
BRANCH_UPDATE((byte)5),
BRANCH_REMOVE((byte)6);
private byte code;
LogOperation 的来源:
在调用该流程的时候 , 会传入对应的 LogOperation.code . 例如 DataBaseSessionManager 操作中
C- DataBaseSessionManager
M- addGlobalSession
- transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
M- updateGlobalSessionStatus
- transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
M- removeGlobalSession
- transactionStoreManager.writeSession(LogOperation.GLOBAL_REMOVE, session)
3.2 branch_table 的处理逻辑
//======== 以下是 Beanch 逻辑
C- DataBaseTransactionStoreManager # writeSession (此处类型为 BRANCH_ADD((byte)4))// 最终调用有以下方法
C- LogStoreDataBaseDAO
M- insertBranchTransactionDO
M- updateBranchTransactionDO
M- deleteBranchTransactionDO
四 . Session 的初始化流程 4.1 Session 的初始化
// 在 server # main 启动方法中 , 会调用以下语句
SessionHolder.init(parameterParser.getStoreMode());
C- SessionHolder # init 中进行了如下操作 :
- 获得配置的 store.mode , 从下面的代码可以看到支持 DB , FILE , REDIS
- 通过 EnhancedServiceLoader#load 加载 SessionManager
public static void init(String mode) // 获取配置文件中的 store.mode 属性
if (StringUtils.isBlank(mode))
mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
// 构建 StoreMode
StoreMode storeMode = StoreMode.get(mode);
if (StoreMode.DB.equals(storeMode))
// 基础会话管理器
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
// 异步会话管理器
ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[] ASYNC_COMMITTING_SESSION_MANAGER_NAME);
// 重试提交会话管理器
RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[] RETRY_COMMITTING_SESSION_MANAGER_NAME);
// 重试回退会话管理器
RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
new Object[] RETRY_ROLLBACKING_SESSION_MANAGER_NAME);
else if (StoreMode.FILE.equals(storeMode))
//..... 省略
else if (StoreMode.REDIS.equals(storeMode))
//..... 省略
else
// unknown store
throw new IllegalArgumentException("unknown store mode:" + mode);
// 详见最后 , 刷新操作
reload(storeMode);
public static <
S>
S load(Class<
S>
service, String activateName) throws EnhancedServiceNotFoundException
// SPI 核心 : 这里就是一个简单的下层调用 , 这里简单说一下 InnerEnhancedServiceLoader
return InnerEnhancedServiceLoader.getServiceLoader(service).load(activateName, findClassLoader());
// getServiceLoader 获取 ServiceLoader
private static <
S>
InnerEnhancedServiceLoader<
S>
getServiceLoader(Class<
S>
type)
// 主要就是通过 SERVICE_LOADERS 获取整个集合 , 另外可以看到 , 这里是每次调用时为空就会先创建 , 再缓存一遍
return (InnerEnhancedServiceLoader<
S>
)CollectionUtils.computeIfAbsent(SERVICE_LOADERS, type,
key ->
new InnerEnhancedServiceLoader<
>
(type));
[PRO:] InnerEnhancedServiceLoader 的作用 ?
InnerEnhancedServiceLoader 是 EnhancedServiceLoader 的内部类 :// Pro : EnhancedServiceLoader 作用
EnhancedServiceLoader 是 Seata SPI 实现核心类 , Seata 通过 SPI 机制来实现 seata 的扩展 , 使其可以兼容多种注册中心 :EnhancedServiceLoader 中 load 有如下的方式加载一个服务 :
M- load(Class<
S>
service, ClassLoader loader) : 通过服务类型和加载器加载
M- load(Class<
S>
service) : 通过服务类型加载
M- load(Class<
S>
service, String activateName) : 通过服务类型和激活名 (加载 ExtensionDefinition 会使用该名称)
M- load(Class<
S>
service, String activateName, ClassLoader loader)
M- load(Class<
S>
service, String activateName, Object[] args) : 带属性参数 (Instance 创建实例时会使用该参数)
// load 同时提供载入一组服务
M- loadAll(Class<
S>
service)
M- loadAll(Class<
S>
service, Class[] argsType, Object[] args)// Pro :SPI Server 存放的位置
Seata 的Service 类和 Spring factories 基本上一直 , 也是放在 META-INF.service 中 , 其中提供了如下配置->
PIC30001 // Pro :EnhancedServiceLoader 的 子类
EnhancedServiceLoader 中有一个内部类 : C- InnerEnhancedServiceLoader, 主要作用为避免多次载入时出现不必要的载入InnerEnhancedServiceLoader 中提供了如下的参数 :
// class 对应的 InnerEnhancedServiceLoader 集合
ConcurrentMap<
Class<
?>
, InnerEnhancedServiceLoader<
?>
>
SERVICE_LOADERS = new ConcurrentHashMap<
>
();
// Holder 内部有一个 volatile 参数用于保存对象, 保证多线程可见
Holder<
List<
ExtensionDefinition>
>
definitionsHolder = new Holder<
>
();
// ExtensionDefinition 集合
ConcurrentMap<
ExtensionDefinition, Holder<
Object>
>
definitionToInstanceMap = new ConcurrentHashMap<
>
();
// name 对应的 ExtensionDefinition 集合
ConcurrentMap<
String, List<
ExtensionDefinition>
>
nameToDefinitionsMap = new ConcurrentHashMap<
>
();
// ExtensionDefinition class类型 对应的 ExtensionDefinition
ConcurrentMap<
Class<
?>
, ExtensionDefinition>
classToDefinitionMap = new ConcurrentHashMap<
>
();
PIC30001 : META-INF.service 数据
文章图片
该单元是主要的处理流程 , 用于
C- EnhancedServiceLoader
private S loadExtension(String activateName, ClassLoader loader, Class[] argTypes,
Object[] args) // activateName 判空操作 , 为空抛出异常 IllegalArgumentException
try
// 1 . 从配置文件 (META-INF 中加载所有的 Extension 对象)
loadAllExtensionClass(loader);
// 2 . 通过激活名获得 ExtensionDefinition 类数据
ExtensionDefinition cachedExtensionDefinition = getCachedExtensionDefinition(activateName);
// 3 . 获得实例
return getExtensionInstance(cachedExtensionDefinition, loader, argTypes, args);
catch (Throwable e)
// .... 异常处理省略
C- EnhancedServiceLoader
// 1\\. 判断和发起加载
private List<
Class>
loadAllExtensionClass(ClassLoader loader)
List<
ExtensionDefinition>
definitions = definitionsHolder.get();
if (definitions == null)
synchronized (definitionsHolder)
definitions = definitionsHolder.get();
if (definitions == null)
// 加锁后查询所有的 ExtensionDefinition , 避免线程冲突
definitions = findAllExtensionDefinition(loader);
definitionsHolder.set(definitions);
return definitions.stream().map(def ->
def.getServiceClass()).collect(Collectors.toList());
// 2\\. 加载流程
private List<
ExtensionDefinition>
findAllExtensionDefinition(ClassLoader loader) // 从 META-INF.service 和META-INF.seata 中获取配置
List<
ExtensionDefinition>
extensionDefinitions = new ArrayList<
>
();
try
loadFile(SERVICES_DIRECTORY, loader, extensionDefinitions);
loadFile(SEATA_DIRECTORY, loader, extensionDefinitions);
catch (IOException e)
throw new EnhancedServiceNotFoundException(e);
// 加载所有扩展后,按顺序对缓存进行排序 ->
nameToDefinitionsMap
if (!nameToDefinitionsMap.isEmpty())
for (List<
ExtensionDefinition>
definitions : nameToDefinitionsMap.values())
Collections.sort(definitions, (def1, def2) ->
int o1 = def1.getOrder();
int o2 = def2.getOrder();
return Integer.compare(o1, o2);
);
// 对加载的 extensionDefinitions 进行排序
if (!extensionDefinitions.isEmpty())
Collections.sort(extensionDefinitions, (definition1, definition2) ->
int o1 = definition1.getOrder();
int o2 = definition2.getOrder();
return Integer.compare(o1, o2);
);
return extensionDefinitions;
// 比较简单 , 就是获取 ConcurrentMap<
String, List<
ExtensionDefinition>
>
nameToDefinitionsMap
private ExtensionDefinition getCachedExtensionDefinition(String activateName)
List<
ExtensionDefinition>
definitions = nameToDefinitionsMap.get(activateName);
return CollectionUtils.getLast(definitions);
// 发起流程 :
loadExtension ->
getExtensionInstance ->
createNewExtension// getExtensionInstance 逻辑比较简单 , 就是判断是否为单例从而进行了一个单例模式的创建// createNewExtension 创建实例
private S createNewExtension(ExtensionDefinition definition, ClassLoader loader, Class[] argTypes, Object[] args)
Class<
?>
clazz = definition.getServiceClass();
try
S newInstance = initInstance(clazz, argTypes, args);
return newInstance;
catch (Throwable t)
throw new IllegalStateException(....);
// initInstance 初始化实例
private S initInstance(Class implClazz, Class[] argTypes, Object[] args)
throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException
S s = null;
if (argTypes != null &
&
args != null)
// 获取 构造函数的参数
Constructor<
S>
constructor = implClazz.getDeclaredConstructor(argTypes);
// 如果有参数 ,通过参数创建实例
s = type.cast(constructor.newInstance(args));
else
// 使用默认构造器创建 (无参数的情况)
s = type.cast(implClazz.newInstance());
if (s instanceof Initialize)
// 核心 7-1 实例init初始化
((Initialize)s).init();
return s;
其中关于 DataBase 会使用 DataBaseSessionManager 操作 , 这一块看一下整体的体系 :
文章图片
public void init()
// 初始化 DataBaseTransactionStoreManager
transactionStoreManager = DataBaseTransactionStoreManager.getInstance();
// PS : Initialize 的实现类都需要实现 init 方法
public interface Initialize
void init();
C- DataBaseTransactionStoreManager
P- int DEFAULT_LOG_QUERY_LIMIT = 100;
private DataBaseTransactionStoreManager()
logQueryLimit = CONFIG.getInt(ConfigurationKeys.STORE_DB_LOG_QUERY_LIMIT, DEFAULT_LOG_QUERY_LIMIT);
// 获取 Datasource 类型
String datasourceType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE);
// 初始化 dataSource
DataSource logStoreDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide();
// 构建 LogStoreDataBaseDAO
logStore = new LogStoreDataBaseDAO(logStoreDataSource);
// [Pro] : ConfigurationKeys 的参数
String STORE_DB_LOG_QUERY_LIMIT = STORE_DB_PREFIX + "queryLimit";
// [Pro] : DataSourceProvider 的实现类
io.seata.server.store.DbcpDataSourceProvider
io.seata.server.store.DruidDataSourceProvider
io.seata.server.store.HikariDataSourceProvider// 此处构建了一个 DataSource
@LoadLevel(name = "hikari")
public class HikariDataSourceProvider extends AbstractDataSourceProvider @Override
public DataSource generate()
Properties properties = new Properties();
properties.setProperty("dataSource.cachePrepStmts", "true");
properties.setProperty("dataSource.prepStmtCacheSize", "250");
properties.setProperty("dataSource.prepStmtCacheSqlLimit", "2048");
properties.setProperty("dataSource.useServerPrepStmts", "true");
properties.setProperty("dataSource.useLocalSessionState", "true");
properties.setProperty("dataSource.rewriteBatchedStatements", "true");
properties.setProperty("dataSource.cacheResultSetMetadata", "true");
properties.setProperty("dataSource.cacheServerConfiguration", "true");
properties.setProperty("dataSource.elideSetAutoCommits", "true");
properties.setProperty("dataSource.maintainTimeStats", "false");
HikariConfig config = new HikariConfig(properties);
config.setDriverClassName(getDriverClassName());
config.setJdbcUrl(getUrl());
config.setUsername(getUser());
config.setPassword(getPassword());
config.setMaximumPoolSize(getMaxConn());
config.setMinimumIdle(getMinConn());
config.setAutoCommit(true);
config.setConnectionTimeout(getMaxWait());
config.setInitializationFailTimeout(-1);
return new HikariDataSource(config);
在执行完上述逻辑后还没完全 , 注意 Step 2 中最后还有个 Reload 操作 ,该操作会继续处理 DataBaseSessionManager
c- SessionHolder // 这里会议一下之前的属性
private static SessionManager ROOT_SESSION_MANAGER;
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;
protected static void reload(StoreMode storeMode)
if (ROOT_SESSION_MANAGER instanceof Reloadable)
((Reloadable) ROOT_SESSION_MANAGER).reload();
//
Collection<
GlobalSession>
allSessions = ROOT_SESSION_MANAGER.allSessions();
if (CollectionUtils.isNotEmpty(allSessions))
List<
GlobalSession>
removeGlobalSessions = new ArrayList<
>
();
Iterator<
GlobalSession>
iterator = allSessions.iterator();
while (iterator.hasNext())
GlobalSession globalSession = iterator.next();
GlobalStatus globalStatus = globalSession.getStatus();
// 通过属性来判断处理的方式
switch (globalStatus)
case UnKnown:
case Committed:
case CommitFailed:
case Rollbacked:
case RollbackFailed:
case TimeoutRollbacked:
case TimeoutRollbackFailed:
case Finished:
removeGlobalSessions.add(globalSession);
break;
case AsyncCommitting:
if (storeMode == StoreMode.FILE)
queueToAsyncCommitting(globalSession);
break;
default:
// TODO : 此处的原理在后面说 Lock 逻辑的时候统一说
if (storeMode == StoreMode.FILE)
lockBranchSessions(globalSession.getSortedBranches());
// 如果上述都没有 , 需要先处理分支事务
switch (globalStatus)
case Committing:
case CommitRetrying:
queueToRetryCommit(globalSession);
break;
case Rollbacking:
case RollbackRetrying:
case TimeoutRollbacking:
case TimeoutRollbackRetrying:
queueToRetryRollback(globalSession);
break;
case Begin:
globalSession.setActive(true);
break;
default:
throw new ShouldNeverHappenException("NOT properly handled " + globalStatus);
break;
for (GlobalSession globalSession : removeGlobalSessions)
removeInErrorState(globalSession);
C- 以 Database 为例 ,allSessions 又如下操作
String ROOT_SESSION_MANAGER_NAME = "root.data";
String ASYNC_COMMITTING_SESSION_MANAGER_NAME = "async.commit.data";
String RETRY_COMMITTING_SESSION_MANAGER_NAME = "retry.commit.data";
String RETRY_ROLLBACKING_SESSION_MANAGER_NAME = "retry.rollback.data";
public Collection<
GlobalSession>
allSessions()
// get by taskName
if (SessionHolder.ASYNC_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName))
return findGlobalSessions(new SessionCondition(GlobalStatus.AsyncCommitting));
else if (SessionHolder.RETRY_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName))
return findGlobalSessions(new SessionCondition(new GlobalStatus[] GlobalStatus.CommitRetrying, GlobalStatus.Committing));
else if (SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName))
return findGlobalSessions(new SessionCondition(new GlobalStatus[] GlobalStatus.RollbackRetrying,
GlobalStatus.Rollbacking, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying));
else
// all data
return findGlobalSessions(new SessionCondition(new GlobalStatus[]
GlobalStatus.UnKnown, GlobalStatus.Begin,
GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking,
GlobalStatus.RollbackRetrying,
GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.AsyncCommitting));
[Pro] : Reloadable 对象体系
public interface Reloadable
void reload();
// 这里的实现类主要为 FileSessionManager
五 . 扩展知识点 5.1 LoadLevel 的作用@LoadLevel(name = " file" , scope = Scope.PROTOTYPE)
LoadLevel 注解提供了三个参数 :@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE, ElementType.METHOD)
public @interface LoadLevel String name();
// 在类似链式调用的过程中 , 可以对 Provider 进行排序
int order() default 0;
Scope scope() default Scope.SINGLETON;
// 作用域范围
public enum Scope SINGLETON,
PROTOTYPE
总结LoadLevel 和 MATA-INF 真正的作用是用于扩展不同的数据库 , 后续等 seata 梳理完成后 , 再来看一下如何进行定制.
【盘点 Seata : Server 端事务的 Session 如何处理()】自此 Session 的处理类初始化完成 , 后面来看一下 Session 在调用过程中的处理和数据库处理
推荐阅读
- CentOS之——设置系统时间与网络时间同步
- Kali之——系统网络设置
- Kali之——关闭防火墙
- Kali之——安装内核集成的无线驱动补丁
- Kali之——更新软件源
- 为心爱的人做一个超具创意的表白网页吧?(告白气球)HTML+CSS+JavaScript
- Kali之——所有工具的安装
- Kali之——设置静态IP
- Linux内核虚拟文件系统