如何反转多对多关系?
How to invert many-to-many relationship?
我有一个压缩的 Kafka 主题,它是一个实体流,在我想要反转的多对多关系中具有该实体的最新表示。
一个示例是 Author
个对象的主题,其中主题键是 Author.id
(AAA),值是“图书”标识符值的数组:
"AAA" -> {"books": [456]}
当 Author
写入 ID 为 333
的新 Book
时,具有相同键的新事件将写入带有更新的图书列表的流:
"AAA" -> {"books": [456, 333]}
也有可能一个 Book
有多个 Authors
,因此相同的 Book
标识符可能出现在另一个事件中:
"BBB" -> {"books": [333, 555]}
我想使用 kafka 流将其反转为 Books -> [Author]
流,因此上述事件将导致类似:
456 -> {"authors": ["AAA"]}
333 -> {"authors": ["AAA", "BBB"]}
555 -> {"authors": ["BBB"]}
当我再次启动我的应用程序时,我希望恢复状态,这样如果我读入另一个 Author
记录,它会适当地反转关系。所以这个:
"CCC" -> {"books": [555]}
会知道 "BBB"
也是一个 Author
并且会发出更新的事件:
555 -> {"authors": ["BBB", "CCC"]}
我一直在关注 GlobalKTable
,它在本地读取完整的主题状态,但无法弄清楚如何让它反转关系并将值聚合在一起。
如果可以的话,我想我可以将 GlobalKTable
与事件流结合起来,并获得每个 Book
.[=32= 的 Author
的完整列表]
您不必使用 GlobakKTable
来实现您的要求。
在 Kafka Streams 中,由更改密钥引起的内部数据重新分配会自动发生。例如:
orgKStream
.flatMapValues(books -> getBookList) (1)
.map((k,v) -> new KeyValue<>(v, k)) (2)
.groupByKey() (3)
.aggregate(//aggregate author list ) (4)
.toStream(// sink topic) (5)
(1) 将更改您的原始主题,如下所示。
<before>
"AAA" -> {"books": [456, 333]}
"BBB" -> {"books": [333, 555]}
<after>
"AAA" -> 456
"AAA" -> 333
"BBB" -> 333
"BBB" -> 555
(2) 将用值替换键。
<after>
456 -> "AAA"
333 -> "AAA"
333 -> "BBB"
555 -> "BBB"
(3) 和 (4) 将聚合并生成 KTable(和状态存储)
<after>
456 -> {"authors": ["AAA"]}
333 -> {"authors": ["AAA", "BBB"]}
555 -> {"authors": ["BBB"]}
(5) 会将 table 中的全部记录写入给定主题。
现在,您有了一个新主题,其中包含作为键的书籍和作为值的作者列表。如果您想将整个结果放在一个地方,现在只需像下面这样创建 GlobalKTable。
StreamsBuilder.globalTable(<sink topic>)
如果调用(2) (map) 然后调用(3) (groupByKey),将发生通过重新分区主题的内部数据重新分配。这意味着所有具有相同图书 ID 作为键的记录将被发布到内部重新分区主题的相同分区中。因此,您不会丢失任何聚合数据。
我有一个压缩的 Kafka 主题,它是一个实体流,在我想要反转的多对多关系中具有该实体的最新表示。
一个示例是 Author
个对象的主题,其中主题键是 Author.id
(AAA),值是“图书”标识符值的数组:
"AAA" -> {"books": [456]}
当 Author
写入 ID 为 333
的新 Book
时,具有相同键的新事件将写入带有更新的图书列表的流:
"AAA" -> {"books": [456, 333]}
也有可能一个 Book
有多个 Authors
,因此相同的 Book
标识符可能出现在另一个事件中:
"BBB" -> {"books": [333, 555]}
我想使用 kafka 流将其反转为 Books -> [Author]
流,因此上述事件将导致类似:
456 -> {"authors": ["AAA"]}
333 -> {"authors": ["AAA", "BBB"]}
555 -> {"authors": ["BBB"]}
当我再次启动我的应用程序时,我希望恢复状态,这样如果我读入另一个 Author
记录,它会适当地反转关系。所以这个:
"CCC" -> {"books": [555]}
会知道 "BBB"
也是一个 Author
并且会发出更新的事件:
555 -> {"authors": ["BBB", "CCC"]}
我一直在关注 GlobalKTable
,它在本地读取完整的主题状态,但无法弄清楚如何让它反转关系并将值聚合在一起。
如果可以的话,我想我可以将 GlobalKTable
与事件流结合起来,并获得每个 Book
.[=32= 的 Author
的完整列表]
您不必使用 GlobakKTable
来实现您的要求。
在 Kafka Streams 中,由更改密钥引起的内部数据重新分配会自动发生。例如:
orgKStream
.flatMapValues(books -> getBookList) (1)
.map((k,v) -> new KeyValue<>(v, k)) (2)
.groupByKey() (3)
.aggregate(//aggregate author list ) (4)
.toStream(// sink topic) (5)
(1) 将更改您的原始主题,如下所示。
<before>
"AAA" -> {"books": [456, 333]}
"BBB" -> {"books": [333, 555]}
<after>
"AAA" -> 456
"AAA" -> 333
"BBB" -> 333
"BBB" -> 555
(2) 将用值替换键。
<after>
456 -> "AAA"
333 -> "AAA"
333 -> "BBB"
555 -> "BBB"
(3) 和 (4) 将聚合并生成 KTable(和状态存储)
<after>
456 -> {"authors": ["AAA"]}
333 -> {"authors": ["AAA", "BBB"]}
555 -> {"authors": ["BBB"]}
(5) 会将 table 中的全部记录写入给定主题。
现在,您有了一个新主题,其中包含作为键的书籍和作为值的作者列表。如果您想将整个结果放在一个地方,现在只需像下面这样创建 GlobalKTable。
StreamsBuilder.globalTable(<sink topic>)
如果调用(2) (map) 然后调用(3) (groupByKey),将发生通过重新分区主题的内部数据重新分配。这意味着所有具有相同图书 ID 作为键的记录将被发布到内部重新分区主题的相同分区中。因此,您不会丢失任何聚合数据。