Spring|springboot项目(spark 2 操作es6+)

本文介绍springboot下 spark2 操作es6.
1、环境

springboot 2.1.3.RELEASE es: elasticsearch 6.4.3 spark : 2.4.3 scala: 2.11.12

本人 Mac 本相关环境变量配置如下:
JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home"
CLASS_PATH="$JAVA_HOME/lib"
PATH=".:$PATH:$JAVA_HOME/bin"
export M2_HOME="/Applications/dev/apache-maven-3.5.3"
export SCALA_HOME="/Users/xxxx/Documents/programs/scala-2.11.12"
export SPARK_HOME="/Users/xxxx/Documents/programs/spark-2.4.3-bin-hadoop2.7"
export PATH="$SPARK_HOME/bin:$SCALA_HOME/bin:$M2_HOME/bin:$PATH"

2、springboot 的 maven 配置
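<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.version>2.3.3</spark.version>
    <hadoop.version>2.6.4</hadoop.version>
    <es.hadoop.version>6.3.2</es.hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>${es.hadoop.version}</version>
    </dependency>
</dependencies>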

3、demo
private void demoSaveES() {String tableName = "user/docs"; SparkSession hiveSparkSession = null; IESDAC esDAC = SpringBootBeanUtils.getBean("esDAC", IESDAC.class); IDAC hiveDAC = SpringBootBeanUtils.getBean("hiveDAC", IDAC.class); ArrayList users = Lists.newArrayList( UserES.builder().id(1001L).userName("张三").userPhone("178133348888").build(), UserES.builder().id(1002L).userName("李四").userPhone("17979188888").build(), UserES.builder().id(1004L).userName("王五").userPhone("123003188888").build() ); try { hiveSparkSession = hiveDAC.createSparkSession(); Dataset hiveData = https://www.it610.com/article/hiveSparkSession.sql("SELECT * " + "FROM aa " + "LIMIT 1 "); hiveData.persist(StorageLevel.MEMORY_ONLY()).show(); Dataset esDataset = hiveData.map(new MapFunction() { @Override public UserES call(Row row) throws Exception { Long id = Long.valueOf(row.getAs("AA")); return UserES.builder() .id(id) .userName(row.getAs("bb")) .userPhone("13890909999") .build(); } }, Encoders.bean(UserES.class)); esDataset.show(); esDAC.save2DbWithMappingId(esDataset, tableName, "id"); } finally { hiveDAC.Stop(hiveSparkSession); }}

@Service("esDAC") public class EsDAC implements IESDAC {@Autowired private ESConfig esConfig; /** * 创建 Spark Session 对象 * * @return org.apache.spark.sql.SparkSession */ @Override public SparkSession createSparkSession() { SparkSession sparkSession = SparkSession.builder().appName("ESHadoop").master("local[3]") .config("es.nodes", esConfig.getHost()) //.config("es.nodes", esConfig.getClusterNodes()) .config("es.port", esConfig.getPort()) //.config("es.nodes.wan.only", "true") .config("es.index.auto.create", "true") .getOrCreate(); return sparkSession; }/** * 执行结果 * * @param sparkSession * @param sql * @return org.apache.spark.sql.Dataset*/ @Override public Dataset execSql(SparkSession sparkSession, String sql) { return null; }/** * 停止 Spark session ,释放资源 * * @param sparkSession * @return void */ @Override public void Stop(SparkSession sparkSession) {if (sparkSession == null) { return; } sparkSession.stop(); }@Override public int save2Db(Dataset dataFrame, SaveMode saveMode, String tableName) { save2ES(dataFrame, tableName, null); return 1; }private int save2ES(Dataset dataFrame, String tableName, Map mapParams) { if (mapParams == null || mapParams.size() <= 0) { JavaEsSparkSQL.saveToEs(dataFrame, tableName); } else { JavaEsSparkSQL.saveToEs(dataFrame, tableName, mapParams); } return 1; }@Override public int save2DbWithAppend(Dataset dataFrame, String tableName) { return save2Db(dataFrame, SaveMode.Append, tableName); }@Override public int save2DbWithMappingId(Dataset dataFrame, String tableName, String mappingId) {HashMap map = new HashMap<>(); if (StringUtils.isNotBlank(mappingId)) { map.put("es.mapping.id", mappingId); } map.put("es.nodes", esConfig.getClusterNodes()); map.put("es.port", esConfig.getPort()); JavaEsSparkSQL.saveToEs(dataFrame, tableName, map); return 1; } }


参考:https://www.programcreek.com/java-api-examples/?class=org.elasticsearch.spark.rdd.api.java.JavaEsSpark&method=saveToEsWithMeta
ES问题解决:Elasticsearch health check failed 参考:https://blog.csdn.net/CharlesYooSky/article/details/90405699

    推荐阅读