抑制窗口化 KTable 的输出时如何正确实施缓冲区配置?

How do I implement buffer config correctly when suppressing output from a windowed KTable?

我有一个 windowed KTable 按预期工作,但每次收到新值时它都会输出。我找到了 .suppress 运算符,它完全符合我的要求:仅在时间 window 结束时输出结果。我已将 grace 值添加到我的 TimeWindow,但无法使 .suppress 与 windowed KTable 一起使用。

我在阅读 Apache's documentation 时发现 untilWindowCloses 是 Suppressed 接口的一个方法,这意味着我无法实例化 Suppressed 对象,对吗?我不确定如何以这种方式实现接口(在 windowed KTable.suppress 的参数中)。

我觉得我好像漏掉了一些愚蠢的东西,但我已经搜索和搜索了,无法弄清楚。有什么想法吗?

TimeWindows window = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(10));

final KTable<Windowed<String>, GenericRecord> joinedKTable = groupedStream
    .windowedBy(window)
    .reduce(new Reducer<GenericRecord>() {
        @Override
        public GenericRecord apply(GenericRecord aggValue, GenericRecord newValue) {
            //reduce code
        }
    })
    .suppress(Suppressed.untilWindowCloses(unbounded())); //need help here

我正在使用 Eclipse,它告诉我 "The method unbounded() is undefined." 我做错了什么?

您需要静态导入 unbounded(),或限定引用。

import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;

.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))