12.spark|12.spark sql之读写数据

简介 ??Spark SQL支持多种结构化数据源,轻松从各种数据源中读取Row对象。这些数据源包括Parquet、JSON、Hive表及关系型数据库等。
??当只使用一部分字段时,Spark SQL可以智能地只扫描这些字段,而不会像hadoopFile方法一样简单粗暴地扫描全部数据。
Parquet ??Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。Parquet自动保存原始数据的类型,当写入Parquet文件时,所有的列会自动转为可空约束。

  • scala
// Encoders for most common types are automatically provided by importing spark.implicits._ import spark.implicits._val peopleDF = spark.read.json("examples/src/main/resources/people.json")// DataFrames can be saved as Parquet files, maintaining the schema information peopleDF.write.parquet("people.parquet")// Read in the parquet file created above // Parquet files are self-describing so the schema is preserved // The result of loading a Parquet file is also a DataFrame val parquetFileDF = spark.read.parquet("people.parquet")// Parquet files can also be used to create a temporary view and then used in SQL statements parquetFileDF.createOrReplaceTempView("parquetFile") val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") namesDF.map(attributes => "Name: " + attributes(0)).show() // +------------+ // |value| // +------------+ // |Name: Justin| // +------------+

  • java
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; Dataset peopleDF = spark.read().json("examples/src/main/resources/people.json"); // DataFrames can be saved as Parquet files, maintaining the schema information peopleDF.write().parquet("people.parquet"); // Read in the Parquet file created above. // Parquet files are self-describing so the schema is preserved // The result of loading a parquet file is also a DataFrame Dataset parquetFileDF = spark.read().parquet("people.parquet"); // Parquet files can also be used to create a temporary view and then used in SQL statements parquetFileDF.createOrReplaceTempView("parquetFile"); Dataset namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19"); Dataset namesDS = namesDF.map( (MapFunction) row -> "Name: " + row.getString(0), Encoders.STRING()); namesDS.show(); // +------------+ // |value| // +------------+ // |Name: Justin| // +------------+

  • python
peopleDF = spark.read.json("examples/src/main/resources/people.json")# DataFrames can be saved as Parquet files, maintaining the schema information. peopleDF.write.parquet("people.parquet")# Read in the Parquet file created above. # Parquet files are self-describing so the schema is preserved. # The result of loading a parquet file is also a DataFrame. parquetFile = spark.read.parquet("people.parquet")# Parquet files can also be used to create a temporary view and then used in SQL statements. parquetFile.createOrReplaceTempView("parquetFile") teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.show() # +------+ # |name| # +------+ # |Justin| # +------+

  • sql
CREATE TEMPORARY VIEW parquetTable USING org.apache.spark.sql.parquet OPTIONS ( path "examples/src/main/resources/people.parquet" )SELECT * FROM parquetTable

JSON ??Spark SQL可以自动推断JSON数据集的结构,并加载为以Row为集合项的Dataset。
??默认Spark SQL读取的json文件不是常规的json文件,每一行必须包含一个独立的、自包含的有效JSOn对象。对于常规的多行JSON文件,设置multiLine选项为true即可。
  • scala
// Primitive types (Int, String, etc) and Product types (case classes) encoders are // supported by importing this when creating a Dataset. import spark.implicits._// A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files val path = "examples/src/main/resources/people.json" val peopleDF = spark.read.json(path)// The inferred schema can be visualized using the printSchema() method peopleDF.printSchema() // root //|-- age: long (nullable = true) //|-- name: string (nullable = true)// Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people")// SQL statements can be run by using the sql methods provided by spark val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show() // +------+ // |name| // +------+ // |Justin| // +------+// Alternatively, a DataFrame can be created for a JSON dataset represented by // a Dataset[String] storing one JSON object per string val otherPeopleDataset = spark.createDataset( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val otherPeople = spark.read.json(otherPeopleDataset) otherPeople.show() // +---------------+----+ // |address|name| // +---------------+----+ // |[Columbus,Ohio]| Yin| // +---------------+----+

  • java
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files Dataset people = spark.read().json("examples/src/main/resources/people.json"); // The inferred schema can be visualized using the printSchema() method people.printSchema(); // root //|-- age: long (nullable = true) //|-- name: string (nullable = true)// Creates a temporary view using the DataFrame people.createOrReplaceTempView("people"); // SQL statements can be run by using the sql methods provided by spark Dataset namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19"); namesDF.show(); // +------+ // |name| // +------+ // |Justin| // +------+// Alternatively, a DataFrame can be created for a JSON dataset represented by // a Dataset storing one JSON object per string. List jsonData = https://www.it610.com/article/Arrays.asList("{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); Dataset anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING()); Dataset anotherPeople = spark.read().json(anotherPeopleDataset); anotherPeople.show(); // +---------------+----+ // |address|name| // +---------------+----+ // |[Columbus,Ohio]| Yin| // +---------------+----+

  • python
# spark is from the previous example. sc = spark.sparkContext# A JSON dataset is pointed to by path. # The path can be either a single text file or a directory storing text files path = "examples/src/main/resources/people.json" peopleDF = spark.read.json(path)# The inferred schema can be visualized using the printSchema() method peopleDF.printSchema() # root #|-- age: long (nullable = true) #|-- name: string (nullable = true)# Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people")# SQL statements can be run by using the sql methods provided by spark teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show() # +------+ # |name| # +------+ # |Justin| # +------+# Alternatively, a DataFrame can be created for a JSON dataset represented by # an RDD[String] storing one JSON object per string jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'] otherPeopleRDD = sc.parallelize(jsonStrings) otherPeople = spark.read.json(otherPeopleRDD) otherPeople.show() # +---------------+----+ # |address|name| # +---------------+----+ # |[Columbus,Ohio]| Yin| # +---------------+----+

  • sql
CREATE TEMPORARY VIEW jsonTable USING org.apache.spark.sql.json OPTIONS ( path "examples/src/main/resources/people.json" )SELECT * FROM jsonTable

Hive ??Spark SQL支持任何Hive支持的存储格式(SerDe),包括文本文件、RCFiles、ORC、Parquet、Avro及Protocol Buffer等。
??如果已配置好hive环境,将hive-site.xml,core-site.xml(用于安全配置),hdfs-site.xml(HDFS配置)放到conf目录下;如果没有hive环境,Spark SQL会自动在spark-warehouse(spark.sql.warehouse.dir配置项)目录下创建metastore_db。另外,需要赋予执行spark应用的用户写权限。
  • scala
import java.io.Fileimport org.apache.spark.sql.Row import org.apache.spark.sql.SparkSessioncase class Record(key: Int, value: String)// warehouseLocation points to the default location for managed databases and tables val warehouseLocation = new File("spark-warehouse").getAbsolutePathval spark = SparkSession .builder() .appName("Spark Hive Example") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate()import spark.implicits._ import spark.sqlsql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")// Queries are expressed in HiveQL sql("SELECT * FROM src").show() // +---+-------+ // |key|value| // +---+-------+ // |238|val_238| // | 86| val_86| // |311|val_311| // ...// Aggregation queries are also supported. sql("SELECT COUNT(*) FROM src").show() // +--------+ // |count(1)| // +--------+ // |500 | // +--------+// The results of SQL queries are themselves DataFrames and support all normal functions. val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")// The items in DataFrames are of type Row, which allows you to access each column by ordinal. val stringsDS = sqlDF.map { case Row(key: Int, value: String) => s"Key: $key, Value: $value" } stringsDS.show() // +--------------------+ // |value| // +--------------------+ // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // ...// You can also use DataFrames to create temporary views within a SparkSession. val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i"))) recordsDF.createOrReplaceTempView("records")// Queries can then join DataFrame data with data stored in Hive. sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show() // +---+------+---+------+ // |key| value|key| value| // +---+------+---+------+ // |2| val_2|2| val_2| // |4| val_4|4| val_4| // |5| val_5|5| val_5| // ...

  • java
import java.io.File; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public static class Record implements Serializable { private int key; private String value; public int getKey() { return key; }public void setKey(int key) { this.key = key; }public String getValue() { return value; }public void setValue(String value) { this.value = https://www.it610.com/article/value; } }// warehouseLocation points to the default location for managed databases and tables String warehouseLocation = new File("spark-warehouse").getAbsolutePath(); SparkSession spark = SparkSession .builder() .appName("Java Spark Hive Example") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate(); spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive"); spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL spark.sql("SELECT * FROM src").show(); // +---+-------+ // |key|value| // +---+-------+ // |238|val_238| // | 86| val_86| // |311|val_311| // ...// Aggregation queries are also supported. spark.sql("SELECT COUNT(*) FROM src").show(); // +--------+ // |count(1)| // +--------+ // |500 | // +--------+// The results of SQL queries are themselves DataFrames and support all normal functions. Dataset sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key"); // The items in DataFrames are of type Row, which lets you to access each column by ordinal. Dataset stringsDS = sqlDF.map( (MapFunction) row -> "Key: " + row.get(0) + ", Value: " + row.get(1), Encoders.STRING()); stringsDS.show(); // +--------------------+ // |value| // +--------------------+ // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // ...// You can also use DataFrames to create temporary views within a SparkSession. List records = new ArrayList<>(); for (int key = 1; key < 100; key++) { Record record = new Record(); record.setKey(key); record.setValue("val_" + key); records.add(record); } Dataset recordsDF = spark.createDataFrame(records, Record.class); recordsDF.createOrReplaceTempView("records"); // Queries can then join DataFrames data with data stored in Hive. spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show(); // +---+------+---+------+ // |key| value|key| value| // +---+------+---+------+ // |2| val_2|2| val_2| // |2| val_2|2| val_2| // |4| val_4|4| val_4| // ...

  • python
from os.path import expanduser, join, abspathfrom pyspark.sql import SparkSession from pyspark.sql import Row# warehouse_location points to the default location for managed databases and tables warehouse_location = abspath('spark-warehouse')spark = SparkSession \ .builder \ .appName("Python Spark SQL Hive integration example") \ .config("spark.sql.warehouse.dir", warehouse_location) \ .enableHiveSupport() \ .getOrCreate()# spark is an existing SparkSession spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")# Queries are expressed in HiveQL spark.sql("SELECT * FROM src").show() # +---+-------+ # |key|value| # +---+-------+ # |238|val_238| # | 86| val_86| # |311|val_311| # ...# Aggregation queries are also supported. spark.sql("SELECT COUNT(*) FROM src").show() # +--------+ # |count(1)| # +--------+ # |500 | # +--------+# The results of SQL queries are themselves DataFrames and support all normal functions. sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")# The items in DataFrames are of type Row, which allows you to access each column by ordinal. stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value)) for record in stringsDS.collect(): print(record) # Key: 0, Value: val_0 # Key: 0, Value: val_0 # Key: 0, Value: val_0 # ...# You can also use DataFrames to create temporary views within a SparkSession. Record = Row("key", "value") recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)]) recordsDF.createOrReplaceTempView("records")# Queries can then join DataFrame data with data stored in Hive. spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show() # +---+------+---+------+ # |key| value|key| value| # +---+------+---+------+ # |2| val_2|2| val_2| # |4| val_4|4| val_4| # |5| val_5|5| val_5| # ...

JDBC连接 【12.spark|12.spark sql之读写数据】??Spark SQL可以使用JDBC连接读写关系型数据库中的数据。这种方式比使用spark core中的JdbcRDD要好,因为生成的DataFrame可以很容易被处理。
  • scala
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods // Loading data from a JDBC source val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .load()val connectionProperties = new Properties() connectionProperties.put("user", "username") connectionProperties.put("password", "password") val jdbcDF2 = spark.read .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)// Saving data to a JDBC source jdbcDF.write .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .save()jdbcDF2.write .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)// Specifying create table column data types on write jdbcDF.write .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

  • java
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods // Loading data from a JDBC source Dataset jdbcDF = spark.read() .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .load(); Properties connectionProperties = new Properties(); connectionProperties.put("user", "username"); connectionProperties.put("password", "password"); Dataset jdbcDF2 = spark.read() .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties); // Saving data to a JDBC source jdbcDF.write() .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .save(); jdbcDF2.write() .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties); // Specifying create table column data types on write jdbcDF.write() .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

  • python
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods # Loading data from a JDBC source jdbcDF = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql:dbserver") \ .option("dbtable", "schema.tablename") \ .option("user", "username") \ .option("password", "password") \ .load()jdbcDF2 = spark.read \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"})# Saving data to a JDBC source jdbcDF.write \ .format("jdbc") \ .option("url", "jdbc:postgresql:dbserver") \ .option("dbtable", "schema.tablename") \ .option("user", "username") \ .option("password", "password") \ .save()jdbcDF2.write \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"})# Specifying create table column data types on write jdbcDF.write \ .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"})

  • sql
CREATE TEMPORARY VIEW jdbcTable USING org.apache.spark.sql.jdbc OPTIONS ( url "jdbc:postgresql:dbserver", dbtable "schema.tablename", user 'username', password 'password' )INSERT INTO TABLE jdbcTable SELECT * FROM resultTable

    推荐阅读