Kafka 流 - 外部写入的乱序副作用
Kafka streams - out of sequence side effects from external write
KafkaProducer send("X", K, V) (k1, x1) ┌────────────┐
┌──────────────────────────────────────────────────►│ Topic X │
│ │ │
│ ┌──────────►│ │
│ │ (k1, x2) └────────────┘
│ │
│ │
┌────────┴─────┐ ┌─────┴─────────┐
│ KStream A │ │ KStream B │
┌───────────┐ │ .from(A) │ ┌───────────┐ │ .from(B) │
│Topic A ├───────►│ .transform() ├──────┤Topic B ├───────►│ .transform() ├────►
└───────────┘ │ .peek(x1) │ └───────────┘ │ .peek(x2) │
│ .to(B) │ │ .to(C) │
└──────────────┘ └───────────────┘
我有两个 Kafka 流应用程序处理消息 A -> B,然后进行进一步的业务处理。每个流应用程序也正在向主题 'X'.
写入事件数据包
stream
.transform(() -> eventTransformer)
.peek((key, value) -> {
eventProducer.send(new ProducerRecord<>("X", key, value));
});
Stream 应用程序 A 发出带有密钥 k1 的事件消息 x1,并且
Stream 应用程序 B 发出具有相同密钥 k1 的事件消息 x2 以确保相同的分区
但是,我注意到有时它们的写入顺序不正确,x2 在 x1 之前,这导致该特定事务的事件处理失败。
如何保证写入始终按顺序进行,x1 后跟 x2?
我很乐意探索替代方法。
您观察到的事件乱序很可能是因为生产者和 Kafka 流中的缓存(缓冲)。
- 您可以配置生产者以最大限度地减少延迟(主要是
linger.ms
和 batch.size
配置参数)。请参阅 Configure Kafka to Minimise Latency 文章了解更多详情。
- 看看 Kafka Streams 缓存 Record Caches in DSL。设置
CACHE_MAX_BYTES_BUFFERING_CONFIG=0
在这里可能会有帮助。
- 就个人而言,我不喜欢从您的 KS 应用程序调用
producer.send()
方法的想法。您可以使用 .to()
方法将流具体化为主题
var transformed = stream.transform(() -> eventTransformer)
transformed.to("topic-B")
transformed.to("topic-X")
KafkaProducer send("X", K, V) (k1, x1) ┌────────────┐
┌──────────────────────────────────────────────────►│ Topic X │
│ │ │
│ ┌──────────►│ │
│ │ (k1, x2) └────────────┘
│ │
│ │
┌────────┴─────┐ ┌─────┴─────────┐
│ KStream A │ │ KStream B │
┌───────────┐ │ .from(A) │ ┌───────────┐ │ .from(B) │
│Topic A ├───────►│ .transform() ├──────┤Topic B ├───────►│ .transform() ├────►
└───────────┘ │ .peek(x1) │ └───────────┘ │ .peek(x2) │
│ .to(B) │ │ .to(C) │
└──────────────┘ └───────────────┘
我有两个 Kafka 流应用程序处理消息 A -> B,然后进行进一步的业务处理。每个流应用程序也正在向主题 'X'.
写入事件数据包stream
.transform(() -> eventTransformer)
.peek((key, value) -> {
eventProducer.send(new ProducerRecord<>("X", key, value));
});
Stream 应用程序 A 发出带有密钥 k1 的事件消息 x1,并且
Stream 应用程序 B 发出具有相同密钥 k1 的事件消息 x2 以确保相同的分区
但是,我注意到有时它们的写入顺序不正确,x2 在 x1 之前,这导致该特定事务的事件处理失败。
如何保证写入始终按顺序进行,x1 后跟 x2?
我很乐意探索替代方法。
您观察到的事件乱序很可能是因为生产者和 Kafka 流中的缓存(缓冲)。
- 您可以配置生产者以最大限度地减少延迟(主要是
linger.ms
和batch.size
配置参数)。请参阅 Configure Kafka to Minimise Latency 文章了解更多详情。 - 看看 Kafka Streams 缓存 Record Caches in DSL。设置
CACHE_MAX_BYTES_BUFFERING_CONFIG=0
在这里可能会有帮助。 - 就个人而言,我不喜欢从您的 KS 应用程序调用
producer.send()
方法的想法。您可以使用.to()
方法将流具体化为主题
var transformed = stream.transform(() -> eventTransformer) transformed.to("topic-B") transformed.to("topic-X")