Introduction

I recently ran into a requirement in a project:

Data needs to be stored on S3 partitioned by certain dimensions, for example: s3://bucket.name/field_1=a/field_2=b/field_3=c/xxxxx.parquet
In addition, metadata in our company's system is managed manually, so whenever the data files are modified, all values of the partition columns have to be extracted by hand before the write.
Setting aside whether there is a better way to manage this write process, let's look at how to implement the requirement efficiently as it stands.

Throughout this discussion we only consider the DataFrame API.
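
For concreteness, here is a minimal sketch of the scenario in PySpark. The staging path and the partitionBy-based write are assumptions for illustration; only the bucket layout and the field names come from the example above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical staging location; field_1/field_2/field_3 are the partition columns
# from the example path above.
df = spark.read.parquet("s3://bucket.name/staging/")

# Before the write below, the manually managed metadata system needs every
# (field_1, field_2, field_3) combination that will appear as a partition directory.
df.write.partitionBy("field_1", "field_2", "field_3").mode("append").parquet("s3://bucket.name/")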

Approach 1: use distinct()

values = df.select("field_1", "field_2", "field_3").distinct().collect()

Analysis 1

distinct()

In the Spark source code:

/**
 * Returns a new Dataset that contains only the unique rows from this Dataset.
 * This is an alias for `dropDuplicates`.
 */
def distinct(): Dataset[T] = dropDuplicates()

So distinct() and dropDuplicates() are the same interface.

dropDuplicates()

Now look at the dropDuplicates() interface:

def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)

/**
 * (Scala-specific) Returns a new Dataset with duplicate rows removed, considering only
 * the subset of columns.
 */
def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
  val resolver = sparkSession.sessionState.analyzer.resolver
  val allColumns = queryExecution.analyzed.output
  val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
    // It is possibly there are more than one columns with the same name,
    // so we call filter instead of find.
    val cols = allColumns.filter(col => resolver(col.name, colName))
    if (cols.isEmpty) {
      throw new AnalysisException(
        s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
    }
    cols
  }
  Deduplicate(groupCols, logicalPlan)
}

Inside dropDuplicates:

  • First, it checks that every column to be deduplicated can be resolved correctly.
  • It then returns a case class: Deduplicate

    • Deduplicate lives in the org.apache.spark.sql.catalyst.plans.logical package;
    • It is defined as follows:

      /** A logical plan for `dropDuplicates`. */
      case class Deduplicate(
          keys: Seq[Attribute],
          child: LogicalPlan)
        extends UnaryNode {
        override def output: Seq[Attribute] = child.output
      }
    • As you can see, this is a logical-plan operator: the DISTINCT keyword in SQL and the distinct/dropDuplicates calls in the Dataset API are both translated into this Deduplicate operator (a quick way to observe the node is shown right after this list).
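
On your own data you can observe this node directly; a quick, hedged check (the exact plan text varies by Spark version):

# explain(True) prints the parsed, analyzed, optimized and physical plans.
# The analyzed logical plan should contain a Deduplicate node over the selected columns,
# which the optimizer rule shown next rewrites into an Aggregate.
df.select("field_1", "field_2", "field_3").distinct().explain(True)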

Next, we can see where this operator is used in the SQL optimizer:

/**
 * Replaces logical [[Deduplicate]] operator with an [[Aggregate]] operator.
 */
object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Deduplicate(keys, child) if !child.isStreaming =>
      val keyExprIds = keys.map(_.exprId)
      val aggCols = child.output.map { attr =>
        if (keyExprIds.contains(attr.exprId)) {
          attr
        } else {
          Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
        }
      }
      // SPARK-22951: Physical aggregate operators distinguishes global aggregation and grouping
      // aggregations by checking the number of grouping keys. The key difference here is that a
      // global aggregation always returns at least one row even if there are no input rows. Here
      // we append a literal when the grouping key list is empty so that the result aggregate
      // operator is properly treated as a grouping aggregation.
      val nonemptyKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
      Aggregate(nonemptyKeys, aggCols, child)
  }
}

As you can see, the main job of this optimizer rule is:

rewriting the Deduplicate operator into an Aggregate operator.

A few things happen along the way (a DataFrame-level sketch of the resulting rewrite follows this list):

  • All columns are split into two groups: keys and aggCols:

    • keys are the grouping columns, and aggCols are all the remaining columns;
    • after grouping by keys, each remaining column takes the first value that appears within its group (the First aggregate).
  • Per SPARK-22951, a literal grouping key is appended when keys is empty, which fixes a bug for the empty-key case.
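
In DataFrame terms, the rewrite this rule produces is roughly equivalent to the following sketch (an illustration, not the rule itself; F.first stands in for the First aggregate, the key names come from the running example, and at least one non-key column is assumed):

from pyspark.sql import functions as F

keys = ["field_1", "field_2", "field_3"]
agg_cols = [c for c in df.columns if c not in keys]

# Group by the deduplication keys and keep the first value seen for every other column,
# mirroring the keys / aggCols split in ReplaceDeduplicateWithAggregate.
deduped = df.groupBy(*keys).agg(*[F.first(c).alias(c) for c in agg_cols])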

Summary

From the analysis above we know that:

In the Spark Dataset API:

  • distinct() == dropDuplicates()
  • the implementation of dropDuplicates() is effectively the same as the following:
    df.select($"field_1", $"field_2", $"field_3").dropDuplicates()

    // almost equilivate:

    df.groupBy($"field_1", $"field_2", $"field_3").agg(frist($"field_4"))).drop($"field_4")

Approach 2: use groupBy()

values = df.groupBy("field_1", "field_2", "field_3").count().drop("count").collect()

Analysis 2

Calling the groupBy interface creates a RelationalGroupedDataset.

A number of operators can then be invoked on that RelationalGroupedDataset; the approach above uses count.

Let's look at the implementation of the count method:

/**
 * Count the number of rows for each group.
 * The resulting `DataFrame` will also contain the grouping columns.
 *
 * @since 1.3.0
 */
def count(): DataFrame = toDF(Seq(Alias(Count(Literal(1)).toAggregateExpression(), "count")()))

Here, the Count expression is defined as follows:

@ExpressionDescription(
  usage = """
    _FUNC_(*) - Returns the total number of retrieved rows, including rows containing null.

    _FUNC_(expr[, expr...]) - Returns the number of rows for which the supplied expression(s) are all non-null.

    _FUNC_(DISTINCT expr[, expr...]) - Returns the number of rows for which the supplied expression(s) are unique and non-null.
  """)
// scalastyle:on line.size.limit
case class Count(children: Seq[Expression]) extends CountLike {

  override lazy val updateExpressions = {
    val nullableChildren = children.filter(_.nullable)
    if (nullableChildren.isEmpty) {
      Seq(
        /* count = */ count + 1L
      )
    } else {
      Seq(
        /* count = */ If(nullableChildren.map(IsNull).reduce(Or), count, count + 1L)
      )
    }
  }
}

object Count {
  def apply(child: Expression): Count = Count(child :: Nil)
}

Summary

From the analysis above, using the .count() operator requires scanning every partition of the underlying RDD at least once to compute the counts.
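
You can check this on your own data; as a hedged illustration (plan text varies by Spark version), with count() alone the aggregate stays in the physical plan:

# Without drop("count"), the HashAggregate nodes in the physical plan carry
# functions=[count(1)] (partial_count(1) on the map side) rather than functions=[].
df.groupBy("field_1", "field_2", "field_3").count().explain()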

Preliminary conclusion

Calling distinct() directly and using groupBy + count rely on similar mechanics, but the latter seems like it should perform worse, since it has to scan every partition to compute the counts.

Really?

If count() is used on its own, performance is indeed worse than calling distinct() directly.

However, once drop("count") is appended after count(), the picture changes.

Let's look at the execution plan:

df.groupBy("State", "City", "Department", "Quarter Ending").count().drop("count").explain(True)

== Parsed Logical Plan ==
Project [State#16, City#15, Department#11, Quarter Ending#10]
+- Aggregate [State#16, City#15, Department#11, Quarter Ending#10], [State#16, City#15, Department#11, Quarter Ending#10, count(1) AS count#559L]
+- Relation[Quarter Ending#10,Department#11,UnitNo#12,Vendor Number#13,Vendor#14,City#15,State#16,DeptID Description#17,DeptID#18,Amount#19,Account#20,AcctNo#21,Fund Description#22,Fund#23] csv

== Analyzed Logical Plan ==
State: string, City: string, Department: string, Quarter Ending: string
Project [State#16, City#15, Department#11, Quarter Ending#10]
+- Aggregate [State#16, City#15, Department#11, Quarter Ending#10], [State#16, City#15, Department#11, Quarter Ending#10, count(1) AS count#559L]
+- Relation[Quarter Ending#10,Department#11,UnitNo#12,Vendor Number#13,Vendor#14,City#15,State#16,DeptID Description#17,DeptID#18,Amount#19,Account#20,AcctNo#21,Fund Description#22,Fund#23] csv

== Optimized Logical Plan ==
Aggregate [State#16, City#15, Department#11, Quarter Ending#10], [State#16, City#15, Department#11, Quarter Ending#10]
+- Project [Quarter Ending#10, Department#11, City#15, State#16]
+- Relation[Quarter Ending#10,Department#11,UnitNo#12,Vendor Number#13,Vendor#14,City#15,State#16,DeptID Description#17,DeptID#18,Amount#19,Account#20,AcctNo#21,Fund Description#22,Fund#23] csv

== Physical Plan ==
*(2) HashAggregate(keys=[State#16, City#15, Department#11, Quarter Ending#10], functions=[], output=[State#16, City#15, Department#11, Quarter Ending#10])
+- Exchange hashpartitioning(State#16, City#15, Department#11, Quarter Ending#10, 200)
+- *(1) HashAggregate(keys=[State#16, City#15, Department#11, Quarter Ending#10], functions=[], output=[State#16, City#15, Department#11, Quarter Ending#10])
+- *(1) FileScan csv [Quarter Ending#10,Department#11,City#15,State#16] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/Vermont_Vendor_Payments.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Quarter Ending:string,Department:string,City:string,State:string>

As you can see, while optimizing the logical plan the optimizer removed the aggregate function entirely; the final physical plan confirms it, since the HashAggregate nodes show functions=[].

Finally, let's look at the physical plan when distinct() is called directly:

df.select("State", "City", "Department", "Quarter Ending").distinct().explain()

== Physical Plan ==
*(2) HashAggregate(keys=[State#16, City#15, Department#11, Quarter Ending#10], functions=[])
+- Exchange hashpartitioning(State#16, City#15, Department#11, Quarter Ending#10, 200)
+- *(1) HashAggregate(keys=[State#16, City#15, Department#11, Quarter Ending#10], functions=[])
+- *(1) Project [State#16, City#15, Department#11, Quarter Ending#10]
+- *(1) FileScan csv [Quarter Ending#10,Department#11,City#15,State#16] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/Vermont_Vendor_Payments.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Quarter Ending:string,Department:string,City:string,State:string>

As you can see, the two physical plans are nearly identical; the only difference is that the distinct() version carries an extra Project (select) step.

Anything else?

In Spark, the RDD API also provides a distinct method.

Let's look at its implementation:

/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(): RDD[T] = withScope {
  distinct(partitions.length)
}

As you can see, the RDD's distinct maps every element to an (x, null) pair and relies on reduceByKey: within each partition only the first entry for a given key is kept, and the per-partition results are then merged across partitions through a shuffle, so one entry per key remains.

Its performance should be comparable to the Dataset approaches above. However, the Dataset API benefits from Spark's internal optimizations, including Tungsten and Catalyst, so the Dataset API is still the recommended choice.
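
For comparison, here is a rough sketch of the same extraction through the RDD API (column names follow the running example; converting the DataFrame to an RDD of Rows adds serialization overhead and bypasses those optimizations):

values = (df.select("field_1", "field_2", "field_3")
            .rdd
            .map(lambda row: (row["field_1"], row["field_2"], row["field_3"]))
            .distinct()
            .collect())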

Conclusion

Based on the analysis above, the two Dataset-based approaches should perform almost identically. Either works; pick whichever you prefer.