KStream aggregate with custom value type and known state store
I am trying to aggregate a stream whose keys are strings and whose values are a custom type, as follows:
    stream.groupByKey(Grouped.with(Serdes.String(), barSerde))
          .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
          .aggregate(Foo::new,
                     (String key, Bar bar, Foo foo) -> {
                         foo.updateMap(bar.getKey(), bar.getValue());
                         return foo;
                     },
                     Materialized.with(Serdes.String(), fooSerde));
This gives me the expected result until I specify a state store in Materialized, as shown below:
    stream.groupByKey(Grouped.with(Serdes.String(), barSerde))
          .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
          .aggregate(Foo::new,
                     (String key, Bar bar, Foo foo) -> {
                         foo.updateMap(bar.getKey(), bar.getValue());
                         return foo;
                     },
                     Materialized.<String, Foo, KeyValueStore<Bytes, byte[]>>as(storeTopic)
                                 .withKeySerde(Serdes.String())
                                 .withValueSerde(fooSerde));
I get the following compile error:
Error:(122, 17) java: no suitable method found for aggregate(Foo::new,(key,daBea[...]an; },org.apache.kafka.streams.kstream.Materialized<java.lang.String,com.test.bean.Foo,org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>>)
method org.apache.kafka.streams.kstream.TimeWindowedKStream.<VR>aggregate(org.apache.kafka.streams.kstream.Initializer<VR>,org.apache.kafka.streams.kstream.Aggregator<? super java.lang.String,? super com.test.bean.Bar,VR>) is not applicable
(cannot infer type-variable(s) VR
(actual and formal argument lists differ in length))
method org.apache.kafka.streams.kstream.TimeWindowedKStream.<VR>aggregate(org.apache.kafka.streams.kstream.Initializer<VR>,org.apache.kafka.streams.kstream.Aggregator<? super java.lang.String,? super com.test.bean.Bar,VR>,org.apache.kafka.streams.kstream.Materialized<java.lang.String,VR,org.apache.kafka.streams.state.WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>>) is not applicable
(cannot infer type-variable(s) VR
(argument mismatch; org.apache.kafka.streams.kstream.Materialized<java.lang.String,com.test.bean.Foo,org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> cannot be converted to org.apache.kafka.streams.kstream.Materialized<java.lang.String,VR,org.apache.kafka.streams.state.WindowStore<org.apache.kafka.common.utils.Bytes,byte[]>>))
How do I use Materialized to specify both the Serdes and the state store topic?
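Because you are doing a windowed aggregation, the expected store type is WindowStore, not KeyValueStore. The compiler message says the same thing: the windowed aggregate overload wants a Materialized<String, VR, WindowStore<Bytes, byte[]>>, so the third type argument passed to Materialized.as should be WindowStore<Bytes, byte[]>.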
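To illustrate why the compiler refuses the KeyValueStore version, here is a minimal stdlib-only sketch. The nested types are hypothetical stand-ins that only mimic the generic shape of Kafka Streams' Materialized and store interfaces; they are not the real org.apache.kafka classes, and the store name "foo-store" is made up. The point is that Java generics are invariant, so a Materialized tagged with one store type can never be passed where another is required.

```java
import java.util.function.Supplier;

// Hypothetical stand-ins mirroring only the generic shape of the Kafka Streams
// types; these are NOT the real org.apache.kafka classes.
class StoreTypeSketch {
    interface StateStore {}
    interface KeyValueStore<K, V> extends StateStore {}
    interface WindowStore<K, V> extends StateStore {}

    // Like Materialized<K, V, S extends StateStore>: S is purely a compile-time tag.
    static final class Materialized<K, V, S extends StateStore> {
        final String storeName;

        private Materialized(String storeName) {
            this.storeName = storeName;
        }

        static <K, V, S extends StateStore> Materialized<K, V, S> as(String storeName) {
            return new Materialized<>(storeName);
        }
    }

    // Like the windowed aggregate overload: only a WindowStore-tagged Materialized fits.
    static <VR> String aggregate(Supplier<VR> initializer,
                                 Materialized<String, VR, WindowStore<byte[], byte[]>> materialized) {
        initializer.get(); // create the initial aggregate, as an Initializer would
        return materialized.storeName;
    }

    static final class Foo {}

    static String demo() {
        // Rejected by the compiler: generics are invariant, so a Materialized tagged
        // with KeyValueStore is not a Materialized tagged with WindowStore.
        // aggregate(Foo::new,
        //     Materialized.<String, Foo, KeyValueStore<byte[], byte[]>>as("foo-store"));

        // Accepted: the store tag matches what the windowed overload declares.
        return aggregate(Foo::new,
                Materialized.<String, Foo, WindowStore<byte[], byte[]>>as("foo-store"));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the code from the question, the equivalent change would be to write Materialized.<String, Foo, WindowStore<Bytes, byte[]>>as(storeTopic) and keep the withKeySerde and withValueSerde calls chained on it.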