澄清 "the order of execution for the subtractor and adder is not defined"

Clarify "the order of execution for the subtractor and adder is not defined"

Streams DSL documentation 包含关于使用 aggregate 方法转换 KGroupedTable → KTable 的警告,如下(强调我的):

When subsequent non-null values are received for a key (e.g., UPDATE), then (1) the subtractor is called with the old value as stored in the table and (2) the adder is called with the new value of the input record that was just received. The order of execution for the subtractor and adder is not defined.

我对最后一行的解释暗示可能会发生以下三种情况之一:

  1. 减法器可以在加法器之前调用
  2. 加法器可以在减法器之前调用
  3. 加法器和减法器可以同时调用

这是我希望得到解答的问题:
在 KGroupedTable 上使用聚合方法时,上述所有 3 种情况实际上都可能吗?
还是我误解了文档?对于我的用例(详见下文),如果总是在加法器之前调用减法器,那将是理想的。


为什么这个问题很重要?

如果加法器和减法器是非交换运算并且它们的执行顺序可能不同,则根据加法器和减法器的执行顺序,您最终可能会得到不同的结果。一个有用的非交换操作的例子是,如果我们将记录聚合到一个集合中:

.aggregate[Set[Animal]](Set.empty)(
  adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
  subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
)

在此示例中,对于重复事件,如果在减法器之前调用加法器,您最终会从集合中完全删除该值(我想这对于大多数用例来说都是有问题的)。


为什么我怀疑文档(假设我对它的解释是正确的)?

  1. 似乎是一个不寻常的设计选择
  2. 当我进行 运行 单元测试时(使用 TopologyTestDriver 和 EmbeddedKafka),我总是看到减法器在 加法器。不幸的是,如果存在某种竞争条件 涉及,完全有可能我永远不会打到对方 场景。
  3. 我也尝试查看 kafka-streams 代码库。调用用户提供的 adder/subtracter 函数的 KTableProcessorSupplier 似乎是这个:https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81 并且在第 92 行,您甚至可以看到一条评论说“首先尝试删除旧值” .似乎这会明确回答我的问题,对吗?不幸的是,在我自己的测试中,我看到 process 函数本身被调用了两次;首先使用仅包含旧值的 Change<V> value,然后使用仅包含新值的 Change<V> value 再次调用 process 函数。不幸的是,我无法深入挖掘以找到生成旧值记录和新值记录(在收到更新后)的内部代码以确定它是否真的按该顺序生成这些记录。

订单是硬编码的(即,没有竞争条件),但不能保证订单在未来的版本中不会更改,恕不另行通知(即,它不是 public 合同,也没有需要 KIP 来更改它)。我猜会有一个关于它的Jira......但事实上,它并不重要(下面有详细信息)。

对于您提到的三种情况,第三种情况不会发生:聚合器在单个线程中执行(每个分片),因此先调用加法器或减法器。

first with a Change value that includes only the old value and then the process function is called again with a Change value that includes only the new value.

一般来说,两条记录可能由不同的线程处理,因此不可能只发送一条记录。只是 TTD 模拟单线程执行,因此两条记录总是在同一个处理器中结束。

比照

但是,只有当两条记录确实在同一个处理器中结束时(如果分组键在上游更新期间没有改变),顺序实际上才重要。

此外,顺序实际上不取决于下游聚合实现,而是取决于写入 groupBy() 的重新分区主题的顺序,并且对于多个并行上游处理器,这些写入无论如何都是交错的。因此,一般来说,您应该将“加”和“减”部分视为独立的实体,不要对它们的顺序做出任何假设(另外,即使键没有改变,两条记录也可能被其他记录交错...... .)

提供的唯一保证是(假设您正确配置了生产者以避免在 send() 期间重新排序),如果分组键不更改,则不会重新发送旧值和新值- 相对于彼此排序。尽管发送顺序在上游处理器中是硬编码的:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99

因此,下游聚合处理器的顺序实际上是没有意义的。