非键加入 GlobalTable vs Ktable-Ktable Join?
non-key join with GlobalTable vs Ktable-Ktable Join?
我很好奇为什么非键连接与 GlobalKTtable 和 KTable-KTable 一起工作?虽然我明白为什么我们不需要为 globalKTable (BroadCast Join)
进行联合分区,但我不明白是什么启用了非键连接?任何人都可以大致了解正在发生的事情吗?
GlobalKTable 和 KTable,都是 changelog 的抽象,但不同之处在于 KTable 是为每个分区的每个应用程序实例在本地创建的,而 GlobalKTable 是用每个应用程序实例上所有分区的全部数据填充的。它在每个应用程序实例上复制整个数据,这意味着整个数据集可用于在每个实例上进行查询。因此它不需要 co-partitioning,并且可以在整个 table 中进行查找。
在下面的例子中:
KStream<String, Long> left = ...; // // KStream has string type key
GlobalKTable<Integer, Double> right = ...; // GlobalKTable has integer type key
// Java 8+ example, using lambda expressions
KStream<String, String> joined = left.leftJoin(right,
(leftKey, leftValue) -> leftKey.length(), /* derive a (potentially) new key by which to lookup against the table */
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
);
Select 使用 KeyValueMapper 的左侧流中的键,您可以使用它在 GlobalKTable 中查找,如下所示:
(leftKey, leftValue) -> leftKey.length(), /* select a (potentially) new key by which to lookup against the table */
GlobalKTable 用于连接很方便,但价格昂贵,因为与 KTables 相比,它需要更多的存储空间,并且还会增加网络和 kafka 代理负载。
我很好奇为什么非键连接与 GlobalKTtable 和 KTable-KTable 一起工作?虽然我明白为什么我们不需要为 globalKTable (BroadCast Join)
进行联合分区,但我不明白是什么启用了非键连接?任何人都可以大致了解正在发生的事情吗?
GlobalKTable 和 KTable,都是 changelog 的抽象,但不同之处在于 KTable 是为每个分区的每个应用程序实例在本地创建的,而 GlobalKTable 是用每个应用程序实例上所有分区的全部数据填充的。它在每个应用程序实例上复制整个数据,这意味着整个数据集可用于在每个实例上进行查询。因此它不需要 co-partitioning,并且可以在整个 table 中进行查找。
在下面的例子中:
KStream<String, Long> left = ...; // // KStream has string type key
GlobalKTable<Integer, Double> right = ...; // GlobalKTable has integer type key
// Java 8+ example, using lambda expressions
KStream<String, String> joined = left.leftJoin(right,
(leftKey, leftValue) -> leftKey.length(), /* derive a (potentially) new key by which to lookup against the table */
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
);
Select 使用 KeyValueMapper 的左侧流中的键,您可以使用它在 GlobalKTable 中查找,如下所示:
(leftKey, leftValue) -> leftKey.length(), /* select a (potentially) new key by which to lookup against the table */
GlobalKTable 用于连接很方便,但价格昂贵,因为与 KTables 相比,它需要更多的存储空间,并且还会增加网络和 kafka 代理负载。