系统架构

一套可运行的 Flink 环境由 4 个部分组成:

  1. JobManager
  2. ResourceManager
  3. TaskManager
  4. Dispatcher

这 4 个组件都运行在 JVM 中。

Alt text

组件间的交互如上图所示。

各部分的职责是这样的:

  • JobManager
    JobManager 是控制每个应用运行的主进程JobManager接收应用程序用于运行。
    一个应用程序包括一个 JobGraph,一个logical dataflow graph以及一个包含所有需要的类,库和其他资源的 JAR 包。
    JobManagerJobGraph 转换为物理执行计划 ExecutionGraph,这个执行计划中包含被并行执行的任务。
    JobManagerResourceManager 请求运行资源(即 TaskManager slots)来运行任务,一旦收到足够多的 TaskManager Slost,就将任务分发给 TaskManagers 来执行任务。
    在任务运行过程中,JobManager 负责所有需要中央协调的操作,比如 checkpoints 操作。

  • ResourceManager
    Flink 提供了多种(YARN, Mesos, K8S 等)基于不同资源提供者的 ResourceManager 的实现。
    ResourceManager 负责管理 TaskManager Slot —- Flink 的处理资源单位。
    除了负责向 providers 申请资源,ResourceManager 还负责释放空闲的资源。

  • TaskManager
    这是 Flink 的 Worker 进程。
    通常来说,一个 Flink 应用中有多个 TaskManager 在运行着。
    一个 TaskManager 提供固定数量的 slots,slots 的数量限制了一个 TaskManager 可运行的任务的数量。
    TaskManager 启动后向 ResourceManager 注册 slots,然后 ResourceManager 会指导它向 JobManager 提供一个或多个 slots。
    JobManager 随后分配任务给这些 slots 来运行。在运行过程中,不同 TaskManager 之间可以交换数据。

  • Dispatcher
    Dispatcher 是在所有 Job 之上的概念,它提供了一个可以提交应用运行的 REST 接口。
    一旦接收到了一个应用程序,就启动一个 JobManager 并把应用程序交给她开始运行。
    Dispatcher 还提供了一个 Web Dashboard 提供 Job 的运行信息。

任务执行

一个 TaskManager 可以同时运行多个任务。这些任务可以是属于同一个算子(数据并行),也可以属于不同算子(任务并行),甚至属于不同的应用(Job 并行)。

一个 TaskManager 提供固定数量的处理 Slot 用以控制并发的任务数。每个 Slot 负责执行一个任务。

Alt text

关系如上图所示。

JobGraph 中,有 5 个算子。A 和 C 是数据源,E 是 Sink。C 和 E 有两个并行任务,其他都有 4 个。

由于最大的并发度是 4 ,所以至少需要 4 个 slot。

将任务调度到 Slots 的过程还需要考虑数据交换过程中的网络传输问题,以尽量少地网络 IO 为标准。

每个 TaskManager 代表一个 JVM 进程,每个运行的任务代表一个 线程。

由于线程的隔离性没有进程高,所以有可能会一个有问题的 任务 导致整个 TaskManager 进程挂掉以至于其中所有的任务挂掉,所以,一个 TaskManager 最好值运行一个 应用的任务。

高可用

Flink 的高可用依赖于 ZooKeeper。

Flink 使用 ZK 来做 leader 选举、高可用以及数据存储。

在高可用模式下,JobManager 会将 JobGraph 和应用程序所需的所有数据和资源(包括 JAR 包) 写到一个远程存储系统中(比如 HDFS)。另外,JobManager 会写一个 pointer 到 ZK 的数据存储中。在整个应用程序的运行过程中,JobManager 会接收到单个任务的checkpointsstate handles 即存储位置,一旦所有任务成功将状态写入到远程存储中,JobManager 就会将 state handles 写入到远程存储中并且向 ZK 写入 pointer。

Alt text

当一个 JobManager 挂掉后,所有相关的 task 都自动被取消,一个新的 JobManager 会经过如下步骤接手:

  1. 向 ZK 请求 JobGraph 和相关所有资源存储的位置,以及最后一次 checkpoint 的 state handles 的地址。
  2. ResourceManager 请求 slots 用以继续执行应用程序。
  3. 重启应用并重新设置所有 task 的状态到上一次 checkpoint 状态。

数据传输

数据传输由 TaskManager 负责。

数据在发送和接收前会先缓存为一批。这是高吞吐的前提。这也说明了 Flink 的处理模型是基于微批处理的。

每个 TaskManager 都有一个网络 buffer 池,默认大小为 32KB。

  • 假设一个任务要给 n 个其他的任务发送数据,那么这个任务需要 n 个发送缓冲区;
  • 相对应的,如果一个任务需要从 m 个其他任务接收数据,那么这个任务需要 m 个接收缓冲区。

如图所示:

Alt text

要发送给另一个 TaskManager 的缓存会在同一个网络连接上多路复用。

另外,如果数据交换发生在同一个 TaskManager 的不同 slots 之间,sender 任务将数据序列化为 byte buffer 后将缓存放进一个队列中,接收者任务从这个队列中抓出数据并反序列化。因此没有网络 IO。
由于序列化操作非常消耗计算资源,因此,Flink 可以在一定程度下将多个算子当做一个任务。同一个任务中的算子通过给内嵌函数传值来交流,这样可以避免序列化。

高吞吐和低延迟

数据缓冲虽然可以提升带宽利用率,但在流式处理的语境下,会带来一定程度的延迟。

Flink 提供了一个 timeout 来尽量避免缓冲带来的延迟问题: 无论缓冲区有没有填满,只要到 timeout 了,就发送数据。

反压(back pressure)

当一个任务的处理速率小于数据输入的速率时,stream 处理器应该限制应用程序从数据源摄入数据的速率,这种限流的场景可以轻易地被监测到,通常的解决办法是增加计算资源并增加瓶颈算子的并发度。

上述流控制技术被称之为 反压

Flink 的反压描述如图所示:

Alt text
  1. 当接收的 TaskManager 处理不过来太多输入数据时,先将接受到的数据放到缓冲队列。
  2. 到某个时候,接收 TaskManager 的缓冲池耗尽再也接收不了传输来的数据时,发送 TaskManager 开始缓冲要被传输的数据,直到缓冲池被耗尽;
  3. 最后,发送 TaskManager 也不能发送数据了,这时这个任务就被 Block 了,总体来看,这个应用就会被 block.

事件时间处理

Flink 提供了直观易用的常见 事件时间 处理接口,同时也提供了更高级的处理接口。要理解 Flink 的事件时间处理机制有两个概念需要详细介绍:

时间戳

Flink 事件时间应用处理的每份记录都必须有一个时间戳。这个时间戳的具体含义可以自定,只要大致上它的顺序的递增的。

水位

水位线 用于描述每个任务当前的事件时间。
基于时间的算子会根据这个时间来触发相应的计算和处理。

Flink 将水位当作特殊的保存了一个时间戳的数据记录:

Alt text

水位线有两个基本的属性:

  1. 必须单调递增;
  2. 必须和记录时间戳相关。一个标志为 t 的水位表示它之后的所有记录的时间戳都必须 > t。

水位线用于通知任务没有其他更早的事件了,可以触发计算了。但是,如果处理之后又遇到了更早的事件,被称之为 late records。后面会详述。

水位和事件时间

Flink 中的每个任务都有一个内置的时间服务用于维护 timers。一个 timer 在用于在将来某个时间做出某种计算

当一个任务接收到一个水位时:

  1. 根据水位线时间戳更新内部 事件时间 时钟。
  2. 任务时间服务可以找出所有小于这个水位时间的 timer,任务将在这些过期的 timer 上调用一个回调函数用于触发已经发生的事件。
  3. 任务触发这个有更新时间戳的水位。

Flink 通过分区并使用不同的任务来处理分区的方式并行地处理一个数据流。

一个分区就是有时间戳的数据记录和水位的一组数据流。

一个任务会从不同分区接收到数据记录和水位同时也会向其他分区发送记录和分区:

Alt text

一个 Flink DataStream 应用可以通过下面几种方式分配 时间戳 并生成 水位:

  • SourceFunction
  • AssignerWithPeriodicWatermarks
  • AssignerWithPunctuatedWatermarks

状态管理

总的来说,一个任务维护的所有数据和用于计算出这些结果的原始数据都属于这个任务的状态

可以将状态理解为一个任务的局部变量

Alt text

在 Flink 中,状态始终是和某个算子想关联的。为了让 Flink 运行时知道某个算子的状态,算子需要注册它的状态。

有两种状态:

  • Operator State
    作用域在一个任务内:所有在这个人物里处理的数据记录都访问的是这个状态,同时,其他任务的算子不能访问这个状态。

    Alt text
  • Keyed State

    Alt text

    可以将 keyed state 理解为一个 map。某个算子的所有并行的任务的 key。

State Backends

Flink 提供两种:

  • JVM Heap
  • RocksDB

有状态的算子的扩展

当输入数据量变化时,需要动态调整一个算子的并发数。

要挑战一个有状态算子的并发度是比较有挑战的,因为它们的状态需要重新分区并分配给更多或者更少的任务。

Alt text
Alt text
Alt text
Alt text

Checkpoints, Savepoints, State Recovery

作为分布式系统,Flink 同样需要处理失败恢复。

Consistent Checkpoints

一个 Flink 流应用的 consistent checkpoint:

  1. 暂停摄取所有的输入数据;
  2. 等待所有还在被处理的数据已经完全被处理完毕;
  3. 复制每个任务的状态到一个远程的持久化的存储系统,当所有任务都完成这个步骤后,一个 checkpoint 就算完成了;
  4. 恢复摄取数据。
Alt text

从 Consistent Checkpoints 恢复

当系统某个组件挂掉后,Flink 会使用最新的 checkpoint 来恢复应用的状态并重启整个过程。

Alt text

步骤如下:

  1. 重启失败的任务;
  2. 根据最新的 checkpoint 重置应用的所有状态:给每个任务重设状态值;
  3. 重新运行所有任务。

这种机制提供了失败恢复时状态的 exactly-once 一致性。

只有当所有的输入数据流以可重设消费的机制运行(比如 Kafka)时,Flink 才能保证状态恢复的 Exactly-once 一致性。

由于上述的恢复过程需要stop-the-world的过程,所以时延会很高。

Flink 使用的是分布式快照算法: Chandy-Lamport 算法。这个算法不用暂停整个应用,而是将每个任务的 checkpoint 过程解耦,当一些任务在持久化状态的时候其他任务仍然在工作。

这个算法的基础是一种特殊的数据记录:checkpoint barrier。和 watermark 类似,checkpoint barrier 是数据源算子注入到数据流记录中的。

每个 checkpoint barrier 都携带一个 checkpoint ID 来标识这个 barrier 属于哪一个 checkpoint。

下面举例说明这个算法:

这个应用从两个数据源消费递增的数字数据流。输出被分区为奇偶数,每个分区被一个计算接收到数字的和的任务处理。然后输出到各自的 sink。

如下图所示:

Alt text
  1. checkpoint 初始化:JobManager 发送一个有新的 checkpoint ID 的消息给每个数据源任务。如下图所示:

    Alt text
  2. 当数据源任务接收到消息后:

    • 暂停发送数据
    • 触发一次 checkpoint,将在 state backend 中的本地状态上传至远程存储
    • 并且将这次 checkpoint barrier 广播到所有和这个人物相关联的下游分区任务
    • 当 checkpoint 完成时,state backend 会通知任务,随后任务会将 checkpoint 告知 JobManager
    • 当所有的 barrier 都发送完毕后,源任务继续它的任务操作。
      Alt text
  3. 当一个(所有)分区从某一个上游任务中接收到一个新的 checkpoint 时:

    • 它会等待这个 checkpoint 的所有的(来自其他任务) barrier
    • 在等待的同时,它会继续处理来自那些还未发送 barrier 的分区的数据。
    • 而来自那些已经发送过 barrier 的分区的数据只能放在缓冲区域中,不能被立即处理。
      如图所示:
      Alt text
  4. 当所有 barrier 都被收到后:

    • 这个任务在 state backend 中初始化一个相应的 checkpoint
    • 将这个 checkpoint barrier 广播到所有下游任务
      如图所示:
      Alt text
    • 这个任务开始处理缓冲区中的数据,当缓冲中的所有数据都被处理完毕后,任务开始正常处理输入数据。
      如下图所示:
      Alt text
  5. 最终,checkpoint barrier 会到达 sink 任务。当 sink 任务接收到一个 barrier 后:

    • barrier 对齐
    • 将本地状态上传到远程存储系统
    • 通知 JobManager 自己收到了这个 barrier。
  6. 当 JobManager 收到所有任务的这个 checkpoint 的收据后,JobManager 会标识这个 checkpoint 已经全部完成。如图所示:

    Alt text

对上述算法,Flink 有两个优化:

  1. 在任务上传状态也就是 checkpointing 的过程中,这个任务的处理被 blocked 了,这回增加整个应用的时延。
    Flink 的做法是将状态上传的过程和职责交给 state backends:

    • 比如 RocksDB 就支持异步和增量地上传状态:异步表示会减小时延,增量表示会减小状态数据传输量;
  2. barrier 对齐过程中,由于数据缓冲区的存在,时延也会增加。
    Flink 的做法是:可以处理所有到来的数据,而不是等待并将数据放在缓冲区。这样的后果是:当前 checkpoint 可能包含下一个 checkpoint 才有的数据,在失败恢复过程中,就成了 at-least-once 而不是 exactly-once 一致性保证。

Savepoints

状态的一致性快照除了在失败恢复的过程提供支持外,还有更多其他的作用: savepoints

总的来说,savepoints 就是包含了更多附加元信息的 checkpoints,并且其生成算法和 checkpoints 一样。Flink 不会自动生成 savepoints,用户可以手动出发生成。

从一个 savepoint 启动一个应用可以:

  • 从 savepoint 启动一个不同的,但是兼容的应用
  • 启动一个相同的但是有不同并行度的应用,也就是说可以扩展
  • 在不同的集群上启动相同的应用:比如将应用迁移到一个新的 Flink 版本上,将应用迁移到不同的 DC。

(To Be Continued…)