gbase8a|基于datax实现从gbase到mysql的数据迁移--时间字段篇

项目背景 前期已经写过几篇用datax实现异构数据迁移的文章,面对复杂的上游数据,无法用一种通用的方式来实现所有业务表的迁移,比如一个大业务表中rowid字段与表记录差异特别大(一个表有3亿条记录,里面的最大rowid为30亿),这种情况下通过rowid切片也能实现迁移,但切片后的数据分布不均匀,这时基于时间字段来迁移则要顺利得多。
系统环境 【gbase8a|基于datax实现从gbase到mysql的数据迁移--时间字段篇】gbase8a 16节点集群
mysql5.6.46主从
迁移策略 迁移数据有几种方式,需要根据实际的情况来决定采用哪一钟,面对复杂的业务数据,很难有标准的方式来实现所有业务数据的迁移,现把我遇到的迁移策略整理如下:
1、对于小表(百万级)进行批量迁移
2、大表无时间字段(千万以及亿级),通过rowid字段切片
3、大表有时间字段,但rowid比表记录大几倍甚至几十倍的情况 ,基于时间字段切片迁移
通过几上三种方式组合,完美的解决了目前项目遇到的数据迁移问题
实现脚本 datax配置文件

{ "job": { "setting": { "speed": { "byte": 1048576 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "$source_reader", "parameter": { "username": "$source_db_user", "password": "$source_db_pwd", "connection": [ { "querySql": [ "select * from $source_table_name where $date_column>='$start_time' and$date_column<'$end_time'; " ], "jdbcUrl": [ "$source_db_conn" ] } ], "fetchSize": 1024 } }, "writer": { "name": "$target_reader", "parameter": { "username": "$target_db_user", "password": "$target_db_pwd", "session": [], "preSql": [], "column": ["REC_ID","PRO_ORG_NO","TG_ID","ORG_NO","TG_NO","TG_NAME","STAT_DATE","CONS_NUM","TG_CAP","PPQ","UPQ","LOSS_PQ","LINELOSS_RATE","ABNORMAL_LL_TYPE","METER_SUM","METER_COVER_SUM","COVER_RATE","IS_COVER","METER_SUCC_SUM","SUCC_RATE","IS_MONITOR","COLL_TYPE","IS_CALC","EVAL_TYPE","BUF_UPDATETIME","PUBLIC_MET_CNT","LOW_MET_CNT","DISTRIBUTED_MET_CNT","RESP_EMP_NO"], "connection": [ { "table": [ "$target_table_name" ], "jdbcUrl": "$target_db_conn" } ] } } } ] } }

迁移脚本文件/data/datax/job/gbasetomysql.json
#!/bin/bash #function: #version:0.3 #author:lineqi #crt_time:2020-04-26 #大表同步思路 #1、从gbase8a数据库里获取大表的名称和时间字段,按天生成同步记录,存储在table_name.txt文件中,格式为buf_aa.t_xx_pb_aayjl:in_xx_gbase_aa.t_xx_pb_aayjl:fssj:2020-06-01:2020-06-02,该步骤先通过手动实现,可以一天、两天,甚至一周为单位进行切分 #2、循环读取table_name.txt中的内容 #3、以天为单位抽取数据#datax命令调用案例 #eg:/opt/datax/bin/datax.py -p "-Dsource_reader=${v_gbase_reader}"/opt/datax/job/mysqltest.json >> /opt/datax/log/table_name_2020-04-26.log#定义文件与命令执行路径v_table_list='/data/datax/job/table_name.txt' v_exec_command='/data/datax/bin/datax.py' v_path_json='/data/datax/job/gbasetomysql.json' v_path_log='/data/datax/log/'#定义常用参数 v_source_table_name='' v_target_table_name='' v_sync_start_time=`date -d "today" +"%Y-%m-%d-%H-%M-%S"` v_start_time='' v_end_time='' v_append_time=" 00:00:00" v_date_column=''#定义源数据库的连接方式 v_gbase_user='qyw_qjqx' v_gbase_pwd='qyw12_19QJQX#' v_gbase_conn='jdbc:gbase://10.xx.xx.28:15258/xx_amr' v_gbase_reader='rdbmsreader'#定义目标数据库的连接方式 v_mysql_user='qjqx' v_mysql_pwd='Whayer1234' v_mysql_reader='mysqlwriter' v_mysql_conn='jdbc:mysql://25.xx.xx.68:13306/xx_gbase8a_xx_amr'#从table_name.txt获取表名、表记录数并计算分片 for table_name in `cat $v_table_list` do #get table_name v_source_table_name=`echo $table_name|awk -F ":" '{print $1}'` v_target_table_name=`echo $table_name|awk -F ":" '{print $2}'` v_date_column=`echo $table_name|awk -F ":" '{print $3}'` v_start_time=`echo $table_name|awk -F ":" '{print $4}'`"$v_append_time" v_end_time=`echo $table_name|awk -F ":" '{print $5}'`"$v_append_time"$v_exec_command -p "\ -Dsource_reader=${v_gbase_reader}\ -Dsource_db_user=${v_gbase_user} \ -Dsource_db_pwd=${v_gbase_pwd} \ -Dsource_db_conn=${v_gbase_conn} \ -Dsource_table_name=${v_source_table_name} \ -Ddate_column=${v_date_column} \ -Dstart_time='${v_start_time}' \ -Dend_time='${v_end_time}' \ -Dtarget_reader=${v_mysql_reader}\ -Dtarget_db_user=${v_mysql_user} \ -Dtarget_db_pwd=${v_mysql_pwd} \ -Dtarget_db_conn=${v_mysql_conn} \ -Dtarget_table_name=${v_target_table_name} \ " $v_path_json >>"$v_path_log"$v_source_table_name"$v_sync_start_time".log 2>&1# echo $v_table_name,$v_start_num,$v_end_numdone

表配置文件table_name.txt
buf_aa.t_xx_pb_aayjl:in_xx_gbase_aa.t_xx_pb_aayjl:fssj:2020-06-03:2020-06-04
buf_aa.t_xx_pb_aayjl:in_xx_gbase_aa.t_xx_pb_aayjl:fssj:2020-06-02:2020-06-03
buf_aa.t_xx_pb_aayjl:in_xx_gbase_aa.t_xx_pb_aayjl:fssj:2020-06-01:2020-06-02
说明:表配置文件中的第一行由7个部分组成,分别代表的含义如下
buf_aa:代表源库的数据库名或schema名
t_xx_pb_aayjl:代表源库的业务表名与目标下的业务表名一致
in_xx_gbase_aa:目标库数据库名或schema名
fssj:源库业务表中的时间字段
2020-06-01:源库业务表同步的起始时间
2020-06-02:源库业务表同步的结束时间
总结: 1、不管是基于时间切分、rowid切分、小表批量迁移,只要一次处理的数据不超过datax的遇到内存即可
2、表配置文件table_name.txt文件的时间没有带时、分、秒,这里是在shell脚本定义变量来实现的
3、当时基于时间切分迁移数据时,表配置文件table_name.txt里有500多条记录,当运行到400多条记录时,datax运行出问题了,将已经同步完成的400多条记录从table_name.txt清除后,继续则没有出现问题,有点奇怪。

    推荐阅读