Spark 输出到 Hive 总结
将 Spark 中计算好的 DataFrame 保存在 Hive 中应该是使用 Spark 作为数据处理引擎最常见的需求之一了。
但是在使用过程中会遇到一些问题,本文试图总结一下我在使用过程中遇到的问题以及解决办法。
分区
根据 Hive 表是否有分区字段可以分为:
无分区
方法一
1 | df.write.mode("overwrite").saveAsTable("TableName") |
方法二
版本 < 2.0
1
2
3
4
5
6
7
8val hc = HiveContext(sc)
import hc.implicits._
df.registerTempTable("MyTempTable")
hc.sql(s"""
insert overwrite table $MyTable
select * from MyTempTable
""")版本 > 2.0
1
2
3val spark = SparkSession.builder.enableHiveSupport.getOrCreate
myDF.createOrReplaceTempView("MyTempTable")
spark.sql(s"insert overwrite table $MyTable select * from MyTempTable")
有分区
通常情况下,有分去的 Hive 表我们都是希望能 Overwrite 指定的分区。
使用 SparkSQL 的情况:
Hive 对应的表需要先定义成 External
:
1 | CREATE EXTERNAL TABLE test |
然后用 HiveContext
执行覆盖分区操作:
1 | hiveContext = HiveContext(sc) |
其中,update_dataframe 是在 HiveContext 上使用 registerTempTable
或者 createOrReplaceTempView
注册的临时表。
注意,要使用这种方法需要设置 Hive 的动态分区功能为 true:
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
版本: > 2.3
2.3 以后,Spark 终于有了只覆盖对应分区的功能:
1 | spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") |
版本: 2.0 ~ 2.3
- 直接写文件到 Hive 对应的 HDFS 路径:
1
df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")
版本: 1.6.2 ~ 2.0
使用同样的方法,但是需要额外配置:
1 | sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false") |