spark读写ES
【spark读写ES】直接上代码:
package sparkUtil

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.{EsSparkSQL, SparkDataFrameFunctions}

/**
 * Demo: writing a Spark DataFrame to Elasticsearch two ways.
 *
 * Way 1 uses [[EsSparkSQL.saveToEs]] directly; way 2 goes through
 * [[SparkDataFrameFunctions]], which delegates to way 1 internally.
 * ES connectivity (es.nodes / es.port) is configured on the SparkConf.
 */
object SparkES {

  def main(args: Array[String]): Unit = {
    // Elasticsearch connection settings shared by the whole session.
    val conf = new SparkConf()
    conf.set("es.nodes", "127.0.0.1")
    conf.set("es.port", "9200")

    val sparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("sparkSQLcrudES")
      .config(conf)
      .getOrCreate()

    // Sample documents. Note: "id" is the only field here that can serve
    // as the Elasticsearch document id via es.mapping.id.
    val json =
      """[{"id":"1aa","title":"华为","category":"手机","brand":"华为", "price":"3500", "images":"www.jd.com/123123.jpg"},{"id":"2aaa","title":"华为","category":"手机","brand":"华为", "price":"5800", "images":"www.jd.com/asda.jpg"}]"""

    import sparkSession.implicits._
    val dataset = sparkSession.createDataset(json :: Nil)
    val df = sparkSession.read.json(dataset)

    /*
     * Way 1: first argument is the DataFrame, second is the target
     * resource in "_index/_type" form.
     */
    EsSparkSQL.saveToEs(df, "item/docs")

    // FIX: the original passed an empty resource string (""), which fails at
    // runtime because es.resource must name an index, and mapped the document
    // id to "datapool_seq_id", a field that does not exist in the documents
    // above. Use the real index and the "id" field instead.
    EsSparkSQL.saveToEs(df, "item/docs",
      Map(
        "es.mapping.id" -> "id"
        , "es.index.auto.create" -> "true"
      )
    )

    /**
     * Way 2: SparkDataFrameFunctions — under the hood this still calls way 1.
     * Per-write settings (nodes/port/batch size) can be passed in the map.
     */
    new SparkDataFrameFunctions(df).saveToEs(
      "index/type",
      Map(
        "es.mapping.id" -> "id"
        , "es.index.auto.create" -> "true"
        , "es.nodes" -> "127.0.0.1"
        , "es.batch.size.entries" -> "200"
        , "es.port" -> "9200"
      )
    )

    sparkSession.close()
  }
}
推荐阅读
- 宽容谁
- 我要做大厨
- 增长黑客的海盗法则
- 画画吗
- 2019-02-13——今天谈梦想
- 远去的风筝
- 三十年后的广场舞大爷
- 叙述作文
- 20190302|20190302 复盘翻盘
- 学无止境,人生还很长