本文介绍在 Spring Boot 项目中使用 Spark 2 操作 Elasticsearch 6。
1、环境
springboot 2.1.3.RELEASE
es: elasticsearch 6.4.3
spark : 2.4.3
scala: 2.11.12
本人 Mac 上的相关环境变量配置如下:
export JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_152.jdk/Contents/Home"
export CLASS_PATH="$JAVA_HOME/lib"
export PATH=".:$PATH:$JAVA_HOME/bin"
export M2_HOME="/Applications/dev/apache-maven-3.5.3"
export SCALA_HOME="/Users/xxxx/Documents/programs/scala-2.11.12"
export SPARK_HOME="/Users/xxxx/Documents/programs/spark-2.4.3-bin-hadoop2.7"
export PATH="$SPARK_HOME/bin:$SCALA_HOME/bin:$M2_HOME/bin:$PATH"
2、Spring Boot 项目的 Maven 配置(pom.xml)
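<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- 注:以下版本与第 1 节所列环境(Spark 2.4.3、ES 6.4.3)不完全一致,请按实际环境调整 -->
    <spark.version>2.3.3</spark.version>
    <hadoop.version>2.6.4</hadoop.version>
    <es.hadoop.version>6.3.2</es.hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>${es.hadoop.version}</version>
    </dependency>
</dependencies>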
3、demo
private void demoSaveES() {String tableName = "user/docs";
SparkSession hiveSparkSession = null;
IESDAC esDAC = SpringBootBeanUtils.getBean("esDAC", IESDAC.class);
IDAC hiveDAC = SpringBootBeanUtils.getBean("hiveDAC", IDAC.class);
ArrayList users = Lists.newArrayList(
UserES.builder().id(1001L).userName("张三").userPhone("178133348888").build(),
UserES.builder().id(1002L).userName("李四").userPhone("17979188888").build(),
UserES.builder().id(1004L).userName("王五").userPhone("123003188888").build()
);
try {
hiveSparkSession = hiveDAC.createSparkSession();
Dataset hiveData = https://www.it610.com/article/hiveSparkSession.sql("SELECT * " +
"FROM aa " +
"LIMIT 1 ");
hiveData.persist(StorageLevel.MEMORY_ONLY()).show();
Dataset esDataset = hiveData.map(new MapFunction() {
@Override
public UserES call(Row row) throws Exception {
Long id = Long.valueOf(row.getAs("AA"));
return UserES.builder()
.id(id)
.userName(row.getAs("bb"))
.userPhone("13890909999")
.build();
}
}, Encoders.bean(UserES.class));
esDataset.show();
esDAC.save2DbWithMappingId(esDataset, tableName, "id");
} finally {
hiveDAC.Stop(hiveSparkSession);
}}
/**
 * Elasticsearch data-access component that writes Spark datasets through elasticsearch-hadoop's
 * {@link JavaEsSparkSQL}. Connection settings are read from the injected {@link ESConfig}.
 */
@Service("esDAC")
public class EsDAC implements IESDAC {

    @Autowired
    private ESConfig esConfig;

    /**
     * Creates a local Spark session (or returns the existing one, via {@code getOrCreate})
     * pre-configured with the Elasticsearch host, port, and index auto-creation.
     *
     * @return the Spark session
     */
    @Override
    public SparkSession createSparkSession() {
        return SparkSession.builder()
                .appName("ESHadoop")
                .master("local[3]")
                .config("es.nodes", esConfig.getHost())
                // alternative: .config("es.nodes", esConfig.getClusterNodes())
                .config("es.port", esConfig.getPort())
                // alternative: .config("es.nodes.wan.only", "true")
                .config("es.index.auto.create", "true")
                .getOrCreate();
    }

    /**
     * Executes a SQL statement on the given session.
     *
     * @param sparkSession session to run the statement on; must not be null
     * @param sql          the SQL text
     * @return the result of {@code sparkSession.sql(sql)}
     * @throws IllegalArgumentException if {@code sparkSession} is null
     */
    @Override
    public Dataset execSql(SparkSession sparkSession, String sql) {
        if (sparkSession == null) {
            throw new IllegalArgumentException("sparkSession must not be null");
        }
        return sparkSession.sql(sql);
    }

    /**
     * Stops the Spark session and releases its resources. A null session is ignored.
     *
     * @param sparkSession the session to stop, may be null
     */
    @Override
    public void Stop(SparkSession sparkSession) {
        if (sparkSession == null) {
            return;
        }
        sparkSession.stop();
    }

    /**
     * Writes {@code dataFrame} to the Elasticsearch resource {@code tableName} using the
     * session-level es-hadoop settings.
     *
     * <p>NOTE: {@code saveMode} is currently ignored; the write goes through
     * {@link JavaEsSparkSQL#saveToEs}, which does not take a Spark {@code SaveMode}.
     *
     * @return always 1
     */
    @Override
    public int save2Db(Dataset dataFrame, SaveMode saveMode, String tableName) {
        save2ES(dataFrame, tableName, null);
        return 1;
    }

    /**
     * Writes {@code dataFrame} to {@code tableName}, passing {@code mapParams} as per-write
     * es-hadoop settings when present.
     *
     * @param mapParams per-write settings; null or empty means "use the session settings only"
     * @return always 1
     */
    private int save2ES(Dataset dataFrame, String tableName, Map<String, String> mapParams) {
        if (mapParams == null || mapParams.isEmpty()) {
            JavaEsSparkSQL.saveToEs(dataFrame, tableName);
        } else {
            JavaEsSparkSQL.saveToEs(dataFrame, tableName, mapParams);
        }
        return 1;
    }

    /**
     * Writes {@code dataFrame} to {@code tableName}; delegates to
     * {@link #save2Db(Dataset, SaveMode, String)} with {@link SaveMode#Append}.
     *
     * @return always 1
     */
    @Override
    public int save2DbWithAppend(Dataset dataFrame, String tableName) {
        return save2Db(dataFrame, SaveMode.Append, tableName);
    }

    /**
     * Writes {@code dataFrame} to {@code tableName}, using the column named {@code mappingId}
     * (when not blank) as the Elasticsearch document id. Nodes and port are taken from
     * {@link ESConfig} and override the session-level settings for this write.
     *
     * @param mappingId name of the id column; blank means no explicit id mapping
     * @return always 1
     */
    @Override
    public int save2DbWithMappingId(Dataset dataFrame, String tableName, String mappingId) {
        Map<String, String> settings = new HashMap<>();
        if (StringUtils.isNotBlank(mappingId)) {
            settings.put("es.mapping.id", mappingId);
        }
        // saveToEs takes Map<String, String>; getPort() may be numeric (it is also accepted by
        // the non-String SparkSession.Builder.config overloads), so convert explicitly.
        settings.put("es.nodes", String.valueOf(esConfig.getClusterNodes()));
        settings.put("es.port", String.valueOf(esConfig.getPort()));
        return save2ES(dataFrame, tableName, settings);
    }
}
参考:https://www.programcreek.com/java-api-examples/?class=org.elasticsearch.spark.rdd.api.java.JavaEsSpark&method=saveToEsWithMeta
ES问题解决:Elasticsearch health check failed 参考:https://blog.csdn.net/CharlesYooSky/article/details/90405699
推荐阅读
- 人工智能|干货!人体姿态估计与运动预测
- Python专栏|数据分析的常规流程
- j2ee|spring用注解实现注入的@resource,@autowired,@inject区别
- 读书笔记|《白话大数据和机器学习》学习笔记1
- jar|springboot项目打成jar包和war包,并部署(快速打包部署)
- 网络|一文彻底搞懂前端监控
- html5|各行业工资单出炉 IT类连续多年霸占“榜首”位置
- 人工智能|【机器学习】深度盘点(详细介绍 Python 中的 7 种交叉验证方法!)
- 网络|简单聊聊压缩网络
- 数据库|效率最高的Excel数据导入---(c#调用SSIS Package将数据库数据导入到Excel文件中【附源代码下载】)...