flink 水印 时间戳
flink水印就是一个时间戳
flink水印就是一个时间戳,它给每个消息添加一个允许一定延迟的时间戳。窗口可以继续计算一定时间范围内延迟的消息,添加水印后,窗口会等 n 秒,再执行计算。若超过 n 秒,则舍弃。窗口执行计算时间由 水印时间 来触发,当接收到消息的 watermark >= endtime ,触发计算。Flink提供添加水印的API如下所示:
val watermarkData: DataStream[Message] =
clicklogDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message]{
var currentTimestamp: Long = 0L
val maxDelayTime = 5000L
var watermark: Watermark = null
// 获取当前的水印
override def getCurrentWatermark = {
watermark = new Watermark(currentTimestamp - maxDelayTime)
watermark
}
// 时间戳抽取操作
override def extractTimestamp(t: Message, l: Long) = {
val timeStamp = t.timestamp
currentTimestamp = Math.max(timeStamp, currentTimestamp)
currentTimestamp
}
})