如何避免在 Kafka 流中创建变更日志?

How can I avoid the creation of changelogs in Kafka streams?

我试图避免使用 inMemoryWindowStore(我使用的是 Kafka 2.3.0 和 Streams DSL)在 Kafka 流中创建变更日志主题,我也在调用 withLoggingDisabled() 但不知何故当应用程序开始创建并使用更改日志主题,因为我可以看到其中的数据。我究竟做错了什么?如何避免创建更新日志?

    WindowBytesStoreSupplier storeSupplier = Stores.inMemoryWindowStore("in-mem-store-" + index,
            Duration.ofSeconds(windowRetentionPeriodInSeconds),
            Duration.ofSeconds(aggregationWindowSizeInSeconds),
            false);

    myStream.filter((key, val) -> val!=null)
            .selectKey((key, val) -> val.getId())
            .groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new MyDtoSerde()))
            .aggregate(MyDto::new,
                    new MyUpdater(),
                    Materialized.as(storeSupplier)
                            .withCachingDisabled()
                            .withLoggingDisabled()
                            .with(Serdes.String(), new MyDtoSerde()))

正如 Bill Bejeck here 所解释的那样,在 2.3.0 中使用 Materialized 的静态方法有点棘手。

我是这样解决问题的:

    Materialized<String, MyDto, WindowStore<Bytes, byte[]>> materialized;
    materialized = Materialized.with(Serdes.String(), new MyDtoSerde());
    if (withLoggingDisabled) {
        materialized.withLoggingDisabled();
    }

    myStream.filter((key, val) -> val!=null)
        .selectKey((key, val) -> val.getId())
        .groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new MyDtoSerde()))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(aggregationWindowSizeInSeconds))
                   .grace(Duration.ofSeconds(windowRetentionPeriodInSeconds)))
        .aggregate(MyDto::new,
                   new MyUpdater(),
                   materialized)