Kafka Streams 在 countByKey 之后没有写出预期的结果
Kafka Streams not writing expected result after countByKey
使用 Kafka Streams(版本 0.10.0.1)和 Kafka Broker(0.10.0.1)我正在尝试根据消息键生成计数。我使用以下命令生成消息:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-streams-topic --property parse.key=true --property key.separator=,
当我运行上面的命令时,我可以像这样发送一个键和值:
1,{"value":10}
这将向 kafka 发送一条消息,该消息的键 = 1,值 = {"value":10}。
然后我的目标是计算有多少条消息具有 key=1。鉴于上述命令,计数将为 1.
这是我使用的代码:
public class StreamProcessor {
public static void main(String[] args) {
KStreamBuilder builder = new KStreamBuilder();
final Serde<Long> longSerde = Serdes.Long();
final Serde<String> stringSerde = Serdes.String();
KStream<String, String> values = builder.stream(stringSerde, stringSerde, "kafka-streams-topic");
KStream<String, Long> counts = values
.countByKey(stringSerde, "valueCounts")
.toStream();
counts.print(stringSerde, longSerde);
counts.to(stringSerde, longSerde, "message-counts-topic");
KafkaStreams streams = new KafkaStreams(builder, properties());
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
private static Properties properties() {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-poc");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
return streamsConfiguration;
}
}
当我 运行 counts.print(stringSerde,longSerde) 我得到:
1 , 1
意思是我有一个密钥=1,他们有 1 条消息有那个密钥。这就是我所期望的。
但是,当以下行 运行s:
counts.to(stringSerde, longSerde, "message-counts-topic");
名为 message-counts-topic 的主题收到一条消息发送给它,但是当我尝试使用此命令阅读消息时:
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic message-counts-topic --property print.key=true --property key.separator=, --from-beginning
我得到以下输出:
1 ,
其中 1 是键,值不显示任何内容。我希望看到消息 1 , 1。但由于某种原因,计数值丢失了,即使它在调用 print 方法时显示。
您需要为 bin/kafka-console-consumer.sh
指定一个不同的值反序列化器。添加以下内容:
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
默认字符串反序列化器无法正确读取 long 值。
使用 Kafka Streams(版本 0.10.0.1)和 Kafka Broker(0.10.0.1)我正在尝试根据消息键生成计数。我使用以下命令生成消息:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-streams-topic --property parse.key=true --property key.separator=,
当我运行上面的命令时,我可以像这样发送一个键和值:
1,{"value":10}
这将向 kafka 发送一条消息,该消息的键 = 1,值 = {"value":10}。
然后我的目标是计算有多少条消息具有 key=1。鉴于上述命令,计数将为 1.
这是我使用的代码:
public class StreamProcessor {
public static void main(String[] args) {
KStreamBuilder builder = new KStreamBuilder();
final Serde<Long> longSerde = Serdes.Long();
final Serde<String> stringSerde = Serdes.String();
KStream<String, String> values = builder.stream(stringSerde, stringSerde, "kafka-streams-topic");
KStream<String, Long> counts = values
.countByKey(stringSerde, "valueCounts")
.toStream();
counts.print(stringSerde, longSerde);
counts.to(stringSerde, longSerde, "message-counts-topic");
KafkaStreams streams = new KafkaStreams(builder, properties());
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
private static Properties properties() {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-poc");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
return streamsConfiguration;
}
}
当我 运行 counts.print(stringSerde,longSerde) 我得到:
1 , 1
意思是我有一个密钥=1,他们有 1 条消息有那个密钥。这就是我所期望的。
但是,当以下行 运行s:
counts.to(stringSerde, longSerde, "message-counts-topic");
名为 message-counts-topic 的主题收到一条消息发送给它,但是当我尝试使用此命令阅读消息时:
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic message-counts-topic --property print.key=true --property key.separator=, --from-beginning
我得到以下输出:
1 ,
其中 1 是键,值不显示任何内容。我希望看到消息 1 , 1。但由于某种原因,计数值丢失了,即使它在调用 print 方法时显示。
您需要为 bin/kafka-console-consumer.sh
指定一个不同的值反序列化器。添加以下内容:
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
默认字符串反序列化器无法正确读取 long 值。