Reactor Flux 的条件逻辑

Conditional logic on a Reactor Flux

我是 Reactor 新手。我正在尝试开发以下应用程序逻辑:

  1. 阅读来自 Kafka 主题的消息 source
  2. 变换按摩。
  3. 将转换后的消息的子集写入新的 Kafka 主题 target
  4. 明确确认对所有最初从主题 source 读取的消息的读取操作。

我找到的唯一解决方案是重写上面的业务逻辑如下。

  1. 阅读来自 Kafka 主题的消息 source
  2. 变换按摩。
  3. 立即确认消息未写入主题 target
  4. 过滤以上所有消息。
  5. 将剩余的转换消息写入新的 Kafka 主题 target
  6. 明确确认对这些消息的读取操作

实现第二​​种逻辑的代码如下:

receiver.receive()
        .flatMap(this::processMessage)
        .map(this::acknowledgeMessagesNotToWriteInKafka)
        .filter(this::isMessageToWriteInKafka)
        .as(this::sendToKafka)
        .doOnNext(r -> r.correlationMetadata().acknowledge());

显然,receiver 类型是 KafkaReceiver,而方法 sendToKafka 使用了 KafkaSender。我不喜欢的一件事是我正在使用 map 来确认某些消息。

有没有更好的方案来实现原来的逻辑?

不完全你的四个业务逻辑步骤,但我认为它更接近你想要的。

您可以确认在 .filter 之后不会写入 .doOnDiscard 的 "discarded" 消息...

receiver.receive()
        .flatMap(this::processMessage)
        .filter(this::isMessageToWriteInKafka)
        .doOnDiscard(ReceiverRecord.class, record -> record.receiverOffset().acknowledge())
        .as(this::sendToKafka)
        .doOnNext(r -> r.correlationMetadata().acknowledge());

注意:您需要使用被丢弃的正确对象类型。我不知道 Publisher 从 processMessage 返回的对象类型,但我假设您可以从中获取 ReceiverRecordReceiverOffset 以确认它。

或者,您可以将 filter/doOnDiscard 组合成一个 .handle 运算符...

receiver.receive()
        .flatMap(this::processMessage)
        .handle((m, sink) -> {
            if (isMessageToWriteInKafka(m)) {
                sink.next(m);
            } else {
                m.getReceiverRecord().getReceiverOffset().acknowledge();
            }
        })
        .as(this::sendToKafka)
        .doOnNext(r -> r.correlationMetadata().acknowledge());