Kafka Stream DSL non-key join current workaround explained

Kafka Stream DSL non-key join current workaround explained

我正在尝试了解中提到的解决方法:

https://issues.apache.org/jira/browse/KAFKA-3705

Today in Kafka Streams DSL, KTable joins are only based on keys. If users want to join a KTable A by key a with another KTable B by key b but with a "foreign key" a, and assuming they are read from two topics which are partitioned on a and b respectively, they need to do the following pattern:

tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"

tableA.join(tableB', joiner);

我很难理解到底发生了什么。

特别是那句话令人困惑:"If users want to join a KTable A by key a with another KTable B by key b but with a "foreign key" a"。我也不明白上面的代码。

有人可以澄清一下这里发生了什么吗?

这里也提到了这个:

缩小流中 KTables 语义与关系数据库中表之间的差距。通常的做法是在 RDBMS 中的表发生变化时将其捕获到 Kafka 主题(JDBC-connect、Debezium、Maxwell)中。这些实体通常具有多个一对多关系。通常 RDBMS 提供很好的支持来解决与连接的这种关系。 Streams 在这里不足,解决方法 (group by - join - lateral view) 也没有得到很好的支持,并且不符合基于记录的处理的想法。 https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable

什么意思(group by - join - lateral view)?我怀疑它与上面的代码有关,但又有点难以理解。任何人都可以对此有所了解吗?

好吧,下面的代码是用 non-key join 连接两个 KTables 的伪代码:

tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"

tableA.join(tableB', joiner);

解释

假设,tableA 有一个键字段“a”。为了与 tableA 连接另一个 ktable,它应该是 co-partitioned。它应该有相同的键。因此,我们将使用字段“a

重新键入 ktable tableB
tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"

groupBy() 是 shorthand 用于 selectKey()+ groupByKey() 操作。

groupBy(/* select on field "a" */) 将在字段 "a" 上重新键入 tableB 并按该键分组。因此,现在您有一个以字段“a”作为键的 KGroupedTable。为了得到 KTable,你需要调用 .aggregate() 。这就是上面代码中发生的事情。

P.S。 .agg() 应重命名为 .aggregate()

一旦 tableB' 准备就绪,您可以使用以下代码加入 tableA

tableA.join(tableB', joiner);

这里的joiner指的是ValueJoiner实现。

示例

// Java 8+ example, using lambda expressions
KTable<String, String> joined = left.join(right, 
     /* Below line is ValueJoiner */
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue 
  );

目前,这是 Non-key 加入 KTables 的方式 您可以在文档中找到很好的解释:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join