Kafka Streams: grouping on a KTable value field
I have a use case where my KTable looks like this.
KTable: orderTable
key : value
{123} : {id1,12}
{124} : {id2,10}
{125} : {id1,5}
{126} : {id2,11}
KTable: orderByIdTable
=> This table should be grouped by the value field (id) and hold the sum of the other value column per id: id1 = (12+5), id2 = (10+11)
key : value
{id1} : {17}
{id2} : {21}
final KTable<String, Order> orderTable = builder.table("order-topic");
I don't know how to proceed from here:
final KTable<String,Long> orderByIdTable = ?
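Here is a code example (using only Java primitive types, which let me put it together faster) that demonstrates how to re-key, aka re-partition, a KTable, resulting in a new KTable. You should be able to adapt it easily to your example of transforming a KTable<String, Order> into a KTable<String, Long>.
Personally, I would opt for variant 2 for your use case.
The example follows. It is not fully tested, and it may handle tombstone records poorly (messages with a non-null key but a null value, which indicate that the key should be removed from the table).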
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Integer, String> table = builder.table(inputTopic, Consumed.with(Serdes.Integer(), Serdes.String()));

// Variant 1 (https://docs.confluent.io/current/streams/faq.html#option-1-write-kstream-to-ak-read-back-as-ktable)
// Here, we re-key the KTable, write the results to a new topic, and then re-read that topic into a new KTable.
table
    .toStream()
    .map((key, value) -> KeyValue.pair(value, key))
    .to(outputTopic1, Produced.with(Serdes.String(), Serdes.Integer()));
KTable<String, Integer> rekeyedTable1 =
    builder.table(outputTopic1, Consumed.with(Serdes.String(), Serdes.Integer()));

// Variant 2 (https://docs.confluent.io/current/streams/faq.html#option-2-perform-a-dummy-aggregation)
// Here, we re-key the KTable (resulting in a KGroupedTable), and then perform a dummy aggregation to turn the
// KGroupedTable into a KTable.
final KTable<String, Integer> rekeyedTable2 =
    table
        .groupBy(
            (key, value) -> KeyValue.pair(value, key),
            Grouped.with(Serdes.String(), Serdes.Integer())
        )
        // Dummy aggregation
        .reduce(
            (aggValue, newValue) -> newValue, /* adder */
            (aggValue, oldValue) -> oldValue  /* subtractor */
        );
rekeyedTable2.toStream().to(outputTopic2, Produced.with(Serdes.String(), Serdes.Integer()));
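To make the adder/subtractor idea concrete for the sum in the question, here is a small plain-Java model of what a grouped-table aggregation has to do when a row is inserted, updated, or deleted. It is only a sketch and does not use the Kafka Streams API: the class GroupedSumModel, the Order stand-in with its id and amount fields, and the upsert method are all hypothetical names made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of a re-keyed, summed table. This is NOT the Kafka Streams API. */
public class GroupedSumModel {

    /** Hypothetical stand-in for the question's Order value: a group id plus an amount. */
    public static final class Order {
        public final String id;
        public final long amount;

        public Order(String id, long amount) {
            this.id = id;
            this.amount = amount;
        }
    }

    // Upstream table: order key -> Order (models orderTable).
    private final Map<Integer, Order> orderTable = new HashMap<>();
    // Result table: id -> running sum (models orderByIdTable); TreeMap keeps keys sorted.
    private final Map<String, Long> orderByIdTable = new TreeMap<>();

    /** Inserts or updates a row; a null order models a tombstone (delete). */
    public void upsert(int key, Order order) {
        Order old = (order == null) ? orderTable.remove(key) : orderTable.put(key, order);
        if (old != null) {
            // Subtractor step: retract the old row's amount from its old group.
            orderByIdTable.merge(old.id, -old.amount, Long::sum);
        }
        if (order != null) {
            // Adder step: add the new row's amount to its (possibly different) group.
            orderByIdTable.merge(order.id, order.amount, Long::sum);
        }
    }

    /** Returns the current id -> sum view. */
    public Map<String, Long> sums() {
        return orderByIdTable;
    }

    public static void main(String[] args) {
        GroupedSumModel model = new GroupedSumModel();
        model.upsert(123, new Order("id1", 12));
        model.upsert(124, new Order("id2", 10));
        model.upsert(125, new Order("id1", 5));
        model.upsert(126, new Order("id2", 11));
        System.out.println(model.sums()); // prints {id1=17, id2=21}

        model.upsert(125, new Order("id2", 5)); // row 125 moves from id1 to id2
        System.out.println(model.sums()); // prints {id1=12, id2=26}
    }
}
```

In Kafka Streams terms, the same shape would presumably be a groupBy on the KTable that emits (id, amount) pairs, followed by a reduce whose adder sums the two values and whose subtractor subtracts the old value from the aggregate, in place of the dummy reducers of variant 2. The accessors on your Order class and a Long serde for the amounts are assumptions you would need to fill in.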