本篇文章是在结合自己业务和这篇文章的基础上写的。

前言

在编写和维护一个 Spark 程序的过程中,会遇到一些相关术语:

  • transformation, action, RDD, DataSet, DataFrame

    理解这一层次的原语对于写出一个 Spark 很重要。因为一个 Spark 程序都是由这些结构和操作构成的。

  • job, stage, task

    如果想要写出一个好的 Spark 程序,理解这一层的原语就变得很重要。因为当你发现你的 Spark 程序运行得很慢或者报错了,你需要到 web UI 上查看原因,而这就是你需要理解这些术语的原因。

总之,要写出一个高效的 Spark 程序,理解其底层运行模型是至关重要的。

Spark 如何执行程序

众所周知,一个 Spark 应用由一个 driver 进程和多个 executor 进程组成。这些进程都分散在一个集群的多个节点上。

driverexecutor 概述

driver 负责从上层掌控所有工作的控制流;
executor 负责以 task 的形式执行这些工作;除此之外,executor 还负责存储用户手动 cache() 的数据。

driverexecutor 在整个应用运行期间通常会保持不变。(1.3 以后可以用动态资源分配来动态管理)

每个 executor 都有一些 slots 用于运行 tasks:这些任务可以在这个 executor 的生命周期内并行地运行。

job,stage,task

Alt text

如上图所示,在执行层次的最上层是 jobs:

在 Spark 应用内部调用一个 action 方法就会触发一个 Spark job 的启动。

Spark 通过检查这个 action 依赖的 RDD 的 DAG 并生成一个执行计划:

  • 这个执行计划从最远端的 RDD(指那些没有任何依赖其他 RDD 或者 已经 cache 过的 RDD 的 RDD) 开始,到最终的 RDD 。

  • 这个执行计划还包括 将 job 的 transformation 方法组装成 stages

    • 每个 stage 对应到一组 tasks这组 task 都执行相同的代码,只是每个 task 运行在不同的数据集上。
    • 每个 stage 都包含一组无需 shuffling 过程的 transformation。

那么决定是否需要 full shuffle 的因素是什么呢?

每个 RDD 都由固定数量的分区组成,每个分区包含一部分数据记录。对于那些由 narrow transformation (比如 map, filter)返回的 RDD,每个分区中的数据只需要依赖父 RDD 的同一个分区,每个数据记录的产生只需要依赖父 RDD 中的一个记录。

注意,coalesce 方法也被认为是 narrow transformation。因为该方法不涉及到 full shuffle,只会将某些分区的数据移动到另外分区上。详情可以参考 这个回答这个文章

相对的,对于 wide transformation (比如 groupByKey, reduceByKey),RDD 的一个分区的数据产生可能依赖父 RDD 的多个分区。

对于有 key 的 数据,同一个 key 对应的所有数据都必须落在相同的分区,由同一个 task 处理:

这就要求 Spark 必须执行一个 shuffle 操作,起一个新的 stage,将集群中所有相关的数据都放在一些新的分区中。

举个例子:

1
2
3
4
5
6
val tokenized = sc.textFile(args(0)).flatMap(_.split(' '))
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >= 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).
reduceByKey(_ + _)
charCounts.collect()

上述代码会以 reduceByKey 为分界线产生 3 个 stage

对于更为复杂一点的操作如:

Alt text

每个洋红框都表示一个 stage。

shuffle

在每个 stage 的边界,数据都被 父 Stage 中的 task 写到磁盘上,并且由 子 Stage 中的 task 跨网络远程取走:

  • 由于涉及到 网络 IO 和 磁盘 IO,所以 Stage 边界这个事情非常昂贵,应该想方设法避免;
  • 父子 Stage 中 RDD 的分区数量可能有所不同:只需要一个 numPartitions 参数就能让 transformation 触发一个 stage 边界并决定了子 Stage 的 RDD 分区数量。
  • Stage 边界的分区数量会很大程度上影响一个 Spark 应用的性能。

选择正确的算子

有时候使用不同的 transformationaction 算子可以得到相同的结果,但是不同的选择带来的开销可能会截然不同。

使用 SparkSQL 时可能不必考虑该如何挑选算子,但是使用 DataSet 或者 RDD 接口时,就需要仔细考虑了。

选择算子的首要目的是减少 shuffle 的次数:shuffle 的数据都要先写在磁盘并通过网络发送。

repartition, join, cogroup, *By, *ByKey 等 transformation 算子都会造成数据 shuffle,常见的优化手段和注意事项:

  • 在执行一个关联 reduce 操作时,避免使用 groupByKey
    比如,rdd.groupByKey.mapValues(_.sum)rdd.reduceByKey(_ + _) 的结果相同,但是前者会在网络中移动整个数据集合,而后者会先计算本地每个 key 对应 value 的和,然后再 shuffle,然后再计算总的和。

  • 当输入和输出值的类型不同时,使用 aggregateByKey
    aggregateByKey 使用 map-side 聚合

  • 不要使用 flatMap-join-groupBy 模式
    当两个数据集已经根据 key 分好组,而想要 Join 并保持分组时,使用 cogroup,可以避免 解包和打包的开销。

减少 shuffle 的数量

当上一个 transformation 已经根据相同的 partitioner 将数据分区时,Spark 会避免 shuffle:

1
2
3
rdd1 = someRdd.reduceByKey(...)
rdd2 = someOtherRdd.reduceByKey(...)
rdd3 = rdd1.join(rdd2)

由于没有 partitioner 被当做参数传进 reduceByKey,Spark 会使用默认 partitioner。

这两个 reduceByKey transformation 会造成两次 shuffle。

当两个 rdd 都被 reduceByKey 操作重新分为相同个数的分区时,join 算子就不会造成新的 shuffle:

这是因为在分区数相同时,RDD1 在同一个结果分区中的 keys 只可能出现在 RDD2 的一个分区中。因此 RDD3 的一个分区只依赖于 RDD1 的一个分区 和 RDD2 的一个分区,所以就没有第三个 shuffle:

Alt text

当使用不同的 partitioner 时,只有 partitioner 较小的 rdd 需要 reshuffle:

Alt text

广播变量

在 join 两个数据集时,还有一种方法可以避免 shuffle:广播变量

当其中一个数据集小到足以放进每个 executor 的内存中时,可以将其以 HashMap 的形式加载到 driver 的内存中,然后将其广播到每个 executor 的内存中。然后 join 操作就可以在每个 executor 中单独进行而不需要 shuffle 了。

增加 Shuffle 的数量

也有当 shuffle 次数增加时,性能更好的情况:

  • 当输入数据量很大,但是 InputFormat 给每个分区分配了数量巨大的数据时,使用 repartition 算子将分区增加可以让每个分区处理更少的数据,从而更好的利用集群中的 多核,从而提升系统的性能。

  • 当需要将聚合后的数据发送到 Driver 时,先在每个分区使用 reduceByKey 或者 aggregateByKey 操作可以将计算的压力分散到多个分区上从而减小 driver 的计算量。 这种操作对那些已经根据 key 分好组的情况尤其有用!

二次排序

repartitionAndSortWithinPartitions 将排序操作下推到 shuffle 的阶段,量大的数据可以被有效地分割,并且排序操作可以和其他操作结合在一起进行。

当你想要先将数据根据 key 分组后,在每个 key 对应的这组数据内根据某个字段排序时,可以直接用这个方法。

(To Be Continued…)