logstash同步mysql数据至elastic
题记
项目数据采集、数据分析需要,我们需要同步问题分析MySQL
数据至ES
;在研发、测试、压测过程中,我们采用了代码同步存储MySQL
数据至ES
,MQ
异步消息方式同步数据,定时任务批量同步;但是最终发现了一个问题。
【数据库|logstash同步mysql数据至elastic】不管是何种方式,需要通过参考资料JAVA
代码调起,操作相对繁琐,整体性能不高,由于项目自身已经介入了ELK
,尝试通过Logstash
数据采集的形式实现相关数据同步。
https://www.elastic.co/guide/en/logstash/6.4/plugins-inputs-jdbc.html实现方案
https://www.elastic.co/guide/en/logstash/6.4/plugins-outputs-elasticsearch.html
https://www.elastic.co/guide/en/logstash/6.4/plugins-outputs-stdout.html
logstash conf
配置文件
test_user.conf
input {
jdbc {
jdbc_driver_library => "/opt/mysql/mysql-connector-java-8.0.19.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://10.x.3.xxx:3306/user_table"
jdbc_user => "root"
jdbc_password => "bsakjd7sl2ada"
# TODO
schedule =>"* * * * *"
statement => "SELECT id ,user_id ,user_name, update_time FROM user_db where update_time >:sql_last_value"
type => "user_table"
}
# 多表实现
jdbc {
jdbc_driver_library => "/opt/mysql/mysql-connector-java-8.0.19.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://10.x.3.xxx:3306/user_db"
jdbc_user => "root"
jdbc_password => "bsakjd7sl2ada"
# TODO
schedule =>"* * * * *"
statement => "SELECT id ,user_id ,user_name, user_phone, user_address, update_time FROM user_table_msg where update_time >:sql_last_value"
type => "user_table_msg"
}
}
filter {
}
output {
if[type]=="user_table"{
elasticsearch{
# 集群:["",""]
hosts => "http://10.x.x.xxx:9200"
user => "elastic"
password => "S8sdshlD"
index => "user_table"
document_id => "%{id}"
document_type => "user_table"
}
}
# 多表实现
if[type]=="user_table_msg"{
elasticsearch{
# 集群:["",""]
hosts => "http://10.x.x.xxx:9200"
user => "elastic"
password => "S8sdshlD"
index => "user_table_msg"
document_id => "%{id}"
document_type => "user_table_msg"
}
}
stdout {
# JSON输出
# codec => json
codec => json_lines
}
}
mysql建表语句
建表语句
-- 用户表
CREATE TABLE `USER_TABLE` (
`ID` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`USER_ID` VARCHAR(32) NOT NULL DEFAULT '' COMMENT '用户id',
`USER_NAME` VARCHAR(100) NOT NULL DEFAULT '' COMMENT '用户名称',
`UPDATE_TIME` TIMESTAMP NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`ID`)
) ENGINE=INNODB DEFAULT CHARSET=UTF8 COMMENT '用户表';
-- 用户信息表
CREATE TABLE `USER_TABLE_MSG` (
`ID` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`USER_ID` VARCHAR(32) NOT NULL DEFAULT '' COMMENT '用户id',
`USER_NAME` VARCHAR(100) NOT NULL DEFAULT '' COMMENT '用户名称',
`USER_PHONE` VARCHAR(11) NOT NULL DEFAULT '' COMMENT '用户手机号',
`USER_ADDRESS` VARCHAR(255) NOT NULL DEFAULT '' COMMENT '用户地址',
`UPDATE_TIME` TIMESTAMP NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`ID`)
) ENGINE=INNODB DEFAULT CHARSET=UTF8 COMMENT '用户信息表';
测试验证 登录
kibana
查询索引user_table
相关数据信息,按照既定的update_time
验证数据同步条件是否正常。问题延伸
在上述问题分析、开发、验证的过程中,我们采用如何完成循环查询logstash plugin
完成了数据从mysql
数据库到elastic
的抽取过程,但是如果数据量过大,需要怎么处理呢,我们如何解决单次查询无法覆盖增量数据的情况。
select () update_time >:sql_last_value limit 0, 1000;
分页策略实现
上述文件TODO
位置增加如下配置
# 这将导致SQL语句分为多个查询。
# 每个查询都将使用限制和偏移来集体检索完整结果集。
# 使用jdbc_page_size设置限制大小。
jdbc_paging_enabled => true
jdbc_page_size => 1000
再次验证
- 造数:在最新时间节点后,单次提交增加
1000+
以上数据; - 重启:重启节点,重新编译加载
test_user.conf文件
; - 查看
logstash
采集日志,登录kibana
平台查看数据;
input
模块增加jdbc
节点,type
为user_table_msg
;output
模块增加if[type]=="user_table_msg"
节点;
推荐阅读
- 数据库|MySQL 配置主从复制实践
- 数据库|MySQL 主从复制原理
- 数据库|MySQL SQL的完整处理流程
- 数据库|MySQL表锁、行锁、排它锁和共享锁
- 数据库|MySQL存储引擎以及索引
- 数据库|MySQL数据类型、运算符以及数据库范式
- 数据库|MySQL SQL和索引优化总结
- 网络安全|测试攻击机伪装成目标机 IP 给目标机发送攻击报文是否成功
- 大数据|记一次解决clickhouse内存增长的问题