如何将 m:n 数据流转置为 n:m table?
How to transpose a steam of m:n data into a n:m table?
假设我们收集汽车型号/轮胎型号兼容性的统计数据。输入流将汽车型号列为键,将兼容的轮胎型号列为值:
汽车轮胎话题:
car1 -> [tire1, tire2, tire3]
car2 -> [tire2, tire4]
期望的最终结果是 table,以轮胎型号为键,与该轮胎兼容的车型数量:
轮胎计数-table:
tire1 -> 1
tire2 -> 2
tire3 -> 1
tire4 -> 1
轮胎型号有时会停产。然后将它们从兼容性列表中删除:
汽车轮胎话题:
car1 -> [tire2, tire3]
('tire1' 已删除)。
另一方面,新的轮胎型号进入市场并添加到兼容性列表中:
汽车轮胎话题:
car1 -> [tire2, tire3, tire5]
如何使用 Kafka Streams DSL 实现这种转换?
我的方法#1
在 carTireStream.flatTransform()
中,我从 tire-car-table
状态存储中检索旧轮胎兼容性列表。对于新值中缺少的每个轮胎型号(已删除),我发出一条带有复合键的记录:
{carId, tireId} -> null
对于旧列表中缺少的每个轮胎型号(已添加),我发出
{carId, tireId} -> 1
记录。这个流然后通过密钥聚合到 car-tire-diff-table
。此 table 仅包含有效的汽车/轮胎模型组合。 null
条价值记录删除了所有停产的 car/tire 组合。
之后,这个 table 按轮胎型号分组(通过从复合键检索轮胎型号)。组聚合器和减法器函数创建列表 adding/removing 汽车模型 from/to 汽车模型列表。结果是 tire-car-table
:
tire2 -> [car1, car2]
tire3 -> [car1]
tire4 -> [car2]
tire5 -> [car1]
(轮胎 1 被移除了,还记得吗?)
最后一步很简单。我在此 table 上应用 .mapValues()
,并发出列表的长度。
这种方法有几个缺点:
- 它不是纯 Stream DSL。
- 在以声明方式引入之前,我必须访问
tire-car-table
状态存储。
- 需要中级
car-tire-diff-table
。
car-tire-diff-table
的值仅用于映射创建/删除操作 (1
/ null
)。
我的方法 #2
使用处理器 API 我可以避免中间 table 和奇怪的 create/delete 跟踪值。
一个处理器消耗 car-tire-topic
。它从 car-tire-table
中读取旧值并将新值存储到 car-tire-table
中(从而更新它)。将新旧轮胎兼容性列表放在一个地方:对于每个添加的轮胎型号,它从 tire-car-table
中读取,将汽车型号添加到列表中,然后写回值;对于每个删除的轮胎模型,它从 tire-car-table
中读取,删除汽车模型,然后写回更新后的列表。
这种方法的缺点:
tire-car-table
必须是全局 table。现在我可以创建一种算法,可以将任何可能的 car/tire 组合映射到一个分区,同时仍然有多个分区。
- 处理器包含许多操作。
- 我看不出如何使用 Stream DSL 实现最后的计数步骤。我可以使用
StreamsBuilder.addStateStore()
为 tire-car-tabble
创建状态存储实例,但我发现无法从中创建 KTable
实例。
限制:这是对原始问题的概括。我无法通过更改源主题包含的内容来解决该问题。或者通过添加 'tire got removed from the marked' 流并简单地从 tire-car-table
.
中删除轮胎记录
如果 KTable
api 公开某种更新处理程序,它会接收旧值和新值,那么整个事情会更容易。
有人能想出比我更优雅的方法来解决这个问题吗?
如果您想要完整的转置映射,我认为没有比方法 1 更好的方法了。正如您所指出的,您有两个具有不同键的有状态步骤,因此您必须至少分两步执行操作才能支持多个分区。
如果您只想要最终计数,您可以 flatTransform
将您的原始 carTireStream
流式传输到
tireId -> 1
用于新轮胎条目或 tireId -> -1
用于已移除的轮胎条目(使用您的 tire-car-table
state store)然后...
tireDeltaStream
.groupByKey()
.reduce((oldCount, delta) -> oldCount + delta)
你现在有一个 table 每个轮胎的最新汽车数量,你可以查询(如果你给它一个名字)或写入流。
如果您想使用高级 DSL 完成所有操作,我能想到的唯一方法是将 flatTransform
调用替换为 [=18= 上的 aggregate
] 流保留最新的轮胎列表 和 增量列表,然后 flatMap
提取增量。
例如以下关于 carTireStream
主题的消息
car1 -> [tire1, tire2, tire3]
会被聚合转化为...
car1 -> ([tire1, tire2, tire3], [tire1 -> 1, tire2 -> 1, tire3 -> 1])
在提取增量的 flatMap
之后将是...
tire1 -> 1
tire2 -> 1
tire3 -> 1
然后,关于carTireStream
主题的以下消息
car1 -> [tire2, tire3, tire5]
会被聚合转化为...
car1 -> ([tire2, tire3, tire5], [tire1 -> -1, tire5 -> 1])
在 flatMap 之后是...
tire1 -> -1
tire5 -> 1
如果聚合方法有一种机制来发出与内部状态的当前值不同的值,那么这种方法会更清晰。
在这种情况下,您只需存储最新的轮胎 ID 列表并发出 tireId -> delta
值,从而无需存储额外的状态并有一个额外的 flatMap
步骤。
假设我们收集汽车型号/轮胎型号兼容性的统计数据。输入流将汽车型号列为键,将兼容的轮胎型号列为值:
汽车轮胎话题:
car1 -> [tire1, tire2, tire3]
car2 -> [tire2, tire4]
期望的最终结果是 table,以轮胎型号为键,与该轮胎兼容的车型数量:
轮胎计数-table:
tire1 -> 1
tire2 -> 2
tire3 -> 1
tire4 -> 1
轮胎型号有时会停产。然后将它们从兼容性列表中删除:
汽车轮胎话题:
car1 -> [tire2, tire3]
('tire1' 已删除)。 另一方面,新的轮胎型号进入市场并添加到兼容性列表中:
汽车轮胎话题:
car1 -> [tire2, tire3, tire5]
如何使用 Kafka Streams DSL 实现这种转换?
我的方法#1
在 carTireStream.flatTransform()
中,我从 tire-car-table
状态存储中检索旧轮胎兼容性列表。对于新值中缺少的每个轮胎型号(已删除),我发出一条带有复合键的记录:
{carId, tireId} -> null
对于旧列表中缺少的每个轮胎型号(已添加),我发出
{carId, tireId} -> 1
记录。这个流然后通过密钥聚合到 car-tire-diff-table
。此 table 仅包含有效的汽车/轮胎模型组合。 null
条价值记录删除了所有停产的 car/tire 组合。
之后,这个 table 按轮胎型号分组(通过从复合键检索轮胎型号)。组聚合器和减法器函数创建列表 adding/removing 汽车模型 from/to 汽车模型列表。结果是 tire-car-table
:
tire2 -> [car1, car2]
tire3 -> [car1]
tire4 -> [car2]
tire5 -> [car1]
(轮胎 1 被移除了,还记得吗?)
最后一步很简单。我在此 table 上应用 .mapValues()
,并发出列表的长度。
这种方法有几个缺点:
- 它不是纯 Stream DSL。
- 在以声明方式引入之前,我必须访问
tire-car-table
状态存储。 - 需要中级
car-tire-diff-table
。 car-tire-diff-table
的值仅用于映射创建/删除操作 (1
/null
)。
我的方法 #2 使用处理器 API 我可以避免中间 table 和奇怪的 create/delete 跟踪值。
一个处理器消耗 car-tire-topic
。它从 car-tire-table
中读取旧值并将新值存储到 car-tire-table
中(从而更新它)。将新旧轮胎兼容性列表放在一个地方:对于每个添加的轮胎型号,它从 tire-car-table
中读取,将汽车型号添加到列表中,然后写回值;对于每个删除的轮胎模型,它从 tire-car-table
中读取,删除汽车模型,然后写回更新后的列表。
这种方法的缺点:
tire-car-table
必须是全局 table。现在我可以创建一种算法,可以将任何可能的 car/tire 组合映射到一个分区,同时仍然有多个分区。- 处理器包含许多操作。
- 我看不出如何使用 Stream DSL 实现最后的计数步骤。我可以使用
StreamsBuilder.addStateStore()
为tire-car-tabble
创建状态存储实例,但我发现无法从中创建KTable
实例。
限制:这是对原始问题的概括。我无法通过更改源主题包含的内容来解决该问题。或者通过添加 'tire got removed from the marked' 流并简单地从 tire-car-table
.
如果 KTable
api 公开某种更新处理程序,它会接收旧值和新值,那么整个事情会更容易。
有人能想出比我更优雅的方法来解决这个问题吗?
如果您想要完整的转置映射,我认为没有比方法 1 更好的方法了。正如您所指出的,您有两个具有不同键的有状态步骤,因此您必须至少分两步执行操作才能支持多个分区。
如果您只想要最终计数,您可以 flatTransform
将您的原始 carTireStream
流式传输到
tireId -> 1
用于新轮胎条目或 tireId -> -1
用于已移除的轮胎条目(使用您的 tire-car-table
state store)然后...
tireDeltaStream
.groupByKey()
.reduce((oldCount, delta) -> oldCount + delta)
你现在有一个 table 每个轮胎的最新汽车数量,你可以查询(如果你给它一个名字)或写入流。
如果您想使用高级 DSL 完成所有操作,我能想到的唯一方法是将 flatTransform
调用替换为 [=18= 上的 aggregate
] 流保留最新的轮胎列表 和 增量列表,然后 flatMap
提取增量。
例如以下关于 carTireStream
主题的消息
car1 -> [tire1, tire2, tire3]
会被聚合转化为...
car1 -> ([tire1, tire2, tire3], [tire1 -> 1, tire2 -> 1, tire3 -> 1])
在提取增量的 flatMap
之后将是...
tire1 -> 1
tire2 -> 1
tire3 -> 1
然后,关于carTireStream
主题的以下消息
car1 -> [tire2, tire3, tire5]
会被聚合转化为...
car1 -> ([tire2, tire3, tire5], [tire1 -> -1, tire5 -> 1])
在 flatMap 之后是...
tire1 -> -1
tire5 -> 1
如果聚合方法有一种机制来发出与内部状态的当前值不同的值,那么这种方法会更清晰。
在这种情况下,您只需存储最新的轮胎 ID 列表并发出 tireId -> delta
值,从而无需存储额外的状态并有一个额外的 flatMap
步骤。