从 Flux 消费时按顺序调用非阻塞操作,包括重试
Invoking non-blocking operations sequentially while consuming from a Flux including retries
所以我的用例是在 Spring Webflux 应用程序中使用来自 Kafka 的消息,同时使用 Project Reactor 以反应式风格编程,并以相同的顺序对每条消息执行非阻塞操作因为消息是从卡夫卡收到的。系统应该也能自行恢复。
这是设置为使用的代码片段:
Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
return receiver.receive();
});
messages.map(this::transformToOutputFormat)
.map(this::performAction)
.flatMapSequential(receiverRecordMono -> receiverRecordMono)
.doOnNext(record -> record.receiverOffset().acknowledge())
.doOnError(error -> logger.error("Error receiving record", error))
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.subscribe();
如你所见,我所做的是:从Kafka中获取消息,将其转换为一个对象,用于新的目的地,然后将其发送到目的地,然后确认偏移量以将消息标记为已消费并进行了处理。以与从 Kafka 使用的消息相同的顺序确认偏移量至关重要,这样我们就不会将偏移量移动到未完全处理的消息之外(包括将一些数据发送到目的地)。因此,我使用 flatMapSequential
来确保这一点。
为简单起见,我们假设 transformToOutputFormat()
方法是恒等变换。
public ReceiverRecord<Integer, DataDocument> transformToOutputFormat(ReceiverRecord<Integer, DataDocument> record) {
return record;
}
performAction()
方法需要通过网络执行某些操作,例如调用 HTTP REST API。所以合适的APIs return一个Mono,就是需要订阅链。此外,我需要 ReceiverRecord
通过此方法进行 return 编辑,以便可以在上面的 flatMapSequential() 运算符中确认偏移量。因为我需要订阅 Mono,所以我在上面使用 flatMapSequential
。如果没有,我可以使用 map
代替。
public Mono<ReceiverRecord<Integer, DataDocument>> performAction(ReceiverRecord<Integer, DataDocument> record) {
return Mono.just(record)
.flatMap(receiverRecord ->
HttpClient.create()
.port(3000)
.get()
.uri("/makeCall?data=" + receiverRecord.value().getData())
.responseContent()
.aggregate()
.asString()
)
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.then(Mono.just(record));
我在这个方法中有两个相互矛盾的需求:
1.订阅进行HTTP调用的链
2. Return ReceiverRecord
使用 flatMap() 意味着我的 return 类型更改为 Mono。在同一个地方使用 doOnNext() 会在链中保留 ReceiverRecord,但不会允许自动订阅 HttpClient 响应。
我无法在 asString()
之后添加 .subscribe()
,因为我想等到 HTTP 响应完全接收后才能确认偏移量。
我也不能使用 .block()
,因为它在并行线程上运行。
因此,我需要作弊 return 方法作用域中的 record
对象。
另一件事是在 performAction
内重试时它会切换线程。由于 flatMapSequential() 急切地订阅外部通量中的每个 Mono,这意味着虽然可以保证偏移量的确认顺序,但我们不能保证 performAction
中的 HTTP 调用将以相同的顺序执行。
所以我有两个问题。
- 是否可以以自然的方式 return
record
而不是 return 方法作用域对象?
- 是否可以确保 HTTP 调用和偏移量确认的执行顺序与发生这些操作的消息的顺序相同?
这是我想出的解决方案。
Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
return receiver.receive();
});
messages.map(this::transformToOutputFormat)
.delayUntil(this::performAction)
.doOnNext(record -> record.receiverOffset().acknowledge())
.doOnError(error -> logger.error("Error receiving record", error))
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.subscribe();
我没有使用 flatMapSequential 订阅 performAction Mono 并保留序列,而是延迟了对来自 Kafka 接收器的更多消息的请求,直到执行操作。这实现了我需要的一次一个处理。
因此,performAction 不需要 return ReceiverRecord 的 Mono。我还将其简化为以下内容:
public Mono<String> performAction(ReceiverRecord<Integer, DataDocument> record) {
HttpClient.create()
.port(3000)
.get()
.uri("/makeCall?data=" + receiverRecord.value().getData())
.responseContent()
.aggregate()
.asString()
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5));
}
所以我的用例是在 Spring Webflux 应用程序中使用来自 Kafka 的消息,同时使用 Project Reactor 以反应式风格编程,并以相同的顺序对每条消息执行非阻塞操作因为消息是从卡夫卡收到的。系统应该也能自行恢复。
这是设置为使用的代码片段:
Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
return receiver.receive();
});
messages.map(this::transformToOutputFormat)
.map(this::performAction)
.flatMapSequential(receiverRecordMono -> receiverRecordMono)
.doOnNext(record -> record.receiverOffset().acknowledge())
.doOnError(error -> logger.error("Error receiving record", error))
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.subscribe();
如你所见,我所做的是:从Kafka中获取消息,将其转换为一个对象,用于新的目的地,然后将其发送到目的地,然后确认偏移量以将消息标记为已消费并进行了处理。以与从 Kafka 使用的消息相同的顺序确认偏移量至关重要,这样我们就不会将偏移量移动到未完全处理的消息之外(包括将一些数据发送到目的地)。因此,我使用 flatMapSequential
来确保这一点。
为简单起见,我们假设 transformToOutputFormat()
方法是恒等变换。
public ReceiverRecord<Integer, DataDocument> transformToOutputFormat(ReceiverRecord<Integer, DataDocument> record) {
return record;
}
performAction()
方法需要通过网络执行某些操作,例如调用 HTTP REST API。所以合适的APIs return一个Mono,就是需要订阅链。此外,我需要 ReceiverRecord
通过此方法进行 return 编辑,以便可以在上面的 flatMapSequential() 运算符中确认偏移量。因为我需要订阅 Mono,所以我在上面使用 flatMapSequential
。如果没有,我可以使用 map
代替。
public Mono<ReceiverRecord<Integer, DataDocument>> performAction(ReceiverRecord<Integer, DataDocument> record) {
return Mono.just(record)
.flatMap(receiverRecord ->
HttpClient.create()
.port(3000)
.get()
.uri("/makeCall?data=" + receiverRecord.value().getData())
.responseContent()
.aggregate()
.asString()
)
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.then(Mono.just(record));
我在这个方法中有两个相互矛盾的需求: 1.订阅进行HTTP调用的链 2. Return ReceiverRecord
使用 flatMap() 意味着我的 return 类型更改为 Mono。在同一个地方使用 doOnNext() 会在链中保留 ReceiverRecord,但不会允许自动订阅 HttpClient 响应。
我无法在 asString()
之后添加 .subscribe()
,因为我想等到 HTTP 响应完全接收后才能确认偏移量。
我也不能使用 .block()
,因为它在并行线程上运行。
因此,我需要作弊 return 方法作用域中的 record
对象。
另一件事是在 performAction
内重试时它会切换线程。由于 flatMapSequential() 急切地订阅外部通量中的每个 Mono,这意味着虽然可以保证偏移量的确认顺序,但我们不能保证 performAction
中的 HTTP 调用将以相同的顺序执行。
所以我有两个问题。
- 是否可以以自然的方式 return
record
而不是 return 方法作用域对象? - 是否可以确保 HTTP 调用和偏移量确认的执行顺序与发生这些操作的消息的顺序相同?
这是我想出的解决方案。
Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
return receiver.receive();
});
messages.map(this::transformToOutputFormat)
.delayUntil(this::performAction)
.doOnNext(record -> record.receiverOffset().acknowledge())
.doOnError(error -> logger.error("Error receiving record", error))
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.subscribe();
我没有使用 flatMapSequential 订阅 performAction Mono 并保留序列,而是延迟了对来自 Kafka 接收器的更多消息的请求,直到执行操作。这实现了我需要的一次一个处理。
因此,performAction 不需要 return ReceiverRecord 的 Mono。我还将其简化为以下内容:
public Mono<String> performAction(ReceiverRecord<Integer, DataDocument> record) {
HttpClient.create()
.port(3000)
.get()
.uri("/makeCall?data=" + receiverRecord.value().getData())
.responseContent()
.aggregate()
.asString()
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5));
}