Hello Mr.J——路由解析

上次分析了一个SQL语句,传到MyCat这里,对他做的一系列的判断从而处理一些不需要执行到数据库的操作。这次我们在分析完SQL语句之后,开始执行SQL语句,并且需要根据我们设置的分片规则,取出所有的数据,拼接成完整的结果。
首先,在执行SQL语句之前,进行了一系列的检查工作,数据库的检查,schema中的虚拟库(原来的注释将这个叫Schema)的检查,兼容一些第三方的工具,并且支持sql语句中换一个数据库查询。

public void execute(String sql, int type) { //连接状态检查 if (this.isClosed()) { LOGGER.warn("ignore execute ,server connection is closed " + this); return; } // 事务状态检查 if (txInterrupted) { writeErrMessage(ErrorCode.ER_YES, "Transaction error, need to rollback." + txInterrputMsg); return; }// 检查当前使用的DB String db = this.schema; boolean isDefault = true; if (db == null) { db = SchemaUtil.detectDefaultDb(sql, type); if (db == null) { writeErrMessage(ErrorCode.ERR_BAD_LOGICDB, "No MyCAT Database selected"); return; } isDefault = false; }// 兼容PhpAdmin's, 支持对MySQL元数据的模拟返回 // 处理对information_schema的查询语句 if (ServerParse.SELECT == type && db.equalsIgnoreCase("information_schema") ) { MysqlInformationSchemaHandler.handle(sql, this); return; }// 兼容MySQLWorkbench if (ServerParse.SELECT == type && sql.contains("mysql") && sql.contains("proc")) {SchemaUtil.SchemaInfo schemaInfo = SchemaUtil.parseSchema(sql); if (schemaInfo != null && "mysql".equalsIgnoreCase(schemaInfo.schema) && "proc".equalsIgnoreCase(schemaInfo.table)) {MysqlProcHandler.handle(sql, this); return; } }//获取Schema.xml中配置的DB SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db); if (schema == null) { writeErrMessage(ErrorCode.ERR_BAD_LOGICDB, "Unknown MyCAT Database '" + db + "'"); return; }//修复navicat中SELECT STATE AS `State`, ROUND(SUM(DURATION),7) AS `Duration`, CONCAT(ROUND(SUM(DURATION)/*100,3), '%') AS `Percentage` FROM INFORMATION_SCHEMA.PROFILING WHERE QUERY_ID= GROUP BY STATE ORDER BY SEQ if(ServerParse.SELECT == type &&sql.contains(" INFORMATION_SCHEMA.PROFILING ")&&sql.contains("CONCAT(ROUND(SUM(DURATION)/")) { InformationSchemaProfiling.response(this); return; }//改写schema.xml中配置的虚拟DB,使用sql语句中的虚拟DB if (isDefault && schema.isCheckSQLSchema() && isNormalSql(type)) { SchemaUtil.SchemaInfo schemaInfo = SchemaUtil.parseSchema(sql); if (schemaInfo != null && schemaInfo.schema != null && !schemaInfo.schema.equals(db)) { SchemaConfig schemaConfig = MycatServer.getInstance().getConfig().getSchemas().get(schemaInfo.schema); if (schemaConfig != null) schema = schemaConfig; } }//路由执行SQL routeEndExecuteSQL(sql, type, schema); }

之后调用的一开始初始化就有的RouteService,使用Route方法进行查询。

public void routeEndExecuteSQL(String sql, int type, SchemaConfig schema) { // 路由计算 RouteResultset rrs = null; try { rrs = MycatServer .getInstance() .getRouterservice() .route(MycatServer.getInstance().getConfig().getSystem(), schema, type, sql, this.charset, this); } catch (Exception e) { StringBuilder s = new StringBuilder(); LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e); String msg = e.getMessage(); writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg); return; } if (rrs != null) { // session执行 session.execute(rrs, rrs.isSelectForUpdate()?ServerParse.UPDATE:type); } }

在route方法中,主要是加了一个缓存的查询,和注解的处理。

public RouteResultset route(SystemConfig sysconf, SchemaConfig schema, int sqlType, String stmt, String charset, ServerConnection sc) throws SQLNonTransientException { RouteResultset rrs = null; String cacheKey = null; /** *SELECT 类型的SQL, 检测 */ if (sqlType == ServerParse.SELECT) { //先获取缓存中的查询结果 cacheKey = schema.getName() + stmt; rrs = (RouteResultset) sqlRouteCache.get(cacheKey); if (rrs != null) { checkMigrateRule(schema.getName(), rrs, sqlType); return rrs; } } /*!mycat: sql = select name from aa */ /*!mycat: schema = test */ //判断SQL语句中是不是有mycat注解 int hintLength = RouteService.isHintSql(stmt); if (hintLength != -1) { int endPos = stmt.indexOf("*/"); if (endPos > 0) { // 用!mycat:内部的语句来做路由分析 String hint = stmt.substring(hintLength, endPos).trim(); int firstSplitPos = hint.indexOf(HINT_SPLIT); if (firstSplitPos > 0) { Map hintMap = parseHint(hint); String hintType = (String) hintMap.get(MYCAT_HINT_TYPE); String hintSql = (String) hintMap.get(hintType); if (hintSql.length() == 0) { LOGGER.warn("comment int sql must meet :/*!mycat:type=value*/ or /*#mycat:type=value*/ or /*mycat:type=value*/: " + stmt); throw new SQLSyntaxErrorException("comment int sql must meet :/*!mycat:type=value*/ or /*#mycat:type=value*/ or /*mycat:type=value*/: " + stmt); } String realSQL = stmt.substring(endPos + "*/".length()).trim(); //处理注解 HintHandler hintHandler = HintHandlerFactory.getHintHandler(hintType); if (hintHandler != null) {if (hintHandler instanceof HintSQLHandler) { /** * 修复 注解SQL的 sqlType 与 实际SQL的 sqlType 不一致问题, 如: hint=SELECT,real=INSERT * fixed by zhuam */ int hintSqlType = ServerParse.parse(hintSql) & 0xff; rrs = hintHandler.route(sysconf, schema, sqlType, realSQL, charset, sc, tableId2DataNodeCache, hintSql, hintSqlType, hintMap); } else { rrs = hintHandler.route(sysconf, schema, sqlType, realSQL, charset, sc, tableId2DataNodeCache, hintSql, sqlType, hintMap); }} else { LOGGER.warn("TODO , support hint sql type : " + hintType); }} else {//fixed by runfriends@126.com LOGGER.warn("comment in sql must meet :/*!mycat:type=value*/ or /*#mycat:type=value*/ or /*mycat:type=value*/: " + stmt); throw new SQLSyntaxErrorException("comment in sql must meet :/*!mcat:type=value*/ or /*#mycat:type=value*/ or /*mycat:type=value*/: " + stmt); } } } else { stmt = stmt.trim(); //非注解语句执行查询 rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt, charset, sc, tableId2DataNodeCache); }if (rrs != null && sqlType == ServerParse.SELECT && rrs.isCacheAble()) { sqlRouteCache.putIfAbsent(cacheKey, rrs); } //数据迁移的切换准备阶段,需要拒绝写操作和所有的跨多节点写操作 checkMigrateRule(schema.getName(), rrs, sqlType); return rrs; }

【Hello Mr.J——路由解析】最后会根据是否有注解,调用不同的route的方法,这里就不管他这个注解了,毕竟还没用过。。
来看看普通SQL语句的route方法。
路由执行之前,会进行一些处理操作,比如把Insert语句生成全局自增ID,之后会进行对SQL语句的拦截改写,虽然并没看懂这里是做什么用的。。然后会处理DDL语句,就是CREATE,ALTER这种对表进行操作的语句。最后才会到普通的语句路由处理。

public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL, String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {//对应schema标签checkSQLschema属性,把表示schema的字符去掉 if (schema.isCheckSQLSchema()) { origSQL = RouterUtil.removeSchema(origSQL, schema.getName()); }/** * 处理一些路由之前的逻辑 * 全局序列号,父子表插入 */ if (beforeRouteProcess(schema, sqlType, origSQL, sc)) { return null; }/** * SQL 语句拦截 */ String stmt = MycatServer.getInstance().getSqlInterceptor().interceptSQL(origSQL, sqlType); if (!origSQL.equals(stmt) && LOGGER.isDebugEnabled()) { LOGGER.debug("sql intercepted to " + stmt + " from " + origSQL); }RouteResultset rrs = new RouteResultset(stmt, sqlType); /** * 优化debug loaddata输出cache的日志会极大降低性能 */ if (LOGGER.isDebugEnabled() && origSQL.startsWith(LoadData.loadDataHint)) { rrs.setCacheAble(false); }/** * rrs携带ServerConnection的autocommit状态用于在sql解析的时候遇到 * select ... for update的时候动态设定RouteResultsetNode的canRunInReadDB属性 */ if (sc != null) { rrs.setAutocommit(sc.isAutocommit()); }/** * DDL 语句的路由 */ if (ServerParse.DDL == sqlType) { return RouterUtil.routeToDDLNode(rrs, sqlType, stmt, schema); }/** * 检查是否有分片 */ if (schema.isNoSharding() && ServerParse.SHOW != sqlType) { rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt); } else { RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs); if (returnedSet == null) { rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool); } }return rrs; }

这里我主要看一下Select的分片处理。

private RouteResultset analyseDoubleAtSgin(SchemaConfig schema, RouteResultset rrs, String stmt) throws SQLSyntaxErrorException { String upStmt = stmt.toUpperCase(); int atSginInd = upStmt.indexOf(" @@"); if (atSginInd > 0) { return RouterUtil.routeToMultiNode(false, rrs, schema.getMetaDataNodes(), stmt); } return RouterUtil.routeToSingleNode(rrs, schema.getRandomDataNode(), stmt); }

这里判断了一下是否有@@这个标志,有的话就是多个分片,没有就走单个分片。没看到在哪里加上的,我猜测是在之前route方法中的拦截,加入到原来的Select语句中。
之后的方法,加载了配置文件中的datanode并且根据配置,返回了路由信息的实体。

public static RouteResultset routeToMultiNode(boolean cache, RouteResultset rrs, Collection dataNodes, String stmt) { RouteResultsetNode[] nodes = new RouteResultsetNode[dataNodes.size()]; int i = 0; RouteResultsetNode node; for (String dataNode : dataNodes) { node = new RouteResultsetNode(dataNode, rrs.getSqlType(), stmt); node.setSource(rrs); if (rrs.getDataNodeSlotMap().containsKey(dataNode)) { node.setSlot(rrs.getDataNodeSlotMap().get(dataNode)); } if (rrs.getCanRunInReadDB() != null) { node.setCanRunInReadDB(rrs.getCanRunInReadDB()); } if (rrs.getRunOnSlave() != null) { nodes[0].setRunOnSlave(rrs.getRunOnSlave()); } nodes[i++] = node; } rrs.setCacheAble(cache); rrs.setNodes(nodes); return rrs; }


一直到这里,都是分析SQL语句进行处理,并且读取路由的信息,并没有执行查询。路由的信息返回之后,会在session.excute方法中进行查询,就是第二段代码的最后一句。
哎呦,越来越看不懂了。。。

    推荐阅读