设置处理时间

在流处理系统中,时间是一个非常重要的概念。Flink 可以设置下面三种事件时间。

  • ProcessingTime
    ProcessingTime 意味着算子使用数据被处理时所在机器的系统时间
    • 优点:延时低,因为算子不用等待按顺序来的数据
    • 缺点:结果不是非确定的。因为每个窗口的数据内容取决于数据到达的速度。
  • EventTime
    EventTime 指的是算子使用数据本身包含的时间信息作为当前时间。每个数据都携带一个时间戳和水位线。当一个水位通知说在给定的时间范围内所有数据已经到达时,event-time 窗口就会触发计算。
    EventTime 窗口计算出的结果是确定的:结果和数据到达的顺序无关
  • IngestionTime
    IngestionTime 是数据进入流处理器的时间。可以将 IngestionTime 理解为 source operatorProcessingTime

设置时间属性如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
object AverageSensorReadings {

// main() defines and executes the DataStream program
def main(args: Array[String]) {

// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// ingest sensor stream
val sensorData: DataStream[SensorReading] = env.addSource(...)
}
}

Event-Time 应用中的时间戳和水位

在基于 Event-Time 的流处理应用中,每个数据有两个必需的信息:

  • 时间戳
    事件发生的时间
  • 水位
    算子通过水位推断当前的事件时间。
    水位用于通知算子没有比水位更小的时间戳的事件会发生了。

可以通过覆盖 SourceFunction 或者 UDF 来生成并分配时间戳和水位。

DataStream API 提同乐一个 TimestampAssigner 接口来从数据中提取时间戳。

一种最佳实践是:将分配时间戳和生成水位的操作离数据源越近越好,甚至就在 SourceFunction 内部更好

TimestampAssigner 和其他 transformation 算子类似: 作用在一条数据流中的所有数据,并生成一条新的带有时间戳的数据和水位的数据流。

1
2
3
4
5
6
7
8
9
10
val env = StreamExecutionEnvironment.getExecutionEnvironment

// set the event time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// ingest sensor stream
val readings: DataStream[SensorReading] = env
.addSource(new SensorSource)
// assign timestamps and generate watermarks
.assignTimestampsAndWatermarks(new MyAssigner())

上面的例子中,MyAssigner 可以继承自 AssignerWithPeriodicWatermarks 或者 AssingerWithPuctuatedWatermarks。区别如下:

AssignerWithPeriodicWatermarks

系统会以一个固定的时间值定期检查 event time 的进展。默认的时间是 200 ms。可以通过 env.getConfig.setAutoWatermarkInterval(interval) 来设置。

1
2
3
4
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// generate watermarks every 5 seconds
env.getConfig.setAutoWatermarkInterval(5000)

在上面的例子中,系统会每隔 5 秒钟检查当前的水位:

  1. Flink 先调用 getCurrentWatermark() 方法;
  2. 如果上述方法返回一个时间戳更大的水位,则将新的水位发送。该步骤可以保证时间事件是持续增长的;
  3. 反之则不发送。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class PeriodicAssigner 
extends AssignerWithPeriodicWatermarks[SensorReading] {

val bound: Long = 60 * 1000 // 1 min in ms
var maxTs: Long = Long.MinValue // the maximum observed timestamp

override def getCurrentWatermark: Watermark = {
// generated watermark with 1 min tolerance
new Watermark(maxTs - bound)
}

override def extractTimestamp(
r: SensorReading,
previousTS: Long): Long = {
// update maximum timestamp
maxTs = maxTs.max(r.timestamp)
// return record timestamp
r.timestamp
}
}

当事件的发生时间是单调递增的时候,可以在 DataStream 上调用 assignAscendingTimestamps 方法,该方法使用当前的时间戳生成水位: 因为不会出现更早的时间戳。

1
2
3
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream
.assignAscendingTimestamps(e => e.getCreationTime)

AssignerWithPunctuatedWatermarks

当水位可以基于数据的某些属性来决定时,Flink 提供了 AssignerWithPunctuatedWatermarks 接口,该接口包含 checkAndGetNextWatermark 方法,这个方法会在每次 extractTimestamp() 方法被调用后调用,它可以决定是否生成一个新的水位。另外,和上面一样,如果该方法返回了一个时间戳更大的水位时,发送一个新的水位。

水位,延时,准确性

水位是一种可以权衡一个流处理应用延时结果准确性的机制:

  • 松水位
    如果水位时间戳远远小于最新事件时间戳,应用的延时会变得比较大,但是准确性会提升。

  • 紧水位
    如果水位时间戳和最新事件时间很接近,应用的延时会比较小,但是准确性会下降。

延时-准确性的权衡是所有流处理系统的一个基本特性。

Process Function

DataStream API 提供了一系列的底层可操控时间戳水位ProcessFunction接口。这些接口还可以向系统注册 timer 用于在将来某些特定时间触发事件。另外,process functions 还可以用于将输出数据发送到多个输出。

ProcessFunction 接口适用于构建自定义逻辑的事件驱动的应用。对于预定义好的时间窗口的操作可能不太适用。

ProcessFunction 可以作用于 DataStreamKeyedStream 上,这个方法在 Stream 里的每个数据上调用。

ProcessFunction 提供了两个通用的方法:

  • processElement(v:IN, ctx: Context, out:Collector[OUT])
    作用在数据流上的每个数据;
    输出数据被发送到 Collector;
    Context 提供了操控当前数据时间戳TimeService的功能;Context 还可以向多个输出流发送数据。

  • onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])
    该方法会在指定的已注册的 timer 触发时被调用;
    timestamp 参数是指定 timer 触发时的时间戳;
    Collector 用于发送结果数据;
    OnTimerContext 提供类似 processElement 中 ctx 相同的功能。

TimerService 和 Timers

ContextOnTimerContext 中的 TimerService 提供了下述方法:

  • currentProcessingTime(): Long
  • currentWatermark(): Long
  • registerProcessingTimeTimer(timestamp: Long): Unit
    注册一个作用于当前 key 的 processing time timer
    这个 timer 会在机器的执行时间和 timestamp 参数匹配时触发执行上述 onTimer() 方法;
  • registerEventTimeTimer(timestamp: Long): Unit功能一样,只是换做了Event Time`
  • deleteProcessingTimeTimer(timestamp: Long): Unit
    删除一个 processing timer,如果没有已注册的,则无效。
  • deleteEventTimeTimer(timestamp: Long): Unit

timer 触发时,onTimer() 会被调用,而且 onTimer()processElement 是被同步(synchronized) 调用的。

timers 只能被用于 keyedStream。

timers 的常用场景是:

  • 在经过某段时间的不活跃后清除一个 key 对应的状态;
  • 或者实现自定义的基于时间的窗口逻辑。

可以在每个数据上创建一个不重要的 key 来将一个普通 Stream 变成 KeyedStream。

触发到边输出

边输出(Side outputs) 是一种可以通过函数将一个输入流的数据处理后触发到多个输出流的机制:

  • 除了主输出流外的边输出流的数量没有限制
  • 每个单独的边输出都用 OutputTag[X] 标记;
  • 一个 ProcessFunction 可以通过 Context将一个数据记录触发到一个或多个边输出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
val monitoredReadings: DataStream[SensorReading] = readings
// monitor stream for readings with freezing temperatures
.process(new FreezingMonitor)

// retrieve and print the freezing alarms
monitoredReadings
.getSideOutput(new OutputTag[String]("freezing-alarms"))
.print()

// print the main output
readings.print()

// =================== //

/** Emits freezing alarms to a side output for readings
* with a temperature below 32F. */

class FreezingMonitor extends ProcessFunction[SensorReading, SensorReading] {

// define a side output tag
lazy val freezingAlarmOutput: OutputTag[String] =
new OutputTag[String]("freezing-alarms")

override def processElement(
r: SensorReading,
ctx: ProcessFunction[SensorReading, SensorReading]#Context,
out: Collector[SensorReading]): Unit = {
// emit freezing alarm if temperature is below 32F.
if (r.temperature < 32.0) {
ctx.output(freezingAlarmOutput, s"Freezing Alarm for ${r.id}")
}
// forward all readings to the regular output
out.collect(r)
}
}

CoProcessFunction

对于两个输入流的操作,DataStream API 提供了 CoProcessFunction

对于每个输入流,CoProcessFunction 都提供了一个转换函数:

  • processElement1()
  • processElement2()

同样的,CoProcessFunction 还提供了 TimerService 等相关接口。

窗口算子

在流处理系统中,窗口是很常见的概念。窗口算子可以在无限流数据中划分出一个有限范围,然后做各种处理。通常情况下,这个有限范围都是基于时间的。

定义窗口算子

窗口算子可以作用在

  • KeyedStream
    并行计算
  • non-keyed Stream
    单线程上计算

要创建一个窗口算子,需要两个组件:

  1. Window Assigner
    决定了数据流中的数据是如何被划分到窗口中的,并且返回一个(keyed) WindowedStream 或者 (non-keyed) AllWindowedStream
  2. Window Function
    作用于 WindowedStream 或者 AllWindowedStream上,然后处理窗口中的元素。

如下面代码所示:

1
2
3
4
5
6
7
8
9
10
// define a keyed window operator
stream
.keyBy(...)
.window(...) // specify the window assigner
.reduce/aggregate/process(...) // specify the window function

// define a non-keyed window-all operator
stream
.windowAll(...) // specify the window assigner
.reduce/aggregate/process(...) // specify the window function

内置的 Window Assigners

基于时间的窗口会根据事件时间(event time 或者 processing time)将一个数据分配给某个窗口。每个时间窗口都有一个开始时间戳结束时间戳

所有内置的窗口分配器都会提供一个默认的触发器,一旦时间超过某个窗口的结束时间,触发器就会触发对这个窗口的计算。

Tumbling Windows

TumblingWindow 会将数据放在不重叠,大小固定的窗口中:

Alt text

内置了两个分配器:TumblingEventTimeWindowsTumblingProcessingTimeWindows

1
2
3
4
5
6
7
val sensorData: DataStream[SensorReading] = ...

val avgTemp = sensorData
.keyBy(_.id)
// group readings in 1s event-time windows
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.process(new TemperatureAverager)

一种简写方式是:

1
2
3
4
5
6
val avgTemp = sensorData
.keyBy(_.id)
// shortcut for window.(TumblingEventTimeWindows.of(size))
// timeWindow 方法会根据 timeCharacter 来决定是 EventTimeWindows 还是 ProcessingTimeWindows
.timeWindow(Time.seconds(1))
.process(new TemperatureAverager)

默认情况下,tumbling windows 和纪元时间(1970-01-01-00:00:00.000)对齐,比如,一个大小为一小时的窗口的开始和结束时间为: 00:00:00, 01:0:00, 02:00:00 等。

所以,分配器还提供了第二个参数,可以控制偏移量:

1
2
3
4
5
6
val avgTemp = sensorData
.keyBy(_.id)
// group readings in 1 hour windows with 15 min offset
// 00:15:00, 01:15:00, 02:15:00 ...
.window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
.process(new TemperatureAverager

Sliding Windows

SlidingWindow 会将数据分配到长度固定,但可能会重叠的窗口中:

Alt text

对于一个滑动窗口,需要声明两个参数: 窗口大小新窗口生成频率

1
2
3
4
5
6
// event-time sliding windows assigner
val slidingAvgTemp = sensorData
.keyBy(_.id)
// create 1h event-time windows every 15 minutes
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
.process(new TemperatureAverager)

有两种情况:

  • 当窗口生成频率小于窗口大小时,窗口会重叠,数据可能会被分配给两个窗口;
  • 当窗口生成频率大于窗口大小时,有些数据可能会得不到分配。

也可以传入第三个参数:偏移量

同样的,也可以用快捷方法:timeWindow(windowSize, slideSize) 来表示滑动窗口。

Session Windows

Session Window 将数据分配到 不覆盖长度不固定的窗口中。

Session Window 的边界由不活跃间隔定义:例如,没收到任何数据的时间长度

Alt text
1
2
3
4
5
6
// event-time session windows assigner
val sessionWindows = sensorData
.keyBy(_.id)
// create event-time session windows with a 15 min gap
.window(EventTimeSessionWindows.withGap(Time.minutes(15)))
.process(...)

SessionWindows 会先将每个输入数据分配到单独的窗口,以数据的时间为窗口开始时间,并且以 session gap 为窗口大小,最后会将所有窗口合并。

将函数作用在窗口上

窗口上的函数有两种:

  • 增量聚合函数
    每当有输入数据加入窗口时,函数就作用在其上,并更新一个单独的窗口状态值。
    空间效率非常高,最终会出发一个聚合后的结果。
    ReduceFunctionAggregateFunction 是常见的增量聚合函数。

  • 全窗口函数
    会先收集窗口中的所有数据,然后遍历计算。
    需要更多空间。但是适用于更复杂的逻辑。
    ProcessWindowFunction 是全窗口函数。

ReduceFunction

ReduceFunction 接收两个类型相同的数据,并将其结合输出为一个类型相同的值:单个窗口状态。

当有新数据加入窗口时:该函数被调用,并传入新数据和窗口状态值作为参数。

优点

  • 窗口状态恒定而且数据量小;
  • 接口简单;

缺点

  • 输入和输出结果类型都是相同的。因此通常被限制为简单的聚合方法。
1
2
3
4
5
val minTempPerWindow: DataStream[(String, Double)] = sensorData
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(15))
.reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))

AggregateFunction

增量聚合,窗口状态单一,但是接口更灵活也更复杂:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {

// create a new accumulator to start a new aggregate.
ACC createAccumulator();

// add an input element to the accumulator and return the accumulator.
ACC add(IN value, ACC accumulator);

// merge two accumulators and return the result.
ACC merge(ACC a, ACC b);

// compute the result from the accumulator and return it.
OUT getResult(ACC accumulator);

}

可以看到,输出,输入,中间状态都可以是不同类型

ProcessWindowFunction

有些场景是增量聚合函数很难实现的,比如:取中位数,词频等。

ProcessWindowFunction 可以先收集窗口中所有数据,然后在作用其上计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> 
extends AbstractRichFunction {

// Evaluates the window.
void process(KEY key, Context ctx, Iterable<IN> vals, Collector<OUT> out) throws Exception;

// Deletes any custom per-window state when the window is purged.
public void clear(Context ctx) throws Exception {}

// The context holding window metadata.
public abstract class Context implements Serializable {

// Returns the metadata of the window
public abstract W window();

// Returns the current processing time.
public abstract long currentProcessingTime();

// Returns the current event-time watermark.
public abstract long currentWatermark();

// State accessor for per-window state.
public abstract KeyedStateStore windowState();

// State accessor for per-key global state.
public abstract KeyedStateStore globalState();

// Emits a record to the side output identified by the OutputTag.
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}

值得注意的是,和其他 ProcessFunction 一样,该方法也提供 Context 参数,这个参数除了普通的方法外,还提供窗口的一些原信息,比如:开始时间,结束时间,以及每个窗口的状态 windowState()每个 key 的总状态 globalState()

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// output the lowest and highest temperature reading every 5 seconds
val minMaxTempPerWindow: DataStream[MinMaxTemp] = sensorData
.keyBy(_.id)
.timeWindow(Time.seconds(5))
.process(new HighAndLowTempProcessFunction)

// ========= //

case class MinMaxTemp(id: String, min: Double, max:Double, endTs: Long)

/**
* A ProcessWindowFunction that computes the lowest and highest temperature
* reading per window and emits a them together with the
* end timestamp of the window.
*/
class HighAndLowTempProcessFunction
extends ProcessWindowFunction[SensorReading, MinMaxTemp, String, TimeWindow] {

override def process(
key: String,
ctx: Context,
vals: Iterable[SensorReading],
out: Collector[MinMaxTemp]): Unit = {

val temps = vals.map(_.temperature)
val windowEnd = ctx.window.getEnd

out.collect(MinMaxTemp(key, temps.min, temps.max, windowEnd))
}
}

增量聚合 和 ProcessWindowFunction

有这样的场景:窗口函数可以被表达为增量聚合函数,但同时也想获取窗口的元信息

在这种场景下,可以将两者结合:

  • 当数据进入窗口后,立即被增量聚合函数处理;
  • 当触发器触发时,聚合结果被交给 ProcessWindowFunction;
  • ProcessWindowFunction.process 方法的 vals: Iterable 参数只有一个值:上述的聚合结果。

在 DataStream API 中,调用方法为:

1
2
3
4
5
6
input
.keyBy(...)
.timeWindow(...)
// 或者 .aggregate
.reduce(incrAggregator: ReduceFunction[IN],
function: ProcessWindowFunction[IN, OUT, K, W])

上面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
val minMaxTempPerWindow2: DataStream[MinMaxTemp] = sensorData
.map(r => (r.id, r.temperature, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
.reduce(
// incrementally compute min and max temperature
(r1: (String, Double, Double), r2: (String, Double, Double)) => {
(r1._1, r1._2.min(r2._2), r1._3.max(r2._3))
},
// finalize result in ProcessWindowFunction
new AssignWindowEndProcessFunction()
)

// ========= //

case class MinMaxTemp(id: String, min: Double, max:Double, endTs: Long)

class AssignWindowEndProcessFunction
extends ProcessWindowFunction[(String, Double, Double), MinMaxTemp, String, TimeWindow] {

override def process(
key: String,
ctx: Context,
minMaxIt: Iterable[(String, Double, Double)],
out: Collector[MinMaxTemp]): Unit = {

val minMax = minMaxIt.head
val windowEnd = ctx.window.getEnd
out.collect(MinMaxTemp(key, minMax._2, minMax._3, windowEnd))
}
}

自定义窗口算子

自定义窗口算子可以解决一些内置窗口无法解决的问题:

  • 在窗口结束前发送结果;
  • 当有迟到的数据进入窗口后,更新相应的状态;
  • 当遇到特殊的数据条目时,开始或者结束一个窗口。

通过定义 assingers, triggers, evictorswindow functions 可以定义一个窗口算子并处理其中的元素:

  1. 当有数据到达窗口算子,被 WindowAssigner 处理。这个方法决定了数据被分配到哪个窗口,如果这个窗口不存在,那么就会被新创建;
  2. 如果窗口函数增量聚合函数,那么新数据会被立即聚合,计算结果被存储为窗口的内容;反之,如果是全窗口函数,新元素会被追加到ListState用于保存窗口所有的函数。
  3. 当数据被分配给一个窗口时,同时也被传给窗口的Trigger
    触发器定义了一个窗口何时被认为准备好评估以及何时被关闭清除内容。
    触发器的定义可以基于窗口中元素的数量注册的timer
    当触发器触发时,其具体行为由注册的窗口函数决定:

    • 如果是增量聚合函数,当前的聚合结果立即被触发。
      Alt text
    • 如果是全窗口函数
      Alt text
    • 如果是混合型:
      Alt text
  4. Evictor 是一个可选项,用于清除一个窗口中所有的元素,由于需要遍历,所有只能作用在全窗口函数上。

下例描述了如何定义:

1
2
3
4
5
6
7
8
9
stream
.keyBy(...)
.window(...) // specify the window assigner
// optional: specify the trigger
// window 分配器中默认都有一个 trigger
// 如果显式地声明了 trigger,则会覆盖默认的
[.trigger(...)]
[.evictor(...)] // optional: specify the evictor
.reduce/aggregate/process(...) // specify the window function

窗口的生命周期

  1. 创建

    WindowAssigner 分配第一个元素给窗口时,被创建。因此,不存在一个元素都不存在的窗口。一个窗口包含不同的状态

    • 窗口内容
      窗口内容包含了被分配的所有元素,或者聚合结果(如果是增量聚合函数)。
    • 窗口对象
      窗口对象保存了可以分辨不同窗口的信息。每个窗口对象都有一个结束时间戳用于定义窗口可以被删除的时间。
    • 触发器的 timer
      一个 Trigger 可以注册定时器用于在特定时间被回调:评估窗口或者清除窗口内容。
      由窗口算子维护。
    • 触发器中自定义的状态
      可以自定义每个窗口中的状态或者每个key对应的状态。
      触发器维护。
  2. 删除

    当时间到达窗口的结束时间时,窗口被删除。
    当窗口被删除后,窗口算子会清楚这个窗口的窗口内容,并丢弃窗口对象。
    触发器状态和定时器不会被清除,因为它们是由窗口算子维护。
    所以,需要调用 Trigger.clear() 来清除状态以防止状态泄露。

窗口分配器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/** A custom window that groups events into 30 second tumbling windows. */
class ThirtySecondsWindows
extends WindowAssigner[Object, TimeWindow] {

val windowSize: Long = 30 * 1000L

override def assignWindows(
o: Object,
ts: Long,
ctx: WindowAssigner.WindowAssignerContext): java.util.List[TimeWindow] = {

// rounding down by 30 seconds
val startTime = ts - (ts % windowSize)
val endTime = startTime + windowSize
// emitting the corresponding time window
Collections.singletonList(new TimeWindow(startTime, endTime))
}

override def getDefaultTrigger(
env: environment.StreamExecutionEnvironment): Trigger[Object, TimeWindow] = {
EventTimeTrigger.create()
}

override def getWindowSerializer(
executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] = {
new TimeWindow.Serializer
}

override def isEventTime = true
}

Triggers

触发器可以基于数据特性或者时间特性来决定计算一个窗口。

触发器可以接触到窗口的元数据,包括时间,定时器,状态等。

触发器可以做到如下述场景:

  • 当窗口接收到固定数量的数据;
  • 某个特定的数据到达窗口;
  • 在当前时间未达到水位时触发计算返回结果(可以降低时延);

等等。

没当trigger被调用时,会返回一个 TriggerResult,用于决定窗口的动作。

  • CONTINUE: 不做任何操作;
  • FIRE: 如果窗口函数是 增量聚合函数,那么这个函数被调用,结果被触发;窗口状态不变;
  • PURGE: 清除窗口内容和状态;
  • FIRE_AND_PURGE: 先 FIREPURGE

Evictors

Evictors 可以在窗口函数被计算后或者计算前清除其中的元素。

接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public interface Evictor<T, W extends Window> extends Serializable {

// Optionally evicts elements. Called before windowing function.
void evictBefore(
Iterable<TimestampedValue<T>> elements,
int size,
W window,
EvictorContext evictorContext);

// Optionally evicts elements. Called after windowing function.
void evictAfter(
Iterable<TimestampedValue<T>> elements,
int size,
W window,
EvictorContext evictorContext);

// A context object that is given to Evictor methods.
interface EvictorContext {

// Returns the current processing time.
long getCurrentProcessingTime();

// Returns the current event time watermark.
long getCurrentWatermark();
}

根据时间 Join 数据流

常见场景:Join 两个数据流中的数据

Flink 内置了两个处理这种场景的算子。

Interval Join

Window Join

处理迟到的数据

Flink 基于水位来处理 event time 的进程。正如上面讨论的,总会有迟到的事件(事件时间小于水位)在事后才到达。

有以下几种处理方式:

丢弃迟到的数据

最简单的办法。

也是 EventTime 窗口的默认方法。

ProcessFunction 可以通过对比事件时间和当前水位决定是否丢弃。

重定向迟到的数据

通过 SideOutput 将迟到的数据重定向到另一个数据流,然后再取出来做业务上的处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// define a side output tag
val lateReadingsOutput: OutputTag[SensorReading] =
new OutputTag[SensorReading]("late-readings")

// =================== //

val readings: DataStream[SensorReading] = ???

val filteredReadings: DataStream[SensorReading] = readings
.process(new LateReadingsFilter)

// retrieve late readings
val lateReadings: DataStream[SensorReading] = filteredReadings
.getSideOutput(lateReadingsOutput)

// =================== //
/** A ProcessFunction that filters out late sensor readings and
* re-directs them to a side output */
class LateReadingsFilter
extends ProcessFunction[SensorReading, SensorReading] {

override def processElement(
r: SensorReading,
ctx: ProcessFunction[SensorReading, SensorReading]#Context,
out: Collector[SensorReading]): Unit = {

// compare record timestamp with current watermark
if (r.timestamp < ctx.timerService().currentWatermark()) {
// this is a late reading => redirect it to the side output
ctx.output(lateReadingsOutput, r)
} else {
out.collect(r)
}
}
}

包含迟到的数据更新结果

将迟到的数据包括进来,然后重新计算一次。

问题多多:

  • 为了重新计算所有的数据,算子必须保存所有的状态;
  • 下游算子也要重新计算,甚至 Sink 到磁盘的算子也要覆盖;

Flink 给窗口算子提供了一个允许的迟到时间: allowedLateness

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
val readings: DataStream[SensorReading] = ???

val countPer10Secs: DataStream[(String, Long, Int, String)] = readings
.keyBy(_.id)
.timeWindow(Time.seconds(10))
// process late readings for 5 additional seconds
.allowedLateness(Time.seconds(5))
// count readings and update results if late readings arrive
.process(new UpdatingWindowCountFunction)

// =================== //

/** A counting WindowProcessFunction that distinguishes between
* first results and updates. */
class UpdatingWindowCountFunction
extends ProcessWindowFunction[SensorReading, (String, Long, Int, String), String, TimeWindow] {

override def process(
id: String,
ctx: Context,
elements: Iterable[SensorReading],
out: Collector[(String, Long, Int, String)]): Unit = {
:
// count the number of readings
val cnt = elements.count(_ => true)

// state to check if this is the first evaluation of the window or not.
val isUpdate = ctx.windowState.getState(
new ValueStateDescriptor[Boolean]("isUpdate", Types.of[Boolean]))

if (!isUpdate.value()) {
// first evaluation, emit first result
out.collect((id, ctx.window.getEnd, cnt, "first"))
isUpdate.update(true)
} else {
// not the first evaluation, emit an update
out.collect((id, ctx.window.getEnd, cnt, "update"))
}
}
}