MySQLmom程序增量同步MySQL数据到ES

【MySQLmom程序增量同步MySQL数据到ES】吾生也有涯,而知也无涯。这篇文章主要讲述MySQLmom程序增量同步MySQL数据到ES相关的知识,希望能为你提供帮助。
说明: 演示mysqlmom增量的同步数据到ES环境中redis版本3.2.8,ES-5.0.0,mysql5.7.22
分析 binlog 的增量同步的要求:1.确保要增量同步的MySql数据库开启binlog,且开启redis(为了存储最后一次读到的binlog文件名及读到的位置。未来可能支持本地文件存储该信息。)
注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步; 启动增量同步前。首先对所要增量同步的表进行一次全量同步,然后才是开启增量同步
2.需要在本地安装redis服务

/usr/local/redis/bin/redis-server /usr/local/redis/conf/redis.conf [root@tidb05 conf]#/usr/local/redis/bin/redis-server /usr/local/redis/conf/redis.conf [root@tidb05 conf]# ss -lntup|grep redis tcpLISTEN08192127.0.0.1:10201*:*users:(("redis-server",pid=10597,fd=4)) [root@tidb05 conf]# /usr/local/redis/bin/redis-cli -h 127.0.0.1 -p 10201 -a \'YHu222tuEq\' info Memory # Memory used_memory:6179328 used_memory_human:5.89M used_memory_rss:5095424 used_memory_rss_human:4.86M used_memory_peak:6179328 used_memory_peak_human:5.89M total_system_memory:16656146432 total_system_memory_human:15.51G used_memory_lua:37888 used_memory_lua_human:37.00K maxmemory:1000000000 maxmemory_human:953.67M maxmemory_policy:noeviction mem_fragmentation_ratio:0.82 mem_allocator:jemalloc-4.0.3

3.新建配置文件,只支持" insert" , " update" 的增量同步:
mom new test_mom/binlog_config.py -t binlog --force [root@tidb05 mysqlsmom]# mom new test_mom/binlog_config.py -t binlog --force new config at /data1/soft/mysqlsmom/test_mom/binlog_config.py

4.编辑 test_mom/binlog_config.py,按注释提示修改配置:
[root@tidb05 conf]# cat/data1/soft/mysqlsmom/test_mom/binlog_config.py # coding=utf-8 STREAM = "BINLOG"# "BINLOG" or "INIT" SERVER_ID = 172160197 ### 确保每个用于binlog同步的配置文件的SERVER_ID不同 SLAVE_UUID = __name__# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1 ###BULK_SIZE = 10000此参数只能是在全量导数据时开启,在增量同步数据时禁止开启,否则导致增量同步失败BINLOG_CONNECTION = { \'host\': \'172.16.0.197\', \'port\': 3306, \'user\': \'click_rep\', \'passwd\': \'jwtest123456\' } # redis存储上次同步位置等信息 REDIS = { "host": "127.0.0.1", "port": 10201, "db": 0, "password": "YHu222tuEq",# 不需要密码则注释或删掉该行 }# 配置es节点 #NODES = [{"host": "127.0.0.1", "port": 9200}] NODES = [{"host": "172.16.0.247", "port": 9999}] TASKS = [ { "stream": { "database": "stdb01", "table": "test01" }, "jobs": [ { "actions": ["insert", "update"], "pipeline": [ #{"only_fields": {"fields": ["id", "name", "age"]}}, ##只同步 id,name和age字段数据到es; 注释掉该行则同步全部字段的值到es {"set_id": {"field": "id"}} ], "dest": { "es": { "action": "upsert", "index": "test01_index", "type": "test01", "nodes": NODES } } } ] } ]# CUSTOM_ROW_HANDLERS = "./my_handlers.py" # CUSTOM_ROW_FILTERS = "./my_filters.py"

5.运行
该进程会一直运行,实时同步新增和更改后的数据到elasticsearch;
注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步;同步旧数据请看全量同步MySql数据到es;
6.验证测试:
验证MySQL测试表stdb01.test01已经存在的数据: [root@tidb04 ~]# mysql -e "select * from stdb01.test01; " +----+----------+------------+---------------------+ | id | username | password| create_time| +----+----------+------------+---------------------+ |1 | tomcat| xiaohuahua | 2021-07-11 10:57:57 | |2 | java| 123456| 2021-07-11 10:57:57 | |3 | lua| ssd123456| 2021-07-11 10:57:57 | |4 | php| seurw456| 2021-07-11 10:57:57 | |5 | python| seueurw456 | 2021-07-11 10:57:58 | |6 | java| 123456| 2021-07-11 16:59:47 | |7 | java| 123456| 2021-07-11 16:59:51 | |8 | java| 123456| 2021-07-11 16:59:58 | |9 | tomcat| ceshi001| 2021-07-28 00:24:41 | | 10 | c++| 558996| 2021-07-28 00:33:01 | | 11 | c++| 558996| 2021-07-11 16:59:58 | | 12 | c++| 558996| 2021-07-11 16:59:58 | | 13 | java| 596| 2021-07-28 00:41:14 | | 14 | java| 7890| 2021-07-28 00:41:34 | | 15 | php| 7890| 2021-07-28 00:41:51 | | 16 | python| 654321| 2021-07-28 00:42:08 | +----+----------+------------+---------------------+

启动mysqlmom服务:
[root@tidb05 mysqlsmom]# mom run -c ./test_mom/binlog_config.py

binglog文件和pos位置点:
[root@tidb05 ~]# /usr/local/redis/bin/redis-cli -h 127.0.0.1 -p 10201 -a \'YHu222tuEq\' 127.0.0.1:10201> info keyspace # Keyspace db0:keys=2,expires=0,avg_ttl=0 127.0.0.1:10201> keys * 1) "binlog_config_log_pos" 2) "binlog_config_log_file" 127.0.0.1:10201> get binlog_config_log_pos "2268" 127.0.0.1:10201> get binlog_config_log_file "mysql-bin.000013"

MySQL新增和删除记录:
root@tidb04 23:59:[stdb01]> INSERT INTO test01(username,password,create_time) values(\'go\', \'654\',now()); root@tidb04 00:16:[stdb01]> delete from test01 where id=17; Query OK, 1 row affected (0.00 sec)[root@tidb05 mysqlsmom]# mom run -c ./test_mom/binlog_config.py 2573 2021-07-29 00:13:46,528 rootINFO{"timestamp": "2021-07-29 00:13:46", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17}, "action": "insert", "table": "test01", "schema": "stdb01"} 2021-07-29 00:13:46,529 rootINFO{"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17, "_id": 17} 2870 2021-07-29 00:16:43,364 rootINFO{"timestamp": "2021-07-29 00:16:43", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17}, "action": "delete", "table": "test01", "schema": "stdb01"} **虽然记录了日志但是es里面没有任何值新增和删除的变化**

root@tidb04 00:19:[stdb01]> INSERT INTO test01(username,password,create_time) values(\'go\', \'654\',now()); Query OK, 1 row affected (0.00 sec)root@tidb04 00:19:[stdb01]> INSERT INTO test01(username,password,create_time) values(\'C++\', \'654\',now()); Query OK, 1 row affected (0.01 sec)3330 2021-07-29 00:19:47,088 rootINFO{"timestamp": "2021-07-29 00:19:47", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1}, "action": "insert", "table": "test01", "schema": "stdb01"} 2021-07-29 00:19:47,088 rootINFO{"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1, "_id": 1}3636 2021-07-29 00:20:42,729 rootINFO{"timestamp": "2021-07-29 00:20:42", "host": "172.16.0.197", "values": {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2}, "action": "insert", "table": "test01", "schema": "stdb01"} 2021-07-29 00:20:42,729 rootINFO{"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2, "_id": 2}**重复尝试了好多次,虽然在启动binlog增量同步数据时,有增删数据日志的输出,但是登录ES,始终没看到ES中数据的变化****存放在redis中的pos位置点一直没变话:** 127.0.0.1:10201> get binlog_config_log_file "mysql-bin.000013" 127.0.0.1:10201> get binlog_config_log_pos "2268"

后面发现问题原因:
主要原因是错误的把配置文件binlog_config.py 中BULK_SIZE这个参数开启导致的。果断注销掉,重启binglog同步程序,终于可以看大有数据写入到ES了
7、全量同步stdb01.test01表数据到:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py 2021-07-29 00:23:47,371 rootINFO{"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1, "_id": 1} 2021-07-29 00:23:47,371 rootINFO{"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2, "_id": 2} 2021-07-29 00:23:48,501 elasticsearch INFOPOST http://172.16.0.247:9999/_bulk [status:200 request:1.130s]real0m1.768s user0m0.463s sys 0m0.059s[es@tidb06 logs]$ curl \'http://172.16.0.247:9999/_cat/indices?v\' health status indexuuidpri rep docs.count docs.deleted store.size pri.store.size yellow opentest01_index LMMynfXaThGktG_YEAO2QQ512010.1kb10.1kb[es@tidb06 logs]$ curl -s -XGET \'http://172.16.0.247:9999/_cat/indices/test01_index?v\' health status indexuuidpri rep docs.count docs.deleted store.size pri.store.size yellow opentest01_index LMMynfXaThGktG_YEAO2QQ512010.1kb10.1kb

8.再次启动binlog_config.py 进行增量同步:
mom run -c ./test_mom/binlog_config.py写入2条sql: root@tidb04 00:44:[stdb01]> INSERT INTO test01(username,password,create_time) values(\'php\', \'123\',now()); Query OK, 1 row affected (0.01 sec)root@tidb04 00:44:[stdb01]> INSERT INTO test01(username,password,create_time) values(\'java\', \'321\',now()); Query OK, 1 row affected (0.01 sec)root@tidb04 00:44:[stdb01]> select * from test01; +----+----------+----------+---------------------+ | id | username | password | create_time| +----+----------+----------+---------------------+ |1 | go| 654| 2021-07-29 00:19:47 | |2 | C++| 654| 2021-07-29 00:20:42 | |3 | php| 123| 2021-07-29 00:44:39 | |4 | java| 321| 2021-07-29 00:44:49 | +----+----------+----------+---------------------+ 4 rows in set (0.00 sec)**存放在redis中的pos位置点开始变化了:** 127.0.0.1:10201> get binlog_config_log_file "mysql-bin.000013" 127.0.0.1:10201> get binlog_config_log_pos "3278"**登录ES,看到insert数据也写入到ES了**

9、inert,update,delete的增量同步MySQL数据到ES的配置文件:
[root@tidb05 soft]# cat mysqlsmom/test_mom/binlog_config.py # coding=utf-8 STREAM = "BINLOG"# "BINLOG" or "INIT" SERVER_ID = 172160197 ### 确保每个用于binlog同步的配置文件的SERVER_ID不同 SLAVE_UUID = __name__# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1 #BULK_SIZE = 10000此参数只能是在全量导数据时开启,在增量同步数据时禁止开启,否则导致增量同步失败BINLOG_CONNECTION = { \'host\': \'172.16.0.197\', \'port\': 3306, \'user\': \'click_rep\', \'passwd\': \'jwtest123456\' } # redis存储上次同步位置等信息 REDIS = { "host": "127.0.0.1", "port": 10201, "db": 0, "password": "YHu222tuEq",# 不需要密码则注释或删掉该行 }# 配置es节点 #NODES = [{"host": "127.0.0.1", "port": 9200}] NODES = [{"host": "172.16.0.247", "port": 9999}] TASKS = [ { "stream": { "database": "stdb01", "table": "test01" }, "jobs": [ # 同步插入、更新es数据 { "actions": ["insert", "update"], "pipeline": [ #{"only_fields": {"fields": ["id", "name", "age"]}}, ##只同步 id,name和age字段数据到es; 注释掉该行则同步全部字段的值到es {"set_id": {"field": "id"}} ], "dest": { "es": { "action": "upsert", "index": "test01_index", "type": "test01", "nodes": NODES } } }, #delete 同步删除es数据 { "actions": ["delete"], "pipeline": [ #{"only_fields": {"fields": ["id", "name", "age"]}}, ##只同步 id,name和age字段数据到es; 注释掉该行则同步全部字段的值到es {"set_id": {"field": "id"}} ], "dest": { "es": { "action": "delete", "index": "test01_index", "type": "test01", "nodes": NODES } } }] } ]# CUSTOM_ROW_HANDLERS = "./my_handlers.py" # CUSTOM_ROW_FILTERS = "./my_filters.py"

关于MySQLmom增量同步指定MySQL表数据到ES简单介绍完毕,请继续关注博主,后面还会有更精彩的文章继续分享。

    推荐阅读