Kafka Stream DSL non-key join current workaround explained
Kafka Stream DSL non-key join current workaround explained
我正在尝试了解中提到的解决方法:
https://issues.apache.org/jira/browse/KAFKA-3705
如
Today in Kafka Streams DSL, KTable joins are only based on keys. If
users want to join a KTable A by key a with another KTable B by key b
but with a "foreign key" a, and assuming they are read from two topics
which are partitioned on a and b respectively, they need to do the
following pattern:
tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
tableA.join(tableB', joiner);
我很难理解到底发生了什么。
特别是那句话令人困惑:"If users want to join a KTable A by key a with another KTable B by key b but with a "foreign key" a"。我也不明白上面的代码。
有人可以澄清一下这里发生了什么吗?
这里也提到了这个:
缩小流中 KTables 语义与关系数据库中表之间的差距。通常的做法是在 RDBMS 中的表发生变化时将其捕获到 Kafka 主题(JDBC-connect、Debezium、Maxwell)中。这些实体通常具有多个一对多关系。通常 RDBMS 提供很好的支持来解决与连接的这种关系。 Streams 在这里不足,解决方法 (group by - join - lateral view) 也没有得到很好的支持,并且不符合基于记录的处理的想法。 https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
什么意思(group by - join - lateral view)?我怀疑它与上面的代码有关,但又有点难以理解。任何人都可以对此有所了解吗?
好吧,下面的代码是用 non-key join 连接两个 KTables 的伪代码:
tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
tableA.join(tableB', joiner);
解释:
假设,tableA 有一个键字段“a”。为了与 tableA 连接另一个 ktable,它应该是 co-partitioned。它应该有相同的键。因此,我们将使用字段“a”
重新键入 ktable tableB
tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
groupBy()
是 shorthand 用于 selectKey()+ groupByKey()
操作。
groupBy(/* select on field "a" */)
将在字段 "a" 上重新键入 tableB 并按该键分组。因此,现在您有一个以字段“a”作为键的 KGroupedTable。为了得到 KTable,你需要调用 .aggregate() 。这就是上面代码中发生的事情。
P.S。 .agg()
应重命名为 .aggregate()
一旦 tableB' 准备就绪,您可以使用以下代码加入 tableA。
tableA.join(tableB', joiner);
这里的joiner指的是ValueJoiner
实现。
示例:
// Java 8+ example, using lambda expressions
KTable<String, String> joined = left.join(right,
/* Below line is ValueJoiner */
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue
);
目前,这是 Non-key 加入 KTables 的方式
您可以在文档中找到很好的解释:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join
我正在尝试了解中提到的解决方法:
https://issues.apache.org/jira/browse/KAFKA-3705
如
Today in Kafka Streams DSL, KTable joins are only based on keys. If users want to join a KTable A by key a with another KTable B by key b but with a "foreign key" a, and assuming they are read from two topics which are partitioned on a and b respectively, they need to do the following pattern:
tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a" tableA.join(tableB', joiner);
我很难理解到底发生了什么。
特别是那句话令人困惑:"If users want to join a KTable A by key a with another KTable B by key b but with a "foreign key" a"。我也不明白上面的代码。
有人可以澄清一下这里发生了什么吗?
这里也提到了这个:
缩小流中 KTables 语义与关系数据库中表之间的差距。通常的做法是在 RDBMS 中的表发生变化时将其捕获到 Kafka 主题(JDBC-connect、Debezium、Maxwell)中。这些实体通常具有多个一对多关系。通常 RDBMS 提供很好的支持来解决与连接的这种关系。 Streams 在这里不足,解决方法 (group by - join - lateral view) 也没有得到很好的支持,并且不符合基于记录的处理的想法。 https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
什么意思(group by - join - lateral view)?我怀疑它与上面的代码有关,但又有点难以理解。任何人都可以对此有所了解吗?
好吧,下面的代码是用 non-key join 连接两个 KTables 的伪代码:
tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
tableA.join(tableB', joiner);
解释:
假设,tableA 有一个键字段“a”。为了与 tableA 连接另一个 ktable,它应该是 co-partitioned。它应该有相同的键。因此,我们将使用字段“a”
重新键入 ktable tableBtableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
groupBy()
是 shorthand 用于 selectKey()+ groupByKey()
操作。
groupBy(/* select on field "a" */)
将在字段 "a" 上重新键入 tableB 并按该键分组。因此,现在您有一个以字段“a”作为键的 KGroupedTable。为了得到 KTable,你需要调用 .aggregate() 。这就是上面代码中发生的事情。
P.S。 .agg()
应重命名为 .aggregate()
一旦 tableB' 准备就绪,您可以使用以下代码加入 tableA。
tableA.join(tableB', joiner);
这里的joiner指的是ValueJoiner
实现。
示例:
// Java 8+ example, using lambda expressions
KTable<String, String> joined = left.join(right,
/* Below line is ValueJoiner */
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue
);
目前,这是 Non-key 加入 KTables 的方式 您可以在文档中找到很好的解释:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join