Spark 性能调优

作者: Billmay 原文来源: https://tidb.net/blog/5957fd9e

该文档仅提供性能调优的参考

资源调优

Spark 可以通过参数配置资源分配。资源分配不合理会导致 job 运行过慢甚至失败。资源调优就是为当前 job 分配合适的资源,提高资源利用率最终加快任务运行速度。

Driver

配置项默认值参考值备注
spark.driver.memory1g不 OOM 下越小越好如果数据需收集到 driver ,那么需要根据数据大小配置内存,防止 OOM
spark.driver.memoryOverheadmax (driverMemory * 0.1, 384)/
spark.driver.cores1/

Executor

内存

  • 太大的内存会导致 JVM 垃圾回收变慢,尽量小于 64 g

  • executor 申请的总内存不能超过 node/container 的总内存,申请的内存大小为以下参数总和

    • spark.executor.memory
    • spark.executor.memoryOverhead
    • spark.memory.offHeap.size
    • spark.executor.pyspark.memory

CPU

  • 1 core 会导致无法利用 JVM 多线程,以及会使一些 broadcast 相关的参数失效
  • core 设置太多可能会使 job 速度运行变慢
配置项默认值建议值备注
spark.executor.memory1g4-8 g在不 OOM 情况下设小
spark.executor.memoryOverheadmax (executor * 0.1, 384)/用于虚拟机的开销、内部的字符串、本地开销等
spark.memory.offHeap.enabledfalse/当有需要堆外内存的操作时才配置,一般默认 false 即可
spark.memory.offHeap.size0/offHeap.enabled true 才生效
spark.executor.cores- Yarn :1
- standalone:all available cores4
Executor 数量总 core / executor.cores

Executor 内存分配

指 spark.executor.memory 的内存分配

Spark 的内存分为两大类:执行内存和存储内存。

  • 执行内存:在shuffle, join, aggregation 等计算中使用的内存。
  • 存储内存:集群中缓存和 broadcast 使用的内存。
spark.memory.fraction (默认 0.6)

用于执行内存和存储内存的百分比,剩余是为用户数据结构,Spark metadata 等预留的。在预留大小足够下提高此值,可以减少溢写磁盘。

存储内存和执行内存共享同一块空间,且有动态占用机制

  • 双方基础内存占比由 spark.memory.storageFraction 决定
  • 一方空闲时,另一方可以占用
  • 当执行内存不足,且存在被占用内存时:可要求存储内存归还占用部分。存储内存会将占用部分转存到磁盘
  • 当存储内存不足,且存在被占用内存时:不可要求执行内存归还
spark.memory.storageFraction(默认0.5):

不受驱逐的存储内存百分比,即这个占比的内存一定不会被驱逐到磁盘中

用于 task 的执行内存大小可以计算得出

spark.executor.memory * spark.memory.fraction *(1-spark.memory.storageFraction)/ spark.executor.cores

总结:

  • 发生磁盘溢写时:可尝试调大 spark.executor.memory 或提高 spark.memory.fraction
  • spark.memory.storageFraction 一般取默认值即可,不太推荐在溢写时调小该值

动态内存分配

Spark 提供了 Dynamic Executor Allocation ,它能够动态调整 executor 数量,以下场景可以考虑配置

  • 和其他团队共享集群
  • 在乎 cost
  • 某一个 application 有若干不同大小的 job

主要参数如下

spark.dynamicAllocation.enable false

spark.dynamicAllocation.executorIdleTimeout 60s   // 如果任务执行时间普遍短,可以调小 timeout

spark.dynamicAllocation.initialExecutors minExecutors // 对于大的 job,调大 initialExecutors

spark.dynamicAllocation.minExecutors 1  

spark.dynamicAllocation.maxExecutors infinity //共享的 spark 集群最好配置 maxExecutors

并行度

Spark 并行度 = min( 任务数 = 分区数,总核数 )

一个参考值:分区数 = 总核心数的 2-3 倍

分区初始数量

分区数会影响 Spark 集群的并行度,下面有两种方式来计算分区数量

  1. 内存资源紧缺时: Math.round(inputDataSize/availableTaskMemoryMB()).toInt 其中 inputDataSize 为每个 task 的数据大小,可以从 Spark UI 上查看;availableTaskMemoryMB 即为上文计算的 用于 task 的执行内存大小
  2. 内存资源足够时:分区数量先设置为集群可用总 cores *2,然后逐步往上调,寻找一个 最佳分区数 (core 的整数倍)

什么是最佳分区数呢?执行时间最短就是最佳,此外还可以根据 Spark UI 判断

  • 分区数量太多的表现:executor cpu 内存利用率过低,过多 pending 的 task
  • 分区数量太少的表现:executor 空闲

分区调整

  • 分区数量调整:使用 repartition() 可以调整分区数量,但会发生 shuffle,若减少分区,可以尝试使用 coalesce() 来避免 shuffle (一些特殊场景 repartition 更优,其增加的 shuffle 可能会减少其他地方的 shuffle,降低整体的时间)
  • 分区策略调整:若发生数据倾斜,可以通过调整合适的分区策略避免

Shuffle 调优

Shuffle 调优的目的是:避免 spill 到 disk 导致任务速度变慢

当在 Spark UI 观察到存在溢写时,一般有以下手段

  • 增加内存
  • 配置堆外内存
  • 增加分区以减少每个任务的数据量
  • 调整 shuffle 参数

相关配置如下

配置项默认值推荐备注
spark.executor.memory1g增加内存足够增加内存是最好的方式
spark.sql.shuffle.partitions200增加调大分区数可以减少每个分区的数据量防止 spill
- spark.memory.offHeap.enable
- spark.memory.offHeap.size关闭打开配置堆外内存减少 shuffle
spark.memory.fraction0.6增加增加存储内存和执行内存的总额
spark.shuffle.file.buffer32k64kshuffle write 时,会先写到 BufferedOutputStream 缓冲区中,然后再溢写到磁盘。增加此值可以减少 IO 次数,推荐 64k
spark.shuffle.service.index.cache.size100m减少缓存的 shuffle 索引文件中索引的数量,减少该值可以防止内存爆炸
spark.io.compression.lz4.blockSize32k增加增大此配置以减少 shuffle 文件的大小
spark.shuffle.service.enabledfalse/启用外部 shuffle 服务,这样 spark shuffle file 不会保存在 executor
spark.shuffle.io.backLog-1/启用 shuffle.service 时,控制 accept queue
spark.shuffle.registration.timeout5000/启用 shuffle.service 时,注册的超时时间,推荐增大

另外还可以优化代码(SQL or RDD API)防止 shuffle

  • Join 时广播小表(如使用 Broadcast Hash Join)
  • 尽量使用窄依赖而不是宽依赖
  • 使用 ReduceByKey 而不是 GroupbyKey
  • 在要进行宽依赖之前,或者进行完一系列复杂操作后,或进行完某些耗时操作后,persist RDD 进行缓存

Spark 调优工具

推荐使用一些 Spark 调优工具来帮助调优

  • Sparklens
  • Sparklint
  • Dr Elephant

参考文档


版权声明:本文为TiDBer原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。