如何链接多个并发 rx-java Single
How to chain multiple concurrent rx-java Single
我正在使用 Vert.x 学习 Rx-Java2,我想将成功的配置检索与一些并行任务链接起来。
我创建了一个搜索配置的方法,returns 一个 Single 订阅它并且它工作正常。
但是我有疑问在哪里以及如何调用后续任务:
public void start(Future<Void> startFuture) throws Exception {
Single<JsonObject> configSingle = prepareConfigurationAsync();
configSingle.subscribe(onSuccess -> {
System.out.println(onSuccess);
--> Single<Boolean> task1 = prepareLongAsyncTask1(onSuccess).subscribe(...);
--> Completable task2 = prepareLongAsyncTask2(onSuccess)..subscribe(...);
}, onError -> {
startFuture.fail(onError);
}));
我的方法似乎可行,但没有并行性。我怎样才能实现它?
我应该如何以及在何处处理这些订阅?
通常通过 flatMap
继续其他来源。并行处理通常使用 zip
或 merge
来完成。在你的情况下,我认为你不需要内部 Single
的值作为输出的一部分,所以你可以试试这个:
Completable config = prepareConfigurationAsync()
.flatMapCompletable(success ->
System.out.println(success);
return Completable.mergeArray (
prepareLongAsyncTask1(success)
.doOnSuccess(innerSuccess -> /* ... */)
.toCompletable(),
prepareLongAsyncTask2(success)
.doOnComplete(() -> /* ... */)
)
);
config
.subscribe( () -> /* completed */, error -> /* error'd */);
我正在使用 Vert.x 学习 Rx-Java2,我想将成功的配置检索与一些并行任务链接起来。
我创建了一个搜索配置的方法,returns 一个 Single 订阅它并且它工作正常。 但是我有疑问在哪里以及如何调用后续任务:
public void start(Future<Void> startFuture) throws Exception {
Single<JsonObject> configSingle = prepareConfigurationAsync();
configSingle.subscribe(onSuccess -> {
System.out.println(onSuccess);
--> Single<Boolean> task1 = prepareLongAsyncTask1(onSuccess).subscribe(...);
--> Completable task2 = prepareLongAsyncTask2(onSuccess)..subscribe(...);
}, onError -> {
startFuture.fail(onError);
}));
我的方法似乎可行,但没有并行性。我怎样才能实现它?
我应该如何以及在何处处理这些订阅?
通常通过 flatMap
继续其他来源。并行处理通常使用 zip
或 merge
来完成。在你的情况下,我认为你不需要内部 Single
的值作为输出的一部分,所以你可以试试这个:
Completable config = prepareConfigurationAsync()
.flatMapCompletable(success ->
System.out.println(success);
return Completable.mergeArray (
prepareLongAsyncTask1(success)
.doOnSuccess(innerSuccess -> /* ... */)
.toCompletable(),
prepareLongAsyncTask2(success)
.doOnComplete(() -> /* ... */)
)
);
config
.subscribe( () -> /* completed */, error -> /* error'd */);