DolphinScheduler 调度 DataX 实现 MySQL To MySQL 增量数据同步实战

DolphinScheduler 调度 DataX 实现 MySQL To MySQL 增量数据同步实战
文章图片

背景 MySQL库A 到 MySQL库B的增量数据同步需求
DolphinScheduler中配置DataX MySQL To MySQL工作流 工作流定义 工作流定义 > 创建工作流 > 拖入1个SHELL组件 > 拖入1个DATAX组件
SHELL组件(文章)
脚本

echo '文章同步 MySQL To MySQL'

DATAX组件(t_article)
用到2个插件mysqlreader^[1]、mysqlwriter^[2]
选 自定义模板:
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "connection": [ { "jdbcUrl": [ "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的数据库A?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false" ], "querySql": [ "select a.id,a.title,a.content,a.is_delete,a.delete_date,a.create_date,a.update_date from t_article a.update_date >= '${biz_update_dt}'; " ] } ], "password": "${biz_mysql_password}", "username": "${biz_mysql_username}" } }, "writer": { "name": "mysqlwriter", "parameter": { "column": [ "`id`", "`title`", "`content`", "`is_delete`", "`delete_date`", "`create_date`", "`update_date`" ], "connection": [ { "jdbcUrl": "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的数据库B?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false", "table": [ "t_article" ] } ], "writeMode": "replace", "password": "${biz_mysql_password}", "username": "${biz_mysql_username}" } } } ], "setting": { "errorLimit": { "percentage": 0, "record": 0 }, "speed": { "channel": 1, "record": 1000 } } } }

reader和writer的字段配置需保持一致
自定义参数:
biz_update_dt: ${global_bizdate} biz_mysql_host: 你的mysql ip biz_mysql_port: 3306 biz_mysql_username: 你的mysql账号 biz_mysql_password: 你的mysql密码# 本文实验环境A库和B库用的同一个实例,如果MySQL是多个实例,可以再新增加参数定义例如 biz_mysql_host_b,在模板中对应引用即可

配置的自定义参数将会自动替换json模板中的同名变量
DolphinScheduler 调度 DataX 实现 MySQL To MySQL 增量数据同步实战
文章图片

reader mysqlreader插件中关键配置: a.update_date >= '${biz_update_dt}' 就是实现增量同步的关键配置
writer mysqlwriter插件中关键配置: ``
"parameter": { "writeMode": "replace", ...... }

writeMode为replace,相同主键id重复写入数据,就会更新数据。sql本质上执行的是 replace into
保存工作流 全局变量设置
global_bizdate: $[yyyy-MM-dd 00:00:00-1]
global_bizdate 引用的变量为 DolphinScheduler 内置变量,具体参考官网文档^[3]
结合调度时间设计好时间滚动的窗口时长,比如按1天增量,那么这里时间就是减1天
DolphinScheduler 调度 DataX 实现 MySQL To MySQL 增量数据同步实战
文章图片

最终的工作流DAG图为:
DolphinScheduler 调度 DataX 实现 MySQL To MySQL 增量数据同步实战
文章图片

爬坑记录
  • 官网下载的DataX不包含ElasticSearchWriter写插件
    默认不带该插件,需要自己编译ElasticSearchWriter插件。
    git clone https://github.com/alibaba/DataX.git

    为了加快编译速度,可以只编译elasticsearchwriter
    项目根目录的pom.xml
    全注释掉,下只保留elasticsearchwriter其他注释掉,另外也需要保留
如果自己不想编译或者编译失败请搜索 "流水理鱼"微信公众号,或者加我私人微信我给你已经编译好的插件包
by 流水理鱼|wwek
参考 【DolphinScheduler 调度 DataX 实现 MySQL To MySQL 增量数据同步实战】1. DataX MysqlReader 插件文档 https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
2. DataX MysqlWriter 插件文档 https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md
3. Apache DolphinScheduler 内置参数 https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/parameter/built-in.html
本文首发于流水理鱼博客,如要转载请注明出处。
欢迎关注我的公众号:流水理鱼(liushuiliyu),全栈、云原生、Homelab交流。
如果您对相关文章感兴趣,也可以关注我的博客:www.iamle.com 上面有更多内容

    推荐阅读