KStreamWindowAggregate 似乎共享导致 windows 过期的流时间
KStreamWindowAggregate seems to share streamtime causing windows to expire
消息因过期而被丢弃 windows 即使对于那个特定的密钥 window 不应关闭
我想根据事件时间对从单个分区主题消耗的消息和 window 这些消息按 30 秒分组。为了避免立即处理,我调用了 suppress 方法并使用了 .grace 方法。一旦 windows 关闭(30 秒后 + 宽限期 0),我希望最终结果被添加到主题中。我从主题中消费的消息有两个不同的键:300483976 和 300485339。我消费的消息将事件时间增加了 10 秒。我读到流时间只会根据增加事件时间的新消息而增加。这也是我的经历。但是我看到的问题如下:
我使用了密钥 300483976 的前 10 条消息。根据方法 "KStreamWindowAggregate.process",我注意到 internalProcessorContext.streamTime() 每次都会根据最新使用的消息增加。处理完 10 条消息后,最终的事件时间现在是开始时间 + 300 秒。在那一刻之后,密钥 300485339 的消息被使用。除最新消息外,所有消息都标记为过期并与消息 "Skipping record for expired window." 一起丢弃。似乎 internalProcessorContext.streamTime() 仍然记得第一个 运行 的最新值,因此丢弃了密钥为 300485339 的消息。
stream
.groupByKey(Grouped.with(Serdes.String(), new DataSerde()))
.windowedBy(
TimeWindows.of(Duration.ofSeconds(30))
.grace(Duration.ofMillis(0))) // override the default of 24 hours
.aggregate(Data::new, transform(), materialize())
.filter((key, value) -> {
log.info("agg {} {}", key, value.toString());
return true;
})
.suppress(
Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream();
我预计,由于消息是按键(300483976 和 300485339)分组的,所以流时间不会是 "shared"。我希望密钥 300483976 和密钥 300485339 会有单独的 windows。知道哪里出了问题吗?我正在使用 kafka-streams 2.1.0 和一个从消息中的字段获取事件时间的时间戳提取器。
更新
我做了一些额外的测试并改编了一个不使用聚合的示例,但确实显示了与流时间相同的问题:
@Test
public void shouldSupportFinalResultsForTimeWindows() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Windowed<String>, Long> valueCounts = builder
.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
.groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
.windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(1L)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled());
valueCounts
.suppress(untilWindowCloses(unbounded()))
.toStream()
.map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
.to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
valueCounts
.toStream()
.map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
.to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
final Topology topology = builder.build();
System.out.println(topology.describe());
final ConsumerRecordFactory<String, String> recordFactory =
new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
driver.pipeInput(recordFactory.create("input", "k2", "v1", 7L));
// note this last records sets the streamtime to 7L causing the next messages to be discarded
driver.pipeInput(recordFactory.create("input", "k1", "v1", 2L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
}
}
在上面的示例中,第二条消息将流时间设置为 7L,使得创建的 window 0 到 2 被关闭,即使消息具有不同的密钥。这也会导致接下来的几条消息被丢弃,即使密钥是 k1。所以从这个例子中可以清楚地看出,密钥没有被考虑在内。如果这实际上是它的设计方式,我想知道这个场景是什么。特别是当我认为一个主题具有不同分区的消息并且一个分区可能具有与其他分区的流时间(源自事件时间)完全不同的消息时,这是很常见的。希望您能对此有所了解??
观察到的行为是设计使然。显然,stream-时间在所有消息中被跟踪(它不是 substream-time)。
您看到的 "problem" 是,您的输入数据是 out-of-order(只需输入 key 和 ts):
(k1, 1), (k1, 2), (k1, 3), (k2, 1), (k2, 2), (k3, 3)
时间不是单调递增的,即键为k2
的记录相对于键为k1
的记录为out-of-order。因为您将宽限期设置为零,所以您告诉 Kafka Streams 不允许无序数据(或者实际上只有 window 中的一些 out-of-order 数据)。因此,结果只会如您所料,对于具有交错键但单调递增时间戳的有序数据流:
(k1, 1), (k2, 1), (k1, 2), (k2, 2), (k1, 3), (k3, 3)
如果你有 out-of-order 数据,你应该相应地设置宽限期(零只适用于有序数据流)。
消息因过期而被丢弃 windows 即使对于那个特定的密钥 window 不应关闭
我想根据事件时间对从单个分区主题消耗的消息和 window 这些消息按 30 秒分组。为了避免立即处理,我调用了 suppress 方法并使用了 .grace 方法。一旦 windows 关闭(30 秒后 + 宽限期 0),我希望最终结果被添加到主题中。我从主题中消费的消息有两个不同的键:300483976 和 300485339。我消费的消息将事件时间增加了 10 秒。我读到流时间只会根据增加事件时间的新消息而增加。这也是我的经历。但是我看到的问题如下:
我使用了密钥 300483976 的前 10 条消息。根据方法 "KStreamWindowAggregate.process",我注意到 internalProcessorContext.streamTime() 每次都会根据最新使用的消息增加。处理完 10 条消息后,最终的事件时间现在是开始时间 + 300 秒。在那一刻之后,密钥 300485339 的消息被使用。除最新消息外,所有消息都标记为过期并与消息 "Skipping record for expired window." 一起丢弃。似乎 internalProcessorContext.streamTime() 仍然记得第一个 运行 的最新值,因此丢弃了密钥为 300485339 的消息。
stream
.groupByKey(Grouped.with(Serdes.String(), new DataSerde()))
.windowedBy(
TimeWindows.of(Duration.ofSeconds(30))
.grace(Duration.ofMillis(0))) // override the default of 24 hours
.aggregate(Data::new, transform(), materialize())
.filter((key, value) -> {
log.info("agg {} {}", key, value.toString());
return true;
})
.suppress(
Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream();
我预计,由于消息是按键(300483976 和 300485339)分组的,所以流时间不会是 "shared"。我希望密钥 300483976 和密钥 300485339 会有单独的 windows。知道哪里出了问题吗?我正在使用 kafka-streams 2.1.0 和一个从消息中的字段获取事件时间的时间戳提取器。
更新
我做了一些额外的测试并改编了一个不使用聚合的示例,但确实显示了与流时间相同的问题:
@Test
public void shouldSupportFinalResultsForTimeWindows() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Windowed<String>, Long> valueCounts = builder
.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
.groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
.windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(1L)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled());
valueCounts
.suppress(untilWindowCloses(unbounded()))
.toStream()
.map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
.to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
valueCounts
.toStream()
.map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
.to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
final Topology topology = builder.build();
System.out.println(topology.describe());
final ConsumerRecordFactory<String, String> recordFactory =
new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
driver.pipeInput(recordFactory.create("input", "k2", "v1", 7L));
// note this last records sets the streamtime to 7L causing the next messages to be discarded
driver.pipeInput(recordFactory.create("input", "k1", "v1", 2L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
}
}
在上面的示例中,第二条消息将流时间设置为 7L,使得创建的 window 0 到 2 被关闭,即使消息具有不同的密钥。这也会导致接下来的几条消息被丢弃,即使密钥是 k1。所以从这个例子中可以清楚地看出,密钥没有被考虑在内。如果这实际上是它的设计方式,我想知道这个场景是什么。特别是当我认为一个主题具有不同分区的消息并且一个分区可能具有与其他分区的流时间(源自事件时间)完全不同的消息时,这是很常见的。希望您能对此有所了解??
观察到的行为是设计使然。显然,stream-时间在所有消息中被跟踪(它不是 substream-time)。
您看到的 "problem" 是,您的输入数据是 out-of-order(只需输入 key 和 ts):
(k1, 1), (k1, 2), (k1, 3), (k2, 1), (k2, 2), (k3, 3)
时间不是单调递增的,即键为k2
的记录相对于键为k1
的记录为out-of-order。因为您将宽限期设置为零,所以您告诉 Kafka Streams 不允许无序数据(或者实际上只有 window 中的一些 out-of-order 数据)。因此,结果只会如您所料,对于具有交错键但单调递增时间戳的有序数据流:
(k1, 1), (k2, 1), (k1, 2), (k2, 2), (k1, 3), (k3, 3)
如果你有 out-of-order 数据,你应该相应地设置宽限期(零只适用于有序数据流)。