如何真正丢弃迟到的记录?

How to actually discard late records?

这是问题所在:

假设有一个数字流,我想在 1 小时的桶中收集这些数字中的最大值,我允许在给定的桶上延迟最多 3 小时。

这听起来像是 tumbling windows 的实验室案例。

这是我目前的情况:

stream.aggregate(
      () -> 0L,
      (aggKey, value, aggregate) -> Math.max(value, aggregate),
      TimeWindows.of(TimeUnit.HOURS.toMillis(1L)).until(TimeUnit.HOURS.toMillis(3L)),
      Serdes.Long(),
      "my_store"
)

首先,我无法通过测试验证这是否确实发生了。时间戳是通过 TimestampExtractor 提取的,我用 Thread.sleep 模拟延迟(我将 windows 设置为较小的值以进行测试),但是 "late record" 仍然被处理而不是被丢弃。

常规 windows 上似乎很少(没有?)示例。有一个关于 SessionWindows 的集成测试,仅此而已。我对这些概念的理解是否正确?

编辑 2

JUnit 测试示例。由于它相当大,我通过要点分享它。

https://gist.github.com/Hartimer/6018a731753846c1930429716703e5a6

编辑(添加更多代码)

数据点具有时间戳(收集数据的时间)、收集数据的机器的主机名和值。

{
    "collectedAt": 12314124134, // timestamp
    "hostname": "machine-1",
    "reading": 3
}

自定义时间戳提取器用于提取 collectedAt。这是我的管道的更完整表示:

source.map(this::fixKey) // Associates record with a key like "<timestamp>:<hostname>"
  .groupByKey(Serdes.String(), roundDataSerde)
  .aggregate(
          () -> RoundData.EMPTY_ROUND,
          (aggKey, value, aggregate) -> max(value, aggregate),
          TimeWindows.of(TimeUnit.HOURS.toMillis(1L))
                     .until(TimeUnit.SECONDS.toMillis(1L)), // For testing I allow 1 second delay
          roundDataSerde,
          "entries_store"
  )
  .toStream()
  .map(this::simpleRoundDataToAggregate) // Associates record with a key like "<timestamp floored to nearest hour>"
  .groupByKey(aggregateSerde, aggregateSerde)
  .aggregate(
          () -> MyAggregate.EMPTY,
          (aggKey, value, aggregate) -> aggregate.merge(value), // I know this is not idempotent, that's a WIP
          TimeWindows.of(TimeUnit.HOURS.toMillis(1L))
                     .until(TimeUnit.SECONDS.toMillis(1L)), // For testing I allow 1 second delay
          aggregateSerde,
          "result_store"
  )
  .print()

测试的一个片段是

Instant roundId = Instant.now().truncatedTo(ChronoUnit.HOURS).minus(9L, ChronoUnit.HOURS);
    sendRecord("mytopic", roundId, 3);
    sendRecord("mytopic", roundId.plusMillis(15000), 2);

    log.info("Waiting a little before sending more usage. (simulating late record)");
    Thread.sleep(5000L);

    sendRecord("mytopic", roundId.plusMillis(30000), 5);

    // Assert stored value is "3".
    // It actually is 5 because the last round is accounted for

如有任何帮助,我们将不胜感激。

我相信我找到了我自己的问题。归结为 TimestampExtractor 以及我用来评估 "delayed record".

的值

在Kafka Stream术语中,有三个"times"(see here):

  • Event-time: 数据记录时间
  • Processing-time: 流处理器什么时候接收到数据
  • 摄取时间:(与问题无关)

在我的示例中,我实际上是在使用 事件时间 来确定是否有延迟,但这并不代表延迟记录。收集数据的人会将此值设置为他们对时间的本地感知(至少在我的用例中)。

重要日期是处理时间。无论事件何时生成,我们需要多长时间才能接收到该事件。我的聚合已按 "event time".

处理分组

我创建了一个新的 Gist,其中包含现在通过的测试的更新版本。添加了一个额外的字段 receivedAt,模拟 "processing time".

https://gist.github.com/Hartimer/c79569ad517ab95d08dbe8e84bfa6789

认为 Hartimer 的自我回答实际上是不正确的。让我试着解释发生了什么,至少就我自己的知识而言。 :-)

  • 延迟到达的数据是根据通过配置的时间戳提取器为您的应用程序配置的时间语义来处理的。在@Hartimer 的例子中,这是 event-time(这里使用自定义时间戳提取器)。
  • FWIW,在 processing-time 的情况下,根据定义,没有迟到的记录:每个记录都到达 "just in time"。当前 window 中包含一条 "late-arriving" 记录(同样,在此上下文中没有这样的记录),但从未回溯到早期的记录中。
  • 设置 window 保留时间的调用 TimeWindows#until() 是保留时间的 下限 。 Kafka 可能会在 "a bit longer" 左右保持 window(我在这里故意模糊,见下文)而不是配置的保留时间。出于这个原因,@Hartimer 进行的严格测试可能不会产生直观预期的结果。

关于 window 保留时间作为 下限 的幕后实际发生的事情有点棘手(也许超出了这个问题的范围) ,所以除非有具体要求,否则我不会尝试解释这一点。

更新: 此外,问题片段中的这段代码甚至不应该工作,因为它应该抛出 IllegalArgumentException:

TimeWindows.of(TimeUnit.HOURS.toMillis(1L))
           .until(TimeUnit.SECONDS.toMillis(1L))

要求是,对于各自的输入参数,until() >= of()。您不能定义大小为 1 小时但保留时间仅为 1 秒的 window(此处保留必须 >= 1 小时)。

更新 2: 幕后发生的事情是 TimeWindows#until() 的设置用于 create/manage 本地 window 的分段文件] 商店。只要 window 的段存在,就会接受 window 的迟到记录。我将跳过有关段如何 removed/expired 的部分,因为我真的需要为此深入研究代码(我不知道自己的头脑)。