抑制窗口化 KTable 的输出时如何正确实施缓冲区配置?
How do I implement buffer config correctly when suppressing output from a windowed KTable?
我有一个 windowed KTable
按预期工作,但每次收到新值时它都会输出。我找到了 .suppress
运算符,它完全符合我的要求:仅在时间 window 结束时输出结果。我已将 grace
值添加到我的 TimeWindow
,但无法使 .suppress
与 windowed KTable
一起使用。
我在阅读 Apache's documentation 时发现 untilWindowCloses
是 Suppressed 接口的一个方法,这意味着我无法实例化 Suppressed
对象,对吗?我不确定如何以这种方式实现接口(在 windowed KTable
上 .suppress
的参数中)。
我觉得我好像漏掉了一些愚蠢的东西,但我已经搜索和搜索了,无法弄清楚。有什么想法吗?
TimeWindows window = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(10));
final KTable<Windowed<String>, GenericRecord> joinedKTable = groupedStream
.windowedBy(window)
.reduce(new Reducer<GenericRecord>() {
@Override
public GenericRecord apply(GenericRecord aggValue, GenericRecord newValue) {
//reduce code
}
})
.suppress(Suppressed.untilWindowCloses(unbounded())); //need help here
我正在使用 Eclipse,它告诉我 "The method unbounded() is undefined."
我做错了什么?
您需要静态导入 unbounded()
,或限定引用。
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
或
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
我有一个 windowed KTable
按预期工作,但每次收到新值时它都会输出。我找到了 .suppress
运算符,它完全符合我的要求:仅在时间 window 结束时输出结果。我已将 grace
值添加到我的 TimeWindow
,但无法使 .suppress
与 windowed KTable
一起使用。
我在阅读 Apache's documentation 时发现 untilWindowCloses
是 Suppressed 接口的一个方法,这意味着我无法实例化 Suppressed
对象,对吗?我不确定如何以这种方式实现接口(在 windowed KTable
上 .suppress
的参数中)。
我觉得我好像漏掉了一些愚蠢的东西,但我已经搜索和搜索了,无法弄清楚。有什么想法吗?
TimeWindows window = TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(10));
final KTable<Windowed<String>, GenericRecord> joinedKTable = groupedStream
.windowedBy(window)
.reduce(new Reducer<GenericRecord>() {
@Override
public GenericRecord apply(GenericRecord aggValue, GenericRecord newValue) {
//reduce code
}
})
.suppress(Suppressed.untilWindowCloses(unbounded())); //need help here
我正在使用 Eclipse,它告诉我 "The method unbounded() is undefined."
我做错了什么?
您需要静态导入 unbounded()
,或限定引用。
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
或
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))