Spring Cloud Stream Kafka Streams:下游消息的数量与发送到主题的消息总数不匹配

Spring Cloud Stream Kafka Streams: The number of downstream messages doesn't match the sum of messages sent to the topic

我有一个 Spring 基于引导的 Spring Cloud Stream Kafka Streams Binder 应用程序。 它定义了一个拓扑,其中包含以下部分:

绿色数字表示通过Spring Cloud Stream Kafka Streams binder 绑定的各个处理器定义的拓扑传递的消息数量,以下是各自的属性:

spring.cloud.stream.bindings:
  ...
  hint1Stream-out-0:
    destination: hints
  realityStream-out-0:
    destination: hints
  countStream-in-0:
    destination: hints

我正在计算每个处理器使用 peek() 方法生成/使用的消息,如下所示:

return stream -> {
    stream
        .peek((k, v)-> input0count.incrementAndGet())
        ...
        .peek((k, v)-> output0count.incrementAndGet())
};

我正在使用具有几乎默认设置的嵌入式 Kafka 从单元测试开始我的应用程序:

@RunWith(SpringRunner.class)
@SpringBootTest(
    properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
        topics = {
                ...
                TOPIC_HINTS
        }
)
public class MyApplicationTests {
...

在我的测试中,我等待了足够长的时间,直到所有发布的测试消息都到达 countStream:

CountDownLatch latch = new CountDownLatch(1);
...
publishFromCsv(...)
...
latch.await(30, TimeUnit.SECONDS);
logCounters();

如您所见,放入 "hints" 主题的消息总数与 "counterStream" 端的消息数不匹配:1309 + 2589 != 3786

我可能缺少一些 Kafka 或 Kafka Streams 设置来刷新每批次?也许我的自定义 TimestampExtractor 会生成时间戳 "too old"? (我很确定它们不小于零)也许这与 Kafka 日志压缩有关?

这种不匹配的可能原因是什么?

更新

通过执行

检查了底层主题偏移量
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:60231 --topic hints

测试正在等待超时。

正如预期的那样,主题中的消息数等于两个输入流计数的总和。传递到 counterStream 输入的消息数仍然比预期少几十条。

正在使用的其他 Kafka 配置:

spring.cloud.stream.kafka.streams:
    configuration:
      schema.registry.url: mock://torpedo-stream-registry
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      commit.interval.ms: 100

即对应processing.guarantee = at_least_once。无法测试 processing.guarantee = exactly_once,因为这需要至少 3 个代理的集群。

同时设置:

spring.cloud.stream.kafka.binder.configuration:
  auto.offset.reset: earliest
spring.cloud.stream.kafka.streams.binder.configuration:
  auto.offset.reset: earliest
spring.cloud.stream.kafka.streams:
  default:
    consumer:
      startOffset: earliest
spring.cloud.stream.bindings:
  countStream-in-0:
    destination: hints
    consumer:
      startOffset: earliest
      concurrency: 1

没有帮助:(

有用的是 仅在 countStream 消费者中留下 stream.peak(..),例如:

@Bean
public Consumer<KStream<String, Hint>> countStream() {
    return stream -> {
        KStream<String, Hint> kstream = stream.peek((k, v) -> input0count.incrementAndGet());
    };
}

在这种情况下,我会立即开始获取在 countConsumer 端计算的预期消息数。

这意味着我的 Count Consumer 内部结构对行为有影响。

这是它的完整版本 "doesn't work":

@Bean
public Consumer<KStream<String, Hint>> countStream() {
    return stream -> {
        KStream<String, Hint> kstream = stream.peek((k, v) -> notifyObservers(input0count.incrementAndGet()));

        KStream<String, Hint> realityStream = kstream
            .filter((key, hint) -> realityDetector.getName().equals(hint.getDetector()));

        KStream<String, Hint> hintsStream = kstream
            .filter((key, hint) -> !realityDetector.getName().equals(hint.getDetector()));

        this.countsTable = kstream
            .groupBy((key, hint) -> key.concat(":").concat(hint.getDetector()))
            .count(Materialized
                .as("countsTable"));

        this.countsByActionTable = kstream
            .groupBy((key, hint) -> key.concat(":")
                .concat(hint.getDetector()).concat("|")
                .concat(hint.getHint().toString()))
            .count(Materialized
                .as("countsByActionTable"));

        this.countsByHintRealityTable = hintsStream
            .join(realityStream,
                (hint, real) -> {
                    hint.setReal(real.getHint());
                    return hint;
                }, JoinWindows.of(countStreamProperties.getJoinWindowSize()))
            .groupBy((key, hint) -> key.concat(":")
                .concat(hint.getDetector()).concat("|")
                .concat(hint.getHint().toString()).concat("-")
                .concat(hint.getReal().toString())
            )
            .count(Materialized
                .as("countsByHintRealityTable"));

    };
}

我正在那里的几个 KTable 中存储计数。这是 Counts Consumer 内部发生的情况:

更新 2

Count Consumer 的最后一部分显然导致了最初的意外行为:

this.countsByHintRealityTable = hintsStream
        .join(realityStream,
            (hint, real) -> {
                hint.setReal(real.getHint());
                return hint;
            }, JoinWindows.of(countStreamProperties.getJoinWindowSize()))
        .groupBy((key, hint) -> key.concat(":")
            .concat(hint.getDetector()).concat("|")
            .concat(hint.getHint().toString()).concat("-")
            .concat(hint.getReal().toString())
        )
        .count(Materialized
            .as("countsByHintRealityTable"));

没有它,消息计数将按预期匹配。

这样的下游代码如何影响 Consumer KStream 输入?

我认为以下内容帮助我解决了这个问题:

有帮助的是将 Counter Consumer 分成两个部分(从我的角度来看)完全等同于单个消费者实现:

peek() 在两个消费者输入上报告的消息计数显示预期的消息数。

但结果是不确定的。接下来的每个 运行 都会产生不同的结果,有时仍然不匹配。

我发现并删除了以下在测试期间创建的临时文件夹 运行:

  • /tmp/kafka-streams/*(都是空的)
  • /var/folders/ms/pqwfgz297b91gw_b8xymf1l00000gn/T/spring*(这些看起来是嵌入式 Kafka 的临时文件夹)

在那之后,我无法使用相同的代码重现该问题

我必须清理的临时目录是在 spring-kafka-test EmbeddedKafkaBroker 中创建的:

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java#L329

我希望这个文件夹在优雅的单元测试退出时被自动删除?

这可能是 Kafka 本身的责任,但那里的类似错误看起来已经修复:KAFKA-1258

我在

中将 Kafka 代理 log.dir 设置为 "target/kafka"

kafka.properties

log.dir=target/kafka

MyApplicationTests.java

@RunWith(SpringRunner.class)
@SpringBootTest(
    properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
        topics = {
                TOPIC_QUOTES,
                TOPIC_WINDOWS,
                TOPIC_HINTS,
                TOPIC_REAL
        },
        brokerPropertiesLocation = "kafka.properties"
)
@Slf4j
public class MyApplicationTests {

我可以看到 target/kafka 文件夹在测试期间如何充满临时文件夹和文件 运行。它也会在测试退出时被删除 "by itself".

我仍然看到 ${io.java.tmpdir} 中的一些文件夹正在测试日志中使用,例如/var/folders/ms/pqwfgz297b91gw_b8xymf1l00000gn/T/kafka-16220018198285185785/version-2/snapshot.0。他们也得到清洁。

在大多数情况下,我的计数现在都匹配了。尽管如此,我想我已经看到过一次或另一次他们没有看到。

根据保留策略,可以删除邮件。更改拓扑反映了更改处理所需的时间量。如果在处理过程中出现保留,您可能会丢失消息。它还取决于偏移量重置策略。

尝试设置log.retention.hours=-1。这将禁用自动创建主题的保留。