Kafka Stream groupBy 行为:许多中间 outputs/updates 用于聚合

Kafka Stream groupBy behavior: many intermediate outputs/updates for an aggregation

我正在尝试使用 Kafka Stream 来聚合 People 的某些属性。

我有这样的 kafka 流测试:

    new ConsumerRecordFactory[Array[Byte], Character]("input", new ByteArraySerializer(), new CharacterSerializer())
    var i = 0
    while (i != 5) {
      testDriver.pipeInput(
        factory.create("input",
          Character(123,12), 15*10000L))
      i+=1;
    }
    val output = testDriver.readOutput....

我正在尝试像这样按键对值进行分组:

    streamBuilder.stream[Array[Byte], Character](inputKafkaTopic)
      .filter((key, _) => key == null )
      .mapValues(character=> PersonInfos(character.id, character.id2, character.age) // case class
      .groupBy((_, value) => CharacterInfos(value.id, value.id2) // case class)
        .count().toStream.print(Printed.toSysOut[CharacterInfos, Long])

当我 运行 代码时,我得到了这个:

[KTABLE-TOSTREAM-0000000012]: CharacterInfos(123,12), 1
[KTABLE-TOSTREAM-0000000012]: CharacterInfos(123,12), 2
[KTABLE-TOSTREAM-0000000012]: CharacterInfos(123,12), 3
[KTABLE-TOSTREAM-0000000012]: CharacterInfos(123,12), 4
[KTABLE-TOSTREAM-0000000012]: CharacterInfos(123,12), 5

为什么我得到 5 行而不是只有一行 CharacterInfos 和计数? groupBy 不只是改变 key 吗?

更新:我没有在示例代码中发现引用 Kafka Streams 中的 TopologyTestDriver 的行。我在下面的回答是针对 'normal' KStreams 应用程序行为,而 TopologyTestDriver 的行为不同。请参阅 Matthias J. Sax 对后者的回答。

这是预期的行为。稍微简化一下,Kafka Streams 默认情况下会在收到新的输入记录后立即发出新的输出记录。

当您聚合(此处:计数)输入数据时,一旦收到聚合的新输入,聚合结果就会更新(并因此产生新的输出记录)。

input record 1 ---> new output record with count=1
input record 2 ---> new output record with count=2
...
input record 5 ---> new output record with count=5

怎么办:您可以通过配置所谓的记录缓存的大小以及 commit.interval.ms 参数的设置来减少 'intermediate' 输出的数量。参见 Memory Management。但是,您将看到多少减少不仅取决于这些设置,还取决于输入数据的特征,因此减少的程度也可能随时间变化(想想:第一个小时可能是 90%的数据,第二个小时的数据中有 76%,等等)。也就是说,减少过程是确定性的,但从外部很难预测所产生的减少量。

注意:在进行 windowed 聚合时(如 windowed 计数),您还可以使用 Suppress() API 所以中间更新的数量不仅减少了,而且每个 window 只会有一个输出。但是,在您使用 case/code 时,聚合不是 windowed,因此不能使用 Suppress API.

为了帮助您理解为什么这样设置:您必须记住,流式系统通常在无限制的数据流上运行,这意味着系统不知道 'when it has received all the input data'。因此,即使 'intermediate outputs' 这个词实际上也具有误导性:例如,在收到第二条输入记录时,系统认为(非 windowed)聚合的结果是 '2' - - 据其目前所知,这是正确的结果。它无法预测另一个输入记录是否(或何时)可能到达。

对于 windowed 聚合(支持抑制)这会更容易一些,因为 window 大小定义了输入数据的边界给定的 window。在这里,Suppress() API 允许您在更好的延迟之间做出权衡决定,但每个 window 有多个输出(默认行为,抑制禁用)和更长的延迟,但您只会得到每个 window 的单个输出(启用抑制)。在后一种情况下,如果你有 1h windows,你将不会看到给定 window 的 any 输出,直到 1h 之后,可以这么说。对于某些用例,这是可以接受的,对于其他用例,则不是。

如果您使用 TopologyTestDriver 缓存被有效禁用,因此每个输入记录将 总是 产生一个输出记录。这是设计使然,因为缓存意味着不确定的行为,这使得编写实际的单元测试非常困难。

如果您在实际应用程序中部署代码,行为会有所不同,缓存会减少输出负载——您将获得哪些中间结果,是未定义的(即,不确定的);比较 Michael Noll 的回答。

对于你的单元测试来说,其实应该无关紧要,你可以测试所有输出记录(即所有中间结果),或者将所有输出记录放入一个键值Map和在测试中只测试每个键最后发出的记录(如果你不关心中间结果)。

此外,您可以使用 suppress() 运算符对您获得的输出消息进行细粒度控制。 suppress()——与缓存相反——是完全确定的,因此编写单元测试效果很好。但是,请注意 suppress() 是事件时间驱动的,因此,如果您停止发送新记录,时间不会提前并且 suppress() 不会发出数据。对于单元测试,考虑这一点很重要,因为您可能需要发送一些额外的 "dummy" 数据来触发您实际想要测试的输出。有关 suppress() 的更多详细信息,请查看此博客 post:https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers