Spark 调优 下
本篇文章是在结合自己业务和这篇文章的基础上写的。
话接上文, 了解了 Spark 运行时机制后,需要通过下面几种方法来最大化利用集群资源:
- 正确的资源配置;
- 正确的并行度配置;
- 数据本身属性配置。
资源分配调优
本文主要描述 spark-on-yarn
。
在运行过程中,Spark 主要考虑两个资源:CPU 和 内存。
CPU 和 内存
每个 Executor 都有固定数量的 CPU 核 --executor-cores
和 固定的内存堆大小 --executor-memory
.
cores
参数决定了每个 executor 能并行执行的最大任务数。memory
参数会影响两个功能:- Spark 能 cache 的数据量;
- 在
grouping
,aggregations
和join
阶段能 shuffle 的数据量;
Executor
--num-executors
控制了 Executor 的数量。
从 Spark 1.3 开始,可以通过设置 spark.dynamicAllocation.enabled
属性为 true
来动态申请 Executor:
- 当有一定数量的处于 pending 状态的 task ,并且集群中有可用的资源时,会分配新的 Executor 给应用。
Yarn 资源
Spark 的资源分配还受到 Yarn 配置的影响:
对于 cores
配置,Yarn 会分配相应个数的 vcores
。
对于 内存 配置而言,会复杂一点:

除了上述考虑外,还有几点值得注意的:
- driver 的内存和 CPU 可以通过
--driver-memory
和--driver-cores
参数来调节。 - executor 的内存如果过大可能会导致 GC 时间超长。单个 executor 的内存最好不要超过 64 G,即
--executor-memory < 64G
。 - HDFS 对于大量并发的处理有困难,一般建议每个 executor 的核数不要超过 5 个,即
--executor-cores <= 5
; - 如果 executor 的配置太小了(比如只有 1 个核和 1 G 的内存)会丢失单个 JVM 内运行多个任务的优势:比如,广播变量 会被复制传送给每个 executor,如果一个 executor 不能并发执行任务,会导致复制太多 广播变量,或者等待的任务数过多。
并发度调优
Spark 本质上是一个并行数据处理引擎。和所有并行处理器一样,并发度对于系统的性能至关重要。
每个 Spark Stage 都有多个 task,每个 task 会依次处理数据。
在 Spark 调优的过程中,任务数可能是最重要的一个调优参数了。
每个阶段(Stage)中的任务数量 == 上一个 RDD 的分区数。
一个 RDD 的分区数量 == 它依赖的上一个 RDD 的分区数量。这个结论有以下几个例外:
coalesce(numPartitions)
会创建一个比父 RDD 分区更少的 RDD;union
会创建一个父 RDD 分区之和的分区数量;cartesian
会创建一个 笛卡尔乘积 个数分区的 RDD。
对于没有父 RDD 的 RDD(比如由 textFile
产生的 RDD),分区数量由底层 MapReduce InputFormat 决定。通常来说,一个 HDFS Block 会对应一个 分区。如果没有特殊参数,Spark 使用 spark.default.parallelism
来决定分区数量。
调用 rdd.partitions.size 可以查看一个 rdd 的分区数量。
需要考虑的点
task 数量太小
- 如果 task 的数量小于 所有 slots 数量,会造成 CPU 浪费;
除此之外,task 太少还会给每个 task 中的 聚合操作 带来很大的内存压力:
join
,cogroup
和*ByKey
操作都涉及到将数据存在 哈希表 或者内存中用于分组和排序:join
,cogroup
和groupByKey
都在一个 shuffle 的取数据一端将使用数据;reduceByKey
和aggregateByKey
会在 shuffle 的两端都使用这些数据。
当这些聚合操作的数据量比可用内存大很多时,就有很多混乱的现象可能发生:
- 在 HashMap 或者 Mem Buffer 中保存这么多数据量会给 GC 带来很大压力。甚至可能直接停止这个 JVM 进程。
- 当数据量太大时,Spark 可能会将数据保存在磁盘上,这样的操作在大的 shuffle 阶段会造成严重的性能瓶颈;
解决
- 当这个 Stage 是直接从 Hadoop 读取数据时,可以:
- 调用
repartition
操作(会造成 shuffle); - 配置
InputFormat
以创建更多 splits; - 在写入数据到 HDFS 的时候使用一个更小的 blockSize。
- 调用
如果这个 Stage 是从上一个 Stage 拿到数据的,可以:
- 在上一个 Stage 触发 shuffle 时的 transformation 可以接收一个
numPartitions
参数:1
val rdd2 = rdd2.reduceByKey(_ + _, numPartitions = X)
- 在上一个 Stage 触发 shuffle 时的 transformation 可以接收一个
X
应该是多少?- 最方便的解决办法:试
假设父 RDD 的分区数量(通过rdd.partitions.size
查看)是Y
:`让 X = Y * 1.5`,不停重试,直到性能不再增加。
- 另外还有一个通用方法,但在日常工作应用并不实际: 主要目标是运行足够的任务,以便每个任务不会 OOM
- 每个任务可用的内存大小为:
spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction/ spark.executor.cores
- 每个任务可用的内存大小为:
- 最方便的解决办法:试
- 当这个 Stage 是直接从 Hadoop 读取数据时,可以:
总的来说,在 Spark 应用中,提高任务的数量总是更好的选择,如果出现问题可以再调整。
这和 MapReduce
任务的调优不同,在 MapReduce
中,要求任务数的设置更保守一些。
这是因为两者在 MapReduce
中任务的启动开销更大得多。
防止臃肿的数据结构
在 Spark 中数据以 Records
的形式流转。每条数据记录都有两种呈现的形式:
- 一个反序列化的
Java 对象
- 一个
序列化二进制对象
通常来说,Spark 在内存中使用 反序列化对象
,在硬盘和网络传输中使用 序列化对象
。
spark.serializer
用于在两种呈现方式之间转换,通常推荐使用 org.apache.spark.serializer.KryoSerializer
。
如果反序列化对象太过臃肿,会导致:
- Spark 将数据 dump 到硬盘的次数增加;
- 可以
cache
的数据数量减少;
官方文档有专门的章节描述该如何调优:
在考虑内存使用调优时,有三个考虑因素:
- 你所有对象的内存使用总量
- 获取对象的代价
- GC 的开销
数据格式
如果可能,尽量使用 可扩展的二进制格式: Avro
, Parquet
, Thrift
, Protobuf
作为存储文件的格式。
一个奇怪但是合理的建议:存储时,使用 Hadoop SequenceFile
,只是每条数据的格式是上述之一。
不选择直接使用上述格式的文件的原因是:单个文件的 Size 太小了,如果直接存储对 Namenode 的压力过大。
(完)