将 Spark 中计算好的 DataFrame 保存在 Hive 中应该是使用 Spark 作为数据处理引擎最常见的需求之一了。

但是在使用过程中会遇到一些问题,本文试图总结一下我在使用过程中遇到的问题以及解决办法。

分区

根据 Hive 表是否有分区字段可以分为:

无分区

方法一

1
df.write.mode("overwrite").saveAsTable("TableName")

方法二

  1. 版本 < 2.0

    1
    2
    3
    4
    5
    6
    7
    8
    val hc = HiveContext(sc)
    import hc.implicits._

    df.registerTempTable("MyTempTable")
    hc.sql(s"""
    insert overwrite table $MyTable
    select * from MyTempTable
    """)
  2. 版本 > 2.0

    1
    2
    3
    val spark = SparkSession.builder.enableHiveSupport.getOrCreate
    myDF.createOrReplaceTempView("MyTempTable")
    spark.sql(s"insert overwrite table $MyTable select * from MyTempTable")

有分区

通常情况下,有分去的 Hive 表我们都是希望能 Overwrite 指定的分区。

使用 SparkSQL 的情况:

Hive 对应的表需要先定义成 External:

1
2
3
4
5
6
CREATE EXTERNAL TABLE test
(name STRING)
PARTITIONED BY
(age INT)
STORED AS PARQUET
LOCATION 'hdfs:///tmp/tables/test'

然后用 HiveContext 执行覆盖分区操作:

1
2
3
4
5
6
hiveContext = HiveContext(sc)
update_dataframe.registerTempTable('update_dataframe')

hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age)
SELECT name, age
FROM update_dataframe""")

其中,update_dataframe 是在 HiveContext 上使用 registerTempTable 或者 createOrReplaceTempView 注册的临时表。

注意,要使用这种方法需要设置 Hive 的动态分区功能为 true:
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

版本: > 2.3

2.3 以后,Spark 终于有了只覆盖对应分区的功能:

1
2
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")

版本: 2.0 ~ 2.3

  1. 直接写文件到 Hive 对应的 HDFS 路径:
    1
    df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")

版本: 1.6.2 ~ 2.0

使用同样的方法,但是需要额外配置:

1
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")