如何 运行 超过 1 个 ktable-ktable 应用程序实例在单个分区的 kafka 主题上加入 kafka 流应用程序?
How to run more than 1 application instances of ktable-ktable joins kafka streams application on single partitioned kafka topics?
KTable<Key1, GenericRecord> primaryTable = createKTable(key1, kstream, statestore-name);
KTable<Key2, GenericRecord> childTable1 = createKTable(key1, kstream, statestore-name);
KTable<Key3, GenericRecord> childTable2 = createKTable(key1, kstream, statestore-name);
primaryTable.leftJoin(childTable1, (primary, choild1) -> compositeObject)
.leftJoin(childTable2,(compositeObject, child2) -> compositeObject, Materialized.as("compositeobject-statestore"))
.toStream().to(""composite-topics)
对于我的应用程序,我正在使用 KTable-Ktable 连接,因此无论何时在主流或子流上接收到数据,它都可以将其设置为具有所有三个表的 setter 和 getter 的复合对象。这三个传入流具有不同的键,但是在创建 KTable 时,我为所有三个 KTable 设置了相同的键。
我的所有主题都只有一个分区。当我 运行 在单个实例上应用时,一切 运行 都很好。我可以看到 compositeObject 填充了来自所有三个表的数据。
所有交互式查询也 运行 可以很好地传递 recordID 和本地 statestore 名称。
但是当我 运行 同一应用程序的两个实例时,我看到带有主要数据和 child1 数据的 compositeObject,但 child2 仍然是空的。即使我尝试使用交互式查询调用 statestore,它也没有 return 任何东西。
我正在使用 spring-cloud-stream-kafka-streams 库来编写代码。
请提出它没有设置的原因是什么以及处理这个问题的正确解决方案。
Kafka Streams 的扩展模型与输入主题分区的数量相关联。因此,如果您的输入主题是单一分区的,则您无法横向扩展。输入主题分区的数量决定了你的最大并行度。
因此,您需要创建具有更高并行度的新主题。
KTable<Key1, GenericRecord> primaryTable = createKTable(key1, kstream, statestore-name);
KTable<Key2, GenericRecord> childTable1 = createKTable(key1, kstream, statestore-name);
KTable<Key3, GenericRecord> childTable2 = createKTable(key1, kstream, statestore-name);
primaryTable.leftJoin(childTable1, (primary, choild1) -> compositeObject)
.leftJoin(childTable2,(compositeObject, child2) -> compositeObject, Materialized.as("compositeobject-statestore"))
.toStream().to(""composite-topics)
对于我的应用程序,我正在使用 KTable-Ktable 连接,因此无论何时在主流或子流上接收到数据,它都可以将其设置为具有所有三个表的 setter 和 getter 的复合对象。这三个传入流具有不同的键,但是在创建 KTable 时,我为所有三个 KTable 设置了相同的键。
我的所有主题都只有一个分区。当我 运行 在单个实例上应用时,一切 运行 都很好。我可以看到 compositeObject 填充了来自所有三个表的数据。 所有交互式查询也 运行 可以很好地传递 recordID 和本地 statestore 名称。
但是当我 运行 同一应用程序的两个实例时,我看到带有主要数据和 child1 数据的 compositeObject,但 child2 仍然是空的。即使我尝试使用交互式查询调用 statestore,它也没有 return 任何东西。
我正在使用 spring-cloud-stream-kafka-streams 库来编写代码。
请提出它没有设置的原因是什么以及处理这个问题的正确解决方案。
Kafka Streams 的扩展模型与输入主题分区的数量相关联。因此,如果您的输入主题是单一分区的,则您无法横向扩展。输入主题分区的数量决定了你的最大并行度。
因此,您需要创建具有更高并行度的新主题。