Elasticsearch|Mysql同步数据到Elasticsearch(Logstash)

在使用Elasticsearch时,数据源一般可以从数据库,消息中间件,爬虫获取。
数据库的话就需要使用logstash进行数据同步处理。
logstash实现mysql到es的数据同步 下载地址:Download Logstash Free | Get Started Now | Elastic
1.全量同步

(1)在bin同级目录中创建mysql (2)在mysql中创建jdbc.conf和xxx.sql文件。同时放入一直mysql的jar包

Elasticsearch|Mysql同步数据到Elasticsearch(Logstash)
文章图片

jdbc.conf文件: ? ? input { stdin { } jdbc { # mysql数据库连接 jdbc_connection_string => "jdbc:mysql://localhost:3306/han?characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai" # mysqly用户名和密码 jdbc_user => "root" jdbc_password => "root" # 驱动配置 jdbc_driver_library => "D:\Elasticsearch\logstash-7.15.2\mysql\mysql-connector-java-5.1.30.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" #设置分页,一页1000条数据 jdbc_paging_enabled => "true" jdbc_page_size => "1000"# 执行指定的sql文件 statement_filepath => "D:\Elasticsearch\logstash-7.15.2\mysql\test.sql"# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false); #lowercase_column_names => false #执行的sql语句 #statement => "SELECT * FROM items" # 设置监听 各字段含义 分 时 天 月年 ,默认全部为*代表含义:每分钟都更新 #schedule => "* * * * *" #10秒刷新一次 schedule => "*/10 * * * * *" # 索引类型 #type => "blog" } } ? filter { json { source => "message" remove_field => ["message"] } } output { elasticsearch { #es服务器 hosts => ["localhost:9200"] #ES索引名称 index => "test" #自增ID document_id => "%{id}" #索引类型 document_type => "_doc" } stdout { codec => json_lines } }

test.sql中:

select * from items

2.增量同步 前提是数据库中需要添加update_time和is_delete字段。
update_time的类型是timestamp,并且使用navicat中创建时需要在默认处使用CURRENT_TIMESTAMP,且勾选根据当前时间戳更新。 实现更新数据后刷新update_time。
Elasticsearch|Mysql同步数据到Elasticsearch(Logstash)
文章图片

jdbc.conf中的配置信息

jdbc.conf
input { stdin { } jdbc { # mysql数据库连接 jdbc_connection_string => "jdbc:mysql://localhost:3306/han?characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai" # mysqly用户名和密码 jdbc_user => "root" jdbc_password => "root" # 驱动配置 jdbc_driver_library => "D:\Elasticsearch\logstash-7.15.2\mysql\mysql-connector-java-5.1.30.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" #设置分页,一页1000条数据 jdbc_paging_enabled => "true" jdbc_page_size => "1000"#处理中文乱码问题 #codec => plain { charset => "UTF-8"}#使用其它字段追踪,而不是用时间 use_column_value => true #追踪的字段 tracking_column => update_time #追踪字段的类型 tracking_column_type => "timestamp" #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到last_run_metadata_path指定的文件中 record_last_run => true last_run_metadata_path => "D:\Elasticsearch\logstash-7.15.2\mysql\station_parameter.txt"# 执行指定的sql文件 statement_filepath => "D:\Elasticsearch\logstash-7.15.2\mysql\test.sql" #执行的sql语句 #statement => "SELECT * FROM items" # 设置监听 各字段含义 分 时 天 月年 ,默认全部为*代表含义:每分钟都更新 #schedule => "* * * * *" #10秒刷新一次 schedule => "*/10 * * * * *" # 索引类型 #type => "blog" } } ? filter { # 用来解决数据库中删除标识为1时,便从es中去掉该条数据,结合output中的action =>。。。 # is_delete、disabled =1为删除或者禁用的数据,这里判断然后给加上delete标识 # 需要注意的是这里的字段必须是mysql导入es后的数据别名(能精确定位到其字段) # 我理解的action相当于 @metadata对象的一个属性,应该还可以加其他的,比如[@metadata][test] if [is_delete] == 1 or [diasbled] == 1 { mutate{ add_field => { "[@metadata][action]" => "delete"}} } else { mutate{ add_field => { "[@metadata][action]" => "index"}} }json { source => "message" remove_field => ["message"] } } output { elasticsearch { #es服务器 hosts => ["localhost:9200"] #ES索引名称 index => "test" #自增ID(这里的id必须是不能有别的字段与其重复eg:导入es两个id字段) document_id => "%{id}" #索引类型 document_type => "_doc"# 主要实现想法,就来源于这里action可以指定,那么我前面给数据打上标识,就可以实现删除了 action => "%{[@metadata][action]}" } stdout { codec => json_lines } }

xxx.sql
select *,date_add(update_time,interval 8 hour) update_time_add from items where update_time > (select date_add(:sql_last_value, interval 8 hour) update_time)

时区问题(需注意)
因为logstash的时区比mysql的时区相差了8个小时,所以在查询的时候在where条件中增加了8个小时,同时如果同步的数据中存在时间,那么也需要增加8个小时,不使用原来的时间,使用增加了8个小时的字段 update_time_add 保证时间正确同步过去。
在使用SpringBoot客户端来获取es中的数据时以及进行时间判断都需要考虑es的时区问题
3.启动 打开文件夹到logstash中的bin目录下,使用cmd命令
logstash.bat -f ../mysql/xxx.conf

4.注意事项 (1)多表update_time问题
当联合mysql中多个表的数据导入es中时,会遇到多个表更新时间的问题。就是当其中一个子表更新时,主表的数据并没有更新。而且logstash中只能追踪一个字段,所有多个表中的update_time就是一个问题。
如何解决呢?
可以在mysql同步数据到es中时,根据情节使用mysql中的函数:greatest或者least进行取值。
greatest(字段1,字段2,字段3,..,字段n) 取最大值
least(字段1,字段2,字段3,...,字段n) 取最小值

eg:greatest(a.update_time,b.update_time,c.update_time,..,字段n) AS max_update_time
就可以拿到多表中的update_time中的最新时间,然后将这个字段进行追踪,并将上一条的数据保存到一个文件中。然后在进行同步的mysql语句中进行判断文件中的内容。便可以监控到更新,实现增量同步。
:sql_last_value
代表的就是将追踪字段的值记录下来后保存在指定文件中的数据。可以通过mysql语句进行和它的判断来实现对应数据的同步增量
#使用其它字段追踪,而不是用时间 use_column_value => true #追踪的字段 tracking_column => max_update_time #追踪字段的类型 tracking_column_type => "timestamp" #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到last_run_metadata_path指定的文件中 record_last_run => true last_run_metadata_path => "D:\Elasticsearch\logstash-7.15.2\mysql\station_parameter.txt"

select ..., greatest(`cbase`.`OPERATION_TIME`,`subject`.`OPERATION_TIME`,`division`.`OPERATION_TIME`) AS max_update_time from ((`test_a` `a` join `test_b` `b`) join `test_c` `c`) where () and (... and (greatest(`a`.`update_time`,`b`.`update_time`,`c`.`update_time`) > :sql_last_value))

注意:_id值必须设置,不然会出现更新的数据添加到原有数据上,并不会实现在原有数据上进行更新。
当不确定表中的OPERATION_TIME字段是否为null时,使用if函数并在其中使用length判断该字段的长度来进行判断是否为null。

greatest(IF(LENGTH(cperform.OPERATION_TIME)>0,cperform.OPERATION_TIME,1),IF(LENGTH(cbase.OPERATION_TIME)>0,cbase.OPERATION_TIME,1),IF(LENGTH(sub.OPERATION_TIME)>0,sub.OPERATION_TIME,1),IF(LENGTH(division.OPERATION_TIME)>0,division.OPERATION_TIME,1)) AS max_update_time


(2)定时任务
# 设置监听 各字段含义 分 时 天 月年 ,默认全部为*代表含义:每分钟都更新 #schedule => "* * * * *" #10秒刷新一次 schedule => "*/10 * * * * *" #5秒刷新一次 schedule => "*/5 * * * * *" #一个小时刷新一次 schedule => "* */1 * * *"

(3)实现删除增量
通过filter过滤器,监听软删除字段是否为1,使得话便将该条数据前加上删除的标识
"@metadata" => "delete"
然后在output中实现将处理后的数据输出到下游
这里就是从mysql上游input中获取数据进行处理,然后output将处理后的数据输出到下游,也就是es
使用action便将数据进行类型的区划分区别。

logstash的action
Value type is string Default value is “index” 默认指定的就是index Protocol agnostic (i.e. non-http, non-java specific) configs go here Protocol agnostic methods The Elasticsearch action to perform. Valid actions are: ? index: indexes a document (an event from Logstash). delete: deletes a document by id (An id is required for this action) create: indexes a document, fails if a document by that id already exists in the index. update: updates a document by id. Update has a special case where you can upsert?—?update a document if not already present. See the upsert option. NOTE: This does not work and is not supported in Elasticsearch 1.x. Please upgrade to ES 2.x or greater to use this feature with Logstash! A sprintf style string to change the action based on the content of the event. The value %{[foo]} would use the foo field for the action For more details on actions, check out the Elasticsearch bulk API documentation


然后就会实现不同类型的数据被输出到下游es中,delete的数据会被删除,index的数据被添加进去,update的数据获得更新......

filter { # 用来解决数据库中删除标识为1时,便从es中去掉该条数据,结合output中的action =>。。。 # is_delete、disabled =1为删除或者禁用的数据,这里判断然后给加上delete标识 # 需要注意的是这里的字段必须是mysql导入es后的数据别名(能精确定位到其字段) # 我理解的action相当于 @metadata对象的一个属性,应该还可以加其他的,比如[@metadata][test] if [is_delete] == 1 or [diasbled] == 1 { mutate{ add_field => { "[@metadata][action]" => "delete"}} } else { mutate{ add_field => { "[@metadata][action]" => "index"}} }json { source => "message" remove_field => ["message"] } } ? output { elasticsearch { #es服务器 hosts => ["localhost:9200"] #ES索引名称 index => "test" #自增ID(这里的id必须是不能有别的字段与其重复eg:导入es两个id字段) document_id => "%{id}" #索引类型 document_type => "_doc"# 主要实现想法,就来源于这里action可以指定,那么我前面给数据打上标识,就可以实现删除了 action => "%{[@metadata][action]}" }


(4)mysql到es的数据类型问题
timestamp
timestamp的数据2019-09-09 16:37:38到es中后会变成2019-09-09T08:37:38.000Z
会发现实际的时间会减少8个小时。
所以在一些需要追踪关于时间的字段的情境下,就需要注意时间的问题。
但是在追踪字段时,会选择性的将上一条的该字段信息保存到一个自定义的文件上。
2021-12-06T03:02:58.000Z保存到本地文件上后,会再加上这8个小时的缺失。
--- 2021-12-06 11:02:58.000000000 +08:00

mysql中时间添加的语句:date_add(update_time,interval 8 hour) 就会在update_time的基础上添加8小时

英文大小写问题
input中可以通过配置来控制英文的大小写问题。
# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false); lowercase_column_names => false


【Elasticsearch|Mysql同步数据到Elasticsearch(Logstash)】欢迎各位大佬批评指正,如需转载请表明出处......

    推荐阅读