文章目录
- Spark优化总结(二)——代码编写
-
- 1. 前言
- 2. 选择合理的数据结构
- 3. Java容器与Scala容器互转
- 4. 关注经常执行的代码块
- 5. Spark API
- 6. 广播的问题
- 7. 数据传输与解析
- 8. 异常数据处理
- 9. 数据同步锁问题
- 10. 设计一个合宜的项目结构
Spark优化总结(二)——代码编写 1. 前言
- 编写一个性能较高的Spark应,需要有良好的代码编写功底。一块不好的代码,通常会导致BUG、效率缓慢等问题,而经常需要执行的代码块部分尤为如此。合理的代码设计能够较高可能的避免BUG、有效的提升运行效率,同时给人以良好的可读性。
- 另外,这部分在大多数程序中都是通用的。
- 通常来说,选择错误的数据结构会导致额外的内存开销、性能等问题。我们应当根据情况选择合适的数据结构。
- 示例:
- 尽量使用嵌套较少的数据结构
- 例如,一个PairRDD,尽量应该由RDD[(String, ((String, Long), (String, String, String)))] 改为 RDD[(String, (String, Long, String, String, String))]
- 尽量选择内存占用较少、效率较高的类型
- 考虑用Int替代String,例如用id替代人名(如果人名字符串较长的话)
- 考虑引入koloboke的Set、Map,替代现有的Set、Map
- 考虑引入FastUtil中的类型替代现有类型
- ListBuffer(链表)与ArrayBuffer(数组)的抉择
- 经常需要根据角标做查询时,不要用链表,用数组
- 不能确定容器的大小,需要节省空间的话,用链表
- 只需要对容器进行全遍历或往后追加的话,用链表
- 判断一个对象是否存在容器中时,使用Hash类型容器,例如HashSet
- 需要按范围快速查询时,使用Tree类型容器,例如TreeSet
- 尽量使用嵌套较少的数据结构
- 开发Spark时很可能会使用Java的代码,使用的容器需要在不同代码间做转换。
- 我们应当尽量使用iterator方式遍历容器,少使用asJava、asScala转换(会新生成容器)。特别是在被反复调用的代码块中,效率影响极大。
- 经常执行的代码块是占用Spark应用运行时间的很大一部分,优化好这一部分代码,能够显著提升运行效率。
- 示例:
- RDD算子
- 例如一个map操作内的代码块,显然前面有多少条数据,就需要被执行多少次。如果在里面进行较耗时的操作,会非常影响性能。例如:new一个容器、做一次远程访问请求、为每个map操作创建JDBC连接等
- while、for循环
- 同样,while、for循环内也是经常执行的代码块
- 公共代码块
- 公共代码块,例如工具类,是经常需要被各处调用的代码,其性能也需要多加关注
- RDD算子
- 选择合适的SparkAPI来处理数据,才会得到良好的效果。我罗列了一些示例如下:
- 示例:
- 如果存储数据想生成较少数量的文件,可以使用repartition与coalesce。需要注意的是存储之前对数据处理的Stage比较花时间的话,建议使用repartition,因为coalesce虽然不会发生shuffle,但是会降低最后一个Stage的并行度
- 使用reduceByKey、aggregateByKey、combineByKey替代groupBy
- join时想办法利用广播进行map-side join
- 读取表时,添加schema,可以加速表的读取速度
- 处理数据之前,提前做好过滤,效率更高
- 针对于SparkAPI(例如mapPartition、mapValues)传给我们的iterator对象,尽量接着调用iterator对象的方法进行处理。如果遍历iterator对象,将结果放入另一个实例化的容器(例如ListBuffer),将影响性能,在每个iterator对象包含的数据量较大时,可能会假死(速度极慢)。因为调用iterator对象的方法时,数据是一条一条取出、处理的,不会额外占用很大的内存空间。
- 能写在一个算子内的逻辑,尽量写一起,不要写map.map类似操作。从Spark处理逻辑上来说,这样写没问题,但是写多个map很可能会由于失误导致每个算子中多定义一部分数据(或多处理)。例如,你可以用flatMap替代filter.map操作,用map替代mapValue.map操作。
- 问题示例 (原始数据示例 “38, laowang, beijing”)
- 分多次的写法(filter+map),可能会由于失误重复解析了2次line
lineRDD .filter(line => "xiaoming".equals(line.split(",")(1).trim)) .map { line => val fields = line.split(",") val name = fields(1).trim val age = fields(0).trim.toInt val address = fields(2).trim(name, age, address) }
- 一次flatMap的示例,不容易出错
lineRDD .flatMap { line => val fields = line.split(",") // 为了快,你可以想法先只解析第一个name val name = fields(1).trim if (!"xiaoming".equals(name)) { val age = fields(0).trim.toInt // toInt有解析错误的可能,你也可以返回None val address = fields(2).trimSome(name, age, address) } else { None } }
- 流式处理时,需要按业务时间设定窗口时,应使用StructuredStreaming
- 各个Excutor的多个core处理数据时,如果RDD的处理中存在println,会极度影响效率,因为打印自带同步锁
- 在算子内的System.out.println打印的数据是在各个节点执行的,需要查看节点日志
- 根据情况,少使用collect,使用take。collect会获取所有数据,可能导致Driver内存崩溃
- 不要重复创建相同的RDD,尽量复用RDD
- 需要多次重复使用的RDD、DataFrame应该做缓存(选择合理的持久化级别),用完缓存后要记得清除缓存(在合理的位置调用清除)
- 较长的RDD链(尤其是包含宽依赖的),可以适当做checkpoint,防止意外故障导致重复计算
- 在repartition后马上需要排序的话,可以使用repartitionAndSortWithinPartitions(同HIVE的cluster by)代替repartition+sort
- 有的操作不需要为每条数据执行一次(例如JDBC连接),应该使用mapPartitions、foreachPartirions。或者考虑单例设计模式,为每个JVM创建一个。
- 其他自己慢慢想吧 =W= 我后续也会添加…
- 另外,要看数据倾斜怎么处理的示例,请看Spark代码可读性与性能优化——示例五、六、七、八
- 需要将数据传入RDD算子中时,应当使用广播
- 广播用完后尽量删除,调用 dataBroadcast.unpersist()
- 广播的使用还会会有一些问题点,具体请看Spark代码可读性与性能优化——示例五(HashJoin)中的常见的误操作、陷阱
- kryo序列化、cvs、json等
- 关键点:不解析不需要的部分
- 具体示例:Spark代码可读性与性能优化——示例九(数据传输与解析)
- 在开发中,获取的数据源存在脏数据是很正常的。
- 一般情况下,遇到脏数据,解析错误,Spark应用报错停止,接着你又找到异常的数据样例,再修改代码逻辑。或许你反复了处理了几十次,但仍然还会偶尔出现脏数据导致的异常(因为你无法预测数据源会存在什么样的脏数据)。这个时候,建议您这样做:
- 异常数据处理示例
val data = https://www.it610.com/article/List("小明,18,北京,男", "小李,34,四川,女", "小王,!@#,重庆,男" ) spark.sparkContext.parallelize(data) // 利用flatMap的特性来处理脏数据 .flatMap { line => val fields = line.split(',') val name = fields(0) val ageStr = fields(1) val address = fields(2) val gender = fields(3)try { // age中可能存在无法解析的脏数据 val age = Integer.parseInt(ageStr)Some((name, age, address, gender)) } catch { // 如果解析异常,直接不要该数据 case_: Throwable => None } }
- 大量数据的处理,如果需要访问某个点(对象),那么就存在同步锁的问题。大量的数据走同步锁是非常影响性能的,我们需要尽可能的降低锁的使用量。
- 下面列出几条建议:
- 多个线程读,多个线程写 -> 用Synchronized、ReentraintLock等,锁住读/写方法,或用并发类型的容器
- 多个线程读,少量线程写 -> 用读写锁 ReentrantReadWriteLock
- 多个线程读,单个线程写 -> 用volatile关键字修饰对象,进行广播
- 例如,先用volatile修饰原始对象A。当写的线程需要更新对象时,先new一个对象B,复制原对象A的属性给对象B,更新对象B,然后将对象A的变量名引用指向对象B,即可。
- 良好的项目结构能够减少犯错的几率、提高项目的易读性
- 具体示例请看:Spark代码可读性与性能优化——示例十(项目结构)
推荐阅读
- 高性能|高性能软件系统设计中应该考虑的问题
- #|【opencv】关于透视变换
- 《“深入浅出”数据结构》|链表OJ经典题浅刷< 1 >(看完不再害怕链表题)
- 数据结构|基本排序算法总结(Java实现)
- 数据结构|winRAR真难用,我决定自创一个(炼虚期) 文件的压缩与解压 将色色一网打尽
- 备战蓝桥|【备战蓝桥,冲击省一】高精度算法实现加减乘除
- 备战蓝桥|备战蓝桥,冲击省一 进制转换 你不会还不会吧()
- 备战蓝桥|备战蓝桥,冲击省一 二分查找法 看完你就会了
- 备战蓝桥|【2021年蓝桥省赛真题】赛前最后冲刺,省一我来啦