Es封装RestHighLevelClient和BulkProcessor的工具类

【Es封装RestHighLevelClient和BulkProcessor的工具类】Es的基础工具类,可以获取单例的RestHighLevelClient实例和BulkProcessor实例。
1. 引入依赖
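```xml
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.5.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.5.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.5.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```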

2. 工具类
@Slf4j public class EsUtil {private static RestHighLevelClient restHighLevelClient; private static BulkProcessor bulkProcessor; static { List httpHosts = new ArrayList<>(); //填充数据 httpHosts.add(new HttpHost("120.0.0.1", 9200)); httpHosts.add(new HttpHost("120.0.0.1", 9201)); httpHosts.add(new HttpHost("120.0.0.1", 9202)); //填充host节点 RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0])); builder.setRequestConfigCallback(requestConfigBuilder -> { requestConfigBuilder.setConnectTimeout(1000); requestConfigBuilder.setSocketTimeout(1000); requestConfigBuilder.setConnectionRequestTimeout(1000); return requestConfigBuilder; }); //填充用户名密码 final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("username", "password")); builder.setHttpClientConfigCallback(httpClientBuilder -> { httpClientBuilder.setMaxConnTotal(30); httpClientBuilder.setMaxConnPerRoute(30); httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); return httpClientBuilder; }); restHighLevelClient = new RestHighLevelClient(builder); }static { bulkProcessor=createBulkProcessor(); }private static BulkProcessor createBulkProcessor() {BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { log.info("1. 【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions()); }@Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (!response.hasFailures()) { log.info("2. 【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis()); } else { BulkItemResponse[] items = response.getItems(); for (BulkItemResponse item : items) { if (item.isFailed()) { log.info("2. 
【afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage()); break; } } } }@Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) {List> requests = request.requests(); List esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList()); log.error("3. 【afterBulk-failure失败】es执行bluk失败,失败的esId为:{}", esIds, failure); } }; BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> { restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener); }), listener); builder.setBulkActions(10000); builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB)); //设置允许执行的并发请求数。 builder.setConcurrentRequests(8); builder.setFlushInterval(TimeValue.timeValueSeconds(1)); //设置重试策略 builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3)); return builder.build(); }public static RestHighLevelClient getRestHighLevelClient() { return restHighLevelClient; }public static BulkProcessor getBulkProcessor() { return bulkProcessor; }//远程调用 public static List remoteSearch(SearchRequest searchRequest, SearchSourceBuilder searchSourceBuilder) throws IOException { List results = new ArrayList<>(); searchRequest.indices("test_demo"); searchRequest.source(searchSourceBuilder); log.info("dsl:" + searchSourceBuilder.toString()); SearchResponse response = EsUtil.getRestHighLevelClient().search(searchRequest, RequestOptions.DEFAULT); SearchHits hits = response.getHits(); Iterator iterator = hits.iterator(); while (iterator.hasNext()) { SearchHit next = iterator.next(); log.info("输出分数:" + next.getScore()); log.info("输出数据:" + next.getSourceAsString()); results.add(next); } return results; } }

    推荐阅读