如何链接多个并发 rx-java Single

How to chain multiple concurrent rx-java Single

我正在使用 Vert.x 学习 Rx-Java2,我想将成功的配置检索与一些并行任务链接起来。

我创建了一个搜索配置的方法,returns 一个 Single 订阅它并且它工作正常。 但是我有疑问在哪里以及如何调用后续任务:

   public void start(Future<Void> startFuture) throws Exception {
      Single<JsonObject> configSingle = prepareConfigurationAsync();
      configSingle.subscribe(onSuccess -> {
        System.out.println(onSuccess);
        -->   Single<Boolean> task1 = prepareLongAsyncTask1(onSuccess).subscribe(...); 
        -->   Completable task2 = prepareLongAsyncTask2(onSuccess)..subscribe(...); 

    }, onError -> {
        startFuture.fail(onError);
    }));

我的方法似乎可行,但没有并行性。我怎样才能实现它?

我应该如何以及在何处处理这些订阅?

通常通过 flatMap 继续其他来源。并行处理通常使用 zipmerge 来完成。在你的情况下,我认为你不需要内部 Single 的值作为输出的一部分,所以你可以试试这个:

 Completable config = prepareConfigurationAsync()
     .flatMapCompletable(success ->
         System.out.println(success);
         return Completable.mergeArray (
             prepareLongAsyncTask1(success)
                 .doOnSuccess(innerSuccess -> /* ... */)
                 .toCompletable(),
             prepareLongAsyncTask2(success)
                 .doOnComplete(() ->  /* ... */)
         )
      );

  config
  .subscribe( () -> /* completed */, error -> /* error'd */);