Hive Buckets
为了减小一张表里的数据量,Hive
提供了 分区
的功能,基本思路就是把每个分区的数据放在 HDFS 里独立的文件夹中,这样在扫描全表的时候被扫描的数据量就变小了。常见的分区策略是按时间分区。
但是在一些场景下,即使在分区后,每个分区下面数据量也很大,这时候就可以用 Hive 的 Buckets
功能了。
概念
Hive 分区将一张大表分割成多个分区,这些分区可以进一步被分割成更易被管理的 Buckets
。
Bucket
的概念基于 Hash 函数
,在表的某一列上通过 CLUSTERED BY
语句作用这个 Hash 函数。通过同一列被装桶的数据总是存在同一个桶里,而每个桶都被创建为一个文件。
Bucket
的概念使得在一列或者多列上排序变得更加灵活,而由于 Bucket 后的数据文件大小都类似,所以使用 map-side join
变得更加容易且迅速。
实践
准备数据
1 | use default; |
从已有的 .csv
文件导入数据:
1 | SET hive.warehouse.subdir.inherit.perms = FALSE; |
创建分桶表:
1 | set hive.exec.dynamic.partition=true; |
导入数据:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46INSERT overwrite TABLE bucketed_input_table partition(city)
SELECT street,
zip,
STATE,
beds,
baths,
sq_feet,
flat_type,
price,
city
FROM input_table;
-- INFO : Compiling command(queryId=hive_20181220150751_02cc7292-01c3-43c4-8a89-a0c71c9ceae6):
-- INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:street, type:string, comment:null), FieldSchema(name:zip, type:string, comment:null), FieldSchema(name:state, type:string, comment:null), FieldSchema(name:beds, type:string, comment:null), FieldSchema(name:baths, type:string, comment:null), FieldSchema(name:sq_feet, type:string, comment:null), FieldSchema(name:flat_type, type:string, comment:null), FieldSchema(name:price, type:string, comment:null), FieldSchema(name:city, type:string, comment:null)], properties:null)
-- INFO : Completed compiling command(queryId=hive_20181220150751_02cc7292-01c3-43c4-8a89-a0c71c9ceae6); Time taken: 0.078 seconds
-- INFO : Executing command(queryId=hive_20181220150751_084c5303-c4e2-483d-9a91-6e1501306439):
-- INFO : Query ID = hive_20181220150751_084c5303-c4e2-483d-9a91-6e1501306439
-- INFO : Total jobs = 2
-- INFO : Launching Job 1 out of 2
-- INFO : Starting task [Stage-1:MAPRED] in parallel
-- INFO : Number of reduce tasks determined at compile time: 4
-- INFO : Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 4
-- INFO : 2018-12-20 15:07:57,953 Stage-1 map = 0%, reduce = 0%
-- INFO : 2018-12-20 15:08:04,148 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 2.08 sec
-- INFO : 2018-12-20 15:08:11,347 Stage-1 map = 100%, reduce = 50%, Cumulative CPU 13.04 sec
-- INFO : 2018-12-20 15:08:14,424 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 24.18 sec
-- INFO : MapReduce Total cumulative CPU time: 24 seconds 180 msec
-- INFO : Launching Job 2 out of 2
-- INFO : Starting task [Stage-3:MAPRED] in parallel
-- INFO : Starting task [Stage-2:STATS] in serial mode
-- INFO : Number of reduce tasks not specified. Estimated from input data size: 1
-- INFO : In order to change the average load for a reducer (in bytes):
-- INFO : set hive.exec.reducers.bytes.per.reducer=<number>
-- INFO : In order to limit the maximum number of reducers:
-- INFO : set hive.exec.reducers.max=<number>
-- INFO : In order to set a constant number of reducers:
-- INFO : set mapreduce.job.reduces=<number>
-- WARN : Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
-- INFO : number of splits:3
-- INFO : Hadoop job information for Stage-3: number of mappers: 3; number of reducers: 1
-- INFO : 2018-12-20 15:08:32,556 Stage-3 map = 0%, reduce = 0%
-- INFO : 2018-12-20 15:08:37,750 Stage-3 map = 100%, reduce = 0%, Cumulative CPU 4.11 sec
-- INFO : 2018-12-20 15:08:44,932 Stage-3 map = 100%, reduce = 100%, Cumulative CPU 7.13 sec
-- INFO : MapReduce Total cumulative CPU time: 7 seconds 130 msec
-- INFO : OK
可以看到,由于我们设置了 hive.enforce.bucketing=true
,所以 reduce
任务和 bucket
的数量应该一致,都为 4.
查看任意一个 city 分区:

可以看到,每个分区的数据已经被按照 列 分桶了。
## 应用
### Bucket Map Join

使用场景:
Join 操作只在 Map
阶段,处理 a 表的 bucket 1 的 mapper 只需取 b 表的 bucket 1.
当所有参与 JOIN 的表满足下面的条件时,使用 Bucket Map Join
是极好的。
- 表的数据量都很大;
- Join 的列和 Bucket 的列是相同的;
- 一个表的 Bucket 的数量是另一个表的 倍数;
- 都没有排序;
需要注意的点:
- 参与 Join 的表需要在插入数据时就要分桶:
set hive.enforce.bucketing=true
set hive.optimize.bucketmapjoin=true
Sort Merge Bucket(SMB) Map Join

使用场景:
Join 只在 Map 阶段,每个表相同的 bucket 之间相互 join。
当参与 join 的表满足下面的条件时:
- 表都很大;
- join 的列和 bucket 的列相同;
- 基于 join 的列排好序;
- 所有参与 join 的表的 Buckets 的数量相同;
注意事项:
- 参与 Join 的表需要在插入数据时就要分桶:
set hive.enforce.bucketing=true
- 需要这些设置将 SMB join 换成 SMB Map Join:
1
2
3
4set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;