数据库|logstash同步mysql数据至elastic

logstash同步mysql数据至elastic 题记

项目数据采集、数据分析需要,我们需要同步MySQL数据至ES;在研发、测试、压测过程中,我们采用了代码同步存储MySQL数据至ESMQ异步消息方式同步数据,定时任务批量同步;但是最终发现了一个问题。
问题分析
【数据库|logstash同步mysql数据至elastic】不管是何种方式,需要通过JAVA代码调起,操作相对繁琐,整体性能不高,由于项目自身已经介入了ELK,尝试通过Logstash数据采集的形式实现相关数据同步。
参考资料
https://www.elastic.co/guide/en/logstash/6.4/plugins-inputs-jdbc.html
https://www.elastic.co/guide/en/logstash/6.4/plugins-outputs-elasticsearch.html
https://www.elastic.co/guide/en/logstash/6.4/plugins-outputs-stdout.html
实现方案
logstash conf 配置文件 test_user.conf
input { jdbc { jdbc_driver_library => "/opt/mysql/mysql-connector-java-8.0.19.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://10.x.3.xxx:3306/user_table" jdbc_user => "root" jdbc_password => "bsakjd7sl2ada" # TODO schedule =>"* * * * *" statement => "SELECT id ,user_id ,user_name, update_time FROM user_db where update_time >:sql_last_value" type => "user_table" } # 多表实现 jdbc { jdbc_driver_library => "/opt/mysql/mysql-connector-java-8.0.19.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://10.x.3.xxx:3306/user_db" jdbc_user => "root" jdbc_password => "bsakjd7sl2ada" # TODO schedule =>"* * * * *" statement => "SELECT id ,user_id ,user_name, user_phone, user_address, update_time FROM user_table_msg where update_time >:sql_last_value" type => "user_table_msg" } } filter { } output { if[type]=="user_table"{ elasticsearch{ # 集群:["",""] hosts => "http://10.x.x.xxx:9200" user => "elastic" password => "S8sdshlD" index => "user_table" document_id => "%{id}" document_type => "user_table" } } # 多表实现 if[type]=="user_table_msg"{ elasticsearch{ # 集群:["",""] hosts => "http://10.x.x.xxx:9200" user => "elastic" password => "S8sdshlD" index => "user_table_msg" document_id => "%{id}" document_type => "user_table_msg" } } stdout { # JSON输出 # codec => json codec => json_lines } }

mysql建表语句
建表语句
-- 用户表 CREATE TABLE `USER_TABLE` ( `ID` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '主键id', `USER_ID` VARCHAR(32) NOT NULL DEFAULT '' COMMENT '用户id', `USER_NAME` VARCHAR(100) NOT NULL DEFAULT '' COMMENT '用户名称', `UPDATE_TIME` TIMESTAMP NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`) ) ENGINE=INNODB DEFAULT CHARSET=UTF8 COMMENT '用户表'; -- 用户信息表 CREATE TABLE `USER_TABLE_MSG` ( `ID` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '主键id', `USER_ID` VARCHAR(32) NOT NULL DEFAULT '' COMMENT '用户id', `USER_NAME` VARCHAR(100) NOT NULL DEFAULT '' COMMENT '用户名称', `USER_PHONE` VARCHAR(11) NOT NULL DEFAULT '' COMMENT '用户手机号', `USER_ADDRESS` VARCHAR(255) NOT NULL DEFAULT '' COMMENT '用户地址', `UPDATE_TIME` TIMESTAMP NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`) ) ENGINE=INNODB DEFAULT CHARSET=UTF8 COMMENT '用户信息表';

测试验证 登录kibana查询索引user_table相关数据信息,按照既定的update_time验证数据同步条件是否正常。
问题延伸
在上述问题分析、开发、验证的过程中,我们采用logstash plugin完成了数据从mysql数据库到elastic的抽取过程,但是如果数据量过大,需要怎么处理呢,我们如何解决单次查询无法覆盖增量数据的情况。
如何完成循环查询 select () update_time >:sql_last_value limit 0, 1000;
分页策略实现
上述文件TODO位置增加如下配置
# 这将导致SQL语句分为多个查询。 # 每个查询都将使用限制和偏移来集体检索完整结果集。 # 使用jdbc_page_size设置限制大小。 jdbc_paging_enabled => true jdbc_page_size => 1000

再次验证
  • 造数:在最新时间节点后,单次提交增加1000+以上数据;
  • 重启:重启节点,重新编译加载test_user.conf文件
  • 查看logstash采集日志,登录kibana平台查看数据;
多表实现
  • input模块增加jdbc 节点,typeuser_table_msg
  • output模块增加if[type]=="user_table_msg"节点;

    推荐阅读