ElasticSearch版本控制--java实现
一、前言
最近工作中有这样一个ElasticSearch(以下简称ES)写入的场景,Flink处理完数据实时写入ES。现在需要将一批历史数据通过Flink加载到到ES,有两个点需要保证:
- 对于历史数据,ES已有文档,则舍弃旧数据,ES没有则插入历史数据。
- 对于新数据,能对现有的ES数据进行更新。
二、代码实现及验证
代码实现 请求写数据时加入version和version_type参数,主要代码如下:
IndexRequest indexRequest = Requests.indexRequest()
.index(indexName)
.id("1")
// 指定版本比较的业务字段,具体业务具体分析,一般取时间戳较为合适
.version(Long.parseLong(dataMap1.get("create").toString()))
// 指定使用外部版本号
.versionType(VersionType.EXTERNAL)
.source(dataMap);
验证 验证demo可使用当前时间的时间戳作为版本比较依据。验证思路如下:
- 运行demo程序,在当前时间戳下,插入一条数据,通过kibana等工具检验数据是否插入成功。并记录当前的时间戳。
- 更改某些字段值对数据进行更新,再次运行程序,检验数据是否更新成功。
- 将时间版本比较的字段值固定为第一次执行程序的时间戳,检验数据是否更新成功。
文章图片
文章图片
文章图片
三、总结
由截图可看到,第一步和第二步都能执行成功,第三步执行会出现版本冲突的异常,根据提示很方便能识别出原因,即ElasticSearch进阶篇(一)--版本控制中得出的结论,使用version和version_type=EXTERNAL进行版本控制时,只有要写入文档的版本号大于已有文档的版本号才能更新成功。
【ElasticSearch版本控制--java实现】案例代码参考:elasticsearch_demo
推荐阅读
- 【Hadoop踩雷】Mac下安装Hadoop3以及Java版本问题
- 我的拖延症如何控制了我,又一次
- 真正的爱
- C语言的版本比较
- [源码解析]|[源码解析] NVIDIA HugeCTR,GPU版本参数服务器---(3)
- 每天听本书《控制焦虑》
- K8S|K8S 生态周报| Istio 即将发布重大安全更新,多个版本受影响
- 控制自己的心为什么这么难([追光日记(第2篇)])
- Caffe在Windows10下CPU版本的安装(cpu+anaconda3+vs2013+pycaffe)
- 《不要用爱控制我》