窗口化商店未填满(Spring Cloud Stream Kafka)
Windowed Store not getting filled (Spring Cloud Stream Kafka)
我正在尝试从一堆数据点创建一个窗口存储,但由于某种原因,流拓扑分支似乎没有被评估。
我使用相同的流来填充 KTable
物化到商店中,效果很好。
我正在使用具有以下配置的 Spring Cloud Streams:
spring:
application.name: stream-test
kafka.bootstrap-servers: localhost:9092
cloud.stream:
# assign group and topic name to binding
bindings:
windowedStream:
destination: myTopic
group: stream-test-window
kafka:
# configure kafka binder
binder:
brokers: ${spring.kafka.bootstrap-servers}
configuration.auto.offset.reset: latest
# kafka-streams specific binding configuration
streams.bindings.windowedStream.consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: kstreamstest.StreamSerdes$DataItemSerde
DataItemSerde
只是扩展的 JSON-Serde(也适用于 KTable
)。
@Data class DataItem {
String value;
}
public class StreamSerdes {
public static final Serde<DataItem> DATA_ITEM_SERDE = new DataItemSerde();
public static class DataItemSerde extends JsonSerde<DataItem> {}
}
与绑定
interface WindowedTableBinding {
String WINDOW_STREAM = "windowedStream";
@Input(WINDOW_STREAM)
KStream<String, DataItem> stream();
}
我这样创建一个流监听器
@Configuration
@EnableBinding(WindowedTableBinding.class)
class StreamToWindowed {
String storeName = "wvs";
@Bean
String windowedStoreName() {
return storeName;
}
@StreamListener(WindowedTableBinding.WINDOW_STREAM)
public void windowStream(@Input(WindowedTableBinding.WINDOW_STREAM) KStream<String, DataItem> stream) {
stream.peek((k, v) -> System.out.printf("%s: %s%n", k, v))
.groupByKey()
.windowedBy(TimeWindows.of(5_000))
.reduce((d1, d2) -> d2,
Materialized
.<String, DataItem, WindowStore<Bytes, byte[]>>as("wvs")
.withKeySerde(Serdes.String())
.withValueSerde(StreamSerdes.DATA_ITEM_SERDE));
}
}
但是,当我随后查询商店时
Set<String> getWindowedKeys() {
ReadOnlyWindowStore<String, DataItem> queryableStore = queryService
.getQueryableStore(windowedStoreName, QueryableStoreTypes.windowStore());
Set<String> result = new HashSet<>();
if (queryableStore != null) { // store is not null though
try (KeyValueIterator<Windowed<String>, DataItem> values = queryableStore.all()) {
values.forEachRemaining(kvs -> result.add(kvs.key.key()));
}
}
return result;
}
那个集合总是空的(当然是在我发送数据之后)。 System.out.print
语句也没有被触发,所以我猜分支根本没有被评估。
同样,我为相同的值并行建立了一个 KTable
,并且得到了很好的填充。我可以删除它,但窗口版仍然无法使用。
我确实看到了 this example,但我看到的唯一区别是它将数据写回到输出流,而我不想这样做。还有,加了也没用
我也试过了
@KafkaStreamsStateStore(name="wvs", type= KafkaStreamsStateStoreProperties.StoreType.WINDOW, lengthMs=5_000)
public void windowStream(@Input(WindowedTableBinding.WINDOW_STREAM) KStream<String, DataItem> stream) {}
但这没有什么区别。
我需要做什么来填充窗口数据存储?
像往常一样使用 Spring,结果证明是配置问题。
我需要为两个绑定提供单独的 application-id
:
spring.cloud.stream.kafka.streams.bindings:
tableStream.consumer:
application-id: table-generator
windowedStream.consumer:
application-id: windows-generator
我正在尝试从一堆数据点创建一个窗口存储,但由于某种原因,流拓扑分支似乎没有被评估。
我使用相同的流来填充 KTable
物化到商店中,效果很好。
我正在使用具有以下配置的 Spring Cloud Streams:
spring:
application.name: stream-test
kafka.bootstrap-servers: localhost:9092
cloud.stream:
# assign group and topic name to binding
bindings:
windowedStream:
destination: myTopic
group: stream-test-window
kafka:
# configure kafka binder
binder:
brokers: ${spring.kafka.bootstrap-servers}
configuration.auto.offset.reset: latest
# kafka-streams specific binding configuration
streams.bindings.windowedStream.consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: kstreamstest.StreamSerdes$DataItemSerde
DataItemSerde
只是扩展的 JSON-Serde(也适用于 KTable
)。
@Data class DataItem {
String value;
}
public class StreamSerdes {
public static final Serde<DataItem> DATA_ITEM_SERDE = new DataItemSerde();
public static class DataItemSerde extends JsonSerde<DataItem> {}
}
与绑定
interface WindowedTableBinding {
String WINDOW_STREAM = "windowedStream";
@Input(WINDOW_STREAM)
KStream<String, DataItem> stream();
}
我这样创建一个流监听器
@Configuration
@EnableBinding(WindowedTableBinding.class)
class StreamToWindowed {
String storeName = "wvs";
@Bean
String windowedStoreName() {
return storeName;
}
@StreamListener(WindowedTableBinding.WINDOW_STREAM)
public void windowStream(@Input(WindowedTableBinding.WINDOW_STREAM) KStream<String, DataItem> stream) {
stream.peek((k, v) -> System.out.printf("%s: %s%n", k, v))
.groupByKey()
.windowedBy(TimeWindows.of(5_000))
.reduce((d1, d2) -> d2,
Materialized
.<String, DataItem, WindowStore<Bytes, byte[]>>as("wvs")
.withKeySerde(Serdes.String())
.withValueSerde(StreamSerdes.DATA_ITEM_SERDE));
}
}
但是,当我随后查询商店时
Set<String> getWindowedKeys() {
ReadOnlyWindowStore<String, DataItem> queryableStore = queryService
.getQueryableStore(windowedStoreName, QueryableStoreTypes.windowStore());
Set<String> result = new HashSet<>();
if (queryableStore != null) { // store is not null though
try (KeyValueIterator<Windowed<String>, DataItem> values = queryableStore.all()) {
values.forEachRemaining(kvs -> result.add(kvs.key.key()));
}
}
return result;
}
那个集合总是空的(当然是在我发送数据之后)。 System.out.print
语句也没有被触发,所以我猜分支根本没有被评估。
同样,我为相同的值并行建立了一个 KTable
,并且得到了很好的填充。我可以删除它,但窗口版仍然无法使用。
我确实看到了 this example,但我看到的唯一区别是它将数据写回到输出流,而我不想这样做。还有,加了也没用
我也试过了
@KafkaStreamsStateStore(name="wvs", type= KafkaStreamsStateStoreProperties.StoreType.WINDOW, lengthMs=5_000)
public void windowStream(@Input(WindowedTableBinding.WINDOW_STREAM) KStream<String, DataItem> stream) {}
但这没有什么区别。
我需要做什么来填充窗口数据存储?
像往常一样使用 Spring,结果证明是配置问题。
我需要为两个绑定提供单独的 application-id
:
spring.cloud.stream.kafka.streams.bindings:
tableStream.consumer:
application-id: table-generator
windowedStream.consumer:
application-id: windows-generator