反应器 - 第一次变空时停止源
Reactor - Stop source when first empty
我有这样的需求
Flux<Integer> s1 = .....;
s1.flatMap(value -> anotherSource.find(value));
当 anotherSource.find
第一次为空时,我需要一种方法来停止此 s1。怎么做?
注:
一个可能的解决方案是抛出错误然后捕获它停止。
anotherSource.find(value).switchIfempty(Mono.error(..))
我正在寻找比这更好的解决方案。
您不会为此找到特定的运算符,您必须组合运算符才能实现它。 (请注意,这本身并不能使它成为“黑客”,反应式框架通常旨在以将基本运算符组合在一起以实现您的用例的方式使用。)
我同意使用错误来实现远非理想,因为它可能会破坏反应链中 真实 错误的流动 - 所以这真的应该是最后一次度假村。
在我希望流基于内部发布者停止的情况下,我通常采用的方法是具体化内部流,过滤掉 onComplete()
信号,然后重新添加 onComplete()
在适当的时候(在这种情况下,如果它是空的。)然后你可以将外部流非物质化,它会在你注入它的任何地方响应完成的信号,停止流:
s1.flatMap(
value ->
anotherSource
.find(value)
.materialize()
.filter(s -> !s.isOnComplete())
.defaultIfEmpty(Signal.complete()))
.dematerialize()
这样做的好处是可以保留任何错误信号,同时也不需要另一个对象或特殊值。
我有这样的需求
Flux<Integer> s1 = .....;
s1.flatMap(value -> anotherSource.find(value));
当 anotherSource.find
第一次为空时,我需要一种方法来停止此 s1。怎么做?
注:
一个可能的解决方案是抛出错误然后捕获它停止。
anotherSource.find(value).switchIfempty(Mono.error(..))
我正在寻找比这更好的解决方案。
您不会为此找到特定的运算符,您必须组合运算符才能实现它。 (请注意,这本身并不能使它成为“黑客”,反应式框架通常旨在以将基本运算符组合在一起以实现您的用例的方式使用。)
我同意使用错误来实现远非理想,因为它可能会破坏反应链中 真实 错误的流动 - 所以这真的应该是最后一次度假村。
在我希望流基于内部发布者停止的情况下,我通常采用的方法是具体化内部流,过滤掉 onComplete()
信号,然后重新添加 onComplete()
在适当的时候(在这种情况下,如果它是空的。)然后你可以将外部流非物质化,它会在你注入它的任何地方响应完成的信号,停止流:
s1.flatMap(
value ->
anotherSource
.find(value)
.materialize()
.filter(s -> !s.isOnComplete())
.defaultIfEmpty(Signal.complete()))
.dematerialize()
这样做的好处是可以保留任何错误信号,同时也不需要另一个对象或特殊值。