如何将 Vert.x Future.compose() 转换为等效的 RxJava2 实现
How do I convert a Vert.x Future.compose() to an equivalent RxJava2 implementation
我有一段代码如下所示:
this.initConfigRetriever() // Produces Future<JsonObject>
.compose(this::asyncLoadDbSchema) // Consumes JsonObject, produces Future<Void>
.compose(v -> this.provisionRouter()) // Produces Future<RouterFactory>
.compose(this::createHttpServer) // Consumes RouterFactory
.compose(s -> startFuture.complete(), startFuture);
而且我想知道如何将其转换为 RxJava2 中的等价物?理想情况下,我想要像 Completable 所做的那样的东西,但是值从一个传递到下一个:
例如:
this.initConfigRetriever() // Returns a JsonObject
.andThen(this::asyncLoadDbSchema) // Consumes JsonObject and produces a Void
.andThen(this::provisionRouter) // Consume Void and produces RouterFactory
.andThen(this::createHttpServer) // Consumes RouterFactory
.onError(startFuture::fail) // Consumes any errors along the chain
下面可能符合您的要求。
flatMap
运算符允许您将值从一个流传递到另一个流的创建中
- 订阅者可以处理错误
我使用 Single
假设这似乎与运行一次的引导逻辑有关
// Produces Future<JsonObject>
Single.just("...")
.flatMapCompletable {
// Consumes JsonObject, emits "completion" (or ends the stream)
Completable.fromCallable { /* ... */ }
}
.toSingle {
// On complete produces RouterFactory
Single.just("...")
}
.flatMapCompletable {
// Consumes RouterFactory, emits "completion" (or ends the stream)
Completable.fromCallable { /* ... */ }
}
.subscribeBy(
onComplete = {
// Handle completion...
},
onError = { error ->
// Handle errors...
}
)
希望对您有所帮助!
这是我的结构最终的样子
this.initConfigRetriever() // If it fails, jump to error handler
.flatMap(this::asyncLoadDbSchema) // If it fails, jump to error handler
.flatMap(this::provisionRouter) // If it fails, jump to error handler
.flatMap(this::createHttpServer) // If it fails, jump to error handler
.doOnError(startFuture::fail)
.subscribe(m -> startFuture.complete()); // If all steps succeeded
我完成此操作的方法是将每个方法进行平面映射 return Maybe<T>
。这非常干净地解决了我的问题。
一个有趣的消极方面是,当我调试以解决这个问题时,很难找出哪里出了问题(我从未开始使用流)。为了在 IntelliJ 中更容易调试,我将方法引用转换为 lambda,以便我可以在 lambda 内部设置断点。
具体可以看代码HERE
我有一段代码如下所示:
this.initConfigRetriever() // Produces Future<JsonObject>
.compose(this::asyncLoadDbSchema) // Consumes JsonObject, produces Future<Void>
.compose(v -> this.provisionRouter()) // Produces Future<RouterFactory>
.compose(this::createHttpServer) // Consumes RouterFactory
.compose(s -> startFuture.complete(), startFuture);
而且我想知道如何将其转换为 RxJava2 中的等价物?理想情况下,我想要像 Completable 所做的那样的东西,但是值从一个传递到下一个:
例如:
this.initConfigRetriever() // Returns a JsonObject
.andThen(this::asyncLoadDbSchema) // Consumes JsonObject and produces a Void
.andThen(this::provisionRouter) // Consume Void and produces RouterFactory
.andThen(this::createHttpServer) // Consumes RouterFactory
.onError(startFuture::fail) // Consumes any errors along the chain
下面可能符合您的要求。
flatMap
运算符允许您将值从一个流传递到另一个流的创建中- 订阅者可以处理错误
我使用
Single
假设这似乎与运行一次的引导逻辑有关// Produces Future<JsonObject> Single.just("...") .flatMapCompletable { // Consumes JsonObject, emits "completion" (or ends the stream) Completable.fromCallable { /* ... */ } } .toSingle { // On complete produces RouterFactory Single.just("...") } .flatMapCompletable { // Consumes RouterFactory, emits "completion" (or ends the stream) Completable.fromCallable { /* ... */ } } .subscribeBy( onComplete = { // Handle completion... }, onError = { error -> // Handle errors... } )
希望对您有所帮助!
这是我的结构最终的样子
this.initConfigRetriever() // If it fails, jump to error handler
.flatMap(this::asyncLoadDbSchema) // If it fails, jump to error handler
.flatMap(this::provisionRouter) // If it fails, jump to error handler
.flatMap(this::createHttpServer) // If it fails, jump to error handler
.doOnError(startFuture::fail)
.subscribe(m -> startFuture.complete()); // If all steps succeeded
我完成此操作的方法是将每个方法进行平面映射 return Maybe<T>
。这非常干净地解决了我的问题。
一个有趣的消极方面是,当我调试以解决这个问题时,很难找出哪里出了问题(我从未开始使用流)。为了在 IntelliJ 中更容易调试,我将方法引用转换为 lambda,以便我可以在 lambda 内部设置断点。
具体可以看代码HERE