DataX -- 配置解析

DataX是阿里的异构数据源离线同步工具,详细的介绍和使用可以参考官网介绍和Quick Start。DataX系列主要是更详细的介绍整个运行的原理。
Configuration DataX配置的解析包括job.json、core.json、plugin.json这三个文件。这三个json文件都是多层级的json配置的,比如一个a.b.c=d的json,我们如果通过json获取,一般是先通过a这个key获取到b.c的json,然后再通过b这个key获取到c的josn,最后通过c这个key获取到d,这样代码写起来就很繁琐。
DataX提供了一个Configuration的类,可以直接把json进行压平,我们通过下面的例子来看看。

public static String JSON = "{'a': {'b': {'c': 'd'}}}"; public static void main(String[] args) { Configuration configuration = Configuration.from(JSON); System.out.println(configuration.get("a.b")); System.out.println(configuration.get("a.b.c")); System.out.println(configuration.get("a.b.d")); }

运行结果如下,可以看到通过Configuration可以很方便的获取到json的多层级的数据。除了get,还有合并merge、根据String类型取值getString、获取必填项getNecessaryValue等方法,这里就不一一介绍了。
{"c":"d"} d null

job.json job.json是作业的配置文件,在任务运行之前,通过参数把配置文件的全路径传入,所以名称是可以自定义的。
主要配置的内容包括job.content.reader、job.content.writer以及job.setting.speed,reader和writer可以参考每个对应模块里resources/plugin_job_template.json这个文件,也可以通过命令直接获取,这种方式做Quick Start里有示例。主要是指定用哪个reader进行读取数据,哪个writer进行写入数据,以及reader和writer的相关配置信息。
setting.speed主要的流速的控制,这个后面再来详细讲解。
{ "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { } }, "writer": { "name": "streamwriter", "parameter": { } } } ], "setting": { "speed": { "channel": 5 } } } }

core.json 全路径是在DATAX_HOME/conf/core.json,配置一些全局的信息,比如taskGroup的channel个数,类型转换就是这里配置的。
{ "entry": { "jvm": "-Xms1G -Xmx1G", "environment": {} }, "common": { "column": { "datetimeFormat": "yyyy-MM-dd HH:mm:ss", "timeFormat": "HH:mm:ss", "dateFormat": "yyyy-MM-dd", "extraFormats":["yyyyMMdd"], "timeZone": "GMT+8", "encoding": "utf-8" } }, "core": { "dataXServer": { "address": "http://localhost:7001/api", "timeout": 10000, "reportDataxLog": false, "reportPerfLog": false }, "transport": { "channel": { "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel", "speed": { "byte": -1, "record": -1 }, "flowControlInterval": 20, "capacity": 512, "byteCapacity": 67108864 }, "exchanger": { "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger", "bufferSize": 32 } }, "container": { "job": { "reportInterval": 10000 }, "taskGroup": { "channel": 5 }, "trace": { "enable": "false" }}, "statistics": { "collector": { "plugin": { "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector", "maxDirtyNumber": 10 } } } } }

plugin.json plugin.json的全路径是DATAX_HOME/plugin/reader/streamreader/plugin.json,这个streamreader和上面job.json里是对应的关系。
这个文件主要的内容就是name和class,class就是运行时要用到的插件类。由于会有reader和writer,所以这里会加载两个plugin.json。
{ "name": "streamreader", "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader", "description": { "useScene": "only for developer test.", "mechanism": "use datax framework to transport data from stream.", "warn": "Never use it in your real job." }, "developer": "alibaba" }

【DataX -- 配置解析】以上job.json、core.json、plugin.json这三个文件加载后会通过merge方法进行合并,所以最终得到的Configuration就是这些文件的合并信息,后面就是通过Configuration来启动插件。

    推荐阅读