从 Mono.first 发出第一个成功的结果
Emit the first successfull result from Mono.first
我有两个 Mono
要么 return 一个 404
要么产生一个 result
.
我怎样才能将这两个 Mono
组合起来
-
result
会在 Mono
成功完成后立即发出。
- 一旦所有
Mono
产生 404
或面临任何其他错误,Optional.empty
就会被 return 编辑?
我试过了
Mono<Result> mono0 = client.get()
.uri(uri1)
.retrieve()
.bodyToMono(Result.class)
.onErrorResume(e -> Mono.never());
Mono<Result> mono1 = client.get()
.uri(uri2)
.retrieve()
.bodyToMono(Result.class)
.onErrorResume(e -> Mono.never());
return Mono.first(mono0, mono1)
.blockOptional()
这种方法的问题是,如果两个 Mono
都产生错误,它永远不会完成...
我想我需要类似 OnErrorDetach
....!?
为了让问题更清楚,我创建了一个测试用例:
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
class DummyTest {
public Optional<String> getResult(Mono<String> m1, Mono<String> m2) {
return Mono.first(m1, m2)
// DoSomethingHere ?!??!?
.blockOptional();
}
@Test
void testFirstSuccessfullAndSecondErrorMono() {
Optional<String> result = getResult(Mono.just("Something"), Mono.error(new RuntimeException()));
assertThat(result, is(Optional.of("Something")));
}
@Test
void testSecondSuccessfullAndFirstErrorMono() {
Optional<String> result = getResult(Mono.error(new RuntimeException()), Mono.just("Something"));
assertThat(result, is(Optional.of("Something")));
}
@Test
void testTwoErrorMonosYieldEmpty() {
Optional<String> result =
getResult(Mono.error(new RuntimeException()), Mono.error(new RuntimeException()));
assertThat(result, is(Optional.empty()));
}
}
Mono.first()
的问题是它需要第一个 信号 (不是第一个值),所以你最终试图延迟相应的 Mono
永远让它不重播完成信号。相反,您想要一些东西来获取第一个 value.
而不是 .onErrorResume(e -> Mono.never());
,使用 .onErrorResume(e -> Mono.empty());
。然后您可以使用:
Flux.merge(mono0, mono1).next();
merge()
(与 concat()
相反)与 next()
相结合应确保采用第一个值而忽略另一个值。当然,如果你愿意,你仍然可以blockOptional()
。
我有两个 Mono
要么 return 一个 404
要么产生一个 result
.
我怎样才能将这两个 Mono
组合起来
-
result
会在Mono
成功完成后立即发出。 - 一旦所有
Mono
产生404
或面临任何其他错误,Optional.empty
就会被 return 编辑?
我试过了
Mono<Result> mono0 = client.get()
.uri(uri1)
.retrieve()
.bodyToMono(Result.class)
.onErrorResume(e -> Mono.never());
Mono<Result> mono1 = client.get()
.uri(uri2)
.retrieve()
.bodyToMono(Result.class)
.onErrorResume(e -> Mono.never());
return Mono.first(mono0, mono1)
.blockOptional()
这种方法的问题是,如果两个 Mono
都产生错误,它永远不会完成...
我想我需要类似 OnErrorDetach
....!?
为了让问题更清楚,我创建了一个测试用例:
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
class DummyTest {
public Optional<String> getResult(Mono<String> m1, Mono<String> m2) {
return Mono.first(m1, m2)
// DoSomethingHere ?!??!?
.blockOptional();
}
@Test
void testFirstSuccessfullAndSecondErrorMono() {
Optional<String> result = getResult(Mono.just("Something"), Mono.error(new RuntimeException()));
assertThat(result, is(Optional.of("Something")));
}
@Test
void testSecondSuccessfullAndFirstErrorMono() {
Optional<String> result = getResult(Mono.error(new RuntimeException()), Mono.just("Something"));
assertThat(result, is(Optional.of("Something")));
}
@Test
void testTwoErrorMonosYieldEmpty() {
Optional<String> result =
getResult(Mono.error(new RuntimeException()), Mono.error(new RuntimeException()));
assertThat(result, is(Optional.empty()));
}
}
Mono.first()
的问题是它需要第一个 信号 (不是第一个值),所以你最终试图延迟相应的 Mono
永远让它不重播完成信号。相反,您想要一些东西来获取第一个 value.
而不是 .onErrorResume(e -> Mono.never());
,使用 .onErrorResume(e -> Mono.empty());
。然后您可以使用:
Flux.merge(mono0, mono1).next();
merge()
(与 concat()
相反)与 next()
相结合应确保采用第一个值而忽略另一个值。当然,如果你愿意,你仍然可以blockOptional()
。