本篇文章是在结合自己业务和这篇文章的基础上写的。

话接上文, 了解了 Spark 运行时机制后,需要通过下面几种方法来最大化利用集群资源:

  • 正确的资源配置;
  • 正确的并行度配置;
  • 数据本身属性配置。

资源分配调优

本文主要描述 spark-on-yarn

在运行过程中,Spark 主要考虑两个资源:CPU 和 内存

CPU 和 内存

每个 Executor 都有固定数量的 CPU 核 --executor-cores 和 固定的内存堆大小 --executor-memory.

  • cores 参数决定了每个 executor 能并行执行的最大任务数。
  • memory 参数会影响两个功能:
    • Spark 能 cache 的数据量;
    • grouping, aggregationsjoin 阶段能 shuffle 的数据量;

Executor

--num-executors 控制了 Executor 的数量。

从 Spark 1.3 开始,可以通过设置 spark.dynamicAllocation.enabled 属性为 true 来动态申请 Executor:

  • 当有一定数量的处于 pending 状态的 task ,并且集群中有可用的资源时,会分配新的 Executor 给应用。

Yarn 资源

Spark 的资源分配还受到 Yarn 配置的影响:

对于 cores 配置,Yarn 会分配相应个数的 vcores

对于 内存 配置而言,会复杂一点:

Alt text

除了上述考虑外,还有几点值得注意的:

  • driver 的内存和 CPU 可以通过 --driver-memory--driver-cores 参数来调节。
  • executor 的内存如果过大可能会导致 GC 时间超长。单个 executor 的内存最好不要超过 64 G,即 --executor-memory < 64G
  • HDFS 对于大量并发的处理有困难,一般建议每个 executor 的核数不要超过 5 个,即 --executor-cores <= 5
  • 如果 executor 的配置太小了(比如只有 1 个核和 1 G 的内存)会丢失单个 JVM 内运行多个任务的优势:比如,广播变量 会被复制传送给每个 executor,如果一个 executor 不能并发执行任务,会导致复制太多 广播变量,或者等待的任务数过多。

并发度调优

Spark 本质上是一个并行数据处理引擎。和所有并行处理器一样,并发度对于系统的性能至关重要。

每个 Spark Stage 都有多个 task,每个 task 会依次处理数据。

在 Spark 调优的过程中,任务数可能是最重要的一个调优参数了。

每个阶段(Stage)中的任务数量 == 上一个 RDD 的分区数。

一个 RDD 的分区数量 == 它依赖的上一个 RDD 的分区数量。这个结论有以下几个例外:

  • coalesce(numPartitions) 会创建一个比父 RDD 分区更少的 RDD;
  • union 会创建一个父 RDD 分区之和的分区数量;
  • cartesian 会创建一个 笛卡尔乘积 个数分区的 RDD。

对于没有父 RDD 的 RDD(比如由 textFile 产生的 RDD),分区数量由底层 MapReduce InputFormat 决定。通常来说,一个 HDFS Block 会对应一个 分区。如果没有特殊参数,Spark 使用 spark.default.parallelism 来决定分区数量。

调用 rdd.partitions.size 可以查看一个 rdd 的分区数量。

需要考虑的点

task 数量太小

  • 如果 task 的数量小于 所有 slots 数量,会造成 CPU 浪费;
  • 除此之外,task 太少还会给每个 task 中的 聚合操作 带来很大的内存压力:

    • join, cogroup*ByKey 操作都涉及到将数据存在 哈希表 或者内存中用于分组和排序:
      • join, cogroupgroupByKey 都在一个 shuffle 的取数据一端将使用数据;
      • reduceByKeyaggregateByKey 会在 shuffle 的两端都使用这些数据。
  • 当这些聚合操作的数据量比可用内存大很多时,就有很多混乱的现象可能发生:

    1. 在 HashMap 或者 Mem Buffer 中保存这么多数据量会给 GC 带来很大压力。甚至可能直接停止这个 JVM 进程。
    2. 当数据量太大时,Spark 可能会将数据保存在磁盘上,这样的操作在大的 shuffle 阶段会造成严重的性能瓶颈;
  • 解决

    • 当这个 Stage 是直接从 Hadoop 读取数据时,可以:
      • 调用 repartition 操作(会造成 shuffle);
      • 配置 InputFormat 以创建更多 splits;
      • 在写入数据到 HDFS 的时候使用一个更小的 blockSize
    • 如果这个 Stage 是从上一个 Stage 拿到数据的,可以:

      • 在上一个 Stage 触发 shuffle 时的 transformation 可以接收一个 numPartitions 参数:
        1
        val rdd2 = rdd2.reduceByKey(_ + _, numPartitions = X)
    • X 应该是多少?

      1. 最方便的解决办法:
        假设父 RDD 的分区数量(通过 rdd.partitions.size 查看)是 Y
        `让 X = Y * 1.5`,不停重试,直到性能不再增加。
        
      2. 另外还有一个通用方法,但在日常工作应用并不实际: 主要目标是运行足够的任务,以便每个任务不会 OOM
        • 每个任务可用的内存大小为:
          spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction/ spark.executor.cores

总的来说,在 Spark 应用中,提高任务的数量总是更好的选择,如果出现问题可以再调整。

这和 MapReduce 任务的调优不同,在 MapReduce 中,要求任务数的设置更保守一些。

这是因为两者在 MapReduce 中任务的启动开销更大得多。

防止臃肿的数据结构

在 Spark 中数据以 Records 的形式流转。每条数据记录都有两种呈现的形式:

  • 一个反序列化的 Java 对象
  • 一个 序列化二进制对象

通常来说,Spark 在内存中使用 反序列化对象,在硬盘和网络传输中使用 序列化对象

spark.serializer 用于在两种呈现方式之间转换,通常推荐使用 org.apache.spark.serializer.KryoSerializer

如果反序列化对象太过臃肿,会导致:

  • Spark 将数据 dump 到硬盘的次数增加;
  • 可以 cache 的数据数量减少;

官方文档有专门的章节描述该如何调优:

在考虑内存使用调优时,有三个考虑因素:

  1. 你所有对象的内存使用总量
  2. 获取对象的代价
  3. GC 的开销

数据格式

如果可能,尽量使用 可扩展的二进制格式: Avro, Parquet, Thrift, Protobuf 作为存储文件的格式。

一个奇怪但是合理的建议:存储时,使用 Hadoop SequenceFile,只是每条数据的格式是上述之一。

不选择直接使用上述格式的文件的原因是:单个文件的 Size 太小了,如果直接存储对 Namenode 的压力过大。

(完)