带窗口的 KTable 产生错误的类型
KTable with windowing produces wrong type
我在 Kafka 中创建带有时间窗口的 KTable 时遇到了一些问题。
我想创建一个 table 来计算流中 ID 的数量,就像这样。
ID (String) | Count (Long)
X | 5
Y | 6
Z | 7
等等。我希望能够使用 Kafka REST-API 获得 table,最好是 .json.
这是我目前的代码:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> streams = builder.stream(srcTopic);
KTable<Windowed<String>, Long> numCount = streams
.flatMapValues(value -> getID(value))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(windowSizeMs).advanceBy(advanceMs))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("foo"));
我现在面临的问题是 table 不是创建为 <String, Long>
,而是创建为 <String, String>
。这意味着我无法获得正确的计数,但我收到的是正确的密钥,但计数已损坏。我试图使用 Long.valueOf(value)
将其作为 Long
强制输出,但没有成功。我不知道如何从这里开始。我是否需要将 KTable 写入新主题?因为我希望 table 可以使用 kafka REST-API 进行查询,所以我认为不需要它,对吗? Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("foo")
应该使其可查询为 "foo",对吧?
KTable 创建了一个 changelog
主题,这足以使其可查询吗?还是我必须创建一个新主题才能写入?
我正在使用另一个 KStream 来验证输出。
KStream<String, String> streamOut = builder.stream(srcTopic);
streamOut.foreach((key, value) -> System.out.println(key + " => " + value));
它输出:
ID COUNT
2855 => ~
2857 => �
2859 => �
2861 => V(
2863 => �
2874 => �
2877 => J
2880 => �2
2891 => �=
不管怎样,我真的不想使用KStream 来收集输出,我想查询KTable。但如前所述,我不太了解查询的工作原理..
更新
设法让它与
一起工作
ReadOnlyWindowStore<String, Long> windowStore =
kafkaStreams.store("tst", QueryableStoreTypes.windowStore());
long timeFrom = 0;
long timeTo = System.currentTimeMillis(); // now (in processing-time)
WindowStoreIterator<Long> iterator = windowStore.fetch("x", timeFrom, timeTo);
while (iterator.hasNext()) {
KeyValue<Long, Long> next = iterator.next();
long windowTimestamp = next.key;
System.out.println(windowTimestamp + ":" + next.value);
}
非常感谢,
KTable
的输出类型是 <Windowed<String>,String>
因为在 Kafka Streams 中多个 windows 并行维护以允许处理乱序数据。因此,不是情况,只有一个 window 实例,但并行有多个 window 实例。 (比照https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#hopping-time-windows)
保持"older" windows允许在数据迟到时更新它们。请注意,Kafka Streams 语义基于事件时间。
你仍然可以查询KTable
——你只需要知道你想查询什么window。
更新
JavaDoc 描述了如何查询 table:https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java#L94-L101
KafkaStreams streams = ... // counting words
Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance
ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());
String key = "some-word";
long fromTime = ...;
long toTime = ...;
WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)
我在 Kafka 中创建带有时间窗口的 KTable 时遇到了一些问题。
我想创建一个 table 来计算流中 ID 的数量,就像这样。
ID (String) | Count (Long)
X | 5
Y | 6
Z | 7
等等。我希望能够使用 Kafka REST-API 获得 table,最好是 .json.
这是我目前的代码:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> streams = builder.stream(srcTopic);
KTable<Windowed<String>, Long> numCount = streams
.flatMapValues(value -> getID(value))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(windowSizeMs).advanceBy(advanceMs))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("foo"));
我现在面临的问题是 table 不是创建为 <String, Long>
,而是创建为 <String, String>
。这意味着我无法获得正确的计数,但我收到的是正确的密钥,但计数已损坏。我试图使用 Long.valueOf(value)
将其作为 Long
强制输出,但没有成功。我不知道如何从这里开始。我是否需要将 KTable 写入新主题?因为我希望 table 可以使用 kafka REST-API 进行查询,所以我认为不需要它,对吗? Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("foo")
应该使其可查询为 "foo",对吧?
KTable 创建了一个 changelog
主题,这足以使其可查询吗?还是我必须创建一个新主题才能写入?
我正在使用另一个 KStream 来验证输出。
KStream<String, String> streamOut = builder.stream(srcTopic);
streamOut.foreach((key, value) -> System.out.println(key + " => " + value));
它输出:
ID COUNT
2855 => ~
2857 => �
2859 => �
2861 => V(
2863 => �
2874 => �
2877 => J
2880 => �2
2891 => �=
不管怎样,我真的不想使用KStream 来收集输出,我想查询KTable。但如前所述,我不太了解查询的工作原理..
更新
设法让它与
一起工作 ReadOnlyWindowStore<String, Long> windowStore =
kafkaStreams.store("tst", QueryableStoreTypes.windowStore());
long timeFrom = 0;
long timeTo = System.currentTimeMillis(); // now (in processing-time)
WindowStoreIterator<Long> iterator = windowStore.fetch("x", timeFrom, timeTo);
while (iterator.hasNext()) {
KeyValue<Long, Long> next = iterator.next();
long windowTimestamp = next.key;
System.out.println(windowTimestamp + ":" + next.value);
}
非常感谢,
KTable
的输出类型是 <Windowed<String>,String>
因为在 Kafka Streams 中多个 windows 并行维护以允许处理乱序数据。因此,不是情况,只有一个 window 实例,但并行有多个 window 实例。 (比照https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#hopping-time-windows)
保持"older" windows允许在数据迟到时更新它们。请注意,Kafka Streams 语义基于事件时间。
你仍然可以查询KTable
——你只需要知道你想查询什么window。
更新
JavaDoc 描述了如何查询 table:https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java#L94-L101
KafkaStreams streams = ... // counting words Store queryableStoreName = ... // the queryableStoreName should be the name of the store as defined by the Materialized instance ReadOnlyWindowStore<String,Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore()); String key = "some-word"; long fromTime = ...; long toTime = ...; WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, timeFrom, timeTo); // key must be local (application state is shared over all running Kafka Streams instances)