Spring Cloud Stream Kafka Streams:下游消息的数量与发送到主题的消息总数不匹配
Spring Cloud Stream Kafka Streams: The number of downstream messages doesn't match the sum of messages sent to the topic
我有一个 Spring 基于引导的 Spring Cloud Stream Kafka Streams Binder 应用程序。
它定义了一个拓扑,其中包含以下部分:
绿色数字表示通过Spring Cloud Stream Kafka Streams binder 绑定的各个处理器定义的拓扑传递的消息数量,以下是各自的属性:
spring.cloud.stream.bindings:
...
hint1Stream-out-0:
destination: hints
realityStream-out-0:
destination: hints
countStream-in-0:
destination: hints
我正在计算每个处理器使用 peek()
方法生成/使用的消息,如下所示:
return stream -> {
stream
.peek((k, v)-> input0count.incrementAndGet())
...
.peek((k, v)-> output0count.incrementAndGet())
};
我正在使用具有几乎默认设置的嵌入式 Kafka 从单元测试开始我的应用程序:
@RunWith(SpringRunner.class)
@SpringBootTest(
properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
topics = {
...
TOPIC_HINTS
}
)
public class MyApplicationTests {
...
在我的测试中,我等待了足够长的时间,直到所有发布的测试消息都到达 countStream:
CountDownLatch latch = new CountDownLatch(1);
...
publishFromCsv(...)
...
latch.await(30, TimeUnit.SECONDS);
logCounters();
如您所见,放入 "hints" 主题的消息总数与 "counterStream" 端的消息数不匹配:1309 + 2589 != 3786
我可能缺少一些 Kafka 或 Kafka Streams 设置来刷新每批次?也许我的自定义 TimestampExtractor 会生成时间戳 "too old"? (我很确定它们不小于零)也许这与 Kafka 日志压缩有关?
这种不匹配的可能原因是什么?
更新
通过执行
检查了底层主题偏移量
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:60231 --topic hints
测试正在等待超时。
正如预期的那样,主题中的消息数等于两个输入流计数的总和。传递到 counterStream 输入的消息数仍然比预期少几十条。
正在使用的其他 Kafka 配置:
spring.cloud.stream.kafka.streams:
configuration:
schema.registry.url: mock://torpedo-stream-registry
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
commit.interval.ms: 100
即对应processing.guarantee = at_least_once
。无法测试 processing.guarantee = exactly_once
,因为这需要至少 3 个代理的集群。
同时设置:
spring.cloud.stream.kafka.binder.configuration:
auto.offset.reset: earliest
spring.cloud.stream.kafka.streams.binder.configuration:
auto.offset.reset: earliest
spring.cloud.stream.kafka.streams:
default:
consumer:
startOffset: earliest
spring.cloud.stream.bindings:
countStream-in-0:
destination: hints
consumer:
startOffset: earliest
concurrency: 1
没有帮助:(
有用的是 仅在 countStream 消费者中留下 stream.peak(..)
,例如:
@Bean
public Consumer<KStream<String, Hint>> countStream() {
return stream -> {
KStream<String, Hint> kstream = stream.peek((k, v) -> input0count.incrementAndGet());
};
}
在这种情况下,我会立即开始获取在 countConsumer 端计算的预期消息数。
这意味着我的 Count Consumer 内部结构对行为有影响。
这是它的完整版本 "doesn't work":
@Bean
public Consumer<KStream<String, Hint>> countStream() {
return stream -> {
KStream<String, Hint> kstream = stream.peek((k, v) -> notifyObservers(input0count.incrementAndGet()));
KStream<String, Hint> realityStream = kstream
.filter((key, hint) -> realityDetector.getName().equals(hint.getDetector()));
KStream<String, Hint> hintsStream = kstream
.filter((key, hint) -> !realityDetector.getName().equals(hint.getDetector()));
this.countsTable = kstream
.groupBy((key, hint) -> key.concat(":").concat(hint.getDetector()))
.count(Materialized
.as("countsTable"));
this.countsByActionTable = kstream
.groupBy((key, hint) -> key.concat(":")
.concat(hint.getDetector()).concat("|")
.concat(hint.getHint().toString()))
.count(Materialized
.as("countsByActionTable"));
this.countsByHintRealityTable = hintsStream
.join(realityStream,
(hint, real) -> {
hint.setReal(real.getHint());
return hint;
}, JoinWindows.of(countStreamProperties.getJoinWindowSize()))
.groupBy((key, hint) -> key.concat(":")
.concat(hint.getDetector()).concat("|")
.concat(hint.getHint().toString()).concat("-")
.concat(hint.getReal().toString())
)
.count(Materialized
.as("countsByHintRealityTable"));
};
}
我正在那里的几个 KTable 中存储计数。这是 Counts Consumer 内部发生的情况:
更新 2
Count Consumer 的最后一部分显然导致了最初的意外行为:
this.countsByHintRealityTable = hintsStream
.join(realityStream,
(hint, real) -> {
hint.setReal(real.getHint());
return hint;
}, JoinWindows.of(countStreamProperties.getJoinWindowSize()))
.groupBy((key, hint) -> key.concat(":")
.concat(hint.getDetector()).concat("|")
.concat(hint.getHint().toString()).concat("-")
.concat(hint.getReal().toString())
)
.count(Materialized
.as("countsByHintRealityTable"));
没有它,消息计数将按预期匹配。
这样的下游代码如何影响 Consumer KStream 输入?
我认为以下内容帮助我解决了这个问题:
有帮助的是将 Counter Consumer 分成两个部分(从我的角度来看)完全等同于单个消费者实现:
peek()
在两个消费者输入上报告的消息计数显示预期的消息数。
但结果是不确定的。接下来的每个 运行 都会产生不同的结果,有时仍然不匹配。
我发现并删除了以下在测试期间创建的临时文件夹 运行:
/tmp/kafka-streams/*
(都是空的)
/var/folders/ms/pqwfgz297b91gw_b8xymf1l00000gn/T/spring*
(这些看起来是嵌入式 Kafka 的临时文件夹)
在那之后,我无法使用相同的代码重现该问题还。
我必须清理的临时目录是在 spring-kafka-test EmbeddedKafkaBroker 中创建的:
我希望这个文件夹在优雅的单元测试退出时被自动删除?
这可能是 Kafka 本身的责任,但那里的类似错误看起来已经修复:KAFKA-1258
我在
中将 Kafka 代理 log.dir
设置为 "target/kafka"
kafka.properties
log.dir=target/kafka
MyApplicationTests.java
@RunWith(SpringRunner.class)
@SpringBootTest(
properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
topics = {
TOPIC_QUOTES,
TOPIC_WINDOWS,
TOPIC_HINTS,
TOPIC_REAL
},
brokerPropertiesLocation = "kafka.properties"
)
@Slf4j
public class MyApplicationTests {
我可以看到 target/kafka 文件夹在测试期间如何充满临时文件夹和文件 运行。它也会在测试退出时被删除 "by itself".
我仍然看到 ${io.java.tmpdir} 中的一些文件夹正在测试日志中使用,例如/var/folders/ms/pqwfgz297b91gw_b8xymf1l00000gn/T/kafka-16220018198285185785/version-2/snapshot.0
。他们也得到清洁。
在大多数情况下,我的计数现在都匹配了。尽管如此,我想我已经看到过一次或另一次他们没有看到。
根据保留策略,可以删除邮件。更改拓扑反映了更改处理所需的时间量。如果在处理过程中出现保留,您可能会丢失消息。它还取决于偏移量重置策略。
尝试设置log.retention.hours=-1
。这将禁用自动创建主题的保留。
我有一个 Spring 基于引导的 Spring Cloud Stream Kafka Streams Binder 应用程序。 它定义了一个拓扑,其中包含以下部分:
绿色数字表示通过Spring Cloud Stream Kafka Streams binder 绑定的各个处理器定义的拓扑传递的消息数量,以下是各自的属性:
spring.cloud.stream.bindings:
...
hint1Stream-out-0:
destination: hints
realityStream-out-0:
destination: hints
countStream-in-0:
destination: hints
我正在计算每个处理器使用 peek()
方法生成/使用的消息,如下所示:
return stream -> {
stream
.peek((k, v)-> input0count.incrementAndGet())
...
.peek((k, v)-> output0count.incrementAndGet())
};
我正在使用具有几乎默认设置的嵌入式 Kafka 从单元测试开始我的应用程序:
@RunWith(SpringRunner.class)
@SpringBootTest(
properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
topics = {
...
TOPIC_HINTS
}
)
public class MyApplicationTests {
...
在我的测试中,我等待了足够长的时间,直到所有发布的测试消息都到达 countStream:
CountDownLatch latch = new CountDownLatch(1);
...
publishFromCsv(...)
...
latch.await(30, TimeUnit.SECONDS);
logCounters();
如您所见,放入 "hints" 主题的消息总数与 "counterStream" 端的消息数不匹配:1309 + 2589 != 3786
我可能缺少一些 Kafka 或 Kafka Streams 设置来刷新每批次?也许我的自定义 TimestampExtractor 会生成时间戳 "too old"? (我很确定它们不小于零)也许这与 Kafka 日志压缩有关?
这种不匹配的可能原因是什么?
更新
通过执行
检查了底层主题偏移量kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:60231 --topic hints
测试正在等待超时。
正如预期的那样,主题中的消息数等于两个输入流计数的总和。传递到 counterStream 输入的消息数仍然比预期少几十条。
正在使用的其他 Kafka 配置:
spring.cloud.stream.kafka.streams:
configuration:
schema.registry.url: mock://torpedo-stream-registry
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
commit.interval.ms: 100
即对应processing.guarantee = at_least_once
。无法测试 processing.guarantee = exactly_once
,因为这需要至少 3 个代理的集群。
同时设置:
spring.cloud.stream.kafka.binder.configuration:
auto.offset.reset: earliest
spring.cloud.stream.kafka.streams.binder.configuration:
auto.offset.reset: earliest
spring.cloud.stream.kafka.streams:
default:
consumer:
startOffset: earliest
spring.cloud.stream.bindings:
countStream-in-0:
destination: hints
consumer:
startOffset: earliest
concurrency: 1
没有帮助:(
有用的是 仅在 countStream 消费者中留下 stream.peak(..)
,例如:
@Bean
public Consumer<KStream<String, Hint>> countStream() {
return stream -> {
KStream<String, Hint> kstream = stream.peek((k, v) -> input0count.incrementAndGet());
};
}
在这种情况下,我会立即开始获取在 countConsumer 端计算的预期消息数。
这意味着我的 Count Consumer 内部结构对行为有影响。
这是它的完整版本 "doesn't work":
@Bean
public Consumer<KStream<String, Hint>> countStream() {
return stream -> {
KStream<String, Hint> kstream = stream.peek((k, v) -> notifyObservers(input0count.incrementAndGet()));
KStream<String, Hint> realityStream = kstream
.filter((key, hint) -> realityDetector.getName().equals(hint.getDetector()));
KStream<String, Hint> hintsStream = kstream
.filter((key, hint) -> !realityDetector.getName().equals(hint.getDetector()));
this.countsTable = kstream
.groupBy((key, hint) -> key.concat(":").concat(hint.getDetector()))
.count(Materialized
.as("countsTable"));
this.countsByActionTable = kstream
.groupBy((key, hint) -> key.concat(":")
.concat(hint.getDetector()).concat("|")
.concat(hint.getHint().toString()))
.count(Materialized
.as("countsByActionTable"));
this.countsByHintRealityTable = hintsStream
.join(realityStream,
(hint, real) -> {
hint.setReal(real.getHint());
return hint;
}, JoinWindows.of(countStreamProperties.getJoinWindowSize()))
.groupBy((key, hint) -> key.concat(":")
.concat(hint.getDetector()).concat("|")
.concat(hint.getHint().toString()).concat("-")
.concat(hint.getReal().toString())
)
.count(Materialized
.as("countsByHintRealityTable"));
};
}
我正在那里的几个 KTable 中存储计数。这是 Counts Consumer 内部发生的情况:
更新 2
Count Consumer 的最后一部分显然导致了最初的意外行为:
this.countsByHintRealityTable = hintsStream
.join(realityStream,
(hint, real) -> {
hint.setReal(real.getHint());
return hint;
}, JoinWindows.of(countStreamProperties.getJoinWindowSize()))
.groupBy((key, hint) -> key.concat(":")
.concat(hint.getDetector()).concat("|")
.concat(hint.getHint().toString()).concat("-")
.concat(hint.getReal().toString())
)
.count(Materialized
.as("countsByHintRealityTable"));
没有它,消息计数将按预期匹配。
这样的下游代码如何影响 Consumer KStream 输入?
我认为以下内容帮助我解决了这个问题:
有帮助的是将 Counter Consumer 分成两个部分(从我的角度来看)完全等同于单个消费者实现:
peek()
在两个消费者输入上报告的消息计数显示预期的消息数。
但结果是不确定的。接下来的每个 运行 都会产生不同的结果,有时仍然不匹配。
我发现并删除了以下在测试期间创建的临时文件夹 运行:
/tmp/kafka-streams/*
(都是空的)/var/folders/ms/pqwfgz297b91gw_b8xymf1l00000gn/T/spring*
(这些看起来是嵌入式 Kafka 的临时文件夹)
在那之后,我无法使用相同的代码重现该问题还。
我必须清理的临时目录是在 spring-kafka-test EmbeddedKafkaBroker 中创建的:
我希望这个文件夹在优雅的单元测试退出时被自动删除?
这可能是 Kafka 本身的责任,但那里的类似错误看起来已经修复:KAFKA-1258
我在
中将 Kafka 代理log.dir
设置为 "target/kafka"
kafka.properties
log.dir=target/kafka
MyApplicationTests.java
@RunWith(SpringRunner.class)
@SpringBootTest(
properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
topics = {
TOPIC_QUOTES,
TOPIC_WINDOWS,
TOPIC_HINTS,
TOPIC_REAL
},
brokerPropertiesLocation = "kafka.properties"
)
@Slf4j
public class MyApplicationTests {
我可以看到 target/kafka 文件夹在测试期间如何充满临时文件夹和文件 运行。它也会在测试退出时被删除 "by itself".
我仍然看到 ${io.java.tmpdir} 中的一些文件夹正在测试日志中使用,例如/var/folders/ms/pqwfgz297b91gw_b8xymf1l00000gn/T/kafka-16220018198285185785/version-2/snapshot.0
。他们也得到清洁。
在大多数情况下,我的计数现在都匹配了。尽管如此,我想我已经看到过一次或另一次他们没有看到。
根据保留策略,可以删除邮件。更改拓扑反映了更改处理所需的时间量。如果在处理过程中出现保留,您可能会丢失消息。它还取决于偏移量重置策略。
尝试设置log.retention.hours=-1
。这将禁用自动创建主题的保留。