【MySQLmom程序增量同步MySQL数据到ES】吾生也有涯,而知也无涯。这篇文章主要讲述MySQLmom程序增量同步MySQL数据到ES相关的知识,希望能为你提供帮助。
说明: 演示mysqlmom增量的同步数据到ES环境中redis版本3.2.8,ES-5.0.0,mysql5.7.22
分析 binlog 的增量同步的要求:1.确保要增量同步的MySql数据库开启binlog,且开启redis(为了存储最后一次读到的binlog文件名及读到的位置。未来可能支持本地文件存储该信息。)
注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步;
启动增量同步前。首先对所要增量同步的表进行一次全量同步,然后才是开启增量同步
2.需要在本地安装redis服务
/usr/local/redis/bin/redis-server /usr/local/redis/conf/redis.conf
[root@tidb05 conf]#/usr/local/redis/bin/redis-server /usr/local/redis/conf/redis.conf
[root@tidb05 conf]# ss -lntup|grep redis
tcpLISTEN08192127.0.0.1:10201*:*users:(("redis-server",pid=10597,fd=4))
[root@tidb05 conf]# /usr/local/redis/bin/redis-cli -h 127.0.0.1 -p 10201 -a \'YHu222tuEq\' info Memory
# Memory
used_memory:6179328
used_memory_human:5.89M
used_memory_rss:5095424
used_memory_rss_human:4.86M
used_memory_peak:6179328
used_memory_peak_human:5.89M
total_system_memory:16656146432
total_system_memory_human:15.51G
used_memory_lua:37888
used_memory_lua_human:37.00K
maxmemory:1000000000
maxmemory_human:953.67M
maxmemory_policy:noeviction
mem_fragmentation_ratio:0.82
mem_allocator:jemalloc-4.0.3
3.新建配置文件,只支持" insert" , " update" 的增量同步:
mom new test_mom/binlog_config.py -t binlog --force
[root@tidb05 mysqlsmom]# mom new test_mom/binlog_config.py -t binlog --force
new config at /data1/soft/mysqlsmom/test_mom/binlog_config.py
4.编辑 test_mom/binlog_config.py,按注释提示修改配置:
[root@tidb05 conf]# cat/data1/soft/mysqlsmom/test_mom/binlog_config.py
# coding=utf-8
STREAM = "BINLOG"# "BINLOG" or "INIT"
SERVER_ID = 172160197 ### 确保每个用于binlog同步的配置文件的SERVER_ID不同
SLAVE_UUID = __name__# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
###BULK_SIZE = 10000此参数只能是在全量导数据时开启,在增量同步数据时禁止开启,否则导致增量同步失败BINLOG_CONNECTION = {
\'host\': \'172.16.0.197\',
\'port\': 3306,
\'user\': \'click_rep\',
\'passwd\': \'jwtest123456\'
}
# redis存储上次同步位置等信息
REDIS = {
"host": "127.0.0.1",
"port": 10201,
"db": 0,
"password": "YHu222tuEq",# 不需要密码则注释或删掉该行
}# 配置es节点
#NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
{
"stream": {
"database": "stdb01",
"table": "test01"
},
"jobs": [
{
"actions": ["insert", "update"],
"pipeline": [
#{"only_fields": {"fields": ["id", "name", "age"]}}, ##只同步 id,name和age字段数据到es;
注释掉该行则同步全部字段的值到es
{"set_id": {"field": "id"}}
],
"dest": {
"es": {
"action": "upsert",
"index": "test01_index",
"type": "test01",
"nodes": NODES
}
}
}
]
}
]# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
5.运行
该进程会一直运行,实时同步新增和更改后的数据到elasticsearch;
注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步;同步旧数据请看全量同步MySql数据到es;
6.验证测试:
验证MySQL测试表stdb01.test01已经存在的数据:
[root@tidb04 ~]# mysql -e "select * from stdb01.test01;
"
+----+----------+------------+---------------------+
| id | username | password| create_time|
+----+----------+------------+---------------------+
|1 | tomcat| xiaohuahua | 2021-07-11 10:57:57 |
|2 | java| 123456| 2021-07-11 10:57:57 |
|3 | lua| ssd123456| 2021-07-11 10:57:57 |
|4 | php| seurw456| 2021-07-11 10:57:57 |
|5 | python| seueurw456 | 2021-07-11 10:57:58 |
|6 | java| 123456| 2021-07-11 16:59:47 |
|7 | java| 123456| 2021-07-11 16:59:51 |
|8 | java| 123456| 2021-07-11 16:59:58 |
|9 | tomcat| ceshi001| 2021-07-28 00:24:41 |
| 10 | c++| 558996| 2021-07-28 00:33:01 |
| 11 | c++| 558996| 2021-07-11 16:59:58 |
| 12 | c++| 558996| 2021-07-11 16:59:58 |
| 13 | java| 596| 2021-07-28 00:41:14 |
| 14 | java| 7890| 2021-07-28 00:41:34 |
| 15 | php| 7890| 2021-07-28 00:41:51 |
| 16 | python| 654321| 2021-07-28 00:42:08 |
+----+----------+------------+---------------------+
启动mysqlmom服务:
[root@tidb05 mysqlsmom]# mom run -c ./test_mom/binlog_config.py
binglog文件和pos位置点:
[root@tidb05 ~]# /usr/local/redis/bin/redis-cli -h 127.0.0.1 -p 10201 -a \'YHu222tuEq\'
127.0.0.1:10201>
info keyspace
# Keyspace
db0:keys=2,expires=0,avg_ttl=0
127.0.0.1:10201>
keys *
1) "binlog_config_log_pos"
2) "binlog_config_log_file"
127.0.0.1:10201>
get binlog_config_log_pos
"2268"
127.0.0.1:10201>
get binlog_config_log_file
"mysql-bin.000013"
MySQL新增和删除记录:
root@tidb04 23:59:[stdb01]>
INSERT INTO test01(username,password,create_time) values(\'go\', \'654\',now());
root@tidb04 00:16:[stdb01]>
delete from test01 where id=17;
Query OK, 1 row affected (0.00 sec)[root@tidb05 mysqlsmom]# mom run -c ./test_mom/binlog_config.py 2573
2021-07-29 00:13:46,528 rootINFO{"timestamp": "2021-07-29 00:13:46", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17}, "action": "insert", "table": "test01", "schema": "stdb01"}
2021-07-29 00:13:46,529 rootINFO{"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17, "_id": 17}
2870
2021-07-29 00:16:43,364 rootINFO{"timestamp": "2021-07-29 00:16:43", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17}, "action": "delete", "table": "test01", "schema": "stdb01"}
**虽然记录了日志但是es里面没有任何值新增和删除的变化**
root@tidb04 00:19:[stdb01]>
INSERT INTO test01(username,password,create_time) values(\'go\', \'654\',now());
Query OK, 1 row affected (0.00 sec)root@tidb04 00:19:[stdb01]>
INSERT INTO test01(username,password,create_time) values(\'C++\', \'654\',now());
Query OK, 1 row affected (0.01 sec)3330
2021-07-29 00:19:47,088 rootINFO{"timestamp": "2021-07-29 00:19:47", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1}, "action": "insert", "table": "test01", "schema": "stdb01"}
2021-07-29 00:19:47,088 rootINFO{"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1, "_id": 1}3636
2021-07-29 00:20:42,729 rootINFO{"timestamp": "2021-07-29 00:20:42", "host": "172.16.0.197", "values": {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2}, "action": "insert", "table": "test01", "schema": "stdb01"}
2021-07-29 00:20:42,729 rootINFO{"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2, "_id": 2}**重复尝试了好多次,虽然在启动binlog增量同步数据时,有增删数据日志的输出,但是登录ES,始终没看到ES中数据的变化****存放在redis中的pos位置点一直没变话:**
127.0.0.1:10201>
get binlog_config_log_file
"mysql-bin.000013"
127.0.0.1:10201>
get binlog_config_log_pos
"2268"
后面发现问题原因:
主要原因是错误的把配置文件binlog_config.py 中BULK_SIZE这个参数开启导致的。果断注销掉,重启binglog同步程序,终于可以看大有数据写入到ES了
7、全量同步stdb01.test01表数据到:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
2021-07-29 00:23:47,371 rootINFO{"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1, "_id": 1}
2021-07-29 00:23:47,371 rootINFO{"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2, "_id": 2}
2021-07-29 00:23:48,501 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:1.130s]real0m1.768s
user0m0.463s
sys 0m0.059s[es@tidb06 logs]$ curl \'http://172.16.0.247:9999/_cat/indices?v\'
health status indexuuidpri rep docs.count docs.deleted store.size pri.store.size
yellow opentest01_index LMMynfXaThGktG_YEAO2QQ512010.1kb10.1kb[es@tidb06 logs]$ curl -s -XGET \'http://172.16.0.247:9999/_cat/indices/test01_index?v\'
health status indexuuidpri rep docs.count docs.deleted store.size pri.store.size
yellow opentest01_index LMMynfXaThGktG_YEAO2QQ512010.1kb10.1kb
8.再次启动binlog_config.py 进行增量同步:
mom run -c ./test_mom/binlog_config.py写入2条sql:
root@tidb04 00:44:[stdb01]>
INSERT INTO test01(username,password,create_time) values(\'php\', \'123\',now());
Query OK, 1 row affected (0.01 sec)root@tidb04 00:44:[stdb01]>
INSERT INTO test01(username,password,create_time) values(\'java\', \'321\',now());
Query OK, 1 row affected (0.01 sec)root@tidb04 00:44:[stdb01]>
select * from test01;
+----+----------+----------+---------------------+
| id | username | password | create_time|
+----+----------+----------+---------------------+
|1 | go| 654| 2021-07-29 00:19:47 |
|2 | C++| 654| 2021-07-29 00:20:42 |
|3 | php| 123| 2021-07-29 00:44:39 |
|4 | java| 321| 2021-07-29 00:44:49 |
+----+----------+----------+---------------------+
4 rows in set (0.00 sec)**存放在redis中的pos位置点开始变化了:**
127.0.0.1:10201>
get binlog_config_log_file
"mysql-bin.000013"
127.0.0.1:10201>
get binlog_config_log_pos
"3278"**登录ES,看到insert数据也写入到ES了**
9、inert,update,delete的增量同步MySQL数据到ES的配置文件:
[root@tidb05 soft]# cat mysqlsmom/test_mom/binlog_config.py
# coding=utf-8
STREAM = "BINLOG"# "BINLOG" or "INIT"
SERVER_ID = 172160197 ### 确保每个用于binlog同步的配置文件的SERVER_ID不同
SLAVE_UUID = __name__# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
#BULK_SIZE = 10000此参数只能是在全量导数据时开启,在增量同步数据时禁止开启,否则导致增量同步失败BINLOG_CONNECTION = {
\'host\': \'172.16.0.197\',
\'port\': 3306,
\'user\': \'click_rep\',
\'passwd\': \'jwtest123456\'
}
# redis存储上次同步位置等信息
REDIS = {
"host": "127.0.0.1",
"port": 10201,
"db": 0,
"password": "YHu222tuEq",# 不需要密码则注释或删掉该行
}# 配置es节点
#NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
{
"stream": {
"database": "stdb01",
"table": "test01"
},
"jobs": [
# 同步插入、更新es数据
{
"actions": ["insert", "update"],
"pipeline": [
#{"only_fields": {"fields": ["id", "name", "age"]}}, ##只同步 id,name和age字段数据到es;
注释掉该行则同步全部字段的值到es
{"set_id": {"field": "id"}}
],
"dest": {
"es": {
"action": "upsert",
"index": "test01_index",
"type": "test01",
"nodes": NODES
}
}
},
#delete 同步删除es数据
{
"actions": ["delete"],
"pipeline": [
#{"only_fields": {"fields": ["id", "name", "age"]}}, ##只同步 id,name和age字段数据到es;
注释掉该行则同步全部字段的值到es
{"set_id": {"field": "id"}}
],
"dest": {
"es": {
"action": "delete",
"index": "test01_index",
"type": "test01",
"nodes": NODES
}
}
}]
}
]# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
关于MySQLmom增量同步指定MySQL表数据到ES简单介绍完毕,请继续关注博主,后面还会有更精彩的文章继续分享。
推荐阅读
- 大白话聊聊微服务——人人都能看懂的演进过程
- 富文本及编辑器的跨平台方案
- 从源码分析Hystrix工作机制
- 本图文详细教程教你Win10怎样查看无线网络密码
- win10备份软件最新推荐
- 本图文详细教程教你win10迅速打开怎样关闭
- 本图文详细教程教你Win10怎样关闭UAC控制
- 本图文详细教程教你win10怎样关机
- 本图文详细教程教你win10无法打开应用商店