flink 水印 迟到的数据
Flink水印基于事件时间。所谓事件时间,就是Flink DataStream中的数据元素自身带有的、在其实际发生时记录的时间戳,具有业务含义,并与系统时间独立。很显然,由于外部系统产生的数据往往不能及时,不能按序到达Flink系统,所以事件时间比处理时间有更强的不可预测性。为了能够准确地表达事件时间的处理进度,就必须用到水印。
Flink水印的本质是DataStream中的一种特殊元素,每个水印都携带有一个时间戳。当时间戳为T的水印出现时,表示事件时间t <= T的数据都已经到达,即水印后面应该只能流入事件时间t > T的数据。也就是说,水印是Flink判断迟到数据的标准,同时也是窗口触发的标记。
为了形象地说明水印的作用,参考一下下面的图,是一个乱序的基于事件时间的数据流示例。
图中的方框就是数据元素,其中的数字表示事件时间,W(x)就表示时间戳是x的水印,并有长度为4个时间单位的滚动窗口。假设时间单位为秒,可见事件时间为2s、3s、1s的元素都会进入区间为[1s, 4s]的窗口,而事件时间为7s的元素会进入区间为[5s, 8s]的窗口。当水印W(4)到达时,表示已经没有t <= 4s的元素了,[1s, 4s]窗口会被触发并计算。同理,水印W(9)到达时,[5s, 8s]窗口会被触发并计算,以此类推。
不过图中暂时没有示出迟到数据。如果事件时间为6的元素出现在W(9)后面,就算是迟到了。迟到数据的处理后面再说。
迟到数据处理
如上所述,水印的乱序区间能够保证一些迟到数据不被丢弃,但是乱序区间往往不很长,那些真正迟到了的数据该怎么办呢?有两种方法来兜底,可以说是Flink为迟到数据提供的第二重保障。
窗口允许延迟
Flink提供了WindowedStream.allowedLateness()方法来设定窗口的允许延迟。也就是说,正常情况下窗口触发计算完成之后就会被销毁,但是设定了允许延迟之后,窗口会等待allowedLateness的时长后再销毁。在该区间内的迟到数据仍然可以进入窗口中,并触发新的计算。当然,窗口也是吃资源大户,所以allowedLateness的值要适当。给个完整的代码示例如下。
sourceStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {
private static final long serialVersionUID = 1L;
@Override
public long extractTimestamp(UserActionRecord record) {
return record.getTimestamp();
}
}
)
.keyBy("platform")
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(30))
.aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())
// ......
allowedLateness机制实际上就是DataFlow模型中的回填(backfill)策略的实现。对于滑动窗口和滚动窗口是累积(accumulating)策略,对于会话窗口则是累积与回撤(accumulating & retracting)策略。
侧输出迟到数据
侧输出(side output)是Flink的分流机制。迟到数据本身可以当做特殊的流,我们通过调用WindowedStream.sideOutputLateData()方法将迟到数据发送到指定OutputTag的侧输出流里去,再进行下一步处理(比如存到外部存储或消息队列)。代码如下。
// 侧输出的OutputTag
OutputTag<UserActionRecord> lateOutputTag = new OutputTag<>("late_data_output_tag");
sourceStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {
private static final long serialVersionUID = 1L;
@Override
public long extractTimestamp(UserActionRecord record) {
return record.getTimestamp();
}
}
)
.keyBy("platform")
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(30))
.sideOutputLateData(lateOutputTag) // 侧输出
.aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())
// ......
// 获取迟到数据并写入对应Sink
stream.getSideOutput(lateOutputTag).addSink(lateDataSink);