从具有空闲分区的 kafka 主题消费时,水印生成在 local/standalone 模式下的工作方式是否不同?

Does watermark generation work differently in local/standalone modes when consuming from kafka topic with idle partitions?

我正在考虑这个水印:

class MyWatermarker(val maxTimeLag: Long = 0)
    extends AssignerWithPeriodicWatermarks[MyEvent] {
  var maxTs: Long = 0

  override def extractTimestamp(e: MyEvent, previousElementTimestamp: Long): Long = {
    val timestamp = e.timestamp
    maxTs = maxTs.max(timestamp)
    timestamp
  }

  override def getCurrentWatermark: WatermarkOld = {
    println(s"event watermark: ${maxTs - maxTimeLag}")
    new WatermarkOld(maxTs - maxTimeLag)
  }

底层事件来自kafka源,然后交给流程函数。实现与问题无关,我只分享相关位:

  override def processElement(
    event: MyEvent,
    ctx: KeyedProcessFunction[String, MyEvent, MyEvent]#Context,
    out: Collector[StreamEvent]
  ): Unit = {
    println(
      s"In process function, got event: $event, ctx.timestamp: ${ctx.timestamp}, currentWatermark: ${ctx.timerService.currentWatermark}"
    )
  ...
  }

当我 运行 这个应用程序在一个真正的 kubernetes 集群上使用具有空闲分区的 kafka 源主题时,水印按预期保持为 0:

In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 0

我还可以看到水印中生成的这些日志:

event watermark: 1601475710619
event watermark: 0
event watermark: 1601475710619
event watermark: 0

有趣的是,当我在 IntelliJ 上本地 运行 相同的应用程序时,也有相同主题的空闲 kafka 分区时,我也从水印器中获取上述日志,水印在振荡在 0 和最新接收到的元素的 ts 之间,因为 maxLag = 0。然而,出乎我意料的是,process函数的日志显示水印还在推进:

In process function, got event: xxx, ctx.timestamp: 1601475710619, currentWatermark: 1601475710618

为什么会这样?仅供参考,在这两种情况下,我都使用 Flink 1.10,环境并行度设置为 2,事件时间语义。

如果您要使用 per-partition 水印,您可以通过直接在 Flink Kafka 消费者 [1] 上调用 assignTimestampsAndWatermarks 来实现,那么我相信空闲分区会始终阻止整体水印.

使用 per-partition 水印,每个 Kafka 源任务将分别对其处理的每个分区应用水印,然后发出那些 per-partition 水印中的最小值作为其水印。因此,至少有一个 Kafka 源任务的水印为 0,假设您在水印之后和处理函数之前有一个 keyBy,它将在处理函数处阻止水印。

否则,如果对Kafka源任务的输出应用水印,那么水印任务的水印是否为0取决于它们对应的Kafka源任务是否有任何non-idle分区。如果分区到实例的分配不是确定性的,这可以解释为什么您在 IntelliJ 中看到不同的结果。

请注意,空闲源的处理已在 Flink 1.11 [2] 中进行了重新设计,与此相关的错误修复仍在等待 [3]。

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission.
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#dealing-with-idle-sources.
[3] https://issues.apache.org/jira/browse/FLINK-18934