Spark学习记录之SparkCore核心属性

Spark学习记录之SparkCore初步 概念 Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
包含的模块有,Spark Core,Spark SQL,Spark Streaming,Spark MLib,Spark GraphX
Spark Submit例子

  1. Standalone
    bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://spark111:7077 \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 1000

  2. Yarn
    bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 100bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode client \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 100

Spark On Yarn 两种模式的区别在于:Driver程序运行的节点不同。Client模式将用于监控和调度的Driver模块启动在客户端,而Cluster则将Driver模块启动在集群中。因此Client模式一般用于测试,Cluster则用于生产部署。
  1. 【Spark学习记录之SparkCore核心属性】Yarn Client模式
    • Driver在提交任务的本地运行
    • Driver启动后,会和ReourceManager通讯,申请启动ApplicationMaster
    • ResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,负责向ResourceManager申请Executor的资源
    • ResourceManager接到ApplicationMaster的申请后,分配Container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程。
    • Executor进程启动后,会反向注册到Driver,Executor全部启动完成后,Driver开始执行main函数
    • 执行到Action算子时,触发一个job,并根据宽依赖开始切分stage,每个stage生成对应的TaskSet,之后Driver会将task分发到各个Executor上执行
  2. Yarn Cluster模式
    • 在Yarn Cluster模式下,任务提交后会向ResourceManager通讯申请启动ApplicationMaster
    • 随后ResourceManger分配Container,在合适的NodeManager上启动ApplicationMaster,此时ApplicationMaster就是Driver
    • Driver启动后,向ResourceManager申请Exexutor资源。
    • ResourceManager分配Contaioner,ApplicationMaster在对应的NodeManager上启动Excutor。Executor启动后会向Driver进行反向注册
    • 当Executor全部启动完毕后,Driver开始执行main函数
    • 执行到Action算子时,触发一个job,并根据宽依赖切分stage,每个stage上生成对应的TaskSet,之后Driver回将task分发到各个Executor上执行
RDD RDD(Resilient Distributed Dataset),弹性分布式数据集
  1. 分区,与Hadoop MapReduce的对比
    • MapReduce有切片和分区,这是两个不同的概念,切片主要作用于MapTask阶段,以TextInputFormat为例,切片根据文件块大小来决定,默认情况下,切片大小就等于块大小,当然切片大小可通过配置调节,一般情况下,块大小为128M或256M,以磁盘速度决定。一个文件会以切片大小来切分为多个逻辑上的小文件,因此切片个数就等于MapTask的个数,即MapTask的并行数
    • MapReduce的分区是要在代码中指定设置的,默认为一个分区,分区个数对应的是ReduceTask的个数。默认的分区器Partitioner是HashPartitioner,以(key & Long.MAX_VALUE)%numReduceTasks计算得来,即在HashPartitioner的计算逻辑中,设定多少个numReduceTasks就会有多少个分区。用户可以通过继承Partioner来自定义分区器,以实现指定的分区个数。
    • MapReduce的分区数也会作用在MapTask阶段,在数据处理map方法进入环形缓冲区前会给数据标记上分区,在环形缓冲区的溢写排序和多个溢写文件的排序合并中都会以分区为单位进行
    • RDD中,一个stage中的task数量,是以该stage中的最后一个算子的分区数决定的
  2. 5个核心属性
    • 分区列表
      protected def getPartitions: Array[Partition]

    • 分区计算函数
      def compute(split: Partition, context: TaskContext): Iterator[T]

    • RDD之间的依赖关系
      protected def getDependencies: Seq[Dependency[_]] = deps

    • 分区器(可选)
      @transient val partitioner: Option[Partitioner] = None

    • 首选位置(可选)
      protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  3. 分区与并行度
    分区与并行度有关系,但是是不同的概念。多个分区会有多个task,但是只有一个Executor的话,也就只能是并发,而非并行。分区的意思是写代码的人希望在足够资源的情况下可以达到分区数的并行度。
    ? 分区内计算有序,分区间计算无序
    • markRDD的分区,从集合中创建
      • numSlices = 循环i = 0~3,文件length,
      • start = ((i * length)/numSlices).toInt
      • end = (((i +1) * length)/numSlices).toInt
        [1,2,3,4,5] numSlices = 2 length = 50=> [0,2)=>1,2 1=> [2,5)=>3,4,5

        def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = { if (numSlices < 1) { throw new IllegalArgumentException("Positive number of partitions required") } // Sequences need to be sliced at the same set of index positions for operations // like RDD.zip() to behave as expected def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { (0 until numSlices).iterator.map { i => val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) } } //...省略多行代码 }

    • textFile的分区
      • 采用Hadoop的读文件方式,TextInputformat
      • 以行为单位进行读取
      • 读取数据时以偏移量为单位
      • 偏移量不会重复读取
        1234567@@=> 012345678 89@@=> 9101112 0=> 1314 / 2 = 7 [0,7]=> 1234567@@ [7,14]=> 89@@ 0

        public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { //...省略多行代码 long totalSize = 0L; //...省略多行代码 totalSize += file.getLen(); //...省略多行代码 long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits); long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize); //...省略多行代码 //blockSize,本地运行环境,32M,生产128M或者256M long blockSize = file.getBlockSize(); long splitSize = this.computeSplitSize(goalSize, minSize, blockSize); //...省略多行代码 for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap); splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); } //...省略多行代码 }protected long computeSplitSize(long goalSize, long minSize, long blockSize) { return Math.max(minSize, Math.min(goalSize, blockSize)); }

    推荐阅读