Spark|Spark architecture
2019-10-18spark应用程序的运行架构:
(1)简单的说:
由driver向集群申请资源,集群分配资源,启动executor。driver将spark应用程序的代码和文件传送给executor。executor上运行task,运行完之后将结果返回给driver或者写入外界。
(2)复杂点说:
提交应用程序,构建sparkContext,构建DAG图,提交给scheduler进行解析,解析成一个个stage,提交给集群,由集群任务管理器进行调度,集群启动spark executor。driver把代码和文件传给executor。executor进行各种运算完成task任务。driver上的block tracker记录executor在各个节点上产生的数据块。task运行完之后,将数据写入HDFS上或者其他类型数据库里。
(3)全面点说:
spark应用程序进行各种transformation的计算,最后通过action触发job。提交之后首先通过sparkContext根据RDD的依赖关系构建DAG图,DAG图提交给DAGScheduler进行解析,解析时是以shuffle为边界,反向解析,构建stage,stage之间也有依赖关系。这个过程就是对DAG图进行解析划分stage,并且计算出各个stage之间的依赖关系。然后将一个个TaskSet提交给底层调度器,在spark中是提交给taskScheduler处理,生成TaskSet manager,最后提交给executor进行计算,executor多线程计算,计算完反馈给TaskSetmanager,再反馈给taskScheduler,然后再反馈回DAGScheduler。全部运行完之后写入数据。
【Spark|Spark architecture】(4)更加深入理解:
应用程序提交后,触发action,构建sparkContext,构建DAG图,提交给DAGScheduler,构建stage,以stageSet方式提交给TaskScheduler,构建taskSet Manager,然后将task提交给executor运行。executor运行完task后,将完成信息提交给schedulerBackend,由它将任务完成的信息提交给TaskScheduler。TaskScheduler反馈信息给TaskSetManager,删除该task任务,执行下一个任务。同时TaskScheduler将完成的结果插入到成功队列里,加入之后返回加入成功的信息。TaskScheduler将任务处理成功的信息传给TaskSet Manager。全部任务完成后TaskSet Manager将结果反馈给
DAGScheduler。如果属于resultTask,交给JobListener。如果不属于resultTask,保存结果。
http://www.aboutyun.com/thread-24246-1-1.html
程序在运行之前,已经申请过资源,driver和Executors通讯,不需要和master进行通讯
高效原因
- 基于内存计算,减少低效的磁盘交互
- 高效的调度算法,基于DAG
- 容错机制Linage
- 自动的进行内存和磁盘的存储切换
- 基于Lingage的高效容错
- task如果失败会自动进行特定次数的重试
- stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片
- checkpoint和persist,数据计算之后持久化缓存
- 数据调度弹性,DAG TASK调度和资源无关
- 数据分片的高度弹性,a.分片很多碎片可以合并成大的,b.parquet
- 不支持细粒度的写和更新操作(如网络爬虫),spark写数据是粗粒度的 所谓粗粒度,就是批量写入数据,为了提高效率。但是读数据是细粒度的也就是 说可以一条条的读
- 不支持增量迭代计算,Flink支持
shuffle 过程 https://www.cnblogs.com/jxhd1/p/6528540.html
调优
- 平台层面的调优:防止不必要的jar包分发,提高数据的本地性,选择高效的存储格式如parquet
- 应用程序层面的调优:过滤操作符的优化降低过多小任务,降低单条记录的资源开销,处理数据倾斜,复用RDD进行缓存,作业并行化执行
- JVM层面的调优:设置合适的资源量,设置合理的JVM,启用高效的序列化方法如kyro,增大off head内存等等
reduce是action,reducebykey不是
读取kafka
- 基于Receiver的方式 这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
- 基于Direct的方式 这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据
http://www.coin163.com/it/5310443029667760572/spark-
storage level
- useDisk:使用硬盘(外存)
- useMemory:使用内存
- useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
- deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象
- replication:备份数(在多个节点上备份)
spark sql https://zhuanlan.zhihu.com/p/45729547
rdd vs dataframe
https://www.jianshu.com/p/c0181667daa0
推荐阅读
- Spark|Spark 数据倾斜及其解决方案
- 前沿论文|论文精读(Neural Architecture Search without Training)
- Spark--java.util.NoSuchElementException:|Spark--java.util.NoSuchElementException: None.get at at
- spark|spark 资源动态释放
- Clean architecture for the rest of us
- 前端的Clean Architecture
- Flutter System Architecture(Flutter系统架构图)
- SparkStreaming读Kafka-|SparkStreaming读Kafka- Couldn't find leaders for Set
- spark|spark UDAF根据某列去重求合 distinct sum
- 大数据|spark UDAF 自定义聚合函数 UserDefinedAggregateFunction 带条件的去重操作