如何在 Reactor 中结合使用 retry 和 repeat?
How to use retry and repeat combined in Reactor?
我正在轮询队列消息,如果发生错误我想做 retry
,如果我没有收到任何消息我想做 repeat
。重试和重复的总次数应等于 numRetries
。实现此目标的最佳方法是什么?
这就是我到目前为止所取得的成就:
Flux.defer { awsSQSClient.receiveMessage(10) }
.retry(numRetries)
.repeatWhen { emittedEachAttempt ->
emittedEachAttempt.handle<Flux<Message>> { lastEmitted, sink ->
val ctx = sink.currentContext()
val rl = ctx.getOrDefault("repeatsLeft", numRetries)
if (rl > 0 && lastEmitted == 0L) {
sink.next(Context.of("repeatsLeft", rl - 1)) // <-- This line doesn't compile although I copied it from reactor documentation
} else {
sink.next(Flux.empty())
}
}
}
我正在为 repeatWhen
lambda 而苦苦挣扎,我也不明白如何耦合 retry
和 repeat
计数。
I would like to do retry if some error happens or repeat if I don't receive any messages.
在这里区分重试和重复次数没有多大意义,尤其是当您想要整体跟踪总重复次数/重试次数时。两个运营商本质上都会做同样的事情(重新订阅源发布者),只是他们在不同的情况下这样做。
你会更好用:
.switchIfEmpty(Mono.error(new RuntimeException()))
...在重试行上方,然后完全废弃 repeatWhen()
部分。然后,您可以将 numRetries()
设置为您想要的重试/重复总数。
(当然,您可能还想在实际使用中使用更合适的异常类型。)
我正在轮询队列消息,如果发生错误我想做 retry
,如果我没有收到任何消息我想做 repeat
。重试和重复的总次数应等于 numRetries
。实现此目标的最佳方法是什么?
这就是我到目前为止所取得的成就:
Flux.defer { awsSQSClient.receiveMessage(10) }
.retry(numRetries)
.repeatWhen { emittedEachAttempt ->
emittedEachAttempt.handle<Flux<Message>> { lastEmitted, sink ->
val ctx = sink.currentContext()
val rl = ctx.getOrDefault("repeatsLeft", numRetries)
if (rl > 0 && lastEmitted == 0L) {
sink.next(Context.of("repeatsLeft", rl - 1)) // <-- This line doesn't compile although I copied it from reactor documentation
} else {
sink.next(Flux.empty())
}
}
}
我正在为 repeatWhen
lambda 而苦苦挣扎,我也不明白如何耦合 retry
和 repeat
计数。
I would like to do retry if some error happens or repeat if I don't receive any messages.
在这里区分重试和重复次数没有多大意义,尤其是当您想要整体跟踪总重复次数/重试次数时。两个运营商本质上都会做同样的事情(重新订阅源发布者),只是他们在不同的情况下这样做。
你会更好用:
.switchIfEmpty(Mono.error(new RuntimeException()))
...在重试行上方,然后完全废弃 repeatWhen()
部分。然后,您可以将 numRetries()
设置为您想要的重试/重复总数。
(当然,您可能还想在实际使用中使用更合适的异常类型。)