#|Spark优化总结(二)——代码编写


文章目录

  • Spark优化总结(二)——代码编写
    • 1. 前言
    • 2. 选择合理的数据结构
    • 3. Java容器与Scala容器互转
    • 4. 关注经常执行的代码块
    • 5. Spark API
    • 6. 广播的问题
    • 7. 数据传输与解析
    • 8. 异常数据处理
    • 9. 数据同步锁问题
    • 10. 设计一个合宜的项目结构
【#|Spark优化总结(二)——代码编写】
Spark优化总结(二)——代码编写 1. 前言
  • 编写一个性能较高的Spark应,需要有良好的代码编写功底。一块不好的代码,通常会导致BUG、效率缓慢等问题,而经常需要执行的代码块部分尤为如此。合理的代码设计能够较高可能的避免BUG、有效的提升运行效率,同时给人以良好的可读性。
  • 另外,这部分在大多数程序中都是通用的。
2. 选择合理的数据结构
  • 通常来说,选择错误的数据结构会导致额外的内存开销、性能等问题。我们应当根据情况选择合适的数据结构。
  • 示例:
    • 尽量使用嵌套较少的数据结构
      • 例如,一个PairRDD,尽量应该由RDD[(String, ((String, Long), (String, String, String)))] 改为 RDD[(String, (String, Long, String, String, String))]
    • 尽量选择内存占用较少、效率较高的类型
      • 考虑用Int替代String,例如用id替代人名(如果人名字符串较长的话)
      • 考虑引入koloboke的Set、Map,替代现有的Set、Map
      • 考虑引入FastUtil中的类型替代现有类型
    • ListBuffer(链表)与ArrayBuffer(数组)的抉择
      • 经常需要根据角标做查询时,不要用链表,用数组
      • 不能确定容器的大小,需要节省空间的话,用链表
      • 只需要对容器进行全遍历或往后追加的话,用链表
    • 判断一个对象是否存在容器中时,使用Hash类型容器,例如HashSet
    • 需要按范围快速查询时,使用Tree类型容器,例如TreeSet
3. Java容器与Scala容器互转
  • 开发Spark时很可能会使用Java的代码,使用的容器需要在不同代码间做转换。
  • 我们应当尽量使用iterator方式遍历容器,少使用asJava、asScala转换(会新生成容器)。特别是在被反复调用的代码块中,效率影响极大。
4. 关注经常执行的代码块
  • 经常执行的代码块是占用Spark应用运行时间的很大一部分,优化好这一部分代码,能够显著提升运行效率。
  • 示例:
    • RDD算子
      • 例如一个map操作内的代码块,显然前面有多少条数据,就需要被执行多少次。如果在里面进行较耗时的操作,会非常影响性能。例如:new一个容器、做一次远程访问请求、为每个map操作创建JDBC连接等
    • while、for循环
      • 同样,while、for循环内也是经常执行的代码块
    • 公共代码块
      • 公共代码块,例如工具类,是经常需要被各处调用的代码,其性能也需要多加关注
5. Spark API
  • 选择合适的SparkAPI来处理数据,才会得到良好的效果。我罗列了一些示例如下:
  • 示例:
    • 如果存储数据想生成较少数量的文件,可以使用repartition与coalesce。需要注意的是存储之前对数据处理的Stage比较花时间的话,建议使用repartition,因为coalesce虽然不会发生shuffle,但是会降低最后一个Stage的并行度
    • 使用reduceByKey、aggregateByKey、combineByKey替代groupBy
    • join时想办法利用广播进行map-side join
    • 读取表时,添加schema,可以加速表的读取速度
    • 处理数据之前,提前做好过滤,效率更高
    • 针对于SparkAPI(例如mapPartition、mapValues)传给我们的iterator对象,尽量接着调用iterator对象的方法进行处理。如果遍历iterator对象,将结果放入另一个实例化的容器(例如ListBuffer),将影响性能,在每个iterator对象包含的数据量较大时,可能会假死(速度极慢)。因为调用iterator对象的方法时,数据是一条一条取出、处理的,不会额外占用很大的内存空间。
    • 能写在一个算子内的逻辑,尽量写一起,不要写map.map类似操作。从Spark处理逻辑上来说,这样写没问题,但是写多个map很可能会由于失误导致每个算子中多定义一部分数据(或多处理)。例如,你可以用flatMap替代filter.map操作,用map替代mapValue.map操作。
      • 问题示例 (原始数据示例 “38, laowang, beijing”)
      • 分多次的写法(filter+map),可能会由于失误重复解析了2次line
      lineRDD .filter(line => "xiaoming".equals(line.split(",")(1).trim)) .map { line => val fields = line.split(",") val name = fields(1).trim val age = fields(0).trim.toInt val address = fields(2).trim(name, age, address) }

      • 一次flatMap的示例,不容易出错
      lineRDD .flatMap { line => val fields = line.split(",") // 为了快,你可以想法先只解析第一个name val name = fields(1).trim if (!"xiaoming".equals(name)) { val age = fields(0).trim.toInt // toInt有解析错误的可能,你也可以返回None val address = fields(2).trimSome(name, age, address) } else { None } }

    • 流式处理时,需要按业务时间设定窗口时,应使用StructuredStreaming
    • 各个Excutor的多个core处理数据时,如果RDD的处理中存在println,会极度影响效率,因为打印自带同步锁
    • 在算子内的System.out.println打印的数据是在各个节点执行的,需要查看节点日志
    • 根据情况,少使用collect,使用take。collect会获取所有数据,可能导致Driver内存崩溃
    • 不要重复创建相同的RDD,尽量复用RDD
    • 需要多次重复使用的RDD、DataFrame应该做缓存(选择合理的持久化级别),用完缓存后要记得清除缓存(在合理的位置调用清除)
    • 较长的RDD链(尤其是包含宽依赖的),可以适当做checkpoint,防止意外故障导致重复计算
    • 在repartition后马上需要排序的话,可以使用repartitionAndSortWithinPartitions(同HIVE的cluster by)代替repartition+sort
    • 有的操作不需要为每条数据执行一次(例如JDBC连接),应该使用mapPartitions、foreachPartirions。或者考虑单例设计模式,为每个JVM创建一个。
    • 其他自己慢慢想吧 =W= 我后续也会添加…
    • 另外,要看数据倾斜怎么处理的示例,请看Spark代码可读性与性能优化——示例五、六、七、八
6. 广播的问题
  • 需要将数据传入RDD算子中时,应当使用广播
  • 广播用完后尽量删除,调用 dataBroadcast.unpersist()
  • 广播的使用还会会有一些问题点,具体请看Spark代码可读性与性能优化——示例五(HashJoin)中的常见的误操作、陷阱
7. 数据传输与解析
  • kryo序列化、cvs、json等
  • 关键点:不解析不需要的部分
  • 具体示例:Spark代码可读性与性能优化——示例九(数据传输与解析)
8. 异常数据处理
  • 在开发中,获取的数据源存在脏数据是很正常的。
  • 一般情况下,遇到脏数据,解析错误,Spark应用报错停止,接着你又找到异常的数据样例,再修改代码逻辑。或许你反复了处理了几十次,但仍然还会偶尔出现脏数据导致的异常(因为你无法预测数据源会存在什么样的脏数据)。这个时候,建议您这样做:
  • 异常数据处理示例
    val data = https://www.it610.com/article/List("小明,18,北京,男", "小李,34,四川,女", "小王,!@#,重庆,男" ) spark.sparkContext.parallelize(data) // 利用flatMap的特性来处理脏数据 .flatMap { line => val fields = line.split(',') val name = fields(0) val ageStr = fields(1) val address = fields(2) val gender = fields(3)try { // age中可能存在无法解析的脏数据 val age = Integer.parseInt(ageStr)Some((name, age, address, gender)) } catch { // 如果解析异常,直接不要该数据 case_: Throwable => None } }

9. 数据同步锁问题
  • 大量数据的处理,如果需要访问某个点(对象),那么就存在同步锁的问题。大量的数据走同步锁是非常影响性能的,我们需要尽可能的降低锁的使用量。
  • 下面列出几条建议:
    • 多个线程读,多个线程写 -> 用Synchronized、ReentraintLock等,锁住读/写方法,或用并发类型的容器
    • 多个线程读,少量线程写 -> 用读写锁 ReentrantReadWriteLock
    • 多个线程读,单个线程写 -> 用volatile关键字修饰对象,进行广播
      • 例如,先用volatile修饰原始对象A。当写的线程需要更新对象时,先new一个对象B,复制原对象A的属性给对象B,更新对象B,然后将对象A的变量名引用指向对象B,即可。
10. 设计一个合宜的项目结构
  • 良好的项目结构能够减少犯错的几率、提高项目的易读性
  • 具体示例请看:Spark代码可读性与性能优化——示例十(项目结构)

    推荐阅读