初识Apache|初识Apache Spark

第一次接触Spark,自己整理了(从网络,书籍,同事那里)一些Spark的相关内容当做笔记。路过的朋友仅供参考,不能保证说得都对。
什么是 Spark
简单来说,Spark是一种面向对象、函数式编程语言。Spark能够像操作本地集合对象一样轻松地操作分布式数据集。它具有运行速度快、易用性好、通用性强和随处运行等特点。
Spark提供了支持Java、scala、Python以及R语言的API。还支持更高级的工具如:Spark Sql、Spark Streaming、MLlib、GraphX等。

官方介绍 :Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
百度百科:Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎
Spark 有什么特点
以下摘自百度百科
更快的速度。内存中计算, 比 Hadoop 快100倍。
易用性。Spark 提供了80多个高级运算符。
通用性。Spark 提供了大量的库,包括SQL、DataFrames、MLlib、GraphX、Spark Streaming。 开发者可以在同一个应用程序中无缝组合使用这些库。
支持多种资源管理器。Spark 支持 Hadoop YARN,Apache Mesos,及其自带的独立集群管理器
更多详细介绍可参考:https://blog.csdn.net/xwc35047/article/details/51072145
什么是 RDD
在我使用Spark的过程中,用到最多的对象就是RDD,比如JavaRDD、JavaPairRDD。然后RDD之间又可以互相转化。这个RDD是个啥?
RDD全文是 Resilient Distributed DataSet(弹性·分布式·数据集)。
RDD是一个只读的、可分区的、支持多种来源 、有容错机制 、可以被缓存 、支持并行操作的分布式数据集,可以装载任何你想装载的数据。他的弹性特点体现在RDD的数据可以在内存与磁盘(外存)灵活交换。
Spark 模型
【初识Apache|初识Apache Spark】再来认识一下下面几个重要概念
Application。也就是我们编写完Spark程序,负责生成SparkContext。
Job。所谓 job,就是由一个 rdd 的 action算子(后面再说action) 触发的动作,可以简单的理解为,当你需要执行一个 rdd 的 action 的时候,会生成一个 job。
Stage。stage 是一个 job 的组成单位,就是说,一个 job 会被切分成 1 个或多个 stage,然后各个 stage 会按照执行顺序依次执行。
Task。stage 下的一个任务执行单元,一般来说,一个 rdd 有多少个 partition(分区,后面再说partition),就会有多少个 task,因为每一个 task 只是处理一个 partition 上的数据。
简单来说就是以RDD为基准,每触发一个action操作,就会生成一个job。job内部有一个或多个stage顺序执行,组成stage的是一系列task,即任务执行单元。
关于Partition分区。Spark RDD主要由Dependency、Partition、Partitioner组成,Partition是其中之一。一份待处理的原始数据会被按照相应的逻辑切分成n份,每份数据对应到RDD中的一个Partition,Partition的数量决定了task的数量,影响着程序的并行度。
关于Stage。Stage以shuffle和result这两种类型来划分。Spark中有两类task,一类是shuffleMapTask,一类是resultTask,第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也以此为依据,shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage。
那么刚刚提到的action又是什么?我们再来了解一下RDD操作算子。
什么是 RDD 操作算子
RDD有两种操作算子:Transformation(转换) 和 Action(执行)
Transformation。即一个rdd数据集经过数据转换变成一个新的rdd数据集。常用的Transformation操作有:map、filter、union、distinct、groupByKey 等。Transformation 属于延迟计算,当触发Transformation算子时rdd并没有立即进行转换,仅仅是记住了数据集的逻辑操作。
Action。触发Spark作业的运行,真正触发转换算子的计算。常用的操作有:reduce、collect、count、countByKey等等。
什么是 Shuffle
以下摘自官网
Shuffle 即洗牌。以reduceByKey 操作来说,reduceByKey操作生成一个新的RDD,其中相同key的所有值都组合为一个元组——key和reduce函数的结果。但是,并非所有相同key的值都必须位于同一个分区甚至是同一台计算机上,然而它们必须位于同一位置才能计算结果。
在Spark中,数据通常不会根据特定的操作在必要位置进行跨分区分布。在计算过程中,单个任务在单个分区上运行。因此,要执行reduceByKey任务的所有数据,Spark需要执行全部操作。它必须从所有分区中读取数据以找到单个key的所有值,然后将各分区中的值汇总以计算每个key的最终结果 - 这称为洗牌。
什么是窄依赖、宽依赖
窄依赖。指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区,和两个父RDD的分区对应于一个子RDD 的分区。
宽依赖。指子RDD的分区依赖于父RDD的所有分区。
举例
下面通过一个例子,尽量把我所理解的那部分通过这个小例子表达出来,不保证说的都对。
统计某高中今年参加高考的男生人数
首先我们需要将数据源读取到Spark RDD中(先不管如何读取),一个数据源只生成一个rdd。
rdd内部会按照一定的逻辑分割成n个partition分区,分区数也可以自己指定,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目,每个分区由一个task 执行。
rdd先执行filter 操作即Transformation算子。将参加高考的,性别为男性的对象过滤出来。但Transformation 是惰性的,不会立刻触发spark 作业。
过滤后的rdd 需要进行reduce操作即Action算子。此时触发spark 作业。每个action将生成一个Job。
Job 包含stage,stage有两种:shuffle和result,取决于算子的执行逻辑。如果一个job中有宽依赖,即有shuffle操作,shuffle之前的生成一个shuffle stage。shuffle之后的生成一个result stage。
每个stage 都是一组task 在执行,task 取决于分区数。
reduce 过程将符合条件的学生数计数并返回。
代码示例
package com.yzy.spark; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class SparkDemo { private static String appName = "spark.demo"; private static String master = "local[*]"; public static void main(String[] args) { JavaSparkContext sc = null; try { //初始化JavaSparkContext SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); sc = new JavaSparkContext(conf); // 生成数据源 List data = https://www.it610.com/article/getList(); //生成rdd JavaRDD rdd = sc.parallelize(data); //过滤符合条件的数据 rdd = rdd.filter(new Function() { public Boolean call(Student s) throws Exception { return s.isGaoKao() && s.getSex().equals("男"); } }); // map && reduce Student result = rdd.map(new Function() { public Student call(Student s) throws Exception { s.setCount(1); return s; } }).reduce(new Function2() { public Student call(Student s1, Student s2) throws Exception { s1.setCount(s1.getCount() + s2.getCount()); return s1; } }); System.out.println("执行结果:" + result.getCount()); } catch (Exception e) { e.printStackTrace(); } finally { if (sc != null) { sc.close(); } } }public static List getList(){ List data = https://www.it610.com/article/new ArrayList(); data.add(new Student(true,"男", "A")); data.add(new Student(false,"女", "B")); data.add(new Student(false,"男", "C")); data.add(new Student(true,"女", "D")); data.add(new Student(true,"男", "E")); data.add(new Student(false,"女", "F")); data.add(new Student(true,"男", "G")); return data; }static class Student implements Serializable{ private String name; private boolean gaoKao; private String sex; private int count; public Student(boolean gaoKao, String sex, String name) { this.gaoKao = gaoKao; this.sex = sex; this.name = name; }public String getName() { return name; }public void setName(String name) { this.name = name; }public boolean isGaoKao() { return gaoKao; }public void setGaoKao(boolean gaoKao) { this.gaoKao = gaoKao; }public String getSex() { return sex; }public void setSex(String sex) { this.sex = sex; }public int getCount() { return count; }public void setCount(int count) { this.count = count; } } }

控制台输出
//省略若干行 18/06/22 18:42:25 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 41 ms on localhost (executor driver) (1/4) 18/06/22 18:42:25 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 58 ms on localhost (executor driver) (2/4) 18/06/22 18:42:25 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 44 ms on localhost (executor driver) (3/4) 18/06/22 18:42:25 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 44 ms on localhost (executor driver) (4/4) 18/06/22 18:42:25 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 18/06/22 18:42:25 INFO DAGScheduler: ResultStage 0 (reduce at SparkDemo.java:44) finished in 0.219 s 执行结果:3

请注意:实例中的Student 类必须序列化,否则会报错!

    推荐阅读