kafka 流跳跃窗口聚合在时间戳零处导致多个 windows
kafka streams hopping windowed aggregation causing multiple windows at timestamp zero
Kafka Streams DSL windowed 聚合导致多个 windows.
@StreamListener("input")
public void process(KStream<String, Data> DataKStream) {
JsonSerde<DataAggregator> DataJsonSerde =
new JsonSerde<>(DataAggregator.class);
DataKStream
.groupByKey()
.windowedBy(TimeWindows.of(60000).advanceBy(30000))
.aggregate(
DataAggregator::new,
(key, Data, aggregator) -> aggregator.add(Data),
Materialized.with(Serdes.String(), DataJsonSerde)
);
}
DataAggregator.java
public class DataAggregator {
private List<String> dataList = new ArrayList<>();
public DataAggregator add(Data data) {
dataList.add(data.getId());
System.out.println(dataList);
return this;
}
public List<String> getDataList() {
return dataList;
}
}
我根据键对输入数据进行分组,然后进行 1 分钟 window 跳转 30 秒,在聚合器中我只是收集数据并显示。
我在开始时期待 1 window,在 30 秒后又期待另一个 window。但是实际输出是不同的,因为开始本身 2 windows 正在创建。
预期:
[1]
[1, 2]
[1, 2, 3]
[1, 2, 3, 4]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5, 6] // till 30 seconds only one window
[6] // new window after 30 seconds
[1, 2, 3, 4, 5, 6, 7]
[6, 7]
[1, 2, 3, 4, 5, 6, 7, 8]
[6, 7, 8]
实际输出:
[1]
[1]
[1, 2]
[1, 2]
[1, 2, 3]
[1, 2, 3]
[1, 2, 3, 4]
[1, 2, 3, 4]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5, 6]
[1, 2, 3, 4, 5, 6] // duplicate window even before 30 seconds
[6] // new window after 30 seconds and 1 window from earlier will be dropped
[1, 2, 3, 4, 5, 6, 7]
[6, 7]
因为我希望 window 在 1 分钟内达到 30 秒 window。我相信,最初应该只有一个 window,30 秒后应该创建另一个 window。
有人可以告诉我,实际输出是预期的行为还是我遗漏了什么?
注意: 我每 4 秒获取一次输入数据,expected/actual 输出仅用于表示。
来自 Kafka 文档:
Hopping time windows are aligned to the epoch, with the lower interval
bound being inclusive and the upper bound being exclusive. “Aligned to
the epoch” means that the first window starts at timestamp zero. For
example, hopping windows with a size of 5000ms and an advance interval
(“hop”) of 3000ms have predictable window boundaries
[0;5000),[3000;8000),... — and not [1000;6000),[4000;9000),... or even
something “random” like [1452;6452),[4452;9452),....
因为您的 windows 重叠,每个时间戳会得到多个 windows。对于您的特定 window 配置,您总是会得到 2 windows(以毫秒为单位):
[0,60000) [60000,12000) [12000,18000) ...
[30000,90000) [90000,15000) ...
您无法更改此行为,但是,您可以对结果应用 filter()
(即 aggregate(...).filter(...)
以删除您不感兴趣的 windows。
此外,默认情况下,记录事件时间由 Kafka Streams 使用。有一个 WallclockTimestampExtractor
但它仅在您明确设置时使用。比照。 https://docs.confluent.io/current/streams/developer-guide/config-streams.html#default-timestamp-extractor
Kafka Streams DSL windowed 聚合导致多个 windows.
@StreamListener("input")
public void process(KStream<String, Data> DataKStream) {
JsonSerde<DataAggregator> DataJsonSerde =
new JsonSerde<>(DataAggregator.class);
DataKStream
.groupByKey()
.windowedBy(TimeWindows.of(60000).advanceBy(30000))
.aggregate(
DataAggregator::new,
(key, Data, aggregator) -> aggregator.add(Data),
Materialized.with(Serdes.String(), DataJsonSerde)
);
}
DataAggregator.java
public class DataAggregator {
private List<String> dataList = new ArrayList<>();
public DataAggregator add(Data data) {
dataList.add(data.getId());
System.out.println(dataList);
return this;
}
public List<String> getDataList() {
return dataList;
}
}
我根据键对输入数据进行分组,然后进行 1 分钟 window 跳转 30 秒,在聚合器中我只是收集数据并显示。
我在开始时期待 1 window,在 30 秒后又期待另一个 window。但是实际输出是不同的,因为开始本身 2 windows 正在创建。
预期:
[1]
[1, 2]
[1, 2, 3]
[1, 2, 3, 4]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5, 6] // till 30 seconds only one window
[6] // new window after 30 seconds
[1, 2, 3, 4, 5, 6, 7]
[6, 7]
[1, 2, 3, 4, 5, 6, 7, 8]
[6, 7, 8]
实际输出:
[1]
[1]
[1, 2]
[1, 2]
[1, 2, 3]
[1, 2, 3]
[1, 2, 3, 4]
[1, 2, 3, 4]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5, 6]
[1, 2, 3, 4, 5, 6] // duplicate window even before 30 seconds
[6] // new window after 30 seconds and 1 window from earlier will be dropped
[1, 2, 3, 4, 5, 6, 7]
[6, 7]
因为我希望 window 在 1 分钟内达到 30 秒 window。我相信,最初应该只有一个 window,30 秒后应该创建另一个 window。
有人可以告诉我,实际输出是预期的行为还是我遗漏了什么?
注意: 我每 4 秒获取一次输入数据,expected/actual 输出仅用于表示。
来自 Kafka 文档:
Hopping time windows are aligned to the epoch, with the lower interval bound being inclusive and the upper bound being exclusive. “Aligned to the epoch” means that the first window starts at timestamp zero. For example, hopping windows with a size of 5000ms and an advance interval (“hop”) of 3000ms have predictable window boundaries [0;5000),[3000;8000),... — and not [1000;6000),[4000;9000),... or even something “random” like [1452;6452),[4452;9452),....
因为您的 windows 重叠,每个时间戳会得到多个 windows。对于您的特定 window 配置,您总是会得到 2 windows(以毫秒为单位):
[0,60000) [60000,12000) [12000,18000) ...
[30000,90000) [90000,15000) ...
您无法更改此行为,但是,您可以对结果应用 filter()
(即 aggregate(...).filter(...)
以删除您不感兴趣的 windows。
此外,默认情况下,记录事件时间由 Kafka Streams 使用。有一个 WallclockTimestampExtractor
但它仅在您明确设置时使用。比照。 https://docs.confluent.io/current/streams/developer-guide/config-streams.html#default-timestamp-extractor