Flink 没有自己的源系统和数据存储系统,所以都要依赖外部系统。

应用一致性保证

当应用失败时,Flink 可以从 Checkpoint 中恢复所有的状态。

但为了实现数据一致性保证,这远远不够。因此需要在 sourcesink 端也介入 checkpoint 和 恢复机制。

Alt text

为了提供应用的 exactly-once 状态一致性保证,每个 source 都需要有 checkpoint 机制来保存上次读的位置:

  • 提供保存已读 offset 功能的文件系统;
  • Kafka source;

而如果 source 不提供保存已读位置的功能,则 Flink 只能提供 at-most-once 保证。

但是

即使 source 和 Flink 都提供了保存已读功能,已经触发过的状态也有可能重复触发。

为了提供真正的 端对端exactly-once 保证,还需要特殊的 sink connector,有两种技术可以实现这种功能:

  • 幂等写操作
  • 事务写操作

幂等写

一个幂等操作可以被执行多次而不影响结果。
比如,向一个 HashMap 重复插入同一个 KV 对就是幂等操作。 而追加操作就不是。

事务写

只将在上一个 checkpoint 成功之前的结果数据写入外部存储。

Flink 提供了两种机制来实现事务 Sink Connector

  • Write-Ahead-Log(WAL)
    WAL Sink 将所有数据写到 StateBackend 中,一旦接收到 checkpoint 已经完成的通知后,向外部 Sink 触发写入;
    WAL Sink 可以适用于所有的 Sink 系统。
  • 两阶段提交 (Two-Phase-Commit)
    2PC 要求 Sink 系统能够提供 事务 支持。
    对每次 checkpoint,sink 会开启一个 事务,然后将所有接收到的数据追加到事务中:
    • 先写入,但是不提交
    • 等收到 Checkpoint 成功的通知后,再提交,并将写入的结果物化

内置的连接器

Kafka Source

Flink 的 Kafka 连接器并行地从一个 Kafka topic 摄入数据。

源 Operator 的每个并行实例从 Kafka 的分区读取数据: 每个实例从多个或者零个Kafka分区中读取。

  • 每个 源 Operator 会跟踪它的 Kafka 分区的当前已读 Offset将这个 Offset 数据保存在 checkpoint 数据中
  • 当从失败任务中恢复时,已读 Offset 被恢复,然后 源 Operator 从这个 Offset 继续从 Kafka Topic 读数据。

Alt text

1
2
3
4
5
6
7
8
9
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

val stream: DataStream[String] = env.addSource(
new FlinkKafkaConsumer011[String](
"topic",
new SimpleStringSchema(),
properties))

三个参数:

  1. Topic 名称,可以使一个,也可以是多个,当是多个的时候,Flink 会将所有 topic 的数据都混在一起;
  2. DeserializationSchema 或者 KeyedDeserializationSchema
    Kafka 消息会以原始的 Byte 形式保存,所以需要被序列化成 Java 或者 Scala 对象;
  3. Properties 对象。用于 Kafka 客户端的连接配置。

Kafka Sink Connector

1
2
3
4
5
6
7
8
val stream: DataStream[String] = ...

val myProducer = new FlinkKafkaProducer011[String](
"localhost:9092", // broker list
"topic", // target topic
new SimpleStringSchema) // serialization schema

stream.addSink(myProducer)

在一致性上,依赖于 Kafka 的版本:

  • Kafka 0.8.x
    不提供任何保证:数据可能被写入 0 次,1 次或者多次。
  • Kafka 0.9.xKafka 0.10.x
    可以提供 at-least-once 保证,条件如下:
    • checkpoint 机制开启,并且所有的数据源都是可重置的;
    • 当写入不成功时,Sink Connector 会报错,然后应用挂掉并恢复;
    • Sink Connector 会等待 Kafka 的 ACK,知道完成 Checkpoint。
  • Kafka 0.11.x
    提供了 事务写 功能。当 FlinkKafkaProducer011 构造器中提供的 Semantic 参数:
    • Semantic.NONE: 不提供任何保证;
    • Semantic.AT_LEAST_ONCE: 保证数据不丢,但可能会重复写入。这是默认参数;
    • Semantic.EXACTLY_ONCE: 保证只读一次。

文件系统 Source Connector

Flink 可以从文件系统中读取数据流并且实现 可重置 功能。

1
2
3
4
5
6
7
8
val lineReader = new TextInputFormat(null) 

val lineStream: DataStream[String] = env.readFile[String](
lineReader, // The FileInputFormat
"hdfs:///path/to/my/data", // The path to read
FileProcessingMode
.PROCESS_CONTINUOUSLY, // The processing mode
30000L) // The monitoring interval in ms

参数:

  • FileInputFormat
    负责读文件的内容;
  • 文件路径,可以是单个文件,也可以是文件夹;
  • 读文件模式:
    • PROCESS_ONCE:只读这个文件路径一次;
    • PROCESS_CONTINUOUSLY:以固定频率扫描文件路径,如果有新文件,就持续读。
      在这种模式下,常见做法是,将要读的文件持续的写入一个临时文件夹,等文件写完后,将文件转移到真正监控的 Path
  • 上述读模式如果是 PROCESS_CONTINUOUSLY,固定的频率。

FileInputFormat

FileInputFormat 分两步从文件系统读文件:

  1. 扫描对应路径的文件,然后为所有匹配的文件生成 input splits
    每个 input split 都是文件的一部分:包含开始 offset 和长度;
    splits 可以被分布到多个 reader 任务以并行地读取文件内容。
  2. 接收 split,读取文件内容,返回全部结果。

为了实现 Exactly-Once 保证,FileInputFormat 还需要实现 CheckpointableInputFormat 接口,否则,由于每次需要从头重读split文件内容,所以只能提供 At-Least-Once 保证。

文件系统 Sink Connector

Flink 的文件系统 sink connector 被称作 BucketingSink,可以提供端到端的 exactly-once 保证,只要能保证下述条件:

  • source operator 是可重置的;
  • 启用了 Exactly-Once 的 checkpoints;

一个简单的例子:

1
2
3
4
val input: DataStream[String] = …
val fileSink = new BucketingSink[String]("/base/path")

input.addSink(fileSink)

一个 bucket 是由 Bucketer 选择的,Bucketer 会决定数据记录存在哪个文件夹下面:

  • Bucketer 可以由 BucketingSink.setBucketer() 方法设置
  • 默认为 DateTimeBucketer,用于生成基于processing-time的小时级别的 Bucketer
  • 路径为 [base-path]/[bucket-path]/[part-prefix]-[task-no]-[task-file-count]/johndoe/demo/2018-07-22--17/part-4-8
  • bucketer 会生成一个新的 part-file,基于两个条件:
    • 当前文件大小大于BucketingSink.setBucketSize()设置的阈值,默认为 384M;
    • 如果在固定时间内没有新的数据进入Bucketer,由 BucketingSink.setInactiveBucketThreshold() 设置,默认为:1 分钟;

数据通过一个 Writer 被写入 part-file

  • 默认的 writer 是 StringWriter,通过调用数据的 toString() 方法将数据写入文件,并用 换行符 分隔;
  • 可以通过调用 BucketingSink.setWriter() 方法设置其他的 Writer: 比如 SequenceFileWriter

BucketingSink 提供了 exactly-once 保证,方法是将写入数据的过程分解为多个阶段:

  • in-progress
    当一个 sink 向文件写入数据的过程中时,文件为 in-progress 状态;
  • pending
    当超过大小限制,或者活跃时间限制时,状态变为 pending;
  • finished
    当下一个 checkpoint 成功后,变为 finished 状态。

自定义 Source Function

两个接口:

  • SourceFunctionRichSourceFunction 用于定义非并行的 source-connector
  • ParallelSourceFunctionRichParallelSourceFunction 用于定义并行的任务。

两个方法以供覆盖:

  • void run(SourceContext<T> ctx)
    真正接收数据并摄入系统的方法。
    run() 方法只会被调用一次,并在一个无限循环中不停地接收数据。并且在一个单独的线程中运行
    任务可以被显示地取消:如果是有限流。
  • void cancel()
    当应用被取消或者关闭时,cancel() 方法会被调用:以执行 graceful shutdown,当 cancel() 被调用时,另一个线程中的 run() 方法会被立即终止。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class CountSource extends SourceFunction[Long] {
var isRunning: Boolean = true

override def run(ctx: SourceFunction.SourceContext[Long]) = {

var cnt: Long = -1
while (isRunning && cnt < Long.MaxValue) {
cnt += 1
ctx.collect(cnt)
}
}

override def cancel() = isRunning = false
}

可重置的 Source Function

如果想让 source function 支持回放功能,就必须要接入 Flink 的 checkpointing 机制,并且在 checkpoint 成功后必须将当前的已读 offset 持久化。因此,需要实现 CheckpointedFunction 接口。

更重要的是,要保证 SourceFunction.run() 方法不会在 CheckpointedFunction.snapshotState() 方法被调用前提前读 offset 并发送数据:

通过将 run() 方法中的读取 offset 和触发数据的代码放在一把中,这把锁可以通过 ctx.getCheckpointLock 获得:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class ResettableCountSource
extends SourceFunction[Long] with CheckpointedFunction {

var isRunning: Boolean = true
var cnt: Long = _
var offsetState: ListState[Long] = _

override def run(ctx: SourceFunction.SourceContext[Long]) = {
while (isRunning && cnt < Long.MaxValue) {
// synchronize data emission and checkpoints
ctx.getCheckpointLock.synchronized {
cnt += 1
ctx.collect(cnt)
}
}
}

override def cancel() = isRunning = false

override def snapshotState(snapshotCtx: FunctionSnapshotContext): Unit = {
// remove previous cnt
offsetState.clear()
// add current cnt
offsetState.add(cnt)
}

override def initializeState(
initCtx: FunctionInitializationContext): Unit = {

val desc = new ListStateDescriptor[Long]("offset", classOf[Long])
offsetState = initCtx.getOperatorStateStore.getListState(desc)
// initialize cnt variable
val it = offsetState.get()
cnt = if (null == it || !it.iterator().hasNext) {
-1L
} else {
it.iterator().next()
}
}
}

Source Functions, Timestamps, Watermarks

Source Function 通过 SourceContext 来分配时间戳并触发水位

  • def collectWithTimestamp(T record, long timestamp): Unit
  • def emitWatermark(Watermark watermark): Unit

两个问题:

  1. 数据源如果有分区的概念(比如 Kafka 的 topic 分区),而 Source Function 的并行度小于分区数量,则会发生一个 Source Function 实例处理多个分区的情况:这会造成分区间数据的事件发生时间乱序的情况。
    Flink 通过取所有分区的最小 Watermark 作为整个数据源的水位 来解决这个问题。
  2. 如果某个 Source Function 的实例暂时闲置,会造成这个实例不触发数据的情况。而这会造成整个应用停止更新 Watermark 从而拖住整个应用。
    Flink 提供了一个 SourceContext.markAsTemporarilyIdle() 方法来将一个没有接收到数据的实例置为闲置状态,从而使 Flink 的水位线机制忽略这个实例。

自定义 Sink Function

通过实现 SinkFunction 或者 RichSinkFunction 接口来实现自定义 Sink Function:

  • void invoke(IN value, Context ctx)
    ctx 提供了对 processing time, event time, 水位 等元信息的获取功能。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
val readings: DataStream[SensorReading] = ???

// write the sensor readings to a socket
readings.addSink(new SimpleSocketSink("localhost", 9191))
// set parallelism to 1 because only one thread can write to a socket
.setParallelism(1)

// -----

class SimpleSocketSink(val host: String, val port: Int)
extends RichSinkFunction[SensorReading] {

var socket: Socket = _
var writer: PrintStream = _

override def open(config: Configuration): Unit = {
// open socket and writer
socket = new Socket(InetAddress.getByName(host), port)
writer = new PrintStream(socket.getOutputStream)
}

override def invoke(
value: SensorReading,
ctx: SinkFunction.Context[_]): Unit = {
// write sensor reading to socket
writer.println(value.toString)
writer.flush()
}

override def close(): Unit = {
// close writer and socket
writer.close()
socket.close()
}
}

为了提供端对端 exactly-once 的一致性保证,SinkFunction 应该要么实现幂等或者事务的 sink connector

幂等 Sink Connector

在大多数情况下,只要满足下面两个条件,SinkFunction 接口就足够实现幂等 sink connector:

  1. 结果数据需要一个确定的(组合)Key,幂等的更新操作可以在这个 Key 上进行;
  2. 外部系统必须能支持在一个 Key 上的更新操作,比如一个关系型数据库或者一个 Key-Value 存储系统。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
val readings: DataStream[SensorReading] = ???

// write the sensor readings to a Derby table
readings.addSink(new DerbyUpsertSink)

// -----

class DerbyUpsertSink extends RichSinkFunction[SensorReading] {
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _

override def open(parameters: Configuration): Unit = {
// connect to embedded in-memory Derby
conn = DriverManager.getConnection(
"jdbc:derby:memory:flinkExample",
new Properties())
// prepare insert and update statements
insertStmt = conn.prepareStatement(
"INSERT INTO Temperatures (sensor, temp) VALUES (?, ?)")
updateStmt = conn.prepareStatement(
"UPDATE Temperatures SET temp = ? WHERE sensor = ?")
}

override def invoke(r: SensorReading, context: Context[_]): Unit = {
// set parameters for update statement and execute it
updateStmt.setDouble(1, r.temperature)
updateStmt.setString(2, r.id)
updateStmt.execute()
// execute insert statement if update statement did not update any row
if (updateStmt.getUpdateCount == 0) {
// set parameters for insert statement
insertStmt.setString(1, r.id)
insertStmt.setDouble(2, r.temperature)
// execute insert statement
insertStmt.execute()
}
}

override def close(): Unit = {
insertStmt.close()
updateStmt.close()
conn.close()
}
}

上述代码实现了一个简单的幂等 UPSERT 操作。

事务 Sink Connector

Flink 提供了两个模板来用于实现自定义的事务 Sink 算子,两者都实现了 CheckpointListener 接口–用于接收来自 JobManager 的完成了的 checkpoints:

  • GenericWriteAheadSink

    1. 收集每个 checkpoint 的所有数据,并保存在 sink 任务的算子状态中。
    2. 这些状态也会被 Checkpointed。
    3. 当任务接收到 checkpoint 完成 的通知后,将这个 checkpoint 对应的所有数据都写入到外部系统。

      只能做到 at-least-once 的一致性保证:只要上述第二步的状态在应用失败前还没有被正确 Checkpointed,就有可能会重复 Commit.

  • TwoPhaseCommitSinkFunction
    这个接口会利用外部系统自身的事务

    1. 对每个 checkpoint,开启一个新的事务,并在这个事务的上下文下将所有的数据都传给外部系统;
    2. 当接收到 checkpoint 完成 的通知时,将事务提交了。

      通常来说,在分布式系统中使用两阶段提交是一个非常昂贵的操作。
      但是在 Flink 的语境中,每个 checkpoint 只会运行一次这个协议,因此带来的开销并不大。
      2PC方法的运作方式和 WAL sink 相似,但是它不会将数据存储在 Flink 应用状态中,而是将数据写在外部系统的事务中。

异步接入外部系统

一个常见的需求是:

对数据流中每个数据而言,需要去外部存储系统中查询相关的其他信息,补充到数据中,然后形成一条新的数据流。

通常可以这样做:

实现一个 MapFunction, 为每个数据查询外部存储系统中的相关信息,等待返回结果,然后形成新的数据流。

这么做很简单直接,但是会有每个数据的延时问题。

Flink 提供了一个 AsyncFunction 来减轻外部 IO 调用的延时。前提是外部系统能支持异步调用。

AsyncFunction 的接口如下:

1
2
3
trait AsyncFunction[IN, OUT] extends Function {
def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit
}

asyncInvoke 方法会在每个数据调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
val readings: DataStream[SensorReading] = ???

val sensorLocations: DataStream[(String, String)] = AsyncDataStream
// orderedWait 可以保证输出的数据和输入的顺序是一样的
// unorderedWait 只能保证 水位 和 checkpoints 是对齐的的。
.orderedWait(
readings,
new DerbyAsyncFunction,
5, TimeUnit.SECONDS, // timeout requests after 5 seconds
100)
// at most 100 concurrent requests

class DerbyAsyncFunction
extends AsyncFunction[SensorReading, (String, String)] {

// caching execution context used to handle the query threads
private lazy val cachingPoolExecCtx =
ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
// direct execution context to forward result future to callback object
private lazy val directExecCtx =
ExecutionContext.fromExecutor(
org.apache.flink.runtime.concurrent.Executors.directExecutor())

/**
* Executes JDBC query in a thread and handles the resulting Future
* with an asynchronous callback.
*/
override def asyncInvoke(
reading: SensorReading,
resultFuture: ResultFuture[(String, String)]): Unit = {

val sensor = reading.id
// get room from Derby table as Future
val room: Future[String] = Future {
// Creating a new connection and statement for each record.
// Note: This is NOT best practice!
// Connections and prepared statements should be cached.
val conn = DriverManager
.getConnection(
"jdbc:derby:memory:flinkExample",
new Properties())
val query = conn.createStatement()

// submit query and wait for result. this is a synchronous call.
val result = query.executeQuery(
s"SELECT room FROM SensorLocations WHERE sensor = '$sensor'")

// get room if there is one
val room = if (result.next()) {
result.getString(1)
} else {
"UNKNOWN ROOM"
}

// close resultset, statement, and connection
result.close()
query.close()
conn.close()
// return room
room
}(cachingPoolExecCtx)

// apply result handling callback on the room future
room.onComplete {
case Success(r) => resultFuture.complete(Seq((sensor, r)))
case Failure(e) => resultFuture.completeExceptionally(e)
}(directExecCtx)
}
}

可以看到,应用 AsyncFunciton 的异步算子是被 AsyncDataStream 调用的。

不要这样做:

  • 不要发送一个会阻塞 asyncInvoke() 方法的请求
  • 不要在 asyncInvoke() 方法中等待请求的完成。

(To be continued…)