前提

先上一段例子代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/** Object that defines the DataStream program in the main() method */
object AverageSensorReadings {

/** main() defines and executes the DataStream program */
def main(args: Array[String]) {

// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

env.enableCheckpointing(
Constants.TEN_SECONDS,
CheckpointingMode.AT_LEAST_ONCE
)

// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// configure watermark interval
env.getConfig.setAutoWatermarkInterval(1000L)

// ingest sensor stream
val sensorData: DataStream[SensorReading] = env
// SensorSource generates random temperature readings
.addSource(new SensorSource)
// assign timestamps and watermarks which are required for event time
.assignTimestampsAndWatermarks(new SensorTimeAssigner)

val avgTemp: DataStream[SensorReading] = sensorData
// convert Fahrenheit to Celsius using an inlined map function
.map(
r =>
SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0))
)
// organize stream by sensorId
.keyBy(_.id)
// group readings in 1 second windows
.timeWindow(Time.seconds(1))
// compute average temperature using a user-defined function
.apply(new TemperatureAverager)

// print result stream to standard out
avgTemp.print()

// execute application
env.execute("Compute average sensor temperature")
}
}

/** User-defined WindowFunction to compute the average temperature of SensorReadings */
class TemperatureAverager
extends WindowFunction[SensorReading, SensorReading, String, TimeWindow] {

/** apply() is invoked once for each window */
override def apply(
sensorId: String,
window: TimeWindow,
vals: Iterable[SensorReading],
out: Collector[SensorReading]
): Unit = {

println(s"[${classOf[TemperatureAverager].getName}]: apply called")

// compute the average temperature
val (cnt, sum) =
vals.foldLeft((0, 0.0))((c, r) => (c._1 + 1, c._2 + r.temperature))
val avgTemp = sum / cnt

// emit a SensorReading with the average temperature
out.collect(SensorReading(sensorId, window.getEnd, avgTemp))
}
}

概述

一个典型的 Flink 流数据应用的结构由以下几个部分组成:

  • 设置执行环境 (execution environment)
  • 从一个或多个流读数据源
  • 应用 streaming transformations 来实现应用逻辑
  • 选择性地将结果输出到一个或多个数据 sink
  • 执行(Execute)程序

下面详述。

设置执行环境

一个 Flink 应用的第一步是设置应用的执行环境

执行环境决定了程序是以本地模式还是集群模式运行。在 DataStream API 中,执行环境是 StreamExecutionEnvironment。一般情况下,通过调用 StreamExecutionEnvironment.getExecutionEnvironment 就能拿到当前执行环境,如果这个方法是通过向集群提交任务的客户端调用的,那么就会返回一个远程执行环境,否则就会返回一个本地执行环境。

也可以通过相应的接口显式的创建远程或者本地执行环境:

1
2
3
4
5
6
7
8
// create a local stream execution environment
val localEnv: StreamExecutionEnvironment.createLocalEnvironment()

// create a remote stream execution environment
val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
"host", // hostname of JobManager
1234, // port of JobManager process
"path/to/jarFile.jar) // JAR file to ship to the JobManager

然后通过 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 设置应用使用 event-time 的处理时间模式。

此外,还可以通过 env 设置各种运行时参数,包括 checkpoint, parallelism 等之类的。

读一个输入流

StreamExecutionEnvironment 提供了各种方法从输入流读入数据。

在上述实例代码中,我们通过 env.addSource(new SensorSource) 来将应用连接到 SensorSource,并创建一个类型为 SensorReadingDataStream

然后通过 assignTimestampsAndWatermarks(new SensorTimeAssigner) 来分配时间戳和水位。

注意,对于使用 event-time 时间模式的 FLink 应用,assignTimestampsAndWatermarks 方法是必须要手动设置的。

执行 transformations

有些 transformation 操作会产生一个新的 DataStream.
有些不会修改原 DataStream 中的数据记录,但是会通过分区分组重新组织 DataStream 中的数据。

一个应用的逻辑是通过 transformation 链 来定义的。

在上述例子中,

  1. 我们通过一个 map() transformation,来转换每个数据记录;
  2. 使用 keyBy() transformation 来将 SensorReading 依据 sendorID 分区。
  3. 然后,使用一个 timeWindow() transformation 来将属于同一 sensorID 的 SensorReading 分组,分组规则是长度为 5 秒的 tumbling time window。
  4. 最后,我们定义了一个 UDF 来计算每个 window 中的所有数据的平均温度。

输出结果

流式应用通常会将计算结果发送到一些外部的系统,比如:Kafka, HDFS, MySQL 等。Flink 提供了一些维护很好的 stream sinks。

另外,也有一种情况是不发送数据,而是把结果内部保存并提供 Flink 的状态查询服务。

在上述示例中,结果直接打印在 STD_OUT 了。

执行

当这个应用被完全定义好了后,可以通过调用 StreamExecutionEnvironment.execute() 来执行整个应用。

Flink 应用都是 lazy 执行的:

  • 在调用 execute() 方法之前,执行环境会构建一个执行计划,这个计划包括了从数据源开始到作用在这些数据源上所有的 transformations。
  • 直到调用 execute() 方法,应用开始真正执行这个计划。

数据类型

在 Flink 中,类型信息在序列化, 反序列化, 比较器, 算子状态管理中都是必须的:比如,DataStream 中的数据记录需要被序列化,这样才能跨网络传输或者持久化到存储系统(比如 checkpointing 过程)。

支持的数据类型

在 Flink 中最常用的数据类型可以分为以下几类:

  • Primitives
    所有 Java 和 Scala 原生类型都是支持的。
  • Java 和 Scala Tuple
  • Scala case classes
  • POJOs
  • Flink Value types
  • 一些特殊类型

这种类型要实现 org.apache.flink.types.Value 这个接口: read() 方法和 write() 方法。用这两个方法来实现序列化和反序列化。

Flink 内置了一些 Value 类型: IntValue, DoubleValue, StringValue等,用于提供 Java 和 Scala 的不可变的原生类型的 可变替代者

特殊类型

比如

  • Scala 的 Either, Option, Try
  • Java Enum
  • Hadoop 的 Writable

类型提示

大部分情况下,Flink 可以自动推断数据类型,然后选择合适的序列化方法,但如果有的算子方法的返回值是个泛型变量,就需要显式的告诉 Flink 这个类型:使用 returns() 方法:

1
2
3
DataStream<Integer> result = input
.flatMap(new MyFlatMapFunction<String, Integer>())
.returns(new TypeHint<Integer>(){});

TypeInformation

Flink 类型系统的基础是 TypeInformation 类。用于选择正确的序列化方式。
比如,在 join 和 group by 的时候,TypeInformation 用于检查 key 是否可用。

基础类型被映射为单个字段:

  • tuples 和 case classes 被映射为和 class 字段一样多的字段;
  • 由于这种机制必须对所有类型都适用,所以对于变长的类型比如 各种集合,会被映射为一个字段。

Transformations

编写一个流式应用可基本上可以看做是将各种 transformatins 组合在一起形成一个 数据流图。

大部分的 transformations 都是 UDF。

UDF 都是一些实现了具体的 Function 接口 的类。由于大部分这些接口都是 SAM(single abstract method),所以可以直接实现一个相应的匿名函数而不是一个类。

大体来说,transformation API 可以分为四类:

  1. Basic transformation
    对每个数据的基本转换;
  2. KeyedStream transformations
    作用在有一个 Key 的数据流上;
  3. Multi-stream transformations
    合并多个数据流到一个或者将一个数据流分列为多个;
  4. Partitioning transformation
    将刘数据重新组织

Basic Transformations

常用的 Basic Transformation 包括但不限于:

  • filter
  • map
  • flatMap

KeyedStream Transformations

一种常见的需求是:想要将共享某个属性的所有数据分组一起处理,Flink 提供了 KeyedStream

当使用基于 KeyedStream 的状态管理时,共享同一个 key 的数据可以使用同一个状态。

keyBy(T=>K): KeyedStream[T, K]

keyBy transformation 可以将一个 DataStream 通过一个 key 转换为 KeyedStream

  • 拥有相同 key 的数据一定被分配到同一个分区中;
  • 拥有不同 key 的数据可能也被分配到同一个分区中。

因此,一个分区可能包含多个逻辑子数据流,每个子数据流都拥有相同的 Key.
如图所示:

Alt text

滚动聚合 [KeyedStream => DataStream]

滚动聚合 transformations包括 (sum, min, max 等) 作用在 KeyedStream 上并且产生 聚合结果数据流。

对每个输入数据,滚动聚合算子会更新相应的字段的值并将更新后的整个数据发送出去,其他的字段就以第一个收到的值为最终输出值。

每个滚动聚合算子方法接收的参数是想要被聚合的字段。

reduce[KeyedStream => DataStream]

reduce transformations 是滚动聚合的抽象:

在 KeyedStrream 上应用一个 UDF,这个 UDF 会将每个输入数据和当前聚合后的值结合。

reduce transformations 不会改变 stream 的类型。

reduce transformations 实际上是实现了 ReduceFunction 接口:

1
2
ReduceFunction[T]
> reduce(T, T): T

Multi-Stream Transformations

有些应用需要从多个数据源摄入数据并统一处理的,也有应用需要从一个摄入一个数据源并拆分成多个输出的。

Union[DataStream => DataStream]

Union 会将多个输入数据源合并为一个输出。

Alt text

connect, coMap, coFlatMap[ConnectedStreams => DataStream]

有时需要联合两个类型不同的输入数据流。常见的需求有:join 两个数据流的数据。

DataStream.connect() 方法接收一个 DataStream 并返回一个 ConnectedStreams 对象,代表被连接的数据流:

1
2
3
4
5
6
7
// first stream
val first: DataStream[Int] = ...
// second stream
val second: DataStream[String] = ...

// connect streams
val connected: ConnectedStreams[Int, String] = first.connect(second)

为了在多个数据流之间建立联系,可以在 ConnectedStream 上使用 keyBy()broadcast()

1
2
3
4
5
6
7
8
9
10
11
12
13
// first stream
val first: DataStream[(Int, Long)] = ...
// second stream
val second: DataStream[(Int, String)] = ...

// connect streams with keyBy
val keyedConnect: ConnectedStreams[(Int, Long), (Int, String)] = first
.connect(second)
.keyBy(0, 0) // key both input streams on first attribute

// connect streams with broadcast
val keyedConnect: ConnectedStreams[(Int, Long), (Int, String)] = first
.connect(second.broadcast()) // broadcast second input stream

Split[DataStream => SplitStream], Select[SplitStream => DataStream]

Split 方法和 union 相反。对于一个输入数据流,可以被分为 0 个或多个输出数据流,因此 split 也可以被当做 filter 使用。

如图所示:

Alt text

DataStream.split() 方法接收一个 OutputSelector 参数,OutputSelector 定义了一个 select() 方法,用于定义一个数据流元素是怎么被分配到输出的,返回一个 Iterable[String],这些 String 代表输出的名称。

split() 方法返回一个 SplitStreamSplitStream 提供了一个 select 方法,接收刚才说的, String 用于选择一个或者多个输出名称。

1
2
3
4
5
6
7
8
val inputStream: DataStream[(Int, String)] = ...

val splitted: SplitStream[(Int, String)] = inputStream
.split(t => if (t._1 > 1000) Seq("large") else Seq("small"))

val large: DataStream[(Int, String)] = splitted.select("large")
val small: DataStream[(Int, String)] = splitted.select("small")
val all: DataStream[(Int, String)] = splitted.select("small", "large")

Partitioning Transformations

当使用 DataStream API 时,系统会自动选择数据分区策略。但有时候也需要手动控制分区策略:比如当我们知道某个分区有数据倾斜的时候。

注意,keyBy 不在这个讨论范围,因为 keyBy 返回一个 KeyedStream 而本节讨论的都是返回 DataStream 的方法。

random()

random 方法将数据根据均匀分布随机讲数据分布到下一个任务。

round-robin

rebalance 方法以 round-robin 的方式将数据依次发送给下一个任务。

rescale

rescalerebalance 类似,也可以将数据以 round-robin 的方式发送给下游任务,不同点是:rebalance 会将数据发送发送给所有下游任务,而 rescale 只将数据发送给部分下游任务:

Alt text
Alt text

broadcast

broadcast() 方法会复制所有的输入数据,然后将所有数据都发送给所有下游任务。

custom

当所有分区策略都不满意时,可以使用 partitionCustom() 方法使用自定义的分区策略。

partitionCustom() 方法接收一个 Partitioner 对象,这个对象实现了具体的分区逻辑:

1
2
3
4
5
6
7
8
9
10
11

val numbers: DataStream[(Int)] = ...
numbers.partitionCustom(myPartitioner, 0)

object myPartitioner extends Partitioner[Int] {
val r = scala.util.Random

override def partition(key: Int, numPartitions: Int): Int = {
if (key < 0) 0 else r.nextInt(numPartitions)
}
}

设置并发度

在 Flink 应用中,每个算子都被分成一个或多个并行的任务,每个任务都处理数据流的一部分数据。一个算子并行处理的任务个数被称为这个算子的并行度

StreamExecutionEnvironment.setParallelim() 方法可以设置应用中所有算子的默认并发度。

也可以在每个算子上设置具体的并行度用以覆盖默认的:

1
2
3
4
5
6
7
8
9
10
11
12
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// set default parallelism to 4
env.setParallelism(4)

// the source has parallelism 4
val result: = env.addSource(new CustomSource)
// set the map parallelism to 8
.map(new MyMapper).setParallelism(8)
// set the print sink parallelism to 2
.print().setParallelism(2)

UDFs

匿名函数

当没有复杂的需求时(比如状态管理),直接使用匿名函数是非常方便的。

1
2
3
4
val tweets: DataStream[String] = ...
// a filter lambda function that checks if tweets contains the
// word “flink”
val flinkTweets = tweets.filter(_.contains("flink"))

Rich Functiions Classes

Rich Functions 在实现逻辑的基础上附加了 初始化销毁 的功能。

1
2
3
4
5
6
val flinkTweets = tweets.filter(
new RichFilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains(“flink”)
}
})

所有内置的 transformation 都有 Rich Function 版本:RichFilterFunction, RichMapFunction 等等。

注意:Flink 会将所有 UDFs 用 Java Serialization 序列化,然后将它们分布到 worker 进程。在 UDF 中的所有东西都必须可序列化!

在使用 Rich Functions 时,可以:

  • 使用 open() 方法来做一些初始化操作。这个方法只在每个任务最开始调用一次,在正式 transformation 之前。
  • 使用 close() 方法来做 finalization 操作,这个操作只在每个任务处理完所有 transform 操作后被调用一次。通常用于释放资源。

注意,这两个方法的 Configuration 参数都只在 DataSet API 中才有用。

另外,Rich Function 还提供了 getRuntimeContext() 方法来获取operator 的 RuntimeContext。运行时上下文提供了包括:算子并行度,子任务索引,UDF 名称等信息。另外,还提供了获取分区状态的方法。

Global Conf

opengetRuntimeContext 方法还可以获取一些全部配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def main(args: Array[String]) : Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment

// create a configuration object
val conf = new Configuration()

// set the parameter “keyWord” to “flink”
conf.setString("keyWord", "flink")

// set the configuration as global
env.getConfig.setGlobalJobParameters(conf)

// create some data
val input: DataStream[String] = env.fromElements(
"I love flink", "bananas", "apples", "flinky")

// filter the input stream and print it to stdout
input.filter(new MyFilterFunction).print()

env.execute()
}

class MyFilterFunction extends RichFilterFunction[String] {
var keyWord = ""

override def open(configuration: Configuration): Unit = {
// retrieve the global configuration
val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters

// cast to a Configuration object
val globConf = globalParams.asInstanceOf[Configuration]

// retrieve the keyWord parameter
keyWord = globConf.getString("keyWord", null)
}

override def filter(value: String): Boolean = {
// use the keyWord parameter to filter out elements
value.contains(keyWord)
}
}

(To Be Continued …)