如何在 RxJava2 中链接两个 Completable

How to chain two Completable in RxJava2

我有两个Completable。我想做以下场景: 如果第一个 Completable 到达 onComplete ,则继续第二个 Completable。最终结果将是第二个 Completable 的 onComplete。

当我有 Single getUserIdAlreadySavedInDevice() 和 Completable login():

时,我就是这样做的
@Override
public Completable loginUserThatIsAlreadySavedInDevice(String password) {
    return getUserIdAlreadySavedInDevice()
            .flatMapCompletable(s -> login(password, s))

}

您正在寻找 andThen 操作员。

Returns a Completable that first runs this Completable and then the other completable.

firstCompletable
    .andThen(secondCompletable)

通常,此运算符是 "replacement" 用于 Completable 上的 flatMap:

Completable       andThen(CompletableSource next)
<T> Maybe<T>      andThen(MaybeSource<T> next)
<T> Observable<T> andThen(ObservableSource<T> next)
<T> Flowable<T>   andThen(Publisher<T> next)
<T> Single<T>     andThen(SingleSource<T> next)

尝试

Completable.concat

Returns a Completable which completes only when all sources complete, one after another.

http://reactivex.io/RxJava/javadoc/io/reactivex/Completable.html#concat(java.lang.Iterable)

TL;DR:其他答案遗漏了一个微妙之处。如果您想要 concat 的等价物,请使用 doThingA().andThen(doThingB());如果您想要 flatMap 的等价物,请使用 doThingA().andThen(Completable.defer(() -> doThingB())


编辑:更完整的参考资料

  • flatMap()merge()
  • 的映射版本
  • concatMap()concat()
  • 的映射版本
  • 对于 Completable 你需要 defer() 来使函数调用像在 SingleObservable 的映射函数中一样惰性调用(或者最好让它什么都不发生直到你点击订阅——这是一个很好的约定,并在官方 Rx 库和我遇到的任何 Rx 扩展中使用,对于高级用户,这仅指冷可完成的,但大多数人可以忽略它)。
  • concat(a, b)a.andThen(b) 之间的唯一区别是语法

一些示例:

  • foo(a).andThen(bar(b)) 将:

    1. 呼叫foo(a)
    2. 立即调用bar(b)即使步骤1returns返回的可完成错误
    3. 订阅任何步骤1 returns
    4. 然后订阅bar(b)的结果仅当最后一步成功完成
  • foo(a).andThen(Completable.defer(() -> bar(b)) 将:

    1. 致电foo(a)
    2. 订阅步骤1
    3. 的结果
    4. 仅当 foo(a) 返回的可完成项成功时才调用 bar(b)

我将省略 merge() 的处理,因为它变得有点复杂,但长话短说,如果您想要 "parallelism".[=47=,就可以调用它]


以上答案有些正确,但我发现它们具有误导性,因为它们遗漏了关于急切求值的微妙之处。

doThingA().andThen(doThingB()) 将立即调用 doThingB(),但仅在 doThingA() 返回的可观察对象完成时订阅 doThingB() 返回的可观察对象。

doThingA().andThen(Completable.defer(() -> doThingB()) 只会在事情 A 完成后调用 doThingB()

仅当 doThingB() 在订阅事件之前有副作用时,这才重要。例如。 Single.just(sideEffect(1)).toCompletable()

在订阅事件(真正的冷可观察对象)之前没有副作用的实现可能是 Single.just(1).doOnSuccess(i -> sideEffect(i)).toCompletable()

在这种情况下,A 是一些验证逻辑,doThingB() 立即启动异步数据库更新,完成 VertX ObservableFuture。这是不好的。可以说 doThingB() 应该写成只在订阅时更新数据库,我将在未来尝试以这种方式设计东西。

我遇到了同样的问题,我使用了运算符 .concactWith 使其工作。 就我而言,我有两个 Completable 类型的乐趣。

fun makeTwoThings(): Completable {
    makeFirstThing().concatWith(makeSecondThing())
}

fun makeFirstThing(): Completable{
     //TODO()
}

fun makeSecondThing(): Completable{
     //TODO()
}

注意这里得票多的答案有点误导。请参阅下面的示例,我的想法是展示一些测试场景并展示使用运算符 andThen 的可完成逻辑的行为方式。

 private fun doThingAFail(): Completable {
        print("Do thingA Called\n")
        return Completable.fromCallable {
            print("calling stream A\n")
            throw(Exception("The excep"))
        }
    }

    private fun doThingB(): Completable {
        print("Do thingB Called\n")
        return Completable.fromCallable {
            print("calling stream B\n")

        }
    }

    private fun doThingA(): Completable {
        print("Do thingA Called\n")
        return Completable.fromCallable {
            print("calling stream A\n")
        }
    }

观察下面的测试:

@Test
fun testCallAPlusB() {
    doThingA().andThen(doThingB())
}

输出将是:

Do thingA Called
Do thingB Called

此处请注意:请注意我们没有订阅此代码段中的这些 Completables。

测试:

@Test
fun theTestSubscribe() {
    doThingA().andThen(doThingB()).subscribe()
}

输出将是:

Do thingA Called
Do thingB Called
calling stream A
calling stream B

最后,如果第一个可完成的失败,第二个可完成的将不会被执行。

@Test
fun theTestFailThingA() {
    doThingAFail().andThen(doThingB()).subscribe()
}

输出将是:

Do thingA Called
Do thingB Called
calling stream A

这里的关键概念是方法内部的逻辑和可观察对象内部的逻辑不是同时执行的。 一旦我们调用 doThingA()doThingB() 方法,就会打印“Do thingA Called”和“Do thingB Called”行。而“调用流 A”和 只有当有人订阅 doThingAdoThingB 方法时才会调用“调用流 B”行。

这里的第二个概念是 andThen 运算符将如何处理错误。在上面的示例中,如果 doThingA() completable 以错误结束,流将结束并且不会打印“调用流 B”行。