人生必须的知识就是引人向光明方面的明灯。这篇文章主要讲述MySQLmom程序全量同步MySQL表数据到ES相关的知识,希望能为你提供帮助。
特殊提示:
本次演示的ES的版本为公司内部定制的elasticsearch-5.0.0的版本测试的,而且ES是单节点安装。本次演示从一个全量同步mysql表数据到elasticsearch-5.0.0开始
一、创建全量同步配置文件
[root@tidb05 mysqlsmom]# mom new test_mom/init_config.py -t init --force
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
RequestsDependencyWarning)
new config at /data1/soft/mysqlsmom/test_mom/init_config.py
[root@tidb05 mysqlsmom]# echo $?
0
说明:test_mom是可以指定名称的。
解决其中一个warning警告,其实对本次演示是没有任何影响的,但是对于有洁癖的我,看着总觉得不爽。具体warning警告内容和解决办法如下:
[root@tidb05 mysqlsmom]# mom new 197_testdb_mom/init_config.py -t init --force
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
RequestsDependencyWarning)
new config at /data1/soft/mysqlsmom/197_testdb_mom/init_config.py
[root@tidb05 mysqlsmom]# ll /data1/soft/mysqlsmom/197_testdb_mom/init_config.py
-rw-r--r-- 1 root root 1298 Jul 11 10:46 /data1/soft/mysqlsmom/197_testdb_mom/init_config.py
上面的/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version! 警告的**解决办法**如下:
pip uninstall urllib3 -y
pip uninstall chardet -y
pip install requests
二、案例配置演示 案例一:全量同步MySQL某张表全部字段数据到es
创建测试库和测试表,并写入测试数据到test01测试表:
create database stdb01;
CREATE TABLE `test01` (
`id` int(8) NOT NULL AUTO_INCREMENT,
`username` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
`password` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
`create_time` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test01(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test01(username,password,create_time) values('java', '123456',now());
INSERT INTO test01(username,password,create_time) values('lua', 'ssd123456',now());
INSERT INTO test01(username,password,create_time) values('php', 'seurw456',now());
INSERT INTO test01(username,password,create_time) values('python', 'seueurw456',now());
root@tidb04 10:57:[stdb01]>
select * from test01;
+----+----------+------------+---------------------+
| id | username | password| create_time|
+----+----------+------------+---------------------+
|1 | tomcat| xiaohuahua | 2021-07-11 10:57:57 |
|2 | java| 123456| 2021-07-11 10:57:57 |
|3 | lua| ssd123456| 2021-07-11 10:57:57 |
|4 | php| seurw456| 2021-07-11 10:57:57 |
|5 | python| seueurw456 | 2021-07-11 10:57:58 |
+----+----------+------------+---------------------+
5 rows in set (0.00 sec)
创建连接库的账户:(这个账户的权限其实可以再小点:grant replication slave on *.* )
GRANT ALL PRIVILEGES ON *.* TO 'click_rep'@'172.16.0.246' identified by 'jwtest123456';
flush privileges;
全量同步stdb01.test01表数据的配置文件内容如下:
[root@tidb05 mysqlsmom]# cat /data1/soft/mysqlsmom/test_mom/init_config.py
# coding=utf-8
STREAM = "INIT"
# 修改数据库连接
CONNECTION = {
'host': '172.16.0.197',
'port': 3306,
'user': 'click_rep',
'passwd': 'jwtest123456'
}
# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
BULK_SIZE = 1
# 修改elasticsearch节点
#NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
{
"stream": {
"database": "stdb01",# 在此数据库执行sql语句
"sql": "select * from test01",# 将该sql语句选中的数据同步到 elasticsearch;
# "pk": {"field": "id", "type": "char"}# 当主键id的类型是字符串时
},
"jobs": [
{
"actions": ["insert", "update"],
"pipeline": [
{"set_id": {"field": "id"}}# 默认设置 id字段的值 为elasticsearch中的文档id;
# 此处可以指定具体同步哪些字段的数据到es.不指定的话默认同步表全部的数据到es。
],
"dest": {
"es": {
"action": "upsert",
"index": "test01_index",# 设置 index
"type": "test01",# 设置 type
"nodes": NODES
}
}
}
]
}
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
执行全量同步命令:
[root@tidb05 mysqlsmom]# pwd
/data1/soft/mysqlsmom
[root@tidb05 mysqlsmom]# ls
197_testdb_mom  docs  mysqlsmom  README.md  README_OLD.md  setup.py  test_mom
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
RequestsDependencyWarning)
2021-07-11 11:09:44,500 rootINFO{"username": "tomcat", "password": "xiaohuahua", "create_time": "2021-07-11 10:57:57", "id": 1, "_id": 1}
2021-07-11 11:09:44,623 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:0.123s]
2021-07-11 11:09:44,624 rootINFO{"username": "java", "password": "123456", "create_time": "2021-07-11 10:57:57", "id": 2, "_id": 2}
2021-07-11 11:09:44,630 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:0.006s]
2021-07-11 11:09:44,630 rootINFO{"username": "lua", "password": "ssd123456", "create_time": "2021-07-11 10:57:57", "id": 3, "_id": 3}
2021-07-11 11:09:44,639 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:0.009s]
2021-07-11 11:09:44,640 rootINFO{"username": "php", "password": "seurw456", "create_time": "2021-07-11 10:57:57", "id": 4, "_id": 4}
2021-07-11 11:09:44,644 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:0.004s]
2021-07-11 11:09:44,645 rootINFO{"username": "python", "password": "seueurw456", "create_time": "2021-07-11 10:57:58", "id": 5, "_id": 5}
2021-07-11 11:09:44,650 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
real    0m0.640s
user    0m0.444s
sys     0m0.051s
具体ES里面的数据此处就不再截图演示了。
案例二:全量同步MySQL某张表部分字段数据到es
创建测试表test02和写入测试数据:
CREATE TABLE `test02` (
`id` int(8) NOT NULL AUTO_INCREMENT,
`username` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
`password` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
`create_time` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test02(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test02(username,password,create_time) values('java', '123456',now());
INSERT INTO test02(username,password,create_time) values('lua', 'ssd123456',now());
INSERT INTO test02(username,password,create_time) values('php', 'seurw456',now());
INSERT INTO test02(username,password,create_time) values('python', 'seueurw456',now());
运行同步命令:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
RequestsDependencyWarning)
2021-07-11 11:25:53,126 rootINFO{"username": "tomcat", "_id": 1, "id": 1}
2021-07-11 11:25:53,217 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:0.091s]
2021-07-11 11:25:53,218 rootINFO{"username": "java", "_id": 2, "id": 2}
2021-07-11 11:25:53,223 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
2021-07-11 11:25:53,223 rootINFO{"username": "lua", "_id": 3, "id": 3}
2021-07-11 11:25:53,228 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
2021-07-11 11:25:53,229 rootINFO{"username": "php", "_id": 4, "id": 4}
2021-07-11 11:25:53,235 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:0.006s]
2021-07-11 11:25:53,235 rootINFO{"username": "python", "_id": 5, "id": 5}
2021-07-11 11:25:53,241 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:0.006s]
real    0m0.597s
user    0m0.440s
sys     0m0.047s
具体ES里面的数据此处就不再截图演示了。
此案例演示的具体配置文件如下:
[root@tidb05 mysqlsmom]# cat /data1/soft/mysqlsmom/test_mom/init_config.py
# coding=utf-8
STREAM = "INIT"
# 修改数据库连接
CONNECTION = {
'host': '172.16.0.197',
'port': 3306,
'user': 'click_rep',
'passwd': 'jwtest123456'
}
# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
BULK_SIZE = 1
# 修改elasticsearch节点
#NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
{
"stream": {
"database": "stdb01",# 在此数据库执行sql语句
"sql": "select * from test02",# 将该sql语句选中的数据同步到 elasticsearch
# "pk": {"field": "id", "type": "char"}# 当主键id的类型是字符串时
},
"jobs": [
{
"actions": ["insert", "update"],
"pipeline": [
{"only_fields": {"fields": ["id", "username"]}}, # 只同步 id 和 username字段
{"set_id": {"field": "id"}}# 默认设置 id字段的值 为elasticsearch中的文档id
],
"dest": {
"es": {
"action": "upsert",
"index": "test02_index",# 设置 index
"type": "test02",# 设置 type
"nodes": NODES
}
}
}
]
}
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
案例三:全量同步MySQL多张表数据到es
创建测试表test03,test04,写入测试数据:
CREATE TABLE `test03` (
`id` int(8) NOT NULL AUTO_INCREMENT,
`username` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
`password` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
`create_time` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test03(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test03(username,password,create_time) values('java', '123456',now());
CREATE TABLE `test04` (
`id` int(8) NOT NULL AUTO_INCREMENT,
`username` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
`password` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
`create_time` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test04(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test04(username,password,create_time) values('java', '123456',now());
root@tidb04 12:59:[stdb01]>
select * from test04;
+----+----------+------------+---------------------+
| id | username | password| create_time|
+----+----------+------------+---------------------+
|1 | tomcat| xiaohuahua | 2021-07-11 12:59:01 |
|2 | java| 123456| 2021-07-11 12:59:01 |
+----+----------+------------+---------------------+
2 rows in set (0.00 sec)
root@tidb04 12:59:[stdb01]>
select * from test03;
+----+----------+------------+---------------------+
| id | username | password| create_time|
+----+----------+------------+---------------------+
|1 | tomcat| xiaohuahua | 2021-07-11 12:58:53 |
|2 | java| 123456| 2021-07-11 12:58:54 |
+----+----------+------------+---------------------+
2 rows in set (0.00 sec)
此案例配置文件内容如下:
[root@tidb05 test_mom]# cat init_config.py
# coding=utf-8
STREAM = "INIT"
# 修改数据库连接
CONNECTION = {
'host': '172.16.0.197',
'port': 3306,
'user': 'click_rep',
'passwd': 'jwtest123456'
}
# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
BULK_SIZE = 1
# 修改elasticsearch节点
#NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
# 同步stdb01.test03到es:
{
"stream": {
"database": "stdb01",# 在此数据库执行sql语句
"sql": "select * from test03",# 将该sql语句选中的数据同步到 elasticsearch
# "pk": {"field": "id", "type": "char"}# 当主键id的类型是字符串时
},
"jobs": [
{
"actions": ["insert", "update"],
"pipeline": [
{"set_id": {"field": "id"}}# 默认设置 id字段的值 为elasticsearch中的文档id
],
"dest": {
"es": {
"action": "upsert",
"index": "test03_index",# 设置 index
"type": "test03",# 设置 type
"nodes": NODES
}
}
}
]
},
# 同步stdb01.test04到es:
{
"stream": {
"database": "stdb01",# 在此数据库执行sql语句
"sql": "select * from test04",# 将该sql语句选中的数据同步到 elasticsearch
# "pk": {"field": "id", "type": "char"}# 当主键id的类型是字符串时
},
"jobs": [
{
"actions": ["insert", "update"],
"pipeline": [
{"only_fields": {"fields": ["id", "username"]}}, # 只同步 id 和 username字段
{"set_id": {"field": "id"}}# 默认设置 id字段的值 为elasticsearch中的文档id
],
"dest": {
"es": {
"action": "upsert",
"index": "test04_index",# 设置 index
"type": "test04",# 设置 type
"nodes": NODES
}
}
}
]
}
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
启动运行命令:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
RequestsDependencyWarning)
2021-07-11 13:01:09,473 rootINFO{"username": "tomcat", "password": "xiaohuahua", "create_time": "2021-07-11 12:58:53", "id": 1, "_id": 1}
2021-07-11 13:01:09,555 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:0.082s]
2021-07-11 13:01:09,556 rootINFO{"username": "java", "password": "123456", "create_time": "2021-07-11 12:58:54", "id": 2, "_id": 2}
2021-07-11 13:01:09,561 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
2021-07-11 13:01:09,564 rootINFO{"username": "tomcat", "_id": 1, "id": 1}
2021-07-11 13:01:09,636 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:0.072s]
2021-07-11 13:01:09,636 rootINFO{"username": "java", "_id": 2, "id": 2}
2021-07-11 13:01:09,642 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
real    0m0.629s
user    0m0.411s
sys     0m0.055s
具体ES里面的数据此处就不再截图演示了。
案例四、同步同MySQL实例下不同的库不同的表数据到ES
MySQL表数据如下:
root@tidb04 10:58:[test_db]>
select * from stdb01.test01;
+----+----------+----------+---------------------+
| id | username | password | create_time|
+----+----------+----------+---------------------+
| 30 | fox| 556| 2021-07-30 08:19:37 |
| 31 | fox| 556| 2021-07-30 08:19:38 |
+----+----------+----------+---------------------+
2 rows in set (0.00 sec)
root@tidb04 10:58:[test_db]>
select * from test_db.test01;
+----+----------+------------+---------------------+
| id | username | password| create_time|
+----+----------+------------+---------------------+
|1 | tomcat| xiaohuahua | 2021-07-03 23:51:17 |
|2 | php| xiao| 2021-07-03 23:53:36 |
|3 | fix| xiao| 2021-07-03 23:53:49 |
|4 | java| bai| 2021-07-03 23:54:01 |
+----+----------+------------+---------------------+
4 rows in set (0.00 sec)
全量同步MySQL数据到ES:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
2021-08-01 10:58:49,966 rootINFO{"username": "fox", "password": "556", "create_time": "2021-07-30 08:19:37", "id": 30, "_id": 30}
2021-08-01 10:58:49,967 rootINFO{"username": "fox", "password": "556", "create_time": "2021-07-30 08:19:38", "id": 31, "_id": 31}
2021-08-01 10:58:50,115 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:0.148s]
2021-08-01 10:58:50,119 rootINFO{"username": "tomcat", "_id": 1, "id": 1}
2021-08-01 10:58:50,119 rootINFO{"username": "php", "_id": 2, "id": 2}
2021-08-01 10:58:50,119 rootINFO{"username": "fix", "_id": 3, "id": 3}
2021-08-01 10:58:50,119 rootINFO{"username": "java", "_id": 4, "id": 4}
2021-08-01 10:58:50,259 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:0.139s]
real    0m0.873s
user    0m0.505s
sys     0m0.080s
ES数据验证:
文章图片
文章图片
文章图片
全量同步的配置文件内容如下:
[root@tidb05 mysqlsmom]# cat ./test_mom/init_config.py
# coding=utf-8
STREAM = "INIT"
# 修改数据库连接
CONNECTION = {
'host': '172.16.0.197',
'port': 3306,
'user': 'click_rep',
'passwd': 'jwtest123456'
}
# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
BULK_SIZE = 50000
# 修改elasticsearch节点
#NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
# 同步stdb01.test03到es:
{
"stream": {
"database": "stdb01",# 在此数据库执行sql语句
"sql": "select * from test01",# 将该sql语句选中的数据同步到 elasticsearch
# "pk": {"field": "id", "type": "char"}# 当主键id的类型是字符串时
},
"jobs": [
{
"actions": ["insert", "update"],
"pipeline": [
{"set_id": {"field": "id"}}# 默认设置 id字段的值 为elasticsearch中的文档id
],
"dest": {
"es": {
"action": "upsert",
"index": "stdb01.test01_index",# 设置 index
"type": "user_table",# 设置 type
"nodes": NODES
}
}
}
]
},
##同步test_db.test01到es:
{
"stream": {
"database": "test_db",# 在此数据库执行sql语句
"sql": "select * from test01",# 将该sql语句选中的数据同步到 elasticsearch
# "pk": {"field": "id", "type": "char"}# 当主键id的类型是字符串时
},
"jobs": [
{
"actions": ["insert", "update"],
"pipeline": [
{"only_fields": {"fields": ["id", "username"]}}, # 只同步 id 和 username字段
{"set_id": {"field": "id"}}# 默认设置 id字段的值 为elasticsearch中的文档id
],"dest": {
"es": {
"action": "upsert",
"index": "test_db.test01_index",# 设置 index
"type": "user_table",# 设置 type
"nodes": NODES
}
}
}
]
}]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
案例五、设置参数BULK_SIZE =1和=10000进行测试全量同步时间测试
【MySQLmom程序全量同步MySQL表数据到ES】设置BULK_SIZE =10000进行全量同步表t_work_order_follow数据到es测试
[root@tidb04 ~]# mysql -e "select count(*) from stdb01.t_work_order_follow;
"
+----------+
| count(*) |
+----------+
|3975925 |
+----------+
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
real    30m7.618s
user    23m51.398s
sys     0m58.087s
设置参数BULK_SIZE =1 进行全量同步表t_work_order_follow数据到es测试,说实话花费的时间非常的长了:
root@tidb04 08:07:[test_db]>
select count(*)from t_work_order_follow;
+----------+
| count(*) |
+----------+
|3975925 |
+----------+
1 row in set (0.62 sec)
[root@tidb06 mysqlsmom]# time mom run -c ./test_mom/init_config.py
real    237m59.067s
user    53m49.099s
sys     3m25.431s
后面测试把设置BULK_SIZE =50000 测试结果和设置BULK_SIZE =10000消耗的时间基本差不多。
全量同步演示到此介绍完毕。后面会分享增量同步的配置方法,敬请期待。
推荐阅读
- Angular 服务器端渲染的学习笔记
- 使用Centos系统忘记密码怎么办
- 大白话聊聊微服务——人人都能看懂的演进过程
- MySQLmom程序增量同步MySQL数据到ES
- 富文本及编辑器的跨平台方案
- 从源码分析Hystrix工作机制
- 本图文详细教程教你Win10怎样查看无线网络密码
- win10备份软件最新推荐
- 本图文详细教程教你win10迅速打开怎样关闭