Spark distinct() vs. groupBy()
前言
最近在项目中遇到一个需求:
数据需要按照某些维度分区存储在 S3 上,例如:
s3://bucket.name/field_1=a/field_2=b/field_3=c/xxxxx.parquet
。
另外,在公司系统中,元数据是手动管理,因此在对数据文件进行更改过程中,需要在写入前手动取出所有的分区字段对应的值。
先不论是否有更好地方式管理写入文件相应过程,我们考虑一下目前这个需求该怎样高效地实现。
在这次讨论中,我们都只考虑 Dataframe
接口。
方案一:使用 distinct()
1 | values = df.select("field_1", "field_2", "field_3").distinct().collect() |
分析一
distinct()
在 Spark
源码中:
1 | /** |
distinct()
和 dropDuplicates()
是同一个接口。
dropDuplicates()
再看 dropDuplicates()
接口:
1 | def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns) |
在 dropDuplicates
中:
- 首先检查需要被
distinct
的field
是否被正常解析了。 返回一个
case class
:Deduplicate
Deduplicate
在org.apache.spark.sql.catalyst.plans.logical
package 下;定义如下:
1
2
3
4
5
6
7/** A logical plan for `dropDuplicates`. */
case class Deduplicate(
keys: Seq[Attribute],
child: LogicalPlan)
extends UnaryNode {
override def output: Seq[Attribute] = child.output
}可以发现,这是一个逻辑执行计划的操作符,用于将 SQL 语句或者
DataSet
API 中的DISTINCT
关键字和接口转换为Deduplicate
操作符。
接下来可以看这个操作符在 SQL 优化器中被用到:
1 | /** |
可以看到,这段优化做的主要工作是:
将 DeDuplicate 接口优化成聚合算子。
在这个过程中,有几个考虑:
将所有字段分为两类:
keys
和aggColumns
:- 其中,
keys
为被分组的字段,aggColumn
为其他字段; - 根据
keys
分组后,取所有aggColumn
字段在分组中的第一个出现的值作为字段的值。
- 其中,
根据
SPARK-22951
,解决了当keys
为空的时候的 Bug.
小结
根据上面的分析可以知道:
在 Spark DataSet API 中:
distinct()
==dropDuplicates()
dopDuplicates()
的实现和下述操作没差别:1
2
3
4
5df.select($"field_1", $"field_2", $"field_3").dropDuplicates()
// almost equilivate:
df.groupBy($"field_1", $"field_2", $"field_3").agg(frist($"field_4"))).drop($"field_4")
方案二:使用 groupBy()
1 | values = df.groupBy("field_1", "field_2", "field_3").count().drop("count").collect() |
分析二
使用 groupBy
接口会创建一个 RelationalGroupedDataset
。
在这个 RelationalGroupedDataset
上可以调用一系列的操作符,在上面的方案中,使用了 count
。
我们来看一下 count
方法的实现:
1 | /** |
其中,Count
操作符的定义如下:
1 | ( |
小结
由上面的分析可以知道,如果使用 .count()
算子,至少需要遍历 rdd 的每个分区。
初步结论
直接使用 distinct()
方法和使用 groupBy
+ count
的方法原理类似,但似乎后者性能更差,因为需要遍历每个分区计算总和。
真的吗?
如果单独使用 count()
性能确实不如直接使用 distinct()
方法。
但是,在 count()
后加上 drop("count")
后,情况就不一样了。
我们来看一下执行计划:
1 | df.groupBy("State", "City", "Department", "Quarter Ending").count().drop("count").explain(True) |
可以看到,优化器在将逻辑执行计划优化的过程中,将 Aggregate function
去掉了,从最终的物理执行计划中也能看出,最终已经没有了 Aggregate function
。
最后我们来看一下直接使用 distinct()
的物理执行计划:
1 | df.select("State", "City", "Department", "Quarter Ending").distinct().explain() |
可以看到,两者的物理执行计划几乎是一模一样的,唯一的差别在于后者多了一步 select
的操作。
还有其他吗?
在 Spark
中,RDD
的接口也提供了 distinct
方法。
我们来看一下具体实现:
1 | /** |
可以看到,RDD
的 distinct
是使用 reduceByKey
接口,通过遍历每个分区所有数据,并且只拿第一个数据条目,最后合并每个分区数据。
这个性能和上述的 Dataset
接口性能应该是类似的。但是考虑到使用 Dataset
接口会应用到 Spark
内部的优化,包括 Tungsten
和 Catalyst
的优化。因此推荐还是使用 Dataset
接口。
总结
根据上面的分析,我们可以知道,上述两种基于 Dataset
的方案的性能应该是几乎一样的,都可以使用,任君选择。