如何将 m:n 数据流转置为 n:m table?

How to transpose a steam of m:n data into a n:m table?

假设我们收集汽车型号/轮胎型号兼容性的统计数据。输入流将汽车型号列为键,将兼容的轮胎型号列为值:

汽车轮胎话题:

car1 -> [tire1, tire2, tire3]
car2 -> [tire2, tire4]

期望的最终结果是 table,以轮胎型号为键,与该轮胎兼容的车型数量:

轮胎计数-table:

tire1 -> 1
tire2 -> 2
tire3 -> 1
tire4 -> 1

轮胎型号有时会停产。然后将它们从兼容性列表中删除:

汽车轮胎话题:

car1 -> [tire2, tire3]

('tire1' 已删除)。 另一方面,新的轮胎型号进入市场并添加到兼容性列表中:

汽车轮胎话题:

car1 -> [tire2, tire3, tire5]

如何使用 Kafka Streams DSL 实现这种转换?

我的方法#1carTireStream.flatTransform() 中,我从 tire-car-table 状态存储中检索旧轮胎兼容性列表。对于新值中缺少的每个轮胎型号(已删除),我发出一条带有复合键的记录:

{carId, tireId} -> null

对于旧列表中缺少的每个轮胎型号(已添加),我发出

{carId, tireId} -> 1

记录。这个流然后通过密钥聚合到 car-tire-diff-table。此 table 仅包含有效的汽车/轮胎模型组合。 null 条价值记录删除了所有停产的 car/tire 组合。

之后,这个 table 按轮胎型号分组(通过从复合键检索轮胎型号)。组聚合器和减法器函数创建列表 adding/removing 汽车模型 from/to 汽车模型列表。结果是 tire-car-table:

tire2 -> [car1, car2]
tire3 -> [car1]
tire4 -> [car2]
tire5 -> [car1]

(轮胎 1 被移除了,还记得吗?)

最后一步很简单。我在此 table 上应用 .mapValues(),并发出列表的长度。

这种方法有几个缺点:

我的方法 #2 使用处理器 API 我可以避免中间 table 和奇怪的 create/delete 跟踪值。

一个处理器消耗 car-tire-topic。它从 car-tire-table 中读取旧值并将新值存储到 car-tire-table 中(从而更新它)。将新旧轮胎兼容性列表放在一个地方:对于每个添加的轮胎型号,它从 tire-car-table 中读取,将汽车型号添加到列表中,然后写回值;对于每个删除的轮胎模型,它从 tire-car-table 中读取,删除汽车模型,然后写回更新后的列表。

这种方法的缺点:

限制:这是对原始问题的概括。我无法通过更改源主题包含的内容来解决该问题。或者通过添加 'tire got removed from the marked' 流并简单地从 tire-car-table.

中删除轮胎记录

如果 KTable api 公开某种更新处理程序,它会接收旧值和新值,那么整个事情会更容易。

有人能想出比我更优雅的方法来解决这个问题吗?

如果您想要完整的转置映射,我认为没有比方法 1 更好的方法了。正如您所指出的,您有两个具有不同键的有状态步骤,因此您必须至少分两步执行操作才能支持多个分区。

如果您只想要最终计数,您可以 flatTransform 将您的原始 carTireStream 流式传输到 tireId -> 1 用于新轮胎条目或 tireId -> -1 用于已移除的轮胎条目(使用您的 tire-car-table state store)然后...

tireDeltaStream
  .groupByKey()
  .reduce((oldCount, delta) -> oldCount + delta)

你现在有一个 table 每个轮胎的最新汽车数量,你可以查询(如果你给它一个名字)或写入流。

如果您想使用高级 DSL 完成所有操作,我能想到的唯一方法是将 flatTransform 调用替换为 [=18= 上的 aggregate ] 流保留最新的轮胎列表 增量列表,然后 flatMap 提取增量。

例如以下关于 carTireStream 主题的消息

car1 -> [tire1, tire2, tire3]

会被聚合转化为...

car1 -> ([tire1, tire2, tire3], [tire1 -> 1, tire2 -> 1, tire3 -> 1])

在提取增量的 flatMap 之后将是...

tire1 -> 1
tire2 -> 1
tire3 -> 1

然后,关于carTireStream主题的以下消息

car1 -> [tire2, tire3, tire5]

会被聚合转化为...

car1 -> ([tire2, tire3, tire5], [tire1 -> -1, tire5 -> 1])

在 flatMap 之后是...

tire1 -> -1
tire5 -> 1

如果聚合方法有一种机制来发出与内部状态的当前值不同的值,那么这种方法会更清晰。 在这种情况下,您只需存储最新的轮胎 ID 列表并发出 tireId -> delta 值,从而无需存储额外的状态并有一个额外的 flatMap 步骤。