Kafka 流 DSL:窗口聚合的应用程序滞后
Kafka stream DSL: Application lags for windowed aggregation
我们有以下用例:
从主题中读取(预期吞吐量是一个键每 2 秒记录一次),groupByKey 并进行 windowed 聚合 30 分钟 window,跳跃周期为 1 分钟。
聚合只是附加收到的记录。
当应用程序启动时一切正常,但在后期聚合大小增加时应用程序变慢和滞后
拓扑:
KStream<String, Foo> numericStream = builder.stream("topic", Consumed.with(Serdes.String(), FooSerde));
static Duration WINDOW_MS = Duration.ofMinutes(30);
static Duration ADVANCE_MS = Duration.ofMinutes(15);
KStream<Windowed<String>, Foo1> windowedStream = numericStream.peek((key, value) -> System.out.println(value.getDateTime()))
.groupByKey()
.windowedBy((TimeWindows.of(WINDOW_MS).advanceBy(ADVANCE_MS)).grace(Duration.ofMillis(30)))
.aggregate(new Initializer<Foo1>() {
@Override
public Foo1 apply() {
return new Foo1();
}},
(key, value, aggregate) -> {
aggregate.append(value);
return aggregate;
},
Materialized.<String, Foo1, WindowStore<Bytes,byte[]>>as("some_name").withValueSerde(Foo1Serde))
.toStream()
.peek((key, value) -> System.out.println(" Key: "+key+ " Start: "+getISTTime(key.window().start()) + " End: "+ getISTTime(key.window().end()) +" Count: " + value.getCount() ));
每条记录的大小约为 20KB。当聚合大小超过 10MB 左右时,记录的处理时间超过 2 秒,因此延迟。
COMMIT_INTERVAL_MS_CONFIG 设置为 0,因为状态存储应始终与最新的数据包保持同步,并且查询状态存储的时间间隔不同。
如何消除应用程序的延迟,是否与 RocksDB I/O 操作有关?因为计数操作而不是聚合没有任何延迟
每个主题有 3 个分区,但是具有相同键的记录进入同一分区,所以 threading/multiple 个实例有帮助吗?
我们也在考虑在没有窗口化的情况下这样做,windowing 是否会为较大的聚合造成这种滞后?
由于您向 RocksDB 写入和读取越来越大的数据,它可能会减慢处理速度。
是的,在一个实例中使用三个线程或以一个线程启动三个实例在这种情况下也可能有所帮助。使用您的拓扑和三个分区,处理分布在三个任务上。如果您只有一个实例和一个线程,则所有三个任务将由同一个线程 运行。您可以通过指定一个具有三个线程的实例来向上扩展,或者您可以通过在不同的计算节点上启动三个具有一个线程的实例来进行横向扩展。两个实例之间的设置,一个有两个线程,另一个有一个线程也可以。
没有窗口化,聚合将永远不会过期,也永远不会从状态存储中删除。因此,状态存储中的数据将无限增长,并可能减慢状态存储的速度。
如果您使用交互式查询来查询状态存储,则不需要将 COMMIT_INTERVAL_MS_CONFIG 设置为 0,因为交互式查询也会查询状态存储前面的缓存。实际上,将 COMMIT_INTERVAL_MS_CONFIG 设置为零也可能会减慢您的处理速度,因为它会增加磁盘 I/O,因为您不断地将数据写入磁盘。
我们有以下用例:
从主题中读取(预期吞吐量是一个键每 2 秒记录一次),groupByKey 并进行 windowed 聚合 30 分钟 window,跳跃周期为 1 分钟。 聚合只是附加收到的记录。
当应用程序启动时一切正常,但在后期聚合大小增加时应用程序变慢和滞后
拓扑:
KStream<String, Foo> numericStream = builder.stream("topic", Consumed.with(Serdes.String(), FooSerde));
static Duration WINDOW_MS = Duration.ofMinutes(30);
static Duration ADVANCE_MS = Duration.ofMinutes(15);
KStream<Windowed<String>, Foo1> windowedStream = numericStream.peek((key, value) -> System.out.println(value.getDateTime()))
.groupByKey()
.windowedBy((TimeWindows.of(WINDOW_MS).advanceBy(ADVANCE_MS)).grace(Duration.ofMillis(30)))
.aggregate(new Initializer<Foo1>() {
@Override
public Foo1 apply() {
return new Foo1();
}},
(key, value, aggregate) -> {
aggregate.append(value);
return aggregate;
},
Materialized.<String, Foo1, WindowStore<Bytes,byte[]>>as("some_name").withValueSerde(Foo1Serde))
.toStream()
.peek((key, value) -> System.out.println(" Key: "+key+ " Start: "+getISTTime(key.window().start()) + " End: "+ getISTTime(key.window().end()) +" Count: " + value.getCount() ));
每条记录的大小约为 20KB。当聚合大小超过 10MB 左右时,记录的处理时间超过 2 秒,因此延迟。
COMMIT_INTERVAL_MS_CONFIG 设置为 0,因为状态存储应始终与最新的数据包保持同步,并且查询状态存储的时间间隔不同。
如何消除应用程序的延迟,是否与 RocksDB I/O 操作有关?因为计数操作而不是聚合没有任何延迟
每个主题有 3 个分区,但是具有相同键的记录进入同一分区,所以 threading/multiple 个实例有帮助吗?
我们也在考虑在没有窗口化的情况下这样做,windowing 是否会为较大的聚合造成这种滞后?
由于您向 RocksDB 写入和读取越来越大的数据,它可能会减慢处理速度。
是的,在一个实例中使用三个线程或以一个线程启动三个实例在这种情况下也可能有所帮助。使用您的拓扑和三个分区,处理分布在三个任务上。如果您只有一个实例和一个线程,则所有三个任务将由同一个线程 运行。您可以通过指定一个具有三个线程的实例来向上扩展,或者您可以通过在不同的计算节点上启动三个具有一个线程的实例来进行横向扩展。两个实例之间的设置,一个有两个线程,另一个有一个线程也可以。
没有窗口化,聚合将永远不会过期,也永远不会从状态存储中删除。因此,状态存储中的数据将无限增长,并可能减慢状态存储的速度。
如果您使用交互式查询来查询状态存储,则不需要将 COMMIT_INTERVAL_MS_CONFIG 设置为 0,因为交互式查询也会查询状态存储前面的缓存。实际上,将 COMMIT_INTERVAL_MS_CONFIG 设置为零也可能会减慢您的处理速度,因为它会增加磁盘 I/O,因为您不断地将数据写入磁盘。