天下之事常成于困约,而败于奢靡。这篇文章主要讲述Kettle实现ES到ES循环增量抽取相关的知识,希望能为你提供帮助。
主页:??小王叔叔的博客??欢迎来访
支持:点赞?收藏 ?关注
本博客内容,实践前,请先逐一浏览,然后再逐一学习1、效果
2、实现2.1 创建数据库
见 ??Kettle安装使用??
2.2 创建作业
2.2.1 初始化变量:设置变量,通过变量实现作业的循环更新初始值
parent_job.setVariable("isContinue", "1");
parent_job.setVariable("lastUpdateTime", "");
true;
2.2.2 创建核心转换
【见2.3】
2.2.3 写日志记录
isContinue = $isContinue-------------------
lastUpdateTime = $lastUpdateTime===============
2.2.4 设置循环
通过【2.2】中设置,可以将基本循环抽取动作的作业可以实现循环。
2.3 创建转换:关键处!!!
思路:
1.通过mysql中kettle业务抽取的时间备用表,进行设置最后一次修改更新时间。
2.设置基本循环单次抽取的条数,和基本抽取的json格式
3.设置抽取的数据源
4.解析抽取后的es中内置的hits-source的相关结构
5.成功解析之后,将抽取到的数据进行入库,同时变量获取最新的更新时间保存到MySQL中,便于下次更新使用
2.3.1 选择数据源
select round(unix_timestamp(timetable_dev.modify_time)*1000) as modifyTime, 1 as isContinue
from es_kettle.timetable_dev WHERE index_name = sta_resource_operation
2.3.2 更新常量
"from":0,"size":10,"query":"bool":"filter":["bool":"must":["range":"last_update_time":"from":startTime,"to":null,"include_lower":true,"include_upper":true,"boost":1],"adjust_pure_negative":true,"boost":1],"adjust_pure_negative":true,"boost":1,"sort":["last_update_time":"order":"asc"]
2.3.4 参数替换
执行SQL脚本:
update es_kettle.timetable_dev set modify_time = FROM_UNIXTIME(?, %Y-%m-%d %H:%i:%S) where index_name = sta_resource_operation
以上就是ES通过作业,转换进行抽取到新的ES结果
3、注意事项3.1)设置对应字段
3.2)组件之间的关联性
4、最后完成效果
转载声明:本文为博主原创文章,未经博主允许不得转载
??注意 ~
【Kettle实现ES到ES循环增量抽取】
推荐阅读
- 浮动静态路由
- SpringSession的源码解析(生成session,保存session,写入cookie全流程分析)
- 面试官灵魂三问(什么是SOA(什么是微服务?SOA和微服务有什么区别?))
- 多线程基本概念(并发与并行线程与进程)和入门案例
- 最简单的通用Mapper的使用手册不了解一下()
- Veeam Backup Configuration Tool
- Harbor 2.5.1新版发布,赶紧升级尝尝鲜
- Java使用FreeMarker模版技术动态生成word实践
- 优维低代码(构件事件传递)