Spark - RDD使用

弹性分布式数据集(Resilient Distributed Dataset,RDD)是 Spark 中的核心概念。简单的说,就是在Spark中创建一个RDD,然后通过RDD对数据进行各种各样的操作。
RDD创建 RDD的创建包括:

  1. 从集合中创建
  2. 从其他存储(比如hdfs、本地文件等)创建
  3. 从其他RDD创建
下面通过这几种方式来创建RDD,在创建RDD之前需要创建SparkContext,从下面的例子可以看出SparkContext需要一个conf,这个conf的master属性,在前面两张已经提过了。
从集合中创建有两个方法,parallelize以及makeRDD,makeRDD实际上调用的是parallelize方法,这两个方法都允许除了集合外,还可以再传递一个分区的参数。
从其他存储包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3等,这里的文件可以用通配符,而不限定某一个文件,比如textFile("/my/directory/*.txt"),就是/my/directory下所有的txt文件。
object CreateRDD { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("CreateRDD").setMaster("local") val sc = new SparkContext(conf) // parallelize val data = https://www.it610.com/article/Array(1, 2, 3, 4, 5) val rdd1: RDD[Int] = sc.parallelize(data) println(rdd1.count)val rdd2: RDD[Int] = sc.makeRDD(data) // makeRDD println(rdd2.count)val rdd3: RDD[String] = sc.textFile("D:\\data\\test.txt") // 从其他存储 println(rdd3.first) // 从其他RDD val rdd4: RDD[String] = rdd3.map(_ + "bbb") println(rdd4.first) // 关闭 sc.stop } }

RDD的操作 RDD支持两种操作:transformations和action。
transformations从字面上理解,就是转换,可以说从一个数据集转换成另外一个数据集,这里需要注意的是,所有的transformations都是lazy的,也就是说他尽管执行了我们那行语句,比如map映射,他并不会进行map映射,只有触发到action的时候,才会进行计算。
action,会触发RDD的计算并把最终结果返回给驱动程序。
Transformations map
map就是映射,也就是说,传入一个值,把他变成另外一个值。
比如下面的例子中,把1,2,3,4,5变成2,4,6,8,10,传入1把他变成2,传入2把他变成4。
object MapRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("MapRDD").setMaster("local")) val data = https://www.it610.com/article/Array(1, 2, 3, 4, 5) val rdd1: RDD[Int] = sc.parallelize(data) val mapRdd = rdd1.map(_ * 2) mapRdd.foreach(println) // 传入1把他变成2,传入2把他变成4 val mapRdd2 = rdd1.map(x => x * 2) mapRdd2.foreach(println) } }

filter
filter就是过滤,你给我一个值,我计算后返回true或false,如果返回的是false,则不会在新的集合中。
比如下面的例子中,偶数的才返回true,所以最后打印的是2,4
object FilterRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("FilterRDD").setMaster("local")) val data = https://www.it610.com/article/Array(1, 2, 3, 4, 5) val rdd1: RDD[Int] = sc.parallelize(data) val rdd2: RDD[Int] = rdd1.filter(x => x % 2 == 0) rdd2.foreach(println) } }

flatMap
flatMap就是压平,你给我一个值,我给你零个或者多个值。
比如下面的例子中,你给我"a b",我通过split后,给你a和b两个值。
object FlatMapRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("FlatMapRDD ").setMaster("local")) val data = https://www.it610.com/article/Array("a b", "c d") val rdd1: RDD[String] = sc.parallelize(data) val rdd2: RDD[String] = rdd1.flatMap(_.split(" ")) rdd2.foreach(println) } }

mapPartitions
类似于map,在RDD的每个分区(块)上单独运行。当因此运行在类型为T的RDD上,返回值,func的类型必须是Iterator => Iterator
比如下面的例子中,你给我x,我返回的是x.map(_ * 2)。
object MapPartitionsRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("MapPartitionsRDD").setMaster("local")) val data = https://www.it610.com/article/Array(1, 2, 3, 4, 5) val rdd1: RDD[Int] = sc.parallelize(data) val mapRdd = rdd1.mapPartitions(x => x.map(_ * 2)) mapRdd.foreach(println) } }

mapPartitionsWithIndex
类似于mapPartitions,为func提供了一个表示分区索引的整数值。
也就是说,我们可以通过传入的参数index,获取到对应的分区。
object MapPartitionsWithIndexRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("MapPartitionsWithIndexRDD").setMaster("local")) val data = https://www.it610.com/article/Array(1, 2, 3, 4, 5) val rdd1: RDD[Int] = sc.parallelize(data, 2) val mapRdd = rdd1.mapPartitionsWithIndex((index, x) => x.map(index + "," + _)) mapRdd.foreach(println) } }

sample sample就是采样,比如100个随机拿出30个。
sample参数有三个:
  • withReplacement:元素可以被多次采样(采样时替换)
  • fraction:样本的期望大小作为RDD大小的一部分。当withReplacement为true时,fraction为每个元素的期望次数,数值必须大于0;当withReplacement为false时,fraction为每个元素的期望概率,数值为0到1。
  • seed:随机数生成器的种子
object SampleRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("SampleRDD").setMaster("local")) val data = https://www.it610.com/article/1 to 100 val rdd1: RDD[Int] = sc.parallelize(data) val rdd2: RDD[Int] = rdd1.sample(true, 0.5, 1) println(rdd2.count) } }

union
union就是并集,把两个rdd合并起来。
比如下面的例子中,把rdd1和rdd2合并了。
object UnionRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("UnionRDD").setMaster("local")) val data1 = Array(1, 2, 3, 4, 5) val data2 = Array(6, 7, 8, 9, 10) val rdd1: RDD[Int] = sc.parallelize(data1) val rdd2: RDD[Int] = sc.parallelize(data2) val rdd3 = rdd1.union(rdd2) rdd3.foreach(println) } }

intersection
【Spark - RDD使用】intersection就是交集,取2个rdd相同的部分。
比如下面的例子中,rdd1和rdd2的交集是3,4,5。
object IntersectionRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("IntersectionRDD").setMaster("local")) val data1 = Array(1, 2, 3, 4, 5) val data2 = Array(3, 4, 5, 6, 7) val rdd1: RDD[Int] = sc.parallelize(data1) val rdd2: RDD[Int] = sc.parallelize(data2) val rdd3 = rdd1.intersection(rdd2) rdd3.foreach(println) } }

distinct
distinct就是过滤掉重复的部分。
比如下面的例子中,1,2,3有重复,最后打印的是1,2,3,4,5。
object DistinctRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("DistinctRDD").setMaster("local")) val data = https://www.it610.com/article/Array(1, 2, 3, 4, 5, 1, 2, 3) val rdd1: RDD[Int] = sc.parallelize(data) val rdd2: RDD[Int] = rdd1.distinct rdd2.foreach(println) } }

groupByKey
groupByKey就是通过key进行分组,如果没有key的话,需要keyBy方法指定key。
比如下面的例子中,把模2的结果当做key,并返回一个新的rdd。
在foreach中,可以看到参数包括key以及iterator。
object GroupByKeyRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("GroupByKeyRDD").setMaster("local")) val data1 = Array(1, 2, 3, 4, 5) val rdd1: RDD[Int] = sc.parallelize(data1) val rdd2: RDD[(Int, Int)] = rdd1.keyBy(_ % 2) rdd2.groupByKey().foreach(x => { val iterator = x._2.iterator var num = "" while (iterator.hasNext) num = num + iterator.next + " " println(x._1 + ":" + num) }) val data2 = Array(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)) val rdd3: RDD[(String, Int)] = sc.parallelize(data2) rdd3.groupByKey().foreach(x => { val iterator = x._2.iterator var num = "" while (iterator.hasNext) num = num + iterator.next + " " println(x._1 + ":" + num) }) } }

reduceByKey
reduceByKey会兑相同的key进行计算。
比如下面的例子中,最后的结果是(a,9)、(b,6)
object ReduceByKeyRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("ReduceByKeyRDD").setMaster("local")) val data = https://www.it610.com/article/Array(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)) val rdd1: RDD[(String, Int)] = sc.parallelize(data) val rdd2: RDD[(String, Int)] = rdd1.reduceByKey(_ + _) rdd2.foreach(println) } }

aggregateByKey
先按分区聚合,最后再进行聚合。
代码如下:
object AggregateByKeyRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("AggregateByKeyRDD").setMaster("local")) val data = https://www.it610.com/article/Array(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("a", 5), ("a", 6), ("b", 7), ("b", 8)) val rdd1: RDD[(String, Int)] = sc.parallelize(data, 2) val rdd2: RDD[(String, Int)] = rdd1.aggregateByKey(3)(math.max(_, _), _ + _) rdd2.foreach(println) } }

这个代码我们用下图来说明一下。
  1. 把集合进行分区
  2. 根据key进行分组
  3. 分组后进行max计算,也就是上面aggregateByKey后的第一个函数,这里的max有两个值,一个是aggregateByKey后面的3,另外一个就是key对应的value,比如a对应的值是1,2,这两个值和3 求最大值,得到的3,所以结果就是a,3。
  4. 两个分区计算后的结果进行相加,也就是上面aggregateByKey后的第二个函数。所以最后的结果就是(b,12)、(a,9)
Spark - RDD使用
文章图片

sortByKey
sortByKey就是根据key进行排序,第一个参数如果是true就是升序,也是默认值,false就是降序。
object SortByKeyRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("SortByKeyRDD").setMaster("local")) val data = https://www.it610.com/article/Array(("a", 1), ("c", 3), ("d", 4), ("b", 2)) val rdd1: RDD[(String,Int)] = sc.parallelize(data) val rdd2: RDD[(String, Int)] = rdd1.sortByKey() rdd2.foreach(println) } }

join
类似mysql的join
object JoinRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("JoinRDD").setMaster("local")) val data1 = Array(("a", 1), ("c", 3), ("d", 4), ("b", 2)) val data2 = Array(("a", 11), ("d", 14), ("e", 25)) val rdd1: RDD[(String, Int)] = sc.parallelize(data1) val rdd2: RDD[(String, Int)] = sc.parallelize(data2) val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2) val rdd4: RDD[(String, (Option[Int], Option[Int]))] = rdd1.fullOuterJoin(rdd2) val rdd5: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2) val rdd6: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2) // 相当于mysql的INNER JOIN // 结果: // (d,(4,14)) // (a,(1,11)) println("rdd3:join") rdd3.foreach(println) println("rdd4:fullOuterJoin") // 返回左右数据集的全部数据,左右有一边不存在的数据以None填充 // 结果: // (d,(Some(4),Some(14))) //(e,(None,Some(25))) // (a,(Some(1),Some(11))) //(b,(Some(2),None)) // (c,(Some(3),None)) rdd4.foreach(println) // 相当于mysql的LEFT JOIN // 结果: // (d,(4,Some(14))) // (a,(1,Some(11))) // (b,(2,None)) // (c,(3,None)) println("rdd5:leftOuterJoin") rdd5.foreach(println) // 相当于mysql的RIGHT JOIN // 结果: // (d,(Some(4),14)) // (e,(None,25)) // (a,(Some(1),11)) println("rdd6:rightOuterJoin") rdd6.foreach(println) } }

cogroup
对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。
比如下面的例子中,对rdd1的a,拿到的结果是(1, 3),对于rdd2的a,拿到的结果是(23),所以最终的结果就是(a,(CompactBuffer(1, 3),CompactBuffer(23)))。
object CogroupRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("CogroupRDD").setMaster("local")) val data1 = Array(("a", 1), ("a", 3),("b", 2)) val data2 = Array(("b", 24), ("a", 23),("b", 22)) val rdd1: RDD[(String,Int)] = sc.parallelize(data1) val rdd2: RDD[(String,Int)] = sc.parallelize(data2) val rdd3: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2) // 结果: // (a,(CompactBuffer(1, 3),CompactBuffer(23))) // (b,(CompactBuffer(2),CompactBuffer(24, 22))) rdd3.foreach(println) } }

cartesian
就是笛卡尔。
object CartesianRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("CartesianRDD").setMaster("local")) val data1 = Array(1, 2) val data2 = Array(3, 4) val rdd1: RDD[Int] = sc.parallelize(data1) val rdd2: RDD[Int] = sc.parallelize(data2) val rdd3: RDD[(Int, Int)] = rdd1.cartesian(rdd2) // (1,3) // (1,4) // (2,3) // (2,4) rdd3.foreach(println) } }

action reduce
根据传入的函数,对每个值进行计算
object ReduceRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("ReduceRDD").setMaster("local")) val data = https://www.it610.com/article/Array(1, 2, 3, 4, 5) val rdd1: RDD[Int] = sc.parallelize(data) val result: Int = rdd1.reduce(_ + _) println(result) } }

collect
拿到rdd的集合给driver端,通常确认结果比较少的时候用。
object CollectRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("CollectRDD").setMaster("local")) val data = https://www.it610.com/article/Array(1, 2, 3, 4, 5) val rdd1: RDD[Int] = sc.parallelize(data) val result: Array[Int] = rdd1.collect() println(result.mkString(",")) } }

count
拿到rdd的集合数量。
object CountRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("CountRDD").setMaster("local")) val data = https://www.it610.com/article/Array(1, 2, 3, 4, 5) val rdd1: RDD[Int] = sc.parallelize(data) val result: Long = rdd1.count() println(result) } }

first
拿到rdd集合的第一个元素。
object FirstRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("FirstRDD").setMaster("local")) val data = https://www.it610.com/article/Array(1, 2, 3, 4, 5) val rdd1: RDD[Int] = sc.parallelize(data) val result: Int = rdd1.first() println(result) } }

take
拿到前面n个元素,比如下面例子中,传的是3,所以拿到是1,2,3。
object TakeRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("TakeRDD").setMaster("local")) val data = https://www.it610.com/article/Array(1, 2, 3, 4, 5) val rdd1: RDD[Int] = sc.parallelize(data) val result: Array[Int] = rdd1.take(3) println(result.mkString(",")) } }

takeSample
随机抽取n个元素。
object TakeSampleRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("TakeSampleRDD").setMaster("local")) val data = https://www.it610.com/article/1 to 100 val rdd1: RDD[Int] = sc.parallelize(data) val result: Array[Int] = rdd1.takeSample(true, 5, 1) println(result.mkString(",")) } }

takeOrdered
排序后取N个元素。
object TakeOrderedRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("TakeOrderedRDD").setMaster("local")) val data = https://www.it610.com/article/1 to 100 val rdd1: RDD[Int] = sc.parallelize(data) val result: Array[Int] = rdd1.takeOrdered(5) println(result.mkString(",")) } }

countByKey
计算每个key出现的次数
object CountByKeyRDD { def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("CountByKeyRDD").setMaster("local")) val data = https://www.it610.com/article/Array(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)) val rdd1: RDD[(String, Int)] = sc.parallelize(data) val result: collection.Map[String, Long] = rdd1.countByKey() result.foreach(item => println(item._1 + ":" + item._2)) } }

foreach
遍历,比如上面很多例子的打印。

    推荐阅读