elasticsearch插件分析(2)-elasticsearch-sql

elasticsearch插件分析(2)-elasticsearch-sql
文章图片

嗯,最近实在是闲的有点不知所措了
背景介绍 elasticsearch-sql插件是之前发现的一款可以用sql来代替es本身令人头疼的语法的插件。es的查询一般来说是使用curl去访问它的rest接口,大部分情况下如果我需要查询一些数据我都必须打开head插件然后小心谨慎的编写json查询字符串,时不时的还要判断自己时不时多了少了大括号逗号,其中苦闷可想而知;并且es的查询语法毕竟也没有到像sql一样可以熟练,每次查询的时候还是要去复制以前的模板过来修改。我个人为了工作是有收藏一些常用的查询语句拿来改的,但该插件可以使用sql语句去查询es索引,方便之余便也想探究它的源码。
依赖介绍
  • elasticsearch5.6.10(快速构造集群方法可以参考我之前的文章使用docker-compose构建elasticsearch集群)
  • idea (当然你可以使用别的ide)
  • elasticsearch-sql插件5.6.10.0
过程分析 搭建环境 首先还是一样,访问elasticsearch-sql的github地址,很意外的看到这是在NLPchina账号的仓库下,居然是国产的作品!那么更值得去分析一下了。下方的readme也提示了不同版本之间的对应关系,目前支持的最新版本是6.3.0.不过我最近在测试的是5.6.10版本。所以到本地目录做如下操作
git clone xxxx git tag git check 5.6.10.0

OK,我们成功check到5.6.10版本的源代码。接下来打开IDEA进行Import。导入过程中无脑next就完事了。看文件目录的文件有pom文件所以可以很清晰的确认该项目是由maven管理,剩下的就按照平时管理maven项目的方式进行处理就可以了。
总体分析 【elasticsearch插件分析(2)-elasticsearch-sql】首先可以看下工程的整个大致目录结构
├── BUILDING.md ├── LICENSE ├── README.md ├── doc │└── features.md ├── elasticsearch-sql.iml ├── open-source.pom.xml ├── pom.xml ├── src │├── _site │├── assembly │├── main │├── site-server │└── test └── target ├── classes ├── generated-sources ├── generated-test-sources └── test-classes

对整个工程会有一个大致的了解,然后打开pom文件浏览整个工程的依赖构成。稍微会关注几个依赖,比如es依赖包的版本是否正确。但这个时候看到一个比较出乎我意料的依赖
com.alibaba druid 1.0.15

我个人因为工作的关系和关系型数据库们打的交道不多,但是这个大名鼎鼎的产品我还是知道的
Druid是一个JDBC组件库,包括数据库连接池、SQL Parser等组件。DruidDataSource是最好的数据库连接池
我第一反应是为什么它会存在,这个插件主要是在和es集群互动,实际上不会使用到mysql驱动,并不会使用到JDBC这个组件,为什么pom文件中会有它的出现呢?其实往后看就明白了。
初步尝试 再回看上面的目录树,可以看到src目录下的几个子目录,有几个目录名字都是见名思义,这也是我觉得java圈子中一些规范的好处,约定大于配置。比如assembly目录下一定回事打包配置文件,main目录下有源码的根包,test目录下会有单元测试代码。所以顺利成章的我会去先通过单元测试来了解整个插件的源代码。
├── AggregationTest.java ├── CSVResultsExtractorTests.java ├── DeleteTest.java ├── ExplainTest.java ├── JDBCTests.java ├── JoinTests.java ├── MainTestSuite.java ├── MethodQueryTest.java ├── MultiQueryTests.java ├── MyTest.java ├── QueryTest.java ├── SQLFunctionsTest.java ├── ShowTest.java ├── SourceFieldTest.java ├── SqlParserTests.java ├── TestsConstants.java ├── UtilTests.java └── WktToGeoJsonConverterTests.java

上面是test目录下的文件结构,其中MyTest文件是我加的。
从文件名上可以猜测出对应es各个操作的测试以及一些其他的测试,比如AggregationTest就很容易猜测说它是聚合操作的相关测试,我们初来乍到,找一个最简单的测试,QueryTest.java。
@Test public void searchTypeTest() throws IOException, SqlParseException, SQLFeatureNotSupportedException{ SearchHits response = query(String.format("SELECT * FROM %s/phrase LIMIT 1000", TEST_INDEX)); Assert.assertEquals(4, response.getTotalHits()); }

上面是QueryTest类的第一个测试方法,看样子也很简单,做一次
SELECT * FROM TEST_INDEX LIMIT 1000的查询,结果如果等于4的话单元测试通过
tips:Assert是断言的意思,当然我知道你已经知道。
无脑直接运行,即使我知道我什么配置文件都没配置过。
java.lang.NullPointerException at org.nlpcn.es4sql.QueryTest.query(QueryTest.java:942) at org.nlpcn.es4sql.QueryTest.searchTypeTest(QueryTest.java:48) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) at org.junit.runners.ParentRunner.run(ParentRunner.java:309) at org.junit.runner.JUnitCore.run(JUnitCore.java:160) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

毫不意外地收到报错,但是为什么是空指针异常?我原来的猜测是肯定会跳出找不到集群,然后我跟随去配置集群地址就好。根据错误栈我来到了这个query方法
private SearchHits query(String query) throws SqlParseException, SQLFeatureNotSupportedException, SQLFeatureNotSupportedException { SearchDao searchDao = MainTestSuite.getSearchDao(); SqlElasticSearchRequestBuilder select = (SqlElasticSearchRequestBuilder) searchDao.explain(query).explain(); return ((SearchResponse)select.get()).getHits(); }

该方法并没有@Test的注解,不是单元测试方法,异常出现在SearchDao searchDao = MainTestSuite.getSearchDao(); searchDao是null,那么为什么会是null呢?继续跟踪到MainTestSuite类中,然后发现了新天地。其实这边也有约定大于配置的好处,看到TestSuite就知道这是个批量测试的类了。类中有两个注解@BeforeClass @AfterClass,刚才的原因就找到了,刚才直接获取searchDao没有经过预加载,所以是null。那么新的问题来了,我不想要运行整个TestSuite,我只想要运行一个测试方法,要怎么办呢?这时候需要稍微修改下代码了,回到QueryTest.java中,添加以下两个方法
@Before public void setup() throws Exception { MainTestSuite.setUp(); }@After public void end() throws InterruptedException { MainTestSuite.tearDown(); }

同时有一个地方要注意,除非你通过外部参数传入你的es的ip和端口,否则可以在MainTestSuite中做以下修改
protected static InetSocketTransportAddress getTransportAddress() throws UnknownHostException { String host = System.getenv("ES_TEST_HOST"); String port = System.getenv("ES_TEST_PORT"); if(host == null) { host = "localhost"; System.out.println("ES_TEST_HOST enviroment variable does not exist. choose default 'localhost'"); }if(port == null) { port = "9302"; System.out.println("ES_TEST_PORT enviroment variable does not exist. choose default '9300'"); }System.out.println(String.format("Connection details: host: %s. port:%s.", host, port)); return new InetSocketTransportAddress(InetAddress.getByName(host), Integer.parseInt(port)); }

将你的ip和端口直接修改。
接下来就可以直接在测试方法上右键运行了。
浅尝辄止 我并没有特别细致的阅读完整个源代码,我只想要找到我关注的点去仔细阅读。而在我拿到这份源代码的时候我有两点特别感兴趣
  • 通过什么方式来封装sql语句为es的请求
  • 有没有什么比较干净优雅的抽象方式
    这里我不再细致的列出我怎么翻到的步骤,而是上最终结果,同时也解答了前面为什么会有druid的疑惑。
    直接看看以下这个类DefaultQueryAction.java的explain方法
@Override public SqlElasticSearchRequestBuilder explain() throws SqlParseException { this.request = client.prepareSearch(); setIndicesAndTypes(); setFields(select.getFields()); setWhere(select.getWhere()); setSorts(select.getOrderBys()); setLimit(select.getOffset(), select.getRowCount()); boolean usedScroll = useScrollIfNeeded(select.isOrderdSelect()); if (!usedScroll) { request.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); } updateRequestWithIndexAndRoutingOptions(select, request); updateRequestWithHighlight(select, request); updateRequestWithCollapse(select, request); SqlElasticSearchRequestBuilder sqlElasticRequestBuilder = new SqlElasticSearchRequestBuilder(request); return sqlElasticRequestBuilder; }

对于es的api熟悉的人看到这个就明白了this.request = client.prepareSearch();
在这里类中创建了一个request请求,将select对象中已经把sql语句解析出来的结果以各种方式转换成request中的参数,最后直接发送这个request整个封装过程就结束了。那么这个select结果如何获得呢?我们看这个类ESActionFactory.java
public static QueryAction create(Client client, String sql) throws SqlParseException, SQLFeatureNotSupportedException { sql = sql.replaceAll("\n"," "); String firstWord = sql.substring(0, sql.indexOf(' ')); switch (firstWord.toUpperCase()) { case "SELECT": SQLQueryExpr sqlExpr = (SQLQueryExpr) toSqlExpr(sql); if(isMulti(sqlExpr)){ MultiQuerySelect multiSelect = new SqlParser().parseMultiSelect((SQLUnionQuery) sqlExpr.getSubQuery().getQuery()); handleSubQueries(client,multiSelect.getFirstSelect()); handleSubQueries(client,multiSelect.getSecondSelect()); return new MultiQueryAction(client, multiSelect); } else if(isJoin(sqlExpr,sql)){ JoinSelect joinSelect = new SqlParser().parseJoinSelect(sqlExpr); handleSubQueries(client, joinSelect.getFirstTable()); handleSubQueries(client, joinSelect.getSecondTable()); return ESJoinQueryActionFactory.createJoinAction(client, joinSelect); } else { Select select = new SqlParser().parseSelect(sqlExpr); handleSubQueries(client, select); return handleSelect(client, select); } case "DELETE": SQLStatementParser parser = createSqlStatementParser(sql); SQLDeleteStatement deleteStatement = parser.parseDeleteStatement(); Delete delete = new SqlParser().parseDelete(deleteStatement); return new DeleteQueryAction(client, delete); case "SHOW": return new ShowQueryAction(client,sql); default: throw new SQLFeatureNotSupportedException(String.format("Unsupported query: %s", sql)); } }

其中最关键的SQLQueryExpr sqlExpr = (SQLQueryExpr) toSqlExpr(sql);
SQLQueryExpr是druid中用来描述sql语句的类,不需要再自己重新封装,只需要利用阿里的工作成果即可~高
走到这里突然想到,其实druid是一个对于数据源的管理方式和工具,并不一定是结构数据库,如果说把es也看成一个数据源,是不是更好理解了呢?
结束 不过说到底这个插件我用的还是不多,不灵活,以及前期已经投入了很多对于es语法的学习成本,还有一点是,熟悉es的语法对于使用原生的javaAPI时很有帮助。

    推荐阅读