许多 Flink 内置的算子、源处理器、Sink 处理器都是有状态的:会缓存到来的数据记录或者维护部分结果或者元数据。

此外,Flink 的 DataStream API 还暴露了接口以供注册,维护和获取 UDF 中的状态。

Flink 中的方法可以有两种状态:

  • 有键状态
  • 算子状态

有键状态

用户函数可以使用 KeyedState 来存取相关状态。对于一个 key 的每个不同的值,Flink 都会维护一个状态实例。

一个用户函数的所有有键状态实例会被分布到这个函数的所有并行实例上: 每个并行实例负责一定范围内的 key 和其对应的状态实例。

因此,有键状态就像一个分布式的 键值 Map

有键状态只能应用在 KeyedStream 上。

Flink 提供了几种内置的有键状态类型:

  • ValueState[T]
    保存单个类型为 T 的值。
    读:ValueState.value()
    写:ValueState.update(value: T)

  • ListState[T]
    保存一个链表,元素类型为 T
    追加新元素:ListState.add(value: T), ListState.addAll(values: java.util.List[T ])
    读:ListState.get(): Iterable[T]
    更新: ListState.update(values: java.util.List[T])

  • MapState[K, V]
    保存一个 map,提供和 Java 中的 Map 类似的方法:
    get(key: K), put(key: K, value: V), contains(key: K), remove(key: K), keys(), values()
    等。

  • ReducingState[T]
    提供和 ListState[T] 相同的方法(除了 addAllupdate)。
    add(value: T) 会调用 ReduceFunction 作用在新来的 value 和已存在的状态上。
    get() 方法返回的 Iterable 只有一个元素,就是聚合的结果值。

  • AggregatingState[I, O]
    ReducingState[T] 类似,只是调用 AggregateFunction 而不是 ReduceFunction

所有的状态类型都可以使用 State.clear() 清除掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
val sensorData: DataStream[SensorReading]  = ???
// partition and key the stream on the sensor ID
val keyedData: KeyedStream[SensorReading, String] = sensorData
.keyBy(_.id)


// apply a stateful FlatMapFunction on the keyed stream which
// compares the temperature readings and raises alerts.
val alerts: DataStream[(String, Double, Double)] = keyedSensorData
.flatMap(new TemperatureAlertFunction(1.1))

// --------------------------------------------------------------

class TemperatureAlertFunction(val threshold: Double)
extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {

// the state handle object
private var lastTempState: ValueState[Double] = _

override def open(parameters: Configuration): Unit = {
// create state descriptor
val lastTempDescriptor =
new ValueStateDescriptor[Double]("lastTemp", classOf[Double])
// obtain the state handle
lastTempState = getRuntimeContext
.getState[Double](lastTempDescriptor)
}

override def flatMap(
in: SensorReading,
out: Collector[(String, Double, Double)]): Unit = {
// fetch the last temperature from state
val lastTemp = lastTempState.value()
// check if we need to emit an alert
if (lastTemp > 0d && (in.temperature / lastTemp) > threshold) {
// temperature increased by more than the threshold
out.collect((in.id, in.temperature, lastTemp))
}
// update lastTemp state
this.lastTempState.update(in.temperature)
}
}

要创建一个状态对象:

需要通过 Flink 的 runtime RuntimeContext (通过 RichRunction 接口暴露的方法,比如 open 等获取) 注册一个 StateDescriptor

ListCheckpointed 算子状态

算子状态是由一个算子的每个并行实例单独维护的。

算子状态有三种类型: list state, list union state, broadcast state

可以通过实现 ListCheckpointed 接口来实现算子状态,具体来说是两个方法:

  • snapshotState(checkpointId: Long, timestamp: Long): java.util.List[T]
    当需要 checkpoint 时,会调用这个方法;
    返回一个元素为状态对象的 List 作为算子状态。
  • restoreState(state: java.util.List[T]): Unit
    每当函数的状态需要被初始化时被调用:

    • 任务开始
    • 从失败中恢复任务

      state 参数恢复出算子状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class HighTempCounter(val threshold: Double)
extends RichFlatMapFunction[SensorReading, (Int, Long)]
with ListCheckpointed[java.lang.Long] {

// index of the subtask
private lazy val subtaskIdx = getRuntimeContext
.getIndexOfThisSubtask
// local count variable
private var highTempCnt = 0L

override def flatMap(
in: SensorReading,
out: Collector[(Int, Long)]): Unit = {
if (in.temperature > threshold) {
// increment counter if threshold is exceeded
highTempCnt += 1
// emit update with subtask index and counter
out.collect((subtaskIdx, highTempCnt))
}
}

override def restoreState(
state: util.List[java.lang.Long]): Unit = {
highTempCnt = 0
// restore state by adding all longs of the list
for (cnt <- state.asScala) {
highTempCnt += cnt
}
}

override def snapshotState(
chkpntId: Long,
ts: Long): java.util.List[java.lang.Long] = {
// snapshot state as list with a single count
java.util.Collections.singletonList(highTempCnt)
}
}

Connected 广播状态

一种常见的场景:

将相同的信息分布到其他所有并行的节点上。比如一条数据流是「规则」,另一条数据流是「事件」,需要将「规则」应用于事件流中的所有事件。

应用规则的算子需要摄入两条数据流,并且将「规则」存放在 算子状态中,由于需要确保算子的每个并行实例都有完整的规则,所以应该将规则数据流广播。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
val keyedSensorData: KeyedStream[SensorReading, String] = 
sensorData.keyBy(_.id)

// the descriptor of the broadcast state
val broadcastStateDescriptor =
new MapStateDescriptor[String, Double](
"thresholds",
classOf[String],
classOf[Double])

val broadcastThresholds: BroadcastStream[ThresholdUpdate] =
thresholds.broadcast(broadcastStateDescriptor)

// connect keyed sensor stream and broadcasted rules stream
val alerts: DataStream[(String, Double, Double)] = keyedSensorData
.connect(broadcastThresholds)
.process(new UpdatableTempAlertFunction(4.0d))

// --------------------------------------------------------------

class UpdatableTempAlertFunction(val defaultThreshold: Double)
extends KeyedBroadcastProcessFunction[String, SensorReading, ThresholdUpdate, (String, Double, Double)] {

// the descriptor of the broadcast state
private lazy val thresholdStateDescriptor =
new MapStateDescriptor[String, Double](
"thresholds",
classOf[String],
classOf[Double])

// the keyed state handle
private var lastTempState: ValueState[Double] = _

override def open(parameters: Configuration): Unit = {
// create keyed state descriptor
val lastTempDescriptor = new ValueStateDescriptor[Double](
"lastTemp",
classOf[Double])

// obtain the keyed state handle
lastTempState = getRuntimeContext
.getState[Double](lastTempDescriptor)
}

override def processBroadcastElement(
update: ThresholdUpdate,
keyedCtx: KeyedBroadcastProcessFunction[String, SensorReading, ThresholdUpdate, (String, Double, Double)]#KeyedContext,
out: Collector[(String, Double, Double)]): Unit = {
// get broadcasted state handle
val thresholds: MapState[String, Double] = keyedCtx
.getBroadcastState(thresholdStateDescriptor)

if (update.threshold >= 1.0d) {
// configure a new threshold of the sensor
thresholds.put(update.id, update.threshold)
} else {
// remove sensor specific threshold
thresholds.remove(update.id)
}
}

override def processElement(
reading: SensorReading,
keyedReadOnlyCtx: KeyedBroadcastProcessFunction[String, SensorReading, ThresholdUpdate, (String, Double, Double)]#KeyedReadOnlyContext,
out: Collector[(String, Double, Double)]): Unit = {
// get read-only broadcast state
val thresholds: MapState[String, Double] = keyedReadOnlyCtx
.getBroadcastState(thresholdStateDescriptor)

// get threshold for sensor
val sensorThreshold: Double =
if (thresholds.contains(reading.id)) {
thresholds.get(reading.id)
} else {
defaultThreshold
}

// fetch the last temperature from keyed state
val lastTemp = lastTempState.value()

// check if we need to emit an alert
if (lastTemp > 0 &&
(reading.temperature / lastTemp) > sensorThreshold) {
// temperature increased by more than the threshold
out.collect((reading.id, reading.temperature, lastTemp))
}

// update lastTemp state
this.lastTempState.update(reading.temperature)
}
}

有状态应用的稳定性和性能

算子操控状态的方式会从各方面影响应用的性能和稳定性:

  • 状态存储方式
  • checkpoint 算法
  • 应用状态大小

选择状态存储方式

Flink 将流式应用的算子状态维护在一个 state backend 中。

状态后端负责存储每个任务实例的本地状态,并且在 checkpoint 发生的时候将其持久化到 远程存储 中。

InMemoryStateBackendFsStateBackend 将数据存储为普通的对象并存在 TaskManager 的 JVM 的 中:

  • 提供低延时的读写操作;
  • 应用的健壮性遍地:当状态量太大,可能会让 GC 时间变长甚至 OOM。

RocksDBStateBackend 则相反。

Checkpointing

当应用启用了 Checkpionting 机制,JobManager 会以固定频率启用 checkpoints。

checkpoint 的频率决定了 checkpoint 机制的开销:

  • 间隔越短,应用处理的开销越大,但从错误中恢复的时间越短。

更新算子

大多数情况下,应用都有变更的需求。

要保证状态不丢失,Flink 提供了 savepoint 机制

  1. 取运行的应用的 savepoint;
  2. 停止应用;
  3. 从 savepoint 启用新版本的应用。

限制:

新旧版本必须在序列化方面必须兼容,要考虑两个方面:读和写

注意:

为了 Savepoint 的兼容,算子应该总是用 uid 修饰!

1
2
3
4
val alerts: DataStream[(String, Double, Double)] = sensorData
.keyBy(_.id)
// apply stateful FlatMap and set unique ID
.flatMap(new TemperatureAlertFunction(1.1)).uid("alertFunc")

建议

强烈建议使用带有版本的序列化工具:Thrift, PB 等来定义状态变量的类。

防止状态泄露

所有的有状态算子都必须控制状态的大小,并且要保证状态不能无限增长下去。

重点:当状态不需要了的时候,删除掉!

可查询状态

Flink 向外部暴露了一种 可查询状态,外部系统可以查询这种状态。

这种机制由三个系统组成:

  • QueryableStateClient
    由外部系统使用,向 Flink 提交查询并从中获取结果;
  • QueryableStateClientProxy
    接收请求,并提供服务;
    由每个 TaskManager 运行,将请求传送给 QueryableStateServer,并将结果返回给请求方。
  • QueryableStateServer
    响应 ClientProxy 的请求。
    每个 TaskManager 都有一个 StateServer,它从 StateBackend 取回对应 Key 的状态值。

架构图如下:

Alt text

为了启用该功能,需要将 flink-queryable-state-runtime.jar./opt 文件夹拷贝到 ./lib 文件夹。

并且显示地声明:

1
2
3
4
5
6
7
8
9
10
11
override def open(parameters: Configuration): Unit = {

// create state descriptor
val lastTempDescriptor =
new ValueStateDescriptor[Double]("lastTemp", classOf[Double])
// enable queryable state and set its external identifier
lastTempDescriptor.setQueryable("lastTemperature")
// obtain the state handle
lastTempState = getRuntimeContext
.getState[Double](lastTempDescriptor)
}

一种快捷方式是将一整个数据流都保存为可查询状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val tenSecsMaxTemps: DataStream[(String, Double)] = sensorData
// project to sensor id and temperature
.map(r => (r.id, r.temperature))
// compute every 10 seconds the max temperature per sensor
.keyBy(_._1)
.timeWindow(Time.seconds(10))
.max(1)

// store max temperature of the last 10 secs for each sensor
// in a queryable state.
tenSecsMaxTemps
// key by sensor id
.keyBy(_._1)
.asQueryableState("maxTemperature")

从外部应用查询状态

任何 JVM 应用可以使用 QueryableStateClient 来查询:

1
2
3
4
5
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-queryable-state-client-java_2.12</artifactid>
<version>1.7.0</version>
</dependency>
1
2
val client: QueryableStateClient = 
new QueryableStateClient(tmHostname, proxyPort)

一个完整用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
object TemperatureDashboard {

// assume local setup and TM runs on same machine as client
val proxyHost = "127.0.0.1"
val proxyPort = 9069

// jobId of running QueryableStateJob.
// can be looked up in logs of running job or the web UI
val jobId = "d2447b1a5e0d952c372064c886d2220a"

// how many sensors to query
val numSensors = 5
// how often to query the state
val refreshInterval = 10000

def main(args: Array[String]): Unit = {
// configure client with host and port of queryable state proxy
val client = new QueryableStateClient(proxyHost, proxyPort)

val futures = new Array[
CompletableFuture[ValueState[(String, Double)]]](numSensors)
val results = new Array[Double](numSensors)

// print header line of dashboard table
val header =
(for (i <- 0 until numSensors) yield "sensor_" + (i + 1))
.mkString("\t| ")
println(header)

// loop forever
while (true) {
// send out async queries
for (i <- 0 until numSensors) {
futures(i) = queryState("sensor_" + (i + 1), client)
}
// wait for results
for (i <- 0 until numSensors) {
results(i) = futures(i).get().value()._2
}
// print result
val line = results.map(t => f"$t%1.3f").mkString("\t| ")
println(line)

// wait to send out next queries
Thread.sleep(refreshInterval)
}
client.shutdownAndWait()
}

def queryState(
key: String,
client: QueryableStateClient)
: CompletableFuture[ValueState[(String, Double)]] = {

client
.getKvState[String, ValueState[(String, Double)], (String, Double)](
JobID.fromHexString(jobId),
"maxTemperature",
key,
Types.STRING,
new ValueStateDescriptor[(String, Double)](
"", // state name not relevant here
createTypeInformation[(String, Double)]))
}
}

(To Be Continued…)