mysqlsmom: full sync of MySQL table data to ES

This article walks through using mysqlsmom to do a full (one-shot) sync of MySQL table data to ES.
Note: this demo was tested against a company-internal build of elasticsearch-5.0.0, installed as a single node. We start with a full sync of a MySQL table into elasticsearch-5.0.0.
1. Create the full-sync configuration file

[root@tidb05 mysqlsmom]# mom new test_mom/init_config.py -t init --force
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
  RequestsDependencyWarning)
new config at /data1/soft/mysqlsmom/test_mom/init_config.py
[root@tidb05 mysqlsmom]# echo $?
0

Note: test_mom is an arbitrary name; you can pick your own.
The RequestsDependencyWarning above has no effect on this demo, but it is easy to silence. The warning and the fix are as follows:
[root@tidb05 mysqlsmom]# mom new 197_testdb_mom/init_config.py -t init --force
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
  RequestsDependencyWarning)
new config at /data1/soft/mysqlsmom/197_testdb_mom/init_config.py
[root@tidb05 mysqlsmom]# ll /data1/soft/mysqlsmom/197_testdb_mom/init_config.py
-rw-r--r-- 1 root root 1298 Jul 11 10:46 /data1/soft/mysqlsmom/197_testdb_mom/init_config.py

Fix for the warning above (/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!):

pip uninstall urllib3 -y
pip uninstall chardet -y
pip install requests

2. Configuration examples

Case 1: full sync of all columns of one MySQL table to ES
Create the test database and table, and insert test data into the test01 table:
create database stdb01;

CREATE TABLE `test01` (
  `id` int(8) NOT NULL AUTO_INCREMENT,
  `username` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  `password` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  `create_time` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

INSERT INTO test01(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test01(username,password,create_time) values('java', '123456',now());
INSERT INTO test01(username,password,create_time) values('lua', 'ssd123456',now());
INSERT INTO test01(username,password,create_time) values('php', 'seurw456',now());
INSERT INTO test01(username,password,create_time) values('python', 'seueurw456',now());

root@tidb04 10:57:[stdb01]> select * from test01;
+----+----------+------------+---------------------+
| id | username | password   | create_time         |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-11 10:57:57 |
|  2 | java     | 123456     | 2021-07-11 10:57:57 |
|  3 | lua      | ssd123456  | 2021-07-11 10:57:57 |
|  4 | php      | seurw456   | 2021-07-11 10:57:57 |
|  5 | python   | seueurw456 | 2021-07-11 10:57:58 |
+----+----------+------------+---------------------+
5 rows in set (0.00 sec)

Create the account used for the connection (its privileges could actually be narrower, e.g. GRANT REPLICATION SLAVE ON *.*):

GRANT ALL PRIVILEGES ON *.* TO 'click_rep'@'172.16.0.246' identified by 'jwtest123456';
flush privileges;

The configuration file for a full sync of the stdb01.test01 table is as follows:
[root@tidb05 mysqlsmom]# cat /data1/soft/mysqlsmom/test_mom/init_config.py
# coding=utf-8

STREAM = "INIT"

# Database connection
CONNECTION = {
    'host': '172.16.0.197',
    'port': 3306,
    'user': 'click_rep',
    'passwd': 'jwtest123456'
}

# Sync BULK_SIZE rows per bulk request to elasticsearch; defaults to 1 if unset
BULK_SIZE = 1

# Elasticsearch nodes
# NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]

TASKS = [
    {
        "stream": {
            "database": "stdb01",            # database to run the sql in
            "sql": "select * from test01",   # rows selected by this sql are synced to elasticsearch
            # "pk": {"field": "id", "type": "char"}  # use when the primary key id is a string
        },
        "jobs": [
            {
                "actions": ["insert", "update"],
                "pipeline": [
                    {"set_id": {"field": "id"}}  # use the id column as the elasticsearch document id;
                                                 # specific columns to sync can be listed here, otherwise
                                                 # all columns are synced
                ],
                "dest": {
                    "es": {
                        "action": "upsert",
                        "index": "test01_index",  # target index
                        "type": "test01",         # target type
                        "nodes": NODES
                    }
                }
            }
        ]
    }
]

# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"

Run the full sync:
[root@tidb05 mysqlsmom]# pwd
/data1/soft/mysqlsmom
[root@tidb05 mysqlsmom]# ls
197_testdb_mom  docs  mysqlsmom  README.md  README_OLD.md  setup.py  test_mom
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
  RequestsDependencyWarning)
2021-07-11 11:09:44,500 root          INFO  {"username": "tomcat", "password": "xiaohuahua", "create_time": "2021-07-11 10:57:57", "id": 1, "_id": 1}
2021-07-11 11:09:44,623 elasticsearch INFO  POST http://172.16.0.247:9999/_bulk [status:200 request:0.123s]
2021-07-11 11:09:44,624 root          INFO  {"username": "java", "password": "123456", "create_time": "2021-07-11 10:57:57", "id": 2, "_id": 2}
2021-07-11 11:09:44,630 elasticsearch INFO  POST http://172.16.0.247:9999/_bulk [status:200 request:0.006s]
2021-07-11 11:09:44,630 root          INFO  {"username": "lua", "password": "ssd123456", "create_time": "2021-07-11 10:57:57", "id": 3, "_id": 3}
2021-07-11 11:09:44,639 elasticsearch INFO  POST http://172.16.0.247:9999/_bulk [status:200 request:0.009s]
2021-07-11 11:09:44,640 root          INFO  {"username": "php", "password": "seurw456", "create_time": "2021-07-11 10:57:57", "id": 4, "_id": 4}
2021-07-11 11:09:44,644 elasticsearch INFO  POST http://172.16.0.247:9999/_bulk [status:200 request:0.004s]
2021-07-11 11:09:44,645 root          INFO  {"username": "python", "password": "seueurw456", "create_time": "2021-07-11 10:57:58", "id": 5, "_id": 5}
2021-07-11 11:09:44,650 elasticsearch INFO  POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]

real  0m0.640s
user  0m0.444s
sys   0m0.051s

Screenshots of the resulting data in ES are omitted here.
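Each root INFO line in the log above is one row after the pipeline ran, and each elasticsearch INFO line is the bulk request that carried it. As a rough sketch of what one upsert entry in such a `_bulk` body looks like (based on the ES 5.x bulk API in general, not on mysqlsmom's source):

```python
import json

def bulk_upsert_entry(index, doc_type, doc):
    """Build the two NDJSON lines for one upsert in an ES 5.x _bulk body.

    Assumes the document already carries an _id, as produced by the
    set_id pipeline step.
    """
    action = {"update": {"_index": index, "_type": doc_type, "_id": doc["_id"]}}
    source = {"doc": {k: v for k, v in doc.items() if k != "_id"},
              "doc_as_upsert": True}
    return json.dumps(action) + "\n" + json.dumps(source) + "\n"

# First row from the log above
row = {"username": "tomcat", "password": "xiaohuahua",
       "create_time": "2021-07-11 10:57:57", "id": 1, "_id": 1}
body = bulk_upsert_entry("test01_index", "test01", row)
```

POSTing `body` to `http://172.16.0.247:9999/_bulk` would upsert the document; with BULK_SIZE = 1 every row becomes its own request like this, which is why the log shows one POST per row.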
Case 2: full sync of selected columns of one MySQL table to ES
Create test table test02 and insert test data:
CREATE TABLE `test02` (
  `id` int(8) NOT NULL AUTO_INCREMENT,
  `username` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  `password` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  `create_time` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

INSERT INTO test02(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test02(username,password,create_time) values('java', '123456',now());
INSERT INTO test02(username,password,create_time) values('lua', 'ssd123456',now());
INSERT INTO test02(username,password,create_time) values('php', 'seurw456',now());
INSERT INTO test02(username,password,create_time) values('python', 'seueurw456',now());

Run the sync:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
  RequestsDependencyWarning)
2021-07-11 11:25:53,126 root          INFO  {"username": "tomcat", "_id": 1, "id": 1}
2021-07-11 11:25:53,217 elasticsearch INFO  POST http://172.16.0.247:9999/_bulk [status:200 request:0.091s]
2021-07-11 11:25:53,218 root          INFO  {"username": "java", "_id": 2, "id": 2}
2021-07-11 11:25:53,223 elasticsearch INFO  POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
2021-07-11 11:25:53,223 root          INFO  {"username": "lua", "_id": 3, "id": 3}
2021-07-11 11:25:53,228 elasticsearch INFO  POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
2021-07-11 11:25:53,229 root          INFO  {"username": "php", "_id": 4, "id": 4}
2021-07-11 11:25:53,235 elasticsearch INFO  POST http://172.16.0.247:9999/_bulk [status:200 request:0.006s]
2021-07-11 11:25:53,235 root          INFO  {"username": "python", "_id": 5, "id": 5}
2021-07-11 11:25:53,241 elasticsearch INFO  POST http://172.16.0.247:9999/_bulk [status:200 request:0.006s]

real  0m0.597s
user  0m0.440s
sys   0m0.047s

Screenshots of the resulting data in ES are omitted here.
The configuration file used for this case:
[root@tidb05 mysqlsmom]# cat /data1/soft/mysqlsmom/test_mom/init_config.py
# coding=utf-8

STREAM = "INIT"

# Database connection
CONNECTION = {
    'host': '172.16.0.197',
    'port': 3306,
    'user': 'click_rep',
    'passwd': 'jwtest123456'
}

# Sync BULK_SIZE rows per bulk request to elasticsearch; defaults to 1 if unset
BULK_SIZE = 1

# Elasticsearch nodes
# NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]

TASKS = [
    {
        "stream": {
            "database": "stdb01",            # database to run the sql in
            "sql": "select * from test02",   # rows selected by this sql are synced to elasticsearch
            # "pk": {"field": "id", "type": "char"}  # use when the primary key id is a string
        },
        "jobs": [
            {
                "actions": ["insert", "update"],
                "pipeline": [
                    {"only_fields": {"fields": ["id", "username"]}},  # sync only the id and username columns
                    {"set_id": {"field": "id"}}                       # use the id column as the elasticsearch document id
                ],
                "dest": {
                    "es": {
                        "action": "upsert",
                        "index": "test02_index",  # target index
                        "type": "test02",         # target type
                        "nodes": NODES
                    }
                }
            }
        ]
    }
]

# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
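The pipeline steps run in order over each row, which is why the log documents above contain only username, id and _id. A rough simulation of the two steps (my own illustration of the observable behavior, not mysqlsmom's actual implementation):

```python
def only_fields(row, fields):
    # Keep only the listed columns of the row
    return {k: v for k, v in row.items() if k in fields}

def set_id(row, field):
    # Use the given column's value as the elasticsearch document id
    row = dict(row)
    row["_id"] = row[field]
    return row

# A row as selected from stdb01.test02
row = {"id": 2, "username": "java", "password": "123456",
       "create_time": "2021-07-11 10:57:57"}

# Apply the pipeline in config order: only_fields, then set_id
doc = set_id(only_fields(row, ["id", "username"]), "id")
# doc matches the logged document: {"username": "java", "_id": 2, "id": 2}
```

Note that `only_fields` runs before `set_id` here; if the id column were filtered out first, `set_id` would have nothing to copy, so the order of the pipeline list matters.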

Case 3: full sync of multiple MySQL tables to ES
Create test tables test03 and test04 and insert test data:
CREATE TABLE `test03` (
  `id` int(8) NOT NULL AUTO_INCREMENT,
  `username` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  `password` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  `create_time` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

INSERT INTO test03(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test03(username,password,create_time) values('java', '123456',now());

CREATE TABLE `test04` (
  `id` int(8) NOT NULL AUTO_INCREMENT,
  `username` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  `password` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  `create_time` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

INSERT INTO test04(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test04(username,password,create_time) values('java', '123456',now());

root@tidb04 12:59:[stdb01]> select * from test04;
+----+----------+------------+---------------------+
| id | username | password   | create_time         |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-11 12:59:01 |
|  2 | java     | 123456     | 2021-07-11 12:59:01 |
+----+----------+------------+---------------------+
2 rows in set (0.00 sec)

root@tidb04 12:59:[stdb01]> select * from test03;
+----+----------+------------+---------------------+
| id | username | password   | create_time         |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-11 12:58:53 |
|  2 | java     | 123456     | 2021-07-11 12:58:54 |
+----+----------+------------+---------------------+
2 rows in set (0.00 sec)

The configuration file for this case:
[root@tidb05 test_mom]# cat init_config.py
# coding=utf-8

STREAM = "INIT"

# Database connection
CONNECTION = {
    'host': '172.16.0.197',
    'port': 3306,
    'user': 'click_rep',
    'passwd': 'jwtest123456'
}

# Sync BULK_SIZE rows per bulk request to elasticsearch; defaults to 1 if unset
BULK_SIZE = 1

# Elasticsearch nodes
# NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]

TASKS = [
    # Sync stdb01.test03 to es:
    {
        "stream": {
            "database": "stdb01",            # database to run the sql in
            "sql": "select * from test03",   # rows selected by this sql are synced to elasticsearch
            # "pk": {"field": "id", "type": "char"}  # use when the primary key id is a string
        },
        "jobs": [
            {
                "actions": ["insert", "update"],
                "pipeline": [
                    {"set_id": {"field": "id"}}  # use the id column as the elasticsearch document id
                ],
                "dest": {
                    "es": {
                        "action": "upsert",
                        "index": "test03_index",  # target index
                        "type": "test03",         # target type
                        "nodes": NODES
                    }
                }
            }
        ]
    },
    # Sync stdb01.test04 to es:
    {
        "stream": {
            "database": "stdb01",            # database to run the sql in
            "sql": "select * from test04",   # rows selected by this sql are synced to elasticsearch
            # "pk": {"field": "id", "type": "char"}  # use when the primary key id is a string
        },
        "jobs": [
            {
                "actions": ["insert", "update"],
                "pipeline": [
                    {"only_fields": {"fields": ["id", "username"]}},  # sync only the id and username columns
                    {"set_id": {"field": "id"}}                       # use the id column as the elasticsearch document id
                ],
                "dest": {
                    "es": {
                        "action": "upsert",
                        "index": "test04_index",  # target index
                        "type": "test04",         # target type
                        "nodes": NODES
                    }
                }
            }
        ]
    }
]

# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
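Since the task entries above differ only in table name and pipeline, a long TASKS list can also be generated instead of written out by hand; the config file is plain Python, after all. A sketch (the make_task helper is my own, not part of mysqlsmom):

```python
NODES = [{"host": "172.16.0.247", "port": 9999}]

def make_task(database, table, fields=None):
    """Build one mysqlsmom TASKS entry for a full sync of `database`.`table`."""
    pipeline = [{"set_id": {"field": "id"}}]
    if fields:
        # Restrict the synced columns before setting the document id
        pipeline.insert(0, {"only_fields": {"fields": fields}})
    return {
        "stream": {"database": database, "sql": "select * from %s" % table},
        "jobs": [{
            "actions": ["insert", "update"],
            "pipeline": pipeline,
            "dest": {"es": {"action": "upsert",
                            "index": "%s_index" % table,  # index naming convention from this demo
                            "type": table,
                            "nodes": NODES}},
        }],
    }

# Equivalent to the hand-written two-task config above
TASKS = [make_task("stdb01", "test03"),
         make_task("stdb01", "test04", fields=["id", "username"])]
```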

Run the command:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
  RequestsDependencyWarning)
2021-07-11 13:01:09,473 root          INFO  {"username": "tomcat", "password": "xiaohuahua", "create_time": "2021-07-11 12:58:53", "id": 1, "_id": 1}
2021-07-11 13:01:09,555 elasticsearch INFO  POST http://172.16.0.247:9999/_bulk [status:200 request:0.082s]
2021-07-11 13:01:09,556 root          INFO  {"username": "java", "password": "123456", "create_time": "2021-07-11 12:58:54", "id": 2, "_id": 2}
2021-07-11 13:01:09,561 elasticsearch INFO  POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
2021-07-11 13:01:09,564 root          INFO  {"username": "tomcat", "_id": 1, "id": 1}
2021-07-11 13:01:09,636 elasticsearch INFO  POST http://172.16.0.247:9999/_bulk [status:200 request:0.072s]
2021-07-11 13:01:09,636 root          INFO  {"username": "java", "_id": 2, "id": 2}
2021-07-11 13:01:09,642 elasticsearch INFO  POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]

real  0m0.629s
user  0m0.411s
sys   0m0.055s

Screenshots of the resulting data in ES are omitted here.
Case 4: sync tables from different databases in the same MySQL instance to ES
The MySQL data:
root@tidb04 10:58:[test_db]> select * from stdb01.test01;
+----+----------+----------+---------------------+
| id | username | password | create_time         |
+----+----------+----------+---------------------+
| 30 | fox      | 556      | 2021-07-30 08:19:37 |
| 31 | fox      | 556      | 2021-07-30 08:19:38 |
+----+----------+----------+---------------------+
2 rows in set (0.00 sec)

root@tidb04 10:58:[test_db]> select * from test_db.test01;
+----+----------+------------+---------------------+
| id | username | password   | create_time         |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-03 23:51:17 |
|  2 | php      | xiao       | 2021-07-03 23:53:36 |
|  3 | fix      | xiao       | 2021-07-03 23:53:49 |
|  4 | java     | bai        | 2021-07-03 23:54:01 |
+----+----------+------------+---------------------+
4 rows in set (0.00 sec)

Run the full sync of the MySQL data to ES:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
2021-08-01 10:58:49,966 root          INFO  {"username": "fox", "password": "556", "create_time": "2021-07-30 08:19:37", "id": 30, "_id": 30}
2021-08-01 10:58:49,967 root          INFO  {"username": "fox", "password": "556", "create_time": "2021-07-30 08:19:38", "id": 31, "_id": 31}
2021-08-01 10:58:50,115 elasticsearch INFO  POST http://172.16.0.247:9999/_bulk [status:200 request:0.148s]
2021-08-01 10:58:50,119 root          INFO  {"username": "tomcat", "_id": 1, "id": 1}
2021-08-01 10:58:50,119 root          INFO  {"username": "php", "_id": 2, "id": 2}
2021-08-01 10:58:50,119 root          INFO  {"username": "fix", "_id": 3, "id": 3}
2021-08-01 10:58:50,119 root          INFO  {"username": "java", "_id": 4, "id": 4}
2021-08-01 10:58:50,259 elasticsearch INFO  POST http://172.16.0.247:9999/_bulk [status:200 request:0.139s]

real  0m0.873s
user  0m0.505s
sys   0m0.080s

Verify the data in ES (screenshots of the two indexes omitted).
The full-sync configuration file used here:
[root@tidb05 mysqlsmom]# cat ./test_mom/init_config.py
# coding=utf-8

STREAM = "INIT"

# Database connection
CONNECTION = {
    'host': '172.16.0.197',
    'port': 3306,
    'user': 'click_rep',
    'passwd': 'jwtest123456'
}

# Sync BULK_SIZE rows per bulk request to elasticsearch; defaults to 1 if unset
BULK_SIZE = 50000

# Elasticsearch nodes
# NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]

TASKS = [
    # Sync stdb01.test01 to es:
    {
        "stream": {
            "database": "stdb01",            # database to run the sql in
            "sql": "select * from test01",   # rows selected by this sql are synced to elasticsearch
            # "pk": {"field": "id", "type": "char"}  # use when the primary key id is a string
        },
        "jobs": [
            {
                "actions": ["insert", "update"],
                "pipeline": [
                    {"set_id": {"field": "id"}}  # use the id column as the elasticsearch document id
                ],
                "dest": {
                    "es": {
                        "action": "upsert",
                        "index": "stdb01.test01_index",  # target index
                        "type": "user_table",            # target type
                        "nodes": NODES
                    }
                }
            }
        ]
    },
    # Sync test_db.test01 to es:
    {
        "stream": {
            "database": "test_db",           # database to run the sql in
            "sql": "select * from test01",   # rows selected by this sql are synced to elasticsearch
            # "pk": {"field": "id", "type": "char"}  # use when the primary key id is a string
        },
        "jobs": [
            {
                "actions": ["insert", "update"],
                "pipeline": [
                    {"only_fields": {"fields": ["id", "username"]}},  # sync only the id and username columns
                    {"set_id": {"field": "id"}}                       # use the id column as the elasticsearch document id
                ],
                "dest": {
                    "es": {
                        "action": "upsert",
                        "index": "test_db.test01_index",  # target index
                        "type": "user_table",             # target type
                        "nodes": NODES
                    }
                }
            }
        ]
    }
]

# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"

Case 5: full-sync timing with BULK_SIZE = 1 versus BULK_SIZE = 10000
With BULK_SIZE = 10000, a full sync of table t_work_order_follow to ES:
[root@tidb04 ~]# mysql -e "select count(*) from stdb01.t_work_order_follow;"
+----------+
| count(*) |
+----------+
|  3975925 |
+----------+
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py

real  30m7.618s
user  23m51.398s
sys   0m58.087s

With BULK_SIZE = 1, the same full sync of t_work_order_follow takes dramatically longer:
root@tidb04 08:07:[test_db]> select count(*) from t_work_order_follow;
+----------+
| count(*) |
+----------+
|  3975925 |
+----------+
1 row in set (0.62 sec)

[root@tidb06 mysqlsmom]# time mom run -c ./test_mom/init_config.py

real  237m59.067s
user  53m49.099s
sys   3m25.431s

A later test with BULK_SIZE = 50000 took roughly the same time as BULK_SIZE = 10000.
That concludes the full-sync walkthrough. A follow-up post will cover incremental sync configuration; stay tuned.
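The timing gap is explained largely by request count: with BULK_SIZE = 1 every row is its own HTTP round trip to /_bulk, while larger batches amortize that overhead. A quick back-of-the-envelope check:

```python
import math

def bulk_requests(rows, bulk_size):
    # Number of _bulk HTTP requests needed to ship all rows
    return int(math.ceil(rows / float(bulk_size)))

rows = 3975925  # row count of t_work_order_follow
print(bulk_requests(rows, 1))      # 3975925 requests
print(bulk_requests(rows, 10000))  # 398 requests
print(bulk_requests(rows, 50000))  # 80 requests
```

At a few hundred requests the per-request overhead no longer dominates the total time, which is consistent with BULK_SIZE = 10000 and 50000 performing about the same in these tests.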
