Kafka wordcount 不更新计数

Kafka wordcount not updating counts

我开始试验 Kafka Streams。我关注 https://kafka.apache.org/0110/documentation/streams/quickstart.

我的沙盒是 运行ning Ubuntu 16.04.2 LTS、Kafka 0.11.0.0 和 Scala 2.11.11.

如 Kafka Streams 快速入门指南中所述,以下是我遵循的步骤:

echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-file-input

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

使用后一个命令查看 streams-wordcount-output 时,我的标准输出显示如下:

all 1
streams 1
lead    1
to  1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1

然后,在不中断 bin/kafka-console-consumer.sh 命令的情况下,我重新 运行 控制台生成器如下:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt

令我惊讶的是,标准输出没有改变以反映这个新增功能所带来的变化。在我的理解中,file-input.txt 被用来产生额外的数据,所以字数应该已经刷新(所有的标记现在应该被计算两次)。 我的推理有什么问题?

字数统计演示设计为在 5 秒后停止,如源代码所示:https://github.com/apache/kafka/blob/0.11.0.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java#L88

查看上述源代码的最新版本,了解 5 秒后不会停止,但仅在您按下 ctrl-c 时才停止:https://github.com/apache/kafka/blob/0.11.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java