连接从时间窗口派生的两个表时,时间窗口是否应该相同
Should TimeWindows be the same when joining two KTable dervied from TimeWindows
我为两个不同的 KTable 使用了两个不同的保留时间,它适用于 RocksDB States 和 changelog Kafka Topics。
KTable由KStream生成,groupBy
然后windowedBy
.
我相信加入 KStream
与窗口时,TimeWindows
是一样的。我想知道如果 TimeWindows
参数不同,当加入两个由 TimeWindows
窗口化的不同 KTable 时,会有好处还是坏处?
代码片段:
final KStream<Integer, String> eventStream = builder.stream("events",
Consumed.with(Serdes.Integer(), Serdes.String())
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
final KTable<Windowed<Integer>, String> eventWindowTable = eventStream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(60)).until(Duration.ofSeconds(100).toMillis()))
.reduce((oldValue, newValue) -> newValue);
final KStream<Integer, String> clickStream = builder.stream("clicks",
Consumed.with(Serdes.Integer(), Serdes.String())
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
final KTable<Windowed<Integer>, String> clickWindowTable = clickStream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(30)).until(Duration.ofSeconds(70).toMillis()))
.reduce((oldValue, newValue) -> newValue);
final KTable<Windowed<Integer>, String> join = eventWindowTable.leftJoin(clickWindowTable,
(event, click) -> event + " ; " + click + " ; " + Instant.now()
);
最初我认为用不同的 TimeWindows
参数连接两个不同的 KTable 是行不通的,因为连接依赖于 TimeWindowedKey,一个时隙的键。但是经过测试,它也能正常工作。
执行连接是因为两个键的类型相同:Windowed<Integer>
。如果键相同,连接当然只会产生结果。假设您有以下 windows(请注意,只有 window 开始时间戳存储用于 TimeWindows
):
eventWindowTable: <A,0> <A,60>
clickWindowTable: <A,0> <A,30> <A,60> <A,90>
对于这种情况,只有 <A,0>
和 <A,60>
会加入。因此,具有不同的 windows 确实会影响您的结果,因为 window 开始时间戳是密钥的一部分并且一些 windows 永远不会加入(例如,<A,30>
和 <A,90>
在我们的例子中)。
我为两个不同的 KTable 使用了两个不同的保留时间,它适用于 RocksDB States 和 changelog Kafka Topics。
KTable由KStream生成,groupBy
然后windowedBy
.
我相信加入 KStream
与窗口时,TimeWindows
是一样的。我想知道如果 TimeWindows
参数不同,当加入两个由 TimeWindows
窗口化的不同 KTable 时,会有好处还是坏处?
代码片段:
final KStream<Integer, String> eventStream = builder.stream("events",
Consumed.with(Serdes.Integer(), Serdes.String())
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
final KTable<Windowed<Integer>, String> eventWindowTable = eventStream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(60)).until(Duration.ofSeconds(100).toMillis()))
.reduce((oldValue, newValue) -> newValue);
final KStream<Integer, String> clickStream = builder.stream("clicks",
Consumed.with(Serdes.Integer(), Serdes.String())
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
final KTable<Windowed<Integer>, String> clickWindowTable = clickStream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(30)).until(Duration.ofSeconds(70).toMillis()))
.reduce((oldValue, newValue) -> newValue);
final KTable<Windowed<Integer>, String> join = eventWindowTable.leftJoin(clickWindowTable,
(event, click) -> event + " ; " + click + " ; " + Instant.now()
);
最初我认为用不同的 TimeWindows
参数连接两个不同的 KTable 是行不通的,因为连接依赖于 TimeWindowedKey,一个时隙的键。但是经过测试,它也能正常工作。
执行连接是因为两个键的类型相同:Windowed<Integer>
。如果键相同,连接当然只会产生结果。假设您有以下 windows(请注意,只有 window 开始时间戳存储用于 TimeWindows
):
eventWindowTable: <A,0> <A,60>
clickWindowTable: <A,0> <A,30> <A,60> <A,90>
对于这种情况,只有 <A,0>
和 <A,60>
会加入。因此,具有不同的 windows 确实会影响您的结果,因为 window 开始时间戳是密钥的一部分并且一些 windows 永远不会加入(例如,<A,30>
和 <A,90>
在我们的例子中)。