RxJava 如何从 CompletableFuture<Long> 创建一个非阻塞 Single
RxJava How to create a non blocking Single from CompletableFuture<Long>
给定一个存储库 class,其中 returns 一个:
Single<SomeObject> find()
和一个 CompletableFuture,其中 returns 一个浮点数:
CompletableFuture<Long> completableFuture
我想先调用存储库方法,根据结果,我需要调用completableFuture。这是我的代码:
repository.find()
.flatMap(s -> {
CompletableFuture<Long> completableFuture = serviceReturningCompletableFuture;
return Single.fromFuture(completableFuture);
}).subscribe(System.out::println)
这里的问题是 Single.fromFuture
会阻塞,因此无法使用。
为了解决这个问题,我尝试了以下方法:
repository.find()
.map(s -> {
CompletableFuture<Long> completableFuture = new CompletableFuture<>();
return Flowable.fromFuture(completableFuture);
}).subscribe(System.out::println)
虽然这在没有阻塞的情况下工作正常,但订阅函数打印以下内容而不是 CompletableFuture
返回的数字
io.reactivex.internal.operators.flowable.FlowableFromFuture@62ce978e
我还尝试使用非阻塞转换器从 net.javacrumbs.future-converter:future-converter-rxjava-java8:1.2.0
:
中单选
repository.find()
.map(s -> {
CompletableFuture<Long> completableFuture = new CompletableFuture<>();
return toSingle(completableFuture);
}).subscribe(System.out::println)
但是,这会导致几乎相同的输出:net.javacrumbs.futureconverter.rxjavacommon.RxJavaFutureUtils$ValueSourceBackedSingle@3f1eebb8
我错过了什么?
稍后一些摆弄和这个辅助方法:
public static <T> Single<T> toSingle(CompletableFuture<T> future) {
return Single.create(subscriber ->
future.whenComplete((result, error) -> {
if (error != null) {
subscriber.onError(error);
} else {
subscriber.onSuccess(result);
}
}));
}
似乎可以解决问题:
repository.find()
.flatMap(s -> {
CompletableFuture<Long> completableFuture = serviceReturningCompletableFuture;
return toSingle(completableFuture);
}).subscribe(System.out::println)
正如@akarnokd 在评论中指出的那样,有这个库:
https://github.com/akarnokd/RxJavaJdk8Interop#completionstage-to-rxjava 其工作方式几乎相同:
repository.find()
.flatMap(s -> {
CompletableFuture<Long> completableFuture = serviceReturningCompletableFuture;
return SingleInterop.fromFuture(completableFuture);
}).subscribe(System.out::println)
给定一个存储库 class,其中 returns 一个:
Single<SomeObject> find()
和一个 CompletableFuture,其中 returns 一个浮点数:
CompletableFuture<Long> completableFuture
我想先调用存储库方法,根据结果,我需要调用completableFuture。这是我的代码:
repository.find()
.flatMap(s -> {
CompletableFuture<Long> completableFuture = serviceReturningCompletableFuture;
return Single.fromFuture(completableFuture);
}).subscribe(System.out::println)
这里的问题是 Single.fromFuture
会阻塞,因此无法使用。
为了解决这个问题,我尝试了以下方法:
repository.find()
.map(s -> {
CompletableFuture<Long> completableFuture = new CompletableFuture<>();
return Flowable.fromFuture(completableFuture);
}).subscribe(System.out::println)
虽然这在没有阻塞的情况下工作正常,但订阅函数打印以下内容而不是 CompletableFuture
返回的数字io.reactivex.internal.operators.flowable.FlowableFromFuture@62ce978e
我还尝试使用非阻塞转换器从 net.javacrumbs.future-converter:future-converter-rxjava-java8:1.2.0
:
repository.find()
.map(s -> {
CompletableFuture<Long> completableFuture = new CompletableFuture<>();
return toSingle(completableFuture);
}).subscribe(System.out::println)
但是,这会导致几乎相同的输出:net.javacrumbs.futureconverter.rxjavacommon.RxJavaFutureUtils$ValueSourceBackedSingle@3f1eebb8
我错过了什么?
稍后一些摆弄和这个辅助方法:
public static <T> Single<T> toSingle(CompletableFuture<T> future) {
return Single.create(subscriber ->
future.whenComplete((result, error) -> {
if (error != null) {
subscriber.onError(error);
} else {
subscriber.onSuccess(result);
}
}));
}
似乎可以解决问题:
repository.find()
.flatMap(s -> {
CompletableFuture<Long> completableFuture = serviceReturningCompletableFuture;
return toSingle(completableFuture);
}).subscribe(System.out::println)
正如@akarnokd 在评论中指出的那样,有这个库: https://github.com/akarnokd/RxJavaJdk8Interop#completionstage-to-rxjava 其工作方式几乎相同:
repository.find()
.flatMap(s -> {
CompletableFuture<Long> completableFuture = serviceReturningCompletableFuture;
return SingleInterop.fromFuture(completableFuture);
}).subscribe(System.out::println)