RDD的详解、创建及其操作

RDD的详解 RDD的详解、创建及其操作
文章图片

RDD的详解、创建及其操作
文章图片

RDD:弹性分布式数据集,是Spark中最基本的数据抽象,用来表示分布式集合,支持分布式操作!
RDD的创建 RDD中的数据可以来源于2个地方:本地集合或外部数据源
RDD的详解、创建及其操作
文章图片

RDD操作 分类 【RDD的详解、创建及其操作】RDD的详解、创建及其操作
文章图片

转换算子
RDD的详解、创建及其操作
文章图片

Map

import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object Demo03Map { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setAppName("Demo03Map").setMaster("local") val sc: SparkContext = new SparkContext(conf) //读取文件数据 val linesRDD: RDD[String] = sc.textFile("spark/data/words.txt") //对数据进行扁平化处理 val flatRDD: RDD[String] = linesRDD.flatMap(_.split(","))//按照单词分组 val groupRDD: RDD[(String, Iterable[String])] = flatRDD.groupBy(w => w) //聚合 val wordsRDD: RDD[String] = groupRDD.map(kv => { val key: String = kv._1 val words: Iterable[String] = kv._2 key + "," + words.size })//分组+聚合 val mapRDD1: RDD[(String, Int)] = flatRDD.map((_, 1)) val words1: RDD[(String, Int)] = mapRDD1.reduceByKey(_ + _)////分组+聚合 val mapRDD2: RDD[(String, Int)] = flatRDD.map((_, 1)) val words2: RDD[(String, Iterable[Int])] = mapRDD2.groupByKey() val wordSum: RDD[(String, Int)] = words2.mapValues(_.size) wordSum.foreach(println)//输出 wordsRDD.foreach(println) words1.foreach(println) } }

flatMap(数据扁平化处理)
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object Demo04FlatMap { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("Demo04FlatMap").setMaster("local") val sc: SparkContext = new SparkContext(conf)val linesRDD: RDD[String] = sc.parallelize(List("java,scala,python", "map,java,scala")) //扁平化处理 val flatRDD: RDD[String] = linesRDD.flatMap(_.split(",")) flatRDD.foreach(println) } }

Mappartitions
map和mapPartitions区别
1)map:每次处理一条数据
2)mapPartitions:每次处理一个分区数据
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object Demo05MapPartition { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("Demo05MapPartition").setMaster("local") val sc: SparkContext = new SparkContext(conf) val stuRDD: RDD[String] = sc.textFile("spark/data/words.txt",3) stuRDD.mapPartitions(rdd => { println("map partition") // 按分区去处理数据 rdd.map(line => line.split(",")(1)) }).foreach(println) } }

fliter 过滤
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object Demo06Filter { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("Demo05MapPartition").setMaster("local") val sc: SparkContext = new SparkContext(conf) val linesRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5)) //过滤,转换算子 linesRDD.filter(kv => { kv % 2 == 1 }).foreach(println) } }

sample 取样
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDDobject Demo07Sample { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("Demo05MapPartition").setMaster("local") val sc: SparkContext = new SparkContext(conf) /** * sample:对数据取样 * withReplacement 有无放回 * fraction 抽样比例 * withReplacement:表示抽出样本后是否在放回去,true表示会放回去 * 这也就意味着抽出的样本可能有重复 * fraction :抽出多少,这是一个double类型的参数,0-1之间,eg:0.3表示抽出30% */ val stuRDD: RDD[String] = sc.textFile("spark/data/students.txt",3) stuRDD.sample(withReplacement = true,0.1).foreach(println) } }

union 将相同结结构的数据连接到一起
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object Demo08Union { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("Demo05MapPartition").setMaster("local") val sc: SparkContext = new SparkContext(conf)/**union * 将两个相同结构的数据连接在一起 */ val lineRDD1: RDD[String] = sc.parallelize(List("java,scala", "data,python")) val lineRDD2: RDD[String] = sc.parallelize(List("spark,scala", "java,python")) println(lineRDD1.getNumPartitions) val unionRDD: RDD[String] = lineRDD1.union(lineRDD2) println(unionRDD.getNumPartitions) unionRDD.foreach(println) } }

mappatitionWIthindex
//mapPartitionsWithIndex也是一个转换算子 // 会在处理每一个分区的时候获得一个index //可以选择的执行的分区 stuRDD.mapPartitionsWithIndex((index, rdd) => { println("当前遍历的分区:" + index) // 按分区去处理数据 rdd.map(line => line.split(",")(1)) }).foreach(println)join 将数据按照相同key进行关联(数据必须是(K,V))

import java.io
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo09Join {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("Demo05MapPartition").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
// 构建K-V格式的RDD
val tuple2RDD1: RDD[(String, String)] = sc.parallelize(List(("001", "张三"), "002" -> "小红", "003" -> "小明"))
val tuple2RDD2: RDD[(String, Int)] = sc.parallelize(List(("001", 20), "002" -> 22, "003" -> 21))
val tuple2RDD3: RDD[(String, String)] = sc.parallelize(List(("001", "男"), "002" -> "女"))
//将文件进行join
val joinRDD: RDD[(String, (String, Int))] = tuple2RDD1.join(tuple2RDD2)
joinRDD.map(kv => {
val i: String = kv._1
val j: String = kv._2._1
val k: Int = kv._2._2
i + "," + j + "," + k
}).foreach(println)
//第二种方式 joinRDD.map { case (id: String, (name: String, age: Int)) => id + "*" + name + "*" + age }.foreach(println)val leftJoinRDD: RDD[(String, (String, Option[String]))] = tuple2RDD1.leftOuterJoin(tuple2RDD3) leftJoinRDD.map { //存在关联 case (id: String, (name: String, Some(gender))) => id + "*" + name + "*" + gender //不存在关联 case (id: String, (name: String, None)) => id + "*" + name + "*" + "_" }

}
}
groupByKey将kv格式的数据进行key的聚合

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo10GroupByKey {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("Demo10GroupByKey").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
/**
* groupBy 指定分组的字段进行分组
*/
// 统计班级人数 val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt") linesRDD.groupBy(word => word.split(",")(4)) .map(kv => { val key = kv._1 val wordsCnt = kv._2.size key + "," + wordsCnt }).foreach(println)val linesMap: RDD[(String, String)] = linesRDD.map(lines => (lines.split(",")(4), lines)) //按照key进行分组 linesMap.groupByKey() .map(lines=>{ val key = lines._1 val wordsCnt: Int = lines._2.size key+","+wordsCnt }).foreach(println)

}
}
ReduceByKey reduceByKey 需要接收一个聚合函数 首先会对数据按key分组 然后在组内进行聚合(一般是加和,也可以是Max、Min之类的操作) 相当于 MR 中的combiner 可以在Map端进行预聚合,减少shuffle过程需要传输的数据量,以此提高效率 相对于groupByKey来说,效率更高,但功能更弱 幂等操作 y = f(x) = f(y) = f(f(x)) reducebyKey与groupbykey的区别 reduceByKey:具有预聚合操作 groupByKey:没有预聚合 在不影响业务逻辑的前提下,优先采用reduceByKey。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo11ReduceByKey {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("Demo11ReduceByKey").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
//统计班级人数
linesRDD.map(lines => (lines.split(",")(4), lines))
.groupByKey()
.map(kv => {
val key = kv._1
val cnt = kv._2.size
key + "" + cnt
}).foreach(println)
//ReduceByKey /** * reduceByKey 需要接收一个聚合函数 * 首先会对数据按key分组 然后在组内进行聚合(一般是加和,也可以是Max、Min之类的操作) * 相当于 MR 中的combiner * 可以在Map端进行预聚合,减少shuffle过程需要传输的数据量,以此提高效率 * 相对于groupByKey来说,效率更高,但功能更弱 * 幂等操作 * y = f(x) = f(y) = f(f(x)) */ linesRDD.map(lines=>(lines.split(",")(4),1)) .reduceByKey(_+_) .foreach(println)

}
}
sort 排序,默认升序

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object Demo12Sort {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("Demo12Sort").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
/** * sortBy 转换算子 * 指定按什么排序 默认升序 * * sortByKey 转换算子 * 需要作用在KV格式的RDD上,直接按key排序 默认升序 */ linesRDD.sortBy(lines => lines.split(",")(2), ascending = false) //按照年纪降序 .take(10) //转换算子打印十行 .foreach(println)val mapRDD: RDD[(String, String)] = linesRDD.map(l => (l.split(",")(2), l)) mapRDD.sortByKey(ascending = false) .take(10) .foreach(println)

}
}
Mapvalue

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo13MapValue {
def main(args: Array[String]): Unit = {
/**
* mapValues 转换算子
* 需要作用在K—V格式的RDD上
* 传入一个函数f
* 将RDD的每一条数据的value传给函数f,key保持不变
* 数据规模也不会改变
/
val conf: SparkConf = new SparkConf().setAppName("Demo13MapValue").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
val linesRDD: RDD[(String, Int)] = sc.parallelize(List(("zs", 10), ("zzw", 34), ("lm", 18)))
linesRDD.mapValues(lines=>lines
2)
.foreach(println)
}
### 行为算子 ![](https://img2020.cnblogs.com/blog/2506444/202111/2506444-20211110144134743-1836525955.png)

    推荐阅读