盘点 Seata : Server 端事务的 Session 如何处理()

五陵年少金市东,银鞍白马渡春风。这篇文章主要讲述盘点 Seata : Server 端事务的 Session 如何处理?相关的知识,希望能为你提供帮助。
一 .前言之前提及过 Seata Client 的请求流程 , 这一篇从 Session 的处理来看一下 Seata Server 端的处理 .
每一次 Seata 的全局操作都会创建一个 Session , 并且往表中插入事务数据.
二 . global_table 表先来看一下 global_table 的表结构

CREATE TABLE `global_table` ( `xid` varchar(128) NOT NULL, `transaction_id` bigint(20) DEFAULT NULL, `status` tinyint(4) NOT NULL, `application_id` varchar(32) DEFAULT NULL, `transaction_service_group` varchar(32) DEFAULT NULL, `transaction_name` varchar(128) DEFAULT NULL, `timeout` int(11) DEFAULT NULL, `begin_time` bigint(20) DEFAULT NULL, `application_data` varchar(2000) DEFAULT NULL, `gmt_create` datetime DEFAULT NULL, `gmt_modified` datetime DEFAULT NULL, PRIMARY KEY (`xid`), KEY `idx_gmt_modified_status` (`gmt_modified`,`status`), KEY `idx_transaction_id` (`transaction_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE `branch_table` ( `branch_id` bigint(20) NOT NULL, `xid` varchar(128) NOT NULL, `transaction_id` bigint(20) DEFAULT NULL, `resource_group_id` varchar(32) DEFAULT NULL, `resource_id` varchar(256) DEFAULT NULL, `branch_type` varchar(8) DEFAULT NULL, `status` tinyint(4) DEFAULT NULL, `client_id` varchar(64) DEFAULT NULL, `application_data` varchar(2000) DEFAULT NULL, `gmt_create` datetime(6) DEFAULT NULL, `gmt_modified` datetime(6) DEFAULT NULL, PRIMARY KEY (`branch_id`), KEY `idx_xid` (`xid`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

三 . Server Session 处理一览我们通过启动参数 -m 对请求 STORE_MODE 进行配置 : seata-server.bat -m db
整个 Session 的处理会分别对2个操作进行处理 , 一个为 global_table , 一个为 branch_table , 依次来说 :
Pro 1 : global_table 的作用
global_table 用于持久化全局事务 , 可以通过 store.db.global.table 进行配置
Pro 2 : branch_table 的作用
branch_table 用于标识分支事务 , 可以通过 store.db.branch.table 进行配置
# C- LogStoreDataBaseDAO # insertGlobalTransactionDO : 插入 global_table INSERT INTO `seata`.`global_table` ( `xid`, `transaction_id`, `status`, `application_id`, `transaction_service_group`, `transaction_name`, `timeout`, `begin_time`, `application_data`, `gmt_create`, `gmt_modified` ) VALUES ( 192.168.181.2:8091:8466916507467911205, 8466916507467911205, 1, business-seata-example, business-service-seata-service-group, dubbo-gts-seata-example, 300000, 1624863673423, NULL, 2021-06-28 15:01:28, 2021-06-28 15:01:28 ); # C- LogStoreDataBaseDAO # insertBranchTransactionDO INSERT INTO `seata`.`branch_table` (`branch_id`, `xid`, `transaction_id`, `resource_group_id`, `resource_id`, `branch_type`, `status`, `client_id`, `application_data`, `gmt_create`, `gmt_modified`) VALUES (8466916507467911829, 192.168.181.2:8091:8466916507467911205, 8466916507467911205, NULL, jdbc:mysql://127.0.0.1:3306/seata, AT, 0, storage-seata-example:192.168.181.2:51964, NULL, 2021-06-28 15:35:18.534107, 2021-06-28 15:35:18.534107);

3.1 global_table 的处理流程配置 STORE_MODE 为 db 后 , 会使用 DataBaseSessionManager 和 DataBaseTransactionStoreManager 进行业务的处理
// 创建的调用入口 (此处忽略前置逻辑 , 但从 Session 的创建开始) C- AbstractSessionManager # onBegin C- DataBaseSessionManager # addGlobalSession C- DataBaseTransactionStoreManager # writeSession (此处类型为 GLOBAL_ADD((byte)1))

从 Step 1 中可以看到 , 添加时会调用 writeSession , 这是个很重要的方法 , 基本上所有的编辑session 操作都会经历该类 , 可以通过 Debug 该部分
/** * DataBaseTransactionStoreManager 该方法中进行了全局事务和分支事务的管理 **/ public boolean writeSession(LogOperation logOperation, SessionStorable session) if (LogOperation.GLOBAL_ADD.equals(logOperation)) // 插入全局事务 return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) // 更新全局事务 return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) // 删除全局事务 return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); else if (LogOperation.BRANCH_ADD.equals(logOperation)) // 插入分支事务 return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) // 更新分支事务 return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) // 删除分支事务 return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); else throw new StoreException("Unknown LogOperation:" + logOperation.name());

[Pro31001] : logOperation 的作用和来源
LogOperation 作用 :
LogOperation 是一个枚举类 , 用于表示操作的类型
enum LogOperation GLOBAL_ADD((byte)1), GLOBAL_UPDATE((byte)2), GLOBAL_REMOVE((byte)3),BRANCH_ADD((byte)4), BRANCH_UPDATE((byte)5), BRANCH_REMOVE((byte)6); private byte code;

LogOperation 的来源:
在调用该流程的时候 , 会传入对应的 LogOperation.code . 例如 DataBaseSessionManager 操作中
C- DataBaseSessionManager M- addGlobalSession - transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session); M- updateGlobalSessionStatus - transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session); M- removeGlobalSession - transactionStoreManager.writeSession(LogOperation.GLOBAL_REMOVE, session)

3.2 branch_table 的处理逻辑
//======== 以下是 Beanch 逻辑 C- DataBaseTransactionStoreManager # writeSession (此处类型为 BRANCH_ADD((byte)4))// 最终调用有以下方法 C- LogStoreDataBaseDAO M- insertBranchTransactionDO M- updateBranchTransactionDO M- deleteBranchTransactionDO

四 . Session 的初始化流程 4.1 Session 的初始化
// 在 server # main 启动方法中 , 会调用以下语句 SessionHolder.init(parameterParser.getStoreMode());

C- SessionHolder # init 中进行了如下操作 :
  • 获得配置的 store.mode , 从下面的代码可以看到支持 DB , FILE , REDIS
  • 通过 EnhancedServiceLoader#load 加载 SessionManager
public static void init(String mode) // 获取配置文件中的 store.mode 属性 if (StringUtils.isBlank(mode)) mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE); // 构建 StoreMode StoreMode storeMode = StoreMode.get(mode); if (StoreMode.DB.equals(storeMode)) // 基础会话管理器 ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName()); // 异步会话管理器 ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(), new Object[] ASYNC_COMMITTING_SESSION_MANAGER_NAME); // 重试提交会话管理器 RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(), new Object[] RETRY_COMMITTING_SESSION_MANAGER_NAME); // 重试回退会话管理器 RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(), new Object[] RETRY_ROLLBACKING_SESSION_MANAGER_NAME); else if (StoreMode.FILE.equals(storeMode)) //..... 省略 else if (StoreMode.REDIS.equals(storeMode)) //..... 省略 else // unknown store throw new IllegalArgumentException("unknown store mode:" + mode); // 详见最后 , 刷新操作 reload(storeMode);

public static < S> S load(Class< S> service, String activateName) throws EnhancedServiceNotFoundException // SPI 核心 : 这里就是一个简单的下层调用 , 这里简单说一下 InnerEnhancedServiceLoader return InnerEnhancedServiceLoader.getServiceLoader(service).load(activateName, findClassLoader()); // getServiceLoader 获取 ServiceLoader private static < S> InnerEnhancedServiceLoader< S> getServiceLoader(Class< S> type) // 主要就是通过 SERVICE_LOADERS 获取整个集合 , 另外可以看到 , 这里是每次调用时为空就会先创建 , 再缓存一遍 return (InnerEnhancedServiceLoader< S> )CollectionUtils.computeIfAbsent(SERVICE_LOADERS, type, key -> new InnerEnhancedServiceLoader< > (type));

[PRO:] InnerEnhancedServiceLoader 的作用 ?
InnerEnhancedServiceLoader 是 EnhancedServiceLoader 的内部类 :// Pro : EnhancedServiceLoader 作用 EnhancedServiceLoader 是 Seata SPI 实现核心类 , Seata 通过 SPI 机制来实现 seata 的扩展 , 使其可以兼容多种注册中心 :EnhancedServiceLoader 中 load 有如下的方式加载一个服务 : M- load(Class< S> service, ClassLoader loader) : 通过服务类型和加载器加载 M- load(Class< S> service) : 通过服务类型加载 M- load(Class< S> service, String activateName) : 通过服务类型和激活名 (加载 ExtensionDefinition 会使用该名称) M- load(Class< S> service, String activateName, ClassLoader loader) M- load(Class< S> service, String activateName, Object[] args) : 带属性参数 (Instance 创建实例时会使用该参数) // load 同时提供载入一组服务 M- loadAll(Class< S> service) M- loadAll(Class< S> service, Class[] argsType, Object[] args)// Pro :SPI Server 存放的位置 Seata 的Service 类和 Spring factories 基本上一直 , 也是放在 META-INF.service 中 , 其中提供了如下配置-> PIC30001 // Pro :EnhancedServiceLoader 的 子类 EnhancedServiceLoader 中有一个内部类 : C- InnerEnhancedServiceLoader, 主要作用为避免多次载入时出现不必要的载入InnerEnhancedServiceLoader 中提供了如下的参数 : // class 对应的 InnerEnhancedServiceLoader 集合 ConcurrentMap< Class< ?> , InnerEnhancedServiceLoader< ?> > SERVICE_LOADERS = new ConcurrentHashMap< > (); // Holder 内部有一个 volatile 参数用于保存对象, 保证多线程可见 Holder< List< ExtensionDefinition> > definitionsHolder = new Holder< > (); // ExtensionDefinition 集合 ConcurrentMap< ExtensionDefinition, Holder< Object> > definitionToInstanceMap = new ConcurrentHashMap< > (); // name 对应的 ExtensionDefinition 集合 ConcurrentMap< String, List< ExtensionDefinition> > nameToDefinitionsMap = new ConcurrentHashMap< > (); // ExtensionDefinition class类型 对应的 ExtensionDefinition ConcurrentMap< Class< ?> , ExtensionDefinition> classToDefinitionMap = new ConcurrentHashMap< > ();

PIC30001 : META-INF.service 数据
盘点 Seata : Server 端事务的 Session 如何处理()

文章图片

该单元是主要的处理流程 , 用于
C- EnhancedServiceLoader private S loadExtension(String activateName, ClassLoader loader, Class[] argTypes, Object[] args) // activateName 判空操作 , 为空抛出异常 IllegalArgumentException try // 1 . 从配置文件 (META-INF 中加载所有的 Extension 对象) loadAllExtensionClass(loader); // 2 . 通过激活名获得 ExtensionDefinition 类数据 ExtensionDefinition cachedExtensionDefinition = getCachedExtensionDefinition(activateName); // 3 . 获得实例 return getExtensionInstance(cachedExtensionDefinition, loader, argTypes, args); catch (Throwable e) // .... 异常处理省略

C- EnhancedServiceLoader // 1\\. 判断和发起加载 private List< Class> loadAllExtensionClass(ClassLoader loader) List< ExtensionDefinition> definitions = definitionsHolder.get(); if (definitions == null) synchronized (definitionsHolder) definitions = definitionsHolder.get(); if (definitions == null) // 加锁后查询所有的 ExtensionDefinition , 避免线程冲突 definitions = findAllExtensionDefinition(loader); definitionsHolder.set(definitions); return definitions.stream().map(def -> def.getServiceClass()).collect(Collectors.toList()); // 2\\. 加载流程 private List< ExtensionDefinition> findAllExtensionDefinition(ClassLoader loader) // 从 META-INF.service 和META-INF.seata 中获取配置 List< ExtensionDefinition> extensionDefinitions = new ArrayList< > (); try loadFile(SERVICES_DIRECTORY, loader, extensionDefinitions); loadFile(SEATA_DIRECTORY, loader, extensionDefinitions); catch (IOException e) throw new EnhancedServiceNotFoundException(e); // 加载所有扩展后,按顺序对缓存进行排序 -> nameToDefinitionsMap if (!nameToDefinitionsMap.isEmpty()) for (List< ExtensionDefinition> definitions : nameToDefinitionsMap.values()) Collections.sort(definitions, (def1, def2) -> int o1 = def1.getOrder(); int o2 = def2.getOrder(); return Integer.compare(o1, o2); ); // 对加载的 extensionDefinitions 进行排序 if (!extensionDefinitions.isEmpty()) Collections.sort(extensionDefinitions, (definition1, definition2) -> int o1 = definition1.getOrder(); int o2 = definition2.getOrder(); return Integer.compare(o1, o2); ); return extensionDefinitions;

// 比较简单 , 就是获取 ConcurrentMap< String, List< ExtensionDefinition> > nameToDefinitionsMap private ExtensionDefinition getCachedExtensionDefinition(String activateName) List< ExtensionDefinition> definitions = nameToDefinitionsMap.get(activateName); return CollectionUtils.getLast(definitions);

// 发起流程 : loadExtension -> getExtensionInstance -> createNewExtension// getExtensionInstance 逻辑比较简单 , 就是判断是否为单例从而进行了一个单例模式的创建// createNewExtension 创建实例 private S createNewExtension(ExtensionDefinition definition, ClassLoader loader, Class[] argTypes, Object[] args) Class< ?> clazz = definition.getServiceClass(); try S newInstance = initInstance(clazz, argTypes, args); return newInstance; catch (Throwable t) throw new IllegalStateException(....); // initInstance 初始化实例 private S initInstance(Class implClazz, Class[] argTypes, Object[] args) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException S s = null; if (argTypes != null & & args != null) // 获取 构造函数的参数 Constructor< S> constructor = implClazz.getDeclaredConstructor(argTypes); // 如果有参数 ,通过参数创建实例 s = type.cast(constructor.newInstance(args)); else // 使用默认构造器创建 (无参数的情况) s = type.cast(implClazz.newInstance()); if (s instanceof Initialize) // 核心 7-1 实例init初始化 ((Initialize)s).init(); return s;

其中关于 DataBase 会使用 DataBaseSessionManager 操作 , 这一块看一下整体的体系 :
盘点 Seata : Server 端事务的 Session 如何处理()

文章图片

public void init() // 初始化 DataBaseTransactionStoreManager transactionStoreManager = DataBaseTransactionStoreManager.getInstance(); // PS : Initialize 的实现类都需要实现 init 方法 public interface Initialize void init();

C- DataBaseTransactionStoreManager P- int DEFAULT_LOG_QUERY_LIMIT = 100; private DataBaseTransactionStoreManager() logQueryLimit = CONFIG.getInt(ConfigurationKeys.STORE_DB_LOG_QUERY_LIMIT, DEFAULT_LOG_QUERY_LIMIT); // 获取 Datasource 类型 String datasourceType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE); // 初始化 dataSource DataSource logStoreDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide(); // 构建 LogStoreDataBaseDAO logStore = new LogStoreDataBaseDAO(logStoreDataSource); // [Pro] : ConfigurationKeys 的参数 String STORE_DB_LOG_QUERY_LIMIT = STORE_DB_PREFIX + "queryLimit"; // [Pro] : DataSourceProvider 的实现类 io.seata.server.store.DbcpDataSourceProvider io.seata.server.store.DruidDataSourceProvider io.seata.server.store.HikariDataSourceProvider// 此处构建了一个 DataSource @LoadLevel(name = "hikari") public class HikariDataSourceProvider extends AbstractDataSourceProvider @Override public DataSource generate() Properties properties = new Properties(); properties.setProperty("dataSource.cachePrepStmts", "true"); properties.setProperty("dataSource.prepStmtCacheSize", "250"); properties.setProperty("dataSource.prepStmtCacheSqlLimit", "2048"); properties.setProperty("dataSource.useServerPrepStmts", "true"); properties.setProperty("dataSource.useLocalSessionState", "true"); properties.setProperty("dataSource.rewriteBatchedStatements", "true"); properties.setProperty("dataSource.cacheResultSetMetadata", "true"); properties.setProperty("dataSource.cacheServerConfiguration", "true"); properties.setProperty("dataSource.elideSetAutoCommits", "true"); properties.setProperty("dataSource.maintainTimeStats", "false"); HikariConfig config = new HikariConfig(properties); config.setDriverClassName(getDriverClassName()); config.setJdbcUrl(getUrl()); config.setUsername(getUser()); config.setPassword(getPassword()); config.setMaximumPoolSize(getMaxConn()); config.setMinimumIdle(getMinConn()); config.setAutoCommit(true); config.setConnectionTimeout(getMaxWait()); config.setInitializationFailTimeout(-1); return new HikariDataSource(config);

在执行完上述逻辑后还没完全 , 注意 Step 2 中最后还有个 Reload 操作 ,该操作会继续处理 DataBaseSessionManager
c- SessionHolder // 这里会议一下之前的属性 private static SessionManager ROOT_SESSION_MANAGER; private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER; private static SessionManager RETRY_COMMITTING_SESSION_MANAGER; private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER; protected static void reload(StoreMode storeMode) if (ROOT_SESSION_MANAGER instanceof Reloadable) ((Reloadable) ROOT_SESSION_MANAGER).reload(); // Collection< GlobalSession> allSessions = ROOT_SESSION_MANAGER.allSessions(); if (CollectionUtils.isNotEmpty(allSessions)) List< GlobalSession> removeGlobalSessions = new ArrayList< > (); Iterator< GlobalSession> iterator = allSessions.iterator(); while (iterator.hasNext()) GlobalSession globalSession = iterator.next(); GlobalStatus globalStatus = globalSession.getStatus(); // 通过属性来判断处理的方式 switch (globalStatus) case UnKnown: case Committed: case CommitFailed: case Rollbacked: case RollbackFailed: case TimeoutRollbacked: case TimeoutRollbackFailed: case Finished: removeGlobalSessions.add(globalSession); break; case AsyncCommitting: if (storeMode == StoreMode.FILE) queueToAsyncCommitting(globalSession); break; default: // TODO : 此处的原理在后面说 Lock 逻辑的时候统一说 if (storeMode == StoreMode.FILE) lockBranchSessions(globalSession.getSortedBranches()); // 如果上述都没有 , 需要先处理分支事务 switch (globalStatus) case Committing: case CommitRetrying: queueToRetryCommit(globalSession); break; case Rollbacking: case RollbackRetrying: case TimeoutRollbacking: case TimeoutRollbackRetrying: queueToRetryRollback(globalSession); break; case Begin: globalSession.setActive(true); break; default: throw new ShouldNeverHappenException("NOT properly handled " + globalStatus); break; for (GlobalSession globalSession : removeGlobalSessions) removeInErrorState(globalSession); C- 以 Database 为例 ,allSessions 又如下操作 String ROOT_SESSION_MANAGER_NAME = "root.data"; String ASYNC_COMMITTING_SESSION_MANAGER_NAME = "async.commit.data"; String RETRY_COMMITTING_SESSION_MANAGER_NAME = "retry.commit.data"; String RETRY_ROLLBACKING_SESSION_MANAGER_NAME = "retry.rollback.data"; public Collection< GlobalSession> allSessions() // get by taskName if (SessionHolder.ASYNC_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) return findGlobalSessions(new SessionCondition(GlobalStatus.AsyncCommitting)); else if (SessionHolder.RETRY_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) return findGlobalSessions(new SessionCondition(new GlobalStatus[] GlobalStatus.CommitRetrying, GlobalStatus.Committing)); else if (SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) return findGlobalSessions(new SessionCondition(new GlobalStatus[] GlobalStatus.RollbackRetrying, GlobalStatus.Rollbacking, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying)); else // all data return findGlobalSessions(new SessionCondition(new GlobalStatus[] GlobalStatus.UnKnown, GlobalStatus.Begin, GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking, GlobalStatus.RollbackRetrying, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.AsyncCommitting));

[Pro] : Reloadable 对象体系
public interface Reloadable void reload(); // 这里的实现类主要为 FileSessionManager

五 . 扩展知识点 5.1 LoadLevel 的作用@LoadLevel(name = " file" , scope = Scope.PROTOTYPE)
LoadLevel 注解提供了三个参数 :@Documented @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE, ElementType.METHOD) public @interface LoadLevel String name(); // 在类似链式调用的过程中 , 可以对 Provider 进行排序 int order() default 0; Scope scope() default Scope.SINGLETON; // 作用域范围 public enum Scope SINGLETON, PROTOTYPE

总结LoadLevel 和 MATA-INF 真正的作用是用于扩展不同的数据库 , 后续等 seata 梳理完成后 , 再来看一下如何进行定制.
【盘点 Seata : Server 端事务的 Session 如何处理()】自此 Session 的处理类初始化完成 , 后面来看一下 Session 在调用过程中的处理和数据库处理

    推荐阅读