在 Kafka Streams 应用程序中,不再写入第二个输出流

In Kafka Streams Application the second output stream is not written anymore

我目前正在实施一个 Kafka Streams 应用程序,我正在阅读一个主题并进行一些处理。在处理过程中,我将其分成两个流,一个写入一个主题(Avro Schema),另一个是计数聚合(字数统计),将 Key/Value 对 (String/Long) 写入另一个主题。代码之前运行良好,但最近第二个流不再写了。

在此代码示例中:

KStream<String, ProcessedSentence> sentenceKStream = stream
        .map((k,v) -> {
                [...]
        });

// configure serializers for publishing to topic
final Serde<ProcessedSentence> valueProcessedSentence = new SpecificAvroSerde<>();
valueProcessedSentence.configure(serdeConfig, false);
stringSerde.configure(serdeConfig, true);

// write to Specific Avro Record
sentenceKStream
        .to(EnvReader.KAFKA_SENTENCES_TOPIC_NAME_OUT,
                Produced.with(
                        Serdes.String(),
                        valueProcessedSentence));

句子流(sentenceKStream)写对了,但是字数分组出现问题:

KStream<String, Long> wordCountKStream =
        sentenceKStream.flatMap((key, processedSentence) -> {
            List<KeyValue<String, Long>> result = new LinkedList<>();
            Map<CharSequence, Long> words = processedSentence.getWords();
            for (CharSequence word: words.keySet() ) {
                result.add(KeyValue.pair(word.toString(), words.get(word)));
            }
            return result;
        })
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .reduce(Long::sum)
        .toStream();

// write to Specific Avro Record
wordCountKStream
        .to(EnvReader.KAFKA_WORDS_COUNT_TOPIC_NAME_OUT,
                Produced.with(
                        Serdes.String(),
                        Serdes.Long()));

真不明白为什么wordCountKStream不写了

也许有人可以提供一些帮助?我很乐意提供更多详细信息!

非常感谢

更新: 我发现两个新的输出流中都缺少数据。实际上,所有内容都已正确写入,但在写入数据几分钟后,两个主题中的数据都被删除了(剩余 0 字节)。

它与实现本身无关。我刚刚使用

删除了所有主题偏移量
kafka-consumer-groups.sh --bootstrap-server [broker:port] --delete-offsets --group [group_name] --topic [topic_name]

解决了问题。刚刚存储的偏移量出现问题,并与调试过程中流应用程序的多次重启发生冲突。

对于那些想要列出组以查找存储的主题位置的人,请致电

kafka-consumer-groups.sh --bootstrap-server node1:9092 --list

更新:不幸的是,组偏移量的删除也没有正常工作。实际问题是,输出主题中新条目的时间戳是原始主题(已使用)中的时间戳,根本没有改变。因此,新条目携带的时间戳早于默认保留时间。

由于消费主题的 rentention.ms 为 -1(永远保留数据),而新主题的标准是,我认为,6 天,消费主题中的条目仍然存在,但那些生成的主题总是被删除,因为它们超过 6 天。

最终解决方案 是将输出主题的 retention.ms 更改为 -1。

提示: 对于 Streams 应用程序,建议使用 Application Reset Tool 而不是手动 reset/deletion 偏移量,如上所示。