flink水印就是一个时间戳

flink水印就是一个时间戳,它给每个消息添加一个允许一定延迟的时间戳。窗口可以继续计算一定时间范围内延迟的消息,添加水印后,窗口会等 n 秒,再执行计算。若超过 n 秒,则舍弃。窗口执行计算时间由 水印时间 来触发,当接收到消息的 watermark >= endtime ,触发计算。Flink提供添加水印的API如下所示:

val watermarkData: DataStream[Message] =
clicklogDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message]{

   var currentTimestamp: Long = 0L
   val maxDelayTime = 5000L
   var watermark: Watermark = null

   // 获取当前的水印
   override def getCurrentWatermark = {
    watermark = new Watermark(currentTimestamp - maxDelayTime)
    watermark
  }

   // 时间戳抽取操作
   override def extractTimestamp(t: Message, l: Long) = {
    val timeStamp = t.timestamp
    currentTimestamp = Math.max(timeStamp, currentTimestamp)
    currentTimestamp
  }

 })

标签: none

添加新评论