在 flatMap 之后实现 kafka Stream
Materialize kafka Stream after a flatMap
我想通过 Spring Kafka 支持的 Kafka Streams 使用两个 Kafka 主题。
主题具有不同的键和值。我想通过以下方法将第二个主题的键和值映射到第一个主题 merge
中:.merge(KStream<X,Y> otherStream)
.
这是一个例子:
// Block 1
KStream<MyKey, MyValue> stream2 = streamsBuilder.stream(
"second-topic",
consumedAs(OtherKey.class, OtherValue.class, AllowEmpty.NONE) // Provides default json Serde
).flatMap(
(key, value) -> {
List<KeyValue<MyKey, MyValue>> list = new ArrayList<>();
// Do stuff an fill out the list
return list;
});
// Block 2
KStream<MyKey, MyValue>[] branches = stream
.merge(stream2)
... business stuff
通过这个解决方案,我得到了 ClassCastException
,原因是 MyKey
无法转换为 MyKey
。原因是,它们是由不同的模块和 class 加载器提供的。错误发生在序列化中,在合并块中。使用 transform(..)
我得到了相同的行为。
如果我附加命令 .through("tmp-topic")
一切正常。它通过主题 returns 一个有效的可序列化对象而不是 flatMap(...)
.
来实现实体化
我在 groupByKey
中找到了以下 API 文档:
...
If a key changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), or transform(TransformerSupplier, String...)), and no data redistribution happened afterwards (e.g., via through(String)) an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "XXX" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via Topology.describe().
For this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KGroupedStream is partitioned correctly on its key...
正如您所看到的,由于序列化和重新分区,更改 flatMap(...)
等操作后的最佳实践似乎是将其写入主题。
您如何看待 through("topic")
的用法以使其发挥作用?
有谁知道,是否有可能在 flatMap(...)
之后实现而不写主题?
版本
Spring卡夫卡版本:2.2.5.RELEASE
Apache Kafka 客户端:2.0.1
Apache Kafka 流:2.0.1
就某些情况而言,每当您使用键更改操作时,任何使用新键的下游处理器都会触发重新分区主题的创建。重新分区主题确保新密钥位于正确的分区上。我知道你可能已经知道这一点,但为了清楚起见,我只是在这里重申这一点。
考虑到这一点,在修改密钥后执行 through()
操作是完全可以接受的,因为这就是 Kafka Streams 无论如何都会在幕后执行的操作。
所以 flatMap(...).through(someTopic)
工作正常。
此外,如果您在下游的其他操作(连接、聚合)中重新使用具有修改后的键的 KStream
实例,这样做还可以防止出现多次重新分区的可能性。
HTH,
比尔
我想通过 Spring Kafka 支持的 Kafka Streams 使用两个 Kafka 主题。
主题具有不同的键和值。我想通过以下方法将第二个主题的键和值映射到第一个主题 merge
中:.merge(KStream<X,Y> otherStream)
.
这是一个例子:
// Block 1
KStream<MyKey, MyValue> stream2 = streamsBuilder.stream(
"second-topic",
consumedAs(OtherKey.class, OtherValue.class, AllowEmpty.NONE) // Provides default json Serde
).flatMap(
(key, value) -> {
List<KeyValue<MyKey, MyValue>> list = new ArrayList<>();
// Do stuff an fill out the list
return list;
});
// Block 2
KStream<MyKey, MyValue>[] branches = stream
.merge(stream2)
... business stuff
通过这个解决方案,我得到了 ClassCastException
,原因是 MyKey
无法转换为 MyKey
。原因是,它们是由不同的模块和 class 加载器提供的。错误发生在序列化中,在合并块中。使用 transform(..)
我得到了相同的行为。
如果我附加命令 .through("tmp-topic")
一切正常。它通过主题 returns 一个有效的可序列化对象而不是 flatMap(...)
.
我在 groupByKey
中找到了以下 API 文档:
... If a key changing operator was used before this operation (e.g., selectKey(KeyValueMapper), map(KeyValueMapper), flatMap(KeyValueMapper), or transform(TransformerSupplier, String...)), and no data redistribution happened afterwards (e.g., via through(String)) an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "XXX" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via Topology.describe(). For this case, all data of this stream will be redistributed through the repartitioning topic by writing all records to it, and rereading all records from it, such that the resulting KGroupedStream is partitioned correctly on its key...
正如您所看到的,由于序列化和重新分区,更改 flatMap(...)
等操作后的最佳实践似乎是将其写入主题。
您如何看待 through("topic")
的用法以使其发挥作用?
有谁知道,是否有可能在 flatMap(...)
之后实现而不写主题?
版本
Spring卡夫卡版本:2.2.5.RELEASE
Apache Kafka 客户端:2.0.1
Apache Kafka 流:2.0.1
就某些情况而言,每当您使用键更改操作时,任何使用新键的下游处理器都会触发重新分区主题的创建。重新分区主题确保新密钥位于正确的分区上。我知道你可能已经知道这一点,但为了清楚起见,我只是在这里重申这一点。
考虑到这一点,在修改密钥后执行 through()
操作是完全可以接受的,因为这就是 Kafka Streams 无论如何都会在幕后执行的操作。
所以 flatMap(...).through(someTopic)
工作正常。
此外,如果您在下游的其他操作(连接、聚合)中重新使用具有修改后的键的 KStream
实例,这样做还可以防止出现多次重新分区的可能性。
HTH,
比尔