特别介绍
Spark于2009年诞生于加州大学伯克利分校的AMP实验室(算法、机器与人实验室),2010年开放 。2013年,Spark向Apache软件基金会捐款,2014年,Spark成为Apache顶级项目 。
如今,十年过去了,Spark已经成为大大小小的企业和研究机构的常用工具之一,依然受到众多开发者的喜爱 。如果你初入江湖,想了解和学习Spark的“小虾米”,那么InfoQ与飞轮技术专家Leo合作的《Spark:原理详解与开发实践》系列文章一定会适合你!
本文是特别系列的第二篇 。
继上一本书之后,在最后一章《内存计算的起源——RDD》中,我们从“虚”和“实”两个方面介绍了RDD的基本结构 。d是依赖关系和计算属性端到端连接形成的计算路径,技术上称为lineage-lineage,也称为DAG(有向无环图) 。为什么一个概念有两个名字?这两个不同的名字有什么区别和联系?简单来说,血统和DAG从两个不同的角度描述了同一件事 。谱系,着重从数据角度描述不同RDD的依赖性;DAG从计算的角度描述了不同RDD之间的转换逻辑 。如果说RDD是Spark对分布式数据模型的抽象,那么相应地DAG就是Spark对分布式计算模型的抽象 。
顾名思义,DAG是一种“图” 。图计算模型的应用有着悠久的历史,早在上个世纪就被应用于图形数据库的实现 。任何图都包含两个基本元素:一个节点(顶点)和一条边(边) 。节点通常用于表示实体,而边表示实体之间的关系 。比如《倚天屠龙记》社交网络的好友关系中,每个节点代表一个特定的人,每条边意味着两端的实体之间已经建立了好友关系 。
田义·龙图社交网络
上面的社交网络中,朋友是相互的,比如张无忌和周芷若是彼此的朋友,所以图中的边是没有方向性的;另外,细心的同学可能已经发现,上面的图结构中还有“环”,比如张无忌、谢逊、白眉鹰王组成的关系环,张无忌、谢逊、紫衫龙王、小昭之间的关系环,等等 。像上面这样的图结构叫做“无向循环图” 。没有比较,就没有歧视 。有向无环图(DAG)自然是一种具有方向性,没有“环”结构的图模型 。还记得土豆工坊的例子吗?
马铃薯车间山
在上面的土豆加工DAG中,每一个节点都是一个RDD,每一条边都代表了不同rdd之间的父子关系——父子关系天然是单向的,所以整个画面是有方向的 。此外,我们注意到在整个图形中没有环形结构 。像这样的土豆加工线可以说是最简单的有向无环图 。每个节点的Indegree(指向自己的边)和Outdegree(从自己开始的边)都是1,整个图只有一个分支 。
但工业应用中的Spark DAG要比这复杂得多,往往是由不同的RDD通过关联和分裂生成的具有多个分支的有向无环图 。为了说明这一点,我们以土豆工坊为例 。将“原味”薯片投放市场一段时间后,作坊老板发现季度销量骤降,老板急得不知所措 。这时有人向他建议:“为什么不推出更多口味的薯片来满足大众多样化的选择”,于是老板命令工人对流水线做如下改动 。
马铃薯车间先进生产线
与以前的工艺相比,新工艺增加了三条调味线,用于分配不同的调味粉 。新流水线中的辣椒粉配送到收集小薯片的流水线,孜然粉配送到中薯片流水线,番茄粉对应配送到大薯片流水线 。改造后的土豆车间现在可以生产三种口味不同大小的薯片,分别是麻辣口味的小薯片、孜然口味的中薯片和番茄口味的大薯片 。如果我们用flavoursRDD抽象调味品,那么新车间作业流程对应的DAG就会演化成有向无环图,有两个分支,如下图所示 。
多分支DAG
在上一篇文章中,我们讨论了星火核心内心法的第一精髓——RDD 。在本文中,我们来谈谈内心法的第二个秘密——Dag 。
RDD算子Dag的边缘在上一章《内存计算的起源RDDs》的最后,我们以WordCount为例,展示了不同RDD之间转换形成的DAG计算图 。通读代码,从开发的角度,我们发现DAG的关键是RDD算子调用 。不同于Hadoop MapReduce,Spark提供了丰富的RDD算子供开发者灵活排列组合,从而实现多样化的数据处理逻辑 。那么问题来了,Spark提供了哪些算子?
来源:https://spark . Apache . org/docs/latest/rdd-programming-guide . html 。
从表中可以看出,Spark的RDD算子是如此的丰富,让人眼花缭乱 。对于刚接触Spark的同学来说,如果不稍微分类的话,真的是无法上手众多的操作人员 。Apache Spark官网将RDD算子分为变换和动作,这也是各种Spark技术博客中常用的分类方法 。为了解释转换和操作运算符之间的本质区别,我们必须提到Spark计算模型的惰性计算(也称为延迟计算)特征 。
掌握一个新概念最有效的方法之一就是找到相反的概念——相对于“懒惰计算”,大多数传统编程语言和编程框架的评估策略是“热切评估” 。比如我们熟悉的C,C,Java,每一条指令都会试图调度CPU,占用时钟周期,触发计算的执行 。同时,CPU寄存器需要与内存通信,完成数据交换和数据缓存 。在传统的编程模式下,每一条指令都渴望被调度到“前线”,参与战斗 。
惯性计算模型不是这样的——具体来说,Spark,大部分RDD算子都是“稳”的,特别能屏住呼吸,他们会明确地告诉DAGScheduler:“哥们,你先走吧,别管我,我伸一会儿懒腰,抽根烟 。队伍的前排是我们的领导,没有他的命令,我们不会轻举妄动 。”有了对懒计算和早求值的基本了解,下面就来说说变换和动作的区别 。在Spark的RDD算子中,变换算子都是懒惰的求值运算,只参与DAG计算图的构建和表示计算逻辑,不会被立即调度和执行 。惰性求值的特点是只有在需要物化数据时才触发计算的执行 。RDD的Actions operator提供了各种数据物化操作,其主要职责是触发整个DAG计算链的执行 。并且只有当Actions操作符触发计算时,从DAG的开头到结尾的所有操作符(之前用于构建DAG的Transformations操作符)才会按照依赖的顺序依次被调度和执行 。
说到这里,读者不禁要问:Spark的惰性评价的计算模型有什么优点?或者反过来说,为什么Spark不采用传统的早期评测?不知道大家有没有听说过“延迟满足效应”(又称“糖果效应”),是指为了获得长远的、更大的利益,主动延迟甚至放弃现在的、更小的满足感 。俗话说“云要衣装花好看,猪要肥,人要红” 。Spark不仅是天才儿童,年纪轻轻也相当有天赋 。原来内功不是为了赢现在的一招半式,而是着眼于整个武林 。太过分了 。让我们把它拿回来 。总的来说,惰性计算为Spark执行引擎的整体优化提供了广泛的空 。关于惰性计算如何帮助Spark做全局优化——一个讲故事的人的嘴表达不了两件事,后面的文档我们慢慢开发吧 。
我们再来谈谈RDD算子 。除了常见的按转化和作用分类的方法外,作者还从适用范围和用途两个维度对老铁进行了分类 。毕竟人脑喜欢结构化的知识,官网的长蛇阵一词列表总是让人昏昏欲睡 。有了这个表,我们知道*ByKey的操作一定作用在配对的RDD上 。所谓成对RDD指的是通过模式明确区分的键、值对的RDD,而任意RDD指的是没有模式或具有任意模式的RDD 。从使用的角度可以分辨出RDD算子分类比较分散,篇幅比较小的原因,这里就不一一介绍了 。让我们得到我们需要的 。
值得一提的是,对于同一个计算场景,不同运营商带来的实现性能可能会有很大差异 。我们将在下面的性能调优章节中分析具体的问题 。好的,随着坑越挖越多,请稍等片刻 。先说一下刚才提到的根据FIFO原理的hot DAGScheduler 。
Dag调度程序-DAG的向导DAGScheduler是Spark分布式调度系统的重要组成部分之一 。其他组件包括TaskScheduler、MapOutputTracker、SchedulerBackend等 。DAGScheduler的主要职责是根据RDD依赖将DAG划分为阶段,以阶段粒度提交任务(TaskSet)并跟踪任务的进度 。如果把DAG看作是Spark job或者“战略地形”的执行路径,那么DAGScheduler就是这个地形的引导官,负责从头到尾摸清地形,根据地形特点安排兵力 。更形象的说,回到土豆车间的例子,DAGScheduler要做的就是将抽象的土豆加工DAG转化为车间流水线上具体的薯片加工任务 。那么问题来了,DAGScheduler是如何探索“地形”的呢?如何划分阶段?划分阶段的依据是什么?再者,把DAG分成阶段有什么好处?Spark为什么要这么做?
DAGScheduler的核心职责
要回答这些问题,我们首先需要将DAG的“头”和“尾”定义如下:在DAG中,没有父RDD的节点称为头节点,而没有子RDD的节点称为尾节点 。以土豆工坊为例,其中有两个首节点,分别是potatosRDD和flavoursRDD,尾节点是flavouredBakedChipsRDD 。
DAG中开始和结束的定义
当DAGScheduler试图探索DAG的“地形”时,它以一种颠倒的方式从后往前走 。具体来说,对于土豆工场的DAG,DAGScheduler将从尾节点flavouredBakedChipsRDD开始,按照RDD依赖关系依次向前遍历所有父RDD节点,遍历过程中以Shuffle为边界划分阶段 。Shuffle字面意思是“洗牌”,没错,扑克游戏中的洗牌 。Shuffle在大数据领域扩展为“跨节点的数据分布”,指的是为了实现某种计算逻辑,数据需要在集群内的不同计算节点间定向分布 。在大多数场景下,Shuffle是当之无愧的“性能瓶颈” 。不客气的说,哪里有洗牌,哪里就有性能优化的空室 。我们后面会分别讨论Spark Shuffle的原理和性能优化技巧 。在土豆工坊的DAG中,有两个地方发生了Shuffle,一个是从bakedChipsRDD到flavouredbekedchipsrdd的计算,另一个是从flavoursRDD到flavouredbekedchipsrdd的计算,如下图所示 。
在土豆车间洗牌
读者不禁要问:DAGScheduler是如何判断RDD之间的转换是否会洗牌的?这位读者说,“前一份文件说,接线员半天是RDD之间转换的关键 。是不是要根据运营商来判断是否会发生洗牌?”你真的猜错了 。运算符和Shuffle之间没有对应关系 。以连接操作符为例 。在大多数情况下,连接会引入洗牌;;但是在同位连接中,当左右表的数据分布一致时,就不会发生Shuffle 。所以,你看,DAGScheduler真的不能依靠运营商本身来判断是否发生洗牌 。要回答这个问题,我们还是要回到上一篇文献《内存计算的起源——RDD》中介绍RDD时提到的五个属性 。
名称成员类型属性含义相关性变量生成RDD所依赖的父RDD 。Compute方法生成RDD的计算接口分区变量 。RDD的所有数据片段 。实体划分器方法划分数据片段的规则 。数据片段的可变物理位置偏好
RDD的五种属性及其含义
其中,第一种属性依赖可以细分为窄依赖和洗牌依赖 。窄依赖(NarrowDependency)也叫“窄依赖”,是指RDD所依赖的数据不需要分布,基于当前已有的数据切片就可以实现compute attribute封装的功能 。另一方面,ShuffleDependency意味着RDD所依赖的数据碎片需要分布在集群中,然后才能执行RDD的compute函数来完成计算 。因此,RDD之间的转换是否是无序的取决于子RDD的依赖类型 。如果依赖类型是ShuffleDependency,那么DAGScheduler决定RDD和RDD之间的转换将引入Shuffle 。在回溯到DAG的过程中,一旦DAGScheduler发现RDD的依赖类型是ShuffleDependency,它将依次执行以下三个操作:
沿着 Shuffle 边界的子 RDD 方向创建新的 Stage 对象把新建的 Stage 注册到 DAGScheduler 的 stages 系列字典中,这些字典用于存储、记录与 Stage 有关的状态和元信息,以备后用沿着当前 RDD 的父 RDD 遵循广度优先搜索算法继续回溯 DAG
以土豆工坊为例 。其尾节点flavouredBakedChipsRDD依赖于两个父RDD bakedChipsRDD和flavoursRDD,依赖类型为ShuffleDependency 。然后,根据DAGScheduler的执行逻辑,此时将执行以下三个具体操作:
DAGScheduler在DAG过程中遇到ShuffleDependency时,会回撤主操作流程 。
DAGScheduler沿着尾节点回溯并划分为stage0 。
完成第一阶段(stage0)的创建和注册后,DAGScheduler继续向bakedChipsRDD方向回溯 。当我们沿着这条路跑的时候,我们的DAGScheduler向导惊讶地发现:“我去!去马平川的一路上,风景很好,站与站之间没有障碍,交通也很顺畅 。真是好地形!”——沿路遇到的所有RDD(Bakedchipsrdd,chipsrdd,cleanedPotatosRDD,PotatosRDD)都是窄依赖 。
回溯结束时,DAGScheduler也会重复上述三个步骤 。根据DAGScheduler以Shuffle为界划分阶段的原理,将沿途所有RDD归为同一阶段,暂记为stage1 。值得一提的是,Stage对象的rdd属性对应的数据类型是RDD[],而不是列表[RDD[]] 。对于逻辑上包含多个RDD的阶段,其RDD属性存储路径末端的RDD节点,在我们的例子中是bakedChipsRDD 。
DAGScheduler沿着bakedChipsRDD方向回溯,划分stage1 。
勤奋的DAGScheduler,在成功创建stage1之后,还在不忘初心,牢记使命,继续向未探索的路线前进 。从上图中,我们可以清楚地看到,flavoursRDD方向的路径保留在整个地形中,不包含在DAGScheduler的视图中 。我们的DAGScheduler向导记忆力很好 。早在stage0划分的时候,他就在笔记本(栈)上写下:“这个路口有个岔口 。先沿着bakedChipsRDD走,再回头沿着flavoursRDD探索 。”记住,记住!”此时,向导拿出之前的笔记本,用横线划掉了bakedChipsRDD方向的路径——表示这个方向的路径已经探索过了,然后沿着flavoursRDD方向大步走去 。一脚下去,发现:“我去!结束了!”,然后按照通常的“三招一套”流程——创建阶段、注册阶段、继续回溯 。随着DAGScheduler创建最后一个阶段:stage2,地形上的所有路径都已被探索 。
DAGScheduler创建最后一个阶段:阶段2
到目前为止,我们的向导几乎断了腿,按照首尾相连的顺序搜索了整个地形,最后把地形划分为三个战略阶段 。那么,问题来了 。指南划分的三个区域有什么用?DAGScheduler他老爸一直这样跑 。有什么意义?如前所述,DAGScheduler的核心职责是将抽象的DAG计算图转化为可以并行计算的具体分布式任务 。追溯到DAG和创建Stage只是这个核心职责的第一步 。DAGScheduler以Stage(TaskSet)作为任务调度的粒度,与TaskScheduler、SchedulerBackend等众多bosses协同作战,运筹帷幄,调兵遣将 。不过,毕竟这篇文章的主题是DAG,距离星火调度系统的核心还有一段距离,所以暂且在这里挖个坑,稍后再单独打开(星火调度系统),讲讲几位大佬之间的趣闻轶事 。填坑的路很长,修远,我会上下挖掘这个坑 。
我们来回顾一下导游的心路历程 。首先,DAGScheduler沿着DAG的尾节点北上,沿途判断每个RDD节点的依赖属性 。之后,如果确定RDD的依赖属性是NarrowDependency,DAGScheduler将继续向前回溯;如果RDD的依赖是ShuffleDependency,DAGScheduler将启动“三动一集”移动,创建阶段,注册阶段并继续回溯 。因此,何时切割DAG并生成新阶段由RDD的依赖类型决定,并且DAGScheduler仅当且仅当RDD的依赖是ShuffleDependency时才会创建新阶段 。
如果你喜欢提问,你一定会问,“DAGScheduler怎么知道RDD依赖哪种类型?他怎么知道RDD的依赖是狭义依赖还是洗牌依赖?”要回答这个问题,我们必须回到RDD的五个属性,但这次是分裂者 。还记得这个属性吗?Partitioner是RDD的划分器,定义了RDD数据分片的划分规则 。它决定了RDD的数据碎片如何分布在分布式集群中 。这个属性很重要,后面介绍Shuffle的时候会提到 。DAGScheduler通过划分器来确定每个RDD的依赖类型 。具体地,如果子RDD的划分器与父RDD的划分器一致,则DAGScheduler确定子RDD对父RDD的依赖属于窄依赖 。相反,如果两个partitioner不一致,即分区规则不同(不同的分区规则意味着必须有数据的“洗牌”,即Shuffle),那么DAGScheduler确定子对父的依赖是ShuffleDependency 。至此,DAGScheduler对DAG的划分逻辑暂时可以告一段落了 。原理说了,举了例子 。还缺什么?对!代码 。
给我看看代码古人云:“光说不练 。”我们用一个小例子来说明DAG和Stage的关系 。或者使用上一章“内存计算的起源——RDD”中的WordCount来效仿 。该文件的内容如下 。
样本文件内容
代码没有改变:
字数统计示例代码
虽然文件的内容和代码没有变,但是我们观察问题的角度变了 。这一次,我们关注的是DAG中阶段的划分以及阶段之间的关系 。RDD的toDebugString函数可以让我们看到DAG的组成和阶段的划分,如下图所示 。
DAG组成和阶段划分
在上图中,从第3行开始,每一行代表一个RDD 。显然,第3行的ShuffledRDD是DAG的尾节点,而第7行的HadoopRDD是首节点 。我们来观察一下每一行串印的特点 。首先,最明显的是第4、5、6、7行前面有一个Tab,明显和第3行错位了,也就是说第3行的ShuffledRDD被划分为一个阶段(标记为stage0),而第4、5、6、7行的其他RDD被划分为另一个阶段(标记为stage0)假设第7行下面的RDD字符串打印了两个Tab,也就是和第7行错位了,那么第7行下面的RDD就被标记为新的阶段,以此类推 。
因此,通过RDD的toDebugString观察DAG的阶段划分时,tab是一个重要的指标 。此外,我们看到在第3行和第4行的开头有一个括号,括号中有一个数字,它标记了RDD的分区大小 。当然,还有更直观的方法来观察RDD、达格和舞台 。Spark的Web UI提供了更丰富的视觉信息 。但是Spark的Web UI面板众多,很容易让新生第一眼就无所适从 。也许后面时间允许的话我们会单独说一下Spark的web UI 。
附言本文是Spark分布式计算科普专栏第二篇文章 。作者的无知和粗心是必然的 。如果你有什么问题,或者觉得文章中的描述有遗漏或者不恰当,欢迎在评论区留言讨论 。要掌握一项技术,书本上的知识往往只占20%,30%靠讨论,50%靠实践 。更多的讨论可以激发更多的观点、视角和见解,只有这样,对一项技术的认知和理解才能更加深入和坚定 。
在这篇博文中,我们从DAG-Spark RDD算子的边缘开始,介绍连接RDD的两种算子:变换和动作,对懒惰计算有一个初步的了解 。然后以土豆工坊为例,介绍了DAGScheduler切割DAG和生成Stage的过程和步骤 。特别需要注意的是,DAGScheduler以Shuffle为边界划分Stage 。
最后,用上一篇文章的字数简单地演示了DAG和Stage之间的关系 。细心的读者可能已经发现,Spark是一个精致而复杂的分布式计算引擎,Spark中的很多概念在这篇博文中都有“预引”,在这篇文章中多次提到 。换句话说,这个博客引入了一些概念(如懒惰计算、Shuffle、TaskScheduler、TaskSet、Spark调度系统) 。这种叙述方式可能会给你带来困惑 。毕竟用一个不明确的概念去解释另一个新概念,总是不那么靠谱的 。
俗话说“杀人偿命,欠债还钱”,在后续的专栏中,我们将继续探讨Spark的核心概念和原理,慢慢偿还欠你的技术债,尽可能还原Spark分布式内存计算引擎的全貌 。毕竟Spark调度系统哪边神圣?DAGScheduler,连同TaskScheduler、SchedulerBackend、TaskSetManager等 。,玩权利游戏,且听下回分解 。
作者简介
Leo,Spark Summit China 2017讲师,World AI Conference 2020讲师,曾任职于IBM、联想研究院、新浪微博,在数据库、数据仓库、大数据的开发和调优方面有着丰富的经验,领导了基于海量数据的大规模机器学习框架的设计和实现 。目前我是康卡斯特飞轮机器学习团队的负责人,负责机器学习应用在计算广告业务的实践、落地和推广 。我热爱技术分享,热衷于从生活的角度解读技术 。我在IBM developerWorks和程序员杂志上发表过很多技术文章 。
延伸阅读:
简单的火花(1):内存计算的起源
【sx是什么意思 shuffle什么意思】我也转发这篇文章,相信我“收到信息”,就可以免费获得价值4999元的InfoQ迷你本 。点击文末“了解更多”,可移步InfoQ官网获取最新资讯~
推荐阅读
- 春羽叶子发黄怎么办 春羽叶子发黄的解决方法
- 学富五车的近义词是什么 学富五车有哪些近义词
- 百分之90怀疑宫外孕 宫外孕有什么症状前期
- 断夜奶的方法
- 世界上最大最深的海沟介绍
- 猴赛雷是什么意思
- 柿子和香蕉可否一起吃 柿子跟香蕉能一起吃么
- 山竹和番茄可以一起吃吗 山竹和番茄可否一起吃
- 土豆和山药可否一起吃 土豆跟山药能不能一起吃