Reactor Flux 的条件逻辑
Conditional logic on a Reactor Flux
我是 Reactor 新手。我正在尝试开发以下应用程序逻辑:
- 阅读来自 Kafka 主题的消息
source
。
- 变换按摩。
- 将转换后的消息的子集写入新的 Kafka 主题
target
。
- 明确确认对所有最初从主题
source
读取的消息的读取操作。
我找到的唯一解决方案是重写上面的业务逻辑如下。
- 阅读来自 Kafka 主题的消息
source
。
- 变换按摩。
- 立即确认消息未写入主题
target
。
- 过滤以上所有消息。
- 将剩余的转换消息写入新的 Kafka 主题
target
。
- 明确确认对这些消息的读取操作
实现第二种逻辑的代码如下:
receiver.receive()
.flatMap(this::processMessage)
.map(this::acknowledgeMessagesNotToWriteInKafka)
.filter(this::isMessageToWriteInKafka)
.as(this::sendToKafka)
.doOnNext(r -> r.correlationMetadata().acknowledge());
显然,receiver
类型是 KafkaReceiver
,而方法 sendToKafka
使用了 KafkaSender
。我不喜欢的一件事是我正在使用 map
来确认某些消息。
有没有更好的方案来实现原来的逻辑?
这不完全你的四个业务逻辑步骤,但我认为它更接近你想要的。
您可以确认在 .filter
之后不会写入 .doOnDiscard
的 "discarded" 消息...
receiver.receive()
.flatMap(this::processMessage)
.filter(this::isMessageToWriteInKafka)
.doOnDiscard(ReceiverRecord.class, record -> record.receiverOffset().acknowledge())
.as(this::sendToKafka)
.doOnNext(r -> r.correlationMetadata().acknowledge());
注意:您需要使用被丢弃的正确对象类型。我不知道 Publisher 从 processMessage
返回的对象类型,但我假设您可以从中获取 ReceiverRecord
或 ReceiverOffset
以确认它。
或者,您可以将 filter
/doOnDiscard
组合成一个 .handle
运算符...
receiver.receive()
.flatMap(this::processMessage)
.handle((m, sink) -> {
if (isMessageToWriteInKafka(m)) {
sink.next(m);
} else {
m.getReceiverRecord().getReceiverOffset().acknowledge();
}
})
.as(this::sendToKafka)
.doOnNext(r -> r.correlationMetadata().acknowledge());
我是 Reactor 新手。我正在尝试开发以下应用程序逻辑:
- 阅读来自 Kafka 主题的消息
source
。 - 变换按摩。
- 将转换后的消息的子集写入新的 Kafka 主题
target
。 - 明确确认对所有最初从主题
source
读取的消息的读取操作。
我找到的唯一解决方案是重写上面的业务逻辑如下。
- 阅读来自 Kafka 主题的消息
source
。 - 变换按摩。
- 立即确认消息未写入主题
target
。 - 过滤以上所有消息。
- 将剩余的转换消息写入新的 Kafka 主题
target
。 - 明确确认对这些消息的读取操作
实现第二种逻辑的代码如下:
receiver.receive()
.flatMap(this::processMessage)
.map(this::acknowledgeMessagesNotToWriteInKafka)
.filter(this::isMessageToWriteInKafka)
.as(this::sendToKafka)
.doOnNext(r -> r.correlationMetadata().acknowledge());
显然,receiver
类型是 KafkaReceiver
,而方法 sendToKafka
使用了 KafkaSender
。我不喜欢的一件事是我正在使用 map
来确认某些消息。
有没有更好的方案来实现原来的逻辑?
这不完全你的四个业务逻辑步骤,但我认为它更接近你想要的。
您可以确认在 .filter
之后不会写入 .doOnDiscard
的 "discarded" 消息...
receiver.receive()
.flatMap(this::processMessage)
.filter(this::isMessageToWriteInKafka)
.doOnDiscard(ReceiverRecord.class, record -> record.receiverOffset().acknowledge())
.as(this::sendToKafka)
.doOnNext(r -> r.correlationMetadata().acknowledge());
注意:您需要使用被丢弃的正确对象类型。我不知道 Publisher 从 processMessage
返回的对象类型,但我假设您可以从中获取 ReceiverRecord
或 ReceiverOffset
以确认它。
或者,您可以将 filter
/doOnDiscard
组合成一个 .handle
运算符...
receiver.receive()
.flatMap(this::processMessage)
.handle((m, sink) -> {
if (isMessageToWriteInKafka(m)) {
sink.next(m);
} else {
m.getReceiverRecord().getReceiverOffset().acknowledge();
}
})
.as(this::sendToKafka)
.doOnNext(r -> r.correlationMetadata().acknowledge());