数据读取与保存

1. 概览 Spark支持以下三类常见的数据源:

  • 文件格式与文件系统:
    对于存储在本地文件系统或分布式文件系统(比如 NFS、HDFS、Amazon S3 等)中的文件格式,包括文本文件、JSON、SequenceFile,以及 protocol buffer,Saprk都支持。
  • 结构化数据源:
    这里主要指Saprk SQL支持的包括JSON,Apache Hive在内的结构化数据源。
  • 数据库,键值存储:
    Spark 自带的库和一些第三方库可以用来连接 Cassandra、HBase、Elasticsearch 以及 JDBC 源。
2. 文件格式数据源 Spark 会根据文件扩展名选择对应的处理方式,下表是Spark支持的一些常见格式:
数据读取与保存
文章图片

(1)文本文件 在 Spark 中读写文本文件很容易。
  • 当我们将一个文本文件读取为 RDD 时,输入的每一行都会成为 RDD 的一个元素。
  • 也可以将多个完整的文本文件一次性读取为一个 pair RDD,其中键是文件名,值是文件内容。


读取文本文件:
【数据读取与保存】只需要使用文件路径作为参数调用 SparkContext 中的 textFile()函数,就可以读取一个文本文件。如果要控制分区数的话,可以指定 minPartitions
在 Python 中读取一个文本文件:
input = sc.textFile("file:///home/holden/repos/spark/README.md")

在 Scala 中读取一个文本文件:
val input = sc.textFile("file:///home/holden/repos/spark/README.md")

如果你想读取一个目录中的所有文件,你可以使用两种方式读取:
  1. 使用textFile函数: 它以目录为参数,把所有文件合并起来,返回一个RDD。
  2. 使用wholeTextFiles()方法: 它也以目录为参数,返回一个 pair RDD,其中key是输入文件的文件名,value是对应文件的所有数据,size为文件个数。wholeTextFiles() 在每个文件表示一个特定时间段内的数据时非常有用。比如文件表示不同阶段销售数据,则可以很容易地求出每个阶段的平均值。
看一个例子
我的/home/brownwong/Desktop/Test目录下有两个文件1.txt, 2.txt 各自只有三行数据,内容如下:# 1.txt Brown HAHA HIHI# 2.txt Wong GUN DAN

Scala Code
scala> val input1 = sc.textFile("/home/brownwong/Desktop/Test") input1: org.apache.spark.rdd.RDD[String] = /home/brownwong/Desktop/Test MapPartitionsRDD[1] at textFile at :21scala> input1.collect res0: Array[String] = Array(Brown, HAHA, HIHI, Wong, GUN, DAN)scala> val input2 = sc.wholeTextFiles("/home/brownwong/Desktop/Test") input2: org.apache.spark.rdd.RDD[(String, String)] = /home/brownwong/Desktop/Test WholeTextFileRDD[2] at wholeTextFiles at :21scala> input2.collect res1: Array[(String, String)] = Array((file:/home/brownwong/Desktop/Test/1.txt,"Brown HAHA HIHI "), (file:/home/brownwong/Desktop/Test/2.txt,"Wong GUN DAN "))scala> input2.collect.size res3: Int = 2

注意:
Spark 还支持在输入路径中使用通配字符(如 part-*.txt )。


保存文本文件
输出文本文件也相当简单。 saveAsTextFile() 方法接收一个目录路径,并将RDD 中的内容都输出到路径对应的文件中。Spark 会在那个目录下输出多个文件。这样,Spark 就可以从多个节点上并行输出了。在这个方法中,我们不能控制数据的哪一部分输出到哪个文件中。
Code Example:
result.saveAsTextFile(outputDirectory)

(2)JSON JSON 是一种使用较广的半结构化数据格式。读取 JSON 数据的最简单的方式是将数据作为文本文件读取,然后使用 JSON 解析器来对 RDD 中的值进行映射操作。类似地,可以使用 JSON 序列化库来将数据转为字符串,然后将其写出去。
在 Java 和 Scala 中也可以使用一个自定义 Hadoop 格式来操作 JSON 数据。(后面还会展示如何使用Spark SQL 读取 JSON 数据)


读取JSON
将数据作为文本文件读取,然后对 JSON 数据进行解析,这样的方法可以在所有支持的编程语言中使用。这种方法假设文件中的每一行都是一条 JSON 记录。如果你有跨行的JSON 数据,你就只能读入整个文件,然后对每个文件进行解析。如果在你使用的语言中构建一个 JSON 解析器的开销较大,你可以使用 mapPartitions() 来重用解析器(??)。
Python,Scala,Java都有大量可用的 JSON 库,这里只介绍一种。对于python,我们使用内置库json,对于Scala和Java,我们使用Jackson

    推荐阅读