Es7.x: Create, Update, Delete and Bulk Operations with RestHighLevelClient

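  1. Add dependencies
  2. Initialize the RestHighLevelClient and BulkProcessor objects
  3. Create, update and delete operations
    3.1 Data preparation
    3.2 Asynchronous single-document insert
    3.3 Synchronous single-document insert
    3.4 Bulk insert
    3.5 Update
    3.6 Conditional update (update by query)
    3.7 Bulk update
    3.8 Delete
    3.9 Conditional delete (delete by query)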
This article works with Elasticsearch 7.x from Java. To keep things simple, Spring is not integrated; every operation is invoked from a main method.
1. Add dependencies
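```xml
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.5.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.5.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.5.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

The exclusions keep the high-level client from pulling in its own copies of elasticsearch-rest-client and elasticsearch, so the explicitly declared 7.5.1 artifacts are the ones used. The examples below also use Lombok (`@Slf4j`) and fastjson (`JSON.toJSONString`), which need their own dependencies.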

2. Initialize the RestHighLevelClient and BulkProcessor objects
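RestHighLevelClient
RestHighLevelClient is the officially recommended client API. The alternative, TransportClient, is deprecated and is removed entirely in ES 8.0, so it cannot be used from 8.0 onward. (RestHighLevelClient was itself later deprecated in 7.15 in favor of the Elasticsearch Java API Client.)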
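```java
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

@Slf4j
public class EsTest {
    // ES client
    private static RestHighLevelClient restHighLevelClient;
    // Object used for bulk operations
    private static BulkProcessor bulkProcessor;

    static {
        List<HttpHost> httpHosts = new ArrayList<>();
        // Cluster nodes
        httpHosts.add(new HttpHost("172.26.17.11", 9200));
        httpHosts.add(new HttpHost("172.26.17.11", 9201));
        httpHosts.add(new HttpHost("172.26.17.11", 9202));
        // Register the host nodes
        RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0]));
        // Timeouts, in milliseconds
        builder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(1000);
            requestConfigBuilder.setSocketTimeout(1000);
            requestConfigBuilder.setConnectionRequestTimeout(1000);
            return requestConfigBuilder;
        });
        // Username and password (basic auth)
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials("userName", "password"));
        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setMaxConnTotal(30);
            httpClientBuilder.setMaxConnPerRoute(30);
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            return httpClientBuilder;
        });
        restHighLevelClient = new RestHighLevelClient(builder);
    }

    static {
        // Static blocks run in source order, so the client above already exists here
        bulkProcessor = createBulkProcessor();
    }

    private static BulkProcessor createBulkProcessor() {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                log.info("1. [beforeBulk] batch [{}] carries {} requests", executionId, request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (!response.hasFailures()) {
                    log.info("2. [afterBulk-success] batch [{}] finished in {} ms", executionId, response.getTook().getMillis());
                } else {
                    for (BulkItemResponse item : response.getItems()) {
                        if (item.isFailed()) {
                            // Only the first failed item of the batch is logged
                            log.info("2. [afterBulk-failure] batch [{}] failed because: {}", executionId, item.getFailureMessage());
                            break;
                        }
                    }
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // The whole bulk call failed (e.g. connection error): log every document id in it
                List<DocWriteRequest<?>> requests = request.requests();
                List<String> esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList());
                log.error("3. [afterBulk-failure] ES bulk execution failed, failed ES ids: {}", esIds, failure);
            }
        };
        BulkProcessor.Builder builder = BulkProcessor.builder((bulkRequest, bulkResponseActionListener) -> {
            restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
        }, listener);
        // Flush when 10,000 requests are buffered
        builder.setBulkActions(10000);
        // Flush when the buffered data reaches 8 MB
        builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
        // Flush every 10 s regardless of the thresholds above
        builder.setFlushInterval(TimeValue.timeValueSeconds(10));
        // Number of bulk requests allowed to execute concurrently
        builder.setConcurrentRequests(8);
        // Retry bulks rejected by the cluster (HTTP 429) up to 3 times, 1 s apart
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));
        return builder.build();
    }

    // The methods in section 3 are static methods of this class (their extra imports are omitted).
}
```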

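A single BulkProcessor can be shared by the whole project. Its flush policies (request count, payload size, time interval) decide when buffered data is flushed from memory into ES.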
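Because these examples run from a main method, requests still buffered in the BulkProcessor are lost if the JVM exits before a flush policy fires. A minimal shutdown sketch, assuming a hypothetical helper `shutdown` (not part of the original code) is called at the end of main: `awaitClose` flushes what is still buffered and waits for in-flight bulks, then the client is closed.

```java
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

final class EsShutdown {
    /**
     * Hypothetical helper: flushes buffered bulk requests, waits up to 30 s for
     * in-flight bulks, then closes the client. Returns false if the wait timed out.
     */
    static boolean shutdown(BulkProcessor bulkProcessor, RestHighLevelClient client)
            throws InterruptedException, IOException {
        boolean completed = true;
        if (bulkProcessor != null) {
            // Sends any remaining buffered actions and waits for concurrent bulks to finish
            completed = bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
        }
        if (client != null) {
            // Releases the underlying HTTP connections and I/O threads
            client.close();
        }
        return completed;
    }
}
```

In `EsTest` this would be `EsShutdown.shutdown(bulkProcessor, restHighLevelClient);` as the last line of main.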
3. Create, update and delete operations
3.1 Data preparation
```
PUT test_demo

PUT test_demo/_mapping
{
  "properties": {
    "title": { "type": "text" },
    "tag": { "type": "keyword" },
    "publishTime": {
      "type": "date",
      "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
    }
  }
}

GET test_demo/_search
{
  "query": { "match_all": {} }
}
```
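The article never shows `DemoDto`. The class below is a plausible reconstruction, assuming fields `id`, `title`, `tag` and `publishTime` (matching the mapping above) plus the two constructors the examples call. fastjson serializes through the getters and by default writes a `Date` as epoch milliseconds, which the `epoch_millis` format accepts. Since `id` is not in the explicit mapping, dynamic mapping adds it on the first insert, which is what the term query in 3.6 relies on.

```java
import java.util.Date;

/**
 * Hypothetical reconstruction of the DemoDto used in the examples; the original
 * class is not shown. Field names mirror the test_demo mapping plus "id".
 */
class DemoDto {
    private Long id;
    private String title;
    private String tag;
    private Date publishTime;

    public DemoDto() {
    }

    // Used by the bulk-update example, which only needs the id
    public DemoDto(Long id) {
        this.id = id;
    }

    public DemoDto(Long id, String title, String tag, Date publishTime) {
        this.id = id;
        this.title = title;
        this.tag = tag;
        this.publishTime = publishTime;
    }

    // Getters and setters: fastjson reads properties through the getters
    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }
    public String getTag() { return tag; }
    public void setTag(String tag) { this.tag = tag; }
    public Date getPublishTime() { return publishTime; }
    public void setPublishTime(Date publishTime) { this.publishTime = publishTime; }
}
```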

3.2 Asynchronous single-document insert
```java
public static void testAsyncSingle() {
    IndexRequest indexRequest = new IndexRequest("test_demo");
    DemoDto demoDto = new DemoDto(2001L, "印度新冠疫情失控", "世界", new Date());
    indexRequest.source(JSON.toJSONString(demoDto), XContentType.JSON);
    indexRequest.timeout(TimeValue.timeValueSeconds(1));
    // Respond only once the change is visible to search
    indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
    // create(false) = OpType.INDEX: a document with the same _id is overwritten
    indexRequest.create(false);
    indexRequest.id(String.valueOf(demoDto.getId()));
    // The listener must be typed, otherwise onResponse(IndexResponse) does not override anything
    restHighLevelClient.indexAsync(indexRequest, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
        @Override
        public void onResponse(IndexResponse indexResponse) {
            ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
            if (shardInfo.getFailed() > 0) {
                for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                    log.error("Some shards failed while storing id {} in ES, cause: {}", indexRequest.id(), failure.getCause());
                }
            }
        }

        @Override
        public void onFailure(Exception e) {
            log.error("{}: exception while storing in ES", indexRequest.id(), e);
        }
    });
}
```
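`indexAsync` returns immediately and the callback runs later on the client's I/O thread, so a main method that ends right after `testAsyncSingle()` may never see it. One way to wait, sketched below, is to wrap the listener in `LatchedActionListener` (from `org.elasticsearch.action`), which counts down a `CountDownLatch` after either callback. The `awaitAsync` helper is a hypothetical name, not part of the client.

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class AsyncWait {
    /**
     * Hypothetical helper: hands a latched listener to startCall (for example a
     * lambda wrapping restHighLevelClient.indexAsync) and blocks until a callback
     * has run. Returns false if neither callback ran within the timeout.
     */
    static <T> boolean awaitAsync(Consumer<ActionListener<T>> startCall,
                                  ActionListener<T> delegate,
                                  long timeoutSeconds) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        // Counts the latch down after delegate.onResponse or delegate.onFailure returns
        startCall.accept(new LatchedActionListener<>(delegate, latch));
        return latch.await(timeoutSeconds, TimeUnit.SECONDS);
    }
}
```

Usage inside `EsTest`: `AsyncWait.awaitAsync(l -> restHighLevelClient.indexAsync(indexRequest, RequestOptions.DEFAULT, l), listener, 5);`, where `listener` is the typed `ActionListener<IndexResponse>` from the method above.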

3.3 Synchronous single-document insert
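```java
public static void testSingleAdd() throws IOException {
    IndexRequest indexRequest = new IndexRequest("test_demo");
    DemoDto demoDto = new DemoDto(3001L, "es单数据同步插入2", "IT", new Date());
    indexRequest.source(JSON.toJSONString(demoDto), XContentType.JSON);
    // Fill in _id (set once; the original set it twice to the same value)
    indexRequest.id(String.valueOf(demoDto.getId()));
    indexRequest.timeout(TimeValue.timeValueSeconds(1));
    indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
    // create(true) = OpType.CREATE: fails if the _id already exists
    indexRequest.create(true);
    // Blocks until ES responds; an error response is thrown as an exception
    IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
    log.info("id {} -> {}", indexResponse.getId(), indexResponse.getResult());
}
```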

  1. `indexRequest.id(demoDto.getId() + "");`: fills in the document's "_id" field.
  2. `indexRequest.create(true);`: sets the operation type. Its implementation in IndexRequest is:
```java
public IndexRequest create(boolean create) {
    if (create) {
        return opType(OpType.CREATE);
    } else {
        return opType(OpType.INDEX);
    }
}
```

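  • OpType.CREATE: if a document with the same _id already exists, the insert fails with an exception;
  • OpType.INDEX: if a document with the same _id already exists, it is overwritten.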
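The exception raised when a duplicate id is inserted with OpType.CREATE shows that ES uses optimistic concurrency control to detect conflicting writes: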
Elasticsearch exception [type=version_conflict_engine_exception, reason=[3001]: version conflict, document already exists (current version [3])]
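With the synchronous client, that conflict surfaces as an `ElasticsearchStatusException` whose `status()` is `RestStatus.CONFLICT` (HTTP 409). A minimal sketch, assuming you want to treat "already exists" as a no-op rather than an error; `isVersionConflict` is a hypothetical helper name.

```java
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.rest.RestStatus;

final class ConflictCheck {
    /**
     * Hypothetical helper: true if ES rejected the write with HTTP 409.
     * Typical use around a CREATE:
     *   try { client.index(request, RequestOptions.DEFAULT); }
     *   catch (ElasticsearchStatusException e) { if (!isVersionConflict(e)) throw e; }
     */
    static boolean isVersionConflict(Exception e) {
        return e instanceof ElasticsearchStatusException
                && ((ElasticsearchStatusException) e).status() == RestStatus.CONFLICT;
    }
}
```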

3.4 Bulk insert
Because a BulkProcessor has already been set up, requests can simply be added to it; it flushes them to ES in batches according to its flush policies.
```java
/**
 * Bulk insert
 */
public static void testBatch() {
    List<IndexRequest> indexRequests = new ArrayList<>();
    List<DemoDto> demoDtos = new ArrayList<>();
    demoDtos.add(new DemoDto(1001L, "中国是中国人的中国", "中国", new Date()));
    demoDtos.add(new DemoDto(1002L, "2008年奥运会", "体育", new Date()));
    demoDtos.forEach(e -> {
        IndexRequest request = new IndexRequest("test_demo");
        // Fill in _id
        request.id(String.valueOf(e.getId()));
        request.source(JSON.toJSONString(e), XContentType.JSON);
        // Fail instead of overwriting if the _id already exists
        request.opType(DocWriteRequest.OpType.CREATE);
        indexRequests.add(request);
    });
    // Buffered; sent to ES when a flush policy triggers
    indexRequests.forEach(bulkProcessor::add);
}
```
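If the data is already in hand and you want the result right away instead of waiting for a flush, the same CREATE actions can go into a single `BulkRequest` sent with the synchronous `restHighLevelClient.bulk(...)`. The sketch only builds the request; the `buildBulk` name and the id-to-JSON map input are illustrative assumptions.

```java
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.Map;

final class OneShotBulk {
    /**
     * Illustrative helper: one BulkRequest with a CREATE action per entry
     * (key = _id, value = JSON source). Send it with
     * restHighLevelClient.bulk(request, RequestOptions.DEFAULT) and check
     * response.hasFailures() / response.buildFailureMessage().
     */
    static BulkRequest buildBulk(String index, Map<String, String> jsonById) {
        BulkRequest bulk = new BulkRequest();
        // Make the documents searchable before the bulk call returns
        bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        jsonById.forEach((id, json) -> bulk.add(new IndexRequest(index)
                .id(id)
                .source(json, XContentType.JSON)
                .opType(DocWriteRequest.OpType.CREATE)));
        return bulk;
    }
}
```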

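3.5 Update
Here the doc passed to the update is a Map. Passing a bare JSON string does not work: `doc(String)` resolves to the varargs overload `doc(Object...)`, which expects field/value pairs and throws an exception. A JSON string has to be passed together with its content type, as `doc(json, XContentType.JSON)`.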
```java
public static void testSingleUpdate() throws IOException {
    UpdateRequest updateRequest = new UpdateRequest("test_demo", "3001");
    // Partial document: only the listed fields are changed
    Map<String, Object> kvs = new HashMap<>();
    kvs.put("title", "es单数据更新啦!");
    updateRequest.doc(kvs);
    updateRequest.timeout(TimeValue.timeValueSeconds(1));
    updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
    restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
}
```
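If the partial document is already a JSON string, the two-argument overload accepts it directly. The sketch below also sets `docAsUpsert(true)`, an optional addition not used in the original, so that a missing id is inserted instead of failing with `document_missing_exception`. `buildUpdate` is an illustrative name.

```java
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;

final class JsonUpdate {
    /**
     * Illustrative helper: a partial update from a JSON string.
     * docAsUpsert(true) indexes the doc as a new document if the id does not exist.
     */
    static UpdateRequest buildUpdate(String index, String id, String json) {
        return new UpdateRequest(index, id)
                // Two-argument overload; doc(json) alone would resolve to doc(Object...) and throw
                .doc(json, XContentType.JSON)
                .docAsUpsert(true);
    }
}
```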

3.6 Conditional update (update by query)
```java
public static void testSingleUpdateQuery() throws IOException {
    UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
    updateByQueryRequest.indices("test_demo");
    // Match documents whose "id" field (from DemoDto's source) equals 3001
    updateByQueryRequest.setQuery(new TermQueryBuilder("id", 3001));
    // Painless script applied to the _source of each matched document
    updateByQueryRequest.setScript(new Script(ScriptType.INLINE, "painless",
            "ctx._source.tag='电脑'", Collections.emptyMap()));
    restHighLevelClient.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
}
```
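Two refinements worth considering, sketched below with the same index and field names: passing the new value through script `params` instead of inlining it into the script source, so the same compiled script can be reused for different values, and `setConflicts("proceed")`, which counts version conflicts instead of aborting when a document changes during the run. `buildTagUpdate` is an illustrative name.

```java
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;

import java.util.Collections;

final class TagUpdate {
    /** Illustrative helper: set "tag" on every document whose "id" equals the given value. */
    static UpdateByQueryRequest buildTagUpdate(String index, long id, String tag) {
        UpdateByQueryRequest request = new UpdateByQueryRequest(index);
        request.setQuery(new TermQueryBuilder("id", id));
        // The script source stays constant; only the params differ between calls
        request.setScript(new Script(ScriptType.INLINE, "painless",
                "ctx._source.tag = params.tag",
                Collections.<String, Object>singletonMap("tag", tag)));
        // Count version conflicts instead of aborting the whole request
        request.setConflicts("proceed");
        // Refresh the affected shards once the request completes
        request.setRefresh(true);
        return request;
    }
}
```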

3.7 Bulk update
```java
/**
 * Bulk update
 */
private static void testBatchUpdate() {
    List<UpdateRequest> updateRequests = new ArrayList<>();
    // Documents to update (only the id is needed)
    List<DemoDto> params = new ArrayList<>();
    params.add(new DemoDto(2001L));
    params.add(new DemoDto(3001L));
    params.forEach(e -> {
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("test_demo");
        // _id of the document to update
        updateRequest.id(String.valueOf(e.getId()));
        // Fields to change
        Map<String, Object> map = new HashMap<>();
        map.put("title", "美国社会动荡");
        updateRequest.doc(map);
        updateRequests.add(updateRequest);
    });
    // Buffered and flushed by the shared BulkProcessor
    updateRequests.forEach(bulkProcessor::add);
}
```
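Updates routed through the BulkProcessor report per-item failures only in `afterBulk`, so a concurrent write to the same document shows up there as a version conflict. `UpdateRequest.retryOnConflict(n)` asks ES to re-run the update up to n times on such a conflict. A sketch, with the retry count of 3 chosen arbitrarily and `buildTitleUpdates` a hypothetical name:

```java
import org.elasticsearch.action.update.UpdateRequest;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class TitleUpdates {
    /** Hypothetical helper: one partial update per id, retried up to 3 times on version conflict. */
    static List<UpdateRequest> buildTitleUpdates(String index, List<Long> ids, String title) {
        List<UpdateRequest> requests = new ArrayList<>();
        for (Long id : ids) {
            requests.add(new UpdateRequest(index, String.valueOf(id))
                    .doc(Collections.<String, Object>singletonMap("title", title))
                    .retryOnConflict(3));
        }
        return requests;
    }
}
```

Queue them the same way as above: `TitleUpdates.buildTitleUpdates("test_demo", Arrays.asList(2001L, 3001L), "美国社会动荡").forEach(bulkProcessor::add);`.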

3.8 Delete
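```java
/**
 * Single delete
 */
private static void testSingleDel() throws IOException {
    DeleteRequest deleteRequest = new DeleteRequest();
    deleteRequest.index("test_demo");
    deleteRequest.id("3001");
    DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
    // A missing id yields result NOT_FOUND rather than an exception
    log.info("delete {} -> {}", deleteResponse.getId(), deleteResponse.getResult());
}
```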
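Deletes can also be queued on the shared BulkProcessor alongside inserts and updates, since it has an `add(DeleteRequest)` overload. A sketch that only builds the requests; `buildDeletes` is an illustrative name.

```java
import org.elasticsearch.action.delete.DeleteRequest;

import java.util.ArrayList;
import java.util.List;

final class BulkDeletes {
    /** Illustrative helper: one DeleteRequest per id; queue with requests.forEach(bulkProcessor::add). */
    static List<DeleteRequest> buildDeletes(String index, List<String> ids) {
        List<DeleteRequest> requests = new ArrayList<>(ids.size());
        for (String id : ids) {
            requests.add(new DeleteRequest(index, id));
        }
        return requests;
    }
}
```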

3.9 Conditional delete (delete by query)
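```java
/**
 * Delete by condition
 */
private static void testSingleDelQuery() throws IOException {
    DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
    deleteByQueryRequest.indices("test_demo");
    // Analyzed match on the text field: the standard analyzer splits "国年" into 国 and 年,
    // so every document whose title contains either character is deleted
    deleteByQueryRequest.setQuery(new MatchQueryBuilder("title", "国年"));
    restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
}
```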
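Because an analyzed match can remove more documents than intended, an exact condition on the `keyword` field `tag` is the safer choice when the value is known. A sketch, assuming deletion by tag is what you want; `buildDeleteByTag` is a hypothetical name. `setConflicts("proceed")` keeps going past documents modified during the run, and `setRefresh(true)` makes the deletions visible to search once the request returns.

```java
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;

final class DeleteByTag {
    /** Hypothetical helper: delete every document whose keyword field "tag" equals the value exactly. */
    static DeleteByQueryRequest buildDeleteByTag(String index, String tag) {
        DeleteByQueryRequest request = new DeleteByQueryRequest(index);
        // Term query on a keyword field: no analysis, exact match only
        request.setQuery(new TermQueryBuilder("tag", tag));
        // Count version conflicts instead of aborting
        request.setConflicts("proceed");
        request.setRefresh(true);
        return request;
    }
}
```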
