当数据集数量超过单机能承受的范围时,就需要使用分布式文件系统了。但和其他分布式系统一样,由于有网络编程模型的介入,其复杂程度变得不可估量。

Hadoop 拥有一套通用的文件系统抽象逻辑,可以接入本地文件系统、Amazon S3、Ceph 等文件系统,但 HDFS 才是 Hadoop 的主要使用的分布式文件系统。

HDFS 设计

HDFS 的架构都写在了 这篇文章里。
这里简单介绍其设计目的:

  • 超大文件
    文件大小可以大到 T, P 级别

  • 流式数据
    设计基于 一次写入,多次读取 的模式,而且假设每次数据读取都会读取这份数据的绝大部分。因此读取整个数据集需要的时间比读取第一个数据记录花费的时间更重要。

  • 商用硬件
    这个意思是 Hadoop 集群假设你的硬件设备很容易坏,因此从软件层面做容错处理。至少是对用户透明的。

但 HDFS 也不是万能的,下面列出其不适用的场景:

  • 对数据的快速获取
    HDFS 系统是为传输和存储超大(量)文件而设计的,因此会牺牲时间性能。可以考虑使用 HBase 来做对低延时的服务。

  • 大量小文件
    由于 namenode 把 HDFS 文件系统的所有元信息都保存在 内存,因此系统中的文件数量受限于 namenode 的内存大小。

  • 单个文件的多写操作
    目前 HDFS 中的文件都是由单个 writerappend-only 的方式写入的。

HDFS 概念

磁盘有 block 的概念,一个 block 是磁盘可以读写的最小单元。文件系统依靠 block 来管理数据。

HDFS 也有 block 的概念,默认每个 block 128 MB。HDFS 中的文件被分割为 128 M 的 blocks,这些 block 单独存储。如果一个文件小于 block size,它不会独占这个 block,只会占它原来的大小。

HDFS 中 block 这么大的原因是:减少查找数据的时间。随着磁盘的 IO 性能提升, blcok size 会随之增加。

抽象出 block 概念的意义:

  • 可以存储大于集群中任意节点磁盘容量的文件;
  • 简化了存储子系统
    • 由于 block 是大小固定的,所以可以容易地知道系统还剩多少空间;
    • 由于 block 不是文件,只是一堆数据,所以文件元数据(比如权限)不必随数据一起存储,而可以用另外一个系统管理。
  • block 的概念很利于制作副本。通常每个 block 会被复制多份(通常是 3 份)到不同的节点上。

使用 hdfs fsck / -files -blocks 命令来检查系统健康程度。

Namenodes 和 Datanodes

Namenode (Master) 管理文件系统树和系统中所有文件夹和文件的元数据。这些数据被以两种形式存在 Namenode 的本地磁盘上:namespace imageedit log

Datanode 存储真正的数据,并向外提供数据,也持续向 Namenode 汇报所持有的 blocks 信息。

由于 Namenode 的重要性(如果 Namenode 所在的节点挂了,那么就无法从 datanodes 中还原数据了),Hadoop 提供了两种机制来保证 Namenode 的高可用:

  • 备份所有的原信息到另一个系统:当 Namenode 向本地磁盘写数据的同时 同步地,原子地 向远程的某个文件系统写。
  • 提供一个 secondary namenode,这个节点会持续把 edit log 合并至 namespace image。这个节点通常也会保持同样一份 namespace image 到这个节点的本地。

Block 缓存

对于热数据,datanode 会存在堆外内存 block cache 中。

HDFS Federation

Namenode 内存是限制 HDFS 集群扩展的原因。
HDFS Federation 允许集群通过增加 Namenode 来扩展。也就是每个 namenode 管理 HDFS 系统中的一部分文件。

HDFS 高可用

当 Namdenode 挂掉后,可以通过 primary namenode 重启,但这个过程要经历:加载 namespace image, 读 metadata, 读配置文件等,会花的时间很长。

Hadoop 2 通过实现 HDFS High Availability(HA):有一对 namenodes 以 active-standy 的配置存在。当主 namenode 挂了后,standby namenode 向外提供 HDFS namenode 服务:

  • active, standby namenode 必须随时共享存储和 edit log;
  • datanodes 必须同时向两个 namenode 同步信息;
  • HDFS 客户端必须要知道这种 崩溃恢复 机制;

Hadoop Filesystems

Hadoop 对文件系统有一个抽象,而 HDFS 只是其中的一个实现。

org.apache.hadoop.fs.FileSystem 代表客户端接口,具体的实现有下面这些:


Hadoop 使用 scheme 的形式来对应正确的文件系统:

hadoop fs -ls file:///

表示 列出本地文件系统数据。

Java 接口

Hadop URL 读数据

java.net.URL 结合 org.apache.hadoop.fs.FsUrlStreamHandlerFactory 可以以数据流的形式读出 HDFS 文件。具体怎么操作参见这个文件

FileSystem API 读数据

HDFS 系统中,用 Hadoop Path 表示一个文件。可以使用 FileSystem.open(new Path())

具体例子可以参见 这个文件

写数据

最简单的写数据的方法是创建一个用 Path 表示的 HDFS 文件 用于被写:

1
2
// create
public FSDataOutputStream create(Path f) throws IOException

另一个选择是:append

1
2
// append
public FSDataOutputStream append(Path f) throws IOException

append 的限制:同一时间只有一个 Writer append 到这个文件,靠 closed 来实现

例子可以参见这个文件

文件夹

1
public boolean mkdirs(Path f) throws IOExcepion

通常来说不用手动创建 文件夹,因为 create 方法会自动创建不存在的文件夹。

文件系统查询

FileStatus 类保存了一个文件(夹)的所有状态信息。

具体可以参见这个 测试文件

HDFS 接口还提供了 通配符 status 可以使用 正则通配符 来过滤或者查询想要的文件。

删除文件

直接使用这个接口

1
public boolean delete(Path f, boolean recursive) throws IOException

数据流

读文件剖析

  1. 客户端调用 FileSystem 对象的 open() 方法(对于 HDFS 而言就是 DistributedFileSystem)来试图打开文件;
  2. DFS 通过 RPC 调用向 Namenode 询问 文件的第一个 block 所在的位置;
    • namenode 会返回这个 block 所有备份所在的 datanode 的地址;
    • datanodes 会根据它们和 client 的位置关系排好序(根据 Hadoop 自身的网络拓扑);
    • 如果 client 所在的节点本身也是一个 datanode,如果这个 datanode 有备份就直接读本地文件;
    • DFS 返回一个 DFSInputStream,它会管理 datanodenamenode 的 IO;
  3. DFSInputStream 保存了文件的第一个 block 所在的位置最近的 datanode 地址,然后连接到这个 datanode,文件以流的形式返回给 DFSInputStream
  4. client 调用 DFSInputStreamread() 方法,数据被返回给 client;反复这个操作直到这个 block 被读完;
  5. DFSInputStream 关闭和 datanode 的连接,然后找下一个 block,反复这个操作;
  6. 当文件所有的 block 都被读完后,关闭 client 到 DFSDataInputStream 的连接。

写文件剖析

  1. 客户端调用 DFScreate() 方法来试图创建一个文件;
  2. DFS 调用一个 RPC 方法试图在 namenode 创建一个文件;
    • namenode 做一些检查确保文件可以被创建(包括权限,是否已经存在等);
    • DFS 给 客户端 返回一个 FSDataOutputStream 用于写入数据;FSDataOutputStream 包含一个 DFSOutputStream 对象,它负责和 namenode 以及 datanode 通信;
  3. 客户端写数据时,DFSOutputStream 把文件分割成多个包,然后把这些包放进一个内部队列: data queue;
    • 有一个 DataStreamer 去消费这个队列,而且这个 DataStreamer 还负责向 namenode 询问应该将数据放在哪些 datanode 上;
  4. 这些 datanodes 组成一个 pipelineDataStreamer 将数据包先流经第一个 datanode 然后依次到最后一个;
  5. DFSOutputStream 还会维护一个 ack queue 用于表明一个 数据包被成功写入 datanodes,只有当一个 pipeline 中的所有 datanodesack 了才会将这个数据包从 ack queue 中摘除;
  6. 当 客户端 完成了写数据,调用 close() 方法关闭数据流;
  7. DFS 通知 namenode 写入已经完成;

一致性模型

HDFS 为了性能牺牲了一些一致性要求。
创建一个新的文件后,在这个 DFS 中,文件时可见的:

1
2
3
Path p = new Path("p");
fs.create(p);
assertThat(fs.exists(p), is(true));

但任何写入这个文件的内容都不被保证是可见的:

1
2
3
4
5
6
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("conten".getBytes("UTF-8"));
out.flush();

assertThat(fs.getFileStatus(p).getLen(), is(0L));

只有当当前 block 的下一个 block 的数据被写入后,当前 block 的数据才能保证可见;

HDFS 提供了一个方法 hflush() 来保证这个事情:

1
2
3
4
5
6
Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("conten".getBytes("UTF-8"));
out.hflush();

assertThat(fs.getFileStatus(p).getLen(), is((long) "content".length()));

在 out 上调用 close()hflush() 功效一样;

然而,hflush() 只能保证数据被写入了所有 datanodes 的内存而不是磁盘上;

为了解决这个问题,HDFS 提供了一个 hsync() 方法来做这个事情;

distcp

hadoop distcp 最常见的使用场景是在两个 HDFS 集群间移动数据:

1
hadoop distcp -update -delete -p hdfs://namenode1/foo hdfs://namenode2/foo

如果两个 HDFS 集群的版本不同,可以这样:

1
hadoop distcp webhdfs://namenode1:50070/foo webhdfs://namenode2:50070/foo

hadoop distcp 命令不仅可以在 HDFS 系统间移动文件,也可以用于在同一个 HDFS 内移动:

1
2
hadoop distcp file1 file2
hadoop distcp dir1 dir2

distcp 是用 MapReduce 方法实现的。每个文件都用一个 mapper 拷贝,没有 reducer。