远程读取 Elasticsearch 数据库并导出数据

【远程读取 Elasticsearch 数据库并导出数据】最近刚完成开题，毕设题目是使用机器学习算法检测电磁数据中的异常。所有电磁数据都存储在分布式数据库 Elasticsearch（ES）中，所以第一步需要把数据导出来。这两天写了这部分的程序，已经导出了部分数据。

package org.elasticsearch.esTest; import java.awt.List; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.concurrent.ExecutionException; //maven管理依赖 import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.index.query.QueryBuilders.*; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.search.SearchHits; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; /** * Hello world! 
* */ public class EsClient { static File trace = new File("E:/es data/emcas-2018.01.04_trace.txt"); static File warning = new File("E:/es data/emcas-2018.01.04_warning.txt"); static File other = new File("E:/es data/emcas-2018.01.04_other.txt"); public static ClientgetClient() throws IOException { Settings settings = ImmutableSettings.settingsBuilder() .put("cluster.name", "estest1") .build(); TransportClient client = new TransportClient(settings).addTransportAddress( new InetSocketTransportAddress("10.10.41.153", 9300)); //FileWriter fw = new FileWriter(article); //BufferedWriter bfw = new BufferedWriter(fw); return client; }public static HashSet write2File(Client client) throws IOException{ long start = System.currentTimeMillis(); int scrollSize = 1000; SearchResponse response = null; FileWriter fw_trace1 = new FileWriter(trace); BufferedWriter bfw1 = new BufferedWriter(fw_trace1); FileWriter fw_warning1 = new FileWriter(warning); BufferedWriter bfw2 = new BufferedWriter(fw_warning1); FileWriter fw_other1 = new FileWriter(trace); BufferedWriter bfw3 = new BufferedWriter(fw_other1); //ArrayListcollectid = new ArrayList(); HashSet collectid = new HashSet(); int i =0; while (response == null || response.getHits().hits().length != 0 && i <=1) { //if(i % 100 == 0){ //fw = new FileWriter(autoCreateFile(i/10+1)); //BufferedWriter bfw1 = new BufferedWriter(fw); //bfw = bfw1; //System.out.println("这是第"+i/10+"万条数据"); //} try{ response = client.prepareSearch("emcas-2017.10.16") .setQuery(QueryBuilders.matchAllQuery()) .setSize(scrollSize) .setFrom(i*scrollSize) //.setFrom(0) .execute() .actionGet(); } catch (IndexMissingException e) { System.out.println("not found"); }SearchHits hits = response.getHits(); int trace_count = 0; int warning_count =0; int other_count = 0; for(int j = 0 ; j < hits.getHits().length; j++){ String jsonstr = hits.getHits()[j] .getSourceAsString(); JSONObject json_1 = JSON.parseObject(jsonstr); System.out.println(json_1); 
if(json_1.get("eventType").equals("trace")){ trace_count++; collectid.add(json_1.get("collectorId")); if(trace_count % 100000 == 0){ FileWriter fw_trace2 = new FileWriter(autoCreateFile(trace_count/100000)); BufferedWriter bfw_trace = new BufferedWriter(fw_trace2); bfw1 = bfw_trace; } bfw1.write(json_1.toString()+'\r'); bfw1.flush(); }else if(json_1.get("eventType").equals("warning")){ warning_count++; if(warning_count % 100 == 0){ FileWriter fw_warning2 = new FileWriter(autoCreateFile(warning_count/100)); BufferedWriter bfw_warning2 = new BufferedWriter(fw_warning2); bfw2 = bfw_warning2; } bfw2.write(json_1.toString()+'\r'); bfw2.flush(); }else{ other_count++; if(other_count % 100 == 0){ FileWriter fw_other2 = new FileWriter(autoCreateFile(other_count/100)); BufferedWriter bfw_other2 = new BufferedWriter(fw_other2); bfw3 = bfw_other2; } bfw3.write(json_1.toString()+'\r'); bfw3.flush(); } } i++; } bfw1.close(); bfw2.close(); bfw3.close(); fw_other1.close(); fw_trace1.close(); fw_warning1.close(); long end = System.currentTimeMillis(); long totalTime = end - start; System.out.println("总耗时:"+totalTime); return collectid; }public static File autoCreateFile(int i ) throws IOException { File file = new File("E:/es data/"+i+".txt"); file.createNewFile(); return file; }public static void main(String[] args) throws InterruptedException, ExecutionException, IOException { EsClient instance = new EsClient(); Client client = instance.getClient(); HashSet hashSet = new HashSet(); hashSet = write2File(client); for (Object object : hashSet) { System.out.println(object); } System.out.println(hashSet.size()+"size!!!!!!!!"); //GetResponse response = client.prepareGet("emcas-2017.10.18","trace","AV8tK5NeSBmsIUk260HQ") //GetResponse response = client.prepareGet("emcas-2017.10.18","status","4") //.execute() //.actionGet(); //System.out.println(response.getSource()); //用于计算es数据库中一个index下docs的总记录数 //SearchResponse response2 = client.prepareSearch("emcas-2018.01.04") 
//.setQuery(QueryBuilders.matchAllQuery()) //.setSize(0) //.execute() //.actionGet(); //SearchHits hits = response2.getHits(); //long hitscount = hits.getTotalHits(); //System.out.println(hitscount); } }

    推荐阅读