kafka flink timestamp 事件时间和水印

kafka flink timestamp Event time and watermark

我正在阅读《使用 Apache Flink 进行流处理》一书,其中指出“从 0.10.0 版开始,Kafka 支持消息时间戳。当从 Kafka 版本 0.10 或更高版本读取时,如果应用程序以事件时间模式运行,消费者将自动提取消息时间戳作为事件时间时间戳*” 那么在 processElement 函数中调用 context.timestamp() 将默认 return kafka 消息时间戳? 能否请您提供一个简单的示例,说明如何实现 AssignerWithPeriodicWatermarks/AssignerWithPunctuatedWatermarks 根据消耗的 kafka 消息时间戳提取(并构建水印)。

如果我使用 TimeCharacteristic.ProcessingTime,ctx.timestamp() return 的处理时间是否会和 context.timerService().currentProcessingTime() 类似。

谢谢。

Flink Kafka 消费者会为您处理这件事,并将时间戳放在需要的地方。在 Flink 1.11 中,您可以简单地依赖它,尽管您仍然需要注意提供指定 out-of-orderness(或断言时间戳是有序的)的 WatermarkStrategy:

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.assignTimestampsAndWatermarks(
    WatermarkStrategy.
        .forBoundedOutOfOrderness(Duration.ofSeconds(20)));

在 Flink 的早期版本中,您必须提供时间戳分配器的实现,如下所示:

public long extractTimestamp(Long element, long previousElementTimestamp) {
    return previousElementTimestamp;
}

此版本的 extractTimestamp 方法将 StreamRecord 中存在的时间戳的当前值作为 previousElementTimestamp 传递,在这种情况下,它将是 Flink Kafka 消费者放置在那里的时间戳。

Flink 1.11 docs
Flink 1.10 docs

至于使用TimeCharacteristic.ProcessingTimectx.timestamp()返回的是什么,这种情况下returns NULL。 (语义上,是的,就好像时间戳是当前处理时间,但它不是这样实现的。)