数据传输 | dtle 之库表重命名

作者:陈怡
爱可生南分团队 DBA,负责公司自动化运维平台维护和处理客户问题。
本文来源:原创投稿
*爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源。
前言 DTLE 是开源的数据传输组件,支持 MySQL 多种使用场景的数据传输。 可能会遇到这样的场景,将数据传输到目的端时,目的端的库名想要与源端的库名不一样。或者传输到目的端时,库名与源端的一样,但是想重命名表名与源端的不同。本文将简单介绍 DTLE 如何设置满足这样的场景。
安装部署 1、3.21.10.0 版本 rpm 包下载地址
https://github.com/actiontech/dtle/releases/download/v3.21.10.1/dtle-ce-3.21.10.1.x86_64.rpm

2、安装
rpm -ivh dtle-ce-3.21.10.1.x86_64.rpm --prefix=/data/dtle

安装完成后,dtle 的相关日志会位于 /data/dtle/var/log 目录下
3、启动 dtle
systemctl start dtle-consul dtle-nomad

启动 dtle 之后,我们就可以创建任务来实现我们的数据传输了。
测试环境准备 1、准备两个 5.7 版本的 MySQL 实例,分别用作源端和目的端数据库。
2、在源库,创建测试数据如下
mysql> create database dtle_d1; Query OK, 1 row affected (0.00 sec)mysql> use dtle_d1 Database changed mysql> create table dtle_t1(id int,name varchar(20),PRIMARY KEY(id)); Query OK, 0 rows affected (0.02 sec)mysql> insert into dtle_t1 values (1,'xiaoming'),(2,'xiaohong'),(3,'xiaofang'); Query OK, 3 rows affected (0.00 sec) Records: 3Duplicates: 0Warnings: 0mysql> select * from dtle_t1; +----+----------+ | id | name| +----+----------+ |1 | xiaoming | |2 | xiaohong | |3 | xiaofang | +----+----------+ 3 rows in set (0.00 sec)mysql> create database dtle_db2; Query OK, 1 row affected (0.01 sec)mysql> use dtle_db2 Database changed mysql> create table dtle_table2(id int,name varchar(20),tag varchar(20),PRIMARY KEY(id)); Query OK, 0 rows affected (0.01 sec)mysql> insert into dtle_table2 values(1,'mao','0'),(2,'gou','0'),(3,'tu',0),(4,'shu','0'),(5,'yu','0'); Query OK, 5 rows affected (0.00 sec) Records: 5Duplicates: 0Warnings: 0mysql> select * from dtle_table2; +----+------+------+ | id | name | tag| +----+------+------+ |1 | mao| 0| |2 | gou| 0| |3 | tu| 0| |4 | shu| 0| |5 | yu| 0| +----+------+------+ 5 rows in set (0.00 sec)

3、安装 jq 工具以方便在 Linux 上查询任务状态。此为可选项,非必选项。
yum install jq -y

实现数据传输 在启动 dtle 组件后,先创建作业配置,然后启动作业,就可以实现通过 dtle 完成 MySQL 到 MySQL 的数据传输。作业配置一般采用 json ( HTTP API 提交) 或 hcl ( nomad 命令行工具提交)文件。对应文件的样例模板位于 /data/dtle/usr/share/dtle/scripts 目录下。 本文将介绍采用 HTTP API 提交 json 文件 配置的方式启动 job 实现数据传输。
场景一: 1、实现将源端 dtle_d1 库数据传输到目的端时,目的端库名修改为 dtle_d1_new,表名不变 。 创建 /data/dtle/usr/share/dtle/scripts/job1.json 文件如下:
{ "Job": { "ID": "job1",# 指定 job id,查看 job 状态时需要用到该 id "Datacenters": ["dc1"], "TaskGroups": [ { "Name": "src", "Tasks": [{ "Name": "src", "Driver": "dtle", "Config": { "ReplicateDoDb": [{# 指定复制的库名/表名,当该配置为空时,则复制整个实例 "TableSchema": "dtle_d1",# 指定复制的库名 "TableSchemaRename": "dtle_d1_new" # 设置库复制到目的端时,新的数据库名 }], "GroupMaxSize": 1024,# 源端发送数据时, 等待数据包达到一定大小后发送该包. 单位为字节. 值为 1 时则表示即刻发送数据 "GroupTimeout": 100,# 等待包超时时间。如果等待指定时间后,数据包大小还未到达 GroupMaxSize 值,则直接发送当前数据包 "DropTableIfExists": true, "Gtid": "",# 1、为空时,则全量+增量进行复制。2、填写为已复制的 GTID 集合, 则将从未复制的 GTID 开始增量复制 "ChunkSize": 2000,# 可以控制全量复制时,每次读取-传输-写入的行数 "ConnectionConfig": {# 配置访问源端的方式,ip、端口、用户名、密码 "Host": "10.186.65.16", "Port": 3316, "User": "root", "Password": "root" } } }], "RestartPolicy": { "Attempts": 3, "Interval": 600000000000, "Delay": 15000000000, "Mode": "delay" } }, { "Name": "dest", "Tasks": [{ "Name": "dest", "Driver": "dtle", "Config": { "ConnectionConfig": {# 配置访问目的端的数据库访问方式 "Host": "10.186.65.15", "Port": 3316, "User": "root", "Password": "root" } } }], "RestartPolicy": { "Attempts": 3, "Interval": 600000000000, "Delay": 15000000000, "Mode": "delay" } } ], "ReschedulePolicy": { "Attempts": 1, "Interval": 1800000000000, "Unlimited": false } } }

注意:上述注释部分是为了方便快速介绍各配置项意思,实际使用时,需要去掉注释部分。
2、启动 job
[root@10-186-65-5 ~]# cd /data/dtle/usr/share/dtle/scripts/ [root@10-186-65-5 scripts]# curl -XPOST "http://10.186.65.5:4646/v1/jobs" -d @job1.json -s | jq { "EvalID": "98023d12-82e7-76f6-9711-2ca17b500221", "EvalCreateIndex": 2950, "JobModifyIndex": 2950, "Warnings": "", "Index": 2950, "LastContact": 0, "KnownLeader": false }

3、根据json文件中的 job id 可查看到任务的状态是否 running,如果不记得 job id,也可以执行命令获取 job id
[root@10-186-65-5 ~]#curl -s -XGET 127.0.0.1:4646/v1/jobs | jq '.[].ID' "job1" [root@10-186-65-5 ~]#curl -s -XGET 127.0.0.1:4646/v1/job/job1 | jq '.Status' "running"

4、查看目的端数据库,源端数据已全量到目的端,且测试增量数据也能正常同步。
##### 目的端查看数据同步情况 mysql> show databases; +--------------------+ | Database| +--------------------+ | information_schema | | dtle| | dtle_d1_new| | mysql| | performance_schema | | sys| +--------------------+ 6 rows in set (0.00 sec)mysql> select * from dtle_d1_new.dtle_t1; +----+----------+ | id | name| +----+----------+ |1 | xiaoming | |2 | xiaohong | |3 | xiaofang | +----+----------+ 3 rows in set (0.00 sec)##### 源端新增数据 mysql> insert into dtle_t1 value(4,"xiaobai"),(5,"xiaosu"); Query OK, 2 rows affected (0.10 sec) Records: 2Duplicates: 0Warnings: 0##### 目的端查询到数据已经同步过来 mysql> select * from dtle_d1_new.dtle_t1; +----+----------+ | id | name| +----+----------+ |1 | xiaoming | |2 | xiaohong | |3 | xiaofang | |4 | xiaobai| |5 | xiaosu| +----+----------+ 5 rows in set (0.00 sec)

场景二: 1、实现将源端 dtle_db2.dtle_table2 表数据传输到目的端时,目的端表名修改为 dtle_db2.dtle_table2_new。 创建 /data/dtle/usr/share/dtle/scripts/job2.json 文件如下:
{ "Job": { "ID": "job2",# 指定 job id,查看 job 状态时需要用到该 id "Datacenters": ["dc1"], "TaskGroups": [ { "Name": "src", "Tasks": [{ "Name": "src", "Driver": "dtle", "Config": { "ReplicateDoDb": [{ "TableSchema": "dtle_db2",# 指定源端要复制的库名 "Tables": [{ "TableName": "dtle_table2",# 指定源端要复制的表 "TableRename":"dtle_table2_new" # 设置库复制到目的端时,新的表名 }] }], "GroupMaxSize": 1024, "GroupTimeout": 100, "DropTableIfExists": true, "Gtid": "", "ChunkSize": 2000, "ConnectionConfig": { "Host": "10.186.65.16", "Port": 3316, "User": "root", "Password": "root" } } }], "RestartPolicy": { "Attempts": 3, "Interval": 600000000000, "Delay": 15000000000, "Mode": "delay" } }, { "Name": "dest", "Tasks": [{ "Name": "dest", "Driver": "dtle", "Config": { "ConnectionConfig": { "Host": "10.186.65.15", "Port": 3316, "User": "root", "Password": "root" } } }], "RestartPolicy": { "Attempts": 3, "Interval": 600000000000, "Delay": 15000000000, "Mode": "delay" } } ], "ReschedulePolicy": { "Attempts": 1, "Interval": 1800000000000, "Unlimited": false } } }

注意:上述注释部分是为了方便快速介绍各配置项意思,实际使用时,需要去掉注释部分。
2、启动 job
[root@10-186-65-5 ~]# cd /data/dtle/usr/share/dtle/scripts/ [root@10-186-65-5 scripts]# curl -XPOST "http://10.186.65.5:4646/v1/jobs" -d @job2.json -s | jq { "EvalID": "9a54b81f-e139-e65a-20ec-a1b9f622f27e", "EvalCreateIndex": 7039, "JobModifyIndex": 7039, "Warnings": "", "Index": 7039, "LastContact": 0, "KnownLeader": false }

3、查看 job 执行状态
[root@10-186-65-5 scripts]# curl -s -XGET 10.186.65.5:4646/v1/job/job2 | jq '.Status' "running"

4、检查数据同步情况,查看目的端数据库,源端数据已全量到目的端,且测试增量数据也能正常同步。
##### 目的端查看 mysql> show databases; +--------------------+ | Database| +--------------------+ | information_schema | | dtle| | dtle_d1_new| | dtle_db2| | mysql| | performance_schema | | sys| +--------------------+ 7 rows in set (0.00 sec)mysql> use dtle_db2 Reading table information for completion of table and column names You can turn off this feature to get a quicker startup with -ADatabase changed mysql> show tables; +--------------------+ | Tables_in_dtle_db2 | +--------------------+ | dtle_table2_new| +--------------------+ 1 row in set (0.00 sec)mysql> select * from dtle_table2_new; +----+------+------+ | id | name | tag| +----+------+------+ |1 | mao| 0| |2 | gou| 0| |3 | tu| 0| |4 | shu| 0| |5 | yu| 0| +----+------+------+ 5 rows in set (0.00 sec)##### 源端更新数据 mysql> update dtle_table2 set tag='1' where id >3; Query OK, 2 rows affected (0.00 sec) Rows matched: 2Changed: 2Warnings: 0##### 目的端查看更新语句已经同步 mysql> select * from dtle_table2_new; +----+------+------+ | id | name | tag| +----+------+------+ |1 | mao| 0| |2 | gou| 0| |3 | tu| 0| |4 | shu| 1| |5 | yu| 1| +----+------+------+ 5 rows in set (0.00 sec)

其他 如果要停止删除掉启动的 job ,比如删除 job2 任务,如下命令操作:
[root@10-186-65-5 scripts]#curl -s -XGET 10.186.65.5:4646/v1/jobs | jq '.[].ID' "job1" "job2" [root@10-186-65-5 scripts]# curl -s -XDELETE 10.186.65.5:4646/v1/job/job2?purge=true |jq { "EvalID": "42889a00-9239-21e4-761c-6684abb0316c", "EvalCreateIndex": 7067, "JobModifyIndex": 7067, "VolumeEvalID": "", "VolumeEvalIndex": 0, "Index": 7067, "LastContact": 0, "KnownLeader": false } [root@10-186-65-5 scripts]#curl -s -XGET 10.186.65.5:4646/v1/jobs | jq '.[].ID' "job1"

dtle 用户手册链接:
【数据传输 | dtle 之库表重命名】https://actiontech.github.io/...

    推荐阅读