在 Kafka Streams 应用程序中,不再写入第二个输出流
In Kafka Streams Application the second output stream is not written anymore
我目前正在实施一个 Kafka Streams 应用程序,我正在阅读一个主题并进行一些处理。在处理过程中,我将其分成两个流,一个写入一个主题(Avro Schema),另一个是计数聚合(字数统计),将 Key/Value 对 (String/Long) 写入另一个主题。代码之前运行良好,但最近第二个流不再写了。
在此代码示例中:
KStream<String, ProcessedSentence> sentenceKStream = stream
.map((k,v) -> {
[...]
});
// configure serializers for publishing to topic
final Serde<ProcessedSentence> valueProcessedSentence = new SpecificAvroSerde<>();
valueProcessedSentence.configure(serdeConfig, false);
stringSerde.configure(serdeConfig, true);
// write to Specific Avro Record
sentenceKStream
.to(EnvReader.KAFKA_SENTENCES_TOPIC_NAME_OUT,
Produced.with(
Serdes.String(),
valueProcessedSentence));
句子流(sentenceKStream
)写对了,但是字数分组出现问题:
KStream<String, Long> wordCountKStream =
sentenceKStream.flatMap((key, processedSentence) -> {
List<KeyValue<String, Long>> result = new LinkedList<>();
Map<CharSequence, Long> words = processedSentence.getWords();
for (CharSequence word: words.keySet() ) {
result.add(KeyValue.pair(word.toString(), words.get(word)));
}
return result;
})
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream();
// write to Specific Avro Record
wordCountKStream
.to(EnvReader.KAFKA_WORDS_COUNT_TOPIC_NAME_OUT,
Produced.with(
Serdes.String(),
Serdes.Long()));
真不明白为什么wordCountKStream
不写了
也许有人可以提供一些帮助?我很乐意提供更多详细信息!
非常感谢
更新: 我发现两个新的输出流中都缺少数据。实际上,所有内容都已正确写入,但在写入数据几分钟后,两个主题中的数据都被删除了(剩余 0 字节)。
它与实现本身无关。我刚刚使用
删除了所有主题偏移量
kafka-consumer-groups.sh --bootstrap-server [broker:port] --delete-offsets --group [group_name] --topic [topic_name]
解决了问题。刚刚存储的偏移量出现问题,并与调试过程中流应用程序的多次重启发生冲突。
对于那些想要列出组以查找存储的主题位置的人,请致电
kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
更新:不幸的是,组偏移量的删除也没有正常工作。实际问题是,输出主题中新条目的时间戳是原始主题(已使用)中的时间戳,根本没有改变。因此,新条目携带的时间戳早于默认保留时间。
由于消费主题的 rentention.ms 为 -1(永远保留数据),而新主题的标准是,我认为,6 天,消费主题中的条目仍然存在,但那些生成的主题总是被删除,因为它们超过 6 天。
最终解决方案 是将输出主题的 retention.ms 更改为 -1。
提示: 对于 Streams 应用程序,建议使用 Application Reset Tool 而不是手动 reset/deletion 偏移量,如上所示。
我目前正在实施一个 Kafka Streams 应用程序,我正在阅读一个主题并进行一些处理。在处理过程中,我将其分成两个流,一个写入一个主题(Avro Schema),另一个是计数聚合(字数统计),将 Key/Value 对 (String/Long) 写入另一个主题。代码之前运行良好,但最近第二个流不再写了。
在此代码示例中:
KStream<String, ProcessedSentence> sentenceKStream = stream
.map((k,v) -> {
[...]
});
// configure serializers for publishing to topic
final Serde<ProcessedSentence> valueProcessedSentence = new SpecificAvroSerde<>();
valueProcessedSentence.configure(serdeConfig, false);
stringSerde.configure(serdeConfig, true);
// write to Specific Avro Record
sentenceKStream
.to(EnvReader.KAFKA_SENTENCES_TOPIC_NAME_OUT,
Produced.with(
Serdes.String(),
valueProcessedSentence));
句子流(sentenceKStream
)写对了,但是字数分组出现问题:
KStream<String, Long> wordCountKStream =
sentenceKStream.flatMap((key, processedSentence) -> {
List<KeyValue<String, Long>> result = new LinkedList<>();
Map<CharSequence, Long> words = processedSentence.getWords();
for (CharSequence word: words.keySet() ) {
result.add(KeyValue.pair(word.toString(), words.get(word)));
}
return result;
})
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream();
// write to Specific Avro Record
wordCountKStream
.to(EnvReader.KAFKA_WORDS_COUNT_TOPIC_NAME_OUT,
Produced.with(
Serdes.String(),
Serdes.Long()));
真不明白为什么wordCountKStream
不写了
也许有人可以提供一些帮助?我很乐意提供更多详细信息!
非常感谢
更新: 我发现两个新的输出流中都缺少数据。实际上,所有内容都已正确写入,但在写入数据几分钟后,两个主题中的数据都被删除了(剩余 0 字节)。
它与实现本身无关。我刚刚使用
删除了所有主题偏移量kafka-consumer-groups.sh --bootstrap-server [broker:port] --delete-offsets --group [group_name] --topic [topic_name]
解决了问题。刚刚存储的偏移量出现问题,并与调试过程中流应用程序的多次重启发生冲突。
对于那些想要列出组以查找存储的主题位置的人,请致电
kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
更新:不幸的是,组偏移量的删除也没有正常工作。实际问题是,输出主题中新条目的时间戳是原始主题(已使用)中的时间戳,根本没有改变。因此,新条目携带的时间戳早于默认保留时间。
由于消费主题的 rentention.ms 为 -1(永远保留数据),而新主题的标准是,我认为,6 天,消费主题中的条目仍然存在,但那些生成的主题总是被删除,因为它们超过 6 天。
最终解决方案 是将输出主题的 retention.ms 更改为 -1。
提示: 对于 Streams 应用程序,建议使用 Application Reset Tool 而不是手动 reset/deletion 偏移量,如上所示。