Project Reactor - 如何处理来自 Flux.interval 的 OverflowException?
Project Reactor - How to handle OverflowException from Flux.interval?
我正在使用 Spring Webflux 构建一个 spring 引导应用程序,我想让该应用程序完全无阻塞。应用程序本身有一些 REST 端点和一个需要每隔几秒 运行 的批处理作业。对于批处理作业,我正在尝试 Flux.interval(Duration.ofMillis(1000))
生成我忽略的长值和 运行 我的计划作业。
Flux.interval(Duration.ofMillis(1000))
.flatMap(ignore -> doSomething())
.subscribe();
但是一段时间后我收到错误
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 257 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
谁能告诉我如何解决这个问题?
问题的原因很可能是 doSomething()
操作花费的时间超过了指定的 Flux 间隔,这意味着一段时间后 doSomething
作业相互重叠并且背压开始。由于Flux.interval
是一个热源(意味着它不会按需发出信号)并且 flatMap
具有默认的并发限制 (256),操作员不知所措,这导致 OverflowException
.
根据您的要求,这个问题有几个可能的解决方案:
1。忽略溢出错误并丢弃会溢出的信号
这意味着有时,如果我们已经有很多 (256) 个正在进行中,我们会跳过一秒钟并且不会在间隔内安排作业。
Flux.interval(Duration.ofMillis(1000))
.onBackpressureDrop()
.flatMap(ignore -> doSomething())
2。将 flatMap
并发设置为更高的值
这仍然会在一段时间后导致 OverflowException,但会延迟问题的解决(可能不是最佳解决方案)。
Flux.interval(Duration.ofMillis(1000))
.flatMap(ignore -> doSomething(), Integer.MAX_VALUE)
3。不要让工作相互重叠
我们从热源切换到冷源,消除了溢出的可能性。但是,我们失去了每秒安排一个事件的保证。相反,它们将在上一个作业完成且至少经过 1 秒后按需安排。
Mono.just(1).repeat() // infinite Flux with backpressure
.delayElements(Duration.ofMillis(1000))
.concatMap(ignore -> doSomething())
如果您不介意重叠作业并在 flatMap
调用中定义合理的并发级别,您也可以将此解决方案与上一个解决方案结合使用。
我正在使用 Spring Webflux 构建一个 spring 引导应用程序,我想让该应用程序完全无阻塞。应用程序本身有一些 REST 端点和一个需要每隔几秒 运行 的批处理作业。对于批处理作业,我正在尝试 Flux.interval(Duration.ofMillis(1000))
生成我忽略的长值和 运行 我的计划作业。
Flux.interval(Duration.ofMillis(1000))
.flatMap(ignore -> doSomething())
.subscribe();
但是一段时间后我收到错误
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 257 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
谁能告诉我如何解决这个问题?
问题的原因很可能是 doSomething()
操作花费的时间超过了指定的 Flux 间隔,这意味着一段时间后 doSomething
作业相互重叠并且背压开始。由于Flux.interval
是一个热源(意味着它不会按需发出信号)并且 flatMap
具有默认的并发限制 (256),操作员不知所措,这导致 OverflowException
.
根据您的要求,这个问题有几个可能的解决方案:
1。忽略溢出错误并丢弃会溢出的信号
这意味着有时,如果我们已经有很多 (256) 个正在进行中,我们会跳过一秒钟并且不会在间隔内安排作业。
Flux.interval(Duration.ofMillis(1000))
.onBackpressureDrop()
.flatMap(ignore -> doSomething())
2。将 flatMap
并发设置为更高的值
这仍然会在一段时间后导致 OverflowException,但会延迟问题的解决(可能不是最佳解决方案)。
Flux.interval(Duration.ofMillis(1000))
.flatMap(ignore -> doSomething(), Integer.MAX_VALUE)
3。不要让工作相互重叠
我们从热源切换到冷源,消除了溢出的可能性。但是,我们失去了每秒安排一个事件的保证。相反,它们将在上一个作业完成且至少经过 1 秒后按需安排。
Mono.just(1).repeat() // infinite Flux with backpressure
.delayElements(Duration.ofMillis(1000))
.concatMap(ignore -> doSomething())
如果您不介意重叠作业并在 flatMap
调用中定义合理的并发级别,您也可以将此解决方案与上一个解决方案结合使用。