Kafka 流 - 外部写入的乱序副作用

Kafka streams - out of sequence side effects from external write

                                 KafkaProducer send("X", K, V)         (k1, x1)   ┌────────────┐
                              ┌──────────────────────────────────────────────────►│ Topic X    │
                              │                                                   │            │
                              │                                       ┌──────────►│            │
                              │                                       │ (k1, x2)  └────────────┘
                              │                                       │
                              │                                       │
                     ┌────────┴─────┐                           ┌─────┴─────────┐
                     │ KStream A    │                           │ KStream B     │
┌───────────┐        │ .from(A)     │      ┌───────────┐        │ .from(B)      │
│Topic A    ├───────►│ .transform() ├──────┤Topic B    ├───────►│ .transform()  ├────►
└───────────┘        │ .peek(x1)    │      └───────────┘        │ .peek(x2)     │
                     │ .to(B)       │                           │ .to(C)        │
                     └──────────────┘                           └───────────────┘

我有两个 Kafka 流应用程序处理消息 A -> B,然后进行进一步的业务处理。每个流应用程序也正在向主题 'X'.

写入事件数据包
stream
    .transform(() -> eventTransformer)
    .peek((key, value) -> {
        eventProducer.send(new ProducerRecord<>("X", key, value));
    });

Stream 应用程序 A 发出带有密钥 k1 的事件消息 x1,并且

Stream 应用程序 B 发出具有相同密钥 k1 的事件消息 x2 以确保相同的分区

但是,我注意到有时它们的写入顺序不正确,x2 在 x1 之前,这导致该特定事务的事件处理失败。

如何保证写入始终按顺序进行,x1 后跟 x2?

我很乐意探索替代方法。

您观察到的事件乱序很可能是因为生产者和 Kafka 流中的缓存(缓冲)。

  1. 您可以配置生产者以最大限度地减少延迟(主要是 linger.msbatch.size 配置参数)。请参阅 Configure Kafka to Minimise Latency 文章了解更多详情。
  2. 看看 Kafka Streams 缓存 Record Caches in DSL。设置 CACHE_MAX_BYTES_BUFFERING_CONFIG=0 在这里可能会有帮助。
  3. 就个人而言,我不喜欢从您的 KS 应用程序调用 producer.send() 方法的想法。您可以使用 .to() 方法将流具体化为主题
var transformed = stream.transform(() -> eventTransformer)

transformed.to("topic-B")

transformed.to("topic-X")