如何将 Vert.x Future.compose() 转换为等效的 RxJava2 实现

How do I convert a Vert.x Future.compose() to an equivalent RxJava2 implementation

我有一段代码如下所示:

this.initConfigRetriever()                // Produces Future<JsonObject>
    .compose(this::asyncLoadDbSchema)     // Consumes JsonObject, produces Future<Void>
    .compose(v -> this.provisionRouter()) // Produces Future<RouterFactory>
    .compose(this::createHttpServer)      // Consumes RouterFactory
    .compose(s -> startFuture.complete(), startFuture);

而且我想知道如何将其转换为 RxJava2 中的等价物?理想情况下,我想要像 Completable 所做的那样的东西,但是值从一个传递到下一个:

例如:

this.initConfigRetriever()            // Returns a JsonObject
    .andThen(this::asyncLoadDbSchema) // Consumes JsonObject and produces a Void
    .andThen(this::provisionRouter)   // Consume Void and produces RouterFactory
    .andThen(this::createHttpServer)  // Consumes RouterFactory
    .onError(startFuture::fail)       // Consumes any errors along the chain

下面可能符合您的要求。

  • flatMap 运算符允许您将值从一个流传递到另一个流的创建中
  • 订阅者可以处理错误
  • 我使用 Single 假设这似乎与运行一次的引导逻辑有关

    // Produces Future<JsonObject>
    Single.just("...")
            .flatMapCompletable {
                // Consumes JsonObject, emits "completion" (or ends the stream)
                Completable.fromCallable { /* ... */ }
            }
            .toSingle {
                // On complete produces RouterFactory
                Single.just("...")
            }
            .flatMapCompletable {
                // Consumes RouterFactory, emits "completion" (or ends the stream)
                Completable.fromCallable { /* ... */ }
            }
            .subscribeBy(
                    onComplete = { 
                        // Handle completion...
                    },
                    onError = { error -> 
                        // Handle errors... 
                    }
            )
    

希望对您有所帮助!

这是我的结构最终的样子

this.initConfigRetriever()             // If it fails, jump to error handler
    .flatMap(this::asyncLoadDbSchema)  // If it fails, jump to error handler
    .flatMap(this::provisionRouter)    // If it fails, jump to error handler
    .flatMap(this::createHttpServer)   // If it fails, jump to error handler
    .doOnError(startFuture::fail)
    .subscribe(m -> startFuture.complete()); // If all steps succeeded

我完成此操作的方法是将每个方法进行平面映射 return Maybe<T>。这非常干净地解决了我的问题。

一个有趣的消极方面是,当我调试以解决这个问题时,很难找出哪里出了问题(我从未开始使用流)。为了在 IntelliJ 中更容易调试,我将方法引用转换为 lambda,以便我可以在 lambda 内部设置断点。

具体可以看代码HERE