历史啊特点啊什么的就不说了,直接开搞!

一些基本概念

MapReduce 工作流的主要特点是:把整个处理数据的过程分成两个阶段:

  • Map
  • Reduce

每个阶段都有由键值对组成的输入输出

  • Map

    • 输入: 是原始数据, key 是每行数据起始值对于整个数据集所在的 offset,value 是每行的具体数据;
    • 输出: 关心的 key, 关心的 value,每个输入会有一个输出 键值对。
    • 处理: MapReduce 框架会根据上面的输入键值对的 key 聚合并排序。
  • Reduce

    • 输入: Map 阶段的处理后的键值对;
    • 输出: 真正想要的键值对。

一个简单的示例图:

map-reduce-logic

hadoop 命令

以下面的命令为例:

1
hadoop jar ch02-4.0.jar com.xieyuanpeng.hadoop.book.ch02.MaxTemp /user/xieyuanpeng/isd-lite/1996,/user/xieyuanpeng/isd-lite/1949,/user/xieyuanpeng/isd-lite/1948 /user/xieyuanpeng/book3

hadoop 命令和 jar 子命令使用时,会调用 JVM 来运行其后跟随的 class

hadoop 命令会将 Hadoop 库以及其依赖加载进 classpath

MapReduce 任务 log

每个 MapReduce 任务结束后在 console 中的最后的 Counters 对于 debug 很有用。

在 HDFS 集群上执行 MapReduce 任务

Hadoop 会通过 YARNMR 的计算放到每个保存了部分数据的机器上。

数据流

首先介绍一些*术语*:

job: 一个 MR job 是用户想要做的工作的组合,包含了输入数据,MR 程序,以及配置信息等。
tasks: Hadoop 会将 job 分割为两类 taskmap tasks 以及 reduce tasks。任务通过 YARN 被调度到集群中的节点上运行。如果一个任务失败了,会自动被重新调度到其他节点上运行。
input splits: Hadoop 会将 MR 程序的输入数据分割成大小固定的片段: input splits。Hadoop 会为每个 split 创建一个 map 任务,这个 map 任务会为 split 中的每个 record 运行用户自定义的 map 函数。

当集群中的节点数量足够大的情况下,有一个需要做决定的 trade-off:

  • splits 越多越小 —-> 整个 job 就越负载均衡;
  • splits 越多 —-> 维护 map task 的开销就越大。

对于大多数 job 而言,split 的大小都趋于和 HDFS block 大小一致 —- 默认为 128M。

Hadoop 会尽力保证 map task 在输入数据所在的 HDFS 集群的节点上运行。data locality optimization 是也。

但有时也会发生 data split 所有副本所在的节点都在运行其他的 map task 的情况,这时 hadoop 会在同一个 rack 内找其他可用的节点来执行这个 map taskrack-local 是也。

在更极端的情况下,在同一个 rack 内都找不到可用的节点执行 map task:hadoop 会在另外的 rack 的节点上执行这个 map taskoff-rack 是也。

以上就是为什么一个 input split 要和 一个 HDFS block 大小一致。

map task的输出数据保存在节点本地 —- 而不是 HDFS 集群上,原因是整个 job 结束后,作为中间结果的 map 输出值可以被丢掉。如果在 map 输出结果被上传给 reduce task 之前节点就崩溃了,hadoop 会重新选一个节点重新执行一遍 map task

对于 reduce task 而言,没有 locality 而言,因为基本上每个 reduce task 的输入都是所有 map task 的输出。
对于单个 reduce task 而言,整个数据流程如下图所示:

关于 HDFS 的 rack, 可以参见 这篇文章, 这篇文章 以及 这篇文章

reduce task 的数量并不是靠输入数据的大小决定的。你可以通过 setReduceTaskNum() 方法手动控制。以后会详细介绍。

当有多个 reduce task 时,如下图所示:

假设有 N 个 reducer task, map tasks 会把输出做分区(partition),而且每个 map task 的输出分为 N 个分区,每个 reducer task 都接收一个分区的数据。在每个 分区 中,可以有多个 key,并且每个 key 对应的所有的 value 都在同一个分区中。

既然有分区的概念,当然就有 分区函数,通常来说默认的分区函数就足够好了,以后再详细说明。

结合函数

由于在 Hadoop 集群中带宽是非常昂贵的资源,所以在节点间传输的数据量越小越好。因此可以通过使用 combiner functionsmap task 的输出做聚合处理来减小需要传输给 reducer task 的数据量。

通常来说,Combiner class 可以就用 Reducer class:

job.setCombiberClass(Reducer.class);

当然也可以不相同,但这个 CombinerClass 也得实现 Reducer 接口。

Combine 前:Reduce 的 input records 和 map output records 一样

Combine 后:Reduce 的 input records 和 map task 的数量一样

Combiner Function 可以减小 网络 IO,但会增加 磁盘 IO。

P.S. 本文相关的代码可以在 这个 Repo 中看到。