从 Mono.first 发出第一个成功的结果

Emit the first successfull result from Mono.first

我有两个 Mono 要么 return 一个 404 要么产生一个 result.

我怎样才能将这两个 Mono 组合起来

我试过了

      Mono<Result> mono0 = client.get()
          .uri(uri1)
          .retrieve()
          .bodyToMono(Result.class)
          .onErrorResume(e -> Mono.never());

      Mono<Result> mono1 = client.get()
          .uri(uri2)
          .retrieve()
          .bodyToMono(Result.class)
          .onErrorResume(e -> Mono.never());

      return Mono.first(mono0, mono1)
          .blockOptional()

这种方法的问题是,如果两个 Mono 都产生错误,它永远不会完成...

我想我需要类似 OnErrorDetach ....!?

为了让问题更清楚,我创建了一个测试用例:

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import java.util.Optional;

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Mono;

class DummyTest {

  public Optional<String> getResult(Mono<String> m1, Mono<String> m2) {
    return Mono.first(m1, m2)
        // DoSomethingHere ?!??!?
        .blockOptional();
  }

  @Test
  void testFirstSuccessfullAndSecondErrorMono() {
    Optional<String> result = getResult(Mono.just("Something"), Mono.error(new RuntimeException()));

    assertThat(result, is(Optional.of("Something")));
  }

  @Test
  void testSecondSuccessfullAndFirstErrorMono() {
    Optional<String> result = getResult(Mono.error(new RuntimeException()), Mono.just("Something"));

    assertThat(result, is(Optional.of("Something")));
  }

  @Test
  void testTwoErrorMonosYieldEmpty() {
    Optional<String> result =
        getResult(Mono.error(new RuntimeException()), Mono.error(new RuntimeException()));

    assertThat(result, is(Optional.empty()));
  }


}


Mono.first() 的问题是它需要第一个 信号 (不是第一个值),所以你最终试图延迟相应的 Mono 永远让它不重播完成信号。相反,您想要一些东西来获取第一个 value.

而不是 .onErrorResume(e -> Mono.never());,使用 .onErrorResume(e -> Mono.empty());。然后您可以使用:

Flux.merge(mono0, mono1).next();

merge()(与 concat() 相反)与 next() 相结合应确保采用第一个值而忽略另一个值。当然,如果你愿意,你仍然可以blockOptional()