Kafka 流字数统计应用程序
Kafka streams word count application
我正在使用 kafka 流 API(Kakfa 版本:0.10.2.0)尝试制作一个简单的 wordcount 示例:Wordcount App gist。我是 运行 生产者和控制台消费者:
./kafka-console-producer.sh -topic input-topic --broker-list localhost:9092
./kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning
启动应用程序,一切似乎都运行良好,但当我在控制台生产者中输入一些字符串时,消费者却什么也没收到。如果我将应用程序更改为对输入执行简单的 toUppercase,消费者将收到流(修改为大写)很好:
//The following code works fine:
val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues(_.toUpperCase())
uppercasedWithMapValues.to("output-topic")
有谁知道为什么我在字数统计示例中什么也没收到?我应该在消费者身上指定任何序列化程序吗?在我上次测试中,控制台消费者处理了我通过控制台发送的消息但没有显示它们,请参见下面的输出:
➜ bin ./kafka-console-consumer.sh \
--topic output-topic \
--bootstrap-server localhost:9092 \
--from-beginning
[2017-08-02 07:48:20,187]WARN Error while fetching metadata with correlation id 2 :
{output-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-08-02 07:48:20,197] WARN The following subscribed topics are not assigned
to any members in the group console-consumer-91651 : [output-topic]
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
^C共处理了7条消息
KStream
之所以有效,是因为它不使用缓存。对于 KTable
,您需要稍等片刻,或者将 cache.max.bytes.buffering
设置为 0
(但不是在生产代码中!)
我正在使用 kafka 流 API(Kakfa 版本:0.10.2.0)尝试制作一个简单的 wordcount 示例:Wordcount App gist。我是 运行 生产者和控制台消费者:
./kafka-console-producer.sh -topic input-topic --broker-list localhost:9092
./kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning
启动应用程序,一切似乎都运行良好,但当我在控制台生产者中输入一些字符串时,消费者却什么也没收到。如果我将应用程序更改为对输入执行简单的 toUppercase,消费者将收到流(修改为大写)很好:
//The following code works fine:
val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues(_.toUpperCase())
uppercasedWithMapValues.to("output-topic")
有谁知道为什么我在字数统计示例中什么也没收到?我应该在消费者身上指定任何序列化程序吗?在我上次测试中,控制台消费者处理了我通过控制台发送的消息但没有显示它们,请参见下面的输出:
➜ bin ./kafka-console-consumer.sh \
--topic output-topic \
--bootstrap-server localhost:9092 \
--from-beginning
[2017-08-02 07:48:20,187]WARN Error while fetching metadata with correlation id 2 :
{output-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-08-02 07:48:20,197] WARN The following subscribed topics are not assigned
to any members in the group console-consumer-91651 : [output-topic]
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
^C共处理了7条消息
KStream
之所以有效,是因为它不使用缓存。对于 KTable
,您需要稍等片刻,或者将 cache.max.bytes.buffering
设置为 0
(但不是在生产代码中!)