从具有空闲分区的 kafka 主题消费时,水印生成在 local/standalone 模式下的工作方式是否不同?
Does watermark generation work differently in local/standalone modes when consuming from kafka topic with idle partitions?
我正在考虑这个水印:
class MyWatermarker(val maxTimeLag: Long = 0)
extends AssignerWithPeriodicWatermarks[MyEvent] {
var maxTs: Long = 0
override def extractTimestamp(e: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = e.timestamp
maxTs = maxTs.max(timestamp)
timestamp
}
override def getCurrentWatermark: WatermarkOld = {
println(s"event watermark: ${maxTs - maxTimeLag}")
new WatermarkOld(maxTs - maxTimeLag)
}
底层事件来自kafka源,然后交给流程函数。实现与问题无关,我只分享相关位:
override def processElement(
event: MyEvent,
ctx: KeyedProcessFunction[String, MyEvent, MyEvent]#Context,
out: Collector[StreamEvent]
): Unit = {
println(
s"In process function, got event: $event, ctx.timestamp: ${ctx.timestamp}, currentWatermark: ${ctx.timerService.currentWatermark}"
)
...
}
当我 运行 这个应用程序在一个真正的 kubernetes 集群上使用具有空闲分区的 kafka 源主题时,水印按预期保持为 0:
In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 0
我还可以看到水印中生成的这些日志:
event watermark: 1601475710619
event watermark: 0
event watermark: 1601475710619
event watermark: 0
有趣的是,当我在 IntelliJ 上本地 运行 相同的应用程序时,也有相同主题的空闲 kafka 分区时,我也从水印器中获取上述日志,水印在振荡在 0 和最新接收到的元素的 ts 之间,因为 maxLag = 0
。然而,出乎我意料的是,process函数的日志显示水印还在推进:
In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 1601475710618
为什么会这样?仅供参考,在这两种情况下,我都使用 Flink 1.10,环境并行度设置为 2,事件时间语义。
如果您要使用 per-partition 水印,您可以通过直接在 Flink Kafka 消费者 [1] 上调用 assignTimestampsAndWatermarks 来实现,那么我相信空闲分区会始终阻止整体水印.
使用 per-partition 水印,每个 Kafka 源任务将分别对其处理的每个分区应用水印,然后发出那些 per-partition 水印中的最小值作为其水印。因此,至少有一个 Kafka 源任务的水印为 0,假设您在水印之后和处理函数之前有一个 keyBy,它将在处理函数处阻止水印。
否则,如果对Kafka源任务的输出应用水印,那么水印任务的水印是否为0取决于它们对应的Kafka源任务是否有任何non-idle分区。如果分区到实例的分配不是确定性的,这可以解释为什么您在 IntelliJ 中看到不同的结果。
请注意,空闲源的处理已在 Flink 1.11 [2] 中进行了重新设计,与此相关的错误修复仍在等待 [3]。
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission.
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#dealing-with-idle-sources.
[3] https://issues.apache.org/jira/browse/FLINK-18934
我正在考虑这个水印:
class MyWatermarker(val maxTimeLag: Long = 0)
extends AssignerWithPeriodicWatermarks[MyEvent] {
var maxTs: Long = 0
override def extractTimestamp(e: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = e.timestamp
maxTs = maxTs.max(timestamp)
timestamp
}
override def getCurrentWatermark: WatermarkOld = {
println(s"event watermark: ${maxTs - maxTimeLag}")
new WatermarkOld(maxTs - maxTimeLag)
}
底层事件来自kafka源,然后交给流程函数。实现与问题无关,我只分享相关位:
override def processElement(
event: MyEvent,
ctx: KeyedProcessFunction[String, MyEvent, MyEvent]#Context,
out: Collector[StreamEvent]
): Unit = {
println(
s"In process function, got event: $event, ctx.timestamp: ${ctx.timestamp}, currentWatermark: ${ctx.timerService.currentWatermark}"
)
...
}
当我 运行 这个应用程序在一个真正的 kubernetes 集群上使用具有空闲分区的 kafka 源主题时,水印按预期保持为 0:
In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 0
我还可以看到水印中生成的这些日志:
event watermark: 1601475710619
event watermark: 0
event watermark: 1601475710619
event watermark: 0
有趣的是,当我在 IntelliJ 上本地 运行 相同的应用程序时,也有相同主题的空闲 kafka 分区时,我也从水印器中获取上述日志,水印在振荡在 0 和最新接收到的元素的 ts 之间,因为 maxLag = 0
。然而,出乎我意料的是,process函数的日志显示水印还在推进:
In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 1601475710618
为什么会这样?仅供参考,在这两种情况下,我都使用 Flink 1.10,环境并行度设置为 2,事件时间语义。
如果您要使用 per-partition 水印,您可以通过直接在 Flink Kafka 消费者 [1] 上调用 assignTimestampsAndWatermarks 来实现,那么我相信空闲分区会始终阻止整体水印.
使用 per-partition 水印,每个 Kafka 源任务将分别对其处理的每个分区应用水印,然后发出那些 per-partition 水印中的最小值作为其水印。因此,至少有一个 Kafka 源任务的水印为 0,假设您在水印之后和处理函数之前有一个 keyBy,它将在处理函数处阻止水印。
否则,如果对Kafka源任务的输出应用水印,那么水印任务的水印是否为0取决于它们对应的Kafka源任务是否有任何non-idle分区。如果分区到实例的分配不是确定性的,这可以解释为什么您在 IntelliJ 中看到不同的结果。
请注意,空闲源的处理已在 Flink 1.11 [2] 中进行了重新设计,与此相关的错误修复仍在等待 [3]。
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission.
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#dealing-with-idle-sources.
[3] https://issues.apache.org/jira/browse/FLINK-18934