RxJava 如何从 CompletableFuture<Long> 创建一个非阻塞 Single

RxJava How to create a non blocking Single from CompletableFuture<Long>

给定一个存储库 class,其中 returns 一个:

Single<SomeObject> find()

和一个 CompletableFuture,其中 returns 一个浮点数:

CompletableFuture<Long> completableFuture

我想先调用存储库方法,根据结果,我需要调用completableFuture。这是我的代码:

repository.find()
          .flatMap(s -> {
              CompletableFuture<Long> completableFuture = serviceReturningCompletableFuture;

              return Single.fromFuture(completableFuture);

          }).subscribe(System.out::println)

这里的问题是 Single.fromFuture 会阻塞,因此无法使用。

为了解决这个问题,我尝试了以下方法:

repository.find()
          .map(s -> {
              CompletableFuture<Long> completableFuture = new CompletableFuture<>();

              return Flowable.fromFuture(completableFuture);
          }).subscribe(System.out::println)

虽然这在没有阻塞的情况下工作正常,但订阅函数打印以下内容而不是 CompletableFuture

返回的数字
io.reactivex.internal.operators.flowable.FlowableFromFuture@62ce978e

我还尝试使用非阻塞转换器从 net.javacrumbs.future-converter:future-converter-rxjava-java8:1.2.0:

中单选
repository.find()
          .map(s -> {
              CompletableFuture<Long> completableFuture = new CompletableFuture<>();

              return toSingle(completableFuture);
          }).subscribe(System.out::println)

但是,这会导致几乎相同的输出:net.javacrumbs.futureconverter.rxjavacommon.RxJavaFutureUtils$ValueSourceBackedSingle@3f1eebb8

我错过了什么?

稍后一些摆弄和这个辅助方法:

public static <T> Single<T> toSingle(CompletableFuture<T> future) {
    return Single.create(subscriber ->
            future.whenComplete((result, error) -> {
                if (error != null) {
                    subscriber.onError(error);
                } else {
                    subscriber.onSuccess(result);
                }
            }));
}

似乎可以解决问题:

repository.find()
          .flatMap(s -> {
              CompletableFuture<Long> completableFuture = serviceReturningCompletableFuture;

              return toSingle(completableFuture);

          }).subscribe(System.out::println)

正如@akarnokd 在评论中指出的那样,有这个库: https://github.com/akarnokd/RxJavaJdk8Interop#completionstage-to-rxjava 其工作方式几乎相同:

repository.find()
          .flatMap(s -> {
              CompletableFuture<Long> completableFuture = serviceReturningCompletableFuture;

              return SingleInterop.fromFuture(completableFuture);

          }).subscribe(System.out::println)