为了减小一张表里的数据量,Hive 提供了 分区 的功能,基本思路就是把每个分区的数据放在 HDFS 里独立的文件夹中,这样在扫描全表的时候被扫描的数据量就变小了。常见的分区策略是按时间分区

但是在一些场景下,即使在分区后,每个分区下面数据量也很大,这时候就可以用 Hive 的 Buckets 功能了。

概念

Hive 分区将一张大表分割成多个分区,这些分区可以进一步被分割成更易被管理的 Buckets

Bucket 的概念基于 Hash 函数,在表的某一列上通过 CLUSTERED BY 语句作用这个 Hash 函数。通过同一列被装桶的数据总是存在同一个桶里,而每个桶都被创建为一个文件。

Bucket 的概念使得在一列或者多列上排序变得更加灵活,而由于 Bucket 后的数据文件大小都类似,所以使用 map-side join 变得更加容易且迅速。

实践

准备数据

1
2
3
4
5
6
7
8
9
10
11
12
13
use default;
drop table if exists input_table;
CREATE TABLE if not exists input_table (street string,
city string,
zip string,
STATE string,
beds string,
baths string,
sq_feet string,
flat_type string,
price string)
ROW format delimited fields terminated BY ','
STORED AS TEXTFILE;

从已有的 .csv 文件导入数据:

1
2
3
SET hive.warehouse.subdir.inherit.perms = FALSE;

LOAD DATA inpath '/tmp/real_state.csv' overwrite INTO TABLE input_table;

创建分桶表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=20000;
set hive.exec.max.dynamic.partitions.pernode=20000;
set hive.enforce.bucketing=true;

DROP TABLE IF EXISTS bucketed_input_table;
CREATE TABLE if not exists bucketed_input_table (street string,
zip string,
STATE string,
beds string,
baths string,
sq_feet string,
flat_type string,
price string)
partitioned by (city string)
clustered by (street) into 4 buckets
-- ROW format delimited fields terminated BY ',';
-- STORED AS TEXTFILE;

导入数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
INSERT overwrite TABLE bucketed_input_table partition(city)
SELECT street,
zip,
STATE,
beds,
baths,
sq_feet,
flat_type,
price,
city
FROM input_table;

-- INFO : Compiling command(queryId=hive_20181220150751_02cc7292-01c3-43c4-8a89-a0c71c9ceae6):
-- INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:street, type:string, comment:null), FieldSchema(name:zip, type:string, comment:null), FieldSchema(name:state, type:string, comment:null), FieldSchema(name:beds, type:string, comment:null), FieldSchema(name:baths, type:string, comment:null), FieldSchema(name:sq_feet, type:string, comment:null), FieldSchema(name:flat_type, type:string, comment:null), FieldSchema(name:price, type:string, comment:null), FieldSchema(name:city, type:string, comment:null)], properties:null)
-- INFO : Completed compiling command(queryId=hive_20181220150751_02cc7292-01c3-43c4-8a89-a0c71c9ceae6); Time taken: 0.078 seconds
-- INFO : Executing command(queryId=hive_20181220150751_084c5303-c4e2-483d-9a91-6e1501306439):
-- INFO : Query ID = hive_20181220150751_084c5303-c4e2-483d-9a91-6e1501306439
-- INFO : Total jobs = 2
-- INFO : Launching Job 1 out of 2
-- INFO : Starting task [Stage-1:MAPRED] in parallel
-- INFO : Number of reduce tasks determined at compile time: 4
-- INFO : Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 4
-- INFO : 2018-12-20 15:07:57,953 Stage-1 map = 0%, reduce = 0%
-- INFO : 2018-12-20 15:08:04,148 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 2.08 sec
-- INFO : 2018-12-20 15:08:11,347 Stage-1 map = 100%, reduce = 50%, Cumulative CPU 13.04 sec
-- INFO : 2018-12-20 15:08:14,424 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 24.18 sec
-- INFO : MapReduce Total cumulative CPU time: 24 seconds 180 msec
-- INFO : Launching Job 2 out of 2
-- INFO : Starting task [Stage-3:MAPRED] in parallel
-- INFO : Starting task [Stage-2:STATS] in serial mode
-- INFO : Number of reduce tasks not specified. Estimated from input data size: 1
-- INFO : In order to change the average load for a reducer (in bytes):
-- INFO : set hive.exec.reducers.bytes.per.reducer=<number>
-- INFO : In order to limit the maximum number of reducers:
-- INFO : set hive.exec.reducers.max=<number>
-- INFO : In order to set a constant number of reducers:
-- INFO : set mapreduce.job.reduces=<number>
-- WARN : Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
-- INFO : number of splits:3

-- INFO : Hadoop job information for Stage-3: number of mappers: 3; number of reducers: 1
-- INFO : 2018-12-20 15:08:32,556 Stage-3 map = 0%, reduce = 0%
-- INFO : 2018-12-20 15:08:37,750 Stage-3 map = 100%, reduce = 0%, Cumulative CPU 4.11 sec
-- INFO : 2018-12-20 15:08:44,932 Stage-3 map = 100%, reduce = 100%, Cumulative CPU 7.13 sec
-- INFO : MapReduce Total cumulative CPU time: 7 seconds 130 msec
-- INFO : OK

可以看到,由于我们设置了 hive.enforce.bucketing=true,所以 reduce 任务和 bucket 的数量应该一致,都为 4.

查看任意一个 city 分区:


可以看到,每个分区的数据已经被按照 分桶了。

## 应用

### Bucket Map Join

使用场景:
Join 操作只在 Map 阶段,处理 a 表的 bucket 1 的 mapper 只需取 b 表的 bucket 1.
当所有参与 JOIN 的表满足下面的条件时,使用 Bucket Map Join 是极好的。

  • 表的数据量都很大;
  • Join 的列和 Bucket 的列是相同的;
  • 一个表的 Bucket 的数量是另一个表的 倍数
  • 都没有排序;

需要注意的点:

  • 参与 Join 的表需要在插入数据时就要分桶:set hive.enforce.bucketing=true
  • set hive.optimize.bucketmapjoin=true

Sort Merge Bucket(SMB) Map Join

使用场景:

Join 只在 Map 阶段,每个表相同的 bucket 之间相互 join。

当参与 join 的表满足下面的条件时:

  • 表都很大;
  • join 的列和 bucket 的列相同;
  • 基于 join 的列排好序;
  • 所有参与 join 的表的 Buckets 的数量相同;

注意事项:

  • 参与 Join 的表需要在插入数据时就要分桶:set hive.enforce.bucketing=true
  • 需要这些设置将 SMB join 换成 SMB Map Join:
    1
    2
    3
    4
    set hive.auto.convert.sortmerge.join=true;
    set hive.optimize.bucketmapjoin = true;
    set hive.optimize.bucketmapjoin.sortedmerge = true;
    set hive.auto.convert.sortmerge.join.noconditionaltask=true;