Reading and Writing Elasticsearch with Spark

[Spark read/write Elasticsearch] Straight to the code:
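Before the listing below will compile, the elasticsearch-hadoop Spark connector has to be on the classpath. A minimal sbt sketch, assuming Spark 2.x and an Elasticsearch 6.x cluster (the exact version numbers here are assumptions; align the connector version with your cluster):

// build.sbt -- assumed versions, not from the original post
libraryDependencies += "org.apache.spark"  %% "spark-sql"               % "2.4.8" % Provided
libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-20"  % "6.8.0"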

package sparkUtil

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.{EsSparkSQL, SparkDataFrameFunctions}

object SparkES {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.set("es.nodes", "127.0.0.1")
    conf.set("es.port", "9200")

    val sparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("sparkSQLcrudES")
      .config(conf)
      .getOrCreate()

    val json =
      """[{"id":"1aa","title":"华为","category":"手机","brand":"华为","price":"3500","images":"www.jd.com/123123.jpg"},
        |{"id":"2aaa","title":"华为","category":"手机","brand":"华为","price":"5800","images":"www.jd.com/asda.jpg"}]""".stripMargin

    import sparkSession.implicits._
    val dataset = sparkSession.createDataset(json :: Nil)
    val df = sparkSession.read.json(dataset)

    /*
     * Option 1: EsSparkSQL.saveToEs -- the first argument is the DataFrame,
     * the second is the target resource in "_index/_type" format.
     */
    EsSparkSQL.saveToEs(df, "item/docs")
    EsSparkSQL.saveToEs(df, "item/docs", Map(
      "es.mapping.id"        -> "id",   // use the DataFrame's "id" column as the document _id
      "es.index.auto.create" -> "true"
    ))

    /*
     * Option 2: SparkDataFrameFunctions.saveToEs -- under the hood this
     * still goes through the same code path as option 1.
     */
    new SparkDataFrameFunctions(df).saveToEs("index/type", Map(
      "es.mapping.id"         -> "id",
      "es.index.auto.create"  -> "true",
      "es.nodes"              -> "127.0.0.1",
      "es.batch.size.entries" -> "200",
      "es.port"               -> "9200"
    ))

    sparkSession.close()
  }
}
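The listing above covers the write half; for the read half, here is a minimal sketch against the same local cluster and the "item/docs" resource written above. EsSparkSQL.esDF and the org.elasticsearch.spark.sql data source both come from the same elasticsearch-hadoop connector; the object name SparkESRead is introduced here for illustration only.

package sparkUtil

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.EsSparkSQL

object SparkESRead {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.set("es.nodes", "127.0.0.1")
    conf.set("es.port", "9200")

    val sparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("sparkSQLreadES")
      .config(conf)
      .getOrCreate()

    // Option 1: the EsSparkSQL helper; the resource is in "_index/_type"
    // format, matching the "item/docs" resource written above.
    val df1 = EsSparkSQL.esDF(sparkSession, "item/docs")
    df1.show()

    // Option 2: the standard DataSource API, backed by the same connector.
    val df2 = sparkSession.read
      .format("org.elasticsearch.spark.sql")
      .load("item/docs")
    df2.show()

    sparkSession.close()
  }
}

Both paths return an ordinary DataFrame, so the usual Spark SQL operations (filter, select, join) apply; the connector pushes simple filters down to Elasticsearch where it can.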

