笛里谁知壮士心,沙头空照征人骨。这篇文章主要讲述YYDS|不得不看的Spark内存管理机制相关的知识,希望能为你提供帮助。
今天和大家介绍Spark的内存模型,干货多多,不要错过奥~
与数据频繁落盘的??Mapreduce?
?引擎不同,Spark是基于??内存?
?的分布式计算引擎,其内置强大的内存管理机制,保证数据??优先内存?
?处理,并支持数据磁盘存储。
本文将重点探讨Spark的内存管理是如何实现的,内容如下:
- Spark内存概述
- Spark 内存管理机制
- Spark on Yarn模式的内存分配
- 用户在?
?Driver?
?端提交任务,初始化运行环境(SparkContext等) - Driver根据配置向?
?ResoureManager?
?申请资源(executors及内存资源) - ResoureManager资源管理器选择合适的?
?worker?
?节点创建executor进程 - ?
?Executor?
?向Driver注册,并等待其分配??task?
?任务 - Driver端完成?
?SparkContext?
?初始化,创建DAG,分配taskset到Executor上执行。 - Executor启动线程执行task任务,返回结果。
Spark在任务运行过程中,会启动?
?Driver?
?和??Executor?
?两个进程。其中Driver进程除了作为Spark提交任务的执行节点外,还负责申请Executor资源、注册Executor和提交Task等,完成整个任务的协调调度工作。而Executor进程负责在工作节点上执行具体的??task?
?任务,并与Driver保持通信,返回结果。由上可见,Spark的数据计算主要在?
?Executor?
?进程内完成,而Executor对于RDD的??持久化?
?存储以及??Shuffle?
?运行过程,均在Spark内存管理机制下统一进行,其内运行的task任务也??共享?
?Executor内存,因此本文主要围绕Executor的内存管理进行展开描述。Spark内存分为?
?堆内内存?
?(On-heap Memory)和??堆外内存?
?(Off-heap Memory)。其中堆内内存基于??JVM内存?
?模型,而堆外内存则通过调用底层??JDK Unsafe API?
?。两种内存类型统一由Spark内存管理模块接口实现。def acquireStorageMemory(...): Boolean //申请存储内存
def acquireExecutionMemory(...): Long //申请执行内存
def releaseStorageMemory(...): Unit //释放执行内存
def releaseStorageMemory(...): Unit //释放存储内存
1.1 Spark的堆内内存
Executo作为一个?
?JVM?
?进程,其内部基于JVM的内存管理模型。Spark在其之上封装了统一的内存管理接口?
?MemoryManager?
?,通过对JVM堆空间进行合理的规划(逻辑上),完成对象实例内存空间的??申请?
?和??释放?
?。保证满足Spark运行机制的前提下,最大化利用内存空间。Spark中堆内内存参数有:?
1. 这里涉及到的??JVM堆?
??空间概念,简单描述就是在程序中,关于对象实例|数组的??创建?
??、??使用?
??和??释放?
?的内存,都会在JVM中的一块被称作为"JVM堆"内存区域内进行管理分配。
2. Spark程序在创建对象后,JVM会在堆内内存中??分配?
??一定大小的空间,创建??Class对象?
?并返回对象引用,Spark保存对象引用,同时记录占用的内存信息。
?-executor-memory?
???-executor-memory?
??或??-spark-executor-memory?
??。通常是任务提交时在参数中进行定义,且与??-executor-cores?
?等相关配置一起被提交至ResourceManager中进行Executor的资源申请。在Worker节点创建一定数目的Executor,每个Executor被分配?Spark堆内内存主要分为??-executor-memory?
?大小的堆内内存。Executor的堆内内存被所有的Task线程任务共享,多线程在内存中进行数据交换。
?Storage?
??(存储内存)、??Execution?
??(执行内存)和??Other?
?(其他) 几部分。- Storage用于缓存RDD数据和broadcast广播变量的内存使用
- Execution仅提供shuffle过程的内存使用
- Other提供Spark内部对象、用户自定义对象的内存空间
1.2 Spark的堆外内存
Spark?
?1.6?
?在堆内内存的基础上引入了堆外内存,进一步优化了Spark内存的使用率。其实如果你有过java相关编程经历的话,相信对堆外内存的使用并不陌生。其底层调用?Spark在2.x之后,摒弃了之前版本的??基于C?
?的JDK Unsafe类方法,通过??指针?
?直接进行内存的操作,包括内存空间的申请、使用、删除释放等。
?Tachyon?
?,采用Java中常见的基于??JDK Unsafe API?
?来对堆外内存进行管理。此模式不在JVM中申请内存,而是直接操作系统内存,减少了JVM中内存??空间切换?
?的开销,降低了??GC回收?
?占用的消耗,实现对内存的精确管控。堆外内存默认情况下是不开启的,需要在配置中将?
?spark.memory.offHeap.enabled?
?设为True,同时配置??spark.memory.offHeap.size?
?参数设置堆大小。对于堆外内存的划分,仅包含Execution(执行内存)和Storage(存储内存)两块区域,且被所有task线程任务共享。
2Spark内存管理机制前文说到,不同模式下的Spark堆内、堆外内存区域划分占比是不同的。
在Spark1.6之前,Spark采用的是?
?静态管理?
?(Static Memory Manager)模式,Execution内存和Storage内存的分配占比全部是??静态?
?的,其值为系统预先设置的默认参数。在Spark1.6后,为了考虑内存管理的动态灵活性,Spark的内存管理改为?
?统一管理?
?(Unified Memory Manager)模式,支持Storage和Execution内存??动态占用?
?。至于静态管理方式任然被保留,可通过??spark.memory.useLegacyMode?
?参数启用。2.1静态内存管理(Static Memory Manager)
Spark最原始的内存管理模式,默认通过系统固定的内存配置参数,分配相应的Storage、Execution等内存空间,支持用户自定义修改配置。
1. 堆内内存分配
堆内内存空间整体被分为?
?Storage?
?(存储内存)、??Execution?
?(执行内存)、??Other?
?(其他内存)三部分,默认按照??6:2:2?
?的比率划分。其中Storage内存区域参数: ??spark.storage.memoryFraction?
?(默认为0.6),Execution内存区域参数: ??spark.shuffle.memoryFraction?
?(默认为0.2)。Other内存区域主要用来存储用户定义的数据结构、Spark内部元数据,占系统内存的20%。在Storage内存区域中,10%的大小被用作?
?Reserved?
?预留空间,防止内存溢出情况,由参数: ??spark.shuffle.safetyFraction?
?(默认0.1)控制。90%的空间当作可用的Storage内存,这里是Executor进行RDD数据缓存和broadcast数据的内存区域,参数和Reserved一致。还有一部分??Unroll?
?区域,这一块主要存储Unroll过程的数据,占用20%的可用Storage空间。Execution内存区域中,20%的大小被用作Reserved预留空间,防止OOM和其他内存不够的情况,由参数: ?
Unroll过程:
RDD在缓存到内存之前,partition
中record对象实例在堆内other内存区域中的不连续
空间中存储。RDD的缓存过程中, 不连续存储空间内的partition被转换为连续存储空间
的Block对象,并在Storage内存区域存储,此过程被称作为Unroll(展开)。
?spark.shuffle.safetyFraction?
??(默认0.2)控制。80%的空间当作可用的Execution内存,缓存shuffle过程的中间数据,参数:??spark.shuffle.safetyFraction?
?(默认0.8)。计算公式
可用的存储内存 =
systemMaxMemory
* spark.storage.memoryFraction
* spark.storage.safetyFraction
可用的执行内存 =
systemMaxMemory
* spark.shuffle.memoryFraction
* spark.shuffle.safetyFraction
2. 堆外内存
相较于堆内内存,堆外内存的分配较为简单。堆外内存默认为?
?384M?
?,由系统参数??spark.yarn.executor.memoryOverhead?
?设定。整体内存分为Storage和Execution两部分,此部分分配和堆内内存一致,由参数: ??spark.memory.storageFaction?
?决定。堆外内存一般存储序列化后的二进制数据(字节流),在存储空间中是一段连续的内存区域,其大小可精确计算,故此时无需设置预留空间。3. 总结
- 实现机制简单,易理解
- 容易出现内存失衡的问题,即Storage、Execution一方内存过剩,一方内容不足
- 需要开发人员充分了解存储机制,调优不便
?youlong525?
?2.2统一内存管理(Unified Memory Manager)
为了解决(Static Memory Manager)静态内存管理的?
?内存失衡?
?等问题,Spark在1.6之后使用了一种新的内存管理模式—Unified Memory Manager(统一内存管理)。在新模式下,移除了旧模式下的Executor内存静态占比分配,启用了??内存动态占比机制?
?,并将Storage和Execution划分为统一共享内存区域。1. 堆内内存
堆内内存整体划分为?
?Usable Memory?
?(可用内存)和??Reversed Memory?
?(预留内存)两大部分。其中预留内存作为OOM等异常情况的内存使用区域,默认被分配300M的空间。可用内存可进一步分为(Unified Memory)统一内存和Other内存其他两部分,默认占比为6:4。统一内存中的Storage(存储内存)和Execution(执行内存)以及Other内存,其参数及使用范围均与静态内存模式一致,不再重复赘述。只是此时的Storage、Execution之间启用了?
?动态内存占用?
?机制。2. 堆外内存
动态内存占用机制
- 设置内存的初始值,即Execution和Storage均需设定各自的内存区域范围(默认参数0.5)
- 若存在一方内存不足,另一方内存空余时,可占用对方内存空间
- 双方内存均不足时,需落盘处理
- Execution内存被占用时,Storage需将此部分转存硬盘并归还空间
- Storage内存被占用时,Execution无需归还
和静态管理模式分配一致,堆外内存默认值为384M。整体分为Storage和Execution两部分,且启用?
?动态内存占用?
?机制,其中默认的初始化占比值均为0.5。计算公式
可用的存储& 执行内存 =
(systemMaxMemory -ReservedMemory)
* spark.memoryFraction
* spark.storage.storageFraction
(启用内存动态分配机制,己方内存不足时可占用对方)
3. 总结
- 动态内存占比,提升内存的合理利用率
- 统一管理Storage和Execution内存,便于调优和维护
- 由于Execution占用Storage内存可不规划,存在Storage内存不够频繁GC的情况
3Spark On Yarn模式的内存分配由于Spark内存管理机制的健全,Executor能够高效的处理节点中RDD的内存运算和数据流转。而作为分配Executor内存的资源管理器Yarn,如何在过程中保证内存的最合理化分配,也是一个值得关注的问题。
首先看下Spark On Yarn的基本流程:
- Spark ??
?Driver?
??端提交程序,并向Yarn申请Application - Yarn接受请求响应,在NodeManager节点上创建AppMaster
- ?
?AppMaster?
?向Yarn ResourceManager申请资源(Container) - 选择合适的节点创建Container(Executor进程)
- 后续的Driver启动调度,运行任务
var executorMemory = 1024
val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN = 384
// Executo堆外内存
val executorMemoryOverhead =
sparkConf.getInt("spark.yarn.executor
.memoryOverhead",
math.max((MEMORY_OVERHEAD_FACTOR
* executorMemory).toInt
, MEMORY_OVERHEAD_MIN))
// Executor总分配内存
val executorMem= args.executorMemory
+ executorMemoryOverhead
因此假设当我们提交一个spark程序时,如果设置?
?-executor-memory?
?=5g。spark-submit
--master yarn-cluster
--name test
--executor-memory 5g
--driver-memory 5g
根据源码中的计算公式可得:
memoryMem=args.executorMemory(5120) + executorMemoryOverhead(512) = 5632M然而事实上查看?
?Yarn UI?
?上的内存却不是这个数值?这是因为Yarn默认开启了??资源规整化?
?。1. Yarn的资源规整化
Yarn会根据最小可申请资源数、最大可申请资源数和规整化因子综合判断当前申请的资源数,从而合理规整化应用程序资源。
- 定义
【YYDS|不得不看的Spark内存管理机制】程序申请的资源如果不是该因子的整数倍,则将被修改为最小的整数倍对应的值
公式: ceil(a/b)*推荐阅读
- 谷粒商城学习日记(20)——Vue语法入门
- Spring 专场「IOC 容器」不看源码就带你认识核心流程以及运作原理
- docker 命令及问题
- 2万字聊聊什么是秒杀系统(上)
- 树莓派4B基于docker搭建devops平台
- K8S(十三)探针
- 跟着动画学习 GO 数据结构之 Go 链表
- 如何彻底删除EKS中一直卡在Terminating的Namespace
- 无网络环境安装mysql8.0