Flux.onErrorContinue() 空对象

Flux.onErrorContinue() null object

我对 onErrorContinue() 有疑问,传递给 Biconsumer 的对象为空。

我正在使用 Spring boot 2.1.13.RELEASE 和 reactive mongo with reactor-core version 3.2.15.RELEASE.

当调用数据库检索 ID 为 returns 的记录时出现问题,没有记录并使用运算符 switchIfEmtpy() 我使用 Mono.error() 和下游 i尝试使用 onErrorContinue() 处理此异常。

下面的代码解释了这个问题:

public static void main(String[] args) {
    Flux.range(1, 10)
            .flatMap(integer -> mapInteger(integer))
            .doOnNext(System.out::println)
            .onErrorContinue((throwable, o) -> System.out.println("error with " + o)) // o is null
            .subscribe();
}

public static Mono<Integer> mapInteger(Integer num) { // This is here to simulate the db call
    return Mono.just(num)
            .flatMap(t -> {
                if (t == 5)
                    return Mono.empty();
                else
                    return Mono.just(num * 2);
            })
            .switchIfEmpty(Mono.error(new RuntimeException("Error happened while mapping integer!")));
}

这将打印以下值:

2
4
6
8
error with null
12
14
16
18
20

PS。当流中发生另一个错误时,我没有问题。

更新: mapInteger() 用于模拟以下对 reactivemongo 存储库的调用:

public Mono<MetaData> getFromDbByKey(String key) {
        return repository
            .findByKeyAndDeletedIsFalse(key)
            .switchIfEmpty(Mono.error(() -> new RuntimeException()));
    }

以及对 getFromDbByKey() 的调用 returns 我需要将此元数据映射到主流中的通量。

使用 onErrorContinue,我们捕获 Throwable 并根据其类型对每种类型的错误进行不同的处理。

你不能指望在返回时收到“项目”t Mono.empty() if t==5 ?

此代码将打印您需要的内容。

    Flux.range(1, 10)
        .flatMap(integer -> mapInteger(integer))
        .doOnNext(System.out::println)
        .onErrorContinue((throwable, o) -> {
          System.out.println(throwable.getMessage());
        })
        .subscribe();
  }
  public static Mono<Integer> mapInteger(Integer num) { // This is here to simulate the db call
    return Mono.just(num)
        .flatMap(t -> {
          if (t == 5)
            return Mono.empty();
          else
            return Mono.just(num * 2);
        })
        .switchIfEmpty(Mono.error(new RuntimeException("error with " + num)));
  }

打印:

2
4
6
8
error with 5
12
14
16
18
20

在你实际的 mongoDB 调用中,你可以有类似的东西:

public Mono<MetaData> getFromDbByKey(String key) {
        return repository
            .findByKeyAndDeletedIsFalse(key)
            .switchIfEmpty(Mono.error(() -> new RuntimeException("Couldnt find metadata which is not deleted for the key: " + key)));
    }

这是一位同事向我推荐的solution/workaround:

public static void main(String[] args) {

    Flux.range(1, 10)
        .flatMap(integer -> mapInteger(integer)
                           .switchIfEmpty(Mono.error(
                                new CustomException("Error happened while mapping integer!", 
                            integer))))
        .doOnNext(System.out::println)
        .onErrorContinue((throwable, o) -> {
            if (CustomException.class.isInstance(throwable))
                System.out.println("Object to blame " + ((CustomException)throwable).getBlamedObject());
            System.out.println("error with " + o);
        })
    .subscribe();
  }

  public static Mono<Integer> mapInteger(Integer num) {
      return Mono.just(num)
          .flatMap(t -> {
              if (t == 5)
                  return Mono.empty();
              else
                  return Mono.just(num * 2);
          });
  }

  public static class CustomException extends RuntimeException {
      private Integer blamedObject;

      public CustomException(String message, Integer blamedObject) {
          super(message);
          this.blamedObject = blamedObject;
      }

      public Integer getBlamedObject() {
          return blamedObject;
      }
  }

这将 return:

2
4
6
8
Object to blame 5
error with null
12
14
16
18
20

solution/workaround 是创建一个自定义异常,接受有错误的对象,并将 switchIfEmpty() 移动到主流,这样我就可以得到导致问题的对象,稍后在 onErrorContinue() 我可以检查异常类型并获取导致问题的对象。

这对我有用,因为示例中的 mapInteger() 只是对反应式 mongo 存储库的模拟,如果在原始 post.

中没有找到更新的结果