如何避免在 Kafka 流中创建变更日志?
How can I avoid the creation of changelogs in Kafka streams?
我试图避免使用 inMemoryWindowStore
(我使用的是 Kafka 2.3.0 和 Streams DSL)在 Kafka 流中创建变更日志主题,我也在调用 withLoggingDisabled()
但不知何故当应用程序开始创建并使用更改日志主题,因为我可以看到其中的数据。我究竟做错了什么?如何避免创建更新日志?
WindowBytesStoreSupplier storeSupplier = Stores.inMemoryWindowStore("in-mem-store-" + index,
Duration.ofSeconds(windowRetentionPeriodInSeconds),
Duration.ofSeconds(aggregationWindowSizeInSeconds),
false);
myStream.filter((key, val) -> val!=null)
.selectKey((key, val) -> val.getId())
.groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new MyDtoSerde()))
.aggregate(MyDto::new,
new MyUpdater(),
Materialized.as(storeSupplier)
.withCachingDisabled()
.withLoggingDisabled()
.with(Serdes.String(), new MyDtoSerde()))
正如 Bill Bejeck here 所解释的那样,在 2.3.0 中使用 Materialized 的静态方法有点棘手。
我是这样解决问题的:
Materialized<String, MyDto, WindowStore<Bytes, byte[]>> materialized;
materialized = Materialized.with(Serdes.String(), new MyDtoSerde());
if (withLoggingDisabled) {
materialized.withLoggingDisabled();
}
myStream.filter((key, val) -> val!=null)
.selectKey((key, val) -> val.getId())
.groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new MyDtoSerde()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(aggregationWindowSizeInSeconds))
.grace(Duration.ofSeconds(windowRetentionPeriodInSeconds)))
.aggregate(MyDto::new,
new MyUpdater(),
materialized)
我试图避免使用 inMemoryWindowStore
(我使用的是 Kafka 2.3.0 和 Streams DSL)在 Kafka 流中创建变更日志主题,我也在调用 withLoggingDisabled()
但不知何故当应用程序开始创建并使用更改日志主题,因为我可以看到其中的数据。我究竟做错了什么?如何避免创建更新日志?
WindowBytesStoreSupplier storeSupplier = Stores.inMemoryWindowStore("in-mem-store-" + index,
Duration.ofSeconds(windowRetentionPeriodInSeconds),
Duration.ofSeconds(aggregationWindowSizeInSeconds),
false);
myStream.filter((key, val) -> val!=null)
.selectKey((key, val) -> val.getId())
.groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new MyDtoSerde()))
.aggregate(MyDto::new,
new MyUpdater(),
Materialized.as(storeSupplier)
.withCachingDisabled()
.withLoggingDisabled()
.with(Serdes.String(), new MyDtoSerde()))
正如 Bill Bejeck here 所解释的那样,在 2.3.0 中使用 Materialized 的静态方法有点棘手。
我是这样解决问题的:
Materialized<String, MyDto, WindowStore<Bytes, byte[]>> materialized;
materialized = Materialized.with(Serdes.String(), new MyDtoSerde());
if (withLoggingDisabled) {
materialized.withLoggingDisabled();
}
myStream.filter((key, val) -> val!=null)
.selectKey((key, val) -> val.getId())
.groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new MyDtoSerde()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(aggregationWindowSizeInSeconds))
.grace(Duration.ofSeconds(windowRetentionPeriodInSeconds)))
.aggregate(MyDto::new,
new MyUpdater(),
materialized)