Spark算子


目录

  • 算子分类
  • 转换(Transformations)算子
    • Map
    • FlatMap
    • MapPartitions
    • Filter
    • Sample
    • Union
    • Join
    • Groupby

算子分类
【Spark算子】转换算子是将一个RDD变成另一个RDD之间的转换,懒执行,需要操作算子触发执行
操作算子不能将一个RDD变成另一个RDD,每一操作算子都会触发一个job
可以通过算子的返回值去判断 该算子是转换/操作算子
Spark算子
文章图片

转换(Transformations)算子
Map
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object MapDemo1 { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local").setAppName("MapDemo1") val sc: SparkContext = new SparkContext(conf)/** * map:传入一条数据 返回一条数据 * 不会改变数据的规模 * 接收一个函数f: 参数同RDD中数据的类型,返回值类型由自己决定 * 将RDD中的每一条数据 依次传给函数f */// 可以通过parallelize方法构建List的RDD val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5)) val RDD2: RDD[Double] = listRDD.map(s => { println("map" + s) (s * 2).toDouble })// foreach 也是一个操作算子 可以触发任务 RDD2.foreach(println)List(1, 2, 3, 4, 5).map(s => { println("list" + s) s * 2 })//方便观察控制台出现网站的job 写一个死循环一直保持运行 while (true){} }}

FlatMap
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutableobject FlatMapDemo2 { /** * flatMap :传入一条返回N条 * 需要接收一个函数f,会将RDD中的每一条数据传给函数f,函数f处理完后需要返回一个集合或者数组 * flatMap会自动将结果进行扁平化处理(展开) */def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local").setAppName("FlatMapDemo2") val sc: SparkContext = new SparkContext(conf)val listRDD: RDD[String] = sc.parallelize(List("java,spark,hive","hadoop,hive")) listRDD.flatMap(s=>{ val splits: mutable.ArrayOps[String] = s.split(",") splits }).foreach(println)//省略写 listRDD.flatMap(s=>s.split(",")) .foreach(println)//map来写 listRDD.map(s=>{ val splits: Array[String] = s.split(",") splits //直接prt,只能是输出两个数组地址 }).foreach(s=> println(s.mkString("-")))}}

MapPartitions
import java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object MapPartitionsDemo3 {/**Map、MapPartitions foreach、foreachPartitions 有时在处理数据的时候需要和外部系统建立连接 如果连接建立在Driver端 连接是不能被序列化的 及在算子内部无法使用算子外部创建的连接 那么可以使用MapPartitions、foreachPartitions算子降低连接的创建与销毁次数,提高效率 */ def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local").setAppName("MapPartitionsDemo3") val sc: SparkContext = new SparkContext(conf)// 读取学生数据构建RDD val stuRDD: RDD[String] = sc.textFile("scala/data/students.txt",6) // 获取分区数量 println(stuRDD.getNumPartitions)// take 也是一个action算子 会返回一个Array // 这里的foreach实际上是Array的方法 不是RDD的算子 stuRDD.take(10).foreach(println)// mapPartitions也是一个转换算子 stuRDD.mapPartitions(part=>{ println("mappartitions") //一共输出6个mappartitions,因为设置了6个分区 part.map(s=>s.split(",")(1)).take(10) }).foreach(println)// mapPartitionsWithIndex也是一个转换算子 stuRDD.mapPartitionsWithIndex((index,part)=>{ println("现在的分区是:"+index) part.map(s=>s.split(",")(1)).take(10) }).foreach(println)// foreachPartition也是一个操作算子,无返回值 // 每一个分区由一个task去处理数据 // 相当于最终每个分区会创建一次conn stuRDD.foreachPartition(part=>{ //在里面创建JDBC连接,因为有6个分区,所以只有六次执行Class.forName("com.mysql.jdbc.Driver") val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?useSSL=false","root","123456") val statement: PreparedStatement = conn.prepareStatement("insert into student values(?,?,?,?,?)") println("分区")// // 按分区去处理数据 part.foreach(s=>{ val splits: Array[String] = s.split(",") val id: Int = splits(0).toInt val name: String = splits(1) val age: Int = splits(2).toInt val gender: String = splits(3) val clazz: String = splits(4)//将遍历的每一个数据变成每一个 ? statement.setInt(1,id) statement.setString(2,name) statement.setInt(3,age) statement.setString(4,gender) statement.setString(5,clazz)//一次五个加入到addBatch中处理 statement.addBatch()})//这里不是execute,因为处理上面的Batch,所以用executeBatch statement.executeBatch() statement.closeOnCompletion() conn.close()})}}

Filter
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object FilterDemo4 {/** * filter算子:主要用于过滤数据 * 接收一个函数f,函数f需要返回一个布尔值,为true则保留,false则过滤 */ def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("FilterDemo4").setMaster("local") val sc: SparkContext = new SparkContext(conf)//过滤奇数 val listRDD: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8)) listRDD.filter(s=>{ s%2==1 }).foreach(println)} }

Sample
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object SampleDemo5 {/** * sample:对数据取样 * withReplacement 有无放回 * fraction 抽样比例 */def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("SampleDemo5").setMaster("local") val sc: SparkContext = new SparkContext(conf)val stuRDD: RDD[String] = sc.textFile("scala/data/students.txt") stuRDD.sample(false,0.1) .foreach(println) }}

Union
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object UnionDemo6 {/** * union : 将两个RDD首尾相连变成一个RDD * 两个RDD的结构必须一样 */ def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("UnionDemo6").setMaster("local") val sc: SparkContext = new SparkContext(conf)val listRDD1: RDD[Int] = sc.parallelize(List(1,2,3,4,5)) println(listRDD1.getNumPartitions)// 1 val listRDD2: RDD[Int] = sc.parallelize(List(1,2,3)) println(listRDD2.getNumPartitions)// 1val resRDD: RDD[Int] = listRDD1.union(listRDD2) println(resRDD.getNumPartitions)// 2 resRDD.foreach(println) }}

Join
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object JoinDemo7 { def main(args: Array[String]): Unit = { /** * join 同MySQL中的join操作类似 * 将两个k-v格式的RDD,按照相同的key做类似与inner join操作 */val conf: SparkConf = new SparkConf().setAppName("JoinDemo7").setMaster("local") val sc: SparkContext = new SparkContext(conf)// 构建K-V格式的RDD val tuple2RDD1: RDD[(String, String)] = sc.parallelize(List(("001", "张三"), "002" -> "小红", "003" -> "小明")) val tuple2RDD2: RDD[(String, Int)] = sc.parallelize(List(("001", 20), "002" -> 22, "003" -> 21)) val tuple2RDD3: RDD[(String, String)] = sc.parallelize(List(("001", "男"), "002" -> "女"))val joinRDD: RDD[(String, (String, Int))] = tuple2RDD1.join(tuple2RDD2)joinRDD.map(kv=>{ val id: String = kv._1 val name: String = kv._2._1 val age: Int = kv._2._2 id+","+name+","+age }).foreach(println)// 使用match接收RDD中的每一条数据,用的是map{} joinRDD.map { case (id: String, (name: String, age: Int)) =>id+"|"+name+"|"+age }.foreach(println)tuple2RDD2.leftOuterJoin(tuple2RDD3) .map{ // 关联上的处理逻辑 case (id:String,(age:Int,Some(gender))) =>id+","+age+"|"+gender // 未关联上的处理逻辑 case (id:String,(age:Int,None)) =>id+","+age+","+"-" }.foreach(println) } }

Groupby
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object GroupbyDemo8 { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("GroupbyDemo8").setMaster("local") val sc: SparkContext = new SparkContext(conf)val stuRDD: RDD[String] = sc.textFile("scala/data/students.txt")//得到班级人数 stuRDD.groupBy(s=>s.split(",")(4)) .map(kv=>{ val clazz: String = kv._1 val value: Iterable[String] = kv._2 val size: Int = value.size clazz+","+size }).foreach(println)/** * groupByKey 作用K-V格式的RDD上,默认按K分组 */ stuRDD.map(s=>(s.split(",")(4),s)) .groupByKey() .map(kv=>{ kv._1+","+kv._2.size }).foreach(println)/** * reduceByKey 需要接收一个聚合函数 * 首先会对数据按key分组 然后在组内进行聚合(一般是加和,也可以是Max、Min之类的操作) * 相当于 MR 中的combiner * 可以在Map端进行预聚合,减少shuffle过程需要传输的数据量,以此提高效率 * 相对于groupByKey来说,效率更高,但功能更弱 * 幂等操作 * y = f(x) = f(y) = f(f(x)) * *///得到班级人数另一种算法 stuRDD.map(s=>(s.split(",")(4),1)) .groupBy(_._1) .map(kv=>{ val clazz: String = kv._1 val sum: Int = kv._2.map(_._2).sum clazz+","+sum }).foreach(println)//使用reduceByKey stuRDD.map(s=>(s.split(",")(4),1)) .reduceByKey((x,y)=>x+y) .foreach(println)//简写 stuRDD.map(s=>(s.split(",")(4),1)) .reduceByKey(_+_) .foreach(println) } }

    推荐阅读